lucidfrontier45 / elasticsearchsink2
Flume-NG Sink for Elasticsarch >= 2.0
License: Apache License 2.0
Hey,
First of all, thank you for creating this. If I can get it working, it will be a great help. The issue I have on restarting Flume is this:
2:25:55.970 PM ERROR org.apache.flume.node.AbstractConfigurationProvider
Sink enrichedEpoESSink has been removed due to an error during configuration
java.lang.RuntimeException: java.lang.ClassNotFoundException: TimeBasedIndexNameBuilder
at com.google.common.base.Throwables.propagate(Throwables.java:156)
at com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink.configure(ElasticSearchSink.java:309)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: TimeBasedIndexNameBuilder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:190)
at com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink.configure(ElasticSearchSink.java:303)
... 11 more
Any ideas what I'm doing wrong please?
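A note on the trace above: the `Caused by` frames show `Class.forName` being handed the bare name `TimeBasedIndexNameBuilder`, with no package. One plausible cause, assuming the name comes from the sink's `indexNameBuilder` property, is that the config gives the simple class name rather than the fully qualified one (`com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder`, as used in another config on this page). The sketch below only illustrates the general behaviour with JDK classes; `ClassNameLookup` and `canLoad` are hypothetical names, not part of the sink.

```java
// Illustrative sketch: Class.forName needs a fully qualified class name.
final class ClassNameLookup {
    /** Returns true if Class.forName can load the given name, false otherwise. */
    static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            // Same exception type as in the Flume log above.
            return false;
        }
    }

    public static void main(String[] args) {
        // A simple name carries no package, so the class loader cannot find it.
        System.out.println("ArrayList -> " + canLoad("ArrayList"));
        // The fully qualified name resolves.
        System.out.println("java.util.ArrayList -> " + canLoad("java.util.ArrayList"));
    }
}
```

If the property already holds the fully qualified name, the other likely explanation is that the sink jar is not on Flume's classpath.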
Thanks a lot for addressing the compatibility issue between Elasticsearch 2.0.0 and Flume!
However, I have an issue creating the jar files using gradlew (I attached the logs in buildLogs.txt).
Would you by any chance be familiar with this issue?
(Sorry to open this as an issue, as it is likely not linked to your development but to a config error on my part, but I'm pulling my hair out over it and I can't seem to find a solution.)
Hello
Is there a chance that we can make this work with Flume 1.5.x? It is specified that Flume >= 1.6 is required. Can you please elaborate on what might break?
Data written:
{
"action_type": "media",
"test_double": "2131312312",
"media_type": "mobile",
"created_timestamp": "2016-09-07 09:42:57",
"is_incr": "yes",
"browser_lang": "OTHER",
"device_mobile": "15559850392",
"device_os_ver": "Unknown",
"media_code": "bj001",
"test_long": 12313123,
"browser_ver": "sss",
"loc_ip": "61.233.69.82",
"device_mac": "00:70:A4:00:00:5C",
"browser": "Downloading Tool",
"browser_core": "OTHER",
"adid": "123126",
"device_os": "UNKNOWN",
"space_code": "index"
}
Result in ES 2.3.3:
{
"_index": "portal-ad",
"_type": "log22",
"_id": "AVdB84VbxE8oIkj-6i9D",
"_version": 1,
"_score": 1,
"_source": {
"body": {
"action_type": "media",
"test_double": "2131312312",
"media_type": "mobile",
"created_timestamp": "2016-09-07 09:42:57",
"is_incr": "yes",
"browser_lang": "OTHER",
"device_mobile": "15559850392",
"device_os_ver": "Unknown",
"media_code": "bj001",
"test_long": 12313123,
"browser_ver": "sss",
"loc_ip": "61.233.69.82",
"device_mac": "00:70:A4:00:00:5C",
"browser": "Downloading Tool",
"browser_core": "OTHER",
"adid": "123126",
"device_os": "UNKNOWN",
"space_code": "index"
}
}
}
I am trying Flume 1.6 with Elasticsearch 2.3. Everything works from the Gradle perspective, but when running Flume, it produces this serializer error:
java.lang.IllegalArgumentException: org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer is not an ElasticSearchEventSerializer
at com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink.configure(ElasticSearchSink.java:278)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2016-05-20 09:36:25,003 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:427)] Sink k1 has been removed due to an error during configuration
java.lang.IllegalArgumentException: org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer is not an ElasticSearchEventSerializer
at com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink.configure(ElasticSearchSink.java:278)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
24 Jun 2016 16:36:16,138 ERROR lifecycleSupervisor-1-1 - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@161dbb35 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.IncompatibleClassChangeError: Found interface org.elasticsearch.common.settings.Settings, but class was expected
at com.frontier45.flume.sink.elasticsearch2.client.ElasticSearchTransportClient.openClient(ElasticSearchTransportClient.java:195)
at com.frontier45.flume.sink.elasticsearch2.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:68)
at com.frontier45.flume.sink.elasticsearch2.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:43)
at com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink.start(ElasticSearchSink.java:339)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
24 Jun 2016 16:36:16,142 INFO lifecycleSupervisor-1-1 - ElasticSearch sink {} stopping
I got this error,
a1.sinks = k1
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
a1.sinks.k1.hostNames = 10.182.139.227:9300
a1.sinks.k1.indexName = cpic_log
a1.sinks.k1.indexType = flume_kafka
a1.sinks.k1.clusterName = logCollection
a1.sinks.k1.batchSize = 4000
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
This is my conf.
Any idea why I got this error? Thank you.
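An `IncompatibleClassChangeError` like the one above usually means the code was compiled against one version of a class while a different version is loaded at runtime. Here that would fit an Elasticsearch 1.x jar (where `Settings` was an interface) sitting on Flume's classpath ahead of the 2.x jar, though that is an assumption about this setup. A minimal sketch for checking which jar a class is actually loaded from, to be run with the same classpath as the Flume agent; `ClassOrigin` and `describe` are hypothetical helper names.

```java
import java.security.CodeSource;

// Diagnostic sketch: report whether a type is a class or an interface
// and where the class loader found it.
final class ClassOrigin {
    static String describe(String className) {
        Class<?> c;
        try {
            c = Class.forName(className);
        } catch (ClassNotFoundException e) {
            return "not found: " + className;
        }
        String kind = c.isInterface() ? "interface" : "class";
        CodeSource src = c.getProtectionDomain().getCodeSource();
        // JDK bootstrap classes have no code source, so report that explicitly.
        String location = (src == null || src.getLocation() == null)
                ? "(bootstrap)"
                : src.getLocation().toString();
        return kind + " " + className + " from " + location;
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.elasticsearch.common.settings.Settings";
        System.out.println(describe(name));
    }
}
```

If the output reports an interface, or a jar location other than the Elasticsearch 2.x jar you installed, removing the older jar from the classpath is the first thing to try.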
Elasticsarch should be Elasticsearch
Please test it with ES 2.3.3.
16/06/14 13:17:37 ERROR elasticsearch2.ElasticSearchSink: Failed to commit transaction. Transaction rolled back.
NoNodeAvailableException[None of the configured nodes are available: []]
at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:280)
at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:197)
at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55)
at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:272)
at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:347)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85)
at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59)
at com.frontier45.flume.sink.elasticsearch2.client.ElasticSearchTransportClient.execute(ElasticSearchTransportClient.java:178)
at com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink.process(ElasticSearchSink.java:187)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
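`NoNodeAvailableException` means the transport client could not connect to any configured node. Common causes (not confirmed for this report) are an unreachable transport port, a `clusterName` that does not match the cluster, or a client/server version mismatch. The first of these can be ruled out with a plain TCP check from the Flume host. The sketch below uses only the JDK; `TransportPortCheck` is a hypothetical helper, and the host and port are passed as arguments, not taken from the sink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch: test whether a TCP connection to host:port can be opened.
final class TransportPortCheck {
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolved hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("usage: TransportPortCheck <host> <port>");
            return;
        }
        int port = Integer.parseInt(args[1]);
        System.out.println(args[0] + ":" + port + " reachable: "
                + isReachable(args[0], port, 2000));
    }
}
```

A successful connection only shows that the port is open; the cluster name and the Elasticsearch version on both sides still need to match.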
This is my flume.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.chiwei.filemonitor.FileMonitorSource
a1.sources.r1.file=/usr/local/tomcat6/logs/catalina.out
a1.sources.r1.positionDir = /home/flume
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
a1.sinks.k1.hostNames = 192.168.3.111:9300
a1.sinks.k1.indexName = basis_log_info
a1.sinks.k1.indexType = logs
a1.sinks.k1.clusterName = elastic-nginx
a1.sinks.k1.batchSize = 100
a1.sinks.k1.timeZone = Asia/Shanghai
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder
a1.sinks.k1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 80
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The error log shows:
[ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@269c7f70 counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchFieldError: LUCENE_5_3_1
Elasticsearch 2.2.1 uses Lucene 5.4.1. How can I fix this? Thanks.
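A `NoSuchFieldError` on a Lucene version constant suggests that the Lucene jar loaded at runtime is not the one the Elasticsearch jar expects. One plausible cause, not confirmed here, is that more than one version of the same library is present across Flume's lib and plugin directories. The sketch below groups jar file names by artifact and reports artifacts that appear with several versions; `JarVersionScan` is a hypothetical helper, and the name pattern assumes the usual `artifact-version.jar` convention.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: find artifacts that are present in more than one version.
final class JarVersionScan {
    // Matches names like lucene-core-5.4.1.jar: artifact, then a version starting with a digit.
    private static final Pattern JAR_NAME =
            Pattern.compile("^(.+?)-(\\d[\\w.\\-]*)\\.jar$");

    /** Maps each artifact seen with two or more versions to its sorted versions. */
    static Map<String, TreeSet<String>> conflicts(List<String> jarFileNames) {
        Map<String, TreeSet<String>> byArtifact = new TreeMap<>();
        for (String name : jarFileNames) {
            Matcher m = JAR_NAME.matcher(name);
            if (m.matches()) {
                byArtifact.computeIfAbsent(m.group(1), k -> new TreeSet<>())
                          .add(m.group(2));
            }
        }
        // Keep only artifacts with more than one distinct version.
        byArtifact.values().removeIf(versions -> versions.size() < 2);
        return byArtifact;
    }

    public static void main(String[] args) {
        // Each argument is a directory to scan, e.g. Flume's lib and plugin lib directories.
        List<String> names = new ArrayList<>();
        for (String dir : args) {
            String[] files = new File(dir).list();
            if (files != null) {
                names.addAll(Arrays.asList(files));
            }
        }
        System.out.println(conflicts(names));
    }
}
```

An empty result does not rule out a mismatch, since a single but wrong version of a jar would not be flagged.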
Hi,
I'm using Elasticsearch 2.4.0 and Cloudera Manager 5.8.0.
I copied elasticsearch-2.4.0.jar and lucene-core-5.5.2.jar to the Flume plugins directory.
I replaced some Flume jars for Elasticsearch (I saw errors about them):
t-digest-3.0.jar
jsr166e-1.1.0.jar
guava-18.0.jar
I also put elasticsearch-sink2-1.0.jar in /var/lib/flume-ng/plugins.d/elasticsearch/lib/
Now I see this error after adding some lines to the file:
2016-09-12 21:14:06,201 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
java.lang.NullPointerException
at com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink.process(ElasticSearchSink.java:171)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:74
Example config:
tier1.sources = tail
tier1.channels = memoryChannel
tier1.channels.memoryChannel.type = memory
tier1.sources.tail.channels = memoryChannel
tier1.sources.tail.type = exec
tier1.sources.tail.command = tail -F /tmp/es_log.log
tier1.sources.tail.interceptors=i1 i2 i3
tier1.sources.tail.interceptors.i1.type=regex_extractor
tier1.sources.tail.interceptors.i1.regex = (\w.):(\w.):(\w.*)\s
tier1.sources.tail.interceptors.i1.serializers = s1 s2 s3
tier1.sources.tail.interceptors.i1.serializers.s1.name = source
tier1.sources.tail.interceptors.i1.serializers.s2.name = type
tier1.sources.tail.interceptors.i1.serializers.s3.name = src_path
tier1.sources.tail.interceptors.i2.type=org.apache.flume.interceptor.TimestampInterceptor$Builder
tier1.sources.tail.interceptors.i3.type=org.apache.flume.interceptor.HostInterceptor$Builder
tier1.sources.tail.interceptors.i3.hostHeader = host
tier1.sinks = elasticsearch
tier1.sinks.elasticsearch.channel = memoryChannel
tier1.sinks.elasticsearch.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink
tier1.sinks.elasticsearch.batchSize=100
tier1.sinks.elasticsearch.hostNames = ip:9300
tier1.sinks.elasticsearch.indexName = logstash
tier1.sinks.elasticsearch.clusterName = logsearch
tier1.sinks.elasticsearch.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchLogStashEventSerializer
Can you help me with this problem?
Hello,
As I read, the major version of the TransportClient must be the same as the major version of Elasticsearch. I am highly interested in using this sink together with ES 5.0. Will it be adapted to this new major version of Elasticsearch?
Many Thanks.
Regards