
Comments (9)

twitu avatar twitu commented on June 19, 2024 1

Thanks for sharing and helping resolve this issue so quickly 😁

from datafusion.

alamb avatar alamb commented on June 19, 2024

Thank you for the report and the reproducer ❤️

read row groups in order they were written

This is not my expectation.

DataFusion reads row groups in parallel, potentially out of order, with multiple threads as an optimization. To preserve the order of the data you can either set the configuration datafusion.optimizer.repartition_file_scans to false or else communicate the order of the data in the files using the CREATE EXTERNAL TABLE .. WITH ORDER clause and then explicitly ask for that order in your query.
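To see why parallel scanning can reorder output, here is a toy model using only the Rust standard library. It is not DataFusion's scheduler: `split` and `interleave` are hypothetical helpers, and round-robin is just one possible interleaving of partitions that make progress at the same time.

```rust
/// Split row-group indices into `n` contiguous chunks, one per partition.
/// (Toy model; real byte-range splitting is more involved.)
fn split(groups: &[u32], n: usize) -> Vec<Vec<u32>> {
    let n = n.max(1);
    let size = (groups.len() + n - 1) / n;
    groups.chunks(size.max(1)).map(|c| c.to_vec()).collect()
}

/// Emulate partitions that each emit one row group per turn.
fn interleave(partitions: &[Vec<u32>]) -> Vec<u32> {
    let longest = partitions.iter().map(Vec::len).max().unwrap_or(0);
    let mut out = Vec::new();
    for i in 0..longest {
        for p in partitions {
            if let Some(g) = p.get(i) {
                out.push(*g);
            }
        }
    }
    out
}

fn main() {
    let groups = [0, 1, 2, 3, 4, 5];
    // Two partitions: the output no longer follows the written order.
    println!("{:?}", interleave(&split(&groups, 2)));
    // One partition: row groups come back in the order they were written.
    println!("{:?}", interleave(&split(&groups, 1)));
}
```

With a single partition the written order survives, which matches the effect described for setting repartition_file_scans to false.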

read the same values for the same row group even when the file increases in size
read the same values as the python pyarrow parquet reader

Yes, I agree; these are also my expectations.

Maybe you can try setting datafusion.optimizer.repartition_file_scans to false and see if that makes the data consistent


twitu avatar twitu commented on June 19, 2024

Setting datafusion.optimizer.repartition_file_scans to false like this fixes things. ✔️

    let session_cfg =
        SessionConfig::new().set_str("datafusion.optimizer.repartition_file_scans", "false");
    let session_ctx = SessionContext::new_with_config(session_cfg);

However, it's unclear how it interacts with other options and affects memory and performance. So here's what I have -

It is a given that the data will be sorted based on timestamp like this

    let parquet_options = ParquetReadOptions::<'_> {
        skip_metadata: Some(false),
        file_sort_order: vec![vec![Expr::Sort(Sort {
            expr: Box::new(col("ts_init")),
            asc: true,
            nulls_first: true,
        })]],
        ..Default::default()
    };

Then there are two approaches to get row groups/data in order -

  • Using an order by clause in the query session_ctx.sql("SELECT * FROM data ORDER BY ts_init"). From our previous discussion, doing an order by on an already sorted column does not incur an additional overhead.

  • Setting datafusion.optimizer.repartition_file_scans to false ensures that the data is read in sequential order of row groups.

It's not clear to me how each option affects the performance and memory usage. Do you have any guidance around it?


alamb avatar alamb commented on June 19, 2024

Setting datafusion.optimizer.repartition_file_scans to false like this fixes things. ✔️

That is great news

Using an order by clause in the query session_ctx.sql("SELECT * FROM data ORDER BY ts_init"). From our #7675 (comment), doing an order by on an already sorted column does not incur an additional overhead.

This is the approach I would recommend (we do it in InfluxDB 3.0).

You can verify that there are no additional sorts, etc in the plan by using EXPLAIN to look at the query plan

If you use this approach, you should also be able to avoid setting datafusion.optimizer.repartition_file_scans to false as the optimizer will take care of it automatically


twitu avatar twitu commented on June 19, 2024

So I tried out some experiments with different combinations of using ORDER BY and setting the repartition_file_scans configuration value, and the results are a bit unintuitive. The full results, instructions and data are hosted in this repo (footnote 1).

The script reads a file and prints the number of rows in the first row group. Ideally it should only load the first row group.

However, as you can see, even after specifying the sort order of the file, the ORDER BY query still loads the whole file and does some kind of sort operation. You can see that for a 100 MB file it loads up to 900 MB. Turning on re-partitioning helps a bit in improving perf and memory footprint.

    let parquet_options = ParquetReadOptions::<'_> {
        skip_metadata: Some(false),
        file_sort_order: vec![vec![Expr::Sort(Sort {
            expr: Box::new(col("ts_init")),
            asc: true,
            nulls_first: true,
        })]],
        ..Default::default()
    };

But the best result comes from removing sorting and turning off re-partitioning. I believe it only loads the one row group required. It would be very helpful to document this interaction.

order   repartition   wall time (s)   memory (MB)   read sorted order
true    true          0.84            654
true    false         1.19            944
false   false         0.21            45
false   true          0.33            151

I'll share more results for reading the whole file.

Footnotes

  1. https://github.com/nautechsystems/nautilus_experiments/tree/efficient-query


twitu avatar twitu commented on June 19, 2024

Here are the results for the script, also included in the repo (footnote 1), that reads the whole file and counts the total number of rows.

order   repartition   wall time (s)   memory (MB)   read sorted order
true    true          1.24            654
true    false         1.65            944
false   false         0.55            45
false   true          0.41            151

It seems like order=false and repartition=false is the holy grail of performant, low-memory footprint streaming in sorted order.

What do you think?

Footnotes

  1. https://github.com/nautechsystems/nautilus_experiments/tree/efficient-query


alamb avatar alamb commented on June 19, 2024

Hi @twitu -- thanks for this

Some comments:

I took a quick peek at https://github.com/nautechsystems/nautilus_experiments/tree/efficient-query and it looks like it reads only a single batch: https://github.com/nautechsystems/nautilus_experiments/blob/a4ceb950de3b4bbc43ec82b64ee1495d077f5116/src/bin/single_row_group.rs#L45

This means you are running the equivalent of SELECT ... LIMIT 4000 or something similar

It seems like order=false and repartition=false is the holy grail of performant, low-memory footprint streaming in sorted order.

I would expect those settings will be the lowest latency (time to first batch)

However, as you can see, even after specifying the sort order of the file, the ORDER BY query still loads the whole file and does some kind of sort operation.

I would expect that it loads the first row group of each file and begins merging them together (e.g. if you ran EXPLAIN ... on your SQL, you would see a SortPreservingMerge in the physical plan).
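As a rough illustration of the merging idea (not DataFusion's SortPreservingMerge implementation), the sketch below does a k-way merge of inputs that are each already sorted. `merge_sorted` is a hypothetical name, and plain `i64` iterators stand in for per-file streams. It holds only one pending value per input, so nothing is re-sorted or fully buffered.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted inputs into one sorted output without re-sorting.
/// Only one pending value per input is held in the heap at a time.
fn merge_sorted<I>(mut inputs: Vec<I>) -> impl Iterator<Item = i64>
where
    I: Iterator<Item = i64>,
{
    let mut heap = BinaryHeap::new();
    // Prime the heap with the first value of each input.
    for (idx, input) in inputs.iter_mut().enumerate() {
        if let Some(v) = input.next() {
            heap.push(Reverse((v, idx)));
        }
    }
    std::iter::from_fn(move || {
        // Pop the smallest pending value, then refill from the same input.
        let Reverse((value, idx)) = heap.pop()?;
        if let Some(next) = inputs[idx].next() {
            heap.push(Reverse((next, idx)));
        }
        Some(value)
    })
}

fn main() {
    let a = vec![1, 4, 7].into_iter();
    let b = vec![2, 5, 8].into_iter();
    let c = vec![3, 6, 9].into_iter();
    let merged: Vec<i64> = merge_sorted(vec![a, b, c]).collect();
    println!("{:?}", merged);
}
```

Because the output is produced lazily, a consumer that stops early only pulls a few values from each input.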

@suremarc and @matthewmturner and others have been working on optimizing a similar case -- see #7490 for example. We have other items tracked in #10313 (notably we have a way to avoid opening all the files if we know the data is already sorted and doesn't overlap: #10316)

cc @NGA-TRAN


twitu avatar twitu commented on June 19, 2024

I also added a query explanation and here are the results.

    plan: Analyze
      Sort: data.ts_init ASC NULLS LAST
        Projection: data.bid, data.ask, data.bid_size, data.ask_size, data.ts_event, data.ts_init
          TableScan: data,

    plan: Analyze
      Projection: data.bid, data.ask, data.bid_size, data.ask_size, data.ts_event, data.ts_init
        TableScan: data,

You'll see that the queries with ORDER BY have a Sort expression in the plan. It's not clear to me why despite specifying the sort order in the configuration the plan still has a sort. I hope the optimizations you've mentioned will take this into account.

        file_sort_order: vec![vec![Expr::Sort(Sort {
            expr: Box::new(col("ts_init")),
            asc: true,
            nulls_first: true,
        })]],

You'll see in the experiments repo that I've implemented binaries for both reading a single row group and reading the whole file. The queries are the same; what changes is how the resulting stream is consumed. I don't think this is equivalent to adding a LIMIT clause because for the purpose of the query I'm reading the whole file. It is only that the consumer decides to stop after reading one row group.

It seems like order=false and repartition=false is the holy grail of performant, low-memory footprint streaming in sorted order.

I would expect those settings will be the lowest latency (time to first batch)

Surprisingly, these settings also give the lowest latency and memory footprint when reading the full file, as shown in the above table (footnote 1).

If you need an additional contributor in any of the above mentioned issues, I'm happy to help 😄

Footnotes

  1. https://github.com/apache/datafusion/issues/10572#issuecomment-2148021428


alamb avatar alamb commented on June 19, 2024

Hi @twitu -- I am very sorry for the delay in responding -- I have been traveling for sever

You'll see that the queries with ORDER BY have a Sort expression in the plan. It's not clear to me why despite specifying the sort order in the configuration the plan still has a sort. I hope the optimizations you've mentioned will take this into account.

One thing that might be going on is that the NULLS FIRST doesn't seem to match

In your plan the sort is putting nulls last

      Sort: data.ts_init ASC NULLS LAST

but in your code you specify NULLS FIRST

        file_sort_order: vec![vec![Expr::Sort(Sort {
            expr: Box::new(col("ts_init")),
            asc: true,
            nulls_first: true,
        })]],
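To make the suspected mismatch concrete, here is a small standard-library sketch. It assumes, as suggested above, that null placement is part of what has to match. `cmp_asc` and `is_sorted_as` are hypothetical helpers, not DataFusion APIs. The same rows satisfy ASC NULLS FIRST but not ASC NULLS LAST, so a declared NULLS FIRST order would not prove that a NULLS LAST request is already met.

```rust
use std::cmp::Ordering;

/// Compare two nullable values ascending, with configurable null placement.
fn cmp_asc(a: &Option<i64>, b: &Option<i64>, nulls_first: bool) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => {
            if nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (Some(_), None) => {
            if nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        (Some(x), Some(y)) => x.cmp(y),
    }
}

/// True if `rows` already satisfies the requested ascending ordering.
fn is_sorted_as(rows: &[Option<i64>], nulls_first: bool) -> bool {
    rows.windows(2)
        .all(|w| cmp_asc(&w[0], &w[1], nulls_first) != Ordering::Greater)
}

fn main() {
    // Rows laid out as ASC NULLS FIRST, like the declared file_sort_order.
    let file_order = vec![None, Some(1), Some(2), Some(3)];
    println!("NULLS FIRST satisfied: {}", is_sorted_as(&file_order, true));
    println!("NULLS LAST satisfied:  {}", is_sorted_as(&file_order, false));
}
```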

I don't think this is equivalent to adding a LIMIT clause because for the purpose of the query I'm reading the whole file. It is only that the consumer decides to stop after reading one row group.

DataFusion is a streaming engine, so if you open a parquet file, read one batch, and stop, then the entire file will not be read (the batches are basically created on demand).

There are certain "pipeline breaking" operators that do require reading the entire input, such as Sort and GroupHashAggregate, which is why I think you are seeing the entire file read when your query has a sort.
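The difference between a streaming consumer and a pipeline-breaking one can be sketched with plain iterators. This is a toy model, not DataFusion code: `scan` is a hypothetical stand-in for a file scan that counts how many row groups were actually read.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for a file scan: yields `total` row groups lazily
/// and counts in `reads` how many were actually produced.
fn scan<'a>(total: usize, reads: &'a Cell<usize>) -> impl Iterator<Item = Vec<i64>> + 'a {
    (0..total).map(move |g| {
        reads.set(reads.get() + 1);
        // Each toy "row group" holds 3 ascending values.
        let base = (g as i64) * 3;
        vec![base, base + 1, base + 2]
    })
}

/// Streaming consumer: pulls one batch and stops.
fn first_batch_streaming(total: usize) -> (Vec<i64>, usize) {
    let reads = Cell::new(0);
    let first = scan(total, &reads).next().unwrap_or_default();
    (first, reads.get())
}

/// Pipeline-breaking consumer: a full sort must see every row
/// before it can emit the first one.
fn first_batch_sorted(total: usize) -> (Vec<i64>, usize) {
    let reads = Cell::new(0);
    let mut rows: Vec<i64> = scan(total, &reads).flatten().collect();
    rows.sort();
    rows.truncate(3);
    (rows, reads.get())
}

fn main() {
    let (batch, reads) = first_batch_streaming(10);
    println!("streaming: batch={:?}, row groups read={}", batch, reads);
    let (batch, reads) = first_batch_sorted(10);
    println!("with sort: batch={:?}, row groups read={}", batch, reads);
}
```

Both consumers return the same first batch, but the sorting one reads every row group to get it.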

If you need an additional contributor in any of the above mentioned issues, I'm happy to help 😄

We are always looking for contributors -- anything you can do to help others would be most appreciated. For example, perhaps you can add an example to datafusion-examples https://github.com/apache/datafusion/tree/main/datafusion-examples showing how to use a pre-sorted input file to avoid sorting during query (assuming that you can actually get that working)
