Browse Source

[realppl 11] Add query to pipeline support

wu-hui 8 months ago
parent
commit
3a1049500c

+ 3 - 0
.gitignore

@@ -61,6 +61,8 @@ profile
 DerivedData
 *.hmap
 *.ipa
+# Xcode index build files
+.index-build/
 
 # Swift Package Manager
 Package.resolved
@@ -166,3 +168,4 @@ Firestore/Example/GoogleService-Info.plist
 
 # FirebaseVertexAI test data
 vertexai-sdk-test-data
+

+ 9 - 8
Firestore/Example/Firestore.xcodeproj/project.pbxproj

@@ -140,7 +140,6 @@
 		1145D70555D8CDC75183A88C /* leveldb_mutation_queue_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 5C7942B6244F4C416B11B86C /* leveldb_mutation_queue_test.cc */; };
 		11627F3A48F710D654829807 /* comparison_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 87DD1A65EBA9FFC1FFAAE657 /* comparison_test.cc */; };
 		117AFA7934A52466633E12C1 /* FSTTestingHooks.mm in Sources */ = {isa = PBXBuildFile; fileRef = D85AC18C55650ED230A71B82 /* FSTTestingHooks.mm */; };
-		11A5189E73D954824F015424 /* pipeline_util_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 0401C6FDE59C493BFBD5DFED /* pipeline_util_test.cc */; };
 		11BC867491A6631D37DE56A8 /* async_testing.cc in Sources */ = {isa = PBXBuildFile; fileRef = 872C92ABD71B12784A1C5520 /* async_testing.cc */; };
 		11EBD28DBD24063332433947 /* value_util_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 40F9D09063A07F710811A84F /* value_util_test.cc */; };
 		11F8EE69182C9699E90A9E3D /* database_info_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = AB38D92E20235D22000A432D /* database_info_test.cc */; };
@@ -149,6 +148,9 @@
 		121F0FB9DCCBFB7573C7AF48 /* bundle_serializer_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B5C2A94EE24E60543F62CC35 /* bundle_serializer_test.cc */; };
 		124AAEE987451820F24EEA8E /* user_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = CCC9BD953F121B9E29F9AA42 /* user_test.cc */; };
 		125B1048ECB755C2106802EB /* executor_std_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6FB4687208F9B9100554BA2 /* executor_std_test.cc */; };
+		128F2B012E254E2C0006327E /* QueryToPipelineTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 128F2B002E254E2C0006327E /* QueryToPipelineTests.swift */; };
+		128F2B022E254E2C0006327E /* QueryToPipelineTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 128F2B002E254E2C0006327E /* QueryToPipelineTests.swift */; };
+		128F2B032E254E2C0006327E /* QueryToPipelineTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 128F2B002E254E2C0006327E /* QueryToPipelineTests.swift */; };
 		1290FA77A922B76503AE407C /* lru_garbage_collector_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 277EAACC4DD7C21332E8496A /* lru_garbage_collector_test.cc */; };
 		1291D9F5300AFACD1FBD262D /* array_sorted_map_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 54EB764C202277B30088B8F3 /* array_sorted_map_test.cc */; };
 		1296CECE2DEE97F5007F8552 /* RealtimePipelineTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1296CECD2DEE97EF007F8552 /* RealtimePipelineTests.swift */; };
@@ -524,7 +526,6 @@
 		48720B5768AFA2B2F3E14C04 /* Validation_BloomFilterTest_MD5_500_1_bloom_filter_proto.json in Resources */ = {isa = PBXBuildFile; fileRef = D8E530B27D5641B9C26A452C /* Validation_BloomFilterTest_MD5_500_1_bloom_filter_proto.json */; };
 		48926FF55484E996B474D32F /* Validation_BloomFilterTest_MD5_500_01_membership_test_result.json in Resources */ = {isa = PBXBuildFile; fileRef = DD990FD89C165F4064B4F608 /* Validation_BloomFilterTest_MD5_500_01_membership_test_result.json */; };
 		489D672CAA09B9BC66798E9F /* status.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 618BBE9920B89AAC00B5BCE7 /* status.pb.cc */; };
-		48A9AD22B0601C52B0522CF7 /* pipeline_util_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 0401C6FDE59C493BFBD5DFED /* pipeline_util_test.cc */; };
 		48BC5801432127A90CFF55E3 /* index.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 395E8B07639E69290A929695 /* index.pb.cc */; };
 		48D1B38B93D34F1B82320577 /* view_testing.cc in Sources */ = {isa = PBXBuildFile; fileRef = A5466E7809AD2871FFDE6C76 /* view_testing.cc */; };
 		48F44AA226FAD5DE4EAC3798 /* leveldb_query_engine_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = DB1F1E1B1ED15E8D042144B1 /* leveldb_query_engine_test.cc */; };
@@ -723,7 +724,6 @@
 		5556B648B9B1C2F79A706B4F /* common.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 544129D221C2DDC800EFB9CC /* common.pb.cc */; };
 		55B9A6ACDF95D356EA501D92 /* Pods_Firestore_Example_iOS.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = BB5A5E6DD07DA3EB7AD46CA7 /* Pods_Firestore_Example_iOS.framework */; };
 		55E84644D385A70E607A0F91 /* leveldb_local_store_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 5FF903AEFA7A3284660FA4C5 /* leveldb_local_store_test.cc */; };
-		563FE05627C7E66469E99292 /* pipeline_util_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 0401C6FDE59C493BFBD5DFED /* pipeline_util_test.cc */; };
 		568EC1C0F68A7B95E57C8C6C /* leveldb_key_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 54995F6E205B6E12004EFFA0 /* leveldb_key_test.cc */; };
 		56D85436D3C864B804851B15 /* string_format_apple_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = 9CFD366B783AE27B9E79EE7A /* string_format_apple_test.mm */; };
 		57171BD004A1691B19A76453 /* Validation_BloomFilterTest_MD5_1_0001_membership_test_result.json in Resources */ = {isa = PBXBuildFile; fileRef = C939D1789E38C09F9A0C1157 /* Validation_BloomFilterTest_MD5_1_0001_membership_test_result.json */; };
@@ -962,7 +962,6 @@
 		75C6CECF607CA94F56260BAB /* memory_document_overlay_cache_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 29D9C76922DAC6F710BC1EF4 /* memory_document_overlay_cache_test.cc */; };
 		75CC1D1F7F1093C2E09D9998 /* inequality_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = A410E38FA5C3EB5AECDB6F1C /* inequality_test.cc */; };
 		75D124966E727829A5F99249 /* FIRTypeTests.mm in Sources */ = {isa = PBXBuildFile; fileRef = 5492E071202154D600B64F25 /* FIRTypeTests.mm */; };
-		7676C06AF7FF67806747E4F0 /* pipeline_util_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 0401C6FDE59C493BFBD5DFED /* pipeline_util_test.cc */; };
 		76A5447D76F060E996555109 /* task_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 899FC22684B0F7BEEAE13527 /* task_test.cc */; };
 		76AD5862714F170251BDEACB /* Validation_BloomFilterTest_MD5_50000_0001_bloom_filter_proto.json in Resources */ = {isa = PBXBuildFile; fileRef = A5D9044B72061CAF284BC9E4 /* Validation_BloomFilterTest_MD5_50000_0001_bloom_filter_proto.json */; };
 		76C18D1BA96E4F5DF1BF7F4B /* Validation_BloomFilterTest_MD5_500_1_membership_test_result.json in Resources */ = {isa = PBXBuildFile; fileRef = 8AB49283E544497A9C5A0E59 /* Validation_BloomFilterTest_MD5_500_1_membership_test_result.json */; };
@@ -1650,7 +1649,6 @@
 		E1016ECF143B732E7821358E /* byte_stream_apple_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = 7628664347B9C96462D4BF17 /* byte_stream_apple_test.mm */; };
 		E11DDA3DD75705F26245E295 /* FIRCollectionReferenceTests.mm in Sources */ = {isa = PBXBuildFile; fileRef = 5492E045202154AA00B64F25 /* FIRCollectionReferenceTests.mm */; };
 		E1264B172412967A09993EC6 /* byte_string_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 5342CDDB137B4E93E2E85CCA /* byte_string_test.cc */; };
-		E14DBE1D9FC94B5E7E391BEE /* pipeline_util_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 0401C6FDE59C493BFBD5DFED /* pipeline_util_test.cc */; };
 		E15A05789FF01F44BCAE75EF /* fields_array_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = BA4CBA48204C9E25B56993BC /* fields_array_test.cc */; };
 		E186D002520881AD2906ADDB /* status.pb.cc in Sources */ = {isa = PBXBuildFile; fileRef = 618BBE9920B89AAC00B5BCE7 /* status.pb.cc */; };
 		E1DB8E1A4CF3DCE2AE8454D8 /* string_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = EEF23C7104A4D040C3A8CF9B /* string_test.cc */; };
@@ -1676,7 +1674,6 @@
 		E54AC3EA240C05B3720A2FE9 /* Validation_BloomFilterTest_MD5_5000_0001_bloom_filter_proto.json in Resources */ = {isa = PBXBuildFile; fileRef = 728F617782600536F2561463 /* Validation_BloomFilterTest_MD5_5000_0001_bloom_filter_proto.json */; };
 		E56EEC9DAC455E2BE77D110A /* memory_document_overlay_cache_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 29D9C76922DAC6F710BC1EF4 /* memory_document_overlay_cache_test.cc */; };
 		E59F597947D3E130A57E1B5E /* Validation_BloomFilterTest_MD5_1_1_membership_test_result.json in Resources */ = {isa = PBXBuildFile; fileRef = 3369AC938F82A70685C5ED58 /* Validation_BloomFilterTest_MD5_1_1_membership_test_result.json */; };
-		E5FE2BEECD70D59361B51540 /* pipeline_util_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 0401C6FDE59C493BFBD5DFED /* pipeline_util_test.cc */; };
 		E63342115B1DA65DB6F2C59A /* leveldb_local_store_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 5FF903AEFA7A3284660FA4C5 /* leveldb_local_store_test.cc */; };
 		E6357221227031DD77EE5265 /* index_manager_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = AE4A9E38D65688EE000EE2A1 /* index_manager_test.cc */; };
 		E6603BA4B16C9E1422DD3A4B /* FSTTestingHooks.mm in Sources */ = {isa = PBXBuildFile; fileRef = D85AC18C55650ED230A71B82 /* FSTTestingHooks.mm */; };
@@ -1908,8 +1905,7 @@
 		014C60628830D95031574D15 /* random_access_queue_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = random_access_queue_test.cc; sourceTree = "<group>"; };
 		01D10113ECC5B446DB35E96D /* byte_stream_cpp_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = byte_stream_cpp_test.cc; sourceTree = "<group>"; };
 		03BD47161789F26754D3B958 /* Pods-Firestore_Benchmarks_iOS.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_Benchmarks_iOS.release.xcconfig"; path = "Target Support Files/Pods-Firestore_Benchmarks_iOS/Pods-Firestore_Benchmarks_iOS.release.xcconfig"; sourceTree = "<group>"; };
-		0401C6FDE59C493BFBD5DFED /* pipeline_util_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; path = pipeline_util_test.cc; sourceTree = "<group>"; };
-		0458BABD8F8738AD16F4A2FE /* array_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; name = array_test.cc; path = expressions/array_test.cc; sourceTree = "<group>"; };
+		0458BABD8F8738AD16F4A2FE /* array_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; name = array_test.cc; path = expressions/array_test.cc; sourceTree = "<group>"; };
 		045D39C4A7D52AF58264240F /* remote_document_cache_test.h */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.c.h; path = remote_document_cache_test.h; sourceTree = "<group>"; };
 		0473AFFF5567E667A125347B /* ordered_code_benchmark.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = ordered_code_benchmark.cc; sourceTree = "<group>"; };
 		062072B62773A055001655D7 /* AsyncAwaitIntegrationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AsyncAwaitIntegrationTests.swift; sourceTree = "<group>"; };
@@ -1923,6 +1919,7 @@
 		1235769122B7E915007DDFA9 /* EncodableFieldValueTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EncodableFieldValueTests.swift; sourceTree = "<group>"; };
 		1235769422B86E65007DDFA9 /* FirestoreEncoderTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = FirestoreEncoderTests.swift; sourceTree = "<group>"; };
 		124C932B22C1642C00CA8C2D /* CodableIntegrationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CodableIntegrationTests.swift; sourceTree = "<group>"; };
+		128F2B002E254E2C0006327E /* QueryToPipelineTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = QueryToPipelineTests.swift; sourceTree = "<group>"; };
 		1296CECD2DEE97EF007F8552 /* RealtimePipelineTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RealtimePipelineTests.swift; sourceTree = "<group>"; };
 		129A369928CA555B005AE7E2 /* FIRCountTests.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = FIRCountTests.mm; sourceTree = "<group>"; };
 		12F4357299652983A615F886 /* LICENSE */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text; name = LICENSE; path = ../LICENSE; sourceTree = "<group>"; };
