Git Product home page Git Product logo

Comments (9)

liukun4515 avatar liukun4515 commented on July 18, 2024 2

cc @alamb
I will fix this

from arrow-datafusion.

liukun4515 avatar liukun4515 commented on July 18, 2024 1

I will file a pr to resolve the ser/deser issue.

from arrow-datafusion.

liukun4515 avatar liukun4515 commented on July 18, 2024

my sql is like:

select
  LO_SUPPKEY
from
  SSB_1G.LINEORDER
GROUP BY
  LO_SUPPKEY
limit 20  offset 10

The stand-alone physical plan is:
The agg exec add the limit value in the plan after the rule of LimitedDistinctAggregation, but it does not take effect when we send the physical plan to remote side.

GlobalLimitExec: skip=10, fetch=20
  CoalescePartitionsExec
    LocalLimitExec: fetch=30
      AggregateExec: mode=FinalPartitioned, gby=[LO_SUPPKEY@0 as LO_SUPPKEY], aggr=[], lim=[30]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([LO_SUPPKEY@0], 16), input_partitions=16
            RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=2
              AggregateExec: mode=Partial, gby=[LO_SUPPKEY@0 as LO_SUPPKEY], aggr=[], lim=[30]
                ProjectionExec: expr=[2@0 as LO_SUPPKEY]
                  ParquetExec: file_groups={2 groups:

from arrow-datafusion.

liukun4515 avatar liukun4515 commented on July 18, 2024

But other issue i found in the AggregateExec, when we push the limit to the agg exec and will select the

        if let Some(limit) = self.limit {
            warn!("agg exec: {}", self.is_unordered_unfiltered_group_by_distinct());
            if !self.is_unordered_unfiltered_group_by_distinct() {
                warn!("agg exec: create GroupedPriorityQueue");
                return Ok(StreamType::GroupedPriorityQueue(
                    GroupedTopKAggregateStream::new(self, context, partition, limit)?,
                ));
            }
        }

GroupedTopKAggregateStream.

The implementation of GroupedTopKAggregateStream get the right result for the SQL, but the efficiency is not good, because we don't care about the order and don't need to consume all of downstream data

from arrow-datafusion.

liukun4515 avatar liukun4515 commented on July 18, 2024

But other issue i found in the AggregateExec, when we push the limit to the agg exec and will select the

        if let Some(limit) = self.limit {
            warn!("agg exec: {}", self.is_unordered_unfiltered_group_by_distinct());
            if !self.is_unordered_unfiltered_group_by_distinct() {
                warn!("agg exec: create GroupedPriorityQueue");
                return Ok(StreamType::GroupedPriorityQueue(
                    GroupedTopKAggregateStream::new(self, context, partition, limit)?,
                ));
            }
        }

GroupedTopKAggregateStream.

The implementation of GroupedTopKAggregateStream get the right result for the SQL, but the efficiency is not good, because we don't care about the order and don't need to consume all of downstream data

In our sql:

select
  LO_SUPPKEY
from
  SSB_1G.LINEORDER
GROUP BY
  LO_SUPPKEY
limit 20  offset 10

There is no sort/order and agg expression cause, we don't need to use the GroupedTopKAggregateStream struct to get the result. The GroupedTopKAggregateStream is not efficient for the SQL.

The GroupedTopKAggregateStream will consume all of the data and use the PriorityQueue to store and sort all data

from arrow-datafusion.

alamb avatar alamb commented on July 18, 2024

GroupedTopKAggregateStream

I believe @avantgardnerio / @thinkharderdev worked on this code, as part of #7192

I agree it is strange to be using that operator given a SQL without an aggregate. I thought it only kicked in if there was a MIN or MAX aggregate 🤔

from arrow-datafusion.

avantgardnerio avatar avantgardnerio commented on July 18, 2024

GroupedTopKAggregateStream

I believe @avantgardnerio / @thinkharderdev worked on this code, as part of #7192

I agree it is strange to be using that operator given a SQL without an aggregate. I thought it only kicked in if there was a MIN or MAX aggregate 🤔

I'm surprised this is being invoked, because:

        // ensure the sort direction matches aggregate function
        let (field, desc) = aggr.get_minmax_desc()?;
        if desc != order.options.descending {
            return None;
        }

and

    /// Finds the DataType and SortDirection for this Aggregate, if there is one
    pub fn get_minmax_desc(&self) -> Option<(Field, bool)> {
        let agg_expr = self.aggr_expr.as_slice().first()?;
        if let Some(max) = agg_expr.as_any().downcast_ref::<Max>() {
            Some((max.field().ok()?, true))
        } else if let Some(min) = agg_expr.as_any().downcast_ref::<Min>() {
            Some((min.field().ok()?, false))
        } else {
            None
        }
    }

from arrow-datafusion.

liukun4515 avatar liukun4515 commented on July 18, 2024

GroupedTopKAggregateStream

I believe @avantgardnerio / @thinkharderdev worked on this code, as part of #7192

I agree it is strange to be using that operator given a SQL without an aggregate. I thought it only kicked in if there was a MIN or MAX aggregate 🤔

it's normal, if just want to get the distinct value for one or several column, and using the bellow sql:

select distinct column from table limit n

select column from table group by column limit n

The usages is from our cases, customer want to get the distinct value

from arrow-datafusion.

liukun4515 avatar liukun4515 commented on July 18, 2024

I think pr #7192 introduced the top_k agg with the priority queue in the AggregateExec , and it is used to optimize the case like bellow pattern:

select column, min(xx) from table group by column order by min(xx)

But in the pr #8038 introduced the new rule of push limit for distinct column which use the is_unordered_unfiltered_group_by_distinct to check the condition without the sort condition in the plan. This rule is used to optimize the case like:

select distinct column from table
select column from table group by column

from arrow-datafusion.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.