
# Reactive Streams for Nakadi


Reactive-nakadi is a Reactive Streams wrapper for Nakadi, inspired by Reactive Kafka. It is built on Akka, Akka HTTP, and Akka Streams, and communicates with Nakadi's low-level API. It acts as a consumer client for Nakadi, providing an interface for consuming and publishing events while making use of Akka Streams' backpressure. It also provides offset checkpointing, and will later provide high-level lease management across multiple partitions.

An important point to note is that this library can only be used in an Akka environment. It currently uses Akka 2.4.2. Reactive-nakadi is not yet compatible with Play 2.5.x for various backwards-compatibility reasons, so for now it only works with Play 2.4.x.

Note: This project is still under heavy development, and releases following the initial one may not be fully backwards compatible. The upcoming release is only an alpha.

## Getting Started

### Installation

Available in Maven Central for Scala 2.11:

```scala
libraryDependencies += "org.zalando.reactivenakadi" %% "reactive-nakadi-core" % "0.0.07"
```

### Usage

Below are some examples of how to use reactive-nakadi. They assume that you have imported, and have in scope, implicit instances of ActorSystem and ActorMaterializer, e.g.:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val system = ActorSystem("reactive-nakadi")
implicit val materializer = ActorMaterializer()
```

#### Consuming Messages from Nakadi

This example shows the simplest use case: a consumer that consumes events from Nakadi:

```scala
import org.reactivestreams.Publisher
import akka.stream.scaladsl.{ Sink, Source }

import org.zalando.react.nakadi.properties._
import org.zalando.react.nakadi.ReactiveNakadi
import org.zalando.react.nakadi.commit.handlers.aws.DynamoDBCommitManager
import org.zalando.react.nakadi.NakadiMessages.StringConsumerMessage

val nakadi = new ReactiveNakadi()
val server = ServerProperties(
  host = "192.168.99.100", port = 8080, isConnectionSSL = false, acceptAnyCertificate = true
)

val publisher: Publisher[StringConsumerMessage] = nakadi.consume(ConsumerProperties(
  serverProperties = server,
  tokenProvider = None,
  eventType = "my-streaming-event-type",
  groupId = "experimental-group",
  partition = "0",
  commitHandler = DynamoDBCommitManager(system, CommitProperties.apply),
  offset = None  // if left empty, reading resumes from the last commit
))

Source
  .fromPublisher(publisher)
  .map(someProcessingOfMessage)  // your own processing stage
  .to(Sink.ignore)
  .run()
```

From the above example you will notice a couple of things. The first is the optional tokenProvider. Set it to some form of callable, such as `val tokenProvider = Option(() => "my-bearer-token")`.
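For instance, a token provider might read an OAuth2 token from the environment. This is only a sketch; NAKADI_TOKEN is a hypothetical variable name, and any zero-argument () => String will do:

```scala
// Hypothetical sketch: fetch the bearer token from an environment variable.
val tokenProvider: Option[() => String] =
  Option(() => sys.env.getOrElse("NAKADI_TOKEN", sys.error("NAKADI_TOKEN not set")))
```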

The second is the commitHandler. This is required, and because the provided commit handler, DynamoDBCommitManager, relies on AWS DynamoDB, it is important that your application is authenticated with AWS. You can authenticate using the AWS environment variables or ~/.aws/credentials; take a look at the AWS documentation for more examples.
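For reference, a minimal ~/.aws/credentials file uses the standard AWS format (the values below are placeholders):

```ini
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
```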

The offset parameter is optional. If left empty, the consumer will try to read the latest commit from the commit handler; if no commits are available, it will read from the BEGIN offset in Nakadi. Alternatively you can set it to Option(Offset("120")) or Option(BeginOffset), both of which can be imported from org.zalando.react.nakadi.NakadiMessages.
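Concretely, the two explicit forms from the description above look like this:

```scala
import org.zalando.react.nakadi.NakadiMessages.{ BeginOffset, Offset }

val fromCursor = Option(Offset("120"))  // resume from a specific cursor
val fromBegin  = Option(BeginOffset)    // start from the beginning of the stream
```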

Finally, the partition value unfortunately needs to be hard-coded for now; this will be removed once the lease management feature is complete. Until then, you will need to create one publisher instance per partition, as sketched below.
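As a sketch, assuming a hypothetical event type with four partitions, fanning out means repeating the ConsumerProperties from the example above and varying only partition:

```scala
// Hypothetical: "my-streaming-event-type" has four partitions, "0" through "3".
val perPartitionPublishers = (0 until 4).map(_.toString).map { partitionId =>
  nakadi.consume(ConsumerProperties(
    serverProperties = server,
    tokenProvider = None,
    eventType = "my-streaming-event-type",
    groupId = "experimental-group",
    partition = partitionId,
    commitHandler = DynamoDBCommitManager(system, CommitProperties.apply),
    offset = None
  ))
}
```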

#### Publishing Messages to Nakadi

This example builds on the previous one: say we want to consume messages and then publish the resulting messages to another event type in Nakadi.

```scala
import org.reactivestreams.{ Publisher, Subscriber }
import akka.stream.scaladsl.{ Sink, Source }

import org.zalando.react.nakadi.properties._
import org.zalando.react.nakadi.ReactiveNakadi
import org.zalando.react.nakadi.commit.handlers.aws.DynamoDBCommitManager
import org.zalando.react.nakadi.NakadiMessages.{ StringConsumerMessage, StringProducerMessage }

val nakadi = new ReactiveNakadi()
val server = ServerProperties(
  host = "192.168.99.100", port = 8080, isConnectionSSL = false
)

val publisher: Publisher[StringConsumerMessage] = nakadi.consume(ConsumerProperties(
  serverProperties = server,
  tokenProvider = None,
  eventType = "my-streaming-event-type",
  groupId = "experimental-group",
  partition = "0",
  commitHandler = DynamoDBCommitManager(system, CommitProperties.apply),
  offset = None  // if left empty, reading resumes from the last commit
))

val subscriber: Subscriber[StringProducerMessage] = nakadi.publish(ProducerProperties(
  serverProperties = server,
  tokenProvider = None,
  eventType = "destination-event-type"
))

Source
  .fromPublisher(publisher)
  .map(someProcessingOfMessage)  // must map the consumed message to a StringProducerMessage
  .to(Sink.fromSubscriber(subscriber))
  .run()
```

#### Committing Offsets

Say we are consuming messages but want to keep track of where we are on the stream, i.e. keep track of the Nakadi offsets. Based on our first example, we have:

```scala
import akka.stream.scaladsl.Source

import org.zalando.react.nakadi.properties._
import org.zalando.react.nakadi.ReactiveNakadi
import org.zalando.react.nakadi.commit.handlers.aws.DynamoDBCommitManager
import org.zalando.react.nakadi.NakadiMessages.StringConsumerMessage

val nakadi = new ReactiveNakadi()
val server = ServerProperties(
  host = "192.168.99.100", port = 8080, isConnectionSSL = false
)

// consumeWithOffsetSink returns a wrapper exposing both the publisher and
// a sink for committing offsets, rather than a bare Publisher
val publisher = nakadi.consumeWithOffsetSink(ConsumerProperties(
  serverProperties = server,
  tokenProvider = None,
  eventType = "my-streaming-event-type",
  groupId = "experimental-group",
  partition = "0",
  commitHandler = DynamoDBCommitManager(system, CommitProperties.apply),
  offset = None  // if left empty, reading resumes from the last commit
))

Source
  .fromPublisher(publisher.publisher)
  .map(someProcessingOfMessage)  // must pass the consumed message through (see below)
  .to(publisher.offsetCommitSink)
  .run()
```

This will periodically checkpoint the offset to DynamoDB. By default the commit interval is 30 seconds, but this can be configured by setting commitInterval in the ConsumerProperties, as sketched below.
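A minimal sketch of overriding the interval, assuming commitInterval is exposed as a reactive-kafka-style builder method (check the ConsumerProperties source for the exact signature):

```scala
import scala.concurrent.duration._

// Assumption: commitInterval is a copy-style builder method on ConsumerProperties.
val tunedProperties = ConsumerProperties(
  serverProperties = server,
  tokenProvider = None,
  eventType = "my-streaming-event-type",
  groupId = "experimental-group",
  partition = "0",
  commitHandler = DynamoDBCommitManager(system, CommitProperties.apply),
  offset = None
).commitInterval(10.seconds)  // checkpoint every 10 seconds instead of every 30
```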

It is important to note that the StringConsumerMessage is passed all the way through the flow. In other words, in the above example someProcessingOfMessage must return the message so that it can then be picked up by the commit sink, for instance like the sketch below.
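A minimal pass-through implementation of someProcessingOfMessage (the name comes from the examples above; the body is illustrative):

```scala
import org.zalando.react.nakadi.NakadiMessages.StringConsumerMessage

def someProcessingOfMessage(message: StringConsumerMessage): StringConsumerMessage = {
  // do whatever per-event processing you need...
  message.events.foreach(event => println(s"processing: $event"))
  // ...then hand the original message on so offsetCommitSink can commit its offset
  message
}
```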

Reactive-nakadi takes care of creating the DynamoDB table if it does not exist. The table name follows the format reactive-nakadi-{event-type}-{groupId}; for the example above that would be reactive-nakadi-my-streaming-event-type-experimental-group. The table contains one row per partition, with the partition ID as the primary key.

#### Reading Events

Below is an example of reading events from Nakadi. An event can be read either as a DataChangeEvent or as a BusinessEvent:

```scala
// Assumption: DataChangeEvent/BusinessEvent live in the client models package;
// adjust the import to wherever your version exposes them.
import org.zalando.react.nakadi.client.models.DataChangeEvent
import org.zalando.react.nakadi.NakadiMessages.StringConsumerMessage

def read(message: StringConsumerMessage) = {
  message.events.foreach { event =>
    val dataChangeEvent = event.asInstanceOf[DataChangeEvent]  // can also be cast to BusinessEvent
    println(dataChangeEvent.data_type)
    println(dataChangeEvent.data_op)
    println(dataChangeEvent.data)
    println(dataChangeEvent.metadata)
  }
}
```

### Tuning

NakadiActorSubscriber and NakadiActorPublisher have their own thread pools, configured in reference.conf. You can tune them by overriding nakadi-publisher-dispatcher.thread-pool-executor and nakadi-subscriber-dispatcher.thread-pool-executor in your own configuration file, as sketched below. Alternatively, you can provide your own dispatcher name and pass it to the appropriate factory-method variants in ReactiveNakadi: publish(), producerActor(), producerActorProps() or consume(), consumerActor(), consumerActorProps().
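A sketch of such an override in application.conf, assuming the dispatcher paths named above and the standard Akka thread-pool-executor keys (the pool sizes are illustrative):

```hocon
nakadi-publisher-dispatcher {
  thread-pool-executor {
    core-pool-size-min = 2
    core-pool-size-max = 8
  }
}

nakadi-subscriber-dispatcher {
  thread-pool-executor {
    core-pool-size-min = 2
    core-pool-size-max = 8
  }
}
```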

## Development

If you are interested in experimenting with the source code, just clone the repo. To run the unit tests:

```bash
$ sbt clean test
```

To run the integration tests against Nakadi, you need to specify which IP address Docker is running on; on a Mac you can find this with docker-machine ls. Nakadi is cloned, and it and all of its dependencies are run in Docker. Two containers are started: local-storages, which contains PostgreSQL, Kafka, and Zookeeper, and Nakadi itself:

```bash
$ export DOCKER_IP=127.0.0.1
$ sbt clean it:test
```

## TODO

There is still a lot of work to be done, the most important piece being lease management. Some of the high-level outstanding tasks include:

  • Persistence of consumer Cursor. Nakadi plans to support consumer commits in later high level API versions
  • Read checkpoint from cursor commits and continue streaming events from that point for a given partition
  • Lease management (PR available here)
    • Refactor DynamoDB for conditional updates
    • Internal validation / keep alive checks for lease manager worker
    • Lease Coordinator to manage worker consumers and lease managers
    • Single registered consumer per topic group
    • DynamoDB tests
  • Configurable connection retries to Nakadi
