Upgrade Guides

DataFusion 48.0.0

Processing Field instead of DataType for user defined functions

In order to support metadata handling and extension types, user defined functions are now switching to traits which use Field rather than a DataType and nullability. This gives a single interface to both of these parameters and additionally allows access to metadata fields, which can be used for extension types.

To upgrade structs which implement ScalarUDFImpl, if you have implemented return_type_from_args you need instead to implement return_field_from_args. If your functions do not need to handle metadata, this should be straightforward repackaging of the output data into a Field. The name you specify on the field is not important. It will be overwritten during planning. ReturnInfo has been removed, so you will need to remove all references to it.

ScalarFunctionArgs now contains a field called arg_fields. You can use this to access the metadata associated with the columnar values during invocation.

Physical Expression return Field

To support the changes to user defined functions processing metadata, the PhysicalExpr trait, which now must specify a return Field based on the input schema. To upgrade structs which implement PhysicalExpr you need to implement the return_field function. There are numerous examples in the physical-expr crate.

FileFormat::supports_filters_pushdown replaced with FileSource::try_pushdown_filters

To support more general filter pushdown, the FileFormat::supports_filters_pushdown was replaced with FileSource::try_pushdown_filters. If you implemented a custom FileFormat that uses a custom FileSource you will need to implement FileSource::try_pushdown_filters. See ParquetSource::try_pushdown_filters for an example of how to implement this.

FileFormat::supports_filters_pushdown has been removed.

ParquetExec, AvroExec, CsvExec, JsonExec Removed

ParquetExec, AvroExec, CsvExec, and JsonExec were deprecated in DataFusion 46 and are removed in DataFusion 48. This is sooner than the normal process described in the API Deprecation Guidelines because all the tests cover the new DataSourceExec rather than the older structures. As we evolve DataSource, the old structures began to show signs of “bit rotting” (not working but no one knows due to lack of test coverage).

DataFusion 47.0.0

This section calls out some of the major changes in the 47.0.0 release of DataFusion.

Here are some example upgrade PRs that demonstrate changes required when upgrading from DataFusion 46.0.0:

Upgrades to arrow-rs and arrow-parquet 55.0.0 and object_store 0.12.0

Several APIs are changed in the underlying arrow and parquet libraries to use a u64 instead of usize to better support WASM (See #7371 and [#6961])

Additionally ObjectStore::list and ObjectStore::list_with_offset have been changed to return static lifetimes (See #6619)

This requires converting from usize to u64 occasionally as well as changes to ObjectStore implementations such as

impl Objectstore {
    ...
    // The range is now a u64 instead of usize
    async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
        self.inner.get_range(location, range).await
    }
    ...
    // the lifetime is now 'static instead of `_ (meaning the captured closure can't contain references)
    // (this also applies to list_with_offset)
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
        self.inner.list(prefix)
    }
}

The ParquetObjectReader has been updated to no longer require the object size (it can be fetched using a single suffix request). See #7334 for details

Pattern in DataFusion 46.0.0:

let meta: ObjectMeta = ...;
let reader = ParquetObjectReader::new(store, meta);

Pattern in DataFusion 47.0.0:

let meta: ObjectMeta = ...;
let reader = ParquetObjectReader::new(store, location)
  .with_file_size(meta.size);

DisplayFormatType::TreeRender

DataFusion now supports tree style explain plans. Implementations of Executionplan must also provide a description in the DisplayFormatType::TreeRender format. This can be the same as the existing DisplayFormatType::Default.

Removed Deprecated APIs

Several APIs have been removed in this release. These were either deprecated previously or were hard to use correctly such as the multiple different ScalarUDFImpl::invoke* APIs. See #15130, #15123, and #15027 for more details.

FileScanConfig –> FileScanConfigBuilder

Previously, FileScanConfig::build() directly created ExecutionPlans. In DataFusion 47.0.0 this has been changed to use FileScanConfigBuilder. See #15352 for details.

Pattern in DataFusion 46.0.0:

let plan = FileScanConfig::new(url, schema, Arc::new(file_source))
  .with_statistics(stats)
  ...
  .build()

Pattern in DataFusion 47.0.0:

let config = FileScanConfigBuilder::new(url, schema, Arc::new(file_source))
  .with_statistics(stats)
  ...
  .build();
let scan = DataSourceExec::from_data_source(config);

DataFusion 46.0.0

Use invoke_with_args instead of invoke() and invoke_batch()

DataFusion is moving to a consistent API for invoking ScalarUDFs, ScalarUDFImpl::invoke_with_args(), and deprecating ScalarUDFImpl::invoke(), ScalarUDFImpl::invoke_batch(), and ScalarUDFImpl::invoke_no_args()

If you see errors such as the following it means the older APIs are being used:

This feature is not implemented: Function concat does not implement invoke but called

To fix this error, use ScalarUDFImpl::invoke_with_args() instead, as shown below. See PR 14876 for an example.

Given existing code like this:

impl ScalarUDFImpl for SparkConcat {
...
    fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
        if args
            .iter()
            .any(|arg| matches!(arg.data_type(), DataType::List(_)))
        {
            ArrayConcat::new().invoke_batch(args, number_rows)
        } else {
            ConcatFunc::new().invoke_batch(args, number_rows)
        }
    }
}

To

