FSTLocalStore.mm 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Source/Local/FSTLocalStore.h"
  17. #include <memory>
  18. #include <set>
  19. #include <unordered_map>
  20. #include <utility>
  21. #include <vector>
  22. #import "FIRTimestamp.h"
  23. #import "Firestore/Source/Local/FSTPersistence.h"
  24. #include "Firestore/core/include/firebase/firestore/timestamp.h"
  25. #include "Firestore/core/src/firebase/firestore/auth/user.h"
  26. #include "Firestore/core/src/firebase/firestore/core/target_id_generator.h"
  27. #include "Firestore/core/src/firebase/firestore/immutable/sorted_set.h"
  28. #include "Firestore/core/src/firebase/firestore/local/local_documents_view.h"
  29. #include "Firestore/core/src/firebase/firestore/local/local_view_changes.h"
  30. #include "Firestore/core/src/firebase/firestore/local/local_write_result.h"
  31. #include "Firestore/core/src/firebase/firestore/local/mutation_queue.h"
  32. #include "Firestore/core/src/firebase/firestore/local/query_cache.h"
  33. #include "Firestore/core/src/firebase/firestore/local/query_data.h"
  34. #include "Firestore/core/src/firebase/firestore/local/reference_set.h"
  35. #include "Firestore/core/src/firebase/firestore/local/remote_document_cache.h"
  36. #include "Firestore/core/src/firebase/firestore/model/document_key_set.h"
  37. #include "Firestore/core/src/firebase/firestore/model/document_map.h"
  38. #include "Firestore/core/src/firebase/firestore/model/mutation_batch.h"
  39. #include "Firestore/core/src/firebase/firestore/model/mutation_batch_result.h"
  40. #include "Firestore/core/src/firebase/firestore/model/patch_mutation.h"
  41. #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
  42. #include "Firestore/core/src/firebase/firestore/nanopb/nanopb_util.h"
  43. #include "Firestore/core/src/firebase/firestore/remote/remote_event.h"
  44. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  45. #include "Firestore/core/src/firebase/firestore/util/log.h"
  46. #include "Firestore/core/src/firebase/firestore/util/to_string.h"
  47. #include "absl/memory/memory.h"
  48. #include "absl/types/optional.h"
  49. namespace util = firebase::firestore::util;
  50. using firebase::Timestamp;
  51. using firebase::firestore::auth::User;
  52. using firebase::firestore::core::Query;
  53. using firebase::firestore::core::TargetIdGenerator;
  54. using firebase::firestore::local::LocalDocumentsView;
  55. using firebase::firestore::local::LocalViewChanges;
  56. using firebase::firestore::local::LocalWriteResult;
  57. using firebase::firestore::local::LruResults;
  58. using firebase::firestore::local::MutationQueue;
  59. using firebase::firestore::local::QueryCache;
  60. using firebase::firestore::local::QueryData;
  61. using firebase::firestore::local::QueryPurpose;
  62. using firebase::firestore::local::ReferenceSet;
  63. using firebase::firestore::local::RemoteDocumentCache;
  64. using firebase::firestore::model::BatchId;
  65. using firebase::firestore::model::DocumentKey;
  66. using firebase::firestore::model::DocumentKeySet;
  67. using firebase::firestore::model::DocumentMap;
  68. using firebase::firestore::model::DocumentVersionMap;
  69. using firebase::firestore::model::FieldMask;
  70. using firebase::firestore::model::FieldPath;
  71. using firebase::firestore::model::ListenSequenceNumber;
  72. using firebase::firestore::model::MaybeDocument;
  73. using firebase::firestore::model::MaybeDocumentMap;
  74. using firebase::firestore::model::Mutation;
  75. using firebase::firestore::model::MutationBatch;
  76. using firebase::firestore::model::MutationBatchResult;
  77. using firebase::firestore::model::ObjectValue;
  78. using firebase::firestore::model::OptionalMaybeDocumentMap;
  79. using firebase::firestore::model::PatchMutation;
  80. using firebase::firestore::model::Precondition;
  81. using firebase::firestore::model::SnapshotVersion;
  82. using firebase::firestore::model::TargetId;
  83. using firebase::firestore::nanopb::ByteString;
  84. using firebase::firestore::remote::RemoteEvent;
  85. using firebase::firestore::remote::TargetChange;
  86. NS_ASSUME_NONNULL_BEGIN
  87. /**
  88. * The maximum time to leave a resume token buffered without writing it out. This value is
  89. * arbitrary: it's long enough to avoid several writes (possibly indefinitely if updates come more
  90. * frequently than this) but short enough that restarting after crashing will still have a pretty
  91. * recent resume token.
  92. */
  93. static const int64_t kResumeTokenMaxAgeSeconds = 5 * 60; // 5 minutes
  94. @interface FSTLocalStore ()
  95. /** Manages our in-memory or durable persistence. */
  96. @property(nonatomic, strong, readonly) id<FSTPersistence> persistence;
  97. /** Maps a query to the data about that query. */
  98. @property(nonatomic) QueryCache *queryCache;
  99. @end
  100. @implementation FSTLocalStore {
  101. /** Used to generate targetIDs for queries tracked locally. */
  102. TargetIdGenerator _targetIDGenerator;
  103. /** The set of all cached remote documents. */
  104. RemoteDocumentCache *_remoteDocumentCache;
  105. QueryCache *_queryCache;
  106. /** The set of all mutations that have been sent but not yet been applied to the backend. */
  107. MutationQueue *_mutationQueue;
  108. /** The "local" view of all documents (layering mutationQueue on top of remoteDocumentCache). */
  109. std::unique_ptr<LocalDocumentsView> _localDocuments;
  110. /** The set of document references maintained by any local views. */
  111. ReferenceSet _localViewReferences;
  112. /** Maps a targetID to data about its query. */
  113. std::unordered_map<TargetId, QueryData> _targetIDs;
  114. }
  115. - (instancetype)initWithPersistence:(id<FSTPersistence>)persistence
  116. initialUser:(const User &)initialUser {
  117. if (self = [super init]) {
  118. _persistence = persistence;
  119. _mutationQueue = [persistence mutationQueueForUser:initialUser];
  120. _remoteDocumentCache = [persistence remoteDocumentCache];
  121. _queryCache = [persistence queryCache];
  122. _localDocuments = absl::make_unique<LocalDocumentsView>(_remoteDocumentCache, _mutationQueue,
  123. [_persistence indexManager]);
  124. [_persistence.referenceDelegate addInMemoryPins:&_localViewReferences];
  125. _targetIDGenerator = TargetIdGenerator::QueryCacheTargetIdGenerator(0);
  126. }
  127. return self;
  128. }
  129. - (void)start {
  130. [self startMutationQueue];
  131. TargetId targetID = _queryCache->highest_target_id();
  132. _targetIDGenerator = TargetIdGenerator::QueryCacheTargetIdGenerator(targetID);
  133. }
  134. - (void)startMutationQueue {
  135. self.persistence.run("Start MutationQueue", [&]() { _mutationQueue->Start(); });
  136. }
  137. - (MaybeDocumentMap)userDidChange:(const User &)user {
  138. // Swap out the mutation queue, grabbing the pending mutation batches before and after.
  139. std::vector<MutationBatch> oldBatches =
  140. self.persistence.run("OldBatches", [&] { return _mutationQueue->AllMutationBatches(); });
  141. // The old one has a reference to the mutation queue, so nil it out first.
  142. _localDocuments.reset();
  143. _mutationQueue = [self.persistence mutationQueueForUser:user];
  144. [self startMutationQueue];
  145. return self.persistence.run("NewBatches", [&] {
  146. std::vector<MutationBatch> newBatches = _mutationQueue->AllMutationBatches();
  147. // Recreate our LocalDocumentsView using the new MutationQueue.
  148. _localDocuments = absl::make_unique<LocalDocumentsView>(_remoteDocumentCache, _mutationQueue,
  149. [_persistence indexManager]);
  150. // Union the old/new changed keys.
  151. DocumentKeySet changedKeys;
  152. for (const std::vector<MutationBatch> *batches : {&oldBatches, &newBatches}) {
  153. for (const MutationBatch &batch : *batches) {
  154. for (const Mutation &mutation : batch.mutations()) {
  155. changedKeys = changedKeys.insert(mutation.key());
  156. }
  157. }
  158. }
  159. // Return the set of all (potentially) changed documents as the result of the user change.
  160. return _localDocuments->GetDocuments(changedKeys);
  161. });
  162. }
  163. - (LocalWriteResult)locallyWriteMutations:(std::vector<Mutation> &&)mutations {
  164. Timestamp localWriteTime = Timestamp::Now();
  165. DocumentKeySet keys;
  166. for (const Mutation &mutation : mutations) {
  167. keys = keys.insert(mutation.key());
  168. }
  169. return self.persistence.run("Locally write mutations", [&] {
  170. // Load and apply all existing mutations. This lets us compute the current base state for
  171. // all non-idempotent transforms before applying any additional user-provided writes.
  172. MaybeDocumentMap existingDocuments = _localDocuments->GetDocuments(keys);
  173. // For non-idempotent mutations (such as `FieldValue.increment()`), we record the base
  174. // state in a separate patch mutation. This is later used to guarantee consistent values
  175. // and prevents flicker even if the backend sends us an update that already includes our
  176. // transform.
  177. std::vector<Mutation> baseMutations;
  178. for (const Mutation &mutation : mutations) {
  179. absl::optional<MaybeDocument> base_document = existingDocuments.get(mutation.key());
  180. absl::optional<ObjectValue> base_value = mutation.ExtractBaseValue(base_document);
  181. if (base_value) {
  182. // NOTE: The base state should only be applied if there's some existing document to
  183. // override, so use a Precondition of exists=true
  184. baseMutations.push_back(PatchMutation(
  185. mutation.key(), *base_value, base_value->ToFieldMask(), Precondition::Exists(true)));
  186. }
  187. }
  188. MutationBatch batch = _mutationQueue->AddMutationBatch(localWriteTime, std::move(baseMutations),
  189. std::move(mutations));
  190. MaybeDocumentMap changedDocuments = batch.ApplyToLocalDocumentSet(existingDocuments);
  191. return LocalWriteResult{batch.batch_id(), std::move(changedDocuments)};
  192. });
  193. }
  194. - (MaybeDocumentMap)acknowledgeBatchWithResult:(const MutationBatchResult &)batchResult {
  195. return self.persistence.run("Acknowledge batch", [&] {
  196. const MutationBatch &batch = batchResult.batch();
  197. _mutationQueue->AcknowledgeBatch(batch, batchResult.stream_token());
  198. [self applyBatchResult:batchResult];
  199. _mutationQueue->PerformConsistencyCheck();
  200. return _localDocuments->GetDocuments(batch.keys());
  201. });
  202. }
  203. - (MaybeDocumentMap)rejectBatchID:(BatchId)batchID {
  204. return self.persistence.run("Reject batch", [&] {
  205. absl::optional<MutationBatch> toReject = _mutationQueue->LookupMutationBatch(batchID);
  206. HARD_ASSERT(toReject.has_value(), "Attempt to reject nonexistent batch!");
  207. _mutationQueue->RemoveMutationBatch(*toReject);
  208. _mutationQueue->PerformConsistencyCheck();
  209. return _localDocuments->GetDocuments(toReject->keys());
  210. });
  211. }
  212. - (ByteString)lastStreamToken {
  213. return _mutationQueue->GetLastStreamToken();
  214. }
  215. - (void)setLastStreamToken:(const ByteString &)streamToken {
  216. self.persistence.run("Set stream token",
  217. [&]() { _mutationQueue->SetLastStreamToken(streamToken); });
  218. }
  219. - (const SnapshotVersion &)lastRemoteSnapshotVersion {
  220. return self.queryCache->GetLastRemoteSnapshotVersion();
  221. }
  222. - (MaybeDocumentMap)applyRemoteEvent:(const RemoteEvent &)remoteEvent {
  223. return self.persistence.run("Apply remote event", [&] {
  224. // TODO(gsoltis): move the sequence number into the reference delegate.
  225. ListenSequenceNumber sequenceNumber = self.persistence.currentSequenceNumber;
  226. DocumentKeySet authoritativeUpdates;
  227. for (const auto &entry : remoteEvent.target_changes()) {
  228. TargetId targetID = entry.first;
  229. const TargetChange &change = entry.second;
  230. // Do not ref/unref unassigned targetIDs - it may lead to leaks.
  231. auto found = _targetIDs.find(targetID);
  232. if (found == _targetIDs.end()) {
  233. continue;
  234. }
  235. QueryData queryData = found->second;
  236. // When a global snapshot contains updates (either add or modify) we can completely trust
  237. // these updates as authoritative and blindly apply them to our cache (as a defensive measure
  238. // to promote self-healing in the unfortunate case that our cache is ever somehow corrupted /
  239. // out-of-sync).
  240. //
  241. // If the document is only updated while removing it from a target then watch isn't obligated
  242. // to send the absolute latest version: it can send the first version that caused the document
  243. // not to match.
  244. for (const DocumentKey &key : change.added_documents()) {
  245. authoritativeUpdates = authoritativeUpdates.insert(key);
  246. }
  247. for (const DocumentKey &key : change.modified_documents()) {
  248. authoritativeUpdates = authoritativeUpdates.insert(key);
  249. }
  250. _queryCache->RemoveMatchingKeys(change.removed_documents(), targetID);
  251. _queryCache->AddMatchingKeys(change.added_documents(), targetID);
  252. // Update the resume token if the change includes one. Don't clear any preexisting value.
  253. // Bump the sequence number as well, so that documents being removed now are ordered later
  254. // than documents that were previously removed from this target.
  255. const ByteString &resumeToken = change.resume_token();
  256. if (!resumeToken.empty()) {
  257. QueryData oldQueryData = queryData;
  258. queryData = queryData.Copy(remoteEvent.snapshot_version(), resumeToken, sequenceNumber);
  259. _targetIDs[targetID] = queryData;
  260. if ([self shouldPersistQueryData:queryData oldQueryData:oldQueryData change:change]) {
  261. _queryCache->UpdateTarget(queryData);
  262. }
  263. }
  264. }
  265. OptionalMaybeDocumentMap changedDocs;
  266. const DocumentKeySet &limboDocuments = remoteEvent.limbo_document_changes();
  267. DocumentKeySet updatedKeys;
  268. for (const auto &kv : remoteEvent.document_updates()) {
  269. updatedKeys = updatedKeys.insert(kv.first);
  270. }
  271. // Each loop iteration only affects its "own" doc, so it's safe to get all the remote
  272. // documents in advance in a single call.
  273. OptionalMaybeDocumentMap existingDocs = _remoteDocumentCache->GetAll(updatedKeys);
  274. for (const auto &kv : remoteEvent.document_updates()) {
  275. const DocumentKey &key = kv.first;
  276. const MaybeDocument &doc = kv.second;
  277. absl::optional<MaybeDocument> existingDoc;
  278. auto foundExisting = existingDocs.get(key);
  279. if (foundExisting) {
  280. existingDoc = *foundExisting;
  281. }
  282. if (!existingDoc ||
  283. (authoritativeUpdates.contains(doc.key()) && !existingDoc->has_pending_writes()) ||
  284. doc.version() >= existingDoc->version()) {
  285. // If a document update isn't authoritative, make sure we don't apply an old document
  286. // version to the remote cache.
  287. _remoteDocumentCache->Add(doc);
  288. changedDocs = changedDocs.insert(key, doc);
  289. } else if (doc.type() == MaybeDocument::Type::NoDocument &&
  290. doc.version() == SnapshotVersion::None()) {
  291. // NoDocuments with SnapshotVersion.MIN are used in manufactured events (e.g. in the case
  292. // of a limbo document resolution failing). We remove these documents from cache since we
  293. // lost access.
  294. _remoteDocumentCache->Remove(key);
  295. changedDocs = changedDocs.insert(key, doc);
  296. } else {
  297. LOG_DEBUG("FSTLocalStore Ignoring outdated watch update for %s. "
  298. "Current version: %s Watch version: %s",
  299. key.ToString(), existingDoc->version().ToString(), doc.version().ToString());
  300. }
  301. // If this was a limbo resolution, make sure we mark when it was accessed.
  302. if (limboDocuments.contains(key)) {
  303. [self.persistence.referenceDelegate limboDocumentUpdated:key];
  304. }
  305. }
  306. // HACK: The only reason we allow omitting snapshot version is so we can synthesize remote
  307. // events when we get permission denied errors while trying to resolve the state of a locally
  308. // cached document that is in limbo.
  309. const SnapshotVersion &lastRemoteVersion = _queryCache->GetLastRemoteSnapshotVersion();
  310. const SnapshotVersion &remoteVersion = remoteEvent.snapshot_version();
  311. if (remoteVersion != SnapshotVersion::None()) {
  312. HARD_ASSERT(remoteVersion >= lastRemoteVersion,
  313. "Watch stream reverted to previous snapshot?? (%s < %s)",
  314. remoteVersion.ToString(), lastRemoteVersion.ToString());
  315. _queryCache->SetLastRemoteSnapshotVersion(remoteVersion);
  316. }
  317. return _localDocuments->GetLocalViewOfDocuments(changedDocs);
  318. });
  319. }
  320. /**
  321. * Returns YES if the newQueryData should be persisted during an update of an active target.
  322. * QueryData should always be persisted when a target is being released and should not call this
  323. * function.
  324. *
  325. * While the target is active, QueryData updates can be omitted when nothing about the target has
  326. * changed except metadata like the resume token or snapshot version. Occasionally it's worth the
  327. * extra write to prevent these values from getting too stale after a crash, but this doesn't have
  328. * to be too frequent.
  329. */
  330. - (BOOL)shouldPersistQueryData:(const QueryData &)newQueryData
  331. oldQueryData:(const QueryData &)oldQueryData
  332. change:(const TargetChange &)change {
  333. // Avoid clearing any existing value
  334. if (newQueryData.resume_token().empty()) return NO;
  335. // Any resume token is interesting if there isn't one already.
  336. if (oldQueryData.resume_token().empty()) return YES;
  337. // Don't allow resume token changes to be buffered indefinitely. This allows us to be reasonably
  338. // up-to-date after a crash and avoids needing to loop over all active queries on shutdown.
  339. // Especially in the browser we may not get time to do anything interesting while the current
  340. // tab is closing.
  341. int64_t newSeconds = newQueryData.snapshot_version().timestamp().seconds();
  342. int64_t oldSeconds = oldQueryData.snapshot_version().timestamp().seconds();
  343. int64_t timeDelta = newSeconds - oldSeconds;
  344. if (timeDelta >= kResumeTokenMaxAgeSeconds) return YES;
  345. // Otherwise if the only thing that has changed about a target is its resume token then it's not
  346. // worth persisting. Note that the RemoteStore keeps an in-memory view of the currently active
  347. // targets which includes the current resume token, so stream failure or user changes will still
  348. // use an up-to-date resume token regardless of what we do here.
  349. size_t changes = change.added_documents().size() + change.modified_documents().size() +
  350. change.removed_documents().size();
  351. return changes > 0;
  352. }
  353. - (void)notifyLocalViewChanges:(const std::vector<LocalViewChanges> &)viewChanges {
  354. self.persistence.run("NotifyLocalViewChanges", [&]() {
  355. for (const LocalViewChanges &viewChange : viewChanges) {
  356. for (const DocumentKey &key : viewChange.removed_keys()) {
  357. [self->_persistence.referenceDelegate removeReference:key];
  358. }
  359. _localViewReferences.AddReferences(viewChange.added_keys(), viewChange.target_id());
  360. _localViewReferences.AddReferences(viewChange.removed_keys(), viewChange.target_id());
  361. }
  362. });
  363. }
  364. - (absl::optional<MutationBatch>)nextMutationBatchAfterBatchID:(BatchId)batchID {
  365. return self.persistence.run("NextMutationBatchAfterBatchID", [&] {
  366. return _mutationQueue->NextMutationBatchAfterBatchId(batchID);
  367. });
  368. }
  369. - (absl::optional<MaybeDocument>)readDocument:(const DocumentKey &)key {
  370. return self.persistence.run("ReadDocument", [&] { return _localDocuments->GetDocument(key); });
  371. }
  372. - (model::BatchId)getHighestUnacknowledgedBatchId {
  373. return self.persistence.run("getHighestUnacknowledgedBatchId",
  374. [&]() { return _mutationQueue->GetHighestUnacknowledgedBatchId(); });
  375. }
  376. - (QueryData)allocateQuery:(Query)query {
  377. QueryData queryData = self.persistence.run("Allocate query", [&] {
  378. absl::optional<QueryData> cached = _queryCache->GetTarget(query);
  379. // TODO(mcg): freshen last accessed date if cached exists?
  380. if (!cached) {
  381. cached = QueryData(query, _targetIDGenerator.NextId(), self.persistence.currentSequenceNumber,
  382. QueryPurpose::Listen);
  383. _queryCache->AddTarget(*cached);
  384. }
  385. return *cached;
  386. });
  387. // Sanity check to ensure that even when resuming a query it's not currently active.
  388. TargetId targetID = queryData.target_id();
  389. HARD_ASSERT(_targetIDs.find(targetID) == _targetIDs.end(),
  390. "Tried to allocate an already allocated query: %s", query.ToString());
  391. _targetIDs[targetID] = queryData;
  392. return queryData;
  393. }
  394. - (void)releaseQuery:(const Query &)query {
  395. self.persistence.run("Release query", [&]() {
  396. absl::optional<QueryData> queryData = _queryCache->GetTarget(query);
  397. HARD_ASSERT(queryData, "Tried to release nonexistent query: %s", query.ToString());
  398. TargetId targetID = queryData->target_id();
  399. auto found = _targetIDs.find(targetID);
  400. if (found != _targetIDs.end()) {
  401. const QueryData &cachedQueryData = found->second;
  402. if (cachedQueryData.snapshot_version() > queryData->snapshot_version()) {
  403. // If we've been avoiding persisting the resumeToken (see shouldPersistQueryData for
  404. // conditions and rationale) we need to persist the token now because there will no
  405. // longer be an in-memory version to fall back on.
  406. queryData = cachedQueryData;
  407. _queryCache->UpdateTarget(*queryData);
  408. }
  409. }
  410. // References for documents sent via Watch are automatically removed when we delete a
  411. // query's target data from the reference delegate. Since this does not remove references
  412. // for locally mutated documents, we have to remove the target associations for these
  413. // documents manually.
  414. DocumentKeySet removed = _localViewReferences.RemoveReferences(targetID);
  415. for (const DocumentKey &key : removed) {
  416. [self.persistence.referenceDelegate removeReference:key];
  417. }
  418. _targetIDs.erase(targetID);
  419. [self.persistence.referenceDelegate removeTarget:*queryData];
  420. });
  421. }
  422. - (DocumentMap)executeQuery:(const Query &)query {
  423. return self.persistence.run("ExecuteQuery",
  424. [&] { return _localDocuments->GetDocumentsMatchingQuery(query); });
  425. }
  426. - (DocumentKeySet)remoteDocumentKeysForTarget:(TargetId)targetID {
  427. return self.persistence.run("RemoteDocumentKeysForTarget",
  428. [&] { return _queryCache->GetMatchingKeys(targetID); });
  429. }
  430. - (void)applyBatchResult:(const MutationBatchResult &)batchResult {
  431. const MutationBatch &batch = batchResult.batch();
  432. DocumentKeySet docKeys = batch.keys();
  433. const DocumentVersionMap &versions = batchResult.doc_versions();
  434. for (const DocumentKey &docKey : docKeys) {
  435. absl::optional<MaybeDocument> remoteDoc = _remoteDocumentCache->Get(docKey);
  436. absl::optional<MaybeDocument> doc = remoteDoc;
  437. auto ackVersionIter = versions.find(docKey);
  438. HARD_ASSERT(ackVersionIter != versions.end(),
  439. "docVersions should contain every doc in the write.");
  440. const SnapshotVersion &ackVersion = ackVersionIter->second;
  441. if (!doc || doc->version() < ackVersion) {
  442. doc = batch.ApplyToRemoteDocument(doc, docKey, batchResult);
  443. if (!doc) {
  444. HARD_ASSERT(!remoteDoc, "Mutation batch %s applied to document %s resulted in nullopt.",
  445. batch.ToString(), util::ToString(remoteDoc));
  446. } else {
  447. _remoteDocumentCache->Add(*doc);
  448. }
  449. }
  450. }
  451. _mutationQueue->RemoveMutationBatch(batch);
  452. }
  453. - (LruResults)collectGarbage:(local::LruGarbageCollector *)garbageCollector {
  454. return self.persistence.run("Collect garbage",
  455. [&] { return garbageCollector->Collect(_targetIDs); });
  456. }
  457. @end
  458. NS_ASSUME_NONNULL_END