Alex Bikfalvi
SimStream Documentation
StreamServerPushSelect.cpp
00001 #include "Headers.h" 00002 #include "StreamServerPushSelect.h" 00003 00004 CStreamServerPushSelect::CStreamServerPushSelect( 00005 CSimHandler* sim, 00006 CInfoPushSelect* info, 00007 CStreamSource* source, 00008 IDelegate2<void, CAddress, CPacketStream*>* delegateSendStream, 00009 IDelegate2<void, CAddress, CStreamMessage*>* delegateSendMessage 00010 ) : CStreamServer(sim) 00011 { 00012 // Set source 00013 this->source = source; 00014 00015 // Set info 00016 this->info = info; 00017 00018 this->layers = this->info->NumLayers(); 00019 this->gopLength = this->info->MpegGopSize(); 00020 00021 // Set lower layer delegates 00022 this->delegateSendStream = delegateSendStream; 00023 this->delegateSendMessage = delegateSendMessage; 00024 00025 // Create the server timer and hook it to the timer method 00026 this->timer = new CTimer<CStreamServerPushSelect>( 00027 sim, 00028 this, 00029 &CStreamServerPushSelect::Timer); 00030 00031 // Set the schedule rate (the channel fps) 00032 this->scheduleRate = this->source->Channel()->Fps(); 00033 00034 // Set the number of layers 00035 this->layers = layers; 00036 00037 // Set the GOP length 00038 this->gopLength = gopLength; 00039 00040 // Create the list of receivers 00041 this->receivers = new set<CAddress>[this->layers]; 00042 00043 // Encoder 00044 this->encoder = new CStreamEncoderFrame( 00045 this->delegateSendStream 00046 ); 00047 } 00048 00049 CStreamServerPushSelect::~CStreamServerPushSelect() 00050 { 00051 // Timer 00052 delete this->timer; 00053 // Encoder 00054 delete this->encoder; 00055 // Receivers 00056 delete[] this->receivers; 00057 } 00058 00059 void CStreamServerPushSelect::Start() 00060 { 00061 // Check the server timer is stopped 00062 assert(!this->timer->IsSet()); 00063 00064 // Synchronize with the stream source 00065 this->source->Synchronize(this->sim->Time(), this->scheduleTime, this->scheduleFrame); 00066 00067 // Set the delta frame to zero 00068 this->deltaFrame = 0; 00069 00070 // If the schedule time is the current time, call the timer function 00071 if(this->scheduleTime == this->sim->Time()) 00072 this->Timer(NULL); 00073 else // Else, set the timer at the scheduled time 00074 this->timer->SetAt(this->scheduleTime); 00075 } 00076 00077 void CStreamServerPushSelect::Stop() 00078 { 00079 // If the timer is set, cancel 00080 if(this->timer->IsSet()) this->timer->Cancel(); 00081 } 00082 00083 void CStreamServerPushSelect::Timer(CTimerInfo* info) 00084 { 00085 // Process server timer 00086 00087 // Fetch the frame from the source 00088 CStreamFrame frame = this->source->Fetch(this->scheduleFrame + this->deltaFrame); 00089 00090 // Calculate the current layer 00091 __uint32 layer = (frame.Index() / this->gopLength) % this->layers; 00092 00093 // Encode the frame to all destinations (the destination is the receiver unicast address) 00094 for(set<CAddress>::iterator iter = this->receivers[layer].begin(); iter != this->receivers[layer].end(); iter++) 00095 { 00096 this->encoder->Encode(*iter, frame); 00097 } 00098 00099 // Increment the delta frame 00100 this->deltaFrame++; 00101 00102 // Set a new timer 00103 this->timer->SetAt(this->scheduleTime + ((__time)this->deltaFrame)/this->scheduleRate); 00104 } 00105 00106 void CStreamServerPushSelect::Recv(CAddress src, CStreamMessage* message) 00107 { 00108 // Check the message 00109 assert(message); 00110 assert(message->Stream() == this->source->Channel()->Id()); 00111 00112 // Process the message depending on the message type 00113 switch(message->MessageType()) 00114 { 00115 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_MULTI_REQUEST: this->RecvMessageQuery(src, type_cast<CStreamMessageBootPushSelectRequest*>(message)); break; 00116 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_MULTI_REGISTER: this->RecvMessageRegister(src, type_cast<CStreamMessageBootPushSelectRegister*>(message)); break; 00117 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_MULTI_DEREGISTER: this->RecvMessageDeregister(src, type_cast<CStreamMessageBootPushSelectDeregister*>(message)); break; 00118 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_JOIN: this->RecvMessageJoin(src, type_cast<CStreamMessagePushSelectJoin*>(message)); break; 00119 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_LEAVE: this->RecvMessageLeave(src, type_cast<CStreamMessagePushSelectLeave*>(message)); break; 00120 } 00121 } 00122 00123 void CStreamServerPushSelect::RecvMessageQuery(CAddress src, CStreamMessageBootPushSelectRequest* message) 00124 { 00125 // Verify the type of query 00126 switch(message->RequestType()) 00127 { 00128 case CStreamMessageBootPushSelectRequest::STREAM: this->RecvMessageQueryStream(src, message); break; // Request for stream (all layers) 00129 case CStreamMessageBootPushSelectRequest::LAYER: this->RecvMessageQueryLayer(src, message); break; // Request for layer 00130 } 00131 } 00132 00133 void CStreamServerPushSelect::RecvMessageQueryStream(CAddress src, CStreamMessageBootPushSelectRequest* message) 00134 { 00135 // Process query 00136 00137 // Get the peer index of the child 00138 __uint32 child = this->info->HostAddressToIndex(src); 00139 00140 // Determine the parent for each layer 00141 CPeer** peers = this->info->ModelSelect()->Select(this->sim->Time(), this->source->Channel()->Id(), child); 00142 00143 // Set the list of hosts for each layer 00144 CAddress hosts[MAX_LAYERS]; 00145 00146 for(__uint32 layer = 0; layer < this->layers; layer++) 00147 { 00148 hosts[layer] = peers[layer] ? peers[layer]->Address() : this->source->Channel()->Address(); 00149 } 00150 00151 // Set the peer information 00152 this->info->ModelSelect()-> 00153 00154 // Reply with a response 00155 CStreamMessageBootPushSelectResponse* reply = new CStreamMessageBootPushSelectResponse( 00156 this->source->Channel()->Id(), 00157 this->layers, 00158 hosts 00159 ); 00160 00161 // Send the message through the delegate 00162 (*this->delegateSendMessage)(src, reply); 00163 } 00164 00165 void CStreamServerPushSelect::RecvMessageQueryLayer(CAddress src, CStreamMessageBootPushSelectRequest* message) 00166 { 00167 // Process query 00168 00169 // Get the peer index of the child 00170 __uint32 child = this->info->HostAddressToIndex(src); 00171 00172 // Determine the parent for each layer 00173 CPeer* peer = this->info->ModelSelect()->Select(this->sim->Time(), this->source->Channel()->Id(), message->Layer(), child); 00174 00175 // Reply with a response 00176 CStreamMessageBootPushSelectResponse* reply = new CStreamMessageBootPushSelectResponse( 00177 this->source->Channel()->Id(), 00178 message->Layer(), 00179 peer ? peer->Address() : this->source->Channel()->Address() 00180 ); 00181 00182 // Send the message through the delegate 00183 (*this->delegateSendMessage)(src, reply); 00184 } 00185 00186 void CStreamServerPushSelect::RecvMessageRegister(CAddress src, CStreamMessageBootPushSelectRegister* message) 00187 { 00188 // Register a host 00189 assert(message->Stream() == this->source->Channel()->Id()); 00190 00191 // Add the host to 00192 00193 // this->bootstrap->Register(message->Layer(), message->Address(), message->Sender()); 00194 } 00195 00196 void CStreamServerPushSelect::RecvMessageDeregister(CAddress src, CStreamMessageBootPushSelectDeregister* message) 00197 { 00198 // Deregister a host 00199 assert(message->Stream() == this->source->Channel()->Id()); 00200 00201 // this->bootstrap->Deregister(message->Layer(), src); 00202 } 00203 00204 void CStreamServerPushSelect::RecvMessageJoin(CAddress src, CStreamMessagePushSelectJoin* message) 00205 { 00206 // Process join 00207 00208 // Add the source to the list of receivers for the specified layer 00209 this->receivers[message->Layer()].insert(src); 00210 } 00211 00212 void CStreamServerPushSelect::RecvMessageLeave(CAddress src, CStreamMessagePushSelectLeave* message) 00213 { 00214 // Process leave 00215 00216 // Remove the source from the list of receivers for the specified layer 00217 this->receivers[message->Layer()].erase(src); 00218 } 00219 00220 void CStreamServerPushSelect::Finalize() 00221 { 00222 // Call base class finalizer 00223 CStreamServer::Finalize(); 00224 }
Last updated: February 8, 2011