# Upgrade Guides

## DataFusion 47.0.0
This section calls out some of the major changes in the 47.0.0 release of DataFusion, along with example upgrade PRs that demonstrate the changes required when upgrading from DataFusion 46.0.0.
### Upgrades to arrow-rs and arrow-parquet 55.0.0 and object_store 0.12.0
Several APIs in the underlying arrow and parquet libraries have changed to use `u64` instead of `usize` to better support WASM (see #7371 and #6961). Additionally, `ObjectStore::list` and `ObjectStore::list_with_offset` have been changed to return `'static` lifetimes (see #6619).

This requires occasionally converting from `usize` to `u64`, as well as changes to `ObjectStore` implementations such as:
```rust
impl ObjectStore {
    ...
    // The range is now a u64 instead of usize
    async fn get_range(&self, location: &Path, range: Range<u64>) -> ObjectStoreResult<Bytes> {
        self.inner.get_range(location, range).await
    }
    ...
    // The lifetime is now 'static instead of '_ (meaning the captured closure
    // can't contain references). This also applies to list_with_offset.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
        self.inner.list(prefix)
    }
}
```
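Where your code previously passed byte offsets and lengths as `usize`, you may now need explicit conversions at the API boundary. Below is a minimal sketch of the two directions using only plain `std` Rust (no `object_store` types; the helper names are illustrative, not part of any API):

```rust
use std::ops::Range;

// usize -> u64 is lossless on 32- and 64-bit targets, so a plain cast is fine.
fn to_byte_range(start: usize, end: usize) -> Range<u64> {
    (start as u64)..(end as u64)
}

// u64 -> usize can overflow on 32-bit targets (including wasm32),
// so prefer a checked conversion over `as`.
fn to_buffer_len(len: u64) -> Option<usize> {
    usize::try_from(len).ok()
}

fn main() {
    let range = to_byte_range(0, 1024);
    assert_eq!(range, 0u64..1024u64);
    assert_eq!(to_buffer_len(1024), Some(1024));
    println!("{}..{}", range.start, range.end);
}
```

The checked `u64` to `usize` direction is the one to watch: it is exactly the case the upstream change was made to support, since on wasm32 a file can be larger than `usize::MAX`.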
The `ParquetObjectReader` has been updated to no longer require the object size (it can be fetched using a single suffix request). See #7334 for details.
Pattern in DataFusion 46.0.0:

```rust
let meta: ObjectMeta = ...;
let reader = ParquetObjectReader::new(store, meta);
```
Pattern in DataFusion 47.0.0:

```rust
let meta: ObjectMeta = ...;
let reader = ParquetObjectReader::new(store, location)
    .with_file_size(meta.size);
```
### DisplayFormatType::TreeRender
DataFusion now supports `tree`-style explain plans. Implementations of `ExecutionPlan` must also provide a description in the `DisplayFormatType::TreeRender` format. This can be the same as the existing `DisplayFormatType::Default`.
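In practice this is just another match arm in your plan's display method. Below is a self-contained sketch using stand-in types (the enum and method names mirror the real DataFusion API, but this is an illustration, not the actual trait):

```rust
use std::fmt;

// Stand-in for DataFusion's DisplayFormatType enum (illustrative only).
#[derive(Clone, Copy)]
enum DisplayFormatType {
    Default,
    Verbose,
    TreeRender,
}

// A hypothetical ExecutionPlan implementation.
struct MyScanExec {
    table: String,
}

impl MyScanExec {
    // Mirrors the shape of ExecutionPlan's display hook: branch on the format.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "MyScanExec: table={}", self.table)
            }
            // TreeRender output is drawn inside a tree box, so shorter text
            // often reads better; reusing the Default text is also acceptable.
            DisplayFormatType::TreeRender => {
                write!(f, "table={}", self.table)
            }
        }
    }
}

// Small adapter so we can render via the standard Display machinery.
struct Shown<'a>(&'a MyScanExec, DisplayFormatType);

impl fmt::Display for Shown<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt_as(self.1, f)
    }
}

fn main() {
    let exec = MyScanExec { table: "t".to_string() };
    println!("{}", Shown(&exec, DisplayFormatType::TreeRender));
}
```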
### Removed Deprecated APIs
Several APIs have been removed in this release. These were either previously deprecated or were hard to use correctly, such as the multiple different `ScalarUDFImpl::invoke*` APIs. See #15130, #15123, and #15027 for more details.
### FileScanConfig --> FileScanConfigBuilder
Previously, `FileScanConfig::build()` directly created `ExecutionPlan`s. In DataFusion 47.0.0 this has been changed to use `FileScanConfigBuilder`. See #15352 for details.
Pattern in DataFusion 46.0.0:

```rust
let plan = FileScanConfig::new(url, schema, Arc::new(file_source))
    .with_statistics(stats)
    ...
    .build()
```
Pattern in DataFusion 47.0.0:

```rust
let config = FileScanConfigBuilder::new(url, schema, Arc::new(file_source))
    .with_statistics(stats)
    ...
    .build();
let scan = DataSourceExec::from_data_source(config);
```
## DataFusion 46.0.0
### Use invoke_with_args instead of invoke() and invoke_batch()
DataFusion is moving to a consistent API for invoking ScalarUDFs, `ScalarUDFImpl::invoke_with_args()`, and deprecating `ScalarUDFImpl::invoke()`, `ScalarUDFImpl::invoke_batch()`, and `ScalarUDFImpl::invoke_no_args()`.

If you see errors such as the following, it means the older APIs are being used:

```text
This feature is not implemented: Function concat does not implement invoke but called
```
To fix this error, use `ScalarUDFImpl::invoke_with_args()` instead, as shown below. See PR #14876 for an example.

Given existing code like this:
```rust
impl ScalarUDFImpl for SparkConcat {
    ...
    fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
        if args
            .iter()
            .any(|arg| matches!(arg.data_type(), DataType::List(_)))
        {
            ArrayConcat::new().invoke_batch(args, number_rows)
        } else {
            ConcatFunc::new().invoke_batch(args, number_rows)
        }
    }
}
```
Change it to:

```rust
impl ScalarUDFImpl for SparkConcat {
    ...
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        if args
            .args
            .iter()
            .any(|arg| matches!(arg.data_type(), DataType::List(_)))
        {
            ArrayConcat::new().invoke_with_args(args)
        } else {
            ConcatFunc::new().invoke_with_args(args)
        }
    }
}
```
### ParquetExec, AvroExec, CsvExec, JsonExec deprecated
DataFusion 46 has a major change to how the built-in DataSources are organized. Instead of individual `ExecutionPlan`s for the different file formats, they now all use `DataSourceExec`, and the format-specific information is embodied in the new traits `DataSource` and `FileSource`.

Here is more information about the change:

- Change PR: #14224
- Example of an upgrade PR in delta-rs
#### Cookbook: Changes to ParquetExec
Code that looks for `ParquetExec` like this will no longer work:

```rust
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
    // Do something with ParquetExec here
}
```
Instead, with `DataSourceExec`, the same information is now on `FileScanConfig` and `ParquetSource`. The equivalent code is:

```rust
if let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
    if let Some(scan_config) = datasource_exec.data_source().as_any().downcast_ref::<FileScanConfig>() {
        // FileGroups and other information is on the FileScanConfig
        if let Some(parquet_source) = scan_config.file_source.as_any().downcast_ref::<ParquetSource>()
        {
            // Information on PruningPredicates and parquet options are here
        }
    }
}
```
#### Cookbook: Changes to ParquetExecBuilder
Likewise, code that builds `ParquetExec` using the `ParquetExecBuilder`, such as the following, must be changed:

```rust
let mut exec_plan_builder = ParquetExecBuilder::new(
    FileScanConfig::new(self.log_store.object_store_url(), file_schema)
        .with_projection(self.projection.cloned())
        .with_limit(self.limit)
        .with_table_partition_cols(table_partition_cols),
)
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
.with_table_parquet_options(parquet_options);

// Add filter
if let Some(predicate) = logical_filter {
    if config.enable_parquet_pushdown {
        exec_plan_builder = exec_plan_builder.with_predicate(predicate);
    }
};
```
New code should use `FileScanConfig` to build the appropriate `DataSourceExec`:

```rust
let mut file_source = ParquetSource::new(parquet_options)
    .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));

// Add filter
if let Some(predicate) = logical_filter {
    if config.enable_parquet_pushdown {
        file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
    }
};

let file_scan_config = FileScanConfig::new(
    self.log_store.object_store_url(),
    file_schema,
    Arc::new(file_source),
)
.with_statistics(stats)
.with_projection(self.projection.cloned())
.with_limit(self.limit)
.with_table_partition_cols(table_partition_cols);

// Build the actual scan like this
parquet_scan: file_scan_config.build(),
```
### datafusion-cli no longer automatically unescapes strings
`datafusion-cli` previously would incorrectly unescape string literals (see ticket for more details).

To escape `'` in SQL literals, use `''`:
```sql
> select 'it''s escaped';
+----------------------+
| Utf8("it's escaped") |
+----------------------+
| it's escaped         |
+----------------------+
1 row(s) fetched.
```
To include special characters (such as newlines via `\n`) you can use an `E` literal string. For example:

```sql
> select E'foo\nbar';
+------------------+
| Utf8("foo\nbar") |
+------------------+
| foo
bar              |
+------------------+
1 row(s) fetched.
Elapsed 0.005 seconds.
```
### Changes to array scalar function signatures
DataFusion 46 has changed the way scalar array function signatures are declared. Previously, functions needed to select from a list of predefined signatures within the `ArrayFunctionSignature` enum. Now the signatures can be defined via a `Vec` of pseudo-types, each of which corresponds to a single argument. Those pseudo-types are the variants of the `ArrayFunctionArgument` enum and are as follows:

- `Array`: An argument of type List/LargeList/FixedSizeList. All `Array` arguments must be coercible to the same type.
- `Element`: An argument that is coercible to the inner type of the `Array` arguments.
- `Index`: An `Int64` argument.
Each of the old variants can be converted to the new format as follows:

`TypeSignature::ArraySignature(ArrayFunctionSignature::ArrayAndElement)`:

```rust
TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
    arguments: vec![ArrayFunctionArgument::Array, ArrayFunctionArgument::Element],
    array_coercion: Some(ListCoercion::FixedSizedListToList),
});
```

`TypeSignature::ArraySignature(ArrayFunctionSignature::ElementAndArray)`:

```rust
TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
    arguments: vec![ArrayFunctionArgument::Element, ArrayFunctionArgument::Array],
    array_coercion: Some(ListCoercion::FixedSizedListToList),
});
```

`TypeSignature::ArraySignature(ArrayFunctionSignature::ArrayAndIndex)`:

```rust
TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
    arguments: vec![ArrayFunctionArgument::Array, ArrayFunctionArgument::Index],
    array_coercion: None,
});
```

`TypeSignature::ArraySignature(ArrayFunctionSignature::ArrayAndElementAndOptionalIndex)`:

```rust
TypeSignature::OneOf(vec![
    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
        arguments: vec![ArrayFunctionArgument::Array, ArrayFunctionArgument::Element],
        array_coercion: None,
    }),
    TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
        arguments: vec![
            ArrayFunctionArgument::Array,
            ArrayFunctionArgument::Element,
            ArrayFunctionArgument::Index,
        ],
        array_coercion: None,
    }),
]);
```

`TypeSignature::ArraySignature(ArrayFunctionSignature::Array)`:

```rust
TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
    arguments: vec![ArrayFunctionArgument::Array],
    array_coercion: None,
});
```
Alternatively, you can switch to using one of the following functions, which take care of constructing the `TypeSignature` for you:

- `Signature::array_and_element`
- `Signature::array_and_element_and_optional_index`
- `Signature::array_and_index`
- `Signature::array`