00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 extern "C" {
00040 #include <unistd.h>
00041 }
00042
00043 #include <iostream>
00044 #include <typeinfo>
00045
00046 #include "consmgr.h"
00047 #include "bidirconn.h"
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057 #if SHOULD_NOT_BE_HERE
00058
00059
00060
00061
00062
00063
00064
00065 void *
00066 BiDirConn::establish_wrapper(void *instance)
00067 {
00068 BiDirConn *me = static_cast<BiDirConn*>(instance);
00069
00070
00071 Outlet::establish(me);
00072
00073 me->establish();
00074
00075
00076 return(NULL);
00077 }
00078 #endif
00079
00080 void *
00081 BiDirConn::reader_thread(void *instance)
00082 {
00083 void *ret;
00084 BiDirConn *self = static_cast<BiDirConn*>(instance);
00085
00086
00087
00088 try {
00089 ret = self->reader();
00090 } catch (pthread_exception &pe) {
00091 cerr << "* reader thread (" << pthread_self() << ") terminating; ";
00092 cerr << pe.what() << endl;
00093 } catch (fireball &fb) {
00094 cerr << "* reader thread (" << pthread_self() << ") terminating; ";
00095 fb.bail(cerr);
00096 } catch (std::exception &e) {
00097 cerr << "reader thread (" << pthread_self() <<
00098 ") caught std::exception: " << e.what() << endl;
00099 } catch (string &e) {
00100 cerr << "reader thread (" << pthread_self() <<
00101 ") caught string exception: " << e << endl;
00102 } catch (char *e) {
00103 cerr << "reader thread (" << pthread_self() <<
00104 ") caught char* exception: " << e << endl;
00105 } catch (long l) {
00106 cerr << "reader thread (" << pthread_self() <<
00107 ") caught long exception: " << l << endl;
00108 } catch (unsigned long l) {
00109 cerr << "reader thread (" << pthread_self() <<
00110 ") caught unsigned long exception: " << l << endl;
00111 } catch (short l) {
00112 cerr << "reader thread (" << pthread_self() <<
00113 ") caught short exception: " << l << endl;
00114 } catch (unsigned short l) {
00115 cerr << "reader thread (" << pthread_self() <<
00116 ") caught unsigned short exception: " << l << endl;
00117 } catch (int l) {
00118 cerr << "reader thread (" << pthread_self() <<
00119 ") caught int exception: " << l << endl;
00120 } catch (unsigned int l) {
00121 cerr << "reader thread (" << pthread_self() <<
00122 ") caught unsigned int exception: " << l << endl;
00123 } catch (char l) {
00124 cerr << "reader thread (" << pthread_self() <<
00125 ") caught char exception: " << l << endl;
00126 } catch (unsigned char l) {
00127 cerr << "reader thread (" << pthread_self() <<
00128 ") caught unsigned char exception: " << l << endl;
00129 #ifndef __linux__
00130 } catch (...) {
00131
00132 cerr << "reader thread (" << pthread_self() <<
00133 ") caught unknown exception" << endl;
00134 #endif
00135 }
00136
00137 clog << instance << "::reader_thread (thread id " << pthread_self() << ") exiting..." << endl;
00138
00139 return(ret);
00140 }
00141
00142
00143
00144
00145
00146 BiDirConn::BiDirConn(ConnectorP &ourConnector)
00147 : Outlet::Outlet(ourConnector), tid_reader((pthread_t)NULL)
00148 {
00149
00150
00151
00152
00153 return;
00154 }
00155
00160 BiDirConn::~BiDirConn(void) throw()
00161 {
00162
00163
00164
00165 #if SHOULD_NOT_BE_HERE
00166 int ret;
00167
00168
00169 #if 0
00170 MUTEX_LOG("Checking the lock on " << &mutex);
00171 ret = pthread_mutex_trylock(&mutex);
00172 if (ret == EBUSY) {
00173 MUTEX_LOG(&mutex << " is already locked,"
00174 #ifdef __NetBSD__
00175 << " locked by thread " << pthread__id(mutex.ptm_owner) << ";"
00176 #endif
00177 << " my thread id is " << pthread_self());
00178
00179
00180
00181 MUTEX_LOG("Waiting for lock on " << &mutex);
00182 ret = pthread_mutex_lock(&mutex);
00183 }
00184 #else
00185 MUTEX_LOG("Locking " << &mutex);
00186 ret = pthread_mutex_lock(&mutex);
00187 #endif
00188 PTHREAD_CHECK_AND_THROW(ret, "mutex_lock(mutex)");
00189 MUTEX_LOG(&mutex << " is locked.");
00190
00191 #if 0
00192
00193 if ((is_connected()) && (read_fd >= 0)) {
00194 close(read_fd);
00195 read_fd = -1;
00196 }
00197 #endif
00198
00199 if (tid_reader) {
00200
00201
00202 clog << "In BiDirConn::~BiDirConn, joining tid_reader ("
00203 << tid_reader << ")" << endl;
00204 ret = pthread_join(tid_reader, NULL);
00205 clog << "BiDirConn::~BiDirConn, pthread_join of reader returned "
00206 << ret << endl;
00207 }
00208
00209
00210 ret = pthread_mutex_unlock(&mutex);
00211 PTHREAD_CHECK_AND_THROW(ret, "mutex_unlock(mutex)");
00212 MUTEX_LOG(&mutex << " is unlocked again.");
00213 #endif
00214
00215 clog << "BiDirConn::~BiDirConn returning" << endl;
00216 return;
00217 }
00218
00219
00220
00221
00222 #if SHOULD_NOT_BE_HERE
00223
00224
00225
00226 void
00227 BiDirConn::establish(void)
00228 {
00229 int ret;
00230
00231
00232 if (is_connected()) {
00233
00234
00235 clog << "** Creating the reader thread..." << endl;
00236 ret = pthread_create(&tid_reader, NULL, reader_wrapper, this);
00237 PTHREAD_CHECK_AND_THROW(ret, "pthread_create(tid_reader)");
00238 clog << "BiDirConn::establish created reader thread ("
00239 << tid_reader << ")" << endl;
00240 } else {
00241 clog << "** Bogon! \"" << get_id() << ";" << this <<
00242 "\" not connected!" << endl;
00243 return;
00244 }
00245
00246
00247
00248 MUTEX_LOG("Locking mutex " << &mutex);
00249 ret = pthread_mutex_lock(&mutex);
00250 PTHREAD_CHECK_AND_THROW(ret, "mutex_lock(mutex)");
00251 MUTEX_LOG("Mutex " << &mutex << " locked");
00252 try {
00253 while (is_connected(true)) {
00254
00255
00256
00257
00258 pthread_cleanup_push(cleanup_unlock, &mutex);
00259
00260 MUTEX_LOG("cond_wait'ing with " << &mutex);
00261 ret = pthread_cond_wait(&state_change, &mutex);
00262
00263 PTHREAD_CHECK_AND_THROW(ret, "cond_wait(state_change)");
00264
00265 pthread_cleanup_pop(0);
00266
00267 #if 0
00268
00269
00270
00271
00272
00273 clog << "BiDirConn(" << this << ")::establish(" << get_id()
00274 << "; " << pthread_self() << "): State has changed ";
00275 #else
00276 clog << "BiDirConn(" << this << ")::establish("
00277 << pthread_self() << "): State has changed ";
00278 #endif
00279
00280 if (is_connected(true)) {
00281
00282 clog << "(still connected)" << endl;
00283
00284
00285 clog << "BiDirConn::establish: got a control message?!?" << endl;
00286 }
00287 }
00288 } catch (...) {
00289 MUTEX_LOG("Unlocking " << &mutex);
00290 pthread_mutex_unlock(&mutex);
00291 throw;
00292 }
00293
00294 clog << "(disconnected; nuking threads)" << endl;
00295
00296
00297
00298 try {
00299 ret = pthread_mutex_unlock(&mutex);
00300 PTHREAD_CHECK_AND_THROW(ret, "mutex_unlock(mutex)");
00301 shutdown();
00302 } CATCH_STR_OR_DIE(72) {
00303 cerr << "Umm, unknown error at " << __FILE__ << ":"
00304 << (__LINE__ - 2) << endl;
00305 exit(73);
00306 }
00307
00308 return;
00309 }
00310 #endif
00311
00315 void
00316 BiDirConn::stop_engine(void)
00317 {
00318 int ret;
00319
00320 clog << "In BiDirConn::stop_engine() of (" << this->get_id() << ")" << endl;
00321
00322 clog << "(my thread_id is " << pthread_self() << ", gonna kill " << tid_reader << ")" << endl;
00323
00324 try {
00325
00326
00327
00328 if (base->connected())
00329 base->disconnect();
00330
00331
00332
00333
00334
00335 if (tid_reader && (pthread_kill(tid_reader, 0) == 0)) {
00336
00337
00338 clog << "Canceling reader thread (" << tid_reader << ")" << endl;
00339 ret = pthread_cancel(tid_reader);
00340 clog << "pthread_cancel of reader (" << tid_reader << ") returned " << ret << endl;
00341 if (ret == 0) {
00342 ret = pthread_detach(tid_reader);
00343 if (ret == EINVAL) {
00344
00345 clog << "stop_engine(" << this->get_id() << "): reader thread "
00346 << "pthread_detach() failed after pthread_cancel(). "
00347 << "Assuming adequate death." << endl;
00348 } else
00349 PTHREAD_CHECK_AND_THROW(ret, "detach(tid_reader)");
00350 clog << "BiDirConn::stop_engine() of (" << this->get_id()
00351 << "); canceled reader thread" << endl;
00352 } else if (ret != ESRCH)
00353 PTHREAD_CHECK_AND_THROW(ret, "cancel(tid_reader)");
00354 } else
00355 clog << "tid_reader is " << tid_reader << ", but kill returned 0" << endl;
00356 } catch (CMException &e) {
00357 clog << "Consmgr exception in BiDirConn::stop_engine(): " << e.what() << endl;
00358 } catch (...) {
00359 cerr << "BiDirConn produced unexpected exception!" << endl;
00360 }
00361
00362 clog << "BiDirConn::stop_engine() successful, calling Outlet::stop_engine()" << endl;
00363
00364 Outlet::stop_engine();
00365
00366 return;
00367
00368 }
00369
00370 void
00371 BiDirConn::start_engine(void)
00372 {
00373 int ret;
00374
00375 ret = pthread_create(&tid_reader, NULL, reader_thread, this);
00376 PTHREAD_CHECK_AND_THROW(ret, "pthread_create(tid_reader)");
00377
00378 clog << "** BiDirConn::reader() started up, thread id " << tid_reader
00379 << std::endl;
00380
00381
00382 Outlet::start_engine();
00383
00384 return;
00385 }
00386
00387 void *
00388 BiDirConn::reader()
00389 {
00390 int ret;
00391
00392 clog << "Reader thread..." << endl;
00393
00394 clog << "Reader (" << this->get_id() << "; " << pthread_self() << ") about to read..." << endl;
00395
00396 do {
00397 try {
00398
00399 Data *d = base->read();
00400
00401 clog << "read " << d->len() << " bytes (" << base->get_id()
00402 << "): " << *d << std::endl;
00403
00404
00405
00406
00407
00408
00409 process_data(d);
00410
00411
00412 if (d->len() > 0) {
00413 MessageP msg(new Data(*d));
00414
00415 clog << "Reader (" << this->get_id() << ") about to send message (" << msg << ") to all our recipients" << endl;
00416
00417 this->send_message(msg);
00418 }
00419 } catch (CMStateChange &e) {
00420 cerr << "Got a state change (disconnect?) from Connector::read()"
00421 << "; sending message to MasterController." << endl;
00422 note_disconnect();
00423
00424 break;
00425 } catch (...) {
00426 cerr << "Connector::read() failed within " << base->get_id()
00427 << ", rethrowing." << endl;
00428 throw;
00429 }
00430
00431
00432 clog << "Reader (" << this->get_id() << ") about to read..." << endl;
00433 } while (1);
00434 clog << "Reader: How'd I get here?" << endl;
00435
00436 #if SHOULD_NOT_BE_HERE
00437
00438
00439
00440 if (len < 0) {
00441 if ((len == -1) && (errno == EBADF)) {
00442 clog << "Reader (" << this->get_id() << ") read -1 (EBADF)" << endl;
00443
00444 } else
00445 throw(fireball(9, "BiDirConn::reader - read(2)", true));
00446 } else if (len == 0) {
00447 cerr << "EOF on BiDirConn::reader() (" << this->get_id() << "; " << pthread_self() << ")" << endl;
00448
00449
00450 this->disconnect();
00451 }
00452 #endif
00453
00454 clog << "Reader thread, exiting..." << endl;
00455
00456 return(NULL);
00457 }
00458
00462 void
00463 BiDirConn::process_data(Data *d)
00464 {
00465
00466
00467 clog << "BiDirConn::process_data(): we're " << get_id() << " (" << typeid(this).name() << "): Doing nothing" << endl;
00468
00469 return;
00470 }
00471
00472
00473
00474 void
00475 BiDirConn::add_encoder(Encoder *e)
00476 {
00477 encoders.insert(e);
00478
00479 return;
00480 }
00481
00482 std::string
00483 BiDirConn::get_id(void) const
00484 {
00485 return(string("BiDirConn above ")+base->get_id());
00486 }