datafusion.context

Session Context and it’s associated configuration.

Classes

RuntimeConfig

Runtime configuration options.

SQLOptions

Options to be used when performing SQL queries.

SessionConfig

Session configuration options.

SessionContext

This is the main interface for executing queries and creating DataFrames.

Module Contents

class datafusion.context.RuntimeConfig

Runtime configuration options.

Create a new RuntimeConfig with default values.

with_disk_manager_disabled() RuntimeConfig

Disable the disk manager, attempts to create temporary files will error.

Returns:

A new RuntimeConfig object with the updated setting.

with_disk_manager_os() RuntimeConfig

Use the operating system’s temporary directory for disk manager.

Returns:

A new RuntimeConfig object with the updated setting.

with_disk_manager_specified(*paths: str | pathlib.Path) RuntimeConfig

Use the specified paths for the disk manager’s temporary files.

Parameters:

paths – Paths to use for the disk manager’s temporary files.

Returns:

A new RuntimeConfig object with the updated setting.

with_fair_spill_pool(size: int) RuntimeConfig

Use a fair spill pool with the specified size.

This pool works best when you know beforehand the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even when there was sufficient memory (reserved for other operators) to avoid doing so:

┌───────────────────────z──────────────────────z───────────────┐
│                       z                      z               │
│                       z                      z               │
│       Spillable       z       Unspillable    z     Free      │
│        Memory         z        Memory        z    Memory     │
│                       z                      z               │
│                       z                      z               │
└───────────────────────z──────────────────────z───────────────┘
Parameters:

size – Size of the memory pool in bytes.

Returns:

A new RuntimeConfig object with the updated setting.

Examples usage:

config = RuntimeConfig().with_fair_spill_pool(1024)
with_greedy_memory_pool(size: int) RuntimeConfig

Use a greedy memory pool with the specified size.

This pool works well for queries that do not need to spill or have a single spillable operator. See with_fair_spill_pool() if there are multiple spillable operators that all will spill.

Parameters:

size – Size of the memory pool in bytes.

Returns:

A new RuntimeConfig object with the updated setting.

Example usage:

config = RuntimeConfig().with_greedy_memory_pool(1024)
with_temp_file_path(path: str | pathlib.Path) RuntimeConfig

Use the specified path to create any needed temporary files.

Parameters:

path – Path to use for temporary files.

Returns:

A new RuntimeConfig object with the updated setting.

Example usage:

config = RuntimeConfig().with_temp_file_path("/tmp")
with_unbounded_memory_pool() RuntimeConfig

Use an unbounded memory pool.

Returns:

A new RuntimeConfig object with the updated setting.

config_internal
class datafusion.context.SQLOptions

Options to be used when performing SQL queries.

Create a new SQLOptions with default values.

The default values are: - DDL commands are allowed - DML commands are allowed - Statements are allowed

with_allow_ddl(allow: bool = True) SQLOptions

Should DDL (Data Definition Language) commands be run?

Examples of DDL commands include CREATE TABLE and DROP TABLE.

Parameters:

allow – Allow DDL commands to be run.

Returns:

A new SQLOptions object with the updated setting.

Example usage:

options = SQLOptions().with_allow_ddl(True)
with_allow_dml(allow: bool = True) SQLOptions

Should DML (Data Manipulation Language) commands be run?

Examples of DML commands include INSERT INTO and DELETE.

Parameters:

allow – Allow DML commands to be run.

Returns:

A new SQLOptions object with the updated setting.

Example usage:

options = SQLOptions().with_allow_dml(True)
with_allow_statements(allow: bool = True) SQLOptions

Should statements such as SET VARIABLE and BEGIN TRANSACTION be run?

Parameters:

allow – Allow statements to be run.

Returns:

