Writing Custom Table Providers in Apache DataFusion
Posted on: Tue 31 March 2026 by Tim Saucer (rerun.io)
One of DataFusion's greatest strengths is its extensibility. If your data lives in a custom format, behind an API, or in a system that DataFusion does not natively support, you can teach DataFusion to read it by implementing a custom table provider. This post walks through the three layers you need to understand to design a table provider and where planning and execution work should happen.
The Three Layers¶
When DataFusion executes a query against a table, three abstractions collaborate to produce results:
- TableProvider -- Describes the table (schema, capabilities) and produces an execution plan when queried. This is part of the Logical Plan.
- ExecutionPlan -- Describes how to compute the result: partitioning, ordering, and child plan relationships. This is part of the Physical Plan.
- SendableRecordBatchStream -- The async stream that actually does the work, yielding RecordBatches one at a time.
Think of these as a funnel: TableProvider::scan() is called once during
planning to create an ExecutionPlan, then ExecutionPlan::execute() is called
once per partition to create a stream, and those streams are where rows are
actually produced during execution.
Background: Logical and Physical Planning¶
Before diving into the three layers, it helps to understand how DataFusion processes a query. There are several phases between a SQL string (or DataFrame call) and streaming results:
SQL / DataFrame API
→ Logical Plan (abstract: what to compute)
→ Logical Optimization (rewrite rules that preserve semantics)
→ Physical Plan (concrete: how to compute it)
→ Physical Optimization (hardware- and data-aware rewrites)
→ Execution (streaming RecordBatches)
Logical Planning¶
A logical plan describes what the query computes without specifying how.
It is a tree of relational operators -- TableScan, Filter, Projection,
Aggregate, Join, Sort, Limit, and so on. The logical optimizer rewrites
this tree to reduce work while preserving the query's meaning. Some logical
optimizations include:
- Predicate pushdown -- moves filters as close to the data source as possible, so fewer rows flow through the rest of the plan.
- Projection pruning -- eliminates columns that are never referenced downstream, reducing memory and I/O.
- Expression simplification -- rewrites expressions like 1 = 1 or x AND true into simpler forms.
- Subquery decorrelation -- converts correlated IN/EXISTS subqueries into more efficient semi-joins.
- Limit pushdown -- pushes LIMIT earlier in the plan so operators produce less data.
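To make expression simplification concrete, here is a toy, self-contained sketch -- a hypothetical mini expression type, not DataFusion's real `Expr` -- showing the kind of bottom-up rewrite the logical optimizer performs:

```rust
// A toy expression tree: a stand-in for DataFusion's real Expr type.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    BoolLit(bool),
    And(Box<Expr>, Box<Expr>),
}

// Rewrite `x AND true` to `x` and `x AND false` to `false`,
// recursing bottom-up so nested expressions simplify too.
fn simplify(e: Expr) -> Expr {
    match e {
        Expr::And(l, r) => {
            let (l, r) = (simplify(*l), simplify(*r));
            match (l, r) {
                (Expr::BoolLit(true), other) | (other, Expr::BoolLit(true)) => other,
                (Expr::BoolLit(false), _) | (_, Expr::BoolLit(false)) => Expr::BoolLit(false),
                (l, r) => Expr::And(Box::new(l), Box::new(r)),
            }
        }
        other => other,
    }
}

fn main() {
    let e = Expr::And(
        Box::new(Expr::Column("x".into())),
        Box::new(Expr::BoolLit(true)),
    );
    // `x AND true` simplifies to just `x`.
    assert_eq!(simplify(e), Expr::Column("x".into()));
}
```

The real optimizer applies many such rules repeatedly until the plan stops changing; the principle is the same.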
Physical Planning¶
The physical planner converts the optimized logical plan into an
ExecutionPlan tree -- the concrete plan that will actually run. This is where
decisions like "use a hash join vs. a sort-merge join" or "how many partitions
to scan" are made. The physical optimizer then refines this tree further with rewrites such as:
- Distribution enforcement -- inserts RepartitionExec nodes so that data is partitioned correctly for joins and aggregations.
- Sort enforcement -- inserts SortExec nodes where ordering is required, and removes them where the data is already sorted.
- Join selection -- picks the most efficient join strategy based on statistics and table sizes.
- Aggregate optimization -- combines partial and final aggregation stages, and can use exact statistics to skip scanning entirely.
Why This Matters for Table Providers¶
Your TableProvider sits at the boundary between logical and physical planning.
During logical optimization, DataFusion determines which filters and projections
could be pushed down to the source. When scan() is called during physical
planning, those hints are passed to you. By implementing capabilities like
supports_filters_pushdown, you influence what the optimizer can do -- and the
metadata you declare in your ExecutionPlan (partitioning, ordering) directly
affects which physical optimizations apply.
Choosing the Right Starting Point¶
Not every custom data source requires implementing all three layers from scratch. DataFusion provides building blocks that let you plug in at whatever level makes sense:
| If your data is... | Start with | You implement |
|---|---|---|
| Already in RecordBatches in memory | MemTable | Nothing -- just construct it |
| An async stream of batches | StreamTable | A stream factory |
| A logical transformation of other tables | ViewTable wrapping a logical plan | The logical plan |
| A variant of an existing file format | ListingTable with a custom FileFormat wrapping an existing one | A thin FileFormat wrapper |
| Files in a custom format on disk or object storage | ListingTable with a custom FileFormat, FileSource, and FileOpener | The format, source, and opener |
| A custom source needing full control | TableProvider + ExecutionPlan + stream | All three layers |
If your data is file-based, ListingTable handles file discovery, partition
column inference, and plan construction -- you only need to implement
FileFormat, FileSource, and FileOpener to describe how to read your
files. See the custom_file_format example for a minimal wrapping approach,
or ParquetSource and ParquetOpener for a full custom implementation to
use as a reference.
The rest of this post focuses on the full TableProvider + ExecutionPlan +
stream path, which gives you complete control and applies to any data source.
Layer 1: TableProvider¶
A TableProvider represents a queryable data source. For a minimal read-only table, you need four methods:
impl TableProvider for MyTable {
fn as_any(&self) -> &dyn Any { self }
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Build and return an ExecutionPlan -- don't do any execution work here -- keep lightweight!
Ok(Arc::new(MyExecPlan::new(
Arc::clone(&self.schema),
projection,
limit,
)))
}
}
The scan method is the heart of TableProvider. It receives three pushdown
hints from the optimizer, each reducing the amount of data your source needs
to produce:
- projection -- Which columns are needed. This reduces the width of the output. If your source supports it, read only these columns rather than the full schema.
- filters -- Predicates the engine would like you to apply during the scan. This reduces the number of rows by skipping data that does not match. Implement supports_filters_pushdown to advertise which filters you can handle.
- limit -- A row count cap. This also reduces the number of rows -- if you can stop reading early once you have produced enough rows, this avoids unnecessary work.
You can also use the scan_with_args() variant that provides additional pushdown information for other advanced use cases.
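To make the projection hint concrete, here is a minimal std-only sketch of how a source might apply it, using plain column names in place of Arrow's `SchemaRef` (the function name is hypothetical):

```rust
// Given the full column list and an optional projection (column
// indices, in the order the engine wants them), return the columns
// the source should actually read.
fn apply_projection(full: &[&str], projection: Option<&Vec<usize>>) -> Vec<String> {
    match projection {
        Some(indices) => indices.iter().map(|&i| full[i].to_string()).collect(),
        None => full.iter().map(|s| s.to_string()).collect(),
    }
}

fn main() {
    let schema = ["region", "event_type", "amount"];
    // The optimizer only needs `amount` and `region`, in that order.
    let projection = vec![2, 0];
    assert_eq!(
        apply_projection(&schema, Some(&projection)),
        vec!["amount".to_string(), "region".to_string()],
    );
    // No projection hint: read the full schema.
    assert_eq!(apply_projection(&schema, None).len(), 3);
}
```

Note that the projection also carries an ordering: your output schema should list the projected columns in the order given.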
Keep scan() Lightweight¶
This is a critical point: scan() runs during planning, not execution. It
should return quickly. Best practice is to avoid performing I/O, network
calls, or heavy computation here. The scan method's job is to describe how
the data will be produced, not to produce it. All the real work belongs in the
stream (Layer 3).
A common pitfall is to fetch data or open connections in scan(). This blocks
the planning thread and can cause timeouts or deadlocks, especially if the query
involves multiple tables or subqueries that all need to be planned before
execution begins.
Existing Implementations to Learn From¶
DataFusion ships several TableProvider implementations that are excellent
references:
- MemTable -- Holds data in memory as Vec<RecordBatch>. The simplest possible provider; great for tests and small datasets.
- StreamTable -- Wraps a user-provided stream factory. Useful when your data arrives as a continuous stream (e.g., from Kafka or a socket).
- ListingTable -- The file-based data source behind DataFusion's built-in Parquet, CSV, and JSON support. Demonstrates sophisticated filter and projection pushdown, file pruning, and schema inference.
- ViewTable -- Wraps a logical plan, representing a SQL view. Useful if your provider is best expressed as a transformation of other tables.
Layer 2: ExecutionPlan¶
An ExecutionPlan is a node in the physical query plan tree. Your table
provider's scan() method returns one. The required methods are:
impl ExecutionPlan for MyExecPlan {
fn name(&self) -> &str { "MyExecPlan" }
fn as_any(&self) -> &dyn Any { self }
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![] // Leaf node -- no children
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
assert!(children.is_empty());
Ok(self)
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
// This is where you build and return your stream
// ...
}
}
The key properties to set correctly in PlanProperties are output partitioning and output ordering.
Output partitioning tells the engine how many partitions your data has, which determines parallelism. If your source naturally partitions data (e.g., by file or by shard), expose that here.
Output ordering declares whether your data is naturally sorted. This
enables the optimizer to avoid inserting a SortExec when a query requires
ordered data. Getting this right can be a significant performance win.
Partitioning Strategies¶
Since execute() is called once per partition, partitioning directly controls
the parallelism of your table scan. Each partition produces an independent
stream that DataFusion schedules as a task on the tokio runtime. It is
important to distinguish tasks from threads: tasks are lightweight units of
async work that are multiplexed onto a thread pool. You can have many more
tasks (partitions) than physical threads -- the runtime will interleave them
efficiently as they await I/O or yield.
Start simple: match your data's natural layout. If you have 4 files, expose
4 partitions. If your source has 8 shards, expose 8 partitions. DataFusion will
insert a RepartitionExec above your scan when downstream operators need a
different distribution. You can also implement the repartitioned method on your
ExecutionPlan to let DataFusion request a different partition count directly
from your source, avoiding the extra operator entirely.
Consider how your data source naturally divides its data:
- By file or object: If you are reading from S3, each file can be a partition. DataFusion will read them in parallel.
- By shard or region: If your source is a sharded database, each shard maps naturally to a partition.
- By key range: If your data is keyed (e.g., by timestamp or customer ID), you can split it into ranges.
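The key-range strategy can be sketched in a few lines of std-only Rust. This hypothetical helper splits a half-open timestamp range into `n` contiguous sub-ranges, each of which would back one partition:

```rust
// Split the half-open range [start, end) into `n` contiguous
// sub-ranges. Each sub-range becomes one partition of the scan.
fn split_range(start: i64, end: i64, n: i64) -> Vec<(i64, i64)> {
    let span = end - start;
    (0..n)
        .map(|i| {
            let lo = start + span * i / n;
            let hi = start + span * (i + 1) / n;
            (lo, hi)
        })
        .collect()
}

fn main() {
    // One hour of epoch-millisecond timestamps across 4 partitions.
    let parts = split_range(0, 3_600_000, 4);
    assert_eq!(parts.len(), 4);
    assert_eq!(parts[0], (0, 900_000));
    assert_eq!(parts[3], (2_700_000, 3_600_000));
    // The ranges tile the full span with no gaps or overlap.
    assert!(parts.windows(2).all(|w| w[0].1 == w[1].0));
}
```

In a real provider, each (lo, hi) pair would be stored on the ExecutionPlan and used by the stream for partition `i` to fetch only that slice of the data.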
Advanced: aligning with target_partitions. Once you have something
working, you can tune further. Having too many partitions is not free: each
partition adds scheduling overhead, and downstream operators may need to
repartition the data anyway. The session configuration exposes a
target partition count that reflects how many partitions the optimizer
expects to work with:
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = state.config().target_partitions();
// Optionally coalesce or split partitions to match target_partitions.
// ...
}
If your source produces data in exactly target_partitions partitions, the
optimizer is less likely to insert a RepartitionExec above your scan.
For small datasets, target_partitions may be set to 1, which avoids any
repartitioning overhead entirely.
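One simple way to align with the target is to distribute your natural units of work (files, shards) round-robin across at most `target_partitions` groups. A std-only sketch, with a hypothetical helper name:

```rust
// Assign `files` round-robin to at most `target` partition groups.
// With fewer files than the target, each file gets its own partition.
fn coalesce_to_target(files: Vec<String>, target: usize) -> Vec<Vec<String>> {
    let n = target.min(files.len()).max(1);
    let mut groups: Vec<Vec<String>> = vec![Vec::new(); n];
    for (i, file) in files.into_iter().enumerate() {
        groups[i % n].push(file);
    }
    groups
}

fn main() {
    let files: Vec<String> = (0..10).map(|i| format!("file-{i}.dat")).collect();
    let groups = coalesce_to_target(files, 4);
    assert_eq!(groups.len(), 4);
    // 10 files over 4 groups: sizes 3, 3, 2, 2.
    let sizes: Vec<usize> = groups.iter().map(|g| g.len()).collect();
    assert_eq!(sizes, vec![3, 3, 2, 2]);
}
```

Each group then becomes one partition, and the stream for partition `i` reads the files in `groups[i]` sequentially.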
Advanced: declaring hash partitioning. If your source stores data
pre-partitioned by a specific key (e.g., customer_id), you can declare this
in your output partitioning. For a query like:
SELECT customer_id, SUM(amount)
FROM my_table
GROUP BY customer_id;
If you declare your output partitioning as Hash([customer_id], N), the
optimizer recognizes that the data is already distributed correctly for the
aggregation and eliminates the RepartitionExec that would otherwise appear
in the plan. You can verify this with EXPLAIN (more on this below).
Conversely, if you report UnknownPartitioning, DataFusion must assume the
worst case and will always insert repartitioning operators as needed.
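The contract behind declaring Hash([customer_id], N) is that every row with a given key value lives in exactly one partition. A std-only illustration of that assignment (DataFusion uses its own hash function internally; DefaultHasher here is purely for demonstration):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Map a key to one of `n` partitions. All rows sharing a key land in
// the same partition -- the property that lets the optimizer skip a
// RepartitionExec before a GROUP BY on that key.
fn partition_for(key: &str, n: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % n
}

fn main() {
    let n = 8;
    // The same key always maps to the same partition...
    assert_eq!(partition_for("customer-42", n), partition_for("customer-42", n));
    // ...and every assignment is in range.
    for i in 0..100 {
        assert!(partition_for(&format!("customer-{i}"), n) < n);
    }
}
```

If your storage layer partitions by a different hash function than DataFusion's, you cannot declare Hash partitioning for it: the declaration is a promise about co-location of keys, and a mismatched hash would silently produce wrong aggregate results.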
Keep execute() Lightweight Too¶
Like scan(), the execute() method should construct and return a stream
without doing heavy work. The actual data production happens when the stream
is polled. Do not block on async operations here -- build the stream and let
the runtime drive it.
Existing Implementations to Learn From¶
- StreamingTableExec -- Executes a streaming table scan. It takes a stream factory (a closure that produces streams) and handles partitioning. Good reference for wrapping external streams.
- DataSourceExec -- The execution plan behind DataFusion's built-in file scanning (Parquet, CSV, JSON). It demonstrates sophisticated partitioning, filter pushdown, and projection pushdown.
Layer 3: SendableRecordBatchStream¶
SendableRecordBatchStream is where the real work happens. It is defined as:
type SendableRecordBatchStream =
    Pin<Box<dyn RecordBatchStream + Send>>;
Here RecordBatchStream is a trait extending Stream<Item = Result<RecordBatch>>
(it also exposes the schema of the batches it yields). This is an async stream
of RecordBatches that can be sent across threads. When
the DataFusion runtime polls this stream, your code runs: reading files, calling
APIs, transforming data, etc.
Using RecordBatchStreamAdapter¶
The easiest way to create a SendableRecordBatchStream is with
RecordBatchStreamAdapter. It bridges any futures::Stream<Item =
Result<RecordBatch>> into the SendableRecordBatchStream type:
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use futures::StreamExt; // for flat_map
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = self.schema();
let config = self.config.clone();
let stream = futures::stream::once(async move {
// ALL the heavy work happens here, inside the stream:
// - Open connections
// - Read data from external sources
// - Transform and batch the results
let batches = fetch_data_from_source(&config).await?;
Ok(batches)
})
.flat_map(|result| match result {
    // Flatten the Vec<RecordBatch> into individual stream items
    Ok(batches) => futures::stream::iter(
        batches.into_iter().map(Ok).collect::<Vec<_>>(),
    ),
    Err(e) => futures::stream::iter(vec![Err(e)]),
});
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
Blocking Work: Use a Separate Thread Pool¶
If your stream performs blocking work -- such as blocking I/O, or CPU work that runs for hundreds of milliseconds without yielding -- you must avoid blocking the tokio async runtime. Short CPU work (e.g., parsing a batch in a few milliseconds) is fine to do inline as long as your code yields back to the runtime frequently. But for long-running synchronous work that cannot yield, offload to a dedicated thread pool and send results back through a channel:
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = self.schema();
let config = self.config.clone();
let (tx, rx) = tokio::sync::mpsc::channel(2);
// Spawn blocking work on a dedicated thread pool
tokio::task::spawn_blocking(move || {
let batches = generate_data(&config);
for batch in batches {
if tx.blocking_send(Ok(batch)).is_err() {
break; // Receiver dropped, query was cancelled
}
}
});
let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
This pattern keeps the async runtime responsive while long-running synchronous work runs on its own threads. For a working example that shows how to configure separate thread pools for I/O and CPU work, see the thread_pools example in the DataFusion repository.
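The same producer/consumer shape can be demonstrated with only the standard library. In this sketch, `sync_channel` plays the role of the bounded tokio channel (backpressure plus cancellation when the receiver is dropped), and a plain thread stands in for `spawn_blocking`; the function name is hypothetical:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Run a blocking producer on its own thread (standing in for
// tokio::task::spawn_blocking) and drain its output through a bounded
// channel, as the stream returned by execute() would.
fn run_pipeline(num_batches: i64) -> Vec<Result<Vec<i64>, String>> {
    // Capacity 2, like tokio::sync::mpsc::channel(2): the producer
    // blocks once two unconsumed batches are queued (backpressure).
    let (tx, rx) = sync_channel::<Result<Vec<i64>, String>>(2);

    let producer = thread::spawn(move || {
        for i in 0..num_batches {
            let batch = vec![i; 3]; // stand-in for a RecordBatch
            if tx.send(Ok(batch)).is_err() {
                break; // receiver dropped: query was cancelled
            }
        }
    });

    // The consumer (the polled stream) drains the channel lazily.
    let batches: Vec<_> = rx.into_iter().collect();
    producer.join().unwrap();
    batches
}

fn main() {
    let batches = run_pipeline(5);
    assert_eq!(batches.len(), 5);
    assert_eq!(batches[0], Ok(vec![0, 0, 0]));
}
```

The bounded capacity matters: it prevents a fast producer from buffering unbounded data in memory when the consumer is slow, and the failed send on a dropped receiver is how cancellation propagates back to the worker.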
Where Should the Work Happen?¶
This table summarizes what belongs at each layer:
| Layer | Runs During | Should Do | Should NOT Do |
|---|---|---|---|
| TableProvider::scan() | Planning | Build an ExecutionPlan with metadata | I/O, network calls, heavy computation |
| ExecutionPlan::execute() | Execution (once per partition) | Construct a stream, set up channels | Block on async work, read data |
| RecordBatchStream (polling) | Execution | All I/O, computation, data production | -- |
The guiding principle: push work as late as possible. Planning should be fast so the optimizer can do its job. Execution setup should be fast so all partitions can start promptly. The stream is where you spend time producing data.
Why This Matters¶
When scan() does heavy work, several problems arise:
- Planning becomes slow. If a query touches 10 tables and each scan() takes 500ms, planning alone takes 5 seconds before any data flows.
- Execution is single-threaded. scan() runs on a single thread during planning, so any work done there cannot benefit from the parallel execution that DataFusion provides across partitions.
- The optimizer cannot help. The optimizer runs between planning and execution. If you have already fetched data during planning, optimizations like predicate pushdown or partition pruning cannot reduce the work.
- Resource management breaks down. DataFusion manages concurrency and memory during execution. Work done during planning bypasses these controls.
Filter Pushdown: Doing Less Work¶
One of the most impactful optimizations you can add to a custom table provider is filter pushdown -- letting the source skip data that the query does not need, rather than reading everything and filtering it afterward.
How Filter Pushdown Works¶
When DataFusion plans a query with a WHERE clause, it passes the filter
predicates to your scan() method as the filters parameter. By default,
DataFusion assumes your provider cannot handle any filters and inserts a
FilterExec node above your scan to apply them. But if your source can
evaluate some predicates during scanning -- for example, by skipping files,
partitions, or row groups that cannot match -- you can eliminate a huge amount
of unnecessary I/O.
To opt in, implement supports_filters_pushdown:
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(filters.iter().map(|f| {
match f {
// We can fully evaluate equality filters on
// the partition column at the source
Expr::BinaryExpr(BinaryExpr {
left, op: Operator::Eq, right
}) if is_partition_column(left) || is_partition_column(right) => {
TableProviderFilterPushDown::Exact
}
// All other filters: let DataFusion handle them
_ => TableProviderFilterPushDown::Unsupported,
}
}).collect())
}
The three possible responses for each filter are:
- Exact -- Your source guarantees that no output rows will have a false value for this predicate. Because the filter is fully evaluated at the source, DataFusion will not add a FilterExec for it.
- Inexact -- Your source has the ability to reduce the data produced, but the output may still include rows that do not satisfy the predicate. For example, you might skip entire files based on metadata statistics but not filter individual rows within a file. DataFusion will still add a FilterExec above your scan to remove any remaining rows that slipped through.
- Unsupported -- Your source ignores this filter entirely. DataFusion handles it.
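The Inexact case deserves a concrete sketch. Suppose each file carries min/max statistics for a column, as formats like Parquet do: the source can skip files whose range cannot match, yet a surviving file may still contain non-matching rows, which is exactly why DataFusion keeps a FilterExec above the scan. A std-only illustration with hypothetical file stats:

```rust
// Per-file statistics for one column, as a file format like Parquet
// might store them.
struct FileStats {
    name: &'static str,
    min: i64,
    max: i64,
}

// Inexact pushdown of `col = value`: keep only files whose [min, max]
// range could contain the value. Surviving files may still hold rows
// where col != value, so a FilterExec is still needed downstream.
fn prune_files(files: &[FileStats], value: i64) -> Vec<&'static str> {
    files
        .iter()
        .filter(|f| f.min <= value && value <= f.max)
        .map(|f| f.name)
        .collect()
}

fn main() {
    let files = [
        FileStats { name: "a.parquet", min: 0, max: 99 },
        FileStats { name: "b.parquet", min: 100, max: 199 },
        FileStats { name: "c.parquet", min: 150, max: 300 },
    ];
    // col = 175 can only appear in files whose range covers 175.
    assert_eq!(prune_files(&files, 175), vec!["b.parquet", "c.parquet"]);
    assert_eq!(prune_files(&files, 50), vec!["a.parquet"]);
}
```

Reporting Inexact here is honest: the pruning eliminates I/O, but it makes no guarantee about individual rows within the files it keeps.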
Why Filter Pushdown Matters¶
Consider a table with 1 billion rows partitioned by region, and a query:
SELECT * FROM events WHERE region = 'us-east-1' AND event_type = 'click';
Without filter pushdown: Your table provider reads all 1 billion rows across all regions. DataFusion then applies both filters, discarding the vast majority of the data.
With filter pushdown on region: Your scan() method sees the
region = 'us-east-1' filter and constructs an execution plan that only reads
the us-east-1 partition. If that partition holds 100 million rows, you have
just eliminated 90% of the I/O. DataFusion still applies the event_type
filter via FilterExec if you reported it as Unsupported.
Only Push Down Filters When the Data Source Can Do Better¶
DataFusion already pushes filters as close to the data source as possible, typically placing them directly above the scan. FilterExec is also highly optimized, with vectorized evaluation and type-specialized kernels for fast predicate evaluation.
Because of this, you should only implement filter pushdown when your data source can do strictly better -- for example, by avoiding I/O entirely through skipping files or partitions based on metadata. If your data source cannot eliminate I/O in this way, it is usually better to let DataFusion handle the filter, as its in-memory execution is already highly efficient.
Using EXPLAIN to Debug Your Table Provider¶
The EXPLAIN statement is your best tool for understanding what DataFusion is
actually doing with your table provider. It shows the physical plan that
DataFusion will execute, including any operators it inserted:
EXPLAIN SELECT * FROM events WHERE region = 'us-east-1' AND event_type = 'click';
If you are using DataFrames, call .explain(false, false) to print both the
logical and physical plans. The first argument enables verbose output
(.explain(true, false)); the second runs the query and reports runtime metrics,
like EXPLAIN ANALYZE (.explain(false, true)).
Before filter pushdown, the plan might look like:
FilterExec: region@0 = us-east-1 AND event_type@1 = click
MyExecPlan: partitions=50
Here DataFusion is reading all 50 partitions and filtering everything
afterward. The FilterExec above your scan is doing all the predicate work.
After implementing pushdown for region (reported as Exact):
FilterExec: event_type@1 = click
MyExecPlan: partitions=5, filter=[region = us-east-1]
Now your exec reads only the 5 partitions for us-east-1, and the remaining
FilterExec only handles the event_type predicate. The region filter has
been fully absorbed by your scan.
After implementing pushdown for both filters (both Exact):
MyExecPlan: partitions=5, filter=[region = us-east-1 AND event_type = click]
No FilterExec at all -- your source handles everything.
Similarly, EXPLAIN will reveal whether DataFusion is inserting unnecessary
SortExec or RepartitionExec nodes that you could eliminate by declaring
better output properties. Whenever your queries seem slower than expected,
EXPLAIN is the first place to look.
A Complete Filter Pushdown Example¶
To make filter pushdown concrete, here is an illustrative example. Imagine a
table provider that reads from a set of date-partitioned directories on disk
(e.g., data/2026-03-01/, data/2026-03-02/, ...). Each directory contains
one or more Parquet files for that date. By pushing down a filter on the date
column, the provider can skip entire directories -- avoiding the I/O of listing
and reading files that cannot possibly match the query.
/// A table provider backed by date-partitioned directories.
/// Each date directory contains data files; by filtering on the
/// `date` column we can skip entire directories of I/O.
struct DatePartitionedTable {
schema: SchemaRef,
/// Maps date strings ("2026-03-01") to directory paths
partitions: HashMap<String, String>,
}
#[async_trait::async_trait]
impl TableProvider for DatePartitionedTable {
fn as_any(&self) -> &dyn Any { self }
fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }
fn table_type(&self) -> TableType { TableType::Base }
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(filters.iter().map(|f| {
if Self::is_date_equality_filter(f) {
// We can fully evaluate this: we will only read
// directories matching the date, so no rows with
// a different date will appear in the output.
TableProviderFilterPushDown::Exact
} else {
TableProviderFilterPushDown::Unsupported
}
}).collect())
}
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Determine which date partitions to read by inspecting
// the pushed-down filters. This is the key optimization:
// we decide *during planning* which directories to scan,
// so that execution never touches irrelevant data.
let dates_to_read: Vec<String> = self
.extract_date_values(filters)
.unwrap_or_else(||
self.partitions.keys().cloned().collect()
);
let dirs: Vec<String> = dates_to_read
.iter()
.filter_map(|d| self.partitions.get(d).cloned())
.collect();
let num_dirs = dirs.len();
Ok(Arc::new(DatePartitionedExec {
schema: Arc::clone(&self.schema),
directories: dirs,
properties: PlanProperties::new(
EquivalenceProperties::new(
Arc::clone(&self.schema),
),
// One partition per date directory -- these
// will be read in parallel.
Partitioning::UnknownPartitioning(num_dirs),
EmissionType::Incremental,
Boundedness::Bounded,
),
}))
}
}
impl DatePartitionedTable {
/// Check if a filter is an equality comparison on the `date` column.
fn is_date_equality_filter(expr: &Expr) -> bool {
// In practice, match on BinaryExpr { left, op: Eq, right }
// and check if either side references the "date" column.
// Simplified here for clarity.
todo!("match on date equality expressions")
}
/// Extract date literal values from pushed-down equality filters.
fn extract_date_values(&self, filters: &[Expr]) -> Option<Vec<String>> {
// Parse filters like `date = '2026-03-01'` and return
// the literal date strings. Returns None if no date
// filters are present (meaning: read all partitions).
todo!("extract date literals from filter expressions")
}
}
The key insight is that the filter pushdown decision (supports_filters_pushdown)
and the partition pruning (scan()) work together: the first tells DataFusion
that a FilterExec is unnecessary for the date predicate, and the second
ensures that only the relevant directories are scanned. The actual file reading
happens later, in the stream produced by execute().
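The two `todo!` helpers above require matching DataFusion `Expr` trees; the logic they encode can be sketched with plain strings. This hypothetical stand-in parses filters written as `date = 'YYYY-MM-DD'`:

```rust
// Simplified stand-in for extract_date_values: scan textual filters
// for `date = '<literal>'` and collect the literals. Returns None when
// no date filter is present, meaning all partitions must be read.
fn extract_date_values(filters: &[&str]) -> Option<Vec<String>> {
    let dates: Vec<String> = filters
        .iter()
        .filter_map(|f| {
            let rest = f.strip_prefix("date = '")?;
            let literal = rest.strip_suffix('\'')?;
            Some(literal.to_string())
        })
        .collect();
    if dates.is_empty() { None } else { Some(dates) }
}

fn main() {
    let filters = ["date = '2026-03-01'", "event_type = 'click'"];
    // Only the date equality contributes to partition pruning.
    assert_eq!(
        extract_date_values(&filters),
        Some(vec!["2026-03-01".to_string()]),
    );
    // No date filter at all: read every partition.
    assert_eq!(extract_date_values(&["amount > 10"]), None);
}
```

The real implementation would instead match on Expr::BinaryExpr with Operator::Eq, checking for a Column("date") on one side and a string literal on the other, but the pruning decision it feeds is the same.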
Putting It All Together¶
Here is a minimal but complete example of a custom table provider that generates data lazily during streaming:
use std::any::Any;
use std::sync::Arc;
use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::catalog::TableProvider;
use datafusion::common::Result;
use datafusion::datasource::TableType;
use datafusion::catalog::Session;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
ExecutionPlan, Partitioning, PlanProperties,
};
use futures::stream;
/// A table provider that generates sequential numbers on demand.
struct CountingTable {
schema: SchemaRef,
num_partitions: usize,
rows_per_partition: usize,
}
impl CountingTable {
fn new(num_partitions: usize, rows_per_partition: usize) -> Self {
let schema = Arc::new(Schema::new(vec![
Field::new("partition", DataType::Int64, false),
Field::new("value", DataType::Int64, false),
]));
Self { schema, num_partitions, rows_per_partition }
}
}
#[async_trait::async_trait]
impl TableProvider for CountingTable {
fn as_any(&self) -> &dyn Any { self }
fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }
fn table_type(&self) -> TableType { TableType::Base }
async fn scan(
&self,
_state: &dyn Session,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Light work only: build the plan with metadata
Ok(Arc::new(CountingExec {
schema: Arc::clone(&self.schema),
num_partitions: self.num_partitions,
rows_per_partition: limit
.unwrap_or(self.rows_per_partition)
.min(self.rows_per_partition),
properties: PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&self.schema)),
Partitioning::UnknownPartitioning(self.num_partitions),
EmissionType::Incremental,
Boundedness::Bounded,
),
}))
}
}
struct CountingExec {
schema: SchemaRef,
num_partitions: usize,
rows_per_partition: usize,
properties: PlanProperties,
}
impl ExecutionPlan for CountingExec {
fn name(&self) -> &str { "CountingExec" }
fn as_any(&self) -> &dyn Any { self }
fn properties(&self) -> &PlanProperties { &self.properties }
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] }
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = Arc::clone(&self.schema);
let rows = self.rows_per_partition;
// The heavy work (data generation) happens inside the stream,
// not here in execute().
let batch_stream = stream::once(async move {
let partitions = Int64Array::from(
vec![partition as i64; rows],
);
let values = Int64Array::from(
(0..rows as i64).collect::<Vec<_>>(),
);
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(partitions), Arc::new(values)],
)?;
Ok(batch)
});
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
batch_stream,
)))
}
}
Acknowledgements¶
I would like to thank Rerun.io for sponsoring the development of this work. Rerun is building a data visualization system for Physical AI and makes heavy use of DataFusion table providers for working with data analytics.
I would also like to thank the reviewers of this post for their helpful feedback and suggestions: @2010YOUY01, @adriangb, @alamb, @kevinjqliu, @Omega359, @pgwhalen, and @stuhood.
Get Involved¶
DataFusion is not a project built or driven by a single person, company, or foundation. Our community of users and contributors works together to build a shared technology that none of us could have built alone.
If you are interested in joining us, we would love to have you. You can try out DataFusion on some of your own data and projects and let us know how it goes, contribute suggestions, documentation, bug reports, or a PR with documentation, tests, or code. A list of open issues suitable for beginners is here, and you can find out how to reach us on the communication doc.
Further Reading¶
- TableProvider API docs
- ExecutionPlan API docs
- SendableRecordBatchStream API docs
- DataFusion examples directory -- contains working examples including custom table providers
Note: Portions of this blog post were written with the assistance of an AI agent.