Git Product home page Git Product logo

kafka-proxy's Issues

Add support for Kafka 2.8


Thanks for the great project.
I've noticed that using kafka 2.7.1 CLI clients (e.g. works nicely with kafka-proxy, while using version 2.8.0 returns the following error in the kafka-proxy log:

{"@level":"info","@message":"Reading data from kafka-0.kafka:9092 had error: Unsupported response schema version 11 for key 3 ","@timestamp":"2021-07-08T10:43:21Z"}

Kafka broker version: 2.8.0
Kafka client: 2.8.0
kafka-proxy: 0.2.8

Create request/response

i thought about creating request/response programatically with Struct and Schema, i think it could be useful for testing purposes as well for creating responses nicely
I had these problems:

  1. Struct doesn't accept Schema, it just accepts *schema and NewSchema returns Schema, i created new constructor NewSchemaStruct which returns *schema and created request:
	schemaTopicMetadata := NewSchemaStruct("topic_metadata_v0",
		&field{name: "key", ty: typeInt16},
		&field{name: "version", ty: typeInt16},
		&field{name: "correlation_id", ty: typeInt32},
		&field{name: "client_id", ty: typeStr},
		&array{name: "topic", ty: typeStr},

	topicMetadata := Struct{
		schema: schemaTopicMetadata,
		values: make([]interface{}, 0),

	topicMetadata.values = append(topicMetadata.values, int16(0))
	topicMetadata.values = append(topicMetadata.values, int16(0))
	topicMetadata.values = append(topicMetadata.values, int32(0))
	topicMetadata.values = append(topicMetadata.values, "custom")
	arr := make([]interface{}, 0)
	arr = append(arr, "topic1")
	arr = append(arr, "topic2")
	topicMetadata.values = append(topicMetadata.values, arr)

	encoded, err := EncodeSchema(&topicMetadata, schemaTopicMetadata)
  1. Second problem, when i have schema created already i cannot retrieve schema of some of it's fields e.g.:
        type FetchRequest_v0 struct {}
        func (*f FetchRequest_v0) getSchema() Schema {
	partitionSchema := NewSchema("partition_schema",
		&field{name: "partition", ty: typeInt32},
		&field{name: "fetch_offset", ty: typeInt64},
		&field{name: "partition_max_bytes", ty: typeInt32},

	topicSchema := NewSchema("topic_schema",
		&field{name: "topic_name", ty: typeStr},
		&array{name: "partitions", ty: partitionSchema},

	schemaFetch := NewSchema("fetch_v0",
		&field{name: "key", ty: typeInt16},
		&field{name: "version", ty: typeInt16},
		&field{name: "correlation_id", ty: typeInt32},
		&field{name: "client_id", ty: typeStr},
		&field{name: "reaplica_id", ty: typeInt32},
		&field{name: "max_wait_time", ty: typeInt32},
		&field{name: "min_bytes", ty: typeInt32},
		&array{name: "topics", ty: topicSchema},

        return schemaFetch

       req := &FetchRequest_v0{}
       topicSchema := req.GetSchema().fieldsByName["topics"].def.ty - ???? (cannot use this because in Schema constructor 
we assert fields as Fields in NewSchema - Fields doesn't have method for schema gathering)
	someStruct := Struct{
		schema: topicSchema,
		values: make([]interface{}, 0),
        ....add some values to someStruct
  1. We could solve problem when Struct would accept Schema but then we would need to add additional methods to Schema interface for getting fieldByName and fields - this occurs on quite many places in code
  2. to solve this we could add additional method to Field interface - e.g. GetSchema and implement on array, field etc....types - this would be quite non-intrusive
  3. i would like to ask also about EncodeSchema function, requires Struct (which itself contains schema) and also Schema (which practically contains same info as schema in Struct) it is duplication of information, not sure if it is needed

I can create pull requests but i would like to discuss it first as i don't know the code so deeply

Does the proxy support rotation of certificates?

We have a requirement for short-lived client certs, unfortunately the Kafka Java clients do not support hot-swapping of certs so this requires us to restart the consumers which subsequently trigger re-balances of the consumer group.

I was looking into a proxy option and wondered if such a setup would be possible with kafka-proxy? Feel like there must be others with this issue and this could be a great use for the proxy.

It looks like there is a PR for librdkafka to achieve this for clients which use this lib, though this is not yet merged.

Kafka-proxy for Auditing


I have an auditing/compliance requirement to generate log for auditing for my Kafka Broker. As my service provider doesn't provide it, I'm evaluate to use Kafka-proxy to achieve it.

In my requirements, if I'm able to add username information from SASL and client IP on debug log, something like that:

logrus.Debugf("[%s] Kafka request key %v, version %v, length %v", ctx.username, requestKeyVersion.ApiKey, requestKeyVersion.ApiVersion, requestKeyVersion.Length)

My broker uses SASL PLAIN.

I'm trying to change the code to extract SASL packages on Kafka's API KEY 17 on handleRequest but I cannot re-read src DeadlineReaderWriter, right?

Any ideia/recommendation? I'd love to contribute to the project, besides I'm not a go developer ;-)

Kafka Authorization


i am thinking of authorizing kafka with kafka-proxy, would it be possible to pass kafka message types to plugins?

Proxy support for outgoing HTTP/HTTPS connections

Retrieval of google certificates (google-id-token) should go through the proxy server.
The implementation should be similar to Java Proxy settings:
a. http.proxyHost, http.proxyPort
b. htttps.proxyHost, https.proxyPort
c. socksProxyHost, socksProxyPort

had error: api key -10495 is invalid

I keep getting "had error: api key -10495 is invalid" when I try to proxy a Kafka cluster with SASL. Is this because the Kafka broker version is too old?
I'm using SASL plain with TLS enabled and the handshake is successful.
Could someone help?

TLS Handshake Error with auth-ldap

Hi @gustavomcarmo

I tried your suggestion and tests with TLS and auth-ldap. All works fine without TLS but I'am getting a handshake error. Not sure what is missing.

Here are my details in case you can catch it for us (attaching error.txt)

Any help is highly appreciated. Thanks.


Error: Executing consumer group command failed due to org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.


Any ideia why I got:

Error: Executing consumer group command failed due to org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:402)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:401)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:417)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:312)
	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

Running bin/kafka-consumer-groups --bootstrap-server proxy:9092 --all-groups --all-topics --describe --verbose and I not have error when running directly on my broker?

Thanks a lot!

QUESTION: So what config would I use to simply pass through SASL?

Given the answer to a previous question - that SASL will get passed through by default - does this mean that i configure my java client with the SASL properties for the brokers and it will get passed transparently through the kafka proxy when i use the kafka-proxy as my bootstrap-servers?

I've been fiddling around with the settings to achieve this and i'm up to the point where i'm getting net address mapping not faund for my brokers which appear to have been renamed to b2- (i.e. for the config below). Do i need to configure something in external-server-mappings perhaps - i don't actually understand what external-server-mappings actually does - I originally thought it might be instead of bootstrap-server-mapping where the kafka broker is on a host that's not the same as the one the kafka-proxy is on, but that doesn't appear to be so.

I'm presuming some kind of dynamic proxy is set up on the kafka-proxy and the path is client -> kafka-proxy -> kafka-proxy dynamic proxy -> kafka-brokers?

I think perhaps I'm a little confused.


my java client is configured:

sasl.mechanism=PLAIN required username\="someusername" password\="somepassword";

and my kafka-proxy is run as:

kafka-proxy server \
	--bootstrap-server-mapping "," \
	--tls-enable \
	--tls-insecure-skip-verify \
	--proxy-listener-tls-enable \
	--proxy-listener-key-file=/root/dev-kafka-server.private.key \
	--proxy-listener-cert-file=/root/dev-kafka-server.signed.cer \

Error's I'm seeing are:

INFO[2018-10-08T17:17:51+13:00] New connection for 
INFO[2018-10-08T17:17:51+13:00] Reading data from had error: net address mapping for was not found 

write a message and kafka report EOFException

I have a kafka(v2.5.0) cluster(3 brokers ).I start up kafka-proxy(v0.2.8) in another machine,And send a message by kafka-console-producer ,the message does not write into topic,Kafka throw a


./ --broker-list proxyip:32400,proxyip:32401,proxyip:32402 --topic test

kafka-console-producer log

[2020-11-15 05:41:34,272] WARN [Producer clientId=console-producer] Connection to node 3 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,371] WARN [Producer clientId=console-producer] Connection to node 1 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,472] WARN [Producer clientId=console-producer] Connection to node 1 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,573] WARN [Producer clientId=console-producer] Connection to node 3 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,675] WARN [Producer clientId=console-producer] Connection to node 2 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,776] WARN [Producer clientId=console-producer] Connection to node 1 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,876] WARN [Producer clientId=console-producer] Connection to node 2 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,977] WARN [Producer clientId=console-producer] Connection to node 2 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:35,078] WARN [Producer clientId=console-producer] Connection to node 1 (/ could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)


