FSTSyncEngineTestDriver.mm 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h"
  17. #import <FirebaseFirestore/FIRFirestoreErrors.h>
  18. #include <map>
  19. #include <memory>
  20. #include <unordered_map>
  21. #include <utility>
  22. #import "Firestore/Source/Core/FSTEventManager.h"
  23. #import "Firestore/Source/Core/FSTQuery.h"
  24. #import "Firestore/Source/Core/FSTSyncEngine.h"
  25. #import "Firestore/Source/Local/FSTLocalStore.h"
  26. #import "Firestore/Source/Local/FSTPersistence.h"
  27. #import "Firestore/Source/Model/FSTMutation.h"
  28. #import "Firestore/Source/Remote/FSTWatchChange.h"
  29. #import "Firestore/Example/Tests/Core/FSTSyncEngine+Testing.h"
  30. #import "Firestore/Example/Tests/SpecTests/FSTMockDatastore.h"
  31. #include "Firestore/core/include/firebase/firestore/firestore_errors.h"
  32. #include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h"
  33. #include "Firestore/core/src/firebase/firestore/auth/user.h"
  34. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  35. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  36. #include "Firestore/core/src/firebase/firestore/model/document_key.h"
  37. #include "Firestore/core/src/firebase/firestore/util/async_queue.h"
  38. #include "Firestore/core/src/firebase/firestore/util/executor_libdispatch.h"
  39. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  40. #include "Firestore/core/src/firebase/firestore/util/log.h"
  41. #include "Firestore/core/src/firebase/firestore/util/status.h"
  42. #include "absl/memory/memory.h"
  43. using firebase::firestore::FirestoreErrorCode;
  44. using firebase::firestore::auth::EmptyCredentialsProvider;
  45. using firebase::firestore::auth::HashUser;
  46. using firebase::firestore::auth::User;
  47. using firebase::firestore::core::DatabaseInfo;
  48. using firebase::firestore::model::DatabaseId;
  49. using firebase::firestore::model::DocumentKey;
  50. using firebase::firestore::model::DocumentKeySet;
  51. using firebase::firestore::model::OnlineState;
  52. using firebase::firestore::model::SnapshotVersion;
  53. using firebase::firestore::model::TargetId;
  54. using firebase::firestore::remote::MockDatastore;
  55. using firebase::firestore::util::AsyncQueue;
  56. using firebase::firestore::util::TimerId;
  57. using firebase::firestore::util::ExecutorLibdispatch;
  58. using firebase::firestore::util::MakeString;
  59. using firebase::firestore::util::Status;
  60. NS_ASSUME_NONNULL_BEGIN
  61. @implementation FSTQueryEvent
  62. - (NSString *)description {
  63. // The Query is also included in the view, so we skip it.
  64. return [NSString stringWithFormat:@"<FSTQueryEvent: viewSnapshot=%@, error=%@>",
  65. self.viewSnapshot, self.error];
  66. }
  67. @end
// Intentionally empty: no methods are implemented here. This file only reads and writes the
// `write`, `done` and `error` properties of instances (presumably declared in the header and
// auto-synthesized -- confirm against FSTSyncEngineTestDriver.h).
@implementation FSTOutstandingWrite
@end
  70. @interface FSTSyncEngineTestDriver ()
  71. #pragma mark - Parts of the Firestore system that the spec tests need to control.
  72. @property(nonatomic, strong, readonly) FSTEventManager *eventManager;
  73. @property(nonatomic, strong, readonly) FSTRemoteStore *remoteStore;
  74. @property(nonatomic, strong, readonly) FSTLocalStore *localStore;
  75. @property(nonatomic, strong, readonly) FSTSyncEngine *syncEngine;
  76. @property(nonatomic, strong, readonly) id<FSTPersistence> persistence;
  77. #pragma mark - Data structures for holding events sent by the watch stream.
  78. /** A block for the FSTEventAggregator to use to report events to the test. */
  79. @property(nonatomic, strong, readonly) void (^eventHandler)(FSTQueryEvent *);
  80. /** The events received by our eventHandler and not yet retrieved via capturedEventsSinceLastCall */
  81. @property(nonatomic, strong, readonly) NSMutableArray<FSTQueryEvent *> *events;
  82. /** A dictionary for tracking the listens on queries. */
  83. @property(nonatomic, strong, readonly)
  84. NSMutableDictionary<FSTQuery *, FSTQueryListener *> *queryListeners;
  85. #pragma mark - Data structures for holding events sent by the write stream.
  86. /** The names of the documents that the client acknowledged during the current spec test step */
  87. @property(nonatomic, strong, readonly) NSMutableArray<NSString *> *acknowledgedDocs;
  88. /** The names of the documents that the client rejected during the current spec test step */
  89. @property(nonatomic, strong, readonly) NSMutableArray<NSString *> *rejectedDocs;
  90. @end
