| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093 |
- /*
- * Copyright 2017 Google
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #import "FirebaseDatabase/Sources/Core/FSyncTree.h"
- #import "FirebaseCore/Sources/Private/FirebaseCoreInternal.h"
- #import "FirebaseDatabase/Sources/Core/FCompoundHash.h"
- #import "FirebaseDatabase/Sources/Core/FListenProvider.h"
- #import "FirebaseDatabase/Sources/Core/FQueryParams.h"
- #import "FirebaseDatabase/Sources/Core/FQuerySpec.h"
- #import "FirebaseDatabase/Sources/Core/FRangeMerge.h"
- #import "FirebaseDatabase/Sources/Core/FServerValues.h"
- #import "FirebaseDatabase/Sources/Core/FSnapshotHolder.h"
- #import "FirebaseDatabase/Sources/Core/FSyncPoint.h"
- #import "FirebaseDatabase/Sources/Core/FWriteRecord.h"
- #import "FirebaseDatabase/Sources/Core/FWriteTree.h"
- #import "FirebaseDatabase/Sources/Core/FWriteTreeRef.h"
- #import "FirebaseDatabase/Sources/Core/Operation/FAckUserWrite.h"
- #import "FirebaseDatabase/Sources/Core/Operation/FMerge.h"
- #import "FirebaseDatabase/Sources/Core/Operation/FOperation.h"
- #import "FirebaseDatabase/Sources/Core/Operation/FOperationSource.h"
- #import "FirebaseDatabase/Sources/Core/Operation/FOverwrite.h"
- #import "FirebaseDatabase/Sources/Core/Utilities/FImmutableTree.h"
- #import "FirebaseDatabase/Sources/Core/Utilities/FPath.h"
- #import "FirebaseDatabase/Sources/Core/View/FCacheNode.h"
- #import "FirebaseDatabase/Sources/Core/View/FEventRaiser.h"
- #import "FirebaseDatabase/Sources/Core/View/FEventRegistration.h"
- #import "FirebaseDatabase/Sources/Core/View/FKeepSyncedEventRegistration.h"
- #import "FirebaseDatabase/Sources/Core/View/FView.h"
- #import "FirebaseDatabase/Sources/FListenComplete.h"
- #import "FirebaseDatabase/Sources/Persistence/FPersistenceManager.h"
- #import "FirebaseDatabase/Sources/Snapshot/FChildrenNode.h"
- #import "FirebaseDatabase/Sources/Snapshot/FCompoundWrite.h"
- #import "FirebaseDatabase/Sources/Snapshot/FEmptyNode.h"
- #import "FirebaseDatabase/Sources/Snapshot/FNode.h"
- #import "FirebaseDatabase/Sources/Snapshot/FSnapshotUtilities.h"
- #import "FirebaseDatabase/Sources/Utilities/FAtomicNumber.h"
- #import "FirebaseDatabase/Sources/Utilities/FUtilities.h"
- #import "FirebaseDatabase/Sources/Utilities/Tuples/FTupleRemovedQueriesEvents.h"
// Estimated serialized size of the server cache above which
// -includeCompoundHash answers YES.
static const NSUInteger kFSizeThresholdForCompoundHash = 1024;

/**
 * Bundles a view with the completion callback handed to the listen provider,
 * and answers the FSyncTreeHash questions from the view's server cache.
 */
@interface FListenContainer : NSObject <FSyncTreeHash>

/** The view whose server cache is hashed. */
@property(nonatomic, strong) FView *view;

/** Callback passed along to the listen provider together with this object. */
@property(nonatomic, copy) fbt_nsarray_nsstring onComplete;

@end

@implementation FListenContainer

/**
 * @param view The view whose server cache backs all hash computations.
 * @param onComplete Completion callback stored for the listen.
 */
- (instancetype)initWithView:(FView *)view
                  onComplete:(fbt_nsarray_nsstring)onComplete {
    self = [super init];
    if (self == nil) {
        return nil;
    }
    _view = view;
    _onComplete = onComplete;
    return self;
}

/** The server cache of the wrapped view. */
- (id<FNode>)serverCache {
    return [[self view] serverCache];
}

/** Compound hash built from the current server cache. */
- (FCompoundHash *)compoundHash {
    id<FNode> cache = [self serverCache];
    return [FCompoundHash fromNode:cache];
}

/** Data hash of the current server cache. */
- (NSString *)simpleHash {
    id<FNode> cache = [self serverCache];
    return [cache dataHash];
}

/**
 * YES once the estimated serialized size of the server cache exceeds
 * kFSizeThresholdForCompoundHash.
 */
- (BOOL)includeCompoundHash {
    return kFSizeThresholdForCompoundHash <
           [FSnapshotUtilities estimateSerializedNodeSize:[self serverCache]];
}

@end
@interface FSyncTree ()

/**
 * Tree of FSyncPoint objects. A sync point is stored at every location that
 * has one or more views.
 */
@property(nonatomic, strong) FImmutableTree *syncPointTree;

/**
 * All pending user writes (user-initiated sets, transactions, updates, etc.).
 */
@property(nonatomic, strong) FWriteTree *pendingWriteTree;

/**
 * Maps tag id (NSNumber) -> FQuerySpec. Entries are added in
 * addEventRegistration:forQuery: for queries that do not load all data.
 */
@property(nonatomic, strong) NSMutableDictionary *tagToQueryMap;

/**
 * Maps FQuerySpec -> tag id (NSNumber); the inverse of the tag -> query map.
 */
@property(nonatomic, strong) NSMutableDictionary *queryToTagMap;

/** Provides the startListening / stopListening blocks for server listens. */
@property(nonatomic, strong) FListenProvider *listenProvider;

/**
 * Persistence layer. May be nil: initWithListenProvider: passes nil for it.
 */
@property(nonatomic, strong) FPersistenceManager *persistenceManager;

/** Hands out fresh tag ids via getAndIncrement. */
@property(nonatomic, strong) FAtomicNumber *queryTagCounter;

/** Queries currently kept in sync through keepQuery:synced:. */
@property(nonatomic, strong) NSMutableSet *keepSyncedQueries;

