Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)#

Note: Iceberg integration is a work in progress. Comet currently has two distinct Iceberg code paths: 1) a hybrid reader (native Parquet decoding, JVM for everything else) that requires building Iceberg from source rather than using the artifacts published to Maven, and 2) a fully-native reader (based on iceberg-rust). Directions for both designs are provided below.

Hybrid Reader#

Build Comet#

Run a Maven install so that Iceberg can be compiled against the latest Comet:

mvn install -DskipTests

Build the release JAR to be used from Spark:

make release

Set the COMET_JAR environment variable:

export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-0.13.0.jar

Build Iceberg#

Clone the Iceberg repository and apply the code changes needed by Comet:

git clone git@github.com:apache/iceberg.git
cd iceberg
git checkout apache-iceberg-1.8.1
git apply ../datafusion-comet/dev/diffs/iceberg/1.8.1.diff

Perform a clean build:

./gradlew clean build -x test -x integrationTest

Test#

Set the ICEBERG_JAR environment variable:

export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.9.0-SNAPSHOT.jar

Launch the Spark shell:

$SPARK_HOME/bin/spark-shell \
    --driver-class-path $COMET_JAR:$ICEBERG_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hadoop \
    --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.sql.iceberg.parquet.reader-type=COMET \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=2g \
    --conf spark.comet.use.lazyMaterialization=false \
    --conf spark.comet.schemaEvolution.enabled=true
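
Once the shell is up, a quick sanity check is to read the reader-type configuration back; it should return COMET (a minimal sketch, assuming the value set on the command line is visible via spark.conf.get):

scala> spark.conf.get("spark.sql.iceberg.parquet.reader-type")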

Create an Iceberg table. Note that Comet will not accelerate this part.

scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}")

Comet should now be able to accelerate reading the table:

scala> spark.sql(s"SELECT * from t1").show()

This should produce the following output:

scala> spark.sql(s"SELECT * from t1").show()
+---+---+
| c0| c1|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  5|  5|
|  6|  6|
|  7|  7|
|  8|  8|
|  9|  9|
| 10| 10|
| 11| 11|
| 12| 12|
| 13| 13|
| 14| 14|
| 15| 15|
| 16| 16|
| 17| 17|
| 18| 18|
| 19| 19|
+---+---+
only showing top 20 rows

Confirm that the query was accelerated by Comet:

scala> spark.sql(s"SELECT * from t1").explain()
== Physical Plan ==
*(1) CometColumnarToRow
+- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: []
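
The same check can be done programmatically by searching the executed plan for the Comet scan node (a sketch using standard Spark APIs; it should return true when the plan above is produced):

scala> spark.sql("SELECT * FROM t1").queryExecution.executedPlan.toString.contains("CometBatchScan")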

Known issues#

  • Spark runtime filtering does not currently work

    • You can work around the issue by setting either spark.sql.adaptive.enabled=false or spark.comet.exec.broadcastExchange.enabled=false (see the example after this list)
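
Either setting can also be applied from within a running session rather than at launch (a sketch, assuming both keys are runtime-settable; only one of the two is needed):

scala> spark.conf.set("spark.sql.adaptive.enabled", "false")
scala> spark.conf.set("spark.comet.exec.broadcastExchange.enabled", "false")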

Native Reader#

Comet’s fully-native Iceberg integration does not require modifying Iceberg source code. Instead, Comet relies on reflection to extract FileScanTasks from Iceberg, which are then serialized to Comet’s native execution engine (see PR #2528).

The example below uses Spark’s --packages mechanism to download Comet 0.12.0 and Iceberg 1.8.1, but Comet has been tested with Iceberg 1.5, 1.7, 1.8, and 1.10. The key configuration for enabling the fully-native Iceberg reader is spark.comet.scan.icebergNative.enabled=true. This configuration should not be combined with the hybrid reader configuration spark.sql.iceberg.parquet.reader-type=COMET described above.

$SPARK_HOME/bin/spark-shell \
    --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.12.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
    --repositories https://repo1.maven.org/maven2/ \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hadoop \
    --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.scan.icebergNative.enabled=true \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=2g
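
As with the hybrid reader, the key configuration can be read back inside the shell as a quick sanity check (a sketch, assuming the value is visible via spark.conf.get; it should return true):

scala> spark.conf.get("spark.comet.scan.icebergNative.enabled")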

The same sample queries from above can be used to test Comet’s fully-native Iceberg integration; however, the scan node to look for in the explain output is CometIcebergNativeScan.
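
For example, the plan check shown earlier can be adapted (a sketch; it assumes the t1 table from the previous example exists in the configured warehouse):

scala> spark.sql("SELECT * FROM t1").queryExecution.executedPlan.toString.contains("CometIcebergNativeScan")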

Supported features#

The native Iceberg reader supports the following features (a short example exercising several of them follows the list):

Table specifications:

  • Iceberg table spec v1 and v2 (v3 will fall back to Spark)

Schema and data types:

  • All primitive types including UUID

  • Complex types: arrays, maps, and structs

  • Schema evolution (adding and dropping columns)

Time travel and branching:

  • VERSION AS OF queries to read historical snapshots

  • Branch reads for accessing named branches

Delete handling (Merge-On-Read tables):

  • Positional deletes

  • Equality deletes

  • Mixed delete types

Filter pushdown:

  • Equality and comparison predicates (=, !=, >, >=, <, <=)

  • Logical operators (AND, OR)

  • NULL checks (IS NULL, IS NOT NULL)

  • IN and NOT IN list operations

  • BETWEEN operations

Partitioning:

  • Standard partitioning with partition pruning

  • Date partitioning with days() transform

  • Bucket partitioning

  • Truncate transform

  • Hour transform

Storage:

  • Local filesystem

  • Hadoop Distributed File System (HDFS)

  • S3-compatible storage (AWS S3, MinIO)
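
For illustration, the sketch below exercises a few of the features above on a small table. The table name t2 and its contents are hypothetical, and <snapshot_id> must be replaced with a real snapshot id from the table's history:

scala> spark.sql("CREATE TABLE IF NOT EXISTS t2 (id INT, name STRING, ts TIMESTAMP) USING iceberg PARTITIONED BY (days(ts))")
scala> spark.sql("INSERT INTO t2 VALUES (1, 'a', TIMESTAMP '2024-01-01 00:00:00'), (2, 'b', TIMESTAMP '2024-01-02 00:00:00')")
scala> spark.sql("SELECT * FROM t2 WHERE id IN (1, 2) AND name IS NOT NULL").show()  // filter pushdown
scala> spark.sql("SELECT * FROM t2 VERSION AS OF <snapshot_id>").show()  // time travel; substitute a real snapshot id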

REST Catalog#

Comet’s native Iceberg reader also supports REST catalogs. The following example shows how to configure Spark to use a REST catalog with Comet’s native Iceberg scan:

$SPARK_HOME/bin/spark-shell \
    --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.12.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
    --repositories https://repo1.maven.org/maven2/ \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
    --conf spark.sql.catalog.rest_cat=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.rest_cat.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
    --conf spark.sql.catalog.rest_cat.uri=http://localhost:8181 \
    --conf spark.sql.catalog.rest_cat.warehouse=/tmp/warehouse \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.scan.icebergNative.enabled=true \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=2g

Note that REST catalogs require explicit namespace creation before creating tables:

scala> spark.sql("CREATE NAMESPACE rest_cat.db")
scala> spark.sql("CREATE TABLE rest_cat.db.test_table (id INT, name STRING) USING iceberg")
scala> spark.sql("INSERT INTO rest_cat.db.test_table VALUES (1, 'Alice'), (2, 'Bob')")
scala> spark.sql("SELECT * FROM rest_cat.db.test_table").show()

Current limitations#

The following scenarios will fall back to Spark’s native Iceberg reader:

  • Iceberg table spec v3 scans

  • Iceberg writes (reads are accelerated, writes use Spark)

  • Tables backed by Avro or ORC data files (only Parquet is accelerated)

  • Tables partitioned on BINARY or DECIMAL (with precision >28) columns

  • Scans with residual filters using truncate, bucket, year, month, day, or hour transform functions (partition pruning still works, but row-level filtering on these transforms falls back to Spark)