00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 #include <typeinfo>
00042 
00043 #include "consmgr.h"
00044 #include "mastercont.h"
00045 #include "connection.h"
00046 #include "outlet.h"
00047 #include "guard.h"
00048 
00049 
00050 
00051 
00052 Connection::Connection(ConnectorP &c) : base(c), please_exit(false)
00053 {
00054     int         ret;
00055 
00056     
00057     ret = pthread_mutex_init(&mutex, NULL);
00058     PTHREAD_CHECK_AND_THROW(ret, "mutex_init(mutex)");
00059 
00060 #if MAY_NOT_WANT_THIS
00061     
00062     ret = pthread_cond_init(&state_change, NULL);
00063     PTHREAD_CHECK_AND_THROW(ret, "cond_init(state_change)");
00064 #endif
00065     
00066     ret = pthread_create(&tid_manager, NULL, manager_thread, this);
00067     PTHREAD_CHECK_AND_THROW(ret, "pthread_create(manager)");
00068 
00069     return;
00070 }
00071 
00072 Connection::~Connection(void) throw()
00073 {
00074     int         ret;
00075 
00076     
00077     ret = pthread_mutex_destroy(&mutex);
00078     if (ret != 0)
00079         cerr << "Error destroying Connection::mutex (" << &mutex << "): "
00080              << strerror(ret) << endl;
00081 #if MAY_NOT_WANT_THIS
00082     ret = pthread_cond_destroy(&state_change);
00083     if (ret != 0)
00084         cerr << "Error destroying Connection::state_change (" << &state_change
00085              << "): " << strerror(ret) << endl;
00086 #endif
00087     return;
00088 }
00089 
00090 
00091 
00092 
00093 
00094 
00095 void *
00096 Connection::manager_thread(void *instance)
00097 {
00098     Connection  *me = reinterpret_cast<Connection*>(instance);
00099     me->manager();
00100     return(NULL);
00101 }
00102 
00103 
00104 
00105 
00106 
00107 
00108 
00109 void
00110 Connection::manager(void)
00111 {
00112     clog << "Connection::manager() starting: thread id " << pthread_self()
00113          << endl;
00114     if (! base->connected()) {
00115         clog << base->get_id() << " not connected in "
00116              << "Connection::manager().  Unexpected!" << endl;
00117         throw(fireball(32, base->get_id() + " not connected!"));
00118     }
00119 
00120     
00121 
00122 
00123 
00124 
00125 
00126     while (this->class_name().compare("Connection") == 0)
00127         usleep(1000);
00128 
00129     clog << "Connection::manager() running" << endl;
00130 
00131     
00132     
00133 
00134 
00135 
00136 
00137     start_engine();
00138     
00139 manage_top:
00140     try {
00141         MUTEX_LOG("locking " << &mutex << " with Guard");
00142         Guard   locker(&mutex);
00143 
00144         
00145         clog << "Our base object (" << typeid(*base).name() << " at "
00146              << base.GetPointer(base) << ") is: " << endl;
00147         clog << "\t" << base->get_id() << ", and it is ";
00148         clog << (base->connected() ? "" : "NOT ") << "connected." << endl;
00149 #if MAY_NOT_WANT_THIS
00150         conn_state = base->connected();
00151         clog << "Our connection state is " << conn_state << endl;
00152 #endif
00153         
00154         
00155         while (!please_exit) {
00156             MessageP    msg = get_message();
00157             
00158             
00159             
00160             
00161             
00162             Data        *data = handle_message(msg);
00163             if (data) {
00164                 
00165                 
00166                 MessageP        msg2(data);
00167                 if ((dynamic_cast<Outlet*>(this)))
00168                     dynamic_cast<Outlet*>(this)->queue_data(msg2);
00169             }
00170         }
00171 #ifdef SHOULD_NOT_BE_HERE
00172         int     ret;
00173         while (connected()) {
00174             MUTEX_LOG("cond_wait'ing with " << &mutex);
00175             ret = pthread_cond_wait(&state_change, &mutex);
00176             PTHREAD_CHECK_AND_THROW(ret, "cond_wait(state_change)");
00177 
00178             if (base->connected()) {
00179                 
00180                 clog << "(still connected)" << endl;
00181 
00182                 
00183                 
00184                 
00185                 clog << "Connection::manager: got a control message?!?" << endl;
00186             } else {
00187                 conn_state = base->connected();
00188                 
00189                 
00190                 clog << "Connection::manager() needs to know how to restart the connection!" << endl;
00191             }
00192 
00196         }
00197 #endif
00198     } catch (CMStateChange &e) {
00199         clog << "Got a state change exception" << endl;
00200 #if MAY_NOT_WANT_THIS
00201         Guard   locker(&mutex);
00202         conn_state = base->connected();
00203         if (!conn_state) {
00204 #else
00205         if (!base->connected()) {
00206 #endif
00207             clog << "Our Connector is disconnected, shutting down and noting the disconnect" << endl;
00208             stop_engine();
00209             note_disconnect();
00210         }
00211         goto manage_top;
00212     } catch (CMException &e) {
00213         cerr << "Got a consmgr exception: " << e.what() << endl;
00214     } catch (std::exception &e) {
00215         cerr << "Standard exception: " << e.what() << endl;
00216     } catch (...) {
00217         cerr << "Caught an unknown thing in Connection::manager(), rethrowing" << endl;
00218         throw;
00219     }
00220 
00221 #ifdef  SHOULD_NOT_BE_HERE
00222     try {
00223         while (is_connected(true)) {
00224             
00225             
00226             
00227             
00228             pthread_cleanup_push(cleanup_unlock, &state_mutex);
00229 
00230             MUTEX_LOG("cond_wait'ing with " << &state_mutex);
00231             ret = pthread_cond_wait(&state_change, &state_mutex);
00232             PTHREAD_CHECK_AND_THROW(ret, "cond_wait(state_change)");
00233 
00234             clog << "Connection::manager(" << get_id()
00235                  << "): State has changed ";
00236             pthread_cleanup_pop(0);
00237 
00238             if (is_connected(true)) {
00239                 
00240                 clog << "(still connected)" << endl;
00241 
00242                 
00243                 clog << "Connection::manager: got a control message?!?" << endl;
00244             } else {
00245                 
00246                 
00247                 clog << "Connection::manage() needs to know how to restart the connection!" << endl;
00248             }
00249         }
00250     } catch (...) {
00251         
00252         MUTEX_LOG("Unlocking " << &state_mutex << "; and re-throwing");
00253         pthread_mutex_unlock(&state_mutex);
00254         throw;
00255     }
00256 
00257     clog << "(disconnected; nuking threads)" << endl;
00258 
00259     
00260     
00261     try {
00262         MUTEX_LOG("Unlocking " << &state_mutex << " at end of Connection::manage");
00263         ret = pthread_mutex_unlock(&state_mutex);
00264         PTHREAD_CHECK_AND_THROW(ret, "mutex_unlock(state_mutex)");
00265 
00266         shutdown();
00267     } CATCH_STR_OR_DIE(72) {
00268         cerr << "Umm, unknown exception at " << __FILE__ << ":"
00269              << (__LINE__ - 2) << endl;
00270         exit(74);
00271     }
00272 #endif  
00273 
00274     clog << "exiting Connection::manager(); should I be?" << endl;
00275 
00276     
00277     return;
00278 }
00279 
00280 #if MAY_NOT_WANT_THIS
00281 
00285 bool
00286 Connection::connected()
00287 {
00288     Guard       locker(&mutex);
00289 
00290     return(this->conn_state);
00291 }
00292 #endif
00293 
00299 void
00300 Connection::note_disconnect(void)
00301 {
00302     MasterController    *mc = MasterController::get_instance();
00303     MessageP            msg(new Message(Message::C_Disconnected));
00304     
00305     mc->queue_message(msg);
00306     return;
00307 }
00308 
00309 #ifdef  SHOULD_NOT_BE_HERE
00310 
00311 
00312 
00313 void
00314 Connection::set_connected(bool isconn)
00315 {
00316     int         ret;
00317 
00318     
00319     MUTEX_LOG("Connection::set_connected() locking " << &state_mutex);
00320     ret = pthread_mutex_lock(&state_mutex);
00321     PTHREAD_CHECK_AND_THROW(ret, "mutex_lock(state_mutex)");
00322 
00323     
00324     
00325     try {
00326         bool    old_state = this->connected;
00327 
00328         
00329         if (old_state != isconn) {
00330             this->connected = isconn;
00331 
00332             clog << "Connection::set_connected() broadcasting "
00333                  << "on state_change" << endl;
00334             ret = pthread_cond_broadcast(&state_change);
00335             PTHREAD_CHECK_AND_THROW(ret, "cond_broadcast(state_change)");
00336 #if MAYBE_A_BAD_IDEA
00337             
00338             if (! id_cache.empty())
00339                 id_cache.erase();
00340 #endif
00341         }
00342     } catch (...) {
00343         (void)pthread_mutex_unlock(&state_mutex);
00344         throw;
00345     }
00346 
00347     
00348     MUTEX_LOG("Connected::set_connected() unlocking " << &state_mutex);
00349     ret = pthread_mutex_unlock(&state_mutex);
00350     PTHREAD_CHECK_AND_THROW(ret, "mutex_unlock(state_mutex)");
00351 
00352     return;
00353 }
00354 #endif  
00355 
00356 #ifdef  SHOULD_NOT_BE_HERE
00360 void
00361 Connection::initiate_connection(void)
00362 {
00363     
00364     if (is_connected())
00365         throw(fireball(32, "Tried to initiate an already connected connection!"));
00366 
00367     try {
00368         
00369         this->connect();
00370 
00371         
00372         set_connected(true);
00373 
00374         
00375         
00376         clog << "*8* Starting the processing engine..." << endl;
00377         this->start_engine();
00378     } CATCH_OR_DIE(114) {
00379         
00380         throw;
00381     }
00382 
00383     return;
00384 }
00385 #endif  
00386 
00388 void
00389 Connection::shutdown(void)
00390 {
00391     try {
00392         
00393         
00394         clog << "*8* Calling engine shutdown from Connection::shutdown()..."
00395              << endl;
00396         clog << "*8* We are a '" << typeid(this).name() << "' (" << this->get_id() << endl;
00397         stop_engine();
00398     } CATCH_OR_DIE(115) {
00399         
00400         throw;
00401     }
00402 
00403     
00404     
00405     this->please_exit = true;
00406     MessageP    noop(new Message(Message::C_None));
00407     queue_message(noop);
00408     
00409     return;
00410 }