| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- /*
- * Copyright 2017 Google
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #import "FIRMessagingRmqManager.h"
- #import <sqlite3.h>
- #import "FIRMessagingDefines.h"
- #import "FIRMessagingLogger.h"
- #import "FIRMessagingRmq2PersistentStore.h"
- #import "FIRMessagingUtilities.h"
- #import "Protos/GtalkCore.pbobjc.h"
// Convenience macro for bailing out of a method: sends -logErrorAndFinalizeStatement:
// to `self` with the given statement, then returns `result` from the enclosing method.
// NOTE(review): neither the macro nor -logErrorAndFinalizeStatement: is referenced
// elsewhere in this file -- confirm whether it is still needed here.
#ifndef _FIRMessagingRmqLogAndExit
#define _FIRMessagingRmqLogAndExit(statement, result) \
  do {                                                 \
    [self logErrorAndFinalizeStatement:statement];     \
    return result;                                     \
  } while (0)
#endif

// Prefix used by this class when writing log lines.
static NSString *const kFCMRmqTag = @"FIRMessagingRmq:";
@interface FIRMessagingRmqManager ()

// Persistent store that every save/query/delete in this class is forwarded to.
@property(readwrite, strong, nonatomic) FIRMessagingRmq2PersistentStore *rmq2Store;

// Maps an outgoing message category to the number of messages in that category.
// Expected to hold two keys (the app and gcm).
// NOTE(review): nothing in this file writes to the dictionary -- confirm against callers.
@property(readwrite, strong, nonatomic) NSMutableDictionary *outstandingMessages;

// Outgoing RMQ persistent ID counter; -1 until -loadRmqId has run.
@property(readwrite, assign, nonatomic) int64_t rmqId;

@end
- @implementation FIRMessagingRmqManager
/**
 * Creates a manager backed by a persistent store opened with the given database name.
 *
 * The outgoing RMQ ID starts at -1, which marks it as "not loaded yet" for -loadRmqId.
 *
 * @param databaseName Name handed to the persistent store; asserted to be non-empty.
 * @return The initialized manager, or nil if the superclass initializer failed.
 */
- (instancetype)initWithDatabaseName:(NSString *)databaseName {
  self = [super init];
  if (!self) {
    return nil;
  }

  _FIRMessagingDevAssert([databaseName length] > 0, @"RMQ: Invalid rmq db name");
  _rmqId = -1;
  _outstandingMessages = [[NSMutableDictionary alloc] initWithCapacity:2];
  _rmq2Store = [[FIRMessagingRmq2PersistentStore alloc] initWithDatabaseName:databaseName];
  return self;
}
/**
 * Loads the outgoing RMQ ID once. Subsequent calls are no-ops because a loaded ID is
 * never negative. Logs the number of outstanding categories when there are any.
 */
- (void)loadRmqId {
  BOOL alreadyLoaded = (self.rmqId >= 0);
  if (alreadyLoaded) {
    return;
  }

  [self loadInitialOutgoingPersistentId];

  NSUInteger categoryCount = self.outstandingMessages.count;
  if (categoryCount > 0) {
    FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmqManager000,
                            @"%@: outstanding categories %ld", kFCMRmqTag,
                            _FIRMessaging_UL(categoryCount));
  }
}
/**
 * Seeds the outgoing RMQ ID counter.
 *
 * The counter is set to one past:
 *  - the highest ID of any message still in the queue, or
 *  - when that query yields 0, the value kept in the separate LastRmqId table.
 *
 * The queue is consulted first because the LastRmqId table is only written now and then
 * (after the server reports its last RMQ ID on reconnect, and after an RMQ ack), so the
 * highest queued message is the more reliable source.
 */
