Comments (9)
cc @alamb
I will fix this
from datafusion.
I will file a PR to resolve the ser/deser issue.
My SQL is:
select
  LO_SUPPKEY
from
  SSB_1G.LINEORDER
GROUP BY
  LO_SUPPKEY
limit 20 offset 10
The stand-alone physical plan is shown below. The LimitedDistinctAggregation rule adds the limit (lim=[30], i.e. offset 10 + fetch 20) to the AggregateExec nodes, but the limit does not take effect when we send the physical plan to the remote side.
GlobalLimitExec: skip=10, fetch=20
  CoalescePartitionsExec
    LocalLimitExec: fetch=30
      AggregateExec: mode=FinalPartitioned, gby=[LO_SUPPKEY@0 as LO_SUPPKEY], aggr=[], lim=[30]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([LO_SUPPKEY@0], 16), input_partitions=16
            RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=2
              AggregateExec: mode=Partial, gby=[LO_SUPPKEY@0 as LO_SUPPKEY], aggr=[], lim=[30]
                ProjectionExec: expr=[2@0 as LO_SUPPKEY]
                  ParquetExec: file_groups={2 groups:
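To illustrate how a pushed-down limit can be silently lost in a serialization round trip, here is a minimal sketch. The types `AggregatePlan`, `WireNoLimit`, and `WireWithLimit` and the encode/decode functions are hypothetical and heavily simplified; they are not DataFusion's real `AggregateExec` or its protobuf messages. A wire format with no slot for the limit can only decode it as `None`, while one that carries an optional limit preserves it.

```rust
// Hypothetical, simplified types for illustration only; these are NOT
// DataFusion's real AggregateExec or its protobuf messages.
#[derive(Debug, Clone, PartialEq)]
struct AggregatePlan {
    group_by: Vec<String>,
    // Pushed-down limit, e.g. set by a LimitedDistinctAggregation-style rule.
    limit: Option<usize>,
}

// Wire format with no field for the limit (the shape of the bug).
struct WireNoLimit {
    group_by: Vec<String>,
}

// Wire format that carries the limit as an optional field.
struct WireWithLimit {
    group_by: Vec<String>,
    limit: Option<u64>,
}

fn encode_no_limit(p: &AggregatePlan) -> WireNoLimit {
    WireNoLimit { group_by: p.group_by.clone() }
}

fn decode_no_limit(w: &WireNoLimit) -> AggregatePlan {
    // Nothing to read back, so the limit silently becomes None.
    AggregatePlan { group_by: w.group_by.clone(), limit: None }
}

fn encode_with_limit(p: &AggregatePlan) -> WireWithLimit {
    WireWithLimit { group_by: p.group_by.clone(), limit: p.limit.map(|l| l as u64) }
}

fn decode_with_limit(w: &WireWithLimit) -> AggregatePlan {
    AggregatePlan { group_by: w.group_by.clone(), limit: w.limit.map(|l| l as usize) }
}

fn main() {
    let local = AggregatePlan { group_by: vec!["LO_SUPPKEY".to_string()], limit: Some(30) };
    let lossy = decode_no_limit(&encode_no_limit(&local));
    let kept = decode_with_limit(&encode_with_limit(&local));
    println!("without limit field: {:?}", lossy.limit); // None
    println!("with limit field: {:?}", kept.limit); // Some(30)
}
```

The general point is that every plan property an optimizer rule sets locally needs a matching field in the wire format, otherwise the remote side plans without it.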
But I found another issue in AggregateExec: when the limit is pushed down into the aggregate, execution selects GroupedTopKAggregateStream:
if let Some(limit) = self.limit {
    warn!("agg exec: {}", self.is_unordered_unfiltered_group_by_distinct());
    if !self.is_unordered_unfiltered_group_by_distinct() {
        warn!("agg exec: create GroupedPriorityQueue");
        return Ok(StreamType::GroupedPriorityQueue(
            GroupedTopKAggregateStream::new(self, context, partition, limit)?,
        ));
    }
}
GroupedTopKAggregateStream returns the correct result for this SQL, but it is inefficient: we don't care about the order, so there is no need to consume all of the input data.
In our SQL:
select
  LO_SUPPKEY
from
  SSB_1G.LINEORDER
GROUP BY
  LO_SUPPKEY
limit 20 offset 10
there is no ORDER BY and no aggregate expression, so we don't need GroupedTopKAggregateStream to compute the result, and it is not efficient for this query: GroupedTopKAggregateStream consumes all of the data and uses a PriorityQueue to store and sort it.
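For comparison, a distinct GROUP BY with only a LIMIT can stop reading input as soon as it has seen enough distinct keys. The sketch below assumes the input arrives as batches of keys; `limited_distinct` is a hypothetical helper, not a DataFusion API. For the query above, the limit pushed into the aggregate would be 30.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Hypothetical helper: collect up to `limit` distinct keys in arrival order and
/// stop pulling batches once enough keys are found. Returns the keys and the
/// number of batches read. Order is irrelevant, so any `limit` distinct keys are valid.
fn limited_distinct<K, I>(batches: I, limit: usize) -> (Vec<K>, usize)
where
    K: Eq + Hash + Clone,
    I: IntoIterator<Item = Vec<K>>,
{
    let mut seen: HashSet<K> = HashSet::new();
    let mut out = Vec::new();
    let mut batches_read = 0;
    if limit == 0 {
        return (out, batches_read);
    }
    for batch in batches {
        batches_read += 1;
        for key in batch {
            // `insert` returns true only for a key not seen before.
            if seen.insert(key.clone()) {
                out.push(key);
                if out.len() == limit {
                    // Early exit: the remaining batches are never pulled.
                    return (out, batches_read);
                }
            }
        }
    }
    (out, batches_read)
}

fn main() {
    let batches = vec![vec![1, 2, 2], vec![3, 4], vec![5, 6]];
    let (keys, read) = limited_distinct(batches, 3);
    println!("{:?} after {} batches", keys, read); // [1, 2, 3] after 2 batches
}
```

A top-k priority queue ordered by an aggregate value, by contrast, generally has to see every input row before it knows which groups belong in the result; with no ORDER BY that extra work buys nothing.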
I believe @avantgardnerio / @thinkharderdev worked on this code (GroupedTopKAggregateStream) as part of #7192.
I agree it is strange to use that operator for a query without an aggregate. I thought it only kicked in if there was a MIN or MAX aggregate 🤔
I'm surprised this is being invoked, because:
// ensure the sort direction matches aggregate function
let (field, desc) = aggr.get_minmax_desc()?;
if desc != order.options.descending {
    return None;
}
and
/// Finds the DataType and SortDirection for this Aggregate, if there is one
pub fn get_minmax_desc(&self) -> Option<(Field, bool)> {
    let agg_expr = self.aggr_expr.as_slice().first()?;
    if let Some(max) = agg_expr.as_any().downcast_ref::<Max>() {
        Some((max.field().ok()?, true))
    } else if let Some(min) = agg_expr.as_any().downcast_ref::<Min>() {
        Some((min.field().ok()?, false))
    } else {
        None
    }
}
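A minimal model of the guard quoted above, using a hypothetical `AggKind` enum in place of DataFusion's aggregate expressions. With no aggregate expressions at all, the first-aggregate lookup returns `None`, so this gate on its own should never enable the top-k path for a plain `GROUP BY ... LIMIT`. That suggests the stream is being chosen by a different code path (such as the `self.limit` check in `AggregateExec` quoted earlier in this thread), though this sketch does not prove it.

```rust
/// Hypothetical stand-in for aggregate expressions (not DataFusion's types).
#[derive(Debug, Clone, Copy, PartialEq)]
enum AggKind {
    Min,
    Max,
    Count,
}

/// Same shape as get_minmax_desc: inspect only the first aggregate and
/// return whether it implies descending order (MAX) or ascending order (MIN).
fn minmax_desc(aggr: &[AggKind]) -> Option<bool> {
    match aggr.first()? {
        AggKind::Max => Some(true),
        AggKind::Min => Some(false),
        _ => None,
    }
}

/// The top-k rewrite applies only when that direction matches the ORDER BY.
fn topk_applicable(aggr: &[AggKind], order_descending: bool) -> bool {
    minmax_desc(aggr) == Some(order_descending)
}

fn main() {
    // GROUP BY with no aggregate expressions: `first()` is None, so no top-k.
    println!("{}", topk_applicable(&[], false)); // false
    println!("{}", topk_applicable(&[AggKind::Max], true)); // true
    println!("{}", topk_applicable(&[AggKind::Count], false)); // false
}
```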
It's normal if you just want to get the distinct values of one or several columns, using SQL like the following:
select distinct column from table limit n
select column from table group by column limit n
These usages come from our own cases, where customers want to get the distinct values.
I think PR #7192 introduced the top-k aggregation with the priority queue in AggregateExec, and it is used to optimize queries with the following pattern:
select column, min(xx) from table group by column order by min(xx)
But PR #8038 introduced a new rule that pushes the limit down for distinct columns. It uses is_unordered_unfiltered_group_by_distinct to check the condition, without any sort condition in the plan. This rule is used to optimize queries like:
select distinct column from table
select column from table group by column
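The difference between the two optimizations can be summarized as a dispatch on a few plan properties. This is a hypothetical sketch (`AggInfo`, `StreamChoice`, and `choose_stream` are illustrative names, not DataFusion APIs) of how a limited aggregate could route the query in this issue to a distinct-only path rather than the priority queue.

```rust
/// Hypothetical summary of an aggregate node; not DataFusion's AggregateExec.
struct AggInfo {
    limit: Option<usize>,
    has_aggr_exprs: bool,
    // True when the ORDER BY direction matches a leading MIN/MAX aggregate.
    order_by_matches_minmax: bool,
    has_filter: bool,
}

#[derive(Debug, PartialEq)]
enum StreamChoice {
    GroupedTopK,     // #7192 style: ORDER BY MIN/MAX(..) LIMIT n
    LimitedDistinct, // #8038 style: distinct GROUP BY with only a LIMIT
    Full,            // no limit-based shortcut
}

fn choose_stream(info: &AggInfo) -> StreamChoice {
    match info.limit {
        None => StreamChoice::Full,
        Some(_) if info.order_by_matches_minmax => StreamChoice::GroupedTopK,
        Some(_) if !info.has_aggr_exprs && !info.has_filter => StreamChoice::LimitedDistinct,
        Some(_) => StreamChoice::Full,
    }
}

fn main() {
    // The query in this issue: GROUP BY LO_SUPPKEY, no aggregates, limit 30 pushed down.
    let query = AggInfo {
        limit: Some(30),
        has_aggr_exprs: false,
        order_by_matches_minmax: false,
        has_filter: false,
    };
    println!("{:?}", choose_stream(&query)); // LimitedDistinct
}
```

Keeping the two conditions explicit means a plain distinct GROUP BY with a limit never reaches the priority-queue stream.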