FSTStreamTests.mm 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <XCTest/XCTest.h>
  17. #import <GRPCClient/GRPCCall.h>
  18. #import <FirebaseFirestore/FIRFirestoreErrors.h>
  19. #import <FirebaseFirestore/FIRFirestoreSettings.h>
  20. #include <utility>
  21. #import "Firestore/Example/Tests/Util/FSTHelpers.h"
  22. #import "Firestore/Example/Tests/Util/FSTIntegrationTestCase.h"
  23. #import "Firestore/Source/Remote/FSTDatastore.h"
  24. #import "Firestore/Source/Remote/FSTStream.h"
  25. #include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h"
  26. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  27. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  28. #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
  29. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  30. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  31. namespace util = firebase::firestore::util;
  32. using firebase::firestore::auth::EmptyCredentialsProvider;
  33. using firebase::firestore::core::DatabaseInfo;
  34. using firebase::firestore::model::DatabaseId;
  35. using firebase::firestore::model::SnapshotVersion;
  36. /** Exposes otherwise private methods for testing. */
  37. @interface FSTStream (Testing)
  38. @property(nonatomic, strong, readwrite) id<GRXWriteable> callbackFilter;
  39. @end
  40. /**
  41. * Implements FSTWatchStreamDelegate and FSTWriteStreamDelegate and supports waiting on callbacks
  42. * via `fulfillOnCallback`.
  43. */
  44. @interface FSTStreamStatusDelegate : NSObject <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
  45. - (instancetype)initWithTestCase:(XCTestCase *)testCase
  46. queue:(FSTDispatchQueue *)dispatchQueue NS_DESIGNATED_INITIALIZER;
  47. - (instancetype)init NS_UNAVAILABLE;
  48. @property(nonatomic, weak, readonly) XCTestCase *testCase;
  49. @property(nonatomic, strong, readonly) FSTDispatchQueue *dispatchQueue;
  50. @property(nonatomic, readonly) NSMutableArray<NSString *> *states;
  51. @property(nonatomic, strong) XCTestExpectation *expectation;
  52. @end
  53. @implementation FSTStreamStatusDelegate
  54. - (instancetype)initWithTestCase:(XCTestCase *)testCase queue:(FSTDispatchQueue *)dispatchQueue {
  55. if (self = [super init]) {
  56. _testCase = testCase;
  57. _dispatchQueue = dispatchQueue;
  58. _states = [NSMutableArray new];
  59. }
  60. return self;
  61. }
  62. - (void)watchStreamDidOpen {
  63. [_states addObject:@"watchStreamDidOpen"];
  64. [_expectation fulfill];
  65. _expectation = nil;
  66. }
  67. - (void)writeStreamDidOpen {
  68. [_states addObject:@"writeStreamDidOpen"];
  69. [_expectation fulfill];
  70. _expectation = nil;
  71. }
  72. - (void)writeStreamDidCompleteHandshake {
  73. [_states addObject:@"writeStreamDidCompleteHandshake"];
  74. [_expectation fulfill];
  75. _expectation = nil;
  76. }
  77. - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
  78. [_states addObject:@"writeStreamWasInterrupted"];
  79. [_expectation fulfill];
  80. _expectation = nil;
  81. }
  82. - (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
  83. [_states addObject:@"watchStreamWasInterrupted"];
  84. [_expectation fulfill];
  85. _expectation = nil;
  86. }
  87. - (void)watchStreamDidChange:(FSTWatchChange *)change
  88. snapshotVersion:(const SnapshotVersion &)snapshotVersion {
  89. [_states addObject:@"watchStreamDidChange"];
  90. [_expectation fulfill];
  91. _expectation = nil;
  92. }
  93. - (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion
  94. mutationResults:(NSArray<FSTMutationResult *> *)results {
  95. [_states addObject:@"writeStreamDidReceiveResponseWithVersion"];
  96. [_expectation fulfill];
  97. _expectation = nil;
  98. }
  99. /**
  100. * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate
  101. * to be called.
  102. */
  103. - (void)awaitNotificationFromBlock:(void (^)(void))block {
  104. HARD_ASSERT(_expectation == nil, "Previous expectation still active");
  105. XCTestExpectation *expectation =
  106. [self.testCase expectationWithDescription:@"awaitCallbackInBlock"];
  107. _expectation = expectation;
  108. [self.dispatchQueue dispatchAsync:block];
  109. [self.testCase awaitExpectations];
  110. }
  111. @end
  112. @interface FSTStreamTests : XCTestCase
  113. @end
  114. @implementation FSTStreamTests {
  115. dispatch_queue_t _testQueue;
  116. FSTDispatchQueue *_workerDispatchQueue;
  117. DatabaseInfo _databaseInfo;
  118. EmptyCredentialsProvider _credentials;
  119. FSTStreamStatusDelegate *_delegate;
  120. /** Single mutation to send to the write stream. */
  121. NSArray<FSTMutation *> *_mutations;
  122. }
  123. - (void)setUp {
  124. [super setUp];
  125. FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings];
  126. DatabaseId database_id(util::MakeStringView([FSTIntegrationTestCase projectID]),
  127. DatabaseId::kDefault);
  128. _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL);
  129. _workerDispatchQueue = [[FSTDispatchQueue alloc] initWithQueue:_testQueue];
  130. _databaseInfo = DatabaseInfo(database_id, "test-key", util::MakeStringView(settings.host),
  131. settings.sslEnabled);
  132. _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];
  133. _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ];
  134. }
  135. - (FSTWriteStream *)setUpWriteStream {
  136. FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
  137. workerDispatchQueue:_workerDispatchQueue
  138. credentials:&_credentials];
  139. return [datastore createWriteStream];
  140. }
  141. - (FSTWatchStream *)setUpWatchStream {
  142. FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
  143. workerDispatchQueue:_workerDispatchQueue
  144. credentials:&_credentials];
  145. return [datastore createWatchStream];
  146. }
  147. /**
  148. * Drains the test queue and asserts that all the observed callbacks (up to this point) match
  149. * 'expectedStates'. Clears the list of observed callbacks on completion.
  150. */
  151. - (void)verifyDelegateObservedStates:(NSArray<NSString *> *)expectedStates {
  152. // Drain queue
  153. dispatch_sync(_testQueue, ^{
  154. });
  155. XCTAssertEqualObjects(_delegate.states, expectedStates);
  156. [_delegate.states removeAllObjects];
  157. }
  158. /** Verifies that the watch stream does not issue an onClose callback after a call to stop(). */
  159. - (void)testWatchStreamStopBeforeHandshake {
  160. FSTWatchStream *watchStream = [self setUpWatchStream];
  161. [_delegate awaitNotificationFromBlock:^{
  162. [watchStream startWithDelegate:_delegate];
  163. }];
  164. // Stop must not call watchStreamDidClose because the full implementation of the delegate could
  165. // attempt to restart the stream in the event it had pending watches.
  166. [_workerDispatchQueue dispatchAsync:^{
  167. [watchStream stop];
  168. }];
  169. // Simulate a final callback from GRPC
  170. [_workerDispatchQueue dispatchAsync:^{
  171. [watchStream.callbackFilter writesFinishedWithError:nil];
  172. }];
  173. [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
  174. }
  175. /** Verifies that the write stream does not issue an onClose callback after a call to stop(). */
  176. - (void)testWriteStreamStopBeforeHandshake {
  177. FSTWriteStream *writeStream = [self setUpWriteStream];
  178. [_delegate awaitNotificationFromBlock:^{
  179. [writeStream startWithDelegate:_delegate];
  180. }];
  181. // Don't start the handshake.
  182. // Stop must not call watchStreamDidClose because the full implementation of the delegate could
  183. // attempt to restart the stream in the event it had pending watches.
  184. [_workerDispatchQueue dispatchAsync:^{
  185. [writeStream stop];
  186. }];
  187. // Simulate a final callback from GRPC
  188. [_workerDispatchQueue dispatchAsync:^{
  189. [writeStream.callbackFilter writesFinishedWithError:nil];
  190. }];
  191. [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
  192. }
  193. - (void)testWriteStreamStopAfterHandshake {
  194. FSTWriteStream *writeStream = [self setUpWriteStream];
  195. [_delegate awaitNotificationFromBlock:^{
  196. [writeStream startWithDelegate:_delegate];
  197. }];
  198. // Writing before the handshake should throw
  199. [_workerDispatchQueue dispatchSync:^{
  200. XCTAssertThrows([writeStream writeMutations:_mutations]);
  201. }];
  202. [_delegate awaitNotificationFromBlock:^{
  203. [writeStream writeHandshake];
  204. }];
  205. // Now writes should succeed
  206. [_delegate awaitNotificationFromBlock:^{
  207. [writeStream writeMutations:_mutations];
  208. }];
  209. [_workerDispatchQueue dispatchAsync:^{
  210. [writeStream stop];
  211. }];
  212. [self verifyDelegateObservedStates:@[
  213. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
  214. @"writeStreamDidReceiveResponseWithVersion"
  215. ]];
  216. }
  217. - (void)testStreamClosesWhenIdle {
  218. FSTWriteStream *writeStream = [self setUpWriteStream];
  219. [_delegate awaitNotificationFromBlock:^{
  220. [writeStream startWithDelegate:_delegate];
  221. }];
  222. [_delegate awaitNotificationFromBlock:^{
  223. [writeStream writeHandshake];
  224. }];
  225. [_workerDispatchQueue dispatchAsync:^{
  226. [writeStream markIdle];
  227. XCTAssertTrue(
  228. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  229. }];
  230. [_workerDispatchQueue runDelayedCallbacksUntil:FSTTimerIDWriteStreamIdle];
  231. [_workerDispatchQueue dispatchSync:^{
  232. XCTAssertFalse([writeStream isOpen]);
  233. }];
  234. [self verifyDelegateObservedStates:@[
  235. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted"
  236. ]];
  237. }
  238. - (void)testStreamCancelsIdleOnWrite {
  239. FSTWriteStream *writeStream = [self setUpWriteStream];
  240. [_delegate awaitNotificationFromBlock:^{
  241. [writeStream startWithDelegate:_delegate];
  242. }];
  243. [_delegate awaitNotificationFromBlock:^{
  244. [writeStream writeHandshake];
  245. }];
  246. // Mark the stream idle, but immediately cancel the idle timer by issuing another write.
  247. [_delegate awaitNotificationFromBlock:^{
  248. [writeStream markIdle];
  249. XCTAssertTrue(
  250. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  251. [writeStream writeMutations:_mutations];
  252. XCTAssertFalse(
  253. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  254. }];
  255. [_workerDispatchQueue dispatchSync:^{
  256. XCTAssertTrue([writeStream isOpen]);
  257. }];
  258. [self verifyDelegateObservedStates:@[
  259. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
  260. @"writeStreamDidReceiveResponseWithVersion"
  261. ]];
  262. }
  263. class MockCredentialsProvider : public firebase::firestore::auth::EmptyCredentialsProvider {
  264. public:
  265. MockCredentialsProvider() {
  266. observed_states_ = [NSMutableArray new];
  267. }
  268. void GetToken(firebase::firestore::auth::TokenListener completion) override {
  269. [observed_states_ addObject:@"GetToken"];
  270. EmptyCredentialsProvider::GetToken(std::move(completion));
  271. }
  272. void InvalidateToken() override {
  273. [observed_states_ addObject:@"InvalidateToken"];
  274. EmptyCredentialsProvider::InvalidateToken();
  275. }
  276. NSMutableArray<NSString *> *observed_states() const {
  277. return observed_states_;
  278. }
  279. private:
  280. NSMutableArray<NSString *> *observed_states_;
  281. };
  282. - (void)testStreamRefreshesTokenUponExpiration {
  283. MockCredentialsProvider credentials;
  284. FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
  285. workerDispatchQueue:_workerDispatchQueue
  286. credentials:&credentials];
  287. FSTWatchStream *watchStream = [datastore createWatchStream];
  288. [_delegate awaitNotificationFromBlock:^{
  289. [watchStream startWithDelegate:_delegate];
  290. }];
  291. // Simulate callback from GRPC with an unauthenticated error -- this should invalidate the token.
  292. NSError *unauthenticatedError = [NSError errorWithDomain:FIRFirestoreErrorDomain
  293. code:FIRFirestoreErrorCodeUnauthenticated
  294. userInfo:nil];
  295. dispatch_async(_testQueue, ^{
  296. [watchStream.callbackFilter writesFinishedWithError:unauthenticatedError];
  297. });
  298. // Drain the queue.
  299. dispatch_sync(_testQueue, ^{
  300. });
  301. // Try reconnecting.
  302. [_delegate awaitNotificationFromBlock:^{
  303. [watchStream startWithDelegate:_delegate];
  304. }];
  305. // Simulate a different error -- token should not be invalidated this time.
  306. NSError *unavailableError = [NSError errorWithDomain:FIRFirestoreErrorDomain
  307. code:FIRFirestoreErrorCodeUnavailable
  308. userInfo:nil];
  309. dispatch_async(_testQueue, ^{
  310. [watchStream.callbackFilter writesFinishedWithError:unavailableError];
  311. });
  312. dispatch_sync(_testQueue, ^{
  313. });
  314. [_delegate awaitNotificationFromBlock:^{
  315. [watchStream startWithDelegate:_delegate];
  316. }];
  317. dispatch_sync(_testQueue, ^{
  318. });
  319. NSArray<NSString *> *expected = @[ @"GetToken", @"InvalidateToken", @"GetToken", @"GetToken" ];
  320. XCTAssertEqualObjects(credentials.observed_states(), expected);
  321. }
  322. @end