FSTStreamTests.mm 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <XCTest/XCTest.h>
  17. #import <GRPCClient/GRPCCall.h>
  18. #import <FirebaseFirestore/FIRFirestoreSettings.h>
  19. #import "Firestore/Example/Tests/Util/FSTHelpers.h"
  20. #import "Firestore/Example/Tests/Util/FSTIntegrationTestCase.h"
  21. #import "Firestore/Source/Auth/FSTEmptyCredentialsProvider.h"
  22. #import "Firestore/Source/Remote/FSTDatastore.h"
  23. #import "Firestore/Source/Remote/FSTStream.h"
  24. #import "Firestore/Source/Util/FSTAssert.h"
  25. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  26. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  27. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  28. namespace util = firebase::firestore::util;
  29. using firebase::firestore::core::DatabaseInfo;
  30. using firebase::firestore::model::DatabaseId;
  /**
   * Test-only category that re-declares the stream's `callbackFilter` property as readwrite so the
   * tests in this file can deliver simulated GRPC callbacks through it directly.
   */
  @interface FSTStream (Testing)
  @property(nonatomic, readwrite, strong) id<GRXWriteable> callbackFilter;
  @end
  /**
   * Test delegate that implements both FSTWatchStreamDelegate and FSTWriteStreamDelegate.
   *
   * Every delegate callback appends its name to `states` and fulfills the pending expectation (if
   * there is one), which lets a test block until a stream reports back by using
   * `awaitNotificationFromBlock:`.
   */
  @interface FSTStreamStatusDelegate : NSObject <FSTWatchStreamDelegate, FSTWriteStreamDelegate>

  - (instancetype)initWithTestCase:(XCTestCase *)testCase
                             queue:(FSTDispatchQueue *)dispatchQueue NS_DESIGNATED_INITIALIZER;
  - (instancetype)init NS_UNAVAILABLE;

  /**
   * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate
   * to be called.
   */
  - (void)awaitNotificationFromBlock:(void (^)(void))block;

  /** Test case used to create and wait on expectations. Held weakly. */
  @property(nonatomic, weak, readonly) XCTestCase *testCase;

  /** Queue on which blocks passed to `awaitNotificationFromBlock:` are dispatched. */
  @property(nonatomic, strong, readonly) FSTDispatchQueue *dispatchQueue;

  /** Names of the delegate callbacks observed so far, in the order they were received. */
  @property(nonatomic, readonly) NSMutableArray<NSString *> *states;

  /** Expectation fulfilled (and then cleared) by the next delegate callback, or nil if none. */
  @property(nonatomic, strong) XCTestExpectation *expectation;

  @end

  @implementation FSTStreamStatusDelegate

  - (instancetype)initWithTestCase:(XCTestCase *)testCase queue:(FSTDispatchQueue *)dispatchQueue {
    self = [super init];
    if (self) {
      _testCase = testCase;
      _dispatchQueue = dispatchQueue;
      _states = [[NSMutableArray alloc] init];
    }
    return self;
  }

  #pragma mark - Private helpers

  /**
   * Appends `callbackName` to the observed states, then fulfills and clears the pending
   * expectation. Messaging a nil expectation is a no-op, so callbacks that nobody is waiting on are
   * still recorded.
   */
  - (void)recordCallbackNamed:(NSString *)callbackName {
    [_states addObject:callbackName];
    [_expectation fulfill];
    // Clear the expectation so that a later callback cannot fulfill it a second time.
    _expectation = nil;
  }

  #pragma mark - FSTWatchStreamDelegate

  - (void)watchStreamDidOpen {
    [self recordCallbackNamed:@"watchStreamDidOpen"];
  }

  - (void)watchStreamDidChange:(FSTWatchChange *)change
               snapshotVersion:(FSTSnapshotVersion *)snapshotVersion {
    [self recordCallbackNamed:@"watchStreamDidChange"];
  }

  - (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
    [self recordCallbackNamed:@"watchStreamWasInterrupted"];
  }

  #pragma mark - FSTWriteStreamDelegate

  - (void)writeStreamDidOpen {
    [self recordCallbackNamed:@"writeStreamDidOpen"];
  }

  - (void)writeStreamDidCompleteHandshake {
    [self recordCallbackNamed:@"writeStreamDidCompleteHandshake"];
  }

  - (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
                                   mutationResults:(NSArray<FSTMutationResult *> *)results {
    [self recordCallbackNamed:@"writeStreamDidReceiveResponseWithVersion"];
  }

  - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
    [self recordCallbackNamed:@"writeStreamWasInterrupted"];
  }

  #pragma mark - Waiting

  /**
   * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate
   * to be called.
   *
   * Asserts that no earlier expectation is still pending, so waits cannot overlap.
   */
  - (void)awaitNotificationFromBlock:(void (^)(void))block {
    FSTAssert(_expectation == nil, @"Previous expectation still active");
    // Install the expectation before dispatching so a callback fired by 'block' always finds it.
    _expectation = [self.testCase expectationWithDescription:@"awaitCallbackInBlock"];
    [self.dispatchQueue dispatchAsync:block];
    [self.testCase awaitExpectations];
  }

  @end
  /**
   * Tests which delegate callbacks FSTWatchStream and FSTWriteStream issue while they are started,
   * stopped, written to and left idle. The streams are created from a datastore configured with
   * the settings and project ID supplied by FSTIntegrationTestCase.
   */
  @interface FSTStreamTests : XCTestCase
  @end

  @implementation FSTStreamTests {
    dispatch_queue_t _testQueue;
    FSTDispatchQueue *_workerDispatchQueue;
    DatabaseInfo _databaseInfo;
    FSTEmptyCredentialsProvider *_credentials;
    FSTStreamStatusDelegate *_delegate;

    /** Single mutation to send to the write stream. */
    NSArray<FSTMutation *> *_mutations;
  }

  - (void)setUp {
    [super setUp];

    FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings];
    DatabaseId database_id(util::MakeStringView([FSTIntegrationTestCase projectID]),
                           DatabaseId::kDefault);

    // The worker dispatch queue wraps the serial test queue, so draining one drains the other.
    _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL);
    _workerDispatchQueue = [[FSTDispatchQueue alloc] initWithQueue:_testQueue];

    _databaseInfo = DatabaseInfo(database_id, "test-key", util::MakeStringView(settings.host),
                                 settings.sslEnabled);
    _credentials = [[FSTEmptyCredentialsProvider alloc] init];

    _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];
    _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ];
  }

  #pragma mark - Helpers

  /** Builds a datastore from this test's database info, worker queue and credentials. */
  - (FSTDatastore *)makeDatastore {
    return [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
                                  workerDispatchQueue:_workerDispatchQueue
                                          credentials:_credentials];
  }

  /** Returns a new, not yet started write stream from a freshly created datastore. */
  - (FSTWriteStream *)setUpWriteStream {
    return [[self makeDatastore] createWriteStream];
  }

  /** Returns a new, not yet started watch stream from a freshly created datastore. */
  - (FSTWatchStream *)setUpWatchStream {
    return [[self makeDatastore] createWatchStream];
  }

  /**
   * Drains the test queue and asserts that all the observed callbacks (up to this point) match
   * 'expectedStates'. Clears the list of observed callbacks on completion.
   */
  - (void)verifyDelegateObservedStates:(NSArray<NSString *> *)expectedStates {
    // An empty synchronous block returns only after everything enqueued before it has run.
    dispatch_sync(_testQueue, ^{
    });

    XCTAssertEqualObjects(_delegate.states, expectedStates);
    [_delegate.states removeAllObjects];
  }

  #pragma mark - Tests

  /**
   * Verifies that the watch stream issues no further delegate callback once stop has been called,
   * even if GRPC delivers a final callback afterwards.
   */
  - (void)testWatchStreamStopBeforeHandshake {
    FSTWatchStream *watchStream = [self setUpWatchStream];

    [_delegate awaitNotificationFromBlock:^{
      [watchStream startWithDelegate:_delegate];
    }];

    // Stop must not trigger watchStreamWasInterruptedWithError: because the full implementation of
    // the delegate could attempt to restart the stream in the event it had pending watches.
    [_workerDispatchQueue dispatchAsync:^{
      [watchStream stop];
    }];

    // Simulate a final callback from GRPC
    [_workerDispatchQueue dispatchAsync:^{
      [watchStream.callbackFilter writesFinishedWithError:nil];
    }];

    [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
  }

  /**
   * Verifies that the write stream issues no further delegate callback once stop has been called,
   * even if GRPC delivers a final callback afterwards.
   */
  - (void)testWriteStreamStopBeforeHandshake {
    FSTWriteStream *writeStream = [self setUpWriteStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:_delegate];
    }];

    // Don't start the handshake.

    // Stop must not trigger writeStreamWasInterruptedWithError: because the full implementation of
    // the delegate could attempt to restart the stream (e.g. if it still had pending writes).
    [_workerDispatchQueue dispatchAsync:^{
      [writeStream stop];
    }];

    // Simulate a final callback from GRPC
    [_workerDispatchQueue dispatchAsync:^{
      [writeStream.callbackFilter writesFinishedWithError:nil];
    }];

    [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
  }

  /**
   * Verifies that writing mutations throws before the handshake, succeeds after it, and that
   * stopping the stream afterwards produces no additional delegate callback.
   */
  - (void)testWriteStreamStopAfterHandshake {
    FSTWriteStream *writeStream = [self setUpWriteStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:_delegate];
    }];

    // Writing before the handshake should throw
    dispatch_sync(_testQueue, ^{
      XCTAssertThrows([writeStream writeMutations:_mutations]);
    });

    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeHandshake];
    }];

    // Now writes should succeed
    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeMutations:_mutations];
    }];

    [_workerDispatchQueue dispatchAsync:^{
      [writeStream stop];
    }];

    NSArray<NSString *> *expectedStates = @[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
      @"writeStreamDidReceiveResponseWithVersion"
    ];
    [self verifyDelegateObservedStates:expectedStates];
  }

  /**
   * Verifies that marking the write stream idle schedules the idle timer, and that running that
   * timer closes the stream and reports an interruption to the delegate.
   */
  - (void)testStreamClosesWhenIdle {
    FSTWriteStream *writeStream = [self setUpWriteStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:_delegate];
    }];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeHandshake];
    }];

    [_workerDispatchQueue dispatchAsync:^{
      [writeStream markIdle];
      XCTAssertTrue(
          [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
    }];

    // Run the idle timer now instead of waiting for it to elapse.
    [_workerDispatchQueue runDelayedCallbacksUntil:FSTTimerIDWriteStreamIdle];

    dispatch_sync(_testQueue, ^{
      XCTAssertFalse([writeStream isOpen]);
    });

    NSArray<NSString *> *expectedStates = @[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted"
    ];
    [self verifyDelegateObservedStates:expectedStates];
  }

  /**
   * Verifies that issuing a write after marking the stream idle cancels the idle timer and leaves
   * the stream open.
   */
  - (void)testStreamCancelsIdleOnWrite {
    FSTWriteStream *writeStream = [self setUpWriteStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:_delegate];
    }];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeHandshake];
    }];

    // Mark the stream idle, but immediately cancel the idle timer by issuing another write.
    [_delegate awaitNotificationFromBlock:^{
      [writeStream markIdle];
      XCTAssertTrue(
          [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
      [writeStream writeMutations:_mutations];
      XCTAssertFalse(
          [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
    }];

    dispatch_sync(_testQueue, ^{
      XCTAssertTrue([writeStream isOpen]);
    });

    NSArray<NSString *> *expectedStates = @[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
      @"writeStreamDidReceiveResponseWithVersion"
    ];
    [self verifyDelegateObservedStates:expectedStates];

    // NOTE(review): this test never stops the write stream, so it is still open when the test
    // returns. Confirm whether it should be stopped here as in testWriteStreamStopAfterHandshake.
  }

  @end