Comet Plugin Architecture¶
Comet SQL Plugin¶
The entry point to Comet is the org.apache.spark.CometPlugin class, which can be registered with Spark by adding the following setting to the Spark configuration when launching spark-shell or spark-submit:

--conf spark.plugins=org.apache.spark.CometPlugin
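The plugin can also be registered programmatically when building a session. The following is a minimal Scala sketch, assuming the Comet jar is already on the classpath; only spark.plugins is shown, and any additional Comet settings would be added with further config calls.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: register the Comet plugin when building a SparkSession.
// Assumes the Comet jar is on the classpath; other Comet settings can be
// added with additional .config(...) calls.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("comet-example")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .getOrCreate()
```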
On initialization, this class registers two physical plan optimization rules with Spark: CometScanRule and CometExecRule. These rules run whenever a query stage is being planned during Adaptive Query Execution, and run once for the entire plan when Adaptive Query Execution is disabled.
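For readers unfamiliar with how physical rules get attached to Spark, the sketch below shows one standard mechanism: Spark's SparkSessionExtensions and ColumnarRule API, which lets rules run before and after columnar transitions are inserted. This illustrates the general pattern only; everything except the CometScanRule and CometExecRule names is hypothetical, and Comet's actual wiring may differ.

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// Illustrative only: shows how physical rules such as CometScanRule and
// CometExecRule could be injected through Spark's extension API.
class ExampleCometExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectColumnar { session =>
      new ColumnarRule {
        // Placeholder rules; a rule such as CometScanRule or CometExecRule
        // would rewrite the plan here instead of returning it unchanged.
        override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
          override def apply(plan: SparkPlan): SparkPlan = plan
        }
        override def postColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
          override def apply(plan: SparkPlan): SparkPlan = plan
        }
      }
    }
  }
}
```

An extensions class like this would normally be named in the spark.sql.extensions configuration; Comet's plugin performs its own registration on initialization, as described above.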
CometScanRule¶
CometScanRule replaces any Parquet scans with Comet operators. There are different paths for Spark v1 and v2 data sources. When reading from Parquet v1 data sources, Comet replaces FileSourceScanExec with a CometScanExec, and for v2 data sources, BatchScanExec is replaced with CometBatchScanExec. In both cases, Comet replaces Spark's Parquet reader with a custom vectorized Parquet reader. This is similar to Spark's vectorized Parquet reader used by the v2 Parquet data source, but leverages native code for decoding Parquet row groups directly into Arrow format.
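One way to confirm that the scan replacement has taken effect is to read a Parquet file and inspect the physical plan. The snippet below is a small illustration (the path is a placeholder); with Comet enabled and the data types supported, the explain output should show a Comet scan operator in place of Spark's Parquet scan.

```scala
// Read a Parquet file and inspect the physical plan to see whether the scan
// was replaced by a Comet operator (the path is a placeholder).
val df = spark.read.parquet("/path/to/data.parquet")
df.explain()
```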
Comet only supports a subset of data types and will fall back to Spark's scan if unsupported types exist. Comet can still accelerate the rest of the query execution in this case because CometSparkToColumnarExec will convert the output from Spark's scan to Arrow arrays. Note that both spark.comet.exec.enabled=true and spark.comet.convert.parquet.enabled=true must be set to enable this conversion.
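As a sketch, the two settings named above can be supplied as --conf options at launch, or set on a session as shown below, assuming these keys are modifiable on a running session.

```scala
// Enable the fallback conversion described above (sketch; assumes these
// settings can be applied to a running session).
spark.conf.set("spark.comet.exec.enabled", "true")
spark.conf.set("spark.comet.convert.parquet.enabled", "true")
```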
Refer to the Supported Spark Data Types section in the contributor guide to see a list of currently supported data types.
CometExecRule¶
This rule traverses bottom-up from the original Spark plan and attempts to replace each operator with a Comet equivalent. For example, a ProjectExec will be replaced by CometProjectExec.
When replacing a node, various checks are performed to determine whether Comet can support the operator and its expressions. If an operator, expression, or data type is not supported by Comet, the reason will be stored in a tag on the underlying Spark node and the plan will not be converted.
Comet does not support partially replacing subsets of the plan within a query stage, because this would involve adding transitions between row-based and columnar data wherever Spark operators and Comet operators meet, and the overhead of these transitions could outweigh the benefit of running parts of the query stage natively in Comet.
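The following Scala sketch illustrates the general shape of such a bottom-up rewrite: a transformUp over the physical plan that swaps supported operators for Comet equivalents and records a fallback reason in a tag otherwise. The isSupported helper and the tag name are hypothetical simplifications, not Comet's actual implementation.

```scala
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

object ExampleExecRule {
  // Hypothetical tag used to record why a node was not converted.
  val fallbackReason: TreeNodeTag[String] = TreeNodeTag[String]("comet.fallback.reason")

  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case p: ProjectExec if isSupported(p) =>
      // The real rule would build a CometProjectExec here; this sketch just
      // returns the node unchanged.
      p
    case other =>
      // Any operator this sketch does not handle is tagged with a fallback reason.
      other.setTagValue(fallbackReason, "unsupported operator or expression")
      other
  }

  // Placeholder for the various operator, expression, and data type checks.
  private def isSupported(p: ProjectExec): Boolean = true
}
```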
Query Execution¶
Once the plan has been transformed, any consecutive Comet operators are combined into a CometNativeExec, which contains a serialized version of the plan (the serialization code can be found in QueryPlanSerde). When this operator is executed, the serialized plan is passed to the native code when calling Native.createPlan.
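To make the hand-off concrete, the sketch below shows the general JNI pattern of passing a serialized plan to native code and receiving back an opaque handle. The signatures and the library name are illustrative assumptions for exposition only; they are not Comet's actual Native API.

```scala
// Illustrative JNI-style binding only; Comet's real Native class has different
// (and richer) signatures, and the library name below is a placeholder.
object ExampleNative {
  System.loadLibrary("example_native") // placeholder native library name

  // Create a native query plan from serialized plan bytes, returning an
  // opaque handle (pointer) to the native plan.
  @native def createPlan(serializedPlan: Array[Byte]): Long

  // Ask the native plan for work against the given handle; the meaning of the
  // return value is left abstract in this sketch.
  @native def executePlan(planHandle: Long): Int

  // Release the native resources owned by the plan handle.
  @native def releasePlan(planHandle: Long): Unit
}
```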
In the native code there is a PhysicalPlanner struct (in planner.rs) which converts the serialized plan into an Apache DataFusion ExecutionPlan. In some cases, Comet provides specialized physical operators and expressions to override the DataFusion versions to ensure compatibility with Apache Spark.
CometExecIterator will invoke Native.executePlan to pull the next batch from the native plan. This is repeated until no more batches are available (meaning that all data has been processed by the native plan).
The leaf nodes in the physical plan are always ScanExec, and these operators consume batches of Arrow data that were prepared before the plan is executed. When CometExecIterator invokes Native.executePlan, it passes the memory addresses of these Arrow arrays to the native code.
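A simplified sketch of this pull loop is shown below. The function parameters stand in for Comet's real CometExecIterator and Native.executePlan, whose signatures are not reproduced here; the point is only that the JVM side repeatedly asks the native plan for the next batch, handing it the memory addresses of the Arrow arrays feeding the leaf ScanExec nodes, until no more batches are available.

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

// Simplified pull-based iterator over a native plan. All native interactions
// are abstracted behind function parameters because the real Native API is
// not reproduced in this sketch.
class ExampleNativeIterator(
    planHandle: Long,
    // Returns the memory addresses of the Arrow arrays feeding the leaf
    // ScanExec nodes for the next input batch, or None when input is exhausted.
    nextInputAddresses: () => Option[Array[Long]],
    // Executes the native plan against the given input addresses and returns
    // the next output batch, or None when the native plan is finished.
    executePlan: (Long, Array[Long]) => Option[ColumnarBatch]
) extends Iterator[ColumnarBatch] {

  private var nextBatch: Option[ColumnarBatch] = None
  private var finished = false

  override def hasNext: Boolean = {
    if (nextBatch.isEmpty && !finished) {
      nextBatch = nextInputAddresses().flatMap(addrs => executePlan(planHandle, addrs))
      finished = nextBatch.isEmpty
    }
    nextBatch.isDefined
  }

  override def next(): ColumnarBatch = {
    if (!hasNext) throw new NoSuchElementException("native plan exhausted")
    val batch = nextBatch.get
    nextBatch = None
    batch
  }
}
```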
End to End Flow¶
The following diagram shows the end-to-end flow.