
go-bqstreamer's Introduction

Kik and I (@oryband) are no longer maintaining this repository. Thanks for all the contributions. You are welcome to fork it and continue development.

BigQuery Streamer

Stream insert data into BigQuery fast and concurrently, using InsertAll().

Features

  • Insert rows from multiple tables, datasets, and projects, and insert them in bulk. There is no need to manage data structures or sort rows by table - bqstreamer does it for you.
  • Multiple background workers (i.e. goroutines) to enqueue and insert rows.
  • Inserts can be performed synchronously (blocking) or in the background (asynchronously).
  • Perform insert operations in predefined batch sizes, in accordance with BigQuery's quota policy.
  • Handle and retry BigQuery server errors.
  • Backoff interval between failed insert operations.
  • Error reporting.
  • Production ready, thoroughly tested, and documented. We - at Rounds (now acquired by Kik) - use it in our data gathering workflow.

Getting Started

  1. Install Go, version 1.5 or newer.
  2. Download the package and its dependencies:
       • Version v2: go get gopkg.in/kikinteractive/go-bqstreamer.v2
       • Version v1: go get gopkg.in/kikinteractive/go-bqstreamer.v1
  3. Acquire Google OAuth2/JWT credentials so you can authenticate with BigQuery.

How Does It Work?

There are two types of inserters you can use:

  1. SyncWorker
       • A single, blocking (synchronous) worker.
       • It enqueues rows and performs insert operations in a blocking manner.
  2. AsyncWorkerGroup
       • Employs multiple background SyncWorkers.
       • It enqueues rows, and its background workers pull and insert them in a fan-out model.
       • Each background worker executes an insert operation once a row-amount or time threshold is reached.
       • Errors are reported to an error channel for processing by the user.
       • This provides higher insert throughput for larger-scale scenarios.

Examples

Check the GoDoc examples section.

Contribute

  1. Please check the issues page.
  2. File new bugs and ask for improvements.
  3. Pull requests welcome!

Test

# Run unit tests and check coverage.
$ make test

# Run integration tests.
# This requires an active project, dataset and pem key.
$ export BQSTREAMER_PROJECT=my-project
$ export BQSTREAMER_DATASET=my-dataset
$ export BQSTREAMER_TABLE=my-table
$ export BQSTREAMER_KEY=my-key.json
$ make testintegration

go-bqstreamer's People

Contributors

avivl, nightlyone, oryband


go-bqstreamer's Issues

How to know when all inserts are done

Hi, first of all, amazing library.

I have been working with it for a few days and I have seen that MultiStreamer.Stop is not synchronous, which means rows may not have been inserted by the time the program finishes.

So, is there a way to know when all rows have been inserted? Right now I have a dirty patch to make Stop sync but I'd like to know if there is an official way to do this or is planned to be implemented.

Cheers!

Consider closing error channel as signal to stop

Thanks for the library, guys! I found it tonight and it's just about exactly what I'm looking for, though I ran into a couple of issues while trying to use it that I thought I'd ask about before getting too much deeper.

The first is that it doesn't seem possible to know, after calling stop(), that the queue has been flushed. This is because the errors channel is never closed, so I assume clients are just supposed to wait some reasonable amount of time after calling stop?

I also wonder whether there's value in providing two different interfaces (streamer and multi-streamer). It seems like all you really need is 1 "producer" and N "consumers", and the single streamer is just the N=1 case. I ran into this because I was trying to send to a streamer from multiple goroutines, which requires me to do my own synchronization around the streamer. I finally saw the note that you "probably shouldn't use streamer", but that got me wondering whether both of these issues could be addressed by changing the interface to always take a "numWorkers" param, letting the client close the requests channel as a stop signal, and then iterating over the errors channel to know that the queue is drained.

Not sure if that makes any sense (it's past my bedtime and my brain is fried!) but I've tried to lay it out in code here:
http://play.golang.org/p/qBJ5nofG39

Though I obviously understand if you think that deviates too far from your intended use-cases -- just thought I'd float the idea. Thanks anyway -- this looks very useful!
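The proposed interface, sketched generically (startWorkers and the row/error handling here are invented for this example, not the library's API): the caller closes the rows channel as the stop signal, then ranges over the error channel, which is closed only after every worker has drained the queue.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// startWorkers launches numWorkers consumers that read rows from in and
// report failures on the returned error channel. The error channel is
// closed once all workers have drained in, so the caller can range over
// it to know the queue is flushed.
func startWorkers(in <-chan string, numWorkers int) <-chan error {
	errs := make(chan error)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for row := range in {
				if row == "" { // pretend empty rows fail to insert
					errs <- errors.New("empty row")
				}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(errs) // all workers done: queue is drained
	}()
	return errs
}

func main() {
	rows := make(chan string, 8)
	errs := startWorkers(rows, 4)
	for _, r := range []string{"a", "", "b"} {
		rows <- r
	}
	close(rows) // stop signal: no more rows
	n := 0
	for err := range errs { // loop ends only once the queue is drained
		_ = err
		n++
	}
	fmt.Println("errors:", n) // prints "errors: 1"
}
```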

Question: AsyncWorkerGroup.Close()

Hello,

I was wondering how this function makes sure that all remaining messages are flushed. When I send a batch of messages via workerGroup.Enqueue(), and then close the workerGroup, it seems that some messages do not get sent. When the main thread sleeps for a while, the messages do get sent.
Could it be that, because rowChan is a buffered channel, all workers stop while some messages are still buffered in rowChan?

See https://github.com/rounds/go-bqstreamer/blob/master/async_worker_group.go#L76
and https://github.com/rounds/go-bqstreamer/blob/master/async_worker.go#L47
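A generic illustration of the concern (names such as rowChan mirror the issue, not the library's internals): if a worker is stopped by a separate done signal, it should first empty anything still buffered in the channel before returning, otherwise buffered messages are lost.

```go
package main

import "fmt"

// drainWorker forwards rows from rowChan to got. On the done signal it
// first drains anything still buffered in rowChan before returning, so
// enqueued rows are not dropped.
func drainWorker(rowChan <-chan int, done <-chan struct{}, got chan<- int) {
	defer close(got)
	for {
		select {
		case <-done:
			for { // drain remaining buffered rows
				select {
				case r := <-rowChan:
					got <- r
				default:
					return
				}
			}
		case r := <-rowChan:
			got <- r
		}
	}
}

func main() {
	rowChan := make(chan int, 10)
	done := make(chan struct{})
	got := make(chan int)
	go drainWorker(rowChan, done, got)

	for i := 1; i <= 3; i++ {
		rowChan <- i // buffered sends: return immediately
	}
	close(done) // stop before the worker may have read anything

	n := 0
	for range got {
		n++
	}
	fmt.Println("delivered", n) // prints "delivered 3"
}
```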

Getting queue length

At the moment, rows is not exported, so I can't call len() on it. I don't mind implementing this myself, but would you prefer it implemented as an interface, or to have rows exported?
