datafusion.context¶
Session Context and its associated configuration.
Classes¶
ArrowArrayExportable | Type hint for object exporting Arrow C Array via Arrow PyCapsule Interface.
ArrowStreamExportable | Type hint for object exporting Arrow C Stream via Arrow PyCapsule Interface.
CatalogProviderExportable | Type hint for object that has __datafusion_catalog_provider__ PyCapsule.
RuntimeConfig | See RuntimeEnvBuilder.
RuntimeEnvBuilder | Runtime configuration options.
SQLOptions | Options to be used when performing SQL queries.
SessionConfig | Session configuration options.
SessionContext | This is the main interface for executing queries and creating DataFrames.
TableProviderExportable | Type hint for object that has __datafusion_table_provider__ PyCapsule.
Module Contents¶
- class datafusion.context.ArrowArrayExportable¶
Bases: Protocol
Type hint for object exporting Arrow C Array via Arrow PyCapsule Interface.
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
- __arrow_c_array__(requested_schema: object | None = None) tuple[object, object]¶
- class datafusion.context.ArrowStreamExportable¶
Bases: Protocol
Type hint for object exporting Arrow C Stream via Arrow PyCapsule Interface.
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
- __arrow_c_stream__(requested_schema: object | None = None) object¶
- class datafusion.context.CatalogProviderExportable¶
Bases: Protocol
Type hint for object that has __datafusion_catalog_provider__ PyCapsule.
https://docs.rs/datafusion/latest/datafusion/catalog/trait.CatalogProvider.html
- __datafusion_catalog_provider__() object¶
- class datafusion.context.RuntimeConfig¶
Bases: RuntimeEnvBuilder
See RuntimeEnvBuilder.
Create a new RuntimeEnvBuilder with default values.
- class datafusion.context.RuntimeEnvBuilder¶
Runtime configuration options.
Create a new RuntimeEnvBuilder with default values.
- with_disk_manager_disabled() RuntimeEnvBuilder¶
Disable the disk manager; attempts to create temporary files will error.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
- with_disk_manager_os() RuntimeEnvBuilder¶
Use the operating system’s temporary directory for disk manager.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
- with_disk_manager_specified(*paths: str | pathlib.Path) RuntimeEnvBuilder¶
Use the specified paths for the disk manager’s temporary files.
- Parameters:
paths – Paths to use for the disk manager’s temporary files.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
- with_fair_spill_pool(size: int) RuntimeEnvBuilder¶
Use a fair spill pool with the specified size.
This pool works best when you know beforehand the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even when there was sufficient memory (reserved for other operators) to avoid doing so:
┌───────────────────────z──────────────────────z───────────────┐
│                       z                      z               │
│                       z                      z               │
│       Spillable       z      Unspillable     z     Free      │
│        Memory         z        Memory        z    Memory     │
│                       z                      z               │
│                       z                      z               │
└───────────────────────z──────────────────────z───────────────┘
- Parameters:
size – Size of the memory pool in bytes.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
Example usage:
config = RuntimeEnvBuilder().with_fair_spill_pool(1024)
- with_greedy_memory_pool(size: int) RuntimeEnvBuilder¶
Use a greedy memory pool with the specified size.
This pool works well for queries that do not need to spill or have a single spillable operator. See with_fair_spill_pool() if there are multiple spillable operators that all will spill.
- Parameters:
size – Size of the memory pool in bytes.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
Example usage:
config = RuntimeEnvBuilder().with_greedy_memory_pool(1024)
- with_temp_file_path(path: str | pathlib.Path) RuntimeEnvBuilder¶
Use the specified path to create any needed temporary files.
- Parameters:
path – Path to use for temporary files.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
Example usage:
config = RuntimeEnvBuilder().with_temp_file_path("/tmp")
- with_unbounded_memory_pool() RuntimeEnvBuilder¶
Use an unbounded memory pool.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
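A short sketch of how these builder methods compose (the pool size and the choice of the OS temporary directory are illustrative, not defaults):
from datafusion.context import RuntimeEnvBuilder, SessionConfig, SessionContext

# Build a runtime that spills to the OS temporary directory and caps
# execution memory at 64 MiB, then attach it to a session.
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_greedy_memory_pool(64 * 1024 * 1024)
ctx = SessionContext(SessionConfig(), runtime)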
- config_internal¶
- class datafusion.context.SQLOptions¶
Options to be used when performing SQL queries.
Create a new SQLOptions with default values.
The default values are:
- DDL commands are allowed
- DML commands are allowed
- Statements are allowed
- with_allow_ddl(allow: bool = True) SQLOptions¶
Should DDL (Data Definition Language) commands be run?
Examples of DDL commands include CREATE TABLE and DROP TABLE.
- Parameters:
allow – Allow DDL commands to be run.
- Returns:
A new SQLOptions object with the updated setting.
Example usage:
options = SQLOptions().with_allow_ddl(True)
- with_allow_dml(allow: bool = True) SQLOptions¶
Should DML (Data Manipulation Language) commands be run?
Examples of DML commands include INSERT INTO and DELETE.
- Parameters:
allow – Allow DML commands to be run.
- Returns:
A new SQLOptions object with the updated setting.
Example usage:
options = SQLOptions().with_allow_dml(True)
- with_allow_statements(allow: bool = True) SQLOptions¶
Should statements such as SET VARIABLE and BEGIN TRANSACTION be run?
- Parameters:
allow – Allow statements to be run.
- Returns:
A new SQLOptions object with the updated setting.
Example usage:
options = SQLOptions().with_allow_statements(True)
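A minimal sketch combining these options with sql_with_options() to enforce read-only queries (the query text is illustrative):
from datafusion.context import SessionContext, SQLOptions

ctx = SessionContext()
# Reject DDL and DML so only read-only statements are accepted.
options = SQLOptions().with_allow_ddl(False).with_allow_dml(False)
df = ctx.sql_with_options("SELECT 1 AS one", options)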
- options_internal¶
- class datafusion.context.SessionConfig(config_options: dict[str, str] | None = None)¶
Session configuration options.
Create a new SessionConfig with the given configuration options.
- Parameters:
config_options – Configuration options.
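For example, configuration options can be supplied as a string-to-string dictionary. The key shown below follows DataFusion's configuration namespace and is assumed to be available in your version:
from datafusion.context import SessionConfig, SessionContext

# "datafusion.execution.target_partitions" is assumed to be a valid key
# in the installed DataFusion version; values are passed as strings.
config = SessionConfig({"datafusion.execution.target_partitions": "4"})
ctx = SessionContext(config)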
- set(key: str, value: str) SessionConfig¶
Set a configuration option.
- Parameters:
key – Option key.
value – Option value.
- Returns:
A new SessionConfig object with the updated setting.
- with_batch_size(batch_size: int) SessionConfig¶
Customize batch size.
- Parameters:
batch_size – Batch size.
- Returns:
A new SessionConfig object with the updated setting.
- with_create_default_catalog_and_schema(enabled: bool = True) SessionConfig¶
Control if the default catalog and schema will be automatically created.
- Parameters:
enabled – Whether the default catalog and schema will be automatically created.
- Returns:
A new SessionConfig object with the updated setting.
- with_default_catalog_and_schema(catalog: str, schema: str) SessionConfig¶
Select a name for the default catalog and schema.
- Parameters:
catalog – Catalog name.
schema – Schema name.
- Returns:
A new SessionConfig object with the updated setting.
- with_information_schema(enabled: bool = True) SessionConfig¶
Enable or disable the inclusion of information_schema virtual tables.
- Parameters:
enabled – Whether to include information_schema virtual tables.
- Returns:
A new SessionConfig object with the updated setting.
- with_parquet_pruning(enabled: bool = True) SessionConfig¶
Enable or disable the use of pruning predicate for parquet readers.
Pruning predicates will enable the reader to skip row groups.
- Parameters:
enabled – Whether to use pruning predicate for parquet readers.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_aggregations(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for aggregations.
Enabling this improves parallelism.
- Parameters:
enabled – Whether to use repartitioning for aggregations.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_file_min_size(size: int) SessionConfig¶
Set minimum file range size for repartitioning scans.
- Parameters:
size – Minimum file range size.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_file_scans(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for file scans.
- Parameters:
enabled – Whether to use repartitioning for file scans.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_joins(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for joins to improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for joins.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_sorts(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for sorts.
This may improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for sorts.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_windows(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for window functions.
This may improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for window functions.
- Returns:
A new SessionConfig object with the updated setting.
- with_target_partitions(target_partitions: int) SessionConfig¶
Customize the number of target partitions for query execution.
Increasing partitions can increase concurrency.
- Parameters:
target_partitions – Number of target partitions.
- Returns:
A new SessionConfig object with the updated setting.
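Because each setter returns a new SessionConfig, the options above can be chained. A rough sketch (the specific values are illustrative):
from datafusion.context import SessionConfig, SessionContext

config = (
    SessionConfig()
    .with_batch_size(8192)
    .with_target_partitions(8)
    .with_information_schema(True)
)
ctx = SessionContext(config)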
- config_internal¶
- class datafusion.context.SessionContext(config: SessionConfig | None = None, runtime: RuntimeEnvBuilder | None = None)¶
This is the main interface for executing queries and creating DataFrames.
See Concepts in the online documentation for more information.
Main interface for executing queries with DataFusion.
Maintains the state of the connection between a user and an instance of the DataFusion engine.
- Parameters:
config – Session configuration options.
runtime – Runtime configuration options.
Example usage:
The following example demonstrates how to use the context to execute a query against a CSV data source using the DataFrame API:
from datafusion import SessionContext
ctx = SessionContext()
df = ctx.read_csv("data.csv")
- __repr__() str¶
Print a string representation of the Session Context.
- static _convert_file_sort_order(file_sort_order: collections.abc.Sequence[collections.abc.Sequence[datafusion.expr.SortKey]] | None) list[list[datafusion._internal.expr.SortExpr]] | None¶
Convert nested SortKey sequences into raw sort expressions.
Each SortKey can be a column name string, an Expr, or a SortExpr and will be converted using datafusion.expr.sort_list_to_raw_sort_list().
- static _convert_table_partition_cols(table_partition_cols: list[tuple[str, str | pyarrow.DataType]]) list[tuple[str, pyarrow.DataType]]¶
- catalog(name: str = 'datafusion') datafusion.catalog.Catalog¶
Retrieve a catalog by name.
- catalog_names() set[str]¶
Return the names of the catalogs registered in this context.
- create_dataframe(partitions: list[list[pyarrow.RecordBatch]], name: str | None = None, schema: pyarrow.Schema | None = None) datafusion.dataframe.DataFrame¶
Create and return a dataframe using the provided partitions.
- Parameters:
partitions – pa.RecordBatch partitions to register.
name – Resultant dataframe name.
schema – Schema for the partitions.
- Returns:
DataFrame representation of the provided partitions.
- create_dataframe_from_logical_plan(plan: datafusion.plan.LogicalPlan) datafusion.dataframe.DataFrame¶
Create a DataFrame from an existing plan.
- Parameters:
plan – Logical plan.
- Returns:
DataFrame representation of the logical plan.
- deregister_table(name: str) None¶
Remove a table from the session.
- empty_table() datafusion.dataframe.DataFrame¶
Create an empty DataFrame.
- enable_url_table() SessionContext¶
Control if local files can be queried as tables.
- Returns:
A new SessionContext object with URL table enabled.
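A hedged sketch: with URL tables enabled, a local file path can be referenced directly in SQL. This assumes a file named data.csv exists in the working directory:
from datafusion.context import SessionContext

ctx = SessionContext().enable_url_table()
# The quoted path acts as a table name once URL tables are enabled.
df = ctx.sql("SELECT * FROM 'data.csv'")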
- execute(plan: datafusion.plan.ExecutionPlan, partitions: int) datafusion.record_batch.RecordBatchStream¶
Execute the plan and return the results.
- from_arrow(data: ArrowStreamExportable | ArrowArrayExportable, name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from an Arrow source.
The Arrow data source can be any object that implements either __arrow_c_stream__ or __arrow_c_array__. For the latter, it must return a struct array.
Arrow data can come from Polars, Pandas, PyArrow, etc.
- Parameters:
data – Arrow data source.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Arrow table.
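For example, a pyarrow.Table exports the Arrow C stream interface in recent pyarrow releases, so it can be passed directly (column names and values are illustrative):
import pyarrow as pa
from datafusion.context import SessionContext

ctx = SessionContext()
table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df = ctx.from_arrow(table, name="t")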
- from_arrow_table(data: pyarrow.Table, name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from an Arrow table.
This is an alias for from_arrow().
- from_pandas(data: pandas.DataFrame, name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from a Pandas DataFrame.
- Parameters:
data – Pandas DataFrame.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Pandas DataFrame.
- from_polars(data: polars.DataFrame, name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from a Polars DataFrame.
- Parameters:
data – Polars DataFrame.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Polars DataFrame.
- from_pydict(data: dict[str, list[Any]], name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from a dictionary.
- Parameters:
data – Dictionary of lists.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the dictionary of lists.
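Example sketch (the data and table name are illustrative):
from datafusion.context import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]}, name="t")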
- from_pylist(data: list[dict[str, Any]], name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from a list.
- Parameters:
data – List of dictionaries.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the list of dictionaries.
- classmethod global_ctx() SessionContext¶
Retrieve the global context as a SessionContext wrapper.
- Returns:
A SessionContext object that wraps the global SessionContextInternal.
- read_avro(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_extension: str = '.avro') datafusion.dataframe.DataFrame¶
Create a DataFrame for reading an Avro data source.
- Parameters:
path – Path to the Avro file.
schema – The data source schema.
file_partition_cols – Partition columns.
file_extension – File extension to select.
- Returns:
DataFrame representation of the read Avro file.
- read_csv(path: str | pathlib.Path | list[str] | list[pathlib.Path], schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = 1000, file_extension: str = '.csv', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_compression_type: str | None = None) datafusion.dataframe.DataFrame¶
Read a CSV data source.
- Parameters:
path – Path to the CSV file
schema – An optional schema representing the CSV files. If None, the CSV reader will try to infer it based on data in the file.
has_header – Whether the CSV file has a header. If schema inference is run on a file with no headers, default column names are created.
delimiter – An optional column delimiter.
schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
- Returns:
DataFrame representation of the read CSV files.
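A minimal sketch, assuming a local file data.csv with a header row:
from datafusion.context import SessionContext

ctx = SessionContext()
df = ctx.read_csv("data.csv", has_header=True, delimiter=",")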
- read_json(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_compression_type: str | None = None) datafusion.dataframe.DataFrame¶
Read a line-delimited JSON data source.
- Parameters:
path – Path to the JSON file.
schema – The data source schema.
schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
- Returns:
DataFrame representation of the read JSON files.
- read_parquet(path: str | pathlib.Path, table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: collections.abc.Sequence[collections.abc.Sequence[datafusion.expr.SortKey]] | None = None) datafusion.dataframe.DataFrame¶
Read a Parquet source into a DataFrame.
- Parameters:
path – Path to the Parquet file.
table_partition_cols – Partition columns.
parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.
file_extension – File extension; only files with this extension are selected for data input.
skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.
schema – An optional schema representing the parquet files. If None, the parquet reader will try to infer it based on data in the file.
file_sort_order – Sort order for the file. Each sort key can be specified as a column name (str), an expression (Expr), or a SortExpr.
- Returns:
DataFrame representation of the read Parquet files.
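A hedged sketch, assuming a directory data/ of Parquet files Hive-partitioned by a year column:
import pyarrow as pa
from datafusion.context import SessionContext

ctx = SessionContext()
df = ctx.read_parquet(
    "data/",
    table_partition_cols=[("year", pa.int32())],
    parquet_pruning=True,
)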
- read_table(table: datafusion.catalog.Table | TableProviderExportable | datafusion.dataframe.DataFrame | pyarrow.dataset.Dataset) datafusion.dataframe.DataFrame¶
Creates a DataFrame from a table.
- register_avro(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_extension: str = '.avro', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None) None¶
Register an Avro file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the Avro file.
schema – The data source schema.
file_extension – File extension to select.
table_partition_cols – Partition columns.
- register_catalog_provider(name: str, provider: CatalogProviderExportable | datafusion.catalog.CatalogProvider | datafusion.catalog.Catalog) None¶
Register a catalog provider.
- register_csv(name: str, path: str | pathlib.Path | list[str | pathlib.Path], schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = 1000, file_extension: str = '.csv', file_compression_type: str | None = None) None¶
Register a CSV file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the CSV file. It also accepts a list of Paths.
schema – An optional schema representing the CSV file. If None, the CSV reader will try to infer it based on data in the file.
has_header – Whether the CSV file has a header. If schema inference is run on a file with no headers, default column names are created.
delimiter – An optional column delimiter.
schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
file_compression_type – File compression type.
- register_dataset(name: str, dataset: pyarrow.dataset.Dataset) None¶
Register a pa.dataset.Dataset as a table.
- Parameters:
name – Name of the table to register.
dataset – PyArrow dataset.
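A sketch using pyarrow.dataset, assuming a directory data/ of Parquet files (names are illustrative):
import pyarrow.dataset as ds
from datafusion.context import SessionContext

ctx = SessionContext()
dataset = ds.dataset("data/", format="parquet")
ctx.register_dataset("my_dataset", dataset)
df = ctx.sql("SELECT COUNT(*) FROM my_dataset")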
- register_json(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_compression_type: str | None = None) None¶
Register a JSON file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the JSON file.
schema – The data source schema.
schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
- register_listing_table(name: str, path: str | pathlib.Path, table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_extension: str = '.parquet', schema: pyarrow.Schema | None = None, file_sort_order: collections.abc.Sequence[collections.abc.Sequence[datafusion.expr.SortKey]] | None = None) None¶
Register multiple files as a single table.
Registers a Table that can assemble multiple files from locations in an ObjectStore instance.
- Parameters:
name – Name of the resultant table.
path – Path to the file to register.
table_partition_cols – Partition columns.
file_extension – File extension of the provided table.
schema – The data source schema.
file_sort_order – Sort order for the file. Each sort key can be specified as a column name (str), an expression (Expr), or a SortExpr.
- register_object_store(schema: str, store: Any, host: str | None = None) None¶
Add a new object store into the session.
- Parameters:
schema – URL scheme (for example s3) under which the object store is registered.
store – The ObjectStore to register.
host – URL for the host.
- register_parquet(name: str, path: str | pathlib.Path, table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: collections.abc.Sequence[collections.abc.Sequence[datafusion.expr.SortKey]] | None = None) None¶
Register a Parquet file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the Parquet file.
table_partition_cols – Partition columns.
parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.
file_extension – File extension; only files with this extension are selected for data input.
skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.
schema – The data source schema.
file_sort_order – Sort order for the file. Each sort key can be specified as a column name (str), an expression (Expr), or a SortExpr.
- register_record_batches(name: str, partitions: list[list[pyarrow.RecordBatch]]) None¶
Register record batches as a table.
This function will convert the provided partitions into a table and register it into the session using the given name.
- Parameters:
name – Name of the resultant table.
partitions – Record batches to register as a table.
- register_table(name: str, table: datafusion.catalog.Table | TableProviderExportable | datafusion.dataframe.DataFrame | pyarrow.dataset.Dataset) None¶
Register a Table with this context.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the resultant table.
table – Any object that can be converted into a Table.
- register_table_provider(name: str, provider: datafusion.catalog.Table | TableProviderExportable | datafusion.dataframe.DataFrame | pyarrow.dataset.Dataset) None¶
Register a table provider.
Deprecated: use register_table() instead.
- register_udaf(udaf: datafusion.user_defined.AggregateUDF) None¶
Register a user-defined aggregation function (UDAF) with the context.
- register_udf(udf: datafusion.user_defined.ScalarUDF) None¶
Register a user-defined function (UDF) with the context.
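A hedged sketch; it assumes the ScalarUDF is built with the datafusion.udf() factory, whose exact signature may differ between versions, so treat this as illustrative rather than canonical:
import pyarrow as pa
import pyarrow.compute as pc
from datafusion import udf  # assumed ScalarUDF factory; check your version
from datafusion.context import SessionContext

def double_it(arr: pa.Array) -> pa.Array:
    # Operates on a whole Arrow array at a time.
    return pc.multiply(arr, 2)

double_udf = udf(double_it, [pa.int64()], pa.int64(), "immutable")
ctx = SessionContext()
ctx.register_udf(double_udf)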
- register_udtf(func: datafusion.user_defined.TableFunction) None¶
Register a user-defined table function.
- register_udwf(udwf: datafusion.user_defined.WindowUDF) None¶
Register a user-defined window function (UDWF) with the context.
- register_view(name: str, df: datafusion.dataframe.DataFrame) None¶
Register a DataFrame as a view.
- Parameters:
name (str) – The name to register the view under.
df (DataFrame) – The DataFrame to be converted into a view and registered.
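Example sketch (names and data are illustrative):
from datafusion.context import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3]}, name="t")
ctx.register_view("v", df)
result = ctx.sql("SELECT * FROM v")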
- session_id() str¶
Return an id that uniquely identifies this SessionContext.
- sql(query: str, options: SQLOptions | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from SQL query text.
Note: This API implements DDL statements such as CREATE TABLE and CREATE VIEW and DML statements such as INSERT INTO with an in-memory default implementation. See sql_with_options().
- Parameters:
query – SQL query text.
options – If provided, the query will be validated against these options.
- Returns:
DataFrame representation of the SQL query.
- sql_with_options(query: str, options: SQLOptions) datafusion.dataframe.DataFrame¶
Create a DataFrame from SQL query text.
This function will first validate that the query is allowed by the provided options.
- Parameters:
query – SQL query text.
options – SQL options.
- Returns:
DataFrame representation of the SQL query.
- table(name: str) datafusion.dataframe.DataFrame¶
Retrieve a previously registered table by name.
- table_exist(name: str) bool¶
Return whether a table with the given name exists.
- ctx¶
- class datafusion.context.TableProviderExportable¶
Bases: Protocol
Type hint for object that has __datafusion_table_provider__ PyCapsule.
https://datafusion.apache.org/python/user-guide/io/table_provider.html
- __datafusion_table_provider__() object¶