| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- /*
- * Copyright 2017 Google
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #import "Firebase/Messaging/FIRMessagingSecureSocket.h"
- #import <Protobuf/GPBCodedOutputStream.h>
- #import <Protobuf/GPBMessage.h>
- #import <Protobuf/GPBUtilities.h>
- #import "Firebase/Messaging/FIRMessagingCodedInputStream.h"
- #import "Firebase/Messaging/FIRMessagingDefines.h"
- #import "Firebase/Messaging/FIRMessagingLogger.h"
- #import "Firebase/Messaging/FIRMessagingPacketQueue.h"
- static const NSUInteger kMaxBufferLength = 1024 * 1024; // 1M
- static const NSUInteger kBufferLengthIncrement = 16 * 1024; // 16k
- static const uint8_t kVersion = 40;
- static const uint8_t kInvalidTag = -1;
- typedef NS_ENUM(NSUInteger, FIRMessagingSecureSocketReadResult) {
- kFIRMessagingSecureSocketReadResultNone,
- kFIRMessagingSecureSocketReadResultIncomplete,
- kFIRMessagingSecureSocketReadResultCorrupt,
- kFIRMessagingSecureSocketReadResultSuccess
- };
- static int32_t LogicalRightShift32(int32_t value, int32_t spaces) {
- return (int32_t)((uint32_t)(value) >> spaces);
- }
- static NSUInteger SerializedSize(int32_t value) {
- NSUInteger bytes = 0;
- while (YES) {
- if ((value & ~0x7F) == 0) {
- bytes += sizeof(uint8_t);
- return bytes;
- } else {
- bytes += sizeof(uint8_t);
- value = LogicalRightShift32(value, 7);
- }
- }
- }
- @interface FIRMessagingSecureSocket () <NSStreamDelegate>
- @property(nonatomic, readwrite, assign) FIRMessagingSecureSocketState state;
- @property(nonatomic, readwrite, strong) NSInputStream *inStream;
- @property(nonatomic, readwrite, strong) NSOutputStream *outStream;
- @property(nonatomic, readwrite, strong) NSMutableData *inputBuffer;
- @property(nonatomic, readwrite, assign) NSUInteger inputBufferLength;
- @property(nonatomic, readwrite, strong) NSMutableData *outputBuffer;
- @property(nonatomic, readwrite, assign) NSUInteger outputBufferLength;
- @property(nonatomic, readwrite, strong) FIRMessagingPacketQueue *packetQueue;
- @property(nonatomic, readwrite, assign) BOOL isVersionSent;
- @property(nonatomic, readwrite, assign) BOOL isVersionReceived;
- @property(nonatomic, readwrite, assign) BOOL isInStreamOpen;
- @property(nonatomic, readwrite, assign) BOOL isOutStreamOpen;
- @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;
- @property(nonatomic, readwrite, strong) NSString *currentRmqIdBeingSent;
- @property(nonatomic, readwrite, assign) int8_t currentProtoTypeBeingSent;
- @end
- @implementation FIRMessagingSecureSocket
- - (instancetype)init {
- self = [super init];
- if (self) {
- _state = kFIRMessagingSecureSocketNotOpen;
- _inputBuffer = [NSMutableData dataWithLength:kBufferLengthIncrement];
- _packetQueue = [[FIRMessagingPacketQueue alloc] init];
- _currentProtoTypeBeingSent = kInvalidTag;
- }
- return self;
- }
- - (void)dealloc {
- [self disconnect];
- }
- - (void)connectToHost:(NSString *)host port:(NSUInteger)port onRunLoop:(NSRunLoop *)runLoop {
- if (!host || self.state != kFIRMessagingSecureSocketNotOpen) {
- return;
- }
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket000,
- @"Opening secure socket to FIRMessaging service");
- self.state = kFIRMessagingSecureSocketOpening;
- self.runLoop = runLoop;
- CFReadStreamRef inputStreamRef;
- CFWriteStreamRef outputStreamRef;
- CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)host, (int)port, &inputStreamRef,
- &outputStreamRef);
- self.inStream = CFBridgingRelease(inputStreamRef);
- self.outStream = CFBridgingRelease(outputStreamRef);
- if (!self.inStream || !self.outStream) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket001,
- @"Failed to initialize socket.");
- return;
- }
- self.isInStreamOpen = NO;
- self.isOutStreamOpen = NO;
- BOOL isVOIPSocket = NO;
- [self openStream:self.outStream isVOIPStream:isVOIPSocket];
- [self openStream:self.inStream isVOIPStream:isVOIPSocket];
- }
- - (void)disconnect {
- if (self.state == kFIRMessagingSecureSocketClosing) {
- return;
- }
- if (!self.inStream && !self.outStream) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket002,
- @"The socket is not open or already closed.");
- return;
- }
- self.state = kFIRMessagingSecureSocketClosing;
- if (self.inStream) {
- [self closeStream:self.inStream];
- self.inStream = nil;
- }
- if (self.outStream) {
- [self closeStream:self.outStream];
- self.outStream = nil;
- }
- self.state = kFIRMessagingSecureSocketClosed;
- [self.delegate didDisconnectWithSecureSocket:self];
- }
- - (void)sendData:(NSData *)data withTag:(int8_t)tag rmqId:(NSString *)rmqId {
- [self.packetQueue push:[FIRMessagingPacket packetWithTag:tag rmqId:rmqId data:data]];
- if ([self.outStream hasSpaceAvailable]) {
- [self performWrite];
- }
- }
- #pragma mark - NSStreamDelegate
- - (void)stream:(NSStream *)stream handleEvent:(NSStreamEvent)eventCode {
- switch (eventCode) {
- case NSStreamEventHasBytesAvailable:
- if (self.state != kFIRMessagingSecureSocketOpen) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket003,
- @"Try to read from socket that is not opened");
- return;
- }
- if (![self performRead]) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket004,
- @"Error occurred when reading incoming stream");
- [self disconnect];
- }
- break;
- case NSStreamEventEndEncountered:
- FIRMessagingLoggerDebug(
- kFIRMessagingMessageCodeSecureSocket005, @"%@ end encountered",
- stream == self.inStream
- ? @"Input stream"
- : (stream == self.outStream ? @"Output stream" : @"Unknown stream"));
- [self disconnect];
- break;
- case NSStreamEventOpenCompleted:
- if (stream == self.inStream) {
- self.isInStreamOpen = YES;
- } else if (stream == self.outStream) {
- self.isOutStreamOpen = YES;
- }
- if (self.isInStreamOpen && self.isOutStreamOpen) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket006,
- @"Secure socket to FIRMessaging service opened");
- self.state = kFIRMessagingSecureSocketOpen;
- [self.delegate secureSocketDidConnect:self];
- }
- break;
- case NSStreamEventErrorOccurred: {
- FIRMessagingLoggerDebug(
- kFIRMessagingMessageCodeSecureSocket007, @"%@ error occurred",
- stream == self.inStream
- ? @"Input stream"
- : (stream == self.outStream ? @"Output stream" : @"Unknown stream"));
- [self disconnect];
- break;
- }
- case NSStreamEventHasSpaceAvailable:
- if (self.state != kFIRMessagingSecureSocketOpen) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket008,
- @"Try to write to socket that is not opened");
- return;
- }
- [self performWrite];
- break;
- default:
- break;
- }
- }
- #pragma mark - Private
- - (void)openStream:(NSStream *)stream isVOIPStream:(BOOL)isVOIPStream {
- if (stream) {
- if ([stream streamStatus] != NSStreamStatusNotOpen) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket009,
- @"stream should not be open.");
- return;
- }
- [stream setProperty:NSStreamSocketSecurityLevelNegotiatedSSL
- forKey:NSStreamSocketSecurityLevelKey];
- if (isVOIPStream) {
- [stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
- }
- stream.delegate = self;
- [stream scheduleInRunLoop:self.runLoop forMode:NSDefaultRunLoopMode];
- [stream open];
- }
- }
- - (void)closeStream:(NSStream *)stream {
- if (stream) {
- [stream close];
- [stream removeFromRunLoop:self.runLoop forMode:NSDefaultRunLoopMode];
- stream.delegate = nil;
- }
- }
- - (BOOL)performRead {
- if (!self.isVersionReceived) {
- self.isVersionReceived = YES;
- uint8_t versionByte = 0;
- NSInteger bytesRead = [self.inStream read:&versionByte maxLength:sizeof(uint8_t)];
- if (bytesRead != sizeof(uint8_t) || kVersion != versionByte) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket010,
- @"Version do not match. Received %d, Expecting %d", versionByte,
- kVersion);
- return NO;
- }
- }
- while (YES) {
- BOOL isInputBufferValid = [self.inputBuffer length] > 0;
- if (!isInputBufferValid) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket011,
- @"Input buffer is not valid.");
- return NO;
- }
- if (![self.inStream hasBytesAvailable]) {
- break;
- }
- // try to read more data
- uint8_t *unusedBufferPtr = (uint8_t *)self.inputBuffer.mutableBytes + self.inputBufferLength;
- NSUInteger unusedBufferLength = [self.inputBuffer length] - self.inputBufferLength;
- NSInteger bytesRead = [self.inStream read:unusedBufferPtr maxLength:unusedBufferLength];
- if (bytesRead <= 0) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket012,
- @"Failed to read input stream. Bytes read %ld, Used buffer size %lu, "
- @"Unused buffer size %lu",
- _FIRMessaging_UL(bytesRead), _FIRMessaging_UL(self.inputBufferLength),
- _FIRMessaging_UL(unusedBufferLength));
- break;
- }
- // did successfully read some more data
- self.inputBufferLength += (NSUInteger)bytesRead;
- if ([self.inputBuffer length] <= self.inputBufferLength) {
- // shouldn't be reading more than 1MB of data in one go
- if ([self.inputBuffer length] + kBufferLengthIncrement > kMaxBufferLength) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket013,
- @"Input buffer exceed 1M, disconnect socket");
- return NO;
- }
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket014,
- @"Input buffer limit exceeded. Used input buffer size %lu, "
- @"Total input buffer size %lu. No unused buffer left. "
- @"Increase buffer size.",
- _FIRMessaging_UL(self.inputBufferLength),
- _FIRMessaging_UL([self.inputBuffer length]));
- [self.inputBuffer increaseLengthBy:kBufferLengthIncrement];
- }
- while (self.inputBufferLength > 0 && [self.inputBuffer length] > 0) {
- NSRange inputRange = NSMakeRange(0, self.inputBufferLength);
- size_t protoBytes = 0;
- // read the actual proto data coming in
- FIRMessagingSecureSocketReadResult readResult =
- [self processCurrentInputBuffer:[self.inputBuffer subdataWithRange:inputRange]
- outOffset:&protoBytes];
- // Corrupt data encountered, stop processing.
- if (readResult == kFIRMessagingSecureSocketReadResultCorrupt) {
- return NO;
- // Incomplete data, keep trying to read by loading more from the stream.
- } else if (readResult == kFIRMessagingSecureSocketReadResultIncomplete) {
- break;
- }
- // we have read (0, protoBytes) of data in the inputBuffer
- if (protoBytes == self.inputBufferLength) {
- // did completely read the buffer data can be reset for further processing
- self.inputBufferLength = 0;
- } else {
- // delete processed bytes while maintaining the buffer size.
- NSUInteger prevLength __unused = [self.inputBuffer length];
- // delete the processed bytes
- [self.inputBuffer replaceBytesInRange:NSMakeRange(0, protoBytes) withBytes:NULL length:0];
- // reallocate more data
- [self.inputBuffer increaseLengthBy:protoBytes];
- self.inputBufferLength -= protoBytes;
- }
- }
- }
- return YES;
- }
- - (FIRMessagingSecureSocketReadResult)processCurrentInputBuffer:(NSData *)readData
- outOffset:(size_t *)outOffset {
- *outOffset = 0;
- FIRMessagingCodedInputStream *input =
- [[FIRMessagingCodedInputStream alloc] initWithData:readData];
- int8_t rawTag;
- if (![input readTag:&rawTag]) {
- return kFIRMessagingSecureSocketReadResultIncomplete;
- }
- int32_t length;
- if (![input readLength:&length]) {
- return kFIRMessagingSecureSocketReadResultIncomplete;
- }
- // NOTE tag can be zero for |HeartbeatPing|, and length can be zero for |Close| proto
- if (rawTag < 0 || length < 0) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket015, @"Buffer data corrupted.");
- return kFIRMessagingSecureSocketReadResultCorrupt;
- }
- NSData *data = [input readDataWithLength:(uint32_t)length];
- if (data == nil) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeSecureSocket016,
- @"Incomplete data, buffered data length %ld, expected length %d",
- _FIRMessaging_UL(self.inputBufferLength), length);
- return kFIRMessagingSecureSocketReadResultIncomplete;
- }
- [self.delegate secureSocket:self didReceiveData:data withTag:rawTag];
- *outOffset = input.offset;
- return kFIRMessagingSecureSocketReadResultSuccess;
- }
- - (void)performWrite {
- if (!self.isVersionSent) {
- self.isVersionSent = YES;
- uint8_t versionByte = kVersion;
- [self.outStream write:&versionByte maxLength:sizeof(uint8_t)];
- }
- while (!self.packetQueue.isEmpty && self.outStream.hasSpaceAvailable) {
- if (self.outputBuffer.length == 0) {
- // serialize new packets only when the output buffer is flushed.
- FIRMessagingPacket *packet = [self.packetQueue pop];
- self.currentRmqIdBeingSent = packet.rmqId;
- self.currentProtoTypeBeingSent = packet.tag;
- NSUInteger length =
- SerializedSize(packet.tag) + SerializedSize((int)packet.data.length) + packet.data.length;
- self.outputBuffer = [NSMutableData dataWithLength:length];
- GPBCodedOutputStream *output = [GPBCodedOutputStream streamWithData:self.outputBuffer];
- [output writeRawVarint32:packet.tag];
- [output writeBytesNoTag:packet.data];
- self.outputBufferLength = 0;
- }
- // flush the output buffer.
- NSInteger written = [self.outStream write:self.outputBuffer.bytes + self.outputBufferLength
- maxLength:self.outputBuffer.length - self.outputBufferLength];
- if (written <= 0) {
- continue;
- }
- self.outputBufferLength += (NSUInteger)written;
- if (self.outputBufferLength >= self.outputBuffer.length) {
- self.outputBufferLength = 0;
- self.outputBuffer = nil;
- [self.delegate secureSocket:self
- didSendProtoWithTag:self.currentProtoTypeBeingSent
- rmqId:self.currentRmqIdBeingSent];
- self.currentRmqIdBeingSent = nil;
- self.currentProtoTypeBeingSent = kInvalidTag;
- }
- }
- }
- @end
|