FSTLevelDBMutationQueue.mm 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Source/Local/FSTLevelDBMutationQueue.h"
  17. #include <leveldb/db.h>
  18. #include <leveldb/write_batch.h>
  19. #include <set>
  20. #include <string>
  21. #import "Firestore/Protos/objc/firestore/local/Mutation.pbobjc.h"
  22. #import "Firestore/Source/Auth/FSTUser.h"
  23. #import "Firestore/Source/Core/FSTQuery.h"
  24. #import "Firestore/Source/Local/FSTLevelDB.h"
  25. #import "Firestore/Source/Local/FSTLevelDBKey.h"
  26. #import "Firestore/Source/Local/FSTLocalSerializer.h"
  27. #import "Firestore/Source/Local/FSTWriteGroup.h"
  28. #import "Firestore/Source/Model/FSTDocumentKey.h"
  29. #import "Firestore/Source/Model/FSTMutation.h"
  30. #import "Firestore/Source/Model/FSTMutationBatch.h"
  31. #import "Firestore/Source/Model/FSTPath.h"
  32. #import "Firestore/Source/Util/FSTAssert.h"
  33. #include "Firestore/Port/ordered_code.h"
  34. #include "Firestore/Port/string_util.h"
  35. NS_ASSUME_NONNULL_BEGIN
  36. using Firestore::OrderedCode;
  37. using Firestore::StringView;
  38. using leveldb::DB;
  39. using leveldb::Iterator;
  40. using leveldb::ReadOptions;
  41. using leveldb::Slice;
  42. using leveldb::Status;
  43. using leveldb::WriteBatch;
  44. using leveldb::WriteOptions;
  45. @interface FSTLevelDBMutationQueue ()
  46. - (instancetype)initWithUserID:(NSString *)userID
  47. db:(std::shared_ptr<DB>)db
  48. serializer:(FSTLocalSerializer *)serializer NS_DESIGNATED_INITIALIZER;
  49. /** The normalized userID (e.g. nil UID => @"" userID) used in our LevelDB keys. */
  50. @property(nonatomic, strong, readonly) NSString *userID;
  51. /**
  52. * Next value to use when assigning sequential IDs to each mutation batch.
  53. *
  54. * NOTE: There can only be one FSTLevelDBMutationQueue for a given db at a time, hence it is safe
  55. * to track nextBatchID as an instance-level property. Should we ever relax this constraint we'll
  56. * need to revisit this.
  57. */
  58. @property(nonatomic, assign) FSTBatchID nextBatchID;
  59. /** A write-through cache copy of the metadata describing the current queue. */
  60. @property(nonatomic, strong, nullable) FSTPBMutationQueue *metadata;
  61. @property(nonatomic, strong, readonly) FSTLocalSerializer *serializer;
  62. @end
  63. /**
  64. * Returns a standard set of read options.
  65. *
  66. * For now this is paranoid, but perhaps disable that in production builds.
  67. */
  68. static ReadOptions StandardReadOptions() {
  69. ReadOptions options;
  70. options.verify_checksums = true;
  71. return options;
  72. }
  73. @implementation FSTLevelDBMutationQueue {
  74. // The DB pointer is shared with all cooperating LevelDB-related objects.
  75. std::shared_ptr<DB> _db;
  76. }
  77. + (instancetype)mutationQueueWithUser:(FSTUser *)user
  78. db:(std::shared_ptr<DB>)db
  79. serializer:(FSTLocalSerializer *)serializer {
  80. FSTAssert(![user.UID isEqual:@""], @"UserID must not be an empty string.");
  81. NSString *userID = user.isUnauthenticated ? @"" : user.UID;
  82. return [[FSTLevelDBMutationQueue alloc] initWithUserID:userID db:db serializer:serializer];
  83. }
  84. - (instancetype)initWithUserID:(NSString *)userID
  85. db:(std::shared_ptr<DB>)db
  86. serializer:(FSTLocalSerializer *)serializer {
  87. if (self = [super init]) {
  88. _userID = userID;
  89. _db = db;
  90. _serializer = serializer;
  91. }
  92. return self;
  93. }
  94. - (void)startWithGroup:(FSTWriteGroup *)group {
  95. FSTBatchID nextBatchID = [FSTLevelDBMutationQueue loadNextBatchIDFromDB:_db];
  96. // On restart, nextBatchId may end up lower than lastAcknowledgedBatchId since it's computed from
  97. // the queue contents, and there may be no mutations in the queue. In this case, we need to reset
  98. // lastAcknowledgedBatchId (which is safe since the queue must be empty).
  99. std::string key = [self keyForCurrentMutationQueue];
  100. FSTPBMutationQueue *metadata = [self metadataForKey:key];
  101. if (!metadata) {
  102. metadata = [FSTPBMutationQueue message];
  103. // proto3's default value for lastAcknowledgedBatchId is zero, but that would consider the first
  104. // entry in the queue to be acknowledged without that acknowledgement actually happening.
  105. metadata.lastAcknowledgedBatchId = kFSTBatchIDUnknown;
  106. } else {
  107. FSTBatchID lastAcked = metadata.lastAcknowledgedBatchId;
  108. if (lastAcked >= nextBatchID) {
  109. FSTAssert([self isEmpty], @"Reset nextBatchID is only possible when the queue is empty");
  110. lastAcked = kFSTBatchIDUnknown;
  111. metadata.lastAcknowledgedBatchId = lastAcked;
  112. [group setMessage:metadata forKey:[self keyForCurrentMutationQueue]];
  113. }
  114. }
  115. self.nextBatchID = nextBatchID;
  116. self.metadata = metadata;
  117. }
  118. - (void)shutdown {
  119. _db.reset();
  120. }
  121. + (FSTBatchID)loadNextBatchIDFromDB:(std::shared_ptr<DB>)db {
  122. std::unique_ptr<Iterator> it(db->NewIterator(StandardReadOptions()));
  123. auto tableKey = [FSTLevelDBMutationKey keyPrefix];
  124. FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init];
  125. FSTBatchID maxBatchID = kFSTBatchIDUnknown;
  126. BOOL moreUserIDs = NO;
  127. std::string nextUserID;
  128. it->Seek(tableKey);
  129. if (it->Valid() && [rowKey decodeKey:it->key()]) {
  130. moreUserIDs = YES;
  131. nextUserID = rowKey.userID;
  132. }
  133. // This loop assumes that nextUserId contains the next username at the start of the iteration.
  134. while (moreUserIDs) {
  135. // Compute the first key after the last mutation for nextUserID.
  136. auto userEnd = [FSTLevelDBMutationKey keyPrefixWithUserID:nextUserID];
  137. userEnd = Firestore::PrefixSuccessor(userEnd);
  138. // Seek to that key with the intent of finding the boundary between nextUserID's mutations
  139. // and the one after that (if any).
  140. it->Seek(userEnd);
  141. // At this point there are three possible cases to handle differently. Each case must prepare
  142. // the next iteration (by assigning to nextUserID or setting moreUserIDs = NO) and seek the
  143. // iterator to the last row in the current user's mutation sequence.
  144. if (!it->Valid()) {
  145. // The iterator is past the last row altogether (there are no additional userIDs and now
  146. // rows in any table after mutations). The last row will have the highest batchID.
  147. moreUserIDs = NO;
  148. it->SeekToLast();
  149. } else if ([rowKey decodeKey:it->key()]) {
  150. // The iterator is valid and the key decoded successfully so the next user was just decoded.
  151. nextUserID = rowKey.userID;
  152. it->Prev();
  153. } else {
  154. // The iterator is past the end of the mutations table but there are other rows.
  155. moreUserIDs = NO;
  156. it->Prev();
  157. }
  158. // In all the cases above there was at least one row for the current user and each case has
  159. // set things up such that iterator points to it.
  160. if (![rowKey decodeKey:it->key()]) {
  161. FSTFail(@"There should have been a key previous to %s", userEnd.c_str());
  162. }
  163. if (rowKey.batchID > maxBatchID) {
  164. maxBatchID = rowKey.batchID;
  165. }
  166. }
  167. return maxBatchID + 1;
  168. }
  169. - (BOOL)isEmpty {
  170. std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID];
  171. std::unique_ptr<Iterator> it(_db->NewIterator(StandardReadOptions()));
  172. it->Seek(userKey);
  173. BOOL empty = YES;
  174. if (it->Valid() && it->key().starts_with(userKey)) {
  175. empty = NO;
  176. }
  177. Status status = it->status();
  178. if (!status.ok()) {
  179. FSTFail(@"isEmpty failed with status: %s", status.ToString().c_str());
  180. }
  181. return empty;
  182. }
  183. - (FSTBatchID)highestAcknowledgedBatchID {
  184. return self.metadata.lastAcknowledgedBatchId;
  185. }
  186. - (void)acknowledgeBatch:(FSTMutationBatch *)batch
  187. streamToken:(nullable NSData *)streamToken
  188. group:(FSTWriteGroup *)group {
  189. FSTBatchID batchID = batch.batchID;
  190. FSTAssert(batchID > self.highestAcknowledgedBatchID,
  191. @"Mutation batchIDs must be acknowledged in order");
  192. FSTPBMutationQueue *metadata = self.metadata;
  193. metadata.lastAcknowledgedBatchId = batchID;
  194. metadata.lastStreamToken = streamToken;
  195. [group setMessage:metadata forKey:[self keyForCurrentMutationQueue]];
  196. }
  197. - (nullable NSData *)lastStreamToken {
  198. return self.metadata.lastStreamToken;
  199. }
  200. - (void)setLastStreamToken:(nullable NSData *)streamToken group:(FSTWriteGroup *)group {
  201. FSTPBMutationQueue *metadata = self.metadata;
  202. metadata.lastStreamToken = streamToken;
  203. [group setMessage:metadata forKey:[self keyForCurrentMutationQueue]];
  204. }
  205. - (std::string)keyForCurrentMutationQueue {
  206. return [FSTLevelDBMutationQueueKey keyWithUserID:self.userID];
  207. }
  208. - (nullable FSTPBMutationQueue *)metadataForKey:(const std::string &)key {
  209. std::string value;
  210. Status status = _db->Get(StandardReadOptions(), key, &value);
  211. if (status.ok()) {
  212. return [self parsedMetadata:value];
  213. } else if (status.IsNotFound()) {
  214. return nil;
  215. } else {
  216. FSTFail(@"metadataForKey: failed loading key %s with status: %s", key.c_str(),
  217. status.ToString().c_str());
  218. }
  219. }
  220. - (FSTMutationBatch *)addMutationBatchWithWriteTime:(FSTTimestamp *)localWriteTime
  221. mutations:(NSArray<FSTMutation *> *)mutations
  222. group:(FSTWriteGroup *)group {
  223. FSTBatchID batchID = self.nextBatchID;
  224. self.nextBatchID += 1;
  225. FSTMutationBatch *batch = [[FSTMutationBatch alloc] initWithBatchID:batchID
  226. localWriteTime:localWriteTime
  227. mutations:mutations];
  228. std::string key = [self mutationKeyForBatch:batch];
  229. [group setMessage:[self.serializer encodedMutationBatch:batch] forKey:key];
  230. NSString *userID = self.userID;
  231. // Store an empty value in the index which is equivalent to serializing a GPBEmpty message. In the
  232. // future if we wanted to store some other kind of value here, we can parse these empty values as
  233. // with some other protocol buffer (and the parser will see all default values).
  234. std::string emptyBuffer;
  235. for (FSTMutation *mutation in mutations) {
  236. key = [FSTLevelDBDocumentMutationKey keyWithUserID:userID
  237. documentKey:mutation.key
  238. batchID:batchID];
  239. [group setData:emptyBuffer forKey:key];
  240. }
  241. return batch;
  242. }
  243. - (nullable FSTMutationBatch *)lookupMutationBatch:(FSTBatchID)batchID {
  244. std::string key = [self mutationKeyForBatchID:batchID];
  245. std::string value;
  246. Status status = _db->Get(StandardReadOptions(), key, &value);
  247. if (!status.ok()) {
  248. if (status.IsNotFound()) {
  249. return nil;
  250. }
  251. FSTFail(@"Lookup mutation batch (%@, %d) failed with status: %s", self.userID, batchID,
  252. status.ToString().c_str());
  253. }
  254. return [self decodedMutationBatch:value];
  255. }
  256. - (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(FSTBatchID)batchID {
  257. std::string key = [self mutationKeyForBatchID:batchID + 1];
  258. std::unique_ptr<Iterator> it(_db->NewIterator(StandardReadOptions()));
  259. it->Seek(key);
  260. Status status = it->status();
  261. if (!status.ok()) {
  262. FSTFail(@"Seek to mutation batch (%@, %d) failed with status: %s", self.userID, batchID,
  263. status.ToString().c_str());
  264. }
  265. FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init];
  266. if (!it->Valid() || ![rowKey decodeKey:it->key()]) {
  267. // Past the last row in the DB or out of the mutations table
  268. return nil;
  269. }
  270. if (rowKey.userID != [self.userID UTF8String]) {
  271. // Jumped past the last mutation for this user
  272. return nil;
  273. }
  274. FSTAssert(rowKey.batchID > batchID, @"Should have found mutation after %d", batchID);
  275. return [self decodedMutationBatch:it->value()];
  276. }
  277. - (NSArray<FSTMutationBatch *> *)allMutationBatchesThroughBatchID:(FSTBatchID)batchID {
  278. std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID];
  279. const char *userID = [self.userID UTF8String];
  280. std::unique_ptr<Iterator> it(_db->NewIterator(StandardReadOptions()));
  281. it->Seek(userKey);
  282. NSMutableArray *result = [NSMutableArray array];
  283. FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init];
  284. for (; it->Valid() && [rowKey decodeKey:it->key()]; it->Next()) {
  285. if (rowKey.userID != userID) {
  286. // End of this user's mutations
  287. break;
  288. } else if (rowKey.batchID > batchID) {
  289. // This mutation is past what we're looking for
  290. break;
  291. }
  292. [result addObject:[self decodedMutationBatch:it->value()]];
  293. }
  294. Status status = it->status();
  295. if (!status.ok()) {
  296. FSTFail(@"Find all mutations through mutation batch (%@, %d) failed with status: %s",
  297. self.userID, batchID, status.ToString().c_str());
  298. }
  299. return result;
  300. }
  301. - (NSArray<FSTMutationBatch *> *)allMutationBatchesAffectingDocumentKey:
  302. (FSTDocumentKey *)documentKey {
  303. NSString *userID = self.userID;
  304. // Scan the document-mutation index starting with a prefix starting with the given documentKey.
  305. std::string indexPrefix =
  306. [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:documentKey.path];
  307. std::unique_ptr<Iterator> indexIterator(_db->NewIterator(StandardReadOptions()));
  308. indexIterator->Seek(indexPrefix);
  309. // Simultaneously scan the mutation queue. This works because each (key, batchID) pair is unique
  310. // and ordered, so when scanning a table prefixed by exactly key, all the batchIDs encountered
  311. // will be unique and in order.
  312. std::string mutationsPrefix = [FSTLevelDBMutationKey keyPrefixWithUserID:userID];
  313. std::unique_ptr<Iterator> mutationIterator(_db->NewIterator(StandardReadOptions()));
  314. NSMutableArray *result = [NSMutableArray array];
  315. FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init];
  316. for (; indexIterator->Valid(); indexIterator->Next()) {
  317. Slice indexKey = indexIterator->key();
  318. // Only consider rows matching exactly the specific key of interest. Note that because we order
  319. // by path first, and we order terminators before path separators, we'll encounter all the
  320. // index rows for documentKey contiguously. In particular, all the rows for documentKey will
  321. // occur before any rows for documents nested in a subcollection beneath documentKey so we can
  322. // stop as soon as we hit any such row.
  323. if (!indexKey.starts_with(indexPrefix) || ![rowKey decodeKey:indexKey] ||
  324. ![rowKey.documentKey isEqualToKey:documentKey]) {
  325. break;
  326. }
  327. // Each row is a unique combination of key and batchID, so this foreign key reference can
  328. // only occur once.
  329. std::string mutationKey = [FSTLevelDBMutationKey keyWithUserID:userID batchID:rowKey.batchID];
  330. mutationIterator->Seek(mutationKey);
  331. if (!mutationIterator->Valid() || mutationIterator->key() != mutationKey) {
  332. NSString *foundKeyDescription = @"the end of the table";
  333. if (mutationIterator->Valid()) {
  334. foundKeyDescription = [FSTLevelDBKey descriptionForKey:mutationIterator->key()];
  335. }
  336. FSTFail(
  337. @"Dangling document-mutation reference found: "
  338. @"%@ points to %@; seeking there found %@",
  339. [FSTLevelDBKey descriptionForKey:indexKey], [FSTLevelDBKey descriptionForKey:mutationKey],
  340. foundKeyDescription);
  341. }
  342. [result addObject:[self decodedMutationBatch:mutationIterator->value()]];
  343. }
  344. return result;
  345. }
  346. - (NSArray<FSTMutationBatch *> *)allMutationBatchesAffectingQuery:(FSTQuery *)query {
  347. FSTAssert(![query isDocumentQuery], @"Document queries shouldn't go down this path");
  348. NSString *userID = self.userID;
  349. FSTResourcePath *queryPath = query.path;
  350. int immediateChildrenPathLength = queryPath.length + 1;
  351. // TODO(mcg): Actually implement a single-collection query
  352. //
  353. // This is actually executing an ancestor query, traversing the whole subtree below the
  354. // collection which can be horrifically inefficient for some structures. The right way to
  355. // solve this is to implement the full value index, but that's not in the cards in the near
  356. // future so this is the best we can do for the moment.
  357. //
  358. // Since we don't yet index the actual properties in the mutations, our current approach is to
  359. // just return all mutation batches that affect documents in the collection being queried.
  360. //
  361. // Unlike allMutationBatchesAffectingDocumentKey, this iteration will scan the document-mutation
  362. // index for more than a single document so the associated batchIDs will be neither necessarily
  363. // unique nor in order. This means an efficient simultaneous scan isn't possible.
  364. std::string indexPrefix =
  365. [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:queryPath];
  366. std::unique_ptr<Iterator> indexIterator(_db->NewIterator(StandardReadOptions()));
  367. indexIterator->Seek(indexPrefix);
  368. NSMutableArray *result = [NSMutableArray array];
  369. FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init];
  370. // Collect up unique batchIDs encountered during a scan of the index. Use a set<FSTBatchID> to
  371. // accumulate batch IDs so they can be traversed in order in a scan of the main table.
  372. //
  373. // This method is faster than performing lookups of the keys with _db->Get and keeping a hash of
  374. // batchIDs that have already been looked up. The performance difference is minor for small
  375. // numbers of keys but > 30% faster for larger numbers of keys.
  376. std::set<FSTBatchID> uniqueBatchIds;
  377. for (; indexIterator->Valid(); indexIterator->Next()) {
  378. Slice indexKey = indexIterator->key();
  379. if (!indexKey.starts_with(indexPrefix) || ![rowKey decodeKey:indexKey]) {
  380. break;
  381. }
  382. // Rows with document keys more than one segment longer than the query path can't be matches.
  383. // For example, a query on 'rooms' can't match the document /rooms/abc/messages/xyx.
  384. // TODO(mcg): we'll need a different scanner when we implement ancestor queries.
  385. if (rowKey.documentKey.path.length != immediateChildrenPathLength) {
  386. continue;
  387. }
  388. uniqueBatchIds.insert(rowKey.batchID);
  389. }
  390. // Given an ordered set of unique batchIDs perform a skipping scan over the main table to find
  391. // the mutation batches.
  392. std::unique_ptr<Iterator> mutationIterator(_db->NewIterator(StandardReadOptions()));
  393. for (FSTBatchID batchID : uniqueBatchIds) {
  394. std::string mutationKey = [FSTLevelDBMutationKey keyWithUserID:userID batchID:batchID];
  395. mutationIterator->Seek(mutationKey);
  396. if (!mutationIterator->Valid() || mutationIterator->key() != mutationKey) {
  397. NSString *foundKeyDescription = @"the end of the table";
  398. if (mutationIterator->Valid()) {
  399. foundKeyDescription = [FSTLevelDBKey descriptionForKey:mutationIterator->key()];
  400. }
  401. FSTFail(
  402. @"Dangling document-mutation reference found: "
  403. @"Missing batch %@; seeking there found %@",
  404. [FSTLevelDBKey descriptionForKey:mutationKey], foundKeyDescription);
  405. }
  406. [result addObject:[self decodedMutationBatch:mutationIterator->value()]];
  407. }
  408. return result;
  409. }
  410. - (NSArray<FSTMutationBatch *> *)allMutationBatches {
  411. std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID];
  412. std::unique_ptr<Iterator> it(_db->NewIterator(StandardReadOptions()));
  413. it->Seek(userKey);
  414. NSMutableArray *result = [NSMutableArray array];
  415. for (; it->Valid() && it->key().starts_with(userKey); it->Next()) {
  416. [result addObject:[self decodedMutationBatch:it->value()]];
  417. }
  418. Status status = it->status();
  419. if (!status.ok()) {
  420. FSTFail(@"Find all mutation batches failed with status: %s", status.ToString().c_str());
  421. }
  422. return result;
  423. }
  424. - (void)removeMutationBatches:(NSArray<FSTMutationBatch *> *)batches group:(FSTWriteGroup *)group {
  425. NSString *userID = self.userID;
  426. id<FSTGarbageCollector> garbageCollector = self.garbageCollector;
  427. std::unique_ptr<Iterator> checkIterator(_db->NewIterator(StandardReadOptions()));
  428. for (FSTMutationBatch *batch in batches) {
  429. FSTBatchID batchID = batch.batchID;
  430. std::string key = [FSTLevelDBMutationKey keyWithUserID:userID batchID:batchID];
  431. // As a sanity check, verify that the mutation batch exists before deleting it.
  432. checkIterator->Seek(key);
  433. FSTAssert(checkIterator->Valid(), @"Mutation batch %@ did not exist",
  434. [FSTLevelDBKey descriptionForKey:key]);
  435. FSTAssert(key == checkIterator->key(), @"Mutation batch %@ not found; found %@",
  436. [FSTLevelDBKey descriptionForKey:key],
  437. [FSTLevelDBKey descriptionForKey:checkIterator->key()]);
  438. [group removeMessageForKey:key];
  439. for (FSTMutation *mutation in batch.mutations) {
  440. key = [FSTLevelDBDocumentMutationKey keyWithUserID:userID
  441. documentKey:mutation.key
  442. batchID:batchID];
  443. [group removeMessageForKey:key];
  444. [garbageCollector addPotentialGarbageKey:mutation.key];
  445. }
  446. }
  447. }
  448. - (void)performConsistencyCheck {
  449. if (![self isEmpty]) {
  450. return;
  451. }
  452. // Verify that there are no entries in the document-mutation index if the queue is empty.
  453. std::string indexPrefix = [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID];
  454. std::unique_ptr<Iterator> indexIterator(_db->NewIterator(StandardReadOptions()));
  455. indexIterator->Seek(indexPrefix);
  456. NSMutableArray<NSString *> *danglingMutationReferences = [NSMutableArray array];
  457. for (; indexIterator->Valid(); indexIterator->Next()) {
  458. Slice indexKey = indexIterator->key();
  459. // Only consider rows matching this index prefix for the current user.
  460. if (!indexKey.starts_with(indexPrefix)) {
  461. break;
  462. }
  463. [danglingMutationReferences addObject:[FSTLevelDBKey descriptionForKey:indexKey]];
  464. }
  465. FSTAssert(danglingMutationReferences.count == 0,
  466. @"Document leak -- detected dangling mutation references when queue "
  467. @"is empty. Dangling keys: %@",
  468. danglingMutationReferences);
  469. }
  470. - (std::string)mutationKeyForBatch:(FSTMutationBatch *)batch {
  471. return [FSTLevelDBMutationKey keyWithUserID:self.userID batchID:batch.batchID];
  472. }
  473. - (std::string)mutationKeyForBatchID:(FSTBatchID)batchID {
  474. return [FSTLevelDBMutationKey keyWithUserID:self.userID batchID:batchID];
  475. }
  476. /** Parses the MutationQueue metadata from the given LevelDB row contents. */
  477. - (FSTPBMutationQueue *)parsedMetadata:(Slice)slice {
  478. NSData *data =
  479. [[NSData alloc] initWithBytesNoCopy:(void *)slice.data() length:slice.size() freeWhenDone:NO];
  480. NSError *error;
  481. FSTPBMutationQueue *proto = [FSTPBMutationQueue parseFromData:data error:&error];
  482. if (!proto) {
  483. FSTFail(@"FSTPBMutationQueue failed to parse: %@", error);
  484. }
  485. return proto;
  486. }
  487. - (FSTMutationBatch *)decodedMutationBatch:(Slice)slice {
  488. NSData *data =
  489. [[NSData alloc] initWithBytesNoCopy:(void *)slice.data() length:slice.size() freeWhenDone:NO];
  490. NSError *error;
  491. FSTPBWriteBatch *proto = [FSTPBWriteBatch parseFromData:data error:&error];
  492. if (!proto) {
  493. FSTFail(@"FSTPBMutationBatch failed to parse: %@", error);
  494. }
  495. return [self.serializer decodedMutationBatch:proto];
  496. }
  497. #pragma mark - FSTGarbageSource implementation
  498. - (BOOL)containsKey:(FSTDocumentKey *)documentKey {
  499. std::string indexPrefix =
  500. [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:documentKey.path];
  501. std::unique_ptr<Iterator> indexIterator(_db->NewIterator(StandardReadOptions()));
  502. indexIterator->Seek(indexPrefix);
  503. if (indexIterator->Valid()) {
  504. FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init];
  505. Slice iteratorKey = indexIterator->key();
  506. // Check both that the key prefix matches and that the decoded document key is exactly the key
  507. // we're looking for.
  508. if (iteratorKey.starts_with(indexPrefix) && [rowKey decodeKey:iteratorKey] &&
  509. [rowKey.documentKey isEqualToKey:documentKey]) {
  510. return YES;
  511. }
  512. }
  513. return NO;
  514. }
  515. @end
  516. NS_ASSUME_NONNULL_END