| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634 |
- /*
- * Copyright 2017 Google
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #import "Firestore/Source/Remote/FSTRemoteEvent.h"
- #include <map>
- #include <set>
- #include <unordered_map>
- #include <utility>
- #include <vector>
- #import "Firestore/Source/Core/FSTQuery.h"
- #import "Firestore/Source/Core/FSTViewSnapshot.h"
- #import "Firestore/Source/Local/FSTQueryData.h"
- #import "Firestore/Source/Model/FSTDocument.h"
- #import "Firestore/Source/Remote/FSTWatchChange.h"
- #import "Firestore/Source/Util/FSTClasses.h"
- #include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
- #include "Firestore/core/src/firebase/firestore/util/hashing.h"
- #include "Firestore/core/src/firebase/firestore/util/log.h"
- using firebase::firestore::model::DocumentKey;
- using firebase::firestore::model::DocumentKeyHash;
- using firebase::firestore::model::DocumentKeySet;
- using firebase::firestore::model::SnapshotVersion;
- using firebase::firestore::model::TargetId;
- using firebase::firestore::util::Hash;
- NS_ASSUME_NONNULL_BEGIN
- #pragma mark - FSTTargetChange
/**
 * Value object describing what changed for a single target: its resume token, whether it is
 * 'current', and the keys of the documents that were added to, modified in, or removed from it.
 */
@implementation FSTTargetChange {
  DocumentKeySet _addedDocuments;
  DocumentKeySet _modifiedDocuments;
  DocumentKeySet _removedDocuments;
}

/**
 * Initializes the change. The resume token is copied; the key sets are taken by value and moved
 * into the receiver.
 */
- (instancetype)initWithResumeToken:(NSData *)resumeToken
                            current:(BOOL)current
                     addedDocuments:(DocumentKeySet)addedDocuments
                  modifiedDocuments:(DocumentKeySet)modifiedDocuments
                   removedDocuments:(DocumentKeySet)removedDocuments {
  if (self = [super init]) {
    _resumeToken = [resumeToken copy];
    _current = current;
    _addedDocuments = std::move(addedDocuments);
    _modifiedDocuments = std::move(modifiedDocuments);
    _removedDocuments = std::move(removedDocuments);
  }
  return self;
}

/** The keys of the documents that were added to the target. */
- (const DocumentKeySet &)addedDocuments {
  return _addedDocuments;
}

/** The keys of the documents that were modified in the target. */
- (const DocumentKeySet &)modifiedDocuments {
  return _modifiedDocuments;
}

/** The keys of the documents that were removed from the target. */
- (const DocumentKeySet &)removedDocuments {
  return _removedDocuments;
}

/**
 * Two target changes are equal when they are both exactly of class FSTTargetChange and agree on
 * the 'current' flag, the resume token and all three document key sets.
 */
- (BOOL)isEqual:(id)other {
  if (other == self) {
    return YES;
  }
  if (![other isMemberOfClass:[FSTTargetChange class]]) {
    return NO;
  }

  // The class check above makes this cast safe and avoids messaging through `id`.
  FSTTargetChange *otherChange = (FSTTargetChange *)other;
  return [self current] == [otherChange current] &&
         [[self resumeToken] isEqualToData:[otherChange resumeToken]] &&
         [self addedDocuments] == [otherChange addedDocuments] &&
         [self modifiedDocuments] == [otherChange modifiedDocuments] &&
         [self removedDocuments] == [otherChange removedDocuments];
}

/**
 * Hash consistent with -isEqual:. Objects that compare equal must return the same hash, otherwise
 * they misbehave as members of NSSet or keys of NSDictionary.
 *
 * Only the sizes of the key sets are mixed in: equal sets necessarily have equal sizes, so the
 * contract holds without requiring a hash function over the set contents.
 */
- (NSUInteger)hash {
  NSUInteger result = [[self resumeToken] hash];
  result = result * 31 + ([self current] ? 1231 : 1237);
  result = result * 31 + _addedDocuments.size();
  result = result * 31 + _modifiedDocuments.size();
  result = result * 31 + _removedDocuments.size();
  return result;
}

