Logical plans via datafusion-proto

SessionContext.fromProto(byte[]) accepts a serialized DataFusion LogicalPlanNode and returns a lazy DataFrame. This is useful when you already have a plan produced by another DataFusion-aware tool, or when you want to construct the plan programmatically with finer-grained control than the sql or DataFrame APIs.

The build generates the protobuf Java classes into two packages: org.apache.datafusion.protobuf (plan and expression nodes) and datafusion_common (scalar values, schema, column references, file formats). On first build, Maven downloads the .proto files pinned to the matching upstream DataFusion tag and generates the Java classes locally. See the Contributor Guide for how to bump the version.

A minimal plan

The smallest interesting plan is a projection of a literal over an empty input. It is useful as a sanity check and exercises serialization end-to-end without touching any storage.

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.datafusion.DataFrame;
import org.apache.datafusion.SessionContext;
import org.apache.datafusion.protobuf.EmptyRelationNode;
import org.apache.datafusion.protobuf.LogicalExprNode;
import org.apache.datafusion.protobuf.LogicalPlanNode;
import org.apache.datafusion.protobuf.ProjectionNode;

import datafusion_common.DatafusionCommon;

LogicalPlanNode plan =
    LogicalPlanNode.newBuilder()
        .setProjection(
            ProjectionNode.newBuilder()
                .setInput(
                    LogicalPlanNode.newBuilder()
                        .setEmptyRelation(
                            EmptyRelationNode.newBuilder().setProduceOneRow(true).build())
                        .build())
                .addExpr(
                    LogicalExprNode.newBuilder()
                        .setLiteral(
                            DatafusionCommon.ScalarValue.newBuilder().setInt32Value(1).build())
                        .build())
                .build())
        .build();

try (var allocator = new RootAllocator();
     SessionContext ctx = new SessionContext();
     DataFrame df = ctx.fromProto(plan.toByteArray());
     ArrowReader reader = df.collect(allocator)) {
    reader.loadNextBatch();
    VectorSchemaRoot batch = reader.getVectorSchemaRoot();
    IntVector col = (IntVector) batch.getVector(0);
    System.out.println(col.get(0));  // 1
}

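The plan decoded by fromProto goes through the same optimization and physical-planning pipeline as a plan produced by sql; only SQL parsing and the initial logical planning are skipped, because the input is already a logical plan. The result is a lazy DataFrame that only executes when you pull results.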

Scanning a Parquet file via ListingTableScanNode

A ListingTableScanNode reads one or more files of the same format from disk. Unlike registerParquet, it does not require the table to be in the catalog — the scan node carries everything DataFusion needs: the file paths, the schema, the projection, the file format, and the target partition count.

The scan node’s schema field is a datafusion_common.Schema protobuf message (DatafusionCommon.Schema in Java), not an Arrow Schema. Convert between the two with the helper in org.apache.datafusion.proto.SchemaConverter:

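import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.datafusion.proto.SchemaConverter;

import datafusion_common.DatafusionCommon;

// Assumes ctx is an open SessionContext with "lineitem" already registered.
Schema arrow = ctx.tableSchema("lineitem");
DatafusionCommon.Schema schemaProto = SchemaConverter.toProto(arrow);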

The full example: register the file once to introspect its schema, then build a plan that scans the same file, sorts by l_orderkey, and fetches the first row. This is equivalent to SELECT l_orderkey FROM lineitem ORDER BY l_orderkey LIMIT 1.

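import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.datafusion.DataFrame;
import org.apache.datafusion.SessionContext;
import org.apache.datafusion.proto.SchemaConverter;
import org.apache.datafusion.protobuf.BareTableReference;
import org.apache.datafusion.protobuf.ListingTableScanNode;
import org.apache.datafusion.protobuf.LogicalExprNode;
import org.apache.datafusion.protobuf.LogicalPlanNode;
import org.apache.datafusion.protobuf.ProjectionColumns;
import org.apache.datafusion.protobuf.SortExprNode;
import org.apache.datafusion.protobuf.SortNode;
import org.apache.datafusion.protobuf.TableReference;

import datafusion_common.DatafusionCommon;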

String path = "/path/to/lineitem.parquet";

try (var allocator = new RootAllocator();
     SessionContext ctx = new SessionContext()) {

    ctx.registerParquet("lineitem", path);
    DatafusionCommon.Schema schemaProto =
        SchemaConverter.toProto(ctx.tableSchema("lineitem"));

    LogicalExprNode orderKeyCol =
        LogicalExprNode.newBuilder()
            .setColumn(DatafusionCommon.Column.newBuilder().setName("l_orderkey").build())
            .build();

    LogicalPlanNode plan =
        LogicalPlanNode.newBuilder()
            .setSort(
                SortNode.newBuilder()
                    .setInput(
                        LogicalPlanNode.newBuilder()
                            .setListingScan(
                                ListingTableScanNode.newBuilder()
                                    .setTableName(
                                        TableReference.newBuilder()
                                            .setBare(
                                                BareTableReference.newBuilder()
                                                    .setTable("lineitem")
                                                    .build())
                                            .build())
                                    .addPaths(path)
                                    .setFileExtension(".parquet")
                                    .setSchema(schemaProto)
                                    .setProjection(
                                        ProjectionColumns.newBuilder()
                                            .addColumns("l_orderkey")
                                            .build())
                                    .setParquet(
                                        DatafusionCommon.ParquetFormat.getDefaultInstance())
                                    .setTargetPartitions(1)
                                    .build())
                            .build())
                    .addExpr(
                        SortExprNode.newBuilder()
                            .setExpr(orderKeyCol)
                            .setAsc(true)
                            .setNullsFirst(false)
                            .build())
                    .setFetch(1)
                    .build())
            .build();

    try (DataFrame df = ctx.fromProto(plan.toByteArray());
         ArrowReader reader = df.collect(allocator)) {
        reader.loadNextBatch();
        // ...
    }
}

When to use proto plans

The sql and DataFrame APIs are the right choice for most workloads. Reach for fromProto when you need one of:

  • Cross-tool interop. Accept plans produced by another DataFusion-based system (a planner, a scheduler, a query frontend).

  • Programmatic plan construction. Build the plan node tree directly instead of going through SQL parsing. This is useful for tools that compile their own surface language to DataFusion.

  • Plan persistence. Serialize a plan to bytes, store or transmit it, and execute it later — possibly in a different process or on a different machine.
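As a rough illustration of the persistence case, the sketch below stores serialized plan bytes in a file and reads them back using only the JDK. PlanStore is a hypothetical helper, not part of the DataFusion bindings; the byte array stands in for the result of plan.toByteArray(), and the loaded bytes would be handed to ctx.fromProto(...). Because the generated classes are pinned to a specific DataFusion tag, it is safest to assume that the writing and reading sides should be built against the same version.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper (not part of the DataFusion bindings) for storing serialized plans. */
final class PlanStore {
    private PlanStore() {}

    /**
     * Writes serialized plan bytes to a file, replacing any existing content.
     * Typical input (assumed): plan.toByteArray() on a LogicalPlanNode.
     */
    static void save(Path file, byte[] planBytes) throws IOException {
        Files.write(file, planBytes);
    }

    /**
     * Reads the bytes back unchanged.
     * Typical use (assumed): ctx.fromProto(PlanStore.load(file)).
     */
    static byte[] load(Path file) throws IOException {
        return Files.readAllBytes(file);
    }
}
```

The helper treats the plan as opaque bytes, so it works equally well for a file, a message queue payload, or a database blob; only the two I/O calls would change.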

Schema conversion support

SchemaConverter.toProto and SchemaConverter.fromProto support the primitive Arrow types this project’s tests exercise: Bool, signed and unsigned integers from 8 to 64 bits, Float32, Float64, Utf8, Utf8View, LargeUtf8, Date32, and Decimal128. Any other type throws an UnsupportedOperationException that names the offending type.
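If you want to check a schema before attempting conversion, one option is a small pre-flight check against the list above. The sketch below is a hypothetical helper, not part of SchemaConverter; it works on plain type-name strings, and the exact spellings (Int8 through UInt64 for the integer types) are an assumption based on the names used in this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical pre-flight check; the type-name spellings are assumptions. */
final class SupportedTypes {
    private SupportedTypes() {}

    // Mirrors the list documented above for SchemaConverter.
    static final Set<String> SUPPORTED = Set.of(
        "Bool",
        "Int8", "Int16", "Int32", "Int64",
        "UInt8", "UInt16", "UInt32", "UInt64",
        "Float32", "Float64",
        "Utf8", "Utf8View", "LargeUtf8",
        "Date32", "Decimal128");

    /** Returns the type names that are not in the supported set, in input order. */
    static List<String> unsupported(List<String> typeNames) {
        List<String> result = new ArrayList<>();
        for (String name : typeNames) {
            if (!SUPPORTED.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }
}
```

An empty result suggests the conversion should succeed; a non-empty one lets you report every unsupported column at once rather than stopping at the first exception.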