datafusion.ipc

Driver- and worker-side setup for distributing DataFusion expressions.

When an Expr is shipped to a worker process (e.g. through multiprocessing.Pool() or a Ray actor), the worker reconstructs the expression against a SessionContext. If the expression references UDFs imported via the FFI capsule protocol, or any other UDF that the worker must resolve from a context's registered functions rather than from the shipped expression itself, install a configured SessionContext once per worker:

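from datafusion import SessionContext
from datafusion.ipc import set_worker_ctx

# my_ffi_aggregate: an aggregate UDF imported via the FFI capsule
# protocol, defined elsewhere and importable in the worker process.
def init_worker():
    ctx = SessionContext()
    ctx.register_udaf(my_ffi_aggregate)
    set_worker_ctx(ctx)

# e.g. multiprocessing.Pool(initializer=init_worker)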

Built-in functions and Python UDFs (scalar, aggregate, window) travel inside the shipped expression itself and do not need pre-registration on the worker.
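
The per-worker pattern can be modelled without DataFusion. The sketch below is a hypothetical stand-in, not the library's implementation: FakeContext plays the role of a configured SessionContext, and set_ctx/get_ctx model set_worker_ctx()/get_worker_ctx() as a thread-local slot. It uses concurrent.futures.ThreadPoolExecutor, whose initializer runs once in each worker thread before that thread takes tasks; multiprocessing.Pool(initializer=...) applies the same hook once per worker process.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeContext:
    """Hypothetical stand-in for a configured SessionContext."""

    def __init__(self):
        self.functions = set()

    def register(self, name):
        self.functions.add(name)


# Hypothetical thread-local slot modelling set_worker_ctx()/get_worker_ctx().
_slot = threading.local()


def set_ctx(ctx):
    _slot.ctx = ctx


def get_ctx():
    return getattr(_slot, "ctx", None)


def init_worker():
    # Runs once in each worker thread before it picks up any task.
    ctx = FakeContext()
    ctx.register("my_ffi_aggregate")  # hypothetical FFI-imported UDF name
    set_ctx(ctx)


def reconstruct(name):
    # A shipped expression would be rebuilt against the worker's context;
    # here we only check whether the referenced function can be resolved.
    ctx = get_ctx()
    return ctx is not None and name in ctx.functions


with ThreadPoolExecutor(max_workers=2, initializer=init_worker) as pool:
    results = list(pool.map(reconstruct, ["my_ffi_aggregate", "unregistered_udf"]))

print(results)  # [True, False]
print(get_ctx())  # None: the main thread never ran init_worker
```

Because the slot is thread-local, this pattern relies on tasks running on the same thread that ran the initializer, which is the case for the executor above.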

Note

Serialization model

Expressions containing Python UDFs (scalar, aggregate, window) are serialized using cloudpickle. The callable itself travels by value (bytecode and closure cells inlined), but any names the callable resolves via import are captured by reference and must be importable on the receiving worker.

The serialized payload is stamped with the sender’s Python (major, minor) version. Loading on a different minor version raises ValueError with an actionable message — cloudpickle payloads are not portable across Python minor versions. See datafusion.Expr.to_bytes() for examples of what travels by value vs. by reference.
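
The version check can be pictured as an envelope around the payload. This is a hypothetical sketch: the real wire format is not described here, the standard-library pickle stands in for cloudpickle, and dumps_stamped/loads_stamped are illustrative names. The receiver_version parameter exists only so the example can simulate a worker running another Python version.

```python
import pickle
import sys


def dumps_stamped(obj):
    # Hypothetical envelope: the sender's (major, minor) travels next to the payload.
    return pickle.dumps((tuple(sys.version_info[:2]), pickle.dumps(obj)))


def loads_stamped(blob, receiver_version=None):
    sender_version, payload = pickle.loads(blob)
    receiver = tuple(receiver_version or sys.version_info[:2])
    # Check the stamp before touching the payload itself.
    if sender_version != receiver:
        raise ValueError(
            f"payload was serialized on Python {sender_version[0]}.{sender_version[1]}, "
            f"but this process runs {receiver[0]}.{receiver[1]}; use the same "
            "Python minor version on the driver and all workers"
        )
    return pickle.loads(payload)


blob = dumps_stamped({"expr": "a + 1"})
print(loads_stamped(blob))  # {'expr': 'a + 1'}
try:
    loads_stamped(blob, receiver_version=(2, 7))  # simulate a mismatched worker
except ValueError as exc:
    print(exc)
```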

On the driver side, call set_sender_ctx() to control how pickle.dumps() encodes expressions — for example, to apply SessionContext.with_python_udf_inlining() to every pickled expression on this thread:

>>> import pickle
>>> from datafusion import SessionContext, col, lit
>>> from datafusion.ipc import clear_sender_ctx, set_sender_ctx
>>> driver_ctx = SessionContext().with_python_udf_inlining(enabled=False)
>>> set_sender_ctx(driver_ctx)
>>> try:
...     blob = pickle.dumps(col("a") + lit(1))
... finally:
...     clear_sender_ctx()
>>> isinstance(blob, bytes)
True

Without a sender context, the default codec is used (Python UDF inlining on). The sender context only affects pickle and context-less to_bytes() encoding; explicit expr.to_bytes(ctx) calls still use the supplied ctx.

The thread-local sender context holds a strong reference to the installed SessionContext until clear_sender_ctx() is called or the thread exits. Long-running driver threads that install a sender context once and never clear it will retain that session for the lifetime of the thread; pair set_sender_ctx() with clear_sender_ctx() (e.g. in a try/finally) when the sender context is only needed for a bounded scope.
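
A context manager is one way to keep the set/clear pairing bounded and to restore whatever was installed before. The sketch below models the thread-local slot with the standard library; _slot, set_ctx, get_ctx, clear_ctx, scoped_sender and Session are hypothetical names, though the same wrapper shape could be written around get_sender_ctx(), set_sender_ctx() and clear_sender_ctx(). The weakref check shows that the session is no longer retained once the scope exits.

```python
import gc
import threading
import weakref
from contextlib import contextmanager

# Hypothetical thread-local slot modelling the sender-context functions.
_slot = threading.local()


def set_ctx(ctx):
    _slot.ctx = ctx


def get_ctx():
    return getattr(_slot, "ctx", None)


def clear_ctx():
    try:
        del _slot.ctx
    except AttributeError:
        pass  # nothing installed on this thread


@contextmanager
def scoped_sender(ctx):
    previous = get_ctx()
    set_ctx(ctx)
    try:
        yield ctx
    finally:
        # Restore the outer context, or clear the slot if there was none.
        if previous is None:
            clear_ctx()
        else:
            set_ctx(previous)


class Session:
    """Hypothetical stand-in for a SessionContext."""


session = Session()
alive = weakref.ref(session)
with scoped_sender(session):
    inside = get_ctx() is session  # installed for the duration of the block
del session
gc.collect()
print(inside, get_ctx(), alive())  # True None None
```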

Functions

clear_sender_ctx() → None

Remove this driver's installed sender SessionContext.

clear_worker_ctx() → None

Remove this worker's installed SessionContext.

get_sender_ctx() → datafusion.context.SessionContext | None

Return this driver's installed sender SessionContext, or None.

get_worker_ctx() → datafusion.context.SessionContext | None

Return this worker's installed SessionContext, or None.

set_sender_ctx(ctx) → None

Install this driver's SessionContext for outbound pickles.

set_worker_ctx(ctx) → None

Install this worker's SessionContext for shipped expressions.

Module Contents

datafusion.ipc.clear_sender_ctx() → None

Remove this driver’s installed sender SessionContext.

After clearing, pickled expressions fall back to the default codec (Python UDF inlining on).

Examples

>>> from datafusion import SessionContext
>>> from datafusion.ipc import (
...     set_sender_ctx, clear_sender_ctx, get_sender_ctx,
... )
>>> set_sender_ctx(SessionContext())
>>> clear_sender_ctx()
>>> get_sender_ctx() is None
True

datafusion.ipc.clear_worker_ctx() → None

Remove this worker’s installed SessionContext.

After clearing, expressions reconstructed in this worker fall back to the global SessionContext. That is adequate for built-ins and Python UDFs (scalar, aggregate, window), but any UDF imported via the FFI capsule protocol must be registered on the global context to resolve.
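
The fallback order can be sketched as: use the worker's installed context if there is one, otherwise the global context. FakeContext, GLOBAL_CTX, set_worker, clear_worker and resolve are hypothetical stand-ins, not DataFusion APIs, and LookupError is an assumed error type; the point is only that a name registered solely on the worker context stops resolving once that context is cleared.

```python
import threading


class FakeContext:
    """Hypothetical stand-in for a SessionContext's function registry."""

    def __init__(self, functions=()):
        self.functions = set(functions)


# Hypothetical global context that only knows one built-in name.
GLOBAL_CTX = FakeContext({"sum"})
_slot = threading.local()


def set_worker(ctx):
    _slot.ctx = ctx


def clear_worker():
    try:
        del _slot.ctx
    except AttributeError:
        pass  # nothing installed on this thread


def resolve(name):
    # Prefer the worker context; fall back to the global context.
    ctx = getattr(_slot, "ctx", None)
    if ctx is None:
        ctx = GLOBAL_CTX
    if name not in ctx.functions:
        raise LookupError(f"function {name!r} is not registered")
    return ctx


worker_ctx = FakeContext({"sum", "my_ffi_aggregate"})
set_worker(worker_ctx)
print(resolve("my_ffi_aggregate") is worker_ctx)  # True
clear_worker()
print(resolve("sum") is GLOBAL_CTX)  # True
try:
    resolve("my_ffi_aggregate")
except LookupError as exc:
    print(exc)  # function 'my_ffi_aggregate' is not registered
```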

Examples

>>> from datafusion import SessionContext
>>> from datafusion.ipc import set_worker_ctx, clear_worker_ctx, get_worker_ctx
>>> set_worker_ctx(SessionContext())
>>> clear_worker_ctx()
>>> get_worker_ctx() is None
True

datafusion.ipc.get_sender_ctx() → datafusion.context.SessionContext | None

Return this driver’s installed sender SessionContext, or None.

Examples

>>> from datafusion.ipc import get_sender_ctx, clear_sender_ctx
>>> clear_sender_ctx()
>>> get_sender_ctx() is None
True

datafusion.ipc.get_worker_ctx() → datafusion.context.SessionContext | None

Return this worker’s installed SessionContext, or None.

Examples

>>> from datafusion.ipc import get_worker_ctx, clear_worker_ctx
>>> clear_worker_ctx()
>>> get_worker_ctx() is None
True

datafusion.ipc.set_sender_ctx(ctx: datafusion.context.SessionContext) → None

Install this driver’s SessionContext for outbound pickles.

Controls how pickle.dumps() encodes Expr instances on this thread. The most useful application is propagating a session configured with SessionContext.with_python_udf_inlining() so the toggle takes effect through pickle (which otherwise calls Expr.to_bytes() with no context and uses the default codec).

Safe to call repeatedly: each call overwrites any previous value. The value is stored in a thread-local slot, so worker threads on the driver may install their own contexts. It does not affect Expr.to_bytes() calls that pass an explicit ctx; those continue to use the supplied context.
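
pickle.dumps() offers no way to pass a context, which is why a thread-local slot is needed at all. The sketch below is one plausible model, not DataFusion's implementation: a hypothetical Expr whose __reduce__ encodes with the current thread's sender context, where encode() stands in for Expr.to_bytes() and a context is just a string label. It shows the precedence explicit ctx, then sender ctx, then default, and that each thread sees only its own installed context.

```python
import pickle
import threading

_sender = threading.local()  # hypothetical per-thread sender slot


def set_sender(ctx):
    _sender.ctx = ctx


def get_sender():
    return getattr(_sender, "ctx", None)


class Expr:
    """Hypothetical expression; a ctx is just a label naming the codec."""

    def __init__(self, text):
        self.text = text

    def encode(self, ctx=None):
        # Precedence: explicit ctx, then this thread's sender ctx, then default.
        if ctx is None:
            ctx = get_sender()
        return {"text": self.text, "codec": ctx if ctx is not None else "default"}

    def __reduce__(self):
        # pickle.dumps() cannot forward a ctx, so encode without one. To stay
        # self-contained, unpickling yields the encoded dict, not an Expr.
        return (dict, (self.encode(),))


expr = Expr("a + 1")
print(pickle.loads(pickle.dumps(expr))["codec"])  # default

set_sender("no-inlining")
print(pickle.loads(pickle.dumps(expr))["codec"])  # no-inlining
print(expr.encode("explicit")["codec"])  # explicit

seen = []
t = threading.Thread(target=lambda: seen.append(pickle.loads(pickle.dumps(expr))["codec"]))
t.start()
t.join()
print(seen)  # ['default']: the other thread has no sender context installed
```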

Examples

>>> from datafusion import SessionContext
>>> from datafusion.ipc import set_sender_ctx, get_sender_ctx
>>> driver = SessionContext().with_python_udf_inlining(enabled=False)
>>> set_sender_ctx(driver)
>>> get_sender_ctx() is driver
True

datafusion.ipc.set_worker_ctx(ctx: datafusion.context.SessionContext) → None

Install this worker’s SessionContext for shipped expressions.

Call once per worker, typically from a multiprocessing.Pool initializer or a Ray actor __init__. Safe to call repeatedly: each call overwrites any previous value. The value is stored in a thread-local slot, so each thread within a worker may install its own context independently, and a context installed on one thread is not visible from another.

Examples

>>> from datafusion import SessionContext
>>> from datafusion.ipc import set_worker_ctx, get_worker_ctx, clear_worker_ctx
>>> set_worker_ctx(SessionContext())
>>> get_worker_ctx() is not None
True
>>> clear_worker_ctx()