Interface PipelineStage
A PipelineStage can be seen as a data processing node that:
- gets data from a source: either a Tables.Table or another PipelineStage
- transforms it
- provides the transformed data as output, which can be used either as input for another pipeline stage or fetched directly using stream(Function)
A PipelineStage can be instantiated via the QueryApi.source(Tables.Table, List, Expression), QueryApi.source(Tables.Table, List) or QueryApi.source(Tables.Table) methods.
Once instantiated, another data transformation stage can be applied to its output, such as:
- innerJoin(Tables.Table, Function, Function)
- leftOuterJoin(Tables.Table, Function, Function)
- aggregateBy(Function, Function)
- aggregate(Function)
- filter(Function)
- sortBy(Function)
- take(int)
- distinct()
- addColumns(Function)
- retainColumns(Function)
- removeColumns(Function)
Each of these methods returns a PipelineStage, so that multiple stages can be chained to form a data transformation pipeline.
Once all stages are specified, the stream(Function) method can be used to query and fetch data from the database.
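The chaining model can be pictured with a small in-memory Java sketch. It is not how QueryApi executes a pipeline: queryString() documents that all stages are combined into the query sent to the database. The `Stage` class, its methods and the Map-based rows are hypothetical stand-ins. In this sketch, each call returns a new stage that records one more step, and nothing runs until `stream` is called. The stream is then closed once the consumer function returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

// Hypothetical in-memory model of a chainable pipeline; NOT how QueryApi works internally.
final class Stage {
    private final List<Map<String, Object>> source;                      // rows of the source "table"
    private final List<UnaryOperator<Stream<Map<String, Object>>>> ops;  // recorded stages, in order

    private Stage(List<Map<String, Object>> source,
                  List<UnaryOperator<Stream<Map<String, Object>>>> ops) {
        this.source = source;
        this.ops = ops;
    }

    static Stage source(List<Map<String, Object>> rows) {
        return new Stage(rows, List.of());
    }

    // Records one more stage and returns a new Stage; the receiver is left unchanged.
    private Stage then(UnaryOperator<Stream<Map<String, Object>>> op) {
        List<UnaryOperator<Stream<Map<String, Object>>>> next = new ArrayList<>(ops);
        next.add(op);
        return new Stage(source, next);
    }

    Stage filter(Predicate<Map<String, Object>> condition) {
        return then(s -> s.filter(condition));
    }

    Stage take(int maxResults) {
        return then(s -> s.limit(maxResults));
    }

    // Nothing is evaluated until stream() applies the recorded stages and lends the result to `consumer`.
    <T> T stream(Function<Stream<Map<String, Object>>, T> consumer) {
        Stream<Map<String, Object>> s = source.stream();
        for (UnaryOperator<Stream<Map<String, Object>>> op : ops) {
            s = op.apply(s);
        }
        try (Stream<Map<String, Object>> closing = s) {
            return consumer.apply(closing);
        }
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
                Map.of("sku", "A", "Cost", 3),
                Map.of("sku", "B", "Cost", 1));
        Stage base = Stage.source(rows);
        Stage cheap = base.filter(r -> (Integer) r.get("Cost") < 2);
        System.out.println(cheap.stream(s -> s.count()));  // 1
        System.out.println(base.stream(s -> s.count()));   // 2: base was not modified
    }
}
```

Keeping every stage immutable means a partially built pipeline can be reused as the start of several different pipelines.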
Here is an example of a data pipeline chaining an innerJoin, a filter and finally an aggregation stage.
def date = new Date()
def qapi = api.queryApi()
def products = qapi.tables().products()
def pExtCost = qapi.tables().productExtensionRows("Cost")
return qapi.source(products, [products.sku(), products.ProductGroup])
    // joins the "Cost" product extension, also keeping its validity dates for the filter stage
    .innerJoin(
        pExtCost,
        { prev -> [pExtCost.Cost, pExtCost.Currency, pExtCost.ValidFrom, pExtCost.ValidTo] },
        { prev -> pExtCost.sku().equal(prev.sku) }
    )
    // retains only the cost entries valid at the current date
    .filter({ prev ->
        qapi.exprs().and(
            prev.ValidFrom.lessOrEqual(date),
            prev.ValidTo.greaterOrEqual(date)
        )
    })
    // computes the average cost by ProductGroup and Currency
    .aggregateBy(
        { prev -> [prev.ProductGroup, prev.Currency] },
        { prev -> [qapi.exprs().avg(prev.Cost).as("averageCost")] }
    )
    // fetches the result and does something with it
    .stream { it.each { row ->
        // do something with the row
    }}
- Since:
- 14.0 - Caribou Lou
-
Nested Class Summary
- static interface PipelineStage.ResultStream - The result type to be consumed by the function given to stream(Function).
-
Method Summary
- PipelineStage addColumns(Function<Tables.Columns, List<? extends Selectable>> expressions) - Adds a stage that adds columns to this stage output.
- PipelineStage aggregate(Function<Tables.Columns, List<? extends Selectable>> selectables) - Adds a stage that aggregates all the rows of this stage output.
- PipelineStage aggregateBy(Function<Tables.Columns, List<? extends Expression>> by, Function<Tables.Columns, List<? extends Selectable>> selectables) - Adds a stage that aggregates the output of this stage by a given set of expressions.
- PipelineStage distinct() - Adds a stage that removes duplicate rows from this stage output row set.
- PipelineStage filter(Function<Tables.Columns, Expression> condition) - Adds a stage that only keeps the rows of this stage output that satisfy the given expression.
- PipelineStage innerJoin(Tables.Table table, Function<Tables.Columns, List<? extends Selectable>> selectables, Function<Tables.Columns, Expression> joinCriteria) - Adds an innerJoin stage that joins this PipelineStage to another Tables.Table.
- PipelineStage leftOuterJoin(Tables.Table table, Function<Tables.Columns, List<? extends Selectable>> selectables, Function<Tables.Columns, Expression> joinCriteria) - Adds a leftOuterJoin stage that joins another Tables.Table to this PipelineStage.
- String queryString() - Builds up the query string resulting from all the stages of this pipeline.
- PipelineStage removeColumns(Function<Tables.Columns, List<? extends Selectable>> columns) - Adds a stage that removes the given columns from this stage output.
- PipelineStage retainColumns(Function<Tables.Columns, List<? extends Selectable>> columns) - Adds a stage that keeps only the given columns from this stage output.
- PipelineStage sortBy(Function<Tables.Columns, List<Orders.Order>> orders) - Adds a stage that sorts the rows of this stage output.
- <T> T stream(Function<PipelineStage.ResultStream, T> function) - Queries the database, fetches the result as a stream and provides it to the given consumer function.
- PipelineStage take(int maxResults) - Adds a stage that only returns the first maxResults rows of this pipeline stage output.
- PipelineStage traceQuery() - Prints the result of queryString() into the current logic execution trace using PublicGroovyAPI.trace(String, String, Object).
-
Method Details
-
innerJoin
PipelineStage innerJoin(Tables.Table table, Function<Tables.Columns, List<? extends Selectable>> selectables, Function<Tables.Columns, Expression> joinCriteria)
Adds an innerJoin stage that joins this PipelineStage to another Tables.Table.
As with an SQL join:
- the list of columns or expressions to be joined to the previous pipeline stage should be provided by the selectables parameter.
- the joining criteria should be provided as a boolean expression by the joinCriteria parameter.
⚠ It is strongly recommended that the joining conditions use all fields of the database primary index.
Here is an example of a simple innerJoin:
def date = new Date()
def qapi = api.queryApi()
def products = qapi.tables().products()
def pExtCost = qapi.tables().productExtensionRows("Cost")
return qapi.source(products, [products.sku(), products.ProductGroup])
    // joins the "Cost" product extension rows valid at the current date
    .innerJoin(
        pExtCost,
        { prev -> [pExtCost.Cost, pExtCost.Currency] },
        { prev -> qapi.exprs().and(
            pExtCost.sku().equal(prev.sku),
            pExtCost.ValidFrom.lessOrEqual(date),
            pExtCost.ValidTo.greaterOrEqual(date)
        )}
    )
    // computes the average cost by ProductGroup and Currency
    .aggregateBy(
        { prev -> [prev.ProductGroup, prev.Currency] },
        { prev -> [qapi.exprs().avg(prev.Cost).as("averageCost")] }
    )
    // fetches the result and does something with it
    .stream { it.each { row ->
        // do something with the row
    }}
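The row semantics of an inner join can be sketched in memory. Only pairs of rows satisfying the join criteria are kept, and the selected columns of the joined table are added to the previous stage's columns. The `JoinSketch` class, its parameters and the Map-based rows are hypothetical. QueryApi performs the join in the database, and this nested-loop version says nothing about how the database actually executes it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical nested-loop model of INNER JOIN semantics; not the QueryApi implementation.
final class JoinSketch {
    static List<Map<String, Object>> innerJoin(
            List<Map<String, Object>> prev,    // previous stage output rows
            List<Map<String, Object>> table,   // rows of the joined table
            List<String> selectables,          // columns of `table` added to the output
            BiPredicate<Map<String, Object>, Map<String, Object>> joinCriteria) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> p : prev) {
            for (Map<String, Object> t : table) {
                if (joinCriteria.test(p, t)) {
                    Map<String, Object> row = new HashMap<>(p);  // keep the previous stage columns
                    for (String col : selectables) {
                        row.put(col, t.get(col));
                    }
                    out.add(row);
                }
            }
        }
        return out;  // previous rows without any match are dropped
    }

    public static void main(String[] args) {
        List<Map<String, Object>> products = List.of(
                Map.of("sku", "A", "ProductGroup", "G1"),
                Map.of("sku", "B", "ProductGroup", "G2"));
        List<Map<String, Object>> cost = List.of(
                Map.of("sku", "A", "Cost", 10.0, "Currency", "EUR"));
        List<Map<String, Object>> joined = innerJoin(products, cost,
                List.of("Cost", "Currency"),
                (p, t) -> p.get("sku").equals(t.get("sku")));
        System.out.println(joined.size());              // 1: sku "B" has no cost row
        System.out.println(joined.get(0).get("Cost"));  // 10.0
    }
}
```

This also shows why the join criteria should be selective. Every extra match multiplies the rows of the previous stage.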
- Parameters:
table - the Table to be joined to
selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
joinCriteria - a function getting the reference to this stage output columns and returning the joining criteria
- Returns:
- a PipelineStage as a result of applying this innerJoin stage
- Since:
- 14.0 - Caribou Lou
-
leftOuterJoin
PipelineStage leftOuterJoin(Tables.Table table, Function<Tables.Columns, List<? extends Selectable>> selectables, Function<Tables.Columns, Expression> joinCriteria)
Adds a leftOuterJoin stage that joins another Tables.Table to this PipelineStage.
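Assuming leftOuterJoin follows the usual SQL LEFT OUTER JOIN semantics, every row of this stage output is kept, and rows without a match get null for the selected columns. Here is a hypothetical in-memory sketch of that behavior; the class, parameters and data are made up, and it does not reflect how QueryApi executes the join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical model of LEFT OUTER JOIN semantics; not the QueryApi implementation.
final class LeftJoinSketch {
    static List<Map<String, Object>> leftOuterJoin(
            List<Map<String, Object>> prev,    // previous stage output rows
            List<Map<String, Object>> table,   // rows of the joined table
            List<String> selectables,          // columns of `table` added to the output
            BiPredicate<Map<String, Object>, Map<String, Object>> joinCriteria) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> p : prev) {
            boolean matched = false;
            for (Map<String, Object> t : table) {
                if (joinCriteria.test(p, t)) {
                    matched = true;
                    Map<String, Object> row = new HashMap<>(p);
                    for (String col : selectables) {
                        row.put(col, t.get(col));
                    }
                    out.add(row);
                }
            }
            if (!matched) {
                // keep the unmatched row, with nulls for the joined columns (HashMap accepts null values)
                Map<String, Object> row = new HashMap<>(p);
                for (String col : selectables) {
                    row.put(col, null);
                }
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> products = List.of(Map.of("sku", "A"), Map.of("sku", "B"));
        List<Map<String, Object>> cost = List.of(Map.of("sku", "A", "Cost", 10.0));
        List<Map<String, Object>> joined = leftOuterJoin(products, cost, List.of("Cost"),
                (p, t) -> p.get("sku").equals(t.get("sku")));
        System.out.println(joined.size());              // 2: sku "B" is kept
        System.out.println(joined.get(1).get("Cost"));  // null
    }
}
```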
- Parameters:
table - the Table to be joined to
selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
joinCriteria - the joining criteria
- Returns:
- a PipelineStage as a result of applying this leftOuterJoin stage
- Since:
- 14.0 - Caribou Lou
-
aggregateBy
PipelineStage aggregateBy(Function<Tables.Columns, List<? extends Expression>> by, Function<Tables.Columns, List<? extends Selectable>> selectables)
Adds a stage that aggregates the output of this stage by a given set of expressions. It is similar to the standard SQL GROUP BY clause.
As an example, here is a pipeline computing the minimum competition price for each sku:
def qapi = api.queryApi()
def competition = qapi.tables().productExtension("Competition")
def exprs = qapi.exprs()
return qapi.source(competition, [competition.sku(), competition.Price])
    .aggregateBy(
        { prev -> [prev.sku] },
        { prev -> [prev.sku, exprs.min(prev.Price).as("MinPrice")] }
    )
    .stream { it.each { row ->
        // do something with the row
    }}
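Since aggregateBy is described as the equivalent of a GROUP BY clause, the competition example above roughly corresponds to `SELECT sku, MIN(Price) AS MinPrice ... GROUP BY sku`. The following Java sketch (Java 16+, for the record) reproduces that grouping in memory with `Collectors.groupingBy`. The `CompetitionRow` record and the data are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical in-memory analogue of the aggregateBy example; not the QueryApi implementation.
final class MinPriceSketch {
    record CompetitionRow(String sku, double price) {}

    static Map<String, Double> minPriceBySku(List<CompetitionRow> rows) {
        // the aggregated selectable: MIN(price) within each group
        Collector<CompetitionRow, ?, Double> minPrice = Collectors.collectingAndThen(
                Collectors.minBy(Comparator.comparingDouble(CompetitionRow::price)),
                min -> min.get().price());  // a group always has at least one row, so the Optional is present
        // the "by" expression: group on sku (TreeMap keeps the keys sorted for a stable output)
        return rows.stream().collect(Collectors.groupingBy(CompetitionRow::sku, TreeMap::new, minPrice));
    }

    public static void main(String[] args) {
        List<CompetitionRow> rows = List.of(
                new CompetitionRow("A", 10.0),
                new CompetitionRow("A", 8.0),
                new CompetitionRow("B", 5.0));
        System.out.println(minPriceBySku(rows));  // {A=8.0, B=5.0}
    }
}
```

As with the selectables rule above, the output only carries the grouping key (sku) and the aggregated value. A non-aggregated column outside the `by` list would have no single value per group.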
- Parameters:
by - a function getting the reference to this stage output columns and returning the list of expressions to group by
selectables - a function getting the reference to this stage output columns and returning a list of named aggregated expressions. QueryApi will accept either aggregated expressions using an aggregation function, or non-aggregated expressions belonging to the result of the by parameter
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
aggregate
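PipelineStage aggregate(Function<Tables.Columns, List<? extends Selectable>> selectables)
Adds a stage that aggregates all the rows of this stage output.
As an example, here is a pipeline computing the number of rows in the Competition product extension table: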
def qapi = api.queryApi()
def competition = qapi.tables().productExtension("Competition")
return qapi.source(competition)
    .aggregate { prev -> [qapi.exprs().count().as("nbRows")] }
    .stream { it.each { row ->
        // do something with the row
    }}
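aggregate behaves like an SQL aggregate query without a GROUP BY clause, which yields one row carrying all the named aggregates. The following is a hypothetical in-memory analogue of a count plus a minimum over the rows. The list of prices stands in for the Competition rows, and the `aggregate` helper is made up.

```java
import java.util.DoubleSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogue of
// .aggregate { prev -> [qapi.exprs().count().as("nbRows"), qapi.exprs().min(prev.Price).as("MinPrice")] }
final class AggregateSketch {
    static Map<String, Object> aggregate(List<Double> prices) {
        DoubleSummaryStatistics stats = prices.stream()
                .mapToDouble(Double::doubleValue)
                .summaryStatistics();
        Map<String, Object> row = new LinkedHashMap<>();  // the single output row
        row.put("nbRows", stats.getCount());
        // Unlike SQL MIN, which yields null on an empty input, getMin() returns +Infinity here.
        row.put("MinPrice", stats.getMin());
        return row;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(List.of(10.0, 8.0, 5.0)));  // {nbRows=3, MinPrice=5.0}
    }
}
```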
- Parameters:
selectables - a function getting the reference to this stage output columns and returning a list of named aggregated expressions. QueryApi will only accept aggregated expressions
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
filter
PipelineStage filter(Function<Tables.Columns, Expression> condition)
Adds a stage that only keeps the rows of this stage output that satisfy the given expression.
As an example, here is a pipeline filtering out products with a null cost:
def qapi = api.queryApi()
def products = qapi.tables().products()
return qapi.source(products, [products.sku(), products.Cost])
    .filter { prev -> prev.Cost.isNotNull() }
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
condition - a function getting the reference to this stage output columns and returning the filtering expression
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
sortBy
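PipelineStage sortBy(Function<Tables.Columns, List<Orders.Order>> orders)
Adds a stage that sorts the rows of this stage output.
As an example, here is a pipeline sorting the ProductHierarchy company parameter rows by ProductGroup, then by ProductLine, with null values last: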
def qapi = api.queryApi()
def prodHierarchy = qapi.tables().companyParameterRows("ProductHierarchy")
return qapi.source(prodHierarchy, [prodHierarchy.ProductGroup, prodHierarchy.ProductLine])
    .sortBy { cols ->
        [qapi.orders().ascNullsLast(cols.ProductGroup), qapi.orders().ascNullsLast(cols.ProductLine)]
    }
    .stream { it.each { row ->
        // do something with the row
    }}
Ordering should be expressed as a list of Orders.Order, which can be built via QueryApi.orders():
- Orders.ascNullsFirst(Expression) returns an ascending order with null values evaluated as the lowest value.
- Orders.ascNullsLast(Expression) returns an ascending order with null values evaluated as the highest value.
- Orders.descNullsFirst(Expression) returns a descending order with null values evaluated as the highest value.
- Orders.descNullsLast(Expression) returns a descending order with null values evaluated as the lowest value.
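The four orderings above map directly onto Java's `Comparator.nullsFirst` and `Comparator.nullsLast`, combined with a natural or reverse order. The sketch uses a plain list of Integer costs as a hypothetical stand-in for a column. It only illustrates where nulls end up, not how the database sorts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// In-memory illustration of the four null orderings; not the Orders implementation.
final class NullOrderingSketch {
    static final Comparator<Integer> ASC_NULLS_FIRST = Comparator.nullsFirst(Comparator.<Integer>naturalOrder());
    static final Comparator<Integer> ASC_NULLS_LAST = Comparator.nullsLast(Comparator.<Integer>naturalOrder());
    static final Comparator<Integer> DESC_NULLS_FIRST = Comparator.nullsFirst(Comparator.<Integer>reverseOrder());
    static final Comparator<Integer> DESC_NULLS_LAST = Comparator.nullsLast(Comparator.<Integer>reverseOrder());

    // Returns a sorted copy, leaving the input untouched.
    static List<Integer> sorted(List<Integer> values, Comparator<Integer> order) {
        List<Integer> copy = new ArrayList<>(values);
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        List<Integer> costs = Arrays.asList(3, null, 1, 2);  // Arrays.asList accepts null elements
        System.out.println(sorted(costs, ASC_NULLS_FIRST));   // [null, 1, 2, 3]
        System.out.println(sorted(costs, ASC_NULLS_LAST));    // [1, 2, 3, null]
        System.out.println(sorted(costs, DESC_NULLS_FIRST));  // [null, 3, 2, 1]
        System.out.println(sorted(costs, DESC_NULLS_LAST));   // [3, 2, 1, null]
    }
}
```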
- Parameters:
orders - a function getting the reference to this stage output columns and returning the list of orders that specify the expected row sorting.
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
take
PipelineStage take(int maxResults)
Adds a stage that only returns the first maxResults rows of this pipeline stage output. If there are fewer than maxResults rows, then the complete set of rows is returned.
As an example, here is a pipeline getting the 10 least costly products:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p)
    .sortBy { prev -> [qapi.orders().ascNullsLast(prev.Cost)] }
    .take(10)
    .stream { it.each { row ->
        // do something with the row
    }}
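Combining sortBy and take gives a top-N query. Sorting first is what makes "the first rows" well defined. take keeps at most maxResults rows and returns fewer when the input is shorter. The following is a hypothetical in-memory analogue using `Stream.sorted` and `Stream.limit`; the cost values are made up.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// In-memory analogue of .sortBy { [ascNullsLast(prev.Cost)] }.take(n); not the QueryApi implementation.
final class TakeSketch {
    static List<Double> cheapest(List<Double> costs, int maxResults) {
        return costs.stream()
                .sorted(Comparator.nullsLast(Comparator.<Double>naturalOrder()))  // ascNullsLast
                .limit(maxResults)                                                 // take(maxResults)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Double> costs = Arrays.asList(5.0, null, 1.5, 3.0);
        System.out.println(cheapest(costs, 2));   // [1.5, 3.0]
        System.out.println(cheapest(costs, 10));  // [1.5, 3.0, 5.0, null]: fewer rows than maxResults
    }
}
```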
- Parameters:
maxResults - the maximum number of rows to take from the start of the previous stage output row set
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
distinct
PipelineStage distinct()
Adds a stage that removes duplicate rows from this stage output row set.
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
addColumns
PipelineStage addColumns(Function<Tables.Columns, List<? extends Selectable>> expressions)
Adds a stage that adds columns to this stage output. This operation is commonly used to enrich the data set with expressions combining several columns.
As an example, here is a pipeline that computes a ProductGroup column from the content of the sku column:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku(), p.label()])
    .addColumns { prev ->
        [qapi.exprs().caseWhen(
            prev.sku.like("foo%"),
            qapi.exprs().string("FooStyle"),
            qapi.exprs().string("Generic")
        ).as("ProductGroup")]
    }
    .stream { it.each { row ->
        // do something with the row
    }}
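The caseWhen expression in the example classifies each row much like SQL `CASE WHEN sku LIKE 'foo%' THEN 'FooStyle' ELSE 'Generic' END`. A hypothetical in-memory equivalent keeps the existing columns and adds the derived one. It approximates `LIKE 'foo%'` with `String.startsWith`. Whether the database comparison is case-sensitive depends on its collation, which this sketch does not model.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory analogue of the addColumns/caseWhen example; not the QueryApi implementation.
final class AddColumnsSketch {
    static List<Map<String, Object>> addProductGroup(List<Map<String, Object>> rows) {
        return rows.stream()
                .map(row -> {
                    Map<String, Object> out = new HashMap<>(row);  // keep all existing columns
                    Object sku = row.get("sku");
                    // a null sku does not match LIKE 'foo%', so it falls into the ELSE branch
                    boolean fooStyle = sku instanceof String && ((String) sku).startsWith("foo");
                    out.put("ProductGroup", fooStyle ? "FooStyle" : "Generic");
                    return out;
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
                Map.of("sku", "foo-1", "label", "first"),
                Map.of("sku", "bar-2", "label", "second"));
        for (Map<String, Object> row : addProductGroup(rows)) {
            System.out.println(row.get("sku") + " -> " + row.get("ProductGroup"));
        }
        // foo-1 -> FooStyle
        // bar-2 -> Generic
    }
}
```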
- Parameters:
expressions - a function getting the reference to this stage output columns and returning the list of columns to be added to the previous stage output columns
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
retainColumns
PipelineStage retainColumns(Function<Tables.Columns, List<? extends Selectable>> columns)
Adds a stage that keeps only the given columns from this stage output.
As an example, here is a simple pipeline that only keeps the sku and the label from the product table:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p)
    .filter { prev -> prev.Cost.isNotNull() }
    .retainColumns { prev -> [prev.sku, prev.label] }
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
columns - a function getting the reference to this stage output columns and returning the list of columns to be kept
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
removeColumns
PipelineStage removeColumns(Function<Tables.Columns, List<? extends Selectable>> columns)
Adds a stage that removes the given columns from this stage output.
As an example, here is a simple pipeline that only removes a column previously used for filtering out rows:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku(), p.label(), p.Cost])
    .filter { prev -> prev.Cost.isNotNull() }
    .removeColumns { prev -> [prev.Cost] }
    .stream { it.each { row ->
        // do something with the row
    }}
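retainColumns and removeColumns are both projections working in opposite directions. One keeps a given set of columns and the other drops it. Here is a hypothetical in-memory sketch on Map-based rows. The helper names and the sample row are made up, and each helper returns a copy instead of modifying its input.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory projections; not the QueryApi implementation.
final class ProjectionSketch {
    // like retainColumns: keep only the listed columns
    static Map<String, Object> retain(Map<String, Object> row, Set<String> columns) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.keySet().retainAll(columns);
        return out;
    }

    // like removeColumns: drop the listed columns
    static Map<String, Object> remove(Map<String, Object> row, Set<String> columns) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.keySet().removeAll(columns);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();  // insertion order keeps the output stable
        row.put("sku", "MB-0001");
        row.put("label", "Board");
        row.put("Cost", 12.5);
        System.out.println(retain(row, Set.of("sku", "label")));  // {sku=MB-0001, label=Board}
        System.out.println(remove(row, Set.of("Cost")));          // {sku=MB-0001, label=Board}
    }
}
```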
- Parameters:
columns - a function getting the reference to this stage output columns and returning the list of columns to be removed
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
stream
<T> T stream(Function<PipelineStage.ResultStream, T> function)
Queries the database, fetches the result as a stream and provides it to the given consumer function.
The function argument should iterate over the PipelineStage.ResultStream to either perform a side effect on each row or compute a result from the stream.
After the execution of this function, the stream is automatically closed. A connection to the database is kept open during the entire stream iteration, which is why the processing of each row should be as fast as possible. The client code should therefore choose the right tradeoff between:
- either quickly collecting a large number of records into memory using collect {it} and processing them later, which may lead to an OutOfMemoryError
- or processing them "on the fly", which may lead to an overly long processing time.
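The contract described above lends the stream to a function and closes it once the function returns, even if it throws. This is the classic "loan" pattern, and the following hypothetical Java sketch implements it with try-with-resources. The `connectionOpen` flag and its `onClose` callback stand in for releasing the database connection, and the sku values are made up. The two calls in `main` contrast processing rows on the fly with collecting them first.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of the "lend then close" contract; not the QueryApi implementation.
final class LoanSketch {
    static final AtomicBoolean connectionOpen = new AtomicBoolean(false);

    // Opens a (simulated) connection, lends the stream to `function`, and always closes it.
    static <T> T stream(Function<Stream<String>, T> function) {
        connectionOpen.set(true);
        try (Stream<String> rows = Stream.of("MB-0001", "MB-0002", "MB-0003")
                .onClose(() -> connectionOpen.set(false))) {
            return function.apply(rows);
        }
    }

    public static void main(String[] args) {
        // On the fly: each row is handled as it arrives, keeping memory usage flat.
        long count = stream(rows -> rows.filter(sku -> sku.endsWith("2")).count());
        // Collect first: the whole result is held in memory and processed after the stream is closed.
        List<String> all = stream(rows -> rows.collect(Collectors.toList()));
        System.out.println(count + " " + all.size() + " " + connectionOpen.get());  // 1 3 false
    }
}
```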
If the result is not that large, the usual way of getting it is .stream { it.toList() }. In that case the return type is a List<Map<String, Object>>, each map being the representation of a result row. As an example, this code returns [[sku:"MB-0001"]]:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku()], p.sku().equal("MB-0001"))
    .stream { it.toList() }
Even though PipelineStage.ResultStream exposes an immutable map view for each PipelineStage.ResultStream.ResultRow, this map view can be enriched with other entries using common non-mutating Groovy Map operators, such as the + operator:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku()])
    .stream { it.collect {
        it + [size: it.sku.size()] // adds the 'size' entry with the size of the sku String
    }}
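In Java terms, the row maps behave like unmodifiable views: `put` fails, while Groovy's `+` builds a new map that combines both sides, with the right-hand entries taking precedence. Here is a sketch of the same idea with `Collections.unmodifiableMap`. The `plus` helper is a hypothetical Java stand-in for Groovy's Map `+` operator, not part of QueryApi.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of enriching a read-only row without mutating it.
final class RowEnrichmentSketch {
    // Java stand-in for Groovy's `row + [key: value]`: copies the row into a new map, leaving it untouched.
    static Map<String, Object> plus(Map<String, Object> row, Map<String, Object> extra) {
        Map<String, Object> out = new LinkedHashMap<>(row);
        out.putAll(extra);  // right-hand entries win, as with Groovy's Map +
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("sku", "MB-0001");
        Map<String, Object> row = Collections.unmodifiableMap(source);  // read-only view of the row
        try {
            row.put("size", 7);
        } catch (UnsupportedOperationException e) {
            System.out.println("row is read-only");
        }
        String sku = (String) row.get("sku");
        Map<String, Object> enriched = plus(row, Map.of("size", sku.length()));
        System.out.println(enriched);  // {sku=MB-0001, size=7}
    }
}
```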
- Type Parameters:
T - the return type of the function parameter
- Parameters:
function - the function in charge of consuming the result stream and returning a value, which is itself the return value of this method.
- Returns:
- the value returned by function
- Since:
- 14.0 - Caribou Lou
-
queryString
String queryString()
Builds up the query string resulting from all the stages of this pipeline. For debugging purposes only.
- Returns:
- the string representation of the query that will be sent to the database when calling stream(Function)
- Since:
- 14.0 - Caribou Lou
-
traceQuery
PipelineStage traceQuery()
Prints the result of queryString() into the current logic execution trace using PublicGroovyAPI.trace(String, String, Object). For debugging purposes only.
- Returns:
- this pipeline stage
- Since:
- 14.0 - Caribou Lou
-