@end
- /**
- * SyncTree is the central class for managing event callback registration, data
- * caching, views (query processing), and event generation. There are typically
- * two SyncTree instances for each Repo, one for the normal Firebase data, and
- * one for the .info data.
- *
- * It has a number of responsibilities, including:
- * - Tracking all user event callbacks (registered via addEventRegistration:
- * and removeEventRegistration:).
- * - Applying and caching data changes for user setValue:,
- * runTransactionBlock:, and updateChildValues: calls
- * (applyUserOverwriteAtPath:, applyUserMergeAtPath:).
- * - Applying and caching data changes for server data changes
- * (applyServerOverwriteAtPath:, applyServerMergeAtPath:).
- * - Generating user-facing events for server and user changes (all of the
- * apply* methods return the set of events that need to be raised as a result).
- * - Maintaining the appropriate set of server listens to ensure we are always
- * subscribed to the correct set of paths and queries to satisfy the current set
- * of user event callbacks (listens are started/stopped using the provided
- * listenProvider).
- *
- * NOTE: Although SyncTree tracks event callbacks and calculates events to
- * raise, the actual events are returned to the caller rather than raised
- * synchronously.
- */
- @implementation FSyncTree
/**
 * Convenience initializer for a sync tree without a persistence manager.
 * @param provider The listen provider used to start and stop server listens.
 */
- (id)initWithListenProvider:(FListenProvider *)provider {
    FPersistenceManager *noPersistence = nil;
    return [self initWithPersistenceManager:noPersistence
                             listenProvider:provider];
}
/**
 * Creates an empty sync tree: no sync points, no pending writes, no tags and
 * no kept-synced queries.
 * @param persistenceManager Persistence layer to mirror changes into; may be
 * nil.
 * @param provider The listen provider used to start and stop server listens.
 */
- (id)initWithPersistenceManager:(FPersistenceManager *)persistenceManager
                  listenProvider:(FListenProvider *)provider {
    self = [super init];
    if (!self) {
        return self;
    }

    // Empty state.
    self.syncPointTree = [FImmutableTree empty];
    self.pendingWriteTree = [[FWriteTree alloc] init];
    self.tagToQueryMap = [[NSMutableDictionary alloc] init];
    self.queryToTagMap = [[NSMutableDictionary alloc] init];

    // Collaborators supplied by the caller.
    self.listenProvider = provider;
    self.persistenceManager = persistenceManager;

    self.queryTagCounter = [[FAtomicNumber alloc] init];
    self.keepSyncedQueries = [NSMutableSet set];
    return self;
}
- #pragma mark -
- #pragma mark Apply Operations
/**
 * Apply data changes for a user-generated setValue:, runTransactionBlock:,
 * updateChildValues:, etc.
 *
 * The write is always recorded in the pending write tree; events are only
 * computed when the write is visible.
 *
 * @param path Location being overwritten.
 * @param newData The new data for that location.
 * @param writeId Id under which the pending write is recorded.
 * @param visible Whether the write should be applied to the sync points.
 * @return NSArray of FEvent to raise (empty for an invisible write).
 */
- (NSArray *)applyUserOverwriteAtPath:(FPath *)path
                              newData:(id<FNode>)newData
                              writeId:(NSInteger)writeId
                            isVisible:(BOOL)visible {
    [self.pendingWriteTree addOverwriteAtPath:path
                                      newData:newData
                                      writeId:writeId
                                    isVisible:visible];

    // Invisible writes are tracked but generate no events.
    if (!visible) {
        return @[];
    }

    FOverwrite *userOverwrite =
        [[FOverwrite alloc] initWithSource:[FOperationSource userInstance]
                                      path:path
                                      snap:newData];
    return [self applyOperationToSyncPoints:userOverwrite];
}
/**
 * Apply the data from a user-generated updateChildValues: call.
 *
 * @param path Location the merge is rooted at.
 * @param changedChildren The children being written.
 * @param writeId Id under which the pending merge is recorded.
 * @return NSArray of FEvent to raise.
 */
- (NSArray *)applyUserMergeAtPath:(FPath *)path
                  changedChildren:(FCompoundWrite *)changedChildren
                          writeId:(NSInteger)writeId {
    // Track the merge as pending before generating events for it.
    [self.pendingWriteTree addMergeAtPath:path
                          changedChildren:changedChildren
                                  writeId:writeId];

    FMerge *userMerge =
        [[FMerge alloc] initWithSource:[FOperationSource userInstance]
                                  path:path
                              children:changedChildren];
    return [self applyOperationToSyncPoints:userMerge];
}
/**
 * Acknowledge a pending user write that was previously registered with
 * applyUserOverwriteAtPath: or applyUserMergeAtPath:
 * TODO[offline]: Taking a serverClock here is awkward, but server values are
 * awkward. :-(
 *
 * @param writeId Id of the pending write being acknowledged.
 * @param revert YES to revert the write rather than commit it.
 * @param persist YES to also remove the write from the persistence manager.
 * @param clock Clock used to generate server values for deferred values.
 * @return NSArray of FEvent to raise.
 */
- (NSArray *)ackUserWriteWithWriteId:(NSInteger)writeId
                              revert:(BOOL)revert
                             persist:(BOOL)persist
                               clock:(id<FClock>)clock {
    // Look the record up before removing it from the pending tree.
    FWriteRecord *record = [self.pendingWriteTree writeForId:writeId];
    BOOL mustReevaluate = [self.pendingWriteTree removeWriteId:writeId];

    if (record.visible) {
        if (persist) {
            [self.persistenceManager removeUserWrite:writeId];
        }
        if (!revert) {
            // A committed write is resolved against server values and folded
            // into the persisted server cache.
            NSDictionary *serverValues =
                [FServerValues generateServerValues:clock];
            if ([record isOverwrite]) {
                id<FNode> resolvedSnapshot = [FServerValues
                    resolveDeferredValueSnapshot:record.overwrite
                                    withSyncTree:self
                                          atPath:record.path
                                    serverValues:serverValues];
                [self.persistenceManager applyUserWrite:resolvedSnapshot
                                    toServerCacheAtPath:record.path];
            } else {
                FCompoundWrite *resolvedChildren = [FServerValues
                    resolveDeferredValueCompoundWrite:record.merge
                                         withSyncTree:self
                                               atPath:record.path
                                         serverValues:serverValues];
                [self.persistenceManager applyUserMerge:resolvedChildren
                                    toServerCacheAtPath:record.path];
            }
        }
    }

    if (!mustReevaluate) {
        return @[];
    }

    // Mark the locations touched by the write: the write's own root for an
    // overwrite, or each written path for a merge.
    __block FImmutableTree *touched = [FImmutableTree empty];
    if (record.isOverwrite) {
        touched = [touched setValue:@YES atPath:[FPath empty]];
    } else {
        [record.merge enumerateWrites:^(FPath *writtenPath, id<FNode> node,
                                        BOOL *stop) {
          touched = [touched setValue:@YES atPath:writtenPath];
        }];
    }

    FAckUserWrite *ack = [[FAckUserWrite alloc] initWithPath:record.path
                                                affectedTree:touched
                                                      revert:revert];
    return [self applyOperationToSyncPoints:ack];
}
/**
 * Apply new server data for the specified path.
 *
 * @param path Location the server data is for.
 * @param newData The new server data.
 * @return NSArray of FEvent to raise.
 */
