datafusion.context¶
Session Context and its associated configuration.
Classes¶
ArrowArrayExportable | Type hint for object exporting Arrow C Array via Arrow PyCapsule Interface.
ArrowStreamExportable | Type hint for object exporting Arrow C Stream via Arrow PyCapsule Interface.
RuntimeEnvBuilder | Runtime configuration options.
SQLOptions | Options to be used when performing SQL queries.
SessionConfig | Session configuration options.
SessionContext | This is the main interface for executing queries and creating DataFrames.
TableProviderExportable | Type hint for object that has __datafusion_table_provider__ PyCapsule.
Module Contents¶
- class datafusion.context.ArrowArrayExportable¶
Bases: Protocol
Type hint for object exporting Arrow C Array via Arrow PyCapsule Interface.
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
- __arrow_c_array__(requested_schema: object | None = None) tuple[object, object]¶
- class datafusion.context.ArrowStreamExportable¶
Bases: Protocol
Type hint for object exporting Arrow C Stream via Arrow PyCapsule Interface.
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
- __arrow_c_stream__(requested_schema: object | None = None) object¶
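Because both classes are protocols, an object satisfies them structurally: it only needs to expose the dunder method, not inherit from anything. The sketch below illustrates that idea with a local stand-in protocol (`StreamExportable`, `FakeStream`, and `is_stream_exportable` are hypothetical names made up for this example, not part of datafusion), using only the standard library.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class StreamExportable(Protocol):
    """Local stand-in mirroring the shape of ArrowStreamExportable."""

    def __arrow_c_stream__(self, requested_schema: Optional[object] = None) -> object: ...


class FakeStream:
    """Hypothetical class that exposes the method but produces no real capsule."""

    def __arrow_c_stream__(self, requested_schema: Optional[object] = None) -> object:
        raise NotImplementedError("illustration only")


class NotAStream:
    """Hypothetical class without the Arrow stream method."""


def is_stream_exportable(obj: object) -> bool:
    # runtime_checkable protocols only check that the method is present,
    # not that it returns a valid PyCapsule.
    return isinstance(obj, StreamExportable)


print(is_stream_exportable(FakeStream()))
print(is_stream_exportable(NotAStream()))
```

A presence check like this says nothing about whether the returned capsule is valid; that is only verified when the data is actually consumed.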
- class datafusion.context.RuntimeEnvBuilder¶
Runtime configuration options.
Create a new RuntimeEnvBuilder with default values.
- with_disk_manager_disabled() RuntimeEnvBuilder¶
Disable the disk manager; attempts to create temporary files will raise an error.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
- with_disk_manager_os() RuntimeEnvBuilder¶
Use the operating system’s temporary directory for disk manager.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
- with_disk_manager_specified(*paths: str | pathlib.Path) RuntimeEnvBuilder¶
Use the specified paths for the disk manager’s temporary files.
- Parameters:
paths – Paths to use for the disk manager’s temporary files.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
- with_fair_spill_pool(size: int) RuntimeEnvBuilder¶
Use a fair spill pool with the specified size.
This pool works best when you know beforehand the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even when there was sufficient memory (reserved for other operators) to avoid doing so:
┌───────────────────────z──────────────────────z───────────────┐
│                       z                      z               │
│                       z                      z               │
│       Spillable       z     Unspillable      z     Free      │
│        Memory         z        Memory        z    Memory     │
│                       z                      z               │
│                       z                      z               │
└───────────────────────z──────────────────────z───────────────┘
- Parameters:
size – Size of the memory pool in bytes.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
Examples
>>> config = dfn.RuntimeEnvBuilder().with_fair_spill_pool(1024)
- with_greedy_memory_pool(size: int) RuntimeEnvBuilder¶
Use a greedy memory pool with the specified size.
This pool works well for queries that do not need to spill or have a single spillable operator. See with_fair_spill_pool() if there are multiple spillable operators that will all spill.
- Parameters:
size – Size of the memory pool in bytes.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
Examples
>>> config = dfn.RuntimeEnvBuilder().with_greedy_memory_pool(1024)
- with_temp_file_path(path: str | pathlib.Path) RuntimeEnvBuilder¶
Use the specified path to create any needed temporary files.
- Parameters:
path – Path to use for temporary files.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
Examples
>>> config = dfn.RuntimeEnvBuilder().with_temp_file_path("/tmp")
- with_unbounded_memory_pool() RuntimeEnvBuilder¶
Use an unbounded memory pool.
- Returns:
A new RuntimeEnvBuilder object with the updated setting.
- config_internal¶
- class datafusion.context.SQLOptions¶
Options to be used when performing SQL queries.
Create a new SQLOptions with default values.
The default values are:
- DDL commands are allowed
- DML commands are allowed
- Statements are allowed
- with_allow_ddl(allow: bool = True) SQLOptions¶
Should DDL (Data Definition Language) commands be run?
Examples of DDL commands include CREATE TABLE and DROP TABLE.
- Parameters:
allow – Allow DDL commands to be run.
- Returns:
A new SQLOptions object with the updated setting.
Examples
>>> options = dfn.SQLOptions().with_allow_ddl(True)
- with_allow_dml(allow: bool = True) SQLOptions¶
Should DML (Data Manipulation Language) commands be run?
Examples of DML commands include INSERT INTO and DELETE.
- Parameters:
allow – Allow DML commands to be run.
- Returns:
A new SQLOptions object with the updated setting.
Examples
>>> options = dfn.SQLOptions().with_allow_dml(True)
- with_allow_statements(allow: bool = True) SQLOptions¶
Should statements such as SET VARIABLE and BEGIN TRANSACTION be run?
- Parameters:
allow – Allow statements to be run.
- Returns:
A new SQLOptions object with the updated setting.
Examples
>>> options = dfn.SQLOptions().with_allow_statements(True)
- options_internal¶
- class datafusion.context.SessionConfig(config_options: dict[str, str] | None = None)¶
Session configuration options.
Create a new SessionConfig with the given configuration options.
- Parameters:
config_options – Configuration options.
- set(key: str, value: str) SessionConfig¶
Set a configuration option.
- Parameters:
key – Option key.
value – Option value.
- Returns:
A new SessionConfig object with the updated setting.
- with_batch_size(batch_size: int) SessionConfig¶
Customize batch size.
- Parameters:
batch_size – Batch size.
- Returns:
A new SessionConfig object with the updated setting.
- with_create_default_catalog_and_schema(enabled: bool = True) SessionConfig¶
Control if the default catalog and schema will be automatically created.
- Parameters:
enabled – Whether the default catalog and schema will be automatically created.
- Returns:
A new SessionConfig object with the updated setting.
- with_default_catalog_and_schema(catalog: str, schema: str) SessionConfig¶
Select a name for the default catalog and schema.
- Parameters:
catalog – Catalog name.
schema – Schema name.
- Returns:
A new SessionConfig object with the updated setting.
- with_extension(extension: Any) SessionConfig¶
Create a new configuration using an extension.
- Parameters:
extension – A custom configuration extension object. These are shared from another DataFusion extension library.
- Returns:
A new SessionConfig object with the updated setting.
- with_information_schema(enabled: bool = True) SessionConfig¶
Enable or disable the inclusion of information_schema virtual tables.
- Parameters:
enabled – Whether to include information_schema virtual tables.
- Returns:
A new SessionConfig object with the updated setting.
- with_parquet_pruning(enabled: bool = True) SessionConfig¶
Enable or disable the use of pruning predicate for parquet readers.
Pruning predicates will enable the reader to skip row groups.
- Parameters:
enabled – Whether to use pruning predicate for parquet readers.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_aggregations(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for aggregations.
Enabling this improves parallelism.
- Parameters:
enabled – Whether to use repartitioning for aggregations.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_file_min_size(size: int) SessionConfig¶
Set minimum file range size for repartitioning scans.
- Parameters:
size – Minimum file range size.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_file_scans(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for file scans.
- Parameters:
enabled – Whether to use repartitioning for file scans.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_joins(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for joins to improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for joins.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_sorts(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for sorts.
This may improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for sorts.
- Returns:
A new SessionConfig object with the updated setting.
- with_repartition_windows(enabled: bool = True) SessionConfig¶
Enable or disable the use of repartitioning for window functions.
This may improve parallelism.
- Parameters:
enabled – Whether to use repartitioning for window functions.
- Returns:
A new SessionConfig object with the updated setting.
- with_target_partitions(target_partitions: int) SessionConfig¶
Customize the number of target partitions for query execution.
Increasing partitions can increase concurrency.
- Parameters:
target_partitions – Number of target partitions.
- Returns:
A new SessionConfig object with the updated setting.
- config_internal¶
- class datafusion.context.SessionContext(config: SessionConfig | None = None, runtime: RuntimeEnvBuilder | None = None)¶
This is the main interface for executing queries and creating DataFrames.
See Concepts in the online documentation for more information.
Main interface for executing queries with DataFusion.
Maintains the state of the connection between a user and an instance of the DataFusion engine.
- Parameters:
config – Session configuration options.
runtime – Runtime configuration options.
Example usage:
The following example demonstrates how to use the context to execute a query against a CSV data source using the DataFrame API:
from datafusion import SessionContext
ctx = SessionContext()
df = ctx.read_csv("data.csv")
- __datafusion_logical_extension_codec__() Any¶
Access the PyCapsule FFI_LogicalExtensionCodec.
- __datafusion_task_context_provider__() Any¶
Access the PyCapsule FFI_TaskContextProvider.
- __repr__() str¶
Print a string representation of the Session Context.
- static _convert_file_sort_order(file_sort_order: collections.abc.Sequence[collections.abc.Sequence[datafusion.expr.SortKey]] | None) list[list[datafusion._internal.expr.SortExpr]] | None¶
Convert nested SortKey sequences into raw sort expressions.
Each SortKey can be a column name string, an Expr, or a SortExpr and will be converted using datafusion.expr.sort_list_to_raw_sort_list().
- static _convert_table_partition_cols(table_partition_cols: list[tuple[str, str | pyarrow.DataType]]) list[tuple[str, pyarrow.DataType]]¶
- catalog(name: str = 'datafusion') datafusion.catalog.Catalog¶
Retrieve a catalog by name.
- catalog_names() set[str]¶
Return the names of the catalogs in this context.
- create_dataframe(partitions: list[list[pyarrow.RecordBatch]], name: str | None = None, schema: pyarrow.Schema | None = None) datafusion.dataframe.DataFrame¶
Create and return a dataframe using the provided partitions.
- Parameters:
partitions – pa.RecordBatch partitions to register.
name – Resultant dataframe name.
schema – Schema for the partitions.
- Returns:
DataFrame representation of the provided partitions.
- create_dataframe_from_logical_plan(plan: datafusion.plan.LogicalPlan) datafusion.dataframe.DataFrame¶
Create a DataFrame from an existing plan.
- Parameters:
plan – Logical plan.
- Returns:
DataFrame representation of the logical plan.
- deregister_object_store(schema: str, host: str | None = None) None¶
Remove an object store from the session.
- Parameters:
schema – The data source schema (e.g. "s3://").
host – URL for the host (e.g. bucket name).
- deregister_table(name: str) None¶
Remove a table from the session.
- deregister_udaf(name: str) None¶
Remove a user-defined aggregate function from the session.
- Parameters:
name – Name of the UDAF to deregister.
- deregister_udf(name: str) None¶
Remove a user-defined scalar function from the session.
- Parameters:
name – Name of the UDF to deregister.
- deregister_udtf(name: str) None¶
Remove a user-defined table function from the session.
- Parameters:
name – Name of the UDTF to deregister.
- deregister_udwf(name: str) None¶
Remove a user-defined window function from the session.
- Parameters:
name – Name of the UDWF to deregister.
- empty_table() datafusion.dataframe.DataFrame¶
Create an empty DataFrame.
- enable_ident_normalization() bool¶
Return whether identifier normalization (lowercasing) is enabled.
Examples
>>> ctx = SessionContext()
>>> ctx.enable_ident_normalization()
True
- enable_url_table() SessionContext¶
Control if local files can be queried as tables.
- Returns:
A new SessionContext object with url table enabled.
- execute(plan: datafusion.plan.ExecutionPlan, partitions: int) datafusion.record_batch.RecordBatchStream¶
Execute the plan and return the results.
- execute_logical_plan(plan: datafusion.plan.LogicalPlan) datafusion.dataframe.DataFrame¶
Execute a LogicalPlan and return a DataFrame.
- Parameters:
plan – Logical plan to execute.
- Returns:
DataFrame resulting from the execution.
Examples
>>> ctx = SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> plan = df.logical_plan()
>>> df2 = ctx.execute_logical_plan(plan)
>>> df2.collect()[0].column(0)
<pyarrow.lib.Int64Array object at ...>
[
  1,
  2,
  3
]
- from_arrow(data: ArrowStreamExportable | ArrowArrayExportable, name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from an Arrow source.
The Arrow data source can be any object that implements either __arrow_c_stream__ or __arrow_c_array__. For the latter, it must return a struct array.
Arrow data can come from Polars, pandas, PyArrow, etc.
- Parameters:
data – Arrow data source.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Arrow table.
- from_pandas(data: pandas.DataFrame, name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from a Pandas DataFrame.
- Parameters:
data – Pandas DataFrame.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Pandas DataFrame.
- from_polars(data: polars.DataFrame, name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from a Polars DataFrame.
- Parameters:
data – Polars DataFrame.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the Polars DataFrame.
- from_pydict(data: dict[str, list[Any]], name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from a dictionary.
- Parameters:
data – Dictionary of lists.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the dictionary of lists.
- from_pylist(data: list[dict[str, Any]], name: str | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame from a list.
- Parameters:
data – List of dictionaries.
name – Name of the DataFrame.
- Returns:
DataFrame representation of the list of dictionaries.
- classmethod global_ctx() SessionContext¶
Retrieve the global context as a SessionContext wrapper.
- Returns:
A SessionContext object that wraps the global SessionContextInternal.
- parse_sql_expr(sql: str, schema: datafusion.common.DFSchema) datafusion.expr.Expr¶
Parse a SQL expression string into a logical expression.
- Parameters:
sql – SQL expression string.
schema – Schema to use for resolving column references.
- Returns:
Parsed expression.
Examples
>>> from datafusion.common import DFSchema
>>> ctx = SessionContext()
>>> schema = DFSchema.empty()
>>> ctx.parse_sql_expr("1 + 2", schema=schema)
Expr(Int64(1) + Int64(2))
- read_arrow(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_extension: str = '.arrow', file_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None) datafusion.dataframe.DataFrame¶
Create a DataFrame for reading an Arrow IPC data source.
- Parameters:
path – Path to the Arrow IPC file.
schema – The data source schema.
file_extension – File extension to select.
file_partition_cols – Partition columns.
- Returns:
DataFrame representation of the read Arrow IPC file.
Examples
>>> import tempfile, os
>>> ctx = dfn.SessionContext()
>>> table = pa.table({"a": [1, 2, 3]})
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = os.path.join(tmpdir, "data.arrow")
...     with pa.ipc.new_file(path, table.schema) as writer:
...         writer.write_table(table)
...     df = ctx.read_arrow(path)
...     df.collect()[0].column(0)
<pyarrow.lib.Int64Array object at ...>
[
  1,
  2,
  3
]
Provide an explicit schema to override schema inference:
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = os.path.join(tmpdir, "data.arrow")
...     with pa.ipc.new_file(path, table.schema) as writer:
...         writer.write_table(table)
...     df = ctx.read_arrow(path, schema=pa.schema([("a", pa.int64())]))
...     df.collect()[0].column(0)
<pyarrow.lib.Int64Array object at ...>
[
  1,
  2,
  3
]
Use file_extension to read files with a non-default extension:
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = os.path.join(tmpdir, "data.ipc")
...     with pa.ipc.new_file(path, table.schema) as writer:
...         writer.write_table(table)
...     df = ctx.read_arrow(path, file_extension=".ipc")
...     df.collect()[0].column(0)
<pyarrow.lib.Int64Array object at ...>
[
  1,
  2,
  3
]
- read_avro(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_extension: str = '.avro') datafusion.dataframe.DataFrame¶
Create a DataFrame for reading an Avro data source.
- Parameters:
path – Path to the Avro file.
schema – The data source schema.
file_partition_cols – Partition columns.
file_extension – File extension to select.
- Returns:
DataFrame representation of the read Avro file
- read_csv(path: str | pathlib.Path | list[str] | list[pathlib.Path], schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = DEFAULT_MAX_INFER_SCHEMA, file_extension: str = '.csv', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_compression_type: str | None = None, options: datafusion.options.CsvReadOptions | None = None) datafusion.dataframe.DataFrame¶
Read a CSV data source.
- Parameters:
path – Path to the CSV file.
schema – An optional schema representing the CSV files. If None, the CSV reader will try to infer it based on data in the file.
has_header – Whether the CSV file has a header. If schema inference is run on a file with no headers, default column names are created.
delimiter – An optional column delimiter.
schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
options – Set advanced options for CSV reading. This cannot be combined with any of the other options in this method.
- Returns:
DataFrame representation of the read CSV files
- read_empty() datafusion.dataframe.DataFrame¶
Create an empty DataFrame with no columns or rows.
See also
This is an alias for empty_table().
- read_json(path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_compression_type: str | None = None) datafusion.dataframe.DataFrame¶
Read a line-delimited JSON data source.
- Parameters:
path – Path to the JSON file.
schema – The data source schema.
schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
- Returns:
DataFrame representation of the read JSON files.
- read_parquet(path: str | pathlib.Path, table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: collections.abc.Sequence[collections.abc.Sequence[datafusion.expr.SortKey]] | None = None) datafusion.dataframe.DataFrame¶
Read a Parquet source into a DataFrame.
- Parameters:
path – Path to the Parquet file.
table_partition_cols – Partition columns.
parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.
file_extension – File extension; only files with this extension are selected for data input.
skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.
schema – An optional schema representing the parquet files. If None, the parquet reader will try to infer it based on data in the file.
file_sort_order – Sort order for the file. Each sort key can be specified as a column name (str), an expression (Expr), or a SortExpr.
- Returns:
DataFrame representation of the read Parquet files
- read_table(table: datafusion.catalog.Table | TableProviderExportable | datafusion.dataframe.DataFrame | pyarrow.dataset.Dataset) datafusion.dataframe.DataFrame¶
Create a DataFrame from a table.
- refresh_catalogs() None¶
Refresh catalog metadata.
Examples
>>> ctx = SessionContext()
>>> ctx.refresh_catalogs()
- register_arrow(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_extension: str = '.arrow', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None) None¶
Register an Arrow IPC file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the Arrow IPC file.
schema – The data source schema.
file_extension – File extension to select.
table_partition_cols – Partition columns.
Examples
>>> import tempfile, os
>>> ctx = dfn.SessionContext()
>>> table = pa.table({"x": [10, 20, 30]})
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = os.path.join(tmpdir, "data.arrow")
...     with pa.ipc.new_file(path, table.schema) as writer:
...         writer.write_table(table)
...     ctx.register_arrow("arrow_tbl", path)
...     ctx.sql("SELECT * FROM arrow_tbl").collect()[0].column(0)
<pyarrow.lib.Int64Array object at ...>
[
  10,
  20,
  30
]
Provide an explicit schema to override schema inference:
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = os.path.join(tmpdir, "data.arrow")
...     with pa.ipc.new_file(path, table.schema) as writer:
...         writer.write_table(table)
...     ctx.register_arrow(
...         "arrow_schema",
...         path,
...         schema=pa.schema([("x", pa.int64())]),
...     )
...     ctx.sql("SELECT * FROM arrow_schema").collect()[0].column(0)
<pyarrow.lib.Int64Array object at ...>
[
  10,
  20,
  30
]
Use file_extension to read files with a non-default extension:
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = os.path.join(tmpdir, "data.ipc")
...     with pa.ipc.new_file(path, table.schema) as writer:
...         writer.write_table(table)
...     ctx.register_arrow(
...         "arrow_ipc", path, file_extension=".ipc"
...     )
...     ctx.sql("SELECT * FROM arrow_ipc").collect()[0].column(0)
<pyarrow.lib.Int64Array object at ...>
[
  10,
  20,
  30
]
- register_avro(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, file_extension: str = '.avro', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None) None¶
Register an Avro file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the Avro file.
schema – The data source schema.
file_extension – File extension to select.
table_partition_cols – Partition columns.
- register_batch(name: str, batch: pyarrow.RecordBatch) None¶
Register a single pa.RecordBatch as a table.
- Parameters:
name – Name of the resultant table.
batch – Record batch to register as a table.
Examples
>>> ctx = dfn.SessionContext()
>>> batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
>>> ctx.register_batch("batch_tbl", batch)
>>> ctx.sql("SELECT * FROM batch_tbl").collect()[0].column(0)
<pyarrow.lib.Int64Array object at ...>
[
  1,
  2,
  3
]
- register_catalog_provider(name: str, provider: datafusion.catalog.CatalogProviderExportable | datafusion.catalog.CatalogProvider | datafusion.catalog.Catalog) None¶
Register a catalog provider.
- register_catalog_provider_list(provider: datafusion.catalog.CatalogProviderListExportable | datafusion.catalog.CatalogProviderList | datafusion.catalog.CatalogList) None¶
Register a catalog provider list.
- register_csv(name: str, path: str | pathlib.Path | list[str | pathlib.Path], schema: pyarrow.Schema | None = None, has_header: bool = True, delimiter: str = ',', schema_infer_max_records: int = DEFAULT_MAX_INFER_SCHEMA, file_extension: str = '.csv', file_compression_type: str | None = None, options: datafusion.options.CsvReadOptions | None = None) None¶
Register a CSV file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the CSV file. It also accepts a list of paths.
schema – An optional schema representing the CSV file. If None, the CSV reader will try to infer it based on data in the file.
has_header – Whether the CSV file has a header. If schema inference is run on a file with no headers, default column names are created.
delimiter – An optional column delimiter.
schema_infer_max_records – Maximum number of rows to read from CSV files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
file_compression_type – File compression type.
options – Set advanced options for CSV reading. This cannot be combined with any of the other options in this method.
- register_dataset(name: str, dataset: pyarrow.dataset.Dataset) None¶
Register a pa.dataset.Dataset as a table.
- Parameters:
name – Name of the table to register.
dataset – PyArrow dataset.
- register_json(name: str, path: str | pathlib.Path, schema: pyarrow.Schema | None = None, schema_infer_max_records: int = 1000, file_extension: str = '.json', table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_compression_type: str | None = None) None¶
Register a JSON file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the JSON file.
schema – The data source schema.
schema_infer_max_records – Maximum number of rows to read from JSON files for schema inference if needed.
file_extension – File extension; only files with this extension are selected for data input.
table_partition_cols – Partition columns.
file_compression_type – File compression type.
- register_listing_table(name: str, path: str | pathlib.Path, table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, file_extension: str = '.parquet', schema: pyarrow.Schema | None = None, file_sort_order: collections.abc.Sequence[collections.abc.Sequence[datafusion.expr.SortKey]] | None = None) None¶
Register multiple files as a single table.
Registers a Table that can assemble multiple files from locations in an ObjectStore instance.
- Parameters:
name – Name of the resultant table.
path – Path to the file to register.
table_partition_cols – Partition columns.
file_extension – File extension of the provided table.
schema – The data source schema.
file_sort_order – Sort order for the file. Each sort key can be specified as a column name (str), an expression (Expr), or a SortExpr.
- register_object_store(schema: str, store: Any, host: str | None = None) None¶
Add a new object store into the session.
- Parameters:
schema – The data source schema.
store – The ObjectStore to register.
host – URL for the host.
- register_parquet(name: str, path: str | pathlib.Path, table_partition_cols: list[tuple[str, str | pyarrow.DataType]] | None = None, parquet_pruning: bool = True, file_extension: str = '.parquet', skip_metadata: bool = True, schema: pyarrow.Schema | None = None, file_sort_order: collections.abc.Sequence[collections.abc.Sequence[datafusion.expr.SortKey]] | None = None) None¶
Register a Parquet file as a table.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the table to register.
path – Path to the Parquet file.
table_partition_cols – Partition columns.
parquet_pruning – Whether the parquet reader should use the predicate to prune row groups.
file_extension – File extension; only files with this extension are selected for data input.
skip_metadata – Whether the parquet reader should skip any metadata that may be in the file schema. This can help avoid schema conflicts due to metadata.
schema – The data source schema.
file_sort_order – Sort order for the file. Each sort key can be specified as a column name (str), an expression (Expr), or a SortExpr.
- register_record_batches(name: str, partitions: list[list[pyarrow.RecordBatch]]) None¶
Register record batches as a table.
This function will convert the provided partitions into a table and register it into the session using the given name.
- Parameters:
name – Name of the resultant table.
partitions – Record batches to register as a table.
- register_table(name: str, table: datafusion.catalog.Table | TableProviderExportable | datafusion.dataframe.DataFrame | pyarrow.dataset.Dataset) None¶
Register a Table with this context.
The registered table can be referenced from SQL statements executed against this context.
- Parameters:
name – Name of the resultant table.
table – Any object that can be converted into a Table.
- register_table_factory(format: str, factory: datafusion.catalog.TableProviderFactory | datafusion.catalog.TableProviderFactoryExportable) None¶
Register a TableProviderFactoryExportable.
The registered factory can be referenced from SQL DDL statements executed against this context.
- Parameters:
format – The value to be used in STORED AS ${format} clause.
factory – A PyCapsule that implements TableProviderFactoryExportable.
- register_table_provider(name: str, provider: datafusion.catalog.Table | TableProviderExportable | datafusion.dataframe.DataFrame | pyarrow.dataset.Dataset) None¶
Register a table provider.
Deprecated: use register_table() instead.
- register_udaf(udaf: datafusion.user_defined.AggregateUDF) None¶
Register a user-defined aggregation function (UDAF) with the context.
- register_udf(udf: datafusion.user_defined.ScalarUDF) None¶
Register a user-defined function (UDF) with the context.
- register_udtf(func: datafusion.user_defined.TableFunction) None¶
Register a user-defined table function (UDTF) with the context.
- register_udwf(udwf: datafusion.user_defined.WindowUDF) None¶
Register a user-defined window function (UDWF) with the context.
- register_view(name: str, df: datafusion.dataframe.DataFrame) None¶
Register a DataFrame as a view.
- Parameters:
name (str) – The name to register the view under.
df (DataFrame) – The DataFrame to be converted into a view and registered.
- remove_optimizer_rule(name: str) bool¶
Remove an optimizer rule by name.
- Parameters:
name – Name of the optimizer rule to remove.
- Returns:
True if a rule with the given name was found and removed.
Examples
>>> ctx = SessionContext()
>>> ctx.remove_optimizer_rule("nonexistent_rule")
False
- session_id() str¶
Return an id that uniquely identifies this SessionContext.
- session_start_time() str¶
Return the session start time as an RFC 3339 formatted string.
Examples
>>> ctx = SessionContext()
>>> ctx.session_start_time()
'2026-01-01T12:34:56.123456789+00:00'
- sql(query: str, options: SQLOptions | None = None, param_values: dict[str, Any] | None = None, **named_params: Any) datafusion.dataframe.DataFrame¶
Create a DataFrame from SQL query text.
See the online documentation for a description of how to perform parameterized substitution via either the param_values option or passing in named_params.
Note: This API implements DDL statements such as CREATE TABLE and CREATE VIEW and DML statements such as INSERT INTO with an in-memory default implementation. See sql_with_options().
- Parameters:
query – SQL query text.
options – If provided, the query will be validated against these options.
param_values – Provides substitution of scalar values in the query after parsing.
named_params – Provides string or DataFrame substitution in the query string.
- Returns:
DataFrame representation of the SQL query.
- sql_with_options(query: str, options: SQLOptions, param_values: dict[str, Any] | None = None, **named_params: Any) datafusion.dataframe.DataFrame¶
Create a DataFrame from SQL query text.
This function will first validate that the query is allowed by the provided options.
- Parameters:
query – SQL query text.
options – SQL options.
param_values – Provides substitution of scalar values in the query after parsing.
named_params – Provides string or DataFrame substitution in the query string.
- Returns:
DataFrame representation of the SQL query.
- table(name: str) datafusion.dataframe.DataFrame¶
Retrieve a previously registered table by name.
- table_exist(name: str) bool¶
Return whether a table with the given name exists.
- table_provider(name: str) datafusion.catalog.Table¶
Return the Table for the given table name.
- Parameters:
name – Name of the table.
- Returns:
The table provider.
- Raises:
KeyError – If the table is not found.
Examples
>>> import pyarrow as pa
>>> ctx = SessionContext()
>>> batch = pa.RecordBatch.from_pydict({"x": [1, 2]})
>>> ctx.register_record_batches("my_table", [[batch]])
>>> tbl = ctx.table_provider("my_table")
>>> tbl.schema
x: int64
- with_logical_extension_codec(codec: Any) SessionContext¶
Create a new session context with specified codec.
This only supports codecs that have been implemented using the FFI interface.
- ctx¶
- class datafusion.context.TableProviderExportable¶
Bases: Protocol
Type hint for object that has __datafusion_table_provider__ PyCapsule.
https://datafusion.apache.org/python/user-guide/io/table_provider.html
- __datafusion_table_provider__(session: Any) object¶