| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685 |
- /*
- * Copyright (c) 2014, Oculus VR, Inc.
- * All rights reserved.
- *
- * This source code is licensed under the BSD-style license found in the
- * LICENSE file in the root directory of this source tree. An additional grant
- * of patent rights can be found in the PATENTS file in the same directory.
- *
- */
- #include "NativeFeatureIncludes.h"
- #if _RAKNET_SUPPORT_CloudServer==1
- #include "CloudServer.h"
- #include "GetTime.h"
- #include "MessageIdentifiers.h"
- #include "BitStream.h"
- #include "RakPeerInterface.h"
- enum ServerToServerCommands
- {
- STSC_PROCESS_GET_REQUEST,
- STSC_PROCESS_GET_RESPONSE,
- STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS,
- STSC_ADD_UPLOADED_KEY,
- STSC_ADD_SUBSCRIBED_KEY,
- STSC_REMOVE_UPLOADED_KEY,
- STSC_REMOVE_SUBSCRIBED_KEY,
- STSC_DATA_CHANGED,
- };
- using namespace RakNet;
- int CloudServer::RemoteServerComp(const RakNetGUID &key, RemoteServer* const &data )
- {
- if (key < data->serverAddress)
- return -1;
- if (key > data->serverAddress)
- return 1;
- return 0;
- }
- int CloudServer::KeySubscriberIDComp(const CloudKey &key, KeySubscriberID * const &data )
- {
- if (key.primaryKey < data->key.primaryKey)
- return -1;
- if (key.primaryKey > data->key.primaryKey)
- return 1;
- if (key.secondaryKey < data->key.secondaryKey)
- return -1;
- if (key.secondaryKey > data->key.secondaryKey)
- return 1;
- return 0;
- }
- int CloudServer::KeyDataPtrComp( const RakNetGUID &key, CloudData* const &data )
- {
- if (key < data->clientGUID)
- return -1;
- if (key > data->clientGUID)
- return 1;
- return 0;
- }
- int CloudServer::KeyDataListComp( const CloudKey &key, CloudDataList * const &data )
- {
- if (key.primaryKey < data->key.primaryKey)
- return -1;
- if (key.primaryKey > data->key.primaryKey)
- return 1;
- if (key.secondaryKey < data->key.secondaryKey)
- return -1;
- if (key.secondaryKey > data->key.secondaryKey)
- return 1;
- return 0;
- }
- int CloudServer::BufferedGetResponseFromServerComp(const RakNetGUID &key, CloudServer::BufferedGetResponseFromServer* const &data )
- {
- if (key < data->serverAddress)
- return -1;
- if (key > data->serverAddress)
- return 1;
- return 0;
- }
- int CloudServer::GetRequestComp(const uint32_t &key, CloudServer::GetRequest* const &data )
- {
- if (key < data->requestId)
- return -1;
- if (key > data->requestId)
- return -1;
- return 0;
- }
- void CloudServer::CloudQueryWithAddresses::Serialize(bool writeToBitstream, BitStream *bitStream)
- {
- cloudQuery.Serialize(writeToBitstream, bitStream);
- if (writeToBitstream)
- {
- bitStream->WriteCasted<uint16_t>(specificSystems.Size());
- RakAssert(specificSystems.Size() < (uint16_t)-1 );
- for (uint16_t i=0; i < specificSystems.Size(); i++)
- {
- bitStream->Write(specificSystems[i]);
- }
- }
- else
- {
- uint16_t specificSystemsCount;
- RakNetGUID addressOrGuid;
- bitStream->Read(specificSystemsCount);
- for (uint16_t i=0; i < specificSystemsCount; i++)
- {
- bitStream->Read(addressOrGuid);
- specificSystems.Push(addressOrGuid, _FILE_AND_LINE_);
- }
- }
- }
- bool CloudServer::GetRequest::AllRemoteServersHaveResponded(void) const
- {
- unsigned int i;
- for (i=0; i < remoteServerResponses.Size(); i++)
- if (remoteServerResponses[i]->gotResult==false)
- return false;
- return true;
- }
- void CloudServer::GetRequest::Clear(CloudAllocator *allocator)
- {
- unsigned int i;
- for (i=0; i < remoteServerResponses.Size(); i++)
- {
- remoteServerResponses[i]->Clear(allocator);
- RakNet::OP_DELETE(remoteServerResponses[i], _FILE_AND_LINE_);
- }
- remoteServerResponses.Clear(false, _FILE_AND_LINE_);
- }
- void CloudServer::BufferedGetResponseFromServer::Clear(CloudAllocator *allocator)
- {
- unsigned int i;
- for (i=0; i < queryResult.rowsReturned.Size(); i++)
- {
- allocator->DeallocateRowData(queryResult.rowsReturned[i]->data);
- allocator->DeallocateCloudQueryRow(queryResult.rowsReturned[i]);
- }
- queryResult.rowsReturned.Clear(false, _FILE_AND_LINE_);
- }
- CloudServer::CloudServer()
- {
- maxUploadBytesPerClient=0;
- maxBytesPerDowload=0;
- nextGetRequestId=0;
- nextGetRequestsCheck=0;
- }
- CloudServer::~CloudServer()
- {
- Clear();
- }
- void CloudServer::SetMaxUploadBytesPerClient(uint64_t bytes)
- {
- maxUploadBytesPerClient=bytes;
- }
- void CloudServer::SetMaxBytesPerDownload(uint64_t bytes)
- {
- maxBytesPerDowload=bytes;
- }
- void CloudServer::Update(void)
- {
- // Timeout getRequests
- RakNet::Time time = RakNet::Time();
- if (time > nextGetRequestsCheck)
- {
- nextGetRequestsCheck=time+1000;
- unsigned int i=0;
- while (i < getRequests.Size())
- {
- if (time - getRequests[i]->requestStartTime > 3000)
- {
- // Remote server is not responding, just send back data with whoever did respond
- ProcessAndTransmitGetRequest(getRequests[i]);
- getRequests[i]->Clear(this);
- RakNet::OP_DELETE(getRequests[i],_FILE_AND_LINE_);
- getRequests.RemoveAtIndex(i);
- }
- else
- {
- i++;
- }
- }
- }
- }
- PluginReceiveResult CloudServer::OnReceive(Packet *packet)
- {
- switch (packet->data[0])
- {
- case ID_CLOUD_POST_REQUEST:
- OnPostRequest(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case ID_CLOUD_RELEASE_REQUEST:
- OnReleaseRequest(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case ID_CLOUD_GET_REQUEST:
- OnGetRequest(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case ID_CLOUD_UNSUBSCRIBE_REQUEST:
- OnUnsubscribeRequest(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case ID_CLOUD_SERVER_TO_SERVER_COMMAND:
- if (packet->length>1)
- {
- switch (packet->data[1])
- {
- case STSC_PROCESS_GET_REQUEST:
- OnServerToServerGetRequest(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case STSC_PROCESS_GET_RESPONSE:
- OnServerToServerGetResponse(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS:
- OnSendUploadedAndSubscribedKeysToServer(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case STSC_ADD_UPLOADED_KEY:
- OnSendUploadedKeyToServers(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case STSC_ADD_SUBSCRIBED_KEY:
- OnSendSubscribedKeyToServers(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case STSC_REMOVE_UPLOADED_KEY:
- OnRemoveUploadedKeyFromServers(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case STSC_REMOVE_SUBSCRIBED_KEY:
- OnRemoveSubscribedKeyFromServers(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- case STSC_DATA_CHANGED:
- OnServerDataChanged(packet);
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- }
- }
- return RR_STOP_PROCESSING_AND_DEALLOCATE;
- }
- return RR_CONTINUE_PROCESSING;
- }
- void CloudServer::OnPostRequest(Packet *packet)
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID));
- CloudKey key;
- key.Serialize(false,&bsIn);
- uint32_t dataLengthBytes;
- bsIn.Read(dataLengthBytes);
- if (maxUploadBytesPerClient>0 && dataLengthBytes>maxUploadBytesPerClient)
- return; // Exceeded max upload bytes
- bsIn.AlignReadToByteBoundary();
- for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
- {
- if (queryFilters[filterIndex]->OnPostRequest(packet->guid, packet->systemAddress, key, dataLengthBytes, (const char*) bsIn.GetData()+BITS_TO_BYTES(bsIn.GetReadOffset()))==false)
- return;
- }
- unsigned char *data;
- if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
- {
- data = (unsigned char *) rakMalloc_Ex(dataLengthBytes,_FILE_AND_LINE_);
- if (data==0)
- {
- notifyOutOfMemory(_FILE_AND_LINE_);
- return;
- }
- bsIn.ReadAlignedBytes(data,dataLengthBytes);
- }
- else
- data=0;
- // Add this system to remoteSystems if they aren't there already
- DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid);
- RemoteCloudClient *remoteCloudClient;
- if (remoteSystemsHashIndex.IsInvalid())
- {
- remoteCloudClient = RakNet::OP_NEW<RemoteCloudClient>(_FILE_AND_LINE_);
- remoteCloudClient->uploadedKeys.Insert(key,key,true,_FILE_AND_LINE_);
- remoteCloudClient->uploadedBytes=0;
- remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_);
- }
- else
- {
- remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex);
- bool objectExists;
- // Add to RemoteCloudClient::uploadedKeys if it isn't there already
- unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists);
- if (objectExists==false)
- {
- remoteCloudClient->uploadedKeys.InsertAtIndex(key, uploadedKeysIndex, _FILE_AND_LINE_);
- }
- }
- bool cloudDataAlreadyUploaded;
- unsigned int dataRepositoryIndex;
- bool dataRepositoryExists;
- CloudDataList* cloudDataList = GetOrAllocateCloudDataList(key, &dataRepositoryExists, dataRepositoryIndex);
- if (dataRepositoryExists==false)
- {
- cloudDataList->uploaderCount=1;
- cloudDataAlreadyUploaded=false;
- }
- else
- {
- cloudDataAlreadyUploaded=cloudDataList->uploaderCount>0;
- cloudDataList->uploaderCount++;
- }
- CloudData *cloudData;
- bool keyDataListExists;
- unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists);
- if (keyDataListExists==false)
- {
- if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes+dataLengthBytes>maxUploadBytesPerClient)
- {
- // Undo prior insertion of cloudDataList into cloudData if needed
- if (keyDataListExists==false)
- {
- RakNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_);
- dataRepository.RemoveAtIndex(dataRepositoryIndex);
- }
- if (remoteCloudClient->IsUnused())
- {
- RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
- remoteSystems.Remove(packet->guid, _FILE_AND_LINE_);
- }
- if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
- rakFree_Ex(data, _FILE_AND_LINE_);
- return;
- }
- cloudData = RakNet::OP_NEW<CloudData>(_FILE_AND_LINE_);
- cloudData->dataLengthBytes=dataLengthBytes;
- cloudData->isUploaded=true;
- if (forceAddress!=UNASSIGNED_SYSTEM_ADDRESS)
- {
- cloudData->serverSystemAddress=forceAddress;
- cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort());
- }
- else
- {
- cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress);
- if (cloudData->serverSystemAddress.IsLoopback())
- cloudData->serverSystemAddress.FromString(rakPeerInterface->GetLocalIP(0));
- }
- if (cloudData->serverSystemAddress.GetPort()==0)
- {
- // Fix localhost port
- cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort());
- }
- cloudData->clientSystemAddress=packet->systemAddress;
- cloudData->serverGUID=rakPeerInterface->GetMyGUID();
- cloudData->clientGUID=packet->guid;
- cloudDataList->keyData.Insert(packet->guid,cloudData,true,_FILE_AND_LINE_);
- }
- else
- {
- cloudData = cloudDataList->keyData[keyDataListIndex];
- if (cloudDataAlreadyUploaded==false)
- {
- if (forceAddress!=UNASSIGNED_SYSTEM_ADDRESS)
- {
- cloudData->serverSystemAddress=forceAddress;
- cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort());
- }
- else
- {
- cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress);
- }
- if (cloudData->serverSystemAddress.GetPort()==0)
- {
- // Fix localhost port
- cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort());
- }
- cloudData->clientSystemAddress=packet->systemAddress;
- }
- if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes-cloudData->dataLengthBytes+dataLengthBytes>maxUploadBytesPerClient)
- {
- // Undo prior insertion of cloudDataList into cloudData if needed
- if (dataRepositoryExists==false)
- {
- RakNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_);
- dataRepository.RemoveAtIndex(dataRepositoryIndex);
- }
- return;
- }
- else
- {
- // Subtract already used bytes we are overwriting
- remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes;
- }
- if (cloudData->allocatedData!=0)
- rakFree_Ex(cloudData->allocatedData,_FILE_AND_LINE_);
- }
- if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
- {
- // Data already allocated
- cloudData->allocatedData=data;
- cloudData->dataPtr=data;
- }
- else
- {
- // Read to stack
- if (dataLengthBytes>0)
- bsIn.ReadAlignedBytes(cloudData->stackData,dataLengthBytes);
- cloudData->allocatedData=0;
- cloudData->dataPtr=cloudData->stackData;
- }
- // Update how many bytes were written for this data
- cloudData->dataLengthBytes=dataLengthBytes;
- remoteCloudClient->uploadedBytes+=dataLengthBytes;
- if (cloudDataAlreadyUploaded==false)
- {
- // New data field
- SendUploadedKeyToServers(cloudDataList->key);
- }
- // Existing data field changed
- NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, true );
- NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, true );
- // Send update to all remote servers that subscribed to this key
- NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, true);
- // I could have also subscribed to a key not yet updated locally
- // This means I have to go through every RemoteClient that wants this key
- // Seems like cloudData->specificSubscribers is unnecessary in that case
- }
- void CloudServer::OnReleaseRequest(Packet *packet)
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID));
- uint16_t keyCount;
- bsIn.Read(keyCount);
- if (keyCount==0)
- return;
- DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid);
- if (remoteSystemIndex.IsInvalid()==true)
- return;
- RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
- CloudKey key;
- // Read all in a list first so I can run filter on it
- DataStructures::List<CloudKey> cloudKeys;
- for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++)
- {
- key.Serialize(false, &bsIn);
- cloudKeys.Push(key, _FILE_AND_LINE_);
- }
- for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
- {
- if (queryFilters[filterIndex]->OnReleaseRequest(packet->guid, packet->systemAddress, cloudKeys)==false)
- return;
- }
- for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++)
- {
- // Serialize in list above so I can run the filter on it
- // key.Serialize(false, &bsIn);
- key=cloudKeys[keyCountIndex];
- // Remove remote systems uploaded keys
- bool objectExists;
- unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists);
- if (objectExists)
- {
- bool dataRepositoryExists;
- unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists);
- CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex];
- RakAssert(cloudDataList);
- CloudData *cloudData;
- bool keyDataListExists;
- unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists);
- cloudData = cloudDataList->keyData[keyDataListIndex];
- remoteCloudClient->uploadedKeys.RemoveAtIndex(uploadedKeysIndex);
- remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes;
- cloudDataList->uploaderCount--;
- // Broadcast destruction of this key to subscribers
- NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false );
- NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false );
- NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false );
- cloudData->Clear();
- if (cloudData->IsUnused())
- {
- RakNet::OP_DELETE(cloudData, _FILE_AND_LINE_);
- cloudDataList->keyData.RemoveAtIndex(keyDataListIndex);
- if (cloudDataList->IsNotUploaded())
- {
- // Tell other servers that this key is no longer uploaded, so they do not request it from us
- RemoveUploadedKeyFromServers(cloudDataList->key);
- }
- if (cloudDataList->IsUnused())
- {
- RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
- dataRepository.RemoveAtIndex(dataRepositoryIndex);
- }
- }
- if (remoteCloudClient->IsUnused())
- {
- RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
- remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
- break;
- }
- }
- }
- }
- void CloudServer::OnGetRequest(Packet *packet)
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID));
- uint16_t specificSystemsCount;
- CloudKey cloudKey;
- // Create a new GetRequest
- GetRequest *getRequest;
- getRequest = RakNet::OP_NEW<GetRequest>(_FILE_AND_LINE_);
- getRequest->cloudQueryWithAddresses.cloudQuery.Serialize(false, &bsIn);
- getRequest->requestingClient=packet->guid;
- RakNetGUID addressOrGuid;
- bsIn.Read(specificSystemsCount);
- for (uint16_t i=0; i < specificSystemsCount; i++)
- {
- bsIn.Read(addressOrGuid);
- getRequest->cloudQueryWithAddresses.specificSystems.Push(addressOrGuid, _FILE_AND_LINE_);
- }
- if (getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size()==0)
- {
- RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
- return;
- }
- for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
- {
- if (queryFilters[filterIndex]->OnGetRequest(packet->guid, packet->systemAddress, getRequest->cloudQueryWithAddresses.cloudQuery, getRequest->cloudQueryWithAddresses.specificSystems )==false)
- return;
- }
- getRequest->requestStartTime=RakNet::GetTime();
- getRequest->requestId=nextGetRequestId++;
- // Send request to servers that have this data
- DataStructures::List<RemoteServer*> remoteServersWithData;
- GetServersWithUploadedKeys(getRequest->cloudQueryWithAddresses.cloudQuery.keys, remoteServersWithData);
- if (remoteServersWithData.Size()==0)
- {
- ProcessAndTransmitGetRequest(getRequest);
- }
- else
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
- bsOut.Write((MessageID)STSC_PROCESS_GET_REQUEST);
- getRequest->cloudQueryWithAddresses.Serialize(true, &bsOut);
- bsOut.Write(getRequest->requestId);
- for (unsigned int remoteServerIndex=0; remoteServerIndex < remoteServersWithData.Size(); remoteServerIndex++)
- {
- BufferedGetResponseFromServer* bufferedGetResponseFromServer = RakNet::OP_NEW<BufferedGetResponseFromServer>(_FILE_AND_LINE_);
- bufferedGetResponseFromServer->serverAddress=remoteServersWithData[remoteServerIndex]->serverAddress;
- bufferedGetResponseFromServer->gotResult=false;
- getRequest->remoteServerResponses.Insert(remoteServersWithData[remoteServerIndex]->serverAddress, bufferedGetResponseFromServer, true, _FILE_AND_LINE_);
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServersWithData[remoteServerIndex]->serverAddress, false);
- }
- // Record that this system made this request
- getRequests.Insert(getRequest->requestId, getRequest, true, _FILE_AND_LINE_);
- }
- if (getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults)
- {
- // Add to key subscription list for the client, which contains a keyId / specificUploaderList pair
- DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid);
- RemoteCloudClient *remoteCloudClient;
- if (remoteSystemsHashIndex.IsInvalid())
- {
- remoteCloudClient = RakNet::OP_NEW<RemoteCloudClient>(_FILE_AND_LINE_);
- remoteCloudClient->uploadedBytes=0;
- remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_);
- }
- else
- {
- remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex);
- }
- unsigned int keyIndex;
- for (keyIndex=0; keyIndex < getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size(); keyIndex++)
- {
- cloudKey = getRequest->cloudQueryWithAddresses.cloudQuery.keys[keyIndex];
- unsigned int keySubscriberIndex;
- bool hasKeySubscriber;
- keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber);
- KeySubscriberID* keySubscriberId;
- if (hasKeySubscriber)
- {
- DataStructures::List<RakNetGUID> specificSystems;
- UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems);
- }
- keySubscriberId = RakNet::OP_NEW<KeySubscriberID>(_FILE_AND_LINE_);
- keySubscriberId->key=cloudKey;
- unsigned int specificSystemIndex;
- for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
- {
- keySubscriberId->specificSystemsSubscribedTo.Insert(getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], true, _FILE_AND_LINE_);
- }
- remoteCloudClient->subscribedKeys.InsertAtIndex(keySubscriberId, keySubscriberIndex, _FILE_AND_LINE_);
- // Add CloudData in a similar way
- unsigned int dataRepositoryIndex;
- bool dataRepositoryExists;
- CloudDataList* cloudDataList = GetOrAllocateCloudDataList(cloudKey, &dataRepositoryExists, dataRepositoryIndex);
- // If this is the first local client to subscribe to this key, call SendSubscribedKeyToServers
- if (cloudDataList->subscriberCount==0)
- SendSubscribedKeyToServers(cloudKey);
- // If the subscription is specific, may have to also allocate CloudData
- if (getRequest->cloudQueryWithAddresses.specificSystems.Size())
- {
- CloudData *cloudData;
- bool keyDataListExists;
- unsigned int specificSystemIndex;
- for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
- {
- RakNetGUID specificSystem = getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex];
- unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSystem, &keyDataListExists);
- if (keyDataListExists==false)
- {
- cloudData = RakNet::OP_NEW<CloudData>(_FILE_AND_LINE_);
- cloudData->dataLengthBytes=0;
- cloudData->allocatedData=0;
- cloudData->isUploaded=false;
- cloudData->dataPtr=0;
- cloudData->serverSystemAddress=UNASSIGNED_SYSTEM_ADDRESS;
- cloudData->clientSystemAddress=UNASSIGNED_SYSTEM_ADDRESS;
- cloudData->serverGUID=rakPeerInterface->GetMyGUID();
- cloudData->clientGUID=specificSystem;
- cloudDataList->keyData.Insert(specificSystem,cloudData,true,_FILE_AND_LINE_);
- }
- else
- {
- cloudData = cloudDataList->keyData[keyDataListIndex];
- }
- ++cloudDataList->subscriberCount;
- cloudData->specificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_);
- }
- }
- else
- {
- ++cloudDataList->subscriberCount;
- cloudDataList->nonSpecificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_);
- // Remove packet->guid from CloudData::specificSubscribers among all instances of cloudDataList->keyData
- unsigned int subscribedKeysIndex;
- bool subscribedKeysIndexExists;
- subscribedKeysIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudDataList->key, &subscribedKeysIndexExists);
- if (subscribedKeysIndexExists)
- {
- KeySubscriberID* keySubscriberId;
- keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex];
- unsigned int specificSystemIndex;
- for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++)
- {
- bool keyDataExists;
- unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists);
- if (keyDataExists)
- {
- CloudData *keyData = cloudDataList->keyData[keyDataIndex];
- keyData->specificSubscribers.Remove(packet->guid);
- --cloudDataList->subscriberCount;
- }
- }
- }
- }
- }
- if (remoteCloudClient->subscribedKeys.Size()==0)
- {
- // Didn't do anything
- remoteSystems.Remove(packet->guid, _FILE_AND_LINE_);
- RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
- }
- }
- if (remoteServersWithData.Size()==0)
- RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
- }
- void CloudServer::OnUnsubscribeRequest(Packet *packet)
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID));
- DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid);
- if (remoteSystemIndex.IsInvalid()==true)
- return;
- RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
- uint16_t keyCount, specificSystemCount;
- DataStructures::List<CloudKey> cloudKeys;
- DataStructures::List<RakNetGUID> specificSystems;
- uint16_t index;
- CloudKey cloudKey;
- bsIn.Read(keyCount);
- for (index=0; index < keyCount; index++)
- {
- cloudKey.Serialize(false, &bsIn);
- cloudKeys.Push(cloudKey, _FILE_AND_LINE_);
- }
- RakNetGUID specificSystem;
- bsIn.Read(specificSystemCount);
- for (index=0; index < specificSystemCount; index++)
- {
- bsIn.Read(specificSystem);
- specificSystems.Push(specificSystem, _FILE_AND_LINE_);
- }
- for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
- {
- if (queryFilters[filterIndex]->OnUnsubscribeRequest(packet->guid, packet->systemAddress, cloudKeys, specificSystems )==false)
- return;
- }
- // CloudDataList *cloudDataList;
- bool dataRepositoryExists;
- // unsigned int dataRepositoryIndex;
- for (index=0; index < keyCount; index++)
- {
- CloudKey cloudKey = cloudKeys[index];
- // dataRepositoryIndex =
- dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists);
- if (dataRepositoryExists==false)
- continue;
- // cloudDataList = dataRepository[dataRepositoryIndex];
- unsigned int keySubscriberIndex;
- bool hasKeySubscriber;
- keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber);
- if (hasKeySubscriber==false)
- continue;
- UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems);
- }
- if (remoteCloudClient->IsUnused())
- {
- RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
- remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
- }
- }
- void CloudServer::OnServerToServerGetRequest(Packet *packet)
- {
- // unsigned int remoteServerIndex;
- bool objectExists;
- //remoteServerIndex =
- remoteServers.GetIndexFromKey(packet->guid, &objectExists);
- if (objectExists==false)
- return;
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID)*2);
- CloudQueryWithAddresses cloudQueryWithAddresses;
- uint32_t requestId;
- cloudQueryWithAddresses.Serialize(false, &bsIn);
- bsIn.Read(requestId);
- DataStructures::List<CloudData*> cloudDataResultList;
- DataStructures::List<CloudKey> cloudKeyResultList;
- ProcessCloudQueryWithAddresses(cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList);
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
- bsOut.Write((MessageID)STSC_PROCESS_GET_RESPONSE);
- bsOut.Write(requestId);
- WriteCloudQueryRowFromResultList(cloudDataResultList, cloudKeyResultList, &bsOut);
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, packet->guid, false);
- }
- void CloudServer::OnServerToServerGetResponse(Packet *packet)
- {
- unsigned int remoteServerIndex;
- bool objectExists;
- remoteServerIndex = remoteServers.GetIndexFromKey(packet->guid, &objectExists);
- if (objectExists==false)
- return;
- RemoteServer *remoteServer = remoteServers[remoteServerIndex];
- if (remoteServer==0)
- return;
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID)*2);
- uint32_t requestId;
- bsIn.Read(requestId);
- // Lookup request id
- bool hasGetRequest;
- unsigned int getRequestIndex;
- getRequestIndex = getRequests.GetIndexFromKey(requestId, &hasGetRequest);
- if (hasGetRequest==false)
- return;
- GetRequest *getRequest = getRequests[getRequestIndex];
- bool hasRemoteServer;
- unsigned int remoteServerResponsesIndex;
- remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(packet->guid, &hasRemoteServer);
- if (hasRemoteServer==false)
- return;
- BufferedGetResponseFromServer *bufferedGetResponseFromServer;
- bufferedGetResponseFromServer = getRequest->remoteServerResponses[remoteServerResponsesIndex];
- if (bufferedGetResponseFromServer->gotResult==true)
- return;
- bufferedGetResponseFromServer->gotResult=true;
- uint32_t numRows;
- bufferedGetResponseFromServer->queryResult.SerializeNumRows(false, numRows, &bsIn);
- bufferedGetResponseFromServer->queryResult.SerializeCloudQueryRows(false, numRows, &bsIn, this);
- // If all results returned, then also process locally, and return to user
- if (getRequest->AllRemoteServersHaveResponded())
- {
- ProcessAndTransmitGetRequest(getRequest);
- getRequest->Clear(this);
- RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
- getRequests.RemoveAtIndex(getRequestIndex);
- }
- }
- void CloudServer::OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason )
- {
- (void) lostConnectionReason;
- (void) systemAddress;
- unsigned int remoteServerIndex;
- bool objectExists;
- remoteServerIndex = remoteServers.GetIndexFromKey(rakNetGUID, &objectExists);
- if (objectExists)
- {
- // Update remoteServerResponses by removing this server and sending the response if it is now complete
- unsigned int getRequestIndex=0;
- while (getRequestIndex < getRequests.Size())
- {
- GetRequest *getRequest = getRequests[getRequestIndex];
- bool waitingForThisServer;
- unsigned int remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(rakNetGUID, &waitingForThisServer);
- if (waitingForThisServer)
- {
- getRequest->remoteServerResponses[remoteServerResponsesIndex]->Clear(this);
- RakNet::OP_DELETE(getRequest->remoteServerResponses[remoteServerResponsesIndex], _FILE_AND_LINE_);
- getRequest->remoteServerResponses.RemoveAtIndex(remoteServerResponsesIndex);
- if (getRequest->AllRemoteServersHaveResponded())
- {
- ProcessAndTransmitGetRequest(getRequest);
- getRequest->Clear(this);
- RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
- getRequests.RemoveAtIndex(getRequestIndex);
- }
- else
- getRequestIndex++;
- }
- else
- getRequestIndex++;
- }
- RakNet::OP_DELETE(remoteServers[remoteServerIndex],_FILE_AND_LINE_);
- remoteServers.RemoveAtIndex(remoteServerIndex);
- }
- DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(rakNetGUID);
- if (remoteSystemIndex.IsInvalid()==false)
- {
- RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
- unsigned int uploadedKeysIndex;
- for (uploadedKeysIndex=0; uploadedKeysIndex < remoteCloudClient->uploadedKeys.Size(); uploadedKeysIndex++)
- {
- // Delete keys this system has uploaded
- bool keyDataRepositoryExists;
- unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->uploadedKeys[uploadedKeysIndex], &keyDataRepositoryExists);
- if (keyDataRepositoryExists)
- {
- CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex];
- bool keyDataExists;
- unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(rakNetGUID, &keyDataExists);
- if (keyDataExists)
- {
- CloudData *cloudData = cloudDataList->keyData[keyDataIndex];
- cloudDataList->uploaderCount--;
- NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false );
- NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false );
- NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false );
- cloudData->Clear();
- if (cloudData->IsUnused())
- {
- RakNet::OP_DELETE(cloudData,_FILE_AND_LINE_);
- cloudDataList->keyData.RemoveAtIndex(keyDataIndex);
- if (cloudDataList->IsNotUploaded())
- {
- // Tell other servers that this key is no longer uploaded, so they do not request it from us
- RemoveUploadedKeyFromServers(cloudDataList->key);
- }
- if (cloudDataList->IsUnused())
- {
- // Tell other servers that this key is no longer uploaded, so they do not request it from us
- RemoveUploadedKeyFromServers(cloudDataList->key);
- RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
- dataRepository.RemoveAtIndex(dataRepositoryIndex);
- }
- }
- }
- }
- }
- unsigned int subscribedKeysIndex;
- for (subscribedKeysIndex=0; subscribedKeysIndex < remoteCloudClient->subscribedKeys.Size(); subscribedKeysIndex++)
- {
- KeySubscriberID* keySubscriberId;
- keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex];
- bool keyDataRepositoryExists;
- unsigned int keyDataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->subscribedKeys[subscribedKeysIndex]->key, &keyDataRepositoryExists);
- if (keyDataRepositoryExists)
- {
- CloudDataList* cloudDataList = dataRepository[keyDataRepositoryIndex];
- if (keySubscriberId->specificSystemsSubscribedTo.Size()==0)
- {
- cloudDataList->nonSpecificSubscribers.Remove(rakNetGUID);
- --cloudDataList->subscriberCount;
- }
- else
- {
- unsigned int specificSystemIndex;
- for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++)
- {
- bool keyDataExists;
- unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists);
- if (keyDataExists)
- {
- CloudData *keyData = cloudDataList->keyData[keyDataIndex];
- keyData->specificSubscribers.Remove(rakNetGUID);
- --cloudDataList->subscriberCount;
- }
- }
- }
- }
- RakNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_);
- }
- // Delete and remove from remoteSystems
- RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
- remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
- }
- }
- void CloudServer::OnRakPeerShutdown(void)
- {
- Clear();
- }
- void CloudServer::Clear(void)
- {
- unsigned int i,j;
- for (i=0; i < dataRepository.Size(); i++)
- {
- CloudDataList *cloudDataList = dataRepository[i];
- for (j=0; j < cloudDataList->keyData.Size(); j++)
- {
- cloudDataList->keyData[j]->Clear();
- RakNet::OP_DELETE(cloudDataList->keyData[j], _FILE_AND_LINE_);
- }
- RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
- }
- dataRepository.Clear(false, _FILE_AND_LINE_);
- for (i=0; i < remoteServers.Size(); i++)
- {
- RakNet::OP_DELETE(remoteServers[i], _FILE_AND_LINE_);
- }
- remoteServers.Clear(false, _FILE_AND_LINE_);
- for (i=0; i < getRequests.Size(); i++)
- {
- GetRequest *getRequest = getRequests[i];
- getRequest->Clear(this);
- RakNet::OP_DELETE(getRequests[i], _FILE_AND_LINE_);
- }
- getRequests.Clear(false, _FILE_AND_LINE_);
- DataStructures::List<RakNetGUID> keyList;
- DataStructures::List<RemoteCloudClient*> itemList;
- remoteSystems.GetAsList(itemList, keyList, _FILE_AND_LINE_);
- for (i=0; i < itemList.Size(); i++)
- {
- RemoteCloudClient* remoteCloudClient = itemList[i];
- for (j=0; j < remoteCloudClient->subscribedKeys.Size(); j++)
- {
- RakNet::OP_DELETE(remoteCloudClient->subscribedKeys[j], _FILE_AND_LINE_);
- }
- RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
- }
- remoteSystems.Clear(_FILE_AND_LINE_);
- }
- void CloudServer::WriteCloudQueryRowFromResultList(DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut)
- {
- bsOut->WriteCasted<uint32_t>(cloudKeyResultList.Size());
- unsigned int i;
- for (i=0; i < cloudKeyResultList.Size(); i++)
- {
- WriteCloudQueryRowFromResultList(i, cloudDataResultList, cloudKeyResultList, bsOut);
- }
- }
- void CloudServer::WriteCloudQueryRowFromResultList(unsigned int i, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut)
- {
- CloudQueryRow cloudQueryRow;
- CloudData *cloudData = cloudDataResultList[i];
- cloudQueryRow.key=cloudKeyResultList[i];
- cloudQueryRow.data=cloudData->dataPtr;
- cloudQueryRow.length=cloudData->dataLengthBytes;
- cloudQueryRow.serverSystemAddress=cloudData->serverSystemAddress;
- cloudQueryRow.clientSystemAddress=cloudData->clientSystemAddress;
- cloudQueryRow.serverGUID=cloudData->serverGUID;
- cloudQueryRow.clientGUID=cloudData->clientGUID;
- cloudQueryRow.Serialize(true, bsOut, 0);
- }
- void CloudServer::NotifyClientSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated )
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID) ID_CLOUD_SUBSCRIPTION_NOTIFICATION);
- bsOut.Write(wasUpdated);
- CloudQueryRow row;
- row.key=key;
- row.data=cloudData->dataPtr;
- row.length=cloudData->dataLengthBytes;
- row.serverSystemAddress=cloudData->serverSystemAddress;
- row.clientSystemAddress=cloudData->clientSystemAddress;
- row.serverGUID=cloudData->serverGUID;
- row.clientGUID=cloudData->clientGUID;
- row.Serialize(true,&bsOut,0);
- unsigned int i;
- for (i=0; i < subscribers.Size(); i++)
- {
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[i], false);
- }
- }
- void CloudServer::NotifyClientSubscribersOfDataChange( CloudQueryRow *row, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated )
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID) ID_CLOUD_SUBSCRIPTION_NOTIFICATION);
- bsOut.Write(wasUpdated);
- row->Serialize(true,&bsOut,0);
- unsigned int i;
- for (i=0; i < subscribers.Size(); i++)
- {
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[i], false);
- }
- }
- void CloudServer::NotifyServerSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, bool wasUpdated )
- {
- // Find every server that has subscribed
- // Send them change notifications
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
- bsOut.Write((MessageID)STSC_DATA_CHANGED);
- bsOut.Write(wasUpdated);
- CloudQueryRow row;
- row.key=key;
- row.data=cloudData->dataPtr;
- row.length=cloudData->dataLengthBytes;
- row.serverSystemAddress=cloudData->serverSystemAddress;
- row.clientSystemAddress=cloudData->clientSystemAddress;
- row.serverGUID=cloudData->serverGUID;
- row.clientGUID=cloudData->clientGUID;
- row.Serialize(true,&bsOut,0);
- unsigned int i;
- for (i=0; i < remoteServers.Size(); i++)
- {
- if (remoteServers[i]->gotSubscribedAndUploadedKeys==false || remoteServers[i]->subscribedKeys.HasData(key))
- {
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
- }
- }
- }
- void CloudServer::AddServer(RakNetGUID systemIdentifier)
- {
- ConnectionState cs = rakPeerInterface->GetConnectionState(systemIdentifier);
- if (cs==IS_DISCONNECTED || cs==IS_NOT_CONNECTED)
- return;
- bool objectExists;
- unsigned int index = remoteServers.GetIndexFromKey(systemIdentifier,&objectExists);
- if (objectExists==false)
- {
- RemoteServer *remoteServer = RakNet::OP_NEW<RemoteServer>(_FILE_AND_LINE_);
- remoteServer->gotSubscribedAndUploadedKeys=false;
- remoteServer->serverAddress=systemIdentifier;
- remoteServers.InsertAtIndex(remoteServer, index, _FILE_AND_LINE_);
- SendUploadedAndSubscribedKeysToServer(systemIdentifier);
- }
- }
- void CloudServer::RemoveServer(RakNetGUID systemAddress)
- {
- bool objectExists;
- unsigned int index = remoteServers.GetIndexFromKey(systemAddress,&objectExists);
- if (objectExists==true)
- {
- RakNet::OP_DELETE(remoteServers[index],_FILE_AND_LINE_);
- remoteServers.RemoveAtIndex(index);
- }
- }
- void CloudServer::GetRemoteServers(DataStructures::List<RakNetGUID> &remoteServersOut)
- {
- remoteServersOut.Clear(true, _FILE_AND_LINE_);
- unsigned int i;
- for (i=0; i < remoteServers.Size(); i++)
- {
- remoteServersOut.Push(remoteServers[i]->serverAddress, _FILE_AND_LINE_);
- }
- }
- void CloudServer::ProcessAndTransmitGetRequest(GetRequest *getRequest)
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID) ID_CLOUD_GET_RESPONSE);
- // BufferedGetResponseFromServer getResponse;
- CloudQueryResult cloudQueryResult;
- cloudQueryResult.cloudQuery=getRequest->cloudQueryWithAddresses.cloudQuery;
- cloudQueryResult.subscribeToResults=getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults;
- cloudQueryResult.SerializeHeader(true, &bsOut);
- DataStructures::List<CloudData*> cloudDataResultList;
- DataStructures::List<CloudKey> cloudKeyResultList;
- ProcessCloudQueryWithAddresses(getRequest->cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList);
- bool unlimitedRows=getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn==0;
- uint32_t localNumRows = (uint32_t) cloudDataResultList.Size();
- if (unlimitedRows==false &&
- localNumRows > getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex &&
- localNumRows - getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex > getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn )
- localNumRows=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex + getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn;
- BitSize_t bitStreamOffset = bsOut.GetWriteOffset();
- uint32_t localRowsToWrite;
- unsigned int skipRows;
- if (localNumRows>getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex)
- {
- localRowsToWrite=localNumRows-getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex;
- skipRows=0;
- }
- else
- {
- localRowsToWrite=0;
- skipRows=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex-localNumRows;
- }
- cloudQueryResult.SerializeNumRows(true, localRowsToWrite, &bsOut);
- for (unsigned int i=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex; i < localNumRows; i++)
- {
- WriteCloudQueryRowFromResultList(i, cloudDataResultList, cloudKeyResultList, &bsOut);
- }
- // Append remote systems for remaining rows
- if (unlimitedRows==true || getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn>localRowsToWrite)
- {
- uint32_t remainingRows=0;
- uint32_t additionalRowsWritten=0;
- if (unlimitedRows==false)
- remainingRows=getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn-localRowsToWrite;
- unsigned int remoteServerResponseIndex;
- for (remoteServerResponseIndex=0; remoteServerResponseIndex < getRequest->remoteServerResponses.Size(); remoteServerResponseIndex++)
- {
- BufferedGetResponseFromServer *bufferedGetResponseFromServer = getRequest->remoteServerResponses[remoteServerResponseIndex];
- unsigned int cloudQueryRowIndex;
- for (cloudQueryRowIndex=0; cloudQueryRowIndex < bufferedGetResponseFromServer->queryResult.rowsReturned.Size(); cloudQueryRowIndex++)
- {
- if (skipRows>0)
- {
- --skipRows;
- continue;
- }
- bufferedGetResponseFromServer->queryResult.rowsReturned[cloudQueryRowIndex]->Serialize(true, &bsOut, this);
- ++additionalRowsWritten;
- if (unlimitedRows==false && --remainingRows==0)
- break;
- }
- if (unlimitedRows==false && remainingRows==0)
- break;
- }
- if (additionalRowsWritten>0)
- {
- BitSize_t curOffset = bsOut.GetWriteOffset();
- bsOut.SetWriteOffset(bitStreamOffset);
- localRowsToWrite+=additionalRowsWritten;
- cloudQueryResult.SerializeNumRows(true, localRowsToWrite, &bsOut);
- bsOut.SetWriteOffset(curOffset);
- }
- }
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, getRequest->requestingClient, false);
- }
- void CloudServer::ProcessCloudQueryWithAddresses( CloudServer::CloudQueryWithAddresses &cloudQueryWithAddresses, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList )
- {
- CloudQueryResult cloudQueryResult;
- CloudQueryRow cloudQueryRow;
- unsigned int queryIndex;
- bool dataRepositoryExists;
- CloudDataList* cloudDataList;
- unsigned int keyDataIndex;
- // If specificSystems list empty, applies to all systems
- // For each of keys in cloudQueryWithAddresses, return that data, limited by maxRowsToReturn
- for (queryIndex=0; queryIndex < cloudQueryWithAddresses.cloudQuery.keys.Size(); queryIndex++)
- {
- const CloudKey &key = cloudQueryWithAddresses.cloudQuery.keys[queryIndex];
- unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists);
- if (dataRepositoryExists)
- {
- cloudDataList=dataRepository[dataRepositoryIndex];
- if (cloudDataList->uploaderCount>0)
- {
- // Return all keyData that was uploaded by specificSystems, or all if not specified
- if (cloudQueryWithAddresses.specificSystems.Size()>0)
- {
- // Return data for matching systems
- unsigned int specificSystemIndex;
- for (specificSystemIndex=0; specificSystemIndex < cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
- {
- bool uploaderExists;
- keyDataIndex = cloudDataList->keyData.GetIndexFromKey(cloudQueryWithAddresses.specificSystems[specificSystemIndex], &uploaderExists);
- if (uploaderExists)
- {
- cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_);
- cloudKeyResultList.Push(key, _FILE_AND_LINE_);
- }
- }
- }
- else
- {
- // Return data for all systems
- for (keyDataIndex=0; keyDataIndex < cloudDataList->keyData.Size(); keyDataIndex++)
- {
- cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_);
- cloudKeyResultList.Push(key, _FILE_AND_LINE_);
- }
- }
- }
- }
- }
- }
- void CloudServer::SendUploadedAndSubscribedKeysToServer( RakNetGUID systemAddress )
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
- bsOut.Write((MessageID)STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS);
- bsOut.WriteCasted<uint16_t>(dataRepository.Size());
- for (unsigned int i=0; i < dataRepository.Size(); i++)
- dataRepository[i]->key.Serialize(true, &bsOut);
- BitSize_t startOffset, endOffset;
- uint16_t subscribedKeyCount=0;
- startOffset=bsOut.GetWriteOffset();
- bsOut.WriteCasted<uint16_t>(subscribedKeyCount);
- for (unsigned int i=0; i < dataRepository.Size(); i++)
- {
- if (dataRepository[i]->subscriberCount>0)
- {
- dataRepository[i]->key.Serialize(true, &bsOut);
- subscribedKeyCount++;
- }
- }
- endOffset=bsOut.GetWriteOffset();
- bsOut.SetWriteOffset(startOffset);
- bsOut.WriteCasted<uint16_t>(subscribedKeyCount);
- bsOut.SetWriteOffset(endOffset);
- if (dataRepository.Size()>0 || subscribedKeyCount>0)
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, systemAddress, false);
- }
- void CloudServer::SendUploadedKeyToServers( CloudKey &cloudKey )
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
- bsOut.Write((MessageID)STSC_ADD_UPLOADED_KEY);
- cloudKey.Serialize(true, &bsOut);
- for (unsigned int i=0; i < remoteServers.Size(); i++)
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
- }
- void CloudServer::SendSubscribedKeyToServers( CloudKey &cloudKey )
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
- bsOut.Write((MessageID)STSC_ADD_SUBSCRIBED_KEY);
- cloudKey.Serialize(true, &bsOut);
- for (unsigned int i=0; i < remoteServers.Size(); i++)
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
- }
- void CloudServer::RemoveUploadedKeyFromServers( CloudKey &cloudKey )
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
- bsOut.Write((MessageID)STSC_REMOVE_UPLOADED_KEY);
- cloudKey.Serialize(true, &bsOut);
- for (unsigned int i=0; i < remoteServers.Size(); i++)
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
- }
- void CloudServer::RemoveSubscribedKeyFromServers( CloudKey &cloudKey )
- {
- RakNet::BitStream bsOut;
- bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
- bsOut.Write((MessageID)STSC_REMOVE_SUBSCRIBED_KEY);
- cloudKey.Serialize(true, &bsOut);
- for (unsigned int i=0; i < remoteServers.Size(); i++)
- SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
- }
- void CloudServer::OnSendUploadedAndSubscribedKeysToServer( Packet *packet )
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID)*2);
- bool objectExists;
- unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
- if (objectExists==false)
- return;
- RemoteServer *remoteServer = remoteServers[index];
- remoteServer->gotSubscribedAndUploadedKeys=true;
- // unsigned int insertionIndex;
- bool alreadyHasKey;
- uint16_t numUploadedKeys, numSubscribedKeys;
- bsIn.Read(numUploadedKeys);
- for (uint16_t i=0; i < numUploadedKeys; i++)
- {
- CloudKey cloudKey;
- cloudKey.Serialize(false, &bsIn);
- // insertionIndex =
- remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
- if (alreadyHasKey==false)
- remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
- }
- bsIn.Read(numSubscribedKeys);
- for (uint16_t i=0; i < numSubscribedKeys; i++)
- {
- CloudKey cloudKey;
- cloudKey.Serialize(false, &bsIn);
- //insertionIndex =
- remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
- if (alreadyHasKey==false)
- remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
- }
- // Potential todo - join servers
- // For each uploaded key that we subscribe to, query it
- // For each subscribed key that we have, send it
- }
- void CloudServer::OnSendUploadedKeyToServers( Packet *packet )
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID)*2);
- bool objectExists;
- unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
- if (objectExists==false)
- return;
- RemoteServer *remoteServer = remoteServers[index];
- CloudKey cloudKey;
- cloudKey.Serialize(false, &bsIn);
- // unsigned int insertionIndex;
- bool alreadyHasKey;
- // insertionIndex =
- remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
- if (alreadyHasKey==false)
- remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
- }
- void CloudServer::OnSendSubscribedKeyToServers( Packet *packet )
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID)*2);
- bool objectExists;
- unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
- if (objectExists==false)
- return;
- RemoteServer *remoteServer = remoteServers[index];
- CloudKey cloudKey;
- cloudKey.Serialize(false, &bsIn);
- // unsigned int insertionIndex;
- bool alreadyHasKey;
- // insertionIndex =
- remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
- // Do not need to send current values, the Get request will do that as the Get request is sent at the same time
- if (alreadyHasKey==false)
- remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
- }
- void CloudServer::OnRemoveUploadedKeyFromServers( Packet *packet )
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID)*2);
- bool objectExists;
- unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
- if (objectExists==false)
- return;
- RemoteServer *remoteServer = remoteServers[index];
- CloudKey cloudKey;
- cloudKey.Serialize(false, &bsIn);
- unsigned int insertionIndex;
- bool alreadyHasKey;
- insertionIndex = remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
- if (alreadyHasKey==true)
- remoteServer->uploadedKeys.RemoveAtIndex(insertionIndex);
- }
- void CloudServer::OnRemoveSubscribedKeyFromServers( Packet *packet )
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID)*2);
- bool objectExists;
- unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
- if (objectExists==false)
- return;
- RemoteServer *remoteServer = remoteServers[index];
- CloudKey cloudKey;
- cloudKey.Serialize(false, &bsIn);
- unsigned int insertionIndex;
- bool alreadyHasKey;
- insertionIndex = remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
- if (alreadyHasKey==true)
- remoteServer->subscribedKeys.RemoveAtIndex(insertionIndex);
- }
- void CloudServer::OnServerDataChanged( Packet *packet )
- {
- RakNet::BitStream bsIn(packet->data, packet->length, false);
- bsIn.IgnoreBytes(sizeof(MessageID)*2);
- bool objectExists;
- remoteServers.GetIndexFromKey(packet->guid,&objectExists);
- if (objectExists==false)
- return;
- // Find everyone that cares about this change and relay
- bool wasUpdated=false;
- bsIn.Read(wasUpdated);
- CloudQueryRow row;
- row.Serialize(false, &bsIn, this);
- CloudDataList *cloudDataList;
- bool dataRepositoryExists;
- unsigned int dataRepositoryIndex;
- dataRepositoryIndex = dataRepository.GetIndexFromKey(row.key, &dataRepositoryExists);
- if (dataRepositoryExists==false)
- {
- DeallocateRowData(row.data);
- return;
- }
- cloudDataList = dataRepository[dataRepositoryIndex];
- CloudData *cloudData;
- bool keyDataListExists;
- unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(row.clientGUID, &keyDataListExists);
- if (keyDataListExists==true)
- {
- cloudData = cloudDataList->keyData[keyDataListIndex];
- NotifyClientSubscribersOfDataChange(&row, cloudData->specificSubscribers, wasUpdated );
- }
- NotifyClientSubscribersOfDataChange(&row, cloudDataList->nonSpecificSubscribers, wasUpdated );
- DeallocateRowData(row.data);
- }
- void CloudServer::GetServersWithUploadedKeys(
- DataStructures::List<CloudKey> &keys,
- DataStructures::List<CloudServer::RemoteServer*> &remoteServersWithData
- )
- {
- remoteServersWithData.Clear(true, _FILE_AND_LINE_);
- unsigned int i,j;
- for (i=0; i < remoteServers.Size(); i++)
- {
- remoteServers[i]->workingFlag=false;
- }
- for (i=0; i < remoteServers.Size(); i++)
- {
- if (remoteServers[i]->workingFlag==false)
- {
- if (remoteServers[i]->gotSubscribedAndUploadedKeys==false)
- {
- remoteServers[i]->workingFlag=true;
- remoteServersWithData.Push(remoteServers[i], _FILE_AND_LINE_);
- }
- else
- {
- remoteServers[i]->workingFlag=false;
- for (j=0; j < keys.Size(); j++)
- {
- if (remoteServers[i]->workingFlag==false && remoteServers[i]->uploadedKeys.HasData(keys[j]))
- {
- remoteServers[i]->workingFlag=true;
- remoteServersWithData.Push(remoteServers[i], _FILE_AND_LINE_);
- break;
- }
- }
- }
- }
- }
- }
- CloudServer::CloudDataList *CloudServer::GetOrAllocateCloudDataList(CloudKey key, bool *dataRepositoryExists, unsigned int &dataRepositoryIndex)
- {
- CloudDataList *cloudDataList;
- dataRepositoryIndex = dataRepository.GetIndexFromKey(key, dataRepositoryExists);
- if (*dataRepositoryExists==false)
- {
- cloudDataList = RakNet::OP_NEW<CloudDataList>(_FILE_AND_LINE_);
- cloudDataList->key=key;
- cloudDataList->uploaderCount=0;
- cloudDataList->subscriberCount=0;
- dataRepository.InsertAtIndex(cloudDataList,dataRepositoryIndex,_FILE_AND_LINE_);
- }
- else
- {
- cloudDataList = dataRepository[dataRepositoryIndex];
- }
- return cloudDataList;
- }
- void CloudServer::UnsubscribeFromKey(RemoteCloudClient *remoteCloudClient, RakNetGUID remoteCloudClientGuid, unsigned int keySubscriberIndex, CloudKey &cloudKey, DataStructures::List<RakNetGUID> &specificSystems)
- {
- KeySubscriberID* keySubscriberId = remoteCloudClient->subscribedKeys[keySubscriberIndex];
- // If removing specific systems, but global subscription, fail
- if (keySubscriberId->specificSystemsSubscribedTo.Size()==0 && specificSystems.Size()>0)
- return;
- bool dataRepositoryExists;
- CloudDataList *cloudDataList;
- unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists);
- if (dataRepositoryExists==false)
- return;
- unsigned int i,j;
- cloudDataList = dataRepository[dataRepositoryIndex];
- if (specificSystems.Size()==0)
- {
- // Remove global subscriber. If returns false, have to remove specific subscribers
- if (cloudDataList->RemoveSubscriber(remoteCloudClientGuid)==false)
- {
- for (i=0; i < keySubscriberId->specificSystemsSubscribedTo.Size(); i++)
- {
- RemoveSpecificSubscriber(keySubscriberId->specificSystemsSubscribedTo[i], cloudDataList, remoteCloudClientGuid);
- }
- }
- keySubscriberId->specificSystemsSubscribedTo.Clear(true, _FILE_AND_LINE_);
- }
- else
- {
- for (j=0; j < specificSystems.Size(); j++)
- {
- unsigned int specificSystemsSubscribedToIndex;
- bool hasSpecificSystemsSubscribedTo;
- specificSystemsSubscribedToIndex=keySubscriberId->specificSystemsSubscribedTo.GetIndexFromKey(specificSystems[j], &hasSpecificSystemsSubscribedTo);
- if (hasSpecificSystemsSubscribedTo)
- {
- RemoveSpecificSubscriber(specificSystems[j], cloudDataList, remoteCloudClientGuid);
- keySubscriberId->specificSystemsSubscribedTo.RemoveAtIndex(specificSystemsSubscribedToIndex);
- }
- }
- }
- if (keySubscriberId->specificSystemsSubscribedTo.Size()==0)
- {
- RakNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_);
- remoteCloudClient->subscribedKeys.RemoveAtIndex(keySubscriberIndex);
- }
- if (cloudDataList->subscriberCount==0)
- RemoveSubscribedKeyFromServers(cloudKey);
- if (cloudDataList->IsUnused())
- {
- RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
- dataRepository.RemoveAtIndex(dataRepositoryIndex);
- }
- }
- void CloudServer::RemoveSpecificSubscriber(RakNetGUID specificSubscriber, CloudDataList *cloudDataList, RakNetGUID remoteCloudClientGuid)
- {
- bool keyDataListExists;
- unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSubscriber, &keyDataListExists);
- if (keyDataListExists==false)
- return;
- CloudData *cloudData = cloudDataList->keyData[keyDataListIndex];
- bool hasSpecificSubscriber;
- unsigned int specificSubscriberIndex = cloudData->specificSubscribers.GetIndexFromKey(remoteCloudClientGuid, &hasSpecificSubscriber);
- if (hasSpecificSubscriber)
- {
- cloudData->specificSubscribers.RemoveAtIndex(specificSubscriberIndex);
- cloudDataList->subscriberCount--;
- if (cloudData->IsUnused())
- {
- RakNet::OP_DELETE(cloudData, _FILE_AND_LINE_);
- cloudDataList->keyData.RemoveAtIndex(keyDataListIndex);
- }
- }
- }
- void CloudServer::ForceExternalSystemAddress(SystemAddress forcedAddress)
- {
- forceAddress=forcedAddress;
- }
- void CloudServer::AddQueryFilter(CloudServerQueryFilter* filter)
- {
- if (queryFilters.GetIndexOf(filter)!=(unsigned int) -1)
- return;
- queryFilters.Push(filter, _FILE_AND_LINE_);
- }
- void CloudServer::RemoveQueryFilter(CloudServerQueryFilter* filter)
- {
- unsigned int index;
- index = queryFilters.GetIndexOf(filter);
- if (index != (unsigned int) -1)
- queryFilters.RemoveAtIndex(index);
- }
- void CloudServer::RemoveAllQueryFilters(void)
- {
- queryFilters.Clear(true, _FILE_AND_LINE_);
- }
- #endif
|