/* * Copyright 2017 Google * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #import "FSTDatastore.h" #import #import #import #import "FIRFirestore+Internal.h" #import "FIRFirestoreErrors.h" #import "FIRFirestoreVersion.h" #import "FSTAssert.h" #import "FSTBufferedWriter.h" #import "FSTClasses.h" #import "FSTCredentialsProvider.h" #import "FSTDatabaseID.h" #import "FSTDatabaseInfo.h" #import "FSTDispatchQueue.h" #import "FSTDocument.h" #import "FSTDocumentKey.h" #import "FSTExponentialBackoff.h" #import "FSTLocalStore.h" #import "FSTLogger.h" #import "FSTMutation.h" #import "FSTQueryData.h" #import "FSTSerializerBeta.h" #import "Firestore.pbrpc.h" NS_ASSUME_NONNULL_BEGIN // GRPC does not publicly declare a means of disabling SSL, which we need for testing. Firestore // directly exposes an sslEnabled setting so this is required to plumb that through. Note that our // own tests depend on this working so we'll know if this changes upstream. @interface GRPCHost + (nullable instancetype)hostWithAddress:(NSString *)address; @property(nonatomic, getter=isSecure) BOOL secure; @end /** * Initial backoff time in seconds after an error. * Set to 1s according to https://cloud.google.com/apis/design/errors. 
*/
static const NSTimeInterval kBackoffInitialDelay = 1;

/** Maximum backoff delay in seconds; passed to FSTExponentialBackoff as `maxDelay`. */
static const NSTimeInterval kBackoffMaxDelay = 60.0;

/** Growth factor passed to FSTExponentialBackoff as `backoffFactor`. */
static const double kBackoffFactor = 1.5;

/** Name of the request header that carries the API client identification string. */
static NSString *const kXGoogAPIClientHeader = @"x-goog-api-client";

/** Name of the request header that carries the database resource prefix. */
static NSString *const kGoogleCloudResourcePrefix = @"google-cloud-resource-prefix";

/** Function typedef used to create RPCs. */
typedef GRPCProtoCall * (^RPCFactory)();

#pragma mark - FSTStream

/** The state of a stream. */
typedef NS_ENUM(NSInteger, FSTStreamState) {
  /**
   * The streaming RPC is not running and there's no error condition. Calling `start` will
   * start the stream immediately without backoff. While in this state -isStarted will return NO.
   */
  FSTStreamStateInitial = 0,

  /**
   * The stream is starting, and is waiting for an auth token to attach to the initial request.
   * While in this state, isStarted will return YES but isOpen will return NO.
   */
  FSTStreamStateAuth,

  /**
   * The streaming RPC is up and running. Requests and responses can flow freely. Both
   * isStarted and isOpen will return YES.
   */
  FSTStreamStateOpen,

  /**
   * The stream encountered an error. The next start attempt will back off. While in this state
   * -isStarted will return NO.
   */
  FSTStreamStateError,

  /**
   * An in-between state after an error where the stream is waiting before re-starting. After
   * waiting is complete, the stream will try to open. While in this state -isStarted will
   * return YES but isOpen will return NO.
   */
  FSTStreamStateBackoff,

  /**
   * The stream has been explicitly stopped; no further events will be emitted.
   */
  FSTStreamStateStopped,
};

// We need to declare these classes first so that Datastore can alloc them.

@interface FSTWatchStream ()

/** The delegate that will receive events generated by the watch stream. */
@property(nonatomic, weak, nullable) id delegate;

@end

@interface FSTBetaWatchStream : FSTWatchStream

/**
 * Initializes the watch stream with its dependencies.
 */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
                      serializer:(FSTSerializerBeta *)serializer
                        delegate:(id)delegate NS_DESIGNATED_INITIALIZER;

/** Unavailable: use the serializer-based initializer above instead. */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
            responseMessageClass:(Class)responseMessageClass
                        delegate:(id)delegate NS_UNAVAILABLE;

@end

@interface FSTWriteStream ()

/** The delegate that will receive events generated by the write stream. */
@property(nonatomic, weak, nullable) id delegate;

@end

@interface FSTBetaWriteStream : FSTWriteStream

/**
 * Initializes the write stream with its dependencies.
 */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
                      serializer:(FSTSerializerBeta *)serializer
                        delegate:(id)delegate NS_DESIGNATED_INITIALIZER;

/** Unavailable: use the serializer-based initializer above instead. */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
            responseMessageClass:(Class)responseMessageClass
                        delegate:(id)delegate NS_UNAVAILABLE;

@end

@interface FSTStream ()

@property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo;
@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
@property(nonatomic, strong, readonly) id credentials;
@property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
@property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;

/** A flag tracking whether the stream received a message from the backend. */
@property(nonatomic, assign) BOOL messageReceived;

/**
 * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
 * state of the stream.
 */
@property(nonatomic, assign) FSTStreamState state;

