datafusion

DataFusion python package.

This is a Python library that binds to DataFusion, the Apache Arrow-native in-memory query engine. See https://datafusion.apache.org/python for more information.
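
A minimal usage sketch (SessionContext, col, and lit are part of this package; the query and column names are illustrative):

from datafusion import SessionContext, col, lit

ctx = SessionContext()
df = ctx.sql("SELECT 1 AS a")       # run a SQL query, returning a DataFrame
df = df.select(col("a") + lit(41))  # build expressions with col() and lit()
df.show()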

Attributes

DFSchema

Classes

Accumulator

Defines how an AggregateUDF accumulates values.

AggregateUDF

Class for performing aggregate user-defined functions (UDAF).

Catalog

DataFusion data catalog.

Database

DataFusion Database.

ExecutionPlan

Represent nodes in the DataFusion Physical Plan.

Expr

Expression object.

LogicalPlan

Logical Plan.

RecordBatch

This class is essentially a wrapper for pyarrow.RecordBatch.

RecordBatchStream

This class represents a stream of record batches.

RuntimeConfig

Runtime configuration options.

SQLOptions

Options to be used when performing SQL queries.

ScalarUDF

Class for performing scalar user-defined functions (UDF).

SessionConfig

Session configuration options.

Table

DataFusion table.

WindowFrame

Defines a window frame for performing window operations.

WindowUDF

Class for performing window user-defined functions (UDF).

Functions

col(value)

Create a column expression.

column(value)

Create a column expression.

lit(value)

Create a literal expression.

literal(value)

Create a literal expression.

Package Contents

class datafusion.Accumulator

Defines how an AggregateUDF accumulates values.

abstract evaluate() pyarrow.Scalar

Return the resultant value.

abstract merge(states: List[pyarrow.Array]) None

Merge a set of states.

abstract state() List[pyarrow.Scalar]

Return the current state.

abstract update(*values: pyarrow.Array) None

Evaluate an array of values and update state.

class datafusion.AggregateUDF(name: str | None, accumulator: Callable[[], Accumulator], input_types: list[pyarrow.DataType], return_type: pyarrow.DataType, state_type: list[pyarrow.DataType], volatility: Volatility | str)

Class for performing aggregate user-defined functions (UDAF).

Aggregate UDFs operate on a group of rows and return a single value. See also ScalarUDF for operating on a row-by-row basis.

Instantiate a user-defined aggregate function (UDAF).

See udaf() for a convenience function and argument descriptions.

__call__(*args: datafusion.expr.Expr) datafusion.expr.Expr

Execute the UDAF.

This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.

static udaf(accum: Callable[[], Accumulator], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, state_type: list[pyarrow.DataType], volatility: Volatility | str, name: str | None = None) AggregateUDF

Create a new User-Defined Aggregate Function.

If your Accumulator can be instantiated with no arguments, you can simply pass its type as accum. If you need to pass additional arguments to its constructor, you can define a lambda or a factory method. At runtime, an Accumulator will be constructed for every instance in which this UDAF is used. The following examples are all valid.

from typing import List

import pyarrow as pa
import pyarrow.compute as pc

from datafusion import Accumulator, udaf  # assumes the top-level convenience exports

class Summarize(Accumulator):
    def __init__(self, bias: float = 0.0):
        self._sum = pa.scalar(bias)

    def state(self) -> List[pa.Scalar]:
        return [self._sum]

    def update(self, values: pa.Array) -> None:
        self._sum = pa.scalar(self._sum.as_py() + pc.sum(values).as_py())

    def merge(self, states: List[pa.Array]) -> None:
        self._sum = pa.scalar(self._sum.as_py() + pc.sum(states[0]).as_py())

    def evaluate(self) -> pa.Scalar:
        return self._sum

def sum_bias_10() -> Summarize:
    return Summarize(10.0)

udaf1 = udaf(Summarize, pa.float64(), pa.float64(), [pa.float64()], "immutable")
udaf2 = udaf(sum_bias_10, pa.float64(), pa.float64(), [pa.float64()], "immutable")
udaf3 = udaf(lambda: Summarize(20.0), pa.float64(), pa.float64(), [pa.float64()], "immutable")
Parameters:
  • accum – The accumulator python function.

  • input_types – The data types of the arguments to accum.

  • return_type – The data type of the return value.

  • state_type – The data types of the intermediate accumulation.

  • volatility – See Volatility for allowed values.

  • name – A descriptive name for the function.

Returns:

A user-defined aggregate function, which can be used in either data aggregation or window function calls.

_udaf
class datafusion.Catalog(catalog: datafusion._internal.Catalog)

DataFusion data catalog.

This constructor is not typically called by the end user.

database(name: str = 'public') Database

Returns the database with the given name from this catalog.

names() list[str]

Returns the list of databases in this catalog.
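
For example, a sketch of walking from a context to a table (the table name is illustrative; assumes DDL is allowed, which is the default):

from datafusion import SessionContext

ctx = SessionContext()
ctx.sql("CREATE TABLE t AS VALUES (1), (2)")
catalog = ctx.catalog()           # default catalog
print(catalog.names())            # databases in the catalog, e.g. ['public']
db = catalog.database("public")
print(db.names())                 # table names in the database
print(db.table("t").schema())     # pyarrow.Schema of the table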

catalog
class datafusion.Database(db: datafusion._internal.Database)

DataFusion Database.

This constructor is not typically called by the end user.

names() set[str]

Returns the names of all tables in this database.

table(name: str) Table

Return the table with the given name from this database.

db
class datafusion.ExecutionPlan(plan: datafusion._internal.ExecutionPlan)

Represent nodes in the DataFusion Physical Plan.

This constructor should not be called by the end user.

__repr__() str

Print a string representation of the physical plan.

children() List[ExecutionPlan]

Get a list of children ExecutionPlan that act as inputs to this plan.

The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).

display() str

Print the physical plan.

display_indent() str

Print an indented form of the physical plan.

static from_proto(ctx: datafusion.context.SessionContext, data: bytes) ExecutionPlan

Create an ExecutionPlan from protobuf bytes.

Tables created in memory from record batches are currently not supported.

to_proto() bytes

Convert an ExecutionPlan into protobuf bytes.

Tables created in memory from record batches are currently not supported.
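
For example, a sketch of a protobuf round trip (assumes ctx is a SessionContext and df is a DataFrame over a file-backed table; DataFrame.execution_plan() is assumed as the accessor for the plan):

plan = df.execution_plan()
data = plan.to_proto()
restored = ExecutionPlan.from_proto(ctx, data)
print(restored.display_indent())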

_raw_plan
property partition_count: int

Returns the number of partitions in the physical plan.

class datafusion.Expr(expr: datafusion._internal.expr.Expr)

Expression object.

Expressions are one of the core concepts in DataFusion. See Expressions in the online documentation for more information.

This constructor should not be called by the end user.

__add__(rhs: Any) Expr

Addition operator.

Accepts either an expression or any valid PyArrow scalar literal value.

__and__(rhs: Expr) Expr

Logical AND.

__eq__(rhs: Any) Expr

Equal to.

Accepts either an expression or any valid PyArrow scalar literal value.

__ge__(rhs: Any) Expr

Greater than or equal to.

Accepts either an expression or any valid PyArrow scalar literal value.

__getitem__(key: str | int) Expr

Retrieve sub-object.

If key is a string, returns the subfield of the struct. If key is an integer, retrieves the element in the array. Note that the element index begins at 0, unlike array_element which begins at 1.
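
For example (hypothetical columns: point is a struct with a field x, tags is an array):

from datafusion import col

x_coord = col("point")["x"]   # struct field access by name
first_tag = col("tags")[0]    # array element access, 0-based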

__gt__(rhs: Any) Expr

Greater than.

Accepts either an expression or any valid PyArrow scalar literal value.

__invert__() Expr

Binary not (~).

__le__(rhs: Any) Expr

Less than or equal to.

Accepts either an expression or any valid PyArrow scalar literal value.

__lt__(rhs: Any) Expr

Less than.

Accepts either an expression or any valid PyArrow scalar literal value.

__mod__(rhs: Any) Expr

Modulo operator (%).

Accepts either an expression or any valid PyArrow scalar literal value.

__mul__(rhs: Any) Expr

Multiplication operator.

Accepts either an expression or any valid PyArrow scalar literal value.

__ne__(rhs: Any) Expr

Not equal to.

Accepts either an expression or any valid PyArrow scalar literal value.

__or__(rhs: Expr) Expr

Logical OR.

__repr__() str

Generate a string representation of this expression.

__richcmp__(other: Expr, op: int) Expr

Comparison operator.

__sub__(rhs: Any) Expr

Subtraction operator.

Accepts either an expression or any valid PyArrow scalar literal value.

__truediv__(rhs: Any) Expr

Division operator.

Accepts either an expression or any valid PyArrow scalar literal value.

alias(name: str) Expr

Assign a name to the expression.

between(low: Any, high: Any, negated: bool = False) Expr

Returns an expression that evaluates to True when this expression lies between the given bounds; see the example below.

Parameters:
  • low – lower bound of the range (inclusive).

  • high – upper bound of the range (inclusive).

  • negated – if True, negate the result of the range check.
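
For example (column name illustrative):

from datafusion import col

in_range = col("age").between(18, 65)                    # 18 <= age <= 65
out_of_range = col("age").between(18, 65, negated=True)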

canonical_name() str

Returns a complete string representation of this expression.

cast(to: pyarrow.DataType[Any] | Type[float] | Type[int] | Type[str] | Type[bool]) Expr

Cast to a new data type.

static column(value: str) Expr

Creates a new expression representing a column.

column_name(plan: datafusion.plan.LogicalPlan) str

Compute the output column name based on the provided logical plan.

display_name() str

Returns the name of this expression as it should appear in a schema.

This name will not include any CAST expressions.

distinct() ExprFuncBuilder

Only evaluate distinct values for an aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

fill_nan(value: Any | Expr | None = None) Expr

Fill NaN values with a provided value.

fill_null(value: Any | Expr | None = None) Expr

Fill NULL values with a provided value.

filter(filter: Expr) ExprFuncBuilder

Filter an aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

is_not_null() Expr

Returns an expression that evaluates to True if this expression is not null.

is_null() Expr

Returns an expression that evaluates to True if this expression is null.

static literal(value: Any) Expr

Creates a new expression representing a scalar value.

value must be a valid PyArrow scalar value or easily castable to one.

null_treatment(null_treatment: datafusion.common.NullTreatment) ExprFuncBuilder

Set the treatment for null values for a window or aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

order_by(*exprs: Expr | SortExpr) ExprFuncBuilder

Set the ordering for a window or aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.
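
For example, a sketch of an ordered aggregate built with this pattern (column names are illustrative; first_value comes from datafusion.functions):

from datafusion import col, functions as f

expr = f.first_value(col("a")).order_by(col("ts").sort()).build()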

over(window: Window) Expr

Turn an aggregate function into a window function.

This function turns any aggregate function into a window function. With the exception of partition_by, how each of the parameters is used is determined by the underlying aggregate function.

Parameters:

window – The window definition (see the example below).
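
A sketch, assuming Window is importable from datafusion.expr and using illustrative column names:

from datafusion import col, functions as f
from datafusion.expr import Window

rolling_avg = f.avg(col("speed")).over(Window(partition_by=[col("car")]))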

partition_by(*partition_by: Expr) ExprFuncBuilder

Set the partitioning for a window function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

python_value() Any

Extracts the Expr value into a PyObject.

This is only valid for literal expressions.

Returns:

Python object representing literal value of the expression.

rex_call_operands() list[Expr]

Return the operands of the expression based on its variant type.

Row expressions, Rex(s), operate on the concept of operands. Different variants of expressions, Expr(s), store those operands in different data structures. This function examines the Expr variant and returns the operands to the calling logic.

rex_call_operator() str

Extracts the operator associated with a row expression type call.

rex_type() datafusion.common.RexType

Return the Rex Type of this expression.

A Rex (Row Expression) specifies a single row of data. That specification could include user-defined functions or types. RexType identifies the row as one of the possible valid RexTypes.

schema_name() str

Returns the name of this expression as it should appear in a schema.

This name will not include any CAST expressions.

sort(ascending: bool = True, nulls_first: bool = True) SortExpr

Creates a sort Expr (SortExpr) from an existing Expr; see the example below.

Parameters:
  • ascending – If True, sort in ascending order.

  • nulls_first – If True, return null values first.
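
For example (assumes a DataFrame df with a column a):

from datafusion import col

order = col("a").sort(ascending=False, nulls_first=False)
df.sort(order)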

to_variant() Any

Convert this expression into a python object if possible.

types() datafusion.common.DataTypeMap

Return the DataTypeMap.

Returns:

A DataTypeMap representing the PythonType, Arrow DataType, and SqlType Enum for this expression.

variant_name() str

Returns the name of the Expr variant.

Examples: IsNotNull, Literal, BinaryExpr, etc.

window_frame(window_frame: WindowFrame) ExprFuncBuilder

Set the frame for a window function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

__radd__
__rand__
__rmod__
__rmul__
__ror__
__rsub__
__rtruediv__
_to_pyarrow_types
expr
class datafusion.LogicalPlan(plan: datafusion._internal.LogicalPlan)

Logical Plan.

A LogicalPlan is a node in a tree of relational operators (such as Projection or Filter).

Represents transforming an input relation (table) to an output relation (table) with a potentially different schema. Plans form a dataflow tree where data flows from leaves up to the root to produce the query result.

A LogicalPlan can be created by the SQL query planner, the DataFrame API, or programmatically (for example custom query languages).

This constructor should not be called by the end user.

__repr__() str

Generate a printable representation of the plan.

display() str

Print the logical plan.

display_graphviz() str

Print the graph visualization of the logical plan.

Returns a formattable structure that produces lines meant for graphical display using the DOT language. This format can be visualized using software from graphviz (https://graphviz.org/).

display_indent() str

Print an indented form of the logical plan.

display_indent_schema() str

Print an indented form of the schema for the logical plan.

static from_proto(ctx: datafusion.context.SessionContext, data: bytes) LogicalPlan

Create a LogicalPlan from protobuf bytes.

Tables created in memory from record batches are currently not supported.

inputs() List[LogicalPlan]

Returns the list of inputs to the logical plan.

to_proto() bytes

Convert a LogicalPlan to protobuf bytes.

Tables created in memory from record batches are currently not supported.

to_variant() Any

Convert the logical plan into its specific variant.

_raw_plan
class datafusion.RecordBatch(record_batch: datafusion._internal.RecordBatch)

This class is essentially a wrapper for pyarrow.RecordBatch.

This constructor is generally not called by the end user.

Instances of this class are produced by iterating over a RecordBatchStream.

to_pyarrow() pyarrow.RecordBatch

Convert to pyarrow.RecordBatch.

record_batch
class datafusion.RecordBatchStream(record_batch_stream: datafusion._internal.RecordBatchStream)

This class represents a stream of record batches.

These are typically the result of an execute_stream() operation.

This constructor is typically not called by the end user.

__iter__() typing_extensions.Self

Iterator function.

__next__() RecordBatch

Iterator function.

next() RecordBatch | None

See __next__() for the iterator function.
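
For example, a sketch of consuming a stream (assumes df is a DataFrame):

stream = df.execute_stream()
for batch in stream:
    arrow_batch = batch.to_pyarrow()  # convert each wrapper to a pyarrow.RecordBatch
    print(arrow_batch.num_rows)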

rbs
class datafusion.RuntimeConfig

Runtime configuration options.

Create a new RuntimeConfig with default values.

with_disk_manager_disabled() RuntimeConfig

Disable the disk manager; attempts to create temporary files will error.

Returns:

A new RuntimeConfig object with the updated setting.

with_disk_manager_os() RuntimeConfig

Use the operating system’s temporary directory for the disk manager.

Returns:

A new RuntimeConfig object with the updated setting.

with_disk_manager_specified(*paths: str | pathlib.Path) RuntimeConfig

Use the specified paths for the disk manager’s temporary files.

Parameters:

paths – Paths to use for the disk manager’s temporary files.

Returns:

A new RuntimeConfig object with the updated setting.
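
Example usage (paths are illustrative):

config = RuntimeConfig().with_disk_manager_specified("/mnt/scratch1", "/mnt/scratch2")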

with_fair_spill_pool(size: int) RuntimeConfig

Use a fair spill pool with the specified size.

This pool works best when you know beforehand the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even when there was sufficient memory (reserved for other operators) to avoid doing so:

┌───────────────────────z──────────────────────z───────────────┐
│                       z                      z               │
│                       z                      z               │
│       Spillable       z       Unspillable    z     Free      │
│        Memory         z        Memory        z    Memory     │
│                       z                      z               │
│                       z                      z               │
└───────────────────────z──────────────────────z───────────────┘
Parameters:

size – Size of the memory pool in bytes.

Returns:

A new RuntimeConfig object with the updated setting.

Example usage:

config = RuntimeConfig().with_fair_spill_pool(1024)
with_greedy_memory_pool(size: int) RuntimeConfig

Use a greedy memory pool with the specified size.

This pool works well for queries that do not need to spill or have a single spillable operator. See with_fair_spill_pool() if there are multiple spillable operators that all will spill.

Parameters:

size – Size of the memory pool in bytes.

Returns:

A new RuntimeConfig object with the updated setting.

Example usage:

config = RuntimeConfig().with_greedy_memory_pool(1024)
with_temp_file_path(path: str | pathlib.Path) RuntimeConfig

Use the specified path to create any needed temporary files.

Parameters:

path – Path to use for temporary files.

Returns:

A new RuntimeConfig object with the updated setting.

Example usage:

config = RuntimeConfig().with_temp_file_path("/tmp")
with_unbounded_memory_pool() RuntimeConfig

Use an unbounded memory pool.

Returns:

A new RuntimeConfig object with the updated setting.

config_internal
class datafusion.SQLOptions

Options to be used when performing SQL queries.

Create a new SQLOptions with default values.

The default values are:
  • DDL commands are allowed

  • DML commands are allowed

  • Statements are allowed
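
For example, a sketch of a read-only setup (assumes SessionContext.sql_with_options() as the consumer of these options, and ctx as a SessionContext):

options = (
    SQLOptions()
    .with_allow_ddl(False)
    .with_allow_dml(False)
    .with_allow_statements(False)
)
ctx.sql_with_options("SELECT 1", options)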

with_allow_ddl(allow: bool = True) SQLOptions

Should DDL (Data Definition Language) commands be run?

Examples of DDL commands include CREATE TABLE and DROP TABLE.

Parameters:

allow – Allow DDL commands to be run.

Returns:

A new SQLOptions object with the updated setting.

Example usage:

options = SQLOptions().with_allow_ddl(True)
with_allow_dml(allow: bool = True) SQLOptions

Should DML (Data Manipulation Language) commands be run?

Examples of DML commands include INSERT INTO and DELETE.

Parameters:

allow – Allow DML commands to be run.

Returns:

A new SQLOptions object with the updated setting.

Example usage:

options = SQLOptions().with_allow_dml(True)
with_allow_statements(allow: bool = True) SQLOptions

Should statements such as SET VARIABLE and BEGIN TRANSACTION be run?

Parameters:

allow – Allow statements to be run.

Returns:

A new SQLOptions object with the updated setting.

Example usage:

options = SQLOptions().with_allow_statements(True)
options_internal
class datafusion.ScalarUDF(name: str | None, func: Callable[Ellipsis, _R], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: _R, volatility: Volatility | str)

Class for performing scalar user-defined functions (UDF).

Scalar UDFs operate on a row-by-row basis. See also AggregateUDF for operating on a group of rows.

Instantiate a scalar user-defined function (UDF).

See helper method udf() for argument details.

__call__(*args: datafusion.expr.Expr) datafusion.expr.Expr

Execute the UDF.

This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.

static udf(func: Callable[Ellipsis, _R], input_types: list[pyarrow.DataType], return_type: _R, volatility: Volatility | str, name: str | None = None) ScalarUDF

Create a new User-Defined Function.

Parameters:
  • func – A callable python function.

  • input_types – The data types of the arguments to func. This list must be of the same length as the number of arguments.

  • return_type – The data type of the return value from the python function.

  • volatility – See Volatility for allowed values.

  • name – A descriptive name for the function.

Returns:

A user-defined scalar function that can be used in DataFrame and SQL expressions.
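
For example, a minimal sketch of defining and applying a scalar UDF via the static method above (function body and column name are illustrative; assumes a DataFrame df):

import pyarrow as pa
import pyarrow.compute as pc
from datafusion import ScalarUDF, col

def double(arr: pa.Array) -> pa.Array:
    # invoked once per batch with a pyarrow Array
    return pc.multiply(arr, 2)

double_udf = ScalarUDF.udf(double, [pa.int64()], pa.int64(), "immutable")
df.select(double_udf(col("a")))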

_udf
class datafusion.SessionConfig(config_options: dict[str, str] | None = None)

Session configuration options.

Create a new SessionConfig with the given configuration options.

Parameters:

config_options – Configuration options.
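
For example, a sketch combining the constructor, set(), and the builder methods (the configuration key is a standard DataFusion option; passing the config to SessionContext is assumed):

from datafusion import SessionConfig, SessionContext

config = (
    SessionConfig()
    .set("datafusion.execution.batch_size", "4096")
    .with_target_partitions(8)
    .with_information_schema(True)
)
ctx = SessionContext(config)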

set(key: str, value: str) SessionConfig

Set a configuration option.

Parameters:
  • key – Option key.

  • value – Option value.

Returns:

A new SessionConfig object with the updated setting.

with_batch_size(batch_size: int) SessionConfig

Customize batch size.

Parameters:

batch_size – Batch size.

Returns:

A new SessionConfig object with the updated setting.

with_create_default_catalog_and_schema(enabled: bool = True) SessionConfig

Control if the default catalog and schema will be automatically created.

Parameters:

enabled – Whether the default catalog and schema will be automatically created.

Returns:

A new SessionConfig object with the updated setting.

with_default_catalog_and_schema(catalog: str, schema: str) SessionConfig

Select a name for the default catalog and schema.

Parameters:
  • catalog – Catalog name.

  • schema – Schema name.

Returns:

A new SessionConfig object with the updated setting.

with_information_schema(enabled: bool = True) SessionConfig

Enable or disable the inclusion of information_schema virtual tables.

Parameters:

enabled – Whether to include information_schema virtual tables.

Returns:

A new SessionConfig object with the updated setting.

with_parquet_pruning(enabled: bool = True) SessionConfig

Enable or disable the use of pruning predicate for parquet readers.

Pruning predicates will enable the reader to skip row groups.

Parameters:

enabled – Whether to use pruning predicate for parquet readers.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_aggregations(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for aggregations.

Enabling this improves parallelism.

Parameters:

enabled – Whether to use repartitioning for aggregations.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_file_min_size(size: int) SessionConfig

Set minimum file range size for repartitioning scans.

Parameters:

size – Minimum file range size.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_file_scans(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for file scans.

Parameters:

enabled – Whether to use repartitioning for file scans.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_joins(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for joins to improve parallelism.

Parameters:

enabled – Whether to use repartitioning for joins.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_sorts(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for sorts.

This may improve parallelism.

Parameters:

enabled – Whether to use repartitioning for sorts.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_windows(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for window functions.

This may improve parallelism.

Parameters:

enabled – Whether to use repartitioning for window functions.

Returns:

A new SessionConfig object with the updated setting.

with_target_partitions(target_partitions: int) SessionConfig

Customize the number of target partitions for query execution.

Increasing partitions can increase concurrency.

Parameters:

target_partitions – Number of target partitions.

Returns:

A new SessionConfig object with the updated setting.

config_internal
class datafusion.Table(table: datafusion._internal.Table)

DataFusion table.

This constructor is not typically called by the end user.

schema() pyarrow.Schema

Returns the schema associated with this table.

property kind: str

Returns the kind of table.

table
class datafusion.WindowFrame(units: str, start_bound: Any | None, end_bound: Any | None)

Defines a window frame for performing window operations.

Construct a window frame using the given parameters.

Parameters:
  • units – Should be one of rows, range, or groups.

  • start_bound – Sets the preceding bound. Must be >= 0. If None, this will be set to unbounded. If the unit type is groups, this parameter must be set.

  • end_bound – Sets the following bound. Must be >= 0. If None, this will be set to unbounded. If the unit type is groups, this parameter must be set. See the example after this list.
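
For example, a sketch of a rows frame:

from datafusion import WindowFrame

# rows frame from 2 preceding through unbounded following (end_bound=None)
frame = WindowFrame("rows", 2, None)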

get_frame_units() str

Returns the window frame units for the bounds.

get_lower_bound() WindowFrameBound

Returns the starting bound.

get_upper_bound() WindowFrameBound

Returns the end bound.

window_frame
class datafusion.WindowUDF(name: str | None, func: Callable[[], WindowEvaluator], input_types: list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str)

Class for performing window user-defined functions (UDF).

Window UDFs operate on a partition of rows. See also ScalarUDF for operating on a row by row basis.

Instantiate a user-defined window function (UDWF).

See udwf() for a convenience function and argument descriptions.

__call__(*args: datafusion.expr.Expr) datafusion.expr.Expr

Execute the UDWF.

This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.

static udwf(func: Callable[[], WindowEvaluator], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) WindowUDF

Create a new User-Defined Window Function.

If your WindowEvaluator can be instantiated with no arguments, you can simply pass its type as func. If you need to pass additional arguments to its constructor, you can define a lambda or a factory method. At runtime, the WindowEvaluator will be constructed for every instance in which this UDWF is used. The following examples are all valid.

import pyarrow as pa

from datafusion import udwf               # assumes the top-level convenience export
from datafusion.udf import WindowEvaluator  # assumed import path

class BiasedNumbers(WindowEvaluator):
    def __init__(self, start: int = 0) -> None:
        self.start = start

    def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
        return pa.array([self.start + i for i in range(num_rows)])

def bias_10() -> BiasedNumbers:
    return BiasedNumbers(10)

udwf1 = udwf(BiasedNumbers, pa.int64(), pa.int64(), "immutable")
udwf2 = udwf(bias_10, pa.int64(), pa.int64(), "immutable")
udwf3 = udwf(lambda: BiasedNumbers(20), pa.int64(), pa.int64(), "immutable")
Parameters:
  • func – A callable to create the window function.

  • input_types – The data types of the arguments to func.

  • return_type – The data type of the return value.

  • volatility – See Volatility for allowed values.

  • name – A descriptive name for the function.

Returns:

A user-defined window function.

_udwf
datafusion.col(value: str)

Create a column expression.

datafusion.column(value: str)

Create a column expression.

datafusion.lit(value)

Create a literal expression.

datafusion.literal(value)

Create a literal expression.
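
For example (column name illustrative):

from datafusion import col, lit

discounted = (col("price") * lit(0.9)).alias("discounted")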

datafusion.DFSchema