eneco / kafka-connect-twitter
Kafka Connect Sink/Source for Twitter
License: Apache License 2.0
It is now hardcoded
Hello All,
The JAR file "kafka-connect-twitter-0.1-jar-with-dependencies.jar" is no longer available in the repository.
Is there another place I should download it from?
regards
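In the meantime, the jar can be built from source. A sketch, assuming Maven and a JDK 8 are installed; the GitHub URL is my assumption of where the repo lives:

```shell
# Build the connector from source; the assembly jar lands in target/.
git clone https://github.com/Eneco/kafka-connect-twitter.git
cd kafka-connect-twitter
mvn clean package
ls target/kafka-connect-twitter-0.1-jar-with-dependencies.jar
```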
[rmoff@confluent-01-node-01 kafka-connect-twitter]$ mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka-connect-twitter 0.1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ kafka-connect-twitter ---
[INFO] Deleting /home/rmoff/kafka-connect-twitter/target
[INFO]
[INFO] --- scala-maven-plugin:3.2.0:add-source (default) @ kafka-connect-twitter ---
[INFO] Add Source directory: /home/rmoff/kafka-connect-twitter/src/main/scala
[INFO] Add Test Source directory: /home/rmoff/kafka-connect-twitter/src/test/scala
[INFO]
[INFO] --- maven-resources-plugin:2.7:resources (default-resources) @ kafka-connect-twitter ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ kafka-connect-twitter ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- scala-maven-plugin:3.2.0:compile (default) @ kafka-connect-twitter ---
[WARNING] Expected all dependencies to require Scala version: 2.11.7
[WARNING] com.eneco.trading:kafka-connect-twitter:0.1 requires scala version: 2.11.7
[WARNING] org.scalamock:scalamock-scalatest-support_2.11:3.2.2 requires scala version: 2.11.5
[WARNING] Multiple versions of scala libraries detected!
[INFO] Using incremental compilation
[INFO] Compiling 12 Scala sources to /home/rmoff/kafka-connect-twitter/target/classes...
[ERROR] /home/rmoff/kafka-connect-twitter/src/main/scala/com/eneco/trading/kafka/connect/twitter/domain/TwitterStatus.scala:5: object time is not a member of package java
[ERROR] import java.time.format.DateTimeFormatter
[ERROR] ^
[ERROR] /home/rmoff/kafka-connect-twitter/src/main/scala/com/eneco/trading/kafka/connect/twitter/domain/TwitterStatus.scala:4: object time is not a member of package java
[ERROR] import java.time.{LocalDate, ZoneOffset}
[ERROR] ^
[ERROR] two errors found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 12.799 s
[INFO] Finished at: 2016-07-18T16:45:03+01:00
[INFO] Final Memory: 41M/367M
From Googling, it seems this could be a Java version issue, but I'm using 1.8:
$ java -version
java version "1.8.0_77"
Java(TM) SE Runtime Environment (build 1.8.0_77-b03)
Java HotSpot(TM) 64-Bit Server VM (build 25.77-b03, mixed mode)
Any suggestions? Thanks.
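One thing worth checking: `java.time` was added in Java 8, and the compiler Maven uses is determined by `JAVA_HOME`, which may point at an older JDK even when `java -version` on the PATH reports 1.8. A quick sanity check (the JDK path below is hypothetical):

```shell
# Maven compiles with the JDK from JAVA_HOME, not necessarily the one on PATH.
echo "$JAVA_HOME"
mvn -version          # check the "Java version:" line in the output
# If it reports 1.7 or older, point JAVA_HOME at a JDK 8 install, e.g.:
export JAVA_HOME=/usr/java/jdk1.8.0_77   # hypothetical path
mvn clean package
```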
Thanks @crcastle for reporting.
See https://gist.github.com/crcastle/6f78b025d29840c626d9ba995a92850e
Sep 27 08:02:36 twitter-ingest app/web.1: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field
Sep 27 08:02:36 twitter-ingest app/web.1: at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:210)
Sep 27 08:02:36 twitter-ingest app/web.1: at org.apache.kafka.connect.data.Struct.put(Struct.java:215)
Sep 27 08:02:36 twitter-ingest app/web.1: at org.apache.kafka.connect.data.Struct.put(Struct.java:204)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.domain.Entities$$anonfun$struct$4.apply(TwitterStatus.scala:55)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.domain.Entities$$anonfun$struct$4.apply(TwitterStatus.scala:52)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.domain.Entities$.struct(TwitterStatus.scala:52)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.domain.TwitterStatus$.struct(TwitterStatus.scala:103)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.StatusToTwitterStatusStructure$.convert(TwitterStatusReader.scala:57)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.TwitterStatusReader$$anonfun$poll$1.apply(TwitterStatusReader.scala:96)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.TwitterStatusReader$$anonfun$poll$1.apply(TwitterStatusReader.scala:96)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.Iterator$class.foreach(Iterator.scala:742)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
Sep 27 08:02:36 twitter-ingest app/web.1: at scala.collection.AbstractTraversable.map(Traversable.scala:104)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.TwitterStatusReader.poll(TwitterStatusReader.scala:96)
Sep 27 08:02:36 twitter-ingest app/web.1: at com.eneco.trading.kafka.connect.twitter.TwitterSourceTask.poll(TwitterSourceTask.scala:15)
Sep 27 08:02:36 twitter-ingest app/web.1: at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
Sep 27 08:02:36 twitter-ingest app/web.1: at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
Sep 27 08:02:36 twitter-ingest app/web.1: at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
Sep 27 08:02:36 twitter-ingest app/web.1: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Sep 27 08:02:36 twitter-ingest app/web.1: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Sep 27 08:02:36 twitter-ingest app/web.1: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
Sep 27 08:02:36 twitter-ingest app/web.1: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
Sep 27 08:02:36 twitter-ingest app/web.1: at java.lang.Thread.run(Thread.java:745)
Sep 27 08:02:36 twitter-ingest app/web.1: [pool-2-thread-1] ERROR org.apache.kafka.connect.runtime.WorkerTask - Task is being killed and will not recover until manually restarted
Not Date.toString
Hi,
I am trying to create a source connector using the command and configuration below.
curl -X POST \
  http://localhost:8091/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
"name": "kafka-connect-twitter",
"config": {
"connector.class": "com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector",
"tasks.max": "1",
"topic": "fooTweets",
"twitter.consumerkey": "xxxxxxxxxxxxxx",
"twitter.consumersecret": "xxxxxxxxxxxx",
"twitter.token": "xxxxxxxxxxx",
"twitter.secret": "xxxxxxxxxxx",
"track.terms": "fooTerm"
}
}'
However, I am getting an error: {"error_code":500,"message":"Must configure one of topics or topics.regex"}
For some reason, I am unable to specify the topic name in the 'topic' field.
To make things harder for myself, I then specified the topic name in the 'topics' field and was left scratching my head as to why the topic being used was called 'tweets'; I realize now that it's the default topic name in the code.
Any help/advice would be appreciated.
Thanks
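That 500 suggests the worker did not validate the config against this connector's definition, so it can help to confirm the Twitter source plugin is actually loaded before debugging the `topic` setting. A sketch using the standard Connect REST API (port 8091 as in the command above; whether the plugin appears depends on your install):

```shell
# List the connector plugins the worker knows about and look for the Twitter source.
curl -s http://localhost:8091/connector-plugins | grep -i twitter

# Validate a config against the connector's own definition to see which
# fields it actually accepts ('topic' vs 'topics').
curl -s -X PUT \
  http://localhost:8091/connector-plugins/com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector/config/validate \
  -H 'Content-Type: application/json' \
  -d '{"connector.class":"com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector","tasks.max":"1","topic":"fooTweets"}'
```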
The following settings are lists, but README.md documents them as comma-separated strings, i.e.
"location1,location2":
.define(TRACK_TERMS, Type.LIST, EMPTY_VALUE, Importance.MEDIUM, TRACK_TERMS_DOC)
.define(TRACK_FOLLOW, Type.LIST, EMPTY_VALUE, Importance.MEDIUM, TRACK_FOLLOW_DOC)
.define(TRACK_LOCATIONS, Type.LIST, EMPTY_VALUE, Importance.MEDIUM, TRACK_LOCATIONS_DOC)
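For what it's worth, Kafka's `ConfigDef` parses `Type.LIST` values from comma-separated strings, so the README notation is still how these are written in a config. A properties fragment with hypothetical values:

```properties
# Type.LIST settings are written as comma-separated strings (hypothetical values)
track.terms=fooTerm,barTerm
track.follow=12345,67890
track.locations=-122.75,36.8,-121.75,37.8
```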
I got this result:
`SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/java/kafka/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/java/kafka-connect-hdfs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[2017-11-09 16:51:30,790] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-11-09 16:51:30,794] INFO Added plugin 'io.confluent.connect.jdbc.JdbcSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,794] INFO Added plugin 'io.confluent.connect.s3.S3SinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,795] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,795] INFO Added plugin 'io.confluent.connect.hdfs.HdfsSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,795] INFO Added plugin 'io.confluent.connect.hdfs.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,795] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'io.confluent.connect.replicator.ReplicatorSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,796] INFO Added plugin 'io.confluent.connect.storage.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,798] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,799] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,799] INFO Added plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,799] INFO Added plugin 'io.confluent.connect.replicator.util.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,799] INFO Added plugin 'io.confluent.connect.avro.AvroConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,799] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,799] INFO Added plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,800] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,800] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,800] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,801] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,801] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,801] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,802] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,803] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,803] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,803] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,803] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,803] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-09 16:51:30,808] INFO Added aliases 'ElasticsearchSinkConnector' and 'ElasticsearchSink' to plugin 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,811] INFO Added aliases 'HdfsSinkConnector' and 'HdfsSink' to plugin 'io.confluent.connect.hdfs.HdfsSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,812] INFO Added aliases 'JdbcSinkConnector' and 'JdbcSink' to plugin 'io.confluent.connect.jdbc.JdbcSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,814] INFO Added aliases 'JdbcSourceConnector' and 'JdbcSource' to plugin 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,815] INFO Added aliases 'ReplicatorSourceConnector' and 'ReplicatorSource' to plugin 'io.confluent.connect.replicator.ReplicatorSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,815] INFO Added aliases 'S3SinkConnector' and 'S3Sink' to plugin 'io.confluent.connect.s3.S3SinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,816] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,817] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,817] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,819] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,819] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,819] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,819] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,820] INFO Added aliases 'AvroConverter' and 'Avro' to plugin 'io.confluent.connect.avro.AvroConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,820] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,820] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-11-09 16:51:30,824] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-11-09 16:51:30,825] INFO Added alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-11-09 16:51:30,825] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-11-09 16:51:30,854] INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [localhost:9092]
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class io.confluent.connect.avro.AvroConverter
offset.flush.interval.ms = 10000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = /tmp/connect.offsets
plugin.path = null
rest.advertised.host.name = null
rest.advertised.port = null
rest.host.name = null
rest.port = 8083
task.shutdown.graceful.timeout.ms = 5000
value.converter = class io.confluent.connect.avro.AvroConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:223)
[2017-11-09 16:51:31,066] INFO Logging initialized @15369ms (org.eclipse.jetty.util.log:186)
[2017-11-09 16:51:31,414] INFO AvroConverterConfig values:
schema.registry.url = [http://localhost:8081]
max.schemas.per.subject = 1000
(io.confluent.connect.avro.AvroConverterConfig:170)
[2017-11-09 16:51:31,671] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:170)
[2017-11-09 16:51:31,672] INFO AvroConverterConfig values:
schema.registry.url = [http://localhost:8081]
max.schemas.per.subject = 1000
(io.confluent.connect.avro.AvroConverterConfig:170)
[2017-11-09 16:51:31,673] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:170)
[2017-11-09 16:51:31,686] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:49)
[2017-11-09 16:51:31,686] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-11-09 16:51:31,687] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:144)
[2017-11-09 16:51:31,688] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:59)
[2017-11-09 16:51:31,690] INFO Worker started (org.apache.kafka.connect.runtime.Worker:149)
[2017-11-09 16:51:31,691] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-11-09 16:51:31,691] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-11-09 16:51:31,805] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Nov 09, 2017 4:51:32 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
[2017-11-09 16:51:32,387] INFO Started o.e.j.s.ServletContextHandler@785b53b0{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-11-09 16:51:32,401] INFO Started ServerConnector@49be5654{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-11-09 16:51:32,401] INFO Started @16704ms (org.eclipse.jetty.server.Server:379)
[2017-11-09 16:51:32,403] INFO REST server listening at http://127.0.1.1:8083/, advertising URL http://127.0.1.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-11-09 16:51:32,403] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-11-09 16:51:32,405] ERROR Failed to create job for twitter-source.properties (org.apache.kafka.connect.cli.ConnectStandalone:89)
[2017-11-09 16:51:32,406] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.hdfs.HdfsSinkConnector, name='io.confluent.connect.hdfs.HdfsSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.hdfs.tools.SchemaSourceConnector, name='io.confluent.connect.hdfs.tools.SchemaSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='3.3.0', encodedVersion=3.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.confluent.connect.replicator.ReplicatorSourceConnector, name='io.confluent.connect.replicator.ReplicatorSourceConnector', version='3.3.0', encodedVersion=3.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', 
version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='0.11.0.0-cp1', 
encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:97)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.hdfs.HdfsSinkConnector, name='io.confluent.connect.hdfs.HdfsSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.hdfs.tools.SchemaSourceConnector, name='io.confluent.connect.hdfs.tools.SchemaSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='3.3.0', encodedVersion=3.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.confluent.connect.replicator.ReplicatorSourceConnector, name='io.confluent.connect.replicator.ReplicatorSourceConnector', version='3.3.0', encodedVersion=3.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, 
type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', 
location='classpath'}
at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:161)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:341)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:240)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:157)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
[2017-11-09 16:51:32,408] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2017-11-09 16:51:32,408] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2017-11-09 16:51:32,419] INFO Stopped ServerConnector@49be5654{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2017-11-09 16:51:32,427] INFO Stopped o.e.j.s.ServletContextHandler@785b53b0{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2017-11-09 16:51:32,431] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:165)
[2017-11-09 16:51:32,431] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:76)
[2017-11-09 16:51:32,431] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:156)
[2017-11-09 16:51:32,432] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:67)
[2017-11-09 16:51:32,432] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:176)
[2017-11-09 16:51:32,432] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:86)
[2017-11-09 16:51:32,432] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:70)
`
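The "Failed to find any class that implements Connector" trace above means the connector jar was not visible to the worker when it scanned for plugins. On Kafka 0.11 / Confluent 3.3 (the versions shown in the log), one option is to point `plugin.path` at the directory containing the jar; the path below is illustrative, based on the build output earlier in this thread.

```properties
# Hypothetical worker config: point plugin.path at the directory holding
# the connector jar so TwitterSourceConnector can be discovered.
plugin.path=/home/rmoff/kafka-connect-twitter/target
```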
Currently, only the compat-mode `text` field with 140 chars is returned, not the extended-mode `full_text` field with 280 chars.
Retrieving the full text by default, or perhaps even making it configurable, would be great.
The feature is described in the official Twitter docs: https://developer.twitter.com/en/docs/tweets/tweet-updates
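In compatibility mode the streaming payload carries the truncated `text`, while the full tweet appears under `extended_tweet.full_text` when present. A hypothetical helper (not the connector's actual code) sketching that fallback on a parsed JSON map:

```java
import java.util.Map;

// Illustrative only: prefer extended_tweet.full_text when the payload has
// it, otherwise fall back to the compat-mode text field.
public class FullTextExtractor {
    @SuppressWarnings("unchecked")
    public static String fullText(Map<String, ?> tweet) {
        Object ext = tweet.get("extended_tweet");
        if (ext instanceof Map) {
            Object full = ((Map<String, Object>) ext).get("full_text");
            if (full instanceof String) {
                return (String) full;
            }
        }
        Object text = tweet.get("text");
        return text instanceof String ? (String) text : null;
    }
}
```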
It got out of sync.
I followed the setup instructions carefully, but for some reason I am unable to start the twitter-source connector. I get the following error:
Failed to start connector twitter-source java.lang.NoSuchMethodError:
My $CLASSPATH is set to ...
/Users/Warsame/git/kafka-connect-twitter/target/kafka-connect-twitter-0.1-jar-with-dependencies.jar
Apologies if this is the wrong place to direct my question, but I'm wondering if anyone has tried using this connector with an SSL connection to the brokers? I think this is a Connect framework issue, but thought I'd pose the question here to see if anyone has had experience with this.
I've specified my SSL properties in the connect properties file, but on startup the logs say the truststore and keystore locations and passwords are null, and `security.protocol` is `PLAINTEXT` instead of `SSL`. My full properties files and startup log are here if they're useful.
I'm starting the process up as explained in the README here.
$CONFLUENT_HOME/bin/connect-standalone confluent.properties twitter-simple-source.properties
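The symptom described (null truststore and a PLAINTEXT producer despite SSL being configured) commonly happens because the producer used by source tasks does not pick up the worker's top-level SSL settings; those have to be repeated with the `producer.` prefix in the worker properties. A sketch, with hypothetical paths and passwords:

```properties
# Worker-level client settings (illustrative values).
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=changeit

# Source tasks produce with a separate client, so the same settings must be
# repeated with the producer. prefix or the producer stays on PLAINTEXT.
producer.security.protocol=SSL
producer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
producer.ssl.truststore.password=changeit
```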
Hi,
Maybe it would be possible to add support for 'sample' Twitter streams (https://dev.twitter.com/streaming/reference/get/statuses/sample)?
At the very least, do not loop infinitely.
In the latest version, the Twitter source connector has an empty configuration; I am unable to create the connector because it does not accept any configuration parameters.
I had to switch to the previous version to use the connector.
I have successfully used this connector to read a Twitter stream on a virtual machine based on the Vagrant image vivid64.
When I use wily or xenial builds, I keep getting errors from the Twitter API, as shown below.
Since I am using Ansible playbooks to set the machines up, and the only thing I changed is the Vagrant base image, I am fairly certain that everything else is equal.
I can telnet into port 443 on stream.twitter.com just fine, so it should not be a network issue.
[hosebird-client-io-thread-0] INFO com.twitter.hbc.httpclient.ClientBase - KafkaConnectTwitterSource Establishing a connection
[hosebird-client-io-thread-0] WARN com.twitter.hbc.httpclient.ClientBase - KafkaConnectTwitterSource IOException caught when establishing connection to https://stream.twitter.com/1.1/statuses/filter.json?delimited=length&language=en
[hosebird-client-io-thread-0] WARN com.twitter.hbc.httpclient.ClientBase - KafkaConnectTwitterSource failed to establish connection properly
[hosebird-client-io-thread-0] INFO com.twitter.hbc.httpclient.ClientBase - KafkaConnectTwitterSource Done processing, preparing to close connection
I built the kafka-connect-twitter project and included it in the classpath for Kafka Connect; during launch I get this error:
java.lang.NoSuchMethodError: org.apache.kafka.common.config.ConfigDef.define(Ljava/lang/String;Lorg/apache/kafka/common/config/ConfigDef$Type;Lorg/apache/kafka/common/config/ConfigDef$Importance;Ljava/lang/String;Ljava/lang/String;ILorg/apache/kafka/common/config/ConfigDef$Width;Ljava/lang/String;)Lorg/apache/kafka/common/config/ConfigDef;
I suspect this error is related to an incompatibility between Confluent v2 and v3. Are there any plans to migrate to the newer version?
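The `NoSuchMethodError` on `ConfigDef.define` is a linkage error: that overload's signature differs between Kafka releases, so a jar compiled against one `connect-api` version fails at runtime on another. One mitigation is to rebuild with the Kafka version aligned to the worker and mark `connect-api` as `provided` so a conflicting copy is not bundled into the fat jar. The version numbers below are illustrative, not prescriptive:

```xml
<properties>
  <kafka.version>0.11.0.0</kafka.version>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-api</artifactId>
  <version>${kafka.version}</version>
  <!-- provided: the worker supplies connect-api at runtime -->
  <scope>provided</scope>
</dependency>
```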
I'm trying to run the connect-standalone command and I'm getting this error: "WARN Connection to node -1 could not be established. Broker may not be available." Meanwhile, Kafka and ZooKeeper are running.
def poll() : util.List[SourceRecord] = {
if (client.isDone) log.warn("Client connection closed unexpectedly: ", client.getExitEvent.getMessage) //TODO: what next?
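For the `//TODO: what next?` above, one option would be to back off and reconnect rather than loop or return immediately when the hbc client is done. A minimal, hypothetical backoff helper (not part of the connector) sketching the delay calculation:

```java
// Hypothetical exponential-backoff helper: delay grows as base * 2^attempt,
// capped at maxMs, so repeated reconnects do not hammer the Twitter endpoint.
public class Backoff {
    private final long baseMs;
    private final long maxMs;

    public Backoff(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    // Delay for the given reconnect attempt (0-based).
    public long delayMs(int attempt) {
        long shifted = baseMs << Math.min(attempt, 20); // cap the shift to avoid overflow
        return Math.min(shifted, maxMs);
    }
}
```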
I want to create a Spark consumer and get these tweets via a Kafka stream and an Avro decoder.
The problem is how to convert the RDD[GenericRecord] I get into DataFrames.
Do you have any idea, please?
Thank you for sharing this code. I was able to successfully implement this before, but it no longer works after upgrading to Confluent version 3.2.2.
I have not been able to start kafka-connect-twitter.
I've done a Maven clean build and updated the dependencies for both Confluent 3.2.2 and Kafka 10.2.1. I'm not sure what other changes I need to make to the pom file.
Please help me identify what the issue could be. Your assistance is very much appreciated.
As far as I understand, the same functionality is already provided by the producer's `batch.size` and `linger.ms`.
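For reference, those producer settings can be passed through the Connect worker configuration with the `producer.` prefix; the values below are illustrative:

```properties
# Hypothetical worker overrides for producer batching.
producer.batch.size=16384
producer.linger.ms=100
```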
Hello,
Thank you for this code, I successfully connected to twitter and received Avro format messages.
I am trying to add information about the original tweet to the TwitterStatus, so I added the retweeted_status lines below to the TwitterStatus object.
```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}

object TwitterStatus {
  def asIso8601String(d: Date) = {
    val tz = TimeZone.getTimeZone("UTC")
    val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
    df.setTimeZone(tz)
    df.format(if (d == null) new Date() else d)
  }

  // Nested schema for the retweeted status; optional, since most tweets are not retweets.
  val retweetedStatusSchema = SchemaBuilder.struct().name("retweeted_status")
    .field("original_tweet_id", Schema.OPTIONAL_INT64_SCHEMA)
    .field("original_user_id", Schema.OPTIONAL_INT64_SCHEMA)
    .optional()
    .build()

  val schema = SchemaBuilder.struct().name("com.eneco.trading.kafka.connect.twitter.Tweet")
    .field("id", Schema.INT64_SCHEMA)
    .field("created_at", Schema.OPTIONAL_STRING_SCHEMA)
    .field("user", TwitterUser.schema)
    .field("text", Schema.OPTIONAL_STRING_SCHEMA)
    .field("lang", Schema.OPTIONAL_STRING_SCHEMA)
    .field("is_retweet", Schema.BOOLEAN_SCHEMA)
    .field("retweeted_status", retweetedStatusSchema)
    .field("entities", Entities.schema)
    .build()

  def struct(s: twitter4j.Status) =
    new Struct(schema)
      .put("id", s.getId)
      .put("created_at", asIso8601String(s.getCreatedAt))
      .put("user", TwitterUser.struct(s.getUser))
      .put("text", s.getText)
      .put("lang", s.getLang)
      .put("is_retweet", s.isRetweet)
      // getRetweetedStatus is null for ordinary tweets, which would throw a
      // NullPointerException; only fill the nested struct for actual retweets,
      // and build it against the nested schema, not the outer one.
      .put("retweeted_status",
        if (s.isRetweet && s.getRetweetedStatus != null)
          new Struct(retweetedStatusSchema)
            .put("original_tweet_id", s.getRetweetedStatus.getId)
            .put("original_user_id", s.getRetweetedStatus.getUser.getId)
        else null)
      .put("entities", Entities.struct(s))
}
```
Unfortunately, after various tries I still get a NullPointerException.
Would you have any tips about adding information to your schema?
Thanks a lot.