FSTStream.mm 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firestore/Source/Remote/FSTDatastore.h"
  17. #import <GRPCClient/GRPCCall+OAuth2.h>
  18. #import <GRPCClient/GRPCCall.h>
  19. #import "FIRFirestoreErrors.h"
  20. #import "Firestore/Source/API/FIRFirestore+Internal.h"
  21. #import "Firestore/Source/Local/FSTQueryData.h"
  22. #import "Firestore/Source/Model/FSTMutation.h"
  23. #import "Firestore/Source/Remote/FSTBufferedWriter.h"
  24. #import "Firestore/Source/Remote/FSTExponentialBackoff.h"
  25. #import "Firestore/Source/Remote/FSTSerializerBeta.h"
  26. #import "Firestore/Source/Remote/FSTStream.h"
  27. #import "Firestore/Source/Util/FSTAssert.h"
  28. #import "Firestore/Source/Util/FSTClasses.h"
  29. #import "Firestore/Source/Util/FSTDispatchQueue.h"
  30. #import "Firestore/Source/Util/FSTLogger.h"
  31. #import "Firestore/Protos/objc/google/firestore/v1beta1/Firestore.pbrpc.h"
  32. #include "Firestore/core/src/firebase/firestore/auth/token.h"
  33. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  34. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  35. #include "Firestore/core/src/firebase/firestore/util/error_apple.h"
  36. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  37. namespace util = firebase::firestore::util;
  38. using firebase::firestore::auth::CredentialsProvider;
  39. using firebase::firestore::auth::Token;
  40. using firebase::firestore::core::DatabaseInfo;
  41. using firebase::firestore::model::DatabaseId;
  42. /**
  43. * Initial backoff time in seconds after an error.
  44. * Set to 1s according to https://cloud.google.com/apis/design/errors.
  45. */
  46. static const NSTimeInterval kBackoffInitialDelay = 1;
  47. static const NSTimeInterval kBackoffMaxDelay = 60.0;
  48. static const double kBackoffFactor = 1.5;
  49. #pragma mark - FSTStream
  50. /** The state of a stream. */
  51. typedef NS_ENUM(NSInteger, FSTStreamState) {
  52. /**
  53. * The streaming RPC is not running and there's no error condition. Calling `start` will
  54. * start the stream immediately without backoff. While in this state -isStarted will return NO.
  55. */
  56. FSTStreamStateInitial = 0,
  57. /**
  58. * The stream is starting, and is waiting for an auth token to attach to the initial request.
  59. * While in this state, isStarted will return YES but isOpen will return NO.
  60. */
  61. FSTStreamStateAuth,
  62. /**
  63. * The streaming RPC is up and running. Requests and responses can flow freely. Both
  64. * isStarted and isOpen will return YES.
  65. */
  66. FSTStreamStateOpen,
  67. /**
  68. * The stream encountered an error. The next start attempt will back off. While in this state
  69. * -isStarted will return NO.
  70. */
  71. FSTStreamStateError,
  72. /**
  73. * An in-between state after an error where the stream is waiting before re-starting. After
  74. * waiting is complete, the stream will try to open. While in this state -isStarted will
  75. * return YES but isOpen will return NO.
  76. */
  77. FSTStreamStateBackoff,
  78. /**
  79. * The stream has been explicitly stopped; no further events will be emitted.
  80. */
  81. FSTStreamStateStopped,
  82. };
  83. // We need to declare these classes first so that Datastore can alloc them.
  84. @interface FSTWatchStream ()
  85. /**
  86. * Initializes the watch stream with its dependencies.
  87. */
  88. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  89. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  90. credentials:(CredentialsProvider *)credentials
  91. serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
  92. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  93. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  94. credentials:(CredentialsProvider *)credentials
  95. responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
  96. @end
  97. @interface FSTStream ()
  98. @property(nonatomic, assign, readonly) FSTTimerID idleTimerID;
  99. @property(nonatomic, strong, nullable) FSTDelayedCallback *idleTimerCallback;
  100. @property(nonatomic, weak, readwrite, nullable) id delegate;
  101. @end
  102. @interface FSTStream () <GRXWriteable>
  103. // Does not own this DatabaseInfo.
  104. @property(nonatomic, assign, readonly) const DatabaseInfo *databaseInfo;
  105. @property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
  106. @property(nonatomic, assign, readonly) CredentialsProvider *credentials;
  107. @property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
  108. @property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;
  109. /** A flag tracking whether the stream received a message from the backend. */
  110. @property(nonatomic, assign) BOOL messageReceived;
  111. /**
  112. * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
  113. * state of the stream.
  114. */
  115. @property(nonatomic, assign) FSTStreamState state;
  116. /** The RPC handle. Used for cancellation. */
  117. @property(nonatomic, strong, nullable) GRPCCall *rpc;
  118. /**
  119. * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
  120. * started.
  121. */
  122. @property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;
  123. @end
  124. #pragma mark - FSTCallbackFilter
  125. /**
  126. * Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main
  127. * FSTStream to allow the stream to be stopped externally (either by the user or via idle timer)
  128. * and be able to completely prevent any subsequent events from gRPC from calling back into the
  129. * FSTSTream.
  130. */
  131. @interface FSTCallbackFilter : NSObject <GRXWriteable>
  132. - (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
  133. - (instancetype)init NS_UNAVAILABLE;
  134. @property(atomic, readwrite) BOOL callbacksEnabled;
  135. @property(nonatomic, strong, readonly) FSTStream *stream;
  136. @end
  137. @implementation FSTCallbackFilter
  138. - (instancetype)initWithStream:(FSTStream *)stream {
  139. if (self = [super init]) {
  140. _callbacksEnabled = YES;
  141. _stream = stream;
  142. }
  143. return self;
  144. }
  145. - (void)suppressCallbacks {
  146. _callbacksEnabled = NO;
  147. }
  148. - (void)writeValue:(id)value {
  149. if (_callbacksEnabled) {
  150. [self.stream writeValue:value];
  151. }
  152. }
  153. - (void)writesFinishedWithError:(NSError *)errorOrNil {
  154. if (_callbacksEnabled) {
  155. [self.stream writesFinishedWithError:errorOrNil];
  156. }
  157. }
  158. @end
  159. #pragma mark - FSTStream
  160. @interface FSTStream ()
  161. @property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter;
  162. @end
  163. @implementation FSTStream
  164. /** The time a stream stays open after it is marked idle. */
  165. static const NSTimeInterval kIdleTimeout = 60.0;
  166. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  167. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  168. connectionTimerID:(FSTTimerID)connectionTimerID
  169. idleTimerID:(FSTTimerID)idleTimerID
  170. credentials:(CredentialsProvider *)credentials
  171. responseMessageClass:(Class)responseMessageClass {
  172. if (self = [super init]) {
  173. _databaseInfo = database;
  174. _workerDispatchQueue = workerDispatchQueue;
  175. _idleTimerID = idleTimerID;
  176. _credentials = credentials;
  177. _responseMessageClass = responseMessageClass;
  178. _backoff = [[FSTExponentialBackoff alloc] initWithDispatchQueue:workerDispatchQueue
  179. timerID:connectionTimerID
  180. initialDelay:kBackoffInitialDelay
  181. backoffFactor:kBackoffFactor
  182. maxDelay:kBackoffMaxDelay];
  183. _state = FSTStreamStateInitial;
  184. }
  185. return self;
  186. }
  187. - (BOOL)isStarted {
  188. [self.workerDispatchQueue verifyIsCurrentQueue];
  189. FSTStreamState state = self.state;
  190. return state == FSTStreamStateBackoff || state == FSTStreamStateAuth ||
  191. state == FSTStreamStateOpen;
  192. }
  193. - (BOOL)isOpen {
  194. [self.workerDispatchQueue verifyIsCurrentQueue];
  195. return self.state == FSTStreamStateOpen;
  196. }
  197. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  198. @throw FSTAbstractMethodException(); // NOLINT
  199. }
  200. - (void)startWithDelegate:(id)delegate {
  201. [self.workerDispatchQueue verifyIsCurrentQueue];
  202. if (self.state == FSTStreamStateError) {
  203. [self performBackoffWithDelegate:delegate];
  204. return;
  205. }
  206. FSTLog(@"%@ %p start", NSStringFromClass([self class]), (__bridge void *)self);
  207. FSTAssert(self.state == FSTStreamStateInitial, @"Already started");
  208. self.state = FSTStreamStateAuth;
  209. FSTAssert(_delegate == nil, @"Delegate must be nil");
  210. _delegate = delegate;
  211. _credentials->GetToken(
  212. /*force_refresh=*/false,
  213. [self](Token result, const int64_t error_code, const absl::string_view error_msg) {
  214. NSError *error = util::WrapNSError(error_code, error_msg);
  215. [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
  216. [self resumeStartWithToken:result error:error];
  217. }];
  218. });
  219. }
  220. /** Add an access token to our RPC, after obtaining one from the credentials provider. */
  221. - (void)resumeStartWithToken:(const Token &)token error:(NSError *)error {
  222. [self.workerDispatchQueue verifyIsCurrentQueue];
  223. if (self.state == FSTStreamStateStopped) {
  224. // Streams can be stopped while waiting for authorization.
  225. return;
  226. }
  227. FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
  228. (long)self.state);
  229. // TODO(mikelehen): We should force a refresh if the previous RPC failed due to an expired token,
  230. // but I'm not sure how to detect that right now. http://b/32762461
  231. if (error) {
  232. // RPC has not been started yet, so just invoke higher-level close handler.
  233. [self handleStreamClose:error];
  234. return;
  235. }
  236. self.requestsWriter = [[FSTBufferedWriter alloc] init];
  237. _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
  238. [_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue];
  239. [FSTDatastore
  240. prepareHeadersForRPC:_rpc
  241. databaseID:&self.databaseInfo->database_id()
  242. token:(token.user().is_authenticated() ? token.token() : absl::string_view())];
  243. FSTAssert(_callbackFilter == nil, @"GRX Filter must be nil");
  244. _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self];
  245. [_rpc startWithWriteable:_callbackFilter];
  246. self.state = FSTStreamStateOpen;
  247. [self notifyStreamOpen];
  248. }
  249. /** Backs off after an error. */
  250. - (void)performBackoffWithDelegate:(id)delegate {
  251. FSTLog(@"%@ %p backoff", NSStringFromClass([self class]), (__bridge void *)self);
  252. [self.workerDispatchQueue verifyIsCurrentQueue];
  253. FSTAssert(self.state == FSTStreamStateError, @"Should only perform backoff in an error case");
  254. self.state = FSTStreamStateBackoff;
  255. FSTWeakify(self);
  256. [self.backoff backoffAndRunBlock:^{
  257. FSTStrongify(self);
  258. [self resumeStartFromBackoffWithDelegate:delegate];
  259. }];
  260. }
  261. /** Resumes stream start after backing off. */
  262. - (void)resumeStartFromBackoffWithDelegate:(id)delegate {
  263. if (self.state == FSTStreamStateStopped) {
  264. // Streams can be stopped while waiting for backoff to complete.
  265. return;
  266. }
  267. // In order to have performed a backoff the stream must have been in an error state just prior
  268. // to entering the backoff state. If we weren't stopped we must be in the backoff state.
  269. FSTAssert(self.state == FSTStreamStateBackoff, @"State should still be backoff (was %ld)",
  270. (long)self.state);
  271. // Momentarily set state to FSTStreamStateInitial as `start` expects it.
  272. self.state = FSTStreamStateInitial;
  273. [self startWithDelegate:delegate];
  274. FSTAssert([self isStarted], @"Stream should have started.");
  275. }
  276. /**
  277. * Can be overridden to perform additional cleanup before the stream is closed. Calling
  278. * [super tearDown] is not required.
  279. */
  280. - (void)tearDown {
  281. }
  282. /**
  283. * Closes the stream and cleans up as necessary:
  284. *
  285. * * closes the underlying GRPC stream;
  286. * * calls the onClose handler with the given 'error';
  287. * * sets internal stream state to 'finalState';
  288. * * adjusts the backoff timer based on the error
  289. *
  290. * A new stream can be opened by calling `start` unless `finalState` is set to
  291. * `FSTStreamStateStopped`.
  292. *
  293. * @param finalState the intended state of the stream after closing.
  294. * @param error the NSError the connection was closed with.
  295. */
  296. - (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error {
  297. FSTAssert(finalState == FSTStreamStateError || error == nil,
  298. @"Can't provide an error when not in an error state.");
  299. [self.workerDispatchQueue verifyIsCurrentQueue];
  300. [self cancelIdleCheck];
  301. if (finalState != FSTStreamStateError) {
  302. // If this is an intentional close ensure we don't delay our next connection attempt.
  303. [self.backoff reset];
  304. } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) {
  305. FSTLog(@"%@ %p Using maximum backoff delay to prevent overloading the backend.", [self class],
  306. (__bridge void *)self);
  307. [self.backoff resetToMax];
  308. }
  309. if (finalState != FSTStreamStateError) {
  310. FSTLog(@"%@ %p Performing stream teardown", [self class], (__bridge void *)self);
  311. [self tearDown];
  312. }
  313. if (self.requestsWriter) {
  314. // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
  315. // call half-close to avoid secondary failures.
  316. if (finalState != FSTStreamStateError) {
  317. FSTLog(@"%@ %p Closing stream client-side", [self class], (__bridge void *)self);
  318. @synchronized(self.requestsWriter) {
  319. [self.requestsWriter finishWithError:nil];
  320. }
  321. }
  322. _requestsWriter = nil;
  323. }
  324. // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to
  325. // inhibit backoff or otherwise manipulate the state in its non-started state.
  326. self.state = finalState;
  327. [self.callbackFilter suppressCallbacks];
  328. _callbackFilter = nil;
  329. // Clean up remaining state.
  330. _messageReceived = NO;
  331. _rpc = nil;
  332. // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it
  333. // could trigger undesirable recovery logic, etc.).
  334. if (finalState != FSTStreamStateStopped) {
  335. [self notifyStreamInterruptedWithError:error];
  336. }
  337. // PORTING NOTE: notifyStreamInterruptedWithError may have restarted the stream with a new
  338. // delegate so we do /not/ want to clear the delegate here. And since we've already suppressed
  339. // callbacks via our callbackFilter, there is no worry about bleed through of events from GRPC.
  340. }
  341. - (void)stop {
  342. FSTLog(@"%@ %p stop", NSStringFromClass([self class]), (__bridge void *)self);
  343. if ([self isStarted]) {
  344. [self closeWithFinalState:FSTStreamStateStopped error:nil];
  345. }
  346. }
  347. - (void)inhibitBackoff {
  348. FSTAssert(![self isStarted], @"Can only inhibit backoff after an error (was %ld)",
  349. (long)self.state);
  350. [self.workerDispatchQueue verifyIsCurrentQueue];
  351. // Clear the error condition.
  352. self.state = FSTStreamStateInitial;
  353. [self.backoff reset];
  354. }
  355. /** Called by the idle timer when the stream should close due to inactivity. */
  356. - (void)handleIdleCloseTimer {
  357. [self.workerDispatchQueue verifyIsCurrentQueue];
  358. if ([self isOpen]) {
  359. // When timing out an idle stream there's no reason to force the stream into backoff when
  360. // it restarts so set the stream state to Initial instead of Error.
  361. [self closeWithFinalState:FSTStreamStateInitial error:nil];
  362. }
  363. }
  364. - (void)markIdle {
  365. [self.workerDispatchQueue verifyIsCurrentQueue];
  366. // Starts the idle timer if we are in state 'Open' and are not yet already running a timer (in
  367. // which case the previous idle timeout still applies).
  368. if ([self isOpen] && !self.idleTimerCallback) {
  369. self.idleTimerCallback = [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout
  370. timerID:self.idleTimerID
  371. block:^() {
  372. [self handleIdleCloseTimer];
  373. }];
  374. }
  375. }
  376. - (void)cancelIdleCheck {
  377. [self.workerDispatchQueue verifyIsCurrentQueue];
  378. if (self.idleTimerCallback) {
  379. [self.idleTimerCallback cancel];
  380. self.idleTimerCallback = nil;
  381. }
  382. }
  383. /**
  384. * Parses a protocol buffer response from the server. If the message fails to parse, generates
  385. * an error and closes the stream.
  386. *
  387. * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
  388. * @param data The bytes in the response as returned from GRPC.
  389. * @return An instance of the protocol buffer message, parsed from the data if parsing was
  390. * successful, or nil otherwise.
  391. */
  392. - (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
  393. NSError *parseError;
  394. id parsed = [protoClass parseFromData:data error:&parseError];
  395. if (parsed) {
  396. *error = nil;
  397. return parsed;
  398. } else {
  399. NSDictionary *info = @{
  400. NSLocalizedDescriptionKey : @"Unable to parse response from the server",
  401. NSUnderlyingErrorKey : parseError,
  402. @"Expected class" : protoClass,
  403. @"Received value" : data,
  404. };
  405. *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
  406. code:FIRFirestoreErrorCodeInternal
  407. userInfo:info];
  408. return nil;
  409. }
  410. }
  411. /**
  412. * Writes a request proto into the stream.
  413. */
  414. - (void)writeRequest:(GPBMessage *)request {
  415. NSData *data = [request data];
  416. [self cancelIdleCheck];
  417. FSTBufferedWriter *requestsWriter = self.requestsWriter;
  418. @synchronized(requestsWriter) {
  419. [requestsWriter writeValue:data];
  420. }
  421. }
  422. #pragma mark Template methods for subclasses
  423. /**
  424. * Called by the stream after the stream has opened.
  425. *
  426. * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is
  427. * not required.
  428. */
  429. - (void)notifyStreamOpen {
  430. }
  431. /**
  432. * Called by the stream after the stream has been unexpectedly interrupted, either due to an error
  433. * or due to idleness.
  434. *
  435. * Subclasses should relay to their stream-specific delegate. Calling [super
  436. * notifyStreamInterrupted] is not required.
  437. */
  438. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  439. }
  440. /**
  441. * Called by the stream for each incoming protocol message coming from the server.
  442. *
  443. * Subclasses should implement this to deserialize the value and relay to their stream-specific
  444. * delegate, if appropriate. Calling [super handleStreamMessage] is not required.
  445. */
  446. - (void)handleStreamMessage:(id)value {
  447. }
  448. /**
  449. * Called by the stream when the underlying RPC has been closed for whatever reason.
  450. */
  451. - (void)handleStreamClose:(nullable NSError *)error {
  452. FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);
  453. FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");
  454. // In theory the stream could close cleanly, however, in our current model we never expect this
  455. // to happen because if we stop a stream ourselves, this callback will never be called. To
  456. // prevent cases where we retry without a backoff accidentally, we set the stream to error
  457. // in all cases.
  458. [self closeWithFinalState:FSTStreamStateError error:error];
  459. }
  460. #pragma mark GRXWriteable implementation
  461. // The GRXWriteable implementation defines the receive side of the RPC stream.
  462. /**
  463. * Called by GRPC when it publishes a value.
  464. *
  465. * GRPC must be configured to use our worker queue by calling
  466. * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
  467. * the RPC.
  468. */
  469. - (void)writeValue:(id)value {
  470. [self.workerDispatchQueue enterCheckedOperation:^{
  471. FSTAssert([self isStarted], @"writeValue: called for stopped stream.");
  472. if (!self.messageReceived) {
  473. self.messageReceived = YES;
  474. if ([FIRFirestore isLoggingEnabled]) {
  475. FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
  476. (__bridge void *)self,
  477. [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
  478. }
  479. }
  480. NSError *error;
  481. id proto = [self parseProto:self.responseMessageClass data:value error:&error];
  482. if (proto) {
  483. [self handleStreamMessage:proto];
  484. } else {
  485. [self.rpc finishWithError:error];
  486. }
  487. }];
  488. }
  489. /**
  490. * Called by GRPC when it closed the stream with an error representing the final state of the
  491. * stream.
  492. *
  493. * GRPC must be configured to use our worker queue by calling
  494. * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
  495. * the RPC.
  496. *
  497. * Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call
  498. * stop to tear down the stream.
  499. */
  500. - (void)writesFinishedWithError:(nullable NSError *)error __used {
  501. error = [FSTDatastore firestoreErrorForError:error];
  502. [self.workerDispatchQueue enterCheckedOperation:^{
  503. FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");
  504. [self handleStreamClose:error];
  505. }];
  506. }
  507. @end
  508. #pragma mark - FSTWatchStream
  509. @interface FSTWatchStream ()
  510. @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  511. @end
  512. @implementation FSTWatchStream
  513. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  514. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  515. credentials:(CredentialsProvider *)credentials
  516. serializer:(FSTSerializerBeta *)serializer {
  517. self = [super initWithDatabase:database
  518. workerDispatchQueue:workerDispatchQueue
  519. connectionTimerID:FSTTimerIDListenStreamConnectionBackoff
  520. idleTimerID:FSTTimerIDListenStreamIdle
  521. credentials:credentials
  522. responseMessageClass:[GCFSListenResponse class]];
  523. if (self) {
  524. _serializer = serializer;
  525. }
  526. return self;
  527. }
  528. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  529. return [[GRPCCall alloc] initWithHost:util::WrapNSString(self.databaseInfo->host())
  530. path:@"/google.firestore.v1beta1.Firestore/Listen"
  531. requestsWriter:requestsWriter];
  532. }
  533. - (void)notifyStreamOpen {
  534. [self.delegate watchStreamDidOpen];
  535. }
  536. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  537. id<FSTWatchStreamDelegate> delegate = self.delegate;
  538. self.delegate = nil;
  539. [delegate watchStreamWasInterruptedWithError:error];
  540. }
  541. - (void)watchQuery:(FSTQueryData *)query {
  542. FSTAssert([self isOpen], @"Not yet open");
  543. [self.workerDispatchQueue verifyIsCurrentQueue];
  544. GCFSListenRequest *request = [GCFSListenRequest message];
  545. request.database = [_serializer encodedDatabaseID];
  546. request.addTarget = [_serializer encodedTarget:query];
  547. request.labels = [_serializer encodedListenRequestLabelsForQueryData:query];
  548. FSTLog(@"FSTWatchStream %p watch: %@", (__bridge void *)self, request);
  549. [self writeRequest:request];
  550. }
  551. - (void)unwatchTargetID:(FSTTargetID)targetID {
  552. FSTAssert([self isOpen], @"Not yet open");
  553. [self.workerDispatchQueue verifyIsCurrentQueue];
  554. GCFSListenRequest *request = [GCFSListenRequest message];
  555. request.database = [_serializer encodedDatabaseID];
  556. request.removeTarget = targetID;
  557. FSTLog(@"FSTWatchStream %p unwatch: %@", (__bridge void *)self, request);
  558. [self writeRequest:request];
  559. }
  560. /**
  561. * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's
  562. * watchStreamDidChange:snapshotVersion: callback.
  563. */
  564. - (void)handleStreamMessage:(GCFSListenResponse *)proto {
  565. FSTLog(@"FSTWatchStream %p response: %@", (__bridge void *)self, proto);
  566. [self.workerDispatchQueue verifyIsCurrentQueue];
  567. // A successful response means the stream is healthy.
  568. [self.backoff reset];
  569. FSTWatchChange *change = [_serializer decodedWatchChange:proto];
  570. FSTSnapshotVersion *snap = [_serializer versionFromListenResponse:proto];
  571. [self.delegate watchStreamDidChange:change snapshotVersion:snap];
  572. }
  573. @end
  574. #pragma mark - FSTWriteStream
  575. @interface FSTWriteStream ()
  576. @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  577. @end
  578. @implementation FSTWriteStream
  579. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  580. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  581. credentials:(CredentialsProvider *)credentials
  582. serializer:(FSTSerializerBeta *)serializer {
  583. self = [super initWithDatabase:database
  584. workerDispatchQueue:workerDispatchQueue
  585. connectionTimerID:FSTTimerIDWriteStreamConnectionBackoff
  586. idleTimerID:FSTTimerIDWriteStreamIdle
  587. credentials:credentials
  588. responseMessageClass:[GCFSWriteResponse class]];
  589. if (self) {
  590. _serializer = serializer;
  591. }
  592. return self;
  593. }
  594. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  595. return [[GRPCCall alloc] initWithHost:util::WrapNSString(self.databaseInfo->host())
  596. path:@"/google.firestore.v1beta1.Firestore/Write"
  597. requestsWriter:requestsWriter];
  598. }
  599. - (void)startWithDelegate:(id)delegate {
  600. self.handshakeComplete = NO;
  601. [super startWithDelegate:delegate];
  602. }
  603. - (void)notifyStreamOpen {
  604. [self.delegate writeStreamDidOpen];
  605. }
  606. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  607. id<FSTWriteStreamDelegate> delegate = self.delegate;
  608. self.delegate = nil;
  609. [delegate writeStreamWasInterruptedWithError:error];
  610. }
  611. - (void)tearDown {
  612. if ([self isHandshakeComplete]) {
  613. // Send an empty write request to the backend to indicate imminent stream closure. This allows
  614. // the backend to clean up resources.
  615. [self writeMutations:@[]];
  616. }
  617. }
  618. - (void)writeHandshake {
  619. // The initial request cannot contain mutations, but must contain a projectID.
  620. FSTAssert([self isOpen], @"Not yet open");
  621. FSTAssert(!self.handshakeComplete, @"Handshake sent out of turn");
  622. [self.workerDispatchQueue verifyIsCurrentQueue];
  623. GCFSWriteRequest *request = [GCFSWriteRequest message];
  624. request.database = [_serializer encodedDatabaseID];
  625. // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
  626. // handshake, ignoring any stream token we might have.
  627. FSTLog(@"FSTWriteStream %p initial request: %@", (__bridge void *)self, request);
  628. [self writeRequest:request];
  629. }
  630. - (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
  631. FSTAssert([self isOpen], @"Not yet open");
  632. FSTAssert(self.handshakeComplete, @"Mutations sent out of turn");
  633. [self.workerDispatchQueue verifyIsCurrentQueue];
  634. NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count];
  635. for (FSTMutation *mutation in mutations) {
  636. [protos addObject:[_serializer encodedMutation:mutation]];
  637. };
  638. GCFSWriteRequest *request = [GCFSWriteRequest message];
  639. request.writesArray = protos;
  640. request.streamToken = self.lastStreamToken;
  641. FSTLog(@"FSTWriteStream %p mutation request: %@", (__bridge void *)self, request);
  642. [self writeRequest:request];
  643. }
  644. /**
  645. * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass
  646. * that on to the mutationResultsHandler.
  647. */
  648. - (void)handleStreamMessage:(GCFSWriteResponse *)response {
  649. FSTLog(@"FSTWriteStream %p response: %@", (__bridge void *)self, response);
  650. [self.workerDispatchQueue verifyIsCurrentQueue];
  651. // Always capture the last stream token.
  652. self.lastStreamToken = response.streamToken;
  653. if (!self.isHandshakeComplete) {
  654. // The first response is the handshake response
  655. self.handshakeComplete = YES;
  656. [self.delegate writeStreamDidCompleteHandshake];
  657. } else {
  658. // A successful first write response means the stream is healthy.
  659. // Note that we could consider a successful handshake healthy, however, the write itself
  660. // might be causing an error we want to back off from.
  661. [self.backoff reset];
  662. FSTSnapshotVersion *commitVersion = [_serializer decodedVersion:response.commitTime];
  663. NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray;
  664. NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count];
  665. for (GCFSWriteResult *proto in protos) {
  666. [results addObject:[_serializer decodedMutationResult:proto]];
  667. };
  668. [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
  669. }
  670. }
  671. @end