BroadcastServerSocketConnection.swift 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. * Copyright 2024 LiveKit
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. import Darwin
  17. import Foundation
  18. #if canImport(CHeaders)
  19. import CHeaders
  20. #endif
  /// Listens on a Unix domain stream socket bound to a file path and wraps the
  /// accepted client connection in an `InputStream`/`OutputStream` pair.
  ///
  /// Lifecycle: the initializer creates the socket, `open()` binds it and starts
  /// listening, and `close()` tears down the streams and the listening socket.
  /// Events of the input stream are delivered to the `StreamDelegate` passed to
  /// the initializer; the output stream gets no delegate.
  class BroadcastServerSocketConnection: NSObject {
      /// Delegate assigned to the input stream of an accepted connection.
      private let streamDelegate: StreamDelegate

      /// File system path the socket is bound to.
      private let filePath: String
      /// Listening socket descriptor, or `-1` once ownership has been released in `close()`.
      private var socketHandle: Int32 = -1
      /// Address produced by `setupAddress()` and consumed by `bindSocket()`.
      private var address: sockaddr_un?

      private var inputStream: InputStream?
      private var outputStream: OutputStream?

      /// Read source that fires when the listening socket has a pending connection.
      private var listeningSource: DispatchSourceRead?
      private var networkQueue: DispatchQueue?
      /// Run loop the streams were scheduled on; needed to unschedule them and to wake
      /// the thread that is blocked running it.
      /// NOTE(review): like `shouldKeepRunning`, this is touched from two threads
      /// without synchronization.
      private var streamRunLoop: RunLoop?
      private var shouldKeepRunning = false

      /// Creates the underlying `AF_UNIX` stream socket.
      ///
      /// - Parameters:
      ///   - path: File system path the socket will be bound to by `open()`.
      ///   - streamDelegate: Delegate for the input stream of an accepted connection.
      /// - Returns: `nil` when the socket cannot be created.
      init?(filePath path: String, streamDelegate: StreamDelegate) {
          self.streamDelegate = streamDelegate
          filePath = path
          socketHandle = socket(AF_UNIX, SOCK_STREAM, 0)

          guard socketHandle >= 0 else {
              logger.log(level: .debug, "failure: create socket")
              return nil
          }
      }

      deinit {
          // The accept handler no longer keeps `self` alive, so make sure the descriptor
          // and the run-loop thread are released even if the owner never called `close()`.
          close()
      }

      /// Binds the socket to `filePath`, starts listening and installs the accept handler.
      ///
      /// - Returns: `true` when the socket is listening; `false` if building the address,
      ///   binding, the socket-file check or `listen` failed.
      func open() -> Bool {
          logger.log(level: .debug, "open socket connection")

          guard setupAddress() else {
              logger.log(level: .debug, "failed setting up address")
              return false
          }

          guard bindSocket() else {
              return false
          }

          guard FileManager.default.fileExists(atPath: filePath) else {
              logger.log(level: .debug, "failure: socket file missing")
              return false
          }

          guard Darwin.listen(socketHandle, 10) >= 0 else {
              logger.log(level: .debug, "failure: socket failed listening connection")
              return false
          }

          // Capture the descriptor by value so the handlers never depend on `self` state.
          let descriptor = socketHandle
          let source = DispatchSource.makeReadSource(fileDescriptor: descriptor)

          // `self` owns the source, so the handler must not own `self` (retain cycle).
          source.setEventHandler { [weak self] in
              guard let self = self else { return }

              let clientSocket = Darwin.accept(descriptor, nil, nil)
              guard clientSocket >= 0 else {
                  logger.log(level: .debug, "failure: socket failed accepting connection")
                  return
              }

              self.setupStreams(clientSocket: clientSocket)
              self.inputStream?.open()
              self.outputStream?.open()
              logger.log(level: .debug, "streams open")
          }

          // The descriptor may only be closed once the source has let go of it,
          // which is what the cancel handler signals.
          source.setCancelHandler {
              Darwin.close(descriptor)
          }

          listeningSource = source
          source.resume()
          return true
      }

      /// Unschedules and closes the streams, then stops listening and releases the socket.
      ///
      /// Safe to call more than once: after the first call the descriptor is no longer owned.
      func close() {
          unscheduleStreams()

          inputStream?.delegate = nil
          outputStream?.delegate = nil

          inputStream?.close()
          outputStream?.close()

          inputStream = nil
          outputStream = nil

          logger.log(level: .debug, "closing server socket")

          if let source = listeningSource {
              // The cancel handler installed in `open()` closes the descriptor.
              listeningSource = nil
              source.cancel()
          } else if socketHandle >= 0 {
              // No source was ever created (e.g. `open()` failed or was not called).
              Darwin.close(socketHandle)
          }
          socketHandle = -1
      }

      /// Writes bytes to the output stream of the accepted connection.
      ///
      /// - Parameters:
      ///   - buffer: Pointer to the bytes to write.
      ///   - length: Maximum number of bytes to write.
      /// - Returns: The result of `OutputStream.write(_:maxLength:)`, or `0` when there
      ///   is no output stream.
      func writeToStream(buffer: UnsafePointer<UInt8>, maxLength length: Int) -> Int {
          outputStream?.write(buffer, maxLength: length) ?? 0
      }

      /// Builds the `sockaddr_un` for `filePath` and removes any existing file at that path.
      ///
      /// - Returns: `false` when the UTF-8 encoded path does not fit into `sun_path`.
      private func setupAddress() -> Bool {
          var addr = sockaddr_un()
          addr.sun_family = sa_family_t(AF_UNIX)

          // Measure in UTF-8 bytes, not Characters: that is what ends up in `sun_path`.
          let pathBytes = Array(filePath.utf8)
          let capacity = MemoryLayout.size(ofValue: addr.sun_path)

          // Strictly smaller so that at least one trailing NUL byte remains.
          guard pathBytes.count < capacity else {
              logger.log(level: .debug, "failure: fd path is too long")
              return false
          }

          _ = filePath.withCString {
              unlink($0)
          }

          // `sockaddr_un()` is zero-initialized, so the copied path stays NUL-terminated.
          withUnsafeMutableBytes(of: &addr.sun_path) { destination in
              destination.copyBytes(from: pathBytes)
          }

          address = addr
          return true
      }

      /// Binds `socketHandle` to the address prepared by `setupAddress()`.
      ///
      /// - Returns: `false` when no address is available or `bind` fails.
      private func bindSocket() -> Bool {
          guard var addr = address else {
              logger.log(level: .debug, "failure: no address?")
              return false
          }

          let status = withUnsafePointer(to: &addr) { ptr in
              ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) {
                  Darwin.bind(socketHandle, $0, socklen_t(MemoryLayout<sockaddr_un>.size))
              }
          }

          guard status == noErr else {
              logger.log(level: .debug, "failure: \(status)")
              return false
          }

          return true
      }

      /// Creates the stream pair for an accepted client socket and schedules it.
      ///
      /// Both streams are configured to close the native socket when they are closed.
      /// NOTE(review): a second accepted client replaces the stored streams without
      /// closing the previous pair — confirm only one client is ever expected.
      private func setupStreams(clientSocket: Int32) {
          var readStream: Unmanaged<CFReadStream>?
          var writeStream: Unmanaged<CFWriteStream>?

          CFStreamCreatePairWithSocket(kCFAllocatorDefault, clientSocket, &readStream, &writeStream)

          inputStream = readStream?.takeRetainedValue()
          inputStream?.delegate = streamDelegate
          inputStream?.setProperty(kCFBooleanTrue, forKey: Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String))

          outputStream = writeStream?.takeRetainedValue()
          outputStream?.setProperty(kCFBooleanTrue, forKey: Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String))

          scheduleStreams()
      }

      /// Schedules both streams on the run loop of a background thread and keeps that
      /// run loop spinning until `shouldKeepRunning` is cleared.
      private func scheduleStreams() {
          shouldKeepRunning = true

          let queue = DispatchQueue.global(qos: .userInitiated)
          networkQueue = queue
          queue.async { [weak self] in
              // Remember the exact run loop so `unscheduleStreams()` can target it later.
              let runLoop = RunLoop.current
              self?.streamRunLoop = runLoop

              self?.inputStream?.schedule(in: runLoop, forMode: .default)
              self?.outputStream?.schedule(in: runLoop, forMode: .default)

              logger.log(level: .debug, "streams scheduled")

              var isRunning = false
              repeat {
                  isRunning = (self?.shouldKeepRunning ?? false)
                      && runLoop.run(mode: .default, before: .distantFuture)
              } while isRunning

              logger.log(level: .debug, "streams stopped")
          }
      }

      /// Removes the streams from the run loop they were scheduled on and lets the
      /// background run-loop thread exit.
      private func unscheduleStreams() {
          logger.log(level: .debug, "unscheduleStreams")

          // Clear the flag first so the loop exits as soon as the run loop returns.
          shouldKeepRunning = false

          guard let runLoop = streamRunLoop else { return }
          streamRunLoop = nil

          // Same run loop and same mode that were used for scheduling.
          inputStream?.remove(from: runLoop, forMode: .default)
          outputStream?.remove(from: runLoop, forMode: .default)

          // Wake the thread blocked in `run(mode:before:)` so it can observe the flag.
          CFRunLoopStop(runLoop.getCFRunLoop())
      }
  }