Building Logical Plans

A logical plan is a structured representation of a database query that describes the high-level operations and transformations needed to retrieve data from a database or data source. It abstracts away specific implementation details and focuses on the logical flow of the query, including operations like filtering, sorting, and joining tables.

This logical plan serves as an intermediate step before generating an optimized physical execution plan. This is explained in more detail in the Query Planning and Execution Overview section of the Architecture Guide.

Building Logical Plans Manually

DataFusion’s LogicalPlan is an enum containing variants representing all the supported operators, and also contains an Extension variant that allows projects building on DataFusion to add custom logical operators.

It is possible to create logical plans by directly creating instances of the LogicalPlan enum as shown, but it is much easier to use the LogicalPlanBuilder, which is described in the next section.

Here is an example of building a logical plan directly:

use datafusion::common::DataFusionError;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{Filter, LogicalPlan, TableScan, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

fn main() -> Result<(), DataFusionError> {
    // create a logical table source
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let table_source = LogicalTableSource::new(SchemaRef::new(schema));

    // create a TableScan plan
    let projection = None; // optional projection
    let filters = vec![]; // optional filters to push down
    let fetch = None; // optional LIMIT
    let table_scan = LogicalPlan::TableScan(TableScan::try_new(
        "person",
        Arc::new(table_source),
        projection,
        filters,
        fetch,
        )?
    );

    // create a Filter plan that evaluates `id > 500` that wraps the TableScan
    let filter_expr = col("id").gt(lit(500));
    let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan)) ? );

    // print the plan
    println!("{}", plan.display_indent_schema());
    Ok(())
}

This example produces the following plan:

Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N]
  TableScan: person [id:Int32;N, name:Utf8;N]

Building Logical Plans with LogicalPlanBuilder

DataFusion logical plans can be created using the LogicalPlanBuilder struct. There is also a DataFrame API which is a higher-level API that delegates to LogicalPlanBuilder.

There are several functions that can can be used to create a new builder, such as

  • empty - create an empty plan with no fields

  • values - create a plan from a set of literal values

  • scan - create a plan representing a table scan

  • scan_with_filters - create a plan representing a table scan with filters

Once the builder is created, transformation methods can be called to declare that further operations should be performed on the plan. Note that all we are doing at this stage is building up the logical plan structure. No query execution will be performed.

Here are some examples of transformation methods, but for a full list, refer to the LogicalPlanBuilder API documentation.

  • filter

  • limit

  • sort

  • distinct

  • join

The following example demonstrates building the same simple query plan as the previous example, with a table scan followed by a filter.

use datafusion::common::DataFusionError;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

fn main() -> Result<(), DataFusionError> {
    // create a logical table source
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let table_source = LogicalTableSource::new(SchemaRef::new(schema));

    // optional projection
    let projection = None;

    // create a LogicalPlanBuilder for a table scan
    let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?;

    // perform a filter operation and build the plan
    let plan = builder
        .filter(col("id").gt(lit(500)))? // WHERE id > 500
        .build()?;

    // print the plan
    println!("{}", plan.display_indent_schema());
    Ok(())
}

This example produces the following plan:

Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N]
  TableScan: person [id:Int32;N, name:Utf8;N]

Translating Logical Plan to Physical Plan

Logical plans can not be directly executed. They must be “compiled” into an ExecutionPlan, which is often referred to as a “physical plan”.

Compared to LogicalPlans ExecutionPlans have many more details such as specific algorithms and detailed optimizations compared to. Given a LogicalPlan the easiest way to create an ExecutionPlan is using SessionState::create_physical_plan as shown below

use datafusion::datasource::{provider_as_source, MemTable};
use datafusion::common::DataFusionError;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_expr::{LogicalPlanBuilder, LogicalTableSource};
use datafusion::prelude::*;
use std::sync::Arc;

// Creating physical plans may access remote catalogs and data sources
// thus it must be run with an async runtime.
#[tokio::main]
async fn main() -> Result<(), DataFusionError> {

    // create a default table source
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    // To create an ExecutionPlan we must provide an actual
    // TableProvider. For this example, we don't provide any data
    // but in production code, this would have `RecordBatch`es with
    // in memory data
    let table_provider = Arc::new(MemTable::try_new(Arc::new(schema), vec![])?);
    // Use the provider_as_source function to convert the TableProvider to a table source
    let table_source = provider_as_source(table_provider);

    // create a LogicalPlanBuilder for a table scan without projection or filters
    let logical_plan = LogicalPlanBuilder::scan("person", table_source, None)?.build()?;

    // Now create the physical plan by calling `create_physical_plan`
    let ctx = SessionContext::new();
    let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // print the plan
    println!("{}", DisplayableExecutionPlan::new(physical_plan.as_ref()).indent(true));
    Ok(())
}

This example produces the following physical plan:

MemoryExec: partitions=0, partition_sizes=[]

Table Sources

The previous examples use a LogicalTableSource, which is used for tests and documentation in DataFusion, and is also suitable if you are using DataFusion to build logical plans but do not use DataFusion’s physical planner.

However, it is more common to use a TableProvider. To get a TableSource from a TableProvider, use provider_as_source or DefaultTableSource.