FIRMessagingConnection.m 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683
  1. /*
  2. * Copyright 2020 Google LLC
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FirebaseMessaging/Sources/FIRMessagingConnection.h"
  17. #import <FirebaseMessaging/FIRMessaging.h>
  18. #import "FirebaseMessaging/Sources/Protos/GtalkCore.pbobjc.h"
  19. #import "FirebaseMessaging/Sources/Protos/GtalkExtensions.pbobjc.h"
  20. #import "FirebaseMessaging/Sources/FIRMessagingDataMessageManager.h"
  21. #import "FirebaseMessaging/Sources/FIRMessagingDefines.h"
  22. #import "FirebaseMessaging/Sources/FIRMessagingLogger.h"
  23. #import "FirebaseMessaging/Sources/FIRMessagingRmqManager.h"
  24. #import "FirebaseMessaging/Sources/FIRMessagingSecureSocket.h"
  25. #import "FirebaseMessaging/Sources/FIRMessagingUtilities.h"
  26. #import "FirebaseMessaging/Sources/FIRMessagingVersionUtilities.h"
  27. #import "FirebaseMessaging/Sources/FIRMessaging_Private.h"
  // Extension ids found on IQ stanzas; -handleIqStanza: dispatches on them.
  static const NSInteger kIqSelectiveAck = 12;
  static const NSInteger kIqStreamAck = 13;

  // Compared against the result of FIRMessagingGetLastStreamId() to detect a proto that carries
  // no usable "last stream id received" value.
  static const int kInvalidStreamId = -1;

  // Delay (seconds) between accepting incoming traffic and sending the next heartbeat ping.
  static const NSTimeInterval kHeartbeatInterval = 30.0;
  // Delay (seconds) after a heartbeat ping before the connection is declared timed out.
  static const NSTimeInterval kConnectionTimeout = 20.0;
  // Number of received-but-unacked stream ids that triggers an explicit stream ack.
  static const int32_t kAckingInterval = 10;

  // NOTE(review): the three string constants below are not referenced in the visible part of this
  // file — confirm whether they are still needed.
  static NSString *const kUnackedS2dIdKey = @"FIRMessagingUnackedS2dIdKey";
  static NSString *const kAckedS2dIdMapKey = @"FIRMessagingAckedS2dIdMapKey";
  static NSString *const kRemoteFromAddress = @"from";
  /**
   * Value object pairing an outgoing (device-to-server) stream id with the persistent id of the
   * message that was sent on it.
   */
  @interface FIRMessagingD2SInfo : NSObject

  /// Outgoing stream id the message was sent with.
  @property(nonatomic, readwrite, assign) int streamId;
  /// Persistent id of the sent message. Declared `copy` so a mutable string handed to the setter
  /// cannot change underneath this object.
  @property(nonatomic, readwrite, copy) NSString *d2sID;

  /**
   * @param streamId Outgoing stream id of the message.
   * @param d2sID    Persistent id of the message; copied.
   */
  - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID;

  @end

  @implementation FIRMessagingD2SInfo

  - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID {
    self = [super init];
    if (self) {
      _streamId = streamId;
      _d2sID = [d2sID copy];
    }
    return self;
  }

  /**
   * Two infos are equal when both the stream id and the persistent id match.
   *
   * An object is always equal to itself, and two nil persistent ids compare equal;
   * -isEqualToString: alone answers NO when the receiver is nil, which would make an object with a
   * nil id unequal even to itself.
   */
  - (BOOL)isEqual:(id)object {
    if (object == self) {
      return YES;
    }
    if (![object isKindOfClass:[self class]]) {
      return NO;
    }
    FIRMessagingD2SInfo *other = (FIRMessagingD2SInfo *)object;
    if (self.streamId != other.streamId) {
      return NO;
    }
    NSString *mine = self.d2sID;
    NSString *theirs = other.d2sID;
    return mine == theirs || [mine isEqualToString:theirs];
  }

  /// Hash derived from the persistent id only; equal objects share the id and therefore the hash.
  - (NSUInteger)hash {
    return [self.d2sID hash];
  }

  @end
  /**
   * Private state of FIRMessagingConnection. The class also acts as the delegate of its secure
   * socket.
   */
  @interface FIRMessagingConnection () <FIRMessagingSecureSocketDelegate>

  // Collaborators are held weakly; this class does not own them.
  @property(nonatomic, readwrite, weak) FIRMessagingRmqManager *rmq2Manager;
  @property(nonatomic, readwrite, weak) FIRMessagingDataMessageManager *dataMessageManager;

  @property(nonatomic, readwrite, assign) FIRMessagingConnectionState state;

  // Endpoint and credentials. All string properties use `copy` (matching `host`) so a mutable
  // string assigned through the setter cannot be changed behind the connection's back.
  @property(nonatomic, readwrite, copy) NSString *host;
  @property(nonatomic, readwrite, assign) NSUInteger port;
  @property(nonatomic, readwrite, copy) NSString *authId;
  @property(nonatomic, readwrite, copy) NSString *token;

  @property(nonatomic, readwrite, strong) FIRMessagingSecureSocket *socket;

  // Server timestamp taken from the last successful login response.
  @property(nonatomic, readwrite, assign) int64_t lastLoginServerTimestamp;
  // Highest incoming stream id that has been reported back to the server.
  @property(nonatomic, readwrite, assign) int lastStreamIdAcked;
  // Counters of protos received / sent on the current socket connection.
  @property(nonatomic, readwrite, assign) int inStreamId;
  @property(nonatomic, readwrite, assign) int outStreamId;

  // Persistent ids of received messages that have not been acked to the server yet.
  @property(nonatomic, readwrite, strong) NSMutableArray *unackedS2dIds;
  // Outgoing stream id (as string) -> persistent ids acked by the proto sent with that stream id.
  @property(nonatomic, readwrite, strong) NSMutableDictionary *ackedS2dMap;
  // FIRMessagingD2SInfo entries for sent messages awaiting confirmation from the server.
  @property(nonatomic, readwrite, strong) NSMutableArray *d2sInfos;

  // ttl=0 messages that need to be sent as soon as we establish a connection
  @property(nonatomic, readwrite, strong) NSMutableArray *sendOnConnectMessages;

  @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;

  @end
  82. @implementation FIRMessagingConnection
  83. ;
  /**
   * Creates a connection in its initial state; no socket is opened until -signIn is called.
   *
   * @param authId             Auth id used for the login request; copied.
   * @param token              Auth token used for the login request; copied.
   * @param host               Host to connect to; copied.
   * @param port               Port to connect to.
   * @param runLoop            Run loop handed to the socket when connecting.
   * @param rmq2Manager        Store of persistent message ids; its unacked ids seed the unacked
   *                           list of this connection.
   * @param dataMessageManager Manager asked to resend messages after a selective ack.
   */
  - (instancetype)initWithAuthID:(NSString *)authId
                           token:(NSString *)token
                            host:(NSString *)host
                            port:(NSUInteger)port
                         runLoop:(NSRunLoop *)runLoop
                     rmq2Manager:(FIRMessagingRmqManager *)rmq2Manager
                      fcmManager:(FIRMessagingDataMessageManager *)dataMessageManager {
    self = [super init];
    if (!self) {
      return self;
    }

    // Credentials and endpoint.
    _authId = [authId copy];
    _token = [token copy];
    _host = [host copy];
    _port = port;
    _runLoop = runLoop;

    // Collaborators.
    _rmq2Manager = rmq2Manager;
    _dataMessageManager = dataMessageManager;

    // Ack bookkeeping.
    _d2sInfos = [[NSMutableArray alloc] init];
    _unackedS2dIds = [NSMutableArray arrayWithArray:[_rmq2Manager unackedS2dRmqIds]];
    _ackedS2dMap = [[NSMutableDictionary alloc] init];
    _sendOnConnectMessages = [[NSMutableArray alloc] init];

    return self;
  }
  /// Human-readable summary: host, port and the current incoming/outgoing stream ids.
  - (NSString *)description {
    int incoming = self.inStreamId;
    int outgoing = self.outStreamId;
    return [NSString stringWithFormat:@"host: %@, port: %lu, stream id in: %d, stream id out: %d",
                                      self.host, _FIRMessaging_UL(self.port), incoming, outgoing];
  }
  /**
   * Starts connecting, but only when the connection is currently not connected; otherwise this is
   * a no-op.
   */
  - (void)signIn {
    BOOL canStart = (self.state == kFIRMessagingConnectionNotConnected);
    if (canStart) {
      // Socket creation and the actual connect are separate steps so tests can hook each one.
      [self setupConnectionSocket];
      [self connectToSocket:self.socket];
    }
  }
  /// Creates a fresh secure socket, stores it in `socket` and registers this object as delegate.
  - (void)setupConnectionSocket {
    FIRMessagingSecureSocket *freshSocket = [[FIRMessagingSecureSocket alloc] init];
    self.socket = freshSocket;
    self.socket.delegate = self;
  }
  /**
   * Moves the connection to the "connecting" state and asks the socket to connect to the
   * configured host and port on the configured run loop.
   *
   * @param socket The socket to connect.
   */
  - (void)connectToSocket:(FIRMessagingSecureSocket *)socket {
    self.state = kFIRMessagingConnectionConnecting;
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection000,
                            @"Start connecting to FIRMessaging service.");

    NSString *targetHost = self.host;
    NSUInteger targetPort = self.port;
    [socket connectToHost:targetHost port:targetPort onRunLoop:self.runLoop];
  }
  /**
   * Signs out: drops queued send-on-connect messages, sends a Close proto when signed in, and
   * disconnects unless already not connected.
   */
  - (void)signOut {
    // The queue only holds messages if an error happened before the LoginResponse arrived;
    // they must not survive a sign out.
    [self.sendOnConnectMessages removeAllObjects];

    BOOL signedIn = (self.state == kFIRMessagingConnectionSignedIn);
    if (signedIn) {
      [self sendClose];
    }

    // The state is read again on purpose: it is evaluated after the close has been sent.
    BOOL alreadyDown = (self.state == kFIRMessagingConnectionNotConnected);
    if (!alreadyDown) {
      [self disconnect];
    }
  }
  /// Disconnects without sending a Close proto; does nothing when already not connected.
  - (void)teardown {
    if (self.state == kFIRMessagingConnectionNotConnected) {
      return;
    }
    [self disconnect];
  }
  146. #pragma mark - FIRMessagingSecureSocketDelegate
  /**
   * Socket delegate callback: the socket is connected.
   *
   * Resets the per-connection stream counters, moves ids whose ack was never confirmed back to
   * the unacked list, and sends the login request.
   */
  - (void)secureSocketDidConnect:(FIRMessagingSecureSocket *)socket {
    self.state = kFIRMessagingConnectionConnected;

    // Stream ids are counted per socket connection, so all counters start over.
    self.lastStreamIdAcked = 0;
    self.inStreamId = 0;
    self.outStreamId = 0;

    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection001,
                            @"Connected to FIRMessaging service.");

    [self resetUnconfirmedAcks];

    NSString *authId = self.authId;
    NSString *token = self.token;
    [self sendLoginRequest:authId token:token];
  }
  /**
   * Socket delegate callback: the socket disconnected. Tears the connection down and tells the
   * delegate it closed because the socket disconnected.
   */
  - (void)didDisconnectWithSecureSocket:(FIRMessagingSecureSocket *)socket {
    long socketState = (long)self.socket.state;
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection002,
                            @"Secure socket disconnected from FIRMessaging service. %ld",
                            socketState);

    [self disconnect];

    FIRMessagingConnectionCloseReason reason =
        kFIRMessagingConnectionCloseReasonSocketDisconnected;
    [self.delegate connection:self didCloseForReason:reason];
  }
  /**
   * Socket delegate callback: a framed proto arrived.
   *
   * Parses `data` into the proto class registered for `tag`, checks that the proto is acceptable
   * in the current state (a LoginResponse only while connected, everything else only while signed
   * in), updates the heartbeat/timeout timers and the stream bookkeeping, and dispatches to the
   * matching -didReceive… handler.
   *
   * Data that cannot be parsed is logged and dropped. Without that check a nil proto would flow
   * through the handlers; for a LoginResponse, `hasError` on nil reads NO and the connection
   * would be marked as signed in on corrupt input.
   *
   * @param socket The socket that received the data.
   * @param data   Serialized proto payload.
   * @param tag    Proto tag identifying the message type; negative values are ignored.
   */
  - (void)secureSocket:(FIRMessagingSecureSocket *)socket
        didReceiveData:(NSData *)data
               withTag:(int8_t)tag {
    if (tag < 0) {
      // Invalid proto tag
      return;
    }

    Class klassForTag = FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag);
    if ([klassForTag isSubclassOfClass:[NSNull class]]) {
      FIRMessagingLoggerError(kFIRMessagingMessageCodeConnection003, @"Invalid tag %d for proto",
                              tag);
      return;
    }

    NSError *parseError = nil;
    GPBMessage *proto = [klassForTag parseFromData:data error:&parseError];
    if (proto == nil) {
      // NOTE(review): reuses the "invalid proto" message code because no dedicated code for parse
      // failures is visible from this file — consider adding one.
      FIRMessagingLoggerError(kFIRMessagingMessageCodeConnection003,
                              @"Failed to parse proto with tag %d: %@", tag, parseError);
      return;
    }

    if (tag == kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionConnected) {
      FIRMessagingLoggerDebug(
          kFIRMessagingMessageCodeConnection004,
          @"Should not receive generated message when the connection is not connected.");
      return;
    } else if (tag != kFIRMessagingProtoTagLoginResponse &&
               self.state != kFIRMessagingConnectionSignedIn) {
      FIRMessagingLoggerDebug(
          kFIRMessagingMessageCodeConnection005,
          @"Should not receive generated message when the connection is not signed in.");
      return;
    }

    // If traffic is received after a heartbeat it is safe to assume the connection is healthy.
    [self cancelConnectionTimeoutTask];
    [self performSelector:@selector(sendHeartbeatPing) withObject:nil afterDelay:kHeartbeatInterval];

    // Stream-id and ack bookkeeping happens before the type-specific handler runs.
    [self willProcessProto:proto];

    switch (tag) {
      case kFIRMessagingProtoTagLoginResponse:
        [self didReceiveLoginResponse:(GtalkLoginResponse *)proto];
        break;
      case kFIRMessagingProtoTagDataMessageStanza:
        [self didReceiveDataMessageStanza:(GtalkDataMessageStanza *)proto];
        break;
      case kFIRMessagingProtoTagHeartbeatPing:
        [self didReceiveHeartbeatPing:(GtalkHeartbeatPing *)proto];
        break;
      case kFIRMessagingProtoTagHeartbeatAck:
        [self didReceiveHeartbeatAck:(GtalkHeartbeatAck *)proto];
        break;
      case kFIRMessagingProtoTagClose:
        [self didReceiveClose:(GtalkClose *)proto];
        break;
      case kFIRMessagingProtoTagIqStanza:
        [self handleIqStanza:(GtalkIqStanza *)proto];
        break;
      default:
        [self didReceiveUnhandledProto:proto];
        break;
    }
  }
  /**
   * Socket delegate callback: the proto with the given rmqId has been sent over the wire.
   *
   * User-facing messages carry an rmqId and could be looked up in the Rmq if needed; for now the
   * event is only logged.
   *
   * @param socket The socket that sent the proto.
   * @param tag    Proto tag of the sent message.
   * @param rmqId  Persistent id of the sent message.
   */
  - (void)secureSocket:(FIRMessagingSecureSocket *)socket
      didSendProtoWithTag:(int8_t)tag
                    rmqId:(NSString *)rmqId {
    int messageType = tag;
    [self logMessage:rmqId messageType:messageType isOut:YES];
  }
  229. #pragma mark - FIRMessagingTestConnection
  /**
   * Sends a proto over the socket when the current state allows it.
   *
   * A LoginRequest may only be sent while connected; every other proto only while signed in.
   * Protos that do not satisfy this, or that arrive while there is no socket, are dropped.
   *
   * @param proto The message to send.
   */
  - (void)sendProto:(GPBMessage *)proto {
    FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(proto);
    BOOL isLoginRequest = (tag == kFIRMessagingProtoTagLoginRequest);

    if (isLoginRequest) {
      if (self.state != kFIRMessagingConnectionConnected) {
        FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection006,
                                @"Cannot send generated message when the connection is not connected.");
        return;
      }
    } else if (self.state != kFIRMessagingConnectionSignedIn) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection007,
                              @"Cannot send generated message when the connection is not signed in.");
      return;
    }

    if (!self.socket) {
      return;
    }

    // Stamp stream ids / ack info on the proto before it is serialized.
    [self willSendProto:proto];

    NSString *rmqId = FIRMessagingGetRmq2Id(proto);
    [self.socket sendData:proto.data withTag:tag rmqId:rmqId];
  }
  /**
   * Sends the message right away when signed in; otherwise queues it to be sent after the next
   * successful login.
   *
   * @param message The message to send or queue.
   */
  - (void)sendOnConnectOrDrop:(GPBMessage *)message {
    if (self.state != kFIRMessagingConnectionSignedIn) {
      // Not signed in yet: keep the message until the login response arrives.
      [self.sendOnConnectMessages addObject:message];
      return;
    }
    // A connection has already been established, send normally.
    [self sendProto:message];
  }
  /**
   * Builds the login request for the given credentials.
   *
   * @param token  Auth token placed in the request.
   * @param authID Auth id; used as resource and user, and (as hex) inside the device id.
   * @return A populated login request.
   */
  + (GtalkLoginRequest *)loginRequestWithToken:(NSString *)token authID:(NSString *)authID {
    GtalkLoginRequest *request = [[GtalkLoginRequest alloc] init];

    // Identity of the client library and of the device.
    request.id_p = [NSString stringWithFormat:@"%@-%@", @"ios", FIRMessagingCurrentLibraryVersion()];
    request.deviceId = [NSString stringWithFormat:@"android-%llx", authID.longLongValue];
    request.resource = authID;
    request.user = authID;

    // Authentication.
    request.accountId = 1000000;
    request.authService = GtalkLoginRequest_AuthService_AndroidId;
    request.authToken = token;
    request.domain = @"mcs.android.com";

    // Transport details.
    request.networkType = [self currentNetworkType];
    request.useRmq2 = YES;
    request.lastRmqId = 1;  // Sending not enabled yet so this stays as 1.

    return request;
  }
  /**
   * Maps the current reachability status to the network type value sent in the login request:
   * 1 for WiFi, 0 for WWAN, -1 for anything else.
   *
   * See http://developer.android.com/reference/android/net/ConnectivityManager.html
   */
  + (int32_t)currentNetworkType {
    FIRMessagingNetworkStatus status = [[FIRMessaging messaging] networkType];
    switch (status) {
      case kFIRMessagingReachabilityReachableViaWiFi:
        return 1;
      case kFIRMessagingReachabilityReachableViaWWAN:
        return 0;
      default:
        return -1;
    }
  }
  /**
   * Builds and sends the login request.
   *
   * Forgets the messages sent during the previous connection, attaches every still-unacked
   * persistent id so the server learns they were received, and adds the "new_vc" setting.
   *
   * @param authId Auth id for the request.
   * @param token  Auth token for the request.
   */
  - (void)sendLoginRequest:(NSString *)authId token:(NSString *)token {
    GtalkLoginRequest *request = [[self class] loginRequestWithToken:token authID:authId];

    // clear the messages sent during last connection
    [self.d2sInfos removeAllObjects];

    NSArray *pendingAcks = self.unackedS2dIds;
    if (pendingAcks.count > 0) {
      FIRMessagingLoggerDebug(
          kFIRMessagingMessageCodeConnection008,
          @"There are unacked persistent Ids in the login request: %@",
          [pendingAcks.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
      // Send out acks.
      [request.receivedPersistentIdArray addObjectsFromArray:pendingAcks];
    }

    GtalkSetting *newVcSetting = [[GtalkSetting alloc] init];
    newVcSetting.name = @"new_vc";
    newVcSetting.value = @"1";
    [request.settingArray addObject:newVcSetting];

    [self sendProto:request];
  }
  /// Sends an empty heartbeat ack proto.
  - (void)sendHeartbeatAck {
    GtalkHeartbeatAck *ack = [[GtalkHeartbeatAck alloc] init];
    [self sendProto:ack];
  }
  /**
   * Sends a heartbeat ping and starts the connection timeout that waits for traffic in response.
   * Any heartbeat pings still scheduled are cancelled first.
   */
  - (void)sendHeartbeatPing {
    // cancel the previous heartbeat request.
    SEL heartbeatSelector = @selector(sendHeartbeatPing);
    [NSObject cancelPreviousPerformRequestsWithTarget:self selector:heartbeatSelector object:nil];

    [self scheduleConnectionTimeoutTask];

    GtalkHeartbeatPing *ping = [[GtalkHeartbeatPing alloc] init];
    [self sendProto:ping];
  }
  /**
   * Builds the IQ stanza used as an explicit stream ack: a "set" stanza with empty id whose
   * extension has the stream-ack id and empty data.
   */
  + (GtalkIqStanza *)createStreamAck {
    GtalkExtension *ackExtension = [[GtalkExtension alloc] init];
    ackExtension.id_p = kIqStreamAck;
    ackExtension.data_p = @"";

    GtalkIqStanza *stanza = [[GtalkIqStanza alloc] init];
    stanza.type = GtalkIqStanza_IqType_Set;
    stanza.id_p = @"";
    stanza.extension = ackExtension;
    return stanza;
  }
  /// Sends an explicit stream ack stanza to the server.
  - (void)sendStreamAck {
    [self sendProto:[[self class] createStreamAck]];
  }
  /// Sends an empty Close proto.
  - (void)sendClose {
    GtalkClose *closeProto = [[GtalkClose alloc] init];
    [self sendProto:closeProto];
  }
  /**
   * Dispatches a received IQ stanza on its extension id: stream acks and selective acks go to
   * their handlers, anything else is logged and treated as an unhandled proto.
   *
   * @param iq The received IQ stanza.
   */
  - (void)handleIqStanza:(GtalkIqStanza *)iq {
    if (!iq.hasExtension) {
      // The log text previously read "Ip stanza", a typo for "Iq stanza".
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection010, @"Iq stanza without extension.");
      [self didReceiveUnhandledProto:iq];
      return;
    }

    if (iq.extension.id_p == kIqStreamAck) {
      [self didReceiveStreamAck:iq];
      return;
    }
    if (iq.extension.id_p == kIqSelectiveAck) {
      [self didReceiveSelectiveAck:iq];
      return;
    }

    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection009, @"Unknown ack extension id %d.",
                            iq.extension.id_p);
    [self didReceiveUnhandledProto:iq];
  }
  /**
   * Handles the server's login response.
   *
   * On error the response is only logged. On success the connection becomes signed in, the server
   * timestamp is recorded, the delegate is told, a heartbeat ping is sent and every queued
   * send-on-connect message is sent.
   *
   * @param loginResponse The received login response.
   */
  - (void)didReceiveLoginResponse:(GtalkLoginResponse *)loginResponse {
    BOOL loginFailed = loginResponse.hasError;
    if (loginFailed) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection011,
                              @"Login error with type: %@, message: %@.", loginResponse.error.type,
                              loginResponse.error.message);
      return;
    }

    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection012, @"Logged onto MCS service.");

    self.state = kFIRMessagingConnectionSignedIn;
    self.lastLoginServerTimestamp = loginResponse.serverTimestamp;
    [self.delegate didLoginWithConnection:self];
    [self sendHeartbeatPing];

    // Flush all the TTL=0 messages that were queued while not signed in.
    for (GPBMessage *queued in self.sendOnConnectMessages) {
      [self sendProto:queued];
    }
    [self.sendOnConnectMessages removeAllObjects];
  }
  /// A heartbeat ping from the server is answered with a heartbeat ack.
  - (void)didReceiveHeartbeatPing:(GtalkHeartbeatPing *)heartbeatPing {
    // The ping itself is not inspected; only the reply matters.
    [self sendHeartbeatAck];
  }
  /// Heartbeat acks need no handling of their own.
  - (void)didReceiveHeartbeatAck:(GtalkHeartbeatAck *)heartbeatAck {
    // Intentionally empty: the receive path has already cancelled the connection timeout and
    // scheduled the next heartbeat before dispatching here.
  }
  /// Hands a received data message to the delegate.
  - (void)didReceiveDataMessageStanza:(GtalkDataMessageStanza *)dataMessageStanza {
    // TODO: Maybe add support raw data later
    id receiver = self.delegate;
    [receiver connectionDidRecieveMessage:dataMessageStanza];
  }
  /// Fallback for protos no handler is interested in; the proto is not inspected, only logged.
  - (void)didReceiveUnhandledProto:(GPBMessage *)proto {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection013, @"Received unhandled proto");
  }
  /// Stream ack from the server.
  - (void)didReceiveStreamAck:(GtalkIqStanza *)iq {
    // Intentionally empty: the server received some stuff from us, and nothing special needs to
    // be done about it here.
  }
  /**
   * Handles a selective ack: decodes the acked persistent ids from the stanza's extension data,
   * reports them to the delegate and asks the data message manager to resend what is still
   * unacked.
   *
   * The payload is decoded with +parseFromData:error: rather than
   * -mergeFromData:extensionRegistry:, which raises an exception on malformed input. A payload
   * that cannot be decoded is treated as an unhandled proto, so a bad server message cannot crash
   * the app.
   *
   * @param iq The received IQ stanza; ignored unless its extension id is the selective-ack id.
   */
  - (void)didReceiveSelectiveAck:(GtalkIqStanza *)iq {
    GtalkExtension *extension = iq.extension;
    if (extension == nil || extension.id_p != kIqSelectiveAck) {
      return;
    }

    NSString *dataString = extension.data_p;
    NSData *payload = [dataString dataUsingEncoding:NSUTF8StringEncoding];

    NSError *parseError = nil;
    GtalkSelectiveAck *selectiveAck = [GtalkSelectiveAck parseFromData:payload error:&parseError];
    if (selectiveAck == nil) {
      [self didReceiveUnhandledProto:iq];
      return;
    }

    NSArray<NSString *> *acks = [selectiveAck idArray];
    // we've received ACK's
    [self.delegate connectionDidReceiveAckForRmqIds:acks];
    // resend unacked messages
    [self.dataMessageManager resendMessagesWithConnection:self];
  }
  /// A Close proto from the server disconnects the connection.
  - (void)didReceiveClose:(GtalkClose *)close {
    // The close proto carries nothing this class reads.
    [self disconnect];
  }
  /**
   * Bookkeeping performed for every accepted incoming proto, before its handler runs.
   *
   * Counts the proto in the incoming stream, applies the "last stream id received" value the
   * proto carries (confirming our sent messages and our acks), records the proto's persistent id
   * as unacked, and sends an explicit stream ack when enough protos are unacked or the proto asks
   * for an immediate ack.
   *
   * @param proto The proto about to be processed.
   */
  - (void)willProcessProto:(GPBMessage *)proto {
    self.inStreamId++;

    BOOL isDataMessage = [proto isKindOfClass:GtalkDataMessageStanza.class];
    if (!isDataMessage) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection015,
                              @"RMQ: Receiving %@ with incoming stream Id: %d.", proto.class,
                              self.inStreamId);
    } else {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection014,
                              @"RMQ: Receiving %@ with rmq_id: %@ incoming stream Id: %d",
                              proto.class, FIRMessagingGetRmq2Id(proto), self.inStreamId);
    }

    int serverLastReceived = FIRMessagingGetLastStreamId(proto);
    if (serverLastReceived != kInvalidStreamId) {
      // confirm the D2S messages that were sent by us
      [self confirmAckedD2sIdsWithStreamId:serverLastReceived];

      // Our ack is now known to have reached the server, so the ids it covered can be dropped and
      // the unacked list starts fresh with the proto just received.
      [self confirmAckedS2dIdsWithStreamId:serverLastReceived];
    }

    NSString *persistentId = FIRMessagingGetRmq2Id(proto);
    if (persistentId != nil) {
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection016,
                              @"RMQ: Add unacked persistent Id: %@.",
                              [persistentId stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
      [self.unackedS2dIds addObject:persistentId];
      [self.rmq2Manager saveS2dMessageWithRmqId:persistentId];  // RMQ save
    }

    BOOL wantsImmediateAck = NO;
    if ([proto isKindOfClass:[GtalkDataMessageStanza class]]) {
      wantsImmediateAck = [(GtalkDataMessageStanza *)proto immediateAck];
    }

    // If we have not sent anything and the ack threshold has been reached then explicitly send one
    // to notify the server that we have received messages.
    BOOL thresholdReached = (self.inStreamId - self.lastStreamIdAcked >= kAckingInterval);
    if (thresholdReached || wantsImmediateAck) {
      [self sendStreamAck];
    }
  }
  /**
   * Bookkeeping performed for every outgoing proto, right before it is handed to the socket.
   *
   * Counts the proto in the outgoing stream, remembers it for later confirmation when it has a
   * persistent id, piggybacks the last received stream id when there is something new to ack, and
   * files all currently unacked persistent ids under this proto's outgoing stream id.
   *
   * @param proto The proto about to be sent; its last-stream-id field may be modified.
   */
  - (void)willSendProto:(GPBMessage *)proto {
    self.outStreamId++;

    NSString *persistentId = FIRMessagingGetRmq2Id(proto);
    if (persistentId.length > 0) {
      FIRMessagingD2SInfo *sentInfo =
          [[FIRMessagingD2SInfo alloc] initWithStreamId:self.outStreamId d2sId:persistentId];
      [self.d2sInfos addObject:sentInfo];
    }

    // each time we send a d2s message, it acks previously received
    // s2d messages via the last (s2d) stream id received.
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection017,
                            @"RMQ: Sending %@ with outgoing stream Id: %d.", proto.class,
                            self.outStreamId);

    // We have received messages since last time we sent something - send ack info to server.
    BOOL hasNewIncoming = (self.inStreamId > self.lastStreamIdAcked);
    if (hasNewIncoming) {
      FIRMessagingSetLastStreamId(proto, self.inStreamId);
      self.lastStreamIdAcked = self.inStreamId;
    }

    if (self.unackedS2dIds.count == 0) {
      return;
    }

    // Move all 'unack'd' messages to the ack'd map so they can be removed once the
    // ack is confirmed.
    NSArray *ackedS2dIds = [NSArray arrayWithArray:self.unackedS2dIds];
    FIRMessagingLoggerDebug(
        kFIRMessagingMessageCodeConnection018, @"RMQ: Mark persistent Ids as acked: %@.",
        [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
    [self.unackedS2dIds removeAllObjects];

    NSString *streamKey = [@(self.outStreamId) stringValue];
    self.ackedS2dMap[streamKey] = ackedS2dIds;
  }
  474. #pragma mark - Private
  /**
   * This processes the s2d message received in reference to the d2s messages
   * that we have sent before.
   *
   * Every sent message whose stream id is not greater than `lastReceivedStreamId` counts as acked:
   * it is dropped from `d2sInfos` and its persistent id is reported to the delegate. The scan
   * stops at the first entry with a higher stream id, so the acked entries always form a prefix
   * of `d2sInfos`. They are therefore removed with a single range removal instead of one
   * -removeObject: search per entry, which was quadratic and relied on -isEqual:.
   *
   * The delegate is called even when nothing was acked (with an empty array).
   *
   * @param lastReceivedStreamId Highest outgoing stream id the server reports as received.
   */
  - (void)confirmAckedD2sIdsWithStreamId:(int)lastReceivedStreamId {
    NSMutableArray *rmqIds = [NSMutableArray array];
    NSUInteger ackedCount = 0;

    for (FIRMessagingD2SInfo *d2sInfo in self.d2sInfos) {
      if (lastReceivedStreamId < d2sInfo.streamId) {
        break;
      }
      if ([d2sInfo.d2sID length]) {
        [rmqIds addObject:d2sInfo.d2sID];
      }
      ackedCount++;
    }

    // remove ACK'ed messages
    if (ackedCount > 0) {
      [self.d2sInfos removeObjectsInRange:NSMakeRange(0, ackedCount)];
    }

    [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
  }
  /**
   * Drops the persistent ids whose ack the server has confirmed.
   *
   * Every entry of `ackedS2dMap` filed under an outgoing stream id not greater than
   * `lastReceivedStreamId` is confirmed; its ids are collected and removed from the rmq manager.
   *
   * @param lastReceivedStreamId Highest outgoing stream id the server reports as received.
   */
  - (void)confirmAckedS2dIdsWithStreamId:(int)lastReceivedStreamId {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection019,
                            @"RMQ: Server last received stream Id: %d.", lastReceivedStreamId);

    // The server hasn't received everything we sent yet.
    BOOL serverIsBehind = (lastReceivedStreamId < self.outStreamId);
    if (serverIsBehind) {
      // TODO: This could be a good indicator that we need to re-send something (acks)?
      FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection020,
                              @"RMQ: There are unsent messages that should be send...\n"
                               "server received: %d\nlast stream id sent: %d",
                              lastReceivedStreamId, self.outStreamId);
    }

    // Keys are outgoing stream ids stored as strings.
    NSSet *confirmedKeys =
        [self.ackedS2dMap keysOfEntriesPassingTest:^BOOL(id key, id obj, BOOL *stop) {
          return [(NSString *)key intValue] <= lastReceivedStreamId;
        }];

    NSMutableArray *confirmedIds = [NSMutableArray array];
    for (NSString *streamKey in confirmedKeys) {
      NSArray *idsForStream = self.ackedS2dMap[streamKey];
      if (idsForStream.count > 0) {
        FIRMessagingLoggerDebug(
            kFIRMessagingMessageCodeConnection021,
            @"RMQ: Mark persistent Ids as confirmed by stream id %@: %@.", streamKey,
            [idsForStream.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
        [self.ackedS2dMap removeObjectForKey:streamKey];
      }
      [confirmedIds addObjectsFromArray:idsForStream];
    }

    // clean up s2d ids that the server knows we've received.
    // we let the server know via a s2d last stream id received in a
    // d2s message. the server lets us know it has received our d2s
    // message via a d2s last stream id received in a s2d message.
    [self.rmq2Manager removeS2dIds:confirmedIds];
  }
  /**
   * Moves every persistent id whose ack was sent but never confirmed back to the unacked list,
   * then empties the acked map.
   */
  - (void)resetUnconfirmedAcks {
    NSMutableArray *unacked = self.unackedS2dIds;
    [self.ackedS2dMap
        enumerateKeysAndObjectsUsingBlock:^(id streamKey, id idsForStream, BOOL *stop) {
          [unacked addObjectsFromArray:idsForStream];
        }];
    [self.ackedS2dMap removeAllObjects];
  }
  /**
   * Tears the connection down: cancels the pending timeout and heartbeat, detaches from and
   * disconnects the socket, and marks the connection as not connected.
   */
  - (void)disconnect {
    // cancel pending timeout tasks.
    [self cancelConnectionTimeoutTask];

    // cancel pending heartbeat.
    SEL heartbeatSelector = @selector(sendHeartbeatPing);
    [NSObject cancelPreviousPerformRequestsWithTarget:self selector:heartbeatSelector object:nil];

    // Unset the delegate first. FIRMessagingConnection will not receive further events from the
    // socket from now on.
    FIRMessagingSecureSocket *currentSocket = self.socket;
    currentSocket.delegate = nil;
    [currentSocket disconnect];

    self.state = kFIRMessagingConnectionNotConnected;
  }
  /// Fired by the timeout task: disconnects and tells the delegate the connection timed out.
  - (void)connectionTimedOut {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection022,
                            @"Connection to FIRMessaging service timed out.");
    [self disconnect];

    FIRMessagingConnectionCloseReason reason = kFIRMessagingConnectionCloseReasonTimeout;
    [self.delegate connection:self didCloseForReason:reason];
  }
  /// Restarts the timeout: cancels any pending timeout task and schedules -connectionTimedOut
  /// after -connectionTimeoutInterval seconds.
  - (void)scheduleConnectionTimeoutTask {
    // cancel the previous heartbeat timeout event and schedule a new one.
    [self cancelConnectionTimeoutTask];

    NSTimeInterval delay = [self connectionTimeoutInterval];
    [self performSelector:@selector(connectionTimedOut) withObject:nil afterDelay:delay];
  }
  /// Cancels any pending -connectionTimedOut call scheduled on this object.
  - (void)cancelConnectionTimeoutTask {
    SEL timeoutSelector = @selector(connectionTimedOut);
    [NSObject cancelPreviousPerformRequestsWithTarget:self selector:timeoutSelector object:nil];
  }
  /**
   * Writes a debug log line for a message together with the current stream ids.
   *
   * @param description Text identifying the message (callers in this file pass the rmq id).
   * @param messageType Proto tag of the message.
   * @param isOut       YES for outgoing messages; their type is logged negated.
   */
  - (void)logMessage:(NSString *)description messageType:(int)messageType isOut:(BOOL)isOut {
    int loggedType = messageType;
    if (isOut) {
      loggedType = -messageType;
    }
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection023,
                            @"Send msg: %@ type: %d inStreamId: %d outStreamId: %d", description,
                            loggedType, self.inStreamId, self.outStreamId);
  }
  /// Seconds to wait before the connection is considered timed out. Kept as a method rather than
  /// a direct constant read at the call site.
  - (NSTimeInterval)connectionTimeoutInterval {
    NSTimeInterval interval = kConnectionTimeout;
    return interval;
  }
  578. @end