
stream-reactor's Introduction


Lenses Connectors for Apache Kafka

Lenses.io has been the leader in offering Apache 2.0 licensed Kafka Connectors (Stream Reactor) since 2016.

Enterprise Support for Kafka Connectors

Lenses offers the leading Developer Experience solution for engineers building real-time applications on any Apache Kafka (lenses.io). Subscribed customers are entitled to full 24x7 support for selected Kafka Connectors. This includes prioritization of feature requests and security incident SLAs. Email [email protected] for more information.

Engage with the Community

Speak to us on our Community Slack channel (Register at https://launchpass.com/lensesio) or ask the Community a question in our Ask Marios forum.

Kafka Connectors Roadmap

A series of next-generation Connectors are in active development. To give us your feedback on which connectors we should be working on, or to get the latest information, send us an email at [email protected]

Stream Reactor Kafka Connectors


A collection of components to build a real-time ingestion pipeline.

Kafka Compatibility

  • Kafka 2.8 -> 3.5 (Confluent 6.2 -> 7.5) - Stream Reactor 4.1.0+
  • Kafka 3.1 (Confluent 7.1) - Stream Reactor 4.0.0 (Kafka 3.1 Build)
  • Kafka 2.8 (Confluent 6.2) - Stream Reactor 4.0.0 (Kafka 2.8 Build)
  • Kafka 2.5 (Confluent 5.5) - Stream Reactor 2.0.0+
  • Kafka 2.0 -> 2.4 (Confluent 5.4) - Stream Reactor 1.2.7

DEPRECATION NOTICE

In the next major release, Elasticsearch 6 support will be removed, to be replaced with OpenSearch and Elasticsearch 8 support.

The following connectors have been deprecated and will not be included in future releases:

  • Elasticsearch 6
  • Kudu
  • Hazelcast
  • HBase
  • Hive
  • Pulsar

Connectors

Please take a moment to read the documentation and make sure the software prerequisites are met.

Connector Type Description Docs
AWS S3 Sink Copy data from Kafka to AWS S3. Docs
AWS S3 Source Copy data from AWS S3 to Kafka. Docs
Azure Data Lake (Beta) Sink Copy data from Kafka to Azure Data Lake. Docs
AzureDocumentDb Sink Copy data from Kafka to Azure DocumentDb. Docs
Cassandra Source Copy data from Cassandra to Kafka. Docs
Cassandra Sink Certified for DSE Cassandra; copy data from Kafka to Cassandra. Docs
Elastic 6 Sink Copy data from Kafka to Elasticsearch 6.x via TCP or HTTP. Docs
Elastic 7 Sink Copy data from Kafka to Elasticsearch 7.x via TCP or HTTP. Docs
FTP/HTTP Source Copy data from FTP/HTTP to Kafka. Docs
Google Cloud Storage (Beta) Sink Copy data from Kafka to Google Cloud Storage. Docs
Google Cloud Storage (Beta) Source Copy data from Google Cloud Storage to Kafka. Docs
HTTP (Beta) Sink Copy data from Kafka to HTTP. Docs
InfluxDb Sink Copy data from Kafka to InfluxDb. Docs
JMS Source Copy data from JMS topics/queues to Kafka. Docs
JMS Sink Copy data from Kafka to JMS. Docs
MongoDB Sink Copy data from Kafka to MongoDB. Docs
MQTT Source Copy data from MQTT to Kafka. Docs
MQTT Sink Copy data from Kafka to MQTT. Docs
Redis Sink Copy data from Kafka to Redis. Docs
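
All of the connectors above follow the same configuration pattern: a connector class, connection settings, and a KCQL statement mapping Kafka topics onto the target system. Below is a minimal sketch for a Cassandra sink; the property keys, connector class and keyspace/table names are illustrative only, so check the individual connector documentation for the exact settings in your Stream Reactor version.

# hypothetical cassandra-sink.properties (names are illustrative)
name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders-topic
connect.cassandra.contact.points=localhost
connect.cassandra.key.space=demo
connect.cassandra.username=cassandra
connect.cassandra.password=secret
# KCQL: copy every field of orders-topic into the orders table
connect.cassandra.kcql=INSERT INTO orders SELECT * FROM orders-topic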

Release Notes

Please see the Stream Reactor Release Notes at Lenses Documentation.

Building

To build:

sbt clean compile

To test:

sbt test

To create assemblies:

sbt assembly

To build a particular project:

sbt "project cassandra" compile

To test a particular project:

sbt "project cassandra" test

To create a jar of a particular project:

sbt "project cassandra" assembly

Running E2E tests

If not already built, you must first build the connector archives:

sbt "project cassandra" assembly
sbt "project elastic6" assembly 
sbt "project mongodb" assembly
sbt "project redis" assembly

To run the tests:

sbt e2e:test

GitHub Workflows

For a detailed explanation of the GitHub workflow, please see our GitHub Actions Workflow Guide.

Contributing

We'd love to accept your contributions! Please use GitHub pull requests: fork the repo, develop and test your code, semantically commit and submit a pull request. Thanks!

License

Stream Reactor is licensed under the Apache 2.0 license.

stream-reactor's People

Contributors

afausti, andmarios, andrewstevenson, antwnis, boeboe, caiooliveiraeti, carolinaloureiro, codesmell, davidsloan, fartariatomas, gomati-mu, goncaloccastro, jbruggem, johnathana, lauramorab, matthedude, mbarlot, michaelandrepearce, orbiran88, pedrobento988, rdebokx, richardlundsky, rollulus, scala-steward, sdlcwangsong, sksamuel, sotirissl, spirosoik, stheppi, v-gerasimov


stream-reactor's Issues

Ability to configure schema name

Similar to:

confluentinc/kafka-connect-jdbc#90

The Avro schemas generated by the Kafka Connect Cassandra source default to io.confluent.avro.ConnectDefault because their schema name is null.

This causes the Avro plugin to generate only this single class.

A workaround would also be appreciated, as I cannot use serdes at the moment.

Authorization password in Redis connect is not optional

The documentation says that connect.redis.connection.password should be optional, but without this property the connector doesn't work. I always get the error "Missing required configuration "connect.redis.connection.password" which has no default value".

Detected Guava issue #1635.

Hello, I have a problem with kafka-connect-cassandra. With Confluent Platform 3.0.0 it works just fine, but I have a problem with CP 3.0.1. I know there was a pull request #73 that tried to fix it, but it doesn't work for me. Here are the logs:

 [2016-10-20 17:09:30,883] INFO 
    ____        __        __  ___                  __        _
   / __ \____ _/ /_____ _/  |/  /___  __  ______  / /_____ _(_)___  ___  ___  _____
  / / / / __ `/ __/ __ `/ /|_/ / __ \/ / / / __ \/ __/ __ `/ / __ \/ _ \/ _ \/ ___/
 / /_/ / /_/ / /_/ /_/ / /  / / /_/ / /_/ / / / / /_/ /_/ / / / / /  __/  __/ /
/_____/\__,_/\__/\__,_/_/  /_/\____/\__,_/_/ /_/\__/\__,_/_/_/ /_/\___/\___/_/
       ______                                __           _____ _       __
      / ____/___ _______________ _____  ____/ /________ _/ ___/(_)___  / /__
     / /   / __ `/ ___/ ___/ __ `/ __ \/ __  / ___/ __ `/\__ \/ / __ \/ //_/
    / /___/ /_/ (__  |__  ) /_/ / / / / /_/ / /  / /_/ /___/ / / / / / ,<
    \____/\__,_/____/____/\__,_/_/ /_/\__,_/_/   \__,_//____/_/_/ /_/_/|_|

 By Andrew Stevenson. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:64)
[2016-10-20 17:09:30,957] ERROR Task cassandra-drones-path-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.ExceptionInInitializerError
    at com.datamountaineer.streamreactor.connect.cassandra.CassandraConnection$.getCluster(CassandraConnection.scala:43)
    at com.datamountaineer.streamreactor.connect.cassandra.CassandraConnection$.apply(CassandraConnection.scala:33)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraWriter$$anonfun$1.apply(CassandraWriter.scala:33)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraWriter$$anonfun$1.apply(CassandraWriter.scala:33)
    at scala.util.Try$.apply(Try.scala:192)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraWriter$.apply(CassandraWriter.scala:33)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask.start(CassandraSinkTask.scala:78)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use.  This introduces codec resolution issues and potentially other incompatibility issues in the driver.  Please upgrade to Guava 16.01 or later.
    at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
    at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)
    at com.datastax.driver.core.Cluster.(Cluster.java:68)
    ... 16 more
[2016-10-20 17:09:30,961] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

RethinkDB issues

  1. Not sure why 'eee' in \stream-reactor\kafka-connect-rethink\src\main\scala\com\datamountaineeer\ :)
  2. AUTOCREATE does not work with tasks.max > 1 - concurrent table creation is failing on rethinkdb
  3. Primary Key is not created despite correct log entry
    Setting primary as first field found: PKEY (com.datamountaineeer.streamreactor.connect.rethink.sink.ReThinkSinkConverter$:63)
  4. Need a way to ignore 'binary' fields (via settings?), which is a legal type in Avro - otherwise stream-reactor throws an exception and terminates

Align common settings

Use the same format for topic-to-table mappings and field selections across all sources/sinks.

Enhance Redis KCQL for time-series ( Using Redis Sorted Sets)

Redis & Timeseries

Redis is used for timeseries use-cases like:
  • Sell price and volume of a traded stock
  • Data gathered from sensors embedded inside IoT devices

  1. A common technique is to use a Redis Sorted Set, i.e. create a set (e.g. one called USD2GBP) and
     store i) the timestamp in millis as the score and ii) encode both the actual value(s) and the
     timestamp as the value, in a flat or JSON structure

flat-example:

ZADD USD2GBP 1392141527245 1392141527245:0.7983
ZADD USD2GBP 1392141529245 1392141529245:0.7982
ZRANGE USD2GBP 0 -1

json-example:

ZADD EUR2GBP 1392141527298 '{"timestamp":1392141527245,"price":0.8562}'
ZADD EUR2GBP 1392141529299 '{"timestamp":1392141529245,"price":0.8603}'
ZRANGE EUR2GBP 0 -1

http://www.slideshare.net/cacois/cois-palkostrata2013/29
http://www.slideshare.net/cacois/cois-palkostrata2013/33

This encoding allows read queries for historical data like:

zrangebyscore USD2GBP <currentTimeInMillis - 86400000> <currentTimeInMillis>

To enable this capability as KCQL we will re-use STORED AS
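
For illustration, a KCQL statement along these lines could tell the Redis sink to write each record into a Sorted Set, using a timestamp field as the score (the exact keyword is still to be decided; STOREAS is used here purely as a sketch, and the topic and field names are hypothetical):

INSERT INTO USD2GBP SELECT price, timestamp FROM fx-rates STOREAS SortedSet (score=timestamp)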

Failed to bind port 9042 on 127.0.0.1

When I run gradle test in the stream-reactor directory, it shows an error:

2016-12-01 18:42:13,808 ERROR [pool-2-thread-1] [service.StartupChecks] [execute:153] cassandra.jmx.local.port missing from cassandra-env.sh, unable to start local JMX service.
TestCassandraReaderIncr:
Exception (java.lang.IllegalStateException) encountered during startup: Failed to bind port 9042 on 127.0.0.1.
java.lang.IllegalStateException: Failed to bind port 9042 on 127.0.0.1.
at org.apache.cassandra.transport.Server.run(Server.java:189)
at org.apache.cassandra.transport.Server.start(Server.java:122)
at org.apache.cassandra.service.CassandraDaemon.start(CassandraDaemon.java:430)
at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:531)
at org.cassandraunit.utils.EmbeddedCassandraServerHelper$1.run(EmbeddedCassandraServerHelper.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2016-12-01 18:42:22,889 ERROR [pool-2-thread-1] [service.CassandraDaemon] [exitOrFail:638] Exception encountered during startup
java.lang.IllegalStateException: Failed to bind port 9042 on 127.0.0.1.
at org.apache.cassandra.transport.Server.run(Server.java:189)
at org.apache.cassandra.transport.Server.start(Server.java:122)
at org.apache.cassandra.service.CassandraDaemon.start(CassandraDaemon.java:430)
at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:531)
at org.cassandraunit.utils.EmbeddedCassandraServerHelper$1.run(EmbeddedCassandraServerHelper.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
:kafka-connect-cassandra:test FAILED

FAILURE: Build failed with an exception.

Cassandra Sink - Error when consuming Avro messages

Hi,

I am trying to use the cassandra-sink connector to consume Avro messages from a topic. Each message is a GenericRecord. I am getting the "unknown magic byte" error as shown below:

[2016-11-01 10:58:51,944] INFO WorkerSinkTask{id=cassandra-sink-products-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-11-01 10:58:51,959] ERROR Task cassandra-sink-products-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:356)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-11-01 10:58:51,959] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-11-01 10:58:51,959] INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:95)
[2016-11-01 10:58:51,959] INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:166)
[2016-11-01 10:58:54,188] INFO Publish thread interrupted! (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:161)
[2016-11-01 10:58:54,296] INFO Publishing Monitoring Metrics stopped for clientID=consumer-4 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:173)
[2016-11-01 10:58:54,296] INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:658)
[2016-11-01 10:58:54,300] INFO Closed monitoring interceptor for client ID=consumer-4 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:195)

I tried both the official Apache Kafka and the Confluent fork. The error is the same.

"io.confluent" % "kafka-avro-serializer" % "3.0.1",
"org.apache.kafka" % "kafka_2.11" % "0.10.0.1-cp1"
//"org.apache.kafka" % "kafka_2.11" % "0.10.0.1"

I can successfully feed the orders table (producer records are given from the console-producer), as explained in the example (http://docs.datamountaineer.com/en/latest/cassandra-sink.html). However, when I try using records in a topic, it does not work.

Does anyone have a solution for this problem? I investigated the problem on the web and tried many things but could not find a working solution.

Thanks,
Ferhat
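
For reference, the "Unknown magic byte!" error from the AvroConverter generally means the records in the topic are not in the Confluent Avro wire format (magic byte plus schema id), e.g. they were written by a producer that did not use the Schema Registry serializer. The worker's converters have to match how the data was actually produced; a sketch (host and port values are placeholders):

# if records were produced with Confluent's KafkaAvroSerializer, keep the Avro converter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# if the topic actually holds plain JSON or strings, switch the converter to match instead
# value.converter=org.apache.kafka.connect.json.JsonConverter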

[Blockchain] Failed to serialize offset data

I'm getting serialisation errors while running the blockchain source connector in standalone mode. The connector still produces data that I can read successfully with another Connect sink connector. But I'm seeing the following NullPointerExceptions while the offset is being committed:

[2016-12-06 09:19:18,292] INFO Kafka version : 0.10.1.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-12-06 09:19:18,292] INFO Kafka commitId : 3402a74efb23d1d4 (org.apache.kafka.common.utils.AppInfoParser:84)
[2016-12-06 09:19:18,294] INFO Created connector blockchain-source (org.apache.kafka.connect.cli.ConnectStandalone:93)
[2016-12-06 09:19:18,305] INFO
  ____        _        __  __                   _        _
 |  _ \  __ _| |_ __ _|  \/  | ___  _   _ _ __ | |_ __ _(_)_ __   ___  ___ _ __
 | | | |/ _` | __/ _` | |\/| |/ _ \| | | | '_ \| __/ _` | | '_ \ / _ \/ _ \ '__|
 | |_| | (_| | || (_| | |  | | (_) | |_| | | | | || (_| | | | | |  __/  __/ |
 |____/ \__,_|\__\__,_|_|  |_|\___/ \__,_|_| |_|\__\__,_|_|_| |_|\___|\___|_|
  ____  _            _     ____ _           _         ____ by Stefan Bocutiu
 | __ )| | ___   ___| | __/ ___| |__   __ _(_)_ __   / ___|  ___  _   _ _ __ ___ ___
 |  _ \| |/ _ \ / __| |/ / |   | '_ \ / _` | | '_ \  \___ \ / _ \| | | | '__/ __/ _ \
 | |_) | | (_) | (__|   <| |___| | | | (_| | | | | |  ___) | (_) | |_| | | | (_|  __/
 |____/|_|\___/ \___|_|\_\\____|_| |_|\__,_|_|_| |_| |____/ \___/ \__,_|_|  \___\___| (com.datamountaineer.streamreactor.connect.blockchain.source.BlockchainSourceTask:41)
[2016-12-06 09:19:18,320] INFO
Configuration for task
Map(connector.class -> com.datamountaineer.streamreactor.connect.blockchain.source.BlockchainSourceConnector, max.tasks -> 1, task.class -> com.datamountaineer.streamreactor.connect.blockchain.source.BlockchainSourceTask, name -> blockchain-source, connect.blockchain.source.kafka.topic -> blockchain)
       (com.datamountaineer.streamreactor.connect.blockchain.source.BlockchainSourceTask:42)
[2016-12-06 09:19:18,322] INFO AbstractConfig values:
	connect.blockchain.source.kafka.topic = blockchain
	connect.blockchain.source.subscription.addresses = null
	connect.blockchain.source.url = wss://ws.blockchain.info/inv
 (org.apache.kafka.common.config.AbstractConfig:180)
[2016-12-06 09:19:19,770] INFO Data manager started (com.datamountaineer.streamreactor.connect.blockchain.source.BlockchainSourceTask:57)
[2016-12-06 09:19:19,770] INFO Source task WorkerSourceTask{id=blockchain-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2016-12-06 09:19:21,703] INFO Reflections took 4098 ms to scan 71 urls, producing 6430 keys and 66195 values  (org.reflections.Reflections:229)
[2016-12-06 09:20:18,293] ERROR CRITICAL: Failed to serialize offset data, making it impossible to commit offsets under namespace blockchain-source. This likely won't recover unless the unserializable partition or offset information is overwritten. (org.apache.kafka.connect.storage.OffsetStorageWriter:154)
[2016-12-06 09:20:18,293] ERROR Cause of serialization failure: (org.apache.kafka.connect.storage.OffsetStorageWriter:157)
java.lang.NullPointerException
	at org.apache.kafka.connect.storage.OffsetUtils.validateFormat(OffsetUtils.java:51)
	at org.apache.kafka.connect.storage.OffsetStorageWriter.doFlush(OffsetStorageWriter.java:142)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:319)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2016-12-06 09:20:18,295] ERROR Failed to flush org.apache.kafka.connect.runtime.WorkerSourceTask$2@32c907d2 offsets to storage:  (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
java.lang.NullPointerException
	at org.apache.kafka.connect.storage.OffsetUtils.validateFormat(OffsetUtils.java:51)
	at org.apache.kafka.connect.storage.OffsetStorageWriter.doFlush(OffsetStorageWriter.java:142)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:319)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:107)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:44)
	at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:73)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2016-12-06 09:20:18,295] ERROR Failed to commit offsets for WorkerSourceTask{id=blockchain-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)

I'm using kafka-connect-blockchain-0.2.2-3.0.1-all.jar which I've compiled from source at rev 422838a.

Here is the connector config blockchain-source.properties:

name=blockchain-source
connector.class=com.datamountaineer.streamreactor.connect.blockchain.source.BlockchainSourceConnector
max.tasks=1
connect.blockchain.source.kafka.topic = blockchain

And the Kafka Connect worker properties connect-avro-standalone.properties:

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

NoSuchMethodError (kafka-connect-hbase)

Hello, I got this NoSuchMethodError message when I ran the connector in standalone mode with hbase-sink.properties.
I have Kafka version 0.9 installed locally.
As you can see from the error message, the subscribe(java.util.List<java.lang.String> topics, ConsumerRebalanceListener listener) method exists in the 0.9 version.
But the datamountaineer Kafka connector depends on version 0.10.

Should I upgrade my kafka version to 0.10?
If I still want to use the 0.9 version, how can I modify the configuration?

Error message during running connector

[2016-08-16 17:54:23,913] INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-08-16 17:54:23,913] INFO Kafka commitId : b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser:84)
[2016-08-16 17:54:23,915] INFO Created connector person-hbase-test (org.apache.kafka.connect.cli.ConnectStandalone:82)
[2016-08-16 17:54:23,916] ERROR Thread WorkerSinkTask-person-hbase-test-0 exiting with uncaught exception: (org.apache.kafka.connect.util.ShutdownableThread:84)
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at org.apache.kafka.connect.runtime.WorkerSinkTask.joinConsumerGroupAndStart(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:54)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-person-hbase-test-0" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at org.apache.kafka.connect.runtime.WorkerSinkTask.joinConsumerGroupAndStart(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:54)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

Improvement on Cassandra source

The value of the parameter connect.cassandra.authentication.mode can easily be inferred. So remove this configuration parameter and infer it in code, for example:

if ((connect.cassandra.username.length > 0) && (connect.cassandra.password.length > 0)) {
  // use username/password authentication
}

I also suggest renaming connect.cassandra.contact.points -> connect.cassandra.hosts, with the description "The Cassandra hosts to connect to".

Also, you mention connect.kudu.sink.error.policy
at
http://docs.datamountaineer.com/en/latest/cassandra.html#source-connector-configuration-incremental
I guess that is a typo.

Classloader conflict when using elasticsink

Hi,

I am trying out the stream-reactor elastic search sink. Currently I get an error when starting the plugin:

[2016-10-18 15:51:10,490] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:769)
[2016-10-18 15:51:10,490] INFO

    ____        __        __  ___                  __        _
   / __ \____ _/ /_____ _/  |/  /___  __  ______  / /_____ _(_)___  ___  ___  _____
  / / / / __ `/ __/ __ `/ /|_/ / __ \/ / / / __ \/ __/ __ `/ / __ \/ _ \/ _ \/ ___/
 / /_/ / /_/ / /_/ /_/ / /  / / /_/ / /_/ / / / / /_/ /_/ / / / / /  __/  __/ /
/_____/\__,_/\__/\__,_/_/  /_/\____/\__,_/_/ /_/\__/\__,_/_/_/ /_/\___/\___/_/
       ________           __  _      _____ _       __
      / ____/ /___ ______/ /_(_)____/ ___/(_)___  / /__
     / __/ / / __ `/ ___/ __/ / ___/\__ \/ / __ \/ //_/
    / /___/ / /_/ (__  ) /_/ / /__ ___/ / / / / / ,<
   /_____/_/\__,_/____/\__/_/\___//____/_/_/ /_/_/|_|


by Andrew Stevenson
       (com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask:36)
[2016-10-18 15:51:10,490] INFO ElasticSinkConfig values:
        connect.elastic.url = ec2-xx-xx-xx-xx.eu-west-1.compute.amazonaws.com:9300
        connect.elastic.url.prefix = elasticsearch
        connect.elastic.cluster.name = elasticsearch-kafka
        connect.elastic.export.route.query = INSERT INTO text SELECT * FROM text
 (com.datamountaineer.streamreactor.connect.elastic.config.ElasticSinkConfig:178)
[2016-10-18 15:51:10,531] INFO [Inertia] modules [], plugins [], sites [] (org.elasticsearch.plugins:180)
[2016-10-18 15:51:10,539] ERROR Task elasticsearch-sink-connector-text-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
        at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
        at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
        at com.sksamuel.elastic4s.ElasticClient$.transport(ElasticClient.scala:111)
        at com.datamountaineer.streamreactor.connect.elastic.ElasticWriter$.apply(ElasticWriter.scala:43)
        at com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask.start(ElasticSinkTask.scala:56)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I was wondering if you have also seen this problem, or have any clue where the source of the classloader conflict lies?

Best regards,
Bart

ERROR Commit of WorkerSinkTask {...} offsets failed due to exception while flushing: (org.apache.kafka.connect.runtime.WorkerSinkTask:288)

I'm generating messages from Python to Kafka and having issues when running the kudu connector. Specifically, I can see that the output from Python is being written properly by running ./kafka-avro-console-consumer; however, when I run the kudu sink I receive the following error.

[2016-11-14 20:06:09,297] INFO KuduSinkConfig values:
	connect.kudu.export.route.query = INSERT INTO claims_test SELECT * FROM test.python.msg
	connect.kudu.max.retires = 20
	connect.kudu.sink.retry.interval = 60000
	connect.kudu.sink.batch.size = 1
	connect.kudu.sink.schema.registry.url = http://192.168.233.3:8081
	connect.kudu.master = 10.12.225.75:7051
	connect.kudu.sink.error.policy = throw
	connect.kudu.sink.bucket.size = 3
 (com.datamountaineer.streamreactor.connect.kudu.config.KuduSinkConfig:178)
[2016-11-14 20:06:09,297] INFO Connecting to Kudu Master at 10.12.225.75:7051 (com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter$:41)
[2016-11-14 20:06:09,301] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:769)
[2016-11-14 20:06:09,305] INFO Initialising Kudu writer (com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter:49)
[2016-11-14 20:06:09,314] INFO Found schemas for test.python.msg (com.datamountaineer.streamreactor.connect.schemas.SchemaRegistry$:55)
[2016-11-14 20:06:09,315] INFO Sink task WorkerSinkTask{id=test.python.msg-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:208)
[2016-11-14 20:06:09,419] INFO Discovered coordinator 192.168.233.3:9092 (id: 2147483646 rack: null) for group connect-test.python.msg. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:528)
[2016-11-14 20:06:09,421] INFO Revoking previously assigned partitions [] for group connect-test.python.msg (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:292)
[2016-11-14 20:06:09,422] INFO (Re-)joining group connect-test.python.msg (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:349)
[2016-11-14 20:06:09,433] INFO Successfully joined group connect-test.python.msg with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:457)
[2016-11-14 20:06:09,433] INFO Setting newly assigned partitions [test.python.msg-0] for group connect-test.python.msg (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:231)
[2016-11-14 20:06:09,449] ERROR Commit of WorkerSinkTask{id=test.python.msg-0} offsets failed due to exception while flushing: (org.apache.kafka.connect.runtime.WorkerSinkTask:288)
java.lang.NullPointerException
	at scala.collection.convert.Wrappers$JListWrapper.iterator(Wrappers.scala:88)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:258)
	at scala.collection.TraversableLike$class.filter(TraversableLike.scala:270)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.flush(KuduWriter.scala:179)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$flush$2.apply(KuduSinkTask.scala:88)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$flush$2.apply(KuduSinkTask.scala:88)
	at scala.Option.foreach(Option.scala:257)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask.flush(KuduSinkTask.scala:88)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:286)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:432)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2016-11-14 20:06:09,449] ERROR Rewinding offsets to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:289)
[2016-11-14 20:06:09,450] ERROR Commit of WorkerSinkTask{id=test.python.msg-0} offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
java.lang.NullPointerException
	at scala.collection.convert.Wrappers$JListWrapper.iterator(Wrappers.scala:88)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:258)
	at scala.collection.TraversableLike$class.filter(TraversableLike.scala:270)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.flush(KuduWriter.scala:179)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$flush$2.apply(KuduSinkTask.scala:88)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$flush$2.apply(KuduSinkTask.scala:88)
	at scala.Option.foreach(Option.scala:257)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask.flush(KuduSinkTask.scala:88)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:286)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:432)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2016-11-14 20:06:09,451] ERROR Task test.python.msg-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found. io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
	at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:50)
	at io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource.lookUpSchemaUnderSubject(SubjectsResource.java:79)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:143)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
	at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:308)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:297)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:267)
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:291)
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1140)
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:403)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:386)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:334)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:221)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:808)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
	at org.eclipse.jetty.server.Server.handle(Server.java:499)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
	at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
	at java.lang.Thread.run(Thread.java:745)
; error code: 40401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:164)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:181)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:209)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:200)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:194)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:68)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:151)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:149)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:190)
	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2016-11-14 20:06:09,452] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-11-14 20:06:09,454] INFO Stopping Kudu sink. (com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask:82)
[2016-11-14 20:06:09,454] INFO Closing Kudu Session and Client (com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter:162)
[2016-11-14 20:06:09,454] ERROR Task test.python.msg-0 threw an uncaught and unrecoverable exception during shutdown (org.apache.kafka.connect.runtime.WorkerTask:123)
java.lang.NullPointerException
	at scala.collection.convert.Wrappers$JListWrapper.iterator(Wrappers.scala:88)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:258)
	at scala.collection.TraversableLike$class.filter(TraversableLike.scala:270)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.flush(KuduWriter.scala:179)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduWriter.close(KuduWriter.scala:163)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$stop$1.apply(KuduSinkTask.scala:83)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask$$anonfun$stop$1.apply(KuduSinkTask.scala:83)
	at scala.Option.foreach(Option.scala:257)
	at com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkTask.stop(KuduSinkTask.scala:83)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:126)
	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:121)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

kafka-avro-console-consumer output:

root@fae10f5c7182:/confluent-3.0.1/bin# ./kafka-avro-console-consumer --zookeeper 192.168.233.3:2181 --bootstrap-server 192.168.233.3:9092 --property schema.registry.url=http://192.168.233.3:8081 --topic test.python.msg
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/confluent-3.0.1/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/confluent-3.0.1/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/confluent-3.0.1/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"id":433,"random":"foo"}
{"id":196,"random":"foo"}
^CProcessed a total of 2 messages

kudu-sink.properties file:

name=test.python.msg
connector.class=com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector
tasks.max=1
connect.kudu.export.route.query = INSERT INTO claims_test SELECT * FROM test.python.msg
connect.kudu.master=10.12.225.75:7051
topics=test.python.msg
connect.kudu.sink.error.policy=throw
connect.kudu.sink.bucket.size=3
connect.kudu.sink.schema.registry.url=http://192.168.233.3:8081

Can confirm the schema is being set properly. Also, the kudu-sink example works fine when I generate data from the cli. What would be causing an issue with processing this stream?

blockchain connector has an extra e in datamountaineer

The blockchain connector's path is

kafka-connect-blockchain/src/main/scala/com/datamountaineeer/streamreactor/connect/blockchain

Notice the extra e in datamountaineer. :)
It changes the connector's class name and is a bit difficult to catch for the user.

Casting problem

When I try to send JSON messages with this client:
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
data={"id": 9, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 86.5}
producer.send('orders-topic', data)

It throws a class cast exception:

[2016-12-18 15:22:13,693] INFO Received 2 records. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:96)
[2016-12-18 15:22:13,839] ERROR There was an error inserting the records java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:143)
java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at com.datamountaineer.streamreactor.connect.schemas.ConverterUtil$class.convert(ConverterUtil.scala:59)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.convert(CassandraJsonWriter.scala:40)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$toJson(CassandraJsonWriter.scala:154)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(CassandraJsonWriter.scala:127)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125)
at com.datamountaineer.streamreactor.connect.concurrent.ExecutorExtension$RunnableWrapper$$anon$1.run(ExecutorExtension.scala:30)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-12-18 15:22:13,842] ERROR Encountered error java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:46)
[2016-12-18 15:22:13,843] ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:404)
java.lang.RuntimeException: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:67)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:48)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.handleTry(CassandraJsonWriter.scala:40)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.insert(CassandraJsonWriter.scala:144)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.write(CassandraJsonWriter.scala:104)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72)
at scala.Option.foreach(Option.scala:257)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask.put(CassandraSinkTask.scala:72)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at com.datamountaineer.streamreactor.connect.schemas.ConverterUtil$class.convert(ConverterUtil.scala:59)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.convert(CassandraJsonWriter.scala:40)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$toJson(CassandraJsonWriter.scala:154)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(CassandraJsonWriter.scala:127)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125)
at com.datamountaineer.streamreactor.connect.concurrent.ExecutorExtension$RunnableWrapper$$anon$1.run(ExecutorExtension.scala:30)
... 3 more
[2016-12-18 15:22:13,844] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:405)
[2016-12-18 15:22:13,844] INFO WorkerSinkTask{id=cassandra-sink-orders-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2016-12-18 15:22:13,851] ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:406)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
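
For reference, this ClassCastException usually means the sink received a schemaless record: when the worker uses org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=false (or the JSON carries no schema envelope), Connect deserializes the value into a java.util.HashMap rather than the Struct this sink expects. A sketch of the worker setting and the schema-and-payload envelope the JsonConverter expects when schemas are enabled, using the fields from the producer example above (field types are illustrative):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

{"schema": {"type": "struct", "name": "orders", "optional": false,
            "fields": [{"field": "id", "type": "int32", "optional": false},
                       {"field": "created", "type": "string", "optional": true},
                       {"field": "product", "type": "string", "optional": true},
                       {"field": "price", "type": "double", "optional": true}]},
 "payload": {"id": 9, "created": "2016-05-06 13:54:00", "product": "OP-DAX-C-20150201-100", "price": 86.5}}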

ElasticSearch authentication

Add authentication mechanisms. Two ideas:

How do we connect to Elasticsearch using a username/token?

Another mechanism for securing an Elasticsearch server is putting an nginx proxy in front of it.
The proxy blocks connection requests unless a special header with a particular token is set,

i.e. on the HTTP request if

'HEADER_MY_SECRET_ELASTIC_TOKEN' : 'MY_SECRET_PASSWORD'

Then nginx allows proxying the request

configuration for remote HBase cluster

Maybe a stupid question, but how can I configure the HBase connector to use a ZooKeeper/HBase cluster which is not running on localhost?

`[2016-12-16 14:03:42,449] INFO Process identifier=hconnection-0x25a855e1 connecting to ZooKeeper ensemble=localhost:2181 (org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper:120)

[2016-12-16 14:03:59,391] ERROR ZooKeeper exists failed after 4 attempts (org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper:277)`

I couldn't find the setting for this
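
A plain HBase client normally picks up the ZooKeeper ensemble from an hbase-site.xml on its classpath, so, assuming the connector uses the standard HBase client configuration mechanism, a sketch of that file (host names are placeholders) would be:

<!-- hbase-site.xml, placed on the Kafka Connect worker classpath -->
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>zk1.example.com,zk2.example.com,zk3.example.com</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>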

Cannot start kafka-connect-elastic

Hi,

I have a problem when starting the kafka-connect-elastic sink; I get the following stack trace:

[2016-10-24 16:35:19,896] INFO Kafka version : 0.10.0.1-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-10-24 16:35:19,896] INFO Kafka commitId : ea5fcd28195f168b (org.apache.kafka.common.utils.AppInfoParser:84)
[2016-10-24 16:35:19,898] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:769)
[2016-10-24 16:35:19,923] INFO

    ____        __        __  ___                  __        _
   / __ \____ _/ /_____ _/  |/  /___  __  ______  / /_____ _(_)___  ___  ___  _____
  / / / / __ `/ __/ __ `/ /|_/ / __ \/ / / / __ \/ __/ __ `/ / __ \/ _ \/ _ \/ ___/
 / /_/ / /_/ / /_/ /_/ / /  / / /_/ / /_/ / / / / /_/ /_/ / / / / /  __/  __/ /
/_____/\__,_/\__/\__,_/_/  /_/\____/\__,_/_/ /_/\__/\__,_/_/_/ /_/\___/\___/_/
       ________           __  _      _____ _       __
      / ____/ /___ ______/ /_(_)____/ ___/(_)___  / /__
     / __/ / / __ `/ ___/ __/ / ___/\__ \/ / __ \/ //_/
    / /___/ / /_/ (__  ) /_/ / /__ ___/ / / / / / ,<
   /_____/_/\__,_/____/\__/_/\___//____/_/_/ /_/_/|_|


by Andrew Stevenson
       (com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask:36)
[2016-10-24 16:35:19,927] INFO ElasticSinkConfig values:
        connect.elastic.url = localhost:9200
        connect.elastic.url.prefix = elasticsearch
        connect.elastic.cluster.name = elastic-test
        connect.elastic.export.route.query = INSERT INTO kafka_test SELECT * FROM testTopic
 (com.datamountaineer.streamreactor.connect.elastic.config.ElasticSinkConfig:178)
[2016-10-24 16:35:20,211] INFO [Baron Brimstone] loaded [], sites [] (org.elasticsearch.plugins:149)

[2016-10-24 16:35:21,302] ERROR Task elastic-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.VerifyError: (class: org/jboss/netty/channel/socket/nio/NioWorkerPool, method: newWorker signature: (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;) Wrong return type in function
        at org.elasticsearch.transport.netty.NettyTransport.createClientBootstrap(NettyTransport.java:349)
        at org.elasticsearch.transport.netty.NettyTransport.doStart(NettyTransport.java:277)
        at org.elasticsearch.common.component.AbstractLifecycleComponent.start(AbstractLifecycleComponent.java:68)
        at org.elasticsearch.transport.TransportService.doStart(TransportService.java:170)
        at org.elasticsearch.common.component.AbstractLifecycleComponent.start(AbstractLifecycleComponent.java:68)
        at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:159)
        at com.sksamuel.elastic4s.ElasticClient$.transport(ElasticClient.scala:98)
        at com.datamountaineer.streamreactor.connect.elastic.ElasticWriter$.apply(ElasticWriter.scala:43)
        at com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask.start(ElasticSinkTask.scala:56)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
[2016-10-24 16:35:21,304] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-10-24 16:35:21,304] INFO Stopping Elastic sink. (com.datamountaineer.streamreactor.connect.elastic.ElasticSinkTask:71)

I use the kafka-connect-elastic-0.2-3.0.0-all jar from the GitHub release and a standard Confluent Platform 3.0.1-2.11 install, with kafka-connect-cli-0.5.
The elastic connector jar is in share/java.

Here is my classpath:
/opt/confluent-3.0.1/share/java/confluent-common/*
/opt/confluent-3.0.1/share/java/kafka-serde-tools/*
/opt/confluent-3.0.1/share/java/monitoring-interceptors/*
/opt/confluent-3.0.1/share/java/kafka-connect-elastic-datamountainer/*
/opt/confluent-3.0.1/bin/../share/java/kafka/*
/opt/confluent-3.0.1/bin/../share/java/confluent-support-metrics/*
/usr/share/java/confluent-support-metrics/*

Is there something wrong with my environment, or is the package on GitHub invalid?
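
The VerifyError on org.jboss.netty classes usually points to two incompatible Netty 3.x jars being loaded together (the connector's fat jar plus another copy elsewhere on the worker classpath). On Confluent 3.0.x the only workaround is to make sure a single Netty version is on the classpath; newer Connect versions (Kafka 0.11+ / Confluent 3.3+) avoid the clash with classloader isolation. A sketch of that worker setting, with a placeholder path:

# connect-distributed.properties (Kafka 0.11+): isolate each connector's classpath
# so its bundled Netty cannot conflict with Netty jars elsewhere on the worker.
plugin.path=/opt/kafka-connect/plugins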

Escape table names

The sink should be able to support table names that are reserved words or that contain whitespace.
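
For instance, assuming the Cassandra sink, CQL only accepts a reserved word as a table name when it is emitted as a quoted identifier; other sinks would need the equivalent quoting in their own dialect. A sketch with illustrative names:

-- CQL sketch: quoting lets a reserved word such as "select" be used as a table name
CREATE TABLE "select" (id text PRIMARY KEY, payload text);
INSERT INTO "select" (id, payload) VALUES ('abc-1', '{}');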

Influxdb Sink Retention policy

Hi guys, I get a retention policy error when the sink writes Avro messages from Kafka to InfluxDB.
Influx version: 1.1.1

CLI output;
warning: changed name=influxdb-sink into name=influx-sink
#Connector influx-sink:
topics=influx-topic
name=influx-sink
connect.influx.connection.user=root
connect.influx.connection.database=mydb
tasks.max=1
connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
connect.influx.sink.route.query=INSERT INTO influxMeasure SELECT * FROM influx-topic WITHTIMESTAMP sys_time()
connect.influx.connection.url=http://remote:8086
connect.influx.connection.password=root
connect.influx.retention.policy=autogen
#task ids: 0

[2016-12-26 11:54:39,250] INFO Received 2 record(-s) (com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask:81)
[2016-12-26 11:54:39,327] INFO Writing ${batchPoints.getPoints.size()} to the database... (com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter:45)
[2016-12-26 11:54:39,777] ERROR Encountered error {"error":"retention policy not found: default"}
(com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter:46)
[2016-12-26 11:54:39,785] ERROR Task influx-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:401)
java.lang.RuntimeException: java.lang.RuntimeException: {"error":"retention policy not found: default"}

at com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:67)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:48)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter.handleTry(InfluxDbWriter.scala:30)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter.write(InfluxDbWriter.scala:48)
at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask$$anonfun$put$2.apply(InfluxSinkTask.scala:82)
at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask$$anonfun$put$2.apply(InfluxSinkTask.scala:82)
at scala.Option.foreach(Option.scala:257)
at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask.put(InfluxSinkTask.scala:82)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:381)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.RuntimeException: {"error":"retention policy not found: default"}

at org.influxdb.impl.InfluxDBErrorHandler.handleError(InfluxDBErrorHandler.java:19)
at retrofit.RestAdapter$RestHandler.invoke(RestAdapter.java:242)
at org.influxdb.impl.$Proxy41.writePoints(Unknown Source)
at org.influxdb.impl.InfluxDBImpl.write(InfluxDBImpl.java:151)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter$$anonfun$1.apply$mcV$sp(InfluxDbWriter.scala:46)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter$$anonfun$1.apply(InfluxDbWriter.scala:46)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter$$anonfun$1.apply(InfluxDbWriter.scala:46)
at scala.util.Try$.apply(Try.scala:192)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter.write(InfluxDbWriter.scala:46)
... 15 more
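
The error shows the writer targeting a retention policy literally named "default", which does not exist on an InfluxDB 1.x database (the auto-created policy is "autogen"). If the connector version in use ignores connect.influx.retention.policy, one possible workaround is to create a policy with that name on the target database; a sketch:

-- InfluxQL, run against the target database in the influx CLI
CREATE RETENTION POLICY "default" ON "mydb" DURATION INF REPLICATION 1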

Cassandra sink - resiliency

We are seeing this error, and the task fails to recover/restart.

The cause is most likely bad messages coming into Kafka from the source, which would be difficult to check for every message.

Is there any thought of (and, if so, how likely is) building in some resiliency, e.g. a flag to keep going in spite of errors and to track error counts via a management/OAM interface?

(Or is there a workaround to restart and continue?)

org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.lang.ArrayIndexOutOfBoundsException: -35
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
2016-11-27 00:54:34,684] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-11-27 00:54:34,684] INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:95)
[2016-11-27 00:54:34,685] INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:166)
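
Kafka Connect later gained exactly this kind of resiliency via KIP-298 (Kafka 2.0+): per-connector error tolerance plus an optional dead-letter queue for sink connectors. A sketch of the connector-level settings, assuming a Connect version that supports them; the DLQ topic name is a placeholder:

# KIP-298 (Kafka Connect 2.0+): tolerate conversion failures instead of killing the task,
# log the failures, and route the offending records to a dead-letter topic.
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=cassandra-sink-orders-dlq
errors.deadletterqueue.context.headers.enable=true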
