// FSyncTree.m
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FSyncTree.h"
  17. #import "FListenProvider.h"
  18. #import "FWriteTree.h"
  19. #import "FNode.h"
  20. #import "FPath.h"
  21. #import "FEventRegistration.h"
  22. #import "FImmutableTree.h"
  23. #import "FOperation.h"
  24. #import "FWriteTreeRef.h"
  25. #import "FOverwrite.h"
  26. #import "FOperationSource.h"
  27. #import "FMerge.h"
  28. #import "FAckUserWrite.h"
  29. #import "FView.h"
  30. #import "FSyncPoint.h"
  31. #import "FEmptyNode.h"
  32. #import "FQueryParams.h"
  33. #import "FQuerySpec.h"
  34. #import "FSnapshotHolder.h"
  35. #import "FChildrenNode.h"
  36. #import "FTupleRemovedQueriesEvents.h"
  37. #import "FAtomicNumber.h"
  38. #import "FEventRaiser.h"
  39. #import "FListenComplete.h"
  40. #import "FSnapshotUtilities.h"
  41. #import "FCacheNode.h"
  42. #import "FUtilities.h"
  43. #import "FCompoundWrite.h"
  44. #import "FWriteRecord.h"
  45. #import "FPersistenceManager.h"
  46. #import "FKeepSyncedEventRegistration.h"
  47. #import "FServerValues.h"
  48. #import "FCompoundHash.h"
  49. #import "FRangeMerge.h"
  // Estimated serialized node size above which a listen also includes the compound hash.
  static const NSUInteger kFSizeThresholdForCompoundHash = 1024;

  /**
   * Bundles a view with the completion callback that is passed to the listen provider.
   * Declared as conforming to FSyncTreeHash; every hash query is answered from the
   * view's server cache.
   */
  @interface FListenContainer : NSObject <FSyncTreeHash>

  @property (nonatomic, strong) FView *view;
  @property (nonatomic, copy) fbt_nsarray_nsstring onComplete;

  @end

  @implementation FListenContainer

  /** Stores the view and the completion callback directly in the backing ivars. */
  - (instancetype)initWithView:(FView *)view onComplete:(fbt_nsarray_nsstring)onComplete {
      if ((self = [super init])) {
          _view = view;
          _onComplete = onComplete;
      }
      return self;
  }

  /** The server cache of the wrapped view. */
  - (id<FNode>)serverCache {
      return [self.view serverCache];
  }

  /** Compound hash built from the current server cache. */
  - (FCompoundHash *)compoundHash {
      id<FNode> cache = [self serverCache];
      return [FCompoundHash fromNode:cache];
  }

  /** Data hash of the current server cache. */
  - (NSString *)simpleHash {
      id<FNode> cache = [self serverCache];
      return [cache dataHash];
  }

  /** YES when the estimated serialized size of the server cache exceeds the threshold. */
  - (BOOL)includeCompoundHash {
      id<FNode> cache = [self serverCache];
      return [FSnapshotUtilities estimateSerializedNodeSize:cache] > kFSizeThresholdForCompoundHash;
  }

  @end
  @interface FSyncTree ()

  /// Tree of SyncPoints. A location carries a SyncPoint whenever it has at least one view.
  @property (nonatomic, strong) FImmutableTree *syncPointTree;

  /// All pending user writes (user-initiated sets, transactions, updates, etc).
  @property (nonatomic, strong) FWriteTree *pendingWriteTree;

  /// tagId -> query lookup (FTuplePathQueryParams per the original note).
  @property (nonatomic, strong) NSMutableDictionary *tagToQueryMap;

  /// Reverse lookup of tagToQueryMap: query -> tagId.
  @property (nonatomic, strong) NSMutableDictionary *queryToTagMap;

  /// Provider used to start and stop listens.
  @property (nonatomic, strong) FListenProvider *listenProvider;

  /// Persistence layer; nil when the tree is created via initWithListenProvider:.
  @property (nonatomic, strong) FPersistenceManager *persistenceManager;

  /// Source of fresh tag ids for queries that need a tag.
  @property (nonatomic, strong) FAtomicNumber *queryTagCounter;

  /// Queries currently registered through keepQuery:synced:.
  @property (nonatomic, strong) NSMutableSet *keepSyncedQueries;

  @end
  97. /**
  98. * SyncTree is the central class for managing event callback registration, data caching, views
  99. * (query processing), and event generation. There are typically two SyncTree instances for
  100. * each Repo, one for the normal Firebase data, and one for the .info data.
  101. *
  102. * It has a number of responsibilities, including:
  103. * - Tracking all user event callbacks (registered via addEventRegistration: and removeEventRegistration:).
  104. * - Applying and caching data changes for user setValue:, runTransactionBlock:, and updateChildValues: calls
  105. * (applyUserOverwriteAtPath:, applyUserMergeAtPath:).
  106. * - Applying and caching data changes for server data changes (applyServerOverwriteAtPath:,
  107. * applyServerMergeAtPath:).
  108. * - Generating user-facing events for server and user changes (all of the apply* methods
  109. * return the set of events that need to be raised as a result).
  110. * - Maintaining the appropriate set of server listens to ensure we are always subscribed
  111. * to the correct set of paths and queries to satisfy the current set of user event
  112. * callbacks (listens are started/stopped using the provided listenProvider).
  113. *
  114. * NOTE: Although SyncTree tracks event callbacks and calculates events to raise, the actual
  115. * events are returned to the caller rather than raised synchronously.
  116. */
  117. @implementation FSyncTree
  /**
   * Convenience initializer for a sync tree without persistence: forwards to
   * initWithPersistenceManager:listenProvider: with a nil persistence manager.
   */
  - (id) initWithListenProvider:(FListenProvider *)provider {
      FPersistenceManager *noPersistence = nil;
      return [self initWithPersistenceManager:noPersistence listenProvider:provider];
  }
  /**
   * Creates an empty sync tree: no sync points, no pending writes, no tags and no
   * keep-synced queries.
   * @param persistenceManager Persistence layer to mirror changes into; may be nil.
   * @param provider Provider used to start and stop listens.
   */
  - (id) initWithPersistenceManager:(FPersistenceManager *)persistenceManager listenProvider:(FListenProvider *)provider {
      self = [super init];
      if (self == nil) {
          return nil;
      }

      self.syncPointTree = [FImmutableTree empty];
      self.pendingWriteTree = [[FWriteTree alloc] init];
      self.tagToQueryMap = [NSMutableDictionary dictionary];
      self.queryToTagMap = [NSMutableDictionary dictionary];
      self.listenProvider = provider;
      self.persistenceManager = persistenceManager;
      self.queryTagCounter = [[FAtomicNumber alloc] init];
      self.keepSyncedQueries = [[NSMutableSet alloc] init];
      return self;
  }
  135. #pragma mark -
  136. #pragma mark Apply Operations
  /**
   * Records a user-generated overwrite (setValue:, runTransactionBlock:, updateChildValues:, etc.)
   * as a pending write and, when the write is visible, applies it to the sync points.
   * @param path Location being overwritten.
   * @param newData Node written at that location.
   * @param writeId Identifier under which the pending write is recorded.
   * @param visible When NO the write is only recorded and no events are produced.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) applyUserOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData writeId:(NSInteger)writeId isVisible:(BOOL)visible {
      // The write is always tracked, visible or not.
      [self.pendingWriteTree addOverwriteAtPath:path newData:newData writeId:writeId isVisible:visible];

      if (visible) {
          FOverwrite *userOverwrite = [[FOverwrite alloc] initWithSource:[FOperationSource userInstance]
                                                                    path:path
                                                                    snap:newData];
          return [self applyOperationToSyncPoints:userOverwrite];
      }
      return @[];
  }
  /**
   * Records the merge from a user-generated updateChildValues: call as a pending write and
   * applies it to the sync points.
   * @param path Location the merge is rooted at.
   * @param changedChildren The writes making up the merge.
   * @param writeId Identifier under which the pending merge is recorded.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) applyUserMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren writeId:(NSInteger)writeId {
      [self.pendingWriteTree addMergeAtPath:path changedChildren:changedChildren writeId:writeId];

      FOperationSource *userSource = [FOperationSource userInstance];
      FMerge *userMerge = [[FMerge alloc] initWithSource:userSource path:path children:changedChildren];
      return [self applyOperationToSyncPoints:userMerge];
  }
  /**
   * Acknowledges a pending user write previously registered with applyUserOverwriteAtPath: or
   * applyUserMergeAtPath:. The write is removed from the pending write tree; for a visible write
   * the persisted user write is dropped (when persist is YES) and, unless it is being reverted,
   * its resolved value is applied to the persisted server cache.
   * TODO[offline]: Taking a serverClock here is awkward, but server values are awkward. :-(
   * @param writeId Identifier of the write being acknowledged.
   * @param revert YES when the write is being reverted.
   * @param persist YES to also remove the write from the persistence manager.
   * @param clock Clock used to generate the server values that resolve deferred values.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) ackUserWriteWithWriteId:(NSInteger)writeId revert:(BOOL)revert persist:(BOOL)persist clock:(id<FClock>)clock {
      // Look the record up before removing it; it is still needed below.
      FWriteRecord *record = [self.pendingWriteTree writeForId:writeId];
      BOOL mustReevaluate = [self.pendingWriteTree removeWriteId:writeId];
      BOOL wasVisible = record.visible;

      if (wasVisible && persist) {
          [self.persistenceManager removeUserWrite:writeId];
      }

      if (wasVisible && !revert) {
          NSDictionary *serverValues = [FServerValues generateServerValues:clock];
          if ([record isOverwrite]) {
              id<FNode> resolvedNode = [FServerValues resolveDeferredValueSnapshot:record.overwrite
                                                                  withServerValues:serverValues];
              [self.persistenceManager applyUserWrite:resolvedNode toServerCacheAtPath:record.path];
          } else {
              FCompoundWrite *resolvedMerge = [FServerValues resolveDeferredValueCompoundWrite:record.merge
                                                                              withServerValues:serverValues];
              [self.persistenceManager applyUserMerge:resolvedMerge toServerCacheAtPath:record.path];
          }
      }

      if (!mustReevaluate) {
          return @[];
      }

      // Mark the locations touched by the write: the write's own path for an overwrite,
      // each written child path for a merge.
      __block FImmutableTree *affectedTree = [FImmutableTree empty];
      if (record.isOverwrite) {
          affectedTree = [affectedTree setValue:@YES atPath:[FPath empty]];
      } else {
          [record.merge enumerateWrites:^(FPath *childPath, id <FNode> node, BOOL *stop) {
              affectedTree = [affectedTree setValue:@YES atPath:childPath];
          }];
      }

      FAckUserWrite *ack = [[FAckUserWrite alloc] initWithPath:record.path
                                                  affectedTree:affectedTree
                                                        revert:revert];
      return [self applyOperationToSyncPoints:ack];
  }
  /**
   * Applies new server data at the given path: the persisted server cache for the default
   * query at that path is updated first, then the overwrite is applied to the sync points.
   * @param path Location the server data belongs to.
   * @param newData The new server node.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) applyServerOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData {
      FQuerySpec *defaultQuery = [FQuerySpec defaultQueryAtPath:path];
      [self.persistenceManager updateServerCacheWithNode:newData forQuery:defaultQuery];

      FOverwrite *serverOverwrite = [[FOverwrite alloc] initWithSource:[FOperationSource serverInstance]
                                                                  path:path
                                                                  snap:newData];
      return [self applyOperationToSyncPoints:serverOverwrite];
  }
  /**
   * Applies new server data that is to be merged in at the given path: the persisted server
   * cache is updated first, then the merge is applied to the sync points.
   * @param path Location the merge is rooted at.
   * @param changedChildren The writes making up the merge.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) applyServerMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren {
      [self.persistenceManager updateServerCacheWithMerge:changedChildren atPath:path];

      FOperationSource *serverSource = [FOperationSource serverInstance];
      FMerge *serverMerge = [[FMerge alloc] initWithSource:serverSource path:path children:changedChildren];
      return [self applyOperationToSyncPoints:serverMerge];
  }
  /**
   * Applies a list of range merges to the server cache of the complete view at the given path
   * and feeds the result back in as a server overwrite.
   * @param path Location the range merges apply to.
   * @param ranges NSArray of FRangeMerge, applied in order.
   * @return NSArray of FEvent to raise; empty when there is no sync point or no complete view.
   */
  - (NSArray *) applyServerRangeMergeAtPath:(FPath *)path updates:(NSArray *)ranges {
      // A missing sync point yields a nil view here (messaging nil), so one check covers both
      // "sync point removed" and "no complete view"; either way the update is safe to ignore.
      //
      // Any "complete" (unfiltered) view will do: if there is more than one, they should each
      // have the same cache.
      FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
      FView *completeView = [syncPoint completeView];
      if (completeView == nil) {
          return @[];
      }

      id<FNode> mergedNode = [completeView serverCache];
      for (FRangeMerge *rangeMerge in ranges) {
          mergedNode = [rangeMerge applyToNode:mergedNode];
      }
      return [self applyServerOverwriteAtPath:path newData:mergedNode];
  }
  /**
   * Applies a listen complete to a path: the default query at that path is marked complete in
   * the persistence manager, then a listen-complete operation is applied to the sync points.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) applyListenCompleteAtPath:(FPath *)path {
      FQuerySpec *defaultQuery = [FQuerySpec defaultQueryAtPath:path];
      [self.persistenceManager setQueryComplete:defaultQuery];

      FOperationSource *serverSource = [FOperationSource serverInstance];
      id<FOperation> listenComplete = [[FListenComplete alloc] initWithSource:serverSource path:path];
      return [self applyOperationToSyncPoints:listenComplete];
  }
  /**
   * Applies a listen complete for the tagged query identified by tagId. The operation path is
   * expressed relative to the query's own path.
   * @param path Location the listen complete was reported for.
   * @param tagId Tag of the query the listen complete belongs to.
   * @return NSArray of FEvent to raise; empty when the tag no longer maps to a query.
   */
  - (NSArray *) applyTaggedListenCompleteAtPath:(FPath *)path tagId:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self queryForTag:tagId];
      if (taggedQuery == nil) {
          // We've already removed the query. No big deal, ignore the update.
          return @[];
      }

      [self.persistenceManager setQueryComplete:taggedQuery];
      FPath *relativePath = [FPath relativePathFrom:taggedQuery.path to:path];
      FOperationSource *taggedSource = [FOperationSource forServerTaggedQuery:taggedQuery.params];
      id<FOperation> listenComplete = [[FListenComplete alloc] initWithSource:taggedSource path:relativePath];
      return [self applyTaggedOperation:listenComplete atPath:taggedQuery.path];
  }
  /**
   * Internal helper: applies a tagged operation to the sync point stored at the given path,
   * using the pending child writes for that path and no server cache.
   * @param operation The tagged operation to apply.
   * @param path Path of the query the operation is tagged for; a sync point must exist there.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) applyTaggedOperation:(id<FOperation>)operation atPath:(FPath *)path {
      FSyncPoint *taggedSyncPoint = [self.syncPointTree valueAtPath:path];
      NSAssert(taggedSyncPoint != nil, @"Missing sync point for query tag that we're tracking.");

      FWriteTreeRef *pendingWrites = [self.pendingWriteTree childWritesForPath:path];
      return [taggedSyncPoint applyOperation:operation writesCache:pendingWrites serverCache:nil];
  }
  /**
   * Applies new server data for the tagged query identified by tagId.
   * @param path Location the data was delivered for.
   * @param newData The new server node.
   * @param tagId Tag of the query the data belongs to.
   * @return NSArray of FEvent to raise; empty when the tag no longer maps to a query.
   */
  - (NSArray *) applyTaggedQueryOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData tagId:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self queryForTag:tagId];
      if (taggedQuery == nil) {
          // Query must have been removed already
          return @[];
      }

      FPath *relativePath = [FPath relativePathFrom:taggedQuery.path to:path];

      // Data delivered at the query's own location is persisted against the query itself;
      // data at any other location is persisted against the default query at that path.
      FQuerySpec *queryToOverwrite;
      if ([relativePath isEmpty]) {
          queryToOverwrite = taggedQuery;
      } else {
          queryToOverwrite = [FQuerySpec defaultQueryAtPath:path];
      }
      [self.persistenceManager updateServerCacheWithNode:newData forQuery:queryToOverwrite];

      FOperationSource *taggedSource = [FOperationSource forServerTaggedQuery:taggedQuery.params];
      FOverwrite *overwrite = [[FOverwrite alloc] initWithSource:taggedSource path:relativePath snap:newData];
      return [self applyTaggedOperation:overwrite atPath:taggedQuery.path];
  }
  /**
   * Applies server data that is to be merged in for the tagged query identified by tagId.
   * @param path Location the merge is rooted at.
   * @param changedChildren The writes making up the merge.
   * @param tagId Tag of the query the merge belongs to.
   * @return NSArray of FEvent to raise; empty when the tag no longer maps to a query.
   */
  - (NSArray *) applyTaggedQueryMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren tagId:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self queryForTag:tagId];
      if (taggedQuery == nil) {
          // We've already removed the query. No big deal, ignore the update.
          return @[];
      }

      FPath *relativePath = [FPath relativePathFrom:taggedQuery.path to:path];
      // Persistence is updated at the absolute path; the operation uses the relative one.
      [self.persistenceManager updateServerCacheWithMerge:changedChildren atPath:path];

      FOperationSource *taggedSource = [FOperationSource forServerTaggedQuery:taggedQuery.params];
      FMerge *merge = [[FMerge alloc] initWithSource:taggedSource path:relativePath children:changedChildren];
      return [self applyTaggedOperation:merge atPath:taggedQuery.path];
  }
  /**
   * Applies a list of range merges to the server cache of the view for the tagged query and
   * feeds the result back in as a tagged overwrite.
   * @param path Must equal the path of the tagged query (asserted).
   * @param ranges NSArray of FRangeMerge, applied in order.
   * @param tagId Tag of the query the range merges belong to.
   * @return NSArray of FEvent to raise; empty when the tag no longer maps to a query.
   */
  - (NSArray *) applyTaggedServerRangeMergeAtPath:(FPath *)path updates:(NSArray *)ranges tagId:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self queryForTag:tagId];
      if (taggedQuery == nil) {
          // We've already removed the query. No big deal, ignore the update.
          return @[];
      }

      NSAssert([path isEqual:taggedQuery.path], @"Tagged update path and query path must match");
      FSyncPoint *taggedSyncPoint = [self.syncPointTree valueAtPath:path];
      NSAssert(taggedSyncPoint != nil, @"Missing sync point for query tag that we're tracking.");
      FView *taggedView = [taggedSyncPoint viewForQuery:taggedQuery];
      NSAssert(taggedView != nil, @"Missing view for query tag that we're tracking");

      id<FNode> mergedNode = [taggedView serverCache];
      for (FRangeMerge *rangeMerge in ranges) {
          mergedNode = [rangeMerge applyToNode:mergedNode];
      }
      return [self applyTaggedQueryOverwriteAtPath:path newData:mergedNode tagId:tagId];
  }
  /**
   * Adds an event callback for the specified query, creating the sync point and the view when
   * they do not exist yet, and starting a listen when no sync point on the path to the query
   * already has a complete view.
   * @param eventRegistration The callback registration to add.
   * @param query The query the callback is registered for.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) addEventRegistration:(id<FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query {
      FPath *path = query.path;

      // Walk the sync points on the path to the query until one with a complete view is found.
      __block BOOL coveredByCompleteView = NO;
      [self.syncPointTree forEachOnPath:query.path whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *candidate) {
          if (!coveredByCompleteView) {
              coveredByCompleteView = [candidate hasCompleteView];
          }
          return !coveredByCompleteView;
      }];

      [self.persistenceManager setQueryActive:query];

      FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
      if (syncPoint == nil) {
          syncPoint = [[FSyncPoint alloc] initWithPersistenceManager:self.persistenceManager];
          self.syncPointTree = [self.syncPointTree setValue:syncPoint atPath:path];
      }

      if ([syncPoint viewExistsForQuery:query]) {
          return [syncPoint addEventRegistration:eventRegistration forExistingViewForQuery:query];
      }

      if (![query loadsAllData]) {
          // We need to track a tag for this query
          NSAssert(self.queryToTagMap[query] == nil, @"View does not exist, but we have a tag");
          NSNumber *newTag = [self.queryTagCounter getAndIncrement];
          self.queryToTagMap[query] = newTag;
          self.tagToQueryMap[newTag] = query;
      }

      FWriteTreeRef *pendingWrites = [self.pendingWriteTree childWritesForPath:path];
      FCacheNode *serverCache = [self serverCacheForQuery:query];
      NSArray *events = [syncPoint addEventRegistration:eventRegistration
                             forNonExistingViewForQuery:query
                                            writesCache:pendingWrites
                                            serverCache:serverCache];
      if (coveredByCompleteView) {
          return events;
      }

      // There was no view and no default listen, so a listener has to be set up.
      FView *newView = [syncPoint viewForQuery:query];
      NSMutableArray *allEvents = [events mutableCopy];
      [allEvents addObjectsFromArray:[self setupListenerOnQuery:query view:newView]];
      return allEvents;
  }
  /**
   * Builds the server cache a new view for the query starts from. Sources, in order:
   *  1. a complete server cache from a sync point on the path to the query (fully initialized);
   *  2. the persisted server cache, when it is fully initialized;
   *  3. otherwise a not-fully-initialized node assembled from complete caches of immediate
   *     child sync points, filled up with persisted children not already present.
   * The returned cache is never marked as filtered in cases 1 and 3.
   */
  - (FCacheNode *)serverCacheForQuery:(FQuerySpec *)query {
      __block id<FNode> completeNode = nil;
      [self.syncPointTree forEachOnPath:query.path whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
          FPath *relativePath = [FPath relativePathFrom:pathToSyncPoint to:query.path];
          completeNode = [syncPoint completeServerCacheAtPath:relativePath];
          // Keep walking only while nothing complete has been found.
          return completeNode == nil;
      }];

      if (completeNode != nil) {
          FIndexedNode *indexedComplete = [FIndexedNode indexedNodeWithNode:completeNode index:query.index];
          return [[FCacheNode alloc] initWithIndexedNode:indexedComplete isFullyInitialized:YES isFiltered:NO];
      }

      FCacheNode *persistedCache = [self.persistenceManager serverCacheForQuery:query];
      if (persistedCache.isFullyInitialized) {
          return persistedCache;
      }

      // Start from the complete caches of the immediate child sync points...
      __block id<FNode> assembledNode = [FEmptyNode emptyNode];
      FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:query.path];
      [subtree forEachChild:^(NSString *childKey, FSyncPoint *childSyncPoint) {
          id<FNode> childCache = [childSyncPoint completeServerCacheAtPath:[FPath empty]];
          if (childCache) {
              assembledNode = [assembledNode updateImmediateChild:childKey withNewChild:childCache];
          }
      }];

      // ...then fill the node with any available persisted children we do not have yet.
      [persistedCache.node enumerateChildrenUsingBlock:^(NSString *key, id<FNode> node, BOOL *stop) {
          if (![assembledNode hasChild:key]) {
              assembledNode = [assembledNode updateImmediateChild:key withNewChild:node];
          }
      }];

      FIndexedNode *indexedPartial = [FIndexedNode indexedNodeWithNode:assembledNode index:query.index];
      return [[FCacheNode alloc] initWithIndexedNode:indexedPartial isFullyInitialized:NO isFiltered:NO];
  }
  /**
   * Remove event callback(s).
   *
   * If query is the default query, all queries at that location are checked for the specified
   * eventRegistration. If eventRegistration is nil, all callbacks for the specified
   * query/queries are removed.
   *
   * @param eventRegistration if nil, all callbacks are removed
   * @param query The query whose callback(s) should be removed
   * @param cancelError If provided, appropriate cancel events will be returned
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *) removeEventRegistration:(id <FEventRegistration>)eventRegistration
                               forQuery:(FQuerySpec *)query
                            cancelError:(NSError *)cancelError {
      // Find the syncPoint first. Then deal with whether or not it has matching listeners.
      FPath *path = query.path;
      FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
      if (syncPoint == nil) {
          // No-op, this listener must've been already removed
          return @[];
      }

      // A removal on a default query affects all queries at that location. A removal on an
      // indexed query, even one without other query constraints, does *not* affect all queries
      // at that location. So this check must be for 'default', and not loadsAllData:
      if (![query isDefault] && ![syncPoint viewExistsForQuery:query]) {
          // No-op, this listener must've been already removed
          return @[];
      }

      FTupleRemovedQueriesEvents *removal = [syncPoint removeEventRegistration:eventRegistration
                                                                      forQuery:query
                                                                   cancelError:cancelError];
      if ([syncPoint isEmpty]) {
          self.syncPointTree = [self.syncPointTree removeValueAtPath:path];
      }
      NSArray *removedQueries = removal.removedQueries;
      NSArray *cancelEvents = removal.cancelEvents;

      // We may have just removed one of many listeners and can short-circuit this whole process.
      // We may also not have removed a default listener, in which case all of the descendant
      // listeners should already be properly set up.
      //
      // Since indexed queries can shadow if they don't have other query constraints, check for
      // loadsAllData: instead of isDefault:
      NSUInteger defaultQueryIndex = [removedQueries indexOfObjectPassingTest:^BOOL(FQuerySpec *candidate, NSUInteger idx, BOOL *stop) {
          return [candidate loadsAllData];
      }];
      BOOL removingDefault = (defaultQueryIndex != NSNotFound);

      for (FQuerySpec *inactiveQuery in removedQueries) {
          [self.persistenceManager setQueryInactive:inactiveQuery];
      }

      NSNumber *coveredValue = [self.syncPointTree findOnPath:path andApplyBlock:^id(FPath *relativePath, FSyncPoint *parentSyncPoint) {
          return [NSNumber numberWithBool:[parentSyncPoint hasCompleteView]];
      }];
      BOOL isCovered = [coveredValue boolValue];

      if (removingDefault && !isCovered) {
          // There are potentially child listeners. Determine what, if any, listens we need to
          // send before executing the removal.
          FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
          if (![subtree isEmpty]) {
              // Fold over the subtree to collect the listeners to send, then set them up.
              // (When the subtree is empty there is nothing below us to start listening on.)
              NSArray *viewsToListen = [self collectDistinctViewsForSubTree:subtree];
              for (FView *viewToListen in viewsToListen) {
                  FQuerySpec *newQuery = viewToListen.query;
                  FListenContainer *listenContainer = [self createListenerForView:viewToListen];
                  self.listenProvider.startListening([self queryForListening:newQuery], [self tagForQuery:newQuery],
                                                     listenContainer, listenContainer.onComplete);
              }
          }
      }

      // If we removed anything and we're not covered by a higher up listen, we need to stop
      // listening on this query. The block above makes sure we're set up on listens lower in
      // the tree. Also, note that if we have a cancelError, it's already been removed at the
      // provider level.
      if (!isCovered && [removedQueries count] > 0 && cancelError == nil) {
          if (removingDefault) {
              // If we removed a default, then we weren't listening on any of the other queries
              // here. Just cancel the one default; default listeners are not tagged.
              self.listenProvider.stopListening([self queryForListening:query], nil);
          } else {
              // Otherwise cancel each removed query individually, with its tag.
              for (FQuerySpec *queryToRemove in removedQueries) {
                  NSNumber *tagToRemove = [self.queryToTagMap objectForKey:queryToRemove];
                  self.listenProvider.stopListening([self queryForListening:queryToRemove], tagToRemove);
              }
          }
      }

      // Now, clear all the tags we're tracking for the removed listens.
      [self removeTags:removedQueries];
      return cancelEvents;
  }
  /**
   * Turns keep-synced on or off for a query by adding or removing the shared
   * FKeepSyncedEventRegistration instance. Does nothing when the query is already in the
   * requested state. Any events returned by the add/remove call are not used here.
   */
  - (void)keepQuery:(FQuerySpec *)query synced:(BOOL)keepSynced {
      BOOL alreadySynced = [self.keepSyncedQueries containsObject:query];

      if (keepSynced && !alreadySynced) {
          [self addEventRegistration:[FKeepSyncedEventRegistration instance] forQuery:query];
          [self.keepSyncedQueries addObject:query];
          return;
      }

      if (!keepSynced && alreadySynced) {
          [self removeEventRegistration:[FKeepSyncedEventRegistration instance] forQuery:query cancelError:nil];
          [self.keepSyncedQueries removeObject:query];
      }
  }
  /**
   * Removes every pending user write, both persisted and in memory. When any write was
   * removed, a reverting ack rooted at the empty path is applied to the sync points.
   * @return NSArray of FEvent to raise; empty when there were no pending writes.
   */
  - (NSArray *) removeAllWrites {
      [self.persistenceManager removeAllUserWrites];
      NSArray *removedWrites = [self.pendingWriteTree removeAllWrites];
      if (removedWrites.count == 0) {
          return @[];
      }

      FPath *rootPath = [FPath empty];
      FImmutableTree *affectedTree = [[FImmutableTree empty] setValue:@YES atPath:[FPath empty]];
      FAckUserWrite *revertAll = [[FAckUserWrite alloc] initWithPath:rootPath
                                                        affectedTree:affectedTree
                                                              revert:YES];
      return [self applyOperationToSyncPoints:revertAll];
  }
  /**
   * Returns a complete cache, if we have one, of the data at a particular path. The location
   * must have a listener above it, but as this is only used by transaction code, that should
   * always be the case anyways.
   *
   * Note: hidden writes are always *included* (includeHiddenWrites is passed as YES), e.g. those
   * from a transaction with applyLocally set to false.
   * @param path The path to the data we want
   * @param writeIdsToExclude A specific set to be excluded
   */
  - (id <FNode>) calcCompleteEventCacheAtPath:(FPath *)path excludeWriteIds:(NSArray *)writeIdsToExclude {
      // Ask the sync points found on the way to `path` for a complete server cache of it.
      id<FNode> completeServerCache = [self.syncPointTree findOnPath:path andApplyBlock:^id<FNode>(FPath *pathSoFar, FSyncPoint *syncPoint) {
          FPath *relativePath = [FPath relativePathFrom:pathSoFar to:path];
          return [syncPoint completeServerCacheAtPath:relativePath];
      }];

      return [self.pendingWriteTree calculateCompleteEventCacheAtPath:path
                                                  completeServerCache:completeServerCache
                                                      excludeWriteIds:writeIdsToExclude
                                                  includeHiddenWrites:YES];
  }
  536. #pragma mark -
  537. #pragma mark Private Methods
  /**
   * This collapses multiple unfiltered views into a single view, since we only need a single
   * listener for them. A sync point with a complete view contributes just that view;
   * otherwise its query views are combined with the views collected from its children.
   * @param subtree Tree of FSyncPoint to fold over.
   * @return NSArray of FView
   */
  - (NSArray *) collectDistinctViewsForSubTree:(FImmutableTree *)subtree {
      return [subtree foldWithBlock:^NSArray *(FPath *relativePath, FSyncPoint *maybeChildSyncPoint, NSDictionary *childMap) {
          if (maybeChildSyncPoint && [maybeChildSyncPoint hasCompleteView]) {
              FView *completeView = [maybeChildSyncPoint completeView];
              return @[completeView];
          }

          // No complete view here, flatten any deeper listens into an array.
          // Accumulate into one array rather than replacing it with a mutableCopy of
          // queryViews: a nil queryViews would otherwise turn `views` into nil and the
          // children's views would be silently dropped.
          NSMutableArray *views = [[NSMutableArray alloc] init];
          NSArray *queryViews = [maybeChildSyncPoint queryViews];
          if (queryViews != nil) {
              [views addObjectsFromArray:queryViews];
          }
          [childMap enumerateKeysAndObjectsUsingBlock:^(NSString *childKey, NSArray *childViews, BOOL *stop) {
              [views addObjectsFromArray:childViews];
          }];
          return views;
      }];
  }
  /**
   * Forgets the tags tracked for the given queries. Queries that load all data are skipped
   * (they are not given a tag when their view is created).
   * @param queries NSArray of FQuerySpec
   */
  - (void) removeTags:(NSArray *)queries {
      for (FQuerySpec *removedQuery in queries) {
          if ([removedQuery loadsAllData]) {
              continue;
          }
          // We should have a tag for this
          NSNumber *removedQueryTag = self.queryToTagMap[removedQuery];
          [self.queryToTagMap removeObjectForKey:removedQuery];
          // -removeObjectForKey: raises NSInvalidArgumentException for a nil key, so only
          // touch the reverse map when a tag was actually recorded.
          if (removedQueryTag != nil) {
              [self.tagToQueryMap removeObjectForKey:removedQueryTag];
          }
      }
  }
  /**
   * Maps a query to the query that is actually listened on: a non-default query that loads
   * all data is replaced by the default query at the same path; anything else is returned
   * unchanged.
   */
  - (FQuerySpec *) queryForListening:(FQuerySpec *)query {
      // We treat queries that load all data as default queries
      BOOL listenAsDefault = [query loadsAllData] && ![query isDefault];
      return listenAsDefault ? [FQuerySpec defaultQueryAtPath:query.path] : query;
  }
  /**
   * For a given new listen, manage the de-duplication of outstanding subscriptions: the
   * listen for the query is started, and when the query carries no tag (a default listener)
   * every listen at or below its location is stopped because the new one shadows it.
   * @param query The query to start listening on.
   * @param view The view backing the query; used to build the listen container.
   * @return NSArray of FEvent events to support synchronous data sources
   */
  - (NSArray *) setupListenerOnQuery:(FQuerySpec *)query view:(FView *)view {
      NSNumber *tagId = [self tagForQuery:query];
      FListenContainer *listenContainer = [self createListenerForView:view];
      NSArray *events = self.listenProvider.startListening([self queryForListening:query], tagId, listenContainer,
                                                           listenContainer.onComplete);

      // The root of this subtree has our query. We're here because we definitely need to send a
      // listen for that, but we may need to shadow other listens as well.
      FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:query.path];

      if (tagId != nil) {
          NSAssert(![subtree.value hasCompleteView], @"If we're adding a query, it shouldn't be shadowed");
          return events;
      }

      // Shadow everything at or below this location, this is a default listener.
      NSArray *queriesToStop = [subtree foldWithBlock:^id(FPath *relativePath, FSyncPoint *maybeChildSyncPoint, NSDictionary *childMap) {
          BOOL isBelowRoot = ![relativePath isEmpty];
          if (isBelowRoot && maybeChildSyncPoint != nil && [maybeChildSyncPoint hasCompleteView]) {
              return @[[maybeChildSyncPoint completeView].query];
          }

          // No default listener here, flatten any deeper queries into an array.
          // (Iterating the query views of a nil sync point is simply a no-op.)
          NSMutableArray *collected = [NSMutableArray array];
          for (FView *queryView in [maybeChildSyncPoint queryViews]) {
              [collected addObject:queryView.query];
          }
          [childMap enumerateKeysAndObjectsUsingBlock:^(NSString *key, NSArray *childQueries, BOOL *stop) {
              [collected addObjectsFromArray:childQueries];
          }];
          return collected;
      }];

      for (FQuerySpec *shadowedQuery in queriesToStop) {
          self.listenProvider.stopListening([self queryForListening:shadowedQuery], [self tagForQuery:shadowedQuery]);
      }
      return events;
  }
  /**
   * Creates the listen container for a view.
   *
   * The container's completion block receives the listen status. On @"ok" it applies a
   * listen-complete at the query's path (the tagged variant when the query has a tag). On any
   * other status it logs a warning and removes the event registrations for the query, passing
   * along an error built from the status.
   *
   * @param view The view whose query the listen is for
   * @return The newly created listen container
   */
  - (FListenContainer *)createListenerForView:(FView *)view {
      FQuerySpec *viewQuery = view.query;
      NSNumber *tag = [self tagForQuery:viewQuery];

      return [[FListenContainer alloc] initWithView:view
                                         onComplete:^(NSString *status) {
          if (![status isEqualToString:@"ok"]) {
              // If a listen failed, kill all of the listeners here, not just the one that triggered the error.
              // Note that this may need to be scoped to just this listener if we change permissions on filtered children
              NSError *error = [FUtilities errorForStatus:status andReason:nil];
              FFWarn(@"I-RDB038012", @"Listener at %@ failed: %@", viewQuery.path, status);
              return [self removeEventRegistration:nil forQuery:viewQuery cancelError:error];
          }

          if (tag == nil) {
              return [self applyListenCompleteAtPath:viewQuery.path];
          }
          return [self applyTaggedListenCompleteAtPath:viewQuery.path tagId:tag];
      }];
  }
  /**
   * Looks up a tag in tagToQueryMap.
   *
   * @param tagId The tag to look up
   * @return The query associated with the given tag, if we have one
   */
  - (FQuerySpec *)queryForTag:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self.tagToQueryMap objectForKey:tagId];
      return taggedQuery;
  }
  /**
   * Looks up a query in queryToTagMap.
   *
   * @param query The query to look up
   * @return The tag associated with the given query, or nil when the map has no entry for it
   */
  - (NSNumber *)tagForQuery:(FQuerySpec *)query {
      NSNumber *queryTag = [self.queryToTagMap objectForKey:query];
      return queryTag;
  }
  655. #pragma mark -
  656. #pragma mark applyOperation Helpers
  /**
   * Applies an operation to every SyncPoint it affects, descendants as well as ancestors.
   *
   * NOTES:
   * - Descendant SyncPoints are visited before their ancestors (events are raised depth-first).
   * - applyOperation: is called on each SyncPoint with three things:
   *   1. A version of the operation made relative to the SyncPoint location.
   *   2. A write tree ref of any writes cached at the SyncPoint location.
   *   3. A snapshot node with cached server data, if we have it.
   * - The events returned by each SyncPoint are concatenated and returned.
   *
   * @param operation The operation to apply
   * @return Array of FEvent
   */
  - (NSArray *)applyOperationToSyncPoints:(id<FOperation>)operation {
      // The walk starts at the root of the sync point tree with no server cache and the pending
      // writes at the empty path.
      FWriteTreeRef *rootWrites = [self.pendingWriteTree childWritesForPath:[FPath empty]];
      return [self applyOperationHelper:operation
                          syncPointTree:self.syncPointTree
                            serverCache:nil
                            writesCache:rootWrites];
  }
  /**
   * Recursive helper for applyOperationToSyncPoints:
   *
   * Follows the operation's path one child key at a time. Once the path is empty the work is
   * handed to applyOperationDescendantsHelper:. Otherwise the child subtree named by the front
   * of the path is processed first, and the SyncPoint at this level (if any) afterwards.
   *
   * @param operation The operation, relative to the root of syncPointTree
   * @param syncPointTree The subtree of sync points to apply the operation to
   * @param serverCache Cached server data at this location, or nil
   * @param writesCache The writes cached at this location
   * @return Array of FEvent
   */
  - (NSArray *)applyOperationHelper:(id<FOperation>)operation
                      syncPointTree:(FImmutableTree *)syncPointTree
                        serverCache:(id<FNode>)serverCache
                        writesCache:(FWriteTreeRef *)writesCache {
      if ([operation.path isEmpty]) {
          return [self applyOperationDescendantsHelper:operation
                                         syncPointTree:syncPointTree
                                           serverCache:serverCache
                                           writesCache:writesCache];
      }

      FSyncPoint *syncPoint = syncPointTree.value;

      // If we don't have cached server data, see if we can get it from this SyncPoint
      id<FNode> resolvedServerCache = serverCache;
      if (resolvedServerCache == nil && syncPoint != nil) {
          resolvedServerCache = [syncPoint completeServerCacheAtPath:[FPath empty]];
      }

      NSMutableArray *events = [NSMutableArray array];

      NSString *childKey = [operation.path getFront];
      id<FOperation> childOperation = [operation operationForChild:childKey];
      FImmutableTree *childTree = [syncPointTree.children get:childKey];
      if (childTree != nil && childOperation != nil) {
          id<FNode> childServerCache = nil;
          if (resolvedServerCache != nil) {
              childServerCache = [resolvedServerCache getImmediateChild:childKey];
          }
          FWriteTreeRef *childWritesCache = [writesCache childWriteTreeRef:childKey];
          NSArray *childEvents = [self applyOperationHelper:childOperation
                                              syncPointTree:childTree
                                                serverCache:childServerCache
                                                writesCache:childWritesCache];
          [events addObjectsFromArray:childEvents];
      }

      // The SyncPoint at this level is applied after its child, so deeper events come first.
      if (syncPoint != nil) {
          NSArray *ownEvents = [syncPoint applyOperation:operation
                                             writesCache:writesCache
                                             serverCache:resolvedServerCache];
          [events addObjectsFromArray:ownEvents];
      }
      return events;
  }
  /**
   * Recursive helper for applyOperationToSyncPoints:
   *
   * Applies the operation to every child subtree of syncPointTree for which the operation yields
   * a child operation, and then to the SyncPoint at this level (if any). Events from the
   * children are therefore collected ahead of the events from this level.
   *
   * @param operation The operation, relative to the root of syncPointTree
   * @param syncPointTree The subtree of sync points to apply the operation to
   * @param serverCache Cached server data at this location, or nil
   * @param writesCache The writes cached at this location
   * @return Array of FEvent
   */
  - (NSArray *)applyOperationDescendantsHelper:(id<FOperation>)operation
                                 syncPointTree:(FImmutableTree *)syncPointTree
                                   serverCache:(id<FNode>)serverCache
                                   writesCache:(FWriteTreeRef *)writesCache {
      FSyncPoint *syncPoint = syncPointTree.value;

      // If we don't have cached server data, see if we can get it from this SyncPoint.
      // Logical && (not bitwise &) is the intended operator and matches applyOperationHelper:.
      id<FNode> resolvedServerCache;
      if (serverCache == nil && syncPoint != nil) {
          resolvedServerCache = [syncPoint completeServerCacheAtPath:[FPath empty]];
      } else {
          resolvedServerCache = serverCache;
      }

      NSMutableArray *events = [[NSMutableArray alloc] init];

      [syncPointTree.children enumerateKeysAndObjectsUsingBlock:^(NSString *childKey, FImmutableTree *childTree, BOOL *stop) {
          id<FNode> childServerCache = nil;
          if (resolvedServerCache != nil) {
              childServerCache = [resolvedServerCache getImmediateChild:childKey];
          }
          FWriteTreeRef *childWritesCache = [writesCache childWriteTreeRef:childKey];
          id<FOperation> childOperation = [operation operationForChild:childKey];
          // A nil child operation means there is nothing to apply below this child.
          if (childOperation != nil) {
              [events addObjectsFromArray:[self applyOperationDescendantsHelper:childOperation
                                                                  syncPointTree:childTree
                                                                    serverCache:childServerCache
                                                                    writesCache:childWritesCache]];
          }
      }];

      if (syncPoint) {
          [events addObjectsFromArray:[syncPoint applyOperation:operation
                                                    writesCache:writesCache
                                                    serverCache:resolvedServerCache]];
      }
      return events;
  }
  735. @end