Using the DataFrame API¶
The Users Guide introduces the DataFrame API and this section describes that API in more depth.
What is a DataFrame?¶
As described in the Users Guide, DataFusion DataFrames are modeled after the Pandas DataFrame interface, and are implemented as a thin wrapper over a LogicalPlan that adds functionality for building and executing those plans.
The simplest possible DataFrame is one that scans a table, and that table can be backed by a file or by memory.
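For example, here is a minimal sketch of such a scan over a file-backed table, using the same tests/data/example.csv file that the later examples in this section read:

use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // the simplest DataFrame: a scan of a single table, here backed by a CSV file
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // no data has been read yet; `df` merely describes the scan
    Ok(())
}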
How to generate a DataFrame¶
You can construct DataFrames programmatically using the API, similarly to other DataFrame APIs. For example, you can read an in-memory RecordBatch into a DataFrame:
use std::sync::Arc;

use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register an in-memory table containing the following data
    // id | bank_account
    // ---|-------------
    // 1  | 9000
    // 2  | 8000
    // 3  | 7000
    let data = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
    ])?;
    // Create a DataFrame that scans the user table, finds all users
    // with a bank account of at least 8000, and sorts the results
    // by bank account in descending order
    let dataframe = ctx
        .read_batch(data)?
        .filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000
        .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC
    Ok(())
}
You can also generate a DataFrame from a SQL query and use the DataFrame's APIs to manipulate the output of the query.
use std::sync::Arc;

use datafusion::prelude::*;
use datafusion::assert_batches_eq;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the same in-memory table as the previous example
    let data = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
    ])?;
    ctx.register_batch("users", data)?;
    // Create a DataFrame using SQL
    let dataframe = ctx
        .sql("SELECT * FROM users;")
        .await?
        // Note we can filter the output of the query using the DataFrame API
        .filter(col("bank_account").gt_eq(lit(8000)))?; // bank_account >= 8000
    let results = dataframe.collect().await?;
    // use the `assert_batches_eq` macro to show the output
    assert_batches_eq!(
        vec![
            "+----+--------------+",
            "| id | bank_account |",
            "+----+--------------+",
            "| 1  | 9000         |",
            "| 2  | 8000         |",
            "+----+--------------+",
        ],
        &results
    );
    Ok(())
}
Collect / Streaming Exec¶
DataFusion DataFrames are "lazy", meaning they do no processing until they are executed, which allows for additional optimizations.
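Because execution is deferred, you can inspect the plan a query would run before any data is processed. Here is a minimal sketch using DataFrame::explain on the example CSV file used throughout this section; the filter on column a is only illustrative:

use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // print the logical and physical plans without executing the query
    df.filter(col("a").gt(lit(1)))?
        .explain(false, false)? // verbose = false, analyze = false
        .show()
        .await?;
    Ok(())
}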
You can run a DataFrame in one of three ways:

- collect: executes the query and buffers all the output into a Vec<RecordBatch>
- execute_stream: begins execution and returns a SendableRecordBatchStream, which incrementally computes output on each call to next()
- cache: executes the query and buffers the output into a new in-memory DataFrame (a minimal sketch of cache follows this list)
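collect and execute_stream are demonstrated below. cache is not, so here is a minimal sketch of it, assuming the same example CSV file used by the other examples:

use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // execute the query and buffer the results into a new in-memory DataFrame
    let cached = df.cache().await?;
    // subsequent operations read from the buffered batches, not the CSV file
    cached.show().await?;
    Ok(())
}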
To collect all outputs into a memory buffer, use the collect method:
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read the contents of a CSV file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // execute the query and collect the results as a Vec<RecordBatch>
    let batches = df.collect().await?;
    for record_batch in batches {
        println!("{record_batch:?}");
    }
    Ok(())
}
Use execute_stream to incrementally generate output one RecordBatch at a time:
use datafusion::prelude::*;
use datafusion::error::Result;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // begin execution (returns quickly, does not compute results)
    let mut stream = df.execute_stream().await?;
    // results are returned incrementally as they are computed
    while let Some(record_batch) = stream.next().await {
        println!("{record_batch:?}");
    }
    Ok(())
}
Write DataFrame to Files¶
You can also write the contents of a DataFrame to a file. When writing a file, DataFusion executes the DataFrame and streams the results to the output.
DataFusion comes with support for writing csv, json, arrow, avro, and parquet files, and supports writing custom file formats via an API (see custom_file_format.rs for an example).
For example, to read a CSV file and write it to a parquet file, use the DataFrame::write_parquet method:
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // stream the contents of the DataFrame to the `example.parquet` file
    // (bind the TempDir so the directory is not deleted before the write completes)
    let tmp_dir = tempfile::tempdir()?;
    let target_path = tmp_dir.path().join("example.parquet");
    df.write_parquet(
        target_path.to_str().unwrap(),
        DataFrameWriteOptions::new(),
        None, // writer_options
    )
    .await?;
    Ok(())
}
The output file will look like this (example output):
> select * from '../datafusion/core/example.parquet';
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
+---+---+---+
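The other built-in formats follow the same pattern. As a minimal sketch (not part of the original example), the same data could be written to a CSV file with DataFrame::write_csv, whose trailing writer_options argument is likewise an Option; the output file name here is illustrative:

use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // stream the contents of the DataFrame to a CSV file
    let tmp_dir = tempfile::tempdir()?;
    let target_path = tmp_dir.path().join("example_out.csv");
    df.write_csv(
        target_path.to_str().unwrap(),
        DataFrameWriteOptions::new(),
        None, // writer_options: use the default CSV writer settings
    )
    .await?;
    Ok(())
}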
Relationship between LogicalPlans and DataFrames¶
The DataFrame struct is defined like this:
use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::LogicalPlan;

pub struct DataFrame {
    // state required to execute a LogicalPlan
    session_state: Box<SessionState>,
    // LogicalPlan that describes the computation to perform
    plan: LogicalPlan,
}
As shown above, a DataFrame is a thin wrapper over a LogicalPlan, so you can easily convert back and forth between them.
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // You can easily get the LogicalPlan from the DataFrame
    let (_state, plan) = df.into_parts();
    // Combining a LogicalPlan with a SessionState gives you back a DataFrame
    let new_df = DataFrame::new(ctx.state(), plan);
    Ok(())
}
In fact, using the DataFrame's methods you can create the same LogicalPlans as when using LogicalPlanBuilder:
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // Create a new DataFrame that selects columns `a` and `b`, sorted by `a`
    let new_df = df
        .select(vec![col("a"), col("b")])?
        .sort_by(vec![col("a")])?;
    // Build the same plan using the LogicalPlanBuilder
    // Similar to `SELECT a, b FROM example.csv ORDER BY a`
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan
    let plan = LogicalPlanBuilder::from(plan)
        .project(vec![col("a"), col("b")])?
        .sort_by(vec![col("a")])?
        .build()?;
    // prove they are the same
    assert_eq!(new_df.logical_plan(), &plan);
    Ok(())
}