
eejbyfeldt commented on June 29, 2024

Will work on this one. I think it might also involve moving nth_value, since they share some code.

from arrow-datafusion.

eejbyfeldt commented on June 29, 2024

@jayzhan211 I pushed a work in progress here: #11029. It still fails some test cases in sqllogictests:

External error: query failed: DataFusion error: This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: NTH_VALUE(aggregate_test_100.c4,Int64(3)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING
[SQL] SELECT
   NTH_VALUE(c4, 3) OVER(ORDER BY c9 ASC ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING) as nth_value1,
   NTH_VALUE(c4, 2) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as nth_value2
   FROM aggregate_test_100
   ORDER BY c9
   LIMIT 5
at test_files/window.slt:1205

External error: query failed: DataFusion error: External error: Arrow error: Invalid argument error: column types must match schema types, expected List(Field { name: "item", data_type: Struct([Field { name: "sn@1", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) but found List(Field { name: "item", data_type: Struct([Field { name: "sn@0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) at column index 2
[SQL] SELECT ARRAY_AGG(e.rate ORDER BY e.sn)
FROM sales_global AS s
JOIN exchange_rates AS e
ON s.currency = e.currency_from AND
   e.currency_to = 'USD' AND
   s.ts >= e.ts
GROUP BY s.sn
ORDER BY s.sn;
at test_files/group_by.slt:3181

The first one seems to be because the new NTH_VALUE UDAF is picked over the builtin window function with the same name. Is this expected? What is the correct course of action to resolve it?
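The first failure says the aggregate cannot be used as a sliding accumulator because retract_batch is not implemented. As a rough illustration of what that method is for, here is a minimal sketch assuming a hypothetical, simplified SlidingAccumulator trait over Option<i64> values (not DataFusion's actual Accumulator trait, which works on Arrow arrays and returns Result). For a ROWS BETWEEN frame, rows entering the frame are added with update_batch and rows leaving it are removed with retract_batch, so NTH_VALUE can be read from the current frame without recomputing it from scratch.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified accumulator interface (an assumption for this
/// sketch, not DataFusion's real `Accumulator` trait).
trait SlidingAccumulator {
    /// Add rows that enter the window frame.
    fn update_batch(&mut self, values: &[Option<i64>]);
    /// Remove rows that leave the window frame (always the oldest rows).
    fn retract_batch(&mut self, values: &[Option<i64>]);
    /// Result for the rows currently in the frame.
    fn evaluate(&self) -> Option<i64>;
}

/// NTH_VALUE(expr, n) over a sliding frame: keep the rows currently in the
/// frame, in arrival order, so the n-th one can be read back.
struct NthValueSketch {
    n: usize, // 1-based position, as in NTH_VALUE(c4, 3)
    frame: VecDeque<Option<i64>>,
}

impl NthValueSketch {
    fn new(n: usize) -> Self {
        Self { n, frame: VecDeque::new() }
    }
}

impl SlidingAccumulator for NthValueSketch {
    fn update_batch(&mut self, values: &[Option<i64>]) {
        // New rows are appended at the end of the frame.
        self.frame.extend(values.iter().copied());
    }

    fn retract_batch(&mut self, values: &[Option<i64>]) {
        for v in values {
            // The frame only slides forward, so retracted rows are the oldest.
            let removed = self.frame.pop_front();
            debug_assert_eq!(removed, Some(*v));
        }
    }

    fn evaluate(&self) -> Option<i64> {
        // NULL when the frame holds fewer than n rows (or n is 0);
        // NULL inputs are respected, not skipped.
        self.n
            .checked_sub(1)
            .and_then(|i| self.frame.get(i))
            .copied()
            .flatten()
    }
}
```

With n = 3, feeding 10, 20, 30, 40 makes evaluate return 30; retracting 10 then makes it return 40.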

The second one looks a bit weird to me; I'm not sure if I messed something up or am hitting some other issue.

@jayzhan211 If you have time to provide some pointers that would be highly appreciated :)

jayzhan211 commented on June 29, 2024

I suggest we convert 1. ArrayAgg, 2. DistinctArrayAgg, 3. OrderSensitiveArrayAgg and 4. NthValue separately.

OrderSensitiveArrayAgg and NthValue are quite complex.

jayzhan211 commented on June 29, 2024

ArrayAgg and NthValue expect a nullable parameter. We need to add it to AggregateFunctionExpr so we can pass it to state_fields via StateFieldsArgs and set the nullability of the field.

We can get nullable with

let nullable = expr.nullable(input_schema)?;

https://github.com/apache/datafusion/blob/58d23c5c050f43aa7b867d4f0be7298d8d6cad83/datafusion/physical-expr-common/src/aggregate/mod.rs#L275C1-L289C6
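To make the proposal concrete, here is a minimal sketch of how an input_nullable flag could flow into ArrayAgg's state field. DataType, Field and StateFieldsArgs below are hypothetical, simplified stand-ins (the real Arrow and DataFusion types carry more information), and the outer list being nullable is an assumption of the sketch.

```rust
/// Hypothetical, simplified stand-ins for Arrow's `DataType`/`Field`.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int32,
    List(Box<Field>),
}

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

/// Hypothetical version of the args passed to `state_fields`.
struct StateFieldsArgs<'a> {
    name: &'a str,
    input_type: &'a DataType,
    /// Proposed addition, e.g. from `expr.nullable(input_schema)?`.
    input_nullable: bool,
}

/// ArrayAgg state: one list column whose items are nullable exactly when
/// the aggregated input is nullable.
fn array_agg_state_fields(args: &StateFieldsArgs) -> Vec<Field> {
    let item = Field {
        name: "item".to_string(),
        data_type: args.input_type.clone(),
        nullable: args.input_nullable,
    };
    vec![Field {
        name: args.name.to_string(),
        data_type: DataType::List(Box::new(item)),
        nullable: true, // assumption: the list itself may be NULL
    }]
}
```

Without the flag, the item field's nullability would have to be hard-coded, which is the mismatch the comment above wants to avoid.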

eejbyfeldt commented on June 29, 2024

> I suggest we convert ArrayAgg, DistinctArrayAgg, OrderSensitiveArrayAgg and NthValue separately.
>
> At least leave OrderSensitiveArrayAgg and NthValue for another PR, since they involve ordering and window functions, which makes them quite complex.

Sounds good. If only converting ArrayAgg, how does one handle multiple expressions using the same name? Should it not be registered? Or should I just leave the existing ArrayAgg code as is? The accumulator is private, so it cannot easily be reused; is it acceptable to make it public in this intermediate state?

eejbyfeldt commented on June 29, 2024

> ArrayAgg and NthValue expect a nullable parameter. We need to add it to AggregateFunctionExpr so we can pass it to state_fields via StateFieldsArgs and set the nullability of the field.

Makes sense. How come we only provide a single value for input_type (https://github.com/apache/datafusion/blob/39.0.0/datafusion/physical-expr-common/src/aggregate/mod.rs#L245)? Can aggregates not have multiple inputs? Should the nullable field be just input_nullable: bool, or should it be inputs_nullable: Vec<bool>?

jayzhan211 commented on June 29, 2024

>> ArrayAgg and NthValue expect a nullable parameter. We need to add it to AggregateFunctionExpr so we can pass it to state_fields via StateFieldsArgs and set the nullability of the field.
>
> Makes sense. How come we only provide a single value for input_type (https://github.com/apache/datafusion/blob/39.0.0/datafusion/physical-expr-common/src/aggregate/mod.rs#L245)? Can aggregates not have multiple inputs? Should the nullable field be just input_nullable: bool, or should it be inputs_nullable: Vec<bool>?

We have a single input because we have not yet come across any function that needs multiple inputs. If a function expects multiple inputs, we can extend it to a Vec.

jayzhan211 commented on June 29, 2024

>> I suggest we convert ArrayAgg, DistinctArrayAgg, OrderSensitiveArrayAgg and NthValue separately.
>> At least leave OrderSensitiveArrayAgg and NthValue for another PR, since they involve ordering and window functions, which makes them quite complex.
>
> Sounds good. If only converting ArrayAgg, how does one handle multiple expressions using the same name? Should it not be registered? Or should I just leave the existing ArrayAgg code as is? The accumulator is private, so it cannot easily be reused; is it acceptable to make it public in this intermediate state?

We could check distinct and the ordering here to decide whether to use the builtin or the UDAF:

pub fn create_aggregate_expr_with_name_and_maybe_filter(
    e: &Expr,
    name: impl Into<String>,
    logical_input_schema: &DFSchema,
    physical_input_schema: &Schema,
    execution_props: &ExecutionProps,
) -> Result<AggregateExprWithOptionalArgs> {
    match e {
        Expr::AggregateFunction(AggregateFunction {
            func_def,
            distinct,
            args,
            filter,
            order_by,
            null_treatment,
        }) => {
            let physical_args =
                create_physical_exprs(args, logical_input_schema, execution_props)?;
            let filter = match filter {
                Some(e) => Some(create_physical_expr(
                    e,
                    logical_input_schema,
                    execution_props,
                )?),
                None => None,
            };
            let ignore_nulls = null_treatment
                .unwrap_or(sqlparser::ast::NullTreatment::RespectNulls)
                == NullTreatment::IgnoreNulls;
            let (agg_expr, filter, order_by) = match func_def {
                AggregateFunctionDefinition::BuiltIn(fun) => {
                    let physical_sort_exprs = match order_by {
                        Some(exprs) => Some(create_physical_sort_exprs(
                            exprs,
                            logical_input_schema,
                            execution_props,
                        )?),
                        None => None,
                    };
                    let ordering_reqs: Vec<PhysicalSortExpr> =
                        physical_sort_exprs.clone().unwrap_or(vec![]);
                    let agg_expr = aggregates::create_aggregate_expr(
                        fun,
                        *distinct,
                        &physical_args,
                        &ordering_reqs,
                        physical_input_schema,
                        name,
                        ignore_nulls,
                    )?;
                    (agg_expr, filter, physical_sort_exprs)
                }
                AggregateFunctionDefinition::UDF(fun) => {
                    let sort_exprs = order_by.clone().unwrap_or(vec![]);
                    let physical_sort_exprs = match order_by {
                        Some(exprs) => Some(create_physical_sort_exprs(
                            exprs,
                            logical_input_schema,
                            execution_props,
                        )?),
                        None => None,
                    };
                    let ordering_reqs: Vec<PhysicalSortExpr> =
                        physical_sort_exprs.clone().unwrap_or(vec![]);
                    let agg_expr = udaf::create_aggregate_expr(
                        fun,
                        &physical_args,
                        args,
                        &sort_exprs,
                        &ordering_reqs,
                        physical_input_schema,
                        name,
                        ignore_nulls,
                        *distinct,
                    )?;
                    (agg_expr, filter, physical_sort_exprs)
                }
            };
            Ok((agg_expr, filter, order_by))
        }
        other => internal_err!("Invalid aggregate expression '{other:?}'"),
    }
}

> The accumulator is private, so it cannot easily be reused; is it acceptable to make it public in this intermediate state?

We can move it to physical_expr_common first, then move it back once all the related functions are done.
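The distinct/ordering check suggested in this comment could look roughly like the following minimal sketch. The ArrayAggImpl enum and choose_array_agg_impl function are hypothetical names for illustration; distinct and order_by mirror the fields destructured from Expr::AggregateFunction in create_aggregate_expr_with_name_and_maybe_filter, with sort expressions stood in by &str.

```rust
/// Hypothetical labels for which ARRAY_AGG implementation to plan.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ArrayAggImpl {
    /// Existing builtin path (DistinctArrayAgg / OrderSensitiveArrayAgg).
    Builtin,
    /// Newly converted UDAF, covering only the plain ArrayAgg case.
    Udaf,
}

/// Route DISTINCT or ordered ARRAY_AGG calls to the builtin, everything
/// else to the UDAF, while the other variants are not converted yet.
fn choose_array_agg_impl(distinct: bool, order_by: Option<&[&str]>) -> ArrayAggImpl {
    // An empty ORDER BY list is treated the same as no ORDER BY.
    let has_ordering = order_by.map_or(false, |exprs| !exprs.is_empty());
    if distinct || has_ordering {
        ArrayAggImpl::Builtin
    } else {
        ArrayAggImpl::Udaf
    }
}
```

Under this split, ARRAY_AGG(e.rate ORDER BY e.sn) from the failing group_by.slt query would stay on the builtin OrderSensitiveArrayAgg path.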

eejbyfeldt commented on June 29, 2024

> We have a single input because we have not yet come across any function that needs multiple inputs. If a function expects multiple inputs, we can extend it to a Vec.

What about covariance (https://github.com/apache/datafusion/blob/main/datafusion/functions-aggregate/src/covariance.rs#L43), which takes 2 arguments?
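For a two-argument aggregate such as covariance, a single input_nullable: bool cannot say which argument may be NULL. A minimal sketch of the inputs_nullable: Vec<bool> alternative follows; InputsArgs and its methods are hypothetical names, and input types are stood in by strings rather than Arrow DataTypes.

```rust
/// Hypothetical multi-input args: one type and one nullability flag per
/// aggregate argument instead of a single `input_nullable: bool`.
#[derive(Debug)]
struct InputsArgs {
    input_types: Vec<String>,   // stand-in for Arrow `DataType`s
    inputs_nullable: Vec<bool>, // inputs_nullable[i] describes argument i
}

impl InputsArgs {
    fn new(input_types: Vec<String>, inputs_nullable: Vec<bool>) -> Result<Self, String> {
        // Keep the two vectors aligned: one entry per argument.
        if input_types.len() != inputs_nullable.len() {
            return Err(format!(
                "{} input types but {} nullability flags",
                input_types.len(),
                inputs_nullable.len()
            ));
        }
        Ok(Self { input_types, inputs_nullable })
    }

    /// Number of aggregate arguments (2 for a covariance aggregate).
    fn arity(&self) -> usize {
        self.input_types.len()
    }

    /// Nullability of one argument; None if the index is out of range.
    fn is_nullable(&self, index: usize) -> Option<bool> {
        self.inputs_nullable.get(index).copied()
    }

    /// The collapsed value a single-bool field could at most express.
    fn any_nullable(&self) -> bool {
        self.inputs_nullable.iter().any(|&n| n)
    }
}
```

The single-input aggregates would then simply use vectors of length one.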

eejbyfeldt commented on June 29, 2024

@jayzhan211 Created a PR for only doing ArrayAgg here: #11045. Will look into adding nullable next.