/** The RPC handle. Used for cancellation. */
@property(nonatomic, strong, nullable) GRPCCall *rpc;

/**
 * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
 * started.
 */
@property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;

@end

#pragma mark - FSTDatastore

@interface FSTDatastore ()

/** The GRPC service for Firestore. */
@property(nonatomic, strong, readonly) GCFSFirestore *service;

@property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;

/** An object for getting an auth token before each request. */
@property(nonatomic, strong, readonly) id credentials;

@property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;

@end

@implementation FSTDatastore

/** Convenience factory that forwards to -initWithDatabaseInfo:workerDispatchQueue:credentials:. */
+ (instancetype)datastoreWithDatabase:(FSTDatabaseInfo *)databaseInfo
                  workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                          credentials:(id)credentials {
  return [[FSTDatastore alloc] initWithDatabaseInfo:databaseInfo
                                workerDispatchQueue:workerDispatchQueue
                                        credentials:credentials];
}

/**
 * Stores the dependencies, creates the GRPC service for `databaseInfo.host`, and builds a
 * serializer for `databaseInfo.databaseID`.
 */
- (instancetype)initWithDatabaseInfo:(FSTDatabaseInfo *)databaseInfo
                 workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                         credentials:(id)credentials {
  self = [super init];
  if (self) {
    _databaseInfo = databaseInfo;

    // When SSL is turned off, mark the GRPC host as insecure before the service for that same
    // host is created. hostWithAddress: is nullable; messaging nil is a harmless no-op.
    if (!databaseInfo.isSSLEnabled) {
      GRPCHost *hostConfig = [GRPCHost hostWithAddress:databaseInfo.host];
      hostConfig.secure = NO;
    }

    _service = [GCFSFirestore serviceWithHost:databaseInfo.host];
    _workerDispatchQueue = workerDispatchQueue;
    _credentials = credentials;
    _serializer = [[FSTSerializerBeta alloc] initWithDatabaseID:databaseInfo.databaseID];
  }
  return self;
}

/** Debug description that includes the database info this datastore talks to. */
- (NSString *)description {
  // The format string must contain a %@ specifier, otherwise the databaseInfo argument is
  // silently dropped and the description is always empty.
  return [NSString stringWithFormat:@"<FSTDatastore: %@>", self.databaseInfo];
}

/**
 * Converts the error to an error within the domain FIRFirestoreErrorDomain.
*/
+ (NSError *)firestoreErrorForError:(NSError *)error {
  // Nothing to convert: either there is no error at all, or it is already in our domain.
  if (!error || [error.domain isEqualToString:FIRFirestoreErrorDomain]) {
    return error;
  }

  if ([error.domain isEqualToString:kGRPCErrorDomain]) {
    // The GRPC code is reused verbatim as the Firestore code, so it must fall inside the range
    // of codes checked here.
    FSTAssert(error.code >= GRPCErrorCodeCancelled && error.code <= GRPCErrorCodeUnauthenticated,
              @"Unknown GRPC error code: %ld", (long)error.code);
    return [NSError errorWithDomain:FIRFirestoreErrorDomain
                               code:error.code
                           userInfo:error.userInfo];
  }

  // Any other domain is reported as "unknown", with the original attached as underlying error.
  return [NSError errorWithDomain:FIRFirestoreErrorDomain
                             code:FIRFirestoreErrorCodeUnknown
                         userInfo:@{NSUnderlyingErrorKey : error}];
}

/**
 * Returns YES if the error carries the "aborted" code. The error must already be in
 * FIRFirestoreErrorDomain (asserted).
 */
+ (BOOL)isAbortedError:(NSError *)error {
  FSTAssert([error.domain isEqualToString:FIRFirestoreErrorDomain],
            @"isAbortedError: only works with errors emitted by FSTDatastore.");
  return error.code == FIRFirestoreErrorCodeAborted;
}

/**
 * Returns NO for error codes listed as retryable below and YES for every other code. The error
 * must already be in FIRFirestoreErrorDomain (asserted).
 */
+ (BOOL)isPermanentWriteError:(NSError *)error {
  FSTAssert([error.domain isEqualToString:FIRFirestoreErrorDomain],
            @"isPermanentWriteError: only works with errors emitted by FSTDatastore.");
  switch (error.code) {
    case FIRFirestoreErrorCodeCancelled:
    case FIRFirestoreErrorCodeUnknown:
    case FIRFirestoreErrorCodeDeadlineExceeded:
    case FIRFirestoreErrorCodeResourceExhausted:
    case FIRFirestoreErrorCodeInternal:
    case FIRFirestoreErrorCodeUnavailable:
    case FIRFirestoreErrorCodeUnauthenticated:
      // Unauthenticated means something went wrong with our token and we need
      // to retry with new credentials which will happen automatically.
      // TODO(b/37325376): Give up after second unauthenticated error.
      return NO;

    case FIRFirestoreErrorCodeInvalidArgument:
    case FIRFirestoreErrorCodeNotFound:
    case FIRFirestoreErrorCodeAlreadyExists:
    case FIRFirestoreErrorCodePermissionDenied:
    case FIRFirestoreErrorCodeFailedPrecondition:
    case FIRFirestoreErrorCodeAborted:
    // Aborted might be retried in some scenarios, but that is dependent on
    // the context and should be handled individually by the calling code.
    // See https://cloud.google.com/apis/design/errors
    case FIRFirestoreErrorCodeOutOfRange:
    case FIRFirestoreErrorCodeUnimplemented:
    case FIRFirestoreErrorCodeDataLoss:
    default:
      return YES;
  }
}

