FSTStreamTests.mm 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <XCTest/XCTest.h>
  17. #import <GRPCClient/GRPCCall.h>
  18. #import <FirebaseFirestore/FIRFirestoreSettings.h>
  19. #import "Firestore/Example/Tests/Util/FSTHelpers.h"
  20. #import "Firestore/Example/Tests/Util/FSTIntegrationTestCase.h"
  21. #import "Firestore/Source/Remote/FSTDatastore.h"
  22. #import "Firestore/Source/Remote/FSTStream.h"
  23. #import "Firestore/Source/Util/FSTAssert.h"
  24. #include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h"
  25. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  26. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  27. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  28. namespace util = firebase::firestore::util;
  29. using firebase::firestore::auth::EmptyCredentialsProvider;
  30. using firebase::firestore::core::DatabaseInfo;
  31. using firebase::firestore::model::DatabaseId;
  32. /** Exposes otherwise private methods for testing. */
  33. @interface FSTStream (Testing)
  34. @property(nonatomic, strong, readwrite) id<GRXWriteable> callbackFilter;
  35. @end
  36. /**
  37. * Implements FSTWatchStreamDelegate and FSTWriteStreamDelegate and supports waiting on callbacks
  38. * via `fulfillOnCallback`.
  39. */
  40. @interface FSTStreamStatusDelegate : NSObject <FSTWatchStreamDelegate, FSTWriteStreamDelegate>
  41. - (instancetype)initWithTestCase:(XCTestCase *)testCase
  42. queue:(FSTDispatchQueue *)dispatchQueue NS_DESIGNATED_INITIALIZER;
  43. - (instancetype)init NS_UNAVAILABLE;
  44. @property(nonatomic, weak, readonly) XCTestCase *testCase;
  45. @property(nonatomic, strong, readonly) FSTDispatchQueue *dispatchQueue;
  46. @property(nonatomic, readonly) NSMutableArray<NSString *> *states;
  47. @property(nonatomic, strong) XCTestExpectation *expectation;
  48. @end
  49. @implementation FSTStreamStatusDelegate
  50. - (instancetype)initWithTestCase:(XCTestCase *)testCase queue:(FSTDispatchQueue *)dispatchQueue {
  51. if (self = [super init]) {
  52. _testCase = testCase;
  53. _dispatchQueue = dispatchQueue;
  54. _states = [NSMutableArray new];
  55. }
  56. return self;
  57. }
  58. - (void)watchStreamDidOpen {
  59. [_states addObject:@"watchStreamDidOpen"];
  60. [_expectation fulfill];
  61. _expectation = nil;
  62. }
  63. - (void)writeStreamDidOpen {
  64. [_states addObject:@"writeStreamDidOpen"];
  65. [_expectation fulfill];
  66. _expectation = nil;
  67. }
  68. - (void)writeStreamDidCompleteHandshake {
  69. [_states addObject:@"writeStreamDidCompleteHandshake"];
  70. [_expectation fulfill];
  71. _expectation = nil;
  72. }
  73. - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
  74. [_states addObject:@"writeStreamWasInterrupted"];
  75. [_expectation fulfill];
  76. _expectation = nil;
  77. }
  78. - (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
  79. [_states addObject:@"watchStreamWasInterrupted"];
  80. [_expectation fulfill];
  81. _expectation = nil;
  82. }
  83. - (void)watchStreamDidChange:(FSTWatchChange *)change
  84. snapshotVersion:(FSTSnapshotVersion *)snapshotVersion {
  85. [_states addObject:@"watchStreamDidChange"];
  86. [_expectation fulfill];
  87. _expectation = nil;
  88. }
  89. - (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
  90. mutationResults:(NSArray<FSTMutationResult *> *)results {
  91. [_states addObject:@"writeStreamDidReceiveResponseWithVersion"];
  92. [_expectation fulfill];
  93. _expectation = nil;
  94. }
  95. /**
  96. * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate
  97. * to be called.
  98. */
  99. - (void)awaitNotificationFromBlock:(void (^)(void))block {
  100. FSTAssert(_expectation == nil, @"Previous expectation still active");
  101. XCTestExpectation *expectation =
  102. [self.testCase expectationWithDescription:@"awaitCallbackInBlock"];
  103. _expectation = expectation;
  104. [self.dispatchQueue dispatchAsync:block];
  105. [self.testCase awaitExpectations];
  106. }
  107. @end
  108. @interface FSTStreamTests : XCTestCase
  109. @end
  110. @implementation FSTStreamTests {
  111. dispatch_queue_t _testQueue;
  112. FSTDispatchQueue *_workerDispatchQueue;
  113. DatabaseInfo _databaseInfo;
  114. EmptyCredentialsProvider _credentials;
  115. FSTStreamStatusDelegate *_delegate;
  116. /** Single mutation to send to the write stream. */
  117. NSArray<FSTMutation *> *_mutations;
  118. }
  119. - (void)setUp {
  120. [super setUp];
  121. FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings];
  122. DatabaseId database_id(util::MakeStringView([FSTIntegrationTestCase projectID]),
  123. DatabaseId::kDefault);
  124. _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL);
  125. _workerDispatchQueue = [[FSTDispatchQueue alloc] initWithQueue:_testQueue];
  126. _databaseInfo = DatabaseInfo(database_id, "test-key", util::MakeStringView(settings.host),
  127. settings.sslEnabled);
  128. _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];
  129. _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ];
  130. }
  131. - (FSTWriteStream *)setUpWriteStream {
  132. FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
  133. workerDispatchQueue:_workerDispatchQueue
  134. credentials:&_credentials];
  135. return [datastore createWriteStream];
  136. }
  137. - (FSTWatchStream *)setUpWatchStream {
  138. FSTDatastore *datastore = [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
  139. workerDispatchQueue:_workerDispatchQueue
  140. credentials:&_credentials];
  141. return [datastore createWatchStream];
  142. }
  143. /**
  144. * Drains the test queue and asserts that all the observed callbacks (up to this point) match
  145. * 'expectedStates'. Clears the list of observed callbacks on completion.
  146. */
  147. - (void)verifyDelegateObservedStates:(NSArray<NSString *> *)expectedStates {
  148. // Drain queue
  149. dispatch_sync(_testQueue, ^{
  150. });
  151. XCTAssertEqualObjects(_delegate.states, expectedStates);
  152. [_delegate.states removeAllObjects];
  153. }
  154. /** Verifies that the watch stream does not issue an onClose callback after a call to stop(). */
  155. - (void)testWatchStreamStopBeforeHandshake {
  156. FSTWatchStream *watchStream = [self setUpWatchStream];
  157. [_delegate awaitNotificationFromBlock:^{
  158. [watchStream startWithDelegate:_delegate];
  159. }];
  160. // Stop must not call watchStreamDidClose because the full implementation of the delegate could
  161. // attempt to restart the stream in the event it had pending watches.
  162. [_workerDispatchQueue dispatchAsync:^{
  163. [watchStream stop];
  164. }];
  165. // Simulate a final callback from GRPC
  166. [_workerDispatchQueue dispatchAsync:^{
  167. [watchStream.callbackFilter writesFinishedWithError:nil];
  168. }];
  169. [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
  170. }
  171. /** Verifies that the write stream does not issue an onClose callback after a call to stop(). */
  172. - (void)testWriteStreamStopBeforeHandshake {
  173. FSTWriteStream *writeStream = [self setUpWriteStream];
  174. [_delegate awaitNotificationFromBlock:^{
  175. [writeStream startWithDelegate:_delegate];
  176. }];
  177. // Don't start the handshake.
  178. // Stop must not call watchStreamDidClose because the full implementation of the delegate could
  179. // attempt to restart the stream in the event it had pending watches.
  180. [_workerDispatchQueue dispatchAsync:^{
  181. [writeStream stop];
  182. }];
  183. // Simulate a final callback from GRPC
  184. [_workerDispatchQueue dispatchAsync:^{
  185. [writeStream.callbackFilter writesFinishedWithError:nil];
  186. }];
  187. [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
  188. }
  189. - (void)testWriteStreamStopAfterHandshake {
  190. FSTWriteStream *writeStream = [self setUpWriteStream];
  191. [_delegate awaitNotificationFromBlock:^{
  192. [writeStream startWithDelegate:_delegate];
  193. }];
  194. // Writing before the handshake should throw
  195. [_workerDispatchQueue dispatchSync:^{
  196. XCTAssertThrows([writeStream writeMutations:_mutations]);
  197. }];
  198. [_delegate awaitNotificationFromBlock:^{
  199. [writeStream writeHandshake];
  200. }];
  201. // Now writes should succeed
  202. [_delegate awaitNotificationFromBlock:^{
  203. [writeStream writeMutations:_mutations];
  204. }];
  205. [_workerDispatchQueue dispatchAsync:^{
  206. [writeStream stop];
  207. }];
  208. [self verifyDelegateObservedStates:@[
  209. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
  210. @"writeStreamDidReceiveResponseWithVersion"
  211. ]];
  212. }
  213. - (void)testStreamClosesWhenIdle {
  214. FSTWriteStream *writeStream = [self setUpWriteStream];
  215. [_delegate awaitNotificationFromBlock:^{
  216. [writeStream startWithDelegate:_delegate];
  217. }];
  218. [_delegate awaitNotificationFromBlock:^{
  219. [writeStream writeHandshake];
  220. }];
  221. [_workerDispatchQueue dispatchAsync:^{
  222. [writeStream markIdle];
  223. XCTAssertTrue(
  224. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  225. }];
  226. [_workerDispatchQueue runDelayedCallbacksUntil:FSTTimerIDWriteStreamIdle];
  227. [_workerDispatchQueue dispatchSync:^{
  228. XCTAssertFalse([writeStream isOpen]);
  229. }];
  230. [self verifyDelegateObservedStates:@[
  231. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted"
  232. ]];
  233. }
  234. - (void)testStreamCancelsIdleOnWrite {
  235. FSTWriteStream *writeStream = [self setUpWriteStream];
  236. [_delegate awaitNotificationFromBlock:^{
  237. [writeStream startWithDelegate:_delegate];
  238. }];
  239. [_delegate awaitNotificationFromBlock:^{
  240. [writeStream writeHandshake];
  241. }];
  242. // Mark the stream idle, but immediately cancel the idle timer by issuing another write.
  243. [_delegate awaitNotificationFromBlock:^{
  244. [writeStream markIdle];
  245. XCTAssertTrue(
  246. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  247. [writeStream writeMutations:_mutations];
  248. XCTAssertFalse(
  249. [_workerDispatchQueue containsDelayedCallbackWithTimerID:FSTTimerIDWriteStreamIdle]);
  250. }];
  251. [_workerDispatchQueue dispatchSync:^{
  252. XCTAssertTrue([writeStream isOpen]);
  253. }];
  254. [self verifyDelegateObservedStates:@[
  255. @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
  256. @"writeStreamDidReceiveResponseWithVersion"
  257. ]];
  258. }
  259. @end