Comments (17)

danielcweeks commented on May 17, 2024

For anyone interested, Spark already has a good reference implementation in Scala here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala

One thing to note is that there isn't an implementation for the map type. I think Arrow was planning on using list<struct<key, value>>, but engines may need something closer to struct<keys: list<>, values: list<>> in order to overlay the columnar implementations directly. It might be necessary to provide both; a sketch of the two layouts follows.
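Here is a minimal sketch of the two candidate layouts using Arrow's Java schema classes, assuming string keys and nullable int values; the field names and element types are illustrative, not anything Iceberg or Arrow settled on:

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.FieldType;

    public class MapLayouts {
      // Layout 1: list<struct<key, value>>, one entries struct per map entry.
      static Field mapAsListOfStructs() {
        Field key = new Field("key", FieldType.notNullable(ArrowType.Utf8.INSTANCE), Collections.emptyList());
        Field value = new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), Collections.emptyList());
        Field entries = new Field("entries", FieldType.notNullable(ArrowType.Struct.INSTANCE), Arrays.asList(key, value));
        return new Field("m", FieldType.nullable(new ArrowType.List()), Collections.singletonList(entries));
      }

      // Layout 2: struct<keys: list<>, values: list<>>, parallel arrays, which
      // overlay engines' columnar map representations more directly.
      static Field mapAsParallelLists() {
        Field keyItem = new Field("item", FieldType.notNullable(ArrowType.Utf8.INSTANCE), Collections.emptyList());
        Field valueItem = new Field("item", FieldType.nullable(new ArrowType.Int(32, true)), Collections.emptyList());
        Field keys = new Field("keys", FieldType.notNullable(new ArrowType.List()), Collections.singletonList(keyItem));
        Field values = new Field("values", FieldType.notNullable(new ArrowType.List()), Collections.singletonList(valueItem));
        return new Field("m", FieldType.nullable(ArrowType.Struct.INSTANCE), Arrays.asList(keys, values));
      }
    }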

I'm happy to help anyone who's interested in working on this.

danielcweeks commented on May 17, 2024

@mccheah We have started working on a more complete integration with Arrow and we'll be following up incrementally as things progress. @anjalinorwood and @samarthjain are working on this here at Netflix. There are still some things that will need to be proved out, but we plan to chunk the work into smaller pieces for easier review and comment.

danielcweeks commented on May 17, 2024

@aokolnychyi I just made a pull request for a schema converter between Iceberg and Arrow, which is the first step in getting set up for building vectors: #194

rdblue commented on May 17, 2024

There's more context and discussion on the issue in the old Netflix project: Netflix/iceberg#90

prodeezy commented on May 17, 2024

@rdblue, I ran some benchmarks on production-sized data, and we're seeing a large enough gap in scan performance between Spark's core reader implementation (with vectorization) and Iceberg's Parquet reader. (I can share them, but I guess the difference is expected.)

I'd be glad to work on this effort. If you have thoughts from previous discussions about the general approach and challenges, please let me know. I understand this is probably a pretty large undertaking because it involves other formats as well. Meanwhile, I'm trying to get something working so I can understand the challenges involved and then work on a proposal. But I'd appreciate your thoughts on things we should address in a potential solution.

/cc @rominparekh @fbocse @aokolnychyi @mccheah

aokolnychyi commented on May 17, 2024

I would totally support this effort. The benchmarks in this PR also confirm that vectorized execution is important.

I should mention that Iceberg's Parquet reader seems to be significantly more efficient on nested data.

mccheah commented on May 17, 2024

I would certainly review and collaborate on this!

@aokolnychyi Spark disables vectorized read on nested data, which would then indicate that Iceberg without vectorized read is faster than Spark without vectorized read.

aokolnychyi commented on May 17, 2024

@mccheah you are right that we don't have vectorized reads on nested data in Spark. The benchmarks in the PR above test vectorized and non-vectorized reads on flat and nested data. On flat data, Iceberg is slightly faster. The real difference is seen on nested data.

As I also said in that PR, Iceberg DS is V2 and the file source is still V1, which complicates the comparison. I am working on benchmarks for readers alone without Spark.

rdblue commented on May 17, 2024

It would be great to have people working on this! Feel free to pick it up and I'll help review it. I think there are good tests that we can copy that are used to validate the row-based readers and writers.

@julienledem may also be interested. He has given talks on how to efficiently deserialize Parquet to Arrow and can hopefully help answer questions.

prodeezy commented on May 17, 2024

Thanks @danielcweeks ! I also found https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala, which has some nice plumbing for converting batches to Iterator[InternalRow] and vice versa. This is what Spark's Dataset uses to convert partition/split data into Arrow batch RDDs.
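As a minimal illustration of the batch-to-rows direction (this is ColumnarBatch's built-in row view, not ArrowConverters itself):

    import java.util.Iterator;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    public class BatchToRows {
      // Expose a columnar batch as Iterator<InternalRow>. The iterator returns
      // mutable row views over the columnar data, so copy() a row before caching it.
      static Iterator<InternalRow> toRows(ColumnarBatch batch) {
        return batch.rowIterator();
      }
    }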

aokolnychyi commented on May 17, 2024

Another point about the current reader is that we first construct GenericInternalRows and then apply an unsafe projection to get UnsafeRows.
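A minimal sketch of that two-step path, with a hypothetical two-column schema (the unsafe projection re-copies every field into the UnsafeRow's buffer, so each record is effectively materialized twice):

    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.unsafe.types.UTF8String;

    public class UnsafeCopyDemo {
      public static void main(String[] args) {
        StructType schema = new StructType()
            .add("id", DataTypes.LongType)
            .add("name", DataTypes.StringType);
        // Step 1: the row-based reader materializes a GenericInternalRow per record.
        InternalRow generic = new GenericInternalRow(new Object[] {1L, UTF8String.fromString("a")});
        // Step 2: an unsafe projection copies it, field by field, into an UnsafeRow.
        UnsafeProjection toUnsafe = UnsafeProjection.create(schema);
        UnsafeRow unsafe = toUnsafe.apply(generic);
        System.out.println(unsafe.getLong(0) + " " + unsafe.getUTF8String(1));
      }
    }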

Here are benchmark results for readers/writers alone (without Spark):

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersFlatDataBenchmark.readUsingIcebergReader                          ss    5  7.336 ± 0.298   s/op
SparkParquetReadersFlatDataBenchmark.readUsingIcebergReaderUnsafe                    ss    5  8.533 ± 0.237   s/op
SparkParquetReadersFlatDataBenchmark.readUsingSparkReader                            ss    5  9.859 ± 0.335   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingIcebergReader            ss    5  3.964 ± 0.140   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe      ss    5  5.032 ± 0.061   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingSparkReader              ss    5  5.076 ± 0.181   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReader                        ss    5  3.925 ± 0.225   s/op
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReaderUnsafe                  ss    5  4.514 ± 0.097   s/op
SparkParquetReadersNestedDataBenchmark.readUsingSparkReader                          ss    5  5.673 ± 0.194   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReader          ss    5  1.914 ± 0.126   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe    ss    5  2.408 ± 0.058   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingSparkReader            ss    5  2.682 ± 0.165   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetWritersFlatDataBenchmark.writeUsingIcebergWriter                         ss    5  4.789 ± 0.168   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingSparkWriter                           ss    5  4.948 ± 0.188   s/op

Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetWritersNestedDataBenchmark.writeUsingIcebergWriter                       ss    5  2.750 ± 0.184   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingSparkWriter                         ss    5  2.859 ± 0.222   s/op

The code is available in the above PR.

mccheah commented on May 17, 2024

Is there a plan for how we're going to tackle this? So far I've only seen concrete work in #194. Is there a plan to build out the entire Arrow integration? Perhaps it's appropriate to break down this issue into multiple sub-issues so that PRs can target the sub-issues - then we can merge everything knowing that each piece will be part of a known complete picture?

prodeezy commented on May 17, 2024

I've added a WIP branch with a working POC of vectorization for primitive types in Iceberg:
https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP

Implementation Notes:

  • Iceberg's Reader adds a SupportsScanColumnarBatch mixin to instruct DataSourceV2ScanExec to use planBatchPartitions() instead of the usual planInputPartitions(). It returns instances of ColumnarBatch on each iteration.
  • ArrowSchemaUtil contains the Iceberg-to-Arrow type conversion. This was copied from [3]. Added by @danielcweeks, thanks for that!
  • VectorizedParquetValueReaders contains ParquetValueReaders used for reading/decoding the Parquet row groups (aka page stores, as they are referred to in the code).
  • VectorizedSparkParquetReaders contains the visitor implementations that map Parquet types to the appropriate value readers. I implemented the struct visitor so that the root schema can be mapped properly. This has the added benefit of vectorization support for structs, so yay!
  • For the initial version, the value readers read an entire row group into a single Arrow FieldVector. I'd imagine this will require tuning to get the batch sizing right, but I've gone with one batch per row group for now.
  • Arrow FieldVectors are wrapped using ArrowColumnVector, which is Spark's ColumnVector implementation backed by Arrow. This is the first contact point between the Spark and Arrow interfaces (see the sketch after this list).
  • ArrowColumnVectors are stitched together into a ColumnarBatch by ColumnarBatchReader. This is my replacement for InternalRowReader, which maps structs to columnar batches. This allows us to have nested structs, where each level of nesting would be a nested columnar batch. Let me know what you think of this approach.
  • I've added value readers for all supported primitive types listed in AvroDataTest. There's a corresponding test for the vectorized reader under TestSparkParquetVectorizedReader.
  • I haven't fixed all the Checkstyle errors, so you will have to turn Checkstyle off in build.gradle. Also skip tests while building... sorry! :-(
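Here's a minimal sketch of that Spark/Arrow contact point: wrapping an Arrow FieldVector in ArrowColumnVector and stitching the columns into a ColumnarBatch. The vector name and values are made up for illustration:

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;
    import org.apache.spark.sql.vectorized.ArrowColumnVector;
    import org.apache.spark.sql.vectorized.ColumnVector;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    public class ArrowBatchDemo {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             IntVector ids = new IntVector("id", allocator)) {
          ids.allocateNew(3);
          for (int i = 0; i < 3; i++) {
            ids.setSafe(i, i * 10);       // setSafe grows the buffer if needed
          }
          ids.setValueCount(3);
          // ArrowColumnVector is Spark's ColumnVector backed by an Arrow vector;
          // a ColumnarBatch is just an array of column vectors plus a row count.
          ColumnVector col = new ArrowColumnVector(ids);
          ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { col });
          batch.setNumRows(3);
          System.out.println(batch.getRow(1).getInt(0));  // prints 10
        }
      }
    }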

P.S. There's some unused code under ArrowReader.java; ignore it. It's from my previous vectorization implementation, which I've kept around to compare performance.

Let me know what folks think of the approach. I'm getting this working for our scale-test benchmark and will report back with numbers. Feel free to run your own benchmarks and share.

danielcweeks commented on May 17, 2024

A few of us met with @prodeezy to discuss the POC implementation (great work, btw) and look at the future work we all hope to be contributing to. I've captured some of the discussion, ideas on where we're going, and considerations here: https://docs.google.com/document/d/1qVcowrYP6xBoB9C4htwEA0QvbHpdstzieNsX26SMG2k/edit#heading=h.yun6jblu7cfi

Feel free to add comments. I think with a little cleanup we might be close to having an initial implementation that we can start iterating on. @rdblue had proposed creating a branch, which I'm all for, so we can work more openly and get more eyes on the implementation.

(@samarthjain @anjalinorwood)

prodeezy commented on May 17, 2024

Will be using https://github.com/apache/incubator-iceberg/tree/vectorized-read going forward to iterate on this feature.

prodeezy commented on May 17, 2024

Vectorization Perf Meeting notes (Aug 1):

After running benchmarks, I met with @samarthjain, @anjalinorwood, and @rdblue to go over possible improvements; we came up with the following.

Possible low-hanging fruit for perf:

  • Remove virtual calls in the vector reader; make concrete classes (Anjali)
  • Remove hasNext() calls and currentDefinitionLevels() checks, and tighten the VectorReader.read() loop (Anjali)
  • Look into Arrow memory allocation; try to preallocate memory at initialization and avoid re-allocations (Samarth) (see the sketch after this list)
  • Slowness is seen in time spent in FieldVector.setSafe() calls. Improve Decimal/String variable-width byte reading; make it fixed-length based or use binary reading (Gautam)
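On the preallocation point, a rough sketch with a fixed-width Arrow vector and an assumed batch size (names are illustrative): allocating the whole batch up front lets the inner loop use set() instead of setSafe(), skipping the per-call capacity check and any mid-batch re-allocation.

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BigIntVector;

    public class PreallocateDemo {
      public static void main(String[] args) {
        int batchSize = 4096;  // assumed batch size
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             BigIntVector vec = new BigIntVector("v", allocator)) {
          vec.allocateNew(batchSize);      // one allocation for the whole batch
          for (int i = 0; i < batchSize; i++) {
            vec.set(i, i);                 // set() skips setSafe()'s grow-if-needed check
          }
          vec.setValueCount(batchSize);
        }
      }
    }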

Deeper look:

  • Possibly look into removing the advance() calls at every stage in ColumnIterator
  • Look into removing the triple-iterator abstraction and working at a lower level with Parquet

rdblue commented on May 17, 2024

Since #828 was merged, I'm going to close this. Let's track the remaining work in the vectorized read milestone.
