Spring Integration Kafka Adapter
The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka. Apache Kafka is a distributed publish-subscribe messaging system that is designed for high throughput (terabytes of data) and low latency (milliseconds). For more information on Kafka and its design goals, see the Kafka main page.
This 2.0 version is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x.
That project is currently at Release Candidate 1 and its documentation is here.
Quick Start
See the Spring Integration Kafka Sample for a simple Spring Boot application that sends and receives messages.
Checking out and building
In order to build the project:
./gradlew build
In order to install this into your local Maven cache:
./gradlew install
The Spring Integration Kafka project currently supports the following components. Please keep in mind that it is at a very early stage of development and does not yet make full use of all the features that Kafka provides.
- Outbound Channel Adapter
- Message Driven Channel Adapter
Outbound Channel Adapter:
The outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka.
The channel is defined in the application context and then wired into the application that sends messages to Kafka.
Sender applications can then publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter, as follows: the payload of the Spring Integration message is used to populate the payload of the Kafka message, and (by default) the kafka_messageKey header of the Spring Integration message is used to populate the key of the Kafka message.
The target topic and partition for publishing the message can be customized through the kafka_topic and kafka_partitionId headers, respectively.
Here is an example that sends a message with an arbitrary payload and the String "key" as the message key to the test topic.
final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
channel.send(
        MessageBuilder.withPayload(payload)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                .setHeader(KafkaHeaders.TOPIC, "test")
                .build()
);
In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, and target partition by applying SpEL expressions on the outbound message. To that end, it supports the mutually exclusive pairs of attributes topic/topic-expression, message-key/message-key-expression, and partition-id/partition-id-expression, which allow topic, message-key and partition-id to be specified either as static values on the adapter or evaluated dynamically at runtime against the request message.
Important
The KafkaHeaders interface (provided by spring-kafka) contains constants used for interacting with headers.
The messageKey and topic default headers now require a kafka_ prefix.
When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers.messageKey" and topic-expression="headers.topic" on the <int-kafka:outbound-channel-adapter>, or simply change the headers upstream to the new headers from KafkaHeaders using a <header-enricher> or MessageBuilder.
Alternatively, configure them on the adapter using topic and message-key if you are using constant values.
NOTE: If the adapter is configured with a topic or message key (either with a constant or an expression), that value is used and the corresponding header is ignored. If you want the header to override the configuration, you need to configure it in an expression, such as:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
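The precedence rule in the note can be modelled in a few lines of plain Java. This is only a sketch of the documented behaviour, not the adapter's implementation: the class `TopicPrecedenceSketch` and its methods are hypothetical names, and a plain `Map` stands in for the Spring Integration message headers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the documented rule; not the adapter's actual code.
final class TopicPrecedenceSketch {

    // Default topic header name, as described in the text (kafka_ prefix).
    static final String TOPIC_HEADER = "kafka_topic";

    // A topic configured on the adapter (constant or expression result) wins
    // and the header is ignored; the header is only consulted otherwise.
    static String resolveTopic(String configuredTopic, Map<String, Object> headers) {
        if (configuredTopic != null) {
            return configuredTopic;
        }
        Object fromHeader = headers.get(TOPIC_HEADER);
        return fromHeader != null ? fromHeader.toString() : null;
    }

    // Mirrors the expression: headers['topic'] != null ? headers['topic'] : 'myTopic'
    static String headerOrDefault(Map<String, Object> headers, String headerName, String fallback) {
        Object value = headers.get(headerName);
        return value != null ? value.toString() : fallback;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TOPIC_HEADER, "fromHeader");
        headers.put("topic", "custom");

        System.out.println(resolveTopic("foo", headers));                   // configured value wins
        System.out.println(resolveTopic(null, headers));                    // falls back to the header
        System.out.println(headerOrDefault(headers, "topic", "myTopic"));   // header overrides default
        System.out.println(headerOrDefault(new HashMap<>(), "topic", "myTopic")); // default applies
    }
}
```

The second method shows why the override has to be written into the expression itself: once an expression is configured, the adapter no longer looks at the header on its own.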
The adapter requires a KafkaTemplate.
Here is an example of how the Kafka outbound channel adapter is configured with XML:
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
        kafka-template="template"
        auto-startup="false"
        channel="inputToKafka"
        topic="foo"
        message-key-expression="'bar'"
        partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
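For applications that prefer Java configuration, the XML above could be expressed roughly as follows. Treat this as a sketch under assumptions rather than the project's reference configuration: the class name `KafkaOutboundConfig` is hypothetical, String keys and values with `StringSerializer` are assumed (the XML elides the remaining producer properties), and the surrounding application is assumed to have Spring Integration enabled and spring-integration-kafka, spring-kafka and kafka-clients on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.messaging.MessageHandler;

@Configuration
public class KafkaOutboundConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Assumption: String keys and values; adjust serializers to your data.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> template() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Consumes from the "inputToKafka" channel and publishes to Kafka.
    @Bean
    @ServiceActivator(inputChannel = "inputToKafka")
    public MessageHandler kafkaOutboundHandler() {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(template());
        // Same static values as the XML example: topic "foo", key "bar", partition 2.
        handler.setTopicExpression(new LiteralExpression("foo"));
        handler.setMessageKeyExpression(new LiteralExpression("bar"));
        handler.setPartitionIdExpression(new SpelExpressionParser().parseExpression("2"));
        return handler;
    }
}
```

Unlike the XML example, this sketch does not set auto-startup to false, so the handler's endpoint starts with the application context.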
Note
When using Kafka log compaction, a deleted key is represented by a null value.
Since messages can’t have null payloads, when you wish to send a null value to Kafka, send a message with a payload of type KafkaNull.
Message Driven Channel Adapter:
The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentMessageListenerContainer.
An example of the XML configuration is shown here:
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg name="topics" value="foo" />
</bean>
See the sample mentioned above for Java @Configuration.
Note
When using Kafka log compaction, a deleted key is represented by a null value.
Since messages can’t have null payloads, when such a value is received, it is represented by a payload of type KafkaNull.
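The two log-compaction notes describe a sentinel-payload pattern: a marker object travels through the messaging layer wherever Kafka itself would carry a null value. The plain-Java sketch below models that mapping in both directions. It is illustrative only: `TombstoneSketch` and `NullMarker` are hypothetical stand-ins (the real marker type is spring-kafka's KafkaNull), and the actual conversion happens inside the adapters rather than in user code.

```java
// Hypothetical model of the sentinel pattern; the real marker type is KafkaNull.
final class TombstoneSketch {

    // Stand-in for the marker payload type.
    static final class NullMarker {
        static final NullMarker INSTANCE = new NullMarker();

        private NullMarker() {
        }
    }

    // Outbound direction: a marker payload becomes a null Kafka value (a tombstone).
    static Object toKafkaValue(Object payload) {
        return payload instanceof NullMarker ? null : payload;
    }

    // Inbound direction: a null Kafka value becomes the marker payload,
    // because a message cannot carry a null payload.
    static Object toPayload(Object kafkaValue) {
        return kafkaValue == null ? NullMarker.INSTANCE : kafkaValue;
    }

    public static void main(String[] args) {
        System.out.println(toKafkaValue(NullMarker.INSTANCE) == null);
        System.out.println(toPayload(null) == NullMarker.INSTANCE);
        System.out.println(toKafkaValue("data"));
    }
}
```

Downstream handlers that consume from a compacted topic can therefore check the payload type to detect deletions instead of testing for null.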
Contributing
Pull requests are welcome. Please see the contributor guidelines for details.