Comments (17)
@tsface 我怀疑和 spark streaming 的清理机制有关。在 jobSet 完成和 checkpoint 的时候都会触发清理操作,这个时候可能会把需要用到的 blocks 删掉。http://www.jianshu.com/p/5e096df2618d 可能会给你一些启发,希望有帮助~
from coolplayspark.
block input-x-xxxxx not found
这个错误,就是数据已经找不到了:
- 在
MEMORY_ONLY
时,如果数据过多,内存中就会发生数据替换,被替换出的数据直接被丢掉,所以在后面计算时需要这部分数据的时候,就找不到了; - 在
MEMORY_AND_DISK
时,数据过多,替换出的数据会 flush 到硬盘上,所以比MEMORY_ONLY
时报错几率小很多;但 Executor 失效时(比如其它原因内存溢出后被 Yarn kill 掉、或网络太忙导致心跳发不出去被 driver 认为丢失了等),也会导致在 Memory 或在 Disk 上的数据没有了,虽然几率已经小很多了,但也还是会报这个错误的。
通常解决方法是几种:
- 如果确实配置的资源不够数据量的需求,那么酌情加大配置的内存或者 Executor 个数;
- 资源 OK 的前提下:
- 配置为
RAM_AND_DISK_SER_2
,这样会将数据保存到两个 Executor 上,就算一个 Executor 失效,还会在另一个 Executor 上有数据;但两个 Executor 同时失效的话,还是会报之前的错; - 启用 WAL;这样数据会同时保存到 Executor 和 HDFS 上,即使 Executor 失效,还是可以从 HDFS 上读出数据来;启用方法详见 这里的 "Configuring write ahead logs" 一节;
- 容忍数据部分丢失,那么在每个 batch 里都 try-batch 一下,这样即使一个 batch 的 job 失效,那么整个 Streaming 程序还会继续运行。
- 配置为
不过还是建议先看下 Executor 打出来的具体日志,看看需要加资源还是说能够容忍部分数据损失,再酌情选择解决方法。可在本帖后随时反馈;希望有帮助!
from coolplayspark.
上面提到的 try-catch 代码:
val inputDStream = ssc.fileStream("")
inputDStream.foreachRDD(rdd => {
try {
// do something
} catch {
case e => e.printStackTrace()
}
})
from coolplayspark.
@lw-lin
谢谢你的解答
try-catch的代码还没有试过,测试了下MEMORY_AND_DISK_2,性能比MEMORY_AND_DISK差很多,目前测试业务下数据处理性能差不多是这样的关系 :4 * MEMORY_AND_DISK_2 = MEMORY_AND_DISK 。
Executor被kill的原因,是Active Job队列里面任务开始积压,处理时延增加。Job的提交周期是1秒,由于CPU平均使用率到95%左右,Receiver接收速率不变,每个Job处理时延增加到了5到10s,目前Job的提交Interval能动态指定吗?
from coolplayspark.
现在有几个 receiver?几个 Executor、每个 Executor 几个 core?
block interval 是多大?batch interval(即 batch duration) 呢?每个 batch 处理多少 records?
from coolplayspark.
- receiver : 2
- Executor : 2
- Vcore: 每个机器64个VCore,全分配给Executor
每个Block差不多6M左右,batch duration: 1s, 每个batch处理的events没有注意,应该是12000多个吧。
测试了下try-catch,可以解决Executor被kill的情况👍
from coolplayspark.
@tsface
好的,try-catch 只是个应急手段,看起来还是建议调整下 block interval 和每个 executor 的 core 数~
from coolplayspark.
@lw-lin
关于这个问题我跟踪了下Driver和Executor端的debug日志,日志中有些问题暂时不明白,想请教下,下面信息是从Driver端Akka消息的角度整理的,完成的日志太大,不方便post
Driver:node4
Executors:node2,node3
- node2的上报数据更新请求:UpdateBlockInfo(input-0-1460509204597),处理成功
- node2发送AddBlock请求,处理成功,数据块存储在node2,块大小5.6MB
- sparkDriver端产生GetLocations消息,处理成功
- sparkDriver端发出RemoveBlock消息,此时node2上的BlockManager执行了remove操作
- node2发送UpdateBlockInfo消息,此时块的存储级别变为None
- sparkDriver端产生GetLocations,产生java.lang.Exception: Could not compute split, block input-0-1460509204597 not found (4次)(7~11的日志发生了两分钟后)
- node3上报更新请求:UpdateBlockInfo,存储块input-0-1460509204597到node3,块大小4.8MB
- node3产生AddBlock消息
- sparkDriver端产生GetLocations消息,处理成功
- sparkDriver端发出RemoveBlock消息,此时node3上的BlockManager执行了remove操作
- node3发送UpdateBlockInfo消息,Driver的blockid信息被删除
问题:
- 在4中,为什么sparkDriver的Akka端会产生RemoveBlock的消息,这个消息到底是怎么产生的?
- 在数据被删除的情况下JobScheduler将Tasks分发给node3,导致node3
getRemote
的时候找不到数据,这个在任务调度的时序上是怎么样的一个过程? - 既然数据已经被删除,为什么在node3上这个块数据又出现了,而且块的大小改变了?
谢谢!
from coolplayspark.
@tsface
收到。这个应用是跑在 Spark 1.? 的环境上的?Receiver 的 StorageLevel 是怎么设置的?整个 DAG 拓扑中有 window 操作吗?
from coolplayspark.
@lw-lin
spark 1.5.2
StorageLevel : MEMORY_AND_DISK
没有window操作
from coolplayspark.
@tsface 有没有调用过 StreamingContext#remember ?
from coolplayspark.
@keepsimplefocus
没有
from coolplayspark.
@tsface 请问,你的问题解决了吗?我也遇到了相同的问题
我的运行模式是 standalone client模式,同时在网上请教了一下,说是也有可能driver的内存设置的太小,导致gc时间太长,我的driver内存设置是默认的(应该是1g吧),请问有可能是这个原因吗?
from coolplayspark.
@tsface
能把完整日志(driver+相关 executors)发给我吗?lwlin7#gmail.com
之前没遇到过这种情况
from coolplayspark.
@proflin
已发
from coolplayspark.
@keepsimplefocus
谢谢,源码中加了日志,在跟踪
from coolplayspark.
请问这个问题解决了吗,我也遇到了类似的问题
from coolplayspark.
Related Issues (20)
- 关于SparkStreaming的join操作 HOT 2
- [SS]《1.1 Structured Streaming 实现思路与实现概述》讨论区 HOT 9
- [SS]《1.2 Structured Streaming 之 Output Modes 解析》讨论区 HOT 5
- [SS]《2.1 Structured Streaming 之 Source 解析》讨论区 HOT 1
- [SS]《2.2 Structured Streaming 之 Sink 解析》讨论区 HOT 3
- [SS]《3.1 Structured Streaming 之状态存储解析》讨论区 HOT 8
- [SS]《4.1 Structured Streaming 之 Event Time 解析》讨论区 HOT 2
- [SS]《4.2 Structured Streaming 之 Watermark 解析》讨论区 HOT 3
- [SS]《[Q&A] Structured Streaming 与 Spark Streaming 的区别》讨论区 HOT 1
- 请教问题
- Spark技术群二维码过期 HOT 2
- 这篇文档("0.1 Spark Streaming 实现思路与模块概述.md")存在描述错误的地方 HOT 1
- 大神有没有 sparkstreaming 读取kafka相关的代码
- 程序编译的时候是kafka_client-0.10.jar的,spark-submit的时候加载了CDH自带的spark-assembly。导致类冲突 HOT 1
- driver端异常恢复, 如何确保exactly once语义的呢? HOT 1
- 【question】在watermark下spark如何维护kafka的offset
- structured streaming java.io.EOFException
- StateStore的实现以及exactly-once HOT 1
- 读取多个topic数据效率问题 HOT 1
- spark streaming读取redis问题
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from coolplayspark.