Configuration
Ballista Configuration Settings
Configuring Ballista is quite similar to configuring DataFusion. Most settings are identical, with only a few configurations specific to Ballista.
Example: Specifying configuration options when creating a context
use ballista::extension::{SessionConfigExt, SessionContextExt};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

// `url` is the address of the Ballista scheduler to connect to.
let session_config = SessionConfig::new_with_ballista()
    .with_information_schema(true)
    .with_ballista_job_name("Super Cool Ballista App");

let state = SessionStateBuilder::new()
    .with_default_features()
    .with_config(session_config)
    .build();

let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;
SessionConfig::new_with_ballista() sets up a SessionConfig for use with Ballista. It is not required, since SessionConfig::new can be used instead, but it is recommended because it applies sensible configuration defaults.
SessionConfigExt exposes a set of SessionConfigExt::with_ballista_* and SessionConfigExt::ballista_* methods that set and retrieve Ballista-specific options.
Notable SessionConfigExt configuration methods are:
/// Overrides ballista's [LogicalExtensionCodec]
fn with_ballista_logical_extension_codec(
    self,
    codec: Arc<dyn LogicalExtensionCodec>,
) -> SessionConfig;

/// Overrides ballista's [PhysicalExtensionCodec]
fn with_ballista_physical_extension_codec(
    self,
    codec: Arc<dyn PhysicalExtensionCodec>,
) -> SessionConfig;

/// Overrides ballista's [QueryPlanner]
fn with_ballista_query_planner(
    self,
    planner: Arc<dyn QueryPlanner + Send + Sync + 'static>,
) -> SessionConfig;
These methods can be used to change Ballista's default behavior.
If the information schema is enabled, all configuration parameters can be retrieved or set using SQL:
let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;

// `LIKE` without a wildcard matches the whole setting name exactly.
let result = ctx
    .sql("select name, value from information_schema.df_settings where name like 'ballista.job.name'")
    .await?
    .collect()
    .await?;

let expected = [
    "+-------------------+-------------------------+",
    "| name              | value                   |",
    "+-------------------+-------------------------+",
    "| ballista.job.name | Super Cool Ballista App |",
    "+-------------------+-------------------------+",
];

// Compare the pretty-printed batches with the expected table.
datafusion::assert_batches_eq!(expected, &result);
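Setting a value through SQL uses a `SET` statement, in the same way DataFusion handles its own options. The helper below is a minimal sketch of how such statements could be assembled for the Ballista keys documented on this page. The function name `set_statement` is hypothetical and not part of Ballista, and the sketch assumes that quoted string values are accepted for every key.

```rust
/// Builds a `SET key = 'value'` statement for a session-level option.
/// Hypothetical helper for illustration; not part of the Ballista API.
fn set_statement(key: &str, value: &str) -> String {
    // Double any single quote so the value remains a valid SQL string literal.
    let escaped = value.replace('\'', "''");
    format!("SET {key} = '{escaped}'")
}

fn main() {
    let statements = [
        set_statement("ballista.job.name", "Super Cool Ballista App"),
        set_statement("ballista.shuffle.sort_based.enabled", "true"),
    ];
    for sql in &statements {
        // In an application, each string would be run with `ctx.sql(sql).await?`.
        println!("{sql}");
    }
}
```

Each generated string would be passed to `ctx.sql(...)` on a context created with the information schema enabled, as in the example above.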
Shuffle Settings
The following session-level keys control Ballista’s shuffle behavior. See the tuning guide for an explanation of the hash-based (default) and sort-based shuffle writers.
| key | type | default | description |
|---|---|---|---|
| ballista.shuffle.max_concurrent_read_requests | UInt64 | 64 | Maximum number of concurrent fetch requests the shuffle reader will issue. |
| ballista.shuffle.force_remote_read | Boolean | false | Forces the shuffle reader to fetch every partition through Arrow Flight, even when the data is local. Intended for testing. |
| ballista.shuffle.remote_read_prefer_flight | Boolean | false | For remote reads, prefer the Arrow Flight reader over the block reader. The block reader is generally faster. |
| ballista.shuffle.sort_based.enabled | Boolean | false | Enables the sort-based shuffle writer (consolidated data file per input partition with an index, instead of one file per (input partition, output partition) pair). |
| ballista.shuffle.sort_based.buffer_size | UInt64 | 1048576 | Per-partition buffer size in bytes for the sort-based writer (1 MiB default). |
| ballista.shuffle.sort_based.memory_limit | UInt64 | 268435456 | Total in-memory budget across all output-partition buffers for the sort-based writer (256 MiB default). |
| ballista.shuffle.sort_based.spill_threshold | Utf8 | "0.8" | Fraction of |
| ballista.shuffle.sort_based.batch_size | UInt64 | 8192 | Target row count when coalescing buffered batches before they are written or spilled. |
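To make the defaults concrete, the following sketch works through the arithmetic implied by the table: how many shuffle files each writer produces, and how many completely full per-partition buffers fit in the sort-based memory budget. The function names and the 200 x 200 partition counts are illustrative assumptions, not Ballista APIs or measured values. The sort-based count covers data files only, since the layout of the index is not described here.

```rust
/// Hash-based writer: one file per (input partition, output partition) pair.
fn hash_shuffle_files(input_partitions: u64, output_partitions: u64) -> u64 {
    input_partitions * output_partitions
}

/// Sort-based writer: one consolidated data file per input partition.
fn sort_shuffle_data_files(input_partitions: u64) -> u64 {
    input_partitions
}

/// Number of per-partition buffers that can be completely full at once
/// without exceeding the total memory budget. Returns `None` when
/// `buffer_size` is zero.
fn full_buffers_within_limit(memory_limit: u64, buffer_size: u64) -> Option<u64> {
    memory_limit.checked_div(buffer_size)
}

fn main() {
    // Defaults taken from the table above.
    const BUFFER_SIZE: u64 = 1_048_576; // 1 MiB
    const MEMORY_LIMIT: u64 = 268_435_456; // 256 MiB

    // Illustrative partition counts (an assumption, not a recommendation).
    let (inputs, outputs): (u64, u64) = (200, 200);

    println!("hash-based files:      {}", hash_shuffle_files(inputs, outputs));
    println!("sort-based data files: {}", sort_shuffle_data_files(inputs));
    println!(
        "full buffers in budget: {:?}",
        full_buffers_within_limit(MEMORY_LIMIT, BUFFER_SIZE)
    );
}
```

With the default sizes, 256 output-partition buffers can be full before the total budget is reached, which gives a rough sense of when raising `memory_limit` or lowering `buffer_size` may be worth considering for jobs with many output partitions.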
Ballista Scheduler Configuration Settings
Besides the session-level configuration settings, the Ballista scheduler has a few settings of its own that help it manage the whole cluster.
Example: Specifying configuration options when starting the scheduler
./ballista-scheduler --scheduler-policy push-staged --event-loop-buffer-size 1000000 --task-distribution round-robin
| key | type | default | description |
|---|---|---|---|
| scheduler-policy | Utf8 | pull-staged | Sets the task scheduling policy for the scheduler. Possible values: pull-staged, push-staged. |
| event-loop-buffer-size | UInt32 | 10000 | Sets the event loop buffer size. For high-throughput systems, a larger value such as 1000000 is recommended. |
| task-distribution | Utf8 | bias | Sets the task distribution policy for the scheduler. Possible values: bias, round-robin. |
| finished-job-data-clean-up-interval-seconds | UInt64 | 300 | Sets the delay, in seconds, before finished job data (mainly shuffle data) is cleaned up. 0 disables the cleanup. |
| finished-job-state-clean-up-interval-seconds | UInt64 | 3600 | Sets the delay, in seconds, before finished job state stored in the backend is cleaned up. 0 disables the cleanup. |
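The scheduler options are passed as `--key value` flags, as in the example command. The sketch below assembles such an argument list in Rust, for instance before handing it to `std::process::Command`. The `SchedulerOptions` struct and its `to_args` method are hypothetical names introduced for illustration. The sketch also assumes that every key in the table is accepted as a `--key` flag in the same way as the three flags shown in the example command.

```rust
/// Hypothetical container for the scheduler settings listed in the table.
struct SchedulerOptions {
    scheduler_policy: String,
    event_loop_buffer_size: u32,
    task_distribution: String,
    finished_job_data_clean_up_interval_seconds: u64,
    finished_job_state_clean_up_interval_seconds: u64,
}

impl Default for SchedulerOptions {
    /// Defaults copied from the table above.
    fn default() -> Self {
        Self {
            scheduler_policy: "pull-staged".to_string(),
            event_loop_buffer_size: 10_000,
            task_distribution: "bias".to_string(),
            finished_job_data_clean_up_interval_seconds: 300,
            finished_job_state_clean_up_interval_seconds: 3_600,
        }
    }
}

impl SchedulerOptions {
    /// Renders every option as a `--key value` pair, in table order.
    fn to_args(&self) -> Vec<String> {
        let pairs = vec![
            ("scheduler-policy", self.scheduler_policy.clone()),
            ("event-loop-buffer-size", self.event_loop_buffer_size.to_string()),
            ("task-distribution", self.task_distribution.clone()),
            (
                "finished-job-data-clean-up-interval-seconds",
                self.finished_job_data_clean_up_interval_seconds.to_string(),
            ),
            (
                "finished-job-state-clean-up-interval-seconds",
                self.finished_job_state_clean_up_interval_seconds.to_string(),
            ),
        ];
        let mut args = Vec::with_capacity(pairs.len() * 2);
        for (key, value) in pairs {
            args.push(format!("--{key}"));
            args.push(value);
        }
        args
    }
}

fn main() {
    // Same overrides as the example command; the other options keep their defaults.
    let options = SchedulerOptions {
        scheduler_policy: "push-staged".to_string(),
        event_loop_buffer_size: 1_000_000,
        task_distribution: "round-robin".to_string(),
        ..SchedulerOptions::default()
    };
    println!("./ballista-scheduler {}", options.to_args().join(" "));
}
```

Keeping the defaults in one place makes it easier to see which options a deployment overrides, though the command accepted by a given scheduler version should be checked against its `--help` output.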