@@ -2505,6 +2502,7 @@
 				59BF06E5A4988F9F949DD871 /* PipelineApiTests.swift */,
 				861684E49DAC993D153E60D0 /* PipelineTests.swift */,
 				621D620928F9CE7400D2FA26 /* QueryIntegrationTests.swift */,
+				128F2B002E254E2C0006327E /* QueryToPipelineTests.swift */,
 				1296CECD2DEE97EF007F8552 /* RealtimePipelineTests.swift */,
 				4D65F6E69993611D47DC8E7C /* SnapshotListenerSourceTests.swift */,
 				EFF22EA92C5060A4009A369B /* VectorIntegrationTests.swift */,
@@ -4984,6 +4982,7 @@
 				3D5F7AA7BB68529F47BE4B12 /* PipelineApiTests.swift in Sources */,
 				655F8647F57E5F2155DFF7B5 /* PipelineTests.swift in Sources */,
 				621D620C28F9CE7400D2FA26 /* QueryIntegrationTests.swift in Sources */,
+				128F2B022E254E2C0006327E /* QueryToPipelineTests.swift in Sources */,
 				1296CECF2DEE97F5007F8552 /* RealtimePipelineTests.swift in Sources */,
 				1CFBD4563960D8A20C4679A3 /* SnapshotListenerSourceTests.swift in Sources */,
 				EE4C4BE7F93366AE6368EE02 /* TestHelper.swift in Sources */,
@@ -5265,6 +5264,7 @@
 				DF6FBE5BBD578B0DD34CEFA1 /* PipelineApiTests.swift in Sources */,
 				C8C2B945D84DD98391145F3F /* PipelineTests.swift in Sources */,
 				621D620B28F9CE7400D2FA26 /* QueryIntegrationTests.swift in Sources */,
+				128F2B032E254E2C0006327E /* QueryToPipelineTests.swift in Sources */,
 				1296CECE2DEE97F5007F8552 /* RealtimePipelineTests.swift in Sources */,
 				A0BC30D482B0ABD1A3A24CDC /* SnapshotListenerSourceTests.swift in Sources */,
 				A78366DBE0BFDE42474A728A /* TestHelper.swift in Sources */,
@@ -5828,6 +5828,7 @@
 				BD74B0E1FC752236A7376BC3 /* PipelineApiTests.swift in Sources */,
 				E04CB0D580980748D5DC453F /* PipelineTests.swift in Sources */,
 				621D620A28F9CE7400D2FA26 /* QueryIntegrationTests.swift in Sources */,
+				128F2B012E254E2C0006327E /* QueryToPipelineTests.swift in Sources */,
 				1296CED02DEE97F5007F8552 /* RealtimePipelineTests.swift in Sources */,
 				B00F8D1819EE20C45B660940 /* SnapshotListenerSourceTests.swift in Sources */,
 				AD34726BFD3461FF64BBD56D /* TestHelper.swift in Sources */,

+ 45 - 33
Firestore/Example/Tests/SpecTests/FSTSpecTests.mm

@@ -918,37 +918,49 @@ NSString *ToTargetIdListString(const ActiveTargetMap &map) {
     }
     if (expectedState[@"activeTargets"]) {
       __block ActiveTargetMap expectedActiveTargets;
-      [expectedState[@"activeTargets"] enumerateKeysAndObjectsUsingBlock:^(NSString *targetIDString,
-                                                                           NSDictionary *queryData,
-                                                                           BOOL *) {
-        TargetId targetID = [targetIDString intValue];
-        NSArray *queriesJson = queryData[@"queries"];
-        std::vector<TargetData> queries;
-        for (id queryJson in queriesJson) {
-          core::QueryOrPipeline qop;
-          Query query = [self parseQuery:queryJson];
-
-          QueryPurpose purpose = QueryPurpose::Listen;
-          if ([queryData objectForKey:@"targetPurpose"] != nil) {
-            purpose = [self parseQueryPurpose:queryData[@"targetPurpose"]];
-          }
-
-          TargetData target_data(core::TargetOrPipeline(query.ToTarget()), targetID, 0, purpose);
-          if ([queryData objectForKey:@"resumeToken"] != nil) {
-            target_data = target_data.WithResumeToken(MakeResumeToken(queryData[@"resumeToken"]),
-                                                      SnapshotVersion::None());
-          } else {
-            target_data = target_data.WithResumeToken(ByteString(),
-                                                      [self parseVersion:queryData[@"readTime"]]);
-          }
-
-          if ([queryData objectForKey:@"expectedCount"] != nil) {
-            target_data = target_data.WithExpectedCount([queryData[@"expectedCount"] intValue]);
-          }
-          queries.push_back(std::move(target_data));
-        }
-        expectedActiveTargets[targetID] = std::move(queries);
-      }];
+      [expectedState[@"activeTargets"]
+          enumerateKeysAndObjectsUsingBlock:^(NSString *targetIDString, NSDictionary *queryData,
+                                              BOOL *) {
+            TargetId targetID = [targetIDString intValue];
+            NSArray *queriesJson = queryData[@"queries"];
+            std::vector<TargetData> queries;
+            for (id queryJson in queriesJson) {
+              QueryPurpose purpose = QueryPurpose::Listen;
+              if ([queryData objectForKey:@"targetPurpose"] != nil) {
+                purpose = [self parseQueryPurpose:queryData[@"targetPurpose"]];
+              }
+
+              core::TargetOrPipeline top;
+              Query query = [self parseQuery:queryJson];
+
+              if (self->_convertToPipeline &&
+                  purpose != firebase::firestore::local::QueryPurpose::LimboResolution) {
+                std::vector<std::shared_ptr<api::EvaluableStage>> stages =
+                    core::ToPipelineStages(query);
+                auto serializer =
+                    absl::make_unique<remote::Serializer>(self.driver.databaseInfo.database_id());
+                top = core::TargetOrPipeline(
+                    api::RealtimePipeline(std::move(stages), std::move(serializer)));
+              } else {
+                top = core::TargetOrPipeline(query.ToTarget());
+              }
+
+              TargetData target_data(top, targetID, 0, purpose);
+              if ([queryData objectForKey:@"resumeToken"] != nil) {
+                target_data = target_data.WithResumeToken(
+                    MakeResumeToken(queryData[@"resumeToken"]), SnapshotVersion::None());
+              } else {
+                target_data = target_data.WithResumeToken(
+                    ByteString(), [self parseVersion:queryData[@"readTime"]]);
+              }
+
+              if ([queryData objectForKey:@"expectedCount"] != nil) {
+                target_data = target_data.WithExpectedCount([queryData[@"expectedCount"] intValue]);
+              }
+              queries.push_back(std::move(target_data));
+            }
+            expectedActiveTargets[targetID] = std::move(queries);
+          }];
       [self.driver setExpectedActiveTargets:std::move(expectedActiveTargets)];
     }
   }
@@ -1058,12 +1070,12 @@ NSString *ToTargetIdListString(const ActiveTargetMap &map) {
     // XCTAssertEqualObjects(actualTargets[targetID], TargetData);
     const TargetData &actual = found->second;
     auto left = actual.target_or_pipeline();
-    auto left_p = left.IsPipeline();
     auto right = targetData.target_or_pipeline();
+    auto left_p = left.IsPipeline();
     auto right_p = right.IsPipeline();
-    XCTAssertEqual(actual.purpose(), targetData.purpose());
     XCTAssertEqual(left_p, right_p);
     XCTAssertEqual(left, right);
+    XCTAssertEqual(actual.purpose(), targetData.purpose());
     XCTAssertEqual(actual.target_id(), targetData.target_id());
     XCTAssertEqual(actual.snapshot_version(), targetData.snapshot_version());
     XCTAssertEqual(actual.resume_token(), targetData.resume_token());

+ 33 - 0
Firestore/Source/API/FIRPipelineBridge+Internal.h

@@ -48,6 +48,39 @@ NS_ASSUME_NONNULL_BEGIN
 
 @end
 
+@interface FIRCollectionSourceStageBridge (Internal)
+- (id)initWithCppStage:(std::shared_ptr<const firebase::firestore::api::CollectionSource>)stage;
+@end
+
+@interface FIRDatabaseSourceStageBridge (Internal)
+- (id)initWithCppStage:(std::shared_ptr<const firebase::firestore::api::DatabaseSource>)stage;
+@end
+
+@interface FIRCollectionGroupSourceStageBridge (Internal)
+- (id)initWithCppStage:
+    (std::shared_ptr<const firebase::firestore::api::CollectionGroupSource>)stage;
+@end
+
+@interface FIRDocumentsSourceStageBridge (Internal)
+- (id)initWithCppStage:(std::shared_ptr<const firebase::firestore::api::DocumentsSource>)stage;
+@end
+
+@interface FIRWhereStageBridge (Internal)
+- (id)initWithCppStage:(std::shared_ptr<const firebase::firestore::api::Where>)stage;
+@end
+
+@interface FIRLimitStageBridge (Internal)
+- (id)initWithCppStage:(std::shared_ptr<const firebase::firestore::api::LimitStage>)stage;
+@end
+
+@interface FIROffsetStageBridge (Internal)
+- (id)initWithCppStage:(std::shared_ptr<const firebase::firestore::api::OffsetStage>)stage;
+@end
+
+@interface FIRSorStageBridge (Internal)
+- (id)initWithCppStage:(std::shared_ptr<const firebase::firestore::api::SortStage>)stage;
+@end
+
 @interface __FIRPipelineSnapshotBridge (Internal)
 
 - (id)initWithCppSnapshot:(api::PipelineSnapshot)snapshot;

+ 166 - 2
Firestore/Source/API/FIRPipelineBridge.mm

@@ -26,6 +26,7 @@
 #import "Firestore/Source/API/FIRFirestore+Internal.h"
 #import "Firestore/Source/API/FIRListenerRegistration+Internal.h"
 #import "Firestore/Source/API/FIRPipelineBridge+Internal.h"
+#import "Firestore/Source/API/FIRQuery+Internal.h"
 #import "Firestore/Source/API/FIRSnapshotMetadata+Internal.h"
 #import "Firestore/Source/API/FSTUserDataReader.h"
 #import "Firestore/Source/API/FSTUserDataWriter.h"
@@ -78,9 +79,9 @@ using firebase::firestore::api::Ordering;
 using firebase::firestore::api::Pipeline;
 using firebase::firestore::api::PipelineResultChange;
 using firebase::firestore::api::QueryListenerRegistration;
+using firebase::firestore::api::RawStage;
 using firebase::firestore::api::RealtimePipeline;
 using firebase::firestore::api::RealtimePipelineSnapshot;
-using firebase::firestore::api::RawStage;
 using firebase::firestore::api::RemoveFieldsStage;
 using firebase::firestore::api::ReplaceWith;
 using firebase::firestore::api::Sample;
@@ -256,6 +257,11 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
 @end
 
 @implementation FIRStageBridge
+- (NSString *)name {
+  [NSException raise:NSInternalInconsistencyException
+              format:@"You must override %@ in a subclass", NSStringFromSelector(_cmd)];
+  return nil;
+}
 @end
 
 @implementation FIRCollectionSourceStageBridge {
@@ -283,6 +289,17 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return collection_source;
 }
 
+- (id)initWithCppStage:(std::shared_ptr<const api::CollectionSource>)stage {
+  self = [super init];
+  if (self) {
+    collection_source = std::const_pointer_cast<api::CollectionSource>(stage);
+  }
+  return self;
+}
+
+- (NSString *)name {
+  return @"collection";
+}
 @end
 
 @implementation FIRDatabaseSourceStageBridge {
@@ -301,6 +318,17 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_database_source;
 }
 
+- (id)initWithCppStage:(std::shared_ptr<const api::DatabaseSource>)stage {
+  self = [super init];
+  if (self) {
+    cpp_database_source = std::const_pointer_cast<api::DatabaseSource>(stage);
+  }
+  return self;
+}
+
+- (NSString *)name {
+  return @"database";
+}
 @end
 
 @implementation FIRCollectionGroupSourceStageBridge {
@@ -319,6 +347,17 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_collection_group_source;
 }
 
+- (id)initWithCppStage:(std::shared_ptr<const api::CollectionGroupSource>)stage {
+  self = [super init];
+  if (self) {
+    cpp_collection_group_source = std::const_pointer_cast<api::CollectionGroupSource>(stage);
+  }
+  return self;
+}
+
+- (NSString *)name {
+  return @"collection_group";
+}
 @end
 
 @implementation FIRDocumentsSourceStageBridge {
@@ -350,6 +389,17 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_document_source;
 }
 
+- (id)initWithCppStage:(std::shared_ptr<const api::DocumentsSource>)stage {
+  self = [super init];
+  if (self) {
+    cpp_document_source = std::const_pointer_cast<api::DocumentsSource>(stage);
+  }
+  return self;
+}
+
+- (NSString *)name {
+  return @"documents";
+}
 @end
 
 @implementation FIRWhereStageBridge {
@@ -376,6 +426,18 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_where;
 }
 
+- (id)initWithCppStage:(std::shared_ptr<const api::Where>)stage {
+  self = [super init];
+  if (self) {
+    cpp_where = std::const_pointer_cast<api::Where>(stage);
+    isUserDataRead = YES;
+  }
+  return self;
+}
+
+- (NSString *)name {
+  return @"where";
+}
 @end
 
 @implementation FIRLimitStageBridge {
@@ -402,6 +464,18 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_limit_stage;
 }
 
+- (id)initWithCppStage:(std::shared_ptr<const api::LimitStage>)stage {
+  self = [super init];
+  if (self) {
+    cpp_limit_stage = std::const_pointer_cast<api::LimitStage>(stage);
+    isUserDataRead = YES;
+  }
+  return self;
+}
+
+- (NSString *)name {
+  return @"limit";
+}
 @end
 
 @implementation FIROffsetStageBridge {
@@ -428,6 +502,18 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_offset_stage;
 }
 
+- (id)initWithCppStage:(std::shared_ptr<const api::OffsetStage>)stage {
+  self = [super init];
+  if (self) {
+    cpp_offset_stage = std::const_pointer_cast<api::OffsetStage>(stage);
+    isUserDataRead = YES;
+  }
+  return self;
+}
+
+- (NSString *)name {
+  return @"offset";
+}
 @end
 
 // TBD
@@ -460,6 +546,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_add_fields;
 }
 