impl ScalarUDFImpl for SparkConcat {
    ...
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        if args
            .args
            .iter()
            .any(|arg| matches!(arg.data_type(), DataType::List(_)))
        {
            ArrayConcat::new().invoke_with_args(args)
        } else {
            ConcatFunc::new().invoke_with_args(args)
        }
    }
}

ParquetExec, AvroExec, CsvExec, JsonExec deprecated

DataFusion 46 has a major change to how the built in DataSources are organized. Instead of individual ExecutionPlans for the different file formats they now all use DataSourceExec and the format specific information is embodied in new traits DataSource and FileSource.

Here is more information about

Cookbook: Changes to ParquetExecBuilder

Code that looks for ParquetExec like this will no longer work:

    if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
        // Do something with ParquetExec here
    }

Instead, with DataSourceExec, the same information is now on FileScanConfig and ParquetSource. The equivalent code is

if let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
  if let Some(scan_config) = datasource_exec.data_source().as_any().downcast_ref::<FileScanConfig>() {
    // FileGroups, and other information is on the FileScanConfig
    // parquet
    if let Some(parquet_source) = scan_config.file_source.as_any().downcast_ref::<ParquetSource>()
    {
      // Information on PruningPredicates and parquet options are here
    }
}

Cookbook: Changes to ParquetExecBuilder

Likewise code that builds ParquetExec using the ParquetExecBuilder such as the following must be changed:

let mut exec_plan_builder = ParquetExecBuilder::new(
    FileScanConfig::new(self.log_store.object_store_url(), file_schema)
        .with_projection(self.projection.cloned())
        .with_limit(self.limit)
        .with_table_partition_cols(table_partition_cols),
)
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
.with_table_parquet_options(parquet_options);

// Add filter
if let Some(predicate) = logical_filter {
    if config.enable_parquet_pushdown {
        exec_plan_builder = exec_plan_builder.with_predicate(predicate);
    }
};

New code should use FileScanConfig to build the appropriate DataSourceExec:

let mut file_source = ParquetSource::new(parquet_options)
    .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));

// Add filter
if let Some(predicate) = logical_filter {
    if config.enable_parquet_pushdown {
        file_source = file_source.with_predicate(predicate);
    }
};

let file_scan_config = FileScanConfig::new(
    self.log_store.object_store_url(),
    file_schema,
    Arc::new(file_source),
)
.with_statistics(stats)
.with_projection(self.projection.cloned())
.with_limit(self.limit)
.with_table_partition_cols(table_partition_cols);

// Build the actual scan like this
parquet_scan: file_scan_config.build(),

datafusion-cli no longer automatically unescapes strings

datafusion-cli previously would incorrectly unescape string literals (see ticket for more details).

To escape ' in SQL literals, use '':

> select 'it''s escaped';
+----------------------+
| Utf8("it's escaped") |
+----------------------+
| it's escaped         |
+----------------------+
1 row(s) fetched.

To include special characters (such as newlines via \n) you can use an E literal string. For example

> select 'foo\nbar';
+------------------+
| Utf8("foo\nbar") |
+------------------+
| foo\nbar         |
+------------------+
1 row(s) fetched.
Elapsed 0.005 seconds.

Changes to array scalar function signatures

DataFusion 46 has changed the way scalar array function signatures are declared. Previously, functions needed to select from a list of predefined signatures within the ArrayFunctionSignature enum. Now the signatures can be defined via a Vec of psuedo-types, which each correspond to a single argument. Those psuedo-types are the variants of the ArrayFunctionArgument enum and are as follows:

  • Array: An argument of type List/LargeList/FixedSizeList. All Array arguments must be coercible to the same type.

  • Element: An argument that is coercible to the inner type of the Array arguments.

  • Index: An Int64 argument.

Each of the old variants can be converted to the new format as follows:

TypeSignature::ArraySignature(ArrayFunctionSignature::ArrayAndElement):

TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
    arguments: vec![ArrayFunctionArgument::Array, ArrayFunctionArgument::Element],
    array_coercion: Some(ListCoercion::FixedSizedListToList),
});

TypeSignature::ArraySignature(ArrayFunctionSignature::ElementAndArray):

TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
    arguments: vec![ArrayFunctionArgument::Element, ArrayFunctionArgument::Array],
    array_coercion: Some(ListCoercion::FixedSizedListToList),
});

TypeSignature::ArraySignature(ArrayFunctionSignature::ArrayAndIndex):

TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
    arguments: vec![ArrayFunctionArgument::Array, ArrayFunctionArgument::Index],
    array_coercion: None,
});

TypeSignature::ArraySignature(ArrayFunctionSignature::ArrayAndElementAndOptionalIndex):

TypeSignature::OneOf(vec![
    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
        arguments: vec![ArrayFunctionArgument::Array, ArrayFunctionArgument::Element],
        array_coercion: None,
    }),
    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
        arguments: vec![
            ArrayFunctionArgument::Array,
            ArrayFunctionArgument::Element,
            ArrayFunctionArgument::Index,
        ],
        array_coercion: None,
    }),
]);

TypeSignature::ArraySignature(ArrayFunctionSignature::Array):

TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
    arguments: vec![ArrayFunctionArgument::Array],
    array_coercion: None,
});

Alternatively, you can switch to using one of the following functions which take care of constructing the TypeSignature for you:

  • Signature::array_and_element

  • Signature::array_and_element_and_optional_index

  • Signature::array_and_index

  • Signature::array