@implementation FSTSyncEngineTestDriver {
  // Queue on which the driver enqueues its interactions with the stores, sync engine and mock
  // datastore; created in the designated initializer on top of a serial dispatch queue.
  std::unique_ptr<AsyncQueue> _workerQueue;

  // Per-user queues of writes that have been issued but not yet acknowledged or permanently
  // rejected. The values are NSMutableArrays because the driver appends to and removes from them
  // in place (see -currentOutstandingWrites and its callers).
  std::unordered_map<User, NSMutableArray<FSTOutstandingWrite *> *, HashUser> _outstandingWrites;

  // Backing storage for the expectedLimboDocuments getter/setter pair.
  DocumentKeySet _expectedLimboDocuments;

  // Fixed database info handed to the mock datastore.
  DatabaseInfo _databaseInfo;

  // The user whose write queue -currentOutstandingWrites returns; updated by -changeUser:.
  User _currentUser;

  // Credentials provider whose address is passed to the mock datastore.
  EmptyCredentialsProvider _credentialProvider;

  // Mock datastore used to inspect sent writes and to inject acks, errors and watch changes.
  std::shared_ptr<MockDatastore> _datastore;
}
  101. - (instancetype)initWithPersistence:(id<FSTPersistence>)persistence {
  102. return [self initWithPersistence:persistence
  103. initialUser:User::Unauthenticated()
  104. outstandingWrites:{}];
  105. }
  106. - (instancetype)initWithPersistence:(id<FSTPersistence>)persistence
  107. initialUser:(const User &)initialUser
  108. outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites {
  109. if (self = [super init]) {
  110. // Do a deep copy.
  111. for (const auto &pair : outstandingWrites) {
  112. _outstandingWrites[pair.first] = [pair.second mutableCopy];
  113. }
  114. _events = [NSMutableArray array];
  115. _databaseInfo = {DatabaseId{"project", "database"}, "persistence", "host", false};
  116. // Set up the sync engine and various stores.
  117. dispatch_queue_t queue =
  118. dispatch_queue_create("sync_engine_test_driver", DISPATCH_QUEUE_SERIAL);
  119. _workerQueue = absl::make_unique<AsyncQueue>(absl::make_unique<ExecutorLibdispatch>(queue));
  120. _persistence = persistence;
  121. _localStore = [[FSTLocalStore alloc] initWithPersistence:persistence initialUser:initialUser];
  122. _datastore =
  123. std::make_shared<MockDatastore>(_databaseInfo, _workerQueue.get(), &_credentialProvider);
  124. _remoteStore = [[FSTRemoteStore alloc] initWithLocalStore:_localStore
  125. datastore:_datastore
  126. workerQueue:_workerQueue.get()];
  127. _syncEngine = [[FSTSyncEngine alloc] initWithLocalStore:_localStore
  128. remoteStore:_remoteStore
  129. initialUser:initialUser];
  130. _remoteStore.syncEngine = _syncEngine;
  131. _eventManager = [FSTEventManager eventManagerWithSyncEngine:_syncEngine];
  132. _remoteStore.onlineStateDelegate = self;
  133. // Set up internal event tracking for the spec tests.
  134. NSMutableArray<FSTQueryEvent *> *events = [NSMutableArray array];
  135. _eventHandler = ^(FSTQueryEvent *e) {
  136. [events addObject:e];
  137. };
  138. _events = events;
  139. _queryListeners = [NSMutableDictionary dictionary];
  140. _expectedActiveTargets = [NSDictionary dictionary];
  141. _currentUser = initialUser;
  142. _acknowledgedDocs = [NSMutableArray array];
  143. _rejectedDocs = [NSMutableArray array];
  144. }
  145. return self;
  146. }
  147. - (const FSTOutstandingWriteQueues &)outstandingWrites {
  148. return _outstandingWrites;
  149. }
  150. - (const DocumentKeySet &)expectedLimboDocuments {
  151. return _expectedLimboDocuments;
  152. }
  153. - (void)setExpectedLimboDocuments:(DocumentKeySet)docs {
  154. _expectedLimboDocuments = std::move(docs);
  155. }
  156. - (void)drainQueue {
  157. _workerQueue->EnqueueBlocking([] {});
  158. }
  159. - (const User &)currentUser {
  160. return _currentUser;
  161. }
  162. - (void)applyChangedOnlineState:(OnlineState)onlineState {
  163. [self.syncEngine applyChangedOnlineState:onlineState];
  164. [self.eventManager applyChangedOnlineState:onlineState];
  165. }
  166. - (void)start {
  167. _workerQueue->EnqueueBlocking([&] {
  168. [self.localStore start];
  169. [self.remoteStore start];
  170. });
  171. }
  172. - (void)validateUsage {
  173. // We could relax this if we found a reason to.
  174. HARD_ASSERT(self.events.count == 0, "You must clear all pending events by calling"
  175. " capturedEventsSinceLastCall before calling shutdown.");
  176. }
  177. - (void)shutdown {
  178. _workerQueue->EnqueueBlocking([&] {
  179. [self.remoteStore shutdown];
  180. [self.persistence shutdown];
  181. });
  182. }
  183. - (void)validateNextWriteSent:(FSTMutation *)expectedWrite {
  184. NSArray<FSTMutation *> *request = _datastore->NextSentWrite();
  185. // Make sure the write went through the pipe like we expected it to.
  186. HARD_ASSERT(request.count == 1, "Only single mutation requests are supported at the moment");
  187. FSTMutation *actualWrite = request[0];
  188. HARD_ASSERT([actualWrite isEqual:expectedWrite],
  189. "Mock datastore received write %s but first outstanding mutation was %s", actualWrite,
  190. expectedWrite);
  191. LOG_DEBUG("A write was sent: %s", actualWrite);
  192. }
  193. - (int)sentWritesCount {
  194. return _datastore->WritesSent();
  195. }
  196. - (int)writeStreamRequestCount {
  197. return _datastore->write_stream_request_count();
  198. }
  199. - (int)watchStreamRequestCount {
  200. return _datastore->watch_stream_request_count();
  201. }
  202. - (void)disableNetwork {
  203. _workerQueue->EnqueueBlocking([&] {
  204. // Make sure to execute all writes that are currently queued. This allows us
  205. // to assert on the total number of requests sent before shutdown.
  206. [self.remoteStore fillWritePipeline];
  207. [self.remoteStore disableNetwork];
  208. });
  209. }
  210. - (void)enableNetwork {
  211. _workerQueue->EnqueueBlocking([&] { [self.remoteStore enableNetwork]; });
  212. }
  213. - (void)runTimer:(TimerId)timerID {
  214. _workerQueue->RunScheduledOperationsUntil(timerID);
  215. }
  216. - (void)changeUser:(const User &)user {
  217. _currentUser = user;
  218. _workerQueue->EnqueueBlocking([&] { [self.syncEngine credentialDidChangeWithUser:user]; });
  219. }
  220. - (FSTOutstandingWrite *)receiveWriteAckWithVersion:(const SnapshotVersion &)commitVersion
  221. mutationResults:
  222. (NSArray<FSTMutationResult *> *)mutationResults {
  223. FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject;
  224. [[self currentOutstandingWrites] removeObjectAtIndex:0];
  225. [self validateNextWriteSent:write.write];
  226. _workerQueue->EnqueueBlocking([&] { _datastore->AckWrite(commitVersion, mutationResults); });
  227. return write;
  228. }
  229. - (FSTOutstandingWrite *)receiveWriteError:(int)errorCode
  230. userInfo:(NSDictionary<NSString *, id> *)userInfo
  231. keepInQueue:(BOOL)keepInQueue {
  232. Status error{static_cast<FirestoreErrorCode>(errorCode), MakeString([userInfo description])};
  233. FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject;
  234. [self validateNextWriteSent:write.write];
  235. // If this is a permanent error, the mutation is not expected to be sent again so we remove it
  236. // from currentOutstandingWrites.
  237. if (!keepInQueue) {
  238. [[self currentOutstandingWrites] removeObjectAtIndex:0];
  239. }
  240. LOG_DEBUG("Failing a write.");
  241. _workerQueue->EnqueueBlocking([&] { _datastore->FailWrite(error); });
  242. return write;
  243. }
  244. - (NSArray<FSTQueryEvent *> *)capturedEventsSinceLastCall {
  245. NSArray<FSTQueryEvent *> *result = [self.events copy];
  246. [self.events removeAllObjects];
  247. return result;
  248. }
  249. - (NSArray<NSString *> *)capturedAcknowledgedWritesSinceLastCall {
  250. NSArray<NSString *> *result = [self.acknowledgedDocs copy];
  251. [self.acknowledgedDocs removeAllObjects];
  252. return result;
  253. }
  254. - (NSArray<NSString *> *)capturedRejectedWritesSinceLastCall {
  255. NSArray<NSString *> *result = [self.rejectedDocs copy];
  256. [self.rejectedDocs removeAllObjects];
  257. return result;
  258. }
  259. - (TargetId)addUserListenerWithQuery:(FSTQuery *)query {
  260. // TODO(dimond): Allow customizing listen options in spec tests
  261. // TODO(dimond): Change spec tests to verify isFromCache on snapshots
  262. FSTListenOptions *options = [[FSTListenOptions alloc] initWithIncludeQueryMetadataChanges:YES
  263. includeDocumentMetadataChanges:YES
  264. waitForSyncWhenOnline:NO];
  265. FSTQueryListener *listener = [[FSTQueryListener alloc]
  266. initWithQuery:query
  267. options:options
  268. viewSnapshotHandler:^(FSTViewSnapshot *_Nullable snapshot, NSError *_Nullable error) {
  269. FSTQueryEvent *event = [[FSTQueryEvent alloc] init];
  270. event.query = query;
  271. event.viewSnapshot = snapshot;
  272. event.error = error;
  273. [self.events addObject:event];
  274. }];
  275. self.queryListeners[query] = listener;
  276. TargetId targetID;
  277. _workerQueue->EnqueueBlocking([&] { targetID = [self.eventManager addListener:listener]; });
  278. return targetID;
  279. }
  280. - (void)removeUserListenerWithQuery:(FSTQuery *)query {
  281. FSTQueryListener *listener = self.queryListeners[query];
  282. [self.queryListeners removeObjectForKey:query];
  283. _workerQueue->EnqueueBlocking([&] { [self.eventManager removeListener:listener]; });
  284. }
  285. - (void)writeUserMutation:(FSTMutation *)mutation {
  286. FSTOutstandingWrite *write = [[FSTOutstandingWrite alloc] init];
  287. write.write = mutation;
  288. [[self currentOutstandingWrites] addObject:write];
  289. LOG_DEBUG("sending a user write.");
  290. _workerQueue->EnqueueBlocking([=] {
  291. [self.syncEngine writeMutations:@[ mutation ]
  292. completion:^(NSError *_Nullable error) {
  293. LOG_DEBUG("A callback was called with error: %s", error);
  294. write.done = YES;
  295. write.error = error;
  296. NSString *mutationKey =
  297. [NSString stringWithCString:mutation.key.ToString().c_str()
  298. encoding:[NSString defaultCStringEncoding]];
  299. if (error) {
  300. [self.rejectedDocs addObject:mutationKey];
  301. } else {
  302. [self.acknowledgedDocs addObject:mutationKey];
  303. }
  304. }];
  305. });
  306. }
  307. - (void)receiveWatchChange:(FSTWatchChange *)change
  308. snapshotVersion:(const SnapshotVersion &)snapshot {
  309. _workerQueue->EnqueueBlocking([&] { _datastore->WriteWatchChange(change, snapshot); });
  310. }
  311. - (void)receiveWatchStreamError:(int)errorCode userInfo:(NSDictionary<NSString *, id> *)userInfo {
  312. Status error{static_cast<FirestoreErrorCode>(errorCode), MakeString([userInfo description])};
  313. _workerQueue->EnqueueBlocking([&] {
  314. _datastore->FailWatchStream(error);
  315. // Unlike web, stream should re-open synchronously (if we have any listeners)
  316. if (self.queryListeners.count > 0) {
  317. HARD_ASSERT(_datastore->IsWatchStreamOpen(), "Watch stream is open");
  318. }
  319. });
  320. }
  321. - (std::map<DocumentKey, TargetId>)currentLimboDocuments {
  322. return [self.syncEngine currentLimboDocuments];
  323. }
  324. - (NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *)activeTargets {
  325. return _datastore->ActiveTargets();
  326. }
  327. #pragma mark - Helper Methods
  328. - (NSMutableArray<FSTOutstandingWrite *> *)currentOutstandingWrites {
  329. NSMutableArray<FSTOutstandingWrite *> *writes = _outstandingWrites[_currentUser];
  330. if (!writes) {
  331. writes = [NSMutableArray array];
  332. _outstandingWrites[_currentUser] = writes;
  333. }
  334. return writes;
  335. }
  336. @end
  337. NS_ASSUME_NONNULL_END