+- (NSString *)name {
+  return @"add_fields";
+}
 @end
 
 @implementation FIRRemoveFieldsStageBridge {
@@ -490,6 +579,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_remove_fields;
 }
 
+- (NSString *)name {
+  return @"remove_fields";
+}
 @end
 
 @implementation FIRSelectStageBridge {
@@ -520,6 +612,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_select;
 }
 
+- (NSString *)name {
+  return @"select";
+}
 @end
 
 @implementation FIRDistinctStageBridge {
@@ -550,6 +645,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_distinct;
 }
 
+- (NSString *)name {
+  return @"distinct";
+}
 @end
 
 @implementation FIRAggregateStageBridge {
@@ -589,6 +687,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_aggregate;
 }
 
+- (NSString *)name {
+  return @"aggregate";
+}
 @end
 
 @implementation FIRFindNearestStageBridge {
@@ -650,6 +751,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_find_nearest;
 }
 
+- (NSString *)name {
+  return @"find_nearest";
+}
 @end
 
 @implementation FIRSorStageBridge {
@@ -680,6 +784,18 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_sort;
 }
 
+- (id)initWithCppStage:(std::shared_ptr<const api::SortStage>)stage {
+  self = [super init];
+  if (self) {
+    cpp_sort = std::const_pointer_cast<api::SortStage>(stage);
+    isUserDataRead = YES;
+  }
+  return self;
+}
+
+- (NSString *)name {
+  return @"sort";
+}
 @end
 
 @implementation FIRReplaceWithStageBridge {
@@ -706,6 +822,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_replace_with;
 }
 
+- (NSString *)name {
+  return @"replace_with";
+}
 @end
 
 @implementation FIRSampleStageBridge {
@@ -753,6 +872,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_sample;
 }
 
+- (NSString *)name {
+  return @"sample";
+}
 @end
 
 @implementation FIRUnionStageBridge {
@@ -779,6 +901,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_union_stage;
 }
 
+- (NSString *)name {
+  return @"union";
+}
 @end
 
 @implementation FIRUnnestStageBridge {
@@ -818,6 +943,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_unnest;
 }
 
+- (NSString *)name {
+  return @"unnest";
+}
 @end
 
 @implementation FIRRawStageBridge {
@@ -900,6 +1028,9 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_generic_stage;
 }
 
+- (NSString *)name {
+  return _name;
+}
 @end
 
 @interface __FIRPipelineSnapshotBridge ()
@@ -1118,6 +1249,39 @@ inline std::string EnsureLeadingSlash(const std::string &path) {
   return cpp_pipeline;
 }
 
++ (NSArray<FIRStageBridge *> *)createStageBridgesFromQuery:(FIRQuery *)query {
+  std::vector<std::shared_ptr<api::EvaluableStage>> evaluable_stages =
+      firebase::firestore::core::ToPipelineStages(query.query);
+  std::vector<std::shared_ptr<api::Stage>> cpp_stages(evaluable_stages.begin(),
+                                                      evaluable_stages.end());
+  NSMutableArray<FIRStageBridge *> *stageBridges = [NSMutableArray array];
+
+  for (const auto &cpp_stage_base : cpp_stages) {
+    if (auto cpp_stage = std::dynamic_pointer_cast<api::CollectionSource>(cpp_stage_base)) {
+      [stageBridges addObject:[[FIRCollectionSourceStageBridge alloc] initWithCppStage:cpp_stage]];
+    } else if (auto cpp_stage =
+                   std::dynamic_pointer_cast<api::CollectionGroupSource>(cpp_stage_base)) {
+      [stageBridges
+          addObject:[[FIRCollectionGroupSourceStageBridge alloc] initWithCppStage:cpp_stage]];
+    } else if (auto cpp_stage = std::dynamic_pointer_cast<api::DocumentsSource>(cpp_stage_base)) {
+      [stageBridges addObject:[[FIRDocumentsSourceStageBridge alloc] initWithCppStage:cpp_stage]];
+    } else if (auto cpp_stage = std::dynamic_pointer_cast<api::Where>(cpp_stage_base)) {
+      [stageBridges addObject:[[FIRWhereStageBridge alloc] initWithCppStage:cpp_stage]];
+    } else if (auto cpp_stage = std::dynamic_pointer_cast<api::LimitStage>(cpp_stage_base)) {
+      [stageBridges addObject:[[FIRLimitStageBridge alloc] initWithCppStage:cpp_stage]];
+    } else if (auto cpp_stage = std::dynamic_pointer_cast<api::SortStage>(cpp_stage_base)) {
+      [stageBridges addObject:[[FIRSorStageBridge alloc] initWithCppStage:cpp_stage]];
+    } else if (auto cpp_stage = std::dynamic_pointer_cast<api::OffsetStage>(cpp_stage_base)) {
+      [stageBridges addObject:[[FIROffsetStageBridge alloc] initWithCppStage:cpp_stage]];
+    } else {
+      ThrowInvalidArgument(
+          "Unknown or unhandled stage type '%s' encountered when converting from FIRQuery.",
+          cpp_stage_base->name().c_str());
+    }
+  }
+  return [stageBridges copy];
+}
+
 @end
 
 @interface __FIRRealtimePipelineSnapshotBridge ()