- (NSArray *)applyServerOverwriteAtPath:(FPath *)path
                                newData:(id<FNode>)newData {
    // Mirror the data into the persisted server cache under the default query
    // for this path.
    FQuerySpec *defaultQuery = [FQuerySpec defaultQueryAtPath:path];
    [self.persistenceManager updateServerCacheWithNode:newData
                                              forQuery:defaultQuery];

    FOverwrite *serverOverwrite =
        [[FOverwrite alloc] initWithSource:[FOperationSource serverInstance]
                                      path:path
                                      snap:newData];
    return [self applyOperationToSyncPoints:serverOverwrite];
}
/**
 * Apply new server data to be merged in at the specified path.
 *
 * @param path Location the merge is rooted at.
 * @param changedChildren The children changed by the server.
 * @return NSArray of FEvent to raise.
 */
- (NSArray *)applyServerMergeAtPath:(FPath *)path
                    changedChildren:(FCompoundWrite *)changedChildren {
    // Persist first, then compute the events for the in-memory views.
    [self.persistenceManager updateServerCacheWithMerge:changedChildren
                                                 atPath:path];

    FMerge *serverMerge =
        [[FMerge alloc] initWithSource:[FOperationSource serverInstance]
                                  path:path
                              children:changedChildren];
    return [self applyOperationToSyncPoints:serverMerge];
}
/**
 * Apply a list of range merges from the server to the data at a path. The
 * merges are applied in order to the server cache of a complete view at that
 * path, and the result is applied as a server overwrite.
 *
 * @param path Location the range merges apply to.
 * @param ranges NSArray of FRangeMerge.
 * @return NSArray of FEvent to raise; empty when there is no sync point or no
 * complete view at the path.
 */
- (NSArray *)applyServerRangeMergeAtPath:(FPath *)path
                                 updates:(NSArray *)ranges {
    FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
    if (syncPoint == nil) {
        // Removed view, so it's safe to just ignore this update
        return @[];
    }

    // This could be for any "complete" (unfiltered) view, and if there is
    // more than one complete view, they should each have the same cache so
    // it doesn't matter which one we use.
    FView *completeView = [syncPoint completeView];
    if (completeView == nil) {
        // There doesn't exist a view for this update, so it was removed and
        // it's safe to just ignore this range merge
        return @[];
    }

    id<FNode> mergedNode = [completeView serverCache];
    for (FRangeMerge *rangeMerge in ranges) {
        mergedNode = [rangeMerge applyToNode:mergedNode];
    }
    return [self applyServerOverwriteAtPath:path newData:mergedNode];
}
/**
 * Apply a listen complete to a path.
 *
 * @param path Location whose default listen completed.
 * @return NSArray of FEvent to raise.
 */
- (NSArray *)applyListenCompleteAtPath:(FPath *)path {
    // Record the default query at this path as complete in persistence.
    FQuerySpec *defaultQuery = [FQuerySpec defaultQueryAtPath:path];
    [self.persistenceManager setQueryComplete:defaultQuery];

    id<FOperation> listenComplete = [[FListenComplete alloc]
        initWithSource:[FOperationSource serverInstance]
                  path:path];
    return [self applyOperationToSyncPoints:listenComplete];
}
/**
 * Apply a listen complete for a tagged query.
 *
 * @param path Location the listen complete was reported for.
 * @param tagId Tag identifying the query.
 * @return NSArray of FEvent to raise; empty when the tag is no longer tracked.
 */
- (NSArray *)applyTaggedListenCompleteAtPath:(FPath *)path
                                       tagId:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = [self queryForTag:tagId];
    if (taggedQuery == nil) {
        // We've already removed the query. No big deal, ignore the update.
        return @[];
    }

    [self.persistenceManager setQueryComplete:taggedQuery];

    // The operation is expressed relative to the query's own path.
    FPath *pathFromQuery = [FPath relativePathFrom:taggedQuery.path to:path];
    id<FOperation> listenComplete = [[FListenComplete alloc]
        initWithSource:[FOperationSource
                           forServerTaggedQuery:taggedQuery.params]
                  path:pathFromQuery];
    return [self applyTaggedOperation:listenComplete atPath:taggedQuery.path];
}
/**
 * Internal helper that applies a tagged operation to the sync point stored at
 * the given path. A sync point is expected to exist there (asserted).
 *
 * @param operation The operation to apply.
 * @param path Path of the tagged query's sync point.
 * @return The result of applying the operation to that sync point.
 */
- (NSArray *)applyTaggedOperation:(id<FOperation>)operation
                           atPath:(FPath *)path {
    FSyncPoint *taggedSyncPoint = [self.syncPointTree valueAtPath:path];
    NSAssert(taggedSyncPoint != nil,
             @"Missing sync point for query tag that we're tracking.");

    FWriteTreeRef *pendingWrites =
        [self.pendingWriteTree childWritesForPath:path];
    // No server cache is supplied here (nil).
    return [taggedSyncPoint applyOperation:operation
                               writesCache:pendingWrites
                               serverCache:nil];
}
/**
 * Apply new server data for the specified tagged query.
 *
 * @param path Location the data is for.
 * @param newData The new server data.
 * @param tagId Tag identifying the query.
 * @return NSArray of FEvent to raise; empty when the tag is no longer tracked.
 */
- (NSArray *)applyTaggedQueryOverwriteAtPath:(FPath *)path
                                     newData:(id<FNode>)newData
                                       tagId:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = [self queryForTag:tagId];
    if (taggedQuery == nil) {
        // Query must have been removed already
        return @[];
    }

    FPath *pathFromQuery = [FPath relativePathFrom:taggedQuery.path to:path];

    // Data at the query's own path is persisted under the query itself; data
    // at any other path is persisted under the default query for that path.
    FQuerySpec *queryToPersist;
    if (pathFromQuery.isEmpty) {
        queryToPersist = taggedQuery;
    } else {
        queryToPersist = [FQuerySpec defaultQueryAtPath:path];
    }
    [self.persistenceManager updateServerCacheWithNode:newData
                                              forQuery:queryToPersist];

    FOverwrite *taggedOverwrite = [[FOverwrite alloc]
        initWithSource:[FOperationSource
                           forServerTaggedQuery:taggedQuery.params]
                  path:pathFromQuery
                  snap:newData];
    return [self applyTaggedOperation:taggedOverwrite
                               atPath:taggedQuery.path];
}
/**
 * Apply server data to be merged in for the specified tagged query.
 *
 * @param path Location the merge is rooted at.
 * @param changedChildren The children changed by the server.
 * @param tagId Tag identifying the query.
 * @return NSArray of FEvent to raise; empty when the tag is no longer tracked.
 */