@end
- #pragma mark - FSTTargetState
/**
 * Bookkeeping for a single Watch target: its 'current' flag, latest resume token, the number of
 * outstanding add/remove acknowledgements and the document changes accumulated since the last
 * snapshot.
 */
@interface FSTTargetState : NSObject

#pragma mark State

/**
 * Whether this target has been marked 'current'.
 *
 * 'Current' has special meaning in the RPC protocol: It implies that the Watch backend has sent us
 * all changes up to the point at which the target was added and that the target is consistent with
 * the rest of the watch stream.
 */
@property(nonatomic) BOOL current;

/** The most recent non-empty resume token received for this target. */
@property(nonatomic, readonly, strong) NSData *resumeToken;

/** YES when state has been modified in a way that should cause a snapshot to be raised. */
@property(nonatomic, readonly) BOOL hasPendingChanges;

/** YES while at least one target add or target remove is still awaiting its response. */
- (BOOL)isPending;

#pragma mark Mutation

/**
 * Stores the given resume token and flags pending changes. Empty resume tokens are discarded and
 * leave the state untouched.
 */
- (void)updateResumeToken:(NSData *)resumeToken;

/** Drops all recorded document changes and sets `hasPendingChanges` to NO. */
- (void)clearPendingChanges;

#pragma mark Snapshotting

/**
 * Builds an FSTTargetChange from the changes recorded so far.
 *
 * This does not reset anything; call `clearPendingChanges` after raising the snapshot to start
 * accumulating afresh.
 */
- (FSTTargetChange *)toTargetChange;

#pragma mark Request bookkeeping

/** Notes that a request was sent for this target, i.e. one more response is outstanding. */
- (void)recordTargetRequest;

/** Notes that a response arrived for this target, i.e. one fewer response is outstanding. */
- (void)recordTargetResponse;

#pragma mark Change tracking

/** Marks the target as 'current' and flags pending changes. */
- (void)markCurrent;

/** Records `type` as the change for `documentKey`, replacing any change recorded earlier. */
- (void)addDocumentChangeWithType:(FSTDocumentViewChangeType)type
                           forKey:(const DocumentKey &)documentKey;

/** Forgets any change recorded for `documentKey`; still flags pending changes. */
- (void)removeDocumentChangeForKey:(const DocumentKey &)documentKey;

@end
@implementation FSTTargetState {
  /**
   * How many responses (to target adds or removes) are still outstanding. A target only counts as
   * active once this has dropped back to zero.
   */
  int _outstandingResponses;

  /**
   * Document changes accumulated since the last raised snapshot, keyed by document.
   *
   * The map is updated as document updates arrive, so it always describes the net change relative
   * to the last issued snapshot.
   */
  std::unordered_map<DocumentKey, FSTDocumentViewChangeType, DocumentKeyHash> _documentChanges;
}

- (instancetype)init {
  self = [super init];
  if (self) {
    _outstandingResponses = 0;
    _resumeToken = [NSData data];
    // Start out with pending changes so that a newly-added target shows up in the next
    // RemoteEvent.
    _hasPendingChanges = YES;
  }
  return self;
}

- (BOOL)isPending {
  return _outstandingResponses != 0;
}

- (void)updateResumeToken:(NSData *)resumeToken {
  // Empty tokens carry no information and must not overwrite a token we already hold.
  if (resumeToken.length == 0) {
    return;
  }
  _resumeToken = [resumeToken copy];
  _hasPendingChanges = YES;
}

- (void)clearPendingChanges {
  _documentChanges.clear();
  _hasPendingChanges = NO;
}

- (void)recordTargetRequest {
  ++_outstandingResponses;
}

- (void)recordTargetResponse {
  --_outstandingResponses;
}

- (void)markCurrent {
  _current = YES;
  _hasPendingChanges = YES;
}

