| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- /*
- * Copyright (c) 2014, Oculus VR, Inc.
- * All rights reserved.
- *
- * This source code is licensed under the BSD-style license found in the
- * LICENSE file in the root directory of this source tree. An additional grant
- * of patent rights can be found in the PATENTS file in the same directory.
- *
- */
- /// \file CloudServer.h
- /// \brief Stores client data, and allows cross-server communication to retrieve this data
- /// \details TODO
- ///
- #include "NativeFeatureIncludes.h"
- #if _RAKNET_SUPPORT_CloudServer==1
- #ifndef __CLOUD_SERVER_H
- #define __CLOUD_SERVER_H
- #include "PluginInterface2.h"
- #include "RakMemoryOverride.h"
- #include "NativeTypes.h"
- #include "RakString.h"
- #include "DS_Hash.h"
- #include "CloudCommon.h"
- #include "DS_OrderedList.h"
- /// Size in bytes of the inline buffer (CloudData::stackData) that exists in every row. Data small enough to fit avoids a separate allocation; larger data goes to CloudData::allocatedData. Raising this value trades memory per row for fewer allocations.
- #define CLOUD_SERVER_DATA_STACK_SIZE 32
- namespace RakNet
- {
- /// Forward declarations
- class RakPeerInterface;
- /// \brief Zero or more instances of CloudServerQueryFilter can be attached to CloudServer to restrict client queries
- /// All attached instances of CloudServerQueryFilter are called on each corresponding operation, from all directly connected clients
- /// If any attached instance returns false for a given operation, that operation is silently rejected
- /// \ingroup CLOUD_GROUP
- class RAK_DLL_EXPORT CloudServerQueryFilter
- {
- public:
- CloudServerQueryFilter() {} // Nothing to initialize: this interface declares no data members
- virtual ~CloudServerQueryFilter() {} // Virtual so that a derived filter can be destroyed through a CloudServerQueryFilter pointer
- /// Called when a local client wants to post data. \a clientGuid and \a clientAddress identify that client, \a key is the key being posted to, and \a data / \a dataLength describe the payload
- /// \return true to allow, false to reject
- virtual bool OnPostRequest(RakNetGUID clientGuid, SystemAddress clientAddress, CloudKey key, uint32_t dataLength, const char *data)=0;
- /// Called when a local client wants to release data that it has previously uploaded. \a cloudKeys lists the keys the client asked to release
- /// \return true to allow, false to reject
- virtual bool OnReleaseRequest(RakNetGUID clientGuid, SystemAddress clientAddress, DataStructures::List<CloudKey> &cloudKeys)=0;
- /// Called when a local client wants to query data. \a query and \a specificSystems are passed by non-const reference, so an implementation is able to modify them (whether the server honors such changes is not visible in this header)
- /// If you return false, the client will get no response at all
- /// \return true to allow, false to reject
- virtual bool OnGetRequest(RakNetGUID clientGuid, SystemAddress clientAddress, CloudQuery &query, DataStructures::List<RakNetGUID> &specificSystems)=0;
- /// Called when a local client wants to stop getting updates for data. \a cloudKeys lists the keys to unsubscribe from and \a specificSystems the systems the request is limited to
- /// If you return false, the client will keep getting updates for that data
- /// \return true to allow, false to reject
- virtual bool OnUnsubscribeRequest(RakNetGUID clientGuid, SystemAddress clientAddress, DataStructures::List<CloudKey> &cloudKeys, DataStructures::List<RakNetGUID> &specificSystems)=0;
- };
- /// \brief Stores client data, and allows cross-server communication to retrieve this data
- /// \ingroup CLOUD_GROUP
- class RAK_DLL_EXPORT CloudServer : public PluginInterface2, CloudAllocator
- {
- public:
- // GetInstance() and DestroyInstance(instance*)
- STATIC_FACTORY_DECLARATIONS(CloudServer)
- CloudServer();
- virtual ~CloudServer();
- /// \brief Max bytes a client can upload
- /// Data in excess of this value is silently ignored
- /// Defaults to 0 (unlimited)
- /// \param[in] bytes Max bytes a client can upload. 0 means unlimited.
- void SetMaxUploadBytesPerClient(uint64_t bytes);
- /// \brief Max bytes returned by a download. If the number of bytes would exceed this amount, the returned list is truncated
- /// However, if this would result in no rows downloaded, then one row will be returned.
- /// \param[in] bytes Max bytes a client can download from a single Get(). 0 means unlimited.
- void SetMaxBytesPerDownload(uint64_t bytes);
- /// \brief Add a server, which is assumed to be connected in a fully connected mesh to all other servers and also running the CloudServer plugin
- /// The other system must also call AddServer() before getting the subscription data, or it will be rejected.
- /// Sending a message telling the other system to call AddServer(), followed by calling AddServer() locally, would be sufficient for this to work.
- /// \note This sends subscription data to the other system, using RELIABLE_ORDERED on channel 0
- /// \param[in] systemIdentifier Identifier of the remote system
- void AddServer(RakNetGUID systemIdentifier);
- /// \brief Removes a server added through AddServer()
- /// \param[in] systemIdentifier Identifier of the remote system
- void RemoveServer(RakNetGUID systemIdentifier);
- /// \brief Returns the list of servers added with AddServer()
- /// \param[out] remoteServersOut List of servers added
- void GetRemoteServers(DataStructures::List<RakNetGUID> &remoteServersOut);
- /// \brief Frees all memory. Does not remove query filters
- void Clear(void);
- /// \brief Report the specified SystemAddress to client queries, rather than what RakPeer reads.
- /// This is useful if you already know your public IP
- /// This only applies to future updates, so call it before updating to apply to all queries
- /// \param[in] forcedAddress The SystemAddress to return in queries. Use UNASSIGNED_SYSTEM_ADDRESS (default) to use what RakPeer returns
- void ForceExternalSystemAddress(SystemAddress forcedAddress);
- /// \brief Adds a callback called on each query. If all filters return true for an operation, the operation is allowed.
- /// If the filter was already added, the function silently fails
- /// \param[in] filter An externally allocated instance of CloudServerQueryFilter. The instance must remain valid until it is removed with RemoveQueryFilter() or RemoveAllQueryFilters()
- void AddQueryFilter(CloudServerQueryFilter* filter);
- /// \brief Removes a callback added with AddQueryFilter()
- /// The instance is not deleted, only unreferenced. It is up to the user to delete the instance, if necessary
- /// \param[in] filter The instance that was previously passed to AddQueryFilter()
- void RemoveQueryFilter(CloudServerQueryFilter* filter);
- /// \brief Removes all instances of CloudServerQueryFilter added with AddQueryFilter().
- /// The instances are not deleted, only unreferenced. It is up to the user to delete the instances, if necessary
- void RemoveAllQueryFilters(void);
- protected:
- virtual void Update(void); // NOTE(review): this and the next three declarations look like PluginInterface2 callbacks being overridden; confirm against PluginInterface2.h
- virtual PluginReceiveResult OnReceive(Packet *packet);
- virtual void OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason );
- virtual void OnRakPeerShutdown(void);
- virtual void OnPostRequest(Packet *packet); // Packet-level handlers, one per request type (post, release, get, unsubscribe); virtual, so a subclass can replace them
- virtual void OnReleaseRequest(Packet *packet);
- virtual void OnGetRequest(Packet *packet);
- virtual void OnUnsubscribeRequest(Packet *packet);
- virtual void OnServerToServerGetRequest(Packet *packet); // Judging by the names, these two handle a Get forwarded between servers and the reply to it; the bodies are outside this header
- virtual void OnServerToServerGetResponse(Packet *packet);
- uint64_t maxUploadBytesPerClient, maxBytesPerDowload; // Presumably the limits set through SetMaxUploadBytesPerClient() / SetMaxBytesPerDownload() (setters are defined elsewhere). NOTE: "Dowload" is misspelled, but the name is kept because code outside this header refers to it
- // ----------------------------------------------------------------------------
- // For a given data key, quickly look up one or all systems that have uploaded
- // ----------------------------------------------------------------------------
- /// One stored row: the payload one client uploaded, where it came from, and the
- /// subscribers that only care about this particular uploader.
- /// \note Owns allocatedData (released with rakFree_Ex). The implicitly generated copy
- /// operations would duplicate that owning pointer, so instances should not be copied.
- struct CloudData
- {
- /// Starts with no payload and no heap buffer, so that the destructor and Clear() never
- /// read an indeterminate pointer even if the creator does not assign the fields.
- CloudData() : allocatedData(0), dataPtr(0), dataLengthBytes(0), isUploaded(false) {}
- /// Releases the heap buffer, if one is held
- ~CloudData() {if (allocatedData) rakFree_Ex(allocatedData, _FILE_AND_LINE_);}
- /// \return true when nothing is uploaded and nobody subscribes specifically to this row
- bool IsUnused(void) const {return isUploaded==false && specificSubscribers.Size()==0;}
- /// Drops the payload and marks the row as not uploaded.
- /// Subscribers and the address / GUID fields are left untouched.
- /// The heap buffer is released whenever one is held (the same condition the destructor
- /// uses), so it cannot leak when dataPtr currently points at stackData.
- void Clear(void)
- {
- if (allocatedData) rakFree_Ex(allocatedData, _FILE_AND_LINE_);
- allocatedData=0;
- dataPtr=0;
- dataLengthBytes=0;
- isUploaded=false;
- }
- unsigned char stackData[CLOUD_SERVER_DATA_STACK_SIZE];
- unsigned char *allocatedData; // Uses allocatedData instead of stackData if length of data exceeds CLOUD_SERVER_DATA_STACK_SIZE
- unsigned char *dataPtr; // Points to either stackData or allocatedData
- uint32_t dataLengthBytes;
- bool isUploaded;
- /// System address of server that is holding this data, and the client is connected to
- SystemAddress serverSystemAddress;
- /// System address of client that uploaded this data
- SystemAddress clientSystemAddress;
- /// RakNetGUID of server that is holding this data, and the client is connected to
- RakNetGUID serverGUID;
- /// RakNetGUID of client that uploaded this data
- RakNetGUID clientGUID;
- /// When the key data changes from this particular system, notify these subscribers
- /// This list is mutually exclusive with CloudDataList::nonSpecificSubscribers
- DataStructures::OrderedList<RakNetGUID, RakNetGUID> specificSubscribers;
- };
- void WriteCloudQueryRowFromResultList(unsigned int i, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut); // By its name and parameters, writes entry i of the two parallel result lists to bsOut; the body is outside this header
- void WriteCloudQueryRowFromResultList(DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut); // Overload without an index; presumably writes every entry (confirm in the implementation)
- static int KeyDataPtrComp( const RakNetGUID &key, CloudData* const &data ); // Ordering function for CloudDataList::keyData, which is keyed by RakNetGUID
- /// Everything stored under one CloudKey: the per-system rows plus the subscribers
- /// that want to hear about changes to the key from any system.
- struct CloudDataList
- {
- /// \return true when the list holds no rows and has no key-wide subscribers
- bool IsUnused(void) const
- {
- if (keyData.Size()!=0)
- return false;
- return nonSpecificSubscribers.Size()==0;
- }
- /// \return true when uploaderCount is zero
- bool IsNotUploaded(void) const {return 0==uploaderCount;}
- /// Removes \a g from nonSpecificSubscribers and decrements subscriberCount
- /// \param[in] g Subscriber to remove
- /// \return true if \a g was in the list and has been removed, false otherwise
- bool RemoveSubscriber(RakNetGUID g)
- {
- bool wasFound;
- const unsigned int slot = nonSpecificSubscribers.GetIndexFromKey(g, &wasFound);
- if (!wasFound)
- return false;
- --subscriberCount;
- nonSpecificSubscribers.RemoveAtIndex(slot);
- return true;
- }
- unsigned int uploaderCount, subscriberCount;
- CloudKey key;
- // Data uploaded from or subscribed to for various systems
- DataStructures::OrderedList<RakNetGUID, CloudData*, CloudServer::KeyDataPtrComp> keyData;
- /// When the key data changes from any system, notify these subscribers
- /// This list is mutually exclusive with CloudData::specificSubscribers
- DataStructures::OrderedList<RakNetGUID, RakNetGUID> nonSpecificSubscribers;
- };
- static int KeyDataListComp( const CloudKey &key, CloudDataList * const &data ); // Ordering function for dataRepository
- DataStructures::OrderedList<CloudKey, CloudDataList*, CloudServer::KeyDataListComp> dataRepository; // The stored data: CloudDataList pointers keyed by CloudKey and ordered with KeyDataListComp
- struct KeySubscriberID // One key a client subscribes to, together with the specific systems that subscription is limited to
- {
- CloudKey key;
- DataStructures::OrderedList<RakNetGUID, RakNetGUID> specificSystemsSubscribedTo; // NOTE(review): an empty list presumably means "any system"; confirm in the implementation
- };
- static int KeySubscriberIDComp(const CloudKey &key, KeySubscriberID * const &data ); // Ordering function for RemoteCloudClient::subscribedKeys
- // Remote systems
- /// Bookkeeping for one client: the keys it has uploaded, the keys it subscribes to,
- /// and a running byte total for its uploads.
- struct RemoteCloudClient
- {
- /// \return true when the client has neither uploaded keys nor subscriptions
- bool IsUnused(void) const
- {
- const bool hasUploads = uploadedKeys.Size()!=0;
- return !hasUploads && subscribedKeys.Size()==0;
- }
- DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> uploadedKeys;
- DataStructures::OrderedList<CloudKey,KeySubscriberID*,CloudServer::KeySubscriberIDComp> subscribedKeys;
- uint64_t uploadedBytes;
- };
- DataStructures::Hash<RakNetGUID, RemoteCloudClient*, 2048, RakNetGUID::ToUint32> remoteSystems; // RemoteCloudClient records keyed by RakNetGUID and hashed with RakNetGUID::ToUint32 (2048 is a DS_Hash template argument, presumably the table size; confirm in DS_Hash.h)
- // For a given user, release all subscribed and uploaded keys (note: despite its name, clientAddress is a RakNetGUID)
- void ReleaseSystem(RakNetGUID clientAddress );
- // For a given user, release a set of keys (note: despite its name, clientAddress is a RakNetGUID)
- void ReleaseKeys(RakNetGUID clientAddress, DataStructures::List<CloudKey> &keys );
- void NotifyClientSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated ); // By name, tells the listed client subscribers that the row changed; NOTE(review): wasUpdated presumably distinguishes an update from a deletion, confirm in the implementation
- void NotifyClientSubscribersOfDataChange( CloudQueryRow *row, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated ); // Overload taking an already built CloudQueryRow instead of a CloudData / CloudKey pair
- void NotifyServerSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, bool wasUpdated ); // Server-side counterpart of the two functions above; takes no subscriber list
- struct RemoteServer // What is known about one other server: the keys it subscribes to and the keys it has uploaded
- {
- RakNetGUID serverAddress; // Note: a RakNetGUID, not a SystemAddress, despite the name
- // This server needs to know about these keys when they are updated or deleted
- DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> subscribedKeys;
- // This server has uploaded these keys, and needs to know about Get() requests
- DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> uploadedKeys;
- // Just for processing (scratch flag; what it marks is decided by code outside this header)
- bool workingFlag;
- // If false, we don't know what keys they have yet, so send everything
- bool gotSubscribedAndUploadedKeys;
- };
- static int RemoteServerComp(const RakNetGUID &key, RemoteServer* const &data ); // Ordering function for remoteServers
- DataStructures::OrderedList<RakNetGUID, RemoteServer*, CloudServer::RemoteServerComp> remoteServers; // RemoteServer records keyed by RakNetGUID; presumably filled by AddServer() and emptied by RemoveServer()
- struct BufferedGetResponseFromServer // Holds the reply of one other server to a Get until the request can be completed
- {
- void Clear(CloudAllocator *allocator); // Defined outside this header; NOTE(review): presumably releases queryResult through the given allocator, confirm
- RakNetGUID serverAddress; // Note: a RakNetGUID, despite the name
- CloudQueryResult queryResult;
- bool gotResult; // NOTE(review): presumably set once this server has replied; see GetRequest::AllRemoteServersHaveResponded()
- };
- struct CloudQueryWithAddresses // A client query plus the specific systems it is restricted to
- {
- // Inputs
- CloudQuery cloudQuery;
- DataStructures::List<RakNetGUID> specificSystems;
- void Serialize(bool writeToBitstream, BitStream *bitStream); // One routine for both directions, selected by writeToBitstream; the body is outside this header
- };
- static int BufferedGetResponseFromServerComp(const RakNetGUID &key, BufferedGetResponseFromServer* const &data ); // Ordering function for GetRequest::remoteServerResponses
- struct GetRequest // A client Get that is waiting for replies from other servers
- {
- void Clear(CloudAllocator *allocator); // Defined outside this header
- bool AllRemoteServersHaveResponded(void) const; // Defined outside this header; by name, reports whether every entry of remoteServerResponses has its result
- CloudQueryWithAddresses cloudQueryWithAddresses;
- // When request started. If takes too long for a response from another system, can abort remaining systems
- RakNet::Time requestStartTime;
- // Assigned by server that gets the request to identify response. See nextGetRequestId
- uint32_t requestId;
- RakNetGUID requestingClient;
- DataStructures::OrderedList<RakNetGUID, BufferedGetResponseFromServer*, CloudServer::BufferedGetResponseFromServerComp> remoteServerResponses; // One buffered reply slot per server, keyed by RakNetGUID
- };
- static int GetRequestComp(const uint32_t &key, GetRequest* const &data ); // Ordering function for getRequests
- DataStructures::OrderedList<uint32_t, GetRequest*, CloudServer::GetRequestComp> getRequests; // GetRequest records keyed by a uint32_t (presumably GetRequest::requestId)
- RakNet::Time nextGetRequestsCheck; // NOTE(review): presumably the next time getRequests is scanned for requests that have waited too long; confirm in Update()
- uint32_t nextGetRequestId; // Source of GetRequest::requestId values (see the comment on requestId)
- void ProcessAndTransmitGetRequest(GetRequest *getRequest); // Defined outside this header; by name, builds the result for a GetRequest and sends it
- void ProcessCloudQueryWithAddresses( // Defined outside this header; the two list parameters are non-const references, i.e. outputs filled with the matching rows and their keys (parallel lists, judging by WriteCloudQueryRowFromResultList)
- CloudServer::CloudQueryWithAddresses &cloudQueryWithAddresses,
- DataStructures::List<CloudData*> &cloudDataResultList,
- DataStructures::List<CloudKey> &cloudKeyResultList
- );
- void SendUploadedAndSubscribedKeysToServer( RakNetGUID systemAddress ); // Senders of key bookkeeping to other servers (note: systemAddress is a RakNetGUID here)
- void SendUploadedKeyToServers( CloudKey &cloudKey );
- void SendSubscribedKeyToServers( CloudKey &cloudKey );
- void RemoveUploadedKeyFromServers( CloudKey &cloudKey );
- void RemoveSubscribedKeyFromServers( CloudKey &cloudKey );
- void OnSendUploadedAndSubscribedKeysToServer( Packet *packet ); // Receiving side: each On* handler takes the packet that the sender of the same name (minus "On") is presumed to produce
- void OnSendUploadedKeyToServers( Packet *packet );
- void OnSendSubscribedKeyToServers( Packet *packet );
- void OnRemoveUploadedKeyFromServers( Packet *packet );
- void OnRemoveSubscribedKeyFromServers( Packet *packet );
- void OnServerDataChanged( Packet *packet );
- void GetServersWithUploadedKeys( // Defined outside this header; remoteServersWithData is a non-const reference, i.e. the output list
- DataStructures::List<CloudKey> &keys,
- DataStructures::List<RemoteServer*> &remoteServersWithData
- );
- CloudServer::CloudDataList *GetOrAllocateCloudDataList(CloudKey key, bool *dataRepositoryExists, unsigned int &dataRepositoryIndex); // By name, returns the CloudDataList for key, creating it if absent; dataRepositoryExists and dataRepositoryIndex are outputs (NOTE(review): presumably "was already present" and the index in dataRepository; confirm)
- void UnsubscribeFromKey(RemoteCloudClient *remoteCloudClient, RakNetGUID remoteCloudClientGuid, unsigned int keySubscriberIndex, CloudKey &cloudKey, DataStructures::List<RakNetGUID> &specificSystems); // keySubscriberIndex is presumably an index into remoteCloudClient->subscribedKeys; confirm in the implementation
- void RemoveSpecificSubscriber(RakNetGUID specificSubscriber, CloudDataList *cloudDataList, RakNetGUID remoteCloudClientGuid);
- DataStructures::List<CloudServerQueryFilter*> queryFilters; // Filters registered through AddQueryFilter(); not owned (per the AddQueryFilter() documentation the instances are externally allocated)
- SystemAddress forceAddress; // Presumably the address set through ForceExternalSystemAddress(); the setter body is outside this header
- };
- } // namespace RakNet
- #endif
- // Key subscription
- //
- // A given system can subscribe to one or more keys.
- // The subscription can further be defined as only subscribing to keys uploaded by or changed by a given system.
- // It is possible to subscribe to keys not yet uploaded, or uploaded to another system
- //
- // Operations:
- //
- // 1. SubscribeToKey() - Get() operation with subscription
- // A. Add to key subscription list for the client, which contains a keyId / specificUploaderList pair
- // B. Send to remote servers that for this key, they should send us updates
- // C. (Done, get operation returns current values)
- //
- // 2. UpdateData() - Post() operation
- // A. Find all subscribers to this data, for the uploading system.
- // B. Send them the uploaded data
- // C. Find all servers that subscribe to this data
- // D. Send them the uploaded data
- //
- // 3. DeleteData() - Release() operation
- // A. Find all subscribers to this data, for the deleting system.
- // B. Inform them of the deletion
- // C. Find all servers that subscribe to this data
- // D. Inform them of the deletion
- //
- // 4. Unsubscribe()
- // A. Find this subscriber, and remove their subscription
- // B. If no one else is subscribing to this key for any system, notify remote servers we no longer need subscription updates
- //
- // Internal operations:
- //
- // 1. Find if any connected client has subscribed to a given key
- // A. This is used to add and remove our subscription for this key to remote servers
- //
- // 2. For a given key and updating address, find all connected clients that care
- // A. First find connected clients that have subscribed to this key, regardless of address
- // B. Then find connected clients that have subscribed to this key for this particular address
- //
- // 3. Find all remote servers that have subscribed to a given key
- // A. This is so when the key is updated or deleted, we know who to send it to
- //
- // 4. For a given client (such as on disconnect), remove all records of their subscriptions
- #endif // _RAKNET_SUPPORT_*
|