datafusion.ipc
Driver- and worker-side setup for distributing DataFusion expressions.
When an Expr is shipped to a worker process (e.g. through
multiprocessing.Pool() or a Ray actor), the worker reconstructs the
expression against a SessionContext. If the expression references
UDFs imported via the FFI capsule protocol — or any UDF the worker would
otherwise resolve from its registered functions rather than from inside
the shipped expression — install a configured SessionContext
once per worker:
from datafusion import SessionContext
from datafusion.ipc import set_worker_ctx

def init_worker():
    ctx = SessionContext()
    # my_ffi_aggregate: an FFI-imported aggregate UDF, defined elsewhere.
    ctx.register_udaf(my_ffi_aggregate)
    set_worker_ctx(ctx)
Built-in functions and Python UDFs (scalar, aggregate, window) travel inside the shipped expression itself and do not need pre-registration on the worker.
Note
Serialization model
Expressions containing Python UDFs (scalar, aggregate, window) are
serialized using cloudpickle. The callable itself travels
by value (bytecode and closure cells inlined), but any names the
callable resolves via import are captured by reference and
must be importable on the receiving worker.
The serialized payload is stamped with the sender’s Python
(major, minor) version. Loading on a different minor version
raises ValueError with an actionable message — cloudpickle
payloads are not portable across Python minor versions. See
datafusion.Expr.to_bytes() for examples of what travels by
value vs. by reference.
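The version stamp described above can be pictured with a small standard-library sketch. This is an illustration of the idea only, not DataFusion's actual wire format: the helper names `stamp` and `load_stamped` and the envelope layout are hypothetical, and plain `pickle` stands in for cloudpickle.

```python
import pickle
import sys


def stamp(obj, version=None):
    """Wrap a pickled object in an envelope that records (major, minor)."""
    if version is None:
        version = tuple(sys.version_info[:2])
    return pickle.dumps({"py": version, "body": pickle.dumps(obj)})


def load_stamped(blob):
    """Unwrap an envelope, refusing payloads from another minor version."""
    envelope = pickle.loads(blob)
    sender = tuple(envelope["py"])
    receiver = tuple(sys.version_info[:2])
    if sender != receiver:
        raise ValueError(
            f"payload was written by Python {sender[0]}.{sender[1]} but this "
            f"process runs {receiver[0]}.{receiver[1]}; "
            "run sender and receiver on the same minor version"
        )
    return pickle.loads(envelope["body"])


# Round trip within the same interpreter succeeds.
same = load_stamped(stamp([1, 2, 3]))
```

Checking the stamp before unpickling the body lets the receiver fail with a clear message instead of an obscure bytecode error.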
On the driver side, call set_sender_ctx() to control how
pickle.dumps() encodes expressions — for example, to apply
SessionContext.with_python_udf_inlining() to every pickled
expression on this thread:
>>> import pickle
>>> from datafusion import SessionContext, col, lit
>>> from datafusion.ipc import clear_sender_ctx, set_sender_ctx
>>> driver_ctx = SessionContext().with_python_udf_inlining(enabled=False)
>>> set_sender_ctx(driver_ctx)
>>> try:
...     blob = pickle.dumps(col("a") + lit(1))
... finally:
...     clear_sender_ctx()
>>> isinstance(blob, bytes)
True
Without a sender context the default codec is used (Python UDF
inlining on). The sender context only affects pickle / to_bytes
encoding; explicit expr.to_bytes(ctx) calls still use the supplied
ctx.
The thread-local sender context holds a strong reference to the
installed SessionContext until clear_sender_ctx() is
called or the thread exits. Long-running driver threads that install a sender
context once and never clear it will retain that session for the
lifetime of the thread; pair set_sender_ctx() with
clear_sender_ctx() (e.g. in a try/finally) when the
sender context is only needed for a bounded scope.
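One way to make the set/clear pairing hard to forget is a context manager. The sketch below models the thread-local slot with `threading.local` so that it runs without DataFusion installed. `set_ctx`, `get_ctx`, `clear_ctx`, and `scoped_ctx` are hypothetical stand-ins; in real code the same wrapper could call `set_sender_ctx()` and `clear_sender_ctx()` instead.

```python
import threading
from contextlib import contextmanager

# Stand-in for the library's thread-local sender slot (assumption).
_slot = threading.local()


def set_ctx(ctx):
    _slot.ctx = ctx


def get_ctx():
    return getattr(_slot, "ctx", None)


def clear_ctx():
    # Dropping the reference lets the session be garbage collected.
    _slot.ctx = None


@contextmanager
def scoped_ctx(ctx):
    """Install ctx for the duration of a with-block, then always clear it."""
    set_ctx(ctx)
    try:
        yield ctx
    finally:
        clear_ctx()


session = object()  # placeholder for a SessionContext
with scoped_ctx(session):
    inside = get_ctx() is session
after = get_ctx() is None
```

Because the clear runs in a `finally` clause, the slot is released even when the body of the `with` block raises.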
Functions
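clear_sender_ctx() | Remove this driver's installed sender SessionContext.
clear_worker_ctx() | Remove this worker's installed SessionContext.
get_sender_ctx() | Return this driver's installed sender SessionContext, or None.
get_worker_ctx() | Return this worker's installed SessionContext, or None.
set_sender_ctx(ctx) | Install this driver's SessionContext for outbound pickles.
set_worker_ctx(ctx) | Install this worker's SessionContext for shipped expressions.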
Module Contents
- datafusion.ipc.clear_sender_ctx() -> None
  Remove this driver’s installed sender SessionContext.

  After clearing, pickled expressions fall back to the default codec (Python UDF inlining on).

  Examples

  >>> from datafusion import SessionContext
  >>> from datafusion.ipc import (
  ...     set_sender_ctx, clear_sender_ctx, get_sender_ctx,
  ... )
  >>> set_sender_ctx(SessionContext())
  >>> clear_sender_ctx()
  >>> get_sender_ctx() is None
  True
- datafusion.ipc.clear_worker_ctx() -> None
  Remove this worker’s installed SessionContext.

  After clearing, expressions reconstructed in this worker fall back to the global SessionContext. That is adequate for built-ins and Python UDFs (scalar, aggregate, window), but anything imported via the FFI capsule protocol must be registered on the global context to resolve.

  Examples

  >>> from datafusion import SessionContext
  >>> from datafusion.ipc import set_worker_ctx, clear_worker_ctx, get_worker_ctx
  >>> set_worker_ctx(SessionContext())
  >>> clear_worker_ctx()
  >>> get_worker_ctx() is None
  True
- datafusion.ipc.get_sender_ctx() -> datafusion.context.SessionContext | None
  Return this driver’s installed sender SessionContext, or None.

  Examples

  >>> from datafusion.ipc import get_sender_ctx, clear_sender_ctx
  >>> clear_sender_ctx()
  >>> get_sender_ctx() is None
  True
- datafusion.ipc.get_worker_ctx() -> datafusion.context.SessionContext | None
  Return this worker’s installed SessionContext, or None.

  Examples

  >>> from datafusion.ipc import get_worker_ctx, clear_worker_ctx
  >>> clear_worker_ctx()
  >>> get_worker_ctx() is None
  True
- datafusion.ipc.set_sender_ctx(ctx: datafusion.context.SessionContext) -> None
  Install this driver’s SessionContext for outbound pickles.

  Controls how pickle.dumps() encodes Expr instances on this thread. The most useful application is propagating a session configured with SessionContext.with_python_udf_inlining() so the toggle takes effect through pickle (which otherwise calls Expr.to_bytes() with no context and uses the default codec).

  Idempotent: overwrites any previous value. Stored in a thread-local slot, so worker threads on the driver may install their own contexts. Does not affect Expr.to_bytes() calls that pass an explicit ctx; those continue to use the supplied context.

  Examples

  >>> from datafusion import SessionContext
  >>> from datafusion.ipc import set_sender_ctx, get_sender_ctx
  >>> driver = SessionContext().with_python_udf_inlining(enabled=False)
  >>> set_sender_ctx(driver)
  >>> get_sender_ctx() is driver
  True
- datafusion.ipc.set_worker_ctx(ctx: datafusion.context.SessionContext) -> None
  Install this worker’s SessionContext for shipped expressions.

  Call once per worker, typically from a multiprocessing.Pool initializer or a Ray actor __init__. Idempotent: overwrites any previous value. Stored in a thread-local slot, so each thread within a worker may install its own context independently.

  Examples

  >>> from datafusion import SessionContext
  >>> from datafusion.ipc import set_worker_ctx, get_worker_ctx, clear_worker_ctx
  >>> set_worker_ctx(SessionContext())
  >>> get_worker_ctx() is not None
  True
  >>> clear_worker_ctx()
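The "once per worker" initializer pattern and the per-thread nature of the slot can be seen in a standard-library sketch. It assumes nothing about DataFusion internals: a `threading.local` object stands in for the worker slot, a string stands in for the SessionContext, and `init_worker` and `task` are hypothetical names. `ThreadPoolExecutor` is used rather than `multiprocessing.Pool` so the example stays self-contained, but both accept an `initializer` argument.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the library's thread-local worker slot (assumption).
_slot = threading.local()


def init_worker():
    # Runs once in each pool thread before it picks up any task.
    _slot.ctx = f"ctx-for-{threading.get_ident()}"


def task(_):
    # Each task sees the context installed by its own thread's initializer.
    return threading.get_ident(), _slot.ctx


with ThreadPoolExecutor(max_workers=2, initializer=init_worker) as pool:
    results = list(pool.map(task, range(8)))

# The submitting thread never ran the initializer, so its slot is empty.
main_has_ctx = hasattr(_slot, "ctx")
```

Every task finds a context without installing one itself, and the submitting thread's slot stays untouched, which is the independence the thread-local storage is meant to provide.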