datafusion.context¶
Session Context and its associated configuration.
Classes¶
- RuntimeConfig: Runtime configuration options.
- SQLOptions: Options to be used when performing SQL queries.
- SessionConfig: Session configuration options.
- SessionContext: This is the main interface for executing queries and creating DataFrames.
Module Contents¶
- class datafusion.context.RuntimeConfig¶
Runtime configuration options.
Create a new RuntimeConfig with default values.
- with_disk_manager_disabled() RuntimeConfig ¶
Disable the disk manager; attempts to create temporary files will error.
- Returns:
A new RuntimeConfig object with the updated setting.
- with_disk_manager_os() RuntimeConfig ¶
Use the operating system’s temporary directory for disk manager.
- Returns:
A new RuntimeConfig object with the updated setting.
- with_disk_manager_specified(*paths: str | pathlib.Path) RuntimeConfig ¶
Use the specified paths for the disk manager’s temporary files.
- Parameters:
paths – Paths to use for the disk manager’s temporary files.
- Returns:
A new RuntimeConfig object with the updated setting.
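Example usage, a minimal sketch with hypothetical scratch directories:
from datafusion.context import RuntimeConfig

# Spill temporary files to two hypothetical scratch directories.
config = RuntimeConfig().with_disk_manager_specified("/mnt/scratch1", "/mnt/scratch2")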
- with_fair_spill_pool(size: int) RuntimeConfig ¶
Use a fair spill pool with the specified size.
This pool works best when you know beforehand the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even when there was sufficient memory (reserved for other operators) to avoid doing so:
┌───────────────────────z──────────────────────z───────────────┐
│                       z                      z               │
│                       z                      z               │
│       Spillable       z     Unspillable      z     Free      │
│        Memory         z        Memory        z    Memory     │
│                       z                      z               │
│                       z                      z               │
└───────────────────────z──────────────────────z───────────────┘
- Parameters:
size – Size of the memory pool in bytes.
- Returns:
A new RuntimeConfig object with the updated setting.
Example usage:
config = RuntimeConfig().with_fair_spill_pool(1024)
- with_greedy_memory_pool(size: int) RuntimeConfig ¶
Use a greedy memory pool with the specified size.
This pool works well for queries that do not need to spill or have a single spillable operator. See with_fair_spill_pool() if there are multiple spillable operators that all will spill.
- Parameters:
size – Size of the memory pool in bytes.
- Returns:
A new RuntimeConfig object with the updated setting.
Example usage:
config = RuntimeConfig().with_greedy_memory_pool(1024)
- with_temp_file_path(path: str | pathlib.Path) RuntimeConfig ¶
Use the specified path to create any needed temporary files.
- Parameters:
path – Path to use for temporary files.
- Returns:
A new RuntimeConfig object with the updated setting.
Example usage:
config = RuntimeConfig().with_temp_file_path("/tmp")
- with_unbounded_memory_pool() RuntimeConfig ¶
Use an unbounded memory pool.
- Returns:
A new RuntimeConfig object with the updated setting.
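Example usage, a minimal sketch that passes the runtime configuration to a SessionContext (the SessionContext constructor is documented below):
from datafusion.context import RuntimeConfig, SessionConfig, SessionContext

# No memory limit is enforced for this session.
runtime = RuntimeConfig().with_unbounded_memory_pool()
ctx = SessionContext(SessionConfig(), runtime)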
- config_internal¶
- class datafusion.context.SQLOptions¶
Options to be used when performing SQL queries.
Create a new SQLOptions with default values.
The default values are:
- DDL commands are allowed
- DML commands are allowed
- Statements are allowed
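Example usage, a sketch of a read-only context combined with sql_with_options() (documented below); the table name is hypothetical:
from datafusion.context import SessionContext, SQLOptions

ctx = SessionContext()
options = SQLOptions().with_allow_ddl(False).with_allow_dml(False).with_allow_statements(False)
# Plain queries are still allowed.
ctx.sql_with_options("SELECT 1", options)
# This should raise an error because DDL is disallowed:
# ctx.sql_with_options("CREATE TABLE t (a int)", options)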
- with_allow_ddl(allow: bool = True) SQLOptions ¶
Should DDL (Data Definition Language) commands be run?
Examples of DDL commands include CREATE TABLE and DROP TABLE.
- Parameters:
allow – Allow DDL commands to be run.
- Returns:
A new SQLOptions object with the updated setting.
Example usage:
options = SQLOptions().with_allow_ddl(True)
- with_allow_dml(allow: bool = True) SQLOptions ¶
Should DML (Data Manipulation Language) commands be run?
Examples of DML commands include INSERT INTO and DELETE.
- Parameters:
allow – Allow DML commands to be run.
- Returns:
A new SQLOptions object with the updated setting.
Example usage:
options = SQLOptions().with_allow_dml(True)
- with_allow_statements(allow: bool = True) SQLOptions ¶
Should statements such as SET VARIABLE and BEGIN TRANSACTION be run?
- Parameters:
allow – Allow statements to be run.
- Returns:
A new SQLOptions object with the updated setting.
Example usage:
options = SQLOptions().with_allow_statements(True)
- options_internal¶
- class datafusion.context.SessionConfig(config_options: dict[str, str] | None = None)¶
Session configuration options.
Create a new SessionConfig with the given configuration options.
- Parameters:
config_options – Configuration options.
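Example usage, a sketch assuming datafusion.execution.target_partitions is a valid DataFusion option key:
from datafusion.context import SessionConfig, SessionContext

# Keys and values are plain strings (the key shown is an assumed DataFusion option).
config = SessionConfig({"datafusion.execution.target_partitions": "8"})
ctx = SessionContext(config)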
- set(key: str, value: str) SessionConfig ¶
Set a configuration option.
- Parameters:
key – Option key.
value – Option value.
- Returns:
A new SessionConfig object with the updated setting.
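Example usage, a sketch assuming datafusion.execution.batch_size is the underlying option key for the batch size setting:
from datafusion.context import SessionConfig

config = SessionConfig().set("datafusion.execution.batch_size", "8192")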
- with_batch_size(batch_size: int) SessionConfig ¶
Customize batch size.
- Parameters:
batch_size – Batch size.
- Returns:
A new SessionConfig object with the updated setting.
- with_create_default_catalog_and_schema(enabled: bool = True) SessionConfig ¶
Control if the default catalog and schema will be automatically created.
- Parameters:
enabled – Whether the default catalog and schema will be automatically created.
- Returns:
A new SessionConfig object with the updated setting.
- with_default_catalog_and_schema(catalog: str, schema: str) SessionConfig ¶
Select a name for the default catalog and schema.
- Parameters:
catalog – Catalog name.
schema – Schema name.
- Returns:
A new SessionConfig object with the updated setting.
- with_information_schema(enabled: bool = True) SessionConfig ¶
Enable or disable the inclusion of information_schema virtual tables.
- Parameters:
enabled – Whether to include information_schema virtual tables.
- Returns:
A new SessionConfig object with the updated setting.
- with_parquet_pruning(enabled: bool = True) SessionConfig ¶
Enable or disable the use of pruning predicate for parquet readers.
Pruning predicates will enable the reader to skip row groups.
- Parameters:
enabled – Whether to use pruning predicate for parquet readers.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_aggregations(enabled: bool = True) SessionConfig ¶
Enable or disable the use of repartitioning for aggregations.
Enabling this improves parallelism.
- Parameters:
enabled – Whether to use repartitioning for aggregations.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_file_min_size(size: int) SessionConfig ¶
Set minimum file range size for repartitioning scans.
- Parameters:
size – Minimum file range size.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_file_scans(enabled: bool = True) SessionConfig ¶
Enable or disable the use of repartitioning for file scans.
- Parameters:
enabled – Whether to use repartitioning for file scans.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_joins(enabled: bool = True) SessionConfig ¶
Enable or disable the use of repartitioning for joins to improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for joins.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_sorts(enabled: bool = True) SessionConfig ¶
Enable or disable the use of repartitioning for sorts.
This may improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for sorts.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_windows(enabled: bool = True) SessionConfig ¶
Enable or disable the use of repartitioning for window functions.
This may improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for window functions.
- Returns:
A new SessionConfig object with the updated setting.
- with_target_partitions(target_partitions: int) SessionConfig ¶
Customize the number of target partitions for query execution.
Increasing partitions can increase concurrency.
- Parameters:
target_partitions – Number of target partitions.
- Returns:
A new SessionConfig object with the updated setting.
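Because each with_* method returns a SessionConfig with the updated setting, calls can be chained. A minimal sketch:
from datafusion.context import SessionConfig, SessionContext

config = (
    SessionConfig()
    .with_target_partitions(8)
    .with_batch_size(4096)
    .with_information_schema(True)
)
ctx = SessionContext(config)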
- config_internal¶
- class datafusion.context.SessionContext(config: SessionConfig | None = None, runtime: RuntimeConfig | None = None)¶
This is the main interface for executing queries and creating DataFrames.
See Concepts in the online documentation for more information.
Main interface for executing queries with DataFusion.
Maintains the state of the connection between a user and an instance of the DataFusion engine.
- Parameters:
config – Session configuration options.
runtime – Runtime configuration options.
Example usage:
The following example demonstrates how to use the context to execute a query against a CSV data source using the DataFrame API:
from datafusion import SessionContext
ctx = SessionContext()
df = ctx.read_csv("data.csv")
- catalog(name: str = 'datafusion') datafusion.catalog.Catalog ¶
Retrieve a catalog by name.
- create_dataframe(partitions: list[list[pyarrow.RecordBatch]], name: str | None = None, schema: pyarrow.Schema | None = None) datafusion.dataframe.DataFrame ¶
Create and return a dataframe using the provided partitions.
- Parameters:
partitions – pyarrow.RecordBatch partitions to register.
name – Resultant dataframe name.
schema – Schema for the partitions.
- Returns:
DataFrame representation of the provided partitions.
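Example usage, a minimal sketch with a single partition containing one hypothetical record batch:
import pyarrow as pa
from datafusion.context import SessionContext

ctx = SessionContext()
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["a"])
# partitions is a list of partitions, each itself a list of record batches.
df = ctx.create_dataframe([[batch]], name="t")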
- create_dataframe_from_logical_plan(plan: datafusion._internal.LogicalPlan) datafusion.dataframe.DataFrame ¶
Create a DataFrame from an existing plan.
- Parameters:
plan – Logical plan.
- Returns:
DataFrame representation of the logical plan.
- deregister_table(name: str) None ¶
Remove a table from the session.
- empty_table() datafusion.dataframe.DataFrame ¶
Create an empty DataFrame.
- execute(plan: datafusion._internal.ExecutionPlan, partitions: int) datafusion.record_batch.RecordBatchStream ¶
Execute the plan and return the results.
- from_arrow(data: Any, name: str | None = None) datafusion.dataframe.DataFrame ¶
Create a DataFrame from an Arrow source.
The Arrow data source can be any object that implements either __arrow_c_stream__ or __arrow_c_array__. For the latter, it must return a struct array. Common examples of such sources from pyarrow include Table and RecordBatch.
- Parameters:
data – Arrow data source.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Arrow table.
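Example usage, a sketch with a pyarrow.Table (which exposes the Arrow C stream interface in recent pyarrow versions); the column names are hypothetical:
import pyarrow as pa
from datafusion.context import SessionContext

ctx = SessionContext()
table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df = ctx.from_arrow(table, name="arrow_data")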
- from_arrow_table(data: pyarrow.Table, name: str | None = None) datafusion.dataframe.DataFrame ¶
Create a DataFrame from an Arrow table.
This is an alias for from_arrow().
- from_pandas(data: pandas.DataFrame, name: str | None = None) datafusion.dataframe.DataFrame ¶
Create a DataFrame from a Pandas DataFrame.
- Parameters:
data – Pandas DataFrame.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Pandas DataFrame.
- from_polars(data: polars.DataFrame, name: str | None = None) datafusion.dataframe.DataFrame ¶
Create a DataFrame from a Polars DataFrame.
- Parameters:
data – Polars DataFrame.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Polars DataFrame.
- from_pydict(data: dict[str, list[Any]], name: str | None = None) datafusion.dataframe.DataFrame ¶
Create a DataFrame from a dictionary.
- Parameters:
data – Dictionary of lists.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the dictionary of lists.
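Example usage, a minimal sketch with hypothetical column names:
from datafusion.context import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]}, name="pydict_data")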
- from_pylist(data: list[dict[str, Any]], name: str | None = None) datafusion.dataframe.DataFrame ¶
Create a DataFrame from a list.
- Parameters:
data – List of dictionaries.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the list of dictionaries.
- read_avro(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_partition_cols: list[tuple[str, str]] | None = None, file_extension: str = '.avro') datafusion.dataframe.DataFrame ¶
Create a DataFrame for reading an Avro data source.
- Parameters:
path – Path to the Avro file.
schema – The data source schema.
file_partition_cols – Partition columns.
file_extension – File extension to select.
- Returns:
DataFrame representation of the read Avro file
- read_csv(path: str | pathlib.Path | list[str] | list[pathlib.Path], schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = 1000, file_extension: str = '.csv', table_partition_cols: list[tuple[str, str]] | None = None, file_compression_type: str | None = None) datafusion.dataframe.DataFrame ¶
Read a CSV data source.
- Parameters:
path – Path to the CSV file
schema – An optional schema representing the CSV files. If None, the CSV reader will try to infer it based on data in file.
has_header – Whether the CSV file has a header. If schema inference is run on a file with no headers, default column names are created.
delimiter – An optional column delimiter.
schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
- Returns:
DataFrame representation of the read CSV files
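Example usage, a sketch with a hypothetical tab-separated file:
from datafusion.context import SessionContext

ctx = SessionContext()
# data.tsv is a hypothetical file with a header row.
df = ctx.read_csv("data.tsv", delimiter="\t", file_extension=".tsv")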
- read_json(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str]] | None = None, file_compression_type: str | None = None) datafusion.dataframe.DataFrame ¶
Read a line-delimited JSON data source.
- Parameters:
path – Path to the JSON file.
schema – The data source schema.
schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
- Returns:
DataFrame representation of the read JSON files.
- read_parquet(path: str | pathlib.Path, table_partition_cols: list[tuple[str, str]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: list[list[datafusion.expr.Expr]] | None = None) datafusion.dataframe.DataFrame ¶
Read a Parquet source into a DataFrame.
- Parameters:
path – Path to the Parquet file.
table_partition_cols – Partition columns.
parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.
file_extension – File extension; only files with this extension are selected for data input.
skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.
schema – An optional schema representing the parquet files. If None, the parquet reader will try to infer it based on data in the file.
file_sort_order – Sort order for the file.
- Returns:
DataFrame representation of the read Parquet files
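Example usage, a sketch with a hypothetical Parquet file, keeping row-group pruning enabled:
from datafusion.context import SessionContext

ctx = SessionContext()
df = ctx.read_parquet("events.parquet", parquet_pruning=True)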
- read_table(table: datafusion.catalog.Table) datafusion.dataframe.DataFrame ¶
Creates a DataFrame from a table.
- register_avro(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_extension: str = '.avro', table_partition_cols: list[tuple[str, str]] | None = None) None ¶
Register an Avro file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the Avro file.
schema – The data source schema.
file_extension – File extension to select.
table_partition_cols – Partition columns.
- register_csv(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = 1000, file_extension: str = '.csv', file_compression_type: str | None = None) None ¶
Register a CSV file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the CSV file.
schema – An optional schema representing the CSV file. If None, the CSV reader will try to infer it based on data in file.
has_header – Whether the CSV file has a header. If schema inference is run on a file with no headers, default column names are created.
delimiter – An optional column delimiter.
schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
file_compression_type – File compression type.
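Example usage, a sketch registering a hypothetical CSV file and querying it with SQL:
from datafusion.context import SessionContext

ctx = SessionContext()
ctx.register_csv("sales", "sales.csv")  # sales.csv is a hypothetical file with a header row
df = ctx.sql("SELECT COUNT(*) FROM sales")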
- register_dataset(name: str, dataset: pyarrow.dataset.Dataset) None ¶
Register a pyarrow.dataset.Dataset as a table.
- Parameters:
name – Name of the table to register.
dataset – PyArrow dataset.
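Example usage, a sketch over a hypothetical directory of Parquet files:
import pyarrow.dataset as ds
from datafusion.context import SessionContext

ctx = SessionContext()
dataset = ds.dataset("data_dir/", format="parquet")  # data_dir/ is hypothetical
ctx.register_dataset("my_dataset", dataset)
df = ctx.sql("SELECT * FROM my_dataset LIMIT 10")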
- register_json(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str]] | None = None, file_compression_type: str | None = None) None ¶
Register a JSON file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the JSON file.
schema – The data source schema.
schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
- register_listing_table(name: str, path: str | pathlib.Path, table_partition_cols: list[tuple[str, str]] | None = None, file_extension: str = '.parquet', schema: pyarrow.Schema | None = None, file_sort_order: list[list[datafusion.expr.Expr]] | None = None) None ¶
Register multiple files as a single table.
Registers a Table that can assemble multiple files from locations in an ObjectStore instance.
- Parameters:
name – Name of the resultant table.
path – Path to the file to register.
table_partition_cols – Partition columns.
file_extension – File extension of the provided table.
schema – The data source schema.
file_sort_order – Sort order for the file.
- register_object_store(schema: str, store: Any, host: str | None) None ¶
Add a new object store into the session.
- Parameters:
schema – The data source schema.
store – The ObjectStore to register.
host – URL for the host.
- register_parquet(name: str, path: str | pathlib.Path, table_partition_cols: list[tuple[str, str]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: list[list[datafusion.expr.Expr]] | None = None) None ¶
Register a Parquet file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the Parquet file.
table_partition_cols – Partition columns.
parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.
file_extension – File extension; only files with this extension are selected for data input.
skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.
schema – The data source schema.
file_sort_order – Sort order for the file.
- register_record_batches(name: str, partitions: list[list[pyarrow.RecordBatch]]) None ¶
Register record batches as a table.
This function will convert the provided partitions into a table and register it into the session using the given name.
- Parameters:
name – Name of the resultant table.
partitions – Record batches to register as a table.
- register_table(name: str, table: datafusion.catalog.Table) None ¶
Register a datafusion.catalog.Table as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the resultant table.
table – DataFusion table to add to the session context.
- register_udaf(udaf: datafusion._internal.AggregateUDF) None ¶
Register a user-defined aggregation function (UDAF) with the context.
- register_udf(udf: datafusion.udf.ScalarUDF) None ¶
Register a user-defined function (UDF) with the context.
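Example usage, a sketch assuming the udf() helper exported at the top level of the datafusion package, with an assumed signature of udf(func, input_types, return_type, volatility):
import pyarrow as pa
import pyarrow.compute as pc
from datafusion import SessionContext, udf  # top-level udf export is assumed

# A scalar UDF that doubles an int64 column.
double = udf(lambda arr: pc.multiply(arr, 2), [pa.int64()], pa.int64(), "immutable")
ctx = SessionContext()
ctx.register_udf(double)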
- session_id() str ¶
Return an id that uniquely identifies this SessionContext.
- sql(query: str, options: SQLOptions | None = None) datafusion.dataframe.DataFrame ¶
Create a DataFrame from SQL query text.
Note: This API implements DDL statements such as CREATE TABLE and CREATE VIEW and DML statements such as INSERT INTO with an in-memory default implementation. See sql_with_options().
- Parameters:
query – SQL query text.
options – If provided, the query will be validated against these options.
- Returns:
DataFrame representation of the SQL query.
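Example usage, a sketch that assumes DataFusion's CREATE TABLE ... AS VALUES syntax for the in-memory DDL implementation mentioned above:
from datafusion.context import SessionContext

ctx = SessionContext()
# DDL is handled with the in-memory default implementation.
ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
df = ctx.sql("SELECT COUNT(*) FROM t")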
- sql_with_options(query: str, options: SQLOptions) datafusion.dataframe.DataFrame ¶
Create a DataFrame from SQL query text.
This function will first validate that the query is allowed by the provided options.
- Parameters:
query – SQL query text.
options – SQL options.
- Returns:
DataFrame representation of the SQL query.
- table(name: str) datafusion.dataframe.DataFrame ¶
Retrieve a previously registered table by name.
- table_exist(name: str) bool ¶
Return whether a table with the given name exists.
- config¶
- ctx¶
- runtime¶