/** Returns the string to be used as x-goog-api-client header value. */
+ (NSString *)googAPIClientHeaderValue {
  // TODO(dimond): This should ideally also include the grpc version, however, gRPC defines the
  // version as a macro, so it would be hardcoded based on version we have at compile time of
  // the Firestore library, rather than the version available at runtime/at compile time by the
  // user of the library.
  return [NSString stringWithFormat:@"gl-objc/ fire/%s grpc/", FirebaseFirestoreVersionString];
}

/** Returns the string to be used as google-cloud-resource-prefix header value. */
+ (NSString *)googleCloudResourcePrefixForDatabaseID:(FSTDatabaseID *)databaseID {
  return [NSString stringWithFormat:@"projects/%@/databases/%@", databaseID.projectID,
                                    databaseID.databaseID];
}

/**
 * Takes a dictionary of (HTTP) response headers and returns the set of whitelisted headers
 * (for logging purposes).
 *
 * Header names are matched case-insensitively against the whitelist; matching entries are
 * returned under their original (un-lowercased) names.
 */
+ (NSDictionary *)extractWhiteListedHeaders:(NSDictionary *)headers {
  NSArray *whiteList = @[
    @"date", @"x-google-backends", @"x-google-netmon-label", @"x-google-service",
    @"x-google-gfe-request-trace"
  ];

  NSMutableDictionary *whiteListedHeaders = [NSMutableDictionary dictionary];
  for (NSString *headerName in headers) {
    if ([whiteList containsObject:[headerName lowercaseString]]) {
      whiteListedHeaders[headerName] = headers[headerName];
    }
  }
  return whiteListedHeaders;
}

/**
 * Logs the (whitelisted) headers returned for a GRPCProtoCall RPC.
*/
+ (void)logHeadersForRPC:(GRPCProtoCall *)rpc RPCName:(NSString *)rpcName {
  // Skip the header extraction entirely when logging is off.
  if (![FIRFirestore isLoggingEnabled]) {
    return;
  }
  FSTLog(@"RPC %@ returned headers (whitelisted): %@", rpcName,
         [FSTDatastore extractWhiteListedHeaders:rpc.responseHeaders]);
}

/**
 * Sends the given mutations in a single commit RPC. `completion` is invoked with the converted
 * error (nil on success) once the RPC finishes, or with the token error if no RPC was started.
 */
- (void)commitMutations:(NSArray *)mutations completion:(FSTVoidErrorBlock)completion {
  NSMutableArray *writes = [NSMutableArray array];
  for (FSTMutation *mutation in mutations) {
    [writes addObject:[self.serializer encodedMutation:mutation]];
  }

  GCFSCommitRequest *commitRequest = [GCFSCommitRequest message];
  commitRequest.database = [self.serializer encodedDatabaseID];
  commitRequest.writesArray = writes;

  RPCFactory makeCommitRPC = ^GRPCProtoCall * {
    // __block so the handler, which is created before the assignment completes, sees the RPC.
    __block GRPCProtoCall *call = nil;
    call = [self.service
        RPCToCommitWithRequest:commitRequest
                       handler:^(GCFSCommitResponse *response, NSError *_Nullable error) {
                         NSError *firestoreError = [FSTDatastore firestoreErrorForError:error];
                         [self.workerDispatchQueue dispatchAsync:^{
                           FSTLog(@"RPC CommitRequest completed. Error: %@", firestoreError);
                           [FSTDatastore logHeadersForRPC:call RPCName:@"CommitRequest"];
                           completion(firestoreError);
                         }];
                       }];
    return call;
  };

  [self invokeRPCWithFactory:makeCommitRPC errorHandler:completion];
}

/**
 * Fetches the documents for `keys` with a BatchGetDocuments RPC. Streamed responses are
 * accumulated by key; when the stream reports done, `completion` receives the documents in the
 * same order as `keys`. On any error `completion` receives nil and the converted error.
 */
