00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 extern "C" {
00044 #include <unistd.h>
00045 }
00046
00047 #include <iostream>
00048 #include <sstream>
00049
00050 #include "consmgr.h"
00051 #include "outlet.h"
00052 #include "guard.h"
00053
00054 #if SHOULD_NOT_BE_HERE
00055
00056 OutletList Outlet::outlet_list;
00057 #endif
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067 void *
00068 Outlet::writer_thread(void *instance)
00069 {
00070 Outlet *me = (Outlet *)instance;
00071
00072
00073
00074 try {
00075 me->writer();
00076 } catch (pthread_exception &pe) {
00077 cerr << "* Writer thread (" << pthread_self() << ") terminating; ";
00078 cerr << pe.what() << endl;
00079
00080 } catch (fireball &fb) {
00081 fb.bail(cerr);
00082 #ifndef __linux__
00083 } catch (...) {
00084 cerr << "writer thread (" << pthread_self() <<
00085 ") caught unknown exception" << endl;
00086
00087 #endif
00088 }
00089
00090 clog << instance << "::writer_wrapper (thread id " << pthread_self() << ") exiting..." << endl;
00091
00092 return(NULL);
00093 }
00094
00095
00096
00097
00098
00099
00100 Outlet::Outlet(ConnectorP &ourConnector)
00101 : Connection::Connection(ourConnector), tid_writer((pthread_t)NULL)
00102 {
00103 int ret;
00104
00105
00106 ret = pthread_cond_init(&data_avail, NULL);
00107 PTHREAD_CHECK_AND_THROW(ret, "cond_init(data_avail)");
00108
00109
00110 ret = pthread_mutex_init(&data_mutex, NULL);
00111 DMUTEX_LOG("Mutex " << &data_mutex << " initialized, in object " << this);
00112 PTHREAD_CHECK_AND_THROW(ret, "mutex_init(data_mutex)");
00113
00114
00115
00116
00117
00118 return;
00119 }
00120
00121 Outlet::~Outlet(void) throw()
00122 {
00123 int ret;
00124
00125 clog << "In Outlet::~Outlet (thread " << pthread_self() << ")" << endl;
00126
00127 #if 0
00128
00129 MUTEX_LOG("Outlet::~Outlet locking mutex " << &mutex);
00130 ret = pthread_mutex_lock(&mutex);
00131 PTHREAD_CHECK_AND_THROW(ret, "mutex_lock(mutex)");
00132 MUTEX_LOG("Mutex " << &mutex << " locked.");
00133
00134 #if 0
00135
00136 if (write_fd >= 0) {
00137 close(write_fd);
00138 write_fd = -1;
00139 }
00140 #endif
00141
00142 #warning "Need to figure out how to remove ourself from the other folkses recipient list"
00143 #if SHOULD_NOT_BE_HERE
00144
00145 Outlet::outlet_list.del_item(this);
00146 #endif
00147
00148
00149 MUTEX_LOG("Outlet::~Outlet unlocking mutex " << &mutex);
00150 ret = pthread_mutex_unlock(&mutex);
00151 PTHREAD_CHECK_AND_THROW(ret, "mutex_unlock(mutex)");
00152
00153 #endif
00154
00155
00156
00157
00158 pthread_yield();
00159 usleep(5000);
00160
00161 DMUTEX_LOG("Destroying data_mutex " << &data_mutex);
00162 ret = pthread_mutex_destroy(&data_mutex);
00163 DMUTEX_LOG("mutex_destroy(" << &data_mutex << ") returned " << ret);
00164 if (ret) {
00165 cerr << "Unable to destroy data_mutex " << &data_mutex << ": "
00166 << strerror(ret) << endl;
00167 mutex_info(std::cerr,&data_mutex);
00168 }
00169
00170 return;
00171 }
00172
00173 void
00174 Outlet::writer(void)
00175 {
00176 int ret;
00177
00178 DMUTEX_LOG("locking data mutex " << &data_mutex);
00179 Guard locker(&data_mutex);
00180 DMUTEX_LOG("data mutex " << &data_mutex << " locked.");
00181
00182 clog << "Writer thread..." << endl;
00183
00184
00185 this->write_data();
00186
00187
00188
00189
00190 clog << "Writer (" << this->get_id() << "; " << pthread_self() << ") waiting for queued data..." << endl;
00191
00192 do {
00193 DMUTEX_LOG("cond_wait'ing with " << &data_mutex);
00194 ret = pthread_cond_wait(&data_avail, &data_mutex);
00195 PTHREAD_CHECK_AND_THROW(ret, "cond_wait(data_avail, data_mutex)");
00196
00197 DMUTEX_LOG("back from cond_wait with " << &data_mutex << " locked");
00198
00199
00200
00201 if (! data.empty()) {
00202 clog << "Writer (" << this->get_id() << "; " << pthread_self() << ") writing data..." << endl;
00203 this->write_data();
00204 } else {
00205
00206
00207 if (please_exit)
00208 break;
00209 }
00210 } while (ret == 0);
00211
00212 clog << "Writer (" << this->get_id() << "; " << pthread_self() << ") exiting..." << endl;
00213 DMUTEX_LOG("Unlocking mutex " << &data_mutex << " upon method end");
00214 return;
00215 }
00216
00217
00218
00219
00220
00221 void
00222 Outlet::write_data()
00223 {
00224 int ret;
00225 queue_iterator q_iter;
00226
00227 for (q_iter = data.begin() ; q_iter != data.end() ;
00228 q_iter = data.erase(q_iter)) {
00229
00230
00231
00232 Data &d = dynamic_cast<Data&>(*MessageP::GetPointer(*q_iter));
00233
00234 #if 0
00235 set<Decoder*>::iterator d_iter = decoders.begin();
00236 Decoder *d = *d_iter;
00237
00238 if (d_iter == decoders.end()) {
00239 clog << "** Empty decoders set: d == " << d << endl;
00240 } else {
00241
00242 b2 = b;
00243 b = (*d_iter)->decode(b2);
00244 b->inc_ref();
00245 }
00246 #endif
00247
00248
00249 DMUTEX_LOG("Unlocking " << &data_mutex);
00250 ret = pthread_mutex_unlock(&data_mutex);
00251 PTHREAD_CHECK_AND_THROW(ret, "mutex_unlock(data_mutex)");
00252
00253 clog << "gonna write Data " << d << " (len " << d.len() << ")" << endl;
00254
00255
00256 try {
00257 ret = base->write(d);
00258 clog << "Outlet::write of Data " << d << " returned " << ret << endl;
00259 if (ret < 0)
00260 throw(fireball(19, "write failed", true));
00261 } catch (CMStateChange &e) {
00262 stop_engine();
00263 note_disconnect();
00264 } catch (...) {
00265 throw;
00266 }
00267
00268 clog << "written Data " << d << "(len " << d.len() << ")" << endl;
00269
00270
00271 DMUTEX_LOG("Locking " << &data_mutex << " for next for()");
00272 ret = pthread_mutex_lock(&data_mutex);
00273 PTHREAD_CHECK_AND_THROW(ret, "mutex_lock(data_mutex)");
00274 }
00275
00276 DMUTEX_LOG("write_data returning, leaving " << &data_mutex << " locked");
00277
00278 return;
00279 }
00280
00281 void
00282 Outlet::stop_engine(void)
00283 {
00284 int ret;
00285
00286
00287
00288
00289
00290 if (tid_writer == 0) {
00291 clog << "In Outlet::stop_engine(); tid_writer == 0" << endl;
00292 return;
00293 }
00294
00295 clog << "In Outlet::stop_engine() (" << this->get_id() << "; "
00296 << tid_writer << ")" << endl;
00297
00298
00299
00300 if (tid_writer && !pthread_kill(tid_writer, 0)) {
00301 ret = pthread_cancel(tid_writer);
00302 PTHREAD_CHECK_AND_THROW(ret, "pthread_cancel(tid_writer)");
00303 ret = pthread_detach(tid_writer);
00304 PTHREAD_CHECK_AND_THROW(ret, "pthread_detach(tid_writer)");
00305 clog << "Canceled and detached writer thread (" << tid_writer
00306 << ")" << endl;
00307 tid_writer = (pthread_t)NULL;
00308 }
00309
00310 clog << "Outlet::stop_engine() successful, returning..." << endl;
00311
00312 return;
00313 }
00314
00315
00316 void
00317 Outlet::start_engine(void)
00318 {
00319 int ret;
00320
00321 ret = pthread_create(&tid_writer, NULL, Outlet::writer_thread, this);
00322 PTHREAD_CHECK_AND_THROW(ret, "pthread_create(tid_writer)");
00323 clog << "Outlet writer wrapper thread created, id " << tid_writer << endl;
00324
00325 return;
00326 }
00327
00328 std::string
00329 Outlet::get_id(void) const
00330 {
00331 std::ostringstream id_str;
00332
00333 id_str << class_name() << " using " << base->get_id();
00334
00335 return(id_str.str());
00336 }
00337
#if 0
/*
 * Disabled code: registers a Decoder with this Outlet and, as a self-test,
 * logs the first element of the decoder set afterwards.
 */
void
Outlet::add_decoder(Decoder *d)
{
    clog << "Inserting Decoder " << d << endl;
    decoders.insert(d);

    clog << "Testing..." << endl;
    set<Decoder*>::iterator first = decoders.begin();
    clog << "*d_iter is " << *first << endl;
}
#endif
00358
00370 void
00371 Outlet::queue_data(MessageP &dmsg)
00372 {
00373 int ret;
00374
00375 clog << "Trying to enqueue Message (type " << dmsg->name() << ")" << endl;
00376
00377
00378
00379 Data *d = dynamic_cast<Data*>(MessageP::GetPointer(dmsg));
00380
00381 if (d) {
00382 Guard locker(&data_mutex);
00383
00384
00385 data.push_back(dmsg);
00386
00387 clog << "Data queue'd and condition being broadcast on " << get_id() << endl;
00388 ret = pthread_cond_broadcast(&this->data_avail);
00389 PTHREAD_CHECK_AND_THROW(ret, "signal_broadcast(data_avail)");
00390 } else {
00391 clog << "Oops, Outlet::queue_data got a non-Data message..." << endl;
00392 }
00393
00394 return;
00395 }
00396