00001
00002 extern "C" {
00003 #include <unistd.h>
00004 #include <string.h>
00005 #include <sys/types.h>
00006 #include <sys/socket.h>
00007 #include <sys/un.h>
00008 #include <netinet/in.h>
00009 }
00010
00011 #include <sstream>
00012 #include <typeinfo>
00013
00014 #include "consmgr.h"
00015 #include "socklistener.h"
00016 #include "fdconnector.h"
00017 #include "guard.h"
00018
00019 SocketListener::SocketListener(SocketAddressP &addr)
00020 : sock_addr(addr)
00021 {
00022
00023 Guard::initialize_mutex(mutex);
00024 MUTEX_LOG("initialized mutex " << &mutex);
00025
00026 #warning "SocketListener is calling make_incoming_socket() before the object is fully constructed. Subclasses will never have their make_incoming_socket invoked!"
00027
00028 sock_fd = make_incoming_socket();
00029 }
00030
00031 SocketListener::~SocketListener()
00032 {
00033
00034 disconnect();
00035
00036
00037 MUTEX_LOG("destroying mutex " << &mutex);
00038 Guard::destroy_mutex(mutex);
00039 }
00040
00045 void
00046 SocketListener::disconnect()
00047 {
00048 MUTEX_LOG("SocketListener::disconnect() locking mutex " << &mutex);
00049 Guard locker(&mutex);
00050
00051 if (sock_fd != -1) {
00052 close(sock_fd);
00053 sock_fd = -1;
00054 }
00055 MUTEX_LOG("SocketListener::disconnect() unlocking mutex " << &mutex << " upon return");
00056 }
00057
00063 int
00064 SocketListener::make_incoming_socket(void)
00065 {
00066
00067
00068
00069
00070
00071
00080 MUTEX_LOG("SocketListener::make_incoming_socket() locking mutex " << &mutex);
00081 Guard locker(&mutex);
00082
00083
00084 int new_s = ::socket(sock_addr->family(), SOCK_STREAM, 0);
00085 if (new_s < 0)
00086 throw(CMFailure("socket(2) call in "
00087 "SocketListener::make_incoming_socket", true));
00088
00089
00090 if (::bind(new_s, sock_addr->get_addr(), sock_addr->length()) < 0) {
00091 clog << "Thread id " << pthread_self() << " throwing bind(2) failure"
00092 << endl;
00093 throw(CMFailure("bind(2) call in "
00094 "SocketListener::make_incoming_socket", true));
00095 }
00096
00097
00098 ::listen(new_s, 1);
00099
00100 clog << this->get_id() << " fd#" << new_s << " bound, ready to accept()"
00101 << endl;
00102
00103 MUTEX_LOG("SocketListener::make_incoming_socket() unlocking mutex " << &mutex << " upon return");
00104
00105 return new_s;
00106 }
00107
00120 ConnectorP
00121 SocketListener::listen_for_incoming(void)
00122 {
00123 int s;
00124 FDConnector *new_sc;
00125 #if 0
00126 sockaddr *other;
00127 socklen_t other_len;
00128 #endif
00129
00130 if (sock_fd < 0)
00131 throw(CMUnsupported("Unable to listen for incoming connection; no socket available."));
00132
00133 MUTEX_LOG("SocketListener::listen_for_incoming() locking mutex " << &mutex);
00134 Guard locker(&mutex);
00135 int ret, prev_canceltype;
00136
00137
00138 ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &prev_canceltype);
00139 PTHREAD_CHECK_AND_THROW(ret,"pthread_setcanceltype");
00140 #if 0
00141
00142 switch (sock_addr->family()) {
00143 case AF_LOCAL:
00144 other = (sockaddr *)new sockaddr_un;
00145 break;
00146 case AF_INET:
00147 other = (sockaddr *)new sockaddr_in;
00148 break;
00149 #ifdef INET6
00150 case AF_INET6:
00151 other = (sockaddr *)new sockaddr_in6;
00152 break;
00153 #endif
00154 default:
00155 throw(fireball(17, "Ugh. Unknown socket address family in SocketListener::listen_for_incoming()"));
00156 break;
00157 }
00158
00159 try {
00160
00161
00162 clog << "Calling accept(" << sock_fd << ") ... " << std::flush;
00163 if ((s = ::accept(sock_fd, other, &other_len)) < 0) {
00164 #else
00165 try {
00166
00167
00168 clog << "Calling accept(" << sock_fd << ") ... " << std::flush;
00169 if ((s = ::accept(sock_fd, NULL, NULL)) < 0) {
00170 #endif
00171 clog << "error from ::accept! What do we do?!?" << endl;
00172 sleep(2);
00173 throw(fireball(15, "SocketListener::listen_for_incoming accept", true));
00174 }
00175 clog << "back from accept, returned fd " << s << endl;
00176 } catch (std::exception &e) {
00177 clog << "Got an exception (" << typeid(&e).name() << "): " << e.what() << endl;
00178 pthread_setcanceltype(prev_canceltype, NULL);
00179 throw;
00180 } catch (...) {
00181 cerr << "Caught an unknown exception from ::accept, rethrowing" << endl;
00182 pthread_setcanceltype(prev_canceltype, NULL);
00183 throw;
00184 }
00185 pthread_setcanceltype(prev_canceltype, NULL);
00186
00187
00188
00189 new_sc = new FDConnector(s);
00190
00191 if (!new_sc) {
00192 throw (fireball(29, "Ack! Unable to allocate a new FDConnector!"));
00193 }
00194 clog << this->get_id() << " spawned a new Connector: " << new_sc << ": "
00195 << new_sc->get_id() << endl;
00196
00197 MUTEX_LOG("SocketListener::listen_for_incoming() unlocking mutex " << &mutex<< " upon return");
00198
00199
00200
00201 return (ConnectorP(new_sc));
00202 }
00203
00205 std::string
00206 SocketListener::get_id(void) const
00207 {
00208 std::ostringstream id_str;
00209
00210 id_str << "SocketListener[" << sock_addr->get_id() << "]";
00211
00212
00213 return(id_str.str());
00214 }