datafusion.dataframe¶
DataFrame is one of the core concepts in DataFusion.
See Concepts in the online documentation for more information.
Classes¶
- Compression: Enum representing the available compression types for Parquet files.
- DataFrame: Two dimensional table representation of data.
- DataFrameWriteOptions: Writer options for DataFrame.
- ExplainFormat: Output format for explain plans.
- InsertOp: Insert operation mode.
- ParquetColumnOptions: Parquet options for individual columns.
- ParquetWriterOptions: Advanced parquet writer options.
Module Contents¶
- class datafusion.dataframe.Compression¶
Bases: enum.Enum
Enum representing the available compression types for Parquet files.
- classmethod from_str(value: str) Compression¶
Convert a string to a Compression enum value.
- Parameters:
value – The string representation of the compression type.
- Returns:
The corresponding Compression enum value.
- Raises:
ValueError – If the string does not match any Compression enum value.
- get_default_level() int | None¶
Get the default compression level for the compression type.
- Returns:
The default compression level for the compression type.
- BROTLI = 'brotli'¶
- GZIP = 'gzip'¶
- LZ4 = 'lz4'¶
- LZ4_RAW = 'lz4_raw'¶
- SNAPPY = 'snappy'¶
- UNCOMPRESSED = 'uncompressed'¶
- ZSTD = 'zstd'¶
- class datafusion.dataframe.DataFrame(df: datafusion._internal.DataFrame)¶
Two dimensional table representation of data.
DataFrame objects are iterable; iterating over a DataFrame yields datafusion.RecordBatch instances lazily. See Concepts in the online documentation for more information.
This constructor is not to be used by the end user.
See SessionContext for methods to create a DataFrame.
- __aiter__() collections.abc.AsyncIterator[datafusion.record_batch.RecordBatch]¶
Return an async iterator over this DataFrame’s record batches.
We’re using __aiter__ because we support Python < 3.10 where aiter() is not available.
- __arrow_c_stream__(requested_schema: object | None = None) object¶
Export the DataFrame as an Arrow C Stream.
The DataFrame is executed using DataFusion’s streaming APIs and exposed via Arrow’s C Stream interface. Record batches are produced incrementally, so the full result set is never materialized in memory.
When requested_schema is provided, DataFusion applies only simple projections such as selecting a subset of existing columns or reordering them. Column renaming, computed expressions, or type coercion are not supported through this interface.
- Parameters:
requested_schema – Either a pyarrow.Schema or an Arrow C Schema capsule (PyCapsule) produced by schema._export_to_c_capsule(). The DataFrame will attempt to align its output with the fields and order specified by this schema.
- Returns:
Arrow PyCapsule object representing an ArrowArrayStream.
For practical usage patterns, see the Apache Arrow streaming documentation: https://arrow.apache.org/docs/python/ipc.html#streaming.
For details on DataFusion’s Arrow integration and DataFrame streaming, see the user guide (user-guide/io/arrow and user-guide/dataframe/index).
Notes
The Arrow C Data Interface PyCapsule details are documented by Apache Arrow and can be found at: https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
- __getitem__(key: str | list[str]) DataFrame¶
Return a new DataFrame with the specified column or columns.
- Parameters:
key – Column name or list of column names to select.
- Returns:
DataFrame with the specified column or columns.
- __iter__() collections.abc.Iterator[datafusion.record_batch.RecordBatch]¶
Return an iterator over this DataFrame’s record batches.
- __repr__() str¶
Return a string representation of the DataFrame.
- Returns:
String representation of the DataFrame.
- _repr_html_() str¶
- aggregate(group_by: collections.abc.Sequence[datafusion.expr.Expr | str] | datafusion.expr.Expr | str, aggs: collections.abc.Sequence[datafusion.expr.Expr] | datafusion.expr.Expr) DataFrame¶
Aggregates the rows of the current DataFrame.
By default, each unique combination of the group_by columns produces one row. To get multiple levels of subtotals in a single pass, pass a GroupingSet expression (created via rollup(), cube(), or grouping_sets()) as the group_by argument. See the Aggregation user guide for detailed examples.
- Parameters:
group_by – Sequence of expressions or column names to group by. A GroupingSet expression may be included to produce multiple grouping levels (rollup, cube, or explicit grouping sets).
aggs – Sequence of expressions to aggregate.
- Returns:
DataFrame after aggregation.
- cast(mapping: dict[str, pyarrow.DataType[Any]]) DataFrame¶
Cast one or more columns to a different data type.
- Parameters:
mapping – Mapping of column name to the target data type.
- Returns:
DataFrame after casting the columns.
- col(name: str) datafusion.expr.Expr¶
Alias for column().
See also
- collect() list[pyarrow.RecordBatch]¶
Execute this DataFrame and collect results into memory.
Prior to calling collect, modifying a DataFrame simply updates a plan (no actual computation is performed). Calling collect triggers the computation.
- Returns:
List of pyarrow.RecordBatch collected from the DataFrame.
- collect_column(column_name: str) pyarrow.Array | pyarrow.ChunkedArray¶
Executes this DataFrame for a single column.
- collect_partitioned() list[list[pyarrow.RecordBatch]]¶
Execute this DataFrame and collect all partitioned results.
This operation returns pyarrow.RecordBatch maintaining the input partitioning.
- Returns:
List of lists of RecordBatch collected from the DataFrame.
- column(name: str) datafusion.expr.Expr¶
Return a fully qualified column expression for name.
Resolves an unqualified column name against this DataFrame's schema and returns an Expr whose underlying column reference includes the table qualifier. This is especially useful after joins, where the same column name may appear in multiple relations.
- Parameters:
name – Unqualified column name to look up.
- Returns:
A fully qualified column expression.
- Raises:
Exception – If the column is not found or is ambiguous (exists in multiple relations).
Examples
Resolve a column from a simple DataFrame:
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2], "b": [3, 4]})
>>> expr = df.column("a")
>>> df.select(expr).to_pydict()
{'a': [1, 2]}
Resolve qualified columns after a join:
>>> left = ctx.from_pydict({"id": [1, 2], "x": [10, 20]})
>>> right = ctx.from_pydict({"id": [1, 2], "y": [30, 40]})
>>> joined = left.join(right, on="id", how="inner")
>>> expr = joined.column("y")
>>> joined.select("id", expr).sort("id").to_pydict()
{'id': [1, 2], 'y': [30, 40]}
- count() int¶
Return the total number of rows in this DataFrame.
Note that this method will actually run a plan to calculate the count, which may be slow for large or complicated DataFrames.
- Returns:
Number of rows in the DataFrame.
- static default_str_repr(batches: list[pyarrow.RecordBatch], schema: pyarrow.Schema, has_more: bool, table_uuid: str | None = None) str¶
Return the default string representation of a DataFrame.
This method is used by the default formatter and implemented in Rust for performance reasons.
- describe() DataFrame¶
Return the statistics for this DataFrame.
Only numeric datatypes are summarized at the moment; nulls are returned for non-numeric datatypes.
The output format is modeled after pandas.
- Returns:
A summary DataFrame containing statistics.
- distinct() DataFrame¶
Return a new DataFrame with all duplicated rows removed.
- Returns:
DataFrame after removing duplicates.
- distinct_on(on_expr: list[datafusion.expr.Expr], select_expr: list[datafusion.expr.Expr], sort_expr: list[datafusion.expr.SortKey] | None = None) DataFrame¶
Deduplicate rows based on specific columns.
Returns a new DataFrame with one row per unique combination of the on_expr columns, keeping the first row per group as determined by sort_expr.
- Parameters:
on_expr – Expressions that determine uniqueness.
select_expr – Expressions to include in the output.
sort_expr – Optional sort expressions to determine which row to keep.
- Returns:
DataFrame after deduplication.
Examples
Keep the row with the smallest b for each unique a:
>>> from datafusion import col
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 1, 2, 2], "b": [10, 20, 30, 40]})
>>> df.distinct_on(
...     [col("a")],
...     [col("a"), col("b")],
...     [col("a").sort(ascending=True), col("b").sort(ascending=True)],
... ).sort("a").to_pydict()
{'a': [1, 2], 'b': [10, 30]}
- drop(*columns: str) DataFrame¶
Drop an arbitrary number of columns.
Column names are case-sensitive; a column whose original name is not strictly lower case must be wrapped in double quotes to be dropped.
- Parameters:
columns – Column names to drop from the dataframe.
- Returns:
DataFrame with those columns removed in the projection.
Examples
To drop a lower-cased column ‘a’
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2], "b": [3, 4]})
>>> df.drop("a").schema().names
['b']
Or to drop an upper-cased column ‘A’
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"A": [1, 2], "b": [3, 4]})
>>> df.drop('"A"').schema().names
['b']
- except_all(other: DataFrame, distinct: bool = False) DataFrame¶
Calculate the set difference of two DataFrame.
Returns rows that are in this DataFrame but not in other.
The two DataFrame must have exactly the same schema.
- Parameters:
other – DataFrame to calculate exception with.
distinct – If True, duplicate rows are removed from the result.
- Returns:
DataFrame after set difference.
Examples
Remove rows present in df2:
>>> ctx = dfn.SessionContext()
>>> df1 = ctx.from_pydict({"a": [1, 2, 3], "b": [10, 20, 30]})
>>> df2 = ctx.from_pydict({"a": [1, 2], "b": [10, 20]})
>>> df1.except_all(df2).sort("a").to_pydict()
{'a': [3], 'b': [30]}
Remove rows present in df2 and deduplicate:
>>> df1.except_all(df2, distinct=True).sort("a").to_pydict()
{'a': [3], 'b': [30]}
- execute_stream() datafusion.record_batch.RecordBatchStream¶
Executes this DataFrame and returns a stream over a single partition.
- Returns:
Record Batch Stream over a single partition.
- execute_stream_partitioned() list[datafusion.record_batch.RecordBatchStream]¶
Executes this DataFrame and returns a stream for each partition.
- Returns:
One record batch stream per partition.
- execution_plan() datafusion.plan.ExecutionPlan¶
Return the execution/physical plan.
- Returns:
Execution plan.
- explain(verbose: bool = False, analyze: bool = False, format: ExplainFormat | None = None) None¶
Print an explanation of the DataFrame’s plan so far.
If analyze is specified, runs the plan and reports metrics.
- Parameters:
verbose – If True, more details will be included.
analyze – If True, the plan will run and metrics will be reported.
format – Output format for the plan. Defaults to ExplainFormat.INDENT.
Examples
Show the plan in tree format:
>>> from datafusion import ExplainFormat
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> df.explain(format=ExplainFormat.TREE)
Show plan with runtime metrics:
>>> df.explain(analyze=True)
- fill_null(value: Any, subset: list[str] | None = None) DataFrame¶
Fill null values in specified columns with a value.
- Parameters:
value – Value to replace nulls with. Will be cast to match column type.
subset – Optional list of column names to fill. If None, fills all columns.
- Returns:
DataFrame with null values replaced where type casting is possible
Examples
>>> from datafusion import SessionContext, col
>>> ctx = SessionContext()
>>> df = ctx.from_pydict({"a": [1, None, 3], "b": [None, 5, 6]})
>>> filled = df.fill_null(0)
>>> filled.sort(col("a")).collect()[0].column("a").to_pylist()
[0, 1, 3]
Notes
- Only fills nulls in columns where the value can be cast to the column type
- For columns where casting fails, the original column is kept unchanged
- For columns not in subset, the original column is kept unchanged
- filter(*predicates: datafusion.expr.Expr | str) DataFrame¶
Return a DataFrame for which predicate evaluates to True.
Rows for which predicate evaluates to False or None are filtered out. If more than one predicate is provided, the predicates are combined with a logical AND. Each predicate can be an Expr created using helper functions such as datafusion.col() or datafusion.lit(), or a SQL expression string that will be parsed against the DataFrame schema. If more complex logic is required, see the logical operations in functions.
Examples
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> df.filter(col("a") > lit(1)).to_pydict()
{'a': [2, 3]}
>>> df.filter("a > 1").to_pydict()
{'a': [2, 3]}
- Parameters:
predicates – Predicate expression(s) or SQL strings to filter the DataFrame.
- Returns:
DataFrame after filtering.
- find_qualified_columns(*names: str) list[datafusion.expr.Expr]¶
Return fully qualified column expressions for the given names.
This is a batch version of column(): it resolves each unqualified name against the DataFrame's schema and returns a list of qualified column expressions.
- Parameters:
names – Unqualified column names to look up.
- Returns:
List of fully qualified column expressions, one per name.
- Raises:
Exception – If any column is not found or is ambiguous.
Examples
Resolve multiple columns at once:
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2], "b": [3, 4], "c": [5, 6]})
>>> exprs = df.find_qualified_columns("a", "c")
>>> df.select(*exprs).to_pydict()
{'a': [1, 2], 'c': [5, 6]}
- head(n: int = 5) DataFrame¶
Return a new DataFrame with a limited number of rows.
- Parameters:
n – Number of rows to take from the head of the DataFrame.
- Returns:
DataFrame after limiting.
- intersect(other: DataFrame, distinct: bool = False) DataFrame¶
Calculate the intersection of two DataFrame.
The two DataFrame must have exactly the same schema.
- Parameters:
other – DataFrame to intersect with.
distinct – If True, duplicate rows are removed from the result.
- Returns:
DataFrame after intersection.
Examples
Find rows common to both DataFrames:
>>> ctx = dfn.SessionContext()
>>> df1 = ctx.from_pydict({"a": [1, 2, 3], "b": [10, 20, 30]})
>>> df2 = ctx.from_pydict({"a": [1, 4], "b": [10, 40]})
>>> df1.intersect(df2).to_pydict()
{'a': [1], 'b': [10]}
Intersect with deduplication:
>>> df1 = ctx.from_pydict({"a": [1, 1, 2], "b": [10, 10, 20]})
>>> df2 = ctx.from_pydict({"a": [1, 1], "b": [10, 10]})
>>> df1.intersect(df2, distinct=True).to_pydict()
{'a': [1], 'b': [10]}
- into_view(temporary: bool = False) datafusion.catalog.Table¶
Convert DataFrame into a Table.
Examples
>>> from datafusion import SessionContext
>>> ctx = SessionContext()
>>> df = ctx.sql("SELECT 1 AS value")
>>> view = df.into_view()
>>> ctx.register_table("values_view", view)
>>> result = ctx.sql("SELECT value FROM values_view").collect()
>>> result[0].column("value").to_pylist()
[1]
- join(right: DataFrame, on: str | collections.abc.Sequence[str], how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti'] = 'inner', *, left_on: None = None, right_on: None = None, join_keys: None = None, coalesce_duplicate_keys: bool = True) DataFrame¶
- join(right: DataFrame, on: None = None, how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti'] = 'inner', *, left_on: str | collections.abc.Sequence[str], right_on: str | collections.abc.Sequence[str], join_keys: tuple[list[str], list[str]] | None = None, coalesce_duplicate_keys: bool = True) DataFrame
- join(right: DataFrame, on: None = None, how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti'] = 'inner', *, join_keys: tuple[list[str], list[str]], left_on: None = None, right_on: None = None, coalesce_duplicate_keys: bool = True) DataFrame
Join this DataFrame with another DataFrame.
Either on must be provided, or both left_on and right_on together.
When non-key columns share the same name in both DataFrames, use DataFrame.col() on each DataFrame before the join to obtain fully qualified column references that can disambiguate them. See join_on() for an example.
- Parameters:
right – Other DataFrame to join with.
on – Column names to join on in both dataframes.
how – Type of join to perform. Supported types are “inner”, “left”, “right”, “full”, “semi”, “anti”.
left_on – Join column of the left dataframe.
right_on – Join column of the right dataframe.
coalesce_duplicate_keys – When True, coalesce the columns from the left and right DataFrames that have identical names in the on fields.
join_keys – Tuple of two lists of column names to join on. [Deprecated]
- Returns:
DataFrame after join.
- join_on(right: DataFrame, *on_exprs: datafusion.expr.Expr, how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti'] = 'inner') DataFrame¶
Join two DataFrame using the specified expressions.
Join predicates must be Expr objects, typically built with datafusion.col(). On expressions are used to support inequality predicates. Equality predicates are correctly optimized.
Use DataFrame.col() on each DataFrame before the join to obtain fully qualified column references. These qualified references can then be used in the join predicate and to disambiguate columns with the same name when selecting from the result.
Examples
Join with unique column names:
>>> ctx = dfn.SessionContext()
>>> left = ctx.from_pydict({"a": [1, 2], "x": ["a", "b"]})
>>> right = ctx.from_pydict({"b": [1, 2], "y": ["c", "d"]})
>>> left.join_on(
...     right, col("a") == col("b")
... ).sort(col("x")).to_pydict()
{'a': [1, 2], 'x': ['a', 'b'], 'b': [1, 2], 'y': ['c', 'd']}
Use col() to disambiguate shared column names:
>>> left = ctx.from_pydict({"id": [1, 2], "val": [10, 20]})
>>> right = ctx.from_pydict({"id": [1, 2], "val": [30, 40]})
>>> joined = left.join_on(
...     right, left.col("id") == right.col("id"), how="inner"
... )
>>> joined.select(
...     left.col("id"), left.col("val"), right.col("val").alias("rval")
... ).sort(left.col("id")).to_pydict()
{'id': [1, 2], 'val': [10, 20], 'rval': [30, 40]}
- Parameters:
right – Other DataFrame to join with.
on_exprs – single or multiple (in)-equality predicates.
how – Type of join to perform. Supported types are “inner”, “left”, “right”, “full”, “semi”, “anti”.
- Returns:
DataFrame after join.
- limit(count: int, offset: int = 0) DataFrame¶
Return a new DataFrame with a limited number of rows.
- Parameters:
count – Number of rows to limit the DataFrame to.
offset – Number of rows to skip.
- Returns:
DataFrame after limiting.
- logical_plan() datafusion.plan.LogicalPlan¶
Return the unoptimized LogicalPlan.
- Returns:
Unoptimized logical plan.
- optimized_logical_plan() datafusion.plan.LogicalPlan¶
Return the optimized LogicalPlan.
- Returns:
Optimized logical plan.
- parse_sql_expr(expr: str) datafusion.expr.Expr¶
Creates a logical expression from SQL query text.
The expression is created and processed against the current schema.
Examples
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> expr = df.parse_sql_expr("a > 1")
>>> df.filter(expr).to_pydict()
{'a': [2, 3]}
- Parameters:
expr – Expression string to be converted to a datafusion expression.
- Returns:
Logical expression.
- repartition(num: int) DataFrame¶
Repartition a DataFrame into num partitions.
Batches are allocated using a round-robin algorithm.
- Parameters:
num – Number of partitions to repartition the DataFrame into.
- Returns:
Repartitioned DataFrame.
- repartition_by_hash(*exprs: datafusion.expr.Expr | str, num: int) DataFrame¶
Repartition a DataFrame using a hash partitioning scheme.
- Parameters:
exprs – Expressions or a SQL expression string to evaluate and perform hashing on.
num – Number of partitions to repartition the DataFrame into.
- Returns:
Repartitioned DataFrame.
- schema() pyarrow.Schema¶
Return the pyarrow.Schema of this DataFrame.
The output schema contains information on the name, data type, and nullability for each column.
- Returns:
Schema describing the DataFrame.
- select(*exprs: datafusion.expr.Expr | str) DataFrame¶
Project arbitrary expressions into a new DataFrame.
- Parameters:
exprs – Either column names or Expr to select.
- Returns:
DataFrame after projection. It has one column for each expression.
Example usage:
The following example will return 3 columns from the original dataframe. The first two columns will be the original columns a and b, since the string "a" is assumed to refer to column selection. A duplicate of column a will also be returned with the column name alternate_a:
df = df.select("a", col("b"), col("a").alias("alternate_a"))
- select_exprs(*args: str) DataFrame¶
Project arbitrary list of expression strings into a new DataFrame.
This method will parse string expressions into logical plan expressions. The output DataFrame has one column for each expression.
- Returns:
DataFrame with one column per expression.
- show(num: int = 20) None¶
Execute the DataFrame and print the result to the console.
- Parameters:
num – Number of lines to show.
- sort(*exprs: datafusion.expr.SortKey) DataFrame¶
Sort the DataFrame by the specified sorting expressions or column names.
Note that any expression can be turned into a sort expression by calling its sort method.
exprs – Sort expressions or column names, applied in order.
- Returns:
DataFrame after sorting.
- sort_by(*exprs: datafusion.expr.Expr | str) DataFrame¶
Sort the DataFrame by column expressions in ascending order.
This is a convenience method that sorts the DataFrame by the given expressions in ascending order with nulls last. For more control over sort direction and null ordering, use sort() instead.
- Parameters:
exprs – Expressions or column names to sort by.
- Returns:
DataFrame after sorting.
Examples
Sort by a single column:
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [3, 1, 2]})
>>> df.sort_by("a").to_pydict()
{'a': [1, 2, 3]}
- tail(n: int = 5) DataFrame¶
Return a new DataFrame with a limited number of rows.
Be aware this could be potentially expensive, since the number of rows in the dataframe needs to be determined first. This is done by collecting it.
- Parameters:
n – Number of rows to take from the tail of the DataFrame.
- Returns:
DataFrame after limiting.
- to_arrow_table() pyarrow.Table¶
Execute the DataFrame and convert it into an Arrow Table.
- Returns:
Arrow Table.
- to_pandas() pandas.DataFrame¶
Execute the DataFrame and convert it into a Pandas DataFrame.
- Returns:
Pandas DataFrame.
- to_polars() polars.DataFrame¶
Execute the DataFrame and convert it into a Polars DataFrame.
- Returns:
Polars DataFrame.
- to_pydict() dict[str, list[Any]]¶
Execute the DataFrame and convert it into a dictionary of lists.
- Returns:
Dictionary of lists.
- to_pylist() list[dict[str, Any]]¶
Execute the DataFrame and convert it into a list of dictionaries.
- Returns:
List of dictionaries.
- transform(func: collections.abc.Callable[Ellipsis, DataFrame], *args: Any) DataFrame¶
Apply a function to the current DataFrame which returns another DataFrame.
This is useful for chaining together multiple functions.
Examples
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> def add_3(df):
...     return df.with_column("modified", dfn.lit(3))
>>> def within_limit(df: DataFrame, limit: int) -> DataFrame:
...     return df.filter(col("a") < lit(limit)).distinct()
>>> df.transform(add_3).transform(within_limit, 4).sort("a").to_pydict()
{'a': [1, 2, 3], 'modified': [3, 3, 3]}
- Parameters:
func – A callable function that takes a DataFrame as its first argument.
args – Zero or more arguments to pass to func
- Returns:
DataFrame after applying func to the original dataframe.
- Return type:
DataFrame
- union(other: DataFrame, distinct: bool = False) DataFrame¶
Calculate the union of two DataFrame.
The two DataFrame must have exactly the same schema.
- Parameters:
other – DataFrame to union with.
distinct – If True, duplicate rows will be removed.
- Returns:
DataFrame after union.
- union_by_name(other: DataFrame, distinct: bool = False) DataFrame¶
Union two DataFrame matching columns by name.
Unlike union(), which matches columns by position, this method matches columns by their names, allowing DataFrames with different column orders to be combined.
- Parameters:
other – DataFrame to union with.
distinct – If True, duplicate rows are removed from the result.
- Returns:
DataFrame after union by name.
Examples
Combine DataFrames with different column orders:
>>> ctx = dfn.SessionContext()
>>> df1 = ctx.from_pydict({"a": [1], "b": [10]})
>>> df2 = ctx.from_pydict({"b": [20], "a": [2]})
>>> df1.union_by_name(df2).sort("a").to_pydict()
{'a': [1, 2], 'b': [10, 20]}
Union by name with deduplication:
>>> df1 = ctx.from_pydict({"a": [1, 1], "b": [10, 10]})
>>> df2 = ctx.from_pydict({"b": [10], "a": [1]})
>>> df1.union_by_name(df2, distinct=True).to_pydict()
{'a': [1], 'b': [10]}
- unnest_columns(*columns: str, preserve_nulls: bool = True, recursions: list[tuple[str, str, int]] | None = None) DataFrame¶
Expand columns of arrays into a single row per array element.
- Parameters:
columns – Column names to perform unnest operation on.
preserve_nulls – If False, rows with null entries will not be returned.
recursions – Optional list of (input_column, output_column, depth) tuples that control how deeply nested columns are unnested. Any column not mentioned here is unnested with depth 1.
- Returns:
A DataFrame with the columns expanded.
Examples
Unnest an array column:
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [[1, 2], [3]], "b": ["x", "y"]})
>>> df.unnest_columns("a").to_pydict()
{'a': [1, 2, 3], 'b': ['x', 'x', 'y']}
With explicit recursion depth:
>>> df.unnest_columns("a", recursions=[("a", "a", 1)]).to_pydict()
{'a': [1, 2, 3], 'b': ['x', 'x', 'y']}
- window(*exprs: datafusion.expr.Expr) DataFrame¶
Add window function columns to the DataFrame.
Applies the given window function expressions and appends the results as new columns.
- Parameters:
exprs – Window function expressions to evaluate.
- Returns:
DataFrame with new window function columns appended.
Examples
Add a row number within each group:
>>> import datafusion.functions as f
>>> from datafusion import col
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "x", "y"]})
>>> df = df.window(
...     f.row_number(
...         partition_by=[col("b")], order_by=[col("a")]
...     ).alias("rn")
... )
>>> "rn" in df.schema().names
True
- with_column(name: str, expr: datafusion.expr.Expr | str) DataFrame¶
Add an additional column to the DataFrame.
The expr must be an Expr constructed with datafusion.col() or datafusion.lit(), or a SQL expression string that will be parsed against the DataFrame schema.
Examples
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2]})
>>> df.with_column("b", col("a") + lit(10)).to_pydict()
{'a': [1, 2], 'b': [11, 12]}
- Parameters:
name – Name of the column to add.
expr – Expression to compute the column.
- Returns:
DataFrame with the new column.
- with_column_renamed(old_name: str, new_name: str) DataFrame¶
Rename one column by applying a new projection.
This is a no-op if the column to be renamed does not exist.
The method supports case-sensitive renaming by wrapping the column name in one of the following symbols: double quotes ("), single quotes ('), or backticks (`).
- Parameters:
old_name – Old column name.
new_name – New column name.
- Returns:
DataFrame with the column renamed.
- with_columns(*exprs: datafusion.expr.Expr | str | collections.abc.Iterable[datafusion.expr.Expr | str], **named_exprs: datafusion.expr.Expr | str) DataFrame¶
Add columns to the DataFrame.
Columns may be passed as expressions, iterables of expressions, SQL expression strings, or named expressions. All expressions must be Expr objects created via datafusion.col() or datafusion.lit(), or SQL expression strings. To pass named expressions, use the form name=Expr.
Example usage: The following will add 4 columns labeled a, b, c, and d:
from datafusion import col, lit
df = df.with_columns(
    col("x").alias("a"),
    [lit(1).alias("b"), col("y").alias("c")],
    d=lit(3),
)
Equivalent example using just SQL strings:
df = df.with_columns(
    "x as a",
    ["1 as b", "y as c"],
    d="3",
)
- Parameters:
exprs – Either a single expression, an iterable of expressions to add or SQL expression strings.
named_exprs – Named expressions in the form of
name=expr
- Returns:
DataFrame with the new columns added.
- write_csv(path: str | pathlib.Path, with_header: bool = False, write_options: DataFrameWriteOptions | None = None) None¶
Execute the DataFrame and write the results to a CSV file.
- Parameters:
path – Path of the CSV file to write.
with_header – If true, output the CSV header row.
write_options – Options that impact how the DataFrame is written.
- write_json(path: str | pathlib.Path, write_options: DataFrameWriteOptions | None = None) None¶
Execute the DataFrame and write the results to a JSON file.
- Parameters:
path – Path of the JSON file to write.
write_options – Options that impact how the DataFrame is written.
- write_parquet(path: str | pathlib.Path, compression: str, compression_level: int | None = None, write_options: DataFrameWriteOptions | None = None) None¶
- write_parquet(path: str | pathlib.Path, compression: Compression = Compression.ZSTD, compression_level: int | None = None, write_options: DataFrameWriteOptions | None = None) None
- write_parquet(path: str | pathlib.Path, compression: ParquetWriterOptions, compression_level: None = None, write_options: DataFrameWriteOptions | None = None) None
Execute the DataFrame and write the results to a Parquet file.
Available compression types are:
“uncompressed”: No compression.
“snappy”: Snappy compression.
“gzip”: Gzip compression.
“brotli”: Brotli compression.
“lz4”: LZ4 compression.
“lz4_raw”: LZ4_RAW compression.
“zstd”: Zstandard compression.
LZO compression is not yet implemented in arrow-rs and is therefore excluded.
- Parameters:
path – Path of the Parquet file to write.
compression – Compression type to use. Default is Compression.ZSTD.
compression_level – Compression level to use. For ZSTD, the recommended range is 1 to 22, with the default being 4. Higher levels provide better compression but slower speed.
write_options – Options that impact how the DataFrame is written.
- write_parquet_with_options(path: str | pathlib.Path, options: ParquetWriterOptions, write_options: DataFrameWriteOptions | None = None) None¶
Execute the DataFrame and write the results to a Parquet file.
Allows advanced writer options to be set with ParquetWriterOptions.
- Parameters:
path – Path of the Parquet file to write.
options – Sets the writer parquet options (see ParquetWriterOptions).
write_options – Options that impact how the DataFrame is written.
- write_table(table_name: str, write_options: DataFrameWriteOptions | None = None) None¶
Execute the DataFrame and write the results to a table.
The table must be registered with the session to perform this operation. Not all table providers support writing operations. See the individual implementations for details.
- df¶
- class datafusion.dataframe.DataFrameWriteOptions(insert_operation: InsertOp | None = None, single_file_output: bool = False, partition_by: str | collections.abc.Sequence[str] | None = None, sort_by: datafusion.expr.Expr | datafusion.expr.SortExpr | collections.abc.Sequence[datafusion.expr.Expr] | collections.abc.Sequence[datafusion.expr.SortExpr] | None = None)¶
Writer options for DataFrame.
There is no guarantee the table provider supports all writer options. See the individual implementation and documentation for details.
Instantiate writer options for DataFrame.
- _raw_write_options¶
- class datafusion.dataframe.ExplainFormat¶
Bases: enum.Enum
Output format for explain plans.
Controls how the query plan is rendered in DataFrame.explain().
- GRAPHVIZ = 'graphviz'¶
Graphviz DOT format for graph rendering.
- INDENT = 'indent'¶
Default indented text format.
- PGJSON = 'pgjson'¶
PostgreSQL-compatible JSON format for use with visualization tools.
- TREE = 'tree'¶
Tree-style visual format with box-drawing characters.
- class datafusion.dataframe.InsertOp¶
Bases: enum.Enum
Insert operation mode.
These modes are used by the table writing feature to define how record batches should be written to a table.
- APPEND¶
Appends new rows to the existing table without modifying any existing rows.
- OVERWRITE¶
Overwrites all existing rows in the table with the new rows.
- REPLACE¶
Replaces existing rows that collide with the inserted rows.
Replacement is typically based on a unique key or primary key.
- class datafusion.dataframe.ParquetColumnOptions(encoding: str | None = None, dictionary_enabled: bool | None = None, compression: str | None = None, statistics_enabled: str | None = None, bloom_filter_enabled: bool | None = None, bloom_filter_fpp: float | None = None, bloom_filter_ndv: int | None = None)¶
Parquet options for individual columns.
Contains the available options that can be applied to an individual Parquet column, replacing the global options in ParquetWriterOptions.
Initialize the ParquetColumnOptions.
- Parameters:
encoding – Sets encoding for the column path. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case-sensitive. If None, uses the default parquet options.
dictionary_enabled – Sets if dictionary encoding is enabled for the column path. If None, uses the default parquet options.
compression – Sets default parquet compression codec for the column path. Valid values are uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case-sensitive. If None, uses the default parquet options.
statistics_enabled – Sets if statistics are enabled for the column. Valid values are: none, chunk, and page. These values are not case-sensitive. If None, uses the default parquet options.
bloom_filter_enabled – Sets if bloom filter is enabled for the column path. If None, uses the default parquet options.
bloom_filter_fpp – Sets bloom filter false positive probability for the column path. If None, uses the default parquet options.
bloom_filter_ndv – Sets bloom filter number of distinct values. If None, uses the default parquet options.
- bloom_filter_enabled = None¶
- bloom_filter_fpp = None¶
- bloom_filter_ndv = None¶
- compression = None¶
- dictionary_enabled = None¶
- encoding = None¶
- statistics_enabled = None¶
- class datafusion.dataframe.ParquetWriterOptions(data_pagesize_limit: int = 1024 * 1024, write_batch_size: int = 1024, writer_version: str = '1.0', skip_arrow_metadata: bool = False, compression: str | None = 'zstd(3)', compression_level: int | None = None, dictionary_enabled: bool | None = True, dictionary_page_size_limit: int = 1024 * 1024, statistics_enabled: str | None = 'page', max_row_group_size: int = 1024 * 1024, created_by: str = 'datafusion-python', column_index_truncate_length: int | None = 64, statistics_truncate_length: int | None = None, data_page_row_count_limit: int = 20000, encoding: str | None = None, bloom_filter_on_write: bool = False, bloom_filter_fpp: float | None = None, bloom_filter_ndv: int | None = None, allow_single_file_parallelism: bool = True, maximum_parallel_row_group_writers: int = 1, maximum_buffered_record_batches_per_stream: int = 2, column_specific_options: dict[str, ParquetColumnOptions] | None = None)¶
Advanced parquet writer options.
Allows setting the writer options that apply to the entire file. Some options can also be set on a column-by-column basis, with the field column_specific_options (see ParquetColumnOptions).
Initialize the ParquetWriterOptions.
- Parameters:
data_pagesize_limit – Sets best effort maximum size of data page in bytes.
write_batch_size – Sets write_batch_size in bytes.
writer_version – Sets parquet writer version. Valid values are 1.0 and 2.0.
skip_arrow_metadata – Skip encoding the embedded arrow metadata in the KV_meta.
compression – Compression type to use. Default is zstd(3). Available compression types are:
uncompressed: No compression.
snappy: Snappy compression.
gzip(n): Gzip compression with level n.
brotli(n): Brotli compression with level n.
lz4: LZ4 compression.
lz4_raw: LZ4_RAW compression.
zstd(n): Zstandard compression with level n.
compression_level – Compression level to set.
dictionary_enabled – Sets if dictionary encoding is enabled. If None, uses the default parquet writer setting.
dictionary_page_size_limit – Sets best effort maximum dictionary page size, in bytes.
statistics_enabled – Sets if statistics are enabled for any column. Valid values are none, chunk, and page. If None, uses the default parquet writer setting.
max_row_group_size – Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can yield better compression and faster reads.
created_by – Sets “created by” property.
column_index_truncate_length – Sets column index truncate length.
statistics_truncate_length – Sets statistics truncate length. If None, uses the default parquet writer setting.
data_page_row_count_limit – Sets best effort maximum number of rows in a data page.
encoding – Sets default encoding for any column. Valid values are plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. If None, uses the default parquet writer setting.
bloom_filter_on_write – Write bloom filters for all columns when creating parquet files.
bloom_filter_fpp – Sets bloom filter false positive probability. If None, uses the default parquet writer setting.
bloom_filter_ndv – Sets bloom filter number of distinct values. If None, uses the default parquet writer setting.
allow_single_file_parallelism – Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file is serialized in parallel, leveraging a maximum possible core count of n_files * n_row_groups * n_columns.
maximum_parallel_row_group_writers – By default the parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
maximum_buffered_record_batches_per_stream – See maximum_parallel_row_group_writers.
column_specific_options – Overrides options for specific columns. If a column is not part of this dictionary, it will use the global options provided here.
- allow_single_file_parallelism = True¶
- bloom_filter_fpp = None¶
- bloom_filter_ndv = None¶
- bloom_filter_on_write = False¶
- column_index_truncate_length = 64¶
- column_specific_options = None¶
- created_by = 'datafusion-python'¶
- data_page_row_count_limit = 20000¶
- data_pagesize_limit = 1048576¶
- dictionary_enabled = True¶
- dictionary_page_size_limit = 1048576¶
- encoding = None¶
- max_row_group_size = 1048576¶
- maximum_buffered_record_batches_per_stream = 2¶
- maximum_parallel_row_group_writers = 1¶
- skip_arrow_metadata = False¶
- statistics_enabled = 'page'¶
- statistics_truncate_length = None¶
- write_batch_size = 1024¶
- writer_version = '1.0'¶