
🐎 corral

Serverless MapReduce


Corral is a MapReduce framework designed to be deployed to serverless platforms, like AWS Lambda. It presents a lightweight alternative to Hadoop MapReduce. Much of the design philosophy was inspired by Yelp's mrjob -- corral retains mrjob's ease-of-use while gaining the type safety and speed of Go.

Corral's runtime model consists of stateless, transient executors controlled by a central driver. Currently, the best environment for deployment is AWS Lambda, but corral is modular enough that support for other serverless platforms can be added as support for Go in cloud functions improves.

Corral is best suited for data-intensive but computationally inexpensive tasks, such as ETL jobs.

More details about corral's internals can be found in this blog post.


Examples

Every good MapReduce framework needs a WordCount™ example. Here's how to write a "word count" in corral:

package main

import (
	"strconv"
	"strings"

	"github.com/bcongdon/corral"
)

type wordCount struct{}

func (w wordCount) Map(key, value string, emitter corral.Emitter) {
	for _, word := range strings.Fields(value) {
		emitter.Emit(word, "")
	}
}

func (w wordCount) Reduce(key string, values corral.ValueIterator, emitter corral.Emitter) {
	count := 0
	for range values.Iter() {
		count++
	}
	emitter.Emit(key, strconv.Itoa(count))
}

func main() {
	wc := wordCount{}
	job := corral.NewJob(wc, wc)

	driver := corral.NewDriver(job)
	driver.Main()
}

This can be invoked locally by building/running the above source and adding input files as arguments:

go run word_count.go /path/to/some_file.txt

By default, job output will be stored relative to the current directory.

Corral can also read input from and write output to S3 by pointing at S3 buckets/files:

go run word_count.go --out s3://my-output-bucket/ s3://my-input-bucket/*

More comprehensive examples can be found in the examples folder.

Deploying in Lambda

No formal deployment step is needed to deploy a corral application to Lambda. Instead, add the --lambda flag to an invocation of a corral app, and the project code will be automatically recompiled for Lambda and uploaded.

For example,

./word_count --lambda s3://my-input-bucket/* --out s3://my-output-bucket

Note that you must use S3 for input/output paths, as local data files will not be present in the Lambda environment.

NOTE: Because corral recompiles application code to target Lambda, the command with the --lambda flag must be invoked from the root directory of your application's source code.

AWS Credentials

AWS credentials are automatically loaded from the environment. See this page for details.

As per the AWS documentation, AWS credentials are loaded in order from:

  1. Environment variables
  2. Shared credentials file
  3. IAM role (if executing in AWS Lambda or EC2)

In short, set up credentials in .aws/credentials as one would for any other AWS-powered service. If you have more than one profile in .aws/credentials, make sure to set the AWS_PROFILE environment variable to select the profile to be used.

Configuration

There are a number of ways to specify configuration for corral applications. To hard-code configuration, there are a variety of Options that may be used when instantiating a Job.

Configuration values are resolved in the following order, with priority given to the first source in which a value is set:

  1. Hard-coded job Options.
  2. Command line flags
  3. Environment variables
  4. Configuration file
  5. Default values
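
This "first source that sets a value wins" lookup can be sketched as follows; this is an illustration of the precedence rule, not corral's actual implementation:

```go
package main

import "fmt"

// resolve returns the first non-empty value, mirroring corral's
// precedence: job Options > CLI flags > env vars > config file > defaults.
func resolve(sources ...string) string {
	for _, s := range sources {
		if s != "" {
			return s
		}
	}
	return ""
}

func main() {
	// No Option or flag is set, so the env var value wins over the default.
	opt, flag, env, file, def := "", "", "my-func", "", "corral_function"
	fmt.Println(resolve(opt, flag, env, file, def)) // my-func
}
```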

Configuration Settings

Below are the config settings that may be changed.

Framework Settings

  • splitSize (int64) - The maximum size (in bytes) of any single file input split. (Default: 100 MB)
  • mapBinSize (int64) - The maximum size (in bytes) of the combined input to a mapper. (Default: 512 MB)
  • reduceBinSize (int64) - The maximum size (in bytes) of the combined input to a reducer. This is an "expected" maximum, assuming uniform key distribution. (Default: 512 MB)
  • maxConcurrency (int) - The maximum number of executors (local, Lambda, or otherwise) that may run concurrently. (Default: 100)
  • workingLocation (string) - The location (local or S3) to use for writing intermediate and output data.
  • verbose (bool) - Enables debug logging if set to true

Lambda Settings

  • lambdaFunctionName (string) - The name to use for created Lambda functions. (Default: corral_function)
  • lambdaManageRole (bool) - Whether corral should manage creating an IAM role for Lambda execution. (Default: true)
  • lambdaRoleARN (string) - If lambdaManageRole is disabled, the ARN specified in lambdaRoleARN is used as the Lambda function's executor role.
  • lambdaTimeout (int64) - The timeout (maximum function duration) in seconds of created Lambda functions. See AWS lambda docs for details. (Default: 180)
  • lambdaMemory (int64) - The maximum memory that a Lambda function may use. See AWS lambda docs for details. (Default: 1500)

Command Line Flags

The following flags are available at runtime as command-line flags:

      --lambda            Use lambda backend
      --memprofile file   Write memory profile to file
  -o, --out directory     Output directory (can be local or in S3)
      --undeploy          Undeploy the Lambda function and IAM permissions without running the driver
  -v, --verbose           Output verbose logs

Environment Variables

Corral leverages Viper for specifying config. Any of the above configuration settings can be set as environment variables by upper-casing the setting name, and prepending CORRAL_.

For example, lambdaFunctionName can be configured using an env var by setting CORRAL_LAMBDAFUNCTIONNAME.
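
The mapping from setting name to environment variable is mechanical, so it can be expressed as a one-line helper (a sketch of the naming rule; corral derives this through Viper):

```go
package main

import (
	"fmt"
	"strings"
)

// envVarName converts a corral setting name into the environment
// variable that configures it: upper-case the name and prepend CORRAL_.
func envVarName(setting string) string {
	return "CORRAL_" + strings.ToUpper(setting)
}

func main() {
	fmt.Println(envVarName("lambdaFunctionName")) // CORRAL_LAMBDAFUNCTIONNAME
	fmt.Println(envVarName("splitSize"))          // CORRAL_SPLITSIZE
}
```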

Config Files

Corral will read settings from a file called corralrc. Corral checks to see if this file exists in the current directory (.). It can also read global settings from $HOME/.corral/corralrc.

Reference the "Configuration Settings" section for the configuration keys that may be used.

Config files can be in JSON, YAML, or TOML format. See Viper for more details.
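
As an illustration, a YAML corralrc that adjusts job sizing and enables debug logging might look like this (all values, including the bucket name, are hypothetical):

```yaml
# corralrc (YAML) -- illustrative values only
splitSize: 104857600    # 100 MB
mapBinSize: 536870912   # 512 MB
maxConcurrency: 50
workingLocation: s3://my-intermediate-bucket/
verbose: true
```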

Architecture

Below is a high-level diagram describing the MapReduce architecture corral uses.

Input Files / Splits

Input files are split byte-wise into contiguous chunks of maximum size splitSize. These splits are packed into "input bins" of maximum size mapBinSize. The bin packing algorithm tries to assign contiguous chunks of a single file to the same mapper, but this behavior is not guaranteed.

There is a one-to-one correspondence between an "input bin" and the data that a mapper reads; i.e., each mapper is assigned to process exactly 1 input bin. For jobs that run on Lambda, you should tune mapBinSize, splitSize, and lambdaTimeout accordingly so that mappers are able to process their entire input before timing out.

Input data is streamed into the mapper, so the entire input data needn't fit in memory.

Mappers

Input data is fed into the map function line-by-line. Input splits are calculated byte-wise, but this is rectified during the Map phase into a logical split "by line" (to prevent partial reads, or the loss of records that span input splits).

Mappers may maintain state if desired (though not encouraged).

Partition / Shuffle

Key/value pairs emitted during the map stage are written to intermediate files. Keys are partitioned into one of N buckets, where N is the number of reducers. As a result, each mapper may write to as many as N separate files.

This results in a set of files labeled map-binX-Y where X is a number between 0 and N-1, and Y is the mapper's ID (a number between 0 and the number of mappers).
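
A typical way to implement such a partitioner is to hash the key modulo the number of reducers. This sketch uses FNV-1a; corral's actual hash function may differ:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partition assigns a key to one of n reducer buckets by hashing it.
// The same key always lands in the same bucket, so all of a key's
// values reach a single reducer.
func partition(key string, n uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % n
}

func main() {
	// With N=4 reducers, each key maps to a file map-binX-Y (X = bucket).
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%s -> bucket %d\n", k, partition(k, 4))
	}
}
```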

Reducers / Output

Currently, reducer input must fit in memory. This is because keys are only partitioned, not sorted; the reducer groups values by key in memory.

Reducers receive per-key values in an arbitrary order. It is guaranteed that all values for a given key will be provided in a single call to Reduce for that key.

Values emitted from a reducer will be stored in tab separated format (i.e. KEY\tVALUE) in files labeled output-X where X is the reducer's ID (a number between 0 and the number of reducers).
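
Downstream tooling can consume these output files with a simple line-by-line parse of the KEY\tVALUE format (the sample data below is illustrative):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseOutput reads reducer output in corral's tab-separated
// KEY\tVALUE format and returns the key/value pairs in order.
func parseOutput(output string) [][2]string {
	var pairs [][2]string
	sc := bufio.NewScanner(strings.NewReader(output))
	for sc.Scan() {
		kv := strings.SplitN(sc.Text(), "\t", 2)
		pairs = append(pairs, [2]string{kv[0], kv[1]})
	}
	return pairs
}

func main() {
	// e.g. two lines from a hypothetical output-0 file
	for _, p := range parseOutput("the\t12\nquick\t3\n") {
		fmt.Printf("key=%q count=%s\n", p[0], p[1])
	}
}
```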

Reducers may maintain state if desired (though not encouraged).

Contributing

Contributions to corral are more than welcome! In general, the preference is to discuss potential changes in the issues before changes are made.

More information is included in CONTRIBUTING.md.

Running Tests

To run tests, run the following command in the root project directory:

go test ./...

Note that some tests (i.e. the tests of corfs) require AWS credentials to be present.

The main corral repository has TravisCI set up. If you fork this repo, you can enable TravisCI on your fork. You will need to set the following environment variables for all the tests to work:

  • AWS_ACCESS_KEY_ID: Credentials access key
  • AWS_SECRET_ACCESS_KEY: Credentials secret key
  • AWS_DEFAULT_REGION: Region to use for S3 tests
  • AWS_TEST_BUCKET: The S3 bucket to use for tests (just the name; i.e. testBucket instead of s3://testBucket)

License

This project is licensed under the MIT License - see the LICENSE file for details

Previous Work / Attributions

  • lambda-refarch-mapreduce - Python/Node.JS reference MapReduce Architecture
    • Uses a "recursive" style reducer instead of parallel reducers
    • Requires that all reducer output can fit in memory of a single lambda function
  • mrjob
    • Excellent Python library for writing MapReduce jobs for Hadoop, EMR/Dataproc, and others
  • dmrgo
    • mrjob-inspired Go MapReduce library
  • Zappa
    • Serverless Python toolkit. Inspired much of the way that corral does automatic Lambda deployment
  • Logo: Fence by Vitaliy Gorbachev from the Noun Project


corral's Issues

Deployment in Lambda

Hello,
Greetings of the day. First of all, I would like to thank you for sharing your marvelous work. It amazed me, and I have just started exploring it.

However, I am facing issues deploying it in Lambda.

I have cloned the source code to my local system. I am also able to upload word_count.go to an S3 bucket.

But the command to deploy it in Lambda is not working for me. I am trying to run the command from my source code directory.
Could you please advise exactly what command I should use on my system?

Thanks in advance.

Buffered writes, local memory reductions, safer input splits

Nice project! I may judiciously emulate this in C++ with Apache Arrow support :)

  1. To avoid write thrashing, you may want to add an optional buffer to batch writes:

CORRAL_MAP_BUFFER_SIZE=0 by default?

  2. If reduce() is a monoid that actually reduces space, you can run a reduce() before writing out to the global map() location:

CORRAL_LOCAL_REDUCE=0 by default?

  3. The input split function needs to be safe in some domains, like WordCount, where you don't want to split in the middle of a word. I'd support passing a simple tokenizer where mappers would read an overlap of K bytes.

For really nasty grammars you can't do it in parallel; for context-free grammars you can do Valiant '75 parsing via parallel matrix multiplication, and for simpler grammars like parenthesis matching you just need https://en.wikipedia.org/wiki/All_nearest_smaller_values .

Add a post-Map "Combiner" step

An analog of Hadoop's "Combiner" would reduce the amount of data that needs to be written to the global map output for certain workloads.

The combiner should have the same interface as the reducer, but it would need to run in the mapper before any output is written.

Word count not counting the whole data

I'm running the word count program over an 86GB dataset. The data is UTF-8, already sanitized with newlines and spaces. I already know that the total is around 29000M words, but the resulting output of the word count program sums to just 86M words. Also, the logs are full of "too many requests" errors.

How can I debug why the program is not reading the whole input? Is it caused by those "too many requests" errors? Is there any workaround? Thanks

Add flag for skipping lambda deployment

As discussed in #6, it might be useful to have a flag that skips the compilation/deployment step for Lambda invocations.

Pros:

  • If you have a really complicated job (code-wise), it may be annoying/unnecessary to recompile for each invocation

Cons:

  • Could pretty easily be a footgun. (i.e. you make code changes that aren't deployed)

This shouldn't be difficult to implement -- just needs a new CLI flag and something in the lambda deployment process that skips compilation/upload under the presence of the flag.

Allow arbitrary record split functions

Is it worth adding the ability to change the record split function?

Currently records are split by newline, and any split function would have to conform to the bufio.SplitFunc signature, so I'm not sure about the utility of this

Passing a value to map functions

Hi,

Your framework is extremely useful for parallel processing. I have been exploring it and using it.

But I was wondering: is there a way to pass some value to every map function?

Suppose I have a large file that contains hundreds of thousands of words.

Now I want to pass an input word and check whether that word exists in the file and, if so, what its count is.
In this case I need a way to pass that input word to the map functions.

ETL example

You mention ETL as a common use case - could you add an example that does something like ETL?

Fail to achieve concurrency

masterFile.txt

Above is the input file for the map. I was hoping that it would get split per line, meaning there would be 10 Lambda functions running concurrently, since the input file has 10 lines.

But it doesn't. Instead, only a single Lambda function runs, which processes each line sequentially. Hence it fails to achieve concurrency.

Here is my code:

func main() {
	job := corral.NewJob(wordCount{}, wordCount{})

	options := []corral.Option{
		corral.WithSplitSize(12),
		corral.WithMapBinSize(12),
	}

	driver := corral.NewDriver(job, options...)
	driver.Main()
}

It does show multiple maps (e.g. 1/10, 2/10, etc.) while I execute it locally. But when I deploy it to AWS Lambda, it only executes 1 map/function.

Any kind of help would be really appreciated.
