Interface PipelineStage


public interface PipelineStage
The atomic building block of QueryApi queries.

A PipelineStage can be seen as a data processing node that

  1. gets data from a source: either a Tables.Table or another PipelineStage
  2. transforms it
  3. provides the transformed data as output which can be used either as input for another pipeline stage or directly fetched using stream(Function)

A PipelineStage can be instantiated via the QueryApi.source(Tables.Table, List, Expression), QueryApi.source(Tables.Table, List) or QueryApi.source(Tables.Table) methods.

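Once instantiated, another data transformation stage can be applied to its output, such as:

  • innerJoin and leftOuterJoin
  • aggregateBy and aggregate
  • filter
  • sortBy
  • take
  • distinct
  • addColumns, retainColumns and removeColumns

Each of those methods returns a PipelineStage so that multiple stages can be chained to form a data transformation pipeline.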

Once all stages are specified, the stream(Function) method can be used to query and fetch data from the database.

Here is an example of a data pipeline chaining an innerJoin, a filter and finally an aggregation stage.


 def date = new Date()
 def qapi = api.queryApi()
 def products = qapi.tables().products()
 def pExtCost = qapi.tables().productExtensionRows("Cost")
 return qapi.source(products, [products.sku(), products.ProductGroup])
     // joins the "Cost" product extension; ValidFrom and ValidTo are selected
     // so that the filter stage below can reference them
     .innerJoin(
         pExtCost,
         { prev -> [pExtCost.Cost, pExtCost.Currency, pExtCost.ValidFrom, pExtCost.ValidTo] },
         { prev -> pExtCost.sku().equal(prev.sku) }
     )
     // retains only valid cost entries
     .filter { prev ->
         qapi.exprs().and(
             prev.ValidFrom.lessOrEqual(date),
             prev.ValidTo.greaterOrEqual(date)
         )
     }
     // computes the average cost by ProductGroup and Currency
     .aggregateBy(
         { prev -> [prev.ProductGroup, prev.Currency] },
         { prev -> [qapi.exprs().avg(prev.Cost).as("averageCost")] }
     )
     // fetches the result and does something with it
     .stream { it.each { row ->
         // do something with the row
     }}
 
Since:
14.0 - Caribou Lou
  • Method Details

    • innerJoin

      PipelineStage innerJoin(Tables.Table table, Function<Tables.Columns,List<? extends Selectable>> selectables, Function<Tables.Columns,Expression> joinCriteria)
      Adds an innerJoin stage that joins this PipelineStage to another Tables.Table.

      As in an SQL join:

      • the list of columns or expressions to be joined to the previous pipeline stage should be provided by the selectables parameter.
      • the joining criteria, expressed as a boolean expression, should be provided by the joinCriteria parameter.
        ⚠ It is strongly recommended that the joining conditions use all fields of the database primary index.
      Both of those parameters should be constructed via a function that receives as argument an accessor to the previous stage columns.

      Here is an example of a simple innerJoin:

      
       def date = new Date()
       def qapi = api.queryApi()
       def products = qapi.tables().products()
       def pExtCost = qapi.tables().productExtensionRows("Cost")
       return qapi.source(products, [products.sku(), products.ProductGroup])
           // joins the "Cost" product extension, keeping only currently valid cost entries
           .innerJoin(
               pExtCost,
               { prev -> [pExtCost.Cost, pExtCost.Currency] },
               { prev -> qapi.exprs().and(
                   pExtCost.sku().equal(prev.sku),
                   pExtCost.ValidFrom.lessOrEqual(date),
                   pExtCost.ValidTo.greaterOrEqual(date)
               )}
           )
           // computes the average cost by ProductGroup and Currency
           .aggregateBy(
               { prev -> [prev.ProductGroup, prev.Currency] },
               { prev -> [qapi.exprs().avg(prev.Cost).as("averageCost")] }
           )
           // fetches the result and does something with it
           .stream { it.each { row ->
               // do something with the row
           }}
       
      Parameters:
      table - the Table to be joined to
      selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
      joinCriteria - a function getting the reference to this stage output columns and returning the joining criteria
      Returns:
      a PipelineStage as a result of applying this innerJoin stage
      Since:
      14.0 - Caribou Lou
    • leftOuterJoin

      PipelineStage leftOuterJoin(Tables.Table table, Function<Tables.Columns,List<? extends Selectable>> selectables, Function<Tables.Columns,Expression> joinCriteria)
      Adds a leftOuterJoin stage that joins this PipelineStage to another Tables.Table.
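
      To illustrate how this stage differs from innerJoin, the following plain Groovy sketch models both joins on in-memory lists of maps. It does not call QueryApi: the sample rows and the innerJoinRows and leftOuterJoinRows helpers are hypothetical, and the sketch assumes that leftOuterJoin follows the standard SQL LEFT OUTER JOIN semantics, where left rows without a match are kept with null in the joined columns.

```groovy
// Hypothetical sample data standing in for the products table and the "Cost" product extension
def products = [[sku: "A", ProductGroup: "G1"], [sku: "B", ProductGroup: "G2"]]
def costs = [[sku: "A", Cost: 10.0]]

// Inner join model: a left row is emitted once per matching right row, and dropped otherwise
def innerJoinRows = { left, right ->
    left.collectMany { l ->
        right.findAll { r -> r.sku == l.sku }.collect { r -> l + [Cost: r.Cost] }
    }
}

// Left outer join model: a left row without any match is still emitted, with a null Cost
def leftOuterJoinRows = { left, right ->
    left.collectMany { l ->
        def matches = right.findAll { r -> r.sku == l.sku }
        matches ? matches.collect { r -> l + [Cost: r.Cost] } : [l + [Cost: null]]
    }
}

def innerResult = innerJoinRows(products, costs)
def outerResult = leftOuterJoinRows(products, costs)
```

      Under these assumptions, product "B" disappears from the inner join result but remains in the left outer join result with a null Cost.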
      Parameters:
      table - the Table to be joined to
      selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
      joinCriteria - a function getting the reference to this stage output columns and returning the joining criteria
      Returns:
      a PipelineStage as a result of applying this leftOuterJoin stage
      Since:
      14.0 - Caribou Lou
    • aggregateBy

      PipelineStage aggregateBy(Function<Tables.Columns,List<? extends Expression>> by, Function<Tables.Columns,List<? extends Selectable>> selectables)
      Adds a stage that aggregates the outputs of this stage by a given set of expressions.

      It is similar to the standard SQL GROUP BY clause.
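
      As a rough mental model of the grouping, the plain Groovy sketch below groups in-memory rows and computes one aggregated value per group. It does not use QueryApi, and the sample rows are hypothetical.

```groovy
// Hypothetical rows standing in for a "Competition" product extension
def rows = [
    [sku: "A", Price: 12.0],
    [sku: "A", Price: 9.5],
    [sku: "B", Price: 20.0]
]

// Group by sku (the role of the "by" parameter), then emit one row per group holding
// the grouping key and an aggregated value (the role of the "selectables" parameter)
def minPrices = rows.groupBy { it.sku }.collect { sku, group ->
    [sku: sku, MinPrice: group*.Price.min()]
}
```

      Each output row carries only the grouping key and aggregated values, which mirrors the restriction on what the selectables function may return.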

      As an example, here is a pipeline computing the minimum competition price for each sku:

      
       def qapi = api.queryApi()
       def competition = qapi.tables().productExtension("Competition")
       def exprs = qapi.exprs()
      
       return qapi.source(competition, [competition.sku(), competition.Price])
               .aggregateBy({ prev -> [prev.sku] }, { prev -> [prev.sku, exprs.min(prev.Price).as("MinPrice")] })
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      by - a function getting the reference to this stage output columns and returning the list of expressions to group by
      selectables - a function getting the reference to this stage output columns and returning a list of named aggregated expressions. QueryApi accepts either expressions built with an aggregation function or non-aggregated expressions that belong to the result of the by parameter.
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • aggregate

      PipelineStage aggregate(Function<Tables.Columns,List<? extends Selectable>> selectables)
      Adds a stage that aggregates all the output rows of this stage.

      As an example, here is a pipeline computing the number of rows in the Competition product extension table:

      
       def qapi = api.queryApi()
       def competition = qapi.tables().productExtension("Competition")
      
       return qapi.source(competition)
               .aggregate { prev -> [qapi.exprs().count().as("nbRows")] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      selectables - a function getting the reference to this stage output columns and returning a list of named aggregated expressions. QueryApi only accepts aggregated expressions.
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • filter

      Adds a stage that keeps only the output rows of this stage that satisfy the given expression.

      As an example, here is a pipeline filtering out products with a null cost:

      
       def qapi = api.queryApi()
       def products = qapi.tables().products()
       return qapi.source(products, [products.sku(), products.Cost])
               .filter { prev -> prev.Cost.isNotNull() }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      condition - a function getting the reference to this stage output columns and returning the filtering expression
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • sortBy

      Adds a stage that sorts the output rows of this stage.

      As an example, here is a pipeline sorting the rows of the ProductHierarchy company parameter by ProductGroup and then by ProductLine:

      
       def qapi = api.queryApi()
       def prodHierarchy = qapi.tables().companyParameterRows("ProductHierarchy")
       return qapi.source(prodHierarchy, [prodHierarchy.ProductGroup, prodHierarchy.ProductLine])
               .sortBy { cols -> [qapi.orders().ascNullsLast(cols.ProductGroup), qapi.orders().ascNullsLast(cols.ProductLine)] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       

      Ordering should be expressed as a list of Orders.Order, which can be built via QueryApi.orders().

      Parameters:
      orders - a function getting the reference to this stage output columns and returning the list of orders that specify the expected row ordering.
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • take

      PipelineStage take(int maxResults)
      Adds a stage that only returns the first maxResults rows of this pipeline stage output.

      If there are fewer than maxResults rows, then the complete set of rows will be returned.
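
      As a rough analogy, Groovy's own take method on lists follows the same rule. The sketch below uses hypothetical in-memory rows rather than a QueryApi pipeline.

```groovy
// Hypothetical rows standing in for a pipeline stage output
def rows = [[sku: "A"], [sku: "B"], [sku: "C"]]

// Asking for fewer rows than available returns only the first ones
def firstTwo = rows.take(2)

// Asking for more rows than available returns the complete set
def everything = rows.take(10)
```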

      As an example, here is a pipeline getting the 10 least expensive products:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p)
               .sortBy { prev -> [qapi.orders().ascNullsLast(prev.Cost)] }
               .take(10)
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      maxResults - the maximum number of rows to take from the start of the previous stage output row set
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • distinct

      PipelineStage distinct()
      Adds a stage that removes duplicate rows from this stage output row set.
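
      The plain Groovy sketch below models the effect on in-memory rows. It does not use QueryApi: the sample rows are hypothetical, and it assumes that, as with SQL DISTINCT, two rows are duplicates only when all of their columns are equal.

```groovy
// Hypothetical rows standing in for a pipeline stage output
def rows = [
    [ProductGroup: "G1", Currency: "EUR"],
    [ProductGroup: "G1", Currency: "EUR"],
    [ProductGroup: "G1", Currency: "USD"]
]

// A LinkedHashSet drops rows that are equal on every column and keeps first-seen order
def distinctRows = new ArrayList(new LinkedHashSet(rows))
```

      The third row survives because it differs from the others in the Currency column, even though its ProductGroup is the same.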
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • addColumns

      PipelineStage addColumns(Function<Tables.Columns,List<? extends Selectable>> expressions)
      Adds a stage that adds columns to this stage output.

      This operation is commonly used to enrich the data set with expressions combining several columns.

      As an example, here is a pipeline that computes a ProductGroup column from the content of the sku column:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p, [p.sku(), p.label()])
               .addColumns { prev -> [qapi.exprs().caseWhen(prev.sku.like("foo%"), qapi.exprs().string("FooStyle"), qapi.exprs().string("Generic")).as("ProductGroup")] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      expressions - a function getting the reference to this stage output columns and returning the list of columns to be added to the previous stage output columns
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • retainColumns

      PipelineStage retainColumns(Function<Tables.Columns,List<? extends Selectable>> columns)
      Adds a stage that keeps only the given columns from this stage output.

      As a very simple example, here is a pipeline that keeps only the sku and the label from the product table:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p)
               .filter { prev -> prev.Cost.isNotNull() }
               .retainColumns { prev -> [prev.sku, prev.label] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      columns - a function getting the reference to this stage output columns and returning the list of columns to be kept
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • removeColumns

      PipelineStage removeColumns(Function<Tables.Columns,List<? extends Selectable>> columns)
      Adds a stage that removes the given columns from this stage output.

      As a very simple example, here is a pipeline that removes a column previously used for filtering out rows:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p, [p.sku(), p.label(), p.Cost])
               .filter { prev -> prev.Cost.isNotNull() }
               .removeColumns { prev -> [prev.Cost] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      columns - a function getting the reference to this stage output columns and returning the list of columns to be removed
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • stream

      <T> T stream(Function<PipelineStage.ResultStream,T> function)
      Queries the database, fetches the result as a stream and provides it to the given consumer function.

      The function argument should iterate over the PipelineStage.ResultStream to either do a side effect on each row or compute a result from the stream.
      After the execution of this function, the stream will be automatically closed.

      A connection to the database is kept open during the entire stream iteration, which is why the processing of each row should be as fast as possible. The client code should therefore choose the right tradeoff between:

      • either quickly collecting a large number of records into memory using collect {it} and processing them later, which may lead to an out-of-memory error
      • or processing them on the fly, which may lead to an excessively long processing time.
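
      The two strategies can be compared with the plain Groovy sketch below. It does not query a database: newRows is a hypothetical stand-in that yields row maps lazily, the way a result stream is consumed row by row.

```groovy
// Hypothetical stand-in for a result stream: an Iterator that produces row maps lazily
def newRows = { (1..1000).stream().map { n -> [sku: "P-" + n, Cost: n] }.iterator() }

// Strategy 1: collect everything first and process later.
// The iteration itself is short, but all 1000 rows are held in memory at once.
def allRows = newRows().toList()
def totalFromList = allRows.sum { it.Cost }

// Strategy 2: process on the fly.
// Only the running total is kept in memory, but the per-row work happens during iteration.
def totalOnTheFly = newRows().inject(0) { acc, row -> acc + row.Cost }
```

      Both strategies produce the same total. They differ in how many rows are alive in memory at once and in how much work is done while the iteration (and, with a real stream, the database connection) is still open.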

      If the result set is small enough to fit in memory, then the usual way of getting the result is .stream { it.toList() }. In that case the return type will be a List<Map<String, Object>>, each map being the representation of a result row. As an example, this code will return [[sku:"MB-0001"]]:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       // the third argument of source is the filtering Expression
       return qapi.source(p, [p.sku()], p.sku().equal("MB-0001"))
               .stream { it.toList() }
       

      Although PipelineStage.ResultStream exposes an immutable map view for each PipelineStage.ResultStream.ResultRow, enriching this map view with other entries is possible using common non-mutating Groovy Map operators like the + operator:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p, [p.sku()])
               .stream { it.collect {
                   it + [size: it.sku.size()] // adds the 'size' entry with the size of sku String.
               }}
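
      The behavior of the + operator on an immutable map can be checked in isolation with the plain Groovy sketch below. The row here is a hypothetical stand-in built with Collections.unmodifiableMap, on the assumption that a ResultRow rejects in-place mutation in a similar way.

```groovy
// Hypothetical stand-in for an immutable result row
def row = Collections.unmodifiableMap([sku: "MB-0001"])

// The + operator builds a new map and leaves the original row untouched
def enriched = row + [size: row.sku.size()]

// In-place mutation of the immutable view is rejected
def mutationRejected = false
try {
    row.put("size", 7)
} catch (UnsupportedOperationException e) {
    mutationRejected = true
}
```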
       
      Type Parameters:
      T - the return type of the function parameter
      Parameters:
      function - the function in charge of consuming the result stream and returning a value which will be itself the return value of this method.
      Returns:
      the value returned by function
      Since:
      14.0 - Caribou Lou
    • queryString

      String queryString()
      Builds up the query string resulting from all the stages of this pipeline.

      For debugging purposes only.

      Returns:
      the string representation of the query that will be sent to the database when calling stream(Function)
      Since:
      14.0 - Caribou Lou
    • traceQuery

      PipelineStage traceQuery()
      Prints the result of queryString() into the current logic execution trace using PublicGroovyAPI.trace(String, String, Object).

      For debugging purposes only.

      Returns:
      this pipeline stage
      Since:
      14.0 - Caribou Lou