Kaynağa Gözat

Add IndexBackfiller to Support Client Side Indexing (#10021)

* Add IndexBackfiller

* Address feedback

* Address more feedback
cherylEnkidu 3 yıl önce
ebeveyn
işleme
9b4c200afd
32 değiştirilmiş dosya ile 1046 ekleme ve 153 silme
  1. 14 0
      Firestore/Example/Firestore.xcodeproj/project.pbxproj
  2. 29 5
      Firestore/core/src/core/firestore_client.cc
  3. 12 2
      Firestore/core/src/core/firestore_client.h
  4. 3 2
      Firestore/core/src/local/document_overlay_cache.cc
  5. 2 1
      Firestore/core/src/local/document_overlay_cache.h
  6. 105 0
      Firestore/core/src/local/index_backfiller.cc
  7. 75 0
      Firestore/core/src/local/index_backfiller.h
  8. 2 1
      Firestore/core/src/local/index_manager.h
  9. 1 2
      Firestore/core/src/local/leveldb_index_manager.cc
  10. 1 1
      Firestore/core/src/local/leveldb_index_manager.h
  11. 68 90
      Firestore/core/src/local/leveldb_remote_document_cache.cc
  12. 17 8
      Firestore/core/src/local/leveldb_remote_document_cache.h
  13. 37 5
      Firestore/core/src/local/local_documents_view.cc
  14. 23 3
      Firestore/core/src/local/local_documents_view.h
  15. 8 0
      Firestore/core/src/local/local_store.cc
  16. 27 0
      Firestore/core/src/local/local_store.h
  17. 2 2
      Firestore/core/src/local/memory_index_manager.cc
  18. 1 1
      Firestore/core/src/local/memory_index_manager.h
  19. 5 3
      Firestore/core/src/local/memory_remote_document_cache.cc
  20. 5 3
      Firestore/core/src/local/memory_remote_document_cache.h
  21. 7 3
      Firestore/core/src/local/remote_document_cache.h
  22. 8 0
      Firestore/core/src/model/field_index.h
  23. 7 2
      Firestore/core/src/util/async_queue.h
  24. 5 3
      Firestore/core/test/unit/local/counting_query_engine.cc
  25. 5 3
      Firestore/core/test/unit/local/counting_query_engine.h
  26. 6 7
      Firestore/core/test/unit/local/document_overlay_cache_test.cc
  27. 559 0
      Firestore/core/test/unit/local/index_backfiller_test.cc
  28. 1 2
      Firestore/core/test/unit/local/index_manager_test.h
  29. 2 2
      Firestore/core/test/unit/local/query_engine_test.cc
  30. 1 1
      Firestore/core/test/unit/local/query_engine_test.h
  31. 6 1
      Firestore/core/test/unit/testutil/testutil.cc
  32. 2 0
      Firestore/core/test/unit/testutil/testutil.h

+ 14 - 0
Firestore/Example/Firestore.xcodeproj/project.pbxproj

@@ -129,6 +129,7 @@
 		15BF63DFF3A7E9A5376C4233 /* transform_operation_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 33607A3AE91548BD219EC9C6 /* transform_operation_test.cc */; };
 		15F54E9538839D56A40C5565 /* watch_change_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 2D7472BC70C024D736FF74D9 /* watch_change_test.cc */; };
 		160B8B6F32963E94CB70B14F /* leveldb_query_engine_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = DB1F1E1B1ED15E8D042144B1 /* leveldb_query_engine_test.cc */; };
+		167659CDCA47B450F2441454 /* index_backfiller_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 1F50E872B3F117A674DA8E94 /* index_backfiller_test.cc */; };
 		16791B16601204220623916C /* status_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 54A0352C20A3B3D7003E0143 /* status_test.cc */; };
 		16FE432587C1B40AF08613D2 /* objc_type_traits_apple_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = 2A0CF41BA5AED6049B0BEB2C /* objc_type_traits_apple_test.mm */; };
 		16FF9073CA381CA43CA9BF29 /* FIRTransactionOptionsTests.mm in Sources */ = {isa = PBXBuildFile; fileRef = CF39ECA1293D21A0A2AB2626 /* FIRTransactionOptionsTests.mm */; };
@@ -726,6 +727,7 @@
 		75C6CECF607CA94F56260BAB /* memory_document_overlay_cache_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 29D9C76922DAC6F710BC1EF4 /* memory_document_overlay_cache_test.cc */; };
 		75D124966E727829A5F99249 /* FIRTypeTests.mm in Sources */ = {isa = PBXBuildFile; fileRef = 5492E071202154D600B64F25 /* FIRTypeTests.mm */; };
 		76A5447D76F060E996555109 /* task_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 899FC22684B0F7BEEAE13527 /* task_test.cc */; };
+		76FEBDD2793B729BAD2E84C7 /* index_backfiller_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 1F50E872B3F117A674DA8E94 /* index_backfiller_test.cc */; };
 		7731E564468645A4A62E2A3C /* leveldb_key_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 54995F6E205B6E12004EFFA0 /* leveldb_key_test.cc */; };
 		77BB66DD17A8E6545DE22E0B /* remote_document_cache_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 7EB299CF85034F09CFD6F3FD /* remote_document_cache_test.cc */; };
 		77C459976DCF7503AEE18F7F /* leveldb_bundle_cache_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 8E9CD82E60893DDD7757B798 /* leveldb_bundle_cache_test.cc */; };
@@ -849,6 +851,7 @@
 		915A9B8DB280DB4787D83FFE /* byte_stream_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 432C71959255C5DBDF522F52 /* byte_stream_test.cc */; };
 		91AEFFEE35FBE15FEC42A1F4 /* memory_local_store_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = F6CA0C5638AB6627CB5B4CF4 /* memory_local_store_test.cc */; };
 		920B6ABF76FDB3547F1CCD84 /* firestore.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 544129D421C2DDC800EFB9CC /* firestore.pb.cc */; };
+		9236478E01DF2EC7DF58B1FC /* index_backfiller_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 1F50E872B3F117A674DA8E94 /* index_backfiller_test.cc */; };
 		925BE64990449E93242A00A2 /* memory_mutation_queue_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 74FBEFA4FE4B12C435011763 /* memory_mutation_queue_test.cc */; };
 		92D7081085679497DC112EDB /* persistence_testing.cc in Sources */ = {isa = PBXBuildFile; fileRef = 9113B6F513D0473AEABBAF1F /* persistence_testing.cc */; };
 		92EFF0CC2993B43CBC7A61FF /* grpc_streaming_reader_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6D964922154AB8F00EB9CFB /* grpc_streaming_reader_test.cc */; };
@@ -1037,6 +1040,7 @@
 		B83A1416C3922E2F3EBA77FE /* grpc_stream_tester.cc in Sources */ = {isa = PBXBuildFile; fileRef = 87553338E42B8ECA05BA987E /* grpc_stream_tester.cc */; };
 		B842780CF42361ACBBB381A9 /* autoid_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 54740A521FC913E500713A1A /* autoid_test.cc */; };
 		B844B264311E18051B1671ED /* value_util_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 40F9D09063A07F710811A84F /* value_util_test.cc */; };
+		B845B9EDED330D0FDAD891BC /* index_backfiller_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 1F50E872B3F117A674DA8E94 /* index_backfiller_test.cc */; };
 		B896E5DE1CC27347FAC009C3 /* BasicCompileTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = DE0761F61F2FE68D003233AF /* BasicCompileTests.swift */; };
 		B921A4F35B58925D958DD9A6 /* reference_set_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 132E32997D781B896672D30A /* reference_set_test.cc */; };
 		B9706A5CD29195A613CF4147 /* bundle_reader_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 6ECAF7DE28A19C69DF386D88 /* bundle_reader_test.cc */; };
@@ -1119,6 +1123,7 @@
 		CBC891BEEC525F4D8F40A319 /* latlng.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 618BBE9220B89AAC00B5BCE7 /* latlng.pb.cc */; };
 		CBDCA7829AAFEB4853C15517 /* bundle_serializer_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B5C2A94EE24E60543F62CC35 /* bundle_serializer_test.cc */; };
 		CC94A33318F983907E9ED509 /* resume_token_spec_test.json in Resources */ = {isa = PBXBuildFile; fileRef = 54DA12A41F315EE100DD57A1 /* resume_token_spec_test.json */; };
+		CCE596E8654A4D2EEA75C219 /* index_backfiller_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 1F50E872B3F117A674DA8E94 /* index_backfiller_test.cc */; };
 		CD0AA9E5D83C00CAAE7C2F67 /* FIRTimestampTest.m in Sources */ = {isa = PBXBuildFile; fileRef = B65D34A7203C99090076A5E1 /* FIRTimestampTest.m */; };
 		CD1E2F356FC71D7E74FCD26C /* leveldb_remote_document_cache_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 0840319686A223CC4AD3FAB1 /* leveldb_remote_document_cache_test.cc */; };
 		CD226D868CEFA9D557EF33A1 /* query_listener_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 7C3F995E040E9E9C5E8514BB /* query_listener_test.cc */; };
@@ -1215,6 +1220,7 @@
 		E1264B172412967A09993EC6 /* byte_string_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 5342CDDB137B4E93E2E85CCA /* byte_string_test.cc */; };
 		E186D002520881AD2906ADDB /* status.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 618BBE9920B89AAC00B5BCE7 /* status.pb.cc */; };
 		E21D819A06D9691A4B313440 /* remote_store_spec_test.json in Resources */ = {isa = PBXBuildFile; fileRef = 3B843E4A1F3930A400548890 /* remote_store_spec_test.json */; };
+		E25DCFEF318E003B8B7B9DC8 /* index_backfiller_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 1F50E872B3F117A674DA8E94 /* index_backfiller_test.cc */; };
 		E27C0996AF6EC6D08D91B253 /* document.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 544129D821C2DDC800EFB9CC /* document.pb.cc */; };
 		E2AE851F9DC4C037CCD05E36 /* remote_document_cache_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 7EB299CF85034F09CFD6F3FD /* remote_document_cache_test.cc */; };
 		E2B15548A3B6796CE5A01975 /* FIRListenerRegistrationTests.mm in Sources */ = {isa = PBXBuildFile; fileRef = 5492E06B202154D500B64F25 /* FIRListenerRegistrationTests.mm */; };
