CloudServer.h 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. /*
  2. * Copyright (c) 2014, Oculus VR, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under the BSD-style license found in the
  6. * LICENSE file in the root directory of this source tree. An additional grant
  7. * of patent rights can be found in the PATENTS file in the same directory.
  8. *
  9. */
  10. /// \file CloudServer.h
  11. /// \brief Stores client data, and allows cross-server communication to retrieve this data
  12. /// \details TODO
  13. ///
  14. #include "NativeFeatureIncludes.h"
  15. #if _RAKNET_SUPPORT_CloudServer==1
  16. #ifndef __CLOUD_SERVER_H
  17. #define __CLOUD_SERVER_H
  18. #include "PluginInterface2.h"
  19. #include "RakMemoryOverride.h"
  20. #include "NativeTypes.h"
  21. #include "RakString.h"
  22. #include "DS_Hash.h"
  23. #include "CloudCommon.h"
  24. #include "DS_OrderedList.h"
  25. /// If the data is smaller than this value, an allocation is avoid. However, this value exists for every row
  26. #define CLOUD_SERVER_DATA_STACK_SIZE 32
  27. namespace RakNet
  28. {
  29. /// Forward declarations
  30. class RakPeerInterface;
  31. /// \brief Zero or more instances of CloudServerQueryFilter can be attached to CloudServer to restrict client queries
  32. /// All attached instances of CloudServerQueryFilter on each corresponding operation, from all directly connected clients
  33. /// If any attached instance returns false for a given operation, that operation is silently rejected
  34. /// \ingroup CLOUD_GROUP
  35. class RAK_DLL_EXPORT CloudServerQueryFilter
  36. {
  37. public:
  38. CloudServerQueryFilter() {}
  39. virtual ~CloudServerQueryFilter() {}
  40. /// Called when a local client wants to post data
  41. /// \return true to allow, false to reject
  42. virtual bool OnPostRequest(RakNetGUID clientGuid, SystemAddress clientAddress, CloudKey key, uint32_t dataLength, const char *data)=0;
  43. /// Called when a local client wants to release data that it has previously uploaded
  44. /// \return true to allow, false to reject
  45. virtual bool OnReleaseRequest(RakNetGUID clientGuid, SystemAddress clientAddress, DataStructures::List<CloudKey> &cloudKeys)=0;
  46. /// Called when a local client wants to query data
  47. /// If you return false, the client will get no response at all
  48. /// \return true to allow, false to reject
  49. virtual bool OnGetRequest(RakNetGUID clientGuid, SystemAddress clientAddress, CloudQuery &query, DataStructures::List<RakNetGUID> &specificSystems)=0;
  50. /// Called when a local client wants to stop getting updates for data
  51. /// If you return false, the client will keep getting updates for that data
  52. /// \return true to allow, false to reject
  53. virtual bool OnUnsubscribeRequest(RakNetGUID clientGuid, SystemAddress clientAddress, DataStructures::List<CloudKey> &cloudKeys, DataStructures::List<RakNetGUID> &specificSystems)=0;
  54. };
  55. /// \brief Stores client data, and allows cross-server communication to retrieve this data
  56. /// \ingroup CLOUD_GROUP
  57. class RAK_DLL_EXPORT CloudServer : public PluginInterface2, CloudAllocator
  58. {
  59. public:
  60. // GetInstance() and DestroyInstance(instance*)
  61. STATIC_FACTORY_DECLARATIONS(CloudServer)
  62. CloudServer();
  63. virtual ~CloudServer();
  64. /// \brief Max bytes a client can upload
  65. /// Data in excess of this value is silently ignored
  66. /// defaults to 0 (unlimited)
  67. /// \param[in] bytes Max bytes a client can upload. 0 means unlimited.
  68. void SetMaxUploadBytesPerClient(uint64_t bytes);
  69. /// \brief Max bytes returned by a download. If the number of bytes would exceed this amount, the returned list is truncated
  70. /// However, if this would result in no rows downloaded, then one row will be returned.
  71. /// \param[in] bytes Max bytes a client can download from a single Get(). 0 means unlimited.
  72. void SetMaxBytesPerDownload(uint64_t bytes);
  73. /// \brief Add a server, which is assumed to be connected in a fully connected mesh to all other servers and also running the CloudServer plugin
  74. /// The other system must also call AddServer before getting the subscription data, or it will be rejected.
  75. /// Sending a message telling the other system to call AddServer(), followed by calling AddServer() locally, would be sufficient for this to work.
  76. /// \note This sends subscription data to the other system, using RELIABLE_ORDERED on channel 0
  77. /// \param[in] systemIdentifier Identifier of the remote system
  78. void AddServer(RakNetGUID systemIdentifier);
  79. /// \brief Removes a server added through AddServer()
  80. /// \param[in] systemIdentifier Identifier of the remote system
  81. void RemoveServer(RakNetGUID systemIdentifier);
  82. /// Return list of servers added with AddServer()
  83. /// \param[out] remoteServers List of servers added
  84. void GetRemoteServers(DataStructures::List<RakNetGUID> &remoteServersOut);
  85. /// \brief Frees all memory. Does not remove query filters
  86. void Clear(void);
  87. /// \brief Report the specified SystemAddress to client queries, rather than what RakPeer reads.
  88. /// This is useful if you already know your public IP
  89. /// This only applies to future updates, so call it before updating to apply to all queries
  90. /// \param[in] forcedAddress The systmeAddress to return in queries. Use UNASSIGNED_SYSTEM_ADDRESS (default) to use what RakPeer returns
  91. void ForceExternalSystemAddress(SystemAddress forcedAddress);
  92. /// \brief Adds a callback called on each query. If all filters returns true for an operation, the operation is allowed.
  93. /// If the filter was already added, the function silently fails
  94. /// \param[in] filter An externally allocated instance of CloudServerQueryFilter. The instance must remain valid until it is removed with RemoveQueryFilter() or RemoveAllQueryFilters()
  95. void AddQueryFilter(CloudServerQueryFilter* filter);
  96. /// \brief Removes a callback added with AddQueryFilter()
  97. /// The instance is not deleted, only unreferenced. It is up to the user to delete the instance, if necessary
  98. /// \param[in] filter An externally allocated instance of CloudServerQueryFilter. The instance must remain valid until it is removed with RemoveQueryFilter() or RemoveAllQueryFilters()
  99. void RemoveQueryFilter(CloudServerQueryFilter* filter);
  100. /// \brief Removes all instances of CloudServerQueryFilter added with AddQueryFilter().
  101. /// The instances are not deleted, only unreferenced. It is up to the user to delete the instances, if necessary
  102. void RemoveAllQueryFilters(void);
  103. protected:
  104. virtual void Update(void);
  105. virtual PluginReceiveResult OnReceive(Packet *packet);
  106. virtual void OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason );
  107. virtual void OnRakPeerShutdown(void);
  108. virtual void OnPostRequest(Packet *packet);
  109. virtual void OnReleaseRequest(Packet *packet);
  110. virtual void OnGetRequest(Packet *packet);
  111. virtual void OnUnsubscribeRequest(Packet *packet);
  112. virtual void OnServerToServerGetRequest(Packet *packet);
  113. virtual void OnServerToServerGetResponse(Packet *packet);
  114. uint64_t maxUploadBytesPerClient, maxBytesPerDowload;
  115. // ----------------------------------------------------------------------------
  116. // For a given data key, quickly look up one or all systems that have uploaded
  117. // ----------------------------------------------------------------------------
  118. struct CloudData
  119. {
  120. CloudData() {}
  121. ~CloudData() {if (allocatedData) rakFree_Ex(allocatedData, _FILE_AND_LINE_);}
  122. bool IsUnused(void) const {return isUploaded==false && specificSubscribers.Size()==0;}
  123. void Clear(void) {if (dataPtr==allocatedData) rakFree_Ex(allocatedData, _FILE_AND_LINE_); allocatedData=0; dataPtr=0; dataLengthBytes=0; isUploaded=false;}
  124. unsigned char stackData[CLOUD_SERVER_DATA_STACK_SIZE];
  125. unsigned char *allocatedData; // Uses allocatedData instead of stackData if length of data exceeds CLOUD_SERVER_DATA_STACK_SIZE
  126. unsigned char *dataPtr; // Points to either stackData or allocatedData
  127. uint32_t dataLengthBytes;
  128. bool isUploaded;
  129. /// System address of server that is holding this data, and the client is connected to
  130. SystemAddress serverSystemAddress;
  131. /// System address of client that uploaded this data
  132. SystemAddress clientSystemAddress;
  133. /// RakNetGUID of server that is holding this data, and the client is connected to
  134. RakNetGUID serverGUID;
  135. /// RakNetGUID of client that uploaded this data
  136. RakNetGUID clientGUID;
  137. /// When the key data changes from this particular system, notify these subscribers
  138. /// This list mutually exclusive with CloudDataList::nonSpecificSubscribers
  139. DataStructures::OrderedList<RakNetGUID, RakNetGUID> specificSubscribers;
  140. };
  141. void WriteCloudQueryRowFromResultList(unsigned int i, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut);
  142. void WriteCloudQueryRowFromResultList(DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut);
  143. static int KeyDataPtrComp( const RakNetGUID &key, CloudData* const &data );
  144. struct CloudDataList
  145. {
  146. bool IsUnused(void) const {return keyData.Size()==0 && nonSpecificSubscribers.Size()==0;}
  147. bool IsNotUploaded(void) const {return uploaderCount==0;}
  148. bool RemoveSubscriber(RakNetGUID g) {
  149. bool objectExists;
  150. unsigned int index;
  151. index = nonSpecificSubscribers.GetIndexFromKey(g, &objectExists);
  152. if (objectExists)
  153. {
  154. subscriberCount--;
  155. nonSpecificSubscribers.RemoveAtIndex(index);
  156. return true;
  157. }
  158. return false;
  159. }
  160. unsigned int uploaderCount, subscriberCount;
  161. CloudKey key;
  162. // Data uploaded from or subscribed to for various systems
  163. DataStructures::OrderedList<RakNetGUID, CloudData*, CloudServer::KeyDataPtrComp> keyData;
  164. /// When the key data changes from any system, notify these subscribers
  165. /// This list mutually exclusive with CloudData::specificSubscribers
  166. DataStructures::OrderedList<RakNetGUID, RakNetGUID> nonSpecificSubscribers;
  167. };
  168. static int KeyDataListComp( const CloudKey &key, CloudDataList * const &data );
  169. DataStructures::OrderedList<CloudKey, CloudDataList*, CloudServer::KeyDataListComp> dataRepository;
  170. struct KeySubscriberID
  171. {
  172. CloudKey key;
  173. DataStructures::OrderedList<RakNetGUID, RakNetGUID> specificSystemsSubscribedTo;
  174. };
  175. static int KeySubscriberIDComp(const CloudKey &key, KeySubscriberID * const &data );
  176. // Remote systems
  177. struct RemoteCloudClient
  178. {
  179. bool IsUnused(void) const {return uploadedKeys.Size()==0 && subscribedKeys.Size()==0;}
  180. DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> uploadedKeys;
  181. DataStructures::OrderedList<CloudKey,KeySubscriberID*,CloudServer::KeySubscriberIDComp> subscribedKeys;
  182. uint64_t uploadedBytes;
  183. };
  184. DataStructures::Hash<RakNetGUID, RemoteCloudClient*, 2048, RakNetGUID::ToUint32> remoteSystems;
  185. // For a given user, release all subscribed and uploaded keys
  186. void ReleaseSystem(RakNetGUID clientAddress );
  187. // For a given user, release a set of keys
  188. void ReleaseKeys(RakNetGUID clientAddress, DataStructures::List<CloudKey> &keys );
  189. void NotifyClientSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated );
  190. void NotifyClientSubscribersOfDataChange( CloudQueryRow *row, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated );
  191. void NotifyServerSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, bool wasUpdated );
  192. struct RemoteServer
  193. {
  194. RakNetGUID serverAddress;
  195. // This server needs to know about these keys when they are updated or deleted
  196. DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> subscribedKeys;
  197. // This server has uploaded these keys, and needs to know about Get() requests
  198. DataStructures::OrderedList<CloudKey,CloudKey,CloudKeyComp> uploadedKeys;
  199. // Just for processing
  200. bool workingFlag;
  201. // If false, we don't know what keys they have yet, so send everything
  202. bool gotSubscribedAndUploadedKeys;
  203. };
  204. static int RemoteServerComp(const RakNetGUID &key, RemoteServer* const &data );
  205. DataStructures::OrderedList<RakNetGUID, RemoteServer*, CloudServer::RemoteServerComp> remoteServers;
  206. struct BufferedGetResponseFromServer
  207. {
  208. void Clear(CloudAllocator *allocator);
  209. RakNetGUID serverAddress;
  210. CloudQueryResult queryResult;
  211. bool gotResult;
  212. };
  213. struct CloudQueryWithAddresses
  214. {
  215. // Inputs
  216. CloudQuery cloudQuery;
  217. DataStructures::List<RakNetGUID> specificSystems;
  218. void Serialize(bool writeToBitstream, BitStream *bitStream);
  219. };
  220. static int BufferedGetResponseFromServerComp(const RakNetGUID &key, BufferedGetResponseFromServer* const &data );
  221. struct GetRequest
  222. {
  223. void Clear(CloudAllocator *allocator);
  224. bool AllRemoteServersHaveResponded(void) const;
  225. CloudQueryWithAddresses cloudQueryWithAddresses;
  226. // When request started. If takes too long for a response from another system, can abort remaining systems
  227. RakNet::Time requestStartTime;
  228. // Assigned by server that gets the request to identify response. See nextGetRequestId
  229. uint32_t requestId;
  230. RakNetGUID requestingClient;
  231. DataStructures::OrderedList<RakNetGUID, BufferedGetResponseFromServer*, CloudServer::BufferedGetResponseFromServerComp> remoteServerResponses;
  232. };
  233. static int GetRequestComp(const uint32_t &key, GetRequest* const &data );
  234. DataStructures::OrderedList<uint32_t, GetRequest*, CloudServer::GetRequestComp> getRequests;
  235. RakNet::Time nextGetRequestsCheck;
  236. uint32_t nextGetRequestId;
  237. void ProcessAndTransmitGetRequest(GetRequest *getRequest);
  238. void ProcessCloudQueryWithAddresses(
  239. CloudServer::CloudQueryWithAddresses &cloudQueryWithAddresses,
  240. DataStructures::List<CloudData*> &cloudDataResultList,
  241. DataStructures::List<CloudKey> &cloudKeyResultList
  242. );
  243. void SendUploadedAndSubscribedKeysToServer( RakNetGUID systemAddress );
  244. void SendUploadedKeyToServers( CloudKey &cloudKey );
  245. void SendSubscribedKeyToServers( CloudKey &cloudKey );
  246. void RemoveUploadedKeyFromServers( CloudKey &cloudKey );
  247. void RemoveSubscribedKeyFromServers( CloudKey &cloudKey );
  248. void OnSendUploadedAndSubscribedKeysToServer( Packet *packet );
  249. void OnSendUploadedKeyToServers( Packet *packet );
  250. void OnSendSubscribedKeyToServers( Packet *packet );
  251. void OnRemoveUploadedKeyFromServers( Packet *packet );
  252. void OnRemoveSubscribedKeyFromServers( Packet *packet );
  253. void OnServerDataChanged( Packet *packet );
  254. void GetServersWithUploadedKeys(
  255. DataStructures::List<CloudKey> &keys,
  256. DataStructures::List<RemoteServer*> &remoteServersWithData
  257. );
  258. CloudServer::CloudDataList *GetOrAllocateCloudDataList(CloudKey key, bool *dataRepositoryExists, unsigned int &dataRepositoryIndex);
  259. void UnsubscribeFromKey(RemoteCloudClient *remoteCloudClient, RakNetGUID remoteCloudClientGuid, unsigned int keySubscriberIndex, CloudKey &cloudKey, DataStructures::List<RakNetGUID> &specificSystems);
  260. void RemoveSpecificSubscriber(RakNetGUID specificSubscriber, CloudDataList *cloudDataList, RakNetGUID remoteCloudClientGuid);
  261. DataStructures::List<CloudServerQueryFilter*> queryFilters;
  262. SystemAddress forceAddress;
  263. };
  264. } // namespace RakNet
  265. #endif
  266. // Key subscription
  267. //
  268. // A given system can subscribe to one or more keys.
  269. // The subscription can be further be defined as only subscribing to keys uploaded by or changed by a given system.
  270. // It is possible to subscribe to keys not yet uploaded, or uploaded to another system
  271. //
  272. // Operations:
  273. //
  274. // 1. SubscribeToKey() - Get() operation with subscription
  275. // A. Add to key subscription list for the client, which contains a keyId / specificUploaderList pair
  276. // B. Send to remote servers that for this key, they should send us updates
  277. // C. (Done, get operation returns current values)
  278. //
  279. // 2. UpdateData() - Post() operation
  280. // A. Find all subscribers to this data, for the uploading system.
  281. // B. Send them the uploaded data
  282. // C. Find all servers that subscribe to this data
  283. // D. Send them the uploaded data
  284. //
  285. // 3. DeleteData() - Release() operation
  286. // A. Find all subscribers to this data, for the deleting system.
  287. // B. Inform them of the deletion
  288. // C. Find all servers that subscribe to this data
  289. // D. Inform them of the deletion
  290. //
  291. // 4. Unsubscribe()
  292. // A. Find this subscriber, and remove their subscription
  293. // B. If no one else is subscribing to this key for any system, notify remote servers we no longer need subscription updates
  294. //
  295. // Internal operations:
  296. //
  297. // 1. Find if any connected client has subscribed to a given key
  298. // A. This is used add and remove our subscription for this key to remote servers
  299. //
  300. // 2. For a given key and updating address, find all connected clients that care
  301. // A. First find connected clients that have subscribed to this key, regardless of address
  302. // B. Then find connected clients that have subscribed to this key for this particular address
  303. //
  304. // 3. Find all remote servers that have subscribed to a given key
  305. // A. This is so when the key is updated or deleted, we know who to send it to
  306. //
  307. // 4. For a given client (such as on disconnect), remove all records of their subscriptions
  308. #endif // _RAKNET_SUPPORT_*
粤ICP备19079148号