Comments (9)
Thanks for sharing and helping resolve this issue so quickly 😁
from datafusion.
Thank you for the report and the reproducer ❤️
> read row groups in order they were written
This is not my expectation.
DataFusion reads row groups in parallel, potentially out of order, using multiple threads as an optimization. To preserve the order of the data, you can either set the configuration option `datafusion.optimizer.repartition_file_scans` to `false`, or communicate the order of the data in the files using the `CREATE EXTERNAL TABLE .. WITH ORDER` clause and then explicitly ask for that order in your query.
> read the same values for the same row group even when the file increases in size
> read the same values as the python pyarrow parquet reader
Yes, I agree. These are also my expectations.
Maybe you can try setting `datafusion.optimizer.repartition_file_scans` to `false` and see if that makes the data consistent.
Setting `datafusion.optimizer.repartition_file_scans` to `false` like this fixes things. ✔️
let session_cfg =
    SessionConfig::new().set_str("datafusion.optimizer.repartition_file_scans", "false");
let session_ctx = SessionContext::new_with_config(session_cfg);
However, it's unclear how this setting interacts with other options and how it affects memory and performance. So here's what I have.
It is a given that the data will be sorted by timestamp, which I declare like this:
let parquet_options = ParquetReadOptions::<'_> {
    skip_metadata: Some(false),
    file_sort_order: vec![vec![Expr::Sort(Sort {
        expr: Box::new(col("ts_init")),
        asc: true,
        nulls_first: true,
    })]],
    ..Default::default()
};
Then there are two approaches to get row groups/data in order:
- Using an ORDER BY clause in the query, `session_ctx.sql("SELECT * FROM data ORDER BY ts_init")`. From our previous discussion, doing an ORDER BY on an already sorted column does not incur additional overhead.
- Setting `datafusion.optimizer.repartition_file_scans` to `false`, which ensures that the data is read in sequential order of row groups.
It's not clear to me how each option affects the performance and memory usage. Do you have any guidance around it?
> Setting `datafusion.optimizer.repartition_file_scans` to `false` like this fixes things. ✔️
That is great news.
> Using an order by clause in the query `session_ctx.sql("SELECT * FROM data ORDER BY ts_init")`. From our #7675 (comment), doing an order by on an already sorted column does not incur an additional overhead.
This is the approach I would recommend (we do it in InfluxDB 3.0).
You can verify that there are no additional sorts, etc. in the plan by using `EXPLAIN` to look at the query plan.
If you use this approach, you should also be able to avoid setting `datafusion.optimizer.repartition_file_scans` to `false`, as the optimizer will take care of it automatically.
So I tried out some experiments with different combinations of using `ORDER BY` and setting the `repartition_file_scans` configuration value, and the results are a bit unintuitive. The full results, instructions, and data are hosted in this repo.
The script reads a file and prints the number of rows in the first row group. Ideally it should only load the first row group.
However, as you can see, even after specifying the sort order of the file, the `ORDER BY` query still loads the whole file and does some kind of sort operation. For a 100 MB file it loads up to 900 MB. Turning on repartitioning helps a bit in improving performance and memory footprint.
let parquet_options = ParquetReadOptions::<'_> {
    skip_metadata: Some(false),
    file_sort_order: vec![vec![Expr::Sort(Sort {
        expr: Box::new(col("ts_init")),
        asc: true,
        nulls_first: true,
    })]],
    ..Default::default()
};
But the best result comes from removing the sort and turning off repartitioning. I believe it then loads only the one row group required. It would be very helpful to document this interaction.
order | repartition | wall time (s) | memory (MB) | read sorted order |
---|---|---|---|---|
true | true | 0.84 | 654 | ✅ |
true | false | 1.19 | 944 | ✅ |
false | false | 0.21 | 45 | ✅ |
false | true | 0.33 | 151 | ❌ |
I'll share more results for reading the whole file.
Here's the result for the script, also included in the repo, that reads the whole file and counts the total number of rows.
order | repartition | wall time (s) | memory (MB) | read sorted order |
---|---|---|---|---|
true | true | 1.24 | 654 | ✅ |
true | false | 1.65 | 944 | ✅ |
false | false | 0.55 | 45 | ✅ |
false | true | 0.41 | 151 | ❌ |
It seems like order=false and repartition=false is the holy grail of performant, low-memory-footprint streaming in sorted order.
What do you think?
Hi @twitu -- thanks for this
Some comments:
I took a quick peek at https://github.com/nautechsystems/nautilus_experiments/tree/efficient-query and it looks like it reads only a single batch: https://github.com/nautechsystems/nautilus_experiments/blob/a4ceb950de3b4bbc43ec82b64ee1495d077f5116/src/bin/single_row_group.rs#L45
This means you are running the equivalent of `SELECT ... LIMIT 4000` or something similar.
> It seems like order=false and repartition=false is the holy grail of performant, low-memory-footprint streaming in sorted order.
I would expect those settings will be the lowest latency (time to first batch)
> However, as you can see, even after specifying the sort order of the file, the ORDER BY query still loads the whole file and does some kind of sort operation.
I would expect that it loads the first row group of each file and begins merging them together (e.g. if you ran `EXPLAIN ...` on your SQL, you would see a `SortPreservingMerge` in the physical plan).
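To make the merging idea concrete, here is a minimal standalone sketch in plain Rust. It is not DataFusion's `SortPreservingMerge` operator, only an illustration of the general technique (a k-way merge of already sorted inputs); the `merge_sorted` helper and the sample data are made up for this example.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several individually sorted inputs into one sorted output,
/// pulling one element at a time from each input (hypothetical helper).
fn merge_sorted(inputs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iters: Vec<_> = inputs.into_iter().map(|v| v.into_iter()).collect();
    // Min-heap of (value, input index); `Reverse` turns the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, idx))) = heap.pop() {
        out.push(v);
        // Refill from the input that just produced the smallest value.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}

fn main() {
    // Three sorted "partitions", standing in for sorted row groups or files.
    let partitions = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9]];
    println!("{:?}", merge_sorted(partitions));
}
```

The point of the sketch is that the merge only ever holds one pending value per input, so it never needs to buffer and re-sort a whole input.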
@suremarc and @matthewmturner and others have been working on optimizing a similar case -- see #7490 for example. We have other items tracked in #10313 (notably, we have a way to avoid opening all the files if we know the data is already sorted and doesn't overlap: #10316).
cc @NGA-TRAN
I also added a query explanation, and here are the results.
plan: Analyze
  Sort: data.ts_init ASC NULLS LAST
    Projection: data.bid, data.ask, data.bid_size, data.ask_size, data.ts_event, data.ts_init
      TableScan: data,

plan: Analyze
  Projection: data.bid, data.ask, data.bid_size, data.ask_size, data.ts_event, data.ts_init
    TableScan: data,
You'll see that the queries with `ORDER BY` have a Sort node in the plan. It's not clear to me why the plan still has a sort despite my specifying the sort order in the configuration. I hope the optimizations you've mentioned will take this into account.
file_sort_order: vec![vec![Expr::Sort(Sort {
    expr: Box::new(col("ts_init")),
    asc: true,
    nulls_first: true,
})]],
You'll see in the experiments repo that I've implemented binaries for both reading a single row group and reading the whole file. The queries are the same; the only difference is how the resulting stream is consumed. I don't think this is equivalent to adding a `LIMIT` clause, because as far as the query is concerned I'm reading the whole file. It is only that the consumer decides to stop after reading one row group.
> It seems like order=false and repartition=false is the holy grail of performant, low-memory-footprint streaming in sorted order.
> I would expect those settings will be the lowest latency (time to first batch)
Surprisingly, these settings also give the lowest latency and memory footprint when reading the full file, as shown in the table above.
If you need an additional contributor in any of the above mentioned issues, I'm happy to help 😄
Hi @twitu -- I am very sorry for the delay in responding -- I have been traveling for sever
> You'll see that the queries with ORDER BY have a Sort expression in the plan. It's not clear to me why despite specifying the sort order in the configuration the plan still has a sort. I hope the optimizations you've mentioned will take this into account.
One thing that might be going on is that the null ordering doesn't match.
In your plan, the sort is putting nulls last:
Sort: data.ts_init ASC NULLS LAST
but in your code you specify nulls first:
file_sort_order: vec![vec![Expr::Sort(Sort {
    expr: Box::new(col("ts_init")),
    asc: true,
    nulls_first: true,
})]],
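As a small illustration of why that mismatch matters, here is a standalone sketch in plain Rust (not DataFusion code; `cmp_nullable`, `sorted`, and the sample values are hypothetical). It shows that "ascending, nulls first" and "ascending, nulls last" are different orderings as soon as a null can appear, so data declared with one ordering cannot be assumed to satisfy a request for the other.

```rust
use std::cmp::Ordering;

/// Compare two nullable timestamps in ascending order,
/// placing nulls (`None`) either first or last.
fn cmp_nullable(a: &Option<i64>, b: &Option<i64>, nulls_first: bool) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => {
            if nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (Some(_), None) => {
            if nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        (Some(x), Some(y)) => x.cmp(y),
    }
}

/// Sort a column of nullable values with the chosen null placement.
fn sorted(mut values: Vec<Option<i64>>, nulls_first: bool) -> Vec<Option<i64>> {
    values.sort_by(|a, b| cmp_nullable(a, b, nulls_first));
    values
}

fn main() {
    let ts_init = vec![Some(3), None, Some(1)];
    let declared = sorted(ts_init.clone(), true); // ordering declared for the file
    let requested = sorted(ts_init, false); // ordering the plan asks for
    println!("declared:  {:?}", declared);
    println!("requested: {:?}", requested);
    println!("orders match: {}", declared == requested);
}
```

If the column never contains nulls the two orderings produce the same rows, but a planner that compares the declared and requested orderings structurally has no way of knowing that.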
> I don't think this is equivalent to adding a LIMIT clause because for the purpose of the query I'm reading the whole file. It is only that the consumer decides to stop after reading one row group.
DataFusion is a streaming engine, so if you open a parquet file, read one batch, and stop, then the entire file will not be opened and read (the batches are basically created on demand).
There are certain "pipeline breaking" operators that do require reading the entire input, such as `Sort` and `GroupHashAggregate`, which is why I think you are seeing the entire file read when your query has a sort.
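The difference can be sketched with plain Rust iterators. This is an analogy only, not DataFusion's execution model: the `batches` generator, the batch size of 4, and the counting via `Cell` are all assumptions made for this illustration. A lazy consumer that stops after one batch causes only one batch to be produced, while a sort has to drain the whole input before it can emit its first row.

```rust
use std::cell::Cell;

/// Lazily yields `total` batches of `batch_size` values each and
/// counts in `produced` how many batches were actually materialized.
fn batches<'a>(
    total: usize,
    batch_size: usize,
    produced: &'a Cell<usize>,
) -> impl Iterator<Item = Vec<i64>> + 'a {
    (0..total).map(move |i| {
        produced.set(produced.get() + 1);
        let start = (i * batch_size) as i64;
        (start..start + batch_size as i64).collect::<Vec<i64>>()
    })
}

/// Streaming consumer: take the first batch and stop.
fn batches_read_streaming(total: usize) -> usize {
    let produced = Cell::new(0);
    let _first = batches(total, 4, &produced).next();
    produced.get()
}

/// Pipeline-breaking consumer: a sort must see every row before
/// it can return even the first one.
fn batches_read_with_sort(total: usize) -> usize {
    let produced = Cell::new(0);
    let mut all: Vec<i64> = batches(total, 4, &produced).flatten().collect();
    all.sort();
    let _first = all.first().copied();
    produced.get()
}

fn main() {
    println!("streaming: {} batch(es) read", batches_read_streaming(10));
    println!("with sort: {} batch(es) read", batches_read_with_sort(10));
}
```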
> If you need an additional contributor in any of the above mentioned issues, I'm happy to help 😄
We are always looking for contributors -- anything you can do to help others would be most appreciated. For example, perhaps you could add an example to datafusion-examples (https://github.com/apache/datafusion/tree/main/datafusion-examples) showing how to use a pre-sorted input file to avoid sorting during a query (assuming that you can actually get that working).