Using Ordering for Better Plans in Apache DataFusion

Posted on: Tue 11 March 2025 by Mustafa Akur, Andrew Lamb

Introduction

In this blog post, we explain when an ordering requirement of an operator is satisfied by its input data. This analysis is essential for order-based optimizations and is often more complex than one might initially think.

Ordering Requirement for an operator describes how the input data to that operator must be sorted for the operator to compute the correct result. It is the job of the planner to make sure that these requirements are satisfied during execution (See DataFusion EnforceSorting for an implementation of such a rule).

There are various use cases where this type of analysis can be useful such as the following examples.

Removing Unnecessary Sorts

Imagine a user wants to execute the following query:

SELECT hostname, log_line 
FROM telemetry ORDER BY time ASC limit 10

If we don't know anything about the telemetry table we need to sort it by time ASC and then retrieve the first 10 rows to get the correct result. However, if the table is already ordered by time ASC, we can simply retrieve the first 10 rows. This approach executes much faster and uses less memory compared to resorting the entire table, even when the TopK operator is used.

In order to avoid the sort the query optimizer must determine the data is already sorted. For simple queries the analysis is straightforward however it gets complicated fast. For example, what if your data is sorted by [hostname, time ASC] and your query is

SELECT hostname, log_line 
FROM telemetry WHERE hostname = 'app.example.com' ORDER BY time ASC;

In this case a sort still isn't needed, but the analysis must reason about the sortedness of the stream when it knows hostname has a single value.

Optimized Operator Implementations

As another use case, some operators can utilize the ordering information to change its underlying algorithm to execute more efficiently. Consider the following query:

SELECT COUNT(log_line) 
FROM telemetry GROUP BY hostname;

Most analytic systems, including DataFusion, by default implement such a query using a hash table keyed on values of hostname to store the counts. However, if the telemetry table is sorted by hostname, there are much more efficient algorithms for grouping on hostname values than hashing every value and storing it in memory. However, the more efficient algorithm can only be used when the input is sorted correctly. To see this in practice, check out the source for ordered variant of the Aggregation in DataFusion.

Streaming-Friendly Execution

Stream processing aims to produce results immediately as they become available ensuring minimal latency for real-time workloads. However, some operators need to consume all input data before producing any output. Consider the Sort operation: before it can start generating output, the algorithm must first process all input data. As a result, data flow halts whenever such an operator is encountered until all input is consumed. When a physical query plan contains such an operator (Sort, CrossJoin, ..) we refer to this as pipeline breaking, meaning the query cannot be executed in a streaming fashion.

For a query to be executed in a streaming fashion we need to satisfy 2 conditions:

Logically Streamable
It should be possible to generate what user wants in streaming fashion. Consider following query:

SELECT SUM(amount)  
FROM orders  

Here, the user wants to compute the sum of all amounts in the orders table. By the nature of the query this requires scanning the entire table to generate a result making it impossible to execute in a streaming fashion.

Streaming Aware Planner
Being logically streamable does not guarantee that a query will execute in a streaming fashion. SQL is a declarative language, meaning it specifies 'WHAT' user wants. It is up to the planner 'HOW' to generate the result. In most cases there are many ways to compute the correct result for a given query. The query planner is responsible for choosing "a way" (ideally the best* one) among the all alternatives to generate what user asks for. If a plan contains a pipeline-breaking operator the execution will not be streaming—even if the query is logically streamable. To generate truly streaming plans from logically streamable queries the planner must carefully analyze the existing orderings in the source tables to ensure that the final plan does not contain any pipeline-breaking operators.

Analysis

Let's start by creating an example table that we will refer throughout the post. This table models the input data of an operator for the analysis:

Example Virtual Table

amount price hostnamecurrencytime_bin time price_cloned time_cloned
12 25 app.example.com USD 08:00:00 08:01:30 25 08:01:30
12 26 app.example.com USD 08:00:00 08:11:30 26 08:11:30
15 30 app.example.com USD 08:00:00 08:41:30 30 08:41:30
15 32 app.example.com USD 08:00:00 08:55:15 32 08:55:15
15 35 app.example.com USD 09:00:00 09:10:23 35 09:10:23
20 18 app.example.com USD 09:00:00 09:20:33 18 09:20:33
20 22 app.example.com USD 09:00:00 09:40:15 22 09:40:15


