
kafka-spark-consumer's People

Contributors

dependabot[bot], dibbhatt, jedisct1, spring1843

kafka-spark-consumer's Issues

Single message to kafka not processed in some cases

I sent a single message to a Kafka topic with 3 partitions. It landed in partition 0. It looks like there is a case in PartitionManager where _lastEnquedOffset == _lastComittedOffset while a single message sits in _dataBuffer.

        if ((_lastEnquedOffset > _lastComittedOffset)
                && (_waitingToEmit.isEmpty())) {

which basically waits for another message to arrive on this partition before flushing out the last one. I had to change this condition to

        if ((_lastEnquedOffset >= _lastComittedOffset)
                && (_waitingToEmit.isEmpty())) {

to avoid this issue.
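A small illustrative sketch of the effect (a hypothetical helper, not the actual PartitionManager code): with a single buffered message, _lastEnquedOffset can equal _lastComittedOffset, so the strict > guard never fires.

def readyToFlush(lastEnqueued: Long, lastCommitted: Long,
                 waitingToEmitIsEmpty: Boolean, strict: Boolean): Boolean =
  (if (strict) lastEnqueued > lastCommitted else lastEnqueued >= lastCommitted) &&
    waitingToEmitIsEmpty

// Single message sitting at offset 6; committed offset is also 6.
readyToFlush(6, 6, waitingToEmitIsEmpty = true, strict = true)  // false -> message stays buffered
readyToFlush(6, 6, waitingToEmitIsEmpty = true, strict = false) // true  -> message is flushed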

Released latest version 1.0.8 of the Receiver

Released latest version 1.0.8 of Receiver based Kafka Consumer for Spark Streaming.

The Receiver is compatible with Kafka versions 0.8.x, 0.9.x, and 0.10.x, and with all Spark versions.

Available at Spark Packages: https://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Salient features:

  • End to End No Data Loss without Write Ahead Log
  • ZK Based offset management for both consumed and processed offset
  • No dependency on WAL and Checkpoint for recovery on Driver failure
  • In-built PID Controller for Rate Limiting and Backpressure management
  • Custom Message Interceptor

Please refer to https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.md for more details
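A minimal usage sketch for the receiver (the property names are assumed from the README — zookeeper.hosts, zookeeper.port, kafka.topic, kafka.consumer.id — and the hosts, topic, and consumer id below are placeholders):

import java.util.Properties
import consumer.kafka.ReceiverLauncher
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val props = new Properties()
props.put("zookeeper.hosts", "zkhost1,zkhost2")   // placeholder hosts
props.put("zookeeper.port", "2181")
props.put("kafka.topic", "my-topic")              // placeholder topic
props.put("kafka.consumer.id", "my-consumer")     // placeholder consumer id

val ssc = new StreamingContext(sparkConf, Seconds(30)) // sparkConf defined elsewhere
val numberOfReceivers = 3
val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)
stream.foreachRDD(rdd => println("Fetched " + rdd.count() + " messages"))
ssc.start()
ssc.awaitTermination()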

No data received with Apache Kafka and Apache Spark

Hi, I'm using Apache Kafka and Apache Spark 1.3, and I tested this code:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

def main(args: Array[String]) {
  val ctx = new SparkContext("local[4]", "Spark Streaming")
  val ssc = new StreamingContext(ctx, Seconds(10)) // ssc was missing in the snippet; batch interval assumed
  val kafkaParams = Map("metadata.broker.list" -> "127.0.1:2182")
  val topics = Set("page_visits")
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  kafkaStream.foreachRDD(rdd => print())
  ssc.start()
  ssc.awaitTermination()
}

My topic works when I start a Java program to produce and consume data, but in Spark Streaming no data is received.
Help please!

Build Failure

I cloned the latest version and ran mvn install, and the build fails; a few test cases seem to be failing.
Here's the Maven output:

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 14.397s
[INFO] Finished at: Wed Feb 11 11:39:19 UTC 2015
[INFO] Final Memory: 27M/290M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.10:test (default-test) on project kafka-spark-consumer: There are test failures.
[ERROR] 
[ERROR] Please refer to /root/akhld/kafka-spark-consumer/target/surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

Part of /root/akhld/kafka-spark-consumer/target/surefire-reports/TEST-consumer.kafka.KafkaServerTest.xml and /root/akhld/kafka-spark-consumer/target/surefire-reports/TEST-consumer.kafka.ZKServerTest.xml says:

<testcase time="1.112" classname="consumer.kafka.KafkaServerTest" name="sendMessageAndAssertValueForOffset">
    <error message="org.apache.zookeeper.ZooKeeper.&lt;init&gt;(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V" type="java.lang.NoSuchMethodError">java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.&lt;init&gt;(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
    at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169)
    at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94)
    at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55)
    at org.apache.curator.ConnectionState.reset(ConnectionState.java:219)
    at org.apache.curator.ConnectionState.start(ConnectionState.java:103)
    at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234)
    at consumer.kafka.ZkState.&lt;init&gt;(ZkState.java:67)
    at consumer.kafka.KafkaServerTest.setUp(KafkaServerTest.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4TestSet.execute(JUnit4TestSet.java:53)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:123)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:164)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:110)
    at org.apache.maven.surefire.booter.SurefireStarter.invokeProvider(SurefireStarter.java:175)
    at org.apache.maven.surefire.booter.SurefireStarter.runSuitesInProcessWhenForked(SurefireStarter.java:107)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:68)
</error>

This is my mvn version:

Apache Maven 3.0.4
Maven home: /usr/share/maven
Java version: 1.7.0_75, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-7-openjdk-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.16.0-0.bpo.4-amd64", arch: "amd64", family: "unix"

App client consumer doesn't consume offsets from Kafka topic

I'm submitting Java jobs to YARN using yarn-client mode on a single node.

The job has simple logic (see the sketch after this list):

  • read streaming data from a Kafka topic with Spark Streaming
  • apply some processing logic
  • persist the data to another Kafka topic
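A minimal sketch of that read-process-write loop (assuming this project's ReceiverLauncher receiver; the props, broker address, and topic name are placeholders, not the reporter's actual code):

import java.util.Properties
import consumer.kafka.ReceiverLauncher
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.storage.StorageLevel

// stream of raw Kafka messages from the receiver-based consumer
val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)

stream.map(m => new String(m.getPayload, "UTF-8"))        // decode each payload
  .foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // one producer per partition, closed at the end of the batch
      val kp = new Properties()
      kp.put("bootstrap.servers", "broker1:9092")          // placeholder broker
      kp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      kp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](kp)
      records.foreach(r => producer.send(new ProducerRecord[String, String]("output-topic", r))) // placeholder topic
      producer.close()
    }
  }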

I send the jar file from the console to jobserver (which works as a proxy to YARN), and jobserver submits the job to YARN in yarn-client mode.

Everything is on the same host but runs in Docker containers:
YARN is inside one container (the Cloudera Docker image)
Jobserver is inside another container

But I see weird behaviour with the Kafka consumer: messages are sent and committed, but not processed.

I tried KafkaUtils.createStream and hit the same issue, but not with KafkaUtils.createDirectStream, which works correctly.

I need to control the offsets that are read, so I know where to start reading from, which is why I have to use this Kafka consumer.

I noticed the issue when checking the Spark jobserver context. Inside the job logic, if I stop the current context and create a new one in Spark local mode, it works fine, but that is not the desired behaviour because two contexts are created and the second one doesn't run on YARN.

I know it's a specific case, but perhaps you have seen a similar issue and could help me. I checked the log trace and there is no error consuming offsets; it shows that messages are committed but never read.

The current framework versions are:
kafka spark consumer 1.0.8
spark streaming 1.6.0
kafka 0.10.0 (tried also 0.8.2.1)
jobserver 0.6.1
jdk 1.8

Let me know if you need any other info.

Thanks

PartitionManager not writing offset to ZooKeeper if a record fetched from Kafka contains only one line

PartitionManager does not write the offset to ZooKeeper under certain conditions.
Duplicated data will be fetched if the program restarts without the offset having been written to ZooKeeper.

How to reproduce:
Fetch one line at a time from Kafka

version
kafka-spark-consumer : 1.0.6
spark-streaming_2.10 : 1.6.0

Here is the log immediately after deploying the Spark program to YARN.
Notice that offset 6 is already duplicated,
and PartitionManager cannot write offset 6/7 to ZooKeeper due to

if (_lastEnquedOffset > _lastComittedOffset) {

17/01/19 10:22:30 INFO kafka.PartitionManager: Read last commit offset from zookeeper: 6; old topology_id: test_test_kpi_reg_user - new consumer_id: test_test_kpi_reg_user
17/01/19 10:22:30 INFO kafka.PartitionManager: Starting Kafka cdh005.testdev.local:0 from offset 6
17/01/19 10:22:30 INFO kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local:0 there in internal buffers
17/01/19 10:22:30 INFO kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is : 6
17/01/19 10:22:31 INFO kafka.PartitionManager: LastComitted Offset : 6
17/01/19 10:22:31 INFO kafka.PartitionManager: New Emitted Offset : 7
17/01/19 10:22:31 INFO kafka.PartitionManager: Enqueued Offset :6
17/01/19 10:22:31 INFO kafka.PartitionManager: Last Enqueued offset 6 not incremented since previous Comitted Offset 6 for partition  Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user. Some issue in Process!!

Produce another record -> write succeeds

17/01/19 10:57:30 INFO kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local:0 there in internal buffers
17/01/19 10:57:30 INFO kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is : 7
17/01/19 10:57:33 INFO kafka.PartitionManager: LastComitted Offset : 6
17/01/19 10:57:33 INFO kafka.PartitionManager: New Emitted Offset : 8
17/01/19 10:57:33 INFO kafka.PartitionManager: Enqueued Offset :7
17/01/19 10:57:33 INFO kafka.PartitionManager: Committing offset for Partition{host=cdh005.testdev.local:9092, partition=0}
17/01/19 10:57:33 INFO kafka.PartitionManager: Wrote committed offset to ZK: 8
17/01/19 10:57:33 INFO kafka.PartitionManager: Committed offset 8 for Partition{host=cdh005.testdev.local:9092, partition=0} for consumer: test_test_kpi_reg_user

Produce another record -> write fails again

17/01/19 10:58:40 INFO kafka.PartitionManager: Total 1 messages from Kafka: cdh005.testdev.local:0 there in internal buffers
17/01/19 10:58:40 INFO kafka.PartitionManager: Store for topic topic_tid_9999_dev for partition 0 is : 8
17/01/19 10:58:42 INFO kafka.PartitionManager: LastComitted Offset : 8
17/01/19 10:58:42 INFO kafka.PartitionManager: New Emitted Offset : 9
17/01/19 10:58:42 INFO kafka.PartitionManager: Enqueued Offset :8
17/01/19 10:58:42 INFO kafka.PartitionManager: Last Enqueued offset 8 not incremented since previous Comitted Offset 8 for partition  Partition{host=cdh005.testdev.local:9092, partition=0} for Consumer test_test_kpi_reg_user. Some issue in Process!!

It seems this issue can be fixed by saving the processed offset (_lastEnquedOffset) to ZooKeeper and adding 1 to _emittedToOffset in the constructor, given that README.md states that "the last processed offset is write in Zookeeper."
I don't know whether this change will affect other classes or not.

I only tested 1.0.6, but based on the logic this issue should reproduce in 1.0.8 as well.

Doesn't seem to be working

I gave this project a try: I cloned the repo, built the project, and used the jar inside my Spark Streaming project with your KafkaReceiver to create the DStream. Whenever I run it, it just crashes and my SparkContext is shut down.

You can look at the code over here https://gist.github.com/akhld/a59a2369f0f1f5509af4

This is what happens when I run it in standalone mode: https://gist.github.com/akhld/b5627bf866721df7321b

This is what happens when I run it in local mode: https://gist.github.com/akhld/36baccb2a866106315f6

Getting GC out-of-memory exception

Hi,

I am using this consumer with the following settings:

.set("spark.cleaner.ttl", "800")
.set("spark.executor.memory", "8g")
.set("spark.driver.memory", "8g")
.set("spark.driver.maxResultSize", "10g")

I need to do my aggregations on 10 minutes of data, so I am creating the JavaStreamingContext with a 10-minute batch interval (600000 milliseconds).
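For reference, a 10-minute batch interval can be expressed with the streaming duration helpers instead of raw milliseconds (shown here with the Scala API; the Java API takes the equivalent Durations.minutes(10), and the app name below is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val conf = new SparkConf().setAppName("ten-minute-aggregation") // placeholder app name
val ssc = new StreamingContext(conf, Minutes(10))               // 600000 ms batches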

The data is around 5-7 lakh (500,000-700,000) records every 10 minutes.

The program is throwing a GC out-of-memory exception, so I increased the JVM heap size to 16 GB, but I still get the same error after 20-30 minutes.

When I checked memory consumption, this process was using around 57-58 percent of my memory. My machine's total memory is 30 GB.

Please let me know what could be the reason behind this.

Thanks,

ReceiverLauncher.launch cannot get data

Below is the application log. In the log I can see that the Spark DStream keeps consuming from Kafka, but nothing appears to be processed.

My code:
val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)
tmp_stream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val today = DateTime.now
    val nowDay = today.toString("yyyy-MM-dd")
    val lastDay = today.minusDays(1).toString("yyyy-MM-dd")
    val nowDayType = today.toString("yyyy.MM.dd")
    val lastDayType = today.minusDays(1).toString("yyyy.MM.dd")
    println(nowDay)
    sqlContext.createDataFrame(rdd.map(x => new String(x.getPayload, "UTF-8"))
      .flatMap(checkClick), affiliate_data_schema).where(s"dt = '${nowDay}'")
      .saveToEs(s"spark-aflt-all-record-${nowDayType}/record", esconf)
    sqlContext.createDataFrame(rdd.map(x => new String(x.getPayload, "UTF-8"))
      .flatMap(checkClick), affiliate_data_schema).where(s"dt = '${lastDay}'")
      .saveToEs(s"spark-aflt-all-record-${lastDayType}/record", esconf)
  } else {
    println("rdd is empty")
  }
})

  • 17/02/28 21:37:06 INFO Version: Elasticsearch Hadoop v5.0.0-alpha5 [c513369b06]
  • 17/02/28 21:37:07 INFO EsDataFrameWriter: Writing to [spark-aflt-all-record-2017.02.28/record]
  • 17/02/28 21:37:07 INFO EsDataFrameWriter: Writing to [spark-aflt-all-record-2017.02.28/record]

17/02/28 21:37:02 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 40.3 KB, free 1618.0 MB)
17/02/28 21:37:02 INFO PartitionManager: Consumed offset 2438034718 for Partition 1 written to ZK
17/02/28 21:37:03 INFO MemoryStore: Block input-1-1488346622800 stored as values in memory (estimated size 1055.5 KB, free 1617.0 MB)
17/02/28 21:37:03 INFO BlockGenerator: Pushed block input-1-1488346622800
17/02/28 21:37:03 INFO BlockManager: Found block input-1-1488346618800 locally
17/02/28 21:37:03 INFO BlockManager: Found block input-1-1488346619000 locally
17/02/28 21:37:03 INFO PartitionManager: Consumed offset 2438036464 for Partition 1 written to ZK
17/02/28 21:37:04 INFO MemoryStore: Block input-1-1488346623800 stored as values in memory (estimated size 1035.7 KB, free 1616.0 MB)
17/02/28 21:37:04 INFO BlockGenerator: Pushed block input-1-1488346623800
17/02/28 21:37:04 INFO PartitionManager: Consumed offset 2438036893 for Partition 1 written to ZK
17/02/28 21:37:05 INFO MemoryStore: Block input-1-1488346624800 stored as values in memory (estimated size 248.7 KB, free 1615.7 MB)
17/02/28 21:37:05 INFO BlockGenerator: Pushed block input-1-1488346624800
17/02/28 21:37:05 INFO CodeGenerator: Code generated in 1382.889398 ms
17/02/28 21:37:05 INFO CodeGenerator: Code generated in 289.775591 ms
17/02/28 21:37:05 INFO PartitionManager: Consumed offset 2438037051 for Partition 1 written to ZK
17/02/28 21:37:06 INFO MemoryStore: Block input-1-1488346625800 stored as values in memory (estimated size 99.6 KB, free 1615.6 MB)
17/02/28 21:37:06 INFO BlockGenerator: Pushed block input-1-1488346625800
17/02/28 21:37:06 INFO CodeGenerator: Code generated in 341.839913 ms
17/02/28 21:37:06 INFO Version: Elasticsearch Hadoop v5.0.0-alpha5 [c513369b06]
17/02/28 21:37:07 INFO EsDataFrameWriter: Writing to [spark-aflt-all-record-2017.02.28/record]
17/02/28 21:37:07 INFO EsDataFrameWriter: Writing to [spark-aflt-all-record-2017.02.28/record]or: Pushed block input-1-1488346628800
17/02/28 21:37:10 INFO PartitionManager: Consumed offset 2438037474 for Partition 1 written to ZK
17/02/28 21:37:11 INFO MemoryStore: Block input-1-1488346630800 stored as values in memory (estimated size 88.7 KB, free 1615.4 MB)
17/02/28 21:37:11 INFO BlockGenerator: Pushed block input-1-1488346630800
17/02/28 21:37:11 INFO PartitionManager: Consumed offset 2438037671 for Partition 1 written to ZK
17/02/28 21:37:12 INFO MemoryStore: Block input-1-1488346631800 stored as values in memory (estimated size 121.1 KB, free 1615.3 MB)
17/02/28 21:37:12 INFO BlockGenerator: Pushed block input-1-1488346631800
17/02/28 21:37:12 INFO PartitionManager: Consumed offset 2438037859 for Partition 1 written to ZK
17/02/28 21:37:13 INFO MemoryStore: Block input-1-1488346632800 stored as values in memory (estimated size 115.8 KB, free 1615.1 MB)
17/02/28 21:37:13 INFO BlockGenerator: Pushed block input-1-1488346632800
17/02/28 21:37:14 INFO CodeGenerator: Code generated in 909.674488 ms
17/02/28 21:37:15 INFO PartitionManager: Consumed offset 2438038010 for Partition 1 written to ZK
17/02/28 21:37:16 INFO MemoryStore: Block input-1-1488346635800 stored as values in memory (estimated size 95.0 KB, free 1615.0 MB)
17/02/28 21:37:16 INFO BlockGenerator: Pushed block input-1-1488346635800
17/02/28 21:37:16 INFO Executor: Finished task 3.0 in stage 6.0 (TID 79). 1366 bytes result sent to driver
17/02/28 21:37:16 INFO PartitionManager: Consumed offset 2438038227 for Partition 1 written to ZK
17/02/28 21:37:17 INFO MemoryStore: Block input-1-1488346636800 stored as values in memory (estimated size 136.0 KB, free 1614.9 MB)
17/02/28 21:37:17 INFO BlockGenerator: Pushed block input-1-1488346636800
17/02/28 21:37:17 INFO Executor: Finished task 2.0 in stage 6.0 (TID 77). 2165 bytes result sent to driver
17/02/28 21:37:18 INFO PartitionManager: Consumed offset 2438038614 for Partition 1 written to ZK
17/02/28 21:37:19 INFO MemoryStore: Block input-1-1488346638800 stored as values in memory (estimated size 240.8 KB, free 1614.7 MB)
17/02/28 21:37:19 INFO BlockGenerator: Pushed block input-1-1488346638800
17/02/28 21:37:21 INFO PartitionManager: Consumed offset 2438038949 for Partition 1 written to ZK
17/02/28 21:37:21 INFO MemoryStore: Block input-1-1488346641000 stored as values in memory (estimated size 209.7 KB, free 1614.5 MB)
17/02/28 21:37:21 INFO BlockGenerator: Pushed block input-1-1488346641000
17/02/28 21:37:22 INFO PartitionManager: Consumed offset 2438039279 for Partition 1 written to ZK
17/02/28 21:37:22 INFO MemoryStore: Block input-1-1488346642000 stored as values in memory (estimated size 207.3 KB, free 1614.3 MB)
17/02/28 21:37:22 INFO BlockGenerator: Pushed block input-1-1488346642000
17/02/28 21:37:24 INFO PartitionManager: Consumed offset 2438039451 for Partition 1 written to ZK
17/02/28 21:37:24 INFO MemoryStore: Block input-1-1488346644000 stored as values in memory (estimated size 106.9 KB, free 1614.2 MB)
17/02/28 21:37:24 INFO BlockGenerator: Pushed block input-1-1488346644000
17/02/28 21:37:26 INFO PartitionManager: Consumed offset 2438039633 for Partition 1 written to ZK
17/02/28 21:37:26 INFO MemoryStore: Block input-1-1488346646000 stored as values in memory (estimated size 113.0 KB, free 1614.1 MB)
17/02/28 21:37:26 INFO BlockGenerator: Pushed block input-1-1488346646000
17/02/28 21:37:27 INFO PartitionManager: Consumed offset 2438039793 for Partition 1 written to ZK
17/02/28 21:37:27 INFO MemoryStore: Block input-1-1488346647000 stored as values in memory (estimated size 98.4 KB, free 1614.0 MB)
17/02/28 21:37:27 INFO BlockGenerator: Pushed block input-1-1488346647000
17/02/28 21:37:31 INFO PartitionManager: Consumed offset 2438040131 for Partition 1 written to ZK
17/02/28 21:37:31 INFO MemoryStore: Block input-1-1488346651000 stored as values in memory (estimated size 210.8 KB, free 1613.8 MB)
17/02/28 21:37:31 INFO BlockGenerator: Pushed block input-1-1488346651000
17/02/28 21:37:36 INFO PartitionManager: Consumed offset 2438040293 for Partition 1 written to ZK
17/02/28 21:37:36 INFO MemoryStore: Block input-1-1488346656000 stored as values in memory (estimated size 101.0 KB, free 1613.7 MB)
17/02/28 21:37:36 INFO BlockGenerator: Pushed block input-1-1488346656000
17/02/28 21:37:39 INFO PartitionManager: Consumed offset 2438040484 for Partition 1 written to ZK
17/02/28 21:37:39 INFO MemoryStore: Block input-1-1488346659000 stored as values in memory (estimated size 121.8 KB, free 1613.5 MB)
17/02/28 21:37:39 INFO BlockGenerator: Pushed block input-1-1488346659000
17/02/28 21:37:41 INFO PartitionManager: Consumed offset 2438040614 for Partition 1 written to ZK
17/02/28 21:37:41 INFO MemoryStore: Block input-1-1488346661000 stored as values in memory (estimated size 81.1 KB, free 1613.5 MB)
17/02/28 21:37:41 INFO BlockGenerator: Pushed block input-1-1488346661000
17/02/28 21:37:42 INFO PartitionManager: Consumed offset 2438040749 for Partition 1 written to ZK
17/02/28 21:37:42 INFO MemoryStore: Block input-1-1488346662000 stored as values in memory (estimated size 83.3 KB, free 1613.4 MB)
17/02/28 21:37:42 INFO BlockGenerator: Pushed block input-1-1488346662000
17/02/28 21:37:43 INFO PartitionManager: Consumed offset 2438041118 for Partition 1 written to ZK
17/02/28 21:37:43 INFO MemoryStore: Block input-1-1488346663000 stored as values in memory (estimated size 228.7 KB, free 1613.2 MB)
17/02/28 21:37:43 INFO BlockGenerator: Pushed block input-1-1488346663000
17/02/28 21:37:47 INFO PartitionManager: Consumed offset 2438041299 for Partition 1 written to ZK
17/02/28 21:37:47 INFO MemoryStore: Block input-1-1488346667000 stored as values in memory (estimated size 114.0 KB, free 1613.0 MB)
17/02/28 21:37:47 INFO BlockGenerator: Pushed block input-1-1488346667000
17/02/28 21:37:49 INFO PartitionManager: Consumed offset 2438041670 for Partition 1 written to ZK
17/02/28 21:37:49 INFO MemoryStore: Block input-1-1488346669000 stored as values in memory (estimated size 231.0 KB, free 1612.8 MB)
17/02/28 21:37:49 INFO BlockGenerator: Pushed block input-1-1488346669000
17/02/28 21:37:51 INFO PartitionManager: Consumed offset 2438041997 for Partition 1 written to ZK
17/02/28 21:37:51 INFO MemoryStore: Block input-1-1488346671000 stored as values in memory (estimated size 206.4 KB, free 1612.6 MB)
17/02/28 21:37:51 INFO BlockGenerator: Pushed block input-1-1488346671000
17/02/28 21:37:53 INFO PartitionManager: Consumed offset 2438042157 for Partition 1 written to ZK
17/02/28 21:37:53 INFO MemoryStore: Block input-1-1488346673000 stored as values in memory (estimated size 102.0 KB, free 1612.5 MB)
17/02/28 21:37:53 INFO BlockGenerator: Pushed block input-1-1488346673000
17/02/28 21:37:56 INFO PartitionManager: Consumed offset 2438042349 for Partition 1 written to ZK
17/02/28 21:37:56 INFO MemoryStore: Block input-1-1488346676000 stored as values in memory (estimated size 116.8 KB, free 1612.4 MB)
17/02/28 21:37:56 INFO BlockGenerator: Pushed block input-1-1488346676000
17/02/28 21:37:57 INFO PartitionManager: Consumed offset 2438042826 for Partition 1 written to ZK
17/02/28 21:37:57 INFO MemoryStore: Block input-1-1488346677000 stored as values in memory (estimated size 251.6 KB, free 1612.2 MB)
17/02/28 21:37:57 INFO BlockGenerator: Pushed block input-1-1488346677000
17/02/28 21:37:58 INFO PartitionManager: Consumed offset 2438043001 for Partition 1 written to ZK
17/02/28 21:37:58 INFO MemoryStore: Block input-1-1488346678000 stored as values in memory (estimated size 109.5 KB, free 1612.0 MB)
17/02/28 21:37:58 INFO BlockGenerator: Pushed block input-1-1488346678000
17/02/28 21:38:01 INFO PartitionManager: Consumed offset 2438043002 for Partition 1 written to ZK
17/02/28 21:38:01 INFO MemoryStore: Block input-1-1488346681000 stored as values in memory (estimated size 968.0 B, free 1612.0 MB)
17/02/28 21:38:01 INFO BlockGenerator: Pushed block input-1-1488346681000
17/02/28 21:38:02 INFO PartitionManager: Consumed offset 2438043003 for Partition 1 written to ZK
17/02/28 21:38:02 INFO MemoryStore: Block input-1-1488346682000 stored as values in memory (estimated size 1232.0 B, free 1612.0 MB)
17/02/28 21:38:02 INFO BlockGenerator: Pushed block input-1-1488346682000
17/02/28 21:38:03 INFO PartitionManager: Consumed offset 2438043004 for Partition 1 written to ZK
17/02/28 21:38:03 INFO MemoryStore: Block input-1-1488346683000 stored as values in memory (estimated size 888.0 B, free 1612.0 MB)
17/02/28 21:38:03 INFO BlockGenerator: Pushed block input-1-1488346683000
17/02/28 21:38:04 INFO PartitionManager: Consumed offset 2438043005 for Partition 1 written to ZK
17/02/28 21:38:04 INFO MemoryStore: Block input-1-1488346684000 stored as values in memory (estimated size 1072.0 B, free 1612.0 MB)
17/02/28 21:38:04 INFO BlockGenerator: Pushed block input-1-1488346684000
17/02/28 21:38:05 INFO PartitionManager: Consumed offset 2438043006 for Partition 1 written to ZK
17/02/28 21:38:05 INFO MemoryStore: Block input-1-1488346685000 stored as values in memory (estimated size 1232.0 B, free 1612.0 MB)
17/02/28 21:38:05 INFO BlockGenerator: Pushed block input-1-1488346685000
17/02/28 21:38:06 INFO PartitionManager: Consumed offset 2438043007 for Partition 1 written to ZK
17/02/28 21:38:06 INFO MemoryStore: Block input-1-1488346686000 stored as values in memory (estimated size 1016.0 B, free 1612.0 MB)
17/02/28 21:38:06 INFO BlockGenerator: Pushed block input-1-1488346686000
17/02/28 21:38:07 INFO PartitionManager: Consumed offset 2438043008 for Partition 1 written to ZK
17/02/28 21:38:07 INFO MemoryStore: Block input-1-1488346687000 stored as values in memory (estimated size 1032.0 B, free 1612.0 MB)
17/02/28 21:38:07 INFO BlockGenerator: Pushed block input-1-1488346687000
17/02/28 21:38:08 INFO PartitionManager: Consumed offset 2438043009 for Partition 1 written to ZK
17/02/28 21:38:08 INFO MemoryStore: Block input-1-1488346688000 stored as values in memory (estimated size 1016.0 B, free 1612.0 MB)
17/02/28 21:38:08 INFO BlockGenerator: Pushed block input-1-1488346688000
17/02/28 21:38:09 INFO PartitionManager: Consumed offset 2438043011 for Partition 1 written to ZK
17/02/28 21:38:09 INFO MemoryStore: Block input-1-1488346689000 stored as values in memory (estimated size 1384.0 B, free 1612.0 MB)
17/02/28 21:38:09 INFO BlockGenerator: Pushed block input-1-1488346689000
17/02/28 21:38:10 INFO PartitionManager: Consumed offset 2438043012 for Partition 1 written to ZK
17/02/28 21:38:10 INFO MemoryStore: Block input-1-1488346690000 stored as values in memory (estimated size 912.0 B, free 1612.0 MB)
17/02/28 21:38:10 INFO BlockGenerator: Pushed block input-1-1488346690000
17/02/28 21:38:11 INFO PartitionManager: Consumed offset 2438043013 for Partition 1 written to ZK

Missing explanation of why it has better Kafka offset/error handling

Hello,

The README of this project says "This utility will help to pull messages from Kafka Cluster using Spark Streaming. The Kafka Consumer is Low Level Kafka Consumer ( SimpleConsumer) and have better handling of the Kafka Offsets and handle failures.", but I cannot find any explanation of what it does differently to provide these features.

I looked through the code and I can see that Kafka offsets are committed to ZooKeeper after a Receiver.store() (instead of auto-commit). Is this the main feature, or are there other reasons why it has better offset handling and failure handling?
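The pattern described above, sketched conceptually (using Curator directly; this is not the actual PartitionManager/ZkState code, which handles the paths, retries, and serialization, and the connect string is a placeholder):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.streaming.receiver.Receiver

// Commit the offset to ZooKeeper only after the fetched block has been handed
// to Spark via Receiver.store(), instead of relying on Kafka auto-commit.
val zk = CuratorFrameworkFactory.newClient("zkhost:2181", new ExponentialBackoffRetry(1000, 3)) // placeholder
zk.start()

def storeThenCommit(receiver: Receiver[Array[Byte]],
                    block: java.util.ArrayList[Array[Byte]],
                    offsetPath: String,
                    lastOffset: Long): Unit = {
  receiver.store(block.iterator())   // hand the data to Spark first
  if (zk.checkExists().forPath(offsetPath) == null)
    zk.create().creatingParentsIfNeeded().forPath(offsetPath)
  zk.setData().forPath(offsetPath, lastOffset.toString.getBytes("UTF-8")) // then record progress
}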

Thanks

How to use in a kerberized context?

In a context with YARN and Kerberos, where should options like these be passed?
kafkaParams.put("security.protocol", "SASL_SSL");
kafkaParams.put("ssl.truststore.location", "./truststore");
kafkaParams.put("ssl.truststore.password", "pass");

Consumer not able to read kafka topic and write to HDFS

The kafka-spark-consumer (v1.0.6) job was reading a Kafka topic and writing to Couchbase successfully on a CDH 5.5.1 cluster, but it stopped reading and writing data after we upgraded to CDH 5.9.0. Additionally, we are also trying to write to HDFS along with Couchbase. The job just creates the specified directories in HDFS and keeps running forever, but it does not read or write anything.
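For context, the HDFS write described is presumably along these lines (a hedged sketch; the actual job code is not in the issue, the path is a placeholder, and stream is assumed to be the DStream returned by ReceiverLauncher.launch):

val lines = stream.map(m => new String(m.getPayload, "UTF-8"))  // decode Kafka payloads
lines.saveAsTextFiles("hdfs:///user/etl/kafka-output/batch")    // placeholder path; one directory per batch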
Specs:
kafka version: 0.8.2.0
Zookeeper version: zookeeper-3.4.5+cdh5.9.0+98
spark version: spark-1.6.0+cdh5.9.0+229

pom.xml:

<dependency>
  <groupId>dibbhatt</groupId>
  <artifactId>kafka-spark-consumer</artifactId>
  <version>1.0.6</version>
</dependency>

We also tried the latest version, v1.0.7, but it didn't help.

Thanks
Srikanth

Licensing concerns with current kafka-spark-consumer source code

Dibyendu,

first, thanks for your work on providing an improved Kafka consumer for Spark Streaming. Much appreciated!

I have been playing around with Kafka and Spark Streaming myself, and stumbled upon your project in the spark-user thread where you announced it last month. Since there are apparently still a couple of issues (including Spark issues) to be ironed out, I began reading your source code for further details on the current status of Kafka support in Spark Streaming -- actually because I thought that "Hey, the Apache Storm project has a reasonable Kafka spout/connector, maybe that code would help the Spark project to improve their own variant."

While reading your source code I noticed that apparently most of the code is a verbatim copy of the Kafka spout of the Apache Storm project, which was originally created by wurstmeister. In both cases the code is licensed under the Apache License v2.0, which means you can't just copy the code -- there are some rules you must follow. (And both Apache Spark and Apache Storm, as ASF projects, are using the very same license, which also means it's easy to share code amongst the projects.) Notably, "you must give any other recipients of derivative work a copy of that license, you must cause any modified files to carry prominent notices stating that you changed the files, and you must retain, in the source form of any derivative works that you distribute, all copyright, patent, trademark, and attribution notices from the source form of the work, excluding those notices that do not pertain to any part of the derivative works". See Apache License v2.0 for details of what you would have to do/change/add/etc. to be license compliant.

I am sure you have done this in good faith, and I am making you aware of this issue primarily to help you.

Best wishes,
Michael

Not working with Spark 2.2.0

Since Spark has been updated to version 2.2.0, the following exception is thrown:

Exception in thread "SparkListenerBus" java.lang.AbstractMethodError
ERROR: org.apache.spark.util.Utils - uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.AbstractMethodError
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:69)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:75)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1279)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

How to recover a failed receiver on a partition that hits the exception "Offsets out of range with no configured reset policy for partitions:"

Hello Dibyendu,
When a partition hits an "Offsets out of range with no configured reset policy for partitions:" exception, that partition no longer works. How can the receiver for the failed partition be recovered?
The detailed log message follows:

18/09/20 00:32:40 WARN PartitionManager: Got fetch request with offset out of range: 313303788 for Topic tr69data24 partition 19
18/09/20 00:32:40 WARN PartitionManager: Offset reset to LatestTime 313303788 for Topic tr69data24 partition 19
18/09/20 00:32:40 ERROR KafkaSparkConsumer: Partition 19 encountered error during createStream : Offsets out of range with no configured reset policy for partitions: {tr69data24-19=313303788}
consumer.kafka.OutOfRangeException: Offsets out of range with no configured reset policy for partitions: {tr69data24-19=313303788}
at consumer.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:75)
at consumer.kafka.PartitionManager.fetchMessages(PartitionManager.java:278)
at consumer.kafka.PartitionManager.fill(PartitionManager.java:195)
at consumer.kafka.PartitionManager.next(PartitionManager.java:134)
at consumer.kafka.KafkaSparkConsumer.createStream(KafkaSparkConsumer.java:96)
at consumer.kafka.KafkaSparkConsumer.run(KafkaSparkConsumer.java:118)
at java.lang.Thread.run(Thread.java:745)
18/09/20 00:32:40 INFO ZooKeeper: Session: 0x265abf0f5a496a0 closed
18/09/20 00:32:40 INFO ClientCnxn: EventThread shut down
consumer.kafka.OutOfRangeException: Offsets out of range with no configured reset policy for partitions: {tr69data24-19=313303788}
at consumer.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:75)
at consumer.kafka.PartitionManager.fetchMessages(PartitionManager.java:278)
at consumer.kafka.PartitionManager.fill(PartitionManager.java:195)
at consumer.kafka.PartitionManager.next(PartitionManager.java:134)
at consumer.kafka.KafkaSparkConsumer.createStream(KafkaSparkConsumer.java:96)
at consumer.kafka.KafkaSparkConsumer.run(KafkaSparkConsumer.java:118)
at java.lang.Thread.run(Thread.java:745)

18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 443585258 for Parittion 0
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 311429390 for Parittion 2
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 315225480 for Parittion 5
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 385262949 for Parittion 6
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 166297698 for Parittion 7
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 490824808 for Parittion 8
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 589901040 for Parittion 10
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 320172181 for Parittion 11
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 388380731 for Parittion 12
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 404124739 for Parittion 14
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 666967289 for Parittion 15
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 434333538 for Parittion 16
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 368144106 for Parittion 17
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 244265022 for Parittion 18
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 445694201 for Parittion 20

18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 205794190 for Parittion 21
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 420965741 for Parittion 22
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 338009268 for Parittion 23
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 389383488 for Parittion 24
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 444850876 for Parittion 26
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 254682257 for Parittion 28
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 296762823 for Parittion 29
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 280982673 for Parittion 30
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 148408705 for Parittion 31
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 345764987 for Parittion 32
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 198189174 for Parittion 34
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 153297639 for Parittion 35
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 345925489 for Parittion 36
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 353553758 for Parittion 37
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 293651243 for Parittion 38
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 457799405 for Parittion 40
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 352548943 for Parittion 42
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 373666232 for Parittion 43
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 450144063 for Parittion 44
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 200918074 for Parittion 45
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 55549751 for Parittion 46
18/09/20 09:41:00 INFO ProcessedOffsetManager: Wrote processed offset 266091161 for Parittion 47

Thanks,
Fred

org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/load/partitions

Hi,
I am trying to run the consumer, but I'm getting the following error. Any idea why?

INFO : org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
Exception in thread "main" java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/load/partitions
at consumer.kafka.ReceiverLauncher.getNumPartitions(ReceiverLauncher.java:109)
at consumer.kafka.ReceiverLauncher.createStream(ReceiverLauncher.java:53)
at consumer.kafka.ReceiverLauncher.launch(ReceiverLauncher.java:36)
at consumer.kafka.client.Consumer.run(Consumer.java:72)
at consumer.kafka.client.Consumer.start(Consumer.java:43)
at consumer.kafka.client.Consumer.main(Consumer.java:95)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/load/partitions
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:203)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:199)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191)
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38)
at consumer.kafka.ReceiverLauncher.getNumPartitions(ReceiverLauncher.java:105)
... 5 more
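The NoNodeException means the /brokers/topics/load/partitions znode does not exist in the ZooKeeper ensemble the consumer connected to, which usually means the topic load was never created there or the consumer is pointed at the wrong ZooKeeper ensemble or chroot. A minimal diagnostic sketch using Curator (placeholder connect string) to check the node:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

// Check whether Kafka registered the topic's partitions in this ZooKeeper.
val zk = CuratorFrameworkFactory.newClient("zkhost:2181", new ExponentialBackoffRetry(1000, 3)) // placeholder
zk.start()
val stat = zk.checkExists().forPath("/brokers/topics/load/partitions")
println(if (stat == null) "node missing: wrong ZooKeeper, wrong chroot, or topic not created" else "node exists")
zk.close()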

Static zookeeper hosts configuration

The current zookeeper.hosts configuration doesn't allow URL paths, so it cannot connect to ZooKeeper instances that live under a chroot path.

My zookeeper instance is running under myhost:2181/kafka

Map(
  "zookeeper.hosts" -> "myhost",
  "zookeeper.port" -> "2181"
)

will connect to the wrong ZooKeeper. The easiest way around this would be a zookeeper.connections setting.

Map(
  "zookeeper.connections" -> "myhost:2181/kafka"
)

which we could prefer over the host/port combination. WDYT?
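A sketch of the proposed precedence (zookeeper.connections is the hypothetical new setting suggested above; the helper name is illustrative):

// Prefer an explicit connection string, which may carry a chroot path such as
// "myhost:2181/kafka", over the separate host/port settings.
def zkConnectString(conf: Map[String, String]): String =
  conf.getOrElse("zookeeper.connections",
    s"${conf("zookeeper.hosts")}:${conf("zookeeper.port")}")

zkConnectString(Map("zookeeper.connections" -> "myhost:2181/kafka"))             // "myhost:2181/kafka"
zkConnectString(Map("zookeeper.hosts" -> "myhost", "zookeeper.port" -> "2181"))  // "myhost:2181"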

Release on Spark Packages Repo

Hi,
Would you like to make a release of this on the Spark Packages Repository? This will allow users to easily include this package in their Spark Applications simply by adding the flag:
--packages dibbhatt/kafka-spark-consumer:0.1
to spark-shell, spark-submit, or even pyspark.

For this, you need to upload a "Release Artifact". You can make the release directly from the command line by simply using the spark-package command tool, with the command spark-package publish. Please refer to the README. Or you can go through the Release process on the webpage. Since your project contains java code, you will need to build your jar beforehand using maven.

Let me know if you have any questions or issues!

Best,
Burak

Works fine for a few days, then can't commit offsets; no error log and no INFO log from PartitionManager

It works fine for a few days; then offsets are no longer committed to ZK.

The last successful commit log:

16/10/18 17:18:46 INFO PartitionManager: Committed offset 4951367 for Partition{host=x.x.x.x:9092, partition=0} for consumer: 123
16/10/18 17:18:47 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
16/10/18 17:18:47 INFO PartitionManager: Total 1 messages from Kafka: x.x.x.x:0 there in internal buffers
16/10/18 17:18:47 INFO PartitionManager: Store for topic stream.affiliate_converted_click_record_log for partition 0 is : 4951367
16/10/18 17:18:47 INFO MemoryStore: Block input-0-1476427412421 stored as values in memory (estimated size 1392.0 B, free 2.8 GB)
16/10/18 17:18:47 INFO PartitionManager: LastComitted Offset : 4951367
16/10/18 17:18:47 INFO PartitionManager: New Emitted Offset : 4951368
16/10/18 17:18:47 INFO PartitionManager: Enqueued Offset :4951367
16/10/18 17:18:47 INFO PartitionManager: Last Enqueued offset 4951367 not incremented since previous Comitted Offset 4951367 for partition  Partition{host=x.x.x.x:9092, partition=0} for Consumer 123. Some issue in Process!!
16/10/18 17:18:48 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
...
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302048
16/10/18 17:19:00 INFO Executor: Running task 0.0 in stage 20386.0 (TID 302048)
16/10/18 17:19:00 INFO TorrentBroadcast: Started reading broadcast variable 20386
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20386_piece0 stored as bytes in memory (estimated size 1399.0 B, free 2.8 GB)
16/10/18 17:19:00 INFO TorrentBroadcast: Reading broadcast variable 20386 took 2 ms
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20386 stored as values in memory (estimated size 2.0 KB, free 2.8 GB)
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412419 locally
16/10/18 17:19:00 WARN Executor: 1 block locks were not released by TID = 302048:
[input-0-1476427412419]
16/10/18 17:19:00 INFO Executor: Finished task 0.0 in stage 20386.0 (TID 302048). 2322 bytes result sent to driver
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302049
16/10/18 17:19:00 INFO Executor: Running task 0.0 in stage 20387.0 (TID 302049)
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302051
16/10/18 17:19:00 INFO Executor: Running task 1.0 in stage 20387.0 (TID 302051)
16/10/18 17:19:00 INFO TorrentBroadcast: Started reading broadcast variable 20387
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20387_piece0 stored as bytes in memory (estimated size 12.5 KB, free 2.8 GB)
16/10/18 17:19:00 INFO TorrentBroadcast: Reading broadcast variable 20387 took 3 ms
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20387 stored as values in memory (estimated size 35.1 KB, free 2.8 GB)
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412419 locally
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412420 locally
16/10/18 17:19:00 INFO EsDataFrameWriter: Writing to [spark-aflt-data-test-2016-10-18/sparktest]
16/10/18 17:19:00 INFO EsDataFrameWriter: Writing to [spark-aflt-data-test-2016-10-18/sparktest]

Then the log always shows:

16/10/18 17:19:32 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
16/10/18 17:19:33 INFO ZkCoordinator: Refreshing partition manager connections
16/10/18 17:19:33 INFO DynamicBrokersReader: Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=x.x.x.x:9092, 1=x.x.x.x:9092}}
16/10/18 17:19:33 INFO ZkCoordinator: Added partition index 0 for coordinator
16/10/18 17:19:33 INFO ZkCoordinator: Deleted partition managers: []
16/10/18 17:19:33 INFO ZkCoordinator: New partition managers: []
16/10/18 17:19:33 INFO ZkState: Starting curator service
16/10/18 17:19:33 INFO CuratorFrameworkImpl: Starting
16/10/18 17:19:33 INFO ZooKeeper: Initiating client connection, connectString=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 sessionTimeout=120000 watcher=org.apache.curator.ConnectionState@44bae644
16/10/18 17:19:33 INFO ClientCnxn: Opening socket connection to server x.x.x.x/x.x.x.x:2181. Will not attempt to authenticate using SASL (unknown error)
16/10/18 17:19:33 INFO ClientCnxn: Socket connection established to x.x.x.x/x.x.x.x:2181, initiating session
16/10/18 17:19:33 INFO ClientCnxn: Session establishment complete on server x.x.x.x/x.x.x.x:2181, sessionid = 0x2535a6cba5348e6, negotiated timeout = 120000
16/10/18 17:19:33 INFO ConnectionStateManager: State change: CONNECTED
16/10/18 17:19:33 INFO ZkCoordinator: Modified Fetch Rate for topic stream.affiliate_converted_click_record_log to : 1024
16/10/18 17:19:33 INFO ZooKeeper: Session: 0x2535a6cba5348e6 closed
16/10/18 17:19:33 INFO ClientCnxn: EventThread shut down
16/10/18 17:19:33 INFO ZkCoordinator: Finished refreshing
16/10/18 17:19:33 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576

spark: 2.10_2.0.0

kafka-spark-consumer:1.0.6

kafka:0.8.2.2

Exception: Could not compute split, block not found

I am currently experiencing the above exception. It seems to happen when receiving data from multiple receivers and unioning the resulting streams.
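For reference, the union pattern in question looks roughly like this (a sketch; the actual topology, topics, and properties are not shown in the issue and are placeholders here):

import consumer.kafka.ReceiverLauncher
import org.apache.spark.storage.StorageLevel

// Two receiver-based streams unioned into one DStream before processing.
val streamA = ReceiverLauncher.launch(ssc, propsTopicA, 2, StorageLevel.MEMORY_ONLY) // placeholder props
val streamB = ReceiverLauncher.launch(ssc, propsTopicB, 2, StorageLevel.MEMORY_ONLY)
val unioned = ssc.union(Seq(streamA, streamB))
unioned.foreachRDD(rdd => println("records in batch: " + rdd.count()))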

The exception always looks like this:
17/11/20 18:52:59 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 19341
17/11/20 18:52:59 INFO executor.Executor: Running task 37.0 in stage 131.0 (TID 19341)
17/11/20 18:52:59 ERROR executor.Executor: Exception in task 37.0 in stage 131.0 (TID 19341)
java.lang.Exception: Could not compute split, block input-0-1511200296800 not found
  at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
17/11/20 18:53:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 19343
17/11/20 18:53:00 INFO executor.Executor: Running task 38.0 in stage 131.0 (TID 19343)
17/11/20 18:53:00 ERROR executor.Executor: Exception in task 38.0 in stage 131.0 (TID 19343)
java.lang.Exception: Could not compute split, block input-0-1511200297800 not found
  at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

The first time it happened after 45 minutes. The second time it happened after 2 minutes. First run used 4 receivers and 12 executors. Second run used 2 receivers and 8 executors.

Have you seen this problem before?

Unable to receive data

Hi Dibbhatt,

I am facing an issue after changing one of the ZooKeeper nodes. I've changed all the settings related to the ZooKeeper IPs, but I'm getting the following:

ERROR ReceiverTracker: Deregistered receiver for stream 1: Error starting receiver 1 - java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:73)
at consumer.kafka.ZkCoordinator.(ZkCoordinator.java:64)
at consumer.kafka.KafkaConsumer.open(KafkaConsumer.java:61)
at consumer.kafka.client.KafkaRangeReceiver.start(KafkaRangeReceiver.java:73)
at consumer.kafka.client.KafkaRangeReceiver.onStart(KafkaRangeReceiver.java:58)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:117)
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
... 16 more
Caused by: java.lang.IllegalStateException: instance must be started before calling this method
at org.spark-project.guava.base.Preconditions.checkState(Preconditions.java:149)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:360)
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:110)
... 17 more

15/09/15 12:56:45 ERROR ReceiverTracker: Deregistered receiver for stream 1: Error starting receiver 1 - java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:73)
at consumer.kafka.ZkCoordinator.(ZkCoordinator.java:64)
at consumer.kafka.KafkaConsumer.open(KafkaConsumer.java:61)
at consumer.kafka.client.KafkaRangeReceiver.start(KafkaRangeReceiver.java:73)
at consumer.kafka.client.KafkaRangeReceiver.onStart(KafkaRangeReceiver.java:58)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:117)
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
... 16 more
Caused by: java.lang.IllegalStateException: instance must be started before calling this method
at org.spark-project.guava.base.Preconditions.checkState(Preconditions.java:149)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:360)
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:110)
... 17 more

entered forEachRDD
[Stage 2:> (0 + 4) / 4][Stage 3:> (0 + 0) / 14]
15/09/15 12:57:24 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:73)
at consumer.kafka.ZkCoordinator.(ZkCoordinator.java:64)
at consumer.kafka.KafkaConsumer.open(KafkaConsumer.java:61)
at consumer.kafka.client.KafkaRangeReceiver.start(KafkaRangeReceiver.java:73)
at consumer.kafka.client.KafkaRangeReceiver.onStart(KafkaRangeReceiver.java:58)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:117)
at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
... 16 more
Caused by: java.lang.IllegalStateException: instance must be started before calling this method
at org.spark-project.guava.base.Preconditions.checkState(Preconditions.java:149)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:360)
at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:110)
... 17 more

Regards,
Sorabh

mvn repository you provided is not working

I can't download the jar file through http://dl.bintray.com/spark-packages/maven

  <dependency>
    <groupId>kafka.spark.consumer</groupId>
    <artifactId>kafka-spark-consumer</artifactId>
    <version>1.0.8</version>
  </dependency>
  <repositories>
    <!-- list of other repositories -->
    <repository>
      <id>SparkPackagesRepo</id>
      <url>http://dl.bintray.com/spark-packages/maven</url>
    </repository>
  </repositories>

is not working for me.

thanks.
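Note for later readers: the Bintray-hosted Spark Packages repository has since been retired, so that URL no longer resolves. Assuming the artifact was migrated along with the rest of spark-packages, pointing the repository at the replacement host should let the same dependency resolve:

    <repositories>
        <repository>
            <id>SparkPackagesRepo</id>
            <url>https://repos.spark-packages.org</url>
        </repository>
    </repositories>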

Contributing Receiver based Low Level Kafka Consumer from Spark-Packages to Apache Spark Project

Dear all,

I have created a JIRA to track the progress of contributing back this project to Apache Spark.

https://issues.apache.org/jira/browse/SPARK-11045

This project is presently in spark-packages, and I believe this is the right time to contribute it to the Apache Spark project and give the larger community better options around Kafka connectivity for Spark Streaming.

Those who are using this package, kindly vote for this JIRA.

Python api?

Just wanted to check if there is any way to use this package in pyspark :)

Not able to get streaming data

hi Dibbhatt,

I'm not able to figure out why I am not getting the content in my batch.
I have added a System.out.println in the foreachRDD() call() method, but it seems this method is not getting executed at all.
Following is my piece of code:

props.put("consumer.forcefromstart", "false");
props.put("consumer.fetchsizebytes", "524288");
props.put("consumer.fillfreqms", "2000");

    SparkConf _sparkConf = new SparkConf();
    _sparkConf.setAppName("KafkaReceiver");
    _sparkConf.setMaster("local[4]");
    _sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false");


            JavaStreamingContext jsc = new JavaStreamingContext(_sparkConf,
                    new Duration(100000));
            //Specify number of Receivers you need.
            int numberOfReceivers = 4;

            JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.DISK_ONLY());

            unionStreams.foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = -5999013346771943994L;

                public Void call(JavaRDD<MessageAndMetadata> rdd,
                        Time time) throws Exception {
                    System.out.println(" methiod call rdd.collect()");
                    rdd.collect();
                    return null;
                }
            });

            jsc.start();
            jsc.awaitTermination();

I am new to Spark, so I am not very comfortable with it.
Please let me know when this call() method gets executed and whether I need a Spark cluster to execute this.

Great Thanks,
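For reference, a minimal sketch of inspecting what actually arrives inside foreachRDD, which Spark invokes once per batch interval on the driver (so with the Duration above it should print every 100 seconds). The getPayload()/getPartition()/getOffset() accessors on this library's MessageAndMetadata are assumptions here, not verified against this exact version:

import consumer.kafka.MessageAndMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

public class BatchLogger {

  // Attach a per-batch action to the unioned receiver stream.
  public static void attach(JavaDStream<MessageAndMetadata> unionStreams) {
    unionStreams.foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
      @Override
      public Void call(JavaRDD<MessageAndMetadata> rdd, Time time) throws Exception {
        // collect() pulls the whole batch to the driver; only do this for small test batches.
        for (MessageAndMetadata record : rdd.collect()) {
          // getPayload()/getPartition()/getOffset() are assumed accessors of this library.
          System.out.println("partition=" + record.getPartition().partition
              + " offset=" + record.getOffset()
              + " payload=" + new String(record.getPayload()));
        }
        return null;
      }
    });
  }
}

If nothing is printed at all, the receivers are usually not getting data in the first place, which is a separate problem from the foreachRDD wiring.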

exception on simple example

 WARN [run-main-0] NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[error] (run-main-0) java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
    at consumer.kafka.KafkaConfig.<init>(KafkaConfig.java:38)
    at consumer.kafka.ReceiverLauncher.createStream(ReceiverLauncher.java:88)
    at consumer.kafka.ReceiverLauncher.launch(ReceiverLauncher.java:66)
    at it.dtk.KafkaConsumerTest$.main(KafkaConsumerTest.scala:48)
    at it.dtk.KafkaConsumerTest.main(KafkaConsumerTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at consumer.kafka.KafkaConfig.<init>(KafkaConfig.java:38)
    at consumer.kafka.ReceiverLauncher.createStream(ReceiverLauncher.java:88)
    at consumer.kafka.ReceiverLauncher.launch(ReceiverLauncher.java:66)
    at it.dtk.KafkaConsumerTest$.main(KafkaConsumerTest.scala:48)
    at it.dtk.KafkaConsumerTest.main(KafkaConsumerTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)

Failure occurred when setting spark.streaming.concurrentJobs from 1 to 2

Exception like this:

02 DEBUG zookeeper.ClientCnxn: Reading reply sessionid:0x3557739d57e0613, packet:: clientPath:null serverPath:null finished:false header:: 305,4 replyHeader:: 305,21475155982,-101 request:: '/brokers/ids/-1,F response::
16/06/22 19:03:02 ERROR kafka.DynamicBrokersReader: Node /brokers/ids/-1 does not exist

How to use Kafka utilities like ConsumerOffsetChecker with this consumer and get it to show in Kafka Tool

We are using kafka-spark-consumer version 1.0.6 and Kafka 0.8.2, and here are our configurations:

zookeeper.consumer.path=/listingmetrics-impressions-kafka
kafka.consumer.id=listingmetricsimpressionssparkconsumer-prod-2
group.id=ListingMetricsImpressionsSparkConsumer-prod
consumer.forcefromstart=false
consumer.fetchsizebytes=102400
consumer.fillfreqm=250
consumer.receivers=1

Our issue is that this consumer does not seem to be listed as a consumer group
when using the following Kafka utility:
kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper my-host:my-port --topic impression --group listingmetricsimpressionssparkconsumer-prod-2

We also tried the group.id (ListingMetricsImpressionsSparkConsumer-prod) from the config.

We get the following error: Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/ListingMetricsImpressionsSparkConsumer-prod/offsets/impression/6.

The offset checker works for other flume-related groups:

kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper my-host:my-port --topic impression --group flume
Group Topic Pid Offset logSize Lag Owner
flume impression 0 2195314576 2707268737 511954161 none
flume impression 1 2200432543 2712319730 511887187 none
flume impression 2 2188578827 2700580138 512001311 none
flume impression 3 2192263104 2704228698 511965594 none

Even in the Kafka Tool consumers list I don't see the spark-kafka-consumer.
Can you please let us know how we can get these utilities working for the Spark Kafka consumer?
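For context, this consumer appears to keep its offsets under its own ZooKeeper path (the zookeeper.consumer.path configured above) rather than under /consumers/<group>, which is where ConsumerOffsetChecker and Kafka Tool look. Below is a minimal sketch of reading a processed-offset node directly with Curator, assuming the <zookeeper.consumer.path>/<kafka.consumer.id>/<topic>/processed/partition_<N> layout from the ProcessedOffsetManager code quoted later in these issues; the ZooKeeper address is a placeholder:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ProcessedOffsetReader {

  public static void main(String[] args) throws Exception {
    // my-host:my-port stands for the ZooKeeper ensemble this consumer writes to.
    CuratorFramework client = CuratorFrameworkFactory.newClient(
        "my-host:my-port", new ExponentialBackoffRetry(1000, 3));
    client.start();
    try {
      // Path layout assumed from the library's processedPath():
      // <zookeeper.consumer.path>/<kafka.consumer.id>/<topic>/processed/partition_<N>
      String path = "/listingmetrics-impressions-kafka"
          + "/listingmetricsimpressionssparkconsumer-prod-2"
          + "/impression/processed/partition_0";
      byte[] data = client.getData().forPath(path);
      System.out.println(new String(data)); // JSON blob containing the last processed offset
    } finally {
      client.close();
    }
  }
}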

High CPU usage

Hi Dibyendu,

While the low-level Kafka consumer works fine, it also seems to require a suspiciously high amount of CPU cycles, even on a beefy, bare-metal machine.

With a single consumer reading from a single partition, CPU usage on the machine running the consumer goes up to 25% to read 4 MB/s.

After bumping kafka.partitions.number to 50, still with a single consumer, and with the same data pushed to a single partition at a 4 MB/s rate, CPU usage jumps to 100% and never goes down.

Tweaking consumer.fetchsizebytes and consumer.fillfreqms didn't make a difference.

Is the Kafka consumer supposed to be that slow? If this is not the case, how may I help diagnose what is going on?

After some hours not processing because of executor fail error

Hi Dibbhatt,

I am facing an issue when using this project for a longer duration.

ERROR TaskSchedulerImpl: Lost executor 0 on : Executor heartbeat timed out after 161334 ms
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 20:58:10 ERROR TaskSchedulerImpl: Lost executor 1 on : remote Rpc client disassociated
15/08/14 20:58:10 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 1
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 20:59:47 ERROR TaskSchedulerImpl: Lost an executor 1 (already removed): Executor heartbeat timed out after 124846 ms
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 21:08:47 ERROR TaskSchedulerImpl: Lost executor 0 on : Executor heartbeat timed out after 146607 ms
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 21:24:20 ERROR TaskSchedulerImpl: Lost executor 5 on : remote Rpc client disassociated
15/08/14 21:24:20 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 5
[Stage 2:> (0 + 4) / 4][Stage 1019:> (0 + 0) / 2]15/08/14 21:26:47 ERROR TaskSchedulerImpl: Lost an executor 5 (already removed): Executor heartbeat timed out after 165821 ms

I am using a 2 node cluster with 4 executors on each and with 5 GB and 2 cores for each executor.

One more question that I have:

This consumer always starts from the last saved offset. Is there a way to ignore that offset and receive only the real-time data?

Regards,
Sorabh

Running application on 3 node cluster, 3 instances of the same application are shown with different user

I'm running this application on a 3 node cluster, 1 with root and the other 2 with ec2-user. When I submit the application, it shows 3 instances in the web UI, with the following users: 1 root and 2 ec2-user.
But the 2 instances with ec2-user are in the waiting state.

I'm submitting the application from root machine.

Not able to figure out the reason for this.

One more issue that I'm facing:
Since the default job scheduling in standalone mode is FIFO, I've reduced the cores for each application. This works fine, but if I try to run more than that, say 5 applications, the others go into the waiting state.

Is there any other type of job scheduling that I can use, so that all the applications can run in parallel with none left waiting?

Kafka Headers Support

I'm aware that this consumer supports 0.11 brokers. Does it provide access to the Kafka message headers API that was added in the 0.11 release?
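For reference, this is the headers API the question refers to, as exposed by the plain Kafka 0.11+ consumer client; whether this library's MessageAndMetadata surfaces it is exactly what is being asked, so the sketch below uses the vanilla client (broker address and topic are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class HeaderPeek {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "header-peek");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
      ConsumerRecords<String, String> records = consumer.poll(1000); // poll(long) in the 0.11 client
      for (ConsumerRecord<String, String> record : records) {
        // headers() is available from Kafka 0.11 onwards (KIP-82)
        for (Header header : record.headers()) {
          System.out.println(header.key() + " -> " + new String(header.value()));
        }
      }
    }
  }
}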

Unable to pull Batch info metrics using StreamingListener Interface

I have a job listener extending the StreamingListener interface to perform some tasks onBatchCompleted; it works well with native Spark Streaming and KafkaUtils, but it gives wrong values when used with this low-level consumer.

Here's the Listener class:

private class JobListenern(ssc: StreamingContext) extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {

    var totalProcessedRecords = 0L
    println("====> Total delay:  " + batchCompleted.batchInfo.totalDelay.getOrElse(-1) + " ms")

    batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
      totalProcessedRecords += infos.map(_.numRecords).sum        
    }
    println("\n=====> Recieved Events: "+ totalProcessedRecords)

  }
}

You can attach it to your ssc as:

val listen = new JobListenern(ssc)
ssc.addStreamingListener(listen)

ssc.start()
ssc.awaitTermination()

Let me know if there's some other way to pull batch info

AbstractMethodError with Spark 1.6.0 and Kafka 0.10.2

I'm trying to use this library with older versions of Spark (1.6.0-cdh5.11.1) and Kafka (0.10.2-kafka-2.2.0), but while trying to persist the offsets after the application logic has run, I get the error below.

It seems to me that it is a mismatch between Scala versions. For me it's not easy to switch to Scala 2.11, so I guess my question would be: is there a way to make your library work with my versions?

Below is the observed exception and the important bits of my pom file:

java.lang.AbstractMethodError: consumer.kafka.PartitionOffsetPair.call(Ljava/lang/Object;)Ljava/lang/Iterable;
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/11/17 12:02:52 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.AbstractMethodError: consumer.kafka.PartitionOffsetPair.call(Ljava/lang/Object;)Ljava/lang/Iterable;
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/11/17 12:02:52 INFO storage.DiskBlockManager: Shutdown hook called
               <dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.scala-lang</groupId>
					<artifactId>scala-library</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka_2.10</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.scala-lang</groupId>
					<artifactId>scala-library</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-spark</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>dibbhatt</groupId>
			<artifactId>kafka-spark-consumer</artifactId>
			<version>1.0.12</version>
		</dependency>

Consumer commit not proceeding the offset anymore

We are using kafka-spark-consumer version 1.0.6 and Kafka 0.8.2, and here are our configurations:

zookeeper.consumer.path=/listingmetrics-impressions-kafka
kafka.consumer.id=listingmetricsimpressionssparkconsumer-prod-2
group.id=ListingMetricsImpressionsSparkConsumer-prod
consumer.forcefromstart=false
consumer.fetchsizebytes=102400
consumer.fillfreqm=250
consumer.receivers=1

The issue we are facing is that the consumer commit is not advancing the offset anymore.

Here's what we found upon debugging the code:
PartitionManager -> next() method and then
-> fill() method
However, the KafkaUtils.fetchMessages call is not returning any validBytes, and no errors either.
The following call

ByteBufferMessageSet msgs =
          fetchResponse.messageSet(topic, _partition.partition);

has no messages and hence _emittedToOffset is not getting incremented

When the commit is called next in the flow, we are getting the following message:

"Last Enqueued offset "
    + _lastEnquedOffset
    + " not incremented since previous Comitted Offset "
    + _lastComittedOffset
    + " for partition  "
    + _partition
    + " for Consumer "
    + _ConsumerId
    + ". Some issue in Process!!"

Can you help us with the resolution? Let us know if you need any other info.
Thanks.

MEMORY_ONLY, MEMORY_ONLY_* storage levels

Hi,

Since the very first version of kafka-spark-consumer, the only storage levels I have been able to use are the disk-based ones such as MEMORY_AND_DISK_SER.

Unfortunately, this requires a lot of disk operations on the hosts running the Kafka receivers, and the disks then become a bottleneck. Disk I/O only happens on these hosts (the ones running the receivers); I don't know if this is the expected behavior.

When using MEMORY_ONLY or MEMORY_ONLY_SER, I quickly get a slew of exceptions:

15/05/06 16:25:11 WARN TaskSetManager: Lost task 69.0 in stage 3.0 (TID 141, node-00263.hadoop): java.lang.Exception:
Could not compute split, block input-1-1430922252737 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

MEMORY_ONLY_2 and MEMORY_ONLY_SER_2 work amazingly well for a few minutes and eventually fail the same way.

Your expertise would be more than welcome.

How many KafkaReceiver do I need?

I am trying to read from a single topic, which has 100 partitions.

The description of Kafka-Spark-Consumer mentions:

The logic will detect number of partitions for a topic and spawn that many Kafka Receivers.

But the example seems to suggest that the number of partitions has to be manually given, and that for each of them, a KafkaReceiver instance is required:

val kafkaStreams = (1 to partitions).map { i=>
    ssc.receiverStream(new KafkaReceiver(props, i))
}

Do I actually need to manually spawn 100 KafkaReceiver instances? Does it mean that 100 CPU cores have to be reserved for this?

Is there a way to have receivers manage multiple partitions instead of having to spawn that many receivers?

Thanks in advance for your help.
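For reference, other issues in this thread use the ReceiverLauncher entry point, which takes an explicit receiver count; below is a minimal sketch assuming it distributes the topic's partitions across that many receivers, so the receivers can be far fewer than the partitions. The property names other than kafka.consumer.id are illustrative and should be checked against the project README:

import java.util.Properties;

import consumer.kafka.MessageAndMetadata;
import consumer.kafka.ReceiverLauncher;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FewReceiversManyPartitions {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Property names are illustrative; check them against the project README.
    props.put("zookeeper.hosts", "localhost");
    props.put("zookeeper.port", "2181");
    props.put("kafka.topic", "my-topic");
    props.put("kafka.consumer.id", "sample-consumer");

    SparkConf conf = new SparkConf().setAppName("FewReceivers").setMaster("local[8]");
    JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(2000));

    // e.g. 100 partitions but only 4 receivers: each receiver would handle ~25 partitions,
    // so only 4 cores stay occupied by receiving tasks.
    int numberOfReceivers = 4;
    JavaDStream<MessageAndMetadata> unionStreams =
        ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_AND_DISK_SER());

    unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata>>() {
      @Override
      public void call(JavaRDD<MessageAndMetadata> rdd) {
        System.out.println("records in batch: " + rdd.count());
      }
    });

    jsc.start();
    jsc.awaitTermination();
  }
}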

java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver

Hello~ @dibbhatt
I used this framework to integrate Kafka and Spark Streaming, but I have some problems.
My cluster is based on Spark 2.0.0, Kafka 0.10.1.0 and ZooKeeper 3.4.9.
I also fixed some compile errors in "ProcessedOffsetManager.java", because there are some API differences between Spark 1.6.0 and Spark 2.0.0.

package consumer.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;

import scala.Tuple2;
import scala.reflect.ClassTag;

import com.google.common.collect.ImmutableMap;

public class ProcessedOffsetManager {

  private static void persistProcessedOffsets(Properties props, Map<Integer, Long> partitionOffsetMap) {
    ZkState state = new ZkState(props.getProperty(Config.ZOOKEEPER_CONSUMER_CONNECTION));
    for (Map.Entry<Integer, Long> po : partitionOffsetMap.entrySet()) {
      Map<Object, Object> data = (Map<Object, Object>) ImmutableMap
          .builder()
          .put("consumer", ImmutableMap.of("id", props.getProperty(Config.KAFKA_CONSUMER_ID)))
          .put("offset", po.getValue())
          .put("partition", po.getKey())
          .put("broker", ImmutableMap.of("host", "", "port", ""))
          .put("topic", props.getProperty(Config.KAFKA_TOPIC)).build();
      String path = processedPath(po.getKey(), props);
      try {
        state.writeJSON(path, data);
      } catch (Exception ex) {
        state.close();
        throw ex;
      }
    }
    state.close();
  }

  private static String processedPath(int partition, Properties props) {
    return props.getProperty(Config.ZOOKEEPER_CONSUMER_PATH)
        + "/" + props.getProperty(Config.KAFKA_CONSUMER_ID) + "/"
        + props.getProperty(Config.KAFKA_TOPIC)
        + "/processed/" + "partition_" + partition;
  }

  public static JavaPairDStream<Integer, Iterable<Long>> getPartitionOffset(JavaDStream<MessageAndMetadata> unionStreams) {

// JavaPairDStream<Integer, Long> partitonOffsetStream = unionStreams.mapPartitionsToPair
// (new PairFlatMapFunction<Iterator, Integer, Long>() {
// @OverRide
// public Iterable<Tuple2<Integer, Long>> call(Iterator entry) throws Exception {
// MessageAndMetadata mmeta = null;
// List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
// while(entry.hasNext()) {
// mmeta = entry.next();
// }
// if(mmeta != null) {
// l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition,mmeta.getOffset()));
// }
// return l;
// }
// });
    JavaPairDStream<Integer, Long> partitonOffsetStream = unionStreams.mapPartitionsToPair(
      new PairFlatMapFunction<Iterator<MessageAndMetadata>, Integer, Long>() {

      @Override
      public Iterator<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> entry) throws Exception {
        MessageAndMetadata mmeta = null;
        List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
        while(entry.hasNext()) {
          mmeta = entry.next();
        }
        if(mmeta != null) {
          l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition,mmeta.getOffset()));
        }
        return l.iterator();
      }
    });
JavaPairDStream<Integer, Iterable<Long>> partitonOffset = partitonOffsetStream.groupByKey(1);
return partitonOffset;

}

  @SuppressWarnings("deprecation")
  public static void persists(JavaPairDStream<Integer, Iterable<Long>> partitonOffset, Properties props) {
    // spark 2.0.0
    partitonOffset.foreachRDD(new VoidFunction<JavaPairRDD<Integer, Iterable<Long>>>() {
      @Override
      public void call(JavaPairRDD<Integer, Iterable<Long>> po) throws Exception {
        List<Tuple2<Integer, Iterable<Long>>> poList = po.collect();
        System.out.println("ProcessedOffsetManager persist list size = " + poList.size());
        Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
        for (Tuple2<Integer, Iterable<Long>> tuple : poList) {
          int partition = tuple._1();
          Long offset = getMaximum(tuple._2());
          partitionOffsetMap.put(partition, offset);
        }
        persistProcessedOffsets(props, partitionOffsetMap);
      }

  public <T extends Comparable<T>> T getMaximum(Iterable<T> values) {
    T max = null;
    for (T value : values) {
      if (max == null || max.compareTo(value) < 0) {
        max = value;
      }
    }
    return max;
  }
});

//spark 1.6.0

// partitonOffset.foreachRDD(new Function<JavaPairRDD<Integer,Iterable>, Void>() {
// @OverRide
// public Void call(JavaPairRDD<Integer, Iterable> po) throws Exception {
// List<Tuple2<Integer, Iterable>> poList = po.collect();
// Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
// for(Tuple2<Integer, Iterable> tuple : poList) {
// int partition = tuple._1();
// Long offset = getMaximum(tuple._2());
// partitionOffsetMap.put(partition, offset);
// }
// persistProcessedOffsets(props, partitionOffsetMap);
// return null;
// }
// public <T extends Comparable> T getMaximum(Iterable values) {
// T max = null;
// for (T value : values) {
// if (max == null || max.compareTo(value) < 0) {
// max = value;
// }
// }
// return max;
// }
// });
}

  public static DStream<Tuple2<Integer, Iterable<Long>>> getPartitionOffset(DStream<MessageAndMetadata> unionStreams) {
    ClassTag<MessageAndMetadata> messageMetaClassTag =
        ScalaUtil.getClassTag(MessageAndMetadata.class);
    JavaDStream<MessageAndMetadata> javaDStream =
        new JavaDStream<MessageAndMetadata>(unionStreams, messageMetaClassTag);
// JavaPairDStream<Integer, Long> partitonOffsetStream = javaDStream.mapPartitionsToPair
// (new PairFlatMapFunction<Iterator, Integer, Long>() {
// @OverRide
// public Iterable<Tuple2<Integer, Long>> call(Iterator entry)
// throws Exception {
// MessageAndMetadata mmeta = null;
// List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
// while(entry.hasNext()) {
// mmeta = entry.next();
// }
// if(mmeta != null) {
// l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition,mmeta.getOffset()));
// }
// return l;
// }
// });
    JavaPairDStream<Integer, Long> partitonOffsetStream = javaDStream.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<MessageAndMetadata>, Integer, Long>() {
          @Override
          public Iterator<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> entry) throws Exception {
            MessageAndMetadata mmeta = null;
            List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
            while (entry.hasNext()) {
              mmeta = entry.next();
            }
            if (mmeta != null) {
              l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition, mmeta.getOffset()));
            }
            return l.iterator();
          }
        });
    JavaPairDStream<Integer, Iterable<Long>> partitonOffset = partitonOffsetStream.groupByKey(1);
    return partitonOffset.dstream();
  }

  @SuppressWarnings("deprecation")
  public static void persists(DStream<Tuple2<Integer, Iterable<Long>>> partitonOffset, Properties props) {
    ClassTag<Tuple2<Integer, Iterable<Long>>> tuple2ClassTag =
        ScalaUtil.<Integer, Iterable<Long>>getTuple2ClassTag();
    JavaDStream<Tuple2<Integer, Iterable<Long>>> jpartitonOffset =
        new JavaDStream<Tuple2<Integer, Iterable<Long>>>(partitonOffset, tuple2ClassTag);

//spark 2.0.0
jpartitonOffset.foreachRDD(new VoidFunction<JavaRDD<Tuple2<Integer, Iterable<Long>>>>() {
  @Override
  public void call(JavaRDD<Tuple2<Integer, Iterable<Long>>> po) throws Exception {
    List<Tuple2<Integer, Iterable<Long>>> poList = po.collect();
    Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
    for(Tuple2<Integer, Iterable<Long>> tuple : poList) {
      int partition = tuple._1();
      Long offset = getMaximum(tuple._2());
      partitionOffsetMap.put(partition, offset);
    }
    persistProcessedOffsets(props, partitionOffsetMap);
  }
  public <T extends Comparable<T>> T getMaximum(Iterable<T> values) {
    T max = null;
    for (T value : values) {
      if (max == null || max.compareTo(value) < 0) {
        max = value;
      }
    }
    return max;
  }
});

//spark 1.6.0

// jpartitonOffset.foreachRDD(new Function<JavaRDD<Tuple2<Integer, Iterable>>, Void>() {
// @OverRide
// public Void call(JavaRDD<Tuple2<Integer, Iterable>> po) throws Exception {
// List<Tuple2<Integer, Iterable>> poList = po.collect();
// Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
// for(Tuple2<Integer, Iterable> tuple : poList) {
// int partition = tuple._1();
// Long offset = getMaximum(tuple._2());
// partitionOffsetMap.put(partition, offset);
// }
// persistProcessedOffsets(props, partitionOffsetMap);
// return null;
// }
// public <T extends Comparable> T getMaximum(Iterable values) {
// T max = null;
// for (T value : values) {
// if (max == null || max.compareTo(value) < 0) {
// max = value;
// }
// }
// return max;
// }
// });
}
}

I then built the project as a new jar and ran "SampleConsumer.java" with my settings. It starts, but Spark has many failed jobs caused by "java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver", so it receives no data.

Error Logs:

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: java.io.IOException: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70) at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253) ... 19 more ERROR: org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 19.0 failed 4 times; aborting job ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Receiver has been stopped. Try to restart it. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 141, 10.100.3.90): java.io.IOException: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260) at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70) at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

Why does this exception appear?
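For reference, a ClassNotFoundException raised on the executors usually means the rebuilt consumer jar is only on the driver's classpath (for example, it was not passed via spark-submit --jars). A minimal sketch of shipping it explicitly through SparkConf; the jar path is hypothetical:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ShipConsumerJar {

  public static JavaStreamingContext createContext() {
    SparkConf conf = new SparkConf()
        .setAppName("KafkaReceiver")
        // Hypothetical path to the jar built from this project; executors
        // download the listed jars from the driver when tasks are launched.
        .setJars(new String[] { "/path/to/kafka-spark-consumer.jar" });

    // Build the ReceiverLauncher stream on this context as in the other examples.
    return new JavaStreamingContext(conf, new Duration(2000));
  }
}

Passing the same jar with spark-submit --jars achieves the same effect without changing the code.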

kafka-spark-consumer. v1.0.10

INFO ZooKeeper: Session: 0x35d4c588af300df closed
INFO ClientCnxn: EventThread shut down
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at consumer.kafka.KafkaConsumer.run(KafkaConsumer.java:123)
at java.lang.Thread.run(Thread.java:745)
WARN NettyRpcEndpointRef: Error sending message [message = DeregisterReceiver(2,Stopped by driver,)] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)

Kafka Apache Spark Streaming receives 0 data

I have launched the code below

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.google.common.collect.Lists;

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount")
                .set("spark.master", "local[4]");
        // Create the context with a 1 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(2000));

        // int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        // String[] topics = args[2].split(",");
        // for (String topic : topics) {
        topicMap.put("page_visits", 1);
        // }

        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
                .createStream(jssc, "127.0.0.1", "0", topicMap);

        JavaDStream<String> lines = messages
                .map(new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        return tuple2._2();
                    }
                });

        JavaDStream<String> words = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String x) {
                        return Lists.newArrayList(SPACE.split(x));
                    }
                });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

but no data is received; the output is only:

-------------------------------------------
Time: 1434192136000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192138000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192140000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192142000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192144000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192146000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192148000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192150000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192152000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192154000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192156000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192158000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192160000 ms
-------------------------------------------

-------------------------------------------
Time: 1434192162000 ms
-------------------------------------------

any solution please !

Some jobs remain in processing state forever

Hi Dibbhatt,

My application is running normally except that the UI shows some jobs remain in processing state forever.

Following is the screenshot of the same:

(screenshot from 2015-09-22 17:55:25)

Regards,
Sorabh
