// FSyncTree.m
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FSyncTree.h"
  17. #import "FAckUserWrite.h"
  18. #import "FAtomicNumber.h"
  19. #import "FCacheNode.h"
  20. #import "FChildrenNode.h"
  21. #import "FCompoundHash.h"
  22. #import "FCompoundWrite.h"
  23. #import "FEmptyNode.h"
  24. #import "FEventRaiser.h"
  25. #import "FEventRegistration.h"
  26. #import "FImmutableTree.h"
  27. #import "FKeepSyncedEventRegistration.h"
  28. #import "FListenComplete.h"
  29. #import "FListenProvider.h"
  30. #import "FMerge.h"
  31. #import "FNode.h"
  32. #import "FOperation.h"
  33. #import "FOperationSource.h"
  34. #import "FOverwrite.h"
  35. #import "FPath.h"
  36. #import "FPersistenceManager.h"
  37. #import "FQueryParams.h"
  38. #import "FQuerySpec.h"
  39. #import "FRangeMerge.h"
  40. #import "FServerValues.h"
  41. #import "FSnapshotHolder.h"
  42. #import "FSnapshotUtilities.h"
  43. #import "FSyncPoint.h"
  44. #import "FTupleRemovedQueriesEvents.h"
  45. #import "FUtilities.h"
  46. #import "FView.h"
  47. #import "FWriteRecord.h"
  48. #import "FWriteTree.h"
  49. #import "FWriteTreeRef.h"
  50. #import <FirebaseCore/FIRLogger.h>
  // Threshold compared against the estimated serialized size of a view's server
  // cache; above it, -[FListenContainer includeCompoundHash] answers YES.
  static const NSUInteger kFSizeThresholdForCompoundHash = 1024;

  /**
   * Bundles a view with a completion callback and derives hashes from the
   * view's current server cache. Instances are handed to the listen provider
   * together with their onComplete callback.
   */
  @interface FListenContainer : NSObject <FSyncTreeHash>

  @property(nonatomic, strong) FView *view;
  @property(nonatomic, copy) fbt_nsarray_nsstring onComplete;

  /**
   * @param view View whose server cache backs the hash methods.
   * @param onComplete Callback kept (copied) on the container.
   */
  - (instancetype)initWithView:(FView *)view
                    onComplete:(fbt_nsarray_nsstring)onComplete;

  @end

  @implementation FListenContainer

  - (instancetype)initWithView:(FView *)view
                    onComplete:(fbt_nsarray_nsstring)onComplete {
    self = [super init];
    if (self != nil) {
      _view = view;
      // Direct ivar access bypasses the synthesized setter, so honour the
      // property's `copy` contract explicitly.
      _onComplete = [onComplete copy];
    }
    return self;
  }

  /** The server cache of the wrapped view. */
  - (id<FNode>)serverCache {
    return self.view.serverCache;
  }

  /** Compound hash built from the view's server cache. */
  - (FCompoundHash *)compoundHash {
    return [FCompoundHash fromNode:[self serverCache]];
  }

  /** The dataHash of the view's server cache. */
  - (NSString *)simpleHash {
    return [[self serverCache] dataHash];
  }

  /**
   * YES when the estimated serialized size of the server cache exceeds
   * kFSizeThresholdForCompoundHash.
   */
  - (BOOL)includeCompoundHash {
    return [FSnapshotUtilities estimateSerializedNodeSize:[self serverCache]] >
           kFSizeThresholdForCompoundHash;
  }

  @end
  @interface FSyncTree ()

  #pragma mark Collaborators

  /**
   * Provides the startListening / stopListening callbacks used to manage
   * server listens.
   */
  @property(nonatomic, strong) FListenProvider *listenProvider;

  /**
   * Persistence layer that mirrors writes, server cache updates and query
   * state. It is nil when the tree is created with initWithListenProvider:.
   */
  @property(nonatomic, strong) FPersistenceManager *persistenceManager;

  #pragma mark Tree state

  /**
   * Tree of SyncPoints. There's a SyncPoint at any location that has 1 or more
   * views.
   */
  @property(nonatomic, strong) FImmutableTree *syncPointTree;

  /**
   * A tree of all pending user writes (user-initiated set, transactions,
   * updates, etc).
   */
  @property(nonatomic, strong) FWriteTree *pendingWriteTree;

  #pragma mark Query tags

  /** Source of new tag ids (see getAndIncrement in addEventRegistration:). */
  @property(nonatomic, strong) FAtomicNumber *queryTagCounter;

  /** Maps tagId (NSNumber) -> FQuerySpec. */
  @property(nonatomic, strong) NSMutableDictionary *tagToQueryMap;

  /** Maps FQuerySpec -> tagId (NSNumber); the inverse of tagToQueryMap. */
  @property(nonatomic, strong) NSMutableDictionary *queryToTagMap;

  /** Queries currently registered through keepQuery:synced:. */
  @property(nonatomic, strong) NSMutableSet *keepSyncedQueries;

  @end
  102. /**
  103. * SyncTree is the central class for managing event callback registration, data
  104. * caching, views (query processing), and event generation. There are typically
  105. * two SyncTree instances for each Repo, one for the normal Firebase data, and
  106. * one for the .info data.
  107. *
  108. * It has a number of responsibilities, including:
  109. * - Tracking all user event callbacks (registered via addEventRegistration:
  110. * and removeEventRegistration:).
  111. * - Applying and caching data changes for user setValue:,
  112. * runTransactionBlock:, and updateChildValues: calls
  113. * (applyUserOverwriteAtPath:, applyUserMergeAtPath:).
  114. * - Applying and caching data changes for server data changes
  115. * (applyServerOverwriteAtPath:, applyServerMergeAtPath:).
  116. * - Generating user-facing events for server and user changes (all of the
  117. * apply* methods return the set of events that need to be raised as a result).
  118. * - Maintaining the appropriate set of server listens to ensure we are always
  119. * subscribed to the correct set of paths and queries to satisfy the current set
  120. * of user event callbacks (listens are started/stopped using the provided
  121. * listenProvider).
  122. *
  123. * NOTE: Although SyncTree tracks event callbacks and calculates events to
  124. * raise, the actual events are returned to the caller rather than raised
  125. * synchronously.
  126. */
  127. @implementation FSyncTree
  /**
   * Creates a sync tree without persistence by forwarding to
   * initWithPersistenceManager:listenProvider: with a nil persistence manager.
   *
   * @param provider Listen provider used to start and stop server listens.
   */
  - (id)initWithListenProvider:(FListenProvider *)provider {
    FPersistenceManager *noPersistence = nil;
    self = [self initWithPersistenceManager:noPersistence
                             listenProvider:provider];
    return self;
  }
  /**
   * Sets up an empty sync tree: no sync points, no pending writes, no query
   * tags and no kept-synced queries.
   *
   * @param persistenceManager Persistence layer to mirror state into; may be
   * nil (see initWithListenProvider:).
   * @param provider Listen provider used to start and stop server listens.
   */
  - (id)initWithPersistenceManager:(FPersistenceManager *)persistenceManager
                    listenProvider:(FListenProvider *)provider {
    self = [super init];
    if (self) {
      // Assign ivars directly: property setters are overridable and should not
      // run against a partially initialised object.
      _syncPointTree = [FImmutableTree empty];
      _pendingWriteTree = [[FWriteTree alloc] init];
      _tagToQueryMap = [[NSMutableDictionary alloc] init];
      _queryToTagMap = [[NSMutableDictionary alloc] init];
      _listenProvider = provider;
      _persistenceManager = persistenceManager;
      _queryTagCounter = [[FAtomicNumber alloc] init];
      _keepSyncedQueries = [NSMutableSet set];
    }
    return self;
  }
  146. #pragma mark -
  147. #pragma mark Apply Operations
  /**
   * Applies the data change of a user-generated setValue:,
   * runTransactionBlock:, updateChildValues:, etc.
   *
   * The overwrite is always recorded in the pending write tree; only a visible
   * write is additionally applied to the sync points.
   *
   * @param path Location being overwritten.
   * @param newData Node written at path.
   * @param writeId Id the pending write is recorded under.
   * @param visible NO to record the write without generating events.
   * @return NSArray of FEvent to raise; empty for an invisible write.
   */
  - (NSArray *)applyUserOverwriteAtPath:(FPath *)path
                                newData:(id<FNode>)newData
                                writeId:(NSInteger)writeId
                              isVisible:(BOOL)visible {
    [self.pendingWriteTree addOverwriteAtPath:path
                                      newData:newData
                                      writeId:writeId
                                    isVisible:visible];
    if (!visible) {
      return @[];
    }

    FOperationSource *userSource = [FOperationSource userInstance];
    FOverwrite *userOverwrite = [[FOverwrite alloc] initWithSource:userSource
                                                              path:path
                                                              snap:newData];
    return [self applyOperationToSyncPoints:userOverwrite];
  }
  /**
   * Applies the data from a user-generated updateChildValues: call: records
   * the merge as a pending write and applies it to the sync points.
   *
   * @param path Location the merge is rooted at.
   * @param changedChildren The writes making up the merge.
   * @param writeId Id the pending write is recorded under.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)applyUserMergeAtPath:(FPath *)path
                    changedChildren:(FCompoundWrite *)changedChildren
                            writeId:(NSInteger)writeId {
    [self.pendingWriteTree addMergeAtPath:path
                          changedChildren:changedChildren
                                  writeId:writeId];

    FOperationSource *userSource = [FOperationSource userInstance];
    FMerge *userMerge = [[FMerge alloc] initWithSource:userSource
                                                  path:path
                                              children:changedChildren];
    return [self applyOperationToSyncPoints:userMerge];
  }
  /**
   * Acknowledges a pending user write that was previously registered with
   * applyUserOverwriteAtPath: or applyUserMergeAtPath:.
   *
   * The write is removed from the pending write tree. For a visible write, the
   * persisted user write is removed when persist is YES and, unless revert is
   * YES, the write's data (deferred server values resolved with clock) is
   * applied to the persisted server cache.
   *
   * TODO[offline]: Taking a serverClock here is awkward, but server values are
   * awkward. :-(
   *
   * @param writeId Id of the pending write to acknowledge.
   * @param revert Forwarded to the FAckUserWrite operation; when YES the
   * write's data is not applied to the persisted server cache.
   * @param persist Whether to remove the write from persistence.
   * @param clock Clock used to generate server values.
   * @return NSArray of FEvent to raise; empty when removing the write does not
   * require re-evaluation.
   */
  - (NSArray *)ackUserWriteWithWriteId:(NSInteger)writeId
                                revert:(BOOL)revert
                               persist:(BOOL)persist
                                 clock:(id<FClock>)clock {
    // Look the record up before it is removed from the tree.
    FWriteRecord *ackedWrite = [self.pendingWriteTree writeForId:writeId];
    BOOL mustReevaluate = [self.pendingWriteTree removeWriteId:writeId];

    if (ackedWrite.visible && persist) {
      [self.persistenceManager removeUserWrite:writeId];
    }
    if (ackedWrite.visible && !revert) {
      NSDictionary *serverValues = [FServerValues generateServerValues:clock];
      if ([ackedWrite isOverwrite]) {
        id<FNode> resolvedNode =
            [FServerValues resolveDeferredValueSnapshot:ackedWrite.overwrite
                                       withServerValues:serverValues];
        [self.persistenceManager applyUserWrite:resolvedNode
                            toServerCacheAtPath:ackedWrite.path];
      } else {
        FCompoundWrite *resolvedMerge =
            [FServerValues resolveDeferredValueCompoundWrite:ackedWrite.merge
                                            withServerValues:serverValues];
        [self.persistenceManager applyUserMerge:resolvedMerge
                            toServerCacheAtPath:ackedWrite.path];
      }
    }

    if (!mustReevaluate) {
      return @[];
    }

    // Mark the locations (relative to the write's path) the write touched: the
    // write's root for an overwrite, each written path for a merge.
    __block FImmutableTree *touched = [FImmutableTree empty];
    if (ackedWrite.isOverwrite) {
      touched = [touched setValue:@YES atPath:[FPath empty]];
    } else {
      [ackedWrite.merge enumerateWrites:^(FPath *writtenPath,
                                          id<FNode> writtenNode, BOOL *stop) {
        touched = [touched setValue:@YES atPath:writtenPath];
      }];
    }

    FAckUserWrite *ack = [[FAckUserWrite alloc] initWithPath:ackedWrite.path
                                                affectedTree:touched
                                                      revert:revert];
    return [self applyOperationToSyncPoints:ack];
  }
  /**
   * Applies new server data for the specified path: the persisted server cache
   * for the default query at path is updated, then the overwrite is applied to
   * the sync points.
   *
   * @param path Location the server data belongs to.
   * @param newData The new server node.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)applyServerOverwriteAtPath:(FPath *)path
                                  newData:(id<FNode>)newData {
    FQuerySpec *defaultQuery = [FQuerySpec defaultQueryAtPath:path];
    [self.persistenceManager updateServerCacheWithNode:newData
                                              forQuery:defaultQuery];

    FOperationSource *serverSource = [FOperationSource serverInstance];
    FOverwrite *serverOverwrite =
        [[FOverwrite alloc] initWithSource:serverSource
                                      path:path
                                      snap:newData];
    return [self applyOperationToSyncPoints:serverOverwrite];
  }
  /**
   * Applies new server data to be merged in at the specified path: the
   * persisted server cache is updated with the merge, then the merge is applied
   * to the sync points.
   *
   * @param path Location the merge is rooted at.
   * @param changedChildren The writes making up the merge.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)applyServerMergeAtPath:(FPath *)path
                      changedChildren:(FCompoundWrite *)changedChildren {
    [self.persistenceManager updateServerCacheWithMerge:changedChildren
                                                 atPath:path];

    FOperationSource *serverSource = [FOperationSource serverInstance];
    FMerge *serverMerge = [[FMerge alloc] initWithSource:serverSource
                                                    path:path
                                                children:changedChildren];
    return [self applyOperationToSyncPoints:serverMerge];
  }
  /**
   * Applies a list of range merges to the server cache of the complete view at
   * path and feeds the result back in as a server overwrite.
   *
   * @param path Location of the sync point the range merges target.
   * @param ranges NSArray of FRangeMerge, applied in order.
   * @return NSArray of FEvent to raise; empty when there is no sync point or no
   * complete view at path.
   */
  - (NSArray *)applyServerRangeMergeAtPath:(FPath *)path
                                   updates:(NSArray *)ranges {
    FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];

    // This could be for any "complete" (unfiltered) view, and if there is more
    // than one complete view, they should each have the same cache so it
    // doesn't matter which one we use. Messaging a nil sync point yields nil,
    // so a removed sync point and a removed view are handled by the same guard.
    FView *completeView = [syncPoint completeView];
    if (completeView == nil) {
      // The view was removed, so it's safe to just ignore this range merge.
      return @[];
    }

    id<FNode> mergedCache = [completeView serverCache];
    for (FRangeMerge *rangeMerge in ranges) {
      mergedCache = [rangeMerge applyToNode:mergedCache];
    }
    return [self applyServerOverwriteAtPath:path newData:mergedCache];
  }
  /**
   * Applies a listen complete to a path: the default query at path is marked
   * complete in persistence and an FListenComplete operation from the server
   * source is applied to the sync points.
   *
   * @param path Location whose listen completed.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)applyListenCompleteAtPath:(FPath *)path {
    FQuerySpec *defaultQuery = [FQuerySpec defaultQueryAtPath:path];
    [self.persistenceManager setQueryComplete:defaultQuery];

    FOperationSource *serverSource = [FOperationSource serverInstance];
    id<FOperation> listenComplete =
        [[FListenComplete alloc] initWithSource:serverSource path:path];
    return [self applyOperationToSyncPoints:listenComplete];
  }
  /**
   * Applies a listen complete for the tagged query identified by tagId.
   *
   * @param path Location the listen complete was reported for; it is made
   * relative to the tagged query's path before being applied.
   * @param tagId Tag of the query whose listen completed.
   * @return NSArray of FEvent to raise; empty when the tag is no longer
   * tracked.
   */
  - (NSArray *)applyTaggedListenCompleteAtPath:(FPath *)path
                                         tagId:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = [self queryForTag:tagId];
    if (taggedQuery == nil) {
      // We've already removed the query. No big deal, ignore the update.
      return @[];
    }

    [self.persistenceManager setQueryComplete:taggedQuery];
    FPath *pathInQuery = [FPath relativePathFrom:taggedQuery.path to:path];
    FOperationSource *taggedSource =
        [FOperationSource forServerTaggedQuery:taggedQuery.params];
    id<FOperation> listenComplete =
        [[FListenComplete alloc] initWithSource:taggedSource path:pathInQuery];
    return [self applyTaggedOperation:listenComplete atPath:taggedQuery.path];
  }
  /**
   * Internal helper that applies a tagged operation to the sync point at path,
   * using the pending writes below path as the writes cache and no server
   * cache.
   *
   * @param operation Operation to apply; its path is relative to path.
   * @param path Location of the sync point that owns the tagged query.
   * @return NSArray of FEvent to raise; empty when no sync point exists at
   * path.
   */
  - (NSArray *)applyTaggedOperation:(id<FOperation>)operation
                             atPath:(FPath *)path {
    FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
    NSAssert(syncPoint != nil,
             @"Missing sync point for query tag that we're tracking.");
    if (syncPoint == nil) {
      // When assertions are compiled out, messaging the nil sync point would
      // make this method return nil. Callers are promised an array of events,
      // so answer with an empty one instead.
      return @[];
    }

    FWriteTreeRef *writesCache =
        [self.pendingWriteTree childWritesForPath:path];
    return [syncPoint applyOperation:operation
                         writesCache:writesCache
                         serverCache:nil];
  }
  /**
   * Applies new server data for the specified tagged query.
   *
   * The persisted server cache is updated for the tagged query itself when the
   * data sits at the query's own path, and for the default query at path
   * otherwise.
   *
   * @param path Location the server data belongs to.
   * @param newData The new server node.
   * @param tagId Tag of the query the data was sent for.
   * @return NSArray of FEvent to raise; empty when the tag is no longer
   * tracked.
   */
  - (NSArray *)applyTaggedQueryOverwriteAtPath:(FPath *)path
                                       newData:(id<FNode>)newData
                                         tagId:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = [self queryForTag:tagId];
    if (taggedQuery == nil) {
      // Query must have been removed already
      return @[];
    }

    FPath *pathInQuery = [FPath relativePathFrom:taggedQuery.path to:path];
    FQuerySpec *persistedQuery;
    if (pathInQuery.isEmpty) {
      persistedQuery = taggedQuery;
    } else {
      persistedQuery = [FQuerySpec defaultQueryAtPath:path];
    }
    [self.persistenceManager updateServerCacheWithNode:newData
                                              forQuery:persistedQuery];

    FOperationSource *taggedSource =
        [FOperationSource forServerTaggedQuery:taggedQuery.params];
    FOverwrite *taggedOverwrite =
        [[FOverwrite alloc] initWithSource:taggedSource
                                      path:pathInQuery
                                      snap:newData];
    return [self applyTaggedOperation:taggedOverwrite
                               atPath:taggedQuery.path];
  }
  /**
   * Applies server data to be merged in for the specified tagged query.
   *
   * @param path Location the merge is rooted at; the persisted server cache is
   * updated at this path, while the operation uses the path relative to the
   * tagged query.
   * @param changedChildren The writes making up the merge.
   * @param tagId Tag of the query the merge was sent for.
   * @return NSArray of FEvent to raise; empty when the tag is no longer
   * tracked.
   */
  - (NSArray *)applyTaggedQueryMergeAtPath:(FPath *)path
                           changedChildren:(FCompoundWrite *)changedChildren
                                     tagId:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = [self queryForTag:tagId];
    if (taggedQuery == nil) {
      // We've already removed the query. No big deal, ignore the update.
      return @[];
    }

    FPath *pathInQuery = [FPath relativePathFrom:taggedQuery.path to:path];
    [self.persistenceManager updateServerCacheWithMerge:changedChildren
                                                 atPath:path];

    FOperationSource *taggedSource =
        [FOperationSource forServerTaggedQuery:taggedQuery.params];
    FMerge *taggedMerge = [[FMerge alloc] initWithSource:taggedSource
                                                    path:pathInQuery
                                                children:changedChildren];
    return [self applyTaggedOperation:taggedMerge atPath:taggedQuery.path];
  }
  /**
   * Applies a list of range merges to the server cache of the tagged query's
   * view and feeds the result back in through
   * applyTaggedQueryOverwriteAtPath:newData:tagId:.
   *
   * @param path Location of the update; asserted to equal the tagged query's
   * path.
   * @param ranges NSArray of FRangeMerge, applied in order.
   * @param tagId Tag of the query the range merges were sent for.
   * @return NSArray of FEvent to raise; empty when the tag is no longer
   * tracked.
   */
  - (NSArray *)applyTaggedServerRangeMergeAtPath:(FPath *)path
                                         updates:(NSArray *)ranges
                                           tagId:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = [self queryForTag:tagId];
    if (taggedQuery == nil) {
      // We've already removed the query. No big deal, ignore the update.
      return @[];
    }

    NSAssert([path isEqual:taggedQuery.path],
             @"Tagged update path and query path must match");
    FSyncPoint *owningSyncPoint = [self.syncPointTree valueAtPath:path];
    NSAssert(owningSyncPoint != nil,
             @"Missing sync point for query tag that we're tracking.");
    FView *taggedView = [owningSyncPoint viewForQuery:taggedQuery];
    NSAssert(taggedView != nil,
             @"Missing view for query tag that we're tracking");

    id<FNode> mergedCache = [taggedView serverCache];
    for (FRangeMerge *rangeMerge in ranges) {
      mergedCache = [rangeMerge applyToNode:mergedCache];
    }
    return [self applyTaggedQueryOverwriteAtPath:path
                                         newData:mergedCache
                                           tagId:tagId];
  }
  /**
   * Adds an event callback for the specified query.
   *
   * A sync point is created at the query's path if there is none. When the
   * sync point has no view for the query yet, a tag is allocated for queries
   * that do not load all data, the view is created from the current writes and
   * server cache, and, unless a sync point on the path already has a complete
   * view, a listener is set up for the query.
   *
   * @param eventRegistration The callback registration to add.
   * @param query The query the registration listens to.
   * @return NSArray of FEvent to raise.
   */
  - (NSArray *)addEventRegistration:(id<FEventRegistration>)eventRegistration
                           forQuery:(FQuerySpec *)query {
    FPath *queryPath = query.path;

    // Visit the sync points on the query's path until one has a complete view.
    __block BOOL coveredByCompleteView = NO;
    [self.syncPointTree
        forEachOnPath:query.path
           whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *candidate) {
             if (!coveredByCompleteView && [candidate hasCompleteView]) {
               coveredByCompleteView = YES;
             }
             return !coveredByCompleteView;
           }];

    [self.persistenceManager setQueryActive:query];

    FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:queryPath];
    if (syncPoint == nil) {
      syncPoint = [[FSyncPoint alloc]
          initWithPersistenceManager:self.persistenceManager];
      self.syncPointTree = [self.syncPointTree setValue:syncPoint
                                                 atPath:queryPath];
    }

    if ([syncPoint viewExistsForQuery:query]) {
      return [syncPoint addEventRegistration:eventRegistration
                     forExistingViewForQuery:query];
    }

    if (![query loadsAllData]) {
      // We need to track a tag for this query
      NSAssert(self.queryToTagMap[query] == nil,
               @"View does not exist, but we have a tag");
      NSNumber *newTag = [self.queryTagCounter getAndIncrement];
      self.queryToTagMap[query] = newTag;
      self.tagToQueryMap[newTag] = query;
    }

    FWriteTreeRef *writesCache =
        [self.pendingWriteTree childWritesForPath:queryPath];
    FCacheNode *serverCache = [self serverCacheForQuery:query];
    NSArray *registrationEvents =
        [syncPoint addEventRegistration:eventRegistration
             forNonExistingViewForQuery:query
                            writesCache:writesCache
                            serverCache:serverCache];
    if (coveredByCompleteView) {
      return registrationEvents;
    }

    // There was no view and no default listen
    FView *newView = [syncPoint viewForQuery:query];
    NSMutableArray *allEvents = [registrationEvents mutableCopy];
    [allEvents addObjectsFromArray:[self setupListenerOnQuery:query
                                                         view:newView]];
    return allEvents;
  }
  /**
   * Builds the server cache a new view for query starts from.
   *
   * Sources are tried in this order:
   *  1. a complete server cache from a sync point on the query's path (marked
   *     fully initialized);
   *  2. the persisted server cache, when it is fully initialized;
   *  3. a partial node assembled from the complete caches of the sync points
   *     directly below the query's path, topped up with persisted children
   *     that are not already present (marked not fully initialized).
   *
   * @param query The query a server cache is needed for.
   * @return A cache node indexed by the query's index, or the persisted cache
   * in case 2; never marked as filtered in cases 1 and 3.
   */
  - (FCacheNode *)serverCacheForQuery:(FQuerySpec *)query {
    __block id<FNode> completeNode = nil;
    [self.syncPointTree
        forEachOnPath:query.path
           whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
             FPath *pathInSyncPoint = [FPath relativePathFrom:pathToSyncPoint
                                                           to:query.path];
             completeNode =
                 [syncPoint completeServerCacheAtPath:pathInSyncPoint];
             // Keep going only while nothing complete has been found.
             return completeNode == nil;
           }];

    if (completeNode != nil) {
      FIndexedNode *indexedComplete =
          [FIndexedNode indexedNodeWithNode:completeNode index:query.index];
      return [[FCacheNode alloc] initWithIndexedNode:indexedComplete
                                  isFullyInitialized:YES
                                          isFiltered:NO];
    }

    FCacheNode *persistedCache =
        [self.persistenceManager serverCacheForQuery:query];
    if (persistedCache.isFullyInitialized) {
      return persistedCache;
    }

    // Start from an empty node and add whatever complete children are known.
    __block id<FNode> partialNode = [FEmptyNode emptyNode];
    FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:query.path];
    [subtree forEachChild:^(NSString *childKey, FSyncPoint *childSyncPoint) {
      id<FNode> childCache =
          [childSyncPoint completeServerCacheAtPath:[FPath empty]];
      if (childCache) {
        partialNode = [partialNode updateImmediateChild:childKey
                                           withNewChild:childCache];
      }
    }];

    // Fill the node with any available children we have
    [persistedCache.node
        enumerateChildrenUsingBlock:^(NSString *key, id<FNode> persistedChild,
                                      BOOL *stop) {
          if (![partialNode hasChild:key]) {
            partialNode = [partialNode updateImmediateChild:key
                                               withNewChild:persistedChild];
          }
        }];

    FIndexedNode *indexedPartial =
        [FIndexedNode indexedNodeWithNode:partialNode index:query.index];
    return [[FCacheNode alloc] initWithIndexedNode:indexedPartial
                                isFullyInitialized:NO
                                        isFiltered:NO];
  }
  /**
   * Remove event callback(s).
   *
   * If query is the default query, we'll check all queries for the specified
   * eventRegistration. If eventRegistration is nil, we'll remove all callbacks
   * for the specified query/queries.
   *
   * Besides removing the registrations, this marks the removed queries
   * inactive in persistence, starts listens for views below the path when a
   * listen that loaded all data goes away, stops the listens that are no
   * longer needed, and forgets the tags of the removed queries.
   *
   * @param eventRegistration if nil, all callbacks are removed
   * @param query The query (or, for a default query, the location) to remove
   * callbacks from.
   * @param cancelError If provided, appropriate cancel events will be returned
   * @return NSArray of FEvent to raise; empty when nothing matched.
   */
  - (NSArray *)removeEventRegistration:(id<FEventRegistration>)eventRegistration
                              forQuery:(FQuerySpec *)query
                           cancelError:(NSError *)cancelError {
    // Find the syncPoint first. Then deal with whether or not it has matching
    // listeners
    FPath *path = query.path;
    FSyncPoint *maybeSyncPoint = [self.syncPointTree valueAtPath:path];

    // A removal on a default query affects all queries at that location. A
    // removal on an indexed query, even one without other query constraints,
    // does *not* affect all queries at that location. So this check must be
    // for 'default', and not loadsAllData:
    BOOL hasMatchingViews =
        maybeSyncPoint != nil &&
        ([query isDefault] || [maybeSyncPoint viewExistsForQuery:query]);
    if (!hasMatchingViews) {
      // No-op, this listener must've been already removed
      return @[];
    }

    FTupleRemovedQueriesEvents *removedAndEvents =
        [maybeSyncPoint removeEventRegistration:eventRegistration
                                       forQuery:query
                                    cancelError:cancelError];
    if ([maybeSyncPoint isEmpty]) {
      self.syncPointTree = [self.syncPointTree removeValueAtPath:path];
    }
    NSArray *removed = removedAndEvents.removedQueries;
    NSArray *cancelEvents = removedAndEvents.cancelEvents;

    // We may have just removed one of many listeners and can short-circuit
    // this whole process. We may also not have removed a default listener, in
    // which case all of the descendant listeners should already be properly
    // set up.
    //
    // Since indexed queries can shadow if they don't have other query
    // constraints, check for loadsAllData: instead of isDefault:
    //
    // A plain loop is used rather than indexOfObjectPassingTest: because that
    // message sent to a nil array answers 0, which is not NSNotFound and would
    // wrongly report a removed default listen.
    BOOL removingDefault = NO;
    for (FQuerySpec *removedQuery in removed) {
      if ([removedQuery loadsAllData]) {
        removingDefault = YES;
        break;
      }
    }

    for (FQuerySpec *removedQuery in removed) {
      [self.persistenceManager setQueryInactive:removedQuery];
    }

    // Are we still covered by a complete view on the path? The block answers
    // nil, not @NO, for a sync point without a complete view: a non-nil @NO
    // would count as a result and end the search at the first sync point.
    // NOTE(review): this assumes findOnPath:andApplyBlock: keeps searching on a
    // nil result, as calcCompleteEventCacheAtPath:excludeWriteIds: also relies
    // on - confirm against FImmutableTree.
    NSNumber *covered = [self.syncPointTree
           findOnPath:path
        andApplyBlock:^id(FPath *relativePath, FSyncPoint *parentSyncPoint) {
          return [parentSyncPoint hasCompleteView] ? @YES : nil;
        }];
    BOOL isCovered = [covered boolValue];

    if (removingDefault && !isCovered) {
      FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
      // There are potentially child listeners. Determine what if any listens
      // we need to send before executing the removal. If there's nothing
      // below us, there is nothing we need to start listening on.
      if (![subtree isEmpty]) {
        // We need to fold over our subtree and collect the listeners to send
        NSArray *newViews = [self collectDistinctViewsForSubTree:subtree];
        // Ok, we've collected all the listens we need. Set them up.
        for (FView *view in newViews) {
          FQuerySpec *newQuery = view.query;
          FListenContainer *listenContainer =
              [self createListenerForView:view];
          self.listenProvider.startListening(
              [self queryForListening:newQuery], [self tagForQuery:newQuery],
              listenContainer, listenContainer.onComplete);
        }
      }
    }

    // If we removed anything and we're not covered by a higher up listen, we
    // need to stop listening on this query. The above block has us covered in
    // terms of making sure we're set up on listens lower in the tree. Also,
    // note that if we have a cancelError, it's already been removed at the
    // provider level.
    if (!isCovered && [removed count] > 0 && cancelError == nil) {
      // If we removed a default, then we weren't listening on any of the other
      // queries here. Just cancel the one default. Otherwise, we need to
      // iterate through and cancel each individual query
      if (removingDefault) {
        // We don't tag default listeners
        self.listenProvider.stopListening([self queryForListening:query], nil);
      } else {
        for (FQuerySpec *queryToRemove in removed) {
          NSNumber *tagToRemove = self.queryToTagMap[queryToRemove];
          self.listenProvider.stopListening(
              [self queryForListening:queryToRemove], tagToRemove);
        }
      }
    }

    // Now, clear all the tags we're tracking for the removed listens.
    [self removeTags:removed];
    return cancelEvents;
  }
  /**
   * Starts or stops keeping a query synced by adding or removing the shared
   * FKeepSyncedEventRegistration for it. Calls that would not change the
   * current state are ignored.
   *
   * @param query The query to keep (or stop keeping) synced.
   * @param keepSynced YES to keep the query synced, NO to stop.
   */
  - (void)keepQuery:(FQuerySpec *)query synced:(BOOL)keepSynced {
    BOOL alreadySynced = [self.keepSyncedQueries containsObject:query];

    // Only do something if we actually need to add/remove an event
    // registration
    if (keepSynced) {
      if (!alreadySynced) {
        [self addEventRegistration:[FKeepSyncedEventRegistration instance]
                          forQuery:query];
        [self.keepSyncedQueries addObject:query];
      }
    } else if (alreadySynced) {
      [self removeEventRegistration:[FKeepSyncedEventRegistration instance]
                           forQuery:query
                        cancelError:nil];
      [self.keepSyncedQueries removeObject:query];
    }
  }
  /**
   * Drops every pending user write, both from persistence and from the pending
   * write tree. If any write was removed, a reverting FAckUserWrite covering
   * the root path is applied to the sync points.
   *
   * @return NSArray of FEvent to raise; empty when there were no pending
   * writes.
   */
  - (NSArray *)removeAllWrites {
    [self.persistenceManager removeAllUserWrites];
    NSArray *removedWrites = [self.pendingWriteTree removeAllWrites];
    if (removedWrites.count == 0) {
      return @[];
    }

    FPath *rootPath = [FPath empty];
    FImmutableTree *everythingAffected =
        [[FImmutableTree empty] setValue:@YES atPath:[FPath empty]];
    FAckUserWrite *revertAll =
        [[FAckUserWrite alloc] initWithPath:rootPath
                               affectedTree:everythingAffected
                                     revert:YES];
    return [self applyOperationToSyncPoints:revertAll];
  }
  /**
   * Returns a complete cache, if we have one, of the data at a particular
   * path. The location must have a listener above it, but as this is only used
   * by transaction code, that should always be the case anyways.
   *
   * Note: this method will *include* hidden writes from transaction with
   * applyLocally set to false.
   *
   * @param path The path to the data we want
   * @param writeIdsToExclude A specific set to be excluded
   * @return The event cache calculated by the pending write tree on top of the
   * complete server cache found for path (which may be nil).
   */
  - (id<FNode>)calcCompleteEventCacheAtPath:(FPath *)path
                            excludeWriteIds:(NSArray *)writeIdsToExclude {
    // The block answers nil for sync points without a complete server cache at
    // the requested location.
    id<FNode> completeServerCache = [self.syncPointTree
           findOnPath:path
        andApplyBlock:^id<FNode>(FPath *pathSoFar, FSyncPoint *syncPoint) {
          FPath *pathInSyncPoint = [FPath relativePathFrom:pathSoFar to:path];
          return [syncPoint completeServerCacheAtPath:pathInSyncPoint];
        }];

    return [self.pendingWriteTree
        calculateCompleteEventCacheAtPath:path
                      completeServerCache:completeServerCache
                          excludeWriteIds:writeIdsToExclude
                      includeHiddenWrites:YES];
  }
  698. #pragma mark -
  699. #pragma mark Private Methods
  /**
   * This collapses multiple unfiltered views into a single view, since we only
   * need a single listener for them.
   *
   * The subtree is folded: a sync point with a complete view contributes just
   * that view; any other node contributes its own query views (if it has a
   * sync point) followed by the views collected from its children.
   *
   * @param subtree Subtree of sync points to collect views from.
   * @return NSArray of FView
   */
  - (NSArray *)collectDistinctViewsForSubTree:(FImmutableTree *)subtree {
    return [subtree foldWithBlock:^NSArray *(FPath *relativePath,
                                             FSyncPoint *maybeChildSyncPoint,
                                             NSDictionary *childMap) {
      // Messaging a nil sync point answers NO, so no separate nil check is
      // needed here.
      if ([maybeChildSyncPoint hasCompleteView]) {
        return @[ [maybeChildSyncPoint completeView] ];
      }

      // No complete view here, flatten any deeper listens into an array
      NSMutableArray *collected =
          maybeChildSyncPoint
              ? [[maybeChildSyncPoint queryViews] mutableCopy]
              : [[NSMutableArray alloc] init];
      [childMap enumerateKeysAndObjectsUsingBlock:^(
                    NSString *childKey, NSArray *viewsBelowChild, BOOL *stop) {
        [collected addObjectsFromArray:viewsBelowChild];
      }];
      return collected;
    }];
  }
  /**
   * Drops the tag bookkeeping for queries that have been removed.
   *
   * Queries that load all data are skipped. For every other query the entry
   * is removed from both queryToTagMap and tagToQueryMap.
   *
   * @param queries NSArray of FQuerySpec
   */
  - (void)removeTags:(NSArray *)queries {
    for (FQuerySpec *removedQuery in queries) {
      if ([removedQuery loadsAllData]) {
        continue;
      }
      // We should have a tag for this.
      NSNumber *removedQueryTag = self.queryToTagMap[removedQuery];
      if (removedQueryTag == nil) {
        // No tag was recorded for this query, so there is nothing to remove
        // from either map. Skipping also avoids handing a nil key to
        // removeObjectForKey:, which raises on NSMutableDictionary
        // (NOTE(review): the maps are presumably NSMutableDictionary; their
        // declarations are not visible here).
        continue;
      }
      [self.queryToTagMap removeObjectForKey:removedQuery];
      [self.tagToQueryMap removeObjectForKey:removedQueryTag];
    }
  }
  /**
   * Maps a query to the query that should actually be listened on.
   *
   * A non-default query that nevertheless loads all data is replaced by the
   * default query at the same path; every other query is returned as-is.
   */
  - (FQuerySpec *)queryForListening:(FQuerySpec *)query {
    if (!query.loadsAllData || query.isDefault) {
      return query;
    }
    // We treat queries that load all data as default queries.
    return [FQuerySpec defaultQueryAtPath:query.path];
  }
  /**
   * Starts the listen for a newly added query and de-duplicates outstanding
   * subscriptions.
   *
   * The listen for `query` is always started. When the query has no tag (a
   * default listener), every listen at or below its path that it now shadows
   * is stopped through the listen provider.
   *
   * @param query The query being listened to
   * @param view The view backing the query
   * @return NSArray of FEvent events to support synchronous data sources
   */
  - (NSArray *)setupListenerOnQuery:(FQuerySpec *)query view:(FView *)view {
    FPath *queryPath = query.path;
    NSNumber *queryTag = [self tagForQuery:query];
    FListenContainer *container = [self createListenerForView:view];
    FQuerySpec *listenQuery = [self queryForListening:query];
    NSArray *initialEvents = self.listenProvider.startListening(
        listenQuery, queryTag, container, container.onComplete);

    // The root of this subtree has our query. We're here because we
    // definitely need to send a listen for that, but we may need to shadow
    // other listens as well.
    FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:queryPath];

    if (queryTag != nil) {
      NSAssert(![subtree.value hasCompleteView],
               @"If we're adding a query, it shouldn't be shadowed");
      return initialEvents;
    }

    // Default listener: shadow everything at or below this location.
    NSArray *shadowedQueries =
        [subtree foldWithBlock:^id(FPath *relativePath,
                                   FSyncPoint *maybeChildSyncPoint,
                                   NSDictionary *childMap) {
          BOOL isBelowRoot = ![relativePath isEmpty];
          if (isBelowRoot && [maybeChildSyncPoint hasCompleteView]) {
            return @[ [maybeChildSyncPoint completeView].query ];
          }

          // No default listener here, flatten any deeper queries into an
          // array. Enumerating the query views of a nil sync point simply
          // does nothing.
          NSMutableArray *collected = [NSMutableArray array];
          for (FView *queryView in [maybeChildSyncPoint queryViews]) {
            [collected addObject:queryView.query];
          }
          [childMap enumerateKeysAndObjectsUsingBlock:^(
                        NSString *key, NSArray *childQueries, BOOL *stop) {
            [collected addObjectsFromArray:childQueries];
          }];
          return collected;
        }];

    for (FQuerySpec *shadowed in shadowedQueries) {
      self.listenProvider.stopListening([self queryForListening:shadowed],
                                        [self tagForQuery:shadowed]);
    }
    return initialEvents;
  }
  /**
   * Builds the FListenContainer for a view, including the completion handler
   * run with the listen's status.
   *
   * On status "ok" the handler applies a listen-complete (tagged when the
   * view's query has a tag, untagged otherwise) at the query's path. On any
   * other status it logs a warning and removes the event registrations for
   * the query, passing along an error built from the status.
   */
  - (FListenContainer *)createListenerForView:(FView *)view {
    FQuerySpec *query = view.query;
    NSNumber *tagId = [self tagForQuery:query];

    return [[FListenContainer alloc]
        initWithView:view
          onComplete:^(NSString *status) {
            if (![status isEqualToString:@"ok"]) {
              // If a listen failed, kill all of the listeners here, not just
              // the one that triggered the error. Note that this may need to
              // be scoped to just this listener if we change permissions on
              // filtered children.
              NSError *error = [FUtilities errorForStatus:status
                                                andReason:nil];
              FFWarn(@"I-RDB038012", @"Listener at %@ failed: %@", query.path,
                     status);
              return [self removeEventRegistration:nil
                                          forQuery:query
                                       cancelError:error];
            }

            if (tagId == nil) {
              return [self applyListenCompleteAtPath:query.path];
            }
            return [self applyTaggedListenCompleteAtPath:query.path
                                                   tagId:tagId];
          }];
  }
  /**
   * Looks a tag up in tagToQueryMap.
   *
   * @param tagId The tag to look up
   * @return The query associated with the given tag, if we have one
   */
  - (FQuerySpec *)queryForTag:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = self.tagToQueryMap[tagId];
    return taggedQuery;
  }
  /**
   * Looks a query up in queryToTagMap.
   *
   * @param query The query to look up
   * @return The tag associated with the given query
   */
  - (NSNumber *)tagForQuery:(FQuerySpec *)query {
    NSNumber *queryTag = self.queryToTagMap[query];
    return queryTag;
  }
  843. #pragma mark -
  844. #pragma mark applyOperation Helpers
  /**
   * A helper method that visits all descendant and ancestor SyncPoints,
   * applying the operation.
   *
   * NOTES:
   * - Descendant SyncPoints will be visited first (since we raise events
   *   depth-first).
   * - We call applyOperation: on each SyncPoint passing three things:
   *   1. A version of the Operation that has been made relative to the
   *      SyncPoint location.
   *   2. A WriteTreeRef of any writes we have cached at the SyncPoint
   *      location.
   *   3. A snapshot Node with cached server data, if we have it.
   * - We concatenate all of the events returned by each SyncPoint and return
   *   the result.
   *
   * @param operation The operation to apply, starting at the root
   * @return Array of FEvent
   */
  - (NSArray *)applyOperationToSyncPoints:(id<FOperation>)operation {
    // Start at the root: no server cache is known yet, and the writes cache
    // is the pending write tree's ref for the empty path.
    FWriteTreeRef *rootWrites =
        [self.pendingWriteTree childWritesForPath:[FPath empty]];
    return [self applyOperationHelper:operation
                        syncPointTree:self.syncPointTree
                          serverCache:nil
                          writesCache:rootWrites];
  }
  /**
   * Recursive helper for applyOperationToSyncPoints:
   *
   * While the operation's path is non-empty, descends into the child named by
   * the front of that path (when both a child subtree and a child operation
   * exist) and then applies the operation to the SyncPoint at this level.
   * Once the path is empty, the work is handed to
   * applyOperationDescendantsHelper:.
   *
   * @param operation The operation, relative to this level of the tree
   * @param syncPointTree The subtree of SyncPoints at this level
   * @param serverCache Cached server data for this level, or nil if unknown
   * @param writesCache WriteTreeRef for this level
   * @return Array of FEvent; events from the child come before those of this
   * level's SyncPoint
   */
  - (NSArray *)applyOperationHelper:(id<FOperation>)operation
                      syncPointTree:(FImmutableTree *)syncPointTree
                        serverCache:(id<FNode>)serverCache
                        writesCache:(FWriteTreeRef *)writesCache {
    if ([operation.path isEmpty]) {
      return [self applyOperationDescendantsHelper:operation
                                     syncPointTree:syncPointTree
                                       serverCache:serverCache
                                       writesCache:writesCache];
    }

    FSyncPoint *syncPoint = syncPointTree.value;

    // If we don't have cached server data, see if we can get it from this
    // SyncPoint.
    id<FNode> effectiveServerCache = serverCache;
    if (effectiveServerCache == nil && syncPoint != nil) {
      effectiveServerCache =
          [syncPoint completeServerCacheAtPath:[FPath empty]];
    }

    NSMutableArray *collectedEvents = [NSMutableArray array];

    NSString *front = [operation.path getFront];
    id<FOperation> childOperation = [operation operationForChild:front];
    FImmutableTree *childTree = [syncPointTree.children get:front];
    if (childTree != nil && childOperation != nil) {
      // Messaging a nil server cache yields nil, which is the value the child
      // should receive when no server data is known.
      id<FNode> childServerCache =
          [effectiveServerCache getImmediateChild:front];
      FWriteTreeRef *childWrites = [writesCache childWriteTreeRef:front];
      NSArray *childEvents = [self applyOperationHelper:childOperation
                                          syncPointTree:childTree
                                            serverCache:childServerCache
                                            writesCache:childWrites];
      [collectedEvents addObjectsFromArray:childEvents];
    }

    if (syncPoint != nil) {
      NSArray *localEvents = [syncPoint applyOperation:operation
                                           writesCache:writesCache
                                           serverCache:effectiveServerCache];
      [collectedEvents addObjectsFromArray:localEvents];
    }
    return collectedEvents;
  }
  /**
   * Recursive helper for applyOperationToSyncPoints:
   *
   * Applies the operation to every child subtree for which a child operation
   * exists, and then to the SyncPoint at this level, so events from
   * descendants come first in the returned array.
   *
   * @param operation The operation, relative to this level of the tree
   * @param syncPointTree The subtree of SyncPoints at this level
   * @param serverCache Cached server data for this level, or nil if unknown
   * @param writesCache WriteTreeRef for this level
   * @return Array of FEvent
   */
  - (NSArray *)applyOperationDescendantsHelper:(id<FOperation>)operation
                                 syncPointTree:(FImmutableTree *)syncPointTree
                                   serverCache:(id<FNode>)serverCache
                                   writesCache:(FWriteTreeRef *)writesCache {
    FSyncPoint *syncPoint = syncPointTree.value;

    // If we don't have cached server data, see if we can get it from this
    // SyncPoint. This is a logical AND (the original used bitwise `&`, which
    // only gave the same answer because both operands are 0 or 1).
    id<FNode> resolvedServerCache;
    if (serverCache == nil && syncPoint != nil) {
      resolvedServerCache =
          [syncPoint completeServerCacheAtPath:[FPath empty]];
    } else {
      resolvedServerCache = serverCache;
    }

    NSMutableArray *events = [[NSMutableArray alloc] init];

    [syncPointTree.children enumerateKeysAndObjectsUsingBlock:^(
                                NSString *childKey, FImmutableTree *childTree,
                                BOOL *stop) {
      id<FNode> childServerCache = nil;
      if (resolvedServerCache != nil) {
        childServerCache = [resolvedServerCache getImmediateChild:childKey];
      }
      FWriteTreeRef *childWritesCache =
          [writesCache childWriteTreeRef:childKey];
      id<FOperation> childOperation = [operation operationForChild:childKey];
      // Children for which no child operation exists are skipped.
      if (childOperation != nil) {
        [events addObjectsFromArray:
                    [self applyOperationDescendantsHelper:childOperation
                                            syncPointTree:childTree
                                              serverCache:childServerCache
                                              writesCache:childWritesCache]];
      }
    }];

    // Apply to this level last so that descendant events are raised first.
    if (syncPoint) {
      [events
          addObjectsFromArray:[syncPoint applyOperation:operation
                                            writesCache:writesCache
                                            serverCache:resolvedServerCache]];
    }
    return events;
  }
  956. @end