Window Functions
In this section you will learn about window functions. A window function utilizes values from one or multiple rows to produce a result for each individual row, unlike an aggregate function that provides a single value for multiple rows.
The window functions are available in the functions module.
We’ll use the pokemon dataset (from Ritchie Vink) in the following examples.
In [1]: from datafusion import SessionContext
In [2]: from datafusion import col, lit
In [3]: from datafusion import functions as f
In [4]: ctx = SessionContext()
In [5]: df = ctx.read_csv("pokemon.csv")
Here is an example that shows how you can compare each Pokemon’s speed to the speed in the previous row of the DataFrame.
In [6]: df.select(
...: col('"Name"'),
...: col('"Speed"'),
...: f.lag(col('"Speed"')).alias("Previous Speed")
...: )
...:
Out[6]:
DataFrame()
+---------------------------+-------+----------------+
| Name | Speed | Previous Speed |
+---------------------------+-------+----------------+
| Bulbasaur | 45 | |
| Ivysaur | 60 | 45 |
| Venusaur | 80 | 60 |
| VenusaurMega Venusaur | 80 | 80 |
| Charmander | 65 | 80 |
| Charmeleon | 80 | 65 |
| Charizard | 100 | 80 |
| CharizardMega Charizard X | 100 | 100 |
| CharizardMega Charizard Y | 100 | 100 |
| Squirtle | 43 | 100 |
+---------------------------+-------+----------------+
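To make the behaviour of lag concrete, the sketch below reproduces the "Previous Speed" column in plain Python. It only illustrates the semantics and is not how DataFusion computes the result. The helper name lag_values is hypothetical, and the speeds are copied from the output above.

```python
from typing import List, Optional


def lag_values(values: List[int], offset: int = 1) -> List[Optional[int]]:
    """Return, for each row, the value `offset` rows earlier (None if there is none)."""
    return [values[i - offset] if i >= offset else None for i in range(len(values))]


# Speeds of the ten rows shown in the output above, in the same order.
speeds = [45, 60, 80, 80, 65, 80, 100, 100, 100, 43]

previous = lag_values(speeds)
for speed, prev in zip(speeds, previous):
    print(speed, prev)
```

The first row has no predecessor, so it receives None, which corresponds to the empty cell for Bulbasaur.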
Setting Parameters
Ordering
You can control the order in which rows are processed by window functions by passing a list of sort expressions, such as col('"Attack"').sort(ascending=True), to the order_by parameter.
In [7]: df.select(
...: col('"Name"'),
...: col('"Attack"'),
...: col('"Type 1"'),
...: f.rank(
...: partition_by=[col('"Type 1"')],
...: order_by=[col('"Attack"').sort(ascending=True)],
...: ).alias("rank"),
...: ).sort(col('"Type 1"'), col('"Attack"'))
...:
Out[7]:
DataFrame()
+------------+--------+--------+------+
| Name | Attack | Type 1 | rank |
+------------+--------+--------+------+
| Metapod | 20 | Bug | 1 |
| Kakuna | 25 | Bug | 2 |
| Caterpie | 30 | Bug | 3 |
| Weedle | 35 | Bug | 4 |
| Butterfree | 45 | Bug | 5 |
| Venonat | 55 | Bug | 6 |
| Venomoth | 65 | Bug | 7 |
| Paras | 70 | Bug | 8 |
| Beedrill | 90 | Bug | 9 |
| Parasect | 95 | Bug | 10 |
+------------+--------+--------+------+
Partitions
A window function can take a list of partition_by columns, similar to an Aggregation Function. The window values are then evaluated independently for each partition. In the example above, we found the rank of each Pokemon within its Type 1 partition. We can see the first couple of rows of each partition if we do the following:
In [8]: df.select(
...: col('"Name"'),
...: col('"Attack"'),
...: col('"Type 1"'),
...: f.rank(
...: partition_by=[col('"Type 1"')],
...: order_by=[col('"Attack"').sort(ascending=True)],
...: ).alias("rank"),
...: ).filter(col("rank") < lit(3)).sort(col('"Type 1"'), col("rank"))
...:
Out[8]:
DataFrame()
+-----------+--------+----------+------+
| Name | Attack | Type 1 | rank |
+-----------+--------+----------+------+
| Metapod | 20 | Bug | 1 |
| Kakuna | 25 | Bug | 2 |
| Dratini | 64 | Dragon | 1 |
| Dragonair | 84 | Dragon | 2 |
| Voltorb | 30 | Electric | 1 |
| Magnemite | 35 | Electric | 2 |
| Clefairy | 45 | Fairy | 1 |
| Clefable | 70 | Fairy | 2 |
| Machop | 80 | Fighting | 1 |
| Mankey | 80 | Fighting | 1 |
+-----------+--------+----------+------+
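The way partitioning, ordering, and ties interact can be sketched in plain Python. This is an illustration of the rank semantics under stated assumptions, not DataFusion's implementation. The helper rank_within_partitions is hypothetical, and the rows are copied from the outputs above.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def rank_within_partitions(rows: List[Tuple[str, int, str]]) -> Dict[str, int]:
    """Rank each row by attack (ascending) inside its "Type 1" partition."""
    partitions = defaultdict(list)
    for name, attack, type_1 in rows:
        partitions[type_1].append((name, attack))

    ranks = {}
    for members in partitions.values():
        for name, attack in members:
            # rank = 1 + number of rows in the partition that sort strictly earlier,
            # so tied rows share a rank and the next distinct value skips ahead.
            ranks[name] = 1 + sum(1 for _, other in members if other < attack)
    return ranks


# (Name, Attack, Type 1) rows copied from the outputs above.
rows = [
    ("Metapod", 20, "Bug"),
    ("Kakuna", 25, "Bug"),
    ("Caterpie", 30, "Bug"),
    ("Dratini", 64, "Dragon"),
    ("Dragonair", 84, "Dragon"),
    ("Machop", 80, "Fighting"),
    ("Mankey", 80, "Fighting"),
]

ranks = rank_within_partitions(rows)
# Same filter as the example above: keep rows whose rank is below 3.
top_two = [name for name, _, _ in rows if ranks[name] < 3]
print(ranks)
print(top_two)
```

Machop and Mankey have the same attack, so both receive rank 1, as in the output above. A further Fighting row with a higher attack would receive rank 3, because rank leaves a gap after ties.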
Window Frame
When using aggregate functions, the Window Frame defines the rows over which the function operates. If you do not specify a Window Frame, the default frame depends on the following criteria.
- If an order_by clause is set, the default window frame is the rows between unbounded preceding and the current row.
- If an order_by clause is not set, the default frame is the rows between unbounded preceding and unbounded following (the entire partition).
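The practical effect of the two defaults can be sketched in plain Python: with an ordering, a sum becomes a running sum, and without one, every row sees the partition total. The helper below is hypothetical and assumes the values are already sorted and contain no ties, so the handling of tied rows does not come into play.

```python
from typing import List


def aggregate_over_default_frame(values: List[int], has_order_by: bool) -> List[int]:
    """Sum each row's default frame; `values` are assumed sorted and distinct."""
    results = []
    for i in range(len(values)):
        if has_order_by:
            # Unbounded preceding up to and including the current row.
            frame = values[: i + 1]
        else:
            # Unbounded preceding to unbounded following: the entire partition.
            frame = values
        results.append(sum(frame))
    return results


values = [10, 20, 30, 40]  # made-up numbers for illustration
running_sum = aggregate_over_default_frame(values, has_order_by=True)
partition_total = aggregate_over_default_frame(values, has_order_by=False)
print(running_sum)
print(partition_total)
```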
Window Frames are defined by three parameters: unit type, starting bound, and ending bound.
The unit types available are:
- Rows: The starting and ending boundaries are defined by the number of rows relative to the current row.
- Range: When using Range, the order_by clause must have exactly one term. The boundaries are defined by how close the rows are to the value of the expression in the order_by parameter.
- Groups: A “group” is the set of all rows that have equivalent values for all terms in the order_by clause.
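The difference between the three unit types is easiest to see on data with ties. The plain-Python sketch below lists the values that fall into the frame "1 preceding to current row" under each unit type. The helper names and the sample values are hypothetical, the input is assumed to be sorted by the single order_by term, and the sketch follows the usual SQL frame semantics rather than DataFusion's internals.

```python
from typing import List


def frame_rows(values: List[int], i: int, preceding: int) -> List[int]:
    """Rows: count physical rows back from the current row."""
    return values[max(0, i - preceding) : i + 1]


def frame_range(values: List[int], i: int, preceding: int) -> List[int]:
    """Range: keep rows whose value lies within `preceding` below the current value."""
    low, high = values[i] - preceding, values[i]
    return [v for v in values if low <= v <= high]


def frame_groups(values: List[int], i: int, preceding: int) -> List[int]:
    """Groups: count groups of tied values back from the current row's group."""
    groups = sorted(set(values))
    current = groups.index(values[i])
    allowed = set(groups[max(0, current - preceding) : current + 1])
    return [v for v in values if v in allowed]


values = [1, 2, 2, 4, 5]  # sorted order_by values, with one tie

for i, value in enumerate(values):
    print(value, frame_rows(values, i, 1), frame_range(values, i, 1), frame_groups(values, i, 1))
```

For the row with value 4, Rows yields [2, 4] (one physical row back), Range yields [4] (no value lies between 3 and 4 other than 4 itself), and Groups yields [2, 2, 4] (the whole preceding group of tied rows).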
The following example computes a “rolling average” of the speed of the current Pokemon and the two preceding rows.
In [9]: from datafusion.expr import WindowFrame
In [10]: df.select(
....: col('"Name"'),
....: col('"Speed"'),
....: f.window("avg",
....: [col('"Speed"')],
....: order_by=[col('"Speed"')],
....: window_frame=WindowFrame("rows", 2, 0)
....: ).alias("Previous Speed")
....: )
....:
Out[10]:
DataFrame()
+------------+-------+--------------------+
| Name | Speed | Previous Speed |
+------------+-------+--------------------+
| Slowpoke | 15 | 15.0 |
| Jigglypuff | 20 | 17.5 |
| Geodude | 20 | 18.333333333333332 |
| Paras | 25 | 21.666666666666668 |
| Grimer | 25 | 23.333333333333332 |
| Rhyhorn | 25 | 25.0 |
| Snorlax | 30 | 26.666666666666668 |
| Metapod | 30 | 28.333333333333332 |
| Oddish | 30 | 30.0 |
| Parasect | 30 | 30.0 |
+------------+-------+--------------------+
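The numbers in this output can be checked by hand. The plain-Python sketch below recomputes the same rolling average over a frame of two preceding rows plus the current row. It assumes the ten speeds shown above are the first rows in sorted order, and rolling_average is a hypothetical helper, not part of DataFusion.

```python
from typing import List


def rolling_average(values: List[int], preceding: int) -> List[float]:
    """Average each row together with up to `preceding` rows before it."""
    result = []
    for i in range(len(values)):
        # Near the start of the data the frame is shorter than preceding + 1 rows.
        window = values[max(0, i - preceding) : i + 1]
        result.append(sum(window) / len(window))
    return result


# Speeds copied from the output above, already sorted ascending.
speeds = [15, 20, 20, 25, 25, 25, 30, 30, 30, 30]
averages = rolling_average(speeds, preceding=2)
for speed, average in zip(speeds, averages):
    print(speed, average)
```

The first two rows average over fewer than three values because fewer preceding rows exist, which is why Slowpoke shows 15.0 and Jigglypuff 17.5.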
Null Treatment
When using aggregate functions as window functions, it is often useful to specify how null values should be treated. To do this, you need to use the builder function. We expect the interface to simplify this in future releases.
One common use of null handling is finding the last value up to the current row. In the following example we demonstrate how setting the null treatment to ignore nulls fills in the value of the most recent non-null row. To do this, we also set the window frame so that we only process rows up to the current row.
In this example, we filter down to one specific type of Pokemon that has some null entries in its Type 2 column.
In [11]: from datafusion.common import NullTreatment
In [12]: df.filter(col('"Type 1"') == lit("Bug")).select(
....: '"Name"',
....: '"Type 2"',
....: f.window("last_value", [col('"Type 2"')])
....: .window_frame(WindowFrame("rows", None, 0))
....: .order_by(col('"Speed"'))
....: .null_treatment(NullTreatment.IGNORE_NULLS)
....: .build()
....: .alias("last_wo_null"),
....: f.window("last_value", [col('"Type 2"')])
....: .window_frame(WindowFrame("rows", None, 0))
....: .order_by(col('"Speed"'))
....: .null_treatment(NullTreatment.RESPECT_NULLS)
....: .build()
....: .alias("last_with_null")
....: )
....:
Out[12]:
DataFrame()
+------------+--------+--------------+----------------+
| Name | Type 2 | last_wo_null | last_with_null |
+------------+--------+--------------+----------------+
| Paras | Grass | Grass | Grass |
| Metapod | | Grass | |
| Parasect | Grass | Grass | Grass |
| Kakuna | Poison | Poison | Poison |
| Caterpie | | Poison | |
| Venonat | Poison | Poison | Poison |
| Weedle | Poison | Poison | Poison |
| Butterfree | Flying | Flying | Flying |
| Beedrill | Poison | Poison | Poison |
| Pinsir | | Poison | |
+------------+--------+--------------+----------------+
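The two columns can be reproduced with a short plain-Python sketch of last_value over the frame "unbounded preceding to current row". This only illustrates the two null treatments and is not DataFusion's implementation. The helper last_value is hypothetical, None stands in for a null, and the Type 2 values are copied from the output above.

```python
from typing import List, Optional


def last_value(values: List[Optional[str]], ignore_nulls: bool) -> List[Optional[str]]:
    """Last value in the frame from the first row up to the current row."""
    result = []
    for i in range(len(values)):
        frame = values[: i + 1]
        if ignore_nulls:
            # Drop nulls first, so the most recent non-null value is carried forward.
            frame = [v for v in frame if v is not None]
        result.append(frame[-1] if frame else None)
    return result


# "Type 2" column from the output above, in the same order.
type_2 = ["Grass", None, "Grass", "Poison", None, "Poison", "Poison", "Flying", "Poison", None]

last_wo_null = last_value(type_2, ignore_nulls=True)
last_with_null = last_value(type_2, ignore_nulls=False)
print(last_wo_null)
print(last_with_null)
```

If the very first row were null, ignoring nulls would still return null for it, because the frame contains no earlier non-null value.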
Aggregate Functions
You can use any Aggregation Function as a window function. Currently, aggregate functions must use the deprecated datafusion.functions.window() API, but this should be resolved in DataFusion 42.0 (Issue Link). Here is an example that shows how to compare each Pokemon’s attack power with the average attack power in its "Type 1" using the datafusion.functions.avg() function.
In [13]: df.select(
....: col('"Name"'),
....: col('"Attack"'),
....: col('"Type 1"'),
....: f.window("avg", [col('"Attack"')])
....: .partition_by(col('"Type 1"'))
....: .build()
....: .alias("Average Attack"),
....: )
....:
Out[13]:
DataFrame()
+---------------------+--------+--------+----------------+
| Name | Attack | Type 1 | Average Attack |
+---------------------+--------+--------+----------------+
| Jynx | 50 | Ice | 67.5 |
| Articuno | 85 | Ice | 67.5 |
| Pidgey | 45 | Normal | 70.625 |
| Pidgeotto | 60 | Normal | 70.625 |
| Pidgeot | 80 | Normal | 70.625 |
| PidgeotMega Pidgeot | 80 | Normal | 70.625 |
| Rattata | 56 | Normal | 70.625 |
| Raticate | 81 | Normal | 70.625 |
| Spearow | 60 | Normal | 70.625 |
| Fearow | 90 | Normal | 70.625 |
+---------------------+--------+--------+----------------+
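The contrast between using avg as an aggregate and as a window function can be sketched in plain Python: the aggregate returns one value per "Type 1", while the window version repeats that value on every row of the partition. The helper names are hypothetical. The rows are a small subset of the output above, so only the Ice average (which has just these two rows in the output) is expected to match.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int, str]  # (Name, Attack, Type 1)


def average_by_type(rows: List[Row]) -> Dict[str, float]:
    """Aggregate: one average attack per "Type 1"."""
    attacks = defaultdict(list)
    for _, attack, type_1 in rows:
        attacks[type_1].append(attack)
    return {type_1: sum(values) / len(values) for type_1, values in attacks.items()}


def average_as_window(rows: List[Row]) -> List[Tuple[str, int, str, float]]:
    """Window: every input row is kept and annotated with its partition's average."""
    averages = average_by_type(rows)
    return [(name, attack, type_1, averages[type_1]) for name, attack, type_1 in rows]


# Subset of the rows above; the Normal partition is deliberately incomplete here.
rows = [
    ("Jynx", 50, "Ice"),
    ("Articuno", 85, "Ice"),
    ("Pidgey", 45, "Normal"),
    ("Pidgeotto", 60, "Normal"),
]

aggregated = average_by_type(rows)
windowed = average_as_window(rows)
print(aggregated)
for row in windowed:
    print(row)
```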
Available Functions
The possible window functions are:
- Aggregate Functions
All Aggregation Functions can be used as window functions.