FSyncTree.m 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <FirebaseCore/FIRLogger.h>
  17. #import "FSyncTree.h"
  18. #import "FListenProvider.h"
  19. #import "FWriteTree.h"
  20. #import "FNode.h"
  21. #import "FPath.h"
  22. #import "FEventRegistration.h"
  23. #import "FImmutableTree.h"
  24. #import "FOperation.h"
  25. #import "FWriteTreeRef.h"
  26. #import "FOverwrite.h"
  27. #import "FOperationSource.h"
  28. #import "FMerge.h"
  29. #import "FAckUserWrite.h"
  30. #import "FView.h"
  31. #import "FSyncPoint.h"
  32. #import "FEmptyNode.h"
  33. #import "FQueryParams.h"
  34. #import "FQuerySpec.h"
  35. #import "FSnapshotHolder.h"
  36. #import "FChildrenNode.h"
  37. #import "FTupleRemovedQueriesEvents.h"
  38. #import "FAtomicNumber.h"
  39. #import "FEventRaiser.h"
  40. #import "FListenComplete.h"
  41. #import "FSnapshotUtilities.h"
  42. #import "FCacheNode.h"
  43. #import "FUtilities.h"
  44. #import "FCompoundWrite.h"
  45. #import "FWriteRecord.h"
  46. #import "FPersistenceManager.h"
  47. #import "FKeepSyncedEventRegistration.h"
  48. #import "FServerValues.h"
  49. #import "FCompoundHash.h"
  50. #import "FRangeMerge.h"
  51. // Size after which we start including the compound hash
  52. static const NSUInteger kFSizeThresholdForCompoundHash = 1024;
  53. @interface FListenContainer : NSObject<FSyncTreeHash>
  54. @property (nonatomic, strong) FView *view;
  55. @property (nonatomic, copy) fbt_nsarray_nsstring onComplete;
  56. @end
  57. @implementation FListenContainer
  58. - (instancetype)initWithView:(FView *)view onComplete:(fbt_nsarray_nsstring)onComplete {
  59. self = [super init];
  60. if (self != nil) {
  61. self->_view = view;
  62. self->_onComplete = onComplete;
  63. }
  64. return self;
  65. }
  66. - (id<FNode>)serverCache {
  67. return self.view.serverCache;
  68. }
  69. - (FCompoundHash *)compoundHash {
  70. return [FCompoundHash fromNode:[self serverCache]];
  71. }
  72. - (NSString *)simpleHash {
  73. return [[self serverCache] dataHash];
  74. }
  75. - (BOOL)includeCompoundHash {
  76. return [FSnapshotUtilities estimateSerializedNodeSize:[self serverCache]] > kFSizeThresholdForCompoundHash;
  77. }
  78. @end
  79. @interface FSyncTree ()
  80. /**
  81. * Tree of SyncPoints. There's a SyncPoint at any location that has 1 or more views.
  82. */
  83. @property (nonatomic, strong) FImmutableTree *syncPointTree;
  84. /**
  85. * A tree of all pending user writes (user-initiated set, transactions, updates, etc)
  86. */
  87. @property (nonatomic, strong) FWriteTree *pendingWriteTree;
  88. /**
  89. * Maps tagId -> FTuplePathQueryParams
  90. */
  91. @property (nonatomic, strong) NSMutableDictionary *tagToQueryMap;
  92. @property (nonatomic, strong) NSMutableDictionary *queryToTagMap;
  93. @property (nonatomic, strong) FListenProvider *listenProvider;
  94. @property (nonatomic, strong) FPersistenceManager *persistenceManager;
  95. @property (nonatomic, strong) FAtomicNumber *queryTagCounter;
  96. @property (nonatomic, strong) NSMutableSet *keepSyncedQueries;
  97. @end
  98. /**
  99. * SyncTree is the central class for managing event callback registration, data caching, views
  100. * (query processing), and event generation. There are typically two SyncTree instances for
  101. * each Repo, one for the normal Firebase data, and one for the .info data.
  102. *
  103. * It has a number of responsibilities, including:
  104. * - Tracking all user event callbacks (registered via addEventRegistration: and removeEventRegistration:).
  105. * - Applying and caching data changes for user setValue:, runTransactionBlock:, and updateChildValues: calls
  106. * (applyUserOverwriteAtPath:, applyUserMergeAtPath:).
  107. * - Applying and caching data changes for server data changes (applyServerOverwriteAtPath:,
  108. * applyServerMergeAtPath:).
  109. * - Generating user-facing events for server and user changes (all of the apply* methods
  110. * return the set of events that need to be raised as a result).
  111. * - Maintaining the appropriate set of server listens to ensure we are always subscribed
  112. * to the correct set of paths and queries to satisfy the current set of user event
  113. * callbacks (listens are started/stopped using the provided listenProvider).
  114. *
  115. * NOTE: Although SyncTree tracks event callbacks and calculates events to raise, the actual
  116. * events are returned to the caller rather than raised synchronously.
  117. */
  118. @implementation FSyncTree
  119. - (id) initWithListenProvider:(FListenProvider *)provider {
  120. return [self initWithPersistenceManager:nil listenProvider:provider];
  121. }
  122. - (id) initWithPersistenceManager:(FPersistenceManager *)persistenceManager listenProvider:(FListenProvider *)provider {
  123. self = [super init];
  124. if (self) {
  125. self.syncPointTree = [FImmutableTree empty];
  126. self.pendingWriteTree = [[FWriteTree alloc] init];
  127. self.tagToQueryMap = [[NSMutableDictionary alloc] init];
  128. self.queryToTagMap = [[NSMutableDictionary alloc] init];
  129. self.listenProvider = provider;
  130. self.persistenceManager = persistenceManager;
  131. self.queryTagCounter = [[FAtomicNumber alloc] init];
  132. self.keepSyncedQueries = [NSMutableSet set];
  133. }
  134. return self;
  135. }
  136. #pragma mark -
  137. #pragma mark Apply Operations
  138. /**
  139. * Apply data changes for a user-generated setValue: runTransactionBlock: updateChildValues:, etc.
  140. * @return NSArray of FEvent to raise.
  141. */
  142. - (NSArray *) applyUserOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData writeId:(NSInteger)writeId isVisible:(BOOL)visible {
  143. // Record pending write
  144. [self.pendingWriteTree addOverwriteAtPath:path newData:newData writeId:writeId isVisible:visible];
  145. if (!visible) {
  146. return @[];
  147. } else {
  148. FOverwrite *operation = [[FOverwrite alloc] initWithSource:[FOperationSource userInstance] path:path snap:newData];
  149. return [self applyOperationToSyncPoints:operation];
  150. }
  151. }
  152. /**
  153. * Apply the data from a user-generated updateChildValues: call
  154. * @return NSArray of FEvent to raise.
  155. */
  156. - (NSArray *) applyUserMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren writeId:(NSInteger)writeId {
  157. // Record pending merge
  158. [self.pendingWriteTree addMergeAtPath:path changedChildren:changedChildren writeId:writeId];
  159. FMerge *operation = [[FMerge alloc] initWithSource:[FOperationSource userInstance] path:path children:changedChildren];
  160. return [self applyOperationToSyncPoints:operation];
  161. }
  162. /**
  163. * Acknowledge a pending user write that was previously registered with applyUserOverwriteAtPath: or applyUserMergeAtPath:
  164. * TODO[offline]: Taking a serverClock here is awkward, but server values are awkward. :-(
  165. * @return NSArray of FEvent to raise.
  166. */
  167. - (NSArray *) ackUserWriteWithWriteId:(NSInteger)writeId revert:(BOOL)revert persist:(BOOL)persist clock:(id<FClock>)clock {
  168. FWriteRecord *write = [self.pendingWriteTree writeForId:writeId];
  169. BOOL needToReevaluate = [self.pendingWriteTree removeWriteId:writeId];
  170. if (write.visible) {
  171. if (persist) {
  172. [self.persistenceManager removeUserWrite:writeId];
  173. }
  174. if (!revert) {
  175. NSDictionary *serverValues = [FServerValues generateServerValues:clock];
  176. if ([write isOverwrite]) {
  177. id<FNode> resolvedNode = [FServerValues resolveDeferredValueSnapshot:write.overwrite withServerValues:serverValues];
  178. [self.persistenceManager applyUserWrite:resolvedNode toServerCacheAtPath:write.path];
  179. } else {
  180. FCompoundWrite *resolvedMerge = [FServerValues resolveDeferredValueCompoundWrite:write.merge withServerValues:serverValues];
  181. [self.persistenceManager applyUserMerge:resolvedMerge toServerCacheAtPath:write.path];
  182. }
  183. }
  184. }
  185. if (!needToReevaluate) {
  186. return @[];
  187. } else {
  188. __block FImmutableTree *affectedTree = [FImmutableTree empty];
  189. if (write.isOverwrite) {
  190. affectedTree = [affectedTree setValue:@YES atPath:[FPath empty]];
  191. } else {
  192. [write.merge enumerateWrites:^(FPath *path, id <FNode> node, BOOL *stop) {
  193. affectedTree = [affectedTree setValue:@YES atPath:path];
  194. }];
  195. }
  196. FAckUserWrite *operation = [[FAckUserWrite alloc] initWithPath:write.path affectedTree:affectedTree revert:revert];
  197. return [self applyOperationToSyncPoints:operation];
  198. }
  199. }
  200. /**
  201. * Apply new server data for the specified path
  202. * @return NSArray of FEvent to raise.
  203. */
  204. - (NSArray *) applyServerOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData {
  205. [self.persistenceManager updateServerCacheWithNode:newData forQuery:[FQuerySpec defaultQueryAtPath:path]];
  206. FOverwrite *operation = [[FOverwrite alloc] initWithSource:[FOperationSource serverInstance] path:path snap:newData];
  207. return [self applyOperationToSyncPoints:operation];
  208. }
  209. /**
  210. * Applied new server data to be merged in at the specified path
  211. * @return NSArray of FEvent to raise.
  212. */
  213. - (NSArray *) applyServerMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren {
  214. [self.persistenceManager updateServerCacheWithMerge:changedChildren atPath:path];
  215. FMerge *operation = [[FMerge alloc] initWithSource:[FOperationSource serverInstance] path:path children:changedChildren];
  216. return [self applyOperationToSyncPoints:operation];
  217. }
  218. - (NSArray *) applyServerRangeMergeAtPath:(FPath *)path updates:(NSArray *)ranges {
  219. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  220. if (syncPoint == nil) {
  221. // Removed view, so it's safe to just ignore this update
  222. return @[];
  223. } else {
  224. // This could be for any "complete" (unfiltered) view, and if there is more than one complete view, they should
  225. // each have the same cache so it doesn't matter which one we use.
  226. FView *view = [syncPoint completeView];
  227. if (view != nil) {
  228. id<FNode> serverNode = [view serverCache];
  229. for (FRangeMerge *merge in ranges) {
  230. serverNode = [merge applyToNode:serverNode];
  231. }
  232. return [self applyServerOverwriteAtPath:path newData:serverNode];
  233. } else {
  234. // There doesn't exist a view for this update, so it was removed and it's safe to just ignore this range
  235. // merge
  236. return @[];
  237. }
  238. }
  239. }
  240. /**
  241. * Apply a listen complete to a path
  242. * @return NSArray of FEvent to raise.
  243. */
  244. - (NSArray *) applyListenCompleteAtPath:(FPath *)path {
  245. [self.persistenceManager setQueryComplete:[FQuerySpec defaultQueryAtPath:path]];
  246. id<FOperation> operation = [[FListenComplete alloc] initWithSource:[FOperationSource serverInstance] path:path];
  247. return [self applyOperationToSyncPoints:operation];
  248. }
  249. /**
  250. * Apply a listen complete to a path
  251. * @return NSArray of FEvent to raise.
  252. */
  253. - (NSArray *) applyTaggedListenCompleteAtPath:(FPath *)path tagId:(NSNumber *)tagId {
  254. FQuerySpec *query = [self queryForTag:tagId];
  255. if (query != nil) {
  256. [self.persistenceManager setQueryComplete:query];
  257. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  258. id<FOperation> op = [[FListenComplete alloc] initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  259. path:relativePath];
  260. return [self applyTaggedOperation:op atPath:query.path];
  261. } else {
  262. // We've already removed the query. No big deal, ignore the update.
  263. return @[];
  264. }
  265. }
  266. /**
  267. * Internal helper method to apply tagged operation
  268. */
  269. - (NSArray *) applyTaggedOperation:(id<FOperation>)operation atPath:(FPath *)path {
  270. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  271. NSAssert(syncPoint != nil, @"Missing sync point for query tag that we're tracking.");
  272. FWriteTreeRef *writesCache = [self.pendingWriteTree childWritesForPath:path];
  273. return [syncPoint applyOperation:operation writesCache:writesCache serverCache:nil];
  274. }
  275. /**
  276. * Apply new server data for the specified tagged query
  277. * @return NSArray of FEvent to raise.
  278. */
  279. - (NSArray *) applyTaggedQueryOverwriteAtPath:(FPath *)path newData:(id <FNode>)newData tagId:(NSNumber *)tagId {
  280. FQuerySpec *query = [self queryForTag:tagId];
  281. if (query != nil) {
  282. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  283. FQuerySpec *queryToOverwrite = relativePath.isEmpty ? query : [FQuerySpec defaultQueryAtPath:path];
  284. [self.persistenceManager updateServerCacheWithNode:newData forQuery:queryToOverwrite];
  285. FOverwrite *operation = [[FOverwrite alloc] initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  286. path:relativePath snap:newData];
  287. return [self applyTaggedOperation:operation atPath:query.path];
  288. } else {
  289. // Query must have been removed already
  290. return @[];
  291. }
  292. }
  293. /**
  294. * Apply server data to be merged in for the specified tagged query
  295. * @return NSArray of FEvent to raise.
  296. */
  297. - (NSArray *) applyTaggedQueryMergeAtPath:(FPath *)path changedChildren:(FCompoundWrite *)changedChildren tagId:(NSNumber *)tagId {
  298. FQuerySpec *query = [self queryForTag:tagId];
  299. if (query != nil) {
  300. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  301. [self.persistenceManager updateServerCacheWithMerge:changedChildren atPath:path];
  302. FMerge *operation = [[FMerge alloc] initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  303. path:relativePath
  304. children:changedChildren];
  305. return [self applyTaggedOperation:operation atPath:query.path];
  306. } else {
  307. // We've already removed the query. No big deal, ignore the update.
  308. return @[];
  309. }
  310. }
  311. - (NSArray *) applyTaggedServerRangeMergeAtPath:(FPath *)path updates:(NSArray *)ranges tagId:(NSNumber *)tagId {
  312. FQuerySpec *query = [self queryForTag:tagId];
  313. if (query != nil) {
  314. NSAssert([path isEqual:query.path], @"Tagged update path and query path must match");
  315. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  316. NSAssert(syncPoint != nil, @"Missing sync point for query tag that we're tracking.");
  317. FView *view = [syncPoint viewForQuery:query];
  318. NSAssert(view != nil, @"Missing view for query tag that we're tracking");
  319. id<FNode> serverNode = [view serverCache];
  320. for (FRangeMerge *merge in ranges) {
  321. serverNode = [merge applyToNode:serverNode];
  322. }
  323. return [self applyTaggedQueryOverwriteAtPath:path newData:serverNode tagId:tagId];
  324. } else {
  325. // We've already removed the query. No big deal, ignore the update.
  326. return @[];
  327. }
  328. }
  329. /**
  330. * Add an event callback for the specified query
  331. * @return NSArray of FEvent to raise.
  332. */
  333. - (NSArray *) addEventRegistration:(id<FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query {
  334. FPath *path = query.path;
  335. __block BOOL foundAncestorDefaultView = NO;
  336. [self.syncPointTree forEachOnPath:query.path whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
  337. foundAncestorDefaultView = foundAncestorDefaultView || [syncPoint hasCompleteView];
  338. return !foundAncestorDefaultView;
  339. }];
  340. [self.persistenceManager setQueryActive:query];
  341. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  342. if (syncPoint == nil) {
  343. syncPoint = [[FSyncPoint alloc] initWithPersistenceManager:self.persistenceManager];
  344. self.syncPointTree = [self.syncPointTree setValue:syncPoint atPath:path];
  345. }
  346. BOOL viewAlreadyExists = [syncPoint viewExistsForQuery:query];
  347. NSArray *events;
  348. if (viewAlreadyExists) {
  349. events = [syncPoint addEventRegistration:eventRegistration forExistingViewForQuery:query];
  350. } else {
  351. if (![query loadsAllData]) {
  352. // We need to track a tag for this query
  353. NSAssert(self.queryToTagMap[query] == nil, @"View does not exist, but we have a tag");
  354. NSNumber *tagId = [self.queryTagCounter getAndIncrement];
  355. self.queryToTagMap[query] = tagId;
  356. self.tagToQueryMap[tagId] = query;
  357. }
  358. FWriteTreeRef *writesCache = [self.pendingWriteTree childWritesForPath:path];
  359. FCacheNode *serverCache = [self serverCacheForQuery:query];
  360. events = [syncPoint addEventRegistration:eventRegistration
  361. forNonExistingViewForQuery:query
  362. writesCache:writesCache
  363. serverCache:serverCache];
  364. // There was no view and no default listen
  365. if (!foundAncestorDefaultView) {
  366. FView *view = [syncPoint viewForQuery:query];
  367. NSMutableArray *mutableEvents = [events mutableCopy];
  368. [mutableEvents addObjectsFromArray:[self setupListenerOnQuery:query view:view]];
  369. events = mutableEvents;
  370. }
  371. }
  372. return events;
  373. }
  374. - (FCacheNode *)serverCacheForQuery:(FQuerySpec *)query {
  375. __block id<FNode> serverCacheNode = nil;
  376. [self.syncPointTree forEachOnPath:query.path whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
  377. FPath *relativePath = [FPath relativePathFrom:pathToSyncPoint to:query.path];
  378. serverCacheNode = [syncPoint completeServerCacheAtPath:relativePath];
  379. return serverCacheNode == nil;
  380. }];
  381. FCacheNode *serverCache;
  382. if (serverCacheNode != nil) {
  383. FIndexedNode *indexed = [FIndexedNode indexedNodeWithNode:serverCacheNode index:query.index];
  384. serverCache = [[FCacheNode alloc] initWithIndexedNode:indexed isFullyInitialized:YES isFiltered:NO];
  385. } else {
  386. FCacheNode *persistenceServerCache = [self.persistenceManager serverCacheForQuery:query];
  387. if (persistenceServerCache.isFullyInitialized) {
  388. serverCache = persistenceServerCache;
  389. } else {
  390. serverCacheNode = [FEmptyNode emptyNode];
  391. FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:query.path];
  392. [subtree forEachChild:^(NSString *childKey, FSyncPoint *childSyncPoint) {
  393. id<FNode> completeCache = [childSyncPoint completeServerCacheAtPath:[FPath empty]];
  394. if (completeCache) {
  395. serverCacheNode = [serverCacheNode updateImmediateChild:childKey withNewChild:completeCache];
  396. }
  397. }];
  398. // Fill the node with any available children we have
  399. [persistenceServerCache.node enumerateChildrenUsingBlock:^(NSString *key, id<FNode> node, BOOL *stop) {
  400. if (![serverCacheNode hasChild:key]) {
  401. serverCacheNode = [serverCacheNode updateImmediateChild:key withNewChild:node];
  402. }
  403. }];
  404. FIndexedNode *indexed = [FIndexedNode indexedNodeWithNode:serverCacheNode index:query.index];
  405. serverCache = [[FCacheNode alloc] initWithIndexedNode:indexed isFullyInitialized:NO isFiltered:NO];
  406. }
  407. }
  408. return serverCache;
  409. }
  410. /**
  411. * Remove event callback(s).
  412. *
  413. * If query is the default query, we'll check all queries for the specified eventRegistration.
  414. * If eventRegistration is null, we'll remove all callbacks for the specified query/queries.
  415. *
  416. * @param eventRegistration if nil, all callbacks are removed
  417. * @param cancelError If provided, appropriate cancel events will be returned
  418. * @return NSArray of FEvent to raise.
  419. */
  420. - (NSArray *) removeEventRegistration:(id <FEventRegistration>)eventRegistration
  421. forQuery:(FQuerySpec *)query
  422. cancelError:(NSError *)cancelError {
  423. // Find the syncPoint first. Then deal with whether or not it has matching listeners
  424. FPath *path = query.path;
  425. FSyncPoint *maybeSyncPoint = [self.syncPointTree valueAtPath:path];
  426. NSArray *cancelEvents = @[];
  427. // A removal on a default query affects all queries at that location. A removal on an indexed query, even one without
  428. // other query constraints, does *not* affect all queries at that location. So this check must be for 'default', and
  429. // not loadsAllData:
  430. if (maybeSyncPoint && ([query isDefault] || [maybeSyncPoint viewExistsForQuery:query])) {
  431. FTupleRemovedQueriesEvents *removedAndEvents = [maybeSyncPoint removeEventRegistration:eventRegistration forQuery:query cancelError:cancelError];
  432. if ([maybeSyncPoint isEmpty]) {
  433. self.syncPointTree = [self.syncPointTree removeValueAtPath:path];
  434. }
  435. NSArray *removed = removedAndEvents.removedQueries;
  436. cancelEvents = removedAndEvents.cancelEvents;
  437. // We may have just removed one of many listeners and can short-circuit this whole process
  438. // We may also not have removed a default listener, in which case all of the descendant listeners should already
  439. // be properly set up.
  440. //
  441. // Since indexed queries can shadow if they don't have other query constraints, check for loadsAllData: instead
  442. // of isDefault:
  443. NSUInteger defaultQueryIndex = [removed indexOfObjectPassingTest:^BOOL(FQuerySpec *q, NSUInteger idx, BOOL *stop) {
  444. return [q loadsAllData];
  445. }];
  446. BOOL removingDefault = defaultQueryIndex != NSNotFound;
  447. [removed enumerateObjectsUsingBlock:^(FQuerySpec *query, NSUInteger idx, BOOL *stop) {
  448. [self.persistenceManager setQueryInactive:query];
  449. }];
  450. NSNumber *covered = [self.syncPointTree findOnPath:path andApplyBlock:^id(FPath *relativePath, FSyncPoint *parentSyncPoint) {
  451. return [NSNumber numberWithBool:[parentSyncPoint hasCompleteView]];
  452. }];
  453. if (removingDefault && ![covered boolValue]) {
  454. FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
  455. // There are potentially child listeners. Determine what if any listens we need to send before executing
  456. // the removal
  457. if (![subtree isEmpty]) {
  458. // We need to fold over our subtree and collect the listeners to send
  459. NSArray *newViews = [self collectDistinctViewsForSubTree:subtree];
  460. // Ok, we've collected all the listens we need. Set them up.
  461. [newViews enumerateObjectsUsingBlock:^(FView *view, NSUInteger idx, BOOL *stop) {
  462. FQuerySpec *newQuery = view.query;
  463. FListenContainer *listenContainer = [self createListenerForView:view];
  464. self.listenProvider.startListening([self queryForListening:newQuery], [self tagForQuery:newQuery],
  465. listenContainer, listenContainer.onComplete);
  466. }];
  467. } else {
  468. // There's nothing below us, so nothing we need to start listening on
  469. }
  470. }
  471. // If we removed anything and we're not covered by a higher up listen, we need to stop listening on this query.
  472. // The above block has us covered in terms of making sure we're set up on listens lower in the tree.
  473. // Also, note that if we have a cancelError, it's already been removed at the provider level.
  474. if (![covered boolValue] && [removed count] > 0 && cancelError == nil) {
  475. // If we removed a default, then we weren't listening on any of the other queries here. Just cancel the one
  476. // default. Otherwise, we need to iterate through and cancel each individual query
  477. if (removingDefault) {
  478. // We don't tag default listeners
  479. self.listenProvider.stopListening([self queryForListening:query], nil);
  480. } else {
  481. [removed enumerateObjectsUsingBlock:^(FQuerySpec *queryToRemove, NSUInteger idx, BOOL *stop) {
  482. NSNumber *tagToRemove = [self.queryToTagMap objectForKey:queryToRemove];
  483. self.listenProvider.stopListening([self queryForListening:queryToRemove], tagToRemove);
  484. }];
  485. }
  486. }
  487. // Now, clear all the tags we're tracking for the removed listens.
  488. [self removeTags:removed];
  489. } else {
  490. // No-op, this listener must've been already removed
  491. }
  492. return cancelEvents;
  493. }
  494. - (void)keepQuery:(FQuerySpec *)query synced:(BOOL)keepSynced {
  495. // Only do something if we actually need to add/remove an event registration
  496. if (keepSynced && ![self.keepSyncedQueries containsObject:query]) {
  497. [self addEventRegistration:[FKeepSyncedEventRegistration instance] forQuery:query];
  498. [self.keepSyncedQueries addObject:query];
  499. } else if (!keepSynced && [self.keepSyncedQueries containsObject:query]) {
  500. [self removeEventRegistration:[FKeepSyncedEventRegistration instance] forQuery:query cancelError:nil];
  501. [self.keepSyncedQueries removeObject:query];
  502. }
  503. }
  504. - (NSArray *) removeAllWrites {
  505. [self.persistenceManager removeAllUserWrites];
  506. NSArray *removedWrites = [self.pendingWriteTree removeAllWrites];
  507. if (removedWrites.count > 0) {
  508. FImmutableTree *affectedTree = [[FImmutableTree empty] setValue:@YES atPath:[FPath empty]];
  509. return [self applyOperationToSyncPoints:[[FAckUserWrite alloc] initWithPath:[FPath empty]
  510. affectedTree:affectedTree revert:YES]];
  511. } else {
  512. return @[];
  513. }
  514. }
  515. /**
  516. * Returns a complete cache, if we have one, of the data at a particular path. The location must have a listener above
  517. * it, but as this is only used by transaction code, that should always be the case anyways.
  518. *
  519. * Note: this method will *include* hidden writes from transaction with applyLocally set to false.
  520. * @param path The path to the data we want
  521. * @param writeIdsToExclude A specific set to be excluded
  522. */
  523. - (id <FNode>) calcCompleteEventCacheAtPath:(FPath *)path excludeWriteIds:(NSArray *)writeIdsToExclude {
  524. BOOL includeHiddenSets = YES;
  525. FWriteTree *writeTree = self.pendingWriteTree;
  526. id<FNode> serverCache = [self.syncPointTree findOnPath:path andApplyBlock:^id<FNode>(FPath *pathSoFar, FSyncPoint *syncPoint) {
  527. FPath *relativePath = [FPath relativePathFrom:pathSoFar to:path];
  528. id<FNode> serverCache = [syncPoint completeServerCacheAtPath:relativePath];
  529. if (serverCache) {
  530. return serverCache;
  531. } else {
  532. return nil;
  533. }
  534. }];
  535. return [writeTree calculateCompleteEventCacheAtPath:path completeServerCache:serverCache excludeWriteIds:writeIdsToExclude includeHiddenWrites:includeHiddenSets];
  536. }
  537. #pragma mark -
  538. #pragma mark Private Methods
  539. /**
  540. * This collapses multiple unfiltered views into a single view, since we only need a single
  541. * listener for them.
  542. * @return NSArray of FView
  543. */
  544. - (NSArray *) collectDistinctViewsForSubTree:(FImmutableTree *)subtree {
  545. return [subtree foldWithBlock:^NSArray *(FPath *relativePath, FSyncPoint *maybeChildSyncPoint, NSDictionary *childMap) {
  546. if (maybeChildSyncPoint && [maybeChildSyncPoint hasCompleteView]) {
  547. FView *completeView = [maybeChildSyncPoint completeView];
  548. return @[completeView];
  549. } else {
  550. // No complete view here, flatten any deeper listens into an array
  551. NSMutableArray *views = [[NSMutableArray alloc] init];
  552. if (maybeChildSyncPoint) {
  553. views = [[maybeChildSyncPoint queryViews] mutableCopy];
  554. }
  555. [childMap enumerateKeysAndObjectsUsingBlock:^(NSString *childKey, NSArray *childViews, BOOL *stop) {
  556. [views addObjectsFromArray:childViews];
  557. }];
  558. return views;
  559. }
  560. }];
  561. }
  562. /**
  563. * @param queries NSArray of FQuerySpec
  564. */
  565. - (void) removeTags:(NSArray *)queries {
  566. [queries enumerateObjectsUsingBlock:^(FQuerySpec *removedQuery, NSUInteger idx, BOOL *stop) {
  567. if (![removedQuery loadsAllData]) {
  568. // We should have a tag for this
  569. NSNumber *removedQueryTag = self.queryToTagMap[removedQuery];
  570. [self.queryToTagMap removeObjectForKey:removedQuery];
  571. [self.tagToQueryMap removeObjectForKey:removedQueryTag];
  572. }
  573. }];
  574. }
  575. - (FQuerySpec *) queryForListening:(FQuerySpec *)query {
  576. if (query.loadsAllData && !query.isDefault) {
  577. // We treat queries that load all data as default queries
  578. return [FQuerySpec defaultQueryAtPath:query.path];
  579. } else {
  580. return query;
  581. }
  582. }
  583. /**
  584. * For a given new listen, manage the de-duplication of outstanding subscriptions.
  585. * @return NSArray of FEvent events to support synchronous data sources
  586. */
  587. - (NSArray *) setupListenerOnQuery:(FQuerySpec *)query view:(FView *)view {
  588. FPath *path = query.path;
  589. NSNumber *tagId = [self tagForQuery:query];
  590. FListenContainer *listenContainer = [self createListenerForView:view];
  591. NSArray *events = self.listenProvider.startListening([self queryForListening:query], tagId, listenContainer,
  592. listenContainer.onComplete);
  593. FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
  594. // The root of this subtree has our query. We're here because we definitely need to send a listen for that, but we
  595. // may need to shadow other listens as well.
  596. if (tagId != nil) {
  597. NSAssert(![subtree.value hasCompleteView], @"If we're adding a query, it shouldn't be shadowed");
  598. } else {
  599. // Shadow everything at or below this location, this is a default listener.
  600. NSArray *queriesToStop = [subtree foldWithBlock:^id(FPath *relativePath, FSyncPoint *maybeChildSyncPoint, NSDictionary *childMap) {
  601. if (![relativePath isEmpty] && maybeChildSyncPoint != nil && [maybeChildSyncPoint hasCompleteView]) {
  602. return @[[maybeChildSyncPoint completeView].query];
  603. } else {
  604. // No default listener here, flatten any deeper queries into an array
  605. NSMutableArray *queries = [[NSMutableArray alloc] init];
  606. if (maybeChildSyncPoint != nil) {
  607. for (FView *view in [maybeChildSyncPoint queryViews]) {
  608. [queries addObject:view.query];
  609. }
  610. }
  611. [childMap enumerateKeysAndObjectsUsingBlock:^(NSString *key, NSArray *childQueries, BOOL *stop) {
  612. [queries addObjectsFromArray:childQueries];
  613. }];
  614. return queries;
  615. }
  616. }];
  617. for (FQuerySpec *queryToStop in queriesToStop) {
  618. self.listenProvider.stopListening([self queryForListening:queryToStop], [self tagForQuery:queryToStop]);
  619. }
  620. }
  621. return events;
  622. }
  623. - (FListenContainer *) createListenerForView:(FView *)view {
  624. FQuerySpec *query = view.query;
  625. NSNumber *tagId = [self tagForQuery:query];
  626. FListenContainer *listenContainer = [[FListenContainer alloc] initWithView:view
  627. onComplete:^(NSString *status) {
  628. if ([status isEqualToString:@"ok"]) {
  629. if (tagId != nil) {
  630. return [self applyTaggedListenCompleteAtPath:query.path tagId:tagId];
  631. } else {
  632. return [self applyListenCompleteAtPath:query.path];
  633. }
  634. } else {
  635. // If a listen failed, kill all of the listeners here, not just the one that triggered the error.
  636. // Note that this may need to be scoped to just this listener if we change permissions on filtered children
  637. NSError *error = [FUtilities errorForStatus:status andReason:nil];
  638. FFWarn(@"I-RDB038012", @"Listener at %@ failed: %@", query.path, status);
  639. return [self removeEventRegistration:nil forQuery:query cancelError:error];
  640. }
  641. }];
  642. return listenContainer;
  643. }
  644. /**
  645. * @return The query associated with the given tag, if we have one
  646. */
  647. - (FQuerySpec *) queryForTag:(NSNumber *)tagId {
  648. return self.tagToQueryMap[tagId];
  649. }
  650. /**
  651. * @return The tag associated with the given query
  652. */
  653. - (NSNumber *) tagForQuery:(FQuerySpec *)query {
  654. return self.queryToTagMap[query];
  655. }
  656. #pragma mark -
  657. #pragma mark applyOperation Helpers
  658. /**
  659. * A helper method that visits all descendant and ancestor SyncPoints, applying the operation.
  660. *
  661. * NOTES:
  662. * - Descendant SyncPoints will be visited first (since we raise events depth-first).
  663. * - We call applyOperation: on each SyncPoint passing three things:
  664. * 1. A version of the Operation that has been made relative to the SyncPoint location.
  665. * 2. A WriteTreeRef of any writes we have cached at the SyncPoint location.
  666. * 3. A snapshot Node with cached server data, if we have it.
  667. * - We concatenate all of the events returned by each SyncPoint and return the result.
  668. *
  669. * @return Array of FEvent
  670. */
  671. - (NSArray *) applyOperationToSyncPoints:(id<FOperation>)operation {
  672. return [self applyOperationHelper:operation syncPointTree:self.syncPointTree serverCache:nil
  673. writesCache:[self.pendingWriteTree childWritesForPath:[FPath empty]]];
  674. }
  675. /**
  676. * Recursive helper for applyOperationToSyncPoints_
  677. */
  678. - (NSArray *) applyOperationHelper:(id<FOperation>)operation syncPointTree:(FImmutableTree *)syncPointTree
  679. serverCache:(id<FNode>)serverCache writesCache:(FWriteTreeRef *)writesCache {
  680. if ([operation.path isEmpty]) {
  681. return [self applyOperationDescendantsHelper:operation syncPointTree:syncPointTree serverCache:serverCache writesCache:writesCache];
  682. } else {
  683. FSyncPoint *syncPoint = syncPointTree.value;
  684. // If we don't have cached server data, see if we can get it from this SyncPoint
  685. if (serverCache == nil && syncPoint != nil) {
  686. serverCache = [syncPoint completeServerCacheAtPath:[FPath empty]];
  687. }
  688. NSMutableArray *events = [[NSMutableArray alloc] init];
  689. NSString *childKey = [operation.path getFront];
  690. id<FOperation> childOperation = [operation operationForChild:childKey];
  691. FImmutableTree *childTree = [syncPointTree.children get:childKey];
  692. if (childTree != nil && childOperation != nil) {
  693. id<FNode> childServerCache = serverCache ? [serverCache getImmediateChild:childKey] : nil;
  694. FWriteTreeRef *childWritesCache = [writesCache childWriteTreeRef:childKey];
  695. [events addObjectsFromArray:[self applyOperationHelper:childOperation syncPointTree:childTree serverCache:childServerCache writesCache:childWritesCache]];
  696. }
  697. if (syncPoint) {
  698. [events addObjectsFromArray:[syncPoint applyOperation:operation writesCache:writesCache serverCache:serverCache]];
  699. }
  700. return events;
  701. }
  702. }
  703. /**
  704. * Recursive helper for applyOperationToSyncPoints:
  705. */
  706. - (NSArray *) applyOperationDescendantsHelper:(id<FOperation>)operation syncPointTree:(FImmutableTree *)syncPointTree
  707. serverCache:(id<FNode>)serverCache writesCache:(FWriteTreeRef *)writesCache {
  708. FSyncPoint *syncPoint = syncPointTree.value;
  709. // If we don't have cached server data, see if we can get it from this SyncPoint
  710. id<FNode> resolvedServerCache;
  711. if (serverCache == nil & syncPoint != nil) {
  712. resolvedServerCache = [syncPoint completeServerCacheAtPath:[FPath empty]];
  713. } else {
  714. resolvedServerCache = serverCache;
  715. }
  716. NSMutableArray *events = [[NSMutableArray alloc] init];
  717. [syncPointTree.children enumerateKeysAndObjectsUsingBlock:^(NSString *childKey, FImmutableTree *childTree, BOOL *stop) {
  718. id<FNode> childServerCache = nil;
  719. if (resolvedServerCache != nil) {
  720. childServerCache = [resolvedServerCache getImmediateChild:childKey];
  721. }
  722. FWriteTreeRef *childWritesCache = [writesCache childWriteTreeRef:childKey];
  723. id<FOperation> childOperation = [operation operationForChild:childKey];
  724. if (childOperation != nil) {
  725. [events addObjectsFromArray:[self applyOperationDescendantsHelper:childOperation
  726. syncPointTree:childTree
  727. serverCache:childServerCache
  728. writesCache:childWritesCache]];
  729. }
  730. }];
  731. if (syncPoint) {
  732. [events addObjectsFromArray:[syncPoint applyOperation:operation writesCache:writesCache serverCache:resolvedServerCache]];
  733. }
  734. return events;
  735. }
  736. @end