Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)
Note: Iceberg integration is a work in progress. Comet currently has two distinct Iceberg code paths: 1) a hybrid reader (native Parquet decoding, JVM otherwise) that requires building Iceberg from source rather than using the artifacts available in Maven, and 2) a fully-native reader (based on iceberg-rust). Directions for both designs are provided below.
Hybrid Reader
Build Comet
Run a Maven install so that Iceberg can be compiled against the latest Comet:
mvn install -DskipTests
Build the release JAR to be used from Spark:
make release
Set the COMET_JAR environment variable:
export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-0.13.0.jar
Build Iceberg
Clone the Iceberg repository and apply the code changes needed by Comet:
git clone git@github.com:apache/iceberg.git
cd iceberg
git checkout apache-iceberg-1.8.1
git apply ../datafusion-comet/dev/diffs/iceberg/1.8.1.diff
Perform a clean build:
./gradlew clean build -x test -x integrationTest
Test
Set the ICEBERG_JAR environment variable:
export ICEBERG_JAR=`pwd`/spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.9.0-SNAPSHOT.jar
Launch Spark Shell:
$SPARK_HOME/bin/spark-shell \
--driver-class-path $COMET_JAR:$ICEBERG_JAR \
--conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
--conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.sql.iceberg.parquet.reader-type=COMET \
--conf spark.comet.explainFallback.enabled=true \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g \
--conf spark.comet.use.lazyMaterialization=false \
--conf spark.comet.schemaEvolution.enabled=true
Create an Iceberg table. Note that Comet will not accelerate this part.
scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 10000).map(i => (i, i)).mkString(",")}")
Comet should now be able to accelerate reading the table:
scala> spark.sql(s"SELECT * FROM t1").show()
This should produce the following output:
+---+---+
| c0| c1|
+---+---+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
| 10| 10|
| 11| 11|
| 12| 12|
| 13| 13|
| 14| 14|
| 15| 15|
| 16| 16|
| 17| 17|
| 18| 18|
| 19| 19|
+---+---+
only showing top 20 rows
Confirm that the query was accelerated by Comet:
scala> spark.sql(s"SELECT * from t1").explain()
== Physical Plan ==
*(1) CometColumnarToRow
+- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: []
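To verify this programmatically, a minimal sketch (it simply searches the executed plan’s string form for the Comet scan node):
scala> val plan = spark.sql(s"SELECT * FROM t1").queryExecution.executedPlan
scala> assert(plan.toString.contains("CometBatchScan"))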
Known issues
Spark Runtime Filtering isn’t working. You can bypass the issue by setting either spark.sql.adaptive.enabled=false or spark.comet.exec.broadcastExchange.enabled=false.
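For example, to apply the first workaround in an active session (a minimal sketch; the setting can equally be passed with --conf when launching the shell):
scala> spark.conf.set("spark.sql.adaptive.enabled", "false")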
Native Reader
Comet’s fully-native Iceberg integration does not require modifying Iceberg source
code. Instead, Comet relies on reflection to extract FileScanTasks from Iceberg, which are
then serialized to Comet’s native execution engine (see
PR #2528).
The example below uses Spark’s package downloader to retrieve Comet 0.12.0 and Iceberg
1.8.1, but Comet has been tested with Iceberg 1.5, 1.7, 1.8, and 1.10. The key configuration
to enable fully-native Iceberg is spark.comet.scan.icebergNative.enabled=true. This
configuration should not be used with the hybrid Iceberg configuration
spark.sql.iceberg.parquet.reader-type=COMET from above.
$SPARK_HOME/bin/spark-shell \
--packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.12.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
--repositories https://repo1.maven.org/maven2/ \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.spark_catalog.type=hadoop \
--conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
--conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.comet.scan.icebergNative.enabled=true \
--conf spark.comet.explainFallback.enabled=true \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g
The same sample queries from above can be used to test Comet’s fully-native Iceberg integration; however, the scan node to look for is CometIcebergNativeScan.
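For example (assuming the t1 table from the hybrid section exists in the same warehouse):
scala> spark.sql(s"SELECT * FROM t1").explain()
The physical plan should now contain a CometIcebergNativeScan node instead of CometBatchScan.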
Supported features
The native Iceberg reader supports the following features:
Table specifications:
Iceberg table spec v1 and v2 (v3 will fall back to Spark)
Schema and data types:
All primitive types including UUID
Complex types: arrays, maps, and structs
Schema evolution (adding and dropping columns)
Time travel and branching (see the example after this list):
VERSION AS OF queries to read historical snapshots
Branch reads for accessing named branches
Delete handling (Merge-On-Read tables):
Positional deletes
Equality deletes
Mixed delete types
Filter pushdown:
Equality and comparison predicates (=, !=, >, >=, <, <=)
Logical operators (AND, OR)
NULL checks (IS NULL, IS NOT NULL)
IN and NOT IN list operations
BETWEEN operations
Partitioning:
Standard partitioning with partition pruning
Date partitioning with the days() transform
Bucket partitioning
Truncate transform
Hour transform
Storage:
Local filesystem
Hadoop Distributed File System (HDFS)
S3-compatible storage (AWS S3, MinIO)
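As a quick illustration of the time travel and partition transform support listed above, the following sketch (the table t2, its values, and the snapshot lookup are illustrative, not from the Comet docs) creates a days()-partitioned table and reads a historical snapshot by ID:
scala> spark.sql("CREATE TABLE t2 (id INT, ts TIMESTAMP) USING iceberg PARTITIONED BY (days(ts))")
scala> spark.sql("INSERT INTO t2 VALUES (1, timestamp'2024-01-01 00:00:00'), (2, timestamp'2024-01-02 00:00:00')")
scala> val snapshotId = spark.sql("SELECT snapshot_id FROM spark_catalog.default.t2.snapshots ORDER BY committed_at DESC LIMIT 1").head.getLong(0)
scala> spark.sql(s"SELECT * FROM t2 VERSION AS OF $snapshotId").show()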
REST Catalog
Comet’s native Iceberg reader also supports REST catalogs. The following example shows how to configure Spark to use a REST catalog with Comet’s native Iceberg scan:
$SPARK_HOME/bin/spark-shell \
--packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.12.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
--repositories https://repo1.maven.org/maven2/ \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
--conf spark.sql.catalog.rest_cat=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.rest_cat.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.rest_cat.uri=http://localhost:8181 \
--conf spark.sql.catalog.rest_cat.warehouse=/tmp/warehouse \
--conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.comet.scan.icebergNative.enabled=true \
--conf spark.comet.explainFallback.enabled=true \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g
Note that REST catalogs require explicit namespace creation before creating tables:
scala> spark.sql("CREATE NAMESPACE rest_cat.db")
scala> spark.sql("CREATE TABLE rest_cat.db.test_table (id INT, name STRING) USING iceberg")
scala> spark.sql("INSERT INTO rest_cat.db.test_table VALUES (1, 'Alice'), (2, 'Bob')")
scala> spark.sql("SELECT * FROM rest_cat.db.test_table").show()
Current limitations
The following scenarios will fall back to Spark’s native Iceberg reader:
Iceberg table spec v3 scans
Iceberg writes (reads are accelerated, writes use Spark)
Tables backed by Avro or ORC data files (only Parquet is accelerated)
Tables partitioned on BINARY or DECIMAL (with precision > 28) columns
Scans with residual filters using truncate, bucket, year, month, day, or hour transform functions (partition pruning still works, but row-level filtering of these transforms falls back)
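Since writes are not accelerated, an INSERT such as the one below (reusing the REST catalog table from above) may cause Comet to log a fallback reason when spark.comet.explainFallback.enabled=true is set; the exact wording of the log output varies:
scala> spark.sql("INSERT INTO rest_cat.db.test_table VALUES (3, 'Carol')")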