FSTStream.mm 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <GRPCClient/GRPCCall+OAuth2.h>
  17. #import <GRPCClient/GRPCCall.h>
  18. #import "FIRFirestoreErrors.h"
  19. #import "Firestore/Source/API/FIRFirestore+Internal.h"
  20. #import "Firestore/Source/Local/FSTQueryData.h"
  21. #import "Firestore/Source/Model/FSTMutation.h"
  22. #import "Firestore/Source/Remote/FSTBufferedWriter.h"
  23. #import "Firestore/Source/Remote/FSTDatastore.h"
  24. #import "Firestore/Source/Remote/FSTExponentialBackoff.h"
  25. #import "Firestore/Source/Remote/FSTSerializerBeta.h"
  26. #import "Firestore/Source/Remote/FSTStream.h"
  27. #import "Firestore/Source/Util/FSTClasses.h"
  28. #import "Firestore/Source/Util/FSTDispatchQueue.h"
  29. #import "Firestore/Protos/objc/google/firestore/v1beta1/Firestore.pbrpc.h"
  30. #include "Firestore/core/src/firebase/firestore/auth/token.h"
  31. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  32. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  33. #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
  34. #include "Firestore/core/src/firebase/firestore/util/error_apple.h"
  35. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  36. #include "Firestore/core/src/firebase/firestore/util/log.h"
  37. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  38. namespace util = firebase::firestore::util;
  39. using firebase::firestore::auth::CredentialsProvider;
  40. using firebase::firestore::auth::Token;
  41. using firebase::firestore::core::DatabaseInfo;
  42. using firebase::firestore::model::DatabaseId;
  43. using firebase::firestore::model::SnapshotVersion;
  44. using firebase::firestore::model::TargetId;
  45. /**
  46. * Initial backoff time in seconds after an error.
  47. * Set to 1s according to https://cloud.google.com/apis/design/errors.
  48. */
  49. static const NSTimeInterval kBackoffInitialDelay = 1;
  50. static const NSTimeInterval kBackoffMaxDelay = 60.0;
  51. static const double kBackoffFactor = 1.5;
  52. #pragma mark - FSTStream
  53. /** The state of a stream. */
  54. typedef NS_ENUM(NSInteger, FSTStreamState) {
  55. /**
  56. * The streaming RPC is not running and there's no error condition. Calling `start` will
  57. * start the stream immediately without backoff. While in this state -isStarted will return NO.
  58. */
  59. FSTStreamStateInitial = 0,
  60. /**
  61. * The stream is starting, and is waiting for an auth token to attach to the initial request.
  62. * While in this state, isStarted will return YES but isOpen will return NO.
  63. */
  64. FSTStreamStateAuth,
  65. /**
  66. * The streaming RPC is up and running. Requests and responses can flow freely. Both
  67. * isStarted and isOpen will return YES.
  68. */
  69. FSTStreamStateOpen,
  70. /**
  71. * The stream encountered an error. The next start attempt will back off. While in this state
  72. * -isStarted will return NO.
  73. */
  74. FSTStreamStateError,
  75. /**
  76. * An in-between state after an error where the stream is waiting before re-starting. After
  77. * waiting is complete, the stream will try to open. While in this state -isStarted will
  78. * return YES but isOpen will return NO.
  79. */
  80. FSTStreamStateBackoff,
  81. /**
  82. * The stream has been explicitly stopped; no further events will be emitted.
  83. */
  84. FSTStreamStateStopped,
  85. };
  86. // We need to declare these classes first so that Datastore can alloc them.
  87. @interface FSTWatchStream ()
  88. /**
  89. * Initializes the watch stream with its dependencies.
  90. */
  91. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  92. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  93. credentials:(CredentialsProvider *)credentials
  94. serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;
  95. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  96. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  97. credentials:(CredentialsProvider *)credentials
  98. responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;
  99. @end
  100. @interface FSTStream ()
  101. @property(nonatomic, assign, readonly) FSTTimerID idleTimerID;
  102. @property(nonatomic, strong, nullable) FSTDelayedCallback *idleTimerCallback;
  103. @property(nonatomic, weak, readwrite, nullable) id delegate;
  104. @end
  105. @interface FSTStream () <GRXWriteable>
  106. // Does not own this DatabaseInfo.
  107. @property(nonatomic, assign, readonly) const DatabaseInfo *databaseInfo;
  108. @property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
  109. @property(nonatomic, assign, readonly) CredentialsProvider *credentials;
  110. @property(nonatomic, unsafe_unretained, readonly) Class responseMessageClass;
  111. @property(nonatomic, strong, readonly) FSTExponentialBackoff *backoff;
  112. /** A flag tracking whether the stream received a message from the backend. */
  113. @property(nonatomic, assign) BOOL messageReceived;
  114. /**
  115. * Stream state as exposed to consumers of FSTStream. This differs from GRXWriter's notion of the
  116. * state of the stream.
  117. */
  118. @property(nonatomic, assign) FSTStreamState state;
  119. /** The RPC handle. Used for cancellation. */
  120. @property(nonatomic, strong, nullable) GRPCCall *rpc;
  121. /**
  122. * The send-side of the RPC stream in which to submit requests, but only once the underlying RPC has
  123. * started.
  124. */
  125. @property(nonatomic, strong, nullable) FSTBufferedWriter *requestsWriter;
  126. @end
  127. #pragma mark - FSTCallbackFilter
  128. /**
  129. * Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main
  130. * FSTStream to allow the stream to be stopped externally (either by the user or via idle timer)
  131. * and be able to completely prevent any subsequent events from gRPC from calling back into the
  132. * FSTSTream.
  133. */
  134. @interface FSTCallbackFilter : NSObject <GRXWriteable>
  135. - (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
  136. - (instancetype)init NS_UNAVAILABLE;
  137. @property(atomic, readwrite) BOOL callbacksEnabled;
  138. @property(nonatomic, strong, readonly) FSTStream *stream;
  139. @end
  140. @implementation FSTCallbackFilter
  141. - (instancetype)initWithStream:(FSTStream *)stream {
  142. if (self = [super init]) {
  143. _callbacksEnabled = YES;
  144. _stream = stream;
  145. }
  146. return self;
  147. }
  148. - (void)suppressCallbacks {
  149. _callbacksEnabled = NO;
  150. }
  151. - (void)writeValue:(id)value {
  152. if (_callbacksEnabled) {
  153. [self.stream writeValue:value];
  154. }
  155. }
  156. - (void)writesFinishedWithError:(NSError *)errorOrNil {
  157. if (_callbacksEnabled) {
  158. [self.stream writesFinishedWithError:errorOrNil];
  159. }
  160. }
  161. @end
  162. #pragma mark - FSTStream
  163. @interface FSTStream ()
  164. @property(nonatomic, strong, readwrite) FSTCallbackFilter *callbackFilter;
  165. @end
  166. @implementation FSTStream
  167. /** The time a stream stays open after it is marked idle. */
  168. static const NSTimeInterval kIdleTimeout = 60.0;
  169. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  170. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  171. connectionTimerID:(FSTTimerID)connectionTimerID
  172. idleTimerID:(FSTTimerID)idleTimerID
  173. credentials:(CredentialsProvider *)credentials
  174. responseMessageClass:(Class)responseMessageClass {
  175. if (self = [super init]) {
  176. _databaseInfo = database;
  177. _workerDispatchQueue = workerDispatchQueue;
  178. _idleTimerID = idleTimerID;
  179. _credentials = credentials;
  180. _responseMessageClass = responseMessageClass;
  181. _backoff = [[FSTExponentialBackoff alloc] initWithDispatchQueue:workerDispatchQueue
  182. timerID:connectionTimerID
  183. initialDelay:kBackoffInitialDelay
  184. backoffFactor:kBackoffFactor
  185. maxDelay:kBackoffMaxDelay];
  186. _state = FSTStreamStateInitial;
  187. }
  188. return self;
  189. }
  190. - (BOOL)isStarted {
  191. [self.workerDispatchQueue verifyIsCurrentQueue];
  192. FSTStreamState state = self.state;
  193. return state == FSTStreamStateBackoff || state == FSTStreamStateAuth ||
  194. state == FSTStreamStateOpen;
  195. }
  196. - (BOOL)isOpen {
  197. [self.workerDispatchQueue verifyIsCurrentQueue];
  198. return self.state == FSTStreamStateOpen;
  199. }
  200. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  201. @throw FSTAbstractMethodException(); // NOLINT
  202. }
  203. - (void)startWithDelegate:(id)delegate {
  204. [self.workerDispatchQueue verifyIsCurrentQueue];
  205. if (self.state == FSTStreamStateError) {
  206. [self performBackoffWithDelegate:delegate];
  207. return;
  208. }
  209. LOG_DEBUG("%s %s start", NSStringFromClass([self class]), (__bridge void *)self);
  210. HARD_ASSERT(self.state == FSTStreamStateInitial, "Already started");
  211. self.state = FSTStreamStateAuth;
  212. HARD_ASSERT(_delegate == nil, "Delegate must be nil");
  213. _delegate = delegate;
  214. _credentials->GetToken([self](util::StatusOr<Token> result) {
  215. [self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^{
  216. [self resumeStartWithToken:result];
  217. }];
  218. });
  219. }
  220. /** Add an access token to our RPC, after obtaining one from the credentials provider. */
  221. - (void)resumeStartWithToken:(const util::StatusOr<Token> &)result {
  222. [self.workerDispatchQueue verifyIsCurrentQueue];
  223. if (self.state == FSTStreamStateStopped) {
  224. // Streams can be stopped while waiting for authorization.
  225. return;
  226. }
  227. HARD_ASSERT(self.state == FSTStreamStateAuth, "State should still be auth (was %s)", self.state);
  228. if (!result.ok()) {
  229. // RPC has not been started yet, so just invoke higher-level close handler.
  230. [self handleStreamClose:util::MakeNSError(result.status())];
  231. return;
  232. }
  233. self.requestsWriter = [[FSTBufferedWriter alloc] init];
  234. _rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
  235. [_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue];
  236. const Token &token = result.ValueOrDie();
  237. [FSTDatastore
  238. prepareHeadersForRPC:_rpc
  239. databaseID:&self.databaseInfo->database_id()
  240. token:(token.user().is_authenticated() ? token.token() : absl::string_view())];
  241. HARD_ASSERT(_callbackFilter == nil, "GRX Filter must be nil");
  242. _callbackFilter = [[FSTCallbackFilter alloc] initWithStream:self];
  243. [_rpc startWithWriteable:_callbackFilter];
  244. self.state = FSTStreamStateOpen;
  245. [self notifyStreamOpen];
  246. }
  247. /** Backs off after an error. */
  248. - (void)performBackoffWithDelegate:(id)delegate {
  249. LOG_DEBUG("%s %s backoff", NSStringFromClass([self class]), (__bridge void *)self);
  250. [self.workerDispatchQueue verifyIsCurrentQueue];
  251. HARD_ASSERT(self.state == FSTStreamStateError, "Should only perform backoff in an error case");
  252. self.state = FSTStreamStateBackoff;
  253. FSTWeakify(self);
  254. [self.backoff backoffAndRunBlock:^{
  255. FSTStrongify(self);
  256. [self resumeStartFromBackoffWithDelegate:delegate];
  257. }];
  258. }
  259. /** Resumes stream start after backing off. */
  260. - (void)resumeStartFromBackoffWithDelegate:(id)delegate {
  261. if (self.state == FSTStreamStateStopped) {
  262. // We should have canceled the backoff timer when the stream was closed, but just in case we
  263. // make this a no-op.
  264. return;
  265. }
  266. // In order to have performed a backoff the stream must have been in an error state just prior
  267. // to entering the backoff state. If we weren't stopped we must be in the backoff state.
  268. HARD_ASSERT(self.state == FSTStreamStateBackoff, "State should still be backoff (was %s)",
  269. self.state);
  270. // Momentarily set state to FSTStreamStateInitial as `start` expects it.
  271. self.state = FSTStreamStateInitial;
  272. [self startWithDelegate:delegate];
  273. HARD_ASSERT([self isStarted], "Stream should have started.");
  274. }
  275. /**
  276. * Can be overridden to perform additional cleanup before the stream is closed. Calling
  277. * [super tearDown] is not required.
  278. */
  279. - (void)tearDown {
  280. }
  281. /**
  282. * Closes the stream and cleans up as necessary:
  283. *
  284. * * closes the underlying GRPC stream;
  285. * * calls the onClose handler with the given 'error';
  286. * * sets internal stream state to 'finalState';
  287. * * adjusts the backoff timer based on the error
  288. *
  289. * A new stream can be opened by calling `start` unless `finalState` is set to
  290. * `FSTStreamStateStopped`.
  291. *
  292. * @param finalState the intended state of the stream after closing.
  293. * @param error the NSError the connection was closed with.
  294. */
  295. - (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error {
  296. HARD_ASSERT(finalState == FSTStreamStateError || error == nil,
  297. "Can't provide an error when not in an error state.");
  298. [self.workerDispatchQueue verifyIsCurrentQueue];
  299. // The stream will be closed so we don't need our idle close timer anymore.
  300. [self cancelIdleCheck];
  301. // Ensure we don't leave a pending backoff operation queued (in case close()
  302. // was called while we were waiting to reconnect).
  303. [self.backoff cancel];
  304. if (finalState != FSTStreamStateError) {
  305. // If this is an intentional close ensure we don't delay our next connection attempt.
  306. [self.backoff reset];
  307. } else if (error != nil && error.code == FIRFirestoreErrorCodeResourceExhausted) {
  308. LOG_DEBUG("%s %s Using maximum backoff delay to prevent overloading the backend.", [self class],
  309. (__bridge void *)self);
  310. [self.backoff resetToMax];
  311. } else if (error != nil && error.code == FIRFirestoreErrorCodeUnauthenticated) {
  312. // "unauthenticated" error means the token was rejected. Try force refreshing it in case it just
  313. // expired.
  314. _credentials->InvalidateToken();
  315. }
  316. if (finalState != FSTStreamStateError) {
  317. LOG_DEBUG("%s %s Performing stream teardown", [self class], (__bridge void *)self);
  318. [self tearDown];
  319. }
  320. if (self.requestsWriter) {
  321. // Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
  322. // call half-close to avoid secondary failures.
  323. if (finalState != FSTStreamStateError) {
  324. LOG_DEBUG("%s %s Closing stream client-side", [self class], (__bridge void *)self);
  325. @synchronized(self.requestsWriter) {
  326. [self.requestsWriter finishWithError:nil];
  327. }
  328. }
  329. _requestsWriter = nil;
  330. }
  331. // This state must be assigned before calling `notifyStreamInterrupted` to allow the callback to
  332. // inhibit backoff or otherwise manipulate the state in its non-started state.
  333. self.state = finalState;
  334. [self.callbackFilter suppressCallbacks];
  335. _callbackFilter = nil;
  336. // Clean up remaining state.
  337. _messageReceived = NO;
  338. _rpc = nil;
  339. // If the caller explicitly requested a stream stop, don't notify them of a closing stream (it
  340. // could trigger undesirable recovery logic, etc.).
  341. if (finalState != FSTStreamStateStopped) {
  342. [self notifyStreamInterruptedWithError:error];
  343. }
  344. // PORTING NOTE: notifyStreamInterruptedWithError may have restarted the stream with a new
  345. // delegate so we do /not/ want to clear the delegate here. And since we've already suppressed
  346. // callbacks via our callbackFilter, there is no worry about bleed through of events from GRPC.
  347. }
  348. - (void)stop {
  349. LOG_DEBUG("%s %s stop", NSStringFromClass([self class]), (__bridge void *)self);
  350. if ([self isStarted]) {
  351. [self closeWithFinalState:FSTStreamStateStopped error:nil];
  352. }
  353. }
  354. - (void)inhibitBackoff {
  355. HARD_ASSERT(![self isStarted], "Can only inhibit backoff after an error (was %s)", self.state);
  356. [self.workerDispatchQueue verifyIsCurrentQueue];
  357. // Clear the error condition.
  358. self.state = FSTStreamStateInitial;
  359. [self.backoff reset];
  360. }
  361. /** Called by the idle timer when the stream should close due to inactivity. */
  362. - (void)handleIdleCloseTimer {
  363. [self.workerDispatchQueue verifyIsCurrentQueue];
  364. if ([self isOpen]) {
  365. // When timing out an idle stream there's no reason to force the stream into backoff when
  366. // it restarts so set the stream state to Initial instead of Error.
  367. [self closeWithFinalState:FSTStreamStateInitial error:nil];
  368. }
  369. }
  370. - (void)markIdle {
  371. [self.workerDispatchQueue verifyIsCurrentQueue];
  372. // Starts the idle timer if we are in state 'Open' and are not yet already running a timer (in
  373. // which case the previous idle timeout still applies).
  374. if ([self isOpen] && !self.idleTimerCallback) {
  375. self.idleTimerCallback = [self.workerDispatchQueue dispatchAfterDelay:kIdleTimeout
  376. timerID:self.idleTimerID
  377. block:^() {
  378. [self handleIdleCloseTimer];
  379. }];
  380. }
  381. }
  382. - (void)cancelIdleCheck {
  383. [self.workerDispatchQueue verifyIsCurrentQueue];
  384. if (self.idleTimerCallback) {
  385. [self.idleTimerCallback cancel];
  386. self.idleTimerCallback = nil;
  387. }
  388. }
  389. /**
  390. * Parses a protocol buffer response from the server. If the message fails to parse, generates
  391. * an error and closes the stream.
  392. *
  393. * @param protoClass A protocol buffer message class object, that responds to parseFromData:error:.
  394. * @param data The bytes in the response as returned from GRPC.
  395. * @return An instance of the protocol buffer message, parsed from the data if parsing was
  396. * successful, or nil otherwise.
  397. */
  398. - (nullable id)parseProto:(Class)protoClass data:(NSData *)data error:(NSError **)error {
  399. NSError *parseError;
  400. id parsed = [protoClass parseFromData:data error:&parseError];
  401. if (parsed) {
  402. *error = nil;
  403. return parsed;
  404. } else {
  405. NSDictionary *info = @{
  406. NSLocalizedDescriptionKey : @"Unable to parse response from the server",
  407. NSUnderlyingErrorKey : parseError,
  408. @"Expected class" : protoClass,
  409. @"Received value" : data,
  410. };
  411. *error = [NSError errorWithDomain:FIRFirestoreErrorDomain
  412. code:FIRFirestoreErrorCodeInternal
  413. userInfo:info];
  414. return nil;
  415. }
  416. }
  417. /**
  418. * Writes a request proto into the stream.
  419. */
  420. - (void)writeRequest:(GPBMessage *)request {
  421. NSData *data = [request data];
  422. [self cancelIdleCheck];
  423. FSTBufferedWriter *requestsWriter = self.requestsWriter;
  424. @synchronized(requestsWriter) {
  425. [requestsWriter writeValue:data];
  426. }
  427. }
  428. #pragma mark Template methods for subclasses
  429. /**
  430. * Called by the stream after the stream has opened.
  431. *
  432. * Subclasses should relay to their stream-specific delegate. Calling [super notifyStreamOpen] is
  433. * not required.
  434. */
  435. - (void)notifyStreamOpen {
  436. }
  437. /**
  438. * Called by the stream after the stream has been unexpectedly interrupted, either due to an error
  439. * or due to idleness.
  440. *
  441. * Subclasses should relay to their stream-specific delegate. Calling [super
  442. * notifyStreamInterrupted] is not required.
  443. */
  444. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  445. }
  446. /**
  447. * Called by the stream for each incoming protocol message coming from the server.
  448. *
  449. * Subclasses should implement this to deserialize the value and relay to their stream-specific
  450. * delegate, if appropriate. Calling [super handleStreamMessage] is not required.
  451. */
  452. - (void)handleStreamMessage:(id)value {
  453. }
  454. /**
  455. * Called by the stream when the underlying RPC has been closed for whatever reason.
  456. */
  457. - (void)handleStreamClose:(nullable NSError *)error {
  458. LOG_DEBUG("%s %s close: %s", NSStringFromClass([self class]), (__bridge void *)self, error);
  459. HARD_ASSERT([self isStarted], "handleStreamClose: called for non-started stream.");
  460. // In theory the stream could close cleanly, however, in our current model we never expect this
  461. // to happen because if we stop a stream ourselves, this callback will never be called. To
  462. // prevent cases where we retry without a backoff accidentally, we set the stream to error
  463. // in all cases.
  464. [self closeWithFinalState:FSTStreamStateError error:error];
  465. }
  466. #pragma mark GRXWriteable implementation
  467. // The GRXWriteable implementation defines the receive side of the RPC stream.
  468. /**
  469. * Called by GRPC when it publishes a value.
  470. *
  471. * GRPC must be configured to use our worker queue by calling
  472. * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
  473. * the RPC.
  474. */
  475. - (void)writeValue:(id)value {
  476. [self.workerDispatchQueue enterCheckedOperation:^{
  477. HARD_ASSERT([self isStarted], "writeValue: called for stopped stream.");
  478. if (!self.messageReceived) {
  479. self.messageReceived = YES;
  480. if ([FIRFirestore isLoggingEnabled]) {
  481. LOG_DEBUG("%s %s headers (whitelisted): %s", NSStringFromClass([self class]),
  482. (__bridge void *)self,
  483. [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
  484. }
  485. }
  486. NSError *error;
  487. id proto = [self parseProto:self.responseMessageClass data:value error:&error];
  488. if (proto) {
  489. [self handleStreamMessage:proto];
  490. } else {
  491. [self.rpc finishWithError:error];
  492. }
  493. }];
  494. }
  495. /**
  496. * Called by GRPC when it closed the stream with an error representing the final state of the
  497. * stream.
  498. *
  499. * GRPC must be configured to use our worker queue by calling
  500. * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
  501. * the RPC.
  502. *
  503. * Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call
  504. * stop to tear down the stream.
  505. */
  506. - (void)writesFinishedWithError:(nullable NSError *)error __used {
  507. error = [FSTDatastore firestoreErrorForError:error];
  508. [self.workerDispatchQueue enterCheckedOperation:^{
  509. HARD_ASSERT([self isStarted], "writesFinishedWithError: called for stopped stream.");
  510. [self handleStreamClose:error];
  511. }];
  512. }
  513. @end
  514. #pragma mark - FSTWatchStream
  515. @interface FSTWatchStream ()
  516. @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  517. @end
  518. @implementation FSTWatchStream
  519. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  520. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  521. credentials:(CredentialsProvider *)credentials
  522. serializer:(FSTSerializerBeta *)serializer {
  523. self = [super initWithDatabase:database
  524. workerDispatchQueue:workerDispatchQueue
  525. connectionTimerID:FSTTimerIDListenStreamConnectionBackoff
  526. idleTimerID:FSTTimerIDListenStreamIdle
  527. credentials:credentials
  528. responseMessageClass:[GCFSListenResponse class]];
  529. if (self) {
  530. _serializer = serializer;
  531. }
  532. return self;
  533. }
  534. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  535. return [[GRPCCall alloc] initWithHost:util::WrapNSString(self.databaseInfo->host())
  536. path:@"/google.firestore.v1beta1.Firestore/Listen"
  537. requestsWriter:requestsWriter];
  538. }
  539. - (void)notifyStreamOpen {
  540. [self.delegate watchStreamDidOpen];
  541. }
  542. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  543. id<FSTWatchStreamDelegate> delegate = self.delegate;
  544. self.delegate = nil;
  545. [delegate watchStreamWasInterruptedWithError:error];
  546. }
  547. - (void)watchQuery:(FSTQueryData *)query {
  548. HARD_ASSERT([self isOpen], "Not yet open");
  549. [self.workerDispatchQueue verifyIsCurrentQueue];
  550. GCFSListenRequest *request = [GCFSListenRequest message];
  551. request.database = [_serializer encodedDatabaseID];
  552. request.addTarget = [_serializer encodedTarget:query];
  553. request.labels = [_serializer encodedListenRequestLabelsForQueryData:query];
  554. LOG_DEBUG("FSTWatchStream %s watch: %s", (__bridge void *)self, request);
  555. [self writeRequest:request];
  556. }
  557. - (void)unwatchTargetID:(TargetId)targetID {
  558. HARD_ASSERT([self isOpen], "Not yet open");
  559. [self.workerDispatchQueue verifyIsCurrentQueue];
  560. GCFSListenRequest *request = [GCFSListenRequest message];
  561. request.database = [_serializer encodedDatabaseID];
  562. request.removeTarget = targetID;
  563. LOG_DEBUG("FSTWatchStream %s unwatch: %s", (__bridge void *)self, request);
  564. [self writeRequest:request];
  565. }
  566. /**
  567. * Receives an inbound message from GRPC, deserializes, and then passes that on to the delegate's
  568. * watchStreamDidChange:snapshotVersion: callback.
  569. */
  570. - (void)handleStreamMessage:(GCFSListenResponse *)proto {
  571. LOG_DEBUG("FSTWatchStream %s response: %s", (__bridge void *)self, proto);
  572. [self.workerDispatchQueue verifyIsCurrentQueue];
  573. // A successful response means the stream is healthy.
  574. [self.backoff reset];
  575. FSTWatchChange *change = [_serializer decodedWatchChange:proto];
  576. SnapshotVersion snap = [_serializer versionFromListenResponse:proto];
  577. [self.delegate watchStreamDidChange:change snapshotVersion:snap];
  578. }
  579. @end
  580. #pragma mark - FSTWriteStream
  581. @interface FSTWriteStream ()
  582. @property(nonatomic, strong, readonly) FSTSerializerBeta *serializer;
  583. @end
  584. @implementation FSTWriteStream
  585. - (instancetype)initWithDatabase:(const DatabaseInfo *)database
  586. workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
  587. credentials:(CredentialsProvider *)credentials
  588. serializer:(FSTSerializerBeta *)serializer {
  589. self = [super initWithDatabase:database
  590. workerDispatchQueue:workerDispatchQueue
  591. connectionTimerID:FSTTimerIDWriteStreamConnectionBackoff
  592. idleTimerID:FSTTimerIDWriteStreamIdle
  593. credentials:credentials
  594. responseMessageClass:[GCFSWriteResponse class]];
  595. if (self) {
  596. _serializer = serializer;
  597. }
  598. return self;
  599. }
  600. - (GRPCCall *)createRPCWithRequestsWriter:(GRXWriter *)requestsWriter {
  601. return [[GRPCCall alloc] initWithHost:util::WrapNSString(self.databaseInfo->host())
  602. path:@"/google.firestore.v1beta1.Firestore/Write"
  603. requestsWriter:requestsWriter];
  604. }
  605. - (void)startWithDelegate:(id)delegate {
  606. self.handshakeComplete = NO;
  607. [super startWithDelegate:delegate];
  608. }
  609. - (void)notifyStreamOpen {
  610. [self.delegate writeStreamDidOpen];
  611. }
  612. - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
  613. id<FSTWriteStreamDelegate> delegate = self.delegate;
  614. self.delegate = nil;
  615. [delegate writeStreamWasInterruptedWithError:error];
  616. }
  617. - (void)tearDown {
  618. if ([self isHandshakeComplete]) {
  619. // Send an empty write request to the backend to indicate imminent stream closure. This allows
  620. // the backend to clean up resources.
  621. [self writeMutations:@[]];
  622. }
  623. }
  624. - (void)writeHandshake {
  625. // The initial request cannot contain mutations, but must contain a projectID.
  626. HARD_ASSERT([self isOpen], "Not yet open");
  627. HARD_ASSERT(!self.handshakeComplete, "Handshake sent out of turn");
  628. [self.workerDispatchQueue verifyIsCurrentQueue];
  629. GCFSWriteRequest *request = [GCFSWriteRequest message];
  630. request.database = [_serializer encodedDatabaseID];
  631. // TODO(dimond): Support stream resumption. We intentionally do not set the stream token on the
  632. // handshake, ignoring any stream token we might have.
  633. LOG_DEBUG("FSTWriteStream %s initial request: %s", (__bridge void *)self, request);
  634. [self writeRequest:request];
  635. }
  636. - (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
  637. HARD_ASSERT([self isOpen], "Not yet open");
  638. HARD_ASSERT(self.handshakeComplete, "Mutations sent out of turn");
  639. [self.workerDispatchQueue verifyIsCurrentQueue];
  640. NSMutableArray<GCFSWrite *> *protos = [NSMutableArray arrayWithCapacity:mutations.count];
  641. for (FSTMutation *mutation in mutations) {
  642. [protos addObject:[_serializer encodedMutation:mutation]];
  643. };
  644. GCFSWriteRequest *request = [GCFSWriteRequest message];
  645. request.writesArray = protos;
  646. request.streamToken = self.lastStreamToken;
  647. LOG_DEBUG("FSTWriteStream %s mutation request: %s", (__bridge void *)self, request);
  648. [self writeRequest:request];
  649. }
  650. /**
  651. * Implements GRXWriteable to receive an inbound message from GRPC, deserialize, and then pass
  652. * that on to the mutationResultsHandler.
  653. */
  654. - (void)handleStreamMessage:(GCFSWriteResponse *)response {
  655. LOG_DEBUG("FSTWriteStream %s response: %s", (__bridge void *)self, response);
  656. [self.workerDispatchQueue verifyIsCurrentQueue];
  657. // Always capture the last stream token.
  658. self.lastStreamToken = response.streamToken;
  659. if (!self.isHandshakeComplete) {
  660. // The first response is the handshake response
  661. self.handshakeComplete = YES;
  662. [self.delegate writeStreamDidCompleteHandshake];
  663. } else {
  664. // A successful first write response means the stream is healthy.
  665. // Note that we could consider a successful handshake healthy, however, the write itself
  666. // might be causing an error we want to back off from.
  667. [self.backoff reset];
  668. SnapshotVersion commitVersion = [_serializer decodedVersion:response.commitTime];
  669. NSMutableArray<GCFSWriteResult *> *protos = response.writeResultsArray;
  670. NSMutableArray<FSTMutationResult *> *results = [NSMutableArray arrayWithCapacity:protos.count];
  671. for (GCFSWriteResult *proto in protos) {
  672. [results addObject:[_serializer decodedMutationResult:proto]];
  673. };
  674. [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
  675. }
  676. }
  677. @end