Installing DataFusion Comet

Prerequisites

Make sure the following requirements are met and software installed on your machine.

Supported Operating Systems

  • Linux

  • Apple OSX (Intel and Apple Silicon)

Supported Spark Versions

Comet currently supports the following versions of Apache Spark:

  • 3.3.x (Java 8/11/17, Scala 2.12/2.13)

  • 3.4.x (Java 8/11/17, Scala 2.12/2.13)

  • 3.5.x (Java 8/11/17, Scala 2.12/2.13)

Experimental support is provided for the following versions of Apache Spark and is intended for development/testing use only and should not be used in production yet.

  • 4.0.0-preview1 (Java 17/21, Scala 2.13)

Note that Comet may not fully work with proprietary forks of Apache Spark such as the Spark versions offered by Cloud Service Providers.

Using a Published JAR File

Comet jar files are available in Maven Central for amd64 and arm64 architectures for Linux. For Apple OSX, it is currently necessary to build from source.

Here are the direct links for downloading the Comet jar file.

Building from source

Refer to the Building from Source guide for instructions from building Comet from source, either from official source releases, or from the latest code in the GitHub repository.

Deploying to Kubernetes

See the Comet Kubernetes Guide guide.

Run Spark Shell with Comet enabled

Make sure SPARK_HOME points to the same Spark version as Comet was built for.

export COMET_JAR=spark/target/comet-spark-spark3.4_2.12-0.5.0-SNAPSHOT.jar

$SPARK_HOME/bin/spark-shell \
    --jars $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=16g \

Verify Comet enabled for Spark SQL query

Create a test Parquet source

scala> (0 until 10).toDF("a").write.mode("overwrite").parquet("/tmp/test")

Query the data from the test source and check:

  • INFO message shows the native Comet library has been initialized.

  • The query plan reflects Comet operators being used for this query instead of Spark ones

scala> spark.read.parquet("/tmp/test").createOrReplaceTempView("t1")
scala> spark.sql("select * from t1 where a > 5").explain
INFO src/lib.rs: Comet native library initialized
== Physical Plan ==
        *(1) ColumnarToRow
        +- CometFilter [a#14], (isnotnull(a#14) AND (a#14 > 5))
          +- CometScan parquet [a#14] Batched: true, DataFilters: [isnotnull(a#14), (a#14 > 5)],
             Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/test], PartitionFilters: [],
             PushedFilters: [IsNotNull(a), GreaterThan(a,5)], ReadSchema: struct<a:int>

With the configuration spark.comet.explainFallback.enabled=true, Comet will log any reasons that prevent a plan from being executed natively.

scala> Seq(1,2,3,4).toDF("a").write.parquet("/tmp/test.parquet")
WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively because:
  - LocalTableScan is not supported
  - WriteFiles is not supported
  - Execute InsertIntoHadoopFsRelationCommand is not supported

Additional Configuration

Depending on your deployment mode you may also need to set the driver & executor class path(s) to explicitly contain Comet otherwise Spark may use a different class-loader for the Comet components than its internal components which will then fail at runtime. For example:

--driver-class-path spark/target/comet-spark-spark3.4_2.12-0.5.0-SNAPSHOT.jar

Some cluster managers may require additional configuration, see https://spark.apache.org/docs/latest/cluster-overview.html

Memory tuning

In addition to Apache Spark memory configuration parameters, Comet introduces additional parameters to configure memory allocation for native execution. See Comet Memory Tuning for details.