
spark-pubsub's People

Contributors

bmahe, guyfig


spark-pubsub's Issues

Job crashes with py4j.Py4JException: Cannot obtain a new communication channel

I am trying to get this library working and not having any luck... I'd appreciate any help!

Here is the code:

from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils
from pyspark import SparkContext


SUBSCRIPTION = "projects/bigdata220-final-project/subscriptions/out_meetup_rsvp"

sc = SparkContext()
ssc = StreamingContext(sc, 1)
pubsubStream = PubsubUtils.createStream(ssc, SUBSCRIPTION, 5, False)
pubsubStream.flatMap(lambda x: x).pprint()
ssc.start()

Here are the logs from GCP:

18/03/11 05:00:38 INFO org.spark_project.jetty.util.log: Logging initialized @2825ms
18/03/11 05:00:38 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT
18/03/11 05:00:38 INFO org.spark_project.jetty.server.Server: Started @2951ms
18/03/11 05:00:39 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@5625b833{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/03/11 05:00:39 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.3-hadoop2
18/03/11 05:00:40 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at cluster-main-m/10.128.0.5:8032
18/03/11 05:00:43 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1519879216511_0007
18/03/11 05:00:52 WARN org.apache.spark.streaming.StreamingContext: Dynamic Allocation is enabled for this application. Enabling Dynamic allocation for Spark Streaming applications can cause data loss if Write Ahead Log is not enabled for non-replayable sources like Flume. See the programming guide for details on how to enable the Write Ahead Log.

[Stage 0:>                                                         (0 + 0) / 50]
[Stage 0:>                                                         (0 + 1) / 50]
[Stage 0:=>                                                        (1 + 1) / 50]
[Stage 0:=====>                                                    (5 + 1) / 50]
[Stage 0:===========>                                             (10 + 1) / 50]
[Stage 0:=================>                                       (15 + 1) / 50]
[Stage 0:==========================>                              (23 + 1) / 50]
[Stage 0:===================================>                     (31 + 1) / 50]
[Stage 0:============================================>            (39 + 1) / 50]
[Stage 0:=======================================================> (49 + 1) / 50]
[Stage 1:================================================>        (17 + 1) / 20]
                                                                                
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Slide time = 1000 ms
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Storage level = Serialized 1x Replicated
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Checkpoint interval = null
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Remember interval = 1000 ms
18/03/11 05:00:59 INFO io.signifai.pubsub_spark.receiver.PubsubInputDStream: Initialized and validated io.signifai.pubsub_spark.receiver.PubsubInputDStream@395a0746
18/03/11 05:01:00 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for time 1520744460000 ms
py4j.Py4JException: Cannot obtain a new communication channel
	at py4j.CallbackClient.sendCommand(CallbackClient.java:340)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:316)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:103)
	at com.sun.proxy.$Proxy27.call(Unknown Source)
	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/03/11 05:01:00 ERROR org.apache.spark.streaming.api.python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
	... (same stack trace as above)
18/03/11 05:01:00 ERROR org.apache.spark.streaming.api.python.PythonDStream$$anon$1: Cannot connect to Python process. It's probably dead. Stopping StreamingContext.
py4j.Py4JException: Cannot obtain a new communication channel
	... (same stack trace as above)
18/03/11 05:01:00 WARN org.apache.spark.streaming.scheduler.ReceiverTracker: Not all of the receivers have deregistered, ArrayBuffer(0)
18/03/11 05:01:00 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
18/03/11 05:01:00 WARN org.apache.spark.streaming.StreamingContext: StreamingContext has already been stopped
18/03/11 05:01:00 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@5625b833{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
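
A likely cause here is that the script returns immediately after ssc.start(): with no ssc.awaitTermination(), the Python driver process exits, and the JVM can no longer reach the py4j callback server it needs to run the pprint() output on each batch, which matches "Cannot connect to Python process. It's probably dead." A minimal sketch of the same job with the driver kept alive (same subscription as above):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils

SUBSCRIPTION = "projects/bigdata220-final-project/subscriptions/out_meetup_rsvp"

sc = SparkContext()
ssc = StreamingContext(sc, 1)

pubsubStream = PubsubUtils.createStream(ssc, SUBSCRIPTION, 5, False)
pubsubStream.flatMap(lambda x: x).pprint()

ssc.start()
ssc.awaitTermination()  # keeps the Python driver alive so py4j callbacks keep working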

Could not resolve dependencies for project

I am trying to get this library working and not having any luck... I'd appreciate any help!
When executing the mvn clean install command inside the java directory, I receive the error below:

(screenshot: Screen Shot 2020-12-23 at 12:05:11, showing the Maven dependency resolution error)
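
Since the failing artifact isn't identified above, a generic first step for a Maven "Could not resolve dependencies" failure (standard Maven flags, not specific to this project) is to force a refresh of cached dependencies and print the full error trace:

mvn clean install -U -e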

404 Resource not found (resource=pyspark_subscription).

Hi, I'm trying to use your library to create a PySpark streaming pipeline with Pub/Sub.
I've followed your guide and written this code:

import os

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from signifai.pubsub import PubsubUtils

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = \
        os.path.join(os.curdir, "config", "google", "sa-pubsub.json")

config = SparkConf() \
        .setAppName("streaming_pipeline") \
        .setMaster("local[1]") \
        .set("spark.jars", "jars/spark_pubsub-1.1-SNAPSHOT.jar")

spark_context = SparkContext(conf=config)

spark_context.setLogLevel("ERROR")

streaming_context = StreamingContext(spark_context, 1)
streaming_context.checkpoint("streaming_checkpoints")

socket = PubsubUtils.createStream(
        streaming_context,
        "projects/project_id/subscriptions/pyspark_subscription",
        5,
        False
    )

# transform, updateFunc and initialStateRDD are defined elsewhere in my project
messages = socket \
        .flatMap(transform.flat_map) \
        .map(transform.map_function) \
        .updateStateByKey(updateFunc, initialRDD=initialStateRDD)

messages.pprint()

streaming_context.start()
streaming_context.awaitTermination()

But I receive this error:

Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error while fetching messages from pubsub for subscription projects/project_id/subscriptions/pyspark_subscription - com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Resource not found (resource=pyspark_subscription).",
    "reason" : "notFound"
  } ],
  "message" : "Resource not found (resource=pyspark_subscription).",
  "status" : "NOT_FOUND"
}

Can you help me?
Thank you
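
The 404 suggests the fully qualified subscription name does not resolve: in the path above, project_id has to be the actual GCP project ID that owns the subscription, and the subscription itself must already exist in that project. Assuming the gcloud CLI is available, one way to verify both (using the same placeholder names as in the code):

gcloud pubsub subscriptions describe pyspark_subscription --project=project_id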

Update doc

  • General cleanup
  • Reference the Google Cloud Pub/Sub documentation and the Apache Spark documentation
  • Explain why this project is needed

Can't read binary data

From Amir:

[12:30] because in line 46 we are casting the binary data into a string 🙂
[12:30] So I'm not sure if we can pass the data as binary
[12:31] or we can just translate it again into base64 and decode it once in the python code
[12:31] another option would be to just decompress and open the msgpack in the java code
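
A sketch of the base64 option from the chat: if the Java receiver were changed to base64-encode the raw payload instead of casting it to a string, the Python side could recover the exact bytes as below. The msgpack step is illustrative only, per the chat, and assumes the msgpack package is installed and the payload is a msgpack blob.

import base64

import msgpack  # assumed dependency; payload assumed to be a msgpack blob

def handle_record(b64_payload):
    raw = base64.b64decode(b64_payload)  # exact bytes as published to Pub/Sub
    return msgpack.unpackb(raw)          # decode once, on the Python side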
