datafusion.dataframe

DataFrame is one of the core concepts in DataFusion.

See Concepts in the online documentation for more information.

Classes

Compression

Enum representing the available compression types for Parquet files.

DataFrame

Two dimensional table representation of data.

ParquetColumnOptions

Parquet options for individual columns.

ParquetWriterOptions

Advanced parquet writer options.

Module Contents

class datafusion.dataframe.Compression(*args, **kwds)

Bases: enum.Enum

Enum representing the available compression types for Parquet files.

classmethod from_str(value: str) Compression

Convert a string to a Compression enum value.

Parameters:

value – The string representation of the compression type.

Returns:

The corresponding Compression enum value.

Raises:

ValueError – If the string does not match any Compression enum value.

get_default_level() int | None

Get the default compression level for the compression type.

Returns:

The default compression level for the compression type.

BROTLI = 'brotli'
GZIP = 'gzip'
LZ4 = 'lz4'
LZ4_RAW = 'lz4_raw'
SNAPPY = 'snappy'
UNCOMPRESSED = 'uncompressed'
ZSTD = 'zstd'
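
Example usage (a minimal sketch based on the methods documented above):

from datafusion.dataframe import Compression

codec = Compression.from_str("zstd")  # Compression.ZSTD
level = codec.get_default_level()     # default level for the codec, or None if it has no levels
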
class datafusion.dataframe.DataFrame(df: datafusion._internal.DataFrame)

Two dimensional table representation of data.

See Concepts in the online documentation for more information.

This constructor is not to be used by the end user.

See SessionContext for methods to create a DataFrame.

__arrow_c_stream__(requested_schema: object | None = None) object

Export an Arrow PyCapsule Stream.

This will execute and collect the DataFrame. We will attempt to respect the requested schema, but only trivial transformations will be applied such as only returning the fields listed in the requested schema if their data types match those in the DataFrame.

Parameters:

requested_schema – Attempt to provide the DataFrame using this schema.

Returns:

Arrow PyCapsule object.

__getitem__(key: str | list[str]) DataFrame

Return a new DataFrame with the specified column or columns.

Parameters:

key – Column name or list of column names to select.

Returns:

DataFrame with the specified column or columns.
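
Example usage (assuming a DataFrame df with hypothetical columns a and b):

df_a = df["a"]           # single column
df_ab = df[["a", "b"]]   # multiple columns via a list of names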

__repr__() str

Return a string representation of the DataFrame.

Returns:

String representation of the DataFrame.

_repr_html_() str
aggregate(group_by: list[datafusion.expr.Expr] | datafusion.expr.Expr, aggs: list[datafusion.expr.Expr] | datafusion.expr.Expr) DataFrame

Aggregates the rows of the current DataFrame.

Parameters:
  • group_by – List of expressions to group by.

  • aggs – List of expressions to aggregate.

Returns:

DataFrame after aggregation.
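
Example usage (a short sketch, assuming a DataFrame df with hypothetical columns color and amount, and the sum aggregate from datafusion.functions):

from datafusion import col, functions as f

df = df.aggregate([col("color")], [f.sum(col("amount")).alias("total")])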

cache() DataFrame

Cache the DataFrame as a memory table.

Returns:

Cached DataFrame.

cast(mapping: dict[str, pyarrow.DataType[Any]]) DataFrame

Cast one or more columns to a different data type.

Parameters:

mapping – Mapping with column name as key and target data type as value.

Returns:

DataFrame after casting columns
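
Example usage (assuming a DataFrame df with hypothetical columns a and b):

import pyarrow as pa

df = df.cast({"a": pa.float64(), "b": pa.string()})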

collect() list[pyarrow.RecordBatch]

Execute this DataFrame and collect results into memory.

Prior to calling collect, modifying a DataFrame simply updates a plan (no actual computation is performed). Calling collect triggers the computation.

Returns:

List of pyarrow.RecordBatch collected from the DataFrame.
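
Example usage (a minimal sketch; the column data is hypothetical):

from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3]})
batches = df.collect()                                 # list of pyarrow.RecordBatch
total_rows = sum(batch.num_rows for batch in batches)  # 3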

collect_partitioned() list[list[pyarrow.RecordBatch]]

Execute this DataFrame and collect all partitioned results.

This operation returns pyarrow.RecordBatch maintaining the input partitioning.

Returns:

List of lists of RecordBatch collected from the DataFrame.

count() int

Return the total number of rows in this DataFrame.

Note that this method will actually run a plan to calculate the count, which may be slow for large or complicated DataFrames.

Returns:

Number of rows in the DataFrame.

static default_str_repr(batches: list[pyarrow.RecordBatch], schema: pyarrow.Schema, has_more: bool, table_uuid: str | None = None) str

Return the default string representation of a DataFrame.

This method is used by the default formatter and implemented in Rust for performance reasons.

describe() DataFrame

Return the statistics for this DataFrame.

Currently, only numeric datatypes are summarized; nulls are returned for non-numeric datatypes.

The output format is modeled after pandas.

Returns:

A summary DataFrame containing statistics.

distinct() DataFrame

Return a new DataFrame with all duplicated rows removed.

Returns:

DataFrame after removing duplicates.

drop(*columns: str) DataFrame

Drop an arbitrary number of columns.

Parameters:

columns – Column names to drop from the dataframe.

Returns:

DataFrame with those columns removed in the projection.

except_all(other: DataFrame) DataFrame

Calculate the exception (set difference) of two DataFrames.

The two DataFrames must have exactly the same schema.

Parameters:

other – DataFrame to calculate exception with.

Returns:

DataFrame after exception.

execute_stream() datafusion.record_batch.RecordBatchStream

Executes this DataFrame and returns a stream over a single partition.

Returns:

Record Batch Stream over a single partition.
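
Example usage (a sketch; iterating the stream and converting each batch with to_pyarrow() rely on datafusion.record_batch.RecordBatchStream and RecordBatch):

stream = df.execute_stream()
for batch in stream:
    print(batch.to_pyarrow().num_rows)  # process each incoming batch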

execute_stream_partitioned() list[datafusion.record_batch.RecordBatchStream]

Executes this DataFrame and returns a stream for each partition.

Returns:

One record batch stream per partition.

execution_plan() datafusion.plan.ExecutionPlan

Return the execution/physical plan.

Returns:

Execution plan.

explain(verbose: bool = False, analyze: bool = False) None

Print an explanation of the DataFrame’s plan so far.

If analyze is specified, runs the plan and reports metrics.

Parameters:
  • verbose – If True, more details will be included.

  • analyze – If True, the plan will run and metrics will be reported.

fill_null(value: Any, subset: list[str] | None = None) DataFrame

Fill null values in specified columns with a value.

Parameters:
  • value – Value to replace nulls with. Will be cast to match column type.

  • subset – Optional list of column names to fill. If None, fills all columns.

Returns:

DataFrame with null values replaced where type casting is possible

Examples

>>> df = df.fill_null(0)  # Fill all nulls with 0 where possible
>>> # Fill nulls in specific string columns
>>> df = df.fill_null("missing", subset=["name", "category"])

Notes

  • Only fills nulls in columns where the value can be cast to the column type

  • For columns where casting fails, the original column is kept unchanged

  • For columns not in subset, the original column is kept unchanged

filter(*predicates: datafusion.expr.Expr) DataFrame

Return a DataFrame for which predicate evaluates to True.

Rows for which predicate evaluates to False or None are filtered out. If more than one predicate is provided, these predicates will be combined as a logical AND. If more complex logic is required, see the logical operations in functions.

Parameters:

predicates – Predicate expression(s) to filter the DataFrame.

Returns:

DataFrame after filtering.
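
Example usage (assuming a DataFrame df with hypothetical columns a and b):

from datafusion import col, lit

df = df.filter(col("a") > lit(10), col("b") != lit("x"))  # predicates are combined with AND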

head(n: int = 5) DataFrame

Return a new DataFrame with a limited number of rows.

Parameters:

n – Number of rows to take from the head of the DataFrame.

Returns:

DataFrame after limiting.

intersect(other: DataFrame) DataFrame

Calculate the intersection of two DataFrames.

The two DataFrames must have exactly the same schema.

Parameters:

other – DataFrame to intersect with.

Returns:

DataFrame after intersection.

into_view() pyarrow.Table

Convert the DataFrame into a ViewTable that can be used in register_table.

join(right: DataFrame, on: str | Sequence[str], how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti'] = 'inner', *, left_on: None = None, right_on: None = None, join_keys: None = None) DataFrame
join(right: DataFrame, on: None = None, how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti'] = 'inner', *, left_on: str | Sequence[str], right_on: str | Sequence[str], join_keys: tuple[list[str], list[str]] | None = None) DataFrame
join(right: DataFrame, on: None = None, how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti'] = 'inner', *, join_keys: tuple[list[str], list[str]], left_on: None = None, right_on: None = None) DataFrame

Join this DataFrame with another DataFrame.

Either on must be provided, or both left_on and right_on must be provided together.

Parameters:
  • right – Other DataFrame to join with.

  • on – Column names to join on in both dataframes.

  • how – Type of join to perform. Supported types are “inner”, “left”, “right”, “full”, “semi”, “anti”.

  • left_on – Join column of the left dataframe.

  • right_on – Join column of the right dataframe.

  • join_keys – Tuple of two lists of column names to join on. [Deprecated]

Returns:

DataFrame after join.
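
Example usage (a sketch, assuming DataFrames customers and orders with hypothetical key columns id and customer_id):

joined = customers.join(orders, left_on="id", right_on="customer_id", how="left")

# When both sides share the key column name, on can be used instead:
# joined = customers.join(orders, on="id", how="inner")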

join_on(right: DataFrame, *on_exprs: datafusion.expr.Expr, how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti'] = 'inner') DataFrame

Join two DataFrame using the specified expressions.

On expressions are used to support inequality predicates. Equality predicates are correctly optimized.

Parameters:
  • right – Other DataFrame to join with.

  • on_exprs – single or multiple (in)-equality predicates.

  • how – Type of join to perform. Supported types are “inner”, “left”, “right”, “full”, “semi”, “anti”.

Returns:

DataFrame after join.
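
Example usage (a sketch, assuming DataFrames orders and shipments whose hypothetical column names are unambiguous across both sides):

from datafusion import col

joined = orders.join_on(
    shipments,
    col("order_id") == col("shipment_order_id"),  # equality predicate
    col("ship_date") >= col("order_date"),        # inequality predicate
    how="inner",
)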

limit(count: int, offset: int = 0) DataFrame

Return a new DataFrame with a limited number of rows.

Parameters:
  • count – Number of rows to limit the DataFrame to.

  • offset – Number of rows to skip.

Returns:

DataFrame after limiting.

logical_plan() datafusion.plan.LogicalPlan

Return the unoptimized LogicalPlan.

Returns:

Unoptimized logical plan.

optimized_logical_plan() datafusion.plan.LogicalPlan

Return the optimized LogicalPlan.

Returns:

Optimized logical plan.

repartition(num: int) DataFrame

Repartition a DataFrame into num partitions.

Batches are allocated using a round-robin algorithm.

Parameters:

num – Number of partitions to repartition the DataFrame into.

Returns:

Repartitioned DataFrame.

repartition_by_hash(*exprs: datafusion.expr.Expr, num: int) DataFrame

Repartition a DataFrame using a hash partitioning scheme.

Parameters:
  • exprs – Expressions to evaluate and perform hashing on.

  • num – Number of partitions to repartition the DataFrame into.

Returns:

Repartitioned DataFrame.
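
Example usage (assuming a DataFrame df with a hypothetical column user_id):

from datafusion import col

df = df.repartition_by_hash(col("user_id"), num=8)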

schema() pyarrow.Schema

Return the pyarrow.Schema of this DataFrame.

The output schema contains information on the name, data type, and nullability for each column.

Returns:

Schema describing the columns of the DataFrame.

select(*exprs: datafusion.expr.Expr | str) DataFrame

Project arbitrary expressions into a new DataFrame.

Parameters:

exprs – Either column names or Expr to select.

Returns:

DataFrame after projection. It has one column for each expression.

Example usage:

The following example returns 3 columns from the original DataFrame. The first two are the original columns a and b, since the string “a” is assumed to refer to column selection. A duplicate of column a is also returned under the name alternate_a:

df = df.select("a", col("b"), col("a").alias("alternate_a"))

select_columns(*args: str) DataFrame

Filter the DataFrame by columns.

Returns:

DataFrame only containing the specified columns.

show(num: int = 20) None

Execute the DataFrame and print the result to the console.

Parameters:

num – Number of rows to show.

sort(*exprs: datafusion.expr.Expr | datafusion.expr.SortExpr) DataFrame

Sort the DataFrame by the specified sorting expressions.

Note that any expression can be turned into a sort expression by calling its sort method.

Parameters:

exprs – Sort expressions, applied in order.

Returns:

DataFrame after sorting.
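
Example usage (assuming a DataFrame df with hypothetical columns a and b):

from datafusion import col

df = df.sort(col("a").sort(ascending=False), col("b"))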

tail(n: int = 5) DataFrame

Return a new DataFrame with a limited number of rows.

Be aware this could be potentially expensive since the number of rows in the DataFrame needs to be determined, which is done by collecting it.

Parameters:

n – Number of rows to take from the tail of the DataFrame.

Returns:

DataFrame after limiting.

to_arrow_table() pyarrow.Table

Execute the DataFrame and convert it into an Arrow Table.

Returns:

Arrow Table.

to_pandas() pandas.DataFrame

Execute the DataFrame and convert it into a Pandas DataFrame.

Returns:

Pandas DataFrame.

to_polars() polars.DataFrame

Execute the DataFrame and convert it into a Polars DataFrame.

Returns:

Polars DataFrame.

to_pydict() dict[str, list[Any]]

Execute the DataFrame and convert it into a dictionary of lists.

Returns:

Dictionary of lists.

to_pylist() list[dict[str, Any]]

Execute the DataFrame and convert it into a list of dictionaries.

Returns:

List of dictionaries.

transform(func: Callable[Ellipsis, DataFrame], *args: Any) DataFrame

Apply a function to the current DataFrame which returns another DataFrame.

This is useful for chaining together multiple functions. For example:

def add_3(df: DataFrame) -> DataFrame:
    return df.with_column("modified", lit(3))

def within_limit(df: DataFrame, limit: int) -> DataFrame:
    return df.filter(col("a") < lit(limit)).distinct()

df = df.transform(add_3).transform(within_limit, 4)

Parameters:
  • func – A callable function that takes a DataFrame as its first argument

  • args – Zero or more arguments to pass to func

Returns:

The result of applying func to the original DataFrame.

Return type:

DataFrame

union(other: DataFrame, distinct: bool = False) DataFrame

Calculate the union of two DataFrames.

The two DataFrames must have exactly the same schema.

Parameters:
  • other – DataFrame to union with.

  • distinct – If True, duplicate rows will be removed.

Returns:

DataFrame after union.

union_distinct(other: DataFrame) DataFrame

Calculate the distinct union of two DataFrames.

The two DataFrames must have exactly the same schema. Any duplicate rows are discarded.

Parameters:

other – DataFrame to union with.

Returns:

DataFrame after union.

unnest_columns(*columns: str, preserve_nulls: bool = True) DataFrame

Expand columns of arrays into a single row per array element.

Parameters:
  • columns – Column names to perform unnest operation on.

  • preserve_nulls – If False, rows with null entries will not be returned.

Returns:

A DataFrame with the columns expanded.
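
Example usage (assuming a DataFrame df with a hypothetical list-typed column tags):

df = df.unnest_columns("tags")  # one output row per element of each tags array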

with_column(name: str, expr: datafusion.expr.Expr) DataFrame

Add an additional column to the DataFrame.

Parameters:
  • name – Name of the column to add.

  • expr – Expression to compute the column.

Returns:

DataFrame with the new column.
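
Example usage (assuming a DataFrame df with a hypothetical column price):

from datafusion import col, lit

df = df.with_column("price_with_tax", col("price") * lit(1.2))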

with_column_renamed(old_name: str, new_name: str) DataFrame

Rename one column by applying a new projection.

This is a no-op if the column to be renamed does not exist.

The method supports a case-sensitive rename by wrapping the column name in one of the following symbols (" or ' or `).

Parameters:
  • old_name – Old column name.

  • new_name – New column name.

Returns:

DataFrame with the column renamed.

with_columns(*exprs: datafusion.expr.Expr | Iterable[datafusion.expr.Expr], **named_exprs: datafusion.expr.Expr) DataFrame

Add columns to the DataFrame.

Columns can be passed as expressions, iterables of expressions, or named expressions. To pass named expressions, use the form name=Expr.

Example usage: The following will add 4 columns labeled a, b, c, and d:

df = df.with_columns(
    lit(0).alias('a'),
    [lit(1).alias('b'), lit(2).alias('c')],
    d=lit(3)
    )

Parameters:
  • exprs – Either a single expression or an iterable of expressions to add.

  • named_exprs – Named expressions in the form of name=expr

Returns:

DataFrame with the new columns added.

write_csv(path: str | pathlib.Path, with_header: bool = False) None

Execute the DataFrame and write the results to a CSV file.

Parameters:
  • path – Path of the CSV file to write.

  • with_header – If true, output the CSV header row.

write_json(path: str | pathlib.Path) None

Execute the DataFrame and write the results to a JSON file.

Parameters:

path – Path of the JSON file to write.

write_parquet(path: str | pathlib.Path, compression: str, compression_level: int | None = None) None
write_parquet(path: str | pathlib.Path, compression: Compression = Compression.ZSTD, compression_level: int | None = None) None
write_parquet(path: str | pathlib.Path, compression: ParquetWriterOptions, compression_level: None = None) None

Execute the DataFrame and write the results to a Parquet file.

Parameters:
  • path – Path of the Parquet file to write.

  • compression – Compression type to use. Default is “ZSTD”. Available compression types are:
      - “uncompressed”: No compression.
      - “snappy”: Snappy compression.
      - “gzip”: Gzip compression.
      - “brotli”: Brotli compression.
      - “lz4”: LZ4 compression.
      - “lz4_raw”: LZ4_RAW compression.
      - “zstd”: Zstandard compression.

    Note: LZO is not yet implemented in arrow-rs and is therefore excluded.

  • compression_level – Compression level to use. For ZSTD, the recommended range is 1 to 22, with the default being 4. Higher levels provide better compression but slower speed.
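
Example usage (a minimal sketch; the output path is hypothetical):

df.write_parquet("data/output.parquet", compression="zstd", compression_level=5)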

write_parquet_with_options(path: str | pathlib.Path, options: ParquetWriterOptions) None

Execute the DataFrame and write the results to a Parquet file.

Allows advanced writer options to be set with ParquetWriterOptions.

Parameters:
  • path – Path of the Parquet file to write.

  • options – Sets the writer parquet options (see ParquetWriterOptions).
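
Example usage (a sketch combining global and per-column settings; the column name id and the output path are hypothetical):

from datafusion.dataframe import ParquetColumnOptions, ParquetWriterOptions

options = ParquetWriterOptions(
    compression="zstd(5)",
    max_row_group_size=512 * 1024,
    column_specific_options={
        "id": ParquetColumnOptions(bloom_filter_enabled=True, statistics_enabled="page"),
    },
)
df.write_parquet_with_options("data/output.parquet", options)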

class datafusion.dataframe.ParquetColumnOptions(encoding: str | None = None, dictionary_enabled: bool | None = None, compression: str | None = None, statistics_enabled: str | None = None, bloom_filter_enabled: bool | None = None, bloom_filter_fpp: float | None = None, bloom_filter_ndv: int | None = None)

Parquet options for individual columns.

Contains the available options that can be applied for an individual Parquet column, replacing the global options in ParquetWriterOptions.

encoding

Sets encoding for the column path. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case-sensitive. If None, uses the default parquet options

dictionary_enabled

Sets if dictionary encoding is enabled for the column path. If None, uses the default parquet options

compression

Sets default parquet compression codec for the column path. Valid values are uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case-sensitive. If None, uses the default parquet options.

statistics_enabled

Sets if statistics are enabled for the column. Valid values are: none, chunk, and page. These values are not case-sensitive. If None, uses the default parquet options.

bloom_filter_enabled

Sets if bloom filter is enabled for the column path. If None, uses the default parquet options.

bloom_filter_fpp

Sets bloom filter false positive probability for the column path. If None, uses the default parquet options.

bloom_filter_ndv

Sets bloom filter number of distinct values. If None, uses the default parquet options.

Initialize the ParquetColumnOptions.

bloom_filter_enabled = None
bloom_filter_fpp = None
bloom_filter_ndv = None
compression = None
dictionary_enabled = None
encoding = None
statistics_enabled = None
class datafusion.dataframe.ParquetWriterOptions(data_pagesize_limit: int = 1024 * 1024, write_batch_size: int = 1024, writer_version: str = '1.0', skip_arrow_metadata: bool = False, compression: str | None = 'zstd(3)', compression_level: int | None = None, dictionary_enabled: bool | None = True, dictionary_page_size_limit: int = 1024 * 1024, statistics_enabled: str | None = 'page', max_row_group_size: int = 1024 * 1024, created_by: str = 'datafusion-python', column_index_truncate_length: int | None = 64, statistics_truncate_length: int | None = None, data_page_row_count_limit: int = 20000, encoding: str | None = None, bloom_filter_on_write: bool = False, bloom_filter_fpp: float | None = None, bloom_filter_ndv: int | None = None, allow_single_file_parallelism: bool = True, maximum_parallel_row_group_writers: int = 1, maximum_buffered_record_batches_per_stream: int = 2, column_specific_options: dict[str, ParquetColumnOptions] | None = None)

Advanced parquet writer options.

Allows setting the writer options that apply to the entire file. Some options can also be set on a column by column basis, with the field column_specific_options (see ParquetColumnOptions).

data_pagesize_limit

Sets best effort maximum size of data page in bytes.

write_batch_size

Sets write_batch_size in bytes.

writer_version

Sets parquet writer version. Valid values are 1.0 and 2.0.

skip_arrow_metadata

Skip encoding the embedded arrow metadata in the KV_meta.

compression

Compression type to use. Default is “zstd(3)”. Available compression types are:
  • “uncompressed”: No compression.
  • “snappy”: Snappy compression.
  • “gzip(n)”: Gzip compression with level n.
  • “brotli(n)”: Brotli compression with level n.
  • “lz4”: LZ4 compression.
  • “lz4_raw”: LZ4_RAW compression.
  • “zstd(n)”: Zstandard compression with level n.

dictionary_enabled

Sets if dictionary encoding is enabled. If None, uses the default parquet writer setting.

dictionary_page_size_limit

Sets best effort maximum dictionary page size, in bytes.

statistics_enabled

Sets if statistics are enabled for any column. Valid values are none, chunk, and page. If None, uses the default parquet writer setting.

max_row_group_size

Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read.

created_by

Sets “created by” property.

column_index_truncate_length

Sets column index truncate length.

statistics_truncate_length

Sets statistics truncate length. If None, uses the default parquet writer setting.

data_page_row_count_limit

Sets best effort maximum number of rows in a data page.

encoding

Sets default encoding for any column. Valid values are plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. If None, uses the default parquet writer setting.

bloom_filter_on_write

Write bloom filters for all columns when creating parquet files.

bloom_filter_fpp

Sets bloom filter false positive probability. If None, uses the default parquet writer setting

bloom_filter_ndv

Sets bloom filter number of distinct values. If None, uses the default parquet writer setting.

allow_single_file_parallelism

Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file is serialized in parallel, leveraging a maximum possible core count of n_files * n_row_groups * n_columns.

maximum_parallel_row_group_writers

By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.

maximum_buffered_record_batches_per_stream

See maximum_parallel_row_group_writers.

column_specific_options

Overrides options for specific columns. If a column is not part of this dictionary, it will use the global options provided in this ParquetWriterOptions.

Initialize the ParquetWriterOptions.

allow_single_file_parallelism = True
bloom_filter_fpp = None
bloom_filter_ndv = None
bloom_filter_on_write = False
column_index_truncate_length = 64
column_specific_options = None
created_by = 'datafusion-python'
data_page_row_count_limit = 20000
data_pagesize_limit = 1048576
dictionary_enabled = True
dictionary_page_size_limit = 1048576
encoding = None
max_row_group_size = 1048576
maximum_buffered_record_batches_per_stream = 2
maximum_parallel_row_group_writers = 1
skip_arrow_metadata = False
statistics_enabled = 'page'
statistics_truncate_length = None
write_batch_size = 1024
writer_version = '1.0'