- (NSArray *)applyTaggedQueryMergeAtPath:(FPath *)path
                         changedChildren:(FCompoundWrite *)changedChildren
                                   tagId:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = [self queryForTag:tagId];
    if (taggedQuery == nil) {
        // We've already removed the query. No big deal, ignore the update.
        return @[];
    }

    FPath *pathFromQuery = [FPath relativePathFrom:taggedQuery.path to:path];
    // Persistence is updated at the absolute path; the operation itself is
    // relative to the query's path.
    [self.persistenceManager updateServerCacheWithMerge:changedChildren
                                                 atPath:path];

    FMerge *taggedMerge = [[FMerge alloc]
        initWithSource:[FOperationSource
                           forServerTaggedQuery:taggedQuery.params]
                  path:pathFromQuery
              children:changedChildren];
    return [self applyTaggedOperation:taggedMerge atPath:taggedQuery.path];
}
/**
 * Apply a list of range merges from the server for a tagged query. The merges
 * are applied in order to the server cache of the query's view, and the result
 * is applied as a tagged overwrite.
 *
 * @param path Location of the update; asserted to equal the query's path.
 * @param ranges NSArray of FRangeMerge.
 * @param tagId Tag identifying the query.
 * @return NSArray of FEvent to raise; empty when the tag is no longer tracked.
 */
- (NSArray *)applyTaggedServerRangeMergeAtPath:(FPath *)path
                                       updates:(NSArray *)ranges
                                         tagId:(NSNumber *)tagId {
    FQuerySpec *taggedQuery = [self queryForTag:tagId];
    if (taggedQuery == nil) {
        // We've already removed the query. No big deal, ignore the update.
        return @[];
    }

    NSAssert([path isEqual:taggedQuery.path],
             @"Tagged update path and query path must match");
    FSyncPoint *taggedSyncPoint = [self.syncPointTree valueAtPath:path];
    NSAssert(taggedSyncPoint != nil,
             @"Missing sync point for query tag that we're tracking.");
    FView *taggedView = [taggedSyncPoint viewForQuery:taggedQuery];
    NSAssert(taggedView != nil,
             @"Missing view for query tag that we're tracking");

    id<FNode> mergedNode = [taggedView serverCache];
    for (FRangeMerge *rangeMerge in ranges) {
        mergedNode = [rangeMerge applyToNode:mergedNode];
    }
    return [self applyTaggedQueryOverwriteAtPath:path
                                         newData:mergedNode
                                           tagId:tagId];
}
/**
 * Add an event callback for the specified query.
 *
 * Creates the sync point at the query's path if needed, assigns a tag to a new
 * query that does not load all data, and starts a server listen for a new view
 * unless a sync point on the path already has a complete view.
 *
 * @param eventRegistration The callback registration to add.
 * @param query The query to register it for.
 * @return NSArray of FEvent to raise.
 */
- (NSArray *)addEventRegistration:(id<FEventRegistration>)eventRegistration
                         forQuery:(FQuerySpec *)query {
    FPath *path = query.path;

    // Walk the sync points along the path until one with a complete view is
    // found.
    __block BOOL coveredByCompleteView = NO;
    [self.syncPointTree
        forEachOnPath:query.path
           whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *candidate) {
             if (!coveredByCompleteView) {
                 coveredByCompleteView = [candidate hasCompleteView];
             }
             return !coveredByCompleteView;
           }];

    [self.persistenceManager setQueryActive:query];

    // Find the sync point for this location, creating it on first use.
    FSyncPoint *syncPoint = [self.syncPointTree valueAtPath:path];
    if (syncPoint == nil) {
        syncPoint = [[FSyncPoint alloc]
            initWithPersistenceManager:self.persistenceManager];
        self.syncPointTree = [self.syncPointTree setValue:syncPoint
                                                   atPath:path];
    }

    if ([syncPoint viewExistsForQuery:query]) {
        return [syncPoint addEventRegistration:eventRegistration
                       forExistingViewForQuery:query];
    }

    if (![query loadsAllData]) {
        // We need to track a tag for this query
        NSAssert(self.queryToTagMap[query] == nil,
                 @"View does not exist, but we have a tag");
        NSNumber *newTag = [self.queryTagCounter getAndIncrement];
        self.queryToTagMap[query] = newTag;
        self.tagToQueryMap[newTag] = query;
    }

    FWriteTreeRef *pendingWrites =
        [self.pendingWriteTree childWritesForPath:path];
    FCacheNode *initialServerCache = [self serverCacheForQuery:query];
    NSArray *events = [syncPoint addEventRegistration:eventRegistration
                           forNonExistingViewForQuery:query
                                          writesCache:pendingWrites
                                          serverCache:initialServerCache];

    // There was no view and no default listen
    if (!coveredByCompleteView) {
        FView *newView = [syncPoint viewForQuery:query];
        NSMutableArray *allEvents = [events mutableCopy];
        [allEvents addObjectsFromArray:[self setupListenerOnQuery:query
                                                             view:newView]];
        events = allEvents;
    }
    return events;
}
/**
 * Builds the initial server cache for a query. Sources, in order:
 *  1. a complete server cache from a sync point on the query's path, marked
 *     fully initialized;
 *  2. the persisted server cache, if it is fully initialized;
 *  3. otherwise a node assembled from the complete caches of child sync points
 *     plus any persisted children not already present, marked not fully
 *     initialized.
 * The returned cache is never marked filtered by this method in cases 1 and 3.
 *
 * @param query The query to build a server cache for.
 */