@@ -1297,7 +1461,7 @@ core::ListenOptions ToListenOptions(__FIRPipelineListenOptionsBridge *_Nullable
       wrapped_firestore->client()->user_executor(), std::move(view_listener));
 
   std::shared_ptr<core::QueryListener> query_listener = wrapped_firestore->client()->ListenToQuery(
-      *cpp_pipeline, ToListenOptions(options), async_listener);
+      core::QueryOrPipeline(*cpp_pipeline), ToListenOptions(options), async_listener);
 
   return [[FSTListenerRegistration alloc]
       initWithRegistration:absl::make_unique<QueryListenerRegistration>(wrapped_firestore->client(),

+ 2 - 7
Firestore/Source/Public/FirebaseFirestore/FIRPipelineBridge.h

@@ -69,6 +69,7 @@ NS_SWIFT_NAME(OrderingBridge)
 NS_SWIFT_SENDABLE
 NS_SWIFT_NAME(StageBridge)
 @interface FIRStageBridge : NSObject
+@property(nonatomic, readonly) NSString *name;
 @end
 
 NS_SWIFT_SENDABLE
@@ -76,7 +77,6 @@ NS_SWIFT_NAME(CollectionSourceStageBridge)
 @interface FIRCollectionSourceStageBridge : FIRStageBridge
 
 - (id)initWithRef:(FIRCollectionReference *)ref firestore:(FIRFirestore *)db;
-
 @end
 
 NS_SWIFT_SENDABLE
@@ -84,7 +84,6 @@ NS_SWIFT_NAME(DatabaseSourceStageBridge)
 @interface FIRDatabaseSourceStageBridge : FIRStageBridge
 
 - (id)init;
-
 @end
 
 NS_SWIFT_SENDABLE
@@ -92,7 +91,6 @@ NS_SWIFT_NAME(CollectionGroupSourceStageBridge)
 @interface FIRCollectionGroupSourceStageBridge : FIRStageBridge
 
 - (id)initWithCollectionId:(NSString *)id;
-
 @end
 
 NS_SWIFT_SENDABLE
@@ -100,7 +98,6 @@ NS_SWIFT_NAME(DocumentsSourceStageBridge)
 @interface FIRDocumentsSourceStageBridge : FIRStageBridge
 
 - (id)initWithDocuments:(NSArray<FIRDocumentReference *> *)documents firestore:(FIRFirestore *)db;
-
 @end
 
 NS_SWIFT_SENDABLE
@@ -108,7 +105,6 @@ NS_SWIFT_NAME(WhereStageBridge)
 @interface FIRWhereStageBridge : FIRStageBridge
 
 - (id)initWithExpr:(FIRExprBridge *)expr;
-
 @end
 
 NS_SWIFT_SENDABLE
@@ -116,7 +112,6 @@ NS_SWIFT_NAME(LimitStageBridge)
 @interface FIRLimitStageBridge : FIRStageBridge
 
 - (id)initWithLimit:(NSInteger)value;
-
 @end
 
 NS_SWIFT_SENDABLE
@@ -124,7 +119,6 @@ NS_SWIFT_NAME(OffsetStageBridge)
 @interface FIROffsetStageBridge : FIRStageBridge
 
 - (id)initWithOffset:(NSInteger)value;
-
 @end
 
 NS_SWIFT_SENDABLE
@@ -269,6 +263,7 @@ NS_SWIFT_NAME(PipelineBridge)
 - (void)executeWithCompletion:(void (^)(__FIRPipelineSnapshotBridge *_Nullable result,
                                         NSError *_Nullable error))completion;
 
++ (NSArray<FIRStageBridge *> *)createStageBridgesFromQuery:(FIRQuery *)query;
 @end
 
 NS_SWIFT_SENDABLE

+ 33 - 11
Firestore/Swift/Source/SwiftAPI/Pipeline/PipelineSource.swift

@@ -23,8 +23,11 @@ public struct PipelineSource<P>: @unchecked Sendable {
   }
 
   public func collection(_ path: String) -> P {
-    let normalizedPath = path.hasPrefix("/") ? path : "/" + path
-    return factory([CollectionSource(collection: normalizedPath)], db)
+    return factory([CollectionSource(collection: db.collection(path), db: db)], db)
+  }
+
+  public func collection(_ coll: CollectionReference) -> P {
+    return factory([CollectionSource(collection: coll, db: db)], db)
   }
 
   public func collectionGroup(_ collectionId: String) -> P {
@@ -39,20 +42,39 @@ public struct PipelineSource<P>: @unchecked Sendable {
   }
 
   public func documents(_ docs: [DocumentReference]) -> P {
-    let paths = docs.map { $0.path.hasPrefix("/") ? $0.path : "/" + $0.path }
-    return factory([DocumentsSource(paths: paths)], db)
+    return factory([DocumentsSource(docs: docs, db: db)], db)
   }
 
   public func documents(_ paths: [String]) -> P {
-    let normalizedPaths = paths.map { $0.hasPrefix("/") ? $0 : "/" + $0 }
-    return factory([DocumentsSource(paths: normalizedPaths)], db)
+    let docs = paths.map { db.document($0) }
+    return factory([DocumentsSource(docs: docs, db: db)], db)
   }
 
   public func create(from query: Query) -> P {
-    return factory([QuerySource(query: query)], db)
-  }
-
-  public func create(from aggregateQuery: AggregateQuery) -> P {
-    return factory([AggregateQuerySource(aggregateQuery: aggregateQuery)], db)
+    let stageBridges = PipelineBridge.createStageBridges(from: query)
+    let stages: [Stage] = stageBridges.map { bridge in
+      switch bridge.name {
+      case "collection":
+        return CollectionSource(
+          bridge: bridge as! CollectionSourceStageBridge,
+          db: query.firestore
+        )
+      case "collection_group":
+        return CollectionGroupSource(bridge: bridge as! CollectionGroupSourceStageBridge)
+      case "documents":
+        return DocumentsSource(bridge: bridge as! DocumentsSourceStageBridge, db: query.firestore)
+      case "where":
+        return Where(bridge: bridge as! WhereStageBridge)
+      case "limit":
+        return Limit(bridge: bridge as! LimitStageBridge)
+      case "sort":
+        return Sort(bridge: bridge as! SortStageBridge)
+      case "offset":
+        return Offset(bridge: bridge as! OffsetStageBridge)
+      default:
+        fatalError("Unknown stage type \(bridge.name)")
+      }
+    }
+    return factory(stages, db)
   }
 }

+ 1 - 1
Firestore/Swift/Source/SwiftAPI/Pipeline/RealtimePipeline.swift

@@ -138,7 +138,7 @@ public struct RealtimePipeline: @unchecked Sendable {
   ///
   /// - Parameter condition: The `BooleanExpr` to apply.
   /// - Returns: A new `Pipeline` object with this stage appended.
-  public func `where`(_ condition: BooleanExpr) -> RealtimePipeline {
+  public func `where`(_ condition: BooleanExpression) -> RealtimePipeline {
     return RealtimePipeline(stages: stages + [Where(condition: condition)], db: db)
   }
 

+ 40 - 50
Firestore/Swift/Source/SwiftAPI/Stages.swift

@@ -33,27 +33,32 @@ class CollectionSource: Stage {
   let name: String = "collection"
 
   let bridge: StageBridge
-  private var collection: CollectionReference
   private let db: Firestore
 
   init(collection: CollectionReference, db: Firestore) {
-    self.collection = collection
     self.db = db
     bridge = CollectionSourceStageBridge(ref: collection, firestore: db)
   }
+
+  init(bridge: CollectionSourceStageBridge, db: Firestore) {
+    self.db = db
+    self.bridge = bridge
+  }
 }
 
 @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
 class CollectionGroupSource: Stage {
-  let name: String = "collectionId"
+  let name: String = "collection_group"
 
   let bridge: StageBridge
-  private var collectionId: String
 
   init(collectionId: String) {
-    self.collectionId = collectionId
     bridge = CollectionGroupSourceStageBridge(collectionId: collectionId)
   }
+
+  init(bridge: CollectionGroupSourceStageBridge) {
+    self.bridge = bridge
+  }
 }
 
 // Represents the entire database as a source.
@@ -65,6 +70,10 @@ class DatabaseSource: Stage {
   init() {
     bridge = DatabaseSourceStageBridge()
   }
+
+  init(bridge: DatabaseSourceStageBridge) {
+    self.bridge = bridge
+  }
 }
 
 // Represents a list of document references as a source.
@@ -72,42 +81,17 @@ class DatabaseSource: Stage {
 class DocumentsSource: Stage {
   let name: String = "documents"
   let bridge: StageBridge
-  private var docs: [DocumentReference]
   private let db: Firestore
 
   // Initialize with an array of String paths
   init(docs: [DocumentReference], db: Firestore) {
-    self.docs = docs
     self.db = db
     bridge = DocumentsSourceStageBridge(documents: docs, firestore: db)
   }
-}
-
-// Represents an existing Query as a source.
-@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
-class QuerySource: Stage {
-  let name: String = "query"
-  let bridge: StageBridge
-  private var query: Query
-
-  init(query: Query) {
-    self.query = query
-    bridge = DatabaseSourceStageBridge()
-    // TODO: bridge = QuerySourceStageBridge(query: query.query)
-  }
-}
 
-// Represents an existing AggregateQuery as a source.
-@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
-class AggregateQuerySource: Stage {
-  let name: String = "aggregateQuery"
-  let bridge: StageBridge
-  private var aggregateQuery: AggregateQuery
-
-  init(aggregateQuery: AggregateQuery) {
-    self.aggregateQuery = aggregateQuery
-    bridge = DatabaseSourceStageBridge()
-    // TODO: bridge = AggregateQuerySourceStageBridge(aggregateQuery: aggregateQuery.query)
+  init(bridge: DocumentsSourceStageBridge, db: Firestore) {
+    self.db = db
+    self.bridge = bridge
   }
 }
 
@@ -116,12 +100,16 @@ class Where: Stage {
   let name: String = "where"
 
   let bridge: StageBridge
-  private var condition: BooleanExpression
+  private var condition: BooleanExpression?
 
   init(condition: BooleanExpression) {
     self.condition = condition
     bridge = WhereStageBridge(expr: condition.toBridge())
   }
+
+  init(bridge: WhereStageBridge) {
+    self.bridge = bridge
+  }
 }
 
 @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
@@ -129,12 +117,14 @@ class Limit: Stage {
   let name: String = "limit"
 
   let bridge: StageBridge
-  private var limit: Int32
 
   init(_ limit: Int32) {
-    self.limit = limit
     bridge = LimitStageBridge(limit: NSInteger(limit))
   }
+
+  init(bridge: LimitStageBridge) {
+    self.bridge = bridge
+  }
 }
 
 @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
@@ -142,17 +132,19 @@ class Offset: Stage {
   let name: String = "offset"
 
   let bridge: StageBridge
-  private var offset: Int32
 
   init(_ offset: Int32) {
-    self.offset = offset
     bridge = OffsetStageBridge(offset: NSInteger(offset))
   }
+
+  init(bridge: OffsetStageBridge) {
+    self.bridge = bridge
+  }
 }
 
 @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
 class AddFields: Stage {
-  let name: String = "addFields"
+  let name: String = "add_fields"
   let bridge: StageBridge
   private var selectables: [Selectable]
 
@@ -171,7 +163,7 @@ class AddFields: Stage {
 
 @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
 class RemoveFieldsStage: Stage {
-  let name: String = "removeFields"
+  let name: String = "remove_fields"
   let bridge: StageBridge
   private var fields: [String]
 
@@ -190,10 +182,8 @@ class RemoveFieldsStage: Stage {
 class Select: Stage {
   let name: String = "select"
   let bridge: StageBridge
-  private var selections: [Selectable]
 
   init(selections: [Selectable]) {
-    self.selections = selections
     let map = Helper.selectablesToMap(selectables: selections)
     bridge = SelectStageBridge(selections: map
       .mapValues { Helper.sendableToExpr($0).toBridge() })
@@ -204,10 +194,8 @@ class Select: Stage {
 class Distinct: Stage {
   let name: String = "distinct"
   let bridge: StageBridge
-  private var groups: [Selectable]
 
   init(groups: [Selectable]) {
-    self.groups = groups
     let map = Helper.selectablesToMap(selectables: groups)
     bridge = DistinctStageBridge(groups: map
       .mapValues { Helper.sendableToExpr($0).toBridge() })
@@ -226,12 +214,12 @@ class Aggregate: Stage {
     if groups != nil {
       self.groups = Helper.selectablesToMap(selectables: groups!)
     }
-    let map = accumulators
+    let accumulatorsMap = accumulators
       .reduce(into: [String: AggregateFunctionBridge]()) { result, accumulator in
         result[accumulator.alias] = accumulator.aggregate.bridge
       }
     bridge = AggregateStageBridge(
-      accumulators: map,
+      accumulators: accumulatorsMap,
       groups: self.groups.mapValues { Helper.sendableToExpr($0).toBridge() }
     )
   }
@@ -239,7 +227,7 @@ class Aggregate: Stage {
 
 @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
 class FindNearest: Stage {
-  let name: String = "findNearest"
+  let name: String = "find_nearest"
   let bridge: StageBridge
   private var field: Field
   private var vectorValue: VectorValue
@@ -271,17 +259,19 @@ class FindNearest: Stage {
 class Sort: Stage {
   let name: String = "sort"
   let bridge: StageBridge
-  private var orderings: [Ordering]
 
   init(orderings: [Ordering]) {
-    self.orderings = orderings
     bridge = SortStageBridge(orderings: orderings.map { $0.bridge })
   }
+
+  init(bridge: SortStageBridge) {
+    self.bridge = bridge
+  }
 }
 
 @available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
 class ReplaceWith: Stage {
-  let name: String = "replaceWith"
+  let name: String = "replace_with"
   let bridge: StageBridge
   private var expr: Expression
 

+ 1 - 0
Firestore/Swift/Tests/Integration/AggregationIntegrationTests.swift

@@ -31,6 +31,7 @@ class AggregationIntegrationTests: FSTIntegrationTestCase {
     try await collection.addDocument(data: ["author": "authorA",
                                             "title": "titleA",
                                             "pages": 100,
+
                                             "height": 24.5,
                                             "weight": 24.1,
                                             "foo": 1,

+ 6 - 9
Firestore/Swift/Tests/Integration/PipelineApiTests.swift

@@ -36,9 +36,6 @@ final class PipelineApiTests: FSTIntegrationTestCase {
     let query: Query = db.collection("foo").limit(to: 2)
     let _: Pipeline = pipelineSource.create(from: query)
 
-    let aggregateQuery = db.collection("foo").count
-    let _: Pipeline = pipelineSource.create(from: aggregateQuery)
-
     let _: PipelineSnapshot = try await pipeline.execute()
   }
 
@@ -310,12 +307,12 @@ final class PipelineApiTests: FSTIntegrationTestCase {
         ]
       )
 
-    // One special Field value is conveniently exposed as constructor to help the user reference reserved field values of __name__.
-        _ = db.pipeline().collection("books")
-          .addFields([
-            DocumentId()
-            ]
-          )
+    // One special Field value is conveniently exposed as constructor to help the user reference
+    // reserved field values of __name__.
+    _ = db.pipeline().collection("books")
+      .addFields([
+        DocumentId(),
+      ])
   }
 
   func testConstant() async throws {

+ 105 - 52
Firestore/Swift/Tests/Integration/QueryIntegrationTests.swift

@@ -18,7 +18,20 @@ import FirebaseFirestore
 import Foundation
 
 class QueryIntegrationTests: FSTIntegrationTestCase {
-  func testOrQueries() throws {
+  class var isRunningPipeline: Bool {
+    return false
+  }
+
+  open func check(_ coll: CollectionReference, query: Query,
+                  matchesResult expectedKeys: [String]) async throws {
+    checkOnlineAndOfflineCollection(
+      coll,
+      query: query,
+      matchesResult: expectedKeys
+    )
+  }
+
+  func testOrQueries() async throws {
     let collRef = collectionRef(
       withDocuments: ["doc1": ["a": 1, "b": 0],
                       "doc2": ["a": 2, "b": 1],
@@ -32,8 +45,8 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", isEqualTo: 1),
        Filter.whereField("b", isEqualTo: 1)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter1),
-                                    matchesResult: ["doc1", "doc2", "doc4", "doc5"])
+    try await check(collRef, query: collRef.whereFilter(filter1),
+                    matchesResult: ["doc1", "doc2", "doc4", "doc5"])
 
     // (a==1 && b==0) || (a==3 && b==2)
     let filter2 = Filter.orFilter(
@@ -46,8 +59,8 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
          Filter.whereField("b", isEqualTo: 2)]
       )]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter2),
-                                    matchesResult: ["doc1", "doc3"])
+    try await check(collRef, query: collRef.whereFilter(filter2),
+                    matchesResult: ["doc1", "doc3"])
 
     // a==1 && (b==0 || b==3).
     let filter3 = Filter.andFilter(
@@ -57,8 +70,8 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
           Filter.whereField("b", isEqualTo: 3)]
        )]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter3),
-                                    matchesResult: ["doc1", "doc4"])
+    try await check(collRef, query: collRef.whereFilter(filter3),
+                    matchesResult: ["doc1", "doc4"])
 
     // (a==2 || b==2) && (a==3 || b==3)
     let filter4 = Filter.andFilter(
@@ -71,21 +84,21 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
          Filter.whereField("b", isEqualTo: 3)]
       )]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter4),
-                                    matchesResult: ["doc3"])
+    try await check(collRef, query: collRef.whereFilter(filter4),
+                    matchesResult: ["doc3"])
 
     // Test with limits without orderBy (the __name__ ordering is the tie breaker).
     let filter5 = Filter.orFilter(
       [Filter.whereField("a", isEqualTo: 2),
        Filter.whereField("b", isEqualTo: 1)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter5).limit(to: 1),
-                                    matchesResult: ["doc2"])
+    try await check(collRef, query: collRef.whereFilter(filter5).limit(to: 1),
+                    matchesResult: ["doc2"])
   }
 
-  func testOrQueriesWithCompositeIndexes() throws {
+  func testOrQueriesWithCompositeIndexes() async throws {
     // TODO(orquery): Enable this test against production when possible.
-    try XCTSkipIf(!FSTIntegrationTestCase.isRunningAgainstEmulator(),
+    try XCTSkipIf(!(FSTIntegrationTestCase.isRunningAgainstEmulator()),
                   "Skip this test if running against production because it results in" +
                     "a 'missing index' error. The Firestore Emulator, however, does serve these queries.")
 
@@ -102,16 +115,16 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", isGreaterThan: 2),
        Filter.whereField("b", isEqualTo: 1)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter1),
-                                    matchesResult: ["doc5", "doc2", "doc3"])
+    try await check(collRef, query: collRef.whereFilter(filter1),
+                    matchesResult: ["doc5", "doc2", "doc3"])
 
     // Test with limits (implicit order by ASC): (a==1) || (b > 0) LIMIT 2
     let filter2 = Filter.orFilter(
       [Filter.whereField("a", isEqualTo: 1),
        Filter.whereField("b", isGreaterThan: 0)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter2).limit(to: 2),
-                                    matchesResult: ["doc1", "doc2"])
+    try await check(collRef, query: collRef.whereFilter(filter2).limit(to: 2),
+                    matchesResult: ["doc1", "doc2"])
 
     // Test with limits (explicit order by): (a==1) || (b > 0) LIMIT_TO_LAST 2
     // Note: The public query API does not allow implicit ordering when limitToLast is used.
@@ -119,7 +132,7 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", isEqualTo: 1),
        Filter.whereField("b", isGreaterThan: 0)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter3)
+    try await check(collRef, query: collRef.whereFilter(filter3)
       .limit(toLast: 2)
       .order(by: "b"),
       matchesResult: ["doc3", "doc4"])
@@ -129,7 +142,7 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", isEqualTo: 2),
        Filter.whereField("b", isEqualTo: 1)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter4).limit(to: 1)
+    try await check(collRef, query: collRef.whereFilter(filter4).limit(to: 1)
       .order(by: "a"),
       matchesResult: ["doc5"])
 
@@ -138,12 +151,12 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", isEqualTo: 2),
        Filter.whereField("b", isEqualTo: 1)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter5).limit(toLast: 1)
+    try await check(collRef, query: collRef.whereFilter(filter5).limit(toLast: 1)
       .order(by: "a"),
       matchesResult: ["doc2"])
   }
 
-  func testOrQueriesWithIn() throws {
+  func testOrQueriesWithIn() async throws {
     let collRef = collectionRef(
       withDocuments: ["doc1": ["a": 1, "b": 0],
                       "doc2": ["b": 1],
@@ -158,11 +171,11 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", isEqualTo: 2),
        Filter.whereField("b", in: [2, 3])]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter),
-                                    matchesResult: ["doc3", "doc4", "doc6"])
+    try await check(collRef, query: collRef.whereFilter(filter),
+                    matchesResult: ["doc3", "doc4", "doc6"])
   }
 
-  func testOrQueriesWithArrayMembership() throws {
+  func testOrQueriesWithArrayMembership() async throws {
     let collRef = collectionRef(
       withDocuments: ["doc1": ["a": 1, "b": [0]],
                       "doc2": ["b": 1],
@@ -177,19 +190,19 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", isEqualTo: 2),
        Filter.whereField("b", arrayContains: 7)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter1),
-                                    matchesResult: ["doc3", "doc4", "doc6"])
+    try await check(collRef, query: collRef.whereFilter(filter1),
+                    matchesResult: ["doc3", "doc4", "doc6"])
 
     // a==2 || b array-contains-any [0, 3]
     let filter2 = Filter.orFilter(
       [Filter.whereField("a", isEqualTo: 2),
        Filter.whereField("b", arrayContainsAny: [0, 3])]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter2),
-                                    matchesResult: ["doc1", "doc4", "doc6"])
+    try await check(collRef, query: collRef.whereFilter(filter2),
+                    matchesResult: ["doc1", "doc4", "doc6"])
   }
 
-  func testMultipleInOps() throws {
+  func testMultipleInOps() async throws {
     let collRef = collectionRef(
       withDocuments: ["doc1": ["a": 1, "b": 0],
                       "doc2": ["b": 1],
@@ -204,8 +217,8 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", in: [2, 3]),
        Filter.whereField("b", in: [0, 2])]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter1).order(by: "a"),
-                                    matchesResult: ["doc1", "doc6", "doc3"])
+    try await check(collRef, query: collRef.whereFilter(filter1).order(by: "a"),
+                    matchesResult: ["doc1", "doc6", "doc3"])
 
     // Two IN operations on same fields with disjunction.
     // a IN [0,3] || a IN [0,2] should union them (similar to: a IN [0,2,3]).
@@ -213,11 +226,11 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", in: [0, 3]),
        Filter.whereField("a", in: [0, 2])]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter2),
-                                    matchesResult: ["doc3", "doc6"])
+    try await check(collRef, query: collRef.whereFilter(filter2),
+                    matchesResult: ["doc3", "doc6"])
   }
 
-  func testUsingInWithArrayContainsAny() throws {
+  func testUsingInWithArrayContainsAny() async throws {
     let collRef = collectionRef(
       withDocuments: ["doc1": ["a": 1, "b": [0]],
                       "doc2": ["b": [1]],
@@ -231,8 +244,8 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", in: [2, 3]),
        Filter.whereField("b", arrayContainsAny: [0, 7])]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter1),
-                                    matchesResult: ["doc1", "doc3", "doc4", "doc6"])
+    try await check(collRef, query: collRef.whereFilter(filter1),
+                    matchesResult: ["doc1", "doc3", "doc4", "doc6"])
 
     let filter2 = Filter.orFilter(
       [Filter.andFilter(
@@ -241,11 +254,11 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       ),
       Filter.whereField("b", arrayContainsAny: [0, 7])]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter2),
-                                    matchesResult: ["doc1", "doc3", "doc4"])
+    try await check(collRef, query: collRef.whereFilter(filter2),
+                    matchesResult: ["doc1", "doc3", "doc4"])
   }
 
-  func testUseInWithArrayContains() throws {
+  func testUseInWithArrayContains() async throws {
     let collRef = collectionRef(
       withDocuments: ["doc1": ["a": 1, "b": [0]],
                       "doc2": ["b": [1]],
@@ -259,15 +272,15 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
       [Filter.whereField("a", in: [2, 3]),
        Filter.whereField("b", arrayContainsAny: [3])]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter1),
-                                    matchesResult: ["doc3", "doc4", "doc6"])
+    try await check(collRef, query: collRef.whereFilter(filter1),
+                    matchesResult: ["doc3", "doc4", "doc6"])
 
     let filter2 = Filter.andFilter(
       [Filter.whereField("a", in: [2, 3]),
        Filter.whereField("b", arrayContains: 7)]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter2),
-                                    matchesResult: ["doc3"])
+    try await check(collRef, query: collRef.whereFilter(filter2),
+                    matchesResult: ["doc3"])
 
     let filter3 = Filter.orFilter(
       [Filter.whereField("a", in: [2, 3]),
@@ -276,8 +289,8 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
           Filter.whereField("a", isEqualTo: 1)]
        )]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter3),
-                                    matchesResult: ["doc3", "doc4", "doc6"])
+    try await check(collRef, query: collRef.whereFilter(filter3),
+                    matchesResult: ["doc3", "doc4", "doc6"])
 
     let filter4 = Filter.andFilter(
       [Filter.whereField("a", in: [2, 3]),
@@ -286,14 +299,16 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
           Filter.whereField("a", isEqualTo: 1)]
        )]
     )
