FIRMessagingConnection.m 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "Firebase/Messaging/FIRMessagingConnection.h"
  17. #import <FirebaseMessaging/FIRMessaging.h>
  18. #import "Firebase/Messaging/Protos/GtalkCore.pbobjc.h"
  19. #import "Firebase/Messaging/Protos/GtalkExtensions.pbobjc.h"
  20. #import "Firebase/Messaging/FIRMessagingDataMessageManager.h"
  21. #import "Firebase/Messaging/FIRMessagingDefines.h"
  22. #import "Firebase/Messaging/FIRMessagingLogger.h"
  23. #import "Firebase/Messaging/FIRMessagingRmqManager.h"
  24. #import "Firebase/Messaging/FIRMessagingSecureSocket.h"
  25. #import "Firebase/Messaging/FIRMessagingUtilities.h"
  26. #import "Firebase/Messaging/FIRMessagingVersionUtilities.h"
  27. #import "Firebase/Messaging/FIRMessaging_Private.h"
  28. static NSInteger const kIqSelectiveAck = 12;
  29. static NSInteger const kIqStreamAck = 13;
  30. static int const kInvalidStreamId = -1;
  31. // Threshold for number of messages removed that we will ack, for short lived connections
  32. static int const kMessageRemoveAckThresholdCount = 5;
  33. static NSTimeInterval const kHeartbeatInterval = 30.0;
  34. static NSTimeInterval const kConnectionTimeout = 20.0;
  35. static int32_t const kAckingInterval = 10;
  36. static NSString *const kUnackedS2dIdKey = @"FIRMessagingUnackedS2dIdKey";
  37. static NSString *const kAckedS2dIdMapKey = @"FIRMessagingAckedS2dIdMapKey";
  38. static NSString *const kRemoteFromAddress = @"from";
  39. @interface FIRMessagingD2SInfo : NSObject
  40. @property(nonatomic, readwrite, assign) int streamId;
  41. @property(nonatomic, readwrite, strong) NSString *d2sID;
  42. - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID;
  43. @end
  44. @implementation FIRMessagingD2SInfo
  45. - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID {
  46. self = [super init];
  47. if (self) {
  48. _streamId = streamId;
  49. _d2sID = [d2sID copy];
  50. }
  51. return self;
  52. }
  53. - (BOOL)isEqual:(id)object {
  54. if ([object isKindOfClass:[self class]]) {
  55. FIRMessagingD2SInfo *other = (FIRMessagingD2SInfo *)object;
  56. return self.streamId == other.streamId && [self.d2sID isEqualToString:other.d2sID];
  57. }
  58. return NO;
  59. }
  60. - (NSUInteger)hash {
  61. return [self.d2sID hash];
  62. }
  63. @end
  64. @interface FIRMessagingConnection ()<FIRMessagingSecureSocketDelegate>
  65. @property(nonatomic, readwrite, weak) FIRMessagingRmqManager *rmq2Manager;
  66. @property(nonatomic, readwrite, weak) FIRMessagingDataMessageManager *dataMessageManager;
  67. @property(nonatomic, readwrite, assign) FIRMessagingConnectionState state;
  68. @property(nonatomic, readwrite, copy) NSString *host;
  69. @property(nonatomic, readwrite, assign) NSUInteger port;
  70. @property(nonatomic, readwrite, strong) NSString *authId;
  71. @property(nonatomic, readwrite, strong) NSString *token;
  72. @property(nonatomic, readwrite, strong) FIRMessagingSecureSocket *socket;
  73. @property(nonatomic, readwrite, assign) int64_t lastLoginServerTimestamp;
  74. @property(nonatomic, readwrite, assign) int lastStreamIdAcked;
  75. @property(nonatomic, readwrite, assign) int inStreamId;
  76. @property(nonatomic, readwrite, assign) int outStreamId;
  77. @property(nonatomic, readwrite, strong) NSMutableArray *unackedS2dIds;
  78. @property(nonatomic, readwrite, strong) NSMutableDictionary *ackedS2dMap;
  79. @property(nonatomic, readwrite, strong) NSMutableArray *d2sInfos;
  80. // ttl=0 messages that need to be sent as soon as we establish a connection
  81. @property(nonatomic, readwrite, strong) NSMutableArray *sendOnConnectMessages;
  82. @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;
  83. @end
  84. @implementation FIRMessagingConnection;
  85. - (instancetype)initWithAuthID:(NSString *)authId
  86. token:(NSString *)token
  87. host:(NSString *)host
  88. port:(NSUInteger)port
  89. runLoop:(NSRunLoop *)runLoop
  90. rmq2Manager:(FIRMessagingRmqManager *)rmq2Manager
  91. fcmManager:(FIRMessagingDataMessageManager *)dataMessageManager {
  92. self = [super init];
  93. if (self) {
  94. _authId = [authId copy];
  95. _token = [token copy];
  96. _host = [host copy];
  97. _port = port;
  98. _runLoop = runLoop;
  99. _rmq2Manager = rmq2Manager;
  100. _dataMessageManager = dataMessageManager;
  101. _d2sInfos = [NSMutableArray array];
  102. _unackedS2dIds = [NSMutableArray arrayWithArray:[_rmq2Manager unackedS2dRmqIds]];
  103. _ackedS2dMap = [NSMutableDictionary dictionary];
  104. _sendOnConnectMessages = [NSMutableArray array];
  105. }
  106. return self;
  107. }
  108. - (NSString *)description {
  109. return [NSString stringWithFormat:@"host: %@, port: %lu, stream id in: %d, stream id out: %d",
  110. self.host,
  111. _FIRMessaging_UL(self.port),
  112. self.inStreamId,
  113. self.outStreamId];
  114. }
  115. - (void)signIn {
  116. if (self.state != kFIRMessagingConnectionNotConnected) {
  117. return;
  118. }
  119. // break it up for testing
  120. [self setupConnectionSocket];
  121. [self connectToSocket:self.socket];
  122. }
  123. - (void)setupConnectionSocket {
  124. self.socket = [[FIRMessagingSecureSocket alloc] init];
  125. self.socket.delegate = self;
  126. }
  127. - (void)connectToSocket:(FIRMessagingSecureSocket *)socket {
  128. self.state = kFIRMessagingConnectionConnecting;
  129. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection000,
  130. @"Start connecting to FIRMessaging service.");
  131. [socket connectToHost:self.host port:self.port onRunLoop:self.runLoop];
  132. }
  133. - (void)signOut {
  134. // Clear the list of messages to be sent on connect. This will only
  135. // have messages in it if an error happened before receiving the LoginResponse.
  136. [self.sendOnConnectMessages removeAllObjects];
  137. if (self.state == kFIRMessagingConnectionSignedIn) {
  138. [self sendClose];
  139. }
  140. if (self.state != kFIRMessagingConnectionNotConnected) {
  141. [self disconnect];
  142. }
  143. }
  144. - (void)teardown {
  145. if (self.state != kFIRMessagingConnectionNotConnected) {
  146. [self disconnect];
  147. }
  148. }
  149. #pragma mark - FIRMessagingSecureSocketDelegate
  150. - (void)secureSocketDidConnect:(FIRMessagingSecureSocket *)socket {
  151. self.state = kFIRMessagingConnectionConnected;
  152. self.lastStreamIdAcked = 0;
  153. self.inStreamId = 0;
  154. self.outStreamId = 0;
  155. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection001,
  156. @"Connected to FIRMessaging service.");
  157. [self resetUnconfirmedAcks];
  158. [self sendLoginRequest:self.authId token:self.token];
  159. }
  160. - (void)didDisconnectWithSecureSocket:(FIRMessagingSecureSocket *)socket {
  161. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection002,
  162. @"Secure socket disconnected from FIRMessaging service. %ld", (long)self.socket.state);
  163. [self disconnect];
  164. [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonSocketDisconnected];
  165. }
  166. - (void)secureSocket:(FIRMessagingSecureSocket *)socket
  167. didReceiveData:(NSData *)data
  168. withTag:(int8_t)tag {
  169. if (tag < 0) {
  170. // Invalid proto tag
  171. return;
  172. }
  173. Class klassForTag = FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag);
  174. if ([klassForTag isSubclassOfClass:[NSNull class]]) {
  175. FIRMessagingLoggerError(kFIRMessagingMessageCodeConnection003, @"Invalid tag %d for proto",
  176. tag);
  177. return;
  178. }
  179. GPBMessage *proto = [klassForTag parseFromData:data error:NULL];
  180. if (tag == kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionConnected) {
  181. FIRMessagingLoggerDebug(
  182. kFIRMessagingMessageCodeConnection004,
  183. @"Should not receive generated message when the connection is not connected.");
  184. return;
  185. } else if (tag != kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionSignedIn) {
  186. FIRMessagingLoggerDebug(
  187. kFIRMessagingMessageCodeConnection005,
  188. @"Should not receive generated message when the connection is not signed in.");
  189. return;
  190. }
  191. // If traffic is received after a heartbeat it is safe to assume the connection is healthy.
  192. [self cancelConnectionTimeoutTask];
  193. [self performSelector:@selector(sendHeartbeatPing)
  194. withObject:nil
  195. afterDelay:kHeartbeatInterval];
  196. [self willProcessProto:proto];
  197. switch (tag) {
  198. case kFIRMessagingProtoTagLoginResponse:
  199. [self didReceiveLoginResponse:(GtalkLoginResponse *)proto];
  200. break;
  201. case kFIRMessagingProtoTagDataMessageStanza:
  202. [self didReceiveDataMessageStanza:(GtalkDataMessageStanza *)proto];
  203. break;
  204. case kFIRMessagingProtoTagHeartbeatPing:
  205. [self didReceiveHeartbeatPing:(GtalkHeartbeatPing *)proto];
  206. break;
  207. case kFIRMessagingProtoTagHeartbeatAck:
  208. [self didReceiveHeartbeatAck:(GtalkHeartbeatAck *)proto];
  209. break;
  210. case kFIRMessagingProtoTagClose:
  211. [self didReceiveClose:(GtalkClose *)proto];
  212. break;
  213. case kFIRMessagingProtoTagIqStanza:
  214. [self handleIqStanza:(GtalkIqStanza *)proto];
  215. break;
  216. default:
  217. [self didReceiveUnhandledProto:proto];
  218. break;
  219. }
  220. }
  221. // Called from secure socket once we have send the proto with given rmqId over the wire
  222. // since we are mostly concerned with user facing messages which certainly have a rmqId
  223. // we can retrieve them from the Rmq if necessary to look at stuff but for now we just
  224. // log it.
  225. - (void)secureSocket:(FIRMessagingSecureSocket *)socket
  226. didSendProtoWithTag:(int8_t)tag
  227. rmqId:(NSString *)rmqId {
  228. // log the message
  229. [self logMessage:rmqId messageType:tag isOut:YES];
  230. }
  231. #pragma mark - FIRMessagingTestConnection
  232. - (void)sendProto:(GPBMessage *)proto {
  233. FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(proto);
  234. if (tag == kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionConnected) {
  235. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection006,
  236. @"Cannot send generated message when the connection is not connected.");
  237. return;
  238. } else if (tag != kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionSignedIn) {
  239. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection007,
  240. @"Cannot send generated message when the connection is not signed in.");
  241. return;
  242. }
  243. if (self.socket == nil) {
  244. return;
  245. }
  246. [self willSendProto:proto];
  247. [self.socket sendData:proto.data withTag:tag rmqId:FIRMessagingGetRmq2Id(proto)];
  248. }
  249. - (void)sendOnConnectOrDrop:(GPBMessage *)message {
  250. if (self.state == kFIRMessagingConnectionSignedIn) {
  251. // If a connection has already been established, send normally
  252. [self sendProto:message];
  253. } else {
  254. // Otherwise add them to the list of messages to send after login
  255. [self.sendOnConnectMessages addObject:message];
  256. }
  257. }
  258. + (GtalkLoginRequest *)loginRequestWithToken:(NSString *)token authID:(NSString *)authID {
  259. GtalkLoginRequest *login = [[GtalkLoginRequest alloc] init];
  260. login.accountId = 1000000;
  261. login.authService = GtalkLoginRequest_AuthService_AndroidId;
  262. login.authToken = token;
  263. login.id_p = [NSString stringWithFormat:@"%@-%@", @"ios", FIRMessagingCurrentLibraryVersion()];
  264. login.domain = @"mcs.android.com";
  265. login.deviceId = [NSString stringWithFormat:@"android-%llx", authID.longLongValue];
  266. login.networkType = [self currentNetworkType];
  267. login.resource = authID;
  268. login.user = authID;
  269. login.useRmq2 = YES;
  270. login.lastRmqId = 1; // Sending not enabled yet so this stays as 1.
  271. return login;
  272. }
  273. + (int32_t)currentNetworkType {
  274. // http://developer.android.com/reference/android/net/ConnectivityManager.html
  275. int32_t fcmNetworkType;
  276. FIRMessagingNetworkStatus type = [[FIRMessaging messaging] networkType];
  277. switch (type) {
  278. case kFIRMessagingReachabilityReachableViaWiFi:
  279. fcmNetworkType = 1;
  280. break;
  281. case kFIRMessagingReachabilityReachableViaWWAN:
  282. fcmNetworkType = 0;
  283. break;
  284. default:
  285. fcmNetworkType = -1;
  286. break;
  287. }
  288. return fcmNetworkType;
  289. }
  290. - (void)sendLoginRequest:(NSString *)authId
  291. token:(NSString *)token {
  292. GtalkLoginRequest *login = [[self class] loginRequestWithToken:token authID:authId];
  293. // clear the messages sent during last connection
  294. if ([self.d2sInfos count]) {
  295. [self.d2sInfos removeAllObjects];
  296. }
  297. if (self.unackedS2dIds.count > 0) {
  298. FIRMessagingLoggerDebug(
  299. kFIRMessagingMessageCodeConnection008,
  300. @"There are unacked persistent Ids in the login request: %@",
  301. [self.unackedS2dIds.description stringByReplacingOccurrencesOfString:@"%"
  302. withString:@"%%"]);
  303. }
  304. // Send out acks.
  305. for (NSString *unackedPersistentS2dId in self.unackedS2dIds) {
  306. [login.receivedPersistentIdArray addObject:unackedPersistentS2dId];
  307. }
  308. GtalkSetting *setting = [[GtalkSetting alloc] init];
  309. setting.name = @"new_vc";
  310. setting.value = @"1";
  311. [login.settingArray addObject:setting];
  312. [self sendProto:login];
  313. }
  314. - (void)sendHeartbeatAck {
  315. [self sendProto:[[GtalkHeartbeatAck alloc] init]];
  316. }
  317. - (void)sendHeartbeatPing {
  318. // cancel the previous heartbeat request.
  319. [NSObject cancelPreviousPerformRequestsWithTarget:self
  320. selector:@selector(sendHeartbeatPing)
  321. object:nil];
  322. [self scheduleConnectionTimeoutTask];
  323. [self sendProto:[[GtalkHeartbeatPing alloc] init]];
  324. }
  325. + (GtalkIqStanza *)createStreamAck {
  326. GtalkIqStanza *iq = [[GtalkIqStanza alloc] init];
  327. iq.type = GtalkIqStanza_IqType_Set;
  328. iq.id_p = @"";
  329. GtalkExtension *ext = [[GtalkExtension alloc] init];
  330. ext.id_p = kIqStreamAck;
  331. ext.data_p = @"";
  332. iq.extension = ext;
  333. return iq;
  334. }
  335. - (void)sendStreamAck {
  336. GtalkIqStanza *iq = [[self class] createStreamAck];
  337. [self sendProto:iq];
  338. }
  339. - (void)sendClose {
  340. [self sendProto:[[GtalkClose alloc] init]];
  341. }
  342. - (void)handleIqStanza:(GtalkIqStanza *)iq {
  343. if (iq.hasExtension) {
  344. if (iq.extension.id_p == kIqStreamAck) {
  345. [self didReceiveStreamAck:iq];
  346. return;
  347. }
  348. if (iq.extension.id_p == kIqSelectiveAck) {
  349. [self didReceiveSelectiveAck:iq];
  350. return;
  351. }
  352. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection009, @"Unknown ack extension id %d.",
  353. iq.extension.id_p);
  354. } else {
  355. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection010, @"Ip stanza without extension.");
  356. }
  357. [self didReceiveUnhandledProto:iq];
  358. }
  359. - (void)didReceiveLoginResponse:(GtalkLoginResponse *)loginResponse {
  360. if (loginResponse.hasError) {
  361. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection011,
  362. @"Login error with type: %@, message: %@.", loginResponse.error.type,
  363. loginResponse.error.message);
  364. return;
  365. }
  366. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection012, @"Logged onto MCS service.");
  367. self.state = kFIRMessagingConnectionSignedIn;
  368. self.lastLoginServerTimestamp = loginResponse.serverTimestamp;
  369. [self.delegate didLoginWithConnection:self];
  370. [self sendHeartbeatPing];
  371. // Add all the TTL=0 messages on connect
  372. for (GPBMessage *message in self.sendOnConnectMessages) {
  373. [self sendProto:message];
  374. }
  375. [self.sendOnConnectMessages removeAllObjects];
  376. }
  377. - (void)didReceiveHeartbeatPing:(GtalkHeartbeatPing *)heartbeatPing {
  378. [self sendHeartbeatAck];
  379. }
  380. - (void)didReceiveHeartbeatAck:(GtalkHeartbeatAck *)heartbeatAck {
  381. }
  382. - (void)didReceiveDataMessageStanza:(GtalkDataMessageStanza *)dataMessageStanza {
  383. // TODO: Maybe add support raw data later
  384. [self.delegate connectionDidRecieveMessage:dataMessageStanza];
  385. }
  386. - (void)didReceiveUnhandledProto:(GPBMessage *)proto {
  387. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection013, @"Received unhandled proto");
  388. }
  389. - (void)didReceiveStreamAck:(GtalkIqStanza *)iq {
  390. // Server received some stuff from us we don't really need to do anything special
  391. }
  392. - (void)didReceiveSelectiveAck:(GtalkIqStanza *)iq {
  393. GtalkExtension *extension = iq.extension;
  394. if (extension) {
  395. int extensionId = extension.id_p;
  396. if (extensionId == kIqSelectiveAck) {
  397. NSString *dataString = extension.data_p;
  398. GtalkSelectiveAck *selectiveAck = [[GtalkSelectiveAck alloc] init];
  399. [selectiveAck mergeFromData:[dataString dataUsingEncoding:NSUTF8StringEncoding]
  400. extensionRegistry:nil];
  401. NSArray <NSString *>*acks = [selectiveAck idArray];
  402. // we've received ACK's
  403. [self.delegate connectionDidReceiveAckForRmqIds:acks];
  404. // resend unacked messages
  405. [self.dataMessageManager resendMessagesWithConnection:self];
  406. }
  407. }
  408. }
  409. - (void)didReceiveClose:(GtalkClose *)close {
  410. [self disconnect];
  411. }
  412. - (void)willProcessProto:(GPBMessage *)proto {
  413. self.inStreamId++;
  414. if ([proto isKindOfClass:GtalkDataMessageStanza.class]) {
  415. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection014,
  416. @"RMQ: Receiving %@ with rmq_id: %@ incoming stream Id: %d",
  417. proto.class, FIRMessagingGetRmq2Id(proto), self.inStreamId);
  418. } else {
  419. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection015,
  420. @"RMQ: Receiving %@ with incoming stream Id: %d.", proto.class,
  421. self.inStreamId);
  422. }
  423. int streamId = FIRMessagingGetLastStreamId(proto);
  424. if (streamId != kInvalidStreamId) {
  425. // confirm the D2S messages that were sent by us
  426. [self confirmAckedD2sIdsWithStreamId:streamId];
  427. // We can now confirm that our ack was received by the server and start our unack'd list fresh
  428. // with the proto we just received.
  429. [self confirmAckedS2dIdsWithStreamId:streamId];
  430. }
  431. NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
  432. if (rmq2Id != nil) {
  433. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection016,
  434. @"RMQ: Add unacked persistent Id: %@.",
  435. [rmq2Id stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  436. [self.unackedS2dIds addObject:rmq2Id];
  437. [self.rmq2Manager saveS2dMessageWithRmqId:rmq2Id]; // RMQ save
  438. }
  439. BOOL explicitAck = ([proto isKindOfClass:[GtalkDataMessageStanza class]] &&
  440. [(GtalkDataMessageStanza *)proto immediateAck]);
  441. // If we have not sent anything and the ack threshold has been reached then explicitly send one
  442. // to notify the server that we have received messages.
  443. if (self.inStreamId - self.lastStreamIdAcked >= kAckingInterval || explicitAck) {
  444. [self sendStreamAck];
  445. }
  446. }
  447. - (void)willSendProto:(GPBMessage *)proto {
  448. self.outStreamId++;
  449. NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
  450. if ([rmq2Id length]) {
  451. FIRMessagingD2SInfo *d2sInfo = [[FIRMessagingD2SInfo alloc] initWithStreamId:self.outStreamId d2sId:rmq2Id];
  452. [self.d2sInfos addObject:d2sInfo];
  453. }
  454. // each time we send a d2s message, it acks previously received
  455. // s2d messages via the last (s2d) stream id received.
  456. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection017,
  457. @"RMQ: Sending %@ with outgoing stream Id: %d.", proto.class,
  458. self.outStreamId);
  459. // We have received messages since last time we sent something - send ack info to server.
  460. if (self.inStreamId > self.lastStreamIdAcked) {
  461. FIRMessagingSetLastStreamId(proto, self.inStreamId);
  462. self.lastStreamIdAcked = self.inStreamId;
  463. }
  464. if (self.unackedS2dIds.count > 0) {
  465. // Move all 'unack'd' messages to the ack'd map so they can be removed once the
  466. // ack is confirmed.
  467. NSArray *ackedS2dIds = [NSArray arrayWithArray:self.unackedS2dIds];
  468. FIRMessagingLoggerDebug(
  469. kFIRMessagingMessageCodeConnection018, @"RMQ: Mark persistent Ids as acked: %@.",
  470. [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  471. [self.unackedS2dIds removeAllObjects];
  472. self.ackedS2dMap[[@(self.outStreamId) stringValue]] = ackedS2dIds;
  473. }
  474. }
  475. #pragma mark - Private
  476. /**
  477. * This processes the s2d message received in reference to the d2s messages
  478. * that we have sent before.
  479. */
  480. - (void)confirmAckedD2sIdsWithStreamId:(int)lastReceivedStreamId {
  481. NSMutableArray *d2sIdsAcked = [NSMutableArray array];
  482. for (FIRMessagingD2SInfo *d2sInfo in self.d2sInfos) {
  483. if (lastReceivedStreamId < d2sInfo.streamId) {
  484. break;
  485. }
  486. [d2sIdsAcked addObject:d2sInfo];
  487. }
  488. NSMutableArray *rmqIds = [NSMutableArray arrayWithCapacity:[d2sIdsAcked count]];
  489. // remove ACK'ed messages
  490. for (FIRMessagingD2SInfo *d2sInfo in d2sIdsAcked) {
  491. if ([d2sInfo.d2sID length]) {
  492. [rmqIds addObject:d2sInfo.d2sID];
  493. }
  494. [self.d2sInfos removeObject:d2sInfo];
  495. }
  496. [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
  497. int count = [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
  498. if (kMessageRemoveAckThresholdCount > 0 && count >= kMessageRemoveAckThresholdCount) {
  499. // For short lived connections, if a large number of messages are removed, send an
  500. // ack straight away so the server knows that this message was received.
  501. [self sendStreamAck];
  502. }
  503. }
  504. /**
  505. * Called when a stream ACK or a selective ACK are received - this indicates the message has
  506. * been received by MCS.
  507. */
  508. - (void)didReceiveAckForRmqIds:(NSArray *)rmqIds {
  509. // TODO: let the user know that the following messages were received by the server
  510. }
  511. - (void)confirmAckedS2dIdsWithStreamId:(int)lastReceivedStreamId {
  512. // If the server hasn't received the streamId yet.
  513. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection019,
  514. @"RMQ: Server last received stream Id: %d.", lastReceivedStreamId);
  515. if (lastReceivedStreamId < self.outStreamId) {
  516. // TODO: This could be a good indicator that we need to re-send something (acks)?
  517. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection020,
  518. @"RMQ: There are unsent messages that should be send...\n"
  519. "server received: %d\nlast stream id sent: %d",
  520. lastReceivedStreamId, self.outStreamId);
  521. }
  522. NSSet *ackedStreamIds =
  523. [self.ackedS2dMap keysOfEntriesPassingTest:^BOOL(id key, id obj, BOOL *stop) {
  524. NSString *streamId = key;
  525. return streamId.intValue <= lastReceivedStreamId;
  526. }];
  527. NSMutableArray *s2dIdsToDelete = [NSMutableArray array];
  528. for (NSString *streamId in ackedStreamIds) {
  529. NSArray *ackedS2dIds = self.ackedS2dMap[streamId];
  530. if (ackedS2dIds.count > 0) {
  531. FIRMessagingLoggerDebug(
  532. kFIRMessagingMessageCodeConnection021,
  533. @"RMQ: Mark persistent Ids as confirmed by stream id %@: %@.", streamId,
  534. [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  535. [self.ackedS2dMap removeObjectForKey:streamId];
  536. }
  537. [s2dIdsToDelete addObjectsFromArray:ackedS2dIds];
  538. }
  539. // clean up s2d ids that the server knows we've received.
  540. // we let the server know via a s2d last stream id received in a
  541. // d2s message. the server lets us know it has received our d2s
  542. // message via a d2s last stream id received in a s2d message.
  543. [self.rmq2Manager removeS2dIds:s2dIdsToDelete];
  544. }
  545. - (void)resetUnconfirmedAcks {
  546. [self.ackedS2dMap enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
  547. [self.unackedS2dIds addObjectsFromArray:obj];
  548. }];
  549. [self.ackedS2dMap removeAllObjects];
  550. }
  551. - (void)disconnect {
  552. // cancel pending timeout tasks.
  553. [self cancelConnectionTimeoutTask];
  554. // cancel pending heartbeat.
  555. [NSObject cancelPreviousPerformRequestsWithTarget:self
  556. selector:@selector(sendHeartbeatPing)
  557. object:nil];
  558. // Unset the delegate. FIRMessagingConnection will not receive further events from the socket from now on.
  559. self.socket.delegate = nil;
  560. [self.socket disconnect];
  561. self.state = kFIRMessagingConnectionNotConnected;
  562. }
  563. - (void)connectionTimedOut {
  564. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection022,
  565. @"Connection to FIRMessaging service timed out.");
  566. [self disconnect];
  567. [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonTimeout];
  568. }
  569. - (void)scheduleConnectionTimeoutTask {
  570. // cancel the previous heartbeat timeout event and schedule a new one.
  571. [self cancelConnectionTimeoutTask];
  572. [self performSelector:@selector(connectionTimedOut)
  573. withObject:nil
  574. afterDelay:[self connectionTimeoutInterval]];
  575. }
  576. - (void)cancelConnectionTimeoutTask {
  577. // cancel pending timeout tasks.
  578. [NSObject cancelPreviousPerformRequestsWithTarget:self
  579. selector:@selector(connectionTimedOut)
  580. object:nil];
  581. }
  582. - (void)logMessage:(NSString *)description messageType:(int)messageType isOut:(BOOL)isOut {
  583. messageType = isOut ? -messageType : messageType;
  584. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection023,
  585. @"Send msg: %@ type: %d inStreamId: %d outStreamId: %d", description,
  586. messageType, self.inStreamId, self.outStreamId);
  587. }
  588. - (NSTimeInterval)connectionTimeoutInterval {
  589. return kConnectionTimeout;
  590. }
  591. @end