Git Product home page Git Product logo

Comments (36)

lw-lin avatar lw-lin commented on August 15, 2024 4

@TopSpoofer

一般我们把上游数据源 (Source) 看做一个 end,把下游数据接收 (Sink) 看做另一个 end:

 Source  -->  Spark Streaming  -->  Sink
 [end]                             [end]

目前的 Spark Streaming 处理过程自身是 exactly-once 的,对上游这个 end 的数据管理做得挺好(比如在 direct 模式里自己保存 Kafka 的偏移),但对下游除 HDFS 外的 HBase, RDBMS, Redis 啊等 end 还不太友好,需要 user code 来实现幂等逻辑,才能保证 end-to-end 的 exactly once。

而在 Spark 2.x 的 Structured Streaming 里,将也将把常见的下游 end 也管理起来(比如通过 batch id 来原生支持幂等),那么不需要 user code 做什么就可以保证 end-to-end 的 exactly once 了。

Hope it helps!

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024 2

@zzcclp 写了文章解释了下:http://www.jianshu.com/p/27f91de7417d,之前说的可能会引起数据丢失还是跟我们自己的实现方式有关,我们是自己保留每个 batch 消费到的 offsets,所以如果 spark.streaming.concurrentJobs 大于1,那么就可能会有多个 batch 的 job 一起运行,可能会晚一点的batch 反而更早运行完了,这个时候把这个晚一点的 batch 写到我们的 zookeeper 中当做上次消费到的 offsets,如果此时挂掉了,那么在下次重启的时候就会从该 offsets 处重启,那么在挂掉时,那个更早一点的 batch 对应的 job 其实没有执行完成,这个时候就会有一部分数据丢失了。这是我们之前方案的不足之处,之所以采取这样的方案的原因也是我们的业务允许绩效的数据误差。不过要在spark.streaming.concurrentJobs 大于1时保证整个业务的 exactly once,其实还要视具体情况而定,在我们的业务中代价会比较大。

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024 1

@luckuan 要看怎么用,如果spark.streaming.concurrentJobs为1,则不会丢数据,但存在有很小一部分数据重复消费的可能;如果spark.streaming.concurrentJobs大于1,是会丢数据的

另,凡是处理数据的完成操作和确认数据已处理这两个操作不是一个原子操作,那么一定是不能保证加好一次的语义的

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

@lw-lin 请问 Spark Streaming 在你们的实践中主要用来做哪些事情?

from coolplayspark.

lw-lin avatar lw-lin commented on August 15, 2024

@keepsimplefocus

我们是做社交和效果广告的业务,所以 Spark Streaming 在我们的实践中:

  • 从业务应用来讲
    • 各种维度、各种指标的数据统计(非精准、允许有少许误差)
    • 各种维度、各种指标的数据统计(精准)
    • 实时反作弊与计费(可重入、精准)
    • 与其它数据源进行实时数据关联、数据清洗、特征抽取
    • 实验相关内容的实时控制与反馈
  • 从技术栈来讲
    • 原有 Storm 能做的业务已经迁移了很多到 Spark Streaming
    • 原有 MR 能做的业务已经迁移了很多到 Spark Streaming

抱歉不能讲的很详细 :-)

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

多谢~

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

@lw-lin 再请问下,在你们的业务中应该也有需要结合比较长一段历史数据进行实时统计的吧。你们是如何做的呢?比如要实时算一天的 uuid 排重数。我们这边这种操作都是需要借助外部的东西,比如 redis 之类的

from coolplayspark.

lw-lin avatar lw-lin commented on August 15, 2024

@keepsimplefocus

是的,我们也有。

之前跟你们差不多,我们是借助 HBase 进行去重,加点优化一个是批量,另一个是一个 key 保存多个 uuid 的 value 列表、进一步批量化一下。

前段时间我们在改为 Spark Streaming 原生的 mapWithState() 实现,就是把见过的值保存到 state 里,state 靠 Spark Streaming 来做 snapshot 和 replay。但是还在调优、暂时还没到线上、应该很快可以到线上了。

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

@lw-lin 好的 多谢 我们在 redis 的使用了也做了批量和 pipeline 的优化。我也尝试使用你们现在的思路试试~

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

