Configuration

Ballista Configuration Settings

Configuring Ballista is quite similar to configuring DataFusion. Most settings are identical, with only a few configurations specific to Ballista.

Example: Specifying configuration options when creating a context

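use ballista::extension::{SessionConfigExt, SessionContextExt};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

// Start from Ballista's defaults, then layer individual settings on top.
let session_config = SessionConfig::new_with_ballista()
    .with_information_schema(true)
    .with_ballista_job_name("Super Cool Ballista App");

let state = SessionStateBuilder::new()
    .with_default_features()
    .with_config(session_config)
    .build();

// `url` holds the address of the Ballista scheduler.
let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;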

SessionConfig::new_with_ballista() sets up a SessionConfig for use with Ballista. It is not required (SessionConfig::new can be used instead), but it is recommended because it applies sensible configuration defaults.

SessionConfigExt exposes a set of SessionConfigExt::with_ballista_ and SessionConfigExt::ballista_ methods, which set and retrieve Ballista-specific options.

Notable SessionConfigExt configuration methods are:

/// Overrides ballista's [LogicalExtensionCodec]
fn with_ballista_logical_extension_codec(
    self,
    codec: Arc<dyn LogicalExtensionCodec>,
) -> SessionConfig;

/// Overrides ballista's [PhysicalExtensionCodec]
fn with_ballista_physical_extension_codec(
    self,
    codec: Arc<dyn PhysicalExtensionCodec>,
) -> SessionConfig;

/// Overrides ballista's [QueryPlanner]
fn with_ballista_query_planner(
    self,
    planner: Arc<dyn QueryPlanner + Send + Sync + 'static>,
) -> SessionConfig;

These methods can be used to change Ballista's default behavior.

If the information schema is enabled, all configuration parameters can be retrieved or set using SQL:

let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;

let result = ctx
    .sql("select name, value from information_schema.df_settings where name like 'ballista.job.name'")
    .await?
    .collect()
    .await?;

let expected = [
    "+-------------------+-------------------------+",
    "| name              | value                   |",
    "+-------------------+-------------------------+",
    "| ballista.job.name | Super Cool Ballista App |",
    "+-------------------+-------------------------+",
];

// Compare the pretty-printed result batches with the expected table.
datafusion::assert_batches_eq!(expected, &result);

Shuffle Settings

The following session-level keys control Ballista’s shuffle behavior. See the tuning guide for an explanation of the hash-based (default) and sort-based shuffle writers.

| key | type | default | description |
| --- | --- | --- | --- |
| ballista.shuffle.max_concurrent_read_requests | UInt64 | 64 | Maximum number of concurrent fetch requests the shuffle reader will issue. |
| ballista.shuffle.force_remote_read | Boolean | false | Forces the shuffle reader to fetch every partition through Arrow Flight, even when the data is local. Intended for testing. |
| ballista.shuffle.remote_read_prefer_flight | Boolean | false | For remote reads, prefer the Arrow Flight reader over the block reader. The block reader is generally faster. |
| ballista.shuffle.sort_based.enabled | Boolean | false | Enables the sort-based shuffle writer (consolidated data file per input partition with an index, instead of one file per (input partition, output partition) pair). |
| ballista.shuffle.sort_based.buffer_size | UInt64 | 1048576 | Per-partition buffer size in bytes for the sort-based writer (1 MiB default). |
| ballista.shuffle.sort_based.memory_limit | UInt64 | 268435456 | Total in-memory budget across all output-partition buffers for the sort-based writer (256 MiB default). |
| ballista.shuffle.sort_based.spill_threshold | Utf8 | "0.8" | Fraction of memory_limit at which the largest buffers spill to disk. Must be in the range 0–1. |
| ballista.shuffle.sort_based.batch_size | UInt64 | 8192 | Target row count when coalescing buffered batches before they are written or spilled. |
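
To make the relationship between memory_limit, spill_threshold, and buffer_size concrete, the following sketch works through the arithmetic implied by the defaults listed above. It is not Ballista's implementation: the function names are hypothetical, and rounding the result down is an assumption made only for illustration. It shows that a Utf8 setting such as "0.8" has to be parsed and range-checked before use, and that with the defaults spilling would start at roughly 205 MiB of buffered data.

```rust
/// Hypothetical helper: parses a spill-threshold setting (a string, since the
/// key's type is Utf8) and returns the number of buffered bytes at which
/// spilling would begin. Rounding down is an assumption of this sketch.
fn spill_trigger_bytes(memory_limit: u64, spill_threshold: &str) -> Result<u64, String> {
    let fraction = spill_threshold
        .trim()
        .parse::<f64>()
        .map_err(|e| format!("invalid spill_threshold {spill_threshold:?}: {e}"))?;

    // The setting must be a fraction in the range 0-1 (NaN is rejected too).
    if !(0.0..=1.0).contains(&fraction) {
        return Err(format!("spill_threshold {fraction} is outside the range 0-1"));
    }

    Ok((memory_limit as f64 * fraction) as u64)
}

/// Hypothetical helper: how many per-partition buffers of `buffer_size` bytes
/// fit into `memory_limit` when every buffer is completely full.
fn full_buffers_within_limit(memory_limit: u64, buffer_size: u64) -> u64 {
    // A zero buffer size yields 0 instead of a division-by-zero panic.
    memory_limit.checked_div(buffer_size).unwrap_or(0)
}
```

Calling spill_trigger_bytes(268435456, "0.8") yields 214748364 bytes, and full_buffers_within_limit(268435456, 1048576) yields 256. Under these assumptions, a job with more than 256 output partitions cannot keep a full 1 MiB buffer for each of them within the default memory_limit.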

Ballista Scheduler Configuration Settings

In addition to the context configuration settings above, the Ballista scheduler has a few configuration settings of its own that help it manage the cluster as a whole.

Example: Specifying configuration options when starting the scheduler

./ballista-scheduler --scheduler-policy push-staged --event-loop-buffer-size 1000000 --task-distribution round-robin
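
When the scheduler is launched from a Rust program, for example a test harness, the same invocation can be assembled with std::process::Command. This is a minimal sketch: the helper name scheduler_command is hypothetical, and the binary path and flag values are simply those of the command above. The function only builds the command and does not start the process.

```rust
use std::process::Command;

/// Hypothetical helper: builds (but does not run) a scheduler invocation
/// using the three flags from the example command.
fn scheduler_command(
    binary: &str,
    scheduler_policy: &str,
    event_loop_buffer_size: u32,
    task_distribution: &str,
) -> Command {
    let mut cmd = Command::new(binary);
    cmd.arg("--scheduler-policy")
        .arg(scheduler_policy)
        .arg("--event-loop-buffer-size")
        .arg(event_loop_buffer_size.to_string())
        .arg("--task-distribution")
        .arg(task_distribution);
    cmd
}
```

Calling spawn() or status() on the returned Command starts the scheduler. Passing each flag and value as a separate argument avoids any shell quoting concerns.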

| key | type | default | description |
| --- | --- | --- | --- |
| scheduler-policy | Utf8 | pull-staged | Sets the task scheduling policy for the scheduler. Possible values: pull-staged, push-staged. |
| event-loop-buffer-size | UInt32 | 10000 | Sets the event loop buffer size. For high-throughput systems, a larger value such as 1000000 is recommended. |
| task-distribution | Utf8 | bias | Sets the task distribution policy for the scheduler. Possible values: bias, round-robin. |
| finished-job-data-clean-up-interval-seconds | UInt64 | 300 | Sets the delay interval, in seconds, for cleaning up finished job data (mainly shuffle data). 0 disables the clean-up. |
| finished-job-state-clean-up-interval-seconds | UInt64 | 3600 | Sets the delay interval, in seconds, for cleaning up finished job state stored in the backend. 0 disables the clean-up. |
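
The two clean-up settings treat 0 as "disabled" rather than "clean up immediately". The sketch below shows one way to model that convention in Rust. The function name clean_up_interval is hypothetical, and this is not how the scheduler itself is implemented.

```rust
use std::time::Duration;

/// Hypothetical helper: maps a clean-up interval setting (in seconds) to an
/// optional duration. `None` means the clean-up is disabled.
fn clean_up_interval(interval_seconds: u64) -> Option<Duration> {
    if interval_seconds == 0 {
        None
    } else {
        Some(Duration::from_secs(interval_seconds))
    }
}
```

With the defaults, clean_up_interval(300) gives a 5-minute interval for job data and clean_up_interval(3600) gives a 1-hour interval for job state, while clean_up_interval(0) returns None.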