FSyncTree.m 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FirebaseDatabase/Sources/Core/FSyncTree.h"
  17. #import "FirebaseCore/Sources/Private/FirebaseCoreInternal.h"
  18. #import "FirebaseDatabase/Sources/Core/FCompoundHash.h"
  19. #import "FirebaseDatabase/Sources/Core/FListenProvider.h"
  20. #import "FirebaseDatabase/Sources/Core/FQueryParams.h"
  21. #import "FirebaseDatabase/Sources/Core/FQuerySpec.h"
  22. #import "FirebaseDatabase/Sources/Core/FRangeMerge.h"
  23. #import "FirebaseDatabase/Sources/Core/FServerValues.h"
  24. #import "FirebaseDatabase/Sources/Core/FSnapshotHolder.h"
  25. #import "FirebaseDatabase/Sources/Core/FSyncPoint.h"
  26. #import "FirebaseDatabase/Sources/Core/FWriteRecord.h"
  27. #import "FirebaseDatabase/Sources/Core/FWriteTree.h"
  28. #import "FirebaseDatabase/Sources/Core/FWriteTreeRef.h"
  29. #import "FirebaseDatabase/Sources/Core/Operation/FAckUserWrite.h"
  30. #import "FirebaseDatabase/Sources/Core/Operation/FMerge.h"
  31. #import "FirebaseDatabase/Sources/Core/Operation/FOperation.h"
  32. #import "FirebaseDatabase/Sources/Core/Operation/FOperationSource.h"
  33. #import "FirebaseDatabase/Sources/Core/Operation/FOverwrite.h"
  34. #import "FirebaseDatabase/Sources/Core/Utilities/FImmutableTree.h"
  35. #import "FirebaseDatabase/Sources/Core/Utilities/FPath.h"
  36. #import "FirebaseDatabase/Sources/Core/View/FCacheNode.h"
  37. #import "FirebaseDatabase/Sources/Core/View/FEventRaiser.h"
  38. #import "FirebaseDatabase/Sources/Core/View/FEventRegistration.h"
  39. #import "FirebaseDatabase/Sources/Core/View/FKeepSyncedEventRegistration.h"
  40. #import "FirebaseDatabase/Sources/Core/View/FView.h"
  41. #import "FirebaseDatabase/Sources/FListenComplete.h"
  42. #import "FirebaseDatabase/Sources/Persistence/FPersistenceManager.h"
  43. #import "FirebaseDatabase/Sources/Snapshot/FChildrenNode.h"
  44. #import "FirebaseDatabase/Sources/Snapshot/FCompoundWrite.h"
  45. #import "FirebaseDatabase/Sources/Snapshot/FEmptyNode.h"
  46. #import "FirebaseDatabase/Sources/Snapshot/FNode.h"
  47. #import "FirebaseDatabase/Sources/Snapshot/FSnapshotUtilities.h"
  48. #import "FirebaseDatabase/Sources/Utilities/FAtomicNumber.h"
  49. #import "FirebaseDatabase/Sources/Utilities/FUtilities.h"
  50. #import "FirebaseDatabase/Sources/Utilities/Tuples/FTupleRemovedQueriesEvents.h"
  51. // Size after which we start including the compound hash
  52. static const NSUInteger kFSizeThresholdForCompoundHash = 1024;
  53. @interface FListenContainer : NSObject <FSyncTreeHash>
  54. @property(nonatomic, strong) FView *view;
  55. @property(nonatomic, copy) fbt_nsarray_nsstring onComplete;
  56. @end
  57. @implementation FListenContainer
  58. - (instancetype)initWithView:(FView *)view
  59. onComplete:(fbt_nsarray_nsstring)onComplete {
  60. self = [super init];
  61. if (self != nil) {
  62. self->_view = view;
  63. self->_onComplete = onComplete;
  64. }
  65. return self;
  66. }
  67. - (id<FNode>)serverCache {
  68. return self.view.serverCache;
  69. }
  70. - (FCompoundHash *)compoundHash {
  71. return [FCompoundHash fromNode:[self serverCache]];
  72. }
  73. - (NSString *)simpleHash {
  74. return [[self serverCache] dataHash];
  75. }
  76. - (BOOL)includeCompoundHash {
  77. return [FSnapshotUtilities estimateSerializedNodeSize:[self serverCache]] >
  78. kFSizeThresholdForCompoundHash;
  79. }
  80. @end
  81. @interface FSyncTree ()
  82. /**
  83. * Tree of SyncPoints. There's a SyncPoint at any location that has 1 or more
  84. * views.
  85. */
  86. @property(nonatomic, strong) FImmutableTree *syncPointTree;
  87. /**
  88. * A tree of all pending user writes (user-initiated set, transactions, updates,
  89. * etc)
  90. */
  91. @property(nonatomic, strong) FWriteTree *pendingWriteTree;
  92. /**
  93. * Maps tagId -> FTuplePathQueryParams
  94. */
  95. @property(nonatomic, strong) NSMutableDictionary *tagToQueryMap;
  96. @property(nonatomic, strong) NSMutableDictionary *queryToTagMap;
  97. @property(nonatomic, strong) FListenProvider *listenProvider;
  98. @property(nonatomic, strong) FPersistenceManager *persistenceManager;
  99. @property(nonatomic, strong) FAtomicNumber *queryTagCounter;
  100. @property(nonatomic, strong) NSMutableSet *keepSyncedQueries;
  101. @end
  102. /**
  103. * SyncTree is the central class for managing event callback registration, data
  104. * caching, views (query processing), and event generation. There are typically
  105. * two SyncTree instances for each Repo, one for the normal Firebase data, and
  106. * one for the .info data.
  107. *
  108. * It has a number of responsibilities, including:
  109. * - Tracking all user event callbacks (registered via addEventRegistration:
  110. * and removeEventRegistration:).
  111. * - Applying and caching data changes for user setValue:,
  112. * runTransactionBlock:, and updateChildValues: calls
  113. * (applyUserOverwriteAtPath:, applyUserMergeAtPath:).
  114. * - Applying and caching data changes for server data changes
  115. * (applyServerOverwriteAtPath:, applyServerMergeAtPath:).
  116. * - Generating user-facing events for server and user changes (all of the
  117. * apply* methods return the set of events that need to be raised as a result).
  118. * - Maintaining the appropriate set of server listens to ensure we are always
  119. * subscribed to the correct set of paths and queries to satisfy the current set
  120. * of user event callbacks (listens are started/stopped using the provided
  121. * listenProvider).
  122. *
  123. * NOTE: Although SyncTree tracks event callbacks and calculates events to
  124. * raise, the actual events are returned to the caller rather than raised
  125. * synchronously.
  126. */
  127. @implementation FSyncTree
  128. - (id)initWithListenProvider:(FListenProvider *)provider {
  129. return [self initWithPersistenceManager:nil listenProvider:provider];
  130. }
  131. - (id)initWithPersistenceManager:(FPersistenceManager *)persistenceManager
  132. listenProvider:(FListenProvider *)provider {
  133. self = [super init];
  134. if (self) {
  135. self.syncPointTree = [FImmutableTree empty];
  136. self.pendingWriteTree = [[FWriteTree alloc] init];
  137. self.tagToQueryMap = [[NSMutableDictionary alloc] init];
  138. self.queryToTagMap = [[NSMutableDictionary alloc] init];
  139. self.listenProvider = provider;
  140. self.persistenceManager = persistenceManager;
  141. self.queryTagCounter = [[FAtomicNumber alloc] init];
  142. self.keepSyncedQueries = [NSMutableSet set];
  143. }
  144. return self;
  145. }
  146. #pragma mark -
  147. #pragma mark Apply Operations
  148. /**
  149. * Apply data changes for a user-generated setValue: runTransactionBlock:
  150. * updateChildValues:, etc.
  151. * @return NSArray of FEvent to raise.
  152. */
  153. - (NSArray *)applyUserOverwriteAtPath:(FPath *)path
  154. newData:(id<FNode>)newData
  155. writeId:(NSInteger)writeId
  156. isVisible:(BOOL)visible {
  157. // Record pending write
  158. [self.pendingWriteTree addOverwriteAtPath:path
  159. newData:newData
  160. writeId:writeId
  161. isVisible:visible];
  162. if (!visible) {
  163. return @[];
  164. } else {
  165. FOverwrite *operation =
  166. [[FOverwrite alloc] initWithSource:[FOperationSource userInstance]
  167. path:path
  168. snap:newData];
  169. return [self applyOperationToSyncPoints:operation];
  170. }
  171. }
  172. /**
  173. * Apply the data from a user-generated updateChildValues: call
  174. * @return NSArray of FEvent to raise.
  175. */
  176. - (NSArray *)applyUserMergeAtPath:(FPath *)path
  177. changedChildren:(FCompoundWrite *)changedChildren
  178. writeId:(NSInteger)writeId {
  179. // Record pending merge
  180. [self.pendingWriteTree addMergeAtPath:path
  181. changedChildren:changedChildren
  182. writeId:writeId];
  183. FMerge *operation =
  184. [[FMerge alloc] initWithSource:[FOperationSource userInstance]
  185. path:path
  186. children:changedChildren];
  187. return [self applyOperationToSyncPoints:operation];
  188. }
  189. /**
  190. * Acknowledge a pending user write that was previously registered with
  191. * applyUserOverwriteAtPath: or applyUserMergeAtPath:
  192. * TODO[offline]: Taking a serverClock here is awkward, but server values are
  193. * awkward. :-(
  194. * @return NSArray of FEvent to raise.
  195. */
  196. - (NSArray *)ackUserWriteWithWriteId:(NSInteger)writeId
  197. revert:(BOOL)revert
  198. persist:(BOOL)persist
  199. clock:(id<FClock>)clock {
  200. FWriteRecord *write = [self.pendingWriteTree writeForId:writeId];
  201. BOOL needToReevaluate = [self.pendingWriteTree removeWriteId:writeId];
  202. if (write.visible) {
  203. if (persist) {
  204. [self.persistenceManager removeUserWrite:writeId];
  205. }
  206. if (!revert) {
  207. NSDictionary *serverValues =
  208. [FServerValues generateServerValues:clock];
  209. if ([write isOverwrite]) {
  210. id<FNode> resolvedNode =
  211. [FServerValues resolveDeferredValueSnapshot:write.overwrite
  212. withSyncTree:self
  213. atPath:write.path
  214. serverValues:serverValues];
  215. [self.persistenceManager applyUserWrite:resolvedNode
  216. toServerCacheAtPath:write.path];
  217. } else {
  218. FCompoundWrite *resolvedMerge = [FServerValues
  219. resolveDeferredValueCompoundWrite:write.merge
  220. withSyncTree:self
  221. atPath:write.path
  222. serverValues:serverValues];
  223. [self.persistenceManager applyUserMerge:resolvedMerge
  224. toServerCacheAtPath:write.path];
  225. }
  226. }
  227. }
  228. if (!needToReevaluate) {
  229. return @[];
  230. } else {
  231. __block FImmutableTree *affectedTree = [FImmutableTree empty];
  232. if (write.isOverwrite) {
  233. affectedTree = [affectedTree setValue:@YES atPath:[FPath empty]];
  234. } else {
  235. [write.merge
  236. enumerateWrites:^(FPath *path, id<FNode> node, BOOL *stop) {
  237. affectedTree = [affectedTree setValue:@YES atPath:path];
  238. }];
  239. }
  240. FAckUserWrite *operation =
  241. [[FAckUserWrite alloc] initWithPath:write.path
  242. affectedTree:affectedTree
  243. revert:revert];
  244. return [self applyOperationToSyncPoints:operation];
  245. }
  246. }
  247. /**
  248. * Apply new server data for the specified path
  249. * @return NSArray of FEvent to raise.
  250. */
  251. - (NSArray *)applyServerOverwriteAtPath:(FPath *)path
  252. newData:(id<FNode>)newData {
  253. [self.persistenceManager
  254. updateServerCacheWithNode:newData
  255. forQuery:[FQuerySpec defaultQueryAtPath:path]];
  256. FOverwrite *operation =
  257. [[FOverwrite alloc] initWithSource:[FOperationSource serverInstance]
  258. path:path
  259. snap:newData];
  260. return [self applyOperationToSyncPoints:operation];
  261. }
  262. /**
  263. * Applied new server data to be merged in at the specified path
  264. * @return NSArray of FEvent to raise.
  265. */
  266. - (NSArray *)applyServerMergeAtPath:(FPath *)path
  267. changedChildren:(FCompoundWrite *)changedChildren {
  268. [self.persistenceManager updateServerCacheWithMerge:changedChildren
  269. atPath:path];
  270. FMerge *operation =
  271. [[FMerge alloc] initWithSource:[FOperationSource serverInstance]
  272. path:path
  273. children:changedChildren];
  274. return [self applyOperationToSyncPoints:operation];
  275. }
  276. - (NSArray *)applyServerRangeMergeAtPath:(FPath *)path
  277. updates:(NSArray *)ranges {
  278. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  279. if (syncPoint == nil) {
  280. // Removed view, so it's safe to just ignore this update
  281. return @[];
  282. } else {
  283. // This could be for any "complete" (unfiltered) view, and if there is
  284. // more than one complete view, they should each have the same cache so
  285. // it doesn't matter which one we use.
  286. FView *view = [syncPoint completeView];
  287. if (view != nil) {
  288. id<FNode> serverNode = [view serverCache];
  289. for (FRangeMerge *merge in ranges) {
  290. serverNode = [merge applyToNode:serverNode];
  291. }
  292. return [self applyServerOverwriteAtPath:path newData:serverNode];
  293. } else {
  294. // There doesn't exist a view for this update, so it was removed and
  295. // it's safe to just ignore this range merge
  296. return @[];
  297. }
  298. }
  299. }
  300. /**
  301. * Apply a listen complete to a path
  302. * @return NSArray of FEvent to raise.
  303. */
  304. - (NSArray *)applyListenCompleteAtPath:(FPath *)path {
  305. [self.persistenceManager
  306. setQueryComplete:[FQuerySpec defaultQueryAtPath:path]];
  307. id<FOperation> operation = [[FListenComplete alloc]
  308. initWithSource:[FOperationSource serverInstance]
  309. path:path];
  310. return [self applyOperationToSyncPoints:operation];
  311. }
  312. /**
  313. * Apply a listen complete to a path
  314. * @return NSArray of FEvent to raise.
  315. */
  316. - (NSArray *)applyTaggedListenCompleteAtPath:(FPath *)path
  317. tagId:(NSNumber *)tagId {
  318. FQuerySpec *query = [self queryForTag:tagId];
  319. if (query != nil) {
  320. [self.persistenceManager setQueryComplete:query];
  321. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  322. id<FOperation> op = [[FListenComplete alloc]
  323. initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  324. path:relativePath];
  325. return [self applyTaggedOperation:op atPath:query.path];
  326. } else {
  327. // We've already removed the query. No big deal, ignore the update.
  328. return @[];
  329. }
  330. }
  331. /**
  332. * Internal helper method to apply tagged operation
  333. */
  334. - (NSArray *)applyTaggedOperation:(id<FOperation>)operation
  335. atPath:(FPath *)path {
  336. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  337. NSAssert(syncPoint != nil,
  338. @"Missing sync point for query tag that we're tracking.");
  339. FWriteTreeRef *writesCache =
  340. [self.pendingWriteTree childWritesForPath:path];
  341. return [syncPoint applyOperation:operation
  342. writesCache:writesCache
  343. serverCache:nil];
  344. }
  345. /**
  346. * Apply new server data for the specified tagged query
  347. * @return NSArray of FEvent to raise.
  348. */
  349. - (NSArray *)applyTaggedQueryOverwriteAtPath:(FPath *)path
  350. newData:(id<FNode>)newData
  351. tagId:(NSNumber *)tagId {
  352. FQuerySpec *query = [self queryForTag:tagId];
  353. if (query != nil) {
  354. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  355. FQuerySpec *queryToOverwrite =
  356. relativePath.isEmpty ? query : [FQuerySpec defaultQueryAtPath:path];
  357. [self.persistenceManager updateServerCacheWithNode:newData
  358. forQuery:queryToOverwrite];
  359. FOverwrite *operation = [[FOverwrite alloc]
  360. initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  361. path:relativePath
  362. snap:newData];
  363. return [self applyTaggedOperation:operation atPath:query.path];
  364. } else {
  365. // Query must have been removed already
  366. return @[];
  367. }
  368. }
  369. /**
  370. * Apply server data to be merged in for the specified tagged query
  371. * @return NSArray of FEvent to raise.
  372. */
  373. - (NSArray *)applyTaggedQueryMergeAtPath:(FPath *)path
  374. changedChildren:(FCompoundWrite *)changedChildren
  375. tagId:(NSNumber *)tagId {
  376. FQuerySpec *query = [self queryForTag:tagId];
  377. if (query != nil) {
  378. FPath *relativePath = [FPath relativePathFrom:query.path to:path];
  379. [self.persistenceManager updateServerCacheWithMerge:changedChildren
  380. atPath:path];
  381. FMerge *operation = [[FMerge alloc]
  382. initWithSource:[FOperationSource forServerTaggedQuery:query.params]
  383. path:relativePath
  384. children:changedChildren];
  385. return [self applyTaggedOperation:operation atPath:query.path];
  386. } else {
  387. // We've already removed the query. No big deal, ignore the update.
  388. return @[];
  389. }
  390. }
  391. - (NSArray *)applyTaggedServerRangeMergeAtPath:(FPath *)path
  392. updates:(NSArray *)ranges
  393. tagId:(NSNumber *)tagId {
  394. FQuerySpec *query = [self queryForTag:tagId];
  395. if (query != nil) {
  396. NSAssert([path isEqual:query.path],
  397. @"Tagged update path and query path must match");
  398. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  399. NSAssert(syncPoint != nil,
  400. @"Missing sync point for query tag that we're tracking.");
  401. FView *view = [syncPoint viewForQuery:query];
  402. NSAssert(view != nil,
  403. @"Missing view for query tag that we're tracking");
  404. id<FNode> serverNode = [view serverCache];
  405. for (FRangeMerge *merge in ranges) {
  406. serverNode = [merge applyToNode:serverNode];
  407. }
  408. return [self applyTaggedQueryOverwriteAtPath:path
  409. newData:serverNode
  410. tagId:tagId];
  411. } else {
  412. // We've already removed the query. No big deal, ignore the update.
  413. return @[];
  414. }
  415. }
  416. /**
  417. * Add an event callback for the specified query
  418. * @return NSArray of FEvent to raise.
  419. */
  420. - (NSArray *)addEventRegistration:(id<FEventRegistration>)eventRegistration
  421. forQuery:(FQuerySpec *)query {
  422. FPath *path = query.path;
  423. __block BOOL foundAncestorDefaultView = NO;
  424. [self.syncPointTree
  425. forEachOnPath:query.path
  426. whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
  427. foundAncestorDefaultView =
  428. foundAncestorDefaultView || [syncPoint hasCompleteView];
  429. return !foundAncestorDefaultView;
  430. }];
  431. [self.persistenceManager setQueryActive:query];
  432. FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
  433. if (syncPoint == nil) {
  434. syncPoint = [[FSyncPoint alloc]
  435. initWithPersistenceManager:self.persistenceManager];
  436. self.syncPointTree = [self.syncPointTree setValue:syncPoint
  437. atPath:path];
  438. }
  439. BOOL viewAlreadyExists = [syncPoint viewExistsForQuery:query];
  440. NSArray *events;
  441. if (viewAlreadyExists) {
  442. events = [syncPoint addEventRegistration:eventRegistration
  443. forExistingViewForQuery:query];
  444. } else {
  445. if (![query loadsAllData]) {
  446. // We need to track a tag for this query
  447. NSAssert(self.queryToTagMap[query] == nil,
  448. @"View does not exist, but we have a tag");
  449. NSNumber *tagId = [self.queryTagCounter getAndIncrement];
  450. self.queryToTagMap[query] = tagId;
  451. self.tagToQueryMap[tagId] = query;
  452. }
  453. FWriteTreeRef *writesCache =
  454. [self.pendingWriteTree childWritesForPath:path];
  455. FCacheNode *serverCache = [self serverCacheForQuery:query];
  456. events = [syncPoint addEventRegistration:eventRegistration
  457. forNonExistingViewForQuery:query
  458. writesCache:writesCache
  459. serverCache:serverCache];
  460. // There was no view and no default listen
  461. if (!foundAncestorDefaultView) {
  462. FView *view = [syncPoint viewForQuery:query];
  463. NSMutableArray *mutableEvents = [events mutableCopy];
  464. [mutableEvents
  465. addObjectsFromArray:[self setupListenerOnQuery:query
  466. view:view]];
  467. events = mutableEvents;
  468. }
  469. }
  470. return events;
  471. }
  472. - (FCacheNode *)serverCacheForQuery:(FQuerySpec *)query {
  473. __block id<FNode> serverCacheNode = nil;
  474. [self.syncPointTree
  475. forEachOnPath:query.path
  476. whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
  477. FPath *relativePath = [FPath relativePathFrom:pathToSyncPoint
  478. to:query.path];
  479. serverCacheNode =
  480. [syncPoint completeServerCacheAtPath:relativePath];
  481. return serverCacheNode == nil;
  482. }];
  483. FCacheNode *serverCache;
  484. if (serverCacheNode != nil) {
  485. FIndexedNode *indexed =
  486. [FIndexedNode indexedNodeWithNode:serverCacheNode
  487. index:query.index];
  488. serverCache = [[FCacheNode alloc] initWithIndexedNode:indexed
  489. isFullyInitialized:YES
  490. isFiltered:NO];
  491. } else {
  492. FCacheNode *persistenceServerCache =
  493. [self.persistenceManager serverCacheForQuery:query];
  494. if (persistenceServerCache.isFullyInitialized) {
  495. serverCache = persistenceServerCache;
  496. } else {
  497. serverCacheNode = [FEmptyNode emptyNode];
  498. FImmutableTree *subtree =
  499. [self.syncPointTree subtreeAtPath:query.path];
  500. [subtree
  501. forEachChild:^(NSString *childKey, FSyncPoint *childSyncPoint) {
  502. id<FNode> completeCache =
  503. [childSyncPoint completeServerCacheAtPath:[FPath empty]];
  504. if (completeCache) {
  505. serverCacheNode =
  506. [serverCacheNode updateImmediateChild:childKey
  507. withNewChild:completeCache];
  508. }
  509. }];
  510. // Fill the node with any available children we have
  511. [persistenceServerCache.node
  512. enumerateChildrenUsingBlock:^(NSString *key, id<FNode> node,
  513. BOOL *stop) {
  514. if (![serverCacheNode hasChild:key]) {
  515. serverCacheNode =
  516. [serverCacheNode updateImmediateChild:key
  517. withNewChild:node];
  518. }
  519. }];
  520. FIndexedNode *indexed =
  521. [FIndexedNode indexedNodeWithNode:serverCacheNode
  522. index:query.index];
  523. serverCache = [[FCacheNode alloc] initWithIndexedNode:indexed
  524. isFullyInitialized:NO
  525. isFiltered:NO];
  526. }
  527. }
  528. return serverCache;
  529. }
  530. /**
  531. * Remove event callback(s).
  532. *
  533. * If query is the default query, we'll check all queries for the specified
  534. * eventRegistration. If eventRegistration is null, we'll remove all callbacks
  535. * for the specified query/queries.
  536. *
  537. * @param eventRegistration if nil, all callbacks are removed
  538. * @param cancelError If provided, appropriate cancel events will be returned
  539. * @return NSArray of FEvent to raise.
  540. */
  541. - (NSArray *)removeEventRegistration:(id<FEventRegistration>)eventRegistration
  542. forQuery:(FQuerySpec *)query
  543. cancelError:(NSError *)cancelError {
  544. // Find the syncPoint first. Then deal with whether or not it has matching
  545. // listeners
  546. FPath *path = query.path;
  547. FSyncPoint *maybeSyncPoint = [self.syncPointTree valueAtPath:path];
  548. NSArray *cancelEvents = @[];
  549. // A removal on a default query affects all queries at that location. A
  550. // removal on an indexed query, even one without other query constraints,
  551. // does *not* affect all queries at that location. So this check must be for
  552. // 'default', and not loadsAllData:
  553. if (maybeSyncPoint &&
  554. ([query isDefault] || [maybeSyncPoint viewExistsForQuery:query])) {
  555. FTupleRemovedQueriesEvents *removedAndEvents =
  556. [maybeSyncPoint removeEventRegistration:eventRegistration
  557. forQuery:query
  558. cancelError:cancelError];
  559. if ([maybeSyncPoint isEmpty]) {
  560. self.syncPointTree = [self.syncPointTree removeValueAtPath:path];
  561. }
  562. NSArray *removed = removedAndEvents.removedQueries;
  563. cancelEvents = removedAndEvents.cancelEvents;
  564. // We may have just removed one of many listeners and can short-circuit
  565. // this whole process We may also not have removed a default listener,
  566. // in which case all of the descendant listeners should already be
  567. // properly set up.
  568. //
  569. // Since indexed queries can shadow if they don't have other query
  570. // constraints, check for loadsAllData: instead of isDefault:
  571. NSUInteger defaultQueryIndex = [removed
  572. indexOfObjectPassingTest:^BOOL(FQuerySpec *q, NSUInteger idx,
  573. BOOL *stop) {
  574. return [q loadsAllData];
  575. }];
  576. BOOL removingDefault = defaultQueryIndex != NSNotFound;
  577. [removed enumerateObjectsUsingBlock:^(FQuerySpec *query, NSUInteger idx,
  578. BOOL *stop) {
  579. [self.persistenceManager setQueryInactive:query];
  580. }];
  581. NSNumber *covered = [self.syncPointTree
  582. findOnPath:path
  583. andApplyBlock:^id(FPath *relativePath,
  584. FSyncPoint *parentSyncPoint) {
  585. return
  586. [NSNumber numberWithBool:[parentSyncPoint hasCompleteView]];
  587. }];
  588. if (removingDefault && ![covered boolValue]) {
  589. FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
  590. // There are potentially child listeners. Determine what if any
  591. // listens we need to send before executing the removal
  592. if (![subtree isEmpty]) {
  593. // We need to fold over our subtree and collect the listeners to
  594. // send
  595. NSArray *newViews =
  596. [self collectDistinctViewsForSubTree:subtree];
  597. // Ok, we've collected all the listens we need. Set them up.
  598. [newViews enumerateObjectsUsingBlock:^(
  599. FView *view, NSUInteger idx, BOOL *stop) {
  600. FQuerySpec *newQuery = view.query;
  601. FListenContainer *listenContainer =
  602. [self createListenerForView:view];
  603. self.listenProvider.startListening(
  604. [self queryForListening:newQuery],
  605. [self tagForQuery:newQuery], listenContainer,
  606. listenContainer.onComplete);
  607. }];
  608. } else {
  609. // There's nothing below us, so nothing we need to start
  610. // listening on
  611. }
  612. }
  613. // If we removed anything and we're not covered by a higher up listen,
  614. // we need to stop listening on this query. The above block has us
  615. // covered in terms of making sure we're set up on listens lower in the
  616. // tree. Also, note that if we have a cancelError, it's already been
  617. // removed at the provider level.
  618. if (![covered boolValue] && [removed count] > 0 && cancelError == nil) {
  619. // If we removed a default, then we weren't listening on any of the
  620. // other queries here. Just cancel the one default. Otherwise, we
  621. // need to iterate through and cancel each individual query
  622. if (removingDefault) {
  623. // We don't tag default listeners
  624. self.listenProvider.stopListening(
  625. [self queryForListening:query], nil);
  626. } else {
  627. [removed
  628. enumerateObjectsUsingBlock:^(FQuerySpec *queryToRemove,
  629. NSUInteger idx, BOOL *stop) {
  630. NSNumber *tagToRemove =
  631. [self.queryToTagMap objectForKey:queryToRemove];
  632. self.listenProvider.stopListening(
  633. [self queryForListening:queryToRemove], tagToRemove);
  634. }];
  635. }
  636. }
  637. // Now, clear all the tags we're tracking for the removed listens.
  638. [self removeTags:removed];
  639. } else {
  640. // No-op, this listener must've been already removed
  641. }
  642. return cancelEvents;
  643. }
  644. - (void)keepQuery:(FQuerySpec *)query synced:(BOOL)keepSynced {
  645. // Only do something if we actually need to add/remove an event registration
  646. if (keepSynced && ![self.keepSyncedQueries containsObject:query]) {
  647. [self addEventRegistration:[FKeepSyncedEventRegistration instance]
  648. forQuery:query];
  649. [self.keepSyncedQueries addObject:query];
  650. } else if (!keepSynced && [self.keepSyncedQueries containsObject:query]) {
  651. [self removeEventRegistration:[FKeepSyncedEventRegistration instance]
  652. forQuery:query
  653. cancelError:nil];
  654. [self.keepSyncedQueries removeObject:query];
  655. }
  656. }
  657. - (NSArray *)removeAllWrites {
  658. [self.persistenceManager removeAllUserWrites];
  659. NSArray *removedWrites = [self.pendingWriteTree removeAllWrites];
  660. if (removedWrites.count > 0) {
  661. FImmutableTree *affectedTree =
  662. [[FImmutableTree empty] setValue:@YES atPath:[FPath empty]];
  663. return [self applyOperationToSyncPoints:[[FAckUserWrite alloc]
  664. initWithPath:[FPath empty]
  665. affectedTree:affectedTree
  666. revert:YES]];
  667. } else {
  668. return @[];
  669. }
  670. }
  671. /** Returns a non-empty cache node if one exists. Otherwise returns null. */
  672. - (FIndexedNode *)persistenceServerCache:(FQuerySpec *)querySpec {
  673. FCacheNode *cacheNode =
  674. [self.persistenceManager serverCacheForQuery:querySpec];
  675. if (cacheNode == nil || cacheNode.node.isEmpty) {
  676. return nil;
  677. }
  678. return cacheNode.node;
  679. }
  680. /**
  681. * Returns a complete cache, if we have one, of the data at a particular path.
  682. * The location must have a listener above it, but as this is only used by
  683. * transaction code, that should always be the case anyways.
  684. *
  685. * Note: this method will *include* hidden writes from transaction with
  686. * applyLocally set to false.
  687. * @param path The path to the data we want
  688. * @param writeIdsToExclude A specific set to be excluded
  689. */
  690. - (id<FNode>)calcCompleteEventCacheAtPath:(FPath *)path
  691. excludeWriteIds:(NSArray *)writeIdsToExclude {
  692. BOOL includeHiddenSets = YES;
  693. FWriteTree *writeTree = self.pendingWriteTree;
  694. id<FNode> serverCache = [self.syncPointTree
  695. findOnPath:path
  696. andApplyBlock:^id<FNode>(FPath *pathSoFar, FSyncPoint *syncPoint) {
  697. FPath *relativePath = [FPath relativePathFrom:pathSoFar to:path];
  698. id<FNode> serverCache =
  699. [syncPoint completeServerCacheAtPath:relativePath];
  700. if (serverCache) {
  701. return serverCache;
  702. } else {
  703. return nil;
  704. }
  705. }];
  706. return [writeTree calculateCompleteEventCacheAtPath:path
  707. completeServerCache:serverCache
  708. excludeWriteIds:writeIdsToExclude
  709. includeHiddenWrites:includeHiddenSets];
  710. }
  711. #pragma mark -
  712. #pragma mark Private Methods
  713. /**
  714. * This collapses multiple unfiltered views into a single view, since we only
  715. * need a single listener for them.
  716. * @return NSArray of FView
  717. */
  718. - (NSArray *)collectDistinctViewsForSubTree:(FImmutableTree *)subtree {
  719. return [subtree foldWithBlock:^NSArray *(FPath *relativePath,
  720. FSyncPoint *maybeChildSyncPoint,
  721. NSDictionary *childMap) {
  722. if (maybeChildSyncPoint && [maybeChildSyncPoint hasCompleteView]) {
  723. FView *completeView = [maybeChildSyncPoint completeView];
  724. return @[ completeView ];
  725. } else {
  726. // No complete view here, flatten any deeper listens into an array
  727. NSMutableArray *views = [[NSMutableArray alloc] init];
  728. if (maybeChildSyncPoint) {
  729. views = [[maybeChildSyncPoint queryViews] mutableCopy];
  730. }
  731. [childMap enumerateKeysAndObjectsUsingBlock:^(
  732. NSString *childKey, NSArray *childViews, BOOL *stop) {
  733. [views addObjectsFromArray:childViews];
  734. }];
  735. return views;
  736. }
  737. }];
  738. }
  739. /**
  740. * @param queries NSArray of FQuerySpec
  741. */
  742. - (void)removeTags:(NSArray *)queries {
  743. [queries enumerateObjectsUsingBlock:^(FQuerySpec *removedQuery,
  744. NSUInteger idx, BOOL *stop) {
  745. if (![removedQuery loadsAllData]) {
  746. // We should have a tag for this
  747. NSNumber *removedQueryTag = self.queryToTagMap[removedQuery];
  748. [self.queryToTagMap removeObjectForKey:removedQuery];
  749. [self.tagToQueryMap removeObjectForKey:removedQueryTag];
  750. }
  751. }];
  752. }
  753. - (FQuerySpec *)queryForListening:(FQuerySpec *)query {
  754. if (query.loadsAllData && !query.isDefault) {
  755. // We treat queries that load all data as default queries
  756. return [FQuerySpec defaultQueryAtPath:query.path];
  757. } else {
  758. return query;
  759. }
  760. }
  761. /**
  762. * For a given new listen, manage the de-duplication of outstanding
  763. * subscriptions.
  764. * @return NSArray of FEvent events to support synchronous data sources
  765. */
  766. - (NSArray *)setupListenerOnQuery:(FQuerySpec *)query view:(FView *)view {
  767. FPath *path = query.path;
  768. NSNumber *tagId = [self tagForQuery:query];
  769. FListenContainer *listenContainer = [self createListenerForView:view];
  770. NSArray *events = self.listenProvider.startListening(
  771. [self queryForListening:query], tagId, listenContainer,
  772. listenContainer.onComplete);
  773. FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
  774. // The root of this subtree has our query. We're here because we definitely
  775. // need to send a listen for that, but we may need to shadow other listens
  776. // as well.
  777. if (tagId != nil) {
  778. NSAssert(![subtree.value hasCompleteView],
  779. @"If we're adding a query, it shouldn't be shadowed");
  780. } else {
  781. // Shadow everything at or below this location, this is a default
  782. // listener.
  783. NSArray *queriesToStop =
  784. [subtree foldWithBlock:^id(FPath *relativePath,
  785. FSyncPoint *maybeChildSyncPoint,
  786. NSDictionary *childMap) {
  787. if (![relativePath isEmpty] && maybeChildSyncPoint != nil &&
  788. [maybeChildSyncPoint hasCompleteView]) {
  789. return @[ [maybeChildSyncPoint completeView].query ];
  790. } else {
  791. // No default listener here, flatten any deeper queries into
  792. // an array
  793. NSMutableArray *queries = [[NSMutableArray alloc] init];
  794. if (maybeChildSyncPoint != nil) {
  795. for (FView *view in [maybeChildSyncPoint queryViews]) {
  796. [queries addObject:view.query];
  797. }
  798. }
  799. [childMap
  800. enumerateKeysAndObjectsUsingBlock:^(
  801. NSString *key, NSArray *childQueries, BOOL *stop) {
  802. [queries addObjectsFromArray:childQueries];
  803. }];
  804. return queries;
  805. }
  806. }];
  807. for (FQuerySpec *queryToStop in queriesToStop) {
  808. self.listenProvider.stopListening(
  809. [self queryForListening:queryToStop],
  810. [self tagForQuery:queryToStop]);
  811. }
  812. }
  813. return events;
  814. }
  815. - (FListenContainer *)createListenerForView:(FView *)view {
  816. FQuerySpec *query = view.query;
  817. NSNumber *tagId = [self tagForQuery:query];
  818. FListenContainer *listenContainer = [[FListenContainer alloc]
  819. initWithView:view
  820. onComplete:^(NSString *status) {
  821. if ([status isEqualToString:@"ok"]) {
  822. if (tagId != nil) {
  823. return [self applyTaggedListenCompleteAtPath:query.path
  824. tagId:tagId];
  825. } else {
  826. return [self applyListenCompleteAtPath:query.path];
  827. }
  828. } else {
  829. // If a listen failed, kill all of the listeners here, not just
  830. // the one that triggered the error. Note that this may need to
  831. // be scoped to just this listener if we change permissions on
  832. // filtered children
  833. NSError *error = [FUtilities errorForStatus:status
  834. andReason:nil];
  835. FFWarn(@"I-RDB038012", @"Listener at %@ failed: %@", query.path,
  836. status);
  837. return [self removeEventRegistration:nil
  838. forQuery:query
  839. cancelError:error];
  840. }
  841. }];
  842. return listenContainer;
  843. }
  844. /**
  845. * @return The query associated with the given tag, if we have one
  846. */
  847. - (FQuerySpec *)queryForTag:(NSNumber *)tagId {
  848. return self.tagToQueryMap[tagId];
  849. }
  850. /**
  851. * @return The tag associated with the given query
  852. */
  853. - (NSNumber *)tagForQuery:(FQuerySpec *)query {
  854. return self.queryToTagMap[query];
  855. }
  856. #pragma mark -
  857. #pragma mark applyOperation Helpers
  858. /**
  859. * A helper method that visits all descendant and ancestor SyncPoints, applying
  860. the operation.
  861. *
  862. * NOTES:
  863. * - Descendant SyncPoints will be visited first (since we raise events
  864. depth-first).
  865. * - We call applyOperation: on each SyncPoint passing three things:
  866. * 1. A version of the Operation that has been made relative to the SyncPoint
  867. location.
  868. * 2. A WriteTreeRef of any writes we have cached at the SyncPoint location.
  869. * 3. A snapshot Node with cached server data, if we have it.
  870. * - We concatenate all of the events returned by each SyncPoint and return the
  871. result.
  872. *
  873. * @return Array of FEvent
  874. */
  875. - (NSArray *)applyOperationToSyncPoints:(id<FOperation>)operation {
  876. return [self applyOperationHelper:operation
  877. syncPointTree:self.syncPointTree
  878. serverCache:nil
  879. writesCache:[self.pendingWriteTree
  880. childWritesForPath:[FPath empty]]];
  881. }
  882. /**
  883. * Recursive helper for applyOperationToSyncPoints_
  884. */
  885. - (NSArray *)applyOperationHelper:(id<FOperation>)operation
  886. syncPointTree:(FImmutableTree *)syncPointTree
  887. serverCache:(id<FNode>)serverCache
  888. writesCache:(FWriteTreeRef *)writesCache {
  889. if ([operation.path isEmpty]) {
  890. return [self applyOperationDescendantsHelper:operation
  891. syncPointTree:syncPointTree
  892. serverCache:serverCache
  893. writesCache:writesCache];
  894. } else {
  895. FSyncPoint *syncPoint = syncPointTree.value;
  896. // If we don't have cached server data, see if we can get it from this
  897. // SyncPoint
  898. if (serverCache == nil && syncPoint != nil) {
  899. serverCache = [syncPoint completeServerCacheAtPath:[FPath empty]];
  900. }
  901. NSMutableArray *events = [[NSMutableArray alloc] init];
  902. NSString *childKey = [operation.path getFront];
  903. id<FOperation> childOperation = [operation operationForChild:childKey];
  904. FImmutableTree *childTree = [syncPointTree.children get:childKey];
  905. if (childTree != nil && childOperation != nil) {
  906. id<FNode> childServerCache =
  907. serverCache ? [serverCache getImmediateChild:childKey] : nil;
  908. FWriteTreeRef *childWritesCache =
  909. [writesCache childWriteTreeRef:childKey];
  910. [events
  911. addObjectsFromArray:[self
  912. applyOperationHelper:childOperation
  913. syncPointTree:childTree
  914. serverCache:childServerCache
  915. writesCache:childWritesCache]];
  916. }
  917. if (syncPoint) {
  918. [events addObjectsFromArray:[syncPoint applyOperation:operation
  919. writesCache:writesCache
  920. serverCache:serverCache]];
  921. }
  922. return events;
  923. }
  924. }
  925. /**
  926. * Recursive helper for applyOperationToSyncPoints:
  927. */
  928. - (NSArray *)applyOperationDescendantsHelper:(id<FOperation>)operation
  929. syncPointTree:(FImmutableTree *)syncPointTree
  930. serverCache:(id<FNode>)serverCache
  931. writesCache:(FWriteTreeRef *)writesCache {
  932. FSyncPoint *syncPoint = syncPointTree.value;
  933. // If we don't have cached server data, see if we can get it from this
  934. // SyncPoint
  935. id<FNode> resolvedServerCache;
  936. if (serverCache == nil & syncPoint != nil) {
  937. resolvedServerCache =
  938. [syncPoint completeServerCacheAtPath:[FPath empty]];
  939. } else {
  940. resolvedServerCache = serverCache;
  941. }
  942. NSMutableArray *events = [[NSMutableArray alloc] init];
  943. [syncPointTree.children enumerateKeysAndObjectsUsingBlock:^(
  944. NSString *childKey, FImmutableTree *childTree,
  945. BOOL *stop) {
  946. id<FNode> childServerCache = nil;
  947. if (resolvedServerCache != nil) {
  948. childServerCache = [resolvedServerCache getImmediateChild:childKey];
  949. }
  950. FWriteTreeRef *childWritesCache =
  951. [writesCache childWriteTreeRef:childKey];
  952. id<FOperation> childOperation = [operation operationForChild:childKey];
  953. if (childOperation != nil) {
  954. [events addObjectsFromArray:
  955. [self applyOperationDescendantsHelper:childOperation
  956. syncPointTree:childTree
  957. serverCache:childServerCache
  958. writesCache:childWritesCache]];
  959. }
  960. }];
  961. if (syncPoint) {
  962. [events
  963. addObjectsFromArray:[syncPoint applyOperation:operation
  964. writesCache:writesCache
  965. serverCache:resolvedServerCache]];
  966. }
  967. return events;
  968. }
  969. @end