Git Product home page Git Product logo

spring-integration-kafka's Introduction

Spring Integration Kafka Adapter

INTEXT KAFKA

The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka. Apache Kafka is a distributed publish-subscribe messaging system that is designed for high throughput (terabytes of data) and low latency (milliseconds). For more information on Kafka and its design goals, see the Kafka main page.

This 2.0 version is a complete rewrite based on the new spring-kafka project which uses the pure java Producer and Consumer clients provided by Kafka 0.9.x.x.

That project is currently at Release Candidate 1 and its documentation is here.

Quick Start

See the Spring Integration kafka Sample for a simple Spring Boot application that sends and receives messages.

Checking out and building

In order to build the project:

./gradlew build

In order to install this into your local maven cache:

./gradlew install

Spring Integration Kafka project currently supports the following components. Please keep in mind that this is very early stage in development and do not yet fully make use of all the features that Kafka provides.

  • Outbound Channel Adapter

  • Message Driven Channel Adapter

Outbound Channel Adapter:

The Outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka. The channel is defined in the application context and then wired in the application that sends messages to Kafka. After that, sender applications can publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter, as follows: the payload of the Spring Integration message will be used to populate the payload of the Kafka message, and (by default) the kafka_messageKey header of the Spring Integration message will be used to populate the key of the Kafka message

The target topic and partition for publishing the message can be customized through the kafka_topic and kafka_partitionId headers, respectively.

Here’s an example for sending a message with an arbitrary payload and the String "key" as value on the test topic.

    final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);

    channel.send(
            MessageBuilder.withPayload(payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                    .setHeader(KafkaHeaders.TOPIC, "test")
					.build()
            );

In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message. To that end, it supports the mutually exclusive pairs of attributes topic/topic-expression, message-key/message-key-expression, and partition-id/partition-id-expression, to allow the specification of topic,message-key and partition-id respectively as static values on the adapter, or to dynamically evaluate their values at runtime against the request message.

Important
The KafkaHeaders interface (provided by spring-kafka) contains constants used for interacting with headers. The messageKey and topic default headers now require a kafka_ prefix. When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers.messageKey" and topic-expression="headers.topic" on the <int-kafka:outbound-channel-adapter>, or simply change the headers upstream to the new headers from KafkaHeaders using a <header-enricher> or MessageBuilder. Or, of course, configure them on the adapter using topic and message-key if you are using constant values.

NOTE : If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as:

topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'".

The adapter requires a KafkaTemplate.

Here is an example of how the Kafka outbound channel adapter is configured with XML:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    message-key-expression="'bar'"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
Note
When using Kafka log compaction, a deleted key is represented by a null value. Since messages can’t have null payloads, when you wish to send a null value to kafka, send a message with a payload of type KafkaNull.

Message Driven Channel Adapter:

The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentListenerContainer.

An example of xml configuration variant is shown here:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg name="topics" value="foo" />
</bean>

See the sample mentioned above for Java @Configuration.

Note
When using Kafka log compaction, a deleted key is represented by a null value. Since messages can’t have null payloads, when such a value is received, it is represented by a payload of type KafkaNull.

Contributing

Pull requests are welcome. Please see the contributor guidelines for details.

spring-integration-kafka's People

Contributors

artembilan avatar az-qbradley avatar bellwethr avatar bjoernhaeuser avatar garyrussell avatar ghillert avatar gmateo avatar heyjustin avatar ilayaperumalg avatar mbogoevici avatar moonkev avatar safetytrick avatar sobychacko avatar spring-builds avatar wilkinsona avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.