signifai / spark-pubsub
Google Cloud Pubsub connector for Spark Streaming
License: Apache License 2.0
I am trying to get this library working and am not having any luck... I'd appreciate any help!
Here is the code:
from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils
from pyspark import SparkContext
SUBSCRIPTION = "projects/bigdata220-final-project/subscriptions/out_meetup_rsvp"
sc = SparkContext()
ssc = StreamingContext(sc, 1)
pubsubStream = PubsubUtils.createStream(ssc, SUBSCRIPTION, 5, False)
pubsubStream.flatMap(lambda x: x).pprint()
ssc.start()
Here are the logs from GCP:
18/03/11 05:00:38 INFO org.spark_project.jetty.util.log: Logging initialized @2825ms
18/03/11 05:00:38 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT
18/03/11 05:00:38 INFO org.spark_project.jetty.server.Server: Started @2951ms
18/03/11 05:00:39 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@5625b833{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/03/11 05:00:39 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.3-hadoop2
18/03/11 05:00:40 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at cluster-main-m/10.128.0.5:8032
18/03/11 05:00:43 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1519879216511_0007
18/03/11 05:00:52 WARN org.apache.spark.streaming.StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.
[Stage 0:=======================================================> (49 + 1) / 50]
[Stage 1:================================================> (17 + 1) / 20]
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Slide time = 1000 ms
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Storage level = Serialized 1x Replicated
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Checkpoint interval = null
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Remember interval = 1000 ms
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Initialized and validated io.signifai.pubsub_spark.receiver.PubsubInputDStream@395a0746
18/03/11 05:01:00 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for time 1520744460000 ms
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
at com.sun.proxy.$Proxy27.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 ERROR org.apache.spark.streaming.api.python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
at com.sun.proxy.$Proxy27.call(Unknown Source)
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 WARN org.apache.spark.streaming.scheduler.ReceiverTracker: Not all of the receivers have deregistered, ArrayBuffer(0)
18/03/11 05:01:00 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
18/03/11 05:01:00 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
18/03/11 05:01:00 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@5625b833{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
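The final ERROR lines ("Cannot connect to Python process. It's probably dead.") suggest the Python driver had already exited: the script above ends right after `ssc.start()`, so nothing keeps the driver process (and its Py4J callback server) alive when the JVM asks Python to run the next batch. A common remedy is to block after starting the context. The helper below is a minimal sketch of that pattern; `start_and_block` and `timeout_seconds` are hypothetical names, while `start()`, `awaitTermination()`, `awaitTerminationOrTimeout()` and `stop(stopSparkContext, stopGraceFully)` are standard `pyspark.streaming.StreamingContext` methods.

```python
def start_and_block(ssc, timeout_seconds=None):
    """Start a StreamingContext and keep the Python driver alive while it runs.

    `ssc` is assumed to be a pyspark.streaming.StreamingContext whose DStreams
    and output operations (e.g. pprint) are already defined.
    """
    ssc.start()
    if timeout_seconds is None:
        # Block until the context is stopped or a batch fails.
        ssc.awaitTermination()
    else:
        # Block for at most timeout_seconds, then stop gracefully so that
        # already-received data is processed before shutdown.
        ssc.awaitTerminationOrTimeout(timeout_seconds)
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
```

In the script above, this would mean replacing the final `ssc.start()` with `start_and_block(ssc)`. The timeout branch is mainly useful for tests or bounded runs; for a long-lived job, blocking indefinitely with `awaitTermination()` is the usual choice.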
Hi, I'm trying to use your library to create a PySpark Streaming pipeline using Pub/Sub.
I've followed your guide and written this code:
import os

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils

# transform, updateFunc and initialStateRDD are defined elsewhere (not shown)

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = \
    os.path.join(os.curdir, "config", "google", "sa-pubsub.json")

config = SparkConf() \
    .setAppName("streaming_pipeline") \
    .setMaster("local[2]") \
    .set("spark.jars", "jars/spark_pubsub-1.1-SNAPSHOT.jar")
# local[2]: the Pub/Sub receiver occupies one core, so local[1] leaves none for processing
spark_context = SparkContext(conf=config)
spark_context.setLogLevel("ERROR")
streaming_context = StreamingContext(spark_context, 1)
streaming_context.checkpoint("streaming_checkpoints")
socket = PubsubUtils.createStream(
    streaming_context,
    "projects/project_id/subscriptions/pyspark_subscription",
    5,
    False
)
messages = socket \
    .flatMap(transform.flat_map) \
    .map(transform.map_function) \
    .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
messages.pprint()
streaming_context.start()
streaming_context.awaitTermination()
But I receive this error:
Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error while fetching messages from pubsub for subscription projects/project_id/subscriptions/pyspark_subscription - com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
"code" : 404,
"errors" : [ {
"domain" : "global",
"message" : "Resource not found (resource=pyspark_subscription).",
"reason" : "notFound"
} ],
"message" : "Resource not found (resource=pyspark_subscription).",
"status" : "NOT_FOUND"
}
Can you help me?
Thank you
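The 404 body says Pub/Sub found no subscription named pyspark_subscription under the project given in the path, so the first things to check are that the subscription really exists in that project (for example with `gcloud pubsub subscriptions list --project=<your-project>`) and that the path names the project that owns it. Note that `project_id` contains an underscore, which real GCP project IDs cannot; if it is not just a redacted placeholder, the path can never match. The sketch below builds the full path and rejects IDs that break the naming rules before Spark starts. The function name is hypothetical, and the regular expressions encode an assumed reading of the GCP naming rules (project IDs: 6 to 30 lowercase letters, digits or hyphens, starting with a letter and not ending with a hyphen; subscription IDs: 3 to 255 letters, digits or `_.~+%-`, starting with a letter and not starting with `goog`), so they are worth double-checking against the current docs.

```python
import re

# Assumed naming rules (see lead-in); verify against the current GCP docs.
_PROJECT_ID = re.compile(r"[a-z][a-z0-9-]{4,28}[a-z0-9]")
_SUBSCRIPTION_ID = re.compile(r"[A-Za-z][A-Za-z0-9_.~+%-]{2,254}")


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Return projects/<project>/subscriptions/<subscription>.

    Raises ValueError for IDs that cannot name a real subscription, which
    would otherwise only surface later as an error from the receiver.
    """
    if not _PROJECT_ID.fullmatch(project_id):
        raise ValueError(f"not a valid GCP project ID: {project_id!r}")
    if (not _SUBSCRIPTION_ID.fullmatch(subscription_id)
            or subscription_id.startswith("goog")):
        raise ValueError(f"not a valid Pub/Sub subscription ID: {subscription_id!r}")
    return f"projects/{project_id}/subscriptions/{subscription_id}"
```

A well-formed path can still return 404 if the subscription lives in a different project, so this only catches malformed IDs, not missing resources.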
From Amir:
[12:30] because in line 46 we are casting the binary data into a string
[12:30] So I'm not sure if we can pass the data as binary
[12:31] or we can just translate it again into base64 and decode it once in the python code
[12:31] another option would be to just decompress and open the msgpack in the java code
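To illustrate the point above: arbitrary binary data (such as a compressed payload) is generally not valid text, so converting it to a string and back loses bytes, whereas base64 turns it into plain ASCII that survives any string conversion and can be decoded exactly on the Python side. The sketch below is Python-only and assumes zlib as the compression (the chat does not say which is used); the function names are hypothetical. On the JVM side, the counterpart of `encode_for_string_channel` would be something like `java.util.Base64.getEncoder().encodeToString(bytes)`.

```python
import base64
import zlib


def encode_for_string_channel(payload: bytes) -> str:
    """Base64-encode bytes into pure ASCII text (what the JVM side would send)."""
    return base64.b64encode(payload).decode("ascii")


def decode_in_python(text: str) -> bytes:
    """Recover the exact original bytes from the base64 text."""
    return base64.b64decode(text)


# Hypothetical stand-in for a real compressed message body.
message = b'{"event": "rsvp", "id": 42}'
original = zlib.compress(message)

# Naive path: treat the bytes as UTF-8 text and convert back. Invalid byte
# sequences become U+FFFD replacement characters, so the data is corrupted.
lossy = original.decode("utf-8", errors="replace").encode("utf-8")

# Base64 path: a lossless round trip.
restored = decode_in_python(encode_for_string_channel(original))
```

Base64 inflates the payload by about a third, which is the trade-off against the other option of decompressing and decoding the msgpack on the JVM side.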