- (void)addDocumentChangeWithType:(FSTDocumentViewChangeType)type
                           forKey:(const DocumentKey &)documentKey {
  _documentChanges[documentKey] = type;
  _hasPendingChanges = YES;
}

- (void)removeDocumentChangeForKey:(const DocumentKey &)documentKey {
  _documentChanges.erase(documentKey);
  _hasPendingChanges = YES;
}

- (FSTTargetChange *)toTargetChange {
  DocumentKeySet added;
  DocumentKeySet modified;
  DocumentKeySet removed;

  // Sort every recorded change into the bucket matching its change type.
  for (const auto &change : _documentChanges) {
    const DocumentKey &key = change.first;
    const FSTDocumentViewChangeType changeType = change.second;

    if (changeType == FSTDocumentViewChangeTypeAdded) {
      added = added.insert(key);
    } else if (changeType == FSTDocumentViewChangeTypeModified) {
      modified = modified.insert(key);
    } else if (changeType == FSTDocumentViewChangeTypeRemoved) {
      removed = removed.insert(key);
    } else {
      HARD_FAIL("Encountered invalid change type: %s", changeType);
    }
  }

  return [[FSTTargetChange alloc] initWithResumeToken:_resumeToken
                                              current:_current
                                       addedDocuments:std::move(added)
                                    modifiedDocuments:std::move(modified)
                                     removedDocuments:std::move(removed)];
}

@end
- #pragma mark - FSTRemoteEvent
/**
 * Value holder bundling a snapshot version with the per-target changes, mismatched targets,
 * document updates and limbo document keys it was initialized with.
 */
@implementation FSTRemoteEvent {
  SnapshotVersion _snapshotVersion;
  std::unordered_map<TargetId, FSTTargetChange *> _targetChanges;
  std::unordered_set<TargetId> _targetMismatches;
  std::unordered_map<DocumentKey, FSTMaybeDocument *, DocumentKeyHash> _documentUpdates;
  DocumentKeySet _limboDocumentChanges;
}

/** Stores every argument in the receiver; all arguments are taken by value and moved in. */
- (instancetype)
    initWithSnapshotVersion:(SnapshotVersion)snapshotVersion
              targetChanges:(std::unordered_map<TargetId, FSTTargetChange *>)targetChanges
           targetMismatches:(std::unordered_set<TargetId>)targetMismatches
            documentUpdates:(std::unordered_map<DocumentKey, FSTMaybeDocument *, DocumentKeyHash>)
                                documentUpdates
             limboDocuments:(DocumentKeySet)limboDocuments {
  if (self = [super init]) {
    _snapshotVersion = std::move(snapshotVersion);
    _targetChanges = std::move(targetChanges);
    _targetMismatches = std::move(targetMismatches);
    _documentUpdates = std::move(documentUpdates);
    _limboDocumentChanges = std::move(limboDocuments);
  }
  return self;
}

/** The snapshot version this event was created with. */
- (const SnapshotVersion &)snapshotVersion {
  return _snapshotVersion;
}

/** The target changes of this event, keyed by target ID. */
- (const std::unordered_map<TargetId, FSTTargetChange *> &)targetChanges {
  return _targetChanges;
}

/** The IDs of the targets passed as `targetMismatches` at initialization. */
- (const std::unordered_set<TargetId> &)targetMismatches {
  return _targetMismatches;
}

/** The document updates of this event, keyed by document key. */
- (const std::unordered_map<DocumentKey, FSTMaybeDocument *, DocumentKeyHash> &)documentUpdates {
  return _documentUpdates;
}

/** The document keys passed as `limboDocuments` at initialization. */
- (const DocumentKeySet &)limboDocumentChanges {
  return _limboDocumentChanges;
}

@end
- #pragma mark - FSTWatchChangeAggregator
/**
 * Accumulates watch changes (document changes, target changes and existence filters) and turns
 * them into an FSTRemoteEvent on request. Per-target bookkeeping lives in FSTTargetState; what the
 * local store already knows about a target is obtained from the FSTTargetMetadataProvider.
 */
