// schedule.h (4.2 KB)
  1. /*
  2. * Copyright 2020 Google LLC
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef FIRESTORE_CORE_SRC_UTIL_SCHEDULE_H_
  17. #define FIRESTORE_CORE_SRC_UTIL_SCHEDULE_H_
  18. #include <algorithm>
  19. #include <condition_variable> // NOLINT(build/c++11)
  20. #include <deque>
  21. #include <mutex> // NOLINT(build/c++11)
  22. #include <vector>
  23. #include "Firestore/core/src/util/executor.h"
  24. namespace firebase {
  25. namespace firestore {
  26. namespace util {
  27. class Task;
  28. // A thread-safe class similar to a priority queue where the entries are
  29. // prioritized by the time for which they're scheduled. Entries scheduled for
  30. // the exact same time are prioritized in FIFO order.
  31. //
  32. // The main function of `Schedule` is `PopBlocking`, which sleeps until an entry
  33. // becomes available. It correctly handles entries being asynchronously added or
  34. // removed from the schedule.
  35. //
  36. // The details of time management are completely concealed within the class.
  37. // Once an entry is scheduled, there is no way to reschedule or even retrieve
  38. // the time.
  39. class Schedule {
  40. // Internal invariants:
  41. // - entries are always in sorted order, leftmost entry is always the most
  42. // due;
  43. // - each operation modifying the queue notifies the condition variable `cv_`.
  44. public:
  45. using Duration = Executor::Milliseconds;
  46. using Clock = Executor::Clock;
  47. // Entries are scheduled using absolute time.
  48. using TimePoint = Executor::TimePoint;
  49. ~Schedule();
  50. void Clear();
  51. // Schedules a task for it's specified target time.
  52. void Push(Task* task);
  53. // If the queue contains at least one entry for which the scheduled time is
  54. // due now (according to the system clock), removes the entry which is the
  55. // most overdue from the queue and returns it. If no entry is due, returns
  56. // `nullptr`.
  57. Task* PopIfDue();
  58. // Blocks until at least one entry is available for which the scheduled time
  59. // is due now (according to the system clock), removes the entry which is the
  60. // most overdue from the queue and returns it. The function will attempt to
  61. // minimize both the waiting time and busy waiting.
  62. Task* PopBlocking();
  63. bool empty() const;
  64. size_t size() const;
  65. // Removes the first entry satisfying predicate from the queue and returns it.
  66. // If no such entry exists, returns `nullptr`. The predicate is applied to
  67. // entries in order according to their scheduled time.
  68. //
  69. // Note that this function doesn't take into account whether the removed entry
  70. // is past its due time.
  71. template <typename Pred>
  72. Task* RemoveIf(const Pred pred) {
  73. std::lock_guard<std::mutex> lock{mutex_};
  74. for (auto iter = scheduled_.begin(), end = scheduled_.end(); iter != end;
  75. ++iter) {
  76. Task* task = *iter;
  77. if (pred(*task)) {
  78. return ExtractLocked(iter);
  79. }
  80. }
  81. return nullptr;
  82. }
  83. // Checks whether the queue contains an entry satisfying the given predicate.
  84. template <typename Pred>
  85. bool Contains(const Pred pred) const {
  86. std::lock_guard<std::mutex> lock{mutex_};
  87. return std::any_of(scheduled_.begin(), scheduled_.end(),
  88. [&pred](Task* t) { return pred(*t); });
  89. }
  90. private:
  91. // All removals are on the front, but most insertions are expected to be on
  92. // the back.
  93. using Container = std::deque<Task*>;
  94. using Iterator = typename Container::iterator;
  95. void InsertPreservingOrder(Task* new_entry);
  96. // This function expects the mutex to be already locked.
  97. bool HasDueLocked() const;
  98. // This function expects the mutex to be already locked.
  99. Task* ExtractLocked(const Iterator where);
  100. mutable std::mutex mutex_;
  101. std::condition_variable cv_;
  102. Container scheduled_;
  103. };
  104. } // namespace util
  105. } // namespace firestore
  106. } // namespace firebase
  107. #endif // FIRESTORE_CORE_SRC_UTIL_SCHEDULE_H_