Alex Bikfalvi
SimStream Documentation
StreamServerPushUcast.cpp
00001 #include "Headers.h" 00002 #include "StreamServerPushUcast.h" 00003 00004 CStreamServerPushUcast::CStreamServerPushUcast( 00005 CSimHandler* sim, 00006 CStreamSource* source, 00007 IDelegate2<void, CAddress, CPacketStream*>* delegateSendStream, 00008 IDelegate2<void, CAddress, CStreamMessage*>* delegateSendMessage 00009 ) : CStreamServer(sim) 00010 { 00011 // Set source 00012 this->source = source; 00013 00014 // Set lower layer delegates 00015 this->delegateSendStream = delegateSendStream; 00016 this->delegateSendMessage = delegateSendMessage; 00017 00018 // Create the server timer and hook it to the timer method 00019 this->timer = new CTimer<CStreamServerPushUcast>( 00020 sim, 00021 this, 00022 &CStreamServerPushUcast::Timer); 00023 00024 // Set the schedule rate (the channel fps) 00025 this->scheduleRate = this->source->Channel()->Fps(); 00026 00027 // Create the bootstrap 00028 this->bootstrap = new CStreamBootPush(this->source->Channel()->Address()); 00029 00030 // Encoder 00031 this->encoder = new CStreamEncoderFrame( 00032 this->delegateSendStream 00033 ); 00034 } 00035 00036 CStreamServerPushUcast::~CStreamServerPushUcast() 00037 { 00038 // Timer 00039 delete this->timer; 00040 // Bootstrap 00041 delete this->bootstrap; 00042 // Encoder 00043 delete this->encoder; 00044 } 00045 00046 void CStreamServerPushUcast::Start() 00047 { 00048 // Check the server timer is stopped 00049 assert(!this->timer->IsSet()); 00050 00051 // Synchronize with the stream source 00052 this->source->Synchronize(this->sim->Time(), this->scheduleTime, this->scheduleFrame); 00053 00054 // Set the delta frame to zero 00055 this->deltaFrame = 0; 00056 00057 // If the schedule time is the current time, call the timer function 00058 if(this->scheduleTime == this->sim->Time()) 00059 this->Timer(NULL); 00060 else // Else, set the timer at the scheduled time 00061 this->timer->SetAt(this->scheduleTime); 00062 } 00063 00064 void CStreamServerPushUcast::Stop() 00065 { 00066 // If the timer is set, cancel 00067 if(this->timer->IsSet()) this->timer->Cancel(); 00068 } 00069 00070 void CStreamServerPushUcast::Timer(CTimerInfo* info) 00071 { 00072 // Process server timer 00073 00074 // Fetch the frame from the source 00075 CStreamFrame frame = this->source->Fetch(this->scheduleFrame + this->deltaFrame); 00076 00077 // Encode the frame to all destinations (the destination is the receiver unicast address) 00078 for(set<CAddress>::iterator iter = this->receivers.begin(); iter != this->receivers.end(); iter++) 00079 { 00080 this->encoder->Encode(*iter, frame); 00081 } 00082 00083 // Increment the delta frame 00084 this->deltaFrame++; 00085 00086 // Set a new timer 00087 this->timer->SetAt(this->scheduleTime + ((__time)this->deltaFrame)/this->scheduleRate); 00088 } 00089 00090 void CStreamServerPushUcast::Recv(CAddress src, CStreamMessage* message) 00091 { 00092 // Check the message 00093 assert(message); 00094 assert(message->Stream() == this->source->Channel()->Id()); 00095 00096 // Process the message depending on the message type 00097 switch(message->MessageType()) 00098 { 00099 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_REQUEST: this->RecvMessageQuery(src, type_cast<CStreamMessageBootPushRequest*>(message)); break; 00100 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_REGISTER: this->RecvMessageRegister(src, type_cast<CStreamMessageBootPushRegister*>(message)); break; 00101 case CStreamMessage::STREAM_MESSAGE_BOOT_PUSH_DEREGISTER: this->RecvMessageDeregister(src, type_cast<CStreamMessageBootPushDeregister*>(message)); break; 00102 case CStreamMessage::STREAM_MESSAGE_PUSH_JOIN: this->RecvMessageJoin(src, type_cast<CStreamMessagePushJoin*>(message)); break; 00103 case CStreamMessage::STREAM_MESSAGE_PUSH_LEAVE: this->RecvMessageLeave(src, type_cast<CStreamMessagePushLeave*>(message)); break; 00104 } 00105 } 00106 00107 void CStreamServerPushUcast::RecvMessageQuery(CAddress src, CStreamMessageBootPushRequest* message) 00108 { 00109 // Process query 00110 // Reply with a response 00111 CStreamMessageBootPushResponse* reply; 00112 00113 // Retrieve a random set of neighbors 00114 // Get all registered hosts 00115 CStreamBootPush::List* hosts = this->bootstrap->Fetch(); 00116 00117 // Copy all hosts locally to remove any ineligible hosts (i.e. the source hosts and descendants) 00118 CAddress* chosts = new CAddress[hosts->size()]; 00119 00120 __uint32 count = 0; 00121 for(CStreamBootPush::List::iterator iter = hosts->begin(); iter != hosts->end(); iter++) 00122 { 00123 if(!this->bootstrap->IsDependent(src, iter->first)) chosts[count++] = iter->first; 00124 } 00125 00126 assert(count <= hosts->size()); 00127 00128 if(count <= message->Count()) 00129 { 00130 // If the number of hosts is less than or equal to the requested number, put all in the reply 00131 reply = new CStreamMessageBootPushResponse(this->source->Channel()->Id(),count); 00132 00133 for(__uint32 index = 0; index < count; index++) 00134 reply->Host(index) = chosts[index]; 00135 } 00136 else 00137 { 00138 // If the number of hosts is greater than the requested number, choose a random subset 00139 reply = new CStreamMessageBootPushResponse(this->source->Channel()->Id(), message->Count()); 00140 00141 CShuffle shuffle(count); 00142 00143 for(__uint32 index = 0; index < message->Count(); index++) 00144 reply->Host(index) = chosts[shuffle[index]]; 00145 } 00146 00147 delete[] chosts; 00148 00149 // Send the message through the delegate 00150 (*this->delegateSendMessage)(src, reply); 00151 } 00152 00153 void CStreamServerPushUcast::RecvMessageRegister(CAddress src, CStreamMessageBootPushRegister* message) 00154 { 00155 // Register a host 00156 assert(message->Stream() == this->source->Channel()->Id()); 00157 00158 this->bootstrap->Register(message->Address(), message->Sender()); 00159 } 00160 00161 void CStreamServerPushUcast::RecvMessageDeregister(CAddress src, CStreamMessageBootPushDeregister* message) 00162 { 00163 // Deregister a host 00164 assert(message->Stream() == this->source->Channel()->Id()); 00165 00166 this->bootstrap->Deregister(src); 00167 } 00168 00169 void CStreamServerPushUcast::RecvMessageJoin(CAddress src, CStreamMessagePushJoin* message) 00170 { 00171 // Process join 00172 00173 // Add the source to the list of receivers for the specified layer 00174 this->receivers.insert(src); 00175 } 00176 00177 void CStreamServerPushUcast::RecvMessageLeave(CAddress src, CStreamMessagePushLeave* message) 00178 { 00179 // Process leave 00180 00181 // Remove the source from the list of receivers for the specified layer 00182 this->receivers.erase(src); 00183 } 00184 00185 void CStreamServerPushUcast::Finalize() 00186 { 00187 // Call base class finalizer 00188 CStreamServer::Finalize(); 00189 }
Last updated: February 8, 2011