Tuning Guide

Partitions and Parallelism

The goal of any distributed compute engine is to parallelize work as much as possible, allowing the work to scale by adding more compute resources.

The basic unit of concurrency and parallelism in Ballista is the partition. The leaf nodes of a query are typically table scans that read from files on disk, and Ballista currently treats each file within a table as a single partition (splitting individual files into multiple partitions is planned but not yet implemented).

For example, if a table “customer” consists of 200 Parquet files, the table scan will naturally have 200 partitions, and the scan and certain subsequent operations will also have 200 partitions. Conversely, if the table consists of a single Parquet file then there will be a single partition and the work will not be able to scale even if the cluster has resources available. Ballista supports repartitioning within a query to improve parallelism. The configuration setting datafusion.execution.target_partitions can be set to the desired number of partitions. This is currently a global setting for the entire context. The default value for this setting is 16.

Note that Ballista will never decrease the number of partitions based on this setting and will only repartition if the source operation has fewer partitions than this setting.

Example: Setting the desired number of shuffle partitions when creating a context.

use ballista::extension::{SessionConfigExt, SessionContextExt};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

// Request 200 target partitions for repartitioning operators.
let session_config = SessionConfig::new_with_ballista()
    .with_target_partitions(200);

let state = SessionStateBuilder::new()
    .with_default_features()
    .with_config(session_config)
    .build();

// `url` is the address of the Ballista scheduler, e.g. "df://localhost:50050".
let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;

Configuring Executor Concurrency Levels

Each executor instance has a fixed number of tasks that it can process concurrently. This is specified with the --concurrent-tasks command-line parameter. The default is to use all available CPU cores.

Increasing this configuration setting will increase the number of tasks that each executor can run concurrently but this will also mean that the executor will use more memory. If executors are failing due to out-of-memory errors then decreasing the number of concurrent tasks may help.
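
As a sketch, an executor that is failing with out-of-memory errors could be restarted with a smaller task count using the flag described above:

ballista-executor --concurrent-tasks 4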

Configuring Executor Memory Pool

By default the executor uses DataFusion’s unbounded memory pool, so spillable operators (sort, hash join, hash aggregate) grow until the host runs out of memory. To bound executor memory and let those operators spill to disk under pressure, pass --memory-pool-size when starting the executor:

ballista-executor --memory-pool-size 8GB --concurrent-tasks 8

The argument accepts human-readable sizes (8GB, 512MiB) or a plain byte count. SI suffixes (KB/MB/GB) are powers of 10; IEC suffixes (KiB/MiB/GiB) are powers of 2.
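
As a sketch, the following two invocations request the same budget, once with an IEC suffix and once as a plain byte count (512 * 1024 * 1024 = 536870912):

ballista-executor --memory-pool-size 512MiB
ballista-executor --memory-pool-size 536870912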

The total budget is divided equally across concurrent task slots: each task receives its own FairSpillPool of size memory_pool_size / concurrent_tasks. With --memory-pool-size 8GB --concurrent-tasks 8, every task sees a 1 GB pool, fully isolated from other tasks. Idle slots do not lend their share to busy ones, which keeps task memory predictable at the cost of some unused capacity when the executor is under-utilized.
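
A minimal sketch of that division, constructing DataFusion's FairSpillPool directly (illustrative only, not the executor's actual startup code; the variable names are assumptions):

use datafusion::execution::memory_pool::FairSpillPool;

// Mirrors `--memory-pool-size 8GB --concurrent-tasks 8` (SI: 8 GB = 8 * 10^9 bytes).
let memory_pool_size: usize = 8_000_000_000;
let concurrent_tasks: usize = 8;

// The total budget is split evenly: each task slot gets its own 1 GB pool.
let per_task_budget = memory_pool_size / concurrent_tasks;
let _task_pool = FairSpillPool::new(per_task_budget);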

The executor refuses to start if the per-task share would round to zero (i.e. memory_pool_size < concurrent_tasks).

When --memory-pool-size is not set, the executor behaves as before with no memory pool installed.

Shuffle Implementation

Ballista exchanges data between query stages by writing the output of each upstream task to local files, which downstream tasks read either from disk (when co-located) or over Arrow Flight. Two shuffle implementations are available, with different trade-offs around file count, memory use, and write latency.

Hash-based shuffle (default)

The default writer hashes each incoming RecordBatch and immediately encodes the per-partition slices to Arrow IPC, streaming them into one file per (input_partition, output_partition) pair. Nothing is buffered in memory across batches.

This is simple and low latency, but for N input partitions and M output partitions it produces N × M files. Wide shuffles can therefore generate a large number of small files.

Sort-based shuffle (opt-in)

The sort-based writer accumulates batches in a per-output-partition in-memory buffer. When the total buffered size crosses a threshold, the largest buffers are spilled to disk. After the input stream finishes, the remaining in-memory data and any spilled batches are merged and written into a single consolidated Arrow IPC file per input partition, alongside an index file that lets readers seek directly to a given output partition.

This produces 2 × N files instead of N × M, coalesces small batches to a target size before writing, and bounds shuffle memory use via spilling — at the cost of higher write latency than the hash writer. Consider enabling it for queries with high partition fan-out or when file-count pressure on local storage is a concern.

The sort-based writer is disabled by default and is enabled per session:

let session_config = SessionConfig::new_with_ballista()
    .set_bool("ballista.shuffle.sort_based.enabled", true);

The following session-level keys tune its behavior:

| key                                         | type    | default   | description                                                                                             |
| ------------------------------------------- | ------- | --------- | ------------------------------------------------------------------------------------------------------- |
| ballista.shuffle.sort_based.enabled         | Boolean | false     | Enables the sort-based shuffle writer.                                                                   |
| ballista.shuffle.sort_based.buffer_size     | UInt64  | 1048576   | Per-partition buffer size in bytes (1 MiB default).                                                      |
| ballista.shuffle.sort_based.memory_limit    | UInt64  | 268435456 | Total in-memory budget across all output-partition buffers (256 MiB default).                            |
| ballista.shuffle.sort_based.spill_threshold | Utf8    | "0.8"     | Fraction of memory_limit at which the largest buffers begin spilling to disk. Must be in the range 0–1.  |
| ballista.shuffle.sort_based.batch_size      | UInt64  | 8192      | Target row count when coalescing buffered batches before they are written or spilled.                    |
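
As a sketch, these keys can be set on the same session config used earlier (the values shown are arbitrary illustrations, not recommendations):

use ballista::extension::SessionConfigExt;
use datafusion::prelude::SessionConfig;

let session_config = SessionConfig::new_with_ballista()
    // Turn the sort-based writer on for this session.
    .set_bool("ballista.shuffle.sort_based.enabled", true)
    // Raise the total buffer budget to 512 MiB and start spilling at 90% of it.
    .set_u64("ballista.shuffle.sort_based.memory_limit", 512 * 1024 * 1024)
    .set_str("ballista.shuffle.sort_based.spill_threshold", "0.9");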

Push-based vs Pull-based Task Scheduling

Ballista supports both push-based and pull-based task scheduling. It is recommended that you try both to determine which works best for your use case.

Pull-based scheduling works in a similar way to Apache Spark, while push-based scheduling can result in lower latency.

The scheduling policy can be specified with the --scheduler-policy parameter when starting the scheduler and executor processes. The default is pull-staged.
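
For example, to run a cluster with push-based scheduling, the policy would be passed to both processes (a sketch based on the flag described above):

ballista-scheduler --scheduler-policy push-staged
ballista-executor --scheduler-policy push-staged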

Viewing Query Plans and Metrics

The scheduler provides a REST API for monitoring jobs. See the scheduler documentation for more information.

This is an optional scheduler feature that must be enabled with the rest-api feature flag.

To download a query plan in DOT format from the scheduler, submit a request to the following API endpoint:

http://localhost:50050/api/job/{job_id}/dot
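
As a sketch, the plan could be fetched with curl, substituting a real job id for the placeholder:

curl -o query.dot http://localhost:50050/api/job/{job_id}/dot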

The resulting file can be converted into an image using graphviz:

dot -Tpng query.dot > query.png

Here is an example query plan:

[example query plan diagram]