Comments (17)
For anyone interested, Spark already has a good reference implementation in Scala here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
One thing to note is that there isn't an implementation for the map type. I think Arrow was planning on using `list<struct<key, value>>`, but engines may need something closer to `struct<keys: list<>, values: list<>>` in order to overlay the columnar implementations directly. It might be necessary to provide both.
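To make the two candidate layouts concrete, here is a minimal sketch of a hypothetical `map<string, int>` column expressed both ways with Arrow's Java `Field`/`ArrowType` API (field names like `entries` and `item` are illustrative, not a spec):

```java
import java.util.Arrays;
import java.util.Collections;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

public class MapLayouts {
  // Layout 1: list<struct<key, value>> -- one struct element per map entry
  static Field listOfStructs() {
    Field key = Field.notNullable("key", new ArrowType.Utf8());
    Field value = Field.nullable("value", new ArrowType.Int(32, true));
    Field entries = new Field("entries",
        FieldType.notNullable(ArrowType.Struct.INSTANCE),
        Arrays.asList(key, value));
    return new Field("map",
        FieldType.nullable(ArrowType.List.INSTANCE),
        Collections.singletonList(entries));
  }

  // Layout 2: struct<keys: list<string>, values: list<int>> -- parallel lists,
  // closer to how some engines overlay columnar map data directly
  static Field structOfLists() {
    Field keys = new Field("keys",
        FieldType.notNullable(ArrowType.List.INSTANCE),
        Collections.singletonList(Field.notNullable("item", new ArrowType.Utf8())));
    Field values = new Field("values",
        FieldType.notNullable(ArrowType.List.INSTANCE),
        Collections.singletonList(Field.nullable("item", new ArrowType.Int(32, true))));
    return new Field("map",
        FieldType.nullable(ArrowType.Struct.INSTANCE),
        Arrays.asList(keys, values));
  }

  public static void main(String[] args) {
    System.out.println(listOfStructs());
    System.out.println(structOfLists());
  }
}
```

Both schemas carry the same logical content; the difference is whether keys and values are interleaved per entry or stored as two parallel child vectors.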
I'm happy to help anyone who's interested in working on this.
from iceberg.
@mccheah We have started working on a more complete integration with Arrow and will be following up incrementally as things progress. @anjalinorwood and @samarthjain are working on this here at Netflix. There are still some things that will need to be proved out, but we plan to chunk the work into smaller pieces for better review and comment.
@aokolnychyi I just made a pull request for a schema converter between iceberg and arrow, which is the first step in getting setup for building vectors: #194
There's more context and discussion on the issue in the old Netflix project: Netflix/iceberg#90
@rdblue .. I ran some benchmarks on production-sized data and we're seeing a large enough gap in scan performance between Spark's core reader implementation (with vectorization) and Iceberg's Parquet reader. (I can share them, but I guess the difference is expected.)
I'd be glad to work on this effort. If you have thoughts from previous discussions on the general approach and challenges, please let me know. I understand this is probably a pretty large undertaking because it involves other formats as well. Meanwhile, I'm trying to get something working so I can build an understanding of the challenges involved and work on a proposal. In the meantime, I'd appreciate your thoughts on what we should address in a potential solution.
/cc @rominparekh @fbocse @aokolnychyi @mccheah
I would totally support this effort. The benchmarks in this PR also confirm that vectorized execution is important.
I should mention that Iceberg's Parquet reader seems to be significantly more efficient on nested data.
I would certainly review and collaborate on this!
@aokolnychyi Spark disables vectorized read on nested data, which would then indicate that Iceberg without vectorized read is faster than Spark without vectorized read.
@mccheah you are right that we don't have vectorized reads on nested data in Spark. The benchmarks in the PR above test vectorized and non-vectorized reads on flat and nested data. On flat data, Iceberg is slightly faster. The real difference is seen on nested data.
As I also said in that PR, Iceberg DS is V2 and the file source is still V1, which complicates the comparison. I am working on benchmarks for readers alone without Spark.
It would be great to have people working on this! Feel free to pick it up and I'll help review it. I think there are good tests that we can copy that are used to validate the row-based readers and writers.
@julienledem may also be interested. He has given talks on how to efficiently deserialize Parquet to Arrow and can hopefully help answer questions.
Thanks @danielcweeks ! I also found https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala which has some nice plumbing for converting a `Batch` to `Iterator[InternalRow]` and vice versa. This is used by Spark's `Dataset` to convert partition/split data into Arrow batch RDDs.
Another point about the current reader is that we first construct `GenericInternalRow`s and then apply an unsafe projection to get `UnsafeRow`s.
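For context, that two-step path looks roughly like the following sketch against Spark's catalyst API (this is not Iceberg's actual reader code; the two-column schema and values are made up):

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

public class ProjectionStep {
  static UnsafeRow project() {
    // Hypothetical flat schema, just to show the extra copy per row
    StructType schema = new StructType()
        .add("id", DataTypes.LongType)
        .add("name", DataTypes.StringType);

    // Step 1: the reader materializes a GenericInternalRow (boxed fields)
    InternalRow generic = new GenericInternalRow(
        new Object[] { 1L, UTF8String.fromString("a") });

    // Step 2: an unsafe projection copies the row into UnsafeRow's
    // contiguous binary format; copy() detaches it from the shared buffer
    UnsafeProjection projection = UnsafeProjection.create(schema);
    return projection.apply(generic).copy();
  }

  public static void main(String[] args) {
    UnsafeRow row = project();
    System.out.println(row.getLong(0) + " " + row.getUTF8String(1));
  }
}
```

A vectorized read path would avoid materializing row objects at all, which is why this per-row copy is relevant to the discussion.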
Here are benchmark results for readers/writers alone (without Spark):
```
Benchmark                                                                          Mode  Cnt  Score   Error  Units
SparkParquetReadersFlatDataBenchmark.readUsingIcebergReader                          ss    5  7.336 ± 0.298   s/op
SparkParquetReadersFlatDataBenchmark.readUsingIcebergReaderUnsafe                    ss    5  8.533 ± 0.237   s/op
SparkParquetReadersFlatDataBenchmark.readUsingSparkReader                            ss    5  9.859 ± 0.335   s/op

SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingIcebergReader            ss    5  3.964 ± 0.140   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe      ss    5  5.032 ± 0.061   s/op
SparkParquetReadersFlatDataBenchmark.readWithProjectionUsingSparkReader              ss    5  5.076 ± 0.181   s/op

SparkParquetReadersNestedDataBenchmark.readUsingIcebergReader                        ss    5  3.925 ± 0.225   s/op
SparkParquetReadersNestedDataBenchmark.readUsingIcebergReaderUnsafe                  ss    5  4.514 ± 0.097   s/op
SparkParquetReadersNestedDataBenchmark.readUsingSparkReader                          ss    5  5.673 ± 0.194   s/op

SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReader          ss    5  1.914 ± 0.126   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingIcebergReaderUnsafe    ss    5  2.408 ± 0.058   s/op
SparkParquetReadersNestedDataBenchmark.readWithProjectionUsingSparkReader            ss    5  2.682 ± 0.165   s/op

SparkParquetWritersFlatDataBenchmark.writeUsingIcebergWriter                         ss    5  4.789 ± 0.168   s/op
SparkParquetWritersFlatDataBenchmark.writeUsingSparkWriter                           ss    5  4.948 ± 0.188   s/op

SparkParquetWritersNestedDataBenchmark.writeUsingIcebergWriter                       ss    5  2.750 ± 0.184   s/op
SparkParquetWritersNestedDataBenchmark.writeUsingSparkWriter                         ss    5  2.859 ± 0.222   s/op
```
The code is available in the above PR.
Is there a plan for how we're going to tackle this? So far I've only seen concrete work in #194. Is there a plan to build out the entire Arrow integration? Perhaps it's appropriate to break down this issue into multiple sub-issues so that PRs can target the sub-issues - then we can merge everything knowing that each piece will be part of a known complete picture?
I've added a WIP branch with a working POC of vectorization for primitive types in Iceberg:
https://github.com/prodeezy/incubator-iceberg/tree/issue-9-support-arrow-based-reading-WIP
Implementation Notes:
- Iceberg's Reader adds a `SupportsScanColumnarBatch` mixin to instruct `DataSourceV2ScanExec` to use `planBatchPartitions()` instead of the usual `planInputPartitions()`. It returns instances of `ColumnarBatch` on each iteration.
- `ArrowSchemaUtil` contains Iceberg-to-Arrow type conversion. This was copied from [3]. Added by @danielcweeks. Thanks for that!
- `VectorizedParquetValueReaders` contains the `ParquetValueReader`s used for reading/decoding the Parquet row groups (aka page stores, as referred to in the code).
- `VectorizedSparkParquetReaders` contains the visitor implementations that map Parquet types to appropriate value readers. I implemented the struct visitor so that the root schema can be mapped properly. This has the added benefit of vectorization support for structs, so yay!
- For the initial version, the value readers read an entire row group into a single Arrow `FieldVector`. I'd imagine this will require tuning to find the right batch sizing, but I've gone with one batch per row group for now.
- Arrow field vectors are wrapped using `ArrowColumnVector`, which is Spark's `ColumnVector` implementation backed by Arrow. This is the first contact point between the Spark and Arrow interfaces.
- `ArrowColumnVector`s are stitched together into a `ColumnarBatch` by `ColumnarBatchReader`. This is my replacement for `InternalRowReader`, and it maps structs to columnar batches. This allows us to have nested structs, where each level of nesting would be a nested columnar batch. Let me know what you think of this approach.
- I've added value readers for all supported primitive types listed in `AvroDataTest`. There's a corresponding test for the vectorized reader under `TestSparkParquetVectorizedReader`.
- I haven't fixed all the Checkstyle errors, so you will have to turn Checkstyle off in build.gradle. Also skip tests while building.. sorry! :-(

P.S. There's some unused code under `ArrowReader.java`. Ignore it, as it's not used; it was from my previous implementation of vectorization. I've kept it around to compare performance.

Let me know what folks think of the approach. I'm getting this working for our scale test benchmark and will report back with numbers. Feel free to run your own benchmarks and share.
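The wrapping and stitching described above can be sketched as a toy single-column example (this is not the actual `ColumnarBatchReader` code; the column name and row count are made up):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class BatchSketch {
  static int[] readBatch() {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         IntVector ids = new IntVector("id", allocator)) {
      // Fill one Arrow vector as a stand-in for a decoded Parquet column
      ids.allocateNew(3);
      for (int i = 0; i < 3; i++) {
        ids.setSafe(i, i * 10);
      }
      ids.setValueCount(3);

      // Wrap the Arrow vector in Spark's Arrow-backed ColumnVector ...
      ColumnVector column = new ArrowColumnVector(ids);
      // ... and stitch the columns into a ColumnarBatch for the scan
      ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { column });
      batch.setNumRows(3);

      int[] out = new int[batch.numRows()];
      for (int i = 0; i < out.length; i++) {
        out[i] = batch.column(0).getInt(i);
      }
      return out;
    }
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(readBatch()));
  }
}
```

The real reader would build one `ArrowColumnVector` per projected column from the row group and hand the resulting `ColumnarBatch` back through `planBatchPartitions()`.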
A few of us met with @prodeezy to discuss the POC implementation (great work, btw) and look at the future work we all hope to be contributing to. I've captured some of the discussion, ideas about where we're going, and considerations here: https://docs.google.com/document/d/1qVcowrYP6xBoB9C4htwEA0QvbHpdstzieNsX26SMG2k/edit#heading=h.yun6jblu7cfi
Feel free to add comments. I think with a little cleanup we might be close to having an initial implementation that we can start iterating on. @rdblue had proposed creating a branch, which I'm all for, so we can work more openly and get more eyes on the implementation.
Will be using https://github.com/apache/incubator-iceberg/tree/vectorized-read going forward to iterate on this feature.
Vectorization Perf Meeting notes (Aug 1):
After running benchmarks, we met with @samarthjain , @anjalinorwood and @rdblue to go over possible improvements and came up with the following.
Possible low-hanging fruit for perf:
- Remove virtual calls in the vector reader; make concrete classes (Anjali)
- Remove hasNext() calls and currentDefinitionLevels() checks, and tighten the VectorReader.read() loop (Anjali)
- Look into Arrow memory allocation; try to preallocate memory at initialization and avoid re-allocations (Samarth)
- Slowness is seen in time spent in FieldVector.setSafe() calls. Improve decimal / string variable-byte reading: make it fixed-length based or use binary reading (Gautam)
Deeper look:
- Possibly look into removing advance() calls at every stage in ColumnIterator
- Look into removing the triple-iterator abstraction and working at a lower level with Parquet
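As a rough illustration of the preallocation idea above (not the actual reader code; `BATCH_SIZE` is a made-up constant): allocating the vector's buffers once up front lets the inner loop use `set()` instead of `setSafe()`, which otherwise checks capacity and may grow the buffers on every call.

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;

public class Prealloc {
  // Hypothetical batch size; in the reader this would come from the row group
  static final int BATCH_SIZE = 4096;

  static int fill(IntVector vector) {
    // Preallocate once for the whole batch so writes never have to grow buffers
    vector.setInitialCapacity(BATCH_SIZE);
    vector.allocateNew();
    for (int i = 0; i < BATCH_SIZE; i++) {
      // set() skips the capacity check / reallocation that setSafe() performs
      vector.set(i, i);
    }
    vector.setValueCount(BATCH_SIZE);
    return vector.getValueCapacity();
  }

  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         IntVector v = new IntVector("col", allocator)) {
      System.out.println(fill(v) >= BATCH_SIZE);
    }
  }
}
```

This only works when the batch size is known before the fill loop starts, which fits the one-batch-per-row-group design discussed earlier in the thread.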
Since #828 was merged, I'm going to close this. Let's track the remaining work in the vectorized read milestone.