AsyncMessageSequence.swift 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. //
  2. // Sources/SwiftProtobuf/AsyncMessageSequence.swift - Async sequence over binary delimited protobuf
  3. //
  4. // Copyright (c) 2023 Apple Inc. and the project authors
  5. // Licensed under Apache License v2.0 with Runtime Library Exception
  6. //
  7. // See LICENSE.txt for license information:
  8. // https://github.com/apple/swift-protobuf/blob/main/LICENSE.txt
  9. //
  10. // -----------------------------------------------------------------------------
  11. ///
  12. /// An async sequence of messages decoded from a binary delimited protobuf stream.
  13. ///
  14. // -----------------------------------------------------------------------------
  15. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
  16. extension AsyncSequence where Element == UInt8 {
  17. /// Creates an asynchronous sequence of size-delimited messages from this sequence of bytes.
  18. /// Delimited format allows a single file or stream to contain multiple messages. A delimited message
  19. /// is a varint encoding the message size followed by a message of exactly that size.
  20. ///
  21. /// - Parameters:
  22. /// - messageType: The type of message to read.
  23. /// - extensions: An ``ExtensionMap`` used to look up and decode any extensions in
  24. /// messages encoded by this sequence, or in messages nested within these messages.
  25. /// - partial: If `false` (the default), after decoding a message, ``Message/isInitialized-6abgi`
  26. /// will be checked to ensure all fields are present.
  27. /// - options: The ``BinaryDecodingOptions`` to use.
  28. /// - Returns: An asynchronous sequence of messages read from the `AsyncSequence` of bytes.
  29. @inlinable
  30. public func binaryProtobufDelimitedMessages<M: Message>(
  31. of messageType: M.Type = M.self,
  32. extensions: (any ExtensionMap)? = nil,
  33. partial: Bool = false,
  34. options: BinaryDecodingOptions = BinaryDecodingOptions()
  35. ) -> AsyncMessageSequence<Self, M> {
  36. AsyncMessageSequence<Self, M>(
  37. base: self,
  38. extensions: extensions,
  39. partial: partial,
  40. options: options
  41. )
  42. }
  43. }
  44. /// An asynchronous sequence of messages decoded from an asynchronous sequence of bytes.
  45. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
  46. public struct AsyncMessageSequence<
  47. Base: AsyncSequence,
  48. M: Message
  49. >: AsyncSequence where Base.Element == UInt8 {
  50. /// The message type in this asynchronous sequence.
  51. public typealias Element = M
  52. private let base: Base
  53. private let extensions: (any ExtensionMap)?
  54. private let partial: Bool
  55. private let options: BinaryDecodingOptions
  56. /// Reads size-delimited messages from the given sequence of bytes. Delimited
  57. /// format allows a single file or stream to contain multiple messages. A delimited message
  58. /// is a varint encoding the message size followed by a message of exactly that size.
  59. ///
  60. /// - Parameters:
  61. /// - baseSequence: The `AsyncSequence` to read messages from.
  62. /// - extensions: An ``ExtensionMap`` used to look up and decode any extensions in
  63. /// messages encoded by this sequence, or in messages nested within these messages.
  64. /// - partial: If `false` (the default), after decoding a message, ``Message/isInitialized-6abgi``
  65. /// will be checked to ensure all fields are present.
  66. /// - options: The ``BinaryDecodingOptions`` to use.
  67. /// - Returns: An asynchronous sequence of messages read from the `AsyncSequence` of bytes.
  68. public init(
  69. base: Base,
  70. extensions: (any ExtensionMap)? = nil,
  71. partial: Bool = false,
  72. options: BinaryDecodingOptions = BinaryDecodingOptions()
  73. ) {
  74. self.base = base
  75. self.extensions = extensions
  76. self.partial = partial
  77. self.options = options
  78. }
  79. /// An asynchronous iterator that produces the messages of this asynchronous sequence.
  80. public struct AsyncIterator: AsyncIteratorProtocol {
  81. @usableFromInline
  82. var iterator: Base.AsyncIterator?
  83. @usableFromInline
  84. let extensions: (any ExtensionMap)?
  85. @usableFromInline
  86. let partial: Bool
  87. @usableFromInline
  88. let options: BinaryDecodingOptions
  89. init(
  90. iterator: Base.AsyncIterator,
  91. extensions: (any ExtensionMap)?,
  92. partial: Bool,
  93. options: BinaryDecodingOptions
  94. ) {
  95. self.iterator = iterator
  96. self.extensions = extensions
  97. self.partial = partial
  98. self.options = options
  99. }
  100. /// Asynchronously reads the next varint.
  101. @inlinable
  102. mutating func nextVarInt() async throws -> UInt64? {
  103. var messageSize: UInt64 = 0
  104. var shift: UInt64 = 0
  105. while let byte = try await iterator?.next() {
  106. messageSize |= UInt64(byte & 0x7f) << shift
  107. shift += UInt64(7)
  108. if shift > 35 {
  109. iterator = nil
  110. throw SwiftProtobufError.BinaryStreamDecoding.malformedLength()
  111. }
  112. if byte & 0x80 == 0 {
  113. return messageSize
  114. }
  115. }
  116. if shift > 0 {
  117. // The stream has ended inside a varint.
  118. iterator = nil
  119. throw BinaryDelimited.Error.truncated
  120. }
  121. return nil // End of stream reached.
  122. }
  123. /// Helper to read the given number of bytes.
  124. @usableFromInline
  125. mutating func readBytes(_ size: Int) async throws -> [UInt8] {
  126. // Even though the bytes are read in chunks, things can still hard fail if
  127. // there isn't enough memory to append to have all the bytes at once for
  128. // parsing; but this at least catches some possible OOM attacks.
  129. var bytesNeeded = size
  130. var buffer = [UInt8]()
  131. let kChunkSize = 16 * 1024 * 1024
  132. var chunk = [UInt8](repeating: 0, count: Swift.min(bytesNeeded, kChunkSize))
  133. while bytesNeeded > 0 {
  134. var consumedBytes = 0
  135. let maxLength = Swift.min(bytesNeeded, chunk.count)
  136. while consumedBytes < maxLength {
  137. guard let byte = try await iterator?.next() else {
  138. // The iterator hit the end, but the chunk wasn't filled, so the full
  139. // payload wasn't read.
  140. throw BinaryDelimited.Error.truncated
  141. }
  142. chunk[consumedBytes] = byte
  143. consumedBytes += 1
  144. }
  145. if consumedBytes < chunk.count {
  146. buffer += chunk[0..<consumedBytes]
  147. } else {
  148. buffer += chunk
  149. }
  150. bytesNeeded -= maxLength
  151. }
  152. return buffer
  153. }
  154. /// Asynchronously advances to the next message and returns it, or ends the
  155. /// sequence if there is no next message.
  156. ///
  157. /// - Returns: The next message, if it exists, or `nil` to signal the end of
  158. /// the sequence.
  159. @inlinable
  160. public mutating func next() async throws -> M? {
  161. guard let messageSize = try await nextVarInt() else {
  162. iterator = nil
  163. return nil
  164. }
  165. guard messageSize <= UInt64(0x7fff_ffff) else {
  166. iterator = nil
  167. throw SwiftProtobufError.BinaryDecoding.tooLarge()
  168. }
  169. if messageSize == 0 {
  170. return try M(
  171. serializedBytes: [],
  172. extensions: extensions,
  173. partial: partial,
  174. options: options
  175. )
  176. }
  177. let buffer = try await readBytes(Int(messageSize))
  178. return try M(
  179. serializedBytes: buffer,
  180. extensions: extensions,
  181. partial: partial,
  182. options: options
  183. )
  184. }
  185. }
  186. /// Creates the asynchronous iterator that produces elements of this
  187. /// asynchronous sequence.
  188. ///
  189. /// - Returns: An instance of the `AsyncIterator` type used to produce
  190. /// messages in the asynchronous sequence.
  191. public func makeAsyncIterator() -> AsyncMessageSequence.AsyncIterator {
  192. AsyncIterator(
  193. iterator: base.makeAsyncIterator(),
  194. extensions: extensions,
  195. partial: partial,
  196. options: options
  197. )
  198. }
  199. }
  200. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
  201. extension AsyncMessageSequence: Sendable where Base: Sendable {}
  202. @available(*, unavailable)
  203. extension AsyncMessageSequence.AsyncIterator: Sendable {}