|
|
@@ -53,6 +53,8 @@ const AsyncQueue::Milliseconds kBackoffInitialDelay{std::chrono::seconds(1)};
|
|
|
const AsyncQueue::Milliseconds kBackoffMaxDelay{std::chrono::seconds(60)};
|
|
|
/** The time a stream stays open after it is marked idle. */
|
|
|
const AsyncQueue::Milliseconds kIdleTimeout{std::chrono::seconds(60)};
|
|
|
+/** The time a stream stays open until we consider it healthy. */
|
|
|
+const AsyncQueue::Milliseconds kHealthyTimeout{std::chrono::seconds(10)};
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
@@ -60,20 +62,22 @@ Stream::Stream(const std::shared_ptr<AsyncQueue>& worker_queue,
|
|
|
std::shared_ptr<AuthCredentialsProvider> credentials_provider,
|
|
|
GrpcConnection* grpc_connection,
|
|
|
TimerId backoff_timer_id,
|
|
|
- TimerId idle_timer_id)
|
|
|
+ TimerId idle_timer_id,
|
|
|
+ TimerId health_check_timer_id)
|
|
|
: backoff_{worker_queue, backoff_timer_id, kBackoffFactor,
|
|
|
kBackoffInitialDelay, kBackoffMaxDelay},
|
|
|
credentials_provider_{std::move(credentials_provider)},
|
|
|
worker_queue_{worker_queue},
|
|
|
grpc_connection_{grpc_connection},
|
|
|
- idle_timer_id_{idle_timer_id} {
|
|
|
+ idle_timer_id_{idle_timer_id},
|
|
|
+ health_check_timer_id_{health_check_timer_id} {
|
|
|
}
|
|
|
|
|
|
// Check state
|
|
|
|
|
|
bool Stream::IsOpen() const {
|
|
|
EnsureOnQueue();
|
|
|
- return state_ == State::Open;
|
|
|
+ return state_ == State::Open || state_ == State::Healthy;
|
|
|
}
|
|
|
|
|
|
bool Stream::IsStarted() const {
|
|
|
@@ -147,6 +151,15 @@ void Stream::OnStreamStart() {
|
|
|
|
|
|
state_ = State::Open;
|
|
|
NotifyStreamOpen();
|
|
|
+
|
|
|
+ health_check_ = worker_queue_->EnqueueAfterDelay(
|
|
|
+ kHealthyTimeout, health_check_timer_id_, [this] {
|
|
|
+ {
|
|
|
+ if (IsOpen()) {
|
|
|
+ state_ = State::Healthy;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
// Backoff
|
|
|
@@ -249,6 +262,7 @@ void Stream::Close(const Status& status) {
|
|
|
// execute).
|
|
|
CancelIdleCheck();
|
|
|
backoff_.Cancel();
|
|
|
+ health_check_.Cancel();
|
|
|
|
|
|
// Step 3 (both): increment close count, which invalidates long-lived
|
|
|
// callbacks, guaranteeing they won't execute against a new instance of the
|
|
|
@@ -288,9 +302,14 @@ void Stream::HandleErrorStatus(const Status& status) {
|
|
|
"%s Using maximum backoff delay to prevent overloading the backend.",
|
|
|
GetDebugDescription());
|
|
|
backoff_.ResetToMax();
|
|
|
- } else if (status.code() == Error::kErrorUnauthenticated) {
|
|
|
- // "unauthenticated" error means the token was rejected. Try force
|
|
|
- // refreshing it in case it just expired.
|
|
|
+ } else if (status.code() == Error::kErrorUnauthenticated &&
|
|
|
+ state_ != State::Healthy) {
|
|
|
+ // "unauthenticated" error means the token was rejected. This should rarely
|
|
|
+ // happen since both Auth and AppCheck ensure a sufficient TTL when we
|
|
|
+ // request a token. If a user manually resets their system clock this can
|
|
|
+ // fail, however. In this case, we should get a kErrorUnauthenticated error
|
|
|
+ // before we received the first message and we need to invalidate the token
|
|
|
+ // to ensure that we fetch a new token.
|
|
|
credentials_provider_->InvalidateToken();
|
|
|
}
|
|
|
}
|