FSTLevelDBMutationQueue.mm 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Source/Local/FSTLevelDBMutationQueue.h"
  17. #include <memory>
  18. #include <set>
  19. #include <string>
  20. #include <utility>
  21. #include <vector>
  22. #import "Firestore/Protos/objc/firestore/local/Mutation.pbobjc.h"
  23. #import "Firestore/Source/Core/FSTQuery.h"
  24. #import "Firestore/Source/Local/FSTLevelDB.h"
  25. #import "Firestore/Source/Local/FSTLevelDBKey.h"
  26. #import "Firestore/Source/Local/FSTLocalSerializer.h"
  27. #import "Firestore/Source/Model/FSTMutation.h"
  28. #import "Firestore/Source/Model/FSTMutationBatch.h"
  29. #include "Firestore/core/src/firebase/firestore/auth/user.h"
  30. #include "Firestore/core/src/firebase/firestore/local/leveldb_key.h"
  31. #include "Firestore/core/src/firebase/firestore/local/leveldb_transaction.h"
  32. #include "Firestore/core/src/firebase/firestore/local/leveldb_util.h"
  33. #include "Firestore/core/src/firebase/firestore/model/resource_path.h"
  34. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  35. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  36. #include "Firestore/core/src/firebase/firestore/util/string_util.h"
  37. #include "absl/strings/match.h"
  38. #include "leveldb/db.h"
  39. #include "leveldb/write_batch.h"
  40. NS_ASSUME_NONNULL_BEGIN
  41. namespace util = firebase::firestore::util;
  42. using firebase::firestore::local::LevelDbTransaction;
  43. using Firestore::StringView;
  44. using firebase::firestore::auth::User;
  45. using firebase::firestore::local::DescribeKey;
  46. using firebase::firestore::model::DocumentKey;
  47. using firebase::firestore::model::DocumentKeySet;
  48. using firebase::firestore::model::ResourcePath;
  49. using leveldb::DB;
  50. using leveldb::Iterator;
  51. using leveldb::ReadOptions;
  52. using leveldb::Slice;
  53. using leveldb::Status;
  54. using leveldb::WriteBatch;
  55. using leveldb::WriteOptions;
  56. @interface FSTLevelDBMutationQueue ()
  57. - (instancetype)initWithUserID:(std::string)userID
  58. db:(FSTLevelDB *)db
  59. serializer:(FSTLocalSerializer *)serializer NS_DESIGNATED_INITIALIZER;
  60. /**
  61. * Next value to use when assigning sequential IDs to each mutation batch.
  62. *
  63. * NOTE: There can only be one FSTLevelDBMutationQueue for a given db at a time, hence it is safe
  64. * to track nextBatchID as an instance-level property. Should we ever relax this constraint we'll
  65. * need to revisit this.
  66. */
  67. @property(nonatomic, assign) FSTBatchID nextBatchID;
  68. /** A write-through cache copy of the metadata describing the current queue. */
  69. @property(nonatomic, strong, nullable) FSTPBMutationQueue *metadata;
  70. @property(nonatomic, strong, readonly) FSTLocalSerializer *serializer;
  71. @end
  72. @implementation FSTLevelDBMutationQueue {
  73. FSTLevelDB *_db;
  74. /** The normalized userID (e.g. nil UID => @"" userID) used in our LevelDB keys. */
  75. std::string _userID;
  76. }
  77. + (instancetype)mutationQueueWithUser:(const User &)user
  78. db:(FSTLevelDB *)db
  79. serializer:(FSTLocalSerializer *)serializer {
  80. std::string userID = user.is_authenticated() ? user.uid() : "";
  81. return [[FSTLevelDBMutationQueue alloc] initWithUserID:std::move(userID)
  82. db:db
  83. serializer:serializer];
  84. }
  85. - (instancetype)initWithUserID:(std::string)userID
  86. db:(FSTLevelDB *)db
  87. serializer:(FSTLocalSerializer *)serializer {
  88. if (self = [super init]) {
  89. _userID = std::move(userID);
  90. _db = db;
  91. _serializer = serializer;
  92. }
  93. return self;
  94. }
  95. - (void)start {
  96. FSTBatchID nextBatchID = [FSTLevelDBMutationQueue loadNextBatchIDFromDB:_db.ptr];
  97. // On restart, nextBatchId may end up lower than lastAcknowledgedBatchId since it's computed from
  98. // the queue contents, and there may be no mutations in the queue. In this case, we need to reset
  99. // lastAcknowledgedBatchId (which is safe since the queue must be empty).
  100. std::string key = [self keyForCurrentMutationQueue];
  101. FSTPBMutationQueue *metadata = [self metadataForKey:key];
  102. if (!metadata) {
  103. metadata = [FSTPBMutationQueue message];
  104. // proto3's default value for lastAcknowledgedBatchId is zero, but that would consider the first
  105. // entry in the queue to be acknowledged without that acknowledgement actually happening.
  106. metadata.lastAcknowledgedBatchId = kFSTBatchIDUnknown;
  107. } else {
  108. FSTBatchID lastAcked = metadata.lastAcknowledgedBatchId;
  109. if (lastAcked >= nextBatchID) {
  110. HARD_ASSERT([self isEmpty], "Reset nextBatchID is only possible when the queue is empty");
  111. lastAcked = kFSTBatchIDUnknown;
  112. metadata.lastAcknowledgedBatchId = lastAcked;
  113. _db.currentTransaction->Put([self keyForCurrentMutationQueue], metadata);
  114. }
  115. }
  116. self.nextBatchID = nextBatchID;
  117. self.metadata = metadata;
  118. }
  119. + (FSTBatchID)loadNextBatchIDFromDB:(DB *)db {
  120. // TODO(gsoltis): implement Prev() and SeekToLast() on LevelDbTransaction::Iterator, then port
  121. // this to a transaction.
  122. std::unique_ptr<Iterator> it(db->NewIterator(LevelDbTransaction::DefaultReadOptions()));
  123. auto tableKey = [FSTLevelDBMutationKey keyPrefix];
  124. FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init];
  125. FSTBatchID maxBatchID = kFSTBatchIDUnknown;
  126. BOOL moreUserIDs = NO;
  127. std::string nextUserID;
  128. it->Seek(tableKey);
  129. if (it->Valid() && [rowKey decodeKey:it->key()]) {
  130. moreUserIDs = YES;
  131. nextUserID = rowKey.userID;
  132. }
  133. // This loop assumes that nextUserId contains the next username at the start of the iteration.
  134. while (moreUserIDs) {
  135. // Compute the first key after the last mutation for nextUserID.
  136. auto userEnd = [FSTLevelDBMutationKey keyPrefixWithUserID:nextUserID];
  137. userEnd = util::PrefixSuccessor(userEnd);
  138. // Seek to that key with the intent of finding the boundary between nextUserID's mutations
  139. // and the one after that (if any).
  140. it->Seek(userEnd);
  141. // At this point there are three possible cases to handle differently. Each case must prepare
  142. // the next iteration (by assigning to nextUserID or setting moreUserIDs = NO) and seek the
  143. // iterator to the last row in the current user's mutation sequence.
  144. if (!it->Valid()) {
  145. // The iterator is past the last row altogether (there are no additional userIDs and now
  146. // rows in any table after mutations). The last row will have the highest batchID.
  147. moreUserIDs = NO;
  148. it->SeekToLast();
  149. } else if ([rowKey decodeKey:it->key()]) {
  150. // The iterator is valid and the key decoded successfully so the next user was just decoded.
  151. nextUserID = rowKey.userID;
  152. it->Prev();
  153. } else {
  154. // The iterator is past the end of the mutations table but there are other rows.
  155. moreUserIDs = NO;
  156. it->Prev();
  157. }
  158. // In all the cases above there was at least one row for the current user and each case has
  159. // set things up such that iterator points to it.
  160. if (![rowKey decodeKey:it->key()]) {
  161. HARD_FAIL("There should have been a key previous to %s", userEnd);
  162. }
  163. if (rowKey.batchID > maxBatchID) {
  164. maxBatchID = rowKey.batchID;
  165. }
  166. }
  167. return maxBatchID + 1;
  168. }
  169. - (BOOL)isEmpty {
  170. std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:_userID];
  171. auto it = _db.currentTransaction->NewIterator();
  172. it->Seek(userKey);
  173. BOOL empty = YES;
  174. if (it->Valid() && absl::StartsWith(it->key(), userKey)) {
  175. empty = NO;
  176. }
  177. return empty;
  178. }
  179. - (FSTBatchID)highestAcknowledgedBatchID {
  180. return self.metadata.lastAcknowledgedBatchId;
  181. }
  182. - (void)acknowledgeBatch:(FSTMutationBatch *)batch streamToken:(nullable NSData *)streamToken {
  183. FSTBatchID batchID = batch.batchID;
  184. HARD_ASSERT(batchID > self.highestAcknowledgedBatchID,
  185. "Mutation batchIDs must be acknowledged in order");
  186. FSTPBMutationQueue *metadata = self.metadata;
  187. metadata.lastAcknowledgedBatchId = batchID;
  188. metadata.lastStreamToken = streamToken;
  189. _db.currentTransaction->Put([self keyForCurrentMutationQueue], metadata);
  190. }
  191. - (nullable NSData *)lastStreamToken {
  192. return self.metadata.lastStreamToken;
  193. }
  194. - (void)setLastStreamToken:(nullable NSData *)streamToken {
  195. FSTPBMutationQueue *metadata = self.metadata;
  196. metadata.lastStreamToken = streamToken;
  197. _db.currentTransaction->Put([self keyForCurrentMutationQueue], metadata);
  198. }
  199. - (std::string)keyForCurrentMutationQueue {
  200. return [FSTLevelDBMutationQueueKey keyWithUserID:_userID];
  201. }
  202. - (nullable FSTPBMutationQueue *)metadataForKey:(const std::string &)key {
  203. std::string value;
  204. Status status = _db.currentTransaction->Get(key, &value);
  205. if (status.ok()) {
  206. return [self parsedMetadata:value];
  207. } else if (status.IsNotFound()) {
  208. return nil;
  209. } else {
  210. HARD_FAIL("metadataForKey: failed loading key %s with status: %s", key, status.ToString());
  211. }
  212. }
  213. - (FSTMutationBatch *)addMutationBatchWithWriteTime:(FIRTimestamp *)localWriteTime
  214. mutations:(NSArray<FSTMutation *> *)mutations {
  215. FSTBatchID batchID = self.nextBatchID;
  216. self.nextBatchID += 1;
  217. FSTMutationBatch *batch = [[FSTMutationBatch alloc] initWithBatchID:batchID
  218. localWriteTime:localWriteTime
  219. mutations:mutations];
  220. std::string key = [self mutationKeyForBatch:batch];
  221. _db.currentTransaction->Put(key, [self.serializer encodedMutationBatch:batch]);
  222. // Store an empty value in the index which is equivalent to serializing a GPBEmpty message. In the
  223. // future if we wanted to store some other kind of value here, we can parse these empty values as
  224. // with some other protocol buffer (and the parser will see all default values).
  225. std::string emptyBuffer;
  226. for (FSTMutation *mutation in mutations) {
  227. key = [FSTLevelDBDocumentMutationKey keyWithUserID:_userID
  228. documentKey:mutation.key
  229. batchID:batchID];
  230. _db.currentTransaction->Put(key, emptyBuffer);
  231. }
  232. return batch;
  233. }
  234. - (nullable FSTMutationBatch *)lookupMutationBatch:(FSTBatchID)batchID {
  235. std::string key = [self mutationKeyForBatchID:batchID];
  236. std::string value;
  237. Status status = _db.currentTransaction->Get(key, &value);
  238. if (!status.ok()) {
  239. if (status.IsNotFound()) {
  240. return nil;
  241. }
  242. HARD_FAIL("Lookup mutation batch (%s, %s) failed with status: %s", _userID, batchID,
  243. status.ToString());
  244. }
  245. return [self decodedMutationBatch:value];
  246. }
  247. - (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(FSTBatchID)batchID {
  248. // All batches with batchID <= self.metadata.lastAcknowledgedBatchId have been acknowledged so
  249. // the first unacknowledged batch after batchID will have a batchID larger than both of these
  250. // values.
  251. FSTBatchID nextBatchID = MAX(batchID, self.metadata.lastAcknowledgedBatchId) + 1;
  252. std::string key = [self mutationKeyForBatchID:nextBatchID];
  253. auto it = _db.currentTransaction->NewIterator();
  254. it->Seek(key);
  255. FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init];
  256. if (!it->Valid() || ![rowKey decodeKey:it->key()]) {
  257. // Past the last row in the DB or out of the mutations table
  258. return nil;
  259. }
  260. if (rowKey.userID != _userID) {
  261. // Jumped past the last mutation for this user
  262. return nil;
  263. }
  264. HARD_ASSERT(rowKey.batchID >= nextBatchID, "Should have found mutation after %s", nextBatchID);
  265. return [self decodedMutationBatch:it->value()];
  266. }
  267. - (NSArray<FSTMutationBatch *> *)allMutationBatchesThroughBatchID:(FSTBatchID)batchID {
  268. std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:_userID];
  269. auto it = _db.currentTransaction->NewIterator();
  270. it->Seek(userKey);
  271. NSMutableArray *result = [NSMutableArray array];
  272. FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init];
  273. for (; it->Valid() && [rowKey decodeKey:it->key()]; it->Next()) {
  274. if (rowKey.userID != _userID) {
  275. // End of this user's mutations
  276. break;
  277. } else if (rowKey.batchID > batchID) {
  278. // This mutation is past what we're looking for
  279. break;
  280. }
  281. [result addObject:[self decodedMutationBatch:it->value()]];
  282. }
  283. return result;
  284. }
  285. - (NSArray<FSTMutationBatch *> *)allMutationBatchesAffectingDocumentKey:
  286. (const DocumentKey &)documentKey {
  287. // Scan the document-mutation index starting with a prefix starting with the given documentKey.
  288. std::string indexPrefix =
  289. [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:_userID resourcePath:documentKey.path()];
  290. auto indexIterator = _db.currentTransaction->NewIterator();
  291. indexIterator->Seek(indexPrefix);
  292. // Simultaneously scan the mutation queue. This works because each (key, batchID) pair is unique
  293. // and ordered, so when scanning a table prefixed by exactly key, all the batchIDs encountered
  294. // will be unique and in order.
  295. std::string mutationsPrefix = [FSTLevelDBMutationKey keyPrefixWithUserID:_userID];
  296. auto mutationIterator = _db.currentTransaction->NewIterator();
  297. NSMutableArray *result = [NSMutableArray array];
  298. FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init];
  299. for (; indexIterator->Valid(); indexIterator->Next()) {
  300. // Only consider rows matching exactly the specific key of interest. Index rows have this
  301. // form (with markers in brackets):
  302. //
  303. // <User>user <Path>collection <Path>doc <BatchId>2 <Terminator>
  304. // <User>user <Path>collection <Path>doc <BatchId>3 <Terminator>
  305. // <User>user <Path>collection <Path>doc <Path>sub <Path>doc <BatchId>3 <Terminator>
  306. //
  307. // Note that Path markers sort after BatchId markers so this means that when searching for
  308. // collection/doc, all the entries for it will be contiguous in the table, allowing a break
  309. // after any mismatch.
  310. if (!absl::StartsWith(indexIterator->key(), indexPrefix) ||
  311. ![rowKey decodeKey:indexIterator->key()] ||
  312. DocumentKey{rowKey.documentKey} != documentKey) {
  313. break;
  314. }
  315. // Each row is a unique combination of key and batchID, so this foreign key reference can
  316. // only occur once.
  317. std::string mutationKey = [FSTLevelDBMutationKey keyWithUserID:_userID batchID:rowKey.batchID];
  318. mutationIterator->Seek(mutationKey);
  319. if (!mutationIterator->Valid() || mutationIterator->key() != mutationKey) {
  320. HARD_FAIL(
  321. "Dangling document-mutation reference found: "
  322. "%s points to %s; seeking there found %s",
  323. DescribeKey(indexIterator), DescribeKey(mutationKey), DescribeKey(mutationIterator));
  324. }
  325. [result addObject:[self decodedMutationBatch:mutationIterator->value()]];
  326. }
  327. return result;
  328. }
  329. - (NSArray<FSTMutationBatch *> *)allMutationBatchesAffectingDocumentKeys:
  330. (const DocumentKeySet &)documentKeys {
  331. // Take a pass through the document keys and collect the set of unique mutation batchIDs that
  332. // affect them all. Some batches can affect more than one key.
  333. std::set<FSTBatchID> batchIDs;
  334. auto indexIterator = _db.currentTransaction->NewIterator();
  335. FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init];
  336. for (const DocumentKey &documentKey : documentKeys) {
  337. std::string indexPrefix =
  338. [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:_userID resourcePath:documentKey.path()];
  339. for (indexIterator->Seek(indexPrefix); indexIterator->Valid(); indexIterator->Next()) {
  340. // Only consider rows matching exactly the specific key of interest. Index rows have this
  341. // form (with markers in brackets):
  342. //
  343. // <User>user <Path>collection <Path>doc <BatchId>2 <Terminator>
  344. // <User>user <Path>collection <Path>doc <BatchId>3 <Terminator>
  345. // <User>user <Path>collection <Path>doc <Path>sub <Path>doc <BatchId>3 <Terminator>
  346. //
  347. // Note that Path markers sort after BatchId markers so this means that when searching for
  348. // collection/doc, all the entries for it will be contiguous in the table, allowing a break
  349. // after any mismatch.
  350. if (!absl::StartsWith(indexIterator->key(), indexPrefix) ||
  351. ![rowKey decodeKey:indexIterator->key()] ||
  352. DocumentKey{rowKey.documentKey} != documentKey) {
  353. break;
  354. }
  355. batchIDs.insert(rowKey.batchID);
  356. }
  357. }
  358. return [self allMutationBatchesWithBatchIDs:batchIDs];
  359. }
  360. - (NSArray<FSTMutationBatch *> *)allMutationBatchesAffectingQuery:(FSTQuery *)query {
  361. HARD_ASSERT(![query isDocumentQuery], "Document queries shouldn't go down this path");
  362. const ResourcePath &queryPath = query.path;
  363. size_t immediateChildrenPathLength = queryPath.size() + 1;
  364. // TODO(mcg): Actually implement a single-collection query
  365. //
  366. // This is actually executing an ancestor query, traversing the whole subtree below the
  367. // collection which can be horrifically inefficient for some structures. The right way to
  368. // solve this is to implement the full value index, but that's not in the cards in the near
  369. // future so this is the best we can do for the moment.
  370. //
  371. // Since we don't yet index the actual properties in the mutations, our current approach is to
  372. // just return all mutation batches that affect documents in the collection being queried.
  373. //
  374. // Unlike allMutationBatchesAffectingDocumentKey, this iteration will scan the document-mutation
  375. // index for more than a single document so the associated batchIDs will be neither necessarily
  376. // unique nor in order. This means an efficient simultaneous scan isn't possible.
  377. std::string indexPrefix =
  378. [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:_userID resourcePath:queryPath];
  379. auto indexIterator = _db.currentTransaction->NewIterator();
  380. indexIterator->Seek(indexPrefix);
  381. FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init];
  382. // Collect up unique batchIDs encountered during a scan of the index. Use a set<FSTBatchID> to
  383. // accumulate batch IDs so they can be traversed in order in a scan of the main table.
  384. //
  385. // This method is faster than performing lookups of the keys with _db->Get and keeping a hash of
  386. // batchIDs that have already been looked up. The performance difference is minor for small
  387. // numbers of keys but > 30% faster for larger numbers of keys.
  388. std::set<FSTBatchID> uniqueBatchIDs;
  389. for (; indexIterator->Valid(); indexIterator->Next()) {
  390. if (!absl::StartsWith(indexIterator->key(), indexPrefix) ||
  391. ![rowKey decodeKey:indexIterator->key()]) {
  392. break;
  393. }
  394. // Rows with document keys more than one segment longer than the query path can't be matches.
  395. // For example, a query on 'rooms' can't match the document /rooms/abc/messages/xyx.
  396. // TODO(mcg): we'll need a different scanner when we implement ancestor queries.
  397. if (rowKey.documentKey.path.size() != immediateChildrenPathLength) {
  398. continue;
  399. }
  400. uniqueBatchIDs.insert(rowKey.batchID);
  401. }
  402. return [self allMutationBatchesWithBatchIDs:uniqueBatchIDs];
  403. }
  404. /**
  405. * Constructs an array of matching batches, sorted by batchID to ensure that multiple mutations
  406. * affecting the same document key are applied in order.
  407. */
  408. - (NSArray<FSTMutationBatch *> *)allMutationBatchesWithBatchIDs:
  409. (const std::set<FSTBatchID> &)batchIDs {
  410. NSMutableArray *result = [NSMutableArray array];
  411. // Given an ordered set of unique batchIDs perform a skipping scan over the main table to find
  412. // the mutation batches.
  413. auto mutationIterator = _db.currentTransaction->NewIterator();
  414. for (FSTBatchID batchID : batchIDs) {
  415. std::string mutationKey = [FSTLevelDBMutationKey keyWithUserID:_userID batchID:batchID];
  416. mutationIterator->Seek(mutationKey);
  417. if (!mutationIterator->Valid() || mutationIterator->key() != mutationKey) {
  418. HARD_FAIL(
  419. "Dangling document-mutation reference found: "
  420. "Missing batch %s; seeking there found %s",
  421. DescribeKey(mutationKey), DescribeKey(mutationIterator));
  422. }
  423. [result addObject:[self decodedMutationBatch:mutationIterator->value()]];
  424. }
  425. return result;
  426. }
  427. - (NSArray<FSTMutationBatch *> *)allMutationBatches {
  428. std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:_userID];
  429. auto it = _db.currentTransaction->NewIterator();
  430. it->Seek(userKey);
  431. NSMutableArray *result = [NSMutableArray array];
  432. for (; it->Valid() && absl::StartsWith(it->key(), userKey); it->Next()) {
  433. [result addObject:[self decodedMutationBatch:it->value()]];
  434. }
  435. return result;
  436. }
  437. - (void)removeMutationBatches:(NSArray<FSTMutationBatch *> *)batches {
  438. auto checkIterator = _db.currentTransaction->NewIterator();
  439. for (FSTMutationBatch *batch in batches) {
  440. FSTBatchID batchID = batch.batchID;
  441. std::string key = [FSTLevelDBMutationKey keyWithUserID:_userID batchID:batchID];
  442. // As a sanity check, verify that the mutation batch exists before deleting it.
  443. checkIterator->Seek(key);
  444. HARD_ASSERT(checkIterator->Valid(), "Mutation batch %s did not exist", DescribeKey(key));
  445. HARD_ASSERT(key == checkIterator->key(), "Mutation batch %s not found; found %s",
  446. DescribeKey(key), DescribeKey(checkIterator));
  447. _db.currentTransaction->Delete(key);
  448. for (FSTMutation *mutation in batch.mutations) {
  449. key = [FSTLevelDBDocumentMutationKey keyWithUserID:_userID
  450. documentKey:mutation.key
  451. batchID:batchID];
  452. _db.currentTransaction->Delete(key);
  453. [_db.referenceDelegate removeMutationReference:mutation.key];
  454. }
  455. }
  456. }
  457. - (void)performConsistencyCheck {
  458. if (![self isEmpty]) {
  459. return;
  460. }
  461. // Verify that there are no entries in the document-mutation index if the queue is empty.
  462. std::string indexPrefix = [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:_userID];
  463. auto indexIterator = _db.currentTransaction->NewIterator();
  464. indexIterator->Seek(indexPrefix);
  465. std::vector<std::string> danglingMutationReferences;
  466. for (; indexIterator->Valid(); indexIterator->Next()) {
  467. // Only consider rows matching this index prefix for the current user.
  468. if (!absl::StartsWith(indexIterator->key(), indexPrefix)) {
  469. break;
  470. }
  471. danglingMutationReferences.push_back(DescribeKey(indexIterator));
  472. }
  473. HARD_ASSERT(danglingMutationReferences.empty(),
  474. "Document leak -- detected dangling mutation references when queue "
  475. "is empty. Dangling keys: %s",
  476. util::ToString(danglingMutationReferences));
  477. }
  478. - (std::string)mutationKeyForBatch:(FSTMutationBatch *)batch {
  479. return [FSTLevelDBMutationKey keyWithUserID:_userID batchID:batch.batchID];
  480. }
  481. - (std::string)mutationKeyForBatchID:(FSTBatchID)batchID {
  482. return [FSTLevelDBMutationKey keyWithUserID:_userID batchID:batchID];
  483. }
  484. /** Parses the MutationQueue metadata from the given LevelDB row contents. */
  485. - (FSTPBMutationQueue *)parsedMetadata:(Slice)slice {
  486. NSData *data =
  487. [[NSData alloc] initWithBytesNoCopy:(void *)slice.data() length:slice.size() freeWhenDone:NO];
  488. NSError *error;
  489. FSTPBMutationQueue *proto = [FSTPBMutationQueue parseFromData:data error:&error];
  490. if (!proto) {
  491. HARD_FAIL("FSTPBMutationQueue failed to parse: %s", error);
  492. }
  493. return proto;
  494. }
  495. - (FSTMutationBatch *)decodedMutationBatch:(absl::string_view)encoded {
  496. NSData *data = [[NSData alloc] initWithBytesNoCopy:(void *)encoded.data()
  497. length:encoded.size()
  498. freeWhenDone:NO];
  499. NSError *error;
  500. FSTPBWriteBatch *proto = [FSTPBWriteBatch parseFromData:data error:&error];
  501. if (!proto) {
  502. HARD_FAIL("FSTPBMutationBatch failed to parse: %s", error);
  503. }
  504. return [self.serializer decodedMutationBatch:proto];
  505. }
  506. @end
  507. NS_ASSUME_NONNULL_END