py:class:SQLOptions` object with the updated setting.

Return type:

A new

Example usage:

options = SQLOptions().with_allow_statements(True)
options_internal
class datafusion.context.SessionConfig(config_options: dict[str, str] | None = None)

Session configuration options.

Create a new SessionConfig with the given configuration options.

Parameters:

config_options – Configuration options.

set(key: str, value: str) SessionConfig

Set a configuration option.

Args: key: Option key. value: Option value.

Returns:

A new SessionConfig object with the updated setting.

with_batch_size(batch_size: int) SessionConfig

Customize batch size.

Parameters:

batch_size – Batch size.

Returns:

A new SessionConfig object with the updated setting.

with_create_default_catalog_and_schema(enabled: bool = True) SessionConfig

Control if the default catalog and schema will be automatically created.

Parameters:

enabled – Whether the default catalog and schema will be automatically created.

Returns:

A new SessionConfig object with the updated setting.

with_default_catalog_and_schema(catalog: str, schema: str) SessionConfig

Select a name for the default catalog and schema.

Parameters:
  • catalog – Catalog name.

  • schema – Schema name.

Returns:

A new SessionConfig object with the updated setting.

with_information_schema(enabled: bool = True) SessionConfig

Enable or disable the inclusion of information_schema virtual tables.

Parameters:

enabled – Whether to include information_schema virtual tables.

Returns:

A new SessionConfig object with the updated setting.

with_parquet_pruning(enabled: bool = True) SessionConfig

Enable or disable the use of pruning predicate for parquet readers.

Pruning predicates will enable the reader to skip row groups.

Parameters:

enabled – Whether to use pruning predicate for parquet readers.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_aggregations(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for aggregations.

Enabling this improves parallelism.

Parameters:

enabled – Whether to use repartitioning for aggregations.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_file_min_size(size: int) SessionConfig

Set minimum file range size for repartitioning scans.

Parameters:

size – Minimum file range size.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_file_scans(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for file scans.

Parameters:

enabled – Whether to use repartitioning for file scans.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_joins(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for joins to improve parallelism.

Parameters:

enabled – Whether to use repartitioning for joins.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_sorts(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for window functions.

This may improve parallelism.

Parameters:

enabled – Whether to use repartitioning for window functions.

Returns:

A new SessionConfig object with the updated setting.

with_repartition_windows(enabled: bool = True) SessionConfig

Enable or disable the use of repartitioning for window functions.

This may improve parallelism.

Parameters:

enabled – Whether to use repartitioning for window functions.

Returns:

A new SessionConfig object with the updated setting.

with_target_partitions(target_partitions: int) SessionConfig

Customize the number of target partitions for query execution.

Increasing partitions can increase concurrency.

Parameters:

target_partitions – Number of target partitions.

Returns:

A new SessionConfig object with the updated setting.

config_internal
class datafusion.context.SessionContext(config: SessionConfig | None = None, runtime: RuntimeConfig | None = None)

This is the main interface for executing queries and creating DataFrames.

See Concepts in the online documentation for more information.

Main interface for executing queries with DataFusion.

Maintains the state of the connection between a user and an instance of the connection between a user and an instance of the DataFusion engine.

Parameters:
  • config – Session configuration options.

  • runtime – Runtime configuration options.

Example usage:

The following example demonstrates how to use the context to execute a query against a CSV data source using the DataFrame API:

from datafusion import SessionContext

ctx = SessionContext()
df = ctx.read_csv("data.csv")
catalog(name: str = 'datafusion') datafusion.catalog.Catalog

Retrieve a catalog by name.

create_dataframe(partitions: list[list[pyarrow.RecordBatch]], name: str | None = None, schema: pyarrow.Schema | None = None) datafusion.dataframe.DataFrame

Create and return a dataframe using the provided partitions.

Parameters:
  • partitionspyarrow.RecordBatch partitions to register.

  • name – Resultant dataframe name.

  • schema – Schema for the partitions.

Returns:

DataFrame representation of the SQL query.

create_dataframe_from_logical_plan(plan: datafusion.plan.LogicalPlan) datafusion.dataframe.DataFrame

Create a DataFrame from an existing plan.

Parameters:

plan – Logical plan.

Returns:

DataFrame representation of the logical plan.

deregister_table(name: str) None

Remove a table from the session.

empty_table() datafusion.dataframe.DataFrame

Create an empty DataFrame.

execute(plan: datafusion.plan.ExecutionPlan, partitions: int) datafusion.record_batch.RecordBatchStream

Execute the plan and return the results.

from_arrow(data: Any, name: str | None = None) datafusion.dataframe.DataFrame

Create a DataFrame from an Arrow source.

The Arrow data source can be any object that implements either __arrow_c_stream__ or __arrow_c_array__. For the latter, it must return a struct array. Common examples of sources from pyarrow include

Parameters:
  • data – Arrow data source.

  • name – Name of the DataFrame.

Returns:

DataFrame representation of the Arrow table.

from_arrow_table(data: pyarrow.Table, name: str | None = None) datafusion.dataframe.DataFrame

Create a DataFrame from an Arrow table.

This is an alias for from_arrow().

from_pandas(data: pandas.DataFrame, name: str | None = None) datafusion.dataframe.DataFrame

Create a DataFrame from a Pandas DataFrame.

Parameters:
  • data – Pandas DataFrame.

  • name – Name of the DataFrame.

Returns:

DataFrame representation of the Pandas DataFrame.

from_polars(data: polars.DataFrame, name: str | None = None) datafusion.dataframe.DataFrame

Create a DataFrame from a Polars DataFrame.

Parameters:
  • data – Polars DataFrame.

  • name – Name of the DataFrame.

Returns:

DataFrame representation of the Polars DataFrame.

from_pydict(data: dict[str, list[Any]], name: str | None = None) datafusion.dataframe.DataFrame

Create a DataFrame from a dictionary.

Parameters:
  • data – Dictionary of lists.

  • name – Name of the DataFrame.

Returns:

DataFrame representation of the dictionary of lists.

from_pylist(data: list[dict[str, Any]], name: str | None = None) datafusion.dataframe.DataFrame

Create a DataFrame from a list.

Parameters:
  • data – List of dictionaries.

  • name – Name of the DataFrame.

Returns:

DataFrame representation of the list of dictionaries.

read_avro(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_partition_cols: list[tuple[str, str]] | None = None, file_extension: str = '.avro') datafusion.dataframe.DataFrame

Create a DataFrame for reading Avro data source.

Parameters:
  • path – Path to the Avro file.

  • schema – The data source schema.

  • file_partition_cols – Partition columns.

  • file_extension – File extension to select.

Returns:

DataFrame representation of the read Avro file

read_csv(path: str | pathlib.Path | list[str] | list[pathlib.Path], schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = 1000, file_extension: str = '.csv', table_partition_cols: list[tuple[str, str]] | None = None, file_compression_type: str | None = None) datafusion.dataframe.DataFrame

Read a CSV data source.

Parameters:
  • path – Path to the CSV file

  • schema – An optional schema representing the CSV files. If None, the CSV reader will try to infer it based on data in file.

  • has_header – Whether the CSV file have a header. If schema inference is run on a file with no headers, default column names are created.

  • delimiter – An optional column delimiter.

  • schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.

  • file_extension – File extension; only files with this extension are selected for data input.

  • table_partition_cols – Partition columns.

  • file_compression_type – File compression type.

Returns:

DataFrame representation of the read CSV files

read_json(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str]] | None = None, file_compression_type: str | None = None) datafusion.dataframe.DataFrame

Read a line-delimited JSON data source.

Parameters:
  • path – Path to the JSON file.

  • schema – The data source schema.

  • schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.

  • file_extension – File extension; only files with this extension are selected for data input.

  • table_partition_cols – Partition columns.

  • file_compression_type – File compression type.

Returns:

DataFrame representation of the read JSON files.

read_parquet(path: str | pathlib.Path, table_partition_cols: list[tuple[str, str]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: list[list[datafusion.expr.Expr]] | None = None) datafusion.dataframe.DataFrame

Read a Parquet source into a Dataframe.

Parameters:
  • path – Path to the Parquet file.

  • table_partition_cols – Partition columns.

  • parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.

  • file_extension – File extension; only files with this extension are selected for data input.

  • skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.

  • schema – An optional schema representing the parquet files. If None, the parquet reader will try to infer it based on data in the file.

  • file_sort_order – Sort order for the file.

Returns:

DataFrame representation of the read Parquet files

read_table(table: datafusion.catalog.Table) datafusion.dataframe.DataFrame

Creates a DataFrame from a table.

For a Table such as a ListingTable, create a DataFrame.

register_avro(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_extension: str = '.avro', table_partition_cols: list[tuple[str, str]] | None = None) None

Register an Avro file as a table.

The registered table can be referenced from SQL statement executed against this context.

Parameters:
  • name – Name of the table to register.

  • path – Path to the Avro file.

  • schema – The data source schema.

  • file_extension – File extension to select.

  • table_partition_cols – Partition columns.

register_csv(name: str, path: str | pathlib.Path | list[str | pathlib.Path], schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = 1000, file_extension: str = '.csv', file_compression_type: str | None = None) None

Register a CSV file as a table.

The registered table can be referenced from SQL statement executed against.

Parameters:
  • name – Name of the table to register.

  • path – Path to the CSV file. It also accepts a list of Paths.

  • schema – An optional schema representing the CSV file. If None, the CSV reader will try to infer it based on data in file.

  • has_header – Whether the CSV file have a header. If schema inference is run on a file with no headers, default column names are created.

  • delimiter – An optional column delimiter.

  • schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.

  • file_extension – File extension; only files with this extension are selected for data input.

  • file_compression_type – File compression type.

register_dataset(name: str, dataset: pyarrow.dataset.Dataset) None

Register a pyarrow.dataset.Dataset as a table.

Parameters:
  • name – Name of the table to register.

  • dataset – PyArrow dataset.

register_json(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str]] | None = None, file_compression_type: str | None = None) None

Register a JSON file as a table.

The registered table can be referenced from SQL statement executed against this context.

Parameters:
  • name – Name of the table to register.

  • path – Path to the JSON file.

  • schema – The data source schema.

  • schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.

  • file_extension – File extension; only files with this extension are selected for data input.

  • table_partition_cols – Partition columns.

  • file_compression_type – File compression type.

register_listing_table(name: str, path: str | pathlib.Path, table_partition_cols: list[tuple[str, str]] | None = None, file_extension: str = '.parquet', schema: pyarrow.Schema | None = None, file_sort_order: list[list[datafusion.expr.Expr | datafusion.expr.SortExpr]] | None = None) None

Register multiple files as a single table.

Registers a Table that can assemble multiple files from locations in an ObjectStore instance.

Parameters:
  • name – Name of the resultant table.

  • path – Path to the file to register.

  • table_partition_cols – Partition columns.

  • file_extension – File extension of the provided table.

  • schema – The data source schema.

  • file_sort_order – Sort order for the file.

register_object_store(schema: str, store: Any, host: str | None = None) None

Add a new object store into the session.

Parameters:
  • schema – The data source schema.

  • store – The ObjectStore to register.

  • host – URL for the host.

register_parquet(name: str, path: str | pathlib.Path, table_partition_cols: list[tuple[str, str]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: list[list[datafusion.expr.Expr]] | None = None) None

Register a Parquet file as a table.

The registered table can be referenced from SQL statement executed against this context.

Parameters:
  • name – Name of the table to register.

  • path – Path to the Parquet file.

  • table_partition_cols – Partition columns.

  • parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.

  • file_extension – File extension; only files with this extension are selected for data input.

  • skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.

  • schema – The data source schema.

  • file_sort_order – Sort order for the file.

register_record_batches(name: str, partitions: list[list[pyarrow.RecordBatch]]) None

Register record batches as a table.

This function will convert the provided partitions into a table and register it into the session using the given name.

Parameters:
  • name – Name of the resultant table.

  • partitions – Record batches to register as a table.

register_table(name: str, table: datafusion.catalog.Table) None

Register a :py:class: ~datafusion.catalog.Table as a table.

The registered table can be referenced from SQL statement executed against.

Parameters:
  • name – Name of the resultant table.

  • table – DataFusion table to add to the session context.

register_udaf(udaf: datafusion.udf.AggregateUDF) None

Register a user-defined aggregation function (UDAF) with the context.

register_udf(udf: datafusion.udf.ScalarUDF) None

Register a user-defined function (UDF) with the context.

register_udwf(udwf: datafusion.udf.WindowUDF) None

Register a user-defined window function (UDWF) with the context.

session_id() str

Return an id that uniquely identifies this SessionContext.

sql(query: str, options: SQLOptions | None = None) datafusion.dataframe.DataFrame

Create a DataFrame from SQL query text.

Note: This API implements DDL statements such as CREATE TABLE and CREATE VIEW and DML statements such as INSERT INTO with in-memory default implementation.See sql_with_options().

Parameters:
  • query – SQL query text.

  • options – If provided, the query will be validated against these options.

Returns:

DataFrame representation of the SQL query.

sql_with_options(query: str, options: SQLOptions) datafusion.dataframe.DataFrame

Create a DataFrame from SQL query text.

This function will first validate that the query is allowed by the provided options.

Parameters:
  • query – SQL query text.

  • options – SQL options.

Returns:

DataFrame representation of the SQL query.

table(name: str) datafusion.dataframe.DataFrame

Retrieve a previously registered table by name.

table_exist(name: str) bool

Return whether a table with the given name exists.

config
ctx
runtime