Alex Bikfalvi
SimStream Documentation
ConnectionReceiver.cpp
00001 #include "Headers.h" 00002 #include "ConnectionReceiver.h" 00003 #include "PacketConnectionFeedback.h" 00004 #include "Numeric.h" 00005 00006 #include "Debug.h" 00007 #include "Console.h" 00008 00009 #pragma warning(disable : 4996) 00010 00011 double CConnectionReceiver::lossIntervalWeigth[MAX_LOSS_EVENTS] = {1.0, 1.0, 1.0, 1.0, 0.8, 0.6, 0.4, 0.2}; 00012 00013 CConnectionReceiver::CConnectionReceiver( 00014 __uint32 idEntry, 00015 __uint16 port, 00016 CSimHandler* sim, 00017 IDelegate5<void, __uint16, __uint16, CAddress, __byte, CPacket*>* delegateSend, 00018 IDelegate2<void, CConnectionReceiver*, CPacket*>* delegateRecv, 00019 IDelegate1<void, CConnection*>* delegateDispose, 00020 CAddress remoteAddress, 00021 __uint16 remotePort 00022 ) : CConnection(REQUESTER, idEntry, port, sim, delegateSend, delegateDispose, remoteAddress, remotePort) 00023 { 00024 // Delegate receive 00025 this->delegateRecv = delegateRecv; 00026 00027 // Flow 00028 this->flowLossRate = 0; 00029 this->flowRateRecv = 0; 00030 this->flowSequenceLast = 0; 00031 this->flowTimeLast = 0; 00032 00033 // Timers 00034 this->timerFeedback = new CTimer<CConnectionReceiver>(this->sim, this, &CConnectionReceiver::TimerFeedback); 00035 00036 // Receive function : set to first 00037 this->recvData = &CConnectionReceiver::RecvDataFirst; 00038 00039 // Loss 00040 this->lossEventFirst = 0; 00041 this->lossEventLast = 0; 00042 this->lossEventCount = 0; 00043 00044 // Receive rate 00045 this->recvSize = 0; 00046 00047 // Feedback 00048 this->timeFeedbackLast = 0; 00049 } 00050 00051 CConnectionReceiver::~CConnectionReceiver() 00052 { 00053 // Timers 00054 delete this->timerFeedback; 00055 } 00056 00057 void CConnectionReceiver::Recv(CPacketConnection *packet) 00058 { 00059 // General processing for all received packets (without considering the connection state) 00060 } 00061 00062 void CConnectionReceiver::RecvData(CPacketConnectionData* packet) 00063 { 00064 // Check the packet source and destination 00065 assert(packet->Dst() == this->id); 00066 assert(packet->DstEntry() == this->idEntry); 00067 assert(packet->Flow() == this->flow); 00068 00069 // Check the connection state : must be OPENED or greater 00070 if(this->state < OPENED) return; 00071 00072 #ifdef LOG_FLOW 00073 CConsole::SetColor(CConsole::LIGHT_GREEN); 00074 printf("\n\tRECV T = %8.3lf RecvData() : %s %s @ %u", this->sim->Time(), this->ToString(), packet->Payload()->ToString(), packet->Sequence()); 00075 CConsole::SetColor(CConsole::LIGHT_GRAY); 00076 #endif 00077 00078 // Global packet processing 00079 this->flowRtt = packet->Rtt(); // Round-trip-time 00080 00081 assert(this->flowRtt > 0); 00082 00083 // Process packet 00084 (this->*this->recvData)(packet); 00085 00086 // Send payload to the delegate 00087 (*this->delegateRecv)(this, packet->Payload()); 00088 } 00089 00090 void CConnectionReceiver::RecvDataFirst(CPacketConnectionData* packet) 00091 { 00092 // Send a feedback packet 00093 CPacketConnectionFeedback* reply = new CPacketConnectionFeedback( 00094 this->flow, 00095 this->id, 00096 this->remoteId, 00097 this->idEntry, 00098 this->remoteIdEntry, 00099 this->sim->Time(), 00100 packet->TimeTx(), 00101 0, // Time delay 00102 this->flowRateRecv, 00103 this->flowLossRate, 00104 1, // Bitmap of acknowledged packets ( **** 0001) 00105 packet->Sequence(), // SEQ of first acknowledged packet 00106 0, // Bitmap of loss packets (N/A) 00107 0 // SEQ of first loss packet 00108 ); 00109 00110 (*this->delegateSend)(this->port, this->remotePort, this->remoteAddress, 128, reply); 00111 00112 #ifdef LOG_FLOW 00113 CConsole::SetColor(CConsole::LIGHT_RED); 00114 printf("\n\tRECV T = %8.3lf SendFeedback() : %s", this->sim->Time(), this->ToString()); 00115 CConsole::SetColor(CConsole::LIGHT_GRAY); 00116 printf("\n\t\tACK : %u", packet->Sequence()); 00117 #endif 00118 00119 // Do not add the received packet info to packet history - the packet has already been acknowledged 00120 00121 // Set the maximum sequence number and time to this packet (as first packet) 00122 this->flowSequenceLast = packet->Sequence(); 00123 this->flowTimeLast = this->sim->Time(); 00124 00125 // Add the payload size to the received size 00126 this->recvSize += (packet->Payload())?packet->Payload()->Size():0; 00127 00128 // Set the feedback timer 00129 this->timerFeedback->SetAfter(this->flowRtt); 00130 00131 // Receive function : set to next 00132 this->recvData = &CConnectionReceiver::RecvDataNext; 00133 } 00134 00135 void CConnectionReceiver::RecvDataNext(CPacketConnectionData* packet) 00136 { 00137 /* 00138 * Step 0 00139 */ 00140 // Record the Tx and Rx time for last received packet 00141 this->timeRxLast = packet->TimeTx(); 00142 this->timeTxLast = this->sim->Time(); 00143 00144 // Add the payload size to the received size 00145 this->recvSize += (packet->Payload())?packet->Payload()->Size():0; 00146 00147 /* 00148 * Step 1 : Add the received packet info to acknowledgement history : packet SEQ and received time 00149 */ 00150 this->historyRecv.insert(pair<__uint32, __time>(packet->Sequence(), this->sim->Time())); 00151 00152 /* 00153 * Step 2 : Check if packet indicates a new loss event 00154 */ 00155 00156 bool newLossEvent = false; 00157 00158 // Calculate whether there are missing packets between this packet and the maximum ID number 00159 for(__uint32 seq = this->flowSequenceLast + 1; seq < packet->Sequence(); seq++) 00160 { 00161 // The packets with this SEQ is missing : add the missing SEQ at the end of the loss history 00162 this->historyLoss.push(seq); 00163 00164 // Calculate the interpolated time of this lost packet 00165 __time lossTime = this->flowTimeLast + (this->sim->Time() - this->flowTimeLast) * (seq - this->flowSequenceLast) / (packet->Sequence() - this->flowSequenceLast); 00166 00167 // If there exists a last loss event 00168 if(this->lossEventCount > 0) 00169 { 00170 // Check if the packet belongs to the last loss event 00171 00172 // If the loss time is not within an RTT from the last loss event 00173 if(this->lossEvents[this->lossEventLast].firstLossTime + this->flowRtt < lossTime) 00174 { 00175 // Add lost packet belongs to a new loss event 00176 00177 // Increment the last loss event index 00178 this->lossEventLast = (++this->lossEventLast) % MAX_LOSS_EVENTS; 00179 00180 // If the maximum number of loss events has been reached 00181 if(this->lossEventFirst == this->lossEventLast) 00182 { 00183 // Overwrite the first loss event 00184 assert(MAX_LOSS_EVENTS == this->lossEventCount); 00185 this->lossEventFirst = (++this->lossEventFirst) % MAX_LOSS_EVENTS; 00186 } 00187 else 00188 // Increment the loss event count 00189 this->lossEventCount++; 00190 00191 // Add the time and sequence number to the last loss event 00192 this->lossEvents[this->lossEventLast].firstLossSequence = seq; // The sequence of the lost packet 00193 this->lossEvents[this->lossEventLast].firstLossTime = lossTime; // The interpolated loss time 00194 00195 // Set there has been a new loss event 00196 newLossEvent = true; 00197 } 00198 // Else the lost packet belongs to the last loss event 00199 } 00200 else 00201 { 00202 // Create the first loss event 00203 assert(this->lossEventFirst == this->lossEventLast); 00204 00205 // Add the time and sequence number to the last loss event 00206 this->lossEvents[this->lossEventLast].firstLossSequence = seq; 00207 this->lossEvents[this->lossEventLast].firstLossTime = lossTime; 00208 00209 // Increment the loss event count 00210 this->lossEventCount++; 00211 00212 // Set there has been a new loss event 00213 newLossEvent = true; 00214 } 00215 } 00216 00217 // Set the maximum sequence number and time to this packet (as last packet) 00218 this->flowSequenceLast = packet->Sequence(); 00219 this->flowTimeLast = this->sim->Time(); 00220 00221 /* 00222 * Step 3 : Calculate loss rate (p) 00223 */ 00224 00225 // If there has been a new loss event 00226 if(newLossEvent) 00227 { 00228 assert(this->lossEventCount); 00229 00230 // Set the last packet for which the loss rate is calculated 00231 this->lossSequenceLast = packet->Sequence(); 00232 00233 double lossInterval; 00234 00235 // Reset previous lost intervals 00236 this->lossPrevInterval0 = 0; 00237 this->lossPrevInterval1 = 0; 00238 this->lossPrevWeigth = 0; 00239 00240 // Calculate the previous loss intervals is the diff beween the the loss events 00241 for(__byte index = 1; index < this->lossEventCount; index++) 00242 { 00243 __byte begin = ((this->lossEventLast + MAX_LOSS_EVENTS) - index) % MAX_LOSS_EVENTS; 00244 __byte end = ((this->lossEventLast + MAX_LOSS_EVENTS) - index + 1) % MAX_LOSS_EVENTS; 00245 00246 lossInterval = this->lossEvents[end].firstLossSequence - this->lossEvents[begin].firstLossSequence; 00247 00248 this->lossPrevInterval0 += lossInterval * this->lossIntervalWeigth[index]; 00249 this->lossPrevInterval1 += lossInterval * this->lossIntervalWeigth[index-1]; 00250 this->lossPrevWeigth += this->lossIntervalWeigth[index]; 00251 } 00252 00253 // Calculate the current loss interva 00254 double lossInterval0 = (this->flowSequenceLast - this->lossEvents[this->lossEventLast].firstLossSequence + 1) * this->lossIntervalWeigth[0] + this->lossPrevInterval0; 00255 double lossWeight = this->lossIntervalWeigth[0] + this->lossPrevWeigth; 00256 lossInterval = MAX(lossInterval0, this->lossPrevInterval1); 00257 00258 // Calculate the average loss interval 00259 double rateLoss = lossWeight / lossInterval; 00260 00261 // If current loss rate is greater than previous loss rate (early loss detection) 00262 if(rateLoss > this->flowLossRate) 00263 { 00264 // Set the loss rate 00265 this->flowLossRate = rateLoss; 00266 00267 // Expire the feedback timer 00268 assert(this->timerFeedback->IsSet()); 00269 this->timerFeedback->Cancel(); 00270 00271 // Send feedback 00272 this->TimerFeedback(NULL); 00273 } 00274 else 00275 { 00276 // Set the loss rate 00277 this->flowLossRate = rateLoss; 00278 } 00279 } 00280 // Else, do nothing 00281 00282 00283 // Check the feedback timer is set 00284 assert(this->timerFeedback->IsSet()); 00285 } 00286 00287 void CConnectionReceiver::TimerFeedback(CTimerInfo* info) 00288 { 00289 // The feedback timer expired 00290 00291 // Only if at least one packet has been received since the last feedback 00292 00293 if(!this->historyRecv.empty()) 00294 { 00295 #ifdef LOG_FLOW 00296 CConsole::SetColor(CConsole::LIGHT_RED); 00297 printf("\n\tRECV T = %8.3lf TimerFeedback() : %s (RTT=%.3lf)", this->sim->Time(), this->ToString(), this->flowRtt); 00298 CConsole::SetColor(CConsole::LIGHT_GRAY); 00299 #endif 00300 /* 00301 * Step 1 : Calculate the loss rate 00302 */ 00303 00304 // If the last sequence number is different from the one for which the loss was previously calculated; 00305 // and there are loss events 00306 if((this->flowSequenceLast != this->lossSequenceLast) && (this->lossEventCount)) 00307 { 00308 // Use the partial data calculated at the last loss event 00309 00310 double lossInterval0 = (this->flowSequenceLast - this->lossEvents[this->lossEventLast].firstLossSequence + 1) * this->lossIntervalWeigth[0]; 00311 double lossWeigth = this->lossIntervalWeigth[0] + this->lossPrevWeigth; 00312 double lossInterval = MAX(lossInterval0, this->lossPrevInterval1); 00313 00314 this->flowLossRate = lossWeigth / lossInterval; 00315 } 00316 00317 /* 00318 * Step 2 : Calculate the received rate since the last expiration of the feedback timer 00319 */ 00320 this->flowRateRecv = this->recvSize / (this->sim->Time() - this->timeFeedbackLast); 00321 00322 // Reset the received size 00323 this->recvSize = 0; 00324 00325 /* 00326 * Step 3 : Calculate the packets to acknowledge or report as loss 00327 */ 00328 00329 // Acknowledge ALL packets in the received history 00330 __uint32 ackSeq = 0; 00331 __uint64 ack = 0; 00332 00333 if(this->historyRecv.size() > 0) // If there are packets to acknowledge 00334 { 00335 #ifdef LOG_FLOW 00336 printf("\n\t\tACK : "); 00337 #endif 00338 // Select the first packet 00339 THistoryRecv::iterator iter = this->historyRecv.begin(); 00340 ackSeq = iter->first; 00341 00342 // Select the next packets 00343 for(; iter != this->historyRecv.end(); iter++) 00344 { 00345 // Calculate the bitmap index as the difference between the current ID and first ID 00346 __byte bit = iter->first - ackSeq; 00347 00348 // Current implementation only alows acknowledgment of a 64 range of packets at a time 00349 assert(bit < 64); 00350 00351 // Set the corresponing bit to 1 00352 ack |= (((__uint64)1) << bit); 00353 00354 #ifdef LOG_FLOW 00355 printf("%u ", iter->first); 00356 #endif 00357 } 00358 00359 // Clear the received history : all packets have been acknowledged 00360 this->historyRecv.clear(); 00361 } 00362 00363 // Report packets in the loss history with 00364 __uint32 lossSeq = 0; 00365 __uint64 loss = 0; 00366 00367 if(this->historyLoss.size() > 0) // If there are packets loss 00368 { 00369 #ifdef LOG_FLOW 00370 printf("\n\t\tLOST : "); 00371 #endif 00372 00373 // Select the first packet 00374 lossSeq = this->historyLoss.front(); 00375 00376 // Select the next packets 00377 while(!this->historyLoss.empty()) 00378 { 00379 // Get the front SEQ 00380 __uint32 seq = this->historyLoss.front(); 00381 00382 // Remove the front SEQ 00383 this->historyLoss.pop(); 00384 00385 // Calculate the bitmap index as the difference between the current ID and the first ID 00386 __byte bit = seq - lossSeq; 00387 00388 // Current implementation only alows loss reporting of a 64 range of packets at a time 00389 assert(bit < 64); 00390 00391 // Set the corresponding bit to 1 00392 loss |= (((__uint64)1) << bit); 00393 00394 #ifdef LOG_FLOW 00395 printf("%u ", seq); 00396 #endif 00397 } 00398 } 00399 00400 // Send a feedback packet 00401 CPacketConnectionFeedback* reply = new CPacketConnectionFeedback( 00402 this->flow, 00403 this->id, 00404 this->remoteId, 00405 this->idEntry, 00406 this->remoteIdEntry, 00407 this->timeTxLast, 00408 this->timeRxLast, 00409 this->sim->Time() - this->timeTxLast, // Time delay : current time minus last Tx time 00410 this->flowRateRecv, 00411 this->flowLossRate, 00412 ack, // Bitmap of acknowledged packets ( **** 0001) 00413 ackSeq, // SEQ of first acknowledged packet 00414 loss, // Bitmap of loss packets (N/A) 00415 lossSeq // SEQ of first loss packet 00416 ); 00417 (*this->delegateSend)(this->port, this->remotePort, this->remoteAddress, 128, reply); 00418 } 00419 else 00420 { 00421 // Do nothing 00422 #ifdef LOG_FLOW 00423 CConsole::SetColor(CConsole::LIGHT_YELLOW); 00424 printf("\n\tRECV T = %8.3lf TimerFeedback() : %s (RTT=%.3lf)", this->sim->Time(), this->ToString(), this->flowRtt); 00425 CConsole::SetColor(CConsole::LIGHT_GRAY); 00426 #endif 00427 } 00428 00429 // Set the current time of the expiration of the feedback timer 00430 this->timeFeedbackLast = this->sim->Time(); 00431 00432 // Set the feedback timer after one RTT 00433 this->timerFeedback->SetAfter(this->flowRtt); 00434 } 00435 00436 00437 char* CConnectionReceiver::ToString() const 00438 { 00439 sprintf((char*)this->str, "[ l=*:%u:%u:%u r=%u:%u:%u:%u t=%s s=%s recv=%u loss=%u ]", 00440 this->port, this->id, this->idEntry, ((CAddress)this->remoteAddress).Address(), this->remotePort, this->remoteId, this->remoteIdEntry, 00441 CConnection::strConnectionType[this->type], CConnection::strState[this->state], 00442 this->historyRecv.size(), this->historyLoss.size() 00443 ); 00444 00445 return (char*)this->str; 00446 }
Last updated: February 8, 2011