FSTDatastore.m 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FSTDatastore.h"
  17. #import <GRPCClient/GRPCCall+OAuth2.h>
  18. #import <GRPCClient/GRPCCall.h>
  19. #import <ProtoRPC/ProtoRPC.h>
  20. #import "FIRFirestore+Internal.h"
  21. #import "FIRFirestoreErrors.h"
  22. #import "FIRFirestoreVersion.h"
  23. #import "FSTAssert.h"
  24. #import "FSTBufferedWriter.h"
  25. #import "FSTClasses.h"
  26. #import "FSTCredentialsProvider.h"
  27. #import "FSTDatabaseID.h"
  28. #import "FSTDatabaseInfo.h"
  29. #import "FSTDispatchQueue.h"
  30. #import "FSTDocument.h"
  31. #import "FSTDocumentKey.h"
  32. #import "FSTExponentialBackoff.h"
  33. #import "FSTLocalStore.h"
  34. #import "FSTLogger.h"
  35. #import "FSTMutation.h"
  36. #import "FSTQueryData.h"
  37. #import "FSTSerializerBeta.h"
  38. #import "Firestore.pbrpc.h"
  39. NS_ASSUME_NONNULL_BEGIN
  40. // GRPC does not publicly declare a means of disabling SSL, which we need for testing. Firestore
  41. // directly exposes an sslEnabled setting so this is required to plumb that through. Note that our
  42. // own tests depend on this working so we'll know if this changes upstream.
  43. @interface GRPCHost
  44. + (nullable instancetype)hostWithAddress:(NSString *)address;
  45. @property(nonatomic, getter=isSecure) BOOL secure;
  46. @end
  47. /**
  48. * Initial backoff time in seconds after an error.
  49. * Set to 1s according to https://cloud.google.com/apis/design/errors.
  50. */
  51. static const NSTimeInterval kBackoffInitialDelay = 1;
  52. static const NSTimeInterval kBackoffMaxDelay = 60.0;
  53. static const double kBackoffFactor = 1.5;
  54. static NSString *const kXGoogAPIClientHeader = @"x-goog-api-client";
  55. static NSString *const kGoogleCloudResourcePrefix = @"google-cloud-resource-prefix";
  56. /** Function typedef used to create RPCs. */
  57. typedef GRPCProtoCall * (^RPCFactory)(void);
  58. #pragma mark - FSTStream
  59. /** The state of a stream. */
  60. typedef NS_ENUM(NSInteger, FSTStreamState) {
  61. /**
  62. * The streaming RPC is not running and there's no error condition. Calling `start` will
  63. * start the stream immediately without backoff. While in this state -isStarted will return NO.
  64. */
  65. FSTStreamStateInitial = 0,
  66. /**
  67. * The stream is starting, and is waiting for an auth token to attach to the initial request.
  68. * While in this state, isStarted will return YES but isOpen will return NO.
  69. */
  70. FSTStreamStateAuth,
  71. /**
  72. * The streaming RPC is up and running. Requests and responses can flow freely. Both
  73. * isStarted and isOpen will return YES.
  74. */
  75. FSTStreamStateOpen,
  76. /**
  77. * The stream encountered an error. The next start attempt will back off. While in this state
  78. * -isStarted will return NO.
  79. */
  80. FSTStreamStateError,
  81. /**
  82. * An in-between state after an error where the stream is waiting before re-starting. After
  83. * waiting is complete, the stream will try to open. While in this state -isStarted will
  84. * return YES but isOpen will return NO.
  85. */
  86. FSTStreamStateBackoff,
  87. /**
  88. * The stream has been explicitly stopped; no further events will be emitted.
  89. */
  90. FSTStreamStateStopped,
  91. };
  92. // We need to declare these classes first so that Datastore can alloc them.
  93. @interface FSTWatchStream ()
  94. /** The delegate that will receive events generated by the watch stream. */
  95. @property(nonatomic, weak, nullable) id<FSTWatchStreamDelegate> delegate;
  96. @end
  97. @interface FSTBetaWatchStream : FSTWatchStream
  98. /**
  99. * Initializes the watch stream with its dependencies.
  100. */
  101. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  102. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  103. credentials:(id<FSTCredentialsProvider>)credentials
  104. serializer:(FSTSerializerBeta *)serializer
  105. delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
  106. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  107. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  108. credentials:(id<FSTCredentialsProvider>)credentials
  109. responseMessageClass:(Class)responseMessageClass
  110. delegate:(id<FSTWatchStreamDelegate>)delegate NS_UNAVAILABLE;
  111. @end
  112. @interface FSTWriteStream ()
  113. @property(nonatomic, weak, nullable) id<FSTWriteStreamDelegate> delegate;
  114. @end
  115. @interface FSTBetaWriteStream : FSTWriteStream
  116. /**
  117. * Initializes the write stream with its dependencies.
  118. */
  119. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  120. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  121. credentials:(id<FSTCredentialsProvider>)credentials
  122. serializer:(FSTSerializerBeta *)serializer
  123. delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;
  124. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  125. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  126. credentials:(id<FSTCredentialsProvider>)credentials
  127. responseMessageClass:(Class)responseMessageClass
  128. delegate:(id<FSTWriteStreamDelegate>)delegate NS_UNAVAILABLE;
  129. @end
  130. @interface FSTStream () <GRXWriteable>
  131. @property(nonatomic, strong, readonly) FSTDatabaseInfo *databaseInfo;
  132. @property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
  133. @property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
  134. @property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
  135. @property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;
  136. /** A flag tracking whether the stream received a message from the backend. */
  137. @property(nonatomic, assign) BOOL messageReceived;
  138. /**
  139. * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
  140. * state of the stream.
  141. */
  142. @property(nonatomic, assign) FSTStreamState state;
  143. /** The RPC handle. Used for cancellation. */
  144. @property(nonatomic, strong, nullable) GRPCCall *rpc;
  145. /**
  146. * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
  147. * started.
  148. */
  149. @property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;
  150. @end
  151. #pragma mark - FSTDatastore
  152. @interface FSTDatastore ()
  153. /** The GRPC service for Firestore. */
  154. @property(nonatomic, strong, readonly) GCFSFirestore *service;
  155. @property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
  156. /** An object for getting an auth token before each request. */
  157. @property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
  158. @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  159. @end
  160. @implementation FSTDatastore
  161. + (instancetype)datastoreWithDatabase:(FSTDatabaseInfo *)databaseInfo
  162. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  163. credentials:(id<FSTCredentialsProvider>)credentials {
  164. return [[FSTDatastore alloc] initWithDatabaseInfo:databaseInfo
  165. workerDispatchQueue:workerDispatchQueue
  166. credentials:credentials];
  167. }
  168. - (instancetype)initWithDatabaseInfo:(FSTDatabaseInfo *)databaseInfo
  169. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  170. credentials:(id<FSTCredentialsProvider>)credentials {
  171. if (self = [super init]) {
  172. _databaseInfo = databaseInfo;
  173. if (!databaseInfo.isSSLEnabled) {
  174. GRPCHost *hostConfig = [GRPCHost hostWithAddress:databaseInfo.host];
  175. hostConfig.secure = NO;
  176. }
  177. _service = [GCFSFirestore serviceWithHost:databaseInfo.host];
  178. _workerDispatchQueue = workerDispatchQueue;
  179. _credentials = credentials;
  180. _serializer = [[FSTSerializerBeta alloc] initWithDatabaseID:databaseInfo.databaseID];
  181. }
  182. return self;
  183. }
  184. - (NSString *)description {
  185. return [NSString stringWithFormat:@"<FSTDatastore: %@>", self.databaseInfo];
  186. }
  187. /**
  188. * Converts the error to an error within the domain FIRFirestoreErrorDomain.
  189. */
  190. + (NSError *)firestoreErrorForError:(NSError *)error {
  191. if (!error) {
  192. return error;
  193. } else if ([error.domain isEqualToString:FIRFirestoreErrorDomain]) {
  194. return error;
  195. } else if ([error.domain isEqualToString:kGRPCErrorDomain]) {
  196. FSTAssert(error.code >= GRPCErrorCodeCancelled && error.code <= GRPCErrorCodeUnauthenticated,
  197. @"Unknown GRPC error code: %ld", (long)error.code);
  198. return
  199. [NSError errorWithDomain:FIRFirestoreErrorDomain code:error.code userInfo:error.userInfo];
  200. } else {
  201. return [NSError errorWithDomain:FIRFirestoreErrorDomain
  202. code:FIRFirestoreErrorCodeUnknown
  203. userInfo:@{NSUnderlyingErrorKey : error}];
  204. }
  205. }
  206. + (BOOL)isAbortedError:(NSError *)error {
  207. FSTAssert([error.domain isEqualToString:FIRFirestoreErrorDomain],
  208. @"isAbortedError: only works with errors emitted by FSTDatastore.");
  209. return error.code == FIRFirestoreErrorCodeAborted;
  210. }
  211. + (BOOL)isPermanentWriteError:(NSError *)error {
  212. FSTAssert([error.domain isEqualToString:FIRFirestoreErrorDomain],
  213. @"isPerminanteWriteError: only works with errors emitted by FSTDatastore.");
  214. switch (error.code) {
  215. case FIRFirestoreErrorCodeCancelled:
  216. case FIRFirestoreErrorCodeUnknown:
  217. case FIRFirestoreErrorCodeDeadlineExceeded:
  218. case FIRFirestoreErrorCodeResourceExhausted:
  219. case FIRFirestoreErrorCodeInternal:
  220. case FIRFirestoreErrorCodeUnavailable:
  221. case FIRFirestoreErrorCodeUnauthenticated:
  222. // Unauthenticated means something went wrong with our token and we need
  223. // to retry with new credentials which will happen automatically.
  224. // TODO(b/37325376): Give up after second unauthenticated error.
  225. return NO;
  226. case FIRFirestoreErrorCodeInvalidArgument:
  227. case FIRFirestoreErrorCodeNotFound:
  228. case FIRFirestoreErrorCodeAlreadyExists:
  229. case FIRFirestoreErrorCodePermissionDenied:
  230. case FIRFirestoreErrorCodeFailedPrecondition:
  231. case FIRFirestoreErrorCodeAborted:
  232. // Aborted might be retried in some scenarios, but that is dependant on
  233. // the context and should handled individually by the calling code.
  234. // See https://cloud.google.com/apis/design/errors
  235. case FIRFirestoreErrorCodeOutOfRange:
  236. case FIRFirestoreErrorCodeUnimplemented:
  237. case FIRFirestoreErrorCodeDataLoss:
  238. default:
  239. return YES;
  240. }
  241. }
  242. /** Returns the string to be used as x-goog-api-client header value. */
  243. + (NSString *)googAPIClientHeaderValue {
  244. // TODO(dimond): This should ideally also include the grpc version, however, gRPC defines the
  245. // version as a macro, so it would be hardcoded based on version we have at compile time of
  246. // the Firestore library, rather than the version available at runtime/at compile time by the
  247. // user of the library.
  248. return [NSString stringWithFormat:@"gl-objc/ fire/%s grpc/", FirebaseFirestoreVersionString];
  249. }
  250. /** Returns the string to be used as google-cloud-resource-prefix header value. */
  251. + (NSString *)googleCloudResourcePrefixForDatabaseID:(FSTDatabaseID *)databaseID {
  252. return [NSString
  253. stringWithFormat:@"projects/%@/databases/%@", databaseID.projectID, databaseID.databaseID];
  254. }
  255. /**
  256. * Takes a dictionary of (HTTP) response headers and returns the set of whitelisted headers
  257. * (for logging purposes).
  258. */
  259. + (NSDictionary<NSString *, NSString *> *)extractWhiteListedHeaders:
  260. (NSDictionary<NSString *, NSString *> *)headers {
  261. NSMutableDictionary<NSString *, NSString *> *whiteListedHeaders =
  262. [NSMutableDictionary dictionary];
  263. NSArray<NSString *> *whiteList = @[
  264. @"date", @"x-google-backends", @"x-google-netmon-label", @"x-google-service",
  265. @"x-google-gfe-request-trace"
  266. ];
  267. [headers
  268. enumerateKeysAndObjectsUsingBlock:^(NSString *headerName, NSString *headerValue, BOOL *stop) {
  269. if ([whiteList containsObject:[headerName lowercaseString]]) {
  270. whiteListedHeaders[headerName] = headerValue;
  271. }
  272. }];
  273. return whiteListedHeaders;
  274. }
  275. /** Logs the (whitelisted) headers returned for an GRPCProtoCall RPC. */
  276. + (void)logHeadersForRPC:(GRPCProtoCall *)rpc RPCName:(NSString *)rpcName {
  277. if ([FIRFirestore isLoggingEnabled]) {
  278. FSTLog(@"RPC %@ returned headers (whitelisted): %@", rpcName,
  279. [FSTDatastore extractWhiteListedHeaders:rpc.responseHeaders]);
  280. }
  281. }
  282. - (void)commitMutations:(NSArray<FSTMutation *> *)mutations
  283. completion:(FSTVoidErrorBlock)completion {
  284. GCFSCommitRequest *request = [GCFSCommitRequest message];
  285. request.database = [self.serializer encodedDatabaseID];
  286. NSMutableArray<GCFSWrite *> *mutationProtos = [NSMutableArray array];
  287. for (FSTMutation *mutation in mutations) {
  288. [mutationProtos addObject:[self.serializer encodedMutation:mutation]];
  289. }
  290. request.writesArray = mutationProtos;
  291. RPCFactory rpcFactory = ^GRPCProtoCall * {
  292. __block GRPCProtoCall *rpc = [self.service
  293. RPCToCommitWithRequest:request
  294. handler:^(GCFSCommitResponse *response, NSError *_Nullable error) {
  295. error = [FSTDatastore firestoreErrorForError:error];
  296. [self.workerDispatchQueue dispatchAsync:^{
  297. FSTLog(@"RPC CommitRequest completed. Error: %@", error);
  298. [FSTDatastore logHeadersForRPC:rpc RPCName:@"CommitRequest"];
  299. completion(error);
  300. }];
  301. }];
  302. return rpc;
  303. };
  304. [self invokeRPCWithFactory:rpcFactory errorHandler:completion];
  305. }
  306. - (void)lookupDocuments:(NSArray<FSTDocumentKey *> *)keys
  307. completion:(FSTVoidMaybeDocumentArrayErrorBlock)completion {
  308. GCFSBatchGetDocumentsRequest *request = [GCFSBatchGetDocumentsRequest message];
  309. request.database = [self.serializer encodedDatabaseID];
  310. for (FSTDocumentKey *key in keys) {
  311. [request.documentsArray addObject:[self.serializer encodedDocumentKey:key]];
  312. }
  313. __block FSTMaybeDocumentDictionary *results =
  314. [FSTMaybeDocumentDictionary maybeDocumentDictionary];
  315. RPCFactory rpcFactory = ^GRPCProtoCall * {
  316. __block GRPCProtoCall *rpc = [self.service
  317. RPCToBatchGetDocumentsWithRequest:request
  318. eventHandler:^(BOOL done,
  319. GCFSBatchGetDocumentsResponse *_Nullable response,
  320. NSError *_Nullable error) {
  321. error = [FSTDatastore firestoreErrorForError:error];
  322. [self.workerDispatchQueue dispatchAsync:^{
  323. if (error) {
  324. FSTLog(@"RPC BatchGetDocuments completed. Error: %@", error);
  325. [FSTDatastore logHeadersForRPC:rpc RPCName:@"BatchGetDocuments"];
  326. completion(nil, error);
  327. return;
  328. }
  329. if (!done) {
  330. // Streaming response, accumulate result
  331. FSTMaybeDocument *doc =
  332. [self.serializer decodedMaybeDocumentFromBatch:response];
  333. results = [results dictionaryBySettingObject:doc forKey:doc.key];
  334. } else {
  335. // Streaming response is done, call completion
  336. FSTLog(@"RPC BatchGetDocuments completed successfully.");
  337. [FSTDatastore logHeadersForRPC:rpc RPCName:@"BatchGetDocuments"];
  338. FSTAssert(!response, @"Got response after done.");
  339. NSMutableArray<FSTMaybeDocument *> *docs =
  340. [NSMutableArray arrayWithCapacity:keys.count];
  341. for (FSTDocumentKey *key in keys) {
  342. [docs addObject:results[key]];
  343. }
  344. completion(docs, nil);
  345. }
  346. }];
  347. }];
  348. return rpc;
  349. };
  350. [self invokeRPCWithFactory:rpcFactory
  351. errorHandler:^(NSError *_Nonnull error) {
  352. error = [FSTDatastore firestoreErrorForError:error];
  353. completion(nil, error);
  354. }];
  355. }
  356. - (void)invokeRPCWithFactory:(GRPCProtoCall * (^)(void))rpcFactory
  357. errorHandler:(FSTVoidErrorBlock)errorHandler {
  358. // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
  359. // but I'm not sure how to detect that right now. http://b/32762461
  360. [self.credentials
  361. getTokenForcingRefresh:NO
  362. completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
  363. error = [FSTDatastore firestoreErrorForError:error];
  364. [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
  365. if (error) {
  366. errorHandler(error);
  367. } else {
  368. GRPCProtoCall *rpc = rpcFactory();
  369. [FSTDatastore prepareHeadersForRPC:rpc
  370. databaseID:self.databaseInfo.databaseID
  371. token:result.token];
  372. [rpc start];
  373. }
  374. }];
  375. }];
  376. }
  377. - (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate {
  378. return [[FSTBetaWatchStream alloc] initWithDatabase:_databaseInfo
  379. workerDispatchQueue:_workerDispatchQueue
  380. credentials:_credentials
  381. serializer:_serializer
  382. delegate:delegate];
  383. }
  384. - (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate {
  385. return [[FSTBetaWriteStream alloc] initWithDatabase:_databaseInfo
  386. workerDispatchQueue:_workerDispatchQueue
  387. credentials:_credentials
  388. serializer:_serializer
  389. delegate:delegate];
  390. }
  391. /** Adds headers to the RPC including any OAuth access token if provided .*/
  392. + (void)prepareHeadersForRPC:(GRPCCall *)rpc
  393. databaseID:(FSTDatabaseID *)databaseID
  394. token:(nullable NSString *)token {
  395. rpc.oauth2AccessToken = token;
  396. rpc.requestHeaders[kXGoogAPIClientHeader] = [FSTDatastore googAPIClientHeaderValue];
  397. // This header is used to improve routing and project isolation by the backend.
  398. rpc.requestHeaders[kGoogleCloudResourcePrefix] =
  399. [FSTDatastore googleCloudResourcePrefixForDatabaseID:databaseID];
  400. }
  401. @end
  402. #pragma mark - FSTStream
  403. @implementation FSTStream
  404. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  405. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  406. credentials:(id<FSTCredentialsProvider>)credentials
  407. responseMessageClass:(Class)responseMessageClass {
  408. if (self = [super init]) {
  409. _databaseInfo = database;
  410. _workerDispatchQueue = workerDispatchQueue;
  411. _credentials = credentials;
  412. _responseMessageClass = responseMessageClass;
  413. _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue
  414. initialDelay:kBackoffInitialDelay
  415. backoffFactor:kBackoffFactor
  416. maxDelay:kBackoffMaxDelay];
  417. _state = FSTStreamStateInitial;
  418. }
  419. return self;
  420. }
  421. - (BOOL)isStarted {
  422. [self.workerDispatchQueue verifyIsCurrentQueue];
  423. FSTStreamState state = self.state;
  424. return state == FSTStreamStateBackoff || state == FSTStreamStateAuth ||
  425. state == FSTStreamStateOpen;
  426. }
  427. - (BOOL)isOpen {
  428. [self.workerDispatchQueue verifyIsCurrentQueue];
  429. return self.state == FSTStreamStateOpen;
  430. }
  431. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  432. @throw FSTAbstractMethodException(); // NOLINT
  433. }
  434. - (void)start {
  435. [self.workerDispatchQueue verifyIsCurrentQueue];
  436. if (self.state == FSTStreamStateError) {
  437. [self performBackoff];
  438. return;
  439. }
  440. FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
  441. FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
  442. self.state = FSTStreamStateAuth;
  443. [self.credentials
  444. getTokenForcingRefresh:NO
  445. completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
  446. error = [FSTDatastore firestoreErrorForError:error];
  447. [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
  448. [self resumeStartWithToken:result error:error];
  449. }];
  450. }];
  451. }
  452. /** Add an access token to our RPC, after obtaining one from the credentials provider. */
  453. - (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {
  454. if (self.state == FSTStreamStateStopped) {
  455. // Streams can be stopped while waiting for authorization.
  456. return;
  457. }
  458. [self.workerDispatchQueue verifyIsCurrentQueue];
  459. FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
  460. (long)self.state);
  461. // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
  462. // but I'm not sure how to detect that right now. http://b/32762461
  463. if (error) {
  464. // RPC has not been started yet, so just invoke higher-level close handler.
  465. [self handleStreamClose:error];
  466. return;
  467. }
  468. self.requestsWriter = [[FSTBufferedWriter alloc] init];
  469. _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
  470. [FSTDatastore prepareHeadersForRPC:_rpc
  471. databaseID:self.databaseInfo.databaseID
  472. token:token.token];
  473. [_rpc startWithWriteable:self];
  474. self.state = FSTStreamStateOpen;
  475. [self handleStreamOpen];
  476. }
  477. /** Backs off after an error. */
  478. - (void)performBackoff {
  479. FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
  480. [self.workerDispatchQueue verifyIsCurrentQueue];
  481. FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
  482. self.state = FSTStreamStateBackoff;
  483. FSTWeakify(self);
  484. [self.backoff backoffAndRunBlock:^{
  485. FSTStrongify(self);
  486. [self resumeStartFromBackoff];
  487. }];
  488. }
  489. /** Resumes stream start after backing off. */
  490. - (void)resumeStartFromBackoff {
  491. if (self.state == FSTStreamStateStopped) {
  492. // Streams can be stopped while waiting for backoff to complete.
  493. return;
  494. }
  495. // In order to have performed a backoff the stream must have been in an error state just prior
  496. // to entering the backoff state. If we weren't stopped we must be in the backoff state.
  497. FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
  498. (long)self.state);
  499. // Momentarily set state to FSTStreamStateInitial as `start` expects it.
  500. self.state = FSTStreamStateInitial;
  501. [self start];
  502. FSTAssert([self isStarted], @"Stream should have started.");
  503. }
  504. - (void)stop {
  505. FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
  506. [self.workerDispatchQueue verifyIsCurrentQueue];
  507. // Prevent any possible future restart of this stream.
  508. self.state = FSTStreamStateStopped;
  509. // Close the stream client side.
  510. FSTBufferedWriter *requestsWriter = self.requestsWriter;
  511. @synchronized(requestsWriter) {
  512. [requestsWriter finishWithError:nil];
  513. }
  514. }
  515. - (void)inhibitBackoff {
  516. FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
  517. (long)self.state);
  518. [self.workerDispatchQueue verifyIsCurrentQueue];
  519. // Clear the error condition.
  520. self.state = FSTStreamStateInitial;
  521. [self.backoff reset];
  522. }
  523. /**
  524. * Parses a protocol buffer response from the server. If the message fails to parse, generates
  525. * an error and closes the stream.
  526. *
  527. * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
  528. * @param data The bytes in the response as returned from GRPC.
  529. * @return An instance of the protocol buffer message, parsed from the data if parsing was
  530. * successful, or nil otherwise.
  531. */
  532. - (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
  533. NSError *parseError;
  534. id parsed = [protoClass parseFromData:data error:&parseError];
  535. if (parsed) {
  536. *error = nil;
  537. return parsed;
  538. } else {
  539. NSDictionary *info = @{
  540. NSLocalizedDescriptionKey : @"Unable to parse response from the server",
  541. NSUnderlyingErrorKey : parseError,
  542. @"Expected class" : protoClass,
  543. @"Received value" : data,
  544. };
  545. *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
  546. code:FIRFirestoreErrorCodeInternal
  547. userInfo:info];
  548. return nil;
  549. }
  550. }
  551. /**
  552. * Writes a request proto into the stream.
  553. */
  554. - (void)writeRequest:(GPBMessage *)request {
  555. NSData *data = [request data];
  556. FSTBufferedWriter *requestsWriter = self.requestsWriter;
  557. @synchronized(requestsWriter) {
  558. [requestsWriter writeValue:data];
  559. }
  560. }
  561. #pragma mark Template methods for subclasses
  562. /**
  563. * Called by the stream after the stream has been successfully connected, authenticated, and is now
  564. * ready to accept messages.
  565. *
  566. * Subclasses should relay to their stream-specific delegate. Calling [super handleStreamOpen] is
  567. * not required.
  568. */
  569. - (void)handleStreamOpen {
  570. }
  571. /**
  572. * Called by the stream for each incoming protocol message coming from the server.
  573. *
  574. * Subclasses should implement this to deserialize the value and relay to their stream-specific
  575. * delegate, if appropriate. Calling [super handleStreamMessage] is not required.
  576. */
  577. - (void)handleStreamMessage:(id)value {
  578. }
  579. /**
  580. * Called by the stream when the underlying RPC has been closed for whatever reason.
  581. *
  582. * Subclasses should first call [super handleStreamClose:] and then call to their
  583. * stream-specific delegate.
  584. */
  585. - (void)handleStreamClose:(NSError *_Nullable)error {
  586. FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
  587. FSTAssert([self isStarted], @"Can't handle server close in non-started state.");
  588. [self.workerDispatchQueue verifyIsCurrentQueue];
  589. self.messageReceived = NO;
  590. self.rpc = nil;
  591. self.requestsWriter = nil;
  592. // In theory the stream could close cleanly, however, in our current model we never expect this
  593. // to happen because if we stop a stream ourselves, this callback will never be called. To
  594. // prevent cases where we retry without a backoff accidentally, we set the stream to error
  595. // in all cases.
  596. self.state = FSTStreamStateError;
  597. if (error.code == FIRFirestoreErrorCodeResourceExhausted) {
  598. FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
  599. (__bridge void *)self);
  600. [self.backoff resetToMax];
  601. }
  602. }
  603. #pragma mark GRXWriteable implementation
  604. // The GRXWriteable implementation defines the receive side of the RPC stream.
  605. /**
  606. * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
  607. * redispatch back onto our own worker queue.
  608. */
  609. - (void)writeValue:(id)value __used {
  610. // TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
  611. // Once released we can set the responseDispatchQueue property on the GRPCCall and then this
  612. // method can call handleStreamMessage directly.
  613. FSTWeakify(self);
  614. [self.workerDispatchQueue dispatchAsync:^{
  615. FSTStrongify(self);
  616. if (!self || self.state == FSTStreamStateStopped) {
  617. return;
  618. }
  619. if (!self.messageReceived) {
  620. self.messageReceived = YES;
  621. if ([FIRFirestore isLoggingEnabled]) {
  622. FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
  623. (__bridge void *)self,
  624. [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
  625. }
  626. }
  627. NSError *error;
  628. id proto = [self parseProto:self.responseMessageClass data:value error:&error];
  629. if (proto) {
  630. [self handleStreamMessage:proto];
  631. } else {
  632. [_rpc finishWithError:error];
  633. }
  634. }];
  635. }
  636. /**
  637. * Called by GRPC when it closed the stream with an error representing the final state of the
  638. * stream.
  639. *
  640. * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
  641. * directly inform stream-specific logic, or call stop to tear down the stream.
  642. */
  643. - (void)writesFinishedWithError:(NSError *_Nullable)error __used {
  644. error = [FSTDatastore firestoreErrorForError:error];
  645. FSTWeakify(self);
  646. [self.workerDispatchQueue dispatchAsync:^{
  647. FSTStrongify(self);
  648. if (!self || self.state == FSTStreamStateStopped) {
  649. return;
  650. }
  651. [self handleStreamClose:error];
  652. }];
  653. }
  654. @end
  655. #pragma mark - FSTWatchStream
  656. @implementation FSTWatchStream
  657. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  658. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  659. credentials:(id<FSTCredentialsProvider>)credentials
  660. responseMessageClass:(Class)responseMessageClass
  661. delegate:(id<FSTWatchStreamDelegate>)delegate {
  662. self = [super initWithDatabase:database
  663. workerDispatchQueue:workerDispatchQueue
  664. credentials:credentials
  665. responseMessageClass:responseMessageClass];
  666. if (self) {
  667. _delegate = delegate;
  668. }
  669. return self;
  670. }
  671. - (void)stop {
  672. // Clear the delegate to avoid any possible bleed through of events from GRPC.
  673. self.delegate = nil;
  674. [super stop];
  675. }
  676. - (void)watchQuery:(FSTQueryData *)query {
  677. @throw FSTAbstractMethodException(); // NOLINT
  678. }
  679. - (void)unwatchTargetID:(FSTTargetID)targetID {
  680. @throw FSTAbstractMethodException(); // NOLINT
  681. }
  682. - (void)handleStreamOpen {
  683. [self.delegate watchStreamDidOpen];
  684. }
  685. - (void)handleStreamClose:(NSError *_Nullable)error {
  686. [super handleStreamClose:error];
  687. [self.delegate watchStreamDidClose:error];
  688. }
  689. @end
  690. #pragma mark - FSTBetaWatchStream
  691. @implementation FSTBetaWatchStream {
  692. FSTSerializerBeta *_serializer;
  693. }
  694. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  695. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  696. credentials:(id<FSTCredentialsProvider>)credentials
  697. serializer:(FSTSerializerBeta *)serializer
  698. delegate:(id<FSTWatchStreamDelegate>)delegate {
  699. self = [super initWithDatabase:database
  700. workerDispatchQueue:workerDispatchQueue
  701. credentials:credentials
  702. responseMessageClass:[GCFSListenResponse class]
  703. delegate:delegate];
  704. if (self) {
  705. _serializer = serializer;
  706. }
  707. return self;
  708. }
  709. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  710. return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
  711. path:@"/google.firestore.v1beta1.Firestore/Listen"
  712. requestsWriter:requestsWriter];
  713. }
  714. - (void)watchQuery:(FSTQueryData *)query {
  715. FSTAssert([self isOpen], @"Not yet open");
  716. [self.workerDispatchQueue verifyIsCurrentQueue];
  717. GCFSListenRequest *request = [GCFSListenRequest message];
  718. request.database = [_serializer encodedDatabaseID];
  719. request.addTarget = [_serializer encodedTarget:query];
  720. request.labels = [_serializer encodedListenRequestLabelsForQueryData:query];
  721. FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request);
  722. [self writeRequest:request];
  723. }
  724. - (void)unwatchTargetID:(FSTTargetID)targetID {
  725. FSTAssert([self isOpen], @"Not yet open");
  726. [self.workerDispatchQueue verifyIsCurrentQueue];
  727. GCFSListenRequest *request = [GCFSListenRequest message];
  728. request.database = [_serializer encodedDatabaseID];
  729. request.removeTarget = targetID;
  730. FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request);
  731. [self writeRequest:request];
  732. }
  733. /**
  734. * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's
  735. * watchStreamDidChange:snapshotVersion: callback.
  736. */
  737. - (void)handleStreamMessage:(GCFSListenResponse *)proto {
  738. FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
  739. [self.workerDispatchQueue verifyIsCurrentQueue];
  740. // A successful response means the stream is healthy.
  741. [self.backoff reset];
  742. FSTWatchChange *change = [_serializer decodedWatchChange:proto];
  743. FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto];
  744. [self.delegate watchStreamDidChange:change snapshotVersion:snap];
  745. }
  746. @end
  747. #pragma mark - FSTWriteStream
  748. @implementation FSTWriteStream
  749. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  750. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  751. credentials:(id<FSTCredentialsProvider>)credentials
  752. responseMessageClass:(Class)responseMessageClass
  753. delegate:(id<FSTWriteStreamDelegate>)delegate {
  754. self = [super initWithDatabase:database
  755. workerDispatchQueue:workerDispatchQueue
  756. credentials:credentials
  757. responseMessageClass:responseMessageClass];
  758. if (self) {
  759. _delegate = delegate;
  760. }
  761. return self;
  762. }
  763. - (void)start {
  764. self.handshakeComplete = NO;
  765. [super start];
  766. }
  767. - (void)stop {
  768. // Clear the delegate to avoid any possible bleed through of events from GRPC.
  769. self.delegate = nil;
  770. [super stop];
  771. }
  772. - (void)writeHandshake {
  773. @throw FSTAbstractMethodException(); // NOLINT
  774. }
  775. - (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
  776. @throw FSTAbstractMethodException(); // NOLINT
  777. }
  778. - (void)handleStreamOpen {
  779. [self.delegate writeStreamDidOpen];
  780. }
  781. - (void)handleStreamClose:(NSError *_Nullable)error {
  782. [super handleStreamClose:error];
  783. [self.delegate writeStreamDidClose:error];
  784. }
  785. @end
  786. #pragma mark - FSTBetaWriteStream
  787. @implementation FSTBetaWriteStream {
  788. FSTSerializerBeta *_serializer;
  789. }
  790. - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
  791. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  792. credentials:(id<FSTCredentialsProvider>)credentials
  793. serializer:(FSTSerializerBeta *)serializer
  794. delegate:(id<FSTWriteStreamDelegate>)delegate {
  795. self = [super initWithDatabase:database
  796. workerDispatchQueue:workerDispatchQueue
  797. credentials:credentials
  798. responseMessageClass:[GCFSWriteResponse class]
  799. delegate:delegate];
  800. if (self) {
  801. _serializer = serializer;
  802. }
  803. return self;
  804. }
  805. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  806. return [[GRPCCall alloc] initWithHost:self.databaseInfo.host
  807. path:@"/google.firestore.v1beta1.Firestore/Write"
  808. requestsWriter:requestsWriter];
  809. }
  810. - (void)writeHandshake {
  811. // The initial request cannot contain mutations, but must contain a projectID.
  812. FSTAssert([self isOpen], @"Not yet open");
  813. FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
  814. [self.workerDispatchQueue verifyIsCurrentQueue];
  815. GCFSWriteRequest *request = [GCFSWriteRequest message];
  816. request.database = [_serializer encodedDatabaseID];
  817. // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
  818. // handshake, ignoring any stream token we might have.
  819. FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request);
  820. [self writeRequest:request];
  821. }
  822. - (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
  823. FSTAssert([self isOpen], @"Not yet open");
  824. FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
  825. [self.workerDispatchQueue verifyIsCurrentQueue];
  826. NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count];
  827. for (FSTMutation *mutation in mutations) {
  828. [protos addObject:[_serializer encodedMutation:mutation]];
  829. };
  830. GCFSWriteRequest *request = [GCFSWriteRequest message];
  831. request.writesArray = protos;
  832. request.streamToken = self.lastStreamToken;
  833. FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request);
  834. [self writeRequest:request];
  835. }
  836. /**
  837. * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass
  838. * that on to the mutationResultsHandler.
  839. */
  840. - (void)handleStreamMessage:(GCFSWriteResponse *)response {
  841. FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
  842. [self.workerDispatchQueue verifyIsCurrentQueue];
  843. // A successful response means the stream is healthy.
  844. [self.backoff reset];
  845. // Always capture the last stream token.
  846. self.lastStreamToken = response.streamToken;
  847. if (!self.handshakeComplete) {
  848. // The first response is the handshake response
  849. self.handshakeComplete = YES;
  850. [self.delegate writeStreamDidCompleteHandshake];
  851. } else {
  852. FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime];
  853. NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray;
  854. NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count];
  855. for (GCFSWriteResult *proto in protos) {
  856. [results addObject:[_serializer decodedMutationResult:proto]];
  857. };
  858. [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
  859. }
  860. }
  861. @end
  862. NS_ASSUME_NONNULL_END