Alex Bikfalvi
SimStream Documentation
StreamServerPushUcastMulti.cpp
00001 #include "Headers.h" 00002 #include "StreamServerPushUcastMulti.h" 00003 00004 CStreamServerPushUcastMulti::CStreamServerPushUcastMulti( 00005 CSimHandler* sim, 00006 CStreamSource* source, 00007 IDelegate2<void, CAddress, CPacketStream*>* delegateSendStream, 00008 IDelegate2<void, CAddress, CStreamMessage*>* delegateSendMessage, 00009 __uint32 layers, 00010 __byte gopLength 00011 ) : CStreamServer(sim) 00012 { 00013 // Set source 00014 this->source = source; 00015 00016 // Set lower layer delegates 00017 this->delegateSendStream = delegateSendStream; 00018 this->delegateSendMessage = delegateSendMessage; 00019 00020 // Create the server timer and hook it to the timer method 00021 this->timer = new CTimer<CStreamServerPushUcastMulti>( 00022 sim, 00023 this, 00024 &CStreamServerPushUcastMulti::Timer); 00025 00026 // Set the schedule rate (the channel fps) 00027 this->scheduleRate = this->source->Channel()->Fps(); 00028 00029 // Set the number of layers 00030 this->layers = layers; 00031 00032 // Set the GOP length 00033 this->gopLength = gopLength; 00034 00035 // Create the bootstrap 00036 this->bootstrap = new CStreamBootPushMulti(this->layers, this->source->Channel()->Address()); 00037 00038 // Create the list of receivers 00039 this->receivers = new set<CAddress>[this->layers]; 00040 00041 // Encoder 00042 this->encoder = new CStreamEncoderFrame( 00043 this->delegateSendStream 00044 ); 00045 } 00046 00047 CStreamServerPushUcastMulti::~CStreamServerPushUcastMulti() 00048 { 00049 // Timer 00050 delete this->timer; 00051 // Bootstrap 00052 delete this->bootstrap; 00053 // Encoder 00054 delete this->encoder; 00055 // Receivers 00056 delete[] this->receivers; 00057 } 00058 00059 void CStreamServerPushUcastMulti::Start() 00060 { 00061 // Check the server timer is stopped 00062 assert(!this->timer->IsSet()); 00063 00064 // Synchronize with the stream source 00065 this->source->Synchronize(this->sim->Time(), this->scheduleTime, this->scheduleFrame); 00066 00067 // Set the delta frame to zero 00068 this->deltaFrame = 0; 00069 00070 // If the schedule time is the current time, call the timer function 00071 if(this->scheduleTime == this->sim->Time()) 00072 this->Timer(NULL); 00073 else // Else, set the timer at the scheduled time 00074 this->timer->SetAt(this->scheduleTime); 00075 } 00076 00077 void CStreamServerPushUcastMulti::Stop() 00078 { 00079 // If the timer is set, cancel 00080 if(this->timer->IsSet()) this->timer->Cancel(); 00081 } 00082 00083 void CStreamServerPushUcastMulti::Timer(CTimerInfo* info) 00084 { 00085 // Process server timer 00086 00087 // Fetch the frame from the source 00088 CStreamFrame frame = this->source->Fetch(this->scheduleFrame + this->deltaFrame); 00089 00090 // Calculate the current layer 00091 __uint32 layer = (frame.Index() / this->gopLength) % this->layers; 00092 00093 // Encode the frame to all destinations (the destination is the receiver unicast address) 00094 for(set<CAddress>::iterator iter = this->receivers[layer].begin(); iter != this->receivers[layer].end(); iter++) 00095 { 00096 this->encoder->Encode(*iter, frame); 00097 } 00098 00099 // Increment the delta frame 00100 this->deltaFrame++; 00101 00102 // Set a new timer 00103 this->timer->SetAt(this->scheduleTime + ((__time)this->deltaFrame)/this->scheduleRate); 00104 } 00105 00106 void CStreamServerPushUcastMulti::Recv(CAddress src, CStreamMessage* message) 00107 { 00108 // Check the message 00109 assert(message); 00110 assert(message->Stream() == this->source->Channel()->Id()); 00111 00112 // Process the message depending on the message type 00113 switch(message->MessageType()) 00114 { 00115 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_MULTI_REQUEST: this->RecvMessageQuery(src, type_cast<CStreamMessageBootPushMultiRequest*>(message)); break; 00116 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_MULTI_REGISTER: this->RecvMessageRegister(src, type_cast<CStreamMessageBootPushMultiRegister*>(message)); break; 00117 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_MULTI_DEREGISTER: this->RecvMessageDeregister(src, type_cast<CStreamMessageBootPushMultiDeregister*>(message)); break; 00118 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_JOIN: this->RecvMessageJoin(src, type_cast<CStreamMessagePushMultiJoin*>(message)); break; 00119 case CStreamMessage::STREAM_MESSAGE_PUSH_MULTI_LEAVE: this->RecvMessageLeave(src, type_cast<CStreamMessagePushMultiLeave*>(message)); break; 00120 } 00121 } 00122 00123 void CStreamServerPushUcastMulti::RecvMessageQuery(CAddress src, CStreamMessageBootPushMultiRequest* message) 00124 { 00125 // Verify the type of query 00126 switch(message->RequestType()) 00127 { 00128 case CStreamMessageBootPushMultiRequest::STREAM: this->RecvMessageQueryStream(src, message); break; // Request for stream (all layers) 00129 case CStreamMessageBootPushMultiRequest::LAYER: this->RecvMessageQueryLayer(src, message); break; // Request for layer 00130 } 00131 } 00132 00133 void CStreamServerPushUcastMulti::RecvMessageQueryStream(CAddress src, CStreamMessageBootPushMultiRequest* message) 00134 { 00135 // Process query 00136 00137 // Reply with a response 00138 CStreamMessageBootPushMultiResponse* reply = new CStreamMessageBootPushMultiResponse( 00139 this->source->Channel()->Id(), 00140 this->layers 00141 ); 00142 00143 for(__uint32 layer = 0; layer < this->layers; layer++) 00144 { 00145 // Retrieve a random set of neighbors for each layer 00146 // Get all registered hosts 00147 CStreamBootPushMulti::List* hosts = this->bootstrap->Fetch(layer); 00148 00149 // Copy all hosts locally to remove the source 00150 CAddress* chosts = new CAddress[hosts->size()]; 00151 00152 __uint32 count = 0; 00153 for(CStreamBootPushMulti::List::iterator iter = hosts->begin(); iter != hosts->end(); iter++) 00154 { 00155 if(!this->bootstrap->IsDependent(layer, src, iter->first)) chosts[count++] = iter->first; 00156 } 00157 00158 assert(count <= hosts->size()); 00159 00160 if(count <= message->Count()) 00161 { 00162 // If the number of hosts is less than or equal to the requested number, put all in the reply 00163 reply->Set(layer, count); 00164 00165 for(__uint32 index = 0; index < count; index++) 00166 reply->Host(layer, index) = chosts[index]; 00167 } 00168 else 00169 { 00170 // If the number of hosts is greater than the requested number, choose a random subset 00171 reply->Set(layer, message->Count()); 00172 00173 CShuffle shuffle(count); 00174 00175 for(__uint32 index = 0; index < message->Count(); index++) 00176 reply->Host(layer, index) = chosts[shuffle[index]]; 00177 } 00178 00179 delete[] chosts; 00180 } 00181 00182 // Send the message through the delegate 00183 (*this->delegateSendMessage)(src, reply); 00184 } 00185 00186 void CStreamServerPushUcastMulti::RecvMessageQueryLayer(CAddress src, CStreamMessageBootPushMultiRequest* message) 00187 { 00188 // Process query 00189 00190 // Reply with a response 00191 CStreamMessageBootPushMultiResponse* reply; 00192 00193 // Retrieve a random set of neighbors 00194 // Get all registered hosts 00195 CStreamBootPushMulti::List* hosts = this->bootstrap->Fetch(message->Layer()); 00196 00197 // Copy all hosts locally to remove any ineligible hosts (i.e. the source hosts and descendants) 00198 CAddress* chosts = new CAddress[hosts->size()]; 00199 00200 __uint32 count = 0; 00201 for(CStreamBootPushMulti::List::iterator iter = hosts->begin(); iter != hosts->end(); iter++) 00202 { 00203 if(!this->bootstrap->IsDependent(message->Layer(), src, iter->first)) chosts[count++] = iter->first; 00204 } 00205 00206 assert(count <= hosts->size()); 00207 00208 if(count <= message->Count()) 00209 { 00210 // If the number of hosts is less than or equal to the requested number, put all in the reply 00211 reply = new CStreamMessageBootPushMultiResponse(this->source->Channel()->Id(), message->Layer(), count); 00212 00213 for(__uint32 index = 0; index < count; index++) 00214 reply->Host(0, index) = chosts[index]; 00215 } 00216 else 00217 { 00218 // If the number of hosts is greater than the requested number, choose a random subset 00219 reply = new CStreamMessageBootPushMultiResponse(this->source->Channel()->Id(), message->Layer(), message->Count()); 00220 00221 CShuffle shuffle(count); 00222 00223 for(__uint32 index = 0; index < message->Count(); index++) 00224 reply->Host(0, index) = chosts[shuffle[index]]; 00225 } 00226 00227 delete[] chosts; 00228 00229 // Send the message through the delegate 00230 (*this->delegateSendMessage)(src, reply); 00231 } 00232 00233 void CStreamServerPushUcastMulti::RecvMessageRegister(CAddress src, CStreamMessageBootPushMultiRegister* message) 00234 { 00235 // Register a host 00236 assert(message->Stream() == this->source->Channel()->Id()); 00237 00238 this->bootstrap->Register(message->Layer(), message->Address(), message->Sender()); 00239 } 00240 00241 void CStreamServerPushUcastMulti::RecvMessageDeregister(CAddress src, CStreamMessageBootPushMultiDeregister* message) 00242 { 00243 // Deregister a host 00244 assert(message->Stream() == this->source->Channel()->Id()); 00245 00246 this->bootstrap->Deregister(message->Layer(), src); 00247 } 00248 00249 void CStreamServerPushUcastMulti::RecvMessageJoin(CAddress src, CStreamMessagePushMultiJoin* message) 00250 { 00251 // Process join 00252 00253 // Add the source to the list of receivers for the specified layer 00254 this->receivers[message->Layer()].insert(src); 00255 } 00256 00257 void CStreamServerPushUcastMulti::RecvMessageLeave(CAddress src, CStreamMessagePushMultiLeave* message) 00258 { 00259 // Process leave 00260 00261 // Remove the source from the list of receivers for the specified layer 00262 this->receivers[message->Layer()].erase(src); 00263 } 00264 00265 void CStreamServerPushUcastMulti::Finalize() 00266 { 00267 // Call base class finalizer 00268 CStreamServer::Finalize(); 00269 }
Last updated: February 8, 2011