Kafka River Plugin for ElasticSearch

The Kafka River plugin allows you to read messages from Kafka and index bulked messages into elasticsearch. The bulk size (the number of messages to be indexed in one request) and concurrent request number is configurable. The Kafka River also supports consuming messages from multiple Kafka brokers and multiple partitions.

The plugin is built against the following Kafka and Elasticsearch versions:

  • Kafka version 0.8.1.1
  • Elasticsearch version 1.4.0

The plugin is updated periodically when newer versions of its dependencies are released. It is listed on Elasticsearch's official website.

Setup

  1. Install Kafka if you are working in a local environment (see the Apache Kafka Quick Start Guide for instructions on how to download and build it).

  2. Install the plugin

cd $ELASTICSEARCH_HOME
./bin/plugin -install <plugin-name> -url https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/download/v1.2.1/elasticsearch-river-kafka-1.2.1-plugin.zip

Example:

cd $ELASTICSEARCH_HOME
./bin/plugin -install kafka-river -url https://github.com/mariamhakobyan/elasticsearch-river-kafka/releases/download/v1.2.1/elasticsearch-river-kafka-1.2.1-plugin.zip
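To verify the installation, you can list the installed plugins (a quick check using the standard Elasticsearch 1.x plugin script):

cd $ELASTICSEARCH_HOME
./bin/plugin --list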

If it doesn't work, clone the git repository and build the plugin manually:

  • Build the plugin - it will create a zip file here: $PROJECT-PATH/target/elasticsearch-river-kafka-1.2.1-SNAPSHOT-plugin.zip (a build sketch follows below)
  • Install the plugin from target into elasticsearch:
cd $ELASTICSEARCH_HOME
./bin/plugin --install <plugin-name> --url file:///$PLUGIN-PATH/elasticsearch-river-kafka-1.2.1-SNAPSHOT-plugin.zip
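The build command itself is not spelled out above; assuming the project is a standard Maven build (suggested by the target/ directory and the -SNAPSHOT version suffix), it would look like:

cd $PROJECT-PATH
mvn clean package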

Update installed plugin

cd $ELASTICSEARCH_HOME
./bin/plugin -remove <plugin-name>
./bin/plugin -url file:///$PLUGIN_PATH -install <plugin-name>

Configuration

To create the Kafka river in Elasticsearch, execute:

curl -XPUT 'localhost:9200/_river/<river-name>/_meta' -d '
{
    "type" : "kafka",
    "kafka" : {
        "zookeeper.connect" : <zookeeper.connect>,
        "zookeeper.connection.timeout.ms" : <zookeeper.connection.timeout.ms>,
        "topic" : <topic.name>,
        "message.type" : <message.type>
    },
    "index" : {
        "index" : <index.name>,
        "type" : <mapping.type.name>,
        "bulk.size" : <bulk.size>,
        "concurrent.requests" : <concurrent.requests>,
        "flush.interval.secs" : <flush.interval.secs>,
        "action.type" : <action.type>
    }
}'
  • NOTE: The type "kafka" is required and must not be changed. It corresponds to the type defined in the plugin's source code, by which elasticsearch associates the created river with the installed plugin.

Example:

curl -XPUT 'localhost:9200/_river/kafka-river/_meta' -d '
{
    "type" : "kafka",
    "kafka" : {
        "zookeeper.connect" : "localhost",
        "zookeeper.connection.timeout.ms" : 10000,
        "topic" : "river",
        "message.type" : "json"
    },
    "index" : {
        "index" : "kafka-index",
        "type" : "status",
        "bulk.size" : 100,
        "concurrent.requests" : 1,
        "flush.interval.secs" : 5,
        "action.type" : "index"
    }
}'
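With the example river above running, you can feed it a test message through Kafka's console producer (a quick check, assuming a local broker on the default port 9092 and the Kafka 0.8.1 distribution layout):

cd $KAFKA_HOME
echo '{"name" : "John", "age" : 28}' | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic river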

The detailed description of each parameter:

  • river-name (required) - The river name to be created in elasticsearch.

  • zookeeper.connect (optional) - Zookeeper server host. Default is: localhost

  • zookeeper.connection.timeout.ms (optional) - Zookeeper server connection timeout in milliseconds. Default is: 10000

  • topic (optional) - The name of the Kafka topic to consume messages from. Default is: elasticsearch-river-kafka

  • message.type (optional) - The type of the Kafka messages, which will be inserted into ES keeping the same type. Default is: json. The following options are available:

    • json : Inserts the json message into ES, mapping each json property to a separate ES document property. Example:

       "_source": {
          "name": "John",
          "age": 28
       }
    • string : Inserts the string message into ES as a document with a single field named value, whose value is the received message. Example:

     "_source": {
          "value": "received text message"
     }
  • index (optional) - The name of elasticsearch index. Default is: kafka-index

  • type (optional) - The mapping type of elasticsearch index. Default is: status

  • bulk.size (optional) - The number of messages to be bulk indexed into elasticsearch. Default is: 100

  • concurrent.requests (optional) - The number of concurrent indexing requests allowed. A value of 0 means only a single request may execute at a time; a value of 1 allows one request to execute while new bulk requests accumulate. Default is: 1

  • flush.interval.secs (optional) - The number of seconds between flushes to elasticsearch. Default is: 5, so any remaining messages are flushed after 5 seconds even if bulk.size has not been reached.

  • action.type (optional) - The action type determining how the messages should be processed. Default is: index. The following options are available (example messages follow this list):

    • index : Creates documents in ES with the value field set to the received message.
    • delete : Deletes documents from ES based on the id field set in the received message.
    • raw.execute : Executes incoming messages as a raw query.
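For illustration, the following messages sketch what the first two action types might consume. These are assumptions based on the descriptions above, not confirmed message formats; in particular, the exact field name the delete action reads is taken from the wording "based on id field":

An index message (with message.type json), whose properties become document fields:

{"name" : "John", "age" : 28}

A delete message, removing the document whose id matches the (assumed) id field:

{"id" : "some-document-id"}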

To delete the existing river, execute:

curl -XDELETE 'localhost:9200/_river/<river-name>/'

Example:

curl -XDELETE 'localhost:9200/_river/kafka-river/'

To see the indexed data:

curl -XGET 'localhost:9200/kafka-index/_search?pretty=1'
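To check how many documents have been indexed so far, the standard Elasticsearch count API can be used:

curl -XGET 'localhost:9200/kafka-index/_count?pretty=1'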

To delete the index:

curl -XDELETE 'localhost:9200/kafka-index'

Kafka Consumer details

Currently the Consumer Group Java API (the high-level consumer API) is used to create the Kafka consumer, which keeps track of offsets automatically. This enables the river to read Kafka messages from multiple brokers and multiple partitions.
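Because the high-level consumer commits offsets to ZooKeeper in Kafka 0.8, you can inspect the river's consumption progress with Kafka's offset checker tool. The consumer group name below is a placeholder; the actual group id used by the plugin can be found in the river logs or in ZooKeeper:

cd $KAFKA_HOME
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group <consumer-group-name> --topic river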

License

Copyright (C) 2014 Mariam Hakobyan. See LICENSE

Contributing

  1. Fork the repository on Github
  2. Create a named feature branch
  3. Develop your changes in that branch
  4. Write tests for your change (if applicable)
  5. Ensure all the tests are passing
  6. Submit a Pull Request using Github

