
Iceberg

Iceberg is a high-performance format for huge analytic tables. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables, at the same time.

Background and documentation are available at https://iceberg.apache.org

Status

Iceberg is under active development at the Apache Software Foundation.

The Iceberg format specification is stable and new features are added with each version.

The core Java library is located in this repository and is the reference implementation for other libraries.

Documentation is available for all libraries and integrations.

Collaboration

Iceberg tracks issues in GitHub and prefers to receive contributions as pull requests.

Community discussions happen primarily on the dev mailing list or on specific issues.

Building

Iceberg is built using Gradle with Java 8, 11, or 17.

  • To invoke a build and run tests: ./gradlew build
  • To skip tests: ./gradlew build -x test -x integrationTest
  • To fix code style for default versions: ./gradlew spotlessApply
  • To fix code style for all versions of Spark/Hive/Flink: ./gradlew spotlessApply -DallVersions

Iceberg table support is organized in library modules:

  • iceberg-common contains utility classes used in other modules
  • iceberg-api contains the public Iceberg API
  • iceberg-core contains implementations of the Iceberg API and support for Avro data files; this is what processing engines should depend on
  • iceberg-parquet is an optional module for working with tables backed by Parquet files
  • iceberg-arrow is an optional module for reading Parquet into Arrow memory
  • iceberg-orc is an optional module for working with tables backed by ORC files
  • iceberg-hive-metastore is an implementation of Iceberg tables backed by the Hive metastore Thrift client
  • iceberg-data is an optional module for working with tables directly from JVM applications

Iceberg also has modules for adding Iceberg support to processing engines:

  • iceberg-spark is an implementation of Spark's Datasource V2 API for Iceberg, with submodules for each Spark version (use the runtime jars for a shaded version)
  • iceberg-flink contains classes for integrating with Apache Flink (use iceberg-flink-runtime for a shaded version)
  • iceberg-mr contains an InputFormat and other classes for integrating with Apache Hive
  • iceberg-pig is an implementation of Pig's LoadFunc API for Iceberg

NOTE

The tests require Docker to execute. On macOS (with Docker Desktop), you may need to create a symbolic link to the Docker socket so that it can be detected by the tests:

sudo ln -s $HOME/.docker/run/docker.sock /var/run/docker.sock

Engine Compatibility

See the Multi-Engine Support page for details on Iceberg compatibility with different Spark, Flink, and Hive versions. For other engines such as Presto or Trino, please visit their websites for Iceberg integration details.

Implementations

This repository contains the Java implementation of Iceberg. Other implementations can be found at:

Issues

Add operation details to snapshots

Snapshots that are append-only can be aged off more aggressively than deletes because all of their data files must still be tracked in the next snapshot. Adding an operation type and a summary to snapshot metadata would enable improvements to metadata cleanup operations.

Another idea from @julienledem:

another interesting operation: a new snapshot that contains the same data as the previous one (for example, compaction)

File I/O Submodule for TableOperations

In Netflix/iceberg#107 it was discussed that InputFile and OutputFile instances should be pluggable, and that provision of InputFile and OutputFile instances should be handled by the TableOperations API. However, the Spark data source in particular only uses HadoopInputFile#fromPath for reading and HadoopOutputFile#fromPath for writing. Using TableOperations#newInputFile and TableOperations#newOutputFile would also be difficult because calling these methods on the executors would require TableOperations instances to be Serializable.

We propose having the TableOperations API provide a FileIO module that handles the narrow role of reading, creating/writing, and deleting files:

interface FileIO extends Serializable {
  InputFile newInputFile(String path);
  OutputFile newOutputFile(String path);
  void deleteFile(String path);
}

Then the following methods would be added to TableOperations, and TableOperations#newInputFile and TableOperations#newMetadataFile would be removed.

interface TableOperations {
  FileIO fileIo();
  String resolveNewMetadataPath(String metadataFilename);
}

resolveNewMetadataPath is needed because the new FileIO abstraction treats all locations as full paths, while the old TableOperations#newMetadataFile method assumes its argument is a file name, not a full path. Callers that used to call TableOperations#newMetadataFile should therefore first resolve the full path and then pass it to FileIO#newOutputFile. For convenience, we could add a helper default method like so:

interface TableOperations {
  FileIO fileIo();
  String resolveNewMetadataPath(String metadataFilename);
  default OutputFile newMetadataFile(String fileName) {
    return fileIo().newOutputFile(resolveNewMetadataPath(fileName));
  }
  }
}
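
For illustration, a minimal Hadoop-backed implementation of the proposed FileIO interface might look like the sketch below. It builds on the HadoopInputFile#fromPath and HadoopOutputFile#fromPath helpers mentioned above (import package names follow the later org.apache naming and may differ), and it glosses over the fact that Configuration is not Serializable, which a real implementation would need to address.

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

// Sketch only: implements the FileIO interface proposed above.
class HadoopFileIO implements FileIO {
  private final Configuration conf;

  HadoopFileIO(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public InputFile newInputFile(String path) {
    return HadoopInputFile.fromPath(new Path(path), conf);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return HadoopOutputFile.fromPath(new Path(path), conf);
  }

  @Override
  public void deleteFile(String path) {
    Path toDelete = new Path(path);
    try {
      // delete a single file; not recursive
      toDelete.getFileSystem(conf).delete(toDelete, false);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to delete " + path, e);
    }
  }
}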

DataFile External Identifier Field

A URI locating a file may not be enough for file I/O implementations to construct InputFile and OutputFile instances, as proposed in #12. More specifically, consider a system where a file has some path, but that same path can be namespaced in different contexts. For example, the metadata for that same file can evolve over time, as we discussed in #16.

We propose adding another field called an ExternalIdentifier to the DataFile schema, which is an optional String tag allowing custom Iceberg consumers to look up the file in their system using their own unique identification mechanisms. This would allow such systems to look up the file directly by the identifier in addition to the path.

Alternative representations for the ExternalIdentifier that would allow for richer content include a byte blob or a struct with a schema stored in the table properties. However, those representations could encourage more arbitrary and uncontrolled use of the field, which we probably want to avoid. A String seems to be the safest option.

Use a pool of metastore clients in HiveTableOperations

I think it would be great to support a pool of metastore clients in HiveTableOperations.

There are a few reasons for that:

Resource Utilization

Currently, we create a new metastore client each time we load a table. This happens because we create a new instance of HiveTableOperations. If we keep producing new clients, our metastore might stop replying. For example, existing Hive tests will break if you share the same metastore among all 3 tests.

Thread-Safety

Currently, HiveTableOperations is not thread-safe. We could solve that with a thread-local client, but the first point would remain.
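
As a rough illustration of the idea, the sketch below shows a minimal bounded client pool. The client type is left generic so the example does not assume any particular metastore client API, and the pool size and blocking behavior are arbitrary choices.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

class ClientPool<C> {
  private final BlockingQueue<C> clients;

  ClientPool(int size, Supplier<C> factory) {
    this.clients = new ArrayBlockingQueue<>(size);
    for (int i = 0; i < size; i++) {
      clients.add(factory.get());
    }
  }

  // run an action with a pooled client, blocking until one is available
  <R> R run(Function<C, R> action) throws InterruptedException {
    C client = clients.take();
    try {
      return action.apply(client);
    } finally {
      clients.put(client); // always return the client to the pool
    }
  }
}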

Allow overriding provision of FileSystem instances to HadoopTableOperations

Util.getFS forces FileSystem objects to be cached at the JVM level by using FileSystem.get. In some situations, finer-grained control over the caching of these objects is preferable.

We propose the following:

  • Add protected FileSystem getFS(Path path, Configuration configuration) in HadoopTableOperations. Default implementation calls Util#getFS.
  • Make HadoopTableOperations public with a protected constructor so that users can use some of the behavior but override specific components, particularly #12 and the FileSystem provision change above.
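
To illustrate the proposed hook, a subclass might bypass the JVM-wide cache as sketched below; the constructor signature is an assumption, since the issue only proposes making the constructor protected.

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class UncachedHadoopTableOperations extends HadoopTableOperations {
  UncachedHadoopTableOperations(Path location, Configuration conf) {
    super(location, conf);
  }

  @Override
  protected FileSystem getFS(Path path, Configuration conf) {
    try {
      // return a fresh, uncached FileSystem instead of the JVM-cached one
      return FileSystem.newInstance(path.toUri(), conf);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}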

Add an action to cherry-pick changes in a snapshot and apply them on another snapshot

In an audit workflow, new data is written to an orphan snapshot that is not committed as the table's current state until it is audited. After auditing a change, it may need to be applied or cherry-picked on top of the latest snapshot instead of the one that was current when the audited changes were created.

Iceberg needs to support cherry-picking the changes from an orphan snapshot by applying them to the current snapshot.

Cherry-picking should always apply the exact set of changes that were done in the original commit. If files were deleted, then those files must still exist in the data set. All added files should be added to the new version. Also, if an overwrite by expression replaced 0 files, then there must be no files matching the expression.
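
A possible shape for such an API is sketched below; the manageSnapshots/cherrypick method names are illustrative assumptions, not a committed design.

import org.apache.iceberg.Table;

class CherryPickExample {
  // re-apply the changes from an orphan (staged) snapshot onto the current snapshot
  static void applyAudited(Table table, long auditedSnapshotId) {
    table.manageSnapshots()
        .cherrypick(auditedSnapshotId)
        .commit();
  }
}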

Update iceberg-spark to use Hive tables

The Spark integration currently uses path-based tables and the HadoopTables implementation. Now that iceberg-hive is available, Spark should support named tables loaded with HiveTables.

Vectorize reads and deserialize to Arrow

Iceberg does not use vectorized reads to produce data for Spark. For cases where Spark can use its vectorized read path (flat schemas, no evolution), Spark will be faster. Iceberg should solve this problem by adding a vectorized read path that deserializes to Arrow record batches. Spark already has support for Arrow data from PySpark.

Include the cost to open a file during split planning

We need to take into account the cost to open a file to avoid straggler tasks. As an example, see how spark.sql.files.openCostInBytes is handled in Spark.

Consider a case where we have 500 files with 10 records each and 50 files that contain 1,000,000 entries each. Today, Iceberg will group all of the small files into one task, and opening those files becomes a bottleneck. Locally, Iceberg was 2x slower than the Spark file source (without vectorized execution). The test was executed with 4 executors. Iceberg grouped the files into 2 tasks, and the task with many small files took most of the time. By contrast, Spark grouped the files into 20 bins.
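
The sketch below shows the general technique (an assumption for illustration, not Spark or Iceberg code): each file is charged a fixed open cost, analogous to spark.sql.files.openCostInBytes, before greedy bin packing, so many tiny files no longer collapse into a single task.

import java.util.ArrayList;
import java.util.List;

class OpenCostBinPacking {
  static List<List<Long>> pack(List<Long> fileSizes, long targetSplitSize, long openCostInBytes) {
    List<List<Long>> bins = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentWeight = 0;
    for (long size : fileSizes) {
      // charge a fixed cost for opening the file in addition to its size
      long weight = size + openCostInBytes;
      if (!current.isEmpty() && currentWeight + weight > targetSplitSize) {
        bins.add(current);
        current = new ArrayList<>();
        currentWeight = 0;
      }
      current.add(size);
      currentWeight += weight;
    }
    if (!current.isEmpty()) {
      bins.add(current);
    }
    return bins;
  }
}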

ORC does not use InputFile and OutputFile abstractions

ORC doesn't create input and output streams using InputFile and OutputFile methods. It creates a path from the file location:

path = new Path(file.location());
writer = OrcFile.createWriter(path, options);

This breaks the IO abstraction and ORC will not work without Hadoop.

Implement strict projection in more transforms

Strict projection isn't required and wasn't implemented for several of the partitioning transforms. When strict projection isn't implemented (the projectStrict method returns null), Iceberg falls back to a safe implementation. For example, residual evaluation will not remove predicates because they cannot be guaranteed to be true, and deletes can't determine that all values in a file match, so the file can't be deleted (deletes usually fall back to min/max metrics evaluation).

Implementing strict projection for all transforms where possible will help query efficiency, will make deletes faster, etc.

Truncate stats from Parquet files

Lower and upper bound values from Parquet files are not currently truncated, which takes more space than necessary in manifests. Truncating strings and binary values will probably improve performance for large tables.
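
As an illustration of the kind of truncation involved (a sketch, not the Iceberg implementation): lower bounds can simply be cut to a prefix, while upper bounds need the last retained character incremented so the truncated value still sorts at or above every value in the file. A real implementation must also handle binary values and character encoding carefully.

class BoundTruncation {
  // lower bound: a prefix is always <= the original value
  static String truncateLowerBound(String value, int length) {
    return value.length() <= length ? value : value.substring(0, length);
  }

  // upper bound: increment the last character that can be incremented
  static String truncateUpperBound(String value, int length) {
    if (value.length() <= length) {
      return value;
    }
    char[] prefix = value.substring(0, length).toCharArray();
    for (int i = prefix.length - 1; i >= 0; i--) {
      if (prefix[i] < Character.MAX_VALUE) {
        prefix[i]++;
        return new String(prefix, 0, i + 1);
      }
    }
    return value; // no shorter upper bound exists; keep the full value
  }
}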

Make Iceberg support case insensitivity

Iceberg's current implementation is case sensitive for column names, which hinders usability, as most SQL users expect case insensitivity by default. While a query like the following will succeed with other Spark readers, it fails on Iceberg:

SELECT COUNT(*)
FROM iceTable
WHERE year = 2017
  AND MONTH = 11 -- Notice how MONTH has different casing than other predicates
  AND day = 01

This will fail with a stack trace similar to:

com.google.common.util.concurrent.UncheckedExecutionException: com.netflix.iceberg.exceptions.ValidationException: Cannot find field 'MONTH' in struct: struct<...>
...

PR to solve this issue at iceberg-api level: #82

More PRs will follow to make use of this new flag.

Publish to Maven Central (or Equivalent)

Now that the project is an Apache project, I'd love to see its artifacts published to a central repository, rather than relying on jitpack.io.

Whether the iceberg-runtime and iceberg-presto-runtime artifacts should be published too is debatable (as they're uber/fat-jars).

ManifestReader is not properly closed in BaseTableScan

It seems ManifestReader is not closed properly in some cases. ManifestReader is a CloseableGroup, and we add AvroIterable to its list of closeables. However, ManifestReader$close is never called.

One way to reproduce this is to run TestParquetWrite.

[Finalizer] WARN com.netflix.iceberg.hadoop.HadoopStreams - Unclosed input stream created by:
	com.netflix.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.<init>(HadoopStreams.java:78)
	com.netflix.iceberg.hadoop.HadoopStreams.wrap(HadoopStreams.java:53)
	com.netflix.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:125)
	com.netflix.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:95)
	com.netflix.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
	com.google.common.collect.Iterables$6.iterator(Iterables.java:585)
	com.google.common.collect.Iterables$8.iterator(Iterables.java:709)
	com.netflix.iceberg.ManifestReader.iterator(ManifestReader.java:225)
	com.netflix.iceberg.FilteredManifest.iterator(FilteredManifest.java:124)
	com.google.common.collect.Iterables$8.iterator(Iterables.java:709)
	com.google.common.collect.Iterables$3.transform(Iterables.java:508)
	com.google.common.collect.Iterables$3.transform(Iterables.java:505)
	com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
	com.google.common.collect.Iterators$5.hasNext(Iterators.java:543)
	com.netflix.iceberg.util.BinPacking$PackingIterator.hasNext(BinPacking.java:91)
	com.netflix.iceberg.io.CloseableIterable$4$1.hasNext(CloseableIterable.java:92)
	com.google.common.collect.Iterators.addAll(Iterators.java:356)
	com.google.common.collect.Lists.newArrayList(Lists.java:147)
	com.google.common.collect.Lists.newArrayList(Lists.java:129)
	com.netflix.iceberg.spark.source.Reader.tasks(Reader.java:209)
	com.netflix.iceberg.spark.source.Reader.planInputPartitions(Reader.java:135)
	org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
...

My suspicion is that we have a problem in BaseTableScan$planFiles. Specifically, this piece:

      ConcurrentLinkedQueue<Closeable> toClose = new ConcurrentLinkedQueue<>();
      Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
          matchingManifests,
          manifest -> {
            ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
            toClose.add(reader);
            String schemaString = SchemaParser.toJson(reader.spec().schema());
            String specString = PartitionSpecParser.toJson(reader.spec());
            ResidualEvaluator residuals = new ResidualEvaluator(reader.spec(), rowFilter);
            return Iterables.transform(
                reader.filterRows(rowFilter).select(SNAPSHOT_COLUMNS),
                file -> new BaseFileScanTask(file, schemaString, specString, residuals)
            );
          });

      if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
        return CloseableIterable.combine(
            new ParallelIterable<>(readers, getWorkerPool()),
            toClose);
      } else {
        return CloseableIterable.combine(Iterables.concat(readers), toClose);
      }

The closure that adds each ManifestReader to toClose is evaluated lazily, so toClose is still empty when we return CloseableIterable.combine(Iterables.concat(readers), toClose).

Add LICENSE and NOTICE to binary artifacts

For an ASF release to include binary artifacts, those artifacts need correct LICENSE and NOTICE files. The source LICENSE and NOTICE are correct for jars built from Iceberg source, but shaded jars like iceberg-runtime need license documentation that reflects the shaded contents.

Ignore unsupported partition fields

Iceberg may add new transforms to the partition spec. When a transform is not recognized, Iceberg should ignore the field so that the format is forward-compatible with new transforms.

Iceberg should also ignore fields with multiple source columns, in case transforms on multiple columns are added.

Just me or Gradle tests fail for `iceberg-hive` when env variable `HIVE_HOME` is present?

$ hive --version
...
Hive 3.1.1
Git git://daijymacpro-2.local/Users/daijy/commit/hive -r f4e0529634b6231a0072295da48af466cf2f10b7
Compiled by daijy on Tue Oct 23 17:19:24 PDT 2018
From source with checksum 6deca5a8401bbb6c6b49898be6fcb80e

$ ls -las $HIVE_HOME
/usr/local/hive/current -> apache-hive-3.1.1-bin

$ which hive
/usr/local/hive/current/bin/hive

So when running ./gradlew test, it starts yielding errors:

> Task :iceberg-hive:test

com.netflix.iceberg.hive.HiveTablesTest > testFailure FAILED
    javax.jdo.JDOFatalInternalException at HiveTablesTest.java:43
        Caused by: java.lang.reflect.InvocationTargetException at HiveTablesTest.java:43
            Caused by: org.datanucleus.exceptions.NucleusException at HiveTablesTest.java:43
                Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException at HiveTablesTest.java:43

com.netflix.iceberg.hive.HiveTablesTest > testExistingTableUpdate FAILED
    javax.jdo.JDOFatalInternalException at HiveTablesTest.java:43
        Caused by: java.lang.reflect.InvocationTargetException at HiveTablesTest.java:43
            Caused by: org.datanucleus.exceptions.NucleusException at HiveTablesTest.java:43
                Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException at HiveTablesTest.java:43

com.netflix.iceberg.hive.HiveTablesTest > testConcurrentFastAppends FAILED
    javax.jdo.JDOFatalInternalException at HiveTablesTest.java:43
        Caused by: java.lang.reflect.InvocationTargetException at HiveTablesTest.java:43
            Caused by: org.datanucleus.exceptions.NucleusException at HiveTablesTest.java:43
                Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException at HiveTablesTest.java:43

com.netflix.iceberg.hive.HiveTablesTest > testCreate FAILED
    javax.jdo.JDOFatalInternalException at HiveTablesTest.java:43
        Caused by: java.lang.reflect.InvocationTargetException at HiveTablesTest.java:43
            Caused by: org.datanucleus.exceptions.NucleusException at HiveTablesTest.java:43
                Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException at HiveTablesTest.java:43

4 tests completed, 4 failed

FAILURE: Build failed with an exception.

Workaround:
Set the HIVE_HOME environment variable to an empty string (either in the shell, or by overriding it in the test setup method).

Support snapshot selection in Spark read options

Spark passes query options from DataFrameReader to the Iceberg source. Iceberg should support selecting a specific snapshot ID or the table state at some time from these options.

This is needed to read a table snapshot in order to audit its contents while it is not the current snapshot (write, audit, then publish).
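
For example, a read pinned to a snapshot might look like the sketch below; the "snapshot-id" option name is an assumption for illustration (an "as-of-timestamp" option could cover time-based selection).

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class SnapshotReadExample {
  static Dataset<Row> readSnapshot(SparkSession spark, String tableLocation, long snapshotId) {
    return spark.read()
        .format("iceberg")
        .option("snapshot-id", snapshotId) // pin the scan to a specific snapshot
        .load(tableLocation);
  }
}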

Add external schema mappings for files written with name-based schemas

Files written by Iceberg writers contain Iceberg field IDs that are used for column projection. Iceberg doesn't currently support tracking data files that were written by other systems and added to Iceberg tables with the API because the field IDs are missing. To support files written by non-Iceberg writers, Iceberg could support a table-level mapping from a source schema to Iceberg IDs.

For example, a table with 2 columns might have an Avro schema mapping like this one, encoded as JSON in table properties:

[ {"field-id": 1, "names": ["id"]},
  {"field-id": 2, "names": ["data"]} ]

When reading an Avro file, the read schema would be produced using the file's schema and the field IDs from the mapping. The names in each field mapping are a list in order to handle aliasing.

Iceberg fails to return results when filtered on complex columns

Sample JSON Data

scala> val json = spark.read.schema(schema).json("people_complex.json")
scala> json.show()

+----+-------+--------------------+
| age|   name|             friends|
+----+-------+--------------------+
|null|Michael|                null|
|  30|   Andy|[Josh -> 10, Bria...|
|  19| Justin|[Bharat -> 15, Gau...|
+----+-------+--------------------+

Json Table Schema

scala> json.printSchema
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- friends: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)

Predicate on Map type column works on Parquet format with Vanilla spark reader

scala> json.write.format("parquet").mode("append").save("parquet-people-complex")
scala> val parqDf = spark.read.format("parquet").load("parquet-people-complex")
scala> parqDf.createOrReplaceTempView("people_parquet_complex")

scala> val sqlDf = spark.sql("select * from people_parquet_complex where friends['Josh'] = 10")
sqlDf: org.apache.spark.sql.DataFrame = [age: int, name: string ... 2 more fields]
scala> sqlDf.show()
+---+----+--------------------+----------+
|age|name|             friends|  location|
+---+----+--------------------+----------+
| 30|Andy|[Josh -> 10, Gaut...|[101, 100]|
+---+----+--------------------+----------+

Sample data in Iceberg format

scala> json.write.format("iceberg").mode("append").save("iceberg-people-complex")
scala> val iceDf = spark.read.format("iceberg").load("iceberg-people-complex")
scala> iceDf.show()

+----+-------+--------------------+
| age|   name|             friends|
+----+-------+--------------------+
|null|Michael|                null|
|  30|   Andy|[Josh -> 10, Bria...|
|  19| Justin|[Bharat -> 15, Xa...|
+----+-------+--------------------+

Filter on Complex (map) column fails to return result [This is the issue]

scala> iceDf.createOrReplaceTempView("people_iceberg_complex")
scala> val sqlDF = spark.sql("select * from people_iceberg_complex where friends['Josh'] = 10")
sqlDF: org.apache.spark.sql.DataFrame = [age: int, name: string ... 1 more field]
scala> sqlDF.show()
+---+----+-------+
|age|name|friends|
+---+----+-------+
+---+----+-------+

Split files when planning scan tasks

When building a scan, the TableScan API can plan the files to read (planFiles) or group the files into combined splits (planTasks). Split planning should also split files at the target split size before bin packing to create the final splits.

This relates to adding split locations to the manifest file (row group or stripe offsets). The simple version of this issue is to split at the target split size and then combine, but eventually we want to take the split offsets into account if it makes sense to store them in the manifest file.
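
A naive sketch of the simple version (splitting purely by target size before any bin packing, and ignoring row-group or stripe alignment) could look like this:

import java.util.ArrayList;
import java.util.List;

class SplitPlanning {
  // returns {start, length} pairs covering the whole file
  static List<long[]> splitOffsets(long fileLength, long targetSplitSize) {
    List<long[]> splits = new ArrayList<>();
    for (long start = 0; start < fileLength; start += targetSplitSize) {
      splits.add(new long[] {start, Math.min(targetSplitSize, fileLength - start)});
    }
    return splits;
  }
}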

Support cryptographic integrity

Parquet encryption protects the integrity of individual data files. However, in untrusted storage, the removal of one or more data files from a table might go unnoticed, and the replacement of one file's contents with another will go unnoticed unless the user has provided a unique Parquet AAD prefix for each file.

The snapshot integrity mechanism implements cryptographic protection of the integrity of data sets composed of multiple Parquet files.

The mechanism works by creating a small signature file that contains the table URI / snapshot ID and the number of files. It can also contain an explicit list of file names (with or without full paths). The file contents are signed (and can also be encrypted, e.g. with AES GCM).

On the writer side, the mechanism creates AAD prefixes for every data file and creates the signature file itself. The input is the snapshot URI, the number of files (N), and the encryption/signature key, plus (optionally) the list of file names.

On the reader side, the mechanism parses and verifies the signature file, and provides the framework with the verified table URI / snapshot ID, the number of files that must be accounted for, and the Parquet AAD prefix for each file, plus (optionally) the list of file names. The input is the signature file, the encryption/signature key, and (optionally) the expected table URI / snapshot ID.

This issue was originally posted to the Netflix repository by @ggershinsky.

API for path generation in Iceberg frameworks

In the integrations that Iceberg supports out of the box (Spark, Pig), the frameworks decide how to generate paths for written files. However, some sources would prefer to pick their own paths for new files. Some questions for designing such an API include:

  • Should this be bundled with the FileIO API?
  • Should such a path generation API be concerned about the file's partition values? Partition metadata would be required for the default implementation to maintain existing behavior for Spark.

Add split offsets to manifest files

Instead of storing a single HDFS block size for each data file, Iceberg should store a list of split offsets. That will allow split planning to be more precise by using row group or stripe offsets, without reading file footers.

Add support for nested struct field based filter expressions in Iceberg

I tried testing struct filter pushdowns in Iceberg by applying these dependent code changes:

  1. Spark pr for struct pushdown
  2. Iceberg writers for Parquet
  3. Changes to Metrics collection to add struct metrics in Iceberg

Iceberg rejects it with this validation error:

Caused by: com.netflix.iceberg.exceptions.ValidationException: Cannot find field 'location.lat' in struct: struct<1: age: optional int, 2: name: optional string, 3: friends: optional map<string, int>, 4: location: optional struct<7: lat: optional double, 8: lon: optional double>>
  at com.netflix.iceberg.exceptions.ValidationException.check(ValidationException.java:42)
  at com.netflix.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:76)
  at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:138)
  at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.predicate(Projections.java:94)
  at com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:147)
  at com.netflix.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:160)
  at com.netflix.iceberg.expressions.Projections$BaseProjectionEvaluator.project(Projections.java:108)
  at com.netflix.iceberg.expressions.InclusiveManifestEvaluator.<init>(InclusiveManifestEvaluator.java:57)
  at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:153)
  at com.netflix.iceberg.BaseTableScan$1.load(BaseTableScan.java:149)
  at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
  at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
  at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
  at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)

Test Gist : https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac

Based on discussions on the dev mailing list and Issue#78, we want to be able to support nested struct filtering in Iceberg. For now, though, we want to avoid mixed fields like a struct inside a map or a struct inside an array, as that changes the semantics of the expression. For example, a.b = 5 can be run on a: struct<b: int> but can't be run on a: list<struct<b: int>>.

Issue#78 focuses on adding metrics for struct fields in Iceberg; this issue is to address the expression handling once that is available.

/cc @aokolnychyi @rdblue

IcebergSource cannot load data from hiveTable

While testing reads of a Hive table using the Iceberg DataSource API, it looks like this is not possible.

read like this:
sparkSession.read().format("iceberg").load("db.tbl")

as the findTable(...) method in IcebergSource uses HadoopTables:

protected Table findTable(DataSourceOptions options, Configuration conf) {
   Optional<String> location = options.get("path");
   Preconditions.checkArgument(location.isPresent(),
       "Cannot open table without a location: path is not set");

   HadoopTables tables = new HadoopTables(conf);

   return tables.load(location.get());
 }

Maybe I'm missing something and didn't understand the flow.

Add in and notIn predicates

Currently, set inclusion is implemented using a tree of equals predicates joined with or predicates. It would be much more efficient to add support for in and notIn predicates.
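
To make the difference concrete, here is a sketch of the current workaround versus the proposed predicate. The Expressions.or and Expressions.equal factory methods exist in the expressions API (the package name may differ depending on the Iceberg version), while the in factory shown in the comment is the hypothetical addition.

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

class InPredicateExample {
  // current workaround: id = 1 OR id = 2 OR id = 3
  static Expression idInWorkaround() {
    return Expressions.or(
        Expressions.or(Expressions.equal("id", 1), Expressions.equal("id", 2)),
        Expressions.equal("id", 3));
  }

  // proposed: a direct in predicate, e.g. Expressions.in("id", 1, 2, 3)
}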

Run checkstyle in CI

I noticed that it's possible to introduce a formatting error, such as disorganized imports or changed indentation, without ./gradlew check noticing. This issue proposes adding more rigorous checkstyle and code linting integration to this project.

An idea is to use the baseline toolkit, but we can also write the linting configuration by hand.

ParquetMetrics computes wrong lower/upper bounds for small decimals

It seems ParquetMetrics computes wrong lower/upper bounds for decimals that are represented as int64 in Parquet. Consequently, Iceberg can filter out data files that contain relevant data for a query.

Create a Parquet file with one row:

    Schema schema = new Schema(
      optional(1, "decimalCol", Types.DecimalType.of(10, 2))
    );

    GenericData.Record record = new GenericData.Record(AvroSchemaUtil.convert(schema.asStruct()));
    record.put(0, new BigDecimal("3.50"));

    File parquetFile = newParquetFile();
    try (FileAppender<GenericData.Record> writer = Parquet.write(localOutput(parquetFile))
        .schema(schema)
        .build()) {
      writer.add(record);
    }

Explore this file using parquet-tools (min/max values are correct):

file schema: table 
--------------------------------------------------------------------------------
decimalCol:  OPTIONAL INT64 O:DECIMAL R:0 D:1

row group 1: RC:1 TS:75 OFFSET:4 
--------------------------------------------------------------------------------
decimalCol:   INT64 GZIP DO:0 FPO:4 SZ:91/75/0.82 VC:1 ENC:BIT_PACKED,PLAIN,RLE ST:[min: 3.50, max: 3.50, num_nulls: 0]

Create an Iceberg table:

    String tableLocation = newTableLocation();
    Configuration conf = new Configuration();
    HadoopTables hadoopTables = new HadoopTables(conf);
    Table table = hadoopTables.create(schema, tableLocation);

Append the Parquet file to the table:

    DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
      .withFileSizeInBytes(parquetFile.length())
      .withPath(parquetFile.toString())
      .withMetrics(fromInputFile(localInput(parquetFile)))
      .build();

    table.newAppend().appendFile(dataFile).commit();

Query the table via Spark:

    Dataset<Row> df = spark.read()
      .format("iceberg")
      .load(tableLocation);

    df.show();
    +----------+
    |decimalCol|
    +----------+
    |      3.50|
    +----------+
    df.where("decimalCol = 3.50").show();
    +----------+
    |decimalCol|
    +----------+
    +----------+
    df.where("decimalCol = CAST(3.50 AS DECIMAL(10,2))").show();
    +----------+
    |decimalCol|
    +----------+
    +----------+

This happens because ParquetMetrics uses ParquetConversions$fromParquetPrimitive, which assumes that decimals are always represented as binary.

In this particular case, org.apache.parquet.column.statistics.Statistics$genericGetMin will return a long value.
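
To illustrate the fix direction (a sketch, not the ParquetMetrics code): when the Parquet column stores the decimal as INT32 or INT64, the raw min/max is an unscaled long and must be rescaled rather than decoded as binary.

import java.math.BigDecimal;

class DecimalBoundConversion {
  // e.g. unscaled = 350, scale = 2  ->  3.50
  static BigDecimal fromUnscaledLong(long unscaled, int scale) {
    return BigDecimal.valueOf(unscaled, scale);
  }
}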

overwrite SaveMode support for Spark write API

Currently, the overwrite SaveMode fails with "Save mode overwrite is not supported".

Overwrite is a very common use case for ETLs, and it is also needed to implement compaction with Iceberg.
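
For reference, this is the kind of call that currently fails and that this issue would enable (a sketch; the table location parameter is illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

class OverwriteExample {
  static void overwrite(Dataset<Row> df, String tableLocation) {
    df.write()
        .format("iceberg")
        .mode(SaveMode.Overwrite) // equivalently .mode("overwrite")
        .save(tableLocation);
  }
}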

Encryption in Data Files

We want to support encrypting and decrypting data that is recorded in Iceberg tables. There are several API extensions that we can consider to make this work:

  • Define a KeyReference field, which is a byte blob in the DataFile object. A KeyReference is a pointer to a key.
  • Define an EncryptionKey, which is a composition of the key bytes, the IV, and the key algorithm (see e.g. here and here):
interface EncryptionKey {
    byte[] encodedKey();
    String keyAlgorithm();
    byte[] iv();
}
  • Define a KeyManager which manages creating new keys and retrieving keys based on key references. The TableOperations API should support returning an Optional<KeyManager>, returning Optional.empty() if the table operations implementation doesn't support encryption.
interface CreatedKey {
    EncryptionKey key();
    byte[] keyReference();
}

interface KeyManager {
    CreatedKey createKey(String pathToEncrypt);
    EncryptionKey getKey(byte[] reference);
}

Additional Metrics and Statistics

I think Iceberg can benefit a lot from metrics in the data_file, like the deprecated distinct_counts metric, and from additional metrics like mean, standard deviation, skewness, etc.

This would allow smarter guarding when iterating over the partition spec, to avoid committing a problematic data_file that is not on par with expectations to the new snapshot.

Custom metadata in data files

(Migrated from Netflix/iceberg#106 with some extra details added)

It would be useful for consumers of Iceberg tables to be able to specify additional metadata in data files that enable them to know how to read the files. Some examples of custom metadata include:

  • Encryption keys required to read the file,
  • Compression codecs specified on the file without needing to have a specific file extension,
  • Metadata that's specific to a custom file format. Suppose we supported CSV tables in Iceberg down the road. It would be nice to attach the column delimiter on a per-file basis, so that a table can be composed of multiple files that are not necessarily uniform in exact layout but have compatible schemas.

The custom metadata field should be of type Map<String, String> and can be an optional column.

Finally, consider the I/O submodule proposed in #12. In FileIO there, the newOutputFile API should also return custom metadata specific to reading that file after it's written. Thus FileIO#newOutputFile should return a struct containing an OutputFile object for writing the bytes and a Map<String, String> collection of metadata to be saved in the manifest after the data file is written. Similarly, FileIO#newInputFile should also pass custom metadata for reading that file that can be interpreted by the specific implementation of FileIO.
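
A sketch of the proposed return type for FileIO#newOutputFile is shown below; the CreatedOutputFile name is illustrative, and OutputFile refers to the existing Iceberg interface (the import package may differ depending on the Iceberg version).

import java.util.Map;
import org.apache.iceberg.io.OutputFile;

interface CreatedOutputFile {
  // where the data bytes are written
  OutputFile file();

  // custom metadata to record in the manifest alongside the DataFile entry
  Map<String, String> customMetadata();
}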

Spark based functional test-cases

On the Google group, I saw many users ask about how to use Iceberg (the data source, the Table API, etc.). I think functional test cases would be useful both for testing and for helping new users understand how to use Iceberg in different scenarios, so I wrote a few functional test cases that exercise Spark with the Iceberg data source and Table APIs.

In general flow is like this:

  1. read a local resource file.
  2. convert it to an RDD by running a mapper function that converts epoch timestamps to day values.
  3. write the RDD/DF to a local folder as Parquet data partitioned by event_date.
  4. create or get a local Iceberg table.
  5. add the partitions/files/RDD to the Iceberg table using Iceberg's Table APIs / data source.
  6. read the data from Parquet.
  7. read the data from the Iceberg table.
  8. compare the results.

Add an API to maintain external schema mappings

Once Iceberg supports external schema mappings (#40), it should also support an easy way to maintain those mappings by notifying Iceberg when an external schema changes. Iceberg would update its mapping when notified.

For example, starting with this mapping:

[ {"field-id": 1, "names": ["id"]},
  {"field-id": 2, "names": ["data"]} ]

Consider a new Avro schema registered that changes the name id to obj_id and adds a ts field. Iceberg would add an un-mapped entry for ts and add obj_id to the id mapping based on the Avro schema's field alias that indicates id and obj_id are the same field. The updated mapping would be:

[ {"field-id": 1, "names": ["obj_id", "id"]},
  {"field-id": 2, "names": ["data"]},
  {"names": ["ts"]} ]

Next, if the Iceberg table schema is updated to add ts, the mapping would be updated by matching the new Iceberg column to the unmatched mapping entry to produce this mapping:

[ {"field-id": 1, "names": ["obj_id", "id"]},
  {"field-id": 2, "names": ["data"]},
  {"field-id": 3, "names": ["ts"]} ]

This would maintain compatibility with new Avro data files without making changes to the Iceberg table other than the mapping. Columns can be added in Iceberg or Avro first and the mapping is completed by column name when it is added in both schemas.
