FRepo.m 59 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <Foundation/Foundation.h>
  17. #import "FAtomicNumber.h"
  18. #import "FCachePolicy.h"
  19. #import "FClock.h"
  20. #import "FConstants.h"
  21. #import "FEmptyNode.h"
  22. #import "FEventRaiser.h"
  23. #import "FEventRegistration.h"
  24. #import "FIRDataSnapshot.h"
  25. #import "FIRDataSnapshot_Private.h"
  26. #import "FIRDatabaseConfig_Private.h"
  27. #import "FIRDatabaseQuery_Private.h"
  28. #import "FIRDatabase_Private.h"
  29. #import "FIRMutableData.h"
  30. #import "FIRMutableData_Private.h"
  31. #import "FIRTransactionResult.h"
  32. #import "FIRTransactionResult_Private.h"
  33. #import "FLevelDBStorageEngine.h"
  34. #import "FListenProvider.h"
  35. #import "FPersistenceManager.h"
  36. #import "FQuerySpec.h"
  37. #import "FRepo.h"
  38. #import "FRepoManager.h"
  39. #import "FRepo_Private.h"
  40. #import "FServerValues.h"
  41. #import "FSnapshotHolder.h"
  42. #import "FSnapshotUtilities.h"
  43. #import "FSyncTree.h"
  44. #import "FTree.h"
  45. #import "FTupleNodePath.h"
  46. #import "FTupleSetIdPath.h"
  47. #import "FTupleTransaction.h"
  48. #import "FValueEventRegistration.h"
  49. #import "FWriteRecord.h"
  50. #import <FirebaseCore/FIRLogger.h>
  51. #import <dlfcn.h>
  52. #if TARGET_OS_IOS || TARGET_OS_TV
  53. #import <UIKit/UIKit.h>
  54. #endif
  55. @interface FRepo ()
  56. @property(nonatomic, strong) FOffsetClock *serverClock;
  57. @property(nonatomic, strong) FPersistenceManager *persistenceManager;
  58. @property(nonatomic, strong) FIRDatabase *database;
  59. @property(nonatomic, strong, readwrite) FAuthenticationManager *auth;
  60. @property(nonatomic, strong) FSyncTree *infoSyncTree;
  61. @property(nonatomic) NSInteger writeIdCounter;
  62. @property(nonatomic) BOOL hijackHash;
  63. @property(nonatomic, strong) FTree *transactionQueueTree;
  64. @property(nonatomic) BOOL loggedTransactionPersistenceWarning;
  65. /**
  66. * Test only. For load testing the server.
  67. */
  68. @property(nonatomic, strong) id (^interceptServerDataCallback)
  69. (NSString *pathString, id data);
  70. @end
  71. @implementation FRepo
  72. - (id)initWithRepoInfo:(FRepoInfo *)info
  73. config:(FIRDatabaseConfig *)config
  74. database:(FIRDatabase *)database {
  75. self = [super init];
  76. if (self) {
  77. self.repoInfo = info;
  78. self.config = config;
  79. self.database = database;
  80. // Access can occur outside of shared queue, so the clock needs to be
  81. // initialized here
  82. self.serverClock =
  83. [[FOffsetClock alloc] initWithClock:[FSystemClock clock] offset:0];
  84. self.connection = [[FPersistentConnection alloc]
  85. initWithRepoInfo:self.repoInfo
  86. dispatchQueue:[FIRDatabaseQuery sharedQueue]
  87. config:self.config];
  88. // Needs to be called before authentication manager is instantiated
  89. self.eventRaiser =
  90. [[FEventRaiser alloc] initWithQueue:self.config.callbackQueue];
  91. dispatch_async([FIRDatabaseQuery sharedQueue], ^{
  92. [self deferredInit];
  93. });
  94. }
  95. return self;
  96. }
  97. - (void)deferredInit {
  98. // TODO: cleanup on dealloc
  99. __weak FRepo *weakSelf = self;
  100. [self.config.authTokenProvider listenForTokenChanges:^(NSString *token) {
  101. [weakSelf.connection refreshAuthToken:token];
  102. }];
  103. // Open connection now so that by the time we are connected the deferred
  104. // init has run This relies on the fact that all callbacks run on repos
  105. // queue
  106. self.connection.delegate = self;
  107. [self.connection open];
  108. self.dataUpdateCount = 0;
  109. self.rangeMergeUpdateCount = 0;
  110. self.interceptServerDataCallback = nil;
  111. if (self.config.persistenceEnabled) {
  112. NSString *repoHashString =
  113. [NSString stringWithFormat:@"%@_%@", self.repoInfo.host,
  114. self.repoInfo.namespace];
  115. NSString *persistencePrefix =
  116. [NSString stringWithFormat:@"%@/%@", self.config.sessionIdentifier,
  117. repoHashString];
  118. id<FCachePolicy> cachePolicy = [[FLRUCachePolicy alloc]
  119. initWithMaxSize:self.config.persistenceCacheSizeBytes];
  120. id<FStorageEngine> engine;
  121. if (self.config.forceStorageEngine != nil) {
  122. engine = self.config.forceStorageEngine;
  123. } else {
  124. FLevelDBStorageEngine *levelDBEngine =
  125. [[FLevelDBStorageEngine alloc] initWithPath:persistencePrefix];
  126. // We need the repo info to run the legacy migration. Future
  127. // migrations will be managed by the database itself Remove this
  128. // once we are confident that no-one is using legacy migration
  129. // anymore...
  130. [levelDBEngine runLegacyMigration:self.repoInfo];
  131. engine = levelDBEngine;
  132. }
  133. self.persistenceManager =
  134. [[FPersistenceManager alloc] initWithStorageEngine:engine
  135. cachePolicy:cachePolicy];
  136. } else {
  137. self.persistenceManager = nil;
  138. }
  139. [self initTransactions];
  140. // A list of data pieces and paths to be set when this client disconnects
  141. self.onDisconnect = [[FSparseSnapshotTree alloc] init];
  142. self.infoData = [[FSnapshotHolder alloc] init];
  143. FListenProvider *infoListenProvider = [[FListenProvider alloc] init];
  144. infoListenProvider.startListening =
  145. ^(FQuerySpec *query, NSNumber *tagId, id<FSyncTreeHash> hash,
  146. fbt_nsarray_nsstring onComplete) {
  147. NSArray *infoEvents = @[];
  148. FRepo *strongSelf = weakSelf;
  149. id<FNode> node = [strongSelf.infoData getNode:query.path];
  150. // This is possibly a hack, but we have different semantics for .info
  151. // endpoints. We don't raise null events on initial data...
  152. if (![node isEmpty]) {
  153. infoEvents =
  154. [strongSelf.infoSyncTree applyServerOverwriteAtPath:query.path
  155. newData:node];
  156. [strongSelf.eventRaiser raiseCallback:^{
  157. onComplete(kFWPResponseForActionStatusOk);
  158. }];
  159. }
  160. return infoEvents;
  161. };
  162. infoListenProvider.stopListening = ^(FQuerySpec *query, NSNumber *tagId) {
  163. };
  164. self.infoSyncTree =
  165. [[FSyncTree alloc] initWithListenProvider:infoListenProvider];
  166. FListenProvider *serverListenProvider = [[FListenProvider alloc] init];
  167. serverListenProvider.startListening =
  168. ^(FQuerySpec *query, NSNumber *tagId, id<FSyncTreeHash> hash,
  169. fbt_nsarray_nsstring onComplete) {
  170. [weakSelf.connection listen:query
  171. tagId:tagId
  172. hash:hash
  173. onComplete:^(NSString *status) {
  174. NSArray *events = onComplete(status);
  175. [weakSelf.eventRaiser raiseEvents:events];
  176. }];
  177. // No synchronous events for network-backed sync trees
  178. return @[];
  179. };
  180. serverListenProvider.stopListening = ^(FQuerySpec *query, NSNumber *tag) {
  181. [weakSelf.connection unlisten:query tagId:tag];
  182. };
  183. self.serverSyncTree =
  184. [[FSyncTree alloc] initWithPersistenceManager:self.persistenceManager
  185. listenProvider:serverListenProvider];
  186. [self restoreWrites];
  187. [self updateInfo:kDotInfoConnected withValue:@NO];
  188. [self setupNotifications];
  189. }
  190. - (void)restoreWrites {
  191. NSArray *writes = self.persistenceManager.userWrites;
  192. NSDictionary *serverValues =
  193. [FServerValues generateServerValues:self.serverClock];
  194. __block NSInteger lastWriteId = NSIntegerMin;
  195. [writes enumerateObjectsUsingBlock:^(FWriteRecord *write, NSUInteger idx,
  196. BOOL *stop) {
  197. NSInteger writeId = write.writeId;
  198. fbt_void_nsstring_nsstring callback =
  199. ^(NSString *status, NSString *errorReason) {
  200. [self warnIfWriteFailedAtPath:write.path
  201. status:status
  202. message:@"Persisted write"];
  203. [self ackWrite:writeId
  204. rerunTransactionsAtPath:write.path
  205. status:status];
  206. };
  207. if (lastWriteId >= writeId) {
  208. [NSException raise:NSInternalInconsistencyException
  209. format:@"Restored writes were not in order!"];
  210. }
  211. lastWriteId = writeId;
  212. self.writeIdCounter = writeId + 1;
  213. if ([write isOverwrite]) {
  214. FFLog(@"I-RDB038001", @"Restoring overwrite with id %ld",
  215. (long)write.writeId);
  216. [self.connection putData:[write.overwrite valForExport:YES]
  217. forPath:[write.path toString]
  218. withHash:nil
  219. withCallback:callback];
  220. id<FNode> resolved =
  221. [FServerValues resolveDeferredValueSnapshot:write.overwrite
  222. withSyncTree:self.serverSyncTree
  223. atPath:write.path
  224. serverValues:serverValues];
  225. [self.serverSyncTree applyUserOverwriteAtPath:write.path
  226. newData:resolved
  227. writeId:writeId
  228. isVisible:YES];
  229. } else {
  230. FFLog(@"I-RDB038002", @"Restoring merge with id %ld",
  231. (long)write.writeId);
  232. [self.connection mergeData:[write.merge valForExport:YES]
  233. forPath:[write.path toString]
  234. withCallback:callback];
  235. FCompoundWrite *resolved = [FServerValues
  236. resolveDeferredValueCompoundWrite:write.merge
  237. withSyncTree:self.serverSyncTree
  238. atPath:write.path
  239. serverValues:serverValues];
  240. [self.serverSyncTree applyUserMergeAtPath:write.path
  241. changedChildren:resolved
  242. writeId:writeId];
  243. }
  244. }];
  245. }
  246. - (NSString *)name {
  247. return self.repoInfo.namespace;
  248. }
  249. - (NSString *)description {
  250. return [self.repoInfo description];
  251. }
  252. - (void)interrupt {
  253. [self.connection interruptForReason:kFInterruptReasonRepoInterrupt];
  254. }
  255. - (void)resume {
  256. [self.connection resumeForReason:kFInterruptReasonRepoInterrupt];
  257. }
  258. // NOTE: Typically if you're calling this, you should be in an @autoreleasepool
  259. // block to make sure that ARC kicks in and cleans up things no longer
  260. // referenced (i.e. pendingPutsDB).
  261. - (void)dispose {
  262. [self.connection interruptForReason:kFInterruptReasonRepoInterrupt];
  263. // We need to nil out any references to LevelDB, to make sure the
  264. // LevelDB exclusive locks are released.
  265. [self.persistenceManager close];
  266. }
  267. - (NSInteger)nextWriteId {
  268. return self->_writeIdCounter++;
  269. }
  270. - (NSTimeInterval)serverTime {
  271. return [self.serverClock currentTime];
  272. }
  273. - (void)set:(FPath *)path
  274. withNode:(id<FNode>)node
  275. withCallback:(fbt_void_nserror_ref)onComplete {
  276. id value = [node valForExport:YES];
  277. FFLog(@"I-RDB038003", @"Setting: %@ with %@ pri: %@", [path toString],
  278. [value description], [[node getPriority] val]);
  279. // TODO: Optimize this behavior to either (a) store flag to skip resolving
  280. // where possible and / or (b) store unresolved paths on JSON parse
  281. NSDictionary *serverValues =
  282. [FServerValues generateServerValues:self.serverClock];
  283. id<FNode> existing = [self.serverSyncTree calcCompleteEventCacheAtPath:path
  284. excludeWriteIds:@[]];
  285. id<FNode> newNode =
  286. [FServerValues resolveDeferredValueSnapshot:node
  287. withExisting:existing
  288. serverValues:serverValues];
  289. NSInteger writeId = [self nextWriteId];
  290. [self.persistenceManager saveUserOverwrite:node
  291. atPath:path
  292. writeId:writeId];
  293. NSArray *events = [self.serverSyncTree applyUserOverwriteAtPath:path
  294. newData:newNode
  295. writeId:writeId
  296. isVisible:YES];
  297. [self.eventRaiser raiseEvents:events];
  298. [self.connection putData:value
  299. forPath:[path toString]
  300. withHash:nil
  301. withCallback:^(NSString *status, NSString *errorReason) {
  302. [self warnIfWriteFailedAtPath:path
  303. status:status
  304. message:@"setValue: or removeValue:"];
  305. [self ackWrite:writeId
  306. rerunTransactionsAtPath:path
  307. status:status];
  308. [self callOnComplete:onComplete
  309. withStatus:status
  310. errorReason:errorReason
  311. andPath:path];
  312. }];
  313. FPath *affectedPath = [self abortTransactionsAtPath:path
  314. error:kFTransactionSet];
  315. [self rerunTransactionsForPath:affectedPath];
  316. }
  317. - (void)update:(FPath *)path
  318. withNodes:(FCompoundWrite *)nodes
  319. withCallback:(fbt_void_nserror_ref)callback {
  320. NSDictionary *values = [nodes valForExport:YES];
  321. FFLog(@"I-RDB038004", @"Updating: %@ with %@", [path toString],
  322. [values description]);
  323. NSDictionary *serverValues =
  324. [FServerValues generateServerValues:self.serverClock];
  325. FCompoundWrite *resolved =
  326. [FServerValues resolveDeferredValueCompoundWrite:nodes
  327. withSyncTree:self.serverSyncTree
  328. atPath:path
  329. serverValues:serverValues];
  330. if (!resolved.isEmpty) {
  331. NSInteger writeId = [self nextWriteId];
  332. [self.persistenceManager saveUserMerge:nodes
  333. atPath:path
  334. writeId:writeId];
  335. NSArray *events = [self.serverSyncTree applyUserMergeAtPath:path
  336. changedChildren:resolved
  337. writeId:writeId];
  338. [self.eventRaiser raiseEvents:events];
  339. [self.connection mergeData:values
  340. forPath:[path description]
  341. withCallback:^(NSString *status, NSString *errorReason) {
  342. [self warnIfWriteFailedAtPath:path
  343. status:status
  344. message:@"updateChildValues:"];
  345. [self ackWrite:writeId
  346. rerunTransactionsAtPath:path
  347. status:status];
  348. [self callOnComplete:callback
  349. withStatus:status
  350. errorReason:errorReason
  351. andPath:path];
  352. }];
  353. [nodes enumerateWrites:^(FPath *childPath, id<FNode> node, BOOL *stop) {
  354. FPath *pathFromRoot = [path child:childPath];
  355. FFLog(@"I-RDB038005", @"Cancelling transactions at path: %@",
  356. pathFromRoot);
  357. FPath *affectedPath = [self abortTransactionsAtPath:pathFromRoot
  358. error:kFTransactionSet];
  359. [self rerunTransactionsForPath:affectedPath];
  360. }];
  361. } else {
  362. FFLog(@"I-RDB038006", @"update called with empty data. Doing nothing");
  363. // Do nothing, just call the callback
  364. [self callOnComplete:callback
  365. withStatus:@"ok"
  366. errorReason:nil
  367. andPath:path];
  368. }
  369. }
  370. - (void)onDisconnectCancel:(FPath *)path
  371. withCallback:(fbt_void_nserror_ref)callback {
  372. [self.connection
  373. onDisconnectCancelPath:path
  374. withCallback:^(NSString *status, NSString *errorReason) {
  375. BOOL success =
  376. [status isEqualToString:kFWPResponseForActionStatusOk];
  377. if (success) {
  378. [self.onDisconnect forgetPath:path];
  379. } else {
  380. FFLog(@"I-RDB038007",
  381. @"cancelDisconnectOperations: at %@ failed: %@",
  382. path, status);
  383. }
  384. [self callOnComplete:callback
  385. withStatus:status
  386. errorReason:errorReason
  387. andPath:path];
  388. }];
  389. }
  390. - (void)onDisconnectSet:(FPath *)path
  391. withNode:(id<FNode>)node
  392. withCallback:(fbt_void_nserror_ref)callback {
  393. [self.connection
  394. onDisconnectPutData:[node valForExport:YES]
  395. forPath:path
  396. withCallback:^(NSString *status, NSString *errorReason) {
  397. BOOL success =
  398. [status isEqualToString:kFWPResponseForActionStatusOk];
  399. if (success) {
  400. [self.onDisconnect rememberData:node onPath:path];
  401. } else {
  402. FFWarn(@"I-RDB038008",
  403. @"onDisconnectSetValue: or "
  404. @"onDisconnectRemoveValue: at %@ failed: %@",
  405. path, status);
  406. }
  407. [self callOnComplete:callback
  408. withStatus:status
  409. errorReason:errorReason
  410. andPath:path];
  411. }];
  412. }
  413. - (void)onDisconnectUpdate:(FPath *)path
  414. withNodes:(FCompoundWrite *)nodes
  415. withCallback:(fbt_void_nserror_ref)callback {
  416. if (!nodes.isEmpty) {
  417. NSDictionary *values = [nodes valForExport:YES];
  418. [self.connection
  419. onDisconnectMergeData:values
  420. forPath:path
  421. withCallback:^(NSString *status, NSString *errorReason) {
  422. BOOL success = [status
  423. isEqualToString:kFWPResponseForActionStatusOk];
  424. if (success) {
  425. [nodes enumerateWrites:^(FPath *relativePath,
  426. id<FNode> nodeUnresolved,
  427. BOOL *stop) {
  428. FPath *childPath = [path child:relativePath];
  429. [self.onDisconnect rememberData:nodeUnresolved
  430. onPath:childPath];
  431. }];
  432. } else {
  433. FFWarn(@"I-RDB038009",
  434. @"onDisconnectUpdateChildValues: at %@ "
  435. @"failed %@",
  436. path, status);
  437. }
  438. [self callOnComplete:callback
  439. withStatus:status
  440. errorReason:errorReason
  441. andPath:path];
  442. }];
  443. } else {
  444. // Do nothing, just call the callback
  445. [self callOnComplete:callback
  446. withStatus:@"ok"
  447. errorReason:nil
  448. andPath:path];
  449. }
  450. }
  451. - (void)purgeOutstandingWrites {
  452. FFLog(@"I-RDB038010", @"Purging outstanding writes");
  453. NSArray *events = [self.serverSyncTree removeAllWrites];
  454. [self.eventRaiser raiseEvents:events];
  455. // Abort any transactions
  456. [self abortTransactionsAtPath:[FPath empty] error:kFErrorWriteCanceled];
  457. // Remove outstanding writes from connection
  458. [self.connection purgeOutstandingWrites];
  459. }
  460. - (void)addEventRegistration:(id<FEventRegistration>)eventRegistration
  461. forQuery:(FQuerySpec *)query {
  462. NSArray *events = nil;
  463. if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) {
  464. events = [self.infoSyncTree addEventRegistration:eventRegistration
  465. forQuery:query];
  466. } else {
  467. events = [self.serverSyncTree addEventRegistration:eventRegistration
  468. forQuery:query];
  469. }
  470. [self.eventRaiser raiseEvents:events];
  471. }
  472. - (void)removeEventRegistration:(id<FEventRegistration>)eventRegistration
  473. forQuery:(FQuerySpec *)query {
  474. // These are guaranteed not to raise events, since we're not passing in a
  475. // cancelError. However we can future-proof a little bit by handling the
  476. // return values anyways.
  477. FFLog(@"I-RDB038011", @"Removing event registration with hande: %lu",
  478. (unsigned long)eventRegistration.handle);
  479. NSArray *events = nil;
  480. if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) {
  481. events = [self.infoSyncTree removeEventRegistration:eventRegistration
  482. forQuery:query
  483. cancelError:nil];
  484. } else {
  485. events = [self.serverSyncTree removeEventRegistration:eventRegistration
  486. forQuery:query
  487. cancelError:nil];
  488. }
  489. [self.eventRaiser raiseEvents:events];
  490. }
  491. - (void)keepQuery:(FQuerySpec *)query synced:(BOOL)synced {
  492. NSAssert(![[query.path getFront] isEqualToString:kDotInfoPrefix],
  493. @"Can't keep .info tree synced!");
  494. [self.serverSyncTree keepQuery:query synced:synced];
  495. }
  496. - (void)updateInfo:(NSString *)pathString withValue:(id)value {
  497. // hack to make serverTimeOffset available in a threadsafe way. Property is
  498. // marked as atomic
  499. if ([pathString isEqualToString:kDotInfoServerTimeOffset]) {
  500. NSTimeInterval offset = [(NSNumber *)value doubleValue] / 1000.0;
  501. self.serverClock =
  502. [[FOffsetClock alloc] initWithClock:[FSystemClock clock]
  503. offset:offset];
  504. }
  505. FPath *path = [[FPath alloc]
  506. initWith:[NSString
  507. stringWithFormat:@"%@/%@", kDotInfoPrefix, pathString]];
  508. id<FNode> newNode = [FSnapshotUtilities nodeFrom:value];
  509. [self.infoData updateSnapshot:path withNewSnapshot:newNode];
  510. NSArray *events = [self.infoSyncTree applyServerOverwriteAtPath:path
  511. newData:newNode];
  512. [self.eventRaiser raiseEvents:events];
  513. }
  514. - (void)callOnComplete:(fbt_void_nserror_ref)onComplete
  515. withStatus:(NSString *)status
  516. errorReason:(NSString *)errorReason
  517. andPath:(FPath *)path {
  518. if (onComplete) {
  519. FIRDatabaseReference *ref =
  520. [[FIRDatabaseReference alloc] initWithRepo:self path:path];
  521. BOOL statusOk = [status isEqualToString:kFWPResponseForActionStatusOk];
  522. NSError *err = nil;
  523. if (!statusOk) {
  524. err = [FUtilities errorForStatus:status andReason:errorReason];
  525. }
  526. [self.eventRaiser raiseCallback:^{
  527. onComplete(err, ref);
  528. }];
  529. }
  530. }
  531. - (void)ackWrite:(NSInteger)writeId
  532. rerunTransactionsAtPath:(FPath *)path
  533. status:(NSString *)status {
  534. if ([status isEqualToString:kFErrorWriteCanceled]) {
  535. // This write was already removed, we just need to ignore it...
  536. } else {
  537. BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk];
  538. NSArray *clearEvents =
  539. [self.serverSyncTree ackUserWriteWithWriteId:writeId
  540. revert:!success
  541. persist:YES
  542. clock:self.serverClock];
  543. if ([clearEvents count] > 0) {
  544. [self rerunTransactionsForPath:path];
  545. }
  546. [self.eventRaiser raiseEvents:clearEvents];
  547. }
  548. }
  549. - (void)warnIfWriteFailedAtPath:(FPath *)path
  550. status:(NSString *)status
  551. message:(NSString *)message {
  552. if (!([status isEqualToString:kFWPResponseForActionStatusOk] ||
  553. [status isEqualToString:kFErrorWriteCanceled])) {
  554. FFWarn(@"I-RDB038012", @"%@ at %@ failed: %@", message, path, status);
  555. }
  556. }
  557. #pragma mark -
  558. #pragma mark FPersistentConnectionDelegate methods
  559. - (void)onDataUpdate:(FPersistentConnection *)fpconnection
  560. forPath:(NSString *)pathString
  561. message:(id)data
  562. isMerge:(BOOL)isMerge
  563. tagId:(NSNumber *)tagId {
  564. FFLog(@"I-RDB038013", @"onDataUpdateForPath: %@ withMessage: %@",
  565. pathString, data);
  566. // For testing.
  567. self.dataUpdateCount++;
  568. FPath *path = [[FPath alloc] initWith:pathString];
  569. data = self.interceptServerDataCallback
  570. ? self.interceptServerDataCallback(pathString, data)
  571. : data;
  572. NSArray *events = nil;
  573. if (tagId != nil) {
  574. if (isMerge) {
  575. NSDictionary *message = data;
  576. FCompoundWrite *taggedChildren =
  577. [FCompoundWrite compoundWriteWithValueDictionary:message];
  578. events =
  579. [self.serverSyncTree applyTaggedQueryMergeAtPath:path
  580. changedChildren:taggedChildren
  581. tagId:tagId];
  582. } else {
  583. id<FNode> taggedSnap = [FSnapshotUtilities nodeFrom:data];
  584. events =
  585. [self.serverSyncTree applyTaggedQueryOverwriteAtPath:path
  586. newData:taggedSnap
  587. tagId:tagId];
  588. }
  589. } else if (isMerge) {
  590. NSDictionary *message = data;
  591. FCompoundWrite *changedChildren =
  592. [FCompoundWrite compoundWriteWithValueDictionary:message];
  593. events = [self.serverSyncTree applyServerMergeAtPath:path
  594. changedChildren:changedChildren];
  595. } else {
  596. id<FNode> snap = [FSnapshotUtilities nodeFrom:data];
  597. events = [self.serverSyncTree applyServerOverwriteAtPath:path
  598. newData:snap];
  599. }
  600. if ([events count] > 0) {
  601. // Since we have a listener outstanding for each transaction, receiving
  602. // any events is a proxy for some change having occurred.
  603. [self rerunTransactionsForPath:path];
  604. }
  605. [self.eventRaiser raiseEvents:events];
  606. }
  607. - (void)onRangeMerge:(NSArray *)ranges
  608. forPath:(NSString *)pathString
  609. tagId:(NSNumber *)tag {
  610. FFLog(@"I-RDB038014", @"onRangeMerge: %@ => %@", pathString, ranges);
  611. // For testing
  612. self.rangeMergeUpdateCount++;
  613. FPath *path = [[FPath alloc] initWith:pathString];
  614. NSArray *events;
  615. if (tag != nil) {
  616. events = [self.serverSyncTree applyTaggedServerRangeMergeAtPath:path
  617. updates:ranges
  618. tagId:tag];
  619. } else {
  620. events = [self.serverSyncTree applyServerRangeMergeAtPath:path
  621. updates:ranges];
  622. }
  623. if (events.count > 0) {
  624. // Since we have a listener outstanding for each transaction, receiving
  625. // any events is a proxy for some change having occurred.
  626. [self rerunTransactionsForPath:path];
  627. }
  628. [self.eventRaiser raiseEvents:events];
  629. }
  630. - (void)onConnect:(FPersistentConnection *)fpconnection {
  631. [self updateInfo:kDotInfoConnected withValue:@YES];
  632. }
  633. - (void)onDisconnect:(FPersistentConnection *)fpconnection {
  634. [self updateInfo:kDotInfoConnected withValue:@NO];
  635. [self runOnDisconnectEvents];
  636. }
  637. - (void)onServerInfoUpdate:(FPersistentConnection *)fpconnection
  638. updates:(NSDictionary *)updates {
  639. for (NSString *key in updates) {
  640. id val = [updates objectForKey:key];
  641. [self updateInfo:key withValue:val];
  642. }
  643. }
  644. - (void)setupNotifications {
  645. NSString *const *backgroundConstant = (NSString *const *)dlsym(
  646. RTLD_DEFAULT, "UIApplicationDidEnterBackgroundNotification");
  647. if (backgroundConstant) {
  648. FFLog(@"I-RDB038015", @"Registering for background notification.");
  649. [[NSNotificationCenter defaultCenter]
  650. addObserver:self
  651. selector:@selector(didEnterBackground)
  652. name:*backgroundConstant
  653. object:nil];
  654. } else {
  655. FFLog(@"I-RDB038016",
  656. @"Skipped registering for background notification.");
  657. }
  658. }
  659. - (void)didEnterBackground {
  660. if (!self.config.persistenceEnabled)
  661. return;
  662. // Targetted compilation is ONLY for testing. UIKit is weak-linked in actual
  663. // release build.
  664. #if TARGET_OS_IOS || TARGET_OS_TV
  665. // The idea is to wait until any outstanding sets get written to disk. Since
  666. // the sets might still be in our dispatch queue, we wait for the dispatch
  667. // queue to catch up and for persistence to catch up. This may be
  668. // undesirable though. The dispatch queue might just be processing a bunch
  669. // of incoming data or something. We might want to keep track of whether
  670. // there are any unpersisted sets or something.
  671. FFLog(@"I-RDB038017",
  672. @"Entering background. Starting background task to finish work.");
  673. Class uiApplicationClass = NSClassFromString(@"UIApplication");
  674. assert(uiApplicationClass); // If we are here, we should be on iOS and
  675. // UIApplication should be available.
  676. UIApplication *application = [uiApplicationClass sharedApplication];
  677. __block UIBackgroundTaskIdentifier bgTask =
  678. [application beginBackgroundTaskWithExpirationHandler:^{
  679. [application endBackgroundTask:bgTask];
  680. }];
  681. NSDate *start = [NSDate date];
  682. dispatch_async([FIRDatabaseQuery sharedQueue], ^{
  683. NSTimeInterval finishTime = [start timeIntervalSinceNow] * -1;
  684. FFLog(@"I-RDB038018", @"Background task completed. Queue time: %f",
  685. finishTime);
  686. [application endBackgroundTask:bgTask];
  687. });
  688. #endif
  689. }
  690. #pragma mark -
  691. #pragma mark Internal methods
  692. /**
  693. * Applies all the changes stored up in the onDisconnect tree
  694. */
  695. - (void)runOnDisconnectEvents {
  696. FFLog(@"I-RDB038019", @"Running onDisconnectEvents");
  697. NSDictionary *serverValues =
  698. [FServerValues generateServerValues:self.serverClock];
  699. NSMutableArray *events = [[NSMutableArray alloc] init];
  700. [self.onDisconnect
  701. forEachTreeAtPath:[FPath empty]
  702. do:^(FPath *path, id<FNode> node) {
  703. id<FNode> existing = [self.serverSyncTree
  704. calcCompleteEventCacheAtPath:path
  705. excludeWriteIds:@[]];
  706. id<FNode> resolved = [FServerValues
  707. resolveDeferredValueSnapshot:node
  708. withExisting:existing
  709. serverValues:serverValues];
  710. [events addObjectsFromArray:
  711. [self.serverSyncTree
  712. applyServerOverwriteAtPath:path
  713. newData:resolved]];
  714. FPath *affectedPath =
  715. [self abortTransactionsAtPath:path
  716. error:kFTransactionSet];
  717. [self rerunTransactionsForPath:affectedPath];
  718. }];
  719. self.onDisconnect = [[FSparseSnapshotTree alloc] init];
  720. [self.eventRaiser raiseEvents:events];
  721. }
  722. - (NSDictionary *)dumpListens {
  723. return [self.connection dumpListens];
  724. }
  725. #pragma mark -
  726. #pragma mark Transactions
  727. /**
  728. * Setup the transaction data structures
  729. */
  730. - (void)initTransactions {
  731. self.transactionQueueTree = [[FTree alloc] init];
  732. self.hijackHash = NO;
  733. self.loggedTransactionPersistenceWarning = NO;
  734. }
  735. /**
  736. * Creates a new transaction, add its to the transactions we're tracking, and
  737. * sends it to the server if possible
  738. */
  739. - (void)startTransactionOnPath:(FPath *)path
  740. update:(fbt_transactionresult_mutabledata)update
  741. onComplete:(fbt_void_nserror_bool_datasnapshot)onComplete
  742. withLocalEvents:(BOOL)applyLocally {
  743. if (self.config.persistenceEnabled &&
  744. !self.loggedTransactionPersistenceWarning) {
  745. self.loggedTransactionPersistenceWarning = YES;
  746. FFInfo(@"I-RDB038020",
  747. @"runTransactionBlock: usage detected while persistence is "
  748. @"enabled. Please be aware that transactions "
  749. @"*will not* be persisted across app restarts. "
  750. @"See "
  751. @"https://www.firebase.com/docs/ios/guide/"
  752. @"offline-capabilities.html#section-handling-transactions-"
  753. @"offline for more details.");
  754. }
  755. FIRDatabaseReference *watchRef =
  756. [[FIRDatabaseReference alloc] initWithRepo:self path:path];
  757. // make sure we're listening on this node
  758. // Note: we can't do this asynchronously. To preserve event ordering, it has
  759. // to be done in this block. This is ok, this block is guaranteed to be our
  760. // own event loop
  761. NSUInteger handle = [[FUtilities LUIDGenerator] integerValue];
  762. fbt_void_datasnapshot cb = ^(FIRDataSnapshot *snapshot) {
  763. };
  764. FValueEventRegistration *registration =
  765. [[FValueEventRegistration alloc] initWithRepo:self
  766. handle:handle
  767. callback:cb
  768. cancelCallback:nil];
  769. [watchRef.repo addEventRegistration:registration
  770. forQuery:watchRef.querySpec];
  771. fbt_void_void unwatcher = ^{
  772. [watchRef removeObserverWithHandle:handle];
  773. };
  774. // Save all the data that represents this transaction
  775. FTupleTransaction *transaction = [[FTupleTransaction alloc] init];
  776. transaction.path = path;
  777. transaction.update = update;
  778. transaction.onComplete = onComplete;
  779. transaction.status = FTransactionInitializing;
  780. transaction.order = [FUtilities LUIDGenerator];
  781. transaction.applyLocally = applyLocally;
  782. transaction.retryCount = 0;
  783. transaction.unwatcher = unwatcher;
  784. transaction.currentWriteId = nil;
  785. transaction.currentInputSnapshot = nil;
  786. transaction.currentOutputSnapshotRaw = nil;
  787. transaction.currentOutputSnapshotResolved = nil;
  788. // Run transaction initially
  789. id<FNode> currentState = [self latestStateAtPath:path excludeWriteIds:nil];
  790. transaction.currentInputSnapshot = currentState;
  791. FIRMutableData *mutableCurrent =
  792. [[FIRMutableData alloc] initWithNode:currentState];
  793. FIRTransactionResult *result = transaction.update(mutableCurrent);
  794. if (!result.isSuccess) {
  795. // Abort the transaction
  796. transaction.unwatcher();
  797. transaction.currentOutputSnapshotRaw = nil;
  798. transaction.currentOutputSnapshotResolved = nil;
  799. if (transaction.onComplete) {
  800. FIRDatabaseReference *ref =
  801. [[FIRDatabaseReference alloc] initWithRepo:self
  802. path:transaction.path];
  803. FIndexedNode *indexedNode = [FIndexedNode
  804. indexedNodeWithNode:transaction.currentInputSnapshot];
  805. FIRDataSnapshot *snap =
  806. [[FIRDataSnapshot alloc] initWithRef:ref
  807. indexedNode:indexedNode];
  808. [self.eventRaiser raiseCallback:^{
  809. transaction.onComplete(nil, NO, snap);
  810. }];
  811. }
  812. } else {
  813. // Note: different from js. We don't need to validate, FIRMutableData
  814. // does validation. We also don't have to worry about priorities. Just
  815. // mark as run and add to queue.
  816. transaction.status = FTransactionRun;
  817. FTree *queueNode = [self.transactionQueueTree subTree:transaction.path];
  818. NSMutableArray *nodeQueue = [queueNode getValue];
  819. if (nodeQueue == nil) {
  820. nodeQueue = [[NSMutableArray alloc] init];
  821. }
  822. [nodeQueue addObject:transaction];
  823. [queueNode setValue:nodeQueue];
  824. // Update visibleData and raise events
  825. // Note: We intentionally raise events after updating all of our
  826. // transaction state, since the user could start new transactions from
  827. // the event callbacks
  828. NSDictionary *serverValues =
  829. [FServerValues generateServerValues:self.serverClock];
  830. id<FNode> newValUnresolved = [result.update nodeValue];
  831. id<FNode> newVal =
  832. [FServerValues resolveDeferredValueSnapshot:newValUnresolved
  833. withExisting:currentState
  834. serverValues:serverValues];
  835. transaction.currentOutputSnapshotRaw = newValUnresolved;
  836. transaction.currentOutputSnapshotResolved = newVal;
  837. transaction.currentWriteId =
  838. [NSNumber numberWithInteger:[self nextWriteId]];
  839. NSArray *events = [self.serverSyncTree
  840. applyUserOverwriteAtPath:path
  841. newData:newVal
  842. writeId:[transaction.currentWriteId integerValue]
  843. isVisible:transaction.applyLocally];
  844. [self.eventRaiser raiseEvents:events];
  845. [self sendAllReadyTransactions];
  846. }
  847. }
  848. /**
  849. * @param writeIdsToExclude A specific set to exclude
  850. */
  851. - (id<FNode>)latestStateAtPath:(FPath *)path
  852. excludeWriteIds:(NSArray *)writeIdsToExclude {
  853. id<FNode> latestState =
  854. [self.serverSyncTree calcCompleteEventCacheAtPath:path
  855. excludeWriteIds:writeIdsToExclude];
  856. return latestState ? latestState : [FEmptyNode emptyNode];
  857. }
  858. /**
  859. * Sends any already-run transactions that aren't waiting for outstanding
  860. * transactions to complete.
  861. *
  862. * Externally, call the version with no arguments.
  863. * Internally, calls itself recursively with a particular transactionQueueTree
  864. * node to recurse through the tree
  865. */
  866. - (void)sendAllReadyTransactions {
  867. FTree *node = self.transactionQueueTree;
  868. [self pruneCompletedTransactionsBelowNode:node];
  869. [self sendReadyTransactionsForTree:node];
  870. }
  871. - (void)sendReadyTransactionsForTree:(FTree *)node {
  872. NSMutableArray *queue = [node getValue];
  873. if (queue != nil) {
  874. queue = [self buildTransactionQueueAtNode:node];
  875. NSAssert([queue count] > 0, @"Sending zero length transaction queue");
  876. NSUInteger notRunIndex = [queue
  877. indexOfObjectPassingTest:^BOOL(id obj, NSUInteger idx, BOOL *stop) {
  878. return ((FTupleTransaction *)obj).status != FTransactionRun;
  879. }];
  880. // If they're all run (and not sent), we can send them. Else, we must
  881. // wait.
  882. if (notRunIndex == NSNotFound) {
  883. [self sendTransactionQueue:queue atPath:node.path];
  884. }
  885. } else if ([node hasChildren]) {
  886. [node forEachChild:^(FTree *child) {
  887. [self sendReadyTransactionsForTree:child];
  888. }];
  889. }
  890. }
  891. /**
  892. * Given a list of run transactions, send them to the server and then handle the
  893. * result (success or failure).
  894. */
  895. - (void)sendTransactionQueue:(NSMutableArray *)queue atPath:(FPath *)path {
  896. // Mark transactions as sent and bump the retry count
  897. NSMutableArray *writeIdsToExclude = [[NSMutableArray alloc] init];
  898. for (FTupleTransaction *transaction in queue) {
  899. [writeIdsToExclude addObject:transaction.currentWriteId];
  900. }
  901. id<FNode> latestState = [self latestStateAtPath:path
  902. excludeWriteIds:writeIdsToExclude];
  903. id<FNode> snapToSend = latestState;
  904. NSString *latestHash = [latestState dataHash];
  905. for (FTupleTransaction *transaction in queue) {
  906. NSAssert(
  907. transaction.status == FTransactionRun,
  908. @"[FRepo sendTransactionQueue:] items in queue should all be run.");
  909. FFLog(@"I-RDB038021", @"Transaction at %@ set to SENT",
  910. transaction.path);
  911. transaction.status = FTransactionSent;
  912. transaction.retryCount++;
  913. FPath *relativePath = [FPath relativePathFrom:path to:transaction.path];
  914. // If we've gotten to this point, the output snapshot must be defined.
  915. snapToSend =
  916. [snapToSend updateChild:relativePath
  917. withNewChild:transaction.currentOutputSnapshotRaw];
  918. }
  919. id dataToSend = [snapToSend valForExport:YES];
  920. NSString *pathToSend = [path description];
  921. latestHash = self.hijackHash ? @"badhash" : latestHash;
  922. // Send the put
  923. [self.connection
  924. putData:dataToSend
  925. forPath:pathToSend
  926. withHash:latestHash
  927. withCallback:^(NSString *status, NSString *errorReason) {
  928. FFLog(@"I-RDB038022", @"Transaction put response: %@ : %@",
  929. pathToSend, status);
  930. NSMutableArray *events = [[NSMutableArray alloc] init];
  931. if ([status isEqualToString:kFWPResponseForActionStatusOk]) {
  932. // Queue up the callbacks and fire them after cleaning up all of
  933. // our transaction state, since the callback could trigger more
  934. // transactions or sets.
  935. NSMutableArray *callbacks = [[NSMutableArray alloc] init];
  936. for (FTupleTransaction *transaction in queue) {
  937. transaction.status = FTransactionCompleted;
  938. [events addObjectsFromArray:
  939. [self.serverSyncTree
  940. ackUserWriteWithWriteId:
  941. [transaction.currentWriteId integerValue]
  942. revert:NO
  943. persist:NO
  944. clock:self.serverClock]];
  945. if (transaction.onComplete) {
  946. // We never unset the output snapshot, and given that this
  947. // transaction is complete, it should be set
  948. id<FNode> node =
  949. transaction.currentOutputSnapshotResolved;
  950. FIndexedNode *indexedNode =
  951. [FIndexedNode indexedNodeWithNode:node];
  952. FIRDatabaseReference *ref = [[FIRDatabaseReference alloc]
  953. initWithRepo:self
  954. path:transaction.path];
  955. FIRDataSnapshot *snapshot =
  956. [[FIRDataSnapshot alloc] initWithRef:ref
  957. indexedNode:indexedNode];
  958. fbt_void_void cb = ^{
  959. transaction.onComplete(nil, YES, snapshot);
  960. };
  961. [callbacks addObject:[cb copy]];
  962. }
  963. transaction.unwatcher();
  964. }
  965. // Now remove the completed transactions.
  966. [self
  967. pruneCompletedTransactionsBelowNode:[self.transactionQueueTree
  968. subTree:path]];
  969. // There may be pending transactions that we can now send.
  970. [self sendAllReadyTransactions];
  971. // Finally, trigger onComplete callbacks
  972. [self.eventRaiser raiseCallbacks:callbacks];
  973. } else {
  974. // transactions are no longer sent. Update their status
  975. // appropriately.
  976. if ([status
  977. isEqualToString:kFWPResponseForActionStatusDataStale]) {
  978. for (FTupleTransaction *transaction in queue) {
  979. if (transaction.status == FTransactionSentNeedsAbort) {
  980. transaction.status = FTransactionNeedsAbort;
  981. } else {
  982. transaction.status = FTransactionRun;
  983. }
  984. }
  985. } else {
  986. FFWarn(@"I-RDB038023",
  987. @"runTransactionBlock: at %@ failed: %@", path,
  988. status);
  989. for (FTupleTransaction *transaction in queue) {
  990. transaction.status = FTransactionNeedsAbort;
  991. [transaction setAbortStatus:status reason:errorReason];
  992. }
  993. }
  994. }
  995. [self rerunTransactionsForPath:path];
  996. [self.eventRaiser raiseEvents:events];
  997. }];
  998. }
  999. /**
  1000. * Finds all transactions dependent on the data at changed Path and reruns them.
  1001. *
  1002. * Should be called any time cached data changes.
  1003. *
  1004. * Return the highest path that was affected by rerunning transactions. This is
  1005. * the path at which events need to be raised for.
  1006. */
  1007. - (FPath *)rerunTransactionsForPath:(FPath *)changedPath {
  1008. // For the common case that there are no transactions going on, skip all
  1009. // this!
  1010. if ([self.transactionQueueTree isEmpty]) {
  1011. return changedPath;
  1012. } else {
  1013. FTree *rootMostTransactionNode =
  1014. [self getAncestorTransactionNodeForPath:changedPath];
  1015. FPath *path = rootMostTransactionNode.path;
  1016. NSArray *queue =
  1017. [self buildTransactionQueueAtNode:rootMostTransactionNode];
  1018. [self rerunTransactionQueue:queue atPath:path];
  1019. return path;
  1020. }
  1021. }
  1022. /**
  1023. * Does all the work of rerunning transactions (as well as cleans up aborted
  1024. * transactions and whatnot).
  1025. */
  1026. - (void)rerunTransactionQueue:(NSArray *)queue atPath:(FPath *)path {
  1027. if (queue.count == 0) {
  1028. return; // nothing to do
  1029. }
  1030. // Queue up the callbacks and fire them after cleaning up all of our
  1031. // transaction state, since the callback could trigger more transactions or
  1032. // sets.
  1033. NSMutableArray *events = [[NSMutableArray alloc] init];
  1034. NSMutableArray *callbacks = [[NSMutableArray alloc] init];
  1035. // Ignore, by default, all of the sets in this queue, since we're re-running
  1036. // all of them. However, we want to include the results of new sets
  1037. // triggered as part of this re-run, so we don't want to ignore a range,
  1038. // just these specific sets.
  1039. NSMutableArray *writeIdsToExclude = [[NSMutableArray alloc] init];
  1040. for (FTupleTransaction *transaction in queue) {
  1041. [writeIdsToExclude addObject:transaction.currentWriteId];
  1042. }
  1043. for (FTupleTransaction *transaction in queue) {
  1044. FPath *relativePath __unused =
  1045. [FPath relativePathFrom:path to:transaction.path];
  1046. BOOL abortTransaction = NO;
  1047. NSAssert(relativePath != nil, @"[FRepo rerunTransactionsQueue:] "
  1048. @"relativePath should not be null.");
  1049. if (transaction.status == FTransactionNeedsAbort) {
  1050. abortTransaction = YES;
  1051. if (![transaction.abortStatus
  1052. isEqualToString:kFErrorWriteCanceled]) {
  1053. NSArray *ackEvents = [self.serverSyncTree
  1054. ackUserWriteWithWriteId:[transaction.currentWriteId
  1055. integerValue]
  1056. revert:YES
  1057. persist:NO
  1058. clock:self.serverClock];
  1059. [events addObjectsFromArray:ackEvents];
  1060. }
  1061. } else if (transaction.status == FTransactionRun) {
  1062. if (transaction.retryCount >= kFTransactionMaxRetries) {
  1063. abortTransaction = YES;
  1064. [transaction setAbortStatus:kFTransactionTooManyRetries
  1065. reason:nil];
  1066. [events
  1067. addObjectsFromArray:
  1068. [self.serverSyncTree
  1069. ackUserWriteWithWriteId:[transaction.currentWriteId
  1070. integerValue]
  1071. revert:YES
  1072. persist:NO
  1073. clock:self.serverClock]];
  1074. } else {
  1075. // This code reruns a transaction
  1076. id<FNode> currentNode =
  1077. [self latestStateAtPath:transaction.path
  1078. excludeWriteIds:writeIdsToExclude];
  1079. transaction.currentInputSnapshot = currentNode;
  1080. FIRMutableData *mutableCurrent =
  1081. [[FIRMutableData alloc] initWithNode:currentNode];
  1082. FIRTransactionResult *result =
  1083. transaction.update(mutableCurrent);
  1084. if (result.isSuccess) {
  1085. NSNumber *oldWriteId = transaction.currentWriteId;
  1086. NSDictionary *serverValues =
  1087. [FServerValues generateServerValues:self.serverClock];
  1088. id<FNode> newVal = [result.update nodeValue];
  1089. id<FNode> newValResolved = [FServerValues
  1090. resolveDeferredValueSnapshot:newVal
  1091. withExisting:transaction
  1092. .currentInputSnapshot
  1093. serverValues:serverValues];
  1094. transaction.currentOutputSnapshotRaw = newVal;
  1095. transaction.currentOutputSnapshotResolved = newValResolved;
  1096. transaction.currentWriteId =
  1097. [NSNumber numberWithInteger:[self nextWriteId]];
  1098. // Mutates writeIdsToExclude in place
  1099. [writeIdsToExclude removeObject:oldWriteId];
  1100. [events
  1101. addObjectsFromArray:
  1102. [self.serverSyncTree
  1103. applyUserOverwriteAtPath:transaction.path
  1104. newData:
  1105. transaction
  1106. .currentOutputSnapshotResolved
  1107. writeId:
  1108. [transaction.currentWriteId
  1109. integerValue]
  1110. isVisible:transaction
  1111. .applyLocally]];
  1112. [events addObjectsFromArray:
  1113. [self.serverSyncTree
  1114. ackUserWriteWithWriteId:[oldWriteId
  1115. integerValue]
  1116. revert:YES
  1117. persist:NO
  1118. clock:self.serverClock]];
  1119. } else {
  1120. abortTransaction = YES;
  1121. // The user aborted the transaction. JS treats ths as a
  1122. // "nodata" abort, but it's not an error, so we don't send
  1123. // them an error.
  1124. [transaction setAbortStatus:nil reason:nil];
  1125. [events
  1126. addObjectsFromArray:
  1127. [self.serverSyncTree
  1128. ackUserWriteWithWriteId:
  1129. [transaction.currentWriteId integerValue]
  1130. revert:YES
  1131. persist:NO
  1132. clock:self.serverClock]];
  1133. }
  1134. }
  1135. }
  1136. [self.eventRaiser raiseEvents:events];
  1137. events = nil;
  1138. if (abortTransaction) {
  1139. // Abort
  1140. transaction.status = FTransactionCompleted;
  1141. transaction.unwatcher();
  1142. if (transaction.onComplete) {
  1143. FIRDatabaseReference *ref = [[FIRDatabaseReference alloc]
  1144. initWithRepo:self
  1145. path:transaction.path];
  1146. FIndexedNode *lastInput = [FIndexedNode
  1147. indexedNodeWithNode:transaction.currentInputSnapshot];
  1148. FIRDataSnapshot *snap =
  1149. [[FIRDataSnapshot alloc] initWithRef:ref
  1150. indexedNode:lastInput];
  1151. fbt_void_void cb = ^{
  1152. // Unlike JS, no need to check for "nodata" because ObjC has
  1153. // abortError = nil
  1154. transaction.onComplete(transaction.abortError, NO, snap);
  1155. };
  1156. [callbacks addObject:[cb copy]];
  1157. }
  1158. }
  1159. }
  1160. // Note: unlike current js client, we don't need to preserve priority. Users
  1161. // can set priority via FIRMutableData
  1162. // Clean up completed transactions.
  1163. [self pruneCompletedTransactionsBelowNode:self.transactionQueueTree];
  1164. // Now fire callbacks, now that we're in a good, known state.
  1165. [self.eventRaiser raiseCallbacks:callbacks];
  1166. // Try to send the transaction result to the server
  1167. [self sendAllReadyTransactions];
  1168. }
  1169. - (FTree *)getAncestorTransactionNodeForPath:(FPath *)path {
  1170. FTree *transactionNode = self.transactionQueueTree;
  1171. while (![path isEmpty] && [transactionNode getValue] == nil) {
  1172. NSString *front = [path getFront];
  1173. transactionNode =
  1174. [transactionNode subTree:[[FPath alloc] initWith:front]];
  1175. path = [path popFront];
  1176. }
  1177. return transactionNode;
  1178. }
  1179. - (NSMutableArray *)buildTransactionQueueAtNode:(FTree *)node {
  1180. NSMutableArray *queue = [[NSMutableArray alloc] init];
  1181. [self aggregateTransactionQueuesForNode:node andQueue:queue];
  1182. [queue sortUsingComparator:^NSComparisonResult(FTupleTransaction *obj1,
  1183. FTupleTransaction *obj2) {
  1184. return [obj1.order compare:obj2.order];
  1185. }];
  1186. return queue;
  1187. }
  1188. - (void)aggregateTransactionQueuesForNode:(FTree *)node
  1189. andQueue:(NSMutableArray *)queue {
  1190. NSArray *nodeQueue = [node getValue];
  1191. [queue addObjectsFromArray:nodeQueue];
  1192. [node forEachChild:^(FTree *child) {
  1193. [self aggregateTransactionQueuesForNode:child andQueue:queue];
  1194. }];
  1195. }
  1196. /**
  1197. * Remove COMPLETED transactions at or below this node in the
  1198. * transactionQueueTree
  1199. */
  1200. - (void)pruneCompletedTransactionsBelowNode:(FTree *)node {
  1201. NSMutableArray *queue = [node getValue];
  1202. if (queue != nil) {
  1203. int i = 0;
  1204. // remove all of the completed transactions from the queue
  1205. while (i < queue.count) {
  1206. FTupleTransaction *transaction = [queue objectAtIndex:i];
  1207. if (transaction.status == FTransactionCompleted) {
  1208. [queue removeObjectAtIndex:i];
  1209. } else {
  1210. i++;
  1211. }
  1212. }
  1213. if (queue.count > 0) {
  1214. [node setValue:queue];
  1215. } else {
  1216. [node setValue:nil];
  1217. }
  1218. }
  1219. [node forEachChildMutationSafe:^(FTree *child) {
  1220. [self pruneCompletedTransactionsBelowNode:child];
  1221. }];
  1222. }
  1223. /**
  1224. * Aborts all transactions on ancestors or descendants of the specified path.
  1225. * Called when doing a setValue: or updateChildValues: since we consider them
  1226. * incompatible with transactions
  1227. *
  1228. * @param path path for which we want to abort related transactions.
  1229. */
  1230. - (FPath *)abortTransactionsAtPath:(FPath *)path error:(NSString *)error {
  1231. // For the common case that there are no transactions going on, skip all
  1232. // this!
  1233. if ([self.transactionQueueTree isEmpty]) {
  1234. return path;
  1235. } else {
  1236. FPath *affectedPath =
  1237. [self getAncestorTransactionNodeForPath:path].path;
  1238. FTree *transactionNode = [self.transactionQueueTree subTree:path];
  1239. [transactionNode forEachAncestor:^BOOL(FTree *ancestor) {
  1240. [self abortTransactionsAtNode:ancestor error:error];
  1241. return NO;
  1242. }];
  1243. [self abortTransactionsAtNode:transactionNode error:error];
  1244. [transactionNode forEachDescendant:^(FTree *child) {
  1245. [self abortTransactionsAtNode:child error:error];
  1246. }];
  1247. return affectedPath;
  1248. }
  1249. }
  1250. /**
  1251. * Abort transactions stored in this transactions queue node.
  1252. *
  1253. * @param node Node to abort transactions for.
  1254. */
  1255. - (void)abortTransactionsAtNode:(FTree *)node error:(NSString *)error {
  1256. NSMutableArray *queue = [node getValue];
  1257. if (queue != nil) {
  1258. // Queue up the callbacks and fire them after cleaning up all of our
  1259. // transaction state, since can be immediately aborted and removed.
  1260. NSMutableArray *callbacks = [[NSMutableArray alloc] init];
  1261. // Go through queue. Any already-sent transactions must be marked for
  1262. // abort, while the unsent ones can be immediately aborted and removed
  1263. NSMutableArray *events = [[NSMutableArray alloc] init];
  1264. int lastSent = -1;
  1265. // Note: all of the sent transactions will be at the front of the queue,
  1266. // so safe to increment lastSent
  1267. for (FTupleTransaction *transaction in queue) {
  1268. if (transaction.status == FTransactionSentNeedsAbort) {
  1269. // No-op. already marked.
  1270. } else if (transaction.status == FTransactionSent) {
  1271. // Mark this transaction for abort when it returns
  1272. lastSent++;
  1273. transaction.status = FTransactionSentNeedsAbort;
  1274. [transaction setAbortStatus:error reason:nil];
  1275. } else {
  1276. // we can abort this immediately
  1277. transaction.unwatcher();
  1278. if ([error isEqualToString:kFTransactionSet]) {
  1279. [events
  1280. addObjectsFromArray:
  1281. [self.serverSyncTree
  1282. ackUserWriteWithWriteId:
  1283. [transaction.currentWriteId integerValue]
  1284. revert:YES
  1285. persist:NO
  1286. clock:self.serverClock]];
  1287. } else {
  1288. // If it was cancelled it was already removed from the sync
  1289. // tree, no need to ack
  1290. NSAssert([error isEqualToString:kFErrorWriteCanceled], nil);
  1291. }
  1292. if (transaction.onComplete) {
  1293. NSError *abortReason = [FUtilities errorForStatus:error
  1294. andReason:nil];
  1295. FIRDataSnapshot *snapshot = nil;
  1296. fbt_void_void cb = ^{
  1297. transaction.onComplete(abortReason, NO, snapshot);
  1298. };
  1299. [callbacks addObject:[cb copy]];
  1300. }
  1301. }
  1302. }
  1303. if (lastSent == -1) {
  1304. // We're not waiting for any sent transactions. We can clear the
  1305. // queue.
  1306. [node setValue:nil];
  1307. } else {
  1308. // Remove the transactions we aborted
  1309. NSRange theRange;
  1310. theRange.location = lastSent + 1;
  1311. theRange.length = queue.count - theRange.location;
  1312. [queue removeObjectsInRange:theRange];
  1313. }
  1314. // Now fire the callbacks
  1315. [self.eventRaiser raiseEvents:events];
  1316. [self.eventRaiser raiseCallbacks:callbacks];
  1317. }
  1318. }
  1319. @end