Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

SendWindow.cpp

Go to the documentation of this file.
00001 #include "headers.h"
00002 
00003 SendWindowThreadArg::SendWindowThreadArg(SendWindow* sw, WindowPosition num){
00004     this->sw = sw;
00005     this->num = num;
00006 }
00007 
00008     
00014 
00019 SendWindow::SendWindow(int size, SAR* sar)
00020 {
00021     m_size = size;
00022     m_window.reserve(m_size);
00023     m_retries.reserve(m_size);
00024     m_timeStamp.reserve(m_size);
00025     for (int i = 0; i < m_size; i++) {
00026         m_window.push_back(NULL);
00027         bool* newBool = new bool;
00028         *newBool = false;
00029         m_needSend.push_back(newBool);
00030         m_retries.push_back(0);
00031     }
00032 
00033     m_sar = sar;
00034     m_packetSequenceNum = Counter(0, SEQ_NUM_SPACE);
00035     m_lastAckRecv = WindowPosition(SEQ_NUM_SPACE); //, m_size-1);
00036     m_firstPacketSent = WindowPosition(SEQ_NUM_SPACE, 0);
00037     m_effectiveWindow = m_size - 1; 
00038     m_end = false;
00039     m_rtt.setMilliseconds(DEFAULT_RTT);
00040     m_sending = false;
00041     m_credits = size;
00042     
00043     m_packetTimeoutCallbackTimer.setName("Packet Timeout Callback Timer");
00044 } // constructor
00045 
00046 
00050 SendWindow::~SendWindow()
00051 {
00052     if (!m_end) {
00053         end();
00054     }
00055 
00056     Guard guard(&m_sendWindowLock);
00057     
00058     for (int i = 0; i < m_size; i++) {
00059         delete m_window[i];
00060         m_window[i] = NULL;
00061         delete m_needSend[i];
00062         m_needSend[i]=NULL;
00063     }
00064 } // destructor
00065 
00066 
00071 int 
00072 SendWindow::addPacket(TpPacket* packet)
00073 {
00074     WindowPosition pos = WindowPosition(SEQ_NUM_SPACE, packet->getSeqNum());
00075     removePacket(pos.getPosition());
00076     Guard guard(&m_sendWindowLock);
00077     m_window[pos.getPosition()] = packet;
00078     return pos.getPosition();
00079 } // fn addPacket
00080 
00081 
00086 void 
00087 SendWindow::removePacket(int index)
00088 {
00089     if ((index < 0) || (index > m_size-1)) {
00090         debug(DEBUG_ERR, "sequence number out of range: %d", index);
00091         return;
00092     }
00093 
00094     Guard guard(&m_sendWindowLock);
00095     if (index < (int)m_window.size()) {
00096         delete m_window[index];
00097         m_window[index] = NULL;
00098     }
00099 } // fn removePacket
00100 
00101 
00107 int
00108 SendWindow::reliableSend(u_char* data, int dataLength)
00109 {
00110     // segment as much data as we can
00111     m_dataLength = dataLength;
00112     m_data = data;
00113     ThreadMessageQueue<Packet>* outQueue = initialSegmentation();
00114     Guard guard(&m_sendWindowLock);
00115     debug(DEBUG_SENDW, "[Reliable Send, %d bytes]", dataLength);
00116     bool isEnd = false;
00117     TpPacket* packet;
00118     int totalBytesSent = 0;
00119 
00120     if (m_end) {
00121         return PB_UNREACHABLE;
00122     }
00123 
00124     m_sending = true;
00125     do { 
00126         // if the queue is not empty
00127         if (!outQueue->isEmpty()) {
00128             // get the next packet
00129             packet = (TpPacket*)outQueue->getNext();
00130 
00131             // Block while send window is full.
00132             int sendWindowFullTimeouts = 0;
00133             while ( windowFull() && (sendWindowFullTimeouts < MAX_SEND_WINDOW_TIMEOUTS) ) {
00134                 //debug(DEBUG_SENDW, "[Send window full]");
00135                 bool gotAck = m_ackSignal.timedWait(&m_sendWindowLock, SEND_WINDOW_TIMEOUT);
00136                 // Only increment if we timed out waiting for an ACK
00137                 if (!gotAck) {
00138                     sendEmptyPacket();
00139                     sendWindowFullTimeouts++;
00140                 }
00141                 else {
00142                     //debug(DEBUG_SENDW, "Ack signaled while send window full, not incrementing timeout counter.");
00143                 }
00144                 // We were requested to die by destructor
00145                 if (m_end) {
00146                     debug(DEBUG_SENDW, "reliableSend(): KILLED");
00147                     m_terminate.signal();
00148                     m_sending = false;
00149                     return PB_ERROR;
00150                 }
00151             }
00152 
00153             // If it is full for too long, we quit.
00154             if (sendWindowFullTimeouts >= MAX_SEND_WINDOW_TIMEOUTS) {
00155                 debug(DEBUG_SENDW, "reliableSend() Exiting: MAX_SEND_WINDOW_TIMEOUTS reached!");
00156                 m_sending = false;
00157                 return PB_ERROR;
00158             }
00159             
00160             // add the packet to the window and remember its position
00161             addPacket(packet);
00162             m_lastPacketSent = WindowPosition(SEQ_NUM_SPACE, packet->getSeqNum());
00163             computeEffectiveWindow();
00164 
00165             // remember the time the packet was sent out
00166             m_timeStamp[m_lastPacketSent.getPosition()].setToCurrentTime();
00167 
00168             // send the packet out
00169             debug(DEBUG_SENDW, "Sending seq number %d, in window position %d", m_lastPacketSent.getSeqNum(), m_lastPacketSent.getPosition());
00170             if ((m_sar->sendDataPacket(m_window[m_lastPacketSent.getPosition()])) < 0) {
00171                 debug(DEBUG_ERR, "Error sending data packet: service doesnt exist");
00172                 m_sending = false;
00173                 return PB_ERROR;
00174             }
00175             totalBytesSent += m_window[m_lastPacketSent.getPosition()]->getDataLength();
00176 
00177             dumpDebug();
00178 
00179             // Queue an event that will retransmit the packet unless an ACK is received
00180             //debug(DEBUG_SENDW, "Queuing ACK timeout event for seq num %d", m_lastPacketSent.getSeqNum());
00181             m_retries[m_lastPacketSent.getPosition()] = 0;
00182             *(m_needSend[m_lastPacketSent.getPosition()]) = true;
00183             SendWindowThreadArg* args = new SendWindowThreadArg(this, m_lastPacketSent);
00184             MEMCHECK(args);
00185             TimeValue retransmitTime = (m_rtt * RETRANSMISSION_FACTOR);
00186             int totalMilliseconds = (int)(retransmitTime.getTotalMilliseconds());
00187             m_packetTimeoutCallbackTimer.addEvent(
00188                 totalMilliseconds, 
00189                 sentPacketTimeout, 
00190                 args, 
00191                 m_needSend[m_lastPacketSent.getPosition()]);
00192 
00193             // if we havent segmented all the data yet, segment some more
00194             if (m_currentPosition < m_dataLength) {
00195                 TpPacket* packet = segment();
00196                 outQueue->add(packet);
00197             }
00198         }
00199         else {
00200             // queue is empty, we are done sending
00201             isEnd = true;
00202         }
00203     } while(!isEnd);
00204 
00205     debug(DEBUG_SENDW, "All data sent, waiting for ACKs");
00206 
00207     // Make sure all packets are ACKed.
00208     int timeoutCounter = 0;
00209     while( (m_lastAckRecv.getSeqNum() != m_lastPacketSent.getSeqNum()) && (timeoutCounter < MAX_SEND_WINDOW_TIMEOUTS) ) {
00210         debug(DEBUG_SENDW, "last ack = %d, last packet sent = %d", m_lastAckRecv.getSeqNum(), m_lastPacketSent.getSeqNum());
00211         int timeToWait = (int)(SEND_PACKET_MAX_RETRY * DEFAULT_RTT * RETRANSMISSION_FACTOR);
00212         if (m_ackSignal.timedWait(&m_sendWindowLock, timeToWait)) {
00213             debug(DEBUG_SENDW, "ACK signalled");
00214         }
00215         else {
00216             debug(DEBUG_SENDW, "ACK Timeout after waiting for %d millisecs", timeToWait);
00217             timeoutCounter++;
00218         }
00219         if (m_end) {
00220             m_terminate.signal();
00221             m_sending = false;
00222             return PB_ERROR;
00223         }
00224     }
00225 
00226     debug(DEBUG_SENDW, "Cleaning up");
00227 
00228     // clean up
00229     m_sending = false;
00230     delete outQueue;
00231     if ( m_lastAckRecv.getSeqNum() == m_lastPacketSent.getSeqNum() ) {
00232         debug(DEBUG_SENDW, "[All data sent]");
00233         return totalBytesSent;
00234     }
00235     else {
00236         debug(DEBUG_SENDW, "[Error sending data]");
00237         return PB_ERROR;
00238     }
00239 } // fn reliableSend
00240 
00241 
00247 void
00248 SendWindow::sendEmptyPacket() {
00249     TpPacket* keepAlivePacket = new TpPacket(NULL, 0, 0, m_lastPacketSent.getSeqNum());
00250     MEMCHECK(keepAlivePacket);
00251     if (m_sar->sendDataPacket(keepAlivePacket) < 0) {
00252         debug(DEBUG_SENDW, "Could not send empty keep-alive packet.");
00253     }
00254     debug(DEBUG_SENDW, "[Sent Keep Alive, seq num %d]", m_lastPacketSent.getSeqNum());
00255     
00256     // clean up
00257     delete keepAlivePacket;
00258 } // fn sendEmptyPacket
00259 
00260 
00264 void 
00265 SendWindow::computeEffectiveWindow() {
00266     Guard guard(&m_sendWindowLock);
00267     m_effectiveWindow = m_credits - (m_lastAckRecv.distanceTo(m_lastPacketSent));
00268     debug(DEBUG_SENDW, "Effective window: %d (credits - (lastSent - lastAck)) ==> (%d - (%d - %d (%d)))", 
00269         m_effectiveWindow, m_credits, m_lastPacketSent.getSeqNum(), m_lastAckRecv.getSeqNum(), 
00270         m_lastAckRecv.distanceTo(m_lastPacketSent));
00271 } // fn computeEffectiveWindow
00272 
00273 
00277 TimeValue
00278 SendWindow::getRtt() {
00279     return this->m_rtt;
00280 } // fn getRtt
00281 
00282 
00286 void 
00287 SendWindow::end()
00288 {
00289     int i;
00290     Guard guard(&m_sendWindowLock);
00291     if (m_sending) {
00292         for (i=0; i<m_size; i++) {
00293             *(m_needSend[i]) = false;
00294         }
00295         m_end = true;
00296         i = 0;
00297         do {
00298             m_ackSignal.signal();
00299         } while (!m_terminate.timedWait(&m_sendWindowLock, (int)(DEFAULT_RTT*RETRANSMISSION_FACTOR)) 
00300                && (++i < 10));
00301     }
00302 } // fn end
00303 
00304 
00309 void* 
00310 SendWindow::sentPacketTimeout(void* threadArg)
00311 {
00312     SendWindow* sendWindow = ((SendWindowThreadArg*)threadArg)->sw;
00313     WindowPosition seqNum = ((SendWindowThreadArg*)threadArg)->num;
00314     sendWindow->sentPacketTimeoutImpl(seqNum);
00315     delete (SendWindowThreadArg*)threadArg;
00316     return NULL;
00317 } // fn sentPacketTimeout
00318 
00319 
00324 void
00325 SendWindow::sentPacketTimeoutImpl(WindowPosition seqNum) {
00326     Guard guard(&m_sendWindowLock);
00327     
00328     // if the packet isnt in the window
00329     if (m_window[seqNum.getPosition()] == NULL) {
00330         debug(DEBUG_SENDW, "ERROR packet not in window (should not be here)");
00331         return;
00332     }
00333     
00334     // if we have already tried to send this packet a bunch of times
00335     if (m_retries[seqNum.getPosition()] > SEND_PACKET_MAX_RETRY) {
00336         debug(DEBUG_SENDW, "PACKET FAILED - didnt receive an ACK for packet %d", seqNum.getPosition());
00337         m_end = true;
00338         return;
00339     }
00340 
00341     // Increment the number of times we have tried to resend this packet.
00342     ++(m_retries[seqNum.getPosition()]);
00343 
00344     debug(DEBUG_SENDW, "[Timeout] Retransmitting packet number %d", seqNum.getPosition());
00345     
00346     // Make the packet into a retransmission packet
00347     m_window[seqNum.getPosition()]->makeRetrans();
00348             
00349     // resend the packet
00350     if ((m_sar->sendDataPacket(m_window[seqNum.getPosition()])) < 0) {
00351         debug(DEBUG_ERR, "Error sending data packet: service doesnt exist");
00352         m_end = true;
00353         return;
00354     }
00355     
00356     // set another timeout event
00357     // params: time, function, function arg, bool pointer
00358     SendWindowThreadArg* args = new SendWindowThreadArg(this, seqNum);
00359     m_packetTimeoutCallbackTimer.addEvent(
00360         (int)((m_rtt * RETRANSMISSION_FACTOR).getTotalMilliseconds()), 
00361         sentPacketTimeout, 
00362         args, 
00363         m_needSend[seqNum.getPosition()]);
00364 
00365     debug(DEBUG_SENDW, "Retransmitted packet %d (%d)", seqNum.getSeqNum(), seqNum.getPosition());
00366     return;
00367 } // fn sentPacketTimeoutImpl
00368 
00369 
00374 void 
00375 SendWindow::onAckRecvd(int numAcked, int credits, bool retrans)
00376 {
00377     Guard guard(&m_sendWindowLock);
00378 
00379     WindowPosition ack = WindowPosition(SEQ_NUM_SPACE, numAcked);
00380 
00381     debug(DEBUG_SENDW, "ACK received %d (%d)", ack.getSeqNum(), ack.getPosition());
00382 
00383     // Check if the ACK is valid - it must be within the range of the packets we care about.
00384     // If it isnt, we just drop it.
00385     if (!ack.withinRange(m_lastAckRecv, m_lastPacketSent)) {
00386         debug(DEBUG_SENDW, "Invalid ACK received %d, not within (%d, %d)", 
00387             ack.getSeqNum(), m_lastAckRecv.getSeqNum(), m_lastPacketSent.getSeqNum());
00388         return;
00389     }
00390 
00391     m_credits = credits;
00392 
00393     if (!retrans) {
00394         TimeValue rtt = TimeValue::getCurrentTime() - m_timeStamp[ack.getPosition()];
00395         // as long as its not an outlier
00396         if (rtt.getSeconds() < 60) {
00397             m_rtt = (m_rtt * RTT_FACTOR) + (rtt * (1-RTT_FACTOR));
00398             stringstream tmp;
00399             tmp << rtt;
00400             debug(DEBUG_SENDW, "Current RTT: %s", tmp.str().c_str());
00401         }
00402     }
00403     
00404     // Special case - if ACK number is EQUAL TO last packet sent.
00405     if (ack.getSeqNum() == m_lastPacketSent.getSeqNum()) { 
00406         // Remove the packet from the send window.
00407         removePacket(ack.getPosition());
00408         // This lets the timer know that we dont need to retransmit.
00409         *(m_needSend[ack.getPosition()]) = false;
00410     }
00411     else {
00412         // Remove all packets up to the one that has been ACKed.
00413         Counter tmpCounter = Counter(0, m_size - 1, ack.getPosition());
00414         for (; tmpCounter.getCount() != m_lastPacketSent.getPosition(); tmpCounter.decrement()) {
00415             removePacket(tmpCounter.getCount());
00416             // This lets the timer know that we dont need to retransmit.
00417             *(m_needSend[tmpCounter.getCount()]) = false;
00418         }
00419     }
00420 
00421     // Remember that this was the last ACK we received.
00422     // Since the receiver ACKs the max seq num it has received, and we have already
00423     // checked that this is a valid ACK (with the withinRange() function), we can
00424     // safely assume that this is the highest ACK that we have received.
00425     // Note: dont use "ack" here because it might have been changed in the above loop.
00426     m_lastAckRecv = WindowPosition(SEQ_NUM_SPACE, numAcked);
00427     computeEffectiveWindow();
00428 
00429     // Signal any packets trying to get in the queue there is more space.
00430     m_ackSignal.signal();
00431 
00432     dumpDebug();
00433 } // fn onAckRecvd
00434 
00435 
00439 bool 
00440 SendWindow::windowFull()
00441 {
00442     /*
00443     if (m_lastAckRecv.getSeqNum() == WindowPosition::DONT_CARE) {
00444         debug(DEBUG_SENDW, "windowFull(): ((m_firstPacketSent + (WINDOW_SIZE - 1)) == lastPacketSent) ==> ((%d) == %d)",
00445             (m_firstPacketSent + (WINDOW_SIZE - 1)).getSeqNum(), m_lastPacketSent.getSeqNum());
00446         return ((m_firstPacketSent + (WINDOW_SIZE - 1)).getSeqNum() == m_lastPacketSent.getSeqNum());
00447     }
00448     debug(DEBUG_SENDW, "windowFull(): (m_lastPacketSent == (m_lastAckRecv + (WINDOW_SIZE - 1))) ==> (%d == %d)",
00449             m_lastPacketSent.getSeqNum(), (m_lastAckRecv + (WINDOW_SIZE - 1)).getSeqNum());
00450     return (m_lastPacketSent.getSeqNum() == (m_lastAckRecv + (WINDOW_SIZE - 1)).getSeqNum());
00451     //return (m_lastPacketSent.distanceTo(lastAck) == 0);
00452     */
00453     return (m_effectiveWindow <= 0);
00454 } // fn windowFull
00455 
00456 
00462 ThreadMessageQueue<Packet>* 
00463 SendWindow::initialSegmentation() {
00464     ThreadMessageQueue<Packet>* outQueue = new ThreadMessageQueue<Packet>;
00465     MEMCHECK(outQueue);
00466 
00467     m_currentPosition = 0;
00468     int packetsCreated = 0;
00469     do {
00470         TpPacket* packet = segment();
00471         packetsCreated++;
00472         outQueue->add(packet);
00473     } while ( (packetsCreated < WINDOW_SIZE) && (m_currentPosition < m_dataLength) );
00474     return outQueue;
00475 } // fn initialSegmentation
00476 
00477 
00485 TpPacket*
00486 SendWindow::segment() {
00487     TpPacket* packet;
00488     int copyAmount = ((m_dataLength - m_currentPosition) > SEGMENT_SIZE) ? SEGMENT_SIZE : (m_dataLength - m_currentPosition);
00489     packet = new TpPacket(m_data + m_currentPosition, copyAmount, false, m_packetSequenceNum.getCount());
00490     MEMCHECK(packet);
00491     debug(DEBUG_SENDW, "[Segmenting: Seq num %d, data length: %d]", m_packetSequenceNum.getCount(), copyAmount);
00492     m_currentPosition += copyAmount;
00493     m_packetSequenceNum.increment();
00494     return packet;
00495 } // fn segment
00496 
00497 
00501 void
00502 SendWindow::dumpDebug() {
00503 #ifdef DEBUG_ON
00504     stringstream tmp;
00505     tmp << *this;
00506     debug(DEBUG_RECVW, "Current state of receive window:\n%s", tmp.str().c_str());
00507 #endif
00508 } // fn dumpDebug
00509 
00510 
00514 void
00515 SendWindow::toStream(std::ostream& out)
00516 {
00517     Guard guard(&m_sendWindowLock);
00518     out << "---------- SEND WINDOW ----------" << "\n";
00519     for (int i = 0; i < m_size; i++) {
00520         if (m_window[i] != NULL) {
00521             out << m_window[i]->getSeqNum();
00522         }
00523         else {
00524             out << "[null]";
00525         }
00526         if (m_lastAckRecv.getPosition() == i) {
00527             out << "  <-- (last ACK received)";
00528         }
00529         if (m_lastPacketSent.getPosition() == i) {
00530             out << "  <-- (last packet sent)";
00531         }
00532         out << "\n";
00533     }
00534     out << "---------------------------------" << "\n";
00535 } // fn toStream
00536 
00537 
00538 
00539 

Generated at Thu Jul 11 13:31:51 2002 for Peekabooty by doxygen1.2.9 written by Dimitri van Heesch, © 1997-2001