@implementation FSTWatchChangeAggregator {
  /** The internal state of all tracked targets. */
  std::unordered_map<TargetId, FSTTargetState *> _targetStates;

  /** Document updates accumulated since the last remote event, keyed by document key. */
  std::unordered_map<DocumentKey, FSTMaybeDocument *, DocumentKeyHash> _pendingDocumentUpdates;

  /** A mapping of document keys to their set of target IDs. */
  std::unordered_map<DocumentKey, std::set<TargetId>, DocumentKeyHash>
      _pendingDocumentTargetMappings;

  /**
   * A list of targets with existence filter mismatches. These targets are known to be inconsistent
   * and their listens need to be re-established by RemoteStore.
   */
  std::unordered_set<TargetId> _pendingTargetResets;

  /** Source of the query data and remote keys known for each target. */
  id<FSTTargetMetadataProvider> _targetMetadataProvider;
}

- (instancetype)initWithTargetMetadataProvider:
    (id<FSTTargetMetadataProvider>)targetMetadataProvider {
  self = [super init];
  if (self) {
    _targetMetadataProvider = targetMetadataProvider;
  }
  return self;
}

#pragma mark - Handling watch changes

/**
 * Applies a document change: for every updated target the document is added (FSTDocument) or
 * removed (FSTDeletedDocument); for every removed target the document is removed from the target.
 */
- (void)handleDocumentChange:(FSTDocumentWatchChange *)documentChange {
  for (FSTBoxedTargetID *targetID in documentChange.updatedTargetIDs) {
    if ([documentChange.document isKindOfClass:[FSTDocument class]]) {
      [self addDocument:documentChange.document toTarget:targetID.intValue];
    } else if ([documentChange.document isKindOfClass:[FSTDeletedDocument class]]) {
      [self removeDocument:documentChange.document
                   withKey:documentChange.documentKey
                fromTarget:targetID.intValue];
    }
  }

  for (FSTBoxedTargetID *targetID in documentChange.removedTargetIDs) {
    [self removeDocument:documentChange.document
                 withKey:documentChange.documentKey
              fromTarget:targetID.intValue];
  }
}

/**
 * Applies a target change to every target it refers to (see `targetIdsForChange:`), updating the
 * per-target state according to the change's state.
 */
- (void)handleTargetChange:(FSTWatchTargetChange *)targetChange {
  for (TargetId targetID : [self targetIdsForChange:targetChange]) {
    FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID];
    switch (targetChange.state) {
      case FSTWatchTargetChangeStateNoChange:
        if ([self isActiveTarget:targetID]) {
          [targetState updateResumeToken:targetChange.resumeToken];
        }
        break;

      case FSTWatchTargetChangeStateAdded:
        // We need to decrement the number of pending acks needed from watch for this targetId.
        [targetState recordTargetResponse];
        if (!targetState.isPending) {
          // We have a freshly added target, so we need to reset any state that we had previously.
          // This can happen e.g. when we remove and add back a target for existence filter
          // mismatches.
          [targetState clearPendingChanges];
        }
        [targetState updateResumeToken:targetChange.resumeToken];
        break;

      case FSTWatchTargetChangeStateRemoved:
        // We need to keep track of removed targets so we can post-filter and remove any target
        // changes.
        [targetState recordTargetResponse];
        if (!targetState.isPending) {
          [self removeTarget:targetID];
        }
        HARD_ASSERT(!targetChange.cause, "WatchChangeAggregator does not handle errored targets");
        break;

      case FSTWatchTargetChangeStateCurrent:
        if ([self isActiveTarget:targetID]) {
          [targetState markCurrent];
          [targetState updateResumeToken:targetChange.resumeToken];
        }
        break;

      case FSTWatchTargetChangeStateReset:
        if ([self isActiveTarget:targetID]) {
          // Reset the target and synthesize removes for all existing documents. The backend will
          // re-add any documents that still match the target before it sends the next global
          // snapshot.
          [self resetTarget:targetID];

          // `resetTarget:` installs a brand-new FSTTargetState for this target, so `targetState`
          // now points at the discarded instance. Look the state up again; otherwise the resume
          // token would be written to an object nobody reads and silently lost.
          targetState = [self ensureTargetStateForTarget:targetID];
          [targetState updateResumeToken:targetChange.resumeToken];
        }
        break;

      default:
        HARD_FAIL("Unknown target watch change state: %s", targetChange.state);
    }
  }
}

