FSTMockDatastore.m 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FSTMockDatastore.h"
  17. #import "Auth/FSTEmptyCredentialsProvider.h"
  18. #import "Core/FSTDatabaseInfo.h"
  19. #import "Core/FSTSnapshotVersion.h"
  20. #import "Local/FSTQueryData.h"
  21. #import "Model/FSTDatabaseID.h"
  22. #import "Model/FSTMutation.h"
  23. #import "Remote/FSTSerializerBeta.h"
  24. #import "Remote/FSTStream.h"
  25. #import "Util/FSTAssert.h"
  26. #import "Util/FSTLogger.h"
  27. #import "FSTWatchChange+Testing.h"
  28. @class GRPCProtoCall;
  29. NS_ASSUME_NONNULL_BEGIN
  30. #pragma mark - FSTMockWatchStream
  /**
   * Watch stream subclass used by FSTMockDatastore. Its implementation in this file keeps all
   * state in memory: whether the stream is considered open, which targets are being listened to,
   * and the delegate that receives injected events.
   */
  @interface FSTMockWatchStream : FSTWatchStream

  /** Delegate that receives open / interrupted / change callbacks. Held weakly. */
  @property(nullable, nonatomic, weak, readwrite) id<FSTWatchStreamDelegate> delegate;

  /** Queries currently being watched, keyed by boxed target ID. */
  @property(nonatomic, strong, readonly)
      NSMutableDictionary<FSTBoxedTargetID *, FSTQueryData *> *activeTargets;

  /** Backing flag reported by both -isOpen and -isStarted. */
  @property(nonatomic, assign) BOOL open;

  /** Designated initializer; forwards all arguments to the superclass. */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                        serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;

  /** Marked unavailable; use the serializer-based initializer instead. */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
              responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;

  @end
  /**
   * In-memory watch stream for tests. Listen and unlisten requests are only recorded in
   * `activeTargets`; watch changes and stream failures are injected by the test through
   * -writeWatchChange:snapshotVersion: and -failStreamWithError:.
   */
  @implementation FSTMockWatchStream

  /**
   * Initializes the mock stream with an empty set of active targets.
   *
   * @param database Connection info forwarded to the superclass. Must not be nil.
   * @param workerDispatchQueue Queue forwarded to the superclass.
   * @param credentials Credentials provider forwarded to the superclass.
   * @param serializer Serializer forwarded to the superclass.
   */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                        serializer:(FSTSerializerBeta *)serializer {
    // Check the precondition before the value is handed to the superclass, so a nil database is
    // reported here rather than after it has already been consumed (or not at all if the
    // superclass initializer returns nil).
    FSTAssert(database, @"Database must not be nil");

    self = [super initWithDatabase:database
               workerDispatchQueue:workerDispatchQueue
                       credentials:credentials
                        serializer:serializer];
    if (self) {
      _activeTargets = [NSMutableDictionary dictionary];
    }
    return self;
  }

  #pragma mark - Overridden FSTWatchStream methods.

  /** Marks the stream open, stores the delegate and immediately reports the stream as opened. */
  - (void)startWithDelegate:(id<FSTWatchStreamDelegate>)delegate {
    FSTAssert(!self.open, @"Trying to start already started watch stream");
    self.open = YES;
    self.delegate = delegate;
    [self notifyStreamOpen];
  }

  /**
   * Detaches the delegate so no further callbacks are delivered.
   *
   * NOTE(review): `open` is left untouched here, so -isOpen / -isStarted keep returning their
   * previous value after a stop. Confirm that callers never restart a stopped mock stream.
   */
  - (void)stop {
    self.delegate = nil;
  }

  /** Returns the `open` flag. */
  - (BOOL)isOpen {
    return self.open;
  }

  /** Returns the `open` flag; the mock does not distinguish "started" from "open". */
  - (BOOL)isStarted {
    return self.open;
  }

  /** Tells the delegate that the stream opened. */
  - (void)notifyStreamOpen {
    [self.delegate watchStreamDidOpen];
  }

  /** Tells the delegate that the stream was interrupted, passing `error` through unchanged. */
  - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
    [self.delegate watchStreamWasInterruptedWithError:error];
  }

  /** Records `query` as an active target instead of sending it anywhere. */
  - (void)watchQuery:(FSTQueryData *)query {
    FSTLog(@"watchQuery: %d: %@", query.targetID, query.query);
    // Snapshot version is ignored on the wire
    FSTQueryData *sentQueryData =
        [query queryDataByReplacingSnapshotVersion:[FSTSnapshotVersion noVersion]
                                       resumeToken:query.resumeToken];
    self.activeTargets[@(query.targetID)] = sentQueryData;
  }

  /** Drops `targetID` from the set of active targets. */
  - (void)unwatchTargetID:(FSTTargetID)targetID {
    FSTLog(@"unwatchTargetID: %d", targetID);
    [self.activeTargets removeObjectForKey:@(targetID)];
  }

  /** Marks the stream closed and reports the interruption, with `error`, to the delegate. */
  - (void)failStreamWithError:(NSError *)error {
    self.open = NO;
    [self notifyStreamInterruptedWithError:error];
  }

  #pragma mark - Helper methods.

  /**
   * Delivers `change` to the delegate as if it had arrived from the backend.
   *
   * If `change` is a target change that carries a cause, every target it names is first removed
   * from `activeTargets`; naming a target that is not active fails the test.
   *
   * @param change The watch change to deliver.
   * @param snap The snapshot version passed to the delegate together with the change.
   */
  - (void)writeWatchChange:(FSTWatchChange *)change snapshotVersion:(FSTSnapshotVersion *)snap {
    if ([change isKindOfClass:[FSTWatchTargetChange class]]) {
      FSTWatchTargetChange *targetChange = (FSTWatchTargetChange *)change;
      if (targetChange.cause) {
        for (NSNumber *targetID in targetChange.targetIDs) {
          if (!self.activeTargets[targetID]) {
            // Technically removing an unknown target is valid (e.g. it could race with a
            // server-side removal), but we want to pay extra careful attention in tests
            // that we only remove targets we listened to.
            FSTFail(@"Removing a non-active target");
          }
          [self.activeTargets removeObjectForKey:targetID];
        }
      }
    }
    [self.delegate watchStreamDidChange:change snapshotVersion:snap];
  }

  @end
  117. #pragma mark - FSTMockWriteStream
  /**
   * Write stream subclass used by FSTMockDatastore. Its implementation in this file queues the
   * mutation batches it is asked to write so that tests can inspect them.
   */
  @interface FSTMockWriteStream : FSTWriteStream

  /** Delegate that receives open / handshake / response / interrupted callbacks. Held weakly. */
  @property(nullable, nonatomic, weak, readwrite) id<FSTWriteStreamDelegate> delegate;

  /** Mutation batches passed to -writeMutations:, oldest first. */
  @property(nonatomic, strong, readonly) NSMutableArray<NSArray<FSTMutation *> *> *sentMutations;

  /** Backing flag reported by both -isOpen and -isStarted. */
  @property(nonatomic, assign) BOOL open;

  /** Designated initializer; forwards all arguments to the superclass. */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                        serializer:(FSTSerializerBeta *)serializer NS_DESIGNATED_INITIALIZER;

  /** Marked unavailable; use the serializer-based initializer instead. */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
              responseMessageClass:(Class)responseMessageClass NS_UNAVAILABLE;

  @end
  /**
   * In-memory write stream for tests. Mutation batches are queued in `sentMutations` instead of
   * being sent, and acknowledgements or failures are injected by the test.
   */
  @implementation FSTMockWriteStream

  /**
   * Initializes the mock stream with an empty queue of sent mutations.
   *
   * All arguments are forwarded unchanged to the superclass initializer.
   */
  - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database
               workerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue
                       credentials:(id<FSTCredentialsProvider>)credentials
                        serializer:(FSTSerializerBeta *)serializer {
    self = [super initWithDatabase:database
               workerDispatchQueue:workerDispatchQueue
                       credentials:credentials
                        serializer:serializer];
    if (self) {
      _sentMutations = [NSMutableArray array];
    }
    return self;
  }

  #pragma mark - Overridden FSTWriteStream methods.

  /**
   * Marks the stream open, discards any previously queued writes, stores the delegate and
   * immediately reports the stream as opened.
   */
  - (void)startWithDelegate:(id<FSTWriteStreamDelegate>)delegate {
    FSTAssert(!self.open, @"Trying to start already started write stream");
    self.open = YES;
    [self.sentMutations removeAllObjects];
    self.delegate = delegate;
    [self notifyStreamOpen];
  }

  /** Detaches the delegate so no further callbacks are delivered. `open` is not modified. */
  - (void)stop {
    self.delegate = nil;
  }

  /** Returns the `open` flag. */
  - (BOOL)isOpen {
    return self.open;
  }

  /** Returns the `open` flag; the mock does not distinguish "started" from "open". */
  - (BOOL)isStarted {
    return self.open;
  }

  /** Completes the handshake synchronously and informs the delegate. */
  - (void)writeHandshake {
    self.handshakeComplete = YES;
    [self.delegate writeStreamDidCompleteHandshake];
  }

  /** Queues `mutations` as one sent batch; nothing is transmitted. */
  - (void)writeMutations:(NSArray<FSTMutation *> *)mutations {
    [self.sentMutations addObject:mutations];
  }

  /** Tells the delegate that the stream opened. */
  - (void)notifyStreamOpen {
    [self.delegate writeStreamDidOpen];
  }

  /** Tells the delegate that the stream was interrupted, passing `error` through unchanged. */
  - (void)notifyStreamInterruptedWithError:(nullable NSError *)error {
    [self.delegate writeStreamWasInterruptedWithError:error];
  }

  #pragma mark - Helper methods.

  /** Injects a write ack as though it had come from the backend in response to a write. */
  - (void)ackWriteWithVersion:(FSTSnapshotVersion *)commitVersion
              mutationResults:(NSArray<FSTMutationResult *> *)results {
    [self.delegate writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results];
  }

  /**
   * Injects a failed write response as though it had come from the backend.
   *
   * @param error The error reported to the delegate. May be nil: the value is only forwarded to
   *     -notifyStreamInterruptedWithError:, which accepts nil, and FSTMockDatastore's
   *     -failWriteWithError: passes its nullable argument straight through to this method.
   */
  - (void)failStreamWithError:(nullable NSError *)error {
    self.open = NO;
    [self notifyStreamInterruptedWithError:error];
  }

  /**
   * Returns the next write that was "sent to the backend" and removes it from the queue, failing
   * if there are no queued sent writes.
   */
  - (NSArray<FSTMutation *> *)nextSentWrite {
    FSTAssert(self.sentMutations.count > 0,
              @"Writes need to happen before you can call nextSentWrite.");
    NSArray<FSTMutation *> *result = self.sentMutations[0];
    [self.sentMutations removeObjectAtIndex:0];
    return result;
  }

  /**
   * Returns the number of mutations that have been sent to the backend but not retrieved via
   * nextSentWrite yet.
   */
  - (int)sentMutationsCount {
    return (int)self.sentMutations.count;
  }

  @end
  204. #pragma mark - FSTMockDatastore
  /** Private state of FSTMockDatastore. */
  @interface FSTMockDatastore ()

  /** Properties implemented in FSTDatastore that are nonpublic. */
  @property(nonatomic, strong, readonly) id<FSTCredentialsProvider> credentials;
  @property(nonatomic, strong, readonly) FSTDispatchQueue *workerDispatchQueue;

  /** Stream returned by the most recent -createWriteStream call; nil until then. */
  @property(nullable, nonatomic, strong) FSTMockWriteStream *writeStream;

  /** Stream returned by the most recent -createWatchStream call; nil until then. */
  @property(nullable, nonatomic, strong) FSTMockWatchStream *watchStream;

  @end
  /**
   * Datastore for tests. It hands out FSTMockWatchStream / FSTMockWriteStream instances, keeps a
   * reference to the most recently created one of each, and exposes helpers that let a test
   * inspect sent writes and inject backend responses.
   */
  @implementation FSTMockDatastore

  /**
   * Creates a mock datastore with fixed placeholder connection info and an empty credentials
   * provider.
   *
   * @param workerDispatchQueue Queue passed to the datastore initializer.
   */
  + (instancetype)mockDatastoreWithWorkerDispatchQueue:(FSTDispatchQueue *)workerDispatchQueue {
    FSTDatabaseID *databaseID = [FSTDatabaseID databaseIDWithProject:@"project"
                                                            database:@"database"];
    FSTDatabaseInfo *databaseInfo = [FSTDatabaseInfo databaseInfoWithDatabaseID:databaseID
                                                                 persistenceKey:@"persistence"
                                                                           host:@"host"
                                                                     sslEnabled:NO];
    FSTEmptyCredentialsProvider *credentials = [[FSTEmptyCredentialsProvider alloc] init];

    return [[FSTMockDatastore alloc] initWithDatabaseInfo:databaseInfo
                                      workerDispatchQueue:workerDispatchQueue
                                              credentials:credentials];
  }

  #pragma mark - Overridden FSTDatastore methods.

  /** Creates a new mock watch stream, remembers it as the current one and returns it. */
  - (FSTWatchStream *)createWatchStream {
    FSTAssert(self.databaseInfo, @"DatabaseInfo must not be nil");
    FSTSerializerBeta *serializer =
        [[FSTSerializerBeta alloc] initWithDatabaseID:self.databaseInfo.databaseID];
    self.watchStream = [[FSTMockWatchStream alloc] initWithDatabase:self.databaseInfo
                                                workerDispatchQueue:self.workerDispatchQueue
                                                        credentials:self.credentials
                                                         serializer:serializer];
    return self.watchStream;
  }

  /** Creates a new mock write stream, remembers it as the current one and returns it. */
  - (FSTWriteStream *)createWriteStream {
    FSTAssert(self.databaseInfo, @"DatabaseInfo must not be nil");
    FSTSerializerBeta *serializer =
        [[FSTSerializerBeta alloc] initWithDatabaseID:self.databaseInfo.databaseID];
    self.writeStream = [[FSTMockWriteStream alloc] initWithDatabase:self.databaseInfo
                                                workerDispatchQueue:self.workerDispatchQueue
                                                        credentials:self.credentials
                                                         serializer:serializer];
    return self.writeStream;
  }

  /** Always fails: the mock datastore is never expected to start an RPC. */
  - (void)authorizeAndStartRPC:(GRPCProtoCall *)rpc completion:(FSTVoidErrorBlock)completion {
    FSTFail(@"FSTMockDatastore shouldn't be starting any RPCs.");
  }

  #pragma mark - Method exposed for tests to call.

  /** Dequeues and returns the oldest batch queued on the current write stream. */
  - (NSArray<FSTMutation *> *)nextSentWrite {
    return [self.writeStream nextSentWrite];
  }

  /** Returns how many queued batches on the current write stream have not been dequeued yet. */
  - (int)writesSent {
    return [self.writeStream sentMutationsCount];
  }

  /** Injects a write acknowledgement into the current write stream. */
  - (void)ackWriteWithVersion:(FSTSnapshotVersion *)commitVersion
              mutationResults:(NSArray<FSTMutationResult *> *)results {
    [self.writeStream ackWriteWithVersion:commitVersion mutationResults:results];
  }

  /** Fails the current write stream, reporting `error` (which may be nil) to its delegate. */
  - (void)failWriteWithError:(NSError *_Nullable)error {
    [self.writeStream failStreamWithError:error];
  }

  /** Injects an "added" target change without a cause for `targetIDs`, at no snapshot version. */
  - (void)writeWatchTargetAddedWithTargetIDs:(NSArray<FSTBoxedTargetID *> *)targetIDs {
    FSTWatchTargetChange *change =
        [FSTWatchTargetChange changeWithState:FSTWatchTargetChangeStateAdded
                                    targetIDs:targetIDs
                                        cause:nil];
    [self writeWatchChange:change snapshotVersion:[FSTSnapshotVersion noVersion]];
  }

  /** Injects a "current" target change for `targetIDs` carrying `resumeToken`. */
  - (void)writeWatchCurrentWithTargetIDs:(NSArray<FSTBoxedTargetID *> *)targetIDs
                         snapshotVersion:(FSTSnapshotVersion *)snapshotVersion
                             resumeToken:(NSData *)resumeToken {
    FSTWatchTargetChange *change =
        [FSTWatchTargetChange changeWithState:FSTWatchTargetChangeStateCurrent
                                    targetIDs:targetIDs
                                  resumeToken:resumeToken];
    [self writeWatchChange:change snapshotVersion:snapshotVersion];
  }

  /** Forwards `change` and `snap` to the current watch stream for delivery to its delegate. */
  - (void)writeWatchChange:(FSTWatchChange *)change snapshotVersion:(FSTSnapshotVersion *)snap {
    [self.watchStream writeWatchChange:change snapshotVersion:snap];
  }

  /** Fails the current watch stream, reporting `error` to its delegate. */
  - (void)failWatchStreamWithError:(NSError *)error {
    [self.watchStream failStreamWithError:error];
  }

  /**
   * Returns an immutable snapshot of the targets the current watch stream is listening to.
   *
   * The result is never nil: when no watch stream has been created yet an empty dictionary is
   * returned, which keeps the nonnull return type honest.
   */
  - (NSDictionary<FSTBoxedTargetID *, FSTQueryData *> *)activeTargets {
    // Messaging a nil watchStream yields nil, so fall back to an empty dictionary.
    return [self.watchStream.activeTargets copy] ?: @{};
  }

  /** Returns whether the current watch stream reports itself open; NO if there is none. */
  - (BOOL)isWatchStreamOpen {
    return self.watchStream.isOpen;
  }

  @end
  291. NS_ASSUME_NONNULL_END