Logical plans via datafusion-proto
SessionContext.fromProto(byte[]) accepts a serialized DataFusion
LogicalPlanNode and returns a lazy DataFrame. This is useful when you
already have a plan produced by another DataFusion-aware tool, or when
you want to construct the plan programmatically with finer-grained
control than the sql or DataFrame APIs.
The protobuf Java classes are generated by the build into the
org.apache.datafusion.protobuf (plan and expression nodes) and
datafusion_common (scalar values, schema, column references, file
formats) packages. The Maven build downloads pinned .proto files from
the matching upstream DataFusion tag on first build, then generates the
Java classes locally. See the Contributor Guide for how to bump the
version.
A minimal plan
The smallest interesting plan is a projection of a literal over an empty relation that produces a single row (roughly SELECT 1). It is useful as a sanity check, and it exercises serialization end-to-end without touching any storage.
```java
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.datafusion.DataFrame;
import org.apache.datafusion.SessionContext;
import org.apache.datafusion.protobuf.EmptyRelationNode;
import org.apache.datafusion.protobuf.LogicalExprNode;
import org.apache.datafusion.protobuf.LogicalPlanNode;
import org.apache.datafusion.protobuf.ProjectionNode;
import datafusion_common.DatafusionCommon;

// Projection(literal 1) over an EmptyRelation that yields exactly one row.
LogicalPlanNode plan =
    LogicalPlanNode.newBuilder()
        .setProjection(
            ProjectionNode.newBuilder()
                .setInput(
                    LogicalPlanNode.newBuilder()
                        .setEmptyRelation(
                            EmptyRelationNode.newBuilder().setProduceOneRow(true).build())
                        .build())
                .addExpr(
                    LogicalExprNode.newBuilder()
                        .setLiteral(
                            DatafusionCommon.ScalarValue.newBuilder().setInt32Value(1).build())
                        .build())
                .build())
        .build();

// Resources close in reverse order, so the allocator is released last.
try (var allocator = new RootAllocator();
     SessionContext ctx = new SessionContext();
     DataFrame df = ctx.fromProto(plan.toByteArray());
     ArrowReader reader = df.collect(allocator)) {
  reader.loadNextBatch();
  VectorSchemaRoot batch = reader.getVectorSchemaRoot();
  IntVector col = (IntVector) batch.getVector(0);
  System.out.println(col.get(0)); // 1
}
```
fromProto decodes the bytes into a logical plan, skipping SQL parsing.
From there the plan goes through the same optimization and
physical-planning pipeline as sql. The result is a lazy DataFrame that
only executes when you pull results, for example with collect.
Scanning a Parquet file via ListingTableScanNode
A ListingTableScanNode reads one or more files of the same format
from disk. Unlike registerParquet, it does not require the table to
be in the catalog — the scan node carries everything DataFusion needs:
the file paths, the schema, the projection, the file format, and the
target partition count.
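Because `paths` is a repeated string field, the generated builder should also offer `addAllPaths(Iterable<String>)`, which is standard protobuf-java code generation for repeated fields. If you want to list the files explicitly rather than pass a single path, you can collect them with plain Java. The sketch below is a hypothetical helper (`ListingPaths` is not part of this project). It gathers the regular files in one directory, without recursing, whose names end with the scan's file extension. It also sorts the result so the serialized plan is deterministic.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of datafusion-java: lists the files in one
// directory (non-recursively) whose names end with the given extension.
final class ListingPaths {
  private ListingPaths() {}

  static List<String> collect(Path dir, String fileExtension) throws IOException {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries
          .filter(Files::isRegularFile) // skip subdirectories
          .filter(p -> p.getFileName().toString().endsWith(fileExtension))
          .map(p -> p.toAbsolutePath().toString())
          .sorted() // stable order, so the plan bytes do not vary between runs
          .collect(Collectors.toList());
    }
  }
}
```

You could then pass the result to the scan builder with something like `.addAllPaths(ListingPaths.collect(dir, ".parquet")).setFileExtension(".parquet")`, keeping the extension argument and `file_extension` in agreement.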
The scan node’s schema field is a datafusion_common.Schema, not an
Arrow Schema. Convert between the two with the helper in
org.apache.datafusion.proto.SchemaConverter:
```java
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.datafusion.proto.SchemaConverter;

Schema arrow = ctx.tableSchema("lineitem");
DatafusionCommon.Schema schemaProto = SchemaConverter.toProto(arrow);
```
The full example: register the file once to introspect its schema,
then build a plan that scans the same file, sorts by l_orderkey,
and fetches the first row. This is equivalent to
SELECT l_orderkey FROM lineitem ORDER BY l_orderkey LIMIT 1.
```java
// Uses the imports from the two previous examples, plus:
import org.apache.datafusion.protobuf.BareTableReference;
import org.apache.datafusion.protobuf.ListingTableScanNode;
import org.apache.datafusion.protobuf.ProjectionColumns;
import org.apache.datafusion.protobuf.SortExprNode;
import org.apache.datafusion.protobuf.SortNode;
import org.apache.datafusion.protobuf.TableReference;

String path = "/path/to/lineitem.parquet";

try (var allocator = new RootAllocator();
     SessionContext ctx = new SessionContext()) {
  // Registered only to read the schema; the scan node carries its own paths and schema.
  ctx.registerParquet("lineitem", path);
  DatafusionCommon.Schema schemaProto =
      SchemaConverter.toProto(ctx.tableSchema("lineitem"));

  LogicalExprNode orderKeyCol =
      LogicalExprNode.newBuilder()
          .setColumn(DatafusionCommon.Column.newBuilder().setName("l_orderkey").build())
          .build();

  // Sort(l_orderkey ASC NULLS LAST, fetch 1) over a listing scan of the file.
  LogicalPlanNode plan =
      LogicalPlanNode.newBuilder()
          .setSort(
              SortNode.newBuilder()
                  .setInput(
                      LogicalPlanNode.newBuilder()
                          .setListingScan(
                              ListingTableScanNode.newBuilder()
                                  .setTableName(
                                      TableReference.newBuilder()
                                          .setBare(
                                              BareTableReference.newBuilder()
                                                  .setTable("lineitem")
                                                  .build())
                                          .build())
                                  .addPaths(path)
                                  .setFileExtension(".parquet")
                                  .setSchema(schemaProto)
                                  // Read only the l_orderkey column.
                                  .setProjection(
                                      ProjectionColumns.newBuilder()
                                          .addColumns("l_orderkey")
                                          .build())
                                  .setParquet(
                                      DatafusionCommon.ParquetFormat.getDefaultInstance())
                                  .setTargetPartitions(1)
                                  .build())
                          .build())
                  .addExpr(
                      SortExprNode.newBuilder()
                          .setExpr(orderKeyCol)
                          .setAsc(true)
                          .setNullsFirst(false)
                          .build())
                  .setFetch(1) // LIMIT 1
                  .build())
          .build();

  try (DataFrame df = ctx.fromProto(plan.toByteArray());
       ArrowReader reader = df.collect(allocator)) {
    reader.loadNextBatch();
    // ...
  }
}
```
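The SortNode above requests three things at once: ascending order (`asc = true`), nulls after every non-null value (`nulls_first = false`), and at most one row (`fetch = 1`). The plain-Java sketch below models those semantics on an in-memory list, which can help when you check what a hand-built sort node should return. The class `SortFetchModel` is hypothetical and never calls DataFusion. It only describes the expected result, not how DataFusion executes the sort.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of SortNode semantics with asc = true,
// nulls_first = false and a non-negative fetch. Illustration only.
final class SortFetchModel {
  private SortFetchModel() {}

  static List<Long> sortAndFetch(List<Long> orderKeys, long fetch) {
    return orderKeys.stream()
        // Ascending order, with nulls placed after all non-null keys.
        .sorted(Comparator.nullsLast(Comparator.<Long>naturalOrder()))
        // Keep at most `fetch` rows, as SortNode.fetch does.
        .limit(fetch)
        .collect(Collectors.toList());
  }
}
```

With `fetch = 1`, the model returns the smallest non-null key, which matches `ORDER BY l_orderkey LIMIT 1`. With a fetch larger than the input, any nulls appear at the end.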
When to use proto plans
The sql and DataFrame APIs are the right choice for most workloads.
Reach for fromProto when you need one of the following:
- Cross-tool interop. Accept plans produced by another DataFusion-based system (a planner, a scheduler, a query frontend).
- Programmatic plan construction. Build the plan node tree directly instead of going through SQL parsing. This suits tools that compile their own surface language to DataFusion.
- Plan persistence. Serialize a plan to bytes, store or transmit it, and execute it later, possibly in a different process or on a different machine.
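For persistence or transport you need to know where each plan's bytes end, because a serialized protobuf message is not self-delimiting. The sketch below is a hypothetical helper (`PlanFraming` is not part of this project). It writes each plan, for example the output of `plan.toByteArray()`, with a 4-byte big-endian length prefix. Several plans can therefore share one file or socket, and each one can be read back as a `byte[]` ready for `ctx.fromProto(...)`.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper: length-prefixed framing for serialized plans.
final class PlanFraming {
  private PlanFraming() {}

  // Writes a 4-byte big-endian length followed by the plan bytes.
  // The underlying stream is flushed but not closed.
  static void writePlan(OutputStream out, byte[] planBytes) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(planBytes.length);
    data.write(planBytes);
    data.flush();
  }

  // Reads one framed plan; the result can be passed to SessionContext.fromProto.
  static byte[] readPlan(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    int length = data.readInt(); // throws EOFException at end of stream
    if (length < 0) {
      throw new IOException("negative plan length: " + length);
    }
    byte[] planBytes = new byte[length];
    data.readFully(planBytes);
    return planBytes;
  }
}
```

If both sides have the generated classes, protobuf-java's own `writeDelimitedTo` and `LogicalPlanNode.parseDelimitedFrom` are an alternative. They use a varint length prefix, so they are not interchangeable with the fixed 4-byte prefix above.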
Schema conversion support
SchemaConverter.toProto and SchemaConverter.fromProto support the
primitive Arrow types exercised by this project’s tests: Bool, signed and
unsigned integers from 8 to 64 bits, Float32, Float64, Utf8, Utf8View,
LargeUtf8, Date32, and Decimal128. Any other type throws an
UnsupportedOperationException that names the offending type.