Alex Bikfalvi
SimStream Documentation
StreamServerPull.cpp
00001 #include "Headers.h" 00002 #include "StreamServerPull.h" 00003 #include "Shuffle.h" 00004 00005 CStreamServerPull::CStreamServerPull( 00006 CSimHandler* sim, 00007 CInfoPull* info, 00008 IDelegate5<void, __uint16, __uint16, CAddress, __byte, CPacket*>* delegateSend, 00009 IDelegate0<CLink*>* delegateLink 00010 ) : CStreamServer(sim) 00011 { 00012 // Set the info 00013 this->info = info; 00014 00015 assert(delegateSend); 00016 assert(delegateLink); 00017 00018 // Set lower layer delegates 00019 this->delegateSend = delegateSend; 00020 this->delegateLink = delegateLink; 00021 00022 // Delegates 00023 this->delegateConnectionRecv = new Delegate2<CStreamServerPull, void, CConnectionReceiver*, CPacket*>(this, &CStreamServerPull::ConnectionRecv); 00024 this->delegateConnectionAccept = new Delegate2<CStreamServerPull, bool, CAddress, CPacket*>(this, &CStreamServerPull::ConnectionAccept); 00025 this->delegateConnectionAccepted = new Delegate1<CStreamServerPull, void, CConnectionSender*>(this, &CStreamServerPull::ConnectionAccepted); 00026 this->delegateConnectionSenderOpen = new Delegate2<CStreamServerPull, void, CConnection*, CConnection::EOpenResult>(this, &CStreamServerPull::ConnectionSenderOpen); 00027 this->delegateConnectionSenderClose = new Delegate2<CStreamServerPull, void, CConnection*, CConnection::ECloseResult>(this, &CStreamServerPull::ConnectionSenderClose); 00028 00029 // Create the bootstrap 00030 this->bootstraps = new CStreamBootPull[this->info->NumChannelsUnicast()]; 00031 00032 // Connection layer 00033 this->connectionLayer = new CConnectionLayer( 00034 this->sim, 00035 this->info->PortConnection(), 00036 this->info->ServerConnectionsMax(), 00037 this->delegateSend, 00038 this->delegateConnectionRecv, 00039 this->delegateConnectionAccept, 00040 this->delegateConnectionAccepted, 00041 this->info->ConnectionSegmentSize()); 00042 } 00043 00044 CStreamServerPull::~CStreamServerPull() 00045 { 00046 // Bootstrap 00047 delete[] this->bootstraps; 00048 00049 // Connections 00050 delete this->connectionLayer; 00051 00052 // Delete delegates 00053 delete this->delegateConnectionRecv; 00054 delete this->delegateConnectionAccept; 00055 delete this->delegateConnectionAccepted; 00056 00057 delete this->delegateConnectionSenderOpen; 00058 delete this->delegateConnectionSenderClose; 00059 00060 // Delete receivers 00061 while(!this->receivers.empty()) delete this->receivers.begin()->second; 00062 } 00063 00064 void CStreamServerPull::Start() 00065 { 00066 // Do nothing : all requests are asynchronous 00067 } 00068 00069 void CStreamServerPull::Stop() 00070 { 00071 // Do nothing : all requests are asynchronous 00072 } 00073 00074 void CStreamServerPull::Recv(CAddress srcAddress, __uint16 srcPort, __uint16 dstPort, CPacket* packet) 00075 { 00076 if(this->info->PortControl() == dstPort) this->RecvMessage(srcAddress, type_cast<CStreamMessage*>(packet)); 00077 if(this->info->PortConnection() == dstPort) this->RecvConnection(srcAddress, srcPort, dstPort, type_cast<CPacketConnection*>(packet)); 00078 } 00079 00080 void CStreamServerPull::RecvMessage(CAddress src, CStreamMessage* message) 00081 { 00082 // Check the message 00083 assert(message); 00084 00085 // Process the message depending on the message type 00086 switch(message->MessageType()) 00087 { 00088 case CStreamMessage::STREAM_MESSAGE_BOOT_PULL_REQUEST: this->RecvMessageQuery(src, type_cast<CStreamMessageBootPullRequest*>(message)); break; 00089 case CStreamMessage::STREAM_MESSAGE_BOOT_PULL_REGISTER: this->RecvMessageRegister(src, type_cast<CStreamMessageBootPullRegister*>(message)); break; 00090 case CStreamMessage::STREAM_MESSAGE_BOOT_PULL_DEREGISTER: this->RecvMessageDeregister(src, type_cast<CStreamMessageBootPullDeregister*>(message)); break; 00091 case CStreamMessage::STREAM_MESSAGE_PULL_BITMAP_REQUEST: this->RecvMessageBitmapRequest(src, type_cast<CStreamMessagePullBitmapRequest*>(message)); break; 00092 case CStreamMessage::STREAM_MESSAGE_PULL_SEGMENT_REQUEST: this->RecvMessageSegmentRequest(src, type_cast<CStreamMessagePullSegmentRequest*>(message)); break; 00093 default: assert(0); 00094 } 00095 } 00096 00097 void CStreamServerPull::RecvConnection(CAddress srcAddress, __uint16 srcPort, __uint16 dstPort, CPacketConnection* packet) 00098 { 00099 this->connectionLayer->Recv(srcAddress, srcPort, dstPort, packet); 00100 } 00101 00102 void CStreamServerPull::RecvMessageQuery(CAddress src, CStreamMessageBootPullRequest* message) 00103 { 00104 // Process query 00105 00106 // Get the index of the unicast stream 00107 __uint32 stream = this->info->IndexGlobalToUnicast(message->Stream()); 00108 00109 // Reply with a response 00110 CStreamMessageBootPullResponse* reply; 00111 00112 // Retrieve a random set of neighbors 00113 // Get all registered hosts 00114 CStreamBootPull::List* hosts = this->bootstraps[stream].Fetch(); 00115 00116 // Copy all hosts locally to remove any ineligible hosts (i.e. the source hosts and descendants) 00117 CAddress* chosts = new CAddress[hosts->size() + 1]; 00118 00119 __uint32 count = 0; 00120 00121 // Add the server 00122 // chosts[count++] = this->info->Channel(message->Stream())->Address(); 00123 00124 for(CStreamBootPull::List::iterator iter = hosts->begin(); iter != hosts->end(); iter++) 00125 { 00126 if((*iter) != src) chosts[count++] = *iter; 00127 } 00128 00129 assert(count <= hosts->size()); 00130 00131 // If the number of found hosts is less than the number of neighbors, also add the server 00132 if(0 == count) 00133 { 00134 chosts[count++] = this->info->Channel(message->Stream())->Address(); 00135 } 00136 00137 if(count <= message->Count()) 00138 { 00139 // If the number of hosts is less than or equal to the requested number, put all in the reply 00140 reply = new CStreamMessageBootPullResponse(message->Stream(), count); 00141 00142 for(__uint32 index = 0; index < count; index++) 00143 reply->Host(index, chosts[index]); 00144 } 00145 else 00146 { 00147 // If the number of hosts is greater than the requested number, choose a random subset 00148 reply = new CStreamMessageBootPullResponse(message->Stream(), message->Count()); 00149 00150 CShuffle shuffle(count); 00151 00152 for(__uint32 index = 0; index < message->Count(); index++) 00153 reply->Host(index, chosts[shuffle[index]]); 00154 } 00155 00156 delete[] chosts; 00157 00158 // Send the message through the delegate 00159 this->SendMessage(src, reply); 00160 } 00161 00162 void CStreamServerPull::RecvMessageRegister(CAddress src, CStreamMessageBootPullRegister* message) 00163 { 00164 // Register a host 00165 00166 // Get the index of the unicast stream 00167 __uint32 stream = this->info->IndexGlobalToUnicast(message->Stream()); 00168 00169 this->bootstraps[stream].Register(message->Address()); 00170 } 00171 00172 void CStreamServerPull::RecvMessageDeregister(CAddress src, CStreamMessageBootPullDeregister* message) 00173 { 00174 // Deregister a host 00175 00176 // Get the index of the unicast stream 00177 __uint32 stream = this->info->IndexGlobalToUnicast(message->Stream()); 00178 00179 this->bootstraps[stream].Deregister(message->Address()); 00180 } 00181 00182 void CStreamServerPull::RecvMessageBitmapRequest(CAddress src, CStreamMessagePullBitmapRequest* message) 00183 { 00184 // Received bitmap request 00185 00186 // Generate the bitmap : the server always returns complete bitmap with LOW preference and channel bitrate bandwidth 00187 __uint64 bitmap = 0xFFFFFFFFFFFFFFFFLL; 00188 00189 // Send the reply 00190 CStreamMessagePullBitmapResponse* reply = new CStreamMessagePullBitmapResponse( 00191 message->Stream(), 00192 message->First(), 00193 message->Count(), 00194 bitmap, 00195 (*this->delegateLink)()->Bandwidth(0), 00196 (*this->delegateLink)()->MeterUtil(0), 00197 message->Connection(), 00198 message->ConnectionId(), 00199 message->ConnectionIdEntry(), 00200 CStreamMessagePullBitmapResponse::SUCCESS, 00201 CStreamMessagePullBitmapResponse::PREF_LOW 00202 ); 00203 this->SendMessage(src, reply); 00204 } 00205 00206 void CStreamServerPull::RecvMessageSegmentRequest(CAddress src, CStreamMessagePullSegmentRequest* message) 00207 { 00208 // Receiver segment request 00209 00210 // The server always return a successfull reply and sends all requested segments through the receiver 00211 00212 // Get the local connection 00213 CConnectionSender* sender = type_cast<CConnectionSender*>(this->connectionLayer->Get(message->DstId(), message->DstIdEntry())); 00214 assert(sender); 00215 assert(sender->State() >= CConnection::OPENED); 00216 00217 __uint64 success = 0; 00218 __uint64 fail = 0xFFFFFFFFFFFFFFFFLL; 00219 00220 // If the connection is in OPEN state 00221 if(sender->State() == CConnection::OPENED) 00222 { 00223 00224 // Get the receiver 00225 assert(sender->Tag()); 00226 CStreamPullReceiver* receiver = type_cast<CStreamPullReceiver*>(sender->Tag()); 00227 00228 // Verify the receiver 00229 assert(receiver); 00230 assert(receiver->Connection() == sender); 00231 00232 // Get the source 00233 CStreamSource* source = this->info->StreamSource(message->Stream()); 00234 00235 // Send the requested segments through the receiver 00236 for(__uint32 sIndex = 0; sIndex < message->Count(); sIndex++) 00237 { 00238 // If the segment is requested 00239 if((message->Bitmap() >> sIndex) & 1) 00240 { 00241 __uint32 segmentIndex = message->First() + sIndex; 00242 __uint32 frameIndexStart = segmentIndex * this->info->StreamSegmentSize(); 00243 00244 // Send all frames from that segment 00245 for(__uint32 fIndex = 0; fIndex < this->info->StreamSegmentSize(); fIndex++) 00246 { 00247 receiver->Send(source->Fetch(frameIndexStart + fIndex)); 00248 } 00249 } 00250 } 00251 00252 // Set the sucess to all one, fail to zero 00253 success = 0xFFFFFFFFFFFFFFFFLL; 00254 fail = 0LL; 00255 } 00256 00257 // Reply to the request 00258 CStreamMessagePullSegmentResponse* reply = new CStreamMessagePullSegmentResponse( 00259 message->Stream(), 00260 message->DstId(), 00261 message->DstIdEntry(), 00262 message->SrcId(), 00263 message->SrcIdEntry(), 00264 message->First(), 00265 message->Count(), 00266 success, 00267 fail 00268 ); 00269 this->SendMessage(src, reply); 00270 } 00271 00272 void CStreamServerPull::SendMessage(CAddress dst, CStreamMessage* message) 00273 { 00274 (*this->delegateSend)(this->info->PortControl(), this->info->PortControl(), dst, 128, message); 00275 } 00276 00277 void CStreamServerPull::ConnectionRecv(CConnectionReceiver* receiver, CPacket* packet) 00278 { 00279 // Do nothing 00280 } 00281 00282 bool CStreamServerPull::ConnectionAccept(CAddress src, CPacket* packet) 00283 { 00284 // Accept all connections 00285 return true; 00286 } 00287 00288 void CStreamServerPull::ConnectionAccepted(CConnectionSender* sender) 00289 { 00290 // Sender connection accepted : hook connection events 00291 (*sender->EventOpen()) += this->delegateConnectionSenderOpen; 00292 (*sender->EventClose()) += this->delegateConnectionSenderClose; 00293 } 00294 00295 void CStreamServerPull::ConnectionSenderOpen(CConnection* sender, CConnection::EOpenResult result) 00296 { 00297 // New connection opened 00298 00299 // Check this is a sender connection 00300 assert(sender->Type() == CConnection::RESPONDER); 00301 00302 // If the connection has been opened successfully 00303 if(CConnection::OPEN_SUCCESS == result) 00304 { 00305 // Create a new receiver 00306 CStreamPullReceiver* receiver = new CStreamPullReceiver( 00307 this->sim, 00308 type_cast<CConnectionSender*>(sender), 00309 this->info->StreamFrameInterval(), 00310 sender->Id(), 00311 &this->receivers 00312 ); 00313 } 00314 // Else, do nothing 00315 } 00316 00317 void CStreamServerPull::ConnectionSenderClose(CConnection* sender, CConnection::ECloseResult result) 00318 { 00319 // Existing connection closed 00320 assert(sender->Type() == CConnection::RESPONDER); 00321 00322 // If closing is complete 00323 if(CConnection::CLOSE_COMPLETE == result) 00324 { 00325 // If the connection has a tag, delete the tag 00326 if(sender->Tag()) delete sender->Tag(); 00327 } 00328 // Else, do nothing 00329 } 00330 00331 void CStreamServerPull::Finalize() 00332 { 00333 // Call base class finalizer 00334 CStreamServer::Finalize(); 00335 }
Last updated: February 8, 2011