/* * Copyright (c) 2014, Oculus VR, Inc. * All rights reserved. * * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. * */ #include "NativeFeatureIncludes.h" #if _RAKNET_SUPPORT_CloudServer==1 #include "CloudServer.h" #include "GetTime.h" #include "MessageIdentifiers.h" #include "BitStream.h" #include "RakPeerInterface.h" enum ServerToServerCommands { STSC_PROCESS_GET_REQUEST, STSC_PROCESS_GET_RESPONSE, STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS, STSC_ADD_UPLOADED_KEY, STSC_ADD_SUBSCRIBED_KEY, STSC_REMOVE_UPLOADED_KEY, STSC_REMOVE_SUBSCRIBED_KEY, STSC_DATA_CHANGED, }; using namespace RakNet; int CloudServer::RemoteServerComp(const RakNetGUID &key, RemoteServer* const &data ) { if (key < data->serverAddress) return -1; if (key > data->serverAddress) return 1; return 0; } int CloudServer::KeySubscriberIDComp(const CloudKey &key, KeySubscriberID * const &data ) { if (key.primaryKey < data->key.primaryKey) return -1; if (key.primaryKey > data->key.primaryKey) return 1; if (key.secondaryKey < data->key.secondaryKey) return -1; if (key.secondaryKey > data->key.secondaryKey) return 1; return 0; } int CloudServer::KeyDataPtrComp( const RakNetGUID &key, CloudData* const &data ) { if (key < data->clientGUID) return -1; if (key > data->clientGUID) return 1; return 0; } int CloudServer::KeyDataListComp( const CloudKey &key, CloudDataList * const &data ) { if (key.primaryKey < data->key.primaryKey) return -1; if (key.primaryKey > data->key.primaryKey) return 1; if (key.secondaryKey < data->key.secondaryKey) return -1; if (key.secondaryKey > data->key.secondaryKey) return 1; return 0; } int CloudServer::BufferedGetResponseFromServerComp(const RakNetGUID &key, CloudServer::BufferedGetResponseFromServer* const &data ) { if (key < data->serverAddress) return -1; if (key > data->serverAddress) return 1; return 0; } int CloudServer::GetRequestComp(const uint32_t &key, CloudServer::GetRequest* const &data ) { if (key < data->requestId) return -1; if (key > data->requestId) return -1; return 0; } void CloudServer::CloudQueryWithAddresses::Serialize(bool writeToBitstream, BitStream *bitStream) { cloudQuery.Serialize(writeToBitstream, bitStream); if (writeToBitstream) { bitStream->WriteCasted(specificSystems.Size()); RakAssert(specificSystems.Size() < (uint16_t)-1 ); for (uint16_t i=0; i < specificSystems.Size(); i++) { bitStream->Write(specificSystems[i]); } } else { uint16_t specificSystemsCount; RakNetGUID addressOrGuid; bitStream->Read(specificSystemsCount); for (uint16_t i=0; i < specificSystemsCount; i++) { bitStream->Read(addressOrGuid); specificSystems.Push(addressOrGuid, _FILE_AND_LINE_); } } } bool CloudServer::GetRequest::AllRemoteServersHaveResponded(void) const { unsigned int i; for (i=0; i < remoteServerResponses.Size(); i++) if (remoteServerResponses[i]->gotResult==false) return false; return true; } void CloudServer::GetRequest::Clear(CloudAllocator *allocator) { unsigned int i; for (i=0; i < remoteServerResponses.Size(); i++) { remoteServerResponses[i]->Clear(allocator); RakNet::OP_DELETE(remoteServerResponses[i], _FILE_AND_LINE_); } remoteServerResponses.Clear(false, _FILE_AND_LINE_); } void CloudServer::BufferedGetResponseFromServer::Clear(CloudAllocator *allocator) { unsigned int i; for (i=0; i < queryResult.rowsReturned.Size(); i++) { allocator->DeallocateRowData(queryResult.rowsReturned[i]->data); allocator->DeallocateCloudQueryRow(queryResult.rowsReturned[i]); } queryResult.rowsReturned.Clear(false, _FILE_AND_LINE_); } CloudServer::CloudServer() { maxUploadBytesPerClient=0; maxBytesPerDowload=0; nextGetRequestId=0; nextGetRequestsCheck=0; } CloudServer::~CloudServer() { Clear(); } void CloudServer::SetMaxUploadBytesPerClient(uint64_t bytes) { maxUploadBytesPerClient=bytes; } void CloudServer::SetMaxBytesPerDownload(uint64_t bytes) { maxBytesPerDowload=bytes; } void CloudServer::Update(void) { // Timeout getRequests RakNet::Time time = RakNet::Time(); if (time > nextGetRequestsCheck) { nextGetRequestsCheck=time+1000; unsigned int i=0; while (i < getRequests.Size()) { if (time - getRequests[i]->requestStartTime > 3000) { // Remote server is not responding, just send back data with whoever did respond ProcessAndTransmitGetRequest(getRequests[i]); getRequests[i]->Clear(this); RakNet::OP_DELETE(getRequests[i],_FILE_AND_LINE_); getRequests.RemoveAtIndex(i); } else { i++; } } } } PluginReceiveResult CloudServer::OnReceive(Packet *packet) { switch (packet->data[0]) { case ID_CLOUD_POST_REQUEST: OnPostRequest(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case ID_CLOUD_RELEASE_REQUEST: OnReleaseRequest(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case ID_CLOUD_GET_REQUEST: OnGetRequest(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case ID_CLOUD_UNSUBSCRIBE_REQUEST: OnUnsubscribeRequest(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case ID_CLOUD_SERVER_TO_SERVER_COMMAND: if (packet->length>1) { switch (packet->data[1]) { case STSC_PROCESS_GET_REQUEST: OnServerToServerGetRequest(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case STSC_PROCESS_GET_RESPONSE: OnServerToServerGetResponse(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS: OnSendUploadedAndSubscribedKeysToServer(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case STSC_ADD_UPLOADED_KEY: OnSendUploadedKeyToServers(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case STSC_ADD_SUBSCRIBED_KEY: OnSendSubscribedKeyToServers(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case STSC_REMOVE_UPLOADED_KEY: OnRemoveUploadedKeyFromServers(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case STSC_REMOVE_SUBSCRIBED_KEY: OnRemoveSubscribedKeyFromServers(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; case STSC_DATA_CHANGED: OnServerDataChanged(packet); return RR_STOP_PROCESSING_AND_DEALLOCATE; } } return RR_STOP_PROCESSING_AND_DEALLOCATE; } return RR_CONTINUE_PROCESSING; } void CloudServer::OnPostRequest(Packet *packet) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)); CloudKey key; key.Serialize(false,&bsIn); uint32_t dataLengthBytes; bsIn.Read(dataLengthBytes); if (maxUploadBytesPerClient>0 && dataLengthBytes>maxUploadBytesPerClient) return; // Exceeded max upload bytes bsIn.AlignReadToByteBoundary(); for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++) { if (queryFilters[filterIndex]->OnPostRequest(packet->guid, packet->systemAddress, key, dataLengthBytes, (const char*) bsIn.GetData()+BITS_TO_BYTES(bsIn.GetReadOffset()))==false) return; } unsigned char *data; if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE) { data = (unsigned char *) rakMalloc_Ex(dataLengthBytes,_FILE_AND_LINE_); if (data==0) { notifyOutOfMemory(_FILE_AND_LINE_); return; } bsIn.ReadAlignedBytes(data,dataLengthBytes); } else data=0; // Add this system to remoteSystems if they aren't there already DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid); RemoteCloudClient *remoteCloudClient; if (remoteSystemsHashIndex.IsInvalid()) { remoteCloudClient = RakNet::OP_NEW(_FILE_AND_LINE_); remoteCloudClient->uploadedKeys.Insert(key,key,true,_FILE_AND_LINE_); remoteCloudClient->uploadedBytes=0; remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_); } else { remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex); bool objectExists; // Add to RemoteCloudClient::uploadedKeys if it isn't there already unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists); if (objectExists==false) { remoteCloudClient->uploadedKeys.InsertAtIndex(key, uploadedKeysIndex, _FILE_AND_LINE_); } } bool cloudDataAlreadyUploaded; unsigned int dataRepositoryIndex; bool dataRepositoryExists; CloudDataList* cloudDataList = GetOrAllocateCloudDataList(key, &dataRepositoryExists, dataRepositoryIndex); if (dataRepositoryExists==false) { cloudDataList->uploaderCount=1; cloudDataAlreadyUploaded=false; } else { cloudDataAlreadyUploaded=cloudDataList->uploaderCount>0; cloudDataList->uploaderCount++; } CloudData *cloudData; bool keyDataListExists; unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists); if (keyDataListExists==false) { if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes+dataLengthBytes>maxUploadBytesPerClient) { // Undo prior insertion of cloudDataList into cloudData if needed if (keyDataListExists==false) { RakNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_); dataRepository.RemoveAtIndex(dataRepositoryIndex); } if (remoteCloudClient->IsUnused()) { RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_); remoteSystems.Remove(packet->guid, _FILE_AND_LINE_); } if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE) rakFree_Ex(data, _FILE_AND_LINE_); return; } cloudData = RakNet::OP_NEW(_FILE_AND_LINE_); cloudData->dataLengthBytes=dataLengthBytes; cloudData->isUploaded=true; if (forceAddress!=UNASSIGNED_SYSTEM_ADDRESS) { cloudData->serverSystemAddress=forceAddress; cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort()); } else { cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress); if (cloudData->serverSystemAddress.IsLoopback()) cloudData->serverSystemAddress.FromString(rakPeerInterface->GetLocalIP(0)); } if (cloudData->serverSystemAddress.GetPort()==0) { // Fix localhost port cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort()); } cloudData->clientSystemAddress=packet->systemAddress; cloudData->serverGUID=rakPeerInterface->GetMyGUID(); cloudData->clientGUID=packet->guid; cloudDataList->keyData.Insert(packet->guid,cloudData,true,_FILE_AND_LINE_); } else { cloudData = cloudDataList->keyData[keyDataListIndex]; if (cloudDataAlreadyUploaded==false) { if (forceAddress!=UNASSIGNED_SYSTEM_ADDRESS) { cloudData->serverSystemAddress=forceAddress; cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort()); } else { cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress); } if (cloudData->serverSystemAddress.GetPort()==0) { // Fix localhost port cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort()); } cloudData->clientSystemAddress=packet->systemAddress; } if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes-cloudData->dataLengthBytes+dataLengthBytes>maxUploadBytesPerClient) { // Undo prior insertion of cloudDataList into cloudData if needed if (dataRepositoryExists==false) { RakNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_); dataRepository.RemoveAtIndex(dataRepositoryIndex); } return; } else { // Subtract already used bytes we are overwriting remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes; } if (cloudData->allocatedData!=0) rakFree_Ex(cloudData->allocatedData,_FILE_AND_LINE_); } if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE) { // Data already allocated cloudData->allocatedData=data; cloudData->dataPtr=data; } else { // Read to stack if (dataLengthBytes>0) bsIn.ReadAlignedBytes(cloudData->stackData,dataLengthBytes); cloudData->allocatedData=0; cloudData->dataPtr=cloudData->stackData; } // Update how many bytes were written for this data cloudData->dataLengthBytes=dataLengthBytes; remoteCloudClient->uploadedBytes+=dataLengthBytes; if (cloudDataAlreadyUploaded==false) { // New data field SendUploadedKeyToServers(cloudDataList->key); } // Existing data field changed NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, true ); NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, true ); // Send update to all remote servers that subscribed to this key NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, true); // I could have also subscribed to a key not yet updated locally // This means I have to go through every RemoteClient that wants this key // Seems like cloudData->specificSubscribers is unnecessary in that case } void CloudServer::OnReleaseRequest(Packet *packet) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)); uint16_t keyCount; bsIn.Read(keyCount); if (keyCount==0) return; DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid); if (remoteSystemIndex.IsInvalid()==true) return; RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex); CloudKey key; // Read all in a list first so I can run filter on it DataStructures::List cloudKeys; for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++) { key.Serialize(false, &bsIn); cloudKeys.Push(key, _FILE_AND_LINE_); } for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++) { if (queryFilters[filterIndex]->OnReleaseRequest(packet->guid, packet->systemAddress, cloudKeys)==false) return; } for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++) { // Serialize in list above so I can run the filter on it // key.Serialize(false, &bsIn); key=cloudKeys[keyCountIndex]; // Remove remote systems uploaded keys bool objectExists; unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists); if (objectExists) { bool dataRepositoryExists; unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists); CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex]; RakAssert(cloudDataList); CloudData *cloudData; bool keyDataListExists; unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists); cloudData = cloudDataList->keyData[keyDataListIndex]; remoteCloudClient->uploadedKeys.RemoveAtIndex(uploadedKeysIndex); remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes; cloudDataList->uploaderCount--; // Broadcast destruction of this key to subscribers NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false ); NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false ); NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false ); cloudData->Clear(); if (cloudData->IsUnused()) { RakNet::OP_DELETE(cloudData, _FILE_AND_LINE_); cloudDataList->keyData.RemoveAtIndex(keyDataListIndex); if (cloudDataList->IsNotUploaded()) { // Tell other servers that this key is no longer uploaded, so they do not request it from us RemoveUploadedKeyFromServers(cloudDataList->key); } if (cloudDataList->IsUnused()) { RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_); dataRepository.RemoveAtIndex(dataRepositoryIndex); } } if (remoteCloudClient->IsUnused()) { RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_); remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_); break; } } } } void CloudServer::OnGetRequest(Packet *packet) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)); uint16_t specificSystemsCount; CloudKey cloudKey; // Create a new GetRequest GetRequest *getRequest; getRequest = RakNet::OP_NEW(_FILE_AND_LINE_); getRequest->cloudQueryWithAddresses.cloudQuery.Serialize(false, &bsIn); getRequest->requestingClient=packet->guid; RakNetGUID addressOrGuid; bsIn.Read(specificSystemsCount); for (uint16_t i=0; i < specificSystemsCount; i++) { bsIn.Read(addressOrGuid); getRequest->cloudQueryWithAddresses.specificSystems.Push(addressOrGuid, _FILE_AND_LINE_); } if (getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size()==0) { RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_); return; } for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++) { if (queryFilters[filterIndex]->OnGetRequest(packet->guid, packet->systemAddress, getRequest->cloudQueryWithAddresses.cloudQuery, getRequest->cloudQueryWithAddresses.specificSystems )==false) return; } getRequest->requestStartTime=RakNet::GetTime(); getRequest->requestId=nextGetRequestId++; // Send request to servers that have this data DataStructures::List remoteServersWithData; GetServersWithUploadedKeys(getRequest->cloudQueryWithAddresses.cloudQuery.keys, remoteServersWithData); if (remoteServersWithData.Size()==0) { ProcessAndTransmitGetRequest(getRequest); } else { RakNet::BitStream bsOut; bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND); bsOut.Write((MessageID)STSC_PROCESS_GET_REQUEST); getRequest->cloudQueryWithAddresses.Serialize(true, &bsOut); bsOut.Write(getRequest->requestId); for (unsigned int remoteServerIndex=0; remoteServerIndex < remoteServersWithData.Size(); remoteServerIndex++) { BufferedGetResponseFromServer* bufferedGetResponseFromServer = RakNet::OP_NEW(_FILE_AND_LINE_); bufferedGetResponseFromServer->serverAddress=remoteServersWithData[remoteServerIndex]->serverAddress; bufferedGetResponseFromServer->gotResult=false; getRequest->remoteServerResponses.Insert(remoteServersWithData[remoteServerIndex]->serverAddress, bufferedGetResponseFromServer, true, _FILE_AND_LINE_); SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServersWithData[remoteServerIndex]->serverAddress, false); } // Record that this system made this request getRequests.Insert(getRequest->requestId, getRequest, true, _FILE_AND_LINE_); } if (getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults) { // Add to key subscription list for the client, which contains a keyId / specificUploaderList pair DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid); RemoteCloudClient *remoteCloudClient; if (remoteSystemsHashIndex.IsInvalid()) { remoteCloudClient = RakNet::OP_NEW(_FILE_AND_LINE_); remoteCloudClient->uploadedBytes=0; remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_); } else { remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex); } unsigned int keyIndex; for (keyIndex=0; keyIndex < getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size(); keyIndex++) { cloudKey = getRequest->cloudQueryWithAddresses.cloudQuery.keys[keyIndex]; unsigned int keySubscriberIndex; bool hasKeySubscriber; keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber); KeySubscriberID* keySubscriberId; if (hasKeySubscriber) { DataStructures::List specificSystems; UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems); } keySubscriberId = RakNet::OP_NEW(_FILE_AND_LINE_); keySubscriberId->key=cloudKey; unsigned int specificSystemIndex; for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++) { keySubscriberId->specificSystemsSubscribedTo.Insert(getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], true, _FILE_AND_LINE_); } remoteCloudClient->subscribedKeys.InsertAtIndex(keySubscriberId, keySubscriberIndex, _FILE_AND_LINE_); // Add CloudData in a similar way unsigned int dataRepositoryIndex; bool dataRepositoryExists; CloudDataList* cloudDataList = GetOrAllocateCloudDataList(cloudKey, &dataRepositoryExists, dataRepositoryIndex); // If this is the first local client to subscribe to this key, call SendSubscribedKeyToServers if (cloudDataList->subscriberCount==0) SendSubscribedKeyToServers(cloudKey); // If the subscription is specific, may have to also allocate CloudData if (getRequest->cloudQueryWithAddresses.specificSystems.Size()) { CloudData *cloudData; bool keyDataListExists; unsigned int specificSystemIndex; for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++) { RakNetGUID specificSystem = getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex]; unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSystem, &keyDataListExists); if (keyDataListExists==false) { cloudData = RakNet::OP_NEW(_FILE_AND_LINE_); cloudData->dataLengthBytes=0; cloudData->allocatedData=0; cloudData->isUploaded=false; cloudData->dataPtr=0; cloudData->serverSystemAddress=UNASSIGNED_SYSTEM_ADDRESS; cloudData->clientSystemAddress=UNASSIGNED_SYSTEM_ADDRESS; cloudData->serverGUID=rakPeerInterface->GetMyGUID(); cloudData->clientGUID=specificSystem; cloudDataList->keyData.Insert(specificSystem,cloudData,true,_FILE_AND_LINE_); } else { cloudData = cloudDataList->keyData[keyDataListIndex]; } ++cloudDataList->subscriberCount; cloudData->specificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_); } } else { ++cloudDataList->subscriberCount; cloudDataList->nonSpecificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_); // Remove packet->guid from CloudData::specificSubscribers among all instances of cloudDataList->keyData unsigned int subscribedKeysIndex; bool subscribedKeysIndexExists; subscribedKeysIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudDataList->key, &subscribedKeysIndexExists); if (subscribedKeysIndexExists) { KeySubscriberID* keySubscriberId; keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex]; unsigned int specificSystemIndex; for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++) { bool keyDataExists; unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists); if (keyDataExists) { CloudData *keyData = cloudDataList->keyData[keyDataIndex]; keyData->specificSubscribers.Remove(packet->guid); --cloudDataList->subscriberCount; } } } } } if (remoteCloudClient->subscribedKeys.Size()==0) { // Didn't do anything remoteSystems.Remove(packet->guid, _FILE_AND_LINE_); RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_); } } if (remoteServersWithData.Size()==0) RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_); } void CloudServer::OnUnsubscribeRequest(Packet *packet) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)); DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid); if (remoteSystemIndex.IsInvalid()==true) return; RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex); uint16_t keyCount, specificSystemCount; DataStructures::List cloudKeys; DataStructures::List specificSystems; uint16_t index; CloudKey cloudKey; bsIn.Read(keyCount); for (index=0; index < keyCount; index++) { cloudKey.Serialize(false, &bsIn); cloudKeys.Push(cloudKey, _FILE_AND_LINE_); } RakNetGUID specificSystem; bsIn.Read(specificSystemCount); for (index=0; index < specificSystemCount; index++) { bsIn.Read(specificSystem); specificSystems.Push(specificSystem, _FILE_AND_LINE_); } for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++) { if (queryFilters[filterIndex]->OnUnsubscribeRequest(packet->guid, packet->systemAddress, cloudKeys, specificSystems )==false) return; } // CloudDataList *cloudDataList; bool dataRepositoryExists; // unsigned int dataRepositoryIndex; for (index=0; index < keyCount; index++) { CloudKey cloudKey = cloudKeys[index]; // dataRepositoryIndex = dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists); if (dataRepositoryExists==false) continue; // cloudDataList = dataRepository[dataRepositoryIndex]; unsigned int keySubscriberIndex; bool hasKeySubscriber; keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber); if (hasKeySubscriber==false) continue; UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems); } if (remoteCloudClient->IsUnused()) { RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_); remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_); } } void CloudServer::OnServerToServerGetRequest(Packet *packet) { // unsigned int remoteServerIndex; bool objectExists; //remoteServerIndex = remoteServers.GetIndexFromKey(packet->guid, &objectExists); if (objectExists==false) return; RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)*2); CloudQueryWithAddresses cloudQueryWithAddresses; uint32_t requestId; cloudQueryWithAddresses.Serialize(false, &bsIn); bsIn.Read(requestId); DataStructures::List cloudDataResultList; DataStructures::List cloudKeyResultList; ProcessCloudQueryWithAddresses(cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList); RakNet::BitStream bsOut; bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND); bsOut.Write((MessageID)STSC_PROCESS_GET_RESPONSE); bsOut.Write(requestId); WriteCloudQueryRowFromResultList(cloudDataResultList, cloudKeyResultList, &bsOut); SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, packet->guid, false); } void CloudServer::OnServerToServerGetResponse(Packet *packet) { unsigned int remoteServerIndex; bool objectExists; remoteServerIndex = remoteServers.GetIndexFromKey(packet->guid, &objectExists); if (objectExists==false) return; RemoteServer *remoteServer = remoteServers[remoteServerIndex]; if (remoteServer==0) return; RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)*2); uint32_t requestId; bsIn.Read(requestId); // Lookup request id bool hasGetRequest; unsigned int getRequestIndex; getRequestIndex = getRequests.GetIndexFromKey(requestId, &hasGetRequest); if (hasGetRequest==false) return; GetRequest *getRequest = getRequests[getRequestIndex]; bool hasRemoteServer; unsigned int remoteServerResponsesIndex; remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(packet->guid, &hasRemoteServer); if (hasRemoteServer==false) return; BufferedGetResponseFromServer *bufferedGetResponseFromServer; bufferedGetResponseFromServer = getRequest->remoteServerResponses[remoteServerResponsesIndex]; if (bufferedGetResponseFromServer->gotResult==true) return; bufferedGetResponseFromServer->gotResult=true; uint32_t numRows; bufferedGetResponseFromServer->queryResult.SerializeNumRows(false, numRows, &bsIn); bufferedGetResponseFromServer->queryResult.SerializeCloudQueryRows(false, numRows, &bsIn, this); // If all results returned, then also process locally, and return to user if (getRequest->AllRemoteServersHaveResponded()) { ProcessAndTransmitGetRequest(getRequest); getRequest->Clear(this); RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_); getRequests.RemoveAtIndex(getRequestIndex); } } void CloudServer::OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason ) { (void) lostConnectionReason; (void) systemAddress; unsigned int remoteServerIndex; bool objectExists; remoteServerIndex = remoteServers.GetIndexFromKey(rakNetGUID, &objectExists); if (objectExists) { // Update remoteServerResponses by removing this server and sending the response if it is now complete unsigned int getRequestIndex=0; while (getRequestIndex < getRequests.Size()) { GetRequest *getRequest = getRequests[getRequestIndex]; bool waitingForThisServer; unsigned int remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(rakNetGUID, &waitingForThisServer); if (waitingForThisServer) { getRequest->remoteServerResponses[remoteServerResponsesIndex]->Clear(this); RakNet::OP_DELETE(getRequest->remoteServerResponses[remoteServerResponsesIndex], _FILE_AND_LINE_); getRequest->remoteServerResponses.RemoveAtIndex(remoteServerResponsesIndex); if (getRequest->AllRemoteServersHaveResponded()) { ProcessAndTransmitGetRequest(getRequest); getRequest->Clear(this); RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_); getRequests.RemoveAtIndex(getRequestIndex); } else getRequestIndex++; } else getRequestIndex++; } RakNet::OP_DELETE(remoteServers[remoteServerIndex],_FILE_AND_LINE_); remoteServers.RemoveAtIndex(remoteServerIndex); } DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(rakNetGUID); if (remoteSystemIndex.IsInvalid()==false) { RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex); unsigned int uploadedKeysIndex; for (uploadedKeysIndex=0; uploadedKeysIndex < remoteCloudClient->uploadedKeys.Size(); uploadedKeysIndex++) { // Delete keys this system has uploaded bool keyDataRepositoryExists; unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->uploadedKeys[uploadedKeysIndex], &keyDataRepositoryExists); if (keyDataRepositoryExists) { CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex]; bool keyDataExists; unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(rakNetGUID, &keyDataExists); if (keyDataExists) { CloudData *cloudData = cloudDataList->keyData[keyDataIndex]; cloudDataList->uploaderCount--; NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false ); NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false ); NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false ); cloudData->Clear(); if (cloudData->IsUnused()) { RakNet::OP_DELETE(cloudData,_FILE_AND_LINE_); cloudDataList->keyData.RemoveAtIndex(keyDataIndex); if (cloudDataList->IsNotUploaded()) { // Tell other servers that this key is no longer uploaded, so they do not request it from us RemoveUploadedKeyFromServers(cloudDataList->key); } if (cloudDataList->IsUnused()) { // Tell other servers that this key is no longer uploaded, so they do not request it from us RemoveUploadedKeyFromServers(cloudDataList->key); RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_); dataRepository.RemoveAtIndex(dataRepositoryIndex); } } } } } unsigned int subscribedKeysIndex; for (subscribedKeysIndex=0; subscribedKeysIndex < remoteCloudClient->subscribedKeys.Size(); subscribedKeysIndex++) { KeySubscriberID* keySubscriberId; keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex]; bool keyDataRepositoryExists; unsigned int keyDataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->subscribedKeys[subscribedKeysIndex]->key, &keyDataRepositoryExists); if (keyDataRepositoryExists) { CloudDataList* cloudDataList = dataRepository[keyDataRepositoryIndex]; if (keySubscriberId->specificSystemsSubscribedTo.Size()==0) { cloudDataList->nonSpecificSubscribers.Remove(rakNetGUID); --cloudDataList->subscriberCount; } else { unsigned int specificSystemIndex; for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++) { bool keyDataExists; unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists); if (keyDataExists) { CloudData *keyData = cloudDataList->keyData[keyDataIndex]; keyData->specificSubscribers.Remove(rakNetGUID); --cloudDataList->subscriberCount; } } } } RakNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_); } // Delete and remove from remoteSystems RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_); remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_); } } void CloudServer::OnRakPeerShutdown(void) { Clear(); } void CloudServer::Clear(void) { unsigned int i,j; for (i=0; i < dataRepository.Size(); i++) { CloudDataList *cloudDataList = dataRepository[i]; for (j=0; j < cloudDataList->keyData.Size(); j++) { cloudDataList->keyData[j]->Clear(); RakNet::OP_DELETE(cloudDataList->keyData[j], _FILE_AND_LINE_); } RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_); } dataRepository.Clear(false, _FILE_AND_LINE_); for (i=0; i < remoteServers.Size(); i++) { RakNet::OP_DELETE(remoteServers[i], _FILE_AND_LINE_); } remoteServers.Clear(false, _FILE_AND_LINE_); for (i=0; i < getRequests.Size(); i++) { GetRequest *getRequest = getRequests[i]; getRequest->Clear(this); RakNet::OP_DELETE(getRequests[i], _FILE_AND_LINE_); } getRequests.Clear(false, _FILE_AND_LINE_); DataStructures::List keyList; DataStructures::List itemList; remoteSystems.GetAsList(itemList, keyList, _FILE_AND_LINE_); for (i=0; i < itemList.Size(); i++) { RemoteCloudClient* remoteCloudClient = itemList[i]; for (j=0; j < remoteCloudClient->subscribedKeys.Size(); j++) { RakNet::OP_DELETE(remoteCloudClient->subscribedKeys[j], _FILE_AND_LINE_); } RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_); } remoteSystems.Clear(_FILE_AND_LINE_); } void CloudServer::WriteCloudQueryRowFromResultList(DataStructures::List &cloudDataResultList, DataStructures::List &cloudKeyResultList, BitStream *bsOut) { bsOut->WriteCasted(cloudKeyResultList.Size()); unsigned int i; for (i=0; i < cloudKeyResultList.Size(); i++) { WriteCloudQueryRowFromResultList(i, cloudDataResultList, cloudKeyResultList, bsOut); } } void CloudServer::WriteCloudQueryRowFromResultList(unsigned int i, DataStructures::List &cloudDataResultList, DataStructures::List &cloudKeyResultList, BitStream *bsOut) { CloudQueryRow cloudQueryRow; CloudData *cloudData = cloudDataResultList[i]; cloudQueryRow.key=cloudKeyResultList[i]; cloudQueryRow.data=cloudData->dataPtr; cloudQueryRow.length=cloudData->dataLengthBytes; cloudQueryRow.serverSystemAddress=cloudData->serverSystemAddress; cloudQueryRow.clientSystemAddress=cloudData->clientSystemAddress; cloudQueryRow.serverGUID=cloudData->serverGUID; cloudQueryRow.clientGUID=cloudData->clientGUID; cloudQueryRow.Serialize(true, bsOut, 0); } void CloudServer::NotifyClientSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, DataStructures::OrderedList &subscribers, bool wasUpdated ) { RakNet::BitStream bsOut; bsOut.Write((MessageID) ID_CLOUD_SUBSCRIPTION_NOTIFICATION); bsOut.Write(wasUpdated); CloudQueryRow row; row.key=key; row.data=cloudData->dataPtr; row.length=cloudData->dataLengthBytes; row.serverSystemAddress=cloudData->serverSystemAddress; row.clientSystemAddress=cloudData->clientSystemAddress; row.serverGUID=cloudData->serverGUID; row.clientGUID=cloudData->clientGUID; row.Serialize(true,&bsOut,0); unsigned int i; for (i=0; i < subscribers.Size(); i++) { SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[i], false); } } void CloudServer::NotifyClientSubscribersOfDataChange( CloudQueryRow *row, DataStructures::OrderedList &subscribers, bool wasUpdated ) { RakNet::BitStream bsOut; bsOut.Write((MessageID) ID_CLOUD_SUBSCRIPTION_NOTIFICATION); bsOut.Write(wasUpdated); row->Serialize(true,&bsOut,0); unsigned int i; for (i=0; i < subscribers.Size(); i++) { SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[i], false); } } void CloudServer::NotifyServerSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, bool wasUpdated ) { // Find every server that has subscribed // Send them change notifications RakNet::BitStream bsOut; bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND); bsOut.Write((MessageID)STSC_DATA_CHANGED); bsOut.Write(wasUpdated); CloudQueryRow row; row.key=key; row.data=cloudData->dataPtr; row.length=cloudData->dataLengthBytes; row.serverSystemAddress=cloudData->serverSystemAddress; row.clientSystemAddress=cloudData->clientSystemAddress; row.serverGUID=cloudData->serverGUID; row.clientGUID=cloudData->clientGUID; row.Serialize(true,&bsOut,0); unsigned int i; for (i=0; i < remoteServers.Size(); i++) { if (remoteServers[i]->gotSubscribedAndUploadedKeys==false || remoteServers[i]->subscribedKeys.HasData(key)) { SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false); } } } void CloudServer::AddServer(RakNetGUID systemIdentifier) { ConnectionState cs = rakPeerInterface->GetConnectionState(systemIdentifier); if (cs==IS_DISCONNECTED || cs==IS_NOT_CONNECTED) return; bool objectExists; unsigned int index = remoteServers.GetIndexFromKey(systemIdentifier,&objectExists); if (objectExists==false) { RemoteServer *remoteServer = RakNet::OP_NEW(_FILE_AND_LINE_); remoteServer->gotSubscribedAndUploadedKeys=false; remoteServer->serverAddress=systemIdentifier; remoteServers.InsertAtIndex(remoteServer, index, _FILE_AND_LINE_); SendUploadedAndSubscribedKeysToServer(systemIdentifier); } } void CloudServer::RemoveServer(RakNetGUID systemAddress) { bool objectExists; unsigned int index = remoteServers.GetIndexFromKey(systemAddress,&objectExists); if (objectExists==true) { RakNet::OP_DELETE(remoteServers[index],_FILE_AND_LINE_); remoteServers.RemoveAtIndex(index); } } void CloudServer::GetRemoteServers(DataStructures::List &remoteServersOut) { remoteServersOut.Clear(true, _FILE_AND_LINE_); unsigned int i; for (i=0; i < remoteServers.Size(); i++) { remoteServersOut.Push(remoteServers[i]->serverAddress, _FILE_AND_LINE_); } } void CloudServer::ProcessAndTransmitGetRequest(GetRequest *getRequest) { RakNet::BitStream bsOut; bsOut.Write((MessageID) ID_CLOUD_GET_RESPONSE); // BufferedGetResponseFromServer getResponse; CloudQueryResult cloudQueryResult; cloudQueryResult.cloudQuery=getRequest->cloudQueryWithAddresses.cloudQuery; cloudQueryResult.subscribeToResults=getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults; cloudQueryResult.SerializeHeader(true, &bsOut); DataStructures::List cloudDataResultList; DataStructures::List cloudKeyResultList; ProcessCloudQueryWithAddresses(getRequest->cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList); bool unlimitedRows=getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn==0; uint32_t localNumRows = (uint32_t) cloudDataResultList.Size(); if (unlimitedRows==false && localNumRows > getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex && localNumRows - getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex > getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn ) localNumRows=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex + getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn; BitSize_t bitStreamOffset = bsOut.GetWriteOffset(); uint32_t localRowsToWrite; unsigned int skipRows; if (localNumRows>getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex) { localRowsToWrite=localNumRows-getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex; skipRows=0; } else { localRowsToWrite=0; skipRows=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex-localNumRows; } cloudQueryResult.SerializeNumRows(true, localRowsToWrite, &bsOut); for (unsigned int i=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex; i < localNumRows; i++) { WriteCloudQueryRowFromResultList(i, cloudDataResultList, cloudKeyResultList, &bsOut); } // Append remote systems for remaining rows if (unlimitedRows==true || getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn>localRowsToWrite) { uint32_t remainingRows=0; uint32_t additionalRowsWritten=0; if (unlimitedRows==false) remainingRows=getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn-localRowsToWrite; unsigned int remoteServerResponseIndex; for (remoteServerResponseIndex=0; remoteServerResponseIndex < getRequest->remoteServerResponses.Size(); remoteServerResponseIndex++) { BufferedGetResponseFromServer *bufferedGetResponseFromServer = getRequest->remoteServerResponses[remoteServerResponseIndex]; unsigned int cloudQueryRowIndex; for (cloudQueryRowIndex=0; cloudQueryRowIndex < bufferedGetResponseFromServer->queryResult.rowsReturned.Size(); cloudQueryRowIndex++) { if (skipRows>0) { --skipRows; continue; } bufferedGetResponseFromServer->queryResult.rowsReturned[cloudQueryRowIndex]->Serialize(true, &bsOut, this); ++additionalRowsWritten; if (unlimitedRows==false && --remainingRows==0) break; } if (unlimitedRows==false && remainingRows==0) break; } if (additionalRowsWritten>0) { BitSize_t curOffset = bsOut.GetWriteOffset(); bsOut.SetWriteOffset(bitStreamOffset); localRowsToWrite+=additionalRowsWritten; cloudQueryResult.SerializeNumRows(true, localRowsToWrite, &bsOut); bsOut.SetWriteOffset(curOffset); } } SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, getRequest->requestingClient, false); } void CloudServer::ProcessCloudQueryWithAddresses( CloudServer::CloudQueryWithAddresses &cloudQueryWithAddresses, DataStructures::List &cloudDataResultList, DataStructures::List &cloudKeyResultList ) { CloudQueryResult cloudQueryResult; CloudQueryRow cloudQueryRow; unsigned int queryIndex; bool dataRepositoryExists; CloudDataList* cloudDataList; unsigned int keyDataIndex; // If specificSystems list empty, applies to all systems // For each of keys in cloudQueryWithAddresses, return that data, limited by maxRowsToReturn for (queryIndex=0; queryIndex < cloudQueryWithAddresses.cloudQuery.keys.Size(); queryIndex++) { const CloudKey &key = cloudQueryWithAddresses.cloudQuery.keys[queryIndex]; unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists); if (dataRepositoryExists) { cloudDataList=dataRepository[dataRepositoryIndex]; if (cloudDataList->uploaderCount>0) { // Return all keyData that was uploaded by specificSystems, or all if not specified if (cloudQueryWithAddresses.specificSystems.Size()>0) { // Return data for matching systems unsigned int specificSystemIndex; for (specificSystemIndex=0; specificSystemIndex < cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++) { bool uploaderExists; keyDataIndex = cloudDataList->keyData.GetIndexFromKey(cloudQueryWithAddresses.specificSystems[specificSystemIndex], &uploaderExists); if (uploaderExists) { cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_); cloudKeyResultList.Push(key, _FILE_AND_LINE_); } } } else { // Return data for all systems for (keyDataIndex=0; keyDataIndex < cloudDataList->keyData.Size(); keyDataIndex++) { cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_); cloudKeyResultList.Push(key, _FILE_AND_LINE_); } } } } } } void CloudServer::SendUploadedAndSubscribedKeysToServer( RakNetGUID systemAddress ) { RakNet::BitStream bsOut; bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND); bsOut.Write((MessageID)STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS); bsOut.WriteCasted(dataRepository.Size()); for (unsigned int i=0; i < dataRepository.Size(); i++) dataRepository[i]->key.Serialize(true, &bsOut); BitSize_t startOffset, endOffset; uint16_t subscribedKeyCount=0; startOffset=bsOut.GetWriteOffset(); bsOut.WriteCasted(subscribedKeyCount); for (unsigned int i=0; i < dataRepository.Size(); i++) { if (dataRepository[i]->subscriberCount>0) { dataRepository[i]->key.Serialize(true, &bsOut); subscribedKeyCount++; } } endOffset=bsOut.GetWriteOffset(); bsOut.SetWriteOffset(startOffset); bsOut.WriteCasted(subscribedKeyCount); bsOut.SetWriteOffset(endOffset); if (dataRepository.Size()>0 || subscribedKeyCount>0) SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, systemAddress, false); } void CloudServer::SendUploadedKeyToServers( CloudKey &cloudKey ) { RakNet::BitStream bsOut; bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND); bsOut.Write((MessageID)STSC_ADD_UPLOADED_KEY); cloudKey.Serialize(true, &bsOut); for (unsigned int i=0; i < remoteServers.Size(); i++) SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false); } void CloudServer::SendSubscribedKeyToServers( CloudKey &cloudKey ) { RakNet::BitStream bsOut; bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND); bsOut.Write((MessageID)STSC_ADD_SUBSCRIBED_KEY); cloudKey.Serialize(true, &bsOut); for (unsigned int i=0; i < remoteServers.Size(); i++) SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false); } void CloudServer::RemoveUploadedKeyFromServers( CloudKey &cloudKey ) { RakNet::BitStream bsOut; bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND); bsOut.Write((MessageID)STSC_REMOVE_UPLOADED_KEY); cloudKey.Serialize(true, &bsOut); for (unsigned int i=0; i < remoteServers.Size(); i++) SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false); } void CloudServer::RemoveSubscribedKeyFromServers( CloudKey &cloudKey ) { RakNet::BitStream bsOut; bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND); bsOut.Write((MessageID)STSC_REMOVE_SUBSCRIBED_KEY); cloudKey.Serialize(true, &bsOut); for (unsigned int i=0; i < remoteServers.Size(); i++) SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false); } void CloudServer::OnSendUploadedAndSubscribedKeysToServer( Packet *packet ) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)*2); bool objectExists; unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists); if (objectExists==false) return; RemoteServer *remoteServer = remoteServers[index]; remoteServer->gotSubscribedAndUploadedKeys=true; // unsigned int insertionIndex; bool alreadyHasKey; uint16_t numUploadedKeys, numSubscribedKeys; bsIn.Read(numUploadedKeys); for (uint16_t i=0; i < numUploadedKeys; i++) { CloudKey cloudKey; cloudKey.Serialize(false, &bsIn); // insertionIndex = remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey); if (alreadyHasKey==false) remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_); } bsIn.Read(numSubscribedKeys); for (uint16_t i=0; i < numSubscribedKeys; i++) { CloudKey cloudKey; cloudKey.Serialize(false, &bsIn); //insertionIndex = remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey); if (alreadyHasKey==false) remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_); } // Potential todo - join servers // For each uploaded key that we subscribe to, query it // For each subscribed key that we have, send it } void CloudServer::OnSendUploadedKeyToServers( Packet *packet ) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)*2); bool objectExists; unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists); if (objectExists==false) return; RemoteServer *remoteServer = remoteServers[index]; CloudKey cloudKey; cloudKey.Serialize(false, &bsIn); // unsigned int insertionIndex; bool alreadyHasKey; // insertionIndex = remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey); if (alreadyHasKey==false) remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_); } void CloudServer::OnSendSubscribedKeyToServers( Packet *packet ) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)*2); bool objectExists; unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists); if (objectExists==false) return; RemoteServer *remoteServer = remoteServers[index]; CloudKey cloudKey; cloudKey.Serialize(false, &bsIn); // unsigned int insertionIndex; bool alreadyHasKey; // insertionIndex = remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey); // Do not need to send current values, the Get request will do that as the Get request is sent at the same time if (alreadyHasKey==false) remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_); } void CloudServer::OnRemoveUploadedKeyFromServers( Packet *packet ) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)*2); bool objectExists; unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists); if (objectExists==false) return; RemoteServer *remoteServer = remoteServers[index]; CloudKey cloudKey; cloudKey.Serialize(false, &bsIn); unsigned int insertionIndex; bool alreadyHasKey; insertionIndex = remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey); if (alreadyHasKey==true) remoteServer->uploadedKeys.RemoveAtIndex(insertionIndex); } void CloudServer::OnRemoveSubscribedKeyFromServers( Packet *packet ) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)*2); bool objectExists; unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists); if (objectExists==false) return; RemoteServer *remoteServer = remoteServers[index]; CloudKey cloudKey; cloudKey.Serialize(false, &bsIn); unsigned int insertionIndex; bool alreadyHasKey; insertionIndex = remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey); if (alreadyHasKey==true) remoteServer->subscribedKeys.RemoveAtIndex(insertionIndex); } void CloudServer::OnServerDataChanged( Packet *packet ) { RakNet::BitStream bsIn(packet->data, packet->length, false); bsIn.IgnoreBytes(sizeof(MessageID)*2); bool objectExists; remoteServers.GetIndexFromKey(packet->guid,&objectExists); if (objectExists==false) return; // Find everyone that cares about this change and relay bool wasUpdated=false; bsIn.Read(wasUpdated); CloudQueryRow row; row.Serialize(false, &bsIn, this); CloudDataList *cloudDataList; bool dataRepositoryExists; unsigned int dataRepositoryIndex; dataRepositoryIndex = dataRepository.GetIndexFromKey(row.key, &dataRepositoryExists); if (dataRepositoryExists==false) { DeallocateRowData(row.data); return; } cloudDataList = dataRepository[dataRepositoryIndex]; CloudData *cloudData; bool keyDataListExists; unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(row.clientGUID, &keyDataListExists); if (keyDataListExists==true) { cloudData = cloudDataList->keyData[keyDataListIndex]; NotifyClientSubscribersOfDataChange(&row, cloudData->specificSubscribers, wasUpdated ); } NotifyClientSubscribersOfDataChange(&row, cloudDataList->nonSpecificSubscribers, wasUpdated ); DeallocateRowData(row.data); } void CloudServer::GetServersWithUploadedKeys( DataStructures::List &keys, DataStructures::List &remoteServersWithData ) { remoteServersWithData.Clear(true, _FILE_AND_LINE_); unsigned int i,j; for (i=0; i < remoteServers.Size(); i++) { remoteServers[i]->workingFlag=false; } for (i=0; i < remoteServers.Size(); i++) { if (remoteServers[i]->workingFlag==false) { if (remoteServers[i]->gotSubscribedAndUploadedKeys==false) { remoteServers[i]->workingFlag=true; remoteServersWithData.Push(remoteServers[i], _FILE_AND_LINE_); } else { remoteServers[i]->workingFlag=false; for (j=0; j < keys.Size(); j++) { if (remoteServers[i]->workingFlag==false && remoteServers[i]->uploadedKeys.HasData(keys[j])) { remoteServers[i]->workingFlag=true; remoteServersWithData.Push(remoteServers[i], _FILE_AND_LINE_); break; } } } } } } CloudServer::CloudDataList *CloudServer::GetOrAllocateCloudDataList(CloudKey key, bool *dataRepositoryExists, unsigned int &dataRepositoryIndex) { CloudDataList *cloudDataList; dataRepositoryIndex = dataRepository.GetIndexFromKey(key, dataRepositoryExists); if (*dataRepositoryExists==false) { cloudDataList = RakNet::OP_NEW(_FILE_AND_LINE_); cloudDataList->key=key; cloudDataList->uploaderCount=0; cloudDataList->subscriberCount=0; dataRepository.InsertAtIndex(cloudDataList,dataRepositoryIndex,_FILE_AND_LINE_); } else { cloudDataList = dataRepository[dataRepositoryIndex]; } return cloudDataList; } void CloudServer::UnsubscribeFromKey(RemoteCloudClient *remoteCloudClient, RakNetGUID remoteCloudClientGuid, unsigned int keySubscriberIndex, CloudKey &cloudKey, DataStructures::List &specificSystems) { KeySubscriberID* keySubscriberId = remoteCloudClient->subscribedKeys[keySubscriberIndex]; // If removing specific systems, but global subscription, fail if (keySubscriberId->specificSystemsSubscribedTo.Size()==0 && specificSystems.Size()>0) return; bool dataRepositoryExists; CloudDataList *cloudDataList; unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists); if (dataRepositoryExists==false) return; unsigned int i,j; cloudDataList = dataRepository[dataRepositoryIndex]; if (specificSystems.Size()==0) { // Remove global subscriber. If returns false, have to remove specific subscribers if (cloudDataList->RemoveSubscriber(remoteCloudClientGuid)==false) { for (i=0; i < keySubscriberId->specificSystemsSubscribedTo.Size(); i++) { RemoveSpecificSubscriber(keySubscriberId->specificSystemsSubscribedTo[i], cloudDataList, remoteCloudClientGuid); } } keySubscriberId->specificSystemsSubscribedTo.Clear(true, _FILE_AND_LINE_); } else { for (j=0; j < specificSystems.Size(); j++) { unsigned int specificSystemsSubscribedToIndex; bool hasSpecificSystemsSubscribedTo; specificSystemsSubscribedToIndex=keySubscriberId->specificSystemsSubscribedTo.GetIndexFromKey(specificSystems[j], &hasSpecificSystemsSubscribedTo); if (hasSpecificSystemsSubscribedTo) { RemoveSpecificSubscriber(specificSystems[j], cloudDataList, remoteCloudClientGuid); keySubscriberId->specificSystemsSubscribedTo.RemoveAtIndex(specificSystemsSubscribedToIndex); } } } if (keySubscriberId->specificSystemsSubscribedTo.Size()==0) { RakNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_); remoteCloudClient->subscribedKeys.RemoveAtIndex(keySubscriberIndex); } if (cloudDataList->subscriberCount==0) RemoveSubscribedKeyFromServers(cloudKey); if (cloudDataList->IsUnused()) { RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_); dataRepository.RemoveAtIndex(dataRepositoryIndex); } } void CloudServer::RemoveSpecificSubscriber(RakNetGUID specificSubscriber, CloudDataList *cloudDataList, RakNetGUID remoteCloudClientGuid) { bool keyDataListExists; unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSubscriber, &keyDataListExists); if (keyDataListExists==false) return; CloudData *cloudData = cloudDataList->keyData[keyDataListIndex]; bool hasSpecificSubscriber; unsigned int specificSubscriberIndex = cloudData->specificSubscribers.GetIndexFromKey(remoteCloudClientGuid, &hasSpecificSubscriber); if (hasSpecificSubscriber) { cloudData->specificSubscribers.RemoveAtIndex(specificSubscriberIndex); cloudDataList->subscriberCount--; if (cloudData->IsUnused()) { RakNet::OP_DELETE(cloudData, _FILE_AND_LINE_); cloudDataList->keyData.RemoveAtIndex(keyDataListIndex); } } } void CloudServer::ForceExternalSystemAddress(SystemAddress forcedAddress) { forceAddress=forcedAddress; } void CloudServer::AddQueryFilter(CloudServerQueryFilter* filter) { if (queryFilters.GetIndexOf(filter)!=(unsigned int) -1) return; queryFilters.Push(filter, _FILE_AND_LINE_); } void CloudServer::RemoveQueryFilter(CloudServerQueryFilter* filter) { unsigned int index; index = queryFilters.GetIndexOf(filter); if (index != (unsigned int) -1) queryFilters.RemoveAtIndex(index); } void CloudServer::RemoveAllQueryFilters(void) { queryFilters.Clear(true, _FILE_AND_LINE_); } #endif