datafusion#

DataFusion: an in-process query engine built on Apache Arrow.

DataFusion is not a database – it has no server and no external dependencies. You create a SessionContext, point it at data sources (Parquet, CSV, JSON, Arrow IPC, Pandas, Polars, or raw Python dicts/lists), and run queries using either SQL or the DataFrame API.

Core abstractions#

  • SessionContext – entry point for loading data, running SQL, and creating DataFrames.

  • DataFrame – lazy query builder. Every method returns a new DataFrame; call collect() or a to_* method to execute.

  • Expr – expression tree node for column references, literals, and function calls. Build with col() and lit().

  • functions – 290+ built-in scalar, aggregate, and window functions.

Quick start#

>>> from datafusion import SessionContext, col
>>> from datafusion import functions as F
>>> ctx = SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> result = (
...     df.filter(col("a") > 1)
...       .with_column("total", col("a") + col("b"))
...       .aggregate([], [F.sum(col("total")).alias("grand_total")])
... )
>>> result.to_pydict()
{'grand_total': [16]}

User guide and full documentation: https://datafusion.apache.org/python

AI agent reference (SQL-to-DataFrame mappings, expression-building patterns, common pitfalls), written in a dense, skill-oriented format: apache/datafusion-python

Submodules#

Attributes#

Classes#

Accumulator

Defines how an AggregateUDF accumulates values.

AggregateUDF

Class for performing aggregate user-defined functions (UDAF).

Catalog

DataFusion data catalog.

CsvReadOptions

Options for reading CSV files.

DataFrameWriteOptions

Writer options for DataFrame.

ExecutionPlan

Represent nodes in the DataFusion Physical Plan.

ExplainFormat

Output format for explain plans.

Expr

Expression object.

InsertOp

Insert operation mode.

LogicalPlan

Logical Plan.

Metric

A single execution metric with name, value, partition, and labels.

MetricsSet

A set of metrics for a single execution plan operator.

ParquetColumnOptions

Parquet options for individual columns.

ParquetWriterOptions

Advanced parquet writer options.

RecordBatch

This class is essentially a wrapper for pa.RecordBatch.

RecordBatchStream

This class represents a stream of record batches.

RuntimeEnvBuilder

Runtime configuration options.

SQLOptions

Options to be used when performing SQL queries.

ScalarUDF

Class for performing scalar user-defined functions (UDF).

SessionConfig

Session configuration options.

Table

A DataFusion table.

TableFunction

Class for performing user-defined table functions (UDTF).

TableProviderFactory

Abstract class for defining a Python based Table Provider Factory.

TableProviderFactoryExportable

Type hint for object that has __datafusion_table_provider_factory__ PyCapsule.

WindowFrame

Defines a window frame for performing window operations.

WindowUDF

Class for performing window user-defined functions (UDF).

Functions#

configure_formatter(→ None)

Configure the global DataFrame HTML formatter.

lit(→ expr.Expr)

Create a literal expression.

literal(→ expr.Expr)

Create a literal expression.

read_avro(→ datafusion.dataframe.DataFrame)

Create a DataFrame for reading Avro data source.

read_csv(→ datafusion.dataframe.DataFrame)

Read a CSV data source.

read_json(→ datafusion.dataframe.DataFrame)

Read a line-delimited JSON data source.

read_parquet(→ datafusion.dataframe.DataFrame)

Read a Parquet source into a Dataframe.

Package Contents#

class datafusion.Accumulator#

Defines how an AggregateUDF accumulates values.

abstract evaluate() pyarrow.Scalar#

Return the resultant value.

While this function template expects a PyArrow Scalar return type, you can return any value that can be converted into a Scalar. This includes basic Python data types such as integers and strings. In addition to primitive types, PyArrow, nanoarrow, and arro3 objects are currently supported. Other objects that support the Arrow FFI standard will be given a “best attempt” at conversion to scalar objects.

abstract merge(states: list[pyarrow.Array]) None#

Merge a set of states.

abstract state() list[pyarrow.Scalar]#

Return the current state.

While this function template expects a list of PyArrow Scalar values as its return type, you can return any values that can be converted into a Scalar. This includes basic Python data types such as integers and strings. In addition to primitive types, PyArrow, nanoarrow, and arro3 objects are currently supported. Other objects that support the Arrow FFI standard will be given a “best attempt” at conversion to scalar objects.

abstract update(*values: pyarrow.Array) None#

Evaluate an array of values and update state.

class datafusion.AggregateUDF(name: str, accumulator: collections.abc.Callable[[], Accumulator], input_types: list[pyarrow.DataType], return_type: pyarrow.DataType, state_type: list[pyarrow.DataType], volatility: Volatility | str)#
class datafusion.AggregateUDF(name: str, accumulator: AggregateUDFExportable, input_types: None = ..., return_type: None = ..., state_type: None = ..., volatility: None = ...)

Class for performing aggregate user-defined functions (UDAF).

Aggregate UDFs operate on a group of rows and return a single value. See also ScalarUDF for operating on a row by row basis.

Instantiate a user-defined aggregate function (UDAF).

See udaf() for a convenience function and argument descriptions.

__call__(*args: datafusion.expr.Expr) datafusion.expr.Expr#

Execute the UDAF.

This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.

__repr__() str#

Print a string representation of the Aggregate UDF.

classmethod _from_internal(internal: datafusion._internal.AggregateUDF) AggregateUDF#

Wrap an already-constructed internal AggregateUDF handle.

Used by SessionContext.udaf() to surface a function looked up from the session’s function registry without re-running __init__().

static from_pycapsule(func: AggregateUDFExportable | _typeshed.CapsuleType) AggregateUDF#

Create an Aggregate UDF from AggregateUDF PyCapsule object.

This function will instantiate an Aggregate UDF that uses a DataFusion AggregateUDF that is exported via the FFI bindings.

static udaf(input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, state_type: list[pyarrow.DataType], volatility: Volatility | str, name: str | None = None) collections.abc.Callable[Ellipsis, AggregateUDF]#
static udaf(accum: collections.abc.Callable[[], Accumulator], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, state_type: list[pyarrow.DataType], volatility: Volatility | str, name: str | None = None) AggregateUDF
static udaf(accum: AggregateUDFExportable) AggregateUDF
static udaf(accum: _typeshed.CapsuleType) AggregateUDF

Create a new User-Defined Aggregate Function (UDAF).

This function allows you to define an aggregate function that can be used in data aggregation or window function calls.

Usage:
  • As a function: udaf(accum, input_types, return_type, state_type, volatility, name).

  • As a decorator: @udaf(input_types, return_type, state_type, volatility, name). When using udaf as a decorator, do not pass accum explicitly.

If your Accumulator can be instantiated with no arguments, you can simply pass its type as accum. If you need to pass additional arguments to its constructor, you can define a lambda or a factory method. During runtime the Accumulator will be constructed for every instance in which this UDAF is used.

Examples

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> import datafusion as dfn
>>> from datafusion import col
>>> from datafusion.user_defined import AggregateUDF, Accumulator, udaf
>>> class Summarize(Accumulator):
...     def __init__(self, bias: float = 0.0):
...         self._sum = pa.scalar(bias)
...     def state(self):
...         return [self._sum]
...     def update(self, values):
...         self._sum = pa.scalar(
...             self._sum.as_py() + pc.sum(values).as_py())
...     def merge(self, states):
...         self._sum = pa.scalar(
...             self._sum.as_py() + pc.sum(states[0]).as_py())
...     def evaluate(self):
...         return self._sum

Using udaf as a function:

>>> udaf1 = AggregateUDF.udaf(
...     Summarize, pa.float64(), pa.float64(),
...     [pa.float64()], "immutable")

Wrapping udaf with a function:

>>> def sum_bias_10() -> Summarize:
...     return Summarize(10.0)
>>> udaf2 = udaf(sum_bias_10, pa.float64(), pa.float64(), [pa.float64()],
...     "immutable")

Using udaf with lambda:

>>> udaf3 = udaf(lambda: Summarize(20.0), pa.float64(), pa.float64(),
...     [pa.float64()], "immutable")

Using udaf as a decorator:

>>> @AggregateUDF.udaf(
...     pa.float64(), pa.float64(),
...     [pa.float64()], "immutable")
... def udaf4():
...     return Summarize(10.0)

Apply to a dataframe:

>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1.0, 2.0, 3.0]})
>>> df.aggregate([], [udaf1(col("a")).alias("total")]).collect_column(
...     "total")[0].as_py()
6.0
>>> df.aggregate([], [udaf2(col("a")).alias("total")]).collect_column(
...     "total")[0].as_py()
16.0
>>> df.aggregate([], [udaf3(col("a")).alias("total")]).collect_column(
...     "total")[0].as_py()
26.0
>>> df.aggregate([], [udaf4(col("a")).alias("total")]).collect_column(
...     "total")[0].as_py()
16.0

Parameters:
  • accum – The accumulator python function. Only needed when calling as a function. Skip this argument when using udaf as a decorator. If you have a Rust backed AggregateUDF within a PyCapsule, you can pass this parameter and ignore the rest. They will be determined directly from the underlying function. See the online documentation for more information.

  • input_types – The data types of the arguments to accum.

  • return_type – The data type of the return value.

  • state_type – The data types of the intermediate accumulation.

  • volatility – See Volatility for allowed values.

  • name – A descriptive name for the function.

Returns:

A user-defined aggregate function, which can be used in either data aggregation or window function calls.

_udaf#
property name: str#

Return the registered name of this UDAF.

For UDAFs imported via the FFI capsule protocol, this is the name the capsule itself reports — not the name argument passed to the constructor (which is ignored on the FFI path).

class datafusion.Catalog(catalog: datafusion._internal.catalog.RawCatalog)#

DataFusion data catalog.

This constructor is not typically called by the end user.

__repr__() str#

Print a string representation of the catalog.

deregister_schema(name: str, cascade: bool = True) Schema | None#

Deregister a schema from this catalog.

static memory_catalog(ctx: datafusion.SessionContext | None = None) Catalog#

Create an in-memory catalog provider.

names() set[str]#

This is an alias for schema_names.

register_schema(name: str, schema: Schema | SchemaProvider | SchemaProviderExportable) Schema | None#

Register a schema with this catalog.

schema(name: str = 'public') Schema#

Returns the schema with the given name from this catalog.

schema_names() set[str]#

Returns the set of schema names in this catalog.

catalog#
class datafusion.CsvReadOptions(*, has_header: bool = True, delimiter: str = ',', quote: str = '"', terminator: str | None = None, escape: str | None = None, comment: str | None = None, newlines_in_values: bool = False, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = DEFAULT_MAX_INFER_SCHEMA, file_extension: str = '.csv', table_partition_cols: list[tuple[str, pyarrow.DataType]] | None = None, file_compression_type: str = '', file_sort_order: list[list[datafusion.expr.SortExpr]] | None = None, null_regex: str | None = None, truncated_rows: bool = False)#

Options for reading CSV files.

This class provides a builder pattern for configuring CSV reading options. All methods starting with with_ return self to allow method chaining.

Initialize CsvReadOptions.

Parameters:
  • has_header – Does the CSV file have a header row? If schema inference is run on a file with no headers, default column names are created.

  • delimiter – Column delimiter character. Must be a single ASCII character.

  • quote – Quote character for fields containing delimiters or newlines. Must be a single ASCII character.

  • terminator – Optional line terminator character. If None, uses CRLF. Must be a single ASCII character.

  • escape – Optional escape character for quotes. Must be a single ASCII character.

  • comment – If specified, lines beginning with this character are ignored. Must be a single ASCII character.

  • newlines_in_values – Whether newlines in quoted values are supported. Parsing newlines in quoted values may be affected by execution behavior such as parallel file scanning. Setting this to True ensures that newlines in values are parsed successfully, which may reduce performance.

  • schema – Optional PyArrow schema representing the CSV files. If None, the CSV reader will try to infer it based on data in the file.

  • schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.

  • file_extension – File extension; only files with this extension are selected for data input.

  • table_partition_cols – Partition columns as a list of tuples of (column_name, data_type).

  • file_compression_type – File compression type. Supported values are "gzip", "bz2", "xz", "zstd", or empty string for uncompressed.

  • file_sort_order – Optional sort order of the files as a list of sort expressions per file.

  • null_regex – Optional regex pattern to match null values in the CSV.

  • truncated_rows – Whether to allow truncated rows when parsing. By default this is False and will error if the CSV rows have different lengths. When set to True, it will allow records with fewer than the expected number of columns and fill the missing columns with nulls. If the record’s schema is not nullable, it will still return an error.

to_inner() datafusion._internal.options.CsvReadOptions#

Convert this object into the underlying Rust structure.

This is intended for internal use only.

with_comment(comment: str | None) CsvReadOptions#

Configure the comment character.

with_delimiter(delimiter: str) CsvReadOptions#

Configure the column delimiter.

with_escape(escape: str | None) CsvReadOptions#

Configure the escape character.

with_file_compression_type(file_compression_type: str) CsvReadOptions#

Configure file compression type.

with_file_extension(file_extension: str) CsvReadOptions#

Configure the file extension filter.

with_file_sort_order(file_sort_order: list[list[datafusion.expr.SortExpr]]) CsvReadOptions#

Configure file sort order.

with_has_header(has_header: bool) CsvReadOptions#

Configure whether the CSV has a header row.

with_newlines_in_values(newlines_in_values: bool) CsvReadOptions#

Configure whether newlines in values are supported.

with_null_regex(null_regex: str | None) CsvReadOptions#

Configure null value regex pattern.

with_quote(quote: str) CsvReadOptions#

Configure the quote character.

with_schema(schema: pyarrow.Schema | None) CsvReadOptions#

Configure the schema.

with_schema_infer_max_records(schema_infer_max_records: int) CsvReadOptions#

Configure maximum records for schema inference.

with_table_partition_cols(table_partition_cols: list[tuple[str, pyarrow.DataType]]) CsvReadOptions#

Configure table partition columns.

with_terminator(terminator: str | None) CsvReadOptions#

Configure the line terminator character.

with_truncated_rows(truncated_rows: bool) CsvReadOptions#

Configure whether to allow truncated rows.

comment = None#
delimiter = ','#
escape = None#
file_compression_type = ''#
file_extension = '.csv'#
file_sort_order = []#
has_header = True#
newlines_in_values = False#
null_regex = None#
quote = '"'#
schema = None#
schema_infer_max_records = 1000#
table_partition_cols = []#
terminator = None#
truncated_rows = False#
class datafusion.DataFrameWriteOptions(insert_operation: InsertOp | None = None, single_file_output: bool = False, partition_by: str | collections.abc.Sequence[str] | None = None, sort_by: datafusion.expr.Expr | datafusion.expr.SortExpr | collections.abc.Sequence[datafusion.expr.Expr] | collections.abc.Sequence[datafusion.expr.SortExpr] | None = None)#

Writer options for DataFrame.

There is no guarantee the table provider supports all writer options. See the individual implementation and documentation for details.

Instantiate writer options for DataFrame.

_raw_write_options#
class datafusion.ExecutionPlan(plan: datafusion._internal.ExecutionPlan)#

Represent nodes in the DataFusion Physical Plan.

This constructor should not be called by the end user.

__repr__() str#

Print a string representation of the physical plan.

children() list[ExecutionPlan]#

Get a list of children ExecutionPlan that act as inputs to this plan.

The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).

collect_metrics() list[tuple[str, MetricsSet]]#

Return runtime statistics for each step of the query execution.

DataFusion executes a query as a pipeline of operators — for example a data source scan, followed by a filter, followed by a projection. After the DataFrame has been executed (via collect(), execute_stream(), etc.), each operator records statistics such as how many rows it produced and how much CPU time it consumed.

Each entry in the returned list corresponds to one operator that recorded metrics. The first element of the tuple is the operator’s description string — the same text shown by display_indent() — which identifies both the operator type and its key parameters, for example "FilterExec: column1@0 > 1" or "DataSourceExec: partitions=1".

Returns:

A list of (description, MetricsSet) tuples ordered from the outermost operator (top of the execution tree) down to the data-source leaves. Only operators that recorded at least one metric are included. Returns an empty list if called before the DataFrame has been executed.

display() str#

Print the physical plan.

display_indent() str#

Print an indented form of the physical plan.

static from_bytes(ctx: datafusion.context.SessionContext, data: bytes) ExecutionPlan#

Create an ExecutionPlan from serialized protobuf bytes.

Decoding routes through the session’s installed PhysicalExtensionCodec. Tables created in memory from record batches are currently not supported.

static from_proto(ctx: datafusion.context.SessionContext, data: bytes) ExecutionPlan#

Deprecated alias for from_bytes().

metrics() MetricsSet | None#

Return metrics for this plan node, or None if this plan has no MetricsSet.

Some operators (e.g. DataSourceExec) eagerly initialize a MetricsSet when the plan is created, so this may return a set even before execution. Metric values (such as output_rows) are only meaningful after the DataFrame has been executed.

to_bytes(ctx: datafusion.context.SessionContext | None = None) bytes#

Convert an ExecutionPlan into serialized protobuf bytes.

When ctx is supplied, encoding routes through the session’s installed PhysicalExtensionCodec. Tables created in memory from record batches are currently not supported.

to_proto() bytes#

Deprecated alias for to_bytes().

_raw_plan#
property partition_count: int#

Returns the number of partitions in the physical plan.

class datafusion.ExplainFormat#

Bases: enum.Enum

Output format for explain plans.

Controls how the query plan is rendered in DataFrame.explain().

GRAPHVIZ = 'graphviz'#

Graphviz DOT format for graph rendering.

INDENT = 'indent'#

Default indented text format.

PGJSON = 'pgjson'#

PostgreSQL-compatible JSON format for use with visualization tools.

TREE = 'tree'#

Tree-style visual format with box-drawing characters.

class datafusion.Expr(expr: datafusion._internal.expr.RawExpr)#

Expression object.

Expressions are one of the core concepts in DataFusion. See Expressions in the online documentation for more information.

This constructor should not be called by the end user.

__add__(rhs: Any) Expr#

Addition operator.

Accepts either an expression or any valid PyArrow scalar literal value.

__and__(rhs: Expr) Expr#

Logical AND.

__eq__(rhs: object) Expr#

Equal to.

Accepts either an expression or any valid PyArrow scalar literal value.

__ge__(rhs: Any) Expr#

Greater than or equal to.

Accepts either an expression or any valid PyArrow scalar literal value.

__getitem__(key: str | int) Expr#

Retrieve sub-object.

If key is a string, returns the subfield of the struct. If key is an integer, retrieves the element in the array. Note that the element index begins at 0, unlike array_element() which begins at 1. If key is a slice, returns an array that contains a slice of the original array. Similar to integer indexing, this follows Python convention where the index begins at 0 unlike array_slice() which begins at 1.

__gt__(rhs: Any) Expr#

Greater than.

Accepts either an expression or any valid PyArrow scalar literal value.

__invert__() Expr#

Binary not (~).

__le__(rhs: Any) Expr#

Less than or equal to.

Accepts either an expression or any valid PyArrow scalar literal value.

__lt__(rhs: Any) Expr#

Less than.

Accepts either an expression or any valid PyArrow scalar literal value.

__mod__(rhs: Any) Expr#

Modulo operator (%).

Accepts either an expression or any valid PyArrow scalar literal value.

__mul__(rhs: Any) Expr#

Multiplication operator.

Accepts either an expression or any valid PyArrow scalar literal value.

__ne__(rhs: object) Expr#

Not equal to.

Accepts either an expression or any valid PyArrow scalar literal value.

__or__(rhs: Expr) Expr#

Logical OR.

__reduce__() tuple[collections.abc.Callable[[bytes], Expr], tuple[bytes]]#

Pickle protocol hook.

Lets expressions be shipped to worker processes via pickle.dumps() / pickle.loads(). Built-in functions and Python UDFs (scalar, aggregate, window) travel inside the pickle bytes; only FFI-capsule UDFs require pre-registration on the worker. The worker’s SessionContext for resolving those references is looked up via datafusion.ipc.set_worker_ctx(), falling back to the global SessionContext if none has been installed on the worker.

Warning

Security: pickle.loads() on the returned tuple executes arbitrary Python on the receiver, including any cloudpickled UDF callable embedded in the payload. Only unpickle expressions from trusted sources.

Warning

Portability: Sender and receiver must run the same Python (major, minor) version; cloudpickle bytecode is not portable across minor versions. See to_bytes() for details on what travels by value vs. by reference.

Examples

>>> import pickle
>>> from datafusion import col, lit
>>> e = col("a") * lit(2)
>>> pickle.loads(pickle.dumps(e)).canonical_name()
'a * Int64(2)'

The encoding side honors a driver-side sender context installed via datafusion.ipc.set_sender_ctx() — that is how SessionContext.with_python_udf_inlining() propagates through pickle.dumps. The sender context is read by __reduce__, so copy.copy() and copy.deepcopy() — which also go through __reduce__ — pick it up too.
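To illustrate the mechanism in isolation, here is a sketch of the `__reduce__` protocol using a hypothetical stand-in class `Token`. It is not DataFusion's `Expr`; it only mirrors the shape of the return value, a callable plus a tuple holding serialized bytes:

```python
import copy


class Token:
    """Hypothetical stand-in for an object that serializes itself to bytes."""

    def __init__(self, text: str):
        self.text = text

    def to_bytes(self) -> bytes:
        return self.text.encode("utf-8")

    @classmethod
    def _reconstruct(cls, blob: bytes) -> "Token":
        return cls(blob.decode("utf-8"))

    def __reduce__(self):
        # (callable, args): the receiver rebuilds the object as callable(*args).
        return (Token._reconstruct, (self.to_bytes(),))


original = Token("a * 2")

# What pickle does on load, written out by hand.
rebuild, args = original.__reduce__()
rebuilt = rebuild(*args)

# copy.deepcopy() goes through the same hook.
copied = copy.deepcopy(original)
```

Because the receiver simply calls whatever callable the payload names, the security warning above applies to any class that uses this hook.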

__repr__() str#

Generate a string representation of this expression.

__richcmp__(other: Expr, op: int) Expr#

Comparison operator.

__sub__(rhs: Any) Expr#

Subtraction operator.

Accepts either an expression or any valid PyArrow scalar literal value.

__truediv__(rhs: Any) Expr#

Division operator.

Accepts either an expression or any valid PyArrow scalar literal value.

classmethod _reconstruct(proto_bytes: bytes) Expr#

Internal entry point used by __reduce__() on unpickle.

Examples

>>> from datafusion import Expr, col, lit
>>> blob = (col("a") + lit(1)).to_bytes()
>>> Expr._reconstruct(blob).canonical_name()
'a + Int64(1)'

abs() Expr#

Return the absolute value of a given number.

Returns:

Expr

A new expression representing the absolute value of the input expression.

acos() Expr#

Returns the arc cosine or inverse cosine of a number.

Returns:

Expr

A new expression representing the arc cosine of the input expression.

acosh() Expr#

Returns inverse hyperbolic cosine.

alias(name: str, metadata: dict[str, str] | None = None) Expr#

Assign a name to the expression.

Parameters:
  • name – The name to assign to the expression.

  • metadata – Optional metadata to attach to the expression.

Returns:

A new expression with the assigned name.

array_dims() Expr#

Returns an array of the array’s dimensions.

array_distinct() Expr#

Returns distinct values from the array after removing duplicates.

array_empty() Expr#

Returns a boolean indicating whether the array is empty.

array_length() Expr#

Returns the length of the array.

array_ndims() Expr#

Returns the number of dimensions of the array.

array_pop_back() Expr#

Returns the array without the last element.

array_pop_front() Expr#

Returns the array without the first element.

arrow_typeof() Expr#

Returns the Arrow type of the expression.

ascii() Expr#

Returns the numeric code of the first character of the argument.

asin() Expr#

Returns the arc sine or inverse sine of a number.

asinh() Expr#

Returns inverse hyperbolic sine.

atan() Expr#

Returns inverse tangent of a number.

atanh() Expr#

Returns inverse hyperbolic tangent.

between(low: Any, high: Any, negated: bool = False) Expr#

Returns True if this expression is between a given range.

Parameters:
  • low – lower bound of the range (inclusive).

  • high – higher bound of the range (inclusive).

  • negated – if True, tests that the expression is outside the given range instead.

bit_length() Expr#

Returns the number of bits in the string argument.

btrim() Expr#

Removes all characters, spaces by default, from both sides of a string.

canonical_name() str#

Returns a complete string representation of this expression.

cardinality() Expr#

Returns the total number of elements in the array.

cast(to: pyarrow.DataType[Any] | type) Expr#

Cast to a new data type.

cbrt() Expr#

Returns the cube root of a number.

ceil() Expr#

Returns the nearest integer greater than or equal to argument.

char_length() Expr#

The number of characters in the string.

character_length() Expr#

Returns the number of characters in the argument.

chr() Expr#

Converts the Unicode code point to a UTF8 character.

static column(value: str) Expr#

Creates a new expression representing a column.

column_name(plan: datafusion.plan.LogicalPlan) str#

Compute the output column name based on the provided logical plan.

cos() Expr#

Returns the cosine of the argument.

cosh() Expr#

Returns the hyperbolic cosine of the argument.

cot() Expr#

Returns the cotangent of the argument.

degrees() Expr#

Converts the argument from radians to degrees.

distinct() ExprFuncBuilder#

Only evaluate distinct values for an aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

empty() Expr#

This is an alias for array_empty().

exp() Expr#

Returns the exponential of the argument.

factorial() Expr#

Returns the factorial of the argument.

fill_nan(value: Any | Expr | None = None) Expr#

Fill NaN values with a provided value.

fill_null(value: Any | Expr | None = None) Expr#

Fill NULL values with a provided value.

filter(filter: Expr) ExprFuncBuilder#

Filter an aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

flatten() Expr#

Flattens an array of arrays into a single array.

floor() Expr#

Returns the nearest integer less than or equal to the argument.

classmethod from_bytes(buf: bytes, ctx: datafusion.context.SessionContext | None = None) Expr#

Reconstruct an expression from serialized bytes.

Accepts output of to_bytes() or pickle.dumps(). ctx is the SessionContext used to resolve any function references that travel by name (e.g. FFI UDFs, or Python UDFs sent with inlining disabled via SessionContext.with_python_udf_inlining()). When ctx is None the worker context installed via datafusion.ipc.set_worker_ctx() is consulted; if no worker context is installed, the global SessionContext is used (sufficient for built-ins and Python UDFs, plus any UDFs registered on the global context).

Warning

Security: Decoding may invoke cloudpickle.loads on bytes embedded in the payload, which executes arbitrary Python code. Treat buf as code, not data — only decode bytes you produced yourself or received from a trusted sender.

Warning

Portability: cloudpickle payloads are not portable across Python minor versions. The wire format stamps the sender’s (major, minor); if it does not match the current interpreter, this method raises ValueError naming both versions. Modules the UDF imports must also be importable on the receiver — see to_bytes() for by-value vs. by-reference details.

Examples

>>> from datafusion import Expr, col, lit
>>> blob = (col("a") + lit(1)).to_bytes()
>>> Expr.from_bytes(blob).canonical_name()
'a + Int64(1)'

from_unixtime() Expr#

Converts an integer to RFC3339 timestamp format string.

initcap() Expr#

Set the initial letter of each word to capital.

Converts the first letter of each word in string to uppercase and the remaining characters to lowercase.

is_not_null() Expr#

Returns True if this expression is not null.

is_null() Expr#

Returns True if this expression is null.

isnan() Expr#

Returns true if a given number is +NaN or -NaN; otherwise returns false.

iszero() Expr#

Returns true if a given number is +0.0 or -0.0; otherwise returns false.

length() Expr#

The number of characters in the string.

list_dims() Expr#

Returns an array of the array’s dimensions.

This is an alias for array_dims().

list_distinct() Expr#

Returns distinct values from the array after removing duplicates.

This is an alias for array_distinct().

list_length() Expr#

Returns the length of the array.

This is an alias for array_length().

list_ndims() Expr#

Returns the number of dimensions of the array.

This is an alias for array_ndims().

static literal(value: Any) Expr#

Creates a new expression representing a scalar value.

value must be a valid PyArrow scalar value or easily castable to one.

static literal_with_metadata(value: Any, metadata: dict[str, str]) Expr#

Creates a new expression representing a scalar value with metadata.

Parameters:
  • value – A valid PyArrow scalar value or easily castable to one.

  • metadata – Metadata to attach to the expression.

ln() Expr#

Returns the natural logarithm (base e) of the argument.

log10() Expr#

Base 10 logarithm of the argument.

log2() Expr#

Base 2 logarithm of the argument.

lower() Expr#

Converts a string to lowercase.

ltrim() Expr#

Removes all characters, spaces by default, from the beginning of a string.

md5() Expr#

Computes an MD5 128-bit checksum for a string expression.

null_treatment(null_treatment: datafusion.common.NullTreatment) ExprFuncBuilder#

Set the treatment for null values for a window or aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

octet_length() Expr#

Returns the number of bytes of a string.

order_by(*exprs: Expr | SortExpr) ExprFuncBuilder#

Set the ordering for a window or aggregate function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

over(window: Window) Expr#

Turn an aggregate function into a window function.

This function turns any aggregate function into a window function. With the exception of partition_by, how each of the parameters is used is determined by the underlying aggregate function.

Parameters:

window – Window definition

partition_by(*partition_by: Expr) ExprFuncBuilder#

Set the partitioning for a window function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

python_value() Any#

Extracts the value of this Expr as a Python object.

This is only valid for literal expressions.

Returns:

Python object representing literal value of the expression.

radians() Expr#

Converts the argument from degrees to radians.

reverse() Expr#

Reverse the string argument.

rex_call_operands() list[Expr]#

Return the operands of the expression based on its variant type.

Row expressions, Rex(s), operate on the concept of operands. Different variants of Expressions, Expr(s), store those operands in different data structures. This function examines the Expr variant and returns the operands to the calling logic.

rex_call_operator() str#

Extracts the operator associated with a row expression type call.

rex_type() datafusion.common.RexType#

Return the Rex Type of this expression.

A Rex (Row Expression) specifies a single row of data. That specification could include user-defined functions or types. RexType identifies the row as one of the valid RexType values.

rtrim() Expr#

Removes all characters, spaces by default, from the end of a string.

schema_name() str#

Returns the name of this expression as it should appear in a schema.

This name will not include any CAST expressions.

sha224() Expr#

Computes the SHA-224 hash of a binary string.

sha256() Expr#

Computes the SHA-256 hash of a binary string.

sha384() Expr#

Computes the SHA-384 hash of a binary string.

sha512() Expr#

Computes the SHA-512 hash of a binary string.

signum() Expr#

Returns the sign of the argument (-1, 0, +1).

sin() Expr#

Returns the sine of the argument.

sinh() Expr#

Returns the hyperbolic sine of the argument.

sort(ascending: bool = True, nulls_first: bool = True) SortExpr#

Creates a sort Expr from an existing Expr.

Parameters:
  • ascending – If true, sort in ascending order.

  • nulls_first – Return null values first.

sqrt() Expr#

Returns the square root of the argument.

static string_literal(value: str) Expr#

Creates a new expression representing a UTF8 literal value.

It differs from literal in that the value is typed as pa.string() instead of pa.string_view().

This is needed for cases where DataFusion is expecting a UTF8 instead of UTF8View literal, like in: apache/datafusion

tan() Expr#

Returns the tangent of the argument.

tanh() Expr#

Returns the hyperbolic tangent of the argument.

to_bytes(ctx: datafusion.context.SessionContext | None = None) bytes#

Serialize this expression to bytes for shipping to another process.

Use this — or pickle.dumps() — to send an expression to a worker process for distributed evaluation.

When ctx is supplied, encoding routes through that session’s installed LogicalExtensionCodec (so settings like SessionContext.with_python_udf_inlining() take effect). When ctx is None, the default codec is used (Python UDF inlining on, no user-installed extension codec).

Built-in functions travel inside the returned bytes. Python UDFs (scalar, aggregate, window) also inline by default, so the worker does not need to pre-register them; when the encoding session has SessionContext.with_python_udf_inlining() set to False, Python UDFs travel by name only and must be registered on the worker. UDFs imported via the FFI capsule protocol always travel by name only and must be registered on the worker.

Warning

Security: Bytes returned here may embed a cloudpickled Python callable (when the expression carries a Python UDF). Reconstructing them via from_bytes() or pickle.loads() executes arbitrary Python on the receiver. Only accept payloads from trusted sources.

Warning

Portability: cloudpickle serializes Python bytecode, which is not stable across Python minor versions. A payload produced on Python 3.11 will fail to load on Python 3.12. The wire format stamps the sender’s (major, minor); from_bytes() raises a ValueError naming both versions on mismatch.

cloudpickle captures the UDF callable by value — bytecode and closure cells inlined — but names the callable resolves via import are captured by reference (module path only) and must be importable on the receiver.

Self-contained — works anywhere:

# Lambda: bytecode captured inline
udf(lambda x: x * 2, [pa.int64()], pa.int64(),
    volatility="immutable")

# Locally-defined function: bytecode captured inline
def double(x):
    return x * 2
udf(double, [pa.int64()], pa.int64(), volatility="immutable")

# Closure over a local variable: value captured inline
factor = 3
udf(lambda x: x * factor, [pa.int64()], pa.int64(),
    volatility="immutable")

Requires matching environment on receiver:

# Top-level import: `foo` must be installed on receiver
from foo import double
udf(double, [pa.int64()], pa.int64(), volatility="immutable")

# Bound method of an imported class: same caveat
from mylib import Transformer
t = Transformer()
udf(t.transform, [pa.int64()], pa.int64(),
    volatility="immutable")

Examples

>>> from datafusion import col, lit
>>> blob = (col("a") + lit(1)).to_bytes()
>>> isinstance(blob, bytes)
True
to_hex() Expr#

Converts an integer to a hexadecimal string.

to_variant() Any#

Convert this expression into a python object if possible.

trim() Expr#

Removes all characters, spaces by default, from both sides of a string.

try_cast(to: pyarrow.DataType[Any] | type) Expr#

Cast to a new data type, returning NULL on failure.

Like cast() but produces NULL instead of erroring when the cast cannot be performed for a given row.

Examples

>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": ["oops"]})
>>> result = df.select(col("a").try_cast(pa.float64()).alias("c"))
>>> result.collect_column("c")[0].as_py() is None
True
types() datafusion.common.DataTypeMap#

Return the DataTypeMap.

Returns:

DataTypeMap which represents the PythonType, Arrow DataType, and SqlType Enum which this expression represents.

upper() Expr#

Converts a string to uppercase.

variant_name() str#

Returns the name of the Expr variant.

Examples: IsNotNull, Literal, BinaryExpr.

window_frame(window_frame: WindowFrame) ExprFuncBuilder#

Set the frame for a window function.

This function will create an ExprFuncBuilder that can be used to set parameters for either window or aggregate functions. If used on any other type of expression, an error will be generated when build() is called.

__radd__#
__rand__#
__rmod__#
__rmul__#
__ror__#
__rsub__#
__rtruediv__#
_to_pyarrow_types: ClassVar[dict[type, pyarrow.DataType]]#
expr#
class datafusion.InsertOp#

Bases: enum.Enum

Insert operation mode.

These modes are used by the table writing feature to define how record batches should be written to a table.

APPEND#

Appends new rows to the existing table without modifying any existing rows.

OVERWRITE#

Overwrites all existing rows in the table with the new rows.

REPLACE#

Replace existing rows that collide with the inserted rows.

Replacement is typically based on a unique key or primary key.

class datafusion.LogicalPlan(plan: datafusion._internal.LogicalPlan)#

Logical Plan.

A LogicalPlan is a node in a tree of relational operators (such as Projection or Filter).

Represents transforming an input relation (table) to an output relation (table) with a potentially different schema. Plans form a dataflow tree where data flows from leaves up to the root to produce the query result.

A LogicalPlan can be created by the SQL query planner, the DataFrame API, or programmatically (for example custom query languages).

This constructor should not be called by the end user.

__eq__(other: LogicalPlan) bool#

Test equality.

__repr__() str#

Generate a printable representation of the plan.

display() str#

Print the logical plan.

display_graphviz() str#

Print the graph visualization of the logical plan.

Returns a formattable structure that produces lines meant for graphical display using the DOT language. This format can be visualized using software from Graphviz (https://graphviz.org/).

display_indent() str#

Print an indented form of the logical plan.

display_indent_schema() str#

Print an indented form of the schema for the logical plan.

static from_bytes(ctx: datafusion.context.SessionContext, data: bytes) LogicalPlan#

Create a LogicalPlan from serialized protobuf bytes.

Decoding routes through the session’s installed LogicalExtensionCodec. Tables created in memory from record batches are currently not supported.

static from_proto(ctx: datafusion.context.SessionContext, data: bytes) LogicalPlan#

Deprecated alias for from_bytes().

inputs() list[LogicalPlan]#

Returns the list of inputs to the logical plan.

to_bytes(ctx: datafusion.context.SessionContext | None = None) bytes#

Convert a LogicalPlan to serialized protobuf bytes.

When ctx is supplied, encoding routes through the session’s installed LogicalExtensionCodec so user FFI codecs (registered via SessionContext.with_logical_extension_codec()) see the encode path. With ctx=None a default codec is used. Tables created in memory from record batches are currently not supported.

to_proto() bytes#

Deprecated alias for to_bytes().

to_variant() Any#

Convert the logical plan into its specific variant.

_raw_plan#
class datafusion.Metric(raw: datafusion._internal.Metric)#

A single execution metric with name, value, partition, and labels.

This constructor should not be called by the end user.

__repr__() str#

Return a string representation of the metric.

labels() dict[str, str]#

Return the labels associated with this metric.

Labels provide additional context for a metric. For example:

metric.labels()
# {'output_type': 'final'}
_raw#
property name: str#

The name of this metric (e.g. output_rows).

property partition: int | None#

The 0-based partition index this metric applies to.

Returns None for metrics that are not partition-specific (i.e. they apply globally across all partitions of the operator).

property value: int | datetime.datetime | None#

The value of this metric.

Returns an int for counters, gauges, and time-based metrics (nanoseconds), a datetime (UTC) for start_timestamp / end_timestamp metrics, or None when the value has not been set or is not representable.

property value_as_datetime: datetime.datetime | None#

The value as a UTC datetime for timestamp metrics.

Returns None for all non-timestamp metrics and for timestamp metrics whose value has not been set (e.g. before execution).

class datafusion.MetricsSet(raw: datafusion._internal.MetricsSet)#

A set of metrics for a single execution plan operator.

A physical plan operator runs independently across one or more partitions. metrics() returns the raw per-partition Metric objects. The convenience properties (output_rows, elapsed_compute, etc.) automatically sum the named metric across all partitions, giving a single aggregate value for the operator as a whole.

This constructor should not be called by the end user.

__repr__() str#

Return a string representation of the metrics set.

metrics() list[Metric]#

Return all individual metrics in this set.

sum_by_name(name: str) int | None#

Sum the named metric across all partitions.

Useful for accessing any metric not exposed as a first-class property. Returns None if no metric with the given name was recorded.

Parameters:

name – The metric name, e.g. "output_rows" or "elapsed_compute".
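
The per-partition summing described above can be pictured with plain Python. This is only an illustration of the idea, not the library's implementation; the tuples below are hypothetical stand-ins for Metric objects:

```python
from typing import Optional

# Hypothetical per-partition metrics as (name, partition, value) tuples.
per_partition = [
    ("output_rows", 0, 120),
    ("output_rows", 1, 80),
    ("elapsed_compute", 0, 1_500),
    ("elapsed_compute", 1, 2_500),
]


def sum_by_name(metrics, name: str) -> Optional[int]:
    """Sum the values of every metric called `name`, or return None if absent."""
    values = [value for metric_name, _partition, value in metrics if metric_name == name]
    return sum(values) if values else None


print(sum_by_name(per_partition, "output_rows"))
print(sum_by_name(per_partition, "spill_count"))
```

Returning None rather than 0 for a missing name keeps "never recorded" distinguishable from "recorded as zero".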

_raw#
property elapsed_compute: int | None#

Total CPU time (in nanoseconds) spent inside this operator’s execute loop.

Summed across all partitions. Returns None if no elapsed_compute metric was recorded.

property output_rows: int | None#

Sum of output_rows across all partitions.

property spill_count: int | None#

Number of times this operator spilled data to disk due to memory pressure.

This is a count of spill events, not a byte count. Summed across all partitions. Returns None if no spill_count metric was recorded.

property spilled_bytes: int | None#

Sum of spilled_bytes across all partitions.

property spilled_rows: int | None#

Sum of spilled_rows across all partitions.

class datafusion.ParquetColumnOptions(encoding: str | None = None, dictionary_enabled: bool | None = None, compression: str | None = None, statistics_enabled: str | None = None, bloom_filter_enabled: bool | None = None, bloom_filter_fpp: float | None = None, bloom_filter_ndv: int | None = None)#

Parquet options for individual columns.

Contains the available options that can be applied for an individual Parquet column, replacing the global options in ParquetWriterOptions.

Initialize the ParquetColumnOptions.

Parameters:
  • encoding – Sets encoding for the column path. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case-sensitive. If None, uses the default parquet options.

  • dictionary_enabled – Sets if dictionary encoding is enabled for the column path. If None, uses the default parquet options.

  • compression – Sets default parquet compression codec for the column path. Valid values are uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case-sensitive. If None, uses the default parquet options.

  • statistics_enabled – Sets if statistics are enabled for the column. Valid values are: none, chunk, and page. These values are not case-sensitive. If None, uses the default parquet options.

  • bloom_filter_enabled – Sets if bloom filter is enabled for the column path. If None, uses the default parquet options.

  • bloom_filter_fpp – Sets bloom filter false positive probability for the column path. If None, uses the default parquet options.

  • bloom_filter_ndv – Sets bloom filter number of distinct values. If None, uses the default parquet options.

bloom_filter_enabled = None#
bloom_filter_fpp = None#
bloom_filter_ndv = None#
compression = None#
dictionary_enabled = None#
encoding = None#
statistics_enabled = None#
class datafusion.ParquetWriterOptions(data_pagesize_limit: int = 1024 * 1024, write_batch_size: int = 1024, writer_version: str = '1.0', skip_arrow_metadata: bool = False, compression: str | None = 'zstd(3)', compression_level: int | None = None, dictionary_enabled: bool | None = True, dictionary_page_size_limit: int = 1024 * 1024, statistics_enabled: str | None = 'page', max_row_group_size: int = 1024 * 1024, created_by: str = 'datafusion-python', column_index_truncate_length: int | None = 64, statistics_truncate_length: int | None = None, data_page_row_count_limit: int = 20000, encoding: str | None = None, bloom_filter_on_write: bool = False, bloom_filter_fpp: float | None = None, bloom_filter_ndv: int | None = None, allow_single_file_parallelism: bool = True, maximum_parallel_row_group_writers: int = 1, maximum_buffered_record_batches_per_stream: int = 2, column_specific_options: dict[str, ParquetColumnOptions] | None = None)#

Advanced parquet writer options.

Allows setting the writer options that apply to the entire file. Some options can also be set on a column-by-column basis, with the field column_specific_options (see ParquetColumnOptions).

Initialize the ParquetWriterOptions.

Parameters:
  • data_pagesize_limit – Sets best effort maximum size of data page in bytes.

  • write_batch_size – Sets write_batch_size in bytes.

  • writer_version – Sets parquet writer version. Valid values are 1.0 and 2.0.

  • skip_arrow_metadata – Skip encoding the embedded arrow metadata in the KV_meta.

  • compression

    Compression type to use. Default is zstd(3). Available compression types are

    • uncompressed: No compression.

    • snappy: Snappy compression.

    • gzip(n): Gzip compression with level n.

    • brotli(n): Brotli compression with level n.

    • lz4: LZ4 compression.

    • lz4_raw: LZ4_RAW compression.

    • zstd(n): Zstandard compression with level n.

  • compression_level – Compression level to set.

  • dictionary_enabled – Sets if dictionary encoding is enabled. If None, uses the default parquet writer setting.

  • dictionary_page_size_limit – Sets best effort maximum dictionary page size, in bytes.

  • statistics_enabled – Sets if statistics are enabled for any column. Valid values are none, chunk, and page. If None, uses the default parquet writer setting.

  • max_row_group_size – Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read.

  • created_by – Sets “created by” property.

  • column_index_truncate_length – Sets column index truncate length.

  • statistics_truncate_length – Sets statistics truncate length. If None, uses the default parquet writer setting.

  • data_page_row_count_limit – Sets best effort maximum number of rows in a data page.

  • encoding – Sets default encoding for any column. Valid values are plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. If None, uses the default parquet writer setting.

  • bloom_filter_on_write – Write bloom filters for all columns when creating parquet files.

  • bloom_filter_fpp – Sets bloom filter false positive probability. If None, uses the default parquet writer setting.

  • bloom_filter_ndv – Sets bloom filter number of distinct values. If None, uses the default parquet writer setting.

  • allow_single_file_parallelism – Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file is serialized in parallel, leveraging a maximum possible core count of n_files * n_row_groups * n_columns.

  • maximum_parallel_row_group_writers – By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.

  • maximum_buffered_record_batches_per_stream – See maximum_parallel_row_group_writers.

  • column_specific_options – Overrides options for specific columns. If a column is not a part of this dictionary, it will use the parameters provided here.

allow_single_file_parallelism = True#
bloom_filter_fpp = None#
bloom_filter_ndv = None#
bloom_filter_on_write = False#
column_index_truncate_length = 64#
column_specific_options = None#
created_by = 'datafusion-python'#
data_page_row_count_limit = 20000#
data_pagesize_limit = 1048576#
dictionary_enabled = True#
dictionary_page_size_limit = 1048576#
encoding = None#
max_row_group_size = 1048576#
maximum_buffered_record_batches_per_stream = 2#
maximum_parallel_row_group_writers = 1#
skip_arrow_metadata = False#
statistics_enabled = 'page'#
statistics_truncate_length = None#
write_batch_size = 1024#
writer_version = '1.0'#
class datafusion.RecordBatch(record_batch: datafusion._internal.RecordBatch)#

This class is essentially a wrapper for pa.RecordBatch.

This constructor is generally not called by the end user.

See the RecordBatchStream iterator for generating this class.

__arrow_c_array__(requested_schema: object | None = None) tuple[object, object]#

Export the record batch via the Arrow C Data Interface.

This allows zero-copy interchange with libraries that support the Arrow PyCapsule interface.

Parameters:

requested_schema – Attempt to provide the record batch using this schema. Only straightforward projections such as column selection or reordering are applied.

Returns:

Two Arrow PyCapsule objects representing the ArrowArray and ArrowSchema.

to_pyarrow() pyarrow.RecordBatch#

Convert to pa.RecordBatch.

record_batch#
class datafusion.RecordBatchStream(record_batch_stream: datafusion._internal.RecordBatchStream)#

This class represents a stream of record batches.

These are typically the result of an execute_stream() operation.

This constructor is typically not called by the end user.

__aiter__() Self#

Return an asynchronous iterator over record batches.

async __anext__() RecordBatch#

Return the next RecordBatch in the stream asynchronously.

__iter__() Self#

Return an iterator over record batches.

__next__() RecordBatch#

Return the next RecordBatch in the stream.

next() RecordBatch#

See __next__() for the iterator function.

rbs#
class datafusion.RuntimeEnvBuilder#

Runtime configuration options.

Create a new RuntimeEnvBuilder with default values.

with_disk_manager_disabled() RuntimeEnvBuilder#

Disable the disk manager; attempts to create temporary files will error.

Returns:

A new RuntimeEnvBuilder object with the updated setting.

with_disk_manager_os() RuntimeEnvBuilder#

Use the operating system’s temporary directory for disk manager.

Returns:

A new RuntimeEnvBuilder object with the updated setting.

with_disk_manager_specified(*paths: str | pathlib.Path) RuntimeEnvBuilder#

Use the specified paths for the disk manager’s temporary files.

Parameters:

paths – Paths to use for the disk manager’s temporary files.

Returns:

A new RuntimeEnvBuilder object with the updated setting.

with_fair_spill_pool(size: int) RuntimeEnvBuilder#

Use a fair spill pool with the specified size.

This pool works best when you know beforehand the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even when there was sufficient memory (reserved for other operators) to avoid doing so:

┌───────────────────────z──────────────────────z───────────────┐
│                       z                      z               │
│                       z                      z               │
│       Spillable       z       Unspillable    z     Free      │
│        Memory         z        Memory        z    Memory     │
│                       z                      z               │
│                       z                      z               │
└───────────────────────z──────────────────────z───────────────┘
Parameters:

size – Size of the memory pool in bytes.

Returns:

A new RuntimeEnvBuilder object with the updated setting.

Examples

>>> config = dfn.RuntimeEnvBuilder().with_fair_spill_pool(1024)
with_greedy_memory_pool(size: int) RuntimeEnvBuilder#

Use a greedy memory pool with the specified size.

This pool works well for queries that do not need to spill or have a single spillable operator. See with_fair_spill_pool() if there are multiple spillable operators that all will spill.

Parameters:

size – Size of the memory pool in bytes.

Returns:

A new RuntimeEnvBuilder object with the updated setting.

Examples

>>> config = dfn.RuntimeEnvBuilder().with_greedy_memory_pool(1024)
with_temp_file_path(path: str | pathlib.Path) RuntimeEnvBuilder#

Use the specified path to create any needed temporary files.

Parameters:

path – Path to use for temporary files.

Returns:

A new RuntimeEnvBuilder object with the updated setting.

Examples

>>> config = dfn.RuntimeEnvBuilder().with_temp_file_path("/tmp")
with_unbounded_memory_pool() RuntimeEnvBuilder#

Use an unbounded memory pool.

Returns:

A new RuntimeEnvBuilder object with the updated setting.

config_internal#
class datafusion.SQLOptions#

Options to be used when performing SQL queries.

Create a new SQLOptions with default values.

The default values are:
  • DDL commands are allowed

  • DML commands are allowed

  • Statements are allowed

with_allow_ddl(allow: bool = True) SQLOptions#

Should DDL (Data Definition Language) commands be run?

Examples of DDL commands include CREATE TABLE and DROP TABLE.

Parameters:

allow – Allow DDL commands to be run.

Returns:

A new SQLOptions object with the updated setting.

Examples

>>> options = dfn.SQLOptions().with_allow_ddl(True)
with_allow_dml(allow: bool = True) SQLOptions#

Should DML (Data Manipulation Language) commands be run?

Examples of DML commands include INSERT INTO and DELETE.

Parameters:

allow – Allow DML commands to be run.

Returns:

A new SQLOptions object with the updated setting.

Examples

>>> options = dfn.SQLOptions().with_allow_dml(True)
with_allow_statements(allow: bool = True) SQLOptions#

Should statements such as SET VARIABLE and BEGIN TRANSACTION be run?

Parameters:

allow – Allow statements to be run.

Returns:

A new SQLOptions object with the updated setting.

Examples

>>> options = dfn.SQLOptions().with_allow_statements(True)
options_internal#
class datafusion.ScalarUDF(name: str, func: collections.abc.Callable[Ellipsis, _R], input_fields: list[pyarrow.Field], return_field: pyarrow.Field, volatility: Volatility | str)#

Class for performing scalar user-defined functions (UDF).

Scalar UDFs operate on a row by row basis. See also AggregateUDF for operating on a group of rows.

Instantiate a scalar user-defined function (UDF).

See helper method udf() for argument details.

__call__(*args: datafusion.expr.Expr) datafusion.expr.Expr#

Execute the UDF.

This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.

__repr__() str#

Print a string representation of the Scalar UDF.

classmethod _from_internal(internal: datafusion._internal.ScalarUDF) ScalarUDF#

Wrap an already-constructed internal ScalarUDF handle.

Used by SessionContext.udf() to surface a function looked up from the session’s function registry without re-running __init__().

static from_pycapsule(func: ScalarUDFExportable) ScalarUDF#

Create a Scalar UDF from ScalarUDF PyCapsule object.

This function will instantiate a Scalar UDF that uses a DataFusion ScalarUDF that is exported via the FFI bindings.

static udf(input_fields: collections.abc.Sequence[pyarrow.DataType | pyarrow.Field] | pyarrow.DataType | pyarrow.Field, return_field: pyarrow.DataType | pyarrow.Field, volatility: Volatility | str, name: str | None = None) collections.abc.Callable[Ellipsis, ScalarUDF]#
static udf(func: collections.abc.Callable[Ellipsis, _R], input_fields: collections.abc.Sequence[pyarrow.DataType | pyarrow.Field] | pyarrow.DataType | pyarrow.Field, return_field: pyarrow.DataType | pyarrow.Field, volatility: Volatility | str, name: str | None = None) ScalarUDF
static udf(func: ScalarUDFExportable) ScalarUDF

Create a new User-Defined Function (UDF).

This method can be used either as a function or as a decorator.

Usage:
  • As a function: udf(func, input_fields, return_field, volatility, name).

  • As a decorator: @udf(input_fields, return_field, volatility, name). When used as a decorator, do not pass func explicitly.

In lieu of passing a PyArrow Field, you can pass a DataType for simplicity. When you do so, it will be assumed that the nullability of the inputs and output are True and that they have no metadata.

Parameters:
  • func (Callable, optional) – Only needed when calling as a function. Skip this argument when using udf as a decorator. If you have a Rust backed ScalarUDF within a PyCapsule, you can pass this parameter and ignore the rest. They will be determined directly from the underlying function. See the online documentation for more information.

  • input_fields (list[pa.Field | pa.DataType]) – The data types or Fields of the arguments to func. This list must be of the same length as the number of arguments.

  • return_field (_R) – The field of the return value from the function.

  • volatility (Volatility | str) – See Volatility for allowed values.

  • name (Optional[str]) – A descriptive name for the function.

Returns:

A user-defined function that can be used in SQL expressions, data aggregation, or window function calls.

Examples

Using udf as a function:

>>> import pyarrow.compute as pc
>>> from datafusion.user_defined import ScalarUDF
>>> def double_func(x):
...     return pc.multiply(x, 2)
>>> double_udf = ScalarUDF.udf(
...     double_func, [pa.int64()], pa.int64(),
...     "volatile", "double_it")

Using udf as a decorator:

>>> @ScalarUDF.udf([pa.int64()], pa.int64(), "volatile")
... def decorator_double_udf(x):
...     return pc.multiply(x, 3)

Apply to a dataframe:

>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"x": [1, 2, 3]})
>>> df.select(double_udf(col("x")).alias("result")).to_pydict()
{'result': [2, 4, 6]}
>>> df.select(decorator_double_udf(col("x")).alias("result")).to_pydict()
{'result': [3, 6, 9]}
_udf#
property name: str#

Return the registered name of this UDF.

For UDFs imported via the FFI capsule protocol, this is the name the capsule itself reports — not the name argument passed to the constructor (which is ignored on the FFI path).

Examples

>>> import pyarrow as pa
>>> from datafusion import udf
>>> double = udf(
...     lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
...     [pa.int64()],
...     pa.int64(),
...     volatility="immutable",
...     name="double",
... )
>>> double.name
'double'
class datafusion.SessionConfig(config_options: dict[str, str] | None = None)#

Session configuration options.

Create a new SessionConfig with the given configuration options.

Parameters:

config_options – Configuration options.

set(key: str, value: str) SessionConfig#

Set a configuration option.

Parameters:
  • key – Option key.

  • value – Option value.

Returns:

A new SessionConfig object with the updated setting.

with_batch_size(batch_size: int) SessionConfig#

Customize batch size.

Parameters:

batch_size – Batch size.

Returns:

A new SessionConfig object with the updated setting.

with_create_default_catalog_and_schema(enabled: bool = True) SessionConfig#

Control if the default catalog and schema will be automatically created.

Parameters:

enabled – Whether the default catalog and schema will be automatically created.

Returns:

A new SessionConfig object with the updated setting.

with_default_catalog_and_schema(catalog: str, schema: str) SessionConfig#

Select a name for the default catalog and schema.

Parameters:
  • catalog – Catalog name.

  • schema – Schema name.

Returns:

A new SessionConfig object with the updated setting.

with_extension(extension: Any) SessionConfig#

Create a new configuration using an extension.

Parameters:

extension – A custom configuration extension object. These are shared from another DataFusion extension library.

Returns:

A new SessionConfig object with the updated setting.

with_information_schema(enabled: bool = True) SessionConfig#

Enable or disable the inclusion of information_schema virtual tables.

Parameters:

enabled – Whether to include information_schema virtual tables.

Returns:

A new SessionConfig object with the updated setting.

with_parquet_pruning(enabled: bool = True) SessionConfig#

Enable or disable the use of pruning predicate for parquet readers.

Pruning predicates will enable the reader to skip row groups.

Parameters:

enabled – Whether to use pruning predicate for parquet readers.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_aggregations(enabled: bool = True) SessionConfig#

Enable or disable the use of repartitioning for aggregations.

Enabling this improves parallelism.

Parameters:

enabled – Whether to use repartitioning for aggregations.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_file_min_size(size: int) SessionConfig#

Set minimum file range size for repartitioning scans.

Parameters:

size – Minimum file range size.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_file_scans(enabled: bool = True) SessionConfig#

Enable or disable the use of repartitioning for file scans.

Parameters:

enabled – Whether to use repartitioning for file scans.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_joins(enabled: bool = True) SessionConfig#

Enable or disable the use of repartitioning for joins to improve parallelism.

Parameters:

enabled – Whether to use repartitioning for joins.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_sorts(enabled: bool = True) SessionConfig#

Enable or disable the use of repartitioning for sorts.

This may improve parallelism.

Parameters:

enabled – Whether to use repartitioning for sorts.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_windows(enabled: bool = True) SessionConfig#

Enable or disable the use of repartitioning for window functions.

This may improve parallelism.

Parameters:

enabled – Whether to use repartitioning for window functions.

Returns:

A new SessionConfig object with the updated setting.

with_target_partitions(target_partitions: int) SessionConfig#

Customize the number of target partitions for query execution.

Increasing partitions can increase concurrency.

Parameters:

target_partitions – Number of target partitions.

Returns:

A new SessionConfig object with the updated setting.

config_internal#
class datafusion.Table(table: Table | datafusion.context.TableProviderExportable | datafusion.DataFrame | pyarrow.dataset.Dataset, ctx: datafusion.SessionContext | None = None)#

A DataFusion table.

Internally we currently support the following types of tables:

  • Tables created using built-in DataFusion methods, such as reading from CSV or Parquet

  • pyarrow datasets

  • DataFusion DataFrames, which will be converted into a view

  • Externally provided tables implemented with the FFI PyCapsule interface (advanced)

Constructor.

__repr__() str#

Print a string representation of the table.

static from_dataset(dataset: pyarrow.dataset.Dataset) Table#

Turn a pyarrow.dataset Dataset into a Table.

__slots__ = ('_inner',)#
_inner#
property kind: str#

Returns the kind of table.

property schema: pyarrow.Schema#

Returns the schema associated with this table.

class datafusion.TableFunction(name: str, func: collections.abc.Callable[Ellipsis, Any], ctx: datafusion.SessionContext | None = None, *, with_session: bool = False)#

Class for performing user-defined table functions (UDTF).

Table functions generate new table providers based on the input expressions.

Instantiate a user-defined table function (UDTF).

Set with_session=True to have the calling SessionContext passed as a session keyword argument on each invocation. Use it inside the callback to look up registered tables, UDFs, or session configuration. When with_session is False (the default), func is invoked with the positional expression arguments only.

with_session=True is only supported for pure-Python callables. Passing it together with an FFI-exported table function (one exposing __datafusion_table_function__) raises TypeError.

Registry mutations performed through the injected session (such as registering tables or UDFs) propagate to the caller’s SessionContext because the registries are shared. Configuration changes do not propagate; the wrapper holds its own clone of the session config.

See udtf() for a convenience function and argument descriptions.

__call__(*args: datafusion.expr.Expr) Any#

Execute the UDTF and return a table provider.

__repr__() str#

User printable representation.

static _create_table_udf(func: collections.abc.Callable[Ellipsis, Any], name: str, *, with_session: bool = False) TableFunction#

Create a TableFunction instance from function arguments.

static _create_table_udf_decorator(name: str | None = None, *, with_session: bool = False) collections.abc.Callable[[collections.abc.Callable[Ellipsis, Any]], TableFunction]#

Create a decorator for a TableFunction.

static udtf(name: str, *, with_session: bool = False) collections.abc.Callable[Ellipsis, Any]#
static udtf(func: collections.abc.Callable[Ellipsis, Any], name: str, *, with_session: bool = False) TableFunction

Create a new User-Defined Table Function (UDTF).

Pass with_session=True to have the calling SessionContext injected as a session keyword argument on each invocation.

_udtf#
class datafusion.TableProviderFactory#

Bases: abc.ABC

Abstract class for defining a Python based Table Provider Factory.

abstract create(cmd: datafusion.expr.CreateExternalTable) Table#

Create a table using the CreateExternalTable.

class datafusion.TableProviderFactoryExportable#

Bases: Protocol

Type hint for object that has __datafusion_table_provider_factory__ PyCapsule.

https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProviderFactory.html

__datafusion_table_provider_factory__(session: Any) object#
class datafusion.WindowFrame(units: str, start_bound: Any | None, end_bound: Any | None)#

Defines a window frame for performing window operations.

Construct a window frame using the given parameters.

Parameters:
  • units – Should be one of rows, range, or groups.

  • start_bound – Sets the preceding bound. Must be >= 0. If None, the bound is unbounded. If the unit type is groups, this parameter must be set.

  • end_bound – Sets the following bound. Must be >= 0. If None, the bound is unbounded. If the unit type is groups, this parameter must be set.

__repr__() str#

Print a string representation of the window frame.

get_frame_units() str#

Returns the window frame units for the bounds.

get_lower_bound() WindowFrameBound#

Returns starting bound.

get_upper_bound() WindowFrameBound#

Returns end bound.

window_frame#
class datafusion.WindowUDF(name: str, func: collections.abc.Callable[[], WindowEvaluator], input_types: list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str)#

Class for performing user-defined window functions (UDWF).

Window UDFs operate on a partition of rows. See also ScalarUDF for operating on a row by row basis.

Instantiate a user-defined window function (UDWF).

See udwf() for a convenience function and argument descriptions.

__call__(*args: datafusion.expr.Expr) datafusion.expr.Expr#

Execute the UDWF.

This function is not typically called by an end user. These calls will occur during the evaluation of the dataframe.

__repr__() str#

Print a string representation of the Window UDF.

static _create_window_udf(func: collections.abc.Callable[[], WindowEvaluator], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) WindowUDF#

Create a WindowUDF instance from function arguments.

static _create_window_udf_decorator(input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) collections.abc.Callable[[collections.abc.Callable[[], WindowEvaluator]], collections.abc.Callable[Ellipsis, datafusion.expr.Expr]]#

Create a decorator for a WindowUDF.

classmethod _from_internal(internal: datafusion._internal.WindowUDF) WindowUDF#

Wrap an already-constructed internal WindowUDF handle.

Used by SessionContext.udwf() to surface a function looked up from the session’s function registry without re-running __init__().

static _get_default_name(func: collections.abc.Callable) str#

Get the default name for a function based on its attributes.

static _normalize_input_types(input_types: pyarrow.DataType | list[pyarrow.DataType]) list[pyarrow.DataType]#

Convert a single DataType to a list if needed.

static from_pycapsule(func: WindowUDFExportable) WindowUDF#

Create a Window UDF from WindowUDF PyCapsule object.

This function will instantiate a Window UDF that uses a DataFusion WindowUDF that is exported via the FFI bindings.

static udwf(input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) collections.abc.Callable[Ellipsis, WindowUDF]#
static udwf(func: collections.abc.Callable[[], WindowEvaluator], input_types: pyarrow.DataType | list[pyarrow.DataType], return_type: pyarrow.DataType, volatility: Volatility | str, name: str | None = None) WindowUDF

Create a new User-Defined Window Function (UDWF).

This function can be used either as a function or as a decorator.

Usage:
  • As a function: udwf(func, input_types, return_type, volatility, name).

  • As a decorator: @udwf(input_types, return_type, volatility, name). When using udwf as a decorator, do not pass func explicitly.

Examples

>>> import pyarrow as pa
>>> import datafusion as dfn
>>> from datafusion import col
>>> from datafusion.user_defined import WindowUDF, WindowEvaluator, udwf
>>> class BiasedNumbers(WindowEvaluator):
...     def __init__(self, start: int = 0):
...         self.start = start
...     def evaluate_all(self, values, num_rows):
...         return pa.array(
...             [self.start + i for i in range(num_rows)])

Using udwf as a function:

>>> udwf1 = WindowUDF.udwf(
...     BiasedNumbers, pa.int64(), pa.int64(), "immutable")
>>> def bias_10() -> BiasedNumbers:
...    return BiasedNumbers(10)
>>> udwf2 = udwf(bias_10, pa.int64(), pa.int64(), "immutable")
>>> udwf3 = udwf(
...     lambda: BiasedNumbers(20), pa.int64(), pa.int64(), "immutable"
... )

Using udwf as a decorator:

>>> @WindowUDF.udwf(pa.int64(), pa.int64(), "immutable")
... def biased_numbers():
...     return BiasedNumbers(10)

Apply to a dataframe:

>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [10, 20, 30]})
>>> df.select(udwf1(col("a")).alias("result")).to_pydict()
{'result': [0, 1, 2]}
>>> df.select(udwf2(col("a")).alias("result")).to_pydict()
{'result': [10, 11, 12]}
>>> df.select(udwf3(col("a")).alias("result")).to_pydict()
{'result': [20, 21, 22]}
>>> df.select(biased_numbers(col("a")).alias("result")).to_pydict()
{'result': [10, 11, 12]}

Parameters:
  • func – Only needed when calling as a function. Skip this argument when using udwf as a decorator. If you have a Rust backed WindowUDF within a PyCapsule, you can pass this parameter and ignore the rest. They will be determined directly from the underlying function. See the online documentation for more information.

  • input_types – The data types of the arguments.

  • return_type – The data type of the return value.

  • volatility – See Volatility for allowed values.

  • name – A descriptive name for the function.

Returns:

A user-defined window function that can be used in window function calls.

_udwf#
property name: str#

Return the registered name of this UDWF.

For UDWFs imported via the FFI capsule protocol, this is the name the capsule itself reports — not the name argument passed to the constructor (which is ignored on the FFI path).

datafusion.configure_formatter(**kwargs: Any) None#

Configure the global DataFrame HTML formatter.

This function creates a new formatter with the provided configuration and sets it as the global formatter for all DataFrames.

Parameters:

**kwargs – Formatter configuration parameters like max_cell_length, max_width, max_height, enable_cell_expansion, etc.

Raises:

ValueError – If any invalid parameters are provided

Example

>>> from datafusion.dataframe_formatter import configure_formatter
>>> configure_formatter(
...     max_cell_length=50,
...     max_height=500,
...     enable_cell_expansion=True,
...     use_shared_styles=True
... )

datafusion.lit(value: Any) expr.Expr#

Create a literal expression.

datafusion.literal(value: Any) expr.Expr#

Create a literal expression.

datafusion.read_avro(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_extension: str = '.avro') datafusion.dataframe.DataFrame#

Create a DataFrame for reading Avro data source.

This function will use the global context. Any functions or tables registered with another context may not be accessible when used with a DataFrame created using this function.

Parameters:
  • path – Path to the Avro file.

  • schema – The data source schema.

  • file_partition_cols – Partition columns.

  • file_extension – File extension to select.

Returns:

DataFrame representation of the read Avro file

datafusion.read_csv(path: str | pathlib.Path | list[str] | list[pathlib.Path], schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = 1000, file_extension: str = '.csv', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_compression_type: str | None = None, options: datafusion.options.CsvReadOptions | None = None) datafusion.dataframe.DataFrame#

Read a CSV data source.

This function will use the global context. Any functions or tables registered with another context may not be accessible when used with a DataFrame created using this function.

Parameters:
  • path – Path to the CSV file, or a list of paths.

  • schema – An optional schema representing the CSV files. If None, the CSV reader will try to infer it based on data in file.

  • has_header – Whether the CSV file has a header. If schema inference is run on a file with no headers, default column names are created.

  • delimiter – An optional column delimiter.

  • schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.

  • file_extension – File extension; only files with this extension are selected for data input.

  • table_partition_cols – Partition columns.

  • file_compression_type – File compression type.

  • options – Set advanced options for CSV reading. This cannot be combined with any of the other options in this method.

Returns:

DataFrame representation of the read CSV files

datafusion.read_json(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_compression_type: str | None = None) datafusion.dataframe.DataFrame#

Read a line-delimited JSON data source.

This function will use the global context. Any functions or tables registered with another context may not be accessible when used with a DataFrame created using this function.

Parameters:
  • path – Path to the JSON file.

  • schema – The data source schema.

  • schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.

  • file_extension – File extension; only files with this extension are selected for data input.

  • table_partition_cols – Partition columns.

  • file_compression_type – File compression type.

Returns:

DataFrame representation of the read JSON files.

datafusion.read_parquet(path: str | pathlib.Path, table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: list[list[datafusion.expr.Expr]] | None = None) datafusion.dataframe.DataFrame#

Read a Parquet source into a DataFrame.

This function will use the global context. Any functions or tables registered with another context may not be accessible when used with a DataFrame created using this function.

Parameters:
  • path – Path to the Parquet file.

  • table_partition_cols – Partition columns.

  • parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.

  • file_extension – File extension; only files with this extension are selected for data input.

  • skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.

  • schema – An optional schema representing the parquet files. If None, the parquet reader will try to infer it based on data in the file.

  • file_sort_order – Sort order for the file.

Returns:

DataFrame representation of the read Parquet files

datafusion.DFSchema#
datafusion.col: Col#
datafusion.column: Col#
datafusion.udaf#
datafusion.udf#
datafusion.udtf#
datafusion.udwf#