00001
00019
00020 #include "headers.h"
00021
00022
00023
00024
00040 LinkLayerInterface::LinkLayerInterface()
00041 {
00042
00043 m_connectionTable.reserve(GlobalObjects::instance()->getConfig()->getMaxConnections() + 1);
00044
00045
00046 FD_ZERO(&m_socketSet);
00047 m_highestFd = 0;
00048
00049
00050 m_packetHandler = NULL;
00051
00052
00053 m_connectionListener = NULL;
00054
00055
00056 GlobalObjects::instance()->getCatcher()->registerListener(this);
00057
00058 pthread_attr_t attr;
00059 pthread_attr_init(&attr);
00060
00061
00062 pthread_create(&m_listenThreadId, &attr, LinkLayerInterface::listen, this);
00063
00064
00065 pthread_create(&m_peerConnectThreadId, &attr, LinkLayerInterface::peerConnect, this);
00066 pthread_detach(m_peerConnectThreadId);
00067
00068
00069 pthread_create(&m_receivePacketThreadId, &attr, pollAllSockets, this);
00070
00071
00072 pthread_attr_destroy(&attr);
00073 }
00074
00075
00081 LinkLayerInterface::~LinkLayerInterface()
00082 {
00083 void* statusp;
00084
00085
00086 pthread_cancel(m_peerConnectThreadId);
00087
00088
00089 pthread_join(m_peerConnectThreadId, &statusp);
00090
00091
00092 pthread_cancel(m_listenThreadId);
00093
00094
00095 GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->close();
00096
00097
00098 pthread_join(m_listenThreadId, &statusp);
00099
00100
00101 pthread_cancel(m_receivePacketThreadId);
00102
00103
00104 m_connectionTableLock.lock();
00105 vector<ConnectionTableEntry*>::iterator iter;
00106 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00107
00108
00109
00110 (*iter)->getConnection()->removeListener(NULL);
00111 (*iter)->getConnection()->close();
00112 }
00113
00114 m_connectionTable.clear();
00115 m_connectionTableLock.unlock();
00116
00117
00118 pthread_join(m_receivePacketThreadId, &statusp);
00119
00120 }
00121
00122
00126 void*
00127 LinkLayerInterface::pollAllSockets(void* arg)
00128 {
00129 LinkLayerInterface* lli = (LinkLayerInterface*)arg;
00130 lli->pollAllSocketsImpl();
00131 return NULL;
00132 }
00133
00149 void
00150 LinkLayerInterface::pollAllSocketsImpl(void) {
00151 Node* fromNode = NULL;
00152 NpPacket* packet = NULL;
00153
00154 int errorCount = 0;
00155 int errorLimit = 10;
00156 int selectReturnCode;
00157
00158 while (true) {
00159
00160 pthread_testcancel();
00161 packet = NULL;
00162
00163
00164 selectReturnCode = 0;
00165
00166
00167
00168 m_connectionTableLock.lock();
00169 if (m_connectionTable.empty()) {
00170
00171 m_newConnection.wait(&m_connectionTableLock);
00172 }
00173
00174
00175 FD_ZERO(&m_socketSet);
00176 vector<ConnectionTableEntry*>::iterator iter;
00177 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00178 OS_SPEC_FD_SET((*iter)->getConnection()->getStream(), &m_socketSet);
00179 }
00180 m_connectionTableLock.unlock();
00181
00182
00183 selectReturnCode = select(m_highestFd + 1, &m_socketSet, NULL, NULL, NULL);
00184
00185 debug(DEBUG_LLI, "Select returned: %d", selectReturnCode);
00186
00187
00188 #ifdef WIN32
00189 if (selectReturnCode == SOCKET_ERROR) {
00190 #else
00191 if (selectReturnCode < 0) {
00192 #endif
00193 debug(DEBUG_LLI,"Socket error: %s", printWsaErrorCode());
00194 errorCount++;
00195 if (errorCount > errorLimit) {
00196 OS_SPEC_SLEEP(3000);
00197 }
00198 }
00199
00200 if (selectReturnCode > 0) {
00201 m_connectionTableLock.lock();
00202 for (int j = 0; j < (int)m_connectionTable.size(); j++) {
00203 if (FD_ISSET(m_connectionTable[j]->getConnection()->getStream(), &m_socketSet))
00204 {
00205 const int tmpBufLen = 1;
00206 char tmpBuf[tmpBufLen];
00207 int socketError = recv(m_connectionTable[j]->getConnection()->getStream(), &tmpBuf[0], tmpBufLen, MSG_PEEK);
00208
00209
00210 #ifdef WIN32
00211 if (socketError == 0 || socketError == OS_SPEC_SOCKET_ERROR) {
00212 #else
00213 if (socketError == -1) {
00214 #endif
00215 debug(DEBUG_LLI, "Socket Disconnect");
00216 closeLink(j);
00217 j = 0;
00218 continue;
00219 }
00220
00221 debug(DEBUG_LLI, "reading input from connection ID %d", j);
00222 packet = NpPacket::read(m_connectionTable[j]->getConnection());
00223 if (packet == NULL) {
00224 debug(DEBUG_LLI, "Error receiving packet");
00225 closeLink(j);
00226 j = 0;
00227 continue;
00228 }
00229 fromNode = m_connectionTable[j]->getNode();
00230 }
00231 }
00232 m_connectionTableLock.unlock();
00233 if (packet != NULL) {
00234 debug(DEBUG_LLI, "Received packet, Packet type: %s, VCN: %d", packet->getControlTypeString(), packet->getVcn());
00235
00236 if (m_packetHandler != NULL) {
00237 m_packetHandler->handlePacket(fromNode, packet);
00238 }
00239 }
00240 }
00241 }
00242 }
00243
00244
00252 void* LinkLayerInterface::peerConnect(void* arg)
00253 {
00254 LinkLayerInterface* lli = (LinkLayerInterface*)arg;
00255 lli->peerConnectImpl();
00256 return NULL;
00257 }
00258
00259
00267 void
00268 LinkLayerInterface::peerConnectImpl() {
00269 Mutex tmpLock;
00270
00271
00272 Node* peerNode;
00273
00274 debug(DEBUG_LLI, "Connecting to peers...");
00275
00276 while (true)
00277 {
00278
00279 if (getNumConnections() >= GlobalObjects::instance()->getConfig()->getMinConnections()) {
00280 debug(DEBUG_LLI, "MIN CONNECTION LIMIT REACHED");
00281
00282
00283 m_connectionClosed.wait(&tmpLock);
00284 continue;
00285 }
00286
00287
00288 peerNode = GlobalObjects::instance()->getCatcher()->getUnconnectedNode();
00289
00290
00291 if (peerNode == NULL) {
00292 debug(DEBUG_LLI, "No nodes to connect to. Sleeping until we find more nodes.");
00293 m_nodeAdded.wait(&tmpLock);
00294 continue;
00295 }
00296
00297
00298 if (peerNode->getSocketAddress()->getIpAddress().isLanIp() ||
00299 peerNode->getSocketAddress()->getIpAddress().isLoopback()) {
00300 peerNode->setCantConnect(true);
00301 continue;
00302 }
00303
00304
00305
00306 debug(DEBUG_LLI, "Trying to connect to node");
00307 if (connect(peerNode)) {
00308 debug(DEBUG_LLI, "Connection created, total connections: %d, min connections allowed: %d",
00309 getNumConnections(), GlobalObjects::instance()->getConfig()->getMinConnections());
00310
00311 }
00312 else {
00313 debug(DEBUG_LLI, "Peer connect thread: Error creating connection");
00314 peerNode->setCantConnect(true);
00315 }
00316 }
00317 }
00318
00319
00325 void*
00326 LinkLayerInterface::listen(void* arg)
00327 {
00328 LinkLayerInterface* lli = (LinkLayerInterface*)arg;
00329 lli->listenImpl();
00330 return NULL;
00331 }
00332
00333
00339 void
00340 LinkLayerInterface::listenImpl() {
00341
00342 int oldType;
00343 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldType);
00344 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldType);
00345
00346 ConnectionTableEntry* connection;
00347
00348 GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->listen();
00349
00350
00351 while (true)
00352 {
00353
00354 pthread_testcancel();
00355
00356
00357 if (getNumConnections() >= GlobalObjects::instance()->getConfig()->getMaxConnections()) {
00358 debug(DEBUG_LLI, "MAX CONNECTION LIMIT REACHED");
00359
00360
00361 m_connectionClosed.wait();
00362 }
00363
00364
00365
00366 if ((connection = accept()) != NULL) {
00367 debug(DEBUG_LLI, "Connection accepted, total connections: %d, max connections allowed: %d",
00368 getNumConnections(), GlobalObjects::instance()->getConfig()->getMaxConnections());
00369
00370
00371
00372 }
00373 else {
00374
00375 debug(DEBUG_LLI, "Error accepting connection. Please note: if you switched listening ports this message may also appear.");
00376 }
00377 }
00378 }
00379
00380
00389
00390 ConnectionTableEntry*
00391 LinkLayerInterface::accept()
00392 {
00393 ConnectionInterface* newConnection
00394 = GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->accept();
00395
00396 if (newConnection != NULL) {
00397
00398 if ((int)m_connectionTable.size() >= GlobalObjects::instance()->getConfig()->getMaxConnections()) {
00399
00400 debug(DEBUG_LLI, "MAX CONNECTION LIMIT REACHED");
00401 delete newConnection;
00402 return NULL;
00403 }
00404
00405 IpAddress ip = newConnection->getSocketAddress()->getIpAddress();
00406
00407
00408 if (isConnectedTo(ip)) {
00409 debug(DEBUG_LLI, "We are already connected to that node.");
00410
00411 delete newConnection;
00412 return NULL;
00413 }
00414
00415
00416 Node* target = GlobalObjects::instance()->getCatcher()->lookup(ip);
00417 if (target == NULL) {
00418
00419 target = new Node();
00420 target->setSocketAddress(newConnection->getSocketAddress());
00421 }
00422
00423
00424
00425
00426 ConnectionTableEntry* newEntry = new ConnectionTableEntry(newConnection, target);
00427 debug(DEBUG_LLI, "ConnectionTable slot %d = addr %s", m_connectionTable.size(), ip.toCStr());
00428 add(newEntry);
00429
00430
00431 m_newConnection.broadcast();
00432 return newEntry;
00433 }
00434 else {
00435
00436 return NULL;
00437 }
00438 }
00439
00440
00447 int
00448 LinkLayerInterface::connect(Node* node)
00449 {
00450 debug(DEBUG_LLI, "connect begin");
00451
00452 int retVal = 0;
00453
00454
00455 if ((int)(m_connectionTable.size()+1) >= GlobalObjects::instance()->getConfig()->getMaxConnections()) {
00456 debug(DEBUG_LLI, "MAX CONNECTION LIMIT REACHED");
00457 return retVal;
00458 }
00459
00460
00461 if (isConnectedTo(node->getSocketAddress()->getIpAddress())) {
00462 debug(DEBUG_LLI, "Already connected to %s", node->getSocketAddress()->getIpAddress().toCStr());
00463 return retVal;
00464 }
00465
00466
00467 if (node->getSocketAddress()->getIpAddress().isLoopback() ||
00468 node->getSocketAddress()->getIpAddress().isInAddrAny()) {
00469 debug(DEBUG_LLI, "Not allow to connect to loopback or INADDR_ANY");
00470 return retVal;
00471 }
00472
00473
00474 ConnectionInterface* newConnection = ConnectionInterface::createConnectionObject(DEFAULT_CONNECTION_TYPE);
00475 newConnection->setSocketAddress(node->getSocketAddress());
00476
00477
00478 ConnectionTableEntry* newEntry = new ConnectionTableEntry(newConnection, node);
00479
00480
00481 if (newConnection->connect()) {
00482
00483
00484
00485 m_connectionTableLock.lock();
00486 if ((int)m_connectionTable.size() >= GlobalObjects::instance()->getConfig()->getMaxConnections()) {
00487
00488 m_connectionTableLock.unlock();
00489 debug(DEBUG_LLI, "MAX CONNECTION LIMIT REACHED");
00490 }
00491 else {
00492
00493 m_connectionTableLock.unlock();
00494 debug(DEBUG_LLI, "New connection : connection table slot %d, to IP address %s",
00495 m_connectionTable.size(), node->getSocketAddress()->getIpAddress().toCStr());
00496
00497
00498 add(newEntry);
00499
00500
00501 m_newConnection.broadcast();
00502
00503
00504 retVal = 1;
00505 }
00506 }
00507 else {
00508
00509 debug(DEBUG_LLI, "error connecting");
00510 delete newEntry;
00511 }
00512
00513 if(retVal >= 1) {
00514 node->setCantConnect(false);
00515 }
00516 else {
00517 node->setCantConnect(true);
00518 }
00519
00520 return retVal;
00521 }
00522
00523
00527 void
00528 LinkLayerInterface::changeListeningPortTo(int port) {
00529 void* statusp;
00530
00531
00532 pthread_cancel(m_listenThreadId);
00533
00534 GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->close();
00535
00536
00537 pthread_join(m_listenThreadId, &statusp);
00538
00539
00540
00541
00542
00543 GlobalObjects::instance()->getConfig()->getSelf()->getNode()->getSocketAddress()->setPort(port);
00544
00545
00546 SocketAddress socketAddress(*GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->getSocketAddress());
00547 socketAddress.setPort(port);
00548 GlobalObjects::instance()->getConfig()->getSelf()->getConnection()->setSocketAddress(&socketAddress);
00549
00550
00551 pthread_attr_t attr;
00552 pthread_attr_init(&attr);
00553 pthread_create(&m_listenThreadId, &attr, LinkLayerInterface::listen, this);
00554 }
00555
00556
00561 void
00562 LinkLayerInterface::registerPacketHandler(PacketHandlerInterface* packetHandler) {
00563 if (m_packetHandler == NULL) {
00564 m_packetHandler = packetHandler;
00565 }
00566 else {
00567 debug(DEBUG_LLI, "Trying to assign more than one packet handler!!!!");
00568 }
00569 }
00570
00571
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00605 void
00606 LinkLayerInterface::add(ConnectionTableEntry* entry)
00607 {
00608 Guard guard(&m_connectionTableLock);
00609
00610
00611 m_connectionTable.push_back(entry);
00612
00613 if(m_highestFd < entry->getConnection()->getStream()) {
00614 m_highestFd = entry->getConnection()->getStream();
00615 }
00616
00617
00618
00619
00620
00621 fireEvent(CONNECTION_OPENED, (void*)entry);
00622 }
00623
00624
00630 void
00631 LinkLayerInterface::remove(int connectionTableIndex)
00632 {
00633 Guard guard(&m_connectionTableLock);
00634
00635 m_highestFd = 0;
00636
00637 if (connectionTableIndex < (int)m_connectionTable.size()) {
00638
00639
00640 m_connectionTable[connectionTableIndex]->getConnection()->close();
00641
00642
00643
00644
00645
00646
00647
00648 fireEvent(CONNECTION_CLOSED, (void*)m_connectionTable[connectionTableIndex]);
00649
00650
00651 delete m_connectionTable[connectionTableIndex];
00652 m_connectionTable[connectionTableIndex] = NULL;
00653
00654
00655 vector<ConnectionTableEntry*>::iterator removeThis;
00656 removeThis = m_connectionTable.begin() + connectionTableIndex;
00657 m_connectionTable.erase(removeThis);
00658
00659
00660 vector<ConnectionTableEntry*>::iterator iter;
00661 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00662 if ((*iter)->getConnection()->getStream() > m_highestFd) {
00663 m_highestFd = (*iter)->getConnection()->getStream();
00664 }
00665 }
00666
00667 }
00668 else {
00669 debug(DEBUG_LLI, "ERROR: trying to remove a non-existant link!");
00670 }
00671 }
00672
00673
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00701 bool
00702 LinkLayerInterface::isConnectedTo(IpAddress ipAddr)
00703 {
00704 Guard guard(&m_connectionTableLock);
00705 vector<ConnectionTableEntry*>::iterator iter;
00706 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00707 if ((*iter)->getConnection()->isConnectedTo(ipAddr)) {
00708 return true;
00709 }
00710 }
00711 return false;
00712 }
00713
00714
00719 bool
00720 LinkLayerInterface::isConnected() {
00721 return (!m_connectionTable.size())?false:true;
00722 }
00723
00724
00728 int
00729 LinkLayerInterface::getNumConnections()
00730 {
00731 return m_connectionTable.size();
00732 }
00733
00734
00739 Node*
00740 LinkLayerInterface::getRandomNeighbor()
00741 {
00742 Guard guard(&m_connectionTableLock);
00743 if (m_connectionTable.empty()) {
00744 debug(DEBUG_LLI, "Error! Not connected.");
00745 return NULL;
00746 }
00747 int index = m_randomNumberGenerator.IRandom(0, m_connectionTable.size() - 1);
00748 return m_connectionTable[index]->getNode();
00749 }
00750
00751
00757 Node*
00758 LinkLayerInterface::getRandomNeighbor(Node* node)
00759 {
00760 Guard guard(&m_connectionTableLock);
00761 if (m_connectionTable.size() <= 1) {
00762 return NULL;
00763 }
00764
00765 int index = m_randomNumberGenerator.IRandom(0, m_connectionTable.size() - 1);
00766 if (m_connectionTable.size() > 1) {
00767 while (node->equals(m_connectionTable[index]->getNode())) {
00768 index = m_randomNumberGenerator.IRandom(0, m_connectionTable.size() - 1);
00769 }
00770 }
00771 return m_connectionTable[index]->getNode();
00772 }
00773
00774
00781 bool
00782 LinkLayerInterface::getConnection(int index, Node* retval) {
00783 Guard guard(&m_connectionTableLock);
00784 if ((index >= (int)m_connectionTable.size()) || (index < 0)) {
00785 return false;
00786 }
00787 *retval = *(m_connectionTable[index]->getNode());
00788 return true;
00789 }
00790
00791
00792 Node*
00793 LinkLayerInterface::getConnection(int index)
00794 {
00795 Guard guard(&m_connectionTableLock);
00796 if ((index >= (int)m_connectionTable.size()) || (index < 0)) {
00797 return false;
00798 }
00799 return (m_connectionTable[index]->getNode());
00800 }
00801
00802
00803
00807 int
00808 LinkLayerInterface::sendPacket(ConnectionPacket* packet) {
00809 int index;
00810 int retVal = 0;
00811
00812 if (packet == NULL) {
00813 debug(DEBUG_LLI, "NULL parameter");
00814 }
00815
00816 Guard guard(&m_connectionTableLock);
00817
00818 if (m_connectionTable.empty()) {
00819 return LLI_NOT_CONNECTED;
00820 }
00821
00822 index = m_randomNumberGenerator.IRandom(0, m_connectionTable.size() - 1);
00823
00824 debug(DEBUG_LLI, "Sending to neighbor %s", m_connectionTable[index]->getNode()->getSocketAddress()->toCStr());
00825 retVal = packet->write(m_connectionTable[index]->getConnection());
00826
00827
00828 return retVal;
00829 }
00830
00831
00838 int
00839 LinkLayerInterface::sendPacket(NpPacket* packet, Node* node)
00840 {
00841 if ((packet == NULL) || (node == NULL)) {
00842 debug(DEBUG_LLI, "NULL parameter");
00843 return -1;
00844 }
00845
00846
00847 int s = -1;
00848 vector<ConnectionTableEntry*>::iterator iter;
00849 Guard guard(&m_connectionTableLock);
00850 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00851 if (node->equals((*iter)->getNode())) {
00852 debug(DEBUG_LLI, "Sending packet to %s, Packet Type: %s, destination VCN = %d",
00853 (*iter)->getNode()->getSocketAddress()->toCStr(),
00854 packet->getControlTypeString(),
00855 packet->getVcn());
00856 s = packet->write((*iter)->getConnection());
00857 break;
00858 }
00859 }
00860 return s;
00861 }
00862
00863
00869 int
00870 LinkLayerInterface::close(Node* node) {
00871 Guard guard(&m_connectionTableLock);
00872 vector<ConnectionTableEntry*>::iterator iter;
00873 int i = 0;
00874 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00875 if (node->equals((*iter)->getNode())) {
00876 closeLink(i);
00877 return 1;
00878 }
00879 i++;
00880 }
00881 return 0;
00882 }
00883
00884
00888 void
00889 LinkLayerInterface::closeLink(int connectionTableIndex)
00890 {
00891 remove(connectionTableIndex);
00892 m_connectionClosed.signal();
00893 debug(DEBUG_LLI, "Connection closed to index %d", connectionTableIndex);
00894 }
00895
00896
00902 void
00903 LinkLayerInterface::handleEvent(ObservableInterface* observable, int event, void* object) {
00904 debug(DEBUG_LLI, "callback: nodeAdded");
00905 m_nodeAdded.broadcast();
00906 }
00907
00908
00912 int
00913 LinkLayerInterface::broadcast(NpPacket* packet) {
00914 int s = 0;
00915
00916 Guard guard(&m_connectionTableLock);
00917 vector<ConnectionTableEntry*>::iterator iter;
00918 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00919 s |= packet->write((*iter)->getConnection());
00920 }
00921 return s;
00922 }
00923
00924
00928 int
00929 LinkLayerInterface::broadcast(NpPacket* packet, Node* node) {
00930 int s = 0;
00931
00932 Guard guard(&m_connectionTableLock);
00933 vector<ConnectionTableEntry*>::iterator iter;
00934 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00935 if (!node->equals((*iter)->getNode())) {
00936 s |= packet->write((*iter)->getConnection());
00937 }
00938 }
00939
00940 return s;
00941 }
00942
00943
00944 void
00945 LinkLayerInterface::toStream(std::ostream& out) {
00946 Guard guard(&m_connectionTableLock);
00947 out << "Link Layer" << "\n";
00948 out << "----------" << "\n";
00949 out << "Listening port: " << GlobalObjects::instance()->getConfig()->getSelf()->getNode()->getSocketAddress()->getPort() << "\n";
00950 out << "Number of connections: " << m_connectionTable.size()
00951 << " (Min: " << GlobalObjects::instance()->getConfig()->getMinConnections() << ", "
00952 << "Max: " << GlobalObjects::instance()->getConfig()->getMaxConnections() << ")" << "\n";
00953 vector<ConnectionTableEntry*>::iterator iter;
00954 int i = 0;
00955 for (iter = m_connectionTable.begin(); iter != m_connectionTable.end(); ++iter) {
00956 out << "index: " << i
00957 << " IP/port: " << (*iter)->getConnection()->getSocketAddress()->toCStr() << "\n";
00958 i++;
00959 }
00960 }
00961
00962