-    checkOnlineAndOfflineCollection(collRef, query: collRef.whereFilter(filter4),
-                                    matchesResult: ["doc3"])
+    try await check(collRef, query: collRef.whereFilter(filter4),
+                    matchesResult: ["doc3"])
   }
 
-  func testOrderByEquality() throws {
+  func testOrderByEquality() async throws {
     // TODO(orquery): Enable this test against production when possible.
-    try XCTSkipIf(!FSTIntegrationTestCase.isRunningAgainstEmulator(),
-                  "Skip this test if running against production because order-by-equality is not supported yet.")
+    try XCTSkipIf(
+      !(FSTIntegrationTestCase.isRunningAgainstEmulator() || type(of: self).isRunningPipeline),
+      "Skip this test if running against production because order-by-equality is not supported yet."
+    )
 
     let collRef = collectionRef(
       withDocuments: ["doc1": ["a": 1, "b": [0]],
@@ -304,16 +319,54 @@ class QueryIntegrationTests: FSTIntegrationTestCase {
                       "doc6": ["a": 2, "c": 20]]
     )
 
-    checkOnlineAndOfflineCollection(
+    try await check(
       collRef,
       query: collRef.whereFilter(Filter.whereField("a", isEqualTo: 1)),
       matchesResult: ["doc1", "doc4", "doc5"]
     )
 
-    checkOnlineAndOfflineCollection(
+    try await check(
       collRef,
       query: collRef.whereFilter(Filter.whereField("a", in: [2, 3])).order(by: "a"),
       matchesResult: ["doc6", "doc3"]
     )
   }
 }
+
+class QueryAsPipelineIntegrationTests: QueryIntegrationTests {
+  override class var isRunningPipeline: Bool {
+    return true
+  }
+
+  override func check(_ coll: CollectionReference, query: Query,
+                      matchesResult expectedKeys: [String]) async throws {
+    let collPipeline = coll.firestore.realtimePipeline().create(from: coll)
+    var collIterator = collPipeline.snapshotStream().makeAsyncIterator()
+    var _ = try await collIterator.next()
+
+    let pipeline = query.firestore.realtimePipeline().create(from: query)
+
+    var cacheIterator = pipeline.snapshotStream(options: .init(source: .cache)).makeAsyncIterator()
+    let cacheSnapshot = try await cacheIterator.next()
+    let cacheResultIds = cacheSnapshot?.results().map { $0.id }
+
+    var serverIterator = pipeline.snapshotStream(options: .init(
+      includeMetadataChanges: true,
+      source: .default
+    )).makeAsyncIterator()
+    var serverSnapshot = try await serverIterator.next()
+    if serverSnapshot?.metadata.isFromCache == true {
+      serverSnapshot = try await serverIterator.next()
+    }
+    let serverResultIds = serverSnapshot?.results().map { $0.id }
+
+    var remoteKeysIterator = pipeline.snapshotStream(options: .init(source: .cache))
+      .makeAsyncIterator()
+    let remoteKeysSnapshot = try await remoteKeysIterator.next()
+    let remoteKeysResultIds = remoteKeysSnapshot?.results().map { $0.id }
+
+    XCTAssertEqual(cacheResultIds, serverResultIds)
+    XCTAssertEqual(serverResultIds, remoteKeysResultIds)
+    XCTAssertEqual(remoteKeysResultIds, expectedKeys)
+  }
+}

+ 727 - 0
Firestore/Swift/Tests/Integration/QueryToPipelineTests.swift

@@ -0,0 +1,727 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import FirebaseCore
+import FirebaseFirestore
+import Foundation
+import XCTest
+
+@available(iOS 13, tvOS 13, macOS 10.15, macCatalyst 13, watchOS 7, *)
+class QueryToPipelineTests: FSTIntegrationTestCase {
+  let testUnsupportedFeatures = false
+
+  private func verifyResults(_ snapshot: PipelineSnapshot,
+                             _ expected: [[String: AnyHashable?]],
+                             enforceOrder: Bool = false,
+                             file: StaticString = #file,
+                             line: UInt = #line) {
+    let results = snapshot.results.map { $0.data as! [String: AnyHashable?] }
+    XCTAssertEqual(results.count, expected.count, "Result count mismatch.", file: file, line: line)
+
+    if enforceOrder {
+      for i in 0 ..< expected.count {
+        XCTAssertEqual(
+          results[i],
+          expected[i],
+          "Document at index \(i) does not match.",
+          file: file,
+          line: line
+        )
+      }
+    } else {
+      // For unordered comparison, convert to Sets of dictionaries.
+      XCTAssertEqual(
+        Set(results),
+        Set(expected),
+        "Result sets do not match.",
+        file: file,
+        line: line
+      )
+    }
+  }
+
+  func testSupportsDefaultQuery() async throws {
+    let collRef = collectionRef(withDocuments: ["1": ["foo": 1]])
+    let db = collRef.firestore
+
+    let pipeline = db.pipeline().create(from: collRef)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]])
+  }
+
+  func testSupportsFilteredQuery() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("foo", isEqualTo: 1)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]])
+  }
+
+  func testSupportsFilteredQueryWithFieldPath() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField(FieldPath(["foo"]), isEqualTo: 1)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]])
+  }
+
+  func testSupportsOrderedQueryWithDefaultOrder() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo")
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1], ["foo": 2]], enforceOrder: true)
+  }
+
+  func testSupportsOrderedQueryWithAsc() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo", descending: false)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1], ["foo": 2]], enforceOrder: true)
+  }
+
+  func testSupportsOrderedQueryWithDesc() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo", descending: true)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2], ["foo": 1]], enforceOrder: true)
+  }
+
+  func testSupportsLimitQuery() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo").limit(to: 1)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]], enforceOrder: true)
+  }
+
+  func testSupportsLimitToLastQuery() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+      "3": ["foo": 3],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo").limit(toLast: 2)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2], ["foo": 3]], enforceOrder: true)
+  }
+
+  func testSupportsStartAt() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo").start(at: [2])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2]], enforceOrder: true)
+  }
+
+  func testSupportsStartAtWithLimitToLast() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+      "3": ["foo": 3],
+      "4": ["foo": 4],
+      "5": ["foo": 5],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo").start(at: [3]).limit(toLast: 4)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 3], ["foo": 4], ["foo": 5]], enforceOrder: true)
+  }
+
+  func testSupportsEndAtWithLimitToLast() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+      "3": ["foo": 3],
+      "4": ["foo": 4],
+      "5": ["foo": 5],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo").end(at: [3]).limit(toLast: 2)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2], ["foo": 3]], enforceOrder: true)
+  }
+
+  func testSupportsStartAfterWithDocumentSnapshot() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["id": 1, "foo": 1, "bar": 1, "baz": 1],
+      "2": ["id": 2, "foo": 1, "bar": 1, "baz": 2],
+      "3": ["id": 3, "foo": 1, "bar": 1, "baz": 2],
+      "4": ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+      "5": ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+      "6": ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+      "7": ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+      "8": ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+      "9": ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+      "10": ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+      "11": ["id": 11, "foo": 2, "bar": 2, "baz": 2],
+      "12": ["id": 12, "foo": 2, "bar": 2, "baz": 2],
+    ])
+    let db = collRef.firestore
+
+    var docRef = try await collRef.document("2").getDocument()
+    var query = collRef.order(by: "foo").order(by: "bar").order(by: "baz")
+      .start(afterDocument: docRef)
+    var pipeline = db.pipeline().create(from: query)
+    var snapshot = try await pipeline.execute()
+
+    verifyResults(
+      snapshot,
+      [
+        ["id": 3, "foo": 1, "bar": 1, "baz": 2],
+        ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+        ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+        ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+        ["id": 11, "foo": 2, "bar": 2, "baz": 2],
+        ["id": 12, "foo": 2, "bar": 2, "baz": 2],
+      ],
+      enforceOrder: true
+    )
+
+    docRef = try await collRef.document("3").getDocument()
+    query = collRef.order(by: "foo").order(by: "bar").order(by: "baz").start(afterDocument: docRef)
+    pipeline = db.pipeline().create(from: query)
+    snapshot = try await pipeline.execute()
+    verifyResults(
+      snapshot,
+      [
+        ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+        ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+        ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+        ["id": 11, "foo": 2, "bar": 2, "baz": 2],
+        ["id": 12, "foo": 2, "bar": 2, "baz": 2],
+      ],
+      enforceOrder: true
+    )
+  }
+
+  func testSupportsStartAtWithDocumentSnapshot() async throws {
+    try XCTSkipIf(true, "Unsupported feature: sort on __name__ is not working")
+    let collRef = collectionRef(withDocuments: [
+      "1": ["id": 1, "foo": 1, "bar": 1, "baz": 1],
+      "2": ["id": 2, "foo": 1, "bar": 1, "baz": 2],
+      "3": ["id": 3, "foo": 1, "bar": 1, "baz": 2],
+      "4": ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+      "5": ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+      "6": ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+      "7": ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+      "8": ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+      "9": ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+      "10": ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+      "11": ["id": 11, "foo": 2, "bar": 2, "baz": 2],
+      "12": ["id": 12, "foo": 2, "bar": 2, "baz": 2],
+    ])
+    let db = collRef.firestore
+
+    var docRef = try await collRef.document("2").getDocument()
+    var query = collRef.order(by: "foo").order(by: "bar").order(by: "baz").start(atDocument: docRef)
+    var pipeline = db.pipeline().create(from: query)
+    var snapshot = try await pipeline.execute()
+
+    verifyResults(
+      snapshot,
+      [
+        ["id": 2, "foo": 1, "bar": 1, "baz": 2],
+        ["id": 3, "foo": 1, "bar": 1, "baz": 2],
+        ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+        ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+        ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+        ["id": 11, "foo": 2, "bar": 2, "baz": 2],
+        ["id": 12, "foo": 2, "bar": 2, "baz": 2],
+      ],
+      enforceOrder: true
+    )
+
+    docRef = try await collRef.document("3").getDocument()
+    query = collRef.order(by: "foo").order(by: "bar").order(by: "baz").start(atDocument: docRef)
+    pipeline = db.pipeline().create(from: query)
+    snapshot = try await pipeline.execute()
+    verifyResults(
+      snapshot,
+      [
+        ["id": 3, "foo": 1, "bar": 1, "baz": 2],
+        ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+        ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+        ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+        ["id": 11, "foo": 2, "bar": 2, "baz": 2],
+        ["id": 12, "foo": 2, "bar": 2, "baz": 2],
+      ],
+      enforceOrder: true
+    )
+  }
+
+  func testSupportsStartAfter() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo").start(after: [1])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2]], enforceOrder: true)
+  }
+
+  func testSupportsEndAt() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo").end(at: [1])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]], enforceOrder: true)
+  }
+
+  func testSupportsEndBefore() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.order(by: "foo").end(before: [2])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]], enforceOrder: true)
+  }
+
+  func testSupportsPagination() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    var query = collRef.order(by: "foo").limit(to: 1)
+    var pipeline = db.pipeline().create(from: query)
+    var snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]], enforceOrder: true)
+
+    let lastFoo = snapshot.results.first!.get("foo")!
+    query = query.start(after: [lastFoo])
+    pipeline = db.pipeline().create(from: query)
+    snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2]], enforceOrder: true)
+  }
+
+  func testSupportsPaginationOnDocumentIds() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1],
+      "2": ["foo": 2],
+    ])
+    let db = collRef.firestore
+
+    var query = collRef.order(by: "foo").order(by: FieldPath.documentID()).limit(to: 1)
+    var pipeline = db.pipeline().create(from: query)
+    var snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]], enforceOrder: true)
+
+    let lastSnapshot = snapshot.results.first!
+    query = query.start(after: [lastSnapshot.get("foo")!, lastSnapshot.ref!.documentID])
+    pipeline = db.pipeline().create(from: query)
+    snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2]], enforceOrder: true)
+  }
+
+  func testSupportsCollectionGroups() async throws {
+    let db = firestore()
+    let collRef = collectionRef()
+    let collectionGroupId = "\(collRef.collectionID)group"
+
+    let fooDoc = db.document("\(collRef.path)/foo/\(collectionGroupId)/doc1")
+    let barDoc = db.document("\(collRef.path)/bar/baz/boo/\(collectionGroupId)/doc2")
+
+    try await fooDoc.setData(["foo": 1])
+    try await barDoc.setData(["bar": 1])
+
+    let query = db.collectionGroup(collectionGroupId)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["bar": 1], ["foo": 1]])
+  }
+
+  func testSupportsQueryOverCollectionPathWithSpecialCharacters() async throws {
+    let collRef = collectionRef()
+    let db = collRef.firestore
+
+    let docWithSpecials = collRef.document("so! @#$%^&*()_+special")
+    let collectionWithSpecials = docWithSpecials.collection("so! @#$%^&*()_+special")
+
+    try await collectionWithSpecials.addDocument(data: ["foo": 1])
+    try await collectionWithSpecials.addDocument(data: ["foo": 2])
+
+    let query = collectionWithSpecials.order(by: "foo", descending: false)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1], ["foo": 2]], enforceOrder: true)
+  }
+
+  func testSupportsMultipleInequalityOnSameField() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "01": ["id": 1, "foo": 1, "bar": 1, "baz": 1],
+      "02": ["id": 2, "foo": 1, "bar": 1, "baz": 2],
+      "03": ["id": 3, "foo": 1, "bar": 1, "baz": 2],
+      "04": ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+      "05": ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+      "06": ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+      "07": ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+      "08": ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+      "09": ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+      "10": ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+      "11": ["id": 11, "foo": 2, "bar": 2, "baz": 2],
+      "12": ["id": 12, "foo": 2, "bar": 2, "baz": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("id", isGreaterThan: 2).whereField("id", isLessThanOrEqualTo: 10)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(
+      snapshot,
+      [
+        ["id": 3, "foo": 1, "bar": 1, "baz": 2],
+        ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+        ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+        ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+        ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+        ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+      ],
+      enforceOrder: false
+    )
+  }
+
+  func testSupportsMultipleInequalityOnDifferentFields() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "01": ["id": 1, "foo": 1, "bar": 1, "baz": 1],
+      "02": ["id": 2, "foo": 1, "bar": 1, "baz": 2],
+      "03": ["id": 3, "foo": 1, "bar": 1, "baz": 2],
+      "04": ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+      "05": ["id": 5, "foo": 1, "bar": 2, "baz": 2],
+      "06": ["id": 6, "foo": 1, "bar": 2, "baz": 2],
+      "07": ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+      "08": ["id": 8, "foo": 2, "bar": 1, "baz": 2],
+      "09": ["id": 9, "foo": 2, "bar": 1, "baz": 2],
+      "10": ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+      "11": ["id": 11, "foo": 2, "bar": 2, "baz": 2],
+      "12": ["id": 12, "foo": 2, "bar": 2, "baz": 2],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("id", isGreaterThanOrEqualTo: 2)
+      .whereField("baz", isLessThan: 2)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(
+      snapshot,
+      [
+        ["id": 4, "foo": 1, "bar": 2, "baz": 1],
+        ["id": 7, "foo": 2, "bar": 1, "baz": 1],
+        ["id": 10, "foo": 2, "bar": 2, "baz": 1],
+      ],
+      enforceOrder: false
+    )
+  }
+
+  func testSupportsCollectionGroupQuery() async throws {
+    let collRef = collectionRef(withDocuments: ["1": ["foo": 1]])
+    let db = collRef.firestore
+
+    let query = db.collectionGroup(collRef.collectionID)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1]])
+  }
+
+  func testSupportsEqNan() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": Double.nan],
+      "2": ["foo": 2, "bar": 1],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", isEqualTo: Double.nan)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    XCTAssertEqual(snapshot.results.count, 1)
+    let data = snapshot.results.first!.data
+    XCTAssertEqual(data["foo"] as? Int, 1)
+    XCTAssertTrue((data["bar"] as? Double)?.isNaN ?? false)
+  }
+
+  func testSupportsNeqNan() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": Double.nan],
+      "2": ["foo": 2, "bar": 1],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", isNotEqualTo: Double.nan)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2, "bar": 1]])
+  }
+
+  func testSupportsEqNull() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": NSNull()],
+      "2": ["foo": 2, "bar": 1],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", isEqualTo: NSNull())
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1, "bar": nil]])
+  }
+
+  func testSupportsNeqNull() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": NSNull()],
+      "2": ["foo": 2, "bar": 1],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", isNotEqualTo: NSNull())
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2, "bar": 1]])
+  }
+
+  func testSupportsNeq() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": 0],
+      "2": ["foo": 2, "bar": 1],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", isNotEqualTo: 0)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 2, "bar": 1]])
+  }
+
+  func testSupportsArrayContains() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": [0, 2, 4, 6]],
+      "2": ["foo": 2, "bar": [1, 3, 5, 7]],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", arrayContains: 4)
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1, "bar": [0, 2, 4, 6]]])
+  }
+
+  func testSupportsArrayContainsAny() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": [0, 2, 4, 6]],
+      "2": ["foo": 2, "bar": [1, 3, 5, 7]],
+      "3": ["foo": 3, "bar": [10, 20, 30, 40]],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", arrayContainsAny: [4, 5])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(
+      snapshot,
+      [
+        ["foo": 1, "bar": [0, 2, 4, 6]],
+        ["foo": 2, "bar": [1, 3, 5, 7]],
+      ]
+    )
+  }
+
+  func testSupportsIn() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": 2],
+      "2": ["foo": 2],
+      "3": ["foo": 3, "bar": 10],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", in: [0, 10, 20])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 3, "bar": 10]])
+  }
+
+  func testSupportsInWith1() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": 2],
+      "2": ["foo": 2],
+      "3": ["foo": 3, "bar": 10],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", in: [2])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1, "bar": 2]])
+  }
+
+  func testSupportsNotIn() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": 2],
+      "2": ["foo": 2, "bar": 1],
+      "3": ["foo": 3, "bar": 10],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", notIn: [0, 10, 20])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 1, "bar": 2], ["foo": 2, "bar": 1]])
+  }
+
+  func testSupportsNotInWith1() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": 2],
+      "2": ["foo": 2],
+      "3": ["foo": 3, "bar": 10],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereField("bar", notIn: [2])
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(snapshot, [["foo": 3, "bar": 10]])
+  }
+
+  func testSupportsOrOperator() async throws {
+    let collRef = collectionRef(withDocuments: [
+      "1": ["foo": 1, "bar": 2],
+      "2": ["foo": 2, "bar": 0],
+      "3": ["foo": 3, "bar": 10],
+    ])
+    let db = collRef.firestore
+
+    let query = collRef.whereFilter(Filter.orFilter([
+      Filter.whereField("bar", isEqualTo: 2),
+      Filter.whereField("foo", isEqualTo: 3),
+    ])).order(by: "foo")
+    let pipeline = db.pipeline().create(from: query)
+    let snapshot = try await pipeline.execute()
+
+    verifyResults(
+      snapshot,
+      [
+        ["foo": 1, "bar": 2],
+        ["foo": 3, "bar": 10],
+      ],
+      enforceOrder: true
+    )
+  }
+}

