Ballista Python Bindings¶
Ballista provides Python bindings, allowing SQL and DataFrame queries to be executed from Python. Like PySpark, you build a plan through SQL or a DataFrame API against Parquet, CSV, JSON, and other file formats, run it in a distributed environment, and collect the results back in Python.
Connecting to a Cluster¶
BallistaSessionContext is the entry point for both remote and in-process clusters.
Remote cluster — connect to an already-running scheduler:
from ballista import BallistaSessionContext
ctx = BallistaSessionContext("df://localhost:50050")
In-process cluster — start a scheduler and executor in the current Python process. Useful for development, testing, and notebooks:
from ballista import BallistaSessionContext, setup_test_cluster
host, port = setup_test_cluster()
ctx = BallistaSessionContext(f"df://{host}:{port}")
Configuration¶
Target Partitions¶
Set datafusion.execution.target_partitions to match your cluster capacity
(executors × concurrent_tasks_per_executor) by passing cluster_config to the
constructor. The default inherits from DataFusion and is based on the client's CPU
count, which is typically far too low for distributed execution:
from ballista import BallistaSessionContext
executors = 4
concurrent_tasks = 8
target_partitions = executors * concurrent_tasks
ctx = BallistaSessionContext(
"df://localhost:50050",
cluster_config={
"datafusion.execution.target_partitions": str(target_partitions),
},
)
cluster_config is propagated to the scheduler-side session for distributed planning
and execution, and is also applied to the local context so settings consulted during
table registration (e.g. datafusion.execution.listing_table_factory_infer_partitions)
take effect before the plan is shipped.
This controls how many parallel tasks the scheduler creates per stage. Setting it too low leaves executor capacity idle; setting it too high creates unnecessary scheduling overhead.
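The sizing rule above can be captured in a small helper (the function name is illustrative, not part of the Ballista API):

```python
def recommended_target_partitions(executors: int, tasks_per_executor: int) -> int:
    # One partition per concurrently-runnable task across the cluster,
    # so every executor slot has work to do during each stage.
    return executors * tasks_per_executor

# The 4-executor, 8-tasks-per-executor cluster from the example above:
partitions = recommended_target_partitions(4, 8)  # 32
```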
SQL¶
Registering Tables¶
Before running SQL queries, register tables with the context, either with a
register_* method or with an equivalent CREATE EXTERNAL TABLE statement:
ctx.register_parquet("trips", "/mnt/bigdata/nyctaxi")
# or, equivalently, via SQL DDL:
ctx.sql("CREATE EXTERNAL TABLE trips STORED AS PARQUET LOCATION '/mnt/bigdata/nyctaxi'")
Executing Queries¶
The sql method returns a DataFrame. The query runs when you call an action like
show or collect:
df = ctx.sql("SELECT count(*) FROM trips")
Showing Query Results¶
df.show()
+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 9071244 |
+-----------------+
Collecting Query Results¶
collect executes the query and returns the results as
PyArrow record batches:
df.collect()
[pyarrow.RecordBatch
COUNT(UInt8(1)): int64]
Viewing Query Plans¶
explain shows the logical and physical plans for a query:
df.explain()
+---------------+-------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------+
| logical_plan | Projection: #COUNT(UInt8(1)) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
| | TableScan: trips projection=[VendorID] |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
| | ProjectionExec: expr=[9071244 as COUNT(UInt8(1))] |
| | EmptyExec: produce_one_row=true |
| | |
+---------------+-------------------------------------------------------------+
DataFrame API¶
You can create a DataFrame from in-memory data using PyArrow record batches:
from ballista import BallistaSessionContext, setup_test_cluster
from datafusion import col
import pyarrow
host, port = setup_test_cluster()
ctx = BallistaSessionContext(f"df://{host}:{port}")
batch = pyarrow.RecordBatch.from_arrays(
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])
df = df.select(
col("a") + col("b"),
col("a") - col("b"),
)
result = df.collect()[0]
assert result.column(0) == pyarrow.array([5, 7, 9])
assert result.column(1) == pyarrow.array([-3, -3, -3])
User Defined Functions¶
The underlying DataFusion query engine supports Python UDFs, but support has not yet been implemented in Ballista. It is planned for a future release. See #173 for status.