Przeglądaj źródła

[Functions] Add support for streamable cloud functions (#14395)

Co-authored-by: Eblen Macari <eblenmacari@gmail.com>
Co-authored-by: Eblen M <joseeblen@xwf.google.com>
Co-authored-by: Rodrigo Lazo <rlazo@users.noreply.github.com>
Nick Cooke 1 rok temu
rodzic
commit
baf60edf13

+ 6 - 7
.github/workflows/functions.yml

@@ -31,8 +31,6 @@ jobs:
       matrix:
         target: [ios, tvos, macos, watchos]
         build-env:
-          - os: macos-14
-            xcode: Xcode_15.2
           - os: macos-15
             xcode: Xcode_16.2
     runs-on: ${{ matrix.build-env.os }}
@@ -43,14 +41,12 @@ jobs:
       run: sudo xcode-select -s /Applications/${{ matrix.build-env.xcode }}.app/Contents/Developer
     - name: Setup Bundler
       run: scripts/setup_bundler.sh
-    # The integration tests are flaky on Xcode 15 so only run the unit tests. The integration tests still run with SPM.
-    # - name: Integration Test Server
-    #   run: FirebaseFunctions/Backend/start.sh synchronous
+    - name: Integration Test Server
+      run: FirebaseFunctions/Backend/start.sh synchronous
     - name: Build and test
       run: |
         scripts/third_party/travis/retry.sh scripts/pod_lib_lint.rb FirebaseFunctions.podspec \
-          --test-specs=unit --platforms=${{ matrix.target }}
-
+          --platforms=${{ matrix.target }}
 
   spm-package-resolved:
     runs-on: macos-14
@@ -145,6 +141,9 @@ jobs:
         key: ${{needs.spm-package-resolved.outputs.cache_key}}
     - name: Xcode
       run: sudo xcode-select -s /Applications/${{ matrix.xcode }}.app/Contents/Developer
+    - name: Install visionOS, if needed.
+      if: matrix.target == 'visionOS'
+      run: xcodebuild -downloadPlatform visionOS
     - name: Initialize xcodebuild
       run: scripts/setup_spm_tests.sh
     - name: Unit Tests

+ 83 - 9
FirebaseFunctions/Backend/index.js

@@ -16,6 +16,14 @@ const assert = require('assert');
 const functionsV1 = require('firebase-functions/v1');
 const functionsV2 = require('firebase-functions/v2');
 
+// MARK: - Utilities
+
+function sleep(ms) {
+  return new Promise(resolve => setTimeout(resolve, ms));
+};
+
+// MARK: - Callable Functions
+
 exports.dataTest = functionsV1.https.onRequest((request, response) => {
   assert.deepEqual(request.body, {
     data: {
@@ -121,14 +129,10 @@ exports.timeoutTest = functionsV1.https.onRequest((request, response) => {
 
 const streamData = ["hello", "world", "this", "is", "cool"]
 
-function sleep(ms) {
-  return new Promise(resolve => setTimeout(resolve, ms));
-};
-
 async function* generateText() {
   for (const chunk of streamData) {
     yield chunk;
-    await sleep(1000);
+    await sleep(100);
   }
 };
 
@@ -136,7 +140,7 @@ exports.genStream = functionsV2.https.onCall(
   async (request, response) => {
     if (request.acceptsStreaming) {
       for await (const chunk of generateText()) {
-        response.sendChunk({ chunk });
+        response.sendChunk(chunk);
       }
     }
     return streamData.join(" ");
@@ -145,11 +149,81 @@ exports.genStream = functionsV2.https.onCall(
 
 exports.genStreamError = functionsV2.https.onCall(
   async (request, response) => {
+    // Note: The functions backend does not pass the error message to the
+    // client at this time.
+    throw Error("BOOM")
+  }
+);
+
+const weatherForecasts = {
+  Toronto: { conditions: 'snowy', temperature: 25 },
+  London: { conditions: 'rainy', temperature: 50 },
+  Dubai: { conditions: 'sunny', temperature: 75 }
+};
+
+async function* generateForecast(locations) {
+  for (const location of locations) {
+    yield { 'location': location,  ...weatherForecasts[location.name] };
+    await sleep(100);
+  }
+};
+
+exports.genStreamWeather = functionsV2.https.onCall(
+  async (request, response) => {
+    const forecasts = [];
     if (request.acceptsStreaming) {
-      for await (const chunk of generateText()) {
-        response.write({ chunk });
+      for await (const chunk of generateForecast(request.data)) {
+        forecasts.push(chunk)
+        response.sendChunk(chunk);
+      }
+    }
+    return { forecasts };
+  }
+);
+
+exports.genStreamWeatherError = functionsV2.https.onCall(
+  async (request, response) => {
+    if (request.acceptsStreaming) {
+      for await (const chunk of generateForecast(request.data)) {
+        // Remove the location field, since the SDK cannot decode the message
+        // if it's there.
+        delete chunk.location;
+        response.sendChunk(chunk);
+      }
+    }
+    return "Number of forecasts generated: " + request.data.length;
+  }
+);
+
+exports.genStreamEmpty = functionsV2.https.onCall(
+  async (request, response) => {
+    if (request.acceptsStreaming) {
+      // Send no chunks
+    }
+    // Implicitly return null.
+  }
+);
+
+exports.genStreamResultOnly = functionsV2.https.onCall(
+  async (request, response) => {
+    if (request.acceptsStreaming) {
+      // Do not send any chunks.
+    }
+    return "Only a result";
+  }
+);
+
+exports.genStreamLargeData = functionsV2.https.onCall(
+  async (request, response) => {
+    if (request.acceptsStreaming) {
+      const largeString = 'A'.repeat(10000);
+      const chunkSize = 1024;
+      for (let i = 0; i < largeString.length; i += chunkSize) {
+        const chunk = largeString.substring(i, i + chunkSize);
+        response.sendChunk(chunk);
+        await sleep(100);
       }
-      throw Error("BOOM")
     }
+    return "Stream Completed";
   }
 );

+ 5 - 0
FirebaseFunctions/Backend/start.sh

@@ -57,6 +57,11 @@ FUNCTIONS_BIN="./node_modules/.bin/functions"
 "${FUNCTIONS_BIN}" deploy timeoutTest --trigger-http
 "${FUNCTIONS_BIN}" deploy genStream --trigger-http
 "${FUNCTIONS_BIN}" deploy genStreamError --trigger-http
+"${FUNCTIONS_BIN}" deploy genStreamWeather --trigger-http
+"${FUNCTIONS_BIN}" deploy genStreamWeatherError --trigger-http
+"${FUNCTIONS_BIN}" deploy genStreamEmpty --trigger-http
+"${FUNCTIONS_BIN}" deploy genStreamResultOnly --trigger-http
+"${FUNCTIONS_BIN}" deploy genStreamLargeData --trigger-http
 
 if [ "$1" != "synchronous" ]; then
   # Wait for the user to tell us to stop the server.

+ 3 - 0
FirebaseFunctions/CHANGELOG.md

@@ -1,3 +1,6 @@
+# Unreleased
+- [added] Streaming callable functions are now supported.
+
 # 11.9.0
 - [fixed] Fixed App Check token reporting to enable differentiating outdated
   (`MISSING`) and inauthentic (`INVALID`) clients; see [Monitor App Check

+ 177 - 1
FirebaseFunctions/Sources/Callable+Codable.swift

@@ -15,7 +15,11 @@
 import FirebaseSharedSwift
 import Foundation
 
-/// A `Callable` is reference to a particular Callable HTTPS trigger in Cloud Functions.
+/// A `Callable` is a reference to a particular Callable HTTPS trigger in Cloud Functions.
+///
+/// - Note: If the Callable HTTPS trigger accepts no parameters, ``Never`` can be used for
+///   iOS 17.0+. Otherwise, a simple encodable placeholder type (e.g.,
+///   `struct EmptyRequest: Encodable {}`) can be used.
 public struct Callable<Request: Encodable, Response: Decodable> {
   /// The timeout to use when calling the function. Defaults to 70 seconds.
   public var timeoutInterval: TimeInterval {
@@ -160,3 +164,175 @@ public struct Callable<Request: Encodable, Response: Decodable> {
     return try await call(data)
   }
 }
+
+/// Used to determine when a `StreamResponse<_, _>` is being decoded.
+private protocol StreamResponseProtocol {}
+
+/// A convenience type used to receive both the streaming callable function's yielded messages and
+/// its return value.
+///
+/// This can be used as the generic `Response` parameter to ``Callable`` to receive both the
+/// yielded messages and final return value of the streaming callable function.
+@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
+public enum StreamResponse<Message: Decodable, Result: Decodable>: Decodable,
+  StreamResponseProtocol {
+  /// The message yielded by the callable function.
+  case message(Message)
+  /// The final result returned by the callable function.
+  case result(Result)
+
+  private enum CodingKeys: String, CodingKey {
+    case message
+    case result
+  }
+
+  public init(from decoder: any Decoder) throws {
+    do {
+      let container = try decoder
+        .container(keyedBy: Self<Message, Result>.CodingKeys.self)
+      guard let onlyKey = container.allKeys.first, container.allKeys.count == 1 else {
+        throw DecodingError
+          .typeMismatch(
+            Self<Message,
+              Result>.self,
+            DecodingError.Context(
+              codingPath: container.codingPath,
+              debugDescription: "Invalid number of keys found, expected one.",
+              underlyingError: nil
+            )
+          )
+      }
+
+      switch onlyKey {
+      case .message:
+        self = try Self
+          .message(container.decode(Message.self, forKey: .message))
+      case .result:
+        self = try Self
+          .result(container.decode(Result.self, forKey: .result))
+      }
+    } catch {
+      throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error])
+    }
+  }
+}
+
+@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
+public extension Callable where Request: Sendable, Response: Sendable {
+  /// Creates a stream that yields responses from the streaming callable function.
+  ///
+  /// The request to the Cloud Functions backend made by this method automatically includes a FCM
+  /// token to identify the app instance. If a user is logged in with Firebase Auth, an auth ID
+  /// token for the user is included. If App Check is integrated, an app check token is included.
+  ///
+  /// Firebase Cloud Messaging sends data to the Firebase backend periodically to collect
+  /// information regarding the app instance. To stop this, see `Messaging.deleteData()`. It
+  /// resumes with a new FCM Token the next time you call this method.
+  ///
+  /// - Important: The final result returned by the callable function is only accessible when
+  ///   using `StreamResponse` as the `Response` generic type.
+  ///
+  /// Example of using `stream` _without_ `StreamResponse`:
+  /// ```swift
+  /// let callable: Callable<MyRequest, MyResponse> = // ...
+  /// let request: MyRequest = // ...
+  /// let stream = try callable.stream(request)
+  /// for try await response in stream {
+  ///   // Process each `MyResponse` message
+  ///   print(response)
+  /// }
+  /// ```
+  ///
+  /// Example of using `stream` _with_ `StreamResponse`:
+  /// ```swift
+  /// let callable: Callable<MyRequest, StreamResponse<MyMessage, MyResult>> = // ...
+  /// let request: MyRequest = // ...
+  /// let stream = try callable.stream(request)
+  /// for try await response in stream {
+  ///   switch response {
+  ///   case .message(let message):
+  ///     // Process each `MyMessage`
+  ///     print(message)
+  ///   case .result(let result):
+  ///     // Process the final `MyResult`
+  ///     print(result)
+  ///   }
+  /// }
+  /// ```
+  ///
+  /// - Parameter data: The `Request` data to pass to the callable function.
+  /// - Throws: A ``FunctionsError`` if the parameter `data` cannot be encoded.
+  /// - Returns: A stream wrapping responses yielded by the streaming callable function or
+  ///   a ``FunctionsError`` if an error occurred.
+  func stream(_ data: Request? = nil) throws -> AsyncThrowingStream<Response, Error> {
+    let encoded: Any
+    do {
+      encoded = try encoder.encode(data)
+    } catch {
+      throw FunctionsError(.invalidArgument, userInfo: [NSUnderlyingErrorKey: error])
+    }
+
+    return AsyncThrowingStream { continuation in
+      Task {
+        do {
+          for try await response in callable.stream(encoded) {
+            do {
+              // This response JSON should only be able to be decoded to an `StreamResponse<_, _>`
+              // instance. If the decoding succeeds and the decoded response conforms to
+              // `StreamResponseProtocol`, we know the `Response` generic argument
+              // is `StreamResponse<_, _>`.
+              let responseJSON = switch response {
+              case .message(let json), .result(let json): json
+              }
+              let response = try decoder.decode(Response.self, from: responseJSON)
+              if response is StreamResponseProtocol {
+                continuation.yield(response)
+              } else {
+                // `Response` is a custom type that matched the decoding logic as the
+                // `StreamResponse<_, _>` type. Only the `StreamResponse<_, _>` type should decode
+                // successfully here to avoid exposing the `result` value in a custom type.
+                throw FunctionsError(.internal)
+              }
+            } catch let error as FunctionsError where error.code == .dataLoss {
+              // `Response` is of type `StreamResponse<_, _>`, but failed to decode. Rethrow.
+              throw error
+            } catch {
+              // `Response` is *not* of type `StreamResponse<_, _>`, and needs to be unboxed and
+              // decoded.
+              guard case let .message(messageJSON) = response else {
+                // Since `Response` is not a `StreamResponse<_, _>`, only messages should be
+                // decoded.
+                continue
+              }
+
+              do {
+                let boxedMessage = try decoder.decode(
+                  StreamResponseMessage.self,
+                  from: messageJSON
+                )
+                continuation.yield(boxedMessage.message)
+              } catch {
+                throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error])
+              }
+            }
+          }
+        } catch {
+          continuation.finish(throwing: error)
+        }
+        continuation.finish()
+      }
+    }
+  }
+
+  /// A container type for the type-safe decoding of the message object from the generic `Response`
+  /// type.
+  private struct StreamResponseMessage: Decodable {
+    let message: Response
+  }
+}
+
+/// A container type for differentiating between message and result responses.
+enum JSONStreamResponse {
+  case message([String: Any])
+  case result([String: Any])
+}

+ 195 - 0
FirebaseFunctions/Sources/Functions.swift

@@ -471,6 +471,201 @@ enum FunctionsConstants {
     }
   }
 
+  @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
+  func stream(at url: URL,
+              data: Any?,
+              options: HTTPSCallableOptions?,
+              timeout: TimeInterval)
+    -> AsyncThrowingStream<JSONStreamResponse, Error> {
+    AsyncThrowingStream { continuation in
+      Task {
+        let urlRequest: URLRequest
+        do {
+          let context = try await contextProvider.context(options: options)
+          urlRequest = try makeRequestForStreamableContent(
+            url: url,
+            data: data,
+            options: options,
+            timeout: timeout,
+            context: context
+          )
+        } catch {
+          continuation.finish(throwing: FunctionsError(
+            .invalidArgument,
+            userInfo: [NSUnderlyingErrorKey: error]
+          ))
+          return
+        }
+
+        let stream: URLSession.AsyncBytes
+        let rawResponse: URLResponse
+        do {
+          (stream, rawResponse) = try await URLSession.shared.bytes(for: urlRequest)
+        } catch {
+          continuation.finish(throwing: FunctionsError(
+            .unavailable,
+            userInfo: [NSUnderlyingErrorKey: error]
+          ))
+          return
+        }
+
+        // Verify the status code is an HTTP response.
+        guard let response = rawResponse as? HTTPURLResponse else {
+          continuation.finish(
+            throwing: FunctionsError(
+              .unavailable,
+              userInfo: [NSLocalizedDescriptionKey: "Response was not an HTTP response."]
+            )
+          )
+          return
+        }
+
+        // Verify the status code is a 200.
+        guard response.statusCode == 200 else {
+          continuation.finish(
+            throwing: FunctionsError(
+              httpStatusCode: response.statusCode,
+              region: region,
+              url: url,
+              body: nil,
+              serializer: serializer
+            )
+          )
+          return
+        }
+
+        do {
+          for try await line in stream.lines {
+            guard line.hasPrefix("data:") else {
+              continuation.finish(
+                throwing: FunctionsError(
+                  .dataLoss,
+                  userInfo: [NSLocalizedDescriptionKey: "Unexpected format for streamed response."]
+                )
+              )
+              return
+            }
+
+            do {
+              // We can assume 5 characters since it's utf-8 encoded, removing `data:`.
+              let jsonText = String(line.dropFirst(5))
+              let data = try jsonData(jsonText: jsonText)
+              // Handle the content and parse it.
+              let content = try callableStreamResult(fromResponseData: data, endpointURL: url)
+              continuation.yield(content)
+            } catch {
+              continuation.finish(throwing: error)
+              return
+            }
+          }
+        } catch {
+          continuation.finish(
+            throwing: FunctionsError(
+              .dataLoss,
+              userInfo: [
+                NSLocalizedDescriptionKey: "Unexpected format for streamed response.",
+                NSUnderlyingErrorKey: error,
+              ]
+            )
+          )
+          return
+        }
+
+        continuation.finish()
+      }
+    }
+  }
+
+  @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
+  private func callableStreamResult(fromResponseData data: Data,
+                                    endpointURL url: URL) throws -> JSONStreamResponse {
+    let data = try processedData(fromResponseData: data, endpointURL: url)
+
+    let responseJSONObject: Any
+    do {
+      responseJSONObject = try JSONSerialization.jsonObject(with: data)
+    } catch {
+      throw FunctionsError(.dataLoss, userInfo: [NSUnderlyingErrorKey: error])
+    }
+
+    guard let responseJSON = responseJSONObject as? [String: Any] else {
+      let userInfo = [NSLocalizedDescriptionKey: "Response was not a dictionary."]
+      throw FunctionsError(.dataLoss, userInfo: userInfo)
+    }
+
+    if let _ = responseJSON["result"] {
+      return .result(responseJSON)
+    } else if let _ = responseJSON["message"] {
+      return .message(responseJSON)
+    } else {
+      throw FunctionsError(
+        .dataLoss,
+        userInfo: [NSLocalizedDescriptionKey: "Response is missing result or message field."]
+      )
+    }
+  }
+
+  private func jsonData(jsonText: String) throws -> Data {
+    guard let data = jsonText.data(using: .utf8) else {
+      throw FunctionsError(.dataLoss, userInfo: [
+        NSUnderlyingErrorKey: DecodingError.dataCorrupted(DecodingError.Context(
+          codingPath: [],
+          debugDescription: "Could not parse response as UTF8."
+        )),
+      ])
+    }
+    return data
+  }
+
+  private func makeRequestForStreamableContent(url: URL,
+                                               data: Any?,
+                                               options: HTTPSCallableOptions?,
+                                               timeout: TimeInterval,
+                                               context: FunctionsContext) throws
+    -> URLRequest {
+    var urlRequest = URLRequest(
+      url: url,
+      cachePolicy: .useProtocolCachePolicy,
+      timeoutInterval: timeout
+    )
+
+    let data = data ?? NSNull()
+    let encoded = try serializer.encode(data)
+    let body = ["data": encoded]
+    let payload = try JSONSerialization.data(withJSONObject: body, options: [.fragmentsAllowed])
+    urlRequest.httpBody = payload
+
+    // Set the headers for starting a streaming session.
+    urlRequest.setValue("application/json", forHTTPHeaderField: "Content-Type")
+    urlRequest.setValue("text/event-stream", forHTTPHeaderField: "Accept")
+    urlRequest.httpMethod = "POST"
+
+    if let authToken = context.authToken {
+      let value = "Bearer \(authToken)"
+      urlRequest.setValue(value, forHTTPHeaderField: "Authorization")
+    }
+
+    if let fcmToken = context.fcmToken {
+      urlRequest.setValue(fcmToken, forHTTPHeaderField: Constants.fcmTokenHeader)
+    }
+
+    if options?.requireLimitedUseAppCheckTokens == true {
+      if let appCheckToken = context.limitedUseAppCheckToken {
+        urlRequest.setValue(
+          appCheckToken,
+          forHTTPHeaderField: Constants.appCheckTokenHeader
+        )
+      }
+    } else if let appCheckToken = context.appCheckToken {
+      urlRequest.setValue(
+        appCheckToken,
+        forHTTPHeaderField: Constants.appCheckTokenHeader
+      )
+    }
+
+    return urlRequest
+  }
+
   private func makeFetcher(url: URL,
                            data: Any?,
                            options: HTTPSCallableOptions?,

+ 5 - 0
FirebaseFunctions/Sources/HTTPSCallable.swift

@@ -143,4 +143,9 @@ open class HTTPSCallable: NSObject {
     try await functions
       .callFunction(at: url, withObject: data, options: options, timeout: timeoutInterval)
   }
+
+  @available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
+  func stream(_ data: Any? = nil) -> AsyncThrowingStream<JSONStreamResponse, Error> {
+    functions.stream(at: url, data: data, options: options, timeout: timeoutInterval)
+  }
 }

+ 422 - 0
FirebaseFunctions/Tests/Integration/IntegrationTests.swift

@@ -65,6 +65,7 @@ struct DataTestResponse: Decodable, Equatable {
   var code: Int32
 }
 
+/// - Important: These tests require the emulator. Run `./FirebaseFunctions/Backend/start.sh`
 class IntegrationTests: XCTestCase {
   let functions = Functions(projectID: "functions-integration-test",
                             region: "us-central1",
@@ -868,6 +869,427 @@ class IntegrationTests: XCTestCase {
   }
 }
 
+// MARK: - Streaming
+
+/// A convenience type used to represent that a callable function does not
+/// accept parameters.
+///
+/// This can be used as the generic `Request` parameter to ``Callable`` to
+/// indicate the callable function does not accept parameters.
+private struct EmptyRequest: Encodable {}
+
+@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
+extension IntegrationTests {
+  func testStream_NoArgs() async throws {
+    // 1. Custom `EmptyRequest` struct is passed as a placeholder generic arg.
+    let callable: Callable<EmptyRequest, String> = functions.httpsCallable("genStream")
+    // 2. No request data is passed when creating stream.
+    let stream = try callable.stream()
+    var streamContents: [String] = []
+    for try await response in stream {
+      streamContents.append(response)
+    }
+    XCTAssertEqual(
+      streamContents,
+      ["hello", "world", "this", "is", "cool"]
+    )
+  }
+
+  @available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *)
+  func testStream_NoArgs_UeeNever() async throws {
+    let callable: Callable<Never, String> = functions.httpsCallable("genStream")
+    let stream = try callable.stream()
+    var streamContents: [String] = []
+    for try await response in stream {
+      streamContents.append(response)
+    }
+    XCTAssertEqual(
+      streamContents,
+      ["hello", "world", "this", "is", "cool"]
+    )
+  }
+
+  func testStream_SimpleStreamResponse() async throws {
+    let callable: Callable<EmptyRequest, StreamResponse<String, String>> = functions
+      .httpsCallable("genStream")
+    let stream = try callable.stream()
+    var streamContents: [String] = []
+    for try await response in stream {
+      switch response {
+      case let .message(message):
+        streamContents.append(message)
+      case let .result(result):
+        streamContents.append(result)
+      }
+    }
+    XCTAssertEqual(
+      streamContents,
+      ["hello", "world", "this", "is", "cool", "hello world this is cool"]
+    )
+  }
+
+  func testStream_CodableString() async throws {
+    let byName: Callable<EmptyRequest, String> = functions.httpsCallable("genStream")
+    let stream = try byName.stream()
+    let result: [String] = try await stream.reduce([]) { $0 + [$1] }
+    XCTAssertEqual(result, ["hello", "world", "this", "is", "cool"])
+  }
+
+  private struct Location: Codable, Equatable {
+    let name: String
+  }
+
+  private struct WeatherForecast: Decodable, Equatable {
+    enum Conditions: String, Decodable {
+      case sunny
+      case rainy
+      case snowy
+    }
+
+    let location: Location
+    let temperature: Int
+    let conditions: Conditions
+  }
+
+  private struct WeatherForecastReport: Decodable, Equatable {
+    let forecasts: [WeatherForecast]
+  }
+
+  func testStream_CodableObject() async throws {
+    let callable: Callable<[Location], WeatherForecast> = functions
+      .httpsCallable("genStreamWeather")
+    let stream = try callable.stream([
+      Location(name: "Toronto"),
+      Location(name: "London"),
+      Location(name: "Dubai"),
+    ])
+    let result: [WeatherForecast] = try await stream.reduce([]) { $0 + [$1] }
+    XCTAssertEqual(
+      result,
+      [
+        WeatherForecast(location: Location(name: "Toronto"), temperature: 25, conditions: .snowy),
+        WeatherForecast(location: Location(name: "London"), temperature: 50, conditions: .rainy),
+        WeatherForecast(location: Location(name: "Dubai"), temperature: 75, conditions: .sunny),
+      ]
+    )
+  }
+
+  func testStream_ResponseMessageDecodingFailure() async throws {
+    let callable: Callable<[Location], StreamResponse<WeatherForecast, WeatherForecastReport>> =
+      functions
+        .httpsCallable("genStreamWeatherError")
+    let stream = try callable.stream([Location(name: "Toronto")])
+    do {
+      for try await _ in stream {
+        XCTFail("Expected error to be thrown from stream.")
+      }
+    } catch let error as FunctionsError where error.code == .dataLoss {
+      XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? DecodingError)
+    }
+  }
+
+  func testStream_ResponseResultDecodingFailure() async throws {
+    let callable: Callable<[Location], StreamResponse<WeatherForecast, String>> = functions
+      .httpsCallable("genStreamWeather")
+    let stream = try callable.stream([Location(name: "Toronto")])
+    do {
+      for try await response in stream {
+        if case .result = response {
+          XCTFail("Expected error to be thrown from stream.")
+        }
+      }
+    } catch let error as FunctionsError where error.code == .dataLoss {
+      XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? DecodingError)
+    }
+  }
+
+  func testStream_ComplexStreamResponse() async throws {
+    let callable: Callable<[Location], StreamResponse<WeatherForecast, WeatherForecastReport>> =
+      functions
+        .httpsCallable("genStreamWeather")
+    let stream = try callable.stream([
+      Location(name: "Toronto"),
+      Location(name: "London"),
+      Location(name: "Dubai"),
+    ])
+    var streamContents: [WeatherForecast] = []
+    var streamResult: WeatherForecastReport?
+    for try await response in stream {
+      switch response {
+      case let .message(message):
+        streamContents.append(message)
+      case let .result(result):
+        streamResult = result
+      }
+    }
+    XCTAssertEqual(
+      streamContents,
+      [
+        WeatherForecast(location: Location(name: "Toronto"), temperature: 25, conditions: .snowy),
+        WeatherForecast(location: Location(name: "London"), temperature: 50, conditions: .rainy),
+        WeatherForecast(location: Location(name: "Dubai"), temperature: 75, conditions: .sunny),
+      ]
+    )
+
+    try XCTAssertEqual(
+      XCTUnwrap(streamResult), WeatherForecastReport(forecasts: streamContents)
+    )
+  }
+
+  func testStream_ComplexStreamResponse_Functional() async throws {
+    let callable: Callable<[Location], StreamResponse<WeatherForecast, WeatherForecastReport>> =
+      functions
+        .httpsCallable("genStreamWeather")
+    let stream = try callable.stream([
+      Location(name: "Toronto"),
+      Location(name: "London"),
+      Location(name: "Dubai"),
+    ])
+    let result: (accumulatedMessages: [WeatherForecast], result: WeatherForecastReport?) =
+      try await stream.reduce(([], nil)) { partialResult, streamResponse in
+        switch streamResponse {
+        case let .message(message):
+          (partialResult.accumulatedMessages + [message], partialResult.result)
+        case let .result(result):
+          (partialResult.accumulatedMessages, result)
+        }
+      }
+    XCTAssertEqual(
+      result.accumulatedMessages,
+      [
+        WeatherForecast(location: Location(name: "Toronto"), temperature: 25, conditions: .snowy),
+        WeatherForecast(location: Location(name: "London"), temperature: 50, conditions: .rainy),
+        WeatherForecast(location: Location(name: "Dubai"), temperature: 75, conditions: .sunny),
+      ]
+    )
+
+    try XCTAssertEqual(
+      XCTUnwrap(result.result), WeatherForecastReport(forecasts: result.accumulatedMessages)
+    )
+  }
+
+  func testStream_Canceled() async throws {
+    let task = Task.detached { [self] in
+      let callable: Callable<EmptyRequest, String> = functions.httpsCallable("genStream")
+      let stream = try callable.stream()
+      // Since we cancel the call we are expecting an empty array.
+      return try await stream.reduce([]) { $0 + [$1] } as [String]
+    }
+    // We cancel the task and we expect a null response even if the stream was initiated.
+    task.cancel()
+    let respone = try await task.value
+    XCTAssertEqual(respone, [])
+  }
+
+  func testStream_NonexistentFunction() async throws {
+    let callable: Callable<EmptyRequest, String> = functions.httpsCallable(
+      "nonexistentFunction"
+    )
+    let stream = try callable.stream()
+    do {
+      for try await _ in stream {
+        XCTFail("Expected error to be thrown from stream.")
+      }
+    } catch let error as FunctionsError where error.code == .notFound {
+      XCTAssertEqual(error.localizedDescription, "NOT FOUND")
+    }
+  }
+
+  func testStream_StreamError() async throws {
+    let callable: Callable<EmptyRequest, String> = functions.httpsCallable("genStreamError")
+    let stream = try callable.stream()
+    do {
+      for try await _ in stream {
+        XCTFail("Expected error to be thrown from stream.")
+      }
+    } catch let error as FunctionsError where error.code == .internal {
+      XCTAssertEqual(error.localizedDescription, "INTERNAL")
+    }
+  }
+
+  func testStream_RequestEncodingFailure() async throws {
+    struct Foo: Encodable {
+      enum CodingKeys: CodingKey {}
+
+      func encode(to encoder: any Encoder) throws {
+        throw EncodingError
+          .invalidValue("", EncodingError.Context(codingPath: [], debugDescription: ""))
+      }
+    }
+    let callable: Callable<Foo, String> = functions
+      .httpsCallable("genStream")
+    do {
+      _ = try callable.stream(Foo())
+    } catch let error as FunctionsError where error.code == .invalidArgument {
+      _ = try XCTUnwrap(error.errorUserInfo[NSUnderlyingErrorKey] as? EncodingError)
+    }
+  }
+
+  /// This tests an edge case to assert that if a custom `Response` is used
+  /// that matches the decoding logic of `StreamResponse`, the custom
+  /// `Response` does not decode successfully.
+  func testStream_ResultIsOnlyExposedInStreamResponse() async throws {
+    // The implementation is copied from `StreamResponse`. The only difference is the do-catch is
+    // removed from the decoding initializer.
+    enum MyStreamResponse<Message: Decodable, Result: Decodable>: Decodable {
+      /// The message yielded by the callable function.
+      case message(Message)
+      /// The final result returned by the callable function.
+      case result(Result)
+
+      private enum CodingKeys: String, CodingKey {
+        case message
+        case result
+      }
+
+      public init(from decoder: any Decoder) throws {
+        let container = try decoder
+          .container(keyedBy: Self<Message, Result>.CodingKeys.self)
+        var allKeys = ArraySlice(container.allKeys)
+        guard let onlyKey = allKeys.popFirst(), allKeys.isEmpty else {
+          throw DecodingError
+            .typeMismatch(
+              Self<Message,
+                Result>.self,
+              DecodingError.Context(
+                codingPath: container.codingPath,
+                debugDescription: "Invalid number of keys found, expected one.",
+                underlyingError: nil
+              )
+            )
+        }
+
+        switch onlyKey {
+        case .message:
+          self = try Self
+            .message(container.decode(Message.self, forKey: .message))
+        case .result:
+          self = try Self
+            .result(container.decode(Result.self, forKey: .result))
+        }
+      }
+    }
+
+    let callable: Callable<[Location], MyStreamResponse<WeatherForecast, WeatherForecastReport>> =
+      functions
+        .httpsCallable("genStreamWeather")
+    let stream = try callable.stream([Location(name: "Toronto")])
+    do {
+      for try await _ in stream {
+        XCTFail("Expected error to be thrown from stream.")
+      }
+    } catch let error as FunctionsError where error.code == .dataLoss {
+      XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? DecodingError)
+    }
+  }
+
+  func testStream_ForNonStreamingCF3() async throws {
+    let callable: Callable<Int16, Int> = functions.httpsCallable("scalarTest")
+    let stream = try callable.stream(17)
+    do {
+      for try await _ in stream {
+        XCTFail("Expected error to be thrown from stream.")
+      }
+    } catch let error as FunctionsError where error.code == .dataLoss {
+      XCTAssertEqual(error.localizedDescription, "Unexpected format for streamed response.")
+    }
+  }
+
+  func testStream_EmptyStream() async throws {
+    let callable: Callable<EmptyRequest, String> = functions.httpsCallable("genStreamEmpty")
+    var streamContents: [String] = []
+    for try await response in try callable.stream() {
+      streamContents.append(response)
+    }
+    XCTAssertEqual(streamContents, [])
+  }
+
+  func testStream_ResultOnly() async throws {
+    let callable: Callable<EmptyRequest, String> = functions.httpsCallable("genStreamResultOnly")
+    let stream = try callable.stream()
+    for try await _ in stream {
+      // The stream should not yield anything, so this should not be reached.
+      XCTFail("Stream should not yield any messages")
+    }
+    // Because StreamResponse was not used, the result is not accessible,
+    // but the message should not throw.
+  }
+
+  func testStream_ResultOnly_StreamResponse() async throws {
+    struct EmptyResponse: Decodable {}
+    let callable: Callable<EmptyRequest, StreamResponse<EmptyResponse, String>> = functions
+      .httpsCallable(
+        "genStreamResultOnly"
+      )
+    let stream = try callable.stream()
+    var streamResult = ""
+    for try await response in stream {
+      switch response {
+      case .message:
+        XCTFail("Stream should not yield any messages")
+      case let .result(result):
+        streamResult = result
+      }
+    }
+    // The hardcoded string matches the CF3's return value.
+    XCTAssertEqual(streamResult, "Only a result")
+  }
+
+  func testStream_UnexpectedType() async throws {
+    // This function yields strings, not integers.
+    let callable: Callable<EmptyRequest, Int> = functions.httpsCallable("genStream")
+    let stream = try callable.stream()
+    do {
+      for try await _ in stream {
+        XCTFail("Expected error to be thrown from stream.")
+      }
+    } catch let error as FunctionsError where error.code == .dataLoss {
+      XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? DecodingError)
+    }
+  }
+
+  func testStream_Timeout() async throws {
+    var callable: Callable<EmptyRequest, String> = functions.httpsCallable("timeoutTest")
+    // Set a short timeout
+    callable.timeoutInterval = 0.01 // 10 milliseconds
+
+    let stream = try callable.stream()
+
+    do {
+      for try await _ in stream {
+        XCTFail("Expected error to be thrown from stream.")
+      }
+    } catch let error as FunctionsError where error.code == .unavailable {
+      // This should be a timeout error.
+      XCTAssertEqual(
+        error.localizedDescription,
+        "The operation couldn’t be completed. (com.firebase.functions error 14.)"
+      )
+      XCTAssertNotNil(error.errorUserInfo[NSUnderlyingErrorKey] as? URLError)
+    }
+  }
+
+  func testStream_LargeData() async throws {
+    func generateLargeString() -> String {
+      var largeString = ""
+      for _ in 0 ..< 10000 {
+        largeString += "A"
+      }
+      return largeString
+    }
+    let callable: Callable<EmptyRequest, String> = functions.httpsCallable("genStreamLargeData")
+    let stream = try callable.stream()
+    var concatenatedData = ""
+    for try await response in stream {
+      concatenatedData += response
+    }
+    // Assert that the concatenated data matches the expected large data.
+    XCTAssertEqual(concatenatedData, generateLargeString())
+  }
+}
+
+// MARK: - Helpers
+
 private class AuthTokenProvider: AuthInterop {
   func getUserID() -> String? {
     return "fake user"