// AutopatcherServer.cpp
  1. /*
  2. * Copyright (c) 2014, Oculus VR, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under the BSD-style license found in the
  6. * LICENSE file in the root directory of this source tree. An additional grant
  7. * of patent rights can be found in the PATENTS file in the same directory.
  8. *
  9. */
  10. /// \file
  11. /// \brief The server plugin for the autopatcher. Must be running for the client to get patches.
  12. #include "AutopatcherServer.h"
  13. #include "DirectoryDeltaTransfer.h"
  14. #include "FileList.h"
  15. #include "StringCompressor.h"
  16. #include "RakPeerInterface.h"
  17. #include "FileListTransfer.h"
  18. #include "FileListTransferCBInterface.h"
  19. #include "BitStream.h"
  20. #include "MessageIdentifiers.h"
  21. #include "AutopatcherRepositoryInterface.h"
  22. #include "RakAssert.h"
  23. #include "AutopatcherPatchContext.h"
  24. #include <stdio.h>
  25. #include <time.h>
  26. #ifdef _MSC_VER
  27. #pragma warning( push )
  28. #endif
  29. using namespace RakNet;
  /// Number of bytes in a client-supplied file hash. OnGetPatch() rejects
  /// hash entries of any other length and compares exactly this many bytes.
  static const unsigned HASH_LENGTH = 4;
  31. void AutopatcherServerLoadNotifier_Printf::OnQueueUpdate(SystemAddress remoteSystem, AutopatcherServerLoadNotifier::RequestType requestType, AutopatcherServerLoadNotifier::QueueOperation queueOperation, AutopatcherServerLoadNotifier::AutopatcherState *autopatcherState)
  32. {
  33. char *operationString;
  34. char *requestTypeString;
  35. char systemAddressString[32];
  36. remoteSystem.ToString(true, systemAddressString);
  37. if (requestType==ASUMC_GET_CHANGELIST)
  38. requestTypeString="GetChangelist";
  39. else
  40. requestTypeString="GetPatch";
  41. if (queueOperation==QO_WAS_ADDED)
  42. operationString="added";
  43. else if (queueOperation==QO_POPPED_ONTO_TO_PROCESSING_THREAD)
  44. operationString="processing";
  45. else if (queueOperation==QO_WAS_ABORTED)
  46. operationString="aborted";
  47. printf("%s %s %s. %i queued. %i working.\n", systemAddressString, requestTypeString, operationString, autopatcherState->requestsQueued, autopatcherState->requestsWorking);
  48. }
  49. void AutopatcherServerLoadNotifier_Printf::OnGetChangelistCompleted(
  50. SystemAddress remoteSystem,
  51. AutopatcherServerLoadNotifier::GetChangelistResult getChangelistResult,
  52. AutopatcherServerLoadNotifier::AutopatcherState *autopatcherState)
  53. {
  54. char systemAddressString[32];
  55. remoteSystem.ToString(true, systemAddressString);
  56. char *changelistString;
  57. if (getChangelistResult==GCR_DELETE_FILES)
  58. changelistString="Delete files";
  59. else if (getChangelistResult==GCR_ADD_FILES)
  60. changelistString="Add files";
  61. else if (getChangelistResult==GCR_ADD_AND_DELETE_FILES)
  62. changelistString="Add and delete files";
  63. else if (getChangelistResult==GCR_NOTHING_TO_DO)
  64. changelistString="No files in changelist";
  65. else if (getChangelistResult==GCR_REPOSITORY_ERROR)
  66. changelistString="Repository error";
  67. printf("%s GetChangelist complete. %s. %i queued. %i working.\n", systemAddressString, changelistString, autopatcherState->requestsQueued, autopatcherState->requestsWorking);
  68. }
  69. void AutopatcherServerLoadNotifier_Printf::OnGetPatchCompleted(SystemAddress remoteSystem, AutopatcherServerLoadNotifier::PatchResult patchResult, AutopatcherServerLoadNotifier::AutopatcherState *autopatcherState)
  70. {
  71. char systemAddressString[32];
  72. remoteSystem.ToString(true, systemAddressString);
  73. char *patchResultString;
  74. if (patchResult==PR_NO_FILES_NEEDED_PATCHING)
  75. patchResultString="No files needed patching";
  76. else if (patchResult==PR_REPOSITORY_ERROR)
  77. patchResultString="Repository error";
  78. else if (patchResult==PR_DISALLOWED_DOWNLOADING_ORIGINAL_FILES)
  79. patchResultString="Disallowed downloading original files";
  80. else if (patchResult==PR_PATCHES_WERE_SENT)
  81. patchResultString="Files pushed for patching";
  82. else if (patchResult==PR_ABORTED_FROM_INPUT_THREAD)
  83. patchResultString="Aborted from input thread";
  84. else if (patchResult==PR_ABORTED_FROM_DOWNLOAD_THREAD)
  85. patchResultString="Aborted from download thread";
  86. printf("%s GetPatch complete. %s. %i queued. %i working.\n", systemAddressString, patchResultString, autopatcherState->requestsQueued, autopatcherState->requestsWorking);
  87. }
  88. AutopatcherServer::AutopatcherServer()
  89. {
  90. fileListTransfer=0;
  91. priority=HIGH_PRIORITY;
  92. orderingChannel=0;
  93. // repository=0;
  94. maxConcurrentUsers=0;
  95. loadNotifier=0;
  96. cache_minTime=0;
  97. cache_maxTime=0;
  98. cacheLoaded=false;
  99. allowDownloadOfOriginalUnmodifiedFiles=true;
  100. }
  101. AutopatcherServer::~AutopatcherServer()
  102. {
  103. Clear();
  104. }
  105. void AutopatcherServer::SetUploadSendParameters(PacketPriority _priority, char _orderingChannel)
  106. {
  107. priority=_priority;
  108. orderingChannel=_orderingChannel;
  109. }
  110. void AutopatcherServer::SetFileListTransferPlugin(FileListTransfer *flt)
  111. {
  112. if (fileListTransfer)
  113. fileListTransfer->RemoveCallback(this);
  114. fileListTransfer=flt;
  115. if (fileListTransfer)
  116. fileListTransfer->AddCallback(this);
  117. }
  118. void AutopatcherServer::StartThreads(int numThreads, int numSQLConnections, AutopatcherRepositoryInterface **sqlConnectionPtrArray)
  119. {
  120. RakAssert(numSQLConnections >= numThreads);
  121. connectionPoolMutex.Lock();
  122. for (int i=0; i < numSQLConnections; i++)
  123. {
  124. // Test the pointers passed, in case the user incorrectly casted an array of a different type
  125. sqlConnectionPtrArray[i]->GetLastError();
  126. connectionPool.Push(sqlConnectionPtrArray[i],_FILE_AND_LINE_);
  127. }
  128. connectionPoolMutex.Unlock();
  129. threadPool.SetThreadDataInterface(this,0);
  130. threadPool.StartThreads(numThreads, 0);
  131. }
  132. void AutopatcherServer::CacheMostRecentPatch(const char *applicationName)
  133. {
  134. if (connectionPool.Size()>0)
  135. {
  136. if (applicationName)
  137. cache_appName=applicationName;
  138. else
  139. cache_appName.Clear();
  140. cache_patchedFiles.Clear();
  141. cache_addedFiles.Clear();
  142. cache_deletedFiles.Clear();
  143. cache_addedOrModifiedFileHashes.Clear();
  144. cache_minTime=0;
  145. cache_maxTime=0;
  146. cacheLoaded = connectionPool[0]->GetMostRecentChangelistWithPatches(cache_appName, &cache_patchedFiles, &cache_addedFiles, &cache_addedOrModifiedFileHashes, &cache_deletedFiles, &cache_minTime, &cache_maxTime);
  147. if (cacheLoaded==false)
  148. {
  149. printf("Warning: Cache not loaded. This is OK if no patch was ever saved.\n");
  150. }
  151. }
  152. }
  153. void AutopatcherServer::OnAttach(void)
  154. {
  155. }
  156. void AutopatcherServer::OnDetach(void)
  157. {
  158. Clear();
  159. }
  160. #ifdef _MSC_VER
  161. #pragma warning( disable : 4100 ) // warning C4100: <variable name> : unreferenced formal parameter
  162. #endif
  163. void AutopatcherServer::Update(void)
  164. {
  165. while (PatchingUserLimitReached()==false && userRequestWaitingQueue.Size()>0)
  166. {
  167. Packet *packet = PopOffWaitingQueue();
  168. switch (packet->data[0])
  169. {
  170. case ID_AUTOPATCHER_GET_CHANGELIST_SINCE_DATE:
  171. OnGetChangelistSinceDateInt(packet);
  172. break;
  173. // Client sends ID_AUTOPATCHER_GET_PATCH with files that they have different or missing
  174. case ID_AUTOPATCHER_GET_PATCH:
  175. OnGetPatchInt(packet);
  176. break;
  177. }
  178. DeallocPacketUnified(packet);
  179. }
  180. }
  181. PluginReceiveResult AutopatcherServer::OnReceive(Packet *packet)
  182. {
  183. switch (packet->data[0])
  184. {
  185. case ID_AUTOPATCHER_GET_CHANGELIST_SINCE_DATE:
  186. return OnGetChangelistSinceDate(packet);
  187. case ID_AUTOPATCHER_GET_PATCH:
  188. return OnGetPatch(packet);
  189. }
  190. return RR_CONTINUE_PROCESSING;
  191. }
  192. #ifdef _MSC_VER
  193. #pragma warning( disable : 4100 ) // warning C4100: <variable name> : unreferenced formal parameter
  194. #endif
  195. void AutopatcherServer::OnShutdown(void)
  196. {
  197. Clear();
  198. }
  199. void AutopatcherServer::Clear(void)
  200. {
  201. // Clear the waiting input and output from the thread pool.
  202. unsigned i;
  203. threadPool.StopThreads();
  204. for (i=0; i < threadPool.InputSize(); i++)
  205. {
  206. if (DecrementPatchingUserCount(threadPool.GetInputAtIndex(i).systemAddress))
  207. CallPatchCompleteCallback(threadPool.GetInputAtIndex(i).systemAddress, AutopatcherServerLoadNotifier::PR_ABORTED_FROM_INPUT_THREAD);
  208. RakNet::OP_DELETE(threadPool.GetInputAtIndex(i).clientList, _FILE_AND_LINE_);
  209. }
  210. threadPool.ClearInput();
  211. for (i=0; i < threadPool.OutputSize(); i++)
  212. {
  213. RakNet::OP_DELETE(threadPool.GetOutputAtIndex(i)->patchList, _FILE_AND_LINE_);
  214. RakNet::OP_DELETE(threadPool.GetOutputAtIndex(i)->deletedFiles, _FILE_AND_LINE_);
  215. RakNet::OP_DELETE(threadPool.GetOutputAtIndex(i)->addedOrModifiedFilesWithHashData, _FILE_AND_LINE_);
  216. }
  217. threadPool.ClearOutput();
  218. while (userRequestWaitingQueue.Size())
  219. DeallocPacketUnified(AbortOffWaitingQueue());
  220. patchingUsers.Clear(true, _FILE_AND_LINE_);
  221. }
  222. #ifdef _MSC_VER
  223. #pragma warning( disable : 4100 ) // warning C4100: <variable name> : unreferenced formal parameter
  224. #endif
  225. void AutopatcherServer::OnStartup(RakPeerInterface *peer)
  226. {
  227. }
  228. #ifdef _MSC_VER
  229. #pragma warning( disable : 4100 ) // warning C4100: <variable name> : unreferenced formal parameter
  230. #endif
  231. void AutopatcherServer::OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason )
  232. {
  233. RemoveFromThreadPool(systemAddress);
  234. unsigned int i=0;
  235. patchingUsersMutex.Lock();
  236. while (i < patchingUsers.Size())
  237. {
  238. if (patchingUsers[i]==systemAddress)
  239. patchingUsers.RemoveAtIndexFast(i);
  240. else
  241. i++;
  242. }
  243. patchingUsersMutex.Unlock();
  244. i=0;
  245. while (i < userRequestWaitingQueue.Size())
  246. {
  247. if (userRequestWaitingQueue[i]->systemAddress==systemAddress)
  248. userRequestWaitingQueue.RemoveAtIndex(i);
  249. else
  250. i++;
  251. }
  252. }
  253. void AutopatcherServer::RemoveFromThreadPool(SystemAddress systemAddress)
  254. {
  255. unsigned i;
  256. i=0;
  257. threadPool.LockInput();
  258. while (i < threadPool.InputSize())
  259. {
  260. if (threadPool.GetInputAtIndex(i).systemAddress==systemAddress)
  261. {
  262. if (DecrementPatchingUserCount(systemAddress))
  263. CallPatchCompleteCallback(threadPool.GetInputAtIndex(i).systemAddress, AutopatcherServerLoadNotifier::PR_ABORTED_FROM_INPUT_THREAD);
  264. RakNet::OP_DELETE(threadPool.GetInputAtIndex(i).clientList, _FILE_AND_LINE_);
  265. threadPool.RemoveInputAtIndex(i);
  266. }
  267. else
  268. i++;
  269. }
  270. threadPool.UnlockInput();
  271. i=0;
  272. threadPool.LockOutput();
  273. while (i < threadPool.OutputSize())
  274. {
  275. if (threadPool.GetOutputAtIndex(i)->systemAddress==systemAddress)
  276. {
  277. RakNet::OP_DELETE(threadPool.GetOutputAtIndex(i)->patchList, _FILE_AND_LINE_);
  278. RakNet::OP_DELETE(threadPool.GetOutputAtIndex(i)->deletedFiles, _FILE_AND_LINE_);
  279. RakNet::OP_DELETE(threadPool.GetOutputAtIndex(i)->addedOrModifiedFilesWithHashData, _FILE_AND_LINE_);
  280. threadPool.RemoveOutputAtIndex(i);
  281. }
  282. else
  283. i++;
  284. }
  285. threadPool.UnlockOutput();
  286. }
  287. namespace RakNet
  288. {
  289. AutopatcherServer::ResultTypeAndBitstream* GetChangelistSinceDateCB(AutopatcherServer::ThreadData threadData, bool *returnOutput, void* perThreadData)
  290. {
  291. AutopatcherRepositoryInterface *repository = (AutopatcherRepositoryInterface*)perThreadData;
  292. FileList addedOrModifiedFilesWithHashData, deletedFiles;
  293. AutopatcherServer *server = threadData.server;
  294. //AutopatcherServer::ResultTypeAndBitstream *rtab = RakNet::OP_NEW<AutopatcherServer::ResultTypeAndBitstream>( _FILE_AND_LINE_ );
  295. AutopatcherServer::ResultTypeAndBitstream rtab;
  296. rtab.systemAddress=threadData.systemAddress;
  297. // rtab.deletedFiles=RakNet::OP_NEW<FileList>( _FILE_AND_LINE_ );
  298. // rtab.addedFiles=RakNet::OP_NEW<FileList>( _FILE_AND_LINE_ );
  299. rtab.deletedFiles=&deletedFiles;
  300. rtab.addedOrModifiedFilesWithHashData=&addedOrModifiedFilesWithHashData;
  301. // Query the database for a changelist since this date
  302. RakAssert(server);
  303. //if (server->repository->GetChangelistSinceDate(threadData.applicationName.C_String(), rtab.addedFiles, rtab.deletedFiles, threadData.lastUpdateDate.C_String(), currentDate))
  304. if (repository->GetChangelistSinceDate(threadData.applicationName.C_String(), rtab.addedOrModifiedFilesWithHashData, rtab.deletedFiles, threadData.lastUpdateDate))
  305. {
  306. rtab.resultCode=1;
  307. }
  308. else
  309. {
  310. rtab.resultCode=0;
  311. }
  312. rtab.operation=AutopatcherServer::ResultTypeAndBitstream::GET_CHANGELIST_SINCE_DATE;
  313. rtab.currentDate=(double) time(NULL);
  314. // *returnOutput=true;
  315. // return rtab;
  316. if (rtab.resultCode==1)
  317. {
  318. if (rtab.deletedFiles->fileList.Size())
  319. {
  320. rtab.bitStream1.Write((unsigned char) ID_AUTOPATCHER_DELETION_LIST);
  321. rtab.deletedFiles->Serialize(&rtab.bitStream1);
  322. }
  323. if (rtab.addedOrModifiedFilesWithHashData->fileList.Size())
  324. {
  325. rtab.bitStream2.Write((unsigned char) ID_AUTOPATCHER_CREATION_LIST);
  326. rtab.addedOrModifiedFilesWithHashData->Serialize(&rtab.bitStream2);
  327. rtab.bitStream2.Write(rtab.currentDate);
  328. rtab.bitStream2.WriteCasted<double>(0);
  329. rtab.addedOrModifiedFilesWithHashData->Clear();
  330. }
  331. else
  332. {
  333. rtab.bitStream2.Write((unsigned char) ID_AUTOPATCHER_FINISHED);
  334. rtab.bitStream2.Write(rtab.currentDate);
  335. }
  336. }
  337. else
  338. {
  339. rtab.bitStream2.Write((unsigned char) ID_AUTOPATCHER_REPOSITORY_FATAL_ERROR);
  340. StringCompressor::Instance()->EncodeString(repository->GetLastError(), 256, &rtab.bitStream2);
  341. }
  342. // RakNet::OP_DELETE(rtab.deletedFiles, _FILE_AND_LINE_);
  343. // RakNet::OP_DELETE(rtab.addedFiles, _FILE_AND_LINE_);
  344. *returnOutput=false;
  345. if (server->DecrementPatchingUserCount(rtab.systemAddress))
  346. {
  347. if (rtab.bitStream1.GetNumberOfBitsUsed()>0)
  348. server->SendUnified(&(rtab.bitStream1), server->priority, RELIABLE_ORDERED, server->orderingChannel, rtab.systemAddress, false);
  349. if (rtab.bitStream2.GetNumberOfBitsUsed()>0)
  350. server->SendUnified(&(rtab.bitStream2), server->priority, RELIABLE_ORDERED, server->orderingChannel, rtab.systemAddress, false);
  351. if (server->loadNotifier)
  352. {
  353. AutopatcherServerLoadNotifier::AutopatcherState autopatcherState;
  354. autopatcherState.requestsQueued=server->userRequestWaitingQueue.Size();
  355. autopatcherState.requestsWorking=server->patchingUsers.Size();
  356. AutopatcherServerLoadNotifier::GetChangelistResult getChangelistResult;
  357. if (rtab.resultCode!=1)
  358. getChangelistResult=AutopatcherServerLoadNotifier::GCR_REPOSITORY_ERROR;
  359. else if (rtab.deletedFiles->fileList.Size()==0 && rtab.addedOrModifiedFilesWithHashData->fileList.Size()==0)
  360. getChangelistResult=AutopatcherServerLoadNotifier::GCR_NOTHING_TO_DO;
  361. else if (rtab.deletedFiles->fileList.Size()==0)
  362. getChangelistResult=AutopatcherServerLoadNotifier::GCR_ADD_FILES;
  363. else if (rtab.addedOrModifiedFilesWithHashData->fileList.Size()==0)
  364. getChangelistResult=AutopatcherServerLoadNotifier::GCR_DELETE_FILES;
  365. else
  366. getChangelistResult=AutopatcherServerLoadNotifier::GCR_ADD_AND_DELETE_FILES;
  367. server->loadNotifier->OnGetChangelistCompleted(rtab.systemAddress, getChangelistResult, &autopatcherState);
  368. }
  369. }
  370. return 0;
  371. }
  372. }
  373. PluginReceiveResult AutopatcherServer::OnGetChangelistSinceDate(Packet *packet)
  374. {
  375. RakNet::BitStream inBitStream(packet->data, packet->length, false);
  376. ThreadData threadData;
  377. threadData.clientList=0;
  378. inBitStream.IgnoreBits(8);
  379. inBitStream.ReadCompressed(threadData.applicationName);
  380. inBitStream.Read(threadData.lastUpdateDate);
  381. if (cacheLoaded && threadData.lastUpdateDate!=0 && threadData.applicationName==cache_appName)
  382. {
  383. RakNet::BitStream bitStream1;
  384. RakNet::BitStream bitStream2;
  385. double currentDate=(double) time(NULL);
  386. if (cache_maxTime!=0 && threadData.lastUpdateDate>cache_maxTime)
  387. {
  388. bitStream2.Write((unsigned char) ID_AUTOPATCHER_FINISHED);
  389. bitStream2.Write(currentDate);
  390. SendUnified(&bitStream2, priority, RELIABLE_ORDERED,orderingChannel, packet->systemAddress, false);
  391. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  392. }
  393. // Check in-memory cache, use if possible rather than accessing database
  394. if (cache_minTime!=0 && threadData.lastUpdateDate>cache_minTime)
  395. {
  396. if (cache_deletedFiles.fileList.Size())
  397. {
  398. bitStream1.Write((unsigned char) ID_AUTOPATCHER_DELETION_LIST);
  399. cache_deletedFiles.Serialize(&bitStream1);
  400. SendUnified(&bitStream1, priority, RELIABLE_ORDERED,orderingChannel, packet->systemAddress, false);
  401. }
  402. if (cache_addedOrModifiedFileHashes.fileList.Size())
  403. {
  404. bitStream2.Write((unsigned char) ID_AUTOPATCHER_CREATION_LIST);
  405. cache_addedOrModifiedFileHashes.Serialize(&bitStream2);
  406. bitStream2.Write(currentDate);
  407. bitStream2.Write(threadData.lastUpdateDate);
  408. }
  409. else
  410. {
  411. bitStream2.Write((unsigned char) ID_AUTOPATCHER_FINISHED);
  412. bitStream2.Write(currentDate);
  413. }
  414. SendUnified(&bitStream2, priority, RELIABLE_ORDERED,orderingChannel, packet->systemAddress, false);
  415. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  416. }
  417. }
  418. if (PatchingUserLimitReached())
  419. {
  420. AddToWaitingQueue(packet);
  421. return RR_STOP_PROCESSING;
  422. }
  423. OnGetChangelistSinceDateInt(packet);
  424. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  425. }
  426. void AutopatcherServer::OnGetChangelistSinceDateInt(Packet *packet)
  427. {
  428. RakNet::BitStream inBitStream(packet->data, packet->length, false);
  429. ThreadData threadData;
  430. threadData.clientList=0;
  431. inBitStream.IgnoreBits(8);
  432. inBitStream.ReadCompressed(threadData.applicationName);
  433. inBitStream.Read(threadData.lastUpdateDate);
  434. if (IncrementPatchingUserCount(packet->systemAddress))
  435. {
  436. CallPacketCallback(packet, AutopatcherServerLoadNotifier::QO_POPPED_ONTO_TO_PROCESSING_THREAD);
  437. threadData.server=this;
  438. threadData.systemAddress=packet->systemAddress;
  439. threadPool.AddInput(GetChangelistSinceDateCB, threadData);
  440. }
  441. }
  442. namespace RakNet {
  443. AutopatcherServer::ResultTypeAndBitstream* GetPatchCB(AutopatcherServer::ThreadData threadData, bool *returnOutput, void* perThreadData)
  444. {
  445. AutopatcherServer *server = threadData.server;
  446. AutopatcherRepositoryInterface *repository = (AutopatcherRepositoryInterface*)perThreadData;
  447. // AutopatcherServer::ResultTypeAndBitstream *rtab = RakNet::OP_NEW<AutopatcherServer::ResultTypeAndBitstream>( _FILE_AND_LINE_ );
  448. AutopatcherServer::ResultTypeAndBitstream rtab;
  449. rtab.systemAddress=threadData.systemAddress;
  450. FileList fileList;
  451. // rtab.patchList=RakNet::OP_NEW<FileList>( _FILE_AND_LINE_ );
  452. rtab.patchList=&fileList;
  453. RakAssert(server);
  454. // RakAssert(server->repository);
  455. // if (server->repository->GetPatches(threadData.applicationName.C_String(), threadData.clientList, rtab.patchList, currentDate))
  456. rtab.resultCode = repository->GetPatches(threadData.applicationName.C_String(), threadData.clientList, server->allowDownloadOfOriginalUnmodifiedFiles, rtab.patchList);
  457. rtab.operation=AutopatcherServer::ResultTypeAndBitstream::GET_PATCH;
  458. rtab.setId=threadData.setId;
  459. rtab.currentDate=(double) time(NULL);
  460. RakNet::OP_DELETE(threadData.clientList, _FILE_AND_LINE_);
  461. if (rtab.resultCode==1)
  462. {
  463. if (rtab.patchList->fileList.Size())
  464. {
  465. //server->fileListTransfer->Send(rtab.patchList, 0, rtab.systemAddress, rtab.setId, server->priority, server->orderingChannel, false, server->repository);
  466. server->fileListTransfer->Send(rtab.patchList, 0, rtab.systemAddress, rtab.setId, server->priority, server->orderingChannel, repository, repository->GetIncrementalReadChunkSize());
  467. }
  468. else
  469. {
  470. // No files needed to send
  471. if (server->DecrementPatchingUserCount(rtab.systemAddress))
  472. server->CallPatchCompleteCallback(rtab.systemAddress, AutopatcherServerLoadNotifier::PR_NO_FILES_NEEDED_PATCHING);
  473. }
  474. rtab.bitStream1.Write((unsigned char) ID_AUTOPATCHER_FINISHED_INTERNAL);
  475. rtab.bitStream1.Write(rtab.currentDate);
  476. }
  477. else
  478. {
  479. AutopatcherServerLoadNotifier::PatchResult pr;
  480. if (rtab.resultCode==0)
  481. {
  482. rtab.bitStream1.Write((unsigned char) ID_AUTOPATCHER_REPOSITORY_FATAL_ERROR);
  483. StringCompressor::Instance()->EncodeString(repository->GetLastError(), 256, &rtab.bitStream1);
  484. pr = AutopatcherServerLoadNotifier::PR_REPOSITORY_ERROR;
  485. }
  486. else
  487. {
  488. rtab.bitStream1.Write((unsigned char) ID_AUTOPATCHER_CANNOT_DOWNLOAD_ORIGINAL_UNMODIFIED_FILES);
  489. pr = AutopatcherServerLoadNotifier::PR_DISALLOWED_DOWNLOADING_ORIGINAL_FILES;
  490. }
  491. if (server->DecrementPatchingUserCount(rtab.systemAddress))
  492. {
  493. server->CallPatchCompleteCallback(rtab.systemAddress, pr);
  494. }
  495. else
  496. {
  497. *returnOutput=false;
  498. return 0;
  499. }
  500. }
  501. *returnOutput=false;
  502. if (rtab.bitStream1.GetNumberOfBitsUsed()>0)
  503. server->SendUnified(&(rtab.bitStream1), server->priority, RELIABLE_ORDERED, server->orderingChannel, rtab.systemAddress, false);
  504. if (rtab.bitStream2.GetNumberOfBitsUsed()>0)
  505. server->SendUnified(&(rtab.bitStream2), server->priority, RELIABLE_ORDERED, server->orderingChannel, rtab.systemAddress, false);
  506. // 12/1/2010 This doesn't scale well. Changing to allocating a connection object per request
  507. /*
  508. // Wait for repository to finish
  509. // This is so that the same sql connection is not used between two different plugins, which causes thrashing and bad performance
  510. // Plus if fileListTransfer uses multiple threads, this will keep this thread and the fileListTransfer thread from using the same connection at the same time
  511. // PostgreSQL possibly MySQL are not threadsafe for multiple threads on the same connection
  512. int pendingFiles = server->fileListTransfer->GetPendingFilesToAddress(rtab.systemAddress);
  513. while (pendingFiles>0)
  514. {
  515. RakSleep(pendingFiles*10);
  516. pendingFiles = server->fileListTransfer->GetPendingFilesToAddress(rtab.systemAddress);
  517. }
  518. */
  519. // *returnOutput=true;
  520. // return rtab;
  521. return 0;
  522. }
  523. }
  524. PluginReceiveResult AutopatcherServer::OnGetPatch(Packet *packet)
  525. {
  526. RakNet::BitStream inBitStream(packet->data, packet->length, false);
  527. ThreadData threadData;
  528. inBitStream.IgnoreBits(8);
  529. inBitStream.Read(threadData.setId);
  530. double lastUpdateDate;
  531. inBitStream.Read(lastUpdateDate);
  532. inBitStream.ReadCompressed(threadData.applicationName);
  533. threadData.clientList=0;
  534. // Check in-memory cache, use if possible rather than accessing database
  535. if (threadData.applicationName==cache_appName && lastUpdateDate!=0 && cacheLoaded && cache_minTime!=0 && lastUpdateDate>cache_minTime)
  536. {
  537. threadData.systemAddress=packet->systemAddress;
  538. threadData.server=this;
  539. threadData.clientList=RakNet::OP_NEW<FileList>( _FILE_AND_LINE_ );
  540. if (threadData.clientList->Deserialize(&inBitStream)==false)
  541. {
  542. RakNet::OP_DELETE(threadData.clientList, _FILE_AND_LINE_);
  543. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  544. }
  545. if (threadData.clientList->fileList.Size()==0)
  546. {
  547. RakAssert(0);
  548. RakNet::OP_DELETE(threadData.clientList, _FILE_AND_LINE_);
  549. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  550. }
  551. char *userHash;
  552. RakNet::RakString userFilename;
  553. FileList patchList;
  554. bool cacheUpdateFailed=false;
  555. unsigned int i,j;
  556. // FileList is the list of all files missing or changed as determined by the client
  557. for (i=0; i < threadData.clientList->fileList.Size(); i++)
  558. {
  559. userHash=threadData.clientList->fileList[i].data;
  560. userFilename=threadData.clientList->fileList[i].filename;
  561. if (userHash)
  562. {
  563. // If the user has a hash, check for this file in cache_patchedFiles. If not found, or hash is wrong, use DB
  564. if (threadData.clientList->fileList[i].dataLengthBytes!=HASH_LENGTH)
  565. {
  566. RakNet::OP_DELETE(threadData.clientList, _FILE_AND_LINE_);
  567. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  568. }
  569. for (j=0; j < cache_patchedFiles.fileList.Size(); j++)
  570. {
  571. if (userFilename == cache_patchedFiles.fileList[j].filename)
  572. {
  573. if (memcmp(cache_patchedFiles.fileList[j].data, userHash, HASH_LENGTH)==0)
  574. {
  575. // Send patch
  576. RakAssert(cache_patchedFiles.fileList[j].context.op==PC_HASH_2_WITH_PATCH);
  577. patchList.AddFile(userFilename,userFilename, 0, cache_patchedFiles.fileList[j].dataLengthBytes, cache_patchedFiles.fileList[j].fileLengthBytes, cache_patchedFiles.fileList[j].context, true, false);
  578. }
  579. else
  580. {
  581. // Bad hash
  582. cacheUpdateFailed=true;
  583. }
  584. break;
  585. }
  586. }
  587. if (j==cache_patchedFiles.fileList.Size())
  588. {
  589. // Didn't find the patch even though the client has an older version of the file
  590. cacheUpdateFailed=true;
  591. }
  592. }
  593. else
  594. {
  595. // If the user does not have a hash, check for this file in cache_addedFiles. If not found, use DB
  596. for (j=0; j < cache_addedFiles.fileList.Size(); j++)
  597. {
  598. if (userFilename == cache_addedFiles.fileList[j].filename)
  599. {
  600. // Send added file
  601. patchList.AddFile(userFilename,userFilename, 0, cache_addedFiles.fileList[j].dataLengthBytes, cache_addedFiles.fileList[j].fileLengthBytes, cache_addedFiles.fileList[j].context, true, false);
  602. break;
  603. }
  604. }
  605. if (j==cache_addedFiles.fileList.Size())
  606. {
  607. // Didn't find the file in the cache even though the client asked for it
  608. cacheUpdateFailed=true;
  609. }
  610. }
  611. if (cacheUpdateFailed==true)
  612. {
  613. // Failure to find file in cache
  614. // Will fall to use database
  615. patchList.Clear();
  616. break;
  617. }
  618. }
  619. if (patchList.fileList.Size()>0)
  620. {
  621. if (IncrementPatchingUserCount(packet->systemAddress))
  622. {
  623. fileListTransfer->Send(&patchList, 0, packet->systemAddress, threadData.setId, priority, orderingChannel, this, 262144*4*4);
  624. RakNet::BitStream bitStream1;
  625. bitStream1.Write((unsigned char) ID_AUTOPATCHER_FINISHED_INTERNAL);
  626. double t =(double) time(NULL);
  627. bitStream1.Write(t);
  628. SendUnified(&bitStream1, priority, RELIABLE_ORDERED, orderingChannel, packet->systemAddress, false);
  629. RakNet::OP_DELETE(threadData.clientList, _FILE_AND_LINE_);
  630. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  631. }
  632. }
  633. }
  634. RakNet::OP_DELETE(threadData.clientList, _FILE_AND_LINE_);
  635. if (PatchingUserLimitReached())
  636. {
  637. AddToWaitingQueue(packet);
  638. return RR_STOP_PROCESSING;
  639. }
  640. OnGetPatchInt(packet);
  641. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  642. }
  643. void AutopatcherServer::OnGetPatchInt(Packet *packet)
  644. {
  645. RakNet::BitStream inBitStream(packet->data, packet->length, false);
  646. ThreadData threadData;
  647. inBitStream.IgnoreBits(8);
  648. inBitStream.Read(threadData.setId);
  649. double lastUpdateDate;
  650. inBitStream.Read(lastUpdateDate);
  651. inBitStream.ReadCompressed(threadData.applicationName);
  652. threadData.systemAddress=packet->systemAddress;
  653. threadData.server=this;
  654. threadData.clientList=RakNet::OP_NEW<FileList>( _FILE_AND_LINE_ );
  655. if (threadData.clientList->Deserialize(&inBitStream)==false)
  656. {
  657. RakNet::OP_DELETE(threadData.clientList, _FILE_AND_LINE_);
  658. return;
  659. }
  660. if (threadData.clientList->fileList.Size()==0)
  661. {
  662. RakAssert(0);
  663. RakNet::OP_DELETE(threadData.clientList, _FILE_AND_LINE_);
  664. return;
  665. }
  666. if (IncrementPatchingUserCount(packet->systemAddress))
  667. CallPacketCallback(packet, AutopatcherServerLoadNotifier::QO_POPPED_ONTO_TO_PROCESSING_THREAD);
  668. threadPool.AddInput(GetPatchCB, threadData);
  669. }
  670. void* AutopatcherServer::PerThreadFactory(void *context)
  671. {
  672. (void)context;
  673. AutopatcherRepositoryInterface* p;
  674. connectionPoolMutex.Lock();
  675. p=connectionPool.Pop();
  676. connectionPoolMutex.Unlock();
  677. return p;
  678. }
  679. void AutopatcherServer::PerThreadDestructor(void* factoryResult, void *context)
  680. {
  681. (void)context;
  682. (void)factoryResult;
  683. }
  684. void AutopatcherServer::OnFilePushesComplete( SystemAddress systemAddress, unsigned short setID )
  685. {
  686. if (DecrementPatchingUserCount(systemAddress))
  687. CallPatchCompleteCallback(systemAddress, AutopatcherServerLoadNotifier::PR_PATCHES_WERE_SENT);
  688. }
  689. void AutopatcherServer::OnSendAborted( SystemAddress systemAddress )
  690. {
  691. if (DecrementPatchingUserCount(systemAddress))
  692. CallPatchCompleteCallback(systemAddress, AutopatcherServerLoadNotifier::PR_ABORTED_FROM_DOWNLOAD_THREAD);
  693. }
  694. bool AutopatcherServer::IncrementPatchingUserCount(SystemAddress sa)
  695. {
  696. // A system address may exist more than once in patchingUsers
  697. patchingUsersMutex.Lock();
  698. patchingUsers.Insert(sa, _FILE_AND_LINE_);
  699. patchingUsersMutex.Unlock();
  700. return true;
  701. }
  702. bool AutopatcherServer::DecrementPatchingUserCount(SystemAddress sa)
  703. {
  704. unsigned int i;
  705. patchingUsersMutex.Lock();
  706. for (i=0; i < patchingUsers.Size(); i++)
  707. {
  708. if (patchingUsers[i]==sa)
  709. {
  710. patchingUsers.RemoveAtIndexFast(i);
  711. patchingUsersMutex.Unlock();
  712. return true;
  713. }
  714. }
  715. patchingUsersMutex.Unlock();
  716. return false;
  717. }
  718. bool AutopatcherServer::PatchingUserLimitReached(void) const
  719. {
  720. if (maxConcurrentUsers==0)
  721. return false;
  722. return patchingUsers.Size()>=maxConcurrentUsers;
  723. }
  724. void AutopatcherServer::SetMaxConurrentUsers(unsigned int _maxConcurrentUsers)
  725. {
  726. maxConcurrentUsers=_maxConcurrentUsers;
  727. }
  728. unsigned int AutopatcherServer::GetMaxConurrentUsers(void) const
  729. {
  730. return maxConcurrentUsers;
  731. }
  732. void AutopatcherServer::CallPacketCallback(Packet *packet, AutopatcherServerLoadNotifier::QueueOperation queueOperation)
  733. {
  734. if (loadNotifier)
  735. {
  736. AutopatcherServerLoadNotifier::AutopatcherState autopatcherState;
  737. autopatcherState.requestsQueued=userRequestWaitingQueue.Size();
  738. autopatcherState.requestsWorking=patchingUsers.Size();
  739. AutopatcherServerLoadNotifier::RequestType requestType;
  740. if (packet->data[0]==ID_AUTOPATCHER_GET_CHANGELIST_SINCE_DATE)
  741. requestType=AutopatcherServerLoadNotifier::ASUMC_GET_CHANGELIST;
  742. else
  743. requestType=AutopatcherServerLoadNotifier::ASUMC_GET_PATCH;
  744. loadNotifier->OnQueueUpdate(packet->systemAddress, requestType, queueOperation, &autopatcherState);
  745. }
  746. }
  747. void AutopatcherServer::CallPatchCompleteCallback(const SystemAddress &systemAddress, AutopatcherServerLoadNotifier::PatchResult patchResult)
  748. {
  749. if (loadNotifier)
  750. {
  751. AutopatcherServerLoadNotifier::AutopatcherState autopatcherState;
  752. autopatcherState.requestsQueued=userRequestWaitingQueue.Size();
  753. autopatcherState.requestsWorking=patchingUsers.Size();
  754. loadNotifier->OnGetPatchCompleted(systemAddress, patchResult, &autopatcherState);
  755. }
  756. }
  757. void AutopatcherServer::AddToWaitingQueue(Packet *packet)
  758. {
  759. userRequestWaitingQueue.Push(packet, _FILE_AND_LINE_);
  760. CallPacketCallback(packet, AutopatcherServerLoadNotifier::QO_WAS_ADDED);
  761. }
  762. Packet *AutopatcherServer::AbortOffWaitingQueue(void)
  763. {
  764. Packet *packet = userRequestWaitingQueue.Pop();
  765. CallPacketCallback(packet,AutopatcherServerLoadNotifier::QO_WAS_ABORTED);
  766. return packet;
  767. }
  768. Packet *AutopatcherServer::PopOffWaitingQueue(void)
  769. {
  770. return userRequestWaitingQueue.Pop();;
  771. }
  772. void AutopatcherServer::SetLoadManagementCallback(AutopatcherServerLoadNotifier *asumc)
  773. {
  774. loadNotifier=asumc;
  775. }
  776. void AutopatcherServer::SetAllowDownloadOfOriginalUnmodifiedFiles(bool allow)
  777. {
  778. allowDownloadOfOriginalUnmodifiedFiles = allow;
  779. }
  780. unsigned int AutopatcherServer::GetFilePart( const char *filename, unsigned int startReadBytes, unsigned int numBytesToRead, void *preallocatedDestination, FileListNodeContext context)
  781. {
  782. /*
  783. int offset;
  784. if (context.op==PC_HASH_1_WITH_PATCH)
  785. offset=HASH_LENGTH;
  786. else if (context.op==PC_HASH_2_WITH_PATCH)
  787. offset=HASH_LENGTH*2;
  788. else
  789. offset=0;
  790. int bytesToRead;
  791. if (startReadBytes + numBytesToRead > context.dataLength-offset)
  792. bytesToRead=(context.dataLength-offset)-startReadBytes;
  793. else
  794. bytesToRead=numBytesToRead;
  795. memcpy(preallocatedDestination, ((char*)context.dataPtr)+offset, bytesToRead);
  796. */
  797. int bytesToRead;
  798. if (startReadBytes + numBytesToRead > context.dataLength)
  799. bytesToRead=(context.dataLength)-startReadBytes;
  800. else
  801. bytesToRead=numBytesToRead;
  802. memcpy(preallocatedDestination, context.dataPtr, bytesToRead);
  803. return bytesToRead;
  804. }
  805. #ifdef _MSC_VER
  806. #pragma warning( pop )
  807. #endif
粤ICP备19079148号