- (void)loadInitialOutgoingPersistentId {
  int64_t highestQueuedId = [self queryHighestRmqId];
  int64_t baseId = (highestQueuedId != 0) ? highestQueuedId : [self querylastRmqId];
  self.rmqId = baseId + 1;
}
- #pragma mark - Save
/**
 * Persists an outgoing message in RMQ2, assigning it a persistent ID first if it has none.
 *
 * The RMQ2 ID is carried on the message as a string (its wire format) but is handed to
 * the store as a 64-bit integer. When the message has no ID yet, the next outgoing ID is
 * generated and written back onto the message before saving.
 *
 * @param message The proto message to persist; its RMQ2 ID may be populated by this call.
 * @param error   Forwarded to the save call.
 * @return The result of -saveMessage:withRmqId:tag:error:.
 */
- (BOOL)saveRmqMessage:(GPBMessage *)message
                 error:(NSError **)error {
  NSString *rmq2Id = FIRMessagingGetRmq2Id(message);
  if (![rmq2Id length]) {
    int64_t rmqId = [self nextRmqId];
    rmq2Id = [NSString stringWithFormat:@"%lld", rmqId];
    FIRMessagingSetRmq2Id(message, rmq2Id);
  }
  FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(message);
  // Parse with -longLongValue rather than -integerValue: NSInteger is only 32 bits wide
  // on 32-bit targets, which would truncate IDs that need the full int64 range.
  return [self saveMessage:message withRmqId:[rmq2Id longLongValue] tag:tag error:error];
}
/**
 * Serializes the message and writes it to the persistent store under the given ID and tag.
 *
 * @param message The proto message whose serialized data is stored.
 * @param rmqId   The RMQ ID to store the message under.
 * @param tag     The proto tag stored alongside the data.
 * @param error   Forwarded to the store.
 * @return The result reported by the store.
 */
- (BOOL)saveMessage:(GPBMessage *)message
          withRmqId:(int64_t)rmqId
                tag:(int8_t)tag
              error:(NSError **)error {
  return [self.rmq2Store saveMessageWithRmqId:rmqId tag:tag data:[message data] error:error];
}
/**
 * Asks the persistent store to update its last outgoing RMQ ID.
 *
 * Used when the largest outgoing message is deleted from the queue.
 *
 * @param rmqID The ID to record.
 */
- (void)saveLastOutgoingRmqId:(int64_t)rmqID {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  [store updateLastOutgoingRmqId:rmqID];
}
/**
 * Forwards to the store's -saveUnackedS2dMessageWithRmqId:.
 *
 * @param rmqID The ID to save.
 * @return The result reported by the store.
 */
- (BOOL)saveS2dMessageWithRmqId:(NSString *)rmqID {
  BOOL didSave = [self.rmq2Store saveUnackedS2dMessageWithRmqId:rmqID];
  return didSave;
}
- #pragma mark - Query
/**
 * @return The value of the store's -queryHighestRmqId.
 */
- (int64_t)queryHighestRmqId {
  int64_t highestId = [self.rmq2Store queryHighestRmqId];
  return highestId;
}
/**
 * @return The value of the store's -queryLastRmqId.
 */
- (int64_t)querylastRmqId {
  int64_t lastId = [self.rmq2Store queryLastRmqId];
  return lastId;
}
/**
 * @return The array returned by the store's -unackedS2dRmqIds.
 */
- (NSArray *)unackedS2dRmqIds {
  NSArray *unackedIds = [self.rmq2Store unackedS2dRmqIds];
  return unackedIds;
}
- #pragma mark - FIRMessagingRMQScanner protocol
/**
 * Walks the outgoing RMQ messages in the store and reports each one to the handlers.
 *
 * There is deliberately no 'getMessages' method, since that would mean holding the full
 * body of every message in memory. Messages are visited one at a time instead so the
 * caller can 'resend' each of them.
 *
 * Called:
 *  - when connecting to MCS, to resend any outstanding messages
 *  - on init
 *
 * @param rmqMessageHandler  Invoked, when non-nil, for every scanned message with its ID,
 *                           tag and raw data.
 * @param dataMessageHandler Invoked, when non-nil, only for messages tagged as data
 *                           message stanzas, with the parsed stanza. Parse errors are not
 *                           collected (error:NULL), so the stanza is passed exactly as
 *                           the parser returned it.
 */
