Alex Bikfalvi
SimStream Documentation
StreamClientPush.cpp
00001 #include "Headers.h" 00002 #include "StreamClientPush.h" 00003 #include "Rand.h" 00004 #include "Console.h" 00005 #include "Debug.h" 00006 00007 CStreamClientPush::CStreamClientPush( 00008 CSimHandler* sim, 00009 CInfoPush* info, 00010 CAddress address, 00011 IDelegate2<void, CAddress, CPacketStream*>* delegateSendStream, 00012 IDelegate2<void, CAddress, CStreamMessage*>* delegateSendMessage, 00013 IDelegate2<void, CAddress, __uint32>* delegateIgmpJoin, 00014 IDelegate1<void, CAddress>* delegateIgmpLeave, 00015 IDelegate0<CLink*>* delegateLink, 00016 __uint32 entry, 00017 __bitrate bw 00018 ) : CStreamClient(sim) 00019 { 00020 assert(info); 00021 00022 assert(delegateSendStream); 00023 assert(delegateSendMessage); 00024 assert(delegateIgmpJoin); 00025 assert(delegateIgmpLeave); 00026 assert(delegateLink); 00027 00028 // Set the initial state 00029 this->stateClient = STOPPED; 00030 this->stateRegistration = NOT_REGISTERED; 00031 00032 // Set simulator 00033 this->info = info; 00034 this->address = address; 00035 00036 // Set the channel 00037 this->channel = NULL; 00038 00039 // Create the buffer 00040 this->buffer = new CStreamBuffer(this->info->StreamBufferSize(), this->info->StreamBufferSizeHistory()); 00041 00042 // Unicast session 00043 this->bw = bw; 00044 this->bwUsed = 0; 00045 00046 // Multicast session 00047 this->entry = entry; 00048 00049 // Create the timer 00050 this->timerPlayMcast = new CTimer<CStreamClientPush>( 00051 this->sim, 00052 this, 00053 &CStreamClientPush::TimerPlayMcast 00054 ); 00055 this->timerPlayUcast = new CTimer<CStreamClientPush>( 00056 this->sim, 00057 this, 00058 &CStreamClientPush::TimerPlayUcast 00059 ); 00060 this->timerBoot = new CTimer<CStreamClientPush>( 00061 this->sim, 00062 this, 00063 &CStreamClientPush::TimerBoot 00064 ); 00065 this->timerRegister = new CTimer<CStreamClientPush>( 00066 this->sim, 00067 this, 00068 &CStreamClientPush::TimerRegister 00069 ); 00070 this->timerBuffer = new CTimer<CStreamClientPush>( 00071 this->sim, 00072 this, 00073 &CStreamClientPush::TimerBuffer 00074 ); 00075 00076 // Set the interface 00077 this->entry = entry; 00078 00079 // Set the lower layer delegates 00080 this->delegateSendStream = delegateSendStream; 00081 this->delegateSendMessage = delegateSendMessage; 00082 this->delegateIgmpJoin = delegateIgmpJoin; 00083 this->delegateIgmpLeave = delegateIgmpLeave; 00084 this->delegateLink = delegateLink; 00085 00086 // Delegates 00087 this->delegateRecvFrameMcast = new Delegate2<CStreamClientPush, void, CAddress, CStreamFrame>(this, &CStreamClientPush::RecvFrameMcast); 00088 this->delegateRecvFrameUcast = new Delegate2<CStreamClientPush, void, CAddress, CStreamFrame>(this, &CStreamClientPush::RecvFrameUcast); 00089 00090 // Encoder 00091 this->encoder = new CStreamEncoderFrame(this->delegateSendStream); 00092 00093 // Decoders 00094 this->decoder = NULL; 00095 this->decoderMcast = new CStreamDecoderFrame(this->delegateRecvFrameMcast); 00096 this->decoderUcast = new CStreamDecoderFrame(this->delegateRecvFrameUcast); 00097 } 00098 00099 CStreamClientPush::~CStreamClientPush() 00100 { 00101 // Delete delegates 00102 delete this->delegateRecvFrameMcast; 00103 delete this->delegateRecvFrameUcast; 00104 00105 // Delete coders 00106 delete this->encoder; 00107 delete this->decoderMcast; 00108 delete this->decoderUcast; 00109 00110 // Delete the buffer 00111 delete this->buffer; 00112 00113 // Delete the timers 00114 delete this->timerPlayMcast; 00115 delete this->timerPlayUcast; 00116 delete this->timerBoot; 00117 delete this->timerRegister; 00118 delete this->timerBuffer; 00119 } 00120 00121 void CStreamClientPush::Start(CChannel* channel) 00122 { 00123 assert(channel); 00124 00125 // Check the state 00126 assert(STOPPED == this->stateClient); 00127 00128 // Check the timers are stopped 00129 assert(!this->timerPlayMcast->IsSet()); 00130 assert(!this->timerPlayUcast->IsSet()); 00131 assert(!this->timerBoot->IsSet()); 00132 assert(!this->timerRegister->IsSet()); 00133 assert(!this->timerBuffer->IsSet()); 00134 00135 // Set the channel 00136 this->channel = channel; 00137 00138 // Log 00139 #ifdef LOG 00140 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00141 printf("\n\tT = %7.3lf PEER %3u START %3u (%u)", this->sim->Time(), this->address.Address(), this->channel->Id(), this->channel->Type()); 00142 #endif 00143 00144 // Reset statistics 00145 this->statRecvFrames = 0; 00146 this->statDiscardedFrames = 0; 00147 this->statPlayFrames = 0; 00148 this->statSuccessFrames[0] = 0; 00149 this->statSuccessFrames[1] = 0; 00150 this->statSuccessFrames[2] = 0; 00151 this->statFailFrames[0] = 0; 00152 this->statFailFrames[1] = 0; 00153 this->statFailFrames[2] = 0; 00154 00155 this->statTimeClientStart = this->sim->Time(); 00156 this->statTimeRecvStart = -1; 00157 this->statTimePlayStart = -1; 00158 this->statTimeWait = 0; 00159 00160 this->statPlayFirstFrame = 0; 00161 this->statPlayLastFrame = 0; 00162 00163 this->statSyncDelaySum = 0; 00164 this->statSyncDelayCount = 0; 00165 00166 // Check the channel type 00167 switch(this->channel->Type()) 00168 { 00169 case CChannel::CHANNEL_MULTICAST: this->StartMcast(); break; 00170 case CChannel::CHANNEL_UNICAST: this->StartUcast(); break; 00171 } 00172 } 00173 00174 void CStreamClientPush::StartMcast() 00175 { 00176 // Start session with phase 1 00177 this->processFrame = &CStreamClientPush::ProcessFrameMcastFirst; 00178 00179 // Set decoder 00180 this->decoder = this->decoderMcast; 00181 00182 // Reset the decoder 00183 this->decoder->Reset(this->channel->Id()); 00184 00185 // Join the multicast group 00186 (*this->delegateIgmpJoin)(this->channel->Address(), this->entry); 00187 00188 // Change the state 00189 this->stateClient = MCAST_STREAM_FIRST; 00190 } 00191 00192 void CStreamClientPush::StartUcast() 00193 { 00194 // Check the registration state (client must not be registered) 00195 assert(NOT_REGISTERED == this->stateRegistration); 00196 00197 // Set decoder 00198 this->decoder = this->decoderUcast; 00199 00200 // Reset the decoder 00201 this->decoder->Reset(this->channel->Id()); 00202 00203 // Set the session variables 00204 this->neighbors.clear(); // Clear the neighbors 00205 this->sender = channel->Address(); // Set sender to the server 00206 00207 // Change the client state 00208 this->stateClient = UCAST_BOOTSTRAP_REQUEST; 00209 00210 // Go to streaming STEP 1 00211 this->StreamBootstrap(); 00212 } 00213 00214 void CStreamClientPush::Stop() 00215 { 00216 // Check the state 00217 assert(STOPPED != this->stateClient); 00218 00219 // Log 00220 #ifdef LOG 00221 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00222 printf("\n\tT = %7.3lf PEER %3u STOP %3u", this->sim->Time(), this->address.Address(), this->channel->Id()); 00223 #endif 00224 00225 // Stop client 00226 switch(this->channel->Type()) 00227 { 00228 case CChannel::CHANNEL_MULTICAST: this->StopMcast(); break; 00229 case CChannel::CHANNEL_UNICAST: this->StopUcast(); break; 00230 } 00231 00232 // If there are active timer, cancel 00233 if(this->timerPlayMcast->IsSet()) this->timerPlayMcast->Cancel(); 00234 if(this->timerPlayUcast->IsSet()) this->timerPlayUcast->Cancel(); 00235 if(this->timerBoot->IsSet()) this->timerBoot->Cancel(); 00236 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 00237 if(this->timerBuffer->IsSet()) this->timerBuffer->Cancel(); 00238 00239 // Finalize statistics 00240 this->statTimeFinish = this->sim->Time(); 00241 00242 // Set channel to null 00243 this->channel = NULL; 00244 00245 // Change the state 00246 this->stateClient = STOPPED; 00247 } 00248 00249 void CStreamClientPush::StopMcast() 00250 { 00251 // Leave the multicast group 00252 (*this->delegateIgmpLeave)(this->channel->Address()); 00253 00254 // If the session is waiting, add the waiting time 00255 if(MCAST_STREAM_WAIT == this->stateClient) this->statTimeWait += (this->sim->Time() - this->sessionTimeLastStop); 00256 } 00257 00258 void CStreamClientPush::StopUcast() 00259 { 00260 // Close all receivers 00261 this->StreamClose(); 00262 00263 // Leave the stream 00264 this->StreamLeave(); 00265 00266 // If registered, deregister from bootstrap server 00267 if(REGISTERED == this->stateRegistration) this->StreamDeregister(); 00268 00269 // If the session is waiting, add the waiting time 00270 if(UCAST_STREAM_WAIT == this->stateClient) this->statTimeWait += (this->sim->Time() - this->sessionTimeLastStop); 00271 } 00272 00273 void CStreamClientPush::Recv(CAddress srcAddress, CAddress dstAddress, __uint16 srcPort, __uint16 dstPort, CPacket* packet) 00274 { 00275 // Process received packets 00276 if(NULL == packet) return; 00277 if((dstAddress != this->address) && (!dstAddress.IsMulticast())) return; 00278 00279 // Process packet depending on UDP destination 00280 if(this->info->PortStream() == dstPort) this->RecvStream(srcAddress, type_cast<CPacketStream*>(packet)); 00281 if(this->info->PortControl() == dstPort) this->RecvMessage(srcAddress, type_cast<CStreamMessage*>(packet)); 00282 } 00283 00284 void CStreamClientPush::RecvStream(CAddress src, CPacketStream* packet) 00285 { 00286 #ifdef LOG 00287 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00288 { 00289 CConsole::SetColor(CConsole::DARK_GRAY); 00290 printf("\n\t\tT = %7.3lf RECV STREAM %u --- %u : ", this->sim->Time(), packet->Stream(), src.Address()); 00291 CConsole::SetColor(CConsole::LIGHT_GRAY); 00292 } 00293 #endif 00294 00295 // Send received stream packets to the decoder 00296 CStreamDecoder::EResult result = this->decoder->Decode(src, packet); 00297 00298 #ifdef LOG 00299 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00300 { 00301 CConsole::SetColor(CConsole::LIGHT_RED); 00302 switch(result) 00303 { 00304 case CStreamDecoder::FAIL_PACKET_TYPE: printf("FAIL PACKET TYPE"); break; 00305 case CStreamDecoder::FAIL_STREAM: printf("FAIL STREAM"); break; 00306 } 00307 CConsole::SetColor(CConsole::LIGHT_GRAY); 00308 } 00309 #endif 00310 } 00311 00312 void CStreamClientPush::RecvMessage(CAddress src, CStreamMessage* message) 00313 { 00314 // Process messages 00315 switch(message->MessageType()) 00316 { 00317 case CStreamMessage::STREAM_MESSAGE_PUSH_JOIN: this->RecvMessageJoin(src, type_cast<CStreamMessagePushJoin*>(message)); break; 00318 case CStreamMessage::STREAM_MESSAGE_PUSH_LEAVE: this->RecvMessageLeave(src, type_cast<CStreamMessagePushLeave*>(message)); break; 00319 case CStreamMessage::STREAM_MESSAGE_PUSH_CLOSE: this->RecvMessageClose(src, type_cast<CStreamMessagePushClose*>(message)); break; 00320 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_RESPONSE: this->RecvMessageBootResponse(src, type_cast<CStreamMessageBootPushResponse*>(message)); break; 00321 default: assert(0); /* do nothing */ 00322 } 00323 } 00324 00325 void CStreamClientPush::RecvFrameMcast(CAddress src, CStreamFrame frame) 00326 { 00327 #ifdef LOG 00328 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00329 { 00330 CConsole::SetColor(CConsole::LIGHT_CYAN); 00331 printf("\n\t\tT = %7.3lf RECV FRAME (M) %u", this->sim->Time(), frame.Index()); 00332 CConsole::SetColor(CConsole::LIGHT_GRAY); 00333 } 00334 #endif 00335 00336 // Check the state 00337 if(this->stateClient < MCAST_STREAM_FIRST) 00338 { 00339 this->statDiscardedFrames = 0; 00340 return; 00341 } 00342 00343 // Check the decoder passed a correct frame 00344 if(frame.Stream() != this->channel->Id()) 00345 { 00346 this->statDiscardedFrames = 0; 00347 return; 00348 } 00349 00350 // Process frame 00351 (this->*this->processFrame)(frame); 00352 00353 // Statistics 00354 this->statRecvFrames++; 00355 } 00356 00357 void CStreamClientPush::RecvFrameUcast(CAddress src, CStreamFrame frame) 00358 { 00359 #ifdef LOG 00360 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00361 { 00362 CConsole::SetColor(CConsole::LIGHT_CYAN); 00363 printf("\n\t\tT = %7.3lf RECV FRAME (U) %u", this->sim->Time(), frame.Index()); 00364 CConsole::SetColor(CConsole::LIGHT_GRAY); 00365 } 00366 #endif 00367 00368 // Check the client state (if not a client state that allows receiving frames, ignore) 00369 if(this->stateClient < UCAST_STREAM_FIRST) 00370 { 00371 this->statDiscardedFrames++; 00372 return; 00373 } 00374 00375 // Check the decoder passed a correct frame 00376 if(frame.Stream() != this->channel->Id()) 00377 { 00378 this->statDiscardedFrames++; 00379 return; 00380 } 00381 00382 // Process frame 00383 (this->*this->processFrame)(frame); 00384 00385 // Statistics 00386 this->statRecvFrames++; 00387 } 00388 00389 void CStreamClientPush::ProcessFrameMcastFirst(CStreamFrame frame) 00390 { 00391 // PLAYBACK Phase 1 : Process first frame to initialize playback buffer 00392 00393 #ifdef LOG 00394 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00395 printf("\t\t[MCAST FIRST]"); 00396 #endif 00397 00398 // Check the state 00399 assert(MCAST_STREAM_FIRST == this->stateClient); 00400 00401 // Reset the buffer 00402 this->buffer->Reset(frame.Index()); 00403 00404 // Add the frame to the buffer 00405 this->buffer->Add(frame); 00406 00407 // Statistics 00408 this->statTimeRecvStart = this->sim->Time(); 00409 00410 // Switch to process buffering frames (phase 2) 00411 this->processFrame = &CStreamClientPush::ProcessFrameMcastBuffering; 00412 00413 // Change the state 00414 this->stateClient = MCAST_STREAM_BUFFERING; 00415 } 00416 00417 void CStreamClientPush::ProcessFrameMcastBuffering(CStreamFrame frame) 00418 { 00419 // PLAYBACK Phase 2 : Buffering phase (playback is not started and frames are saved in the buffer) 00420 00421 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00422 // buffer, plus the buffering size 00423 00424 #ifdef LOG 00425 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00426 printf("\t\t[MCAST BUFFERING]"); 00427 #endif 00428 00429 // Check the state 00430 assert(MCAST_STREAM_BUFFERING == this->stateClient); 00431 00432 // Add the frame to the buffer 00433 this->buffer->Add(frame); 00434 00435 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00436 if(frame.Index() >= this->buffer->CurrentIndex() + this->info->StreamBufferSizeBuffering()) 00437 { 00438 // Find the first independent frame 00439 for(__uint32 count = 0; count < this->buffer->Count(); count++) 00440 { 00441 if(this->buffer->IsCurrentIndependent()) 00442 { 00443 // First frame found : start playback 00444 00445 // Start playback : save session and stat information 00446 this->sessionFrameLastStart = this->buffer->CurrentIndex(); 00447 this->sessionTimeLastStart = this->sim->Time(); 00448 00449 this->statPlayFirstFrame = this->buffer->CurrentIndex(); 00450 this->statTimePlayStart = this->sim->Time(); 00451 00452 // Switch to process playback frames (phase 3) 00453 this->processFrame = &CStreamClientPush::ProcessFrameMcastPlay; 00454 00455 // Change the state 00456 this->stateClient = MCAST_STREAM_PLAY; 00457 00458 // Call the play timer 00459 this->TimerPlayMcast(NULL); 00460 00461 break; 00462 } 00463 else this->buffer->Shift(); 00464 } 00465 } 00466 } 00467 00468 void CStreamClientPush::ProcessFrameMcastPlay(CStreamFrame frame) 00469 { 00470 #ifdef LOG 00471 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00472 printf("\t\t[MCAST PLAY]"); 00473 #endif 00474 00475 // PLAYBACK Phase 3 : Play phase 00476 assert(MCAST_STREAM_PLAY == this->stateClient); 00477 00478 // Add the frame to the buffer 00479 this->buffer->Add(frame); 00480 } 00481 00482 void CStreamClientPush::ProcessFrameMcastWait(CStreamFrame frame) 00483 { 00484 // PLAYBACK Wait : Buffering phase after a buffer underrun (playback is not started and frames are saved in the buffer) 00485 00486 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00487 // buffer, plus the buffering size 00488 00489 #ifdef LOG 00490 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00491 printf("\t\t[MCAST WAIT]"); 00492 #endif 00493 00494 // Check the state 00495 assert(MCAST_STREAM_WAIT == this->stateClient); 00496 00497 // Add the frame to the buffer 00498 this->buffer->Add(frame); 00499 00500 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00501 if(frame.Index() >= this->buffer->CurrentIndex() + this->info->StreamBufferSizeBuffering()) 00502 { 00503 // Find the first frame 00504 for(__uint32 count = 0; count < this->buffer->Count(); count++) 00505 { 00506 if(this->buffer->HasCurrentIndex()) 00507 { 00508 // Check the last stop 00509 assert(this->sim->Time() >= this->sessionTimeLastStop); 00510 00511 // Save statistics 00512 this->statTimeWait += this->sim->Time() - this->sessionTimeLastStop; 00513 00514 // Restart playback 00515 this->sessionFrameLastStart = this->buffer->CurrentIndex(); 00516 this->sessionTimeLastStart = this->sim->Time(); 00517 00518 // Switch to process playback frames (phase 3) 00519 this->processFrame = &CStreamClientPush::ProcessFrameMcastPlay; 00520 00521 // Change the state 00522 this->stateClient = MCAST_STREAM_PLAY; 00523 00524 // Call the play timer 00525 this->TimerPlayMcast(NULL); 00526 00527 break; 00528 } 00529 else 00530 { 00531 // Increment the number of played frames 00532 this->statPlayFrames++; 00533 00534 // Playback sync statistics 00535 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()); 00536 this->statSyncDelayCount++; 00537 00538 // Mark the frame as failed 00539 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex()) < 3); 00540 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())]++; 00541 00542 // Shift the buffer 00543 this->buffer->Shift(); 00544 } 00545 } 00546 } 00547 } 00548 00549 void CStreamClientPush::ProcessFrameUcastFirst(CStreamFrame frame) 00550 { 00551 // PROCESS Phase 1 : Process first frame to initialize playback buffer 00552 00553 #ifdef LOG 00554 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00555 printf("\t\t[UCAST FIRST]"); 00556 #endif 00557 00558 // Check the client state 00559 assert(UCAST_STREAM_FIRST == this->stateClient); 00560 00561 // Reset the buffer 00562 this->buffer->Reset(frame.Index()); 00563 00564 // Add the frame to the buffer and if the frame is new send it to the receivers 00565 if(this->buffer->Add(frame)) this->SendStream(frame); 00566 00567 // Statistics 00568 this->statTimeRecvStart = this->sim->Time(); 00569 00570 // Switch to process buffering frames (phase 2) 00571 this->processFrame = &CStreamClientPush::ProcessFrameUcastBuffering; 00572 00573 // Change the client state 00574 this->stateClient = UCAST_STREAM_BUFFERING; 00575 } 00576 00577 void CStreamClientPush::ProcessFrameUcastBuffering(CStreamFrame frame) 00578 { 00579 // PLAYBACK Phase 2 : Buffering phase (playback is not started and frames are saved in the buffer) 00580 00581 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00582 // buffer, plus the buffering size 00583 00584 #ifdef LOG 00585 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00586 printf("\t\t[UCAST BUFFERING]\t(frame_index=%u current_index=%u buffering_size=%u buffer_num=%u)", frame.Index(), this->buffer->CurrentIndex(), this->info->StreamBufferSizeBuffering(), this->buffer->Count()); 00587 #endif 00588 00589 // Check the state 00590 assert(UCAST_STREAM_BUFFERING == this->stateClient); 00591 00592 // Add the frame to the buffer and if the frame is new send it to the receivers 00593 if(this->buffer->Add(frame)) this->SendStream(frame); 00594 00595 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00596 if(frame.Index() >= this->buffer->CurrentIndex() + this->info->StreamBufferSizeBuffering()) 00597 { 00598 // Find the first independent frame 00599 for(__uint32 count = 0; count < this->buffer->Count(); count++) 00600 { 00601 if(this->buffer->IsCurrentIndependent()) 00602 { 00603 // First frame found : start playback 00604 00605 // Start playback : save session and stat information 00606 this->sessionFrameLastStart = this->buffer->CurrentIndex(); 00607 this->sessionTimeLastStart = this->sim->Time(); 00608 00609 this->statPlayFirstFrame = this->buffer->CurrentIndex(); 00610 this->statTimePlayStart = this->sim->Time(); 00611 00612 // Switch to process playback frames (phase 3) 00613 this->processFrame = &CStreamClientPush::ProcessFrameUcastPlay; 00614 00615 // Change the state 00616 this->stateClient = UCAST_STREAM_PLAY; 00617 00618 // Call the play timer 00619 this->TimerPlayUcast(NULL); 00620 00621 // Set the registration timer 00622 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 00623 this->timerRegister->SetAfter(this->info->BootRegisterDelay()); 00624 00625 break; 00626 } 00627 else this->buffer->Shift(); 00628 } 00629 } 00630 } 00631 00632 void CStreamClientPush::ProcessFrameUcastPlay(CStreamFrame frame) 00633 { 00634 #ifdef LOG 00635 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00636 printf("\t\t[UCAST PLAY]"); 00637 #endif 00638 00639 // PLAYBACK Phase 3 : Play phase 00640 assert(UCAST_STREAM_PLAY == this->stateClient); 00641 00642 // Add the frame to the buffer and if the frame is new send it to the receivers 00643 if(this->buffer->Add(frame)) this->SendStream(frame); 00644 } 00645 00646 void CStreamClientPush::ProcessFrameUcastWait(CStreamFrame frame) 00647 { 00648 // PLAYBACK Wait : Buffering phase after a buffer underrun (playback is not started and frames are saved in the buffer) 00649 00650 #ifdef LOG 00651 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00652 printf("\t\t[UCAST WAIT]"); 00653 #endif 00654 00655 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00656 // buffer, plus the buffering size 00657 00658 // Check the state 00659 assert(UCAST_STREAM_WAIT == this->stateClient); 00660 00661 // Add the frame to the buffer and if the frame is new send it to the receivers 00662 if(this->buffer->Add(frame)) this->SendStream(frame); 00663 00664 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00665 if(frame.Index() >= this->buffer->CurrentIndex() + this->info->StreamBufferSizeBuffering()) 00666 { 00667 // Find the first frame 00668 for(__uint32 count = 0; count < this->buffer->Count(); count++) 00669 { 00670 if(this->buffer->HasCurrentIndex()) 00671 { 00672 // Check the last stop 00673 assert(this->sim->Time() >= this->sessionTimeLastStop); 00674 00675 #ifdef LOG 00676 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00677 { 00678 CConsole::SetColor(CConsole::LIGHT_YELLOW); 00679 printf("\n\tT = %7.3lf WAIT : %7.3lf (%7.3lf - %7.3lf)", this->sim->Time(), this->sim->Time() - this->sessionTimeLastStop, this->sessionTimeLastStop, this->sim->Time()); 00680 CConsole::SetColor(CConsole::LIGHT_GRAY); 00681 } 00682 #endif 00683 // Save statistics 00684 this->statTimeWait += this->sim->Time() - this->sessionTimeLastStop; 00685 00686 // Restart playback 00687 this->sessionFrameLastStart = this->buffer->CurrentIndex(); 00688 this->sessionTimeLastStart = this->sim->Time(); 00689 00690 // Switch to process playback frames (phase 3) 00691 this->processFrame = &CStreamClientPush::ProcessFrameUcastPlay; 00692 00693 // Change the state 00694 this->stateClient = UCAST_STREAM_PLAY; 00695 00696 // Call the play timer 00697 this->TimerPlayUcast(NULL); 00698 00699 // Set the registration timer (cancel and reset) 00700 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 00701 this->timerRegister->SetAfter(this->info->BootRegisterDelay()); 00702 00703 // If a buffer underrun timer is set, cancel 00704 if(this->timerBuffer->IsSet()) this->timerBuffer->Cancel(); 00705 00706 break; 00707 } 00708 else 00709 { 00710 // Increment the number of played frames 00711 this->statPlayFrames++; 00712 00713 // Playback sync statistics 00714 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()); 00715 this->statSyncDelayCount++; 00716 00717 // Mark the frame as failed 00718 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex()) < 3); 00719 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())]++; 00720 00721 #ifdef LOG 00722 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00723 { 00724 CConsole::SetColor(CConsole::LIGHT_RED); 00725 printf("\n\t\tT = %7.3lf PLAY FAIL %6u\t%c : %s", this->sim->Time(), this->buffer->CurrentIndex(), stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())], 00726 this->buffer->HasCurrentIndex()?"DECODE_FAIL":"MISSING"); 00727 CConsole::SetColor(CConsole::LIGHT_GRAY); 00728 } 00729 #endif 00730 // Shift the buffer 00731 this->buffer->Shift(); 00732 } 00733 } 00734 } 00735 } 00736 00737 void CStreamClientPush::ProcessFrameUcastReset(CStreamFrame frame) 00738 { 00739 // PLAYBACK Reset : Reset the buffer after a long buffer underrun (playback is not started and frames are saved in the buffer) 00740 00741 #ifdef LOG 00742 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00743 printf("\t\t[UCAST RESET]"); 00744 #endif 00745 00746 // After the buffer is reset by the first frame, switch to wait state and wait for the buffer to fill again 00747 // Check the state 00748 assert(UCAST_STREAM_RESET == this->stateClient); 00749 00750 // Reset the buffer 00751 this->buffer->Reset(frame.Index()); 00752 00753 // Add the frame to the buffer and if the frame is new send it to the receivers 00754 if(this->buffer->Add(frame)) this->SendStream(frame); 00755 00756 // Switch to the wait state 00757 this->processFrame = &CStreamClientPush::ProcessFrameUcastWait; 00758 00759 // Change the state 00760 this->stateClient = UCAST_STREAM_WAIT; 00761 } 00762 00763 void CStreamClientPush::StreamBootstrap() 00764 { 00765 // Streaming STEP 1 : Bootstraping (connect to bootstrap server and retrieve list of neighbors) 00766 00767 #ifdef LOG 00768 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00769 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP REQUEST %3u", this->sim->Time(), this->address.Address(), this->channel->Id()); 00770 #endif 00771 00772 // Check the state 00773 assert(UCAST_BOOTSTRAP_REQUEST == this->stateClient); 00774 00775 // Send a boot push query message to the bootstrap server (the channel address) 00776 CStreamMessageBootPushRequest* message = new CStreamMessageBootPushRequest( 00777 this->channel->Id(), 00778 this->info->BootQueryMax() 00779 ); 00780 00781 // Send the message 00782 (*this->delegateSendMessage)(this->channel->Address(), message); 00783 00784 // Activate the timer to resend the query after a timeout period 00785 this->timerBoot->SetAfter(this->info->BootQueryTimeout()); 00786 00787 // Change the state 00788 this->stateClient = UCAST_BOOTSTRAP_RESPONSE; 00789 } 00790 00791 void CStreamClientPush::StreamJoin() 00792 { 00793 // Streaming STEP 2a : Joining (start streaming from the senders) 00794 00795 #ifdef LOG 00796 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00797 printf("\n\tT = %7.3lf PEER %3u JOIN %3u -> %u", this->sim->Time(), this->address.Address(), this->channel->Id(), this->sender.Address()); 00798 #endif 00799 00800 // Check the state 00801 assert(UCAST_JOIN_REQUEST == this->stateClient); 00802 00803 if(NULL == this->channel) return; 00804 00805 // Send a join request to the sender 00806 CStreamMessagePushJoin* message = new CStreamMessagePushJoin(this->channel->Id()); 00807 00808 (*this->delegateSendMessage)(this->sender, message); 00809 00810 // Get ready to receive the stream : start streaming session with phase 1 00811 this->processFrame = &CStreamClientPush::ProcessFrameUcastFirst; 00812 00813 // Change the state 00814 this->stateClient = UCAST_STREAM_FIRST; 00815 } 00816 00817 void CStreamClientPush::StreamLeave() 00818 { 00819 // Streaming STEP 3a : Leave (inform the senders) 00820 00821 // Send a leave to the sender 00822 CStreamMessagePushLeave* message = new CStreamMessagePushLeave(this->channel->Id()); 00823 00824 (*this->delegateSendMessage)(this->sender, message); 00825 } 00826 00827 void CStreamClientPush::StreamRegister() 00828 { 00829 // Streaming STEP 2b : Register (register to bootstrap server) 00830 00831 // Check the client state (client must start playing in order to register) 00832 assert(this->stateClient >= UCAST_STREAM_FIRST); 00833 00834 // Check the registration state 00835 assert(NOT_REGISTERED == this->stateRegistration); 00836 00837 CStreamMessageBootPushRegister* message = new CStreamMessageBootPushRegister(this->channel->Id(), this->address, this->sender); 00838 00839 (*this->delegateSendMessage)(this->channel->Address(), message); 00840 00841 // Change the registration state 00842 this->stateRegistration = REGISTERED; 00843 } 00844 00845 void CStreamClientPush::StreamDeregister() 00846 { 00847 // Check the resgistration state 00848 assert(REGISTERED == this->stateRegistration); 00849 00850 // Streaming STEP 3b : Deregister (deregister from bootstrap server) 00851 CStreamMessageBootPushDeregister* message = new CStreamMessageBootPushDeregister(this->channel->Id(), this->address); 00852 00853 (*this->delegateSendMessage)(this->channel->Address(), message); 00854 00855 // Change the registration state 00856 this->stateRegistration = NOT_REGISTERED; 00857 } 00858 00859 void CStreamClientPush::StreamClose() 00860 { 00861 // Streaming STEP ASYNC : Close (sent to receivers upon leaving or when receiving a request that cannot be served) 00862 00863 for(set<CAddress>::iterator iter = this->receivers.begin(); iter != this->receivers.end(); iter++) 00864 { 00865 CStreamMessagePushClose* message = new CStreamMessagePushClose(this->channel->Id()); 00866 00867 (*this->delegateSendMessage)(*iter, message); 00868 00869 #ifdef LOG 00870 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00871 printf("\n\tT = %7.3lf PEER %3u CLOSE RECEIVER %3u", this->sim->Time(), this->address.Address(), ((CAddress)(*iter)).Address()); 00872 #endif 00873 } 00874 00875 // Clear the receivers list 00876 this->receivers.clear(); 00877 00878 // Set the used bandwidth to zero 00879 this->bwUsed = 0; 00880 } 00881 00882 void CStreamClientPush::SendStream(CStreamFrame frame) 00883 { 00884 // Check the client state 00885 assert(this->stateClient >= UCAST_STREAM_FIRST); 00886 00887 #ifdef LOG 00888 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00889 { 00890 CConsole::SetColor(CConsole::DARK_GRAY); 00891 printf("\n\t\tT = %7.3lf SEND FRAME (U) %u : [ ", this->sim->Time(), frame.Index()); 00892 } 00893 #endif 00894 00895 for(set<CAddress>::iterator iter = this->receivers.begin(); iter != this->receivers.end(); iter++) 00896 { 00897 // Send frame to the encoder 00898 this->encoder->Encode(*iter, frame); 00899 00900 #ifdef LOG 00901 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00902 printf("%u ", ((CAddress)(*iter)).Address()); 00903 #endif 00904 } 00905 00906 #ifdef LOG 00907 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00908 { 00909 printf("]"); 00910 CConsole::SetColor(CConsole::LIGHT_GRAY); 00911 } 00912 #endif 00913 } 00914 00915 void CStreamClientPush::RecvMessageJoin(CAddress src, CStreamMessagePushJoin* message) 00916 { 00917 // Process received JOIN 00918 00919 // Check whether the client can serve the channel 00920 if(NULL == this->channel) { this->RejectJoin(src, message); return; } 00921 if(this->channel->Id() != message->Stream()) { this->RejectJoin(src, message); return; } 00922 00923 // Check whether the client is registered 00924 if(REGISTERED != this->stateRegistration) { this->RejectJoin(src, message); return; } 00925 00926 #ifdef LOG 00927 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00928 printf("\n\tT = %7.3lf PEER %3u RECV JOIN %u : ", this->sim->Time(), this->address.Address(), src.Address()); 00929 #endif 00930 00931 // Check whether the peer would have bandwidth to serve the channel considering the current usage 00932 if(this->bw - this->bwUsed < this->info->StreamBwMargin() * this->channel->Bw()) 00933 { 00934 00935 #ifdef LOG 00936 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00937 { 00938 CConsole::SetColor(CConsole::LIGHT_RED); 00939 printf("REJECT"); 00940 CConsole::SetColor(CConsole::LIGHT_GRAY); 00941 } 00942 #endif 00943 00944 // Reject the request 00945 this->RejectJoin(src, message); 00946 // Deregister 00947 this->StreamDeregister(); 00948 return; 00949 } 00950 00951 #ifdef LOG 00952 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00953 { 00954 CConsole::SetColor(CConsole::LIGHT_GREEN); 00955 printf("ACCEPT"); 00956 CConsole::SetColor(CConsole::LIGHT_GRAY); 00957 } 00958 #endif 00959 00960 // Acceptance conditions met : add the host to the receivers list 00961 this->receivers.insert(src); 00962 00963 // Estimate the increase in bandwidth usage 00964 this->bwUsed += this->channel->Bw(); 00965 } 00966 00967 void CStreamClientPush::RecvMessageLeave(CAddress src, CStreamMessagePushLeave* message) 00968 { 00969 // Process received LEAVE 00970 00971 // Check whether the peer is serving the channel (may be a delayed message) 00972 if(NULL == this->channel) return; 00973 if(this->channel->Id() != message->Stream()) return; 00974 00975 // Check if the host is in the receivers list 00976 if(this->receivers.find(src) != this->receivers.end()) 00977 { 00978 // Remove the host from the receivers list 00979 this->receivers.erase(src); 00980 00981 // Estimate the decrease in bandwidth usage 00982 this->bwUsed -= this->channel->Bw(); 00983 } 00984 } 00985 00986 void CStreamClientPush::RecvMessageClose(CAddress src, CStreamMessagePushClose* message) 00987 { 00988 // Process received CLOSE 00989 00990 #ifdef LOG 00991 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00992 printf("\n\tT = %7.3lf PEER %3u CLOSE %3u <- %u", this->sim->Time(), this->address.Address(), this->channel->Id(), src.Address()); 00993 #endif 00994 00995 // Check the close is for current channel 00996 if(NULL == this->channel) return; 00997 if(this->channel->Id() != message->Stream()) return; 00998 00999 // Check the close has been sent by the current sender (otherwise, ignore) 01000 if(src != this->sender) return; 01001 01002 // Check the state of the client (if the close message is delayed) 01003 if(this->stateClient < UCAST_STREAM_FIRST) return; 01004 01005 // If more neighbors are available 01006 if(this->neighbors.size() > 0) 01007 { 01008 // Select a new random neighbor as a receiver for this layer 01009 __uint32 index = CRand::Generate(this->neighbors.size()); 01010 assert(index < this->neighbors.size()); 01011 01012 __uint32 idx = 0; 01013 for(set<CAddress>::iterator iter = this->neighbors.begin(); iter != this->neighbors.end(); iter++, idx++) 01014 { 01015 if(idx == index) 01016 { 01017 this->sender = *iter; // Set the sender 01018 break; 01019 } 01020 } 01021 01022 // Remove the sender from the neighbors list 01023 this->neighbors.erase(this->sender); 01024 01025 // If the number of remaining neighbors is less than the threshold, refresh the list of neighbors 01026 if(this->neighbors.size() <= this->info->BootRefreshThreshold()) 01027 { 01028 #ifdef LOG 01029 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01030 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP RE-REQUEST %3u (%u neighbors remaining / %u threshold)", this->sim->Time(), this->address.Address(), this->channel->Id(), this->neighbors.size(), this->info->BootRefreshThreshold()); 01031 #endif 01032 // Send a boot push query message to the bootstrap server (the channel address) 01033 CStreamMessageBootPushRequest* request = new CStreamMessageBootPushRequest( 01034 this->channel->Id(), 01035 this->info->BootQueryMax() 01036 ); 01037 01038 (*this->delegateSendMessage)(this->channel->Address(), request); 01039 } 01040 } 01041 // Else, use the server 01042 else 01043 { 01044 this->sender = this->channel->Address(); 01045 } 01046 01047 #ifdef LOG 01048 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01049 printf("\n\tT = %7.3lf PEER %3u JOIN %3u -> %u", this->sim->Time(), this->address.Address(), this->channel->Id(), this->sender.Address()); 01050 #endif 01051 01052 // Send a JOIN to the new sender 01053 CStreamMessagePushJoin* reply = new CStreamMessagePushJoin(this->channel->Id()); 01054 01055 (*this->delegateSendMessage)(this->sender, reply); 01056 } 01057 01058 void CStreamClientPush::RecvMessageBootResponse(CAddress src, CStreamMessageBootPushResponse* message) 01059 { 01060 // Process received BOOT RESPONSE 01061 01062 // If no current channel, do nothing (delayed response) 01063 if(NULL == this->channel) return; 01064 01065 // If the message is not for this channel, do nothing (delayed response) 01066 if(message->Stream() != this->channel->Id()) return; 01067 01068 // If the boot timer is active, cancel 01069 if(this->timerBoot->IsSet()) this->timerBoot->Cancel(); 01070 01071 #ifdef LOG 01072 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01073 { 01074 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP RESPONSE %3u : [%u neighbors] : ", this->sim->Time(), this->address.Address(), this->channel->Id(), message->Count()); 01075 for(__uint32 host = 0; host < message->Count(); host++) printf("%u ", message->Host(host).Address()); 01076 } 01077 #endif 01078 01079 // Process the response, depending on the client state 01080 if(UCAST_BOOTSTRAP_RESPONSE == this->stateClient) 01081 { 01082 // CASE 1 : Bootstrap request sent at the beginning of the session; this populates the neighbor 01083 // list and changes the state of the client 01084 01085 01086 // Process the response 01087 if(message->Count() > 0) 01088 { 01089 // If there are neighbors available (otherwise, the server is used by default) 01090 01091 // Choose a random neighbor to the the sender for this layer 01092 __uint32 sender = CRand::Generate(message->Count()); 01093 assert(sender < message->Count()); 01094 01095 // Save the neighbors 01096 for(__uint32 host = 0; host < sender; host++) 01097 this->neighbors.insert(message->Host(host)); 01098 for(__uint32 host = sender+1; host < message->Count(); host++) 01099 this->neighbors.insert(message->Host(host)); 01100 01101 // Save the sender 01102 this->sender = message->Host(sender); 01103 } 01104 01105 // Change the state 01106 this->stateClient = UCAST_JOIN_REQUEST; 01107 01108 // Go to streaming STEP 2a 01109 this->StreamJoin(); 01110 } 01111 else 01112 { 01113 // CASE 2 : Refresh bootstrap request sent during a session; this refreshes the neighbor 01114 // list but does not change the state of the client or the sender 01115 01116 // Process the response 01117 // Clear the neighbors list 01118 this->neighbors.clear(); 01119 01120 // Save the neighbors 01121 for(__uint32 host = 0; host < message->Count(); host++) 01122 this->neighbors.insert(message->Host(host)); 01123 } 01124 } 01125 01126 void CStreamClientPush::RejectJoin(CAddress src, CStreamMessagePushJoin* message) 01127 { 01128 // Reject JOIN request : send a CLOSE message to the sender 01129 CStreamMessagePushClose* reply = new CStreamMessagePushClose(message->Stream()); 01130 01131 (*this->delegateSendMessage)(src, reply); 01132 } 01133 01134 void CStreamClientPush::TimerPlayMcast(CTimerInfo* info) 01135 { 01136 assert(MCAST_STREAM_PLAY == this->stateClient); 01137 01138 // Check if there is a buffer underrun 01139 if(this->buffer->NumPlay() == 0) 01140 { 01141 // Change the state 01142 this->stateClient = MCAST_STREAM_WAIT; 01143 01144 // Pause playback 01145 this->sessionTimeLastStop = this->sim->Time(); 01146 01147 // Switch to process wait frames (phase 3) 01148 this->processFrame = &CStreamClientPush::ProcessFrameMcastWait; 01149 01150 return; 01151 } 01152 01153 // Play the first frame from the buffer 01154 this->statPlayFrames++; 01155 01156 // Playback sync statistics 01157 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()); 01158 this->statSyncDelayCount++; 01159 01160 if(this->buffer->IsCurrentDecodable()) 01161 { 01162 this->statPlayLastFrame = this->buffer->CurrentIndex(); 01163 assert(this->buffer->CurrentFrame()->Type() < 3); 01164 this->statSuccessFrames[this->buffer->CurrentFrame()->Type()]++; 01165 01166 #ifdef LOG 01167 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01168 { 01169 CConsole::SetColor(CConsole::LIGHT_GREEN); 01170 printf("\n\t\tT = %7.3lf PLAY OKAY %6u\t%c", this->sim->Time(), this->buffer->CurrentIndex(), stringFrameType[this->buffer->CurrentFrame()->Type()]); 01171 CConsole::SetColor(CConsole::LIGHT_GRAY); 01172 printf("\t\t(time=%.3lf frame_time=%.3lf sync_delay=%.3lf)", this->sim->Time(), this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()), 01173 this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex())); 01174 } 01175 #endif 01176 } 01177 else 01178 { 01179 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex()) < 3); 01180 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())]++; 01181 01182 #ifdef LOG 01183 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01184 { 01185 CConsole::SetColor(CConsole::LIGHT_RED); 01186 printf("\n\t\tT = %7.3lf PLAY FAIL %6u\t%c : %s", this->sim->Time(), this->buffer->CurrentIndex(), stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())], 01187 this->buffer->HasCurrentIndex()?"DECODE_FAIL":"MISSING"); 01188 CConsole::SetColor(CConsole::LIGHT_GRAY); 01189 printf("\t\t(time=%.3lf frame_time=%.3lf sync_delay=%.3lf)", this->sim->Time(), this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()), 01190 this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex())); 01191 } 01192 #endif 01193 } 01194 01195 // Shift the buffer 01196 this->buffer->Shift(); 01197 01198 // Schedule next play event 01199 this->timerPlayMcast->SetAt(this->sessionTimeLastStart + ((__time)(this->buffer->CurrentIndex() - this->sessionFrameLastStart)) / (__time)this->channel->Fps()); 01200 } 01201 01202 void CStreamClientPush::TimerPlayUcast(CTimerInfo* info) 01203 { 01204 assert(UCAST_STREAM_PLAY == this->stateClient); 01205 01206 // Check if there is a buffer underrun 01207 if(this->buffer->NumPlay() == 0) 01208 { 01209 // Change the state 01210 this->stateClient = UCAST_STREAM_WAIT; 01211 01212 // Pause playback 01213 this->sessionTimeLastStop = this->sim->Time(); 01214 01215 // If the host is registered, deregister 01216 if(REGISTERED == this->stateRegistration) this->StreamDeregister(); 01217 01218 // If the registration timer is set, cancel the registration 01219 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 01220 01221 // Switch to process wait frames (phase 3) 01222 this->processFrame = &CStreamClientPush::ProcessFrameUcastWait; 01223 01224 // Set a buffer underrun timer: if the buffer underrun is not fixed until the timer expires, select a new sender 01225 if(this->timerBuffer->IsSet()) this->timerBuffer->Cancel(); 01226 01227 // Set the info 01228 this->timerBufferChannel = this->channel->Id(); 01229 this->timerBuffer->SetAfter(this->info->StreamBufferUnderrunTimeout()); 01230 01231 #ifdef LOG 01232 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01233 { 01234 CConsole::SetColor(CConsole::LIGHT_YELLOW); 01235 printf("\n\t\tT = %7.3lf PLAY WAIT", this->sim->Time()); 01236 CConsole::SetColor(CConsole::LIGHT_GRAY); 01237 } 01238 #endif 01239 return; 01240 } 01241 01242 // Play the first frame from the buffer 01243 this->statPlayFrames++; 01244 01245 // Playback sync statistics 01246 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()); 01247 this->statSyncDelayCount++; 01248 01249 if(this->buffer->IsCurrentDecodable()) 01250 { 01251 assert(this->buffer->HasCurrentIndex()); 01252 assert(this->buffer->CurrentIndex() == this->buffer->CurrentFrame()->Index()); 01253 assert(this->buffer->CurrentFrame()->Type() < 3); 01254 01255 this->statPlayLastFrame = this->buffer->CurrentIndex(); 01256 this->statSuccessFrames[this->buffer->CurrentFrame()->Type()]++; 01257 01258 #ifdef LOG 01259 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01260 { 01261 CConsole::SetColor(CConsole::LIGHT_GREEN); 01262 printf("\n\t\tT = %7.3lf PLAY OKAY %6u\t%c", this->sim->Time(), this->buffer->CurrentIndex(), stringFrameType[this->buffer->CurrentFrame()->Type()]); 01263 CConsole::SetColor(CConsole::LIGHT_GRAY); 01264 printf("\t\t(time=%.3lf frame_time=%.3lf sync_delay=%.3lf)", this->sim->Time(), this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()), 01265 this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex())); 01266 } 01267 #endif 01268 } 01269 else 01270 { 01271 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex()) < 3); 01272 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())]++; 01273 01274 #ifdef LOG 01275 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01276 { 01277 CConsole::SetColor(CConsole::LIGHT_RED); 01278 printf("\n\t\tT = %7.3lf PLAY FAIL %6u\t%c : %s", this->sim->Time(), this->buffer->CurrentIndex(), stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())], 01279 this->buffer->HasCurrentIndex()?"DECODE_FAIL":"MISSING"); 01280 CConsole::SetColor(CConsole::LIGHT_GRAY); 01281 printf("\t\t(time=%.3lf frame_time=%.3lf sync_delay=%.3lf)", this->sim->Time(), this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()), 01282 this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex())); 01283 } 01284 #endif 01285 } 01286 01287 // Shift the buffer 01288 this->buffer->Shift(); 01289 01290 // Schedule next play event 01291 this->timerPlayUcast->SetAt(this->sessionTimeLastStart + ((__time)(this->buffer->CurrentIndex() - this->sessionFrameLastStart)) / (__time)this->channel->Fps()); 01292 } 01293 01294 void CStreamClientPush::TimerBoot(CTimerInfo* info) 01295 { 01296 if(NULL == this->channel) return; 01297 01298 // Bootstrap timer : resend the boostrap query 01299 CStreamMessageBootPushRequest* message = new CStreamMessageBootPushRequest( 01300 this->channel->Id(), 01301 this->info->BootQueryMax() 01302 ); 01303 01304 (*this->delegateSendMessage)(this->channel->Address(), message); 01305 01306 // Activate the timer to resend the query after a timeout period 01307 this->timerBoot->SetAfter(this->info->BootQueryTimeout()); 01308 } 01309 01310 void CStreamClientPush::TimerRegister(CTimerInfo* info) 01311 { 01312 // Register 01313 this->StreamRegister(); 01314 } 01315 01316 void CStreamClientPush::TimerBuffer(CTimerInfo* info) 01317 { 01318 // Buffer underrun timer : if a buffer underrun instance is not self-repairing (i.e. caused by delayed packets), 01319 // select a new source 01320 01321 // First, check the buffer underrun instance is genuine : the client must be in a wait state 01322 if(UCAST_STREAM_WAIT != this->stateClient) return; 01323 01324 // Check the timer is set for the current channel 01325 if(NULL == this->channel) return; 01326 assert(timerBufferChannel == this->channel->Id()); 01327 if(this->channel->Id() != this->timerBufferChannel) return; 01328 01329 // Then, send a leave message to the old sender 01330 this->StreamLeave(); 01331 01332 // Select a new sender from the neighbors list : if more neighbors are available 01333 if(this->neighbors.size() > 0) 01334 { 01335 // Select a new random neighbor as a receiver for this layer 01336 __uint32 index = CRand::Generate(this->neighbors.size()); 01337 assert(index < this->neighbors.size()); 01338 01339 __uint32 idx = 0; 01340 for(set<CAddress>::iterator iter = this->neighbors.begin(); iter != this->neighbors.end(); iter++, idx++) 01341 { 01342 if(idx == index) 01343 { 01344 this->sender = *iter; 01345 break; 01346 } 01347 } 01348 01349 // Remove the sender from the neighbors list 01350 this->neighbors.erase(this->sender); 01351 01352 // If the number of remaining neighbors is less than the threshold, refresh the list of neighbors 01353 if(this->neighbors.size() <= this->info->BootRefreshThreshold()) 01354 { 01355 // Send a boot push query message to the bootstrap server (the channel address) 01356 CStreamMessageBootPushRequest* message = new CStreamMessageBootPushRequest( 01357 this->channel->Id(), 01358 this->info->BootQueryMax() 01359 ); 01360 01361 (*this->delegateSendMessage)(this->channel->Address(), message); 01362 } 01363 } 01364 // Else, use the server 01365 else this->sender = this->channel->Address(); 01366 01367 // Send a JOIN to the new sender 01368 CStreamMessagePushJoin* reply = new CStreamMessagePushJoin(this->channel->Id()); 01369 01370 (*this->delegateSendMessage)(this->sender, reply); 01371 01372 // Switch the client to the reset state (after a long buffer underrun, the client must be resync with the the frames sent by the source) 01373 this->stateClient = UCAST_STREAM_RESET; 01374 this->processFrame = &CStreamClientPush::ProcessFrameUcastReset; 01375 } 01376 01377 void CStreamClientPush::Finalize() 01378 { 01379 }
Last updated: February 8, 2011