- (void)lookupDocuments:(NSArray *)keys
             completion:(FSTVoidMaybeDocumentArrayErrorBlock)completion {
  GCFSBatchGetDocumentsRequest *batchRequest = [GCFSBatchGetDocumentsRequest message];
  batchRequest.database = [self.serializer encodedDatabaseID];
  for (FSTDocumentKey *key in keys) {
    [batchRequest.documentsArray addObject:[self.serializer encodedDocumentKey:key]];
  }

  // Reassigned from inside the event handler on every streamed response, hence __block.
  __block FSTMaybeDocumentDictionary *accumulated =
      [FSTMaybeDocumentDictionary maybeDocumentDictionary];

  RPCFactory makeLookupRPC = ^GRPCProtoCall * {
    // __block so the handler, which is created before the assignment completes, sees the RPC.
    __block GRPCProtoCall *call = nil;
    call = [self.service
        RPCToBatchGetDocumentsWithRequest:batchRequest
                             eventHandler:^(BOOL done,
                                            GCFSBatchGetDocumentsResponse *_Nullable response,
                                            NSError *_Nullable error) {
                               NSError *firestoreError =
                                   [FSTDatastore firestoreErrorForError:error];
                               [self.workerDispatchQueue dispatchAsync:^{
                                 if (firestoreError) {
                                   FSTLog(@"RPC BatchGetDocuments completed. Error: %@",
                                          firestoreError);
                                   [FSTDatastore logHeadersForRPC:call
                                                          RPCName:@"BatchGetDocuments"];
                                   completion(nil, firestoreError);
                                   return;
                                 }

                                 if (done) {
                                   // Streaming response is done, call completion
                                   FSTLog(@"RPC BatchGetDocuments completed successfully.");
                                   [FSTDatastore logHeadersForRPC:call
                                                          RPCName:@"BatchGetDocuments"];
                                   FSTAssert(!response, @"Got response after done.");
                                   NSMutableArray *ordered =
                                       [NSMutableArray arrayWithCapacity:keys.count];
                                   for (FSTDocumentKey *key in keys) {
                                     [ordered addObject:accumulated[key]];
                                   }
                                   completion(ordered, nil);
                                   return;
                                 }

                                 // Streaming response, accumulate result
                                 FSTMaybeDocument *document =
                                     [self.serializer decodedMaybeDocumentFromBatch:response];
                                 accumulated = [accumulated dictionaryBySettingObject:document
                                                                               forKey:document.key];
                               }];
                             }];
    return call;
  };

  [self invokeRPCWithFactory:makeLookupRPC
                errorHandler:^(NSError *_Nonnull error) {
                  completion(nil, [FSTDatastore firestoreErrorForError:error]);
                }];
}

/**
 * Requests an auth token and then, on the worker queue, either reports the token error through
 * `errorHandler` or creates the RPC via `rpcFactory`, attaches the headers, and starts it.
 */
- (void)invokeRPCWithFactory:(GRPCProtoCall * (^)())rpcFactory
                errorHandler:(FSTVoidErrorBlock)errorHandler {
  // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
  // but I'm not sure how to detect that right now. http://b/32762461
  [self.credentials
      getTokenForcingRefresh:NO
                  completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
                    NSError *tokenError = [FSTDatastore firestoreErrorForError:error];
                    [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
                      if (tokenError) {
                        errorHandler(tokenError);
                        return;
                      }
                      GRPCProtoCall *call = rpcFactory();
                      [FSTDatastore prepareHeadersForRPC:call
                                              databaseID:self.databaseInfo.databaseID
                                                   token:result.token];
                      [call start];
                    }];
                  }];
}

/** Creates a watch stream that shares this datastore's database info, queue and credentials. */
- (FSTWatchStream *)createWatchStreamWithDelegate:(id)delegate {
  FSTBetaWatchStream *watchStream =
      [[FSTBetaWatchStream alloc] initWithDatabase:_databaseInfo
                               workerDispatchQueue:_workerDispatchQueue
                                       credentials:_credentials
                                        serializer:_serializer
                                          delegate:delegate];
  return watchStream;
}

/** Creates a write stream that shares this datastore's database info, queue and credentials. */
- (FSTWriteStream *)createWriteStreamWithDelegate:(id)delegate {
  FSTBetaWriteStream *writeStream =
      [[FSTBetaWriteStream alloc] initWithDatabase:_databaseInfo
                               workerDispatchQueue:_workerDispatchQueue
                                       credentials:_credentials
                                        serializer:_serializer
                                          delegate:delegate];
  return writeStream;
}

/** Adds headers to the RPC including any OAuth access token if provided. */
+ (void)prepareHeadersForRPC:(GRPCCall *)rpc
                  databaseID:(FSTDatabaseID *)databaseID
                       token:(nullable NSString *)token {
  rpc.oauth2AccessToken = token;

  NSString *clientHeaderValue = [FSTDatastore googAPIClientHeaderValue];
  rpc.requestHeaders[kXGoogAPIClientHeader] = clientHeaderValue;

  // This header is used to improve routing and project isolation by the backend.
  NSString *resourcePrefix = [FSTDatastore googleCloudResourcePrefixForDatabaseID:databaseID];
  rpc.requestHeaders[kGoogleCloudResourcePrefix] = resourcePrefix;
}

