Alex Bikfalvi
SimStream Documentation
StreamClientPushMulti.cpp
00001 #include "Headers.h" 00002 #include "StreamClientPushMulti.h" 00003 #include "Rand.h" 00004 #include "Console.h" 00005 #include "Debug.h" 00006 00007 CStreamClientPushMulti::CLayer::CLayer( 00008 __uint32 layer, 00009 CSimHandler* sim, 00010 CStreamClientPushMulti* client, 00011 CLayersInfo* info, 00012 void (CStreamClientPushMulti::*timerBootHandler)(CTimerInfo*), 00013 void (CStreamClientPushMulti::*timerRegisterHandler)(CTimerInfo*), 00014 void (CStreamClientPushMulti::*timerBufferHandler)(CTimerInfo*) 00015 ) 00016 { 00017 // Layer 00018 this->layer = layer; 00019 00020 // Layer state 00021 this->stateLayer = LAYER_IDLE; 00022 this->stateRegister = NOT_REGISTERED; 00023 00024 // Layers info 00025 this->info = info; 00026 00027 // Process function 00028 this->process = NULL; 00029 00030 // Timers 00031 this->timerRegister = new CTimer<CStreamClientPushMulti>(sim, client, timerRegisterHandler); 00032 this->timerBuffer = new CTimer<CStreamClientPushMulti>(sim, client, timerBufferHandler); 00033 } 00034 00035 CStreamClientPushMulti::CLayer::~CLayer() 00036 { 00037 // Delete timer 00038 delete this->timerRegister; 00039 delete this->timerBuffer; 00040 } 00041 00042 CStreamClientPushMulti::CLayersInfo::CLayersInfo() 00043 { 00044 this->numBuffering = 0; 00045 this->numReady = 0; 00046 this->numRegistered = 0; 00047 } 00048 00049 void CStreamClientPushMulti::CLayer::Start(CChannel* channel) 00050 { 00051 assert(LAYER_IDLE == this->stateLayer); 00052 assert(NOT_REGISTERED == this->stateRegister); 00053 00054 // Set the channel 00055 this->channel = channel; 00056 00057 // Set layer state 00058 this->stateLayer = LAYER_BUFFERING; 00059 00060 // Check the timers 00061 assert(!this->timerBuffer->IsSet()); 00062 assert(!this->timerRegister->IsSet()); 00063 00064 // Update info 00065 this->info->IdleToBuffering(); 00066 00067 // Reset streaming info 00068 this->neighbors.clear(); 00069 this->receivers.clear(); 00070 this->sender = channel->Address(); 00071 this->process = NULL; 00072 } 00073 00074 void CStreamClientPushMulti::CLayer::Stop() 00075 { 00076 assert(LAYER_IDLE != this->stateLayer); 00077 00078 // Set the layer state 00079 this->stateLayer = LAYER_IDLE; 00080 00081 // Cancel timers 00082 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 00083 if(this->timerBuffer->IsSet()) this->timerBuffer->Cancel(); 00084 } 00085 00086 void CStreamClientPushMulti::CLayer::Buffering() 00087 { 00088 assert(LAYER_READY == this->stateLayer); 00089 00090 this->stateLayer = LAYER_BUFFERING; 00091 00092 // Change the info 00093 this->info->ReadyToBuffering(); 00094 } 00095 00096 void CStreamClientPushMulti::CLayer::Ready() 00097 { 00098 assert(LAYER_BUFFERING == this->stateLayer); 00099 00100 this->stateLayer = LAYER_READY; 00101 00102 // Change the info 00103 this->info->BufferingToReady(); 00104 } 00105 00106 CStreamClientPushMulti::CStreamClientPushMulti( 00107 CSimHandler* sim, 00108 CInfoPushMulti* info, 00109 CAddress address, 00110 IDelegate2<void, CAddress, CPacketStream*>* delegateSendStream, 00111 IDelegate2<void, CAddress, CStreamMessage*>* delegateSendMessage, 00112 IDelegate2<void, CAddress, __uint32>* delegateIgmpJoin, 00113 IDelegate1<void, CAddress>* delegateIgmpLeave, 00114 IDelegate0<CLink*>* delegateLink, 00115 __uint32 entry, 00116 __bitrate bw 00117 ) : CStreamClient(sim) 00118 { 00119 assert(info); 00120 00121 assert(delegateSendStream); 00122 assert(delegateSendMessage); 00123 assert(delegateIgmpJoin); 00124 assert(delegateIgmpLeave); 00125 assert(delegateLink); 00126 00127 // Set the initial state 00128 this->stateClient = STOPPED; 00129 00130 // Set simulator 00131 this->info = info; 00132 this->address = address; 00133 00134 // Set the channel 00135 this->channel = NULL; 00136 00137 // Create the buffers 00138 this->bufferMcast = new CStreamBuffer( 00139 this->info->StreamBufferMcastSize(), 00140 this->info->StreamBufferMcastSizeHistory() 00141 ); 00142 this->bufferUcast = new CStreamBufferMulti( 00143 this->info->NumLayers(), 00144 this->info->StreamBufferUcastSize(), 00145 this->info->StreamBufferUcastSizeHistory(), 00146 this->info->MpegGopSize() 00147 ); 00148 00149 // Unicast session 00150 this->bw = bw; 00151 this->bwUsed = 0; 00152 00153 // Multicast session 00154 this->entry = entry; 00155 00156 // Create the timers 00157 this->timerPlayMcast = new CTimer<CStreamClientPushMulti>( 00158 this->sim, 00159 this, 00160 &CStreamClientPushMulti::TimerPlayMcast 00161 ); 00162 this->timerPlayUcast = new CTimer<CStreamClientPushMulti>( 00163 this->sim, 00164 this, 00165 &CStreamClientPushMulti::TimerPlayUcast 00166 ); 00167 this->timerBoot = new CTimer<CStreamClientPushMulti>( 00168 this->sim, 00169 this, 00170 &CStreamClientPushMulti::TimerBoot 00171 ); 00172 00173 // Set the lower layer delegates 00174 this->delegateSendStream = delegateSendStream; 00175 this->delegateSendMessage = delegateSendMessage; 00176 this->delegateIgmpJoin = delegateIgmpJoin; 00177 this->delegateIgmpLeave = delegateIgmpLeave; 00178 this->delegateLink = delegateLink; 00179 00180 // Delegates 00181 this->delegateRecvFrameMcast = new Delegate2<CStreamClientPushMulti, void, CAddress, CStreamFrame>(this, &CStreamClientPushMulti::RecvFrameMcast); 00182 this->delegateRecvFrameUcast = new Delegate2<CStreamClientPushMulti, void, CAddress, CStreamFrame>(this, &CStreamClientPushMulti::RecvFrameUcast); 00183 00184 // Encoder 00185 this->encoder = new CStreamEncoderFrame(this->delegateSendStream); 00186 00187 // Decoder 00188 this->decoder = NULL; 00189 this->decoderMcast = new CStreamDecoderFrame(this->delegateRecvFrameMcast); 00190 this->decoderUcast = new CStreamDecoderFrame(this->delegateRecvFrameUcast); 00191 00192 // Layers 00193 this->layers = new CLayer*[this->info->NumLayers()]; 00194 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00195 { 00196 this->layers[index] = new CLayer( 00197 index, 00198 this->sim, 00199 this, 00200 &this->layersInfo, 00201 &CStreamClientPushMulti::TimerBoot, 00202 &CStreamClientPushMulti::TimerRegister, 00203 &CStreamClientPushMulti::TimerBuffer 00204 ); 00205 } 00206 } 00207 00208 CStreamClientPushMulti::~CStreamClientPushMulti() 00209 { 00210 // Delete delegates 00211 delete this->delegateRecvFrameMcast; 00212 delete this->delegateRecvFrameUcast; 00213 00214 // Delete coders 00215 delete this->encoder; 00216 delete this->decoderMcast; 00217 delete this->decoderUcast; 00218 00219 // Delete the buffers 00220 delete this->bufferMcast; 00221 delete this->bufferUcast; 00222 00223 // Delete the timers 00224 delete this->timerPlayMcast; 00225 delete this->timerPlayUcast; 00226 delete this->timerBoot; 00227 00228 // Delete the layers 00229 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00230 delete this->layers[index]; 00231 delete[] this->layers; 00232 } 00233 00234 void CStreamClientPushMulti::Start(CChannel* channel) 00235 { 00236 assert(channel); 00237 00238 // Check the state 00239 assert(STOPPED == this->stateClient); 00240 00241 // Check the timers are stopped 00242 assert(!this->timerPlayMcast->IsSet()); 00243 assert(!this->timerPlayUcast->IsSet()); 00244 assert(!this->timerBoot->IsSet()); 00245 00246 // Set the channel 00247 this->channel = channel; 00248 00249 // Log 00250 #ifdef LOG 00251 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00252 printf("\n\tT = %7.3lf PEER %3u START %3u (%u)", this->sim->Time(), this->address.Address(), this->channel->Id(), this->channel->Type()); 00253 #endif 00254 00255 // Reset statistics 00256 this->statRecvFrames = 0; 00257 this->statDiscardedFrames = 0; 00258 this->statPlayFrames = 0; 00259 this->statSuccessFrames[0] = 0; 00260 this->statSuccessFrames[1] = 0; 00261 this->statSuccessFrames[2] = 0; 00262 this->statFailFrames[0] = 0; 00263 this->statFailFrames[1] = 0; 00264 this->statFailFrames[2] = 0; 00265 00266 this->statTimeClientStart = this->sim->Time(); 00267 this->statTimeRecvStart = -1; 00268 this->statTimePlayStart = -1; 00269 this->statTimeWait = 0; 00270 00271 this->statPlayFirstFrame = 0; 00272 this->statPlayLastFrame = 0; 00273 00274 this->statSyncDelaySum = 0; 00275 this->statSyncDelayCount = 0; 00276 00277 // Check the channel type 00278 switch(this->channel->Type()) 00279 { 00280 case CChannel::CHANNEL_MULTICAST: this->StartMcast(); break; 00281 case CChannel::CHANNEL_UNICAST: this->StartUcast(); break; 00282 } 00283 } 00284 00285 void CStreamClientPushMulti::StartMcast() 00286 { 00287 // Start session with phase 1 00288 this->processFrame = &CStreamClientPushMulti::ProcessFrameMcastFirst; 00289 00290 // Set decoder 00291 this->decoder = this->decoderMcast; 00292 00293 // Reset the decoder 00294 this->decoder->Reset(this->channel->Id()); 00295 00296 // Join the multicast group 00297 (*this->delegateIgmpJoin)(this->channel->Address(), this->entry); 00298 00299 // Change the state 00300 this->stateClient = MCAST_STREAM_FIRST; 00301 } 00302 00303 void CStreamClientPushMulti::StartUcast() 00304 { 00305 // Check the registration state (client must not be registered) 00306 assert(0 == this->layersInfo.NumRegistered()); 00307 00308 // Set decoder 00309 this->decoder = this->decoderUcast; 00310 00311 // Reset the decoder 00312 this->decoder->Reset(this->channel->Id()); 00313 00314 // Start layers 00315 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00316 this->layers[index]->Start(this->channel); 00317 00318 // Change the client state 00319 this->stateClient = UCAST_BOOTSTRAP_REQUEST; 00320 00321 // Go to streaming STEP 1 00322 this->StreamBootstrap(); 00323 } 00324 00325 void CStreamClientPushMulti::Stop() 00326 { 00327 // Check the state 00328 assert(STOPPED != this->stateClient); 00329 00330 // Log 00331 #ifdef LOG 00332 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00333 printf("\n\tT = %7.3lf PEER %3u STOP %3u", this->sim->Time(), this->address.Address(), this->channel->Id()); 00334 #endif 00335 00336 // Stop client 00337 switch(this->channel->Type()) 00338 { 00339 case CChannel::CHANNEL_MULTICAST: this->StopMcast(); break; 00340 case CChannel::CHANNEL_UNICAST: this->StopUcast(); break; 00341 } 00342 00343 // If there are active timer, cancel 00344 if(this->timerPlayMcast->IsSet()) this->timerPlayMcast->Cancel(); 00345 if(this->timerPlayUcast->IsSet()) this->timerPlayUcast->Cancel(); 00346 if(this->timerBoot->IsSet()) this->timerBoot->Cancel(); 00347 00348 // Finalize statistics 00349 this->statTimeFinish = this->sim->Time(); 00350 00351 // Set channel to null 00352 this->channel = NULL; 00353 00354 // Change the state 00355 this->stateClient = STOPPED; 00356 } 00357 00358 void CStreamClientPushMulti::StopMcast() 00359 { 00360 // Leave the multicast group 00361 (*this->delegateIgmpLeave)(this->channel->Address()); 00362 00363 // If the session is waiting, add the waiting time 00364 if(MCAST_STREAM_WAIT == this->stateClient) this->statTimeWait += (this->sim->Time() - this->sessionTimeLastStop); 00365 } 00366 00367 void CStreamClientPushMulti::StopUcast() 00368 { 00369 // Close all receivers 00370 this->StreamClose(); 00371 00372 // Leave the stream 00373 this->StreamLeave(); 00374 00375 // Deregister and stop all layers 00376 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00377 { 00378 // If layer is registered, deregister 00379 if(REGISTERED == this->layers[index]->StateRegister()) this->StreamDeregister(index); 00380 00381 this->layers[index]->Stop(); 00382 } 00383 00384 // Reset layers info 00385 this->layersInfo.Reset(); 00386 00387 // If the session is waiting, add the waiting time 00388 if(UCAST_STREAM_WAIT == this->stateClient) this->statTimeWait += (this->sim->Time() - this->sessionTimeLastStop); 00389 } 00390 00391 void CStreamClientPushMulti::Recv(CAddress srcAddress, CAddress dstAddress, __uint16 srcPort, __uint16 dstPort, CPacket* packet) 00392 { 00393 // Process received packets 00394 if(NULL == packet) return; 00395 if((dstAddress != this->address) && (!dstAddress.IsMulticast())) return; 00396 00397 // Process packet depending on UDP destination 00398 if(this->info->PortStream() == dstPort) this->RecvStream(srcAddress, type_cast<CPacketStream*>(packet)); 00399 if(this->info->PortControl() == dstPort) this->RecvMessage(srcAddress, type_cast<CStreamMessage*>(packet)); 00400 } 00401 00402 void CStreamClientPushMulti::RecvStream(CAddress src, CPacketStream* packet) 00403 { 00404 #ifdef LOG 00405 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00406 { 00407 CConsole::SetColor(CConsole::DARK_GRAY); 00408 printf("\n\tT = %7.3lf RECV STREAM %u --- %u : ", this->sim->Time(), packet->Stream(), src.Address()); 00409 CConsole::SetColor(CConsole::LIGHT_GRAY); 00410 } 00411 #endif 00412 00413 // Send received stream packets to the decoder 00414 CStreamDecoder::EResult result = this->decoder->Decode(src, packet); 00415 00416 #ifdef LOG 00417 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00418 { 00419 CConsole::SetColor(CConsole::LIGHT_RED); 00420 switch(result) 00421 { 00422 case CStreamDecoder::FAIL_PACKET_TYPE: printf("FAIL PACKET TYPE"); break; 00423 case CStreamDecoder::FAIL_STREAM: printf("FAIL STREAM"); break; 00424 } 00425 CConsole::SetColor(CConsole::LIGHT_GRAY); 00426 } 00427 #endif 00428 } 00429 00430 void CStreamClientPushMulti::RecvMessage(CAddress src, CStreamMessage* message) 00431 { 00432 // Process messages 00433 switch(message->MessageType()) 00434 { 00435 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_JOIN: this->RecvMessageJoin(src, type_cast<CStreamMessagePushMultiJoin*>(message)); break; 00436 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_LEAVE: this->RecvMessageLeave(src, type_cast<CStreamMessagePushMultiLeave*>(message)); break; 00437 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_CLOSE: this->RecvMessageClose(src, type_cast<CStreamMessagePushMultiClose*>(message)); break; 00438 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_MULTI_RESPONSE: this->RecvMessageBootResponse(src, type_cast<CStreamMessageBootPushMultiResponse*>(message)); break; 00439 default: assert(0); /* do nothing */ 00440 } 00441 } 00442 00443 void CStreamClientPushMulti::RecvFrameMcast(CAddress src, CStreamFrame frame) 00444 { 00445 #ifdef LOG 00446 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00447 { 00448 CConsole::SetColor(CConsole::LIGHT_CYAN); 00449 printf("\n\tT = %7.3lf RECV FRAME (M) %u", this->sim->Time(), frame.Index()); 00450 CConsole::SetColor(CConsole::LIGHT_GRAY); 00451 } 00452 #endif 00453 00454 // Check the state 00455 if(this->stateClient < MCAST_STREAM_FIRST) 00456 { 00457 this->statDiscardedFrames = 0; 00458 return; 00459 } 00460 00461 // Check the decoder passed a correct frame 00462 if(frame.Stream() != this->channel->Id()) 00463 { 00464 this->statDiscardedFrames = 0; 00465 return; 00466 } 00467 00468 // Process frame 00469 (this->*this->processFrame)(frame); 00470 00471 // Statistics 00472 this->statRecvFrames++; 00473 } 00474 00475 void CStreamClientPushMulti::RecvFrameUcast(CAddress src, CStreamFrame frame) 00476 { 00477 #ifdef LOG 00478 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00479 { 00480 CConsole::SetColor(CConsole::LIGHT_CYAN); 00481 printf("\n\tT = %7.3lf RECV FRAME (U) %u", this->sim->Time(), frame.Index()); 00482 CConsole::SetColor(CConsole::LIGHT_GRAY); 00483 } 00484 #endif 00485 00486 // Check the client state (if not a client state that allows receiving frames, ignore) 00487 if(this->stateClient < UCAST_STREAM_FIRST) 00488 { 00489 this->statDiscardedFrames++; 00490 return; 00491 } 00492 00493 // Check the decoder passed a correct frame 00494 if(frame.Stream() != this->channel->Id()) 00495 { 00496 this->statDiscardedFrames++; 00497 return; 00498 } 00499 00500 // Calculate the layer of the current frame 00501 __uint32 layer = this->bufferUcast->IndexToLayer(frame.Index()); 00502 assert(layer < this->info->NumLayers()); 00503 00504 // Process frame by client (client processing is done first) 00505 assert(this->processFrame); 00506 (this->*this->processFrame)(frame); 00507 00508 // Process frame by layer (layer processing is done last) 00509 assert(this->layers[layer]->ProcessFrame()); 00510 (this->*this->layers[layer]->ProcessFrame())(frame, layer); 00511 00512 // Statistics 00513 this->statRecvFrames++; 00514 } 00515 00516 void CStreamClientPushMulti::ProcessFrameMcastFirst(CStreamFrame frame) 00517 { 00518 // PLAYBACK Phase 1 : Process first frame to initialize playback buffer 00519 00520 #ifdef LOG 00521 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00522 printf("\t\t[MCAST FIRST]"); 00523 #endif 00524 00525 // Check the state 00526 assert(MCAST_STREAM_FIRST == this->stateClient); 00527 00528 // Reset the buffer 00529 this->bufferMcast->Reset(frame.Index()); 00530 00531 // Add the frame to the buffer 00532 this->bufferMcast->Add(frame); 00533 00534 // Statistics 00535 this->statTimeRecvStart = this->sim->Time(); 00536 00537 // Switch to process buffering frames (phase 2) 00538 this->processFrame = &CStreamClientPushMulti::ProcessFrameMcastBuffering; 00539 00540 // Change the state 00541 this->stateClient = MCAST_STREAM_BUFFERING; 00542 } 00543 00544 void CStreamClientPushMulti::ProcessFrameMcastBuffering(CStreamFrame frame) 00545 { 00546 // PLAYBACK Phase 2 : Buffering phase (playback is not started and frames are saved in the buffer) 00547 00548 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00549 // buffer, plus the buffering size 00550 00551 #ifdef LOG 00552 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00553 printf("\t\t[MCAST BUFFERING]"); 00554 #endif 00555 00556 // Check the state 00557 assert(MCAST_STREAM_BUFFERING == this->stateClient); 00558 00559 // Add the frame to the buffer 00560 this->bufferMcast->Add(frame); 00561 00562 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00563 if(frame.Index() >= this->bufferMcast->CurrentIndex() + this->info->StreamBufferMcastSizeBuffering()) 00564 { 00565 // Find the first independent frame 00566 for(__uint32 count = 0; count < this->bufferMcast->Count(); count++) 00567 { 00568 if(this->bufferMcast->IsCurrentIndependent()) 00569 { 00570 // First frame found : start playback 00571 00572 // Start playback : save session and stat information 00573 this->sessionFrameLastStart = this->bufferMcast->CurrentIndex(); 00574 this->sessionTimeLastStart = this->sim->Time(); 00575 00576 this->statPlayFirstFrame = this->bufferMcast->CurrentIndex(); 00577 this->statTimePlayStart = this->sim->Time(); 00578 00579 // Switch to process playback frames (phase 3) 00580 this->processFrame = &CStreamClientPushMulti::ProcessFrameMcastPlay; 00581 00582 // Change the state 00583 this->stateClient = MCAST_STREAM_PLAY; 00584 00585 // Call the play timer 00586 this->TimerPlayMcast(NULL); 00587 00588 break; 00589 } 00590 else this->bufferMcast->Shift(); 00591 } 00592 } 00593 } 00594 00595 void CStreamClientPushMulti::ProcessFrameMcastPlay(CStreamFrame frame) 00596 { 00597 #ifdef LOG 00598 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00599 printf("\t\t[MCAST PLAY]"); 00600 #endif 00601 00602 // PLAYBACK Phase 3 : Play phase 00603 assert(MCAST_STREAM_PLAY == this->stateClient); 00604 00605 // Add the frame to the buffer 00606 this->bufferMcast->Add(frame); 00607 } 00608 00609 void CStreamClientPushMulti::ProcessFrameMcastWait(CStreamFrame frame) 00610 { 00611 // PLAYBACK Wait : Buffering phase after a buffer underrun (playback is not started and frames are saved in the buffer) 00612 00613 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00614 // buffer, plus the buffering size 00615 00616 #ifdef LOG 00617 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00618 printf("\t\t[MCAST WAIT]"); 00619 #endif 00620 00621 // Check the state 00622 assert(MCAST_STREAM_WAIT == this->stateClient); 00623 00624 // Add the frame to the buffer 00625 this->bufferMcast->Add(frame); 00626 00627 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00628 if(frame.Index() >= this->bufferMcast->CurrentIndex() + this->info->StreamBufferMcastSizeBuffering()) 00629 { 00630 // Find the first frame 00631 for(__uint32 count = 0; count < this->bufferMcast->Count(); count++) 00632 { 00633 if(this->bufferMcast->HasCurrentIndex()) 00634 { 00635 // Check the last stop 00636 assert(this->sim->Time() >= this->sessionTimeLastStop); 00637 00638 // Save statistics 00639 this->statTimeWait += this->sim->Time() - this->sessionTimeLastStop; 00640 00641 // Restart playback 00642 this->sessionFrameLastStart = this->bufferMcast->CurrentIndex(); 00643 this->sessionTimeLastStart = this->sim->Time(); 00644 00645 // Switch to process playback frames (phase 3) 00646 this->processFrame = &CStreamClientPushMulti::ProcessFrameMcastPlay; 00647 00648 // Change the state 00649 this->stateClient = MCAST_STREAM_PLAY; 00650 00651 // Call the play timer 00652 this->TimerPlayMcast(NULL); 00653 00654 break; 00655 } 00656 else 00657 { 00658 // Increment the number of played frames 00659 this->statPlayFrames++; 00660 00661 // Playback sync statistics 00662 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferMcast->CurrentIndex()); 00663 this->statSyncDelayCount++; 00664 00665 // Mark the frame as failed 00666 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->bufferMcast->CurrentIndex()) < 3); 00667 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferMcast->CurrentIndex())]++; 00668 00669 // Shift the buffer 00670 this->bufferMcast->Shift(); 00671 } 00672 } 00673 } 00674 } 00675 00676 void CStreamClientPushMulti::ProcessFrameUcastFirst(CStreamFrame frame) 00677 { 00678 // PROCESS Phase 1 : Process first frame to initialize playback buffer 00679 00680 #ifdef LOG 00681 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00682 printf("\t\t[UCAST FIRST]"); 00683 #endif 00684 00685 // Check the client state 00686 assert(UCAST_STREAM_FIRST == this->stateClient); 00687 00688 // Reset the buffer 00689 this->bufferUcast->Reset(frame.Index()); 00690 00691 // Calculate the layer 00692 __uint32 layer = this->bufferUcast->IndexToLayer(frame.Index()); 00693 00694 // Add the frame to the buffer and if the frame is new send it to the receivers 00695 if(this->bufferUcast->Add(layer, frame)) this->SendStream(frame, layer); 00696 00697 // Statistics 00698 this->statTimeRecvStart = this->sim->Time(); 00699 00700 // Switch to process buffering frames (phase 2) 00701 this->processFrame = &CStreamClientPushMulti::ProcessFrameUcastBuffering; 00702 00703 // Change the client state 00704 this->stateClient = UCAST_STREAM_BUFFERING; 00705 } 00706 00707 void CStreamClientPushMulti::ProcessFrameUcastBuffering(CStreamFrame frame) 00708 { 00709 // PLAYBACK Phase 2 : Buffering phase (playback is not started and frames are saved in the buffer) 00710 00711 // Buffering phase finishes when the minimum number of layers is in the ready state 00712 00713 #ifdef LOG 00714 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00715 printf("\n\t\t[UCAST BUFFERING]\t(frame_index=%u layers_ready=%u layers_min=%u)", frame.Index(), this->layersInfo.NumReady(), this->info->NumLayersMin()); 00716 #endif 00717 00718 // Check the state 00719 assert(UCAST_STREAM_BUFFERING == this->stateClient); 00720 00721 // If the minimum number of layers is in the READY state 00722 if(this->layersInfo.NumReady() >= this->info->NumLayersMin()) 00723 { 00724 // Find the first independent frame 00725 for(__uint32 count = 0; count < this->bufferUcast->Size(); count++) 00726 { 00727 // If the current frame is independent 00728 if(this->bufferUcast->IsCurrentIndependent()) 00729 { 00730 // First frame found : start playback 00731 00732 // Start playback : save session and stat information 00733 this->sessionFrameLastStart = this->bufferUcast->CurrentIndex(); 00734 this->sessionTimeLastStart = this->sim->Time(); 00735 00736 this->statPlayFirstFrame = this->bufferUcast->CurrentIndex(); 00737 this->statTimePlayStart = this->sim->Time(); 00738 00739 // Switch to process playback frames (phase 3) 00740 this->processFrame = &CStreamClientPushMulti::ProcessFrameUcastPlay; 00741 00742 // Change the state 00743 this->stateClient = UCAST_STREAM_PLAY; 00744 00745 // Call the play timer 00746 this->TimerPlayUcast(NULL); 00747 00748 break; 00749 } 00750 else this->bufferUcast->Shift(); 00751 } 00752 } 00753 } 00754 00755 void CStreamClientPushMulti::ProcessFrameUcastPlay(CStreamFrame frame) 00756 { 00757 #ifdef LOG 00758 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00759 printf("\t\t[UCAST PLAY]"); 00760 #endif 00761 00762 // PLAYBACK Phase 3 : Play phase 00763 assert(UCAST_STREAM_PLAY == this->stateClient); 00764 00765 // Do nothing 00766 } 00767 00768 void CStreamClientPushMulti::ProcessFrameUcastWait(CStreamFrame frame) 00769 { 00770 // PLAYBACK Wait : Buffering phase after a buffer underrun (playback is not started and frames are saved in the buffer) 00771 // Buffering phase finishes when there is a minimum number of layers in the READY state 00772 00773 #ifdef LOG 00774 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00775 printf("\t\t[UCAST WAIT]"); 00776 #endif 00777 00778 // Check the state 00779 assert(UCAST_STREAM_WAIT == this->stateClient); 00780 00781 // Calculate layer 00782 __uint32 layer = this->bufferUcast->IndexToLayer(frame.Index()); 00783 00784 // Check if the minimum number of layers is in the READY state 00785 if(this->layersInfo.NumReady() >= this->info->NumLayersMin()) 00786 { 00787 // Find the first frame 00788 for(__uint32 count = 0; count < this->bufferUcast->Count(layer); count++) 00789 { 00790 if(this->bufferUcast->HasCurrentIndex()) 00791 { 00792 // Check the last stop 00793 assert(this->sim->Time() >= this->sessionTimeLastStop); 00794 00795 #ifdef LOG 00796 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00797 { 00798 CConsole::SetColor(CConsole::LIGHT_YELLOW); 00799 printf("\n\tT = %7.3lf WAIT : %7.3lf (%7.3lf - %7.3lf)", this->sim->Time(), this->sim->Time() - this->sessionTimeLastStop, this->sessionTimeLastStop, this->sim->Time()); 00800 CConsole::SetColor(CConsole::LIGHT_GRAY); 00801 } 00802 #endif 00803 // Save statistics 00804 this->statTimeWait += this->sim->Time() - this->sessionTimeLastStop; 00805 00806 // Restart playback 00807 this->sessionFrameLastStart = this->bufferUcast->CurrentIndex(); 00808 this->sessionTimeLastStart = this->sim->Time(); 00809 00810 // Switch to process playback frames (phase 3) 00811 this->processFrame = &CStreamClientPushMulti::ProcessFrameUcastPlay; 00812 00813 // Change the state 00814 this->stateClient = UCAST_STREAM_PLAY; 00815 00816 // Call the play timer 00817 this->TimerPlayUcast(NULL); 00818 00819 break; 00820 } 00821 else 00822 { 00823 // Increment the number of played frames 00824 this->statPlayFrames++; 00825 00826 // Playback sync statistics 00827 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferUcast->CurrentIndex()); 00828 this->statSyncDelayCount++; 00829 00830 // Mark the frame as failed 00831 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndex()) < 3); 00832 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndex())]++; 00833 00834 #ifdef LOG 00835 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00836 { 00837 CConsole::SetColor(CConsole::LIGHT_RED); 00838 printf("\n\t\tT = %7.3lf PLAY FAIL %6u\t%c : %s", this->sim->Time(), this->bufferUcast->CurrentIndex(), stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndex())], 00839 this->bufferUcast->HasCurrentIndex()?"DECODE_FAIL":"MISSING"); 00840 CConsole::SetColor(CConsole::LIGHT_GRAY); 00841 } 00842 #endif 00843 // Shift the buffer 00844 this->bufferUcast->Shift(); 00845 } 00846 } 00847 } 00848 } 00849 00850 void CStreamClientPushMulti::ProcessFrameUcastReset(CStreamFrame frame) 00851 { 00852 // PLAYBACK Reset : Reset the buffer after a long buffer underrun (playback is not started and frames are saved in the buffer) 00853 00854 #ifdef LOG 00855 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00856 printf("\t\t[UCAST RESET]"); 00857 #endif 00858 00859 // After the buffer is reset by the first frame, switch to wait state and wait for the buffer to fill again 00860 // Check the state 00861 assert(UCAST_STREAM_RESET == this->stateClient); 00862 00863 // Reset the buffer 00864 this->bufferUcast->Reset(frame.Index()); 00865 00866 // Switch all layers to the buffering state 00867 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00868 { 00869 // If the layer is not in buffering state, switch to buffering state 00870 if(this->layers[index]->StateLayer() == LAYER_READY) this->layers[index]->Buffering(); 00871 this->layers[index]->Process(&CStreamClientPushMulti::ProcessFrameUcastLayerBuffering); 00872 00873 // If the layer is registered, deregister 00874 if(this->layers[index]->StateRegister() == REGISTERED) this->layers[index]->Deregister(); 00875 00876 // If there is a registration timer set for this layer, cancel 00877 if(this->layers[index]->TimerRegister()->IsSet()) this->layers[index]->TimerRegister()->Cancel(); 00878 00879 // If there is a buffering timer set for this layer, cancel 00880 if(this->layers[index]->TimerBuffer()->IsSet()) this->layers[index]->TimerBuffer()->Cancel(); 00881 } 00882 00883 // Switch to the wait state 00884 this->processFrame = &CStreamClientPushMulti::ProcessFrameUcastWait; 00885 00886 // Change the state 00887 this->stateClient = UCAST_STREAM_WAIT; 00888 } 00889 00890 void CStreamClientPushMulti::ProcessFrameUcastLayerBuffering(CStreamFrame frame, __uint32 layer) 00891 { 00892 // PROCESS LAYER FRAME Phase 1 : Buffering phase (playback is not started and frames are saved in the buffer) 00893 00894 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00895 // buffer, plus the buffering size 00896 00897 // Check the state 00898 assert(LAYER_BUFFERING == this->layers[layer]->StateLayer()); 00899 00900 #ifdef LOG 00901 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00902 printf("\n\t\t[UCAST LAYER BUFFERING]\t(frame_index=%u layer=%u frame_layer_index=%u)", frame.Index(), layer, this->bufferUcast->IndexToLayerIndex(frame.Index())); 00903 #endif 00904 00905 // Add the frame to the buffer 00906 if(this->bufferUcast->Add(layer, frame)) 00907 { 00908 // If the frame was added to the buffer 00909 00910 #ifdef LOG 00911 CConsole::SetColor(CConsole::LIGHT_GREEN); 00912 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00913 printf("\t\tBUFFER HIT"); 00914 CConsole::SetColor(CConsole::LIGHT_GRAY); 00915 #endif 00916 00917 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00918 if(this->bufferUcast->IndexToLayerIndex(frame.Index()) >= this->bufferUcast->LayerIndex(layer) + this->info->StreamBufferUcastSizeBuffering() - 1) 00919 { 00920 #ifdef LOG 00921 CConsole::SetColor(CConsole::LIGHT_GREEN); 00922 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00923 printf("\tREADY : YES (frame_layer_index=%u buffer_layer_index=%u buffering=%u)", 00924 this->bufferUcast->IndexToLayerIndex(frame.Index()), this->bufferUcast->LayerIndex(layer), this->info->StreamBufferUcastSizeBuffering() 00925 ); 00926 CConsole::SetColor(CConsole::LIGHT_GRAY); 00927 #endif 00928 00929 // Change the layer into the ready state 00930 this->layers[layer]->Ready(); 00931 00932 // Switch to process buffering frames for this layer (phase 3) 00933 this->layers[layer]->Process(&CStreamClientPushMulti::ProcessFrameUcastLayerReady); 00934 00935 // If the buffering timer is active, cancel the timer 00936 if(this->layers[layer]->TimerBuffer()->IsSet()) this->layers[layer]->TimerBuffer()->Cancel(); 00937 00938 // Set the registration timer for this layer 00939 if(this->layers[layer]->TimerRegister()->IsSet()) this->layers[layer]->TimerRegister()->Cancel(); 00940 00941 assert(this->layers[layer]->StateRegister() == NOT_REGISTERED); 00942 this->layers[layer]->TimerRegister()->SetAfter(this->info->BootRegisterDelay(), this->layers[layer]); 00943 } 00944 else 00945 { 00946 #ifdef LOG 00947 CConsole::SetColor(CConsole::LIGHT_RED); 00948 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00949 printf("\tREADY : NO (frame_layer_index=%u buffer_layer_index=%u buffering=%u)", 00950 this->bufferUcast->IndexToLayerIndex(frame.Index()), this->bufferUcast->LayerIndex(layer), this->info->StreamBufferUcastSizeBuffering() 00951 ); 00952 CConsole::SetColor(CConsole::LIGHT_GRAY); 00953 #endif 00954 } 00955 00956 this->SendStream(frame, layer); 00957 } 00958 else 00959 { 00960 #ifdef LOG 00961 CConsole::SetColor(CConsole::LIGHT_RED); 00962 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00963 printf("\t\tBUFFER MISS"); 00964 CConsole::SetColor(CConsole::LIGHT_GRAY); 00965 #endif 00966 } 00967 } 00968 00969 void CStreamClientPushMulti::ProcessFrameUcastLayerReady(CStreamFrame frame, __uint32 layer) 00970 { 00971 // PLAYBACK Phase 2 : Play phase 00972 00973 #ifdef LOG 00974 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00975 printf("\n\t\t[UCAST LAYER READY]\t(frame_index=%u layer=%u frame_layer_index=%u)", frame.Index(), layer, this->bufferUcast->IndexToLayerIndex(frame.Index())); 00976 #endif 00977 00978 // Add frame to the buffer 00979 if(this->bufferUcast->Add(layer, frame)) 00980 { 00981 #ifdef LOG 00982 CConsole::SetColor(CConsole::LIGHT_GREEN); 00983 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00984 printf("\t\tBUFFER HIT"); 00985 CConsole::SetColor(CConsole::LIGHT_GRAY); 00986 #endif 00987 00988 this->SendStream(frame, layer); 00989 } 00990 else 00991 { 00992 #ifdef LOG 00993 CConsole::SetColor(CConsole::LIGHT_RED); 00994 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00995 printf("\t\tBUFFER MISS"); 00996 CConsole::SetColor(CConsole::LIGHT_GRAY); 00997 #endif 00998 } 00999 } 01000 01001 void CStreamClientPushMulti::StreamBootstrap() 01002 { 01003 // Streaming STEP 1 : Bootstraping (connect to bootstrap server and retrieve list of neighbors for all layers) 01004 01005 #ifdef LOG 01006 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01007 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP REQUEST %3u (STREAM)", this->sim->Time(), this->address.Address(), this->channel->Id()); 01008 #endif 01009 01010 // Check the state 01011 assert(UCAST_BOOTSTRAP_REQUEST == this->stateClient); 01012 01013 // Send a boot push query message to the bootstrap server (the channel address) 01014 CStreamMessageBootPushMultiRequest* message = new CStreamMessageBootPushMultiRequest( 01015 this->channel->Id(), 01016 this->info->BootQueryMax() 01017 ); 01018 01019 // Send the message 01020 (*this->delegateSendMessage)(this->channel->Address(), message); 01021 01022 // Activate the timer to resend the query after a timeout period 01023 this->timerBoot->SetAfter(this->info->BootQueryTimeout()); 01024 01025 // Change the state 01026 this->stateClient = UCAST_BOOTSTRAP_RESPONSE; 01027 } 01028 01029 void CStreamClientPushMulti::StreamJoin() 01030 { 01031 // Streaming STEP 2a : Joining (start streaming from the senders) 01032 01033 // Check the state 01034 assert(UCAST_JOIN_REQUEST == this->stateClient); 01035 01036 if(NULL == this->channel) return; 01037 01038 // Send a join request to the senders for all layers 01039 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 01040 { 01041 this->StreamJoin(layer); 01042 01043 // Get ready to receive the stream : start streaming session with phase 1 01044 this->layers[layer]->Process(&CStreamClientPushMulti::ProcessFrameUcastLayerBuffering); 01045 01046 // Check the layer state is buffering 01047 assert(this->layers[layer]->StateLayer() == LAYER_BUFFERING); 01048 } 01049 01050 // Set the processing function to first frame 01051 this->processFrame = &CStreamClientPushMulti::ProcessFrameUcastFirst; 01052 01053 // Change the state 01054 this->stateClient = UCAST_STREAM_FIRST; 01055 } 01056 01057 void CStreamClientPushMulti::StreamJoin(__uint32 layer) 01058 { 01059 #ifdef LOG 01060 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01061 printf("\n\tT = %7.3lf PEER %3u JOIN %3u/%u -> %u", this->sim->Time(), this->address.Address(), this->channel->Id(), layer, this->layers[layer]->Sender().Address()); 01062 #endif 01063 01064 CStreamMessagePushMultiJoin* message = new CStreamMessagePushMultiJoin(this->channel->Id(), layer); 01065 (*this->delegateSendMessage)(this->layers[layer]->Sender(), message); 01066 } 01067 01068 void CStreamClientPushMulti::StreamLeave() 01069 { 01070 // Streaming STEP 3a : Leave (inform the senders) 01071 01072 // Send a leave to the sender for each layer 01073 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 01074 { 01075 this->StreamLeave(layer); 01076 } 01077 } 01078 01079 void CStreamClientPushMulti::StreamLeave(__uint32 layer) 01080 { 01081 // Streaming buffer underrun for this layer : Leave 01082 CStreamMessagePushMultiLeave* message = new CStreamMessagePushMultiLeave(this->channel->Id(), layer); 01083 (*this->delegateSendMessage)(this->layers[layer]->Sender(), message); 01084 } 01085 01086 void CStreamClientPushMulti::StreamRegister(__uint32 layer) 01087 { 01088 // Streaming STEP 2b : Register (register to bootstrap server) 01089 01090 // Check the client state (client must start playing in order to register) 01091 assert(this->stateClient >= UCAST_STREAM_FIRST); 01092 01093 // Check the layer state (layer must be in ready state in order to register) 01094 if(this->layers[layer]->StateLayer() != LAYER_READY) return; 01095 01096 // Check the registration state 01097 if(this->layers[layer]->StateRegister() != NOT_REGISTERED) return; 01098 01099 CStreamMessageBootPushMultiRegister* message = new CStreamMessageBootPushMultiRegister( 01100 this->channel->Id(), 01101 layer, 01102 this->address, 01103 this->layers[layer]->Sender()); 01104 (*this->delegateSendMessage)(this->channel->Address(), message); 01105 01106 // Change the registration state 01107 this->layers[layer]->Register(); 01108 } 01109 01110 void CStreamClientPushMulti::StreamDeregister(__uint32 layer) 01111 { 01112 // Check the resgistration state 01113 assert(this->layers[layer]->StateRegister() == REGISTERED); 01114 01115 // Streaming STEP 3b : Deregister (deregister from bootstrap server) 01116 CStreamMessageBootPushMultiDeregister* message = new CStreamMessageBootPushMultiDeregister( 01117 this->channel->Id(), 01118 layer, 01119 this->address); 01120 (*this->delegateSendMessage)(this->channel->Address(), message); 01121 01122 // Change the registration state 01123 this->layers[layer]->Deregister(); 01124 } 01125 01126 void CStreamClientPushMulti::StreamClose() 01127 { 01128 // Streaming STEP ASYNC : Close (sent to receivers upon leaving or when receiving a request that cannot be served) 01129 01130 // Send the close to all layers / all receivers 01131 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 01132 { 01133 for(set<CAddress>::iterator iter = this->layers[layer]->Receivers()->begin(); iter != this->layers[layer]->Receivers()->end(); iter++) 01134 { 01135 CStreamMessagePushMultiClose* message = new CStreamMessagePushMultiClose(this->channel->Id(), layer); 01136 (*this->delegateSendMessage)(*iter, message); 01137 01138 #ifdef LOG 01139 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01140 printf("\n\tT = %7.3lf PEER %3u CLOSE RECEIVER %3u", this->sim->Time(), this->address.Address(), ((CAddress)(*iter)).Address()); 01141 #endif 01142 } 01143 01144 // Clear the receivers list 01145 this->layers[layer]->Receivers()->clear(); 01146 } 01147 01148 // Set the used bandwidth to zero 01149 this->bwUsed = 0; 01150 } 01151 01152 void CStreamClientPushMulti::SendStream(CStreamFrame frame, __uint32 layer) 01153 { 01154 // Check the client state 01155 assert(this->stateClient >= UCAST_STREAM_FIRST); 01156 01157 #ifdef LOG 01158 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01159 { 01160 CConsole::SetColor(CConsole::DARK_GRAY); 01161 printf("\n\tT = %7.3lf SEND FRAME (U) %u : [ ", this->sim->Time(), frame.Index()); 01162 } 01163 #endif 01164 01165 for(set<CAddress>::iterator iter = this->layers[layer]->Receivers()->begin(); iter != this->layers[layer]->Receivers()->end(); iter++) 01166 { 01167 // Send frame to the encoder 01168 this->encoder->Encode(*iter, frame); 01169 01170 #ifdef LOG 01171 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01172 printf("%u ", ((CAddress)(*iter)).Address()); 01173 #endif 01174 } 01175 01176 #ifdef LOG 01177 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01178 { 01179 printf("]"); 01180 CConsole::SetColor(CConsole::LIGHT_GRAY); 01181 } 01182 #endif 01183 } 01184 01185 void CStreamClientPushMulti::RecvMessageJoin(CAddress src, CStreamMessagePushMultiJoin* message) 01186 { 01187 // Process received JOIN 01188 01189 // Check whether the client can serve the channel 01190 if(NULL == this->channel) { this->RejectJoin(src, message); return; } 01191 if(this->channel->Id() != message->Stream()) { this->RejectJoin(src, message); return; } 01192 01193 // Check whether the client is registered 01194 if(REGISTERED != this->layers[message->Layer()]->StateRegister()) { this->RejectJoin(src, message); return; } 01195 01196 #ifdef LOG 01197 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01198 printf("\n\tT = %7.3lf PEER %3u RECV JOIN %u : ", this->sim->Time(), this->address.Address(), src.Address()); 01199 #endif 01200 01201 // Check whether the peer would have bandwidth to serve the channel considering the current usage 01202 __bitrate bwLayer = this->channel->Bw() / this->info->NumLayers(); 01203 if(this->bw - this->bwUsed < this->info->StreamBwMargin() * bwLayer) 01204 { 01205 01206 #ifdef LOG 01207 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01208 { 01209 CConsole::SetColor(CConsole::LIGHT_RED); 01210 printf("REJECT"); 01211 CConsole::SetColor(CConsole::LIGHT_GRAY); 01212 } 01213 #endif 01214 01215 // Reject the request 01216 this->RejectJoin(src, message); 01217 // Deregister 01218 this->StreamDeregister(message->Layer()); 01219 return; 01220 } 01221 01222 #ifdef LOG 01223 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01224 { 01225 CConsole::SetColor(CConsole::LIGHT_GREEN); 01226 printf("ACCEPT"); 01227 CConsole::SetColor(CConsole::LIGHT_GRAY); 01228 } 01229 #endif 01230 01231 // Acceptance conditions met : add the host to the receivers list 01232 this->layers[message->Layer()]->Receivers()->insert(src); 01233 01234 // Estimate the increase in bandwidth usage 01235 this->bwUsed += bwLayer; 01236 } 01237 01238 void CStreamClientPushMulti::RecvMessageLeave(CAddress src, CStreamMessagePushMultiLeave* message) 01239 { 01240 // Process received LEAVE 01241 01242 // Check whether the peer is serving the channel (may be a delayed message) 01243 if(NULL == this->channel) return; 01244 if(this->channel->Id() != message->Stream()) return; 01245 01246 // Check if the host is in the receivers list 01247 if(this->layers[message->Layer()]->Receivers()->find(src) != this->layers[message->Layer()]->Receivers()->end()) 01248 { 01249 // Remove the host from the receivers list 01250 this->layers[message->Layer()]->Receivers()->erase(src); 01251 01252 // Estimate the decrease in bandwidth usage 01253 this->bwUsed -= this->channel->Bw() / this->info->NumLayers(); 01254 } 01255 } 01256 01257 void CStreamClientPushMulti::RecvMessageClose(CAddress src, CStreamMessagePushMultiClose* message) 01258 { 01259 // Process received CLOSE 01260 01261 #ifdef LOG 01262 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01263 printf("\n\tT = %7.3lf PEER %3u CLOSE %3u <- %u", this->sim->Time(), this->address.Address(), this->channel->Id(), src.Address()); 01264 #endif 01265 01266 // Check the close is for current channel 01267 if(NULL == this->channel) return; 01268 if(this->channel->Id() != message->Stream()) return; 01269 01270 // Check the close has been sent by the current sender (otherwise, ignore) 01271 if(src != this->layers[message->Layer()]->Sender()) return; 01272 01273 // Check the state of the client (if the close message is delayed) 01274 if(this->stateClient < UCAST_STREAM_FIRST) return; 01275 01276 // If more neighbors are available 01277 if(this->layers[message->Layer()]->Neighbors()->size() > 0) 01278 { 01279 // Select a new random neighbor as a receiver for this layer 01280 __uint32 index = CRand::Generate(this->layers[message->Layer()]->Neighbors()->size()); 01281 assert(index < this->layers[message->Layer()]->Neighbors()->size()); 01282 01283 __uint32 idx = 0; 01284 for(set<CAddress>::iterator iter = this->layers[message->Layer()]->Neighbors()->begin(); iter != this->layers[message->Layer()]->Neighbors()->end(); iter++, idx++) 01285 { 01286 if(idx == index) 01287 { 01288 this->layers[message->Layer()]->Sender(*iter); // Set the sender 01289 break; 01290 } 01291 } 01292 01293 // Remove the sender from the neighbors list 01294 this->layers[message->Layer()]->Neighbors()->erase(this->layers[message->Layer()]->Sender()); 01295 01296 // If the number of remaining neighbors is less than the threshold, refresh the list of neighbors 01297 if(this->layers[message->Layer()]->Neighbors()->size() <= this->info->BootRefreshThreshold()) 01298 { 01299 #ifdef LOG 01300 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01301 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP RE-REQUEST %3u:%u (%u neighbors remaining / %u threshold)", this->sim->Time(), this->address.Address(), this->channel->Id(), message->Layer(), this->layers[message->Layer()]->Neighbors()->size(), this->info->BootRefreshThreshold()); 01302 #endif 01303 // Send a boot push query message to the bootstrap server (the channel address) 01304 CStreamMessageBootPushMultiRequest* request = new CStreamMessageBootPushMultiRequest( 01305 this->channel->Id(), 01306 message->Layer(), 01307 this->info->BootQueryMax() 01308 ); 01309 (*this->delegateSendMessage)(this->channel->Address(), request); 01310 } 01311 } 01312 // Else, use the server 01313 else 01314 { 01315 this->layers[message->Layer()]->Sender(this->channel->Address()); 01316 } 01317 01318 #ifdef LOG 01319 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01320 printf("\n\tT = %7.3lf PEER %3u JOIN %3u:%u -> %u", this->sim->Time(), this->address.Address(), this->channel->Id(), message->Layer(), this->layers[message->Layer()]->Sender().Address()); 01321 #endif 01322 01323 // Send a JOIN to the new sender 01324 CStreamMessagePushMultiJoin* reply = new CStreamMessagePushMultiJoin(this->channel->Id(), message->Layer()); 01325 (*this->delegateSendMessage)(this->layers[message->Layer()]->Sender(), reply); 01326 } 01327 01328 void CStreamClientPushMulti::RecvMessageBootResponse(CAddress src, CStreamMessageBootPushMultiResponse* message) 01329 { 01330 // Process received BOOT RESPONSE 01331 01332 // If no current channel, do nothing (delayed response) 01333 if(NULL == this->channel) return; 01334 01335 // If the message is not for this channel, do nothing (delayed response) 01336 if(message->Stream() != this->channel->Id()) return; 01337 01338 // If the response is for a global bootstrap request 01339 if(message->ResponseType() == CStreamMessageBootPushMultiResponse::STREAM) 01340 { 01341 // If the boot timer is set, cancel (layer bootstrap requests do not have boot timers) 01342 if(this->timerBoot->IsSet()) this->timerBoot->Cancel(); 01343 } 01344 01345 // Process the response, depending on the client state 01346 if(UCAST_BOOTSTRAP_RESPONSE == this->stateClient) 01347 { 01348 // CASE 1 : Bootstrap request sent at the beginning of the session; this populates the neighbor 01349 // list and changes the state of the client 01350 01351 // Check the response is for the stream 01352 assert(message->ResponseType() == CStreamMessageBootPushMultiResponse::STREAM); 01353 01354 #ifdef LOG 01355 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01356 { 01357 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP RESPONSE %3u (STREAM) : ", this->sim->Time(), this->address.Address(), this->channel->Id()); 01358 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 01359 { 01360 printf("\n\t\t\tLayer %u : ", layer); 01361 for(__uint32 host = 0; host < message->Count(layer); host++) printf("%u ", message->Host(layer, host).Address()); 01362 } 01363 } 01364 #endif 01365 01366 // Process the response for each layer 01367 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 01368 { 01369 if(message->Count(layer) > 0) 01370 { 01371 // If there are neighbors available (otherwise, the server is used by default) 01372 01373 // Choose a random neighbor to the the sender for this layer 01374 __uint32 sender = CRand::Generate(message->Count(layer)); 01375 assert(sender < message->Count(layer)); 01376 01377 // Save the neighbors 01378 for(__uint32 host = 0; host < sender; host++) 01379 this->layers[layer]->Neighbors()->insert(message->Host(layer, host)); 01380 for(__uint32 host = sender+1; host < message->Count(layer); host++) 01381 this->layers[layer]->Neighbors()->insert(message->Host(layer, host)); 01382 01383 // Save the sender 01384 this->layers[layer]->Sender(message->Host(layer, sender)); 01385 } 01386 } 01387 01388 // Change the state 01389 this->stateClient = UCAST_JOIN_REQUEST; 01390 01391 // Go to streaming STEP 2a 01392 this->StreamJoin(); 01393 } 01394 else 01395 { 01396 // CASE 2 : Refresh bootstrap request sent during a session; this refreshes the neighbor 01397 // list but does not change the state of the client or the sender 01398 01399 // Check the response is for one layer only 01400 assert(message->ResponseType() == CStreamMessageBootPushMultiResponse::LAYER); 01401 01402 // Process the response 01403 // Clear the neighbors list 01404 this->layers[message->Layer()]->Neighbors()->clear(); 01405 01406 // Save the neighbors 01407 for(__uint32 host = 0; host < message->Count(0); host++) 01408 this->layers[message->Layer()]->Neighbors()->insert(message->Host(0, host)); 01409 } 01410 } 01411 01412 void CStreamClientPushMulti::RejectJoin(CAddress src, CStreamMessagePushMultiJoin* message) 01413 { 01414 // Reject JOIN request : send a CLOSE message to the sender 01415 CStreamMessagePushMultiClose* reply = new CStreamMessagePushMultiClose(message->Stream(), message->Layer()); 01416 (*this->delegateSendMessage)(src, reply); 01417 } 01418 01419 void CStreamClientPushMulti::TimerPlayMcast(CTimerInfo* info) 01420 { 01421 assert(MCAST_STREAM_PLAY == this->stateClient); 01422 01423 // Check if there is a buffer underrun 01424 if(this->bufferMcast->NumPlay() == 0) 01425 { 01426 // Change the state 01427 this->stateClient = MCAST_STREAM_WAIT; 01428 01429 // Pause playback 01430 this->sessionTimeLastStop = this->sim->Time(); 01431 01432 // Switch to process wait frames (phase 3) 01433 this->processFrame = &CStreamClientPushMulti::ProcessFrameMcastWait; 01434 01435 return; 01436 } 01437 01438 // Play the first frame from the buffer 01439 this->statPlayFrames++; 01440 01441 // Playback sync statistics 01442 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferMcast->CurrentIndex()); 01443 this->statSyncDelayCount++; 01444 01445 if(this->bufferMcast->IsCurrentDecodable()) 01446 { 01447 this->statPlayLastFrame = this->bufferMcast->CurrentIndex(); 01448 assert(this->bufferMcast->CurrentFrame()->Type() < 3); 01449 this->statSuccessFrames[this->bufferMcast->CurrentFrame()->Type()]++; 01450 } 01451 else 01452 { 01453 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->bufferMcast->CurrentIndex()) < 3); 01454 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferMcast->CurrentIndex())]++; 01455 } 01456 01457 // Shift the buffer 01458 this->bufferMcast->Shift(); 01459 01460 // Schedule next play event 01461 this->timerPlayMcast->SetAt(this->sessionTimeLastStart + ((__time)(this->bufferMcast->CurrentIndex() - this->sessionFrameLastStart)) / (__time)this->channel->Fps()); 01462 } 01463 01464 void CStreamClientPushMulti::TimerPlayUcast(CTimerInfo* info) 01465 { 01466 assert(UCAST_STREAM_PLAY == this->stateClient); 01467 01468 // Calculate the layer of the current frame 01469 __uint32 layer = this->bufferUcast->IndexToLayer(this->bufferUcast->CurrentIndex()); 01470 01471 // If the current layer is READY, check if there is a buffer underrun 01472 if((this->layers[layer]->StateLayer() == LAYER_READY) && (this->bufferUcast->NumPlay(layer) == 0)) 01473 { 01474 // Change the layer state to buffering 01475 assert(this->layers[layer]->StateLayer() == LAYER_READY); 01476 this->layers[layer]->Buffering(); 01477 01478 // Switch to process buffering frames (phase 2) 01479 this->layers[layer]->Process(&CStreamClientPushMulti::ProcessFrameUcastLayerBuffering); 01480 01481 // If the host is registered for this layer, deregister 01482 if(REGISTERED == this->layers[layer]->StateRegister()) this->StreamDeregister(layer); 01483 01484 // Set a buffer underrun timer for the layer; if the buffer underrun is not fixed until the timer expires, select a new sender for this layer 01485 assert(!this->layers[layer]->TimerBuffer()->IsSet()); 01486 this->layers[layer]->TimerBuffer()->SetAfter(this->info->StreamBufferUnderrunTimeout(), this->layers[layer]); 01487 } 01488 01489 // Check if the number of layers in the READY state is lower than the minimum 01490 if(this->layersInfo.NumReady() < this->info->NumLayersMin()) 01491 { 01492 // Change the state 01493 this->stateClient = UCAST_STREAM_WAIT; 01494 01495 // Pause playback 01496 this->sessionTimeLastStop = this->sim->Time(); 01497 01498 // Switch to process wait frames (phase 3) 01499 this->processFrame = &CStreamClientPushMulti::ProcessFrameUcastWait; 01500 01501 #ifdef LOG 01502 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01503 { 01504 CConsole::SetColor(CConsole::LIGHT_YELLOW); 01505 printf("\n\t\tT = %7.3lf PLAY WAIT", this->sim->Time()); 01506 CConsole::SetColor(CConsole::LIGHT_GRAY); 01507 } 01508 #endif 01509 return; 01510 } 01511 01512 // Play the first frame from the buffer 01513 this->statPlayFrames++; 01514 01515 // Playback sync statistics 01516 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->bufferUcast->CurrentIndex()); 01517 this->statSyncDelayCount++; 01518 01519 if(this->bufferUcast->IsCurrentDecodable()) 01520 { 01521 assert(this->bufferUcast->HasCurrentIndex()); 01522 assert(this->bufferUcast->CurrentIndex() == this->bufferUcast->CurrentFrame()->Index()); 01523 assert(this->bufferUcast->CurrentFrame()->Type() < 3); 01524 01525 this->statPlayLastFrame = this->bufferUcast->CurrentIndex(); 01526 this->statSuccessFrames[this->bufferUcast->CurrentFrame()->Type()]++; 01527 01528 #ifdef LOG 01529 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01530 { 01531 CConsole::SetColor(CConsole::LIGHT_GREEN); 01532 printf("\n\t\tT = %7.3lf PLAY OKAY %6u\t%c", this->sim->Time(), this->bufferUcast->CurrentIndex(), stringFrameType[this->bufferUcast->CurrentFrame()->Type()]); 01533 CConsole::SetColor(CConsole::LIGHT_GRAY); 01534 } 01535 #endif 01536 } 01537 else 01538 { 01539 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndex()) < 3); 01540 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndex())]++; 01541 01542 #ifdef LOG 01543 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01544 { 01545 CConsole::SetColor(CConsole::LIGHT_RED); 01546 printf("\n\t\tT = %7.3lf PLAY FAIL %6u\t%c : %s", this->sim->Time(), this->bufferUcast->CurrentIndex(), stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(this->bufferUcast->CurrentIndex())], 01547 this->bufferUcast->HasCurrentIndex()?"DECODE_FAIL":"MISSING"); 01548 CConsole::SetColor(CConsole::LIGHT_GRAY); 01549 } 01550 #endif 01551 } 01552 01553 // Shift the buffer 01554 this->bufferUcast->Shift(); 01555 01556 // Schedule next play event 01557 this->timerPlayUcast->SetAt(this->sessionTimeLastStart + ((__time)(this->bufferUcast->CurrentIndex() - this->sessionFrameLastStart)) / (__time)this->channel->Fps()); 01558 } 01559 01560 void CStreamClientPushMulti::TimerBoot(CTimerInfo* info) 01561 { 01562 if(NULL == this->channel) return; 01563 01564 // Bootstrap timer : resend the boostrap query for all layers 01565 CStreamMessageBootPushMultiRequest* message = new CStreamMessageBootPushMultiRequest( 01566 this->channel->Id(), 01567 this->info->BootQueryMax() 01568 ); 01569 (*this->delegateSendMessage)(this->channel->Address(), message); 01570 01571 // Activate the timer to resend the query after a timeout period 01572 this->timerBoot->SetAfter(this->info->BootQueryTimeout()); 01573 } 01574 01575 void CStreamClientPushMulti::TimerRegister(CTimerInfo* info) 01576 { 01577 // Get the layer from the timer info 01578 assert(info); 01579 CLayer* layer = type_cast<CLayer*>(info); 01580 01581 // Register 01582 this->StreamRegister(layer->Layer()); 01583 } 01584 01585 void CStreamClientPushMulti::TimerBuffer(CTimerInfo* info) 01586 { 01587 // Buffer underrun timer : if a buffer underrun instance is not self-repairing (i.e. caused by delayed packets), 01588 // select a new source 01589 01590 // First, check the buffer underrun instance is genuine : the client must be in a wait or play state 01591 if((UCAST_STREAM_WAIT != this->stateClient) && (UCAST_STREAM_PLAY != this->stateClient)) return; 01592 01593 // Check the timer is set for the current channel 01594 if(NULL == this->channel) return; 01595 01596 // Get the layer 01597 CLayer* layer = type_cast<CLayer*>(info); 01598 01599 // Check the layer is in buffering state 01600 assert(LAYER_BUFFERING == layer->StateLayer()); 01601 01602 // Then, send a leave message to the old sender 01603 this->StreamLeave(layer->Layer()); 01604 01605 // Select a new sender from the neighbors list : if more neighbors are available 01606 if(layer->Neighbors()->size() > 0) 01607 { 01608 // Select a new random neighbor as a receiver for this layer 01609 __uint32 index = CRand::Generate(layer->Neighbors()->size()); 01610 assert(index < layer->Neighbors()->size()); 01611 01612 __uint32 idx = 0; 01613 for(set<CAddress>::iterator iter = layer->Neighbors()->begin(); iter != layer->Neighbors()->end(); iter++, idx++) 01614 { 01615 if(idx == index) 01616 { 01617 layer->Sender(*iter); 01618 break; 01619 } 01620 } 01621 01622 // Remove the sender from the neighbors list 01623 layer->Neighbors()->erase(layer->Sender()); 01624 01625 // If the number of remaining neighbors is less than the threshold, refresh the list of neighbors 01626 if(layer->Neighbors()->size() <= this->info->BootRefreshThreshold()) 01627 { 01628 // Send a boot push query message to the bootstrap server (the channel address) 01629 CStreamMessageBootPushMultiRequest* message = new CStreamMessageBootPushMultiRequest( 01630 this->channel->Id(), 01631 layer->Layer(), 01632 this->info->BootQueryMax() 01633 ); 01634 (*this->delegateSendMessage)(this->channel->Address(), message); 01635 } 01636 } 01637 // Else, use the server 01638 else layer->Sender(this->channel->Address()); 01639 01640 // Send a JOIN to the new sender 01641 this->StreamJoin(layer->Layer()); 01642 01643 // If the client is in the wait state and the number of ready layers is less than the minimum number, 01644 // switch the client to the reset state (after a long buffer underrun, the client must be resync with the the frames sent by the source) 01645 if((UCAST_STREAM_WAIT == this->stateClient) && (this->layersInfo.NumReady() < this->info->NumLayersMin())) 01646 { 01647 // Change the client state 01648 this->stateClient = UCAST_STREAM_RESET; 01649 01650 // Switch to reset 01651 this->processFrame = &CStreamClientPushMulti::ProcessFrameUcastReset; 01652 } 01653 } 01654 01655 void CStreamClientPushMulti::Finalize() 01656 { 01657 }
Last updated: February 8, 2011