Native Shuffle#
This document describes Comet’s native shuffle implementation (CometNativeShuffle), which performs
shuffle operations entirely in Rust code for maximum performance. For the JVM-based alternative,
see JVM Shuffle.
Overview#
Native shuffle takes columnar input directly from Comet native operators and performs partitioning, encoding, and writing in native Rust code. This avoids the columnar-to-row-to-columnar conversion overhead that JVM shuffle incurs.
Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar
Compare this to JVM shuffle’s data path:
Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar
When Native Shuffle is Used#
Native shuffle (CometExchange) is selected when all of the following conditions are met:
- Shuffle mode allows native: `spark.comet.exec.shuffle.mode` is `native` or `auto`.
- Child plan is a Comet native operator: The child must be a `CometPlan` that produces columnar output. Row-based Spark operators require JVM shuffle.
- Supported partitioning type: Native shuffle supports:
  - `HashPartitioning`
  - `RangePartitioning`
  - `SinglePartition`

  `RoundRobinPartitioning` requires JVM shuffle.
- Supported partition key types: For `HashPartitioning` and `RangePartitioning`, partition keys must be primitive types. Complex types (struct, array, map) as partition keys require JVM shuffle. Note that complex types are fully supported as data columns in native shuffle.
Architecture#
┌─────────────────────────────────────────────────────────────────────────────┐
│ CometShuffleManager │
│ - Routes to CometNativeShuffleWriter for CometNativeShuffleHandle │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ CometNativeShuffleWriter │
│ - Constructs protobuf operator plan │
│ - Invokes native execution via CometExec.getCometIterator() │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼ (JNI)
┌─────────────────────────────────────────────────────────────────────────────┐
│ ShuffleWriterExec (Rust) │
│ - DataFusion ExecutionPlan │
│ - Orchestrates partitioning and writing │
└─────────────────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌───────────────────────────────────┐ ┌───────────────────────────────────┐
│ MultiPartitionShuffleRepartitioner │ │ SinglePartitionShufflePartitioner │
│ (hash/range partitioning) │ │ (single partition case) │
└───────────────────────────────────┘ └───────────────────────────────────┘
│
▼
┌───────────────────────────────────┐
│ ShuffleBlockWriter │
│ (Arrow IPC + compression) │
└───────────────────────────────────┘
│
▼
┌─────────────────┐
│ Data + Index │
│ Files │
└─────────────────┘
Key Classes#
Scala Side#
| Class | Location | Description |
|---|---|---|
| `CometShuffleExchangeExec` | | Physical plan node. Validates types and partitioning, creates the shuffle dependency |
| `CometShuffleManager` | | Implements Spark's `ShuffleManager`; routes writes to `CometNativeShuffleWriter` |
| `CometNativeShuffleWriter` | | Extends Spark's `ShuffleWriter`; constructs the protobuf operator plan and invokes native execution via `CometExec.getCometIterator()` |
| `CometBlockStoreShuffleReader` | | Reads shuffle blocks via `ShuffleBlockFetcherIterator` |
| `NativeBatchDecoderIterator` | | Reads compressed Arrow IPC from the input stream. Calls native decode via JNI |
Rust Side#
| Struct | Location | Description |
|---|---|---|
| `ShuffleWriterExec` | | DataFusion `ExecutionPlan` that orchestrates partitioning and writing |
| `MultiPartitionShuffleRepartitioner` / `SinglePartitionShufflePartitioner` | | Partitioners for the hash/range and single-partition cases |
| `ShuffleBlockWriter` | | Encodes each partition's data as compressed Arrow IPC |
Data Flow#
Write Path#
1. Plan construction: `CometNativeShuffleWriter` builds a protobuf operator plan containing:
   - A scan operator reading from the input iterator
   - A `ShuffleWriter` operator with partitioning config and compression codec
2. Native execution: `CometExec.getCometIterator()` executes the plan in Rust.
3. Partitioning: `ShuffleWriterExec` receives batches and routes them to the appropriate partitioner:
   - `MultiPartitionShuffleRepartitioner`: for hash/range partitioning
   - `SinglePartitionShufflePartitioner`: for a single partition (simpler path)
4. Buffering and spilling: The partitioner buffers rows per partition. When memory pressure exceeds the threshold, partitions spill to temporary files.
5. Encoding: `ShuffleBlockWriter` encodes each partition's data as compressed Arrow IPC:
   - Writes the compression type header
   - Writes the field count header
   - Writes the compressed IPC stream
6. Output files: Two files are produced:
   - Data file: concatenated partition data
   - Index file: an array of 8-byte little-endian offsets marking partition boundaries (see the sketch after this list)
7. Commit: Back in the JVM, `CometNativeShuffleWriter` reads the index file to get the partition lengths and commits via Spark's `IndexShuffleBlockResolver`.
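The index file layout is simple enough to show directly. Below is a minimal sketch, assuming the Spark-style convention of `num_partitions + 1` offsets starting at 0 (the layout `IndexShuffleBlockResolver` expects when deriving partition lengths); the function name is illustrative, not Comet's actual API.

```rust
use std::io::{self, Write};

/// Write one 8-byte little-endian offset per partition boundary.
/// Partition i's bytes in the data file are data[offsets[i]..offsets[i + 1]].
fn write_index_file<W: Write>(out: &mut W, partition_lengths: &[u64]) -> io::Result<()> {
    let mut offset: u64 = 0;
    out.write_all(&offset.to_le_bytes())?; // partition 0 starts at byte 0
    for len in partition_lengths {
        offset += len;
        out.write_all(&offset.to_le_bytes())?;
    }
    Ok(())
}
```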
Read Path#
1. `CometBlockStoreShuffleReader` fetches shuffle blocks via `ShuffleBlockFetcherIterator`.
2. For each block, `NativeBatchDecoderIterator` (see the framing sketch after this list):
   - Reads the 8-byte compressed length header
   - Reads the 8-byte field count header
   - Reads the compressed IPC data
   - Calls `Native.decodeShuffleBlock()` via JNI
3. Native code decompresses and deserializes the Arrow IPC stream.
4. Arrow FFI transfers the `RecordBatch` to the JVM as a `ColumnarBatch`.
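To make the block framing concrete, here is a minimal sketch of the reader side. It assumes both headers are little-endian (the text above specifies little-endian only for the index file, so that detail is an assumption), and the type and function names are illustrative rather than Comet's actual API:

```rust
use std::io::{self, Read};

/// Illustrative container for one decoded block frame; not Comet's actual type.
struct ShuffleBlock {
    num_fields: u64,
    compressed_ipc: Vec<u8>,
}

/// Read one block frame: 8-byte compressed length, 8-byte field count,
/// then the compressed Arrow IPC payload.
fn read_block<R: Read>(input: &mut R) -> io::Result<ShuffleBlock> {
    let mut header = [0u8; 8];

    input.read_exact(&mut header)?;
    let compressed_len = u64::from_le_bytes(header);

    input.read_exact(&mut header)?;
    let num_fields = u64::from_le_bytes(header);

    let mut compressed_ipc = vec![0u8; compressed_len as usize];
    input.read_exact(&mut compressed_ipc)?;

    Ok(ShuffleBlock { num_fields, compressed_ipc })
}
```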
Partitioning#
Hash Partitioning#
Native shuffle implements Spark-compatible hash partitioning:

- Uses the Murmur3 hash function with seed 42 (matching Spark)
- Computes the hash of the partition key columns
- Applies modulo by the partition count: `partition_id = hash % num_partitions`
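As a concrete illustration, here is a minimal Rust sketch for a single `i32` key column. The Murmur3 `hashInt` mix below matches the algorithm Spark uses; note that Spark applies a non-negative modulo (`Pmod`), which matters because the hash is a signed 32-bit value. Comet's real implementation is vectorized over Arrow arrays, and these function names are illustrative.

```rust
/// Spark's seed for shuffle hash partitioning.
const SPARK_SEED: i32 = 42;

/// Murmur3 x86_32 `hashInt`, matching Spark's Murmur3Hash for a 4-byte int.
fn murmur3_hash_i32(value: i32, seed: i32) -> i32 {
    // mixK1
    let mut k1 = (value as u32).wrapping_mul(0xcc9e_2d51);
    k1 = k1.rotate_left(15).wrapping_mul(0x1b87_3593);

    // mixH1
    let mut h1 = (seed as u32) ^ k1;
    h1 = h1.rotate_left(13).wrapping_mul(5).wrapping_add(0xe654_6b64);

    // Finalization; 4 is the input length in bytes.
    h1 ^= 4;
    h1 ^= h1 >> 16;
    h1 = h1.wrapping_mul(0x85eb_ca6b);
    h1 ^= h1 >> 13;
    h1 = h1.wrapping_mul(0xc2b2_ae35);
    h1 ^= h1 >> 16;
    h1 as i32
}

/// partition_id via Spark's non-negative modulo (Pmod).
fn partition_id(key: i32, num_partitions: i32) -> i32 {
    let h = murmur3_hash_i32(key, SPARK_SEED);
    ((h % num_partitions) + num_partitions) % num_partitions
}
```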
Range Partitioning#
For range partitioning:

1. Spark's `RangePartitioner` samples data and computes partition boundaries on the driver.
2. The boundaries are serialized into the native plan.
3. Native code converts the sort key columns to a comparable row format.
4. Binary search (`partition_point`) determines which partition each row belongs to.
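A minimal sketch of the boundary lookup, using a plain `i64` key in place of the comparable row format and following Spark's `RangePartitioner` convention that a key equal to a boundary belongs to the partition that boundary closes:

```rust
/// N boundaries divide the key space into N + 1 partitions.
fn range_partition_id(key: i64, boundaries: &[i64]) -> usize {
    // Binary search for the first boundary >= key; keys above every
    // boundary land in the last partition.
    boundaries.partition_point(|b| *b < key)
}

fn main() {
    // Boundaries computed by Spark's RangePartitioner on the driver.
    let boundaries = [10, 20, 30]; // 4 partitions
    assert_eq!(range_partition_id(5, &boundaries), 0);
    assert_eq!(range_partition_id(20, &boundaries), 1); // equal to a boundary
    assert_eq!(range_partition_id(99, &boundaries), 3);
}
```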
Single Partition#
The simplest case: all rows go to partition 0. Uses `SinglePartitionShufflePartitioner`, which simply concatenates batches to reach the configured batch size.
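A minimal sketch of that coalescing pattern with arrow-rs; `concat_batches` is the real arrow API, while the surrounding function and `target_rows` (standing in for the configured batch size) are illustrative:

```rust
use std::sync::Arc;
use arrow::compute::concat_batches;
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Buffer small input batches, emitting one concatenated batch once the
/// configured row target is reached.
fn coalesce(
    schema: &Arc<Schema>,
    pending: &mut Vec<RecordBatch>,
    target_rows: usize,
) -> Result<Option<RecordBatch>, ArrowError> {
    let buffered: usize = pending.iter().map(|b| b.num_rows()).sum();
    if buffered < target_rows {
        return Ok(None); // keep buffering
    }
    let out = concat_batches(schema, pending.iter())?;
    pending.clear();
    Ok(Some(out))
}
```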
Memory Management#
Native shuffle uses DataFusion’s memory management with spilling support:
- Memory pool: Tracks memory usage across the shuffle operation.
- Spill threshold: When buffered data exceeds the threshold, partitions spill to disk.
- Per-partition spilling: Each partition has its own spill file. Multiple spills for a partition are concatenated when writing the final output.
- Scratch space: Reusable buffers for partition ID computation to reduce allocations.

The `MultiPartitionShuffleRepartitioner` manages:

- `PartitionBuffer`: an in-memory buffer for each partition
- `SpillFile`: a temporary file for spilled data
- Memory tracking via the `MemoryConsumer` trait
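The buffer-then-spill pattern can be sketched as follows. The names and the fixed byte threshold are illustrative assumptions; Comet actually tracks memory through DataFusion's memory pool rather than a hard-coded limit:

```rust
use std::fs::File;
use std::io::{self, Write};
use std::path::PathBuf;

/// Illustrative per-partition buffer with spilling.
struct PartitionBuffer {
    id: usize,
    buffered: Vec<u8>,    // encoded batch bytes awaiting flush
    spills: Vec<PathBuf>, // spill files, concatenated into the final data file
}

impl PartitionBuffer {
    fn append(&mut self, bytes: &[u8], threshold: usize) -> io::Result<()> {
        self.buffered.extend_from_slice(bytes);
        if self.buffered.len() >= threshold {
            self.spill()?; // memory pressure: move this partition to disk
        }
        Ok(())
    }

    fn spill(&mut self) -> io::Result<()> {
        let path = std::env::temp_dir()
            .join(format!("shuffle-part-{}-spill-{}", self.id, self.spills.len()));
        File::create(&path)?.write_all(&self.buffered)?;
        self.buffered.clear();
        self.spills.push(path);
        Ok(())
    }
}
```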
Compression#
Native shuffle supports multiple compression codecs, configured via `spark.comet.exec.shuffle.compression.codec`:

| Codec | Description |
|---|---|
| `zstd` | Zstandard compression. Best ratio, configurable level. |
| `lz4` | LZ4 compression. Fast with a good ratio. |
| `snappy` | Snappy compression. Fastest, lower ratio. |
| `none` | No compression. |
The compression codec is applied uniformly to all partitions. Each partition’s data is independently compressed, allowing parallel decompression during reads.
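Because each partition is an independently compressed block, the encode and decode calls operate on self-contained byte slices. A minimal sketch using the `zstd` crate (the codec choice here is illustrative; Comet selects the codec and level from the configs below):

```rust
use std::io;

/// Compress one partition's Arrow IPC bytes as an independent block.
fn compress_partition(ipc_bytes: &[u8], level: i32) -> io::Result<Vec<u8>> {
    zstd::encode_all(ipc_bytes, level)
}

/// Decompress a fetched block; blocks can be decoded in parallel
/// because they share no compression state.
fn decompress_partition(block: &[u8]) -> io::Result<Vec<u8>> {
    zstd::decode_all(block)
}
```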
Configuration#
| Config | Default | Description |
|---|---|---|
| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle |
| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `auto`, `native`, or `jvm` |
| `spark.comet.exec.shuffle.compression.codec` | `lz4` | Compression codec |
| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level |
| | | Write buffer size |
| `spark.comet.batchSize` | `8192` | Target rows per batch |
Comparison with JVM Shuffle#
| Aspect | Native Shuffle | JVM Shuffle |
|---|---|---|
| Input format | Columnar (direct from Comet operators) | Row-based (via `ColumnarToRowExec`) |
| Partitioning logic | Rust implementation | Spark's partitioner |
| Supported schemes | Hash, Range, Single | Hash, Range, Single, RoundRobin |
| Partition key types | Primitives only | Any type |
| Performance | Higher (no format conversion) | Lower (columnar→row→columnar) |
| Writer variants | Single path | Bypass (hash) and sort-based |
See JVM Shuffle for details on the JVM-based implementation.