00001
00002
00003 #include "consmgr.h"
00004 #include "messageable.h"
00005 #include "connlist.h"
00006 #include "guard.h"
00007
00008
00009
00010
00011 Messageable::Messageable(void)
00012 {
00013 int ret;
00014
00015 ret = pthread_mutex_init(&msg_mutex, NULL);
00016 PTHREAD_CHECK_AND_THROW(ret, "mutex_init(msg_mutex)");
00017 ret = pthread_cond_init(&msg_avail, NULL);
00018 PTHREAD_CHECK_AND_THROW(ret, "mutex_init(msg_mutex)");
00019 }
00020
00024 Messageable::~Messageable(void)
00025 {
00026
00027 (void)pthread_mutex_destroy(&msg_mutex);
00028 (void)pthread_cond_destroy(&msg_avail);
00029
00030
00031 #warning "Messageable wants to know how to remove itself from everyone elses lists, but doesn't know how to!"
00032 #if SHOULD_NOT_BE_HERE
00033
00034 ConnectionList::outlet_list.mutating_for_each(invalidate_messageable(this));
00035 #endif
00036 }
00037
00038
00039
00040
00041
00042
00046 void
00047 Messageable::queue_message(MessageP &msg)
00048 {
00049 int ret;
00050 clog << pthread_self() << " Locking Messageable's mutex " << &msg_mutex << endl;
00051 Guard locker(&msg_mutex);
00052
00053 clog << pthread_self() << " Locked " << &msg_mutex << ", pushing message onto the queue" << endl;
00054 msg_queue.push_back(msg);
00055
00056 clog << "Messageable::queue_message() (" << this->get_id() << ") accepting a message and broadcasting on the condition variable" << endl;
00057 ret = pthread_cond_broadcast(&msg_avail);
00058 PTHREAD_CHECK_AND_THROW(ret, "signal_broadcast(msg_avail)");
00059
00060 clog << pthread_self() << " returning from queue_message" << endl;
00061 return;
00062 }
00063
00069 MessageP
00070 Messageable::get_message(void)
00071 {
00072 int ret;
00073 clog << pthread_self() << " Gonna lock the message mutex " << &msg_mutex << endl;
00074 Guard locker(&msg_mutex);
00075
00076
00077
00078 while (msg_queue.empty()) {
00079 clog << pthread_self() << " Waiting on msg condition variable" << endl;
00080 ret = pthread_cond_wait(&msg_avail, &msg_mutex);
00081 PTHREAD_CHECK_AND_THROW(ret, "cond_wait(msg_avail, msg_mutex)");
00082
00083 }
00084
00085 MessageP retmsg = msg_queue.front();
00086 msg_queue.pop_front();
00087
00088 return (retmsg);
00089 }
00090
00091
00092
00093
00094
00099 Data *
00100 Messageable::handle_message(MessageP &msg)
00101 {
00102 int ret;
00103
00104 clog << "I (" << this << ") got a message! Type " << msg->name() << endl;
00105
00106
00107 if (msg->type() == Message::M_Command) {
00108 switch (msg->code()) {
00109 case Message::C_Exit:
00110 clog << "Got an Exit command, need to shut down..." << endl;
00111 #if 0
00112 if ((ret = pthread_mutex_trylock(&mutex)) == 0) {
00113 pthread_mutex_unlock(&mutex);
00114 MUTEX_LOG("Calling shutdown, mutex (" << &mutex << ") unlocked");
00115 } else {
00116 if (ret == EBUSY)
00117 MUTEX_LOG("Calling shutdown, but mutex (" << &mutex << ") is locked already.");
00118 else
00119 MUTEX_LOG("Error! trylock of mutex (" << &mutex << ") failed: " << strerror(ret));
00120 }
00121 #endif
00122 {
00123 Connection *c = dynamic_cast<Connection*>(this);
00124
00125 c->shutdown();
00126 }
00127 break;
00128 default:
00129 clog << "Got a command (" << msg->code() << "), ignoring...\n" << endl;
00130 break;
00131 }
00132 } else {
00133
00134 clog << "msg->type() is " << msg->name() << endl;
00135 Data *d = dynamic_cast<Data*>(MessageP::GetPointer(msg));
00136 if (d == NULL) {
00137 throw(fireball(114,"Ahh! Can't get a Data* from the msg in Messageable::handle_message"));
00138 }
00139
00140
00141 return(new Data(*d));
00142 }
00143
00144 return(NULL);
00145 }
00146