CloudServer.cpp 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685
  1. /*
  2. * Copyright (c) 2014, Oculus VR, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under the BSD-style license found in the
  6. * LICENSE file in the root directory of this source tree. An additional grant
  7. * of patent rights can be found in the PATENTS file in the same directory.
  8. *
  9. */
  10. #include "NativeFeatureIncludes.h"
  11. #if _RAKNET_SUPPORT_CloudServer==1
  12. #include "CloudServer.h"
  13. #include "GetTime.h"
  14. #include "MessageIdentifiers.h"
  15. #include "BitStream.h"
  16. #include "RakPeerInterface.h"
  17. enum ServerToServerCommands
  18. {
  19. STSC_PROCESS_GET_REQUEST,
  20. STSC_PROCESS_GET_RESPONSE,
  21. STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS,
  22. STSC_ADD_UPLOADED_KEY,
  23. STSC_ADD_SUBSCRIBED_KEY,
  24. STSC_REMOVE_UPLOADED_KEY,
  25. STSC_REMOVE_SUBSCRIBED_KEY,
  26. STSC_DATA_CHANGED,
  27. };
  28. using namespace RakNet;
  29. int CloudServer::RemoteServerComp(const RakNetGUID &key, RemoteServer* const &data )
  30. {
  31. if (key < data->serverAddress)
  32. return -1;
  33. if (key > data->serverAddress)
  34. return 1;
  35. return 0;
  36. }
  37. int CloudServer::KeySubscriberIDComp(const CloudKey &key, KeySubscriberID * const &data )
  38. {
  39. if (key.primaryKey < data->key.primaryKey)
  40. return -1;
  41. if (key.primaryKey > data->key.primaryKey)
  42. return 1;
  43. if (key.secondaryKey < data->key.secondaryKey)
  44. return -1;
  45. if (key.secondaryKey > data->key.secondaryKey)
  46. return 1;
  47. return 0;
  48. }
  49. int CloudServer::KeyDataPtrComp( const RakNetGUID &key, CloudData* const &data )
  50. {
  51. if (key < data->clientGUID)
  52. return -1;
  53. if (key > data->clientGUID)
  54. return 1;
  55. return 0;
  56. }
  57. int CloudServer::KeyDataListComp( const CloudKey &key, CloudDataList * const &data )
  58. {
  59. if (key.primaryKey < data->key.primaryKey)
  60. return -1;
  61. if (key.primaryKey > data->key.primaryKey)
  62. return 1;
  63. if (key.secondaryKey < data->key.secondaryKey)
  64. return -1;
  65. if (key.secondaryKey > data->key.secondaryKey)
  66. return 1;
  67. return 0;
  68. }
  69. int CloudServer::BufferedGetResponseFromServerComp(const RakNetGUID &key, CloudServer::BufferedGetResponseFromServer* const &data )
  70. {
  71. if (key < data->serverAddress)
  72. return -1;
  73. if (key > data->serverAddress)
  74. return 1;
  75. return 0;
  76. }
  77. int CloudServer::GetRequestComp(const uint32_t &key, CloudServer::GetRequest* const &data )
  78. {
  79. if (key < data->requestId)
  80. return -1;
  81. if (key > data->requestId)
  82. return -1;
  83. return 0;
  84. }
  85. void CloudServer::CloudQueryWithAddresses::Serialize(bool writeToBitstream, BitStream *bitStream)
  86. {
  87. cloudQuery.Serialize(writeToBitstream, bitStream);
  88. if (writeToBitstream)
  89. {
  90. bitStream->WriteCasted<uint16_t>(specificSystems.Size());
  91. RakAssert(specificSystems.Size() < (uint16_t)-1 );
  92. for (uint16_t i=0; i < specificSystems.Size(); i++)
  93. {
  94. bitStream->Write(specificSystems[i]);
  95. }
  96. }
  97. else
  98. {
  99. uint16_t specificSystemsCount;
  100. RakNetGUID addressOrGuid;
  101. bitStream->Read(specificSystemsCount);
  102. for (uint16_t i=0; i < specificSystemsCount; i++)
  103. {
  104. bitStream->Read(addressOrGuid);
  105. specificSystems.Push(addressOrGuid, _FILE_AND_LINE_);
  106. }
  107. }
  108. }
  109. bool CloudServer::GetRequest::AllRemoteServersHaveResponded(void) const
  110. {
  111. unsigned int i;
  112. for (i=0; i < remoteServerResponses.Size(); i++)
  113. if (remoteServerResponses[i]->gotResult==false)
  114. return false;
  115. return true;
  116. }
  117. void CloudServer::GetRequest::Clear(CloudAllocator *allocator)
  118. {
  119. unsigned int i;
  120. for (i=0; i < remoteServerResponses.Size(); i++)
  121. {
  122. remoteServerResponses[i]->Clear(allocator);
  123. RakNet::OP_DELETE(remoteServerResponses[i], _FILE_AND_LINE_);
  124. }
  125. remoteServerResponses.Clear(false, _FILE_AND_LINE_);
  126. }
  127. void CloudServer::BufferedGetResponseFromServer::Clear(CloudAllocator *allocator)
  128. {
  129. unsigned int i;
  130. for (i=0; i < queryResult.rowsReturned.Size(); i++)
  131. {
  132. allocator->DeallocateRowData(queryResult.rowsReturned[i]->data);
  133. allocator->DeallocateCloudQueryRow(queryResult.rowsReturned[i]);
  134. }
  135. queryResult.rowsReturned.Clear(false, _FILE_AND_LINE_);
  136. }
  137. CloudServer::CloudServer()
  138. {
  139. maxUploadBytesPerClient=0;
  140. maxBytesPerDowload=0;
  141. nextGetRequestId=0;
  142. nextGetRequestsCheck=0;
  143. }
  144. CloudServer::~CloudServer()
  145. {
  146. Clear();
  147. }
  148. void CloudServer::SetMaxUploadBytesPerClient(uint64_t bytes)
  149. {
  150. maxUploadBytesPerClient=bytes;
  151. }
  152. void CloudServer::SetMaxBytesPerDownload(uint64_t bytes)
  153. {
  154. maxBytesPerDowload=bytes;
  155. }
  156. void CloudServer::Update(void)
  157. {
  158. // Timeout getRequests
  159. RakNet::Time time = RakNet::Time();
  160. if (time > nextGetRequestsCheck)
  161. {
  162. nextGetRequestsCheck=time+1000;
  163. unsigned int i=0;
  164. while (i < getRequests.Size())
  165. {
  166. if (time - getRequests[i]->requestStartTime > 3000)
  167. {
  168. // Remote server is not responding, just send back data with whoever did respond
  169. ProcessAndTransmitGetRequest(getRequests[i]);
  170. getRequests[i]->Clear(this);
  171. RakNet::OP_DELETE(getRequests[i],_FILE_AND_LINE_);
  172. getRequests.RemoveAtIndex(i);
  173. }
  174. else
  175. {
  176. i++;
  177. }
  178. }
  179. }
  180. }
  181. PluginReceiveResult CloudServer::OnReceive(Packet *packet)
  182. {
  183. switch (packet->data[0])
  184. {
  185. case ID_CLOUD_POST_REQUEST:
  186. OnPostRequest(packet);
  187. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  188. case ID_CLOUD_RELEASE_REQUEST:
  189. OnReleaseRequest(packet);
  190. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  191. case ID_CLOUD_GET_REQUEST:
  192. OnGetRequest(packet);
  193. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  194. case ID_CLOUD_UNSUBSCRIBE_REQUEST:
  195. OnUnsubscribeRequest(packet);
  196. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  197. case ID_CLOUD_SERVER_TO_SERVER_COMMAND:
  198. if (packet->length>1)
  199. {
  200. switch (packet->data[1])
  201. {
  202. case STSC_PROCESS_GET_REQUEST:
  203. OnServerToServerGetRequest(packet);
  204. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  205. case STSC_PROCESS_GET_RESPONSE:
  206. OnServerToServerGetResponse(packet);
  207. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  208. case STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS:
  209. OnSendUploadedAndSubscribedKeysToServer(packet);
  210. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  211. case STSC_ADD_UPLOADED_KEY:
  212. OnSendUploadedKeyToServers(packet);
  213. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  214. case STSC_ADD_SUBSCRIBED_KEY:
  215. OnSendSubscribedKeyToServers(packet);
  216. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  217. case STSC_REMOVE_UPLOADED_KEY:
  218. OnRemoveUploadedKeyFromServers(packet);
  219. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  220. case STSC_REMOVE_SUBSCRIBED_KEY:
  221. OnRemoveSubscribedKeyFromServers(packet);
  222. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  223. case STSC_DATA_CHANGED:
  224. OnServerDataChanged(packet);
  225. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  226. }
  227. }
  228. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  229. }
  230. return RR_CONTINUE_PROCESSING;
  231. }
  232. void CloudServer::OnPostRequest(Packet *packet)
  233. {
  234. RakNet::BitStream bsIn(packet->data, packet->length, false);
  235. bsIn.IgnoreBytes(sizeof(MessageID));
  236. CloudKey key;
  237. key.Serialize(false,&bsIn);
  238. uint32_t dataLengthBytes;
  239. bsIn.Read(dataLengthBytes);
  240. if (maxUploadBytesPerClient>0 && dataLengthBytes>maxUploadBytesPerClient)
  241. return; // Exceeded max upload bytes
  242. bsIn.AlignReadToByteBoundary();
  243. for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
  244. {
  245. if (queryFilters[filterIndex]->OnPostRequest(packet->guid, packet->systemAddress, key, dataLengthBytes, (const char*) bsIn.GetData()+BITS_TO_BYTES(bsIn.GetReadOffset()))==false)
  246. return;
  247. }
  248. unsigned char *data;
  249. if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
  250. {
  251. data = (unsigned char *) rakMalloc_Ex(dataLengthBytes,_FILE_AND_LINE_);
  252. if (data==0)
  253. {
  254. notifyOutOfMemory(_FILE_AND_LINE_);
  255. return;
  256. }
  257. bsIn.ReadAlignedBytes(data,dataLengthBytes);
  258. }
  259. else
  260. data=0;
  261. // Add this system to remoteSystems if they aren't there already
  262. DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid);
  263. RemoteCloudClient *remoteCloudClient;
  264. if (remoteSystemsHashIndex.IsInvalid())
  265. {
  266. remoteCloudClient = RakNet::OP_NEW<RemoteCloudClient>(_FILE_AND_LINE_);
  267. remoteCloudClient->uploadedKeys.Insert(key,key,true,_FILE_AND_LINE_);
  268. remoteCloudClient->uploadedBytes=0;
  269. remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_);
  270. }
  271. else
  272. {
  273. remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex);
  274. bool objectExists;
  275. // Add to RemoteCloudClient::uploadedKeys if it isn't there already
  276. unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists);
  277. if (objectExists==false)
  278. {
  279. remoteCloudClient->uploadedKeys.InsertAtIndex(key, uploadedKeysIndex, _FILE_AND_LINE_);
  280. }
  281. }
  282. bool cloudDataAlreadyUploaded;
  283. unsigned int dataRepositoryIndex;
  284. bool dataRepositoryExists;
  285. CloudDataList* cloudDataList = GetOrAllocateCloudDataList(key, &dataRepositoryExists, dataRepositoryIndex);
  286. if (dataRepositoryExists==false)
  287. {
  288. cloudDataList->uploaderCount=1;
  289. cloudDataAlreadyUploaded=false;
  290. }
  291. else
  292. {
  293. cloudDataAlreadyUploaded=cloudDataList->uploaderCount>0;
  294. cloudDataList->uploaderCount++;
  295. }
  296. CloudData *cloudData;
  297. bool keyDataListExists;
  298. unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists);
  299. if (keyDataListExists==false)
  300. {
  301. if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes+dataLengthBytes>maxUploadBytesPerClient)
  302. {
  303. // Undo prior insertion of cloudDataList into cloudData if needed
  304. if (keyDataListExists==false)
  305. {
  306. RakNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_);
  307. dataRepository.RemoveAtIndex(dataRepositoryIndex);
  308. }
  309. if (remoteCloudClient->IsUnused())
  310. {
  311. RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
  312. remoteSystems.Remove(packet->guid, _FILE_AND_LINE_);
  313. }
  314. if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
  315. rakFree_Ex(data, _FILE_AND_LINE_);
  316. return;
  317. }
  318. cloudData = RakNet::OP_NEW<CloudData>(_FILE_AND_LINE_);
  319. cloudData->dataLengthBytes=dataLengthBytes;
  320. cloudData->isUploaded=true;
  321. if (forceAddress!=UNASSIGNED_SYSTEM_ADDRESS)
  322. {
  323. cloudData->serverSystemAddress=forceAddress;
  324. cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort());
  325. }
  326. else
  327. {
  328. cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress);
  329. if (cloudData->serverSystemAddress.IsLoopback())
  330. cloudData->serverSystemAddress.FromString(rakPeerInterface->GetLocalIP(0));
  331. }
  332. if (cloudData->serverSystemAddress.GetPort()==0)
  333. {
  334. // Fix localhost port
  335. cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort());
  336. }
  337. cloudData->clientSystemAddress=packet->systemAddress;
  338. cloudData->serverGUID=rakPeerInterface->GetMyGUID();
  339. cloudData->clientGUID=packet->guid;
  340. cloudDataList->keyData.Insert(packet->guid,cloudData,true,_FILE_AND_LINE_);
  341. }
  342. else
  343. {
  344. cloudData = cloudDataList->keyData[keyDataListIndex];
  345. if (cloudDataAlreadyUploaded==false)
  346. {
  347. if (forceAddress!=UNASSIGNED_SYSTEM_ADDRESS)
  348. {
  349. cloudData->serverSystemAddress=forceAddress;
  350. cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort());
  351. }
  352. else
  353. {
  354. cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress);
  355. }
  356. if (cloudData->serverSystemAddress.GetPort()==0)
  357. {
  358. // Fix localhost port
  359. cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort());
  360. }
  361. cloudData->clientSystemAddress=packet->systemAddress;
  362. }
  363. if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes-cloudData->dataLengthBytes+dataLengthBytes>maxUploadBytesPerClient)
  364. {
  365. // Undo prior insertion of cloudDataList into cloudData if needed
  366. if (dataRepositoryExists==false)
  367. {
  368. RakNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_);
  369. dataRepository.RemoveAtIndex(dataRepositoryIndex);
  370. }
  371. return;
  372. }
  373. else
  374. {
  375. // Subtract already used bytes we are overwriting
  376. remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes;
  377. }
  378. if (cloudData->allocatedData!=0)
  379. rakFree_Ex(cloudData->allocatedData,_FILE_AND_LINE_);
  380. }
  381. if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
  382. {
  383. // Data already allocated
  384. cloudData->allocatedData=data;
  385. cloudData->dataPtr=data;
  386. }
  387. else
  388. {
  389. // Read to stack
  390. if (dataLengthBytes>0)
  391. bsIn.ReadAlignedBytes(cloudData->stackData,dataLengthBytes);
  392. cloudData->allocatedData=0;
  393. cloudData->dataPtr=cloudData->stackData;
  394. }
  395. // Update how many bytes were written for this data
  396. cloudData->dataLengthBytes=dataLengthBytes;
  397. remoteCloudClient->uploadedBytes+=dataLengthBytes;
  398. if (cloudDataAlreadyUploaded==false)
  399. {
  400. // New data field
  401. SendUploadedKeyToServers(cloudDataList->key);
  402. }
  403. // Existing data field changed
  404. NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, true );
  405. NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, true );
  406. // Send update to all remote servers that subscribed to this key
  407. NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, true);
  408. // I could have also subscribed to a key not yet updated locally
  409. // This means I have to go through every RemoteClient that wants this key
  410. // Seems like cloudData->specificSubscribers is unnecessary in that case
  411. }
  412. void CloudServer::OnReleaseRequest(Packet *packet)
  413. {
  414. RakNet::BitStream bsIn(packet->data, packet->length, false);
  415. bsIn.IgnoreBytes(sizeof(MessageID));
  416. uint16_t keyCount;
  417. bsIn.Read(keyCount);
  418. if (keyCount==0)
  419. return;
  420. DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid);
  421. if (remoteSystemIndex.IsInvalid()==true)
  422. return;
  423. RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
  424. CloudKey key;
  425. // Read all in a list first so I can run filter on it
  426. DataStructures::List<CloudKey> cloudKeys;
  427. for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++)
  428. {
  429. key.Serialize(false, &bsIn);
  430. cloudKeys.Push(key, _FILE_AND_LINE_);
  431. }
  432. for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
  433. {
  434. if (queryFilters[filterIndex]->OnReleaseRequest(packet->guid, packet->systemAddress, cloudKeys)==false)
  435. return;
  436. }
  437. for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++)
  438. {
  439. // Serialize in list above so I can run the filter on it
  440. // key.Serialize(false, &bsIn);
  441. key=cloudKeys[keyCountIndex];
  442. // Remove remote systems uploaded keys
  443. bool objectExists;
  444. unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists);
  445. if (objectExists)
  446. {
  447. bool dataRepositoryExists;
  448. unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists);
  449. CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex];
  450. RakAssert(cloudDataList);
  451. CloudData *cloudData;
  452. bool keyDataListExists;
  453. unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists);
  454. cloudData = cloudDataList->keyData[keyDataListIndex];
  455. remoteCloudClient->uploadedKeys.RemoveAtIndex(uploadedKeysIndex);
  456. remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes;
  457. cloudDataList->uploaderCount--;
  458. // Broadcast destruction of this key to subscribers
  459. NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false );
  460. NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false );
  461. NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false );
  462. cloudData->Clear();
  463. if (cloudData->IsUnused())
  464. {
  465. RakNet::OP_DELETE(cloudData, _FILE_AND_LINE_);
  466. cloudDataList->keyData.RemoveAtIndex(keyDataListIndex);
  467. if (cloudDataList->IsNotUploaded())
  468. {
  469. // Tell other servers that this key is no longer uploaded, so they do not request it from us
  470. RemoveUploadedKeyFromServers(cloudDataList->key);
  471. }
  472. if (cloudDataList->IsUnused())
  473. {
  474. RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
  475. dataRepository.RemoveAtIndex(dataRepositoryIndex);
  476. }
  477. }
  478. if (remoteCloudClient->IsUnused())
  479. {
  480. RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
  481. remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
  482. break;
  483. }
  484. }
  485. }
  486. }
  487. void CloudServer::OnGetRequest(Packet *packet)
  488. {
  489. RakNet::BitStream bsIn(packet->data, packet->length, false);
  490. bsIn.IgnoreBytes(sizeof(MessageID));
  491. uint16_t specificSystemsCount;
  492. CloudKey cloudKey;
  493. // Create a new GetRequest
  494. GetRequest *getRequest;
  495. getRequest = RakNet::OP_NEW<GetRequest>(_FILE_AND_LINE_);
  496. getRequest->cloudQueryWithAddresses.cloudQuery.Serialize(false, &bsIn);
  497. getRequest->requestingClient=packet->guid;
  498. RakNetGUID addressOrGuid;
  499. bsIn.Read(specificSystemsCount);
  500. for (uint16_t i=0; i < specificSystemsCount; i++)
  501. {
  502. bsIn.Read(addressOrGuid);
  503. getRequest->cloudQueryWithAddresses.specificSystems.Push(addressOrGuid, _FILE_AND_LINE_);
  504. }
  505. if (getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size()==0)
  506. {
  507. RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
  508. return;
  509. }
  510. for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
  511. {
  512. if (queryFilters[filterIndex]->OnGetRequest(packet->guid, packet->systemAddress, getRequest->cloudQueryWithAddresses.cloudQuery, getRequest->cloudQueryWithAddresses.specificSystems )==false)
  513. return;
  514. }
  515. getRequest->requestStartTime=RakNet::GetTime();
  516. getRequest->requestId=nextGetRequestId++;
  517. // Send request to servers that have this data
  518. DataStructures::List<RemoteServer*> remoteServersWithData;
  519. GetServersWithUploadedKeys(getRequest->cloudQueryWithAddresses.cloudQuery.keys, remoteServersWithData);
  520. if (remoteServersWithData.Size()==0)
  521. {
  522. ProcessAndTransmitGetRequest(getRequest);
  523. }
  524. else
  525. {
  526. RakNet::BitStream bsOut;
  527. bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
  528. bsOut.Write((MessageID)STSC_PROCESS_GET_REQUEST);
  529. getRequest->cloudQueryWithAddresses.Serialize(true, &bsOut);
  530. bsOut.Write(getRequest->requestId);
  531. for (unsigned int remoteServerIndex=0; remoteServerIndex < remoteServersWithData.Size(); remoteServerIndex++)
  532. {
  533. BufferedGetResponseFromServer* bufferedGetResponseFromServer = RakNet::OP_NEW<BufferedGetResponseFromServer>(_FILE_AND_LINE_);
  534. bufferedGetResponseFromServer->serverAddress=remoteServersWithData[remoteServerIndex]->serverAddress;
  535. bufferedGetResponseFromServer->gotResult=false;
  536. getRequest->remoteServerResponses.Insert(remoteServersWithData[remoteServerIndex]->serverAddress, bufferedGetResponseFromServer, true, _FILE_AND_LINE_);
  537. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServersWithData[remoteServerIndex]->serverAddress, false);
  538. }
  539. // Record that this system made this request
  540. getRequests.Insert(getRequest->requestId, getRequest, true, _FILE_AND_LINE_);
  541. }
  542. if (getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults)
  543. {
  544. // Add to key subscription list for the client, which contains a keyId / specificUploaderList pair
  545. DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid);
  546. RemoteCloudClient *remoteCloudClient;
  547. if (remoteSystemsHashIndex.IsInvalid())
  548. {
  549. remoteCloudClient = RakNet::OP_NEW<RemoteCloudClient>(_FILE_AND_LINE_);
  550. remoteCloudClient->uploadedBytes=0;
  551. remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_);
  552. }
  553. else
  554. {
  555. remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex);
  556. }
  557. unsigned int keyIndex;
  558. for (keyIndex=0; keyIndex < getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size(); keyIndex++)
  559. {
  560. cloudKey = getRequest->cloudQueryWithAddresses.cloudQuery.keys[keyIndex];
  561. unsigned int keySubscriberIndex;
  562. bool hasKeySubscriber;
  563. keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber);
  564. KeySubscriberID* keySubscriberId;
  565. if (hasKeySubscriber)
  566. {
  567. DataStructures::List<RakNetGUID> specificSystems;
  568. UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems);
  569. }
  570. keySubscriberId = RakNet::OP_NEW<KeySubscriberID>(_FILE_AND_LINE_);
  571. keySubscriberId->key=cloudKey;
  572. unsigned int specificSystemIndex;
  573. for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
  574. {
  575. keySubscriberId->specificSystemsSubscribedTo.Insert(getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], true, _FILE_AND_LINE_);
  576. }
  577. remoteCloudClient->subscribedKeys.InsertAtIndex(keySubscriberId, keySubscriberIndex, _FILE_AND_LINE_);
  578. // Add CloudData in a similar way
  579. unsigned int dataRepositoryIndex;
  580. bool dataRepositoryExists;
  581. CloudDataList* cloudDataList = GetOrAllocateCloudDataList(cloudKey, &dataRepositoryExists, dataRepositoryIndex);
  582. // If this is the first local client to subscribe to this key, call SendSubscribedKeyToServers
  583. if (cloudDataList->subscriberCount==0)
  584. SendSubscribedKeyToServers(cloudKey);
  585. // If the subscription is specific, may have to also allocate CloudData
  586. if (getRequest->cloudQueryWithAddresses.specificSystems.Size())
  587. {
  588. CloudData *cloudData;
  589. bool keyDataListExists;
  590. unsigned int specificSystemIndex;
  591. for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
  592. {
  593. RakNetGUID specificSystem = getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex];
  594. unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSystem, &keyDataListExists);
  595. if (keyDataListExists==false)
  596. {
  597. cloudData = RakNet::OP_NEW<CloudData>(_FILE_AND_LINE_);
  598. cloudData->dataLengthBytes=0;
  599. cloudData->allocatedData=0;
  600. cloudData->isUploaded=false;
  601. cloudData->dataPtr=0;
  602. cloudData->serverSystemAddress=UNASSIGNED_SYSTEM_ADDRESS;
  603. cloudData->clientSystemAddress=UNASSIGNED_SYSTEM_ADDRESS;
  604. cloudData->serverGUID=rakPeerInterface->GetMyGUID();
  605. cloudData->clientGUID=specificSystem;
  606. cloudDataList->keyData.Insert(specificSystem,cloudData,true,_FILE_AND_LINE_);
  607. }
  608. else
  609. {
  610. cloudData = cloudDataList->keyData[keyDataListIndex];
  611. }
  612. ++cloudDataList->subscriberCount;
  613. cloudData->specificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_);
  614. }
  615. }
  616. else
  617. {
  618. ++cloudDataList->subscriberCount;
  619. cloudDataList->nonSpecificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_);
  620. // Remove packet->guid from CloudData::specificSubscribers among all instances of cloudDataList->keyData
  621. unsigned int subscribedKeysIndex;
  622. bool subscribedKeysIndexExists;
  623. subscribedKeysIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudDataList->key, &subscribedKeysIndexExists);
  624. if (subscribedKeysIndexExists)
  625. {
  626. KeySubscriberID* keySubscriberId;
  627. keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex];
  628. unsigned int specificSystemIndex;
  629. for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++)
  630. {
  631. bool keyDataExists;
  632. unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists);
  633. if (keyDataExists)
  634. {
  635. CloudData *keyData = cloudDataList->keyData[keyDataIndex];
  636. keyData->specificSubscribers.Remove(packet->guid);
  637. --cloudDataList->subscriberCount;
  638. }
  639. }
  640. }
  641. }
  642. }
  643. if (remoteCloudClient->subscribedKeys.Size()==0)
  644. {
  645. // Didn't do anything
  646. remoteSystems.Remove(packet->guid, _FILE_AND_LINE_);
  647. RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
  648. }
  649. }
  650. if (remoteServersWithData.Size()==0)
  651. RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
  652. }
  653. void CloudServer::OnUnsubscribeRequest(Packet *packet)
  654. {
  655. RakNet::BitStream bsIn(packet->data, packet->length, false);
  656. bsIn.IgnoreBytes(sizeof(MessageID));
  657. DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid);
  658. if (remoteSystemIndex.IsInvalid()==true)
  659. return;
  660. RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
  661. uint16_t keyCount, specificSystemCount;
  662. DataStructures::List<CloudKey> cloudKeys;
  663. DataStructures::List<RakNetGUID> specificSystems;
  664. uint16_t index;
  665. CloudKey cloudKey;
  666. bsIn.Read(keyCount);
  667. for (index=0; index < keyCount; index++)
  668. {
  669. cloudKey.Serialize(false, &bsIn);
  670. cloudKeys.Push(cloudKey, _FILE_AND_LINE_);
  671. }
  672. RakNetGUID specificSystem;
  673. bsIn.Read(specificSystemCount);
  674. for (index=0; index < specificSystemCount; index++)
  675. {
  676. bsIn.Read(specificSystem);
  677. specificSystems.Push(specificSystem, _FILE_AND_LINE_);
  678. }
  679. for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
  680. {
  681. if (queryFilters[filterIndex]->OnUnsubscribeRequest(packet->guid, packet->systemAddress, cloudKeys, specificSystems )==false)
  682. return;
  683. }
  684. // CloudDataList *cloudDataList;
  685. bool dataRepositoryExists;
  686. // unsigned int dataRepositoryIndex;
  687. for (index=0; index < keyCount; index++)
  688. {
  689. CloudKey cloudKey = cloudKeys[index];
  690. // dataRepositoryIndex =
  691. dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists);
  692. if (dataRepositoryExists==false)
  693. continue;
  694. // cloudDataList = dataRepository[dataRepositoryIndex];
  695. unsigned int keySubscriberIndex;
  696. bool hasKeySubscriber;
  697. keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber);
  698. if (hasKeySubscriber==false)
  699. continue;
  700. UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems);
  701. }
  702. if (remoteCloudClient->IsUnused())
  703. {
  704. RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
  705. remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
  706. }
  707. }
  708. void CloudServer::OnServerToServerGetRequest(Packet *packet)
  709. {
  710. // unsigned int remoteServerIndex;
  711. bool objectExists;
  712. //remoteServerIndex =
  713. remoteServers.GetIndexFromKey(packet->guid, &objectExists);
  714. if (objectExists==false)
  715. return;
  716. RakNet::BitStream bsIn(packet->data, packet->length, false);
  717. bsIn.IgnoreBytes(sizeof(MessageID)*2);
  718. CloudQueryWithAddresses cloudQueryWithAddresses;
  719. uint32_t requestId;
  720. cloudQueryWithAddresses.Serialize(false, &bsIn);
  721. bsIn.Read(requestId);
  722. DataStructures::List<CloudData*> cloudDataResultList;
  723. DataStructures::List<CloudKey> cloudKeyResultList;
  724. ProcessCloudQueryWithAddresses(cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList);
  725. RakNet::BitStream bsOut;
  726. bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
  727. bsOut.Write((MessageID)STSC_PROCESS_GET_RESPONSE);
  728. bsOut.Write(requestId);
  729. WriteCloudQueryRowFromResultList(cloudDataResultList, cloudKeyResultList, &bsOut);
  730. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, packet->guid, false);
  731. }
  732. void CloudServer::OnServerToServerGetResponse(Packet *packet)
  733. {
  734. unsigned int remoteServerIndex;
  735. bool objectExists;
  736. remoteServerIndex = remoteServers.GetIndexFromKey(packet->guid, &objectExists);
  737. if (objectExists==false)
  738. return;
  739. RemoteServer *remoteServer = remoteServers[remoteServerIndex];
  740. if (remoteServer==0)
  741. return;
  742. RakNet::BitStream bsIn(packet->data, packet->length, false);
  743. bsIn.IgnoreBytes(sizeof(MessageID)*2);
  744. uint32_t requestId;
  745. bsIn.Read(requestId);
  746. // Lookup request id
  747. bool hasGetRequest;
  748. unsigned int getRequestIndex;
  749. getRequestIndex = getRequests.GetIndexFromKey(requestId, &hasGetRequest);
  750. if (hasGetRequest==false)
  751. return;
  752. GetRequest *getRequest = getRequests[getRequestIndex];
  753. bool hasRemoteServer;
  754. unsigned int remoteServerResponsesIndex;
  755. remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(packet->guid, &hasRemoteServer);
  756. if (hasRemoteServer==false)
  757. return;
  758. BufferedGetResponseFromServer *bufferedGetResponseFromServer;
  759. bufferedGetResponseFromServer = getRequest->remoteServerResponses[remoteServerResponsesIndex];
  760. if (bufferedGetResponseFromServer->gotResult==true)
  761. return;
  762. bufferedGetResponseFromServer->gotResult=true;
  763. uint32_t numRows;
  764. bufferedGetResponseFromServer->queryResult.SerializeNumRows(false, numRows, &bsIn);
  765. bufferedGetResponseFromServer->queryResult.SerializeCloudQueryRows(false, numRows, &bsIn, this);
  766. // If all results returned, then also process locally, and return to user
  767. if (getRequest->AllRemoteServersHaveResponded())
  768. {
  769. ProcessAndTransmitGetRequest(getRequest);
  770. getRequest->Clear(this);
  771. RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
  772. getRequests.RemoveAtIndex(getRequestIndex);
  773. }
  774. }
  775. void CloudServer::OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason )
  776. {
  777. (void) lostConnectionReason;
  778. (void) systemAddress;
  779. unsigned int remoteServerIndex;
  780. bool objectExists;
  781. remoteServerIndex = remoteServers.GetIndexFromKey(rakNetGUID, &objectExists);
  782. if (objectExists)
  783. {
  784. // Update remoteServerResponses by removing this server and sending the response if it is now complete
  785. unsigned int getRequestIndex=0;
  786. while (getRequestIndex < getRequests.Size())
  787. {
  788. GetRequest *getRequest = getRequests[getRequestIndex];
  789. bool waitingForThisServer;
  790. unsigned int remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(rakNetGUID, &waitingForThisServer);
  791. if (waitingForThisServer)
  792. {
  793. getRequest->remoteServerResponses[remoteServerResponsesIndex]->Clear(this);
  794. RakNet::OP_DELETE(getRequest->remoteServerResponses[remoteServerResponsesIndex], _FILE_AND_LINE_);
  795. getRequest->remoteServerResponses.RemoveAtIndex(remoteServerResponsesIndex);
  796. if (getRequest->AllRemoteServersHaveResponded())
  797. {
  798. ProcessAndTransmitGetRequest(getRequest);
  799. getRequest->Clear(this);
  800. RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
  801. getRequests.RemoveAtIndex(getRequestIndex);
  802. }
  803. else
  804. getRequestIndex++;
  805. }
  806. else
  807. getRequestIndex++;
  808. }
  809. RakNet::OP_DELETE(remoteServers[remoteServerIndex],_FILE_AND_LINE_);
  810. remoteServers.RemoveAtIndex(remoteServerIndex);
  811. }
  812. DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(rakNetGUID);
  813. if (remoteSystemIndex.IsInvalid()==false)
  814. {
  815. RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
  816. unsigned int uploadedKeysIndex;
  817. for (uploadedKeysIndex=0; uploadedKeysIndex < remoteCloudClient->uploadedKeys.Size(); uploadedKeysIndex++)
  818. {
  819. // Delete keys this system has uploaded
  820. bool keyDataRepositoryExists;
  821. unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->uploadedKeys[uploadedKeysIndex], &keyDataRepositoryExists);
  822. if (keyDataRepositoryExists)
  823. {
  824. CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex];
  825. bool keyDataExists;
  826. unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(rakNetGUID, &keyDataExists);
  827. if (keyDataExists)
  828. {
  829. CloudData *cloudData = cloudDataList->keyData[keyDataIndex];
  830. cloudDataList->uploaderCount--;
  831. NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false );
  832. NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false );
  833. NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false );
  834. cloudData->Clear();
  835. if (cloudData->IsUnused())
  836. {
  837. RakNet::OP_DELETE(cloudData,_FILE_AND_LINE_);
  838. cloudDataList->keyData.RemoveAtIndex(keyDataIndex);
  839. if (cloudDataList->IsNotUploaded())
  840. {
  841. // Tell other servers that this key is no longer uploaded, so they do not request it from us
  842. RemoveUploadedKeyFromServers(cloudDataList->key);
  843. }
  844. if (cloudDataList->IsUnused())
  845. {
  846. // Tell other servers that this key is no longer uploaded, so they do not request it from us
  847. RemoveUploadedKeyFromServers(cloudDataList->key);
  848. RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
  849. dataRepository.RemoveAtIndex(dataRepositoryIndex);
  850. }
  851. }
  852. }
  853. }
  854. }
  855. unsigned int subscribedKeysIndex;
  856. for (subscribedKeysIndex=0; subscribedKeysIndex < remoteCloudClient->subscribedKeys.Size(); subscribedKeysIndex++)
  857. {
  858. KeySubscriberID* keySubscriberId;
  859. keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex];
  860. bool keyDataRepositoryExists;
  861. unsigned int keyDataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->subscribedKeys[subscribedKeysIndex]->key, &keyDataRepositoryExists);
  862. if (keyDataRepositoryExists)
  863. {
  864. CloudDataList* cloudDataList = dataRepository[keyDataRepositoryIndex];
  865. if (keySubscriberId->specificSystemsSubscribedTo.Size()==0)
  866. {
  867. cloudDataList->nonSpecificSubscribers.Remove(rakNetGUID);
  868. --cloudDataList->subscriberCount;
  869. }
  870. else
  871. {
  872. unsigned int specificSystemIndex;
  873. for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++)
  874. {
  875. bool keyDataExists;
  876. unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists);
  877. if (keyDataExists)
  878. {
  879. CloudData *keyData = cloudDataList->keyData[keyDataIndex];
  880. keyData->specificSubscribers.Remove(rakNetGUID);
  881. --cloudDataList->subscriberCount;
  882. }
  883. }
  884. }
  885. }
  886. RakNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_);
  887. }
  888. // Delete and remove from remoteSystems
  889. RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
  890. remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
  891. }
  892. }
  893. void CloudServer::OnRakPeerShutdown(void)
  894. {
  895. Clear();
  896. }
  897. void CloudServer::Clear(void)
  898. {
  899. unsigned int i,j;
  900. for (i=0; i < dataRepository.Size(); i++)
  901. {
  902. CloudDataList *cloudDataList = dataRepository[i];
  903. for (j=0; j < cloudDataList->keyData.Size(); j++)
  904. {
  905. cloudDataList->keyData[j]->Clear();
  906. RakNet::OP_DELETE(cloudDataList->keyData[j], _FILE_AND_LINE_);
  907. }
  908. RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
  909. }
  910. dataRepository.Clear(false, _FILE_AND_LINE_);
  911. for (i=0; i < remoteServers.Size(); i++)
  912. {
  913. RakNet::OP_DELETE(remoteServers[i], _FILE_AND_LINE_);
  914. }
  915. remoteServers.Clear(false, _FILE_AND_LINE_);
  916. for (i=0; i < getRequests.Size(); i++)
  917. {
  918. GetRequest *getRequest = getRequests[i];
  919. getRequest->Clear(this);
  920. RakNet::OP_DELETE(getRequests[i], _FILE_AND_LINE_);
  921. }
  922. getRequests.Clear(false, _FILE_AND_LINE_);
  923. DataStructures::List<RakNetGUID> keyList;
  924. DataStructures::List<RemoteCloudClient*> itemList;
  925. remoteSystems.GetAsList(itemList, keyList, _FILE_AND_LINE_);
  926. for (i=0; i < itemList.Size(); i++)
  927. {
  928. RemoteCloudClient* remoteCloudClient = itemList[i];
  929. for (j=0; j < remoteCloudClient->subscribedKeys.Size(); j++)
  930. {
  931. RakNet::OP_DELETE(remoteCloudClient->subscribedKeys[j], _FILE_AND_LINE_);
  932. }
  933. RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
  934. }
  935. remoteSystems.Clear(_FILE_AND_LINE_);
  936. }
  937. void CloudServer::WriteCloudQueryRowFromResultList(DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut)
  938. {
  939. bsOut->WriteCasted<uint32_t>(cloudKeyResultList.Size());
  940. unsigned int i;
  941. for (i=0; i < cloudKeyResultList.Size(); i++)
  942. {
  943. WriteCloudQueryRowFromResultList(i, cloudDataResultList, cloudKeyResultList, bsOut);
  944. }
  945. }
  946. void CloudServer::WriteCloudQueryRowFromResultList(unsigned int i, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut)
  947. {
  948. CloudQueryRow cloudQueryRow;
  949. CloudData *cloudData = cloudDataResultList[i];
  950. cloudQueryRow.key=cloudKeyResultList[i];
  951. cloudQueryRow.data=cloudData->dataPtr;
  952. cloudQueryRow.length=cloudData->dataLengthBytes;
  953. cloudQueryRow.serverSystemAddress=cloudData->serverSystemAddress;
  954. cloudQueryRow.clientSystemAddress=cloudData->clientSystemAddress;
  955. cloudQueryRow.serverGUID=cloudData->serverGUID;
  956. cloudQueryRow.clientGUID=cloudData->clientGUID;
  957. cloudQueryRow.Serialize(true, bsOut, 0);
  958. }
  959. void CloudServer::NotifyClientSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated )
  960. {
  961. RakNet::BitStream bsOut;
  962. bsOut.Write((MessageID) ID_CLOUD_SUBSCRIPTION_NOTIFICATION);
  963. bsOut.Write(wasUpdated);
  964. CloudQueryRow row;
  965. row.key=key;
  966. row.data=cloudData->dataPtr;
  967. row.length=cloudData->dataLengthBytes;
  968. row.serverSystemAddress=cloudData->serverSystemAddress;
  969. row.clientSystemAddress=cloudData->clientSystemAddress;
  970. row.serverGUID=cloudData->serverGUID;
  971. row.clientGUID=cloudData->clientGUID;
  972. row.Serialize(true,&bsOut,0);
  973. unsigned int i;
  974. for (i=0; i < subscribers.Size(); i++)
  975. {
  976. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[i], false);
  977. }
  978. }
  979. void CloudServer::NotifyClientSubscribersOfDataChange( CloudQueryRow *row, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated )
  980. {
  981. RakNet::BitStream bsOut;
  982. bsOut.Write((MessageID) ID_CLOUD_SUBSCRIPTION_NOTIFICATION);
  983. bsOut.Write(wasUpdated);
  984. row->Serialize(true,&bsOut,0);
  985. unsigned int i;
  986. for (i=0; i < subscribers.Size(); i++)
  987. {
  988. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[i], false);
  989. }
  990. }
  991. void CloudServer::NotifyServerSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, bool wasUpdated )
  992. {
  993. // Find every server that has subscribed
  994. // Send them change notifications
  995. RakNet::BitStream bsOut;
  996. bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
  997. bsOut.Write((MessageID)STSC_DATA_CHANGED);
  998. bsOut.Write(wasUpdated);
  999. CloudQueryRow row;
  1000. row.key=key;
  1001. row.data=cloudData->dataPtr;
  1002. row.length=cloudData->dataLengthBytes;
  1003. row.serverSystemAddress=cloudData->serverSystemAddress;
  1004. row.clientSystemAddress=cloudData->clientSystemAddress;
  1005. row.serverGUID=cloudData->serverGUID;
  1006. row.clientGUID=cloudData->clientGUID;
  1007. row.Serialize(true,&bsOut,0);
  1008. unsigned int i;
  1009. for (i=0; i < remoteServers.Size(); i++)
  1010. {
  1011. if (remoteServers[i]->gotSubscribedAndUploadedKeys==false || remoteServers[i]->subscribedKeys.HasData(key))
  1012. {
  1013. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
  1014. }
  1015. }
  1016. }
  1017. void CloudServer::AddServer(RakNetGUID systemIdentifier)
  1018. {
  1019. ConnectionState cs = rakPeerInterface->GetConnectionState(systemIdentifier);
  1020. if (cs==IS_DISCONNECTED || cs==IS_NOT_CONNECTED)
  1021. return;
  1022. bool objectExists;
  1023. unsigned int index = remoteServers.GetIndexFromKey(systemIdentifier,&objectExists);
  1024. if (objectExists==false)
  1025. {
  1026. RemoteServer *remoteServer = RakNet::OP_NEW<RemoteServer>(_FILE_AND_LINE_);
  1027. remoteServer->gotSubscribedAndUploadedKeys=false;
  1028. remoteServer->serverAddress=systemIdentifier;
  1029. remoteServers.InsertAtIndex(remoteServer, index, _FILE_AND_LINE_);
  1030. SendUploadedAndSubscribedKeysToServer(systemIdentifier);
  1031. }
  1032. }
  1033. void CloudServer::RemoveServer(RakNetGUID systemAddress)
  1034. {
  1035. bool objectExists;
  1036. unsigned int index = remoteServers.GetIndexFromKey(systemAddress,&objectExists);
  1037. if (objectExists==true)
  1038. {
  1039. RakNet::OP_DELETE(remoteServers[index],_FILE_AND_LINE_);
  1040. remoteServers.RemoveAtIndex(index);
  1041. }
  1042. }
  1043. void CloudServer::GetRemoteServers(DataStructures::List<RakNetGUID> &remoteServersOut)
  1044. {
  1045. remoteServersOut.Clear(true, _FILE_AND_LINE_);
  1046. unsigned int i;
  1047. for (i=0; i < remoteServers.Size(); i++)
  1048. {
  1049. remoteServersOut.Push(remoteServers[i]->serverAddress, _FILE_AND_LINE_);
  1050. }
  1051. }
  1052. void CloudServer::ProcessAndTransmitGetRequest(GetRequest *getRequest)
  1053. {
  1054. RakNet::BitStream bsOut;
  1055. bsOut.Write((MessageID) ID_CLOUD_GET_RESPONSE);
  1056. // BufferedGetResponseFromServer getResponse;
  1057. CloudQueryResult cloudQueryResult;
  1058. cloudQueryResult.cloudQuery=getRequest->cloudQueryWithAddresses.cloudQuery;
  1059. cloudQueryResult.subscribeToResults=getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults;
  1060. cloudQueryResult.SerializeHeader(true, &bsOut);
  1061. DataStructures::List<CloudData*> cloudDataResultList;
  1062. DataStructures::List<CloudKey> cloudKeyResultList;
  1063. ProcessCloudQueryWithAddresses(getRequest->cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList);
  1064. bool unlimitedRows=getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn==0;
  1065. uint32_t localNumRows = (uint32_t) cloudDataResultList.Size();
  1066. if (unlimitedRows==false &&
  1067. localNumRows > getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex &&
  1068. localNumRows - getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex > getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn )
  1069. localNumRows=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex + getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn;
  1070. BitSize_t bitStreamOffset = bsOut.GetWriteOffset();
  1071. uint32_t localRowsToWrite;
  1072. unsigned int skipRows;
  1073. if (localNumRows>getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex)
  1074. {
  1075. localRowsToWrite=localNumRows-getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex;
  1076. skipRows=0;
  1077. }
  1078. else
  1079. {
  1080. localRowsToWrite=0;
  1081. skipRows=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex-localNumRows;
  1082. }
  1083. cloudQueryResult.SerializeNumRows(true, localRowsToWrite, &bsOut);
  1084. for (unsigned int i=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex; i < localNumRows; i++)
  1085. {
  1086. WriteCloudQueryRowFromResultList(i, cloudDataResultList, cloudKeyResultList, &bsOut);
  1087. }
  1088. // Append remote systems for remaining rows
  1089. if (unlimitedRows==true || getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn>localRowsToWrite)
  1090. {
  1091. uint32_t remainingRows=0;
  1092. uint32_t additionalRowsWritten=0;
  1093. if (unlimitedRows==false)
  1094. remainingRows=getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn-localRowsToWrite;
  1095. unsigned int remoteServerResponseIndex;
  1096. for (remoteServerResponseIndex=0; remoteServerResponseIndex < getRequest->remoteServerResponses.Size(); remoteServerResponseIndex++)
  1097. {
  1098. BufferedGetResponseFromServer *bufferedGetResponseFromServer = getRequest->remoteServerResponses[remoteServerResponseIndex];
  1099. unsigned int cloudQueryRowIndex;
  1100. for (cloudQueryRowIndex=0; cloudQueryRowIndex < bufferedGetResponseFromServer->queryResult.rowsReturned.Size(); cloudQueryRowIndex++)
  1101. {
  1102. if (skipRows>0)
  1103. {
  1104. --skipRows;
  1105. continue;
  1106. }
  1107. bufferedGetResponseFromServer->queryResult.rowsReturned[cloudQueryRowIndex]->Serialize(true, &bsOut, this);
  1108. ++additionalRowsWritten;
  1109. if (unlimitedRows==false && --remainingRows==0)
  1110. break;
  1111. }
  1112. if (unlimitedRows==false && remainingRows==0)
  1113. break;
  1114. }
  1115. if (additionalRowsWritten>0)
  1116. {
  1117. BitSize_t curOffset = bsOut.GetWriteOffset();
  1118. bsOut.SetWriteOffset(bitStreamOffset);
  1119. localRowsToWrite+=additionalRowsWritten;
  1120. cloudQueryResult.SerializeNumRows(true, localRowsToWrite, &bsOut);
  1121. bsOut.SetWriteOffset(curOffset);
  1122. }
  1123. }
  1124. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, getRequest->requestingClient, false);
  1125. }
  1126. void CloudServer::ProcessCloudQueryWithAddresses( CloudServer::CloudQueryWithAddresses &cloudQueryWithAddresses, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList )
  1127. {
  1128. CloudQueryResult cloudQueryResult;
  1129. CloudQueryRow cloudQueryRow;
  1130. unsigned int queryIndex;
  1131. bool dataRepositoryExists;
  1132. CloudDataList* cloudDataList;
  1133. unsigned int keyDataIndex;
  1134. // If specificSystems list empty, applies to all systems
  1135. // For each of keys in cloudQueryWithAddresses, return that data, limited by maxRowsToReturn
  1136. for (queryIndex=0; queryIndex < cloudQueryWithAddresses.cloudQuery.keys.Size(); queryIndex++)
  1137. {
  1138. const CloudKey &key = cloudQueryWithAddresses.cloudQuery.keys[queryIndex];
  1139. unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists);
  1140. if (dataRepositoryExists)
  1141. {
  1142. cloudDataList=dataRepository[dataRepositoryIndex];
  1143. if (cloudDataList->uploaderCount>0)
  1144. {
  1145. // Return all keyData that was uploaded by specificSystems, or all if not specified
  1146. if (cloudQueryWithAddresses.specificSystems.Size()>0)
  1147. {
  1148. // Return data for matching systems
  1149. unsigned int specificSystemIndex;
  1150. for (specificSystemIndex=0; specificSystemIndex < cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
  1151. {
  1152. bool uploaderExists;
  1153. keyDataIndex = cloudDataList->keyData.GetIndexFromKey(cloudQueryWithAddresses.specificSystems[specificSystemIndex], &uploaderExists);
  1154. if (uploaderExists)
  1155. {
  1156. cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_);
  1157. cloudKeyResultList.Push(key, _FILE_AND_LINE_);
  1158. }
  1159. }
  1160. }
  1161. else
  1162. {
  1163. // Return data for all systems
  1164. for (keyDataIndex=0; keyDataIndex < cloudDataList->keyData.Size(); keyDataIndex++)
  1165. {
  1166. cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_);
  1167. cloudKeyResultList.Push(key, _FILE_AND_LINE_);
  1168. }
  1169. }
  1170. }
  1171. }
  1172. }
  1173. }
  1174. void CloudServer::SendUploadedAndSubscribedKeysToServer( RakNetGUID systemAddress )
  1175. {
  1176. RakNet::BitStream bsOut;
  1177. bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
  1178. bsOut.Write((MessageID)STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS);
  1179. bsOut.WriteCasted<uint16_t>(dataRepository.Size());
  1180. for (unsigned int i=0; i < dataRepository.Size(); i++)
  1181. dataRepository[i]->key.Serialize(true, &bsOut);
  1182. BitSize_t startOffset, endOffset;
  1183. uint16_t subscribedKeyCount=0;
  1184. startOffset=bsOut.GetWriteOffset();
  1185. bsOut.WriteCasted<uint16_t>(subscribedKeyCount);
  1186. for (unsigned int i=0; i < dataRepository.Size(); i++)
  1187. {
  1188. if (dataRepository[i]->subscriberCount>0)
  1189. {
  1190. dataRepository[i]->key.Serialize(true, &bsOut);
  1191. subscribedKeyCount++;
  1192. }
  1193. }
  1194. endOffset=bsOut.GetWriteOffset();
  1195. bsOut.SetWriteOffset(startOffset);
  1196. bsOut.WriteCasted<uint16_t>(subscribedKeyCount);
  1197. bsOut.SetWriteOffset(endOffset);
  1198. if (dataRepository.Size()>0 || subscribedKeyCount>0)
  1199. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, systemAddress, false);
  1200. }
  1201. void CloudServer::SendUploadedKeyToServers( CloudKey &cloudKey )
  1202. {
  1203. RakNet::BitStream bsOut;
  1204. bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
  1205. bsOut.Write((MessageID)STSC_ADD_UPLOADED_KEY);
  1206. cloudKey.Serialize(true, &bsOut);
  1207. for (unsigned int i=0; i < remoteServers.Size(); i++)
  1208. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
  1209. }
  1210. void CloudServer::SendSubscribedKeyToServers( CloudKey &cloudKey )
  1211. {
  1212. RakNet::BitStream bsOut;
  1213. bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
  1214. bsOut.Write((MessageID)STSC_ADD_SUBSCRIBED_KEY);
  1215. cloudKey.Serialize(true, &bsOut);
  1216. for (unsigned int i=0; i < remoteServers.Size(); i++)
  1217. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
  1218. }
  1219. void CloudServer::RemoveUploadedKeyFromServers( CloudKey &cloudKey )
  1220. {
  1221. RakNet::BitStream bsOut;
  1222. bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
  1223. bsOut.Write((MessageID)STSC_REMOVE_UPLOADED_KEY);
  1224. cloudKey.Serialize(true, &bsOut);
  1225. for (unsigned int i=0; i < remoteServers.Size(); i++)
  1226. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
  1227. }
  1228. void CloudServer::RemoveSubscribedKeyFromServers( CloudKey &cloudKey )
  1229. {
  1230. RakNet::BitStream bsOut;
  1231. bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
  1232. bsOut.Write((MessageID)STSC_REMOVE_SUBSCRIBED_KEY);
  1233. cloudKey.Serialize(true, &bsOut);
  1234. for (unsigned int i=0; i < remoteServers.Size(); i++)
  1235. SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
  1236. }
  1237. void CloudServer::OnSendUploadedAndSubscribedKeysToServer( Packet *packet )
  1238. {
  1239. RakNet::BitStream bsIn(packet->data, packet->length, false);
  1240. bsIn.IgnoreBytes(sizeof(MessageID)*2);
  1241. bool objectExists;
  1242. unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
  1243. if (objectExists==false)
  1244. return;
  1245. RemoteServer *remoteServer = remoteServers[index];
  1246. remoteServer->gotSubscribedAndUploadedKeys=true;
  1247. // unsigned int insertionIndex;
  1248. bool alreadyHasKey;
  1249. uint16_t numUploadedKeys, numSubscribedKeys;
  1250. bsIn.Read(numUploadedKeys);
  1251. for (uint16_t i=0; i < numUploadedKeys; i++)
  1252. {
  1253. CloudKey cloudKey;
  1254. cloudKey.Serialize(false, &bsIn);
  1255. // insertionIndex =
  1256. remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
  1257. if (alreadyHasKey==false)
  1258. remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
  1259. }
  1260. bsIn.Read(numSubscribedKeys);
  1261. for (uint16_t i=0; i < numSubscribedKeys; i++)
  1262. {
  1263. CloudKey cloudKey;
  1264. cloudKey.Serialize(false, &bsIn);
  1265. //insertionIndex =
  1266. remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
  1267. if (alreadyHasKey==false)
  1268. remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
  1269. }
  1270. // Potential todo - join servers
  1271. // For each uploaded key that we subscribe to, query it
  1272. // For each subscribed key that we have, send it
  1273. }
  1274. void CloudServer::OnSendUploadedKeyToServers( Packet *packet )
  1275. {
  1276. RakNet::BitStream bsIn(packet->data, packet->length, false);
  1277. bsIn.IgnoreBytes(sizeof(MessageID)*2);
  1278. bool objectExists;
  1279. unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
  1280. if (objectExists==false)
  1281. return;
  1282. RemoteServer *remoteServer = remoteServers[index];
  1283. CloudKey cloudKey;
  1284. cloudKey.Serialize(false, &bsIn);
  1285. // unsigned int insertionIndex;
  1286. bool alreadyHasKey;
  1287. // insertionIndex =
  1288. remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
  1289. if (alreadyHasKey==false)
  1290. remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
  1291. }
  1292. void CloudServer::OnSendSubscribedKeyToServers( Packet *packet )
  1293. {
  1294. RakNet::BitStream bsIn(packet->data, packet->length, false);
  1295. bsIn.IgnoreBytes(sizeof(MessageID)*2);
  1296. bool objectExists;
  1297. unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
  1298. if (objectExists==false)
  1299. return;
  1300. RemoteServer *remoteServer = remoteServers[index];
  1301. CloudKey cloudKey;
  1302. cloudKey.Serialize(false, &bsIn);
  1303. // unsigned int insertionIndex;
  1304. bool alreadyHasKey;
  1305. // insertionIndex =
  1306. remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
  1307. // Do not need to send current values, the Get request will do that as the Get request is sent at the same time
  1308. if (alreadyHasKey==false)
  1309. remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
  1310. }
  1311. void CloudServer::OnRemoveUploadedKeyFromServers( Packet *packet )
  1312. {
  1313. RakNet::BitStream bsIn(packet->data, packet->length, false);
  1314. bsIn.IgnoreBytes(sizeof(MessageID)*2);
  1315. bool objectExists;
  1316. unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
  1317. if (objectExists==false)
  1318. return;
  1319. RemoteServer *remoteServer = remoteServers[index];
  1320. CloudKey cloudKey;
  1321. cloudKey.Serialize(false, &bsIn);
  1322. unsigned int insertionIndex;
  1323. bool alreadyHasKey;
  1324. insertionIndex = remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
  1325. if (alreadyHasKey==true)
  1326. remoteServer->uploadedKeys.RemoveAtIndex(insertionIndex);
  1327. }
  1328. void CloudServer::OnRemoveSubscribedKeyFromServers( Packet *packet )
  1329. {
  1330. RakNet::BitStream bsIn(packet->data, packet->length, false);
  1331. bsIn.IgnoreBytes(sizeof(MessageID)*2);
  1332. bool objectExists;
  1333. unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
  1334. if (objectExists==false)
  1335. return;
  1336. RemoteServer *remoteServer = remoteServers[index];
  1337. CloudKey cloudKey;
  1338. cloudKey.Serialize(false, &bsIn);
  1339. unsigned int insertionIndex;
  1340. bool alreadyHasKey;
  1341. insertionIndex = remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
  1342. if (alreadyHasKey==true)
  1343. remoteServer->subscribedKeys.RemoveAtIndex(insertionIndex);
  1344. }
  1345. void CloudServer::OnServerDataChanged( Packet *packet )
  1346. {
  1347. RakNet::BitStream bsIn(packet->data, packet->length, false);
  1348. bsIn.IgnoreBytes(sizeof(MessageID)*2);
  1349. bool objectExists;
  1350. remoteServers.GetIndexFromKey(packet->guid,&objectExists);
  1351. if (objectExists==false)
  1352. return;
  1353. // Find everyone that cares about this change and relay
  1354. bool wasUpdated=false;
  1355. bsIn.Read(wasUpdated);
  1356. CloudQueryRow row;
  1357. row.Serialize(false, &bsIn, this);
  1358. CloudDataList *cloudDataList;
  1359. bool dataRepositoryExists;
  1360. unsigned int dataRepositoryIndex;
  1361. dataRepositoryIndex = dataRepository.GetIndexFromKey(row.key, &dataRepositoryExists);
  1362. if (dataRepositoryExists==false)
  1363. {
  1364. DeallocateRowData(row.data);
  1365. return;
  1366. }
  1367. cloudDataList = dataRepository[dataRepositoryIndex];
  1368. CloudData *cloudData;
  1369. bool keyDataListExists;
  1370. unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(row.clientGUID, &keyDataListExists);
  1371. if (keyDataListExists==true)
  1372. {
  1373. cloudData = cloudDataList->keyData[keyDataListIndex];
  1374. NotifyClientSubscribersOfDataChange(&row, cloudData->specificSubscribers, wasUpdated );
  1375. }
  1376. NotifyClientSubscribersOfDataChange(&row, cloudDataList->nonSpecificSubscribers, wasUpdated );
  1377. DeallocateRowData(row.data);
  1378. }
  1379. void CloudServer::GetServersWithUploadedKeys(
  1380. DataStructures::List<CloudKey> &keys,
  1381. DataStructures::List<CloudServer::RemoteServer*> &remoteServersWithData
  1382. )
  1383. {
  1384. remoteServersWithData.Clear(true, _FILE_AND_LINE_);
  1385. unsigned int i,j;
  1386. for (i=0; i < remoteServers.Size(); i++)
  1387. {
  1388. remoteServers[i]->workingFlag=false;
  1389. }
  1390. for (i=0; i < remoteServers.Size(); i++)
  1391. {
  1392. if (remoteServers[i]->workingFlag==false)
  1393. {
  1394. if (remoteServers[i]->gotSubscribedAndUploadedKeys==false)
  1395. {
  1396. remoteServers[i]->workingFlag=true;
  1397. remoteServersWithData.Push(remoteServers[i], _FILE_AND_LINE_);
  1398. }
  1399. else
  1400. {
  1401. remoteServers[i]->workingFlag=false;
  1402. for (j=0; j < keys.Size(); j++)
  1403. {
  1404. if (remoteServers[i]->workingFlag==false && remoteServers[i]->uploadedKeys.HasData(keys[j]))
  1405. {
  1406. remoteServers[i]->workingFlag=true;
  1407. remoteServersWithData.Push(remoteServers[i], _FILE_AND_LINE_);
  1408. break;
  1409. }
  1410. }
  1411. }
  1412. }
  1413. }
  1414. }
  1415. CloudServer::CloudDataList *CloudServer::GetOrAllocateCloudDataList(CloudKey key, bool *dataRepositoryExists, unsigned int &dataRepositoryIndex)
  1416. {
  1417. CloudDataList *cloudDataList;
  1418. dataRepositoryIndex = dataRepository.GetIndexFromKey(key, dataRepositoryExists);
  1419. if (*dataRepositoryExists==false)
  1420. {
  1421. cloudDataList = RakNet::OP_NEW<CloudDataList>(_FILE_AND_LINE_);
  1422. cloudDataList->key=key;
  1423. cloudDataList->uploaderCount=0;
  1424. cloudDataList->subscriberCount=0;
  1425. dataRepository.InsertAtIndex(cloudDataList,dataRepositoryIndex,_FILE_AND_LINE_);
  1426. }
  1427. else
  1428. {
  1429. cloudDataList = dataRepository[dataRepositoryIndex];
  1430. }
  1431. return cloudDataList;
  1432. }
  1433. void CloudServer::UnsubscribeFromKey(RemoteCloudClient *remoteCloudClient, RakNetGUID remoteCloudClientGuid, unsigned int keySubscriberIndex, CloudKey &cloudKey, DataStructures::List<RakNetGUID> &specificSystems)
  1434. {
  1435. KeySubscriberID* keySubscriberId = remoteCloudClient->subscribedKeys[keySubscriberIndex];
  1436. // If removing specific systems, but global subscription, fail
  1437. if (keySubscriberId->specificSystemsSubscribedTo.Size()==0 && specificSystems.Size()>0)
  1438. return;
  1439. bool dataRepositoryExists;
  1440. CloudDataList *cloudDataList;
  1441. unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists);
  1442. if (dataRepositoryExists==false)
  1443. return;
  1444. unsigned int i,j;
  1445. cloudDataList = dataRepository[dataRepositoryIndex];
  1446. if (specificSystems.Size()==0)
  1447. {
  1448. // Remove global subscriber. If returns false, have to remove specific subscribers
  1449. if (cloudDataList->RemoveSubscriber(remoteCloudClientGuid)==false)
  1450. {
  1451. for (i=0; i < keySubscriberId->specificSystemsSubscribedTo.Size(); i++)
  1452. {
  1453. RemoveSpecificSubscriber(keySubscriberId->specificSystemsSubscribedTo[i], cloudDataList, remoteCloudClientGuid);
  1454. }
  1455. }
  1456. keySubscriberId->specificSystemsSubscribedTo.Clear(true, _FILE_AND_LINE_);
  1457. }
  1458. else
  1459. {
  1460. for (j=0; j < specificSystems.Size(); j++)
  1461. {
  1462. unsigned int specificSystemsSubscribedToIndex;
  1463. bool hasSpecificSystemsSubscribedTo;
  1464. specificSystemsSubscribedToIndex=keySubscriberId->specificSystemsSubscribedTo.GetIndexFromKey(specificSystems[j], &hasSpecificSystemsSubscribedTo);
  1465. if (hasSpecificSystemsSubscribedTo)
  1466. {
  1467. RemoveSpecificSubscriber(specificSystems[j], cloudDataList, remoteCloudClientGuid);
  1468. keySubscriberId->specificSystemsSubscribedTo.RemoveAtIndex(specificSystemsSubscribedToIndex);
  1469. }
  1470. }
  1471. }
  1472. if (keySubscriberId->specificSystemsSubscribedTo.Size()==0)
  1473. {
  1474. RakNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_);
  1475. remoteCloudClient->subscribedKeys.RemoveAtIndex(keySubscriberIndex);
  1476. }
  1477. if (cloudDataList->subscriberCount==0)
  1478. RemoveSubscribedKeyFromServers(cloudKey);
  1479. if (cloudDataList->IsUnused())
  1480. {
  1481. RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
  1482. dataRepository.RemoveAtIndex(dataRepositoryIndex);
  1483. }
  1484. }
  1485. void CloudServer::RemoveSpecificSubscriber(RakNetGUID specificSubscriber, CloudDataList *cloudDataList, RakNetGUID remoteCloudClientGuid)
  1486. {
  1487. bool keyDataListExists;
  1488. unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSubscriber, &keyDataListExists);
  1489. if (keyDataListExists==false)
  1490. return;
  1491. CloudData *cloudData = cloudDataList->keyData[keyDataListIndex];
  1492. bool hasSpecificSubscriber;
  1493. unsigned int specificSubscriberIndex = cloudData->specificSubscribers.GetIndexFromKey(remoteCloudClientGuid, &hasSpecificSubscriber);
  1494. if (hasSpecificSubscriber)
  1495. {
  1496. cloudData->specificSubscribers.RemoveAtIndex(specificSubscriberIndex);
  1497. cloudDataList->subscriberCount--;
  1498. if (cloudData->IsUnused())
  1499. {
  1500. RakNet::OP_DELETE(cloudData, _FILE_AND_LINE_);
  1501. cloudDataList->keyData.RemoveAtIndex(keyDataListIndex);
  1502. }
  1503. }
  1504. }
  1505. void CloudServer::ForceExternalSystemAddress(SystemAddress forcedAddress)
  1506. {
  1507. forceAddress=forcedAddress;
  1508. }
  1509. void CloudServer::AddQueryFilter(CloudServerQueryFilter* filter)
  1510. {
  1511. if (queryFilters.GetIndexOf(filter)!=(unsigned int) -1)
  1512. return;
  1513. queryFilters.Push(filter, _FILE_AND_LINE_);
  1514. }
  1515. void CloudServer::RemoveQueryFilter(CloudServerQueryFilter* filter)
  1516. {
  1517. unsigned int index;
  1518. index = queryFilters.GetIndexOf(filter);
  1519. if (index != (unsigned int) -1)
  1520. queryFilters.RemoveAtIndex(index);
  1521. }
  1522. void CloudServer::RemoveAllQueryFilters(void)
  1523. {
  1524. queryFilters.Clear(true, _FILE_AND_LINE_);
  1525. }
  1526. #endif
粤ICP备19079148号