Aggregation

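An aggregate or aggregation is a function where the values of multiple rows are processed together to form a single summary value. To perform an aggregation, DataFusion provides the aggregate() method on a DataFrame. It takes a list of group_by expressions and a list of aggregate expressions, and returns a new DataFrame with one row per group.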

In [1]: import urllib.request

In [2]: from datafusion import SessionContext

In [3]: from datafusion import col, lit

In [4]: from datafusion import functions as f

In [5]: urllib.request.urlretrieve(
   ...:     "https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv",
   ...:     "pokemon.csv",
   ...: )
   ...: 
Out[5]: ('pokemon.csv', <http.client.HTTPMessage at 0x7f60fab40690>)

In [6]: ctx = SessionContext()

In [7]: df = ctx.read_csv("pokemon.csv")

In [8]: col_type_1 = col('"Type 1"')

In [9]: col_type_2 = col('"Type 2"')

In [10]: col_speed = col('"Speed"')

In [11]: col_attack = col('"Attack"')

In [12]: df.aggregate([col_type_1], [
   ....:     f.approx_distinct(col_speed).alias("Count"),
   ....:     f.approx_median(col_speed).alias("Median Speed"),
   ....:     f.approx_percentile_cont(col_speed, 0.9).alias("90% Speed")])
   ....: 
Out[12]: 
DataFrame()
+----------+-------+--------------+-----------+
| Type 1   | Count | Median Speed | 90% Speed |
+----------+-------+--------------+-----------+
| Normal   | 20    | 71           | 110       |
| Ice      | 2     | 90           | 95        |
| Bug      | 11    | 63           | 107       |
| Poison   | 12    | 55           | 85        |
| Electric | 8     | 100          | 136       |
| Fairy    | 2     | 47           | 60        |
| Grass    | 8     | 55           | 80        |
| Fire     | 8     | 91           | 100       |
| Water    | 21    | 70           | 90        |
| Ground   | 7     | 40           | 112       |
+----------+-------+--------------+-----------+

When the group_by list is empty, the aggregation is performed over the whole DataFrame and produces a single row. To group the rows, the group_by list must contain at least one column.

In [13]: df.aggregate([col_type_1], [
   ....:     f.max(col_speed).alias("Max Speed"),
   ....:     f.avg(col_speed).alias("Avg Speed"),
   ....:     f.min(col_speed).alias("Min Speed")])
   ....: 
Out[13]: 
DataFrame()
+----------+-----------+-------------------+-----------+
| Type 1   | Max Speed | Avg Speed         | Min Speed |
+----------+-----------+-------------------+-----------+
| Grass    | 80        | 54.23076923076923 | 30        |
| Fire     | 105       | 86.28571428571429 | 60        |
| Water    | 115       | 67.25806451612904 | 15        |
| Ground   | 120       | 58.125            | 25        |
| Fighting | 95        | 66.14285714285714 | 35        |
| Psychic  | 150       | 99.25             | 42        |
| Rock     | 150       | 67.5              | 20        |
| Ghost    | 130       | 103.75            | 80        |
| Dragon   | 80        | 66.66666666666667 | 50        |
| Bug      | 145       | 66.78571428571429 | 25        |
+----------+-----------+-------------------+-----------+

More than one column can be used for grouping. Each distinct combination of the grouping values forms its own group:

In [14]: df.aggregate([col_type_1, col_type_2], [
   ....:     f.max(col_speed).alias("Max Speed"),
   ....:     f.avg(col_speed).alias("Avg Speed"),
   ....:     f.min(col_speed).alias("Min Speed")])
   ....: 
Out[14]: 
DataFrame()
+----------+--------+-----------+-------------------+-----------+
| Type 1   | Type 2 | Max Speed | Avg Speed         | Min Speed |
+----------+--------+-----------+-------------------+-----------+
| Grass    | Poison | 80        | 55.0              | 30        |
| Bug      | Flying | 105       | 93.33333333333333 | 70        |
| Poison   | Ground | 85        | 80.5              | 76        |
| Poison   | Flying | 90        | 72.5              | 55        |
| Bug      | Grass  | 30        | 27.5              | 25        |
| Water    | Poison | 100       | 85.0              | 70        |
| Electric | Steel  | 70        | 57.5              | 45        |
| Ghost    | Poison | 130       | 103.75            | 80        |
| Psychic  | Fairy  | 90        | 90.0              | 90        |
| Electric | Flying | 100       | 100.0             | 100       |
+----------+--------+-----------+-------------------+-----------+

Setting Parameters

Each of the built-in aggregate functions accepts arguments for the parameters that affect its operation. These parameters can also be set using the builder approach. When you use the builder, you must call build() to finish. For example, these two expressions are equivalent:

In [15]: first_1 = f.first_value(col("a"), order_by=[col("a")])

In [16]: first_2 = f.first_value(col("a")).order_by(col("a")).build()

Ordering

You can control the order in which rows are processed by aggregate functions by passing a list of sort expressions to the order_by parameter. In the following example, we sort the Pokemon by their attack in increasing order and take the first value, which gives us the Pokemon with the smallest attack value in each Type 1.

In [17]: df.aggregate(
   ....:     [col('"Type 1"')],
   ....:     [f.first_value(
   ....:         col('"Name"'),
   ....:         order_by=[col('"Attack"').sort(ascending=True)]
   ....:         ).alias("Smallest Attack")
   ....:     ])
   ....: 
Out[17]: 
DataFrame()
+----------+-----------------+
| Type 1   | Smallest Attack |
+----------+-----------------+
| Normal   | Chansey         |
| Ice      | Jynx            |
| Bug      | Metapod         |
| Poison   | Zubat           |
| Electric | Voltorb         |
| Fairy    | Clefairy        |
| Grass    | Exeggcute       |
| Fire     | Vulpix          |
| Water    | Magikarp        |
| Ground   | Cubone          |
+----------+-----------------+

Distinct

When you set the distinct parameter to True, each unique value is evaluated only once. Suppose we want to create an array of all the Type 2 values for each Type 1 in our Pokemon set. Since many Pokemon share the same Type 2, we want only one entry for each distinct value.

In [18]: df.aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True).alias("Type 2 List")])
Out[18]: 
DataFrame()
+----------+--------------------------------------------------+
| Type 1   | Type 2 List                                      |
+----------+--------------------------------------------------+
| Normal   | [Flying, , Fairy]                                |
| Ice      | [Flying, Psychic]                                |
| Bug      | [, Poison, Flying, Grass]                        |
| Poison   | [, Ground, Flying]                               |
| Electric | [Steel, Flying, ]                                |
| Fairy    | []                                               |
| Grass    | [, Poison, Psychic]                              |
| Fire     | [Dragon, , Flying]                               |
| Water    | [Psychic, Poison, , Flying, Fighting, Ice, Dark] |
| Ground   | [, Rock]                                         |
+----------+--------------------------------------------------+

In the Type 2 List output of In [18] above, some Type 1 groups contain a null Type 2 entry (the empty entries in the lists). In practice, we probably want to filter those out, and there are two ways to do it. First, we can filter out the DataFrame rows that have no Type 2; if we do this, any Type 1 whose rows all lack a Type 2 is removed from the result entirely. Second, we can use the filter argument described below.

In [19]: df.filter(col_type_2.is_not_null()).aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True).alias("Type 2 List")])
Out[19]: 
DataFrame()
+----------+------------------------------------------------+
| Type 1   | Type 2 List                                    |
+----------+------------------------------------------------+
| Normal   | [Fairy, Flying]                                |
| Ice      | [Flying, Psychic]                              |
| Bug      | [Poison, Flying, Grass]                        |
| Poison   | [Ground, Flying]                               |
| Electric | [Steel, Flying]                                |
| Grass    | [Poison, Psychic]                              |
| Fire     | [Flying, Dragon]                               |
| Water    | [Fighting, Ice, Dark, Flying, Psychic, Poison] |
| Rock     | [Flying, Water, Ground]                        |
| Ghost    | [Poison]                                       |
+----------+------------------------------------------------+

In [20]: df.aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True, filter=col_type_2.is_not_null()).alias("Type 2 List")])
Out[20]: 
DataFrame()
+----------+------------------------------------------------+
| Type 1   | Type 2 List                                    |
+----------+------------------------------------------------+
| Normal   | [Flying, Fairy]                                |
| Ice      | [Psychic, Flying]                              |
| Bug      | [Poison, Grass, Flying]                        |
| Poison   | [Flying, Ground]                               |
| Electric | [Flying, Steel]                                |
| Fairy    |                                                |
| Grass    | [Poison, Psychic]                              |
| Fire     | [Flying, Dragon]                               |
| Water    | [Fighting, Ice, Poison, Psychic, Flying, Dark] |
| Ground   | [Rock]                                         |
+----------+------------------------------------------------+

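Which approach to take depends on your use case: filtering the DataFrame removes rows (and possibly whole groups) before grouping, while the filter argument only limits the rows that a single aggregate evaluates.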

Null Treatment

This option controls whether null values are respected or ignored.

A common use is finding the first value within a group. With the default RESPECT_NULLS, the first row in the sort order is returned even if its value is null; setting the null treatment to IGNORE_NULLS returns the first non-null value in the group instead.

In [21]: from datafusion.common import NullTreatment

In [22]: df.aggregate([col_type_1], [
   ....:     f.first_value(
   ....:         col_type_2,
   ....:         order_by=[col_attack],
   ....:         null_treatment=NullTreatment.RESPECT_NULLS
   ....:     ).alias("Lowest Attack Type 2")])
   ....: 
Out[22]: 
DataFrame()
+----------+----------------------+
| Type 1   | Lowest Attack Type 2 |
+----------+----------------------+
| Normal   |                      |
| Ice      | Psychic              |
| Bug      |                      |
| Poison   | Flying               |
| Electric |                      |
| Fairy    |                      |
| Grass    | Psychic              |
| Fire     |                      |
| Water    |                      |
| Ground   |                      |
+----------+----------------------+

In [23]: df.aggregate([col_type_1], [
   ....:     f.first_value(
   ....:         col_type_2,
   ....:         order_by=[col_attack],
   ....:         null_treatment=NullTreatment.IGNORE_NULLS
   ....:     ).alias("Lowest Attack Type 2")])
   ....: 
Out[23]: 
DataFrame()
+----------+----------------------+
| Type 1   | Lowest Attack Type 2 |
+----------+----------------------+
| Normal   | Flying               |
| Ice      | Psychic              |
| Bug      | Poison               |
| Poison   | Flying               |
| Electric | Steel                |
| Fairy    |                      |
| Grass    | Psychic              |
| Fire     | Flying               |
| Water    | Poison               |
| Ground   | Rock                 |
+----------+----------------------+

Filter

The filter option restricts which rows are passed to the aggregate function. As the example in the Distinct section showed, this lets you limit the rows evaluated by a single aggregate without filtering rows from the entire DataFrame.

The filter argument takes a single boolean expression.

Suppose we want the average speed of only those Pokemon with low Attack values (below 50), alongside the average speed of all Pokemon of each Type 1.

In [24]: df.aggregate([col_type_1], [
   ....:     f.avg(col_speed).alias("Avg Speed All"),
   ....:     f.avg(col_speed, filter=col_attack < lit(50)).alias("Avg Speed Low Attack")])
   ....: 
Out[24]: 
DataFrame()
+----------+--------------------+----------------------+
| Type 1   | Avg Speed All      | Avg Speed Low Attack |
+----------+--------------------+----------------------+
| Bug      | 66.78571428571429  | 46.0                 |
| Poison   | 58.785714285714285 | 48.0                 |
| Electric | 98.88888888888889  | 72.5                 |
| Fairy    | 47.5               | 35.0                 |
| Grass    | 54.23076923076923  | 42.5                 |
| Fire     | 86.28571428571429  | 65.0                 |
| Water    | 67.25806451612904  | 63.833333333333336   |
| Ground   | 58.125             |                      |
| Fighting | 66.14285714285714  |                      |
| Psychic  | 99.25              | 81.75                |
+----------+--------------------+----------------------+

Aggregate Functions

The available aggregate functions are:

  1. Comparison Functions
  2. Math Functions
  3. Array Functions
  4. Logical Functions
  5. Statistical Functions
  6. Linear Regression Functions
  7. Positional Functions
  8. String Functions
  9. Approximation Functions