Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)

Note: Iceberg integration is a work in progress. It is currently necessary to build Iceberg from source rather than using the artifacts available in Maven.

Build Comet

From the Comet repository root, run a Maven install so that Iceberg can be compiled against the latest Comet:

mvn install -DskipTests

Build the release JAR to be used from Spark:

make release

Set the COMET_JAR environment variable:

export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-0.10.0-SNAPSHOT.jar

Build Iceberg

Clone the Iceberg repository and apply the code changes needed by Comet:

git clone git@github.com:apache/iceberg.git
cd iceberg
git checkout apache-iceberg-1.8.1
git apply ../datafusion-comet/dev/diffs/iceberg/1.8.1.diff

Perform a clean build:

./gradlew clean
./gradlew build -x test -x integrationTest

Test

Set the ICEBERG_JAR environment variable:

export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.9.0-SNAPSHOT.jar

Launch the Spark shell:

$SPARK_HOME/bin/spark-shell \
    --jars $COMET_JAR,$ICEBERG_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hadoop \
    --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.sql.iceberg.parquet.reader-type=COMET \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=2g
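
Before creating any tables, it can be worth sanity-checking that the shell picked up the configuration. A minimal check, assuming the exact spark-shell invocation above:

scala> spark.sparkContext.getConf.get("spark.plugins")                         // org.apache.spark.CometPlugin
scala> spark.sparkContext.getConf.get("spark.sql.iceberg.parquet.reader-type") // COMET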

Create an Iceberg table. Note that Comet will not accelerate this part.

scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}")

Comet should now be able to accelerate reading the table. The following query should produce the output below:

scala> spark.sql("SELECT * FROM t1").show()
+---+---+
| c0| c1|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  5|
|  6|  6|
|  7|  7|
|  8|  8|
|  9|  9|
| 10| 10|
| 11| 11|
| 12| 12|
| 13| 13|
| 14| 14|
| 15| 15|
| 16| 16|
| 17| 17|
| 18| 18|
| 19| 19|
+---+---+
only showing top 20 rows

Confirm that the query was accelerated by Comet:

scala> spark.sql(s"SELECT * from t1").explain()
== Physical Plan ==
*(1) CometColumnarToRow
+- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: []
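
Operators prefixed with Comet (such as CometBatchScan above) are executed natively. Because spark.comet.explainFallback.enabled is set, operators that fall back to Spark are also logged together with the reason for the fallback. As a further experiment, explaining a query with an aggregation shows how much of the plan beyond the scan Comet can take over; the exact plan depends on the Comet and Iceberg versions in use:

scala> spark.sql("SELECT c1, COUNT(*) FROM t1 GROUP BY c1").explain()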