datafusion.udf¶
Provides the user-defined functions for evaluation of dataframes.
Attributes¶
Classes¶
Accumulator — Defines how an AggregateUDF accumulates values.
AggregateUDF — Class for performing aggregate user-defined functions (UDAF).
ScalarUDF — Class for performing scalar user-defined functions (UDF).
Volatility — Defines how stable or volatile a function is.
WindowEvaluator — Evaluator class for user-defined window functions (UDWF).
WindowUDF — Class for performing window user-defined functions (UDWF).
Module Contents¶
- class datafusion.udf.Accumulator¶
Defines how an AggregateUDF accumulates values.
- abstract evaluate() → pyarrow.Scalar¶
Return the resultant value.
- abstract merge(states: list[pyarrow.Array]) → None¶
Merge a set of states.
- abstract state() → list[pyarrow.Scalar]¶
Return the current state.
- abstract update(*values: pyarrow.Array) → None¶
Evaluate an array of values and update state.
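The lifecycle above (update per batch, state/merge across partitions, evaluate at the end) can be sketched with a minimal counting accumulator. This is an illustrative stand-in that uses plain Python ints and lists in place of the pyarrow scalars and arrays a real Accumulator must use; `CountAccumulator` is a hypothetical name, not part of the API.

```python
class CountAccumulator:
    """Counts input values; mimics the Accumulator protocol with plain ints."""

    def __init__(self) -> None:
        self._count = 0

    def update(self, values: list) -> None:
        # Called once per input batch within a partition.
        self._count += len(values)

    def state(self) -> list:
        # Intermediate state that is shipped between partitions.
        return [self._count]

    def merge(self, states: list) -> None:
        # Combine the states produced by accumulators on other partitions.
        self._count += sum(states[0])

    def evaluate(self) -> int:
        # Produce the final aggregate value.
        return self._count


# One accumulator per partition; states are merged before the final evaluate.
a, b = CountAccumulator(), CountAccumulator()
a.update([1, 2, 3])
b.update([4, 5])
a.merge([b.state()])
result = a.evaluate()  # 5
```

In the real protocol, `update` receives `pyarrow.Array` batches, `state` returns `list[pyarrow.Scalar]`, and `merge` receives the stacked states of other accumulators as `list[pyarrow.Array]`.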
- class datafusion.udf.AggregateUDF(name: str, accumulator: Callable[[], Accumulator], input_types: list[pyarrow.DataType], return_type: pyarrow.DataType, state_type: list[pyarrow.DataType], volatility: Volatility | str)¶
Class for performing aggregate user-defined functions (UDAF).
Aggregate UDFs operate on a group of rows and return a single value. See also ScalarUDF for operating on a row-by-row basis.
Instantiate a user-defined aggregate function (UDAF). See udaf() for a convenience function and argument descriptions.
- __call__(*args: datafusion.expr.Expr) → datafusion.expr.Expr¶
Execute the UDAF.
This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.
- static udaf(input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, state_type: list[pyarrow.DataType], volatility: Volatility | str, name: str | None = None) → Callable[Ellipsis, AggregateUDF]¶
- static udaf(accum: Callable[[], Accumulator], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, state_type: list[pyarrow.DataType], volatility: Volatility | str, name: str | None = None) → AggregateUDF
Create a new User-Defined Aggregate Function (UDAF).
This class allows you to define an aggregate function that can be used in data aggregation or window function calls.
- Usage:
- As a function: Call `udaf(accum, input_types, return_type, state_type, volatility, name)`.
- As a decorator: Use `@udaf(input_types, return_type, state_type, volatility, name)`. When using udaf as a decorator, do not pass `accum` explicitly.
Function example:
If your :py:class:`Accumulator` can be instantiated with no arguments, you can simply pass its type as accum. If you need to pass additional arguments to its constructor, you can define a lambda or a factory method. During runtime the :py:class:`Accumulator` will be constructed for every instance in which this UDAF is used. The following examples are all valid.
```
import pyarrow as pa
import pyarrow.compute as pc

class Summarize(Accumulator):
    def __init__(self, bias: float = 0.0):
        self._sum = pa.scalar(bias)

    def state(self) -> list[pa.Scalar]:
        return [self._sum]

    def update(self, values: pa.Array) -> None:
        self._sum = pa.scalar(self._sum.as_py() + pc.sum(values).as_py())

    def merge(self, states: list[pa.Array]) -> None:
        self._sum = pa.scalar(self._sum.as_py() + pc.sum(states[0]).as_py())

    def evaluate(self) -> pa.Scalar:
        return self._sum

def sum_bias_10() -> Summarize:
    return Summarize(10.0)

udaf1 = udaf(Summarize, pa.float64(), pa.float64(), [pa.float64()], "immutable")
udaf2 = udaf(sum_bias_10, pa.float64(), pa.float64(), [pa.float64()], "immutable")
udaf3 = udaf(lambda: Summarize(20.0), pa.float64(), pa.float64(), [pa.float64()], "immutable")
```
- Decorator example:
```
@udaf(pa.float64(), pa.float64(), [pa.float64()], "immutable")
def udf4() -> Summarize:
    return Summarize(10.0)
```
- Parameters:
accum – The accumulator Python function. Only needed when calling as a function; skip this argument when using `udaf` as a decorator.
input_types – The data types of the arguments to accum.
return_type – The data type of the return value.
state_type – The data types of the intermediate accumulation.
volatility – See Volatility for allowed values.
name – A descriptive name for the function.
- Returns:
A user-defined aggregate function, which can be used in either data aggregation or window function calls.
- _udaf¶
- class datafusion.udf.ScalarUDF(name: str, func: Callable[Ellipsis, _R], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: _R, volatility: Volatility | str)¶
Class for performing scalar user-defined functions (UDF).
Scalar UDFs operate on a row-by-row basis. See also AggregateUDF for operating on a group of rows.
Instantiate a scalar user-defined function (UDF). See helper method udf() for argument details.
- __call__(*args: datafusion.expr.Expr) → datafusion.expr.Expr¶
Execute the UDF.
This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.
- static udf(input_types: list[pyarrow.DataType], return_type: _R, volatility: Volatility | str, name: str | None = None) → Callable[Ellipsis, ScalarUDF]¶
- static udf(func: Callable[Ellipsis, _R], input_types: list[pyarrow.DataType], return_type: _R, volatility: Volatility | str, name: str | None = None) → ScalarUDF
Create a new User-Defined Function (UDF).
This class can be used both as a function and as a decorator.
- Usage:
As a function: Call udf(func, input_types, return_type, volatility, name).
As a decorator: Use @udf(input_types, return_type, volatility, name). In this case, do not pass func explicitly.
- Parameters:
func (Callable, optional) – Only needed when calling as a function. Skip this argument when using udf as a decorator.
input_types (list[pa.DataType]) – The data types of the arguments to func. This list must be of the same length as the number of arguments.
return_type (_R) – The data type of the return value from the function.
volatility (Volatility | str) – See Volatility for allowed values.
name (Optional[str]) – A descriptive name for the function.
- Returns:
A user-defined function that can be used in SQL expressions, data aggregation, or window function calls.
Example
Using `udf` as a function:
```
def double_func(x):
    return x * 2

double_udf = udf(double_func, [pa.int32()], pa.int32(), "volatile", "double_it")
```
Using `udf` as a decorator:
```
@udf([pa.int32()], pa.int32(), "volatile", "double_it")
def double_udf(x):
    return x * 2
```
- _udf¶
- class datafusion.udf.Volatility(*args, **kwds)¶
Bases: enum.Enum
Defines how stable or volatile a function is.
When setting the volatility of a function, you can either pass this enumeration or a str. The str equivalent is the lower case value of the name ("immutable", "stable", or "volatile").
- __str__() → str¶
Returns the string equivalent.
- Immutable = 1¶
An immutable function will always return the same output when given the same input.
DataFusion will attempt to inline immutable functions during planning.
- Stable = 2¶
Returns the same value for a given input within a single query.
A stable function may return different values given the same input across different queries, but must return the same value for a given input within a query. An example of this is the Now function. DataFusion will attempt to inline Stable functions during planning, when possible. For the query select col1, now() from t1, it might take a while to execute, but the now() column will be the same for each output row, since it is evaluated during planning.
- Volatile = 3¶
A volatile function may change the return value from evaluation to evaluation.
Multiple invocations of a volatile function may return different results when used in the same query. An example of this is the random() function. DataFusion can not evaluate such functions during planning. In the query select col1, random() from t1, the random() function will be evaluated for each output row, resulting in a unique random value for each row.
- class datafusion.udf.WindowEvaluator¶
Evaluator class for user-defined window functions (UDWF).
It is up to the user to decide which evaluate function is appropriate.
uses_window_frame | supports_bounded_execution | include_rank | function_to_implement
------------------|----------------------------|--------------|----------------------
False (default)   | False (default)            | False (default) | evaluate_all
False             | True                       | False        | evaluate
False             | True/False                 | True         | evaluate_all_with_rank
True              | True/False                 | True/False   | evaluate
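The dispatch table above can be encoded as a small helper, shown here purely for illustration (the `function_to_implement` helper is hypothetical, not part of the datafusion API):

```python
def function_to_implement(
    uses_window_frame: bool,
    supports_bounded_execution: bool,
    include_rank: bool,
) -> str:
    """Return which WindowEvaluator method to implement for a given flag combination."""
    if uses_window_frame:
        # Any function that reads its window frame must implement evaluate.
        return "evaluate"
    if include_rank:
        # Rank-only functions implement evaluate_all_with_rank.
        return "evaluate_all_with_rank"
    if supports_bounded_execution:
        # Bounded stateful execution without a frame uses evaluate.
        return "evaluate"
    # Default: compute the whole partition in one pass.
    return "evaluate_all"
```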
- evaluate(values: list[pyarrow.Array], eval_range: tuple[int, int]) → pyarrow.Scalar¶
Evaluate window function on a range of rows in an input partition.
This is the simplest and most general function to implement, but also the least performant, as it creates output one row at a time. It is typically much faster to implement stateful evaluation using one of the other specialized methods on this trait.
Returns a [ScalarValue] that is the value of the window function within the range for the entire partition. The argument values contains the evaluation results of the function arguments and the evaluation results of the ORDER BY expressions. If the function has a single argument, values[1..] will contain the ORDER BY expression results.
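A minimal sketch of the shape of such an evaluate implementation, assuming eval_range is a half-open (start, stop) pair and using plain Python lists in place of pyarrow arrays (the `sum_in_range` name is hypothetical):

```python
def sum_in_range(values: list[list[float]], eval_range: tuple[int, int]) -> float:
    """Windowed sum over eval_range; values[0] is the first function argument."""
    start, stop = eval_range
    # A real evaluate() would slice a pyarrow.Array and return a pyarrow.Scalar.
    return sum(values[0][start:stop])

sum_in_range([[1.0, 2.0, 3.0, 4.0]], (1, 3))  # 5.0
```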
- evaluate_all(values: list[pyarrow.Array], num_rows: int) → pyarrow.Array¶
Evaluate a window function on an entire input partition.
This function is called once per input partition for window functions that do not use values from the window frame, such as row_number(), rank(), dense_rank(), percent_rank(), cume_dist(), lead(), and lag().
It produces the result for all rows in a single pass. It expects to receive the entire partition as the value and must produce an output column with one output row for every input row. num_rows is required to correctly compute the output in case len(values) == 0.
Implementing this function is an optimization: certain window functions are not affected by the window frame definition, or the query doesn't have a frame, and evaluate_all skips the (costly) window frame boundary calculation and the overhead of calling evaluate for each output row.
For example, the LAG built-in window function does not use the values of its window frame (it can be computed in one shot on the entire partition with evaluate_all regardless of the window defined in the OVER clause):
lag(x, 1) OVER (ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
However, avg() computes the average in the window and thus does use its window frame:
avg(x) OVER (PARTITION BY y ORDER BY z ROWS BETWEEN 2 PRECEDING AND 3 FOLLOWING)
- evaluate_all_with_rank(num_rows: int, ranks_in_partition: list[tuple[int, int]]) → pyarrow.Array¶
Called for window functions that only need the rank of a row.
Evaluate the partition evaluator against the partition using the row ranks. For example, rank(col("a")) produces
a | rank
--+-----
A | 1
A | 1
C | 3
D | 4
D | 4
For this case, num_rows would be 5 and evaluate_all_with_rank would be called with ranks_in_partition = [(0, 1), (2, 2), (3, 4)].
The user must implement this method if include_rank returns True.
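Expanding the ranks into a rank column can be sketched as follows, assuming each (start, end) tuple is an inclusive range of tied rows, as in the example above (`ranks_to_column` is a hypothetical helper, not part of the API):

```python
def ranks_to_column(
    num_rows: int, ranks_in_partition: list[tuple[int, int]]
) -> list[int]:
    """Expand inclusive (start, end) tie ranges into a per-row rank column."""
    out: list[int] = []
    for start, end in ranks_in_partition:
        # The SQL rank of a tie group is 1 + the number of rows before it.
        out.extend([start + 1] * (end - start + 1))
    assert len(out) == num_rows  # one output row per input row
    return out

ranks_to_column(5, [(0, 1), (2, 2), (3, 4)])  # [1, 1, 3, 4, 4]
```

A real implementation would wrap the result in pa.array(...) to return a pyarrow.Array.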
- get_range(idx: int, num_rows: int) → tuple[int, int]¶
Return the range for the window function.
If the uses_window_frame flag is false, this method is used to calculate the required range for the window function during stateful execution.
Generally there is no required range, hence by default this returns the smallest range (the current row), e.g. when seeing the current row is enough to calculate the window result (such as row_number, rank, etc.).
- Parameters:
idx – Current index.
num_rows – Number of rows.
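The default behavior described above can be sketched as follows, assuming a half-open (start, stop) range where the smallest range covers only the current row (`default_get_range` is an illustrative name, not the actual implementation):

```python
def default_get_range(idx: int, num_rows: int) -> tuple[int, int]:
    """Smallest possible range: just the current row, as a half-open interval."""
    return (idx, idx + 1)
```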
- include_rank() → bool¶
Can this function be evaluated with (only) rank?
- is_causal() → bool¶
Get whether the evaluator needs future data for its result.
- memoize() → None¶
Perform a memoize operation to improve performance.
When the window frame has a fixed beginning (e.g. UNBOUNDED PRECEDING), some functions such as FIRST_VALUE and NTH_VALUE do not need the (unbounded) input once they have seen a certain amount of input.
memoize is called after each input batch is processed, and such functions can save whatever state they need.
- supports_bounded_execution() → bool¶
Can the window function be incrementally computed using bounded memory?
- uses_window_frame() → bool¶
Does the window function use the values from the window frame?
- class datafusion.udf.WindowUDF(name: str, func: Callable[[], WindowEvaluator], input_types: list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str)¶
Class for performing window user-defined functions (UDF).
Window UDFs operate on a partition of rows. See also ScalarUDF for operating on a row-by-row basis.
Instantiate a user-defined window function (UDWF). See udwf() for a convenience function and argument descriptions.
- __call__(*args: datafusion.expr.Expr) → datafusion.expr.Expr¶
Execute the UDWF.
This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.
- static _create_window_udf(func: Callable[[], WindowEvaluator], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) → WindowUDF¶
Create a WindowUDF instance from function arguments.
- static _create_window_udf_decorator(input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) → Callable[[Callable[[], WindowEvaluator]], Callable[Ellipsis, datafusion.expr.Expr]]¶
Create a decorator for a WindowUDF.
- static _get_default_name(func: Callable) → str¶
Get the default name for a function based on its attributes.
- static _normalize_input_types(input_types: pyarrow.DataType | list[pyarrow.DataType]) → list[pyarrow.DataType]¶
Convert a single DataType to a list if needed.
- static udwf(input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) → Callable[Ellipsis, WindowUDF]¶
- static udwf(func: Callable[[], WindowEvaluator], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) → WindowUDF
Create a new User-Defined Window Function (UDWF).
This class can be used both as a function and as a decorator.
- Usage:
As a function: Call udwf(func, input_types, return_type, volatility, name).
As a decorator: Use @udwf(input_types, return_type, volatility, name). When using udwf as a decorator, do not pass `func` explicitly.
- Function example:
```
import pyarrow as pa

class BiasedNumbers(WindowEvaluator):
    def __init__(self, start: int = 0) -> None:
        self.start = start

    def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
        return pa.array([self.start + i for i in range(num_rows)])

def bias_10() -> BiasedNumbers:
    return BiasedNumbers(10)

udwf1 = udwf(BiasedNumbers, pa.int64(), pa.int64(), "immutable")
udwf2 = udwf(bias_10, pa.int64(), pa.int64(), "immutable")
udwf3 = udwf(lambda: BiasedNumbers(20), pa.int64(), pa.int64(), "immutable")
```
- Decorator example:
```
@udwf(pa.int64(), pa.int64(), "immutable")
def biased_numbers() -> BiasedNumbers:
    return BiasedNumbers(10)
```
- Parameters:
func – Only needed when calling as a function. Skip this argument when using `udwf` as a decorator.
input_types – The data types of the arguments.
return_type – The data type of the return value.
volatility – See Volatility for allowed values.
name – A descriptive name for the function.
- Returns:
A user-defined window function that can be used in window function calls.
- _udwf¶
- datafusion.udf._R¶
- datafusion.udf.udaf¶
- datafusion.udf.udf¶
- datafusion.udf.udwf¶