- (FCacheNode *)serverCacheForQuery:(FQuerySpec *)query {
    __block id<FNode> completeNode = nil;
    [self.syncPointTree
        forEachOnPath:query.path
           whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
             FPath *pathFromSyncPoint =
                 [FPath relativePathFrom:pathToSyncPoint to:query.path];
             completeNode =
                 [syncPoint completeServerCacheAtPath:pathFromSyncPoint];
             // Keep walking while nothing complete has been found.
             return completeNode == nil;
           }];

    if (completeNode != nil) {
        FIndexedNode *indexedComplete =
            [FIndexedNode indexedNodeWithNode:completeNode index:query.index];
        return [[FCacheNode alloc] initWithIndexedNode:indexedComplete
                                    isFullyInitialized:YES
                                            isFiltered:NO];
    }

    FCacheNode *persisted = [self.persistenceManager serverCacheForQuery:query];
    if (persisted.isFullyInitialized) {
        return persisted;
    }

    // Assemble a partial node, starting with complete caches of the immediate
    // child sync points.
    __block id<FNode> partialNode = [FEmptyNode emptyNode];
    FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:query.path];
    [subtree forEachChild:^(NSString *childKey, FSyncPoint *childSyncPoint) {
      id<FNode> childCache =
          [childSyncPoint completeServerCacheAtPath:[FPath empty]];
      if (childCache) {
          partialNode = [partialNode updateImmediateChild:childKey
                                             withNewChild:childCache];
      }
    }];

    // Fill the node with any available children we have
    [persisted.node enumerateChildrenUsingBlock:^(
                        NSString *key, id<FNode> persistedChild, BOOL *stop) {
      if (![partialNode hasChild:key]) {
          partialNode = [partialNode updateImmediateChild:key
                                             withNewChild:persistedChild];
      }
    }];

    FIndexedNode *indexedPartial =
        [FIndexedNode indexedNodeWithNode:partialNode index:query.index];
    return [[FCacheNode alloc] initWithIndexedNode:indexedPartial
                                isFullyInitialized:NO
                                        isFiltered:NO];
}
/**
 * Remove event callback(s).
 *
 * If query is the default query, we'll check all queries for the specified
 * eventRegistration. If eventRegistration is nil, we'll remove all callbacks
 * for the specified query/queries.
 *
 * Besides removing the registration(s), this marks the removed queries
 * inactive in persistence, starts listens for views below the path that are
 * no longer shadowed by a removed default listener, stops the listens that are
 * no longer needed, and drops the tags of the removed queries.
 *
 * @param eventRegistration if nil, all callbacks are removed
 * @param query The query whose callback(s) should be removed.
 * @param cancelError If provided, appropriate cancel events will be returned
 * @return NSArray of FEvent to raise.
 */
- (NSArray *)removeEventRegistration:(id<FEventRegistration>)eventRegistration
                            forQuery:(FQuerySpec *)query
                         cancelError:(NSError *)cancelError {
    // Find the syncPoint first. Then deal with whether or not it has matching
    // listeners
    FPath *path = query.path;
    FSyncPoint *maybeSyncPoint = [self.syncPointTree valueAtPath:path];

    // A removal on a default query affects all queries at that location. A
    // removal on an indexed query, even one without other query constraints,
    // does *not* affect all queries at that location. So this check must be for
    // 'default', and not loadsAllData:
    BOOL hasMatchingListeners =
        maybeSyncPoint != nil &&
        ([query isDefault] || [maybeSyncPoint viewExistsForQuery:query]);
    if (!hasMatchingListeners) {
        // No-op, this listener must've been already removed
        return @[];
    }

    FTupleRemovedQueriesEvents *removedAndEvents =
        [maybeSyncPoint removeEventRegistration:eventRegistration
                                       forQuery:query
                                    cancelError:cancelError];
    if ([maybeSyncPoint isEmpty]) {
        self.syncPointTree = [self.syncPointTree removeValueAtPath:path];
    }
    NSArray *removed = removedAndEvents.removedQueries;
    NSArray *cancelEvents = removedAndEvents.cancelEvents;

    // We may have just removed one of many listeners and can short-circuit
    // this whole process. We may also not have removed a default listener,
    // in which case all of the descendant listeners should already be
    // properly set up.
    //
    // Since indexed queries can shadow if they don't have other query
    // constraints, check for loadsAllData: instead of isDefault:
    NSUInteger defaultQueryIndex =
        [removed indexOfObjectPassingTest:^BOOL(FQuerySpec *q, NSUInteger idx,
                                                BOOL *stop) {
          return [q loadsAllData];
        }];
    BOOL removingDefault = defaultQueryIndex != NSNotFound;

    for (FQuerySpec *removedQuery in removed) {
        [self.persistenceManager setQueryInactive:removedQuery];
    }

    // Are we still covered by a complete view at or above this path?
    //
    // The block answers nil (not @NO) for a sync point without a complete
    // view. Returning a non-nil @NO would make the lookup stop at the first
    // sync point on the path and miss a complete view further along it.
    // NOTE(review): this assumes findOnPath:andApplyBlock: returns the first
    // non-nil block result, which is how calcCompleteEventCacheAtPath: in
    // this file uses it -- confirm against FImmutableTree.
    NSNumber *coveredResult = [self.syncPointTree
           findOnPath:path
        andApplyBlock:^id(FPath *relativePath, FSyncPoint *parentSyncPoint) {
          return [parentSyncPoint hasCompleteView] ? @YES : nil;
        }];
    BOOL covered = [coveredResult boolValue];

    if (removingDefault && !covered) {
        FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:path];
        // There are potentially child listeners. Determine what if any
        // listens we need to send before executing the removal
        if (![subtree isEmpty]) {
            // We need to fold over our subtree and collect the listeners to
            // send
            NSArray *newViews = [self collectDistinctViewsForSubTree:subtree];
            // Ok, we've collected all the listens we need. Set them up.
            for (FView *view in newViews) {
                FQuerySpec *newQuery = view.query;
                FListenContainer *listenContainer =
                    [self createListenerForView:view];
                self.listenProvider.startListening(
                    [self queryForListening:newQuery],
                    [self tagForQuery:newQuery], listenContainer,
                    listenContainer.onComplete);
            }
        }
        // Otherwise there's nothing below us, so nothing we need to start
        // listening on
    }

    // If we removed anything and we're not covered by a higher up listen,
    // we need to stop listening on this query. The above block has us
    // covered in terms of making sure we're set up on listens lower in the
    // tree. Also, note that if we have a cancelError, it's already been
    // removed at the provider level.
    if (!covered && [removed count] > 0 && cancelError == nil) {
        // If we removed a default, then we weren't listening on any of the
        // other queries here. Just cancel the one default. Otherwise, we
        // need to iterate through and cancel each individual query
        if (removingDefault) {
            // We don't tag default listeners
            self.listenProvider.stopListening([self queryForListening:query],
                                              nil);
        } else {
            for (FQuerySpec *queryToRemove in removed) {
                NSNumber *tagToRemove =
                    [self.queryToTagMap objectForKey:queryToRemove];
                self.listenProvider.stopListening(
                    [self queryForListening:queryToRemove], tagToRemove);
            }
        }
    }

    // Now, clear all the tags we're tracking for the removed listens.
    [self removeTags:removed];
    return cancelEvents;
}
/**
 * Turns keep-synced on or off for a query by adding or removing the shared
 * FKeepSyncedEventRegistration instance. Does nothing when the query is
 * already in the requested state.
 *
 * @param query The query to keep synced (or stop keeping synced).
 * @param keepSynced YES to keep the query synced, NO to stop.
 */
