ThreadPool.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. /*
  2. * Copyright (c) 2014, Oculus VR, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under the BSD-style license found in the
  6. * LICENSE file in the root directory of this source tree. An additional grant
  7. * of patent rights can be found in the PATENTS file in the same directory.
  8. *
  9. */
  10. #ifndef __THREAD_POOL_H
  11. #define __THREAD_POOL_H
  12. #include "RakMemoryOverride.h"
  13. #include "DS_Queue.h"
  14. #include "SimpleMutex.h"
  15. #include "Export.h"
  16. #include "RakThread.h"
  17. #include "SignaledEvent.h"
  18. #ifdef _MSC_VER
  19. #pragma warning( push )
  20. #endif
  21. class ThreadDataInterface
  22. {
  23. public:
  24. ThreadDataInterface() {}
  25. virtual ~ThreadDataInterface() {}
  26. virtual void* PerThreadFactory(void *context)=0;
  27. virtual void PerThreadDestructor(void* factoryResult, void *context)=0;
  28. };
  29. /// A simple class to create worker threads that processes a queue of functions with data.
  30. /// This class does not allocate or deallocate memory. It is up to the user to handle memory management.
  31. /// InputType and OutputType are stored directly in a queue. For large structures, if you plan to delete from the middle of the queue,
  32. /// you might wish to store pointers rather than the structures themselves so the array can shift efficiently.
  33. template <class InputType, class OutputType>
  34. struct RAK_DLL_EXPORT ThreadPool
  35. {
  36. ThreadPool();
  37. ~ThreadPool();
  38. /// Start the specified number of threads.
  39. /// \param[in] numThreads The number of threads to start
  40. /// \param[in] stackSize 0 for default (except on consoles).
  41. /// \param[in] _perThreadInit User callback to return data stored per thread. Pass 0 if not needed.
  42. /// \param[in] _perThreadDeinit User callback to destroy data stored per thread, created by _perThreadInit. Pass 0 if not needed.
  43. /// \return True on success, false on failure.
  44. bool StartThreads(int numThreads, int stackSize, void* (*_perThreadInit)()=0, void (*_perThreadDeinit)(void*)=0);
  45. // Alternate form of _perThreadDataFactory, _perThreadDataDestructor
  46. void SetThreadDataInterface(ThreadDataInterface *tdi, void *context);
  47. /// Stops all threads
  48. void StopThreads(void);
  49. /// Adds a function to a queue with data to pass to that function. This function will be called from the thread
  50. /// Memory management is your responsibility! This class does not allocate or deallocate memory.
  51. /// The best way to deallocate \a inputData is in userCallback. If you call EndThreads such that callbacks were not called, you
  52. /// can iterate through the inputQueue and deallocate all pending input data there
  53. /// The best way to deallocate output is as it is returned to you from GetOutput. Similarly, if you end the threads such that
  54. /// not all output was returned, you can iterate through outputQueue and deallocate it there.
  55. /// \param[in] workerThreadCallback The function to call from the thread
  56. /// \param[in] inputData The parameter to pass to \a userCallback
  57. void AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData);
  58. /// Adds to the output queue
  59. /// Use it if you want to inject output into the same queue that the system uses. Normally you would not use this. Consider it a convenience function.
  60. /// \param[in] outputData The output to inject
  61. void AddOutput(OutputType outputData);
  62. /// Returns true if output from GetOutput is waiting.
  63. /// \return true if output is waiting, false otherwise
  64. bool HasOutput(void);
  65. /// Inaccurate but fast version of HasOutput. If this returns true, you should still check HasOutput for the real value.
  66. /// \return true if output is probably waiting, false otherwise
  67. bool HasOutputFast(void);
  68. /// Returns true if input from GetInput is waiting.
  69. /// \return true if input is waiting, false otherwise
  70. bool HasInput(void);
  71. /// Inaccurate but fast version of HasInput. If this returns true, you should still check HasInput for the real value.
  72. /// \return true if input is probably waiting, false otherwise
  73. bool HasInputFast(void);
  74. /// Gets the output of a call to \a userCallback
  75. /// HasOutput must return true before you call this function. Otherwise it will assert.
  76. /// \return The output of \a userCallback. If you have different output signatures, it is up to you to encode the data to indicate this
  77. OutputType GetOutput(void);
  78. /// Clears internal buffers
  79. void Clear(void);
  80. /// Lock the input buffer before calling the functions InputSize, InputAtIndex, and RemoveInputAtIndex
  81. /// It is only necessary to lock the input or output while the threads are running
  82. void LockInput(void);
  83. /// Unlock the input buffer after you are done with the functions InputSize, GetInputAtIndex, and RemoveInputAtIndex
  84. void UnlockInput(void);
  85. /// Length of the input queue
  86. unsigned InputSize(void);
  87. /// Get the input at a specified index
  88. InputType GetInputAtIndex(unsigned index);
  89. /// Remove input from a specific index. This does NOT do memory deallocation - it only removes the item from the queue
  90. void RemoveInputAtIndex(unsigned index);
  91. /// Lock the output buffer before calling the functions OutputSize, OutputAtIndex, and RemoveOutputAtIndex
  92. /// It is only necessary to lock the input or output while the threads are running
  93. void LockOutput(void);
  94. /// Unlock the output buffer after you are done with the functions OutputSize, GetOutputAtIndex, and RemoveOutputAtIndex
  95. void UnlockOutput(void);
  96. /// Length of the output queue
  97. unsigned OutputSize(void);
  98. /// Get the output at a specified index
  99. OutputType GetOutputAtIndex(unsigned index);
  100. /// Remove output from a specific index. This does NOT do memory deallocation - it only removes the item from the queue
  101. void RemoveOutputAtIndex(unsigned index);
  102. /// Removes all items from the input queue
  103. void ClearInput(void);
  104. /// Removes all items from the output queue
  105. void ClearOutput(void);
  106. /// Are any of the threads working, or is input or output available?
  107. bool IsWorking(void);
  108. /// The number of currently active threads.
  109. int NumThreadsWorking(void);
  110. /// Did we call Start?
  111. bool WasStarted(void);
  112. // Block until all threads are stopped.
  113. bool Pause(void);
  114. // Continue running
  115. void Resume(void);
  116. protected:
  117. // It is valid to cancel input before it is processed. To do so, lock the inputQueue with inputQueueMutex,
  118. // Scan the list, and remove the item you don't want.
  119. RakNet::SimpleMutex inputQueueMutex, outputQueueMutex, workingThreadCountMutex, runThreadsMutex;
  120. void* (*perThreadDataFactory)();
  121. void (*perThreadDataDestructor)(void*);
  122. // inputFunctionQueue & inputQueue are paired arrays so if you delete from one at a particular index you must delete from the other
  123. // at the same index
  124. DataStructures::Queue<OutputType (*)(InputType, bool *, void*)> inputFunctionQueue;
  125. DataStructures::Queue<InputType> inputQueue;
  126. DataStructures::Queue<OutputType> outputQueue;
  127. ThreadDataInterface *threadDataInterface;
  128. void *tdiContext;
  129. template <class ThreadInputType, class ThreadOutputType>
  130. friend RAK_THREAD_DECLARATION(WorkerThread);
  131. /*
  132. #ifdef _WIN32
  133. friend unsigned __stdcall WorkerThread( LPVOID arguments );
  134. #else
  135. friend void* WorkerThread( void* arguments );
  136. #endif
  137. */
  138. /// \internal
  139. bool runThreads;
  140. /// \internal
  141. int numThreadsRunning;
  142. /// \internal
  143. int numThreadsWorking;
  144. /// \internal
  145. RakNet::SimpleMutex numThreadsRunningMutex;
  146. RakNet::SignaledEvent quitAndIncomingDataEvents;
  147. // #if defined(SN_TARGET_PSP2)
  148. // RakNet::RakThread::UltUlThreadRuntime *runtime;
  149. // #endif
  150. };
  151. #include "ThreadPool.h"
  152. #include "RakSleep.h"
  153. #ifdef _WIN32
  154. #else
  155. #include <unistd.h>
  156. #endif
  157. #ifdef _MSC_VER
  158. #pragma warning(disable:4127)
  159. #pragma warning( disable : 4701 ) // potentially uninitialized local variable 'inputData' used
  160. #endif
  161. template <class ThreadInputType, class ThreadOutputType>
  162. RAK_THREAD_DECLARATION(WorkerThread)
  163. /*
  164. #ifdef _WIN32
  165. unsigned __stdcall WorkerThread( LPVOID arguments )
  166. #else
  167. void* WorkerThread( void* arguments )
  168. #endif
  169. */
  170. {
  171. ThreadPool<ThreadInputType, ThreadOutputType> *threadPool = (ThreadPool<ThreadInputType, ThreadOutputType>*) arguments;
  172. bool returnOutput;
  173. ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*);
  174. ThreadInputType inputData;
  175. ThreadOutputType callbackOutput;
  176. userCallback=0;
  177. void *perThreadData;
  178. if (threadPool->perThreadDataFactory)
  179. perThreadData=threadPool->perThreadDataFactory();
  180. else if (threadPool->threadDataInterface)
  181. perThreadData=threadPool->threadDataInterface->PerThreadFactory(threadPool->tdiContext);
  182. else
  183. perThreadData=0;
  184. // Increase numThreadsRunning
  185. threadPool->numThreadsRunningMutex.Lock();
  186. ++threadPool->numThreadsRunning;
  187. threadPool->numThreadsRunningMutex.Unlock();
  188. while (1)
  189. {
  190. //#ifdef _WIN32
  191. if (userCallback==0)
  192. {
  193. threadPool->quitAndIncomingDataEvents.WaitOnEvent(1000);
  194. }
  195. // #else
  196. // if (userCallback==0)
  197. // RakSleep(30);
  198. // #endif
  199. threadPool->runThreadsMutex.Lock();
  200. if (threadPool->runThreads==false)
  201. {
  202. threadPool->runThreadsMutex.Unlock();
  203. break;
  204. }
  205. threadPool->runThreadsMutex.Unlock();
  206. threadPool->workingThreadCountMutex.Lock();
  207. ++threadPool->numThreadsWorking;
  208. threadPool->workingThreadCountMutex.Unlock();
  209. // Read input data
  210. userCallback=0;
  211. threadPool->inputQueueMutex.Lock();
  212. if (threadPool->inputFunctionQueue.Size())
  213. {
  214. userCallback=threadPool->inputFunctionQueue.Pop();
  215. inputData=threadPool->inputQueue.Pop();
  216. }
  217. threadPool->inputQueueMutex.Unlock();
  218. if (userCallback)
  219. {
  220. callbackOutput=userCallback(inputData, &returnOutput,perThreadData);
  221. if (returnOutput)
  222. {
  223. threadPool->outputQueueMutex.Lock();
  224. threadPool->outputQueue.Push(callbackOutput, _FILE_AND_LINE_ );
  225. threadPool->outputQueueMutex.Unlock();
  226. }
  227. }
  228. threadPool->workingThreadCountMutex.Lock();
  229. --threadPool->numThreadsWorking;
  230. threadPool->workingThreadCountMutex.Unlock();
  231. }
  232. // Decrease numThreadsRunning
  233. threadPool->numThreadsRunningMutex.Lock();
  234. --threadPool->numThreadsRunning;
  235. threadPool->numThreadsRunningMutex.Unlock();
  236. if (threadPool->perThreadDataDestructor)
  237. threadPool->perThreadDataDestructor(perThreadData);
  238. else if (threadPool->threadDataInterface)
  239. threadPool->threadDataInterface->PerThreadDestructor(perThreadData, threadPool->tdiContext);
  240. return 0;
  241. }
  242. template <class InputType, class OutputType>
  243. ThreadPool<InputType, OutputType>::ThreadPool()
  244. {
  245. runThreads=false;
  246. numThreadsRunning=0;
  247. threadDataInterface=0;
  248. tdiContext=0;
  249. numThreadsWorking=0;
  250. }
  251. template <class InputType, class OutputType>
  252. ThreadPool<InputType, OutputType>::~ThreadPool()
  253. {
  254. StopThreads();
  255. Clear();
  256. }
  257. template <class InputType, class OutputType>
  258. bool ThreadPool<InputType, OutputType>::StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)(), void (*_perThreadDataDestructor)(void *))
  259. {
  260. (void) stackSize;
  261. // #if defined(SN_TARGET_PSP2)
  262. // runtime = RakNet::RakThread::AllocRuntime(numThreads);
  263. // #endif
  264. runThreadsMutex.Lock();
  265. if (runThreads==true)
  266. {
  267. // Already running
  268. runThreadsMutex.Unlock();
  269. return false;
  270. }
  271. runThreadsMutex.Unlock();
  272. quitAndIncomingDataEvents.InitEvent();
  273. perThreadDataFactory=_perThreadDataFactory;
  274. perThreadDataDestructor=_perThreadDataDestructor;
  275. runThreadsMutex.Lock();
  276. runThreads=true;
  277. runThreadsMutex.Unlock();
  278. numThreadsWorking=0;
  279. unsigned threadId = 0;
  280. (void) threadId;
  281. int i;
  282. for (i=0; i < numThreads; i++)
  283. {
  284. int errorCode;
  285. errorCode = RakNet::RakThread::Create(WorkerThread<InputType, OutputType>, this);
  286. if (errorCode!=0)
  287. {
  288. StopThreads();
  289. return false;
  290. }
  291. }
  292. // Wait for number of threads running to increase to numThreads
  293. bool done=false;
  294. while (done==false)
  295. {
  296. RakSleep(50);
  297. numThreadsRunningMutex.Lock();
  298. if (numThreadsRunning==numThreads)
  299. done=true;
  300. numThreadsRunningMutex.Unlock();
  301. }
  302. return true;
  303. }
  304. template <class InputType, class OutputType>
  305. void ThreadPool<InputType, OutputType>::SetThreadDataInterface(ThreadDataInterface *tdi, void *context)
  306. {
  307. threadDataInterface=tdi;
  308. tdiContext=context;
  309. }
  310. template <class InputType, class OutputType>
  311. void ThreadPool<InputType, OutputType>::StopThreads(void)
  312. {
  313. runThreadsMutex.Lock();
  314. if (runThreads==false)
  315. {
  316. runThreadsMutex.Unlock();
  317. return;
  318. }
  319. runThreads=false;
  320. runThreadsMutex.Unlock();
  321. // Wait for number of threads running to decrease to 0
  322. bool done=false;
  323. while (done==false)
  324. {
  325. quitAndIncomingDataEvents.SetEvent();
  326. RakSleep(50);
  327. numThreadsRunningMutex.Lock();
  328. if (numThreadsRunning==0)
  329. done=true;
  330. numThreadsRunningMutex.Unlock();
  331. }
  332. quitAndIncomingDataEvents.CloseEvent();
  333. // #if defined(SN_TARGET_PSP2)
  334. // RakNet::RakThread::DeallocRuntime(runtime);
  335. // runtime=0;
  336. // #endif
  337. }
  338. template <class InputType, class OutputType>
  339. void ThreadPool<InputType, OutputType>::AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData)
  340. {
  341. inputQueueMutex.Lock();
  342. inputQueue.Push(inputData, _FILE_AND_LINE_ );
  343. inputFunctionQueue.Push(workerThreadCallback, _FILE_AND_LINE_ );
  344. inputQueueMutex.Unlock();
  345. quitAndIncomingDataEvents.SetEvent();
  346. }
  347. template <class InputType, class OutputType>
  348. void ThreadPool<InputType, OutputType>::AddOutput(OutputType outputData)
  349. {
  350. outputQueueMutex.Lock();
  351. outputQueue.Push(outputData, _FILE_AND_LINE_ );
  352. outputQueueMutex.Unlock();
  353. }
  354. template <class InputType, class OutputType>
  355. bool ThreadPool<InputType, OutputType>::HasOutputFast(void)
  356. {
  357. return outputQueue.IsEmpty()==false;
  358. }
  359. template <class InputType, class OutputType>
  360. bool ThreadPool<InputType, OutputType>::HasOutput(void)
  361. {
  362. bool res;
  363. outputQueueMutex.Lock();
  364. res=outputQueue.IsEmpty()==false;
  365. outputQueueMutex.Unlock();
  366. return res;
  367. }
  368. template <class InputType, class OutputType>
  369. bool ThreadPool<InputType, OutputType>::HasInputFast(void)
  370. {
  371. return inputQueue.IsEmpty()==false;
  372. }
  373. template <class InputType, class OutputType>
  374. bool ThreadPool<InputType, OutputType>::HasInput(void)
  375. {
  376. bool res;
  377. inputQueueMutex.Lock();
  378. res=inputQueue.IsEmpty()==false;
  379. inputQueueMutex.Unlock();
  380. return res;
  381. }
  382. template <class InputType, class OutputType>
  383. OutputType ThreadPool<InputType, OutputType>::GetOutput(void)
  384. {
  385. // Real output check
  386. OutputType output;
  387. outputQueueMutex.Lock();
  388. output=outputQueue.Pop();
  389. outputQueueMutex.Unlock();
  390. return output;
  391. }
  392. template <class InputType, class OutputType>
  393. void ThreadPool<InputType, OutputType>::Clear(void)
  394. {
  395. runThreadsMutex.Lock();
  396. if (runThreads)
  397. {
  398. runThreadsMutex.Unlock();
  399. inputQueueMutex.Lock();
  400. inputFunctionQueue.Clear(_FILE_AND_LINE_);
  401. inputQueue.Clear(_FILE_AND_LINE_);
  402. inputQueueMutex.Unlock();
  403. outputQueueMutex.Lock();
  404. outputQueue.Clear(_FILE_AND_LINE_);
  405. outputQueueMutex.Unlock();
  406. }
  407. else
  408. {
  409. inputFunctionQueue.Clear(_FILE_AND_LINE_);
  410. inputQueue.Clear(_FILE_AND_LINE_);
  411. outputQueue.Clear(_FILE_AND_LINE_);
  412. }
  413. }
  414. template <class InputType, class OutputType>
  415. void ThreadPool<InputType, OutputType>::LockInput(void)
  416. {
  417. inputQueueMutex.Lock();
  418. }
  419. template <class InputType, class OutputType>
  420. void ThreadPool<InputType, OutputType>::UnlockInput(void)
  421. {
  422. inputQueueMutex.Unlock();
  423. }
  424. template <class InputType, class OutputType>
  425. unsigned ThreadPool<InputType, OutputType>::InputSize(void)
  426. {
  427. return inputQueue.Size();
  428. }
  429. template <class InputType, class OutputType>
  430. InputType ThreadPool<InputType, OutputType>::GetInputAtIndex(unsigned index)
  431. {
  432. return inputQueue[index];
  433. }
  434. template <class InputType, class OutputType>
  435. void ThreadPool<InputType, OutputType>::RemoveInputAtIndex(unsigned index)
  436. {
  437. inputQueue.RemoveAtIndex(index);
  438. inputFunctionQueue.RemoveAtIndex(index);
  439. }
  440. template <class InputType, class OutputType>
  441. void ThreadPool<InputType, OutputType>::LockOutput(void)
  442. {
  443. outputQueueMutex.Lock();
  444. }
  445. template <class InputType, class OutputType>
  446. void ThreadPool<InputType, OutputType>::UnlockOutput(void)
  447. {
  448. outputQueueMutex.Unlock();
  449. }
  450. template <class InputType, class OutputType>
  451. unsigned ThreadPool<InputType, OutputType>::OutputSize(void)
  452. {
  453. return outputQueue.Size();
  454. }
  455. template <class InputType, class OutputType>
  456. OutputType ThreadPool<InputType, OutputType>::GetOutputAtIndex(unsigned index)
  457. {
  458. return outputQueue[index];
  459. }
  460. template <class InputType, class OutputType>
  461. void ThreadPool<InputType, OutputType>::RemoveOutputAtIndex(unsigned index)
  462. {
  463. outputQueue.RemoveAtIndex(index);
  464. }
  465. template <class InputType, class OutputType>
  466. void ThreadPool<InputType, OutputType>::ClearInput(void)
  467. {
  468. inputQueue.Clear(_FILE_AND_LINE_);
  469. inputFunctionQueue.Clear(_FILE_AND_LINE_);
  470. }
  471. template <class InputType, class OutputType>
  472. void ThreadPool<InputType, OutputType>::ClearOutput(void)
  473. {
  474. outputQueue.Clear(_FILE_AND_LINE_);
  475. }
  476. template <class InputType, class OutputType>
  477. bool ThreadPool<InputType, OutputType>::IsWorking(void)
  478. {
  479. bool isWorking;
  480. // workingThreadCountMutex.Lock();
  481. // isWorking=numThreadsWorking!=0;
  482. // workingThreadCountMutex.Unlock();
  483. // if (isWorking)
  484. // return true;
  485. // Bug fix: Originally the order of these two was reversed.
  486. // It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks
  487. // here and sees there is no data. So it thinks the thread is not working when it was.
  488. if (HasOutputFast() && HasOutput())
  489. return true;
  490. if (HasInputFast() && HasInput())
  491. return true;
  492. // Need to check is working again, in case the thread was between the first and second checks
  493. workingThreadCountMutex.Lock();
  494. isWorking=numThreadsWorking!=0;
  495. workingThreadCountMutex.Unlock();
  496. return isWorking;
  497. }
  498. template <class InputType, class OutputType>
  499. int ThreadPool<InputType, OutputType>::NumThreadsWorking(void)
  500. {
  501. return numThreadsWorking;
  502. }
  503. template <class InputType, class OutputType>
  504. bool ThreadPool<InputType, OutputType>::WasStarted(void)
  505. {
  506. bool b;
  507. runThreadsMutex.Lock();
  508. b = runThreads;
  509. runThreadsMutex.Unlock();
  510. return b;
  511. }
  512. template <class InputType, class OutputType>
  513. bool ThreadPool<InputType, OutputType>::Pause(void)
  514. {
  515. if (WasStarted()==false)
  516. return false;
  517. workingThreadCountMutex.Lock();
  518. while (numThreadsWorking>0)
  519. {
  520. RakSleep(30);
  521. }
  522. return true;
  523. }
  524. template <class InputType, class OutputType>
  525. void ThreadPool<InputType, OutputType>::Resume(void)
  526. {
  527. workingThreadCountMutex.Unlock();
  528. }
  529. #ifdef _MSC_VER
  530. #pragma warning( pop )
  531. #endif
  532. #endif
粤ICP备19079148号