FSTStream.mm 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Source/Remote/FSTDatastore.h"
  17. #import <GRPCClient/GRPCCall+OAuth2.h>
  18. #import <GRPCClient/GRPCCall.h>
  19. #import "FIRFirestoreErrors.h"
  20. #import "Firestore/Source/API/FIRFirestore+Internal.h"
  21. #import "Firestore/Source/Auth/FSTCredentialsProvider.h"
  22. #import "Firestore/Source/Local/FSTQueryData.h"
  23. #import "Firestore/Source/Model/FSTMutation.h"
  24. #import "Firestore/Source/Remote/FSTBufferedWriter.h"
  25. #import "Firestore/Source/Remote/FSTExponentialBackoff.h"
  26. #import "Firestore/Source/Remote/FSTSerializerBeta.h"
  27. #import "Firestore/Source/Remote/FSTStream.h"
  28. #import "Firestore/Source/Util/FSTAssert.h"
  29. #import "Firestore/Source/Util/FSTClasses.h"
  30. #import "Firestore/Source/Util/FSTDispatchQueue.h"
  31. #import "Firestore/Source/Util/FSTLogger.h"
  32. #import "Firestore/Protos/objc/google/firestore/v1beta1/Firestore.pbrpc.h"
  33. #include "Firestore/core/src/firebase/firestore/auth/token.h"
  34. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  35. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  36. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  37. namespace util = firebase::firestore::util;
  38. using firebase::firestore::auth::Token;
  39. using firebase::firestore::core::DatabaseInfo;
  40. using firebase::firestore::model::DatabaseId;
  /**
   * Initial backoff time in seconds after an error.
   * Set to 1s according to https://cloud.google.com/apis/design/errors.
   */
  static const NSTimeInterval kBackoffInitialDelay = 1;
  /** Value handed to FSTExponentialBackoff as its `maxDelay` (an NSTimeInterval, i.e. seconds). */
  static const NSTimeInterval kBackoffMaxDelay = 60.0;
  /** Value handed to FSTExponentialBackoff as its `backoffFactor`. */
  static const double kBackoffFactor = 1.5;
  48. #pragma mark - FSTStream
  /**
   * Lifecycle states of an FSTStream as seen by its consumers. -isStarted reports YES for Auth,
   * Open and Backoff; -isOpen reports YES for Open only.
   */
  typedef NS_ENUM(NSInteger, FSTStreamState) {
    /**
     * No streaming RPC is running and no error is pending. `start` proceeds immediately, without
     * backoff. -isStarted is NO.
     */
    FSTStreamStateInitial = 0,

    /**
     * Start was requested and the stream is waiting for an auth token to attach to the initial
     * request. -isStarted is YES, -isOpen is NO.
     */
    FSTStreamStateAuth = 1,

    /**
     * The streaming RPC is running; requests and responses flow freely. -isStarted and -isOpen
     * are both YES.
     */
    FSTStreamStateOpen = 2,

    /**
     * The stream failed. The next start attempt will back off first. -isStarted is NO.
     */
    FSTStreamStateError = 3,

    /**
     * Transitional state entered from Error while the stream waits before re-starting; once the
     * wait is over the stream tries to open. -isStarted is YES, -isOpen is NO.
     */
    FSTStreamStateBackoff = 4,

    /**
     * The stream was explicitly stopped; no further events will be emitted.
     */
    FSTStreamStateStopped = 5,
  };
  // We need to declare these classes first so that Datastore can alloc them.
  @interface FSTWatchStream ()
  /**
   * Initializes the watch stream with its dependencies.
   *
   * @param database Connection details for the target database; passed as a raw pointer.
   * @param workerDispatchQueue The queue on which the stream's work is expected to run.
   * @param credentials Provider used to obtain the auth token when the stream starts.
   * @param serializer Converts between model objects and Listen request/response protos.
   */
  - (instancetype)initWithDatabase:(const DatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                        serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
  /** Marked unavailable on the watch stream; use the serializer-based initializer above. */
  - (instancetype)initWithDatabase:(const DatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
              responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
  @end
  @interface FSTStream ()
  /** Timer ID passed to the worker queue when the idle-close callback is scheduled. */
  @property(nonatomic, assign, readonly) FSTTimerID idleTimerID;
  /** Handle to the pending idle-close callback, or nil when no idle timer is scheduled. */
  @property(nonatomic, strong, nullable) FSTDelayedCallback *idleTimerCallback;
  /** Receiver of stream events; assigned when the stream is started and held weakly. */
  @property(nonatomic, weak, readwrite, nullable) id delegate;
  @end
  @interface FSTStream () <GRXWriteable>
  // Does not own this DatabaseInfo.
  @property(nonatomic, assign, readonly) const DatabaseInfo *databaseInfo;
  /** Queue on which the stream's methods verify they are running and on which timers are set. */
  @property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
  /** Source of the auth token requested each time the stream starts. */
  @property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
  /** Proto class used to parse every inbound message (see -parseProto:data:error:). */
  @property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
  /** Schedules delayed restarts after an error; reset or pushed to max when the stream closes. */
  @property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;
  /** A flag tracking whether the stream received a message from the backend. */
  @property(nonatomic, assign) BOOL messageReceived;
  /**
   * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
   * state of the stream.
   */
  @property(nonatomic, assign) FSTStreamState state;
  /** The RPC handle. Used for cancellation. */
  @property(nonatomic, strong, nullable) GRPCCall *rpc;
  /**
   * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
   * started.
   */
  @property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;
  @end
  123. #pragma mark - FSTCallbackFilter
  /**
   * Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main
   * FSTStream to allow the stream to be stopped externally (either by the user or via idle timer)
   * and be able to completely prevent any subsequent events from gRPC from calling back into the
   * FSTStream.
   */
  @interface FSTCallbackFilter : NSObject <GRXWriteable>
  /** Creates a filter that forwards gRPC callbacks to `stream` until callbacks are suppressed. */
  - (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
  - (instancetype)init NS_UNAVAILABLE;
  /** YES while callbacks are forwarded to the stream; starts out YES. */
  @property(atomic, readwrite) BOOL callbacksEnabled;
  /** The stream that receives forwarded callbacks. Held strongly by the filter. */
  @property(nonatomic, strong, readonly) FSTStream *stream;
  @end
  @implementation FSTCallbackFilter

  /**
   * Creates a filter that forwards gRPC callbacks to `stream`. Forwarding starts enabled.
   *
   * @param stream The stream that receives -writeValue: and -writesFinishedWithError:.
   */
  - (instancetype)initWithStream:(FSTStream *)stream {
    self = [super init];
    if (self) {
      // Direct ivar access is fine here: the object is not yet visible to any other thread.
      _callbacksEnabled = YES;
      _stream = stream;
    }
    return self;
  }

  /**
   * Permanently stops forwarding callbacks to the stream. Goes through the property setter so
   * that the `atomic` attribute declared on `callbacksEnabled` actually applies.
   */
  - (void)suppressCallbacks {
    self.callbacksEnabled = NO;
  }

  /** GRXWriteable: forwards a received value to the stream unless callbacks are suppressed. */
  - (void)writeValue:(id)value {
    if (self.callbacksEnabled) {
      [self.stream writeValue:value];
    }
  }

  /** GRXWriteable: forwards the end-of-stream event unless callbacks are suppressed. */
  - (void)writesFinishedWithError:(NSError *)errorOrNil {
    if (self.callbacksEnabled) {
      [self.stream writesFinishedWithError:errorOrNil];
    }
  }

  @end
  158. #pragma mark - FSTStream
  @interface FSTStream ()
  /**
   * The filter handed to gRPC as the RPC's writeable. Created when the RPC is started and
   * suppressed and cleared when the stream is closed.
   */
  @property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter;
  @end
  162. @implementation FSTStream
  /**
   * The time a stream stays open after it is marked idle. Used as the delay for the idle-close
   * callback scheduled in -markIdle.
   */
  static const NSTimeInterval kIdleTimeout = 60.0;
  /**
   * Stores the stream's dependencies, builds its exponential backoff helper and puts the stream
   * in the Initial state.
   *
   * @param database Connection details; kept as a raw pointer.
   * @param workerDispatchQueue Queue used for all stream work and for the backoff timer.
   * @param connectionTimerID Timer ID given to the backoff helper.
   * @param idleTimerID Timer ID used later when scheduling the idle-close callback.
   * @param credentials Provider of auth tokens.
   * @param responseMessageClass Proto class used to parse inbound messages.
   */
  - (instancetype)initWithDatabase:(const DatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                 connectionTimerID:(FSTTimerID)connectionTimerID
                       idleTimerID:(FSTTimerID)idleTimerID
                       credentials:(id<FSTCredentialsProvider>)credentials
              responseMessageClass:(Class)responseMessageClass {
    self = [super init];
    if (!self) {
      return nil;
    }

    _databaseInfo = database;
    _workerDispatchQueue = workerDispatchQueue;
    _idleTimerID = idleTimerID;
    _credentials = credentials;
    _responseMessageClass = responseMessageClass;

    FSTExponentialBackoff *backoffHelper =
        [[FSTExponentialBackoff alloc] initWithDispatchQueue:workerDispatchQueue
                                                     timerID:connectionTimerID
                                                initialDelay:kBackoffInitialDelay
                                               backoffFactor:kBackoffFactor
                                                    maxDelay:kBackoffMaxDelay];
    _backoff = backoffHelper;
    _state = FSTStreamStateInitial;

    return self;
  }
  /**
   * Returns YES while the stream is in the Backoff, Auth or Open state, and NO otherwise. Must be
   * called on the worker queue.
   */
  - (BOOL)isStarted {
    [self.workerDispatchQueue verifyIsCurrentQueue];
    switch (self.state) {
      case FSTStreamStateBackoff:
      case FSTStreamStateAuth:
      case FSTStreamStateOpen:
        return YES;
      default:
        return NO;
    }
  }
  /** Returns YES only while the stream is in the Open state. Must be called on the worker queue. */
  - (BOOL)isOpen {
    [self.workerDispatchQueue verifyIsCurrentQueue];
    FSTStreamState current = self.state;
    return current == FSTStreamStateOpen;
  }
  /**
   * Abstract factory for the underlying gRPC call. The base implementation always throws;
   * concrete stream subclasses override it to build the call for their RPC path.
   *
   * @param requestsWriter The writer that will supply outgoing requests to the call.
   */
  - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
    @throw FSTAbstractMethodException();  // NOLINT
  }
  /**
   * Starts the stream. From the Error state this defers to a backoff; otherwise the stream must
   * be in the Initial state, moves to Auth, records the delegate and asks the credentials
   * provider for a token. The start is resumed on the worker queue once the token request
   * completes.
   *
   * @param delegate The object that will receive this stream's events.
   */
  - (void)startWithDelegate:(id)delegate {
    [self.workerDispatchQueue verifyIsCurrentQueue];

    // A failed stream has to wait out its backoff before it may try again.
    if (self.state == FSTStreamStateError) {
      [self performBackoffWithDelegate:delegate];
      return;
    }

    FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
    FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
    self.state = FSTStreamStateAuth;

    FSTAssert(_delegate == nil, @"Delegate must be nil");
    _delegate = delegate;

    [self.credentials
        getTokenForcingRefresh:NO
                    completion:^(Token result, NSError *_Nullable error) {
                      // Normalize the error, then hop back to the worker queue to continue.
                      NSError *firestoreError = [FSTDatastore firestoreErrorForError:error];
                      [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
                        [self resumeStartWithToken:result error:firestoreError];
                      }];
                    }];
  }
  /**
   * Second half of the start sequence, run once the credentials provider has answered. Creates
   * the request writer and the RPC, attaches headers (including the token for authenticated
   * users), starts the RPC behind a fresh callback filter and moves the stream to Open.
   *
   * @param token The token returned by the credentials provider.
   * @param error Non-nil if obtaining the token failed; the stream is then closed with it.
   */
  - (void)resumeStartWithToken:(const Token &)token error:(NSError *)error {
    [self.workerDispatchQueue verifyIsCurrentQueue];

    // Streams can be stopped while waiting for authorization.
    if (self.state == FSTStreamStateStopped) {
      return;
    }
    FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
              (long)self.state);

    // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
    // but I'm not sure how to detect that right now. http://b/32762461
    if (error) {
      // RPC has not been started yet, so just invoke higher-level close handler.
      [self handleStreamClose:error];
      return;
    }

    self.requestsWriter = [[FSTBufferedWriter alloc] init];
    _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
    [_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue];

    // Unauthenticated users send an empty token.
    absl::string_view authToken =
        token.user().is_authenticated() ? token.token() : absl::string_view();
    [FSTDatastore prepareHeadersForRPC:_rpc
                            databaseID:&self.databaseInfo->database_id()
                                 token:authToken];

    FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil");
    _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self];
    [_rpc startWithWriteable:_callbackFilter];

    self.state = FSTStreamStateOpen;
    [self notifyStreamOpen];
  }
  /**
   * Moves a stream from Error to Backoff and asks the backoff helper to resume the start after
   * its delay. The stream is captured weakly by the delayed block.
   *
   * @param delegate The delegate to start the stream with once the backoff has elapsed.
   */
  - (void)performBackoffWithDelegate:(id)delegate {
    FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
    [self.workerDispatchQueue verifyIsCurrentQueue];

    FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
    self.state = FSTStreamStateBackoff;

    FSTWeakify(self);
    void (^restartAfterDelay)(void) = ^{
      FSTStrongify(self);
      [self resumeStartFromBackoffWithDelegate:delegate];
    };
    [self.backoff backoffAndRunBlock:restartAfterDelay];
  }
  /**
   * Continues a start once the backoff delay has elapsed. Does nothing if the stream was stopped
   * in the meantime; otherwise the stream must still be in Backoff.
   *
   * @param delegate The delegate to start the stream with.
   */
  - (void)resumeStartFromBackoffWithDelegate:(id)delegate {
    // Streams can be stopped while waiting for backoff to complete.
    if (self.state == FSTStreamStateStopped) {
      return;
    }

    // Backoff is only ever entered from Error, so a stream that was not stopped must still be
    // sitting in Backoff here.
    FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
              (long)self.state);

    // `start` insists on the Initial state, so pass through it briefly.
    self.state = FSTStreamStateInitial;
    [self startWithDelegate:delegate];

    FSTAssert([self isStarted], @"Stream should have started.");
  }
  /**
   * Can be overridden to perform additional cleanup before the stream is closed. Calling
   * [super tearDown] is not required.
   *
   * Invoked from -closeWithFinalState:error: for non-error closes only, before the request writer
   * is finished and before the state changes. The base implementation does nothing.
   */
  - (void)tearDown {
  }
  /**
   * Shuts the stream down and leaves it in `finalState`. In order:
   *
   *  * cancels any pending idle timer;
   *  * adjusts the backoff (reset for intentional closes, maximum delay for resource exhaustion);
   *  * runs -tearDown and half-closes the request writer, for intentional closes only;
   *  * records the new state, silences gRPC callbacks and drops the RPC;
   *  * reports the interruption, unless the stream was explicitly stopped.
   *
   * A new stream can be opened by calling `start` unless `finalState` is
   * `FSTStreamStateStopped`.
   *
   * @param finalState the intended state of the stream after closing.
   * @param error the NSError the connection was closed with; must be nil unless `finalState` is
   *     `FSTStreamStateError`.
   */
  - (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error {
    FSTAssert(finalState == FSTStreamStateError || error == nil,
              @"Can't provide an error when not in an error state.");
    [self.workerDispatchQueue verifyIsCurrentQueue];

    [self cancelIdleCheck];

    const BOOL closedByError = (finalState == FSTStreamStateError);

    if (closedByError) {
      if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) {
        FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
               (__bridge void *)self);
        [self.backoff resetToMax];
      }
    } else {
      // If this is an intentional close ensure we don't delay our next connection attempt.
      [self.backoff reset];

      FSTLog(@"%@ %p Performing stream teardown", [self class], (__bridge void *)self);
      [self tearDown];
    }

    FSTBufferedWriter *writer = self.requestsWriter;
    if (writer) {
      // Clean up the underlying RPC. After an error, skip the half-close to avoid secondary
      // failures.
      if (!closedByError) {
        FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self);
        @synchronized(writer) {
          [writer finishWithError:nil];
        }
      }
      _requestsWriter = nil;
    }

    // The state has to be in place before `notifyStreamInterrupted` runs, so that the callback
    // can inhibit backoff or otherwise work with the stream in its non-started state.
    self.state = finalState;

    [self.callbackFilter suppressCallbacks];
    _callbackFilter = nil;

    // Clean up remaining state.
    _messageReceived = NO;
    _rpc = nil;

    // An explicit stop is not reported as an interruption (it could trigger undesirable recovery
    // logic, etc.).
    if (finalState != FSTStreamStateStopped) {
      [self notifyStreamInterruptedWithError:error];
    }

    // PORTING NOTE: notifyStreamInterruptedWithError may have restarted the stream with a new
    // delegate so we do /not/ want to clear the delegate here. And since we've already suppressed
    // callbacks via our callbackFilter, there is no worry about bleed through of events from GRPC.
  }
  /**
   * Stops a started stream by closing it into the Stopped state. A stream that is not started is
   * left untouched.
   */
  - (void)stop {
    FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
    if (![self isStarted]) {
      return;
    }
    [self closeWithFinalState:FSTStreamStateStopped error:nil];
  }
  /**
   * Clears a pending error so that the next start happens immediately: puts the stream back in
   * the Initial state and resets the backoff. Only valid while the stream is not started.
   */
  - (void)inhibitBackoff {
    FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
              (long)self.state);
    [self.workerDispatchQueue verifyIsCurrentQueue];

    // Forget the error, and with it the accumulated delay.
    self.state = FSTStreamStateInitial;
    [self.backoff reset];
  }
  /**
   * Idle timer callback: closes the stream if it is still open. A stream that is no longer open
   * is left alone.
   */
  - (void)handleIdleCloseTimer {
    [self.workerDispatchQueue verifyIsCurrentQueue];
    if (![self isOpen]) {
      return;
    }
    // An idle timeout is not a failure, so close into Initial rather than Error; the next start
    // then does not have to back off.
    [self closeWithFinalState:FSTStreamStateInitial error:nil];
  }
  /**
   * Schedules the idle-close callback after kIdleTimeout, provided the stream is open and no
   * idle timer is already pending (a pending timer keeps its original deadline).
   */
  - (void)markIdle {
    [self.workerDispatchQueue verifyIsCurrentQueue];

    if (![self isOpen] || self.idleTimerCallback) {
      return;
    }

    self.idleTimerCallback = [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout
                                                                  timerID:self.idleTimerID
                                                                    block:^() {
                                                                      [self handleIdleCloseTimer];
                                                                    }];
  }
  /** Cancels and forgets the pending idle-close callback, if there is one. */
  - (void)cancelIdleCheck {
    [self.workerDispatchQueue verifyIsCurrentQueue];

    FSTDelayedCallback *pending = self.idleTimerCallback;
    if (!pending) {
      return;
    }
    [pending cancel];
    self.idleTimerCallback = nil;
  }
  /**
   * Parses a protocol buffer response from the server. If the message fails to parse, produces a
   * FIRFirestoreErrorCodeInternal error describing the failure. This method does not close the
   * stream itself; the caller decides what to do with the error.
   *
   * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
   * @param data The bytes in the response as returned from GRPC.
   * @param error Optional out-parameter. Set to nil on success and to the generated error on
   *     failure. May be NULL if the caller is not interested in the error.
   * @return An instance of the protocol buffer message, parsed from the data if parsing was
   *     successful, or nil otherwise.
   */
  - (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
    NSError *parseError = nil;
    id parsed = [protoClass parseFromData:data error:&parseError];
    if (parsed) {
      if (error) {
        *error = nil;
      }
      return parsed;
    }

    // Build the user info incrementally: a dictionary literal raises if any value is nil, and
    // none of these values is guaranteed to be present when parsing fails.
    NSMutableDictionary<NSString *, id> *info = [NSMutableDictionary dictionary];
    info[NSLocalizedDescriptionKey] = @"Unable to parse response from the server";
    if (parseError) {
      info[NSUnderlyingErrorKey] = parseError;
    }
    if (protoClass) {
      info[@"Expected class"] = protoClass;
    }
    if (data) {
      info[@"Received value"] = data;
    }

    if (error) {
      *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
                                   code:FIRFirestoreErrorCodeInternal
                               userInfo:info];
    }
    return nil;
  }
  /**
   * Serializes `request` and hands the bytes to the request writer. Sending a request also
   * cancels any pending idle timer.
   *
   * @param request The proto message to send.
   */
  - (void)writeRequest:(GPBMessage *)request {
    NSData *serialized = [request data];

    [self cancelIdleCheck];

    FSTBufferedWriter *writer = self.requestsWriter;
    @synchronized(writer) {
      [writer writeValue:serialized];
    }
  }
  419. #pragma mark Template methods for subclasses
  /**
   * Called by the stream after the stream has opened, i.e. right after the state becomes Open.
   *
   * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is
   * not required; the base implementation does nothing.
   */
  - (void)notifyStreamOpen {
  }
  /**
   * Called by the stream after the stream has been unexpectedly interrupted, either due to an error
   * or due to idleness. Not called when the stream is closed into the Stopped state.
   *
   * Subclasses should relay to their stream-specific delegate. Calling [super
   * notifyStreamInterruptedWithError:] is not required; the base implementation does nothing.
   *
   * @param error The error the stream was closed with, or nil for a non-error close.
   */
  - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  }
  /**
   * Called by the stream for each incoming protocol message coming from the server, after the
   * message has been parsed successfully.
   *
   * Subclasses should implement this to deserialize the value and relay to their stream-specific
   * delegate, if appropriate. Calling [super handleStreamMessage:] is not required; the base
   * implementation does nothing.
   *
   * @param value The parsed message, an instance of the stream's response message class.
   */
  - (void)handleStreamMessage:(id)value {
  }
  /**
   * Handles the underlying RPC having closed, for whatever reason, by closing the stream into the
   * Error state. The stream must be started.
   *
   * @param error The error the RPC closed with, if any.
   */
  - (void)handleStreamClose:(nullable NSError *)error {
    FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
    FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");

    // A clean close is possible in theory, but is never expected in the current model: when we
    // stop a stream ourselves this callback is not invoked. Always going to Error guards against
    // accidentally retrying without a backoff.
    [self closeWithFinalState:FSTStreamStateError error:error];
  }
  457. #pragma mark GRXWriteable implementation
  458. // The GRXWriteable implementation defines the receive side of the RPC stream.
  /**
   * GRXWriteable: receives one value published by GRPC. Logs the whitelisted response headers on
   * the first message (when logging is enabled), parses the value with the stream's response
   * message class and passes the result to -handleStreamMessage:. If parsing fails, the RPC is
   * finished with the parse error instead.
   *
   * GRPC must be configured to use our worker queue by calling
   * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
   * the RPC.
   */
  - (void)writeValue:(id)value {
    [self.workerDispatchQueue verifyIsCurrentQueue];
    FSTAssert([self isStarted], @"writeValue: called for stopped stream.");

    BOOL isFirstMessage = !self.messageReceived;
    if (isFirstMessage) {
      self.messageReceived = YES;
      if ([FIRFirestore isLoggingEnabled]) {
        FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
               (__bridge void *)self,
               [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
      }
    }

    NSError *parseFailure;
    id message = [self parseProto:self.responseMessageClass data:value error:&parseFailure];
    if (!message) {
      [_rpc finishWithError:parseFailure];
      return;
    }
    [self handleStreamMessage:message];
  }
  /**
   * GRXWriteable: called by GRPC when it has closed the stream; `error` represents the final
   * state of the stream. The error is converted to a Firestore error and passed on to
   * -handleStreamClose:.
   *
   * GRPC must be configured to use our worker queue by calling
   * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
   * the RPC.
   *
   * Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call
   * stop to tear down the stream.
   */
  - (void)writesFinishedWithError:(nullable NSError *)error __used {
    NSError *firestoreError = [FSTDatastore firestoreErrorForError:error];

    [self.workerDispatchQueue verifyIsCurrentQueue];
    FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");

    [self handleStreamClose:firestoreError];
  }
  502. @end
  503. #pragma mark - FSTWatchStream
  @interface FSTWatchStream ()
  /** Encodes Listen requests and decodes Listen responses. */
  @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  @end

  @implementation FSTWatchStream

  /**
   * Sets up the base stream with the listen-stream timer IDs and GCFSListenResponse as the
   * response message class, then stores the serializer.
   */
  - (instancetype)initWithDatabase:(const DatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                        serializer:(FSTSerializerBeta *)serializer {
    self = [super initWithDatabase:database
               workerDispatchQueue:workerDispatchQueue
                 connectionTimerID:FSTTimerIDListenStreamConnection
                       idleTimerID:FSTTimerIDListenStreamIdle
                       credentials:credentials
              responseMessageClass:[GCFSListenResponse class]];
    if (!self) {
      return nil;
    }
    _serializer = serializer;
    return self;
  }

  /** Builds the gRPC call for the Listen RPC against the configured host. */
  - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
    NSString *host = util::WrapNSString(self.databaseInfo->host());
    return [[GRPCCall alloc] initWithHost:host
                                     path:@"/google.firestore.v1beta1.Firestore/Listen"
                           requestsWriter:requestsWriter];
  }

  /** Tells the delegate that the watch stream has opened. */
  - (void)notifyStreamOpen {
    [self.delegate watchStreamDidOpen];
  }

  /**
   * Reports an interruption to the current delegate. The delegate is detached from the stream
   * before it is called.
   */
  - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
    id<FSTWatchStreamDelegate> receiver = self.delegate;
    self.delegate = nil;
    [receiver watchStreamWasInterruptedWithError:error];
  }

  /**
   * Sends a request that adds a target for `query`. The stream must be open.
   *
   * @param query The query data to start watching.
   */
  - (void)watchQuery:(FSTQueryData *)query {
    FSTAssert([self isOpen], @"Not yet open");
    [self.workerDispatchQueue verifyIsCurrentQueue];

    GCFSListenRequest *listenRequest = [GCFSListenRequest message];
    listenRequest.database = [_serializer encodedDatabaseID];
    listenRequest.addTarget = [_serializer encodedTarget:query];
    listenRequest.labels = [_serializer encodedListenRequestLabelsForQueryData:query];

    FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, listenRequest);
    [self writeRequest:listenRequest];
  }

  /**
   * Sends a request that removes the target with the given ID. The stream must be open.
   *
   * @param targetID The ID of the target to stop watching.
   */
  - (void)unwatchTargetID:(FSTTargetID)targetID {
    FSTAssert([self isOpen], @"Not yet open");
    [self.workerDispatchQueue verifyIsCurrentQueue];

    GCFSListenRequest *listenRequest = [GCFSListenRequest message];
    listenRequest.database = [_serializer encodedDatabaseID];
    listenRequest.removeTarget = targetID;

    FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, listenRequest);
    [self writeRequest:listenRequest];
  }

  /**
   * Handles a parsed Listen response: resets the backoff, decodes the watch change and snapshot
   * version, and passes both to the delegate's watchStreamDidChange:snapshotVersion: callback.
   */
  - (void)handleStreamMessage:(GCFSListenResponse *)proto {
    FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
    [self.workerDispatchQueue verifyIsCurrentQueue];

    // A successful response means the stream is healthy.
    [self.backoff reset];

    FSTWatchChange *watchChange = [_serializer decodedWatchChange:proto];
    FSTSnapshotVersion *snapshotVersion = [_serializer versionFromListenResponse:proto];
    [self.delegate watchStreamDidChange:watchChange snapshotVersion:snapshotVersion];
  }

  @end
  569. #pragma mark - FSTWriteStream
  @interface FSTWriteStream ()
  /** Encodes Write requests and decodes Write responses. */
  @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  @end

  @implementation FSTWriteStream

  /**
   * Sets up the base stream with the write-stream timer IDs and GCFSWriteResponse as the response
   * message class, then stores the serializer.
   */
  - (instancetype)initWithDatabase:(const DatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                        serializer:(FSTSerializerBeta *)serializer {
    self = [super initWithDatabase:database
               workerDispatchQueue:workerDispatchQueue
                 connectionTimerID:FSTTimerIDWriteStreamConnection
                       idleTimerID:FSTTimerIDWriteStreamIdle
                       credentials:credentials
              responseMessageClass:[GCFSWriteResponse class]];
    if (!self) {
      return nil;
    }
    _serializer = serializer;
    return self;
  }

  /** Builds the gRPC call for the Write RPC against the configured host. */
  - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
    NSString *host = util::WrapNSString(self.databaseInfo->host());
    return [[GRPCCall alloc] initWithHost:host
                                     path:@"/google.firestore.v1beta1.Firestore/Write"
                           requestsWriter:requestsWriter];
  }

  /** Starts the stream; every start begins with the handshake marked as not yet complete. */
  - (void)startWithDelegate:(id)delegate {
    self.handshakeComplete = NO;
    [super startWithDelegate:delegate];
  }

  /** Tells the delegate that the write stream has opened. */
  - (void)notifyStreamOpen {
    [self.delegate writeStreamDidOpen];
  }

  /**
   * Reports an interruption to the current delegate. The delegate is detached from the stream
   * before it is called.
   */
  - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
    id<FSTWriteStreamDelegate> receiver = self.delegate;
    self.delegate = nil;
    [receiver writeStreamWasInterruptedWithError:error];
  }

  /**
   * Pre-close cleanup. Once the handshake has completed, an empty write request is sent to the
   * backend to indicate imminent stream closure, which allows the backend to clean up resources.
   */
  - (void)tearDown {
    if (![self isHandshakeComplete]) {
      return;
    }
    [self writeMutations:@[]];
  }

  /**
   * Sends the initial request of the stream. It carries the database ID and no mutations. The
   * stream must be open and the handshake must not have completed yet.
   */
  - (void)writeHandshake {
    // The initial request cannot contain mutations, but must contain a projectID.
    FSTAssert([self isOpen], @"Not yet open");
    FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
    [self.workerDispatchQueue verifyIsCurrentQueue];

    GCFSWriteRequest *handshake = [GCFSWriteRequest message];
    handshake.database = [_serializer encodedDatabaseID];
    // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
    // handshake, ignoring any stream token we might have.

    FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, handshake);
    [self writeRequest:handshake];
  }

  /**
   * Encodes `mutations` and sends them together with the last stream token. The stream must be
   * open and the handshake must have completed.
   *
   * @param mutations The mutations to send; may be empty.
   */
  - (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
    FSTAssert([self isOpen], @"Not yet open");
    FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
    [self.workerDispatchQueue verifyIsCurrentQueue];

    NSMutableArray<GCFSWrite *> *encodedWrites =
        [NSMutableArray arrayWithCapacity:mutations.count];
    for (FSTMutation *mutation in mutations) {
      [encodedWrites addObject:[_serializer encodedMutation:mutation]];
    }

    GCFSWriteRequest *writeRequest = [GCFSWriteRequest message];
    writeRequest.writesArray = encodedWrites;
    writeRequest.streamToken = self.lastStreamToken;

    FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, writeRequest);
    [self writeRequest:writeRequest];
  }

  /**
   * Handles a parsed Write response. The stream token is always recorded. The first response
   * completes the handshake; every later response resets the backoff and delivers the decoded
   * commit version and mutation results to the delegate.
   */
  - (void)handleStreamMessage:(GCFSWriteResponse *)response {
    FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
    [self.workerDispatchQueue verifyIsCurrentQueue];

    // Always capture the last stream token.
    self.lastStreamToken = response.streamToken;

    if (!self.isHandshakeComplete) {
      // The first response is the handshake response
      self.handshakeComplete = YES;
      [self.delegate writeStreamDidCompleteHandshake];
      return;
    }

    // A successful first write response means the stream is healthy.
    // Note that we could consider a successful handshake healthy, however, the write itself
    // might be causing an error we want to back off from.
    [self.backoff reset];

    FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime];

    NSMutableArray<GCFSWriteResult *> *resultProtos = response.writeResultsArray;
    NSMutableArray<FSTMutationResult *> *decodedResults =
        [NSMutableArray arrayWithCapacity:resultProtos.count];
    for (GCFSWriteResult *resultProto in resultProtos) {
      [decodedResults addObject:[_serializer decodedMutationResult:resultProto]];
    }

    [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion
                                            mutationResults:decodedResults];
  }

  @end
  663. [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
  664. }
  665. }
  666. @end