Data Sources¶
DataFusion provides a wide variety of ways to get data into a DataFrame to perform operations.
Local file¶
DataFusion has the ability to read from a variety of popular file formats, such as Parquet, CSV, JSON, and AVRO.
In [1]: from datafusion import SessionContext
In [2]: ctx = SessionContext()
In [3]: df = ctx.read_csv("pokemon.csv")
In [4]: df.show()
DataFrame()
+----+---------------------------+--------+--------+-------+----+--------+---------+---------+---------+-------+------------+-----------+
| #  | Name                      | Type 1 | Type 2 | Total | HP | Attack | Defense | Sp. Atk | Sp. Def | Speed | Generation | Legendary |
+----+---------------------------+--------+--------+-------+----+--------+---------+---------+---------+-------+------------+-----------+
| 1  | Bulbasaur                 | Grass  | Poison | 318   | 45 | 49     | 49      | 65      | 65      | 45    | 1          | false     |
| 2  | Ivysaur                   | Grass  | Poison | 405   | 60 | 62     | 63      | 80      | 80      | 60    | 1          | false     |
| 3  | Venusaur                  | Grass  | Poison | 525   | 80 | 82     | 83      | 100     | 100     | 80    | 1          | false     |
| 3  | VenusaurMega Venusaur     | Grass  | Poison | 625   | 80 | 100    | 123     | 122     | 120     | 80    | 1          | false     |
| 4  | Charmander                | Fire   |        | 309   | 39 | 52     | 43      | 60      | 50      | 65    | 1          | false     |
| 5  | Charmeleon                | Fire   |        | 405   | 58 | 64     | 58      | 80      | 65      | 80    | 1          | false     |
| 6  | Charizard                 | Fire   | Flying | 534   | 78 | 84     | 78      | 109     | 85      | 100   | 1          | false     |
| 6  | CharizardMega Charizard X | Fire   | Dragon | 634   | 78 | 130    | 111     | 130     | 85      | 100   | 1          | false     |
| 6  | CharizardMega Charizard Y | Fire   | Flying | 634   | 78 | 104    | 78      | 159     | 115     | 100   | 1          | false     |
| 7  | Squirtle                  | Water  |        | 314   | 44 | 48     | 65      | 50      | 64      | 43    | 1          | false     |
| 8  | Wartortle                 | Water  |        | 405   | 59 | 63     | 80      | 65      | 80      | 58    | 1          | false     |
| 9  | Blastoise                 | Water  |        | 530   | 79 | 83     | 100     | 85      | 105     | 78    | 1          | false     |
| 9  | BlastoiseMega Blastoise   | Water  |        | 630   | 79 | 103    | 120     | 135     | 115     | 78    | 1          | false     |
| 10 | Caterpie                  | Bug    |        | 195   | 45 | 30     | 35      | 20      | 20      | 45    | 1          | false     |
| 11 | Metapod                   | Bug    |        | 205   | 50 | 20     | 55      | 25      | 25      | 30    | 1          | false     |
| 12 | Butterfree                | Bug    | Flying | 395   | 60 | 45     | 50      | 90      | 80      | 70    | 1          | false     |
| 13 | Weedle                    | Bug    | Poison | 195   | 40 | 35     | 30      | 20      | 20      | 50    | 1          | false     |
| 14 | Kakuna                    | Bug    | Poison | 205   | 45 | 25     | 50      | 25      | 25      | 35    | 1          | false     |
| 15 | Beedrill                  | Bug    | Poison | 395   | 65 | 90     | 40      | 45      | 80      | 75    | 1          | false     |
| 15 | BeedrillMega Beedrill     | Bug    | Poison | 495   | 65 | 150    | 40      | 15      | 80      | 145   | 1          | false     |
+----+---------------------------+--------+--------+-------+----+--------+---------+---------+---------+-------+------------+-----------+
Create in-memory¶
Sometimes it can be convenient to create a small DataFrame from a Python list or dictionary object.
To do this in DataFusion, you can use one of the three functions
from_pydict(),
from_pylist(), or
create_dataframe().
As their names suggest, from_pydict and from_pylist will create DataFrames from Python
dictionary and list objects, respectively. create_dataframe assumes you will pass in a list
of list of PyArrow Record Batches.
The following three examples all will create identical DataFrames:
In [5]: import pyarrow as pa
In [6]: ctx.from_pylist([
   ...:     { "a": 1, "b": 10.0, "c": "alpha" },
   ...:     { "a": 2, "b": 20.0, "c": "beta" },
   ...:     { "a": 3, "b": 30.0, "c": "gamma" },
   ...: ]).show()
   ...: 
DataFrame()
+---+------+-------+
| a | b    | c     |
+---+------+-------+
| 1 | 10.0 | alpha |
| 2 | 20.0 | beta  |
| 3 | 30.0 | gamma |
+---+------+-------+
In [7]: ctx.from_pydict({
   ...:     "a": [1, 2, 3],
   ...:     "b": [10.0, 20.0, 30.0],
   ...:     "c": ["alpha", "beta", "gamma"],
   ...: }).show()
   ...: 
DataFrame()
+---+------+-------+
| a | b    | c     |
+---+------+-------+
| 1 | 10.0 | alpha |
| 2 | 20.0 | beta  |
| 3 | 30.0 | gamma |
+---+------+-------+
In [8]: batch = pa.RecordBatch.from_arrays(
   ...:     [
   ...:         pa.array([1, 2, 3]),
   ...:         pa.array([10.0, 20.0, 30.0]),
   ...:         pa.array(["alpha", "beta", "gamma"]),
   ...:     ],
   ...:     names=["a", "b", "c"],
   ...: )
   ...: 
