FSTStreamTests.m 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <XCTest/XCTest.h>
  17. #import <FirebaseFirestore/FIRFirestoreSettings.h>
  18. #import "Firestore/Example/Tests/Util/FSTHelpers.h"
  19. #import "Firestore/Example/Tests/Util/FSTIntegrationTestCase.h"
  20. #import "Firestore/Example/Tests/Util/FSTTestDispatchQueue.h"
  21. #import "Firestore/Source/Auth/FSTEmptyCredentialsProvider.h"
  22. #import "Firestore/Source/Core/FSTDatabaseInfo.h"
  23. #import "Firestore/Source/Model/FSTDatabaseID.h"
  24. #import "Firestore/Source/Remote/FSTDatastore.h"
  25. #import "Firestore/Source/Remote/FSTStream.h"
  26. #import "Firestore/Source/Util/FSTAssert.h"
  /**
   * Test-only category on FSTStream. Re-declares the otherwise private
   * -writesFinishedWithError: so the tests in this file can invoke it directly.
   */
  @interface FSTStream (Testing)

  - (void)writesFinishedWithError:(nullable NSError *)error;

  @end
  /**
   * Test delegate that implements both FSTWatchStreamDelegate and FSTWriteStreamDelegate.
   *
   * Every delegate callback is recorded by name in `states`. A test can block until the next
   * callback arrives by using `awaitNotificationFromBlock:`.
   */
  @interface FSTStreamStatusDelegate : NSObject <FSTWatchStreamDelegate, FSTWriteStreamDelegate>

  - (instancetype)initWithTestCase:(XCTestCase *)testCase
                             queue:(FSTDispatchQueue *)dispatchQueue NS_DESIGNATED_INITIALIZER;
  - (instancetype)init NS_UNAVAILABLE;

  /**
   * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this
   * delegate to be called.
   *
   * Declared here because it is called from the test case class, not only from within this class.
   */
  - (void)awaitNotificationFromBlock:(void (^)(void))block;

  /** Test case used to create and wait on expectations. Held weakly. */
  @property(nonatomic, weak, readonly) XCTestCase *testCase;

  /** Queue on which blocks passed to `awaitNotificationFromBlock:` are dispatched. */
  @property(nonatomic, strong, readonly) FSTDispatchQueue *dispatchQueue;

  /** Names of the delegate callbacks observed so far, in the order they arrived. */
  @property(nonatomic, readonly) NSMutableArray<NSString *> *states;

  /** Expectation fulfilled by the next delegate callback; nil when nothing is being awaited. */
  @property(nonatomic, strong) XCTestExpectation *expectation;

  @end
  @implementation FSTStreamStatusDelegate

  - (instancetype)initWithTestCase:(XCTestCase *)testCase queue:(FSTDispatchQueue *)dispatchQueue {
    if (self = [super init]) {
      _testCase = testCase;
      _dispatchQueue = dispatchQueue;
      _states = [NSMutableArray new];
    }
    return self;
  }

  /**
   * Appends 'state' to the list of observed callbacks and fulfills the pending expectation, if any.
   *
   * The expectation ivar is cleared BEFORE fulfilling. Fulfilling may let the waiting test thread
   * resume and install a new expectation via awaitNotificationFromBlock:. Clearing afterwards
   * could trip that method's "still active" assertion or wipe out the new expectation.
   */
  - (void)recordState:(NSString *)state {
    [_states addObject:state];

    XCTestExpectation *pending = _expectation;
    _expectation = nil;
    // Messaging nil is a no-op, so callbacks arriving while nothing is awaited are only recorded.
    [pending fulfill];
  }

  - (void)watchStreamDidOpen {
    [self recordState:@"watchStreamDidOpen"];
  }

  - (void)writeStreamDidOpen {
    [self recordState:@"writeStreamDidOpen"];
  }

  - (void)writeStreamDidCompleteHandshake {
    [self recordState:@"writeStreamDidCompleteHandshake"];
  }

  - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
    // The error itself is ignored; only the fact that the callback happened is recorded.
    [self recordState:@"writeStreamWasInterrupted"];
  }

  - (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
    // The error itself is ignored; only the fact that the callback happened is recorded.
    [self recordState:@"watchStreamWasInterrupted"];
  }

  - (void)watchStreamDidChange:(FSTWatchChange *)change
               snapshotVersion:(FSTSnapshotVersion *)snapshotVersion {
    [self recordState:@"watchStreamDidChange"];
  }

  - (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
                                   mutationResults:(NSArray<FSTMutationResult *> *)results {
    [self recordState:@"writeStreamDidReceiveResponseWithVersion"];
  }

  /**
   * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate
   * to be called.
   */
  - (void)awaitNotificationFromBlock:(void (^)(void))block {
    FSTAssert(_expectation == nil, @"Previous expectation still active");

    // Install the expectation before dispatching so a callback triggered by 'block' finds it.
    _expectation = [self.testCase expectationWithDescription:@"awaitCallbackInBlock"];
    [self.dispatchQueue dispatchAsync:block];
    [self.testCase awaitExpectations];
  }

  @end
  /**
   * Tests for the watch and write streams created by FSTDatastore. Delegate callbacks are observed
   * through an FSTStreamStatusDelegate.
   */
  @interface FSTStreamTests : XCTestCase
  @end

  @implementation FSTStreamTests {
    /** Serial GCD queue that backs `_workerDispatchQueue`. */
    dispatch_queue_t _testQueue;
    FSTTestDispatchQueue *_workerDispatchQueue;
    FSTDatabaseInfo *_databaseInfo;
    FSTEmptyCredentialsProvider *_credentials;
    FSTStreamStatusDelegate *_delegate;

    /** Single mutation to send to the write stream. */
    NSArray<FSTMutation *> *_mutations;
  }

  - (void)setUp {
    [super setUp];

    // Host, SSL flag and project ID all come from the shared integration-test configuration.
    FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings];
    FSTDatabaseID *databaseID =
        [FSTDatabaseID databaseIDWithProject:[FSTIntegrationTestCase projectID]
                                    database:kDefaultDatabaseID];

    _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL);
    _workerDispatchQueue = [[FSTTestDispatchQueue alloc] initWithQueue:_testQueue];

    _databaseInfo = [FSTDatabaseInfo databaseInfoWithDatabaseID:databaseID
                                                 persistenceKey:@"test-key"
                                                           host:settings.host
                                                     sslEnabled:settings.sslEnabled];
    _credentials = [[FSTEmptyCredentialsProvider alloc] init];
    _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];
    _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ];
  }

  /** Builds a fresh FSTDatastore from the database info, worker queue and credentials of setUp. */
  - (FSTDatastore *)datastoreForTesting {
    return [[FSTDatastore alloc] initWithDatabaseInfo:_databaseInfo
                                  workerDispatchQueue:_workerDispatchQueue
                                          credentials:_credentials];
  }

  /** Returns a new, not yet started write stream backed by a fresh datastore. */
  - (FSTWriteStream *)setUpWriteStream {
    return [[self datastoreForTesting] createWriteStream];
  }

  /** Returns a new, not yet started watch stream backed by a fresh datastore. */
  - (FSTWatchStream *)setUpWatchStream {
    return [[self datastoreForTesting] createWatchStream];
  }

  /**
   * Creates a write stream, starts it on the worker queue with the recording delegate and waits
   * for the next delegate callback before returning the stream.
   */
  - (FSTWriteStream *)setUpStartedWriteStream {
    FSTWriteStream *writeStream = [self setUpWriteStream];
    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:self->_delegate];
    }];
    return writeStream;
  }

  /** Sends the handshake on 'writeStream' and waits for the next delegate callback. */
  - (void)writeHandshakeAndAwaitCallbackOnStream:(FSTWriteStream *)writeStream {
    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeHandshake];
    }];
  }

  /**
   * Drains the test queue and asserts that all the observed callbacks (up to this point) match
   * 'expectedStates'. Clears the list of observed callbacks on completion.
   */
  - (void)verifyDelegateObservedStates:(NSArray<NSString *> *)expectedStates {
    // The queue is serial, so an empty synchronous block returns only after every block enqueued
    // before it has run.
    dispatch_sync(_testQueue, ^{
                  });

    XCTAssertEqualObjects(_delegate.states, expectedStates);
    [_delegate.states removeAllObjects];
  }

  /** Verifies that the watch stream does not issue an onClose callback after a call to stop(). */
  - (void)testWatchStreamStopBeforeHandshake {
    FSTWatchStream *watchStream = [self setUpWatchStream];
    [_delegate awaitNotificationFromBlock:^{
      [watchStream startWithDelegate:self->_delegate];
    }];

    // Stopping must not produce a further delegate callback: a full delegate implementation could
    // react to one by restarting the stream if it still had pending watches.
    [_workerDispatchQueue dispatchAsync:^{
      [watchStream stop];
    }];

    // Simulate a final callback from GRPC
    [watchStream writesFinishedWithError:nil];

    [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
  }

  /** Verifies that the write stream does not issue an onClose callback after a call to stop(). */
  - (void)testWriteStreamStopBeforeHandshake {
    // Started, but the handshake is deliberately never sent.
    FSTWriteStream *writeStream = [self setUpStartedWriteStream];

    // Stopping must not produce a further delegate callback (for example
    // writeStreamWasInterruptedWithError:): a full delegate implementation could react to one by
    // restarting the stream if it still had pending work.
    [_workerDispatchQueue dispatchAsync:^{
      [writeStream stop];
    }];

    // Simulate a final callback from GRPC
    [writeStream writesFinishedWithError:nil];

    [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
  }

  /**
   * Verifies that writing mutations throws before the handshake, succeeds after it, and that
   * stopping the stream afterwards adds no further delegate callbacks.
   */
  - (void)testWriteStreamStopAfterHandshake {
    FSTWriteStream *writeStream = [self setUpStartedWriteStream];

    // Writing before the handshake should throw
    dispatch_sync(_testQueue, ^{
      XCTAssertThrows([writeStream writeMutations:_mutations]);
    });

    [self writeHandshakeAndAwaitCallbackOnStream:writeStream];

    // Now writes should succeed
    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeMutations:self->_mutations];
    }];

    [_workerDispatchQueue dispatchAsync:^{
      [writeStream stop];
    }];

    NSArray<NSString *> *expectedStates = @[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
      @"writeStreamDidReceiveResponseWithVersion"
    ];
    [self verifyDelegateObservedStates:expectedStates];
  }

  /**
   * Verifies that marking a handshaken write stream idle leads to an interruption callback and
   * leaves the stream no longer open.
   */
  - (void)testStreamClosesWhenIdle {
    FSTWriteStream *writeStream = [self setUpStartedWriteStream];
    [self writeHandshakeAndAwaitCallbackOnStream:writeStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream markIdle];
    }];

    dispatch_sync(_testQueue, ^{
      XCTAssertFalse([writeStream isOpen]);
    });

    NSArray<NSString *> *expectedStates = @[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted"
    ];
    [self verifyDelegateObservedStates:expectedStates];
  }

  /**
   * Verifies that a write issued right after markIdle keeps the stream open: the next callback is
   * the write response rather than an interruption.
   */
  - (void)testStreamCancelsIdleOnWrite {
    FSTWriteStream *writeStream = [self setUpStartedWriteStream];
    [self writeHandshakeAndAwaitCallbackOnStream:writeStream];

    // Mark the stream idle, but immediately cancel the idle timer by issuing another write.
    [_delegate awaitNotificationFromBlock:^{
      [writeStream markIdle];
      [writeStream writeMutations:self->_mutations];
    }];

    dispatch_sync(_testQueue, ^{
      XCTAssertTrue([writeStream isOpen]);
    });

    NSArray<NSString *> *expectedStates = @[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
      @"writeStreamDidReceiveResponseWithVersion"
    ];
    [self verifyDelegateObservedStates:expectedStates];
  }

  @end