/* * Copyright 2017 Google * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #import "Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h" #import #include #include #include #include #import "Firestore/Source/Core/FSTEventManager.h" #import "Firestore/Source/Core/FSTQuery.h" #import "Firestore/Source/Core/FSTSyncEngine.h" #import "Firestore/Source/Local/FSTLocalStore.h" #import "Firestore/Source/Local/FSTPersistence.h" #import "Firestore/Source/Model/FSTMutation.h" #import "Firestore/Source/Remote/FSTWatchChange.h" #import "Firestore/Example/Tests/Core/FSTSyncEngine+Testing.h" #import "Firestore/Example/Tests/SpecTests/FSTMockDatastore.h" #include "Firestore/core/include/firebase/firestore/firestore_errors.h" #include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h" #include "Firestore/core/src/firebase/firestore/auth/user.h" #include "Firestore/core/src/firebase/firestore/core/database_info.h" #include "Firestore/core/src/firebase/firestore/model/database_id.h" #include "Firestore/core/src/firebase/firestore/model/document_key.h" #include "Firestore/core/src/firebase/firestore/util/async_queue.h" #include "Firestore/core/src/firebase/firestore/util/executor_libdispatch.h" #include "Firestore/core/src/firebase/firestore/util/hard_assert.h" #include "Firestore/core/src/firebase/firestore/util/log.h" #include "Firestore/core/src/firebase/firestore/util/status.h" #include "absl/memory/memory.h" using firebase::firestore::FirestoreErrorCode; using firebase::firestore::auth::EmptyCredentialsProvider; using firebase::firestore::auth::HashUser; using firebase::firestore::auth::User; using firebase::firestore::core::DatabaseInfo; using firebase::firestore::model::DatabaseId; using firebase::firestore::model::DocumentKey; using firebase::firestore::model::DocumentKeySet; using firebase::firestore::model::OnlineState; using firebase::firestore::model::SnapshotVersion; using firebase::firestore::model::TargetId; using firebase::firestore::remote::MockDatastore; using firebase::firestore::util::AsyncQueue; using firebase::firestore::util::TimerId; using firebase::firestore::util::ExecutorLibdispatch; using firebase::firestore::util::MakeString; using firebase::firestore::util::Status; NS_ASSUME_NONNULL_BEGIN @implementation FSTQueryEvent - (NSString *)description { // The Query is also included in the view, so we skip it. return [NSString stringWithFormat:@"", self.viewSnapshot, self.error]; } @end @implementation FSTOutstandingWrite @end @interface FSTSyncEngineTestDriver () #pragma mark - Parts of the Firestore system that the spec tests need to control. @property(nonatomic, strong, readonly) FSTEventManager *eventManager; @property(nonatomic, strong, readonly) FSTRemoteStore *remoteStore; @property(nonatomic, strong, readonly) FSTLocalStore *localStore; @property(nonatomic, strong, readonly) FSTSyncEngine *syncEngine; @property(nonatomic, strong, readonly) id persistence; #pragma mark - Data structures for holding events sent by the watch stream. /** A block for the FSTEventAggregator to use to report events to the test. */ @property(nonatomic, strong, readonly) void (^eventHandler)(FSTQueryEvent *); /** The events received by our eventHandler and not yet retrieved via capturedEventsSinceLastCall */ @property(nonatomic, strong, readonly) NSMutableArray *events; /** A dictionary for tracking the listens on queries. */ @property(nonatomic, strong, readonly) NSMutableDictionary *queryListeners; #pragma mark - Data structures for holding events sent by the write stream. /** The names of the documents that the client acknowledged during the current spec test step */ @property(nonatomic, strong, readonly) NSMutableArray *acknowledgedDocs; /** The names of the documents that the client rejected during the current spec test step */ @property(nonatomic, strong, readonly) NSMutableArray *rejectedDocs; @end @implementation FSTSyncEngineTestDriver { std::unique_ptr _workerQueue; // ivar is declared as mutable. std::unordered_map *, HashUser> _outstandingWrites; DocumentKeySet _expectedLimboDocuments; DatabaseInfo _databaseInfo; User _currentUser; EmptyCredentialsProvider _credentialProvider; std::shared_ptr _datastore; } - (instancetype)initWithPersistence:(id)persistence { return [self initWithPersistence:persistence initialUser:User::Unauthenticated() outstandingWrites:{}]; } - (instancetype)initWithPersistence:(id)persistence initialUser:(const User &)initialUser outstandingWrites:(const FSTOutstandingWriteQueues &)outstandingWrites { if (self = [super init]) { // Do a deep copy. for (const auto &pair : outstandingWrites) { _outstandingWrites[pair.first] = [pair.second mutableCopy]; } _events = [NSMutableArray array]; _databaseInfo = {DatabaseId{"project", "database"}, "persistence", "host", false}; // Set up the sync engine and various stores. dispatch_queue_t queue = dispatch_queue_create("sync_engine_test_driver", DISPATCH_QUEUE_SERIAL); _workerQueue = absl::make_unique(absl::make_unique(queue)); _persistence = persistence; _localStore = [[FSTLocalStore alloc] initWithPersistence:persistence initialUser:initialUser]; _datastore = std::make_shared(_databaseInfo, _workerQueue.get(), &_credentialProvider); _remoteStore = [[FSTRemoteStore alloc] initWithLocalStore:_localStore datastore:_datastore workerQueue:_workerQueue.get()]; _syncEngine = [[FSTSyncEngine alloc] initWithLocalStore:_localStore remoteStore:_remoteStore initialUser:initialUser]; _remoteStore.syncEngine = _syncEngine; _eventManager = [FSTEventManager eventManagerWithSyncEngine:_syncEngine]; _remoteStore.onlineStateDelegate = self; // Set up internal event tracking for the spec tests. NSMutableArray *events = [NSMutableArray array]; _eventHandler = ^(FSTQueryEvent *e) { [events addObject:e]; }; _events = events; _queryListeners = [NSMutableDictionary dictionary]; _expectedActiveTargets = [NSDictionary dictionary]; _currentUser = initialUser; _acknowledgedDocs = [NSMutableArray array]; _rejectedDocs = [NSMutableArray array]; } return self; } - (const FSTOutstandingWriteQueues &)outstandingWrites { return _outstandingWrites; } - (const DocumentKeySet &)expectedLimboDocuments { return _expectedLimboDocuments; } - (void)setExpectedLimboDocuments:(DocumentKeySet)docs { _expectedLimboDocuments = std::move(docs); } - (void)drainQueue { _workerQueue->EnqueueBlocking([] {}); } - (const User &)currentUser { return _currentUser; } - (void)applyChangedOnlineState:(OnlineState)onlineState { [self.syncEngine applyChangedOnlineState:onlineState]; [self.eventManager applyChangedOnlineState:onlineState]; } - (void)start { _workerQueue->EnqueueBlocking([&] { [self.localStore start]; [self.remoteStore start]; }); } - (void)validateUsage { // We could relax this if we found a reason to. HARD_ASSERT(self.events.count == 0, "You must clear all pending events by calling" " capturedEventsSinceLastCall before calling shutdown."); } - (void)shutdown { _workerQueue->EnqueueBlocking([&] { [self.remoteStore shutdown]; [self.persistence shutdown]; }); } - (void)validateNextWriteSent:(FSTMutation *)expectedWrite { NSArray *request = _datastore->NextSentWrite(); // Make sure the write went through the pipe like we expected it to. HARD_ASSERT(request.count == 1, "Only single mutation requests are supported at the moment"); FSTMutation *actualWrite = request[0]; HARD_ASSERT([actualWrite isEqual:expectedWrite], "Mock datastore received write %s but first outstanding mutation was %s", actualWrite, expectedWrite); LOG_DEBUG("A write was sent: %s", actualWrite); } - (int)sentWritesCount { return _datastore->WritesSent(); } - (int)writeStreamRequestCount { return _datastore->write_stream_request_count(); } - (int)watchStreamRequestCount { return _datastore->watch_stream_request_count(); } - (void)disableNetwork { _workerQueue->EnqueueBlocking([&] { // Make sure to execute all writes that are currently queued. This allows us // to assert on the total number of requests sent before shutdown. [self.remoteStore fillWritePipeline]; [self.remoteStore disableNetwork]; }); } - (void)enableNetwork { _workerQueue->EnqueueBlocking([&] { [self.remoteStore enableNetwork]; }); } - (void)runTimer:(TimerId)timerID { _workerQueue->RunScheduledOperationsUntil(timerID); } - (void)changeUser:(const User &)user { _currentUser = user; _workerQueue->EnqueueBlocking([&] { [self.syncEngine credentialDidChangeWithUser:user]; }); } - (FSTOutstandingWrite *)receiveWriteAckWithVersion:(const SnapshotVersion &)commitVersion mutationResults: (NSArray *)mutationResults { FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject; [[self currentOutstandingWrites] removeObjectAtIndex:0]; [self validateNextWriteSent:write.write]; _workerQueue->EnqueueBlocking([&] { _datastore->AckWrite(commitVersion, mutationResults); }); return write; } - (FSTOutstandingWrite *)receiveWriteError:(int)errorCode userInfo:(NSDictionary *)userInfo keepInQueue:(BOOL)keepInQueue { Status error{static_cast(errorCode), MakeString([userInfo description])}; FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject; [self validateNextWriteSent:write.write]; // If this is a permanent error, the mutation is not expected to be sent again so we remove it // from currentOutstandingWrites. if (!keepInQueue) { [[self currentOutstandingWrites] removeObjectAtIndex:0]; } LOG_DEBUG("Failing a write."); _workerQueue->EnqueueBlocking([&] { _datastore->FailWrite(error); }); return write; } - (NSArray *)capturedEventsSinceLastCall { NSArray *result = [self.events copy]; [self.events removeAllObjects]; return result; } - (NSArray *)capturedAcknowledgedWritesSinceLastCall { NSArray *result = [self.acknowledgedDocs copy]; [self.acknowledgedDocs removeAllObjects]; return result; } - (NSArray *)capturedRejectedWritesSinceLastCall { NSArray *result = [self.rejectedDocs copy]; [self.rejectedDocs removeAllObjects]; return result; } - (TargetId)addUserListenerWithQuery:(FSTQuery *)query { // TODO(dimond): Allow customizing listen options in spec tests // TODO(dimond): Change spec tests to verify isFromCache on snapshots FSTListenOptions *options = [[FSTListenOptions alloc] initWithIncludeQueryMetadataChanges:YES includeDocumentMetadataChanges:YES waitForSyncWhenOnline:NO]; FSTQueryListener *listener = [[FSTQueryListener alloc] initWithQuery:query options:options viewSnapshotHandler:^(FSTViewSnapshot *_Nullable snapshot, NSError *_Nullable error) { FSTQueryEvent *event = [[FSTQueryEvent alloc] init]; event.query = query; event.viewSnapshot = snapshot; event.error = error; [self.events addObject:event]; }]; self.queryListeners[query] = listener; TargetId targetID; _workerQueue->EnqueueBlocking([&] { targetID = [self.eventManager addListener:listener]; }); return targetID; } - (void)removeUserListenerWithQuery:(FSTQuery *)query { FSTQueryListener *listener = self.queryListeners[query]; [self.queryListeners removeObjectForKey:query]; _workerQueue->EnqueueBlocking([&] { [self.eventManager removeListener:listener]; }); } - (void)writeUserMutation:(FSTMutation *)mutation { FSTOutstandingWrite *write = [[FSTOutstandingWrite alloc] init]; write.write = mutation; [[self currentOutstandingWrites] addObject:write]; LOG_DEBUG("sending a user write."); _workerQueue->EnqueueBlocking([=] { [self.syncEngine writeMutations:@[ mutation ] completion:^(NSError *_Nullable error) { LOG_DEBUG("A callback was called with error: %s", error); write.done = YES; write.error = error; NSString *mutationKey = [NSString stringWithCString:mutation.key.ToString().c_str() encoding:[NSString defaultCStringEncoding]]; if (error) { [self.rejectedDocs addObject:mutationKey]; } else { [self.acknowledgedDocs addObject:mutationKey]; } }]; }); } - (void)receiveWatchChange:(FSTWatchChange *)change snapshotVersion:(const SnapshotVersion &)snapshot { _workerQueue->EnqueueBlocking([&] { _datastore->WriteWatchChange(change, snapshot); }); } - (void)receiveWatchStreamError:(int)errorCode userInfo:(NSDictionary *)userInfo { Status error{static_cast(errorCode), MakeString([userInfo description])}; _workerQueue->EnqueueBlocking([&] { _datastore->FailWatchStream(error); // Unlike web, stream should re-open synchronously (if we have any listeners) if (self.queryListeners.count > 0) { HARD_ASSERT(_datastore->IsWatchStreamOpen(), "Watch stream is open"); } }); } - (std::map)currentLimboDocuments { return [self.syncEngine currentLimboDocuments]; } - (NSDictionary *)activeTargets { return _datastore->ActiveTargets(); } #pragma mark - Helper Methods - (NSMutableArray *)currentOutstandingWrites { NSMutableArray *writes = _outstandingWrites[_currentUser]; if (!writes) { writes = [NSMutableArray array]; _outstandingWrites[_currentUser] = writes; } return writes; } @end NS_ASSUME_NONNULL_END