FSTSyncEngine.mm 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Source/Core/FSTSyncEngine.h"
  17. #include <map>
  18. #include <memory>
  19. #include <set>
  20. #include <unordered_map>
  21. #include <utility>
  22. #include <vector>
  23. #import "FIRFirestoreErrors.h"
  24. #import "Firestore/Source/Local/FSTLocalStore.h"
  25. #include "Firestore/core/include/firebase/firestore/firestore_errors.h"
  26. #include "Firestore/core/src/firebase/firestore/auth/user.h"
  27. #include "Firestore/core/src/firebase/firestore/core/target_id_generator.h"
  28. #include "Firestore/core/src/firebase/firestore/core/transaction.h"
  29. #include "Firestore/core/src/firebase/firestore/core/transaction_runner.h"
  30. #include "Firestore/core/src/firebase/firestore/core/view.h"
  31. #include "Firestore/core/src/firebase/firestore/core/view_snapshot.h"
  32. #include "Firestore/core/src/firebase/firestore/local/local_view_changes.h"
  33. #include "Firestore/core/src/firebase/firestore/local/local_write_result.h"
  34. #include "Firestore/core/src/firebase/firestore/local/query_data.h"
  35. #include "Firestore/core/src/firebase/firestore/local/reference_set.h"
  36. #include "Firestore/core/src/firebase/firestore/model/document_key.h"
  37. #include "Firestore/core/src/firebase/firestore/model/document_map.h"
  38. #include "Firestore/core/src/firebase/firestore/model/document_set.h"
  39. #include "Firestore/core/src/firebase/firestore/model/mutation_batch.h"
  40. #include "Firestore/core/src/firebase/firestore/model/no_document.h"
  41. #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
  42. #include "Firestore/core/src/firebase/firestore/remote/remote_event.h"
  43. #include "Firestore/core/src/firebase/firestore/util/delayed_constructor.h"
  44. #include "Firestore/core/src/firebase/firestore/util/error_apple.h"
  45. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  46. #include "Firestore/core/src/firebase/firestore/util/log.h"
  47. #include "Firestore/core/src/firebase/firestore/util/status.h"
  48. #include "absl/types/optional.h"
  49. using firebase::firestore::Error;
  50. using firebase::firestore::auth::HashUser;
  51. using firebase::firestore::auth::User;
  52. using firebase::firestore::core::LimboDocumentChange;
  53. using firebase::firestore::core::Query;
  54. using firebase::firestore::core::SyncEngineCallback;
  55. using firebase::firestore::core::TargetIdGenerator;
  56. using firebase::firestore::core::Transaction;
  57. using firebase::firestore::core::TransactionRunner;
  58. using firebase::firestore::core::View;
  59. using firebase::firestore::core::ViewChange;
  60. using firebase::firestore::core::ViewDocumentChanges;
  61. using firebase::firestore::core::ViewSnapshot;
  62. using firebase::firestore::local::LocalViewChanges;
  63. using firebase::firestore::local::LocalWriteResult;
  64. using firebase::firestore::local::QueryData;
  65. using firebase::firestore::local::QueryPurpose;
  66. using firebase::firestore::local::ReferenceSet;
  67. using firebase::firestore::model::BatchId;
  68. using firebase::firestore::model::DocumentKey;
  69. using firebase::firestore::model::DocumentKeySet;
  70. using firebase::firestore::model::DocumentMap;
  71. using firebase::firestore::model::kBatchIdUnknown;
  72. using firebase::firestore::model::ListenSequenceNumber;
  73. using firebase::firestore::model::MaybeDocumentMap;
  74. using firebase::firestore::model::Mutation;
  75. using firebase::firestore::model::MutationBatchResult;
  76. using firebase::firestore::model::NoDocument;
  77. using firebase::firestore::model::OnlineState;
  78. using firebase::firestore::model::SnapshotVersion;
  79. using firebase::firestore::model::TargetId;
  80. using firebase::firestore::nanopb::ByteString;
  81. using firebase::firestore::remote::Datastore;
  82. using firebase::firestore::remote::RemoteEvent;
  83. using firebase::firestore::remote::RemoteStore;
  84. using firebase::firestore::remote::TargetChange;
  85. using firebase::firestore::util::AsyncQueue;
  86. using firebase::firestore::util::MakeNSError;
  87. using firebase::firestore::util::Status;
  88. using firebase::firestore::util::StatusCallback;
  89. NS_ASSUME_NONNULL_BEGIN
  // Limbo documents are not persisted and are garbage collected eagerly, so the listens created
  // for them have no need for a real sequence number; this placeholder is used instead.
  static const ListenSequenceNumber kIrrelevantSequenceNumber = -1;
  93. #pragma mark - FSTQueryView
  /**
   * Bundles the state FSTSyncEngine keeps for a single active query: the query, its target ID,
   * the resume token it was created with, and the View computed for it.
   */
  @interface FSTQueryView : NSObject

  - (instancetype)initWithQuery:(Query)query
                       targetID:(TargetId)targetID
                    resumeToken:(ByteString)resumeToken
                           view:(core::View)view NS_DESIGNATED_INITIALIZER;

  - (instancetype)init NS_UNAVAILABLE;

  /** The query this object was created for. */
  - (const Query &)query;

  /** The client-created target ID that identifies this query on the watch stream. */
  @property(nonatomic, assign, readonly) TargetId targetID;

  /**
   * An identifier from the datastore backend that indicates the last state of the results that
   * was received. It can be used to indicate where to continue receiving new document changes for
   * the query.
   */
  - (const ByteString &)resumeToken;

  /**
   * The view computes the final merged truth of which documents are in the query. It is notified
   * of local and remote changes and applies the query's filters and limits to determine the most
   * correct possible results.
   */
  - (core::View *)view;

  @end

  @implementation FSTQueryView {
    Query _query;
    ByteString _resumeToken;
    // Initialized from the view handed to the designated initializer.
    util::DelayedConstructor<View> _view;
  }

  - (instancetype)initWithQuery:(Query)query
                       targetID:(TargetId)targetID
                    resumeToken:(ByteString)resumeToken
                           view:(View)view {
    self = [super init];
    if (self) {
      _query = std::move(query);
      _targetID = targetID;
      _resumeToken = std::move(resumeToken);
      _view.Init(std::move(view));
    }
    return self;
  }

  - (const Query &)query {
    return _query;
  }

  - (const ByteString &)resumeToken {
    return _resumeToken;
  }

  - (View *)view {
    // Hand out the address of the View held by the delayed constructor.
    return &*_view;
  }

  @end
  148. #pragma mark - LimboResolution
  /** Bookkeeping for a single limbo resolution. */
  class LimboResolution {
   public:
    LimboResolution() = default;

    explicit LimboResolution(const DocumentKey &document_key) : key{document_key} {
    }

    /** Key of the document being resolved. */
    DocumentKey key;

    /**
     * Becomes true once a document has been received for the target. The flag is read by
     * remoteKeysForTarget and is ultimately used by `WatchChangeAggregator` to decide whether it
     * needs to manufacture a delete event for the target once the target is CURRENT.
     */
    bool document_received = false;
  };
  164. #pragma mark - FSTSyncEngine
  @interface FSTSyncEngine ()

  /** Local store through which mutations are persisted and cached documents are read. */
  @property(nonatomic, strong, readonly) FSTLocalStore *localStore;

  @end

  @implementation FSTSyncEngine {
    /** Remote store used to send writes, watches, etc. to the backend. */
    RemoteStore *_remoteStore;

    /**
     * Callback that is notified when the queries being listened to produce new view snapshots or
     * errors.
     */
    SyncEngineCallback *_callback;

    /** Generates the TargetIds for the listens that resolve limbo documents. */
    TargetIdGenerator _targetIdGenerator;

    /** User completion blocks, keyed first by user and then by BatchId. */
    std::unordered_map<User, NSMutableDictionary<NSNumber *, FSTVoidErrorBlock> *, HashUser>
        _mutationCompletionBlocks;

    /** User callbacks waiting for pending writes to be acknowledged, keyed by BatchId. */
    std::unordered_map<model::BatchId, std::vector<StatusCallback>> _pendingWritesCallbacks;

    /** The FSTQueryView of every active query, keyed by query. */
    std::unordered_map<Query, FSTQueryView *> _queryViewsByQuery;

    /** The FSTQueryView of every active query, keyed by target ID. */
    std::unordered_map<TargetId, FSTQueryView *> _queryViewsByTarget;

    /**
     * A document in limbo gets a dedicated listen to resolve it. This maps the DocumentKey of each
     * limbo document to the TargetId of the listen resolving it.
     */
    std::map<DocumentKey, TargetId> _limboTargetsByKey;

    /**
     * Essentially the inverse of _limboTargetsByKey: maps a target ID to its LimboResolution, which
     * carries the DocumentKey and whether a document has been received for the target.
     */
    std::map<TargetId, LimboResolution> _limboResolutionsByTarget;

    /** The user whose completion blocks are currently stored and looked up. */
    User _currentUser;

    /** Tracks the documents that are currently in limbo. */
    ReferenceSet _limboDocumentRefs;
  }
  /**
   * Creates a sync engine that works against the given local and remote stores on behalf of
   * `initialUser`. The callback is not set here; see setCallback:.
   */
  - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore
                         remoteStore:(RemoteStore *)remoteStore
                         initialUser:(const User &)initialUser {
    self = [super init];
    if (self) {
      _currentUser = initialUser;
      _targetIdGenerator = TargetIdGenerator::SyncEngineTargetIdGenerator();
      _remoteStore = remoteStore;
      _localStore = localStore;
    }
    return self;
  }
  /**
   * Registers the callback that receives view snapshots, errors and online-state changes. Methods
   * that call assertCallbackExistsForSelector: hard-assert that this has been set.
   */
  - (void)setCallback:(SyncEngineCallback *)callback {
    _callback = callback;
  }
  /**
   * Starts listening to `query`: allocates it in the local store, builds its view, emits the
   * initial snapshot to the callback and then asks the remote store to listen.
   *
   * Hard-asserts that the query is not already being listened to.
   *
   * @return The target ID allocated for the query.
   */
  - (TargetId)listenToQuery:(Query)query {
    [self assertCallbackExistsForSelector:_cmd];
    HARD_ASSERT(_queryViewsByQuery.find(query) == _queryViewsByQuery.end(),
                "We already listen to query: %s", query.ToString());

    QueryData allocated = [self.localStore allocateQuery:query];
    ViewSnapshot initialSnapshot = [self initializeViewAndComputeSnapshotForQueryData:allocated];

    // The locally computed snapshot is delivered before the remote listen is started.
    _callback->OnViewSnapshots({initialSnapshot});
    _remoteStore->Listen(allocated);

    return allocated.target_id();
  }
  /**
   * Builds the View for `queryData` from the local store's current results, registers the
   * resulting FSTQueryView under both its query and its target ID, and returns the view's first
   * snapshot.
   *
   * Hard-asserts that the new view reports no limbo changes and that it produced a snapshot.
   */
  - (ViewSnapshot)initializeViewAndComputeSnapshotForQueryData:(const QueryData &)queryData {
    DocumentMap localDocs = [self.localStore executeQuery:queryData.query()];
    DocumentKeySet remoteKeys =
        [self.localStore remoteDocumentKeysForTarget:queryData.target_id()];

    View view(queryData.query(), std::move(remoteKeys));
    ViewDocumentChanges initialChanges = view.ComputeDocumentChanges(localDocs.underlying_map());
    ViewChange viewChange = view.ApplyChanges(initialChanges);
    HARD_ASSERT(viewChange.limbo_changes().empty(),
                "View returned limbo docs before target ack from the server.");

    FSTQueryView *queryView = [[FSTQueryView alloc] initWithQuery:queryData.query()
                                                         targetID:queryData.target_id()
                                                      resumeToken:queryData.resume_token()
                                                             view:view];
    // Index the same FSTQueryView object under both keys.
    _queryViewsByQuery[queryData.query()] = queryView;
    _queryViewsByTarget[queryData.target_id()] = queryView;

    HARD_ASSERT(viewChange.snapshot().has_value(),
                "applyChangesToDocuments for new view should always return a snapshot");
    return viewChange.snapshot().value();
  }
  /**
   * Stops listening to `query`: releases it in the local store, tells the remote store to stop
   * listening to its target, and removes the query's bookkeeping.
   *
   * Hard-asserts that the query is currently being listened to.
   */
  - (void)stopListeningToQuery:(const Query &)query {
    [self assertCallbackExistsForSelector:_cmd];

    // Look the query up with find() rather than operator[]: operator[] would default-insert a nil
    // entry for an unknown query before the assertion below gets a chance to fire.
    auto found = _queryViewsByQuery.find(query);
    HARD_ASSERT(found != _queryViewsByQuery.end(), "Trying to stop listening to a query not found");
    FSTQueryView *queryView = found->second;

    [self.localStore releaseQuery:query];
    _remoteStore->StopListening(queryView.targetID);
    [self removeAndCleanupQuery:queryView];
  }
  /**
   * Writes `mutations` to the local store, remembers `completion` under the resulting batch ID,
   * emits any snapshots affected by the local write, and then asks the remote store to fill its
   * write pipeline.
   */
  - (void)writeMutations:(std::vector<Mutation> &&)mutations
              completion:(FSTVoidErrorBlock)completion {
    [self assertCallbackExistsForSelector:_cmd];

    LocalWriteResult writeResult = [self.localStore locallyWriteMutations:std::move(mutations)];
    [self addMutationCompletionBlock:completion batchID:writeResult.batch_id()];

    // A local write carries no remote event.
    [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:writeResult.changes()
                                             remoteEvent:absl::nullopt];
    _remoteStore->FillWritePipeline();
  }
  /**
   * Registers `callback` to be invoked once the writes that are pending right now have been
   * processed.
   *
   * If the local store reports no unacknowledged batch, the callback is invoked immediately with
   * Status::OK(). Otherwise it is stored under the highest unacknowledged batch ID. A debug
   * message is logged when the remote store cannot currently use the network.
   */
  - (void)registerPendingWritesCallback:(StatusCallback)callback {
    if (!_remoteStore->CanUseNetwork()) {
      LOG_DEBUG("The network is disabled. The task returned by 'awaitPendingWrites()' will not "
                "complete until the network is enabled.");
    }

    const BatchId largestPendingBatchId = [self.localStore getHighestUnacknowledgedBatchId];
    if (largestPendingBatchId == kBatchIdUnknown) {
      // Trigger the callback right away if there are no pending writes at the moment.
      callback(Status::OK());
      return;
    }

    // operator[] creates the (empty) vector on first use, so a single statement covers both the
    // "first callback for this batch" and the "additional callback" cases. The callback is moved
    // into the vector instead of being copied through an initializer list.
    _pendingWritesCallbacks[largestPendingBatchId].push_back(std::move(callback));
  }
  /**
   * Invokes, with Status::OK(), every callback registered for `batchId` and then forgets them.
   * Does nothing when no callback is waiting on that batch ID.
   */
  - (void)triggerPendingWriteCallbacksWithBatchId:(int)batchId {
    auto waiting = _pendingWritesCallbacks.find(batchId);
    if (waiting == _pendingWritesCallbacks.end()) {
      return;
    }

    for (const auto &pendingCallback : waiting->second) {
      pendingCallback(Status::OK());
    }
    _pendingWritesCallbacks.erase(waiting);
  }
  /**
   * Invokes every stored pending-writes callback with an Error::Cancelled status carrying
   * `errorMessage`, then discards all of them.
   */
  - (void)failOutstandingPendingWritesAwaitingCallbacks:(absl::string_view)errorMessage {
    for (const auto &batchAndCallbacks : _pendingWritesCallbacks) {
      const auto &waitingCallbacks = batchAndCallbacks.second;
      for (const auto &waitingCallback : waitingCallbacks) {
        waitingCallback(Status(Error::Cancelled, errorMessage));
      }
    }

    _pendingWritesCallbacks.clear();
  }
  /**
   * Stores `completion` for the current user under `batchID`, creating the user's dictionary of
   * completion blocks on first use.
   */
  - (void)addMutationCompletionBlock:(FSTVoidErrorBlock)completion batchID:(BatchId)batchID {
    // The map slot for the current user; it is nil until the first block is stored for that user.
    auto &blocksForUser = _mutationCompletionBlocks[_currentUser];
    if (!blocksForUser) {
      blocksForUser = [NSMutableDictionary dictionary];
    }

    [blocksForUser setObject:completion forKey:@(batchID)];
  }
  /**
   * Runs a transaction. The updateCallback performs a set of reads and writes atomically: inside
   * it, user code reads and writes values through a transaction object, and once it finishes all
   * changes are committed. If a retryable error occurs (for example, some other client has changed
   * any of the data referenced), the updateCallback is called again after a backoff. If it still
   * fails after all retries, the transaction is rejected.
   *
   * The transaction object passed to the updateCallback contains methods for accessing documents
   * and collections. Unlike other firestore access, data accessed with the transaction does not
   * reflect local changes that have not been committed. For this reason all reads must be
   * performed before any writes. Transactions must be performed while online.
   *
   * Must be called on `workerQueue`. Note that `retries` is only validated to be non-negative
   * here; it is not forwarded to the TransactionRunner.
   */
  - (void)transactionWithRetries:(int)retries
                     workerQueue:(const std::shared_ptr<AsyncQueue> &)workerQueue
                  updateCallback:(core::TransactionUpdateCallback)updateCallback
                  resultCallback:(core::TransactionResultCallback)resultCallback {
    workerQueue->VerifyIsCurrentQueue();
    HARD_ASSERT(retries >= 0, "Got negative number of retries for transaction");

    // The runner lives in a shared_ptr so that it can outlive this frame.
    std::shared_ptr<TransactionRunner> transactionRunner = std::make_shared<TransactionRunner>(
        workerQueue, _remoteStore, updateCallback, resultCallback);
    transactionRunner->Run();
  }
  /**
   * Applies a remote event: first updates `document_received` for every limbo target mentioned in
   * the event, then applies the event to the local store and emits the resulting snapshots.
   */
  - (void)applyRemoteEvent:(const RemoteEvent &)remoteEvent {
    [self assertCallbackExistsForSelector:_cmd];

    // Update `document_received` as appropriate for any limbo targets.
    for (const auto &targetAndChange : remoteEvent.target_changes()) {
      const auto iter = _limboResolutionsByTarget.find(targetAndChange.first);
      if (iter == _limboResolutionsByTarget.end()) {
        // Not a limbo target; nothing to track here.
        continue;
      }

      const TargetChange &change = targetAndChange.second;
      LimboResolution &limboResolution = iter->second;

      // A limbo resolution looks up a single document, which can be added, modified, or removed,
      // but not a combination of those.
      HARD_ASSERT(change.added_documents().size() + change.modified_documents().size() +
                          change.removed_documents().size() <=
                      1,
                  "Limbo resolution for single document contains multiple changes.");

      if (change.added_documents().size() > 0) {
        limboResolution.document_received = true;
      } else if (change.modified_documents().size() > 0) {
        HARD_ASSERT(limboResolution.document_received,
                    "Received change for limbo target document without add.");
      } else if (change.removed_documents().size() > 0) {
        HARD_ASSERT(limboResolution.document_received,
                    "Received remove for limbo target document without add.");
        limboResolution.document_received = false;
      }
      // Otherwise this was probably just a CURRENT targetChange or similar.
    }

    MaybeDocumentMap changedDocs = [self.localStore applyRemoteEvent:remoteEvent];
    [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changedDocs remoteEvent:remoteEvent];
  }
  /**
   * Applies `onlineState` to every active view, forwards any snapshots this produces to the
   * callback, and then reports the online state change itself to the callback.
   *
   * Hard-asserts that no view reports limbo changes as a result.
   */
  - (void)applyChangedOnlineState:(OnlineState)onlineState {
    [self assertCallbackExistsForSelector:_cmd];

    std::vector<ViewSnapshot> snapshotsToEmit;
    for (const auto &queryAndView : _queryViewsByQuery) {
      View *view = queryAndView.second.view;
      ViewChange viewChange = view->ApplyOnlineStateChange(onlineState);
      HARD_ASSERT(viewChange.limbo_changes().empty(),
                  "OnlineState should not affect limbo documents.");

      if (!viewChange.snapshot().has_value()) {
        continue;
      }
      snapshotsToEmit.push_back(*std::move(viewChange).snapshot());
    }

    _callback->OnViewSnapshots(std::move(snapshotsToEmit));
    _callback->HandleOnlineStateChange(onlineState);
  }
  /**
   * Handles a rejected listen for `targetID`.
   *
   * For a regular query target, the query is released and cleaned up and the error is reported to
   * the callback (and logged as a warning when errorIsInteresting: says so). For a limbo
   * resolution target, the limbo bookkeeping is dropped and a synthetic remote event that deletes
   * the document is applied.
   *
   * Hard-asserts that a non-limbo target ID belongs to a known query view.
   */
  - (void)rejectListenWithTargetID:(const TargetId)targetID error:(NSError *)error {
    [self assertCallbackExistsForSelector:_cmd];

    const auto limboIter = _limboResolutionsByTarget.find(targetID);
    if (limboIter == _limboResolutionsByTarget.end()) {
      // A regular query listen was rejected.
      auto found = _queryViewsByTarget.find(targetID);
      HARD_ASSERT(found != _queryViewsByTarget.end(), "Unknown targetId: %s", targetID);
      FSTQueryView *queryView = found->second;
      const Query &query = queryView.query;

      [self.localStore releaseQuery:query];
      [self removeAndCleanupQuery:queryView];
      if ([self errorIsInteresting:error]) {
        LOG_WARN("Listen for query at %s failed: %s", query.path().CanonicalString(),
                 error.localizedDescription);
      }
      _callback->OnError(query, Status::FromNSError(error));
      return;
    }

    // A limbo resolution listen was rejected. Copy the key out before erasing the map entries.
    const DocumentKey limboKey = limboIter->second.key;

    // Since this query failed, we won't want to manually unlisten to it.
    // So go ahead and remove it from bookkeeping.
    _limboTargetsByKey.erase(limboKey);
    _limboResolutionsByTarget.erase(targetID);

    // TODO(dimond): Retry on transient errors?

    // It's a limbo doc. Create a synthetic event saying it was deleted. This is kind of a hack.
    // Ideally, we would have a method in the local store to purge a document. However, it would
    // be tricky to keep all of the local store's invariants with another method.
    NoDocument doc(limboKey, SnapshotVersion::None(), /* has_committed_mutations= */ false);
    DocumentKeySet limboDocuments = DocumentKeySet{limboKey};
    RemoteEvent event{SnapshotVersion::None(), /*target_changes=*/{}, /*target_mismatches=*/{},
                      /*document_updates=*/{{limboKey, doc}}, std::move(limboDocuments)};
    [self applyRemoteEvent:event];
  }
  /**
   * Handles an acknowledged write batch: runs the user callbacks for the batch, acknowledges the
   * batch in the local store, and emits any resulting snapshots.
   */
  - (void)applySuccessfulWriteWithResult:(const MutationBatchResult &)batchResult {
    [self assertCallbackExistsForSelector:_cmd];

    const BatchId batchID = batchResult.batch().batch_id();

    // The local store may or may not be able to apply the write result and raise events
    // immediately (depending on whether the watcher is caught up), so user callbacks are raised
    // first so that they consistently happen before listen events.
    [self processUserCallbacksForBatchID:batchID error:nil];
    [self triggerPendingWriteCallbacksWithBatchId:batchID];

    MaybeDocumentMap changedDocs = [self.localStore acknowledgeBatchWithResult:batchResult];
    [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changedDocs remoteEvent:absl::nullopt];
  }
  /**
   * Handles a rejected write batch: rejects the batch in the local store, logs a warning when the
   * error is interesting and documents were affected, runs the user callbacks for the batch with
   * `error`, and emits any resulting snapshots.
   */
  - (void)rejectFailedWriteWithBatchID:(BatchId)batchID error:(NSError *)error {
    [self assertCallbackExistsForSelector:_cmd];

    MaybeDocumentMap changedDocs = [self.localStore rejectBatchID:batchID];

    const bool shouldWarn = !changedDocs.empty() && [self errorIsInteresting:error];
    if (shouldWarn) {
      const DocumentKey &minKey = changedDocs.min()->first;
      LOG_WARN("Write at %s failed: %s", minKey.ToString(), error.localizedDescription);
    }

    // The local store may or may not be able to apply the write result and raise events
    // immediately (depending on whether the watcher is caught up), so user callbacks are raised
    // first so that they consistently happen before listen events.
    [self processUserCallbacksForBatchID:batchID error:error];
    [self triggerPendingWriteCallbacksWithBatchId:batchID];

    [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changedDocs remoteEvent:absl::nullopt];
  }
  /**
   * Invokes the current user's completion block for `batchID` with `error`, if one is stored, and
   * then removes it.
   */
  - (void)processUserCallbacksForBatchID:(BatchId)batchID error:(NSError *_Nullable)error {
    NSMutableDictionary<NSNumber *, FSTVoidErrorBlock> *blocksForUser =
        _mutationCompletionBlocks[_currentUser];

    // NOTE: Mutations restored from persistence won't have completion blocks, so it's okay for
    // the dictionary (or the completion looked up below) to be nil.
    if (!blocksForUser) {
      return;
    }

    NSNumber *batchKey = @(batchID);
    FSTVoidErrorBlock completion = blocksForUser[batchKey];
    if (!completion) {
      return;
    }

    completion(error);
    [blocksForUser removeObjectForKey:batchKey];
  }
  /**
   * Hard-asserts that a callback has been registered. `methodSelector` names the calling method in
   * the failure message.
   */
  - (void)assertCallbackExistsForSelector:(SEL)methodSelector {
    HARD_ASSERT(_callback, "Tried to call '%s' before callback was registered.",
                NSStringFromSelector(methodSelector));
  }
  /**
   * Removes `queryView` from both lookup maps and drops the limbo references held by its target.
   * Limbo documents that no longer have any reference get their limbo target removed as well.
   */
  - (void)removeAndCleanupQuery:(FSTQueryView *)queryView {
    _queryViewsByQuery.erase(queryView.query);
    _queryViewsByTarget.erase(queryView.targetID);

    // Capture the keys this target referenced before the references are dropped.
    DocumentKeySet referencedKeys = _limboDocumentRefs.ReferencedKeys(queryView.targetID);
    _limboDocumentRefs.RemoveReferences(queryView.targetID);

    for (const DocumentKey &limboKey : referencedKeys) {
      if (_limboDocumentRefs.ContainsKey(limboKey)) {
        // Another target still references this document.
        continue;
      }
      // We removed the last reference for this key.
      [self removeLimboTargetForKey:limboKey];
    }
  }
  /**
   * Recomputes every active view against `changes`, updates limbo tracking from the view changes,
   * passes the new snapshots to the registered callback, and notifies the local store of the
   * per-view document changes.
   *
   * When `maybeRemoteEvent` has a value, the target change it contains for a view's target (if
   * any) is applied to that view as well.
   */
  - (void)emitNewSnapshotsAndNotifyLocalStoreWithChanges:(const MaybeDocumentMap &)changes
                                             remoteEvent:(const absl::optional<RemoteEvent> &)
                                                             maybeRemoteEvent {
    std::vector<ViewSnapshot> snapshotsToEmit;
    std::vector<LocalViewChanges> localChangesPerView;

    for (const auto &queryAndView : _queryViewsByQuery) {
      FSTQueryView *queryView = queryAndView.second;
      const View &view = *queryView.view;

      ViewDocumentChanges docChanges = view.ComputeDocumentChanges(changes);
      if (docChanges.needs_refill()) {
        // The query has a limit and some docs were removed/updated, so the query is re-run
        // against the local store to make sure no good docs that had been past the limit are
        // lost.
        DocumentMap refreshed = [self.localStore executeQuery:queryView.query];
        docChanges = view.ComputeDocumentChanges(refreshed.underlying_map(), docChanges);
      }

      // Pick out the target change for this view's target, if the remote event has one.
      absl::optional<TargetChange> targetChange;
      if (maybeRemoteEvent.has_value()) {
        const RemoteEvent &remoteEvent = maybeRemoteEvent.value();
        auto changeIter = remoteEvent.target_changes().find(queryView.targetID);
        if (changeIter != remoteEvent.target_changes().end()) {
          targetChange = changeIter->second;
        }
      }

      ViewChange viewChange = queryView.view->ApplyChanges(docChanges, targetChange);
      [self updateTrackedLimboDocumentsWithChanges:viewChange.limbo_changes()
                                          targetID:queryView.targetID];

      if (!viewChange.snapshot().has_value()) {
        continue;
      }
      snapshotsToEmit.push_back(*viewChange.snapshot());
      localChangesPerView.push_back(
          LocalViewChanges::FromViewSnapshot(*viewChange.snapshot(), queryView.targetID));
    }

    _callback->OnViewSnapshots(std::move(snapshotsToEmit));
    [self.localStore notifyLocalViewChanges:localChangesPerView];
  }
  /**
   * Applies `limboChanges` to the limbo document references held on behalf of `targetID`. An added
   * change adds a reference and starts tracking the document; a removed change drops the reference
   * and, if that was the last reference to the document, removes its limbo target.
   */
  - (void)updateTrackedLimboDocumentsWithChanges:
              (const std::vector<LimboDocumentChange> &)limboChanges
                                        targetID:(TargetId)targetID {
    for (const LimboDocumentChange &change : limboChanges) {
      switch (change.type()) {
        case LimboDocumentChange::Type::Added: {
          _limboDocumentRefs.AddReference(change.key(), targetID);
          [self trackLimboChange:change];
          break;
        }

        case LimboDocumentChange::Type::Removed: {
          LOG_DEBUG("Document no longer in limbo: %s", change.key().ToString());
          _limboDocumentRefs.RemoveReference(change.key(), targetID);
          const bool stillReferenced = _limboDocumentRefs.ContainsKey(change.key());
          if (!stillReferenced) {
            // We removed the last reference for this key.
            [self removeLimboTargetForKey:change.key()];
          }
          break;
        }

        default:
          HARD_FAIL("Unknown limbo change type: %s", change.type());
      }
    }
  }
  /**
   * Starts a limbo resolution listen for the document named by `limboChange`, unless one is
   * already registered for that key.
   */
  - (void)trackLimboChange:(const LimboDocumentChange &)limboChange {
    const DocumentKey &key = limboChange.key();
    if (_limboTargetsByKey.find(key) != _limboTargetsByKey.end()) {
      // A limbo target for this document already exists.
      return;
    }

    LOG_DEBUG("New document in limbo: %s", key.ToString());

    TargetId limboTargetID = _targetIdGenerator.NextId();
    Query limboQuery(key.path());
    QueryData limboQueryData(std::move(limboQuery), limboTargetID, kIrrelevantSequenceNumber,
                             QueryPurpose::LimboResolution);

    // Record the resolution before the listen is started, then the key-to-target mapping.
    _limboResolutionsByTarget.emplace(limboTargetID, LimboResolution{key});
    _remoteStore->Listen(limboQueryData);
    _limboTargetsByKey[key] = limboTargetID;
  }
  /**
   * Stops the limbo resolution listen for `key` and removes its entries from both limbo maps. Does
   * nothing when no limbo target is registered for the key.
   */
  - (void)removeLimboTargetForKey:(const DocumentKey &)key {
    const auto targetIter = _limboTargetsByKey.find(key);
    if (targetIter == _limboTargetsByKey.end()) {
      // This target already got removed, because the query failed.
      return;
    }

    // Copy the target ID out before the entry holding it is erased.
    const TargetId limboTargetID = targetIter->second;
    _remoteStore->StopListening(limboTargetID);
    _limboTargetsByKey.erase(key);
    _limboResolutionsByTarget.erase(limboTargetID);
  }
  /**
   * Used for testing. Returns the current limbo document keys mapped to the target IDs of the
   * listens resolving them.
   */
  - (std::map<DocumentKey, TargetId>)currentLimboDocuments {
    // Returned by value, so the caller receives a defensive copy.
    return _limboTargetsByKey;
  }
  /**
   * Switches the sync engine to `user`. When the user actually changed, outstanding pending-writes
   * callbacks are cancelled, the local store is told about the new user, and any resulting
   * snapshots are emitted. The remote store is notified of the credential change in every case.
   */
  - (void)credentialDidChangeWithUser:(const firebase::firestore::auth::User &)user {
    const bool userChanged = (_currentUser != user);
    _currentUser = user;

    if (userChanged) {
      // Fail the callbacks that wait for pending writes requested by the previous user.
      [self failOutstandingPendingWritesAwaitingCallbacks:
                "'waitForPendingWrites' callback is cancelled due to a user change."];

      // Notify the local store and emit any events that result from swapping out the mutation
      // queue.
      MaybeDocumentMap changedDocs = [self.localStore userDidChange:user];
      [self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changedDocs remoteEvent:absl::nullopt];
    }

    // Notify the remote store so it can restart its streams.
    _remoteStore->HandleCredentialChange();
  }
  /**
   * Returns the document keys associated with `targetId`: the limbo document's key for a limbo
   * target that has received its document, otherwise the synced documents of the query view for
   * that target, or an empty set when no such query view exists.
   */
  - (DocumentKeySet)remoteKeysForTarget:(TargetId)targetId {
    const auto limboIter = _limboResolutionsByTarget.find(targetId);
    if (limboIter != _limboResolutionsByTarget.end() && limboIter->second.document_received) {
      return DocumentKeySet{limboIter->second.key};
    }

    auto found = _queryViewsByTarget.find(targetId);
    FSTQueryView *queryView = nil;
    if (found != _queryViewsByTarget.end()) {
      queryView = found->second;
    }
    if (!queryView) {
      return DocumentKeySet{};
    }
    return queryView.view->synced_documents();
  }
  /**
   * Decides if the error likely represents a developer mistake such as forgetting to create an
   * index or permission denied. Used to decide whether an error is worth automatically logging as
   * a warning.
   *
   * @param error The error to classify.
   * @return YES for a Firestore-domain error that is either permission-denied or a
   *     failed-precondition whose description contains "requires an index"; NO otherwise.
   */
  - (BOOL)errorIsInteresting:(NSError *)error {
    // Compare the domain by value: `==` on NSString pointers only matches when both sides happen
    // to be the very same object, which is not guaranteed for an error's domain string.
    if (![error.domain isEqualToString:FIRFirestoreErrorDomain]) {
      return NO;
    }

    if (error.code == FIRFirestoreErrorCodePermissionDenied) {
      return YES;
    }

    return error.code == FIRFirestoreErrorCodeFailedPrecondition &&
           [error.localizedDescription containsString:@"requires an index"];
  }
  /**
   * Returns whether a failed transaction should be retried: the error code is Aborted or
   * FailedPrecondition, or Datastore does not consider the error permanent.
   */
  - (BOOL)isRetryableTransactionError:(const Status &)error {
    // In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
    // non-matching document versions with ABORTED. These errors should be retried.
    const Error code = error.code();
    if (code == Error::Aborted || code == Error::FailedPrecondition) {
      return true;
    }
    return !Datastore::IsPermanentError(error);
  }
  601. @end
  602. NS_ASSUME_NONNULL_END