00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include <typeinfo>
00042
00043 #include "consmgr.h"
00044 #include "mastercont.h"
00045 #include "connection.h"
00046 #include "outlet.h"
00047 #include "guard.h"
00048
00049
00050
00051
00052 Connection::Connection(ConnectorP &c) : base(c), please_exit(false)
00053 {
00054 int ret;
00055
00056
00057 ret = pthread_mutex_init(&mutex, NULL);
00058 PTHREAD_CHECK_AND_THROW(ret, "mutex_init(mutex)");
00059
00060 #if MAY_NOT_WANT_THIS
00061
00062 ret = pthread_cond_init(&state_change, NULL);
00063 PTHREAD_CHECK_AND_THROW(ret, "cond_init(state_change)");
00064 #endif
00065
00066 ret = pthread_create(&tid_manager, NULL, manager_thread, this);
00067 PTHREAD_CHECK_AND_THROW(ret, "pthread_create(manager)");
00068
00069 return;
00070 }
00071
00072 Connection::~Connection(void) throw()
00073 {
00074 int ret;
00075
00076
00077 ret = pthread_mutex_destroy(&mutex);
00078 if (ret != 0)
00079 cerr << "Error destroying Connection::mutex (" << &mutex << "): "
00080 << strerror(ret) << endl;
00081 #if MAY_NOT_WANT_THIS
00082 ret = pthread_cond_destroy(&state_change);
00083 if (ret != 0)
00084 cerr << "Error destroying Connection::state_change (" << &state_change
00085 << "): " << strerror(ret) << endl;
00086 #endif
00087 return;
00088 }
00089
00090
00091
00092
00093
00094
00095 void *
00096 Connection::manager_thread(void *instance)
00097 {
00098 Connection *me = reinterpret_cast<Connection*>(instance);
00099 me->manager();
00100 return(NULL);
00101 }
00102
00103
00104
00105
00106
00107
00108
00109 void
00110 Connection::manager(void)
00111 {
00112 clog << "Connection::manager() starting: thread id " << pthread_self()
00113 << endl;
00114 if (! base->connected()) {
00115 clog << base->get_id() << " not connected in "
00116 << "Connection::manager(). Unexpected!" << endl;
00117 throw(fireball(32, base->get_id() + " not connected!"));
00118 }
00119
00120
00121
00122
00123
00124
00125
00126 while (this->class_name().compare("Connection") == 0)
00127 usleep(1000);
00128
00129 clog << "Connection::manager() running" << endl;
00130
00131
00132
00133
00134
00135
00136
00137 start_engine();
00138
00139 manage_top:
00140 try {
00141 MUTEX_LOG("locking " << &mutex << " with Guard");
00142 Guard locker(&mutex);
00143
00144
00145 clog << "Our base object (" << typeid(*base).name() << " at "
00146 << base.GetPointer(base) << ") is: " << endl;
00147 clog << "\t" << base->get_id() << ", and it is ";
00148 clog << (base->connected() ? "" : "NOT ") << "connected." << endl;
00149 #if MAY_NOT_WANT_THIS
00150 conn_state = base->connected();
00151 clog << "Our connection state is " << conn_state << endl;
00152 #endif
00153
00154
00155 while (!please_exit) {
00156 MessageP msg = get_message();
00157
00158
00159
00160
00161
00162 Data *data = handle_message(msg);
00163 if (data) {
00164
00165
00166 MessageP msg2(data);
00167 if ((dynamic_cast<Outlet*>(this)))
00168 dynamic_cast<Outlet*>(this)->queue_data(msg2);
00169 }
00170 }
00171 #ifdef SHOULD_NOT_BE_HERE
00172 int ret;
00173 while (connected()) {
00174 MUTEX_LOG("cond_wait'ing with " << &mutex);
00175 ret = pthread_cond_wait(&state_change, &mutex);
00176 PTHREAD_CHECK_AND_THROW(ret, "cond_wait(state_change)");
00177
00178 if (base->connected()) {
00179
00180 clog << "(still connected)" << endl;
00181
00182
00183
00184
00185 clog << "Connection::manager: got a control message?!?" << endl;
00186 } else {
00187 conn_state = base->connected();
00188
00189
00190 clog << "Connection::manager() needs to know how to restart the connection!" << endl;
00191 }
00192
00196 }
00197 #endif
00198 } catch (CMStateChange &e) {
00199 clog << "Got a state change exception" << endl;
00200 #if MAY_NOT_WANT_THIS
00201 Guard locker(&mutex);
00202 conn_state = base->connected();
00203 if (!conn_state) {
00204 #else
00205 if (!base->connected()) {
00206 #endif
00207 clog << "Our Connector is disconnected, shutting down and noting the disconnect" << endl;
00208 stop_engine();
00209 note_disconnect();
00210 }
00211 goto manage_top;
00212 } catch (CMException &e) {
00213 cerr << "Got a consmgr exception: " << e.what() << endl;
00214 } catch (std::exception &e) {
00215 cerr << "Standard exception: " << e.what() << endl;
00216 } catch (...) {
00217 cerr << "Caught an unknown thing in Connection::manager(), rethrowing" << endl;
00218 throw;
00219 }
00220
00221 #ifdef SHOULD_NOT_BE_HERE
00222 try {
00223 while (is_connected(true)) {
00224
00225
00226
00227
00228 pthread_cleanup_push(cleanup_unlock, &state_mutex);
00229
00230 MUTEX_LOG("cond_wait'ing with " << &state_mutex);
00231 ret = pthread_cond_wait(&state_change, &state_mutex);
00232 PTHREAD_CHECK_AND_THROW(ret, "cond_wait(state_change)");
00233
00234 clog << "Connection::manager(" << get_id()
00235 << "): State has changed ";
00236 pthread_cleanup_pop(0);
00237
00238 if (is_connected(true)) {
00239
00240 clog << "(still connected)" << endl;
00241
00242
00243 clog << "Connection::manager: got a control message?!?" << endl;
00244 } else {
00245
00246
00247 clog << "Connection::manage() needs to know how to restart the connection!" << endl;
00248 }
00249 }
00250 } catch (...) {
00251
00252 MUTEX_LOG("Unlocking " << &state_mutex << "; and re-throwing");
00253 pthread_mutex_unlock(&state_mutex);
00254 throw;
00255 }
00256
00257 clog << "(disconnected; nuking threads)" << endl;
00258
00259
00260
00261 try {
00262 MUTEX_LOG("Unlocking " << &state_mutex << " at end of Connection::manage");
00263 ret = pthread_mutex_unlock(&state_mutex);
00264 PTHREAD_CHECK_AND_THROW(ret, "mutex_unlock(state_mutex)");
00265
00266 shutdown();
00267 } CATCH_STR_OR_DIE(72) {
00268 cerr << "Umm, unknown exception at " << __FILE__ << ":"
00269 << (__LINE__ - 2) << endl;
00270 exit(74);
00271 }
00272 #endif
00273
00274 clog << "exiting Connection::manager(); should I be?" << endl;
00275
00276
00277 return;
00278 }
00279
00280 #if MAY_NOT_WANT_THIS
00281
00285 bool
00286 Connection::connected()
00287 {
00288 Guard locker(&mutex);
00289
00290 return(this->conn_state);
00291 }
00292 #endif
00293
00299 void
00300 Connection::note_disconnect(void)
00301 {
00302 MasterController *mc = MasterController::get_instance();
00303 MessageP msg(new Message(Message::C_Disconnected));
00304
00305 mc->queue_message(msg);
00306 return;
00307 }
00308
00309 #ifdef SHOULD_NOT_BE_HERE
00310
00311
00312
00313 void
00314 Connection::set_connected(bool isconn)
00315 {
00316 int ret;
00317
00318
00319 MUTEX_LOG("Connection::set_connected() locking " << &state_mutex);
00320 ret = pthread_mutex_lock(&state_mutex);
00321 PTHREAD_CHECK_AND_THROW(ret, "mutex_lock(state_mutex)");
00322
00323
00324
00325 try {
00326 bool old_state = this->connected;
00327
00328
00329 if (old_state != isconn) {
00330 this->connected = isconn;
00331
00332 clog << "Connection::set_connected() broadcasting "
00333 << "on state_change" << endl;
00334 ret = pthread_cond_broadcast(&state_change);
00335 PTHREAD_CHECK_AND_THROW(ret, "cond_broadcast(state_change)");
00336 #if MAYBE_A_BAD_IDEA
00337
00338 if (! id_cache.empty())
00339 id_cache.erase();
00340 #endif
00341 }
00342 } catch (...) {
00343 (void)pthread_mutex_unlock(&state_mutex);
00344 throw;
00345 }
00346
00347
00348 MUTEX_LOG("Connected::set_connected() unlocking " << &state_mutex);
00349 ret = pthread_mutex_unlock(&state_mutex);
00350 PTHREAD_CHECK_AND_THROW(ret, "mutex_unlock(state_mutex)");
00351
00352 return;
00353 }
00354 #endif
00355
00356 #ifdef SHOULD_NOT_BE_HERE
00360 void
00361 Connection::initiate_connection(void)
00362 {
00363
00364 if (is_connected())
00365 throw(fireball(32, "Tried to initiate an already connected connection!"));
00366
00367 try {
00368
00369 this->connect();
00370
00371
00372 set_connected(true);
00373
00374
00375
00376 clog << "*8* Starting the processing engine..." << endl;
00377 this->start_engine();
00378 } CATCH_OR_DIE(114) {
00379
00380 throw;
00381 }
00382
00383 return;
00384 }
00385 #endif
00386
00388 void
00389 Connection::shutdown(void)
00390 {
00391 try {
00392
00393
00394 clog << "*8* Calling engine shutdown from Connection::shutdown()..."
00395 << endl;
00396 clog << "*8* We are a '" << typeid(this).name() << "' (" << this->get_id() << endl;
00397 stop_engine();
00398 } CATCH_OR_DIE(115) {
00399
00400 throw;
00401 }
00402
00403
00404
00405 this->please_exit = true;
00406 MessageP noop(new Message(Message::C_None));
00407 queue_message(noop);
00408
00409 return;
00410 }