- (void)scanWithRmqMessageHandler:(FIRMessagingRmqMessageHandler)rmqMessageHandler
               dataMessageHandler:(FIRMessagingDataMessageHandler)dataMessageHandler {
  // With no callbacks there is nobody to report to, so skip the database scan.
  if (!rmqMessageHandler && !dataMessageHandler) {
    return;
  }

  [self.rmq2Store scanOutgoingRmqMessagesWithHandler:^(int64_t rmqId, int8_t tag, NSData *data) {
    if (rmqMessageHandler) {
      rmqMessageHandler(rmqId, tag, data);
    }

    BOOL isDataMessage = (tag == kFIRMessagingProtoTagDataMessageStanza);
    if (!dataMessageHandler || !isDataMessage) {
      return;
    }

    GtalkDataMessageStanza *stanza = (GtalkDataMessageStanza *)
        [FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag) parseFromData:data error:NULL];
    dataMessageHandler(rmqId, stanza);
  }];
}
- #pragma mark - Remove
/**
 * Hook invoked for each RMQ ID that is being removed. Currently does nothing.
 *
 * @param rmqId The ID being removed.
 */
- (void)ackReceivedForRmqId:(NSString *)rmqId {
  // TODO: Optional book-keeping
}
/**
 * Removes the outgoing message with the given RMQ ID.
 *
 * @param rmqId The ID of the message to remove. A nil ID removes nothing.
 * @return The result of -removeRmqMessagesWithRmqIds: for a one-element array, or 0 when
 *         rmqId is nil (the same value the plural method returns for empty input).
 */
- (int)removeRmqMessagesWithRmqId:(NSString *)rmqId {
  // An array literal throws NSInvalidArgumentException on a nil element, so reject nil
  // up front instead of crashing.
  if (rmqId == nil) {
    return 0;
  }
  return [self removeRmqMessagesWithRmqIds:@[ rmqId ]];
}
/**
 * Removes the outgoing messages with the given RMQ IDs from the store.
 *
 * Each ID is first passed to -ackReceivedForRmqId:. If one past the largest ID in the
 * list has reached or passed the in-memory outgoing RMQ ID, that value is saved as the
 * last outgoing RMQ ID before the rows are deleted.
 *
 * @param rmqIds RMQ IDs as strings.
 * @return 0 for nil or empty input, otherwise the value returned by the store's delete.
 */
- (int)removeRmqMessagesWithRmqIds:(NSArray *)rmqIds {
  if (rmqIds.count == 0) {
    return 0;
  }

  for (NSString *ackedId in rmqIds) {
    [self ackReceivedForRmqId:ackedId];
  }

  // Find the largest numeric ID in the batch.
  int64_t largestId = -1;
  for (NSString *candidate in rmqIds) {
    int64_t candidateValue = [candidate longLongValue];
    if (largestId < candidateValue) {
      largestId = candidateValue;
    }
  }

  int64_t onePastLargest = largestId + 1;
  if (self.rmqId <= onePastLargest) {
    [self saveLastOutgoingRmqId:onePastLargest];
  }

  return [self.rmq2Store deleteMessagesFromTable:kTableOutgoingRmqMessages withRmqIds:rmqIds];
}
/**
 * Deletes the given IDs from the store's S2D RMQ ID table. The store's return value is
 * ignored.
 *
 * @param s2dIds The IDs to delete.
 */
- (void)removeS2dIds:(NSArray *)s2dIds {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  [store deleteMessagesFromTable:kTableS2DRmqIds withRmqIds:s2dIds];
}
- #pragma mark - Sync Messages
// TODO: RMQManager should also keep a cache of the sync messages so that
// lookups do not go to the DB every time.