How can a table have multiple orderings? At first glance it may seem counterintuitive for a table to have more than one valid ordering. However, during query execution such scenarios can arise. For example consider the following query:
SELECT time, date_bin('1 hour', time, '1970-01-01') as time_bin  
FROM table;
If we know that the table is ordered by time ASC we can infer that time_bin ASC is also a valid ordering. This is because the date_bin function is monotonic, meaning it preserves the order of its input. DataFusion leverages these functional dependencies to infer new orderings as data flows through different query operators. For details on the implementation see the source code.

By inspection, you can see this table is sorted by the amount column, but It is also sorted by time and time_bin as well as the compound (time_bin, amount) and many other variations. While this example is an extreme case, real-world data often has multiple sort orders.

A naive approach for analyzing whether the ordering requirement of an operator is satisfied by its input would be:

  • Store all the valid ordering expressions that the tables satisfies
  • Check whether the ordering requirement by the operator is among valid orderings.

This naive algorithm works and correct. However, listing all valid orderings can be quite lengthy and is of exponential complexity as the number of orderings grows. For the example table here is a (small) subset of the valid orderings:

[amount ASC]
[amount ASC, price ASC]
[amount ASC, price_cloned ASC]
[hostname ASC, amount ASC, price_cloned ASC]
[amount ASC, hostname ASC, price_cloned ASC]
[amount ASC, price_cloned ASC, hostname ASC]
.
.
.

As can be seen from the listing above storing all valid orderings is wasteful and contains significant redundancy. Here are some observations which suggest that we can do much better:

  • Storing a prefix of another valid ordering is redundant. If the table satisfies the lexicographic ordering1: [amount ASC, price ASC], it already satisfies ordering [amount ASC] trivially. Hence, once we store [amount ASC, price ASC] storing [amount ASC] is redundant.

  • Using all columns that are equal to each other in the listings is redundant. If we know the table is ordered by [amount ASC, price ASC], it is also ordered by [amount ASC, price_cloned ASC] since price and price_cloned are copy of each other. It is enough to use just one expression among the expressions that exact copy of each other.

  • Constant expressions can be inserted anywhere in a valid ordering with an arbitrary direction (e.g. ASC, DESC). Hence, if the table is ordered by [amount ASC, price ASC], it is also ordered by:
    [hostname ASC, amount ASC, price ASC],
    [hostname DESC, amount ASC, price ASC],
    [amount ASC, hostname ASC, price ASC],
    .
    .

This is clearly redundant. For this reason, it is better to avoid explicitly encoding constant expressions in valid sort orders.

