Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)#

Note: Iceberg integration is a work-in-progress.

Native Reader#

Comet’s fully-native Iceberg integration does not require modifying Iceberg source code. Instead, Comet relies on reflection to extract FileScanTasks from Iceberg, which are then serialized to Comet’s native execution engine (see PR #2528).

The example below uses Spark’s --packages mechanism to download Comet 0.14.0 and Iceberg 1.8.1 from Maven Central. Comet has been tested with Iceberg 1.5, 1.7, 1.8, 1.9, and 1.10. The key configuration for enabling the fully-native Iceberg scan is spark.comet.scan.icebergNative.enabled=true.

$SPARK_HOME/bin/spark-shell \
    --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.14.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
    --repositories https://repo1.maven.org/maven2/ \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hadoop \
    --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.scan.icebergNative.enabled=true \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=2g

Tuning#

Comet’s native Iceberg reader can fetch multiple data files in parallel to hide I/O latency, controlled by the config spark.comet.scan.icebergNative.dataFileConcurrencyLimit. The default of 1 preserves the deterministic row ordering that Iceberg’s Java test suite expects for queries without ORDER BY clauses; for production workloads we suggest a value between 2 and 8, depending on storage latency and the file sizes in your workload.
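
For example, to allow four concurrent file fetches, add one more flag to the spark-shell invocation shown above (the value 4 is illustrative; tune it for your storage):

    --conf spark.comet.scan.icebergNative.dataFileConcurrencyLimit=4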

Supported features#

The native Iceberg reader supports the following features:

Table specifications:

  • Iceberg table spec v1 and v2 (v3 will fall back to Spark)

Schema and data types:

  • All primitive types including UUID

  • Complex types: arrays, maps, and structs

  • Schema evolution (adding and dropping columns)

Time travel and branching:

  • VERSION AS OF queries to read historical snapshots

  • Branch reads for accessing named branches
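
As a sketch (table, branch, and snapshot names below are hypothetical), time travel and branch reads look like this in a spark-shell session configured as above. VERSION AS OF accepts a snapshot ID from the table’s history or, in recent Iceberg releases, a branch or tag name:

scala> spark.sql("SELECT snapshot_id FROM db.events.snapshots").show()
scala> spark.sql("SELECT * FROM db.events VERSION AS OF 8744736658442914487").show()
scala> spark.sql("SELECT * FROM db.events VERSION AS OF 'dev_branch'").show()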

Delete handling (Merge-On-Read tables):

  • Positional deletes

  • Equality deletes

  • Mixed delete types
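
A merge-on-read table can be created by setting Iceberg’s write.delete.mode table property. The sketch below (hypothetical table name) produces delete files that the native reader applies during the scan; note that the writes themselves still run through Spark, and only the final read is accelerated:

scala> spark.sql("CREATE TABLE db.mor_demo (id INT, name STRING) USING iceberg TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')")
scala> spark.sql("INSERT INTO db.mor_demo VALUES (1, 'Alice'), (2, 'Bob')")
scala> spark.sql("DELETE FROM db.mor_demo WHERE id = 1")
scala> spark.sql("SELECT * FROM db.mor_demo").show()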

Filter pushdown:

  • Equality and comparison predicates (=, !=, >, >=, <, <=)

  • Logical operators (AND, OR)

  • NULL checks (IS NULL, IS NOT NULL)

  • IN and NOT IN list operations

  • BETWEEN operations
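
Predicates such as the following can be pushed down to the native scan (the table name is hypothetical):

scala> spark.sql("SELECT * FROM db.events WHERE id BETWEEN 10 AND 20 AND (name IS NOT NULL OR id IN (1, 2, 3))").show()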

Partitioning:

  • Standard partitioning with partition pruning

  • Date partitioning with days() transform

  • Bucket partitioning

  • Truncate transform

  • Hour transform
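
For illustration, a table using the days() and bucket() transforms (table and column names are hypothetical) can be created with Iceberg’s Spark DDL; scans with matching predicates then benefit from partition pruning in the native reader:

scala> spark.sql("CREATE TABLE db.clicks (ts TIMESTAMP, uid BIGINT, url STRING) USING iceberg PARTITIONED BY (days(ts), bucket(16, uid))")
scala> spark.sql("SELECT * FROM db.clicks WHERE ts >= TIMESTAMP '2024-01-01 00:00:00'").show()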

Storage:

  • Local filesystem

  • Hadoop Distributed File System (HDFS)

  • S3-compatible storage (AWS S3, MinIO)

REST Catalog#

Comet’s native Iceberg reader also supports REST catalogs. The following example shows how to configure Spark to use a REST catalog with Comet’s native Iceberg scan:

$SPARK_HOME/bin/spark-shell \
    --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.14.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
    --repositories https://repo1.maven.org/maven2/ \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
    --conf spark.sql.catalog.rest_cat=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.rest_cat.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
    --conf spark.sql.catalog.rest_cat.uri=http://localhost:8181 \
    --conf spark.sql.catalog.rest_cat.warehouse=/tmp/warehouse \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.scan.icebergNative.enabled=true \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=2g

Note that REST catalogs require explicit namespace creation before creating tables:

scala> spark.sql("CREATE NAMESPACE rest_cat.db")
scala> spark.sql("CREATE TABLE rest_cat.db.test_table (id INT, name STRING) USING iceberg")
scala> spark.sql("INSERT INTO rest_cat.db.test_table VALUES (1, 'Alice'), (2, 'Bob')")
scala> spark.sql("SELECT * FROM rest_cat.db.test_table").show()

Current limitations#

The following scenarios fall back to the standard Iceberg Spark reader:

  • Iceberg table spec v3 scans

  • Iceberg writes (reads are accelerated, writes use Spark)

  • Tables backed by Avro or ORC data files (only Parquet is accelerated)

  • Tables partitioned on BINARY or DECIMAL (with precision >28) columns

  • Scans with residual filters using truncate, bucket, year, month, day, or hour transform functions (partition pruning still works, but row-level filtering of these transforms falls back)