Comet Tuning Guide¶
Comet provides some tuning options to help you get the best performance from your queries.
Configuring Tokio Runtime¶
Comet uses a global tokio runtime per executor process using tokio’s defaults of one worker thread per core and a
maximum of 512 blocking threads. These values can be overridden using the environment variables COMET_WORKER_THREADS
and COMET_MAX_BLOCKING_THREADS
.
It is recommended that COMET_WORKER_THREADS
be set to the number of executor cores. This may not be necessary
in some environments, such as Kubernetes, where the number of cores allocated to a pod will already be equal to the
number of executor cores.
Memory Tuning¶
It is necessary to specify how much memory Comet can use in addition to memory already allocated to Spark. In some cases, it may be possible to reduce the amount of memory allocated to Spark so that overall memory allocation is the same or lower than the original configuration. In other cases, enabling Comet may require allocating more memory than before. See the Determining How Much Memory to Allocate section for more details.
Configuring Comet Memory¶
Comet shares an off-heap memory pool with Spark. The size of the pool is
specified by spark.memory.offHeap.size
.
Comet’s memory accounting isn’t 100% accurate and this can result in Comet using more memory than it reserves,
leading to out-of-memory exceptions. To work around this issue, it is possible to
set spark.comet.exec.memoryPool.fraction
to a value less than 1.0
to restrict the amount of memory that can be
reserved by Comet.
For more details about Spark off-heap memory mode, please refer to Spark documentation.
Comet implements multiple memory pool implementations. The type of pool can be specified with spark.comet.exec.memoryPool
.
The valid pool types are:
fair_unified
(default whenspark.memory.offHeap.enabled=true
is set)greedy_unified
The fair_unified
pool types prevents operators from using more than an even fraction of the available memory
(i.e. pool_size / num_reservations
). This pool works best when you know beforehand
the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even
when there is sufficient memory in order to leave enough memory for other operators.
The greedy_unified
pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
need to spill or have a single spillable operator.
Determining How Much Memory to Allocate¶
Generally, increasing the amount of memory allocated to Comet will improve query performance by reducing the amount of time spent spilling to disk, especially for aggregate, join, and shuffle operations. Allocating insufficient memory can result in out-of-memory errors. This is no different from allocating memory in Spark and the amount of memory will vary for different workloads, so some experimentation will be required.
Here is a real-world example, based on running benchmarks derived from TPC-H, running on a single executor against local Parquet files using the 100 GB data set.
Baseline Spark Performance
Spark completes the benchmark in 632 seconds with 8 cores and 8 GB RAM
With less than 8 GB RAM, performance degrades due to spilling
Spark can complete the benchmark with as little as 3 GB of RAM, but with worse performance (744 seconds)
Comet Performance
Comet requires at least 5 GB of RAM, but performance at this level is around 340 seconds, which is significantly faster than Spark with any amount of RAM
Comet running in off-heap with 8 cores completes the benchmark in 295 seconds, more than 2x faster than Spark
It is worth noting that running Comet with only 4 cores and 4 GB RAM completes the benchmark in 520 seconds, providing better performance than Spark for half the resource
It may be possible to reduce Comet’s memory overhead by reducing batch sizes or increasing number of partitions.
Optimizing Joins¶
Spark often chooses SortMergeJoin
over ShuffledHashJoin
for stability reasons. If the build-side of a
ShuffledHashJoin
is very large then it could lead to OOM in Spark.
Vectorized query engines tend to perform better with ShuffledHashJoin
, so for best performance it is often preferable
to configure Comet to convert SortMergeJoin
to ShuffledHashJoin
. Comet does not yet provide spill-to-disk for
ShuffledHashJoin
so this could result in OOM. Also, SortMergeJoin
may still be faster in some cases. It is best
to test with both for your specific workloads.
To configure Comet to convert SortMergeJoin
to ShuffledHashJoin
, set spark.comet.exec.replaceSortMergeJoin=true
.
Shuffle¶
Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries.
To enable Comet shuffle, set the following configuration in your Spark configuration:
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
spark.comet.exec.shuffle.enabled=true
spark.shuffle.manager
is a Spark static configuration which cannot be changed at runtime.
It must be set before the Spark context is created. You can enable or disable Comet shuffle
at runtime by setting spark.comet.exec.shuffle.enabled
to true
or false
.
Once it is disabled, Comet will fall back to the default Spark shuffle manager.
Shuffle Implementations¶
Comet provides two shuffle implementations: Native Shuffle and Columnar Shuffle. Comet will first try to use Native Shuffle and if that is not possible it will try to use Columnar Shuffle. If neither can be applied, it will fall back to Spark for shuffle operations.
Native Shuffle¶
Comet provides a fully native shuffle implementation, which generally provides the best performance. Native shuffle
supports HashPartitioning
, RangePartitioning
and SinglePartitioning
but currently only supports primitive type
partitioning keys. Columns that are not partitioning keys may contain complex types like maps, structs, and arrays.
Columnar (JVM) Shuffle¶
Comet Columnar shuffle is JVM-based and supports HashPartitioning
, RoundRobinPartitioning
, RangePartitioning
, and
SinglePartitioning
. This shuffle implementation supports complex data types as partitioning keys.
Shuffle Compression¶
By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
Compression can be disabled by setting spark.shuffle.compress=false
, which may result in faster shuffle times in
certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage.
Explain Plan¶
Extended Explain¶
With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists reasons why Comet may not have been enabled for specific operations. To enable this, in the Spark configuration, set the following:
-c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo
This will add a section to the detailed plan displayed in the Spark SQL UI page.