FRepo.m 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <Foundation/Foundation.h>
  17. #import "FAtomicNumber.h"
  18. #import "FCachePolicy.h"
  19. #import "FClock.h"
  20. #import "FConstants.h"
  21. #import "FEmptyNode.h"
  22. #import "FEventRaiser.h"
  23. #import "FEventRegistration.h"
  24. #import "FIRDataSnapshot.h"
  25. #import "FIRDataSnapshot_Private.h"
  26. #import "FIRDatabaseConfig_Private.h"
  27. #import "FIRDatabaseQuery_Private.h"
  28. #import "FIRDatabase_Private.h"
  29. #import "FIRMutableData.h"
  30. #import "FIRMutableData_Private.h"
  31. #import "FIRTransactionResult.h"
  32. #import "FIRTransactionResult_Private.h"
  33. #import "FLevelDBStorageEngine.h"
  34. #import "FListenProvider.h"
  35. #import "FPersistenceManager.h"
  36. #import "FQuerySpec.h"
  37. #import "FRepo.h"
  38. #import "FRepoManager.h"
  39. #import "FRepo_Private.h"
  40. #import "FServerValues.h"
  41. #import "FSnapshotHolder.h"
  42. #import "FSnapshotUtilities.h"
  43. #import "FSyncTree.h"
  44. #import "FTree.h"
  45. #import "FTupleNodePath.h"
  46. #import "FTupleSetIdPath.h"
  47. #import "FTupleTransaction.h"
  48. #import "FValueEventRegistration.h"
  49. #import "FWriteRecord.h"
  50. #import <FirebaseCore/FIRLogger.h>
  51. #import <dlfcn.h>
  52. #if TARGET_OS_IOS || TARGET_OS_TV
  53. #import <UIKit/UIKit.h>
  54. #endif
  55. @interface FRepo ()
  56. @property(nonatomic, strong) FOffsetClock *serverClock;
  57. @property(nonatomic, strong) FPersistenceManager *persistenceManager;
  58. @property(nonatomic, strong) FIRDatabase *database;
  59. @property(nonatomic, strong, readwrite) FAuthenticationManager *auth;
  60. @property(nonatomic, strong) FSyncTree *infoSyncTree;
  61. @property(nonatomic) NSInteger writeIdCounter;
  62. @property(nonatomic) BOOL hijackHash;
  63. @property(nonatomic, strong) FTree *transactionQueueTree;
  64. @property(nonatomic) BOOL loggedTransactionPersistenceWarning;
  65. /**
  66. * Test only. For load testing the server.
  67. */
  68. @property(nonatomic, strong) id (^interceptServerDataCallback)
  69. (NSString *pathString, id data);
  70. @end
  71. @implementation FRepo
  72. - (id)initWithRepoInfo:(FRepoInfo *)info
  73. config:(FIRDatabaseConfig *)config
  74. database:(FIRDatabase *)database {
  75. self = [super init];
  76. if (self) {
  77. self.repoInfo = info;
  78. self.config = config;
  79. self.database = database;
  80. // Access can occur outside of shared queue, so the clock needs to be
  81. // initialized here
  82. self.serverClock =
  83. [[FOffsetClock alloc] initWithClock:[FSystemClock clock] offset:0];
  84. self.connection = [[FPersistentConnection alloc]
  85. initWithRepoInfo:self.repoInfo
  86. dispatchQueue:[FIRDatabaseQuery sharedQueue]
  87. config:self.config];
  88. // Needs to be called before authentication manager is instantiated
  89. self.eventRaiser =
  90. [[FEventRaiser alloc] initWithQueue:self.config.callbackQueue];
  91. dispatch_async([FIRDatabaseQuery sharedQueue], ^{
  92. [self deferredInit];
  93. });
  94. }
  95. return self;
  96. }
  97. - (void)deferredInit {
  98. // TODO: cleanup on dealloc
  99. __weak FRepo *weakSelf = self;
  100. [self.config.authTokenProvider listenForTokenChanges:^(NSString *token) {
  101. [weakSelf.connection refreshAuthToken:token];
  102. }];
  103. // Open connection now so that by the time we are connected the deferred
  104. // init has run This relies on the fact that all callbacks run on repos
  105. // queue
  106. self.connection.delegate = self;
  107. [self.connection open];
  108. self.dataUpdateCount = 0;
  109. self.rangeMergeUpdateCount = 0;
  110. self.interceptServerDataCallback = nil;
  111. if (self.config.persistenceEnabled) {
  112. NSString *repoHashString =
  113. [NSString stringWithFormat:@"%@_%@", self.repoInfo.host,
  114. self.repoInfo.namespace];
  115. NSString *persistencePrefix =
  116. [NSString stringWithFormat:@"%@/%@", self.config.sessionIdentifier,
  117. repoHashString];
  118. id<FCachePolicy> cachePolicy = [[FLRUCachePolicy alloc]
  119. initWithMaxSize:self.config.persistenceCacheSizeBytes];
  120. id<FStorageEngine> engine;
  121. if (self.config.forceStorageEngine != nil) {
  122. engine = self.config.forceStorageEngine;
  123. } else {
  124. FLevelDBStorageEngine *levelDBEngine =
  125. [[FLevelDBStorageEngine alloc] initWithPath:persistencePrefix];
  126. // We need the repo info to run the legacy migration. Future
  127. // migrations will be managed by the database itself Remove this
  128. // once we are confident that no-one is using legacy migration
  129. // anymore...
  130. [levelDBEngine runLegacyMigration:self.repoInfo];
  131. engine = levelDBEngine;
  132. }
  133. self.persistenceManager =
  134. [[FPersistenceManager alloc] initWithStorageEngine:engine
  135. cachePolicy:cachePolicy];
  136. } else {
  137. self.persistenceManager = nil;
  138. }
  139. [self initTransactions];
  140. // A list of data pieces and paths to be set when this client disconnects
  141. self.onDisconnect = [[FSparseSnapshotTree alloc] init];
  142. self.infoData = [[FSnapshotHolder alloc] init];
  143. FListenProvider *infoListenProvider = [[FListenProvider alloc] init];
  144. infoListenProvider.startListening =
  145. ^(FQuerySpec *query, NSNumber *tagId, id<FSyncTreeHash> hash,
  146. fbt_nsarray_nsstring onComplete) {
  147. NSArray *infoEvents = @[];
  148. FRepo *strongSelf = weakSelf;
  149. id<FNode> node = [strongSelf.infoData getNode:query.path];
  150. // This is possibly a hack, but we have different semantics for .info
  151. // endpoints. We don't raise null events on initial data...
  152. if (![node isEmpty]) {
  153. infoEvents =
  154. [strongSelf.infoSyncTree applyServerOverwriteAtPath:query.path
  155. newData:node];
  156. [strongSelf.eventRaiser raiseCallback:^{
  157. onComplete(kFWPResponseForActionStatusOk);
  158. }];
  159. }
  160. return infoEvents;
  161. };
  162. infoListenProvider.stopListening = ^(FQuerySpec *query, NSNumber *tagId) {
  163. };
  164. self.infoSyncTree =
  165. [[FSyncTree alloc] initWithListenProvider:infoListenProvider];
  166. FListenProvider *serverListenProvider = [[FListenProvider alloc] init];
  167. serverListenProvider.startListening =
  168. ^(FQuerySpec *query, NSNumber *tagId, id<FSyncTreeHash> hash,
  169. fbt_nsarray_nsstring onComplete) {
  170. [weakSelf.connection listen:query
  171. tagId:tagId
  172. hash:hash
  173. onComplete:^(NSString *status) {
  174. NSArray *events = onComplete(status);
  175. [weakSelf.eventRaiser raiseEvents:events];
  176. }];
  177. // No synchronous events for network-backed sync trees
  178. return @[];
  179. };
  180. serverListenProvider.stopListening = ^(FQuerySpec *query, NSNumber *tag) {
  181. [weakSelf.connection unlisten:query tagId:tag];
  182. };
  183. self.serverSyncTree =
  184. [[FSyncTree alloc] initWithPersistenceManager:self.persistenceManager
  185. listenProvider:serverListenProvider];
  186. [self restoreWrites];
  187. [self updateInfo:kDotInfoConnected withValue:@NO];
  188. [self setupNotifications];
  189. }
  190. - (void)restoreWrites {
  191. NSArray *writes = self.persistenceManager.userWrites;
  192. NSDictionary *serverValues =
  193. [FServerValues generateServerValues:self.serverClock];
  194. __block NSInteger lastWriteId = NSIntegerMin;
  195. [writes enumerateObjectsUsingBlock:^(FWriteRecord *write, NSUInteger idx,
  196. BOOL *stop) {
  197. NSInteger writeId = write.writeId;
  198. fbt_void_nsstring_nsstring callback =
  199. ^(NSString *status, NSString *errorReason) {
  200. [self warnIfWriteFailedAtPath:write.path
  201. status:status
  202. message:@"Persisted write"];
  203. [self ackWrite:writeId
  204. rerunTransactionsAtPath:write.path
  205. status:status];
  206. };
  207. if (lastWriteId >= writeId) {
  208. [NSException raise:NSInternalInconsistencyException
  209. format:@"Restored writes were not in order!"];
  210. }
  211. lastWriteId = writeId;
  212. self.writeIdCounter = writeId + 1;
  213. id<FNode> existing =
  214. [self.serverSyncTree calcCompleteEventCacheAtPath:write.path
  215. excludeWriteIds:@[]];
  216. if ([write isOverwrite]) {
  217. FFLog(@"I-RDB038001", @"Restoring overwrite with id %ld",
  218. (long)write.writeId);
  219. [self.connection putData:[write.overwrite valForExport:YES]
  220. forPath:[write.path toString]
  221. withHash:nil
  222. withCallback:callback];
  223. id<FNode> resolved =
  224. [FServerValues resolveDeferredValueSnapshot:write.overwrite
  225. withExisting:existing
  226. serverValues:serverValues];
  227. [self.serverSyncTree applyUserOverwriteAtPath:write.path
  228. newData:resolved
  229. writeId:writeId
  230. isVisible:YES];
  231. } else {
  232. FFLog(@"I-RDB038002", @"Restoring merge with id %ld",
  233. (long)write.writeId);
  234. [self.connection mergeData:[write.merge valForExport:YES]
  235. forPath:[write.path toString]
  236. withCallback:callback];
  237. FCompoundWrite *resolved =
  238. [FServerValues resolveDeferredValueCompoundWrite:write.merge
  239. withExisting:existing
  240. serverValues:serverValues];
  241. [self.serverSyncTree applyUserMergeAtPath:write.path
  242. changedChildren:resolved
  243. writeId:writeId];
  244. }
  245. }];
  246. }
  247. - (NSString *)name {
  248. return self.repoInfo.namespace;
  249. }
  250. - (NSString *)description {
  251. return [self.repoInfo description];
  252. }
  253. - (void)interrupt {
  254. [self.connection interruptForReason:kFInterruptReasonRepoInterrupt];
  255. }
  256. - (void)resume {
  257. [self.connection resumeForReason:kFInterruptReasonRepoInterrupt];
  258. }
  259. // NOTE: Typically if you're calling this, you should be in an @autoreleasepool
  260. // block to make sure that ARC kicks in and cleans up things no longer
  261. // referenced (i.e. pendingPutsDB).
  262. - (void)dispose {
  263. [self.connection interruptForReason:kFInterruptReasonRepoInterrupt];
  264. // We need to nil out any references to LevelDB, to make sure the
  265. // LevelDB exclusive locks are released.
  266. [self.persistenceManager close];
  267. }
  268. - (NSInteger)nextWriteId {
  269. return self->_writeIdCounter++;
  270. }
  271. - (NSTimeInterval)serverTime {
  272. return [self.serverClock currentTime];
  273. }
  274. - (void)set:(FPath *)path
  275. withNode:(id<FNode>)node
  276. withCallback:(fbt_void_nserror_ref)onComplete {
  277. id value = [node valForExport:YES];
  278. FFLog(@"I-RDB038003", @"Setting: %@ with %@ pri: %@", [path toString],
  279. [value description], [[node getPriority] val]);
  280. // TODO: Optimize this behavior to either (a) store flag to skip resolving
  281. // where possible and / or (b) store unresolved paths on JSON parse
  282. NSDictionary *serverValues =
  283. [FServerValues generateServerValues:self.serverClock];
  284. id<FNode> existing = [self.serverSyncTree calcCompleteEventCacheAtPath:path
  285. excludeWriteIds:@[]];
  286. id<FNode> newNode =
  287. [FServerValues resolveDeferredValueSnapshot:node
  288. withExisting:existing
  289. serverValues:serverValues];
  290. NSInteger writeId = [self nextWriteId];
  291. [self.persistenceManager saveUserOverwrite:node
  292. atPath:path
  293. writeId:writeId];
  294. NSArray *events = [self.serverSyncTree applyUserOverwriteAtPath:path
  295. newData:newNode
  296. writeId:writeId
  297. isVisible:YES];
  298. [self.eventRaiser raiseEvents:events];
  299. [self.connection putData:value
  300. forPath:[path toString]
  301. withHash:nil
  302. withCallback:^(NSString *status, NSString *errorReason) {
  303. [self warnIfWriteFailedAtPath:path
  304. status:status
  305. message:@"setValue: or removeValue:"];
  306. [self ackWrite:writeId
  307. rerunTransactionsAtPath:path
  308. status:status];
  309. [self callOnComplete:onComplete
  310. withStatus:status
  311. errorReason:errorReason
  312. andPath:path];
  313. }];
  314. FPath *affectedPath = [self abortTransactionsAtPath:path
  315. error:kFTransactionSet];
  316. [self rerunTransactionsForPath:affectedPath];
  317. }
  318. - (void)update:(FPath *)path
  319. withNodes:(FCompoundWrite *)nodes
  320. withCallback:(fbt_void_nserror_ref)callback {
  321. NSDictionary *values = [nodes valForExport:YES];
  322. FFLog(@"I-RDB038004", @"Updating: %@ with %@", [path toString],
  323. [values description]);
  324. NSDictionary *serverValues =
  325. [FServerValues generateServerValues:self.serverClock];
  326. id<FNode> existing = [self.serverSyncTree calcCompleteEventCacheAtPath:path
  327. excludeWriteIds:@[]];
  328. FCompoundWrite *resolved =
  329. [FServerValues resolveDeferredValueCompoundWrite:nodes
  330. withExisting:existing
  331. serverValues:serverValues];
  332. if (!resolved.isEmpty) {
  333. NSInteger writeId = [self nextWriteId];
  334. [self.persistenceManager saveUserMerge:nodes
  335. atPath:path
  336. writeId:writeId];
  337. NSArray *events = [self.serverSyncTree applyUserMergeAtPath:path
  338. changedChildren:resolved
  339. writeId:writeId];
  340. [self.eventRaiser raiseEvents:events];
  341. [self.connection mergeData:values
  342. forPath:[path description]
  343. withCallback:^(NSString *status, NSString *errorReason) {
  344. [self warnIfWriteFailedAtPath:path
  345. status:status
  346. message:@"updateChildValues:"];
  347. [self ackWrite:writeId
  348. rerunTransactionsAtPath:path
  349. status:status];
  350. [self callOnComplete:callback
  351. withStatus:status
  352. errorReason:errorReason
  353. andPath:path];
  354. }];
  355. [nodes enumerateWrites:^(FPath *childPath, id<FNode> node, BOOL *stop) {
  356. FPath *pathFromRoot = [path child:childPath];
  357. FFLog(@"I-RDB038005", @"Cancelling transactions at path: %@",
  358. pathFromRoot);
  359. FPath *affectedPath = [self abortTransactionsAtPath:pathFromRoot
  360. error:kFTransactionSet];
  361. [self rerunTransactionsForPath:affectedPath];
  362. }];
  363. } else {
  364. FFLog(@"I-RDB038006", @"update called with empty data. Doing nothing");
  365. // Do nothing, just call the callback
  366. [self callOnComplete:callback
  367. withStatus:@"ok"
  368. errorReason:nil
  369. andPath:path];
  370. }
  371. }
  372. - (void)onDisconnectCancel:(FPath *)path
  373. withCallback:(fbt_void_nserror_ref)callback {
  374. [self.connection
  375. onDisconnectCancelPath:path
  376. withCallback:^(NSString *status, NSString *errorReason) {
  377. BOOL success =
  378. [status isEqualToString:kFWPResponseForActionStatusOk];
  379. if (success) {
  380. [self.onDisconnect forgetPath:path];
  381. } else {
  382. FFLog(@"I-RDB038007",
  383. @"cancelDisconnectOperations: at %@ failed: %@",
  384. path, status);
  385. }
  386. [self callOnComplete:callback
  387. withStatus:status
  388. errorReason:errorReason
  389. andPath:path];
  390. }];
  391. }
  392. - (void)onDisconnectSet:(FPath *)path
  393. withNode:(id<FNode>)node
  394. withCallback:(fbt_void_nserror_ref)callback {
  395. [self.connection
  396. onDisconnectPutData:[node valForExport:YES]
  397. forPath:path
  398. withCallback:^(NSString *status, NSString *errorReason) {
  399. BOOL success =
  400. [status isEqualToString:kFWPResponseForActionStatusOk];
  401. if (success) {
  402. [self.onDisconnect rememberData:node onPath:path];
  403. } else {
  404. FFWarn(@"I-RDB038008",
  405. @"onDisconnectSetValue: or "
  406. @"onDisconnectRemoveValue: at %@ failed: %@",
  407. path, status);
  408. }
  409. [self callOnComplete:callback
  410. withStatus:status
  411. errorReason:errorReason
  412. andPath:path];
  413. }];
  414. }
  415. - (void)onDisconnectUpdate:(FPath *)path
  416. withNodes:(FCompoundWrite *)nodes
  417. withCallback:(fbt_void_nserror_ref)callback {
  418. if (!nodes.isEmpty) {
  419. NSDictionary *values = [nodes valForExport:YES];
  420. [self.connection
  421. onDisconnectMergeData:values
  422. forPath:path
  423. withCallback:^(NSString *status, NSString *errorReason) {
  424. BOOL success = [status
  425. isEqualToString:kFWPResponseForActionStatusOk];
  426. if (success) {
  427. [nodes enumerateWrites:^(FPath *relativePath,
  428. id<FNode> nodeUnresolved,
  429. BOOL *stop) {
  430. FPath *childPath = [path child:relativePath];
  431. [self.onDisconnect rememberData:nodeUnresolved
  432. onPath:childPath];
  433. }];
  434. } else {
  435. FFWarn(@"I-RDB038009",
  436. @"onDisconnectUpdateChildValues: at %@ "
  437. @"failed %@",
  438. path, status);
  439. }
  440. [self callOnComplete:callback
  441. withStatus:status
  442. errorReason:errorReason
  443. andPath:path];
  444. }];
  445. } else {
  446. // Do nothing, just call the callback
  447. [self callOnComplete:callback
  448. withStatus:@"ok"
  449. errorReason:nil
  450. andPath:path];
  451. }
  452. }
  453. - (void)purgeOutstandingWrites {
  454. FFLog(@"I-RDB038010", @"Purging outstanding writes");
  455. NSArray *events = [self.serverSyncTree removeAllWrites];
  456. [self.eventRaiser raiseEvents:events];
  457. // Abort any transactions
  458. [self abortTransactionsAtPath:[FPath empty] error:kFErrorWriteCanceled];
  459. // Remove outstanding writes from connection
  460. [self.connection purgeOutstandingWrites];
  461. }
  462. - (void)addEventRegistration:(id<FEventRegistration>)eventRegistration
  463. forQuery:(FQuerySpec *)query {
  464. NSArray *events = nil;
  465. if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) {
  466. events = [self.infoSyncTree addEventRegistration:eventRegistration
  467. forQuery:query];
  468. } else {
  469. events = [self.serverSyncTree addEventRegistration:eventRegistration
  470. forQuery:query];
  471. }
  472. [self.eventRaiser raiseEvents:events];
  473. }
  474. - (void)removeEventRegistration:(id<FEventRegistration>)eventRegistration
  475. forQuery:(FQuerySpec *)query {
  476. // These are guaranteed not to raise events, since we're not passing in a
  477. // cancelError. However we can future-proof a little bit by handling the
  478. // return values anyways.
  479. FFLog(@"I-RDB038011", @"Removing event registration with hande: %lu",
  480. (unsigned long)eventRegistration.handle);
  481. NSArray *events = nil;
  482. if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) {
  483. events = [self.infoSyncTree removeEventRegistration:eventRegistration
  484. forQuery:query
  485. cancelError:nil];
  486. } else {
  487. events = [self.serverSyncTree removeEventRegistration:eventRegistration
  488. forQuery:query
  489. cancelError:nil];
  490. }
  491. [self.eventRaiser raiseEvents:events];
  492. }
  493. - (void)keepQuery:(FQuerySpec *)query synced:(BOOL)synced {
  494. NSAssert(![[query.path getFront] isEqualToString:kDotInfoPrefix],
  495. @"Can't keep .info tree synced!");
  496. [self.serverSyncTree keepQuery:query synced:synced];
  497. }
  498. - (void)updateInfo:(NSString *)pathString withValue:(id)value {
  499. // hack to make serverTimeOffset available in a threadsafe way. Property is
  500. // marked as atomic
  501. if ([pathString isEqualToString:kDotInfoServerTimeOffset]) {
  502. NSTimeInterval offset = [(NSNumber *)value doubleValue] / 1000.0;
  503. self.serverClock =
  504. [[FOffsetClock alloc] initWithClock:[FSystemClock clock]
  505. offset:offset];
  506. }
  507. FPath *path = [[FPath alloc]
  508. initWith:[NSString
  509. stringWithFormat:@"%@/%@", kDotInfoPrefix, pathString]];
  510. id<FNode> newNode = [FSnapshotUtilities nodeFrom:value];
  511. [self.infoData updateSnapshot:path withNewSnapshot:newNode];
  512. NSArray *events = [self.infoSyncTree applyServerOverwriteAtPath:path
  513. newData:newNode];
  514. [self.eventRaiser raiseEvents:events];
  515. }
  516. - (void)callOnComplete:(fbt_void_nserror_ref)onComplete
  517. withStatus:(NSString *)status
  518. errorReason:(NSString *)errorReason
  519. andPath:(FPath *)path {
  520. if (onComplete) {
  521. FIRDatabaseReference *ref =
  522. [[FIRDatabaseReference alloc] initWithRepo:self path:path];
  523. BOOL statusOk = [status isEqualToString:kFWPResponseForActionStatusOk];
  524. NSError *err = nil;
  525. if (!statusOk) {
  526. err = [FUtilities errorForStatus:status andReason:errorReason];
  527. }
  528. [self.eventRaiser raiseCallback:^{
  529. onComplete(err, ref);
  530. }];
  531. }
  532. }
  533. - (void)ackWrite:(NSInteger)writeId
  534. rerunTransactionsAtPath:(FPath *)path
  535. status:(NSString *)status {
  536. if ([status isEqualToString:kFErrorWriteCanceled]) {
  537. // This write was already removed, we just need to ignore it...
  538. } else {
  539. BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk];
  540. NSArray *clearEvents =
  541. [self.serverSyncTree ackUserWriteWithWriteId:writeId
  542. revert:!success
  543. persist:YES
  544. clock:self.serverClock];
  545. if ([clearEvents count] > 0) {
  546. [self rerunTransactionsForPath:path];
  547. }
  548. [self.eventRaiser raiseEvents:clearEvents];
  549. }
  550. }
  551. - (void)warnIfWriteFailedAtPath:(FPath *)path
  552. status:(NSString *)status
  553. message:(NSString *)message {
  554. if (!([status isEqualToString:kFWPResponseForActionStatusOk] ||
  555. [status isEqualToString:kFErrorWriteCanceled])) {
  556. FFWarn(@"I-RDB038012", @"%@ at %@ failed: %@", message, path, status);
  557. }
  558. }
  559. #pragma mark -
  560. #pragma mark FPersistentConnectionDelegate methods
  561. - (void)onDataUpdate:(FPersistentConnection *)fpconnection
  562. forPath:(NSString *)pathString
  563. message:(id)data
  564. isMerge:(BOOL)isMerge
  565. tagId:(NSNumber *)tagId {
  566. FFLog(@"I-RDB038013", @"onDataUpdateForPath: %@ withMessage: %@",
  567. pathString, data);
  568. // For testing.
  569. self.dataUpdateCount++;
  570. FPath *path = [[FPath alloc] initWith:pathString];
  571. data = self.interceptServerDataCallback
  572. ? self.interceptServerDataCallback(pathString, data)
  573. : data;
  574. NSArray *events = nil;
  575. if (tagId != nil) {
  576. if (isMerge) {
  577. NSDictionary *message = data;
  578. FCompoundWrite *taggedChildren =
  579. [FCompoundWrite compoundWriteWithValueDictionary:message];
  580. events =
  581. [self.serverSyncTree applyTaggedQueryMergeAtPath:path
  582. changedChildren:taggedChildren
  583. tagId:tagId];
  584. } else {
  585. id<FNode> taggedSnap = [FSnapshotUtilities nodeFrom:data];
  586. events =
  587. [self.serverSyncTree applyTaggedQueryOverwriteAtPath:path
  588. newData:taggedSnap
  589. tagId:tagId];
  590. }
  591. } else if (isMerge) {
  592. NSDictionary *message = data;
  593. FCompoundWrite *changedChildren =
  594. [FCompoundWrite compoundWriteWithValueDictionary:message];
  595. events = [self.serverSyncTree applyServerMergeAtPath:path
  596. changedChildren:changedChildren];
  597. } else {
  598. id<FNode> snap = [FSnapshotUtilities nodeFrom:data];
  599. events = [self.serverSyncTree applyServerOverwriteAtPath:path
  600. newData:snap];
  601. }
  602. if ([events count] > 0) {
  603. // Since we have a listener outstanding for each transaction, receiving
  604. // any events is a proxy for some change having occurred.
  605. [self rerunTransactionsForPath:path];
  606. }
  607. [self.eventRaiser raiseEvents:events];
  608. }
  609. - (void)onRangeMerge:(NSArray *)ranges
  610. forPath:(NSString *)pathString
  611. tagId:(NSNumber *)tag {
  612. FFLog(@"I-RDB038014", @"onRangeMerge: %@ => %@", pathString, ranges);
  613. // For testing
  614. self.rangeMergeUpdateCount++;
  615. FPath *path = [[FPath alloc] initWith:pathString];
  616. NSArray *events;
  617. if (tag != nil) {
  618. events = [self.serverSyncTree applyTaggedServerRangeMergeAtPath:path
  619. updates:ranges
  620. tagId:tag];
  621. } else {
  622. events = [self.serverSyncTree applyServerRangeMergeAtPath:path
  623. updates:ranges];
  624. }
  625. if (events.count > 0) {
  626. // Since we have a listener outstanding for each transaction, receiving
  627. // any events is a proxy for some change having occurred.
  628. [self rerunTransactionsForPath:path];
  629. }
  630. [self.eventRaiser raiseEvents:events];
  631. }
  632. - (void)onConnect:(FPersistentConnection *)fpconnection {
  633. [self updateInfo:kDotInfoConnected withValue:@YES];
  634. }
  635. - (void)onDisconnect:(FPersistentConnection *)fpconnection {
  636. [self updateInfo:kDotInfoConnected withValue:@NO];
  637. [self runOnDisconnectEvents];
  638. }
  639. - (void)onServerInfoUpdate:(FPersistentConnection *)fpconnection
  640. updates:(NSDictionary *)updates {
  641. for (NSString *key in updates) {
  642. id val = [updates objectForKey:key];
  643. [self updateInfo:key withValue:val];
  644. }
  645. }
  646. - (void)setupNotifications {
  647. NSString *const *backgroundConstant = (NSString *const *)dlsym(
  648. RTLD_DEFAULT, "UIApplicationDidEnterBackgroundNotification");
  649. if (backgroundConstant) {
  650. FFLog(@"I-RDB038015", @"Registering for background notification.");
  651. [[NSNotificationCenter defaultCenter]
  652. addObserver:self
  653. selector:@selector(didEnterBackground)
  654. name:*backgroundConstant
  655. object:nil];
  656. } else {
  657. FFLog(@"I-RDB038016",
  658. @"Skipped registering for background notification.");
  659. }
  660. }
  /**
   * Notification handler invoked when the application enters the background
   * (registered in -setupNotifications).
   *
   * Does nothing unless persistence is enabled. On iOS/tvOS builds it starts a
   * UIKit background task and ends it from a block enqueued on
   * [FIRDatabaseQuery sharedQueue], so the task stays open until that block
   * gets its turn on the queue.
   */
  - (void)didEnterBackground {
      if (!self.config.persistenceEnabled)
          return;

  // Targeted compilation is ONLY for testing. UIKit is weak-linked in actual
  // release build.
  #if TARGET_OS_IOS || TARGET_OS_TV
      // The idea is to wait until any outstanding sets get written to disk.
      // Since the sets might still be in our dispatch queue, we wait for the
      // dispatch queue to catch up and for persistence to catch up. This may be
      // undesirable though. The dispatch queue might just be processing a bunch
      // of incoming data or something. We might want to keep track of whether
      // there are any unpersisted sets or something.
      FFLog(@"I-RDB038017",
            @"Entering background. Starting background task to finish work.");
      Class uiApplicationClass = NSClassFromString(@"UIApplication");
      assert(uiApplicationClass); // If we are here, we should be on iOS and
                                  // UIApplication should be available.
      UIApplication *application = [uiApplicationClass sharedApplication];

      // The task can be finished from two places: the expiration handler and
      // the queued block below. Route both through one helper that ends the
      // task at most once and then invalidates the identifier, so a task that
      // already expired is not ended a second time.
      __block UIBackgroundTaskIdentifier bgTask = UIBackgroundTaskInvalid;
      void (^endTaskIfNeeded)(void) = ^{
          if (bgTask != UIBackgroundTaskInvalid) {
              [application endBackgroundTask:bgTask];
              bgTask = UIBackgroundTaskInvalid;
          }
      };
      bgTask = [application
          beginBackgroundTaskWithExpirationHandler:endTaskIfNeeded];

      NSDate *start = [NSDate date];
      dispatch_async([FIRDatabaseQuery sharedQueue], ^{
          NSTimeInterval finishTime = [start timeIntervalSinceNow] * -1;
          FFLog(@"I-RDB038018", @"Background task completed. Queue time: %f",
                finishTime);
          endTaskIfNeeded();
      });
  #endif
  }
  692. #pragma mark -
  693. #pragma mark Internal methods
  /**
   * Applies every write stored in the onDisconnect tree to the server sync
   * tree, replaces the onDisconnect tree with a fresh empty one, and raises
   * the events the writes produced.
   *
   * For each stored write, transactions at the written path are aborted with
   * kFTransactionSet and the transactions at the affected path are rerun.
   */
  - (void)runOnDisconnectEvents {
      FFLog(@"I-RDB038019", @"Running onDisconnectEvents");

      NSMutableArray *collectedEvents = [NSMutableArray array];
      NSDictionary *serverValues =
          [FServerValues generateServerValues:self.serverClock];

      [self.onDisconnect
          forEachTreeAtPath:[FPath empty]
                         do:^(FPath *writePath, id<FNode> storedNode) {
                             // Resolve deferred server values against what
                             // the sync tree currently holds at this path.
                             id<FNode> currentNode = [self.serverSyncTree
                                 calcCompleteEventCacheAtPath:writePath
                                              excludeWriteIds:@[]];
                             id<FNode> resolvedNode = [FServerValues
                                 resolveDeferredValueSnapshot:storedNode
                                                 withExisting:currentNode
                                                 serverValues:serverValues];

                             NSArray *overwriteEvents = [self.serverSyncTree
                                 applyServerOverwriteAtPath:writePath
                                                    newData:resolvedNode];
                             [collectedEvents
                                 addObjectsFromArray:overwriteEvents];

                             FPath *affectedPath =
                                 [self abortTransactionsAtPath:writePath
                                                         error:kFTransactionSet];
                             [self rerunTransactionsForPath:affectedPath];
                         }];

      self.onDisconnect = [[FSparseSnapshotTree alloc] init];
      [self.eventRaiser raiseEvents:collectedEvents];
  }
  /**
   * Returns the result of asking the underlying connection to dump its
   * listens.
   */
  - (NSDictionary *)dumpListens {
      NSDictionary *connectionListens = [self.connection dumpListens];
      return connectionListens;
  }
  727. #pragma mark -
  728. #pragma mark Transactions
  /**
   * Puts the transaction bookkeeping into its initial state: an empty
   * transaction queue tree, hash hijacking off, and the "transactions are not
   * persisted" warning marked as not yet logged.
   */
  - (void)initTransactions {
      self.loggedTransactionPersistenceWarning = NO;
      self.hijackHash = NO;

      FTree *emptyQueueTree = [[FTree alloc] init];
      self.transactionQueueTree = emptyQueueTree;
  }
  /**
   * Creates a new transaction, adds it to the transactions being tracked, and
   * sends it to the server if possible.
   *
   * @param path         Location the transaction operates on.
   * @param update       Block invoked with a FIRMutableData wrapping the
   *                     current value at `path`; its FIRTransactionResult
   *                     decides whether the transaction proceeds.
   * @param onComplete   Optional completion block. If the first run of
   *                     `update` is not successful it is raised with
   *                     (nil, NO, snapshot of the input value).
   * @param applyLocally Passed as the `isVisible` flag when the transaction's
   *                     write is applied to the sync tree.
   */
  - (void)startTransactionOnPath:(FPath *)path
                          update:(fbt_transactionresult_mutabledata)update
                      onComplete:(fbt_void_nserror_bool_datasnapshot)onComplete
                 withLocalEvents:(BOOL)applyLocally {
      // Log the persistence caveat at most once per repo.
      if (self.config.persistenceEnabled) {
          if (!self.loggedTransactionPersistenceWarning) {
              self.loggedTransactionPersistenceWarning = YES;
              FFInfo(@"I-RDB038020",
                     @"runTransactionBlock: usage detected while persistence is "
                     @"enabled. Please be aware that transactions "
                     @"*will not* be persisted across app restarts. "
                     @"See "
                     @"https://www.firebase.com/docs/ios/guide/"
                     @"offline-capabilities.html#section-handling-transactions-"
                     @"offline for more details.");
          }
      }

      // Make sure we're listening on this node.
      // Note: we can't do this asynchronously. To preserve event ordering, it
      // has to be done in this block. This is ok, this block is guaranteed to
      // be our own event loop.
      FIRDatabaseReference *watchRef =
          [[FIRDatabaseReference alloc] initWithRepo:self path:path];
      NSUInteger watchHandle = [[FUtilities LUIDGenerator] integerValue];
      fbt_void_datasnapshot ignoreSnapshot = ^(FIRDataSnapshot *snapshot) {
      };
      FValueEventRegistration *watchRegistration =
          [[FValueEventRegistration alloc] initWithRepo:self
                                                 handle:watchHandle
                                               callback:ignoreSnapshot
                                         cancelCallback:nil];
      [watchRef.repo addEventRegistration:watchRegistration
                                 forQuery:watchRef.querySpec];
      fbt_void_void stopWatching = ^{
          [watchRef removeObserverWithHandle:watchHandle];
      };

      // Record everything that represents this transaction.
      FTupleTransaction *transaction = [[FTupleTransaction alloc] init];
      transaction.path = path;
      transaction.update = update;
      transaction.onComplete = onComplete;
      transaction.status = FTransactionInitializing;
      transaction.order = [FUtilities LUIDGenerator];
      transaction.applyLocally = applyLocally;
      transaction.retryCount = 0;
      transaction.unwatcher = stopWatching;
      transaction.currentWriteId = nil;
      transaction.currentInputSnapshot = nil;
      transaction.currentOutputSnapshotRaw = nil;
      transaction.currentOutputSnapshotResolved = nil;

      // First run of the user's update block against the current state.
      id<FNode> inputState = [self latestStateAtPath:path excludeWriteIds:nil];
      transaction.currentInputSnapshot = inputState;
      FIRMutableData *mutableInput =
          [[FIRMutableData alloc] initWithNode:inputState];
      FIRTransactionResult *firstResult = transaction.update(mutableInput);

      if (!firstResult.isSuccess) {
          // The update block declined: stop watching and report the input
          // value back without ever queueing the transaction.
          transaction.unwatcher();
          transaction.currentOutputSnapshotRaw = nil;
          transaction.currentOutputSnapshotResolved = nil;
          if (transaction.onComplete) {
              FIRDatabaseReference *resultRef =
                  [[FIRDatabaseReference alloc] initWithRepo:self
                                                        path:transaction.path];
              FIndexedNode *inputIndexed = [FIndexedNode
                  indexedNodeWithNode:transaction.currentInputSnapshot];
              FIRDataSnapshot *inputSnapshot =
                  [[FIRDataSnapshot alloc] initWithRef:resultRef
                                           indexedNode:inputIndexed];
              [self.eventRaiser raiseCallback:^{
                  transaction.onComplete(nil, NO, inputSnapshot);
              }];
          }
          return;
      }

      // Note: different from js. We don't need to validate, FIRMutableData
      // does validation. We also don't have to worry about priorities. Just
      // mark as run and add to queue.
      transaction.status = FTransactionRun;
      FTree *queueNode = [self.transactionQueueTree subTree:transaction.path];
      NSMutableArray *nodeQueue = [queueNode getValue];
      if (nodeQueue == nil) {
          nodeQueue = [NSMutableArray array];
      }
      [nodeQueue addObject:transaction];
      [queueNode setValue:nodeQueue];

      // Update visibleData and raise events.
      // Note: We intentionally raise events after updating all of our
      // transaction state, since the user could start new transactions from
      // the event callbacks.
      NSDictionary *serverValues =
          [FServerValues generateServerValues:self.serverClock];
      id<FNode> rawOutput = [firstResult.update nodeValue];
      id<FNode> resolvedOutput =
          [FServerValues resolveDeferredValueSnapshot:rawOutput
                                         withExisting:inputState
                                         serverValues:serverValues];
      transaction.currentOutputSnapshotRaw = rawOutput;
      transaction.currentOutputSnapshotResolved = resolvedOutput;
      transaction.currentWriteId =
          [NSNumber numberWithInteger:[self nextWriteId]];

      NSArray *overwriteEvents = [self.serverSyncTree
          applyUserOverwriteAtPath:path
                           newData:resolvedOutput
                           writeId:[transaction.currentWriteId integerValue]
                         isVisible:transaction.applyLocally];
      [self.eventRaiser raiseEvents:overwriteEvents];

      [self sendAllReadyTransactions];
  }
  /**
   * Returns the server sync tree's complete event cache at `path`, or the
   * empty node when the sync tree returns nil.
   *
   * @param path              Location to read.
   * @param writeIdsToExclude A specific set of write ids to exclude; forwarded
   *                          unchanged to the sync tree.
   */
  - (id<FNode>)latestStateAtPath:(FPath *)path
                 excludeWriteIds:(NSArray *)writeIdsToExclude {
      id<FNode> cachedState =
          [self.serverSyncTree calcCompleteEventCacheAtPath:path
                                            excludeWriteIds:writeIdsToExclude];
      if (cachedState) {
          return cachedState;
      }
      return [FEmptyNode emptyNode];
  }
  /**
   * Sends any already-run transactions that aren't waiting for outstanding
   * transactions to complete.
   *
   * This is the entry point: it prunes completed transactions from the whole
   * transactionQueueTree and then hands the root to
   * -sendReadyTransactionsForTree:, which walks the tree.
   */
  - (void)sendAllReadyTransactions {
      FTree *queueTreeRoot = self.transactionQueueTree;

      // Drop finished transactions first so they are not considered below.
      [self pruneCompletedTransactionsBelowNode:queueTreeRoot];

      [self sendReadyTransactionsForTree:queueTreeRoot];
  }
  /**
   * Walks the transaction queue tree from `node` downward. At the first node
   * on each branch that holds a queue, the transactions at and below that node
   * are gathered and sent, but only if every one of them has status
   * FTransactionRun. Nodes without a queue are recursed into.
   *
   * @param node Transaction queue tree node to start from.
   */
  - (void)sendReadyTransactionsForTree:(FTree *)node {
      if ([node getValue] == nil) {
          // Nothing queued right here; look further down the tree.
          if ([node hasChildren]) {
              [node forEachChild:^(FTree *childNode) {
                  [self sendReadyTransactionsForTree:childNode];
              }];
          }
          return;
      }

      NSMutableArray *fullQueue = [self buildTransactionQueueAtNode:node];
      NSAssert([fullQueue count] > 0, @"Sending zero length transaction queue");

      // If they're all run (and not sent), we can send them. Else, we must
      // wait.
      BOOL everyTransactionRun = YES;
      for (FTupleTransaction *queued in fullQueue) {
          if (queued.status != FTransactionRun) {
              everyTransactionRun = NO;
              break;
          }
      }
      if (everyTransactionRun) {
          [self sendTransactionQueue:fullQueue atPath:node.path];
      }
  }
  /**
   * Given a list of run transactions, sends them to the server as one put and
   * then handles the result (success or failure).
   *
   * @param queue Transactions to send. Each is asserted to have status
   *              FTransactionRun and is switched to FTransactionSent.
   * @param path  Path the put is issued for; each transaction's output is
   *              written into the payload at its path relative to this one.
   */
  - (void)sendTransactionQueue:(NSMutableArray *)queue atPath:(FPath *)path {
      // The base state (and its hash) is computed with the writes of the
      // transactions being sent left out.
      NSMutableArray *excludedWriteIds = [NSMutableArray array];
      for (FTupleTransaction *queued in queue) {
          [excludedWriteIds addObject:queued.currentWriteId];
      }
      id<FNode> baseState = [self latestStateAtPath:path
                                    excludeWriteIds:excludedWriteIds];
      NSString *baseHash = [baseState dataHash];

      // Mark transactions as sent, bump the retry count, and layer each raw
      // output onto the snapshot that will be sent.
      id<FNode> snapToSend = baseState;
      for (FTupleTransaction *queued in queue) {
          NSAssert(
              queued.status == FTransactionRun,
              @"[FRepo sendTransactionQueue:] items in queue should all be run.");
          FFLog(@"I-RDB038021", @"Transaction at %@ set to SENT", queued.path);
          queued.status = FTransactionSent;
          queued.retryCount++;
          FPath *pathInPayload = [FPath relativePathFrom:path to:queued.path];
          // If we've gotten to this point, the output snapshot must be
          // defined.
          snapToSend = [snapToSend updateChild:pathInPayload
                                  withNewChild:queued.currentOutputSnapshotRaw];
      }

      id dataToSend = [snapToSend valForExport:YES];
      NSString *pathToSend = [path description];
      NSString *hashToSend = baseHash;
      if (self.hijackHash) {
          hashToSend = @"badhash";
      }

      // Send the put
      [self.connection
               putData:dataToSend
               forPath:pathToSend
              withHash:hashToSend
          withCallback:^(NSString *status, NSString *errorReason) {
              FFLog(@"I-RDB038022", @"Transaction put response: %@ : %@",
                    pathToSend, status);

              NSMutableArray *ackEvents = [NSMutableArray array];

              if ([status isEqualToString:kFWPResponseForActionStatusOk]) {
                  // Queue up the callbacks and fire them after cleaning up
                  // all of our transaction state, since the callback could
                  // trigger more transactions or sets.
                  NSMutableArray *completionCallbacks = [NSMutableArray array];
                  for (FTupleTransaction *sent in queue) {
                      sent.status = FTransactionCompleted;
                      NSArray *eventsForAck = [self.serverSyncTree
                          ackUserWriteWithWriteId:[sent.currentWriteId
                                                      integerValue]
                                           revert:NO
                                          persist:NO
                                            clock:self.serverClock];
                      [ackEvents addObjectsFromArray:eventsForAck];

                      if (sent.onComplete) {
                          // We never unset the output snapshot, and given
                          // that this transaction is complete, it should be
                          // set.
                          FIndexedNode *outputIndexed = [FIndexedNode
                              indexedNodeWithNode:
                                  sent.currentOutputSnapshotResolved];
                          FIRDatabaseReference *resultRef =
                              [[FIRDatabaseReference alloc]
                                  initWithRepo:self
                                          path:sent.path];
                          FIRDataSnapshot *resultSnapshot =
                              [[FIRDataSnapshot alloc]
                                  initWithRef:resultRef
                                  indexedNode:outputIndexed];
                          fbt_void_void notifyCommitted = ^{
                              sent.onComplete(nil, YES, resultSnapshot);
                          };
                          [completionCallbacks
                              addObject:[notifyCommitted copy]];
                      }
                      sent.unwatcher();
                  }

                  // Now remove the completed transactions.
                  FTree *sentNode = [self.transactionQueueTree subTree:path];
                  [self pruneCompletedTransactionsBelowNode:sentNode];
                  // There may be pending transactions that we can now send.
                  [self sendAllReadyTransactions];
                  // Finally, trigger onComplete callbacks
                  [self.eventRaiser raiseCallbacks:completionCallbacks];
              } else if ([status isEqualToString:
                                     kFWPResponseForActionStatusDataStale]) {
                  // Transactions are no longer sent. Put each one back into
                  // the matching not-sent state.
                  for (FTupleTransaction *sent in queue) {
                      BOOL abortRequested =
                          (sent.status == FTransactionSentNeedsAbort);
                      sent.status = abortRequested ? FTransactionNeedsAbort
                                                   : FTransactionRun;
                  }
              } else {
                  // Any other status: every transaction is flagged for abort
                  // with the server's status and reason.
                  FFWarn(@"I-RDB038023",
                         @"runTransactionBlock: at %@ failed: %@", path,
                         status);
                  for (FTupleTransaction *sent in queue) {
                      sent.status = FTransactionNeedsAbort;
                      [sent setAbortStatus:status reason:errorReason];
                  }
              }

              [self rerunTransactionsForPath:path];
              [self.eventRaiser raiseEvents:ackEvents];
          }];
  }
  /**
   * Finds all transactions dependent on the data at `changedPath` and reruns
   * them.
   *
   * Should be called any time cached data changes.
   *
   * @param changedPath Path whose cached data changed.
   * @return The highest path that was affected by rerunning transactions,
   *         i.e. the path at which events need to be raised. When no
   *         transactions are tracked this is `changedPath` itself.
   */
  - (FPath *)rerunTransactionsForPath:(FPath *)changedPath {
      // For the common case that there are no transactions going on, skip all
      // this!
      if ([self.transactionQueueTree isEmpty]) {
          return changedPath;
      }

      FTree *ancestorNode =
          [self getAncestorTransactionNodeForPath:changedPath];
      FPath *affectedPath = ancestorNode.path;
      NSArray *affectedQueue = [self buildTransactionQueueAtNode:ancestorNode];
      [self rerunTransactionQueue:affectedQueue atPath:affectedPath];
      return affectedPath;
  }
  /**
   * Does all the work of rerunning transactions (as well as cleaning up
   * aborted transactions and whatnot).
   *
   * For each transaction in `queue`:
   *  - FTransactionNeedsAbort: the transaction is aborted; its write is
   *    reverted unless the abort status is kFErrorWriteCanceled.
   *  - FTransactionRun: it is aborted if it reached kFTransactionMaxRetries,
   *    otherwise its update block is run again against the latest state and
   *    its previous write is replaced by a new one (or reverted if the block
   *    declines).
   * Events are raised after each transaction; completion callbacks are raised
   * only once all transaction state has been cleaned up.
   *
   * @param queue Transactions to process, in the order they should run.
   * @param path  Path the queue was collected at.
   */
  - (void)rerunTransactionQueue:(NSArray *)queue atPath:(FPath *)path {
      if (queue.count == 0) {
          return; // nothing to do
      }

      // Queue up the callbacks and fire them after cleaning up all of our
      // transaction state, since the callback could trigger more transactions
      // or sets.
      NSMutableArray *events = [[NSMutableArray alloc] init];
      NSMutableArray *callbacks = [[NSMutableArray alloc] init];

      // Ignore, by default, all of the sets in this queue, since we're
      // re-running all of them. However, we want to include the results of new
      // sets triggered as part of this re-run, so we don't want to ignore a
      // range, just these specific sets.
      NSMutableArray *writeIdsToExclude = [[NSMutableArray alloc] init];
      for (FTupleTransaction *transaction in queue) {
          [writeIdsToExclude addObject:transaction.currentWriteId];
      }

      for (FTupleTransaction *transaction in queue) {
          // Only consumed by the assertion below, hence __unused for builds
          // where NSAssert compiles away.
          FPath *relativePath __unused =
              [FPath relativePathFrom:path to:transaction.path];
          BOOL abortTransaction = NO;
          NSAssert(relativePath != nil, @"[FRepo rerunTransactionsQueue:] "
                                        @"relativePath should not be null.");

          if (transaction.status == FTransactionNeedsAbort) {
              abortTransaction = YES;
              if (![transaction.abortStatus
                      isEqualToString:kFErrorWriteCanceled]) {
                  NSArray *ackEvents = [self.serverSyncTree
                      ackUserWriteWithWriteId:[transaction.currentWriteId
                                                  integerValue]
                                       revert:YES
                                      persist:NO
                                        clock:self.serverClock];
                  [events addObjectsFromArray:ackEvents];
              }
          } else if (transaction.status == FTransactionRun) {
              if (transaction.retryCount >= kFTransactionMaxRetries) {
                  abortTransaction = YES;
                  [transaction setAbortStatus:kFTransactionTooManyRetries
                                       reason:nil];
                  [events
                      addObjectsFromArray:
                          [self.serverSyncTree
                              ackUserWriteWithWriteId:[transaction
                                                          .currentWriteId
                                                          integerValue]
                                               revert:YES
                                              persist:NO
                                                clock:self.serverClock]];
              } else {
                  // This code reruns a transaction
                  id<FNode> currentNode =
                      [self latestStateAtPath:transaction.path
                              excludeWriteIds:writeIdsToExclude];
                  transaction.currentInputSnapshot = currentNode;
                  FIRMutableData *mutableCurrent =
                      [[FIRMutableData alloc] initWithNode:currentNode];
                  FIRTransactionResult *result =
                      transaction.update(mutableCurrent);
                  if (result.isSuccess) {
                      NSNumber *oldWriteId = transaction.currentWriteId;
                      NSDictionary *serverValues = [FServerValues
                          generateServerValues:self.serverClock];

                      id<FNode> newVal = [result.update nodeValue];
                      id<FNode> newValResolved = [FServerValues
                          resolveDeferredValueSnapshot:newVal
                                          withExisting:
                                              transaction.currentInputSnapshot
                                          serverValues:serverValues];

                      transaction.currentOutputSnapshotRaw = newVal;
                      transaction.currentOutputSnapshotResolved =
                          newValResolved;
                      transaction.currentWriteId =
                          [NSNumber numberWithInteger:[self nextWriteId]];

                      // Mutates writeIdsToExclude in place
                      [writeIdsToExclude removeObject:oldWriteId];
                      [events
                          addObjectsFromArray:
                              [self.serverSyncTree
                                  applyUserOverwriteAtPath:transaction.path
                                                   newData:
                                                       transaction
                                                           .currentOutputSnapshotResolved
                                                   writeId:
                                                       [transaction
                                                            .currentWriteId
                                                           integerValue]
                                                 isVisible:
                                                     transaction
                                                         .applyLocally]];
                      [events
                          addObjectsFromArray:
                              [self.serverSyncTree
                                  ackUserWriteWithWriteId:[oldWriteId
                                                              integerValue]
                                                   revert:YES
                                                  persist:NO
                                                    clock:self.serverClock]];
                  } else {
                      abortTransaction = YES;
                      // The user aborted the transaction. JS treats this as a
                      // "nodata" abort, but it's not an error, so we don't
                      // send them an error.
                      [transaction setAbortStatus:nil reason:nil];
                      [events
                          addObjectsFromArray:
                              [self.serverSyncTree
                                  ackUserWriteWithWriteId:
                                      [transaction.currentWriteId integerValue]
                                                   revert:YES
                                                  persist:NO
                                                    clock:self.serverClock]];
                  }
              }
          }

          [self.eventRaiser raiseEvents:events];
          // Start a fresh batch for the next transaction. This must be a new
          // array, not nil: messaging nil would silently discard the events
          // of every later transaction in the queue. A new array is also used
          // instead of emptying the old one, because the event raiser was
          // just handed the old array.
          events = [[NSMutableArray alloc] init];

          if (abortTransaction) {
              // Abort
              transaction.status = FTransactionCompleted;
              transaction.unwatcher();
              if (transaction.onComplete) {
                  FIRDatabaseReference *ref = [[FIRDatabaseReference alloc]
                      initWithRepo:self
                              path:transaction.path];
                  FIndexedNode *lastInput = [FIndexedNode
                      indexedNodeWithNode:transaction.currentInputSnapshot];
                  FIRDataSnapshot *snap =
                      [[FIRDataSnapshot alloc] initWithRef:ref
                                               indexedNode:lastInput];
                  fbt_void_void cb = ^{
                      // Unlike JS, no need to check for "nodata" because ObjC
                      // has abortError = nil
                      transaction.onComplete(transaction.abortError, NO, snap);
                  };
                  [callbacks addObject:[cb copy]];
              }
          }
      }

      // Note: unlike current js client, we don't need to preserve priority.
      // Users can set priority via FIRMutableData

      // Clean up completed transactions.
      [self pruneCompletedTransactionsBelowNode:self.transactionQueueTree];

      // Now fire callbacks, now that we're in a good, known state.
      [self.eventRaiser raiseCallbacks:callbacks];

      // Try to send the transaction result to the server
      [self sendAllReadyTransactions];
  }
  /**
   * Descends the transaction queue tree along `path`, one path segment at a
   * time, and returns the first node that has a value. If no node along the
   * way has one, the node at the end of `path` is returned.
   *
   * @param path Path to follow from the root of the transaction queue tree.
   */
  - (FTree *)getAncestorTransactionNodeForPath:(FPath *)path {
      FTree *currentNode = self.transactionQueueTree;
      FPath *remainingPath = path;
      for (;;) {
          if ([remainingPath isEmpty] || [currentNode getValue] != nil) {
              break;
          }
          NSString *nextSegment = [remainingPath getFront];
          FPath *segmentPath = [[FPath alloc] initWith:nextSegment];
          currentNode = [currentNode subTree:segmentPath];
          remainingPath = [remainingPath popFront];
      }
      return currentNode;
  }
  /**
   * Collects the transactions queued at `node` and at every node below it
   * into one array, sorted by each transaction's `order` value.
   *
   * @param node Transaction queue tree node to collect from.
   * @return A new mutable array; it is empty when nothing is queued.
   */
  - (NSMutableArray *)buildTransactionQueueAtNode:(FTree *)node {
      NSMutableArray *collected = [NSMutableArray array];
      [self aggregateTransactionQueuesForNode:node andQueue:collected];

      NSComparator byOrder =
          ^NSComparisonResult(FTupleTransaction *lhs, FTupleTransaction *rhs) {
            return [lhs.order compare:rhs.order];
          };
      [collected sortUsingComparator:byOrder];
      return collected;
  }
  /**
   * Appends the transactions stored at `node` to `queue`, then does the same
   * for each of the node's children, recursively.
   *
   * @param node  Transaction queue tree node to read from.
   * @param queue Array the transactions are appended to.
   */
  - (void)aggregateTransactionQueuesForNode:(FTree *)node
                                   andQueue:(NSMutableArray *)queue {
      NSArray *transactionsHere = [node getValue];
      if (transactionsHere != nil) {
          [queue addObjectsFromArray:transactionsHere];
      }

      [node forEachChild:^(FTree *childNode) {
          [self aggregateTransactionQueuesForNode:childNode andQueue:queue];
      }];
  }
  /**
   * Removes COMPLETED transactions at or below this node in the
   * transactionQueueTree. A node whose queue ends up empty has its value set
   * to nil.
   *
   * @param node Transaction queue tree node to start pruning from.
   */
  - (void)pruneCompletedTransactionsBelowNode:(FTree *)node {
      NSMutableArray *nodeQueue = [node getValue];
      if (nodeQueue != nil) {
          // Find every completed transaction first, then drop them in one go.
          NSIndexSet *completedIndexes = [nodeQueue
              indexesOfObjectsPassingTest:^BOOL(FTupleTransaction *queued,
                                                NSUInteger idx, BOOL *stop) {
                  return queued.status == FTransactionCompleted;
              }];
          [nodeQueue removeObjectsAtIndexes:completedIndexes];

          [node setValue:(nodeQueue.count > 0 ? nodeQueue : nil)];
      }

      [node forEachChildMutationSafe:^(FTree *childNode) {
          [self pruneCompletedTransactionsBelowNode:childNode];
      }];
  }
  /**
   * Aborts all transactions at the specified path, on its ancestors, and on
   * its descendants. Called when doing a setValue: or updateChildValues:
   * since we consider them incompatible with transactions.
   *
   * @param path  Path for which we want to abort related transactions.
   * @param error Abort status handed to -abortTransactionsAtNode:error: for
   *              every visited node.
   * @return `path` itself when no transactions are tracked; otherwise the path
   *         of the node found by -getAncestorTransactionNodeForPath:, looked
   *         up before anything is aborted.
   */
  - (FPath *)abortTransactionsAtPath:(FPath *)path error:(NSString *)error {
      // For the common case that there are no transactions going on, skip all
      // this!
      if ([self.transactionQueueTree isEmpty]) {
          return path;
      }

      FTree *ancestorNode = [self getAncestorTransactionNodeForPath:path];
      FPath *affectedPath = ancestorNode.path;

      FTree *targetNode = [self.transactionQueueTree subTree:path];

      // Ancestors first ...
      [targetNode forEachAncestor:^BOOL(FTree *ancestor) {
          [self abortTransactionsAtNode:ancestor error:error];
          // NOTE(review): NO presumably means "keep walking" - confirm
          // against FTree.
          return NO;
      }];
      // ... then the node itself ...
      [self abortTransactionsAtNode:targetNode error:error];
      // ... and finally everything underneath it.
      [targetNode forEachDescendant:^(FTree *descendant) {
          [self abortTransactionsAtNode:descendant error:error];
      }];

      return affectedPath;
  }
  /**
   * Abort transactions stored in this transactions queue node.
   *
   * Transactions that have already been sent cannot be aborted right away;
   * they are marked FTransactionSentNeedsAbort and stay in the queue until
   * the server response is handled. All other transactions are aborted
   * immediately and removed from the queue.
   *
   * @param node  Node to abort transactions for.
   * @param error Abort status. kFTransactionSet causes the transaction's
   *              write to be reverted in the sync tree; the only other value
   *              expected (asserted) is kFErrorWriteCanceled.
   */
  - (void)abortTransactionsAtNode:(FTree *)node error:(NSString *)error {
      NSMutableArray *queue = [node getValue];
      if (queue != nil) {
          // Queue up the callbacks and fire them after cleaning up all of our
          // transaction state, since the callbacks could trigger more
          // transactions or sets.
          NSMutableArray *callbacks = [[NSMutableArray alloc] init];

          // Go through queue. Any already-sent transactions must be marked
          // for abort, while the unsent ones can be immediately aborted and
          // removed
          NSMutableArray *events = [[NSMutableArray alloc] init];
          int lastSent = -1;
          // Note: all of the sent transactions will be at the front of the
          // queue, so safe to increment lastSent
          for (FTupleTransaction *transaction in queue) {
              if (transaction.status == FTransactionSentNeedsAbort) {
                  // Already marked by an earlier abort, but still in flight.
                  // It has to be counted as sent so that it stays in the
                  // queue below; otherwise a second abort would drop it from
                  // the tree, and a later non-OK server response could never
                  // complete it or revert its write.
                  lastSent++;
              } else if (transaction.status == FTransactionSent) {
                  // Mark this transaction for abort when it returns
                  lastSent++;
                  transaction.status = FTransactionSentNeedsAbort;
                  [transaction setAbortStatus:error reason:nil];
              } else {
                  // we can abort this immediately
                  transaction.unwatcher();
                  if ([error isEqualToString:kFTransactionSet]) {
                      [events
                          addObjectsFromArray:
                              [self.serverSyncTree
                                  ackUserWriteWithWriteId:
                                      [transaction.currentWriteId integerValue]
                                                   revert:YES
                                                  persist:NO
                                                    clock:self.serverClock]];
                  } else {
                      // If it was cancelled it was already removed from the
                      // sync tree, no need to ack
                      NSAssert([error isEqualToString:kFErrorWriteCanceled],
                               nil);
                  }

                  if (transaction.onComplete) {
                      NSError *abortReason = [FUtilities errorForStatus:error
                                                              andReason:nil];
                      FIRDataSnapshot *snapshot = nil;
                      fbt_void_void cb = ^{
                          transaction.onComplete(abortReason, NO, snapshot);
                      };
                      [callbacks addObject:[cb copy]];
                  }
              }
          }

          if (lastSent == -1) {
              // We're not waiting for any sent transactions. We can clear the
              // queue.
              [node setValue:nil];
          } else {
              // Remove the transactions we aborted, i.e. everything after the
              // in-flight ones at the front of the queue.
              NSRange theRange;
              theRange.location = lastSent + 1;
              theRange.length = queue.count - theRange.location;
              [queue removeObjectsInRange:theRange];
          }

          // Now fire the callbacks
          [self.eventRaiser raiseEvents:events];
          [self.eventRaiser raiseCallbacks:callbacks];
      }
  }
  1321. @end