Comments (59)

lw-lin commented on August 15, 2024

@romantic123

Yes. If the whole app has only one receiver, the data lands on only 1 or 2 executors (depending on whether replication is configured as 1 or 2 copies in total), so the data is heavily skewed.

In that case the usual approach is to start multiple receivers, each consuming a different part of the source. Spark Streaming will spread the receivers evenly across different executors, so the data is spread evenly as well, and the subsequent computation gets good data locality.
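
A minimal sketch of that approach, assuming a socket source as a stand-in for a real partitioned source (the host/ports are hypothetical): start several receivers, then union them back into one logical stream.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultiReceiverSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("MultiReceiverSketch"), Seconds(10))

    // Start 4 receivers; Spark Streaming spreads them across different executors,
    // so the received blocks (and the tasks computing on them) are spread out too.
    val streams = (1 to 4).map { i =>
      // each receiver consumes a different part of the source
      ssc.socketTextStream("source-host", 9000 + i)
    }
    val unioned = ssc.union(streams) // downstream code sees one logical stream

    unioned.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}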

wangwenting commented on August 15, 2024

Accumulators and Broadcast Variables
Accumulators and Broadcast variables cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulators or Broadcast variables as well, you’ll have to create lazily instantiated singleton instances for Accumulators and Broadcast variables so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.

When I use the lazy singleton, there is no problem:

def getWordBlacklist(sparkContext):
    if 'wordBlacklist' not in globals():
        globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
    return globals()['wordBlacklist']

But when I create bc = sc.broadcast(["a", "b", "c"]) in the main function and use it inside foreachRDD, I get this error:

File "/home/bfd_hz/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 39, in _from_id
    raise Exception("Broadcast variable '%s' not loaded!" % bid)
Exception: Broadcast variable '0' not loaded!

lw-lin commented on August 15, 2024

@duanjianmin

Yes, your understanding is correct. Stateful operations have to be executed in order; stateless ones can be executed in parallel. This is controlled by the spark.streaming.concurrentJobs parameter. For more detail, see《2.1 JobScheduler, Job, JobSet 详解》in this series, which explains it in depth.
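
For reference, a minimal sketch of turning that knob (the value 4 is only illustrative); it only makes sense for stateless pipelines, while stateful ones should keep the default of 1:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ConcurrentJobsSketch")
  .set("spark.streaming.concurrentJobs", "4") // default is 1: batches run strictly one after another
val ssc = new StreamingContext(conf, Seconds(10))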

lw-lin commented on August 15, 2024

@bjkonglu

You provide a concrete Receiver class implementation (either a built-in one or a user-defined one) together with a parallelism x, and Spark then starts x Receivers on x executors.
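
A minimal sketch of such a user-defined Receiver (the data source here is faked; a real one would pull from its client inside the loop):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MySourceReceiver(part: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  // Called by the ReceiverSupervisor once the receiver has been started on an executor.
  override def onStart(): Unit = {
    new Thread(s"my-source-receiver-$part") {
      override def run(): Unit = {
        while (!isStopped()) {
          store(s"record-from-part-$part") // hand records to Spark; they are assembled into blocks
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // the receiving thread checks isStopped(), so nothing extra to clean up here
  }
}

// Parallelism x = 4: four instances, started by Spark on four executors.
// val streams = (0 until 4).map(i => ssc.receiverStream(new MySourceReceiver(i)))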

fengshenwu commented on August 15, 2024

"Spark Streaming's long-running fault-tolerance features can provide no-loss, no-duplication, exactly-once processing semantics."
No-loss can be achieved by tagging the records and using the WAL, but how are no-duplication and exactly-once achieved? The article doesn't cover that point.

lw-lin commented on August 15, 2024

@lwwcl1314

Consider three points:

  1. Given an input file f on HDFS, no matter how many times MapReduce fails and re-runs, the final result r is always the same;
  2. Spark Streaming guarantees that every source record is assigned to exactly one block of data, and every block is assigned to exactly one batch;
  3. So for a given batch, however many times it fails you simply re-run it against the same source data (just like MapReduce re-running against f), which guarantees that the batch's result corresponds one-to-one to the batch's source data.

No duplication, because every source record is assigned to exactly one batch; no loss plus no duplication equals exactly-once.

Storm's Trident layer achieves exactly-once in a similar way, by dividing the stream into batches, although that implementation feels a bit less natural...

fengshenwu commented on August 15, 2024

I think http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ explains this quite well, heh.

lw-lin commented on August 15, 2024

That Cloudera post walks through the specifics for Apache Kafka; I recommend everyone read it as well.

Kudos @lwwcl1314 ~

jacksu commented on August 15, 2024

What tool were the diagrams drawn with?

luphappy commented on August 15, 2024

I'm using Spark Streaming, but after running for a long time it often exits on its own. Why would that happen?

jacksu commented on August 15, 2024

https://github.com/jacksu/utils4s/tree/master/sparkstreaming-demo

lw-lin commented on August 15, 2024

@jacksu
Mostly drawn with Visio, :-)

lw-lin commented on August 15, 2024

@luphappy
You'd need to look at the logs printed on the driver side; an exit is usually caused by an Exception, so start from the specific Exception. You may also need to check the logs on the executor that reported the error.

Could you paste the error message?

winwill2012 commented on August 15, 2024

May I repost this series of articles? The repost would include a link to the original and the author's name.

lw-lin commented on August 15, 2024

@winwill2012
That's fine. Include the link to the original and the author's name, then post the repost's link here.

zwzm85 commented on August 15, 2024

A question: hot standby and cold standby both operate on block data. What happens to the small buffered records that haven't been assembled into a block yet?

lw-lin commented on August 15, 2024

@zwzm85
Very careful observation, kudos. There is indeed a gap here: those small buffered records have no protection. The root cause is that the upstream source doesn't support replay.

luphappy commented on August 15, 2024

Hello. My spark-streaming program, running in yarn-client mode, exited after running for a while, but the driver is still alive. The YARN log is as follows:
15/09/19 14:32:34 ERROR util.Utils: Uncaught exception in thread Thread-1
org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1437371132890_10529_000002
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:94)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy18.finishApplicationMaster(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.unregisterApplicationMaster(AMRMClientImpl.java:378)
at org.apache.spark.deploy.yarn.YarnRMClient.unregister(YarnRMClient.scala:86)
at org.apache.spark.deploy.yarn.ApplicationMaster.unregister(ApplicationMaster.scala:184)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:123)
at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2308)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2278)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2278)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2278)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2278)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2278)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2278)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2278)
at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2260)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1437371132890_10529_000002
at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy17.finishApplicationMaster(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:91)
... 23 more

What could be causing this?

lw-lin commented on August 15, 2024

@luphappy

Hi, which version of Yarn are you on? It may be related to this:
YARN-3103: AMRMClientImpl does not update AMRM token properly

Also, try deploying in yarn-cluster mode; Streaming programs are normally not deployed as yarn-client.

pzz2011 commented on August 15, 2024

A Spark Core question I've never quite figured out. Reading the source, what is the ActiveJob that gets created when the final stage is generated? I thought a Stage corresponds to one job, and a Stage can contain multiple RDDs, but for the final stage there should be only one job, right?
A non-final stage can correspond to multiple RDDs, so can a non-final stage correspond to multiple jobs?
Please enlighten me... reading the source has made my head spin T_T~~~~

pzz2011 commented on August 15, 2024

@lw-lin

luckuan commented on August 15, 2024

For how jobs are submitted and how they run, you can read this blog post: http://www.cnblogs.com/luckuan/p/5250258.html
One job can generate multiple stages, with a shuffle between consecutive stages. @pzz2011
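
To make the terms concrete, a small word-count sketch (the input path is hypothetical): one action submits one job, and each shuffle inside it adds a stage boundary.

import org.apache.spark.{SparkConf, SparkContext}

object JobStageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("JobStageSketch"))

    val pairs = sc.textFile("hdfs:///tmp/input.txt") // hypothetical input
      .flatMap(_.split(" "))
      .map((_, 1))

    val counts = pairs.reduceByKey(_ + _) // shuffle => stage boundary

    // The action submits one job with two stages:
    // a ShuffleMapStage (textFile/flatMap/map) feeding the final ResultStage.
    counts.count()

    sc.stop()
  }
}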

pzz2011 commented on August 15, 2024

@luckuan @lw-lin
Spark 1.5.0:
spark-1.5.0\spark-1.5.0\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]


private[scheduler] val activeJobs = new HashSet[ActiveJob]

I still can't figure out these collections: jobIdToActiveJob, shuffleToMapStage, stageIdToStage, jobIdToStageIds. I don't understand how they relate to stages, jobs, and tasks....

weibin0516 commented on August 15, 2024

@pzz2011 Have a look at this article: [Spark源码剖析] DAGScheduler划分stage http://www.jianshu.com/p/50c5c1032206

wangwenting commented on August 15, 2024

I'd like to ask: the WAL is off by default; do you currently have it enabled in production? If the program is force-killed while writing to the WAL, after a checkpoint has been taken, is the data that wasn't persisted to the WAL lost?

And when writing results out, how do you achieve exactly-once, e.g. when writing to a non-transactional store and the write dies halfway through?

One more question: for output to HDFS, how do you make each hour correspond to exactly one file per partition? Setting batchDuration to one hour doesn't feel like a sound approach. Is appending to the existing files workable?

lw-lin commented on August 15, 2024

@wangwenting

(1.a) In production it depends on the application: anything that must not lose data reads from a replayable source (e.g. HDFS or Kafka); where a little loss is tolerable we enable the WAL or RAM_2 dual-replica hot standby.
(1.b) Yes, with a non-replayable source there is a risk of losing data whether you use the WAL or RAM_2 hot standby; the safe option is still a replayable source.

(2) exactly-once mainly refers to the processing inside Streaming; end-to-end exactly-once additionally needs support from the upstream source (replayability) and the downstream sink (transactions). Writing to a non-transactional store only guarantees at-least-once. That said, adding a batch-id column also gets you exactly-once: an output like batch-id = 10, count = 2000 can be written several times, and what you eventually read back is still the single row count = 2000.

(3) That requirement isn't directly supported; you can approximate it: (a) run your own compaction job that merges the p × n files into p files every hour; or (b) write your own output mode that appends to existing files, in which case be sure to set concurrentJobs = 1 so that multiple batches within the same hour don't append to the same partition's file concurrently.

Hope this helps!
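
A sketch of the batch-id idea in (2), with a hypothetical upsert-by-key sink standing in for a real store such as MySQL or HBase:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object IdempotentOutputSketch {
  // hypothetical sink with overwrite-by-key (upsert) semantics
  def upsertToStore(batchId: Long, count: Long): Unit =
    println(s"batch-id=$batchId count=$count")

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("IdempotentOutputSketch"), Seconds(10))
    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _)

    counts.foreachRDD { (rdd, time: Time) =>
      // Keyed by the batch time: replaying a failed batch rewrites the same row,
      // so what is finally read back stays exactly-once even though execution is at-least-once.
      upsertToStore(time.milliseconds, rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()
  }
}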

wangwenting commented on August 15, 2024

Thanks a lot, very helpful. For now the plan is to read with the Kafka direct approach: for the online path, write to MySQL via foreachRDD; for the offline path, read with Kafka direct and write to HDFS. Spark doesn't seem to support combining small files yet, so we'll have to merge them ourselves.

wangwenting commented on August 15, 2024

With streaming checkpointing enabled, using broadcast raises Exception("Broadcast variable '%s' not loaded!" % bid). Have you ever run into this?

romantic123 commented on August 15, 2024

A question: one inputStream corresponds to one Receiver, right? A Receiver is assigned to one Executor, so all the data that Receiver receives ends up on that Executor. Wouldn't that cause data skew?

lw-lin commented on August 15, 2024

@wangwenting

I haven't run into it. The code below uses checkpoint + broadcast and kept working across several stop/starts, for reference: ...

Update: the code has been removed. The original code worked in local mode but indeed has problems when run on Yarn.

For correct code, please see the official Streaming Programming Guide.
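
For reference, the pattern the guide describes is roughly the following lazily instantiated singleton (the blacklist contents are just an example): the broadcast variable is re-created on first use after the driver restarts from a checkpoint, instead of being read back from the checkpoint.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object WordBlacklist {
  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Seq("a", "b", "c"))
        }
      }
    }
    instance
  }
}

// Inside foreachRDD, fetch it from the RDD's SparkContext instead of capturing
// a broadcast created in main():
//   dstream.foreachRDD { rdd =>
//     val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
//     rdd.filter(w => !blacklist.value.contains(w)).count()
//   }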

lw-lin commented on August 15, 2024

@wangwenting

The code I posted earlier was indeed wrong, and from your explanation above I've also learned the correct way to use accumulators & broadcast variables in Streaming. Many thanks!

luphappy commented on August 15, 2024

@lw-lin 2.6.0, so that's probably the cause.

luphappy commented on August 15, 2024

@lw-lin Hello, I'd like to ask: for a program running in yarn-cluster mode, how do you update and restart it?

lw-lin commented on August 15, 2024

@luphappy
2.6.1 should have the fix.
You mean how to stop a Streaming program running under yarn-cluster, right?

[screenshot omitted]
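
For the restart side, a hedged sketch of the usual pattern: recover from a checkpoint with StreamingContext.getOrCreate, and let a kill signal (e.g. yarn application -kill) trigger a graceful stop. The checkpoint path and the pipeline are hypothetical; note that checkpoint recovery does not work across an upgraded jar.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RestartableStreamingApp {
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // hypothetical path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RestartableStreamingApp")
      // let SIGTERM (e.g. from `yarn application -kill`) finish in-flight batches first
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    ssc.socketTextStream("localhost", 9999).count().print() // stand-in pipeline
    ssc
  }

  def main(args: Array[String]): Unit = {
    // fresh start: builds a new context; restart: recovers the graph and metadata from the checkpoint
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}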

huduan-D commented on August 15, 2024

@lw-lin Hello, thank you very much for sharing; this is really well written. Kudos!
There's one passage I didn't follow:
"(2) DStreamGraph is asked to produce a fresh instance of the RDD DAG. Concretely, DStreamGraph asks the tail DStream node in the graph to generate a concrete RDD instance, which recursively calls the tail DStream's upstream DStream nodes... traversing the whole DStreamGraph this way; when the traversal finishes, an instance of the RDD DAG has been generated."
Isn't the DAG already generated statically? That is, the operations in the DStreamGraph from head to tail are already fixed, so why does it still need to be traversed from the tail like this?

spanf commented on August 15, 2024

Hello, I've hit a similar problem. My Hadoop version is 2.7.2, in which the YARN-3103 bug is already fixed, yet after running for a week it still threw the error below. Could you take a look at what else might cause this?....

WARN 16/08/18 00:10:19 Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1470817219719_0004_000001
WARN 16/08/18 00:10:19 ApplicationMaster: Reporter thread fails 2 time(s) in a row.
org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1470817219719_0004_000001
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy16.allocate(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:278)
at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:225)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:368)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1470817219719_0004_000001
at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy15.allocate(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
... 9 more

teeyog commented on August 15, 2024

This is the first source-code walkthrough I've been completely absorbed in. If there were a Spark Core source walkthrough too, it would be perfect!

lw-lin commented on August 15, 2024

@SOBIGUFO

Glad it helped! :-)

Nicksxs commented on August 15, 2024

@lw-lin For the problem @wangwenting mentioned above, I still don't quite understand how to use the lazy approach. Could you explain it in more detail? Thanks.

ly13131wyq commented on August 15, 2024

Great!

jinchenga commented on August 15, 2024

I've just started studying Spark Streaming. In this section it says each DStream instance maintains a pointer for each batch; I think "reference" would be the better word, since Java-family languages no longer have the concept of a pointer? Not sure whether that's right?

bjkonglu commented on August 15, 2024

@lw-lin Thanks for sharing; this is very well written and I've learned a lot. But there's one passage I didn't understand; please enlighten me.

Specifically, when the program first starts running, Spark Streaming:
(1) has ReceiverTracker, the coordinator of the Receivers, dispatch multiple jobs (each job with 1 task) to start a ReceiverSupervisor instance on each of several executors;
(2) each ReceiverSupervisor, once started, immediately creates an instance of the user-provided Receiver implementation (which can continuously produce or receive data from outside the system; e.g. a TwitterReceiver can crawl twitter data in real time) and then calls Receiver.onStart() on that instance;

Which of the following does the above mean:

  1. The user creates one Receiver instance in the application code, and the Spark cluster starts multiple Receivers on multiple executors;
  2. The user creates multiple Receiver instances in the application code, and the Spark cluster starts multiple Receivers on multiple executors;

lw-lin commented on August 15, 2024

@jinchenga Yes, I'll change it; "reference" is indeed the better word.

duanjianmin commented on August 15, 2024

A question:
Spark Streaming keeps slicing the continuous streaming data, which produces multiple batches, and then submits each batch to be executed by Spark Core.
Question: can these batches be processed in parallel, or does one batch have to finish before the next one starts?
My feeling is that both are supported. For example, a call like updateStateByKey is stateful, so its batches should execute sequentially; but if you're only doing a plain word count per batch, then multiple batches could run in parallel to improve efficiency.

duanjianmin commented on August 15, 2024

@lw-lin Thanks for the reply. Another question:
As I understand it, Spark Streaming's main advantages are that it

  1. slices the streaming data into batches, and
  2. lets you use the Spark Core API.

But in the usual architecture, a server starts a consumer that reads records from an MQ one at a time and processes them, and a single record isn't large, so not using the Spark Core API wouldn't hurt performance much either. Since in stream processing each slice is never large, is Spark Streaming largely unnecessary (versus simply receiving and processing records one by one, which also avoids pulling in the Spark dependency)? What scenarios is Spark Streaming actually suited for, and what does it buy you over the usual architecture?

lw-lin commented on August 15, 2024

@duanjianmin

For ordinary data volumes there isn't much difference. But at large data volumes, batch processing has much higher throughput than one-record-at-a-time streaming, which means many fewer machines; going from, say, 50 machines down to 30 is a 40% saving.

Also, Spark Core gives you node failure recovery, stage/task retry, and speculative execution out of the box. These are what keep processing uninterrupted and fast. Even if you don't use Spark Core, you still need good answers to these problems.
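
For reference, a sketch of the corresponding knobs (the values are illustrative, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.task.maxFailures", "4") // how many times a failed task is retried before the stage fails
  .set("spark.speculation", "true")   // re-launch suspiciously slow tasks on other executors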

duanjianmin commented on August 15, 2024

@lw-lin Thanks.

lw-lin commented on August 15, 2024

@duanjianmin

You can join the discussion group: https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88

bjkonglu commented on August 15, 2024

@lw-lin In the dynamic Job generation module, for the step "obtain from ReceiverTracker the meta information of the source data allocated to this batch", the arrow in the figure goes from JobGenerator to ReceiverTracker, but since this is a retrieval I think the arrow should go from ReceiverTracker to JobGenerator. Not sure whether that's right!
Also, in the long-running executor fault-tolerance module there's a typo: "关联是保障收到的块数据的安全" should presumably be "关键是保障收到的块数据的安全" ("the key is to safeguard the received block data").

joe2016 commented on August 15, 2024

@spanf Did you find the cause? I'm also seeing this exception on 2.7.5, about once a day. Could you share how you solved it?

xiaopailang commented on August 15, 2024

@lw-lin Thank you very much for the effort; the articles are very detailed and I've benefited a lot from reading them. I've run into a question I'd like to ask:
A batch contains multiple jobs. Is a checkpoint taken after each job in the batch completes, or only after all jobs in the batch complete? I looked through the source and couldn't find a checkpoint operation after job completion in JobScheduler's handleJobCompletion method.

lw-lin commented on August 15, 2024

@xiaopailang

  • DoCheckpoint is in src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala;
  • Is a checkpoint taken after each job completes? No, a single checkpoint is taken when the batch finishes.

xiaopailang commented on August 15, 2024

@lw-lin

  1. So the job states within a batch aren't in sync with the checkpoint information, and recovering from a checkpoint can re-execute jobs.
  2. If that's the case, wouldn't it be better to manage offsets yourself and store them after each batch for recovery? That also avoids the limitation that a checkpointed jar can't be upgraded.
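
A sketch of that self-managed-offset idea with the direct Kafka API (the broker address, topic, and the saveOffsets sink are hypothetical; a real job would persist the offsets together with, or right after, the batch's output):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object SelfManagedOffsetsSketch {
  // hypothetical store: stand-in for writing offsets to ZooKeeper / MySQL / HBase
  def saveOffsets(ranges: Array[OffsetRange]): Unit =
    ranges.foreach(r => println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("SelfManagedOffsetsSketch"), Seconds(60))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    stream.foreachRDD { rdd =>
      // grab the exact offset ranges this batch covers, before any shuffle
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd and write the batch's results ...
      saveOffsets(ranges) // record the offsets only after the output has succeeded
    }

    ssc.start()
    ssc.awaitTermination()
  }
}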

lw-lin commented on August 15, 2024

@xiaopailang As long as your business logic makes the written-out results idempotent, you're OK.

xiaopailang commented on August 15, 2024

@lw-lin Got it, thanks for the explanation!

zhangjin8813 commented on August 15, 2024

Hello! While studying this I'm unclear on some of the concepts. For example, what is the relationship between the data stream, blocks, tasks, and jobs?

JustTobe commented on August 15, 2024

A question for the author: how much does the batch interval setting affect per-batch processing latency?

JustTobe commented on August 15, 2024

@lw-lin

LGDSuiBianDa commented on August 15, 2024

@lw-lin
Hello, I have some questions about upgrading application code.
I read the "Upgrading Application Code" section of the official docs:
1. One approach is to start the upgraded streaming application and run it in parallel with the existing one, then shut down the existing one once the upgraded one is stable;
2. The other approach is to stop the existing application and start the upgraded one, but that requires changing or clearing the checkpoint directory.
My current scenario involves state management in Streaming. After a code upgrade I need to both resume processing from the offsets where the previous run stopped and keep the state. So far only the first approach (running both applications at once) seems to achieve this, but it needs twice the cluster resources. Is there a better way?
