Hello~ @dibbhatt
I used this framework to integrate Kafka and Spark Streaming, but I have run into some problems.
My cluster runs Spark 2.0.0, Kafka 0.10.1.0, and ZooKeeper 3.4.9.
I also had to fix some compile errors in `ProcessedOffsetManager.java`, because the Java API changed between Spark 1.6.0 and Spark 2.0.0.
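As far as I can tell, the two relevant changes are that `PairFlatMapFunction.call(...)` now returns an `Iterator` instead of an `Iterable`, and that `foreachRDD` now takes a `VoidFunction` instead of a `Function<..., Void>`. Here is a minimal compilable sketch of the new `call(...)` shape (the class name, element type, and placeholder partition id are mine, not from the framework):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

// Spark 2.x: call(...) must return an Iterator, not an Iterable as in 1.6.x.
public class LastOffsetSketch implements PairFlatMapFunction<Iterator<Long>, Integer, Long> {
  @Override
  public Iterator<Tuple2<Integer, Long>> call(Iterator<Long> offsets) {
    List<Tuple2<Integer, Long>> out = new ArrayList<Tuple2<Integer, Long>>();
    Long last = null;
    while (offsets.hasNext()) {
      last = offsets.next(); // keep only the last offset of the partition
    }
    if (last != null) {
      out.add(new Tuple2<Integer, Long>(0, last)); // partition id 0 is a placeholder
    }
    return out.iterator(); // Spark 2.x returns an Iterator here
  }
}
```

Here is my full modified file: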
```java
package consumer.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;

import scala.Tuple2;
import scala.reflect.ClassTag;

import com.google.common.collect.ImmutableMap;

public class ProcessedOffsetManager {
  private static void persistProcessedOffsets(Properties props, Map<Integer, Long> partitionOffsetMap) {
    ZkState state = new ZkState(props.getProperty(Config.ZOOKEEPER_CONSUMER_CONNECTION));
    for (Map.Entry<Integer, Long> po : partitionOffsetMap.entrySet()) {
      Map<Object, Object> data = (Map<Object, Object>) ImmutableMap
          .builder()
          .put("consumer", ImmutableMap.of("id", props.getProperty(Config.KAFKA_CONSUMER_ID)))
          .put("offset", po.getValue())
          .put("partition", po.getKey())
          .put("broker", ImmutableMap.of("host", "", "port", ""))
          .put("topic", props.getProperty(Config.KAFKA_TOPIC)).build();
      String path = processedPath(po.getKey(), props);
      try {
        state.writeJSON(path, data);
      } catch (Exception ex) {
        state.close();
        throw ex;
      }
    }
    state.close();
  }
  private static String processedPath(int partition, Properties props) {
    return props.getProperty(Config.ZOOKEEPER_CONSUMER_PATH)
        + "/" + props.getProperty(Config.KAFKA_CONSUMER_ID) + "/"
        + props.getProperty(Config.KAFKA_TOPIC)
        + "/processed/" + "partition_" + partition;
  }
  // Spark 2.x: PairFlatMapFunction.call(...) returns an Iterator instead of an Iterable.
  public static JavaPairDStream<Integer, Iterable<Long>> getPartitionOffset(
      JavaDStream<MessageAndMetadata> unionStreams) {
    // Spark 1.6.0 version:
    // JavaPairDStream<Integer, Long> partitonOffsetStream = unionStreams.mapPartitionsToPair
    //     (new PairFlatMapFunction<Iterator, Integer, Long>() {
    //       @Override
    //       public Iterable<Tuple2<Integer, Long>> call(Iterator entry) throws Exception {
    //         MessageAndMetadata mmeta = null;
    //         List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
    //         while (entry.hasNext()) {
    //           mmeta = entry.next();
    //         }
    //         if (mmeta != null) {
    //           l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition, mmeta.getOffset()));
    //         }
    //         return l;
    //       }
    //     });
    JavaPairDStream<Integer, Long> partitonOffsetStream = unionStreams.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<MessageAndMetadata>, Integer, Long>() {
          @Override
          public Iterator<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> entry) throws Exception {
            // Drain the partition iterator; the last message holds the highest processed offset.
            MessageAndMetadata mmeta = null;
            List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
            while (entry.hasNext()) {
              mmeta = entry.next();
            }
            if (mmeta != null) {
              l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition, mmeta.getOffset()));
            }
            return l.iterator();
          }
        });
    JavaPairDStream<Integer, Iterable<Long>> partitonOffset = partitonOffsetStream.groupByKey(1);
    return partitonOffset;
  }
@SuppressWarnings("deprecation")
public static void persists(JavaPairDStream<Integer, Iterable> partitonOffset, Properties props) {
//spark 2.0.0
partitonOffset.foreachRDD(new VoidFunction<JavaPairRDD<Integer, Iterable>>() {
@OverRide
public void call(JavaPairRDD<Integer, Iterable> po) throws Exception {
List<Tuple2<Integer, Iterable>> poList = po.collect();
System.out.println("ProcessedOffsetManager persist list size = " + poList.size());
Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
for(Tuple2<Integer, Iterable> tuple : poList) {
int partition = tuple._1();
Long offset = getMaximum(tuple._2());
partitionOffsetMap.put(partition, offset);
}
persistProcessedOffsets(props, partitionOffsetMap);
}
public <T extends Comparable<T>> T getMaximum(Iterable<T> values) {
T max = null;
for (T value : values) {
if (max == null || max.compareTo(value) < 0) {
max = value;
}
}
return max;
}
});
//spark 1.6.0
// partitonOffset.foreachRDD(new Function<JavaPairRDD<Integer,Iterable>, Void>() {
// @OverRide
// public Void call(JavaPairRDD<Integer, Iterable> po) throws Exception {
// List<Tuple2<Integer, Iterable>> poList = po.collect();
// Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
// for(Tuple2<Integer, Iterable> tuple : poList) {
// int partition = tuple._1();
// Long offset = getMaximum(tuple._2());
// partitionOffsetMap.put(partition, offset);
// }
// persistProcessedOffsets(props, partitionOffsetMap);
// return null;
// }
// public <T extends Comparable> T getMaximum(Iterable values) {
// T max = null;
// for (T value : values) {
// if (max == null || max.compareTo(value) < 0) {
// max = value;
// }
// }
// return max;
// }
// });
}
  public static DStream<Tuple2<Integer, Iterable<Long>>> getPartitionOffset(DStream<MessageAndMetadata> unionStreams) {
    ClassTag messageMetaClassTag =
        ScalaUtil.getClassTag(MessageAndMetadata.class);
    JavaDStream<MessageAndMetadata> javaDStream =
        new JavaDStream<MessageAndMetadata>(unionStreams, messageMetaClassTag);
    // Spark 1.6.0 version:
    // JavaPairDStream<Integer, Long> partitonOffsetStream = javaDStream.mapPartitionsToPair
    //     (new PairFlatMapFunction<Iterator, Integer, Long>() {
    //       @Override
    //       public Iterable<Tuple2<Integer, Long>> call(Iterator entry)
    //           throws Exception {
    //         MessageAndMetadata mmeta = null;
    //         List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
    //         while (entry.hasNext()) {
    //           mmeta = entry.next();
    //         }
    //         if (mmeta != null) {
    //           l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition, mmeta.getOffset()));
    //         }
    //         return l;
    //       }
    //     });
    JavaPairDStream<Integer, Long> partitonOffsetStream = javaDStream.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<MessageAndMetadata>, Integer, Long>() {
          @Override
          public Iterator<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> entry) throws Exception {
            MessageAndMetadata mmeta = null;
            List<Tuple2<Integer, Long>> l = new ArrayList<Tuple2<Integer, Long>>();
            while (entry.hasNext()) {
              mmeta = entry.next();
            }
            if (mmeta != null) {
              l.add(new Tuple2<Integer, Long>(mmeta.getPartition().partition, mmeta.getOffset()));
            }
            return l.iterator();
          }
        });
    JavaPairDStream<Integer, Iterable<Long>> partitonOffset = partitonOffsetStream.groupByKey(1);
    return partitonOffset.dstream();
  }
@SuppressWarnings("deprecation")
public static void persists(DStream<Tuple2<Integer, Iterable>> partitonOffset, Properties props) {
ClassTag<Tuple2<Integer, Iterable>> tuple2ClassTag =
ScalaUtil.<Integer, Iterable>getTuple2ClassTag();
JavaDStream<Tuple2<Integer, Iterable>> jpartitonOffset =
new JavaDStream<Tuple2<Integer, Iterable>>(partitonOffset, tuple2ClassTag);
//spark 2.0.0
jpartitonOffset.foreachRDD(new VoidFunction<JavaRDD<Tuple2<Integer, Iterable<Long>>>>() {
@Override
public void call(JavaRDD<Tuple2<Integer, Iterable<Long>>> po) throws Exception {
List<Tuple2<Integer, Iterable<Long>>> poList = po.collect();
Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
for(Tuple2<Integer, Iterable<Long>> tuple : poList) {
int partition = tuple._1();
Long offset = getMaximum(tuple._2());
partitionOffsetMap.put(partition, offset);
}
persistProcessedOffsets(props, partitionOffsetMap);
}
public <T extends Comparable<T>> T getMaximum(Iterable<T> values) {
T max = null;
for (T value : values) {
if (max == null || max.compareTo(value) < 0) {
max = value;
}
}
return max;
}
});
//spark 1.6.0
// jpartitonOffset.foreachRDD(new Function<JavaRDD<Tuple2<Integer, Iterable>>, Void>() {
// @OverRide
// public Void call(JavaRDD<Tuple2<Integer, Iterable>> po) throws Exception {
// List<Tuple2<Integer, Iterable>> poList = po.collect();
// Map<Integer, Long> partitionOffsetMap = new HashMap<Integer, Long>();
// for(Tuple2<Integer, Iterable> tuple : poList) {
// int partition = tuple._1();
// Long offset = getMaximum(tuple._2());
// partitionOffsetMap.put(partition, offset);
// }
// persistProcessedOffsets(props, partitionOffsetMap);
// return null;
// }
// public <T extends Comparable> T getMaximum(Iterable values) {
// T max = null;
// for (T value : values) {
// if (max == null || max.compareTo(value) < 0) {
// max = value;
// }
// }
// return max;
// }
// });
}
}
```
I then built the project into a new jar and ran `SampleConsumer.java` with my settings. The job starts, but Spark shows many failed jobs caused by `java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver`, so it ends up receiving 0 data.
Error logs:

```
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.io.IOException: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
  at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
  at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
  at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
  at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
  at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
  at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
  ... 19 more
ERROR: org.apache.spark.scheduler.TaskSetManager - Task 0 in stage 19.0 failed 4 times; aborting job
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Receiver has been stopped. Try to restart it.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 4 times, most recent failure: Lost task 0.3 in stage 19.0 (TID 141, 10.100.3.90): java.io.IOException: java.lang.ClassNotFoundException: consumer.kafka.client.KafkaReceiver
  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
  at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
  at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
```
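From the trace, my guess is that the jar containing `consumer.kafka.client.KafkaReceiver` reaches the driver but is not on the executor classpath, so the receiver cannot be deserialized there. I am trying to fix it by shipping the jar with the job, either via `spark-submit --jars` or on the `SparkConf`. A minimal sketch of the latter (the class name, app name, and jar path are placeholders for my actual build output, not from the framework):

```java
import org.apache.spark.SparkConf;

public class SubmitSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("kafka-spark-consumer-test") // placeholder app name
        // Ship the consumer jar to every executor so that
        // consumer.kafka.client.KafkaReceiver can be deserialized there.
        // The path is a placeholder for my built jar.
        .setJars(new String[] { "/path/to/kafka-spark-consumer.jar" });
    // ... build the JavaStreamingContext from this conf as in SampleConsumer ...
  }
}
```

@dibbhatt, does that sound like the right fix, or am I missing something in how the jar should be built and submitted?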