@end

#pragma mark - FSTStream

@implementation FSTStream

/**
 * Stores the dependencies, creates the exponential backoff on the worker queue, and leaves the
 * stream in the initial state.
 */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
            responseMessageClass:(Class)responseMessageClass {
  self = [super init];
  if (self) {
    _databaseInfo = database;
    _workerDispatchQueue = workerDispatchQueue;
    _credentials = credentials;
    _responseMessageClass = responseMessageClass;

    _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue
                                                             initialDelay:kBackoffInitialDelay
                                                            backoffFactor:kBackoffFactor
                                                                 maxDelay:kBackoffMaxDelay];
    _state = FSTStreamStateInitial;
  }
  return self;
}

/** YES while the stream is backing off, waiting for auth, or open. Worker queue only. */
- (BOOL)isStarted {
  [self.workerDispatchQueue verifyIsCurrentQueue];
  switch (self.state) {
    case FSTStreamStateBackoff:
    case FSTStreamStateAuth:
    case FSTStreamStateOpen:
      return YES;
    default:
      return NO;
  }
}

/** YES only while the stream is in the open state. Worker queue only. */
- (BOOL)isOpen {
  [self.workerDispatchQueue verifyIsCurrentQueue];
  FSTStreamState current = self.state;
  return current == FSTStreamStateOpen;
}

/** Abstract: subclasses create the GRPC call that reads requests from `requestsWriter`. */
- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  @throw FSTAbstractMethodException();  // NOLINT
}

/**
 * Starts the stream. From the error state this only schedules a backoff; otherwise the stream
 * moves to the auth state and requests a token, continuing in -resumeStartWithToken:error: on
 * the worker queue.
 */
- (void)start {
  [self.workerDispatchQueue verifyIsCurrentQueue];

  BOOL needsBackoff = (self.state == FSTStreamStateError);
  if (needsBackoff) {
    [self performBackoff];
    return;
  }

  FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
  FSTAssert(self.state == FSTStreamStateInitial, @"Already started");

  self.state = FSTStreamStateAuth;

  [self.credentials
      getTokenForcingRefresh:NO
                  completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
                    NSError *tokenError = [FSTDatastore firestoreErrorForError:error];
                    [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
                      [self resumeStartWithToken:result error:tokenError];
                    }];
                  }];
}

/**
 * Add an access token to our RPC, after obtaining one from the credentials provider.
*/
- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {
  if (self.state == FSTStreamStateStopped) {
    // Streams can be stopped while waiting for authorization.
    return;
  }

  [self.workerDispatchQueue verifyIsCurrentQueue];
  FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
            (long)self.state);

  // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
  // but I'm not sure how to detect that right now. http://b/32762461
  if (error) {
    // RPC has not been started yet, so just invoke higher-level close handler.
    [self handleStreamClose:error];
    return;
  }

  self.requestsWriter = [[FSTBufferedWriter alloc] init];
  _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
  [FSTDatastore prepareHeadersForRPC:_rpc
                          databaseID:self.databaseInfo.databaseID
                               token:token.token];
  // This stream is the writeable that receives the RPC's responses.
  [_rpc startWithWriteable:self];

  self.state = FSTStreamStateOpen;
  [self handleStreamOpen];
}

/** Backs off after an error. */
- (void)performBackoff {
  FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
  [self.workerDispatchQueue verifyIsCurrentQueue];

  FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
  self.state = FSTStreamStateBackoff;

  // The backoff block only holds a weak reference to the stream.
  FSTWeakify(self);
  [self.backoff backoffAndRunBlock:^{
    FSTStrongify(self);
    [self resumeStartFromBackoff];
  }];
}

/** Resumes stream start after backing off. */
- (void)resumeStartFromBackoff {
  if (self.state == FSTStreamStateStopped) {
    // Streams can be stopped while waiting for backoff to complete.
    return;
  }

  // In order to have performed a backoff the stream must have been in an error state just prior
  // to entering the backoff state. If we weren't stopped we must be in the backoff state.
  FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
            (long)self.state);

  // Momentarily set state to FSTStreamStateInitial as `start` expects it.
  self.state = FSTStreamStateInitial;
  [self start];
  FSTAssert([self isStarted], @"Stream should have started.");
}

/**
 * Marks the stream as stopped and finishes the request writer, if there is one. Must be called
 * on the worker queue.
 */
- (void)stop {
  FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
  [self.workerDispatchQueue verifyIsCurrentQueue];

  // Prevent any possible future restart of this stream.
  self.state = FSTStreamStateStopped;

  // Close the stream client side.
  FSTBufferedWriter *requestsWriter = self.requestsWriter;
  @synchronized(requestsWriter) {
    [requestsWriter finishWithError:nil];
  }
}