/**
 * Looks up a sync message in the persistent store.
 *
 * @param rmqID The ID to look up.
 * @return Whatever the store's -querySyncMessageWithRmqID: returns.
 */
- (FIRMessagingPersistentSyncMessage *)querySyncMessageWithRmqID:(NSString *)rmqID {
  FIRMessagingPersistentSyncMessage *syncMessage =
      [self.rmq2Store querySyncMessageWithRmqID:rmqID];
  return syncMessage;
}
/**
 * Forwards to the store's -deleteSyncMessageWithRmqID:.
 *
 * @param rmqID The ID of the sync message to delete.
 * @return The result reported by the store.
 */
- (BOOL)deleteSyncMessageWithRmqID:(NSString *)rmqID {
  BOOL didDelete = [self.rmq2Store deleteSyncMessageWithRmqID:rmqID];
  return didDelete;
}
/**
 * Forwards to the store's -deleteExpiredOrFinishedSyncMessages:.
 *
 * @param error Forwarded to the store.
 * @return The value returned by the store.
 */
- (int)deleteExpiredOrFinishedSyncMessages:(NSError **)error {
  int storeResult = [self.rmq2Store deleteExpiredOrFinishedSyncMessages:error];
  return storeResult;
}
/**
 * Forwards to the store's method of the same name, passing every argument unchanged.
 *
 * @param rmqID          The ID of the sync message.
 * @param expirationTime Expiration time handed to the store as-is.
 * @param apnsReceived   APNS-received flag handed to the store as-is.
 * @param mcsReceived    MCS-received flag handed to the store as-is.
 * @param error          Forwarded to the store.
 * @return The result reported by the store.
 */
- (BOOL)saveSyncMessageWithRmqID:(NSString *)rmqID
                  expirationTime:(int64_t)expirationTime
                    apnsReceived:(BOOL)apnsReceived
                     mcsReceived:(BOOL)mcsReceived
                           error:(NSError *__autoreleasing *)error {
  FIRMessagingRmq2PersistentStore *store = self.rmq2Store;
  BOOL didSave = [store saveSyncMessageWithRmqID:rmqID
                                  expirationTime:expirationTime
                                    apnsReceived:apnsReceived
                                     mcsReceived:mcsReceived
                                           error:error];
  return didSave;
}
/**
 * Forwards to the store's -updateSyncMessageViaAPNSWithRmqID:error:.
 *
 * @param rmqID The ID of the sync message to update.
 * @param error Forwarded to the store.
 * @return The result reported by the store.
 */
- (BOOL)updateSyncMessageViaAPNSWithRmqID:(NSString *)rmqID error:(NSError **)error {
  BOOL didUpdate = [self.rmq2Store updateSyncMessageViaAPNSWithRmqID:rmqID error:error];
  return didUpdate;
}
/**
 * Forwards to the store's -updateSyncMessageViaMCSWithRmqID:error:.
 *
 * @param rmqID The ID of the sync message to update.
 * @param error Forwarded to the store.
 * @return The result reported by the store.
 */
- (BOOL)updateSyncMessageViaMCSWithRmqID:(NSString *)rmqID error:(NSError **)error {
  BOOL didUpdate = [self.rmq2Store updateSyncMessageViaMCSWithRmqID:rmqID error:error];
  return didUpdate;
}
- #pragma mark - Testing
/**
 * Testing helper: forwards to +[FIRMessagingRmq2PersistentStore removeDatabase:].
 *
 * @param dbName Name of the database to remove.
 */
+ (void)removeDatabaseWithName:(NSString *)dbName {
  Class storeClass = [FIRMessagingRmq2PersistentStore class];
  [storeClass removeDatabase:dbName];
}
- #pragma mark - Private
/**
 * Advances the outgoing RMQ ID counter by one.
 *
 * @return The counter's value after the increment.
 */
- (int64_t)nextRmqId {
  int64_t advancedId = self.rmqId + 1;
  self.rmqId = advancedId;
  return advancedId;
}
- @end
|