confluentinc / demo-scene

👾Scripts and samples to support Confluent Demos and Talks. ⚠️Might be rough around the edges ;-) 👉For automated tutorials and QA'd code, see https://github.com/confluentinc/examples/

Home Page: https://developer.confluent.io

License: Apache License 2.0

Shell 45.60% Python 0.66% Java 20.92% JavaScript 24.89% Groovy 0.27% Makefile 0.50% HCL 1.30% Smarty 0.01% HTML 0.80% PLpgSQL 0.01% CSS 3.52% Kotlin 0.53% TSQL 0.31% Dockerfile 0.07% Go 0.38% Ruby 0.01% Elm 0.16% Less 0.02% Jupyter Notebook 0.07%
kafka ksql elasticsearch syslog mysql debezium kafka-streams kafka-connect ksqldb demo

demo-scene's Introduction

demo-scene

Scripts and samples to support Confluent Platform talks. May be rough around the edges. For automated tutorials and QA'd code, see https://github.com/confluentinc/examples/

Requirements

You may well need to allocate 8 GB of RAM to Docker when running these. Avoid allocating all your machine's cores to Docker, as this can make the machine unresponsive when running large stacks. On a four-core MacBook, two cores for Docker should be fine.
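To check what Docker actually has before starting a stack (a quick sketch using the standard docker CLI):

# Show the CPU and memory currently allocated to the Docker engine
docker info --format 'CPUs: {{.NCPU}}, Memory: {{.MemTotal}} bytes'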

Contents

Livestreams

🎥 recordings

Applications

Data pipelines (Database + KSQL + Elasticsearch)

ksqlDB (previously known as KSQL)

Kafka Connect

Confluent Cloud

Confluent Platform

Misc

Feedback & Questions

demo-scene's People

Contributors

addisonhuddy, bleporini, cerchie, daveklein, davetroiano, davidseth, domenicbove, gianlucanatali, guozhangwang, hdulay, jrock9000, jwfbean, kaiwaehner, karla-isabel-sandoval, londoncalling, lukeknep, lyoung-confluent, marcinthecloud, mattwong949, mcascallares, mousavi310, pdruley, riferrei, rmoff, russau, saubury-tw, smithjohntaylor, tjunderhill, tlberglund, vvcephei


demo-scene's Issues

[rail-data-streaming-pipeline] - Kafka shutting down

When I run docker-compose, everything shows UP - but when running the ingest_00.sh script I'm getting an error saying no reply from server.

When I look at the docker-compose logs I'm seeing issues connecting Kibana to Elasticsearch, but I think it's a red herring:

kibana              | {"type":"log","@timestamp":"2019-08-27T02:58:56Z","tags":["license","warning","xpack"],"pid":1,"message":"License information from the X-Pack plugin could not be obtained from Elasticsearch for the [data] cluster. Error: No Living connections"}
kibana              | {"type":"log","@timestamp":"2019-08-27T02:58:56Z","tags":["warning","elasticsearch","admin"],"pid":1,"message":"Unable to revive connection: http://elasticsearch:9200/"}
kibana              | {"type":"log","@timestamp":"2019-08-27T02:58:56Z","tags":["warning","elasticsearch","admin"],"pid":1,"message":"No living connections"}

Elasticsearch doesn't appear to throw any errors, but Kafka shut down with this error:

kafka               | [2019-08-27 02:55:36,553] INFO Socket connection established to zookeeper/172.20.0.6:2181, initiating session (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:55:36,571] INFO Session establishment complete on server zookeeper/172.20.0.6:2181, sessionid = 0x1000263ea780001, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:55:36,642] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
kafka               | [2019-08-27 02:55:53,640] INFO Unable to read additional data from server sessionid 0x1000263ea780001, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:04,403] INFO Opening socket connection to server zookeeper/172.20.0.6:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:04,923] INFO Socket connection established to zookeeper/172.20.0.6:2181, initiating session (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:06,751] WARN Unable to reconnect to ZooKeeper service, session 0x1000263ea780001 has expired (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:06,963] INFO Unable to reconnect to ZooKeeper service, session 0x1000263ea780001 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:12,991] INFO [ZooKeeperClient] Session expired. (kafka.zookeeper.ZooKeeperClient)
kafka               | [2019-08-27 02:56:13,021] INFO EventThread shut down for session: 0x1000263ea780001 (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:14,472] INFO [ZooKeeperClient] Initializing a new session to zookeeper:2181. (kafka.zookeeper.ZooKeeperClient)
kafka               | [2019-08-27 02:56:14,818] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@3c9754d8 (org.apache.zookeeper.ZooKeeper)
kafka               | [2019-08-27 02:56:16,911] INFO Opening socket connection to server zookeeper/172.20.0.6:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:16,936] INFO Socket connection established to zookeeper/172.20.0.6:2181, initiating session (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:17,169] INFO Session establishment complete on server zookeeper/172.20.0.6:2181, sessionid = 0x1000263ea780002, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:18,488] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka               | org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /consumers
kafka               | 	at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
kafka               | 	at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
kafka               | 	at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:544)
kafka               | 	at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1610)
kafka               | 	at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1532)
kafka               | 	at kafka.zk.KafkaZkClient$$anonfun$createTopLevelPaths$1.apply(KafkaZkClient.scala:1524)
kafka               | 	at kafka.zk.KafkaZkClient$$anonfun$createTopLevelPaths$1.apply(KafkaZkClient.scala:1524)
kafka               | 	at scala.collection.immutable.List.foreach(List.scala:392)
kafka               | 	at kafka.zk.KafkaZkClient.createTopLevelPaths(KafkaZkClient.scala:1524)
kafka               | 	at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:386)
kafka               | 	at kafka.server.KafkaServer.startup(KafkaServer.scala:205)
kafka               | 	at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:114)
kafka               | 	at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:66)
kafka               | [2019-08-27 02:56:18,579] INFO shutting down (kafka.server.KafkaServer)
kafka               | [2019-08-27 02:56:18,692] INFO [ZooKeeperClient] Closing. (kafka.zookeeper.ZooKeeperClient)
kafka               | [2019-08-27 02:56:18,791] INFO EventThread shut down for session: 0x1000263ea780002 (org.apache.zookeeper.ClientCnxn)
kafka               | [2019-08-27 02:56:18,794] INFO Session: 0x1000263ea780002 closed (org.apache.zookeeper.ZooKeeper)
kafka               | [2019-08-27 02:56:18,802] INFO [ZooKeeperClient] Closed. (kafka.zookeeper.ZooKeeperClient)
kafka               | [2019-08-27 02:56:18,876] INFO shut down completed (kafka.server.KafkaServer)
kafka               | [2019-08-27 02:56:18,888] INFO Shutting down SupportedServerStartable (io.confluent.support.metrics.SupportedServerStartable)
kafka               | [2019-08-27 02:56:18,888] INFO Closing BaseMetricsReporter (io.confluent.support.metrics.BaseMetricsReporter)
kafka               | [2019-08-27 02:56:18,889] INFO Waiting for metrics thread to exit (io.confluent.support.metrics.SupportedServerStartable)
kafka               | [2019-08-27 02:56:18,889] INFO Shutting down KafkaServer (io.confluent.support.metrics.SupportedServerStartable)
kafka               | [2019-08-27 02:56:18,892] INFO shutting down (kafka.server.KafkaServer)

Early in the logs I also see that kafka:29092 can't be resolved:

[main] WARN org.apache.kafka.clients.ClientUtils - Couldn't resolve server kafka:29092 from bootstrap.servers as DNS resolution failed for kafka
kafka-connect-02    | [main] ERROR io.confluent.admin.utils.cli.KafkaReadyCommand - Error while running kafka-ready.
kafka-connect-02    | org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
kafka-connect-02    | 	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:386)
kafka-connect-02    | 	at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:65)
kafka-connect-02    | 	at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:138)
kafka-connect-02    | 	at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:150)
kafka-connect-02    | Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
kafka-connect-02    | 	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:90)
kafka-connect-02    | 	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:49)
kafka-connect-02    | 	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346)
kafka-connect-02    | 	... 3 more

When I run the first curl POST in the 00_ingest.sh file...

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "source-activemq-networkrail-TRAIN_MVT_EA_TOC-01",
  "config": {
    "connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
    "activemq.url": "tcp://datafeeds.networkrail.co.uk:61619",
    "activemq.username": "${file:/data/credentials.properties:NROD_USERNAME}",
    "activemq.password": "${file:/data/credentials.properties:NROD_PASSWORD}",
    "jms.destination.type": "topic",
    "jms.destination.name": "TRAIN_MVT_EA_TOC",
    "kafka.topic": "networkrail_TRAIN_MVT",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "confluent.license": "",
    "confluent.topic.bootstrap.servers": "kafka:29092",
    "confluent.topic.replication.factor": 1
  }
}'

This is the response:
curl: (52) Empty reply from server

Would love some insight on this 🙏
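For anyone triaging the same thing, a couple of quick checks (a sketch, assuming the service names from the demo's docker-compose.yml; getent may not exist in every image):

# Confirm the broker container is still up - the log above shows it shut itself down
docker-compose ps kafka

# Verify that the broker hostname resolves from inside the Connect worker
docker-compose exec kafka-connect-02 getent hosts kafka

# Look for the fatal startup error in the broker log
docker-compose logs --tail=100 kafka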

Set RabbitMQConnector kafka key based on a message field value

Hello!
I have read the great article

https://rmoff.net/2020/01/08/streaming-messages-from-rabbitmq-into-kafka-with-kafka-connect/.

I would like to know how/if it is possible to configure the RabbitMQ source connector:

  1. to take a field from the message payload as the key for the Kafka message
  2. to specify a static value as the key for the Kafka message

Example based on the article:

  1. Specify the "transaction" field value (e.g. PAYMENT) to be the key of the Kafka message.
    RabbitMQ message:
{
  "transaction": "PAYMENT",
  "amount": "$125.0",
  "timestamp": "Wed 8 Jan 2020 10:41:45 GMT"
}

Kafka message, with the value of the transaction field set as the Kafka message key:

"key": "PAYMENT"
"value": byte value of the RabbitMQ message

  2. Specify my own key value, meaning the key of the Kafka message is configured statically as "PAYMENT".
    Kafka message:
"key": "PAYMENT"
"value": byte value of the RabbitMQ message

I would prefer a solution for the first example, in order to have a dynamic configuration.
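(Not an answer from the article, just a sketch of one common route: if the connector emitted a structured value that Connect could see into - e.g. JSON with schemas enabled - the stock Single Message Transforms could copy a field into the key. Note that the RabbitMQ source connector writes the message value as raw bytes, so these transforms may not apply directly; re-keying the stream after ingest, e.g. with ksqlDB's PARTITION BY, is the usual alternative. The transform aliases below are hypothetical; the fragment would be merged into the connector config from the blog post.)

"transforms": "copyFieldToKey,extractKeyFromStruct",
"transforms.copyFieldToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyFieldToKey.fields": "transaction",
"transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field": "transaction"

For the static-key case (the second example), the built-in InsertField$Key transform with its static.field and static.value settings covers it.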

[scalable-payment-processing] PaymentThroughputTest Testing throughput per Window

I was looking at the PaymentsThroughput application. ThroughputPerWindow seems to always be 0.

To test the application, I suppose one can use testDriver.advanceWallClockTime(ONE_MINUTE);

However, calls to System.currentTimeMillis() within ThroughputStats would probably not have the desired effect. Would it be wise to inject a Clock instance, or have it as a static variable, so that it can be injected wherever needed? That would make it possible to test using

public KafkaStreams(Topology topology, Properties props, Time time)

and in integration tests use specific time

public EmbeddedKafkaCluster(final int numBrokers,
                                final Properties brokerConfig,
                                final long mockTimeMillisStart,
                                final long mockTimeNanoStart)

and in PaymentsThroughput call something like Time.now() instead of System.currentTimeMillis();

Having problem with specific.protobuf.value.type in yaml config using Spring Cloud Kafka binder

When using the Spring Cloud Kafka binder, I am having a problem applying the 'specific.protobuf.value.type' config setting when using KafkaProtobufSerde. In my StreamProcessor class, I want to be able to deserialize my protobuf classes using the 'specific' classes specified in config (protobuf-generated).
Please see sample code: https://github.com/mperrault/specific-protobuf-issue
In my EventStreamProcessor.consume method, the eventType1Stream should be deserialized as EventType1 instances, and the eventType2Stream should be EventType2 instances if this config setting is applied properly to these channels.
The consume method invocation will fail with ClassCastExceptions indicating that the events types are of type DynamicMessage (which is what you would expect if KafkaProtobufSerde had no 'specific' config applied).
I experimented with moving the specific.protobuf.value.type setting to different locations in the yaml file, but had no success getting it recognized at the channel level. Note that the 'specific' config does work at the spring.kafka.properties.specific.protobuf.value.type location, but that ends up causing ALL channels configured for KafkaProtobufSerde to use this 'specific' value type - I want to be able to set 'specific' per channel.

Salesforce Connector - throwing - TimeoutException: Timed out waiting for a node assignment

Hello, I'm currently getting the error org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment in the connector status after configuring a Salesforce Source Connector in Confluent Control Center > Connect page.

This is my docker-compose.yml file:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - '31000:31000'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31000

  kafka:
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    #
    image: confluentinc/cp-enterprise-kafka:5.3.0
    ports:
      - '9092:9092'
      - '31001:31001'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31001

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on:
      - zookeeper
      - kafka
    ports:
      - '8081:8081'
      - '31002:31002'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_JMX_HOSTNAME: "localhost"
      SCHEMA_REGISTRY_JMX_PORT: 31002

  ksql-server:
    image: confluentinc/cp-ksql-server:latest
    depends_on:
      - kafka
      - schema-registry
    ports:
      - '31003:31003'
      - '8088:8088'
    environment:
      KSQL_BOOTSTRAP_SERVERS: kafka:29092
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: asgard
      KSQL_JMX_HOSTNAME: "localhost"
      KSQL_JMX_PORT: 31003
      KSQL_LOG4J_ROOT_LOGLEVEL: "INFO"
      KSQL_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      # --- Processing log config ---
      KSQL_LOG4J_PROCESSING_LOG_BROKERLIST: kafka:29092
      KSQL_LOG4J_PROCESSING_LOG_TOPIC: asgard_processing_log
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_NAME: asgard_processing_log
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"

  ksql-cli:
    image: confluentinc/cp-ksql-cli:latest
    depends_on:
      - ksql-server
    entrypoint: /bin/sh
    tty: true

  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - '8083:8083'
      - '31004:31004'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31004
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    # Uncomment this bit if you want to automatically install the datagen and twitter connectors
    # and set the datagen connector running
    # command:
    #   - bash
    #   - -c
    #   - |
    #     confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.1.1
    #     confluent-hub install --no-prompt jcustenborder/kafka-connect-twitter:0.2.32
    #     /etc/confluent/docker/run &
    #     echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
    #     while [ $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) -ne 200 ] ; do
    #       echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) " (waiting for 200)"
    #       sleep 5
    #     done
    #     nc -vz kafka-connect 8083
    #     echo -e "\n--\n+> Creating Connector"
    #     curl -X POST http://kafka-connect:8083/connectors -H "Content-Type: application/json" -d '
    #     {
    #       "name": "datagen-pageviews",
    #       "config": {
    #         "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    #         "kafka.topic": "pageviews",
    #         "quickstart": "pageviews",
    #         "max.interval": 100,
    #         "iterations": 10000000,
    #         "tasks.max": "1"
    #       }
    #     }
    #     '
    #     sleep infinity

  rest-proxy:
    image: confluentinc/cp-kafka-rest:latest
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    ports:
      - '8082:8082'
      - '31005:31005'
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKAREST_JMX_HOSTNAME: "localhost"
      KAFKAREST_JMX_PORT: 31005

  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
      - kafka-connect
      - ksql-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'kafka-connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksql-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
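(A sketch for triage, not a fix: when a connector shows an error state, the Connect REST API usually exposes the underlying exception. The connector name below is hypothetical - substitute the one created in Control Center. Also worth noting: inside this Compose network the broker address is kafka:29092, per the comment in the file above, so a connector config pointing at localhost:9092 would time out in exactly this way.)

# Show the connector's state and any task-level stack trace
curl -s http://localhost:8083/connectors/salesforce-source/status | jq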

rail-data-streaming-pipeline - Ingest data

Trying to set up the rail data streaming pipeline locally, and getting the error below.

Adityas-MacBook-Pro:cif_schedule adityapadhi$ ./00_ingest_schedule.sh
readlink: illegal option -- f
usage: readlink [-n] [file ...]
usage: dirname path
./00_ingest_schedule.sh: line 4: /../../set_credentials_env.sh: No such file or directory
gunzip: unknown compression format
gunzip: unknown compression format
gunzip: unknown compression format
gunzip: unknown compression format
Adityas-MacBook-Pro:cif_schedule adityapadhi$
Adityas-MacBook-Pro:cif_schedule adityapadhi$

Any advice on how to resolve this?

Some more info:
Adityas-MacBook-Pro:cif_schedule adityapadhi$ curl -s -L -u "$NROD_USERNAME:$NROD_PASSWORD" "https://datafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_EA_TOC_FULL_DAILY&day=toc-full"

Apache Tomcat/7.0.27 - Error report

HTTP Status 401 - User not subscribed to CIF_EA_TOC_FULL_DAILY

type: Status report
message: User not subscribed to CIF_EA_TOC_FULL_DAILY
description: This request requires HTTP authentication (User not subscribed to CIF_EA_TOC_FULL_DAILY).

Adityas-MacBook-Pro:cif_schedule adityapadhi$ echo $NROD_USERNAME
[email protected]

PS: the name of the script is incorrect in the setup instructions.
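(A side note for anyone else hitting this: macOS's BSD readlink doesn't support -f, so the script never finds set_credentials_env.sh and the NROD_* credentials are never exported - which would also explain the 401 and the gunzip failures, since the download is an HTML error page rather than a gzip file. A possible workaround, assuming the script only needs those two variables in the environment; the values below are placeholders:)

# Option 1: install GNU coreutils, which provides readlink -f as greadlink
brew install coreutils

# Option 2: export the credentials by hand, then re-run the ingest script
export NROD_USERNAME='you@example.com'
export NROD_PASSWORD='your-password'
./00_ingest_schedule.sh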

Issue when trying to run demo on Mac

I’m following https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc
After running the setup, I get an error message at:

ksql> PRINT 'orders';
Could not find topic 'orders', or the KSQL user does not have permissions to list the topic.

I can run SHOW TOPICS; and indeed orders does not appear in the listing (there are only topics like _confluent* and one _schemas)

My OS: MacOS High Sierra

I’m available if you need any further information on how to reproduce the issue.
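(A quick thing to check, sketched against the demo's Compose setup: whether the component that generates the orders topic actually started, and what Kafka Connect thinks its connectors are doing.)

# Confirm all containers are Up
docker-compose ps

# List connectors with their status (?expand=status needs a recent Connect version)
curl -s "http://localhost:8083/connectors?expand=status" | jq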

Ratings not flowing - how to start?

Working through the demo script at: https://github.com/confluentinc/demo-scene/blob/master/build-a-streaming-pipeline/demo_build-a-streaming-pipeline.adoc

=============
Waiting for Kafka Connect to start listening on localhost ⏳
=============

Tue Jul 14 13:30:10 PDT 2020 

--------------
\o/ Kafka Connect is ready! Listener HTTP state:  200 

curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'DatagenConnector|MySqlConnector|ElasticsearchSinkConnector'

"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
"io.confluent.kafka.connect.datagen.DatagenConnector"
"io.debezium.connector.mysql.MySqlConnector"

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas 
-------------------------------------------------------------------------
 confluent_rmoff_01ksql_processing_log | 1          | 1                  
 ratings                               | 1          | 1                  
-------------------------------------------------------------------------
ksql> 

ksql> CREATE SINK CONNECTOR SINK_ES_RATINGS WITH (
>    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
>    'topics'          = 'ratings',
>    'connection.url'  = 'http://elasticsearch:9200',
>    'type.name'       = '_doc',
>    'key.ignore'      = 'false',
>    'schema.ignore'   = 'true',
>    'transforms'= 'ExtractTimestamp',
>    'transforms.ExtractTimestamp.type'= 'org.apache.kafka.connect.transforms.InsertField$Value',
>    'transforms.ExtractTimestamp.timestamp.field' = 'RATING_TS'
>);

 Error                                                    
----------------------------------------------------------
 {
  "error_code" : 409,
  "message" : "Connector SINK_ES_RATINGS already exists"
} 
----------------------------------------------------------
ksql> 

PRINT 'ratings' just sits there:

ksql> PRINT 'ratings';

Press CTRL-C to interrupt
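(If it helps anyone else: the 409 just means the connector survived from an earlier run. A sketch for inspecting and resetting it via the Connect REST API directly:)

# See what state the existing sink is in, and why nothing reaches Elasticsearch
curl -s http://localhost:8083/connectors/SINK_ES_RATINGS/status | jq

# If it's wedged, drop it and re-run the CREATE SINK CONNECTOR statement above
curl -s -X DELETE http://localhost:8083/connectors/SINK_ES_RATINGS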

[Question] Why intermittently cannot find SpoolDir?

After cloning the project, docker-compose up initially works fine:

with data/unprocessed/sample.csv containing:

Isabella,42567,Girl
Sophia,42261,Girl
Jacob,42164,Boy
Emma,35951,Girl
Ethan,34523,Boy
Mason,34195,Boy
William,34130,Boy
Olivia,34128,Girl
Jayden,33962,Boy
Ava,30765,Girl

docker-compose ps

     Name                    Command                       State                        Ports              
-----------------------------------------------------------------------------------------------------------
kafka             /etc/confluent/docker/run        Up                      0.0.0.0:9092->9092/tcp          
kafka-connect     bash -c echo "Installing c ...   Up (health: starting)   0.0.0.0:8083->8083/tcp, 9092/tcp
kafkacat          /bin/sh -c apk add jq;           Up                                                      
                  wh ...                                                                                   
ksqldb            /usr/bin/docker/run              Up                      0.0.0.0:8088->8088/tcp          
postgres          docker-entrypoint.sh postgres    Up                      5432/tcp                        
schema-registry   /etc/confluent/docker/run        Up                      0.0.0.0:8081->8081/tcp          
zookeeper         /etc/confluent/docker/run        Up                      2181/tcp, 2888/tcp, 3888/tcp 
curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'SpoolDir'

"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector"

Create the connector as per doc:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }'

Then sometime later:

docker-compose ps


     Name                    Command                       State                        Ports              
-----------------------------------------------------------------------------------------------------------
kafka             /etc/confluent/docker/run        Exit 1                                                  
kafka-connect     bash -c echo "Installing c ...   Up (health: starting)   0.0.0.0:8083->8083/tcp, 9092/tcp
kafkacat          /bin/sh -c apk add jq;           Up                                                      
                  wh ...                                                                                   
ksqldb            /usr/bin/docker/run              Exit 255                                                
postgres          docker-entrypoint.sh postgres    Up                      5432/tcp                        
schema-registry   /etc/confluent/docker/run        Exit 1                                                  
zookeeper         /etc/confluent/docker/run        Up                      2181/tcp, 2888/tcp, 3888/tcp

Some logs:

Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1596284656874) timed out at 1596284656875 after 1 attempt(s)
schema-registry    | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
schema-registry    | [main] INFO io.confluent.admin.utils.ClusterStatus - Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ...
schema-registry    | [main] ERROR io.confluent.admin.utils.ClusterStatus - Expected 1 brokers but found only 0. Brokers found [].
schema-registry exited with code 1
ksqldb             | [2020-08-01 12:24:38,879] ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:60)
ksqldb             | io.confluent.ksql.util.KsqlServerException: Could not get Kafka cluster configuration!
ksqldb             | 	at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:90)
ksqldb             | 	at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.isKafkaAuthorizerEnabled(KsqlAuthorizationValidatorFactory.java:81)
ksqldb             | 	at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.create(KsqlAuthorizationValidatorFactory.java:51)
ksqldb             | 	at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:624)
ksqldb             | 	at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:544)
ksqldb             | 	at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:98)
ksqldb             | 	at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:56)
ksqldb             | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1596284678874) timed out at 1596284678875 after 1 attempt(s)
ksqldb             | 	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
ksqldb             | 	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
ksqldb             | 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
ksqldb             | 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
ksqldb             | 	at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:60)
ksqldb             | 	... 6 more
ksqldb             | Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1596284678874) timed out at 1596284678875 after 1 attempt(s)
ksqldb             | Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
ksqldb exited with code 255

and

curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'SpoolDir'

does not return anything. I need to restart docker-compose up again before

curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'SpoolDir'

returns the expected:

"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector"

EDIT:

Seems to work now; before, there was no schema. Now every time a new CSV with a schema is placed into the unprocessed directory it is ingested. But I would like to know how NOT to re-ingest an already-processed CSV file.

Say I dragged sample.csv from the processed directory back to unprocessed: should it reprocess again or not? Do I have an option not to reprocess a CSV file that is already in the processed directory?

EDIT:

But when testing with another CSV file of the same name as one already in the processed directory, issuing another

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }'

curl: (52) Empty reply from server

Can these questions be more easily answered from ksqlDB?

How many different boys' names are there that begin with the letter Z? (Count the names, not the people.)
What is the most common girl's name that begins with the letter Q?

docker --version
Docker version 19.03.12, build 48a66213fe

Mac OSX Catalina 10.15.6
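(A guess at the root cause rather than an answer to the ksqlDB question: the docker-compose ps output above - kafka Exit 1, ksqldb Exit 255, schema-registry Exit 1 - together with the curl: (52) responses looks like containers being killed for lack of resources rather than a SpoolDir problem. Two quick checks, as a sketch:)

# Was the broker OOM-killed?
docker inspect --format 'OOMKilled: {{.State.OOMKilled}}, ExitCode: {{.State.ExitCode}}' kafka

# What did it log just before exiting?
docker-compose logs --tail=50 kafka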

Cube: overview diagram should show mobile devices accessing the REST/VPC environment

https://github.com/confluentinc/demo-scene/tree/master/ccloud-cube-demo

The image at https://github.com/confluentinc/demo-scene/raw/master/ccloud-cube-demo/images/rest-api-call.png should show how the mobile devices (i.e. what the users do) interact with the REST proxy. I read the intro paragraphs and kept wondering where in the diagram the users are represented.

Also, the intro paragraph talks about "the application" and "the moves", but lacks even a 1-liner description of what "the Cube" (the app, the moves) actually is:

This code contains a sample application called "The Cube" that allow users to interact with Kafka clusters running on Confluent Cloud by using their mobile devices. While users play with the application from their mobile devices, events containing their moves are emitted to a topic.

I found this info much later:

The application presents the user with a cube that moves as the user moves its hands and body. It relies on the device orientation API found in most modern devices.

And typo here:

Anyone capable of perfectly positioning the cube using the coordinates the represents the number three will be shown in the output of a KSQL query.

Telegram Kafka Connect

I've tried your demo and it works really great.
I want to explore the real-time notification part further, which you've still got marked as TODO.
I see there's a "delay_alert" sink under "egress", so I've tried to figure out how to build the topics so that a connector can be created.

I've combined everything I need - the location, the delay, the train, and so on - into a single message on a single topic, TRAIN_DELAY_ALERT_TG.
The problem is when I create the sink using "01_create_sinks.sh" for the "delay_alert".
Here is the error; I don't know what is wrong with the configuration of the sink that you've provided:
(screenshot)

I've put the key for the bot in credential.properties and entered the chat ID.
All is well if I'm using curl to send the message.
Here are the topic and the value:
(screenshot)

Here is "01_create_sinks.sh":
(screenshot)

Livestreams September 1

Hello,

Following the tutorial here: https://github.com/confluentinc/demo-scene/tree/master/livestreams/september-1 and the corresponding video https://www.youtube.com/watch?v=rblw9AMyqEU, the cluster is created successfully, but after importing accounts.sql into the DB, connect-cloud reports:

WorkerSourceTask{id=account-jdbc-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)

The topics are successfully created in the Kafka cluster, but no data is available.

Am I missing something from the tutorial?

Thanks
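(A hedged pointer rather than an answer: "flushing 0 outstanding messages" just means the source task's poll returned no rows, so the JDBC connector's mode/query settings are the first place to look. The connector name below is taken from the log line above; this assumes the worker's REST API is published on localhost:8083.)

# Inspect the running config and state of the JDBC source
curl -s http://localhost:8083/connectors/account-jdbc-source/config | jq
curl -s http://localhost:8083/connectors/account-jdbc-source/status | jq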

ksql-workshop

elasticsearch /usr/local/bin/docker-entr ... Exit 78
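(Context, as a guess: exit code 78 from the Elasticsearch image usually means a bootstrap check failed, most commonly vm.max_map_count being too low on the Docker host. A possible fix on Linux hosts:)

# Raise the mmap limit Elasticsearch's bootstrap check requires, then restart it
sudo sysctl -w vm.max_map_count=262144
docker-compose up -d elasticsearch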

Not able to retrieve proper JSON

I have a large mongodump JSON file which I need to put into Kafka using Kafka Connect.

Using the configuration in source-data-users-json-noschema.json, I am not able to obtain proper JSON. The keys are null (which is fine), but the values get treated as a string: enclosed in double quotes, with the quotes inside escaped.

I was able to get proper-looking JSON by treating the value as org.apache.kafka.connect.storage.StringConverter.

Either way, I am only able to ingest 15k-30k records, which is not even 10% of the total data. It logs the following before it stops ingesting:

[2019-03-15 08:48:13,812] ERROR WorkerSourceTask{id=orderdata-test18-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.NegativeArraySizeException
        at org.apache.kafka.connect.file.FileStreamSourceTask.poll(FileStreamSourceTask.java:141)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-03-15 08:48:13,814] ERROR WorkerSourceTask{id=orderdata-test18-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
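(A guess, not a confirmed diagnosis: FileStreamSourceTask buffers each line in memory, so a single enormous line - e.g. an entire mongodump array on one line - can overflow the buffer and produce exactly this NegativeArraySizeException. If the dump is one big JSON array, a possible pre-processing step is to split it into one compact document per line first; the filenames here are hypothetical:)

# Split one big JSON array into one compact document per line before ingesting
jq -c '.[]' users_dump.json > users_per_line.json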

Configuration help

I am trying to get the demo running locally but am having trouble with the configurations... stuff like "TELEGRAM_API_TOKEN". I have my token but am not sure where to put it. I see some config files in the .gitignore, e.g. go/telegram_config.go. I imagine that's where it goes, but I'm not sure of the format etc. Could you point me to some docs or examples for that? Thanks.

Issue with syslog connector using Control Center

I am having issues getting the syslog connector to work when deployed from the Control Center UI using the Docker version of Confluent Platform. I am able to build the docker-compose.yml and Dockerfile to load the connector and begin modifying the configs via Control Center. However, once I launch the connector, its state always says UNASSIGNED. I was able to go through the quick start with the datagen connector easily. I am running Docker locally on my Mac, with 8GB of memory allocated. I have attached the yml file as well for situational awareness. Any help would be much appreciated.

Control Center Config for Connector

{
  "name": "syslog-tcp",
  "connector.class": "io.confluent.connect.syslog.SyslogSourceConnector",
  "tasks.max": "1",
  "syslog.listener": "TCP",
  "syslog.port": "5454",
  "topic": "syslog-tcp",
  "confluent.topic.bootstrap.servers": [
    "localhost:9092"
  ],
  "confluent.topic.replication.factor": "1"
}

docker-compose.yml

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.2.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.2.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.2.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/kafka-connect-syslog:latest
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
      - "5454:5454"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

  control-center:
    image: confluentinc/cp-enterprise-control-center:5.2.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema-registry
      - connect
      - ksql-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
      CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksql-server:
    image: confluentinc/cp-ksql-server:5.2.2
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"

  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.2.2
    container_name: ksql-cli
    depends_on:
      - broker
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    # Downrev ksql-examples to 5.1.2 due to DEVX-798 (work around issues in 5.2.0)
    image: confluentinc/ksql-examples:5.2.2
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksql-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.2.2
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
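(Worth checking during triage: the connector config above points confluent.topic.bootstrap.servers at localhost:9092, but from inside the connect container the broker is broker:29092 - see CONNECT_BOOTSTRAP_SERVERS. Once the connector reports RUNNING, a quick end-to-end test, as a sketch; the syslog payload is just an example:)

# Check connector state via the Connect REST API
curl -s http://localhost:8083/connectors/syslog-tcp/status | jq

# Send a test syslog message to the TCP listener published on port 5454
echo '<14>Oct  1 12:00:00 myhost myapp: hello syslog' | nc localhost 5454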

[Question] Docker-compose and relatively long time before valid Connector?

docker-compose -f dc.kafka.yml up (depends_on not shown)

zookeeper:
  image: confluentinc/cp-zookeeper:5.5.0

kafka:
  image: confluentinc/cp-server:5.5.0

rest-proxy:
  image: confluentinc/cp-kafka-rest:5.5.0

schema-registry:
  image: confluentinc/cp-schema-registry:5.5.0

kafka-connect-01:
  image: confluentinc/cp-kafka-connect:5.5.0

ksqldb-server:
  image: confluentinc/cp-ksqldb-server:5.5.0

But

$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'SpoolDir'

does not return anything until 3-4 minutes later. Is this normal operation? Eventually it returns:

"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector"
"com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector"

If this is to be expected, then do we need to async-wait for ~3 minutes?

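(If the delay is expected - Kafka Connect scans its whole plugin path at startup, which can take minutes - the repo's own compose files handle it with a polling loop. A standalone sketch along the same lines:)

# Block until the Kafka Connect REST API returns HTTP 200, then list the plugins
echo "Waiting for Kafka Connect to start listening on localhost ⏳"
while [ "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" -ne 200 ]; do
  echo "$(date) Kafka Connect listener HTTP state: not ready (waiting for 200)"
  sleep 5
done
curl -s localhost:8083/connector-plugins | jq '.[].class' | egrep 'SpoolDir'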

{"error_code":500,"message":null} on creating a new connector

Hi!
I'm using kafka-connect-zero-to-hero/docker-compose.yml. I changed it by adding confluentinc/kafka-connect-ftps:1.0.2-preview to it, plus an FTP server:

  vsftpd:
    image: "fauria/vsftpd:latest"
    container_name: vsftpd
    ports:
      - 21:21
      - 20:20
    environment:
      FTP_USER: "name"
      FTP_PASS: "pass"
    volumes:
      - ${PWD}/data:/data

When I send

   curl -i -X PUT -H  "Content-Type:application/json" \
   http://localhost:8083/connectors/source-ftps/config \
   -d '{
           "connector.class": "io.confluent.connect.ftps.FtpsSourceConnector",
           "kafka.topic": "default.demo" ,
           "confluent.topic.bootstrap.servers": "kafka:29092",
           "ftps.host": "vsftpd",
           "ftps.username": "name",
           "ftps.password": "pass",
           "ftps.input.path": "/data"
   }'

to start the connector, I get the error {"error_code":500,"message":null}.

Kafka-connect logs:

kafka-connect      | [2020-09-30 10:33:08,446] ERROR Uncaught exception in REST call to /connectors/FTPSConnector/config (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
kafka-connect      | java.lang.NullPointerException
kafka-connect      |    at java.io.File.<init>(File.java:277)
kafka-connect      |    at io.confluent.connect.ftps.connection.FtpsConnection.createSslContext(FtpsConnection.java:132)
kafka-connect      |    at io.confluent.connect.ftps.connection.FtpsConnection.<init>(FtpsConnection.java:45)
kafka-connect      |    at io.confluent.connect.ftps.source.validation.FtpsSourceConfigValidation.performValidation(FtpsSourceConfigValidation.java:52)
kafka-connect      |    at io.confluent.connect.utils.validators.all.ConfigValidation.validate(ConfigValidation.java:185)
kafka-connect      |    at io.confluent.connect.ftps.FtpsSourceConnector.validate(FtpsSourceConnector.java:80)
kafka-connect      |    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:313)
kafka-connect      |    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:745)
kafka-connect      |    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$6.call(DistributedHerder.java:742)
kafka-connect      |    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:342)
kafka-connect      |    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:282)
kafka-connect      |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect      |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect      |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
kafka-connect      |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
kafka-connect      |    at java.lang.Thread.run(Thread.java:748)

Do you have any idea what can be wrong?
Thank you in advance!
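(A guess from the stack trace: createSslContext appears to be calling new File(null), which suggests a required SSL/keystore setting is missing from the config. One way to see which fields the connector considers required, without creating anything, is the Connect REST API's config validation endpoint - a sketch, reusing the config from above:)

curl -s -X PUT -H "Content-Type:application/json" \
   http://localhost:8083/connector-plugins/FtpsSourceConnector/config/validate \
   -d '{
           "connector.class": "io.confluent.connect.ftps.FtpsSourceConnector",
           "kafka.topic": "default.demo",
           "confluent.topic.bootstrap.servers": "kafka:29092",
           "ftps.host": "vsftpd",
           "ftps.username": "name",
           "ftps.password": "pass",
           "ftps.input.path": "/data"
   }' | jq '.error_count'

(Then inspect .configs[].value.errors in the full response to see which fields are complained about.)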

atm-fraud-detection docker-compose issue, missing ksql service

Here is some docker command output. There is no service listening on ksql-server, and my shell does not recognize ksql-server as a valid host. I may pursue troubleshooting on my own, as I recognize I am new to Docker. I am using Docker Edge 2.0.0.0-mac82, Engine 18.09.0, on Mac OSX (Mojave). I installed Docker Edge following recommendations that Docker on Yosemite had severe slowness issues, which I also experienced.

MacBook-Pro-3:ksql-atm-fraud-detection philippederome$ docker-compose images
                  Container                                      Repository                      Tag       Image Id      Size 
------------------------------------------------------------------------------------------------------------------------------
ksql-atm-fraud-detection_connect-debezium_1     debezium/connect                                0.8      d40481fd6992   627 MB
ksql-atm-fraud-detection_elasticsearch_1        docker.elastic.co/elasticsearch/elasticsearch   6.5.1    32f93c89076d   738 MB
ksql-atm-fraud-detection_gess_1                 rmoff/gess                                      latest   5b9faf7af511   866 MB
ksql-atm-fraud-detection_kafka-connect_1        confluentinc/cp-kafka-connect                   5.0.1    989ec2fa6359   965 MB
ksql-atm-fraud-detection_kafka_1                confluentinc/cp-enterprise-kafka                5.0.1    e17fe44e4a50   572 MB
ksql-atm-fraud-detection_kibana_1               docker.elastic.co/kibana/kibana                 6.5.1    afcfde4729b0   696 MB
ksql-atm-fraud-detection_ksql-cli_1             confluentinc/cp-ksql-cli                        5.0.1    65ac9aec5358   529 MB
ksql-atm-fraud-detection_ksql-server_1          confluentinc/cp-ksql-server                     5.0.1    da4198ce5a8d   533 MB
ksql-atm-fraud-detection_mysql_1                debezium/example-mysql                          0.8      d4b49c9e4434   355 MB
ksql-atm-fraud-detection_schema-registry_1      confluentinc/cp-schema-registry                 5.0.1    57b952ec17bc   640 MB
ksql-atm-fraud-detection_send-gess-to-kafka_1   confluentinc/cp-enterprise-kafka                5.0.1    e17fe44e4a50   572 MB
ksql-atm-fraud-detection_zookeeper_1            confluentinc/cp-zookeeper                       5.0.1    267c6bd493c9   532 MB
MacBook-Pro-3:ksql-atm-fraud-detection philippederome$ docker-compose ps
                    Name                                   Command               State                          Ports                        
---------------------------------------------------------------------------------------------------------------------------------------------
ksql-atm-fraud-detection_connect-debezium_1     /docker-entrypoint.sh bash ...   Up      0.0.0.0:8083->8083/tcp, 8778/tcp, 9092/tcp, 9779/tcp
ksql-atm-fraud-detection_elasticsearch_1        /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 9300/tcp                    
ksql-atm-fraud-detection_gess_1                 python /scripts/gess-main.py     Up                                                          
ksql-atm-fraud-detection_kafka-connect_1        bash -c /etc/confluent/doc ...   Up      0.0.0.0:18083->18083/tcp, 8083/tcp, 9092/tcp        
ksql-atm-fraud-detection_kafka_1                /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp                              
ksql-atm-fraud-detection_kibana_1               bash -c /usr/local/bin/kib ...   Up      0.0.0.0:5601->5601/tcp                              
ksql-atm-fraud-detection_ksql-cli_1             /bin/sh                          Up                                                          
ksql-atm-fraud-detection_ksql-server_1          /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp                              
ksql-atm-fraud-detection_mysql_1                docker-entrypoint.sh mysqld      Up      3306/tcp, 33060/tcp                                 
ksql-atm-fraud-detection_schema-registry_1      /etc/confluent/docker/run        Up      8081/tcp                                            
ksql-atm-fraud-detection_send-gess-to-kafka_1   bash -c nc -v -u -l -p 690 ...   Up      9092/tcp                                            
ksql-atm-fraud-detection_zookeeper_1            /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 3888/tcp                        
MacBook-Pro-3:ksql-atm-fraud-detection philippederome$ 
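(Possibly the whole story: ksql-server is a hostname only inside the Compose network. From the host shell, use the published port - 0.0.0.0:8088->8088/tcp above; from a container, the service name resolves. A sketch of both checks:)

# From the host: the KSQL server is published on localhost:8088
curl -s http://localhost:8088/info

# From inside the compose network: run the CLI container against the service name
docker-compose exec ksql-cli ksql http://ksql-server:8088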

ksql query timeout error in python

I have an issue running a ksql query from Python; I always get a timeout error:

timeout Traceback (most recent call last)
/usr/lib/python3/dist-packages/urllib3/response.py in _error_catcher(self)
301 try:
--> 302 yield
303

/usr/lib/python3/dist-packages/urllib3/response.py in read_chunked(self, amt, decode_content)
600 break
--> 601 chunk = self._handle_chunk(amt)
602 decoded = self._decode(chunk, decode_content=decode_content,

/usr/lib/python3/dist-packages/urllib3/response.py in _handle_chunk(self, amt)
566 returned_chunk = self._fp._safe_read(self.chunk_left)
--> 567 self._fp._safe_read(2) # Toss the CRLF at the end of the chunk.
568 self.chunk_left = None

/usr/lib/python3.6/http/client.py in _safe_read(self, amt)
611 while amt > 0:
--> 612 chunk = self.fp.read(min(amt, MAXAMOUNT))
613 if not chunk:

/usr/lib/python3.6/socket.py in readinto(self, b)
585 try:
--> 586 return self._sock.recv_into(b)
587 except timeout:

timeout: timed out

During handling of the above exception, another exception occurred:

ReadTimeoutError Traceback (most recent call last)
/usr/lib/python3/dist-packages/requests/models.py in generate()
744 try:
--> 745 for chunk in self.raw.stream(chunk_size, decode_content=True):
746 yield chunk

/usr/lib/python3/dist-packages/urllib3/response.py in stream(self, amt, decode_content)
431 if self.chunked and self.supports_chunked_reads():
--> 432 for line in self.read_chunked(amt, decode_content=decode_content):
433 yield line

/usr/lib/python3/dist-packages/urllib3/response.py in read_chunked(self, amt, decode_content)
625 if self._original_response:
--> 626 self._original_response.close()

/usr/lib/python3.6/contextlib.py in exit(self, type, value, traceback)
98 try:
---> 99 self.gen.throw(type, value, traceback)
100 except StopIteration as exc:

/usr/lib/python3/dist-packages/urllib3/response.py in _error_catcher(self)
306 # there is yet no clean way to get at it from this context.
--> 307 raise ReadTimeoutError(self._pool, None, 'Read timed out.')
308

ReadTimeoutError: HTTPConnectionPool(host='localhost', port=8088): Read timed out.

During handling of the above exception, another exception occurred:

ConnectionError Traceback (most recent call last)
in
1 query = client.query('select * from user_tweet_count')
----> 2 for item in query: print(item)

/usr/local/lib/python3.6/dist-packages/ksql/api.py in query(self, query_string, encoding, chunk_size, stream_properties, idle_timeout)
69 streaming_response = self._request(endpoint='query', sql_string=query_string, stream_properties=stream_properties)
70 start_idle = None
---> 71 for chunk in streaming_response.iter_content(chunk_size=chunk_size):
72 if chunk != b'\n':
73 start_idle = None

/usr/lib/python3/dist-packages/requests/models.py in generate()
750 raise ContentDecodingError(e)
751 except ReadTimeoutError as e:
--> 752 raise ConnectionError(e)
753 else:
754 # Standard file-like object.

ConnectionError: HTTPConnectionPool(host='localhost', port=8088): Read timed out.
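
For context, a push query against the KSQL 5.x REST API streams chunked results for as long as the query runs, so any client-side read timeout shorter than the gap between emitted rows will fire exactly like this. A minimal reproduction sketch with curl, assuming the server from this stack on localhost:8088 (curl applies no read timeout by default, so it simply sits and streams):

# POST a push query to the /query endpoint; rows arrive as chunks when data flows
curl -s -X POST http://localhost:8088/query \
     -H "Content-Type: application/vnd.ksql.v1+json" \
     -d '{"ksql": "SELECT * FROM user_tweet_count;", "streamsProperties": {}}'

On the Python side, the traceback above shows that the ksql client's query() accepts an idle_timeout parameter; raising that (or the read timeout passed to the underlying requests call) is the usual workaround.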

./scripts/ccloud/ccloud_stack_create.sh - Fails with default config

Following the instructions, the default make script is failing.
(This is using GCP, as per the YouTube video and readme.)

Running macOS (Big Sur), zsh, and a fully updated brew stack.

./scripts/ccloud/ccloud_stack_create.sh
This demo uses real Confluent Cloud ☁️  resources.
 💰 To avoid unexpected charges, carefully evaluate the cost of resources before launching the script and ensure all resources are destroyed after you are done running it.
Do you still want to run this script? [y/n] y
Do you also want to create a Confluent Cloud 🚀  ksqlDB app (hourly charges may apply)? [y/n]
y

☁️  Creating Confluent Cloud stack for service account demo-app-9402, ID: 165173.
Error: accepts 1 arg(s), received 41
Usage:
  ccloud kafka cluster describe <id> [flags]

Flags:
  -o, --output string        Specify the output format as "human", "json", or "yaml". (default "human")
      --environment string   Environment ID.
      --context string       Context name.

Global Flags:
  -h, --help            Show help for this command.
  -v, --verbose count   Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Error: resource "Error" not found

Suggestions:
    Check that the resource "Error" exists.
    To list Kafka clusters, use `ccloud kafka cluster list`.
    To check schema-registry cluster info, use `ccloud schema-registry cluster describe`.
    To list KSQL clusters, use `ccloud ksql app list`.

Waiting up to 720 seconds for Confluent Cloud cluster to be ready and for credentials to propagate
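
For what it's worth, "accepts 1 arg(s), received 41" usually means an unquoted shell variable expanded into many words before reaching ccloud kafka cluster describe. A hypothetical illustration (the variable names are made up, not taken from the script):

# multi-word command output captured into a variable...
CLUSTER_INFO=$(ccloud kafka cluster list)
# ...then word-split into many separate arguments:
ccloud kafka cluster describe $CLUSTER_INFO     # Error: accepts 1 arg(s), received 41
# whereas the command wants exactly one token, the bare cluster ID:
ccloud kafka cluster describe "$CLUSTER_ID"

The follow-on 'resource "Error" not found' fits the same picture: the literal word "Error" from a previous failing command appears to have been passed on as a resource ID.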

[Question] How do you configure the MQTT client id in a MqttConnector?

I'm evaluating io.confluent.connect.mqtt.MqttSourceConnector and io.confluent.connect.mqtt.MqttSinkConnector.

I notice that the connector misses MQTT messages from Mosquitto when Connect is not running, even though mqtt.clean.session.enabled is set to false.
In the Mosquitto logs I see that the connector uses a different MQTT client id after each restart. If I could fix the client id, the MQTT broker would deliver the messages received while the connector was down once it reconnects. However, I can't find any property in the documentation to set the MQTT client id.

I'm wondering: is this bit of configuration undocumented, or just missing altogether?
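
For reference, this is the shape of config under discussion; a minimal source-connector sketch assuming Connect on localhost:8083 and a broker reachable as mosquitto:1883 (property names other than mqtt.clean.session.enabled are taken from the connector's documented basics, and note the absence of any client-id property, which is exactly the question):

curl -X PUT -H "Content-Type: application/json" \
     http://localhost:8083/connectors/mqtt-source-01/config \
     -d '{
       "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
       "mqtt.server.uri": "tcp://mosquitto:1883",
       "mqtt.topics": "sensors/#",
       "kafka.topic": "mqtt-sensors",
       "mqtt.clean.session.enabled": "false"
     }'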

demo-scene/ksql-workshop master ❯ docker-compose pull error

Pulling zookeeper        ... done
Pulling kafka            ... done
Pulling schema-registry  ... done
Pulling kafka-connect    ... done
Pulling ksql-server      ... done
Pulling ksql-cli         ... done
Pulling kafkacat         ... done
Pulling datagen-ratings  ... done
Pulling control-center   ... done
Pulling mysql            ... done
Pulling connect-debezium ... done
Pulling elasticsearch    ... done
Pulling kibana           ... done
Pulling websockets       ... error
Pulling nginx            ... done

ERROR: for websockets  b"pull access denied for ksql-workshop-websockets, repository does not exist or may require 'docker login': denied: requested access to the resource is denied"
ERROR: pull access denied for ksql-workshop-websockets, repository does not exist or may require 'docker login': denied: requested access to the resource is denied
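
A likely explanation: the websockets service is built from a Dockerfile in this repo rather than published to Docker Hub, so docker-compose pull has nothing to fetch for it. Assuming that is the case here:

docker-compose build websockets   # build the local image the pull step can't find
docker-compose up -d              # then bring the stack up as usual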

MS SQL Server driver is not registering in Kafka Connect Runtime Log

I am trying to connect to MS SQL Server with io.confluent.connect.jdbc.JdbcSourceConnector, so I put mssql-jdbc-7.4.1.jre8.jar in the following path in the Kafka Connect runtime:
/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib

and CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'

The problem is that the MS SQL Server driver is not loading and registering in the Kafka Connect runtime, while other drivers in the same folder, such as Oracle and MySQL, register fine.

The Kafka Connect runtime log is below:

kafka-connect | [2020-05-08 13:00:32,528] INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect | [2020-05-08 13:00:32,528] INFO Added plugin 'io.confluent.connect.jdbc.JdbcSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect | [2020-05-08 13:00:32,530] DEBUG Registered java.sql.Driver: jTDS 1.3.1 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect | [2020-05-08 13:00:32,558] DEBUG Registered java.sql.Driver: oracle.jdbc.OracleDriver@1fde4f40 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect | [2020-05-08 13:00:32,563] DEBUG Registered java.sql.Driver: org.postgresql.Driver@290aeb20 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
kafka-connect | [2020-05-08 13:00:32,566] DEBUG Registered java.sql.Driver: org.sqlite.JDBC@4393593c to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

What should I do to load and register the MS SQL Server driver in the Kafka Connect runtime?
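
One thing worth ruling out: Connect scans the plugin path only at worker startup, so a jar dropped into the lib directory is invisible until the worker restarts. A minimal sketch, assuming a container named kafka-connect as in the compose files in this repo:

docker cp mssql-jdbc-7.4.1.jre8.jar \
  kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
docker restart kafka-connect
# the driver should then show up alongside the others:
docker logs kafka-connect | grep "Registered java.sql.Driver"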

request.timeout.ms override

I see a few examples where we do things like:

request.timeout.ms=20000

This seems unnecessary since the default is reasonable (30s). It would be nice to remove it to keep things simpler.

Elasticsearch NOT running on docker container, asking for more memory

Hello,
I was working on making this code run on Ubuntu Linux 18.04 LTS on my laptop (32 GB RAM).
All the services in the Docker stack are up and running except the Elasticsearch container, which fails to start, so Kibana also will not work for visualization.
Attached is the log file for ES not running.

I am trying to run the kafka-connect-zero-to-hero code base, using the regular .yml file to create the environment, not the extended .yml file.

On a happier note, I was able to run this exact codebase on a Mac without any issues; everything worked like a charm. In principle this should work in a Linux environment as well.

Any help is appreciated in this regard.
Thanks
AJ
elasticsearch 2020-04-08_02-06-09.txt
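
The most common cause on Linux (and one that never bites on Docker Desktop for Mac, which matches the "worked on Mac" observation) is the kernel's vm.max_map_count sitting below the 262144 minimum that Elasticsearch checks at startup. A quick check and fix:

sysctl vm.max_map_count                                         # often 65530 by default
sudo sysctl -w vm.max_map_count=262144                          # fix for this boot
echo 'vm.max_map_count=262144' | sudo tee -a /etc/sysctl.conf   # persist across reboots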

[Question] Docker not working for this to work?

zsh ~ open -a Docker

Expected:
Docker daemon to start

Now:
Unable to find application named 'docker'

zsh ~ docker --version
Docker version 19.03.12, build 48a6621

macOS Catalina 10.15.6

EDIT:

Fixed it now

  1. Dragged Docker.app from Applications into Trash
  2. brew cask reinstall docker
  3. open -a Docker now works

ATM Data Generator not emitting right transactions

Hi Robin,

I'm following your demo and I've hit an issue with gess. You are using UK ATM transaction data; I have downloaded New Zealand ATM data from OpenStreetMap. As per the instructions I generated a CSV file, copied atmnz_export.csv into the data directory, and updated gess.conf accordingly.

I restarted the container and verified that gess is emitting NZ ATM transaction data with this command:

nc -v -u -l 6900

but when I try to push the same transactions into a new topic using the following command:

nc -v -u -l 6900 | \
docker run --interactive --rm --network ksqlatmfrauddetection_default confluentinc/cp-kafkacat \
kafkacat -b kafka:29092 -P -t atm_txns_akl

I get UK ATM transactions in the topic. If I stop and rerun the same command it starts pushing NZ data; it is not consistent.

I don't know how it is picking up UK ATM transaction data. Even though I have deleted all the files from the gess data directory, it is still emitting UK ATM data. Any thoughts?
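
A hedged guess worth trying: if the old data file was baked into the gess image or container layer rather than read from a bind mount, a plain restart can keep serving the stale copy. Recreating (and, if needed, rebuilding) the container rules that out:

docker-compose rm -sf gess    # stop and remove the old container
docker-compose build gess     # only needed if the data file is copied in at build time
docker-compose up -d gess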

reference setup.sh

In the oracle-ksql-elasticsearch example, the setup.sh file that lives in docker-compose/scripts/ should instead be at docker-compose/setup.sh, because the references inside it are made relative to that directory, which is also the docker-compose context:
docker-compose exec connect-debezium bash -c '/scripts/create-ora-source-debezium-xstream.sh'

Can I open a pull request?

[Question] "schema.generation.enabled": "true" <-- not capturing the order_id as expected?

Added

"schema.generation.key.fields": "order_id",

in the curl config:

{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"topic": "orders_spooldir_01",
"input.path": "/data/unprocessed",
"finished.path": "/data/processed",
"error.path": "/data/error",
"input.file.pattern": ".*\\.csv",
"schema.generation.enabled": "true",
"schema.generation.key.fields": "order_id",
"csv.first.row.as.header": "true"
}

but the generated key:

➜ data git:(master) ✗ docker exec kafkacat kafkacat -b kafka:29092 -t orders_spooldir_01 -C -o-1 -J -s key=s -s value=avro -r http://schema-registry:8081 | jq '.'
{
"topic": "orders_spooldir_01",
"partition": 0,
"offset": 999,
"tstype": "create",
"ts": 1597701710423,
"headers": [
"file.name",
"orders.csv",
"file.path",
"/data/unprocessed/orders.csv",
"file.length",
"83034",
"file.offset",
"1001",
"file.last.modified",
"2020-08-17T20:40:20.000Z"
],
"key": "Struct{}", <---- Empty?

is still empty?

expecting:

"key": "Struct{order_id=1000}",

EDIT:

It worked, but I needed to restart the Kafka cluster. Why?
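
That restart behaviour fits how the connector works: a config change does not rewrite records that were already produced, and files already moved to finished.path are not re-read. A sketch of forcing a clean run via the standard Connect REST API (the connector name here is hypothetical):

curl -X DELETE http://localhost:8083/connectors/source-csv-spooldir-01
# re-PUT the config with schema.generation.key.fields included, then move the CSV
# back from /data/processed to /data/unprocessed so it is ingested again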

kafka connect jar links don't work

Greetings,
Kafka Connect does not download the jars with curl https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.16.tar.gz as specified in the command section of the docker-compose file.

[screenshot: kafka-prob]
I also logged in to the container and tried it manually, and it did not download the jar (see the pic above).

I also can't find the downloaded jars in the /usr/share/java path that I configured in Kafka Connect.
[screenshot: pwd]

Attaching my docker-compose file; it's almost the same as yours.

docker-compose.zip
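
As a workaround, the same driver is published on Maven Central, which is a more stable download target than the cdn.mysql.com tarball link; a sketch, fetching the jar straight into the path configured above:

curl -fsSL -o /usr/share/java/mysql-connector-java-8.0.16.jar \
  https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar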

[Question] Topic not created after SpoolDir creation?

➜ data git:(master) ✗ curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-01/config \
-d '{
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"topic": "orders_spooldir_01",
"input.path": "/data/unprocessed",
"finished.path": "/data/processed",
"error.path": "/data/error",
"input.file.pattern": ".*\\.csv",
"schema.generation.enabled": "true",
"csv.first.row.as.header": "true"
}'

HTTP/1.1 201 Created
Date: Thu, 03 Sep 2020 18:30:41 GMT
Location: http://localhost:8083/connectors/source-csv-spooldir-01
Content-Type: application/json
Content-Length: 421
Server: Jetty(9.4.24.v20191120)

{"name":"source-csv-spooldir-01","config":{"connector.class":"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector","topic":"orders_spooldir_01","input.path":"/data/unprocessed","finished.path":"/data/processed","error.path":"/data/error","input.file.pattern":".*\.csv","schema.generation.enabled":"true","csv.first.row.as.header":"true","name":"source-csv-spooldir-01"},"tasks":[],"type":"source"}%

Looks OK so far, but why is the topic not created?

➜ data git:(master) ✗ docker exec kafkacat kafkacat -b kafka:29092 -L -J|jq '.topics[].topic'|sort
"__confluent.support.metrics"
"__consumer_offsets"
"__transaction_state"
"_confluent-ksql-confluent_rmoff_01_command_topic"
"_schemas"
"confluent_rmoff_01ksql_processing_log"
"docker-connect-configs"
"docker-connect-offsets"
"docker-connect-status"

where is orders_spooldir_01?
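
The first thing to check: the 201 response shows "tasks":[] because tasks spawn asynchronously, and if the task subsequently fails (or finds no files matching input.file.pattern) no records flow, so no topic gets auto-created. The standard Connect status endpoint tells the story:

curl -s http://localhost:8083/connectors/source-csv-spooldir-01/status | jq '.'
# a FAILED task includes a "trace" field with the underlying exception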

Trying to use Debezium connector MySQL giving deserialization error

The Kafka topic gets created and starts streaming, and the task fails soon after with the error below:

{"id":0,"state":"FAILED","worker_id":"kafka-connect:8083","trace":"org.apache.kafka.connect.errors.ConnectException: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1570956435000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=80, nextPosition=708104, flags=0}...

The connector is created as below:

curl -i -X POST -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d '{
      "name": "debezium-source-source1",
      "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "username",
            "database.password": "*****",
            "database.server.id": "1",
            "database.server.name": "db",
            "table.whitelist": "schema.table",
            "database.history.kafka.bootstrap.servers": "kafka:29092",
            "database.history.kafka.topic": "dbhistory.table",
            "include.schema.changes": "true"
       }
    }'

The MySQL version is 8.0.17.
I created the history topic beforehand, as at first it was giving an error that the history topic was missing.

The connector was created successfully and started streaming data, and soon after the task failed with the above error. I checked the topic and could see the initial data it had pulled.
I've also ensured the MySQL global settings are set as per
https://debezium.io/documentation/faq/#what_is_causing_intermittent_code_eventdatadeserializationexception_code_s_with_the_mysql_connector
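
Beyond the FAQ settings, it may be worth confirming the binlog basics Debezium depends on: binlog_format must be ROW, and binlog_row_image=FULL is the recommended value (MySQL 8 defaults to both, but they are easy to override). A quick check, assuming a container named mysql as elsewhere in this repo (adjust credentials to your stack):

docker exec -i mysql mysql -uroot -p -e \
  "SHOW GLOBAL VARIABLES WHERE Variable_name IN ('binlog_format','binlog_row_image');"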

Tiered-Storage readme.md consume command error

(Reference: the tiered-storage/readme.md file.) It looks like the command given for the consumer has a different topic and port than expected; it should be the below. Let me know if you would like me to submit a pull request for this.

kafka-consumer-perf-test --topic test-topic \
  --messages 5000 \
  --threads 1 \
  --broker-list localhost:9091 \
  --timeout 60000 \
  --consumer.config config/consumer.config