/**
 * Clears the error condition so the next start happens without backoff: the state returns to
 * initial and the backoff is reset. Only valid while the stream is not started (asserted).
 */
- (void)inhibitBackoff {
  FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
            (long)self.state);
  [self.workerDispatchQueue verifyIsCurrentQueue];

  // Clear the error condition.
  self.state = FSTStreamStateInitial;
  [self.backoff reset];
}

/**
 * Parses a protocol buffer response from the server. If the message fails to parse, generates
 * an error in FIRFirestoreErrorDomain describing the failure.
 *
 * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
 * @param data The bytes in the response as returned from GRPC.
 * @param error Optional out-parameter. Set to nil on success and to an internal-code error on
 *     failure. May be NULL if the caller does not need the error.
 * @return An instance of the protocol buffer message, parsed from the data if parsing was
 *     successful, or nil otherwise.
 */
- (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
  NSError *parseError;
  id parsed = [protoClass parseFromData:data error:&parseError];
  if (parsed) {
    if (error) {
      *error = nil;
    }
    return parsed;
  }

  NSMutableDictionary *info = [NSMutableDictionary dictionary];
  info[NSLocalizedDescriptionKey] = @"Unable to parse response from the server";
  info[@"Expected class"] = protoClass;
  info[@"Received value"] = data;
  // A dictionary literal throws on a nil value, so only attach the underlying error when the
  // parser actually produced one.
  if (parseError) {
    info[NSUnderlyingErrorKey] = parseError;
  }

  // Callers are allowed to pass NULL for the out-parameter; never dereference it blindly.
  if (error) {
    *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
                                 code:FIRFirestoreErrorCodeInternal
                             userInfo:info];
  }
  return nil;
}

/**
 * Writes a request proto into the stream.
*/
- (void)writeRequest:(GPBMessage *)request {
  NSData *data = [request data];

  // If there is no writer, both the lock and the write are no-ops.
  FSTBufferedWriter *requestsWriter = self.requestsWriter;
  @synchronized(requestsWriter) {
    [requestsWriter writeValue:data];
  }
}

#pragma mark Template methods for subclasses

/**
 * Called by the stream after the stream has been successfully connected, authenticated, and is now
 * ready to accept messages.
 *
 * Subclasses should relay to their stream-specific delegate. Calling [super handleStreamOpen] is
 * not required.
 */
- (void)handleStreamOpen {
}

/**
 * Called by the stream for each incoming protocol message coming from the server.
 *
 * Subclasses should implement this to deserialize the value and relay to their stream-specific
 * delegate, if appropriate. Calling [super handleStreamMessage] is not required.
 */
- (void)handleStreamMessage:(id)value {
}

/**
 * Called by the stream when the underlying RPC has been closed for whatever reason.
 *
 * Subclasses should first call [super handleStreamClose:] and then call to their
 * stream-specific delegate.
 */
- (void)handleStreamClose:(NSError *_Nullable)error {
  FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
  FSTAssert([self isStarted], @"Can't handle server close in non-started state.");
  [self.workerDispatchQueue verifyIsCurrentQueue];

  // Drop everything that belonged to the closed RPC.
  self.messageReceived = NO;
  self.rpc = nil;
  self.requestsWriter = nil;

  // In theory the stream could close cleanly, however, in our current model we never expect this
  // to happen because if we stop a stream ourselves, this callback will never be called. To
  // prevent cases where we retry without a backoff accidentally, we set the stream to error
  // in all cases.
  self.state = FSTStreamStateError;

  if (error.code == FIRFirestoreErrorCodeResourceExhausted) {
    FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.",
           NSStringFromClass([self class]), (__bridge void *)self);
    [self.backoff resetToMax];
  }
}

#pragma mark GRXWriteable implementation
// The GRXWriteable implementation defines the receive side of the RPC stream.

/**
 * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
 * redispatch back onto our own worker queue.
 *
 * On the worker queue the value is parsed as `responseMessageClass`; a parsed message goes to
 * -handleStreamMessage:, a parse failure finishes the RPC with the parse error. Nothing happens
 * if the stream has been deallocated or stopped in the meantime.
 */