+ 40 - 0
Firestore/Swift/Tests/Integration/RealtimePipelineTests.swift

@@ -359,6 +359,14 @@ class RealtimePipelineIntegrationTests: FSTIntegrationTestCase {
     XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true)
     XCTAssertNotNil(result.get("rating") as? Timestamp)
     XCTAssertEqual(result.get("rating") as? Timestamp, result.data["rating"] as? Timestamp)
+    let firstChanges = firstSnapshot!.changes
+    XCTAssertEqual(firstChanges.count, 1)
+    XCTAssertEqual(firstChanges[0].type, .added)
+    XCTAssertNotNil(firstChanges[0].result.get("rating") as? Timestamp)
+    XCTAssertEqual(
+      firstChanges[0].result.get("rating") as? Timestamp,
+      result.get("rating") as? Timestamp
+    )
 
     enableNetwork()
 
@@ -368,6 +376,14 @@ class RealtimePipelineIntegrationTests: FSTIntegrationTestCase {
       secondSnapshot!.results()[0].get("rating") as? Timestamp,
       result.data["rating"] as? Timestamp
     )
+    let secondChanges = secondSnapshot!.changes
+    XCTAssertEqual(secondChanges.count, 1)
+    XCTAssertEqual(secondChanges[0].type, .modified)
+    XCTAssertNotNil(secondChanges[0].result.get("rating") as? Timestamp)
+    XCTAssertEqual(
+      secondChanges[0].result.get("rating") as? Timestamp,
+      secondSnapshot!.results()[0].get("rating") as? Timestamp
+    )
   }
 
   func testCanEvaluateServerTimestampEstimateProperly() async throws {
@@ -432,12 +448,24 @@ class RealtimePipelineIntegrationTests: FSTIntegrationTestCase {
     XCTAssertNotNil(result.get("rating") as? Double)
     XCTAssertEqual(result.get("rating") as! Double, 4.2)
     XCTAssertEqual(result.get("rating") as! Double, result.data["rating"] as! Double)
+    let firstChanges = firstSnapshot!.changes
+    XCTAssertEqual(firstChanges.count, 1)
+    XCTAssertEqual(firstChanges[0].type, .added)
+    XCTAssertEqual(firstChanges[0].result.get("rating") as! Double, 4.2)
 
     enableNetwork()
 
     let secondSnapshot = try await iterator.next()
     XCTAssertEqual(secondSnapshot!.metadata.isFromCache, false)
     XCTAssertNotNil(secondSnapshot!.results()[0].get("rating") as? Timestamp)
+    let secondChanges = secondSnapshot!.changes
+    XCTAssertEqual(secondChanges.count, 1)
+    XCTAssertEqual(secondChanges[0].type, .modified)
+    XCTAssertNotNil(secondChanges[0].result.get("rating") as? Timestamp)
+    XCTAssertEqual(
+      secondChanges[0].result.get("rating") as? Timestamp,
+      secondSnapshot!.results()[0].get("rating") as? Timestamp
+    )
   }
 
   func testCanEvaluateServerTimestampPreviousProperly() async throws {
@@ -493,12 +521,24 @@ class RealtimePipelineIntegrationTests: FSTIntegrationTestCase {
     XCTAssertEqual(firstSnapshot!.metadata.isFromCache, true)
     XCTAssertNil(result.get("rating") as? Timestamp)
     XCTAssertEqual(result.get("rating") as? Timestamp, result.data["rating"] as? Timestamp)
+    let firstChanges = firstSnapshot!.changes
+    XCTAssertEqual(firstChanges.count, 1)
+    XCTAssertEqual(firstChanges[0].type, .added)
+    XCTAssertNil(firstChanges[0].result.get("rating") as? Timestamp)
 
     enableNetwork()
 
     let secondSnapshot = try await iterator.next()
     XCTAssertEqual(secondSnapshot!.metadata.isFromCache, false)
     XCTAssertNotNil(secondSnapshot!.results()[0].get("rating") as? Timestamp)
+    let secondChanges = secondSnapshot!.changes
+    XCTAssertEqual(secondChanges.count, 1)
+    XCTAssertEqual(secondChanges[0].type, .modified)
+    XCTAssertNotNil(secondChanges[0].result.get("rating") as? Timestamp)
+    XCTAssertEqual(
+      secondChanges[0].result.get("rating") as? Timestamp,
+      secondSnapshot!.results()[0].get("rating") as? Timestamp
+    )
   }
 
   func testCanEvaluateServerTimestampNoneProperly() async throws {

+ 19 - 19
Firestore/core/src/api/stages.cc

@@ -50,7 +50,7 @@ CollectionSource::CollectionSource(std::string path)
 google_firestore_v1_Pipeline_Stage CollectionSource::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
 
-  result.name = nanopb::MakeBytesArray("collection");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 1;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
@@ -68,7 +68,7 @@ google_firestore_v1_Pipeline_Stage CollectionSource::to_proto() const {
 google_firestore_v1_Pipeline_Stage DatabaseSource::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
 
-  result.name = nanopb::MakeBytesArray("database");
+  result.name = nanopb::MakeBytesArray(name());
   result.args_count = 0;
   result.args = nullptr;
   result.options_count = 0;
@@ -80,7 +80,7 @@ google_firestore_v1_Pipeline_Stage DatabaseSource::to_proto() const {
 google_firestore_v1_Pipeline_Stage CollectionGroupSource::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
 
-  result.name = nanopb::MakeBytesArray("collection_group");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 2;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(2);
@@ -102,7 +102,7 @@ google_firestore_v1_Pipeline_Stage CollectionGroupSource::to_proto() const {
 google_firestore_v1_Pipeline_Stage DocumentsSource::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
 
-  result.name = nanopb::MakeBytesArray("documents");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = static_cast<pb_size_t>(documents_.size());
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(result.args_count);
@@ -123,7 +123,7 @@ google_firestore_v1_Pipeline_Stage DocumentsSource::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage AddFields::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("add_fields");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 1;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
@@ -143,7 +143,7 @@ google_firestore_v1_Pipeline_Stage AddFields::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage AggregateStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("aggregate");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 2;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(2);
@@ -177,7 +177,7 @@ google_firestore_v1_Pipeline_Stage AggregateStage::to_proto() const {
 google_firestore_v1_Pipeline_Stage Where::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
 
-  result.name = nanopb::MakeBytesArray("where");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 1;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
@@ -208,7 +208,7 @@ google_firestore_v1_Value FindNearestStage::DistanceMeasure::proto() const {
 
 google_firestore_v1_Pipeline_Stage FindNearestStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("find_nearest");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 3;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(3);
@@ -228,7 +228,7 @@ google_firestore_v1_Pipeline_Stage FindNearestStage::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage LimitStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("limit");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 1;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
@@ -242,7 +242,7 @@ google_firestore_v1_Pipeline_Stage LimitStage::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage OffsetStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("offset");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 1;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
@@ -256,7 +256,7 @@ google_firestore_v1_Pipeline_Stage OffsetStage::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage SelectStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("select");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 1;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
@@ -276,7 +276,7 @@ google_firestore_v1_Pipeline_Stage SelectStage::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage SortStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("sort");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = static_cast<pb_size_t>(orders_.size());
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(result.args_count);
@@ -292,7 +292,7 @@ google_firestore_v1_Pipeline_Stage SortStage::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage DistinctStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("distinct");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 1;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
@@ -312,7 +312,7 @@ google_firestore_v1_Pipeline_Stage DistinctStage::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage RemoveFieldsStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("remove_fields");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = static_cast<pb_size_t>(fields_.size());
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(result.args_count);
@@ -342,7 +342,7 @@ google_firestore_v1_Value ReplaceWith::ReplaceMode::to_proto() const {
 
 google_firestore_v1_Pipeline_Stage ReplaceWith::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("replace_with");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 2;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(2);
@@ -379,7 +379,7 @@ Sample::Sample(SampleMode mode, int64_t count, double percentage)
 
 google_firestore_v1_Pipeline_Stage Sample::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("sample");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 2;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(2);
@@ -409,7 +409,7 @@ Union::Union(std::shared_ptr<Pipeline> other) : other_(std::move(other)) {
 
 google_firestore_v1_Pipeline_Stage Union::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("union");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 1;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(1);
@@ -430,7 +430,7 @@ Unnest::Unnest(std::shared_ptr<Expr> field,
 
 google_firestore_v1_Pipeline_Stage Unnest::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray("unnest");
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = 2;
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(2);
@@ -462,7 +462,7 @@ RawStage::RawStage(
 
 google_firestore_v1_Pipeline_Stage RawStage::to_proto() const {
   google_firestore_v1_Pipeline_Stage result;
-  result.name = nanopb::MakeBytesArray(name_);
+  result.name = nanopb::MakeBytesArray(name());
 
   result.args_count = static_cast<pb_size_t>(params_.size());
   result.args = nanopb::MakeArray<google_firestore_v1_Value>(result.args_count);

+ 82 - 16
Firestore/core/src/api/stages.h

@@ -49,6 +49,7 @@ class Stage {
   Stage() = default;
   virtual ~Stage() = default;
 
+  virtual const std::string& name() const = 0;
   virtual google_firestore_v1_Pipeline_Stage to_proto() const = 0;
 };
 
@@ -78,9 +79,8 @@ class EvaluateContext {
 class EvaluableStage : public Stage {
  public:
   EvaluableStage() = default;
-  virtual ~EvaluableStage() = default;
+  ~EvaluableStage() override = default;
 
-  virtual absl::string_view name() const = 0;
   virtual model::PipelineInputOutputVector Evaluate(
       const EvaluateContext& context,
       const model::PipelineInputOutputVector& inputs) const = 0;
@@ -93,8 +93,9 @@ class CollectionSource : public EvaluableStage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
-  absl::string_view name() const override {
-    return "collection";
+  const std::string& name() const override {
+    static const std::string kName = "collection";
+    return kName;
   }
 
   std::string path() const {
@@ -116,8 +117,9 @@ class DatabaseSource : public EvaluableStage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
-  absl::string_view name() const override {
-    return "database";
+  const std::string& name() const override {
+    static const std::string kName = "database";
+    return kName;
   }
 
   model::PipelineInputOutputVector Evaluate(
@@ -134,8 +136,9 @@ class CollectionGroupSource : public EvaluableStage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
-  absl::string_view name() const override {
-    return "collection_group";
+  const std::string& name() const override {
+    static const std::string kName = "collection_group";
+    return kName;
   }
 
   absl::string_view collection_id() const {
@@ -163,8 +166,9 @@ class DocumentsSource : public EvaluableStage {
       const EvaluateContext& context,
       const model::PipelineInputOutputVector& inputs) const override;
 
-  absl::string_view name() const override {
-    return "documents";
+  const std::string& name() const override {
+    static const std::string kName = "documents";
+    return kName;
   }
 
   std::vector<std::string> documents() const {
@@ -185,6 +189,11 @@ class AddFields : public Stage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "add_fields";
+    return kName;
+  }
+
  private:
   std::unordered_map<std::string, std::shared_ptr<Expr>> fields_;
 };
@@ -200,6 +209,11 @@ class AggregateStage : public Stage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "aggregate";
+    return kName;
+  }
+
  private:
   std::unordered_map<std::string, std::shared_ptr<AggregateFunction>>
       accumulators_;
@@ -214,8 +228,9 @@ class Where : public EvaluableStage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
-  absl::string_view name() const override {
-    return "where";
+  const std::string& name() const override {
+    static const std::string kName = "where";
+    return kName;
   }
 
   const Expr* expr() const {
@@ -259,6 +274,11 @@ class FindNearestStage : public Stage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "find_nearest";
+    return kName;
+  }
+
  private:
   std::shared_ptr<Expr> property_;
   nanopb::SharedMessage<google_firestore_v1_Value> vector_;
@@ -274,8 +294,9 @@ class LimitStage : public EvaluableStage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
-  absl::string_view name() const override {
-    return "limit";
+  const std::string& name() const override {
+    static const std::string kName = "limit";
+    return kName;
   }
 
   int64_t limit() const {
@@ -298,6 +319,11 @@ class OffsetStage : public Stage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "offset";
+    return kName;
+  }
+
  private:
   int64_t offset_;
 };
@@ -312,6 +338,11 @@ class SelectStage : public Stage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "select";
+    return kName;
+  }
+
  private:
   std::unordered_map<std::string, std::shared_ptr<Expr>> fields_;
 };
@@ -325,8 +356,9 @@ class SortStage : public EvaluableStage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
-  absl::string_view name() const override {
-    return "sort";
+  const std::string& name() const override {
+    static const std::string kName = "sort";
+    return kName;
   }
 
   model::PipelineInputOutputVector Evaluate(
@@ -351,6 +383,11 @@ class DistinctStage : public Stage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "distinct";
+    return kName;
+  }
+
  private:
   std::unordered_map<std::string, std::shared_ptr<Expr>> groups_;
 };
@@ -364,6 +401,11 @@ class RemoveFieldsStage : public Stage {
 
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "remove_fields";
+    return kName;
+  }
+
  private:
   std::vector<Field> fields_;
 };
@@ -392,6 +434,11 @@ class ReplaceWith : public Stage {
   ~ReplaceWith() override = default;
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "replace_with";
+    return kName;
+  }
+
  private:
   std::shared_ptr<Expr> expr_;
   ReplaceMode mode_;
@@ -420,6 +467,11 @@ class Sample : public Stage {
   ~Sample() override = default;
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "sample";
+    return kName;
+  }
+
  private:
   SampleMode mode_;
   int64_t count_;
@@ -432,6 +484,11 @@ class Union : public Stage {
   ~Union() override = default;
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "union";
+    return kName;
+  }
+
  private:
   std::shared_ptr<Pipeline> other_;
 };
@@ -444,6 +501,11 @@ class Unnest : public Stage {
   ~Unnest() override = default;
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    static const std::string kName = "unnest";
+    return kName;
+  }
+
  private:
   std::shared_ptr<Expr> field_;
   std::shared_ptr<Expr> alias_;
@@ -458,6 +520,10 @@ class RawStage : public Stage {
   ~RawStage() override = default;
   google_firestore_v1_Pipeline_Stage to_proto() const override;
 
+  const std::string& name() const override {
+    return name_;
+  }
+
  private:
   std::string name_;
   std::vector<google_firestore_v1_Value> params_;

+ 1 - 0
Firestore/core/src/core/pipeline_run.cc

@@ -22,6 +22,7 @@
 #include "Firestore/core/src/api/stages.h"
 #include "Firestore/core/src/core/pipeline_util.h"
 #include "Firestore/core/src/model/mutable_document.h"
+#include "Firestore/core/src/util/log.h"
 
 namespace firebase {
 namespace firestore {

+ 27 - 48
Firestore/core/src/core/pipeline_util.cc

@@ -40,6 +40,7 @@
 #include "Firestore/core/src/model/value_util.h"
 #include "Firestore/core/src/remote/serializer.h"
 #include "Firestore/core/src/util/comparison.h"
+#include "Firestore/core/src/util/exception.h"
 #include "Firestore/core/src/util/hard_assert.h"
 #include "Firestore/core/src/util/log.h"
 #include "absl/strings/str_cat.h"
@@ -654,17 +655,6 @@ std::shared_ptr<api::Expr> ToPipelineBooleanExpr(const Filter& filter) {
   return nullptr;
 }
 
-std::vector<api::Ordering> ReverseOrderings(
-    const std::vector<api::Ordering>& orderings) {
-  std::vector<api::Ordering> reversed;
-  reversed.reserve(orderings.size());
-  for (const auto& o : orderings) {
-    const api::Ordering new_order(o);
-    reversed.push_back(new_order.WithReversedDirection());
-  }
-  return reversed;
-}
-
 std::shared_ptr<api::Expr> WhereConditionsFromCursor(
     const Bound& bound,
     const std::vector<api::Ordering>& orderings,
@@ -680,7 +670,7 @@ std::shared_ptr<api::Expr> WhereConditionsFromCursor(
   std::string func_inclusive_name = is_before ? "lte" : "gte";
 
   std::vector<std::shared_ptr<api::Expr>> or_conditions;
-  for (size_t sub_end = 1; sub_end <= orderings.size(); ++sub_end) {
+  for (size_t sub_end = 1; sub_end <= cursors.size(); ++sub_end) {
     std::vector<std::shared_ptr<api::Expr>> conditions;
     for (size_t index = 0; index < sub_end; ++index) {
       if (index < sub_end - 1) {
@@ -767,48 +757,37 @@ std::vector<std::shared_ptr<api::EvaluableStage>> ToPipelineStages(
             : api::Ordering::Direction::DESCENDING);
   }
 
-  if (!api_orderings.empty()) {
-    if (query.limit_type() == LimitType::Last) {
-      auto reversed_api_orderings = ReverseOrderings(api_orderings);
-      stages.push_back(
-          std::make_shared<api::SortStage>(reversed_api_orderings));
+  if (query.start_at()) {
+    stages.push_back(std::make_shared<api::Where>(WhereConditionsFromCursor(
+        *query.start_at(), api_orderings, /*is_before*/ false)));
+  }
+
+  if (query.end_at()) {
+    stages.push_back(std::make_shared<api::Where>(WhereConditionsFromCursor(
+        *query.end_at(), api_orderings, /*is_before*/ true)));
+  }
 
-      if (query.start_at()) {
-        // For limitToLast, start_at defines what to exclude from the *end* of
-        // the un-reversed result set. With reversed sort, this becomes a
-        // 'before' cursor.
-        stages.push_back(std::make_shared<api::Where>(WhereConditionsFromCursor(
-            *query.start_at(), api_orderings, /*is_before=*/false)));
-      }
-      if (query.end_at()) {
-        // For limitToLast, end_at defines what to exclude from the *start* of
-        // the un-reversed result set. With reversed sort, this becomes an
-        // 'after' cursor.
-        stages.push_back(std::make_shared<api::Where>(WhereConditionsFromCursor(
-            *query.end_at(), api_orderings, /*is_before=*/true)));
-      }
+  if (query.has_limit()) {
+    if (query.limit_type() == LimitType::First) {
+      stages.push_back(std::make_shared<api::SortStage>(api_orderings));
       stages.push_back(std::make_shared<api::LimitStage>(query.limit()));
-      stages.push_back(
-          std::make_shared<api::SortStage>(api_orderings));  // Sort back
     } else {
-      stages.push_back(std::make_shared<api::SortStage>(api_orderings));
-      if (query.start_at()) {
-        stages.push_back(std::make_shared<api::Where>(WhereConditionsFromCursor(
-            *query.start_at(), api_orderings, /*is_before=*/true)));
+      if (query.explicit_order_bys().empty()) {
+        util::ThrowInvalidArgument(
+            "limit(toLast:) queries require specifying at least one OrderBy() "
+            "clause.");
       }
-      if (query.end_at()) {
-        stages.push_back(std::make_shared<api::Where>(WhereConditionsFromCursor(
-            *query.end_at(), api_orderings, /*is_before=*/false)));
-      }
-      if (query.limit_type() == LimitType::First && query.limit()) {
-        stages.push_back(std::make_shared<api::LimitStage>(query.limit()));
+
+      std::vector<api::Ordering> reversed_orderings;
+      for (const auto& ordering : api_orderings) {
+        reversed_orderings.push_back(ordering.WithReversedDirection());
       }
+      stages.push_back(std::make_shared<api::SortStage>(reversed_orderings));
+      stages.push_back(std::make_shared<api::LimitStage>(query.limit()));
+      stages.push_back(std::make_shared<api::SortStage>(api_orderings));
     }
-  } else if (query.limit_type() == LimitType::First && query.limit()) {
-    // Limit without order by requires a default sort by __name__
-    stages.push_back(std::make_shared<api::SortStage>(
-        std::vector<api::Ordering>{NewKeyOrdering()}));
-    stages.push_back(std::make_shared<api::LimitStage>(query.limit()));
+  } else {
+    stages.push_back(std::make_shared<api::SortStage>(api_orderings));
   }
 
   return stages;

+ 41 - 13
Firestore/core/src/core/view.cc

@@ -23,6 +23,7 @@
 #include "Firestore/core/src/core/target.h"
 #include "Firestore/core/src/model/document_set.h"
 #include "Firestore/core/src/util/hard_assert.h"  // For HARD_ASSERT and HARD_FAIL
+#include "pipeline_run.h"
 
 namespace firebase {
 namespace firestore {
@@ -260,19 +261,46 @@ ViewDocumentChanges View::ComputeDocumentChanges(
   // Drop documents out to meet limitToFirst/limitToLast requirement.
   auto limit = GetLimit(query_);
   if (limit.has_value()) {
-    auto limit_type = GetLimitType(query_);
-    auto abs_limit = std::abs(limit.value());
-    if (abs_limit < static_cast<int64_t>(new_document_set.size())) {
-      for (size_t i = new_document_set.size() - abs_limit; i > 0; --i) {
-        absl::optional<Document> found =
-            limit_type == LimitType::First
-                ? new_document_set.GetLastDocument()
-                : new_document_set.GetFirstDocument();
-        const Document& old_doc = *found;
-        new_document_set = new_document_set.erase(old_doc->key());
-        new_mutated_keys = new_mutated_keys.erase(old_doc->key());
-        change_set.AddChange(
-            DocumentViewChange{old_doc, DocumentViewChange::Type::Removed});
+    if (query_.IsPipeline()) {
+      // TODO(pipeline): Not very efficient obviously, but should be fine for
+      // now. Longer term, limit queries should be evaluated from query engine
+      // as well.
+      std::vector<model::MutableDocument> candidates;
+      for (const Document& doc : new_document_set) {
+        candidates.push_back(doc.get());
+      }
+
+      auto results = RunPipeline(
+          const_cast<api::RealtimePipeline&>(query_.pipeline()), candidates);
+      DocumentSet new_result = DocumentSet(query_.Comparator());
+      for (auto doc : results) {
+        new_result = new_result.insert(doc);
+      }
+
+      for (Document doc : new_document_set) {
+        if (!new_result.ContainsKey(doc->key())) {
+          new_mutated_keys = new_mutated_keys.erase(doc->key());
+          change_set.AddChange(
+              DocumentViewChange{doc, DocumentViewChange::Type::Removed});
+        }
+      }
+
+      new_document_set = new_result;
+    } else {
+      auto limit_type = GetLimitType(query_);
+      auto abs_limit = std::abs(limit.value());
+      if (abs_limit < static_cast<int64_t>(new_document_set.size())) {
+        for (size_t i = new_document_set.size() - abs_limit; i > 0; --i) {
+          absl::optional<Document> found =
+              limit_type == LimitType::First
+                  ? new_document_set.GetLastDocument()
+                  : new_document_set.GetFirstDocument();
+          const Document& old_doc = *found;
+          new_document_set = new_document_set.erase(old_doc->key());
+          new_mutated_keys = new_mutated_keys.erase(old_doc->key());
+          change_set.AddChange(
+              DocumentViewChange{old_doc, DocumentViewChange::Type::Removed});
+        }
       }
     }
   }

+ 4 - 4
Firestore/core/test/unit/local/query_engine_test.cc

@@ -42,6 +42,7 @@
 #include "Firestore/core/src/model/precondition.h"
 #include "Firestore/core/src/model/snapshot_version.h"
 #include "Firestore/core/src/remote/serializer.h"
+#include "Firestore/core/src/util/log.h"
 #include "Firestore/core/test/unit/core/pipeline/utils.h"
 #include "Firestore/core/test/unit/testutil/expression_test_util.h"
 #include "Firestore/core/test/unit/testutil/testutil.h"
@@ -217,9 +218,7 @@ DocumentSet QueryEngineTestBase::RunQuery(
   const auto docs = query_engine_.GetDocumentsMatchingQuery(
       query_or_pipeline_to_run, last_limbo_free_snapshot_version, remote_keys);
 
-  // The View is always constructed based on the original query's intent,
-  // regardless of whether it was executed as a query or pipeline.
-  View view(core::QueryOrPipeline{query}, DocumentKeySet());
+  View view(query_or_pipeline_to_run, DocumentKeySet());
   ViewDocumentChanges view_doc_changes = view.ComputeDocumentChanges(docs, {});
   return view.ApplyChanges(view_doc_changes).snapshot()->documents();
 }
@@ -628,11 +627,12 @@ TEST_P(QueryEngineTest, CanPerformOrQueriesUsingFullCollectionScan2) {
         [&] { return RunQuery(query6, kMissingLastLimboFreeSnapshot); });
     EXPECT_EQ(result6, DocSet(query6.Comparator(), {doc1, doc2}));
 
-    // Test with limits (implicit order by DESC): (a==1) || (b > 0)
+    // Test with limits (order by b ASC): (a==1) || (b > 0)
     // LIMIT_TO_LAST 2
     core::Query query7 = Query("coll")
                              .AddingFilter(OrFilters(
                                  {Filter("a", "==", 1), Filter("b", ">", 0)}))
+                             .AddingOrderBy(OrderBy("b", "asc"))
                              .WithLimitToLast(2);
     DocumentSet result7 = ExpectFullCollectionScan<DocumentSet>(
         [&] { return RunQuery(query7, kMissingLastLimboFreeSnapshot); });