
Applied patches (more or less exact):

  • databricks#117: support for (more) complex unions (still limited, though)
  • databricks#89: GenericRecord to Row conversion
  • https://github.com/databricks/spark-avro/pull/132/files: expose the converter
  • databricks#130: fixes for ARRAY conversion
  • databricks#73: use namespaces before structs

Disclaimer: this is a work in progress; it is a temporary series of fixes and extensions while waiting for Spark 2.0 and the Dataset API.

Avro Data Source for Apache Spark

A library for reading and writing Avro data from Spark SQL.


Requirements

This documentation is for Spark 1.4+.

This library has different versions for Spark 1.2, 1.3, and 1.4+:

Spark Version Compatible version of Avro Data Source for Spark
1.2 0.2.0
1.3 1.0.0
1.4+ 2.0.1

Linking

You can link against this library (for Spark 1.4+) in your program at the following coordinates:

Using SBT:

libraryDependencies += "com.databricks" %% "spark-avro" % "2.0.1"

Using Maven:

<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>2.0.1</version>
</dependency>

This library can also be added to Spark jobs launched through spark-shell or spark-submit by using the --packages command line option. For example, to include it when starting the Spark shell:

$ bin/spark-shell --packages com.databricks:spark-avro_2.10:2.0.1

Unlike using --jars, using --packages ensures that this library and its dependencies will be added to the classpath. The --packages argument can also be used with bin/spark-submit.
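For example, a hypothetical job submission might look like the following (the application class and jar names are placeholders, not part of this library):

# com.example.MyApp and my-app.jar below are placeholders for your own application
$ bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \
    --class com.example.MyApp my-app.jar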

This library is cross-published for Scala 2.11, so 2.11 users should replace 2.10 with 2.11 in the commands listed above.
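For example, on Scala 2.11 the Spark shell command above becomes:

$ bin/spark-shell --packages com.databricks:spark-avro_2.11:2.0.1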

Features

Avro Data Source for Spark supports reading and writing of Avro data from Spark SQL.

  • Automatic schema conversion: It supports most conversions between Spark SQL and Avro records, making Avro a first-class citizen in Spark.
  • Partitioning: This library allows developers to easily read and write partitioned data without any extra configuration. Simply pass the columns you want to partition on, just as you would for Parquet.
  • Compression: You can specify the type of compression to use when writing Avro out to disk. The supported types are uncompressed, snappy, and deflate. You can also specify the deflate level.
  • Specifying record names: You can specify the record name and namespace to use by passing a map of parameters with recordName and recordNamespace.

Supported types for Avro -> Spark SQL conversion

This library supports reading all Avro types, with the exception of complex union types. It uses the following mapping from Avro types to Spark SQL types:

Avro type Spark SQL type
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
bytes BinaryType
string StringType
record StructType
enum StringType
array ArrayType
map MapType
fixed BinaryType

In addition to the types listed above, it supports reading three kinds of union types:

  1. union(int, long)
  2. union(float, double)
  3. union(something, null), where something is one of the supported Avro types listed above or is one of the supported union types.

At the moment, it ignores docs, aliases and other properties present in the Avro file.
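As a quick way to see how unions are mapped, you can read a file and print the inferred Spark SQL schema; a field declared as union(something, null) surfaces as a nullable column of the mapped type. This is a minimal sketch using the episodes.avro sample file from this repository's test resources:

import com.databricks.spark.avro._
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Fields declared in Avro as union(<type>, null) appear as nullable columns of the mapped type
val df = sqlContext.read.avro("src/test/resources/episodes.avro")
df.printSchema()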

Supported types for Spark SQL -> Avro conversion

This library supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases which are listed below:

Spark SQL type Avro type
ByteType int
ShortType int
DecimalType string
BinaryType bytes
TimestampType long
StructType record
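For example, a DataFrame with a TimestampType column is written out with that column encoded as an Avro long. This is a minimal sketch; the column names and output path below are arbitrary illustrations:

import com.databricks.spark.avro._
import org.apache.spark.sql.SQLContext
import java.sql.Timestamp

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Illustrative data: the TimestampType column "created_at" is stored as an Avro long
val df = Seq(
  ("ep1", new Timestamp(0L)),
  ("ep2", new Timestamp(1000L)))
  .toDF("id", "created_at")

df.write.avro("/tmp/output-timestamps")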

Examples

The recommended way to read or write Avro data from Spark SQL is by using Spark's DataFrame APIs, which are available in Scala, Java, Python, and R.

These examples use the episodes.avro file, which is included in this repository under src/test/resources.

Scala API

// import needed for the .avro method to be added
import com.databricks.spark.avro._
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// The Avro records get converted to Spark types, filtered, and
// then written back out as Avro records
val df = sqlContext.read.avro("src/test/resources/episodes.avro")
df.filter("doctor > 5").write.avro("/tmp/output")

Alternatively, you can specify the format explicitly:

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
	.format("com.databricks.spark.avro")
	.load("src/test/resources/episodes.avro")
	
df.filter("doctor > 5").write
	.format("com.databricks.spark.avro")
	.save("/tmp/output")

You can also specify Avro compression options:

import com.databricks.spark.avro._
val sqlContext = new SQLContext(sc)

// configuration to use deflate compression
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

val df = sqlContext.read.avro("src/test/resources/episodes.avro")

// writes out compressed Avro records
df.write.avro("/tmp/output")

You can write partitioned Avro records like this:

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq((2012, 8, "Batman", 9.8),
	(2012, 8, "Hero", 8.7),
	(2012, 7, "Robot", 5.5),
	(2011, 7, "Git", 2.0))
	.toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("/tmp/output")

You can specify the record name and namespace like this:

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.avro("src/test/resources/episodes.avro")

val name = "AvroTest"
val namespace = "com.databricks.spark.avro"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).avro("/tmp/output")

Java API

import org.apache.spark.sql.*;

SQLContext sqlContext = new SQLContext(sc);

// Creates a DataFrame from a specified file
DataFrame df = sqlContext.read().format("com.databricks.spark.avro")
    .load("src/test/resources/episodes.avro");

// Saves the subset of the Avro records read in
df.filter($"age > 5").write()
	.format("com.databricks.spark.avro")
	.save("/tmp/output");

Python API

# Creates a DataFrame from a specified directory
df = sqlContext.read.format("com.databricks.spark.avro").load("src/test/resources/episodes.avro")

# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("com.databricks.spark.avro").save("output")

SQL API

Avro data can be queried in pure SQL by registering the data as a temporary table.

CREATE TEMPORARY TABLE episodes
USING com.databricks.spark.avro
OPTIONS (path "src/test/resources/episodes.avro")
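Once registered, the temporary table can be queried like any other table, for example:

SELECT * FROM episodes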

Building From Source

This library is built with SBT, which is automatically downloaded by the included shell script. To build a JAR file simply run build/sbt package from the project root.

Testing

To run the tests, run build/sbt test. If you are working on performance improvements, you can generate a sample Avro file and measure how long it takes to read it using the following commands:

build/sbt "test:run-main com.databricks.spark.avro.AvroFileGenerator NUMBER_OF_RECORDS NUMBER_OF_FILES"

will create sample avro files in target/avroForBenchmark/. You can specify the number of records for each file, as well as the overall number of files.

build/sbt "test:run-main com.databricks.spark.avro.AvroReadBenchmark"

runs count() on the data inside target/avroForBenchmark/ and tells you how long the operation took.

Similarly, you can benchmark how long it takes to write a DataFrame as an Avro file with

build/sbt "test:run-main com.databricks.spark.avro.AvroWriteBenchmark NUMBER_OF_ROWS"

where NUMBER_OF_ROWS is an optional parameter that specifies the number of rows in the DataFrame that will be written.
