Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

LinkLayerInterface.cpp

Go to the documentation of this file.
00001 
00019 
00020 #include "headers.h"
00021 
00022 //const int LinkLayerInterface::CONNECTION_OPENED = GlobalObjects::g_eventTypeCounter++;
00023 //const int LinkLayerInterface::CONNECTION_CLOSED = GlobalObjects::g_eventTypeCounter++;
00024 
00040 LinkLayerInterface::LinkLayerInterface()
00041 {
00042         // reserve space for the connection table
00043     m_connectionTable.reserve(GlobalObjects::instance()->getConfig()->getMaxConnections() + 1);
00044 
00045         // initialize the socket sets
00046     FD_ZERO(&m_socketSet);
00047     m_highestFd = 0;
00048 
00049         // initialize the packetHandler object pointer
00050         m_packetHandler = NULL;
00051 
00052     //m_nodeListener = NULL;
00053     m_connectionListener = NULL;
00054 
00055     // register as a catcher listener
00056     GlobalObjects::instance()->getCatcher()->registerListener(this);
00057 
00058     pthread_attr_t attr;
00059         pthread_attr_init(&attr);
00060         
00061     // Create listen thread
00062     pthread_create(&m_listenThreadId, &attr, LinkLayerInterface::listen, this);
00063         
00064     // Create peerConnectThread
00065     pthread_create(&m_peerConnectThreadId, &attr, LinkLayerInterface::peerConnect, this);
00066     pthread_detach(m_peerConnectThreadId);
00067 
00068         // Create the receive packet thread
00069         pthread_create(&m_receivePacketThreadId, &attr, pollAllSockets, this);
00070 
00071     // Cleanup
00072         pthread_attr_destroy(&attr);
00073 } // ctor
00074 
00075 
00081 LinkLayerInterface::~LinkLayerInterface()
00082 {
00083         void* statusp;
00084 
00085         // kill the peer connect thread
00086     pthread_cancel(m_peerConnectThreadId);
00087 
00088         // make sure it is dead before continueing...
00089         pthread_join(m_peerConnectThreadId, &statusp);
00090 
00091     // kill the listen thread
00092         pthread_cancel(m_listenThreadId);
00093         
00094         // wake up the thread so it dies
00095         GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->close();
00096 
00097         // make sure it is dead before continuing...
00098         pthread_join(m_listenThreadId, &statusp);
00099 
00100         // kill the pollAllSockets thread
00101         pthread_cancel(m_receivePacketThreadId);
00102 
00103         // Close all connections 
00104     m_connectionTableLock.lock();
00105     vector<ConnectionTableEntry*>::iterator iter;
00106     for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00107                 // XXX Since this is not normal node disconnection, there can be null pointer problems
00108                 // when closing the links when the listeners are attached.  This is probably not the best way to
00109                 // do it 
00110                 (*iter)->getConnection()->removeListener(NULL);
00111                 (*iter)->getConnection()->close();
00112     }
00113     // Since these Connection objects may be inside of Node objects, we dont want to delete them.
00114     m_connectionTable.clear();
00115     m_connectionTableLock.unlock();
00116 
00117         // make sure the receive packet thread is dead before continueing
00118         pthread_join(m_receivePacketThreadId, &statusp);
00119 
00120 } // destructor
00121 
00122 
00126 void*
00127 LinkLayerInterface::pollAllSockets(void* arg)
00128 {
00129         LinkLayerInterface* lli = (LinkLayerInterface*)arg;
00130         lli->pollAllSocketsImpl();
00131         return NULL;
00132 }
00133 
00149 void 
00150 LinkLayerInterface::pollAllSocketsImpl(void) {
00151         Node* fromNode = NULL;
00152         NpPacket* packet = NULL;
00153 
00154     int errorCount = 0;
00155     int errorLimit = 10;
00156         int selectReturnCode; 
00157 
00158         while (true) {
00159                 // check if we have been cancelled
00160                 pthread_testcancel();
00161                 packet = NULL;
00162                 // Receive a packet
00163 
00164                 selectReturnCode = 0;
00165 
00166         // If there are no current connections, wait until we are connected 
00167         // to something.
00168                 m_connectionTableLock.lock();
00169         if (m_connectionTable.empty()) {
00170                         // temporarily release the lock and wait for a connection
00171             m_newConnection.wait(&m_connectionTableLock);
00172                 }
00173                 
00174                 // Initialize socket set
00175                 FD_ZERO(&m_socketSet);
00176         vector<ConnectionTableEntry*>::iterator iter;
00177         for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00178                         OS_SPEC_FD_SET((*iter)->getConnection()->getStream(), &m_socketSet);
00179         }
00180                 m_connectionTableLock.unlock();
00181                 
00182                 // select - check for readable data on the valid fd's only 
00183                 selectReturnCode = select(m_highestFd + 1, &m_socketSet, NULL, NULL, NULL);
00184                 
00185         debug(DEBUG_LLI, "Select returned: %d", selectReturnCode);
00186 
00187                 // Check for errors             
00188 #ifdef WIN32
00189                 if (selectReturnCode == SOCKET_ERROR) {
00190 #else
00191                 if (selectReturnCode < 0) {
00192 #endif
00193             debug(DEBUG_LLI,"Socket error: %s", printWsaErrorCode());
00194                         errorCount++;
00195                         if (errorCount > errorLimit) {
00196                                 OS_SPEC_SLEEP(3000);
00197                         }
00198                 }
00199                 
00200                 if (selectReturnCode > 0)       {
00201                         m_connectionTableLock.lock();
00202             for (int j = 0; j < (int)m_connectionTable.size(); j++) {
00203                                 if (FD_ISSET(m_connectionTable[j]->getConnection()->getStream(), &m_socketSet))
00204                                 {
00205                     const int tmpBufLen = 1;
00206                     char tmpBuf[tmpBufLen];
00207                     int socketError = recv(m_connectionTable[j]->getConnection()->getStream(), &tmpBuf[0], tmpBufLen, MSG_PEEK);
00208 
00209                     // Check for connection closed
00210 #ifdef WIN32
00211                     if (socketError == 0 || socketError == OS_SPEC_SOCKET_ERROR) {
00212 #else
00213                             if (socketError == -1) {
00214 #endif
00215                         debug(DEBUG_LLI, "Socket Disconnect");
00216                         closeLink(j);
00217                         j = 0;
00218                         continue;
00219                     }
00220                     // try to read data
00221                                         debug(DEBUG_LLI, "reading input from connection ID %d", j);
00222                     packet = NpPacket::read(m_connectionTable[j]->getConnection());
00223                                         if (packet == NULL) {
00224                                                 debug(DEBUG_LLI, "Error receiving packet");
00225                                                 closeLink(j);
00226                         j = 0;
00227                         continue;
00228                                         }
00229                                         fromNode = m_connectionTable[j]->getNode();
00230                                 }
00231                         } // for
00232                         m_connectionTableLock.unlock();
00233             if (packet != NULL) {
00234                 debug(DEBUG_LLI, "Received packet, Packet type: %s, VCN: %d", packet->getControlTypeString(), packet->getVcn());
00235                 // Hand it off to the packet handler for processing
00236                 if (m_packetHandler != NULL) {
00237                     m_packetHandler->handlePacket(fromNode, packet);
00238                 }
00239             }
00240                 } // if (selectReturnCode > 0)
00241         } // while (true)       
00242 } // fn pollAllSocketsImpl
00243 
00244 
00252 void* LinkLayerInterface::peerConnect(void* arg)
00253 {
00254         LinkLayerInterface* lli = (LinkLayerInterface*)arg;
00255     lli->peerConnectImpl();
00256     return NULL;
00257 } // fn peerConnect
00258 
00259 
00267 void
00268 LinkLayerInterface::peerConnectImpl() {
00269     Mutex tmpLock;
00270 
00271         // wait for a connection
00272         Node* peerNode;
00273 
00274         debug(DEBUG_LLI, "Connecting to peers...");
00275         // Do this forever
00276         while (true)
00277         {
00278         // We check if we have reached our minimum connection limit
00279         if (getNumConnections() >= GlobalObjects::instance()->getConfig()->getMinConnections()) {
00280             debug(DEBUG_LLI, "MIN CONNECTION LIMIT REACHED");
00281             // If so, wait for a connection to close before listening for
00282             // more connections.
00283             m_connectionClosed.wait(&tmpLock);
00284             continue;
00285         }
00286 
00287         // Get a node to connect to 
00288         peerNode = GlobalObjects::instance()->getCatcher()->getUnconnectedNode();
00289 
00290         // no nodes to connect to
00291         if (peerNode == NULL) {
00292             debug(DEBUG_LLI, "No nodes to connect to. Sleeping until we find more nodes.");
00293             m_nodeAdded.wait(&tmpLock);
00294             continue;
00295         }
00296     
00297         // ignore loopback and LAN IPs
00298         if (peerNode->getSocketAddress()->getIpAddress().isLanIp() ||
00299             peerNode->getSocketAddress()->getIpAddress().isLoopback()) {
00300             peerNode->setCantConnect(true);
00301             continue;
00302         }
00303 
00304         // We try to connect now - it will be automatically added to 
00305         // the connection table if we connect to it
00306         debug(DEBUG_LLI, "Trying to connect to node");
00307         if (connect(peerNode)) {
00308                     debug(DEBUG_LLI, "Connection created, total connections: %d, min connections allowed: %d", 
00309                 getNumConnections(), GlobalObjects::instance()->getConfig()->getMinConnections());
00310 
00311             } // if
00312         else {
00313             debug(DEBUG_LLI, "Peer connect thread: Error creating connection");
00314             peerNode->setCantConnect(true);
00315         }
00316         } // while (true)
00317 } // fn peerConnectImpl
00318 
00319 
00325 void* 
00326 LinkLayerInterface::listen(void* arg)
00327 {
00328         LinkLayerInterface* lli = (LinkLayerInterface*)arg;
00329     lli->listenImpl();
00330     return NULL;
00331 } // fn listen
00332 
00333 
00339 void
00340 LinkLayerInterface::listenImpl() {
00341     // Allow this thread to be cancelled
00342         int oldType;
00343         pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldType);
00344     pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldType);
00345 
00346     ConnectionTableEntry* connection;
00347 
00348     GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->listen();
00349 
00350         // Do this forever
00351         while (true)
00352         {
00353                 // check if we have been cancelled by another thread
00354                 pthread_testcancel();
00355 
00356         // We check if we have reached our maximum connection limit
00357         if (getNumConnections() >= GlobalObjects::instance()->getConfig()->getMaxConnections()) {
00358             debug(DEBUG_LLI, "MAX CONNECTION LIMIT REACHED");
00359             // If so, wait for a connection to close before listening for
00360             // more connections.
00361             m_connectionClosed.wait();
00362         }
00363 
00364         // We listen for a connection now - it will be automatically added to 
00365         // the connection table if we accept one
00366         if ((connection = accept()) != NULL) {
00367             debug(DEBUG_LLI, "Connection accepted, total connections: %d, max connections allowed: %d", 
00368                 getNumConnections(), GlobalObjects::instance()->getConfig()->getMaxConnections());
00369             // add the node to the catcher
00370             // Do not add node to the catcher because it will have the wrong port number
00371             //GlobalObjects::instance()->getCatcher()->addNode(connection);
00372             } // if
00373         else {
00374                         // if we are not cancelled, there was an error
00375             debug(DEBUG_LLI, "Error accepting connection.  Please note: if you switched listening ports this message may also appear.");
00376         }
00377         } // while (true)
00378 } // fn listenImpl
00379 
00380 
00389 //Node*
00390 ConnectionTableEntry*
00391 LinkLayerInterface::accept()
00392 {
00393         ConnectionInterface* newConnection 
00394         = GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->accept();
00395 
00396     if (newConnection != NULL) {
00397         // check if we have reached our connection limit
00398         if ((int)m_connectionTable.size() >= GlobalObjects::instance()->getConfig()->getMaxConnections()) {
00399             // yep, we reached our limit
00400             debug(DEBUG_LLI, "MAX CONNECTION LIMIT REACHED");
00401             delete newConnection;
00402             return NULL;
00403         }
00404 
00405         IpAddress ip = newConnection->getSocketAddress()->getIpAddress();
00406         
00407         // check if we are already connected to the node
00408         if (isConnectedTo(ip)) {
00409             debug(DEBUG_LLI, "We are already connected to that node.");
00410             // we are already connected to it
00411             delete newConnection;
00412             return NULL;
00413         }
00414         
00415         // check if we have heard of this node before
00416         Node* target = GlobalObjects::instance()->getCatcher()->lookup(ip);
00417         if (target == NULL) {
00418             // we did not find the entry in the catcher database
00419             target = new Node();
00420             target->setSocketAddress(newConnection->getSocketAddress());
00421         }
00422 
00423         // We havent heard of this node before, or we have heard of it but we're not connected to it already.
00424 
00425         // append newest connection to the end of the connection table
00426         ConnectionTableEntry* newEntry = new ConnectionTableEntry(newConnection, target);
00427         debug(DEBUG_LLI, "ConnectionTable slot %d = addr %s", m_connectionTable.size(), ip.toCStr());
00428         add(newEntry);
00429         
00430         // Signal that we have accepted a new connection - used in pollAllSockets
00431         m_newConnection.broadcast();
00432         return newEntry;   
00433     }
00434     else {
00435         // error receiving a connection
00436         return NULL;
00437     }
00438 } // fn accept
00439 
00440 
00447 int
00448 LinkLayerInterface::connect(Node* node)
00449 {
00450         debug(DEBUG_LLI, "connect begin");
00451 
00452     int retVal = 0;
00453 
00454     // check if we have enough connections already
00455     if ((int)(m_connectionTable.size()+1) >= GlobalObjects::instance()->getConfig()->getMaxConnections()) {
00456         debug(DEBUG_LLI, "MAX CONNECTION LIMIT REACHED");
00457         return retVal;
00458     }
00459 
00460     // Check if we are already connected to it
00461     if (isConnectedTo(node->getSocketAddress()->getIpAddress())) {
00462         debug(DEBUG_LLI, "Already connected to %s", node->getSocketAddress()->getIpAddress().toCStr());
00463         return retVal;
00464     }
00465 
00466     // Check if it is a Loopback IP
00467     if (node->getSocketAddress()->getIpAddress().isLoopback() || 
00468         node->getSocketAddress()->getIpAddress().isInAddrAny()) {
00469         debug(DEBUG_LLI, "Not allow to connect to loopback or INADDR_ANY");
00470         return retVal;
00471     }
00472 
00473     // create the connection object and initialize it
00474     ConnectionInterface* newConnection = ConnectionInterface::createConnectionObject(DEFAULT_CONNECTION_TYPE);
00475     newConnection->setSocketAddress(node->getSocketAddress());
00476 
00477     // create the new entry
00478     ConnectionTableEntry* newEntry = new ConnectionTableEntry(newConnection, node);
00479 
00480     // try to connect
00481     if (newConnection->connect()) {
00482         // Connected successfully
00483 
00484         // check again if we have reached the connection limit
00485         m_connectionTableLock.lock();
00486         if ((int)m_connectionTable.size() >= GlobalObjects::instance()->getConfig()->getMaxConnections()) {
00487             // the limit has been reached
00488             m_connectionTableLock.unlock();
00489             debug(DEBUG_LLI, "MAX CONNECTION LIMIT REACHED");
00490         }
00491         else {
00492             // limit has not been reached
00493             m_connectionTableLock.unlock();
00494             debug(DEBUG_LLI, "New connection : connection table slot %d, to IP address %s", 
00495                 m_connectionTable.size(), node->getSocketAddress()->getIpAddress().toCStr());
00496             
00497             // add it to the list
00498             add(newEntry);
00499             
00500             // Signal to waiting threads that we have a new connection
00501             m_newConnection.broadcast();
00502             
00503             // remember that we were successful
00504             retVal = 1;
00505         }
00506     }
00507     else {
00508         // failed to connect
00509         debug(DEBUG_LLI, "error connecting");
00510         delete newEntry;
00511     }
00512 
00513     if(retVal >= 1) {
00514         node->setCantConnect(false);
00515     }
00516     else {
00517         node->setCantConnect(true);
00518     }
00519 
00520     return retVal;
00521 } // fn connect
00522 
00523 
00527 void 
00528 LinkLayerInterface::changeListeningPortTo(int port) {
00529     void* statusp;
00530 
00531         // cancel the thread
00532     pthread_cancel(m_listenThreadId);
00533 
00534         GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->close();
00535 
00536         // wait for it to exit
00537         pthread_join(m_listenThreadId, &statusp);
00538         
00539     // change the port
00540     
00541     // only change the port number for the Node - dont change the IP address because the connection portion
00542     // should always be listening on 0.0.0.0, while the Node should store the public IP.
00543     GlobalObjects::instance()->getConfig()->getSelf()->getNode()->getSocketAddress()->setPort(port);
00544 
00545     // change the port in the connection object
00546     SocketAddress socketAddress(*GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->getSocketAddress());
00547     socketAddress.setPort(port);
00548     GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->setSocketAddress(&socketAddress);
00549 
00550     // restart the thread
00551     pthread_attr_t attr;
00552         pthread_attr_init(&attr);
00553     pthread_create(&m_listenThreadId, &attr, LinkLayerInterface::listen, this);
00554 } // fn changeListeningPortTo
00555 
00556 
00561 void 
00562 LinkLayerInterface::registerPacketHandler(PacketHandlerInterface* packetHandler) {
00563         if (m_packetHandler == NULL) {
00564                 m_packetHandler = packetHandler;
00565         }
00566         else {
00567                 debug(DEBUG_LLI, "Trying to assign more than one packet handler!!!!");
00568         }
00569 } // fn registerPacketHandler
00570 
00571 
00576 /*
00577 void 
00578 LinkLayerInterface::registerConnectionListener(ListenerInterface* listener) {
00579         if (m_connectionListener == NULL) {
00580                 m_connectionListener = listener;
00581         }
00582         else {
00583                 debug(DEBUG_LLI, "Trying to assign more than one connection listener!!!!");
00584         }
00585 } // fn registerConnectionListener
00586 */
00587 
00588   
00589 /*
00590 void 
00591 LinkLayerInterface::registerNodeListener(ListenerInterface* nodeListener) {
00592         if (m_nodeListener == NULL) {
00593                 m_nodeListener = nodeListener;
00594         }
00595         else {
00596                 debug(DEBUG_LLI, "Trying to assign more than one connection listener!!!!");
00597         }
00598 } // fn registerNodeListener
00599 */
00600 
00601 
00605 void
00606 LinkLayerInterface::add(ConnectionTableEntry* entry)
00607 {
00608         Guard guard(&m_connectionTableLock);
00609     
00610     // add node to our table
00611     m_connectionTable.push_back(entry);
00612 
00613     if(m_highestFd < entry->getConnection()->getStream()) {
00614                 m_highestFd = entry->getConnection()->getStream();
00615         }
00616         
00617     // add the registered listener if one exists
00618     //if (m_connectionListener) {
00619     //    entry->getConnection()->registerListener(m_connectionListener);
00620     //}
00621     fireEvent(CONNECTION_OPENED, (void*)entry);
00622 } // fn add
00623 
00624 
00630 void 
00631 LinkLayerInterface::remove(int connectionTableIndex)
00632 {
00633     Guard guard(&m_connectionTableLock);
00634 
00635     m_highestFd = 0;
00636     // check that we are trying to remove a valid table entry
00637     if (connectionTableIndex < (int)m_connectionTable.size()) {
00638         
00639         // shut down the socket
00640         m_connectionTable[connectionTableIndex]->getConnection()->close();
00641         
00642         // remove the listener if one exists
00643         //if (m_connectionListener) {
00644         //    m_connectionTable[connectionTableIndex]->getConnection()->removeListener(m_connectionListener);
00645         //}
00646 
00647         // inform listeners
00648         fireEvent(CONNECTION_CLOSED, (void*)m_connectionTable[connectionTableIndex]);
00649 
00650         // remove the entry from the connection table
00651         delete m_connectionTable[connectionTableIndex];
00652         m_connectionTable[connectionTableIndex] = NULL;
00653 
00654         // Squeaze the table
00655         vector<ConnectionTableEntry*>::iterator removeThis;
00656         removeThis = m_connectionTable.begin() + connectionTableIndex;
00657         m_connectionTable.erase(removeThis);
00658 
00659         // And reset m_highestFd
00660         vector<ConnectionTableEntry*>::iterator iter;
00661         for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00662             if ((*iter)->getConnection()->getStream() > m_highestFd) {
00663                 m_highestFd = (*iter)->getConnection()->getStream();
00664             }
00665         }
00666 
00667     }
00668     else {
00669         debug(DEBUG_LLI, "ERROR: trying to remove a non-existant link!");
00670     }
00671 } // fn remove
00672 
00673 
00678 /*
00679 void 
00680 LinkLayerInterface::remove(Node* node) {
00681         int i = 0;
00682     vector<Node*>::iterator iter;
00683     Guard guard(&m_connectionTableLock);
00684     for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00685                 if ((*iter)->getNode() == node) {
00686                         remove(i);
00687                         break;
00688                 }
00689         i++;
00690     }
00691 } // fn remove
00692 */
00693 
00694 
00701 bool
00702 LinkLayerInterface::isConnectedTo(IpAddress ipAddr)
00703 {
00704     Guard guard(&m_connectionTableLock);
00705     vector<ConnectionTableEntry*>::iterator iter;
00706     for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00707         if ((*iter)->getConnection()->isConnectedTo(ipAddr)) {
00708                         return true;
00709         }
00710     }
00711         return false;
00712 } // fn isConnectedTo
00713 
00714 
00719 bool
00720 LinkLayerInterface::isConnected() {
00721     return (!m_connectionTable.size())?false:true;
00722 } // fn isConnected
00723  
00724 
00728 int
00729 LinkLayerInterface::getNumConnections()
00730 {
00731     return m_connectionTable.size();
00732 } // fn getNumConnections
00733 
00734 
00739 Node* 
00740 LinkLayerInterface::getRandomNeighbor()
00741 {
00742     Guard guard(&m_connectionTableLock);
00743     if (m_connectionTable.empty()) {
00744                 debug(DEBUG_LLI, "Error! Not connected.");
00745                 return NULL;
00746         }
00747     int index = m_randomNumberGenerator.IRandom(0, m_connectionTable.size() - 1);
00748         return m_connectionTable[index]->getNode();
00749 }  // fn getRandomNeighbor
00750  
00751 
00757 Node* 
00758 LinkLayerInterface::getRandomNeighbor(Node* node)
00759 {
00760     Guard guard(&m_connectionTableLock);
00761     if (m_connectionTable.size() <= 1) {
00762                 return NULL;
00763         }
00764 
00765     int index = m_randomNumberGenerator.IRandom(0, m_connectionTable.size() - 1);
00766     if (m_connectionTable.size() > 1) {
00767                 while (node->equals(m_connectionTable[index]->getNode())) {
00768             index = m_randomNumberGenerator.IRandom(0, m_connectionTable.size() - 1);
00769                 }
00770         }
00771         return m_connectionTable[index]->getNode();
00772 }  // fn getRandomNeighbor
00773 
00774 
00781 bool
00782 LinkLayerInterface::getConnection(int index, Node* retval) {
00783     Guard guard(&m_connectionTableLock);
00784     if ((index >= (int)m_connectionTable.size()) || (index < 0)) {
00785         return false;
00786     } 
00787     *retval = *(m_connectionTable[index]->getNode());
00788     return true;
00789 } // fn getConnection
00790 
00791 
00792 Node*
00793 LinkLayerInterface::getConnection(int index)
00794 {
00795         Guard guard(&m_connectionTableLock);
00796     if ((index >= (int)m_connectionTable.size()) || (index < 0)) {
00797         return false;
00798     } 
00799     return (m_connectionTable[index]->getNode());
00800 } // fn getConnection
00801 
00802 
00803 
00807 int 
00808 LinkLayerInterface::sendPacket(ConnectionPacket* packet) {
00809     int index;
00810     int retVal = 0;
00811 
00812     if (packet == NULL) {
00813         debug(DEBUG_LLI, "NULL parameter");
00814     }
00815 
00816     Guard guard(&m_connectionTableLock);
00817 
00818     if (m_connectionTable.empty()) {
00819         return LLI_NOT_CONNECTED;
00820     }
00821     // select a random neighbor
00822     index = m_randomNumberGenerator.IRandom(0, m_connectionTable.size() - 1);
00823     //debug(DEBUG_LLI, "Selected random index %d from [0-%d]\n", index, m_connectionTable.size() - 1);
00824     debug(DEBUG_LLI, "Sending to neighbor %s", m_connectionTable[index]->getNode()->getSocketAddress()->toCStr());
00825     retVal = packet->write(m_connectionTable[index]->getConnection());
00826     //debug(DEBUG_LLI, "Leaving sendPacket");
00827 
00828     return retVal;
00829 } // fn sendPacket
00830 
00831 
00838 int
00839 LinkLayerInterface::sendPacket(NpPacket* packet, Node* node)
00840 {
00841     if ((packet == NULL) || (node == NULL)) {
00842         debug(DEBUG_LLI, "NULL parameter");
00843         return -1;
00844     }
00845 
00846     // Lookup the node in the table
00847     int s = -1;
00848     vector<ConnectionTableEntry*>::iterator iter;
00849     Guard guard(&m_connectionTableLock);
00850     for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00851         if (node->equals((*iter)->getNode())) {
00852                 debug(DEBUG_LLI, "Sending packet to %s, Packet Type: %s, destination VCN = %d", 
00853                 (*iter)->getNode()->getSocketAddress()->toCStr(),
00854                 packet->getControlTypeString(), 
00855                 packet->getVcn());
00856             s = packet->write((*iter)->getConnection());
00857             break;
00858         }
00859     }
00860     return s;
00861 } // fn sendPacket
00862 
00863 
00869 int
00870 LinkLayerInterface::close(Node* node) {
00871     Guard guard(&m_connectionTableLock);
00872     vector<ConnectionTableEntry*>::iterator iter;
00873     int i = 0;
00874     for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00875         if (node->equals((*iter)->getNode())) {
00876             closeLink(i);
00877             return 1;
00878         }
00879         i++;
00880     }
00881     return 0;
00882 } // fn close
00883 
00884 
00888 void
00889 LinkLayerInterface::closeLink(int connectionTableIndex)
00890 {
00891     remove(connectionTableIndex);
00892     m_connectionClosed.signal();
00893     debug(DEBUG_LLI, "Connection closed to index %d", connectionTableIndex);
00894 } // fn closeLink
00895 
00896 
00902 void
00903 LinkLayerInterface::handleEvent(ObservableInterface* observable, int event, void* object) {
00904     debug(DEBUG_LLI, "callback: nodeAdded");
00905     m_nodeAdded.broadcast();
00906 } // fn handleEvent
00907 
00908 
00912 int 
00913 LinkLayerInterface::broadcast(NpPacket* packet) {
00914         int s = 0;
00915         
00916     Guard guard(&m_connectionTableLock);
00917     vector<ConnectionTableEntry*>::iterator iter;
00918     for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00919         s |= packet->write((*iter)->getConnection());
00920     }
00921     return s;
00922 } // fn broadcast
00923 
00924 
00928 int 
00929 LinkLayerInterface::broadcast(NpPacket* packet, Node* node) {
00930         int s = 0;
00931         
00932     Guard guard(&m_connectionTableLock);
00933     vector<ConnectionTableEntry*>::iterator iter;
00934     for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00935         if (!node->equals((*iter)->getNode())) {
00936             s |= packet->write((*iter)->getConnection());
00937         }
00938     }
00939 
00940     return s;
00941 } // fn broadcast
00942 
00943 
00944 void
00945 LinkLayerInterface::toStream(std::ostream& out) {
00946     Guard guard(&m_connectionTableLock);
00947         out << "Link Layer" << "\n";
00948         out << "----------" << "\n";
00949     out << "Listening port: " << GlobalObjects::instance()->getConfig()->getSelf()->getNode()->getSocketAddress()->getPort() << "\n";
00950     out << "Number of connections: " << m_connectionTable.size()
00951         << " (Min: " << GlobalObjects::instance()->getConfig()->getMinConnections() << ", "
00952         << "Max: " << GlobalObjects::instance()->getConfig()->getMaxConnections() << ")" << "\n";
00953     vector<ConnectionTableEntry*>::iterator iter;
00954     int i = 0;
00955     for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00956         out << "index: " << i 
00957             << "  IP/port: " << (*iter)->getConnection()->getSocketAddress()->toCStr() << "\n";
00958         i++;
00959     }
00960 } // fn toStream
00961 
00962 

Generated at Thu Jul 11 13:31:50 2002 for Peekabooty by doxygen1.2.9 written by Dimitri van Heesch, © 1997-2001