Upgrade Guides#

DataFusion 52.0.0#

Changes to DFSchema API#

To permit more efficient planning, several methods on DFSchema have been changed to return the underlying &FieldRef rather than &Field. This allows planners to copy the references cheaply via Arc::clone rather than cloning the entire Field structure.

You may need to change code to use Arc::clone instead of .as_ref().clone() directly on the Field. For example:

- let field = df_schema.field("my_column").as_ref().clone();
+ let field = Arc::clone(df_schema.field("my_column"));
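
For example, collecting owned FieldRef values from a schema is now just a reference-count bump per field. A minimal sketch (collect_field_refs is an illustrative helper, not a DataFusion API):

use std::sync::Arc;
use arrow::datatypes::FieldRef;
use datafusion_common::DFSchema;

// Gather owned FieldRefs by bumping reference counts instead of
// deep-cloning each Field.
fn collect_field_refs(df_schema: &DFSchema) -> Vec<FieldRef> {
    df_schema.fields().iter().map(Arc::clone).collect()
}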

ListingTableProvider now caches LIST commands#

In prior versions, ListingTableProvider would issue LIST commands to the underlying object store each time it needed to list files for a query. To improve performance, ListingTableProvider now caches the results of LIST commands for the lifetime of the ListingTableProvider instance or until a cache entry expires.

Note that by default the cache has no expiration time, so if files are added or removed from the underlying object store, the ListingTableProvider will not see those changes until the ListingTableProvider instance is dropped and recreated.

You can configure the maximum cache size and cache entry expiration time via configuration options:

  • datafusion.runtime.list_files_cache_limit - Limits the size of the cache in bytes

  • datafusion.runtime.list_files_cache_ttl - Limits the TTL (time-to-live) of an entry in seconds

Detailed configuration information can be found in the DataFusion Runtime Configuration user’s guide.

Caching can be disabled by setting the limit to 0:

SET datafusion.runtime.list_files_cache_limit TO "0K";

Note that the internal API has changed to use a trait ListFilesCache instead of a type alias.

newlines_in_values moved from FileScanConfig to CsvOptions#

The CSV-specific newlines_in_values configuration option has been moved from FileScanConfig to CsvOptions, as it only applies to CSV file parsing.

Who is affected:

  • Users who set newlines_in_values via FileScanConfigBuilder::with_newlines_in_values()

Migration guide:

Set newlines_in_values in CsvOptions instead of on FileScanConfigBuilder:

Before:

let source = Arc::new(CsvSource::new(file_schema.clone()));
let config = FileScanConfigBuilder::new(object_store_url, source)
    .with_newlines_in_values(true)
    .build();

After:

let options = CsvOptions {
    newlines_in_values: Some(true),
    ..Default::default()
};
let source = Arc::new(CsvSource::new(file_schema.clone())
    .with_csv_options(options));
let config = FileScanConfigBuilder::new(object_store_url, source)
    .build();

Removal of pyarrow feature#

The pyarrow feature flag has been removed. This functionality has been available in the datafusion-python repository since version 44.0.0.

Refactoring of FileSource constructors and FileScanConfigBuilder to accept schemas upfront#

The way schemas are passed to file sources and scan configurations has been significantly refactored. File sources now require the schema (including partition columns) to be provided at construction time, and FileScanConfigBuilder no longer takes a separate schema parameter.

Who is affected:

  • Users who create FileScanConfig or file sources (ParquetSource, CsvSource, JsonSource, AvroSource) directly

  • Users who implement custom FileFormat implementations

Key changes:

  1. FileSource constructors now require TableSchema: All built-in file sources now take the schema in their constructor:

    - let source = ParquetSource::default();
    + let source = ParquetSource::new(table_schema);
    
  2. FileScanConfigBuilder no longer takes schema as a parameter: The schema is now passed via the FileSource:

    - FileScanConfigBuilder::new(url, schema, source)
    + FileScanConfigBuilder::new(url, source)
    
  3. Partition columns are now part of TableSchema: The with_table_partition_cols() method has been removed from FileScanConfigBuilder. Partition columns are now passed as part of the TableSchema to the FileSource constructor:

    + let table_schema = TableSchema::new(
    +     file_schema,
    +     vec![Arc::new(Field::new("date", DataType::Utf8, false))],
    + );
    + let source = ParquetSource::new(table_schema);
      let config = FileScanConfigBuilder::new(url, source)
    -     .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
          .with_file(partitioned_file)
          .build();
    
  4. FileFormat::file_source() now takes TableSchema parameter: Custom FileFormat implementations must be updated:

    impl FileFormat for MyFileFormat {
    -   fn file_source(&self) -> Arc<dyn FileSource> {
    +   fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
    -       Arc::new(MyFileSource::default())
    +       Arc::new(MyFileSource::new(table_schema))
        }
    }
    

Migration examples:

For Parquet files:

- let source = Arc::new(ParquetSource::default());
- let config = FileScanConfigBuilder::new(url, schema, source)
+ let table_schema = TableSchema::new(schema, vec![]);
+ let source = Arc::new(ParquetSource::new(table_schema));
+ let config = FileScanConfigBuilder::new(url, source)
      .with_file(partitioned_file)
      .build();

For CSV files with partition columns:

- let source = Arc::new(CsvSource::new(true, b',', b'"'));
- let config = FileScanConfigBuilder::new(url, file_schema, source)
-     .with_table_partition_cols(vec![Field::new("year", DataType::Int32, false)])
+ let options = CsvOptions {
+     has_header: Some(true),
+     delimiter: b',',
+     quote: b'"',
+     ..Default::default()
+ };
+ let table_schema = TableSchema::new(
+     file_schema,
+     vec![Arc::new(Field::new("year", DataType::Int32, false))],
+ );
+ let source = Arc::new(CsvSource::new(table_schema).with_csv_options(options));
+ let config = FileScanConfigBuilder::new(url, source)
      .build();

Adaptive filter representation in Parquet filter pushdown#

As of Arrow 57.1.0, DataFusion uses a new adaptive filter strategy when evaluating pushed-down filters for Parquet files. This strategy improves performance for certain types of queries where the results of filtering are more efficiently represented as a bitmask rather than a selection. See arrow-rs #5523 for more details.

This change only applies to the built-in Parquet data source with filter pushdown enabled (which is not yet the default behavior).

You can disable the new behavior by setting the datafusion.execution.parquet.force_filter_selections configuration option to true.

> set datafusion.execution.parquet.force_filter_selections = true;
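
The same opt-out can also be applied programmatically when building a session. A minimal sketch (assuming you construct the SessionContext yourself):

use datafusion::prelude::{SessionConfig, SessionContext};

// Force the previous selection-based filter representation for this session.
let config = SessionConfig::new()
    .set_bool("datafusion.execution.parquet.force_filter_selections", true);
let ctx = SessionContext::new_with_config(config);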

Statistics handling moved from FileSource to FileScanConfig#

Statistics are now managed directly by FileScanConfig instead of being delegated to FileSource implementations. This simplifies the FileSource trait and provides more consistent statistics handling across all file formats.

Who is affected:

  • Users who have implemented custom FileSource implementations

Breaking changes:

Two methods have been removed from the FileSource trait:

  • with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>

  • statistics(&self) -> Result<Statistics>

Migration guide:

If you have a custom FileSource implementation, you need to:

  1. Remove the with_statistics method implementation

  2. Remove the statistics method implementation

  3. Remove any internal state that was storing statistics

Before:

#[derive(Clone)]
struct MyCustomSource {
    table_schema: TableSchema,
    projected_statistics: Option<Statistics>,
    // other fields...
}

impl FileSource for MyCustomSource {
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        Arc::new(Self {
            table_schema: self.table_schema.clone(),
            projected_statistics: Some(statistics),
            // other fields...
        })
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_statistics.clone().unwrap_or_else(||
            Statistics::new_unknown(self.table_schema.file_schema())
        ))
    }

    // other methods...
}

After:

#[derive(Clone)]
struct MyCustomSource {
    table_schema: TableSchema,
    // projected_statistics field removed
    // other fields...
}

impl FileSource for MyCustomSource {
    // with_statistics method removed
    // statistics method removed

    // other methods...
}

Accessing statistics:

Statistics are now accessed through FileScanConfig instead of FileSource:

- let stats = config.file_source.statistics()?;
+ let stats = config.statistics();

Note that FileScanConfig::statistics() automatically marks statistics as inexact when filters are present, ensuring correctness when filters are pushed down.

Partition column handling moved out of PhysicalExprAdapter#

Partition column replacement is now a separate preprocessing step performed before expression rewriting via PhysicalExprAdapter. This change provides better separation of concerns and makes the adapter more focused on schema differences rather than partition value substitution.

Who is affected:

  • Users who have custom implementations of PhysicalExprAdapterFactory that handle partition columns

  • Users who directly use the FilePruner API

Breaking changes:

  1. FilePruner::try_new() signature changed: the partition_fields parameter has been removed since partition column handling is now done separately

  2. Partition column replacement must now be done via replace_columns_with_literals() before expressions are passed to the adapter

Migration guide:

If you have code that creates a FilePruner with partition fields:

Before:

use datafusion_pruning::FilePruner;

let pruner = FilePruner::try_new(
    predicate,
    file_schema,
    partition_fields,  // This parameter is removed
    file_stats,
)?;

After:

use datafusion_pruning::FilePruner;

// Partition fields are no longer needed
let pruner = FilePruner::try_new(
    predicate,
    file_schema,
    file_stats,
)?;

If you have custom code that relies on PhysicalExprAdapter to handle partition columns, you must now call replace_columns_with_literals() separately:

Before:

// Adapter handled partition column replacement internally
let adapted_expr = adapter.rewrite(expr)?;

After:

use datafusion_physical_expr_adapter::replace_columns_with_literals;

// Replace partition columns first
let expr_with_literals = replace_columns_with_literals(expr, &partition_values)?;
// Then apply the adapter
let adapted_expr = adapter.rewrite(expr_with_literals)?;

build_row_filter signature simplified#

The build_row_filter function in datafusion-datasource-parquet has been simplified to take a single schema parameter instead of two. The expectation is now that the filter has already been adapted to the physical file schema (the Arrow representation of the Parquet file's schema), for example using a PhysicalExprAdapter, before being passed to this function.

Who is affected:

  • Users who call build_row_filter directly

Breaking changes:

The function signature changed from:

pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    physical_file_schema: &SchemaRef,
    predicate_file_schema: &SchemaRef,  // removed
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>>

To:

pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    file_schema: &SchemaRef,
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>>

Migration guide:

Remove the duplicate schema parameter from your call:

- build_row_filter(&predicate, &file_schema, &file_schema, metadata, reorder, metrics)
+ build_row_filter(&predicate, &file_schema, metadata, reorder, metrics)

Planner now requires explicit opt-in for WITHIN GROUP syntax#

The SQL planner now enforces the aggregate UDF contract more strictly: the WITHIN GROUP (ORDER BY ...) syntax is accepted only if the aggregate UDAF explicitly advertises support by returning true from AggregateUDFImpl::supports_within_group_clause().

Previously the planner forwarded a WITHIN GROUP clause to order-sensitive aggregates even when they did not implement ordered-set semantics, which could cause queries such as SUM(x) WITHIN GROUP (ORDER BY x) to plan successfully. This behavior was too permissive and has been changed to match PostgreSQL and the documented semantics.

Migration: If your UDAF intentionally implements ordered-set semantics and should accept the WITHIN GROUP SQL syntax, update your implementation to return true from supports_within_group_clause() and handle the ordering semantics in your accumulator implementation. If your UDAF is merely order-sensitive (but not an ordered-set aggregate), do not advertise supports_within_group_clause(); callers should instead use an alternative function signature (for example, passing the ordering as an explicit function argument).
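
For example, an ordered-set UDAF could opt in as sketched below (MyOrderedSetAgg is a hypothetical type and the remaining trait methods are elided; this is a sketch, not a complete implementation):

impl AggregateUDFImpl for MyOrderedSetAgg {
    // ... name(), signature(), return_type(), accumulator() as before ...

    fn supports_within_group_clause(&self) -> bool {
        // Advertise ordered-set semantics so the planner accepts
        // my_ordered_set_agg(x) WITHIN GROUP (ORDER BY y)
        true
    }
}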

AggregateUDFImpl::supports_null_handling_clause now defaults to false#

This method specifies whether an aggregate function accepts the IGNORE NULLS/RESPECT NULLS clause during SQL parsing, with the implication that it honors the clause during computation.

Most DataFusion aggregate functions silently ignored this syntax in prior versions, as they did not make use of it and it was permitted by default. This has been changed so that only the few functions which do respect this clause (e.g. array_agg, first_value, last_value) need to declare support for it.

Custom user defined aggregate functions will also error if this syntax is used, unless they explicitly declare support by overriding the method.

For example, SQL parsing will now fail for queries such as the following, instead of silently succeeding:

SELECT median(c1) IGNORE NULLS FROM table

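If your custom UDAF does honor IGNORE NULLS/RESPECT NULLS and should keep accepting the syntax, override the method to declare support; a minimal sketch (MyNullAwareAgg is hypothetical and the other trait methods are elided):

impl AggregateUDFImpl for MyNullAwareAgg {
    // ... other trait methods unchanged ...

    fn supports_null_handling_clause(&self) -> bool {
        // Accept IGNORE NULLS / RESPECT NULLS during SQL parsing; the
        // accumulator must actually honor the requested null handling.
        true
    }
}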

API change for CacheAccessor trait#

The remove API no longer requires a mutable reference to the cache instance.

FFI crate updates#

Many of the structs in the datafusion-ffi crate have been updated to allow easier conversion to the underlying trait types they represent. This simplifies some code paths and also improves cases where library code makes a round trip through the foreign function interface.

To update your code, suppose you have an FFI_SchemaProvider called ffi_provider and you wish to use it as a SchemaProvider. In the old approach you would do something like:

let foreign_provider: ForeignSchemaProvider = ffi_provider.into();
let foreign_provider = Arc::new(foreign_provider) as Arc<dyn SchemaProvider>;

This code should now be written as:

let foreign_provider: Arc<dyn SchemaProvider + Send> = ffi_provider.into();
let foreign_provider = foreign_provider as Arc<dyn SchemaProvider>;

For user defined functions the updates are similar, but you may need to change how you create the ScalarUDF. Aggregate and window functions follow the same pattern.

Previously you may have written:

let foreign_udf: ForeignScalarUDF = ffi_udf.try_into()?;
let foreign_udf: ScalarUDF = foreign_udf.into();

Instead this should now be:

let foreign_udf: Arc<dyn ScalarUDFImpl> = ffi_udf.into();
let foreign_udf = ScalarUDF::new_from_shared_impl(foreign_udf);

When creating any of the following structs, we now require the user to provide a TaskContextProvider and optionally a LogicalExtensionCodec:

  • FFI_CatalogListProvider

  • FFI_CatalogProvider

  • FFI_SchemaProvider

  • FFI_TableProvider

  • FFI_TableFunction

Each of these structs has a new() and a new_with_ffi_codec() method for instantiation. For example, where you previously would write:

let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new(table, None);

Now you will need to provide a TaskContextProvider. The most common implementation of this trait is SessionContext.

let ctx = Arc::new(SessionContext::default());
let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new(table, None, ctx, None);

The alternative constructor, new_with_ffi_codec(), may be more convenient if you are doing many of these operations, since an FFI_LogicalExtensionCodec stores the TaskContextProvider as well:

let codec = Arc::new(DefaultLogicalExtensionCodec {});
let ctx = Arc::new(SessionContext::default());
let ffi_codec = FFI_LogicalExtensionCodec::new(codec, None, ctx);
let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new_with_ffi_codec(table, None, ffi_codec);

Additional information about the usage of the TaskContextProvider can be found in the crate README.

Additionally, the FFI structure for scalar UDFs no longer contains a return_type call. This code was unused since the ForeignScalarUDF struct implements return_field_from_args instead.

Projection handling moved from FileScanConfig to FileSource#

Projection handling has been moved from FileScanConfig into FileSource implementations. This enables format-specific projection pushdown (e.g., Parquet can push down struct field access, Vortex can push down computed expressions into un-decoded data).

Who is affected:

  • Users who have implemented custom FileSource implementations

  • Users who use FileScanConfigBuilder::with_projection_indices directly

Breaking changes:

  1. FileSource::with_projection replaced with try_pushdown_projection:

    The with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> method has been removed and replaced with try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result<Option<Arc<dyn FileSource>>>.

  2. FileScanConfig.projection_exprs field removed:

    Projections are now stored in the FileSource directly, not in FileScanConfig. Various public helper methods that access projection information have been removed from FileScanConfig.

  3. FileScanConfigBuilder::with_projection_indices now returns Result<Self>:

    This method can now fail if the projection pushdown fails.

  4. FileSource::create_file_opener now returns Result<Arc<dyn FileOpener>>:

    Previously returned Arc<dyn FileOpener> directly. Any FileSource implementation that may fail to create a FileOpener should now return an appropriate error.

  5. DataSource::try_swapping_with_projection signature changed:

    Parameter changed from &[ProjectionExpr] to &ProjectionExprs.

Migration guide:

If you have a custom FileSource implementation:

Before:

impl FileSource for MyCustomSource {
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        // Apply projection from config
        Arc::new(Self { /* ... */ })
    }

    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(MyOpener { /* ... */ })
    }
}

After:

impl FileSource for MyCustomSource {
    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        // Return None if projection cannot be pushed down
        // Return Some(new_source) with projection applied if it can
        Ok(Some(Arc::new(Self {
            projection: Some(projection.clone()),
            /* ... */
        })))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        self.projection.as_ref()
    }

    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        Ok(Arc::new(MyOpener { /* ... */ }))
    }
}

We recommend looking at #18627, which introduced these changes, for more examples of how this was handled for the various built-in file sources.

We have added SplitProjection and ProjectionOpener helpers to make it easier to handle projections in your FileSource implementations.

For file sources that can only handle simple column selections (not computed expressions), use these helpers to split the projection into pushdownable and non-pushdownable parts:

use datafusion_datasource::projection::{SplitProjection, ProjectionOpener};

// In try_pushdown_projection:
let split = SplitProjection::new(projection, self.table_schema())?;
// Use split.file_projection() for what to push down to the file format
// The ProjectionOpener wrapper will handle the rest

For FileScanConfigBuilder users:

let config = FileScanConfigBuilder::new(url, source)
-   .with_projection_indices(Some(vec![0, 2, 3]))
+   .with_projection_indices(Some(vec![0, 2, 3]))?
    .build();

SchemaAdapter and SchemaAdapterFactory completely removed#

Following the deprecation announced in DataFusion 49.0.0, SchemaAdapterFactory has been fully removed from Parquet scanning.

The following symbols have been deprecated and will be removed in the next release:

  • SchemaAdapter trait

  • SchemaAdapterFactory trait

  • SchemaMapper trait

  • SchemaMapping struct

  • DefaultSchemaAdapterFactory struct

These types were previously used to adapt record batch schemas during file reading. This functionality has been replaced by PhysicalExprAdapterFactory, which rewrites expressions at planning time rather than transforming batches at runtime. If you were using a custom SchemaAdapterFactory for schema adaptation (e.g., default column values, type coercion), you should now implement PhysicalExprAdapterFactory instead. See the default column values example for how to implement a custom PhysicalExprAdapterFactory.

Migration guide:

If you implemented a custom SchemaAdapterFactory, migrate to PhysicalExprAdapterFactory. See the default column values example for a complete implementation.