- (void)keepQuery:(FQuerySpec *)query synced:(BOOL)keepSynced {
    // Only do something if we actually need to add/remove an event registration
    BOOL alreadySynced = [self.keepSyncedQueries containsObject:query];

    if (keepSynced && !alreadySynced) {
        // Any events returned by the registration are not used here.
        [self addEventRegistration:[FKeepSyncedEventRegistration instance]
                          forQuery:query];
        [self.keepSyncedQueries addObject:query];
    } else if (!keepSynced && alreadySynced) {
        [self removeEventRegistration:[FKeepSyncedEventRegistration instance]
                             forQuery:query
                          cancelError:nil];
        [self.keepSyncedQueries removeObject:query];
    }
}
/**
 * Drops every pending user write, both from persistence and from the pending
 * write tree. If any write was removed, a reverting ack rooted at the empty
 * path is applied to the sync points.
 *
 * @return NSArray of FEvent to raise; empty when there were no pending writes.
 */
- (NSArray *)removeAllWrites {
    [self.persistenceManager removeAllUserWrites];
    NSArray *dropped = [self.pendingWriteTree removeAllWrites];
    if (dropped.count == 0) {
        return @[];
    }

    FPath *root = [FPath empty];
    FImmutableTree *everything = [[FImmutableTree empty] setValue:@YES
                                                           atPath:[FPath empty]];
    FAckUserWrite *revertAll = [[FAckUserWrite alloc] initWithPath:root
                                                      affectedTree:everything
                                                            revert:YES];
    return [self applyOperationToSyncPoints:revertAll];
}
/**
 * Returns the indexed node of the persisted server cache for a query, or nil
 * when there is no persisted cache or its node is empty.
 *
 * @param querySpec The query to look up in the persistence manager.
 */
- (FIndexedNode *)persistenceServerCache:(FQuerySpec *)querySpec {
    FCacheNode *persisted =
        [self.persistenceManager serverCacheForQuery:querySpec];
    BOOL hasData = persisted != nil && !persisted.node.isEmpty;
    return hasData ? persisted.indexedNode : nil;
}
/**
 * Computes the complete event cache of a view for the given query.
 *
 * The sync points along the query's path are searched for a complete event
 * cache; the last sync point visited is used to obtain the view. If no sync
 * point lies on the path, a new one is created and stored in the sync point
 * tree at the query's path (a side effect of this call).
 *
 * @param query The query to evaluate.
 * @return The complete event cache of the view obtained for the query.
 */
- (id<FNode>)getServerValue:(FQuerySpec *)query {
    __block id<FNode> knownNode = nil;
    __block FSyncPoint *lastSyncPoint = nil;
    [self.syncPointTree
        forEachOnPath:query.path
           whileBlock:^BOOL(FPath *pathToSyncPoint, FSyncPoint *syncPoint) {
             FPath *pathFromSyncPoint =
                 [FPath relativePathFrom:pathToSyncPoint to:query.path];
             knownNode = [syncPoint completeEventCacheAtPath:pathFromSyncPoint];
             lastSyncPoint = syncPoint;
             // Continue until some sync point yields a complete cache.
             return knownNode == nil;
           }];

    if (lastSyncPoint == nil) {
        lastSyncPoint = [[FSyncPoint alloc]
            initWithPersistenceManager:self.persistenceManager];
        self.syncPointTree = [self.syncPointTree setValue:lastSyncPoint
                                                   atPath:[query path]];
    } else if (knownNode == nil) {
        // Fall back to the complete server cache of the last sync point.
        knownNode = [lastSyncPoint completeServerCacheAtPath:[FPath empty]];
    }

    // Without any known data the cache starts empty and uninitialized.
    BOOL haveData = knownNode != nil;
    id<FNode> nodeToIndex = haveData ? knownNode : [FEmptyNode emptyNode];
    FIndexedNode *indexed = [FIndexedNode indexedNodeWithNode:nodeToIndex
                                                        index:query.index];
    FCacheNode *serverCache =
        [[FCacheNode alloc] initWithIndexedNode:indexed
                             isFullyInitialized:haveData
                                     isFiltered:NO];

    FWriteTreeRef *pendingWrites =
        [_pendingWriteTree childWritesForPath:[query path]];
    FView *view = [lastSyncPoint getView:query
                             writesCache:pendingWrites
                             serverCache:serverCache];
    return [view completeEventCache];
}
/**
 * Returns a complete cache, if we have one, of the data at a particular path.
 * The location must have a listener above it, but as this is only used by
 * transaction code, that should always be the case anyways.
 *
 * Note: this method will *include* hidden writes from transaction with
 * applyLocally set to false.
 *
 * @param path The path to the data we want
 * @param writeIdsToExclude A specific set to be excluded
 * @return The result of the pending write tree's complete event cache
 * calculation for the path.
 */
- (id<FNode>)calcCompleteEventCacheAtPath:(FPath *)path
                          excludeWriteIds:(NSArray *)writeIdsToExclude {
    // Look along the path for a sync point that has a complete server cache
    // for the location; nil from the block means "nothing here".
    id<FNode> completeServerCache = [self.syncPointTree
           findOnPath:path
        andApplyBlock:^id<FNode>(FPath *pathSoFar, FSyncPoint *syncPoint) {
          FPath *pathFromSyncPoint = [FPath relativePathFrom:pathSoFar
                                                          to:path];
          return [syncPoint completeServerCacheAtPath:pathFromSyncPoint];
        }];

    // Hidden writes are always included here.
    return [self.pendingWriteTree
        calculateCompleteEventCacheAtPath:path
                      completeServerCache:completeServerCache
                          excludeWriteIds:writeIdsToExclude
                      includeHiddenWrites:YES];
}
- #pragma mark -
- #pragma mark Private Methods
/**
 * This collapses multiple unfiltered views into a single view, since we only
 * need a single listener for them.
 *
 * A sync point with a complete view contributes just that view; otherwise its
 * query views are combined with the views collected from its children.
 *
 * @param subtree Subtree of sync points to fold over.
 * @return NSArray of FView
 */
