Comments (14)

sscdotopen commented on July 30, 2024

Hi Malcolm, we are very happy about your contributions. Take your time :)

sscdotopen commented on July 30, 2024

It's a bit of a tricky case. The isNonNegative check ultimately results in a Compliance constraint:

https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/checks/Check.scala#L670

I think we would need a special implementation that casts the column to the appropriate numeric type inside the coalesce function.

Does that make sense?
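
For context, a sketch of the predicate shape involved (assumed, not the exact source; see the link above for the real code):

// Hypothetical shape of the SparkSQL predicate behind the isNonNegative
// Compliance constraint; on a string-typed column the numeric comparison
// does not behave as intended, which is the bug discussed here.
val column = "value"
val predicate = s"COALESCE($column, 0.0) >= 0"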

sscdotopen commented on July 30, 2024

Hi Malcolm,

The application of such constraints on a DataFrame is pretty involved in Deequ. For every constraint, we internally create an Analyzer that computes the required metric. In our case, this would be a Compliance analyzer. Every such analyzer can specify a set of aggregation functions that it wants to have computed over the data:

https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/analyzers/Compliance.scala#L47

Deequ then collects all required aggregation functions and generates a set of SparkSQL queries from them. The idea is to generate the queries so that the data is scanned as few times as possible (e.g., by computing many aggregation functions at once). The corresponding code is the AnalysisRunner:

https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala

So the best way to fix this bug would be to change the aggregation function. I think we could cast the value inside the "COALESCE" to a DoubleType; this should make the code work with strings, doubles, and ints.
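
A sketch of what that changed expression might look like (illustrative only; the real change would go into the Compliance aggregation functions linked above):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr

// Hypothetical fixed predicate: cast to DOUBLE before comparing, so string
// columns that hold numbers work. Caveat: CAST of a non-numeric string
// yields NULL, which COALESCE turns into 0.0, so garbage values would pass;
// a real fix may want to count those as violations instead.
def nonNegativePredicate(column: String): Column =
  expr(s"COALESCE(CAST($column AS DOUBLE), 0.0) >= 0")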

malcolmgreaves commented on July 30, 2024

The following Scala code shows this error in action:

import com.amazon.deequ.analyzers.Analysis
import com.amazon.deequ.analyzers.runners.AnalysisRunner
import com.amazon.deequ.checks.{ Check, CheckLevel, CheckResult } 
import com.amazon.deequ.suggestions.{ ConstraintSuggestion, ConstraintSuggestionRunner, Rules }

import org.apache.spark.sql.{ DataFrame, SparkSession }

import scala.tools.reflect.ToolBox
import scala.reflect.runtime.currentMirror

// Single-column wrappers: S for string-typed values, L for long-typed values.
case class S(value: String)
case class L(value: Long)

// The same numbers as native longs/ints and as their string representations.
val values = Seq(Long.MaxValue, s"${Long.MaxValue}", 25252, "25252")

// Builds string- and long-typed single-column DataFrames from the same
// values, then runs the explicit and suggested isNonNegative checks on both.
def show_fail(spark: SparkSession): Unit = {

    val strNums: DataFrame = spark.createDataFrame(values.map {
        case x: Long   => S(s"$x")
        case x: Int    => S(s"$x")
        case x: String => S(x)
    })
    
    val longNums: DataFrame = spark.createDataFrame(values.map {
        case x: Long   => L(x)
        case x: Int    => L(x.toLong)
        case x: String => L(x.toLong)
    })

    val expNonNegCheck = Check(CheckLevel.Error, """.isNonNegative("value")""").isNonNegative("value")
    println("::Explicit::")
    println(s"Check evaluation on string typed data: ${analyze(strNums,  expNonNegCheck).status}")
    println(s"Check evaluation on long typed data:   ${analyze(longNums, expNonNegCheck).status}")
    println("----------------------------------------------")
    
    val sugNonNegCheck = checkFromConstraint(suggestNonNegCheck(strNums))
    println("::Suggested::")
    println(s"Check evaluation on string typed data: ${analyze(strNums,  sugNonNegCheck).status}")
    println(s"Check evaluation on long typed data:   ${analyze(longNums, sugNonNegCheck).status}")
}

// Runs the analyzers that `check` needs on `df`, then evaluates the check
// against the computed metrics.
def analyze(df: DataFrame, check: Check): CheckResult =
  check.evaluate(
    AnalysisRunner
      .onData(df)
      .addAnalyzers(Analysis().analyzers ++ check.requiredAnalyzers())
      .run()
  )

// Runs the default suggestion rules on `df` and pulls out the suggested
// isNonNegative constraint for column `cname`.
def suggestNonNegCheck(df: DataFrame, cname: String = "value"): ConstraintSuggestion = {
  ConstraintSuggestionRunner()
    .onData(df)
    .addConstraintRules(Rules.DEFAULT)
    .run()
    .constraintSuggestions
    .filter { case (columnName, _) => columnName == cname }
    .head
    ._2
    .filter { x => x.codeForConstraint.contains(""".isNonNegative("value")""") }
    .head
}


// Compiles the code snippet generated for the suggested constraint into an
// actual Check at runtime, via the Scala toolbox.
def checkFromConstraint(constraint: ConstraintSuggestion): Check = {
    val source = s"""
           |com.amazon.deequ.checks.Check(com.amazon.deequ.checks.CheckLevel.Error, "${constraint.description}")${constraint.codeForConstraint}
         """.stripMargin.trim()
    val toolbox = currentMirror.mkToolBox()
    val tree = toolbox.parse(source)
    val compiledCode = toolbox.compile(tree)
    compiledCode().asInstanceOf[Check]
}

When show_fail(spark) is executed (with some valid SparkSession, e.g. the one created by spark-shell), the following output is observed:

::Explicit::
Check evaluation on string typed data: Error
Check evaluation on long typed data:   Success
----------------------------------------------
::Suggested::
Check evaluation on string typed data: Error
Check evaluation on long typed data:   Success

NOTE: If reproducing in spark-shell, be sure to use paste mode (:paste) to add all of the above code in one go!

malcolmgreaves commented on July 30, 2024

@sscdotopen I am planning on writing a fix for this. Unlike the others, however, I am not as sure where to start. Any pointers?

malcolmgreaves commented on July 30, 2024

Yes, that makes sense: I notice that the .isNonNegative check generates some Spark SQL (the COALESCE statement).

I'm not too well versed in Spark SQL: is there something equivalent to .map { _.toLong } or .map { _.toDouble }? We could invoke this iff we see that we're applying the check to a string-typed column. Conceptually, it would go:

if column is numeric-typed: 
  run check
else if column is string typed:
  ** convert string values to numeric values
  run check
else:
  fail

The run check and fail steps already exist -- the column type check and conversion is what I don't know how to do cleanly in the existing framework.
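
For the conversion step, the closest SparkSQL analogue to .map { _.toDouble } seems to be a cast on the column. A minimal sketch of the branching above, using only the Spark schema (names are illustrative, not existing deequ API):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, NumericType, StringType}

// Mirrors the pseudocode above: pass numeric columns through unchanged,
// cast string columns to double, and give up on everything else.
def toNumericColumn(df: DataFrame, column: String): Option[DataFrame] =
  df.schema(column).dataType match {
    case _: NumericType => Some(df)                                                  // run check as-is
    case StringType     => Some(df.withColumn(column, col(column).cast(DoubleType))) // convert, then run check
    case _              => None                                                      // fail
  }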

malcolmgreaves commented on July 30, 2024

In Scala, not Spark SQL, the following implementation is a POC for the .isNonNegative Check (we'd have to get the correct column name from the Check, however :P):

import scala.util.Try

// POC: true iff every value in the "value" column parses as a number >= 0.
// Any unparseable value makes the whole check fail via the Try.
def isNonNegative(df: DataFrame): Boolean = Try {
    import df.sparkSession.implicits._ // encoder for the Dataset[Double] below
    df
      .select("value")
      .map { _(0) match {
        case v: String => v.toDouble
        case v: Double => v
        case v: Int => v.toDouble
        case v: Short => v.toDouble
        case v: Long => v.toDouble
      }}
      .filter { _ < 0 } // non-negative means >= 0, so only strictly negative values fail
      .count() == 0
}.getOrElse(false)

With this, and the above defs, when we execute:

println(isNonNegative(strNums))
println(isNonNegative(longNums))
println(isNonNegative(spark.createDataFrame(Seq(S("hi")))))

we see the following output:

true
true
false

malcolmgreaves commented on July 30, 2024

In the satisfies(...) calls, I don't understand where the DataFrame that we're checking against comes into play. If we could just insert some of this value type-converting code right before it calls COALESCE(...)...

malcolmgreaves commented on July 30, 2024

Hi! Just wanted to follow up v quick so that y'all don't think I've abandoned this issue :D -- I will work on getting a fix out for this as soon as I can. I have been working on this for a work project and, as life goes, I have a new project that's spinning up: I will be working at a reduced / slower rate.

klangner commented on July 30, 2024

Is anyone working on this?
If not, I would be interested in it.

sscdotopen commented on July 30, 2024

Feel free to pick this up!

klangner commented on July 30, 2024

If I understand it correctly, this can be solved in two different ways:

  • Do not suggest isNonNegative on non-numeric columns
  • If there is an isNonNegative check, try to convert the column values into a numeric type

From the discussion it looks like the preferred way is to fix it during the check (the latter).
In this case, how do we treat non-numeric values like "any text"? I guess the check shouldn't pass then?

TBH I would prefer not to have numeric checks proposed on non-numeric column types.
The reason is that if I know a column contains numeric values, then I should convert it to the proper type first. If I'm not doing that, maybe there is a reason for it, and the values are not really numeric.
Also, a map function for converting String -> Numeric is easy to write (and cheap) and can deal with any corner cases, so this isn't a big deal.
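
A minimal sketch of the first option, assuming the suggestion side can look at the column's declared Spark type (deequ's suggestion rules actually work on column profiles, so this is only illustrative):

import org.apache.spark.sql.types.{NumericType, StructField}

// Hypothetical guard: only propose numeric checks such as isNonNegative for
// columns whose declared type is numeric.
def maySuggestNumericChecks(field: StructField): Boolean =
  field.dataType.isInstanceOf[NumericType]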

klangner commented on July 30, 2024

One more question :-)
Isn't it fixed already? I was trying to prepare a test for it but found one already passing:

    "correctly evaluate non negative and positive constraints" in withSparkSession { sparkSession =>
      val nonNegativeCheck = Check(CheckLevel.Error, "a")
        .isNonNegative("item")

      val isPositiveCheck = Check(CheckLevel.Error, "a")
        .isPositive("item")

      val results = runChecks(getDfWithNumericValues(sparkSession), nonNegativeCheck,
        isPositiveCheck)

      assertEvaluatesTo(nonNegativeCheck, results, CheckStatus.Success)
      assertEvaluatesTo(isPositiveCheck, results, CheckStatus.Success)
    }

And the dataset:

  def getDfWithNumericValues(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._
    // att2 is always bigger than att1
    Seq(
      ("1", 1, 0, 0),
      ("2", 2, 0, 0),
      ("3", 3, 0, 0),
      ("4", 4, 5, 4),
      ("5", 5, 6, 6),
      ("6", 6, 7, 7)
    ).toDF("item", "att1", "att2", "att3")
  }

It will fail if I change a value in the "item" column to a negative number or a non-numeric value.

sscdotopen commented on July 30, 2024

Thanks for figuring that one out.
