Comet Tuning Guide

Comet provides some tuning options to help you get the best performance from your queries.

Memory Tuning

It is necessary to specify how much memory Comet can use in addition to memory already allocated to Spark. In some cases, it may be possible to reduce the amount of memory allocated to Spark so that overall memory allocation is the same or lower than the original configuration. In other cases, enabling Comet may require allocating more memory than before. See the Determining How Much Memory to Allocate section for more details.

Comet supports both of Spark’s memory allocation modes: on-heap (the default) and off-heap. However, we strongly recommend using off-heap mode. In on-heap mode Comet has some limitations: it requires more memory overall, and shuffle memory must be configured separately.

Configuring Comet Memory in Off-Heap Mode

The recommended way to allocate memory for Comet is to set spark.memory.offHeap.enabled=true. This allows Comet to share an off-heap memory pool with Spark, reducing the overall memory overhead. The size of the pool is specified by spark.memory.offHeap.size. For more details about Spark's off-heap memory mode, see the Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
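As a minimal sketch, the two off-heap settings above could be rendered as spark-submit flags. The helper name offheap_conf_flags and the 8g pool size are illustrative assumptions, not recommendations; only the two property names come from this guide.

```python
def offheap_conf_flags(pool_size: str) -> list[str]:
    """Render Spark's off-heap settings as spark-submit --conf flags.

    pool_size is a Spark size string such as "8g" (an example value only).
    """
    settings = {
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": pool_size,
    }
    flags: list[str] = []
    for key, value in settings.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


if __name__ == "__main__":
    # Hypothetical pool size; choose a value that suits your workload.
    print(" ".join(offheap_conf_flags("8g")))
```

The resulting flags can be appended to an existing spark-submit command line.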

Configuring Comet Memory in On-Heap Mode

When running in on-heap mode, Comet memory can be allocated by setting spark.comet.memoryOverhead. If this setting is not provided, it is calculated by multiplying the current Spark executor memory by spark.comet.memory.overhead.factor (default value is 0.2), which may or may not leave enough memory for Comet to operate. Rather than relying on this default, specify spark.comet.memoryOverhead explicitly.

Comet supports native shuffle and columnar shuffle (these terms are explained in the shuffle section below). In on-heap mode, columnar shuffle memory must be separately allocated using spark.comet.columnar.shuffle.memorySize. If this setting is not provided, it will be calculated by multiplying spark.comet.memoryOverhead by spark.comet.columnar.shuffle.memory.factor (default value is 1.0). If a shuffle exceeds this amount of memory then the query will fail.

Determining How Much Memory to Allocate

Generally, increasing the amount of memory allocated to Comet will improve query performance by reducing the amount of time spent spilling to disk, especially for aggregate, join, and shuffle operations. Allocating insufficient memory can result in out-of-memory errors. As with Spark itself, the right amount of memory varies by workload, so some experimentation will be required.

Here is a real-world example based on benchmarks derived from TPC-H, run on a single executor against local Parquet files using the 100 GB data set.

Baseline Spark Performance

  • Spark completes the benchmark in 632 seconds with 8 cores and 8 GB RAM

  • With less than 8 GB RAM, performance degrades due to spilling

  • Spark can complete the benchmark with as little as 3 GB of RAM, but with worse performance (744 seconds)

Comet Performance

  • Comet requires at least 5 GB of RAM in off-heap mode and 6 GB of RAM in on-heap mode. At this level the benchmark completes in around 340 seconds, which is significantly faster than Spark with any amount of RAM

  • Comet running in off-heap mode with 8 cores completes the benchmark in 295 seconds, more than 2x faster than Spark

  • It is worth noting that running Comet with only 4 cores and 4 GB RAM completes the benchmark in 520 seconds, providing better performance than Spark with half the resources
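The speedups implied by the timings above can be checked with a few lines of arithmetic. The timings are taken from the lists above; the function and label names are only illustrative.

```python
SPARK_BASELINE_SECONDS = 632  # Spark with 8 cores and 8 GB RAM


def speedup(baseline_seconds: float, candidate_seconds: float) -> float:
    """Return how many times faster the candidate is than the baseline."""
    return baseline_seconds / candidate_seconds


comet_runs = {
    "Comet, minimum memory": 340,
    "Comet, off-heap, 8 cores": 295,
    "Comet, 4 cores and 4 GB RAM": 520,
}

for label, seconds in comet_runs.items():
    print(f"{label}: {speedup(SPARK_BASELINE_SECONDS, seconds):.2f}x")
# Comet, minimum memory: 1.86x
# Comet, off-heap, 8 cores: 2.14x
# Comet, 4 cores and 4 GB RAM: 1.22x
```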

It may be possible to reduce Comet’s memory overhead by reducing batch sizes or increasing the number of partitions.

SortExec

Comet’s SortExec implementation spills to disk when under memory pressure, but there are some known issues in the underlying DataFusion SortExec implementation that could cause out-of-memory errors during spilling. See https://github.com/apache/datafusion/issues/14692 for more information.

Workarounds for this problem include:

  • Allocating more off-heap memory

  • Disabling native sort by setting spark.comet.exec.sort.enabled=false

Advanced Memory Tuning

Configuring spark.executor.memoryOverhead in On-Heap Mode

In some environments, such as Kubernetes and YARN, it is important to correctly set spark.executor.memoryOverhead so that it is possible to allocate off-heap memory when running in on-heap mode.

Comet will automatically set spark.executor.memoryOverhead based on the spark.comet.memory* settings so that resource managers respect Apache Spark memory configuration before starting the containers.

Configuring Off-Heap Memory Pools

Comet provides multiple memory pool implementations. The type of pool can be specified with spark.comet.exec.memoryPool.

The valid pool types are:

  • unified (default when spark.memory.offHeap.enabled=true is set)

  • fair_unified

The unified pool type implements a greedy first-come, first-served limit. This pool works well for queries that do not need to spill or that have a single spillable operator. The size of the pool is specified by spark.memory.offHeap.size and the pool interacts with Spark’s memory pool, effectively sharing the off-heap memory between Spark and Comet. This approach is sometimes referred to as unified memory management.

The fair_unified pool type prevents operators from using more than an even fraction of the available memory (i.e. pool_size / num_reservations). This pool works best when you know beforehand that the query has multiple operators that will likely all need to spill. Because it holds memory back for other operators, it can sometimes cause spills even when sufficient memory is available.

The pool size configuration for the fair_unified pool is a little more complex. The total pool size is computed by multiplying spark.memory.offHeap.size by spark.comet.memory.overhead.factor, with the minimum amount being spark.comet.memory.overhead.min. It is also possible to manually specify spark.comet.memoryOverhead instead to override this default behavior. Note that the fair_unified pool does not use unified memory management to interact with Spark’s memory pools, which is why the allocation defaults to a fraction of off-heap memory.

Configuring On-Heap Memory Pools

When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.

The type of pool can be specified with spark.comet.exec.memoryPool. The default setting is greedy_task_shared.

The valid pool types are:

  • greedy

  • greedy_global

  • greedy_task_shared

  • fair_spill

  • fair_spill_global

  • fair_spill_task_shared

  • unbounded

Pool types ending with _global use a single global memory pool shared by all tasks on the same executor.

Pool types ending with _task_shared share a single memory pool across all attempts for a single task.

Other pool types create a dedicated pool per native query plan, using a fraction of the available pool size based on the number of cores and cores per task.

The greedy* pool types use DataFusion’s GreedyMemoryPool, which implements a greedy first-come, first-served limit. This pool works well for queries that do not need to spill or that have a single spillable operator.

The fair_spill* pool types use DataFusion’s FairSpillPool, which prevents spillable reservations from using more than an even fraction of the memory that remains after unspillable reservations are subtracted (i.e. (pool_size - unspillable_memory) / num_spillable_reservations). This pool works best when you know beforehand that the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even when there was sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in a first-come, first-served fashion.
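The per-reservation limit in the formula above can be worked through with a small sketch. This models only the arithmetic stated in the text, not DataFusion's FairSpillPool itself; the function name is hypothetical, and clamping the remaining memory at zero is a defensive choice made for this example.

```python
def fair_spill_limit(
    pool_size: int,
    unspillable_memory: int,
    num_spillable_reservations: int,
) -> float:
    """Return the memory limit for each spillable reservation.

    Implements (pool_size - unspillable_memory) / num_spillable_reservations.
    """
    if num_spillable_reservations <= 0:
        raise ValueError("num_spillable_reservations must be positive")
    # Assumption for this sketch: never report a negative amount of memory.
    remaining = max(pool_size - unspillable_memory, 0)
    return remaining / num_spillable_reservations


if __name__ == "__main__":
    # 1000 MB pool, 200 MB held by unspillable reservations,
    # 4 spillable operators sharing the rest evenly.
    print(fair_spill_limit(1000, 200, 4))
```

In this example each spillable operator is limited to 200 MB, so a single operator that needs 300 MB will spill even if the other three are using very little.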

The unbounded pool type uses DataFusion’s UnboundedMemoryPool, which enforces no limit. This option is useful for development and testing, where you would rather let a job fail than allow it to spill. Because spilling significantly slows down a job, this option is also one way to measure best-case performance without tuning how much memory to allocate.

Optimizing Joins

Spark often chooses SortMergeJoin over ShuffledHashJoin for stability reasons. If the build side of a ShuffledHashJoin is very large, it can lead to out-of-memory (OOM) errors in Spark.

Vectorized query engines tend to perform better with ShuffledHashJoin, so for best performance it is often preferable to configure Comet to convert SortMergeJoin to ShuffledHashJoin. Comet does not yet provide spill-to-disk for ShuffledHashJoin so this could result in OOM. Also, SortMergeJoin may still be faster in some cases. It is best to test with both for your specific workloads.

To configure Comet to convert SortMergeJoin to ShuffledHashJoin, set spark.comet.exec.replaceSortMergeJoin=true.

Shuffle

Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries.

To enable Comet shuffle, set the following configuration in your Spark configuration:

spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
spark.comet.exec.shuffle.enabled=true

spark.shuffle.manager is a Spark static configuration which cannot be changed at runtime. It must be set before the Spark context is created. You can enable or disable Comet shuffle at runtime by setting spark.comet.exec.shuffle.enabled to true or false. Once it is disabled, Comet will fall back to the default Spark shuffle manager.
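The distinction between the static and runtime settings can be sketched by separating the two kinds of keys before launching a job. The helper name split_by_mutability is hypothetical, and only spark.shuffle.manager is treated as static here because it is the only key this guide identifies as such.

```python
# Keys that must be set before the Spark context is created.
STATIC_KEYS = {"spark.shuffle.manager"}

COMET_SHUFFLE_CONF = {
    "spark.shuffle.manager":
        "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
    "spark.comet.exec.shuffle.enabled": "true",
}


def split_by_mutability(
    conf: dict[str, str],
) -> tuple[dict[str, str], dict[str, str]]:
    """Split settings into (static, runtime-changeable) dictionaries."""
    static = {k: v for k, v in conf.items() if k in STATIC_KEYS}
    runtime = {k: v for k, v in conf.items() if k not in STATIC_KEYS}
    return static, runtime


if __name__ == "__main__":
    static, runtime = split_by_mutability(COMET_SHUFFLE_CONF)
    print("set at launch:", static)
    print("may be toggled at runtime:", runtime)
```

The static part belongs in the launch configuration, while the runtime part can be toggled within a running session.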

Shuffle Implementations

Comet provides two shuffle implementations: Native Shuffle and Columnar Shuffle. Comet will first try to use Native Shuffle and if that is not possible it will try to use Columnar Shuffle. If neither can be applied, it will fall back to Spark for shuffle operations.

Native Shuffle

Comet provides a fully native shuffle implementation, which generally provides the best performance. However, native shuffle currently only supports HashPartitioning and SinglePartitioning and has some restrictions on supported data types.

Columnar (JVM) Shuffle

Comet’s columnar shuffle is JVM-based and supports HashPartitioning, RoundRobinPartitioning, RangePartitioning, and SinglePartitioning. This shuffle implementation supports more data types than native shuffle.
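The fallback order described in this section can be sketched as a small decision function. This is a simplified model, not Comet's planner logic: the function name and the two boolean flags standing in for data type checks are assumptions, and the real checks may consider more conditions than partitioning and data types.

```python
# Partitioning schemes listed in this guide for each implementation.
NATIVE_PARTITIONINGS = {"HashPartitioning", "SinglePartitioning"}
COLUMNAR_PARTITIONINGS = {
    "HashPartitioning",
    "RoundRobinPartitioning",
    "RangePartitioning",
    "SinglePartitioning",
}


def choose_shuffle(
    partitioning: str,
    native_types_ok: bool = True,    # hypothetical flag: data types supported natively
    columnar_types_ok: bool = True,  # hypothetical flag: data types supported by columnar
) -> str:
    """Return "native", "columnar", or "spark" following the fallback order."""
    if partitioning in NATIVE_PARTITIONINGS and native_types_ok:
        return "native"
    if partitioning in COLUMNAR_PARTITIONINGS and columnar_types_ok:
        return "columnar"
    return "spark"


if __name__ == "__main__":
    print(choose_shuffle("HashPartitioning"))
    print(choose_shuffle("RangePartitioning"))
    print(choose_shuffle("HashPartitioning", native_types_ok=False))
```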

Shuffle Compression

By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression. Compression can be disabled by setting spark.shuffle.compress=false, which may result in faster shuffle times in certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage.

Explain Plan

Extended Explain

With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists reasons why Comet may not have been enabled for specific operations. To enable this, in the Spark configuration, set the following:

-c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo

This will add a section to the detailed plan displayed in the Spark SQL UI page.