FSyncTree.m 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FSyncTree.h"
  17. #import "FAckUserWrite.h"
  18. #import "FAtomicNumber.h"
  19. #import "FCacheNode.h"
  20. #import "FChildrenNode.h"
  21. #import "FCompoundHash.h"
  22. #import "FCompoundWrite.h"
  23. #import "FEmptyNode.h"
  24. #import "FEventRaiser.h"
  25. #import "FEventRegistration.h"
  26. #import "FImmutableTree.h"
  27. #import "FKeepSyncedEventRegistration.h"
  28. #import "FListenComplete.h"
  29. #import "FListenProvider.h"
  30. #import "FMerge.h"
  31. #import "FNode.h"
  32. #import "FOperation.h"
  33. #import "FOperationSource.h"
  34. #import "FOverwrite.h"
  35. #import "FPath.h"
  36. #import "FPersistenceManager.h"
  37. #import "FQueryParams.h"
  38. #import "FQuerySpec.h"
  39. #import "FRangeMerge.h"
  40. #import "FServerValues.h"
  41. #import "FSnapshotHolder.h"
  42. #import "FSnapshotUtilities.h"
  43. #import "FSyncPoint.h"
  44. #import "FTupleRemovedQueriesEvents.h"
  45. #import "FUtilities.h"
  46. #import "FView.h"
  47. #import "FWriteRecord.h"
  48. #import "FWriteTree.h"
  49. #import "FWriteTreeRef.h"
  50. #import <FirebaseCore/FIRLogger.h>
  51. // Size after which we start including the compound hash
  52. static const NSUInteger kFSizeThresholdForCompoundHash = 1024;
  /**
   * Pairs a view with the completion callback of its server listen and exposes
   * hashes of the view's server cache through the FSyncTreeHash protocol.
   */
  @interface FListenContainer : NSObject <FSyncTreeHash>
  /** The view whose server cache is hashed. */
  @property(nonatomic, strong) FView *view;
  /** Callback handed to the listen provider together with this container. */
  @property(nonatomic, copy) fbt_nsarray_nsstring onComplete;
  @end

  @implementation FListenContainer

  /**
   * @param view The view backing this listen.
   * @param onComplete Completion callback for the listen; stored as a copy.
   */
  - (instancetype)initWithView:(FView *)view
                    onComplete:(fbt_nsarray_nsstring)onComplete {
      self = [super init];
      if (self != nil) {
          _view = view;
          // Honor the `copy` attribute declared on the property even though
          // the ivar is written directly here.
          _onComplete = [onComplete copy];
      }
      return self;
  }

  /** The server cache of the wrapped view. */
  - (id<FNode>)serverCache {
      return self.view.serverCache;
  }

  /** Compound hash built from the view's current server cache. */
  - (FCompoundHash *)compoundHash {
      return [FCompoundHash fromNode:[self serverCache]];
  }

  /** Data hash of the view's current server cache. */
  - (NSString *)simpleHash {
      return [[self serverCache] dataHash];
  }

  /**
   * YES when the estimated serialized size of the server cache exceeds
   * kFSizeThresholdForCompoundHash.
   */
  - (BOOL)includeCompoundHash {
      return [FSnapshotUtilities estimateSerializedNodeSize:[self serverCache]] >
             kFSizeThresholdForCompoundHash;
  }

  @end
  @interface FSyncTree ()
  /**
   * Tree of SyncPoints. A SyncPoint exists at every location that has one or
   * more views.
   */
  @property(nonatomic, strong) FImmutableTree *syncPointTree;
  /**
   * All pending user writes (user-initiated sets, transactions, updates, etc.).
   */
  @property(nonatomic, strong) FWriteTree *pendingWriteTree;
  /**
   * Maps a query tag (NSNumber) to the FQuerySpec it was assigned to.
   */
  @property(nonatomic, strong) NSMutableDictionary *tagToQueryMap;
  /**
   * Inverse of tagToQueryMap: maps an FQuerySpec to its tag (NSNumber).
   */
  @property(nonatomic, strong) NSMutableDictionary *queryToTagMap;
  /** Used to start and stop server listens. */
  @property(nonatomic, strong) FListenProvider *listenProvider;
  /** May be nil; initWithListenProvider: creates the tree without one. */
  @property(nonatomic, strong) FPersistenceManager *persistenceManager;
  /** Source of tag ids for queries that need a tag. */
  @property(nonatomic, strong) FAtomicNumber *queryTagCounter;
  /** Queries currently kept in sync through keepQuery:synced:. */
  @property(nonatomic, strong) NSMutableSet *keepSyncedQueries;
  @end
  102. /**
  103. * SyncTree is the central class for managing event callback registration, data
  104. * caching, views (query processing), and event generation. There are typically
  105. * two SyncTree instances for each Repo, one for the normal Firebase data, and
  106. * one for the .info data.
  107. *
  108. * It has a number of responsibilities, including:
  109. * - Tracking all user event callbacks (registered via addEventRegistration:
  110. * and removeEventRegistration:).
  111. * - Applying and caching data changes for user setValue:,
  112. * runTransactionBlock:, and updateChildValues: calls
  113. * (applyUserOverwriteAtPath:, applyUserMergeAtPath:).
  114. * - Applying and caching data changes for server data changes
  115. * (applyServerOverwriteAtPath:, applyServerMergeAtPath:).
  116. * - Generating user-facing events for server and user changes (all of the
  117. * apply* methods return the set of events that need to be raised as a result).
  118. * - Maintaining the appropriate set of server listens to ensure we are always
  119. * subscribed to the correct set of paths and queries to satisfy the current set
  120. * of user event callbacks (listens are started/stopped using the provided
  121. * listenProvider).
  122. *
  123. * NOTE: Although SyncTree tracks event callbacks and calculates events to
  124. * raise, the actual events are returned to the caller rather than raised
  125. * synchronously.
  126. */
  127. @implementation FSyncTree
  /**
   * Convenience initializer: creates a sync tree that has no persistence
   * manager.
   */
  - (id)initWithListenProvider:(FListenProvider *)provider {
      FPersistenceManager *noPersistence = nil;
      return [self initWithPersistenceManager:noPersistence
                               listenProvider:provider];
  }
  /**
   * Creates an empty sync tree: no sync points, no pending writes, no tagged
   * queries and no kept-synced queries.
   *
   * @param persistenceManager Persistence layer to mirror changes into; may be
   * nil.
   * @param provider Provider used to start and stop server listens.
   */
  - (id)initWithPersistenceManager:(FPersistenceManager *)persistenceManager
                    listenProvider:(FListenProvider *)provider {
      self = [super init];
      if (!self) {
          return nil;
      }

      // Collaborators supplied by the caller.
      self.persistenceManager = persistenceManager;
      self.listenProvider = provider;

      // State owned by the tree itself, all starting out empty.
      self.syncPointTree = [FImmutableTree empty];
      self.pendingWriteTree = [[FWriteTree alloc] init];
      self.tagToQueryMap = [[NSMutableDictionary alloc] init];
      self.queryToTagMap = [[NSMutableDictionary alloc] init];
      self.queryTagCounter = [[FAtomicNumber alloc] init];
      self.keepSyncedQueries = [NSMutableSet set];
      return self;
  }
  146. #pragma mark -
  147. #pragma mark Apply Operations
  /**
   * Records a user-generated overwrite (setValue:, runTransactionBlock:, etc.)
   * as a pending write and, if the write is visible, applies it to the sync
   * points.
   *
   * @return NSArray of FEvent to raise; empty when the write is not visible.
   */
  - (NSArray *)applyUserOverwriteAtPath:(FPath *)path
                                newData:(id<FNode>)newData
                                writeId:(NSInteger)writeId
                              isVisible:(BOOL)visible {
      // The write is tracked as pending whether or not it is visible.
      [self.pendingWriteTree addOverwriteAtPath:path
                                        newData:newData
                                        writeId:writeId
                                      isVisible:visible];
      if (!visible) {
          return @[];
      }

      FOperationSource *userSource = [FOperationSource userInstance];
      FOverwrite *overwrite = [[FOverwrite alloc] initWithSource:userSource
                                                            path:path
                                                            snap:newData];
      return [self applyOperationToSyncPoints:overwrite];
  }
  /**
   * Records a user-generated updateChildValues: call as a pending merge and
   * applies it to the sync points.
   *
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)applyUserMergeAtPath:(FPath *)path
                    changedChildren:(FCompoundWrite *)changedChildren
                            writeId:(NSInteger)writeId {
      // Track the merge as pending before generating events for it.
      [self.pendingWriteTree addMergeAtPath:path
                            changedChildren:changedChildren
                                    writeId:writeId];

      FOperationSource *userSource = [FOperationSource userInstance];
      FMerge *merge = [[FMerge alloc] initWithSource:userSource
                                                path:path
                                            children:changedChildren];
      return [self applyOperationToSyncPoints:merge];
  }
  /**
   * Acknowledges a pending user write that was registered through
   * applyUserOverwriteAtPath: or applyUserMergeAtPath:.
   *
   * For a visible write the persisted user write is removed when `persist` is
   * set, and, unless the write is being reverted, its deferred server values
   * are resolved using `clock` and the result is applied to the persisted
   * server cache.
   *
   * TODO[offline]: Taking a serverClock here is awkward, but server values are
   * awkward. :-(
   *
   * @return NSArray of FEvent to raise; empty when removing the write does not
   * require re-evaluation.
   */
  - (NSArray *)ackUserWriteWithWriteId:(NSInteger)writeId
                                revert:(BOOL)revert
                               persist:(BOOL)persist
                                 clock:(id<FClock>)clock {
      // Look the record up before removing it; it is still needed below.
      FWriteRecord *record = [self.pendingWriteTree writeForId:writeId];
      BOOL mustReevaluate = [self.pendingWriteTree removeWriteId:writeId];

      if (record.visible) {
          if (persist) {
              [self.persistenceManager removeUserWrite:writeId];
          }
          if (!revert) {
              NSDictionary *serverValues =
                  [FServerValues generateServerValues:clock];
              if ([record isOverwrite]) {
                  id<FNode> resolvedNode = [FServerValues
                      resolveDeferredValueSnapshot:record.overwrite
                                      withSyncTree:self
                                            atPath:record.path
                                      serverValues:serverValues];
                  [self.persistenceManager applyUserWrite:resolvedNode
                                      toServerCacheAtPath:record.path];
              } else {
                  FCompoundWrite *resolvedMerge = [FServerValues
                      resolveDeferredValueCompoundWrite:record.merge
                                           withSyncTree:self
                                                 atPath:record.path
                                           serverValues:serverValues];
                  [self.persistenceManager applyUserMerge:resolvedMerge
                                      toServerCacheAtPath:record.path];
              }
          }
      }

      if (!mustReevaluate) {
          return @[];
      }

      // Mark every location the write touched, relative to the write's path.
      __block FImmutableTree *touched = [FImmutableTree empty];
      if ([record isOverwrite]) {
          touched = [touched setValue:@YES atPath:[FPath empty]];
      } else {
          [record.merge enumerateWrites:^(FPath *childPath, id<FNode> childNode,
                                          BOOL *stop) {
              touched = [touched setValue:@YES atPath:childPath];
          }];
      }

      FAckUserWrite *ack = [[FAckUserWrite alloc] initWithPath:record.path
                                                  affectedTree:touched
                                                        revert:revert];
      return [self applyOperationToSyncPoints:ack];
  }
  /**
   * Applies new server data at the given path: the persisted server cache for
   * the default query at that path is updated first, then the overwrite is
   * applied to the sync points.
   *
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)applyServerOverwriteAtPath:(FPath *)path
                                  newData:(id<FNode>)newData {
      FQuerySpec *defaultQuery = [FQuerySpec defaultQueryAtPath:path];
      [self.persistenceManager updateServerCacheWithNode:newData
                                                forQuery:defaultQuery];

      FOperationSource *serverSource = [FOperationSource serverInstance];
      FOverwrite *overwrite = [[FOverwrite alloc] initWithSource:serverSource
                                                            path:path
                                                            snap:newData];
      return [self applyOperationToSyncPoints:overwrite];
  }
  /**
   * Applies server data that is to be merged in at the given path: the
   * persisted server cache is updated first, then the merge is applied to the
   * sync points.
   *
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)applyServerMergeAtPath:(FPath *)path
                      changedChildren:(FCompoundWrite *)changedChildren {
      [self.persistenceManager updateServerCacheWithMerge:changedChildren
                                                   atPath:path];

      FOperationSource *serverSource = [FOperationSource serverInstance];
      FMerge *merge = [[FMerge alloc] initWithSource:serverSource
                                                path:path
                                            children:changedChildren];
      return [self applyOperationToSyncPoints:merge];
  }
  /**
   * Applies a list of server range merges at the given path. The merges are
   * applied in order on top of the server cache of a complete view at that
   * path, and the result is then applied as a server overwrite.
   *
   * @param ranges NSArray of FRangeMerge.
   * @return NSArray of FEvent to raise; empty when there is no sync point or
   * no complete view at the path.
   */
  - (NSArray *)applyServerRangeMergeAtPath:(FPath *)path
                                   updates:(NSArray *)ranges {
      FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
      if (syncPoint == nil) {
          // Removed view, so it's safe to just ignore this update
          return @[];
      }

      // This could be for any "complete" (unfiltered) view, and if there is
      // more than one complete view, they should each have the same cache so
      // it doesn't matter which one we use.
      FView *completeView = [syncPoint completeView];
      if (completeView == nil) {
          // There doesn't exist a view for this update, so it was removed and
          // it's safe to just ignore this range merge
          return @[];
      }

      id<FNode> mergedNode = [completeView serverCache];
      for (FRangeMerge *rangeMerge in ranges) {
          mergedNode = [rangeMerge applyToNode:mergedNode];
      }
      return [self applyServerOverwriteAtPath:path newData:mergedNode];
  }
  /**
   * Applies a listen complete at the given path: the default query at that
   * path is marked complete in persistence, then the listen-complete operation
   * is applied to the sync points.
   *
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)applyListenCompleteAtPath:(FPath *)path {
      FQuerySpec *defaultQuery = [FQuerySpec defaultQueryAtPath:path];
      [self.persistenceManager setQueryComplete:defaultQuery];

      FOperationSource *serverSource = [FOperationSource serverInstance];
      id<FOperation> listenComplete =
          [[FListenComplete alloc] initWithSource:serverSource path:path];
      return [self applyOperationToSyncPoints:listenComplete];
  }
  /**
   * Applies a listen complete for the tagged query identified by tagId. The
   * operation is expressed relative to the query's own path and applied only
   * to the sync point at that path.
   *
   * @return NSArray of FEvent to raise; empty when no query is tracked for the
   * tag.
   */
  - (NSArray *)applyTaggedListenCompleteAtPath:(FPath *)path
                                         tagId:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self queryForTag:tagId];
      if (taggedQuery == nil) {
          // We've already removed the query. No big deal, ignore the update.
          return @[];
      }

      [self.persistenceManager setQueryComplete:taggedQuery];

      FPath *pathFromQuery = [FPath relativePathFrom:taggedQuery.path to:path];
      FOperationSource *taggedSource =
          [FOperationSource forServerTaggedQuery:taggedQuery.params];
      id<FOperation> listenComplete =
          [[FListenComplete alloc] initWithSource:taggedSource
                                             path:pathFromQuery];
      return [self applyTaggedOperation:listenComplete
                                 atPath:taggedQuery.path];
  }
  /**
   * Internal helper that applies a tagged operation to the single sync point
   * located at the given path.
   *
   * @param operation The tagged operation to apply.
   * @param path Path of the query the operation is tagged for.
   * @return NSArray of FEvent to raise; empty when no sync point exists at the
   * path.
   */
  - (NSArray *)applyTaggedOperation:(id<FOperation>)operation
                             atPath:(FPath *)path {
      FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
      NSAssert(syncPoint != nil,
               @"Missing sync point for query tag that we're tracking.");
      if (syncPoint == nil) {
          // NSAssert is compiled out when NS_BLOCK_ASSERTIONS is defined.
          // Without this guard the message to nil below would return nil
          // rather than an array of events.
          return @[];
      }

      FWriteTreeRef *writesCache =
          [self.pendingWriteTree childWritesForPath:path];
      // No server cache is supplied for tagged operations.
      return [syncPoint applyOperation:operation
                           writesCache:writesCache
                           serverCache:nil];
  }
  /**
   * Applies new server data for the tagged query identified by tagId.
   *
   * The persisted server cache is updated for the tagged query itself when the
   * data arrives at the query's own path, and for the default query at `path`
   * otherwise.
   *
   * @return NSArray of FEvent to raise; empty when no query is tracked for the
   * tag.
   */
  - (NSArray *)applyTaggedQueryOverwriteAtPath:(FPath *)path
                                       newData:(id<FNode>)newData
                                         tagId:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self queryForTag:tagId];
      if (taggedQuery == nil) {
          // Query must have been removed already
          return @[];
      }

      FPath *pathFromQuery = [FPath relativePathFrom:taggedQuery.path to:path];

      FQuerySpec *queryToOverwrite;
      if (pathFromQuery.isEmpty) {
          queryToOverwrite = taggedQuery;
      } else {
          queryToOverwrite = [FQuerySpec defaultQueryAtPath:path];
      }
      [self.persistenceManager updateServerCacheWithNode:newData
                                                forQuery:queryToOverwrite];

      FOperationSource *taggedSource =
          [FOperationSource forServerTaggedQuery:taggedQuery.params];
      FOverwrite *overwrite = [[FOverwrite alloc] initWithSource:taggedSource
                                                            path:pathFromQuery
                                                            snap:newData];
      return [self applyTaggedOperation:overwrite atPath:taggedQuery.path];
  }
  /**
   * Applies server data that is to be merged in for the tagged query
   * identified by tagId.
   *
   * @return NSArray of FEvent to raise; empty when no query is tracked for the
   * tag.
   */
  - (NSArray *)applyTaggedQueryMergeAtPath:(FPath *)path
                           changedChildren:(FCompoundWrite *)changedChildren
                                     tagId:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self queryForTag:tagId];
      if (taggedQuery == nil) {
          // We've already removed the query. No big deal, ignore the update.
          return @[];
      }

      FPath *pathFromQuery = [FPath relativePathFrom:taggedQuery.path to:path];
      // Persistence is updated at the absolute path; the operation itself is
      // expressed relative to the query's path.
      [self.persistenceManager updateServerCacheWithMerge:changedChildren
                                                   atPath:path];

      FOperationSource *taggedSource =
          [FOperationSource forServerTaggedQuery:taggedQuery.params];
      FMerge *merge = [[FMerge alloc] initWithSource:taggedSource
                                                path:pathFromQuery
                                            children:changedChildren];
      return [self applyTaggedOperation:merge atPath:taggedQuery.path];
  }
  /**
   * Applies a list of server range merges for the tagged query identified by
   * tagId. The merges are applied in order on top of the server cache of the
   * query's view, and the result is then applied as a tagged overwrite.
   *
   * @param ranges NSArray of FRangeMerge.
   * @return NSArray of FEvent to raise; empty when no query is tracked for the
   * tag.
   */
  - (NSArray *)applyTaggedServerRangeMergeAtPath:(FPath *)path
                                         updates:(NSArray *)ranges
                                           tagId:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self queryForTag:tagId];
      if (taggedQuery == nil) {
          // We've already removed the query. No big deal, ignore the update.
          return @[];
      }

      NSAssert([path isEqual:taggedQuery.path],
               @"Tagged update path and query path must match");
      FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
      NSAssert(syncPoint != nil,
               @"Missing sync point for query tag that we're tracking.");
      FView *taggedView = [syncPoint viewForQuery:taggedQuery];
      NSAssert(taggedView != nil,
               @"Missing view for query tag that we're tracking");

      id<FNode> mergedNode = [taggedView serverCache];
      for (FRangeMerge *rangeMerge in ranges) {
          mergedNode = [rangeMerge applyToNode:mergedNode];
      }
      return [self applyTaggedQueryOverwriteAtPath:path
                                           newData:mergedNode
                                             tagId:tagId];
  }
  /**
   * Adds an event callback for the specified query.
   *
   * A sync point is created at the query's path if none exists. When the query
   * has no view yet, one is created (with a fresh tag if the query does not
   * load all data), and a server listen is set up unless a sync point on the
   * path already has a complete view.
   *
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)addEventRegistration:(id<FEventRegistration>)eventRegistration
                           forQuery:(FQuerySpec *)query {
      FPath *queryPath = query.path;

      // Walk the sync points on the path until one with a complete view is
      // found.
      __block BOOL coveredByCompleteView = NO;
      [self.syncPointTree
          forEachOnPath:queryPath
             whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *candidate) {
               if (!coveredByCompleteView) {
                   coveredByCompleteView = [candidate hasCompleteView];
               }
               return !coveredByCompleteView;
             }];

      [self.persistenceManager setQueryActive:query];

      FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:queryPath];
      if (syncPoint == nil) {
          syncPoint = [[FSyncPoint alloc]
              initWithPersistenceManager:self.persistenceManager];
          self.syncPointTree = [self.syncPointTree setValue:syncPoint
                                                     atPath:queryPath];
      }

      if ([syncPoint viewExistsForQuery:query]) {
          return [syncPoint addEventRegistration:eventRegistration
                         forExistingViewForQuery:query];
      }

      if (![query loadsAllData]) {
          // We need to track a tag for this query
          NSAssert(self.queryToTagMap[query] == nil,
                   @"View does not exist, but we have a tag");
          NSNumber *newTag = [self.queryTagCounter getAndIncrement];
          self.queryToTagMap[query] = newTag;
          self.tagToQueryMap[newTag] = query;
      }

      FWriteTreeRef *writesCache =
          [self.pendingWriteTree childWritesForPath:queryPath];
      FCacheNode *serverCache = [self serverCacheForQuery:query];
      NSArray *events = [syncPoint addEventRegistration:eventRegistration
                             forNonExistingViewForQuery:query
                                            writesCache:writesCache
                                            serverCache:serverCache];

      // There was no view and no default listen
      if (!coveredByCompleteView) {
          FView *newView = [syncPoint viewForQuery:query];
          NSMutableArray *combined = [events mutableCopy];
          [combined addObjectsFromArray:[self setupListenerOnQuery:query
                                                              view:newView]];
          events = combined;
      }
      return events;
  }
  /**
   * Builds the server cache used to seed a new view for the query.
   *
   * Sources are tried in this order:
   *  1. a complete server cache from a sync point on the query's path;
   *  2. the persisted server cache, when it is fully initialized;
   *  3. otherwise a partial cache assembled from complete caches of child sync
   *     points, topped up with any persisted children not already present.
   *
   * @return The cache node; marked fully initialized only in cases 1 and 2.
   */
  - (FCacheNode *)serverCacheForQuery:(FQuerySpec *)query {
      __block id<FNode> completeNode = nil;
      [self.syncPointTree
          forEachOnPath:query.path
             whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
               FPath *pathFromSyncPoint =
                   [FPath relativePathFrom:pathToSyncPoint to:query.path];
               completeNode =
                   [syncPoint completeServerCacheAtPath:pathFromSyncPoint];
               // Keep walking only while nothing has been found.
               return completeNode == nil;
             }];

      if (completeNode != nil) {
          FIndexedNode *indexedComplete =
              [FIndexedNode indexedNodeWithNode:completeNode
                                          index:query.index];
          return [[FCacheNode alloc] initWithIndexedNode:indexedComplete
                                      isFullyInitialized:YES
                                              isFiltered:NO];
      }

      FCacheNode *persistedCache =
          [self.persistenceManager serverCacheForQuery:query];
      if (persistedCache.isFullyInitialized) {
          return persistedCache;
      }

      // Assemble a partial cache, starting with complete caches held by child
      // sync points.
      __block id<FNode> partialNode = [FEmptyNode emptyNode];
      FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:query.path];
      [subtree forEachChild:^(NSString *childKey, FSyncPoint *childSyncPoint) {
        id<FNode> childCache =
            [childSyncPoint completeServerCacheAtPath:[FPath empty]];
        if (childCache) {
            partialNode = [partialNode updateImmediateChild:childKey
                                               withNewChild:childCache];
        }
      }];

      // Fill the node with any available children we have
      [persistedCache.node
          enumerateChildrenUsingBlock:^(NSString *key, id<FNode> persistedChild,
                                        BOOL *stop) {
            if (![partialNode hasChild:key]) {
                partialNode = [partialNode updateImmediateChild:key
                                                   withNewChild:persistedChild];
            }
          }];

      FIndexedNode *indexedPartial =
          [FIndexedNode indexedNodeWithNode:partialNode index:query.index];
      return [[FCacheNode alloc] initWithIndexedNode:indexedPartial
                                  isFullyInitialized:NO
                                          isFiltered:NO];
  }
  /**
   * Remove event callback(s).
   *
   * If query is the default query, all queries at the location are checked for
   * the specified eventRegistration. If eventRegistration is nil, all
   * callbacks for the specified query/queries are removed.
   *
   * Besides removing the registration this keeps the server listens
   * consistent: listens for views below a removed default listen are started,
   * and listens for the removed queries are stopped, unless a sync point on
   * the path still has a complete view.
   *
   * @param eventRegistration if nil, all callbacks are removed
   * @param query The query the registration was added for.
   * @param cancelError If provided, appropriate cancel events will be returned
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)removeEventRegistration:(id<FEventRegistration>)eventRegistration
                              forQuery:(FQuerySpec *)query
                           cancelError:(NSError *)cancelError {
      // Find the syncPoint first. Then deal with whether or not it has matching
      // listeners
      FPath *path = query.path;
      FSyncPoint *maybeSyncPoint = [self.syncPointTree valueAtPath:path];

      // A removal on a default query affects all queries at that location. A
      // removal on an indexed query, even one without other query constraints,
      // does *not* affect all queries at that location. So this check must be
      // for 'default', and not loadsAllData:
      BOOL hasMatchingListeners =
          maybeSyncPoint != nil &&
          ([query isDefault] || [maybeSyncPoint viewExistsForQuery:query]);
      if (!hasMatchingListeners) {
          // No-op, this listener must've been already removed
          return @[];
      }

      FTupleRemovedQueriesEvents *removedAndEvents =
          [maybeSyncPoint removeEventRegistration:eventRegistration
                                         forQuery:query
                                      cancelError:cancelError];
      if ([maybeSyncPoint isEmpty]) {
          self.syncPointTree = [self.syncPointTree removeValueAtPath:path];
      }
      NSArray *removed = removedAndEvents.removedQueries;
      NSArray *cancelEvents = removedAndEvents.cancelEvents;

      // We may have just removed one of many listeners and can short-circuit
      // this whole process. We may also not have removed a default listener,
      // in which case all of the descendant listeners should already be
      // properly set up.
      //
      // Since indexed queries can shadow if they don't have other query
      // constraints, check for loadsAllData: instead of isDefault:
      NSUInteger defaultQueryIndex = [removed
          indexOfObjectPassingTest:^BOOL(FQuerySpec *removedQuery,
                                         NSUInteger idx, BOOL *stop) {
            return [removedQuery loadsAllData];
          }];
      BOOL removingDefault = defaultQueryIndex != NSNotFound;

      for (FQuerySpec *removedQuery in removed) {
          [self.persistenceManager setQueryInactive:removedQuery];
      }

      // Is some sync point on the path still holding a complete view?
      // NOTE(review): this assumes findOnPath:andApplyBlock: stops at the first
      // non-nil block result, which is how calcCompleteEventCacheAtPath: in
      // this file relies on it. Under that contract returning @NO would end
      // the search at the first sync point on the path, so nil is returned
      // instead to keep looking. A nil result reads as NO below.
      NSNumber *covered = [self.syncPointTree
             findOnPath:path
          andApplyBlock:^id(FPath *relativePath, FSyncPoint *parentSyncPoint) {
            return [parentSyncPoint hasCompleteView] ? @YES : nil;
          }];
      BOOL isCovered = [covered boolValue];

      if (removingDefault && !isCovered) {
          FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
          // There are potentially child listeners. Determine what if any
          // listens we need to send before executing the removal. An empty
          // subtree means there is nothing below us to start listening on.
          if (![subtree isEmpty]) {
              // Fold over the subtree and collect the listeners to send, then
              // set each of them up.
              NSArray *newViews = [self collectDistinctViewsForSubTree:subtree];
              for (FView *view in newViews) {
                  FQuerySpec *newQuery = view.query;
                  FListenContainer *listenContainer =
                      [self createListenerForView:view];
                  self.listenProvider.startListening(
                      [self queryForListening:newQuery],
                      [self tagForQuery:newQuery], listenContainer,
                      listenContainer.onComplete);
              }
          }
      }

      // If we removed anything and we're not covered by a higher up listen,
      // we need to stop listening on this query. The above block has us
      // covered in terms of making sure we're set up on listens lower in the
      // tree. Also, note that if we have a cancelError, it's already been
      // removed at the provider level.
      if (!isCovered && [removed count] > 0 && cancelError == nil) {
          // If we removed a default, then we weren't listening on any of the
          // other queries here. Just cancel the one default. Otherwise, we
          // need to iterate through and cancel each individual query
          if (removingDefault) {
              // We don't tag default listeners
              self.listenProvider.stopListening(
                  [self queryForListening:query], nil);
          } else {
              for (FQuerySpec *queryToRemove in removed) {
                  NSNumber *tagToRemove = self.queryToTagMap[queryToRemove];
                  self.listenProvider.stopListening(
                      [self queryForListening:queryToRemove], tagToRemove);
              }
          }
      }

      // Now, clear all the tags we're tracking for the removed listens.
      [self removeTags:removed];
      return cancelEvents;
  }
  /**
   * Turns "keep synced" on or off for a query by adding or removing the shared
   * FKeepSyncedEventRegistration. Nothing happens when the query is already in
   * the requested state.
   */
  - (void)keepQuery:(FQuerySpec *)query synced:(BOOL)keepSynced {
      BOOL currentlySynced = [self.keepSyncedQueries containsObject:query];

      // Only do something if we actually need to add/remove an event
      // registration
      if (keepSynced && !currentlySynced) {
          [self addEventRegistration:[FKeepSyncedEventRegistration instance]
                            forQuery:query];
          [self.keepSyncedQueries addObject:query];
          return;
      }
      if (!keepSynced && currentlySynced) {
          [self removeEventRegistration:[FKeepSyncedEventRegistration instance]
                               forQuery:query
                            cancelError:nil];
          [self.keepSyncedQueries removeObject:query];
      }
  }
  /**
   * Drops every pending user write, both persisted and in memory. If any
   * in-memory write was removed, a reverting acknowledgement rooted at the
   * empty path is applied to the sync points.
   *
   * @return NSArray of FEvent to raise; empty when there was nothing to
   * remove.
   */
  - (NSArray *)removeAllWrites {
      [self.persistenceManager removeAllUserWrites];
      NSArray *removedWrites = [self.pendingWriteTree removeAllWrites];
      if (removedWrites.count == 0) {
          return @[];
      }

      FImmutableTree *everything = [[FImmutableTree empty] setValue:@YES
                                                             atPath:[FPath empty]];
      FAckUserWrite *revertAll =
          [[FAckUserWrite alloc] initWithPath:[FPath empty]
                                 affectedTree:everything
                                       revert:YES];
      return [self applyOperationToSyncPoints:revertAll];
  }
  /**
   * Returns a complete cache, if we have one, of the data at a particular path.
   * The location must have a listener above it, but as this is only used by
   * transaction code, that should always be the case anyways.
   *
   * Note: this method will *include* hidden writes from transaction with
   * applyLocally set to false.
   *
   * @param path The path to the data we want
   * @param writeIdsToExclude A specific set to be excluded
   */
  - (id<FNode>)calcCompleteEventCacheAtPath:(FPath *)path
                            excludeWriteIds:(NSArray *)writeIdsToExclude {
      FWriteTree *pendingWrites = self.pendingWriteTree;

      // The block yields a sync point's complete server cache for the path,
      // or nil when that sync point has none.
      id<FNode> completeServerCache = [self.syncPointTree
             findOnPath:path
          andApplyBlock:^id<FNode>(FPath *pathSoFar, FSyncPoint *syncPoint) {
            FPath *pathFromSyncPoint = [FPath relativePathFrom:pathSoFar
                                                            to:path];
            return [syncPoint completeServerCacheAtPath:pathFromSyncPoint];
          }];

      return [pendingWrites calculateCompleteEventCacheAtPath:path
                                          completeServerCache:completeServerCache
                                              excludeWriteIds:writeIdsToExclude
                                          includeHiddenWrites:YES];
  }
  702. #pragma mark -
  703. #pragma mark Private Methods
  /**
   * This collapses multiple unfiltered views into a single view, since we only
   * need a single listener for them.
   *
   * For each node of the subtree: if its sync point has a complete view, that
   * single view stands in for the node and everything below it; otherwise the
   * node's query views are combined with the views collected from its
   * children.
   *
   * @return NSArray of FView
   */
  - (NSArray *)collectDistinctViewsForSubTree:(FImmutableTree *)subtree {
      return [subtree foldWithBlock:^NSArray *(FPath *relativePath,
                                               FSyncPoint *maybeChildSyncPoint,
                                               NSDictionary *childMap) {
        if (maybeChildSyncPoint && [maybeChildSyncPoint hasCompleteView]) {
            FView *completeView = [maybeChildSyncPoint completeView];
            return @[ completeView ];
        }

        // No complete view here, flatten any deeper listens into an array.
        // Always append into the accumulator rather than replacing it, so a
        // nil queryViews result cannot turn the whole result into nil.
        NSMutableArray *views = [[NSMutableArray alloc] init];
        NSArray *queryViews = [maybeChildSyncPoint queryViews];
        if (queryViews != nil) {
            [views addObjectsFromArray:queryViews];
        }
        [childMap enumerateKeysAndObjectsUsingBlock:^(
                      NSString *childKey, NSArray *childViews, BOOL *stop) {
          [views addObjectsFromArray:childViews];
        }];
        return views;
      }];
  }
  /**
   * Removes the tag bookkeeping (query -> tag and tag -> query) for each of the
   * given queries. Queries that load all data are skipped; only the remaining
   * queries are expected to have a tag registered.
   *
   * @param queries NSArray of FQuerySpec
   */
  - (void)removeTags:(NSArray *)queries {
      for (FQuerySpec *removedQuery in queries) {
          if ([removedQuery loadsAllData]) {
              continue;
          }
          // We should have a tag for this
          NSNumber *removedQueryTag = self.queryToTagMap[removedQuery];
          [self.queryToTagMap removeObjectForKey:removedQuery];
          // -removeObjectForKey: raises NSInvalidArgumentException for a nil
          // key, so only touch the reverse map when a tag was actually
          // registered (e.g. the same query may be passed in more than once).
          if (removedQueryTag != nil) {
              [self.tagToQueryMap removeObjectForKey:removedQueryTag];
          }
      }
  }
  /**
   * Maps a query to the query spec that is handed to the listen provider.
   * A query that loads all data but is not itself the default query is
   * replaced by the default query at the same path; any other query is
   * returned unchanged.
   */
  - (FQuerySpec *)queryForListening:(FQuerySpec *)query {
      // We treat queries that load all data as default queries
      BOOL treatAsDefault = query.loadsAllData && !query.isDefault;
      return treatAsDefault ? [FQuerySpec defaultQueryAtPath:query.path]
                            : query;
  }
  /**
   * For a given new listen, manage the de-duplication of outstanding
   * subscriptions.
   *
   * Starts the listen through the listen provider and, when the query has no
   * tag, stops every listen at or below the query's path that the new one
   * shadows.
   *
   * @param query The query being listened to
   * @param view The view backing the query
   * @return NSArray of FEvent events to support synchronous data sources
   */
  - (NSArray *)setupListenerOnQuery:(FQuerySpec *)query view:(FView *)view {
      FPath *path = query.path;
      NSNumber *tagId = [self tagForQuery:query];
      FListenContainer *listenContainer = [self createListenerForView:view];

      NSArray *events = self.listenProvider.startListening(
          [self queryForListening:query], tagId, listenContainer,
          listenContainer.onComplete);

      // The root of this subtree has our query. We're here because we
      // definitely need to send a listen for that, but we may need to shadow
      // other listens as well.
      FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];

      if (tagId != nil) {
          NSAssert(![subtree.value hasCompleteView],
                   @"If we're adding a query, it shouldn't be shadowed");
          return events;
      }

      // Shadow everything at or below this location, this is a default
      // listener.
      NSArray *queriesToStop = [subtree
          foldWithBlock:^id(FPath *relativePath,
                            FSyncPoint *maybeChildSyncPoint,
                            NSDictionary *childMap) {
            BOOL isBelowRoot = ![relativePath isEmpty];
            if (isBelowRoot && maybeChildSyncPoint != nil &&
                [maybeChildSyncPoint hasCompleteView]) {
                // A complete view below us covers everything beneath it, so
                // its query is the only one to report for that branch.
                return @[ [maybeChildSyncPoint completeView].query ];
            }

            // No default listener here, flatten any deeper queries into an
            // array
            NSMutableArray *shadowedQueries = [[NSMutableArray alloc] init];
            if (maybeChildSyncPoint != nil) {
                for (FView *queryView in [maybeChildSyncPoint queryViews]) {
                    [shadowedQueries addObject:queryView.query];
                }
            }
            [childMap enumerateKeysAndObjectsUsingBlock:^(
                          NSString *key, NSArray *childQueries, BOOL *stop) {
              [shadowedQueries addObjectsFromArray:childQueries];
            }];
            return shadowedQueries;
          }];

      for (FQuerySpec *queryToStop in queriesToStop) {
          self.listenProvider.stopListening(
              [self queryForListening:queryToStop],
              [self tagForQuery:queryToStop]);
      }
      return events;
  }
  /**
   * Builds the listen container for a view. The container's completion block
   * receives the listen status: on "ok" it applies a (tagged or untagged)
   * listen-complete at the query's path; on any other status it removes all
   * event registrations for the query with an error built from that status.
   *
   * @param view The view whose query is being listened to
   * @return A new FListenContainer wrapping the view and its completion block
   */
  - (FListenContainer *)createListenerForView:(FView *)view {
      FQuerySpec *query = view.query;
      NSNumber *tagId = [self tagForQuery:query];

      return [[FListenContainer alloc]
          initWithView:view
            onComplete:^(NSString *status) {
              BOOL listenSucceeded = [status isEqualToString:@"ok"];
              if (listenSucceeded && tagId != nil) {
                  return [self applyTaggedListenCompleteAtPath:query.path
                                                         tagId:tagId];
              }
              if (listenSucceeded) {
                  return [self applyListenCompleteAtPath:query.path];
              }

              // If a listen failed, kill all of the listeners here, not just
              // the one that triggered the error. Note that this may need to
              // be scoped to just this listener if we change permissions on
              // filtered children
              NSError *error = [FUtilities errorForStatus:status
                                                andReason:nil];
              FFWarn(@"I-RDB038012", @"Listener at %@ failed: %@", query.path,
                     status);
              return [self removeEventRegistration:nil
                                          forQuery:query
                                       cancelError:error];
            }];
  }
  /**
   * Looks up a tag in the tag -> query map.
   *
   * @param tagId The tag to look up
   * @return The query associated with the given tag, if we have one
   */
  - (FQuerySpec *)queryForTag:(NSNumber *)tagId {
      FQuerySpec *taggedQuery = [self.tagToQueryMap objectForKey:tagId];
      return taggedQuery;
  }
  /**
   * Looks up a query in the query -> tag map.
   *
   * @param query The query to look up
   * @return The tag associated with the given query, or nil when the map has
   * no entry for it
   */
  - (NSNumber *)tagForQuery:(FQuerySpec *)query {
      NSNumber *tag = [self.queryToTagMap objectForKey:query];
      return tag;
  }
  847. #pragma mark -
  848. #pragma mark applyOperation Helpers
  /**
   * A helper method that visits all descendant and ancestor SyncPoints,
   * applying the operation.
   *
   * NOTES:
   * - Descendant SyncPoints will be visited first (since we raise events
   *   depth-first).
   * - We call applyOperation: on each SyncPoint passing three things:
   *   1. A version of the Operation that has been made relative to the
   *      SyncPoint location.
   *   2. A WriteTreeRef of any writes we have cached at the SyncPoint
   *      location.
   *   3. A snapshot Node with cached server data, if we have it.
   * - We concatenate all of the events returned by each SyncPoint and return
   *   the result.
   *
   * @param operation The operation to apply, starting from the root
   * @return Array of FEvent
   */
  - (NSArray *)applyOperationToSyncPoints:(id<FOperation>)operation {
      // Start at the root: the writes ref is taken at the empty path and no
      // server cache is known yet.
      FWriteTreeRef *rootWrites =
          [self.pendingWriteTree childWritesForPath:[FPath empty]];
      return [self applyOperationHelper:operation
                          syncPointTree:self.syncPointTree
                            serverCache:nil
                            writesCache:rootWrites];
  }
  /**
   * Recursive helper for applyOperationToSyncPoints:
   *
   * Walks down the sync point tree along the operation's path. While the path
   * is non-empty it recurses into the single child named by the front of the
   * path and then applies the operation at the current sync point. Once the
   * path is empty it hands over to applyOperationDescendantsHelper:.
   *
   * @param operation Operation, relative to the location of syncPointTree
   * @param syncPointTree Subtree of sync points rooted at the current location
   * @param serverCache Cached server data for this location, or nil if unknown
   * @param writesCache Pending writes scoped to this location
   * @return Array of FEvent, child events first
   */
  - (NSArray *)applyOperationHelper:(id<FOperation>)operation
                      syncPointTree:(FImmutableTree *)syncPointTree
                        serverCache:(id<FNode>)serverCache
                        writesCache:(FWriteTreeRef *)writesCache {
      if ([operation.path isEmpty]) {
          return [self applyOperationDescendantsHelper:operation
                                         syncPointTree:syncPointTree
                                           serverCache:serverCache
                                           writesCache:writesCache];
      }

      FSyncPoint *currentSyncPoint = syncPointTree.value;

      // If we don't have cached server data, see if we can get it from this
      // SyncPoint
      id<FNode> effectiveServerCache = serverCache;
      if (effectiveServerCache == nil && currentSyncPoint != nil) {
          effectiveServerCache =
              [currentSyncPoint completeServerCacheAtPath:[FPath empty]];
      }

      NSMutableArray *collectedEvents = [[NSMutableArray alloc] init];

      NSString *frontKey = [operation.path getFront];
      id<FOperation> operationForFront = [operation operationForChild:frontKey];
      FImmutableTree *subtreeForFront = [syncPointTree.children get:frontKey];

      if (subtreeForFront != nil && operationForFront != nil) {
          id<FNode> serverCacheForFront = nil;
          if (effectiveServerCache) {
              serverCacheForFront =
                  [effectiveServerCache getImmediateChild:frontKey];
          }
          FWriteTreeRef *writesForFront =
              [writesCache childWriteTreeRef:frontKey];
          NSArray *descendantEvents =
              [self applyOperationHelper:operationForFront
                           syncPointTree:subtreeForFront
                             serverCache:serverCacheForFront
                             writesCache:writesForFront];
          [collectedEvents addObjectsFromArray:descendantEvents];
      }

      // The sync point at this level is applied after its descendant.
      if (currentSyncPoint) {
          NSArray *localEvents =
              [currentSyncPoint applyOperation:operation
                                   writesCache:writesCache
                                   serverCache:effectiveServerCache];
          [collectedEvents addObjectsFromArray:localEvents];
      }
      return collectedEvents;
  }
  /**
   * Recursive helper for applyOperationToSyncPoints:
   *
   * Applies the operation to every child subtree of syncPointTree (for each
   * child that the operation yields a child operation for) and then to the
   * sync point at this location, concatenating the resulting events.
   *
   * @param operation Operation, relative to the location of syncPointTree
   * @param syncPointTree Subtree of sync points rooted at the current location
   * @param serverCache Cached server data for this location, or nil if unknown
   * @param writesCache Pending writes scoped to this location
   * @return Array of FEvent, descendant events first
   */
  - (NSArray *)applyOperationDescendantsHelper:(id<FOperation>)operation
                                 syncPointTree:(FImmutableTree *)syncPointTree
                                   serverCache:(id<FNode>)serverCache
                                   writesCache:(FWriteTreeRef *)writesCache {
      FSyncPoint *syncPoint = syncPointTree.value;

      // If we don't have cached server data, see if we can get it from this
      // SyncPoint
      id<FNode> resolvedServerCache;
      if (serverCache == nil && syncPoint != nil) {
          resolvedServerCache =
              [syncPoint completeServerCacheAtPath:[FPath empty]];
      } else {
          resolvedServerCache = serverCache;
      }

      NSMutableArray *events = [[NSMutableArray alloc] init];

      [syncPointTree.children enumerateKeysAndObjectsUsingBlock:^(
                                  NSString *childKey,
                                  FImmutableTree *childTree, BOOL *stop) {
        id<FNode> childServerCache = nil;
        if (resolvedServerCache != nil) {
            childServerCache =
                [resolvedServerCache getImmediateChild:childKey];
        }
        FWriteTreeRef *childWritesCache =
            [writesCache childWriteTreeRef:childKey];
        id<FOperation> childOperation =
            [operation operationForChild:childKey];
        // A nil child operation means there is nothing to apply below this
        // child, so that subtree is skipped.
        if (childOperation != nil) {
            [events addObjectsFromArray:
                        [self applyOperationDescendantsHelper:childOperation
                                                syncPointTree:childTree
                                                  serverCache:childServerCache
                                                  writesCache:childWritesCache]];
        }
      }];

      // Apply at this location last, after all descendants.
      if (syncPoint) {
          [events
              addObjectsFromArray:[syncPoint applyOperation:operation
                                                writesCache:writesCache
                                                serverCache:resolvedServerCache]];
      }
      return events;
  }
  960. @end