Alex Bikfalvi
SimStream Documentation
StreamClientPull.cpp
00001 #include "Headers.h" 00002 #include "StreamClientPull.h" 00003 #include "Rand.h" 00004 #include "Debug.h" 00005 #include "Console.h" 00006 #include "PacketStreamFrame.h" 00007 00008 CStreamClientPull::CStreamClientPull( 00009 CSimHandler* sim, 00010 CInfoPull* info, 00011 CAddress address, 00012 IDelegate5<void, __uint16, __uint16, CAddress, __byte, CPacket*>* delegateSend, 00013 IDelegate2<void, CAddress, __uint32>* delegateIgmpJoin, 00014 IDelegate1<void, CAddress>* delegateIgmpLeave, 00015 IDelegate0<CLink*>* delegateLink, 00016 __uint32 entry 00017 ) : CStreamClient(sim) 00018 { 00019 assert(info); 00020 00021 assert(delegateSend); 00022 assert(delegateIgmpJoin); 00023 assert(delegateIgmpLeave); 00024 assert(delegateLink); 00025 00026 // Set the initial state 00027 this->stateClient = STOPPED; 00028 this->stateRegistration = NOT_REGISTERED; 00029 this->stateSchedule = SCHEDULE_IDLE; 00030 00031 // Set simulator 00032 this->info = info; 00033 this->address = address; 00034 00035 // Set the channel 00036 this->channel = NULL; 00037 00038 // Create the buffers 00039 this->bufferMcast = new CStreamBuffer( 00040 this->info->StreamBufferMcastSize(), 00041 this->info->StreamBufferMcastSizeHistory() 00042 ); 00043 this->bufferUcast = new CStreamBufferSegment( 00044 this->info->StreamBufferUcastSize(), 00045 this->info->StreamBufferUcastSizeHistory(), 00046 this->info->StreamSegmentSize() 00047 ); 00048 00049 // Multicast session 00050 this->entry = entry; 00051 00052 // Create the timers 00053 this->timerPlayMcast = new CTimer<CStreamClientPull>( 00054 this->sim, 00055 this, 00056 &CStreamClientPull::TimerPlayMcast 00057 ); 00058 this->timerPlayUcast = new CTimer<CStreamClientPull>( 00059 this->sim, 00060 this, 00061 &CStreamClientPull::TimerPlayUcast 00062 ); 00063 this->timerSchedule = new CTimer<CStreamClientPull>( 00064 this->sim, 00065 this, 00066 &CStreamClientPull::TimerSchedule 00067 ); 00068 this->timerRegister = new CTimer<CStreamClientPull>( 00069 this->sim, 00070 this, 00071 &CStreamClientPull::TimerRegister 00072 ); 00073 00074 // Set the lower layer delegates 00075 this->delegateIgmpJoin = delegateIgmpJoin; 00076 this->delegateIgmpLeave = delegateIgmpLeave; 00077 this->delegateSend = delegateSend; 00078 this->delegateLink = delegateLink; 00079 00080 // Delegates 00081 this->delegateRecvFrameMcast = new Delegate2<CStreamClientPull, void, CAddress, CStreamFrame>(this, &CStreamClientPull::RecvFrameMcast); 00082 this->delegateConnectionRecv = new Delegate2<CStreamClientPull, void, CConnectionReceiver*, CPacket*>(this, &CStreamClientPull::ConnectionRecv); 00083 this->delegateConnectionAccept = new Delegate2<CStreamClientPull, bool, CAddress, CPacket*>(this, &CStreamClientPull::ConnectionAccept); 00084 this->delegateConnectionAccepted = new Delegate1<CStreamClientPull, void, CConnectionSender*>(this, &CStreamClientPull::ConnectionAccepted); 00085 this->delegateConnectionReceiverOpen = new Delegate2<CStreamClientPull, void, CConnection*, CConnection::EOpenResult>(this, &CStreamClientPull::ConnectionReceiverOpen); 00086 this->delegateConnectionReceiverClose = new Delegate2<CStreamClientPull, void, CConnection*, CConnection::ECloseResult>(this, &CStreamClientPull::ConnectionReceiverClose); 00087 this->delegateConnectionSenderOpen = new Delegate2<CStreamClientPull, void, CConnection*, CConnection::EOpenResult>(this, &CStreamClientPull::ConnectionSenderOpen); 00088 this->delegateConnectionSenderClose = new Delegate2<CStreamClientPull, void, CConnection*, CConnection::ECloseResult>(this, &CStreamClientPull::ConnectionSenderClose); 00089 00090 // Decoder 00091 this->decoderMcast = new CStreamDecoderFrame(this->delegateRecvFrameMcast); 00092 00093 // Connections 00094 this->connectionLayer = new CConnectionLayer( 00095 this->sim, 00096 this->info->PortConnection(), 00097 this->info->ClientConnectionsMax(), 00098 delegateSend, 00099 this->delegateConnectionRecv, 00100 this->delegateConnectionAccept, 00101 this->delegateConnectionAccepted, 00102 this->info->ConnectionSegmentSize()); 00103 00104 // The deadline delta : the approximate buffering time 00105 this->scheduleDeadlineDelta = this->info->StreamFrameInterval() * this->info->StreamSegmentSize() * this->info->StreamBufferUcastSizeBuffering(); 00106 } 00107 00108 CStreamClientPull::~CStreamClientPull() 00109 { 00110 // Delete delegates 00111 delete this->delegateRecvFrameMcast; 00112 00113 delete this->delegateConnectionRecv; 00114 delete this->delegateConnectionAccept; 00115 delete this->delegateConnectionAccepted; 00116 00117 delete this->delegateConnectionReceiverOpen; 00118 delete this->delegateConnectionReceiverClose; 00119 delete this->delegateConnectionSenderOpen; 00120 delete this->delegateConnectionSenderClose; 00121 00122 // Delete coders 00123 delete this->decoderMcast; 00124 00125 // Delete the buffers 00126 delete this->bufferMcast; 00127 delete this->bufferUcast; 00128 00129 // Delete the timers 00130 delete this->timerPlayMcast; 00131 delete this->timerPlayUcast; 00132 delete this->timerSchedule; 00133 delete this->timerRegister; 00134 00135 // Delete connections 00136 delete this->connectionLayer; 00137 00138 // Delete senders and receivers 00139 while(!this->senders.empty()) delete this->senders.begin()->second; 00140 while(!this->sendersConnecting.empty()) delete this->sendersConnecting.begin()->second; 00141 while(!this->receivers.empty()) delete this->receivers.begin()->second; 00142 } 00143 00144 void CStreamClientPull::Start(CChannel* channel) 00145 { 00146 assert(channel); 00147 00148 // Check the state 00149 assert(STOPPED == this->stateClient); 00150 00151 // Check the timers are stopped 00152 assert(!this->timerPlayMcast->IsSet()); 00153 assert(!this->timerPlayUcast->IsSet()); 00154 assert(!this->timerSchedule->IsSet()); 00155 assert(!this->timerRegister->IsSet()); 00156 00157 // Set the channel 00158 this->channel = channel; 00159 00160 // Log 00161 #ifdef LOG 00162 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00163 printf("\n\tT = %7.3lf PEER %3u START %3u (%u)", this->sim->Time(), this->address.Address(), this->channel->Id(), this->channel->Type()); 00164 #endif 00165 00166 // Reset statistics 00167 this->statRecvFrames = 0; 00168 this->statDiscardedFrames = 0; 00169 this->statPlayFrames = 0; 00170 this->statSuccessFrames[0] = 0; 00171 this->statSuccessFrames[1] = 0; 00172 this->statSuccessFrames[2] = 0; 00173 this->statFailFrames[0] = 0; 00174 this->statFailFrames[1] = 0; 00175 this->statFailFrames[2] = 0; 00176 00177 this->statTimeClientStart = this->sim->Time(); 00178 this->statTimeRecvStart = -1; 00179 this->statTimePlayStart = -1; 00180 this->statTimeWait = 0; 00181 00182 this->statPlayFirstFrame = 0; 00183 this->statPlayLastFrame = 0; 00184 00185 this->statSyncDelaySum = 0; 00186 this->statSyncDelayCount = 0; 00187 00188 // Check the channel type 00189 switch(this->channel->Type()) 00190 { 00191 case CChannel::CHANNEL_MULTICAST: this->StartMcast(); break; 00192 case CChannel::CHANNEL_UNICAST: this->StartUcast(); break; 00193 } 00194 } 00195 00196 void CStreamClientPull::StartMcast() 00197 { 00198 // Start session with phase 1 00199 this->processFrame = &CStreamClientPull::ProcessFrameMcastFirst; 00200 00201 // Reset the decoder 00202 this->decoderMcast->Reset(this->channel->Id()); 00203 00204 // Join the multicast group 00205 (*this->delegateIgmpJoin)(this->channel->Address(), this->entry); 00206 00207 // Change the state 00208 this->stateClient = MCAST_STREAM_FIRST; 00209 } 00210 00211 void CStreamClientPull::StartUcast() 00212 { 00213 // Check the registration state (client must not be registered) 00214 assert_client(NOT_REGISTERED == this->stateRegistration); 00215 assert_client(SCHEDULE_IDLE == this->stateSchedule); 00216 00217 // Check the neighbor list is empty 00218 assert_client(this->neighbors.empty()); 00219 00220 // Check the connecting senders list is empty 00221 assert_client(this->sendersConnecting.empty()); 00222 00223 // Check the senders list is empty 00224 assert_client(this->senders.empty()); 00225 00226 // Check the receivers list is empty 00227 assert_client(this->receivers.empty()); 00228 00229 // Check the requests 00230 assert_client(this->requestBitmaps.empty()); 00231 assert_client(this->requestSegments.empty()); 00232 00233 // Change the client state 00234 this->stateClient = UCAST_STREAM_FIRST; 00235 00236 // Set processing function 00237 this->processFrame = &CStreamClientPull::ProcessFrameUcastFirst; 00238 00239 // Reset the buffer to the initial segment 00240 __uint32 initialSegment = this->info->StreamSource(this->channel->Id())->Synchronize(this->sim->Time()) / this->info->StreamSegmentSize(); 00241 this->bufferUcast->Reset(this->channel->Id(), initialSegment); 00242 00243 #ifdef LOG 00244 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00245 printf("\n\tT = %7.3lf PEER %3u RESET BUFFER to ch=%u seg=%u", this->sim->Time(), this->address.Address(), this->channel->Id(), initialSegment); 00246 #endif 00247 00248 // Start the schedule 00249 this->TimerSchedule(NULL); 00250 } 00251 00252 void CStreamClientPull::Stop() 00253 { 00254 // Check the state 00255 assert_client(STOPPED != this->stateClient); 00256 00257 // Log 00258 #ifdef LOG 00259 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00260 printf("\n\tT = %7.3lf PEER %3u STOP %3u", this->sim->Time(), this->address.Address(), this->channel->Id()); 00261 #endif 00262 00263 // Stop client 00264 switch(this->channel->Type()) 00265 { 00266 case CChannel::CHANNEL_MULTICAST: this->StopMcast(); break; 00267 case CChannel::CHANNEL_UNICAST: this->StopUcast(); break; 00268 } 00269 00270 // If there are active timer, cancel 00271 if(this->timerPlayMcast->IsSet()) this->timerPlayMcast->Cancel(); 00272 if(this->timerPlayUcast->IsSet()) this->timerPlayUcast->Cancel(); 00273 if(this->timerSchedule->IsSet()) this->timerSchedule->Cancel(); 00274 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 00275 00276 // Finalize statistics 00277 this->statTimeFinish = this->sim->Time(); 00278 00279 // Set channel to null 00280 this->channel = NULL; 00281 00282 // Change the state 00283 this->stateClient = STOPPED; 00284 } 00285 00286 void CStreamClientPull::StopMcast() 00287 { 00288 // Leave the multicast group 00289 (*this->delegateIgmpLeave)(this->channel->Address()); 00290 00291 // If the session is waiting, add the waiting time 00292 if(MCAST_STREAM_WAIT == this->stateClient) this->statTimeWait += (this->sim->Time() - this->sessionTimeLastStop); 00293 } 00294 00295 void CStreamClientPull::StopUcast() 00296 { 00297 // Close all receivers 00298 while(!this->receivers.empty()) 00299 { 00300 // Get the first receiver 00301 CStreamPullReceiver::TReceiverList::iterator receiver = this->receivers.begin(); 00302 00303 // Close the connection 00304 receiver->second->Connection()->Close(); 00305 00306 // Delete the receiver (this will remove the connection tag and the receiver from the list) 00307 if(this->receivers.begin() == receiver) delete this->receivers.begin()->second; 00308 } 00309 00310 // Close all senders 00311 while(!this->senders.empty()) 00312 { 00313 // Get the first sender 00314 CStreamPullSender::TSenderList::iterator sender = this->senders.begin(); 00315 00316 // Close the connection 00317 sender->second->Connection()->Close(); 00318 00319 // Delete the sender (this will remove the connection tag and the sender from the list) 00320 if(this->senders.begin() == sender) delete sender->second; 00321 } 00322 00323 // Clear connecting senders 00324 while(!this->sendersConnecting.empty()) 00325 { 00326 // Get the first sender 00327 CStreamPullSender::TSenderList::iterator sender = this->sendersConnecting.begin(); 00328 00329 // Close the connection 00330 sender->second->Connection()->Close(); 00331 00332 // Delete the connecting sender (this will remove the connection tag and the sender from the list) 00333 if(this->sendersConnecting.begin() == sender) delete sender->second; 00334 } 00335 00336 // Clear neighbors 00337 this->neighbors.clear(); 00338 00339 // Clear requests 00340 this->requestBitmaps.clear(); 00341 this->requestSegments.clear(); 00342 00343 // Deregister 00344 if(REGISTERED == this->stateRegistration) this->UcastDeregister(); 00345 00346 // Change the state 00347 this->stateSchedule = SCHEDULE_IDLE; 00348 } 00349 00350 void CStreamClientPull::Recv(CAddress srcAddress, CAddress dstAddress, __uint16 srcPort, __uint16 dstPort, CPacket* packet) 00351 { 00352 // Process received packets 00353 if(NULL == packet) return; 00354 if((dstAddress != this->address) && (!dstAddress.IsMulticast())) return; 00355 00356 // Process packet depending on UDP destination 00357 if(this->info->PortStream() == dstPort) this->RecvStream(srcAddress, type_cast<CPacketStream*>(packet)); 00358 if(this->info->PortControl() == dstPort) this->RecvMessage(srcAddress, type_cast<CStreamMessage*>(packet)); 00359 if(this->info->PortConnection() == dstPort) this->RecvConnection(srcAddress, srcPort, dstPort, type_cast<CPacketConnection*>(packet)); 00360 } 00361 00362 void CStreamClientPull::RecvStream(CAddress src, CPacketStream* packet) 00363 { 00364 #ifdef LOG 00365 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00366 { 00367 CConsole::SetColor(CConsole::DARK_GRAY); 00368 printf("\n\tT = %7.3lf RECV STREAM %u --- %u : ", this->sim->Time(), packet->Stream(), src.Address()); 00369 CConsole::SetColor(CConsole::LIGHT_GRAY); 00370 } 00371 #endif 00372 00373 // Send received stream packets to the decoder 00374 CStreamDecoder::EResult result = this->decoderMcast->Decode(src, packet); 00375 00376 #ifdef LOG 00377 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00378 { 00379 CConsole::SetColor(CConsole::LIGHT_RED); 00380 switch(result) 00381 { 00382 case CStreamDecoder::FAIL_PACKET_TYPE: printf("FAIL PACKET TYPE"); break; 00383 case CStreamDecoder::FAIL_STREAM: printf("FAIL STREAM"); break; 00384 } 00385 CConsole::SetColor(CConsole::LIGHT_GRAY); 00386 } 00387 #endif 00388 } 00389 00390 void CStreamClientPull::RecvMessage(CAddress src, CStreamMessage* message) 00391 { 00392 // Process messages 00393 00394 switch(message->MessageType()) 00395 { 00396 case CStreamMessage::STREAM_MESSAGE_BOOT_PULL_RESPONSE: this->RecvMessageBootResponse(src, type_cast<CStreamMessageBootPullResponse*>(message)); break; 00397 case CStreamMessage::STREAM_MESSAGE_PULL_BITMAP_REQUEST: this->RecvMessageBitmapRequest(src, type_cast<CStreamMessagePullBitmapRequest*>(message)); break; 00398 case CStreamMessage::STREAM_MESSAGE_PULL_BITMAP_RESPONSE: this->RecvMessageBitmapResponse(src, type_cast<CStreamMessagePullBitmapResponse*>(message)); break; 00399 case CStreamMessage::STREAM_MESSAGE_PULL_SEGMENT_REQUEST: this->RecvMessageSegmentRequest(src, type_cast<CStreamMessagePullSegmentRequest*>(message)); break; 00400 case CStreamMessage::STREAM_MESSAGE_PULL_SEGMENT_RESPONSE: this->RecvMessageSegmentResponse(src, type_cast<CStreamMessagePullSegmentResponse*>(message)); break; 00401 default: assert_client(0); /* do nothing */ 00402 } 00403 } 00404 00405 void CStreamClientPull::RecvConnection(CAddress srcAddress, __uint16 srcPort, __uint16 dstPort, CPacketConnection* packet) 00406 { 00407 // Pass the packet to the connection layer 00408 this->connectionLayer->Recv(srcAddress, srcPort, dstPort, packet); 00409 } 00410 00411 void CStreamClientPull::RecvFrameMcast(CAddress src, CStreamFrame frame) 00412 { 00413 #ifdef LOG 00414 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00415 { 00416 CConsole::SetColor(CConsole::LIGHT_CYAN); 00417 printf("\n\tT = %7.3lf RECV FRAME (M) %u", this->sim->Time(), frame.Index()); 00418 CConsole::SetColor(CConsole::LIGHT_GRAY); 00419 } 00420 #endif 00421 00422 // Check the state 00423 if(this->stateClient < MCAST_STREAM_FIRST) 00424 { 00425 this->statDiscardedFrames = 0; 00426 return; 00427 } 00428 00429 // Check the decoder passed a correct frame 00430 if(frame.Stream() != this->channel->Id()) 00431 { 00432 this->statDiscardedFrames = 0; 00433 return; 00434 } 00435 00436 // Process frame 00437 (this->*this->processFrame)(frame); 00438 00439 // Statistics 00440 this->statRecvFrames++; 00441 } 00442 00443 void CStreamClientPull::ProcessFrameMcastFirst(CStreamFrame frame) 00444 { 00445 // PLAYBACK Phase 1 : Process first frame to initialize playback buffer 00446 00447 #ifdef LOG 00448 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00449 printf("\t\t[MCAST FIRST]"); 00450 #endif 00451 00452 // Check the state 00453 assert_client(MCAST_STREAM_FIRST == this->stateClient); 00454 00455 // Reset the buffer 00456 this->bufferMcast->Reset(frame.Index()); 00457 00458 // Add the frame to the buffer 00459 this->bufferMcast->Add(frame); 00460 00461 // Statistics 00462 this->statTimeRecvStart = this->sim->Time(); 00463 00464 // Switch to process buffering frames (phase 2) 00465 this->processFrame = &CStreamClientPull::ProcessFrameMcastBuffering; 00466 00467 // Change the state 00468 this->stateClient = MCAST_STREAM_BUFFERING; 00469 } 00470 00471 void CStreamClientPull::ProcessFrameMcastBuffering(CStreamFrame frame) 00472 { 00473 // PLAYBACK Phase 2 : Buffering phase (playback is not started and frames are saved in the buffer) 00474 00475 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00476 // buffer, plus the buffering size 00477 00478 #ifdef LOG 00479 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00480 printf("\t\t[MCAST BUFFERING]"); 00481 #endif 00482 00483 // Check the state 00484 assert_client(MCAST_STREAM_BUFFERING == this->stateClient); 00485 00486 // Add the frame to the buffer 00487 this->bufferMcast->Add(frame); 00488 00489 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00490 if(frame.Index() >= this->bufferMcast->CurrentIndex() + this->info->StreamBufferMcastSizeBuffering()) 00491 { 00492 // Find the first independent frame 00493 for(__uint32 count = 0; count < this->bufferMcast->Count(); count++) 00494 { 00495 if(this->bufferMcast->IsCurrentIndependent()) 00496 { 00497 // First frame found : start playback 00498 00499 // Start playback : save session and stat information 00500 this->sessionFrameLastStart = this->bufferMcast->CurrentIndex(); 00501 this->sessionTimeLastStart = this->sim->Time(); 00502 00503 this->statPlayFirstFrame = this->bufferMcast->CurrentIndex(); 00504 this->statTimePlayStart = this->sim->Time(); 00505 00506 // Switch to process playback frames (phase 3) 00507 this->processFrame = &CStreamClientPull::ProcessFrameMcastPlay; 00508 00509 // Change the state 00510 this->stateClient = MCAST_STREAM_PLAY; 00511 00512 // Call the play timer 00513 this->TimerPlayMcast(NULL); 00514 00515 break; 00516 } 00517 else this->bufferMcast->Shift(); 00518 } 00519 } 00520 } 00521 00522 void CStreamClientPull::ProcessFrameMcastPlay(CStreamFrame frame) 00523 { 00524 #ifdef LOG 00525 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00526 printf("\t\t[MCAST PLAY]"); 00527 #endif 00528 00529 // PLAYBACK Phase 3 : Play phase 00530 assert_client(MCAST_STREAM_PLAY == this->stateClient); 00531 00532 // Add the frame to the buffer 00533 this->bufferMcast->Add(frame); 00534 } 00535 00536 void CStreamClientPull::ProcessFrameMcastWait(CStreamFrame frame) 00537 { 00538 // PLAYBACK Wait : Buffering phase after a buffer underrun (playback is not started and frames are saved in the buffer) 00539 00540 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00541 // buffer, plus the buffering size 00542 00543 #ifdef LOG 00544 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00545 printf("\t\t[MCAST WAIT]"); 00546 #endif 00547 00548 // Check the state 00549 assert_client(MCAST_STREAM_WAIT == this->stateClient); 00550 00551 // Add the frame to the buffer 00552 this->bufferMcast->Add(frame); 00553 00554 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00555 if(frame.Index() >= this->bufferMcast->CurrentIndex() + this->info->StreamBufferMcastSizeBuffering()) 00556 { 00557 // Find the first frame 00558 for(__uint32 count = 0; count < this->bufferMcast->Count(); count++) 00559 { 00560 if(this->bufferMcast->HasCurrentIndex()) 00561 { 00562 // Check the last stop 00563 assert_client(this->sim->Time() >= this->sessionTimeLastStop); 00564 00565 // Save statistics 00566 this->statTimeWait += this->sim->Time() - this->sessionTimeLastStop; 00567 00568 // Restart playback 00569 this->sessionFrameLastStart = this->bufferMcast->CurrentIndex(); 00570 this->sessionTimeLastStart = this->sim->Time(); 00571 00572 // Switch to process playback frames (phase 3) 00573 this->processFrame = &CStreamClientPull::ProcessFrameMcastPlay; 00574 00575 // Change the state 00576 this->stateClient = MCAST_STREAM_PLAY; 00577 00578 // Call the play timer 00579 this->TimerPlayMcast(NULL); 00580 00581 break; 00582 } 00583 else 00584 { 00585 // Increment the number of played frames 00586 this->statPlayFrames++; 00587 00588 // Playback sync statistics 00589 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferMcast->CurrentIndex()); 00590 this->statSyncDelayCount++; 00591 00592 // Mark the frame as failed 00593 assert_client(this->info->StreamSource(this->channel->Id())->FrameType(this->bufferMcast->CurrentIndex()) < 3); 00594 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferMcast->CurrentIndex())]++; 00595 00596 // Shift the buffer 00597 this->bufferMcast->Shift(); 00598 } 00599 } 00600 } 00601 } 00602 00603 void CStreamClientPull::ProcessFrameUcastFirst(CStreamFrame frame) 00604 { 00605 // Process first frame 00606 00607 #ifdef LOG 00608 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00609 printf("\n\tADD %u (%u/%u) [UCAST FIRST]", frame.Index(), frame.SegmentIndex(), frame.Segment()); 00610 #endif 00611 00612 // Check the client state 00613 assert_client(UCAST_STREAM_FIRST == this->stateClient); 00614 00615 // Add the frame to the buffer 00616 this->bufferUcast->Add(frame); 00617 00618 // Statistics 00619 this->statTimeRecvStart = this->sim->Time(); 00620 00621 // Switch to process buffering frames (phase 2) 00622 this->processFrame = &CStreamClientPull::ProcessFrameUcastBuffering; 00623 00624 // Change the client state 00625 this->stateClient = UCAST_STREAM_BUFFERING; 00626 } 00627 00628 void CStreamClientPull::ProcessFrameUcastBuffering(CStreamFrame frame) 00629 { 00630 // PLAYBACK Phase 2 : Buffering phase (playback is not started and frames are saved in the buffer) 00631 00632 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00633 // buffer, plus the buffering size 00634 00635 // Check the state 00636 assert_client(UCAST_STREAM_BUFFERING == this->stateClient); 00637 00638 // Add the frame to the buffer 00639 this->bufferUcast->Add(frame); 00640 00641 #ifdef LOG 00642 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00643 printf("\n\tADD %u (%u/%u) [UCAST BUFFERING]\t(segment_index=%u frame_index=%u buffering_size=%u)", frame.Index(), frame.SegmentIndex(), frame.Segment(), this->bufferUcast->CurrentIndexSegment(), this->bufferUcast->CurrentIndexFrame(), this->info->StreamBufferUcastSizeBuffering()); 00644 #endif 00645 00646 // Check if the buffering is complete 00647 bool bufferingComplete = true; 00648 for(__uint32 index = 0; (index < this->info->StreamBufferUcastSizeBuffering()) && bufferingComplete; index++) 00649 { 00650 bufferingComplete = bufferingComplete && this->bufferUcast->IsSegmentComplete(this->bufferUcast->CurrentIndexSegment() + index); 00651 } 00652 00653 if(bufferingComplete) 00654 { 00655 #ifdef LOG 00656 CConsole::SetColor(CConsole::LIGHT_GREEN); 00657 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00658 printf("YES"); 00659 CConsole::SetColor(CConsole::LIGHT_GRAY); 00660 #endif 00661 00662 // Buffering is complete : start playback 00663 00664 this->sessionFrameLastStart = this->bufferUcast->CurrentIndexFrame(); 00665 this->sessionTimeLastStart = this->sim->Time(); 00666 00667 this->statPlayFirstFrame = this->bufferUcast->CurrentIndexFrame(); 00668 this->statTimePlayStart = this->sim->Time(); 00669 00670 // Switch to process playback frames (phase 3) 00671 this->processFrame = &CStreamClientPull::ProcessFrameUcastPlay; 00672 00673 // Change the state 00674 this->stateClient = UCAST_STREAM_PLAY; 00675 00676 // Call the play timer 00677 this->TimerPlayUcast(NULL); 00678 00679 // Set the registration timer 00680 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 00681 this->timerRegister->SetAfter(this->info->BootRegisterDelay()); 00682 } 00683 else 00684 { 00685 #ifdef LOG 00686 CConsole::SetColor(CConsole::LIGHT_RED); 00687 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00688 printf("NO"); 00689 CConsole::SetColor(CConsole::LIGHT_GRAY); 00690 #endif 00691 } 00692 } 00693 00694 void CStreamClientPull::ProcessFrameUcastPlay(CStreamFrame frame) 00695 { 00696 #ifdef LOG 00697 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00698 { 00699 printf("\t\t[UCAST PLAY]"); 00700 if(this->bufferUcast->IsSegmentComplete(this->bufferUcast->CurrentIndexSegment())) 00701 { 00702 CConsole::SetColor(CConsole::LIGHT_GREEN); 00703 printf("YES"); 00704 CConsole::SetColor(CConsole::LIGHT_GRAY); 00705 } 00706 else 00707 { 00708 CConsole::SetColor(CConsole::LIGHT_RED); 00709 printf("NO"); 00710 CConsole::SetColor(CConsole::LIGHT_GRAY); 00711 } 00712 } 00713 #endif 00714 00715 // PLAYBACK Phase 3 : Play phase 00716 assert_client(UCAST_STREAM_PLAY == this->stateClient); 00717 00718 // Add the frame to the buffer and if the frame is new send it to the receivers 00719 this->bufferUcast->Add(frame); 00720 } 00721 00722 void CStreamClientPull::TimerPlayMcast(CTimerInfo* info) 00723 { 00724 assert_client(MCAST_STREAM_PLAY == this->stateClient); 00725 00726 // Check if there is a buffer underrun 00727 if(this->bufferMcast->NumPlay() == 0) 00728 { 00729 // Change the state 00730 this->stateClient = MCAST_STREAM_WAIT; 00731 00732 // Pause playback 00733 this->sessionTimeLastStop = this->sim->Time(); 00734 00735 // Switch to process wait frames (phase 3) 00736 this->processFrame = &CStreamClientPull::ProcessFrameMcastWait; 00737 00738 return; 00739 } 00740 00741 // Play the first frame from the buffer 00742 this->statPlayFrames++; 00743 00744 // Playback sync statistics 00745 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferMcast->CurrentIndex()); 00746 this->statSyncDelayCount++; 00747 00748 if(this->bufferMcast->IsCurrentDecodable()) 00749 { 00750 this->statPlayLastFrame = this->bufferMcast->CurrentIndex(); 00751 assert_client(this->bufferMcast->CurrentFrame()->Type() < 3); 00752 this->statSuccessFrames[this->bufferMcast->CurrentFrame()->Type()]++; 00753 } 00754 else 00755 { 00756 assert_client(this->info->StreamSource(this->channel->Id())->FrameType(this->bufferMcast->CurrentIndex()) < 3); 00757 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferMcast->CurrentIndex())]++; 00758 } 00759 00760 // Shift the buffer 00761 this->bufferMcast->Shift(); 00762 00763 // Schedule next play event 00764 this->timerPlayMcast->SetAt(this->sessionTimeLastStart + ((__time)(this->bufferMcast->CurrentIndex() - this->sessionFrameLastStart)) / (__time)this->channel->Fps()); 00765 } 00766 00767 void CStreamClientPull::TimerPlayUcast(CTimerInfo* info) 00768 { 00769 assert_client(UCAST_STREAM_PLAY == this->stateClient); 00770 00771 // Play the first frame from the buffer 00772 this->statPlayFrames++; 00773 00774 // Playback sync statistics 00775 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferUcast->CurrentIndexFrame()); 00776 this->statSyncDelayCount++; 00777 00778 if(this->bufferUcast->IsCurrentFrameDecodable()) 00779 { 00780 assert_client(this->bufferUcast->HasCurrentFrame()); 00781 00782 assert_client(this->bufferUcast->CurrentIndexFrame() == this->bufferUcast->CurrentFrame().Index()); 00783 assert_client(this->bufferUcast->CurrentFrame().Type() < 3); 00784 00785 this->statPlayLastFrame = this->bufferUcast->CurrentIndexFrame(); 00786 this->statSuccessFrames[this->bufferUcast->CurrentFrame().Type()]++; 00787 00788 #ifdef LOG 00789 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00790 { 00791 CConsole::SetColor(CConsole::LIGHT_GREEN); 00792 printf("\n\t\tT = %7.3lf PLAY OKAY %6u\t%c", this->sim->Time(), this->bufferUcast->CurrentIndexFrame(), stringFrameType[this->bufferUcast->CurrentFrame().Type()]); 00793 CConsole::SetColor(CConsole::LIGHT_GRAY); 00794 printf("\t\t(time=%.3lf frame_time=%.3lf sync_delay=%.3lf)", this->sim->Time(), this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferUcast->CurrentIndexFrame()), 00795 this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferUcast->CurrentIndexFrame())); 00796 } 00797 #endif 00798 } 00799 else 00800 { 00801 assert_client(this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndexFrame()) < 3); 00802 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndexFrame())]++; 00803 00804 #ifdef LOG 00805 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00806 { 00807 CConsole::SetColor(CConsole::LIGHT_RED); 00808 printf("\n\t\tT = %7.3lf PLAY FAIL %6u\t%c : %s", this->sim->Time(), this->bufferUcast->CurrentIndexFrame(), stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndexFrame())], 00809 this->bufferUcast->HasCurrentFrame()?"DECODE_FAIL":"MISSING"); 00810 CConsole::SetColor(CConsole::LIGHT_GRAY); 00811 printf("\t\t(time=%.3lf frame_time=%.3lf sync_delay=%.3lf)", this->sim->Time(), this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferUcast->CurrentIndexFrame()), 00812 this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferUcast->CurrentIndexFrame())); 00813 } 00814 #endif 00815 } 00816 00817 // Shift the buffer 00818 this->bufferUcast->ShiftFrame(); 00819 00820 if(REGISTERED == this->stateRegistration) 00821 { 00822 // Limit peer utilization : if the uplink utilization exceeds the available minus a guard interval of 20 % 00823 CLink* link = (*this->delegateLink)(); 00824 00825 if((link->MeterUtil(0) > 0.8 * link->Bandwidth(0)) || (this->receivers.size() > 0.8*link->Bandwidth(0) / this->channel->Bw())) 00826 { 00827 // If the client is registered, deregister 00828 if(REGISTERED == this->stateRegistration) this->UcastDeregister(); 00829 00830 // If the registration timer is set, cancel the timer 00831 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 00832 } 00833 } 00834 00835 // Schedule next play event 00836 this->timerPlayUcast->SetAt(this->sessionTimeLastStart + ((__time)(this->bufferUcast->CurrentIndexFrame() - this->sessionFrameLastStart)) / (__time)this->channel->Fps()); 00837 } 00838 00839 void CStreamClientPull::TimerRegister(CTimerInfo* info) 00840 { 00841 this->UcastRegister(); 00842 } 00843 00844 void CStreamClientPull::TimerSchedule(CTimerInfo* info) 00845 { 00846 // Check the client and schedule status 00847 assert_client(this->stateClient >= UCAST_STREAM_FIRST); 00848 assert_client(this->stateSchedule == SCHEDULE_IDLE); 00849 00850 // Calculate the schedule segment 00851 this->scheduleSegment = this->bufferUcast->CurrentIndexSegment(); 00852 00853 #ifdef LOG 00854 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00855 printf("\n\tT = %7.3lf PEER %3u TIMER_SCHEDULE neighbors %u of %u : ", this->sim->Time(), this->address.Address(), this->neighbors.size(), this->info->NumPartners()); 00856 #endif 00857 00858 // If the number of senders is less than the number of partners 00859 if(this->neighbors.size() < this->info->NumPartners()) 00860 { 00861 // Go to bootstrap phase 00862 00863 // Change the schedule state 00864 this->stateSchedule = SCHEDULE_BOOTSTRAP_REQUEST; 00865 00866 this->ScheduleBootstrapRequest(); 00867 } 00868 else 00869 { 00870 // Go to bitmap phase 00871 00872 // Change the schedule state 00873 this->stateSchedule = SCHEDULE_BITMAP_REQUEST; 00874 00875 this->ScheduleBitmapRequest(); 00876 } 00877 00878 // Limit peer utilization : if the uplink utilization exceeds the available minus a guard interval of 20 % 00879 // If playback has started 00880 if(UCAST_STREAM_PLAY == this->stateClient) 00881 { 00882 // If not registered and registration timer is not set 00883 if((NOT_REGISTERED == this->stateRegistration) && (!this->timerRegister->IsSet())) 00884 { 00885 CLink* link = (*this->delegateLink)(); 00886 00887 if((link->MeterUtil(0) < 0.7 * link->Bandwidth(0)) && (this->receivers.size() < 0.7*link->Bandwidth(0)/this->channel->Bw())) 00888 { 00889 // Register 00890 this->UcastRegister(); 00891 } 00892 } 00893 } 00894 00895 // Reset the schedule timer after the schedule interval 00896 this->timerSchedule->SetAfter(this->info->ScheduleInterval()); 00897 } 00898 00899 void CStreamClientPull::ScheduleBootstrapRequest() 00900 { 00901 // Check the state 00902 assert_client(SCHEDULE_BOOTSTRAP_REQUEST == this->stateSchedule); 00903 00904 #ifdef LOG 00905 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00906 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP_REQUEST channel=%u", this->sim->Time(), this->address.Address(), this->channel->Id()); 00907 #endif 00908 00909 // Send the bootstrap request 00910 CStreamMessageBootPullRequest* message = new CStreamMessageBootPullRequest( 00911 this->channel->Id(), 00912 this->info->NumPartners()); 00913 00914 this->SendMessage(this->channel->Address(), message); 00915 00916 // Change the state 00917 this->stateSchedule = SCHEDULE_BOOTSTRAP_RESPONSE; 00918 } 00919 00920 void CStreamClientPull::ScheduleConnectRequest() 00921 { 00922 // Check the state 00923 assert_client(SCHEDULE_CONNECT_REQUEST == this->stateSchedule); 00924 00925 #ifdef LOG 00926 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00927 printf("\n\tT = %7.3lf PEER %3u CONNECT_REQUEST to : ", this->sim->Time(), this->address.Address()); 00928 #endif 00929 00930 // Check the connecting senders list is empty 00931 assert_client(this->sendersConnecting.empty()); 00932 00933 // Assign a number of neighbors as new senders until the number of senders reaches the number of partners or the neighbors set is exhausted 00934 while((this->senders.size() + this->sendersConnecting.size() < this->info->NumPartners()) && (this->neighbors.size() > 0)) 00935 { 00936 // Select a random neighbor 00937 __uint32 rnd = CRand::Generate(this->neighbors.size()-1); 00938 TNeighborList::iterator iter = this->neighbors.begin(); 00939 00940 for(__uint32 idx = 0; iter != this->neighbors.end(); iter++, idx++) 00941 if(idx == rnd) 00942 break; 00943 assert_client(iter != this->neighbors.end()); 00944 00945 // Select the new sender 00946 CAddress address = *iter; 00947 00948 #ifdef LOG 00949 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00950 printf("%u ", address.Address()); 00951 #endif 00952 00953 // Remove the neighbor from the neighbor list 00954 this->neighbors.erase(iter); 00955 00956 // Create a new connection for the new sender 00957 CConnectionReceiver* receiver; 00958 CConnectionLayer::EResult result = this->connectionLayer->Create(address, this->info->PortConnection(), &receiver); 00959 assert_client(CConnectionLayer::SUCCESS == result); 00960 00961 // Create the sender, add it to the connecting senders list and as tag to the connection 00962 CStreamPullSender* sender = new CStreamPullSender(address, receiver, &this->sendersConnecting); 00963 00964 // Add event handler for the connection 00965 (*receiver->EventOpen()) += this->delegateConnectionReceiverOpen; 00966 (*receiver->EventClose()) += this->delegateConnectionReceiverClose; 00967 00968 // Open the connection 00969 receiver->Open(); 00970 } 00971 00972 // If the number of connecting sender is non-zero 00973 if(this->sendersConnecting.size() > 0) 00974 { 00975 // Change state to connect response 00976 this->stateSchedule = SCHEDULE_CONNECT_RESPONSE; 00977 } 00978 else 00979 { 00980 // Change state to bitmap request 00981 this->stateSchedule = SCHEDULE_BITMAP_REQUEST; 00982 00983 // Go to bitmap request 00984 this->ScheduleBitmapRequest(); 00985 } 00986 } 00987 00988 void CStreamClientPull::ScheduleBitmapRequest() 00989 { 00990 // Check the state 00991 assert_client(SCHEDULE_BITMAP_REQUEST == this->stateSchedule); 00992 00993 #ifdef LOG 00994 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00995 printf("\n\tT = %7.3lf PEER %3u BITMAP_REQUEST : [%u -- %u]", this->sim->Time(), this->address.Address(), 00996 this->scheduleSegment, this->scheduleSegment + this->info->ScheduleSize() - 1); 00997 #endif 00998 00999 // Check the request list is empty 01000 assert_client(this->requestBitmaps.empty()); 01001 01002 // Request bitmaps from current senders 01003 for(CStreamPullSender::TSenderList::iterator iter = this->senders.begin(); iter != this->senders.end(); iter++) 01004 { 01005 // Add the sender to the request list 01006 this->requestBitmaps.insert(pair<CAddress, CStreamPullSender*>(iter->first, iter->second)); 01007 01008 // Send a bitmap request message for the current schedule window 01009 CStreamMessagePullBitmapRequest* message = new CStreamMessagePullBitmapRequest( 01010 this->channel->Id(), 01011 CStreamMessagePull::CONNECTION_YES, 01012 iter->second->Connection()->Id(), // Set local connection ID 01013 iter->second->Connection()->IdEntry(), // Set local connection ID entry 01014 this->scheduleSegment, 01015 this->info->ScheduleSize()); 01016 01017 this->SendMessage(iter->first, message); 01018 01019 #ifdef LOG 01020 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01021 printf("\n\t\t\tto : %u [%u -- %u]", ((CAddress)iter->first).Address(), this->scheduleSegment, this->scheduleSegment + this->info->ScheduleSize() - 1); 01022 #endif 01023 } 01024 01025 // If the number of bitmap request is non-zero 01026 if(!this->requestBitmaps.empty()) 01027 { 01028 // Change the state and wait for bitmap responses 01029 this->stateSchedule = SCHEDULE_BITMAP_RESPONSE; 01030 } 01031 else 01032 { 01033 // Cancel the schedule and go to idle 01034 this->stateSchedule = SCHEDULE_IDLE; 01035 } 01036 } 01037 01038 void CStreamClientPull::SchedulePostBitmapResponse() 01039 { 01040 // Post bitmap response : execute segment scheduling using DoNET / Coolstreaming algorithm 01041 01042 // Because in our scenario the server is provisioned with much more bandiwidth than the client, 01043 // which - using DoNET - might lead to be selected whenever is advertised - we modifiy the algorithm 01044 // such that the server has a lower preference compared to peers 01045 01046 // Using preference is not limited to the server : peers may also advertise a lower preference in the 01047 // bitmap response 01048 01049 // Check the state 01050 assert_client(SCHEDULE_POST_BITMAP_RESPONSE == this->stateSchedule); 01051 01052 #ifdef LOG 01053 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01054 printf("\n\tT = %7.3lf PEER %3u POST_BITMAP_REQUEST ", this->sim->Time(), this->address.Address()); 01055 #endif 01056 01057 // Calculate the number of copies with high and low preference 01058 if(this->senders.size() > PULL_MAX_SENDERS) printf("%u %u %u", this->address.Address(), this->channel->Id(), this->senders.size()); 01059 assert_client(this->senders.size() <= PULL_MAX_SENDERS); 01060 01061 // Reset the schedule number of copies to zero for each segment in the schedule 01062 for(__uint8 indexSegment = 0; indexSegment < this->info->ScheduleSize(); indexSegment++) 01063 { 01064 this->scheduleNumCopiesHi[indexSegment] = 0; 01065 this->scheduleNumCopiesLo[indexSegment] = 0; 01066 } 01067 01068 // The segment size (in bits) 01069 __bits segmentSize = (__bits)(this->info->StreamSegmentSize() * this->channel->Bw() / this->channel->Fps()); 01070 01071 // For every sender 01072 __uint8 indexSender = 0; 01073 for(CStreamPullSender::TSenderList::iterator iter = this->senders.begin(); iter != this->senders.end(); iter++, indexSender++) 01074 { 01075 // Get the last response 01076 CStreamPullSender::CBitmapResponse* response = &iter->second->LastBitmapResponse(); 01077 01078 // Check the last bitmap response is for the current schedule 01079 assert_client(this->scheduleSegment == response->First()); 01080 assert_client(this->info->ScheduleSize() == response->Count()); 01081 01082 // Save the bandwidth from each sender 01083 this->scheduleBw[indexSender] = response->Bw(); 01084 01085 if(response->Preference() == CStreamMessagePullBitmapResponse::PREF_HIGH) 01086 { 01087 // High preference 01088 01089 // For every segment in the schedule 01090 for(__uint8 indexSegment = 0; indexSegment < this->info->ScheduleSize(); indexSegment++) 01091 { 01092 if(!this->bufferUcast->IsSegmentReceiving(indexSegment + this->scheduleSegment)) 01093 { 01094 // Calculate the deadline as frame time (can be negative) 01095 __time deadline = (indexSegment - this->scheduleSegment) * this->info->StreamSegmentSize() / this->channel->Fps() + this->scheduleDeadlineDelta; 01096 01097 if((response->Bitmap() >> indexSegment) & 1) 01098 { 01099 // If the segment is in the bitmap 01100 this->scheduleBitmapHi[indexSender][indexSegment] = 1; 01101 // Set the sender for random access 01102 this->scheduleSenderHi[indexSender][indexSegment] = iter->second; 01103 // Set the deadline 01104 this->scheduleDeadlineHi[indexSender][indexSegment] = deadline; 01105 // Set the index for incremental access 01106 this->scheduleSenderIndexHi[this->scheduleNumCopiesHi[indexSegment]][indexSegment] = indexSender; 01107 // Increment the number of copies 01108 this->scheduleNumCopiesHi[indexSegment]++; 01109 } 01110 else 01111 { 01112 this->scheduleBitmapHi[indexSender][indexSegment] = 0; 01113 this->scheduleSenderHi[indexSender][indexSegment] = NULL; 01114 } 01115 } 01116 } 01117 } 01118 else 01119 { 01120 // Low preference 01121 01122 // For every segment in the schedule 01123 for(__uint8 indexSegment = 0; indexSegment < this->info->ScheduleSize(); indexSegment++) 01124 { 01125 if(!this->bufferUcast->IsSegmentReceiving(indexSegment + this->scheduleSegment)) 01126 { 01127 // Calculate the deadline as frame time (can be negative) 01128 __time deadline = (indexSegment - this->scheduleSegment) * this->info->StreamSegmentSize() / this->channel->Fps() + this->scheduleDeadlineDelta; 01129 01130 if((response->Bitmap() >> indexSegment) & 1) 01131 { 01132 // If the segment is in the bitmap 01133 this->scheduleBitmapLo[indexSender][indexSegment] = 1; 01134 // Set the sender for random access 01135 this->scheduleSenderLo[indexSender][indexSegment] = iter->second; 01136 // Set the deadline 01137 this->scheduleDeadlineLo[indexSender][indexSegment] = deadline; 01138 // Set the index for incremental access 01139 this->scheduleSenderIndexLo[this->scheduleNumCopiesHi[indexSegment]][indexSegment] = indexSender; 01140 // Increment the number of copies 01141 this->scheduleNumCopiesLo[indexSegment]++; 01142 } 01143 else 01144 { 01145 this->scheduleBitmapLo[indexSender][indexSegment] = 0; 01146 this->scheduleSenderLo[indexSender][indexSegment] = NULL; 01147 } 01148 } 01149 } 01150 } 01151 } 01152 01153 // Schedule segments 01154 01155 // Start with scheduling the segments with a single sender 01156 for(__uint8 indexSegment = 0; indexSegment < this->info->ScheduleSize(); indexSegment++) 01157 { 01158 if(!this->bufferUcast->IsSegmentReceiving(indexSegment + this->scheduleSegment)) 01159 { 01160 // Use a high preference sender, if available 01161 if(this->scheduleNumCopiesHi[indexSegment] == 1) 01162 { 01163 __uint8 indexSender = this->scheduleSenderIndexHi[0][indexSegment]; 01164 01165 // Select the only sender 01166 this->scheduleSenders[indexSegment] = this->scheduleSenderHi[indexSender][indexSegment]; 01167 01168 // Decrease the available time for subsequent segments 01169 for(__uint32 index = indexSegment + 1; index < this->info->ScheduleSize(); index++) 01170 { 01171 if(this->scheduleBitmapHi[indexSender][index]) 01172 { 01173 this->scheduleDeadlineHi[indexSender][index] -= segmentSize / this->scheduleBw[indexSender]; 01174 } 01175 } 01176 } 01177 // Else, use a low preference sender, if available 01178 else if(this->scheduleNumCopiesLo[indexSegment] == 1) 01179 { 01180 __uint8 indexSender = this->scheduleSenderIndexLo[0][indexSegment]; 01181 01182 // Select the only sender 01183 this->scheduleSenders[indexSegment] = this->scheduleSenderLo[indexSender][indexSegment]; 01184 01185 // Decrease the available time for subsequent segments 01186 for(__uint32 index = indexSegment + 1; index < this->info->ScheduleSize(); index++) 01187 { 01188 if(this->scheduleBitmapLo[indexSender][index]) 01189 { 01190 this->scheduleDeadlineLo[indexSender][index] -= segmentSize / this->scheduleBw[indexSender]; 01191 } 01192 } 01193 } 01194 // Else, no sender available : set the sender to NULL 01195 else this->scheduleSenders[indexSegment] = NULL; 01196 } 01197 } 01198 01199 01200 for(__uint8 indexSegment = 0; indexSegment < this->info->ScheduleSize(); indexSegment++) 01201 { 01202 if(!this->bufferUcast->IsSegmentReceiving(indexSegment + this->scheduleSegment)) 01203 { 01204 // Use a high preference sender, if available 01205 if(this->scheduleNumCopiesHi[indexSegment] > 1) 01206 { 01207 // Choose the sender with the maximum bandwidth that meets the deadline 01208 __bitrate senderBwMax = -1; 01209 __uint8 indexSenderMax = 0; 01210 for(__uint8 index = 0; index < this->scheduleNumCopiesHi[indexSegment]; index++) 01211 { 01212 __uint8 indexSender = this->scheduleSenderIndexHi[index][indexSegment]; 01213 01214 // If the sender meets the deadline condition 01215 if(this->scheduleDeadlineHi[indexSender][indexSegment]) 01216 { 01217 if(this->scheduleBw[indexSender] > senderBwMax) 01218 { 01219 senderBwMax = this->scheduleBw[indexSender]; 01220 indexSenderMax = indexSender; 01221 } 01222 } 01223 } 01224 01225 if(senderBwMax > 0) 01226 { 01227 // Select the sender with the maximum bandwidth 01228 this->scheduleSenders[indexSegment] = this->scheduleSenderHi[indexSenderMax][indexSegment]; 01229 01230 // Decrease the available time for subsequent segments 01231 for(__uint32 index = indexSegment + 1; index < this->info->ScheduleSize(); index++) 01232 { 01233 if(this->scheduleBitmapHi[indexSenderMax][index]) 01234 { 01235 this->scheduleDeadlineHi[indexSenderMax][index] -= segmentSize / this->scheduleBw[indexSenderMax]; 01236 } 01237 } 01238 } 01239 else 01240 { 01241 // No sender has been found to meet the deadline 01242 this->scheduleSenders[indexSegment] = NULL; 01243 } 01244 } 01245 01246 // If a high preference sender was not found use a low preference sender, if available 01247 if(this->scheduleNumCopiesLo[indexSegment] > 1) 01248 { 01249 // Choose the sender with the maximum bandwidth that meets the deadline 01250 __bitrate senderBwMax = -1; 01251 __uint8 indexSenderMax = 0; 01252 for(__uint8 index = 0; index < this->scheduleNumCopiesLo[indexSegment]; index++) 01253 { 01254 __uint8 indexSender = this->scheduleSenderIndexLo[index][indexSegment]; 01255 01256 // If the sender meets the deadline condition 01257 if(this->scheduleDeadlineLo[indexSender][indexSegment]) 01258 { 01259 if(this->scheduleBw[indexSender] > senderBwMax) 01260 { 01261 senderBwMax = this->scheduleBw[indexSender]; 01262 indexSenderMax = indexSender; 01263 } 01264 } 01265 } 01266 01267 if(senderBwMax > 0) 01268 { 01269 // Select the sender with the maximum bandwidth 01270 this->scheduleSenders[indexSegment] = this->scheduleSenderLo[indexSenderMax][indexSegment]; 01271 01272 // Decrease the available time for subsequent segments 01273 for(__uint32 index = indexSegment + 1; index < this->info->ScheduleSize(); index++) 01274 { 01275 if(this->scheduleBitmapLo[indexSenderMax][index]) 01276 { 01277 this->scheduleDeadlineLo[indexSenderMax][index] -= segmentSize / this->scheduleBw[indexSenderMax]; 01278 } 01279 } 01280 } 01281 else 01282 { 01283 // No sender has been found to meet the deadline 01284 this->scheduleSenders[indexSegment] = NULL; 01285 } 01286 } 01287 } 01288 } 01289 01290 // Scheduling complete 01291 01292 // Change the state 01293 this->stateSchedule = SCHEDULE_SEGMENT_REQUEST; 01294 01295 // Go to segment request 01296 this->ScheduleSegmentRequest(); 01297 } 01298 01299 void CStreamClientPull::ScheduleSegmentRequest() 01300 { 01301 // Check the state 01302 assert_client(SCHEDULE_SEGMENT_REQUEST == this->stateSchedule); 01303 01304 #ifdef LOG 01305 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01306 { 01307 printf("\n\tT = %7.3lf PEER %3u SEGMENT_REQUEST : [%u -- %u] -> [ ", this->sim->Time(), this->address.Address(), 01308 this->scheduleSegment, this->scheduleSegment + this->info->ScheduleSize() - 1); 01309 for(__uint8 indexSegment = 0; indexSegment < this->info->ScheduleSize(); indexSegment++) 01310 { 01311 if(this->bufferUcast->IsSegmentReceiving(this->scheduleSegment + indexSegment)) 01312 CConsole::SetColor(CConsole::LIGHT_RED); 01313 else 01314 CConsole::SetColor(CConsole::LIGHT_GREEN); 01315 printf("%u ", this->scheduleSenders[indexSegment]->Connection()->RemoteAddress().Address()); 01316 CConsole::SetColor(CConsole::LIGHT_GRAY); 01317 } 01318 printf("]"); 01319 } 01320 #endif 01321 01322 // Send requests according to the scheduling algorithm 01323 01324 // Check the request list is empty 01325 assert_client(this->requestSegments.empty()); 01326 01327 // For all senders 01328 __uint8 indexSender = 0; 01329 for(CStreamPullSender::TSenderList::iterator iter = this->senders.begin(); iter != this->senders.end(); iter++, indexSender++) 01330 { 01331 // Generate request bitmap for this sender 01332 __uint64 bitmap = 0; 01333 01334 for(__uint8 indexSegment = 0; indexSegment < this->info->ScheduleSize(); indexSegment++) 01335 { 01336 if(!this->bufferUcast->IsSegmentReceiving(indexSegment + this->scheduleSegment)) 01337 { 01338 if(this->scheduleSenders[indexSegment] == iter->second) 01339 { 01340 bitmap |= (((__uint64)1) << indexSegment); 01341 } 01342 } 01343 } 01344 01345 #ifdef LOG 01346 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01347 printf("\n\t\t\tSender %3u : %016llX", iter->first, bitmap); 01348 #endif 01349 01350 // If there is at least one segment to request from this sender 01351 if(bitmap) 01352 { 01353 // Send the request 01354 CStreamMessagePullSegmentRequest* message = new CStreamMessagePullSegmentRequest( 01355 this->channel->Id(), 01356 iter->second->Connection()->Id(), 01357 iter->second->Connection()->IdEntry(), 01358 iter->second->Connection()->RemoteId(), 01359 iter->second->Connection()->RemoteIdEntry(), 01360 this->scheduleSegment, 01361 this->info->ScheduleSize(), 01362 bitmap); 01363 01364 this->SendMessage(iter->first, message); 01365 01366 // Add the sender to the request list 01367 this->requestSegments.insert(pair<CAddress, CStreamPullSender*>(iter->first, iter->second)); 01368 } 01369 } 01370 01371 // If the number of requests is non-zero 01372 if(!this->requestSegments.empty()) 01373 { 01374 // Change the state and wait for responses 01375 this->stateSchedule = SCHEDULE_SEGMENT_RESPONSE; 01376 } 01377 else 01378 { 01379 // Otherwise cancel the schedule and go to idle 01380 this->stateSchedule = SCHEDULE_IDLE; 01381 } 01382 } 01383 01384 void CStreamClientPull::RecvMessageBootResponse(CAddress src, CStreamMessageBootPullResponse* message) 01385 { 01386 // Received bootstrap response 01387 01388 #ifdef LOG 01389 CConsole::SetColor(CConsole::LIGHT_CYAN); 01390 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01391 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP_RESPONSE %u neighbors : ", this->sim->Time(), this->address.Address(), src.Address(), 01392 message->Count()); 01393 CConsole::SetColor(CConsole::LIGHT_GRAY); 01394 #endif 01395 01396 // If a delayed message, ignore 01397 if(message->Stream() != this->channel->Id()) return; 01398 01399 // Check the schedule state 01400 assert_client(SCHEDULE_BOOTSTRAP_RESPONSE == this->stateSchedule); 01401 01402 // Assert 01403 01404 // Save the number of neighbors in the neighbor list 01405 for(__uint32 index = 0; index < message->Count(); index++) 01406 { 01407 // Check the neighbor is not already a sender 01408 if(this->senders.find(message->Host(index)) == this->senders.end()) 01409 { 01410 this->neighbors.insert(message->Host(index)); 01411 #ifdef LOG 01412 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01413 printf("%u ", message->Host(index)); 01414 #endif 01415 } 01416 } 01417 01418 // If the number of sender is less than the number of partners and the number of neighbors is greater than zero 01419 if((this->senders.size() < this->info->NumPartners()) && (this->neighbors.size() > 0)) 01420 { 01421 // Change the state to connect request 01422 this->stateSchedule = SCHEDULE_CONNECT_REQUEST; 01423 01424 // Go to connect request 01425 this->ScheduleConnectRequest(); 01426 } 01427 else 01428 { 01429 // Change the state to bitmap request 01430 this->stateSchedule = SCHEDULE_BITMAP_REQUEST; 01431 01432 // Go to bitmap request 01433 this->ScheduleBitmapRequest(); 01434 } 01435 } 01436 01437 void CStreamClientPull::RecvMessageBitmapRequest(CAddress src, CStreamMessagePullBitmapRequest* message) 01438 { 01439 // Received bitmap request 01440 01441 // Verify the requested stream 01442 if(message->Stream() == this->channel->Id()) 01443 { 01444 // Generate the bitmap 01445 __uint64 bitmap = 0LL; 01446 01447 // For all requested segments 01448 for(__uint8 index = 0; index < message->Count(); index++) 01449 { 01450 // If the requested segments exists in the buffer, set its bit to 1 01451 if(this->bufferUcast->IsSegmentComplete(message->First() + index)) 01452 { 01453 bitmap |= ((__uint64)1) << index; 01454 } 01455 } 01456 01457 // Send the reply 01458 CStreamMessagePullBitmapResponse* reply = new CStreamMessagePullBitmapResponse( 01459 message->Stream(), 01460 message->First(), 01461 message->Count(), 01462 bitmap, 01463 (*this->delegateLink)()->Bandwidth(0), 01464 (*this->delegateLink)()->MeterUtil(0), 01465 message->Connection(), 01466 message->ConnectionId(), 01467 message->ConnectionIdEntry(), 01468 CStreamMessagePullBitmapResponse::SUCCESS, 01469 CStreamMessagePullBitmapResponse::PREF_HIGH 01470 ); 01471 01472 this->SendMessage(src, reply); 01473 } 01474 else 01475 { 01476 // Send a fail reply with bitmap set to zero 01477 CStreamMessagePullBitmapResponse* reply = new CStreamMessagePullBitmapResponse( 01478 message->Stream(), 01479 message->First(), 01480 message->Count(), 01481 0LL, 01482 0, // Set bandwidth to zero 01483 0, // Set bandwidth utilization to zero 01484 message->Connection(), 01485 message->ConnectionId(), 01486 message->ConnectionIdEntry(), 01487 CStreamMessagePullBitmapResponse::FAIL, 01488 CStreamMessagePullBitmapResponse::PREF_HIGH 01489 ); 01490 01491 this->SendMessage(src, reply); 01492 } 01493 } 01494 01495 void CStreamClientPull::RecvMessageBitmapResponse(CAddress src, CStreamMessagePullBitmapResponse* message) 01496 { 01497 // Received bitmap response 01498 01499 // If a delayed message, ignore 01500 if(message->Stream() != this->channel->Id()) return; 01501 01502 #ifdef LOG 01503 CConsole::SetColor(CConsole::LIGHT_CYAN); 01504 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01505 printf("\n\tT = %7.3lf PEER %3u BITMAP_RESPONSE from %u [%u -- %u]=%016llX bw=%.0lf bw_util=%.0lf pref=%u : ", this->sim->Time(), this->address.Address(), src.Address(), 01506 message->First(), message->First() + message->Count() - 1, message->Bitmap(), message->Bw(), message->BwUtil(), message->Preference()); 01507 CConsole::SetColor(CConsole::LIGHT_GRAY); 01508 #endif 01509 01510 // Check the schedule state 01511 assert_client(SCHEDULE_BITMAP_RESPONSE == this->stateSchedule); 01512 01513 // Check the source is in the request list 01514 TRequestList::iterator iter = this->requestBitmaps.find(src); 01515 assert_client(iter != this->requestBitmaps.end()); 01516 01517 // Get the sender 01518 CStreamPullSender* sender = iter->second; 01519 01520 // Remove the entry from the request list 01521 this->requestBitmaps.erase(iter); 01522 01523 // Check the message against the sender 01524 assert_client(message->Connection() == CStreamMessagePull::CONNECTION_YES); 01525 assert_client(message->ConnectionId() == sender->Connection()->Id()); 01526 assert_client(message->ConnectionIdEntry() == sender->Connection()->IdEntry()); 01527 01528 // Save the response information in the sender record 01529 sender->SetBitmapResponse(CStreamPullSender::CBitmapResponse( 01530 message->First(), 01531 message->Count(), 01532 message->Bitmap(), 01533 message->Bw(), 01534 message->Preference())); 01535 01536 // If the response has been successful 01537 01538 01539 if(message->Response() != CStreamMessagePullBitmapResponse::SUCCESS) 01540 { 01541 CConnectionReceiver* connection = sender->Connection(); 01542 01543 // Delete the sender 01544 delete sender; 01545 01546 // Close the connection 01547 connection->Close(); 01548 01549 01550 #ifdef LOG 01551 CConsole::SetColor(CConsole::LIGHT_RED); 01552 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01553 printf("FAIL"); 01554 CConsole::SetColor(CConsole::LIGHT_GRAY); 01555 #endif 01556 } 01557 else 01558 { 01559 #ifdef LOG 01560 CConsole::SetColor(CConsole::LIGHT_GREEN); 01561 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01562 printf("SUCCESS"); 01563 CConsole::SetColor(CConsole::LIGHT_GRAY); 01564 #endif 01565 } 01566 01567 // If bitmap request list is empty (all responses have been received) 01568 if(this->requestBitmaps.empty()) 01569 { 01570 // Change the state to post bitmap response 01571 this->stateSchedule = SCHEDULE_POST_BITMAP_RESPONSE; 01572 01573 // Go to post bitmap response 01574 this->SchedulePostBitmapResponse(); 01575 } 01576 } 01577 01578 void CStreamClientPull::RecvMessageSegmentRequest(CAddress src, CStreamMessagePullSegmentRequest* message) 01579 { 01580 // Receiver segment request 01581 01582 // Get the local connection 01583 CConnectionSender* sender = type_cast<CConnectionSender*>(this->connectionLayer->Get(message->DstId(), message->DstIdEntry())); 01584 assert_client(sender); 01585 assert_client(sender->State() >= CConnection::OPENED); 01586 01587 __uint64 success = 0; 01588 __uint64 fail = 0xFFFFFFFFFFFFFFFFLL; 01589 __uint8 count = 0; 01590 01591 // If the connection is in OPEN state 01592 if(sender->State() == CConnection::OPENED) 01593 { 01594 success = 0; 01595 fail = 0; 01596 01597 // Get the receiver 01598 assert_client(sender->Tag()); 01599 CStreamPullReceiver* receiver = type_cast<CStreamPullReceiver*>(sender->Tag()); 01600 01601 // Verify the receiver 01602 assert_client(receiver); 01603 assert_client(receiver->Connection() == sender); 01604 01605 // Send the requested segments through the receiver 01606 for(__uint32 sIndex = 0; sIndex < message->Count(); sIndex++) 01607 { 01608 // If the segment is requested 01609 if((message->Bitmap() >> sIndex) & 1) 01610 { 01611 __uint32 segmentIndex = message->First() + sIndex; 01612 // If the segment exists in the buffer and is complete 01613 if(this->bufferUcast->IsSegmentComplete(segmentIndex)) 01614 { 01615 // Get the segment from the buffer 01616 CStreamSegment* segment = this->bufferUcast->GetSegment(segmentIndex); 01617 01618 __uint32 frameIndexStart = segmentIndex * this->info->StreamSegmentSize(); 01619 01620 // Send all frames from that segment 01621 for(__uint32 fIndex = 0; fIndex < this->info->StreamSegmentSize(); fIndex++) 01622 { 01623 assert_client((*segment)[fIndex].Valid()); 01624 receiver->Send((*segment)[fIndex]); 01625 } 01626 01627 // Mark the segment as success 01628 success |= ((__uint64)1) << sIndex; 01629 } 01630 else 01631 { 01632 // Mark the segment as fail 01633 fail |= ((__uint64)1) << sIndex; 01634 } 01635 } 01636 } 01637 01638 // Set the count to the requested count 01639 count = message->Count(); 01640 } 01641 01642 // Reply to the request 01643 CStreamMessagePullSegmentResponse* reply = new CStreamMessagePullSegmentResponse( 01644 message->Stream(), 01645 message->DstId(), 01646 message->DstIdEntry(), 01647 message->SrcId(), 01648 message->SrcIdEntry(), 01649 message->First(), 01650 message->Count(), 01651 0LL, 01652 0xFFFFFFFFFFFFFFFFLL 01653 ); 01654 01655 this->SendMessage(src, reply); 01656 } 01657 01658 void CStreamClientPull::RecvMessageSegmentResponse(CAddress src, CStreamMessagePullSegmentResponse* message) 01659 { 01660 // Received segment response 01661 01662 // If a delayed message, ignore 01663 if(message->Stream() != this->channel->Id()) return; 01664 01665 // Check the schedule state 01666 assert_client(SCHEDULE_SEGMENT_RESPONSE == this->stateSchedule); 01667 01668 #ifdef LOG 01669 CConsole::SetColor(CConsole::LIGHT_CYAN); 01670 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01671 printf("\n\tT = %7.3lf PEER %3u SEGMENT_RESPONSE : [%u -- %u] : ", this->sim->Time(), this->address.Address(), 01672 message->First(), message->First() + message->Count() - 1); 01673 CConsole::SetColor(CConsole::LIGHT_GRAY); 01674 #endif 01675 01676 if(src == this->channel->Address()) return; 01677 01678 // Check the source is in the request list 01679 TRequestList::iterator iter = this->requestSegments.find(src); 01680 assert_client(iter != this->requestSegments.end()); 01681 01682 // Get the sender 01683 CStreamPullSender* sender = iter->second; 01684 01685 // Remove the entry from the request list 01686 this->requestSegments.erase(iter); 01687 01688 // Check the message against the sender 01689 assert_client(message->DstId() == sender->Connection()->Id()); 01690 assert_client(message->DstIdEntry() == sender->Connection()->IdEntry()); 01691 01692 // Mark in the buffer the segments that have been marked as successful 01693 for(__uint32 index = 0; index < message->Count(); index++) 01694 { 01695 __uint32 segmentIndex = message->First() + index; 01696 01697 this->bufferUcast->SegmentReceiving(segmentIndex, true); 01698 01699 #ifdef LOG 01700 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01701 { 01702 if((message->Success() >> index) & 1) CConsole::SetColor(CConsole::LIGHT_GREEN); 01703 else if((message->Fail() >> index) & 1) CConsole::SetColor(CConsole::LIGHT_RED); 01704 else CConsole::SetColor(CConsole::DARK_GRAY); 01705 printf("%u ", segmentIndex); 01706 CConsole::SetColor(CConsole::LIGHT_GRAY); 01707 } 01708 #endif 01709 } 01710 01711 // If the requests list is empty (all responses have been received) 01712 if(this->requestSegments.empty()) 01713 { 01714 // Change the state to IDLE 01715 this->stateSchedule = SCHEDULE_IDLE; 01716 } 01717 } 01718 01719 void CStreamClientPull::ConnectionRecv(CConnectionReceiver* receiver, CPacket* packet) 01720 { 01721 assert_client(packet->Type() == PACKET_TYPE_STREAM); 01722 01723 CPacketStreamFrame* packetFrame = type_cast<CPacketStreamFrame*>(packet); 01724 01725 // Ignore frames not for this channel 01726 if(packetFrame->Stream() != this->channel->Id()) return; 01727 01728 // Call the frame processing function 01729 (this->*this->processFrame)(packetFrame->Frame()); 01730 01731 #ifdef LOG 01732 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01733 { 01734 CConsole::SetColor(CConsole::DARK_GRAY); 01735 printf("\n\t\t\tT = %7.3lf PEER %3u RECV %u bits :: ", this->sim->Time(), this->address.Address(), packet->Size()); 01736 CConsole::SetColor(CConsole::DARK_GREEN); 01737 printf("%u [%c]", type_cast<CPacketStreamFrame*>(packet)->Frame().Index(), 01738 stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(type_cast<CPacketStreamFrame*>(packet)->Frame().Index())]); 01739 CConsole::SetColor(CConsole::LIGHT_GRAY); 01740 } 01741 #endif 01742 } 01743 01744 bool CStreamClientPull::ConnectionAccept(CAddress src, CPacket* packet) 01745 { 01746 CLink* link = (*this->delegateLink)(); 01747 01748 // Only accept connections 01749 return true; 01750 // return ((link->MeterUtil(0) < 0.7 * link->Bandwidth(0)) && (this->receivers.size() < 0.7*link->Bandwidth(0)/this->channel->Bw())); 01751 } 01752 01753 void CStreamClientPull::ConnectionAccepted(CConnectionSender* sender) 01754 { 01755 // Sender connection accepted : hook connection events 01756 (*sender->EventOpen()) += this->delegateConnectionSenderOpen; 01757 (*sender->EventClose()) += this->delegateConnectionSenderClose; 01758 } 01759 01760 void CStreamClientPull::ConnectionReceiverOpen(CConnection* receiver, CConnection::EOpenResult result) 01761 { 01762 // Opening receiver connection 01763 01764 // Check the state 01765 assert_client(SCHEDULE_CONNECT_RESPONSE == this->stateSchedule); 01766 01767 #ifdef LOG 01768 CConsole::SetColor(CConsole::LIGHT_YELLOW); 01769 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01770 printf("\n\tT = %7.3lf PEER %3u RECIEVER_OPEN : ", this->sim->Time(), this->address.Address()); 01771 CConsole::SetColor(CConsole::LIGHT_GRAY); 01772 #endif 01773 01774 // If connection is successfull 01775 if(CConnection::OPEN_SUCCESS == result) 01776 { 01777 #ifdef LOG 01778 CConsole::SetColor(CConsole::LIGHT_GREEN); 01779 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01780 printf("SUCCESS"); 01781 CConsole::SetColor(CConsole::LIGHT_GRAY); 01782 #endif 01783 01784 // Get the sender record 01785 assert_client(receiver->Tag()); 01786 CStreamPullSender* sender = type_cast<CStreamPullSender*>(receiver->Tag()); 01787 01788 // Check the receiver type is connecting and is in the connecting senders list 01789 assert_client(sender->List() == &this->sendersConnecting); 01790 assert_client(sender->Type() == CStreamPullSender::CONNECTING_SENDER); 01791 assert_client(this->sendersConnecting.find(receiver->RemoteAddress()) != this->sendersConnecting.end()); 01792 01793 // Change the sender list from connecting senders to senders 01794 sender->List(&this->senders); 01795 01796 // Change the sender type 01797 sender->Type(CStreamPullSender::SENDER); 01798 } 01799 else 01800 { 01801 #ifdef LOG 01802 CConsole::SetColor(CConsole::LIGHT_RED); 01803 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01804 printf("FAIL (%u)", result); 01805 CConsole::SetColor(CConsole::LIGHT_GRAY); 01806 #endif 01807 01808 // Connection closed : if the connection has a sender tag, delete it (this will remove the tag and the sender from the connection list) 01809 if(receiver->Tag()) delete receiver->Tag(); 01810 } 01811 01812 // If the connecting senders list is empty 01813 if(this->sendersConnecting.empty()) 01814 { 01815 // Connection establishment for all senders has finished 01816 01817 // Change state to bitmap request 01818 this->stateSchedule = SCHEDULE_BITMAP_REQUEST; 01819 01820 // Go to bitmap request 01821 this->ScheduleBitmapRequest(); 01822 } 01823 } 01824 01825 void CStreamClientPull::ConnectionReceiverClose(CConnection* receiver, CConnection::ECloseResult result) 01826 { 01827 // Closing receiver connection 01828 01829 #ifdef LOG 01830 CConsole::SetColor(CConsole::LIGHT_YELLOW); 01831 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01832 printf("\n\tT = %7.3lf PEER %3u RECIEVER_CLOSE ", this->sim->Time(), this->address.Address()); 01833 CConsole::SetColor(CConsole::LIGHT_GRAY); 01834 #endif 01835 01836 // Existing connection closed 01837 assert_client(receiver->Type() == CConnection::REQUESTER); 01838 01839 // If closing is complete and the tag is set 01840 if(CConnection::CLOSE_COMPLETE == result) 01841 { 01842 // If the receiver has a sender tag, delete it (this will remove the tag an the sender from its associated list) 01843 if(receiver->Tag()) delete receiver->Tag(); 01844 } 01845 // Else, do nothing 01846 } 01847 01848 void CStreamClientPull::ConnectionSenderOpen(CConnection* sender, CConnection::EOpenResult result) 01849 { 01850 // New connection opened 01851 01852 // Check this is a sender connection 01853 assert_client(sender->Type() == CConnection::RESPONDER); 01854 01855 // If the connection has been opened successfully 01856 if((CConnection::OPEN_SUCCESS == result) && (sender->RemoteAddress() != this->channel->Address())) 01857 { 01858 // Create a new receiver 01859 CStreamPullReceiver* receiver = new CStreamPullReceiver( 01860 this->sim, 01861 type_cast<CConnectionSender*>(sender), 01862 this->info->StreamFrameInterval(), 01863 sender->Id(), 01864 &this->receivers 01865 ); 01866 } 01867 // Else, do nothing 01868 } 01869 01870 void CStreamClientPull::ConnectionSenderClose(CConnection* sender, CConnection::ECloseResult result) 01871 { 01872 // Existing connection closed 01873 assert_client(sender->Type() == CConnection::RESPONDER); 01874 01875 // If closing is complete 01876 if(CConnection::CLOSE_COMPLETE == result) 01877 { 01878 // If the connection has a tag, delete the tag (this will remove the receiver tag from the connection and its associated list) 01879 if(sender->Tag()) delete sender->Tag(); 01880 } 01881 // Else, do nothing 01882 } 01883 01884 void CStreamClientPull::SendMessage(CAddress dst, CStreamMessage* message) 01885 { 01886 (*this->delegateSend)(this->info->PortControl(), this->info->PortControl(), dst, 128, message); 01887 } 01888 01889 void CStreamClientPull::UcastRegister() 01890 { 01891 // Register for the current channel 01892 01893 // Check the client state 01894 assert_client(this->stateClient >= UCAST_STREAM_FIRST); 01895 01896 // Check the registration state 01897 assert_client(this->stateRegistration == NOT_REGISTERED); 01898 01899 // Send a bootstrap registration message to the server 01900 CStreamMessageBootPullRegister* message = new CStreamMessageBootPullRegister( 01901 this->channel->Id(), 01902 this->address 01903 ); 01904 01905 this->SendMessage(this->channel->Address(), message); 01906 01907 // Change the registration state 01908 this->stateRegistration = REGISTERED; 01909 } 01910 01911 void CStreamClientPull::UcastDeregister() 01912 { 01913 // Deregister for the current channel 01914 01915 // Check the client state 01916 assert_client(this->stateClient >= UCAST_STREAM_FIRST); 01917 01918 // Check the registration state 01919 assert_client(this->stateRegistration == REGISTERED); 01920 01921 // Send a bootstrap registration message to the server 01922 CStreamMessageBootPullDeregister* message = new CStreamMessageBootPullDeregister( 01923 this->channel->Id(), 01924 this->address 01925 ); 01926 01927 this->SendMessage(this->channel->Address(), message); 01928 01929 // Change the registration state 01930 this->stateRegistration = NOT_REGISTERED; 01931 } 01932 01933 01934 void CStreamClientPull::Finalize() 01935 { 01936 }
Last updated: February 8, 2011