Scalar UDFs

A scalar UDF is a Java-implemented SQL function that operates one row at a time, expressed in vectorised form: each invocation receives a batch of input columns and returns either a per-row output column of the same length (Array) or a single value broadcast to every row (Scalar).

Implement

Implement the ScalarFunction interface. The implementation declares its own SQL name, argument fields, return field, and volatility, and supplies the per-batch evaluate body:

import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.datafusion.ColumnarValue;
import org.apache.datafusion.ScalarFunction;
import org.apache.datafusion.ScalarFunctionArgs;
import org.apache.datafusion.Volatility;

public final class AddOne implements ScalarFunction {
    private static final ArrowType INT32 = new ArrowType.Int(32, true);

    @Override public String name() { return "add_one"; }
    @Override public List<Field> argFields() { return List.of(Field.nullable("x", INT32)); }
    @Override public Field returnField() { return Field.nullable("y", INT32); }
    @Override public Volatility volatility() { return Volatility.IMMUTABLE; }

    @Override
    public ColumnarValue evaluate(BufferAllocator allocator, ScalarFunctionArgs args) {
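        // First argument as an Arrow vector; a read-only view that must not be closed here.
        IntVector in = (IntVector) args.args().get(0).vector();
        // Allocate the result from the supplied allocator, one slot per input value.
        IntVector out = new IntVector("add_one", allocator);
        out.allocateNew(in.getValueCount());
        for (int i = 0; i < in.getValueCount(); i++) {
            // Propagate nulls; otherwise add one.
            if (in.isNull(i)) out.setNull(i);
            else out.set(i, in.get(i) + 1);
        }
        out.setValueCount(in.getValueCount());
        // Ownership of out passes to the framework on return.
        return ColumnarValue.array(out);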
    }
}

Each entry in args.args() is a ColumnarValue — either ColumnarValue.Array (a per-row vector of length args.rowCount()) or ColumnarValue.Scalar (a length-1 vector representing a single literal or folded constant). Access the underlying Arrow vector with .vector().
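One consequence is worth spelling out: a Scalar argument has length 1, while an Array result must have args.rowCount() entries. The sketch below models that with plain Java arrays instead of Arrow vectors, so it runs without any dependencies. ArgSketch and its members are hypothetical names for illustration and are not part of the API described here; the idea it shows is to loop over the row count and read slot 0 whenever the argument is a scalar.

```java
/** Plain-Java stand-in for one UDF argument; hypothetical, not part of any real API. */
final class ArgSketch {
    final Integer[] values; // null entries model SQL NULL
    final boolean scalar;   // true models a length-1 Scalar argument

    ArgSketch(Integer[] values, boolean scalar) {
        if (scalar && values.length != 1) {
            throw new IllegalArgumentException("scalar argument must have length 1");
        }
        this.values = values;
        this.scalar = scalar;
    }

    /** Reads row i, mapping every row to slot 0 when the argument is a scalar. */
    Integer at(int i) {
        return values[scalar ? 0 : i];
    }

    /** add_one over rowCount rows; the output always has rowCount entries. */
    static Integer[] addOne(ArgSketch arg, int rowCount) {
        Integer[] out = new Integer[rowCount];
        for (int i = 0; i < rowCount; i++) {
            Integer v = arg.at(i);
            out[i] = (v == null) ? null : v + 1;
        }
        return out;
    }
}
```

With a real Arrow vector the same index mapping would apply to in.isNull(...) and in.get(...), assuming the argument can arrive in either form.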

Allocate any new vectors — including the result — from the supplied BufferAllocator. The input vectors are read-only views; do not close them. Ownership of the returned vector transfers to the framework on return.

Declaring argument and return fields

Each argument and the return value are described as Arrow Fields. A Field carries a name, a FieldType (Arrow type plus nullability and metadata), and a list of child fields used by nested types.

For primitive types the Field.nullable(name, arrowType) and Field.notNullable(name, arrowType) factories are the shortest form:

Field x = Field.nullable("x", new ArrowType.Int(32, true));
Field y = Field.notNullable("y", new ArrowType.Int(32, true));

Nested Arrow types — List, Struct, Map, Union — must declare their element / member / key / value fields as children, because that information does not live on the ArrowType itself. Use the new Field(name, FieldType, children) constructor:

// Needs org.apache.arrow.vector.types.pojo.FieldType in addition to the imports shown earlier.
// List<Int32>
ArrowType INT32 = new ArrowType.Int(32, true);
Field listOfInt =
    new Field(
        "vals",
        FieldType.nullable(new ArrowType.List()),
        List.of(Field.nullable("item", INT32)));

// Struct<a: Int32, b: Int32>
Field structAB =
    new Field(
        "ab",
        FieldType.nullable(new ArrowType.Struct()),
        List.of(Field.nullable("a", INT32), Field.nullable("b", INT32)));

A UDF declared with Field.nullable(...) arguments is registered with an exact nullability-bearing signature; calls whose argument types do not match exactly are rejected. The declared returnField’s nullability is preserved end-to-end: a non-nullable return field stays non-nullable in the result schema.

Returning a Scalar

Functions that yield a single value (nullary constants like pi(), or any function that wants the framework to broadcast a result across the batch) can return ColumnarValue.scalar(...) over a length-1 vector:

public final class JavaPi implements ScalarFunction {
    private static final ArrowType FLOAT64 =
        new ArrowType.FloatingPoint(org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE);

    @Override public String name() { return "java_pi"; }
    @Override public List<Field> argFields() { return List.of(); }
    @Override public Field returnField() { return Field.nullable("p", FLOAT64); }
    @Override public Volatility volatility() { return Volatility.IMMUTABLE; } // pi is a pure constant

    @Override
    public ColumnarValue evaluate(BufferAllocator allocator, ScalarFunctionArgs args) {
        org.apache.arrow.vector.Float8Vector out =
            new org.apache.arrow.vector.Float8Vector("pi", allocator);
        out.allocateNew(1);
        out.set(0, Math.PI);
        out.setValueCount(1);
        return ColumnarValue.scalar(out);
    }
}

The framework expands the scalar across args.rowCount() rows automatically.
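To make the expansion concrete, here is a dependency-free model of what broadcasting a length-1 result means. BroadcastSketch is a hypothetical name and this is not the framework's actual code; it only illustrates the observable effect described above.

```java
import java.util.Arrays;

/** Hypothetical model of scalar broadcasting; not the framework's implementation. */
final class BroadcastSketch {
    /** Expands a length-1 result so that every one of rowCount rows sees the same value. */
    static double[] broadcast(double[] scalarResult, int rowCount) {
        if (scalarResult.length != 1) {
            throw new IllegalArgumentException("a scalar result must have exactly one value");
        }
        double[] out = new double[rowCount];
        Arrays.fill(out, scalarResult[0]);
        return out;
    }
}
```

Returning a Scalar leaves this expansion to the framework, so the UDF itself only has to fill a single slot regardless of batch size.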

Register

Wrap the implementation in a ScalarUdf and pass it to SessionContext.registerUdf:

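// Assumes a BufferAllocator named allocator is in scope and a table t with an Int32 column x is registered.
try (SessionContext ctx = new SessionContext()) {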
    ctx.registerUdf(new ScalarUdf(new AddOne()));

    try (DataFrame df = ctx.sql("SELECT add_one(x) FROM t");
         ArrowReader r = df.collect(allocator)) {
        // ...
    }
}

ScalarUdf mirrors DataFusion’s ScalarUDF struct; ScalarFunction mirrors ScalarUDFImpl. Use Volatility.IMMUTABLE for pure functions, STABLE for functions that are deterministic within a single query, and VOLATILE for non-deterministic functions.

Errors

If the UDF throws, the exception class and message surface in the RuntimeException raised from collect(). If the returned ColumnarValue is null, an Array result’s vector length does not equal args.rowCount(), or the result’s Arrow type differs from the declared return field, the runtime raises a RuntimeException with a descriptive message. A Scalar result whose vector is not length-1 is rejected at the ColumnarValue.scalar factory.

Threading

DataFusion may invoke a UDF concurrently from multiple worker threads. An implementation that carries mutable state must synchronize access to it; stateless implementations need no extra care.
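As a minimal sketch of one way to keep shared state safe, the class below counts rows with java.util.concurrent.atomic.LongAdder, which accepts concurrent updates without explicit locking. BatchStats and its methods are hypothetical; in a real UDF, recordBatch would be called from evaluate, and simulate only stands in for the worker threads so the example runs on its own.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical shared state that a UDF instance could hold as a field. */
final class BatchStats {
    // LongAdder tolerates concurrent increments from many threads.
    private final LongAdder rowsSeen = new LongAdder();

    /** Would be called once per batch from evaluate(). */
    void recordBatch(int rowCount) {
        rowsSeen.add(rowCount);
    }

    long totalRows() {
        return rowsSeen.sum();
    }

    /** Simulates several worker threads evaluating batches at the same time. */
    static long simulate(int threads, int batchesPerThread, int rowsPerBatch) {
        BatchStats stats = new BatchStats();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int b = 0; b < batchesPerThread; b++) {
                    stats.recordBatch(rowsPerBatch);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return stats.totalRows();
    }
}
```

A plain long field incremented with += would be able to lose updates under the same load. For state that is more complex than a counter, a synchronized block or a concurrent collection is the usual alternative.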

Limitations (v1)

  • Scalar UDFs only — no aggregates, window functions, or table functions.

  • Exact-signature only — no variadic or polymorphic argument lists.

  • No nullable-argument short-circuiting; null inputs are passed through to the UDF as nulls in the input vector.
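Because nulls reach the UDF body, a function that wants "any null input gives a null output" semantics has to implement them itself. The following is a plain-Java sketch of that rule for a two-argument addition, using boxed arrays in place of Arrow vectors; StrictAddSketch is a hypothetical name chosen for illustration.

```java
/** Hypothetical model of hand-written null propagation for a two-argument function. */
final class StrictAddSketch {
    /** Adds row by row; a row is null in the output if either input is null. */
    static Integer[] add(Integer[] left, Integer[] right) {
        if (left.length != right.length) {
            throw new IllegalArgumentException("argument lengths differ");
        }
        Integer[] out = new Integer[left.length];
        for (int i = 0; i < out.length; i++) {
            boolean anyNull = left[i] == null || right[i] == null;
            out[i] = anyNull ? null : left[i] + right[i];
        }
        return out;
    }
}
```

With Arrow vectors the same check would use isNull(i) on each input and setNull(i) on the output, as AddOne does for its single argument.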