Using the SQL API
DataFusion has a full SQL API that allows you to interact with DataFusion using SQL query strings. The simplest way to use the SQL API is to use the SessionContext struct, which provides the highest-level API for executing SQL queries.
To use SQL, you first register your data as a table and then run queries using the SessionContext::sql method. For lower-level control, such as preventing DDL, you can use SessionContext::sql_with_options or the SessionState APIs.
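For example, here is a minimal sketch of disallowing DDL with SessionContext::sql_with_options (the CREATE TABLE statement is only illustrative; see SQLOptions for the full set of switches):
use datafusion::error::Result;
use datafusion::execution::context::SQLOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // forbid DDL (CREATE, DROP, ...) while still allowing ordinary queries
    let options = SQLOptions::new().with_allow_ddl(false);
    // planning this statement fails because DDL is disallowed
    let result = ctx
        .sql_with_options("CREATE TABLE foo (x INT)", options)
        .await;
    assert!(result.is_err());
    Ok(())
}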
Registering Data Sources using SessionContext::register*
The SessionContext::register* methods tell DataFusion the name of the source and how to read data. Once registered, you can execute SQL queries using the SessionContext::sql method, referring to your data source as a table. The SessionContext::sql method returns a DataFrame for ease of use. See the “Using the DataFrame API” section for more information on how to work with DataFrames.
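The same pattern works for sources other than files. For instance, here is a minimal sketch (the table name t and its contents are only illustrative) that registers an in-memory RecordBatch with SessionContext::register_batch and queries it with SQL:
use std::sync::Arc;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // build a one-column in-memory batch
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    // register it under the name "t" so SQL can refer to it
    ctx.register_batch("t", batch)?;
    let df = ctx.sql("SELECT a FROM t WHERE a > 1").await?;
    df.show().await?;
    Ok(())
}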
Read a CSV File
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // register the "example" table
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    // create a plan to run a SQL query
    let df = ctx.sql("SELECT a, min(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
    // execute the plan and collect the results as Vec<RecordBatch>
    let results: Vec<RecordBatch> = df.collect().await?;
    // use the assert_batches_eq! macro to compare the results with the expected output
    datafusion::assert_batches_eq!(
        vec![
            "+---+----------------+",
            "| a | min(example.b) |",
            "+---+----------------+",
            "| 1 | 2              |",
            "+---+----------------+",
        ],
        &results
    );
    Ok(())
}
Read an Apache Parquet File
Similarly to CSV, you can register a Parquet file as a table using the register_parquet method.
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // create local session context
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    // register the parquet file with the execution context
    ctx.register_parquet(
        "alltypes_plain",
        &format!("{testdata}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;
    // execute the query
    let df = ctx.sql(
        "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
         FROM alltypes_plain \
         WHERE id > 1 AND tinyint_col < double_col",
    ).await?;
    // execute the plan, and compare to the expected results
    let results = df.collect().await?;
    datafusion::assert_batches_eq!(
        vec![
            "+---------+------------+--------------------------------+",
            "| int_col | double_col | alltypes_plain.date_string_col |",
            "+---------+------------+--------------------------------+",
            "| 1       | 10.1       | 03/01/09                       |",
            "| 1       | 10.1       | 04/01/09                       |",
            "| 1       | 10.1       | 02/01/09                       |",
            "+---------+------------+--------------------------------+",
        ],
        &results
    );
    Ok(())
}
Read an Apache Avro File
DataFusion can also read Avro files using the register_avro method (note that Avro support must be enabled via the datafusion crate's avro feature).
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // find the path to the avro test files
    let testdata = datafusion::test_util::arrow_test_data();
    // register the avro file with the execution context
    let avro_file = &format!("{testdata}/avro/alltypes_plain.avro");
    ctx.register_avro("alltypes_plain", avro_file, AvroReadOptions::default()).await?;
    // execute the query
    let df = ctx.sql(
        "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
         FROM alltypes_plain \
         WHERE id > 1 AND tinyint_col < double_col"
    ).await?;
    // execute the plan, and compare to the expected results
    let results = df.collect().await?;
    datafusion::assert_batches_eq!(
        vec![
            "+---------+------------+--------------------------------+",
            "| int_col | double_col | alltypes_plain.date_string_col |",
            "+---------+------------+--------------------------------+",
            "| 1       | 10.1       | 03/01/09                       |",
            "| 1       | 10.1       | 04/01/09                       |",
            "| 1       | 10.1       | 02/01/09                       |",
            "+---------+------------+--------------------------------+",
        ],
        &results
    );
    Ok(())
}
Reading Multiple Files as a Table
It is also possible to read multiple files as a single table. This is done with the ListingTableProvider, which takes a list of file paths and reads them as a single table, matching schemas as appropriate.
Coming Soon
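In the meantime, here is a minimal sketch of building a ListingTable over a directory of Parquet files by hand; the file:///path/to/directory/ URL and the table name my_table are placeholders:
use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let state = ctx.state();
    // point at a directory; every matching file underneath becomes part of the table
    let table_path = ListingTableUrl::parse("file:///path/to/directory/")?;
    let listing_options = ListingOptions::new(Arc::new(ParquetFormat::new()))
        .with_file_extension(".parquet");
    // infer a common schema from the files that are found
    let resolved_schema = listing_options
        .infer_schema(&state, &table_path)
        .await?;
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(listing_options)
        .with_schema(resolved_schema);
    // register the listing table so SQL queries can refer to it by name
    let provider = Arc::new(ListingTable::try_new(config)?);
    ctx.register_table("my_table", provider)?;
    let df = ctx.sql("SELECT count(*) FROM my_table").await?;
    df.show().await?;
    Ok(())
}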
Using CREATE EXTERNAL TABLE to register data sources via SQL
You can also register files from SQL itself with the CREATE EXTERNAL TABLE statement.
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // create local session context
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    // register the parquet file using SQL
    let ddl = format!(
        "CREATE EXTERNAL TABLE alltypes_plain \
         STORED AS PARQUET LOCATION '{testdata}/alltypes_plain.parquet'"
    );
    ctx.sql(&ddl).await?;
    // execute a query referring to the alltypes_plain table we just registered
    let df = ctx.sql(
        "SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
         FROM alltypes_plain \
         WHERE id > 1 AND tinyint_col < double_col",
    ).await?;
    // execute the plan, and compare to the expected results
    let results = df.collect().await?;
    datafusion::assert_batches_eq!(
        vec![
            "+---------+------------+--------------------------------+",
            "| int_col | double_col | alltypes_plain.date_string_col |",
            "+---------+------------+--------------------------------+",
            "| 1       | 10.1       | 03/01/09                       |",
            "| 1       | 10.1       | 04/01/09                       |",
            "| 1       | 10.1       | 02/01/09                       |",
            "+---------+------------+--------------------------------+",
        ],
        &results
    );
    Ok(())
}