@lw-lin 再请教一个问题:
你们是怎么保证 Application 在 failed 之后计算不会多也不会少,且重启后也不丢失计算进度?
我们的实践中,数据源是 kafka,之前由于 checkpoint 限制较多就没启用 checkpoint,自己维护 offsets。但自己维护 offsets 始终存在丢失数据的可能,所以现在考虑启用 checkpoint + WAL

from coolplayspark.

lw-lin avatar lw-lin commented on August 15, 2024

@keepsimplefocus

重启后不丢失进度这个靠 Spark Streaming 的 checkpoint 机制(是指 DStreamGraph、JobGenerator 等的状态的 checkpoint,详见《4.2》)就很好了——注意这个 checkpoint 是 Streaming 对计算进度的 checkpoint,不是 DStream 或 RDD 的内容的 checkpoint。Application 失效前计算到什么进度、恢复后从什么进度开始重计算,都是很准确的。

之前 checkpoint 限制较多是指?

对于 Kafka 来讲,建议使用 Direct 而不是 Receiver-based 的方式读取,offsets 都由 Spark Streaming 放到自己的 checkpoint 中而不是 zk 中。Direct 读取的方式,就不需要 WAL 了,而是需要重计算时根据随时更新到 checkpoint 中的 offsets 重新回 Kafka 那里去取。

最后,对外写出需要简单的原子性保证,如 HBase 的 checkAndPut()。没有这个原子性的,就每次写出时加个 batch id,在下游获取结果的时候根据 batch id 进行一下结果的去重吧。

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

嗯,direct 方式确实不需要 WAL,每个 batch 开始的时候才去拉数据。我说的 checkpoint 限制是指,在修改 app 后 checkpoint 因为反序列化失败是失效的。这样我修改了 app 或想要修改配置,使用 checkpoint 都不能恢复。
不过,在将spark.streaming.concurrentJobs设置为多个的时候,还是需要启用 WAL,不然同时执行未完成的 job 还是可能会丢

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

@lw-lin 我仔细想了下,使用 checkpoint (甚至加上 WAL)理论上也是无法保证 exactly once 的,因为在 job 丢给 jobExecutor 异步执行之后,会马上执行 checkpoint 操作,这时 checkpoint 数据里,刚刚的 job 其实是未完成的。假设,在下一个 batch 开始之前刚刚的 job 完成了,但下一个 batch 在 checkpoint 之前 application 挂掉了,那么本来已经完成的 job 对应的 time 在 driver 从 checkpoint 恢复后还是未完成的。这样就会重复处理了

from coolplayspark.

lw-lin avatar lw-lin commented on August 15, 2024

@keepsimplefocus
是的。End-to-end exactly-once 还是需要下游数据接收端也有些去重处理逻辑,现在我们都是自己写的,利用 HBase 的 checkAndPut() / MySQL 里是另加了 batch-id 等。不过 2.0 的 Structured Streaming 里确实添加了 end-to-end exactly-once 的框架级支持,参见来自 databricks 的 slide:

image

from coolplayspark.

lw-lin avatar lw-lin commented on August 15, 2024

@keepsimplefocus 你在哪个公司?感觉你们 streaming 也研究和应用的挺深入的,没有在你 github 页找到公司信息。

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

@lw-lin 我是猎豹移动的 :-)

from coolplayspark.

luckuan avatar luckuan commented on August 15, 2024

@keepsimplefocus 自己维护偏移量,会存在丢失数据的可能么?

from coolplayspark.

luckuan avatar luckuan commented on August 15, 2024

膜拜~~~

from coolplayspark.

zzcclp avatar zzcclp commented on August 15, 2024

@keepsimplefocus 能解释下为什么spark.streaming.concurrentJobs大于1时会丢数据吗?

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

@zzcclp 我周末写篇文章说明下这个问题吧,到时发链接给你~

from coolplayspark.

zzcclp avatar zzcclp commented on August 15, 2024

@keepsimplefocus 多谢,能不能把你blog先给我,学习下哈

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

http://www.jianshu.com/users/001d44710e2e/latest_articles

在 2016-04-12 17:17:58,"Zhichao Zhang" [email protected] 写道:

@keepsimplefocus 多谢,能不能把你blog先给我,学习下哈


You are receiving this because you were mentioned.
Reply to this email directly or view it on GitHub

from coolplayspark.

