# JVM Shuffle
This document describes Comet’s JVM-based columnar shuffle implementation (CometColumnarShuffle), which
writes shuffle data in Arrow IPC format using JVM code with native encoding. For the fully native
alternative, see Native Shuffle.
## Overview

Comet provides two shuffle implementations:

- `CometNativeShuffle` (`CometExchange`): Fully native shuffle implemented in Rust. Takes columnar input directly from Comet native operators and performs partitioning in native code.
- `CometColumnarShuffle` (`CometColumnarExchange`): JVM-based shuffle that operates on rows internally, buffers `UnsafeRow`s in memory pages, and uses native code (via JNI) to encode them to Arrow IPC format. Uses Spark's partitioner for partition assignment. Can accept either row-based or columnar input (columnar input is converted to rows via `ColumnarToRowExec`).

The JVM shuffle is selected via `CometShuffleDependency.shuffleType`.
## When JVM Shuffle is Used

JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometExchange`) in the following cases:

- **Shuffle mode is explicitly set to "jvm"**: When `spark.comet.exec.shuffle.mode` is set to `jvm`.
- **Child plan is not a Comet native operator**: When the child plan is a Spark row-based operator (not a `CometPlan`), JVM shuffle is the only option, since native shuffle requires columnar input from Comet operators.
- **Unsupported partitioning type**: Native shuffle only supports `HashPartitioning`, `RangePartitioning`, and `SinglePartition`. JVM shuffle additionally supports `RoundRobinPartitioning`.
- **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle only supports primitive types as partition keys. Complex types (struct, array, map) cannot be used as partition keys in native shuffle, though they are fully supported as data columns in both implementations.
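The conditions above can be summarized as a small decision function. This is an illustrative sketch only; the type and parameter names are simplified stand-ins, not Comet's actual implementation.

```scala
// Illustrative sketch of the JVM-vs-native shuffle selection.
// Names are stand-ins, not Comet's actual code.
sealed trait Partitioning
case object HashPartitioning extends Partitioning
case object RangePartitioning extends Partitioning
case object SinglePartition extends Partitioning
case object RoundRobinPartitioning extends Partitioning

def useJvmShuffle(
    shuffleMode: String,        // value of spark.comet.exec.shuffle.mode
    childIsCometPlan: Boolean,  // is the child a Comet native operator?
    partitioning: Partitioning,
    keysArePrimitive: Boolean): Boolean = {
  // Native shuffle handles hash/range partitioning only for primitive keys.
  val nativeSupported = partitioning match {
    case HashPartitioning | RangePartitioning => keysArePrimitive
    case SinglePartition                      => true
    case RoundRobinPartitioning               => false
  }
  shuffleMode == "jvm" || !childIsCometPlan || !nativeSupported
}
```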
## Input Handling

### Spark Row-Based Input

When the child plan is a Spark row-based operator, `CometColumnarExchange` calls `child.execute()`, which returns an `RDD[InternalRow]`. The rows flow directly to the JVM shuffle writers.

### Comet Columnar Input

When the child plan is a Comet native operator (e.g., `CometHashAggregate`) but JVM shuffle is selected (due to the shuffle mode setting or unsupported partitioning), `CometColumnarExchange` still calls `child.execute()`. Comet operators implement `doExecute()` by wrapping themselves with `ColumnarToRowExec`:

```scala
// In CometExec base class
override def doExecute(): RDD[InternalRow] =
  ColumnarToRowExec(this).doExecute()
```
This means the data path becomes:

```
Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
```

This is less efficient than native shuffle, which avoids the columnar-to-row conversion:

```
Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
```
## Why Use Spark’s Partitioner?

JVM shuffle uses row-based input so it can leverage Spark’s existing partitioner infrastructure (`partitioner.getPartition(key)`). This allows Comet to support all of Spark’s partitioning schemes without reimplementing them in Rust. Native shuffle, by contrast, serializes the partitioning scheme to protobuf and implements the partitioning logic in native code.
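As a rough illustration of what `partitioner.getPartition(key)` computes in the hash-partitioning case, here is a simplified mirror of Spark's `HashPartitioner` (not Comet code):

```scala
// Simplified mirror of Spark's HashPartitioner.getPartition: a
// non-negative modulo of the key's hash code over the partition count.
class ToyHashPartitioner(numPartitions: Int) {
  def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}
```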
## Architecture

```
┌─────────────────────────────────────────────────────────────────────────┐
│ CometShuffleManager │
│ - Extends Spark's ShuffleManager │
│ - Routes to appropriate writer/reader based on ShuffleHandle type │
└─────────────────────────────────────────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────┐
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ CometBypassMerge- │ │ CometUnsafe- │ │ CometNative- │
│ SortShuffleWriter │ │ ShuffleWriter │ │ ShuffleWriter │
│ (hash-based) │ │ (sort-based) │ │ (fully native) │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
│ │
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ CometDiskBlock- │ │ CometShuffleExternal│
│ Writer │ │ Sorter │
└─────────────────────┘ └─────────────────────┘
│ │
└────────────┬───────────┘
▼
┌─────────────────────┐
│ SpillWriter │
│ (native encoding │
│ via JNI) │
└─────────────────────┘
```
## Key Classes

### Shuffle Manager

| Class | Description |
|---|---|
| `CometShuffleManager` | Entry point. Extends Spark's `ShuffleManager` and routes to the appropriate writer/reader based on the `ShuffleHandle` type. |
| `CometShuffleDependency` | Extends Spark's `ShuffleDependency`; carries the `shuffleType` that selects JVM vs. native shuffle. |
### Shuffle Handles

| Handle | Writer Strategy |
|---|---|
| `CometBypassMergeSortShuffleHandle` | Hash-based: one file per partition, merged at end |
| `CometSerializedShuffleHandle` | Sort-based: records sorted by partition ID, single output |
| `CometNativeShuffleHandle` | Fully native shuffle |
Selection logic in `CometShuffleManager.shouldBypassMergeSort()`:

- Uses bypass if partitions < threshold AND partitions × cores ≤ max threads
- Otherwise uses sort-based to avoid OOM from many concurrent writers
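The rule above can be sketched as a predicate (parameter names are illustrative, not the exact Comet signature):

```scala
// Sketch of the shouldBypassMergeSort() decision described above.
// Bypass (hash-based) is only safe while the number of concurrent
// per-partition writers stays bounded; otherwise fall back to sort-based.
def shouldBypassMergeSort(
    numPartitions: Int,
    bypassThreshold: Int, // partition-count threshold
    numCores: Int,
    maxThreads: Int): Boolean =
  numPartitions < bypassThreshold &&
    numPartitions.toLong * numCores <= maxThreads
```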
### Writers

| Class | Description |
|---|---|
| `CometBypassMergeSortShuffleWriter` | Hash-based writer. Creates one `CometDiskBlockWriter` per partition; partition files are concatenated at the end. |
| `CometUnsafeShuffleWriter` | Sort-based writer. Uses `CometShuffleExternalSorter` to sort records by partition ID. |
| `CometDiskBlockWriter` | Buffers rows in memory pages for a single partition. Spills to disk via native encoding. Used by the bypass writer. |
| `CometShuffleExternalSorter` | Buffers records across all partitions, sorts by partition ID, spills sorted data. Used by the unsafe writer. |
| `SpillWriter` | Base class for spill logic. Manages memory pages and calls `Native.writeSortedFileNative()`. |
### Reader

| Class | Description |
|---|---|
| `CometBlockStoreShuffleReader` | Fetches shuffle blocks via `ShuffleBlockFetcherIterator` and decodes them with `NativeBatchDecoderIterator`. |
| `NativeBatchDecoderIterator` | Reads compressed Arrow IPC batches from the input stream. Calls `Native.decodeShuffleBlock()`. |
## Data Flow

### Write Path

1. `ShuffleWriteProcessor` calls `CometShuffleManager.getWriter()`
2. The writer receives an `Iterator[Product2[K, V]]` where V is `UnsafeRow`
3. Rows are serialized and buffered in off-heap memory pages
4. When the memory threshold or batch size is reached, `SpillWriter.doSpilling()` is called
5. Native code (`Native.writeSortedFileNative()`) converts rows to Arrow arrays and writes IPC format
6. For the bypass writer, partition files are concatenated into the final output; for the sort writer, spill files are merged
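A toy model of the buffer-then-spill steps in this path (the real spill encodes buffered rows to Arrow IPC through JNI via `Native.writeSortedFileNative()`; here the spill just counts, and all names are illustrative):

```scala
// Toy model of the write path's buffering: accumulate rows, spill
// when a batch fills. Stands in for SpillWriter-style logic only.
import scala.collection.mutable.ArrayBuffer

class ToySpillWriter(batchSize: Int) {
  private val buffered = ArrayBuffer.empty[Array[Byte]]
  var spillCount = 0

  def insertRow(row: Array[Byte]): Unit = {
    buffered += row
    if (buffered.size >= batchSize) doSpilling()
  }

  // Real code: encode buffered rows to Arrow IPC and write to disk.
  def doSpilling(): Unit =
    if (buffered.nonEmpty) { spillCount += 1; buffered.clear() }
}
```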
### Read Path

1. `CometBlockStoreShuffleReader.read()` creates a `ShuffleBlockFetcherIterator`
2. For each block, `NativeBatchDecoderIterator` reads the IPC stream
3. Native code (`Native.decodeShuffleBlock()`) decompresses and decodes to Arrow arrays
4. Arrow FFI imports the arrays as a `ColumnarBatch`
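The read path has the shape of a lazy decoding iterator. In Comet the decode step is `Native.decodeShuffleBlock()` plus an Arrow FFI import; in this sketch it is an arbitrary function, and the class name is hypothetical:

```scala
// Sketch of the read-path iterator shape: each fetched block is
// decoded into a batch only when the consumer asks for it.
class ToyBatchDecoderIterator[Block, Batch](
    blocks: Iterator[Block],
    decode: Block => Batch) extends Iterator[Batch] {
  def hasNext: Boolean = blocks.hasNext
  def next(): Batch = decode(blocks.next())
}
```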
## Memory Management

- `CometShuffleMemoryAllocator`: Custom allocator for off-heap memory pages
- Memory is allocated in pages; when allocation fails, writers spill to disk
- `CometDiskBlockWriter` coordinates spilling across all partition writers (largest first)
- Async spilling is supported via `ShuffleThreadPool`
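The "largest first" policy can be sketched as follows (illustrative only; not Comet's actual data structures):

```scala
// Sketch of the largest-first spill policy: when an allocation fails,
// pick the partition writer currently holding the most memory to spill.
def pickSpillCandidate(bytesHeld: Map[String, Long]): Option[String] =
  if (bytesHeld.isEmpty) None
  else Some(bytesHeld.maxBy(_._2)._1)
```

Spilling the largest holder first frees the most memory per spill, minimizing the number of spill files and native encoding calls.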
## Configuration

| Config | Description |
|---|---|
| `spark.comet.columnar.shuffle.async.enabled` | Enable async spill writes |
| `spark.comet.columnar.shuffle.async.thread.num` | Threads per writer for async spill |
| `spark.comet.columnar.shuffle.batch.size` | Rows per Arrow batch |
| `spark.comet.columnar.shuffle.spill.threshold` | Row count threshold for spill |
| `spark.comet.exec.shuffle.compression.codec` | Compression codec (zstd, lz4, etc.) |