FSTMockDatastore.m 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FSTMockDatastore.h"
  17. #import "Auth/FSTEmptyCredentialsProvider.h"
  18. #import "Core/FSTDatabaseInfo.h"
  19. #import "Core/FSTSnapshotVersion.h"
  20. #import "Local/FSTQueryData.h"
  21. #import "Model/FSTDatabaseID.h"
  22. #import "Model/FSTMutation.h"
  23. #import "Util/FSTAssert.h"
  24. #import "Util/FSTLogger.h"
  25. #import "FSTWatchChange+Testing.h"
  26. @class GRPCProtoCall;
  27. NS_ASSUME_NONNULL_BEGIN
  28. #pragma mark - FSTMockWatchStream
  /**
   * Test stand-in for FSTWatchStream. Instead of talking to a backend it keeps its open/closed
   * state and the set of watched targets in memory, and reports events straight to its delegate.
   */
  @interface FSTMockWatchStream : FSTWatchStream

  /** Set to YES by -start and back to NO by -failStreamWithError:. */
  @property(nonatomic, assign) BOOL open;

  /**
   * Query data recorded by -watchQuery:, keyed by boxed target ID. Entries are dropped again by
   * -unwatchTargetID: and by target changes that carry a cause.
   */
  @property(nonatomic, strong, readonly)
      NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *activeTargets;

  /** The initializer to use for the mock; the response message class is supplied internally. */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                          delegate:(id<FSTWatchStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;

  /** Not available on the mock; use the initializer without a responseMessageClass argument. */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
              responseMessageClass:(Class)responseMessageClass
                          delegate:(id<FSTWatchStreamDelegate>)delegate NS_UNAVAILABLE;

  @end
  @implementation FSTMockWatchStream

  /**
   * Initializes the mock stream, passing FSTWatchChange as the response message class to the
   * superclass, and starts out with no active targets.
   */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                          delegate:(id<FSTWatchStreamDelegate>)delegate {
    self = [super initWithDatabase:database
               workerDispatchQueue:workerDispatchQueue
                       credentials:credentials
              responseMessageClass:[FSTWatchChange class]
                          delegate:delegate];
    if (self) {
      FSTAssert(database, @"Database must not be nil");
      _activeTargets = [[NSMutableDictionary alloc] init];
    }
    return self;
  }

  #pragma mark - Overridden FSTWatchStream methods.

  /** Marks the stream open and immediately reports the open event; asserts it was not open. */
  - (void)start {
    FSTAssert(!self.open, @"Trying to start already started watch stream");
    self.open = YES;
    [self handleStreamOpen];
  }

  /** The mock is open exactly when its `open` flag is set. */
  - (BOOL)isOpen {
    return self.open;
  }

  /** The mock does not distinguish "started" from "open"; both report the `open` flag. */
  - (BOOL)isStarted {
    return self.open;
  }

  /** Tells the delegate that the watch stream opened. */
  - (void)handleStreamOpen {
    [self.delegate watchStreamDidOpen];
  }

  /** Logs the request and records the query as an active target under its target ID. */
  - (void)watchQuery:(FSTQueryData *)query {
    FSTLog(@"watchQuery: %d: %@", query.targetID, query.query);

    // Snapshot version is ignored on the wire
    FSTSnapshotVersion *noVersion = [FSTSnapshotVersion noVersion];
    FSTQueryData *recordedQuery = [query queryDataByReplacingSnapshotVersion:noVersion
                                                                 resumeToken:query.resumeToken];
    self.activeTargets[@(query.targetID)] = recordedQuery;
  }

  /** Logs the request and forgets the given target. */
  - (void)unwatchTargetID:(FSTTargetID)targetID {
    FSTLog(@"unwatchTargetID: %d", targetID);
    [self.activeTargets removeObjectForKey:@(targetID)];
  }

  /** Marks the stream closed and reports the close, with `error`, to the delegate. */
  - (void)failStreamWithError:(NSError *)error {
    self.open = NO;
    [self.delegate watchStreamDidClose:error];
  }

  #pragma mark - Helper methods.

  /**
   * Delivers `change` to the delegate as though the backend had sent it. Target changes that carry
   * a cause first remove their targets from `activeTargets`.
   */
  - (void)writeWatchChange:(FSTWatchChange *)change snapshotVersion:(FSTSnapshotVersion *)snap {
    [self discardTargetsRemovedByChange:change];
    [self.delegate watchStreamDidChange:change snapshotVersion:snap];
  }

  /**
   * Removes from `activeTargets` every target named by `change`, but only when `change` is an
   * FSTWatchTargetChange with a non-nil cause. All other changes leave the bookkeeping untouched.
   */
  - (void)discardTargetsRemovedByChange:(FSTWatchChange *)change {
    if (![change isKindOfClass:[FSTWatchTargetChange class]]) {
      return;
    }

    FSTWatchTargetChange *targetChange = (FSTWatchTargetChange *)change;
    if (!targetChange.cause) {
      return;
    }

    for (NSNumber *boxedTargetID in targetChange.targetIDs) {
      if (self.activeTargets[boxedTargetID] == nil) {
        // Technically removing an unknown target is valid (e.g. it could race with a
        // server-side removal), but we want to pay extra careful attention in tests
        // that we only remove targets we listened to.
        FSTFail(@"Removing a non-active target");
      }
      [self.activeTargets removeObjectForKey:boxedTargetID];
    }
  }

  @end
  109. #pragma mark - FSTMockWriteStream
  /**
   * Test stand-in for FSTWriteStream. Instead of talking to a backend it keeps its open/closed
   * state in memory and queues every batch of mutations handed to -writeMutations:.
   */
  @interface FSTMockWriteStream : FSTWriteStream

  /** The initializer to use for the mock; the response message class is supplied internally. */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                          delegate:(id<FSTWriteStreamDelegate>)delegate NS_DESIGNATED_INITIALIZER;

  /**
   * Not available on the mock; use the initializer without a responseMessageClass argument.
   *
   * The delegate is typed as FSTWriteStreamDelegate (not the watch-stream protocol) so that this
   * redeclaration agrees with the write-stream initializer the implementation forwards to.
   */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
              responseMessageClass:(Class)responseMessageClass
                          delegate:(id<FSTWriteStreamDelegate>)delegate NS_UNAVAILABLE;

  /** Set to YES by -start and back to NO by -failStreamWithError:. */
  @property(nonatomic, assign) BOOL open;

  /**
   * Batches passed to -writeMutations: that have not yet been consumed via -nextSentWrite, oldest
   * first. Emptied whenever the stream is started.
   */
  @property(nonatomic, strong, readonly) NSMutableArray<NSArray<FSTMutation *> *> *sentMutations;

  @end
  @implementation FSTMockWriteStream

  /**
   * Initializes the mock stream, passing FSTMutationResult as the response message class to the
   * superclass, and starts out with an empty queue of sent mutations.
   */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                          delegate:(id<FSTWriteStreamDelegate>)delegate {
    self = [super initWithDatabase:database
               workerDispatchQueue:workerDispatchQueue
                       credentials:credentials
              responseMessageClass:[FSTMutationResult class]
                          delegate:delegate];
    if (self) {
      _sentMutations = [NSMutableArray array];
    }
    return self;
  }

  #pragma mark - Overridden FSTWriteStream methods.

  /**
   * Marks the stream open, discards any writes queued before this start, and immediately reports
   * the open event. Asserts that the stream was not already open.
   */
  - (void)start {
    FSTAssert(!self.open, @"Trying to start already started write stream");
    self.open = YES;
    [self.sentMutations removeAllObjects];
    [self handleStreamOpen];
  }

  /** The mock is open exactly when its `open` flag is set. */
  - (BOOL)isOpen {
    return self.open;
  }

  /** The mock does not distinguish "started" from "open"; both report the `open` flag. */
  - (BOOL)isStarted {
    return self.open;
  }

  /** Completes the handshake synchronously: flags it as done and notifies the delegate. */
  - (void)writeHandshake {
    self.handshakeComplete = YES;
    [self.delegate writeStreamDidCompleteHandshake];
  }

  /** Queues `mutations` as one sent batch instead of transmitting it anywhere. */
  - (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
    [self.sentMutations addObject:mutations];
  }

  /** Tells the delegate that the write stream opened. */
  - (void)handleStreamOpen {
    [self.delegate writeStreamDidOpen];
  }

  #pragma mark - Helper methods.

  /** Injects a write ack as though it had come from the backend in response to a write. */
  - (void)ackWriteWithVersion:(FSTSnapshotVersion *)commitVersion
              mutationResults:(NSArray<FSTMutationResult *> *)results {
    [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
  }

  /**
   * Injects a failed write response as though it had come from the backend: marks the stream
   * closed and forwards `error` to the delegate's close callback.
   *
   * `error` is declared nullable because FSTMockDatastore's -failWriteWithError: accepts a nullable
   * error and passes it straight through to this method.
   * NOTE(review): this assumes the delegate's -writeStreamDidClose: tolerates a nil error; confirm
   * against the FSTWriteStreamDelegate declaration.
   */
  - (void)failStreamWithError:(NSError *_Nullable)error {
    self.open = NO;
    [self.delegate writeStreamDidClose:error];
  }

  /**
   * Returns the next write that was "sent to the backend" and removes it from the queue, asserting
   * that at least one write is queued.
   */
  - (NSArray<FSTMutation *> *)nextSentWrite {
    FSTAssert(self.sentMutations.count > 0,
              @"Writes need to happen before you can call nextSentWrite.");
    // The queue is FIFO: the oldest batch sits at the front.
    NSArray<FSTMutation *> *result = self.sentMutations.firstObject;
    [self.sentMutations removeObjectAtIndex:0];
    return result;
  }

  /**
   * Returns the number of mutations that have been sent to the backend but not retrieved via
   * nextSentWrite yet.
   */
  - (int)sentMutationsCount {
    return (int)self.sentMutations.count;
  }

  @end
  190. #pragma mark - FSTMockDatastore
  @interface FSTMockDatastore ()

  /** Mock watch stream most recently handed out by -createWatchStreamWithDelegate:, if any. */
  @property(nonatomic, strong, nullable) FSTMockWatchStream *watchStream;

  /** Mock write stream most recently handed out by -createWriteStreamWithDelegate:, if any. */
  @property(nonatomic, strong, nullable) FSTMockWriteStream *writeStream;

  /** Properties implemented in FSTDatastore that are nonpublic. */
  @property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;
  @property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;

  @end
  @implementation FSTMockDatastore

  /**
   * Creates a mock datastore backed by placeholder connection settings (project "project",
   * database "database", persistence key "persistence", host "host", SSL disabled) and an empty
   * credentials provider.
   *
   * @param workerDispatchQueue Queue handed to the datastore and to the streams it creates.
   */
  + (instancetype)mockDatastoreWithWorkerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue {
    FSTDatabaseID *databaseID = [FSTDatabaseID databaseIDWithProject:@"project" database:@"database"];
    FSTDatabaseInfo *databaseInfo = [FSTDatabaseInfo databaseInfoWithDatabaseID:databaseID
                                                                 persistenceKey:@"persistence"
                                                                           host:@"host"
                                                                     sslEnabled:NO];
    FSTEmptyCredentialsProvider *credentials = [[FSTEmptyCredentialsProvider alloc] init];

    // Allocate through `self` so the `instancetype` result really is an instance of the receiving
    // class, even when this factory is invoked on a subclass.
    return [[self alloc] initWithDatabaseInfo:databaseInfo
                          workerDispatchQueue:workerDispatchQueue
                                  credentials:credentials];
  }

  #pragma mark - Overridden FSTDatastore methods.

  /** Creates a mock watch stream for `delegate`, remembers it for the test helpers, and returns it. */
  - (FSTWatchStream *)createWatchStreamWithDelegate:(id<FSTWatchStreamDelegate>)delegate {
    FSTAssert(self.databaseInfo, @"DatabaseInfo must not be nil");
    self.watchStream = [[FSTMockWatchStream alloc] initWithDatabase:self.databaseInfo
                                                workerDispatchQueue:self.workerDispatchQueue
                                                        credentials:self.credentials
                                                           delegate:delegate];
    return self.watchStream;
  }

  /** Creates a mock write stream for `delegate`, remembers it for the test helpers, and returns it. */
  - (FSTWriteStream *)createWriteStreamWithDelegate:(id<FSTWriteStreamDelegate>)delegate {
    FSTAssert(self.databaseInfo, @"DatabaseInfo must not be nil");
    self.writeStream = [[FSTMockWriteStream alloc] initWithDatabase:self.databaseInfo
                                                workerDispatchQueue:self.workerDispatchQueue
                                                        credentials:self.credentials
                                                           delegate:delegate];
    return self.writeStream;
  }

  /** The mock never issues RPCs; reaching this method is reported as a failure. */
  - (void)authorizeAndStartRPC:(GRPCProtoCall *)rpc completion:(FSTVoidErrorBlock)completion {
    FSTFail(@"FSTMockDatastore shouldn't be starting any RPCs.");
  }

  #pragma mark - Method exposed for tests to call.

  /** Dequeues and returns the oldest batch of mutations written to the mock write stream. */
  - (NSArray<FSTMutation *> *)nextSentWrite {
    return [self.writeStream nextSentWrite];
  }

  /** Number of written batches not yet consumed via -nextSentWrite (0 if no write stream exists). */
  - (int)writesSent {
    return [self.writeStream sentMutationsCount];
  }

  /** Injects a write acknowledgement into the mock write stream. */
  - (void)ackWriteWithVersion:(FSTSnapshotVersion *)commitVersion
              mutationResults:(NSArray<FSTMutationResult *> *)results {
    [self.writeStream ackWriteWithVersion:commitVersion mutationResults:results];
  }

  /** Closes the mock write stream, forwarding `error` to it. */
  - (void)failWriteWithError:(NSError *_Nullable)error {
    [self.writeStream failStreamWithError:error];
  }

  /** Injects an "added" target change, with no cause and no snapshot version, for `targetIDs`. */
  - (void)writeWatchTargetAddedWithTargetIDs:(NSArray<FSTBoxedTargetID *> *)targetIDs {
    FSTWatchTargetChange *change =
        [FSTWatchTargetChange changeWithState:FSTWatchTargetChangeStateAdded
                                    targetIDs:targetIDs
                                        cause:nil];
    [self writeWatchChange:change snapshotVersion:[FSTSnapshotVersion noVersion]];
  }

  /** Injects a "current" target change for `targetIDs` with the given version and resume token. */
  - (void)writeWatchCurrentWithTargetIDs:(NSArray<FSTBoxedTargetID *> *)targetIDs
                         snapshotVersion:(FSTSnapshotVersion *)snapshotVersion
                             resumeToken:(NSData *)resumeToken {
    FSTWatchTargetChange *change =
        [FSTWatchTargetChange changeWithState:FSTWatchTargetChangeStateCurrent
                                    targetIDs:targetIDs
                                  resumeToken:resumeToken];
    [self writeWatchChange:change snapshotVersion:snapshotVersion];
  }

  /** Injects an arbitrary watch change into the mock watch stream. */
  - (void)writeWatchChange:(FSTWatchChange *)change snapshotVersion:(FSTSnapshotVersion *)snap {
    [self.watchStream writeWatchChange:change snapshotVersion:snap];
  }

  /** Closes the mock watch stream, forwarding `error` to it. */
  - (void)failWatchStreamWithError:(NSError *)error {
    [self.watchStream failStreamWithError:error];
  }

  /**
   * Returns an immutable snapshot of the targets the mock watch stream is currently tracking.
   * When no watch stream has been created yet the result is an empty dictionary rather than nil,
   * so the nonnull return type holds.
   */
  - (NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *)activeTargets {
    NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *snapshot =
        [self.watchStream.activeTargets copy];
    return snapshot ?: @{};
  }

  /** YES when a mock watch stream exists and reports itself open. */
  - (BOOL)isWatchStreamOpen {
    return self.watchStream.isOpen;
  }

  @end
  273. NS_ASSUME_NONNULL_END