- (NSArray *)collectDistinctViewsForSubTree:(FImmutableTree *)subtree {
    return [subtree foldWithBlock:^NSArray *(FPath *relativePath,
                                             FSyncPoint *maybeChildSyncPoint,
                                             NSDictionary *childMap) {
      if (maybeChildSyncPoint && [maybeChildSyncPoint hasCompleteView]) {
          return @[ [maybeChildSyncPoint completeView] ];
      }

      // No complete view here, flatten any deeper listens into an array
      NSMutableArray *collected =
          maybeChildSyncPoint
              ? [[maybeChildSyncPoint queryViews] mutableCopy]
              : [[NSMutableArray alloc] init];
      [childMap enumerateKeysAndObjectsUsingBlock:^(
                    NSString *childKey, NSArray *viewsBelow, BOOL *stop) {
        [collected addObjectsFromArray:viewsBelow];
      }];
      return collected;
    }];
}
- /**
-  * Forgets the tags of the given queries. Queries that load all data are
-  * skipped; every other query is removed from both the query-to-tag map and
-  * the tag-to-query map.
-  * @param queries NSArray of FQuerySpec
-  */
- - (void)removeTags:(NSArray *)queries {
-     for (FQuerySpec *removedQuery in queries) {
-         if ([removedQuery loadsAllData]) {
-             continue;
-         }
-         // We should have a tag for this
-         NSNumber *tagForRemovedQuery = self.queryToTagMap[removedQuery];
-         [self.queryToTagMap removeObjectForKey:removedQuery];
-         [self.tagToQueryMap removeObjectForKey:tagForRemovedQuery];
-     }
- }
- /**
-  * Normalizes a query before it is used for listening: a query that loads
-  * all data but is not the default query is replaced by the default query at
-  * the same path; any other query is returned unchanged.
-  * @param query The query to normalize
-  * @return The default query at query.path, or query itself
-  */
- - (FQuerySpec *)queryForListening:(FQuerySpec *)query {
-     // We treat queries that load all data as default queries
-     BOOL shouldUseDefaultQuery = query.loadsAllData && !query.isDefault;
-     return shouldUseDefaultQuery ? [FQuerySpec defaultQueryAtPath:query.path]
-                                  : query;
- }
- /**
-  * For a given new listen, manage the de-duplication of outstanding
-  * subscriptions.
-  *
-  * Starts listening on the query through the listen provider. When the query
-  * has no tag, every query found strictly below this location behind a
-  * complete view, plus every query view at or below it that is not behind
-  * such a view, is then stopped on the listen provider.
-  * @param query The query being listened to
-  * @param view The view the listener is created for
-  * @return NSArray of FEvent events to support synchronous data sources
-  */
- - (NSArray *)setupListenerOnQuery:(FQuerySpec *)query view:(FView *)view {
-     FPath *listenPath = query.path;
-     NSNumber *queryTag = [self tagForQuery:query];
-     FListenContainer *container = [self createListenerForView:view];
-     NSArray *initialEvents = self.listenProvider.startListening(
-         [self queryForListening:query], queryTag, container,
-         container.onComplete);
-     FImmutableTree *subtree = [self.syncPointTree subtreeAtPath:listenPath];
-     // The root of this subtree has our query. We're here because we definitely
-     // need to send a listen for that, but we may need to shadow other listens
-     // as well.
-     if (queryTag == nil) {
-         // Shadow everything at or below this location, this is a default
-         // listener.
-         NSArray *shadowedQueries = [subtree
-             foldWithBlock:^id(FPath *relativePath,
-                               FSyncPoint *maybeChildSyncPoint,
-                               NSDictionary *childMap) {
-                 BOOL isNestedCompleteView =
-                     ![relativePath isEmpty] && maybeChildSyncPoint != nil &&
-                     [maybeChildSyncPoint hasCompleteView];
-                 if (isNestedCompleteView) {
-                     return @[ [maybeChildSyncPoint completeView].query ];
-                 }
-                 // No default listener here, flatten any deeper queries into
-                 // an array
-                 NSMutableArray *flattened = [[NSMutableArray alloc] init];
-                 if (maybeChildSyncPoint != nil) {
-                     for (FView *queryView in
-                          [maybeChildSyncPoint queryViews]) {
-                         [flattened addObject:queryView.query];
-                     }
-                 }
-                 [childMap enumerateKeysAndObjectsUsingBlock:^(
-                               NSString *key, NSArray *childQueries,
-                               BOOL *stop) {
-                     [flattened addObjectsFromArray:childQueries];
-                 }];
-                 return flattened;
-             }];
-         for (FQuerySpec *shadowed in shadowedQueries) {
-             self.listenProvider.stopListening(
-                 [self queryForListening:shadowed],
-                 [self tagForQuery:shadowed]);
-         }
-     } else {
-         NSAssert(![subtree.value hasCompleteView],
-                  @"If we're adding a query, it shouldn't be shadowed");
-     }
-     return initialEvents;
- }
- /**
-  * Builds the listen container for a view. Its completion block reacts to
-  * the listen status: on "ok" it applies a (tagged, if the view's query has a
-  * tag) listen-complete at the query's path; on any other status it logs a
-  * warning and removes all event registrations for the query, passing along
-  * an error built from the status.
-  * @param view The view whose query is being listened to
-  * @return The newly created listen container
-  */
- - (FListenContainer *)createListenerForView:(FView *)view {
-     FQuerySpec *listenedQuery = view.query;
-     NSNumber *listenedTag = [self tagForQuery:listenedQuery];
-     return [[FListenContainer alloc]
-         initWithView:view
-           onComplete:^(NSString *status) {
-               if (![status isEqualToString:@"ok"]) {
-                   // If a listen failed, kill all of the listeners here, not
-                   // just the one that triggered the error. Note that this
-                   // may need to be scoped to just this listener if we
-                   // change permissions on filtered children
-                   NSError *cancelError = [FUtilities errorForStatus:status
-                                                           andReason:nil];
-                   FFWarn(@"I-RDB038012", @"Listener at %@ failed: %@",
-                          listenedQuery.path, status);
-                   return [self removeEventRegistration:nil
-                                               forQuery:listenedQuery
-                                            cancelError:cancelError];
-               }
-               if (listenedTag == nil) {
-                   return [self applyListenCompleteAtPath:listenedQuery.path];
-               }
-               return
-                   [self applyTaggedListenCompleteAtPath:listenedQuery.path
-                                                   tagId:listenedTag];
-           }];
- }
- /**
-  * Looks a tag up in the tag-to-query map.
-  * @param tagId The tag to look up
-  * @return The query associated with the given tag, if we have one
-  */
- - (FQuerySpec *)queryForTag:(NSNumber *)tagId {
-     FQuerySpec *taggedQuery = [self.tagToQueryMap objectForKey:tagId];
-     return taggedQuery;
- }
- /**
-  * Looks a query up in the query-to-tag map.
-  * @param query The query to look up
-  * @return The tag associated with the given query
-  */
- - (NSNumber *)tagForQuery:(FQuerySpec *)query {
-     NSNumber *queryTag = [self.queryToTagMap objectForKey:query];
-     return queryTag;
- }
- #pragma mark -
- #pragma mark applyOperation Helpers
- /**
-  * A helper method that visits all descendant and ancestor SyncPoints,
-  * applying the operation.
-  *
-  * NOTES:
-  * - Descendant SyncPoints will be visited first (since we raise events
-  *   depth-first).
-  * - We call applyOperation: on each SyncPoint passing three things:
-  *   1. A version of the Operation that has been made relative to the
-  *      SyncPoint location.
-  *   2. A WriteTreeRef of any writes we have cached at the SyncPoint
-  *      location.
-  *   3. A snapshot Node with cached server data, if we have it.
-  * - We concatenate all of the events returned by each SyncPoint and return
-  *   the result.
-  *
-  * @param operation The operation to apply
-  * @return Array of FEvent
-  */
- - (NSArray *)applyOperationToSyncPoints:(id<FOperation>)operation {
-     // Begin at the top of the sync point tree with no server cache and the
-     // pending writes for the empty path.
-     FWriteTreeRef *rootWrites =
-         [self.pendingWriteTree childWritesForPath:[FPath empty]];
-     return [self applyOperationHelper:operation
-                         syncPointTree:self.syncPointTree
-                           serverCache:nil
-                           writesCache:rootWrites];
- }
- /**
-  * Recursive helper for applyOperationToSyncPoints:
-  *
-  * While the operation's path is non-empty this follows the front key of
-  * that path down the sync point tree, recursing into the matching child
-  * (when both the child tree and the child operation exist) before applying
-  * the operation to the sync point at this node. Once the path is empty the
-  * work is delegated to applyOperationDescendantsHelper:.
-  *
-  * @param operation The operation, relative to this node
-  * @param syncPointTree The sync point tree rooted at this node
-  * @param serverCache Cached server data for this node, or nil
-  * @param writesCache Pending writes for this node
-  * @return Array of events, child events first
-  */
- - (NSArray *)applyOperationHelper:(id<FOperation>)operation
-                     syncPointTree:(FImmutableTree *)syncPointTree
-                       serverCache:(id<FNode>)serverCache
-                       writesCache:(FWriteTreeRef *)writesCache {
-     if ([operation.path isEmpty]) {
-         return [self applyOperationDescendantsHelper:operation
-                                        syncPointTree:syncPointTree
-                                          serverCache:serverCache
-                                          writesCache:writesCache];
-     }
-     FSyncPoint *syncPoint = syncPointTree.value;
-     // If we don't have cached server data, see if we can get it from this
-     // SyncPoint
-     id<FNode> resolvedServerCache = serverCache;
-     if (resolvedServerCache == nil && syncPoint != nil) {
-         resolvedServerCache =
-             [syncPoint completeServerCacheAtPath:[FPath empty]];
-     }
-     NSMutableArray *collectedEvents = [[NSMutableArray alloc] init];
-     NSString *frontKey = [operation.path getFront];
-     id<FOperation> childOperation = [operation operationForChild:frontKey];
-     FImmutableTree *childTree = [syncPointTree.children get:frontKey];
-     if (childTree != nil && childOperation != nil) {
-         id<FNode> childServerCache = nil;
-         if (resolvedServerCache) {
-             childServerCache =
-                 [resolvedServerCache getImmediateChild:frontKey];
-         }
-         FWriteTreeRef *childWrites =
-             [writesCache childWriteTreeRef:frontKey];
-         NSArray *childEvents = [self applyOperationHelper:childOperation
-                                             syncPointTree:childTree
-                                               serverCache:childServerCache
-                                               writesCache:childWrites];
-         [collectedEvents addObjectsFromArray:childEvents];
-     }
-     // The sync point at this node goes last, after the child's events.
-     if (syncPoint != nil) {
-         NSArray *ownEvents = [syncPoint applyOperation:operation
-                                            writesCache:writesCache
-                                            serverCache:resolvedServerCache];
-         [collectedEvents addObjectsFromArray:ownEvents];
-     }
-     return collectedEvents;
- }
- /**
-  * Recursive helper for applyOperationToSyncPoints:
-  *
-  * Applies the operation to every child subtree of this node (for which the
-  * operation yields a child operation) and then to the sync point at this
-  * node itself, so descendant events come before this node's events.
-  *
-  * @param operation The operation, relative to this node
-  * @param syncPointTree The sync point tree rooted at this node
-  * @param serverCache Cached server data for this node, or nil
-  * @param writesCache Pending writes for this node
-  * @return Array of events, descendant events first
-  */
- - (NSArray *)applyOperationDescendantsHelper:(id<FOperation>)operation
-                                syncPointTree:(FImmutableTree *)syncPointTree
-                                  serverCache:(id<FNode>)serverCache
-                                  writesCache:(FWriteTreeRef *)writesCache {
-     FSyncPoint *syncPoint = syncPointTree.value;
-     // If we don't have cached server data, see if we can get it from this
-     // SyncPoint
-     id<FNode> resolvedServerCache;
-     // Logical AND (not bitwise &): both operands are conditions.
-     if (serverCache == nil && syncPoint != nil) {
-         resolvedServerCache =
-             [syncPoint completeServerCacheAtPath:[FPath empty]];
-     } else {
-         resolvedServerCache = serverCache;
-     }
-     NSMutableArray *events = [[NSMutableArray alloc] init];
-     [syncPointTree.children enumerateKeysAndObjectsUsingBlock:^(
-                                 NSString *childKey, FImmutableTree *childTree,
-                                 BOOL *stop) {
-         id<FNode> childServerCache = nil;
-         if (resolvedServerCache != nil) {
-             childServerCache =
-                 [resolvedServerCache getImmediateChild:childKey];
-         }
-         FWriteTreeRef *childWritesCache =
-             [writesCache childWriteTreeRef:childKey];
-         id<FOperation> childOperation =
-             [operation operationForChild:childKey];
-         // Children for which there is no child operation are skipped.
-         if (childOperation != nil) {
-             [events addObjectsFromArray:
-                         [self applyOperationDescendantsHelper:childOperation
-                                                 syncPointTree:childTree
-                                                   serverCache:childServerCache
-                                                   writesCache:childWritesCache]];
-         }
-     }];
-     // Apply to this node's own sync point last, after all children.
-     if (syncPoint) {
-         [events
-             addObjectsFromArray:[syncPoint applyOperation:operation
-                                               writesCache:writesCache
-                                               serverCache:resolvedServerCache]];
-     }
-     return events;
- }
- @end
|