Tuning Guide¶
Comet provides some tuning options to help you get the best performance from your queries.
Memory Tuning¶
Unified Memory Management with Off-Heap Memory¶
The recommended way to share memory between Spark and Comet is to set spark.memory.offHeap.enabled=true. This allows Comet to share an off-heap memory pool with Spark. The size of the pool is specified by spark.memory.offHeap.size. For more details about Spark's off-heap memory mode, refer to the Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
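For illustration, off-heap memory could be enabled when submitting a job like this (the 16g size is an arbitrary example; size the pool for your workload):
spark-submit \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=16g \
    ...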
Dedicated Comet Memory Pools¶
Spark uses on-heap memory mode by default, i.e., the spark.memory.offHeap.enabled setting is not enabled. When Spark runs in on-heap mode, Comet uses its own dedicated memory pools that are not shared with Spark. This requires additional configuration settings to specify the size and type of the memory pool to use.
The size of the pool can be set explicitly with spark.comet.memoryOverhead. If this setting is not specified, the memory overhead will be calculated by multiplying the executor memory by spark.comet.memory.overhead.factor (which defaults to 0.2).
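For example, with spark.executor.memory=8g (an illustrative value) and the default factor of 0.2, Comet would use a memory overhead of 8g × 0.2 = 1.6g.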
The type of pool can be specified with spark.comet.exec.memoryPool. The default setting is greedy_task_shared.
The valid pool types are:
greedy
greedy_global
greedy_task_shared
fair_spill
fair_spill_global
fair_spill_task_shared
Pool types ending with _global use a single global memory pool shared by all tasks on the same executor.
Pool types ending with _task_shared share a single memory pool across all attempts for a single task.
Other pool types create a dedicated pool per native query plan, using a fraction of the available pool size based on the number of cores and the number of cores per task.
The greedy* pool types use DataFusion's GreedyMemoryPool, which implements a greedy first-come, first-served limit. This pool works well for queries that do not need to spill or that have a single spillable operator.
The fair_spill* pool types use DataFusion's FairSpillPool, which prevents spillable reservations from using more than an even fraction of the available memory after subtracting any unspillable reservations (i.e., (pool_size - unspillable_memory) / num_spillable_reservations). This pool works best when you know beforehand that the query has multiple spillable operators that will likely all need to spill. It can sometimes cause spills even when there was sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in a first-come, first-served fashion.
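As a sketch, a dedicated pool could be configured like this (the 4g size and the fair_spill_task_shared choice are example values, not recommendations):
spark.comet.memoryOverhead=4g
spark.comet.exec.memoryPool=fair_spill_task_shared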
Determining How Much Memory to Allocate¶
Generally, increasing memory overhead will improve query performance, especially for queries containing joins and aggregates.
Once a memory pool is exhausted, the native plan will start spilling to disk, which will slow down the query.
Insufficient memory allocation can also lead to out-of-memory (OOM) errors.
Configuring spark.executor.memoryOverhead¶
In some environments, such as Kubernetes and YARN, it is important to correctly set spark.executor.memoryOverhead
so
that it is possible to allocate off-heap memory.
Comet will automatically set spark.executor.memoryOverhead based on the spark.comet.memory* settings so that resource managers respect Apache Spark's memory configuration before starting the containers.
Note that there is currently a known issue where this will be inaccurate when using Native Memory Management because it does not take executor concurrency into account. The tracking issue for this is https://github.com/apache/datafusion-comet/issues/949.
Optimizing Joins¶
Spark often chooses SortMergeJoin over ShuffledHashJoin for stability reasons: if the build side of a ShuffledHashJoin is very large, it could lead to OOM in Spark.
Vectorized query engines tend to perform better with ShuffledHashJoin, so for best performance it is often preferable to configure Comet to convert SortMergeJoin to ShuffledHashJoin. However, Comet does not yet provide spill-to-disk for ShuffledHashJoin, so this conversion could itself result in OOM, and SortMergeJoin may still be faster in some cases. It is best to test both options for your specific workloads.
To configure Comet to convert SortMergeJoin to ShuffledHashJoin, set spark.comet.exec.replaceSortMergeJoin=true.
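Assuming the setting can be applied at the session level (as with the other spark.comet.exec.* settings shown in this guide), it could also be toggled from a running session, for example:
spark.conf.set("spark.comet.exec.replaceSortMergeJoin", "true")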
Shuffle¶
Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries.
To enable Comet shuffle, set the following configuration in your Spark configuration:
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
spark.comet.exec.shuffle.enabled=true
spark.shuffle.manager is a static Spark configuration that cannot be changed at runtime; it must be set before the Spark context is created. You can enable or disable Comet shuffle at runtime by setting spark.comet.exec.shuffle.enabled to true or false. When it is disabled, Comet falls back to the default Spark shuffle manager.
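As an illustrative sketch combining the two: the shuffle manager must be passed when the session is created (other Comet setup, such as registering the Comet plugin and jars, is assumed and omitted here), while the enable flag can be flipped later from within the session:
spark-shell \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.exec.shuffle.enabled=true

// later, inside the session, fall back to Spark shuffle at runtime:
spark.conf.set("spark.comet.exec.shuffle.enabled", "false")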
Shuffle Mode¶
Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto Mode.
Auto Mode¶
Setting spark.comet.exec.shuffle.mode to auto will let Comet choose the best shuffle mode based on the query plan. This is the default.
Columnar (JVM) Shuffle¶
Comet Columnar shuffle is JVM-based and supports HashPartitioning, RoundRobinPartitioning, RangePartitioning, and SinglePartitioning. This mode has the highest query coverage.
Columnar shuffle can be enabled by setting spark.comet.exec.shuffle.mode to jvm. If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark.
Native Shuffle¶
Comet also provides a fully native shuffle implementation, which generally provides the best performance. However, native shuffle currently only supports HashPartitioning and SinglePartitioning.
To enable native shuffle, set spark.comet.exec.shuffle.mode to native. If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark.
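For example, to force a specific mode from a session (auto is the default, so this is only needed to override it):
spark.conf.set("spark.comet.exec.shuffle.mode", "native")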
Shuffle Compression¶
By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
Compression can be disabled by setting spark.shuffle.compress=false, which may result in faster shuffle times in certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage.
Explain Plan¶
Extended Explain¶
With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists reasons why Comet may not have been enabled for specific operations. To enable this, in the Spark configuration, set the following:
-c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo
This will add a section to the detailed plan displayed in the Spark SQL UI page.
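(The -c flag shown above is the spark-submit/spark-shell shorthand for --conf; the same property can also be set in spark-defaults.conf.)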