Ballista Python Bindings

Ballista provides Python bindings, allowing SQL and DataFrame queries to be executed from Python. As with PySpark, you build a plan through SQL or a DataFrame API against Parquet, CSV, JSON, and other file formats, run it in a distributed environment, and collect the results back in Python.

Connecting to a Cluster

BallistaSessionContext is the entry point for both remote and in-process clusters.

Remote cluster — connect to an already-running scheduler:

from ballista import BallistaSessionContext

ctx = BallistaSessionContext("df://localhost:50050")

In-process cluster — start a scheduler and executor in the current Python process. Useful for development, testing, and notebooks:

from ballista import BallistaSessionContext, setup_test_cluster

host, port = setup_test_cluster()
ctx = BallistaSessionContext(f"df://{host}:{port}")
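The two connection modes above can be wrapped in one small helper. This is only a sketch: scheduler_url and connect are hypothetical names introduced here, not part of the Ballista API, and the only Ballista calls used are the ones already shown (BallistaSessionContext and setup_test_cluster).

```python
from typing import Optional


def scheduler_url(host: str, port: int) -> str:
    """Build a scheduler URL in the df://host:port form used above."""
    return f"df://{host}:{port}"


def connect(url: Optional[str] = None):
    """Connect to a remote scheduler, or to an in-process cluster if no URL is given.

    Hypothetical convenience wrapper, not a Ballista function.
    """
    # Imported lazily so scheduler_url stays usable without ballista installed.
    from ballista import BallistaSessionContext, setup_test_cluster

    if url is None:
        # No URL supplied: start a scheduler and executor in this process.
        host, port = setup_test_cluster()
        url = scheduler_url(host, port)
    return BallistaSessionContext(url)


url = scheduler_url("localhost", 50050)
print(url)
```

Calling connect(url) would target the remote scheduler, while connect() with no argument would fall back to the in-process cluster, which can be convenient in tests and notebooks.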

Configuration

Target Partitions

Set datafusion.execution.target_partitions to match your cluster capacity (executors × concurrent_tasks_per_executor) by passing cluster_config to the constructor. The default is inherited from DataFusion and is based on the client machine's CPU count, which is usually too low for a multi-executor cluster:

from ballista import BallistaSessionContext

executors = 4
concurrent_tasks = 8
target_partitions = executors * concurrent_tasks

ctx = BallistaSessionContext(
    "df://localhost:50050",
    cluster_config={
        "datafusion.execution.target_partitions": str(target_partitions),
    },
)

cluster_config is propagated to the scheduler-side session for distributed planning and execution, and is also applied to the local context so settings consulted during table registration (e.g. datafusion.execution.listing_table_factory_infer_partitions) take effect before the plan is shipped.

target_partitions controls how many parallel tasks the scheduler creates per stage. Setting it too low leaves executor capacity idle; setting it too high creates unnecessary scheduling overhead.
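The sizing rule above (executors × concurrent tasks per executor) can be captured in a small function that produces the cluster_config dictionary. This is a minimal sketch; build_cluster_config is a hypothetical helper name, and the only setting it touches is the datafusion.execution.target_partitions key shown above.

```python
def build_cluster_config(executors: int, concurrent_tasks_per_executor: int) -> dict:
    """Return a cluster_config dict sized to the cluster's total task slots.

    Hypothetical helper, not part of the Ballista API.
    """
    if executors < 1 or concurrent_tasks_per_executor < 1:
        raise ValueError("executors and concurrent_tasks_per_executor must be >= 1")
    target_partitions = executors * concurrent_tasks_per_executor
    # Values are passed as strings, as in the example above.
    return {"datafusion.execution.target_partitions": str(target_partitions)}


config = build_cluster_config(executors=4, concurrent_tasks_per_executor=8)
print(config)
```

The resulting dictionary can be passed as cluster_config to the BallistaSessionContext constructor, exactly as in the example above.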

SQL

Registering Tables

Before running SQL queries, register tables with the context using a register_* method or a CREATE EXTERNAL TABLE statement:

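# Option 1: register the files with a register_* method
ctx.register_parquet("trips", "/mnt/bigdata/nyctaxi")

# Option 2: the equivalent SQL statement (these are alternatives; pick one per table name)
ctx.sql("CREATE EXTERNAL TABLE trips STORED AS PARQUET LOCATION '/mnt/bigdata/nyctaxi'")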
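When many tables are registered through SQL, the statement can be generated rather than written by hand. The sketch below assumes the same CREATE EXTERNAL TABLE form shown above; create_external_table_sql is a hypothetical helper, and it only builds the string, leaving execution to ctx.sql.

```python
def create_external_table_sql(name: str, location: str, file_format: str = "PARQUET") -> str:
    """Build a CREATE EXTERNAL TABLE statement in the form shown above.

    Hypothetical helper, not part of the Ballista API.
    """
    # Keep the table name to a plain identifier so it cannot inject SQL.
    if not name.isidentifier():
        raise ValueError(f"invalid table name: {name!r}")
    if not file_format.isalpha():
        raise ValueError(f"invalid file format: {file_format!r}")
    # Escape single quotes inside the path by doubling them.
    escaped_location = location.replace("'", "''")
    return (
        f"CREATE EXTERNAL TABLE {name} "
        f"STORED AS {file_format.upper()} "
        f"LOCATION '{escaped_location}'"
    )


statement = create_external_table_sql("trips", "/mnt/bigdata/nyctaxi")
print(statement)
```

The generated statement would then be passed to ctx.sql(statement) in place of the hand-written string.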

Executing Queries

The sql method returns a DataFrame. The query runs when you call an action like show or collect:

df = ctx.sql("SELECT count(*) FROM trips")

Showing Query Results

df.show()
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 9071244         |
+-----------------+

Collecting Query Results

collect executes the query and returns the results as PyArrow record batches:

df.collect()
[pyarrow.RecordBatch
COUNT(UInt8(1)): int64]

Viewing Query Plans

explain shows the logical and physical plans for a query:

df.explain()
+---------------+-------------------------------------------------------------+
| plan_type     | plan                                                        |
+---------------+-------------------------------------------------------------+
| logical_plan  | Projection: #COUNT(UInt8(1))                                |
|               |   Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]         |
|               |     TableScan: trips projection=[VendorID]                  |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
|               |   ProjectionExec: expr=[9071244 as COUNT(UInt8(1))]         |
|               |     EmptyExec: produce_one_row=true                         |
|               |                                                             |
+---------------+-------------------------------------------------------------+

DataFrame API

You can create a DataFrame from in-memory data using PyArrow record batches:

from ballista import BallistaSessionContext, setup_test_cluster
from datafusion import col
import pyarrow

host, port = setup_test_cluster()
ctx = BallistaSessionContext(f"df://{host}:{port}")

batch = pyarrow.RecordBatch.from_arrays(
    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

df = df.select(
    col("a") + col("b"),
    col("a") - col("b"),
)

result = df.collect()[0]

assert result.column(0) == pyarrow.array([5, 7, 9])
assert result.column(1) == pyarrow.array([-3, -3, -3])

User Defined Functions

The underlying DataFusion query engine supports Python UDFs, but Ballista does not yet support them. Support is planned for a future release; see #173 for status.