/**
 * Returns all targetIds that the watch change applies to: either the targetIds explicitly listed
 * in the change or, when none are listed, the targetIds of all currently tracked targets.
 */
- (std::vector<TargetId>)targetIdsForChange:(FSTWatchTargetChange *)targetChange {
  NSArray<NSNumber *> *targetIDs = targetChange.targetIDs;
  std::vector<TargetId> result;
  if (targetIDs.count > 0) {
    result.reserve(targetIDs.count);
    for (NSNumber *targetID in targetIDs) {
      result.push_back(targetID.intValue);
    }
  } else {
    result.reserve(_targetStates.size());
    for (const auto &entry : _targetStates) {
      result.push_back(entry.first);
    }
  }
  return result;
}

/** Stops tracking the given target by dropping its state. */
- (void)removeTarget:(TargetId)targetID {
  _targetStates.erase(targetID);
}

/**
 * Applies an existence filter to an active target. For document queries a count of zero is turned
 * into a synthesized document delete; for other queries a count that disagrees with the locally
 * computed document count resets the target and records it as mismatched.
 */
- (void)handleExistenceFilter:(FSTExistenceFilterWatchChange *)existenceFilter {
  TargetId targetID = existenceFilter.targetID;
  int expectedCount = existenceFilter.filter.count();

  FSTQueryData *queryData = [self queryDataForActiveTarget:targetID];
  if (!queryData) {
    // Inactive or unknown target: nothing to reconcile.
    return;
  }

  FSTQuery *query = queryData.query;
  if ([query isDocumentQuery]) {
    if (expectedCount == 0) {
      // The existence filter told us the document does not exist. We deduce that this document
      // does not exist and apply a deleted document to our updates. Without applying this deleted
      // document there might be another query that will raise this document as part of a snapshot
      // until it is resolved, essentially exposing inconsistency between queries.
      DocumentKey key{query.path};
      [self removeDocument:[FSTDeletedDocument documentWithKey:key
                                                       version:SnapshotVersion::None()
                                         hasCommittedMutations:NO]
                   withKey:key
                fromTarget:targetID];
    } else {
      HARD_ASSERT(expectedCount == 1, "Single document existence filter with count: %s",
                  expectedCount);
    }
  } else {
    int currentSize = [self currentDocumentCountForTarget:targetID];
    if (currentSize != expectedCount) {
      // Existence filter mismatch: We reset the mapping and raise a new snapshot with
      // `isFromCache:true`.
      [self resetTarget:targetID];
      _pendingTargetResets.insert(targetID);
    }
  }
}

/**
 * Returns the number of documents the target is expected to contain: the remote keys reported by
 * the metadata provider plus the pending additions minus the pending removals.
 */
- (int)currentDocumentCountForTarget:(TargetId)targetID {
  FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID];
  FSTTargetChange *targetChange = [targetState toTargetChange];
  return ([_targetMetadataProvider remoteKeysForTarget:@(targetID)].size() +
          targetChange.addedDocuments.size() - targetChange.removedDocuments.size());
}

/**
 * Resets the state of a Watch target to its initial state (e.g. sets 'current' to false, clears the
 * resume token and removes its target mapping from all documents).
 *
 * Note that this replaces the FSTTargetState instance tracked for the target; pointers to the
 * previous instance obtained before this call no longer refer to the tracked state.
 */