zzcclp avatar zzcclp commented on August 15, 2024

@keepsimplefocus 原来你就是“牛肉圆粉不加葱”啊,看过你的文章,赞

from coolplayspark.

zzcclp avatar zzcclp commented on August 15, 2024

@keepsimplefocus 关于spark.streaming.concurrentJobs大于1时会丢数据的问题,你什么时候能写个blog呢?

from coolplayspark.

weibin0516 avatar weibin0516 commented on August 15, 2024

@zzcclp 抱歉,我周末补上,大致说一下,主要原因是因为 job 都是放到线程池里异步提交执行的,这样会导致写入 checkpoint 的 job 状态和 job 真正的状况可能会不一致~

from coolplayspark.

zzcclp avatar zzcclp commented on August 15, 2024

@keepsimplefocus 如果每个batch都能完成任务,这种情况就不会出现?当batch出现滞后,就会导致这种情况呢?

from coolplayspark.

zzcclp avatar zzcclp commented on August 15, 2024

大概清楚了,谢谢你的解释,不过就算是不自己管理offset,采用cp,也是有可能出现你说的情况吧?归根结底,还是能及时处理,才能避免这种情况

from coolplayspark.

lw-lin avatar lw-lin commented on August 15, 2024

@keepsimplefocus @zzcclp
另一个与异步 ck 有些相关的问题(已经在 1.5 中 fix 了),FYI:
[SPARK-6222][Streaming] Dont delete checkpoint data when doing pre-batch-start checkpoint

from coolplayspark.

zzcclp avatar zzcclp commented on August 15, 2024

@lw-lin 那个pr是增加了在结束job后再删除cp数据的,但是如果出现后面的batch比前面的batch早处理完,似乎也有可能出现 @keepsimplefocus 说的情况吧?

from coolplayspark.

TopSpoofer avatar TopSpoofer commented on August 15, 2024

@lw-lin 博主 End-to-end exactly-once 是什么意思,能解析一下吗?

from coolplayspark.

permanentstar avatar permanentstar commented on August 15, 2024

@lw-lin @keepsimplefocus 看到上面你们讨论关于“需要结合比较长一段历史数据进行实时统计”的需求在spark streaming中的mapWithState实现,请问这个方案现在在使用中吗?

另外我有几个问题想请教下
1.如果需求中有比较多的这种去重需求,那对于一天的统计,mapWithState中累积的数据就会随时间线性增加,在一天结束时这个量相当于和做了一个每天定时的批处理数据量是相当的,这对于stateful stream的checkpoint操作是否有比较大的压力?

2.由于mapWithState操作是只对增量数据进行操作,因此每个batch中并不会遍历所有key,这虽然减少运算量,但相比updateStateByKey就不能及时remove掉不需要的数据(比如一天结束时的clean操作),想到过timeout方法,但这样就需要设置超时大于1天,内存和checkpoint冗余量会比较大,这个如何解决?

3.一旦生成"静态的RDD DAG 模板",是不是用户就无法通过在driver侧设置变量等方式改变某个batch时的执行流程了?比如我想每N个batch才执行一次output,我试过在foreachRDD里面使用driver的变量来判断是否要执行输出操作,但我发现只有在实际输出需要执行的那个batch,job才会真正启动计算,期间的数据计算都会堆积在此时才会触发,这种做法是否合理?

这几个问题困扰了好久,希望能解答下,谢谢。

from coolplayspark.

TopSpoofer avatar TopSpoofer commented on August 15, 2024

@lw-lin thk!!!~

from coolplayspark.

teeyog avatar teeyog commented on August 15, 2024

请问batch-id怎么来?

from coolplayspark.

lw-lin avatar lw-lin commented on August 15, 2024

@SOBIGUFO 是指 Spark Streaming 还是 Structured Streaming?

from coolplayspark.

teeyog avatar teeyog commented on August 15, 2024

@lw-lin Spark Streaming

from coolplayspark.

lw-lin avatar lw-lin commented on August 15, 2024

@SOBIGUFO please see the code below:

val dstream = ssc...
dstream.foreachRDD((rdd: RDD[...], time: Time) => {
  val batchTime = time.milliseconds   // this batchTime basically represents the batch-id
  rdd.doSomething(...)
})

from coolplayspark.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.