Alex Bikfalvi
SimStream Documentation
StreamClientPushSelect.cpp
00001 #include "Headers.h" 00002 #include "StreamClientPushSelect.h" 00003 #include "Rand.h" 00004 #include "Console.h" 00005 #include "Debug.h" 00006 00007 CStreamClientPushSelect::CLayer::CLayer( 00008 __uint32 layer, 00009 CSimHandler* sim, 00010 CStreamClientPushSelect* client, 00011 CLayersInfo* info, 00012 void (CStreamClientPushSelect::*timerBootHandler)(CTimerInfo*), 00013 void (CStreamClientPushSelect::*timerRegisterHandler)(CTimerInfo*), 00014 void (CStreamClientPushSelect::*timerBufferHandler)(CTimerInfo*) 00015 ) 00016 { 00017 // Layer 00018 this->layer = layer; 00019 00020 // Layer state 00021 this->stateLayer = LAYER_IDLE; 00022 this->stateRegister = NOT_REGISTERED; 00023 00024 // Layers info 00025 this->info = info; 00026 00027 // Process function 00028 this->process = NULL; 00029 00030 // Timers 00031 this->timerRegister = new CTimer<CStreamClientPushSelect>(sim, client, timerRegisterHandler); 00032 this->timerBuffer = new CTimer<CStreamClientPushSelect>(sim, client, timerBufferHandler); 00033 } 00034 00035 CStreamClientPushSelect::CLayer::~CLayer() 00036 { 00037 // Delete timer 00038 delete this->timerRegister; 00039 delete this->timerBuffer; 00040 } 00041 00042 CStreamClientPushSelect::CLayersInfo::CLayersInfo() 00043 { 00044 this->numBuffering = 0; 00045 this->numReady = 0; 00046 this->numRegistered = 0; 00047 } 00048 00049 void CStreamClientPushSelect::CLayer::Start(CChannel* channel) 00050 { 00051 assert(LAYER_IDLE == this->stateLayer); 00052 assert(NOT_REGISTERED == this->stateRegister); 00053 00054 // Set the channel 00055 this->channel = channel; 00056 00057 // Set layer state 00058 this->stateLayer = LAYER_BUFFERING; 00059 00060 // Check the timers 00061 assert(!this->timerBuffer->IsSet()); 00062 assert(!this->timerRegister->IsSet()); 00063 00064 // Update info 00065 this->info->IdleToBuffering(); 00066 00067 // Reset streaming info 00068 this->neighbors.clear(); 00069 this->receivers.clear(); 00070 this->sender = channel->Address(); 00071 this->process = NULL; 00072 } 00073 00074 void CStreamClientPushSelect::CLayer::Stop() 00075 { 00076 assert(LAYER_IDLE != this->stateLayer); 00077 00078 // Set the layer state 00079 this->stateLayer = LAYER_IDLE; 00080 00081 // Cancel timers 00082 if(this->timerRegister->IsSet()) this->timerRegister->Cancel(); 00083 if(this->timerBuffer->IsSet()) this->timerBuffer->Cancel(); 00084 } 00085 00086 void CStreamClientPushSelect::CLayer::Buffering() 00087 { 00088 assert(LAYER_READY == this->stateLayer); 00089 00090 this->stateLayer = LAYER_BUFFERING; 00091 00092 // Change the info 00093 this->info->ReadyToBuffering(); 00094 } 00095 00096 void CStreamClientPushSelect::CLayer::Ready() 00097 { 00098 assert(LAYER_BUFFERING == this->stateLayer); 00099 00100 this->stateLayer = LAYER_READY; 00101 00102 // Change the info 00103 this->info->BufferingToReady(); 00104 } 00105 00106 CStreamClientPushSelect::CStreamClientPushSelect( 00107 CSimHandler* sim, 00108 CInfoPushSelect* info, 00109 CAddress address, 00110 IDelegate2<void, CAddress, CPacketStream*>* delegateSendStream, 00111 IDelegate2<void, CAddress, CStreamMessage*>* delegateSendMessage, 00112 IDelegate2<void, CAddress, __uint32>* delegateIgmpJoin, 00113 IDelegate1<void, CAddress>* delegateIgmpLeave, 00114 IDelegate0<CLink*>* delegateLink, 00115 __uint32 entry, 00116 __bitrate bw 00117 ) : CStreamClient(sim) 00118 { 00119 assert(info); 00120 00121 assert(delegateSendStream); 00122 assert(delegateSendMessage); 00123 assert(delegateIgmpJoin); 00124 assert(delegateIgmpLeave); 00125 assert(delegateLink); 00126 00127 // Set the initial state 00128 this->stateClient = STOPPED; 00129 00130 // Set simulator 00131 this->info = info; 00132 this->address = address; 00133 00134 // Set the channel 00135 this->channel = NULL; 00136 00137 // Create the buffers 00138 this->buffer = new CStreamBufferMulti( 00139 this->info->NumLayers(), 00140 this->info->StreamBufferSize(), 00141 this->info->StreamBufferSizeHistory(), 00142 this->info->MpegGopSize() 00143 ); 00144 00145 // Unicast session 00146 this->bw = bw; 00147 this->bwUsed = 0; 00148 00149 // Multicast session 00150 this->entry = entry; 00151 00152 // Create the timers 00153 this->timerPlay = new CTimer<CStreamClientPushSelect>( 00154 this->sim, 00155 this, 00156 &CStreamClientPushSelect::TimerPlay 00157 ); 00158 00159 this->timerBoot = new CTimer<CStreamClientPushSelect>( 00160 this->sim, 00161 this, 00162 &CStreamClientPushSelect::TimerBoot 00163 ); 00164 00165 // Set the lower layer delegates 00166 this->delegateSendStream = delegateSendStream; 00167 this->delegateSendMessage = delegateSendMessage; 00168 this->delegateIgmpJoin = delegateIgmpJoin; 00169 this->delegateIgmpLeave = delegateIgmpLeave; 00170 this->delegateLink = delegateLink; 00171 00172 // Delegates 00173 this->delegateRecvFrame = new Delegate2<CStreamClientPushSelect, void, CAddress, CStreamFrame>(this, &CStreamClientPushSelect::RecvFrame); 00174 00175 // Encoder 00176 this->encoder = new CStreamEncoderFrame(this->delegateSendStream); 00177 00178 // Decoder 00179 this->decoder = NULL; 00180 this->decoder = new CStreamDecoderFrame(this->delegateRecvFrame); 00181 00182 // Layers 00183 this->layers = new CLayer*[this->info->NumLayers()]; 00184 00185 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00186 { 00187 this->layers[index] = new CLayer( 00188 index, 00189 this->sim, 00190 this, 00191 &this->layersInfo, 00192 &CStreamClientPushSelect::TimerBoot, 00193 &CStreamClientPushSelect::TimerRegister, 00194 &CStreamClientPushSelect::TimerBuffer 00195 ); 00196 } 00197 } 00198 00199 CStreamClientPushSelect::~CStreamClientPushSelect() 00200 { 00201 // Delete delegates 00202 delete this->delegateRecvFrame; 00203 00204 // Delete coders 00205 delete this->encoder; 00206 delete this->decoder; 00207 00208 // Delete the buffers 00209 delete this->buffer; 00210 00211 // Delete the timers 00212 delete this->timerPlay; 00213 delete this->timerBoot; 00214 00215 // Delete the layers 00216 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00217 delete this->layers[index]; 00218 delete[] this->layers; 00219 } 00220 00221 void CStreamClientPushSelect::Start(CChannel* channel) 00222 { 00223 assert(channel); 00224 00225 // Check the state 00226 assert(STOPPED == this->stateClient); 00227 00228 // Check the timers are stopped 00229 assert(!this->timerPlay->IsSet()); 00230 assert(!this->timerBoot->IsSet()); 00231 00232 // Set the channel 00233 this->channel = channel; 00234 00235 // Log 00236 #ifdef LOG 00237 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00238 printf("\n\tT = %7.3lf PEER %3u START %3u (%u)", this->sim->Time(), this->address.Address(), this->channel->Id(), this->channel->Type()); 00239 #endif 00240 00241 // Reset statistics 00242 this->statRecvFrames = 0; 00243 this->statDiscardedFrames = 0; 00244 this->statPlayFrames = 0; 00245 this->statSuccessFrames[0] = 0; 00246 this->statSuccessFrames[1] = 0; 00247 this->statSuccessFrames[2] = 0; 00248 this->statFailFrames[0] = 0; 00249 this->statFailFrames[1] = 0; 00250 this->statFailFrames[2] = 0; 00251 00252 this->statTimeClientStart = this->sim->Time(); 00253 this->statTimeRecvStart = -1; 00254 this->statTimePlayStart = -1; 00255 this->statTimeWait = 0; 00256 00257 this->statPlayFirstFrame = 0; 00258 this->statPlayLastFrame = 0; 00259 00260 this->statSyncDelaySum = 0; 00261 this->statSyncDelayCount = 0; 00262 00263 // Check the registration state (client must not be registered) 00264 assert(0 == this->layersInfo.NumRegistered()); 00265 00266 // Set decoder 00267 this->decoder = this->decoder; 00268 00269 // Reset the decoder 00270 this->decoder->Reset(this->channel->Id()); 00271 00272 // Start layers 00273 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00274 this->layers[index]->Start(this->channel); 00275 00276 // Change the client state 00277 this->stateClient = UCAST_BOOTSTRAP_REQUEST; 00278 00279 // Go to streaming STEP 1 00280 this->StreamBootstrap(); 00281 } 00282 00283 void CStreamClientPushSelect::Stop() 00284 { 00285 // Check the state 00286 assert(STOPPED != this->stateClient); 00287 00288 // Log 00289 #ifdef LOG 00290 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00291 printf("\n\tT = %7.3lf PEER %3u STOP %3u", this->sim->Time(), this->address.Address(), this->channel->Id()); 00292 #endif 00293 00294 // Close all receivers 00295 this->StreamClose(); 00296 00297 // Leave the stream 00298 this->StreamLeave(); 00299 00300 // Deregister and stop all layers 00301 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00302 { 00303 // If layer is registered, deregister 00304 if(REGISTERED == this->layers[index]->StateRegister()) this->StreamDeregister(index); 00305 00306 this->layers[index]->Stop(); 00307 } 00308 00309 // Reset layers info 00310 this->layersInfo.Reset(); 00311 00312 // If the session is waiting, add the waiting time 00313 if(UCAST_STREAM_WAIT == this->stateClient) this->statTimeWait += (this->sim->Time() - this->sessionTimeLastStop); 00314 00315 // If there are active timer, cancel 00316 if(this->timerPlay->IsSet()) this->timerPlay->Cancel(); 00317 if(this->timerBoot->IsSet()) this->timerBoot->Cancel(); 00318 00319 // Finalize statistics 00320 this->statTimeFinish = this->sim->Time(); 00321 00322 // Set channel to null 00323 this->channel = NULL; 00324 00325 // Change the state 00326 this->stateClient = STOPPED; 00327 } 00328 00329 00330 void CStreamClientPushSelect::Recv(CAddress srcAddress, CAddress dstAddress, __uint16 srcPort, __uint16 dstPort, CPacket* packet) 00331 { 00332 // Process received packets 00333 if(NULL == packet) return; 00334 if((dstAddress != this->address) && (!dstAddress.IsMulticast())) return; 00335 00336 // Process packet depending on UDP destination 00337 if(this->info->PortStream() == dstPort) this->RecvStream(srcAddress, type_cast<CPacketStream*>(packet)); 00338 if(this->info->PortControl() == dstPort) this->RecvMessage(srcAddress, type_cast<CStreamMessage*>(packet)); 00339 } 00340 00341 void CStreamClientPushSelect::RecvStream(CAddress src, CPacketStream* packet) 00342 { 00343 #ifdef LOG 00344 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00345 { 00346 CConsole::SetColor(CConsole::DARK_GRAY); 00347 printf("\n\tT = %7.3lf RECV STREAM %u --- %u : ", this->sim->Time(), packet->Stream(), src.Address()); 00348 CConsole::SetColor(CConsole::LIGHT_GRAY); 00349 } 00350 #endif 00351 00352 // Send received stream packets to the decoder 00353 CStreamDecoder::EResult result = this->decoder->Decode(src, packet); 00354 00355 #ifdef LOG 00356 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00357 { 00358 CConsole::SetColor(CConsole::LIGHT_RED); 00359 switch(result) 00360 { 00361 case CStreamDecoder::FAIL_PACKET_TYPE: printf("FAIL PACKET TYPE"); break; 00362 case CStreamDecoder::FAIL_STREAM: printf("FAIL STREAM"); break; 00363 } 00364 CConsole::SetColor(CConsole::LIGHT_GRAY); 00365 } 00366 #endif 00367 } 00368 00369 void CStreamClientPushSelect::RecvMessage(CAddress src, CStreamMessage* message) 00370 { 00371 // Process messages 00372 switch(message->MessageType()) 00373 { 00374 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_JOIN: this->RecvMessageJoin(src, type_cast<CStreamMessagePushSelectJoin*>(message)); break; 00375 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_LEAVE: this->RecvMessageLeave(src, type_cast<CStreamMessagePushSelectLeave*>(message)); break; 00376 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_CLOSE: this->RecvMessageClose(src, type_cast<CStreamMessagePushSelectClose*>(message)); break; 00377 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_MULTI_RESPONSE: this->RecvMessageBootResponse(src, type_cast<CStreamMessageBootPushSelectResponse*>(message)); break; 00378 default: assert(0); /* do nothing */ 00379 } 00380 } 00381 00382 void CStreamClientPushSelect::RecvFrame(CAddress src, CStreamFrame frame) 00383 { 00384 #ifdef LOG 00385 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00386 { 00387 CConsole::SetColor(CConsole::LIGHT_CYAN); 00388 printf("\n\tT = %7.3lf RECV FRAME (U) %u", this->sim->Time(), frame.Index()); 00389 CConsole::SetColor(CConsole::LIGHT_GRAY); 00390 } 00391 #endif 00392 00393 // Check the client state (if not a client state that allows receiving frames, ignore) 00394 if(this->stateClient < UCAST_STREAM_FIRST) 00395 { 00396 this->statDiscardedFrames++; 00397 return; 00398 } 00399 00400 // Check the decoder passed a correct frame 00401 if(frame.Stream() != this->channel->Id()) 00402 { 00403 this->statDiscardedFrames++; 00404 return; 00405 } 00406 00407 // Calculate the layer of the current frame 00408 __uint32 layer = this->buffer->IndexToLayer(frame.Index()); 00409 assert(layer < this->info->NumLayers()); 00410 00411 // Process frame by client (client processing is done first) 00412 assert(this->processFrame); 00413 (this->*this->processFrame)(frame); 00414 00415 // Process frame by layer (layer processing is done last) 00416 assert(this->layers[layer]->ProcessFrame()); 00417 (this->*this->layers[layer]->ProcessFrame())(frame, layer); 00418 00419 // Statistics 00420 this->statRecvFrames++; 00421 } 00422 00423 void CStreamClientPushSelect::ProcessFrameFirst(CStreamFrame frame) 00424 { 00425 // PROCESS Phase 1 : Process first frame to initialize playback buffer 00426 00427 #ifdef LOG 00428 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00429 printf("\t\t[UCAST FIRST]"); 00430 #endif 00431 00432 // Check the client state 00433 assert(UCAST_STREAM_FIRST == this->stateClient); 00434 00435 // Reset the buffer 00436 this->buffer->Reset(frame.Index()); 00437 00438 // Calculate the layer 00439 __uint32 layer = this->buffer->IndexToLayer(frame.Index()); 00440 00441 // Add the frame to the buffer and if the frame is new send it to the receivers 00442 if(this->buffer->Add(layer, frame)) this->SendStream(frame, layer); 00443 00444 // Statistics 00445 this->statTimeRecvStart = this->sim->Time(); 00446 00447 // Switch to process buffering frames (phase 2) 00448 this->processFrame = &CStreamClientPushSelect::ProcessFrameBuffering; 00449 00450 // Change the client state 00451 this->stateClient = UCAST_STREAM_BUFFERING; 00452 } 00453 00454 void CStreamClientPushSelect::ProcessFrameBuffering(CStreamFrame frame) 00455 { 00456 // PLAYBACK Phase 2 : Buffering phase (playback is not started and frames are saved in the buffer) 00457 00458 // Buffering phase finishes when the minimum number of layers is in the ready state 00459 00460 #ifdef LOG 00461 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00462 printf("\n\t\t[UCAST BUFFERING]\t(frame_index=%u layers_ready=%u layers_min=%u)", frame.Index(), this->layersInfo.NumReady(), this->info->NumLayersMin()); 00463 #endif 00464 00465 // Check the state 00466 assert(UCAST_STREAM_BUFFERING == this->stateClient); 00467 00468 // If the minimum number of layers is in the READY state 00469 if(this->layersInfo.NumReady() >= this->info->NumLayersMin()) 00470 { 00471 // Find the first independent frame 00472 for(__uint32 count = 0; count < this->buffer->Size(); count++) 00473 { 00474 // If the current frame is independent 00475 if(this->buffer->IsCurrentIndependent()) 00476 { 00477 // First frame found : start playback 00478 00479 // Start playback : save session and stat information 00480 this->sessionFrameLastStart = this->buffer->CurrentIndex(); 00481 this->sessionTimeLastStart = this->sim->Time(); 00482 00483 this->statPlayFirstFrame = this->buffer->CurrentIndex(); 00484 this->statTimePlayStart = this->sim->Time(); 00485 00486 // Switch to process playback frames (phase 3) 00487 this->processFrame = &CStreamClientPushSelect::ProcessFramePlay; 00488 00489 // Change the state 00490 this->stateClient = UCAST_STREAM_PLAY; 00491 00492 // Call the play timer 00493 this->TimerPlay(NULL); 00494 00495 break; 00496 } 00497 else this->buffer->Shift(); 00498 } 00499 } 00500 } 00501 00502 void CStreamClientPushSelect::ProcessFramePlay(CStreamFrame frame) 00503 { 00504 #ifdef LOG 00505 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00506 printf("\t\t[UCAST PLAY]"); 00507 #endif 00508 00509 // PLAYBACK Phase 3 : Play phase 00510 assert(UCAST_STREAM_PLAY == this->stateClient); 00511 00512 // Do nothing 00513 } 00514 00515 void CStreamClientPushSelect::ProcessFrameWait(CStreamFrame frame) 00516 { 00517 // PLAYBACK Wait : Buffering phase after a buffer underrun (playback is not started and frames are saved in the buffer) 00518 // Buffering phase finishes when there is a minimum number of layers in the READY state 00519 00520 #ifdef LOG 00521 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00522 printf("\t\t[UCAST WAIT]"); 00523 #endif 00524 00525 // Check the state 00526 assert(UCAST_STREAM_WAIT == this->stateClient); 00527 00528 // Calculate layer 00529 __uint32 layer = this->buffer->IndexToLayer(frame.Index()); 00530 00531 // Check if the minimum number of layers is in the READY state 00532 if(this->layersInfo.NumReady() >= this->info->NumLayersMin()) 00533 { 00534 // Find the first frame 00535 for(__uint32 count = 0; count < this->buffer->Count(layer); count++) 00536 { 00537 if(this->buffer->HasCurrentIndex()) 00538 { 00539 // Check the last stop 00540 assert(this->sim->Time() >= this->sessionTimeLastStop); 00541 00542 #ifdef LOG 00543 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00544 { 00545 CConsole::SetColor(CConsole::LIGHT_YELLOW); 00546 printf("\n\tT = %7.3lf WAIT : %7.3lf (%7.3lf - %7.3lf)", this->sim->Time(), this->sim->Time() - this->sessionTimeLastStop, this->sessionTimeLastStop, this->sim->Time()); 00547 CConsole::SetColor(CConsole::LIGHT_GRAY); 00548 } 00549 #endif 00550 // Save statistics 00551 this->statTimeWait += this->sim->Time() - this->sessionTimeLastStop; 00552 00553 // Restart playback 00554 this->sessionFrameLastStart = this->buffer->CurrentIndex(); 00555 this->sessionTimeLastStart = this->sim->Time(); 00556 00557 // Switch to process playback frames (phase 3) 00558 this->processFrame = &CStreamClientPushSelect::ProcessFramePlay; 00559 00560 // Change the state 00561 this->stateClient = UCAST_STREAM_PLAY; 00562 00563 // Call the play timer 00564 this->TimerPlay(NULL); 00565 00566 break; 00567 } 00568 else 00569 { 00570 // Increment the number of played frames 00571 this->statPlayFrames++; 00572 00573 // Playback sync statistics 00574 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()); 00575 this->statSyncDelayCount++; 00576 00577 // Mark the frame as failed 00578 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex()) < 3); 00579 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())]++; 00580 00581 #ifdef LOG 00582 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00583 { 00584 CConsole::SetColor(CConsole::LIGHT_RED); 00585 printf("\n\t\tT = %7.3lf PLAY FAIL %6u\t%c : %s", this->sim->Time(), this->buffer->CurrentIndex(), stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())], 00586 this->buffer->HasCurrentIndex()?"DECODE_FAIL":"MISSING"); 00587 CConsole::SetColor(CConsole::LIGHT_GRAY); 00588 } 00589 #endif 00590 // Shift the buffer 00591 this->buffer->Shift(); 00592 } 00593 } 00594 } 00595 } 00596 00597 void CStreamClientPushSelect::ProcessFrameReset(CStreamFrame frame) 00598 { 00599 // PLAYBACK Reset : Reset the buffer after a long buffer underrun (playback is not started and frames are saved in the buffer) 00600 00601 #ifdef LOG 00602 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00603 printf("\t\t[UCAST RESET]"); 00604 #endif 00605 00606 // After the buffer is reset by the first frame, switch to wait state and wait for the buffer to fill again 00607 // Check the state 00608 assert(UCAST_STREAM_RESET == this->stateClient); 00609 00610 // Reset the buffer 00611 this->buffer->Reset(frame.Index()); 00612 00613 // Switch all layers to the buffering state 00614 for(__uint32 index = 0; index < this->info->NumLayers(); index++) 00615 { 00616 // If the layer is not in buffering state, switch to buffering state 00617 if(this->layers[index]->StateLayer() == LAYER_READY) this->layers[index]->Buffering(); 00618 this->layers[index]->Process(&CStreamClientPushSelect::ProcessFrameLayerBuffering); 00619 00620 // If the layer is registered, deregister 00621 if(this->layers[index]->StateRegister() == REGISTERED) this->layers[index]->Deregister(); 00622 00623 // If there is a registration timer set for this layer, cancel 00624 if(this->layers[index]->TimerRegister()->IsSet()) this->layers[index]->TimerRegister()->Cancel(); 00625 00626 // If there is a buffering timer set for this layer, cancel 00627 if(this->layers[index]->TimerBuffer()->IsSet()) this->layers[index]->TimerBuffer()->Cancel(); 00628 } 00629 00630 // Switch to the wait state 00631 this->processFrame = &CStreamClientPushSelect::ProcessFrameWait; 00632 00633 // Change the state 00634 this->stateClient = UCAST_STREAM_WAIT; 00635 } 00636 00637 void CStreamClientPushSelect::ProcessFrameLayerBuffering(CStreamFrame frame, __uint32 layer) 00638 { 00639 // PROCESS LAYER FRAME Phase 1 : Buffering phase (playback is not started and frames are saved in the buffer) 00640 00641 // Buffering phase finishes when the index of the received frame exceeds the the first frame from the 00642 // buffer, plus the buffering size 00643 00644 // Check the state 00645 assert(LAYER_BUFFERING == this->layers[layer]->StateLayer()); 00646 00647 #ifdef LOG 00648 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00649 printf("\n\t\t[UCAST LAYER BUFFERING]\t(frame_index=%u layer=%u frame_layer_index=%u)", frame.Index(), layer, this->buffer->IndexToLayerIndex(frame.Index())); 00650 #endif 00651 00652 // Add the frame to the buffer 00653 if(this->buffer->Add(layer, frame)) 00654 { 00655 // If the frame was added to the buffer 00656 00657 #ifdef LOG 00658 CConsole::SetColor(CConsole::LIGHT_GREEN); 00659 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00660 printf("\t\tBUFFER HIT"); 00661 CConsole::SetColor(CConsole::LIGHT_GRAY); 00662 #endif 00663 00664 // Check if there are enough frames to start playback (the distance between the first frame in the buffer and the received frame) 00665 if(this->buffer->IndexToLayerIndex(frame.Index()) >= this->buffer->LayerIndex(layer) + this->info->StreamBufferSizeBuffering() - 1) 00666 { 00667 #ifdef LOG 00668 CConsole::SetColor(CConsole::LIGHT_GREEN); 00669 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00670 printf("\tREADY : YES (frame_layer_index=%u buffer_layer_index=%u buffering=%u)", 00671 this->buffer->IndexToLayerIndex(frame.Index()), this->buffer->LayerIndex(layer), this->info->StreamBufferSizeBuffering() 00672 ); 00673 CConsole::SetColor(CConsole::LIGHT_GRAY); 00674 #endif 00675 00676 // Change the layer into the ready state 00677 this->layers[layer]->Ready(); 00678 00679 // Switch to process buffering frames for this layer (phase 3) 00680 this->layers[layer]->Process(&CStreamClientPushSelect::ProcessFrameLayerReady); 00681 00682 // If the buffering timer is active, cancel the timer 00683 if(this->layers[layer]->TimerBuffer()->IsSet()) this->layers[layer]->TimerBuffer()->Cancel(); 00684 00685 // Set the registration timer for this layer 00686 if(this->layers[layer]->TimerRegister()->IsSet()) this->layers[layer]->TimerRegister()->Cancel(); 00687 00688 assert(this->layers[layer]->StateRegister() == NOT_REGISTERED); 00689 this->layers[layer]->TimerRegister()->SetAfter(this->info->BootRegisterDelay(), this->layers[layer]); 00690 } 00691 else 00692 { 00693 #ifdef LOG 00694 CConsole::SetColor(CConsole::LIGHT_RED); 00695 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00696 printf("\tREADY : NO (frame_layer_index=%u buffer_layer_index=%u buffering=%u)", 00697 this->buffer->IndexToLayerIndex(frame.Index()), this->buffer->LayerIndex(layer), this->info->StreamBufferSizeBuffering() 00698 ); 00699 CConsole::SetColor(CConsole::LIGHT_GRAY); 00700 #endif 00701 } 00702 00703 this->SendStream(frame, layer); 00704 } 00705 else 00706 { 00707 #ifdef LOG 00708 CConsole::SetColor(CConsole::LIGHT_RED); 00709 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00710 printf("\t\tBUFFER MISS"); 00711 CConsole::SetColor(CConsole::LIGHT_GRAY); 00712 #endif 00713 } 00714 } 00715 00716 void CStreamClientPushSelect::ProcessFrameLayerReady(CStreamFrame frame, __uint32 layer) 00717 { 00718 // PLAYBACK Phase 2 : Play phase 00719 00720 #ifdef LOG 00721 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00722 printf("\n\t\t[UCAST LAYER READY]\t(frame_index=%u layer=%u frame_layer_index=%u)", frame.Index(), layer, this->buffer->IndexToLayerIndex(frame.Index())); 00723 #endif 00724 00725 // Add frame to the buffer 00726 if(this->buffer->Add(layer, frame)) 00727 { 00728 #ifdef LOG 00729 CConsole::SetColor(CConsole::LIGHT_GREEN); 00730 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00731 printf("\t\tBUFFER HIT"); 00732 CConsole::SetColor(CConsole::LIGHT_GRAY); 00733 #endif 00734 00735 this->SendStream(frame, layer); 00736 } 00737 else 00738 { 00739 #ifdef LOG 00740 CConsole::SetColor(CConsole::LIGHT_RED); 00741 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00742 printf("\t\tBUFFER MISS"); 00743 CConsole::SetColor(CConsole::LIGHT_GRAY); 00744 #endif 00745 } 00746 } 00747 00748 void CStreamClientPushSelect::StreamBootstrap() 00749 { 00750 // Streaming STEP 1 : Bootstraping (connect to bootstrap server and retrieve list of neighbors for all layers) 00751 00752 #ifdef LOG 00753 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00754 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP REQUEST %3u (STREAM)", this->sim->Time(), this->address.Address(), this->channel->Id()); 00755 #endif 00756 00757 // Check the state 00758 assert(UCAST_BOOTSTRAP_REQUEST == this->stateClient); 00759 00760 // Send a boot push query message to the bootstrap server (the channel address) 00761 CStreamMessageBootPushSelectRequest* message = new CStreamMessageBootPushSelectRequest( 00762 this->channel->Id(), 00763 this->info->BootQueryMax() 00764 ); 00765 00766 // Send the message 00767 (*this->delegateSendMessage)(this->channel->Address(), message); 00768 00769 // Activate the timer to resend the query after a timeout period 00770 this->timerBoot->SetAfter(this->info->BootQueryTimeout()); 00771 00772 // Change the state 00773 this->stateClient = UCAST_BOOTSTRAP_RESPONSE; 00774 } 00775 00776 void CStreamClientPushSelect::StreamJoin() 00777 { 00778 // Streaming STEP 2a : Joining (start streaming from the senders) 00779 00780 // Check the state 00781 assert(UCAST_JOIN_REQUEST == this->stateClient); 00782 00783 if(NULL == this->channel) return; 00784 00785 // Send a join request to the senders for all layers 00786 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 00787 { 00788 this->StreamJoin(layer); 00789 00790 // Get ready to receive the stream : start streaming session with phase 1 00791 this->layers[layer]->Process(&CStreamClientPushSelect::ProcessFrameLayerBuffering); 00792 00793 // Check the layer state is buffering 00794 assert(this->layers[layer]->StateLayer() == LAYER_BUFFERING); 00795 } 00796 00797 // Set the processing function to first frame 00798 this->processFrame = &CStreamClientPushSelect::ProcessFrameFirst; 00799 00800 // Change the state 00801 this->stateClient = UCAST_STREAM_FIRST; 00802 } 00803 00804 void CStreamClientPushSelect::StreamJoin(__uint32 layer) 00805 { 00806 #ifdef LOG 00807 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00808 printf("\n\tT = %7.3lf PEER %3u JOIN %3u/%u -> %u", this->sim->Time(), this->address.Address(), this->channel->Id(), layer, this->layers[layer]->Sender().Address()); 00809 #endif 00810 00811 CStreamMessagePushSelectJoin* message = new CStreamMessagePushSelectJoin(this->channel->Id(), layer); 00812 00813 (*this->delegateSendMessage)(this->layers[layer]->Sender(), message); 00814 } 00815 00816 void CStreamClientPushSelect::StreamLeave() 00817 { 00818 // Streaming STEP 3a : Leave (inform the senders) 00819 00820 // Send a leave to the sender for each layer 00821 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 00822 { 00823 this->StreamLeave(layer); 00824 } 00825 } 00826 00827 void CStreamClientPushSelect::StreamLeave(__uint32 layer) 00828 { 00829 // Streaming buffer underrun for this layer : Leave 00830 CStreamMessagePushSelectLeave* message = new CStreamMessagePushSelectLeave(this->channel->Id(), layer); 00831 00832 (*this->delegateSendMessage)(this->layers[layer]->Sender(), message); 00833 } 00834 00835 void CStreamClientPushSelect::StreamRegister(__uint32 layer) 00836 { 00837 // Streaming STEP 2b : Register (register to bootstrap server) 00838 00839 // Check the client state (client must start playing in order to register) 00840 assert(this->stateClient >= UCAST_STREAM_FIRST); 00841 00842 // Check the layer state (layer must be in ready state in order to register) 00843 if(this->layers[layer]->StateLayer() != LAYER_READY) return; 00844 00845 // Check the registration state 00846 if(this->layers[layer]->StateRegister() != NOT_REGISTERED) return; 00847 00848 CStreamMessageBootPushSelectRegister* message = new CStreamMessageBootPushSelectRegister( 00849 this->channel->Id(), 00850 layer, 00851 this->address, 00852 this->layers[layer]->Sender()); 00853 00854 (*this->delegateSendMessage)(this->channel->Address(), message); 00855 00856 // Change the registration state 00857 this->layers[layer]->Register(); 00858 } 00859 00860 void CStreamClientPushSelect::StreamDeregister(__uint32 layer) 00861 { 00862 // Check the resgistration state 00863 assert(this->layers[layer]->StateRegister() == REGISTERED); 00864 00865 // Streaming STEP 3b : Deregister (deregister from bootstrap server) 00866 CStreamMessageBootPushSelectDeregister* message = new CStreamMessageBootPushSelectDeregister( 00867 this->channel->Id(), 00868 layer, 00869 this->address); 00870 00871 (*this->delegateSendMessage)(this->channel->Address(), message); 00872 00873 // Change the registration state 00874 this->layers[layer]->Deregister(); 00875 } 00876 00877 void CStreamClientPushSelect::StreamClose() 00878 { 00879 // Streaming STEP ASYNC : Close (sent to receivers upon leaving or when receiving a request that cannot be served) 00880 00881 // Send the close to all layers / all receivers 00882 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 00883 { 00884 for(set<CAddress>::iterator iter = this->layers[layer]->Receivers()->begin(); iter != this->layers[layer]->Receivers()->end(); iter++) 00885 { 00886 CStreamMessagePushSelectClose* message = new CStreamMessagePushSelectClose(this->channel->Id(), layer); 00887 00888 (*this->delegateSendMessage)(*iter, message); 00889 00890 #ifdef LOG 00891 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00892 printf("\n\tT = %7.3lf PEER %3u CLOSE RECEIVER %3u", this->sim->Time(), this->address.Address(), ((CAddress)(*iter)).Address()); 00893 #endif 00894 } 00895 00896 // Clear the receivers list 00897 this->layers[layer]->Receivers()->clear(); 00898 } 00899 00900 // Set the used bandwidth to zero 00901 this->bwUsed = 0; 00902 } 00903 00904 void CStreamClientPushSelect::SendStream(CStreamFrame frame, __uint32 layer) 00905 { 00906 // Check the client state 00907 assert(this->stateClient >= UCAST_STREAM_FIRST); 00908 00909 #ifdef LOG 00910 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00911 { 00912 CConsole::SetColor(CConsole::DARK_GRAY); 00913 printf("\n\tT = %7.3lf SEND FRAME (U) %u : [ ", this->sim->Time(), frame.Index()); 00914 } 00915 #endif 00916 00917 for(set<CAddress>::iterator iter = this->layers[layer]->Receivers()->begin(); iter != this->layers[layer]->Receivers()->end(); iter++) 00918 { 00919 // Send frame to the encoder 00920 this->encoder->Encode(*iter, frame); 00921 00922 #ifdef LOG 00923 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00924 printf("%u ", ((CAddress)(*iter)).Address()); 00925 #endif 00926 } 00927 00928 #ifdef LOG 00929 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00930 { 00931 printf("]"); 00932 CConsole::SetColor(CConsole::LIGHT_GRAY); 00933 } 00934 #endif 00935 } 00936 00937 void CStreamClientPushSelect::RecvMessageJoin(CAddress src, CStreamMessagePushSelectJoin* message) 00938 { 00939 // Process received JOIN 00940 00941 // Check whether the client can serve the channel 00942 if(NULL == this->channel) { this->RejectJoin(src, message); return; } 00943 if(this->channel->Id() != message->Stream()) { this->RejectJoin(src, message); return; } 00944 00945 // Check whether the client is registered 00946 if(REGISTERED != this->layers[message->Layer()]->StateRegister()) { this->RejectJoin(src, message); return; } 00947 00948 #ifdef LOG 00949 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00950 printf("\n\tT = %7.3lf PEER %3u RECV JOIN %u : ", this->sim->Time(), this->address.Address(), src.Address()); 00951 #endif 00952 00953 // Check whether the peer would have bandwidth to serve the channel considering the current usage 00954 __bitrate bwLayer = this->channel->Bw() / this->info->NumLayers(); 00955 if(this->bw - this->bwUsed < this->info->StreamBwMargin() * bwLayer) 00956 { 00957 00958 #ifdef LOG 00959 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00960 { 00961 CConsole::SetColor(CConsole::LIGHT_RED); 00962 printf("REJECT"); 00963 CConsole::SetColor(CConsole::LIGHT_GRAY); 00964 } 00965 #endif 00966 00967 // Reject the request 00968 this->RejectJoin(src, message); 00969 // Deregister 00970 this->StreamDeregister(message->Layer()); 00971 return; 00972 } 00973 00974 #ifdef LOG 00975 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 00976 { 00977 CConsole::SetColor(CConsole::LIGHT_GREEN); 00978 printf("ACCEPT"); 00979 CConsole::SetColor(CConsole::LIGHT_GRAY); 00980 } 00981 #endif 00982 00983 // Acceptance conditions met : add the host to the receivers list 00984 this->layers[message->Layer()]->Receivers()->insert(src); 00985 00986 // Estimate the increase in bandwidth usage 00987 this->bwUsed += bwLayer; 00988 } 00989 00990 void CStreamClientPushSelect::RecvMessageLeave(CAddress src, CStreamMessagePushSelectLeave* message) 00991 { 00992 // Process received LEAVE 00993 00994 // Check whether the peer is serving the channel (may be a delayed message) 00995 if(NULL == this->channel) return; 00996 if(this->channel->Id() != message->Stream()) return; 00997 00998 // Check if the host is in the receivers list 00999 if(this->layers[message->Layer()]->Receivers()->find(src) != this->layers[message->Layer()]->Receivers()->end()) 01000 { 01001 // Remove the host from the receivers list 01002 this->layers[message->Layer()]->Receivers()->erase(src); 01003 01004 // Estimate the decrease in bandwidth usage 01005 this->bwUsed -= this->channel->Bw() / this->info->NumLayers(); 01006 } 01007 } 01008 01009 void CStreamClientPushSelect::RecvMessageClose(CAddress src, CStreamMessagePushSelectClose* message) 01010 { 01011 // Process received CLOSE 01012 01013 #ifdef LOG 01014 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01015 printf("\n\tT = %7.3lf PEER %3u CLOSE %3u <- %u", this->sim->Time(), this->address.Address(), this->channel->Id(), src.Address()); 01016 #endif 01017 01018 // Check the close is for current channel 01019 if(NULL == this->channel) return; 01020 if(this->channel->Id() != message->Stream()) return; 01021 01022 // Check the close has been sent by the current sender (otherwise, ignore) 01023 if(src != this->layers[message->Layer()]->Sender()) return; 01024 01025 // Check the state of the client (if the close message is delayed) 01026 if(this->stateClient < UCAST_STREAM_FIRST) return; 01027 01028 // If more neighbors are available 01029 if(this->layers[message->Layer()]->Neighbors()->size() > 0) 01030 { 01031 // Select a new random neighbor as a receiver for this layer 01032 __uint32 index = CRand::Generate(this->layers[message->Layer()]->Neighbors()->size()); 01033 assert(index < this->layers[message->Layer()]->Neighbors()->size()); 01034 01035 __uint32 idx = 0; 01036 for(set<CAddress>::iterator iter = this->layers[message->Layer()]->Neighbors()->begin(); iter != this->layers[message->Layer()]->Neighbors()->end(); iter++, idx++) 01037 { 01038 if(idx == index) 01039 { 01040 this->layers[message->Layer()]->Sender(*iter); // Set the sender 01041 break; 01042 } 01043 } 01044 01045 // Remove the sender from the neighbors list 01046 this->layers[message->Layer()]->Neighbors()->erase(this->layers[message->Layer()]->Sender()); 01047 01048 // If the number of remaining neighbors is less than the threshold, refresh the list of neighbors 01049 if(this->layers[message->Layer()]->Neighbors()->size() <= this->info->BootRefreshThreshold()) 01050 { 01051 #ifdef LOG 01052 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01053 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP RE-REQUEST %3u:%u (%u neighbors remaining / %u threshold)", this->sim->Time(), this->address.Address(), this->channel->Id(), message->Layer(), this->layers[message->Layer()]->Neighbors()->size(), this->info->BootRefreshThreshold()); 01054 #endif 01055 // Send a boot push query message to the bootstrap server (the channel address) 01056 CStreamMessageBootPushSelectRequest* request = new CStreamMessageBootPushSelectRequest( 01057 this->channel->Id(), 01058 message->Layer(), 01059 this->info->BootQueryMax() 01060 ); 01061 01062 (*this->delegateSendMessage)(this->channel->Address(), request); 01063 } 01064 } 01065 // Else, use the server 01066 else 01067 { 01068 this->layers[message->Layer()]->Sender(this->channel->Address()); 01069 } 01070 01071 #ifdef LOG 01072 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01073 printf("\n\tT = %7.3lf PEER %3u JOIN %3u:%u -> %u", this->sim->Time(), this->address.Address(), this->channel->Id(), message->Layer(), this->layers[message->Layer()]->Sender().Address()); 01074 #endif 01075 01076 // Send a JOIN to the new sender 01077 CStreamMessagePushSelectJoin* reply = new CStreamMessagePushSelectJoin(this->channel->Id(), message->Layer()); 01078 01079 (*this->delegateSendMessage)(this->layers[message->Layer()]->Sender(), reply); 01080 } 01081 01082 void CStreamClientPushSelect::RecvMessageBootResponse(CAddress src, CStreamMessageBootPushSelectResponse* message) 01083 { 01084 // Process received BOOT RESPONSE 01085 01086 // If no current channel, do nothing (delayed response) 01087 if(NULL == this->channel) return; 01088 01089 // If the message is not for this channel, do nothing (delayed response) 01090 if(message->Stream() != this->channel->Id()) return; 01091 01092 // If the response is for a global bootstrap request 01093 if(message->ResponseType() == CStreamMessageBootPushSelectResponse::STREAM) 01094 { 01095 // If the boot timer is set, cancel (layer bootstrap requests do not have boot timers) 01096 if(this->timerBoot->IsSet()) this->timerBoot->Cancel(); 01097 } 01098 01099 // Process the response, depending on the client state 01100 if(UCAST_BOOTSTRAP_RESPONSE == this->stateClient) 01101 { 01102 // CASE 1 : Bootstrap request sent at the beginning of the session; this populates the neighbor 01103 // list and changes the state of the client 01104 01105 // Check the response is for the stream 01106 assert(message->ResponseType() == CStreamMessageBootPushSelectResponse::STREAM); 01107 01108 #ifdef LOG 01109 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01110 { 01111 printf("\n\tT = %7.3lf PEER %3u BOOTSTRAP RESPONSE %3u (STREAM) : ", this->sim->Time(), this->address.Address(), this->channel->Id()); 01112 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 01113 { 01114 printf("\n\t\t\tLayer %u : ", layer); 01115 for(__uint32 host = 0; host < message->Count(layer); host++) printf("%u ", message->Host(layer, host).Address()); 01116 } 01117 } 01118 #endif 01119 01120 // Process the response for each layer 01121 for(__uint32 layer = 0; layer < this->info->NumLayers(); layer++) 01122 { 01123 if(message->Count(layer) > 0) 01124 { 01125 // If there are neighbors available (otherwise, the server is used by default) 01126 01127 // Choose a random neighbor to the the sender for this layer 01128 __uint32 sender = CRand::Generate(message->Count(layer)); 01129 assert(sender < message->Count(layer)); 01130 01131 // Save the neighbors 01132 for(__uint32 host = 0; host < sender; host++) 01133 this->layers[layer]->Neighbors()->insert(message->Host(layer, host)); 01134 for(__uint32 host = sender+1; host < message->Count(layer); host++) 01135 this->layers[layer]->Neighbors()->insert(message->Host(layer, host)); 01136 01137 // Save the sender 01138 this->layers[layer]->Sender(message->Host(layer, sender)); 01139 } 01140 } 01141 01142 // Change the state 01143 this->stateClient = UCAST_JOIN_REQUEST; 01144 01145 // Go to streaming STEP 2a 01146 this->StreamJoin(); 01147 } 01148 else 01149 { 01150 // CASE 2 : Refresh bootstrap request sent during a session; this refreshes the neighbor 01151 // list but does not change the state of the client or the sender 01152 01153 // Check the response is for one layer only 01154 assert(message->ResponseType() == CStreamMessageBootPushSelectResponse::LAYER); 01155 01156 // Process the response 01157 // Clear the neighbors list 01158 this->layers[message->Layer()]->Neighbors()->clear(); 01159 01160 // Save the neighbors 01161 for(__uint32 host = 0; host < message->Count(0); host++) 01162 this->layers[message->Layer()]->Neighbors()->insert(message->Host(0, host)); 01163 } 01164 } 01165 01166 void CStreamClientPushSelect::RejectJoin(CAddress src, CStreamMessagePushSelectJoin* message) 01167 { 01168 // Reject JOIN request : send a CLOSE message to the sender 01169 CStreamMessagePushSelectClose* reply = new CStreamMessagePushSelectClose(message->Stream(), message->Layer()); 01170 01171 (*this->delegateSendMessage)(src, reply); 01172 } 01173 01174 void CStreamClientPushSelect::TimerPlay(CTimerInfo* info) 01175 { 01176 assert(UCAST_STREAM_PLAY == this->stateClient); 01177 01178 // Calculate the layer of the current frame 01179 __uint32 layer = this->buffer->IndexToLayer(this->buffer->CurrentIndex()); 01180 01181 // If the current layer is READY, check if there is a buffer underrun 01182 if((this->layers[layer]->StateLayer() == LAYER_READY) && (this->buffer->NumPlay(layer) == 0)) 01183 { 01184 // Change the layer state to buffering 01185 assert(this->layers[layer]->StateLayer() == LAYER_READY); 01186 this->layers[layer]->Buffering(); 01187 01188 // Switch to process buffering frames (phase 2) 01189 this->layers[layer]->Process(&CStreamClientPushSelect::ProcessFrameLayerBuffering); 01190 01191 // If the host is registered for this layer, deregister 01192 if(REGISTERED == this->layers[layer]->StateRegister()) this->StreamDeregister(layer); 01193 01194 // Set a buffer underrun timer for the layer; if the buffer underrun is not fixed until the timer expires, select a new sender for this layer 01195 assert(!this->layers[layer]->TimerBuffer()->IsSet()); 01196 this->layers[layer]->TimerBuffer()->SetAfter(this->info->StreamBufferUnderrunTimeout(), this->layers[layer]); 01197 } 01198 01199 // Check if the number of layers in the READY state is lower than the minimum 01200 if(this->layersInfo.NumReady() < this->info->NumLayersMin()) 01201 { 01202 // Change the state 01203 this->stateClient = UCAST_STREAM_WAIT; 01204 01205 // Pause playback 01206 this->sessionTimeLastStop = this->sim->Time(); 01207 01208 // Switch to process wait frames (phase 3) 01209 this->processFrame = &CStreamClientPushSelect::ProcessFrameWait; 01210 01211 #ifdef LOG 01212 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01213 { 01214 CConsole::SetColor(CConsole::LIGHT_YELLOW); 01215 printf("\n\t\tT = %7.3lf PLAY WAIT", this->sim->Time()); 01216 CConsole::SetColor(CConsole::LIGHT_GRAY); 01217 } 01218 #endif 01219 return; 01220 } 01221 01222 // Play the first frame from the buffer 01223 this->statPlayFrames++; 01224 01225 // Playback sync statistics 01226 this->statSyncDelaySum += this->sim->Time() - this->info->StreamSource(this->channel->Id())->FrameTime(this->buffer->CurrentIndex()); 01227 this->statSyncDelayCount++; 01228 01229 if(this->buffer->IsCurrentDecodable()) 01230 { 01231 assert(this->buffer->HasCurrentIndex()); 01232 assert(this->buffer->CurrentIndex() == this->buffer->CurrentFrame()->Index()); 01233 assert(this->buffer->CurrentFrame()->Type() < 3); 01234 01235 this->statPlayLastFrame = this->buffer->CurrentIndex(); 01236 this->statSuccessFrames[this->buffer->CurrentFrame()->Type()]++; 01237 01238 #ifdef LOG 01239 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01240 { 01241 CConsole::SetColor(CConsole::LIGHT_GREEN); 01242 printf("\n\t\tT = %7.3lf PLAY OKAY %6u\t%c", this->sim->Time(), this->buffer->CurrentIndex(), stringFrameType[this->buffer->CurrentFrame()->Type()]); 01243 CConsole::SetColor(CConsole::LIGHT_GRAY); 01244 } 01245 #endif 01246 } 01247 else 01248 { 01249 assert(this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex()) < 3); 01250 this->statFailFrames[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())]++; 01251 01252 #ifdef LOG 01253 if((LOG_ADDRESS == this->address.Address()) && (LOG_CHANNEL == this->channel->Id())) 01254 { 01255 CConsole::SetColor(CConsole::LIGHT_RED); 01256 printf("\n\t\tT = %7.3lf PLAY FAIL %6u\t%c : %s", this->sim->Time(), this->buffer->CurrentIndex(), stringFrameType[this->info->StreamSource(this->channel->Id())->FrameType(this->buffer->CurrentIndex())], 01257 this->buffer->HasCurrentIndex()?"DECODE_FAIL":"MISSING"); 01258 CConsole::SetColor(CConsole::LIGHT_GRAY); 01259 } 01260 #endif 01261 } 01262 01263 // Shift the buffer 01264 this->buffer->Shift(); 01265 01266 // Schedule next play event 01267 this->timerPlay->SetAt(this->sessionTimeLastStart + ((__time)(this->buffer->CurrentIndex() - this->sessionFrameLastStart)) / (__time)this->channel->Fps()); 01268 } 01269 01270 void CStreamClientPushSelect::TimerBoot(CTimerInfo* info) 01271 { 01272 if(NULL == this->channel) return; 01273 01274 // Bootstrap timer : resend the boostrap query for all layers 01275 CStreamMessageBootPushSelectRequest* message = new CStreamMessageBootPushSelectRequest( 01276 this->channel->Id(), 01277 this->info->BootQueryMax() 01278 ); 01279 01280 (*this->delegateSendMessage)(this->channel->Address(), message); 01281 01282 // Activate the timer to resend the query after a timeout period 01283 this->timerBoot->SetAfter(this->info->BootQueryTimeout()); 01284 } 01285 01286 void CStreamClientPushSelect::TimerRegister(CTimerInfo* info) 01287 { 01288 // Get the layer from the timer info 01289 assert(info); 01290 CLayer* layer = type_cast<CLayer*>(info); 01291 01292 // Register 01293 this->StreamRegister(layer->Layer()); 01294 } 01295 01296 void CStreamClientPushSelect::TimerBuffer(CTimerInfo* info) 01297 { 01298 // Buffer underrun timer : if a buffer underrun instance is not self-repairing (i.e. caused by delayed packets), 01299 // select a new source 01300 01301 // First, check the buffer underrun instance is genuine : the client must be in a wait or play state 01302 if((UCAST_STREAM_WAIT != this->stateClient) && (UCAST_STREAM_PLAY != this->stateClient)) return; 01303 01304 // Check the timer is set for the current channel 01305 if(NULL == this->channel) return; 01306 01307 // Get the layer 01308 CLayer* layer = type_cast<CLayer*>(info); 01309 01310 // Check the layer is in buffering state 01311 assert(LAYER_BUFFERING == layer->StateLayer()); 01312 01313 // Then, send a leave message to the old sender 01314 this->StreamLeave(layer->Layer()); 01315 01316 // Select a new sender from the neighbors list : if more neighbors are available 01317 if(layer->Neighbors()->size() > 0) 01318 { 01319 // Select a new random neighbor as a receiver for this layer 01320 __uint32 index = CRand::Generate(layer->Neighbors()->size()); 01321 assert(index < layer->Neighbors()->size()); 01322 01323 __uint32 idx = 0; 01324 for(set<CAddress>::iterator iter = layer->Neighbors()->begin(); iter != layer->Neighbors()->end(); iter++, idx++) 01325 { 01326 if(idx == index) 01327 { 01328 layer->Sender(*iter); 01329 break; 01330 } 01331 } 01332 01333 // Remove the sender from the neighbors list 01334 layer->Neighbors()->erase(layer->Sender()); 01335 01336 // If the number of remaining neighbors is less than the threshold, refresh the list of neighbors 01337 if(layer->Neighbors()->size() <= this->info->BootRefreshThreshold()) 01338 { 01339 // Send a boot push query message to the bootstrap server (the channel address) 01340 CStreamMessageBootPushSelectRequest* message = new CStreamMessageBootPushSelectRequest( 01341 this->channel->Id(), 01342 layer->Layer(), 01343 this->info->BootQueryMax() 01344 ); 01345 01346 (*this->delegateSendMessage)(this->channel->Address(), message); 01347 } 01348 } 01349 // Else, use the server 01350 else layer->Sender(this->channel->Address()); 01351 01352 // Send a JOIN to the new sender 01353 this->StreamJoin(layer->Layer()); 01354 01355 // If the client is in the wait state and the number of ready layers is less than the minimum number, 01356 // switch the client to the reset state (after a long buffer underrun, the client must be resync with the the frames sent by the source) 01357 if((UCAST_STREAM_WAIT == this->stateClient) && (this->layersInfo.NumReady() < this->info->NumLayersMin())) 01358 { 01359 // Change the client state 01360 this->stateClient = UCAST_STREAM_RESET; 01361 01362 // Switch to reset 01363 this->processFrame = &CStreamClientPushSelect::ProcessFrameReset; 01364 } 01365 } 01366 01367 void CStreamClientPushSelect::Finalize() 01368 { 01369 }
Last updated: February 8, 2011