| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683 |
- /*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #import "FirebaseMessaging/Sources/FIRMessagingConnection.h"
- #import <FirebaseMessaging/FIRMessaging.h>
- #import "FirebaseMessaging/Sources/Protos/GtalkCore.pbobjc.h"
- #import "FirebaseMessaging/Sources/Protos/GtalkExtensions.pbobjc.h"
- #import "FirebaseMessaging/Sources/FIRMessagingDataMessageManager.h"
- #import "FirebaseMessaging/Sources/FIRMessagingDefines.h"
- #import "FirebaseMessaging/Sources/FIRMessagingLogger.h"
- #import "FirebaseMessaging/Sources/FIRMessagingRmqManager.h"
- #import "FirebaseMessaging/Sources/FIRMessagingSecureSocket.h"
- #import "FirebaseMessaging/Sources/FIRMessagingUtilities.h"
- #import "FirebaseMessaging/Sources/FIRMessagingVersionUtilities.h"
- #import "FirebaseMessaging/Sources/FIRMessaging_Private.h"
- static NSInteger const kIqSelectiveAck = 12;
- static NSInteger const kIqStreamAck = 13;
- static int const kInvalidStreamId = -1;
- static NSTimeInterval const kHeartbeatInterval = 30.0;
- static NSTimeInterval const kConnectionTimeout = 20.0;
- static int32_t const kAckingInterval = 10;
- static NSString *const kUnackedS2dIdKey = @"FIRMessagingUnackedS2dIdKey";
- static NSString *const kAckedS2dIdMapKey = @"FIRMessagingAckedS2dIdMapKey";
- static NSString *const kRemoteFromAddress = @"from";
- @interface FIRMessagingD2SInfo : NSObject
- @property(nonatomic, readwrite, assign) int streamId;
- @property(nonatomic, readwrite, strong) NSString *d2sID;
- - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID;
- @end
- @implementation FIRMessagingD2SInfo
- - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID {
- self = [super init];
- if (self) {
- _streamId = streamId;
- _d2sID = [d2sID copy];
- }
- return self;
- }
- - (BOOL)isEqual:(id)object {
- if ([object isKindOfClass:[self class]]) {
- FIRMessagingD2SInfo *other = (FIRMessagingD2SInfo *)object;
- return self.streamId == other.streamId && [self.d2sID isEqualToString:other.d2sID];
- }
- return NO;
- }
- - (NSUInteger)hash {
- return [self.d2sID hash];
- }
- @end
- @interface FIRMessagingConnection () <FIRMessagingSecureSocketDelegate>
- @property(nonatomic, readwrite, weak) FIRMessagingRmqManager *rmq2Manager;
- @property(nonatomic, readwrite, weak) FIRMessagingDataMessageManager *dataMessageManager;
- @property(nonatomic, readwrite, assign) FIRMessagingConnectionState state;
- @property(nonatomic, readwrite, copy) NSString *host;
- @property(nonatomic, readwrite, assign) NSUInteger port;
- @property(nonatomic, readwrite, strong) NSString *authId;
- @property(nonatomic, readwrite, strong) NSString *token;
- @property(nonatomic, readwrite, strong) FIRMessagingSecureSocket *socket;
- @property(nonatomic, readwrite, assign) int64_t lastLoginServerTimestamp;
- @property(nonatomic, readwrite, assign) int lastStreamIdAcked;
- @property(nonatomic, readwrite, assign) int inStreamId;
- @property(nonatomic, readwrite, assign) int outStreamId;
- @property(nonatomic, readwrite, strong) NSMutableArray *unackedS2dIds;
- @property(nonatomic, readwrite, strong) NSMutableDictionary *ackedS2dMap;
- @property(nonatomic, readwrite, strong) NSMutableArray *d2sInfos;
- // ttl=0 messages that need to be sent as soon as we establish a connection
- @property(nonatomic, readwrite, strong) NSMutableArray *sendOnConnectMessages;
- @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;
- @end
- @implementation FIRMessagingConnection
- ;
- - (instancetype)initWithAuthID:(NSString *)authId
- token:(NSString *)token
- host:(NSString *)host
- port:(NSUInteger)port
- runLoop:(NSRunLoop *)runLoop
- rmq2Manager:(FIRMessagingRmqManager *)rmq2Manager
- fcmManager:(FIRMessagingDataMessageManager *)dataMessageManager {
- self = [super init];
- if (self) {
- _authId = [authId copy];
- _token = [token copy];
- _host = [host copy];
- _port = port;
- _runLoop = runLoop;
- _rmq2Manager = rmq2Manager;
- _dataMessageManager = dataMessageManager;
- _d2sInfos = [NSMutableArray array];
- _unackedS2dIds = [NSMutableArray arrayWithArray:[_rmq2Manager unackedS2dRmqIds]];
- _ackedS2dMap = [NSMutableDictionary dictionary];
- _sendOnConnectMessages = [NSMutableArray array];
- }
- return self;
- }
- - (NSString *)description {
- return [NSString stringWithFormat:@"host: %@, port: %lu, stream id in: %d, stream id out: %d",
- self.host, _FIRMessaging_UL(self.port), self.inStreamId,
- self.outStreamId];
- }
- - (void)signIn {
- if (self.state != kFIRMessagingConnectionNotConnected) {
- return;
- }
- // break it up for testing
- [self setupConnectionSocket];
- [self connectToSocket:self.socket];
- }
- - (void)setupConnectionSocket {
- self.socket = [[FIRMessagingSecureSocket alloc] init];
- self.socket.delegate = self;
- }
- - (void)connectToSocket:(FIRMessagingSecureSocket *)socket {
- self.state = kFIRMessagingConnectionConnecting;
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection000,
- @"Start connecting to FIRMessaging service.");
- [socket connectToHost:self.host port:self.port onRunLoop:self.runLoop];
- }
- - (void)signOut {
- // Clear the list of messages to be sent on connect. This will only
- // have messages in it if an error happened before receiving the LoginResponse.
- [self.sendOnConnectMessages removeAllObjects];
- if (self.state == kFIRMessagingConnectionSignedIn) {
- [self sendClose];
- }
- if (self.state != kFIRMessagingConnectionNotConnected) {
- [self disconnect];
- }
- }
- - (void)teardown {
- if (self.state != kFIRMessagingConnectionNotConnected) {
- [self disconnect];
- }
- }
- #pragma mark - FIRMessagingSecureSocketDelegate
- - (void)secureSocketDidConnect:(FIRMessagingSecureSocket *)socket {
- self.state = kFIRMessagingConnectionConnected;
- self.lastStreamIdAcked = 0;
- self.inStreamId = 0;
- self.outStreamId = 0;
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection001,
- @"Connected to FIRMessaging service.");
- [self resetUnconfirmedAcks];
- [self sendLoginRequest:self.authId token:self.token];
- }
- - (void)didDisconnectWithSecureSocket:(FIRMessagingSecureSocket *)socket {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection002,
- @"Secure socket disconnected from FIRMessaging service. %ld",
- (long)self.socket.state);
- [self disconnect];
- [self.delegate connection:self
- didCloseForReason:kFIRMessagingConnectionCloseReasonSocketDisconnected];
- }
- - (void)secureSocket:(FIRMessagingSecureSocket *)socket
- didReceiveData:(NSData *)data
- withTag:(int8_t)tag {
- if (tag < 0) {
- // Invalid proto tag
- return;
- }
- Class klassForTag = FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag);
- if ([klassForTag isSubclassOfClass:[NSNull class]]) {
- FIRMessagingLoggerError(kFIRMessagingMessageCodeConnection003, @"Invalid tag %d for proto",
- tag);
- return;
- }
- GPBMessage *proto = [klassForTag parseFromData:data error:NULL];
- if (tag == kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionConnected) {
- FIRMessagingLoggerDebug(
- kFIRMessagingMessageCodeConnection004,
- @"Should not receive generated message when the connection is not connected.");
- return;
- } else if (tag != kFIRMessagingProtoTagLoginResponse &&
- self.state != kFIRMessagingConnectionSignedIn) {
- FIRMessagingLoggerDebug(
- kFIRMessagingMessageCodeConnection005,
- @"Should not receive generated message when the connection is not signed in.");
- return;
- }
- // If traffic is received after a heartbeat it is safe to assume the connection is healthy.
- [self cancelConnectionTimeoutTask];
- [self performSelector:@selector(sendHeartbeatPing) withObject:nil afterDelay:kHeartbeatInterval];
- [self willProcessProto:proto];
- switch (tag) {
- case kFIRMessagingProtoTagLoginResponse:
- [self didReceiveLoginResponse:(GtalkLoginResponse *)proto];
- break;
- case kFIRMessagingProtoTagDataMessageStanza:
- [self didReceiveDataMessageStanza:(GtalkDataMessageStanza *)proto];
- break;
- case kFIRMessagingProtoTagHeartbeatPing:
- [self didReceiveHeartbeatPing:(GtalkHeartbeatPing *)proto];
- break;
- case kFIRMessagingProtoTagHeartbeatAck:
- [self didReceiveHeartbeatAck:(GtalkHeartbeatAck *)proto];
- break;
- case kFIRMessagingProtoTagClose:
- [self didReceiveClose:(GtalkClose *)proto];
- break;
- case kFIRMessagingProtoTagIqStanza:
- [self handleIqStanza:(GtalkIqStanza *)proto];
- break;
- default:
- [self didReceiveUnhandledProto:proto];
- break;
- }
- }
- // Called from secure socket once we have send the proto with given rmqId over the wire
- // since we are mostly concerned with user facing messages which certainly have a rmqId
- // we can retrieve them from the Rmq if necessary to look at stuff but for now we just
- // log it.
- - (void)secureSocket:(FIRMessagingSecureSocket *)socket
- didSendProtoWithTag:(int8_t)tag
- rmqId:(NSString *)rmqId {
- // log the message
- [self logMessage:rmqId messageType:tag isOut:YES];
- }
- #pragma mark - FIRMessagingTestConnection
- - (void)sendProto:(GPBMessage *)proto {
- FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(proto);
- if (tag == kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionConnected) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection006,
- @"Cannot send generated message when the connection is not connected.");
- return;
- } else if (tag != kFIRMessagingProtoTagLoginRequest &&
- self.state != kFIRMessagingConnectionSignedIn) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection007,
- @"Cannot send generated message when the connection is not signed in.");
- return;
- }
- if (self.socket == nil) {
- return;
- }
- [self willSendProto:proto];
- [self.socket sendData:proto.data withTag:tag rmqId:FIRMessagingGetRmq2Id(proto)];
- }
- - (void)sendOnConnectOrDrop:(GPBMessage *)message {
- if (self.state == kFIRMessagingConnectionSignedIn) {
- // If a connection has already been established, send normally
- [self sendProto:message];
- } else {
- // Otherwise add them to the list of messages to send after login
- [self.sendOnConnectMessages addObject:message];
- }
- }
- + (GtalkLoginRequest *)loginRequestWithToken:(NSString *)token authID:(NSString *)authID {
- GtalkLoginRequest *login = [[GtalkLoginRequest alloc] init];
- login.accountId = 1000000;
- login.authService = GtalkLoginRequest_AuthService_AndroidId;
- login.authToken = token;
- login.id_p = [NSString stringWithFormat:@"%@-%@", @"ios", FIRMessagingCurrentLibraryVersion()];
- login.domain = @"mcs.android.com";
- login.deviceId = [NSString stringWithFormat:@"android-%llx", authID.longLongValue];
- login.networkType = [self currentNetworkType];
- login.resource = authID;
- login.user = authID;
- login.useRmq2 = YES;
- login.lastRmqId = 1; // Sending not enabled yet so this stays as 1.
- return login;
- }
- + (int32_t)currentNetworkType {
- // http://developer.android.com/reference/android/net/ConnectivityManager.html
- int32_t fcmNetworkType;
- FIRMessagingNetworkStatus type = [[FIRMessaging messaging] networkType];
- switch (type) {
- case kFIRMessagingReachabilityReachableViaWiFi:
- fcmNetworkType = 1;
- break;
- case kFIRMessagingReachabilityReachableViaWWAN:
- fcmNetworkType = 0;
- break;
- default:
- fcmNetworkType = -1;
- break;
- }
- return fcmNetworkType;
- }
- - (void)sendLoginRequest:(NSString *)authId token:(NSString *)token {
- GtalkLoginRequest *login = [[self class] loginRequestWithToken:token authID:authId];
- // clear the messages sent during last connection
- if ([self.d2sInfos count]) {
- [self.d2sInfos removeAllObjects];
- }
- if (self.unackedS2dIds.count > 0) {
- FIRMessagingLoggerDebug(
- kFIRMessagingMessageCodeConnection008,
- @"There are unacked persistent Ids in the login request: %@",
- [self.unackedS2dIds.description stringByReplacingOccurrencesOfString:@"%"
- withString:@"%%"]);
- }
- // Send out acks.
- for (NSString *unackedPersistentS2dId in self.unackedS2dIds) {
- [login.receivedPersistentIdArray addObject:unackedPersistentS2dId];
- }
- GtalkSetting *setting = [[GtalkSetting alloc] init];
- setting.name = @"new_vc";
- setting.value = @"1";
- [login.settingArray addObject:setting];
- [self sendProto:login];
- }
- - (void)sendHeartbeatAck {
- [self sendProto:[[GtalkHeartbeatAck alloc] init]];
- }
- - (void)sendHeartbeatPing {
- // cancel the previous heartbeat request.
- [NSObject cancelPreviousPerformRequestsWithTarget:self
- selector:@selector(sendHeartbeatPing)
- object:nil];
- [self scheduleConnectionTimeoutTask];
- [self sendProto:[[GtalkHeartbeatPing alloc] init]];
- }
- + (GtalkIqStanza *)createStreamAck {
- GtalkIqStanza *iq = [[GtalkIqStanza alloc] init];
- iq.type = GtalkIqStanza_IqType_Set;
- iq.id_p = @"";
- GtalkExtension *ext = [[GtalkExtension alloc] init];
- ext.id_p = kIqStreamAck;
- ext.data_p = @"";
- iq.extension = ext;
- return iq;
- }
- - (void)sendStreamAck {
- GtalkIqStanza *iq = [[self class] createStreamAck];
- [self sendProto:iq];
- }
- - (void)sendClose {
- [self sendProto:[[GtalkClose alloc] init]];
- }
- - (void)handleIqStanza:(GtalkIqStanza *)iq {
- if (iq.hasExtension) {
- if (iq.extension.id_p == kIqStreamAck) {
- [self didReceiveStreamAck:iq];
- return;
- }
- if (iq.extension.id_p == kIqSelectiveAck) {
- [self didReceiveSelectiveAck:iq];
- return;
- }
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection009, @"Unknown ack extension id %d.",
- iq.extension.id_p);
- } else {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection010, @"Ip stanza without extension.");
- }
- [self didReceiveUnhandledProto:iq];
- }
- - (void)didReceiveLoginResponse:(GtalkLoginResponse *)loginResponse {
- if (loginResponse.hasError) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection011,
- @"Login error with type: %@, message: %@.", loginResponse.error.type,
- loginResponse.error.message);
- return;
- }
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection012, @"Logged onto MCS service.");
- self.state = kFIRMessagingConnectionSignedIn;
- self.lastLoginServerTimestamp = loginResponse.serverTimestamp;
- [self.delegate didLoginWithConnection:self];
- [self sendHeartbeatPing];
- // Add all the TTL=0 messages on connect
- for (GPBMessage *message in self.sendOnConnectMessages) {
- [self sendProto:message];
- }
- [self.sendOnConnectMessages removeAllObjects];
- }
- - (void)didReceiveHeartbeatPing:(GtalkHeartbeatPing *)heartbeatPing {
- [self sendHeartbeatAck];
- }
- - (void)didReceiveHeartbeatAck:(GtalkHeartbeatAck *)heartbeatAck {
- }
- - (void)didReceiveDataMessageStanza:(GtalkDataMessageStanza *)dataMessageStanza {
- // TODO: Maybe add support raw data later
- [self.delegate connectionDidRecieveMessage:dataMessageStanza];
- }
- - (void)didReceiveUnhandledProto:(GPBMessage *)proto {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection013, @"Received unhandled proto");
- }
- - (void)didReceiveStreamAck:(GtalkIqStanza *)iq {
- // Server received some stuff from us we don't really need to do anything special
- }
- - (void)didReceiveSelectiveAck:(GtalkIqStanza *)iq {
- GtalkExtension *extension = iq.extension;
- if (extension) {
- int extensionId = extension.id_p;
- if (extensionId == kIqSelectiveAck) {
- NSString *dataString = extension.data_p;
- GtalkSelectiveAck *selectiveAck = [[GtalkSelectiveAck alloc] init];
- [selectiveAck mergeFromData:[dataString dataUsingEncoding:NSUTF8StringEncoding]
- extensionRegistry:nil];
- NSArray<NSString *> *acks = [selectiveAck idArray];
- // we've received ACK's
- [self.delegate connectionDidReceiveAckForRmqIds:acks];
- // resend unacked messages
- [self.dataMessageManager resendMessagesWithConnection:self];
- }
- }
- }
- - (void)didReceiveClose:(GtalkClose *)close {
- [self disconnect];
- }
- - (void)willProcessProto:(GPBMessage *)proto {
- self.inStreamId++;
- if ([proto isKindOfClass:GtalkDataMessageStanza.class]) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection014,
- @"RMQ: Receiving %@ with rmq_id: %@ incoming stream Id: %d",
- proto.class, FIRMessagingGetRmq2Id(proto), self.inStreamId);
- } else {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection015,
- @"RMQ: Receiving %@ with incoming stream Id: %d.", proto.class,
- self.inStreamId);
- }
- int streamId = FIRMessagingGetLastStreamId(proto);
- if (streamId != kInvalidStreamId) {
- // confirm the D2S messages that were sent by us
- [self confirmAckedD2sIdsWithStreamId:streamId];
- // We can now confirm that our ack was received by the server and start our unack'd list fresh
- // with the proto we just received.
- [self confirmAckedS2dIdsWithStreamId:streamId];
- }
- NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
- if (rmq2Id != nil) {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection016,
- @"RMQ: Add unacked persistent Id: %@.",
- [rmq2Id stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
- [self.unackedS2dIds addObject:rmq2Id];
- [self.rmq2Manager saveS2dMessageWithRmqId:rmq2Id]; // RMQ save
- }
- BOOL explicitAck = ([proto isKindOfClass:[GtalkDataMessageStanza class]] &&
- [(GtalkDataMessageStanza *)proto immediateAck]);
- // If we have not sent anything and the ack threshold has been reached then explicitly send one
- // to notify the server that we have received messages.
- if (self.inStreamId - self.lastStreamIdAcked >= kAckingInterval || explicitAck) {
- [self sendStreamAck];
- }
- }
- - (void)willSendProto:(GPBMessage *)proto {
- self.outStreamId++;
- NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
- if ([rmq2Id length]) {
- FIRMessagingD2SInfo *d2sInfo = [[FIRMessagingD2SInfo alloc] initWithStreamId:self.outStreamId
- d2sId:rmq2Id];
- [self.d2sInfos addObject:d2sInfo];
- }
- // each time we send a d2s message, it acks previously received
- // s2d messages via the last (s2d) stream id received.
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection017,
- @"RMQ: Sending %@ with outgoing stream Id: %d.", proto.class,
- self.outStreamId);
- // We have received messages since last time we sent something - send ack info to server.
- if (self.inStreamId > self.lastStreamIdAcked) {
- FIRMessagingSetLastStreamId(proto, self.inStreamId);
- self.lastStreamIdAcked = self.inStreamId;
- }
- if (self.unackedS2dIds.count > 0) {
- // Move all 'unack'd' messages to the ack'd map so they can be removed once the
- // ack is confirmed.
- NSArray *ackedS2dIds = [NSArray arrayWithArray:self.unackedS2dIds];
- FIRMessagingLoggerDebug(
- kFIRMessagingMessageCodeConnection018, @"RMQ: Mark persistent Ids as acked: %@.",
- [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
- [self.unackedS2dIds removeAllObjects];
- self.ackedS2dMap[[@(self.outStreamId) stringValue]] = ackedS2dIds;
- }
- }
- #pragma mark - Private
- /**
- * This processes the s2d message received in reference to the d2s messages
- * that we have sent before.
- */
- - (void)confirmAckedD2sIdsWithStreamId:(int)lastReceivedStreamId {
- NSMutableArray *d2sIdsAcked = [NSMutableArray array];
- for (FIRMessagingD2SInfo *d2sInfo in self.d2sInfos) {
- if (lastReceivedStreamId < d2sInfo.streamId) {
- break;
- }
- [d2sIdsAcked addObject:d2sInfo];
- }
- NSMutableArray *rmqIds = [NSMutableArray arrayWithCapacity:[d2sIdsAcked count]];
- // remove ACK'ed messages
- for (FIRMessagingD2SInfo *d2sInfo in d2sIdsAcked) {
- if ([d2sInfo.d2sID length]) {
- [rmqIds addObject:d2sInfo.d2sID];
- }
- [self.d2sInfos removeObject:d2sInfo];
- }
- [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
- }
- - (void)confirmAckedS2dIdsWithStreamId:(int)lastReceivedStreamId {
- // If the server hasn't received the streamId yet.
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection019,
- @"RMQ: Server last received stream Id: %d.", lastReceivedStreamId);
- if (lastReceivedStreamId < self.outStreamId) {
- // TODO: This could be a good indicator that we need to re-send something (acks)?
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection020,
- @"RMQ: There are unsent messages that should be send...\n"
- "server received: %d\nlast stream id sent: %d",
- lastReceivedStreamId, self.outStreamId);
- }
- NSSet *ackedStreamIds =
- [self.ackedS2dMap keysOfEntriesPassingTest:^BOOL(id key, id obj, BOOL *stop) {
- NSString *streamId = key;
- return streamId.intValue <= lastReceivedStreamId;
- }];
- NSMutableArray *s2dIdsToDelete = [NSMutableArray array];
- for (NSString *streamId in ackedStreamIds) {
- NSArray *ackedS2dIds = self.ackedS2dMap[streamId];
- if (ackedS2dIds.count > 0) {
- FIRMessagingLoggerDebug(
- kFIRMessagingMessageCodeConnection021,
- @"RMQ: Mark persistent Ids as confirmed by stream id %@: %@.", streamId,
- [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
- [self.ackedS2dMap removeObjectForKey:streamId];
- }
- [s2dIdsToDelete addObjectsFromArray:ackedS2dIds];
- }
- // clean up s2d ids that the server knows we've received.
- // we let the server know via a s2d last stream id received in a
- // d2s message. the server lets us know it has received our d2s
- // message via a d2s last stream id received in a s2d message.
- [self.rmq2Manager removeS2dIds:s2dIdsToDelete];
- }
- - (void)resetUnconfirmedAcks {
- [self.ackedS2dMap enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
- [self.unackedS2dIds addObjectsFromArray:obj];
- }];
- [self.ackedS2dMap removeAllObjects];
- }
- - (void)disconnect {
- // cancel pending timeout tasks.
- [self cancelConnectionTimeoutTask];
- // cancel pending heartbeat.
- [NSObject cancelPreviousPerformRequestsWithTarget:self
- selector:@selector(sendHeartbeatPing)
- object:nil];
- // Unset the delegate. FIRMessagingConnection will not receive further events from the socket from
- // now on.
- self.socket.delegate = nil;
- [self.socket disconnect];
- self.state = kFIRMessagingConnectionNotConnected;
- }
- - (void)connectionTimedOut {
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection022,
- @"Connection to FIRMessaging service timed out.");
- [self disconnect];
- [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonTimeout];
- }
- - (void)scheduleConnectionTimeoutTask {
- // cancel the previous heartbeat timeout event and schedule a new one.
- [self cancelConnectionTimeoutTask];
- [self performSelector:@selector(connectionTimedOut)
- withObject:nil
- afterDelay:[self connectionTimeoutInterval]];
- }
- - (void)cancelConnectionTimeoutTask {
- // cancel pending timeout tasks.
- [NSObject cancelPreviousPerformRequestsWithTarget:self
- selector:@selector(connectionTimedOut)
- object:nil];
- }
- - (void)logMessage:(NSString *)description messageType:(int)messageType isOut:(BOOL)isOut {
- messageType = isOut ? -messageType : messageType;
- FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection023,
- @"Send msg: %@ type: %d inStreamId: %d outStreamId: %d", description,
- messageType, self.inStreamId, self.outStreamId);
- }
- - (NSTimeInterval)connectionTimeoutInterval {
- return kConnectionTimeout;
- }
- @end
|