kafka-proxy server \
--bootstrap-server-mapping "broker1:9093," \
--bootstrap-server-mapping "broker2:9093," \
--bootstrap-server-mapping "broker3:9093," \
--sasl-enable \
--sasl-username "admin" \
--sasl-password "admin" \
--log-level debug

kafka-proxy log

INFO[2020-11-15T05:41:32-05:00] New connection for broker3:9093        
DEBU[2020-11-15T05:41:32-05:00] Sending SaslHandshakeRequest mechanism: PLAIN  version: 0 
DEBU[2020-11-15T05:41:32-05:00] Successful SASL handshake. Available mechanisms: [PLAIN] 
DEBU[2020-11-15T05:41:32-05:00] Sending authentication opaque packets, mechanism PLAIN 
DEBU[2020-11-15T05:41:33-05:00] Kafka request key 18, version 3, length 52   
DEBU[2020-11-15T05:41:33-05:00] Kafka response key 18, version 3, length 348 
DEBU[2020-11-15T05:41:33-05:00] Kafka request key 3, version 9, length 32    
DEBU[2020-11-15T05:41:33-05:00] Kafka response key 3, version 9, length 112  
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker1:9093, listener=, advertised= 
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker2:9093, listener=, advertised= 
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker3:9093, listener=, advertised= 
INFO[2020-11-15T05:41:37-05:00] Client closed local connection on from broker3:40750 (broker3:9093)

