FRepo.m 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <dlfcn.h>
  17. #import "FRepo.h"
  18. #import "FSnapshotUtilities.h"
  19. #import "FConstants.h"
  20. #import "FIRDatabaseQuery_Private.h"
  21. #import "FQuerySpec.h"
  22. #import "FTupleNodePath.h"
  23. #import "FRepo_Private.h"
  24. #import "FRepoManager.h"
  25. #import "FServerValues.h"
  26. #import "FTupleSetIdPath.h"
  27. #import "FSyncTree.h"
  28. #import "FEventRegistration.h"
  29. #import "FAtomicNumber.h"
  30. #import "FSyncTree.h"
  31. #import "FListenProvider.h"
  32. #import "FEventRaiser.h"
  33. #import "FSnapshotHolder.h"
  34. #import "FIRDatabaseConfig_Private.h"
  35. #import "FLevelDBStorageEngine.h"
  36. #import "FPersistenceManager.h"
  37. #import "FWriteRecord.h"
  38. #import "FCachePolicy.h"
  39. #import "FClock.h"
  40. #import "FIRDatabase_Private.h"
  41. #import "FTree.h"
  42. #import "FTupleTransaction.h"
  43. #import "FIRTransactionResult.h"
  44. #import "FIRTransactionResult_Private.h"
  45. #import "FIRMutableData.h"
  46. #import "FIRMutableData_Private.h"
  47. #import "FIRDataSnapshot.h"
  48. #import "FIRDataSnapshot_Private.h"
  49. #import "FValueEventRegistration.h"
  50. #import "FEmptyNode.h"
  51. #ifdef TARGET_OS_IPHONE
  52. #import <UIKit/UIKit.h>
  53. #endif
  /**
   * Class extension holding state that is private to this implementation file.
   */
  @interface FRepo ()

  /**
   * Clock used by -serverTime and for generating server values.
   *
   * Declared atomic: the initializer notes that access can occur outside of the shared queue, and
   * -updateInfo:withValue: replaces the clock while relying on the property being atomic. With a nonatomic strong
   * property a concurrent read and replace is a data race on the backing ivar.
   */
  @property (atomic, strong) FOffsetClock *serverClock;

  /** Persistence layer; set in -deferredInit and nil when config.persistenceEnabled is off. */
  @property (nonatomic, strong) FPersistenceManager *persistenceManager;

  /** The database handed to -initWithRepoInfo:config:database:. */
  @property (nonatomic, strong) FIRDatabase *database;

  @property (nonatomic, strong, readwrite) FAuthenticationManager *auth;

  /** Sync tree serving listeners on the kDotInfoPrefix paths. */
  @property (nonatomic, strong) FSyncTree *infoSyncTree;

  /** The id that the next call to -nextWriteId will return. */
  @property (nonatomic) NSInteger writeIdCounter;

  /** Reset to NO by -initTransactions. */
  @property (nonatomic) BOOL hijackHash;

  /** Tree of queued transactions; created by -initTransactions. */
  @property (nonatomic, strong) FTree *transactionQueueTree;

  /** YES once the "transactions are not persisted" notice has been logged. */
  @property (nonatomic) BOOL loggedTransactionPersistenceWarning;

  /**
   * Test only. For load testing the server.
   */
  @property (nonatomic, strong) id (^interceptServerDataCallback)(NSString *pathString, id data);

  @end
  69. @implementation FRepo
  /**
   * Creates a repo for the given repo info, configuration and owning database.
   *
   * Only the pieces that must exist immediately are created here (server clock, connection, event raiser);
   * the rest of the setup is scheduled on the shared queue via -deferredInit.
   */
  - (id)initWithRepoInfo:(FRepoInfo*)info config:(FIRDatabaseConfig *)config database:(FIRDatabase *)database {
      self = [super init];
      if (self == nil) {
          return nil;
      }

      self.repoInfo = info;
      self.config = config;
      self.database = database;

      // Access can occur outside of shared queue, so the clock needs to be initialized here
      FOffsetClock *zeroOffsetClock = [[FOffsetClock alloc] initWithClock:[FSystemClock clock] offset:0];
      self.serverClock = zeroOffsetClock;

      FPersistentConnection *persistentConnection =
          [[FPersistentConnection alloc] initWithRepoInfo:self.repoInfo
                                            dispatchQueue:[FIRDatabaseQuery sharedQueue]
                                                   config:self.config];
      self.connection = persistentConnection;

      // Needs to be called before authentication manager is instantiated
      FEventRaiser *raiser = [[FEventRaiser alloc] initWithQueue:self.config.callbackQueue];
      self.eventRaiser = raiser;

      // The remaining initialization runs on the repo's queue.
      dispatch_async([FIRDatabaseQuery sharedQueue], ^{
          [self deferredInit];
      });

      return self;
  }
  /**
   * Second half of initialization; scheduled on the shared queue by the initializer.
   *
   * In order: subscribes to auth token changes, opens the connection, resets the test counters, builds the
   * persistence manager (if enabled), initializes transaction state, creates the onDisconnect tree and the
   * .info / server sync trees, replays persisted writes, seeds the connected flag with @NO and registers for the
   * background notification.
   */
  - (void)deferredInit {
      // TODO: cleanup on dealloc
      __weak FRepo *weakRepo = self;
      [self.config.authTokenProvider listenForTokenChanges:^(NSString *token) {
          [weakRepo.connection refreshAuthToken:token];
      }];

      // Open connection now so that by the time we are connected the deferred init has run
      // This relies on the fact that all callbacks run on repos queue
      self.connection.delegate = self;
      [self.connection open];

      self.dataUpdateCount = 0;
      self.rangeMergeUpdateCount = 0;
      self.interceptServerDataCallback = nil;

      FPersistenceManager *manager = nil;
      if (self.config.persistenceEnabled) {
          NSString *repoHashString = [NSString stringWithFormat:@"%@_%@", self.repoInfo.host, self.repoInfo.namespace];
          NSString *persistencePrefix = [NSString stringWithFormat:@"%@/%@", self.config.sessionIdentifier, repoHashString];
          id<FCachePolicy> cachePolicy = [[FLRUCachePolicy alloc] initWithMaxSize:self.config.persistenceCacheSizeBytes];

          id<FStorageEngine> engine;
          if (self.config.forceStorageEngine == nil) {
              FLevelDBStorageEngine *levelDBEngine = [[FLevelDBStorageEngine alloc] initWithPath:persistencePrefix];
              // We need the repo info to run the legacy migration. Future migrations will be managed by the database itself
              // Remove this once we are confident that no-one is using legacy migration anymore...
              [levelDBEngine runLegacyMigration:self.repoInfo];
              engine = levelDBEngine;
          } else {
              // A storage engine forced through the config takes precedence over LevelDB.
              engine = self.config.forceStorageEngine;
          }
          manager = [[FPersistenceManager alloc] initWithStorageEngine:engine cachePolicy:cachePolicy];
      }
      self.persistenceManager = manager;

      [self initTransactions];

      // A list of data pieces and paths to be set when this client disconnects
      self.onDisconnect = [[FSparseSnapshotTree alloc] init];
      self.infoData = [[FSnapshotHolder alloc] init];

      // Listen provider for the .info tree: answered from the local infoData holder.
      FListenProvider *infoProvider = [[FListenProvider alloc] init];
      infoProvider.startListening = ^(FQuerySpec *query,
                                      NSNumber *tagId,
                                      id<FSyncTreeHash> hash,
                                      fbt_nsarray_nsstring onComplete) {
          FRepo *repo = weakRepo;
          id<FNode> currentNode = [repo.infoData getNode:query.path];
          // This is possibly a hack, but we have different semantics for .info endpoints. We don't raise null events
          // on initial data...
          if ([currentNode isEmpty]) {
              NSArray *noEvents = @[];
              return noEvents;
          }
          NSArray *infoEvents = [repo.infoSyncTree applyServerOverwriteAtPath:query.path newData:currentNode];
          [repo.eventRaiser raiseCallback:^{
              onComplete(kFWPResponseForActionStatusOk);
          }];
          return infoEvents;
      };
      infoProvider.stopListening = ^(FQuerySpec *query, NSNumber *tagId) {};
      self.infoSyncTree = [[FSyncTree alloc] initWithListenProvider:infoProvider];

      // Listen provider for server data: forwards listen / unlisten to the connection.
      FListenProvider *serverProvider = [[FListenProvider alloc] init];
      serverProvider.startListening = ^(FQuerySpec *query,
                                        NSNumber *tagId,
                                        id<FSyncTreeHash> hash,
                                        fbt_nsarray_nsstring onComplete) {
          [weakRepo.connection listen:query tagId:tagId hash:hash onComplete:^(NSString *status) {
              NSArray *completionEvents = onComplete(status);
              [weakRepo.eventRaiser raiseEvents:completionEvents];
          }];
          // No synchronous events for network-backed sync trees
          return @[];
      };
      serverProvider.stopListening = ^(FQuerySpec *query, NSNumber *tag) {
          [weakRepo.connection unlisten:query tagId:tag];
      };
      self.serverSyncTree = [[FSyncTree alloc] initWithPersistenceManager:self.persistenceManager
                                                           listenProvider:serverProvider];

      [self restoreWrites];
      [self updateInfo:kDotInfoConnected withValue:@NO];
      [self setupNotifications];
  }
  /**
   * Replays the user writes held by the persistence manager: each one is re-sent over the connection and
   * re-applied to the server sync tree with freshly resolved server values. writeIdCounter ends up one past the
   * last restored write id.
   *
   * Raises NSInternalInconsistencyException if the write ids are not strictly increasing.
   */
  - (void) restoreWrites {
      NSArray *persistedWrites = self.persistenceManager.userWrites;
      NSDictionary *serverValues = [FServerValues generateServerValues:self.serverClock];

      NSInteger previousWriteId = NSIntegerMin;
      for (FWriteRecord *record in persistedWrites) {
          NSInteger writeId = record.writeId;

          if (writeId <= previousWriteId) {
              [NSException raise:NSInternalInconsistencyException format:@"Restored writes were not in order!"];
          }
          previousWriteId = writeId;
          self.writeIdCounter = writeId + 1;

          // Shared completion handler for both overwrites and merges.
          fbt_void_nsstring_nsstring onServerAck = ^(NSString *status, NSString *errorReason) {
              [self warnIfWriteFailedAtPath:record.path status:status message:@"Persisted write"];
              [self ackWrite:writeId rerunTransactionsAtPath:record.path status:status];
          };

          if (![record isOverwrite]) {
              FFLog(@"I-RDB038002", @"Restoring merge with id %ld", (long)record.writeId);
              [self.connection mergeData:[record.merge valForExport:YES]
                                 forPath:[record.path toString]
                            withCallback:onServerAck];
              FCompoundWrite *resolvedMerge = [FServerValues resolveDeferredValueCompoundWrite:record.merge
                                                                              withServerValues:serverValues];
              [self.serverSyncTree applyUserMergeAtPath:record.path changedChildren:resolvedMerge writeId:writeId];
              continue;
          }

          FFLog(@"I-RDB038001", @"Restoring overwrite with id %ld", (long)record.writeId);
          [self.connection putData:[record.overwrite valForExport:YES]
                           forPath:[record.path toString]
                          withHash:nil
                      withCallback:onServerAck];
          id<FNode> resolvedNode = [FServerValues resolveDeferredValueSnapshot:record.overwrite
                                                              withServerValues:serverValues];
          [self.serverSyncTree applyUserOverwriteAtPath:record.path newData:resolvedNode writeId:writeId isVisible:YES];
      }
  }
  /**
   * @return The namespace of the repo info this repo was created with.
   */
  - (NSString*)name {
      FRepoInfo *info = self.repoInfo;
      return info.namespace;
  }
  /**
   * @return The description of the underlying repo info.
   */
  - (NSString *) description {
      NSString *repoInfoDescription = [self.repoInfo description];
      return repoInfoDescription;
  }
  /**
   * Interrupts the connection, tagging the interruption with kFInterruptReasonRepoInterrupt.
   */
  - (void) interrupt {
      FPersistentConnection *conn = self.connection;
      [conn interruptForReason:kFInterruptReasonRepoInterrupt];
  }
  /**
   * Resumes the connection for the kFInterruptReasonRepoInterrupt reason used by -interrupt.
   */
  - (void) resume {
      FPersistentConnection *conn = self.connection;
      [conn resumeForReason:kFInterruptReasonRepoInterrupt];
  }
  /**
   * Interrupts the connection and closes the persistence manager.
   *
   * NOTE: Typically if you're calling this, you should be in an @autoreleasepool block to make sure that ARC kicks
   * in and cleans up things no longer referenced (i.e. pendingPutsDB).
   */
  - (void) dispose {
      FPersistentConnection *conn = self.connection;
      [conn interruptForReason:kFInterruptReasonRepoInterrupt];

      // We need to nil out any references to LevelDB, to make sure the
      // LevelDB exclusive locks are released.
      FPersistenceManager *manager = self.persistenceManager;
      [manager close];
  }
  /**
   * Hands out the current write id and advances the counter by one.
   *
   * @return The value of the counter before it was incremented.
   */
  - (NSInteger) nextWriteId {
      NSInteger issuedId = self->_writeIdCounter;
      self->_writeIdCounter = issuedId + 1;
      return issuedId;
  }
  /**
   * @return The current time as reported by the server clock.
   */
  - (NSTimeInterval) serverTime {
      FOffsetClock *clock = self.serverClock;
      return [clock currentTime];
  }
  /**
   * Overwrites the data at `path` with `node`.
   *
   * The unresolved node is persisted, the node with server values resolved is applied to the local sync tree, and
   * the exported value is sent to the server. Transactions at the path are aborted with kFTransactionSet and the
   * affected path is rerun.
   *
   * @param onComplete Optional; invoked through -callOnComplete:... once the server has answered.
   */
  - (void) set:(FPath *)path withNode:(id<FNode>)node withCallback:(fbt_void_nserror_ref)onComplete {
      id exportedValue = [node valForExport:YES];
      FFLog(@"I-RDB038003", @"Setting: %@ with %@ pri: %@", [path toString], [exportedValue description], [[node getPriority] val]);

      // TODO: Optimize this behavior to either (a) store flag to skip resolving where possible and / or
      // (b) store unresolved paths on JSON parse
      NSDictionary *serverValues = [FServerValues generateServerValues:self.serverClock];
      id<FNode> resolvedNode = [FServerValues resolveDeferredValueSnapshot:node withServerValues:serverValues];

      NSInteger writeId = [self nextWriteId];
      // Persist the unresolved node; the resolved one is only applied locally.
      [self.persistenceManager saveUserOverwrite:node atPath:path writeId:writeId];
      NSArray *localEvents = [self.serverSyncTree applyUserOverwriteAtPath:path
                                                                   newData:resolvedNode
                                                                   writeId:writeId
                                                                 isVisible:YES];
      [self.eventRaiser raiseEvents:localEvents];

      fbt_void_nsstring_nsstring onServerAck = ^(NSString *status, NSString *errorReason) {
          [self warnIfWriteFailedAtPath:path status:status message:@"setValue: or removeValue:"];
          [self ackWrite:writeId rerunTransactionsAtPath:path status:status];
          [self callOnComplete:onComplete withStatus:status errorReason:errorReason andPath:path];
      };
      [self.connection putData:exportedValue forPath:[path toString] withHash:nil withCallback:onServerAck];

      FPath *affectedPath = [self abortTransactionsAtPath:path error:kFTransactionSet];
      [self rerunTransactionsForPath:affectedPath];
  }
  /**
   * Merges `nodes` into the data at `path`.
   *
   * If the resolved compound write is empty nothing is written and the callback is completed with the OK status.
   * Otherwise the unresolved write is persisted, the resolved write is applied to the local sync tree, the
   * exported values are sent to the server, and transactions at every written child path are aborted with
   * kFTransactionSet and rerun.
   *
   * @param callback Optional; invoked through -callOnComplete:... with the outcome.
   */
  - (void) update:(FPath *)path withNodes:(FCompoundWrite *)nodes withCallback:(fbt_void_nserror_ref)callback {
      NSDictionary *values = [nodes valForExport:YES];
      FFLog(@"I-RDB038004", @"Updating: %@ with %@", [path toString], [values description]);

      NSDictionary *serverValues = [FServerValues generateServerValues:self.serverClock];
      FCompoundWrite *resolved = [FServerValues resolveDeferredValueCompoundWrite:nodes withServerValues:serverValues];

      if (resolved.isEmpty) {
          FFLog(@"I-RDB038006", @"update called with empty data. Doing nothing");
          // Do nothing, just call the callback. The status constant is the one -callOnComplete:... compares
          // against, so the callback is guaranteed to be reported as a success.
          [self callOnComplete:callback withStatus:kFWPResponseForActionStatusOk errorReason:nil andPath:path];
          return;
      }

      NSInteger writeId = [self nextWriteId];
      // Persist the unresolved write; the resolved one is only applied locally.
      [self.persistenceManager saveUserMerge:nodes atPath:path writeId:writeId];
      NSArray *events = [self.serverSyncTree applyUserMergeAtPath:path changedChildren:resolved writeId:writeId];
      [self.eventRaiser raiseEvents:events];

      // Use -toString for the wire path, like -set:withNode:withCallback: and -restoreWrites do, rather than
      // relying on what -description happens to return.
      [self.connection mergeData:values forPath:[path toString] withCallback:^(NSString *status, NSString *errorReason) {
          [self warnIfWriteFailedAtPath:path status:status message:@"updateChildValues:"];
          [self ackWrite:writeId rerunTransactionsAtPath:path status:status];
          [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path];
      }];

      [nodes enumerateWrites:^(FPath *childPath, id<FNode> node, BOOL *stop) {
          FPath *pathFromRoot = [path child:childPath];
          FFLog(@"I-RDB038005", @"Cancelling transactions at path: %@", pathFromRoot);
          FPath *affectedPath = [self abortTransactionsAtPath:pathFromRoot error:kFTransactionSet];
          [self rerunTransactionsForPath:affectedPath];
      }];
  }
  /**
   * Asks the server to cancel the onDisconnect operations at `path`. On success the path is also forgotten in the
   * local onDisconnect tree; on failure the status is logged. The callback is completed in both cases.
   */
  - (void) onDisconnectCancel:(FPath *)path withCallback:(fbt_void_nserror_ref)callback {
      fbt_void_nsstring_nsstring onServerAck = ^(NSString *status, NSString *errorReason) {
          if (![status isEqualToString:kFWPResponseForActionStatusOk]) {
              FFLog(@"I-RDB038007", @"cancelDisconnectOperations: at %@ failed: %@", path, status);
          } else {
              [self.onDisconnect forgetPath:path];
          }
          [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path];
      };
      [self.connection onDisconnectCancelPath:path withCallback:onServerAck];
  }
  /**
   * Registers `node` with the server as the value to write at `path` when this client disconnects. On success the
   * node is remembered in the local onDisconnect tree; on failure a warning is logged. The callback is completed
   * in both cases.
   */
  - (void) onDisconnectSet:(FPath *)path withNode:(id<FNode>)node withCallback:(fbt_void_nserror_ref)callback {
      id exportedValue = [node valForExport:YES];
      fbt_void_nsstring_nsstring onServerAck = ^(NSString *status, NSString *errorReason) {
          if (![status isEqualToString:kFWPResponseForActionStatusOk]) {
              FFWarn(@"I-RDB038008", @"onDisconnectSetValue: or onDisconnectRemoveValue: at %@ failed: %@", path, status);
          } else {
              [self.onDisconnect rememberData:node onPath:path];
          }
          [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path];
      };
      [self.connection onDisconnectPutData:exportedValue forPath:path withCallback:onServerAck];
  }
  /**
   * Registers `nodes` with the server as a merge to apply at `path` when this client disconnects.
   *
   * An empty compound write is not sent; the callback is completed with the OK status right away. Otherwise, on
   * success every written child is remembered in the local onDisconnect tree, and on failure a warning is logged.
   * The callback is completed in both cases.
   */
  - (void) onDisconnectUpdate:(FPath *)path withNodes:(FCompoundWrite *)nodes withCallback:(fbt_void_nserror_ref)callback {
      if (nodes.isEmpty) {
          // Do nothing, just call the callback. The status constant is the one -callOnComplete:... compares
          // against, so the callback is guaranteed to be reported as a success.
          [self callOnComplete:callback withStatus:kFWPResponseForActionStatusOk errorReason:nil andPath:path];
          return;
      }

      NSDictionary *values = [nodes valForExport:YES];
      [self.connection onDisconnectMergeData:values forPath:path withCallback:^(NSString *status, NSString *errorReason) {
          BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk];
          if (success) {
              // Remember the unresolved nodes; they are resolved when the disconnect events are run.
              [nodes enumerateWrites:^(FPath *relativePath, id<FNode> nodeUnresolved, BOOL *stop) {
                  FPath *childPath = [path child:relativePath];
                  [self.onDisconnect rememberData:nodeUnresolved onPath:childPath];
              }];
          } else {
              FFWarn(@"I-RDB038009", @"onDisconnectUpdateChildValues: at %@ failed %@", path, status);
          }
          [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path];
      }];
  }
  /**
   * Drops every outstanding write: removes them from the server sync tree (raising the resulting events), aborts
   * all transactions with kFErrorWriteCanceled and purges the writes queued on the connection.
   */
  - (void) purgeOutstandingWrites {
      FFLog(@"I-RDB038010", @"Purging outstanding writes");

      NSArray *revertEvents = [self.serverSyncTree removeAllWrites];
      [self.eventRaiser raiseEvents:revertEvents];

      // Abort any transactions
      FPath *rootPath = [FPath empty];
      [self abortTransactionsAtPath:rootPath error:kFErrorWriteCanceled];

      // Remove outstanding writes from connection
      [self.connection purgeOutstandingWrites];
  }
  /**
   * Adds an event registration for `query` and raises the events it produces. Queries whose path starts with
   * kDotInfoPrefix go to the info sync tree; everything else goes to the server sync tree.
   */
  - (void) addEventRegistration:(id <FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query {
      BOOL isInfoQuery = [[query.path getFront] isEqualToString:kDotInfoPrefix];
      FSyncTree *targetTree = isInfoQuery ? self.infoSyncTree : self.serverSyncTree;
      NSArray *initialEvents = [targetTree addEventRegistration:eventRegistration forQuery:query];
      [self.eventRaiser raiseEvents:initialEvents];
  }
  /**
   * Removes an event registration for `query` from the sync tree that owns it (the info tree for paths starting
   * with kDotInfoPrefix, the server tree otherwise) and raises whatever events the removal returns.
   */
  - (void) removeEventRegistration:(id<FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query {
      // These are guaranteed not to raise events, since we're not passing in a cancelError. However we can future-proof
      // a little bit by handling the return values anyways.
      // (Log message spelling fixed: "hande" -> "handle".)
      FFLog(@"I-RDB038011", @"Removing event registration with handle: %lu", (unsigned long)eventRegistration.handle);
      NSArray *events = nil;
      if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) {
          events = [self.infoSyncTree removeEventRegistration:eventRegistration forQuery:query cancelError:nil];
      } else {
          events = [self.serverSyncTree removeEventRegistration:eventRegistration forQuery:query cancelError:nil];
      }
      [self.eventRaiser raiseEvents:events];
  }
  /**
   * Forwards the keep-synced setting for `query` to the server sync tree. Asserts that the query is not on the
   * .info tree.
   */
  - (void) keepQuery:(FQuerySpec *)query synced:(BOOL)synced {
      NSAssert(![[query.path getFront] isEqualToString:kDotInfoPrefix], @"Can't keep .info tree synced!");
      FSyncTree *serverTree = self.serverSyncTree;
      [serverTree keepQuery:query synced:synced];
  }
  /**
   * Stores `value` under the .info path named by `pathString`, applies it to the info sync tree and raises the
   * resulting events. An update of kDotInfoServerTimeOffset additionally replaces the server clock.
   */
  - (void) updateInfo:(NSString *) pathString withValue:(id)value {
      // hack to make serverTimeOffset available in a threadsafe way. Property is marked as atomic
      BOOL isTimeOffset = [pathString isEqualToString:kDotInfoServerTimeOffset];
      if (isTimeOffset) {
          // The value is divided by 1000.0 before being used as the clock offset.
          NSNumber *rawOffset = (NSNumber *)value;
          NSTimeInterval offset = [rawOffset doubleValue]/1000.0;
          self.serverClock = [[FOffsetClock alloc] initWithClock:[FSystemClock clock] offset:offset];
      }

      NSString *infoPathString = [NSString stringWithFormat:@"%@/%@", kDotInfoPrefix, pathString];
      FPath *infoPath = [[FPath alloc] initWith:infoPathString];
      id<FNode> infoNode = [FSnapshotUtilities nodeFrom:value];

      [self.infoData updateSnapshot:infoPath withNewSnapshot:infoNode];
      NSArray *infoEvents = [self.infoSyncTree applyServerOverwriteAtPath:infoPath newData:infoNode];
      [self.eventRaiser raiseEvents:infoEvents];
  }
  /**
   * Delivers the outcome of an operation to `onComplete` through the event raiser.
   *
   * The callback receives a reference for `path` and either nil (status equals kFWPResponseForActionStatusOk) or
   * an error built from the status and reason. Does nothing when `onComplete` is nil.
   */
  - (void) callOnComplete:(fbt_void_nserror_ref)onComplete withStatus:(NSString *)status errorReason:(NSString *)errorReason andPath:(FPath *)path {
      if (!onComplete) {
          return;
      }

      FIRDatabaseReference *reference = [[FIRDatabaseReference alloc] initWithRepo:self path:path];
      NSError *error = [status isEqualToString:kFWPResponseForActionStatusOk]
          ? nil
          : [FUtilities errorForStatus:status andReason:errorReason];

      [self.eventRaiser raiseCallback:^{
          onComplete(error, reference);
      }];
  }
  /**
   * Acknowledges the user write `writeId` in the server sync tree, reverting it unless the status is OK. If that
   * produced events, transactions at `path` are rerun before the events are raised. A status of
   * kFErrorWriteCanceled is ignored entirely.
   */
  - (void)ackWrite:(NSInteger)writeId rerunTransactionsAtPath:(FPath *)path status:(NSString *)status {
      if ([status isEqualToString:kFErrorWriteCanceled]) {
          // This write was already removed, we just need to ignore it...
          return;
      }

      BOOL shouldRevert = ![status isEqualToString:kFWPResponseForActionStatusOk];
      NSArray *ackEvents = [self.serverSyncTree ackUserWriteWithWriteId:writeId
                                                                 revert:shouldRevert
                                                                persist:YES
                                                                  clock:self.serverClock];
      if (ackEvents.count > 0) {
          [self rerunTransactionsForPath:path];
      }
      [self.eventRaiser raiseEvents:ackEvents];
  }
  /**
   * Logs a warning for a write at `path` unless its status is OK or "write canceled".
   *
   * @param message Prefix identifying the operation in the warning.
   */
  - (void) warnIfWriteFailedAtPath:(FPath *)path status:(NSString *)status message:(NSString *)message {
      if ([status isEqualToString:kFWPResponseForActionStatusOk]) {
          return;
      }
      if ([status isEqualToString:kFErrorWriteCanceled]) {
          return;
      }
      FFWarn(@"I-RDB038012", @"%@ at %@ failed: %@", message, path, status);
  }
  386. #pragma mark -
  387. #pragma mark FPersistentConnectionDelegate methods
  /**
   * Connection delegate callback for data arriving from the server.
   *
   * The payload (optionally rewritten by the test-only intercept callback) is applied to the server sync tree as
   * a tagged or untagged merge or overwrite, depending on `tagId` and `isMerge`. If any events result,
   * transactions at the path are rerun before the events are raised.
   */
  - (void) onDataUpdate:(FPersistentConnection *)fpconnection forPath:(NSString *)pathString message:(id)data isMerge:(BOOL)isMerge tagId:(NSNumber *)tagId {
      FFLog(@"I-RDB038013", @"onDataUpdateForPath: %@ withMessage: %@", pathString, data);

      // For testing.
      self.dataUpdateCount++;

      FPath *path = [[FPath alloc] initWith:pathString];
      if (self.interceptServerDataCallback) {
          data = self.interceptServerDataCallback(pathString, data);
      }

      BOOL isTagged = (tagId != nil);
      NSArray *events = nil;
      if (isTagged && isMerge) {
          NSDictionary *mergePayload = data;
          FCompoundWrite *taggedChildren = [FCompoundWrite compoundWriteWithValueDictionary:mergePayload];
          events = [self.serverSyncTree applyTaggedQueryMergeAtPath:path changedChildren:taggedChildren tagId:tagId];
      } else if (isTagged) {
          id<FNode> taggedSnap = [FSnapshotUtilities nodeFrom:data];
          events = [self.serverSyncTree applyTaggedQueryOverwriteAtPath:path newData:taggedSnap tagId:tagId];
      } else if (isMerge) {
          NSDictionary *mergePayload = data;
          FCompoundWrite *changedChildren = [FCompoundWrite compoundWriteWithValueDictionary:mergePayload];
          events = [self.serverSyncTree applyServerMergeAtPath:path changedChildren:changedChildren];
      } else {
          id<FNode> snap = [FSnapshotUtilities nodeFrom:data];
          events = [self.serverSyncTree applyServerOverwriteAtPath:path newData:snap];
      }

      if (events.count > 0) {
          // Since we have a listener outstanding for each transaction, receiving any events
          // is a proxy for some change having occurred.
          [self rerunTransactionsForPath:path];
      }
      [self.eventRaiser raiseEvents:events];
  }
  /**
   * Connection delegate callback for a range merge from the server. The ranges are applied to the server sync
   * tree (tagged when `tag` is non-nil); if any events result, transactions at the path are rerun before the
   * events are raised.
   */
  - (void)onRangeMerge:(NSArray *)ranges forPath:(NSString *)pathString tagId:(NSNumber *)tag {
      FFLog(@"I-RDB038014", @"onRangeMerge: %@ => %@", pathString, ranges);

      // For testing
      self.rangeMergeUpdateCount++;

      FPath *mergePath = [[FPath alloc] initWith:pathString];
      NSArray *mergeEvents = (tag != nil)
          ? [self.serverSyncTree applyTaggedServerRangeMergeAtPath:mergePath updates:ranges tagId:tag]
          : [self.serverSyncTree applyServerRangeMergeAtPath:mergePath updates:ranges];

      if ([mergeEvents count] > 0) {
          // Since we have a listener outstanding for each transaction, receiving any events
          // is a proxy for some change having occurred.
          [self rerunTransactionsForPath:mergePath];
      }
      [self.eventRaiser raiseEvents:mergeEvents];
  }
  /**
   * Connection delegate callback: the connection was established. Publishes the connected flag as YES.
   */
  - (void)onConnect:(FPersistentConnection *)fpconnection {
      // @YES is the Objective-C boolean literal and matches the @NO used to seed this value in -deferredInit;
      // @true is documented by Clang as an Objective-C++ literal.
      [self updateInfo:kDotInfoConnected withValue:@YES];
  }
  /**
   * Connection delegate callback: the connection was lost. Publishes the connected flag as NO and then applies
   * the stored onDisconnect operations locally.
   */
  - (void)onDisconnect:(FPersistentConnection *)fpconnection {
      // @NO is the Objective-C boolean literal and matches the value used to seed this flag in -deferredInit;
      // @false is documented by Clang as an Objective-C++ literal.
      [self updateInfo:kDotInfoConnected withValue:@NO];
      [self runOnDisconnectEvents];
  }
  /**
   * Connection delegate callback carrying server info updates. Every key/value pair is forwarded to
   * -updateInfo:withValue:.
   */
  - (void)onServerInfoUpdate:(FPersistentConnection *)fpconnection updates:(NSDictionary *)updates {
      for (NSString *infoKey in updates) {
          [self updateInfo:infoKey withValue:updates[infoKey]];
      }
  }
  /**
   * Registers -didEnterBackground for UIApplicationDidEnterBackgroundNotification. The notification name is looked
   * up with dlsym at runtime; when the symbol is not found, registration is skipped.
   */
  - (void) setupNotifications {
      NSString * const *notificationNameRef = (NSString * const *) dlsym(RTLD_DEFAULT, "UIApplicationDidEnterBackgroundNotification");
      if (!notificationNameRef) {
          FFLog(@"I-RDB038016", @"Skipped registering for background notification.");
          return;
      }

      FFLog(@"I-RDB038015", @"Registering for background notification.");
      NSNotificationCenter *center = [NSNotificationCenter defaultCenter];
      [center addObserver:self
                 selector:@selector(didEnterBackground)
                     name:*notificationNameRef
                   object:nil];
  }
  /**
   * Handler for the background notification registered in -setupNotifications.
   *
   * When persistence is enabled, a background task is started and kept alive until a block queued behind all
   * pending work on the shared queue has run, giving outstanding sets time to be processed.
   *
   * The task is ended exactly once: both the expiration handler and the completion path go through the same
   * guarded block, which resets the identifier to UIBackgroundTaskInvalid after ending it. The completion path
   * hops to the main queue so the identifier is only touched there (assumes, per UIKit documentation, that the
   * expiration handler is invoked on the main thread).
   */
  - (void) didEnterBackground {
      if (!self.config.persistenceEnabled)
          return;

      // Targetted compilation is ONLY for testing. UIKit is weak-linked in actual release build.
  #if TARGET_OS_IPHONE
      // The idea is to wait until any outstanding sets get written to disk. Since the sets might still be in our
      // dispatch queue, we wait for the dispatch queue to catch up and for persistence to catch up.
      // This may be undesirable though. The dispatch queue might just be processing a bunch of incoming data or
      // something. We might want to keep track of whether there are any unpersisted sets or something.
      FFLog(@"I-RDB038017", @"Entering background. Starting background task to finish work.");
      Class uiApplicationClass = NSClassFromString(@"UIApplication");
      assert(uiApplicationClass); // If we are here, we should be on iOS and UIApplication should be available.

      UIApplication *application = [uiApplicationClass sharedApplication];
      __block UIBackgroundTaskIdentifier bgTask = UIBackgroundTaskInvalid;

      // Ends the task at most once, whichever of expiration or completion happens first.
      dispatch_block_t endTaskOnce = ^{
          if (bgTask != UIBackgroundTaskInvalid) {
              [application endBackgroundTask:bgTask];
              bgTask = UIBackgroundTaskInvalid;
          }
      };
      bgTask = [application beginBackgroundTaskWithExpirationHandler:endTaskOnce];

      NSDate *start = [NSDate date];
      dispatch_async([FIRDatabaseQuery sharedQueue], ^{
          NSTimeInterval finishTime = [start timeIntervalSinceNow]*-1;
          FFLog(@"I-RDB038018", @"Background task completed. Queue time: %f", finishTime);
          dispatch_async(dispatch_get_main_queue(), endTaskOnce);
      });
  #endif
  }
  486. #pragma mark -
  487. #pragma mark Internal methods
  /**
   * Applies all the changes stored up in the onDisconnect tree.
   *
   * Server values in the tree are resolved first. Every stored node is then applied to the server sync tree as a
   * server overwrite, with transactions at its path aborted (kFTransactionSet) and rerun. Finally the onDisconnect
   * tree is replaced with an empty one and the collected events are raised.
   */
  - (void) runOnDisconnectEvents {
      FFLog(@"I-RDB038019", @"Running onDisconnectEvents");

      NSDictionary *serverValues = [FServerValues generateServerValues:self.serverClock];
      FSparseSnapshotTree *resolvedTree = [FServerValues resolveDeferredValueTree:self.onDisconnect
                                                                 withServerValues:serverValues];

      NSMutableArray *collectedEvents = [[NSMutableArray alloc] init];
      [resolvedTree forEachTreeAtPath:[FPath empty] do:^(FPath *nodePath, id<FNode> storedNode) {
          NSArray *overwriteEvents = [self.serverSyncTree applyServerOverwriteAtPath:nodePath newData:storedNode];
          [collectedEvents addObjectsFromArray:overwriteEvents];

          FPath *affectedPath = [self abortTransactionsAtPath:nodePath error:kFTransactionSet];
          [self rerunTransactionsForPath:affectedPath];
      }];

      // Start over with an empty tree now that everything has been applied.
      self.onDisconnect = [[FSparseSnapshotTree alloc] init];
      [self.eventRaiser raiseEvents:collectedEvents];
  }
  /**
   * @return The listens dump provided by the connection.
   */
  - (NSDictionary *) dumpListens {
      NSDictionary *listens = [self.connection dumpListens];
      return listens;
  }
  507. #pragma mark -
  508. #pragma mark Transactions
  /**
   * Setup the transaction data structures: an empty transaction queue tree, with the hash-hijack flag and the
   * "persistence warning already logged" flag both cleared.
   */
  - (void) initTransactions {
      FTree *emptyQueueTree = [[FTree alloc] init];
      self.transactionQueueTree = emptyQueueTree;
      self.hijackHash = NO;
      self.loggedTransactionPersistenceWarning = NO;
  }
  /**
   * Creates a new transaction at `path`, runs its update block once against the latest local state and then
   * either aborts it immediately (the block reported failure) or adds it to the transaction queue tree,
   * applies its output as a user overwrite and tries to send all ready transactions.
   *
   * @param path Location the transaction operates on.
   * @param update Block invoked with a FIRMutableData wrapping the current state; its result decides
   *               between abort and commit.
   * @param onComplete Optional completion block. On an immediate abort it is raised through the event raiser
   *                   with a nil error, NO, and a snapshot of the input state.
   * @param applyLocally Forwarded as `isVisible` when the output is applied to the server sync tree.
   */
  - (void) startTransactionOnPath:(FPath *)path update:(fbt_transactionresult_mutabledata)update onComplete:(fbt_void_nserror_bool_datasnapshot)onComplete withLocalEvents:(BOOL)applyLocally {
      // Log the persistence caveat only the first time a transaction is started with persistence enabled.
      if (self.config.persistenceEnabled && !self.loggedTransactionPersistenceWarning) {
          self.loggedTransactionPersistenceWarning = YES;
          FFInfo(@"I-RDB038020", @"runTransactionBlock: usage detected while persistence is enabled. Please be aware that transactions "
                 @"*will not* be persisted across app restarts. "
                 @"See https://www.firebase.com/docs/ios/guide/offline-capabilities.html#section-handling-transactions-offline for more details.");
      }

      // Make sure we are listening on this node.
      // Note: this cannot be done asynchronously. To preserve event ordering it has to happen in this block,
      // which is OK because this block is guaranteed to be our own event loop.
      FIRDatabaseReference *watchRef = [[FIRDatabaseReference alloc] initWithRepo:self path:path];
      NSUInteger listenerHandle = [[FUtilities LUIDGenerator] integerValue];
      fbt_void_datasnapshot ignoreSnapshot = ^(FIRDataSnapshot *snapshot) {};
      FValueEventRegistration *listener = [[FValueEventRegistration alloc] initWithRepo:self
                                                                                 handle:listenerHandle
                                                                               callback:ignoreSnapshot
                                                                         cancelCallback:nil];
      [watchRef.repo addEventRegistration:listener forQuery:watchRef.querySpec];
      fbt_void_void stopWatching = ^{
          [watchRef removeObserverWithHandle:listenerHandle];
      };

      // Record everything that describes this transaction.
      FTupleTransaction *txn = [[FTupleTransaction alloc] init];
      txn.path = path;
      txn.update = update;
      txn.onComplete = onComplete;
      txn.status = FTransactionInitializing;
      txn.order = [FUtilities LUIDGenerator];
      txn.applyLocally = applyLocally;
      txn.retryCount = 0;
      txn.unwatcher = stopWatching;
      txn.currentWriteId = nil;
      txn.currentInputSnapshot = nil;
      txn.currentOutputSnapshotRaw = nil;
      txn.currentOutputSnapshotResolved = nil;

      // First run of the user's update block, against the latest known state.
      id<FNode> inputState = [self latestStateAtPath:path excludeWriteIds:nil];
      txn.currentInputSnapshot = inputState;
      FIRMutableData *mutableInput = [[FIRMutableData alloc] initWithNode:inputState];
      FIRTransactionResult *outcome = txn.update(mutableInput);

      if (!outcome.isSuccess) {
          // The update block asked for an abort: drop the listener and report the input state back.
          txn.unwatcher();
          txn.currentOutputSnapshotRaw = nil;
          txn.currentOutputSnapshotResolved = nil;
          if (txn.onComplete) {
              FIRDatabaseReference *abortedRef = [[FIRDatabaseReference alloc] initWithRepo:self path:txn.path];
              FIndexedNode *abortedNode = [FIndexedNode indexedNodeWithNode:txn.currentInputSnapshot];
              FIRDataSnapshot *abortedSnapshot = [[FIRDataSnapshot alloc] initWithRef:abortedRef indexedNode:abortedNode];
              [self.eventRaiser raiseCallback:^{
                  txn.onComplete(nil, NO, abortedSnapshot);
              }];
          }
          return;
      }

      // Note: different from js. We don't need to validate, FIRMutableData does validation.
      // We also don't have to worry about priorities. Just mark as run and add to queue.
      txn.status = FTransactionRun;
      FTree *queueNode = [self.transactionQueueTree subTree:txn.path];
      NSMutableArray *pendingAtNode = [queueNode getValue];
      if (pendingAtNode == nil) {
          pendingAtNode = [[NSMutableArray alloc] init];
      }
      [pendingAtNode addObject:txn];
      [queueNode setValue:pendingAtNode];

      // Update visible data and raise events.
      // Note: events are intentionally raised only after all transaction state has been updated, since the
      // user could start new transactions from the event callbacks.
      NSDictionary *serverValues = [FServerValues generateServerValues:self.serverClock];
      id<FNode> rawOutput = [outcome.update nodeValue];
      id<FNode> resolvedOutput = [FServerValues resolveDeferredValueSnapshot:rawOutput withServerValues:serverValues];
      txn.currentOutputSnapshotRaw = rawOutput;
      txn.currentOutputSnapshotResolved = resolvedOutput;
      txn.currentWriteId = [NSNumber numberWithInteger:[self nextWriteId]];

      NSArray *overwriteEvents = [self.serverSyncTree applyUserOverwriteAtPath:path
                                                                       newData:resolvedOutput
                                                                       writeId:[txn.currentWriteId integerValue]
                                                                     isVisible:txn.applyLocally];
      [self.eventRaiser raiseEvents:overwriteEvents];
      [self sendAllReadyTransactions];
  }
  /**
   * Asks the server sync tree for the complete event cache at `path`, ignoring the given writes.
   *
   * @param path Location to look up.
   * @param writeIdsToExclude A specific set of write ids to exclude (may be nil).
   * @return The computed node, or the empty node when the sync tree returns nil.
   */
  - (id<FNode>) latestStateAtPath:(FPath *)path excludeWriteIds:(NSArray *)writeIdsToExclude {
      id<FNode> cached = [self.serverSyncTree calcCompleteEventCacheAtPath:path excludeWriteIds:writeIdsToExclude];
      if (cached) {
          return cached;
      }
      return [FEmptyNode emptyNode];
  }
  /**
   * Sends any already-run transactions that are not waiting for outstanding transactions to complete.
   *
   * Starts at the root of the transaction queue tree: completed transactions are pruned first, then
   * sendReadyTransactionsForTree: walks the tree.
   */
  - (void) sendAllReadyTransactions {
      FTree *root = [self transactionQueueTree];
      [self pruneCompletedTransactionsBelowNode:root];
      [self sendReadyTransactionsForTree:root];
  }
  /**
   * Recursive worker behind sendAllReadyTransactions.
   *
   * If `node` carries a queue, the aggregated queue for the node and its descendants is sent, but only when
   * every transaction in it is in the FTransactionRun state. If `node` carries no queue, each child is
   * visited in turn.
   *
   * @param node Transaction queue tree node to inspect.
   */
  - (void) sendReadyTransactionsForTree:(FTree *)node {
      if ([node getValue] == nil) {
          // Nothing queued here; look further down the tree.
          if ([node hasChildren]) {
              [node forEachChild:^(FTree *child) {
                  [self sendReadyTransactionsForTree:child];
              }];
          }
          return;
      }

      NSMutableArray *aggregated = [self buildTransactionQueueAtNode:node];
      NSAssert([aggregated count] > 0, @"Sending zero length transaction queue");

      // If they're all run (and not sent), we can send them. Else, we must wait.
      BOOL everyTransactionRun = YES;
      for (FTupleTransaction *candidate in aggregated) {
          if (candidate.status != FTransactionRun) {
              everyTransactionRun = NO;
              break;
          }
      }
      if (everyTransactionRun) {
          [self sendTransactionQueue:aggregated atPath:node.path];
      }
  }
  /**
   * Given a list of run transactions, sends them to the server as a single put and handles the response.
   *
   * The payload is the latest state at `path` (computed without the queue's own writes) with every
   * transaction's raw output snapshot layered on top; the hash of that base state is sent along with it.
   * When hijackHash is set, the literal "badhash" is sent instead of the real hash.
   *
   * @param queue Transactions to send; all of them are expected to be in the FTransactionRun state.
   * @param path Path at which the combined put is issued.
   */
  - (void) sendTransactionQueue:(NSMutableArray *)queue atPath:(FPath *)path {
      // The base state must not include the writes of the transactions we are about to send.
      NSMutableArray *queuedWriteIds = [[NSMutableArray alloc] init];
      for (FTupleTransaction *txn in queue) {
          [queuedWriteIds addObject:txn.currentWriteId];
      }
      id<FNode> baseState = [self latestStateAtPath:path excludeWriteIds:queuedWriteIds];
      NSString *hashToSend = [baseState dataHash];

      // Mark transactions as sent, bump the retry count and fold their outputs into the payload.
      id<FNode> mergedState = baseState;
      for (FTupleTransaction *txn in queue) {
          NSAssert(txn.status == FTransactionRun, @"[FRepo sendTransactionQueue:] items in queue should all be run.");
          FFLog(@"I-RDB038021", @"Transaction at %@ set to SENT", txn.path);
          txn.status = FTransactionSent;
          txn.retryCount += 1;
          FPath *pathInPayload = [FPath relativePathFrom:path to:txn.path];
          // If we've gotten to this point, the output snapshot must be defined.
          mergedState = [mergedState updateChild:pathInPayload withNewChild:txn.currentOutputSnapshotRaw];
      }

      id payload = [mergedState valForExport:YES];
      NSString *putPath = [path description];
      if (self.hijackHash) {
          hashToSend = @"badhash";
      }

      // Send the put
      [self.connection putData:payload forPath:putPath withHash:hashToSend withCallback:^(NSString *status, NSString *errorReason) {
          FFLog(@"I-RDB038022", @"Transaction put response: %@ : %@", putPath, status);
          NSMutableArray *events = [[NSMutableArray alloc] init];

          if (![status isEqualToString:kFWPResponseForActionStatusOk]) {
              // Transactions are no longer sent. Update their status appropriately.
              if ([status isEqualToString:kFWPResponseForActionStatusDataStale]) {
                  for (FTupleTransaction *txn in queue) {
                      txn.status = (txn.status == FTransactionSentNeedsAbort) ? FTransactionNeedsAbort
                                                                               : FTransactionRun;
                  }
              } else {
                  FFWarn(@"I-RDB038023", @"runTransactionBlock: at %@ failed: %@", path, status);
                  for (FTupleTransaction *txn in queue) {
                      txn.status = FTransactionNeedsAbort;
                      [txn setAbortStatus:status reason:errorReason];
                  }
              }
          } else {
              // Queue up the callbacks and fire them after cleaning up all of our transaction state, since
              // the callback could trigger more transactions or sets.
              NSMutableArray *completions = [[NSMutableArray alloc] init];
              for (FTupleTransaction *txn in queue) {
                  txn.status = FTransactionCompleted;
                  NSArray *ackEvents = [self.serverSyncTree ackUserWriteWithWriteId:[txn.currentWriteId integerValue]
                                                                              revert:NO
                                                                             persist:NO
                                                                               clock:self.serverClock];
                  [events addObjectsFromArray:ackEvents];
                  if (txn.onComplete) {
                      // We never unset the output snapshot, and given that this transaction is complete,
                      // it should be set.
                      id<FNode> committedNode = txn.currentOutputSnapshotResolved;
                      FIndexedNode *committedIndexed = [FIndexedNode indexedNodeWithNode:committedNode];
                      FIRDatabaseReference *committedRef = [[FIRDatabaseReference alloc] initWithRepo:self path:txn.path];
                      FIRDataSnapshot *committedSnapshot = [[FIRDataSnapshot alloc] initWithRef:committedRef
                                                                                    indexedNode:committedIndexed];
                      fbt_void_void notifyCommitted = ^{
                          txn.onComplete(nil, YES, committedSnapshot);
                      };
                      [completions addObject:[notifyCommitted copy]];
                  }
                  txn.unwatcher();
              }

              // Now remove the completed transactions.
              [self pruneCompletedTransactionsBelowNode:[self.transactionQueueTree subTree:path]];
              // There may be pending transactions that we can now send.
              [self sendAllReadyTransactions];
              // Finally, trigger onComplete callbacks
              [self.eventRaiser raiseCallbacks:completions];
          }

          [self rerunTransactionsForPath:path];
          [self.eventRaiser raiseEvents:events];
      }];
  }
  /**
   * Finds all transactions dependent on the data at `changedPath` and reruns them.
   *
   * Should be called any time cached data changes.
   *
   * @param changedPath Path whose data changed.
   * @return The highest path that was affected by rerunning transactions, i.e. the path at which events
   *         need to be raised. When no transactions are queued this is `changedPath` itself.
   */
  - (FPath *) rerunTransactionsForPath:(FPath *)changedPath {
      // For the common case that there are no transactions going on, skip all this!
      if ([self.transactionQueueTree isEmpty]) {
          return changedPath;
      }

      FTree *topTransactionNode = [self getAncestorTransactionNodeForPath:changedPath];
      FPath *affectedPath = topTransactionNode.path;
      NSArray *dependentTransactions = [self buildTransactionQueueAtNode:topTransactionNode];
      [self rerunTransactionQueue:dependentTransactions atPath:affectedPath];
      return affectedPath;
  }
  /**
   * Does all the work of rerunning transactions (as well as cleaning up aborted transactions).
   *
   * For each transaction in `queue`:
   *  - FTransactionNeedsAbort: the transaction is aborted; its write is reverted in the sync tree unless the
   *    abort status is kFErrorWriteCanceled.
   *  - FTransactionRun: the transaction is aborted if it reached kFTransactionMaxRetries; otherwise its update
   *    block is run again against the latest state (excluding the queue's stale writes) and the old write is
   *    replaced by a new one, or the transaction is aborted when the update block reports failure.
   *  - any other status: left untouched.
   *
   * Sync-tree events are raised per transaction; completion callbacks of aborted transactions are collected
   * and raised only after completed transactions have been pruned. Finally, ready transactions are sent.
   *
   * @param queue Transactions to process, in execution order.
   * @param path Path of the transaction queue tree node the queue was built from.
   */
  - (void) rerunTransactionQueue:(NSArray *)queue atPath:(FPath *)path {
      if (queue.count == 0) {
          return; // nothing to do
      }

      // Queue up the callbacks and fire them after cleaning up all of our transaction state, since
      // the callback could trigger more transactions or sets.
      NSMutableArray *callbacks = [[NSMutableArray alloc] init];

      // Ignore, by default, all of the sets in this queue, since we're re-running all of them. However, we want to include
      // the results of new sets triggered as part of this re-run, so we don't want to ignore a range, just these specific
      // sets.
      NSMutableArray *writeIdsToExclude = [[NSMutableArray alloc] init];
      for (FTupleTransaction *transaction in queue) {
          [writeIdsToExclude addObject:transaction.currentWriteId];
      }

      for (FTupleTransaction *transaction in queue) {
          FPath *relativePath __unused = [FPath relativePathFrom:path to:transaction.path];
          BOOL abortTransaction = NO;
          NSAssert(relativePath != nil, @"[FRepo rerunTransactionsQueue:] relativePath should not be null.");

          // Events are collected and raised once per transaction. The buffer must be a fresh, non-nil array
          // on every iteration: a single array that is set to nil after the first raise would turn every
          // later addObjectsFromArray: into a message to nil and silently drop those events.
          NSMutableArray *events = [[NSMutableArray alloc] init];

          if (transaction.status == FTransactionNeedsAbort) {
              abortTransaction = YES;
              if (![transaction.abortStatus isEqualToString:kFErrorWriteCanceled]) {
                  NSArray *ackEvents = [self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
                                                                              revert:YES
                                                                             persist:NO
                                                                               clock:self.serverClock];
                  [events addObjectsFromArray:ackEvents];
              }
          } else if (transaction.status == FTransactionRun) {
              if (transaction.retryCount >= kFTransactionMaxRetries) {
                  abortTransaction = YES;
                  [transaction setAbortStatus:kFTransactionTooManyRetries reason:nil];
                  [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
                                                                                     revert:YES
                                                                                    persist:NO
                                                                                      clock:self.serverClock]];
              } else {
                  // This code reruns a transaction
                  id<FNode> currentNode = [self latestStateAtPath:transaction.path excludeWriteIds:writeIdsToExclude];
                  transaction.currentInputSnapshot = currentNode;
                  FIRMutableData *mutableCurrent = [[FIRMutableData alloc] initWithNode:currentNode];
                  FIRTransactionResult *result = transaction.update(mutableCurrent);
                  if (result.isSuccess) {
                      NSNumber *oldWriteId = transaction.currentWriteId;
                      NSDictionary *serverValues = [FServerValues generateServerValues:self.serverClock];
                      id<FNode> newVal = [result.update nodeValue];
                      id<FNode> newValResolved = [FServerValues resolveDeferredValueSnapshot:newVal withServerValues:serverValues];
                      transaction.currentOutputSnapshotRaw = newVal;
                      transaction.currentOutputSnapshotResolved = newValResolved;
                      transaction.currentWriteId = [NSNumber numberWithInteger:[self nextWriteId]];
                      // Mutates writeIdsToExclude in place
                      [writeIdsToExclude removeObject:oldWriteId];
                      // Apply the new write before reverting the old one.
                      [events addObjectsFromArray:[self.serverSyncTree applyUserOverwriteAtPath:transaction.path
                                                                                         newData:transaction.currentOutputSnapshotResolved
                                                                                         writeId:[transaction.currentWriteId integerValue]
                                                                                       isVisible:transaction.applyLocally]];
                      [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[oldWriteId integerValue]
                                                                                         revert:YES
                                                                                        persist:NO
                                                                                          clock:self.serverClock]];
                  } else {
                      abortTransaction = YES;
                      // The user aborted the transaction. JS treats this as a "nodata" abort, but it's not an error, so we don't send them an error.
                      [transaction setAbortStatus:nil reason:nil];
                      [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
                                                                                         revert:YES
                                                                                        persist:NO
                                                                                          clock:self.serverClock]];
                  }
              }
          }

          [self.eventRaiser raiseEvents:events];

          if (abortTransaction) {
              // Abort
              transaction.status = FTransactionCompleted;
              transaction.unwatcher();
              if (transaction.onComplete) {
                  FIRDatabaseReference *ref = [[FIRDatabaseReference alloc] initWithRepo:self path:transaction.path];
                  FIndexedNode *lastInput = [FIndexedNode indexedNodeWithNode:transaction.currentInputSnapshot];
                  FIRDataSnapshot *snap = [[FIRDataSnapshot alloc] initWithRef:ref indexedNode:lastInput];
                  fbt_void_void cb = ^{
                      // Unlike JS, no need to check for "nodata" because ObjC has abortError = nil
                      transaction.onComplete(transaction.abortError, NO, snap);
                  };
                  [callbacks addObject:[cb copy]];
              }
          }
      }

      // Note: unlike current js client, we don't need to preserve priority. Users can set priority via FIRMutableData

      // Clean up completed transactions.
      [self pruneCompletedTransactionsBelowNode:self.transactionQueueTree];

      // Now fire callbacks, now that we're in a good, known state.
      [self.eventRaiser raiseCallbacks:callbacks];

      // Try to send the transaction result to the server
      [self sendAllReadyTransactions];
  }
  /**
   * Walks down the transaction queue tree along `path` and returns the first node that carries a value,
   * or the node reached once the whole path has been consumed.
   *
   * @param path Path to descend along.
   * @return The node at which the walk stopped.
   */
  - (FTree *) getAncestorTransactionNodeForPath:(FPath *)path {
      FTree *current = self.transactionQueueTree;
      FPath *remaining = path;
      for (;;) {
          // Stop as soon as the path is exhausted or this node already has a value.
          if ([remaining isEmpty] || [current getValue] != nil) {
              break;
          }
          NSString *nextSegment = [remaining getFront];
          current = [current subTree:[[FPath alloc] initWith:nextSegment]];
          remaining = [remaining popFront];
      }
      return current;
  }
  /**
   * Collects every transaction stored at `node` and below it into one array, sorted by each
   * transaction's `order` value.
   *
   * @param node Root of the subtree to collect from.
   * @return A new mutable array with the sorted transactions.
   */
  - (NSMutableArray *) buildTransactionQueueAtNode:(FTree *)node {
      NSMutableArray *collected = [[NSMutableArray alloc] init];
      [self aggregateTransactionQueuesForNode:node andQueue:collected];

      NSComparator byOrder = ^NSComparisonResult(id first, id second) {
          FTupleTransaction *lhs = first;
          FTupleTransaction *rhs = second;
          return [lhs.order compare:rhs.order];
      };
      [collected sortUsingComparator:byOrder];
      return collected;
  }
  /**
   * Appends the transactions stored at `node` to `queue`, then does the same for every child of `node`.
   *
   * @param node Node whose subtree is traversed.
   * @param queue Accumulator that receives the transactions.
   */
  - (void) aggregateTransactionQueuesForNode:(FTree *)node andQueue:(NSMutableArray *)queue {
      [queue addObjectsFromArray:[node getValue]];

      void (^visitChild)(FTree *) = ^(FTree *child) {
          [self aggregateTransactionQueuesForNode:child andQueue:queue];
      };
      [node forEachChild:visitChild];
  }
  /**
   * Removes COMPLETED transactions at or below this node in the transactionQueueTree.
   *
   * A node whose queue becomes empty has its value set to nil; the children are then processed the
   * same way.
   *
   * @param node Node to start pruning from.
   */
  - (void) pruneCompletedTransactionsBelowNode:(FTree *)node {
      NSMutableArray *pending = [node getValue];
      if (pending != nil) {
          // Walk backwards so removals never shift an index that is still to be visited.
          for (NSUInteger position = pending.count; position > 0; position--) {
              FTupleTransaction *candidate = [pending objectAtIndex:position - 1];
              if (candidate.status == FTransactionCompleted) {
                  [pending removeObjectAtIndex:position - 1];
              }
          }
          [node setValue:(pending.count > 0 ? pending : nil)];
      }

      [node forEachChildMutationSafe:^(FTree *child) {
          [self pruneCompletedTransactionsBelowNode:child];
      }];
  }
  /**
   * Aborts all transactions on ancestors or descendants of the specified path. Called when doing a setValue: or
   * updateChildValues: since we consider them incompatible with transactions.
   *
   * @param path Path for which we want to abort related transactions.
   * @param error Abort status forwarded to abortTransactionsAtNode:error: for every visited node.
   * @return `path` itself when no transactions are queued; otherwise the path of the node returned by
   *         getAncestorTransactionNodeForPath: for `path`.
   */
  - (FPath *) abortTransactionsAtPath:(FPath *)path error:(NSString *)error {
      // For the common case that there are no transactions going on, skip all this!
      if ([self.transactionQueueTree isEmpty]) {
          return path;
      }

      FTree *ancestorNode = [self getAncestorTransactionNodeForPath:path];
      FPath *affectedPath = ancestorNode.path;
      FTree *nodeAtPath = [self.transactionQueueTree subTree:path];

      // Ancestors first; returning NO from the block keeps the walk going.
      [nodeAtPath forEachAncestor:^BOOL(FTree *ancestor) {
          [self abortTransactionsAtNode:ancestor error:error];
          return NO;
      }];

      // Then the node itself, then everything below it.
      [self abortTransactionsAtNode:nodeAtPath error:error];
      [nodeAtPath forEachDescendant:^(FTree *descendant) {
          [self abortTransactionsAtNode:descendant error:error];
      }];

      return affectedPath;
  }
  /**
   * Abort transactions stored in this transactions queue node.
   *
   * Transactions that are already on the wire (FTransactionSent or FTransactionSentNeedsAbort) cannot be
   * removed yet: sent ones are marked FTransactionSentNeedsAbort and stay in the queue until their put
   * response arrives. All other transactions are aborted immediately: their listener is removed, their write
   * is reverted when `error` is kFTransactionSet, and their onComplete block (if any) is raised with the
   * abort error, NO, and a nil snapshot.
   *
   * @param node Node to abort transactions for.
   * @param error Abort status; expected to be kFTransactionSet or kFErrorWriteCanceled.
   */
  - (void) abortTransactionsAtNode:(FTree *)node error:(NSString *)error {
      NSMutableArray *queue = [node getValue];
      if (queue == nil) {
          return;
      }

      // Queue up the callbacks and fire them after cleaning up all of our transaction state, since
      // the callbacks could trigger more transactions or sets.
      NSMutableArray *callbacks = [[NSMutableArray alloc] init];
      NSMutableArray *events = [[NSMutableArray alloc] init];

      // Go through queue. Any already-sent transactions must be marked for abort, while the unsent ones
      // can be immediately aborted and removed.
      // Note: all in-flight transactions are expected to sit at the front of the queue, so counting them
      // yields the index of the first transaction that was aborted here.
      NSUInteger inFlightCount = 0;
      for (FTupleTransaction *transaction in queue) {
          if (transaction.status == FTransactionSentNeedsAbort) {
              // Already marked by an earlier abort, but still awaiting its response. It has to be counted as
              // in flight; otherwise a repeated abort would clear the node and detach this transaction from
              // the tree before its response has been handled.
              inFlightCount++;
          } else if (transaction.status == FTransactionSent) {
              // Mark this transaction for abort when it returns
              inFlightCount++;
              transaction.status = FTransactionSentNeedsAbort;
              [transaction setAbortStatus:error reason:nil];
          } else {
              // we can abort this immediately
              transaction.unwatcher();
              if ([error isEqualToString:kFTransactionSet]) {
                  [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
                                                                                     revert:YES
                                                                                    persist:NO
                                                                                      clock:self.serverClock]];
              } else {
                  // If it was cancelled it was already removed from the sync tree, no need to ack
                  NSAssert([error isEqualToString:kFErrorWriteCanceled], nil);
              }

              if (transaction.onComplete) {
                  NSError *abortReason = [FUtilities errorForStatus:error andReason:nil];
                  FIRDataSnapshot *snapshot = nil;
                  fbt_void_void cb = ^{
                      transaction.onComplete(abortReason, NO, snapshot);
                  };
                  [callbacks addObject:[cb copy]];
              }
          }
      }

      if (inFlightCount == 0) {
          // We're not waiting for any sent transactions. We can clear the queue.
          [node setValue:nil];
      } else {
          // Remove the transactions we aborted, keeping the in-flight ones at the front.
          NSRange abortedRange = NSMakeRange(inFlightCount, queue.count - inFlightCount);
          [queue removeObjectsInRange:abortedRange];
      }

      // Now fire the callbacks
      [self.eventRaiser raiseEvents:events];
      [self.eventRaiser raiseCallbacks:callbacks];
  }
  968. @end