- (void)writeValue:(id)value __used {
  // TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
  // Once released we can set the responseDispatchQueue property on the GRPCCall and then this
  // method can call handleStreamMessage directly.
  FSTWeakify(self);
  [self.workerDispatchQueue dispatchAsync:^{
    FSTStrongify(self);
    if (!self || self.state == FSTStreamStateStopped) {
      return;
    }

    // Log the response headers once, on the first message of each RPC.
    if (!self.messageReceived) {
      self.messageReceived = YES;
      if ([FIRFirestore isLoggingEnabled]) {
        FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
               (__bridge void *)self,
               [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
      }
    }

    NSError *error;
    id proto = [self parseProto:self.responseMessageClass data:value error:&error];
    if (proto) {
      [self handleStreamMessage:proto];
    } else {
      // Go through the property on the strongified self, like the rest of this block, rather
      // than reaching for the ivar directly.
      [self.rpc finishWithError:error];
    }
  }];
}

/**
 * Called by GRPC when it closed the stream with an error representing the final state of the
 * stream.
 *
 * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
 * directly inform stream-specific logic, or call stop to tear down the stream.
*/
- (void)writesFinishedWithError:(NSError *_Nullable)error __used {
  NSError *firestoreError = [FSTDatastore firestoreErrorForError:error];

  FSTWeakify(self);
  [self.workerDispatchQueue dispatchAsync:^{
    FSTStrongify(self);
    // Ignore the close if the stream is gone or was stopped in the meantime.
    BOOL stopped = self.state == FSTStreamStateStopped;
    if (!self || stopped) {
      return;
    }
    [self handleStreamClose:firestoreError];
  }];
}

@end

#pragma mark - FSTWatchStream

@implementation FSTWatchStream

/** Initializes the base stream and remembers the delegate that receives watch events. */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
            responseMessageClass:(Class)responseMessageClass
                        delegate:(id)delegate {
  if ((self = [super initWithDatabase:database
                  workerDispatchQueue:workerDispatchQueue
                          credentials:credentials
                 responseMessageClass:responseMessageClass])) {
    _delegate = delegate;
  }
  return self;
}

/** Detaches the delegate, then stops the underlying stream. */
- (void)stop {
  // Clear the delegate to avoid any possible bleed through of events from GRPC.
  self.delegate = nil;

  [super stop];
}

/** Abstract: implemented by the concrete watch stream. */
- (void)watchQuery:(FSTQueryData *)query {
  @throw FSTAbstractMethodException();  // NOLINT
}

/** Abstract: implemented by the concrete watch stream. */
- (void)unwatchTargetID:(FSTTargetID)targetID {
  @throw FSTAbstractMethodException();  // NOLINT
}

/** Relays the open event to the delegate. */
- (void)handleStreamOpen {
  [self.delegate watchStreamDidOpen];
}

/** Lets the base class update its state first, then relays the close to the delegate. */
- (void)handleStreamClose:(NSError *_Nullable)error {
  [super handleStreamClose:error];
  [self.delegate watchStreamDidClose:error];
}

@end

#pragma mark - FSTBetaWatchStream

@implementation FSTBetaWatchStream {
  FSTSerializerBeta *_serializer;
}

/** Initializes the watch stream; responses are parsed as GCFSListenResponse messages. */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
                      serializer:(FSTSerializerBeta *)serializer
                        delegate:(id)delegate {
  if ((self = [super initWithDatabase:database
                  workerDispatchQueue:workerDispatchQueue
                          credentials:credentials
                 responseMessageClass:[GCFSListenResponse class]
                             delegate:delegate])) {
    _serializer = serializer;
  }
  return self;
}

/** Creates the GRPC call for the Listen method on this stream's database host. */
- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  NSString *listenPath = @"/google.firestore.v1beta1.Firestore/Listen";
  return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
                                   path:listenPath
                         requestsWriter:requestsWriter];
}

/** Sends a listen request that adds the target for `query`. The stream must be open. */
- (void)watchQuery:(FSTQueryData *)query {
  FSTAssert([self isOpen], @"Not yet open");
  [self.workerDispatchQueue verifyIsCurrentQueue];

  GCFSListenRequest *listenRequest = [GCFSListenRequest message];
  listenRequest.database = [_serializer encodedDatabaseID];
  listenRequest.addTarget = [_serializer encodedTarget:query];
  listenRequest.labels = [_serializer encodedListenRequestLabelsForQueryData:query];

  FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, listenRequest);
  [self writeRequest:listenRequest];
}

/** Sends a listen request that removes the target `targetID`. The stream must be open. */
- (void)unwatchTargetID:(FSTTargetID)targetID {
  FSTAssert([self isOpen], @"Not yet open");
  [self.workerDispatchQueue verifyIsCurrentQueue];

  GCFSListenRequest *listenRequest = [GCFSListenRequest message];
  listenRequest.database = [_serializer encodedDatabaseID];
  listenRequest.removeTarget = targetID;

  FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, listenRequest);
  [self writeRequest:listenRequest];
}

/**
 * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's
 * watchStreamDidChange:snapshotVersion: callback.
 */
- (void)handleStreamMessage:(GCFSListenResponse *)proto {
  FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
  [self.workerDispatchQueue verifyIsCurrentQueue];

  // A successful response means the stream is healthy.
  [self.backoff reset];

  FSTWatchChange *watchChange = [_serializer decodedWatchChange:proto];
  FSTSnapshotVersion *snapshotVersion = [_serializer versionFromListenResponse:proto];
  [self.delegate watchStreamDidChange:watchChange snapshotVersion:snapshotVersion];
}

