|
|
@@ -103,6 +103,10 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private func withError(_ message: String) -> Pipeline {
|
|
|
+ return Pipeline(stages: [], db: db, errorMessage: message)
|
|
|
+ }
|
|
|
+
|
|
|
/// Executes the defined pipeline and returns a `Pipeline.Snapshot` containing the results.
|
|
|
///
|
|
|
/// This method asynchronously sends the pipeline definition to Firestore for execution.
|
|
|
@@ -163,11 +167,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func addFields(_ selectables: [Selectable]) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let addFieldsStage = AddFields(selectables: selectables)
|
|
|
if let errorMessage = addFieldsStage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
return Pipeline(stages: stages + [addFieldsStage], db: db)
|
|
|
}
|
|
|
@@ -185,11 +189,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func removeFields(_ fields: [Field]) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = RemoveFieldsStage(fields: fields)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(
|
|
|
stages: stages + [stage],
|
|
|
@@ -211,11 +215,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func removeFields(_ fields: [String]) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = RemoveFieldsStage(fields: fields)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(
|
|
|
stages: stages + [stage],
|
|
|
@@ -251,11 +255,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func select(_ selections: [Selectable]) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let selectStage = Select(selections: selections)
|
|
|
if let errorMessage = selectStage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
return Pipeline(stages: stages + [selectStage], db: db)
|
|
|
}
|
|
|
@@ -276,12 +280,12 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func select(_ selections: [String]) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let selections = selections.map { Field($0) }
|
|
|
let stage = Select(selections: selections)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(
|
|
|
stages: stages + [stage],
|
|
|
@@ -312,11 +316,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func `where`(_ condition: BooleanExpression) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = Where(condition: condition)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -342,11 +346,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func offset(_ offset: Int32) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = Offset(offset)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -373,11 +377,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func limit(_ limit: Int32) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = Limit(limit)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -401,12 +405,12 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func distinct(_ groups: [String]) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let selections = groups.map { Field($0) }
|
|
|
let stage = Distinct(groups: selections)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -438,11 +442,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func distinct(_ groups: [Selectable]) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let distinctStage = Distinct(groups: groups)
|
|
|
if let errorMessage = distinctStage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
return Pipeline(stages: stages + [distinctStage], db: db)
|
|
|
}
|
|
|
@@ -480,11 +484,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
public func aggregate(_ aggregates: [AliasedAggregate],
|
|
|
groups: [Selectable]? = nil) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let aggregateStage = Aggregate(accumulators: aggregates, groups: groups)
|
|
|
if let errorMessage = aggregateStage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
return Pipeline(stages: stages + [aggregateStage], db: db)
|
|
|
}
|
|
|
@@ -520,7 +524,7 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
limit: Int? = nil,
|
|
|
distanceField: String? = nil) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = FindNearest(
|
|
|
field: field,
|
|
|
@@ -530,7 +534,7 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
distanceField: distanceField
|
|
|
)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -556,11 +560,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func sort(_ orderings: [Ordering]) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = Sort(orderings: orderings)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -588,11 +592,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func replace(with expression: Expression) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = ReplaceWith(expr: expression)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -621,11 +625,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func replace(with fieldName: String) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = ReplaceWith(expr: Field(fieldName))
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -648,11 +652,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func sample(count: Int64) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = Sample(count: count)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -675,11 +679,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func sample(percentage: Double) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = Sample(percentage: percentage)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -706,11 +710,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func union(with other: Pipeline) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = Union(other: other)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -756,11 +760,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
/// - Returns: A new `Pipeline` object with this stage appended.
|
|
|
public func unnest(_ field: Selectable, indexField: String? = nil) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = Unnest(field: field, indexField: indexField)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|
|
|
@@ -794,11 +798,11 @@ public struct Pipeline: @unchecked Sendable {
|
|
|
public func rawStage(name: String, params: [Sendable],
|
|
|
options: [String: Sendable]? = nil) -> Pipeline {
|
|
|
if let errorMessage = errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
}
|
|
|
let stage = RawStage(name: name, params: params, options: options)
|
|
|
if let errorMessage = stage.errorMessage {
|
|
|
- return Pipeline(stages: [], db: db, errorMessage: errorMessage)
|
|
|
+ return withError(errorMessage)
|
|
|
} else {
|
|
|
return Pipeline(stages: stages + [stage], db: db)
|
|
|
}
|