Window Functions

In this section you will learn about window functions. A window function uses values from one or more rows to produce a result for each individual row, unlike an aggregate function, which returns a single value for a group of rows.

The window functions are available in the functions module.

We’ll use the pokemon dataset (from Ritchie Vink) in the following examples.

In [1]: from datafusion import SessionContext

In [2]: from datafusion import col, lit

In [3]: from datafusion import functions as f

In [4]: ctx = SessionContext()

In [5]: df = ctx.read_csv("pokemon.csv")

Here is an example that shows how to compare each Pokemon’s speed to the speed of the previous row in the DataFrame.

In [6]: df.select(
   ...:     col('"Name"'),
   ...:     col('"Speed"'),
   ...:     f.lag(col('"Speed"')).alias("Previous Speed")
   ...: )
   ...: 
Out[6]: 
DataFrame()
+---------------------------+-------+----------------+
| Name                      | Speed | Previous Speed |
+---------------------------+-------+----------------+
| Bulbasaur                 | 45    |                |
| Ivysaur                   | 60    | 45             |
| Venusaur                  | 80    | 60             |
| VenusaurMega Venusaur     | 80    | 80             |
| Charmander                | 65    | 80             |
| Charmeleon                | 80    | 65             |
| Charizard                 | 100   | 80             |
| CharizardMega Charizard X | 100   | 100            |
| CharizardMega Charizard Y | 100   | 100            |
| Squirtle                  | 43    | 100            |
+---------------------------+-------+----------------+
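To make the semantics of lag concrete, here is a minimal plain-Python sketch that does not use DataFusion. The helper name `lag` and its `offset` parameter are illustrative assumptions, not the library's implementation. It reproduces the previous-speed values from the rows shown above.

```python
from typing import List, Optional, Sequence


def lag(values: Sequence[int], offset: int = 1) -> List[Optional[int]]:
    """Return, for each position, the value `offset` rows earlier (None if there is none)."""
    return [values[i - offset] if i >= offset else None for i in range(len(values))]


# Speed column of the ten rows shown in the output above.
speeds = [45, 60, 80, 80, 65, 80, 100, 100, 100, 43]
previous = lag(speeds)
print(previous)
```

The first row has no predecessor, so its result is None, which the table above renders as an empty cell.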

Setting Parameters

Ordering

You can control the order in which window functions process rows by passing a list of sort expressions to the order_by parameter.

In [7]: df.select(
   ...:     col('"Name"'),
   ...:     col('"Attack"'),
   ...:     col('"Type 1"'),
   ...:     f.rank(
   ...:         partition_by=[col('"Type 1"')],
   ...:         order_by=[col('"Attack"').sort(ascending=True)],
   ...:     ).alias("rank"),
   ...: ).sort(col('"Type 1"'), col('"Attack"'))
   ...: 
Out[7]: 
DataFrame()
+------------+--------+--------+------+
| Name       | Attack | Type 1 | rank |
+------------+--------+--------+------+
| Metapod    | 20     | Bug    | 1    |
| Kakuna     | 25     | Bug    | 2    |
| Caterpie   | 30     | Bug    | 3    |
| Weedle     | 35     | Bug    | 4    |
| Butterfree | 45     | Bug    | 5    |
| Venonat    | 55     | Bug    | 6    |
| Venomoth   | 65     | Bug    | 7    |
| Paras      | 70     | Bug    | 8    |
| Beedrill   | 90     | Bug    | 9    |
| Parasect   | 95     | Bug    | 10   |
+------------+--------+--------+------+

Partitions

A window function can take a list of partition_by columns, similar to an Aggregation Function. The window values are then evaluated independently for each partition. In the example above, we ranked each Pokemon by Attack within its Type 1 partition. To see only the rows with a rank below 3 in each partition, filter on the rank column. Note that rows with equal Attack values share a rank, as Machop and Mankey do here:

In [8]: df.select(
   ...:     col('"Name"'),
   ...:     col('"Attack"'),
   ...:     col('"Type 1"'),
   ...:     f.rank(
   ...:         partition_by=[col('"Type 1"')],
   ...:         order_by=[col('"Attack"').sort(ascending=True)],
   ...:     ).alias("rank"),
   ...: ).filter(col("rank") < lit(3)).sort(col('"Type 1"'), col("rank"))
   ...: 
Out[8]: 
DataFrame()
+-----------+--------+----------+------+
| Name      | Attack | Type 1   | rank |
+-----------+--------+----------+------+
| Metapod   | 20     | Bug      | 1    |
| Kakuna    | 25     | Bug      | 2    |
| Dratini   | 64     | Dragon   | 1    |
| Dragonair | 84     | Dragon   | 2    |
| Voltorb   | 30     | Electric | 1    |
| Magnemite | 35     | Electric | 2    |
| Clefairy  | 45     | Fairy    | 1    |
| Clefable  | 70     | Fairy    | 2    |
| Machop    | 80     | Fighting | 1    |
| Mankey    | 80     | Fighting | 1    |
+-----------+--------+----------+------+
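The following plain-Python sketch, which is independent of DataFusion, illustrates what ranking within partitions means, including how ties are handled. The function name `rank_within_partitions` is an assumption made for illustration, and the row named HypotheticalFighter is invented to show the gap that follows a tie. The other rows are taken from the outputs above.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int, str]  # (name, attack, type_1)


def rank_within_partitions(rows: List[Row]) -> Dict[str, int]:
    """Rank rows by attack (ascending) within each type_1; ties share a rank."""
    partitions = defaultdict(list)
    for name, attack, type_1 in rows:
        partitions[type_1].append((attack, name))

    ranks: Dict[str, int] = {}
    for members in partitions.values():
        members.sort()  # plays the role of order_by on Attack, ascending
        previous_attack, previous_rank = None, 0
        for position, (attack, name) in enumerate(members, start=1):
            # Equal attack values reuse the previous rank; otherwise the rank
            # is the row's position, which leaves a gap after a tie.
            rank = previous_rank if attack == previous_attack else position
            ranks[name] = rank
            previous_attack, previous_rank = attack, rank
    return ranks


rows = [
    ("Metapod", 20, "Bug"),
    ("Kakuna", 25, "Bug"),
    ("Caterpie", 30, "Bug"),
    ("Machop", 80, "Fighting"),
    ("Mankey", 80, "Fighting"),
    ("HypotheticalFighter", 90, "Fighting"),  # invented row, not from the dataset
]
ranks = rank_within_partitions(rows)
print(ranks)
```

Each partition starts again at rank 1, and the row after the Machop and Mankey tie receives rank 3 rather than 2.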

Window Frame

When using aggregate functions, the Window Frame defines the rows over which the function operates. If you do not specify a Window Frame, the default frame depends on the following criteria.

  • If an order_by clause is set, the default window frame is defined as the rows between unbounded preceding and the current row.

  • If an order_by is not set, the default frame is defined as the rows between unbounded preceding and unbounded following (the entire partition).

Window Frames are defined by three parameters: unit type, starting bound, and ending bound.

The unit types available are:

  • Rows: The starting and ending boundaries are defined by the number of rows relative to the current row.

  • Range: When using Range, the order_by clause must have exactly one term. The boundaries are defined by how close the rows are to the value of the expression in the order_by parameter.

  • Groups: A “group” is the set of all rows that have equivalent values for all terms in the order_by clause.
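The difference between the three unit types is easiest to see on a small example. The sketch below is plain Python rather than DataFusion code, and the helper names (`frame_rows`, `frame_range`, `frame_groups`) and the input values are hypothetical. It assumes the values are already sorted ascending by a single order_by term and that every frame ends at the current row.

```python
from typing import List


def frame_rows(values: List[int], i: int, preceding: int) -> List[int]:
    """Rows: the `preceding` physical rows before row i, through row i."""
    return values[max(0, i - preceding): i + 1]


def frame_range(values: List[int], i: int, preceding: int) -> List[int]:
    """Range: every row whose value lies in [values[i] - preceding, values[i]]."""
    low, high = values[i] - preceding, values[i]
    return [v for v in values if low <= v <= high]


def frame_groups(values: List[int], i: int, preceding: int) -> List[int]:
    """Groups: the `preceding` peer groups before the current group, through the current group."""
    distinct = sorted(set(values))
    current = distinct.index(values[i])
    wanted = set(distinct[max(0, current - preceding): current + 1])
    return [v for v in values if v in wanted]


values = [10, 20, 20, 23]  # hypothetical order_by values, sorted ascending

rows_frames = [frame_rows(values, i, 1) for i in range(len(values))]
range_frames = [frame_range(values, i, 5) for i in range(len(values))]
groups_frames = [frame_groups(values, i, 1) for i in range(len(values))]

print(rows_frames)
print(range_frames)
print(groups_frames)
```

With Rows, the two rows with value 20 get different frames because they sit at different positions. With Range and Groups, they are peers and always receive the same frame.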

In this example we compute a “rolling average” of the speed of the current Pokemon and the two preceding rows.

In [9]: from datafusion.expr import WindowFrame

In [10]: df.select(
   ....:     col('"Name"'),
   ....:     col('"Speed"'),
   ....:     f.window("avg",
   ....:         [col('"Speed"')],
   ....:         order_by=[col('"Speed"')],
   ....:         window_frame=WindowFrame("rows", 2, 0)
   ....:     ).alias("Rolling Average")
   ....: )
   ....: 
Out[10]: 
DataFrame()
+------------+-------+--------------------+
| Name       | Speed | Rolling Average    |
+------------+-------+--------------------+
| Slowpoke   | 15    | 15.0               |
| Jigglypuff | 20    | 17.5               |
| Geodude    | 20    | 18.333333333333332 |
| Paras      | 25    | 21.666666666666668 |
| Grimer     | 25    | 23.333333333333332 |
| Rhyhorn    | 25    | 25.0               |
| Snorlax    | 30    | 26.666666666666668 |
| Metapod    | 30    | 28.333333333333332 |
| Oddish     | 30    | 30.0               |
| Parasect   | 30    | 30.0               |
+------------+-------+--------------------+
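As a cross-check, the averages in the table above can be reproduced with a short plain-Python sketch. The function `rolling_average` is a hypothetical helper, not part of DataFusion. It mimics a rows frame that spans two preceding rows through the current row.

```python
from typing import List


def rolling_average(values: List[int], preceding: int) -> List[float]:
    """Average each value with up to `preceding` rows before it."""
    averages = []
    for i in range(len(values)):
        # Near the start of the data the frame is shorter than preceding + 1 rows.
        frame = values[max(0, i - preceding): i + 1]
        averages.append(sum(frame) / len(frame))
    return averages


# Speed column of the ten rows shown in the output above.
speeds = [15, 20, 20, 25, 25, 25, 30, 30, 30, 30]
averages = rolling_average(speeds, preceding=2)
print(averages)
```

The first two results average fewer than three rows, which is why the table starts with 15.0 and 17.5.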

Null Treatment

When using aggregate functions as window functions, it is often useful to specify how null values should be treated. To do this, you need to use the builder function. We expect the interface to be simplified in future releases.

One common use of null handling is finding the last non-null value up to the current row. The following example demonstrates how setting the null treatment to ignore nulls fills in each row with the value of the most recent non-null row. To do this, we also set the window frame so that only the rows up to the current row are processed.

In this example, we filter down to one specific type of Pokemon that has some null entries in its Type 2 column.

In [11]: from datafusion.common import NullTreatment

In [12]: df.filter(col('"Type 1"') == lit("Bug")).select(
   ....:     '"Name"',
   ....:     '"Type 2"',
   ....:     f.window("last_value", [col('"Type 2"')])
   ....:         .window_frame(WindowFrame("rows", None, 0))
   ....:         .order_by(col('"Speed"'))
   ....:         .null_treatment(NullTreatment.IGNORE_NULLS)
   ....:         .build()
   ....:         .alias("last_wo_null"),
   ....:     f.window("last_value", [col('"Type 2"')])
   ....:         .window_frame(WindowFrame("rows", None, 0))
   ....:         .order_by(col('"Speed"'))
   ....:         .null_treatment(NullTreatment.RESPECT_NULLS)
   ....:         .build()
   ....:         .alias("last_with_null")
   ....: )
   ....: 
Out[12]: 
DataFrame()
+------------+--------+--------------+----------------+
| Name       | Type 2 | last_wo_null | last_with_null |
+------------+--------+--------------+----------------+
| Paras      | Grass  | Grass        | Grass          |
| Metapod    |        | Grass        |                |
| Parasect   | Grass  | Grass        | Grass          |
| Kakuna     | Poison | Poison       | Poison         |
| Caterpie   |        | Poison       |                |
| Venonat    | Poison | Poison       | Poison         |
| Weedle     | Poison | Poison       | Poison         |
| Butterfree | Flying | Flying       | Flying         |
| Beedrill   | Poison | Poison       | Poison         |
| Pinsir     |        | Poison       |                |
+------------+--------+--------------+----------------+
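The two null treatments can be illustrated without DataFusion. The sketch below is plain Python, and the function `last_value` and its `ignore_nulls` flag are illustrative assumptions. It models a frame that runs from the start of the data to the current row and uses None for null.

```python
from typing import List, Optional


def last_value(values: List[Optional[str]], ignore_nulls: bool) -> List[Optional[str]]:
    """Last value of a frame running from the first row to the current row."""
    result = []
    last_non_null = None
    for value in values:
        if value is not None:
            last_non_null = value
        # Respecting nulls, the last value in the frame is the current row itself.
        result.append(last_non_null if ignore_nulls else value)
    return result


# Type 2 column of the ten rows shown in the output above.
type_2 = ["Grass", None, "Grass", "Poison", None,
          "Poison", "Poison", "Flying", "Poison", None]

last_wo_null = last_value(type_2, ignore_nulls=True)
last_with_null = last_value(type_2, ignore_nulls=False)
print(last_wo_null)
print(last_with_null)
```

Ignoring nulls carries the most recent non-null value forward, while respecting nulls leaves each null in place.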

Aggregate Functions

You can use any Aggregation Function as a window function. Currently, aggregate functions must use the deprecated datafusion.functions.window() API, but this should be resolved in DataFusion 42.0 (Issue Link). Here is an example that shows how to compare each Pokemon’s attack power with the average attack power in its "Type 1" using the datafusion.functions.avg() function.

In [13]: df.select(
   ....:     col('"Name"'),
   ....:     col('"Attack"'),
   ....:     col('"Type 1"'),
   ....:     f.window("avg", [col('"Attack"')])
   ....:         .partition_by(col('"Type 1"'))
   ....:         .build()
   ....:         .alias("Average Attack"),
   ....: )
   ....: 
Out[13]: 
DataFrame()
+---------------------+--------+--------+----------------+
| Name                | Attack | Type 1 | Average Attack |
+---------------------+--------+--------+----------------+
| Jynx                | 50     | Ice    | 67.5           |
| Articuno            | 85     | Ice    | 67.5           |
| Pidgey              | 45     | Normal | 70.625         |
| Pidgeotto           | 60     | Normal | 70.625         |
| Pidgeot             | 80     | Normal | 70.625         |
| PidgeotMega Pidgeot | 80     | Normal | 70.625         |
| Rattata             | 56     | Normal | 70.625         |
| Raticate            | 81     | Normal | 70.625         |
| Spearow             | 60     | Normal | 70.625         |
| Fearow              | 90     | Normal | 70.625         |
+---------------------+--------+--------+----------------+
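The sketch below shows, in plain Python rather than DataFusion, what an aggregate used as a window function does: the aggregate is computed once per partition and then attached to every row of that partition. The function `average_by_partition` is a hypothetical helper. The sample is a small subset of rows, so only the Ice average is expected to match the table above.

```python
from collections import defaultdict
from typing import List, Tuple

Row = Tuple[str, int, str]  # (name, attack, type_1)


def average_by_partition(rows: List[Row]) -> List[Tuple[str, int, str, float]]:
    """Attach the per-type_1 average attack to every row, keeping all rows."""
    totals = defaultdict(lambda: [0, 0])  # type_1 -> [sum of attack, row count]
    for _, attack, type_1 in rows:
        totals[type_1][0] += attack
        totals[type_1][1] += 1
    return [
        (name, attack, type_1, totals[type_1][0] / totals[type_1][1])
        for name, attack, type_1 in rows
    ]


rows = [
    ("Jynx", 50, "Ice"),
    ("Articuno", 85, "Ice"),
    ("Clefairy", 45, "Fairy"),  # averages for Fairy cover only these two sample rows
    ("Clefable", 70, "Fairy"),
]
with_average = average_by_partition(rows)
for row in with_average:
    print(row)
```

Unlike a grouped aggregation, the number of rows is unchanged: each row keeps its own columns and gains the average of its partition.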

Available Functions

The possible window functions are:

  1. Rank Functions
  2. Analytical Functions
  3. Aggregate Functions