In summary,

  • We should store only the longest lexicographic ordering (shouldn't use any prefix of it)
  • Using expressions that are exact copies of each other is redundant.
  • Ordering expressions shouldn't contain any constant expression.

Key Concepts for Analyzing Orderings

To solve the shortcomings above DataFusion needs to track of following properties for the table:

  • Constant Expressions
  • Equivalent Expression Groups (will be explained shortly)
  • Succinct Valid Orderings (will be explained shortly)
Note: These properties are implemented in the EquivalenceProperties structure in DataFusion, please see the source for more details

These properties allow us to analyze whether the ordering requirement is satisfied by the data already.

1. Constant Expressions

Constant expressions are those where each row in the expression has the same value across all rows. Although constant expressions may seem odd in a table they can arise after operations like Filter or Join occur.

For instance in the example table:

  • Columns hostname and currency are constant because every row in the table has the same value ('app.example.com' for hostname, and 'USD' for currency) for these columns.
Note: Constant expressions can arise during query execution. For example, in following query:
SELECT hostname FROM logs
WHERE hostname='app.example.com'
after filtering is done, for subsequent operators the hostname column will be constant.

2. Equivalent Expression Groups

Equivalent expression groups are expressions that always hold the same value across rows. These expressions can be thought of as clones of each other and may arise from operations like Filter, Join, or Projection.

In the example table, the expressions price and price_cloned form one equivalence group, and time and time_cloned form another equivalence group.

Note: Equivalent expression groups can arise during the query execution. For example, in the following query:
SELECT time, time as time_cloned FROM logs
after the projection is done, for subsequent operators time and time_cloned will form an equivalence group. As another example, in the following query:
SELECT employees.id, employees.name, departments.department_name FROM employees JOIN departments ON employees.department_id = departments.id;
after joining, employees.department_id and departments.id will form an equivalence group.

3. Succinct Encoding of Valid Orderings

Valid orderings are the orderings that the table already satisfies. However, naively listing them requires exponential space as the number of columns grows as discussed before. Instead, we list all valid orderings after following constraints are applied:

  • Do not use any constant expressions in the valid ordering construction
  • Use only one entry (by convention the first entry) in the equivalent expression group.
  • Lexicographic ordering shouldn't contain any leading ordering2except the first position 3.
  • Do not use any prefix of a valid lexicographic ordering4.

After applying the first and second constraint, the example table simplifies to

amount pricetime_bin time
12 2508:00:00 08:01:30
12 2608:00:00 08:11:30
15 3008:00:00 08:41:30
15 3208:00:00 08:55:15
15 3509:00:00 09:10:23
20 1809:00:00 09:20:33
20 2209:00:00 09:40:15


Following third and fourth constraints for the simplified table, the succinct valid orderings are:
[amount ASC, price ASC],
[time_bin ASC],
[time ASC]

How can DataFusion find orderings?

DataFusion's CREATE EXTERNAL TABLE has a WITH ORDER clause (see docs) to specify the known orderings of the table during table creation. For example the following query:

CREATE EXTERNAL TABLE source (
    amount INT NOT NULL,
    price DOUBLE NOT NULL,
    time TIMESTAMP NOT NULL,
    ...
)
STORED AS CSV
WITH ORDER (time ASC)
WITH ORDER (amount ASC, price ASC)
LOCATION '/path/to/FILE_NAME.csv'
OPTIONS ('has_header' 'true');
communicates that source table has the orderings: [time ASC] and [amount ASC, price ASC].
When orderings are communicated from the source, DataFusion tracks the orderings through each operator while optimizing the plan.
  • add new orderings (such as when "date_bin" function is applied to the "time" column)
  • Remove orderings, if operation doesn't preserve the ordering of the data at its input
  • Update equivalent groups
  • Update constant expressions
Figure 1 shows an example how DataFusion generates an efficient plan for the query:

SELECT 
  row_number() OVER (ORDER BY time) as rn,
  time
FROM events
ORDER BY rn, time
using the orderings of the query intermediates.

Window Query Datafusion Optimization
Figure 1: DataFusion analyzes orderings of the sources and query intermediates to generate efficient plans

Table Properties

In summary, for the example table, the following properties correctly describe the sort properties:

  • Constant Expressions = hostname, currency
  • Equivalent Expression Groups = [price, price_cloned], [time, time_cloned]
  • Valid Orderings = [amount ASC, price ASC], [time_bin ASC], [time ASC]

Algorithm for Analyzing Ordering Requirements

After deriving these properties for the data, following algorithm can be used to check whether an ordering requirement is satisfied by the table:

  1. Prune constant expressions: Remove any constant expressions from the ordering requirement.
  2. Normalize the requirement: Replace each expression in the ordering requirement with the first entry from its equivalence group.
  3. De-duplicate expressions: If an expression appears more than once, remove duplicates, keeping only the first occurrence.
  4. Match leading orderings: Check whether the leading ordering requirement5 matches the leading valid orderings6 of table. If so:
    • Remove the leading ordering requirement from the ordering requirement
    • Remove the matching leading valid ordering from the valid orderings of table.
  5. Iterate through the remaining expressions: Go back to step 4 until ordering requirement is empty or leading ordering requirement is not found among the leading valid orderings of table.

If, at the end of the procedure above, the ordering requirement is an empty list, we can conclude that the requirement is satisfied by the table.

Example Walkthrough

Let's say the user provided a query such as the following:

SELECT * FROM table
ORDER BY hostname DESC, amount ASC, time_bin ASC, price_cloned ASC, time ASC, currency ASC, price DESC;

And the input has the same properties explained above

  • Constant Expressions = hostname, currency
  • Equivalent Expressions Groups = [price, price_cloned], [time, time_cloned]
  • Succinct Valid Orderings = [amount ASC, price ASC], [time_bin ASC], [time ASC]

In order to remove a sort the optimizer must check if the ordering requirement [hostname DESC, amount ASC, time_bin ASC, price_cloned ASC, time ASC, currency ASC, price DESC] is satisfied by the properties.

Algorithm Steps

  1. Prune constant expressions:
    Remove hostname and currency from the requirement. The requirement becomes:
    [amount ASC, time_bin ASC, price_cloned ASC, time ASC, price DESC].

  2. Normalize using equivalent groups:
    Replace price_cloned with price and time_cloned with time. The requirement becomes:
    [amount ASC, time_bin ASC, price ASC, time ASC, price DESC].

  3. De-duplicate expressions:
    Since price appears twice, we simplify the requirement to:
    [amount ASC, time_bin ASC, price ASC, time ASC] (keeping the first occurrence from the left side).

  4. Match leading orderings:
    Check if leading ordering requirement amount ASC is among the leading valid orderings: amount ASC, time_bin ASC, time ASC. Since this is the case, we remove amount ASC from both the ordering requirement and the valid orderings of the table.

  5. Iterate through the remaining expressions: Now, the problem is converted from
    "whether the requirement: [amount ASC, time_bin ASC, price ASC, time ASC] is satisfied by valid orderings: [amount ASC, price ASC], [time_bin ASC], [time ASC]"
    into
    "whether the requirement: [time_bin ASC, price ASC, time ASC] is satisfied by valid orderings: [price ASC], [time_bin ASC], [time ASC]"
    We go back to step 4 until the ordering requirement list is exhausted or its length no longer decreases.

At the end of stages above we end up with an empty ordering requirement list. Given this, we can conclude that the table satisfies the ordering requirement and thus no sort is required.

Conclusion

In this post, we described the conditions under which an ordering requirement is satisfied based on the properties of a table. We introduced key concepts such as constant expressions, equivalence groups, and valid orderings, and used them to determine whether a given ordering requirement are satisfied by an input table.

This analysis plays a crucial role in:

  • Choosing more efficient algorithm variants
  • Generating streaming-friendly plans

The DataFusion query engine employs this analysis (and many more) during its planning stage to ensure correct and efficient query execution. We welcome you to come and join the project.

Appendix

[1]Lexicographic order is a way of ordering sequences (like strings, list of expressions) based on the order of their components, similar to how words are ordered in a dictionary. It compares each element of the sequences one by one, from left to right.

[2]Leading ordering is the first ordering in a lexicographic ordering list. As an example, for the ordering: [amount ASC, price ASC], leading ordering will be: amount ASC.

[3]This means that, if we know that [amount ASC] and [time ASC] are both valid orderings for the table. We shouldn't enlist [amount ASC, time ASC] or [time ASC, amount ASC] as valid orderings. These orderings can be deduced if we know that table satisfies the ordering [amount ASC] and [time ASC].

[4]This means that, if ordering [amount ASC, price ASC] is a valid ordering for the table. We shouldn't enlist [amount ASC] as valid ordering. Validity of it can be deduced from the ordering: [amount ASC, price ASC]

[5]Leading ordering requirement is the first ordering requirement in the list of lexicographic ordering requirement expression. As an example for the requirement: [amount ASC, time_bin ASC, prices ASC, time ASC], leading ordering requirement is: amount ASC.

[6]Leading valid orderings are the first ordering for each valid ordering list in the table. As an example, for the valid orderings: [amount ASC, prices ASC], [time_bin ASC], [time ASC], leading valid orderings will be: amount ASC, time_bin ASC, time ASC.

*Best depends on the use case, DataFusion has many various flags to communicate what user thinks the best plan is (e.g. streamable, fastest, lowest memory, etc.). See configurations for detail.

Copyright 2025, The Apache Software Foundation, Licensed under the Apache License, Version 2.0.
Apache® and the Apache feather logo are trademarks of The Apache Software Foundation.