SQLite3ServerPlugin.cpp 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. /*
  2. * Copyright (c) 2014, Oculus VR, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under the BSD-style license found in the
  6. * LICENSE file in the root directory of this source tree. An additional grant
  7. * of patent rights can be found in the PATENTS file in the same directory.
  8. *
  9. */
  10. #include "SQLite3ServerPlugin.h"
  11. #include "MessageIdentifiers.h"
  12. #include "BitStream.h"
  13. #include "GetTime.h"
  14. using namespace RakNet;
  15. bool operator<( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() < cls.dbIdentifier;}
  16. bool operator>( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() > cls.dbIdentifier;}
  17. bool operator==( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() == cls.dbIdentifier;}
  18. int PerRowCallback(void *userArgument, int argc, char **argv, char **azColName)
  19. {
  20. SQLite3Table *outputTable = (SQLite3Table*)userArgument;
  21. unsigned int idx;
  22. if (outputTable->columnNames.Size()==0)
  23. {
  24. for (idx=0; idx < (unsigned int) argc; idx++)
  25. outputTable->columnNames.Push(azColName[idx], _FILE_AND_LINE_ );
  26. }
  27. SQLite3Row *row = RakNet::OP_NEW<SQLite3Row>(_FILE_AND_LINE_);
  28. outputTable->rows.Push(row,_FILE_AND_LINE_);
  29. for (idx=0; idx < (unsigned int) argc; idx++)
  30. {
  31. if (argv[idx])
  32. row->entries.Push(argv[idx], _FILE_AND_LINE_ );
  33. else
  34. row->entries.Push("", _FILE_AND_LINE_ );
  35. }
  36. return 0;
  37. }
  38. SQLite3ServerPlugin::SQLite3ServerPlugin()
  39. {
  40. }
  41. SQLite3ServerPlugin::~SQLite3ServerPlugin()
  42. {
  43. StopThreads();
  44. }
  45. bool SQLite3ServerPlugin::AddDBHandle(RakNet::RakString dbIdentifier, sqlite3 *dbHandle, bool dbAutoCreated)
  46. {
  47. if (dbIdentifier.IsEmpty())
  48. return false;
  49. unsigned int idx = dbHandles.GetInsertionIndex(dbIdentifier);
  50. if (idx==(unsigned int)-1)
  51. return false;
  52. NamedDBHandle ndbh;
  53. ndbh.dbHandle=dbHandle;
  54. ndbh.dbIdentifier=dbIdentifier;
  55. ndbh.dbAutoCreated=dbAutoCreated;
  56. ndbh.whenCreated=RakNet::GetTimeMS();
  57. dbHandles.InsertAtIndex(ndbh,idx,_FILE_AND_LINE_);
  58. #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
  59. if (sqlThreadPool.WasStarted()==false)
  60. sqlThreadPool.StartThreads(1,0);
  61. #endif
  62. return true;
  63. }
  64. void SQLite3ServerPlugin::RemoveDBHandle(RakNet::RakString dbIdentifier, bool alsoCloseConnection)
  65. {
  66. unsigned int idx = dbHandles.GetIndexOf(dbIdentifier);
  67. if (idx!=(unsigned int)-1)
  68. {
  69. if (alsoCloseConnection)
  70. {
  71. printf("Closed %s\n", dbIdentifier.C_String());
  72. sqlite3_close(dbHandles[idx].dbHandle);
  73. }
  74. dbHandles.RemoveAtIndex(idx,_FILE_AND_LINE_);
  75. #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
  76. if (dbHandles.GetSize()==0)
  77. StopThreads();
  78. #endif // SQLite3_STATEMENT_EXECUTE_THREADED
  79. }
  80. }
  81. void SQLite3ServerPlugin::RemoveDBHandle(sqlite3 *dbHandle, bool alsoCloseConnection)
  82. {
  83. unsigned int idx;
  84. for (idx=0; idx < dbHandles.GetSize(); idx++)
  85. {
  86. if (dbHandles[idx].dbHandle==dbHandle)
  87. {
  88. if (alsoCloseConnection)
  89. {
  90. printf("Closed %s\n", dbHandles[idx].dbIdentifier.C_String());
  91. sqlite3_close(dbHandles[idx].dbHandle);
  92. }
  93. dbHandles.RemoveAtIndex(idx,_FILE_AND_LINE_);
  94. #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
  95. if (dbHandles.GetSize()==0)
  96. StopThreads();
  97. #endif // SQLite3_STATEMENT_EXECUTE_THREADED
  98. return;
  99. }
  100. }
  101. }
  102. #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
  103. void SQLite3ServerPlugin::Update(void)
  104. {
  105. SQLExecThreadOutput output;
  106. while (sqlThreadPool.HasOutputFast() && sqlThreadPool.HasOutput())
  107. {
  108. output = sqlThreadPool.GetOutput();
  109. RakNet::BitStream bsOut((unsigned char*) output.data, output.length,false);
  110. SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,output.sender,false);
  111. rakFree_Ex(output.data,_FILE_AND_LINE_);
  112. }
  113. }
  114. SQLite3ServerPlugin::SQLExecThreadOutput ExecStatementThread(SQLite3ServerPlugin::SQLExecThreadInput threadInput, bool *returnOutput, void* perThreadData)
  115. {
  116. unsigned int queryId;
  117. RakNet::RakString dbIdentifier;
  118. RakNet::RakString inputStatement;
  119. RakNet::BitStream bsIn((unsigned char*) threadInput.data, threadInput.length, false);
  120. bsIn.IgnoreBytes(sizeof(MessageID));
  121. bsIn.Read(queryId);
  122. bsIn.Read(dbIdentifier);
  123. bsIn.Read(inputStatement);
  124. // bool isRequest;
  125. // bsIn.Read(isRequest);
  126. bsIn.IgnoreBits(1);
  127. char *errorMsg;
  128. RakNet::RakString errorMsgStr;
  129. SQLite3Table outputTable;
  130. sqlite3_exec(threadInput.dbHandle, inputStatement.C_String(), PerRowCallback, &outputTable, &errorMsg);
  131. if (errorMsg)
  132. {
  133. errorMsgStr=errorMsg;
  134. sqlite3_free(errorMsg);
  135. }
  136. RakNet::BitStream bsOut;
  137. bsOut.Write((MessageID)ID_SQLite3_EXEC);
  138. bsOut.Write(queryId);
  139. bsOut.Write(dbIdentifier);
  140. bsOut.Write(inputStatement);
  141. bsOut.Write(false);
  142. bsOut.Write(errorMsgStr);
  143. outputTable.Serialize(&bsOut);
  144. // Free input data
  145. rakFree_Ex(threadInput.data,_FILE_AND_LINE_);
  146. // Copy to output data
  147. SQLite3ServerPlugin::SQLExecThreadOutput threadOutput;
  148. threadOutput.data=(char*) rakMalloc_Ex(bsOut.GetNumberOfBytesUsed(),_FILE_AND_LINE_);
  149. memcpy(threadOutput.data,bsOut.GetData(),bsOut.GetNumberOfBytesUsed());
  150. threadOutput.length=bsOut.GetNumberOfBytesUsed();
  151. threadOutput.sender=threadInput.sender;
  152. // SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
  153. *returnOutput=true;
  154. return threadOutput;
  155. }
  156. #endif // SQLite3_STATEMENT_EXECUTE_THREADED
  157. PluginReceiveResult SQLite3ServerPlugin::OnReceive(Packet *packet)
  158. {
  159. switch (packet->data[0])
  160. {
  161. case ID_SQLite3_EXEC:
  162. {
  163. unsigned int queryId;
  164. RakNet::RakString dbIdentifier;
  165. RakNet::RakString inputStatement;
  166. RakNet::BitStream bsIn(packet->data, packet->length, false);
  167. bsIn.IgnoreBytes(sizeof(MessageID));
  168. bsIn.Read(queryId);
  169. bsIn.Read(dbIdentifier);
  170. bsIn.Read(inputStatement);
  171. bool isRequest;
  172. bsIn.Read(isRequest);
  173. if (isRequest)
  174. {
  175. // Server code
  176. unsigned int idx = dbHandles.GetIndexOf(dbIdentifier);
  177. if (idx==-1)
  178. {
  179. RakNet::BitStream bsOut;
  180. bsOut.Write((MessageID)ID_SQLite3_UNKNOWN_DB);
  181. bsOut.Write(queryId);
  182. bsOut.Write(dbIdentifier);
  183. bsOut.Write(inputStatement);
  184. SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
  185. }
  186. else
  187. {
  188. #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
  189. // Push to the thread
  190. SQLExecThreadInput input;
  191. input.data=(char*) rakMalloc_Ex(packet->length, _FILE_AND_LINE_);
  192. memcpy(input.data,packet->data,packet->length);
  193. input.dbHandle=dbHandles[idx].dbHandle;
  194. input.length=packet->length;
  195. input.sender=packet->systemAddress;
  196. sqlThreadPool.AddInput(ExecStatementThread, input);
  197. #else
  198. char *errorMsg;
  199. RakNet::RakString errorMsgStr;
  200. SQLite3Table outputTable;
  201. sqlite3_exec(dbHandles[idx].dbHandle, inputStatement.C_String(), PerRowCallback, &outputTable, &errorMsg);
  202. if (errorMsg)
  203. {
  204. errorMsgStr=errorMsg;
  205. sqlite3_free(errorMsg);
  206. }
  207. RakNet::BitStream bsOut;
  208. bsOut.Write((MessageID)ID_SQLite3_EXEC);
  209. bsOut.Write(queryId);
  210. bsOut.Write(dbIdentifier);
  211. bsOut.Write(inputStatement);
  212. bsOut.Write(false);
  213. bsOut.Write(errorMsgStr);
  214. outputTable.Serialize(&bsOut);
  215. SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
  216. #endif
  217. }
  218. }
  219. return RR_STOP_PROCESSING_AND_DEALLOCATE;
  220. }
  221. break;
  222. }
  223. return RR_CONTINUE_PROCESSING;
  224. }
  225. void SQLite3ServerPlugin::OnAttach(void)
  226. {
  227. }
  228. void SQLite3ServerPlugin::OnDetach(void)
  229. {
  230. StopThreads();
  231. }
  232. void SQLite3ServerPlugin::StopThreads(void)
  233. {
  234. #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
  235. sqlThreadPool.StopThreads();
  236. unsigned int i;
  237. for (i=0; i < sqlThreadPool.InputSize(); i++)
  238. {
  239. RakNet::OP_DELETE(sqlThreadPool.GetInputAtIndex(i).data, _FILE_AND_LINE_);
  240. }
  241. sqlThreadPool.ClearInput();
  242. for (i=0; i < sqlThreadPool.OutputSize(); i++)
  243. {
  244. RakNet::OP_DELETE(sqlThreadPool.GetOutputAtIndex(i).data, _FILE_AND_LINE_);
  245. }
  246. sqlThreadPool.ClearOutput();
  247. #endif
  248. }
粤ICP备19079148号