FRepo.m 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118
  1. /*
  2. * Copyright 2017 Google
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #import <Foundation/Foundation.h>
  17. #import <dlfcn.h>
  18. #import "FRepo.h"
  19. #import "FSnapshotUtilities.h"
  20. #import "FConstants.h"
  21. #import "FIRDatabaseQuery_Private.h"
  22. #import "FQuerySpec.h"
  23. #import "FTupleNodePath.h"
  24. #import "FRepo_Private.h"
  25. #import "FRepoManager.h"
  26. #import "FServerValues.h"
  27. #import "FTupleSetIdPath.h"
  28. #import "FSyncTree.h"
  29. #import "FEventRegistration.h"
  30. #import "FAtomicNumber.h"
  31. #import "FSyncTree.h"
  32. #import "FListenProvider.h"
  33. #import "FEventRaiser.h"
  34. #import "FSnapshotHolder.h"
  35. #import "FIRDatabaseConfig_Private.h"
  36. #import "FLevelDBStorageEngine.h"
  37. #import "FPersistenceManager.h"
  38. #import "FWriteRecord.h"
  39. #import "FCachePolicy.h"
  40. #import "FClock.h"
  41. #import "FIRDatabase_Private.h"
  42. #import "FTree.h"
  43. #import "FTupleTransaction.h"
  44. #import "FIRTransactionResult.h"
  45. #import "FIRTransactionResult_Private.h"
  46. #import "FIRMutableData.h"
  47. #import "FIRMutableData_Private.h"
  48. #import "FIRDataSnapshot.h"
  49. #import "FIRDataSnapshot_Private.h"
  50. #import "FValueEventRegistration.h"
  51. #import "FEmptyNode.h"
  52. #if TARGET_OS_IOS
  53. #import <UIKit/UIKit.h>
  54. #endif
  55. @interface FRepo()
  56. @property (nonatomic, strong) FOffsetClock *serverClock;
  57. @property (nonatomic, strong) FPersistenceManager* persistenceManager;
  58. @property (nonatomic, strong) FIRDatabase *database;
  59. @property (nonatomic, strong, readwrite) FAuthenticationManager *auth;
  60. @property (nonatomic, strong) FSyncTree *infoSyncTree;
  61. @property (nonatomic) NSInteger writeIdCounter;
  62. @property (nonatomic) BOOL hijackHash;
  63. @property (nonatomic, strong) FTree *transactionQueueTree;
  64. @property (nonatomic) BOOL loggedTransactionPersistenceWarning;
  65. /**
  66. * Test only. For load testing the server.
  67. */
  68. @property (nonatomic, strong) id (^interceptServerDataCallback)(NSString *pathString, id data);
  69. @end
  70. @implementation FRepo
  71. - (id)initWithRepoInfo:(FRepoInfo*)info config:(FIRDatabaseConfig *)config database:(FIRDatabase *)database {
  72. self = [super init];
  73. if (self) {
  74. self.repoInfo = info;
  75. self.config = config;
  76. self.database = database;
  77. // Access can occur outside of shared queue, so the clock needs to be initialized here
  78. self.serverClock = [[FOffsetClock alloc] initWithClock:[FSystemClock clock] offset:0];
  79. self.connection = [[FPersistentConnection alloc] initWithRepoInfo:self.repoInfo dispatchQueue:[FIRDatabaseQuery sharedQueue] config:self.config];
  80. // Needs to be called before authentication manager is instantiated
  81. self.eventRaiser = [[FEventRaiser alloc] initWithQueue:self.config.callbackQueue];
  82. dispatch_async([FIRDatabaseQuery sharedQueue], ^{
  83. [self deferredInit];
  84. });
  85. }
  86. return self;
  87. }
  88. - (void)deferredInit {
  89. // TODO: cleanup on dealloc
  90. __weak FRepo *weakSelf = self;
  91. [self.config.authTokenProvider listenForTokenChanges:^(NSString *token) {
  92. [weakSelf.connection refreshAuthToken:token];
  93. }];
  94. // Open connection now so that by the time we are connected the deferred init has run
  95. // This relies on the fact that all callbacks run on repos queue
  96. self.connection.delegate = self;
  97. [self.connection open];
  98. self.dataUpdateCount = 0;
  99. self.rangeMergeUpdateCount = 0;
  100. self.interceptServerDataCallback = nil;
  101. if (self.config.persistenceEnabled) {
  102. NSString* repoHashString = [NSString stringWithFormat:@"%@_%@", self.repoInfo.host, self.repoInfo.namespace];
  103. NSString* persistencePrefix = [NSString stringWithFormat:@"%@/%@", self.config.sessionIdentifier, repoHashString];
  104. id<FCachePolicy> cachePolicy = [[FLRUCachePolicy alloc] initWithMaxSize:self.config.persistenceCacheSizeBytes];
  105. id<FStorageEngine> engine;
  106. if (self.config.forceStorageEngine != nil) {
  107. engine = self.config.forceStorageEngine;
  108. } else {
  109. FLevelDBStorageEngine *levelDBEngine = [[FLevelDBStorageEngine alloc] initWithPath:persistencePrefix];
  110. // We need the repo info to run the legacy migration. Future migrations will be managed by the database itself
  111. // Remove this once we are confident that no-one is using legacy migration anymore...
  112. [levelDBEngine runLegacyMigration:self.repoInfo];
  113. engine = levelDBEngine;
  114. }
  115. self.persistenceManager = [[FPersistenceManager alloc] initWithStorageEngine:engine cachePolicy:cachePolicy];
  116. } else {
  117. self.persistenceManager = nil;
  118. }
  119. [self initTransactions];
  120. // A list of data pieces and paths to be set when this client disconnects
  121. self.onDisconnect = [[FSparseSnapshotTree alloc] init];
  122. self.infoData = [[FSnapshotHolder alloc] init];
  123. FListenProvider *infoListenProvider = [[FListenProvider alloc] init];
  124. infoListenProvider.startListening = ^(FQuerySpec *query,
  125. NSNumber *tagId,
  126. id<FSyncTreeHash> hash,
  127. fbt_nsarray_nsstring onComplete) {
  128. NSArray *infoEvents = @[];
  129. FRepo *strongSelf = weakSelf;
  130. id<FNode> node = [strongSelf.infoData getNode:query.path];
  131. // This is possibly a hack, but we have different semantics for .info endpoints. We don't raise null events
  132. // on initial data...
  133. if (![node isEmpty]) {
  134. infoEvents = [strongSelf.infoSyncTree applyServerOverwriteAtPath:query.path newData:node];
  135. [strongSelf.eventRaiser raiseCallback:^{
  136. onComplete(kFWPResponseForActionStatusOk);
  137. }];
  138. }
  139. return infoEvents;
  140. };
  141. infoListenProvider.stopListening = ^(FQuerySpec *query, NSNumber *tagId) {};
  142. self.infoSyncTree = [[FSyncTree alloc] initWithListenProvider:infoListenProvider];
  143. FListenProvider *serverListenProvider = [[FListenProvider alloc] init];
  144. serverListenProvider.startListening = ^(FQuerySpec *query,
  145. NSNumber *tagId,
  146. id<FSyncTreeHash> hash,
  147. fbt_nsarray_nsstring onComplete) {
  148. [weakSelf.connection listen:query tagId:tagId hash:hash onComplete:^(NSString *status) {
  149. NSArray *events = onComplete(status);
  150. [weakSelf.eventRaiser raiseEvents:events];
  151. }];
  152. // No synchronous events for network-backed sync trees
  153. return @[];
  154. };
  155. serverListenProvider.stopListening = ^(FQuerySpec *query, NSNumber *tag) {
  156. [weakSelf.connection unlisten:query tagId:tag];
  157. };
  158. self.serverSyncTree = [[FSyncTree alloc] initWithPersistenceManager:self.persistenceManager
  159. listenProvider:serverListenProvider];
  160. [self restoreWrites];
  161. [self updateInfo:kDotInfoConnected withValue:@NO];
  162. [self setupNotifications];
  163. }
  164. - (void) restoreWrites {
  165. NSArray *writes = self.persistenceManager.userWrites;
  166. NSDictionary *serverValues = [FServerValues generateServerValues:self.serverClock];
  167. __block NSInteger lastWriteId = NSIntegerMin;
  168. [writes enumerateObjectsUsingBlock:^(FWriteRecord *write, NSUInteger idx, BOOL *stop) {
  169. NSInteger writeId = write.writeId;
  170. fbt_void_nsstring_nsstring callback = ^(NSString *status, NSString *errorReason) {
  171. [self warnIfWriteFailedAtPath:write.path status:status message:@"Persisted write"];
  172. [self ackWrite:writeId rerunTransactionsAtPath:write.path status:status];
  173. };
  174. if (lastWriteId >= writeId) {
  175. [NSException raise:NSInternalInconsistencyException format:@"Restored writes were not in order!"];
  176. }
  177. lastWriteId = writeId;
  178. self.writeIdCounter = writeId + 1;
  179. if ([write isOverwrite]) {
  180. FFLog(@"I-RDB038001", @"Restoring overwrite with id %ld", (long)write.writeId);
  181. [self.connection putData:[write.overwrite valForExport:YES]
  182. forPath:[write.path toString]
  183. withHash:nil
  184. withCallback:callback];
  185. id<FNode> resolved = [FServerValues resolveDeferredValueSnapshot:write.overwrite withServerValues:serverValues];
  186. [self.serverSyncTree applyUserOverwriteAtPath:write.path newData:resolved writeId:writeId isVisible:YES];
  187. } else {
  188. FFLog(@"I-RDB038002", @"Restoring merge with id %ld", (long)write.writeId);
  189. [self.connection mergeData:[write.merge valForExport:YES]
  190. forPath:[write.path toString]
  191. withCallback:callback];
  192. FCompoundWrite *resolved = [FServerValues resolveDeferredValueCompoundWrite:write.merge withServerValues:serverValues];
  193. [self.serverSyncTree applyUserMergeAtPath:write.path changedChildren:resolved writeId:writeId];
  194. }
  195. }];
  196. }
  197. - (NSString*)name {
  198. return self.repoInfo.namespace;
  199. }
  200. - (NSString *) description {
  201. return [self.repoInfo description];
  202. }
  203. - (void) interrupt {
  204. [self.connection interruptForReason:kFInterruptReasonRepoInterrupt];
  205. }
  206. - (void) resume {
  207. [self.connection resumeForReason:kFInterruptReasonRepoInterrupt];
  208. }
  209. // NOTE: Typically if you're calling this, you should be in an @autoreleasepool block to make sure that ARC kicks
  210. // in and cleans up things no longer referenced (i.e. pendingPutsDB).
  211. - (void) dispose {
  212. [self.connection interruptForReason:kFInterruptReasonRepoInterrupt];
  213. // We need to nil out any references to LevelDB, to make sure the
  214. // LevelDB exclusive locks are released.
  215. [self.persistenceManager close];
  216. }
  217. - (NSInteger) nextWriteId {
  218. return self->_writeIdCounter++;
  219. }
  220. - (NSTimeInterval) serverTime {
  221. return [self.serverClock currentTime];
  222. }
  223. - (void) set:(FPath *)path withNode:(id<FNode>)node withCallback:(fbt_void_nserror_ref)onComplete {
  224. id value = [node valForExport:YES];
  225. FFLog(@"I-RDB038003", @"Setting: %@ with %@ pri: %@", [path toString], [value description], [[node getPriority] val]);
  226. // TODO: Optimize this behavior to either (a) store flag to skip resolving where possible and / or
  227. // (b) store unresolved paths on JSON parse
  228. NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock];
  229. id<FNode> newNode = [FServerValues resolveDeferredValueSnapshot:node withServerValues:serverValues];
  230. NSInteger writeId = [self nextWriteId];
  231. [self.persistenceManager saveUserOverwrite:node atPath:path writeId:writeId];
  232. NSArray *events = [self.serverSyncTree applyUserOverwriteAtPath:path newData:newNode writeId:writeId isVisible:YES];
  233. [self.eventRaiser raiseEvents:events];
  234. [self.connection putData:value forPath:[path toString] withHash:nil withCallback:^(NSString *status, NSString *errorReason) {
  235. [self warnIfWriteFailedAtPath:path status:status message:@"setValue: or removeValue:"];
  236. [self ackWrite:writeId rerunTransactionsAtPath:path status:status];
  237. [self callOnComplete:onComplete withStatus:status errorReason:errorReason andPath:path];
  238. }];
  239. FPath* affectedPath = [self abortTransactionsAtPath:path error:kFTransactionSet];
  240. [self rerunTransactionsForPath:affectedPath];
  241. }
  242. - (void) update:(FPath *)path withNodes:(FCompoundWrite *)nodes withCallback:(fbt_void_nserror_ref)callback {
  243. NSDictionary *values = [nodes valForExport:YES];
  244. FFLog(@"I-RDB038004", @"Updating: %@ with %@", [path toString], [values description]);
  245. NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock];
  246. FCompoundWrite *resolved = [FServerValues resolveDeferredValueCompoundWrite:nodes withServerValues:serverValues];
  247. if (!resolved.isEmpty) {
  248. NSInteger writeId = [self nextWriteId];
  249. [self.persistenceManager saveUserMerge:nodes atPath:path writeId:writeId];
  250. NSArray *events = [self.serverSyncTree applyUserMergeAtPath:path changedChildren:resolved writeId:writeId];
  251. [self.eventRaiser raiseEvents:events];
  252. [self.connection mergeData:values forPath:[path description] withCallback:^(NSString *status, NSString *errorReason) {
  253. [self warnIfWriteFailedAtPath:path status:status message:@"updateChildValues:"];
  254. [self ackWrite:writeId rerunTransactionsAtPath:path status:status];
  255. [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path];
  256. }];
  257. [nodes enumerateWrites:^(FPath *childPath, id<FNode> node, BOOL *stop) {
  258. FPath* pathFromRoot = [path child:childPath];
  259. FFLog(@"I-RDB038005", @"Cancelling transactions at path: %@", pathFromRoot);
  260. FPath *affectedPath = [self abortTransactionsAtPath:pathFromRoot error:kFTransactionSet];
  261. [self rerunTransactionsForPath:affectedPath];
  262. }];
  263. } else {
  264. FFLog(@"I-RDB038006", @"update called with empty data. Doing nothing");
  265. // Do nothing, just call the callback
  266. [self callOnComplete:callback withStatus:@"ok" errorReason:nil andPath:path];
  267. }
  268. }
  269. - (void) onDisconnectCancel:(FPath *)path withCallback:(fbt_void_nserror_ref)callback {
  270. [self.connection onDisconnectCancelPath:path withCallback:^(NSString *status, NSString *errorReason) {
  271. BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk];
  272. if (success) {
  273. [self.onDisconnect forgetPath:path];
  274. } else {
  275. FFLog(@"I-RDB038007", @"cancelDisconnectOperations: at %@ failed: %@", path, status);
  276. }
  277. [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path];
  278. }];
  279. }
  280. - (void) onDisconnectSet:(FPath *)path withNode:(id<FNode>)node withCallback:(fbt_void_nserror_ref)callback {
  281. [self.connection onDisconnectPutData:[node valForExport:YES] forPath:path withCallback:^(NSString *status, NSString *errorReason) {
  282. BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk];
  283. if (success) {
  284. [self.onDisconnect rememberData:node onPath:path];
  285. } else {
  286. FFWarn(@"I-RDB038008", @"onDisconnectSetValue: or onDisconnectRemoveValue: at %@ failed: %@", path, status);
  287. }
  288. [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path];
  289. }];
  290. }
  291. - (void) onDisconnectUpdate:(FPath *)path withNodes:(FCompoundWrite *)nodes withCallback:(fbt_void_nserror_ref)callback {
  292. if (!nodes.isEmpty) {
  293. NSDictionary *values = [nodes valForExport:YES];
  294. [self.connection onDisconnectMergeData:values forPath:path withCallback:^(NSString *status, NSString *errorReason) {
  295. BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk];
  296. if (success) {
  297. [nodes enumerateWrites:^(FPath *relativePath, id<FNode> nodeUnresolved, BOOL *stop) {
  298. FPath* childPath = [path child:relativePath];
  299. [self.onDisconnect rememberData:nodeUnresolved onPath:childPath];
  300. }];
  301. } else {
  302. FFWarn(@"I-RDB038009", @"onDisconnectUpdateChildValues: at %@ failed %@", path, status);
  303. }
  304. [self callOnComplete:callback withStatus:status errorReason:errorReason andPath:path];
  305. }];
  306. } else {
  307. // Do nothing, just call the callback
  308. [self callOnComplete:callback withStatus:@"ok" errorReason:nil andPath:path];
  309. }
  310. }
  311. - (void) purgeOutstandingWrites {
  312. FFLog(@"I-RDB038010", @"Purging outstanding writes");
  313. NSArray *events = [self.serverSyncTree removeAllWrites];
  314. [self.eventRaiser raiseEvents:events];
  315. // Abort any transactions
  316. [self abortTransactionsAtPath:[FPath empty] error:kFErrorWriteCanceled];
  317. // Remove outstanding writes from connection
  318. [self.connection purgeOutstandingWrites];
  319. }
  320. - (void) addEventRegistration:(id <FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query {
  321. NSArray *events = nil;
  322. if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) {
  323. events = [self.infoSyncTree addEventRegistration:eventRegistration forQuery:query];
  324. } else {
  325. events = [self.serverSyncTree addEventRegistration:eventRegistration forQuery:query];
  326. }
  327. [self.eventRaiser raiseEvents:events];
  328. }
  329. - (void) removeEventRegistration:(id<FEventRegistration>)eventRegistration forQuery:(FQuerySpec *)query {
  330. // These are guaranteed not to raise events, since we're not passing in a cancelError. However we can future-proof
  331. // a little bit by handling the return values anyways.
  332. FFLog(@"I-RDB038011", @"Removing event registration with hande: %lu", (unsigned long)eventRegistration.handle);
  333. NSArray *events = nil;
  334. if ([[query.path getFront] isEqualToString:kDotInfoPrefix]) {
  335. events = [self.infoSyncTree removeEventRegistration:eventRegistration forQuery:query cancelError:nil];
  336. } else {
  337. events = [self.serverSyncTree removeEventRegistration:eventRegistration forQuery:query cancelError:nil];
  338. }
  339. [self.eventRaiser raiseEvents:events];
  340. }
  341. - (void) keepQuery:(FQuerySpec *)query synced:(BOOL)synced {
  342. NSAssert(![[query.path getFront] isEqualToString:kDotInfoPrefix], @"Can't keep .info tree synced!");
  343. [self.serverSyncTree keepQuery:query synced:synced];
  344. }
  345. - (void) updateInfo:(NSString *) pathString withValue:(id)value {
  346. // hack to make serverTimeOffset available in a threadsafe way. Property is marked as atomic
  347. if ([pathString isEqualToString:kDotInfoServerTimeOffset]) {
  348. NSTimeInterval offset = [(NSNumber *)value doubleValue]/1000.0;
  349. self.serverClock = [[FOffsetClock alloc] initWithClock:[FSystemClock clock] offset:offset];
  350. }
  351. FPath* path = [[FPath alloc] initWith:[NSString stringWithFormat:@"%@/%@", kDotInfoPrefix, pathString]];
  352. id<FNode> newNode = [FSnapshotUtilities nodeFrom:value];
  353. [self.infoData updateSnapshot:path withNewSnapshot:newNode];
  354. NSArray *events = [self.infoSyncTree applyServerOverwriteAtPath:path newData:newNode];
  355. [self.eventRaiser raiseEvents:events];
  356. }
  357. - (void) callOnComplete:(fbt_void_nserror_ref)onComplete withStatus:(NSString *)status errorReason:(NSString *)errorReason andPath:(FPath *)path {
  358. if (onComplete) {
  359. FIRDatabaseReference * ref = [[FIRDatabaseReference alloc] initWithRepo:self path:path];
  360. BOOL statusOk = [status isEqualToString:kFWPResponseForActionStatusOk];
  361. NSError* err = nil;
  362. if (!statusOk) {
  363. err = [FUtilities errorForStatus:status andReason:errorReason];
  364. }
  365. [self.eventRaiser raiseCallback:^{
  366. onComplete(err, ref);
  367. }];
  368. }
  369. }
  370. - (void)ackWrite:(NSInteger)writeId rerunTransactionsAtPath:(FPath *)path status:(NSString *)status {
  371. if ([status isEqualToString:kFErrorWriteCanceled]) {
  372. // This write was already removed, we just need to ignore it...
  373. } else {
  374. BOOL success = [status isEqualToString:kFWPResponseForActionStatusOk];
  375. NSArray *clearEvents = [self.serverSyncTree ackUserWriteWithWriteId:writeId revert:!success persist:YES clock:self.serverClock];
  376. if ([clearEvents count] > 0) {
  377. [self rerunTransactionsForPath:path];
  378. }
  379. [self.eventRaiser raiseEvents:clearEvents];
  380. }
  381. }
  382. - (void) warnIfWriteFailedAtPath:(FPath *)path status:(NSString *)status message:(NSString *)message {
  383. if (!([status isEqualToString:kFWPResponseForActionStatusOk] || [status isEqualToString:kFErrorWriteCanceled])) {
  384. FFWarn(@"I-RDB038012", @"%@ at %@ failed: %@", message, path, status);
  385. }
  386. }
  387. #pragma mark -
  388. #pragma mark FPersistentConnectionDelegate methods
  389. - (void) onDataUpdate:(FPersistentConnection *)fpconnection forPath:(NSString *)pathString message:(id)data isMerge:(BOOL)isMerge tagId:(NSNumber *)tagId {
  390. FFLog(@"I-RDB038013", @"onDataUpdateForPath: %@ withMessage: %@", pathString, data);
  391. // For testing.
  392. self.dataUpdateCount++;
  393. FPath* path = [[FPath alloc] initWith:pathString];
  394. data = self.interceptServerDataCallback ? self.interceptServerDataCallback(pathString, data) : data;
  395. NSArray *events = nil;
  396. if (tagId != nil) {
  397. if (isMerge) {
  398. NSDictionary *message = data;
  399. FCompoundWrite *taggedChildren = [FCompoundWrite compoundWriteWithValueDictionary:message];
  400. events = [self.serverSyncTree applyTaggedQueryMergeAtPath:path changedChildren:taggedChildren tagId:tagId];
  401. } else {
  402. id<FNode> taggedSnap = [FSnapshotUtilities nodeFrom:data];
  403. events = [self.serverSyncTree applyTaggedQueryOverwriteAtPath:path newData:taggedSnap tagId:tagId];
  404. }
  405. } else if (isMerge) {
  406. NSDictionary *message = data;
  407. FCompoundWrite *changedChildren = [FCompoundWrite compoundWriteWithValueDictionary:message];
  408. events = [self.serverSyncTree applyServerMergeAtPath:path changedChildren:changedChildren];
  409. } else {
  410. id<FNode> snap = [FSnapshotUtilities nodeFrom:data];
  411. events = [self.serverSyncTree applyServerOverwriteAtPath:path newData:snap];
  412. }
  413. if ([events count] > 0) {
  414. // Since we have a listener outstanding for each transaction, receiving any events
  415. // is a proxy for some change having occurred.
  416. [self rerunTransactionsForPath:path];
  417. }
  418. [self.eventRaiser raiseEvents:events];
  419. }
  420. - (void)onRangeMerge:(NSArray *)ranges forPath:(NSString *)pathString tagId:(NSNumber *)tag {
  421. FFLog(@"I-RDB038014", @"onRangeMerge: %@ => %@", pathString, ranges);
  422. // For testing
  423. self.rangeMergeUpdateCount++;
  424. FPath* path = [[FPath alloc] initWith:pathString];
  425. NSArray *events;
  426. if (tag != nil) {
  427. events = [self.serverSyncTree applyTaggedServerRangeMergeAtPath:path updates:ranges tagId:tag];
  428. } else {
  429. events = [self.serverSyncTree applyServerRangeMergeAtPath:path updates:ranges];
  430. }
  431. if (events.count > 0) {
  432. // Since we have a listener outstanding for each transaction, receiving any events
  433. // is a proxy for some change having occurred.
  434. [self rerunTransactionsForPath:path];
  435. }
  436. [self.eventRaiser raiseEvents:events];
  437. }
  438. - (void)onConnect:(FPersistentConnection *)fpconnection {
  439. [self updateInfo:kDotInfoConnected withValue:@true];
  440. }
  441. - (void)onDisconnect:(FPersistentConnection *)fpconnection {
  442. [self updateInfo:kDotInfoConnected withValue:@false];
  443. [self runOnDisconnectEvents];
  444. }
  445. - (void)onServerInfoUpdate:(FPersistentConnection *)fpconnection updates:(NSDictionary *)updates {
  446. for (NSString* key in updates) {
  447. id val = [updates objectForKey:key];
  448. [self updateInfo:key withValue:val];
  449. }
  450. }
  451. - (void) setupNotifications {
  452. NSString * const *backgroundConstant = (NSString * const *) dlsym(RTLD_DEFAULT, "UIApplicationDidEnterBackgroundNotification");
  453. if (backgroundConstant) {
  454. FFLog(@"I-RDB038015", @"Registering for background notification.");
  455. [[NSNotificationCenter defaultCenter] addObserver:self
  456. selector:@selector(didEnterBackground)
  457. name:*backgroundConstant
  458. object:nil];
  459. } else {
  460. FFLog(@"I-RDB038016", @"Skipped registering for background notification.");
  461. }
  462. }
  463. - (void) didEnterBackground {
  464. if (!self.config.persistenceEnabled)
  465. return;
  466. // Targetted compilation is ONLY for testing. UIKit is weak-linked in actual release build.
  467. #if TARGET_OS_IOS
  468. // The idea is to wait until any outstanding sets get written to disk. Since the sets might still be in our
  469. // dispatch queue, we wait for the dispatch queue to catch up and for persistence to catch up.
  470. // This may be undesirable though. The dispatch queue might just be processing a bunch of incoming data or
  471. // something. We might want to keep track of whether there are any unpersisted sets or something.
  472. FFLog(@"I-RDB038017", @"Entering background. Starting background task to finish work.");
  473. Class uiApplicationClass = NSClassFromString(@"UIApplication");
  474. assert(uiApplicationClass); // If we are here, we should be on iOS and UIApplication should be available.
  475. UIApplication *application = [uiApplicationClass sharedApplication];
  476. __block UIBackgroundTaskIdentifier bgTask = [application beginBackgroundTaskWithExpirationHandler:^{
  477. [application endBackgroundTask:bgTask];
  478. }];
  479. NSDate *start = [NSDate date];
  480. dispatch_async([FIRDatabaseQuery sharedQueue], ^{
  481. NSTimeInterval finishTime = [start timeIntervalSinceNow]*-1;
  482. FFLog(@"I-RDB038018", @"Background task completed. Queue time: %f", finishTime);
  483. [application endBackgroundTask:bgTask];
  484. });
  485. #endif
  486. }
  487. #pragma mark -
  488. #pragma mark Internal methods
  489. /**
  490. * Applies all the changes stored up in the onDisconnect tree
  491. */
  492. - (void) runOnDisconnectEvents {
  493. FFLog(@"I-RDB038019", @"Running onDisconnectEvents");
  494. NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock];
  495. FSparseSnapshotTree* resolvedTree = [FServerValues resolveDeferredValueTree:self.onDisconnect withServerValues:serverValues];
  496. NSMutableArray *events = [[NSMutableArray alloc] init];
  497. [resolvedTree forEachTreeAtPath:[FPath empty] do:^(FPath *path, id<FNode> node) {
  498. [events addObjectsFromArray:[self.serverSyncTree applyServerOverwriteAtPath:path newData:node]];
  499. FPath* affectedPath = [self abortTransactionsAtPath:path error:kFTransactionSet];
  500. [self rerunTransactionsForPath:affectedPath];
  501. }];
  502. self.onDisconnect = [[FSparseSnapshotTree alloc] init];
  503. [self.eventRaiser raiseEvents:events];
  504. }
  505. - (NSDictionary *) dumpListens {
  506. return [self.connection dumpListens];
  507. }
  508. #pragma mark -
  509. #pragma mark Transactions
  510. /**
  511. * Setup the transaction data structures
  512. */
  513. - (void) initTransactions {
  514. self.transactionQueueTree = [[FTree alloc] init];
  515. self.hijackHash = NO;
  516. self.loggedTransactionPersistenceWarning = NO;
  517. }
  518. /**
  519. * Creates a new transaction, add its to the transactions we're tracking, and sends it to the server if possible
  520. */
  521. - (void) startTransactionOnPath:(FPath *)path update:(fbt_transactionresult_mutabledata)update onComplete:(fbt_void_nserror_bool_datasnapshot)onComplete withLocalEvents:(BOOL)applyLocally {
  522. if (self.config.persistenceEnabled && !self.loggedTransactionPersistenceWarning) {
  523. self.loggedTransactionPersistenceWarning = YES;
  524. FFInfo(@"I-RDB038020", @"runTransactionBlock: usage detected while persistence is enabled. Please be aware that transactions "
  525. @"*will not* be persisted across app restarts. "
  526. @"See https://www.firebase.com/docs/ios/guide/offline-capabilities.html#section-handling-transactions-offline for more details.");
  527. }
  528. FIRDatabaseReference * watchRef = [[FIRDatabaseReference alloc] initWithRepo:self path:path];
  529. // make sure we're listening on this node
  530. // Note: we can't do this asynchronously. To preserve event ordering, it has to be done in this block.
  531. // This is ok, this block is guaranteed to be our own event loop
  532. NSUInteger handle = [[FUtilities LUIDGenerator] integerValue];
  533. fbt_void_datasnapshot cb = ^(FIRDataSnapshot *snapshot) {};
  534. FValueEventRegistration *registration = [[FValueEventRegistration alloc] initWithRepo:self
  535. handle:handle
  536. callback:cb
  537. cancelCallback:nil];
  538. [watchRef.repo addEventRegistration:registration forQuery:watchRef.querySpec];
  539. fbt_void_void unwatcher = ^{ [watchRef removeObserverWithHandle:handle]; };
  540. // Save all the data that represents this transaction
  541. FTupleTransaction* transaction = [[FTupleTransaction alloc] init];
  542. transaction.path = path;
  543. transaction.update = update;
  544. transaction.onComplete = onComplete;
  545. transaction.status = FTransactionInitializing;
  546. transaction.order = [FUtilities LUIDGenerator];
  547. transaction.applyLocally = applyLocally;
  548. transaction.retryCount = 0;
  549. transaction.unwatcher = unwatcher;
  550. transaction.currentWriteId = nil;
  551. transaction.currentInputSnapshot = nil;
  552. transaction.currentOutputSnapshotRaw = nil;
  553. transaction.currentOutputSnapshotResolved = nil;
  554. // Run transaction initially
  555. id<FNode> currentState = [self latestStateAtPath:path excludeWriteIds:nil];
  556. transaction.currentInputSnapshot = currentState;
  557. FIRMutableData * mutableCurrent = [[FIRMutableData alloc] initWithNode:currentState];
  558. FIRTransactionResult * result = transaction.update(mutableCurrent);
  559. if (!result.isSuccess) {
  560. // Abort the transaction
  561. transaction.unwatcher();
  562. transaction.currentOutputSnapshotRaw = nil;
  563. transaction.currentOutputSnapshotResolved = nil;
  564. if (transaction.onComplete) {
  565. FIRDatabaseReference *ref = [[FIRDatabaseReference alloc] initWithRepo:self path:transaction.path];
  566. FIndexedNode *indexedNode = [FIndexedNode indexedNodeWithNode:transaction.currentInputSnapshot];
  567. FIRDataSnapshot *snap = [[FIRDataSnapshot alloc] initWithRef:ref indexedNode:indexedNode];
  568. [self.eventRaiser raiseCallback:^{
  569. transaction.onComplete(nil, NO, snap);
  570. }];
  571. }
  572. } else {
  573. // Note: different from js. We don't need to validate, FIRMutableData does validation.
  574. // We also don't have to worry about priorities. Just mark as run and add to queue.
  575. transaction.status = FTransactionRun;
  576. FTree* queueNode = [self.transactionQueueTree subTree:transaction.path];
  577. NSMutableArray* nodeQueue = [queueNode getValue];
  578. if (nodeQueue == nil) {
  579. nodeQueue = [[NSMutableArray alloc] init];
  580. }
  581. [nodeQueue addObject:transaction];
  582. [queueNode setValue:nodeQueue];
  583. // Update visibleData and raise events
  584. // Note: We intentionally raise events after updating all of our transaction state, since the user could
  585. // start new transactions from the event callbacks
  586. NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock];
  587. id<FNode> newValUnresolved = [result.update nodeValue];
  588. id<FNode> newVal = [FServerValues resolveDeferredValueSnapshot:newValUnresolved withServerValues:serverValues];
  589. transaction.currentOutputSnapshotRaw = newValUnresolved;
  590. transaction.currentOutputSnapshotResolved = newVal;
  591. transaction.currentWriteId = [NSNumber numberWithInteger:[self nextWriteId]];
  592. NSArray *events = [self.serverSyncTree applyUserOverwriteAtPath:path newData:newVal
  593. writeId:[transaction.currentWriteId integerValue]
  594. isVisible:transaction.applyLocally];
  595. [self.eventRaiser raiseEvents:events];
  596. [self sendAllReadyTransactions];
  597. }
  598. }
  599. /**
  600. * @param writeIdsToExclude A specific set to exclude
  601. */
  602. - (id<FNode>) latestStateAtPath:(FPath *)path excludeWriteIds:(NSArray *)writeIdsToExclude {
  603. id<FNode> latestState = [self.serverSyncTree calcCompleteEventCacheAtPath:path excludeWriteIds:writeIdsToExclude];
  604. return latestState ? latestState : [FEmptyNode emptyNode];
  605. }
  606. /**
  607. * Sends any already-run transactions that aren't waiting for outstanding transactions to complete.
  608. *
  609. * Externally, call the version with no arguments.
  610. * Internally, calls itself recursively with a particular transactionQueueTree node to recurse through the tree
  611. */
  612. - (void) sendAllReadyTransactions {
  613. FTree* node = self.transactionQueueTree;
  614. [self pruneCompletedTransactionsBelowNode:node];
  615. [self sendReadyTransactionsForTree:node];
  616. }
  617. - (void) sendReadyTransactionsForTree:(FTree *)node {
  618. NSMutableArray* queue = [node getValue];
  619. if (queue != nil) {
  620. queue = [self buildTransactionQueueAtNode:node];
  621. NSAssert([queue count] > 0, @"Sending zero length transaction queue");
  622. NSUInteger notRunIndex = [queue indexOfObjectPassingTest:^BOOL(id obj, NSUInteger idx, BOOL *stop) {
  623. return ((FTupleTransaction*)obj).status != FTransactionRun;
  624. }];
  625. // If they're all run (and not sent), we can send them. Else, we must wait.
  626. if (notRunIndex == NSNotFound) {
  627. [self sendTransactionQueue:queue atPath:node.path];
  628. }
  629. } else if ([node hasChildren]) {
  630. [node forEachChild:^(FTree *child) {
  631. [self sendReadyTransactionsForTree:child];
  632. }];
  633. }
  634. }
  635. /**
  636. * Given a list of run transactions, send them to the server and then handle the result (success or failure).
  637. */
  638. - (void) sendTransactionQueue:(NSMutableArray *)queue atPath:(FPath *)path {
  639. // Mark transactions as sent and bump the retry count
  640. NSMutableArray *writeIdsToExclude = [[NSMutableArray alloc] init];
  641. for (FTupleTransaction *transaction in queue) {
  642. [writeIdsToExclude addObject:transaction.currentWriteId];
  643. }
  644. id<FNode> latestState = [self latestStateAtPath:path excludeWriteIds:writeIdsToExclude];
  645. id<FNode> snapToSend = latestState;
  646. NSString *latestHash = [latestState dataHash];
  647. for (FTupleTransaction* transaction in queue) {
  648. NSAssert(transaction.status == FTransactionRun, @"[FRepo sendTransactionQueue:] items in queue should all be run.");
  649. FFLog(@"I-RDB038021", @"Transaction at %@ set to SENT", transaction.path);
  650. transaction.status = FTransactionSent;
  651. transaction.retryCount++;
  652. FPath *relativePath = [FPath relativePathFrom:path to:transaction.path];
  653. // If we've gotten to this point, the output snapshot must be defined.
  654. snapToSend = [snapToSend updateChild:relativePath withNewChild:transaction.currentOutputSnapshotRaw];
  655. }
  656. id dataToSend = [snapToSend valForExport:YES];
  657. NSString *pathToSend = [path description];
  658. latestHash = self.hijackHash ? @"badhash" : latestHash;
  659. // Send the put
  660. [self.connection putData:dataToSend forPath:pathToSend withHash:latestHash withCallback:^(NSString *status, NSString *errorReason) {
  661. FFLog(@"I-RDB038022", @"Transaction put response: %@ : %@", pathToSend, status);
  662. NSMutableArray *events = [[NSMutableArray alloc] init];
  663. if ([status isEqualToString:kFWPResponseForActionStatusOk]) {
  664. // Queue up the callbacks and fire them after cleaning up all of our transaction state, since
  665. // the callback could trigger more transactions or sets.
  666. NSMutableArray *callbacks = [[NSMutableArray alloc] init];
  667. for (FTupleTransaction *transaction in queue) {
  668. transaction.status = FTransactionCompleted;
  669. [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
  670. revert:NO
  671. persist:NO
  672. clock:self.serverClock]];
  673. if (transaction.onComplete) {
  674. // We never unset the output snapshot, and given that this transaction is complete, it should be set
  675. id <FNode> node = transaction.currentOutputSnapshotResolved;
  676. FIndexedNode *indexedNode = [FIndexedNode indexedNodeWithNode:node];
  677. FIRDatabaseReference *ref = [[FIRDatabaseReference alloc] initWithRepo:self path:transaction.path];
  678. FIRDataSnapshot *snapshot = [[FIRDataSnapshot alloc] initWithRef:ref indexedNode:indexedNode];
  679. fbt_void_void cb = ^{
  680. transaction.onComplete(nil, YES, snapshot);
  681. };
  682. [callbacks addObject:[cb copy]];
  683. }
  684. transaction.unwatcher();
  685. }
  686. // Now remove the completed transactions.
  687. [self pruneCompletedTransactionsBelowNode:[self.transactionQueueTree subTree:path]];
  688. // There may be pending transactions that we can now send.
  689. [self sendAllReadyTransactions];
  690. // Finally, trigger onComplete callbacks
  691. [self.eventRaiser raiseCallbacks:callbacks];
  692. } else {
  693. // transactions are no longer sent. Update their status appropriately.
  694. if ([status isEqualToString:kFWPResponseForActionStatusDataStale]) {
  695. for (FTupleTransaction *transaction in queue) {
  696. if (transaction.status == FTransactionSentNeedsAbort) {
  697. transaction.status = FTransactionNeedsAbort;
  698. } else {
  699. transaction.status = FTransactionRun;
  700. }
  701. }
  702. } else {
  703. FFWarn(@"I-RDB038023", @"runTransactionBlock: at %@ failed: %@", path, status);
  704. for (FTupleTransaction *transaction in queue) {
  705. transaction.status = FTransactionNeedsAbort;
  706. [transaction setAbortStatus:status reason:errorReason];
  707. }
  708. }
  709. }
  710. [self rerunTransactionsForPath:path];
  711. [self.eventRaiser raiseEvents:events];
  712. }];
  713. }
  714. /**
  715. * Finds all transactions dependent on the data at changed Path and reruns them.
  716. *
  717. * Should be called any time cached data changes.
  718. *
  719. * Return the highest path that was affected by rerunning transactions. This is the path at which events need to
  720. * be raised for.
  721. */
  722. - (FPath *) rerunTransactionsForPath:(FPath *)changedPath {
  723. // For the common case that there are no transactions going on, skip all this!
  724. if ([self.transactionQueueTree isEmpty]) {
  725. return changedPath;
  726. } else {
  727. FTree* rootMostTransactionNode = [self getAncestorTransactionNodeForPath:changedPath];
  728. FPath* path = rootMostTransactionNode.path;
  729. NSArray* queue = [self buildTransactionQueueAtNode:rootMostTransactionNode];
  730. [self rerunTransactionQueue:queue atPath:path];
  731. return path;
  732. }
  733. }
  734. /**
  735. * Does all the work of rerunning transactions (as well as cleans up aborted transactions and whatnot).
  736. */
  737. - (void) rerunTransactionQueue:(NSArray *)queue atPath:(FPath *)path {
  738. if (queue.count == 0) {
  739. return; // nothing to do
  740. }
  741. // Queue up the callbacks and fire them after cleaning up all of our transaction state, since
  742. // the callback could trigger more transactions or sets.
  743. NSMutableArray *events = [[NSMutableArray alloc] init];
  744. NSMutableArray *callbacks = [[NSMutableArray alloc] init];
  745. // Ignore, by default, all of the sets in this queue, since we're re-running all of them. However, we want to include
  746. // the results of new sets triggered as part of this re-run, so we don't want to ignore a range, just these specific
  747. // sets.
  748. NSMutableArray *writeIdsToExclude = [[NSMutableArray alloc] init];
  749. for (FTupleTransaction *transaction in queue) {
  750. [writeIdsToExclude addObject:transaction.currentWriteId];
  751. }
  752. for (FTupleTransaction* transaction in queue) {
  753. FPath* relativePath __unused = [FPath relativePathFrom:path to:transaction.path];
  754. BOOL abortTransaction = NO;
  755. NSAssert(relativePath != nil, @"[FRepo rerunTransactionsQueue:] relativePath should not be null.");
  756. if (transaction.status == FTransactionNeedsAbort) {
  757. abortTransaction = YES;
  758. if (![transaction.abortStatus isEqualToString:kFErrorWriteCanceled]) {
  759. NSArray *ackEvents = [self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
  760. revert:YES
  761. persist:NO
  762. clock:self.serverClock];
  763. [events addObjectsFromArray:ackEvents];
  764. }
  765. } else if (transaction.status == FTransactionRun) {
  766. if (transaction.retryCount >= kFTransactionMaxRetries) {
  767. abortTransaction = YES;
  768. [transaction setAbortStatus:kFTransactionTooManyRetries reason:nil];
  769. [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
  770. revert:YES
  771. persist:NO
  772. clock:self.serverClock]];
  773. } else {
  774. // This code reruns a transaction
  775. id<FNode> currentNode = [self latestStateAtPath:transaction.path excludeWriteIds:writeIdsToExclude];
  776. transaction.currentInputSnapshot = currentNode;
  777. FIRMutableData * mutableCurrent = [[FIRMutableData alloc] initWithNode:currentNode];
  778. FIRTransactionResult * result = transaction.update(mutableCurrent);
  779. if (result.isSuccess) {
  780. NSNumber *oldWriteId = transaction.currentWriteId;
  781. NSDictionary* serverValues = [FServerValues generateServerValues:self.serverClock];
  782. id<FNode> newVal = [result.update nodeValue];
  783. id<FNode> newValResolved = [FServerValues resolveDeferredValueSnapshot:newVal withServerValues:serverValues];
  784. transaction.currentOutputSnapshotRaw = newVal;
  785. transaction.currentOutputSnapshotResolved = newValResolved;
  786. transaction.currentWriteId = [NSNumber numberWithInteger:[self nextWriteId]];
  787. // Mutates writeIdsToExclude in place
  788. [writeIdsToExclude removeObject:oldWriteId];
  789. [events addObjectsFromArray:[self.serverSyncTree applyUserOverwriteAtPath:transaction.path
  790. newData:transaction.currentOutputSnapshotResolved
  791. writeId:[transaction.currentWriteId integerValue]
  792. isVisible:transaction.applyLocally]];
  793. [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[oldWriteId integerValue]
  794. revert:YES
  795. persist:NO
  796. clock:self.serverClock]];
  797. } else {
  798. abortTransaction = YES;
  799. // The user aborted the transaction. JS treats ths as a "nodata" abort, but it's not an error, so we don't send them an error.
  800. [transaction setAbortStatus:nil reason:nil];
  801. [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
  802. revert:YES
  803. persist:NO
  804. clock:self.serverClock]];
  805. }
  806. }
  807. }
  808. [self.eventRaiser raiseEvents:events];
  809. events = nil;
  810. if (abortTransaction) {
  811. // Abort
  812. transaction.status = FTransactionCompleted;
  813. transaction.unwatcher();
  814. if (transaction.onComplete) {
  815. FIRDatabaseReference * ref = [[FIRDatabaseReference alloc] initWithRepo:self path:transaction.path];
  816. FIndexedNode *lastInput = [FIndexedNode indexedNodeWithNode:transaction.currentInputSnapshot];
  817. FIRDataSnapshot * snap = [[FIRDataSnapshot alloc] initWithRef:ref indexedNode:lastInput];
  818. fbt_void_void cb = ^{
  819. // Unlike JS, no need to check for "nodata" because ObjC has abortError = nil
  820. transaction.onComplete(transaction.abortError, NO, snap);
  821. };
  822. [callbacks addObject:[cb copy]];
  823. }
  824. }
  825. }
  826. // Note: unlike current js client, we don't need to preserve priority. Users can set priority via FIRMutableData
  827. // Clean up completed transactions.
  828. [self pruneCompletedTransactionsBelowNode:self.transactionQueueTree];
  829. // Now fire callbacks, now that we're in a good, known state.
  830. [self.eventRaiser raiseCallbacks:callbacks];
  831. // Try to send the transaction result to the server
  832. [self sendAllReadyTransactions];
  833. }
  834. - (FTree *) getAncestorTransactionNodeForPath:(FPath *)path {
  835. FTree* transactionNode = self.transactionQueueTree;
  836. while (![path isEmpty] && [transactionNode getValue] == nil) {
  837. NSString* front = [path getFront];
  838. transactionNode = [transactionNode subTree:[[FPath alloc] initWith:front]];
  839. path = [path popFront];
  840. }
  841. return transactionNode;
  842. }
  843. - (NSMutableArray *) buildTransactionQueueAtNode:(FTree *)node {
  844. NSMutableArray* queue = [[NSMutableArray alloc] init];
  845. [self aggregateTransactionQueuesForNode:node andQueue:queue];
  846. [queue sortUsingComparator:^NSComparisonResult(FTupleTransaction* obj1, FTupleTransaction* obj2) {
  847. return [obj1.order compare:obj2.order];
  848. }];
  849. return queue;
  850. }
  851. - (void) aggregateTransactionQueuesForNode:(FTree *)node andQueue:(NSMutableArray *)queue {
  852. NSArray* nodeQueue = [node getValue];
  853. [queue addObjectsFromArray:nodeQueue];
  854. [node forEachChild:^(FTree *child) {
  855. [self aggregateTransactionQueuesForNode:child andQueue:queue];
  856. }];
  857. }
  858. /**
  859. * Remove COMPLETED transactions at or below this node in the transactionQueueTree
  860. */
  861. - (void) pruneCompletedTransactionsBelowNode:(FTree *)node {
  862. NSMutableArray* queue = [node getValue];
  863. if (queue != nil) {
  864. int i = 0;
  865. // remove all of the completed transactions from the queue
  866. while (i < queue.count) {
  867. FTupleTransaction* transaction = [queue objectAtIndex:i];
  868. if (transaction.status == FTransactionCompleted) {
  869. [queue removeObjectAtIndex:i];
  870. } else {
  871. i++;
  872. }
  873. }
  874. if (queue.count > 0) {
  875. [node setValue:queue];
  876. } else {
  877. [node setValue:nil];
  878. }
  879. }
  880. [node forEachChildMutationSafe:^(FTree *child) {
  881. [self pruneCompletedTransactionsBelowNode:child];
  882. }];
  883. }
  884. /**
  885. * Aborts all transactions on ancestors or descendants of the specified path. Called when doing a setValue: or
  886. * updateChildValues: since we consider them incompatible with transactions
  887. *
  888. * @param path path for which we want to abort related transactions.
  889. */
  890. - (FPath *) abortTransactionsAtPath:(FPath *)path error:(NSString *)error {
  891. // For the common case that there are no transactions going on, skip all this!
  892. if ([self.transactionQueueTree isEmpty]) {
  893. return path;
  894. } else {
  895. FPath* affectedPath = [self getAncestorTransactionNodeForPath:path].path;
  896. FTree* transactionNode = [self.transactionQueueTree subTree:path];
  897. [transactionNode forEachAncestor:^BOOL(FTree *ancestor) {
  898. [self abortTransactionsAtNode:ancestor error:error];
  899. return NO;
  900. }];
  901. [self abortTransactionsAtNode:transactionNode error:error];
  902. [transactionNode forEachDescendant:^(FTree *child) {
  903. [self abortTransactionsAtNode:child error:error];
  904. }];
  905. return affectedPath;
  906. }
  907. }
  908. /**
  909. * Abort transactions stored in this transactions queue node.
  910. *
  911. * @param node Node to abort transactions for.
  912. */
  913. - (void) abortTransactionsAtNode:(FTree *)node error:(NSString *)error {
  914. NSMutableArray* queue = [node getValue];
  915. if (queue != nil) {
  916. // Queue up the callbacks and fire them after cleaning up all of our transaction state, since
  917. // can be immediately aborted and removed.
  918. NSMutableArray* callbacks = [[NSMutableArray alloc] init];
  919. // Go through queue. Any already-sent transactions must be marked for abort, while the unsent ones
  920. // can be immediately aborted and removed
  921. NSMutableArray *events = [[NSMutableArray alloc] init];
  922. int lastSent = -1;
  923. // Note: all of the sent transactions will be at the front of the queue, so safe to increment lastSent
  924. for (FTupleTransaction* transaction in queue) {
  925. if (transaction.status == FTransactionSentNeedsAbort) {
  926. // No-op. already marked.
  927. } else if (transaction.status == FTransactionSent) {
  928. // Mark this transaction for abort when it returns
  929. lastSent++;
  930. transaction.status = FTransactionSentNeedsAbort;
  931. [transaction setAbortStatus:error reason:nil];
  932. } else {
  933. // we can abort this immediately
  934. transaction.unwatcher();
  935. if ([error isEqualToString:kFTransactionSet]) {
  936. [events addObjectsFromArray:[self.serverSyncTree ackUserWriteWithWriteId:[transaction.currentWriteId integerValue]
  937. revert:YES
  938. persist:NO
  939. clock:self.serverClock]];
  940. } else {
  941. // If it was cancelled it was already removed from the sync tree, no need to ack
  942. NSAssert([error isEqualToString:kFErrorWriteCanceled], nil);
  943. }
  944. if (transaction.onComplete) {
  945. NSError* abortReason = [FUtilities errorForStatus:error andReason:nil];
  946. FIRDataSnapshot * snapshot = nil;
  947. fbt_void_void cb = ^{
  948. transaction.onComplete(abortReason, NO, snapshot);
  949. };
  950. [callbacks addObject:[cb copy]];
  951. }
  952. }
  953. }
  954. if (lastSent == -1) {
  955. // We're not waiting for any sent transactions. We can clear the queue.
  956. [node setValue:nil];
  957. } else {
  958. // Remove the transactions we aborted
  959. NSRange theRange;
  960. theRange.location = lastSent + 1;
  961. theRange.length = queue.count - theRange.location;
  962. [queue removeObjectsInRange:theRange];
  963. }
  964. // Now fire the callbacks
  965. [self.eventRaiser raiseEvents:events];
  966. [self.eventRaiser raiseCallbacks:callbacks];
  967. }
  968. }
  969. @end