FIRMessagingConnection.m 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import "FIRMessagingConnection.h"
  17. #import "Protos/GtalkCore.pbobjc.h"
  18. #import "Protos/GtalkExtensions.pbobjc.h"
  19. #import "FIRMessaging.h"
  20. #import "FIRMessagingDataMessageManager.h"
  21. #import "FIRMessagingDefines.h"
  22. #import "FIRMessagingLogger.h"
  23. #import "FIRMessagingRmqManager.h"
  24. #import "FIRMessagingSecureSocket.h"
  25. #import "FIRMessagingUtilities.h"
  26. #import "FIRMessagingVersionUtilities.h"
  27. #import "FIRMessaging_Private.h"
  28. static NSInteger const kIqSelectiveAck = 12;
  29. static NSInteger const kIqStreamAck = 13;
  30. static int const kInvalidStreamId = -1;
  31. // Threshold for number of messages removed that we will ack, for short lived connections
  32. static int const kMessageRemoveAckThresholdCount = 5;
  33. static NSTimeInterval const kHeartbeatInterval = 30.0;
  34. static NSTimeInterval const kConnectionTimeout = 20.0;
  35. static int32_t const kAckingInterval = 10;
  36. static NSString *const kUnackedS2dIdKey = @"FIRMessagingUnackedS2dIdKey";
  37. static NSString *const kAckedS2dIdMapKey = @"FIRMessagingAckedS2dIdMapKey";
  38. static NSString *const kRemoteFromAddress = @"from";
  39. @interface FIRMessagingD2SInfo : NSObject
  40. @property(nonatomic, readwrite, assign) int streamId;
  41. @property(nonatomic, readwrite, strong) NSString *d2sID;
  42. - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID;
  43. @end
  44. @implementation FIRMessagingD2SInfo
  45. - (instancetype)initWithStreamId:(int)streamId d2sId:(NSString *)d2sID {
  46. self = [super init];
  47. if (self) {
  48. _streamId = streamId;
  49. _d2sID = [d2sID copy];
  50. }
  51. return self;
  52. }
  53. - (BOOL)isEqual:(id)object {
  54. if ([object isKindOfClass:[self class]]) {
  55. FIRMessagingD2SInfo *other = (FIRMessagingD2SInfo *)object;
  56. return self.streamId == other.streamId && [self.d2sID isEqualToString:other.d2sID];
  57. }
  58. return NO;
  59. }
  60. - (NSUInteger)hash {
  61. return [self.d2sID hash];
  62. }
  63. @end
  64. @interface FIRMessagingConnection ()<FIRMessagingSecureSocketDelegate>
  65. @property(nonatomic, readwrite, weak) FIRMessagingRmqManager *rmq2Manager;
  66. @property(nonatomic, readwrite, weak) FIRMessagingDataMessageManager *dataMessageManager;
  67. @property(nonatomic, readwrite, assign) FIRMessagingConnectionState state;
  68. @property(nonatomic, readwrite, copy) NSString *host;
  69. @property(nonatomic, readwrite, assign) NSUInteger port;
  70. @property(nonatomic, readwrite, strong) NSString *authId;
  71. @property(nonatomic, readwrite, strong) NSString *token;
  72. @property(nonatomic, readwrite, strong) FIRMessagingSecureSocket *socket;
  73. @property(nonatomic, readwrite, assign) int64_t lastLoginServerTimestamp;
  74. @property(nonatomic, readwrite, assign) int lastStreamIdAcked;
  75. @property(nonatomic, readwrite, assign) int inStreamId;
  76. @property(nonatomic, readwrite, assign) int outStreamId;
  77. @property(nonatomic, readwrite, strong) NSMutableArray *unackedS2dIds;
  78. @property(nonatomic, readwrite, strong) NSMutableDictionary *ackedS2dMap;
  79. @property(nonatomic, readwrite, strong) NSMutableArray *d2sInfos;
  80. // ttl=0 messages that need to be sent as soon as we establish a connection
  81. @property(nonatomic, readwrite, strong) NSMutableArray *sendOnConnectMessages;
  82. @property(nonatomic, readwrite, strong) NSRunLoop *runLoop;
  83. @end
  84. @implementation FIRMessagingConnection;
  85. - (instancetype)initWithAuthID:(NSString *)authId
  86. token:(NSString *)token
  87. host:(NSString *)host
  88. port:(NSUInteger)port
  89. runLoop:(NSRunLoop *)runLoop
  90. rmq2Manager:(FIRMessagingRmqManager *)rmq2Manager
  91. fcmManager:(FIRMessagingDataMessageManager *)dataMessageManager {
  92. self = [super init];
  93. if (self) {
  94. _authId = [authId copy];
  95. _token = [token copy];
  96. _host = [host copy];
  97. _port = port;
  98. _runLoop = runLoop;
  99. _rmq2Manager = rmq2Manager;
  100. _dataMessageManager = dataMessageManager;
  101. _d2sInfos = [NSMutableArray array];
  102. _unackedS2dIds = [NSMutableArray arrayWithArray:[_rmq2Manager unackedS2dRmqIds]];
  103. _ackedS2dMap = [NSMutableDictionary dictionary];
  104. _sendOnConnectMessages = [NSMutableArray array];
  105. }
  106. return self;
  107. }
  108. - (NSString *)description {
  109. return [NSString stringWithFormat:@"host: %@, port: %lu, stream id in: %d, stream id out: %d",
  110. self.host,
  111. _FIRMessaging_UL(self.port),
  112. self.inStreamId,
  113. self.outStreamId];
  114. }
  115. - (void)signIn {
  116. _FIRMessagingDevAssert(self.state == kFIRMessagingConnectionNotConnected, @"Invalid connection state.");
  117. if (self.state != kFIRMessagingConnectionNotConnected) {
  118. return;
  119. }
  120. // break it up for testing
  121. [self setupConnectionSocket];
  122. [self connectToSocket:self.socket];
  123. }
  124. - (void)setupConnectionSocket {
  125. self.socket = [[FIRMessagingSecureSocket alloc] init];
  126. self.socket.delegate = self;
  127. }
  128. - (void)connectToSocket:(FIRMessagingSecureSocket *)socket {
  129. self.state = kFIRMessagingConnectionConnecting;
  130. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection000,
  131. @"Start connecting to FIRMessaging service.");
  132. [socket connectToHost:self.host port:self.port onRunLoop:self.runLoop];
  133. }
  134. - (void)signOut {
  135. // Clear the list of messages to be sent on connect. This will only
  136. // have messages in it if an error happened before receiving the LoginResponse.
  137. [self.sendOnConnectMessages removeAllObjects];
  138. if (self.state == kFIRMessagingConnectionSignedIn) {
  139. [self sendClose];
  140. }
  141. if (self.state != kFIRMessagingConnectionNotConnected) {
  142. [self disconnect];
  143. }
  144. }
  145. - (void)teardown {
  146. if (self.state != kFIRMessagingConnectionNotConnected) {
  147. [self disconnect];
  148. }
  149. }
  150. #pragma mark - FIRMessagingSecureSocketDelegate
  151. - (void)secureSocketDidConnect:(FIRMessagingSecureSocket *)socket {
  152. self.state = kFIRMessagingConnectionConnected;
  153. self.lastStreamIdAcked = 0;
  154. self.inStreamId = 0;
  155. self.outStreamId = 0;
  156. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection001,
  157. @"Connected to FIRMessaging service.");
  158. [self resetUnconfirmedAcks];
  159. [self sendLoginRequest:self.authId token:self.token];
  160. }
  161. - (void)didDisconnectWithSecureSocket:(FIRMessagingSecureSocket *)socket {
  162. _FIRMessagingDevAssert(self.socket == socket, @"Invalid socket");
  163. _FIRMessagingDevAssert(self.socket.state == kFIRMessagingSecureSocketClosed, @"Socket already closed");
  164. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection002,
  165. @"Secure socket disconnected from FIRMessaging service.");
  166. [self disconnect];
  167. [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonSocketDisconnected];
  168. }
  169. - (void)secureSocket:(FIRMessagingSecureSocket *)socket
  170. didReceiveData:(NSData *)data
  171. withTag:(int8_t)tag {
  172. if (tag < 0) {
  173. // Invalid proto tag
  174. return;
  175. }
  176. Class klassForTag = FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag);
  177. if ([klassForTag isSubclassOfClass:[NSNull class]]) {
  178. FIRMessagingLoggerError(kFIRMessagingMessageCodeConnection003, @"Invalid tag %d for proto",
  179. tag);
  180. return;
  181. }
  182. GPBMessage *proto = [klassForTag parseFromData:data error:NULL];
  183. if (tag == kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionConnected) {
  184. FIRMessagingLoggerDebug(
  185. kFIRMessagingMessageCodeConnection004,
  186. @"Should not receive generated message when the connection is not connected.");
  187. return;
  188. } else if (tag != kFIRMessagingProtoTagLoginResponse && self.state != kFIRMessagingConnectionSignedIn) {
  189. FIRMessagingLoggerDebug(
  190. kFIRMessagingMessageCodeConnection005,
  191. @"Should not receive generated message when the connection is not signed in.");
  192. return;
  193. }
  194. // If traffic is received after a heartbeat it is safe to assume the connection is healthy.
  195. [self cancelConnectionTimeoutTask];
  196. [self performSelector:@selector(sendHeartbeatPing)
  197. withObject:nil
  198. afterDelay:kHeartbeatInterval];
  199. [self willProcessProto:proto];
  200. switch (tag) {
  201. case kFIRMessagingProtoTagLoginResponse:
  202. [self didReceiveLoginResponse:(GtalkLoginResponse *)proto];
  203. break;
  204. case kFIRMessagingProtoTagDataMessageStanza:
  205. [self didReceiveDataMessageStanza:(GtalkDataMessageStanza *)proto];
  206. break;
  207. case kFIRMessagingProtoTagHeartbeatPing:
  208. [self didReceiveHeartbeatPing:(GtalkHeartbeatPing *)proto];
  209. break;
  210. case kFIRMessagingProtoTagHeartbeatAck:
  211. [self didReceiveHeartbeatAck:(GtalkHeartbeatAck *)proto];
  212. break;
  213. case kFIRMessagingProtoTagClose:
  214. [self didReceiveClose:(GtalkClose *)proto];
  215. break;
  216. case kFIRMessagingProtoTagIqStanza:
  217. [self handleIqStanza:(GtalkIqStanza *)proto];
  218. break;
  219. default:
  220. [self didReceiveUnhandledProto:proto];
  221. break;
  222. }
  223. }
  224. // Called from secure socket once we have send the proto with given rmqId over the wire
  225. // since we are mostly concerned with user facing messages which certainly have a rmqId
  226. // we can retrieve them from the Rmq if necessary to look at stuff but for now we just
  227. // log it.
  228. - (void)secureSocket:(FIRMessagingSecureSocket *)socket
  229. didSendProtoWithTag:(int8_t)tag
  230. rmqId:(NSString *)rmqId {
  231. // log the message
  232. [self logMessage:rmqId messageType:tag isOut:YES];
  233. }
  234. #pragma mark - FIRMessagingTestConnection
  235. - (void)sendProto:(GPBMessage *)proto {
  236. FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(proto);
  237. if (tag == kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionConnected) {
  238. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection006,
  239. @"Cannot send generated message when the connection is not connected.");
  240. return;
  241. } else if (tag != kFIRMessagingProtoTagLoginRequest && self.state != kFIRMessagingConnectionSignedIn) {
  242. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection007,
  243. @"Cannot send generated message when the connection is not signed in.");
  244. return;
  245. }
  246. _FIRMessagingDevAssert(self.socket != nil, @"Socket shouldn't be nil");
  247. if (self.socket == nil) {
  248. return;
  249. }
  250. [self willSendProto:proto];
  251. [self.socket sendData:proto.data withTag:tag rmqId:FIRMessagingGetRmq2Id(proto)];
  252. }
  253. - (void)sendOnConnectOrDrop:(GPBMessage *)message {
  254. if (self.state == kFIRMessagingConnectionSignedIn) {
  255. // If a connection has already been established, send normally
  256. [self sendProto:message];
  257. } else {
  258. // Otherwise add them to the list of messages to send after login
  259. [self.sendOnConnectMessages addObject:message];
  260. }
  261. }
  262. + (GtalkLoginRequest *)loginRequestWithToken:(NSString *)token authID:(NSString *)authID {
  263. GtalkLoginRequest *login = [[GtalkLoginRequest alloc] init];
  264. login.accountId = 1000000;
  265. login.authService = GtalkLoginRequest_AuthService_AndroidId;
  266. login.authToken = token;
  267. login.id_p = [NSString stringWithFormat:@"%@-%@", @"ios", FIRMessagingCurrentLibraryVersion()];
  268. login.domain = @"mcs.android.com";
  269. login.deviceId = [NSString stringWithFormat:@"android-%llx", authID.longLongValue];
  270. login.networkType = [self currentNetworkType];
  271. login.resource = authID;
  272. login.user = authID;
  273. login.useRmq2 = YES;
  274. login.lastRmqId = 1; // Sending not enabled yet so this stays as 1.
  275. return login;
  276. }
  277. + (int32_t)currentNetworkType {
  278. // http://developer.android.com/reference/android/net/ConnectivityManager.html
  279. int32_t fcmNetworkType;
  280. FIRMessagingNetworkStatus type = [[FIRMessaging messaging] networkType];
  281. switch (type) {
  282. case kFIRMessagingReachabilityReachableViaWiFi:
  283. fcmNetworkType = 1;
  284. break;
  285. case kFIRMessagingReachabilityReachableViaWWAN:
  286. fcmNetworkType = 0;
  287. break;
  288. default:
  289. fcmNetworkType = -1;
  290. break;
  291. }
  292. return fcmNetworkType;
  293. }
  294. - (void)sendLoginRequest:(NSString *)authId
  295. token:(NSString *)token {
  296. GtalkLoginRequest *login = [[self class] loginRequestWithToken:token authID:authId];
  297. // clear the messages sent during last connection
  298. if ([self.d2sInfos count]) {
  299. [self.d2sInfos removeAllObjects];
  300. }
  301. if (self.unackedS2dIds.count > 0) {
  302. FIRMessagingLoggerDebug(
  303. kFIRMessagingMessageCodeConnection008,
  304. @"There are unacked persistent Ids in the login request: %@",
  305. [self.unackedS2dIds.description stringByReplacingOccurrencesOfString:@"%"
  306. withString:@"%%"]);
  307. }
  308. // Send out acks.
  309. for (NSString *unackedPersistentS2dId in self.unackedS2dIds) {
  310. [login.receivedPersistentIdArray addObject:unackedPersistentS2dId];
  311. }
  312. GtalkSetting *setting = [[GtalkSetting alloc] init];
  313. setting.name = @"new_vc";
  314. setting.value = @"1";
  315. [login.settingArray addObject:setting];
  316. [self sendProto:login];
  317. }
  318. - (void)sendHeartbeatAck {
  319. [self sendProto:[[GtalkHeartbeatAck alloc] init]];
  320. }
  321. - (void)sendHeartbeatPing {
  322. // cancel the previous heartbeat request.
  323. [NSObject cancelPreviousPerformRequestsWithTarget:self
  324. selector:@selector(sendHeartbeatPing)
  325. object:nil];
  326. [self scheduleConnectionTimeoutTask];
  327. [self sendProto:[[GtalkHeartbeatPing alloc] init]];
  328. }
  329. + (GtalkIqStanza *)createStreamAck {
  330. GtalkIqStanza *iq = [[GtalkIqStanza alloc] init];
  331. iq.type = GtalkIqStanza_IqType_Set;
  332. iq.id_p = @"";
  333. GtalkExtension *ext = [[GtalkExtension alloc] init];
  334. ext.id_p = kIqStreamAck;
  335. ext.data_p = @"";
  336. iq.extension = ext;
  337. return iq;
  338. }
  339. - (void)sendStreamAck {
  340. GtalkIqStanza *iq = [[self class] createStreamAck];
  341. [self sendProto:iq];
  342. }
  343. - (void)sendClose {
  344. [self sendProto:[[GtalkClose alloc] init]];
  345. }
  346. - (void)handleIqStanza:(GtalkIqStanza *)iq {
  347. if (iq.hasExtension) {
  348. if (iq.extension.id_p == kIqStreamAck) {
  349. [self didReceiveStreamAck:iq];
  350. return;
  351. }
  352. if (iq.extension.id_p == kIqSelectiveAck) {
  353. [self didReceiveSelectiveAck:iq];
  354. return;
  355. }
  356. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection009, @"Unknown ack extension id %d.",
  357. iq.extension.id_p);
  358. } else {
  359. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection010, @"Ip stanza without extension.");
  360. }
  361. [self didReceiveUnhandledProto:iq];
  362. }
  363. - (void)didReceiveLoginResponse:(GtalkLoginResponse *)loginResponse {
  364. if (loginResponse.hasError) {
  365. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection011,
  366. @"Login error with type: %@, message: %@.", loginResponse.error.type,
  367. loginResponse.error.message);
  368. return;
  369. }
  370. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection012, @"Logged onto MCS service.");
  371. // We sent the persisted list of unack'd messages with login so we can assume they have been ack'd
  372. // by the server.
  373. _FIRMessagingDevAssert(self.unackedS2dIds.count == 0, @"No ids present");
  374. _FIRMessagingDevAssert(self.outStreamId == 1, @"Login should be the first stream id");
  375. self.state = kFIRMessagingConnectionSignedIn;
  376. self.lastLoginServerTimestamp = loginResponse.serverTimestamp;
  377. [self.delegate didLoginWithConnection:self];
  378. [self sendHeartbeatPing];
  379. // Add all the TTL=0 messages on connect
  380. for (GPBMessage *message in self.sendOnConnectMessages) {
  381. [self sendProto:message];
  382. }
  383. [self.sendOnConnectMessages removeAllObjects];
  384. }
  385. - (void)didReceiveHeartbeatPing:(GtalkHeartbeatPing *)heartbeatPing {
  386. [self sendHeartbeatAck];
  387. }
  388. - (void)didReceiveHeartbeatAck:(GtalkHeartbeatAck *)heartbeatAck {
  389. }
  390. - (void)didReceiveDataMessageStanza:(GtalkDataMessageStanza *)dataMessageStanza {
  391. // TODO: Maybe add support raw data later
  392. [self.delegate connectionDidRecieveMessage:dataMessageStanza];
  393. }
  394. - (void)didReceiveUnhandledProto:(GPBMessage *)proto {
  395. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection013, @"Received unhandled proto");
  396. }
  397. - (void)didReceiveStreamAck:(GtalkIqStanza *)iq {
  398. // Server received some stuff from us we don't really need to do anything special
  399. }
  400. - (void)didReceiveSelectiveAck:(GtalkIqStanza *)iq {
  401. GtalkExtension *extension = iq.extension;
  402. if (extension) {
  403. int extensionId = extension.id_p;
  404. if (extensionId == kIqSelectiveAck) {
  405. NSString *dataString = extension.data_p;
  406. GtalkSelectiveAck *selectiveAck = [[GtalkSelectiveAck alloc] init];
  407. [selectiveAck mergeFromData:[dataString dataUsingEncoding:NSUTF8StringEncoding]
  408. extensionRegistry:nil];
  409. NSArray <NSString *>*acks = [selectiveAck idArray];
  410. // we've received ACK's
  411. [self.delegate connectionDidReceiveAckForRmqIds:acks];
  412. // resend unacked messages
  413. [self.dataMessageManager resendMessagesWithConnection:self];
  414. }
  415. }
  416. }
  417. - (void)didReceiveClose:(GtalkClose *)close {
  418. [self disconnect];
  419. }
  420. - (void)willProcessProto:(GPBMessage *)proto {
  421. self.inStreamId++;
  422. if ([proto isKindOfClass:GtalkDataMessageStanza.class]) {
  423. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection014,
  424. @"RMQ: Receiving %@ with rmq_id: %@ incoming stream Id: %d",
  425. proto.class, FIRMessagingGetRmq2Id(proto), self.inStreamId);
  426. } else {
  427. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection015,
  428. @"RMQ: Receiving %@ with incoming stream Id: %d.", proto.class,
  429. self.inStreamId);
  430. }
  431. int streamId = FIRMessagingGetLastStreamId(proto);
  432. if (streamId != kInvalidStreamId) {
  433. // confirm the D2S messages that were sent by us
  434. [self confirmAckedD2sIdsWithStreamId:streamId];
  435. // We can now confirm that our ack was received by the server and start our unack'd list fresh
  436. // with the proto we just received.
  437. [self confirmAckedS2dIdsWithStreamId:streamId];
  438. }
  439. NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
  440. if (rmq2Id != nil) {
  441. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection016,
  442. @"RMQ: Add unacked persistent Id: %@.",
  443. [rmq2Id stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  444. [self.unackedS2dIds addObject:rmq2Id];
  445. [self.rmq2Manager saveS2dMessageWithRmqId:rmq2Id]; // RMQ save
  446. }
  447. BOOL explicitAck = ([proto isKindOfClass:[GtalkDataMessageStanza class]] &&
  448. [(GtalkDataMessageStanza *)proto immediateAck]);
  449. // If we have not sent anything and the ack threshold has been reached then explicitly send one
  450. // to notify the server that we have received messages.
  451. if (self.inStreamId - self.lastStreamIdAcked >= kAckingInterval || explicitAck) {
  452. [self sendStreamAck];
  453. }
  454. }
  455. - (void)willSendProto:(GPBMessage *)proto {
  456. self.outStreamId++;
  457. NSString *rmq2Id = FIRMessagingGetRmq2Id(proto);
  458. if ([rmq2Id length]) {
  459. FIRMessagingD2SInfo *d2sInfo = [[FIRMessagingD2SInfo alloc] initWithStreamId:self.outStreamId d2sId:rmq2Id];
  460. [self.d2sInfos addObject:d2sInfo];
  461. }
  462. // each time we send a d2s message, it acks previously received
  463. // s2d messages via the last (s2d) stream id received.
  464. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection017,
  465. @"RMQ: Sending %@ with outgoing stream Id: %d.", proto.class,
  466. self.outStreamId);
  467. // We have received messages since last time we sent something - send ack info to server.
  468. if (self.inStreamId > self.lastStreamIdAcked) {
  469. FIRMessagingSetLastStreamId(proto, self.inStreamId);
  470. self.lastStreamIdAcked = self.inStreamId;
  471. }
  472. if (self.unackedS2dIds.count > 0) {
  473. // Move all 'unack'd' messages to the ack'd map so they can be removed once the
  474. // ack is confirmed.
  475. NSArray *ackedS2dIds = [NSArray arrayWithArray:self.unackedS2dIds];
  476. FIRMessagingLoggerDebug(
  477. kFIRMessagingMessageCodeConnection018, @"RMQ: Mark persistent Ids as acked: %@.",
  478. [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  479. [self.unackedS2dIds removeAllObjects];
  480. self.ackedS2dMap[[@(self.outStreamId) stringValue]] = ackedS2dIds;
  481. }
  482. }
  483. #pragma mark - Private
  484. /**
  485. * This processes the s2d message received in reference to the d2s messages
  486. * that we have sent before.
  487. */
  488. - (void)confirmAckedD2sIdsWithStreamId:(int)lastReceivedStreamId {
  489. NSMutableArray *d2sIdsAcked = [NSMutableArray array];
  490. for (FIRMessagingD2SInfo *d2sInfo in self.d2sInfos) {
  491. if (lastReceivedStreamId < d2sInfo.streamId) {
  492. break;
  493. }
  494. [d2sIdsAcked addObject:d2sInfo];
  495. }
  496. NSMutableArray *rmqIds = [NSMutableArray arrayWithCapacity:[d2sIdsAcked count]];
  497. // remove ACK'ed messages
  498. for (FIRMessagingD2SInfo *d2sInfo in d2sIdsAcked) {
  499. if ([d2sInfo.d2sID length]) {
  500. [rmqIds addObject:d2sInfo.d2sID];
  501. }
  502. [self.d2sInfos removeObject:d2sInfo];
  503. }
  504. [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
  505. int count = [self.delegate connectionDidReceiveAckForRmqIds:rmqIds];
  506. if (kMessageRemoveAckThresholdCount > 0 && count >= kMessageRemoveAckThresholdCount) {
  507. // For short lived connections, if a large number of messages are removed, send an
  508. // ack straight away so the server knows that this message was received.
  509. [self sendStreamAck];
  510. }
  511. }
  512. /**
  513. * Called when a stream ACK or a selective ACK are received - this indicates the message has
  514. * been received by MCS.
  515. */
  516. - (void)didReceiveAckForRmqIds:(NSArray *)rmqIds {
  517. // TODO: let the user know that the following messages were received by the server
  518. }
  519. - (void)confirmAckedS2dIdsWithStreamId:(int)lastReceivedStreamId {
  520. // If the server hasn't received the streamId yet.
  521. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection019,
  522. @"RMQ: Server last received stream Id: %d.", lastReceivedStreamId);
  523. if (lastReceivedStreamId < self.outStreamId) {
  524. // TODO: This could be a good indicator that we need to re-send something (acks)?
  525. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection020,
  526. @"RMQ: There are unsent messages that should be send...\n"
  527. "server received: %d\nlast stream id sent: %d",
  528. lastReceivedStreamId, self.outStreamId);
  529. }
  530. NSSet *ackedStreamIds =
  531. [self.ackedS2dMap keysOfEntriesPassingTest:^BOOL(id key, id obj, BOOL *stop) {
  532. NSString *streamId = key;
  533. return streamId.intValue <= lastReceivedStreamId;
  534. }];
  535. NSMutableArray *s2dIdsToDelete = [NSMutableArray array];
  536. for (NSString *streamId in ackedStreamIds) {
  537. NSArray *ackedS2dIds = self.ackedS2dMap[streamId];
  538. if (ackedS2dIds.count > 0) {
  539. FIRMessagingLoggerDebug(
  540. kFIRMessagingMessageCodeConnection021,
  541. @"RMQ: Mark persistent Ids as confirmed by stream id %@: %@.", streamId,
  542. [ackedS2dIds.description stringByReplacingOccurrencesOfString:@"%" withString:@"%%"]);
  543. [self.ackedS2dMap removeObjectForKey:streamId];
  544. }
  545. [s2dIdsToDelete addObjectsFromArray:ackedS2dIds];
  546. }
  547. // clean up s2d ids that the server knows we've received.
  548. // we let the server know via a s2d last stream id received in a
  549. // d2s message. the server lets us know it has received our d2s
  550. // message via a d2s last stream id received in a s2d message.
  551. [self.rmq2Manager removeS2dIds:s2dIdsToDelete];
  552. }
  553. - (void)resetUnconfirmedAcks {
  554. [self.ackedS2dMap enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
  555. [self.unackedS2dIds addObjectsFromArray:obj];
  556. }];
  557. [self.ackedS2dMap removeAllObjects];
  558. }
  559. - (void)disconnect {
  560. _FIRMessagingDevAssert(self.state != kFIRMessagingConnectionNotConnected, @"Connection already not connected");
  561. // cancel pending timeout tasks.
  562. [self cancelConnectionTimeoutTask];
  563. // cancel pending heartbeat.
  564. [NSObject cancelPreviousPerformRequestsWithTarget:self
  565. selector:@selector(sendHeartbeatPing)
  566. object:nil];
  567. // Unset the delegate. FIRMessagingConnection will not receive further events from the socket from now on.
  568. self.socket.delegate = nil;
  569. [self.socket disconnect];
  570. self.state = kFIRMessagingConnectionNotConnected;
  571. }
  572. - (void)connectionTimedOut {
  573. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection022,
  574. @"Connection to FIRMessaging service timed out.");
  575. [self disconnect];
  576. [self.delegate connection:self didCloseForReason:kFIRMessagingConnectionCloseReasonTimeout];
  577. }
  578. - (void)scheduleConnectionTimeoutTask {
  579. // cancel the previous heartbeat timeout event and schedule a new one.
  580. [self cancelConnectionTimeoutTask];
  581. [self performSelector:@selector(connectionTimedOut)
  582. withObject:nil
  583. afterDelay:[self connectionTimeoutInterval]];
  584. }
  585. - (void)cancelConnectionTimeoutTask {
  586. // cancel pending timeout tasks.
  587. [NSObject cancelPreviousPerformRequestsWithTarget:self
  588. selector:@selector(connectionTimedOut)
  589. object:nil];
  590. }
  591. - (void)logMessage:(NSString *)description messageType:(int)messageType isOut:(BOOL)isOut {
  592. messageType = isOut ? -messageType : messageType;
  593. FIRMessagingLoggerDebug(kFIRMessagingMessageCodeConnection023,
  594. @"Send msg: %@ type: %d inStreamId: %d outStreamId: %d", description,
  595. messageType, self.inStreamId, self.outStreamId);
  596. }
  597. - (NSTimeInterval)connectionTimeoutInterval {
  598. return kConnectionTimeout;
  599. }
  600. @end