# Upgrade Guides

## DataFusion 52.0.0

### Changes to DFSchema API

To permit more efficient planning, several methods on `DFSchema` have been changed to return `&FieldRef` rather than `&Field`. This allows planners to copy the underlying `FieldRef` cheaply via `Arc::clone` rather than cloning the entire `Field` structure.

You may need to change code that cloned the `Field` (for example via `.as_ref().clone()`) to instead clone the `Arc` with `Arc::clone`. For example:

```diff
- let field = df_schema.field("my_column").as_ref().clone();
+ let field = Arc::clone(df_schema.field("my_column"));
```

### ListingTableProvider now caches `LIST` commands

In prior versions, `ListingTableProvider` would issue `LIST` commands to the underlying object store each time it needed to list files for a query. To improve performance, `ListingTableProvider` now caches the results of `LIST` commands for the lifetime of the `ListingTableProvider` instance or until a cache entry expires.

Note that by default the cache has no expiration time, so if files are added or removed from the underlying object store, the `ListingTableProvider` will not see those changes until the `ListingTableProvider` instance is dropped and recreated.

You can configure the maximum cache size and cache entry expiration time via configuration options:

- `datafusion.runtime.list_files_cache_limit` - Limits the size of the cache in bytes
- `datafusion.runtime.list_files_cache_ttl` - Limits the TTL (time-to-live) of an entry in seconds

Detailed configuration information can be found in the [DataFusion Runtime Configuration](https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings) user guide.

Caching can be disabled by setting the limit to 0:

```sql
SET datafusion.runtime.list_files_cache_limit TO "0K";
```

Note that the internal API has changed to use a trait `ListFilesCache` instead of a type alias.

### `newlines_in_values` moved from `FileScanConfig` to `CsvOptions`

The CSV-specific `newlines_in_values` configuration option has been moved from `FileScanConfig` to `CsvOptions`, as it only applies to CSV file parsing.

**Who is affected:**

- Users who set `newlines_in_values` via `FileScanConfigBuilder::with_newlines_in_values()`

**Migration guide:**

Set `newlines_in_values` in `CsvOptions` instead of on `FileScanConfigBuilder`:

**Before:**

```rust,ignore
let source = Arc::new(CsvSource::new(file_schema.clone()));
let config = FileScanConfigBuilder::new(object_store_url, source)
    .with_newlines_in_values(true)
    .build();
```

**After:**

```rust,ignore
let options = CsvOptions {
    newlines_in_values: Some(true),
    ..Default::default()
};
let source = Arc::new(CsvSource::new(file_schema.clone()).with_csv_options(options));
let config = FileScanConfigBuilder::new(object_store_url, source)
    .build();
```

### Removal of `pyarrow` feature

The `pyarrow` feature flag has been removed. This functionality has lived in the `datafusion-python` repository since version `44.0.0`.

### Refactoring of `FileSource` constructors and `FileScanConfigBuilder` to accept schemas upfront

The way schemas are passed to file sources and scan configurations has been significantly refactored. File sources now require the schema (including partition columns) to be provided at construction time, and `FileScanConfigBuilder` no longer takes a separate schema parameter.
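For orientation, here is a minimal sketch of the new construction pattern, pieced together from the migration diffs below. It assumes `file_schema`, `object_store_url`, and `partitioned_file` are already in scope; Parquet is used as the example source.

```rust,ignore
// Build a TableSchema from the file schema plus any partition columns,
// then hand it to the file source; the builder no longer takes a schema.
let table_schema = TableSchema::new(
    file_schema, // schema of the columns stored in the files
    vec![Arc::new(Field::new("date", DataType::Utf8, false))], // partition columns, if any
);
let source = Arc::new(ParquetSource::new(table_schema));
let config = FileScanConfigBuilder::new(object_store_url, source)
    .with_file(partitioned_file)
    .build();
```

The detailed changes and migration diffs follow.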
**Who is affected:**

- Users who create `FileScanConfig` or file sources (`ParquetSource`, `CsvSource`, `JsonSource`, `AvroSource`) directly
- Users who implement custom `FileFormat` implementations

**Key changes:**

1. **FileSource constructors now require TableSchema**: All built-in file sources now take the schema in their constructor:

   ```diff
   - let source = ParquetSource::default();
   + let source = ParquetSource::new(table_schema);
   ```

2. **FileScanConfigBuilder no longer takes schema as a parameter**: The schema is now passed via the FileSource:

   ```diff
   - FileScanConfigBuilder::new(url, schema, source)
   + FileScanConfigBuilder::new(url, source)
   ```

3. **Partition columns are now part of TableSchema**: The `with_table_partition_cols()` method has been removed from `FileScanConfigBuilder`. Partition columns are now passed as part of the `TableSchema` to the FileSource constructor:

   ```diff
   + let table_schema = TableSchema::new(
   +     file_schema,
   +     vec![Arc::new(Field::new("date", DataType::Utf8, false))],
   + );
   + let source = ParquetSource::new(table_schema);
     let config = FileScanConfigBuilder::new(url, source)
   -     .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
         .with_file(partitioned_file)
         .build();
   ```

4. **FileFormat::file_source() now takes TableSchema parameter**: Custom `FileFormat` implementations must be updated:

   ```diff
     impl FileFormat for MyFileFormat {
   -     fn file_source(&self) -> Arc<dyn FileSource> {
   +     fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
   -         Arc::new(MyFileSource::default())
   +         Arc::new(MyFileSource::new(table_schema))
         }
     }
   ```

**Migration examples:**

For Parquet files:

```diff
- let source = Arc::new(ParquetSource::default());
- let config = FileScanConfigBuilder::new(url, schema, source)
+ let table_schema = TableSchema::new(schema, vec![]);
+ let source = Arc::new(ParquetSource::new(table_schema));
+ let config = FileScanConfigBuilder::new(url, source)
      .with_file(partitioned_file)
      .build();
```

For CSV files with partition columns:

```diff
- let source = Arc::new(CsvSource::new(true, b',', b'"'));
- let config = FileScanConfigBuilder::new(url, file_schema, source)
-     .with_table_partition_cols(vec![Field::new("year", DataType::Int32, false)])
+ let options = CsvOptions {
+     has_header: Some(true),
+     delimiter: b',',
+     quote: b'"',
+     ..Default::default()
+ };
+ let table_schema = TableSchema::new(
+     file_schema,
+     vec![Arc::new(Field::new("year", DataType::Int32, false))],
+ );
+ let source = Arc::new(CsvSource::new(table_schema).with_csv_options(options));
+ let config = FileScanConfigBuilder::new(url, source)
      .build();
```

### Adaptive filter representation in Parquet filter pushdown

As of Arrow 57.1.0, DataFusion uses a new adaptive filter strategy when evaluating pushed-down filters for Parquet files. This strategy improves performance for certain types of queries where the results of filtering are more efficiently represented with a bitmask than with a selection. See [arrow-rs #5523] for more details.

This change only applies to the built-in Parquet data source with filter pushdown enabled (which is [not yet the default behavior]). You can disable the new behavior by setting the `datafusion.execution.parquet.force_filter_selections` [configuration setting] to true:
```sql
> set datafusion.execution.parquet.force_filter_selections = true;
```

[arrow-rs #5523]: https://github.com/apache/arrow-rs/issues/5523
[configuration setting]: https://datafusion.apache.org/user-guide/configs.html
[not yet the default behavior]: https://github.com/apache/datafusion/issues/3463

### Statistics handling moved from `FileSource` to `FileScanConfig`

Statistics are now managed directly by `FileScanConfig` instead of being delegated to `FileSource` implementations. This simplifies the `FileSource` trait and provides more consistent statistics handling across all file formats.

**Who is affected:**

- Users who have implemented custom `FileSource` implementations

**Breaking changes:**

Two methods have been removed from the `FileSource` trait:

- `with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>`
- `statistics(&self) -> Result<Statistics>`

**Migration guide:**

If you have a custom `FileSource` implementation, you need to:

1. Remove the `with_statistics` method implementation
2. Remove the `statistics` method implementation
3. Remove any internal state that was storing statistics

**Before:**

```rust,ignore
#[derive(Clone)]
struct MyCustomSource {
    table_schema: TableSchema,
    projected_statistics: Option<Statistics>,
    // other fields...
}

impl FileSource for MyCustomSource {
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        Arc::new(Self {
            table_schema: self.table_schema.clone(),
            projected_statistics: Some(statistics),
            // other fields...
        })
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.projected_statistics.clone().unwrap_or_else(|| {
            Statistics::new_unknown(self.table_schema.file_schema())
        }))
    }

    // other methods...
}
```

**After:**

```rust,ignore
#[derive(Clone)]
struct MyCustomSource {
    table_schema: TableSchema,
    // projected_statistics field removed
    // other fields...
}

impl FileSource for MyCustomSource {
    // with_statistics method removed
    // statistics method removed
    // other methods...
}
```

**Accessing statistics:**

Statistics are now accessed through `FileScanConfig` instead of `FileSource`:

```diff
- let stats = config.file_source.statistics()?;
+ let stats = config.statistics();
```

Note that `FileScanConfig::statistics()` automatically marks statistics as inexact when filters are present, ensuring correctness when filters are pushed down.

### Partition column handling moved out of `PhysicalExprAdapter`

Partition column replacement is now a separate preprocessing step performed before expression rewriting via `PhysicalExprAdapter`. This change provides better separation of concerns and makes the adapter focused on schema differences rather than partition value substitution.

**Who is affected:**

- Users who have custom implementations of `PhysicalExprAdapterFactory` that handle partition columns
- Users who directly use the `FilePruner` API

**Breaking changes:**

1. `FilePruner::try_new()` signature changed: the `partition_fields` parameter has been removed since partition column handling is now done separately
2. Partition column replacement must now be done via `replace_columns_with_literals()` before expressions are passed to the adapter

**Migration guide:**

If you have code that creates a `FilePruner` with partition fields:

**Before:**

```rust,ignore
use datafusion_pruning::FilePruner;

let pruner = FilePruner::try_new(
    predicate,
    file_schema,
    partition_fields, // This parameter is removed
    file_stats,
)?;
```

**After:**

```rust,ignore
use datafusion_pruning::FilePruner;

// Partition fields are no longer needed
let pruner = FilePruner::try_new(
    predicate,
    file_schema,
    file_stats,
)?;
```

If you have custom code that relies on `PhysicalExprAdapter` to handle partition columns, you must now call `replace_columns_with_literals()` separately:

**Before:**

```rust,ignore
// Adapter handled partition column replacement internally
let adapted_expr = adapter.rewrite(expr)?;
```

**After:**

```rust,ignore
use datafusion_physical_expr_adapter::replace_columns_with_literals;

// Replace partition columns first
let expr_with_literals = replace_columns_with_literals(expr, &partition_values)?;
// Then apply the adapter
let adapted_expr = adapter.rewrite(expr_with_literals)?;
```

### `build_row_filter` signature simplified

The `build_row_filter` function in `datafusion-datasource-parquet` has been simplified to take a single schema parameter instead of two. The expectation is now that the filter has already been adapted to the physical file schema (the Arrow representation of the Parquet file's schema), for example using a `PhysicalExprAdapter`, before being passed to this function.

**Who is affected:**

- Users who call `build_row_filter` directly

**Breaking changes:**

The function signature changed from:

```rust,ignore
pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    physical_file_schema: &SchemaRef,
    predicate_file_schema: &SchemaRef, // removed
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>>
```

To:

```rust,ignore
pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    file_schema: &SchemaRef,
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>>
```

**Migration guide:**

Remove the duplicate schema parameter from your call:

```diff
- build_row_filter(&predicate, &file_schema, &file_schema, metadata, reorder, metrics)
+ build_row_filter(&predicate, &file_schema, metadata, reorder, metrics)
```

### Planner now requires explicit opt-in for WITHIN GROUP syntax

The SQL planner now enforces the aggregate UDF contract more strictly: the `WITHIN GROUP (ORDER BY ...)` syntax is accepted only if the UDAF explicitly advertises support by returning `true` from `AggregateUDFImpl::supports_within_group_clause()`.

Previously the planner forwarded a `WITHIN GROUP` clause to order-sensitive aggregates even when they did not implement ordered-set semantics, which could cause queries such as `SUM(x) WITHIN GROUP (ORDER BY x)` to plan successfully. This behavior was too permissive and has been changed to match PostgreSQL and the documented semantics.

**Migration guide:**

If your UDAF intentionally implements ordered-set semantics and should accept the `WITHIN GROUP` SQL syntax, update your implementation to return `true` from `supports_within_group_clause()` and handle the ordering semantics in your accumulator implementation. If your UDAF is merely order-sensitive (but not an ordered-set aggregate), do not advertise support via `supports_within_group_clause()`; clients should instead use alternative function signatures (for example, explicit ordering as a function argument).
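As a minimal sketch of the opt-in (the `MyOrderedSetUdaf` type is hypothetical, and the other required `AggregateUDFImpl` methods are elided):

```rust,ignore
use datafusion_expr::AggregateUDFImpl;

impl AggregateUDFImpl for MyOrderedSetUdaf {
    // ... name(), signature(), return_type(), accumulator(), etc. ...

    // Advertise ordered-set semantics so the planner accepts
    // `my_ordered_set_udaf(x) WITHIN GROUP (ORDER BY y)`.
    fn supports_within_group_clause(&self) -> bool {
        true
    }
}
```

The accumulator returned by such a UDAF is still responsible for actually honoring the `ORDER BY` supplied in the `WITHIN GROUP` clause.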
### `AggregateUDFImpl::supports_null_handling_clause` now defaults to `false`

This method specifies whether an aggregate function accepts the `IGNORE NULLS` / `RESPECT NULLS` clause during SQL parsing, with the implication that the function honors the clause during computation. In prior versions, most DataFusion aggregate functions silently ignored this syntax: they did not make use of it, yet it was permitted by default. This has changed so that only the few functions which do respect the clause (e.g. `array_agg`, `first_value`, `last_value`) need to implement it. Custom user-defined aggregate functions will also error if this syntax is used, unless they explicitly declare support by overriding the method.

For example, SQL parsing will now fail for queries such as the following, instead of silently succeeding:

```sql
SELECT median(c1) IGNORE NULLS FROM table
```

### API change for `CacheAccessor` trait

The `remove` API no longer requires a mutable instance.

### FFI crate updates

Many of the structs in the `datafusion-ffi` crate have been updated to allow easier conversion to the underlying trait types they represent. This simplifies some code paths, but also provides an additional improvement in cases where library code goes through a round trip via the foreign function interface.

To update your code, suppose you have a `FFI_SchemaProvider` called `ffi_provider` and you wish to use it as a `SchemaProvider`. In the old approach you would do something like:

```rust,ignore
let foreign_provider: ForeignSchemaProvider = ffi_provider.into();
let foreign_provider = Arc::new(foreign_provider) as Arc<dyn SchemaProvider>;
```

This code should now be written as:

```rust,ignore
let foreign_provider: Arc<ForeignSchemaProvider> = ffi_provider.into();
let foreign_provider = foreign_provider as Arc<dyn SchemaProvider>;
```

For user-defined functions the updates are similar, but you may need to change the way you create the `ScalarUDF`. Aggregate and window functions follow the same pattern. Previously you may have written:

```rust,ignore
let foreign_udf: ForeignScalarUDF = ffi_udf.try_into()?;
let foreign_udf: ScalarUDF = foreign_udf.into();
```

Instead this should now be:

```rust,ignore
let foreign_udf: Arc<dyn ScalarUDFImpl> = ffi_udf.into();
let foreign_udf = ScalarUDF::new_from_shared_impl(foreign_udf);
```

When creating any of the following structs, you must now provide a `TaskContextProvider` and optionally a `LogicalExtensionCodec`:

- `FFI_CatalogListProvider`
- `FFI_CatalogProvider`
- `FFI_SchemaProvider`
- `FFI_TableProvider`
- `FFI_TableFunction`

Each of these structs has a `new()` and a `new_with_ffi_codec()` method for instantiation. For example, where you previously would write

```rust,ignore
let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new(table, None);
```

you now need to provide a `TaskContextProvider`. The most common implementation of this trait is `SessionContext`:

```rust,ignore
let ctx = Arc::new(SessionContext::default());
let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new(table, None, ctx, None);
```

The alternative constructor, `new_with_ffi_codec()`, may be more convenient if you are performing many of these operations, since a `FFI_LogicalExtensionCodec` stores the `TaskContextProvider` as well:
```rust,ignore
let codec = Arc::new(DefaultLogicalExtensionCodec {});
let ctx = Arc::new(SessionContext::default());
let ffi_codec = FFI_LogicalExtensionCodec::new(codec, None, ctx);

let table = Arc::new(MyTableProvider::new());
let ffi_table = FFI_TableProvider::new_with_ffi_codec(table, None, ffi_codec);
```

Additional information about the usage of the `TaskContextProvider` can be found in the crate README.

Additionally, the FFI structure for scalar UDFs no longer contains a `return_type` call. This code was unused, since the `ForeignScalarUDF` struct implements `return_field_from_args` instead.

### Projection handling moved from FileScanConfig to FileSource

Projection handling has been moved from `FileScanConfig` into `FileSource` implementations. This enables format-specific projection pushdown (e.g., Parquet can push down struct field access, Vortex can push down computed expressions into un-decoded data).

**Who is affected:**

- Users who have implemented custom `FileSource` implementations
- Users who use `FileScanConfigBuilder::with_projection_indices` directly

**Breaking changes:**

1. **`FileSource::with_projection` replaced with `try_pushdown_projection`:** The `with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>` method has been removed and replaced with `try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result<Option<Arc<dyn FileSource>>>`.

2. **`FileScanConfig.projection_exprs` field removed:** Projections are now stored in the `FileSource` directly, not in `FileScanConfig`. Various public helper methods that access projection information have been removed from `FileScanConfig`.

3. **`FileScanConfigBuilder::with_projection_indices` now returns `Result`:** This method can now fail if the projection pushdown fails.

4. **`FileSource::create_file_opener` now returns `Result<Arc<dyn FileOpener>>`:** Previously it returned `Arc<dyn FileOpener>` directly. Any `FileSource` implementation that may fail to create a `FileOpener` should now return an appropriate error.

5. **`DataSource::try_swapping_with_projection` signature changed:** The parameter changed from `&[ProjectionExpr]` to `&ProjectionExprs`.

**Migration guide:**

If you have a custom `FileSource` implementation:

**Before:**

```rust,ignore
impl FileSource for MyCustomSource {
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        // Apply projection from config
        Arc::new(Self { /* ... */ })
    }

    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(MyOpener { /* ... */ })
    }
}
```

**After:**

```rust,ignore
impl FileSource for MyCustomSource {
    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        // Return None if projection cannot be pushed down
        // Return Some(new_source) with projection applied if it can
        Ok(Some(Arc::new(Self {
            projection: Some(projection.clone()),
            /* ... */
        })))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        self.projection.as_ref()
    }

    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        Ok(Arc::new(MyOpener { /* ... */ }))
    }
}
```

We recommend looking at [#18627](https://github.com/apache/datafusion/pull/18627), the PR that introduced these changes, for more examples of how this was handled for the various built-in file sources.
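If your source cannot apply projections at all, a conservative option is to decline the pushdown by returning `Ok(None)`, so the projection is presumably applied outside the scan. A minimal sketch following the trait shape above, with a hypothetical `MySimpleSource` type and all other methods elided:

```rust,ignore
impl FileSource for MySimpleSource {
    // Decline the pushdown entirely; the query remains correct, just
    // without the format-specific projection optimization.
    fn try_pushdown_projection(
        &self,
        _projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        Ok(None)
    }

    // other methods...
}
```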
We have added [`SplitProjection`](https://docs.rs/datafusion-datasource/latest/datafusion_datasource/projection/struct.SplitProjection.html) and [`ProjectionOpener`](https://docs.rs/datafusion-datasource/latest/datafusion_datasource/projection/struct.ProjectionOpener.html) helpers to make it easier to handle projections in your `FileSource` implementations. For file sources that can only handle simple column selections (not computed expressions), use the `SplitProjection` and `ProjectionOpener` helpers to split the projection into pushdownable and non-pushdownable parts:

```rust,ignore
use datafusion_datasource::projection::{SplitProjection, ProjectionOpener};

// In try_pushdown_projection:
let split = SplitProjection::new(projection, self.table_schema())?;
// Use split.file_projection() for what to push down to the file format
// The ProjectionOpener wrapper will handle the rest
```

**For `FileScanConfigBuilder` users:**

```diff
  let config = FileScanConfigBuilder::new(url, source)
-     .with_projection_indices(Some(vec![0, 2, 3]))
+     .with_projection_indices(Some(vec![0, 2, 3]))?
      .build();
```

### `SchemaAdapter` and `SchemaAdapterFactory` removed from Parquet scanning and deprecated

Following the deprecation announced in [DataFusion 49.0.0](49.0.0.md#deprecating-schemaadapterfactory-and-schemaadapter), `SchemaAdapterFactory` is no longer used for Parquet scanning. In addition, the following symbols have been deprecated and will be removed in the next release:

- `SchemaAdapter` trait
- `SchemaAdapterFactory` trait
- `SchemaMapper` trait
- `SchemaMapping` struct
- `DefaultSchemaAdapterFactory` struct

These types were previously used to adapt record batch schemas during file reading. This functionality has been replaced by `PhysicalExprAdapterFactory`, which rewrites expressions at planning time rather than transforming batches at runtime.

**Migration guide:**

If you implemented a custom `SchemaAdapterFactory` for schema adaptation (e.g., default column values, type coercion), implement `PhysicalExprAdapterFactory` instead. See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs) for a complete implementation of a custom `PhysicalExprAdapterFactory`.