- (void)resetTarget:(TargetId)targetID {
  auto currentTargetState = _targetStates.find(targetID);
  HARD_ASSERT(currentTargetState != _targetStates.end() && !(currentTargetState->second.isPending),
              "Should only reset active targets");

  _targetStates[targetID] = [FSTTargetState new];

  // Trigger removal for any documents currently mapped to this target. These removals will be part
  // of the initial snapshot if Watch does not resend these documents.
  DocumentKeySet existingKeys = [_targetMetadataProvider remoteKeysForTarget:@(targetID)];
  for (const DocumentKey &key : existingKeys) {
    [self removeDocument:nil withKey:key fromTarget:targetID];
  }
}

#pragma mark - Document bookkeeping

/**
 * Adds the provided document to the internal list of document updates and its document key to the
 * given target's mapping. Does nothing when the target is not active.
 */
- (void)addDocument:(FSTMaybeDocument *)document toTarget:(TargetId)targetID {
  if (![self isActiveTarget:targetID]) {
    return;
  }

  // A document the local store already associates with the target is a modification; otherwise
  // it is new to the target.
  FSTDocumentViewChangeType changeType = [self containsDocument:document.key inTarget:targetID]
                                             ? FSTDocumentViewChangeTypeModified
                                             : FSTDocumentViewChangeTypeAdded;

  FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID];
  [targetState addDocumentChangeWithType:changeType forKey:document.key];

  _pendingDocumentUpdates[document.key] = document;
  _pendingDocumentTargetMappings[document.key].insert(targetID);
}

/**
 * Removes the provided document from the target mapping. If the document no longer matches the
 * target, but the document's state is still known (e.g. we know that the document was deleted or we
 * received the change that caused the filter mismatch), the new document can be provided to update
 * the remote document cache. Does nothing when the target is not active.
 */
- (void)removeDocument:(FSTMaybeDocument *_Nullable)document
               withKey:(const DocumentKey &)key
            fromTarget:(TargetId)targetID {
  if (![self isActiveTarget:targetID]) {
    return;
  }

  FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID];
  if ([self containsDocument:key inTarget:targetID]) {
    [targetState addDocumentChangeWithType:FSTDocumentViewChangeTypeRemoved forKey:key];
  } else {
    // The document may have entered and left the target before we raised a snapshot, so we can just
    // ignore the change.
    [targetState removeDocumentChangeForKey:key];
  }

  _pendingDocumentTargetMappings[key].insert(targetID);

  if (document) {
    _pendingDocumentUpdates[key] = document;
  }
}

/**
 * Returns whether the LocalStore considers the document to be part of the specified target.
 */
- (BOOL)containsDocument:(const DocumentKey &)key inTarget:(TargetId)targetID {
  const DocumentKeySet &existingKeys = [_targetMetadataProvider remoteKeysForTarget:@(targetID)];
  return existingKeys.contains(key);
}

#pragma mark - Target state lookup

/**
 * Returns the state tracked for the target, creating and registering a fresh FSTTargetState when
 * none is tracked yet.
 */
- (FSTTargetState *)ensureTargetStateForTarget:(TargetId)targetID {
  // One hash lookup on the common path where the target is already tracked.
  auto found = _targetStates.find(targetID);
  if (found != _targetStates.end() && found->second) {
    return found->second;
  }

  FSTTargetState *targetState = [[FSTTargetState alloc] init];
  _targetStates[targetID] = targetState;
  return targetState;
}

/**
 * Returns YES if the given targetId is active. Active targets are those for which there are no
 * pending requests to add a listen and are in the current list of targets the client cares about.
 *
 * Clients can repeatedly listen and stop listening to targets, so this check is useful in
 * preventing race conditions for a target where events arrive but the server hasn't yet
 * acknowledged the intended change in state.
 */
- (BOOL)isActiveTarget:(TargetId)targetID {
  return [self queryDataForActiveTarget:targetID] != nil;
}

