00001 #include "headers.h"
00002
00003 SendWindowThreadArg::SendWindowThreadArg(SendWindow* sw, WindowPosition num){
00004 this->sw = sw;
00005 this->num = num;
00006 }
00007
00008
00014
00019 SendWindow::SendWindow(int size, SAR* sar)
00020 {
00021 m_size = size;
00022 m_window.reserve(m_size);
00023 m_retries.reserve(m_size);
00024 m_timeStamp.reserve(m_size);
00025 for (int i = 0; i < m_size; i++) {
00026 m_window.push_back(NULL);
00027 bool* newBool = new bool;
00028 *newBool = false;
00029 m_needSend.push_back(newBool);
00030 m_retries.push_back(0);
00031 }
00032
00033 m_sar = sar;
00034 m_packetSequenceNum = Counter(0, SEQ_NUM_SPACE);
00035 m_lastAckRecv = WindowPosition(SEQ_NUM_SPACE);
00036 m_firstPacketSent = WindowPosition(SEQ_NUM_SPACE, 0);
00037 m_effectiveWindow = m_size - 1;
00038 m_end = false;
00039 m_rtt.setMilliseconds(DEFAULT_RTT);
00040 m_sending = false;
00041 m_credits = size;
00042
00043 m_packetTimeoutCallbackTimer.setName("Packet Timeout Callback Timer");
00044 }
00045
00046
00050 SendWindow::~SendWindow()
00051 {
00052 if (!m_end) {
00053 end();
00054 }
00055
00056 Guard guard(&m_sendWindowLock);
00057
00058 for (int i = 0; i < m_size; i++) {
00059 delete m_window[i];
00060 m_window[i] = NULL;
00061 delete m_needSend[i];
00062 m_needSend[i]=NULL;
00063 }
00064 }
00065
00066
00071 int
00072 SendWindow::addPacket(TpPacket* packet)
00073 {
00074 WindowPosition pos = WindowPosition(SEQ_NUM_SPACE, packet->getSeqNum());
00075 removePacket(pos.getPosition());
00076 Guard guard(&m_sendWindowLock);
00077 m_window[pos.getPosition()] = packet;
00078 return pos.getPosition();
00079 }
00080
00081
00086 void
00087 SendWindow::removePacket(int index)
00088 {
00089 if ((index < 0) || (index > m_size-1)) {
00090 debug(DEBUG_ERR, "sequence number out of range: %d", index);
00091 return;
00092 }
00093
00094 Guard guard(&m_sendWindowLock);
00095 if (index < (int)m_window.size()) {
00096 delete m_window[index];
00097 m_window[index] = NULL;
00098 }
00099 }
00100
00101
00107 int
00108 SendWindow::reliableSend(u_char* data, int dataLength)
00109 {
00110
00111 m_dataLength = dataLength;
00112 m_data = data;
00113 ThreadMessageQueue<Packet>* outQueue = initialSegmentation();
00114 Guard guard(&m_sendWindowLock);
00115 debug(DEBUG_SENDW, "[Reliable Send, %d bytes]", dataLength);
00116 bool isEnd = false;
00117 TpPacket* packet;
00118 int totalBytesSent = 0;
00119
00120 if (m_end) {
00121 return PB_UNREACHABLE;
00122 }
00123
00124 m_sending = true;
00125 do {
00126
00127 if (!outQueue->isEmpty()) {
00128
00129 packet = (TpPacket*)outQueue->getNext();
00130
00131
00132 int sendWindowFullTimeouts = 0;
00133 while ( windowFull() && (sendWindowFullTimeouts < MAX_SEND_WINDOW_TIMEOUTS) ) {
00134
00135 bool gotAck = m_ackSignal.timedWait(&m_sendWindowLock, SEND_WINDOW_TIMEOUT);
00136
00137 if (!gotAck) {
00138 sendEmptyPacket();
00139 sendWindowFullTimeouts++;
00140 }
00141 else {
00142
00143 }
00144
00145 if (m_end) {
00146 debug(DEBUG_SENDW, "reliableSend(): KILLED");
00147 m_terminate.signal();
00148 m_sending = false;
00149 return PB_ERROR;
00150 }
00151 }
00152
00153
00154 if (sendWindowFullTimeouts >= MAX_SEND_WINDOW_TIMEOUTS) {
00155 debug(DEBUG_SENDW, "reliableSend() Exiting: MAX_SEND_WINDOW_TIMEOUTS reached!");
00156 m_sending = false;
00157 return PB_ERROR;
00158 }
00159
00160
00161 addPacket(packet);
00162 m_lastPacketSent = WindowPosition(SEQ_NUM_SPACE, packet->getSeqNum());
00163 computeEffectiveWindow();
00164
00165
00166 m_timeStamp[m_lastPacketSent.getPosition()].setToCurrentTime();
00167
00168
00169 debug(DEBUG_SENDW, "Sending seq number %d, in window position %d", m_lastPacketSent.getSeqNum(), m_lastPacketSent.getPosition());
00170 if ((m_sar->sendDataPacket(m_window[m_lastPacketSent.getPosition()])) < 0) {
00171 debug(DEBUG_ERR, "Error sending data packet: service doesnt exist");
00172 m_sending = false;
00173 return PB_ERROR;
00174 }
00175 totalBytesSent += m_window[m_lastPacketSent.getPosition()]->getDataLength();
00176
00177 dumpDebug();
00178
00179
00180
00181 m_retries[m_lastPacketSent.getPosition()] = 0;
00182 *(m_needSend[m_lastPacketSent.getPosition()]) = true;
00183 SendWindowThreadArg* args = new SendWindowThreadArg(this, m_lastPacketSent);
00184 MEMCHECK(args);
00185 TimeValue retransmitTime = (m_rtt * RETRANSMISSION_FACTOR);
00186 int totalMilliseconds = (int)(retransmitTime.getTotalMilliseconds());
00187 m_packetTimeoutCallbackTimer.addEvent(
00188 totalMilliseconds,
00189 sentPacketTimeout,
00190 args,
00191 m_needSend[m_lastPacketSent.getPosition()]);
00192
00193
00194 if (m_currentPosition < m_dataLength) {
00195 TpPacket* packet = segment();
00196 outQueue->add(packet);
00197 }
00198 }
00199 else {
00200
00201 isEnd = true;
00202 }
00203 } while(!isEnd);
00204
00205 debug(DEBUG_SENDW, "All data sent, waiting for ACKs");
00206
00207
00208 int timeoutCounter = 0;
00209 while( (m_lastAckRecv.getSeqNum() != m_lastPacketSent.getSeqNum()) && (timeoutCounter < MAX_SEND_WINDOW_TIMEOUTS) ) {
00210 debug(DEBUG_SENDW, "last ack = %d, last packet sent = %d", m_lastAckRecv.getSeqNum(), m_lastPacketSent.getSeqNum());
00211 int timeToWait = (int)(SEND_PACKET_MAX_RETRY * DEFAULT_RTT * RETRANSMISSION_FACTOR);
00212 if (m_ackSignal.timedWait(&m_sendWindowLock, timeToWait)) {
00213 debug(DEBUG_SENDW, "ACK signalled");
00214 }
00215 else {
00216 debug(DEBUG_SENDW, "ACK Timeout after waiting for %d millisecs", timeToWait);
00217 timeoutCounter++;
00218 }
00219 if (m_end) {
00220 m_terminate.signal();
00221 m_sending = false;
00222 return PB_ERROR;
00223 }
00224 }
00225
00226 debug(DEBUG_SENDW, "Cleaning up");
00227
00228
00229 m_sending = false;
00230 delete outQueue;
00231 if ( m_lastAckRecv.getSeqNum() == m_lastPacketSent.getSeqNum() ) {
00232 debug(DEBUG_SENDW, "[All data sent]");
00233 return totalBytesSent;
00234 }
00235 else {
00236 debug(DEBUG_SENDW, "[Error sending data]");
00237 return PB_ERROR;
00238 }
00239 }
00240
00241
00247 void
00248 SendWindow::sendEmptyPacket() {
00249 TpPacket* keepAlivePacket = new TpPacket(NULL, 0, 0, m_lastPacketSent.getSeqNum());
00250 MEMCHECK(keepAlivePacket);
00251 if (m_sar->sendDataPacket(keepAlivePacket) < 0) {
00252 debug(DEBUG_SENDW, "Could not send empty keep-alive packet.");
00253 }
00254 debug(DEBUG_SENDW, "[Sent Keep Alive, seq num %d]", m_lastPacketSent.getSeqNum());
00255
00256
00257 delete keepAlivePacket;
00258 }
00259
00260
00264 void
00265 SendWindow::computeEffectiveWindow() {
00266 Guard guard(&m_sendWindowLock);
00267 m_effectiveWindow = m_credits - (m_lastAckRecv.distanceTo(m_lastPacketSent));
00268 debug(DEBUG_SENDW, "Effective window: %d (credits - (lastSent - lastAck)) ==> (%d - (%d - %d (%d)))",
00269 m_effectiveWindow, m_credits, m_lastPacketSent.getSeqNum(), m_lastAckRecv.getSeqNum(),
00270 m_lastAckRecv.distanceTo(m_lastPacketSent));
00271 }
00272
00273
00277 TimeValue
00278 SendWindow::getRtt() {
00279 return this->m_rtt;
00280 }
00281
00282
00286 void
00287 SendWindow::end()
00288 {
00289 int i;
00290 Guard guard(&m_sendWindowLock);
00291 if (m_sending) {
00292 for (i=0; i<m_size; i++) {
00293 *(m_needSend[i]) = false;
00294 }
00295 m_end = true;
00296 i = 0;
00297 do {
00298 m_ackSignal.signal();
00299 } while (!m_terminate.timedWait(&m_sendWindowLock, (int)(DEFAULT_RTT*RETRANSMISSION_FACTOR))
00300 && (++i < 10));
00301 }
00302 }
00303
00304
00309 void*
00310 SendWindow::sentPacketTimeout(void* threadArg)
00311 {
00312 SendWindow* sendWindow = ((SendWindowThreadArg*)threadArg)->sw;
00313 WindowPosition seqNum = ((SendWindowThreadArg*)threadArg)->num;
00314 sendWindow->sentPacketTimeoutImpl(seqNum);
00315 delete (SendWindowThreadArg*)threadArg;
00316 return NULL;
00317 }
00318
00319
00324 void
00325 SendWindow::sentPacketTimeoutImpl(WindowPosition seqNum) {
00326 Guard guard(&m_sendWindowLock);
00327
00328
00329 if (m_window[seqNum.getPosition()] == NULL) {
00330 debug(DEBUG_SENDW, "ERROR packet not in window (should not be here)");
00331 return;
00332 }
00333
00334
00335 if (m_retries[seqNum.getPosition()] > SEND_PACKET_MAX_RETRY) {
00336 debug(DEBUG_SENDW, "PACKET FAILED - didnt receive an ACK for packet %d", seqNum.getPosition());
00337 m_end = true;
00338 return;
00339 }
00340
00341
00342 ++(m_retries[seqNum.getPosition()]);
00343
00344 debug(DEBUG_SENDW, "[Timeout] Retransmitting packet number %d", seqNum.getPosition());
00345
00346
00347 m_window[seqNum.getPosition()]->makeRetrans();
00348
00349
00350 if ((m_sar->sendDataPacket(m_window[seqNum.getPosition()])) < 0) {
00351 debug(DEBUG_ERR, "Error sending data packet: service doesnt exist");
00352 m_end = true;
00353 return;
00354 }
00355
00356
00357
00358 SendWindowThreadArg* args = new SendWindowThreadArg(this, seqNum);
00359 m_packetTimeoutCallbackTimer.addEvent(
00360 (int)((m_rtt * RETRANSMISSION_FACTOR).getTotalMilliseconds()),
00361 sentPacketTimeout,
00362 args,
00363 m_needSend[seqNum.getPosition()]);
00364
00365 debug(DEBUG_SENDW, "Retransmitted packet %d (%d)", seqNum.getSeqNum(), seqNum.getPosition());
00366 return;
00367 }
00368
00369
00374 void
00375 SendWindow::onAckRecvd(int numAcked, int credits, bool retrans)
00376 {
00377 Guard guard(&m_sendWindowLock);
00378
00379 WindowPosition ack = WindowPosition(SEQ_NUM_SPACE, numAcked);
00380
00381 debug(DEBUG_SENDW, "ACK received %d (%d)", ack.getSeqNum(), ack.getPosition());
00382
00383
00384
00385 if (!ack.withinRange(m_lastAckRecv, m_lastPacketSent)) {
00386 debug(DEBUG_SENDW, "Invalid ACK received %d, not within (%d, %d)",
00387 ack.getSeqNum(), m_lastAckRecv.getSeqNum(), m_lastPacketSent.getSeqNum());
00388 return;
00389 }
00390
00391 m_credits = credits;
00392
00393 if (!retrans) {
00394 TimeValue rtt = TimeValue::getCurrentTime() - m_timeStamp[ack.getPosition()];
00395
00396 if (rtt.getSeconds() < 60) {
00397 m_rtt = (m_rtt * RTT_FACTOR) + (rtt * (1-RTT_FACTOR));
00398 stringstream tmp;
00399 tmp << rtt;
00400 debug(DEBUG_SENDW, "Current RTT: %s", tmp.str().c_str());
00401 }
00402 }
00403
00404
00405 if (ack.getSeqNum() == m_lastPacketSent.getSeqNum()) {
00406
00407 removePacket(ack.getPosition());
00408
00409 *(m_needSend[ack.getPosition()]) = false;
00410 }
00411 else {
00412
00413 Counter tmpCounter = Counter(0, m_size - 1, ack.getPosition());
00414 for (; tmpCounter.getCount() != m_lastPacketSent.getPosition(); tmpCounter.decrement()) {
00415 removePacket(tmpCounter.getCount());
00416
00417 *(m_needSend[tmpCounter.getCount()]) = false;
00418 }
00419 }
00420
00421
00422
00423
00424
00425
00426 m_lastAckRecv = WindowPosition(SEQ_NUM_SPACE, numAcked);
00427 computeEffectiveWindow();
00428
00429
00430 m_ackSignal.signal();
00431
00432 dumpDebug();
00433 }
00434
00435
00439 bool
00440 SendWindow::windowFull()
00441 {
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453 return (m_effectiveWindow <= 0);
00454 }
00455
00456
00462 ThreadMessageQueue<Packet>*
00463 SendWindow::initialSegmentation() {
00464 ThreadMessageQueue<Packet>* outQueue = new ThreadMessageQueue<Packet>;
00465 MEMCHECK(outQueue);
00466
00467 m_currentPosition = 0;
00468 int packetsCreated = 0;
00469 do {
00470 TpPacket* packet = segment();
00471 packetsCreated++;
00472 outQueue->add(packet);
00473 } while ( (packetsCreated < WINDOW_SIZE) && (m_currentPosition < m_dataLength) );
00474 return outQueue;
00475 }
00476
00477
00485 TpPacket*
00486 SendWindow::segment() {
00487 TpPacket* packet;
00488 int copyAmount = ((m_dataLength - m_currentPosition) > SEGMENT_SIZE) ? SEGMENT_SIZE : (m_dataLength - m_currentPosition);
00489 packet = new TpPacket(m_data + m_currentPosition, copyAmount, false, m_packetSequenceNum.getCount());
00490 MEMCHECK(packet);
00491 debug(DEBUG_SENDW, "[Segmenting: Seq num %d, data length: %d]", m_packetSequenceNum.getCount(), copyAmount);
00492 m_currentPosition += copyAmount;
00493 m_packetSequenceNum.increment();
00494 return packet;
00495 }
00496
00497
00501 void
00502 SendWindow::dumpDebug() {
00503 #ifdef DEBUG_ON
00504 stringstream tmp;
00505 tmp << *this;
00506 debug(DEBUG_RECVW, "Current state of receive window:\n%s", tmp.str().c_str());
00507 #endif
00508 }
00509
00510
00514 void
00515 SendWindow::toStream(std::ostream& out)
00516 {
00517 Guard guard(&m_sendWindowLock);
00518 out << "---------- SEND WINDOW ----------" << "\n";
00519 for (int i = 0; i < m_size; i++) {
00520 if (m_window[i] != NULL) {
00521 out << m_window[i]->getSeqNum();
00522 }
00523 else {
00524 out << "[null]";
00525 }
00526 if (m_lastAckRecv.getPosition() == i) {
00527 out << " <-- (last ACK received)";
00528 }
00529 if (m_lastPacketSent.getPosition() == i) {
00530 out << " <-- (last packet sent)";
00531 }
00532 out << "\n";
00533 }
00534 out << "---------------------------------" << "\n";
00535 }
00536
00537
00538
00539