remote_event.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. /*
  2. * Copyright 2019 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef FIRESTORE_CORE_SRC_REMOTE_REMOTE_EVENT_H_
  17. #define FIRESTORE_CORE_SRC_REMOTE_REMOTE_EVENT_H_
  18. #include <set>
  19. #include <unordered_map>
  20. #include <unordered_set>
  21. #include <utility>
  22. #include <vector>
  23. #include "Firestore/core/src/core/view_snapshot.h"
  24. #include "Firestore/core/src/model/document_key.h"
  25. #include "Firestore/core/src/model/document_key_set.h"
  26. #include "Firestore/core/src/model/mutable_document.h"
  27. #include "Firestore/core/src/model/snapshot_version.h"
  28. #include "Firestore/core/src/model/types.h"
  29. #include "Firestore/core/src/nanopb/byte_string.h"
  30. #include "Firestore/core/src/remote/watch_change.h"
  31. namespace firebase {
  32. namespace firestore {
  33. namespace local {
  34. class TargetData;
  35. } // namespace local
  36. namespace remote {
  37. enum class BloomFilterApplicationStatus { kSuccess, kSkipped, kFalsePositive };
  38. /**
  39. * Interface implemented by `RemoteStore` to expose target metadata to the
  40. * `WatchChangeAggregator`.
  41. */
  42. class TargetMetadataProvider {
  43. public:
  44. virtual ~TargetMetadataProvider() = default;
  45. /**
  46. * Returns the set of remote document keys for the given target ID as of the
  47. * last raised snapshot.
  48. */
  49. virtual model::DocumentKeySet GetRemoteKeysForTarget(
  50. model::TargetId target_id) const = 0;
  51. /**
  52. * Returns the TargetData for an active target ID or `nullopt` if this query
  53. * has become inactive.
  54. */
  55. virtual absl::optional<local::TargetData> GetTargetDataForTarget(
  56. model::TargetId target_id) const = 0;
  57. /** Returns the database ID of the Firestore instance. */
  58. virtual const model::DatabaseId& GetDatabaseId() const = 0;
  59. };
  60. /**
  61. * A `TargetChange` specifies the set of changes for a specific target as part
  62. * of an `RemoteEvent`. These changes track which documents are added,
  63. * modified or removed, as well as the target's resume token and whether the
  64. * target is marked CURRENT.
  65. *
  66. * The actual changes *to* documents are not part of the `TargetChange` since
  67. * documents may be part of multiple targets.
  68. */
  69. class TargetChange {
  70. public:
  71. static TargetChange CreateSynthesizedTargetChange(
  72. bool current, nanopb::ByteString resume_token) {
  73. return TargetChange(std::move(resume_token), current);
  74. }
  75. TargetChange() = default;
  76. TargetChange(nanopb::ByteString resume_token,
  77. bool current,
  78. model::DocumentKeySet added_documents,
  79. model::DocumentKeySet modified_documents,
  80. model::DocumentKeySet removed_documents)
  81. : resume_token_{std::move(resume_token)},
  82. current_{current},
  83. added_documents_{std::move(added_documents)},
  84. modified_documents_{std::move(modified_documents)},
  85. removed_documents_{std::move(removed_documents)} {
  86. }
  87. /**
  88. * An opaque, server-assigned token that allows watching a target to be
  89. * resumed after disconnecting without retransmitting all the data that
  90. * matches the target. The resume token essentially identifies a point in time
  91. * from which the server should resume sending results.
  92. */
  93. const nanopb::ByteString& resume_token() const {
  94. return resume_token_;
  95. }
  96. /**
  97. * The "current" (synced) status of this target. Note that "current" has
  98. * special meaning in the RPC protocol that implies that a target is both
  99. * up-to-date and consistent with the rest of the watch stream.
  100. */
  101. bool current() const {
  102. return current_;
  103. }
  104. /**
  105. * The set of documents that were newly assigned to this target as part of
  106. * this remote event.
  107. */
  108. const model::DocumentKeySet& added_documents() const {
  109. return added_documents_;
  110. }
  111. /**
  112. * The set of documents that were already assigned to this target but received
  113. * an update during this remote event.
  114. */
  115. const model::DocumentKeySet& modified_documents() const {
  116. return modified_documents_;
  117. }
  118. /**
  119. * The set of documents that were removed from this target as part of this
  120. * remote event.
  121. */
  122. const model::DocumentKeySet& removed_documents() const {
  123. return removed_documents_;
  124. }
  125. private:
  126. TargetChange(nanopb::ByteString resume_token, bool current)
  127. : resume_token_(std::move(resume_token)), current_{current} {
  128. }
  129. nanopb::ByteString resume_token_;
  130. bool current_ = false;
  131. model::DocumentKeySet added_documents_;
  132. model::DocumentKeySet modified_documents_;
  133. model::DocumentKeySet removed_documents_;
  134. };
  135. bool operator==(const TargetChange& lhs, const TargetChange& rhs);
  136. /** Tracks the internal state of a Watch target. */
  137. class TargetState {
  138. public:
  139. /**
  140. * Whether this target has been marked 'current'.
  141. *
  142. * 'current' has special meaning in the RPC protocol: It implies that the
  143. * Watch backend has sent us all changes up to the point at which the target
  144. * was added and that the target is consistent with the rest of the watch
  145. * stream.
  146. */
  147. bool current() const {
  148. return current_;
  149. }
  150. /** The last resume token sent to us for this target. */
  151. const nanopb::ByteString& resume_token() const {
  152. return resume_token_;
  153. }
  154. /** Whether this target has pending target adds or target removes. */
  155. bool IsPending() const {
  156. return outstanding_responses_ != 0;
  157. }
  158. /** Whether we have modified any state that should trigger a snapshot. */
  159. bool HasPendingChanges() const {
  160. return has_pending_changes_;
  161. }
  162. /**
  163. * Applies the resume token to the `TargetChange`, but only when it has a new
  164. * value. Empty resume tokens are discarded.
  165. */
  166. void UpdateResumeToken(nanopb::ByteString resume_token);
  167. /**
  168. * Creates a target change from the current set of changes.
  169. *
  170. * To reset the document changes after raising this snapshot, call
  171. * `ClearPendingChanges()`.
  172. */
  173. TargetChange ToTargetChange() const;
  174. /** Resets the document changes and sets `HasPendingChanges` to false. */
  175. void ClearPendingChanges();
  176. void AddDocumentChange(const model::DocumentKey& document_key,
  177. core::DocumentViewChange::Type type);
  178. void RemoveDocumentChange(const model::DocumentKey& document_key);
  179. void RecordPendingTargetRequest();
  180. void RecordTargetResponse();
  181. void MarkCurrent();
  182. private:
  183. /**
  184. * The number of outstanding responses (adds or removes) that we are waiting
  185. * on. We only consider targets active that have no outstanding responses.
  186. */
  187. int outstanding_responses_ = 0;
  188. /**
  189. * Keeps track of the document changes since the last raised snapshot.
  190. *
  191. * These changes are continuously updated as we receive document updates and
  192. * always reflect the current set of changes against the last issued snapshot.
  193. */
  194. std::unordered_map<model::DocumentKey,
  195. core::DocumentViewChange::Type,
  196. model::DocumentKeyHash>
  197. document_changes_;
  198. nanopb::ByteString resume_token_;
  199. bool current_ = false;
  200. /**
  201. * Whether this target state should be included in the next snapshot. We
  202. * initialize to true so that newly-added targets are included in the next
  203. * RemoteEvent.
  204. */
  205. bool has_pending_changes_ = true;
  206. };
  207. /**
  208. * An event from the RemoteStore. It is split into `TargetChanges` (changes to
  209. * the state or the set of documents in our watched targets) and
  210. * `DocumentUpdates` (changes to the actual documents).
  211. */
  212. class RemoteEvent {
  213. public:
  214. using TargetChangeMap = std::unordered_map<model::TargetId, TargetChange>;
  215. using TargetMismatchMap =
  216. std::unordered_map<model::TargetId, local::QueryPurpose>;
  217. RemoteEvent(model::SnapshotVersion snapshot_version,
  218. TargetChangeMap target_changes,
  219. TargetMismatchMap target_mismatches,
  220. model::DocumentUpdateMap document_updates,
  221. model::DocumentKeySet limbo_document_changes)
  222. : snapshot_version_{snapshot_version},
  223. target_changes_{std::move(target_changes)},
  224. target_mismatches_{std::move(target_mismatches)},
  225. document_updates_{std::move(document_updates)},
  226. limbo_document_changes_{std::move(limbo_document_changes)} {
  227. }
  228. /** The snapshot version this event brings us up to. */
  229. const model::SnapshotVersion& snapshot_version() const {
  230. return snapshot_version_;
  231. }
  232. /** A map from target to changes to the target. See `TargetChange`. */
  233. const TargetChangeMap& target_changes() const {
  234. return target_changes_;
  235. }
  236. /**
  237. * A map of targets that is known to be inconsistent, and the purpose for
  238. * re-listening. Listens for these targets should be re-established without
  239. * resume tokens.
  240. */
  241. const TargetMismatchMap& target_mismatches() const {
  242. return target_mismatches_;
  243. }
  244. /**
  245. * A set of which documents have changed or been deleted, along with the doc's
  246. * new values (if not deleted).
  247. */
  248. const model::DocumentUpdateMap& document_updates() const {
  249. return document_updates_;
  250. }
  251. /**
  252. * A set of which document updates are due only to limbo resolution targets.
  253. */
  254. const model::DocumentKeySet& limbo_document_changes() const {
  255. return limbo_document_changes_;
  256. }
  257. private:
  258. model::SnapshotVersion snapshot_version_;
  259. TargetChangeMap target_changes_;
  260. TargetMismatchMap target_mismatches_;
  261. model::DocumentUpdateMap document_updates_;
  262. model::DocumentKeySet limbo_document_changes_;
  263. };
  264. /**
  265. * A helper class to accumulate watch changes into a `RemoteEvent` and other
  266. * target information.
  267. */
  268. class WatchChangeAggregator {
  269. public:
  270. explicit WatchChangeAggregator(
  271. TargetMetadataProvider* target_metadata_provider);
  272. /**
  273. * Processes and adds the `DocumentWatchChange` to the current set of changes.
  274. */
  275. void HandleDocumentChange(const DocumentWatchChange& document_change);
  276. /**
  277. * Processes and adds the `WatchTargetChange` to the current set of changes.
  278. */
  279. void HandleTargetChange(const WatchTargetChange& target_change);
  280. /**
  281. * Handles existence filters and synthesizes deletes for filter mismatches.
  282. * Targets that are invalidated by filter mismatches are added to
  283. * `pending_target_resets_`.
  284. */
  285. void HandleExistenceFilter(
  286. const ExistenceFilterWatchChange& existence_filter);
  287. /**
  288. * Converts the current state into a remote event with the snapshot version
  289. * taken from the initializer. Resets the accumulated changes before
  290. * returning.
  291. */
  292. RemoteEvent CreateRemoteEvent(const model::SnapshotVersion& snapshot_version);
  293. /** Removes the in-memory state for the provided target. */
  294. void RemoveTarget(model::TargetId target_id);
  295. /**
  296. * Increment the number of acks needed from watch before we can consider the
  297. * server to be 'in-sync' with the client's active targets.
  298. */
  299. void RecordPendingTargetRequest(model::TargetId target_id);
  300. private:
  301. /**
  302. * Returns all `TargetId`s that the watch change applies to: either the
  303. * `TargetId`s explicitly listed in the change or the `TargetId`s of all
  304. * currently active targets.
  305. */
  306. std::vector<model::TargetId> GetTargetIds(
  307. const WatchTargetChange& target_change) const;
  308. /**
  309. * Adds the provided document to the internal list of document updates and its
  310. * document key to the given target's mapping.
  311. */
  312. void AddDocumentToTarget(model::TargetId target_id,
  313. const model::MutableDocument& document);
  314. /**
  315. * Removes the provided document from the target mapping. If the document no
  316. * longer matches the target, but the document's state is still known (e.g. we
  317. * know that the document was deleted or we received the change that caused
  318. * the filter mismatch), the new document can be provided to update the remote
  319. * document cache.
  320. */
  321. void RemoveDocumentFromTarget(
  322. model::TargetId target_id,
  323. const model::DocumentKey& key,
  324. const absl::optional<model::MutableDocument>& updated_document);
  325. /**
  326. * Returns the current count of documents in the target. This includes both
  327. * the number of documents that the LocalStore considers to be part of the
  328. * target as well as any accumulated changes.
  329. */
  330. int GetCurrentDocumentCountForTarget(model::TargetId target_id);
  331. // PORTING NOTE: this method exists only for consistency with other platforms;
  332. // in C++, it's pretty much unnecessary.
  333. TargetState& EnsureTargetState(model::TargetId target_id);
  334. /**
  335. * Returns true if the given `target_id` is active. Active targets are those
  336. * for which there are no pending requests to add a listen and are in the
  337. * current list of targets the client cares about.
  338. *
  339. * Clients can repeatedly listen and stop listening to targets, so this check
  340. * is useful in preventing race conditions for a target where events arrive
  341. * but the server hasn't yet acknowledged the intended change in state.
  342. */
  343. bool IsActiveTarget(model::TargetId target_id) const;
  344. /**
  345. * Returns the `TargetData` for an active target (i.e., a target that the user
  346. * is still interested in that has no outstanding target change requests).
  347. */
  348. absl::optional<local::TargetData> TargetDataForActiveTarget(
  349. model::TargetId target_id) const;
  350. /**
  351. * Resets the state of a Watch target to its initial state (e.g. sets
  352. * 'current' to false, clears the resume token and removes its target mapping
  353. * from all documents).
  354. */
  355. void ResetTarget(model::TargetId target_id);
  356. /** Returns whether the local store considers the document to be part of the
  357. * specified target. */
  358. bool TargetContainsDocument(model::TargetId target_id,
  359. const model::DocumentKey& key);
  360. /**
  361. * Parse the bloom filter from the "unchanged_names" field of an existence
  362. * filter.
  363. */
  364. static absl::optional<BloomFilter> ParseBloomFilter(
  365. const ExistenceFilterWatchChange& existence_filter);
  366. /**
  367. * Apply bloom filter to remove the deleted documents, and return the
  368. * application status.
  369. */
  370. BloomFilterApplicationStatus ApplyBloomFilter(
  371. const BloomFilter& bloom_filter,
  372. const ExistenceFilterWatchChange& existence_filter,
  373. int current_count);
  374. /**
  375. * Filter out removed documents based on bloom filter membership result and
  376. * return number of documents removed.
  377. */
  378. int FilterRemovedDocuments(const BloomFilter& bloom_filter, int target_id);
  379. /** The internal state of all tracked targets. */
  380. std::unordered_map<model::TargetId, TargetState> target_states_;
  381. /** Keeps track of the documents to update since the last raised snapshot. */
  382. model::DocumentUpdateMap pending_document_updates_;
  383. /** A mapping of document keys to their set of target IDs. */
  384. std::unordered_map<model::DocumentKey,
  385. std::set<model::TargetId>,
  386. model::DocumentKeyHash>
  387. pending_document_target_mappings_;
  388. /**
  389. * A map of targets with existence filter mismatches. These targets are known
  390. * to be inconsistent and their listens needs to be re-established by
  391. * `RemoteStore`.
  392. */
  393. RemoteEvent::TargetMismatchMap pending_target_resets_;
  394. TargetMetadataProvider* target_metadata_provider_ = nullptr;
  395. };
  396. } // namespace remote
  397. } // namespace firestore
  398. } // namespace firebase
  399. #endif // FIRESTORE_CORE_SRC_REMOTE_REMOTE_EVENT_H_