In [9]: ctx.create_dataframe([[batch]]).show()
DataFrame()
+---+------+-------+
| a | b    | c     |
+---+------+-------+
| 1 | 10.0 | alpha |
| 2 | 20.0 | beta  |
| 3 | 30.0 | gamma |
+---+------+-------+
Object Store¶
DataFusion has support for multiple storage options in addition to local files. The example below requires an appropriate S3 account with access credentials.
Supported Object Stores are
from datafusion.object_store import AmazonS3
region = "us-east-1"
bucket_name = "yellow-trips"
s3 = AmazonS3(
    bucket_name=bucket_name,
    region=region,
    access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
)
path = f"s3://{bucket_name}/"
ctx.register_object_store("s3://", s3, None)
ctx.register_parquet("trips", path)
ctx.table("trips").show()
Other DataFrame Libraries¶
DataFusion can import DataFrames directly from other libraries, such as
Polars and Pandas.
Since DataFusion version 42.0.0, any DataFrame library that supports the Arrow FFI PyCapsule
interface can be imported to DataFusion using the
from_arrow() function. Older versions of Polars may
not support the arrow interface. In those cases, you can still import via the
from_polars() function.
import pandas as pd
data = { "a": [1, 2, 3], "b": [10.0, 20.0, 30.0], "c": ["alpha", "beta", "gamma"] }
pandas_df = pd.DataFrame(data)
datafusion_df = ctx.from_arrow(pandas_df)
datafusion_df.show()
import polars as pl
polars_df = pl.DataFrame(data)
datafusion_df = ctx.from_arrow(polars_df)
datafusion_df.show()
Delta Lake¶
DataFusion 43.0.0 and later support the ability to register table providers from sources such as Delta Lake. This will require a recent version of deltalake to provide the required interfaces.
from deltalake import DeltaTable
delta_table = DeltaTable("path_to_table")
ctx.register_table("my_delta_table", delta_table)
df = ctx.table("my_delta_table")
df.show()
On older versions of deltalake (prior to 0.22) you can use the
Arrow DataSet
interface to import to DataFusion, but this does not support features such as filter push down
which can lead to a significant performance difference.
from deltalake import DeltaTable
delta_table = DeltaTable("path_to_table")
ctx.register_dataset("my_delta_table", delta_table.to_pyarrow_dataset())
df = ctx.table("my_delta_table")
df.show()
Apache Iceberg¶
DataFusion 45.0.0 and later support the ability to register Apache Iceberg tables as table providers through the Custom Table Provider interface.
This requires either the pyiceberg library (>=0.10.0) or the pyiceberg-core library (>=0.5.0).
- The - pyiceberg-corelibrary exposes Iceberg Rust’s implementation of the Custom Table Provider interface as python bindings.
- The - pyiceberglibrary utilizes the- pyiceberg-corepython bindings under the hood and provides a native way for Python users to interact with the DataFusion.
from datafusion import SessionContext
from pyiceberg.catalog import load_catalog
import pyarrow as pa
# Load catalog and create/load a table
catalog = load_catalog("catalog", type="in-memory")
catalog.create_namespace_if_not_exists("default")
# Create some sample data
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
iceberg_table = catalog.create_table("default.test", schema=data.schema)
iceberg_table.append(data)
# Register the table with DataFusion
ctx = SessionContext()
ctx.register_table_provider("test", iceberg_table)
# Query the table using DataFusion
ctx.table("test").show()
Note that the Datafusion integration rely on features from the Iceberg Rust implementation instead of the PyIceberg implementation. Features that are available in PyIceberg but not yet in Iceberg Rust will not be available when using DataFusion.
Custom Table Provider¶
You can implement a custom Data Provider in Rust and expose it to DataFusion through the the interface as describe in the Custom Table Provider section. This is an advanced topic, but a user example is provided in the DataFusion repository.
Catalog¶
A common technique for organizing tables is using a three level hierarchical approach. DataFusion
supports this form of organizing using the Catalog,
Schema, and Table. By default,
a SessionContext comes with a single Catalog and a single Schema
with the names datafusion and default, respectively.
The default implementation uses an in-memory approach to the catalog and schema. We have support for adding additional in-memory catalogs and schemas. This can be done like in the following example:
from datafusion.catalog import Catalog, Schema
my_catalog = Catalog.memory_catalog()
my_schema = Schema.memory_schema()
my_catalog.register_schema("my_schema_name", my_schema)
ctx.register_catalog("my_catalog_name", my_catalog)
You could then register tables in my_schema and access them either through the DataFrame
API or via sql commands such as "SELECT * from my_catalog_name.my_schema_name.my_table".
User Defined Catalog and Schema¶
If the in-memory catalogs are insufficient for your uses, there are two approaches you can take to implementing a custom catalog and/or schema. In the below discussion, we describe how to implement these for a Catalog, but the approach to implementing for a Schema is nearly identical.
DataFusion supports Catalogs written in either Rust or Python. If you write a Catalog in Rust, you will need to export it as a Python library via PyO3. There is a complete example of a catalog implemented this way in the examples folder of our repository. Writing catalog providers in Rust provides typically can lead to significant performance improvements over the Python based approach.
To implement a Catalog in Python, you will need to inherit from the abstract base class
CatalogProvider. There are examples in the
unit tests of
implementing a basic Catalog in Python where we simply keep a dictionary of the
registered Schemas.
One important note for developers is that when we have a Catalog defined in Python, we have two different ways of accessing this Catalog. First, we register the catalog with a Rust wrapper. This allows for any rust based code to call the Python functions as necessary. Second, if the user access the Catalog via the Python API, we identify this and return back the original Python object that implements the Catalog. This is an important distinction for developers because we do not return a Python wrapper around the Rust wrapper of the original Python object.
