FSTSyncEngineTestDriver.mm 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h"
  17. #import <FirebaseFirestore/FIRFirestoreErrors.h>
  18. #include <map>
  19. #include <memory>
  20. #include <string>
  21. #include <unordered_map>
  22. #include <utility>
  23. #include <vector>
  24. #import "Firestore/Example/Tests/SpecTests/FSTMockDatastore.h"
  25. #include "Firestore/core/include/firebase/firestore/firestore_errors.h"
  26. #include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h"
  27. #include "Firestore/core/src/firebase/firestore/auth/user.h"
  28. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  29. #include "Firestore/core/src/firebase/firestore/core/event_manager.h"
  30. #include "Firestore/core/src/firebase/firestore/core/listen_options.h"
  31. #include "Firestore/core/src/firebase/firestore/core/query_listener.h"
  32. #include "Firestore/core/src/firebase/firestore/core/sync_engine.h"
  33. #include "Firestore/core/src/firebase/firestore/local/index_free_query_engine.h"
  34. #include "Firestore/core/src/firebase/firestore/local/local_store.h"
  35. #include "Firestore/core/src/firebase/firestore/local/persistence.h"
  36. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  37. #include "Firestore/core/src/firebase/firestore/model/document_key.h"
  38. #include "Firestore/core/src/firebase/firestore/remote/remote_store.h"
  39. #include "Firestore/core/src/firebase/firestore/util/async_queue.h"
  40. #include "Firestore/core/src/firebase/firestore/util/delayed_constructor.h"
  41. #include "Firestore/core/src/firebase/firestore/util/error_apple.h"
  42. #include "Firestore/core/src/firebase/firestore/util/executor.h"
  43. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  44. #include "Firestore/core/src/firebase/firestore/util/log.h"
  45. #include "Firestore/core/src/firebase/firestore/util/status.h"
  46. #include "Firestore/core/src/firebase/firestore/util/statusor.h"
  47. #include "Firestore/core/src/firebase/firestore/util/string_format.h"
  48. #include "Firestore/core/src/firebase/firestore/util/to_string.h"
  49. #include "Firestore/core/test/firebase/firestore/testutil/async_testing.h"
  50. #include "absl/memory/memory.h"
  51. namespace testutil = firebase::firestore::testutil;
  52. using firebase::firestore::Error;
  53. using firebase::firestore::auth::EmptyCredentialsProvider;
  54. using firebase::firestore::auth::HashUser;
  55. using firebase::firestore::auth::User;
  56. using firebase::firestore::core::DatabaseInfo;
  57. using firebase::firestore::core::EventListener;
  58. using firebase::firestore::core::EventManager;
  59. using firebase::firestore::core::ListenOptions;
  60. using firebase::firestore::core::Query;
  61. using firebase::firestore::core::QueryListener;
  62. using firebase::firestore::core::SyncEngine;
  63. using firebase::firestore::core::ViewSnapshot;
  64. using firebase::firestore::local::IndexFreeQueryEngine;
  65. using firebase::firestore::local::LocalStore;
  66. using firebase::firestore::local::Persistence;
  67. using firebase::firestore::local::TargetData;
  68. using firebase::firestore::model::DatabaseId;
  69. using firebase::firestore::model::DocumentKey;
  70. using firebase::firestore::model::DocumentKeySet;
  71. using firebase::firestore::model::Mutation;
  72. using firebase::firestore::model::MutationResult;
  73. using firebase::firestore::model::OnlineState;
  74. using firebase::firestore::model::SnapshotVersion;
  75. using firebase::firestore::model::TargetId;
  76. using firebase::firestore::remote::MockDatastore;
  77. using firebase::firestore::remote::RemoteStore;
  78. using firebase::firestore::remote::WatchChange;
  79. using firebase::firestore::util::AsyncQueue;
  80. using firebase::firestore::util::DelayedConstructor;
  81. using firebase::firestore::util::Empty;
  82. using firebase::firestore::util::Executor;
  83. using firebase::firestore::util::MakeNSError;
  84. using firebase::firestore::util::MakeNSString;
  85. using firebase::firestore::util::MakeString;
  86. using firebase::firestore::util::Status;
  87. using firebase::firestore::util::StatusOr;
  88. using firebase::firestore::util::StringFormat;
  89. using firebase::firestore::util::TimerId;
  90. using firebase::firestore::util::ToString;
  91. NS_ASSUME_NONNULL_BEGIN
  92. @implementation FSTQueryEvent {
  93. absl::optional<ViewSnapshot> _maybeViewSnapshot;
  94. }
  95. - (const absl::optional<ViewSnapshot> &)viewSnapshot {
  96. return _maybeViewSnapshot;
  97. }
  98. - (void)setViewSnapshot:(absl::optional<ViewSnapshot>)snapshot {
  99. _maybeViewSnapshot = std::move(snapshot);
  100. }
  101. - (NSString *)description {
  102. // The Query is also included in the view, so we skip it.
  103. std::string str = StringFormat("<FSTQueryEvent: viewSnapshot=%s, error=%s>",
  104. ToString(_maybeViewSnapshot), self.error);
  105. return MakeNSString(str);
  106. }
  107. @end
  108. @implementation FSTOutstandingWrite {
  109. Mutation _write;
  110. }
  111. - (const model::Mutation &)write {
  112. return _write;
  113. }
  114. - (void)setWrite:(model::Mutation)write {
  115. _write = std::move(write);
  116. }
  117. @end
  118. @interface FSTSyncEngineTestDriver ()
  119. #pragma mark - Parts of the Firestore system that the spec tests need to control.
  120. #pragma mark - Data structures for holding events sent by the watch stream.
  121. /** A block for the FSTEventAggregator to use to report events to the test. */
  122. @property(nonatomic, strong, readonly) void (^eventHandler)(FSTQueryEvent *);
  123. /** The events received by our eventHandler and not yet retrieved via capturedEventsSinceLastCall */
  124. @property(nonatomic, strong, readonly) NSMutableArray<FSTQueryEvent *> *events;
  125. #pragma mark - Data structures for holding events sent by the write stream.
  126. /** The names of the documents that the client acknowledged during the current spec test step */
  127. @property(nonatomic, strong, readonly) NSMutableArray<NSString *> *acknowledgedDocs;
  128. /** The names of the documents that the client rejected during the current spec test step */
  129. @property(nonatomic, strong, readonly) NSMutableArray<NSString *> *rejectedDocs;
  130. @end
  131. @implementation FSTSyncEngineTestDriver {
  132. std::unique_ptr<Persistence> _persistence;
  133. std::unique_ptr<LocalStore> _localStore;
  134. std::unique_ptr<SyncEngine> _syncEngine;
  135. std::shared_ptr<AsyncQueue> _workerQueue;
  136. std::unique_ptr<RemoteStore> _remoteStore;
  137. DelayedConstructor<EventManager> _eventManager;
  138. // Set of active targets, keyed by target Id, mapped to corresponding resume token,
  139. // and list of `TargetData`.
  140. ActiveTargetMap _expectedActiveTargets;
  141. // ivar is declared as mutable.
  142. std::unordered_map<User, NSMutableArray<FSTOutstandingWrite *> *, HashUser> _outstandingWrites;
  143. DocumentKeySet _expectedActiveLimboDocuments;
  144. /** A dictionary for tracking the listens on queries. */
  145. std::unordered_map<Query, std::shared_ptr<QueryListener>> _queryListeners;
  146. DatabaseInfo _databaseInfo;
  147. User _currentUser;
  148. std::vector<std::shared_ptr<EventListener<Empty>>> _snapshotsInSyncListeners;
  149. std::shared_ptr<MockDatastore> _datastore;
  150. IndexFreeQueryEngine _queryEngine;
  151. int _snapshotsInSyncEvents;
  152. }
  153. - (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence {
  154. return [self initWithPersistence:std::move(persistence)
  155. initialUser:User::Unauthenticated()
  156. outstandingWrites:{}];
  157. }
  158. - (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence
  159. initialUser:(const User &)initialUser
  160. outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites {
  161. if (self = [super init]) {
  162. // Do a deep copy.
  163. for (const auto &pair : outstandingWrites) {
  164. _outstandingWrites[pair.first] = [pair.second mutableCopy];
  165. }
  166. _events = [NSMutableArray array];
  167. _databaseInfo = {DatabaseId{"project", "database"}, "persistence", "host", false};
  168. // Set up the sync engine and various stores.
  169. _workerQueue = testutil::AsyncQueueForTesting();
  170. _persistence = std::move(persistence);
  171. _localStore = absl::make_unique<LocalStore>(_persistence.get(), &_queryEngine, initialUser);
  172. _datastore = std::make_shared<MockDatastore>(_databaseInfo, _workerQueue,
  173. std::make_shared<EmptyCredentialsProvider>());
  174. _remoteStore = absl::make_unique<RemoteStore>(
  175. _localStore.get(), _datastore, _workerQueue,
  176. [self](OnlineState onlineState) { _syncEngine->HandleOnlineStateChange(onlineState); });
  177. ;
  178. _syncEngine = absl::make_unique<SyncEngine>(_localStore.get(), _remoteStore.get(), initialUser);
  179. _remoteStore->set_sync_engine(_syncEngine.get());
  180. _eventManager.Init(_syncEngine.get());
  181. // Set up internal event tracking for the spec tests.
  182. NSMutableArray<FSTQueryEvent *> *events = [NSMutableArray array];
  183. _eventHandler = ^(FSTQueryEvent *e) {
  184. [events addObject:e];
  185. };
  186. _events = events;
  187. _currentUser = initialUser;
  188. _acknowledgedDocs = [NSMutableArray array];
  189. _rejectedDocs = [NSMutableArray array];
  190. }
  191. return self;
  192. }
  193. - (const FSTOutstandingWriteQueues &)outstandingWrites {
  194. return _outstandingWrites;
  195. }
  196. - (const DocumentKeySet &)expectedActiveLimboDocuments {
  197. return _expectedActiveLimboDocuments;
  198. }
  199. - (void)setExpectedActiveLimboDocuments:(DocumentKeySet)docs {
  200. _expectedActiveLimboDocuments = std::move(docs);
  201. }
  202. - (void)drainQueue {
  203. _workerQueue->EnqueueBlocking([] {});
  204. }
  205. - (const User &)currentUser {
  206. return _currentUser;
  207. }
  208. - (void)incrementSnapshotsInSyncEvents {
  209. _snapshotsInSyncEvents += 1;
  210. }
  211. - (void)resetSnapshotsInSyncEvents {
  212. _snapshotsInSyncEvents = 0;
  213. }
  214. - (void)addSnapshotsInSyncListener {
  215. std::shared_ptr<EventListener<Empty>> eventListener = EventListener<Empty>::Create(
  216. [self](const StatusOr<Empty> &) { [self incrementSnapshotsInSyncEvents]; });
  217. _snapshotsInSyncListeners.push_back(eventListener);
  218. _eventManager->AddSnapshotsInSyncListener(eventListener);
  219. }
  220. - (void)removeSnapshotsInSyncListener {
  221. if (_snapshotsInSyncListeners.empty()) {
  222. HARD_FAIL("There must be a listener to unlisten to");
  223. } else {
  224. _eventManager->RemoveSnapshotsInSyncListener(_snapshotsInSyncListeners.back());
  225. _snapshotsInSyncListeners.pop_back();
  226. }
  227. }
  228. - (int)snapshotsInSyncEvents {
  229. return _snapshotsInSyncEvents;
  230. }
  231. - (void)start {
  232. _workerQueue->EnqueueBlocking([&] {
  233. _localStore->Start();
  234. _remoteStore->Start();
  235. });
  236. }
  237. - (void)validateUsage {
  238. // We could relax this if we found a reason to.
  239. HARD_ASSERT(self.events.count == 0, "You must clear all pending events by calling"
  240. " capturedEventsSinceLastCall before calling shutdown.");
  241. }
  242. - (void)shutdown {
  243. _workerQueue->EnqueueBlocking([&] {
  244. _remoteStore->Shutdown();
  245. _persistence->Shutdown();
  246. });
  247. }
  248. - (void)validateNextWriteSent:(const Mutation &)expectedWrite {
  249. std::vector<Mutation> request = _datastore->NextSentWrite();
  250. // Make sure the write went through the pipe like we expected it to.
  251. HARD_ASSERT(request.size() == 1, "Only single mutation requests are supported at the moment");
  252. const Mutation &actualWrite = request[0];
  253. HARD_ASSERT(actualWrite == expectedWrite,
  254. "Mock datastore received write %s but first outstanding mutation was %s",
  255. actualWrite.ToString(), expectedWrite.ToString());
  256. LOG_DEBUG("A write was sent: %s", actualWrite.ToString());
  257. }
  258. - (int)sentWritesCount {
  259. return _datastore->WritesSent();
  260. }
  261. - (int)writeStreamRequestCount {
  262. return _datastore->write_stream_request_count();
  263. }
  264. - (int)watchStreamRequestCount {
  265. return _datastore->watch_stream_request_count();
  266. }
  267. - (void)disableNetwork {
  268. _workerQueue->EnqueueBlocking([&] {
  269. // Make sure to execute all writes that are currently queued. This allows us
  270. // to assert on the total number of requests sent before shutdown.
  271. _remoteStore->FillWritePipeline();
  272. _remoteStore->DisableNetwork();
  273. });
  274. }
  275. - (void)enableNetwork {
  276. _workerQueue->EnqueueBlocking([&] { _remoteStore->EnableNetwork(); });
  277. }
  278. - (void)runTimer:(TimerId)timerID {
  279. _workerQueue->RunScheduledOperationsUntil(timerID);
  280. }
  281. - (void)changeUser:(const User &)user {
  282. _currentUser = user;
  283. _workerQueue->EnqueueBlocking([&] { _syncEngine->HandleCredentialChange(user); });
  284. }
  285. - (FSTOutstandingWrite *)receiveWriteAckWithVersion:(const SnapshotVersion &)commitVersion
  286. mutationResults:(std::vector<MutationResult>)mutationResults {
  287. FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject;
  288. [[self currentOutstandingWrites] removeObjectAtIndex:0];
  289. [self validateNextWriteSent:write.write];
  290. _workerQueue->EnqueueBlocking(
  291. [&] { _datastore->AckWrite(commitVersion, std::move(mutationResults)); });
  292. return write;
  293. }
  294. - (FSTOutstandingWrite *)receiveWriteError:(int)errorCode
  295. userInfo:(NSDictionary<NSString *, id> *)userInfo
  296. keepInQueue:(BOOL)keepInQueue {
  297. Status error{static_cast<Error>(errorCode), MakeString([userInfo description])};
  298. FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject;
  299. [self validateNextWriteSent:write.write];
  300. // If this is a permanent error, the mutation is not expected to be sent again so we remove it
  301. // from currentOutstandingWrites.
  302. if (!keepInQueue) {
  303. [[self currentOutstandingWrites] removeObjectAtIndex:0];
  304. }
  305. LOG_DEBUG("Failing a write.");
  306. _workerQueue->EnqueueBlocking([&] { _datastore->FailWrite(error); });
  307. return write;
  308. }
  309. - (NSArray<FSTQueryEvent *> *)capturedEventsSinceLastCall {
  310. NSArray<FSTQueryEvent *> *result = [self.events copy];
  311. [self.events removeAllObjects];
  312. return result;
  313. }
  314. - (NSArray<NSString *> *)capturedAcknowledgedWritesSinceLastCall {
  315. NSArray<NSString *> *result = [self.acknowledgedDocs copy];
  316. [self.acknowledgedDocs removeAllObjects];
  317. return result;
  318. }
  319. - (NSArray<NSString *> *)capturedRejectedWritesSinceLastCall {
  320. NSArray<NSString *> *result = [self.rejectedDocs copy];
  321. [self.rejectedDocs removeAllObjects];
  322. return result;
  323. }
  324. - (TargetId)addUserListenerWithQuery:(Query)query {
  325. // TODO(dimond): Allow customizing listen options in spec tests
  326. // TODO(dimond): Change spec tests to verify isFromCache on snapshots
  327. ListenOptions options = ListenOptions::FromIncludeMetadataChanges(true);
  328. auto listener = QueryListener::Create(
  329. query, options, [self, query](const StatusOr<ViewSnapshot> &maybe_snapshot) {
  330. FSTQueryEvent *event = [[FSTQueryEvent alloc] init];
  331. event.query = query;
  332. if (maybe_snapshot.ok()) {
  333. [event setViewSnapshot:maybe_snapshot.ValueOrDie()];
  334. } else {
  335. event.error = MakeNSError(maybe_snapshot.status());
  336. }
  337. [self.events addObject:event];
  338. });
  339. _queryListeners[query] = listener;
  340. TargetId targetID;
  341. _workerQueue->EnqueueBlocking([&] { targetID = _eventManager->AddQueryListener(listener); });
  342. return targetID;
  343. }
  344. - (void)removeUserListenerWithQuery:(const Query &)query {
  345. auto found_iter = _queryListeners.find(query);
  346. if (found_iter != _queryListeners.end()) {
  347. std::shared_ptr<QueryListener> listener = found_iter->second;
  348. _queryListeners.erase(found_iter);
  349. _workerQueue->EnqueueBlocking([&] { _eventManager->RemoveQueryListener(listener); });
  350. }
  351. }
  352. - (void)writeUserMutation:(Mutation)mutation {
  353. FSTOutstandingWrite *write = [[FSTOutstandingWrite alloc] init];
  354. write.write = mutation;
  355. [[self currentOutstandingWrites] addObject:write];
  356. LOG_DEBUG("sending a user write.");
  357. _workerQueue->EnqueueBlocking([=] {
  358. _syncEngine->WriteMutations({mutation}, [self, write, mutation](Status error) {
  359. LOG_DEBUG("A callback was called with error: %s", error.error_message());
  360. write.done = YES;
  361. write.error = error.ToNSError();
  362. NSString *mutationKey = MakeNSString(mutation.key().ToString());
  363. if (!error.ok()) {
  364. [self.rejectedDocs addObject:mutationKey];
  365. } else {
  366. [self.acknowledgedDocs addObject:mutationKey];
  367. }
  368. });
  369. });
  370. }
  371. - (void)receiveWatchChange:(const WatchChange &)change
  372. snapshotVersion:(const SnapshotVersion &)snapshot {
  373. _workerQueue->EnqueueBlocking([&] { _datastore->WriteWatchChange(change, snapshot); });
  374. }
  375. - (void)receiveWatchStreamError:(int)errorCode userInfo:(NSDictionary<NSString *, id> *)userInfo {
  376. Status error{static_cast<Error>(errorCode), MakeString([userInfo description])};
  377. _workerQueue->EnqueueBlocking([&] {
  378. _datastore->FailWatchStream(error);
  379. // Unlike web, stream should re-open synchronously (if we have any listeners)
  380. if (!_queryListeners.empty()) {
  381. HARD_ASSERT(_datastore->IsWatchStreamOpen(), "Watch stream is open");
  382. }
  383. });
  384. }
  385. - (std::map<DocumentKey, TargetId>)currentLimboDocuments {
  386. return _syncEngine->GetCurrentLimboDocuments();
  387. }
  388. - (const std::unordered_map<TargetId, TargetData> &)activeTargets {
  389. return _datastore->ActiveTargets();
  390. }
  391. - (const ActiveTargetMap &)expectedActiveTargets {
  392. return _expectedActiveTargets;
  393. }
  394. - (void)setExpectedActiveTargets:(ActiveTargetMap)targets {
  395. _expectedActiveTargets = std::move(targets);
  396. }
  397. #pragma mark - Helper Methods
  398. - (NSMutableArray<FSTOutstandingWrite *> *)currentOutstandingWrites {
  399. NSMutableArray<FSTOutstandingWrite *> *writes = _outstandingWrites[_currentUser];
  400. if (!writes) {
  401. writes = [NSMutableArray array];
  402. _outstandingWrites[_currentUser] = writes;
  403. }
  404. return writes;
  405. }
  406. @end
  407. NS_ASSUME_NONNULL_END