kafka-proxy's Introduction

kafka-proxy

The Kafka Proxy is based on the idea of Cloud SQL Proxy. It allows a service to connect to Kafka brokers without having to deal with SASL/PLAIN authentication and SSL certificates.

It works by opening TCP sockets on the local machine and proxying connections to the associated Kafka brokers when the sockets are used. The host and port in Metadata and FindCoordinator responses received from the brokers are replaced by their local counterparts. For discovered brokers (those not configured as bootstrap servers), local listeners are started on random ports. The dynamic local listeners feature can be disabled, and an additional list of external server mappings can be provided.
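
For example (hypothetical broker addresses), a single bootstrap mapping is enough and listeners for discovered brokers are created dynamically; alternatively, every broker can be mapped explicitly and dynamic listeners disabled:

kafka-proxy server --bootstrap-server-mapping "kafka-0.example.com:9092,127.0.0.1:32400"

kafka-proxy server --bootstrap-server-mapping "kafka-0.example.com:9092,127.0.0.1:32400" \
                   --bootstrap-server-mapping "kafka-1.example.com:9092,127.0.0.1:32401" \
                   --dynamic-listeners-disable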

The proxy can terminate TLS traffic and authenticate users using SASL/PLAIN. The credentials verification method is configurable and uses the Go plugin system over RPC.
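
A sketch combining both, assuming the certificate files exist and the bundled auth-user example plugin has been built (see the proxy authentication example below):

kafka-proxy server --bootstrap-server-mapping "kafka-0.example.com:9092,127.0.0.1:32400" \
                   --proxy-listener-tls-enable \
                   --proxy-listener-cert-file server.crt \
                   --proxy-listener-key-file server.pem \
                   --auth-local-enable \
                   --auth-local-command build/auth-user \
                   --auth-local-param "--username=my-test-user" \
                   --auth-local-param "--password=my-test-password"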

The proxies can also authenticate each other using a pluggable method which is transparent to Kafka servers and clients. Currently, Google ID tokens for service accounts are implemented: the proxy client requests and sends a service account JWT, and the proxy server receives it and validates it against Google JWKS.

Kafka API calls can be restricted to prevent certain operations, e.g. topic deletion or produce requests.
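
For example, a sketch that blocks topic deletion (Kafka API key 20, DeleteTopics):

kafka-proxy server --bootstrap-server-mapping "kafka-0.example.com:9092,127.0.0.1:32400" \
                   --forbidden-api-keys 20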

Supported Kafka versions

The following table provides an overview of supported Kafka versions (the specified version and all previous Kafka versions). As not every Kafka release adds new messages/versions relevant to the Kafka proxy, newer Kafka versions may also work.

Kafka proxy version   Kafka version
0.2.9                 from 0.11.0 to 2.8.0
0.3.1                 from 0.11.0 to 3.4.0

Install binary release

  1. Download the latest release

    Linux

     curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.8/kafka-proxy-v0.3.8-linux-amd64.tar.gz | tar xz
    

    macOS

     curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.8/kafka-proxy-v0.3.8-darwin-amd64.tar.gz | tar xz
    
  2. Move the binary into your PATH.

    sudo mv ./kafka-proxy /usr/local/bin/kafka-proxy
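
  3. Verify the installation by printing the server command help.

     kafka-proxy server --help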
    

Building

make clean build
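
The binary is written to the build directory; the plugin examples later in this README invoke it as build/kafka-proxy, e.g.:

build/kafka-proxy server --help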

Docker images

Docker images are available on Docker Hub.

You can launch a kafka-proxy container to try it out with:

docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.8 \
          server \
        --bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
        --bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
        --bootstrap-server-mapping "localhost:39092,0.0.0.0:30003" \
        --dial-address-mapping "localhost:19092,172.17.0.1:19092" \
        --dial-address-mapping "localhost:29092,172.17.0.1:29092" \
        --dial-address-mapping "localhost:39092,172.17.0.1:39092" \
        --debug-enable

Kafka-proxy will now be reachable on localhost:30001, localhost:30002 and localhost:30003, connecting to kafka brokers running in docker (network bridge gateway 172.17.0.1) advertising PLAINTEXT listeners on localhost:19092, localhost:29092 and localhost:39092.
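
You can then, for example, produce through the proxy with the standard Kafka console producer (assuming a topic named test exists):

kafka-console-producer.sh --broker-list localhost:30001,localhost:30002,localhost:30003 --topic test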

Docker images with precompiled plugins

Docker images with precompiled plugins located in /opt/kafka-proxy/bin/ are tagged with <release>-all.

You can launch a kafka-proxy container with the auth-ldap plugin to try it out with:

docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.8-all \
              server \
            --bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
            --bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
            --bootstrap-server-mapping "localhost:39092,0.0.0.0:30003" \
            --dial-address-mapping "localhost:19092,172.17.0.1:19092" \
            --dial-address-mapping "localhost:29092,172.17.0.1:29092" \
            --dial-address-mapping "localhost:39092,172.17.0.1:39092" \
            --debug-enable \
            --auth-local-enable  \
            --auth-local-command=/opt/kafka-proxy/bin/auth-ldap  \
            --auth-local-param=--url=ldap://172.17.0.1:389  \
            --auth-local-param=--start-tls=false \
            --auth-local-param=--bind-dn=cn=admin,dc=example,dc=org  \
            --auth-local-param=--bind-passwd=admin  \
            --auth-local-param=--user-search-base=ou=people,dc=example,dc=org  \
            --auth-local-param=--user-filter="(&(objectClass=person)(uid=%u)(memberOf=cn=kafka-users,ou=realm-roles,dc=example,dc=org))"
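
A client connecting through this proxy then authenticates with standard Kafka SASL/PLAIN client properties (a sketch with assumed example credentials; the proxy listener here is not TLS-enabled, hence SASL_PLAINTEXT):

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="my-user" password="my-password";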

Help output

Run the kafka-proxy server

Usage:
  kafka-proxy server [flags]

  Flags:
        --auth-gateway-client-command string                   Path to authentication plugin binary
        --auth-gateway-client-enable                           Enable gateway client authentication
        --auth-gateway-client-log-level string                 Log level of the auth plugin (default "trace")
        --auth-gateway-client-magic uint                       Magic bytes sent in the handshake
        --auth-gateway-client-method string                    Authentication method
        --auth-gateway-client-param stringArray                Authentication plugin parameter
        --auth-gateway-client-timeout duration                 Authentication timeout (default 10s)
        --auth-gateway-server-command string                   Path to authentication plugin binary
        --auth-gateway-server-enable                           Enable proxy server authentication
        --auth-gateway-server-log-level string                 Log level of the auth plugin (default "trace")
        --auth-gateway-server-magic uint                       Magic bytes sent in the handshake
        --auth-gateway-server-method string                    Authentication method
        --auth-gateway-server-param stringArray                Authentication plugin parameter
        --auth-gateway-server-timeout duration                 Authentication timeout (default 10s)
        --auth-local-command string                            Path to authentication plugin binary
        --auth-local-enable                                    Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers
        --auth-local-log-level string                          Log level of the auth plugin (default "trace")
        --auth-local-mechanism string                          SASL mechanism used for local authentication: PLAIN or OAUTHBEARER (default "PLAIN")
        --auth-local-param stringArray                         Authentication plugin parameter
        --auth-local-timeout duration                          Authentication timeout (default 10s)
        --bootstrap-server-mapping stringArray                 Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))
        --debug-enable                                         Enable Debug endpoint
        --debug-listen-address string                          Debug listen address (default "0.0.0.0:6060")
        --default-listener-ip string                           Default listener IP (default "0.0.0.0")
        --dial-address-mapping stringArray                     Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
        --dynamic-advertised-listener string                   Advertised address for dynamic listeners. If empty, default-listener-ip is used
        --dynamic-listeners-disable                            Disable dynamic listeners.
        --dynamic-sequential-min-port int                      If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.
        --external-server-mapping stringArray                  Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started
        --forbidden-api-keys ints                              Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics
        --forward-proxy string                                 URL of the forward proxy. Supported schemas are socks5 and http
        --gssapi-auth-type string                              GSSAPI auth type: KEYTAB or USER (default "KEYTAB")
        --gssapi-disable-pa-fx-fast                            Used to configure the client to not use PA_FX_FAST.
        --gssapi-keytab string                                 krb5.keytab file location
        --gssapi-krb5 string                                   krb5.conf file path (default "/etc/krb5.conf")
        --gssapi-password string                               Password for auth type USER
        --gssapi-realm string                                  Realm
        --gssapi-servicename string                            ServiceName (default "kafka")
        --gssapi-spn-host-mapping stringToString               Mapping of Kafka servers address to SPN hosts (default [])
        --gssapi-username string                               Username (default "kafka")
    -h, --help                                                 help for server
        --http-disable                                         Disable HTTP endpoints
        --http-health-path string                              Path on which to expose the health endpoint (default "/health")
        --http-listen-address string                           Address that kafka-proxy is listening on (default "0.0.0.0:9080")
        --http-metrics-path string                             Path on which to expose metrics (default "/metrics")
        --kafka-client-id string                               An optional identifier to track the source of requests (default "kafka-proxy")
        --kafka-connection-read-buffer-size int                Size of the operating system's receive buffer associated with the connection. If zero, system default is used
        --kafka-connection-write-buffer-size int               Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
        --kafka-dial-timeout duration                          How long to wait for the initial connection (default 15s)
        --kafka-keep-alive duration                            Keep alive period for an active network connection. If zero, keep-alives are disabled (default 1m0s)
        --kafka-max-open-requests int                          Maximum number of open requests per TCP connection before sending on it blocks (default 256)
        --kafka-read-timeout duration                          How long to wait for a response (default 30s)
        --kafka-write-timeout duration                         How long to wait for a transmit (default 30s)
        --log-format string                                    Log format text or json (default "text")
        --log-level string                                     Log level debug, info, warning, error, fatal or panic (default "info")
        --log-level-fieldname string                           Log level fieldname for json format (default "@level")
        --log-msg-fieldname string                             Message fieldname for json format (default "@message")
        --log-time-fieldname string                            Time fieldname for json format (default "@timestamp")
        --producer-acks-0-disabled                             Assume fire-and-forget is never sent by the producer. Enabling this parameter will increase performance
        --proxy-listener-ca-chain-cert-file string             PEM encoded CA's certificate file. If provided, client certificate is required and verified
        --proxy-listener-cert-file string                      PEM encoded file with server certificate
        --proxy-listener-cipher-suites strings                 List of supported cipher suites
        --proxy-listener-curve-preferences strings             List of curve preferences
        --proxy-listener-keep-alive duration                   Keep alive period for an active network connection. If zero, keep-alives are disabled (default 1m0s)
        --proxy-listener-key-file string                       PEM encoded file with private key for the server certificate
        --proxy-listener-key-password string                   Password to decrypt rsa private key
        --proxy-listener-read-buffer-size int                  Size of the operating system's receive buffer associated with the connection. If zero, system default is used
        --proxy-listener-tls-enable                            Whether or not to use TLS listener
        --proxy-listener-tls-required-client-subject strings   Required client certificate subject common name; example; s:/CN=[value]/C=[state]/C=[DE,PL] or r:/CN=[^val.{2}$]/C=[state]/C=[DE,PL]; check manual for more details
        --proxy-listener-write-buffer-size int                 Sets the size of the operating system's transmit buffer associated with the connection. If zero, system default is used
        --proxy-request-buffer-size int                        Request buffer size per TCP connection (default 4096)
        --proxy-response-buffer-size int                       Response buffer size per TCP connection (default 4096)
        --sasl-aws-profile string                              AWS profile
        --sasl-aws-region string                               Region for AWS IAM Auth
        --sasl-enable                                          Connect using SASL
        --sasl-jaas-config-file string                         Location of JAAS config file with SASL username and password
        --sasl-method string                                   SASL method to use (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, AWS_MSK_IAM) (default "PLAIN")
        --sasl-password string                                 SASL user password
        --sasl-plugin-command string                           Path to authentication plugin binary
        --sasl-plugin-enable                                   Use plugin for SASL authentication
        --sasl-plugin-log-level string                         Log level of the auth plugin (default "trace")
        --sasl-plugin-mechanism string                         SASL mechanism used for proxy authentication: PLAIN or OAUTHBEARER (default "OAUTHBEARER")
        --sasl-plugin-param stringArray                        Authentication plugin parameter
        --sasl-plugin-timeout duration                         Authentication timeout (default 10s)
        --sasl-username string                                 SASL user name
        --tls-ca-chain-cert-file string                        PEM encoded CA's certificate file
        --tls-client-cert-file string                          PEM encoded file with client certificate
        --tls-client-key-file string                           PEM encoded file with private key for the client certificate
        --tls-client-key-password string                       Password to decrypt rsa private key
        --tls-enable                                           Whether or not to use TLS when connecting to the broker
        --tls-insecure-skip-verify                             It controls whether a client verifies the server's certificate chain and host name
        --tls-same-client-cert-enable                          Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)

Usage example

kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,0.0.0.0:32399"

kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400" \
                   --bootstrap-server-mapping "192.168.99.100:32401,127.0.0.1:32401" \
                   --bootstrap-server-mapping "192.168.99.100:32402,127.0.0.1:32402" \
                   --dynamic-listeners-disable

kafka-proxy server --bootstrap-server-mapping "kafka-0.example.com:9092,0.0.0.0:32401,kafka-0.grepplabs.com:9092" \
                   --bootstrap-server-mapping "kafka-1.example.com:9092,0.0.0.0:32402,kafka-1.grepplabs.com:9092" \
                   --bootstrap-server-mapping "kafka-2.example.com:9092,0.0.0.0:32403,kafka-2.grepplabs.com:9092" \
                   --dynamic-listeners-disable

kafka-proxy server --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400" \
                   --external-server-mapping "192.168.99.100:32401,127.0.0.1:32402" \
                   --external-server-mapping "192.168.99.100:32402,127.0.0.1:32403" \
                   --forbidden-api-keys 20


export BOOTSTRAP_SERVER_MAPPING="192.168.99.100:32401,0.0.0.0:32402 192.168.99.100:32402,0.0.0.0:32403" && kafka-proxy server

Restrict proxy listener cipher suites

kafka-proxy server --bootstrap-server-mapping "localhost:19092,0.0.0.0:30001,localhost:30001" \
                   --bootstrap-server-mapping "localhost:29092,0.0.0.0:30002,localhost:30002" \
                   --bootstrap-server-mapping "localhost:39092,0.0.0.0:30003,localhost:30003" \
                   --proxy-listener-cert-file "tls/ca-cert.pem" \
                   --proxy-listener-key-file "tls/ca-key.pem"  \
                   --proxy-listener-tls-enable \
                   --proxy-listener-cipher-suites TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256

SASL authentication initiated by proxy example

SASL authentication is initiated by the proxy. SASL authentication is disabled on the clients and enabled on the Kafka brokers.

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
                   --tls-enable --tls-insecure-skip-verify \
                   --sasl-enable --sasl-username myuser --sasl-password mysecret

kafka-proxy server --bootstrap-server-mapping "kafka-0.example.com:9092,0.0.0.0:30001" \
                   --bootstrap-server-mapping "kafka-1.example.com:9092,0.0.0.0:30002" \
                   --bootstrap-server-mapping "kafka-1.example.com:9093,0.0.0.0:30003" \
                   --sasl-enable \
                   --sasl-username "alice" \
                   --sasl-password "alice-secret" \
                   --sasl-method "SCRAM-SHA-512" \
                   --log-level debug
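
Credentials can also be read from a JAAS config file instead of being passed as flags (a sketch; assumed file path):

kafka-proxy server --bootstrap-server-mapping "kafka-0.example.com:9092,0.0.0.0:30001" \
                   --sasl-enable \
                   --sasl-jaas-config-file "/var/run/secret/kafka-client-jaas/jaas.config"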

make clean build plugin.unsecured-jwt-provider && build/kafka-proxy server \
                         --sasl-enable \
                         --sasl-plugin-enable \
                         --sasl-plugin-mechanism "OAUTHBEARER" \
                         --sasl-plugin-command build/unsecured-jwt-provider \
                         --sasl-plugin-param "--claim-sub=alice" \
                         --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

GSSAPI / Kerberos authentication

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
                   --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
                   --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
                   --sasl-enable \
                   --sasl-method "GSSAPI" \
                   --gssapi-servicename kafka \
                   --gssapi-username kafkaclient1 \
                   --gssapi-realm EXAMPLE.COM \
                   --gssapi-krb5 /etc/krb5.conf \
                   --gssapi-keytab /etc/security/keytabs/kafka.keytab
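
A sketch of the USER auth type, which authenticates with a password instead of a keytab (assumed credentials):

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
                   --sasl-enable \
                   --sasl-method "GSSAPI" \
                   --gssapi-auth-type USER \
                   --gssapi-username kafkaclient1 \
                   --gssapi-password kafkaclient1-secret \
                   --gssapi-realm EXAMPLE.COM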

AWS MSK IAM

kafka-proxy server --bootstrap-server-mapping "b-1-public.kafkaproxycluster.uls9ao.c4.kafka.eu-central-1.amazonaws.com:9198,0.0.0.0:30001" \
                   --bootstrap-server-mapping "b-2-public.kafkaproxycluster.uls9ao.c4.kafka.eu-central-1.amazonaws.com:9198,0.0.0.0:30002" \
                   --bootstrap-server-mapping "b-3-public.kafkaproxycluster.uls9ao.c4.kafka.eu-central-1.amazonaws.com:9198,0.0.0.0:30003" \
                   --tls-enable --tls-insecure-skip-verify \
                   --sasl-enable \
                   --sasl-method "AWS_MSK_IAM" \
                   --sasl-aws-region "eu-central-1" \
                   --log-level debug
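
A named AWS profile can be used instead of the default credential chain (a sketch; assumed profile name):

kafka-proxy server --bootstrap-server-mapping "b-1-public.kafkaproxycluster.uls9ao.c4.kafka.eu-central-1.amazonaws.com:9198,0.0.0.0:30001" \
                   --tls-enable --tls-insecure-skip-verify \
                   --sasl-enable \
                   --sasl-method "AWS_MSK_IAM" \
                   --sasl-aws-region "eu-central-1" \
                   --sasl-aws-profile "my-msk-profile"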

Proxy authentication example

SASL authentication is performed by the proxy. SASL authentication is enabled on the clients and disabled on the Kafka brokers.

make clean build plugin.auth-user && build/kafka-proxy server --proxy-listener-key-file "server-key.pem"  \
                         --proxy-listener-cert-file "server-cert.pem" \
                         --proxy-listener-ca-chain-cert-file "ca.pem" \
                         --proxy-listener-tls-enable \
                         --auth-local-enable \
                         --auth-local-command build/auth-user \
                         --auth-local-param "--username=my-test-user" \
                         --auth-local-param "--password=my-test-password"

make clean build plugin.auth-ldap && build/kafka-proxy server \
                         --auth-local-enable \
                         --auth-local-command build/auth-ldap \
                         --auth-local-param "--url=ldaps://ldap.example.com:636" \
                         --auth-local-param "--user-dn=cn=users,dc=exemple,dc=com" \
                         --auth-local-param "--user-attr=uid" \
                         --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

make clean build plugin.unsecured-jwt-info && build/kafka-proxy server \
                         --auth-local-enable \
                         --auth-local-command build/unsecured-jwt-info \
                         --auth-local-mechanism "OAUTHBEARER" \
                         --auth-local-param "--claim-sub=alice" \
                         --auth-local-param "--claim-sub=bob" \
                         --bootstrap-server-mapping "192.168.99.100:32400,127.0.0.1:32400"

Same client certificate check enabled example

Validates that the client certificate used by the proxy client is exactly the same as the client certificate used in the authentication initiated by the proxy.

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9093,0.0.0.0:32399" \
   --tls-enable \
   --tls-client-cert-file client.crt \
   --tls-client-key-file client.pem \
   --tls-client-key-password changeit \
   --proxy-listener-tls-enable \
   --proxy-listener-key-file server.pem \
   --proxy-listener-cert-file server.crt \
   --proxy-listener-key-password changeit \
   --proxy-listener-ca-chain-cert-file ca.crt \
   --tls-same-client-cert-enable

Kafka Gateway example

Authentication between Kafka Proxy Client and Kafka Proxy Server with Google-ID (service account JWT)

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
                   --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
                   --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
                   --dynamic-listeners-disable \
                   --http-disable \
                   --proxy-listener-tls-enable \
                   --proxy-listener-cert-file=/var/run/secret/server.cert.pem \
                   --proxy-listener-key-file=/var/run/secret/server.key.pem \
                   --auth-gateway-server-enable \
                   --auth-gateway-server-method google-id \
                   --auth-gateway-server-magic 3285573610483682037 \
                   --auth-gateway-server-command google-id-info \
                   --auth-gateway-server-param  "--timeout=10" \
                   --auth-gateway-server-param  "--audience=tcp://kafka-gateway.grepplabs.com" \
                   --auth-gateway-server-param  "--email-regex=^[email protected]$"

kafka-proxy server --bootstrap-server-mapping "127.0.0.1:32500,127.0.0.1:32400" \
                   --bootstrap-server-mapping "127.0.0.1:32501,127.0.0.1:32401" \
                   --bootstrap-server-mapping "127.0.0.1:32502,127.0.0.1:32402" \
                   --dynamic-listeners-disable \
                   --http-disable \
                   --tls-enable \
                   --tls-ca-chain-cert-file /var/run/secret/client/ca-chain.cert.pem \
                   --auth-gateway-client-enable \
                   --auth-gateway-client-method google-id \
                   --auth-gateway-client-magic 3285573610483682037 \
                   --auth-gateway-client-command google-id-provider \
                   --auth-gateway-client-param  "--credentials-file=/var/run/secret/client/service-account.json" \
                   --auth-gateway-client-param  "--target-audience=tcp://kafka-gateway.grepplabs.com" \
                   --auth-gateway-client-param  "--timeout=10"

Connect to Kafka through SOCKS5 Proxy example

Connect through test SOCKS5 Proxy server

    kafka-proxy tools socks5-proxy --addr localhost:1080

    kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
                       --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
                       --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502"
                       --forward-proxy socks5://localhost:1080
    kafka-proxy tools socks5-proxy --addr localhost:1080 --username my-proxy-user --password my-proxy-password

    kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
                       --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
                       --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
                       --forward-proxy socks5://my-proxy-user:my-proxy-password@localhost:1080

Connect to Kafka through HTTP Proxy example

Connect through test HTTP Proxy server using CONNECT method

    kafka-proxy tools http-proxy --addr localhost:3128

    kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
                       --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
                       --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502"
                       --forward-proxy http://localhost:3128
    kafka-proxy tools http-proxy --addr localhost:3128 --username my-proxy-user --password my-proxy-password

    kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
                       --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
                       --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
                       --forward-proxy http://my-proxy-user:my-proxy-password@localhost:3128

Validating client certificate DN

Sometimes it might be necessary not only to validate that the client certificate is valid, but also that the client certificate DN was issued for a concrete use case. This can be achieved using the following set of arguments:

--proxy-listener-tls-client-cert-validate-subject bool                        Whether to validate client certificate subject (default false)
--proxy-listener-tls-required-client-subject-common-name string               Required client certificate subject common name
--proxy-listener-tls-required-client-subject-country stringArray              Required client certificate subject country
--proxy-listener-tls-required-client-subject-province stringArray             Required client certificate subject province
--proxy-listener-tls-required-client-subject-locality stringArray             Required client certificate subject locality
--proxy-listener-tls-required-client-subject-organization stringArray         Required client certificate subject organization
--proxy-listener-tls-required-client-subject-organizational-unit stringArray  Required client certificate subject organizational unit

By setting --proxy-listener-tls-client-cert-validate-subject true, Kafka Proxy will inspect the client certificate DN fields for the expected values set with the --proxy-listener-tls-required-client-* arguments. The matches are always exact and are applied together, for all non-empty values. For example, to allow a valid certificate for country=DE and organization=grepplabs, configure Kafka Proxy in the following way:

    kafka-proxy server \
      --proxy-listener-tls-client-cert-validate-subject true \
      --proxy-listener-tls-required-client-subject-country DE \
      --proxy-listener-tls-required-client-subject-organization grepplabs

Kubernetes sidecar container example

---
apiVersion: apps/v1
kind: Deployment
metadata:
   name: myapp
spec:
  replicas: 1
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
      annotations:
        prometheus.io/scrape: 'true'
    spec:
      containers:
        - name: kafka-proxy
          image: grepplabs/kafka-proxy:latest
          args:
            - 'server'
            - '--log-format=json'
            - '--bootstrap-server-mapping=kafka-0:9093,127.0.0.1:32400'
            - '--bootstrap-server-mapping=kafka-1:9093,127.0.0.1:32401'
            - '--bootstrap-server-mapping=kafka-2:9093,127.0.0.1:32402'
            - '--tls-enable'
            - '--tls-ca-chain-cert-file=/var/run/secret/kafka-ca-chain-certificate/ca-chain.cert.pem'
            - '--tls-client-cert-file=/var/run/secret/kafka-client-certificate/client.cert.pem'
            - '--tls-client-key-file=/var/run/secret/kafka-client-key/client.key.pem'
            - '--tls-client-key-password=$(TLS_CLIENT_KEY_PASSWORD)'
            - '--sasl-enable'
            - '--sasl-jaas-config-file=/var/run/secret/kafka-client-jaas/jaas.config'
          env:
          - name: TLS_CLIENT_KEY_PASSWORD
            valueFrom:
              secretKeyRef:
                name: tls-client-key-password
                key: password
          volumeMounts:
          - name: "sasl-jaas-config-file"
            mountPath: "/var/run/secret/kafka-client-jaas"
          - name: "tls-ca-chain-certificate"
            mountPath: "/var/run/secret/kafka-ca-chain-certificate"
          - name: "tls-client-cert-file"
            mountPath: "/var/run/secret/kafka-client-certificate"
          - name: "tls-client-key-file"
            mountPath: "/var/run/secret/kafka-client-key"
          ports:
          - name: metrics
            containerPort: 9080
          livenessProbe:
            httpGet:
              path: /health
              port: 9080
            initialDelaySeconds: 5
            periodSeconds: 3
          readinessProbe:
            httpGet:
              path: /health
              port: 9080
            initialDelaySeconds: 5
            periodSeconds: 10
            timeoutSeconds: 5
            successThreshold: 2
            failureThreshold: 5
        - name: myapp
          image: myapp:latest
          ports:
          - containerPort: 8080
            name: metrics
          env:
          - name: BOOTSTRAP_SERVERS
            value: "127.0.0.1:32400,127.0.0.1:32401,127.0.0.1:32402"
      volumes:
      - name: sasl-jaas-config-file
        secret:
          secretName: sasl-jaas-config-file
      - name: tls-ca-chain-certificate
        secret:
          secretName: tls-ca-chain-certificate
      - name: tls-client-cert-file
        secret:
          secretName: tls-client-cert-file
      - name: tls-client-key-file
        secret:
          secretName: tls-client-key-file

Connect to Kafka running in Kubernetes example (kafka proxy runs in cluster)

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
   name: kafka-proxy
spec:
  selector:
    matchLabels:
      app: kafka-proxy
  replicas: 1
  serviceName: kafka-proxy
  template:
    metadata:
      labels:
        app: kafka-proxy
    spec:
      containers:
        - name: kafka-proxy
          image: grepplabs/kafka-proxy:latest
          args:
            - 'server'
            - '--log-format=json'
            - '--bootstrap-server-mapping=kafka-0:9093,127.0.0.1:32400'
            - '--bootstrap-server-mapping=kafka-1:9093,127.0.0.1:32401'
            - '--bootstrap-server-mapping=kafka-2:9093,127.0.0.1:32402'
            - '--tls-enable'
            - '--tls-ca-chain-cert-file=/var/run/secret/kafka-ca-chain-certificate/ca-chain.cert.pem'
            - '--tls-client-cert-file=/var/run/secret/kafka-client-certificate/client.cert.pem'
            - '--tls-client-key-file=/var/run/secret/kafka-client-key/client.key.pem'
            - '--tls-client-key-password=$(TLS_CLIENT_KEY_PASSWORD)'
            - '--sasl-enable'
            - '--sasl-jaas-config-file=/var/run/secret/kafka-client-jaas/jaas.config'
            - '--proxy-request-buffer-size=32768'
            - '--proxy-response-buffer-size=32768'
            - '--proxy-listener-read-buffer-size=32768'
            - '--proxy-listener-write-buffer-size=131072'
            - '--kafka-connection-read-buffer-size=131072'
            - '--kafka-connection-write-buffer-size=32768'
          env:
          - name: TLS_CLIENT_KEY_PASSWORD
            valueFrom:
              secretKeyRef:
                name: tls-client-key-password
                key: password
          volumeMounts:
          - name: "sasl-jaas-config-file"
            mountPath: "/var/run/secret/kafka-client-jaas"
          - name: "tls-ca-chain-certificate"
            mountPath: "/var/run/secret/kafka-ca-chain-certificate"
          - name: "tls-client-cert-file"
            mountPath: "/var/run/secret/kafka-client-certificate"
          - name: "tls-client-key-file"
            mountPath: "/var/run/secret/kafka-client-key"
          ports:
          - name: metrics
            containerPort: 9080
          - name: kafka-0
            containerPort: 32400
          - name: kafka-1
            containerPort: 32401
          - name: kafka-2
            containerPort: 32402
          livenessProbe:
            httpGet:
              path: /health
              port: 9080
            initialDelaySeconds: 5
            periodSeconds: 3
          readinessProbe:
            httpGet:
              path: /health
              port: 9080
            initialDelaySeconds: 5
            periodSeconds: 10
            timeoutSeconds: 5
            successThreshold: 2
            failureThreshold: 5
          resources:
            requests:
              memory: 128Mi
              cpu: 1000m
      restartPolicy: Always
      volumes:
      - name: sasl-jaas-config-file
        secret:
          secretName: sasl-jaas-config-file
      - name: tls-ca-chain-certificate
        secret:
          secretName: tls-ca-chain-certificate
      - name: tls-client-cert-file
        secret:
          secretName: tls-client-cert-file
      - name: tls-client-key-file
        secret:
          secretName: tls-client-key-file

kubectl port-forward kafka-proxy-0 32400:32400 32401:32401 32402:32402

Use localhost:32400, localhost:32401 and localhost:32402 as bootstrap servers
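
For example, with the standard console consumer (assuming a topic named test exists):

kafka-console-consumer.sh --bootstrap-server localhost:32400,localhost:32401,localhost:32402 --topic test --from-beginning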

Connect to Kafka running in Kubernetes example (kafka proxy runs locally)

One-node Kafka cluster

kafka.properties

broker.id=0
advertised.listeners=PLAINTEXT://kafka-0.kafka-headless.kafka:9092
...

kubectl port-forward -n kafka kafka-0 9092:9092

kafka-proxy server --bootstrap-server-mapping "127.0.0.1:9092,0.0.0.0:19092" --dial-address-mapping "kafka-0.kafka-headless.kafka:9092,0.0.0.0:9092"

Use localhost:19092 as bootstrap servers

Three-node Kafka cluster

Strimzi 0.13.0 CRD

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: test-cluster
  namespace: kafka
spec:
  kafka:
    version: 2.3.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      num.partitions: 60
      default.replication.factor: 3
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 20Gi
          deleteClaim: true
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 5Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}

kubectl port-forward -n kafka test-cluster-kafka-0 9092:9092
kubectl port-forward -n kafka test-cluster-kafka-1 9093:9092
kubectl port-forward -n kafka test-cluster-kafka-2 9094:9092

kafka-proxy server --log-level debug \
  --bootstrap-server-mapping "127.0.0.1:9092,0.0.0.0:19092" \
  --bootstrap-server-mapping "127.0.0.1:9093,0.0.0.0:19093" \
  --bootstrap-server-mapping "127.0.0.1:9094,0.0.0.0:19094" \
  --dial-address-mapping "test-cluster-kafka-0.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9092" \
  --dial-address-mapping "test-cluster-kafka-1.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9093" \
  --dial-address-mapping "test-cluster-kafka-2.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9094"

Use localhost:19092 as bootstrap servers

Embedded third-party source code

kafka-proxy's People

Contributors

7hong, antvick, asatsi, chrisss93, dependabot[bot], everesio, fhke, henter, ianwormsbecker-ssimwave, ijrsvt, mgusiew-guide, mikekap, radekg, rgruchalski-klarrio, ronaldpetty, samhagan, smithjohntaylor, smoya


kafka-proxy's Issues

Multiple kafka-proxy servers to balance the traffic

In a scenario where we have 12 brokers in the cluster, can we set up 3 kafka-proxy servers as below, so that each kafka-proxy handles traffic for 4 brokers?

[diagram: kafka-proxy_by_grepplabs]

It's not quite clear to me from the README, maybe each kafka-proxy must have all 12 brokers mapped to 12 ports, and we have to put an ALB (in AWS) in front. Please advise.

SASL SCRAM support

We have been using the proxy with great success to date, using both PLAINTEXT and SSL endpoints. We are now wanting to do SASL authentication with SCRAM (our hosting provider uses SCRAM-SHA-256 rather than PLAIN).

As best I can tell kafka-proxy currently doesn't have support for this.

I believe I will need to add a *Auth struct in sasl_by_proxy.go to handle the SCRAM portion as well as providing some command line parameters.

Any chance someone has worked on this already and has some code sitting around? If not I will likely get started hacking away.

Kafka Authorization

Hi,

I am thinking of authorizing Kafka with kafka-proxy. Would it be possible to pass Kafka message types to plugins?

QUESTION: Using this with Confluent Cloud

QUESTION:

Hi,

For SASL/PLAIN - does this pass through the SASL credentials that I put on my clients to the brokers, or is it intended to have its own SASL credentials which are used for all upstream clients?

Background
I'm having a play with this against Confluent Cloud with little success so far. I'm attempting to use it as a proxy because the CCloud brokers are in an external VPC which I can't connect to directly.

Regards,
Jon

SASL passthrough

Can kafka-proxy be enhanced (and if so, how) so that the client passes SASL credentials to the proxy, and the proxy just forwards the client-provided SASL credentials to the Kafka broker for authentication and authorization purposes?

Having issues with AWS MSK with TLS enabled

Hi,

I am getting this when a producer contacts MSK through kafka-proxy with TLS.

INFO[2019-12-08T10:34:11Z] Reading data from local connection on 10.20.6.169:9092 from 10.20.12.42:40272 (b-1.msk-a.uv35cd.c3.kafka.eu-west-1.amazonaws.com:9094) had error: tls: received unexpected handshake message of type *tls.clientHelloMsg when waiting for *tls.clientKeyExchangeMsg

I ran kafka-proxy with client auth and without it; I had the same problem as long as TLS is enabled on the MSK cluster.

./kafka-proxy server --proxy-listener-tls-enable --tls-enable --proxy-listener-key-file "ssl.key" --proxy-listener-cert-file "server-cert.pem" --debug-enable --log-level debug --bootstrap-server-mapping "b-1.msk-a.uv35cd.c3.kafka.eu-west-1.amazonaws.com:9094,0.0.0.0:9092,msk.a.sandbox.aws.corpinc.com:9092" --bootstrap-server-mapping "b-2.msk-a.uv35cd.c3.kafka.eu-west-1.amazonaws.com:9094,0.0.0.0:9093,msk.a.sandbox.aws.corpinc.com:9093" --bootstrap-server-mapping "b-3.msk-a.uv35cd.c3.kafka.eu-west-1.amazonaws.com:9094,0.0.0.0:9094,msk.a.sandbox.aws.corpinc.com:9094"

./kafka-console-producer.sh --broker-list grappler.msk.a.sandbox.aws.corpinc.com:9092 --topic tls.tested.test --producer.config client.properties

client.properties:
security.protocol=SSL
ssl.truststore.location=kafka.client.truststore.jks
ssl.truststore.password=changeit

I am using current versions:

  • For Kafka producer: kafka_2.12-2.2.1
  • MSK with in-transit encryption enabled and Kafka version 2.2.1.

Question: Kubernetes: External IP, Service?

Hi,

Is there any recommendation for deploying kafka-proxy in Kubernetes using a Service of type LoadBalancer?
I did deploy it, but I have to bind to an external IP, which I don't have on the Pod.
I can't map to my LoadBalancer hostname/IP:

 --bootstrap-server-mapping=original-bootstrap:9092,external-ip-hostname:9092", 

Any ideas?
Thanks a lot!

Use case different than sidecar

Hi kafka-proxy contributors,

I'd like to know if kafka-proxy can be used in a way different from the sidecar architecture. The project README basically mentions the use of localhost for accessing kafka-proxy, whereas I need it much more as a reverse proxy for accessing the backend Kafka brokers.

Thanks for any help.

Does the proxy support rotation of certificates?

We have a requirement for short-lived client certs; unfortunately the Kafka Java clients do not support hot-swapping of certs, so this requires us to restart the consumers, which subsequently triggers rebalances of the consumer group.

I was looking into a proxy option and wondered if such a setup would be possible with kafka-proxy? Feel like there must be others with this issue and this could be a great use for the proxy.

It looks like there is a PR for librdkafka to achieve this for clients which use this lib, though this is not yet merged.
confluentinc/librdkafka#2868

QUESTION: So what config would I use to simply pass through SASL?

Given the answer to a previous question - that SASL will get passed through by default - does this mean that I configure my Java client with the SASL properties for the brokers, and they will get passed transparently through the proxy when I use kafka-proxy as my bootstrap servers?

I've been fiddling around with the settings to achieve this, and I'm at the point where I'm getting "net address mapping not found" for my brokers, which appear to have been renamed to b2- (i.e. b2-pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092 for the config below). Do I need to configure something in external-server-mappings perhaps? I don't actually understand what external-server-mapping actually does - I originally thought it might be used instead of bootstrap-server-mapping where the Kafka broker is on a host that's not the same as the one kafka-proxy is on, but that doesn't appear to be so.

I'm presuming some kind of dynamic proxy is set up on the kafka-proxy and the path is client -> kafka-proxy -> kafka-proxy dynamic proxy -> kafka-brokers?

I think perhaps I'm a little confused.

Background

my java client is configured:

ssl.truststore.location=dev-kafka.truststore.jks
ssl.truststore.password=changeit
ssl.truststore.type=PKCS12
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=worker-0.some.domain.name.com:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="someusername" password\="somepassword";
security.protocol=SASL_SSL

and my kafka-proxy is run as:

kafka-proxy server \
	--bootstrap-server-mapping "pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092,0.0.0.0:9092" \
	--tls-enable \
	--tls-insecure-skip-verify \
	--proxy-listener-tls-enable \
	--proxy-listener-key-file=/root/dev-kafka-server.private.key \
	--proxy-listener-cert-file=/root/dev-kafka-server.signed.cer \
	--dynamic-listeners-disable

Error's I'm seeing are:

INFO[2018-10-08T17:17:51+13:00] New connection for pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092 
INFO[2018-10-08T17:17:51+13:00] Reading data from pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092 had error: net address mapping for b2-pkc-ep0ml.ap-southeast-2.aws.confluent.cloud:9092 was not found 

How to access the proxy pod running in a cluster externally via a service?

Question?

Is it possible to access this example externally via a service (exposed via LoadBalancer)?

https://github.com/grepplabs/kafka-proxy#connect-to-kafka-running-in-kubernetes-example-kafka-proxy-runs-in-cluster

This example works via the kubectl port-forward command, but when we try to expose the same pod as a service, it does not work. Any feedback on this?

What is the best way to do this? The following example of exposing the deployed proxy pod does not work:
kubectl expose pod/kafka-proxy-0 -n default --type LoadBalancer --name kafka-proxy-exposed

Propagate Client Certificate

Hi there,

Is there any way to propagate the client certificate on the proxy?

We are authenticating our clients using their certificate, and we would like the proxy to present the certificate to the server as it is sent by the client.

Cheers,

Elton

Kerberos authentication to Kafka brokers

It looks like it's not possible for a client behind kafka-proxy, or kafka-proxy itself, to authenticate to Kafka brokers with Kerberos credentials? How difficult would it be to support this use case? I can see the underlying third-party Sarama library supports Kerberos authentication.

Also, would you happen to know if there exists a workaround to achieve this with the current version?

Thanks for doing a great job.

Feature request: want to parse the content of each Kafka protocol packet

Hello, I am using this library as a proxy for our Kafka cluster. Now we have a requirement to change some data in Kafka packets, such as topic content, at the Kafka proxy level. So we need to parse each packet, rather than simply doing layer-4 proxying. I hope we can discuss this. Thank you very much!

Create request/response

I thought about creating requests/responses programmatically with Struct and Schema; I think it could be useful for testing purposes, as well as for creating responses nicely.
I had these problems:

  1. Struct doesn't accept Schema, it just accepts *schema, and NewSchema returns Schema. I created a new constructor NewSchemaStruct which returns *schema and created a request:
	schemaTopicMetadata := NewSchemaStruct("topic_metadata_v0",
		&field{name: "key", ty: typeInt16},
		&field{name: "version", ty: typeInt16},
		&field{name: "correlation_id", ty: typeInt32},
		&field{name: "client_id", ty: typeStr},
		&array{name: "topic", ty: typeStr},
	)

	topicMetadata := Struct{
		schema: schemaTopicMetadata,
		values: make([]interface{}, 0),
	}

	topicMetadata.values = append(topicMetadata.values, int16(0))
	topicMetadata.values = append(topicMetadata.values, int16(0))
	topicMetadata.values = append(topicMetadata.values, int32(0))
	topicMetadata.values = append(topicMetadata.values, "custom")
	arr := make([]interface{}, 0)
	arr = append(arr, "topic1")
	arr = append(arr, "topic2")
	topicMetadata.values = append(topicMetadata.values, arr)

	encoded, err := EncodeSchema(&topicMetadata, schemaTopicMetadata)
  2. Second problem: when I already have a schema created, I cannot retrieve the schema of some of its fields, e.g.:
        type FetchRequest_v0 struct {}
        func (f *FetchRequest_v0) getSchema() Schema {
	partitionSchema := NewSchema("partition_schema",
		&field{name: "partition", ty: typeInt32},
		&field{name: "fetch_offset", ty: typeInt64},
		&field{name: "partition_max_bytes", ty: typeInt32},
	)

	topicSchema := NewSchema("topic_schema",
		&field{name: "topic_name", ty: typeStr},
		&array{name: "partitions", ty: partitionSchema},
	)

	schemaFetch := NewSchema("fetch_v0",
		&field{name: "key", ty: typeInt16},
		&field{name: "version", ty: typeInt16},
		&field{name: "correlation_id", ty: typeInt32},
		&field{name: "client_id", ty: typeStr},
		&field{name: "reaplica_id", ty: typeInt32},
		&field{name: "max_wait_time", ty: typeInt32},
		&field{name: "min_bytes", ty: typeInt32},
		&array{name: "topics", ty: topicSchema},
	)

        return schemaFetch
       }

       req := &FetchRequest_v0{}
       topicSchema := req.GetSchema().fieldsByName["topics"].def.ty - ???? (cannot use this because in Schema constructor 
we assert fields as Fields in NewSchema - Fields doesn't have method for schema gathering)
	someStruct := Struct{
		schema: topicSchema,
		values: make([]interface{}, 0),
	}
        ....add some values to someStruct
  3. We could solve the problem if Struct accepted Schema, but then we would need to add additional methods to the Schema interface for getting fieldByName and fields - this occurs in quite a few places in the code
  4. To solve this we could add an additional method to the Field interface - e.g. GetSchema - and implement it on the array, field, etc. types - this would be quite non-intrusive
  5. I would also like to ask about the EncodeSchema function: it requires a Struct (which itself contains a schema) and also a Schema (which practically contains the same info as the schema in the Struct). This is a duplication of information; I'm not sure it is needed

I can create pull requests, but I would like to discuss it first as I don't know the code that deeply.

had error: api key -10495 is invalid

I keep getting "had error: api key -10495 is invalid" when I try to proxy a Kafka cluster with SASL. Is this because the Kafka broker version is too old?
I'm using SASL plain with TLS enabled and the handshake is successful.
Could someone help?

write a message and kafka report EOFException

hi,
I have a Kafka (v2.5.0) cluster (3 brokers). I started kafka-proxy (v0.2.8) on another machine and sent a message with kafka-console-producer. The message is not written to the topic, and Kafka throws a java.io.EOFException.

kafka-console-producer

./kafka-console-producer.sh --broker-list proxyip:32400,proxyip:32401,proxyip:32402 --topic test
>123

kafka-console-producer log

[2020-11-15 05:41:34,272] WARN [Producer clientId=console-producer] Connection to node 3 (/0.0.0.0:32402) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,371] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,472] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,573] WARN [Producer clientId=console-producer] Connection to node 3 (/0.0.0.0:32402) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,675] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,776] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,876] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:34,977] WARN [Producer clientId=console-producer] Connection to node 2 (/0.0.0.0:32401) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-11-15 05:41:35,078] WARN [Producer clientId=console-producer] Connection to node 1 (/0.0.0.0:32400) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

kafka-proxy

kafka-proxy server \
--bootstrap-server-mapping "broker1:9093,0.0.0.0:32400" \
--bootstrap-server-mapping "broker2:9093,0.0.0.0:32401" \
--bootstrap-server-mapping "broker3:9093,0.0.0.0:32402" \
--sasl-enable \
--sasl-username "admin" \
--sasl-password "admin" \
--log-level debug

kafka-proxy log

INFO[2020-11-15T05:41:32-05:00] New connection for broker3:9093        
DEBU[2020-11-15T05:41:32-05:00] Sending SaslHandshakeRequest mechanism: PLAIN  version: 0 
DEBU[2020-11-15T05:41:32-05:00] Successful SASL handshake. Available mechanisms: [PLAIN] 
DEBU[2020-11-15T05:41:32-05:00] Sending authentication opaque packets, mechanism PLAIN 
DEBU[2020-11-15T05:41:33-05:00] Kafka request key 18, version 3, length 52   
DEBU[2020-11-15T05:41:33-05:00] Kafka response key 18, version 3, length 348 
DEBU[2020-11-15T05:41:33-05:00] Kafka request key 3, version 9, length 32    
DEBU[2020-11-15T05:41:33-05:00] Kafka response key 3, version 9, length 112  
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker1:9093, listener=0.0.0.0:32401, advertised=0.0.0.0:32401 
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker2:9093, listener=0.0.0.0:32402, advertised=0.0.0.0:32402 
DEBU[2020-11-15T05:41:33-05:00] Address mappings broker=broker3:9093, listener=0.0.0.0:32400, advertised=0.0.0.0:32400 
INFO[2020-11-15T05:41:37-05:00] Client closed local connection on 192.168.4.60:32400 from broker3:40750 (broker3:9093)

kafka log

[2020-11-15 05:34:46,830] DEBUG Processor 3 listening to new connection from /proxyip:54714 (kafka.network.Processor)
[2020-11-15 05:34:46,830] DEBUG Accepted connection from /proxyip:54714 on /broker1:9093 and assigned it to processor 3, sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400] (
kafka.network.Acceptor)
[2020-11-15 05:34:46,830] DEBUG connections.max.reauth.ms for mechanism=PLAIN: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Handling Kafka request SASL_HANDSHAKE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,830] DEBUG Using SASL mechanism 'PLAIN' provided by client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG Set SASL server state to AUTHENTICATE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG Authentication complete; session max lifetime from broker config=0 ms, no credential expiration; no session expiration, sending 0 ms to client (org.apache.kafka.common.security.authenticator.SaslServerAu
thenticator)
[2020-11-15 05:34:46,843] DEBUG Set SASL server state to COMPLETE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2020-11-15 05:34:46,843] DEBUG [SocketServer brokerId=1] Successfully authenticated with /proxyip (org.apache.kafka.common.network.Selector)
[2020-11-15 05:34:46,854] DEBUG [SocketServer brokerId=1] Connection with /proxyip disconnected (org.apache.kafka.common.network.Selector)
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:97)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
        at kafka.network.Processor.poll(SocketServer.scala:861)
        at kafka.network.Processor.run(SocketServer.scala:760)
        at java.lang.Thread.run(Thread.java:748)


thanks for any help

Minimum or Basic ACL or Authorization at the Kafka Proxy Level - any options to implement?

Hello,

Currently we have implemented LDAP-oriented authentication, with SASL authentication performed by the proxy.
SASL authentication is enabled on the clients and disabled on the Kafka brokers, keeping the implementation simple.

Are there any options for us to keep authentication disabled on the brokers but maintain a basic ACL, or properties for authorization based on roles mapped to users, at the kafka-proxy level? It need not be as complicated as Kafka ACLs, but is there such an option for us now or in the near future?

Any feedback or help will be greatly appreciated.

Thanks
@ashishgidh

Create Custom Plugins?

Question:

Would it be possible to develop and integrate custom plugins to perform Authentication and Authorization with kafka-proxy server?

Use case:

  • Custom plugin will handle authentication and authorization with an external service
  • Authorization will be a simple role check and can be done once on start up instead of a per message basis
  • Upon success of both checks the plugin would return a successful response and have kafka-proxy server authenticate using SASL with the kafka brokers

Looking at the plugin code, I think this would be possible by reusing the plugin/local-auth interface to implement a custom plugin. The plugin would return the bool "authenticated", which I think would be sufficient for my use case (see the sketch below).
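
For discussion, here is a rough sketch of such a plugin, assuming the Authenticate(username, password) returning (ok, status, error) shape suggested by the plugin/local-auth code; the interface mirror and the role cache are illustrative only, and the go-plugin registration glue from plugin/local-auth/shared is omitted:

package main

import "fmt"

// PasswordAuthenticator mirrors the local-auth plugin interface
// (assumed signature: authenticated flag, status code, error).
type PasswordAuthenticator interface {
	Authenticate(username, password string) (bool, int32, error)
}

// roleCheckAuthenticator resolves users/roles once at startup against an
// external service (hypothetical) and answers per-connection checks locally.
type roleCheckAuthenticator struct {
	allowed map[string]string // username -> expected secret, filled at startup
}

func (a *roleCheckAuthenticator) Authenticate(username, password string) (bool, int32, error) {
	expected, ok := a.allowed[username]
	if !ok || expected != password {
		return false, 0, nil // authentication or role check failed
	}
	return true, 0, nil
}

func main() {
	var auth PasswordAuthenticator = &roleCheckAuthenticator{
		allowed: map[string]string{"svc-user": "svc-secret"}, // resolved at startup
	}
	ok, _, _ := auth.Authenticate("svc-user", "svc-secret")
	fmt.Println("authenticated:", ok)
	// A real plugin would hand `auth` to the go-plugin server from
	// plugin/local-auth/shared here instead of calling it directly.
}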

Q: Connect to brokers with PEM and KEY

I would like to know how I can run/configure the kafka-proxy CLI to connect to a cluster (brokers) that asks for a certificate.

I have the PEM and KEY files, and I can connect using Java with a custom Keystore with those files.

How can I replicate this with the kafka-proxy client?
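
For reference, the broker-side TLS flags that appear elsewhere in this document suggest a configuration along these lines (hostnames and paths are placeholders; --tls-client-key-password applies if the key is encrypted):

kafka-proxy server \
    --bootstrap-server-mapping "broker.example.com:9093,127.0.0.1:30001" \
    --tls-enable \
    --tls-ca-chain-cert-file /path/to/ca.pem \
    --tls-client-cert-file /path/to/client-cert.pem \
    --tls-client-key-file /path/to/client-key.pem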

Proxy support for outgoing HTTP/HTTPS connections

Retrieval of Google certificates (google-id-token) should go through the proxy server.
The implementation should be similar to the Java proxy settings (a Go sketch follows the list):
a. http.proxyHost, http.proxyPort
b. https.proxyHost, https.proxyPort
c. socksProxyHost, socksProxyPort
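
In Go, the closest standard-library analogue is http.ProxyFromEnvironment, which honors HTTP_PROXY, HTTPS_PROXY and NO_PROXY (including socks5:// proxy URLs). A sketch of a proxy-aware client for the JWKS fetch, not existing kafka-proxy code:

package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	// Proxy settings are read from the environment, which is the closest
	// Go analogue to Java's http(s).proxyHost/proxyPort properties.
	client := &http.Client{
		Timeout:   10 * time.Second,
		Transport: &http.Transport{Proxy: http.ProxyFromEnvironment},
	}
	resp, err := client.Get("https://www.googleapis.com/oauth2/v3/certs") // Google JWKS endpoint
	if err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}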

Example using kubernetes service type: NodePort

Hello,

What is the appropriate configuration for exposing kafka-proxy over a NodePort service? I've looked at #30 and #50 but have been unable to get it working.

NOTE:
Both kafka-proxy and kafka are running inside kubernetes (using kind). I am using strimzi for deploying kafka.

Thanks!

Add support for Kafka 2.8

Hi,

Thanks for the great project.
I've noticed that Kafka 2.7.1 CLI clients (e.g. kafka-topics.sh) work nicely with kafka-proxy, while version 2.8.0 returns the following error in the kafka-proxy log:

{"@level":"info","@message":"Reading data from kafka-0.kafka:9092 had error: Unsupported response schema version 11 for key 3 ","@timestamp":"2021-07-08T10:43:21Z"}

Kafka broker version: 2.8.0
Kafka client: 2.8.0
kafka-proxy: 0.2.8

mutual ssl based authentication

Hi,
is it possible to set up mutual TLS-based authentication between the client and kafka-proxy? Not between kafka-proxy and the Kafka broker, but between the client and kafka-proxy (client<->kafka-proxy).
What I need is that Kafka client users authenticate themselves against kafka-proxy with their unique client TLS certificates before they are routed to the Kafka broker by kafka-proxy.
Thanks.
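
For reference, the proxy-listener-* flags used later in this document configure TLS on the client-facing side, with the CA chain file used to verify client certificates (as a later issue in this document also suggests). A sketch, with hostnames and paths as placeholders:

kafka-proxy server \
    --bootstrap-server-mapping "broker.example.com:9092,127.0.0.1:30001" \
    --proxy-listener-tls-enable \
    --proxy-listener-cert-file /path/to/proxy-cert.pem \
    --proxy-listener-key-file /path/to/proxy-key.pem \
    --proxy-listener-ca-chain-cert-file /path/to/clients-ca.pem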

Proxy initiated authentication with "dynamic" SASL credentials?

Question:

I have a scenario where I would like kafka-proxy to initiate SASL PLAIN with a kafka broker using credentials passed in from the client.

Proposed flow:

  • client does mutual TLS with kafka-proxy
  • Client TLS cert would contain the SASL credentials embedded (probably in the CN)
  • kafka-proxy would have a new flag to enable this passthrough route
  • kafka-proxy parses the credentials out of the client TLS cert and uses those to initiate SASL

Looking at the code I think the most relevant changes could be done in client.go (I realize other parts of the code would need to be changed as well):

  • handleConn would parse the credentials from the x509 Certificate
    • create a new SASLPlainAuth struct from Client.saslAuthByProxy and copy the username/password extracted from the TLS cert
  • DialAndAuth and auth() would be modified to take in this "dynamic" SASLPlainAuth struct and call sendAndReceiveSASLAuth on that same struct to connect to the broker

Would you be open to supporting this pattern? If so we are more than happy to contribute the needed changes.
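
A minimal sketch of the credential-extraction step described above, assuming handleConn has access to the *tls.Conn; SASLPlainAuth here is a stand-in for the struct named in the proposal, not a confirmed kafka-proxy type:

package proxy

import (
	"crypto/tls"
	"errors"
	"fmt"
	"strings"
)

// SASLPlainAuth is a hypothetical mirror of the struct mentioned above.
type SASLPlainAuth struct {
	Username string
	Password string
}

// credsFromConn extracts "user:password" embedded in the client
// certificate's CN, as the proposed handleConn change would do.
func credsFromConn(conn *tls.Conn) (*SASLPlainAuth, error) {
	if err := conn.Handshake(); err != nil { // ensure the peer cert is available
		return nil, err
	}
	state := conn.ConnectionState()
	if len(state.PeerCertificates) == 0 {
		return nil, errors.New("no client certificate presented")
	}
	cn := state.PeerCertificates[0].Subject.CommonName
	user, pass, ok := strings.Cut(cn, ":")
	if !ok {
		return nil, fmt.Errorf("CN %q does not contain user:password", cn)
	}
	return &SASLPlainAuth{Username: user, Password: pass}, nil
}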

List of supported Kafka versions

Hello! Thanks for this great project. I'm looking to understand the versions of Kafka that are supported, but the only thing I could find was Kafka 2.4.0 support in v0.2.0. Is that the latest version that is supported?

I think it would be helpful to add a list of them to the README; I tried to derive it from the code but was unable to.

Thanks!

Multiple proxies with different client certificates

Hi, I have configured Kafka with client certificate auth. I have multiple private keys for client certificates, so I can set up two kafka-proxies on different ports with different values for the --tls-client-key-file parameter. The question is whether such a setup will work. I noticed that in the LoadBalancer example (https://gist.github.com/everesio/e6fae11ed69099ae3f867eddcd83db82) kafka-proxy pods are exposed as singleton StatefulSets, so I just want to make sure that two instances which proxy different types of clients (based on client certificates) will not cause conflicts/race conditions.

It is also interesting why a StatefulSet is needed (the problem with a StatefulSet is that when a pod goes into an unknown state, manual intervention is required to decide whether to replace it). Is it the Kafka protocol that requires the network identity to remain the same, or the statefulness of protocol calls, or maybe something else?

TLS Handshake Error with auth-ldap

Hi @gustavomcarmo

I tried your suggestion and tests with TLS and auth-ldap. Everything works fine without TLS, but I'm getting a handshake error. Not sure what is missing.

Here are my details in case you can catch it for us (attaching error.txt)
error.txt

Any help is highly appreciated. Thanks.


QUESTION: does tls_enable turn on TLS for client side, server side, or both?

I have not experimented yet, but would like to know - does the tls_enable flag turn on TLS for connections from kafka-proxy to the kafka brokers? Or from clients to kafka-proxy? Or both?

If both, how would you configure different certificates for each side of the connection?

If only one of the above, are there plans to enable TLS for the other side?

Running proxy with tls-enable works, but not with proxy-listener-tls-enable

Hi, I am trying to use kafka-proxy as a way to do cross-VPC access for AWS MSK. First, I am trying to set up the proxy within the same VPC as the MSK cluster. I am able to make it work with --tls-enable only. The proxy is started as below:

./kafka-proxy server \
        --bootstrap-server-mapping "b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3001" \
        --bootstrap-server-mapping "b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3002" \
        --dynamic-listeners-disable \
        --kafka-client-id client \
        --tls-enable \
        --tls-ca-chain-cert-file "/home/ec2-user/ssl2/truststore-certs.pem" \
        --tls-client-cert-file "/home/ec2-user/ssl2/keystore-certs.pem" \
        --tls-client-key-file  "/home/ec2-user/ssl2/key.pem" \
        --tls-client-key-password pass123

then connect locally using bin/kafka-console-consumer.sh --bootstrap-server ip-10-1-0-94.ec2.internal:3001 --topic AWSKafkaTutorialTopic2 --from-beginning. Everything works fine.

However, when I try to add TLS between the client and the proxy, things start to break. I have done 3 experiments, and in all 3 I connect using bin/kafka-console-consumer.sh --bootstrap-server ip-10-1-0-94.ec2.internal:3001 --topic AWSKafkaTutorialTopic2 --from-beginning --consumer.config client.properties. The same client.properties works fine when I connect directly to AWS MSK with bin/kafka-console-consumer.sh --bootstrap-server b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094 --topic AWSKafkaTutorialTopic2 --from-beginning --consumer.config client.properties

Experiment 1: using port 9094

./kafka-proxy server \
        --bootstrap-server-mapping "b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3001" \
        --bootstrap-server-mapping "b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3002" \
        --dynamic-listeners-disable \
        --kafka-client-id proxy \
        --proxy-listener-tls-enable \
        --proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
        --proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
        --proxy-listener-key-file  "/home/ec2-user/kafka-proxy/ssl/key.pem" \
        --debug-enable

Results

  1. Proxy log
INFO[2021-06-18T01:03:12Z] New connection for b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094
INFO[2021-06-18T01:03:12Z] Reading data from b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094 had error: unexpected EOF
  2. Broker log
[2021-06-18 01:03:12,122] INFO [SocketServer brokerId=1] Failed authentication with /INTERNAL_IP (SSL handshake failed) (org.apache.kafka.common.network.Selector)

Experiment 2: using port 9092

./kafka-proxy server \
        --bootstrap-server-mapping "b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9092,0.0.0.0:3001" \
        --bootstrap-server-mapping "b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9092,0.0.0.0:3002" \
        --dynamic-listeners-disable \
        --kafka-client-id proxy \
        --proxy-listener-tls-enable \
        --proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
        --proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
        --proxy-listener-key-file  "/home/ec2-user/kafka-proxy/ssl/key.pem" \
        --debug-enable

Result:

  1. Client Log
[2021-06-18 01:07:01,748] ERROR [Consumer clientId=consumer-console-consumer-42640-1, groupId=console-consumer-42640] Connection to node 2147483645 (/0.0.0.0:3002) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-06-18 01:07:01,750] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names present
	at sun.security.ssl.Alert.createSSLException(Alert.java:131)
	at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
  2. Proxy Log
INFO[2021-06-18T01:07:01Z] Reading data from local connection on 10.1.0.94:3002 from 10.1.0.94:52242 (b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9092) had error: tls: received unexpected handshake message of type *tls.clientHelloMsg when waiting for *tls.certificateMsg
  3. Broker Log: None

Experiment 3: combine both tls-enable and proxy-listener-tls-enable

./kafka-proxy server \
        --bootstrap-server-mapping "b-1.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3001" \
        --bootstrap-server-mapping "b-2.chen-guo-msk.abcdef.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:3002" \
        --dynamic-listeners-disable \
        --kafka-client-id proxy \
        --proxy-listener-tls-enable \
        --proxy-listener-ca-chain-cert-file "/home/ec2-user/kafka-proxy/ssl/truststore-certs.pem" \
        --proxy-listener-cert-file "/home/ec2-user/kafka-proxy/ssl/proxyServer-keystore-certs.pem" \
        --proxy-listener-key-file  "/home/ec2-user/kafka-proxy/ssl/key.pem" \
        --tls-enable \
        --tls-ca-chain-cert-file "/home/ec2-user/ssl2/truststore-certs.pem" \
        --tls-client-cert-file "/home/ec2-user/ssl2/keystore-certs.pem" \
        --tls-client-key-file  "/home/ec2-user/ssl2/key.pem" \
        --tls-client-key-password pass123 \
        --debug-enable

Result: exactly the same as Experiment 2

I feel like there might be something wrong with how I generate the keys/certificates for the proxy, but I cannot figure out what. When I do openssl s_client -connect ip-10-1-0-94.ec2.internal:3001, the end of the response looks like this:

---
SSL handshake has read 18546 bytes and written 417 bytes
Verification error: self signed certificate in certificate chain
---
New, TLSv1.3, Cipher is TLS_AES_128_GCM_SHA256
Server public key is 2048 bit
Secure Renegotiation IS NOT supported
Compression: NONE
Expansion: NONE
No ALPN negotiated
Early data was not sent
Verify return code: 19 (self signed certificate in certificate chain)
---
140263774463808:error:14094412:SSL routines:ssl3_read_bytes:sslv3 alert bad certificate:ssl/record/rec_layer_s3.c:1544:SSL alert number 42

I checked your response in #40 (comment) and tried to disable TLS 1.3, but it didn't help; it's always using TLSv1.3 as far as I can tell through openssl. I am not sure whether TLS 1.3 is the problem. It also seems that only the tls13 handshake sources are included in the binary.

[ec2-user@ip-10-1-0-94 kafka-proxy]$ strings kafka-proxy  | grep -i tls13.go
/usr/local/go/src/crypto/tls/handshake_server_tls13.go
/usr/local/go/src/crypto/tls/handshake_client_tls13.go

On the other hand, from the results in Experiment 1, we can actually tell that the TLS between client and proxy should already work. Otherwise, there shouldn't be any logs on the broker side.

Any suggestions would be great! Thank you!!

local error: tls: unexpected message on proxying request

I want to use this proxy to terminate TLS from the Kafka brokers on the proxy (including client authentication with a key file) and connect from the application via plaintext. I am running the proxy with the following command:

./kafka-proxy server --bootstrap-server-mapping "kafka1.example.com:19093,127.0.0.1:32500" --bootstrap-server-mapping "kafka2.example.com:19093,127.0.0.1:32501" --bootstrap-server-mapping "kafka3.example.com:19093,127.0.0.1:32502" --tls-insecure-skip-verify --tls-enable --tls-client-cert-file ./cert.pem --tls-client-key-file ./key.pem

However, whenever I try to connect from the Java application, the proxy disconnects from the host with the following error message:

time="2020-02-15T13:08:30+01:00" level=info msg="New connection for kafka1.example.com:19093"
time="2020-02-15T13:08:30+01:00" level=info msg="couldn't connect to kafka1.example.com:19093(kafka1.example.com:19093): local error: tls: unexpected message"

I'm not sure what I'm doing wrong; probably I misunderstood the parameters here. Can you help?

Restricting the ports for outgoing connection to a range of pre-defined ports

Hi,
Is there a way to restrict the ports used for outgoing connections when starting the kafka-proxy server? I am specifying the bootstrap server using the --bootstrap-server-mapping option and tried to use --dynamic-listeners-disable and --dynamic-sequential-min-port, but these options didn't seem to help. I have a firewall that blocks all outgoing connections by default; I have allowed port 9092, but I get the following error:

Reading data from destination_ip_address:destination_port had error: read tcp source_ip_address:source_port->destination_ip_address:destination_port: wsarecv: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.

This is because the Read TCP operation is happening on a random port that is blocked.

Update 1: After reading some code, I found that the --dynamic-sequential-min-port parameter is used to create servers/listeners via the net.Listen method. So from what I understand, this parameter is irrelevant to my problem. Can someone confirm?
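
For what it's worth, kafka-proxy does not appear to expose such an option today; in Go, pinning the source port of outgoing connections would require a custom dialer, roughly like this sketch (hostname and port number are hypothetical):

package main

import (
	"fmt"
	"net"
)

func main() {
	dialer := net.Dialer{
		// Bind outgoing connections to a fixed local source port (40000 here).
		// Note: a fixed port limits you to one concurrent connection per
		// destination unless socket reuse options are also set.
		LocalAddr: &net.TCPAddr{IP: net.IPv4zero, Port: 40000},
	}
	conn, err := dialer.Dial("tcp", "broker.example.com:9092")
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("connected from", conn.LocalAddr())
}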

Client certificates check between proxy client and brokers client

Hi,

Here is my use case. I need to proxy AWS MSK (Managed Streaming for Apache Kafka) in order to connect to it from any network (MSK connectivity options are a bit limited as of now). The MSK cluster is created with mutual TLS because it is multi-tenant and each tenant has its own client cert. Client certs for all tenants are signed by the same CA (the CA certificate monthly fee is pretty high, so this is a cost optimization). I have one proxy group for each tenant and still want to maintain TLS between the proxy client and the proxy.

Kafka-proxy offers the possibility to terminate mutual TLS on the proxy and also to start mutual TLS from the proxy, so those two capabilities can be used together in order to make sure that:

  1. The client connects with a client cert
  2. The client credentials are propagated to the server:
     I would like the proxy to terminate the client TLS and start TLS with the same certificate

I can start TLS from the proxy with a selected certificate (tls-client-cert-file), so this part is fine.
On the proxy listener side I can validate that the proxy client cert is signed by a given CA (proxy-listener-ca-chain-cert-file), but I am not able to check that the proxy client cert is the same as tls-client-cert-file. This is needed so that tenantA is not able to use the same proxy group as tenantB (they are signed by the same CA).

I did some hacking on kafka-proxy and came up with a prototype that is backwards compatible by default but allows checking for the same client cert when needed (a new config flag). The code is pretty straightforward and available here: https://github.com/mgusiew-guide/kafka-proxy/tree/SameClientCertEnable

Unfortunately I have not created any automated tests yet (just tested manually), but that will follow; I just wanted to have something in order to start the discussion.

Would you be interested in such a contribution? I could redesign it and add some tests based on your review.

Let me know what you think.
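
For discussion purposes, the core of such a check could be as small as the sketch below (hypothetical helper; it compares the listener's peer certificate byte-for-byte with the certificate the proxy itself uses towards the brokers):

package proxy

import (
	"bytes"
	"crypto/tls"
	"crypto/x509"
)

// sameClientCert reports whether the certificate presented on the proxy
// listener is identical to the leaf certificate configured for the broker
// side (tls-client-cert-file), which is the check the prototype adds.
func sameClientCert(peer *x509.Certificate, brokerSide tls.Certificate) bool {
	return len(brokerSide.Certificate) > 0 &&
		bytes.Equal(peer.Raw, brokerSide.Certificate[0])
}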

Question: MSK server on port 443

My requirement is to expose MSK on the public internet on port 443. I am trying to run this proxy in EKS, fronting MSK.

---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-proxy-1
  name: kafka-proxy-1
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka-proxy-1
    port: 443
    protocol: TCP
    targetPort: 443
  selector:
    app: kafka-proxy-1
  sessionAffinity: None
  type: LoadBalancer
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-proxy-2
  name: kafka-proxy-2
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka-proxy-2
    port: 443
    protocol: TCP
    targetPort: 443
  selector:
    app: kafka-proxy-2
  sessionAffinity: None
  type: LoadBalancer
---

apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-proxy-3
  name: kafka-proxy-3
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: kafka-proxy-3
    port: 443
    protocol: TCP
    targetPort: 443
  selector:
    app: kafka-proxy-3
  sessionAffinity: None
  type: LoadBalancer

---
apiVersion: apps/v1
kind: Deployment
metadata:
   name: kafka-proxy-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-proxy-1
  template:
    metadata:
      labels:
        app: kafka-proxy-1
    spec:
      containers:
        - name: kafka-proxy
          securityContext:
            capabilities:
              add: ["NET_ADMIN", "SYS_TIME","NET_BIND_SERVICE"]
          image: grepplabs/kafka-proxy:latest
          args:
            - 'server'
            - '--log-format=json'
            - '--bootstrap-server-mapping=b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:443,a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443'
            - '--external-server-mapping=b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443'
            - '--external-server-mapping=b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443'
            - '--external-server-mapping=b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443'
            - '--tls-enable'
            - '--log-level=debug'
            - '--dynamic-listeners-disable'
            - '--tls-insecure-skip-verify'
            - '--proxy-request-buffer-size=32768'
            - '--proxy-response-buffer-size=32768'
            - '--proxy-listener-read-buffer-size=32768'
            - '--proxy-listener-write-buffer-size=131072'
            - '--kafka-connection-read-buffer-size=131072'
            - '--kafka-connection-write-buffer-size=32768'
          ports:
          - name: kafka-port1
            containerPort: 443


---
apiVersion: apps/v1
kind: Deployment
metadata:
   name: kafka-proxy-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-proxy-2
  template:
    metadata:
      labels:
        app: kafka-proxy-2
    spec:
      containers:
        - name: kafka-proxy
          securityContext:
            capabilities:
              add: ["NET_ADMIN", "SYS_TIME","NET_BIND_SERVICE"]
          image: grepplabs/kafka-proxy:latest
          args:
            - 'server'
            - '--log-format=json'
            - '--bootstrap-server-mapping=b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:443,a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443'
            - '--external-server-mapping=b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443'
            - '--external-server-mapping=b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443'
            - '--external-server-mapping=b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443'
            - '--tls-enable'
            - '--log-level=debug'
            - '--dynamic-listeners-disable'
            - '--tls-insecure-skip-verify'
            - '--proxy-request-buffer-size=32768'
            - '--proxy-response-buffer-size=32768'
            - '--proxy-listener-read-buffer-size=32768'
            - '--proxy-listener-write-buffer-size=131072'
            - '--kafka-connection-read-buffer-size=131072'
            - '--kafka-connection-write-buffer-size=32768'
          ports:
          - name: kafka-port2
            containerPort: 443


---
apiVersion: apps/v1
kind: Deployment
metadata:
   name: kafka-proxy-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-proxy-3
  template:
    metadata:
      labels:
        app: kafka-proxy-3
    spec:
      containers:
        - name: kafka-proxy
          securityContext:
            capabilities:
              add: ["NET_ADMIN", "SYS_TIME","NET_BIND_SERVICE"]
          image: grepplabs/kafka-proxy:latest
          args:
            - 'server'
            - '--log-format=json'
            - '--external-server-mapping=b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443'
            - '--external-server-mapping=b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443'
            - '--external-server-mapping=b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443'
            - '--bootstrap-server-mapping=b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094,0.0.0.0:443,a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443'
            - '--tls-enable'
            - '--log-level=debug'
            - '--dynamic-listeners-disable'
            - '--tls-insecure-skip-verify'
            - '--proxy-request-buffer-size=32768'
            - '--proxy-response-buffer-size=32768'
            - '--proxy-listener-read-buffer-size=32768'
            - '--proxy-listener-write-buffer-size=131072'
            - '--kafka-connection-read-buffer-size=131072'
            - '--kafka-connection-write-buffer-size=32768'
          ports:
          - name: kafka-port3
            containerPort: 443

where *.us-east-1.elb.amazonaws.com:443 are the external addresses I get from EKS.

I get this error in my pod:

{"@level":"info","@message":"Bootstrap server b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094 advertised as a572bcec9048311ea874d02324e2a15c-102380383.us-east-1.elb.amazonaws.com:443","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"External server b-2.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094 advertised as a573b1e97048311ea874d02324e2a15c-2011622944.us-east-1.elb.amazonaws.com:443","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"External server b-3.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094 advertised as a574a0444048311ea874d02324e2a15c-972090528.us-east-1.elb.amazonaws.com:443","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"Listening on 0.0.0.0:443 ([::]:443) for remote b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"Ready for new connections","@timestamp":"2019-11-11T22:45:59Z"}
{"@level":"info","@message":"New connection for b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:46:01Z"}
{"@level":"info","@message":"Client closed local connection on 20.10.3.191:443 from 20.10.1.197:39176 (b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094)","@timestamp":"2019-11-11T22:46:01Z"}
{"@level":"info","@message":"New connection for b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:46:05Z"}
{"@level":"info","@message":"Client closed local connection on 20.10.3.191:443 from 20.10.2.185:7332 (b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094)","@timestamp":"2019-11-11T22:46:05Z"}
{"@level":"info","@message":"New connection for b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:46:09Z"}
{"@level":"info","@message":"Client closed local connection on 20.10.3.191:443 from 20.10.3.252:36298 (b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094)","@timestamp":"2019-11-11T22:46:09Z"}
{"@level":"info","@message":"New connection for b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094","@timestamp":"2019-11-11T22:46:11Z"}
{"@level":"info","@message":"Client closed local connection on 20.10.3.191:443 from 20.10.1.197:39186 (b-1.pdp-kafka.h5u916.c3.kafka.us-east-1.amazonaws.com:9094)","@timestamp":"2019-11-11T22:46:11Z"}```

kafka-proxy expects a response from the broker for Produce requests with RequiredAcks=0

Symptoms: "open requests buffer is full" as described in #23

RequiredAcks:
This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0, the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response.
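
For context, a minimal sketch of the check this fix implies: inspect the acks field of a Produce request (API key 0) and skip response bookkeeping when acks=0. The field-layout assumptions are noted in the comments; this is not kafka-proxy's actual decoder.

package main

import (
	"encoding/binary"
	"fmt"
)

const apiKeyProduce = 0

// expectsResponse returns false only for Produce requests with acks=0,
// the single case where the broker sends no reply. It assumes the payload
// starts right after the standard request header and that acks is the first
// int16 field (true for Produce v0-v2; v3+ prepends a nullable
// transactional_id string, which is skipped below).
func expectsResponse(apiKey, apiVersion int16, payload []byte) bool {
	if apiKey != apiKeyProduce {
		return true
	}
	offset := 0
	if apiVersion >= 3 {
		// Skip nullable transactional_id: int16 length (-1 means null), then bytes.
		n := int16(binary.BigEndian.Uint16(payload))
		offset = 2
		if n > 0 {
			offset += int(n)
		}
	}
	acks := int16(binary.BigEndian.Uint16(payload[offset:]))
	return acks != 0
}

func main() {
	// acks encoded big-endian at the start of a Produce v2 payload.
	fmt.Println(expectsResponse(0, 2, []byte{0x00, 0x00})) // false: acks=0, no reply expected
	fmt.Println(expectsResponse(0, 2, []byte{0x00, 0x01})) // true: acks=1
}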

Kafka-proxy for Auditing

Hi,

I have an auditing/compliance requirement to generate an audit log for my Kafka broker. As my service provider doesn't offer it, I'm evaluating kafka-proxy to achieve it.

For my requirements, I need to add the username from SASL and the client IP to the debug log, something like this:

logrus.Debugf("[%s] Kafka request key %v, version %v, length %v", ctx.username, requestKeyVersion.ApiKey, requestKeyVersion.ApiVersion, requestKeyVersion.Length)

My broker uses SASL PLAIN.

I'm trying to change the code to extract the SASL packets for Kafka's API key 17 in handleRequest, but I cannot re-read the src DeadlineReaderWriter, right?

Any idea/recommendation? I'd love to contribute to the project, although I'm not a Go developer ;-)
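
One possible approach, sketched below: rather than re-reading the source, tee the bytes into a buffer as they are forwarded, using io.TeeReader from the standard library (this is not existing kafka-proxy code). The SASL/PLAIN payload is NUL-separated authzid/authcid/password:

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	src := strings.NewReader("\x00user\x00secret") // SASL/PLAIN payload shape
	var captured bytes.Buffer
	tee := io.TeeReader(src, &captured) // every read also lands in captured

	// io.Discard stands in for forwarding the bytes to the broker.
	if _, err := io.Copy(io.Discard, tee); err != nil {
		panic(err)
	}
	fields := bytes.Split(captured.Bytes(), []byte{0})
	if len(fields) == 3 {
		fmt.Printf("authzid=%q user=%q\n", fields[0], fields[1]) // never log the password
	}
}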

Proxy to multiple clusters based on header fields

I know it's not the goal of this project, but I have in mind something that could help me solve big scaling issues.
The idea is to write a proxy between the producer and the broker cluster that can route all traffic to the appropriate Kafka cluster depending on a field in the Kafka message header. Maybe my idea is just crap, I don't know, but I would appreciate your input.

Add support for remapping brokers based on node id

Kafka allows brokers to advertise any IP, but they must have a consistent node id. It would be useful if kafka-proxy could key the IP replacement logic by node id as well as by IP. For example, something like --bootstrap-server-mapping=NODE_ID:1,localhost:30001.

This would still require at least one --bootstrap-server-mapping that's not node id based, but it would provide a nice alternative in case the advertised hostnames aren't readily available and change over time.

QUESTION: open requests buffer is full

I am using kafka-proxy to publish 250 messages per second from two producers, where the average message size is 8 kB.

While doing that I get the following error on every few seconds:

Reading data from local connection on xxx:32502 from xxx:14913 (b-2.testcluster.pwsiki.c3.kafka.eu-west-1.amazonaws.com:9092) had error: open requests buffer is full

I have increased all available "buffer" properties to quite a high number (i.e. 500000).
I am running the proxy on a stock EC2 m5.large instance.

Should I tweak any of the system settings? If so, which?

Error: Executing consumer group command failed due to org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

Hi,

Any idea why I get:

Error: Executing consumer group command failed due to org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:402)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:237)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:401)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:417)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:312)
	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

I'm running bin/kafka-consumer-groups --bootstrap-server proxy:9092 --all-groups --all-topics --describe --verbose, and I don't get this error when running directly against my broker.

Thanks a lot!

ACL support?

Does it have ACL or another access control mechanism at the proxy level? For example, interception and filtering of messages by sender and target topic, the way gateways filter HTTP messages by their headers and target path. The main idea is the same: to move all access control concerns to the proxy/gateway (or another separate system) without using the built-in Kafka ACL.

Errors you may encounter when upgrading the library

(The purpose of this report is to alert grepplabs/kafka-proxy to possible problems when it tries to upgrade the following dependencies.)

An error will happen when upgrading the library prometheus/client_golang:

github.com/prometheus/client_golang

-Latest Version: v1.7.1 (Latest commit fe7bd95 11 days ago)
-Where did you use it:
https://github.com/grepplabs/kafka-proxy/search?q=prometheus%2Fclient_golang%2Fprometheus&unscoped_q=prometheus%2Fclient_golang%2Fprometheus
-Detail:

github.com/prometheus/client_golang/go.mod

module github.com/prometheus/client_golang
require (
	github.com/beorn7/perks v1.0.1
	github.com/cespare/xxhash/v2 v2.1.1
	…
)
go 1.11

github.com/prometheus/client_golang/prometheus/desc.go

package prometheus
import (
	"github.com/cespare/xxhash/v2"
	…
)

This problem has existed since prometheus/client_golang v1.2.0 (committed 9a2ab94 on 16 Oct 2019). You currently use version v0.8.0. If you try to upgrade prometheus/client_golang to version v1.2.0 or above, you will get an error: no package exists at "github.com/cespare/xxhash/v2".

I investigated the release information of the affected library versions (prometheus/client_golang >= v1.2.0) and found the root cause of this issue:

  1. These dependencies all added Go Modules in their recent versions.

  2. They all comply with the specification of "Releasing Modules for v2 or higher" available in the Modules documentation. Quoting the specification:

A package that has migrated to Go Modules must include the major version in the import path to reference any v2+ modules. For example, repo github.com/my/module migrated to Modules on version v3.x.y. Then this repo should declare its module path with the MAJOR version suffix "/v3" (e.g., module github.com/my/module/v3), and its downstream projects should use "github.com/my/module/v3/mypkg" to import this repo's package.

  3. This "github.com/my/module/v3/mypkg" is not the physical path, so earlier versions of Go, plus all tooling (like dep, glide, govendor, etc.), don't have minimal module awareness as of now and therefore don't handle import paths correctly. See golang/dep#1962, golang/dep#2139.

Note: creating a new branch is not required. If instead you have been previously releasing on master and would prefer to tag v3.0.0 on master, that is a viable option. (However, be aware that introducing an incompatible API change in master can cause issues for non-module users who issue a go get -u, given the go tool is not aware of semver prior to Go 1.11 or when module mode is not enabled in Go 1.11+.)
Pre-existing dependency management solutions such as dep currently can have problems consuming a v2+ module created in this way. See for example dep#1962.
https://github.com/golang/go/wiki/Modules#releasing-modules-v2-or-higher

Solution

1. Migrate to Go Modules.

Go Modules is the general trend of the ecosystem; if you want a better package-upgrade experience, migrating to Go Modules is a good choice.

Migrating to Modules will be accompanied by the introduction of virtual import paths (as discussed above).

This "github.com/my/module/v3/mypkg" is not the physical path, so Go versions older than 1.9.7 and 1.10.3, plus all third-party dependency management tools (like dep, glide, govendor, etc.), don't have minimal module awareness and therefore don't handle import paths correctly.

Then downstream projects might be negatively affected in their builds if they are module-unaware (Go versions older than 1.9.7 and 1.10.3, or using third-party dependency management tools such as dep, glide, govendor…).
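
For illustration, the module-aware layout this solution recommends would look roughly like this (module path from the repo, dependency version from the report; the Go version line is illustrative, and these are not the repo's actual files):

go.mod:

module github.com/grepplabs/kafka-proxy

go 1.14

require github.com/prometheus/client_golang v1.7.1

With Modules enabled, the transitive v2+ import path resolves automatically:

import "github.com/cespare/xxhash/v2" // virtual path, handled by the module system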

2. Maintain v2+ libraries that use Go Modules in the vendor directory.

If grepplabs/kafka-proxy wants to keep using dependency management tools (like dep, glide, govendor, etc.) and still wants to upgrade the dependencies, it can choose this fix strategy:
manually download the dependencies into the vendor directory and handle compatibility there (materialize the virtual path or delete the virtual part of the path), avoiding fetching the dependencies by virtual import paths. This may add some maintenance overhead compared to using Modules.

As the import paths have different meanings between projects adopting module repos and non-module repos, materializing the virtual path is a better way to solve the issue while ensuring compatibility with downstream module users. A textbook example provided by repo github.com/moby/moby is here:
https://github.com/moby/moby/blob/master/VENDORING.md
https://github.com/moby/moby/blob/master/vendor.conf
In the vendor directory, github.com/moby/moby adds the /vN subdirectory to the corresponding dependencies.
This will help more downstream module users work well with your package.

3. Ask upstream to do compatibility processing.

prometheus/client_golang has 1048 module-unaware users on GitHub, such as AndreaGreco/mqtt_sensor_exporter, seekplum/plum_exporter, arl/monitoring…
https://github.com/search?q=prometheus%2Fclient_golang+filename%3Avendor.conf+filename%3Avendor.json+filename%3Aglide.toml+filename%3AGodep.toml+filename%3AGodep.json&type=Code

Summary

You can make a choice when you meet these dependency-management issues by balancing your own development schedule and mode against the effects on downstream projects.

For this issue, Solution 1 can maximize your benefits with minimal impact on your downstream projects and the ecosystem.

References

Do you plan to upgrade the libraries in the near future?
Hope this issue report can help you ^_^
Thank you very much for your attention.

Best regards,
Kate
