Alex Bikfalvi
SimStream Documentation
SimWorker.cpp
00001 #include "Headers.h" 00002 #include "SimWorker.h" 00003 #include "ExceptionWorker.h" 00004 #include "ExceptionThread.h" 00005 #include "ExceptionSignal.h" 00006 #include "ExceptionMutex.h" 00007 00008 CSimWorker::CSimWorker( 00009 unsigned int id, 00010 unsigned int queueSize 00011 ) 00012 { 00013 this->id = id; 00014 this->queueSize = queueSize; 00015 this->queueCount = 0; 00016 this->queuePtr = 0; 00017 00018 // Create queue 00019 this->queue = new CSimWorkItem*[this->queueSize]; 00020 00021 // Initial state 00022 this->state = WORKER_STATE_STOPPED; 00023 00024 #ifdef WIN32 00025 // Initialize the thread mutex 00026 InitializeCriticalSection(&this->threadMutex); 00027 00028 // Initialize the state signal 00029 if(NULL == (this->stateHandle = CreateEvent( 00030 NULL, // default security attributes 00031 false, // manual reset 00032 false, // initial state (true is signaled) 00033 NULL // event name 00034 ))) throw CExceptionSignal("create state signal failed", __FILE__, __LINE__); 00035 00036 // Initialize the work signal 00037 if(NULL == (this->workHandle = CreateEvent( 00038 NULL, // default security attributes 00039 false, // manual reset 00040 false, // initial state (true is signaled) 00041 NULL // event name 00042 ))) throw CExceptionSignal("create work signal failed", __FILE__, __LINE__); 00043 00044 #elif POSIX 00045 // Initialize the thread mutex 00046 if(pthread_mutex_init(&this->threadMutex, NULL)) throw CExceptionMutex("create thread mutex failed", __FILE__, __LINE__); 00047 00048 // Initialize the state mutex 00049 if(pthread_mutex_init(&this->stateMutex, NULL)) throw CExceptionMutex("create state mutex failed", __FILE__, __LINE__); 00050 // Initialize the state codition 00051 if(pthread_cond_init(&this->stateCond, NULL)) throw CExceptionSignal("create state signal failed", __FILE__, __LINE__); 00052 00053 // Initialize the work mutex 00054 if(pthread_mutex_init(&this->workMutex, NULL)) throw CExceptionMutex("create work mutex failed", __FILE__, __LINE__); 00055 // Initialize the work codition 00056 if(pthread_cond_init(&this->workCond, NULL)) throw CExceptionSignal("create work signal failed", __FILE__, __LINE__); 00057 #endif 00058 } 00059 00060 CSimWorker::~CSimWorker() 00061 { 00062 #ifdef WIN32 00063 // Close the thread mutex 00064 DeleteCriticalSection(&this->threadMutex); 00065 00066 // Close the state signal 00067 if(!CloseHandle(this->stateHandle)) throw CExceptionSignal("close state signal failed", __FILE__, __LINE__); 00068 00069 // Close the work signal 00070 if(!CloseHandle(this->workHandle)) throw CExceptionSignal("close work signal failed", __FILE__, __LINE__); 00071 00072 #elif POSIX 00073 // Close the thread mutex 00074 if(pthread_mutex_destroy(&this->threadMutex)) throw CExceptionMutex("close thread mutex failed", __FILE__, __LINE__); 00075 00076 // Close the state condition 00077 if(pthread_cond_destroy(&this->stateCond)) throw CExceptionSignal("close state signal failed", __FILE__, __LINE__); 00078 // Close the state mutex 00079 if(pthread_mutex_destroy(&this->stateMutex)) throw CExceptionMutex("close state mutex failed", __FILE__, __LINE__); 00080 00081 // Close the work condition 00082 if(pthread_cond_destroy(&this->workCond)) throw CExceptionSignal("close work signal failed", __FILE__, __LINE__); 00083 // Close the work mutex 00084 if(pthread_mutex_destroy(&this->workMutex)) throw CExceptionMutex("close work mutex failed", __FILE__, __LINE__); 00085 #endif 00086 00087 // Delete queue 00088 delete[] this->queue; 00089 } 00090 00091 void CSimWorker::Start() 00092 { 00093 #ifdef WIN32 00094 // Lock the thread mutex 00095 EnterCriticalSection(&this->threadMutex); 00096 try 00097 { 00098 // Check the worker state 00099 if(WORKER_STATE_STOPPED != this->state) throw CExceptionWorker("worker is not stopped", __FILE__, __LINE__, this->id); 00100 00101 // Change the worker state 00102 this->state = WORKER_STATE_STARTING; 00103 00104 // Start thread 00105 if(!(this->threadHandle = CreateThread( 00106 NULL, // default security attributes 00107 0, // default stack size 00108 &CSimWorker::Execute, // thread function address 00109 this, // parameter 00110 0, // creation flags 00111 &this->threadId // thread ID 00112 ))) throw CExceptionThread("create thread failed", __FILE__, __LINE__, this->threadId); 00113 } 00114 catch(...) 00115 { 00116 // Unlock the thread mutex and re-throw exception 00117 LeaveCriticalSection(&this->threadMutex); 00118 throw; 00119 } 00120 // Unlock the thread mutex 00121 LeaveCriticalSection(&this->threadMutex); 00122 00123 // Wait for thread to signal the state 00124 if(WAIT_FAILED == WaitForSingleObject(this->stateHandle, INFINITE)) throw CExceptionSignal("waiting for worker start signal failed", __FILE__, __LINE__); 00125 #elif POSIX 00126 // Lock the thread mutex 00127 if(pthread_mutex_lock(&this->threadMutex)) throw CExceptionMutex("locking thread mutex failed", __FILE__, __LINE__); 00128 // Lock the state mutex 00129 if(pthread_mutex_lock(&this->stateMutex)) throw CExceptionMutex("locking state mutex failed", __FILE__, __LINE__); 00130 try 00131 { 00132 // Check the worker state 00133 if(WORKER_STATE_STOPPED != this->state) throw CExceptionWorker("worker is not stopped", __FILE__, __LINE__, this->id); 00134 00135 // Change the worker state 00136 this->state = WORKER_STATE_STARTING; 00137 00138 if(pthread_create( 00139 &this->threadHandle, // thread handle 00140 NULL, // default thread attribute 00141 &CSimWorker::Execute, // thread function address 00142 (void*)this // parameter 00143 )) throw CExceptionThread("create thread failed", __FILE__, __LINE__, this->threadHandle); 00144 } 00145 catch(...) 00146 { 00147 // Unlock the thread mutex and re-throw exception 00148 if(pthread_mutex_unlock(&this->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00149 throw; 00150 } 00151 // Unlock the thread mutex 00152 if(pthread_mutex_unlock(&this->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00153 00154 // Wait for thread to signal the state 00155 if(pthread_cond_wait(&this->stateCond, &this->stateMutex)) throw CExceptionSignal("waiting for worker start signal failed", __FILE__, __LINE__); 00156 #endif 00157 } 00158 00159 void CSimWorker::Stop() 00160 { 00161 #ifdef WIN32 00162 // Lock the thread mutex 00163 EnterCriticalSection(&this->threadMutex); 00164 try 00165 { 00166 // Check the worker state 00167 if((WORKER_STATE_STARTED_IDLE != this->state) && (WORKER_STATE_STARTED_BUSY != this->state)) 00168 throw CExceptionWorker("worker is not started", __FILE__, __LINE__, this->id); 00169 00170 // Save the old worker state 00171 EWorkerState oldState = this->state; 00172 00173 // Change the worker state 00174 this->state = WORKER_STATE_STOPPING; 00175 00176 // Send work change signal 00177 if(!SetEvent(this->workHandle)) throw CExceptionSignal("set work signal to signaled state failed", __FILE__, __LINE__); 00178 } 00179 catch(...) 00180 { 00181 // Unlock the thread mutex and re-throw exception 00182 LeaveCriticalSection(&this->threadMutex); 00183 throw; 00184 } 00185 // Unlock the thread mutex 00186 LeaveCriticalSection(&this->threadMutex); 00187 00188 // Wait for thread to complete 00189 if(WAIT_FAILED == WaitForSingleObject(this->threadHandle, INFINITE)) throw CExceptionSignal("waiting for worker stop signal failed", __FILE__, __LINE__); 00190 00191 // Close the thread handle 00192 if(!CloseHandle(this->threadHandle)) throw CExceptionThread("close thread failed", __FILE__, __LINE__, this->threadId); 00193 00194 #elif POSIX 00195 // Lock the thread mutex 00196 if(pthread_mutex_lock(&this->threadMutex)) throw CExceptionMutex("locking thread mutex failed", __FILE__, __LINE__); 00197 try 00198 { 00199 // Check the worker state 00200 if((WORKER_STATE_STARTED_IDLE != this->state) && (WORKER_STATE_STARTED_BUSY != this->state)) 00201 throw CExceptionWorker("worker is not started", __FILE__, __LINE__, this->id); 00202 00203 // Change the worker state 00204 this->state = WORKER_STATE_STOPPING; 00205 00206 // If can acquire a lock on the work mutex 00207 if(!pthread_mutex_trylock(&this->workMutex)) 00208 { 00209 try 00210 { 00211 // Send work signal 00212 if(pthread_cond_signal(&this->workCond)) throw CExceptionSignal("set work signal to signaled state failed", __FILE__, __LINE__); 00213 } 00214 catch(...) 00215 { 00216 // Unlock the work mutex and re-throw the exception 00217 if(pthread_mutex_unlock(&this->workMutex)) throw CExceptionMutex("unlocking work mutex failed", __FILE__, __LINE__); 00218 throw; 00219 } 00220 // Unlock the work mutex 00221 if(pthread_mutex_unlock(&this->workMutex)) throw CExceptionMutex("unlocking work mutex failed", __FILE__, __LINE__); 00222 } 00223 } 00224 catch(...) 00225 { 00226 // Unlock the thread mutex and re-throw exception 00227 if(pthread_mutex_unlock(&this->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00228 throw; 00229 } 00230 // Unlock the thread mutex 00231 if(pthread_mutex_unlock(&this->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00232 00233 // Wait for thread to complete 00234 if(pthread_join(this->threadHandle, NULL)) throw CExceptionSignal("waiting for worker stop signal failed", __FILE__, __LINE__); 00235 // Unlock the state mutex 00236 if(pthread_mutex_unlock(&this->stateMutex)) throw CExceptionMutex("unlocking the state mutex failed", __FILE__, __LINE__); 00237 #endif 00238 } 00239 00240 void CSimWorker::Enqueue(CSimWorkItem* item) 00241 { 00242 #ifdef WIN32 00243 // Synchronize access to shared variables 00244 EnterCriticalSection(&this->threadMutex); 00245 try 00246 { 00247 // Check the worker state 00248 if((WORKER_STATE_STARTED_IDLE != this->state) && (WORKER_STATE_STARTED_BUSY != this->state)) throw CExceptionWorker("cannot enqueue item; worker to started", __FILE__, __LINE__, this->id); 00249 00250 // Check the worker queue is not empty 00251 if(this->queueCount >= this->queueSize) throw CExceptionWorker("worker queue is full", __FILE__, __LINE__, this->id); 00252 00253 // Add the work item to the queue 00254 this->queue[(this->queuePtr + this->queueCount) % this->queueSize] = item; 00255 this->queueCount++; 00256 00257 // If the worker is in idle, send work change signal 00258 if(WORKER_STATE_STARTED_IDLE == this->state) 00259 if(!SetEvent(this->workHandle)) throw CExceptionSignal("set work signal to signaled state failed", __FILE__, __LINE__); 00260 } 00261 catch(...) 00262 { 00263 LeaveCriticalSection(&this->threadMutex); 00264 throw; 00265 } 00266 LeaveCriticalSection(&this->threadMutex); 00267 #elif POSIX 00268 // Synchronize access to shared variables 00269 if(pthread_mutex_lock(&this->threadMutex)) throw CExceptionMutex("locking thread mutex failed", __FILE__, __LINE__); 00270 try 00271 { 00272 // Check the worker state 00273 if((WORKER_STATE_STARTED_IDLE != this->state) && (WORKER_STATE_STARTED_BUSY != this->state)) throw CExceptionWorker("cannot enqueue item; worker to started", __FILE__, __LINE__, this->id); 00274 00275 // Check the worker queue is not empty 00276 if(this->queueCount >= this->queueSize) throw CExceptionWorker("worker queue is full", __FILE__, __LINE__, this->id); 00277 00278 // Add the work item to the queue 00279 this->queue[(this->queuePtr + this->queueCount) % this->queueSize] = item; 00280 this->queueCount++; 00281 00282 // If can acquire a lock on the work mutex 00283 if(!pthread_mutex_trylock(&this->workMutex)) 00284 { 00285 try 00286 { 00287 // Send work signal 00288 if(pthread_cond_signal(&this->workCond)) throw CExceptionSignal("set work signal to signaled state failed", __FILE__, __LINE__); 00289 } 00290 catch(...) 00291 { 00292 // Unlock the work mutex and re-throw the exception 00293 if(pthread_mutex_unlock(&this->workMutex)) throw CExceptionMutex("unlocking work mutex failed", __FILE__, __LINE__); 00294 throw; 00295 } 00296 // Unlock the work mutex 00297 if(pthread_mutex_unlock(&this->workMutex)) throw CExceptionMutex("unlocking work mutex failed", __FILE__, __LINE__); 00298 } 00299 } 00300 catch(...) 00301 { 00302 if(pthread_mutex_unlock(&this->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00303 throw; 00304 } 00305 if(pthread_mutex_unlock(&this->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00306 #endif 00307 } 00308 00309 #ifdef WIN32 00310 DWORD CSimWorker::Execute(__in LPVOID parameter) 00311 { 00312 // Get the object 00313 CSimWorker* worker = (CSimWorker*)(parameter); 00314 00315 // Synchronize access to shared variables 00316 EnterCriticalSection(&worker->threadMutex); 00317 try 00318 { 00319 // Change the state 00320 worker->state = WORKER_STATE_STARTED_IDLE; 00321 00322 // Send state change signal 00323 if(!SetEvent(worker->stateHandle)) throw CExceptionSignal("set state signal to signaled state failed", __FILE__, __LINE__); 00324 } 00325 catch(...) 00326 { 00327 LeaveCriticalSection(&worker->threadMutex); 00328 throw; 00329 } 00330 LeaveCriticalSection(&worker->threadMutex); 00331 00332 // Worker execution loop 00333 do 00334 { 00335 // Wait for a work signal 00336 if(WAIT_FAILED == WaitForSingleObject(worker->workHandle, INFINITE)) throw CExceptionSignal("waiting for work signal failed", __FILE__, __LINE__); 00337 00338 // Local copy of the work item 00339 CSimWorkItem* item = NULL; 00340 00341 do 00342 { 00343 // Synchronize access to shared variables 00344 EnterCriticalSection(&worker->threadMutex); 00345 try 00346 { 00347 // If the worker is not stopping and the worker queue is not empty 00348 if(WORKER_STATE_STOPPING != worker->state) 00349 { 00350 if(worker->queueCount) 00351 { 00352 // Pop the oldest work item from the queue 00353 item = worker->queue[worker->queuePtr]; 00354 00355 // Shift the end of the queue 00356 worker->queuePtr = (++worker->queuePtr) % worker->queueSize; 00357 worker->queueCount--; 00358 00359 // Set worker state to busy 00360 worker->state = WORKER_STATE_STARTED_BUSY; 00361 } 00362 else 00363 { 00364 // Set item to null 00365 item = NULL; 00366 00367 // Set worker state to idle 00368 worker->state = WORKER_STATE_STARTED_IDLE; 00369 } 00370 } 00371 } 00372 catch(...) 00373 { 00374 LeaveCriticalSection(&worker->threadMutex); 00375 throw; 00376 } 00377 LeaveCriticalSection(&worker->threadMutex); 00378 00379 // If the work item is not null 00380 if(item && (WORKER_STATE_STOPPING != worker->state)) 00381 { 00382 try 00383 { 00384 // Execute the work item 00385 item->Execute(); 00386 00387 // Signal work item completion : success 00388 item->Signal(CSimWorkItem::COMPLETED_SUCCESS); 00389 } 00390 catch(...) 00391 { 00392 // Signal work item completion : fail 00393 item->Signal(CSimWorkItem::COMPLETED_FAIL); 00394 } 00395 } 00396 } 00397 while(item && (WORKER_STATE_STOPPING != worker->state)); 00398 00399 } 00400 while(WORKER_STATE_STOPPING != worker->state); 00401 00402 // Synchronize access to shared variables 00403 EnterCriticalSection(&worker->threadMutex); 00404 try 00405 { 00406 // Change worker state 00407 worker->state = WORKER_STATE_STOPPED; 00408 00409 // Signal all work items remaining in the queue 00410 for(; worker->queueCount; worker->queueCount--) 00411 { 00412 // Pop item from the queue 00413 CSimWorkItem* item = worker->queue[worker->queuePtr]; 00414 00415 // Shift the end of the queue 00416 worker->queuePtr = (++worker->queuePtr) % worker->queueSize; 00417 00418 // Signal work item completion : pending 00419 item->Signal(CSimWorkItem::PENDING); 00420 } 00421 } 00422 catch(...) 00423 { 00424 LeaveCriticalSection(&worker->threadMutex); 00425 throw; 00426 } 00427 LeaveCriticalSection(&worker->threadMutex); 00428 00429 return 0; 00430 } 00431 #elif POSIX 00432 void* CSimWorker::Execute(void* parameter) 00433 { 00434 // Get the object 00435 CSimWorker* worker = (CSimWorker*)(parameter); 00436 00437 // Lock the work mutex 00438 if(pthread_mutex_lock(&worker->workMutex)) throw CExceptionMutex("locking work mutex failed", __FILE__, __LINE__); 00439 00440 try 00441 { 00442 // Synchronize access to shared variables 00443 if(pthread_mutex_lock(&worker->threadMutex)) throw CExceptionMutex("locking thread mutex failed", __FILE__, __LINE__); 00444 try 00445 { 00446 // Change the state 00447 worker->state = WORKER_STATE_STARTED_IDLE; 00448 00449 // Send state change signal 00450 00451 // Lock the state mutex 00452 if(pthread_mutex_lock(&worker->stateMutex)) throw CExceptionMutex("locking state mutex failed", __FILE__, __LINE__); 00453 try 00454 { 00455 // Send signal 00456 if(pthread_cond_signal(&worker->stateCond)) throw CExceptionSignal("set state signal to signaled state failed", __FILE__, __LINE__); 00457 } 00458 catch(...) 00459 { 00460 // Unlock the state mutex 00461 if(pthread_mutex_unlock(&worker->stateMutex)) throw CExceptionMutex("unlocking state mutex failed", __FILE__, __LINE__); 00462 throw; 00463 } 00464 if(pthread_mutex_unlock(&worker->stateMutex)) throw CExceptionMutex("unlocking state mutex failed", __FILE__, __LINE__); 00465 } 00466 catch(...) 00467 { 00468 if(pthread_mutex_unlock(&worker->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00469 throw; 00470 } 00471 if(pthread_mutex_unlock(&worker->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00472 00473 // Worker execution loop 00474 do 00475 { 00476 // Wait for a work signal 00477 if(pthread_cond_wait(&worker->workCond, &worker->workMutex)) throw CExceptionSignal("waiting for work signal failed", __FILE__, __LINE__); 00478 00479 // Local copy of the work item 00480 CSimWorkItem* item = NULL; 00481 00482 do 00483 { 00484 // Synchronize access to shared variables 00485 if(pthread_mutex_lock(&worker->threadMutex)) throw CExceptionMutex("locking thread mutex failed", __FILE__, __LINE__); 00486 try 00487 { 00488 // If the worker is not stopping and the worker queue is not empty 00489 if(WORKER_STATE_STOPPING != worker->state) 00490 { 00491 if(worker->queueCount) 00492 { 00493 // Pop the oldest work item from the queue 00494 item = worker->queue[worker->queuePtr]; 00495 00496 // Shift the end of the queue 00497 worker->queuePtr = (++worker->queuePtr) % worker->queueSize; 00498 worker->queueCount--; 00499 00500 // Set worker state to busy 00501 worker->state = WORKER_STATE_STARTED_BUSY; 00502 } 00503 else 00504 { 00505 // Set item to null 00506 item = NULL; 00507 00508 // Set worker state to idle 00509 worker->state = WORKER_STATE_STARTED_IDLE; 00510 } 00511 } 00512 } 00513 catch(...) 00514 { 00515 if(pthread_mutex_unlock(&worker->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00516 throw; 00517 } 00518 if(pthread_mutex_unlock(&worker->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00519 00520 // If the work item is not null 00521 if(item && (WORKER_STATE_STOPPING != worker->state)) 00522 { 00523 try 00524 { 00525 // Execute the work item 00526 item->Execute(); 00527 00528 // Signal work item completion : success 00529 item->Signal(CSimWorkItem::COMPLETED_SUCCESS); 00530 } 00531 catch(...) 00532 { 00533 // Signal work item completion : fail 00534 item->Signal(CSimWorkItem::COMPLETED_FAIL); 00535 } 00536 } 00537 } 00538 while(item && (WORKER_STATE_STOPPING != worker->state)); 00539 00540 } 00541 while(WORKER_STATE_STOPPING != worker->state); 00542 00543 // Synchronize access to shared variables 00544 if(pthread_mutex_lock(&worker->threadMutex)) throw CExceptionMutex("locking thread mutex failed", __FILE__, __LINE__); 00545 try 00546 { 00547 // Change worker state 00548 worker->state = WORKER_STATE_STOPPED; 00549 00550 // Signal all work items remaining in the queue 00551 for(; worker->queueCount; worker->queueCount--) 00552 { 00553 // Pop item from the queue 00554 CSimWorkItem* item = worker->queue[worker->queuePtr]; 00555 00556 // Shift the end of the queue 00557 worker->queuePtr = (++worker->queuePtr) % worker->queueSize; 00558 00559 // Signal work item completion : pending 00560 item->Signal(CSimWorkItem::PENDING); 00561 } 00562 } 00563 catch(...) 00564 { 00565 if(pthread_mutex_unlock(&worker->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00566 throw; 00567 } 00568 if(pthread_mutex_unlock(&worker->threadMutex)) throw CExceptionMutex("unlocking thread mutex failed", __FILE__, __LINE__); 00569 } 00570 catch(...) 00571 { 00572 // Unlock the work mutex and re-throw exception 00573 if(pthread_mutex_unlock(&worker->workMutex)) throw CExceptionMutex("unlocking work mutex failed", __FILE__, __LINE__); 00574 throw; 00575 } 00576 // Unlock the work mutex 00577 if(pthread_mutex_unlock(&worker->workMutex)) throw CExceptionMutex("unlocking work mutex failed", __FILE__, __LINE__); 00578 00579 return 0; 00580 } 00581 #endif
Last updated: February 8, 2011