@@ -1421,6 +1427,7 @@
 		1C01D8CE367C56BB2624E299 /* index.pb.h */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.c.h; name = index.pb.h; path = admin/index.pb.h; sourceTree = "<group>"; };
 		1C3F7302BF4AE6CBC00ECDD0 /* resource.pb.cc */ = {isa = PBXFileReference; includeInIndex = 1; path = resource.pb.cc; sourceTree = "<group>"; };
 		1CA9800A53669EFBFFB824E3 /* memory_remote_document_cache_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = memory_remote_document_cache_test.cc; sourceTree = "<group>"; };
+		1F50E872B3F117A674DA8E94 /* index_backfiller_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; path = index_backfiller_test.cc; sourceTree = "<group>"; };
 		214877F52A705012D6720CA0 /* object_value_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = object_value_test.cc; sourceTree = "<group>"; };
 		2220F583583EFC28DE792ABE /* Pods_Firestore_IntegrationTests_tvOS.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_Firestore_IntegrationTests_tvOS.framework; sourceTree = BUILT_PRODUCTS_DIR; };
 		2286F308EFB0534B1BDE05B9 /* memory_target_cache_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = memory_target_cache_test.cc; sourceTree = "<group>"; };
@@ -2098,6 +2105,7 @@
 				75E24C5CD7BC423D48713100 /* counting_query_engine.h */,
 				FFCA39825D9678A03D1845D0 /* document_overlay_cache_test.cc */,
 				DF445D5201750281F1817387 /* document_overlay_cache_test.h */,
+				1F50E872B3F117A674DA8E94 /* index_backfiller_test.cc */,
 				AE4A9E38D65688EE000EE2A1 /* index_manager_test.cc */,
 				73F1F73A2210F3D800E1F692 /* index_manager_test.h */,
 				8E9CD82E60893DDD7757B798 /* leveldb_bundle_cache_test.cc */,
@@ -3642,6 +3650,7 @@
 				E82F8EBBC8CC37299A459E73 /* hashing_test_apple.mm in Sources */,
 				897F3C1936612ACB018CA1DD /* http.pb.cc in Sources */,
 				48BC5801432127A90CFF55E3 /* index.pb.cc in Sources */,
+				167659CDCA47B450F2441454 /* index_backfiller_test.cc in Sources */,
 				FAD97B82766AEC29B7B5A1B7 /* index_manager_test.cc in Sources */,
 				E084921EFB7CF8CB1E950D6C /* iterator_adaptors_test.cc in Sources */,
 				49C04B97AB282FFA82FD98CD /* latlng.pb.cc in Sources */,
@@ -3846,6 +3855,7 @@
 				CD78EEAA1CD36BE691CA3427 /* hashing_test_apple.mm in Sources */,
 				1357806B4CD3A62A8F5DE86D /* http.pb.cc in Sources */,
 				190F9885BAA81587F08CD26C /* index.pb.cc in Sources */,
+				B845B9EDED330D0FDAD891BC /* index_backfiller_test.cc in Sources */,
 				F58A23FEF328EB74F681FE83 /* index_manager_test.cc in Sources */,
 				0E4C94369FFF7EC0C9229752 /* iterator_adaptors_test.cc in Sources */,
 				0FBDD5991E8F6CD5F8542474 /* latlng.pb.cc in Sources */,
@@ -4064,6 +4074,7 @@
 				3B256CCF6AEEE12E22F16BB8 /* hashing_test_apple.mm in Sources */,
 				AB8209455BAA17850D5E196D /* http.pb.cc in Sources */,
 				096BA3A3703AC1491F281618 /* index.pb.cc in Sources */,
+				9236478E01DF2EC7DF58B1FC /* index_backfiller_test.cc in Sources */,
 				4BFEEB7FDD7CD5A693B5B5C1 /* index_manager_test.cc in Sources */,
 				FA334ADC73CFDB703A7C17CD /* iterator_adaptors_test.cc in Sources */,
 				CBC891BEEC525F4D8F40A319 /* latlng.pb.cc in Sources */,
@@ -4282,6 +4293,7 @@
 				BDD2D1812BAD962E3C81A53F /* hashing_test_apple.mm in Sources */,
 				49794806F3D5052E5F61A40D /* http.pb.cc in Sources */,
 				6E8CD8F545C8EDA84918977C /* index.pb.cc in Sources */,
+				E25DCFEF318E003B8B7B9DC8 /* index_backfiller_test.cc in Sources */,
 				650B31A5EC6F8D2AEA79C350 /* index_manager_test.cc in Sources */,
 				86494278BE08F10A8AAF9603 /* iterator_adaptors_test.cc in Sources */,
 				4173B61CB74EB4CD1D89EE68 /* latlng.pb.cc in Sources */,
@@ -4496,6 +4508,7 @@
 				B69CF3F12227386500B281C8 /* hashing_test_apple.mm in Sources */,
 				618BBEB020B89AAC00B5BCE7 /* http.pb.cc in Sources */,
 				77D38E78F7CCB8504450A8FB /* index.pb.cc in Sources */,
+				76FEBDD2793B729BAD2E84C7 /* index_backfiller_test.cc in Sources */,
 				E6357221227031DD77EE5265 /* index_manager_test.cc in Sources */,
 				54A0353520A3D8CB003E0143 /* iterator_adaptors_test.cc in Sources */,
 				618BBEAE20B89AAC00B5BCE7 /* latlng.pb.cc in Sources */,
@@ -4733,6 +4746,7 @@
 				433474A3416B76645FFD17BB /* hashing_test_apple.mm in Sources */,
 				06A3926F89C847846BE4D6BE /* http.pb.cc in Sources */,
 				78E8DDDBE131F3DA9AF9F8B8 /* index.pb.cc in Sources */,
+				CCE596E8654A4D2EEA75C219 /* index_backfiller_test.cc in Sources */,
 				2B4234B962625F9EE68B31AC /* index_manager_test.cc in Sources */,
 				8A79DDB4379A063C30A76329 /* iterator_adaptors_test.cc in Sources */,
 				23C04A637090E438461E4E70 /* latlng.pb.cc in Sources */,

+ 29 - 5
Firestore/core/src/core/firestore_client.cc

@@ -101,8 +101,20 @@ using util::StatusOrCallback;
 using util::ThrowIllegalState;
 using util::TimerId;
 