kafka log

[2020-11-15 05:34:46,830] DEBUG Processor 3 listening to new connection from /proxyip:54714 (
[2020-11-15 05:34:46,830] DEBUG Accepted connection from /proxyip:54714 on /broker1:9093 and assigned it to processor 3, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (
[2020-11-15 05:34:46,830] DEBUG for mechanism=PLAIN: 0 (
[2020-11-15 05:34:46,830] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (
[2020-11-15 05:34:46,830] DEBUG Handling Kafka request SASL_HANDSHAKE during authentication (
[2020-11-15 05:34:46,830] DEBUG Using SASL mechanism 'PLAIN' provided by client (
[2020-11-15 05:34:46,843] DEBUG Set SASL server state to AUTHENTICATE during authentication (
[2020-11-15 05:34:46,843] DEBUG Authentication complete; session max lifetime from broker config=0 ms, no credential expiration; no session expiration, sending 0 ms to client (
[2020-11-15 05:34:46,843] DEBUG Set SASL server state to COMPLETE during authentication (
[2020-11-15 05:34:46,843] DEBUG [SocketServer brokerId=1] Successfully authenticated with /proxyip (
[2020-11-15 05:34:46,854] DEBUG [SocketServer brokerId=1] Connection with /proxyip disconnected (

thanks fo any help

Example using kubernetes service type: NodePort


What is the appropriate configuration for exposing kafka-proxy over a NodePort service? I've looked at #30 and #50 but have been unable to get it working.

Both kafka-proxy and kafka are running inside kubernetes (using kind). I am using strimzi for deploying kafka.


Kerberos authentication to Kafka brokers

Looks like it's not possible for a client behind kafka-proxy or kafka-proxy itself to authenticate themselves to Kafka brokers with Kerberos credentials? How difficult could it be to support this use case? I can see the underlying 3rd-party Sarama library supports Kerberos authentication.

Also, would you happen to know if there exists a workaround to achieve the subj with the current version?

Thanks for doing a great job.

Proxy initiated authentication with "dynamic" SASL credentials?


I have a scenario where I would like kafka-proxy to initiate SASL PLAIN with a kafka broker using credentials passed in from the client.

Proposed flow:

  • client does mutual TLS with kafka-proxy
  • Client TLS cert would contain the SASL credentials embedded (Probably in the CNAME)
  • kafka-proxy would have a new flag to enable this passthrough route
  • kafka-proxy parses the credentials out of the client TLS cert and uses those to initiate SASL

Looking at the code I think the most relevant changes could be done in client.go ( I realize other parts of the code would need to be changed as well):

  • handleConn would parse the credentials from the x509 Certificate
    • create a new SASLPlainAuth struct from Client.saslAuthByProxy and copy the username/password extracted from the TLS cert
  • DialAndAuth and auth() would be modified to take in this "dynamic" SASLPlainAuth struct and call sendAndReceiveSASLAuth on that same struct to connect to the broker

Would you be open to supporting this pattern? If so we are more than happy to contribute the needed changes.

SASL SCRAM support

We have been using the proxy with great success to date using both PLAINTEXT and SSL endpoints. We are now wanting to do SASL authentication with SCRAM (our hosting provider uses SCRAM256 rather than PLAIN).

As best I can tell kafka-proxy currently doesn't have support for this.

I believe I will need to add a *Auth struct in sasl_by_proxy.go to handle the SCRAM portion as well as providing some command line parameters.

Any chance someone has worked on this already and has some code sitting around? If not I will likely get started hacking away.

Errors you may encounter when upgrading the library

(The purpose of this report is to alert grepplabs/kafka-proxy to the possible problems when grepplabs/kafka-proxy try to upgrade the following dependencies)

An error will happen when upgrading library prometheus/client_golang:

-Latest Version: v1.7.1 (Latest commit fe7bd95 11 days ago)
-Where did you use it:

require ( v1.0.1 v2.1.1
go 1.11

package prometheus
import (

This problem was introduced since prometheus/client_golang v1.2.0(committed 9a2ab94 on 16 Oct 2019) .Now you used version v0.8.0. If you try to upgrade prometheus/client_golang to version v1.2.0 and above, you will get an error--- no package exists at ""

I investigated the libraries (prometheus/client_golang >= v1.2.0) release information and found the root cause of this issue is that----

  1. These dependencies all added Go modules in the recent versions.

  2. They all comply with the specification of "Releasing Modules for v2 or higher" available in the Modules documentation. Quoting the specification:

A package that has migrated to Go Modules must include the major version in the import path to reference any v2+ modules. For example, Repo migrated to Modules on version v3.x.y. Then this repo should declare its module path with MAJOR version suffix "/v3" (e.g., module, and its downstream project should use "" to import this repo’s package.

  1. This "" is not the physical path. So earlier versions of Go (including those that don't have minimal module awareness) plus all tooling (like dep, glide, govendor, etc) don't have minimal module awareness as of now and therefore don't handle import paths correctly See golang/dep#1962, golang/dep#2139.

Note: creating a new branch is not required. If instead you have been previously releasing on master and would prefer to tag v3.0.0 on master, that is a viable option. (However, be aware that introducing an incompatible API change in master can cause issues for non-modules users who issue a go get -u given the go tool is not aware of semver prior to Go 1.11 or when module mode is not enabled in Go 1.11+).
Pre-existing dependency management solutions such as dep currently can have problems consuming a v2+ module created in this way. See for example dep#1962.


1. Migrate to Go Modules.

Go Modules is the general trend of ecosystem, if you want a better upgrade package experience, migrating to Go Modules is a good choice.

Migrate to modules will be accompanied by the introduction of virtual paths(It was discussed above).

This "" is not the physical path. So Go versions older than 1.9.7 and 1.10.3 plus all third-party dependency management tools (like dep, glide, govendor, etc) don't have minimal module awareness as of now and therefore don't handle import paths correctly.

Then the downstream projects might be negatively affected in their building if they are module-unaware (Go versions older than 1.9.7 and 1.10.3; Or use third-party dependency management tools, such as: Dep, glide, govendor…).

2. Maintaining v2+ libraries that use Go Modules in Vendor directories.

If grepplabs/kafka-proxy want to keep using the dependency manage tools (like dep, glide, govendor, etc), and still want to upgrade the dependencies, can choose this fix strategy.
Manually download the dependencies into the vendor directory and do compatibility dispose(materialize the virtual path or delete the virtual part of the path). Avoid fetching the dependencies by virtual import paths. This may add some maintenance overhead compared to using modules.

As the import paths have different meanings between the projects adopting module repos and the non-module repos, materialize the virtual path is a better way to solve the issue, while ensuring compatibility with downstream module users. A textbook example provided by repo is here:
In the vendor directory, adds the /vN subdirectory in the corresponding dependencies.
This will help more downstream module users to work well with your package.

3. Request upstream to do compatibility processing.

The prometheus/client_golang have 1048 module-unaware users in github, such as: AndreaGreco/mqtt_sensor_exporter, seekplum/plum_exporter, arl/monitoring…


You can make a choice when you meet this DM issues by balancing your own development schedules/mode against the affects on the downstream projects.

For this issue, Solution 1 can maximize your benefits and with minimal impacts to your downstream projects the ecosystem.


Do you plan to upgrade the libraries in near future?
Hope this issue report can help you ^_^
Thank you very much for your attention.

Best regards,

Client certificates check between proxy client and brokers client


Here is my use case. I need to proxy AWS MSK (Managed Service Kafka) in order to connect it from any network (MSK connectivity options are bit limited as of now). MSK cluster is created with mutual TLS, this is because it is multitenant and each tenant has its own client cert. Client certs for all tenants are signed by same CA (CA certificate monthly fee is pretty high so this is cost optimization). I have one proxy group for each tenant and still want to maintain TLS between proxy client and proxy.

Kafka-proxy offers a possibility to terminate mutual TLS on proxy and also to start mutual TLS on proxy so those two capabilities can be used together in order to make sure that:

  1. Client connects with client cert
  2. Client credentials are propagated to server
    I would like proxy to terminate client TLS and start TLS with same certificate

I can start TLS on proxy with selected certificate (tls-client-cert-file) so this part is fine
On proxy listener side I can validate that proxy client cert is signed with given CA (proxy-listener-ca-chain-cert-file) but I am not able to check that proxy client cert is same as tls-client-cert-file. This is needed so that tenantA is not able to use same proxy group as tenantB (They are signed with same CA)

I did some hacking on kafka-proxy and came up with some prototype that is backwards compatible by default but allows to check for same client cert if needed (new config flag). The code is pretty straightforward and available here:

Unfortunately I did not create any automated tests for now (just tested manually) but that will follow, just wanted to have something in order to start discussion.

Would you be interested in such contribution ? I could redesign and add some tests based on your review.

Let me know what you think

sasl passthrough

Can/how Kafka-proxy be enhanced with a change where the client passes SASL credentials to Kafka proxy and Kafka proxy just forwards the client provided SASL credentials to Kafka broker for authentication and authorization purposes?

Running proxy with tls-enable works, but not with proxy-listener-tls-enable

Hi, I am trying to use the kafka-proxy as a way to do cross-VPC access for AWS MSK. First of all, I am trying to set up the proxy within the same VPC as the MSK. I am able to make it work with --tls-enable only. The proxy will be started as below

./kafka-proxy server \
        --bootstrap-server-mapping "," \
        --bootstrap-server-mapping "," \
        --dynamic-listeners-disable \
        --kafka-client-id client \
        --tls-enable \
        --tls-ca-chain-cert-file "/home/ec2-user/ssl2/truststore-certs.pem" \
        --tls-client-cert-file "/home/ec2-user/ssl2/keystore-certs.pem" \
        --tls-client-key-file  "/home/ec2-user/ssl2/key.pem" \
        --tls-client-key-password pass123

then connect locally using bin/ --bootstrap-server ip-10-1-0-94.ec2.internal:3001 --topic AWSKafkaTutorialTopic2 --from-beginning. Everything works fine.

However, when I tried to add the TLS between client and proxy, things start to break. I have done 3 experiments, and I connect to all 3 using bin/ --bootstrap-server ip-10-1-0-94.ec2.internal:3001 --topic AWSKafkaTutorialTopic2 --from-beginning --consumer.config, where the works fine when I connect directly to the AWS MSK bin/ --bootstrap-server --topic AWSKafkaTutorialTopic2 --from-beginning --consumer.config

Experiment 1: using port 9094

./kafka-proxy server \
        --bootstrap-server-mapping "," \
        --bootstrap-server-mapping "," \
        --dynamic-listeners-disable \
        --kafka-client-id proxy \
        --proxy-listener-tls-enable \
        --proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
        --proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
        --proxy-listener-key-file  "/home/ec2-user/kafka-proxy/ssl/key.pem" \


  1. Proxy log
INFO[2021-06-18T01:03:12Z] New connection for
INFO[2021-06-18T01:03:12Z] Reading data from had error: unexpected EOF
  1. Broker log
[2021-06-18 01:03:12,122] INFO [SocketServer brokerId=1] Failed authentication with /INTERNAL_IP (SSL handshake failed) (

Experiment 2: using port 9092

./kafka-proxy server \
        --bootstrap-server-mapping "," \
        --bootstrap-server-mapping "," \
        --dynamic-listeners-disable \
        --kafka-client-id proxy \
        --proxy-listener-tls-enable \
        --proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
        --proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
        --proxy-listener-key-file  "/home/ec2-user/kafka-proxy/ssl/key.pem" \


  1. Client Log
[2021-06-18 01:07:01,748] ERROR [Consumer clientId=consumer-console-consumer-42640-1, groupId=console-consumer-42640] Connection to node 2147483645 (/ failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-06-18 01:07:01,750] ERROR Error processing message, terminating consumer process:  ($)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: No subject alternative names present
  1. Proxy Log
INFO[2021-06-18T01:07:01Z] Reading data from local connection on from ( had error: tls: received unexpected handshake message of type *tls.clientHelloMsg when waiting for *tls.certificateMsg
  1. Broker Log: None

Experiment 3: combine both tls-enable and proxy-listener-tls-enable

./kafka-proxy server \
        --bootstrap-server-mapping "," \
        --bootstrap-server-mapping "," \
        --dynamic-listeners-disable \
        --kafka-client-id proxy \
        --proxy-listener-tls-enable \
        --proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
        --proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
        --proxy-listener-key-file  "/home/ec2-user/kafka-proxy/ssl/key.pem" \
        --tls-enable \
        --tls-ca-chain-cert-file "/home/ec2-user/ssl2/truststore-certs.pem" \
        --tls-client-cert-file "/home/ec2-user/ssl2/keystore-certs.pem" \
        --tls-client-key-file  "/home/ec2-user/ssl2/key.pem" \
        --tls-client-key-password pass123 \

Result: exactly the same as Experiment2

I feel like there might be something wrong with how I generate the keys/certificates for the proxy, but I cannot figure out what. When I do openssl s_client -connect ip-10-1-0-94.ec2.internal:3001, the end of the response is like below

SSL handshake has read 18546 bytes and written 417 bytes
Verification error: self signed certificate in certificate chain
New, TLSv1.3, Cipher is TLS_AES_128_GCM_SHA256
Server public key is 2048 bit
Secure Renegotiation IS NOT supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
Early data was not sent
Verify return code: 19 (self signed certificate in certificate chain)
140263774463808:error:14094412:SSL routines:ssl3_read_bytes:sslv3 alert bad certificate:ssl/record/rec_layer_s3.c:1544:SSL alert number 42

I checked your response in #40 (comment) and tried to disable TLS1.3, but it didn't help. It's always using TLSv1.3 as I can tell through openssl. I am not sure whether TLS1.3 is the problem. Also it seems that only tls13 is included in the binary.

[ec2-user@ip-10-1-0-94 kafka-proxy]$ strings kafka-proxy  | grep -i tls13.go

On the other hand, from the results in Experiment 1, we can actually tell that the TLS between client and proxy should already work. Otherwise, there shouldn't be any logs on the broker side.

Any suggestions would be great! Thank you!!

Proxy to multiple cluster bases on header fields

I known it's not the goal of this project but I have in mind something that can help me to solve big scaling issues.
The idea is to write a proxy between the producer and the brokers cluster, that can route all the trafic to the appropriate kafka cluster depending on a field on kafka header message. Maybe my idea is just crap, I don't know but if I can your input I ll appreciate.

Ftr : want to parse the content of each Kafka protocol package

Hello, I am using this library as a proxy for our Kafka cluster. Now we have a requirement to change some data of Kafka packets, such as topic content, at the Kafka proxy level. So we need to parse each package, rather than simply do 4-tier proxy. I hope we can discuss this . thank you very mush~

Multiple proxies with different client certificates

Hi, I have configured Kafka with client certificate auth. I have multiple client private keys for client certificates. I can setup two kafka-proxies on different ports with different value for --tls-client-key-file parameter. The question is if such setup will work ? I noticed that in LoadBalancer example ( Kafka-proxy pods are exposed as singleton StatefulSets so just want to make sure that two instances which proxy different types of clients (based on client certificates) will not cause some conflicts/race conditions.

It is also interesting why StatefulSet is needed (the problem with StatefulSet is that when pod goes into unknown state manual intervention is required to decide if to replace it or not). Is it Kafka protocol that requires network identity to remain the same or statefulness of protocol calls or maybe something else ?

Use case different than sidecar

Hi kafka-proxy contributors,

I'd like to know if kafka-proxy can be used in such a way different than the sidecar architecture. The project README basically mentions the use of localhost for accessing kafka-proxy, and I need it much more as a reverse proxy for accessing the backend Kafka brokers.

Thanks for any help.

List of supported Kafka versions

Hello! Thanks for this great project, Im looking to understand the versions of Kafka that are supported, but the only thing I could find was Kafka 2.4.0 support in v0.2.0. Is that the latest version that is supported?

I think it would be helpful to add a list of them to the README, I tried parsing it from code but was unable to.


QUESTION: open requests buffer is full

I am using kafka-proxy to publish 250 messages per second, from two producers where average message size is 8kB.

While doing that I get the following error on every few seconds:

Reading data from local connection on xxx:32502 from xxx:14913 ( had error: open requests buffer is full

I have increased all available "buffer" properties to quite high number (ie. 500000).
I am running proxy on stock ec2 m5.large instance.

Should I tweak any of the system settings? If so, which?

ACL support?

Does it have ACL or another access control mechanism on proxy level? For example, interception and filtering of messages by senders and target topics. Like gateways filters http messages by theirs headers and target path. The main idea is the same - to bring all the issues of access to the proxy/gateway(or other separate system) without using the built-in Kafka ACL.

Create Custom Plugins?


Would it be possible to develop and integrate custom plugins to perform Authentication and Authorization with kafka-proxy server?

Use case:

  • Custom plugin will handle authentication and authorization with an external service
  • Authorization will be a simple role check and can be done once on start up instead of a per message basis
  • Upon success of both checks the plugin would return a successful response and have kafka-proxy server authenticate using SASL with the kafka brokers

Looking at the plugin code I think this would be possible by reusing the plugin/local-auth interface to implement a custom plugin. The plugin would return the bool "authenticated" which I think would be sufficient for my use case

Question: msk server in port 443

My requirement is expose msk in public internet on port 443. I am trying to run this proxy in eks fronting MSK .

apiVersion: v1
kind: Service
    app: kafka-proxy-1
  name: kafka-proxy-1
  externalTrafficPolicy: Cluster
  - name: kafka-proxy-1
    port: 443
    protocol: TCP
    targetPort: 443
    app: kafka-proxy-1
  sessionAffinity: None
  type: LoadBalancer
apiVersion: v1
kind: Service
    app: kafka-proxy-2
  name: kafka-proxy-2
  externalTrafficPolicy: Cluster
  - name: kafka-proxy-2
    port: 443
    protocol: TCP
    targetPort: 443
    app: kafka-proxy-2
  sessionAffinity: None
  type: LoadBalancer

apiVersion: v1
kind: Service
    app: kafka-proxy-3
  name: kafka-proxy-3
  externalTrafficPolicy: Cluster
  - name: kafka-proxy-3
    port: 443
    protocol: TCP
    targetPort: 443
    app: kafka-proxy-3
  sessionAffinity: None
  type: LoadBalancer

apiVersion: apps/v1
kind: Deployment
   name: kafka-proxy-1
  replicas: 1
      app: kafka-proxy-1
        app: kafka-proxy-1
        - name: kafka-proxy
              add: ["NET_ADMIN", "SYS_TIME","NET_BIND_SERVICE"]
          image: grepplabs/kafka-proxy:latest
            - 'server'
            - '--log-format=json'
            - ',,'
            - ','
            - ','
            - ','
            - '--tls-enable'
            - '--log-level=debug'
            - '--dynamic-listeners-disable'
            - '--tls-insecure-skip-verify'
            - '--proxy-request-buffer-size=32768'
            - '--proxy-response-buffer-size=32768'
            - '--proxy-listener-read-buffer-size=32768'
            - '--proxy-listener-write-buffer-size=131072'
            - '--kafka-connection-read-buffer-size=131072'
            - '--kafka-connection-write-buffer-size=32768'
          - name: kafka-port1
            containerPort: 443

apiVersion: apps/v1
kind: Deployment
   name: kafka-proxy-2
  replicas: 1
      app: kafka-proxy-2
        app: kafka-proxy-2
        - name: kafka-proxy
              add: ["NET_ADMIN", "SYS_TIME","NET_BIND_SERVICE"]
          image: grepplabs/kafka-proxy:latest
            - 'server'
            - '--log-format=json'
            - ',,'
            - ','
            - ','
            - ','
            - '--tls-enable'
            - '--log-level=debug'
            - '--dynamic-listeners-disable'
            - '--tls-insecure-skip-verify'
            - '--proxy-request-buffer-size=32768'
            - '--proxy-response-buffer-size=32768'
            - '--proxy-listener-read-buffer-size=32768'
            - '--proxy-listener-write-buffer-size=131072'
            - '--kafka-connection-read-buffer-size=131072'
            - '--kafka-connection-write-buffer-size=32768'
          - name: kafka-port2
            containerPort: 443

apiVersion: apps/v1
kind: Deployment
   name: kafka-proxy-3
  replicas: 1
      app: kafka-proxy-3
        app: kafka-proxy-3
        - name: kafka-proxy
              add: ["NET_ADMIN", "SYS_TIME","NET_BIND_SERVICE"]
          image: grepplabs/kafka-proxy:latest
            - 'server'
            - '--log-format=json'
            - ','
            - ','
            - ','
            - ',,'
            - '--tls-enable'
            - '--log-level=debug'
            - '--dynamic-listeners-disable'
            - '--tls-insecure-skip-verify'
            - '--proxy-request-buffer-size=32768'
            - '--proxy-response-buffer-size=32768'
            - '--proxy-listener-read-buffer-size=32768'
            - '--proxy-listener-write-buffer-size=131072'
            - '--kafka-connection-read-buffer-size=131072'
            - '--kafka-connection-write-buffer-size=32768'
          - name: kafka-port3
            containerPort: 443

where * are the external ip i get from eks.

I get this error in my pod

{"@level":"info","@message":"Bootstrap server advertised as","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"External server advertised as","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"External server advertised as","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"Listening on ([::]:443) for remote","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"Ready for new connections","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"New connection for","@timestamp":"2019-11-11T22:46:01Z"}
{"@level":"info","@message":"Client closed local connection on from (","@timestamp":"2019-11-11T22:46:01Z"}
{"@level":"info","@message":"New connection for","@timestamp":"2019-11-11T22:46:05Z"}
{"@level":"info","@message":"Client closed local connection on from (","@timestamp":"2019-11-11T22:46:05Z"}
{"@level":"info","@message":"New connection for","@timestamp":"2019-11-11T22:46:09Z"}
{"@level":"info","@message":"Client closed local connection on from (","@timestamp":"2019-11-11T22:46:09Z"}
{"@level":"info","@message":"New connection for","@timestamp":"2019-11-11T22:46:11Z"}
{"@level":"info","@message":"Client closed local connection on from (","@timestamp":"2019-11-11T22:46:11Z"}```

How to access the proxy pod running in cluster externally via a service ?


Is it possible to access this example externally via a service (exposed via LoadBalancer) ?

This example works via the kubectl port-forward command but when we try to expose the same pod as a service, it does not work. Any feedback for this ?

What is the best way to do this ? The following example of the deployed proxy pod does not work.
kubectl expose pod/kafka-proxy-0 -n default --type LoadBalancer --name kafka-proxy-exposed

Minimum or Basic ACL or Authorization at the Kafka Proxy Level - any options to implement ?


Currently we have implemented an ldap oriented authentication with SASL authentication performed by the proxy.
SASL authentication is enabled on the clients and disabled on the Kafka brokers keeping the implementation simple.

Are there any options for us now to keep authentication disabled on the brokers but maintain basic ACL or Properties for Authorization based on roles mapped to users at the KAFKA PROXY Level ? It need not be as complicated as Kafka ACLs but is there such an option for us now or in the coming future ?

Any feedback or help will be greatly appreciated.


local error: tls: unexpected message on proxying request

I want to use this proxy to terminate TLS from the kafka brokers on the proxy (including client authentication with a key file) and connect from the application via plaintext. However, when running the proxy with the following command:

./kafka-proxy server --bootstrap-server-mapping "," --bootstrap-server-mapping "," --bootstrap-server-mapping "," --tls-insecure-skip-verify --tls-enable --tls-client-cert-file ./cert.pem --tls-client-key-file ./key.pem

However, whenever I try to connect from the java application, the proxy disconnects from the host with the following error message:

time="2020-02-15T13:08:30+01:00" level=info msg="New connection for"
time="2020-02-15T13:08:30+01:00" level=info msg="couldn't connect to local error: tls: unexpected message"

I'm not sure, what I do wrong, or probably I misunderstood the parameters here, can you help?

Question: Kubernetes: External IP, Service?


Is there any recommendation for deploy Kafka-proxy in Kubernetes and using a Service type LoadBalancing?
I did deploy it but I have to bind to and external IP and I don't have it on Pod.
I can't map to my LoadBalance hostname/ip:


Any ideas?
Thanks a lot!

Propagate Client Certificate

Hi there,

Is there any way to propagate the client certificate on the proxy?

We are authenticating our clients using their certificate and we would like that proxy presents the certificate to the server as it is sent by the client.



Add support for remapping brokers based on node id

Kafka allows brokers to advertise any IP, but they must have a consistent node id. It would be useful if kafka-proxy could key the IP replacement logic by node id as well as ip. For example, something like --bootstrap-server-mapping=NODE_ID:1,localhost:30001.

This would still require at least one --bootstrap-server-mapping that's not node id based, but it would provide a nice alternative in case the advertised hostnames aren't readily available and change over time.

QUESTION: does tls_enable turn on TLS for client side, server side, or both?

I have not experimented yet, but would like to know - does the tls_enable flag turn on TLS for connections from kafka-proxy to the kafka brokers? Or from clients to kafka-proxy? Or both?

If both, how would you configure different certificates for each side of the connection?

If only one of the above, are there plans to enable TLS for the other side?

Having issues with AWS MSK with TLS enabled


I am getting this when a producer contacts MSK through kafka-proxy with TLS.

INFO[2019-12-08T10:34:11Z] Reading data from local connection on from ( had error: tls: received unexpected handshake message of type *tls.clientHelloMsg when waiting for *tls.clientKeyExchangeMsg

I ran the kafka-proxy with Client auth and without it.. I had the same problem, as long as I have TLS enabled with msk cluster.

./kafka-proxy server --proxy-listener-tls-enable --tls-enable --proxy-listener-key-file "ssl.key" --proxy-listener-cert-file "server-cert.pem" --debug-enable --log-level debug --bootstrap-server-mapping ",," --bootstrap-server-mapping ",," --bootstrap-server-mapping ",,"

./ --broker-list --topic tls.tested.test --producer.config

I am using current versions :

  • For Kafka producer : kafka_2.12-2.2.1
  • MSK with in-transit encryption enabled and 2.2.1 kafka version.

Q: Connect to brokers with PEM and KEY

I would like to know how can I run/configure the kafka-proxy cli to connect to a cluster (brokers) asking for a certificate.

I have the PEM and KEY files, and I can connect using Java with a custom Keystore with those files.

How can I replicate this on the kafka-proxy client.

Restricting the ports for outgoing connection to a range of pre-defined ports

Is there a way to restrict the ports used for outgoing connections when starting the Kafka Proxy server? I am specifying the bootstrap server using the --bootstrap-server-mapping option and tried to use the --dynamic-listeners-disable and --dynamic-sequential-min-port but these options didn't seem to help. I have a firewall that blocks all outgoing connections by default and I have allowed port 9092 but I get the following error:

Reading data from destination_ip_address:destination_port had error: read tcp source_ip_address:source_port->destination_ip_address:destination_port: wsarecv: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.

This is because the Read TCP operation is happening on a random port that is blocked.

Update 1: After reading some code I found that the --dynamic-sequential-min-port parameter is being used to create servers/listeners using net.Listen method. So from what I understand this parameter is irrelevant to my problem. Can someone confirm ?

mutual ssl based authentication

is it possible to setup mutual ssl based authentication between client and kafka-proxy. Not between kafka-proxy and kafka broker, but between client and kafka-proxy (client<->kafka-proxy)?
What I need is, that kafka client users are authenticating themselves against kafka-proxy with their unique client tls certificates before they are routed to the kafka-broker from kafka-proxy.

Multiple kafka-proxy servers to balance the traffic

In a scenario that we have 12 brokers in the cluster, can we setup 3 kafka-proxy servers like below, so that each kafka-proxy will handle traffic for 4 brokers?


It's not quite clear to me from the README, maybe each kafka-proxy must have all 12 brokers mapped to 12 ports, and we have to put an ALB (in AWS) in front. Please advise.

QUESTION: Using this with Confluent Cloud



For SASL_PLAIN - does this pass through the SASL that i put on my clients to the brokers or is it intended to have it's own SASL credentials which are used for all upstream clients?

I'm having a play with this against confluent cloud with little success so far. I'm attempting to use it as a proxy because the CCloud brokers are in an external vpc which i can't connect directly to.


kafka proxy expects response from broker for Produce Request with RequiredAcks=0

Symptoms: "open requests buffer is full" as described in #23

This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.