grepplabs / kafka-proxy
Proxy connections to Kafka cluster. Connect through SOCKS Proxy, HTTP Proxy or to cluster running in Kubernetes.
License: Apache License 2.0
Hi,
Thanks for the great project.
I've noticed that Kafka 2.7.1 CLI clients (e.g. kafka-topics.sh) work nicely with kafka-proxy, while version 2.8.0 produces the following error in the kafka-proxy log:
{"@level":"info","@message":"Reading data from kafka-0.kafka:9092 had error: Unsupported response schema version 11 for key 3 ","@timestamp":"2021-07-08T10:43:21Z"}
Kafka broker version: 2.8.0
Kafka client: 2.8.0
kafka-proxy: 0.2.8
I thought about creating requests/responses programmatically with Struct and Schema. I think it could be useful for testing purposes, as well as for constructing responses cleanly.
I had these problems:
schemaTopicMetadata := NewSchemaStruct("topic_metadata_v0",
	&field{name: "key", ty: typeInt16},
	&field{name: "version", ty: typeInt16},
	&field{name: "correlation_id", ty: typeInt32},
	&field{name: "client_id", ty: typeStr},
	&array{name: "topic", ty: typeStr},
)
topicMetadata := Struct{
	schema: schemaTopicMetadata,
	values: make([]interface{}, 0),
}
topicMetadata.values = append(topicMetadata.values, int16(0))
topicMetadata.values = append(topicMetadata.values, int16(0))
topicMetadata.values = append(topicMetadata.values, int32(0))
topicMetadata.values = append(topicMetadata.values, "custom")
arr := make([]interface{}, 0)
arr = append(arr, "topic1")
arr = append(arr, "topic2")
topicMetadata.values = append(topicMetadata.values, arr)
encoded, err := EncodeSchema(&topicMetadata, schemaTopicMetadata)
type FetchRequest_v0 struct{}

func (f *FetchRequest_v0) getSchema() Schema {
	partitionSchema := NewSchema("partition_schema",
		&field{name: "partition", ty: typeInt32},
		&field{name: "fetch_offset", ty: typeInt64},
		&field{name: "partition_max_bytes", ty: typeInt32},
	)
	topicSchema := NewSchema("topic_schema",
		&field{name: "topic_name", ty: typeStr},
		&array{name: "partitions", ty: partitionSchema},
	)
	schemaFetch := NewSchema("fetch_v0",
		&field{name: "key", ty: typeInt16},
		&field{name: "version", ty: typeInt16},
		&field{name: "correlation_id", ty: typeInt32},
		&field{name: "client_id", ty: typeStr},
		&field{name: "replica_id", ty: typeInt32},
		&field{name: "max_wait_time", ty: typeInt32},
		&field{name: "min_bytes", ty: typeInt32},
		&array{name: "topics", ty: topicSchema},
	)
	return schemaFetch
}
req := &FetchRequest_v0{}
topicSchema := req.getSchema().fieldsByName["topics"].def.ty // ???? cannot use this, because the
// Schema constructor asserts fields as Fields in NewSchema, and Fields has no method for
// retrieving the schema
someStruct := Struct{
	schema: topicSchema,
	values: make([]interface{}, 0),
}
// ... add some values to someStruct
I can create pull requests, but I would like to discuss it first, as I don't know the code that deeply.
We have a requirement for short-lived client certs. Unfortunately, the Kafka Java clients do not support hot-swapping of certs, so this requires us to restart the consumers, which subsequently triggers rebalances of the consumer group.
I was looking into a proxy option and wondered whether such a setup would be possible with kafka-proxy. I feel there must be others with this issue, and this could be a great use for the proxy.
It looks like there is a PR for librdkafka to achieve this for clients which use this lib, though this is not yet merged.
confluentinc/librdkafka#2868
Hi,
I have an auditing/compliance requirement to generate audit logs for my Kafka broker. As my service provider doesn't offer this, I'm evaluating kafka-proxy to achieve it.
My requirement would be met if I could add the username from SASL and the client IP to the debug log, something like:
logrus.Debugf("[%s] Kafka request key %v, version %v, length %v", ctx.username, requestKeyVersion.ApiKey, requestKeyVersion.ApiVersion, requestKeyVersion.Length)
My broker uses SASL PLAIN.
I'm trying to change the code to extract the SASL packets of Kafka API key 17 in handleRequest, but I cannot re-read the src DeadlineReaderWriter, right?
Any idea/recommendation? I'd love to contribute to the project, though I'm not a Go developer ;-)
Hi,
I am thinking of authorizing Kafka with kafka-proxy; would it be possible to pass Kafka message types to plugins?
Retrieval of google certificates (google-id-token) should go through the proxy server.
The implementation should be similar to Java Proxy settings:
a. http.proxyHost, http.proxyPort
b. https.proxyHost, https.proxyPort
c. socksProxyHost, socksProxyPort
I keep getting "had error: api key -10495 is invalid" when I try to proxy a Kafka cluster with SASL. Is this because the Kafka broker version is too old?
I'm using SASL plain with TLS enabled and the handshake is successful.
Could someone help?
I tried your suggestion and tested with TLS and auth-ldap. Everything works fine without TLS, but I'm getting a handshake error. Not sure what is missing.
Here are my details in case you can spot it for us (attaching error.txt):
error.txt
Any help is highly appreciated. Thanks.
===================================
Hi,
Any idea why I get:
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:402)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:237)
at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:401)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:417)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:312)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
I'm running bin/kafka-consumer-groups --bootstrap-server proxy:9092 --all-groups --all-topics --describe --verbose, and I get no error when running it directly against my broker.
Thanks a lot!
Given the answer to a previous question (that SASL gets passed through by default), does this mean that I configure my Java client with the SASL properties for the brokers, and they will be passed transparently through kafka-proxy when I use kafka-proxy as my bootstrap-servers?
I've been fiddling around with the settings to achieve this, and I'm now at the point where I'm getting net address mapping not found for my brokers, which appear to have been renamed with a b2- prefix (i.e. b2-pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092 for the config below). Do I need to configure something in external-server-mappings, perhaps? I don't actually understand what external-server-mappings does. I originally thought it might be used instead of bootstrap-server-mapping when the Kafka broker is on a host that's not the same as the one kafka-proxy is on, but that doesn't appear to be so.
I'm presuming some kind of dynamic proxy is set up on kafka-proxy, and the path is client -> kafka-proxy -> kafka-proxy dynamic proxy -> kafka-brokers?
I think perhaps I'm a little confused.
my java client is configured:
ssl.truststore.location=dev-kafka.truststore.jks
ssl.truststore.password=changeit
ssl.truststore.type=PKCS12
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=worker-0.some.domain.name.com:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="someusername" password\="somepassword";
security.protocol=SASL_SSL
and my kafka-proxy is run as:
kafka-proxy server \
--bootstrap-server-mapping "pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092,0.0.0.0:9092" \
--tls-enable \
--tls-insecure-skip-verify \
--proxy-listener-tls-enable \
--proxy-listener-key-file=/root/dev-kafka-server.private.key \
--proxy-listener-cert-file=/root/dev-kafka-server.signed.cer \
--dynamic-listeners-disable
Errors I'm seeing are:
INFO[2018-10-08T17:17:51+13:00] New connection for pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092
INFO[2018-10-08T17:17:51+13:00] Reading data from pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092 had error: net address mapping for b2-pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092 was not found
Hi,
I have a Kafka (v2.5.0) cluster (3 brokers). I start kafka-proxy (v0.2.8) on another machine and send a message with kafka-console-producer; the message is not written to the topic, and Kafka throws a java.io.EOFException.
./kafka-console-producer.sh --broker-list proxyip:32400,proxyip:32401,proxyip:32402 --topic test
>123
[2020-11-15 05:41:34,272] WARN [Producer clientId=console-producer] Connection to node 3 (/0.0.0.0:32402) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,371] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,472] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,573] WARN [Producer clientId=console-producer] Connection to node 3 (/0.0.0.0:32402) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,675] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,776] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,876] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,977] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:35,078] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
kafka-proxy server \
--bootstrap-server-mapping "broker1:9093,0.0.0.0:32400" \
--bootstrap-server-mapping "broker2:9093,0.0.0.0:32401" \
--bootstrap-server-mapping "broker3:9093,0.0.0.0:32402" \
--sasl-enable \
--sasl-username "admin" \
--sasl-password "admin" \
--log-level debug
INFO[2020-11-15T05:41:32-05:00] New connection for broker3:9093
DEBU[2020-11-15T05:41:32-05:00] Sending SaslHandshakeRequest mechanism: PLAIN version: 0
DEBU[2020-11-15T05:41:32-05:00] Successful SASL handshake. Available mechanisms: [PLAIN]
DEBU[2020-11-15T05:41:32-05:00] Sending authentication opaque packets, mechanism PLAIN
DEBU[2020-11-15T05:41:33-05:00] Kafka request key 18, version 3, length 52
DEBU[2020-11-15T05:41:33-05:00] Kafka response key 18, version 3, length 348
DEBU[2020-11-15T05:41:33-05:00] Kafka request key 3, version 9, length 32
DEBU[2020-11-15T05:41:33-05:00] Kafka response key 3, version 9, length 112
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker1:9093, listener=0.0.0.0:32401, advertised=0.0.0.0:32401
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker2:9093, listener=0.0.0.0:32402, advertised=0.0.0.0:32402
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker3:9093, listener=0.0.0.0:32400, advertised=0.0.0.0:32400
INFO[2020-11-15T05:41:37-05:00] Client closed local connection on 192.168.4.60:32400 from broker3:40750 (broker3:9093)
[2020-11-15 05:34:46,830] DEBUG Processor 3 listening to new connection from /proxyip:54714 (kafka.network.Processor)
[2020-11-15 05:34:46,830] DEBUG Accepted connection from /proxyip:54714 on /broker1:9093 and assigned it to processor 3, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (
kafka.network.Acceptor)
[2020-11-15 05:34:46,830] DEBUG connections.max.reauth.ms for mechanism=PLAIN: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Handling Kafka request SASL_HANDSHAKE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Using SASL mechanism 'PLAIN' provided by client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG Set SASL server state to AUTHENTICATE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG Authentication complete; session max lifetime from broker config=0 ms, no credential expiration; no session expiration, sending 0 ms to client (org.apache.kafka.common.security.authenticator.SaslServerAu
thenticator)
[2020-11-15 05:34:46,843] DEBUG Set SASL server state to COMPLETE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG [SocketServer brokerId=1] Successfully authenticated with /proxyip (org.apache.kafka.common.network.Selector)
[2020-11-15 05:34:46,854] DEBUG [SocketServer brokerId=1] Connection with /proxyip disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:97)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:861)
at kafka.network.Processor.run(SocketServer.scala:760)
at java.lang.Thread.run(Thread.java:748)
Thanks for any help.
Username/Password Authentication for SOCKS V5 should be supported (RFC 1929)
It looks like it's not possible for a client behind kafka-proxy, or for kafka-proxy itself, to authenticate to Kafka brokers with Kerberos credentials. How difficult would it be to support this use case? I can see that the underlying third-party Sarama library supports Kerberos authentication.
Also, would you happen to know if there is a workaround to achieve this with the current version?
Thanks for doing a great job.
Question:
I have a scenario where I would like kafka-proxy to initiate SASL PLAIN with a kafka broker using credentials passed in from the client.
Proposed flow:
Looking at the code I think the most relevant changes could be done in client.go ( I realize other parts of the code would need to be changed as well):
Would you be open to supporting this pattern? If so we are more than happy to contribute the needed changes.
Hello,
Could you please add options to specify all field names of the JSON structured logs? E.g. Google's Stackdriver expects a field named "message", not "@message".
We have been using the proxy with great success to date using both PLAINTEXT and SSL endpoints. We are now wanting to do SASL authentication with SCRAM (our hosting provider uses SCRAM256 rather than PLAIN).
As best I can tell kafka-proxy currently doesn't have support for this.
I believe I will need to add a *Auth struct in sasl_by_proxy.go to handle the SCRAM portion, as well as provide some command-line parameters.
Any chance someone has worked on this already and has some code sitting around? If not I will likely get started hacking away.
(The purpose of this report is to alert grepplabs/kafka-proxy to possible problems when it tries to upgrade the following dependencies.)
- Latest version: v1.7.1 (latest commit fe7bd95, 11 days ago)
- Where did you use it:
https://github.com/grepplabs/kafka-proxy/search?q=prometheus%2Fclient_golang%2Fprometheus&unscoped_q=prometheus%2Fclient_golang%2Fprometheus
- Detail:
module github.com/prometheus/client_golang
require (
github.com/beorn7/perks v1.0.1
github.com/cespare/xxhash/v2 v2.1.1
…
)
go 1.11
package prometheus
import (
"github.com/cespare/xxhash/v2"
…
)
This problem has existed since prometheus/client_golang v1.2.0 (commit 9a2ab94 on 16 Oct 2019). You currently use version v0.8.0. If you try to upgrade prometheus/client_golang to version v1.2.0 or above, you will get an error: no package exists at "github.com/cespare/xxhash/v2".
These dependencies all added Go modules in the recent versions.
They all comply with the specification of "Releasing Modules for v2 or higher" available in the Modules documentation. Quoting the specification:
A package that has migrated to Go Modules must include the major version in the import path to reference any v2+ modules. For example, if the repo github.com/my/module migrated to Modules at version v3.x.y, it should declare its module path with the MAJOR version suffix "/v3" (e.g., module github.com/my/module/v3), and its downstream projects should use "github.com/my/module/v3/mypkg" to import this repo's package.
That import path is a virtual path, not the physical path, so earlier versions of Go and all third-party tooling (like dep, glide, govendor, etc.) have no module awareness and therefore don't handle such import paths correctly; see golang/dep#1962 and golang/dep#2139.
Note: creating a new branch is not required. If you have instead been releasing on master and would prefer to tag v3.0.0 on master, that is a viable option. (However, be aware that introducing an incompatible API change on master can cause issues for non-module users who issue go get -u, given that the go tool is not aware of semver prior to Go 1.11 or when module mode is not enabled in Go 1.11+.)
Pre-existing dependency management solutions such as dep can currently have problems consuming a v2+ module created in this way; see for example dep#1962.
https://github.com/golang/go/wiki/Modules#releasing-modules-v2-or-higher
Go Modules are the general trend of the ecosystem; if you want a better package-upgrade experience, migrating to Go Modules is a good choice.
Migrating to modules is accompanied by the introduction of virtual paths (as discussed above): "github.com/my/module/v3/mypkg" is not the physical path. So Go versions older than 1.9.7 and 1.10.3, plus all third-party dependency management tools (like dep, glide, govendor, etc.), have no minimal module awareness as of now and therefore don't handle such import paths correctly.
Downstream projects might then be negatively affected in their builds if they are module-unaware (Go versions older than 1.9.7 and 1.10.3, or using third-party dependency management tools such as dep, glide, govendor, etc.).
If grepplabs/kafka-proxy wants to keep using a dependency management tool (like dep, glide, govendor, etc.) and still upgrade the dependencies, it can choose this fix strategy:
Manually download the dependencies into the vendor directory and reconcile compatibility (materialize the virtual path, or delete the virtual part of the path), avoiding fetching the dependencies by virtual import paths. This may add some maintenance overhead compared to using modules.
As the import paths have different meanings between projects that have adopted module repos and those that haven't, materializing the virtual path is a better way to solve the issue while ensuring compatibility with downstream module users. A textbook example is provided by the repo github.com/moby/moby:
https://github.com/moby/moby/blob/master/VENDORING.md
https://github.com/moby/moby/blob/master/vendor.conf
In its vendor directory, github.com/moby/moby adds the /vN subdirectory to the corresponding dependencies. This will help more downstream module users work well with your package.
prometheus/client_golang has 1048 module-unaware users on GitHub, such as AndreaGreco/mqtt_sensor_exporter, seekplum/plum_exporter, arl/monitoring, and more:
https://github.com/search?q=prometheus%2Fclient_golang+filename%3Avendor.conf+filename%3Avendor.json+filename%3Aglide.toml+filename%3AGodep.toml+filename%3AGodep.json&type=Code
You can make a choice when you meet this dependency-management issue by balancing your own development schedule/mode against the effects on the downstream projects.
For this issue, Solution 1 can maximize your benefits with minimal impact on your downstream projects and the ecosystem.
Do you plan to upgrade the libraries in the near future?
Hope this issue report can help you ^_^
Thank you very much for your attention.
Best regards,
Kate
Hi,
Here is my use case: I need to proxy AWS MSK (Managed Streaming for Apache Kafka) in order to connect to it from any network (MSK connectivity options are a bit limited as of now). The MSK cluster is created with mutual TLS because it is multi-tenant and each tenant has its own client cert. Client certs for all tenants are signed by the same CA (the CA certificate monthly fee is pretty high, so this is a cost optimization). I have one proxy group for each tenant and still want to maintain TLS between the proxy client and the proxy.
Kafka-proxy offers a possibility to terminate mutual TLS on proxy and also to start mutual TLS on proxy so those two capabilities can be used together in order to make sure that:
I can start TLS on proxy with selected certificate (tls-client-cert-file) so this part is fine
On the proxy listener side, I can validate that the proxy client cert is signed by a given CA (proxy-listener-ca-chain-cert-file), but I am not able to check that the proxy client cert is the same as tls-client-cert-file. This is needed so that tenantA is not able to use the same proxy group as tenantB (they are signed by the same CA).
I did some hacking on kafka-proxy and came up with some prototype that is backwards compatible by default but allows to check for same client cert if needed (new config flag). The code is pretty straightforward and available here: https://github.com/mgusiew-guide/kafka-proxy/tree/SameClientCertEnable
Unfortunately I did not create any automated tests for now (just tested manually), but that will follow; I just wanted to have something in order to start the discussion.
Would you be interested in such a contribution? I could redesign it and add some tests based on your review.
Let me know what you think.
Make plugin-auth-ldap production ready:
Can kafka-proxy be enhanced (and how) so that the client passes SASL credentials to kafka-proxy, and kafka-proxy simply forwards the client-provided SASL credentials to the Kafka broker for authentication and authorization purposes?
Hi, I am trying to use kafka-proxy for cross-VPC access to AWS MSK. First, I am trying to set up the proxy within the same VPC as the MSK cluster. I am able to make it work with --tls-enable only. The proxy is started as below:
./kafka-proxy server \
--bootstrap-server-mapping "b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3001" \
--bootstrap-server-mapping "b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3002" \
--dynamic-listeners-disable \
--kafka-client-id client \
--tls-enable \
--tls-ca-chain-cert-file "/home/ec2-user/ssl2/truststore-certs.pem" \
--tls-client-cert-file "/home/ec2-user/ssl2/keystore-certs.pem" \
--tls-client-key-file "/home/ec2-user/ssl2/key.pem" \
--tls-client-key-password pass123
then connect locally using bin/kafka-console-consumer.sh --bootstrap-server ip-10-1-0-94.ec2.internal:3001 --topic AWSKafkaTutorialTopic2 --from-beginning. Everything works fine.
However, when I try to add TLS between the client and the proxy, things start to break. I have done 3 experiments, and in all 3 I connect using bin/kafka-console-consumer.sh --bootstrap-server ip-10-1-0-94.ec2.internal:3001 --topic AWSKafkaTutorialTopic2 --from-beginning --consumer.config client.properties, where the same client.properties works fine when I connect directly to AWS MSK with bin/kafka-console-consumer.sh --bootstrap-server b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094 --topic AWSKafkaTutorialTopic2 --from-beginning --consumer.config client.properties
./kafka-proxy server \
--bootstrap-server-mapping "b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3001" \
--bootstrap-server-mapping "b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3002" \
--dynamic-listeners-disable \
--kafka-client-id proxy \
--proxy-listener-tls-enable \
--proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
--proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
--proxy-listener-key-file "/home/ec2-user/kafka-proxy/ssl/key.pem" \
--debug-enable
Results
INFO[2021-06-18T01:03:12Z] New connection for b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094
INFO[2021-06-18T01:03:12Z] Reading data from b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094 had error: unexpected EOF
[2021-06-18 01:03:12,122] INFO [SocketServer brokerId=1] Failed authentication with /INTERNAL_IP (SSL handshake failed) (org.apache.kafka.common.network.Selector)
./kafka-proxy server \
--bootstrap-server-mapping "b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9092,0.0.0.0:3001" \
--bootstrap-server-mapping "b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9092,0.0.0.0:3002" \
--dynamic-listeners-disable \
--kafka-client-id proxy \
--proxy-listener-tls-enable \
--proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
--proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
--proxy-listener-key-file "/home/ec2-user/kafka-proxy/ssl/key.pem" \
--debug-enable
Result:
[2021-06-18 01:07:01,748] ERROR [Consumer clientId=consumer-console-consumer-42640-1, groupId=console-consumer-42640] Connection to node 2147483645 (/0.0.0.0:3002) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-06-18 01:07:01,750] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names present
at sun.security.ssl.Alert.createSSLException(Alert.java:131)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
INFO[2021-06-18T01:07:01Z] Reading data from local connection on 10.1.0.94:3002 from 10.1.0.94:52242 (b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9092) had error: tls: received unexpected handshake message of type *tls.clientHelloMsg when waiting for *tls.certificateMsg
Both tls-enable and proxy-listener-tls-enable:
./kafka-proxy server \
--bootstrap-server-mapping "b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3001" \
--bootstrap-server-mapping "b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3002" \
--dynamic-listeners-disable \
--kafka-client-id proxy \
--proxy-listener-tls-enable \
--proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
--proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
--proxy-listener-key-file "/home/ec2-user/kafka-proxy/ssl/key.pem" \
--tls-enable \
--tls-ca-chain-cert-file "/home/ec2-user/ssl2/truststore-certs.pem" \
--tls-client-cert-file "/home/ec2-user/ssl2/keystore-certs.pem" \
--tls-client-key-file "/home/ec2-user/ssl2/key.pem" \
--tls-client-key-password pass123 \
--debug-enable
Result: exactly the same as Experiment2
I feel like there might be something wrong with how I generate the keys/certificates for the proxy, but I cannot figure out what. When I run openssl s_client -connect ip-10-1-0-94.ec2.internal:3001, the end of the response looks like this:
---
SSL handshake has read 18546 bytes and written 417 bytes
Verification error: self signed certificate in certificate chain
---
New, TLSv1.3, Cipher is TLS_AES_128_GCM_SHA256
Server public key is 2048 bit
Secure Renegotiation IS NOT supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
Early data was not sent
Verify return code: 19 (self signed certificate in certificate chain)
---
140263774463808:error:14094412:SSL routines:ssl3_read_bytes:sslv3 alert bad certificate:ssl/record/rec_layer_s3.c:1544:SSL alert number 42
I checked your response in #40 (comment) and tried to disable TLS 1.3, but it didn't help; as far as I can tell through openssl, it's always using TLSv1.3. I am not sure whether TLS 1.3 is the problem. Also, it seems that only tls13 is included in the binary:
[ec2-user@ip-10-1-0-94 kafka-proxy]$ strings kafka-proxy | grep -i tls13.go
/usr/local/go/src/crypto/tls/handshake_server_tls13.go
/usr/local/go/src/crypto/tls/handshake_client_tls13.go
On the other hand, from the results of Experiment 1, we can actually tell that the TLS between client and proxy should already work; otherwise, there wouldn't be any logs on the broker side.
Any suggestions would be great! Thank you!!
I know it's not the goal of this project, but I have in mind something that could help me solve big scaling issues.
The idea is to write a proxy between the producer and the broker cluster that can route all the traffic to the appropriate Kafka cluster depending on a field in the Kafka message header. Maybe my idea is just crap, I don't know, but I'd appreciate your input.
@everesio Do we have a way to redirect the logs to a specific folder? Also, could you please share performance metrics/benchmarks for this proxy at higher data volumes?
Hello, I am using this library as a proxy for our Kafka cluster. Now we have a requirement to change some data in Kafka packets, such as topic content, at the kafka-proxy level, so we need to parse each packet rather than simply act as a layer-4 proxy. I hope we can discuss this. Thank you very much!
Hi, I have configured Kafka with client-certificate auth, and I have multiple client private keys for the client certificates. I can set up two kafka-proxies on different ports with different values for the --tls-client-key-file parameter. The question is whether such a setup will work. I noticed that in the LoadBalancer example (https://gist.github.com/everesio/e6fae11ed69099ae3f867eddcd83db82) kafka-proxy pods are exposed as singleton StatefulSets, so I just want to make sure that two instances which proxy different types of clients (based on client certificates) will not cause conflicts/race conditions.
It is also interesting why a StatefulSet is needed (the problem with StatefulSets is that when a pod goes into an unknown state, manual intervention is required to decide whether to replace it). Is it the Kafka protocol that requires network identity to remain the same, the statefulness of protocol calls, or maybe something else?
Hi kafka-proxy contributors,
I'd like to know if kafka-proxy can be used in a way other than the sidecar architecture. The project README basically mentions using localhost for accessing kafka-proxy, but I need it much more as a reverse proxy for accessing the backend Kafka brokers.
Thanks for any help.
Hello! Thanks for this great project. I'm looking to understand which versions of Kafka are supported, but the only thing I could find was Kafka 2.4.0 support in v0.2.0. Is that the latest version that is supported?
I think it would be helpful to add a list of them to the README, I tried parsing it from code but was unable to.
Thanks!
I am using kafka-proxy to publish 250 messages per second from two producers, where the average message size is 8 kB.
While doing so, I get the following error every few seconds:
Reading data from local connection on xxx:32502 from xxx:14913 (b-2.testcluster.pwsiki.c3.kafka.eu-west-1.amazonaws.com:9092) had error: open requests buffer is full
I have increased all available "buffer" properties to quite a high number (i.e. 500,000).
I am running proxy on stock ec2 m5.large instance.
Should I tweak any of the system settings? If so, which?
Does it have ACLs or another access-control mechanism at the proxy level? For example, interception and filtering of messages by sender and target topic, the way gateways filter HTTP messages by their headers and target path. The main idea is the same: to handle all access concerns in the proxy/gateway (or another separate system) without using the built-in Kafka ACLs.
Question:
Would it be possible to develop and integrate custom plugins to perform Authentication and Authorization with kafka-proxy server?
Use case:
Looking at the plugin code, I think this would be possible by reusing the plugin/local-auth interface to implement a custom plugin. The plugin would return the bool "authenticated", which I think would be sufficient for my use case.
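As a hedged sketch of what such a plugin could look like: the interface below is defined locally for illustration and only mirrors the shape the local-auth plugin appears to expect (a boolean "authenticated" result); check kafka-proxy's plugin apis package for the exact signature before building against it.

```go
package main

import "fmt"

// PasswordAuthenticator mirrors the shape of kafka-proxy's local-auth plugin
// interface (declared here locally for illustration only; the real interface
// lives in the project's plugin apis package).
type PasswordAuthenticator interface {
	Authenticate(username, password string) (bool, int32, error)
}

// staticAuthenticator is a toy implementation backed by an in-memory map.
// A real custom plugin would call out to LDAP, a database, or a token service.
type staticAuthenticator struct {
	users map[string]string
}

func (a *staticAuthenticator) Authenticate(username, password string) (bool, int32, error) {
	pw, ok := a.users[username]
	if !ok || pw != password {
		return false, 0, nil // not authenticated, but no transport error
	}
	return true, 0, nil
}

func main() {
	var auth PasswordAuthenticator = &staticAuthenticator{
		users: map[string]string{"alice": "s3cret"},
	}
	ok, _, _ := auth.Authenticate("alice", "s3cret")
	fmt.Println("alice authenticated:", ok)
}
```

Authorization (topic-level rules) would have to live in a separate layer, since this interface only answers the yes/no authentication question.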
My requirement is to expose MSK on the public internet on port 443. I am trying to run this proxy in EKS, fronting MSK.
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-proxy-1
  name: kafka-proxy-1
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka-proxy-1
    port: 443
    protocol: TCP
    targetPort: 443
  selector:
    app: kafka-proxy-1
  sessionAffinity: None
  type: LoadBalancer
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-proxy-2
  name: kafka-proxy-2
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka-proxy-2
    port: 443
    protocol: TCP
    targetPort: 443
  selector:
    app: kafka-proxy-2
  sessionAffinity: None
  type: LoadBalancer
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-proxy-3
  name: kafka-proxy-3
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka-proxy-3
    port: 443
    protocol: TCP
    targetPort: 443
  selector:
    app: kafka-proxy-3
  sessionAffinity: None
  type: LoadBalancer
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-proxy-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-proxy-1
  template:
    metadata:
      labels:
        app: kafka-proxy-1
    spec:
      containers:
      - name: kafka-proxy
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "SYS_TIME", "NET_BIND_SERVICE"]
        image: grepplabs/kafka-proxy:latest
        args:
        - 'server'
        - '--log-format=json'
        - '--bootstrap-server-mapping=b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:443,a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443'
        - '--external-server-mapping=b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443'
        - '--external-server-mapping=b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443'
        - '--external-server-mapping=b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443'
        - '--tls-enable'
        - '--log-level=debug'
        - '--dynamic-listeners-disable'
        - '--tls-insecure-skip-verify'
        - '--proxy-request-buffer-size=32768'
        - '--proxy-response-buffer-size=32768'
        - '--proxy-listener-read-buffer-size=32768'
        - '--proxy-listener-write-buffer-size=131072'
        - '--kafka-connection-read-buffer-size=131072'
        - '--kafka-connection-write-buffer-size=32768'
        ports:
        - name: kafka-port1
          containerPort: 443
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-proxy-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-proxy-2
  template:
    metadata:
      labels:
        app: kafka-proxy-2
    spec:
      containers:
      - name: kafka-proxy
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "SYS_TIME", "NET_BIND_SERVICE"]
        image: grepplabs/kafka-proxy:latest
        args:
        - 'server'
        - '--log-format=json'
        - '--bootstrap-server-mapping=b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:443,a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443'
        - '--external-server-mapping=b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443'
        - '--external-server-mapping=b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443'
        - '--external-server-mapping=b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443'
        - '--tls-enable'
        - '--log-level=debug'
        - '--dynamic-listeners-disable'
        - '--tls-insecure-skip-verify'
        - '--proxy-request-buffer-size=32768'
        - '--proxy-response-buffer-size=32768'
        - '--proxy-listener-read-buffer-size=32768'
        - '--proxy-listener-write-buffer-size=131072'
        - '--kafka-connection-read-buffer-size=131072'
        - '--kafka-connection-write-buffer-size=32768'
        ports:
        - name: kafka-port2
          containerPort: 443
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-proxy-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-proxy-3
  template:
    metadata:
      labels:
        app: kafka-proxy-3
    spec:
      containers:
      - name: kafka-proxy
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "SYS_TIME", "NET_BIND_SERVICE"]
        image: grepplabs/kafka-proxy:latest
        args:
        - 'server'
        - '--log-format=json'
        - '--external-server-mapping=b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443'
        - '--external-server-mapping=b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443'
        - '--external-server-mapping=b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443'
        - '--bootstrap-server-mapping=b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:443,a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443'
        - '--tls-enable'
        - '--log-level=debug'
        - '--dynamic-listeners-disable'
        - '--tls-insecure-skip-verify'
        - '--proxy-request-buffer-size=32768'
        - '--proxy-response-buffer-size=32768'
        - '--proxy-listener-read-buffer-size=32768'
        - '--proxy-listener-write-buffer-size=131072'
        - '--kafka-connection-read-buffer-size=131072'
        - '--kafka-connection-write-buffer-size=32768'
        ports:
        - name: kafka-port3
          containerPort: 443
where *.us-east-1.elb.amazonaws.com:443 are the external IPs I get from EKS.
I get this error in my pod
{"@level":"info","@message":"Bootstrap server b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094 advertised as a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"External server b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094 advertised as a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"External server b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094 advertised as a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"Listening on 0.0.0.0:443 ([::]:443) for remote b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"Ready for new connections","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"New connection for b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:46:01Z"}
{"@level":"info","@message":"Client closed local connection on 20.10.3.191:443 from 20.10.1.197:39176 (b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094)","@timestamp":"2019-11-11T22:46:01Z"}
{"@level":"info","@message":"New connection for b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:46:05Z"}
{"@level":"info","@message":"Client closed local connection on 20.10.3.191:443 from 20.10.2.185:7332 (b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094)","@timestamp":"2019-11-11T22:46:05Z"}
{"@level":"info","@message":"New connection for b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:46:09Z"}
{"@level":"info","@message":"Client closed local connection on 20.10.3.191:443 from 20.10.3.252:36298 (b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094)","@timestamp":"2019-11-11T22:46:09Z"}
{"@level":"info","@message":"New connection for b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:46:11Z"}
{"@level":"info","@message":"Client closed local connection on 20.10.3.191:443 from 20.10.1.197:39186 (b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094)","@timestamp":"2019-11-11T22:46:11Z"}
Question:
Is it possible to access this example externally via a Service (exposed via LoadBalancer)?
This example works via the kubectl port-forward command, but when we try to expose the same pod as a Service, it does not work. Any feedback on this?
What is the best way to do this? The following attempt to expose the deployed proxy pod does not work:
kubectl expose pod/kafka-proxy-0 -n default --type LoadBalancer --name kafka-proxy-exposed
Hello,
Currently we have implemented an ldap oriented authentication with SASL authentication performed by the proxy.
SASL authentication is enabled on the clients and disabled on the Kafka brokers keeping the implementation simple.
Are there any options for us to keep authentication disabled on the brokers but maintain basic ACLs or properties for authorization, based on roles mapped to users, at the kafka-proxy level? It need not be as complicated as Kafka ACLs, but is there such an option now, or is one planned?
Any feedback or help will be greatly appreciated.
Thanks
@ashishgidh
I want to use this proxy to terminate TLS from the Kafka brokers at the proxy (including client authentication with a key file) and connect from the application via plaintext. I am running the proxy with the following command:
./kafka-proxy server --bootstrap-server-mapping "kafka1.example.com:19093,127.0.0.1:32500" --bootstrap-server-mapping "kafka2.example.com:19093,127.0.0.1:32501" --bootstrap-server-mapping "kafka3.example.com:19093,127.0.0.1:32502" --tls-insecure-skip-verify --tls-enable --tls-client-cert-file ./cert.pem --tls-client-key-file ./key.pem
However, whenever I try to connect from the java application, the proxy disconnects from the host with the following error message:
time="2020-02-15T13:08:30+01:00" level=info msg="New connection for kafka1.example.com:19093"
time="2020-02-15T13:08:30+01:00" level=info msg="couldn't connect to kafka1.example.com:19093(kafka1.example.com:19093): local error: tls: unexpected message"
I'm not sure what I'm doing wrong; perhaps I misunderstood the parameters. Can you help?
Hi,
Is there any recommendation for deploying kafka-proxy in Kubernetes using a Service of type LoadBalancer?
I deployed it, but I have to bind to an external IP, and I don't have one on the Pod.
I can't map to my LoadBalancer hostname/IP:
--bootstrap-server-mapping=original-bootstrap:9092,external-ip-hostname:9092",
Any ideas?
Thanks a lot!
Hi there,
Is there any way to propagate the client certificate on the proxy?
We authenticate our clients using their certificates, and we would like the proxy to present the certificate to the server exactly as the client sent it.
Cheers,
Elton
Kafka allows brokers to advertise any IP, but they must have a consistent node id. It would be useful if kafka-proxy could key the IP-replacement logic by node id as well as by IP, for example something like --bootstrap-server-mapping=NODE_ID:1,localhost:30001.
This would still require at least one --bootstrap-server-mapping that's not node-id based, but it would provide a nice alternative in case the advertised hostnames aren't readily available and change over time.
I have not experimented yet, but would like to know: does the --tls-enable flag turn on TLS for connections from kafka-proxy to the Kafka brokers? Or from clients to kafka-proxy? Or both?
If both, how would you configure different certificates for each side of the connection?
If only one of the above, are there plans to enable TLS for the other side?
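Judging from the flags that appear in the invocations elsewhere in this thread, the two sides seem to be controlled by separate flag groups: the --proxy-listener-tls-* flags for the client-to-proxy side, and the --tls-* flags for the proxy-to-broker side. A sketch of a setup terminating TLS on both sides with different certificates (hostnames and file names are placeholders):

```shell
./kafka-proxy server \
  --proxy-listener-tls-enable \
  --proxy-listener-cert-file server-cert.pem \
  --proxy-listener-key-file server-key.pem \
  --tls-enable \
  --tls-client-cert-file client-cert.pem \
  --tls-client-key-file client-key.pem \
  --bootstrap-server-mapping "kafka-0.example.com:9093,127.0.0.1:32400"
```

Please correct me if the listener flags don't work the way I'm assuming here.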
Hi,
I am getting this when a producer contacts MSK through kafka-proxy with TLS.
INFO[2019-12-08T10:34:11Z] Reading data from local connection on 10.20.6.169:9092 from 10.20.12.42:40272 (b-1.msk-a.uv35cd.c3.kafka.eu-west-1.amazonaws.com:9094) had error: tls: received unexpected handshake message of type *tls.clientHelloMsg when waiting for *tls.clientKeyExchangeMsg
I ran kafka-proxy with client auth and without it; I had the same problem either way, as long as TLS is enabled on the MSK cluster.
./kafka-proxy server --proxy-listener-tls-enable --tls-enable --proxy-listener-key-file "ssl.key" --proxy-listener-cert-file "server-cert.pem" --debug-enable --log-level debug --bootstrap-server-mapping "b-1.msk-a.uv35cd.c3.kafka.eu-west-1.amazonaws.com:9094,0.0.0.0:9092,msk.a.sandbox.aws.corpinc.com:9092" --bootstrap-server-mapping "b-2.msk-a.uv35cd.c3.kafka.eu-west-1.amazonaws.com:9094,0.0.0.0:9093,msk.a.sandbox.aws.corpinc.com:9093" --bootstrap-server-mapping "b-3.msk-a.uv35cd.c3.kafka.eu-west-1.amazonaws.com:9094,0.0.0.0:9094,msk.a.sandbox.aws.corpinc.com:9094"
./kafka-console-producer.sh --broker-list grappler.msk.a.sandbox.aws.corpinc.com:9092 --topic tls.tested.test --producer.config client.properties
client.properties:
security.protocol=SSL
ssl.truststore.location=kafka.client.truststore.jks
ssl.truststore.password=changeit
I am using current versions:
I would like to know how to run/configure the kafka-proxy CLI to connect to a cluster (brokers) that asks for a certificate.
I have the PEM and KEY files, and I can connect from Java using a custom keystore built from those files.
How can I replicate this with the kafka-proxy client?
Hi,
Is there a way to restrict the ports used for outgoing connections when starting the kafka-proxy server? I am specifying the bootstrap server using the --bootstrap-server-mapping option and tried the --dynamic-listeners-disable and --dynamic-sequential-min-port options, but they didn't seem to help. I have a firewall that blocks all outgoing connections by default; I have allowed port 9092, but I get the following error:
Reading data from destination_ip_address:destination_port had error: read tcp source_ip_address:source_port->destination_ip_address:destination_port: wsarecv: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
This is because the Read TCP operation is happening on a random port that is blocked.
Update 1: After reading some code, I found that the --dynamic-sequential-min-port parameter is used to create servers/listeners via the net.Listen method. So from what I understand, this parameter is irrelevant to my problem. Can someone confirm?
Hi,
is it possible to set up mutual TLS authentication between the client and kafka-proxy? Not between kafka-proxy and the Kafka broker, but between the client and kafka-proxy (client <-> kafka-proxy)?
What I need is for Kafka client users to authenticate themselves against kafka-proxy with their unique client TLS certificates before kafka-proxy routes them to the Kafka broker.
Thanks.
In a scenario where we have 12 brokers in the cluster, can we set up 3 kafka-proxy servers so that each kafka-proxy handles traffic for 4 brokers?
It's not quite clear to me from the README; maybe each kafka-proxy must have all 12 brokers mapped to 12 ports, and we have to put an ALB (in AWS) in front. Please advise.
QUESTION:
Hi,
For SASL_PLAIN: does this pass through the SASL credentials that I configure on my clients to the brokers, or is it intended to have its own SASL credentials which are used for all upstream clients?
Background
I'm having a play with this against Confluent Cloud with little success so far. I'm attempting to use it as a proxy because the CCloud brokers are in an external VPC which I can't connect to directly.
Regards,
Jon
Symptoms: "open requests buffer is full" as described in #23
RequiredAcks:
This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0, the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response.
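The three values described above can be captured in a tiny helper (the function and strings below are ours, for illustration, not from any Kafka library; note that values greater than 1 were historically allowed but are deprecated):

```go
package main

import "fmt"

// ackMode translates a produce request's required_acks field into the
// behaviour described above: 0 = no response, 1 = leader has written the
// data to its local log, -1 = all in-sync replicas have committed.
func ackMode(requiredAcks int16) string {
	switch requiredAcks {
	case 0:
		return "no response sent"
	case 1:
		return "ack after leader write"
	case -1:
		return "ack after all in-sync replicas commit"
	default:
		// Values > 1 (wait for that many replicas) are deprecated.
		return "deprecated or invalid"
	}
}

func main() {
	for _, a := range []int16{0, 1, -1} {
		fmt.Printf("acks=%d: %s\n", a, ackMode(a))
	}
}
```

This asymmetry is also why acks=0 producers interact badly with a bounded open-requests buffer: the proxy never sees a response that would let it retire the request.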