FSTStream.mm 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Source/Remote/FSTDatastore.h"
  17. #import <GRPCClient/GRPCCall+OAuth2.h>
  18. #import <GRPCClient/GRPCCall.h>
  19. #import "FIRFirestoreErrors.h"
  20. #import "Firestore/Source/API/FIRFirestore+Internal.h"
  21. #import "Firestore/Source/Auth/FSTCredentialsProvider.h"
  22. #import "Firestore/Source/Local/FSTQueryData.h"
  23. #import "Firestore/Source/Model/FSTMutation.h"
  24. #import "Firestore/Source/Remote/FSTBufferedWriter.h"
  25. #import "Firestore/Source/Remote/FSTExponentialBackoff.h"
  26. #import "Firestore/Source/Remote/FSTSerializerBeta.h"
  27. #import "Firestore/Source/Remote/FSTStream.h"
  28. #import "Firestore/Source/Util/FSTAssert.h"
  29. #import "Firestore/Source/Util/FSTClasses.h"
  30. #import "Firestore/Source/Util/FSTDispatchQueue.h"
  31. #import "Firestore/Source/Util/FSTLogger.h"
  32. #import "Firestore/Protos/objc/google/firestore/v1beta1/Firestore.pbrpc.h"
  33. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  34. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  35. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  36. namespace util = firebase::firestore::util;
  37. using firebase::firestore::core::DatabaseInfo;
  38. using firebase::firestore::model::DatabaseId;
  39. /**
  40. * Initial backoff time in seconds after an error.
  41. * Set to 1s according to https://cloud.google.com/apis/design/errors.
  42. */
  43. static const NSTimeInterval kBackoffInitialDelay = 1;
  44. static const NSTimeInterval kBackoffMaxDelay = 60.0;
  45. static const double kBackoffFactor = 1.5;
  46. #pragma mark - FSTStream
  47. /** The state of a stream. */
  48. typedef NS_ENUM(NSInteger, FSTStreamState) {
  49. /**
  50. * The streaming RPC is not running and there's no error condition. Calling `start` will
  51. * start the stream immediately without backoff. While in this state -isStarted will return NO.
  52. */
  53. FSTStreamStateInitial = 0,
  54. /**
  55. * The stream is starting, and is waiting for an auth token to attach to the initial request.
  56. * While in this state, isStarted will return YES but isOpen will return NO.
  57. */
  58. FSTStreamStateAuth,
  59. /**
  60. * The streaming RPC is up and running. Requests and responses can flow freely. Both
  61. * isStarted and isOpen will return YES.
  62. */
  63. FSTStreamStateOpen,
  64. /**
  65. * The stream encountered an error. The next start attempt will back off. While in this state
  66. * -isStarted will return NO.
  67. */
  68. FSTStreamStateError,
  69. /**
  70. * An in-between state after an error where the stream is waiting before re-starting. After
  71. * waiting is complete, the stream will try to open. While in this state -isStarted will
  72. * return YES but isOpen will return NO.
  73. */
  74. FSTStreamStateBackoff,
  75. /**
  76. * The stream has been explicitly stopped; no further events will be emitted.
  77. */
  78. FSTStreamStateStopped,
  79. };
  80. // We need to declare these classes first so that Datastore can alloc them.
  81. @interface FSTWatchStream ()
  82. /**
  83. * Initializes the watch stream with its dependencies.
  84. */
  85. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  86. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  87. credentials:(id<FSTCredentialsProvider>)credentials
  88. serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
  89. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  90. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  91. credentials:(id<FSTCredentialsProvider>)credentials
  92. responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
  93. @end
  94. @interface FSTStream ()
  95. @property(nonatomic, getter=isIdle) BOOL idle;
  96. @property(nonatomic, weak, readwrite, nullable) id delegate;
  97. @end
  98. @interface FSTStream () <GRXWriteable>
  99. // Does not own this DatabaseInfo.
  100. @property(nonatomic, assign, readonly) const DatabaseInfo *databaseInfo;
  101. @property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
  102. @property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
  103. @property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
  104. @property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;
  105. /** A flag tracking whether the stream received a message from the backend. */
  106. @property(nonatomic, assign) BOOL messageReceived;
  107. /**
  108. * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
  109. * state of the stream.
  110. */
  111. @property(nonatomic, assign) FSTStreamState state;
  112. /** The RPC handle. Used for cancellation. */
  113. @property(nonatomic, strong, nullable) GRPCCall *rpc;
  114. /**
  115. * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
  116. * started.
  117. */
  118. @property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;
  119. @end
  120. #pragma mark - FSTCallbackFilter
  121. /** Filter class that allows disabling of GRPC callbacks. */
  122. @interface FSTCallbackFilter : NSObject <GRXWriteable>
  123. - (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
  124. - (instancetype)init NS_UNAVAILABLE;
  125. @property(atomic, readwrite) BOOL callbacksEnabled;
  126. @property(nonatomic, strong, readonly) FSTStream *stream;
  127. @end
  128. @implementation FSTCallbackFilter
  129. - (instancetype)initWithStream:(FSTStream *)stream {
  130. if (self = [super init]) {
  131. _callbacksEnabled = YES;
  132. _stream = stream;
  133. }
  134. return self;
  135. }
  136. - (void)suppressCallbacks {
  137. _callbacksEnabled = NO;
  138. }
  139. - (void)writeValue:(id)value {
  140. if (_callbacksEnabled) {
  141. [self.stream writeValue:value];
  142. }
  143. }
  144. - (void)writesFinishedWithError:(NSError *)errorOrNil {
  145. if (_callbacksEnabled) {
  146. [self.stream writesFinishedWithError:errorOrNil];
  147. }
  148. }
  149. @end
  150. #pragma mark - FSTStream
  151. @interface FSTStream ()
  152. @property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter;
  153. @end
  154. @implementation FSTStream
  155. /** The time a stream stays open after it is marked idle. */
  156. static const NSTimeInterval kIdleTimeout = 60.0;
  157. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  158. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  159. credentials:(id<FSTCredentialsProvider>)credentials
  160. responseMessageClass:(Class)responseMessageClass {
  161. if (self = [super init]) {
  162. _databaseInfo = database;
  163. _workerDispatchQueue = workerDispatchQueue;
  164. _credentials = credentials;
  165. _responseMessageClass = responseMessageClass;
  166. _backoff = [FSTExponentialBackoff exponentialBackoffWithDispatchQueue:workerDispatchQueue
  167. initialDelay:kBackoffInitialDelay
  168. backoffFactor:kBackoffFactor
  169. maxDelay:kBackoffMaxDelay];
  170. _state = FSTStreamStateInitial;
  171. }
  172. return self;
  173. }
  174. - (BOOL)isStarted {
  175. [self.workerDispatchQueue verifyIsCurrentQueue];
  176. FSTStreamState state = self.state;
  177. return state == FSTStreamStateBackoff || state == FSTStreamStateAuth ||
  178. state == FSTStreamStateOpen;
  179. }
  180. - (BOOL)isOpen {
  181. [self.workerDispatchQueue verifyIsCurrentQueue];
  182. return self.state == FSTStreamStateOpen;
  183. }
  184. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  185. @throw FSTAbstractMethodException(); // NOLINT
  186. }
  187. - (void)startWithDelegate:(id)delegate {
  188. [self.workerDispatchQueue verifyIsCurrentQueue];
  189. if (self.state == FSTStreamStateError) {
  190. [self performBackoffWithDelegate:delegate];
  191. return;
  192. }
  193. FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
  194. FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
  195. self.state = FSTStreamStateAuth;
  196. FSTAssert(_delegate == nil, @"Delegate must be nil");
  197. _delegate = delegate;
  198. [self.credentials
  199. getTokenForcingRefresh:NO
  200. completion:^(FSTGetTokenResult *_Nullable result, NSError *_Nullable error) {
  201. error = [FSTDatastore firestoreErrorForError:error];
  202. [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
  203. [self resumeStartWithToken:result error:error];
  204. }];
  205. }];
  206. }
  207. /** Add an access token to our RPC, after obtaining one from the credentials provider. */
  208. - (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {
  209. if (self.state == FSTStreamStateStopped) {
  210. // Streams can be stopped while waiting for authorization.
  211. return;
  212. }
  213. [self.workerDispatchQueue verifyIsCurrentQueue];
  214. FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
  215. (long)self.state);
  216. // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
  217. // but I'm not sure how to detect that right now. http://b/32762461
  218. if (error) {
  219. // RPC has not been started yet, so just invoke higher-level close handler.
  220. [self handleStreamClose:error];
  221. return;
  222. }
  223. self.requestsWriter = [[FSTBufferedWriter alloc] init];
  224. _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
  225. [FSTDatastore prepareHeadersForRPC:_rpc
  226. databaseID:&self.databaseInfo->database_id()
  227. token:token.token];
  228. FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil");
  229. _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self];
  230. [_rpc startWithWriteable:_callbackFilter];
  231. self.state = FSTStreamStateOpen;
  232. [self notifyStreamOpen];
  233. }
  234. /** Backs off after an error. */
  235. - (void)performBackoffWithDelegate:(id)delegate {
  236. FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
  237. [self.workerDispatchQueue verifyIsCurrentQueue];
  238. FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
  239. self.state = FSTStreamStateBackoff;
  240. FSTWeakify(self);
  241. [self.backoff backoffAndRunBlock:^{
  242. FSTStrongify(self);
  243. [self resumeStartFromBackoffWithDelegate:delegate];
  244. }];
  245. }
  246. /** Resumes stream start after backing off. */
  247. - (void)resumeStartFromBackoffWithDelegate:(id)delegate {
  248. if (self.state == FSTStreamStateStopped) {
  249. // Streams can be stopped while waiting for backoff to complete.
  250. return;
  251. }
  252. // In order to have performed a backoff the stream must have been in an error state just prior
  253. // to entering the backoff state. If we weren't stopped we must be in the backoff state.
  254. FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
  255. (long)self.state);
  256. // Momentarily set state to FSTStreamStateInitial as `start` expects it.
  257. self.state = FSTStreamStateInitial;
  258. [self startWithDelegate:delegate];
  259. FSTAssert([self isStarted], @"Stream should have started.");
  260. }
  261. /**
  262. * Can be overridden to perform additional cleanup before the stream is closed. Calling
  263. * [super tearDown] is not required.
  264. */
  265. - (void)tearDown {
  266. }
  267. /**
  268. * Closes the stream and cleans up as necessary:
  269. *
  270. * * closes the underlying GRPC stream;
  271. * * calls the onClose handler with the given 'error';
  272. * * sets internal stream state to 'finalState';
  273. * * adjusts the backoff timer based on the error
  274. *
  275. * A new stream can be opened by calling `start` unless `finalState` is set to
  276. * `FSTStreamStateStopped`.
  277. *
  278. * @param finalState the intended state of the stream after closing.
  279. * @param error the NSError the connection was closed with.
  280. */
  281. - (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error {
  282. FSTAssert(finalState == FSTStreamStateError || error == nil,
  283. @"Can't provide an error when not in an error state.");
  284. [self.workerDispatchQueue verifyIsCurrentQueue];
  285. [self cancelIdleCheck];
  286. if (finalState != FSTStreamStateError) {
  287. // If this is an intentional close ensure we don't delay our next connection attempt.
  288. [self.backoff reset];
  289. } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) {
  290. FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
  291. (__bridge void *)self);
  292. [self.backoff resetToMax];
  293. }
  294. [self tearDown];
  295. if (self.requestsWriter) {
  296. // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
  297. // call half-close to avoid secondary failures.
  298. if (finalState != FSTStreamStateError) {
  299. FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self);
  300. @synchronized(self.requestsWriter) {
  301. [self.requestsWriter finishWithError:nil];
  302. }
  303. }
  304. _requestsWriter = nil;
  305. }
  306. // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to
  307. // inhibit backoff or otherwise manipulate the state in its non-started state.
  308. self.state = finalState;
  309. [self.callbackFilter suppressCallbacks];
  310. _callbackFilter = nil;
  311. // Clean up remaining state.
  312. _messageReceived = NO;
  313. _rpc = nil;
  314. // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it
  315. // could trigger undesirable recovery logic, etc.).
  316. if (finalState != FSTStreamStateStopped) {
  317. [self notifyStreamInterruptedWithError:error];
  318. }
  319. // Clear the delegates to avoid any possible bleed through of events from GRPC.
  320. _delegate = nil;
  321. }
  322. - (void)stop {
  323. FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
  324. if ([self isStarted]) {
  325. [self closeWithFinalState:FSTStreamStateStopped error:nil];
  326. }
  327. }
  328. - (void)inhibitBackoff {
  329. FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
  330. (long)self.state);
  331. [self.workerDispatchQueue verifyIsCurrentQueue];
  332. // Clear the error condition.
  333. self.state = FSTStreamStateInitial;
  334. [self.backoff reset];
  335. }
  336. /** Called by the idle timer when the stream should close due to inactivity. */
  337. - (void)handleIdleCloseTimer {
  338. [self.workerDispatchQueue verifyIsCurrentQueue];
  339. if (self.state == FSTStreamStateOpen && [self isIdle]) {
  340. // When timing out an idle stream there's no reason to force the stream into backoff when
  341. // it restarts so set the stream state to Initial instead of Error.
  342. [self closeWithFinalState:FSTStreamStateInitial error:nil];
  343. }
  344. }
  345. - (void)markIdle {
  346. [self.workerDispatchQueue verifyIsCurrentQueue];
  347. if (self.state == FSTStreamStateOpen) {
  348. self.idle = YES;
  349. [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout
  350. block:^() {
  351. [self handleIdleCloseTimer];
  352. }];
  353. }
  354. }
  355. - (void)cancelIdleCheck {
  356. [self.workerDispatchQueue verifyIsCurrentQueue];
  357. self.idle = NO;
  358. }
  359. /**
  360. * Parses a protocol buffer response from the server. If the message fails to parse, generates
  361. * an error and closes the stream.
  362. *
  363. * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
  364. * @param data The bytes in the response as returned from GRPC.
  365. * @return An instance of the protocol buffer message, parsed from the data if parsing was
  366. * successful, or nil otherwise.
  367. */
  368. - (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
  369. NSError *parseError;
  370. id parsed = [protoClass parseFromData:data error:&parseError];
  371. if (parsed) {
  372. *error = nil;
  373. return parsed;
  374. } else {
  375. NSDictionary *info = @{
  376. NSLocalizedDescriptionKey : @"Unable to parse response from the server",
  377. NSUnderlyingErrorKey : parseError,
  378. @"Expected class" : protoClass,
  379. @"Received value" : data,
  380. };
  381. *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
  382. code:FIRFirestoreErrorCodeInternal
  383. userInfo:info];
  384. return nil;
  385. }
  386. }
  387. /**
  388. * Writes a request proto into the stream.
  389. */
  390. - (void)writeRequest:(GPBMessage *)request {
  391. NSData *data = [request data];
  392. [self cancelIdleCheck];
  393. FSTBufferedWriter *requestsWriter = self.requestsWriter;
  394. @synchronized(requestsWriter) {
  395. [requestsWriter writeValue:data];
  396. }
  397. }
  398. #pragma mark Template methods for subclasses
  399. /**
  400. * Called by the stream after the stream has opened.
  401. *
  402. * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is
  403. * not required.
  404. */
  405. - (void)notifyStreamOpen {
  406. }
  407. /**
  408. * Called by the stream after the stream has been unexpectedly interrupted, either due to an error
  409. * or due to idleness.
  410. *
  411. * Subclasses should relay to their stream-specific delegate. Calling [super
  412. * notifyStreamInterrupted] is not required.
  413. */
  414. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  415. }
  416. /**
  417. * Called by the stream for each incoming protocol message coming from the server.
  418. *
  419. * Subclasses should implement this to deserialize the value and relay to their stream-specific
  420. * delegate, if appropriate. Calling [super handleStreamMessage] is not required.
  421. */
  422. - (void)handleStreamMessage:(id)value {
  423. }
  424. /**
  425. * Called by the stream when the underlying RPC has been closed for whatever reason.
  426. */
  427. - (void)handleStreamClose:(nullable NSError *)error {
  428. FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
  429. if (![self isStarted]) { // The stream could have already been closed by the idle close timer.
  430. FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class]));
  431. return;
  432. }
  433. // In theory the stream could close cleanly, however, in our current model we never expect this
  434. // to happen because if we stop a stream ourselves, this callback will never be called. To
  435. // prevent cases where we retry without a backoff accidentally, we set the stream to error
  436. // in all cases.
  437. [self closeWithFinalState:FSTStreamStateError error:error];
  438. }
  439. #pragma mark GRXWriteable implementation
  440. // The GRXWriteable implementation defines the receive side of the RPC stream.
  441. /**
  442. * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
  443. * redispatch back onto our own worker queue.
  444. */
  445. - (void)writeValue:(id)value __used {
  446. // TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
  447. // Once released we can set the responseDispatchQueue property on the GRPCCall and then this
  448. // method can call handleStreamMessage directly.
  449. FSTWeakify(self);
  450. [self.workerDispatchQueue dispatchAsync:^{
  451. FSTStrongify(self);
  452. if (![self isStarted]) {
  453. FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class]));
  454. }
  455. if (!self.messageReceived) {
  456. self.messageReceived = YES;
  457. if ([FIRFirestore isLoggingEnabled]) {
  458. FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
  459. (__bridge void *)self,
  460. [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
  461. }
  462. }
  463. NSError *error;
  464. id proto = [self parseProto:self.responseMessageClass data:value error:&error];
  465. if (proto) {
  466. [self handleStreamMessage:proto];
  467. } else {
  468. [_rpc finishWithError:error];
  469. }
  470. }];
  471. }
  472. /**
  473. * Called by GRPC when it closed the stream with an error representing the final state of the
  474. * stream.
  475. *
  476. * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
  477. * directly inform stream-specific logic, or call stop to tear down the stream.
  478. */
  479. - (void)writesFinishedWithError:(nullable NSError *)error __used {
  480. error = [FSTDatastore firestoreErrorForError:error];
  481. FSTWeakify(self);
  482. [self.workerDispatchQueue dispatchAsync:^{
  483. FSTStrongify(self);
  484. if (!self || self.state == FSTStreamStateStopped) {
  485. return;
  486. }
  487. [self handleStreamClose:error];
  488. }];
  489. }
  490. @end
  491. #pragma mark - FSTWatchStream
  492. @interface FSTWatchStream ()
  493. @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  494. @end
  495. @implementation FSTWatchStream
  496. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  497. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  498. credentials:(id<FSTCredentialsProvider>)credentials
  499. serializer:(FSTSerializerBeta *)serializer {
  500. self = [super initWithDatabase:database
  501. workerDispatchQueue:workerDispatchQueue
  502. credentials:credentials
  503. responseMessageClass:[GCFSListenResponse class]];
  504. if (self) {
  505. _serializer = serializer;
  506. }
  507. return self;
  508. }
  509. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  510. return [[GRPCCall alloc] initWithHost:util::WrapNSStringNoCopy(self.databaseInfo->host())
  511. path:@"/google.firestore.v1beta1.Firestore/Listen"
  512. requestsWriter:requestsWriter];
  513. }
  514. - (void)notifyStreamOpen {
  515. [self.delegate watchStreamDidOpen];
  516. }
  517. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  518. id<FSTWatchStreamDelegate> delegate = self.delegate;
  519. self.delegate = nil;
  520. [delegate watchStreamWasInterruptedWithError:error];
  521. }
  522. - (void)watchQuery:(FSTQueryData *)query {
  523. FSTAssert([self isOpen], @"Not yet open");
  524. [self.workerDispatchQueue verifyIsCurrentQueue];
  525. GCFSListenRequest *request = [GCFSListenRequest message];
  526. request.database = [_serializer encodedDatabaseID];
  527. request.addTarget = [_serializer encodedTarget:query];
  528. request.labels = [_serializer encodedListenRequestLabelsForQueryData:query];
  529. FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request);
  530. [self writeRequest:request];
  531. }
  532. - (void)unwatchTargetID:(FSTTargetID)targetID {
  533. FSTAssert([self isOpen], @"Not yet open");
  534. [self.workerDispatchQueue verifyIsCurrentQueue];
  535. GCFSListenRequest *request = [GCFSListenRequest message];
  536. request.database = [_serializer encodedDatabaseID];
  537. request.removeTarget = targetID;
  538. FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request);
  539. [self writeRequest:request];
  540. }
  541. /**
  542. * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's
  543. * watchStreamDidChange:snapshotVersion: callback.
  544. */
  545. - (void)handleStreamMessage:(GCFSListenResponse *)proto {
  546. FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
  547. [self.workerDispatchQueue verifyIsCurrentQueue];
  548. // A successful response means the stream is healthy.
  549. [self.backoff reset];
  550. FSTWatchChange *change = [_serializer decodedWatchChange:proto];
  551. FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto];
  552. [self.delegate watchStreamDidChange:change snapshotVersion:snap];
  553. }
  554. @end
  555. #pragma mark - FSTWriteStream
  556. @interface FSTWriteStream ()
  557. @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  558. @end
  559. @implementation FSTWriteStream
  560. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  561. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  562. credentials:(id<FSTCredentialsProvider>)credentials
  563. serializer:(FSTSerializerBeta *)serializer {
  564. self = [super initWithDatabase:database
  565. workerDispatchQueue:workerDispatchQueue
  566. credentials:credentials
  567. responseMessageClass:[GCFSWriteResponse class]];
  568. if (self) {
  569. _serializer = serializer;
  570. }
  571. return self;
  572. }
  573. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  574. return [[GRPCCall alloc] initWithHost:util::WrapNSStringNoCopy(self.databaseInfo->host())
  575. path:@"/google.firestore.v1beta1.Firestore/Write"
  576. requestsWriter:requestsWriter];
  577. }
  578. - (void)startWithDelegate:(id)delegate {
  579. self.handshakeComplete = NO;
  580. [super startWithDelegate:delegate];
  581. }
  582. - (void)notifyStreamOpen {
  583. [self.delegate writeStreamDidOpen];
  584. }
  585. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  586. id<FSTWriteStreamDelegate> delegate = self.delegate;
  587. self.delegate = nil;
  588. [delegate writeStreamWasInterruptedWithError:error];
  589. }
  590. - (void)tearDown {
  591. if ([self isHandshakeComplete]) {
  592. // Send an empty write request to the backend to indicate imminent stream closure. This allows
  593. // the backend to clean up resources.
  594. [self writeMutations:@[]];
  595. }
  596. }
  597. - (void)writeHandshake {
  598. // The initial request cannot contain mutations, but must contain a projectID.
  599. FSTAssert([self isOpen], @"Not yet open");
  600. FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
  601. [self.workerDispatchQueue verifyIsCurrentQueue];
  602. GCFSWriteRequest *request = [GCFSWriteRequest message];
  603. request.database = [_serializer encodedDatabaseID];
  604. // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
  605. // handshake, ignoring any stream token we might have.
  606. FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request);
  607. [self writeRequest:request];
  608. }
  609. - (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
  610. FSTAssert([self isOpen], @"Not yet open");
  611. FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
  612. [self.workerDispatchQueue verifyIsCurrentQueue];
  613. NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count];
  614. for (FSTMutation *mutation in mutations) {
  615. [protos addObject:[_serializer encodedMutation:mutation]];
  616. };
  617. GCFSWriteRequest *request = [GCFSWriteRequest message];
  618. request.writesArray = protos;
  619. request.streamToken = self.lastStreamToken;
  620. FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request);
  621. [self writeRequest:request];
  622. }
  623. /**
  624. * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass
  625. * that on to the mutationResultsHandler.
  626. */
  627. - (void)handleStreamMessage:(GCFSWriteResponse *)response {
  628. FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
  629. [self.workerDispatchQueue verifyIsCurrentQueue];
  630. // Always capture the last stream token.
  631. self.lastStreamToken = response.streamToken;
  632. if (!self.isHandshakeComplete) {
  633. // The first response is the handshake response
  634. self.handshakeComplete = YES;
  635. [self.delegate writeStreamDidCompleteHandshake];
  636. } else {
  637. // A successful first write response means the stream is healthy.
  638. // Note that we could consider a successful handshake healthy, however, the write itself
  639. // might be causing an error we want to back off from.
  640. [self.backoff reset];
  641. FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime];
  642. NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray;
  643. NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count];
  644. for (GCFSWriteResult *proto in protos) {
  645. [results addObject:[_serializer decodedMutationResult:proto]];
  646. };
  647. [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
  648. }
  649. }
  650. @end