@end

#pragma mark - FSTWriteStream

@implementation FSTWriteStream

/** Initializes the base stream and remembers the delegate that receives write events. */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
            responseMessageClass:(Class)responseMessageClass
                        delegate:(id)delegate {
  if ((self = [super initWithDatabase:database
                  workerDispatchQueue:workerDispatchQueue
                          credentials:credentials
                 responseMessageClass:responseMessageClass])) {
    _delegate = delegate;
  }
  return self;
}

/** Clears the handshake flag before every (re)start of the stream. */
- (void)start {
  self.handshakeComplete = NO;

  [super start];
}

/** Detaches the delegate, then stops the underlying stream. */
- (void)stop {
  // Clear the delegate to avoid any possible bleed through of events from GRPC.
  self.delegate = nil;

  [super stop];
}

/** Abstract: implemented by the concrete write stream. */
- (void)writeHandshake {
  @throw FSTAbstractMethodException();  // NOLINT
}

/** Abstract: implemented by the concrete write stream. */
- (void)writeMutations:(NSArray *)mutations {
  @throw FSTAbstractMethodException();  // NOLINT
}

/** Relays the open event to the delegate. */
- (void)handleStreamOpen {
  [self.delegate writeStreamDidOpen];
}

/** Lets the base class update its state first, then relays the close to the delegate. */
- (void)handleStreamClose:(NSError *_Nullable)error {
  [super handleStreamClose:error];
  [self.delegate writeStreamDidClose:error];
}

@end

#pragma mark - FSTBetaWriteStream

@implementation FSTBetaWriteStream {
  FSTSerializerBeta *_serializer;
}

/** Initializes the write stream; responses are parsed as GCFSWriteResponse messages. */
- (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
             workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                     credentials:(id)credentials
                      serializer:(FSTSerializerBeta *)serializer
                        delegate:(id)delegate {
  if ((self = [super initWithDatabase:database
                  workerDispatchQueue:workerDispatchQueue
                          credentials:credentials
                 responseMessageClass:[GCFSWriteResponse class]
                             delegate:delegate])) {
    _serializer = serializer;
  }
  return self;
}

/** Creates the GRPC call for the Write method on this stream's database host. */
- (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  NSString *writePath = @"/google.firestore.v1beta1.Firestore/Write";
  return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
                                   path:writePath
                         requestsWriter:requestsWriter];
}

/**
 * Sends the initial request, which carries only the database. Requires an open stream whose
 * handshake has not completed yet.
 */
- (void)writeHandshake {
  // The initial request cannot contain mutations, but must contain a projectID.
  FSTAssert([self isOpen], @"Not yet open");
  FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
  [self.workerDispatchQueue verifyIsCurrentQueue];

  GCFSWriteRequest *handshake = [GCFSWriteRequest message];
  handshake.database = [_serializer encodedDatabaseID];
  // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
  // handshake, ignoring any stream token we might have.

  FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, handshake);
  [self writeRequest:handshake];
}

/**
 * Sends the encoded mutations together with the last stream token. Requires an open stream
 * whose handshake has completed.
 */
- (void)writeMutations:(NSArray *)mutations {
  FSTAssert([self isOpen], @"Not yet open");
  FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
  [self.workerDispatchQueue verifyIsCurrentQueue];

  NSMutableArray *encodedWrites = [NSMutableArray arrayWithCapacity:mutations.count];
  for (FSTMutation *mutation in mutations) {
    [encodedWrites addObject:[_serializer encodedMutation:mutation]];
  }

  GCFSWriteRequest *writeRequest = [GCFSWriteRequest message];
  writeRequest.writesArray = encodedWrites;
  writeRequest.streamToken = self.lastStreamToken;

  FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, writeRequest);
  [self writeRequest:writeRequest];
}

/**
 * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass
 * that on to the mutationResultsHandler.
 */
- (void)handleStreamMessage:(GCFSWriteResponse *)response {
  FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
  [self.workerDispatchQueue verifyIsCurrentQueue];

  // A successful response means the stream is healthy.
  [self.backoff reset];

  // Always capture the last stream token.
  self.lastStreamToken = response.streamToken;

  if (!self.handshakeComplete) {
    // The first response is the handshake response
    self.handshakeComplete = YES;
    [self.delegate writeStreamDidCompleteHandshake];
    return;
  }

  FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime];
  NSMutableArray *resultProtos = response.writeResultsArray;
  NSMutableArray *mutationResults = [NSMutableArray arrayWithCapacity:resultProtos.count];
  for (GCFSWriteResult *resultProto in resultProtos) {
    [mutationResults addObject:[_serializer decodedMutationResult:resultProto]];
  }

  [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion
                                          mutationResults:mutationResults];
}

@end

NS_ASSUME_NONNULL_END