FSTSyncEngine.mm 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Source/Core/FSTSyncEngine.h"
  17. #include <map>
  18. #include <memory>
  19. #include <set>
  20. #include <unordered_map>
  21. #include <utility>
  22. #include <vector>
  23. #import "FIRFirestoreErrors.h"
  24. #import "Firestore/Source/Core/FSTView.h"
  25. #import "Firestore/Source/Local/FSTLocalStore.h"
  26. #import "Firestore/Source/Local/FSTQueryData.h"
  27. #import "Firestore/Source/Model/FSTDocument.h"
  28. #import "Firestore/Source/Model/FSTMutationBatch.h"
  29. #include "Firestore/core/include/firebase/firestore/firestore_errors.h"
  30. #include "Firestore/core/src/firebase/firestore/auth/user.h"
  31. #include "Firestore/core/src/firebase/firestore/core/target_id_generator.h"
  32. #include "Firestore/core/src/firebase/firestore/core/transaction.h"
  33. #include "Firestore/core/src/firebase/firestore/core/view_snapshot.h"
  34. #include "Firestore/core/src/firebase/firestore/local/local_view_changes.h"
  35. #include "Firestore/core/src/firebase/firestore/local/local_write_result.h"
  36. #include "Firestore/core/src/firebase/firestore/local/reference_set.h"
  37. #include "Firestore/core/src/firebase/firestore/model/document_key.h"
  38. #include "Firestore/core/src/firebase/firestore/model/document_map.h"
  39. #include "Firestore/core/src/firebase/firestore/model/document_set.h"
  40. #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
  41. #include "Firestore/core/src/firebase/firestore/remote/remote_event.h"
  42. #include "Firestore/core/src/firebase/firestore/util/error_apple.h"
  43. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  44. #include "Firestore/core/src/firebase/firestore/util/log.h"
  45. #include "Firestore/core/src/firebase/firestore/util/status.h"
  46. #include "absl/types/optional.h"
  47. using firebase::firestore::Error;
  48. using firebase::firestore::auth::HashUser;
  49. using firebase::firestore::auth::User;
  50. using firebase::firestore::core::Query;
  51. using firebase::firestore::core::SyncEngineCallback;
  52. using firebase::firestore::core::TargetIdGenerator;
  53. using firebase::firestore::core::Transaction;
  54. using firebase::firestore::core::ViewSnapshot;
  55. using firebase::firestore::local::LocalViewChanges;
  56. using firebase::firestore::local::LocalWriteResult;
  57. using firebase::firestore::local::ReferenceSet;
  58. using firebase::firestore::model::BatchId;
  59. using firebase::firestore::model::DocumentKey;
  60. using firebase::firestore::model::DocumentKeySet;
  61. using firebase::firestore::model::DocumentMap;
  62. using firebase::firestore::model::MaybeDocumentMap;
  63. using firebase::firestore::model::ListenSequenceNumber;
  64. using firebase::firestore::model::OnlineState;
  65. using firebase::firestore::model::SnapshotVersion;
  66. using firebase::firestore::model::TargetId;
  67. using firebase::firestore::remote::Datastore;
  68. using firebase::firestore::remote::RemoteEvent;
  69. using firebase::firestore::remote::RemoteStore;
  70. using firebase::firestore::remote::TargetChange;
  71. using firebase::firestore::util::AsyncQueue;
  72. using firebase::firestore::util::MakeNSError;
  73. using firebase::firestore::util::Status;
  74. NS_ASSUME_NONNULL_BEGIN
  75. // Limbo documents don't use persistence, and are eagerly GC'd. So, listens for them don't need
  76. // real sequence numbers.
  77. static const ListenSequenceNumber kIrrelevantSequenceNumber = -1;
  78. #pragma mark - FSTQueryView
  79. /**
  80. * FSTQueryView contains all of the info that FSTSyncEngine needs to track for a particular
  81. * query and view.
  82. */
  83. @interface FSTQueryView : NSObject
  84. - (instancetype)initWithQuery:(Query)query
  85. targetID:(TargetId)targetID
  86. resumeToken:(NSData *)resumeToken
  87. view:(FSTView *)view NS_DESIGNATED_INITIALIZER;
  88. - (instancetype)init NS_UNAVAILABLE;
  89. /** The query itself. */
  90. - (const Query &)query;
  91. /** The targetID created by the client that is used in the watch stream to identify this query. */
  92. @property(nonatomic, assign, readonly) TargetId targetID;
  93. /**
  94. * An identifier from the datastore backend that indicates the last state of the results that
  95. * was received. This can be used to indicate where to continue receiving new doc changes for the
  96. * query.
  97. */
  98. @property(nonatomic, copy, readonly) NSData *resumeToken;
  99. /**
  100. * The view is responsible for computing the final merged truth of what docs are in the query.
  101. * It gets notified of local and remote changes, and applies the query filters and limits to
  102. * determine the most correct possible results.
  103. */
  104. @property(nonatomic, strong, readonly) FSTView *view;
  105. @end
  106. @implementation FSTQueryView {
  107. Query _query;
  108. }
  109. - (instancetype)initWithQuery:(Query)query
  110. targetID:(TargetId)targetID
  111. resumeToken:(NSData *)resumeToken
  112. view:(FSTView *)view {
  113. if (self = [super init]) {
  114. _query = std::move(query);
  115. _targetID = targetID;
  116. _resumeToken = resumeToken;
  117. _view = view;
  118. }
  119. return self;
  120. }
  121. - (const Query &)query {
  122. return _query;
  123. }
  124. @end
  125. #pragma mark - LimboResolution
  126. /** Tracks a limbo resolution. */
  127. class LimboResolution {
  128. public:
  129. LimboResolution() {
  130. }
  131. explicit LimboResolution(const DocumentKey &key) : key{key} {
  132. }
  133. DocumentKey key;
  134. /**
  135. * Set to true once we've received a document. This is used in remoteKeysForTarget and
  136. * ultimately used by `WatchChangeAggregator` to decide whether it needs to manufacture a delete
  137. * event for the target once the target is CURRENT.
  138. */
  139. bool document_received = false;
  140. };
  141. #pragma mark - FSTSyncEngine
  142. @interface FSTSyncEngine ()
  143. /** The local store, used to persist mutations and cached documents. */
  144. @property(nonatomic, strong, readonly) FSTLocalStore *localStore;
  145. @end
  146. @implementation FSTSyncEngine {
  147. /** The remote store for sending writes, watches, etc. to the backend. */
  148. RemoteStore *_remoteStore;
  149. /**
  150. * A callback to be notified when queries being listened to produce new view snapshots or errors.
  151. */
  152. SyncEngineCallback *_callback;
  153. /** Used for creating the TargetId for the listens used to resolve limbo documents. */
  154. TargetIdGenerator _targetIdGenerator;
  155. /** Stores user completion blocks, indexed by user and BatchId. */
  156. std::unordered_map<User, NSMutableDictionary<NSNumber *, FSTVoidErrorBlock> *, HashUser>
  157. _mutationCompletionBlocks;
  158. /** FSTQueryViews for all active queries, indexed by query. */
  159. std::unordered_map<Query, FSTQueryView *> _queryViewsByQuery;
  160. /** FSTQueryViews for all active queries, indexed by target ID. */
  161. std::unordered_map<TargetId, FSTQueryView *> _queryViewsByTarget;
  162. /**
  163. * When a document is in limbo, we create a special listen to resolve it. This maps the
  164. * DocumentKey of each limbo document to the TargetId of the listen resolving it.
  165. */
  166. std::map<DocumentKey, TargetId> _limboTargetsByKey;
  167. /**
  168. * Basically the inverse of limboTargetsByKey, a map of target ID to a LimboResolution (which
  169. * includes the DocumentKey as well as whether we've received a document for the target).
  170. */
  171. std::map<TargetId, LimboResolution> _limboResolutionsByTarget;
  172. User _currentUser;
  173. /** Used to track any documents that are currently in limbo. */
  174. ReferenceSet _limboDocumentRefs;
  175. }
  176. - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore
  177. remoteStore:(RemoteStore *)remoteStore
  178. initialUser:(const User &)initialUser {
  179. if (self = [super init]) {
  180. _localStore = localStore;
  181. _remoteStore = remoteStore;
  182. _targetIdGenerator = TargetIdGenerator::SyncEngineTargetIdGenerator();
  183. _currentUser = initialUser;
  184. }
  185. return self;
  186. }
  187. - (void)setCallback:(SyncEngineCallback *)callback {
  188. _callback = callback;
  189. }
  190. - (TargetId)listenToQuery:(Query)query {
  191. [self assertCallbackExistsForSelector:_cmd];
  192. HARD_ASSERT(_queryViewsByQuery.find(query) == _queryViewsByQuery.end(),
  193. "We already listen to query: %s", query.ToString());
  194. FSTQueryData *queryData = [self.localStore allocateQuery:query];
  195. ViewSnapshot viewSnapshot = [self initializeViewAndComputeSnapshotForQueryData:queryData];
  196. _callback->OnViewSnapshots({viewSnapshot});
  197. _remoteStore->Listen(queryData);
  198. return queryData.targetID;
  199. }
  200. - (ViewSnapshot)initializeViewAndComputeSnapshotForQueryData:(FSTQueryData *)queryData {
  201. DocumentMap docs = [self.localStore executeQuery:queryData.query];
  202. DocumentKeySet remoteKeys = [self.localStore remoteDocumentKeysForTarget:queryData.targetID];
  203. FSTView *view = [[FSTView alloc] initWithQuery:queryData.query
  204. remoteDocuments:std::move(remoteKeys)];
  205. FSTViewDocumentChanges *viewDocChanges = [view computeChangesWithDocuments:docs.underlying_map()];
  206. FSTViewChange *viewChange = [view applyChangesToDocuments:viewDocChanges];
  207. HARD_ASSERT(viewChange.limboChanges.count == 0,
  208. "View returned limbo docs before target ack from the server.");
  209. FSTQueryView *queryView = [[FSTQueryView alloc] initWithQuery:queryData.query
  210. targetID:queryData.targetID
  211. resumeToken:queryData.resumeToken
  212. view:view];
  213. _queryViewsByQuery[queryData.query] = queryView;
  214. _queryViewsByTarget[queryData.targetID] = queryView;
  215. HARD_ASSERT(viewChange.snapshot.has_value(),
  216. "applyChangesToDocuments for new view should always return a snapshot");
  217. return viewChange.snapshot.value();
  218. }
  219. - (void)stopListeningToQuery:(const Query &)query {
  220. [self assertCallbackExistsForSelector:_cmd];
  221. FSTQueryView *queryView = _queryViewsByQuery[query];
  222. HARD_ASSERT(queryView, "Trying to stop listening to a query not found");
  223. [self.localStore releaseQuery:query];
  224. _remoteStore->StopListening(queryView.targetID);
  225. [self removeAndCleanupQuery:queryView];
  226. }
  227. - (void)writeMutations:(std::vector<FSTMutation *> &&)mutations
  228. completion:(FSTVoidErrorBlock)completion {
  229. [self assertCallbackExistsForSelector:_cmd];
  230. LocalWriteResult result = [self.localStore locallyWriteMutations:std::move(mutations)];
  231. [self addMutationCompletionBlock:completion batchID:result.batch_id()];
  232. [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:result.changes() remoteEvent:absl::nullopt];
  233. _remoteStore->FillWritePipeline();
  234. }
  235. - (void)addMutationCompletionBlock:(FSTVoidErrorBlock)completion batchID:(BatchId)batchID {
  236. NSMutableDictionary<NSNumber *, FSTVoidErrorBlock> *completionBlocks =
  237. _mutationCompletionBlocks[_currentUser];
  238. if (!completionBlocks) {
  239. completionBlocks = [NSMutableDictionary dictionary];
  240. _mutationCompletionBlocks[_currentUser] = completionBlocks;
  241. }
  242. [completionBlocks setObject:completion forKey:@(batchID)];
  243. }
  244. /**
  245. * Takes an updateCallback in which a set of reads and writes can be performed atomically. In the
  246. * updateCallback, user code can read and write values using a transaction object. After the
  247. * updateCallback, all changes will be committed. If someone else has changed any of the data
  248. * referenced, then the updateCallback will be called again. If the updateCallback still fails after
  249. * the given number of retries, then the transaction will be rejected.
  250. *
  251. * The transaction object passed to the updateCallback contains methods for accessing documents
  252. * and collections. Unlike other firestore access, data accessed with the transaction will not
  253. * reflect local changes that have not been committed. For this reason, it is required that all
  254. * reads are performed before any writes. Transactions must be performed while online.
  255. */
  256. - (void)transactionWithRetries:(int)retries
  257. workerQueue:(const std::shared_ptr<AsyncQueue> &)workerQueue
  258. updateCallback:(core::TransactionUpdateCallback)updateCallback
  259. resultCallback:(core::TransactionResultCallback)resultCallback {
  260. workerQueue->VerifyIsCurrentQueue();
  261. HARD_ASSERT(retries >= 0, "Got negative number of retries for transaction");
  262. std::shared_ptr<Transaction> transaction = _remoteStore->CreateTransaction();
  263. updateCallback(transaction, [=](util::StatusOr<absl::any> maybe_result) {
  264. workerQueue->Enqueue(
  265. [self, retries, workerQueue, updateCallback, resultCallback, transaction, maybe_result] {
  266. if (!maybe_result.ok()) {
  267. if (retries > 0 && [self isRetryableTransactionError:maybe_result.status()] &&
  268. !transaction->IsPermanentlyFailed()) {
  269. return [self transactionWithRetries:(retries - 1)
  270. workerQueue:workerQueue
  271. updateCallback:updateCallback
  272. resultCallback:resultCallback];
  273. } else {
  274. resultCallback(std::move(maybe_result));
  275. }
  276. } else {
  277. transaction->Commit([self, retries, workerQueue, updateCallback, resultCallback,
  278. maybe_result, transaction](Status status) {
  279. if (status.ok()) {
  280. resultCallback(std::move(maybe_result));
  281. return;
  282. }
  283. if (retries > 0 && [self isRetryableTransactionError:status] &&
  284. !transaction->IsPermanentlyFailed()) {
  285. workerQueue->VerifyIsCurrentQueue();
  286. return [self transactionWithRetries:(retries - 1)
  287. workerQueue:workerQueue
  288. updateCallback:updateCallback
  289. resultCallback:resultCallback];
  290. }
  291. resultCallback(std::move(status));
  292. });
  293. }
  294. });
  295. });
  296. }
  297. - (void)applyRemoteEvent:(const RemoteEvent &)remoteEvent {
  298. [self assertCallbackExistsForSelector:_cmd];
  299. // Update `receivedDocument` as appropriate for any limbo targets.
  300. for (const auto &entry : remoteEvent.target_changes()) {
  301. TargetId targetID = entry.first;
  302. const TargetChange &change = entry.second;
  303. const auto iter = _limboResolutionsByTarget.find(targetID);
  304. if (iter != _limboResolutionsByTarget.end()) {
  305. LimboResolution &limboResolution = iter->second;
  306. // Since this is a limbo resolution lookup, it's for a single document and it could be
  307. // added, modified, or removed, but not a combination.
  308. HARD_ASSERT(change.added_documents().size() + change.modified_documents().size() +
  309. change.removed_documents().size() <=
  310. 1,
  311. "Limbo resolution for single document contains multiple changes.");
  312. if (change.added_documents().size() > 0) {
  313. limboResolution.document_received = true;
  314. } else if (change.modified_documents().size() > 0) {
  315. HARD_ASSERT(limboResolution.document_received,
  316. "Received change for limbo target document without add.");
  317. } else if (change.removed_documents().size() > 0) {
  318. HARD_ASSERT(limboResolution.document_received,
  319. "Received remove for limbo target document without add.");
  320. limboResolution.document_received = false;
  321. } else {
  322. // This was probably just a CURRENT targetChange or similar.
  323. }
  324. }
  325. }
  326. MaybeDocumentMap changes = [self.localStore applyRemoteEvent:remoteEvent];
  327. [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:remoteEvent];
  328. }
  329. - (void)applyChangedOnlineState:(OnlineState)onlineState {
  330. [self assertCallbackExistsForSelector:_cmd];
  331. std::vector<ViewSnapshot> newViewSnapshots;
  332. for (const auto &entry : _queryViewsByQuery) {
  333. FSTQueryView *queryView = entry.second;
  334. FSTViewChange *viewChange = [queryView.view applyChangedOnlineState:onlineState];
  335. HARD_ASSERT(viewChange.limboChanges.count == 0,
  336. "OnlineState should not affect limbo documents.");
  337. if (viewChange.snapshot.has_value()) {
  338. newViewSnapshots.push_back(std::move(viewChange.snapshot.value()));
  339. }
  340. }
  341. _callback->OnViewSnapshots(std::move(newViewSnapshots));
  342. _callback->HandleOnlineStateChange(onlineState);
  343. }
  344. - (void)rejectListenWithTargetID:(const TargetId)targetID error:(NSError *)error {
  345. [self assertCallbackExistsForSelector:_cmd];
  346. const auto iter = _limboResolutionsByTarget.find(targetID);
  347. if (iter != _limboResolutionsByTarget.end()) {
  348. const DocumentKey limboKey = iter->second.key;
  349. // Since this query failed, we won't want to manually unlisten to it.
  350. // So go ahead and remove it from bookkeeping.
  351. _limboTargetsByKey.erase(limboKey);
  352. _limboResolutionsByTarget.erase(targetID);
  353. // TODO(dimond): Retry on transient errors?
  354. // It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack.
  355. // Ideally, we would have a method in the local store to purge a document. However, it would
  356. // be tricky to keep all of the local store's invariants with another method.
  357. FSTDeletedDocument *doc = [FSTDeletedDocument documentWithKey:limboKey
  358. version:SnapshotVersion::None()
  359. hasCommittedMutations:NO];
  360. DocumentKeySet limboDocuments = DocumentKeySet{doc.key};
  361. RemoteEvent event{SnapshotVersion::None(), /*target_changes=*/{}, /*target_mismatches=*/{},
  362. /*document_updates=*/{{limboKey, doc}}, std::move(limboDocuments)};
  363. [self applyRemoteEvent:event];
  364. } else {
  365. auto found = _queryViewsByTarget.find(targetID);
  366. HARD_ASSERT(found != _queryViewsByTarget.end(), "Unknown targetId: %s", targetID);
  367. FSTQueryView *queryView = found->second;
  368. const Query &query = queryView.query;
  369. [self.localStore releaseQuery:query];
  370. [self removeAndCleanupQuery:queryView];
  371. if ([self errorIsInteresting:error]) {
  372. LOG_WARN("Listen for query at %s failed: %s", query.path().CanonicalString(),
  373. error.localizedDescription);
  374. }
  375. _callback->OnError(query, Status::FromNSError(error));
  376. }
  377. }
  378. - (void)applySuccessfulWriteWithResult:(FSTMutationBatchResult *)batchResult {
  379. [self assertCallbackExistsForSelector:_cmd];
  380. // The local store may or may not be able to apply the write result and raise events immediately
  381. // (depending on whether the watcher is caught up), so we raise user callbacks first so that they
  382. // consistently happen before listen events.
  383. [self processUserCallbacksForBatchID:batchResult.batch.batchID error:nil];
  384. MaybeDocumentMap changes = [self.localStore acknowledgeBatchWithResult:batchResult];
  385. [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt];
  386. }
  387. - (void)rejectFailedWriteWithBatchID:(BatchId)batchID error:(NSError *)error {
  388. [self assertCallbackExistsForSelector:_cmd];
  389. MaybeDocumentMap changes = [self.localStore rejectBatchID:batchID];
  390. if (!changes.empty() && [self errorIsInteresting:error]) {
  391. const DocumentKey &minKey = changes.min()->first;
  392. LOG_WARN("Write at %s failed: %s", minKey.ToString(), error.localizedDescription);
  393. }
  394. // The local store may or may not be able to apply the write result and raise events immediately
  395. // (depending on whether the watcher is caught up), so we raise user callbacks first so that they
  396. // consistently happen before listen events.
  397. [self processUserCallbacksForBatchID:batchID error:error];
  398. [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt];
  399. }
  400. - (void)processUserCallbacksForBatchID:(BatchId)batchID error:(NSError *_Nullable)error {
  401. NSMutableDictionary<NSNumber *, FSTVoidErrorBlock> *completionBlocks =
  402. _mutationCompletionBlocks[_currentUser];
  403. // NOTE: Mutations restored from persistence won't have completion blocks, so it's okay for
  404. // this (or the completion below) to be nil.
  405. if (completionBlocks) {
  406. NSNumber *boxedBatchID = @(batchID);
  407. FSTVoidErrorBlock completion = completionBlocks[boxedBatchID];
  408. if (completion) {
  409. completion(error);
  410. [completionBlocks removeObjectForKey:boxedBatchID];
  411. }
  412. }
  413. }
  414. - (void)assertCallbackExistsForSelector:(SEL)methodSelector {
  415. HARD_ASSERT(_callback, "Tried to call '%s' before callback was registered.",
  416. NSStringFromSelector(methodSelector));
  417. }
  418. - (void)removeAndCleanupQuery:(FSTQueryView *)queryView {
  419. _queryViewsByQuery.erase(queryView.query);
  420. _queryViewsByTarget.erase(queryView.targetID);
  421. DocumentKeySet limboKeys = _limboDocumentRefs.ReferencedKeys(queryView.targetID);
  422. _limboDocumentRefs.RemoveReferences(queryView.targetID);
  423. for (const DocumentKey &key : limboKeys) {
  424. if (!_limboDocumentRefs.ContainsKey(key)) {
  425. // We removed the last reference for this key.
  426. [self removeLimboTargetForKey:key];
  427. }
  428. }
  429. }
  430. /**
  431. * Computes a new snapshot from the changes and calls the registered callback with the new snapshot.
  432. */
  433. - (void)emitNewSnapshotsAndNotifyLocalStoreWithChanges:(const MaybeDocumentMap &)changes
  434. remoteEvent:(const absl::optional<RemoteEvent> &)
  435. maybeRemoteEvent {
  436. std::vector<ViewSnapshot> newSnapshots;
  437. std::vector<LocalViewChanges> documentChangesInAllViews;
  438. for (const auto &entry : _queryViewsByQuery) {
  439. FSTQueryView *queryView = entry.second;
  440. FSTView *view = queryView.view;
  441. FSTViewDocumentChanges *viewDocChanges = [view computeChangesWithDocuments:changes];
  442. if (viewDocChanges.needsRefill) {
  443. // The query has a limit and some docs were removed/updated, so we need to re-run the
  444. // query against the local store to make sure we didn't lose any good docs that had been
  445. // past the limit.
  446. DocumentMap docs = [self.localStore executeQuery:queryView.query];
  447. viewDocChanges = [view computeChangesWithDocuments:docs.underlying_map()
  448. previousChanges:viewDocChanges];
  449. }
  450. absl::optional<TargetChange> targetChange;
  451. if (maybeRemoteEvent.has_value()) {
  452. const RemoteEvent &remoteEvent = maybeRemoteEvent.value();
  453. auto it = remoteEvent.target_changes().find(queryView.targetID);
  454. if (it != remoteEvent.target_changes().end()) {
  455. targetChange = it->second;
  456. }
  457. }
  458. FSTViewChange *viewChange = [queryView.view applyChangesToDocuments:viewDocChanges
  459. targetChange:targetChange];
  460. [self updateTrackedLimboDocumentsWithChanges:viewChange.limboChanges
  461. targetID:queryView.targetID];
  462. if (viewChange.snapshot.has_value()) {
  463. newSnapshots.push_back(viewChange.snapshot.value());
  464. LocalViewChanges docChanges =
  465. LocalViewChanges::FromViewSnapshot(viewChange.snapshot.value(), queryView.targetID);
  466. documentChangesInAllViews.push_back(std::move(docChanges));
  467. }
  468. }
  469. _callback->OnViewSnapshots(std::move(newSnapshots));
  470. [self.localStore notifyLocalViewChanges:documentChangesInAllViews];
  471. }
  472. /** Updates the limbo document state for the given targetID. */
  473. - (void)updateTrackedLimboDocumentsWithChanges:(NSArray<FSTLimboDocumentChange *> *)limboChanges
  474. targetID:(TargetId)targetID {
  475. for (FSTLimboDocumentChange *limboChange in limboChanges) {
  476. switch (limboChange.type) {
  477. case FSTLimboDocumentChangeTypeAdded:
  478. _limboDocumentRefs.AddReference(limboChange.key, targetID);
  479. [self trackLimboChange:limboChange];
  480. break;
  481. case FSTLimboDocumentChangeTypeRemoved:
  482. LOG_DEBUG("Document no longer in limbo: %s", limboChange.key.ToString());
  483. _limboDocumentRefs.RemoveReference(limboChange.key, targetID);
  484. if (!_limboDocumentRefs.ContainsKey(limboChange.key)) {
  485. // We removed the last reference for this key
  486. [self removeLimboTargetForKey:limboChange.key];
  487. }
  488. break;
  489. default:
  490. HARD_FAIL("Unknown limbo change type: %s", limboChange.type);
  491. }
  492. }
  493. }
  494. - (void)trackLimboChange:(FSTLimboDocumentChange *)limboChange {
  495. DocumentKey key{limboChange.key};
  496. if (_limboTargetsByKey.find(key) == _limboTargetsByKey.end()) {
  497. LOG_DEBUG("New document in limbo: %s", key.ToString());
  498. TargetId limboTargetID = _targetIdGenerator.NextId();
  499. Query query(key.path());
  500. FSTQueryData *queryData = [[FSTQueryData alloc] initWithQuery:std::move(query)
  501. targetID:limboTargetID
  502. listenSequenceNumber:kIrrelevantSequenceNumber
  503. purpose:FSTQueryPurposeLimboResolution];
  504. _limboResolutionsByTarget.emplace(limboTargetID, LimboResolution{key});
  505. _remoteStore->Listen(queryData);
  506. _limboTargetsByKey[key] = limboTargetID;
  507. }
  508. }
  509. - (void)removeLimboTargetForKey:(const DocumentKey &)key {
  510. const auto iter = _limboTargetsByKey.find(key);
  511. if (iter == _limboTargetsByKey.end()) {
  512. // This target already got removed, because the query failed.
  513. return;
  514. }
  515. TargetId limboTargetID = iter->second;
  516. _remoteStore->StopListening(limboTargetID);
  517. _limboTargetsByKey.erase(key);
  518. _limboResolutionsByTarget.erase(limboTargetID);
  519. }
  520. // Used for testing
  521. - (std::map<DocumentKey, TargetId>)currentLimboDocuments {
  522. // Return defensive copy
  523. return _limboTargetsByKey;
  524. }
  525. - (void)credentialDidChangeWithUser:(const firebase::firestore::auth::User &)user {
  526. BOOL userChanged = (_currentUser != user);
  527. _currentUser = user;
  528. if (userChanged) {
  529. // Notify local store and emit any resulting events from swapping out the mutation queue.
  530. MaybeDocumentMap changes = [self.localStore userDidChange:user];
  531. [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt];
  532. }
  533. // Notify remote store so it can restart its streams.
  534. _remoteStore->HandleCredentialChange();
  535. }
  536. - (DocumentKeySet)remoteKeysForTarget:(TargetId)targetId {
  537. const auto iter = _limboResolutionsByTarget.find(targetId);
  538. if (iter != _limboResolutionsByTarget.end() && iter->second.document_received) {
  539. return DocumentKeySet{iter->second.key};
  540. } else {
  541. auto found = _queryViewsByTarget.find(targetId);
  542. FSTQueryView *queryView = found != _queryViewsByTarget.end() ? found->second : nil;
  543. return queryView ? queryView.view.syncedDocuments : DocumentKeySet{};
  544. }
  545. }
  546. /**
  547. * Decides if the error likely represents a developer mistake such as forgetting to create an index
  548. * or permission denied. Used to decide whether an error is worth automatically logging as a
  549. * warning.
  550. */
  551. - (BOOL)errorIsInteresting:(NSError *)error {
  552. if (error.domain == FIRFirestoreErrorDomain) {
  553. if (error.code == FIRFirestoreErrorCodeFailedPrecondition &&
  554. [error.localizedDescription containsString:@"requires an index"]) {
  555. return YES;
  556. } else if (error.code == FIRFirestoreErrorCodePermissionDenied) {
  557. return YES;
  558. }
  559. }
  560. return NO;
  561. }
  562. - (BOOL)isRetryableTransactionError:(const Status &)error {
  563. // In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
  564. // non-matching document versions with ABORTED. These errors should be retried.
  565. Error code = error.code();
  566. return code == Error::Aborted || code == Error::FailedPrecondition ||
  567. !Datastore::IsPermanentError(error);
  568. }
  569. @end
  570. NS_ASSUME_NONNULL_END