/**
 * Returns the query data the metadata provider holds for the target, or nil when the target is
 * tracked and still has outstanding responses (or the provider has no data for it).
 */
- (nullable FSTQueryData *)queryDataForActiveTarget:(TargetId)targetID {
  auto targetState = _targetStates.find(targetID);
  if (targetState != _targetStates.end() && targetState->second.isPending) {
    return nil;
  }
  return [_targetMetadataProvider queryDataForTarget:@(targetID)];
}

#pragma mark - Producing remote events

/**
 * Builds an FSTRemoteEvent at the given snapshot version from everything accumulated so far, then
 * clears the accumulated document updates, document/target mappings and target resets as well as
 * the pending changes of every target that contributed a target change.
 */
- (FSTRemoteEvent *)remoteEventAtSnapshotVersion:(const SnapshotVersion &)snapshotVersion {
  std::unordered_map<TargetId, FSTTargetChange *> targetChanges;

  for (const auto &entry : _targetStates) {
    TargetId targetID = entry.first;
    FSTTargetState *targetState = entry.second;

    FSTQueryData *queryData = [self queryDataForActiveTarget:targetID];
    if (!queryData) {
      continue;
    }

    if (targetState.current && [queryData.query isDocumentQuery]) {
      // Document queries for document that don't exist can produce an empty result set. To update
      // our local cache, we synthesize a document delete if we have not previously received the
      // document. This resolves the limbo state of the document, removing it from
      // limboDocumentRefs.
      DocumentKey key{queryData.query.path};
      if (_pendingDocumentUpdates.find(key) == _pendingDocumentUpdates.end() &&
          ![self containsDocument:key inTarget:targetID]) {
        [self removeDocument:[FSTDeletedDocument documentWithKey:key
                                                         version:snapshotVersion
                                           hasCommittedMutations:NO]
                     withKey:key
                  fromTarget:targetID];
      }
    }

    if (targetState.hasPendingChanges) {
      targetChanges[targetID] = [targetState toTargetChange];
      [targetState clearPendingChanges];
    }
  }

  DocumentKeySet resolvedLimboDocuments;

  // We extract the set of limbo-only document updates as the GC logic special-cases documents that
  // do not appear in the query cache.
  //
  // TODO(gsoltis): Expand on this comment.
  for (const auto &entry : _pendingDocumentTargetMappings) {
    BOOL isOnlyLimboTarget = YES;

    for (TargetId targetID : entry.second) {
      FSTQueryData *queryData = [self queryDataForActiveTarget:targetID];
      if (queryData && queryData.purpose != FSTQueryPurposeLimboResolution) {
        isOnlyLimboTarget = NO;
        break;
      }
    }

    if (isOnlyLimboTarget) {
      resolvedLimboDocuments = resolvedLimboDocuments.insert(entry.first);
    }
  }

  // The locals are not used again and the pending members are cleared right below, so hand all of
  // them over by move rather than copying whole containers into the event.
  FSTRemoteEvent *remoteEvent =
      [[FSTRemoteEvent alloc] initWithSnapshotVersion:snapshotVersion
                                        targetChanges:std::move(targetChanges)
                                     targetMismatches:std::move(_pendingTargetResets)
                                      documentUpdates:std::move(_pendingDocumentUpdates)
                                       limboDocuments:std::move(resolvedLimboDocuments)];

  // clear() has no preconditions, so it also brings the moved-from containers back to a
  // well-defined empty state.
  _pendingDocumentUpdates.clear();
  _pendingDocumentTargetMappings.clear();
  _pendingTargetResets.clear();

  return remoteEvent;
}

/** Records that a request was sent for the target and that a response to it is now expected. */
- (void)recordTargetRequest:(FSTBoxedTargetID *)targetID {
  // For each request we get we need to record we need a response for it.
  FSTTargetState *targetState = [self ensureTargetStateForTarget:targetID.intValue];
  [targetState recordTargetRequest];
}

@end
- NS_ASSUME_NONNULL_END
|