FSTStreamTests.mm 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <XCTest/XCTest.h>
  17. #import <GRPCClient/GRPCCall.h>
  18. #import <FirebaseFirestore/FIRFirestoreSettings.h>
  19. #import "Firestore/Example/Tests/Util/FSTHelpers.h"
  20. #import "Firestore/Example/Tests/Util/FSTIntegrationTestCase.h"
  21. #import "Firestore/Source/Remote/FSTDatastore.h"
  22. #import "Firestore/Source/Remote/FSTStream.h"
  23. #include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h"
  24. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  25. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  26. #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
  27. #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
  28. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  29. namespace util = firebase::firestore::util;
  30. using firebase::firestore::auth::EmptyCredentialsProvider;
  31. using firebase::firestore::core::DatabaseInfo;
  32. using firebase::firestore::model::DatabaseId;
  33. using firebase::firestore::model::SnapshotVersion;
  34. /** Exposes otherwise private methods for testing. */
  35. @interface FSTStream (Testing)
  36. @property(nonatomic, strong, readwrite) id<GRXWriteable> callbackFilter;
  37. @end
  38. /**
  39. * Implements FSTWatchStreamDelegate and FSTWriteStreamDelegate and supports waiting on callbacks
  40. * via `fulfillOnCallback`.
  41. */
  42. @interface FSTStreamStatusDelegate : NSObject <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
  43. - (instancetype)initWithTestCase:(XCTestCase *)testCase
  44. queue:(FSTDispatchQueue *)dispatchQueue NS_DESIGNATED_INITIALIZER;
  45. - (instancetype)init NS_UNAVAILABLE;
  46. @property(nonatomic, weak, readonly) XCTestCase *testCase;
  47. @property(nonatomic, strong, readonly) FSTDispatchQueue *dispatchQueue;
  48. @property(nonatomic, readonly) NSMutableArray<NSString *> *states;
  49. @property(nonatomic, strong) XCTestExpectation *expectation;
  50. @end
  51. @implementation FSTStreamStatusDelegate
  52. - (instancetype)initWithTestCase:(XCTestCase *)testCase queue:(FSTDispatchQueue *)dispatchQueue {
  53. if (self = [super init]) {
  54. _testCase = testCase;
  55. _dispatchQueue = dispatchQueue;
  56. _states = [NSMutableArray new];
  57. }
  58. return self;
  59. }
  60. - (void)watchStreamDidOpen {
  61. [_states addObject:@"watchStreamDidOpen"];
  62. [_expectation fulfill];
  63. _expectation = nil;
  64. }
  65. - (void)writeStreamDidOpen {
  66. [_states addObject:@"writeStreamDidOpen"];
  67. [_expectation fulfill];
  68. _expectation = nil;
  69. }
  70. - (void)writeStreamDidCompleteHandshake {
  71. [_states addObject:@"writeStreamDidCompleteHandshake"];
  72. [_expectation fulfill];
  73. _expectation = nil;
  74. }
  75. - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
  76. [_states addObject:@"writeStreamWasInterrupted"];
  77. [_expectation fulfill];
  78. _expectation = nil;
  79. }
  80. - (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
  81. [_states addObject:@"watchStreamWasInterrupted"];
  82. [_expectation fulfill];
  83. _expectation = nil;
  84. }
  85. - (void)watchStreamDidChange:(FSTWatchChange *)change
  86. snapshotVersion:(const SnapshotVersion &)snapshotVersion {
  87. [_states addObject:@"watchStreamDidChange"];
  88. [_expectation fulfill];
  89. _expectation = nil;
  90. }
  91. - (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion
  92. mutationResults:(NSArray<FSTMutationResult *> *)results {
  93. [_states addObject:@"writeStreamDidReceiveResponseWithVersion"];
  94. [_expectation fulfill];
  95. _expectation = nil;
  96. }
  97. /**
  98. * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate
  99. * to be called.
  100. */
  101. - (void)awaitNotificationFromBlock:(void (^)(void))block {
  102. HARD_ASSERT(_expectation == nil, "Previous expectation still active");
  103. XCTestExpectation *expectation =
  104. [self.testCase expectationWithDescription:@"awaitCallbackInBlock"];
  105. _expectation = expectation;
  106. [self.dispatchQueue dispatchAsync:block];
  107. [self.testCase awaitExpectations];
  108. }
  109. @end
  110. @interface FSTStreamTests : XCTestCase
  111. @end
  112. @implementation FSTStreamTests {
  113. dispatch_queue_t _testQueue;
  114. FSTDispatchQueue *_workerDispatchQueue;
  115. DatabaseInfo _databaseInfo;
  116. EmptyCredentialsProvider _credentials;
  117. FSTStreamStatusDelegate *_delegate;
  118. /** Single mutation to send to the write stream. */
  119. NSArray<FSTMutation *> *_mutations;
  120. }
  121. - (void)setUp {
  122. [super setUp];
  123. FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings];
  124. DatabaseId database_id(util::MakeStringView([FSTIntegrationTestCase projectID]),
  125. DatabaseId::kDefault);
  126. _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL);
  127. _workerDispatchQueue = [[FSTDispatchQueue alloc] initWithQueue:_testQueue];
  128. _databaseInfo = DatabaseInfo(database_id, "test-key", util::MakeStringView(settings.host),
  129. settings.sslEnabled);
  130. _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];
  131. _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ];
  132. }
  133. - (FSTWriteStream *)setUpWriteStream {
  134. FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
  135. workerDispatchQueue:_workerDispatchQueue
  136. credentials:&_credentials];
  137. return [datastore createWriteStream];
  138. }
  139. - (FSTWatchStream *)setUpWatchStream {
  140. FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
  141. workerDispatchQueue:_workerDispatchQueue
  142. credentials:&_credentials];
  143. return [datastore createWatchStream];
  144. }
  145. /**
  146. * Drains the test queue and asserts that all the observed callbacks (up to this point) match
  147. * 'expectedStates'. Clears the list of observed callbacks on completion.
  148. */
  149. - (void)verifyDelegateObservedStates:(NSArray<NSString *> *)expectedStates {
  150. // Drain queue
  151. dispatch_sync(_testQueue, ^{
  152. });
  153. XCTAssertEqualObjects(_delegate.states, expectedStates);
  154. [_delegate.states removeAllObjects];
  155. }
  156. /** Verifies that the watch stream does not issue an onClose callback after a call to stop(). */
  157. - (void)testWatchStreamStopBeforeHandshake {
  158. FSTWatchStream *watchStream = [self setUpWatchStream];
  159. [_delegate awaitNotificationFromBlock:^{
  160. [watchStream startWithDelegate:_delegate];
  161. }];
  162. // Stop must not call watchStreamDidClose because the full implementation of the delegate could
  163. // attempt to restart the stream in the event it had pending watches.
  164. [_workerDispatchQueue dispatchAsync:^{
  165. [watchStream stop];
  166. }];
  167. // Simulate a final callback from GRPC
  168. [_workerDispatchQueue dispatchAsync:^{
  169. [watchStream.callbackFilter writesFinishedWithError:nil];
  170. }];
  171. [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
  172. }
  173. /** Verifies that the write stream does not issue an onClose callback after a call to stop(). */
  174. - (void)testWriteStreamStopBeforeHandshake {
  175. FSTWriteStream *writeStream = [self setUpWriteStream];
  176. [_delegate awaitNotificationFromBlock:^{
  177. [writeStream startWithDelegate:_delegate];
  178. }];
  179. // Don't start the handshake.
  180. // Stop must not call watchStreamDidClose because the full implementation of the delegate could
  181. // attempt to restart the stream in the event it had pending watches.
  182. [_workerDispatchQueue dispatchAsync:^{
  183. [writeStream stop];
  184. }];
  185. // Simulate a final callback from GRPC
  186. [_workerDispatchQueue dispatchAsync:^{
  187. [writeStream.callbackFilter writesFinishedWithError:nil];
  188. }];
  189. [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
  190. }
  191. - (void)testWriteStreamStopAfterHandshake {
  192. FSTWriteStream *writeStream = [self setUpWriteStream];
  193. [_delegate awaitNotificationFromBlock:^{
  194. [writeStream startWithDelegate:_delegate];
  195. }];
  196. // Writing before the handshake should throw
  197. [_workerDispatchQueue dispatchSync:^{
  198. XCTAssertThrows([writeStream writeMutations:_mutations]);
  199. }];
  200. [_delegate awaitNotificationFromBlock:^{
  201. [writeStream writeHandshake];
  202. }];
  203. // Now writes should succeed
  204. [_delegate awaitNotificationFromBlock:^{
  205. [writeStream writeMutations:_mutations];
  206. }];
  207. [_workerDispatchQueue dispatchAsync:^{
  208. [writeStream stop];
  209. }];
  210. [self verifyDelegateObservedStates:@[
  211. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
  212. @"writeStreamDidReceiveResponseWithVersion"
  213. ]];
  214. }
  215. - (void)testStreamClosesWhenIdle {
  216. FSTWriteStream *writeStream = [self setUpWriteStream];
  217. [_delegate awaitNotificationFromBlock:^{
  218. [writeStream startWithDelegate:_delegate];
  219. }];
  220. [_delegate awaitNotificationFromBlock:^{
  221. [writeStream writeHandshake];
  222. }];
  223. [_workerDispatchQueue dispatchAsync:^{
  224. [writeStream markIdle];
  225. XCTAssertTrue(
  226. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  227. }];
  228. [_workerDispatchQueue runDelayedCallbacksUntil:FSTTimerIDWriteStreamIdle];
  229. [_workerDispatchQueue dispatchSync:^{
  230. XCTAssertFalse([writeStream isOpen]);
  231. }];
  232. [self verifyDelegateObservedStates:@[
  233. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted"
  234. ]];
  235. }
  236. - (void)testStreamCancelsIdleOnWrite {
  237. FSTWriteStream *writeStream = [self setUpWriteStream];
  238. [_delegate awaitNotificationFromBlock:^{
  239. [writeStream startWithDelegate:_delegate];
  240. }];
  241. [_delegate awaitNotificationFromBlock:^{
  242. [writeStream writeHandshake];
  243. }];
  244. // Mark the stream idle, but immediately cancel the idle timer by issuing another write.
  245. [_delegate awaitNotificationFromBlock:^{
  246. [writeStream markIdle];
  247. XCTAssertTrue(
  248. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  249. [writeStream writeMutations:_mutations];
  250. XCTAssertFalse(
  251. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  252. }];
  253. [_workerDispatchQueue dispatchSync:^{
  254. XCTAssertTrue([writeStream isOpen]);
  255. }];
  256. [self verifyDelegateObservedStates:@[
  257. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
  258. @"writeStreamDidReceiveResponseWithVersion"
  259. ]];
  260. }
  261. @end