+namespace {
+
 static const size_t kMaxConcurrentLimboResolutions = 100;
 
+static const auto kInitialGCDelay = std::chrono::minutes(1);
+static const auto kRegularGCDelay = std::chrono::minutes(5);
+
+/** How long we wait to try running index backfill after SDK initialization. */
+static const auto kInitialBackfillDelay = std::chrono::milliseconds(15);
+/** Minimum amount of time between backfill checks, after the first one. */
+static const auto kRegularBackfillDelay = std::chrono::milliseconds(1);
+
+}  // namespace
+
 std::shared_ptr<FirestoreClient> FirestoreClient::Create(
     const DatabaseInfo& database_info,
     const api::Settings& settings,
@@ -236,6 +248,8 @@ void FirestoreClient::Initialize(const User& user, const Settings& settings) {
   // refilling mutation queue, etc.) so must be started after LocalStore.
   local_store_->Start();
   remote_store_->Start();
+
+  ScheduleIndexBackfiller();
 }
 
 FirestoreClient::~FirestoreClient() {
@@ -296,6 +310,8 @@ void FirestoreClient::TerminateInternal() {
   // If we've scheduled LRU garbage collection, cancel it.
   lru_callback_.Cancel();
 
+  backfiller_callback_.Cancel();
+
   remote_store_->Shutdown();
   persistence_->Shutdown();
 
@@ -307,13 +323,9 @@ void FirestoreClient::TerminateInternal() {
   remote_store_.reset();
 }
 
-/**
- * Schedules a callback to try running LRU garbage collection. Reschedules
- * itself after the GC has run.
- */
 void FirestoreClient::ScheduleLruGarbageCollection() {
   std::chrono::milliseconds delay =
-      gc_has_run_ ? regular_gc_delay_ : initial_gc_delay_;
+      gc_has_run_ ? kRegularGCDelay : kInitialGCDelay;
 
   lru_callback_ = worker_queue_->EnqueueAfterDelay(
       delay, TimerId::GarbageCollectionDelay, [this] {
@@ -323,6 +335,18 @@ void FirestoreClient::ScheduleLruGarbageCollection() {
       });
 }
 
+void FirestoreClient::ScheduleIndexBackfiller() {
+  std::chrono::milliseconds delay =
+      backfiller_has_run_ ? kRegularBackfillDelay : kInitialBackfillDelay;
+
+  backfiller_callback_ = worker_queue_->EnqueueAfterDelay(
+      delay, TimerId::IndexBackfillDelay, [this] {
+        local_store_->Backfill();
+        backfiller_has_run_ = true;
+        ScheduleIndexBackfiller();
+      });
+}
+
 void FirestoreClient::DisableNetwork(StatusCallback callback) {
   VerifyNotTerminated();
 

+ 12 - 2
Firestore/core/src/core/firestore_client.h

@@ -204,8 +204,18 @@ class FirestoreClient : public std::enable_shared_from_this<FirestoreClient> {
 
   void TerminateInternal();
 
+  /**
+   * Schedules a callback to try running LRU garbage collection. Reschedules
+   * itself after the GC has run.
+   */
   void ScheduleLruGarbageCollection();
 
+  /**
+   * Schedules a callback to try running index backfiller. Reschedules
+   * itself after the backfiller has run.
+   */
+  void ScheduleIndexBackfiller();
+
   DatabaseInfo database_info_;
   std::shared_ptr<credentials::AppCheckCredentialsProvider>
       app_check_credentials_provider_;
@@ -231,12 +241,12 @@ class FirestoreClient : public std::enable_shared_from_this<FirestoreClient> {
   std::unique_ptr<SyncEngine> sync_engine_;
   std::unique_ptr<EventManager> event_manager_;
 
-  std::chrono::milliseconds initial_gc_delay_ = std::chrono::minutes(1);
-  std::chrono::milliseconds regular_gc_delay_ = std::chrono::minutes(5);
   bool gc_has_run_ = false;
+  bool backfiller_has_run_ = false;
   bool credentials_initialized_ = false;
   local::LruDelegate* _Nullable lru_delegate_;
   util::DelayedOperation lru_callback_;
+  util::DelayedOperation backfiller_callback_;
 };
 
 }  // namespace core

+ 3 - 2
Firestore/core/src/local/document_overlay_cache.cc

@@ -16,6 +16,7 @@
 
 #include "Firestore/core/src/local/document_overlay_cache.h"
 
+#include <set>
 #include <utility>
 
 #include "Firestore/core/src/immutable/sorted_set.h"
@@ -31,8 +32,8 @@ using model::DocumentKeySet;
 using model::Overlay;
 using model::OverlayByDocumentKeyMap;
 
-void DocumentOverlayCache::GetOverlays(OverlayByDocumentKeyMap& dest,
-                                       const DocumentKeySet& keys) const {
+void DocumentOverlayCache::GetOverlays(
+    OverlayByDocumentKeyMap& dest, const std::set<DocumentKey>& keys) const {
   for (const DocumentKey& key : keys) {
     absl::optional<Overlay> overlay = GetOverlay(key);
     if (overlay.has_value()) {

+ 2 - 1
Firestore/core/src/local/document_overlay_cache.h

@@ -18,6 +18,7 @@
 #define FIRESTORE_CORE_SRC_LOCAL_DOCUMENT_OVERLAY_CACHE_H_
 
 #include <cstdlib>
+#include <set>
 #include <unordered_map>
 
 #include "Firestore/core/src/model/document_key.h"
@@ -60,7 +61,7 @@ class DocumentOverlayCache {
    * which there are no overlays.
    */
   virtual void GetOverlays(model::OverlayByDocumentKeyMap& dest,
-                           const model::DocumentKeySet& keys) const;
+                           const std::set<model::DocumentKey>& keys) const;
 
   /**
    * Saves the given document key to mutation map to persistence as overlays.

+ 105 - 0
Firestore/core/src/local/index_backfiller.cc

@@ -0,0 +1,105 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <algorithm>
+#include <unordered_set>
+#include <utility>
+
+#include "Firestore/core/src/local/index_backfiller.h"
+#include "Firestore/core/src/local/index_manager.h"
+#include "Firestore/core/src/local/local_documents_view.h"
+#include "Firestore/core/src/local/local_store.h"
+#include "Firestore/core/src/local/local_write_result.h"
+#include "Firestore/core/src/local/persistence.h"
+#include "Firestore/core/src/model/field_index.h"
+#include "Firestore/core/src/util/log.h"
+
+namespace firebase {
+namespace firestore {
+namespace local {
+
+namespace {
+
+using model::IndexOffset;
+
+/**
+ * The maximum number of documents to process each time Backfill() is called.
+ */
+static const int kMaxDocumentsToProcess = 50;
+
+}  // namespace
+
+IndexBackfiller::IndexBackfiller() {
+  max_documents_to_process_ = kMaxDocumentsToProcess;
+}
+
+int IndexBackfiller::WriteIndexEntries(const LocalStore* local_store) {
+  IndexManager* index_manager = local_store->index_manager();
+  std::unordered_set<std::string> processed_collection_groups;
+  int documents_remaining = max_documents_to_process_;
+  while (documents_remaining > 0) {
+    const auto collection_group =
+        index_manager->GetNextCollectionGroupToUpdate();
+    if (!collection_group ||
+        (processed_collection_groups.find(collection_group.value()) !=
+         processed_collection_groups.end())) {
+      break;
+    }
+    LOG_DEBUG("Processing collection: %s", collection_group.value());
+    documents_remaining -= WriteEntriesForCollectionGroup(
+        local_store, collection_group.value(), documents_remaining);
+    processed_collection_groups.insert(collection_group.value());
+  }
+  return max_documents_to_process_ - documents_remaining;
+}
+
+int IndexBackfiller::WriteEntriesForCollectionGroup(
+    const LocalStore* local_store,
+    const std::string& collection_group,
+    int documents_remaining_under_cap) const {
+  IndexManager* index_manager = local_store->index_manager();
+  const auto local_documents_view = local_store->local_documents();
+
+  // Use the earliest offset of all field indexes to query the local cache.
+  const auto existing_offset = index_manager->GetMinOffset(collection_group);
+  const auto next_batch = local_documents_view->GetNextDocuments(
+      collection_group, existing_offset, documents_remaining_under_cap);
+  index_manager->UpdateIndexEntries(next_batch.changes());
+
+  const auto new_offset = GetNewOffset(existing_offset, next_batch);
+  LOG_DEBUG("Updating offset: %s", new_offset.ToString());
+  index_manager->UpdateCollectionGroup(collection_group, new_offset);
+
+  return next_batch.changes().size();
+}
+
+model::IndexOffset IndexBackfiller::GetNewOffset(
+    const IndexOffset& existing_offset,
+    const LocalWriteResult& lookup_result) const {
+  auto max_offset = existing_offset;
+  for (const auto& entry : lookup_result.changes()) {
+    auto new_offset = IndexOffset::FromDocument(entry.second);
+    if (new_offset.CompareTo(max_offset) ==
+        util::ComparisonResult::Descending) {
+      max_offset = std::move(new_offset);
+    }
+  }
+  return IndexOffset(
+      max_offset.read_time(), max_offset.document_key(),
+      std::max(lookup_result.batch_id(), existing_offset.largest_batch_id()));
+}
+
+}  // namespace local
+}  // namespace firestore
+}  // namespace firebase

+ 75 - 0
Firestore/core/src/local/index_backfiller.h

@@ -0,0 +1,75 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef FIRESTORE_CORE_SRC_LOCAL_INDEX_BACKFILLER_H_
+#define FIRESTORE_CORE_SRC_LOCAL_INDEX_BACKFILLER_H_
+
+#include <string>
+
+namespace firebase {
+namespace firestore {
+
+namespace util {
+class AsyncQueue;
+}
+
+namespace model {
+class IndexOffset;
+}
+
+namespace local {
+class Persistence;
+class LocalStore;
+class LocalWriteResult;
+class IndexManager;
+
+/** Implements the steps for backfilling indexes. */
+class IndexBackfiller {
+ public:
+  IndexBackfiller();
+
+  /**
+   * Writes index entries until the cap is reached. Returns the number of
+   * documents processed.
+   */
+  int WriteIndexEntries(const LocalStore* local_store);
+
+ private:
+  friend class IndexBackfillerTest;
+
+  /**
+   * Writes entries for the provided collection group. Returns the number of
+   * documents processed.
+   */
+  int WriteEntriesForCollectionGroup(const LocalStore* local_store,
+                                     const std::string& collection_group,
+                                     int documents_remaining_under_cap) const;
+
+  /** Returns the next offset based on the provided documents. */
+  model::IndexOffset GetNewOffset(const model::IndexOffset& existing_offset,
+                                  const LocalWriteResult& lookup_result) const;
+
+  // For testing
+  void SetMaxDocumentsToProcess(int new_max) {
+    max_documents_to_process_ = new_max;
+  }
+
+  int max_documents_to_process_;
+};
+
+}  // namespace local
+}  // namespace firestore
+}  // namespace firebase
+
+#endif  // FIRESTORE_CORE_SRC_LOCAL_INDEX_BACKFILLER_H_

+ 2 - 1
Firestore/core/src/local/index_manager.h

@@ -146,7 +146,8 @@ class IndexManager {
    * Returns the next collection group to update. Returns `nullopt` if no
    * group exists.
    */
-  virtual absl::optional<std::string> GetNextCollectionGroupToUpdate() = 0;
+  virtual absl::optional<std::string> GetNextCollectionGroupToUpdate()
+      const = 0;
 
   /**
    * Sets the collection group's latest read time.

+ 1 - 2
Firestore/core/src/local/leveldb_index_manager.cc

@@ -708,7 +708,7 @@ std::vector<LevelDbIndexManager::IndexRange> LevelDbIndexManager::CreateRange(
 }
 
 absl::optional<std::string>
-LevelDbIndexManager::GetNextCollectionGroupToUpdate() {
+LevelDbIndexManager::GetNextCollectionGroupToUpdate() const {
   if (next_index_to_update_.empty()) {
     return absl::nullopt;
   }
@@ -725,7 +725,6 @@ void LevelDbIndexManager::UpdateCollectionGroup(
     IndexState updated_state{memoized_max_sequence_number_, offset};
 
     auto state_key = LevelDbIndexStateKey::Key(uid_, field_index.index_id());
-    auto val = EncodeIndexState(updated_state);
     db_->current_transaction()->Put(std::move(state_key),
                                     EncodeIndexState(updated_state));
 

+ 1 - 1
Firestore/core/src/local/leveldb_index_manager.h

@@ -83,7 +83,7 @@ class LevelDbIndexManager : public IndexManager {
   absl::optional<std::vector<model::DocumentKey>> GetDocumentsMatchingTarget(
       const core::Target& target) override;
 
-  absl::optional<std::string> GetNextCollectionGroupToUpdate() override;
+  absl::optional<std::string> GetNextCollectionGroupToUpdate() const override;
 
   void UpdateCollectionGroup(const std::string& collection_group,
                              model::IndexOffset offset) override;

+ 68 - 90
Firestore/core/src/local/leveldb_remote_document_cache.cc

@@ -44,6 +44,7 @@ using core::Query;
 using leveldb::Status;
 using model::DocumentKey;
 using model::DocumentKeySet;
+using model::DocumentVersionMap;
 using model::MutableDocument;
 using model::MutableDocumentMap;
 using model::ResourcePath;
@@ -124,7 +125,7 @@ void LevelDbRemoteDocumentCache::Remove(const DocumentKey& key) {
   db_->current_transaction()->Delete(ldb_key);
 }
 
-MutableDocument LevelDbRemoteDocumentCache::Get(const DocumentKey& key) {
+MutableDocument LevelDbRemoteDocumentCache::Get(const DocumentKey& key) const {
   std::string ldb_key = LevelDbRemoteDocumentKey::Key(key);
   std::string value;
   Status status = db_->current_transaction()->Get(ldb_key, &value);
@@ -139,7 +140,7 @@ MutableDocument LevelDbRemoteDocumentCache::Get(const DocumentKey& key) {
 }
 
 MutableDocumentMap LevelDbRemoteDocumentCache::GetAll(
-    const DocumentKeySet& keys) {
+    const DocumentKeySet& keys) const {
   BackgroundQueue tasks(executor_.get());
   AsyncResults<std::pair<DocumentKey, MutableDocument>> results;
 
@@ -170,115 +171,92 @@ MutableDocumentMap LevelDbRemoteDocumentCache::GetAll(
 }
 
 MutableDocumentMap LevelDbRemoteDocumentCache::GetAllExisting(
-    const DocumentKeySet& keys) {
-  MutableDocumentMap docs = LevelDbRemoteDocumentCache::GetAll(keys);
-  MutableDocumentMap result;
-  for (const auto& kv : docs) {
-    const DocumentKey& key = kv.first;
-    auto& document = kv.second;
-    if (document.is_found_document()) {
-      result = result.insert(key, document);
-    }
+    DocumentVersionMap&& remote_map) const {
+  BackgroundQueue tasks(executor_.get());
+  AsyncResults<std::pair<DocumentKey, MutableDocument>> results;
+  for (const auto& key_version : remote_map) {
+    tasks.Execute([this, &results, key_version] {
+      auto document = Get(key_version.first).WithReadTime(key_version.second);
+      if (document.is_found_document()) {
+        results.Insert(std::make_pair(key_version.first, std::move(document)));
+      }
+    });
   }
+  tasks.AwaitAll();
 
-  return result;
+  MutableDocumentMap map;
+  for (const auto& entry : results.Result()) {
+    map = map.insert(entry.first, entry.second);
+  }
+  return map;
 }
 
-model::MutableDocumentMap LevelDbRemoteDocumentCache::GetAll(
+MutableDocumentMap LevelDbRemoteDocumentCache::GetAll(
     const std::string& collection_group,
     const model::IndexOffset& offset,
     size_t limit) const {
-  (void)collection_group;
-  (void)offset;
-  (void)limit;
-  // TODO(cheryllin): Implement GetAll() together with backfiller
-  return model::MutableDocumentMap();
+  HARD_ASSERT(limit > 0u, "Limit should be at least 1");
+  const auto parents = index_manager_->GetCollectionParents(collection_group);
+  std::vector<ResourcePath> collections;
+  collections.reserve(parents.size());
+  for (const auto& parent : parents) {
+    collections.push_back(parent.Append(collection_group));
+  }
+
+  MutableDocumentMap result;
+  for (auto path = collections.cbegin();
+       path != collections.cend() && result.size() < limit; path++) {
+    const auto remote_docs = GetAll(*path, offset, limit - result.size());
+    for (const auto& doc : remote_docs) {
+      result = result.insert(doc.first, doc.second);
+    }
+  }
+  return result;
 }
 
 MutableDocumentMap LevelDbRemoteDocumentCache::GetAll(
-    const model::ResourcePath& path, const model::IndexOffset& offset) {
+    const model::ResourcePath& path,
+    const model::IndexOffset& offset,
+    const absl::optional<size_t> limit) const {
   // Use the query path as a prefix for testing if a document matches the query.
-  size_t immediate_children_path_length = path.size() + 1;
-
-  if (offset.read_time() != SnapshotVersion::None()) {
-    // Execute an index-free query and filter by read time. This is safe since
-    // all document changes to queries that have a
-    // last_limbo_free_snapshot_version (`since_read_time`) have a read time
-    // set.
-    std::string start_key =
-        LevelDbRemoteDocumentReadTimeKey::KeyPrefix(path, offset.read_time());
-    auto it = db_->current_transaction()->NewIterator();
-    it->Seek(util::ImmediateSuccessor(start_key));
-
-    DocumentKeySet remote_keys;
-
-    LevelDbRemoteDocumentReadTimeKey current_key;
-    for (; it->Valid() && current_key.Decode(it->key()); it->Next()) {
-      const ResourcePath& collection_path = current_key.collection_path();
-      if (collection_path != path) {
-        break;
-      }
 
-      const SnapshotVersion& read_time = current_key.read_time();
-      if (read_time > offset.read_time()) {
-        DocumentKey document_key(path.Append(current_key.document_id()));
-        remote_keys = remote_keys.insert(document_key);
-      } else if (read_time == offset.read_time()) {
-        DocumentKey document_key(path.Append(current_key.document_id()));
-        if (document_key > offset.document_key()) {
-          remote_keys = remote_keys.insert(document_key);
-        }
-      }
-    }
-
-    return LevelDbRemoteDocumentCache::GetAllExisting(remote_keys);
-  } else {
-    BackgroundQueue tasks(executor_.get());
-    AsyncResults<MutableDocument> results;
-
-    // Documents are ordered by key, so we can use a prefix scan to narrow down
-    // the documents we need to match the query against.
-    std::string start_key = LevelDbRemoteDocumentKey::KeyPrefix(path);
-    auto it = db_->current_transaction()->NewIterator();
-    it->Seek(start_key);
-
-    LevelDbRemoteDocumentKey current_key;
-    for (; it->Valid() && current_key.Decode(it->key()); it->Next()) {
-      // The query is actually returning any path that starts with the query
-      // path prefix which may include documents in subcollections. For example,
-      // a query on 'rooms' will return rooms/abc/messages/xyx but we shouldn't
-      // match it. Fix this by discarding rows with document keys more than one
-      // segment longer than the query path.
-      const DocumentKey& document_key = current_key.document_key();
-      if (document_key.path().size() != immediate_children_path_length) {
-        continue;
-      }
+  // Execute an index-free query and filter by read time. This is safe since
+  // all document changes to queries that have a
+  // last_limbo_free_snapshot_version (`since_read_time`) have a read time
+  // set.
+  std::string start_key =
+      LevelDbRemoteDocumentReadTimeKey::KeyPrefix(path, offset.read_time());
+  auto it = db_->current_transaction()->NewIterator();
+  it->Seek(util::ImmediateSuccessor(start_key));
 
-      if (!path.IsPrefixOf(document_key.path())) {
-        break;
-      }
+  DocumentVersionMap remote_map;
 
-      const std::string& contents = it->value();
-      tasks.Execute([this, &results, document_key, contents] {
-        MutableDocument document = DecodeMaybeDocument(contents, document_key);
-        if (document.is_found_document()) {
-          results.Insert(document);
-        }
-      });
+  LevelDbRemoteDocumentReadTimeKey current_key;
+  for (; it->Valid() && current_key.Decode(it->key()) &&
+         (!limit.has_value() || remote_map.size() < limit);
+       it->Next()) {
+    const ResourcePath& collection_path = current_key.collection_path();
+    if (collection_path != path) {
+      break;
     }
 
-    tasks.AwaitAll();
-
-    MutableDocumentMap map;
-    for (const MutableDocument& doc : results.Result()) {
-      map = map.insert(doc.key(), doc);
+    const SnapshotVersion& read_time = current_key.read_time();
+    if (read_time > offset.read_time()) {
+      DocumentKey document_key(path.Append(current_key.document_id()));
+      remote_map[document_key] = read_time;
+    } else if (read_time == offset.read_time()) {
+      DocumentKey document_key(path.Append(current_key.document_id()));
+      if (document_key > offset.document_key()) {
+        remote_map[document_key] = read_time;
+      }
     }
-    return map;
   }
+
+  return LevelDbRemoteDocumentCache::GetAllExisting(std::move(remote_map));
 }
 
 MutableDocument LevelDbRemoteDocumentCache::DecodeMaybeDocument(
-    absl::string_view encoded, const DocumentKey& key) {
+    absl::string_view encoded, const DocumentKey& key) const {
   StringReader reader{encoded};
 
   auto message = Message<firestore_client_MaybeDocument>::TryParse(&reader);

+ 17 - 8
Firestore/core/src/local/leveldb_remote_document_cache.h

@@ -35,6 +35,11 @@ namespace util {
 class Executor;
 }  // namespace util
 
+namespace model {
+class MutableDocument;
+class SnapshotVersion;
+}  // namespace model
+
 namespace local {
 
 class LevelDbPersistence;
@@ -51,25 +56,29 @@ class LevelDbRemoteDocumentCache : public RemoteDocumentCache {
            const model::SnapshotVersion& read_time) override;
   void Remove(const model::DocumentKey& key) override;
 
-  model::MutableDocument Get(const model::DocumentKey& key) override;
-  model::MutableDocumentMap GetAll(const model::DocumentKeySet& keys) override;
+  model::MutableDocument Get(const model::DocumentKey& key) const override;
+  model::MutableDocumentMap GetAll(
+      const model::DocumentKeySet& keys) const override;
   model::MutableDocumentMap GetAll(const std::string& collection_group,
                                    const model::IndexOffset& offset,
                                    size_t limit) const override;
-  model::MutableDocumentMap GetAll(const model::ResourcePath& path,
-                                   const model::IndexOffset& offset) override;
+  model::MutableDocumentMap GetAll(
+      const model::ResourcePath& path,
+      const model::IndexOffset& offset,
+      absl::optional<size_t> limit = absl::nullopt) const override;
 
   void SetIndexManager(IndexManager* manager) override;
 
  private:
   /**
    * Looks up a set of entries in the cache, returning only existing entries of
-   * Type::Document.
+   * Type::Document together with its SnapshotVersion.
    */
-  model::MutableDocumentMap GetAllExisting(const model::DocumentKeySet& keys);
+  model::MutableDocumentMap GetAllExisting(
+      model::DocumentVersionMap&& remote_map) const;
 
-  model::MutableDocument DecodeMaybeDocument(absl::string_view encoded,
-                                             const model::DocumentKey& key);
+  model::MutableDocument DecodeMaybeDocument(
+      absl::string_view encoded, const model::DocumentKey& key) const;
 
   // The LevelDbRemoteDocumentCache instance is owned by LevelDbPersistence.
   LevelDbPersistence* db_;

+ 37 - 5
Firestore/core/src/local/local_documents_view.cc

@@ -19,6 +19,7 @@
 #include <algorithm>
 #include <map>
 #include <memory>
+#include <set>
 #include <string>
 #include <unordered_map>
 #include <utility>
@@ -123,6 +124,37 @@ model::DocumentMap LocalDocumentsView::GetDocumentsMatchingCollectionGroupQuery(
   return results;
 }
 
+LocalWriteResult LocalDocumentsView::GetNextDocuments(
+    const std::string& collection_group,
+    const IndexOffset& offset,
+    int count) const {
+  auto docs = remote_document_cache_->GetAll(collection_group, offset, count);
+  auto overlays = count - docs.size() > 0
+                      ? document_overlay_cache_->GetOverlays(
+                            collection_group, offset.largest_batch_id(),
+                            count - docs.size())
+                      : OverlayByDocumentKeyMap();
+
+  int largest_batch_id = IndexOffset::InitialLargestBatchId();
+  for (const auto& entry : overlays) {
+    if (docs.find(entry.first) == docs.end()) {
+      docs =
+          docs.insert(entry.first, GetBaseDocument(entry.first, entry.second));
+    }
+    // The callsite will use the largest batch ID together with the latest read
+    // time to create a new index offset. Since we only process batch IDs if all
+    // remote documents have been read, no overlay will increase the overall
+    // read time. This is why we only need to special case the batch id.
+    largest_batch_id =
+        std::max(largest_batch_id, entry.second.largest_batch_id());
+  }
+
+  PopulateOverlays(overlays, DocumentKeySet::FromKeysOf(docs));
+  auto local_docs = ComputeViews(docs, std::move(overlays), DocumentKeySet{});
+  return LocalWriteResult::FromOverlayedDocuments(largest_batch_id,
+                                                  std::move(local_docs));
+}
+
 DocumentMap LocalDocumentsView::GetDocumentsMatchingCollectionQuery(
     const Query& query, const IndexOffset& offset) {
   MutableDocumentMap remote_documents =
@@ -201,10 +233,10 @@ model::OverlayedDocumentMap LocalDocumentsView::GetOverlayedDocuments(
 void LocalDocumentsView::PopulateOverlays(
     OverlayByDocumentKeyMap& overlays,
     const model::DocumentKeySet& keys) const {
-  DocumentKeySet missing_overlays;
+  std::set<DocumentKey> missing_overlays;
   for (const DocumentKey& key : keys) {
     if (overlays.find(key) == overlays.end()) {
-      missing_overlays = missing_overlays.insert(key);
+      missing_overlays.insert(key);
     }
   }
   document_overlay_cache_->GetOverlays(overlays, missing_overlays);
@@ -213,7 +245,7 @@ void LocalDocumentsView::PopulateOverlays(
 model::OverlayedDocumentMap LocalDocumentsView::ComputeViews(
     MutableDocumentMap docs,
     OverlayByDocumentKeyMap&& overlays,
-    const DocumentKeySet& existence_state_changed) {
+    const DocumentKeySet& existence_state_changed) const {
   model::MutableDocumentPtrMap recalculate_documents;
   model::FieldMaskMap mutated_fields;
   for (const auto& docs_entry : docs) {
@@ -253,7 +285,7 @@ model::OverlayedDocumentMap LocalDocumentsView::ComputeViews(
 }
 
 void LocalDocumentsView::RecalculateAndSaveOverlays(
-    const DocumentKeySet& keys) {
+    const DocumentKeySet& keys) const {
   model::MutableDocumentPtrMap docs;
   auto remote_docs = remote_document_cache_->GetAll(keys);
   for (const auto& entry : remote_docs) {
@@ -263,7 +295,7 @@ void LocalDocumentsView::RecalculateAndSaveOverlays(
 }
 
 model::FieldMaskMap LocalDocumentsView::RecalculateAndSaveOverlays(
-    model::MutableDocumentPtrMap&& docs) {
+    model::MutableDocumentPtrMap&& docs) const {
   DocumentKeySet keys;
   for (const auto& doc : docs) {
     keys = keys.insert(doc.first);

+ 23 - 3
Firestore/core/src/local/local_documents_view.h

@@ -80,6 +80,26 @@ class LocalDocumentsView {
    */
   model::DocumentMap GetDocuments(const model::DocumentKeySet& keys);
 
+  /**
+   * Given a collection group, returns the next documents that follow the
+   * provided offset, along with an updated batch ID.
+   *
+   * The documents returned by this method are ordered by remote version from
+   * the provided offset. If there are no more remote documents after the
+   * provided offset, documents with mutations in order of batch id from the
+   * offset are returned. Since all documents in a batch are returned together,
+   * the total number of documents returned can exceed count.
+   *
+   * @param collection_group The collection group for the documents.
+   * @param offset The offset to index into.
+   * @param count The number of documents to return
+   * @return A LocalWriteResult with the documents that follow the provided
+   * offset and the last processed batch id.
+   */
+  local::LocalWriteResult GetNextDocuments(const std::string& collection_group,
+                                           const model::IndexOffset& offset,
+                                           int count) const;
+
   /**
    * Similar to `GetDocuments`, but creates the local view from the given
    * `base_docs` without retrieving documents from the local store.
@@ -109,7 +129,7 @@ class LocalDocumentsView {
    * Recalculates overlays by reading the documents from remote document cache
    * first, and save them after they are calculated.
    */
-  void RecalculateAndSaveOverlays(const model::DocumentKeySet& keys);
+  void RecalculateAndSaveOverlays(const model::DocumentKeySet& keys) const;
 
   /**
    * Performs a query against the local view of all documents.
@@ -174,10 +194,10 @@ class LocalDocumentsView {
   model::OverlayedDocumentMap ComputeViews(
       model::MutableDocumentMap docs,
       model::OverlayByDocumentKeyMap&& overlays,
-      const model::DocumentKeySet& existence_state_changed);
+      const model::DocumentKeySet& existence_state_changed) const;
 
   model::FieldMaskMap RecalculateAndSaveOverlays(
-      model::MutableDocumentPtrMap&& docs);
+      model::MutableDocumentPtrMap&& docs) const;
 
   RemoteDocumentCache* remote_document_cache_;
   MutationQueue* mutation_queue_;

+ 8 - 0
Firestore/core/src/local/local_store.cc

@@ -22,6 +22,7 @@
 
 #include "Firestore/core/src/credentials/user.h"
 #include "Firestore/core/src/local/bundle_cache.h"
+#include "Firestore/core/src/local/index_backfiller.h"
 #include "Firestore/core/src/local/local_documents_view.h"
 #include "Firestore/core/src/local/local_view_changes.h"
 #include "Firestore/core/src/local/local_write_result.h"
@@ -119,6 +120,7 @@ LocalStore::LocalStore(Persistence* persistence,
   persistence->reference_delegate()->AddInMemoryPins(&local_view_references_);
   target_id_generator_ = TargetIdGenerator::TargetCacheTargetIdGenerator(0);
   query_engine_->Initialize(local_documents_.get());
+  index_backfiller_ = absl::make_unique<IndexBackfiller>();
 }
 
 LocalStore::~LocalStore() = default;
@@ -566,6 +568,12 @@ LruResults LocalStore::CollectGarbage(LruGarbageCollector* garbage_collector) {
   });
 }
 
+int LocalStore::Backfill() const {
+  return persistence_->Run("Backfill Indexes", [&] {
+    return index_backfiller_->WriteIndexEntries(this);
+  });
+}
+
 bool LocalStore::HasNewerBundle(const bundle::BundleMetadata& metadata) {
   return persistence_->Run("Has newer bundle", [&] {
     absl::optional<bundle::BundleMetadata> cached_metadata =

+ 27 - 0
Firestore/core/src/local/local_store.h

@@ -64,6 +64,7 @@ class QueryEngine;
 class QueryResult;
 class RemoteDocumentCache;
 class TargetCache;
+class IndexBackfiller;
 
 struct LruResults;
 
@@ -245,6 +246,12 @@ class LocalStore : public bundle::BundleCallback {
 
   LruResults CollectGarbage(LruGarbageCollector* garbage_collector);
 
+  /**
+   * Runs a single backfill operation and returns the number of documents
+   * processed.
+   */
+  int Backfill() const;
+
   /**
    * Returns whether the given bundle has already been loaded and its create
    * time is newer or equal to the currently loading bundle.
@@ -276,9 +283,24 @@ class LocalStore : public bundle::BundleCallback {
       const std::string& query_name);
 
  private:
+  friend class IndexBackfiller;
+  friend class IndexBackfillerTest;
   friend class LocalStoreTest;
   friend class LevelDbOverlayMigrationManagerTest;
 
+  IndexManager* index_manager() const {
+    return index_manager_;
+  }
+
+  const LocalDocumentsView* local_documents() const {
+    return local_documents_.get();
+  }
+
+  // For testing
+  IndexBackfiller* index_backfiller() const {
+    return index_backfiller_.get();
+  }
+
   struct DocumentChangeResult {
     model::MutableDocumentMap changed_docs;
     model::DocumentKeySet existence_changed_keys;
@@ -387,6 +409,11 @@ class LocalStore : public bundle::BundleCallback {
    */
   std::unique_ptr<LocalDocumentsView> local_documents_;
 
+  /**
+   * Implements the steps for backfilling indexes.
+   */
+  std::unique_ptr<IndexBackfiller> index_backfiller_;
+
   /** The set of document references maintained by any local views. */
   ReferenceSet local_view_references_;
 

+ 2 - 2
Firestore/core/src/local/memory_index_manager.cc

@@ -116,8 +116,8 @@ MemoryIndexManager::GetDocumentsMatchingTarget(const core::Target& target) {
   return {};
 }
 
-absl::optional<std::string>
-MemoryIndexManager::GetNextCollectionGroupToUpdate() {
+absl::optional<std::string> MemoryIndexManager::GetNextCollectionGroupToUpdate()
+    const {
   return absl::nullopt;
 }
 

+ 1 - 1
Firestore/core/src/local/memory_index_manager.h

@@ -79,7 +79,7 @@ class MemoryIndexManager : public IndexManager {
   absl::optional<std::vector<model::DocumentKey>> GetDocumentsMatchingTarget(
       const core::Target& target) override;
 
-  absl::optional<std::string> GetNextCollectionGroupToUpdate() override;
+  absl::optional<std::string> GetNextCollectionGroupToUpdate() const override;
 
   void UpdateCollectionGroup(const std::string& collection_group,
                              model::IndexOffset offset) override;

+ 5 - 3
Firestore/core/src/local/memory_remote_document_cache.cc

@@ -55,7 +55,7 @@ void MemoryRemoteDocumentCache::Remove(const DocumentKey& key) {
   docs_ = docs_.erase(key);
 }
 
-MutableDocument MemoryRemoteDocumentCache::Get(const DocumentKey& key) {
+MutableDocument MemoryRemoteDocumentCache::Get(const DocumentKey& key) const {
   const auto& entry = docs_.get(key);
   // Note: We create an explicit copy to prevent modifications of the backing
   // data.
@@ -63,7 +63,7 @@ MutableDocument MemoryRemoteDocumentCache::Get(const DocumentKey& key) {
 }
 
 MutableDocumentMap MemoryRemoteDocumentCache::GetAll(
-    const DocumentKeySet& keys) {
+    const DocumentKeySet& keys) const {
   MutableDocumentMap results;
   for (const DocumentKey& key : keys) {
     // Make sure each key has a corresponding entry, which is nullopt in case
@@ -84,7 +84,9 @@ MutableDocumentMap MemoryRemoteDocumentCache::GetAll(const std::string&,
 }
 
 MutableDocumentMap MemoryRemoteDocumentCache::GetAll(
-    const model::ResourcePath& path, const model::IndexOffset& offset) {
+    const model::ResourcePath& path,
+    const model::IndexOffset& offset,
+    const absl::optional<size_t>) const {
   MutableDocumentMap results;
 
   // Documents are ordered by key, so we can use a prefix scan to narrow down

+ 5 - 3
Firestore/core/src/local/memory_remote_document_cache.h

@@ -45,13 +45,15 @@ class MemoryRemoteDocumentCache : public RemoteDocumentCache {
            const model::SnapshotVersion& read_time) override;
   void Remove(const model::DocumentKey& key) override;
 
-  model::MutableDocument Get(const model::DocumentKey& key) override;
-  model::MutableDocumentMap GetAll(const model::DocumentKeySet& keys) override;
+  model::MutableDocument Get(const model::DocumentKey& key) const override;
+  model::MutableDocumentMap GetAll(
+      const model::DocumentKeySet& keys) const override;
   model::MutableDocumentMap GetAll(const std::string&,
                                    const model::IndexOffset&,
                                    size_t) const override;
   model::MutableDocumentMap GetAll(const model::ResourcePath& path,
-                                   const model::IndexOffset& offset) override;
+                                   const model::IndexOffset& offset,
+                                   absl::optional<size_t>) const override;
   void SetIndexManager(IndexManager* manager) override;
 
   std::vector<model::DocumentKey> RemoveOrphanedDocuments(

+ 7 - 3
Firestore/core/src/local/remote_document_cache.h

@@ -66,7 +66,7 @@ class RemoteDocumentCache {
    * @return The cached Document or DeletedDocument entry, or nullopt if we
    * have nothing cached.
    */
-  virtual model::MutableDocument Get(const model::DocumentKey& key) = 0;
+  virtual model::MutableDocument Get(const model::DocumentKey& key) const = 0;
 
   /**
    * Looks up a set of entries in the cache.
@@ -76,7 +76,7 @@ class RemoteDocumentCache {
    * entry is not cached, the corresponding key will be mapped to a null value.
    */
   virtual model::MutableDocumentMap GetAll(
-      const model::DocumentKeySet& keys) = 0;
+      const model::DocumentKeySet& keys) const = 0;
 
   /**
    * Looks up the next "limit" number of documents for a collection group based
@@ -103,10 +103,14 @@ class RemoteDocumentCache {
    * @param path The collection path to match documents against.
    * @param offset The read time and document key to start scanning at
    * (exclusive).
+   * @param limit The maximum number of results to return.
+   * If the limit is not defined, returns all matching documents.
    * @return The set of matching documents.
    */
   virtual model::MutableDocumentMap GetAll(
-      const model::ResourcePath& path, const model::IndexOffset& offset) = 0;
+      const model::ResourcePath& path,
+      const model::IndexOffset& offset,
+      absl::optional<size_t> limit = absl::nullopt) const = 0;
 
   /**
    * Sets the index manager used by remote document cache.

+ 8 - 0
Firestore/core/src/model/field_index.h

@@ -134,6 +134,14 @@ class IndexOffset : public util::Comparable<IndexOffset> {
     return largest_batch_id_;
   }
 
+  /** Creates a pretty-printed description of the IndexOffset for debugging. */
+  std::string ToString() const {
+    return absl::StrCat(
+        "Index Offset: {read time: ", read_time_.ToString(),
+        ", document key: ", document_key_.ToString(),
+        ", largest batch id: ", std::to_string(largest_batch_id_), "}");
+  }
+
   util::ComparisonResult CompareTo(const IndexOffset& rhs) const;
 
  private:

+ 7 - 2
Firestore/core/src/util/async_queue.h

@@ -60,7 +60,7 @@ enum class TimerId {
   OnlineStateTimeout,
 
   /**
-   * A timer used to periodically attempt LRU Garbage collection
+   * A timer used to periodically attempt LRU Garbage collection.
    */
   GarbageCollectionDelay,
 
@@ -68,7 +68,12 @@ enum class TimerId {
    * A timer used to retry transactions. Since there can be multiple concurrent
    * transactions, multiple of these may be in the queue at a given time.
    */
-  RetryTransaction
+  RetryTransaction,
+
+  /**
+   * A timer used to periodically attempt Index Backfill
+   */
+  IndexBackfillDelay
 };
 
 // A serial queue that executes given operations asynchronously, one at a time.

+ 5 - 3
Firestore/core/test/unit/local/counting_query_engine.cc

@@ -159,14 +159,14 @@ void WrappedRemoteDocumentCache::Remove(const model::DocumentKey& key) {
 }
 
 model::MutableDocument WrappedRemoteDocumentCache::Get(
-    const model::DocumentKey& key) {
+    const model::DocumentKey& key) const {
   auto result = subject_->Get(key);
   query_engine_->documents_read_by_key_ += result.is_found_document() ? 1 : 0;
   return result;
 }
 
 model::MutableDocumentMap WrappedRemoteDocumentCache::GetAll(
-    const model::DocumentKeySet& keys) {
+    const model::DocumentKeySet& keys) const {
   auto result = subject_->GetAll(keys);
   query_engine_->documents_read_by_key_ += result.size();
   return result;
@@ -182,7 +182,9 @@ model::MutableDocumentMap WrappedRemoteDocumentCache::GetAll(
 }
 
 model::MutableDocumentMap WrappedRemoteDocumentCache::GetAll(
-    const model::ResourcePath& path, const model::IndexOffset& offset) {
+    const model::ResourcePath& path,
+    const model::IndexOffset& offset,
+    absl::optional<size_t>) const {
   auto result = subject_->GetAll(path, offset);
   query_engine_->documents_read_by_query_ += result.size();
   return result;

+ 5 - 3
Firestore/core/test/unit/local/counting_query_engine.h

@@ -174,16 +174,18 @@ class WrappedRemoteDocumentCache : public RemoteDocumentCache {
 
   void Remove(const model::DocumentKey& key) override;
 
-  model::MutableDocument Get(const model::DocumentKey& key) override;
+  model::MutableDocument Get(const model::DocumentKey& key) const override;
 
-  model::MutableDocumentMap GetAll(const model::DocumentKeySet& keys) override;
+  model::MutableDocumentMap GetAll(
+      const model::DocumentKeySet& keys) const override;
 
   model::MutableDocumentMap GetAll(const std::string& collection_group,
                                    const model::IndexOffset& offset,
                                    size_t limit) const override;
 
   model::MutableDocumentMap GetAll(const model::ResourcePath& path,
-                                   const model::IndexOffset& offset) override;
+                                   const model::IndexOffset& offset,
+                                   absl::optional<size_t>) const override;
 
   void SetIndexManager(IndexManager* manager) override {
     index_manager_ = NOT_NULL(manager);

+ 6 - 7
Firestore/core/test/unit/local/document_overlay_cache_test.cc

@@ -156,9 +156,8 @@ TEST_P(DocumentOverlayCacheTest, ReturnsNullWhenOverlayIsNotFound) {
 TEST_P(DocumentOverlayCacheTest, SkipsNonExistingOverlayInBatchLookup) {
   this->persistence_->Run("SkipsNonExistingOverlayInBatchLookup", [&] {
     model::OverlayByDocumentKeyMap result;
-    auto lookup = model::DocumentKeySet().insert(testutil::Key("coll/doc"));
+    const auto lookup = std::set<DocumentKey>{{testutil::Key("coll/doc")}};
     this->cache_->GetOverlays(result, lookup);
-
     EXPECT_TRUE(result.empty());
   });
 }
@@ -166,7 +165,8 @@ TEST_P(DocumentOverlayCacheTest, SkipsNonExistingOverlayInBatchLookup) {
 TEST_P(DocumentOverlayCacheTest, SupportsEmptyBatchInBatchLookup) {
   this->persistence_->Run("SupportsEmptyBatchInBatchLookup", [&] {
     model::OverlayByDocumentKeyMap result;
-    this->cache_->GetOverlays(result, model::DocumentKeySet());
+    this->cache_->GetOverlays(result, {});
+    EXPECT_TRUE(result.empty());
   });
 }
 
@@ -178,10 +178,9 @@ TEST_P(DocumentOverlayCacheTest, CanReadSavedOverlaysInBatches) {
     this->SaveOverlaysWithMutations(3, {m1, m2, m3});
 
     model::OverlayByDocumentKeyMap result;
-    auto lookup = model::DocumentKeySet()
-                      .insert(testutil::Key("coll1/a"))
-                      .insert(testutil::Key("coll1/b"))
-                      .insert(testutil::Key("coll2/c"));
+    const auto lookup = std::set<DocumentKey>{testutil::Key("coll1/a"),
+                                              testutil::Key("coll1/b"),
+                                              testutil::Key("coll2/c")};
     this->cache_->GetOverlays(result, lookup);
     EXPECT_EQ(m1, result[testutil::Key("coll1/a")].mutation());
     EXPECT_EQ(m2, result[testutil::Key("coll1/b")].mutation());

+ 559 - 0
Firestore/core/test/unit/local/index_backfiller_test.cc

@@ -0,0 +1,559 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "Firestore/core/src/local/index_backfiller.h"
+#include "Firestore/core/src/core/filter.h"
+#include "Firestore/core/src/core/target.h"
+#include "Firestore/core/src/credentials/user.h"
+#include "Firestore/core/src/local/leveldb_persistence.h"
+#include "Firestore/core/src/local/local_store.h"
+#include "Firestore/core/src/model/delete_mutation.h"
+#include "Firestore/core/src/model/field_index.h"
+#include "Firestore/core/src/model/mutation.h"
+#include "Firestore/core/src/model/patch_mutation.h"
+#include "Firestore/core/src/model/set_mutation.h"
+#include "Firestore/core/test/unit/local/counting_query_engine.h"
+#include "Firestore/core/test/unit/local/persistence_testing.h"
+#include "Firestore/core/test/unit/testutil/testutil.h"
+#include "gtest/gtest.h"
+
+namespace firebase {
+namespace firestore {
+namespace local {
+namespace {
+
+using model::DocumentKey;
+using model::FieldIndex;
+using model::IndexOffset;
+using model::Segment;
+using model::SnapshotVersion;
+using testutil::DeleteMutation;
+using testutil::Doc;
+using testutil::Field;
+using testutil::Filter;
+using testutil::MakeFieldIndex;
+using testutil::Map;
+using testutil::OrderBy;
+using testutil::PatchMutation;
+using testutil::Query;
+using testutil::Version;
+
+}  // namespace
+
+class IndexBackfillerTest : public ::testing::Test {
+ public:
+  IndexBackfillerTest()
+      : persistence_(LevelDbPersistenceForTesting()),
+        remote_document_cache_(persistence_->remote_document_cache()),
+        document_overlay_cache_(persistence_->GetDocumentOverlayCache(
+            credentials::User::Unauthenticated())),
+        local_store_(persistence_.get(),
+                     &query_engine_,
+                     credentials::User::Unauthenticated()),
+        index_manager_(local_store_.index_manager()),
+        index_backfiller_(local_store_.index_backfiller()) {
+    persistence_->Run("Start Index Manager in BackfillerTests",
+                      [&] { index_manager_->Start(); });
+  }
+
+  void AddFieldIndex(const std::string& collection_group,
+                     const std::string& field) const {
+    const auto field_index =
+        MakeFieldIndex(collection_group, field, Segment::Kind::kAscending);
+    persistence_->Run("AddFieldIndex in BackfillerTests",
+                      [&] { index_manager_->AddFieldIndex(field_index); });
+  }
+
+  void AddFieldIndex(const std::string& collection_group,
+                     const std::string& field,
+                     SnapshotVersion version) const {
+    const auto field_index =
+        FieldIndex(-1, collection_group,
+                   {model::Segment{Field(field), Segment::Kind::kAscending}},
+                   model::IndexState(0, version, {},
+                                     IndexOffset::InitialLargestBatchId()));
+    persistence_->Run("AddFieldIndex in BackfillerTests",
+                      [&] { index_manager_->AddFieldIndex(field_index); });
+  }
+
+  void AddFieldIndex(const std::string& collection_group,
+                     const std::string& field,
+                     int64_t sequence_number) const {
+    const auto field_index =
+        FieldIndex(-1, collection_group,
+                   {model::Segment{Field(field), Segment::Kind::kAscending}},
+                   model::IndexState(sequence_number, IndexOffset::None()));
+    persistence_->Run("AddFieldIndex in BackfillerTests",
+                      [&] { index_manager_->AddFieldIndex(field_index); });
+  }
+
+  /** Creates a document and adds it to the RemoteDocumentCache. */
+  void AddDoc(const std::string& path,
+              SnapshotVersion readTime,
+              const std::string& field,
+              int value) const {
+    persistence_->Run("AddDoc in BackfillerTests", [&] {
+      remote_document_cache_->Add(Doc(path, 10, Map(field, value)), readTime);
+    });
+  }
+
+  void SetMaxDocumentsToProcess(int new_max) const {
+    index_backfiller_->SetMaxDocumentsToProcess(new_max);
+  }
+
+  void VerifyQueryResults(
+      const core::Query& query,
+      const std::unordered_set<std::string>& expected_keys) const {
+    persistence_->Run("VerifyQueryResults", [&] {
+      const core::Target& target = query.ToTarget();
+      auto actual_keys = index_manager_->GetDocumentsMatchingTarget(target);
+      if (!actual_keys) {
+        ASSERT_EQ(0u, expected_keys.size());
+      } else {
+        ASSERT_EQ(actual_keys.value().size(), expected_keys.size());
+        for (const auto& key : actual_keys.value()) {
+          EXPECT_TRUE(expected_keys.find(key.ToString()) !=
+                      expected_keys.end());
+        }
+      }
+    });
+  }
+
+  void VerifyQueryResults(
+      const std::string& collection_group,
+      const std::unordered_set<std::string>& expected_keys) const {
+    VerifyQueryResults(Query(collection_group).AddingOrderBy(OrderBy("foo")),
+                       expected_keys);
+  }
+
+  /**
+   * Adds a set mutation to a batch with the specified id for every specified
+   * document path.
+   */
+  void AddSetMutationsToOverlay(int batch_id,
+                                const std::vector<std::string>& paths) const {
+    persistence_->Run("AddSetMutationsToOverlay", [&] {
+      model::MutationByDocumentKeyMap map;
+      for (const auto& path : paths) {
+        map[DocumentKey::FromPathString(path)] =
+            testutil::SetMutation(path, testutil::Map("foo", "bar"));
+      }
+      document_overlay_cache_->SaveOverlays(batch_id, map);
+    });
+  }
+
+  void AddMutationToOverlay(const std::string path,
+                            const model::Mutation& mutation) const {
+    persistence_->Run("AddMutationToOverlay", [&] {
+      document_overlay_cache_->SaveOverlays(
+          5, model::MutationByDocumentKeyMap{
+                 {DocumentKey::FromPathString(path), mutation}});
+    });
+  }
+
+  std::unique_ptr<LevelDbPersistence> persistence_;
+  RemoteDocumentCache* remote_document_cache_;
+  DocumentOverlayCache* document_overlay_cache_;
+  CountingQueryEngine query_engine_;
+  LocalStore local_store_;
+  IndexManager* index_manager_;
+  IndexBackfiller* index_backfiller_;
+};
+
+TEST_F(IndexBackfillerTest, WritesLatestReadTimeToFieldIndexOnCompletion) {
+  AddFieldIndex("coll1", "foo");
+  AddFieldIndex("coll2", "bar");
+  AddDoc("coll1/docA", Version(10), "foo", 1);
+  AddDoc("coll2/docA", Version(20), "bar", 1);
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+
+  auto field_index1 = index_manager_->GetFieldIndexes("coll1").at(0);
+  auto field_index2 = index_manager_->GetFieldIndexes("coll2").at(0);
+  EXPECT_EQ(Version(10), field_index1.index_state().index_offset().read_time());
+  EXPECT_EQ(Version(20), field_index2.index_state().index_offset().read_time());
+
+  AddDoc("coll1/docB", Version(50, 10), "foo", 1);
+  AddDoc("coll1/docC", Version(50), "foo", 1);
+  AddDoc("coll2/docB", Version(60), "bar", 1);
+  AddDoc("coll2/docC", Version(60, 10), "bar", 1);
+
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(4, documents_processed);
+
+  field_index1 = index_manager_->GetFieldIndexes("coll1").at(0);
+  field_index2 = index_manager_->GetFieldIndexes("coll2").at(0);
+  EXPECT_EQ(Version(50, 10),
+            field_index1.index_state().index_offset().read_time());
+  EXPECT_EQ(Version(60, 10),
+            field_index2.index_state().index_offset().read_time());
+}
+
+TEST_F(IndexBackfillerTest, FetchesDocumentsAfterEarliestReadTime) {
+  AddFieldIndex("coll1", "foo", Version(10));
+
+  // Documents before read time should not be fetched.
+  AddDoc("coll1/docA", Version(9), "foo", 1);
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(0, documents_processed);
+
+  // Read time should be the highest read time from the cache.
+  auto field_index = index_manager_->GetFieldIndexes("coll1").at(0);
+  EXPECT_EQ(IndexOffset(Version(10), DocumentKey::Empty(),
+                        IndexOffset::InitialLargestBatchId()),
+            field_index.index_state().index_offset());
+
+  // Documents that are after the earliest read time
+  // but before field index read time are fetched.
+  AddDoc("coll1/docB", Version(19), "boo", 1);
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  // Field indexes should now hold the latest read time
+  field_index = index_manager_->GetFieldIndexes("coll1").at(0);
+  EXPECT_EQ(Version(19), field_index.index_state().index_offset().read_time());
+}
+
+TEST_F(IndexBackfillerTest, WritesIndexEntries) {
+  AddFieldIndex("coll1", "foo");
+  AddFieldIndex("coll2", "bar");
+  AddDoc("coll1/docA", Version(10), "foo", 1);
+  AddDoc("coll1/docB", Version(10), "boo", 1);
+  AddDoc("coll2/docA", Version(10), "bar", 1);
+  AddDoc("coll2/docB", Version(10), "car", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(4, documents_processed);
+}
+
+TEST_F(IndexBackfillerTest, WritesOldestDocumentFirst) {
+  SetMaxDocumentsToProcess(2);
+
+  AddFieldIndex("coll1", "foo");
+  AddDoc("coll1/docA", Version(5), "foo", 1);
+  AddDoc("coll1/docB", Version(3), "foo", 1);
+  AddDoc("coll1/docC", Version(10), "foo", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+
+  VerifyQueryResults("coll1", {"coll1/docA", "coll1/docB"});
+
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  VerifyQueryResults("coll1", {"coll1/docA", "coll1/docB", "coll1/docC"});
+}
+
+TEST_F(IndexBackfillerTest, UsesDocumentKeyOffsetForLargeSnapshots) {
+  SetMaxDocumentsToProcess(2);
+
+  AddFieldIndex("coll1", "foo");
+  AddDoc("coll1/docA", Version(1), "foo", 1);
+  AddDoc("coll1/docB", Version(1), "foo", 1);
+  AddDoc("coll1/docC", Version(1), "foo", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+
+  VerifyQueryResults("coll1", {"coll1/docA", "coll1/docB"});
+
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  VerifyQueryResults("coll1", {"coll1/docA", "coll1/docB", "coll1/docC"});
+}
+
+TEST_F(IndexBackfillerTest, UpdatesCollectionGroups) {
+  SetMaxDocumentsToProcess(2);
+
+  AddFieldIndex("coll1", "foo");
+  AddFieldIndex("coll2", "foo");
+
+  AddDoc("coll1/docA", Version(10), "foo", 1);
+  AddDoc("coll1/docB", Version(20), "foo", 1);
+  AddDoc("coll2/docA", Version(30), "foo", 1);
+
+  absl::optional<std::string> collection_group =
+      index_manager_->GetNextCollectionGroupToUpdate();
+  ASSERT_TRUE(collection_group.has_value());
+  ASSERT_EQ("coll1", collection_group.value());
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+
+  // Check that coll1 was backfilled and that coll2 is next
+  collection_group = index_manager_->GetNextCollectionGroupToUpdate();
+  ASSERT_TRUE(collection_group.has_value());
+  ASSERT_EQ("coll2", collection_group.value());
+}
+
+TEST_F(IndexBackfillerTest, PrioritizesNewCollectionGroups) {
+  SetMaxDocumentsToProcess(1);
+
+  // In this test case, `coll3` is a new collection group that hasn't been
+  // indexed, so it should be processed ahead of the other collection groups.
+  AddFieldIndex("coll1", "foo", /* sequenceNumber= */ 1);
+  AddFieldIndex("coll2", "foo", /* sequenceNumber= */ 2);
+  AddFieldIndex("coll3", "foo", /* sequenceNumber= */ 0);
+
+  AddDoc("coll1/doc", Version(10), "foo", 1);
+  AddDoc("coll2/doc", Version(20), "foo", 1);
+  AddDoc("coll3/doc", Version(30), "foo", 1);
+
+  // Check that coll3 is the next collection ID the backfiller should update
+  absl::optional<std::string> collection_group =
+      index_manager_->GetNextCollectionGroupToUpdate();
+  ASSERT_TRUE(collection_group.has_value());
+  ASSERT_EQ("coll3", collection_group.value());
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  VerifyQueryResults("coll3", {"coll3/doc"});
+}
+
+TEST_F(IndexBackfillerTest, WritesUntilCap) {
+  SetMaxDocumentsToProcess(3);
+
+  AddFieldIndex("coll1", "foo");
+  AddFieldIndex("coll2", "foo");
+  AddDoc("coll1/docA", Version(10), "foo", 1);
+  AddDoc("coll1/docB", Version(20), "foo", 1);
+  AddDoc("coll2/docA", Version(30), "foo", 1);
+  AddDoc("coll2/docA", Version(40), "foo", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(3, documents_processed);
+
+  VerifyQueryResults("coll1", {"coll1/docA", "coll1/docB"});
+  VerifyQueryResults("coll2", {"coll2/docA"});
+}
+
+TEST_F(IndexBackfillerTest, UsesLatestReadTimeForEmptyCollections) {
+  AddFieldIndex("coll", "foo", Version(1));
+  AddDoc("readtime/doc", Version(1), "foo", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(0, documents_processed);
+
+  AddDoc("coll/ignored", Version(2), "foo", 1);
+  AddDoc("coll/added", Version(3), "foo", 1);
+
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+}
+
+TEST_F(IndexBackfillerTest, HandlesLocalMutationsAfterRemoteDocs) {
+  SetMaxDocumentsToProcess(2);
+  AddFieldIndex("coll1", "foo");
+
+  AddDoc("coll1/docA", Version(10), "foo", 1);
+  AddDoc("coll1/docB", Version(20), "foo", 1);
+  AddDoc("coll1/docC", Version(30), "foo", 1);
+  AddSetMutationsToOverlay(1, {"coll1/docD"});
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+  VerifyQueryResults("coll1", {"coll1/docA", "coll1/docB"});
+
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+  VerifyQueryResults("coll1",
+                     {"coll1/docA", "coll1/docB", "coll1/docC", "coll1/docD"});
+}
+
+TEST_F(IndexBackfillerTest,
+       MutationsUpToDocumentLimitAndUpdatesBatchIdOnIndex) {
+  SetMaxDocumentsToProcess(2);
+  AddFieldIndex("coll1", "foo");
+  AddDoc("coll1/docA", Version(10), "foo", 1);
+  AddSetMutationsToOverlay(2, {"coll1/docB"});
+  AddSetMutationsToOverlay(3, {"coll1/docC"});
+  AddSetMutationsToOverlay(4, {"coll1/docD"});
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+  VerifyQueryResults("coll1", {"coll1/docA", "coll1/docB"});
+  auto field_index = index_manager_->GetFieldIndexes("coll1").at(0);
+  ASSERT_EQ(2, field_index.index_state().index_offset().largest_batch_id());
+
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+  VerifyQueryResults("coll1",
+                     {"coll1/docA", "coll1/docB", "coll1/docC", "coll1/docD"});
+  field_index = index_manager_->GetFieldIndexes("coll1").at(0);
+  ASSERT_EQ(4, field_index.index_state().index_offset().largest_batch_id());
+}
+
+TEST_F(IndexBackfillerTest, MutationFinishesMutationBatchEvenIfItExceedsLimit) {
+  SetMaxDocumentsToProcess(2);
+  AddFieldIndex("coll1", "foo");
+  AddDoc("coll1/docA", Version(10), "foo", 1);
+  AddSetMutationsToOverlay(2, {"coll1/docB", "coll1/docC", "coll1/docD"});
+  AddSetMutationsToOverlay(3, {"coll1/docE"});
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(4, documents_processed);
+  VerifyQueryResults("coll1",
+                     {"coll1/docA", "coll1/docB", "coll1/docC", "coll1/docD"});
+}
+
+TEST_F(IndexBackfillerTest, MutationsFromHighWaterMark) {
+  SetMaxDocumentsToProcess(2);
+  AddFieldIndex("coll1", "foo");
+  AddDoc("coll1/docA", Version(10), "foo", 1);
+  AddSetMutationsToOverlay(3, {"coll1/docB"});
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+  VerifyQueryResults("coll1", {"coll1/docA", "coll1/docB"});
+
+  AddSetMutationsToOverlay(1, {"coll1/docC"});
+  AddSetMutationsToOverlay(2, {"coll1/docD"});
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(0, documents_processed);
+}
+
+TEST_F(IndexBackfillerTest, UpdatesExistingDocToNewValue) {
+  const auto& query = Query("coll").AddingFilter(Filter("foo", "==", 2));
+  AddFieldIndex("coll", "foo");
+
+  AddDoc("coll/doc", Version(10), "foo", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+  VerifyQueryResults(query, {});
+
+  // Update doc to new remote version with new value.
+  AddDoc("coll/doc", Version(40), "foo", 2);
+  local_store_.Backfill();
+
+  VerifyQueryResults(query, {"coll/doc"});
+}
+
+TEST_F(IndexBackfillerTest, UpdatesDocsThatNoLongerMatch) {
+  const auto& query = Query("coll").AddingFilter(Filter("foo", ">", 0));
+  AddFieldIndex("coll", "foo");
+  AddDoc("coll/doc", Version(10), "foo", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+  VerifyQueryResults(query, {"coll/doc"});
+
+  // Update doc to new remote version with new value that doesn't match field
+  // index.
+  AddDoc("coll/doc", Version(40), "foo", -1);
+
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+  VerifyQueryResults(query, {});
+}
+
+TEST_F(IndexBackfillerTest, DoesNotProcessSameDocumentTwice) {
+  AddFieldIndex("coll", "foo");
+  AddDoc("coll/doc", Version(5), "foo", 1);
+  AddSetMutationsToOverlay(1, {"coll/doc"});
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  const auto field_index = index_manager_->GetFieldIndexes("coll").at(0);
+  ASSERT_EQ(Version(5), field_index.index_state().index_offset().read_time());
+  ASSERT_EQ(1, field_index.index_state().index_offset().largest_batch_id());
+}
+
+TEST_F(IndexBackfillerTest, AppliesSetToRemoteDoc) {
+  AddFieldIndex("coll", "foo");
+  AddDoc("coll/doc", Version(5), "boo", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  model::Mutation patch = PatchMutation("coll/doc", Map("foo", "bar"));
+  AddMutationToOverlay("coll/doc", patch);
+
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  VerifyQueryResults("coll", {"coll/doc"});
+}
+
+TEST_F(IndexBackfillerTest, AppliesPatchToRemoteDoc) {
+  const auto& query_a = Query("coll").AddingOrderBy(OrderBy("a"));
+  const auto& query_b = Query("coll").AddingOrderBy(OrderBy("b"));
+
+  AddFieldIndex("coll", "a");
+  AddFieldIndex("coll", "b");
+  AddDoc("coll/doc", Version(5), "a", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  VerifyQueryResults(query_a, {"coll/doc"});
+  VerifyQueryResults(query_b, {});
+
+  model::Mutation patch = PatchMutation("coll/doc", Map("b", 1));
+  AddMutationToOverlay("coll/doc", patch);
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  VerifyQueryResults(query_a, {"coll/doc"});
+  VerifyQueryResults(query_b, {"coll/doc"});
+}
+
+TEST_F(IndexBackfillerTest, AppliesDeleteToRemoteDoc) {
+  AddFieldIndex("coll", "foo");
+  AddDoc("coll/doc", Version(5), "foo", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  const model::DeleteMutation delete_mutation = DeleteMutation("coll/doc");
+  AddMutationToOverlay("coll/doc", delete_mutation);
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(1, documents_processed);
+
+  persistence_->Run("BackfillAppliesDeleteToRemoteDoc", [&] {
+    auto query = Query("coll").AddingFilter(Filter("foo", "==", 2));
+    const core::Target& target = query.ToTarget();
+    const auto matching = index_manager_->GetDocumentsMatchingTarget(target);
+    ASSERT_TRUE(matching.has_value() && matching.value().empty());
+  });
+}
+
+TEST_F(IndexBackfillerTest, ReindexesDocumentsWhenNewIndexIsAdded) {
+  const auto& query_a = Query("coll").AddingOrderBy(OrderBy("a"));
+  const auto& query_b = Query("coll").AddingOrderBy(OrderBy("b"));
+
+  AddFieldIndex("coll", "a");
+  AddDoc("coll/doc1", Version(1), "a", 1);
+  AddDoc("coll/doc2", Version(1), "b", 1);
+
+  int documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+  VerifyQueryResults(query_a, {"coll/doc1"});
+  VerifyQueryResults(query_b, {});
+
+  AddFieldIndex("coll", "b");
+  documents_processed = local_store_.Backfill();
+  ASSERT_EQ(2, documents_processed);
+
+  VerifyQueryResults(query_a, {"coll/doc1"});
+  VerifyQueryResults(query_b, {"coll/doc2"});
+}
+
+}  // namespace local
+}  // namespace firestore
+}  // namespace firebase

+ 1 - 2
Firestore/core/test/unit/local/index_manager_test.h

@@ -22,14 +22,13 @@
 #include <vector>
 
 #include "Firestore/core/src/local/index_manager.h"
+#include "Firestore/core/src/local/persistence.h"
 #include "gtest/gtest.h"
 
 namespace firebase {
 namespace firestore {
 namespace local {
 
-class Persistence;
-
 using FactoryFunc = std::unique_ptr<Persistence> (*)();
 
 class IndexManagerTest : public ::testing::TestWithParam<FactoryFunc> {

+ 2 - 2
Firestore/core/test/unit/local/query_engine_test.cc

@@ -151,10 +151,10 @@ void QueryEngineTestBase::AddDocuments(
 }
 
 void QueryEngineTestBase::AddDocumentWithEventVersion(
-    const SnapshotVersion& eventVersion,
+    const SnapshotVersion& event_version,
     const std::vector<MutableDocument>& docs) {
   for (const MutableDocument& doc : docs) {
-    remote_document_cache_->Add(doc, eventVersion);
+    remote_document_cache_->Add(doc, event_version);
   }
 }
 

+ 1 - 1
Firestore/core/test/unit/local/query_engine_test.h

@@ -85,7 +85,7 @@ class QueryEngineTestBase : public testing::Test {
    * given snapshot version.
    */
   void AddDocumentWithEventVersion(
-      const model::SnapshotVersion& eventVersion,
+      const model::SnapshotVersion& event_version,
       const std::vector<model::MutableDocument>& docs);
 
   /** Adds a mutation to the mutation queue. */

+ 6 - 1
Firestore/core/test/unit/testutil/testutil.cc

@@ -226,6 +226,10 @@ model::SnapshotVersion Version(int64_t version) {
   return model::SnapshotVersion{Timestamp::FromTimePoint(timepoint)};
 }
 
+model::SnapshotVersion Version(int64_t seconds, int32_t nanoseconds) {
+  return model::SnapshotVersion{Timestamp(seconds, nanoseconds)};
+}
+
 model::MutableDocument Doc(absl::string_view key,
                            int64_t version,
                            Message<google_firestore_v1_Value> data) {
@@ -236,7 +240,8 @@ model::MutableDocument Doc(absl::string_view key,
 
 model::MutableDocument Doc(absl::string_view key, int64_t version) {
   return MutableDocument::FoundDocument(Key(key), Version(version),
-                                        ObjectValue{});
+                                        ObjectValue{})
+      .WithReadTime(Version(version));
 }
 
 model::MutableDocument DeletedDoc(absl::string_view key, int64_t version) {

+ 2 - 0
Firestore/core/test/unit/testutil/testutil.h

@@ -287,6 +287,8 @@ model::ResourcePath Resource(absl::string_view field);
  */
 model::SnapshotVersion Version(int64_t version);
 
+model::SnapshotVersion Version(int64_t seconds, int32_t nanoseconds);
+
 model::MutableDocument Doc(absl::string_view key, int64_t version = 0);
 
 model::MutableDocument Doc(absl::string_view key,