FSTSyncEngineTestDriver.mm 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h"
  17. #import <FirebaseFirestore/FIRFirestoreErrors.h>
  18. #include <cstddef>
  19. #include <map>
  20. #include <memory>
  21. #include <string>
  22. #include <unordered_map>
  23. #include <utility>
  24. #include <vector>
  25. #import "Firestore/Example/Tests/SpecTests/FSTMockDatastore.h"
  26. #include "Firestore/core/include/firebase/firestore/firestore_errors.h"
  27. #include "Firestore/core/src/api/load_bundle_task.h"
  28. #include "Firestore/core/src/bundle/bundle_reader.h"
  29. #include "Firestore/core/src/core/database_info.h"
  30. #include "Firestore/core/src/core/event_manager.h"
  31. #include "Firestore/core/src/core/listen_options.h"
  32. #include "Firestore/core/src/core/pipeline_util.h" // Added for ToRealtimePipeline
  33. #include "Firestore/core/src/core/query_listener.h"
  34. #include "Firestore/core/src/core/sync_engine.h"
  35. #include "Firestore/core/src/credentials/empty_credentials_provider.h"
  36. #include "Firestore/core/src/credentials/user.h"
  37. #include "Firestore/core/src/local/local_store.h"
  38. #include "Firestore/core/src/local/lru_garbage_collector.h"
  39. #include "Firestore/core/src/local/memory_lru_reference_delegate.h"
  40. #include "Firestore/core/src/local/persistence.h"
  41. #include "Firestore/core/src/local/query_engine.h"
  42. #include "Firestore/core/src/model/database_id.h"
  43. #include "Firestore/core/src/model/document_key.h"
  44. #include "Firestore/core/src/remote/firebase_metadata_provider.h"
  45. #include "Firestore/core/src/remote/firebase_metadata_provider_noop.h"
  46. #include "Firestore/core/src/remote/remote_store.h"
  47. #include "Firestore/core/src/remote/serializer.h" // Added for RealtimePipeline constructor
  48. #include "Firestore/core/src/util/async_queue.h"
  49. #include "Firestore/core/src/util/delayed_constructor.h"
  50. #include "Firestore/core/src/util/error_apple.h"
  51. #include "Firestore/core/src/util/executor.h"
  52. #include "Firestore/core/src/util/hard_assert.h"
  53. #include "Firestore/core/src/util/log.h"
  54. #include "Firestore/core/src/util/status.h"
  55. #include "Firestore/core/src/util/statusor.h"
  56. #include "Firestore/core/src/util/string_format.h"
  57. #include "Firestore/core/src/util/to_string.h"
  58. #include "Firestore/core/test/unit/remote/create_noop_connectivity_monitor.h"
  59. #include "Firestore/core/test/unit/testutil/async_testing.h"
  60. #include "absl/memory/memory.h"
  61. using firebase::firestore::Error;
  62. using firebase::firestore::api::LoadBundleTask;
  63. using firebase::firestore::bundle::BundleReader;
  64. using firebase::firestore::core::DatabaseInfo;
  65. using firebase::firestore::core::EventListener;
  66. using firebase::firestore::core::EventManager;
  67. using firebase::firestore::core::ListenOptions;
  68. using firebase::firestore::core::Query;
  69. using firebase::firestore::core::QueryListener;
  70. using firebase::firestore::core::SyncEngine;
  71. using firebase::firestore::core::ViewSnapshot;
  72. using firebase::firestore::credentials::EmptyAppCheckCredentialsProvider;
  73. using firebase::firestore::credentials::EmptyAuthCredentialsProvider;
  74. using firebase::firestore::credentials::HashUser;
  75. using firebase::firestore::credentials::User;
  76. using firebase::firestore::local::LocalStore;
  77. using firebase::firestore::local::LruDelegate;
  78. using firebase::firestore::local::LruParams;
  79. using firebase::firestore::local::Persistence;
  80. using firebase::firestore::local::QueryEngine;
  81. using firebase::firestore::local::TargetData;
  82. using firebase::firestore::model::DatabaseId;
  83. using firebase::firestore::model::DocumentKey;
  84. using firebase::firestore::model::DocumentKeySet;
  85. using firebase::firestore::model::Mutation;
  86. using firebase::firestore::model::MutationResult;
  87. using firebase::firestore::model::OnlineState;
  88. using firebase::firestore::model::SnapshotVersion;
  89. using firebase::firestore::model::TargetId;
  90. using firebase::firestore::remote::ConnectivityMonitor;
  91. using firebase::firestore::remote::CreateFirebaseMetadataProviderNoOp;
  92. using firebase::firestore::remote::CreateNoOpConnectivityMonitor;
  93. using firebase::firestore::remote::FirebaseMetadataProvider;
  94. using firebase::firestore::remote::MockDatastore;
  95. using firebase::firestore::remote::RemoteStore;
  96. using firebase::firestore::remote::WatchChange;
  97. using firebase::firestore::testutil::AsyncQueueForTesting;
  98. using firebase::firestore::util::AsyncQueue;
  99. using firebase::firestore::util::DelayedConstructor;
  100. using firebase::firestore::util::Empty;
  101. using firebase::firestore::util::Executor;
  102. using firebase::firestore::util::MakeNSError;
  103. using firebase::firestore::util::MakeNSString;
  104. using firebase::firestore::util::MakeString;
  105. using firebase::firestore::util::Status;
  106. using firebase::firestore::util::StatusOr;
  107. using firebase::firestore::util::StringFormat;
  108. using firebase::firestore::util::TimerId;
  109. using firebase::firestore::util::ToString;
  110. NS_ASSUME_NONNULL_BEGIN
  111. @implementation FSTQueryEvent {
  112. absl::optional<ViewSnapshot> _maybeViewSnapshot;
  113. }
  114. - (const absl::optional<ViewSnapshot> &)viewSnapshot {
  115. return _maybeViewSnapshot;
  116. }
  117. - (void)setViewSnapshot:(absl::optional<ViewSnapshot>)snapshot {
  118. _maybeViewSnapshot = std::move(snapshot);
  119. }
  120. - (NSString *)description {
  121. // The Query is also included in the view, so we skip it.
  122. std::string str = StringFormat("<FSTQueryEvent: viewSnapshot=%s, error=%s>",
  123. ToString(_maybeViewSnapshot), self.error);
  124. return MakeNSString(str);
  125. }
  126. @end
  127. @implementation FSTOutstandingWrite {
  128. Mutation _write;
  129. }
  130. - (const model::Mutation &)write {
  131. return _write;
  132. }
  133. - (void)setWrite:(model::Mutation)write {
  134. _write = std::move(write);
  135. }
  136. @end
  137. @interface FSTSyncEngineTestDriver ()
  138. #pragma mark - Parts of the Firestore system that the spec tests need to control.
  139. #pragma mark - Data structures for holding events sent by the watch stream.
  140. /** A block for the FSTEventAggregator to use to report events to the test. */
  141. @property(nonatomic, strong, readonly) void (^eventHandler)(FSTQueryEvent *);
  142. /** The events received by our eventHandler and not yet retrieved via capturedEventsSinceLastCall */
  143. @property(nonatomic, strong, readonly) NSMutableArray<FSTQueryEvent *> *events;
  144. #pragma mark - Data structures for holding events sent by the write stream.
  145. /** The names of the documents that the client acknowledged during the current spec test step */
  146. @property(nonatomic, strong, readonly) NSMutableArray<NSString *> *acknowledgedDocs;
  147. /** The names of the documents that the client rejected during the current spec test step */
  148. @property(nonatomic, strong, readonly) NSMutableArray<NSString *> *rejectedDocs;
  149. @end
  150. @implementation FSTSyncEngineTestDriver {
  151. size_t _maxConcurrentLimboResolutions;
  152. std::unique_ptr<Persistence> _persistence;
  153. LruDelegate *_lru_delegate;
  154. std::unique_ptr<LocalStore> _localStore;
  155. std::unique_ptr<SyncEngine> _syncEngine;
  156. std::shared_ptr<AsyncQueue> _workerQueue;
  157. std::unique_ptr<RemoteStore> _remoteStore;
  158. std::unique_ptr<ConnectivityMonitor> _connectivityMonitor;
  159. std::unique_ptr<FirebaseMetadataProvider> _firebaseMetadataProvider;
  160. DelayedConstructor<EventManager> _eventManager;
  161. // Set of active targets, keyed by target Id, mapped to corresponding resume token,
  162. // and list of `TargetData`.
  163. ActiveTargetMap _expectedActiveTargets;
  164. // ivar is declared as mutable.
  165. std::unordered_map<User, NSMutableArray<FSTOutstandingWrite *> *, HashUser> _outstandingWrites;
  166. DocumentKeySet _expectedActiveLimboDocuments;
  167. DocumentKeySet _expectedEnqueuedLimboDocuments;
  168. /** A dictionary for tracking the listens on queries. */
  169. std::unordered_map<core::QueryOrPipeline, std::shared_ptr<QueryListener>> _queryListeners;
  170. DatabaseInfo _databaseInfo;
  171. User _currentUser;
  172. std::vector<std::shared_ptr<EventListener<Empty>>> _snapshotsInSyncListeners;
  173. std::shared_ptr<MockDatastore> _datastore;
  174. QueryEngine _queryEngine;
  175. int _snapshotsInSyncEvents;
  176. int _waitForPendingWritesEvents;
  177. }
  178. - (instancetype)initWithPersistence:(std::unique_ptr<Persistence>)persistence
  179. eagerGC:(BOOL)eagerGC
  180. convertToPipeline:(BOOL)convertToPipeline
  181. initialUser:(const User &)initialUser
  182. outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites
  183. maxConcurrentLimboResolutions:(size_t)maxConcurrentLimboResolutions {
  184. if (self = [super init]) {
  185. _convertToPipeline = convertToPipeline; // Store the flag
  186. _maxConcurrentLimboResolutions = maxConcurrentLimboResolutions;
  187. // Do a deep copy.
  188. for (const auto &pair : outstandingWrites) {
  189. _outstandingWrites[pair.first] = [pair.second mutableCopy];
  190. }
  191. _events = [NSMutableArray array];
  192. _databaseInfo = {DatabaseId{"test-project", "(default)"}, "persistence", "host", false};
  193. // Set up the sync engine and various stores.
  194. _workerQueue = AsyncQueueForTesting();
  195. _persistence = std::move(persistence);
  196. _localStore = absl::make_unique<LocalStore>(_persistence.get(), &_queryEngine, initialUser);
  197. if (!eagerGC) {
  198. _lru_delegate = static_cast<local::LruDelegate *>(_persistence->reference_delegate());
  199. }
  200. _connectivityMonitor = CreateNoOpConnectivityMonitor();
  201. _firebaseMetadataProvider = CreateFirebaseMetadataProviderNoOp();
  202. _datastore = std::make_shared<MockDatastore>(
  203. _databaseInfo, _workerQueue, std::make_shared<EmptyAuthCredentialsProvider>(),
  204. std::make_shared<EmptyAppCheckCredentialsProvider>(), _connectivityMonitor.get(),
  205. _firebaseMetadataProvider.get());
  206. _remoteStore = absl::make_unique<RemoteStore>(
  207. _localStore.get(), _datastore, _workerQueue, _connectivityMonitor.get(),
  208. [self](OnlineState onlineState) { _syncEngine->HandleOnlineStateChange(onlineState); });
  209. ;
  210. _syncEngine = absl::make_unique<SyncEngine>(_localStore.get(), _remoteStore.get(), initialUser,
  211. _maxConcurrentLimboResolutions);
  212. _remoteStore->set_sync_engine(_syncEngine.get());
  213. _eventManager.Init(_syncEngine.get());
  214. // Set up internal event tracking for the spec tests.
  215. NSMutableArray<FSTQueryEvent *> *events = [NSMutableArray array];
  216. _eventHandler = ^(FSTQueryEvent *e) {
  217. [events addObject:e];
  218. };
  219. _events = events;
  220. _currentUser = initialUser;
  221. _acknowledgedDocs = [NSMutableArray array];
  222. _rejectedDocs = [NSMutableArray array];
  223. }
  224. return self;
  225. }
  226. - (const FSTOutstandingWriteQueues &)outstandingWrites {
  227. return _outstandingWrites;
  228. }
  229. - (const DocumentKeySet &)expectedActiveLimboDocuments {
  230. return _expectedActiveLimboDocuments;
  231. }
  232. - (void)setExpectedActiveLimboDocuments:(DocumentKeySet)docs {
  233. _expectedActiveLimboDocuments = std::move(docs);
  234. }
  235. - (const DocumentKeySet &)expectedEnqueuedLimboDocuments {
  236. return _expectedEnqueuedLimboDocuments;
  237. }
  238. - (void)setExpectedEnqueuedLimboDocuments:(DocumentKeySet)docs {
  239. _expectedEnqueuedLimboDocuments = std::move(docs);
  240. }
  241. - (void)drainQueue {
  242. _workerQueue->EnqueueBlocking([] {});
  243. }
  244. - (const User &)currentUser {
  245. return _currentUser;
  246. }
  247. - (const DatabaseInfo &)databaseInfo {
  248. return _databaseInfo;
  249. }
  250. - (void)incrementSnapshotsInSyncEvents {
  251. _snapshotsInSyncEvents += 1;
  252. }
  253. - (void)resetSnapshotsInSyncEvents {
  254. _snapshotsInSyncEvents = 0;
  255. }
  256. - (void)incrementWaitForPendingWritesEvents {
  257. _waitForPendingWritesEvents += 1;
  258. }
  259. - (void)resetWaitForPendingWritesEvents {
  260. _waitForPendingWritesEvents = 0;
  261. }
  262. - (void)waitForPendingWrites {
  263. _syncEngine->RegisterPendingWritesCallback(
  264. [self](const Status &) { [self incrementWaitForPendingWritesEvents]; });
  265. }
  266. - (void)addSnapshotsInSyncListener {
  267. std::shared_ptr<EventListener<Empty>> eventListener = EventListener<Empty>::Create(
  268. [self](const StatusOr<Empty> &) { [self incrementSnapshotsInSyncEvents]; });
  269. _snapshotsInSyncListeners.push_back(eventListener);
  270. _eventManager->AddSnapshotsInSyncListener(eventListener);
  271. }
  272. - (void)removeSnapshotsInSyncListener {
  273. if (_snapshotsInSyncListeners.empty()) {
  274. HARD_FAIL("There must be a listener to unlisten to");
  275. } else {
  276. _eventManager->RemoveSnapshotsInSyncListener(_snapshotsInSyncListeners.back());
  277. _snapshotsInSyncListeners.pop_back();
  278. }
  279. }
  280. - (int)waitForPendingWritesEvents {
  281. return _waitForPendingWritesEvents;
  282. }
  283. - (int)snapshotsInSyncEvents {
  284. return _snapshotsInSyncEvents;
  285. }
  286. - (void)start {
  287. _workerQueue->EnqueueBlocking([&] {
  288. _localStore->Start();
  289. _remoteStore->Start();
  290. });
  291. }
  292. - (void)validateUsage {
  293. // We could relax this if we found a reason to.
  294. HARD_ASSERT(self.events.count == 0, "You must clear all pending events by calling"
  295. " capturedEventsSinceLastCall before calling shutdown.");
  296. }
  297. - (void)shutdown {
  298. _workerQueue->EnqueueBlocking([&] {
  299. _remoteStore->Shutdown();
  300. _persistence->Shutdown();
  301. });
  302. }
  303. - (void)validateNextWriteSent:(const Mutation &)expectedWrite {
  304. std::vector<Mutation> request = _datastore->NextSentWrite();
  305. // Make sure the write went through the pipe like we expected it to.
  306. HARD_ASSERT(request.size() == 1, "Only single mutation requests are supported at the moment");
  307. const Mutation &actualWrite = request[0];
  308. HARD_ASSERT(actualWrite == expectedWrite,
  309. "Mock datastore received write %s but first outstanding mutation was %s",
  310. actualWrite.ToString(), expectedWrite.ToString());
  311. LOG_DEBUG("A write was sent: %s", actualWrite.ToString());
  312. }
  313. - (int)sentWritesCount {
  314. return _datastore->WritesSent();
  315. }
  316. - (int)writeStreamRequestCount {
  317. return _datastore->write_stream_request_count();
  318. }
  319. - (int)watchStreamRequestCount {
  320. return _datastore->watch_stream_request_count();
  321. }
  322. - (void)disableNetwork {
  323. _workerQueue->EnqueueBlocking([&] {
  324. // Make sure to execute all writes that are currently queued. This allows us
  325. // to assert on the total number of requests sent before shutdown.
  326. _remoteStore->FillWritePipeline();
  327. _remoteStore->DisableNetwork();
  328. });
  329. }
  330. - (void)enableNetwork {
  331. _workerQueue->EnqueueBlocking([&] { _remoteStore->EnableNetwork(); });
  332. }
  333. - (void)runTimer:(TimerId)timerID {
  334. _workerQueue->RunScheduledOperationsUntil(timerID);
  335. }
  336. - (void)triggerLruGC:(NSNumber *)threshold {
  337. if (_lru_delegate != nullptr) {
  338. _workerQueue->EnqueueBlocking([&] {
  339. auto *gc = _lru_delegate->garbage_collector();
  340. // Change params to collect all possible garbages
  341. gc->set_lru_params(LruParams{/*min_bytes_threshold*/ threshold.longValue,
  342. /*percentile_to_collect*/ 100,
  343. /*maximum_sequence_numbers_to_collect*/ 1000});
  344. _localStore->CollectGarbage(gc);
  345. });
  346. }
  347. }
  348. - (void)changeUser:(const User &)user {
  349. _currentUser = user;
  350. _workerQueue->EnqueueBlocking([&] { _syncEngine->HandleCredentialChange(user); });
  351. }
  352. - (FSTOutstandingWrite *)receiveWriteAckWithVersion:(const SnapshotVersion &)commitVersion
  353. mutationResults:(std::vector<MutationResult>)mutationResults {
  354. FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject;
  355. [[self currentOutstandingWrites] removeObjectAtIndex:0];
  356. [self validateNextWriteSent:write.write];
  357. _workerQueue->EnqueueBlocking(
  358. [&] { _datastore->AckWrite(commitVersion, std::move(mutationResults)); });
  359. return write;
  360. }
  361. - (FSTOutstandingWrite *)receiveWriteError:(int)errorCode
  362. userInfo:(NSDictionary<NSString *, id> *)userInfo
  363. keepInQueue:(BOOL)keepInQueue {
  364. Status error{static_cast<Error>(errorCode), MakeString([userInfo description])};
  365. FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject;
  366. [self validateNextWriteSent:write.write];
  367. // If this is a permanent error, the mutation is not expected to be sent again so we remove it
  368. // from currentOutstandingWrites.
  369. if (!keepInQueue) {
  370. [[self currentOutstandingWrites] removeObjectAtIndex:0];
  371. }
  372. LOG_DEBUG("Failing a write.");
  373. _workerQueue->EnqueueBlocking([&] { _datastore->FailWrite(error); });
  374. return write;
  375. }
  376. - (NSArray<FSTQueryEvent *> *)capturedEventsSinceLastCall {
  377. NSArray<FSTQueryEvent *> *result = [self.events copy];
  378. [self.events removeAllObjects];
  379. return result;
  380. }
  381. - (NSArray<NSString *> *)capturedAcknowledgedWritesSinceLastCall {
  382. NSArray<NSString *> *result = [self.acknowledgedDocs copy];
  383. [self.acknowledgedDocs removeAllObjects];
  384. return result;
  385. }
  386. - (NSArray<NSString *> *)capturedRejectedWritesSinceLastCall {
  387. NSArray<NSString *> *result = [self.rejectedDocs copy];
  388. [self.rejectedDocs removeAllObjects];
  389. return result;
  390. }
  391. - (TargetId)addUserListenerWithQuery:(Query)query options:(ListenOptions)options {
  392. core::QueryOrPipeline qop_for_listen;
  393. if (_convertToPipeline) {
  394. std::vector<std::shared_ptr<firebase::firestore::api::EvaluableStage>> stages =
  395. firebase::firestore::core::ToPipelineStages(query);
  396. auto serializer =
  397. absl::make_unique<firebase::firestore::remote::Serializer>(_databaseInfo.database_id());
  398. firebase::firestore::api::RealtimePipeline pipeline(std::move(stages), std::move(serializer));
  399. qop_for_listen = core::QueryOrPipeline(pipeline);
  400. } else {
  401. qop_for_listen = core::QueryOrPipeline(query);
  402. }
  403. auto listener = QueryListener::Create(
  404. qop_for_listen, options,
  405. [self, qop_for_listen](const StatusOr<ViewSnapshot> &maybe_snapshot) {
  406. FSTQueryEvent *event = [[FSTQueryEvent alloc] init];
  407. event.queryOrPipeline = qop_for_listen; // Event now holds QueryOrPipeline
  408. if (maybe_snapshot.ok()) {
  409. [event setViewSnapshot:maybe_snapshot.ValueOrDie()];
  410. } else {
  411. event.error = MakeNSError(maybe_snapshot.status());
  412. }
  413. [self.events addObject:event];
  414. });
  415. _queryListeners[qop_for_listen] = listener; // Use QueryOrPipeline as key
  416. TargetId targetID;
  417. // The actual call to EventManager still uses the listener based on the original Query.
  418. // The expectation is that SyncEngine will be made mode-aware if _convertToPipeline is true,
  419. // or that EventManager/QueryListener will be updated to handle QueryOrPipeline directly.
  420. _workerQueue->EnqueueBlocking([&] { targetID = _eventManager->AddQueryListener(listener); });
  421. return targetID;
  422. }
  423. - (void)removeUserListenerWithQuery:(const core::Query &)query {
  424. core::QueryOrPipeline qop;
  425. if (_convertToPipeline) {
  426. std::vector<std::shared_ptr<firebase::firestore::api::EvaluableStage>> stages =
  427. firebase::firestore::core::ToPipelineStages(query);
  428. auto serializer =
  429. absl::make_unique<firebase::firestore::remote::Serializer>(_databaseInfo.database_id());
  430. firebase::firestore::api::RealtimePipeline pipeline(std::move(stages), std::move(serializer));
  431. qop = core::QueryOrPipeline(pipeline);
  432. } else {
  433. qop = core::QueryOrPipeline(query);
  434. }
  435. auto found_iter = _queryListeners.find(qop);
  436. if (found_iter != _queryListeners.end()) {
  437. std::shared_ptr<QueryListener> listener = found_iter->second;
  438. _queryListeners.erase(found_iter);
  439. _workerQueue->EnqueueBlocking([&] { _eventManager->RemoveQueryListener(listener); });
  440. }
  441. }
  442. - (void)loadBundleWithReader:(std::shared_ptr<BundleReader>)reader
  443. task:(std::shared_ptr<LoadBundleTask>)task {
  444. _workerQueue->EnqueueBlocking(
  445. [=] { _syncEngine->LoadBundle(std::move(reader), std::move(task)); });
  446. }
  447. - (void)writeUserMutation:(Mutation)mutation {
  448. FSTOutstandingWrite *write = [[FSTOutstandingWrite alloc] init];
  449. write.write = mutation;
  450. [[self currentOutstandingWrites] addObject:write];
  451. LOG_DEBUG("sending a user write.");
  452. _workerQueue->EnqueueBlocking([=] {
  453. _syncEngine->WriteMutations({mutation}, [self, write, mutation](Status error) {
  454. LOG_DEBUG("A callback was called with error: %s", error.error_message());
  455. write.done = YES;
  456. write.error = error.ToNSError();
  457. NSString *mutationKey = MakeNSString(mutation.key().ToString());
  458. if (!error.ok()) {
  459. [self.rejectedDocs addObject:mutationKey];
  460. } else {
  461. [self.acknowledgedDocs addObject:mutationKey];
  462. }
  463. });
  464. });
  465. }
  466. - (void)receiveWatchChange:(const WatchChange &)change
  467. snapshotVersion:(const SnapshotVersion &)snapshot {
  468. _workerQueue->EnqueueBlocking([&] { _datastore->WriteWatchChange(change, snapshot); });
  469. }
  470. - (void)receiveWatchStreamError:(int)errorCode userInfo:(NSDictionary<NSString *, id> *)userInfo {
  471. Status error{static_cast<Error>(errorCode), MakeString([userInfo description])};
  472. _workerQueue->EnqueueBlocking([&] {
  473. _datastore->FailWatchStream(error);
  474. // Unlike web, stream should re-open synchronously (if we have any listeners)
  475. if (!_queryListeners.empty()) {
  476. HARD_ASSERT(_datastore->IsWatchStreamOpen(), "Watch stream is open");
  477. }
  478. });
  479. }
  480. - (std::map<DocumentKey, TargetId>)activeLimboDocumentResolutions {
  481. return _syncEngine->GetActiveLimboDocumentResolutions();
  482. }
  483. - (std::vector<DocumentKey>)enqueuedLimboDocumentResolutions {
  484. return _syncEngine->GetEnqueuedLimboDocumentResolutions();
  485. }
  486. - (const std::unordered_map<TargetId, TargetData> &)activeTargets {
  487. return _datastore->ActiveTargets();
  488. }
  489. - (const ActiveTargetMap &)expectedActiveTargets {
  490. return _expectedActiveTargets;
  491. }
  492. - (void)setExpectedActiveTargets:(ActiveTargetMap)targets {
  493. _expectedActiveTargets = std::move(targets);
  494. }
  495. #pragma mark - Helper Methods
  496. - (NSMutableArray<FSTOutstandingWrite *> *)currentOutstandingWrites {
  497. NSMutableArray<FSTOutstandingWrite *> *writes = _outstandingWrites[_currentUser];
  498. if (!writes) {
  499. writes = [NSMutableArray array];
  500. _outstandingWrites[_currentUser] = writes;
  501. }
  502. return writes;
  503. }
  504. @end
  505. NS_ASSUME_NONNULL_END