Adding a New Operator#
This guide explains how to add support for a new Spark physical operator in Apache DataFusion Comet.
Overview#
CometExecRule is responsible for replacing Spark operators with Comet operators. There are different approaches to
implementing Comet operators depending on where they execute and how they integrate with the native execution engine.
Types of Comet Operators#
CometExecRule maintains two distinct maps of operators:
1. Native Operators (nativeExecs map)#
These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. Native
operators are registered in the nativeExecs map in CometExecRule.scala.
Key characteristics of native operators:
They are converted to their corresponding native protobuf representation
They execute as DataFusion operators in the native engine
The CometOperatorSerde implementation handles enable/disable checks, support validation, and protobuf serialization
Examples: ProjectExec, FilterExec, SortExec, HashAggregateExec, SortMergeJoinExec, ExpandExec, WindowExec
2. Sink Operators (sinks map)#
Sink operators serve as entry points (data sources) for native execution blocks. They are registered in the sinks
map in CometExecRule.scala.
Key characteristics of sinks:
They become ScanExec operators in the native plan (see operator2Proto in CometExecRule.scala)
They can be leaf nodes that feed data into native execution blocks
They are wrapped with CometScanWrapper or CometSinkPlaceHolder during plan transformation
Examples include operators that bring data from various sources into native execution
Examples: UnionExec, CoalesceExec, CollectLimitExec, TakeOrderedAndProjectExec
Special sinks (not in the sinks map but also treated as sinks):
CometScanExec - File scans
CometSparkToColumnarExec - Conversion from Spark row format
ShuffleExchangeExec / BroadcastExchangeExec - Exchange operators
3. Comet JVM Operators#
These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen
in CometExecRule rather than using CometOperatorSerde, because they don’t need protobuf serialization.
Examples: CometBroadcastExchangeExec, CometShuffleExchangeExec
Choosing the Right Operator Type#
When adding a new operator, choose based on these criteria:
Use Native Operators when:
The operator transforms data (e.g., project, filter, sort, aggregate, join)
The operator has a direct DataFusion equivalent or custom implementation
The operator consumes native child operators and produces native output
The operator is in the middle of an execution pipeline
Use Sink Operators when:
The operator serves as a data source for native execution (becomes a ScanExec)
The operator brings data from non-native sources (e.g., UnionExec combining multiple inputs)
The operator is typically a leaf or near-leaf node in the execution tree
The operator needs special handling to interface with the native engine
Implementation Note for Sinks:
Sink operators are handled specially in CometExecRule.operator2Proto. Instead of converting to their own operator
type, they are converted to ScanExec in the native plan. This allows them to serve as entry points for native
execution blocks. The original Spark operator is wrapped with CometScanWrapper or CometSinkPlaceHolder, which manages the boundary between JVM and native execution.
Implementing a Native Operator#
This section focuses on adding a native operator, which is the most common and complex case.
Step 1: Define the Protobuf Message#
First, add the operator definition to native/proto/src/proto/operator.proto.
Add to the Operator Message#
Add your new operator to the oneof op_struct in the main Operator message:
message Operator {
repeated Operator children = 1;
uint32 plan_id = 2;
oneof op_struct {
Scan scan = 100;
Projection projection = 101;
Filter filter = 102;
// ... existing operators ...
YourNewOperator your_new_operator = 112; // Choose next available number
}
}
Define the Operator Message#
Create a message for your operator with the necessary fields:
message YourNewOperator {
// Fields specific to your operator
repeated spark.spark_expression.Expr expressions = 1;
// Add other configuration fields as needed
}
For reference, see existing operators like Filter (simple), HashAggregate (complex), or Sort (with ordering).
Step 2: Create a CometOperatorSerde Implementation#
Create a new Scala file in spark/src/main/scala/org/apache/comet/serde/operator/ (e.g., CometYourOperator.scala) that extends CometOperatorSerde[T] where T is the Spark operator type.
The CometOperatorSerde trait provides several key methods (a sketch of the trait's shape follows this list):
enabledConfig: Option[ConfigEntry[Boolean]] - Configuration to enable/disable this operator
getSupportLevel(operator: T): SupportLevel - Determines if the operator is supported
convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator] - Converts to protobuf
createExec(nativeOp: Operator, op: T): CometNativeExec - Creates the Comet execution operator wrapper
The validation workflow in CometExecRule.isOperatorEnabled (sketched in pseudocode after this list):
Checks if the operator is enabled via enabledConfig
Calls getSupportLevel() to determine compatibility
Handles Compatible/Incompatible/Unsupported cases with appropriate fallback messages
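Roughly, this workflow can be pictured as the following pseudocode. This is an illustrative sketch, not the actual CometExecRule implementation: it assumes ConfigEntry exposes a get() accessor and that the support levels carry an optional notes message, and it omits the explicit opt-in handling for incompatible operators.
// Illustrative pseudocode only; see CometExecRule.isOperatorEnabled for the real logic.
def isOperatorEnabled[T <: SparkPlan](serde: CometOperatorSerde[T], op: T): Boolean = {
  // 1. Respect the operator's enable/disable config, when one is defined
  if (!serde.enabledConfig.forall(_.get())) {
    withInfo(op, s"${op.nodeName} is disabled by configuration")
    return false
  }
  // 2. Ask the serde how well the operator is supported
  serde.getSupportLevel(op) match {
    case Compatible(_) => true
    case Incompatible(notes) =>
      // Incompatible operators require explicit opt-in; otherwise fall back to Spark
      withInfo(op, notes.getOrElse("operator is incompatible with Spark"))
      false
    case Unsupported(notes) =>
      withInfo(op, notes.getOrElse("operator is not supported"))
      false
  }
}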
Simple Example (Filter)#
object CometFilterExec extends CometOperatorSerde[FilterExec] {
override def enabledConfig: Option[ConfigEntry[Boolean]] =
Some(CometConf.COMET_EXEC_FILTER_ENABLED)
override def convert(
op: FilterExec,
builder: Operator.Builder,
childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
val cond = exprToProto(op.condition, op.child.output)
if (cond.isDefined && childOp.nonEmpty) {
val filterBuilder = OperatorOuterClass.Filter
.newBuilder()
.setPredicate(cond.get)
Some(builder.setFilter(filterBuilder).build())
} else {
withInfo(op, op.condition, op.child)
None
}
}
override def createExec(nativeOp: Operator, op: FilterExec): CometNativeExec = {
CometFilterExec(nativeOp, op, op.output, op.condition, op.child, SerializedPlan(None))
}
}
case class CometFilterExec(
override val nativeOp: Operator,
override val originalPlan: SparkPlan,
override val output: Seq[Attribute],
condition: Expression,
child: SparkPlan,
override val serializedPlanOpt: SerializedPlan)
extends CometUnaryExec {
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
this.copy(child = newChild)
}
More Complex Example (Project)#
object CometProjectExec extends CometOperatorSerde[ProjectExec] {
override def enabledConfig: Option[ConfigEntry[Boolean]] =
Some(CometConf.COMET_EXEC_PROJECT_ENABLED)
override def convert(
op: ProjectExec,
builder: Operator.Builder,
childOp: Operator*): Option[OperatorOuterClass.Operator] = {
val exprs = op.projectList.map(exprToProto(_, op.child.output))
if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
val projectBuilder = OperatorOuterClass.Projection
.newBuilder()
.addAllProjectList(exprs.map(_.get).asJava)
Some(builder.setProjection(projectBuilder).build())
} else {
withInfo(op, op.projectList: _*)
None
}
}
override def createExec(nativeOp: Operator, op: ProjectExec): CometNativeExec = {
CometProjectExec(nativeOp, op, op.output, op.projectList, op.child, SerializedPlan(None))
}
}
case class CometProjectExec(
override val nativeOp: Operator,
override val originalPlan: SparkPlan,
override val output: Seq[Attribute],
projectList: Seq[NamedExpression],
child: SparkPlan,
override val serializedPlanOpt: SerializedPlan)
extends CometUnaryExec
with PartitioningPreservingUnaryExecNode {
override def producedAttributes: AttributeSet = outputSet
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
this.copy(child = newChild)
}
Using getSupportLevel#
Override getSupportLevel to control operator support based on specific conditions:
override def getSupportLevel(operator: YourOperatorExec): SupportLevel = {
// Check for unsupported features
if (operator.hasUnsupportedFeature) {
return Unsupported(Some("Feature X is not supported"))
}
// Check for incompatible behavior
if (operator.hasKnownDifferences) {
return Incompatible(Some("Known differences in edge case Y"))
}
Compatible()
}
Support levels:
Compatible() - Fully compatible with Spark (default)
Incompatible() - Supported but may differ; requires explicit opt-in
Unsupported() - Not supported under current conditions
Note that Comet will treat an operator as incompatible if any of the child expressions are incompatible.
Step 3: Register the Operator#
Add your operator to the appropriate map in CometExecRule.scala:
For Native Operators#
Add to the nativeExecs map (CometExecRule.scala):
val nativeExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
Map(
classOf[ProjectExec] -> CometProjectExec,
classOf[FilterExec] -> CometFilterExec,
// ... existing operators ...
classOf[YourOperatorExec] -> CometYourOperator,
)
For Sink Operators#
If your operator is a sink (becomes a ScanExec in the native plan), add to the sinks map (CometExecRule.scala):
val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
Map(
classOf[CoalesceExec] -> CometCoalesceExec,
classOf[UnionExec] -> CometUnionExec,
// ... existing operators ...
classOf[YourSinkOperatorExec] -> CometYourSinkOperator,
)
Note: The allExecs map automatically combines both nativeExecs and sinks, so you only need to add to one of the two maps.
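For reference, the combined map is conceptually just the union of the two registries, along the lines of the sketch below (the exact definition lives in CometExecRule.scala and may differ):
// Conceptual shape only; see CometExecRule.scala for the actual definition
val allExecs: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = nativeExecs ++ sinks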
Step 4: Add Configuration Entry#
Add a configuration entry in common/src/main/scala/org/apache/comet/CometConf.scala:
val COMET_EXEC_YOUR_OPERATOR_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.yourOperator.enabled")
.doc("Whether to enable your operator in Comet")
.booleanConf
.createWithDefault(true)
Run make to update the user guide. The new configuration option will be added to docs/source/user-guide/latest/configs.md.
Step 5: Implement the Native Operator in Rust#
Update the Planner#
In native/core/src/execution/planner.rs, add a match case in the operator deserialization logic to handle your new protobuf message:
use datafusion_comet_proto::spark_operator::operator::OpStruct;
// In the create_plan or similar method:
match op.op_struct.as_ref() {
Some(OpStruct::Scan(scan)) => {
// ... existing cases ...
}
Some(OpStruct::YourNewOperator(your_op)) => {
create_your_operator_exec(your_op, children, session_ctx)
}
// ... other cases ...
}
Implement the Operator#
Create the operator implementation, either in an existing file or a new file in native/core/src/execution/operators/:
use datafusion::physical_plan::{ExecutionPlan, ...};
use datafusion_comet_proto::spark_operator::YourNewOperator;
pub fn create_your_operator_exec(
op: &YourNewOperator,
children: Vec<Arc<dyn ExecutionPlan>>,
session_ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
// Deserialize expressions and configuration
// Create and return the execution plan
// Option 1: Use existing DataFusion operator
// Ok(Arc::new(SomeDataFusionExec::try_new(...)?))
// Option 2: Implement custom operator (see ExpandExec for example)
// Ok(Arc::new(YourCustomExec::new(...)))
}
For custom operators, you’ll need to implement the ExecutionPlan trait. See native/core/src/execution/operators/expand.rs or scan.rs for examples.
Step 6: Add Tests#
Scala Integration Tests#
Add tests in spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala or a related test suite:
test("your operator") {
withTable("test_table") {
sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")
// Test query that uses your operator
checkSparkAnswerAndOperator(
"SELECT * FROM test_table WHERE col1 > 1"
)
}
}
The checkSparkAnswerAndOperator helper verifies:
Results match the answer produced by Spark’s own (non-Comet) execution
Your operator is actually being used (not falling back)
Rust Unit Tests#
Add unit tests in your Rust implementation file:
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_your_operator() {
// Test operator creation and execution
}
}
Step 7: Update Documentation#
Add your operator to the supported operators list in docs/source/user-guide/latest/compatibility.md or similar documentation.
Implementing a Sink Operator#
Sink operators are converted to ScanExec in the native plan and serve as entry points for native execution. The implementation is simpler than for native operators because sink operators extend the CometSink base class, which provides the conversion logic.
Step 1: Create a CometOperatorSerde Implementation#
Create a new Scala file in spark/src/main/scala/org/apache/spark/sql/comet/ (e.g., CometYourSinkOperator.scala):
import org.apache.comet.serde.operator.CometSink
object CometYourSinkOperator extends CometSink[YourSinkExec] {
override def enabledConfig: Option[ConfigEntry[Boolean]] =
Some(CometConf.COMET_EXEC_YOUR_SINK_ENABLED)
// Optional: Override if the data produced is FFI safe
override def isFfiSafe: Boolean = false
override def createExec(
nativeOp: OperatorOuterClass.Operator,
op: YourSinkExec): CometNativeExec = {
CometSinkPlaceHolder(
nativeOp,
op,
CometYourSinkExec(op, op.output, /* other parameters */, op.child))
}
// Optional: Override getSupportLevel if you need custom validation beyond data types
override def getSupportLevel(operator: YourSinkExec): SupportLevel = {
// CometSink base class already checks data types in convert()
// Add any additional validation here
Compatible()
}
}
/**
* Comet implementation of YourSinkExec that supports columnar processing
*/
case class CometYourSinkExec(
override val originalPlan: SparkPlan,
override val output: Seq[Attribute],
/* other parameters */,
child: SparkPlan)
extends CometExec
with UnaryExecNode {
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
// Implement columnar execution logic
val rdd = child.executeColumnar()
// Apply your sink operator's logic
rdd
}
override def outputPartitioning: Partitioning = {
// Define output partitioning
}
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
this.copy(child = newChild)
}
Key Points:
Extend CometSink[T], which provides the convert() method that transforms the operator to ScanExec
The CometSink.convert() method (in CometSink.scala) automatically handles:
Data type validation
Conversion to ScanExec in the native plan
Setting FFI safety flags
You must implement createExec() to wrap the operator appropriately
You typically need to create a corresponding CometYourSinkExec class that implements columnar execution
Step 2: Register the Sink#
Add your sink to the sinks map in CometExecRule.scala:
val sinks: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
Map(
classOf[CoalesceExec] -> CometCoalesceExec,
classOf[UnionExec] -> CometUnionExec,
classOf[YourSinkExec] -> CometYourSinkOperator,
)
Step 3: Add Configuration#
Add a configuration entry in CometConf.scala:
val COMET_EXEC_YOUR_SINK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.yourSink.enabled")
.doc("Whether to enable your sink operator in Comet")
.booleanConf
.createWithDefault(true)
Step 4: Add Tests#
Test that your sink operator correctly feeds data into native execution:
test("your sink operator") {
withTable("test_table") {
sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")
// Test query that uses your sink operator followed by native operators
checkSparkAnswerAndOperator(
"SELECT col1 + 1 FROM (/* query that produces YourSinkExec */)"
)
}
}
Important Notes for Sinks:
Sinks extend the CometSink base class, which provides the convert() method implementation
The CometSink.convert() method automatically handles conversion to ScanExec in the native plan
You don’t need to add protobuf definitions for sink operators - they use the standard Scan message
You don’t need a Rust implementation for sinks - they become standard ScanExec operators that read from the JVM
Sink implementations should provide a columnar-compatible execution class (e.g., CometCoalesceExec)
The createExec() method wraps the operator with CometSinkPlaceHolder to manage the JVM-to-native boundary
See CometCoalesceExec.scala or CometUnionExec in spark/src/main/scala/org/apache/spark/sql/comet/ for reference implementations
Implementing a JVM Operator#
For operators that run in the JVM:
Create a new operator class extending appropriate Spark base classes in spark/src/main/scala/org/apache/comet/
Add matching logic in CometExecRule.scala to transform the Spark operator
No protobuf or Rust implementation needed
Example pattern from CometExecRule.scala:
case s: ShuffleExchangeExec if nativeShuffleSupported(s) =>
CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
Common Patterns and Helpers#
Expression Conversion#
Use QueryPlanSerde.exprToProto to convert Spark expressions to protobuf:
val protoExpr = exprToProto(sparkExpr, inputSchema)
Handling Fallback#
Use withInfo to tag operators with fallback reasons:
if (!canConvert) {
withInfo(op, "Reason for fallback", childNodes: _*)
return None
}
Child Operator Validation#
Always check that child operators were successfully converted:
if (childOp.isEmpty) {
// Cannot convert if children failed
return None
}
Debugging Tips#
Enable verbose logging: Set spark.comet.explain.format=verbose to see detailed plan transformations
Check fallback reasons: Set spark.comet.logFallbackReasons.enabled=true to log why operators fall back to Spark
Verify protobuf: Add debug prints in Rust to inspect deserialized operators
Use EXPLAIN: Run EXPLAIN EXTENDED on queries to see the physical plan (see the snippet after this list)
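As a concrete starting point, the snippet below shows one way to turn these options on from a Spark session. The configuration keys are the ones listed above; the session setup and the query are only illustrative and assume the test_table created earlier in this guide.
// Minimal sketch: enable Comet debugging output for an interactive session.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
// Print verbose Comet plan transformation details
spark.conf.set("spark.comet.explain.format", "verbose")
// Log the reason whenever an operator falls back to Spark
spark.conf.set("spark.comet.logFallbackReasons.enabled", "true")
// Inspect the physical plan for a query that should use your operator
spark.sql("EXPLAIN EXTENDED SELECT * FROM test_table WHERE col1 > 1").show(truncate = false)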