FSyncTree.m 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FirebaseDatabase/Sources/Core/FSyncTree.h"
  17. #import "FirebaseCore/Sources/Private/FirebaseCoreInternal.h"
  18. #import "FirebaseDatabase/Sources/Core/FCompoundHash.h"
  19. #import "FirebaseDatabase/Sources/Core/FListenProvider.h"
  20. #import "FirebaseDatabase/Sources/Core/FQueryParams.h"
  21. #import "FirebaseDatabase/Sources/Core/FQuerySpec.h"
  22. #import "FirebaseDatabase/Sources/Core/FRangeMerge.h"
  23. #import "FirebaseDatabase/Sources/Core/FServerValues.h"
  24. #import "FirebaseDatabase/Sources/Core/FSnapshotHolder.h"
  25. #import "FirebaseDatabase/Sources/Core/FSyncPoint.h"
  26. #import "FirebaseDatabase/Sources/Core/FWriteRecord.h"
  27. #import "FirebaseDatabase/Sources/Core/FWriteTree.h"
  28. #import "FirebaseDatabase/Sources/Core/FWriteTreeRef.h"
  29. #import "FirebaseDatabase/Sources/Core/Operation/FAckUserWrite.h"
  30. #import "FirebaseDatabase/Sources/Core/Operation/FMerge.h"
  31. #import "FirebaseDatabase/Sources/Core/Operation/FOperation.h"
  32. #import "FirebaseDatabase/Sources/Core/Operation/FOperationSource.h"
  33. #import "FirebaseDatabase/Sources/Core/Operation/FOverwrite.h"
  34. #import "FirebaseDatabase/Sources/Core/Utilities/FImmutableTree.h"
  35. #import "FirebaseDatabase/Sources/Core/Utilities/FPath.h"
  36. #import "FirebaseDatabase/Sources/Core/View/FCacheNode.h"
  37. #import "FirebaseDatabase/Sources/Core/View/FEventRaiser.h"
  38. #import "FirebaseDatabase/Sources/Core/View/FEventRegistration.h"
  39. #import "FirebaseDatabase/Sources/Core/View/FKeepSyncedEventRegistration.h"
  40. #import "FirebaseDatabase/Sources/Core/View/FView.h"
  41. #import "FirebaseDatabase/Sources/FListenComplete.h"
  42. #import "FirebaseDatabase/Sources/Persistence/FPersistenceManager.h"
  43. #import "FirebaseDatabase/Sources/Snapshot/FChildrenNode.h"
  44. #import "FirebaseDatabase/Sources/Snapshot/FCompoundWrite.h"
  45. #import "FirebaseDatabase/Sources/Snapshot/FEmptyNode.h"
  46. #import "FirebaseDatabase/Sources/Snapshot/FNode.h"
  47. #import "FirebaseDatabase/Sources/Snapshot/FSnapshotUtilities.h"
  48. #import "FirebaseDatabase/Sources/Utilities/FAtomicNumber.h"
  49. #import "FirebaseDatabase/Sources/Utilities/FUtilities.h"
  50. #import "FirebaseDatabase/Sources/Utilities/Tuples/FTupleRemovedQueriesEvents.h"
  51. // Size after which we start including the compound hash
  52. static const NSUInteger kFSizeThresholdForCompoundHash = 1024;
  53. @interface FListenContainer : NSObject <FSyncTreeHash>
  54. @property(nonatomic, strong) FView *view;
  55. @property(nonatomic, copy) fbt_nsarray_nsstring onComplete;
  56. @end
  57. @implementation FListenContainer
  58. - (instancetype)initWithView:(FView *)view
  59. onComplete:(fbt_nsarray_nsstring)onComplete {
  60. self = [super init];
  61. if (self != nil) {
  62. self->_view = view;
  63. self->_onComplete = onComplete;
  64. }
  65. return self;
  66. }
  67. - (id<FNode>)serverCache {
  68. return self.view.serverCache;
  69. }
  70. - (FCompoundHash *)compoundHash {
  71. return [FCompoundHash fromNode:[self serverCache]];
  72. }
  73. - (NSString *)simpleHash {
  74. return [[self serverCache] dataHash];
  75. }
  76. - (BOOL)includeCompoundHash {
  77. return [FSnapshotUtilities estimateSerializedNodeSize:[self serverCache]] >
  78. kFSizeThresholdForCompoundHash;
  79. }
  80. @end
  81. @interface FSyncTree ()
  82. /**
  83. * Tree of SyncPoints. There's a SyncPoint at any location that has 1 or more
  84. * views.
  85. */
  86. @property(nonatomic, strong) FImmutableTree *syncPointTree;
  87. /**
  88. * A tree of all pending user writes (user-initiated set, transactions, updates,
  89. * etc)
  90. */
  91. @property(nonatomic, strong) FWriteTree *pendingWriteTree;
  92. /**
  93. * Maps tagId -> FTuplePathQueryParams
  94. */
  95. @property(nonatomic, strong) NSMutableDictionary *tagToQueryMap;
  96. @property(nonatomic, strong) NSMutableDictionary *queryToTagMap;
  97. @property(nonatomic, strong) FListenProvider *listenProvider;
  98. @property(nonatomic, strong) FPersistenceManager *persistenceManager;
  99. @property(nonatomic, strong) FAtomicNumber *queryTagCounter;
  100. @property(nonatomic, strong) NSMutableSet *keepSyncedQueries;
  101. @end
  102. /**
  103. * SyncTree is the central class for managing event callback registration, data
  104. * caching, views (query processing), and event generation. There are typically
  105. * two SyncTree instances for each Repo, one for the normal Firebase data, and
  106. * one for the .info data.
  107. *
  108. * It has a number of responsibilities, including:
  109. * - Tracking all user event callbacks (registered via addEventRegistration:
  110. * and removeEventRegistration:).
  111. * - Applying and caching data changes for user setValue:,
  112. * runTransactionBlock:, and updateChildValues: calls
  113. * (applyUserOverwriteAtPath:, applyUserMergeAtPath:).
  114. * - Applying and caching data changes for server data changes
  115. * (applyServerOverwriteAtPath:, applyServerMergeAtPath:).
  116. * - Generating user-facing events for server and user changes (all of the
  117. * apply* methods return the set of events that need to be raised as a result).
  118. * - Maintaining the appropriate set of server listens to ensure we are always
  119. * subscribed to the correct set of paths and queries to satisfy the current set
  120. * of user event callbacks (listens are started/stopped using the provided
  121. * listenProvider).
  122. *
  123. * NOTE: Although SyncTree tracks event callbacks and calculates events to
  124. * raise, the actual events are returned to the caller rather than raised
  125. * synchronously.
  126. */
  127. @implementation FSyncTree
  128. - (id)initWithListenProvider:(FListenProvider *)provider {
  129. return [self initWithPersistenceManager:nil listenProvider:provider];
  130. }
  131. - (id)initWithPersistenceManager:(FPersistenceManager *)persistenceManager
  132. listenProvider:(FListenProvider *)provider {
  133. self = [super init];
  134. if (self) {
  135. self.syncPointTree = [FImmutableTree empty];
  136. self.pendingWriteTree = [[FWriteTree alloc] init];
  137. self.tagToQueryMap = [[NSMutableDictionary alloc] init];
  138. self.queryToTagMap = [[NSMutableDictionary alloc] init];
  139. self.listenProvider = provider;
  140. self.persistenceManager = persistenceManager;
  141. self.queryTagCounter = [[FAtomicNumber alloc] init];
  142. self.keepSyncedQueries = [NSMutableSet set];
  143. }
  144. return self;
  145. }
  146. #pragma mark -
  147. #pragma mark Apply Operations
  148. /**
  149. * Apply data changes for a user-generated setValue: runTransactionBlock:
  150. * updateChildValues:, etc.
  151. * @return NSArray of FEvent to raise.
  152. */
  153. - (NSArray *)applyUserOverwriteAtPath:(FPath *)path
  154. newData:(id<FNode>)newData
  155. writeId:(NSInteger)writeId
  156. isVisible:(BOOL)visible {
  157. // Record pending write
  158. [self.pendingWriteTree addOverwriteAtPath:path
  159. newData:newData
  160. writeId:writeId
  161. isVisible:visible];
  162. if (!visible) {
  163. return @[];
  164. } else {
  165. FOverwrite *operation =
  166. [[FOverwrite alloc] initWithSource:[FOperationSource userInstance]
  167. path:path
  168. snap:newData];
  169. return [self applyOperationToSyncPoints:operation];
  170. }
  171. }
  172. /**
  173. * Apply the data from a user-generated updateChildValues: call
  174. * @return NSArray of FEvent to raise.
  175. */
  176. - (NSArray *)applyUserMergeAtPath:(FPath *)path
  177. changedChildren:(FCompoundWrite *)changedChildren
  178. writeId:(NSInteger)writeId {
  179. // Record pending merge
  180. [self.pendingWriteTree addMergeAtPath:path
  181. changedChildren:changedChildren
  182. writeId:writeId];
  183. FMerge *operation =
  184. [[FMerge alloc] initWithSource:[FOperationSource userInstance]
  185. path:path
  186. children:changedChildren];
  187. return [self applyOperationToSyncPoints:operation];
  188. }
  189. /**
  190. * Acknowledge a pending user write that was previously registered with
  191. * applyUserOverwriteAtPath: or applyUserMergeAtPath:
  192. * TODO[offline]: Taking a serverClock here is awkward, but server values are
  193. * awkward. :-(
  194. * @return NSArray of FEvent to raise.
  195. */
  196. - (NSArray *)ackUserWriteWithWriteId:(NSInteger)writeId
  197. revert:(BOOL)revert
  198. persist:(BOOL)persist
  199. clock:(id<FClock>)clock {
  200. FWriteRecord *write = [self.pendingWriteTree writeForId:writeId];
  201. BOOL needToReevaluate = [self.pendingWriteTree removeWriteId:writeId];
  202. if (write.visible) {
  203. if (persist) {
  204. [self.persistenceManager removeUserWrite:writeId];
  205. }
  206. if (!revert) {
  207. NSDictionary *serverValues =
  208. [FServerValues generateServerValues:clock];
  209. if ([write isOverwrite]) {
  210. id<FNode> resolvedNode =
  211. [FServerValues resolveDeferredValueSnapshot:write.overwrite
  212. withSyncTree:self
  213. atPath:write.path
  214. serverValues:serverValues];
  215. [self.persistenceManager applyUserWrite:resolvedNode
  216. toServerCacheAtPath:write.path];
  217. } else {
  218. FCompoundWrite *resolvedMerge = [FServerValues
  219. resolveDeferredValueCompoundWrite:write.merge
  220. withSyncTree:self
  221. atPath:write.path
  222. serverValues:serverValues];
  223. [self.persistenceManager applyUserMerge:resolvedMerge
  224. toServerCacheAtPath:write.path];
  225. }
  226. }
  227. }
  228. if (!needToReevaluate) {
  229. return @[];
  230. } else {
  231. __block FImmutableTree *affectedTree = [FImmutableTree empty];
  232. if (write.isOverwrite) {
  233. affectedTree = [affectedTree setValue:@YES atPath:[FPath empty]];
  234. } else {
  235. [write.merge
  236. enumerateWrites:^(FPath *path, id<FNode> node, BOOL *stop) {
  237. affectedTree = [affectedTree setValue:@YES atPath:path];
  238. }];
  239. }
  240. FAckUserWrite *operation =
  241. [[FAckUserWrite alloc] initWithPath:write.path
  242. affectedTree:affectedTree
  243. revert:revert];
  244. return [self applyOperationToSyncPoints:operation];
  245. }
  246. }
  247. /**
  248. * Apply new server data for the specified path
  249. * @return NSArray of FEvent to raise.
  250. */
  251. - (NSArray *)applyServerOverwriteAtPath:(FPath *)path
  252. newData:(id<FNode>)newData {
  253. [self.persistenceManager
  254. updateServerCacheWithNode:newData
  255. forQuery:[FQuerySpec defaultQueryAtPath:path]];
  256. FOverwrite *operation =
  257. [[FOverwrite alloc] initWithSource:[FOperationSource serverInstance]
  258. path:path
  259. snap:newData];
  260. return [self applyOperationToSyncPoints:operation];
  261. }
  262. /**
  263. * Applied new server data to be merged in at the specified path
  264. * @return NSArray of FEvent to raise.
  265. */
  266. - (NSArray *)applyServerMergeAtPath:(FPath *)path
  267. changedChildren:(FCompoundWrite *)changedChildren {
  268. [self.persistenceManager updateServerCacheWithMerge:changedChildren
  269. atPath:path];
  270. FMerge *operation =
  271. [[FMerge alloc] initWithSource:[FOperationSource serverInstance]
  272. path:path
  273. children:changedChildren];
  274. return [self applyOperationToSyncPoints:operation];
  275. }
  276. - (NSArray *)applyServerRangeMergeAtPath:(FPath *)path
  277. updates:(NSArray *)ranges {
  278. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  279. if (syncPoint == nil) {
  280. // Removed view, so it's safe to just ignore this update
  281. return @[];
  282. } else {
  283. // This could be for any "complete" (unfiltered) view, and if there is
  284. // more than one complete view, they should each have the same cache so
  285. // it doesn't matter which one we use.
  286. FView *view = [syncPoint completeView];
  287. if (view != nil) {
  288. id<FNode> serverNode = [view serverCache];
  289. for (FRangeMerge *merge in ranges) {
  290. serverNode = [merge applyToNode:serverNode];
  291. }
  292. return [self applyServerOverwriteAtPath:path newData:serverNode];
  293. } else {
  294. // There doesn't exist a view for this update, so it was removed and
  295. // it's safe to just ignore this range merge
  296. return @[];
  297. }
  298. }
  299. }
  300. /**
  301. * Apply a listen complete to a path
  302. * @return NSArray of FEvent to raise.
  303. */
  304. - (NSArray *)applyListenCompleteAtPath:(FPath *)path {
  305. [self.persistenceManager
  306. setQueryComplete:[FQuerySpec defaultQueryAtPath:path]];
  307. id<FOperation> operation = [[FListenComplete alloc]
  308. initWithSource:[FOperationSource serverInstance]
  309. path:path];
  310. return [self applyOperationToSyncPoints:operation];
  311. }
  312. /**
  313. * Apply a listen complete to a path
  314. * @return NSArray of FEvent to raise.
  315. */
  316. - (NSArray *)applyTaggedListenCompleteAtPath:(FPath *)path
  317. tagId:(NSNumber *)tagId {
  318. FQuerySpec *query = [self queryForTag:tagId];
  319. if (query != nil) {
  320. [self.persistenceManager setQueryComplete:query];
  321. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  322. id<FOperation> op = [[FListenComplete alloc]
  323. initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  324. path:relativePath];
  325. return [self applyTaggedOperation:op atPath:query.path];
  326. } else {
  327. // We've already removed the query. No big deal, ignore the update.
  328. return @[];
  329. }
  330. }
  331. /**
  332. * Internal helper method to apply tagged operation
  333. */
  334. - (NSArray *)applyTaggedOperation:(id<FOperation>)operation
  335. atPath:(FPath *)path {
  336. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  337. NSAssert(syncPoint != nil,
  338. @"Missing sync point for query tag that we're tracking.");
  339. FWriteTreeRef *writesCache =
  340. [self.pendingWriteTree childWritesForPath:path];
  341. return [syncPoint applyOperation:operation
  342. writesCache:writesCache
  343. serverCache:nil];
  344. }
  345. /**
  346. * Apply new server data for the specified tagged query
  347. * @return NSArray of FEvent to raise.
  348. */
  349. - (NSArray *)applyTaggedQueryOverwriteAtPath:(FPath *)path
  350. newData:(id<FNode>)newData
  351. tagId:(NSNumber *)tagId {
  352. FQuerySpec *query = [self queryForTag:tagId];
  353. if (query != nil) {
  354. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  355. FQuerySpec *queryToOverwrite =
  356. relativePath.isEmpty ? query : [FQuerySpec defaultQueryAtPath:path];
  357. [self.persistenceManager updateServerCacheWithNode:newData
  358. forQuery:queryToOverwrite];
  359. FOverwrite *operation = [[FOverwrite alloc]
  360. initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  361. path:relativePath
  362. snap:newData];
  363. return [self applyTaggedOperation:operation atPath:query.path];
  364. } else {
  365. // Query must have been removed already
  366. return @[];
  367. }
  368. }
  369. /**
  370. * Apply server data to be merged in for the specified tagged query
  371. * @return NSArray of FEvent to raise.
  372. */
  373. - (NSArray *)applyTaggedQueryMergeAtPath:(FPath *)path
  374. changedChildren:(FCompoundWrite *)changedChildren
  375. tagId:(NSNumber *)tagId {
  376. FQuerySpec *query = [self queryForTag:tagId];
  377. if (query != nil) {
  378. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  379. [self.persistenceManager updateServerCacheWithMerge:changedChildren
  380. atPath:path];
  381. FMerge *operation = [[FMerge alloc]
  382. initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  383. path:relativePath
  384. children:changedChildren];
  385. return [self applyTaggedOperation:operation atPath:query.path];
  386. } else {
  387. // We've already removed the query. No big deal, ignore the update.
  388. return @[];
  389. }
  390. }
  391. - (NSArray *)applyTaggedServerRangeMergeAtPath:(FPath *)path
  392. updates:(NSArray *)ranges
  393. tagId:(NSNumber *)tagId {
  394. FQuerySpec *query = [self queryForTag:tagId];
  395. if (query != nil) {
  396. NSAssert([path isEqual:query.path],
  397. @"Tagged update path and query path must match");
  398. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  399. NSAssert(syncPoint != nil,
  400. @"Missing sync point for query tag that we're tracking.");
  401. FView *view = [syncPoint viewForQuery:query];
  402. NSAssert(view != nil,
  403. @"Missing view for query tag that we're tracking");
  404. id<FNode> serverNode = [view serverCache];
  405. for (FRangeMerge *merge in ranges) {
  406. serverNode = [merge applyToNode:serverNode];
  407. }
  408. return [self applyTaggedQueryOverwriteAtPath:path
  409. newData:serverNode
  410. tagId:tagId];
  411. } else {
  412. // We've already removed the query. No big deal, ignore the update.
  413. return @[];
  414. }
  415. }
  416. /**
  417. * Add an event callback for the specified query
  418. * @return NSArray of FEvent to raise.
  419. */
  420. - (NSArray *)addEventRegistration:(id<FEventRegistration>)eventRegistration
  421. forQuery:(FQuerySpec *)query {
  422. FPath *path = query.path;
  423. __block BOOL foundAncestorDefaultView = NO;
  424. [self.syncPointTree
  425. forEachOnPath:query.path
  426. whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
  427. foundAncestorDefaultView =
  428. foundAncestorDefaultView || [syncPoint hasCompleteView];
  429. return !foundAncestorDefaultView;
  430. }];
  431. [self.persistenceManager setQueryActive:query];
  432. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  433. if (syncPoint == nil) {
  434. syncPoint = [[FSyncPoint alloc]
  435. initWithPersistenceManager:self.persistenceManager];
  436. self.syncPointTree = [self.syncPointTree setValue:syncPoint
  437. atPath:path];
  438. }
  439. BOOL viewAlreadyExists = [syncPoint viewExistsForQuery:query];
  440. NSArray *events;
  441. if (viewAlreadyExists) {
  442. events = [syncPoint addEventRegistration:eventRegistration
  443. forExistingViewForQuery:query];
  444. } else {
  445. if (![query loadsAllData]) {
  446. // We need to track a tag for this query
  447. NSAssert(self.queryToTagMap[query] == nil,
  448. @"View does not exist, but we have a tag");
  449. NSNumber *tagId = [self.queryTagCounter getAndIncrement];
  450. self.queryToTagMap[query] = tagId;
  451. self.tagToQueryMap[tagId] = query;
  452. }
  453. FWriteTreeRef *writesCache =
  454. [self.pendingWriteTree childWritesForPath:path];
  455. FCacheNode *serverCache = [self serverCacheForQuery:query];
  456. events = [syncPoint addEventRegistration:eventRegistration
  457. forNonExistingViewForQuery:query
  458. writesCache:writesCache
  459. serverCache:serverCache];
  460. // There was no view and no default listen
  461. if (!foundAncestorDefaultView) {
  462. FView *view = [syncPoint viewForQuery:query];
  463. NSMutableArray *mutableEvents = [events mutableCopy];
  464. [mutableEvents
  465. addObjectsFromArray:[self setupListenerOnQuery:query
  466. view:view]];
  467. events = mutableEvents;
  468. }
  469. }
  470. return events;
  471. }
  472. - (FCacheNode *)serverCacheForQuery:(FQuerySpec *)query {
  473. __block id<FNode> serverCacheNode = nil;
  474. [self.syncPointTree
  475. forEachOnPath:query.path
  476. whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
  477. FPath *relativePath = [FPath relativePathFrom:pathToSyncPoint
  478. to:query.path];
  479. serverCacheNode =
  480. [syncPoint completeServerCacheAtPath:relativePath];
  481. return serverCacheNode == nil;
  482. }];
  483. FCacheNode *serverCache;
  484. if (serverCacheNode != nil) {
  485. FIndexedNode *indexed =
  486. [FIndexedNode indexedNodeWithNode:serverCacheNode
  487. index:query.index];
  488. serverCache = [[FCacheNode alloc] initWithIndexedNode:indexed
  489. isFullyInitialized:YES
  490. isFiltered:NO];
  491. } else {
  492. FCacheNode *persistenceServerCache =
  493. [self.persistenceManager serverCacheForQuery:query];
  494. if (persistenceServerCache.isFullyInitialized) {
  495. serverCache = persistenceServerCache;
  496. } else {
  497. serverCacheNode = [FEmptyNode emptyNode];
  498. FImmutableTree *subtree =
  499. [self.syncPointTree subtreeAtPath:query.path];
  500. [subtree
  501. forEachChild:^(NSString *childKey, FSyncPoint *childSyncPoint) {
  502. id<FNode> completeCache =
  503. [childSyncPoint completeServerCacheAtPath:[FPath empty]];
  504. if (completeCache) {
  505. serverCacheNode =
  506. [serverCacheNode updateImmediateChild:childKey
  507. withNewChild:completeCache];
  508. }
  509. }];
  510. // Fill the node with any available children we have
  511. [persistenceServerCache.node
  512. enumerateChildrenUsingBlock:^(NSString *key, id<FNode> node,
  513. BOOL *stop) {
  514. if (![serverCacheNode hasChild:key]) {
  515. serverCacheNode =
  516. [serverCacheNode updateImmediateChild:key
  517. withNewChild:node];
  518. }
  519. }];
  520. FIndexedNode *indexed =
  521. [FIndexedNode indexedNodeWithNode:serverCacheNode
  522. index:query.index];
  523. serverCache = [[FCacheNode alloc] initWithIndexedNode:indexed
  524. isFullyInitialized:NO
  525. isFiltered:NO];
  526. }
  527. }
  528. return serverCache;
  529. }
  530. /**
  531. * Remove event callback(s).
  532. *
  533. * If query is the default query, we'll check all queries for the specified
  534. * eventRegistration. If eventRegistration is null, we'll remove all callbacks
  535. * for the specified query/queries.
  536. *
  537. * @param eventRegistration if nil, all callbacks are removed
  538. * @param cancelError If provided, appropriate cancel events will be returned
  539. * @return NSArray of FEvent to raise.
  540. */
  541. - (NSArray *)removeEventRegistration:(id<FEventRegistration>)eventRegistration
  542. forQuery:(FQuerySpec *)query
  543. cancelError:(NSError *)cancelError {
  544. // Find the syncPoint first. Then deal with whether or not it has matching
  545. // listeners
  546. FPath *path = query.path;
  547. FSyncPoint *maybeSyncPoint = [self.syncPointTree valueAtPath:path];
  548. NSArray *cancelEvents = @[];
  549. // A removal on a default query affects all queries at that location. A
  550. // removal on an indexed query, even one without other query constraints,
  551. // does *not* affect all queries at that location. So this check must be for
  552. // 'default', and not loadsAllData:
  553. if (maybeSyncPoint &&
  554. ([query isDefault] || [maybeSyncPoint viewExistsForQuery:query])) {
  555. FTupleRemovedQueriesEvents *removedAndEvents =
  556. [maybeSyncPoint removeEventRegistration:eventRegistration
  557. forQuery:query
  558. cancelError:cancelError];
  559. if ([maybeSyncPoint isEmpty]) {
  560. self.syncPointTree = [self.syncPointTree removeValueAtPath:path];
  561. }
  562. NSArray *removed = removedAndEvents.removedQueries;
  563. cancelEvents = removedAndEvents.cancelEvents;
  564. // We may have just removed one of many listeners and can short-circuit
  565. // this whole process We may also not have removed a default listener,
  566. // in which case all of the descendant listeners should already be
  567. // properly set up.
  568. //
  569. // Since indexed queries can shadow if they don't have other query
  570. // constraints, check for loadsAllData: instead of isDefault:
  571. NSUInteger defaultQueryIndex = [removed
  572. indexOfObjectPassingTest:^BOOL(FQuerySpec *q, NSUInteger idx,
  573. BOOL *stop) {
  574. return [q loadsAllData];
  575. }];
  576. BOOL removingDefault = defaultQueryIndex != NSNotFound;
  577. [removed enumerateObjectsUsingBlock:^(FQuerySpec *query, NSUInteger idx,
  578. BOOL *stop) {
  579. [self.persistenceManager setQueryInactive:query];
  580. }];
  581. NSNumber *covered = [self.syncPointTree
  582. findOnPath:path
  583. andApplyBlock:^id(FPath *relativePath,
  584. FSyncPoint *parentSyncPoint) {
  585. return
  586. [NSNumber numberWithBool:[parentSyncPoint hasCompleteView]];
  587. }];
  588. if (removingDefault && ![covered boolValue]) {
  589. FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
  590. // There are potentially child listeners. Determine what if any
  591. // listens we need to send before executing the removal
  592. if (![subtree isEmpty]) {
  593. // We need to fold over our subtree and collect the listeners to
  594. // send
  595. NSArray *newViews =
  596. [self collectDistinctViewsForSubTree:subtree];
  597. // Ok, we've collected all the listens we need. Set them up.
  598. [newViews enumerateObjectsUsingBlock:^(
  599. FView *view, NSUInteger idx, BOOL *stop) {
  600. FQuerySpec *newQuery = view.query;
  601. FListenContainer *listenContainer =
  602. [self createListenerForView:view];
  603. self.listenProvider.startListening(
  604. [self queryForListening:newQuery],
  605. [self tagForQuery:newQuery], listenContainer,
  606. listenContainer.onComplete);
  607. }];
  608. } else {
  609. // There's nothing below us, so nothing we need to start
  610. // listening on
  611. }
  612. }
  613. // If we removed anything and we're not covered by a higher up listen,
  614. // we need to stop listening on this query. The above block has us
  615. // covered in terms of making sure we're set up on listens lower in the
  616. // tree. Also, note that if we have a cancelError, it's already been
  617. // removed at the provider level.
  618. if (![covered boolValue] && [removed count] > 0 && cancelError == nil) {
  619. // If we removed a default, then we weren't listening on any of the
  620. // other queries here. Just cancel the one default. Otherwise, we
  621. // need to iterate through and cancel each individual query
  622. if (removingDefault) {
  623. // We don't tag default listeners
  624. self.listenProvider.stopListening(
  625. [self queryForListening:query], nil);
  626. } else {
  627. [removed
  628. enumerateObjectsUsingBlock:^(FQuerySpec *queryToRemove,
  629. NSUInteger idx, BOOL *stop) {
  630. NSNumber *tagToRemove =
  631. [self.queryToTagMap objectForKey:queryToRemove];
  632. self.listenProvider.stopListening(
  633. [self queryForListening:queryToRemove], tagToRemove);
  634. }];
  635. }
  636. }
  637. // Now, clear all the tags we're tracking for the removed listens.
  638. [self removeTags:removed];
  639. } else {
  640. // No-op, this listener must've been already removed
  641. }
  642. return cancelEvents;
  643. }
  644. - (void)keepQuery:(FQuerySpec *)query synced:(BOOL)keepSynced {
  645. // Only do something if we actually need to add/remove an event registration
  646. if (keepSynced && ![self.keepSyncedQueries containsObject:query]) {
  647. [self addEventRegistration:[FKeepSyncedEventRegistration instance]
  648. forQuery:query];
  649. [self.keepSyncedQueries addObject:query];
  650. } else if (!keepSynced && [self.keepSyncedQueries containsObject:query]) {
  651. [self removeEventRegistration:[FKeepSyncedEventRegistration instance]
  652. forQuery:query
  653. cancelError:nil];
  654. [self.keepSyncedQueries removeObject:query];
  655. }
  656. }
  657. - (NSArray *)removeAllWrites {
  658. [self.persistenceManager removeAllUserWrites];
  659. NSArray *removedWrites = [self.pendingWriteTree removeAllWrites];
  660. if (removedWrites.count > 0) {
  661. FImmutableTree *affectedTree =
  662. [[FImmutableTree empty] setValue:@YES atPath:[FPath empty]];
  663. return [self applyOperationToSyncPoints:[[FAckUserWrite alloc]
  664. initWithPath:[FPath empty]
  665. affectedTree:affectedTree
  666. revert:YES]];
  667. } else {
  668. return @[];
  669. }
  670. }
  671. /** Returns a non-empty cache node if one exists. Otherwise returns null. */
  672. - (FIndexedNode *)persistenceServerCache:(FQuerySpec *)querySpec {
  673. FCacheNode *cacheNode =
  674. [self.persistenceManager serverCacheForQuery:querySpec];
  675. if (cacheNode == nil || cacheNode.node.isEmpty) {
  676. return nil;
  677. }
  678. return cacheNode.indexedNode;
  679. }
  680. - (id<FNode>)getServerValue:(FQuerySpec *)query {
  681. __block id<FNode> serverCacheNode = nil;
  682. __block FSyncPoint *targetSyncPoint = nil;
  683. [self.syncPointTree
  684. forEachOnPath:query.path
  685. whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
  686. FPath *relativePath = [FPath relativePathFrom:pathToSyncPoint
  687. to:query.path];
  688. serverCacheNode =
  689. [syncPoint completeEventCacheAtPath:relativePath];
  690. targetSyncPoint = syncPoint;
  691. return serverCacheNode == nil;
  692. }];
  693. if (targetSyncPoint == nil) {
  694. targetSyncPoint = [[FSyncPoint alloc]
  695. initWithPersistenceManager:self.persistenceManager];
  696. self.syncPointTree = [self.syncPointTree setValue:targetSyncPoint
  697. atPath:[query path]];
  698. } else {
  699. serverCacheNode =
  700. serverCacheNode != nil
  701. ? serverCacheNode
  702. : [targetSyncPoint completeServerCacheAtPath:[FPath empty]];
  703. }
  704. FIndexedNode *indexed = [FIndexedNode
  705. indexedNodeWithNode:serverCacheNode != nil ? serverCacheNode
  706. : [FEmptyNode emptyNode]
  707. index:query.index];
  708. FCacheNode *serverCache =
  709. [[FCacheNode alloc] initWithIndexedNode:indexed
  710. isFullyInitialized:serverCacheNode != nil
  711. isFiltered:NO];
  712. FView *view = [targetSyncPoint
  713. getView:query
  714. writesCache:[_pendingWriteTree childWritesForPath:[query path]]
  715. serverCache:serverCache];
  716. return [view completeEventCache];
  717. }
  718. /**
  719. * Returns a complete cache, if we have one, of the data at a particular path.
  720. * The location must have a listener above it, but as this is only used by
  721. * transaction code, that should always be the case anyways.
  722. *
  723. * Note: this method will *include* hidden writes from transaction with
  724. * applyLocally set to false.
  725. * @param path The path to the data we want
  726. * @param writeIdsToExclude A specific set to be excluded
  727. */
  728. - (id<FNode>)calcCompleteEventCacheAtPath:(FPath *)path
  729. excludeWriteIds:(NSArray *)writeIdsToExclude {
  730. BOOL includeHiddenSets = YES;
  731. FWriteTree *writeTree = self.pendingWriteTree;
  732. id<FNode> serverCache = [self.syncPointTree
  733. findOnPath:path
  734. andApplyBlock:^id<FNode>(FPath *pathSoFar, FSyncPoint *syncPoint) {
  735. FPath *relativePath = [FPath relativePathFrom:pathSoFar to:path];
  736. id<FNode> serverCache =
  737. [syncPoint completeServerCacheAtPath:relativePath];
  738. if (serverCache) {
  739. return serverCache;
  740. } else {
  741. return nil;
  742. }
  743. }];
  744. return [writeTree calculateCompleteEventCacheAtPath:path
  745. completeServerCache:serverCache
  746. excludeWriteIds:writeIdsToExclude
  747. includeHiddenWrites:includeHiddenSets];
  748. }
  749. #pragma mark -
  750. #pragma mark Private Methods
  751. /**
  752. * This collapses multiple unfiltered views into a single view, since we only
  753. * need a single listener for them.
  754. * @return NSArray of FView
  755. */
  756. - (NSArray *)collectDistinctViewsForSubTree:(FImmutableTree *)subtree {
  757. return [subtree foldWithBlock:^NSArray *(FPath *relativePath,
  758. FSyncPoint *maybeChildSyncPoint,
  759. NSDictionary *childMap) {
  760. if (maybeChildSyncPoint && [maybeChildSyncPoint hasCompleteView]) {
  761. FView *completeView = [maybeChildSyncPoint completeView];
  762. return @[ completeView ];
  763. } else {
  764. // No complete view here, flatten any deeper listens into an array
  765. NSMutableArray *views = [[NSMutableArray alloc] init];
  766. if (maybeChildSyncPoint) {
  767. views = [[maybeChildSyncPoint queryViews] mutableCopy];
  768. }
  769. [childMap enumerateKeysAndObjectsUsingBlock:^(
  770. NSString *childKey, NSArray *childViews, BOOL *stop) {
  771. [views addObjectsFromArray:childViews];
  772. }];
  773. return views;
  774. }
  775. }];
  776. }
  777. /**
  778. * @param queries NSArray of FQuerySpec
  779. */
  780. - (void)removeTags:(NSArray *)queries {
  781. [queries enumerateObjectsUsingBlock:^(FQuerySpec *removedQuery,
  782. NSUInteger idx, BOOL *stop) {
  783. if (![removedQuery loadsAllData]) {
  784. // We should have a tag for this
  785. NSNumber *removedQueryTag = self.queryToTagMap[removedQuery];
  786. [self.queryToTagMap removeObjectForKey:removedQuery];
  787. [self.tagToQueryMap removeObjectForKey:removedQueryTag];
  788. }
  789. }];
  790. }
  791. - (FQuerySpec *)queryForListening:(FQuerySpec *)query {
  792. if (query.loadsAllData && !query.isDefault) {
  793. // We treat queries that load all data as default queries
  794. return [FQuerySpec defaultQueryAtPath:query.path];
  795. } else {
  796. return query;
  797. }
  798. }
  799. /**
  800. * For a given new listen, manage the de-duplication of outstanding
  801. * subscriptions.
  802. * @return NSArray of FEvent events to support synchronous data sources
  803. */
  804. - (NSArray *)setupListenerOnQuery:(FQuerySpec *)query view:(FView *)view {
  805. FPath *path = query.path;
  806. NSNumber *tagId = [self tagForQuery:query];
  807. FListenContainer *listenContainer = [self createListenerForView:view];
  808. NSArray *events = self.listenProvider.startListening(
  809. [self queryForListening:query], tagId, listenContainer,
  810. listenContainer.onComplete);
  811. FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
  812. // The root of this subtree has our query. We're here because we definitely
  813. // need to send a listen for that, but we may need to shadow other listens
  814. // as well.
  815. if (tagId != nil) {
  816. NSAssert(![subtree.value hasCompleteView],
  817. @"If we're adding a query, it shouldn't be shadowed");
  818. } else {
  819. // Shadow everything at or below this location, this is a default
  820. // listener.
  821. NSArray *queriesToStop =
  822. [subtree foldWithBlock:^id(FPath *relativePath,
  823. FSyncPoint *maybeChildSyncPoint,
  824. NSDictionary *childMap) {
  825. if (![relativePath isEmpty] && maybeChildSyncPoint != nil &&
  826. [maybeChildSyncPoint hasCompleteView]) {
  827. return @[ [maybeChildSyncPoint completeView].query ];
  828. } else {
  829. // No default listener here, flatten any deeper queries into
  830. // an array
  831. NSMutableArray *queries = [[NSMutableArray alloc] init];
  832. if (maybeChildSyncPoint != nil) {
  833. for (FView *view in [maybeChildSyncPoint queryViews]) {
  834. [queries addObject:view.query];
  835. }
  836. }
  837. [childMap
  838. enumerateKeysAndObjectsUsingBlock:^(
  839. NSString *key, NSArray *childQueries, BOOL *stop) {
  840. [queries addObjectsFromArray:childQueries];
  841. }];
  842. return queries;
  843. }
  844. }];
  845. for (FQuerySpec *queryToStop in queriesToStop) {
  846. self.listenProvider.stopListening(
  847. [self queryForListening:queryToStop],
  848. [self tagForQuery:queryToStop]);
  849. }
  850. }
  851. return events;
  852. }
  853. - (FListenContainer *)createListenerForView:(FView *)view {
  854. FQuerySpec *query = view.query;
  855. NSNumber *tagId = [self tagForQuery:query];
  856. FListenContainer *listenContainer = [[FListenContainer alloc]
  857. initWithView:view
  858. onComplete:^(NSString *status) {
  859. if ([status isEqualToString:@"ok"]) {
  860. if (tagId != nil) {
  861. return [self applyTaggedListenCompleteAtPath:query.path
  862. tagId:tagId];
  863. } else {
  864. return [self applyListenCompleteAtPath:query.path];
  865. }
  866. } else {
  867. // If a listen failed, kill all of the listeners here, not just
  868. // the one that triggered the error. Note that this may need to
  869. // be scoped to just this listener if we change permissions on
  870. // filtered children
  871. NSError *error = [FUtilities errorForStatus:status
  872. andReason:nil];
  873. FFWarn(@"I-RDB038012", @"Listener at %@ failed: %@", query.path,
  874. status);
  875. return [self removeEventRegistration:nil
  876. forQuery:query
  877. cancelError:error];
  878. }
  879. }];
  880. return listenContainer;
  881. }
  882. /**
  883. * @return The query associated with the given tag, if we have one
  884. */
  885. - (FQuerySpec *)queryForTag:(NSNumber *)tagId {
  886. return self.tagToQueryMap[tagId];
  887. }
  888. /**
  889. * @return The tag associated with the given query
  890. */
  891. - (NSNumber *)tagForQuery:(FQuerySpec *)query {
  892. return self.queryToTagMap[query];
  893. }
  894. #pragma mark -
  895. #pragma mark applyOperation Helpers
  896. /**
  897. * A helper method that visits all descendant and ancestor SyncPoints, applying
  898. the operation.
  899. *
  900. * NOTES:
  901. * - Descendant SyncPoints will be visited first (since we raise events
  902. depth-first).
  903. * - We call applyOperation: on each SyncPoint passing three things:
  904. * 1. A version of the Operation that has been made relative to the SyncPoint
  905. location.
  906. * 2. A WriteTreeRef of any writes we have cached at the SyncPoint location.
  907. * 3. A snapshot Node with cached server data, if we have it.
  908. * - We concatenate all of the events returned by each SyncPoint and return the
  909. result.
  910. *
  911. * @return Array of FEvent
  912. */
  913. - (NSArray *)applyOperationToSyncPoints:(id<FOperation>)operation {
  914. return [self applyOperationHelper:operation
  915. syncPointTree:self.syncPointTree
  916. serverCache:nil
  917. writesCache:[self.pendingWriteTree
  918. childWritesForPath:[FPath empty]]];
  919. }
  920. /**
  921. * Recursive helper for applyOperationToSyncPoints_
  922. */
  923. - (NSArray *)applyOperationHelper:(id<FOperation>)operation
  924. syncPointTree:(FImmutableTree *)syncPointTree
  925. serverCache:(id<FNode>)serverCache
  926. writesCache:(FWriteTreeRef *)writesCache {
  927. if ([operation.path isEmpty]) {
  928. return [self applyOperationDescendantsHelper:operation
  929. syncPointTree:syncPointTree
  930. serverCache:serverCache
  931. writesCache:writesCache];
  932. } else {
  933. FSyncPoint *syncPoint = syncPointTree.value;
  934. // If we don't have cached server data, see if we can get it from this
  935. // SyncPoint
  936. if (serverCache == nil && syncPoint != nil) {
  937. serverCache = [syncPoint completeServerCacheAtPath:[FPath empty]];
  938. }
  939. NSMutableArray *events = [[NSMutableArray alloc] init];
  940. NSString *childKey = [operation.path getFront];
  941. id<FOperation> childOperation = [operation operationForChild:childKey];
  942. FImmutableTree *childTree = [syncPointTree.children get:childKey];
  943. if (childTree != nil && childOperation != nil) {
  944. id<FNode> childServerCache =
  945. serverCache ? [serverCache getImmediateChild:childKey] : nil;
  946. FWriteTreeRef *childWritesCache =
  947. [writesCache childWriteTreeRef:childKey];
  948. [events
  949. addObjectsFromArray:[self
  950. applyOperationHelper:childOperation
  951. syncPointTree:childTree
  952. serverCache:childServerCache
  953. writesCache:childWritesCache]];
  954. }
  955. if (syncPoint) {
  956. [events addObjectsFromArray:[syncPoint applyOperation:operation
  957. writesCache:writesCache
  958. serverCache:serverCache]];
  959. }
  960. return events;
  961. }
  962. }
  963. /**
  964. * Recursive helper for applyOperationToSyncPoints:
  965. */
  966. - (NSArray *)applyOperationDescendantsHelper:(id<FOperation>)operation
  967. syncPointTree:(FImmutableTree *)syncPointTree
  968. serverCache:(id<FNode>)serverCache
  969. writesCache:(FWriteTreeRef *)writesCache {
  970. FSyncPoint *syncPoint = syncPointTree.value;
  971. // If we don't have cached server data, see if we can get it from this
  972. // SyncPoint
  973. id<FNode> resolvedServerCache;
  974. if (serverCache == nil & syncPoint != nil) {
  975. resolvedServerCache =
  976. [syncPoint completeServerCacheAtPath:[FPath empty]];
  977. } else {
  978. resolvedServerCache = serverCache;
  979. }
  980. NSMutableArray *events = [[NSMutableArray alloc] init];
  981. [syncPointTree.children enumerateKeysAndObjectsUsingBlock:^(
  982. NSString *childKey, FImmutableTree *childTree,
  983. BOOL *stop) {
  984. id<FNode> childServerCache = nil;
  985. if (resolvedServerCache != nil) {
  986. childServerCache = [resolvedServerCache getImmediateChild:childKey];
  987. }
  988. FWriteTreeRef *childWritesCache =
  989. [writesCache childWriteTreeRef:childKey];
  990. id<FOperation> childOperation = [operation operationForChild:childKey];
  991. if (childOperation != nil) {
  992. [events addObjectsFromArray:
  993. [self applyOperationDescendantsHelper:childOperation
  994. syncPointTree:childTree
  995. serverCache:childServerCache
  996. writesCache:childWritesCache]];
  997. }
  998. }];
  999. if (syncPoint) {
  1000. [events
  1001. addObjectsFromArray:[syncPoint applyOperation:operation
  1002. writesCache:writesCache
  1003. serverCache:resolvedServerCache]];
  1004. }
  1005. return events;
  1006. }
  1007. @end