FSTStreamTests.mm 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <XCTest/XCTest.h>
  17. #import <FirebaseFirestore/FIRFirestoreSettings.h>
  18. #import "Firestore/Example/Tests/Util/FSTHelpers.h"
  19. #import "Firestore/Example/Tests/Util/FSTIntegrationTestCase.h"
  20. #import "Firestore/Example/Tests/Util/FSTTestDispatchQueue.h"
  21. #import "Firestore/Source/Auth/FSTEmptyCredentialsProvider.h"
  22. #import "Firestore/Source/Remote/FSTDatastore.h"
  23. #import "Firestore/Source/Remote/FSTStream.h"
  24. #import "Firestore/Source/Util/FSTAssert.h"
  25. #include "Firestore/core/src/firebase/firestore/core/database_info.h"
  26. #include "Firestore/core/src/firebase/firestore/model/database_id.h"
  27. #include "Firestore/core/src/firebase/firestore/util/string_apple.h"
  28. namespace util = firebase::firestore::util;
  29. using firebase::firestore::core::DatabaseInfo;
  30. using firebase::firestore::model::DatabaseId;
  /**
   * Test-only category that re-declares a method FSTStream otherwise keeps private, so the tests in
   * this file can invoke it directly to simulate a final callback from GRPC.
   */
  @interface FSTStream (Testing)

  - (void)writesFinishedWithError:(nullable NSError *)error;

  @end
  /**
   * Implements FSTWatchStreamDelegate and FSTWriteStreamDelegate for tests. Every delegate callback
   * appends its name to `states`, and tests can block until the next callback arrives via
   * `awaitNotificationFromBlock:`.
   */
  @interface FSTStreamStatusDelegate : NSObject <FSTWatchStreamDelegate, FSTWriteStreamDelegate>

  - (instancetype)initWithTestCase:(XCTestCase *)testCase
                             queue:(FSTDispatchQueue *)dispatchQueue NS_DESIGNATED_INITIALIZER;

  - (instancetype)init NS_UNAVAILABLE;

  /**
   * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate
   * to be called.
   */
  - (void)awaitNotificationFromBlock:(void (^)(void))block;

  /** Test case used to create and await expectations; held weakly. */
  @property(nonatomic, weak, readonly) XCTestCase *testCase;

  /** Queue on which blocks passed to `awaitNotificationFromBlock:` are dispatched. */
  @property(nonatomic, strong, readonly) FSTDispatchQueue *dispatchQueue;

  /** Names of the delegate callbacks observed so far, in the order they were received. */
  @property(nonatomic, readonly) NSMutableArray<NSString *> *states;

  /** Expectation fulfilled by the next delegate callback, or nil when nothing is being awaited. */
  @property(nonatomic, strong) XCTestExpectation *expectation;

  @end

  @implementation FSTStreamStatusDelegate

  - (instancetype)initWithTestCase:(XCTestCase *)testCase queue:(FSTDispatchQueue *)dispatchQueue {
    if (self = [super init]) {
      _testCase = testCase;
      _dispatchQueue = dispatchQueue;
      _states = [[NSMutableArray alloc] init];
    }
    return self;
  }

  /**
   * Appends 'state' to the observed states and fulfills the pending expectation, if any.
   *
   * The expectation is cleared *before* it is fulfilled: fulfilling wakes the waiting test thread,
   * which may immediately call `awaitNotificationFromBlock:` again. Clearing afterwards would leave
   * a window in which that call either sees the stale expectation (tripping its assertion) or has
   * its freshly installed expectation overwritten with nil.
   */
  - (void)recordState:(NSString *)state {
    [_states addObject:state];

    XCTestExpectation *pending = _expectation;
    _expectation = nil;
    [pending fulfill];
  }

  - (void)watchStreamDidOpen {
    [self recordState:@"watchStreamDidOpen"];
  }

  - (void)writeStreamDidOpen {
    [self recordState:@"writeStreamDidOpen"];
  }

  - (void)writeStreamDidCompleteHandshake {
    [self recordState:@"writeStreamDidCompleteHandshake"];
  }

  - (void)writeStreamWasInterruptedWithError:(nullable NSError *)error {
    [self recordState:@"writeStreamWasInterrupted"];
  }

  - (void)watchStreamWasInterruptedWithError:(nullable NSError *)error {
    [self recordState:@"watchStreamWasInterrupted"];
  }

  - (void)watchStreamDidChange:(FSTWatchChange *)change
               snapshotVersion:(FSTSnapshotVersion *)snapshotVersion {
    [self recordState:@"watchStreamDidChange"];
  }

  - (void)writeStreamDidReceiveResponseWithVersion:(FSTSnapshotVersion *)commitVersion
                                   mutationResults:(NSArray<FSTMutationResult *> *)results {
    [self recordState:@"writeStreamDidReceiveResponseWithVersion"];
  }

  /**
   * Executes 'block' using the provided FSTDispatchQueue and waits for any callback on this delegate
   * to be called.
   */
  - (void)awaitNotificationFromBlock:(void (^)(void))block {
    FSTAssert(_expectation == nil, @"Previous expectation still active");

    // Install the expectation before dispatching so a callback triggered by 'block' always finds it.
    _expectation = [self.testCase expectationWithDescription:@"awaitCallbackInBlock"];
    [self.dispatchQueue dispatchAsync:block];
    [self.testCase awaitExpectations];
  }

  @end
  /**
   * Tests the delegate callbacks issued by FSTWatchStream and FSTWriteStream instances created from
   * an FSTDatastore that is configured with the integration test settings.
   */
  @interface FSTStreamTests : XCTestCase
  @end

  @implementation FSTStreamTests {
    dispatch_queue_t _testQueue;
    FSTTestDispatchQueue *_workerDispatchQueue;
    DatabaseInfo _databaseInfo;
    FSTEmptyCredentialsProvider *_credentials;
    FSTStreamStatusDelegate *_delegate;

    /** Single mutation to send to the write stream. */
    NSArray<FSTMutation *> *_mutations;
  }

  - (void)setUp {
    [super setUp];

    FIRFirestoreSettings *settings = [FSTIntegrationTestCase settings];
    DatabaseId database_id(util::MakeStringView([FSTIntegrationTestCase projectID]),
                           DatabaseId::kDefaultDatabaseId);

    _testQueue = dispatch_queue_create("FSTStreamTestWorkerQueue", DISPATCH_QUEUE_SERIAL);
    _workerDispatchQueue = [[FSTTestDispatchQueue alloc] initWithQueue:_testQueue];

    _databaseInfo = DatabaseInfo(database_id, "test-key", util::MakeStringView(settings.host),
                                 settings.sslEnabled);
    _credentials = [[FSTEmptyCredentialsProvider alloc] init];

    _delegate = [[FSTStreamStatusDelegate alloc] initWithTestCase:self queue:_workerDispatchQueue];

    _mutations = @[ FSTTestSetMutation(@"foo/bar", @{}) ];
  }

  /** Builds a datastore from the database info, worker queue and credentials created in setUp. */
  - (FSTDatastore *)createDatastore {
    return [[FSTDatastore alloc] initWithDatabaseInfo:&_databaseInfo
                                  workerDispatchQueue:_workerDispatchQueue
                                          credentials:_credentials];
  }

  /** Returns a new write stream created by a fresh datastore. */
  - (FSTWriteStream *)setUpWriteStream {
    return [[self createDatastore] createWriteStream];
  }

  /** Returns a new watch stream created by a fresh datastore. */
  - (FSTWatchStream *)setUpWatchStream {
    return [[self createDatastore] createWatchStream];
  }

  /**
   * Drains the test queue and asserts that all the observed callbacks (up to this point) match
   * 'expectedStates'. Clears the list of observed callbacks on completion.
   */
  - (void)verifyDelegateObservedStates:(NSArray<NSString *> *)expectedStates {
    // Drain queue: the queue is serial, so an empty synchronous block only returns once everything
    // enqueued before it has run.
    dispatch_sync(_testQueue, ^{
                  });

    XCTAssertEqualObjects(_delegate.states, expectedStates);
    [_delegate.states removeAllObjects];
  }

  /** Verifies that the watch stream does not issue an onClose callback after a call to stop(). */
  - (void)testWatchStreamStopBeforeHandshake {
    FSTWatchStream *watchStream = [self setUpWatchStream];

    [_delegate awaitNotificationFromBlock:^{
      [watchStream startWithDelegate:_delegate];
    }];

    // Stop must not issue an interruption callback because the full implementation of the delegate
    // could attempt to restart the stream in the event it had pending watches.
    [_workerDispatchQueue dispatchAsync:^{
      [watchStream stop];
    }];

    // Simulate a final callback from GRPC. Dispatch it on the worker queue so that it is ordered
    // after the stop above instead of racing with it from the test thread.
    [_workerDispatchQueue dispatchAsync:^{
      [watchStream writesFinishedWithError:nil];
    }];

    [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
  }

  /** Verifies that the write stream does not issue an onClose callback after a call to stop(). */
  - (void)testWriteStreamStopBeforeHandshake {
    FSTWriteStream *writeStream = [self setUpWriteStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:_delegate];
    }];

    // Don't start the handshake.

    // Stop must not issue an interruption callback because the full implementation of the delegate
    // could attempt to restart the stream in the event it had pending writes.
    [_workerDispatchQueue dispatchAsync:^{
      [writeStream stop];
    }];

    // Simulate a final callback from GRPC. Dispatch it on the worker queue so that it is ordered
    // after the stop above instead of racing with it from the test thread.
    [_workerDispatchQueue dispatchAsync:^{
      [writeStream writesFinishedWithError:nil];
    }];

    [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
  }

  /**
   * Verifies that writing mutations throws before the handshake, succeeds after it, and that a
   * subsequent stop produces no further delegate callbacks.
   */
  - (void)testWriteStreamStopAfterHandshake {
    FSTWriteStream *writeStream = [self setUpWriteStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:_delegate];
    }];

    // Writing before the handshake should throw
    dispatch_sync(_testQueue, ^{
      XCTAssertThrows([writeStream writeMutations:_mutations]);
    });

    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeHandshake];
    }];

    // Now writes should succeed
    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeMutations:_mutations];
    }];

    [_workerDispatchQueue dispatchAsync:^{
      [writeStream stop];
    }];

    [self verifyDelegateObservedStates:@[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
      @"writeStreamDidReceiveResponseWithVersion"
    ]];
  }

  /**
   * Verifies that marking an open write stream idle leads to an interruption callback and leaves
   * the stream closed.
   */
  - (void)testStreamClosesWhenIdle {
    FSTWriteStream *writeStream = [self setUpWriteStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:_delegate];
    }];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeHandshake];
    }];

    // The next callback after marking the stream idle is expected to be the interruption.
    [_delegate awaitNotificationFromBlock:^{
      [writeStream markIdle];
    }];

    dispatch_sync(_testQueue, ^{
      XCTAssertFalse([writeStream isOpen]);
    });

    [self verifyDelegateObservedStates:@[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake", @"writeStreamWasInterrupted"
    ]];
  }

  /**
   * Verifies that issuing a write right after marking the stream idle keeps the stream open and
   * yields a write response rather than an interruption.
   */
  - (void)testStreamCancelsIdleOnWrite {
    FSTWriteStream *writeStream = [self setUpWriteStream];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream startWithDelegate:_delegate];
    }];

    [_delegate awaitNotificationFromBlock:^{
      [writeStream writeHandshake];
    }];

    // Mark the stream idle, but immediately cancel the idle timer by issuing another write.
    [_delegate awaitNotificationFromBlock:^{
      [writeStream markIdle];
      [writeStream writeMutations:_mutations];
    }];

    dispatch_sync(_testQueue, ^{
      XCTAssertTrue([writeStream isOpen]);
    });

    [self verifyDelegateObservedStates:@[
      @"writeStreamDidOpen", @"writeStreamDidCompleteHandshake",
      @"writeStreamDidReceiveResponseWithVersion"
    ]];
  }

  @end
  248. @end