Git Product home page Git Product logo

blog's Introduction

🕛 I'm in Chengdu now.

  • 🌱 202001-202012 Cloud Container Engine(CCE).
  • 🔭 202101-202112 Go and InfluxDB.
  • 😄 202201-~ I’m currently working on openGemini.

blog's People

Contributors

shilinlee avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

blog's Issues

Spark系列: 初识Spark

Spark简介

Spark具有如下几个主要特点

  • 运行速度快:使用DAG执行引擎以支持循环数据流与内存计算
  • 容易使用:支持使用Scala、Java、Python和R语言进行编程,可以通过Spark Shell进行交互式编程
  • 通用性:Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法组件
  • 运行模式多样:可运行于独立的集群模式中,可运行于Hadoop中,也可运行于Amazon EC2等云环境中,并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源

Scala简介

  • Scala具备强大的并发性,支持函数式编程,可以更好地支持分布式系统

  • Scala语法简洁,能提供优雅的API

  • Scala兼容Java,运行速度快,且能融合到Hadoop生态圈中

Spark与Hadoop的对比

  • Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比Hadoop MapReduce更灵活

  • Spark提供了内存计算,可将中间结果放到内存中,对于迭代运算效率更高

  • Spark基于DAG的任务调度执行机制,要优于Hadoop MapReduce的迭代执行机制

Spark生态系统

Spark的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统,既能够提供内存计算框架,也可以支持SQL即席查询、实时流式计算、机器学习和图计算等。Spark可以部署在资源管理器YARN之上,提供一站式的大数据解决方案。因此,Spark所提供的生态系统足以应对上述三种场景,即同时支持批处理、交互式查询和流数据处理。

Spark的生态系统主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX 等组件。

Spark生态系统组件的应用场景:

应用场景 时间跨度 其他框架 Spark生态系统中的组件
复杂的批量数据处理 小时级 MapReduce、Hive Spark
基于历史数据的交互式查询 分钟级、秒级 Impala、Dremel、Drill Spark SQL
基于实时数据流的数据处理 毫秒、秒级 Storm、S4 Spark Streaming
基于历史数据的数据挖掘 - Mahout MLlib
图结构数据的处理 - Pregel、Hama GraphX

Spark运行架构

基本概念

  • RDD:是Resillient Distributed Dataset(弹性分布式数据集)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型
  • DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系
  • Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task
  • Application:用户编写的Spark应用程序
  • Task:运行在Executor上的工作单元
  • Job:一个Job包含多个RDD及作用于相应RDD上的各种操作
  • Stage:是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet,代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集

架构设计

  • Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)

image

  • 一个Application由一个Driver和若干个Job构成,一个Job由多个Stage构成,一个Stage由多个没有Shuffle关系的Task组成
  • 当执行一个Application时,Driver会向集群管理器申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行Task,运行结束后,执行结果会返回给Driver,或者写到HDFS或者其他数据库中

image

Spark运行基本流程

  • 首先为应用构建起基本的运行环境,即由Driver创建一个SparkContext,进行资源的申请、任务的分配和监控
  • 资源管理器为Executor分配资源,并启动Executor进程
  • SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理;Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行,并提供应用程序代码
  • Task在Executor上运行,把执行结果反馈给TaskScheduler,然后反馈给DAGScheduler,运行完毕后写入数据并释放所有资源

RDD运行原理

RDD概念

  • 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算

  • RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和group by)而创建得到新的RDD

  • RDD提供了一组丰富的操作以支持常见的数据运算,分为“动作”(Action)和“转换”(Transformation)两种类型

  • RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改(不适合网页爬虫

  • 表面上RDD的功能很受限、不够强大,实际上RDD已经被实践证明可以高效地表达许多框架的编程模型(比如MapReduce、SQL、Pregel)

  • Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作

RDD典型的执行过程如下:

  • RDD读入外部数据源进行创建

  • RDD经过一系列的转换(Transformation)操作,每一次都会产生不同的RDD,供给下一个转换操作使用

  • 最后一个RDD经过“动作”操作进行转换,并输出到外部数据源

这一系列处理称为一个Lineage(血缘关系),即DAG拓扑排序的结果。

优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单

RDD特性

  • 高效的容错性。RDD: 血缘关系、重新计算丢失分区、无需回滚系统、重算过程在不同节点之间并行、只记录粗粒度的操作
  • 中间结果持久化到内存,数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销
  • 存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化

RDD的依赖关系

image

  • 窄依赖表现为一个父RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区

  • 宽依赖则表现为存在一个父RDD的一个分区对应一个子RDD的多个分区

Stage的划分

Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分Stage,具体划分方法是:

  • 在DAG中进行反向解析,遇到宽依赖就断开

  • 遇到窄依赖就把当前的RDD加入到Stage中

  • 将窄依赖尽量划分在同一个Stage中,可以实现流水线计算

image

Spark SQL

Shark

  • Shark即Hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作。

  • Shark的设计导致了两个问题:

    • 一是执行计划优化完全依赖于Hive,不方便添加新的优化策略;
    • 二是因为Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支

Spark SQL设计

Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。

  • Spark SQL增加了SchemaRDD(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据

  • Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范

Spark的部署和应用方式

Spark应用程序

  • Python
  • Scala

The Google File System

简介

我们设计并实现了Google GFS文件系统,一个面向大规模数据密集型应用的、可伸缩的分布式文件系统。GFS虽然运行在廉价的普遍硬件设备上,但是它依然了提供灾难冗余的能力,为大量客户机提供了高性能的服务。

首先,组件失效被认为是常态事件,而不是意外事件。所以,持续的监控、错误侦测、灾难冗余以及自动恢复的机制必须集成在GFS中。

其次,以通常的标准衡量,我们的文件非常巨大。

第三,绝大部分文件的修改是采用在文件尾部追加数据,而不是覆盖原有数据的方式。数据的追加操作是性能优化和原子性保证的主要考量因素。

第四,应用程序和文件系统API的协同设计提高了整个系统的灵活性。

设计概述

接口

我们支持常用的操作,如创建新文件、删除文件、打开文件、关闭文件、读和写文件。另外,GFS提供了快照和记录追加操作。

架构

一个GFS集群包含一个单独的Master节点、多台Chunk服务器,并且同时被多个客户端访问。

GFS存储的文件都被分割成固定大小的Chunk。在Chunk创建的时候,Master服务器会给每个Chunk分配一个不变的、全球唯一的64位的Chunk标识。Chunk服务器把Chunk以Linux文件的形式保存在本地硬盘上,并且根据指定的Chunk标识和字节范围来读写块数据。出于可靠性的考虑,每个块都会复制到多个块服务器上。缺省情况下,我们使用3个存储复制节点,不过用户可以为不同的文件命名空间设定不同的复制级别。

Master节点管理所有的文件系统元数据。这些元数据包括名字空间、访问控制信息、文件和Chunk的映射信息、以及当前Chunk的位置信息。Master节点还管理着系统范围内的活动,比如,Chunk租用管理、孤儿Chunk的回收、以及Chunk在Chunk服务器之间的迁移。Master节点使用心跳信息周期地和每个Chunk服务器通讯,发送指令到各个Chunk服务器并接收Chunk服务器的状态信息。

GFS客户端代码以库的形式被链接到客户程序里。客户端代码实现了GFS文件系统的API接口函数、应用程序与Master节点和Chunk服务器通讯、以及对数据进行读写操作。客户端和Master节点的通信只获取元数据,所有的数据操作都是由客户端直接和Chunk服务器进行交互的。我们不提供POSIX标准的API的功能,因此,GFS API调用不需要深入到Linux vnode级别。

无论是客户端还是Chunk服务器都不需要缓存文件数据。客户端缓存数据几乎没有什么用处,因为大部分程序要么以流的方式读取一个巨大文件,要么工作集太大根本无法被缓存。无需考虑缓存相关的问题也简化了客户端和整个系统的设计和实现。Chunk服务器不需要缓存文件数据的原因是,Chunk以本地文件的方式保存,Linux操作系统的文件系统缓存会把经常访问的数据缓存在内存中。

单一Master节点

单一的Master节点的策略大大简化了我们的设计。另外,我们必须减少对Master节点的读写,避免Master节点成为系统的瓶颈。

Chunk尺寸

我们选择了64MB,这个尺寸远远大于一般文件系统的Block size。每个Chunk的副本都以普通Linux文件的形式保存在Chunk服务器上,只有在需要的时候才扩大。

然而,当我们第一次把GFS用于批处理队列系统的时候,热点的问题还是产生了:一个可执行文件在GFS上保存为single-chunk文件,之后这个可执行文件在数百台机器上同时启动。存放这个可执行文件的几个Chunk服务器被数百个客户端的并发请求访问导致系统局部过载。我们通过使用更大的复制参数来保存可执行文件,以及错开批处理队列系统程序的启动时间的方法解决了这个问题。一个可能的长效解决方案是,在这种的情况下,允许客户端从其它客户端读取数据。

元数据

Master服务器存储3种主要类型的元数据,包括:文件和Chunk的命名空间、文件和Chunk的对应关系、每个Chunk副本的存放地点。

内存中的数据结构

因为元数据保存在内存中,所以Master服务器的操作速度非常快。并且,Master服务器可以在后台简单而高效的周期性扫描自己保存的全部状态信息。这种周期性的状态扫描也用于实现Chunk垃圾收集、在Chunk服务器失效的时重新复制数据、通过Chunk的迁移实现跨Chunk服务器的负载均衡以及磁盘使用状况统计等功能。

Chunk位置信息

Master服务器并不保存持久化保存哪个Chunk服务器存有指定Chunk的副本的信息。Master服务器只是在启动的时候轮询Chunk服务器以获取这些信息。Master服务器能够保证它持有的信息始终是最新的,因为它控制了所有的Chunk位置的分配,而且通过周期性的心跳信息监控Chunk服务器的状态。

操作日志

操作日志包含了关键的元数据变更历史记录。

Master服务器在灾难恢复时,通过重演操作日志把文件系统恢复到最近的状态。为了缩短Master启动的时间,我们必须使日志足够小。Master服务器在日志增长到一定量时对系统状态做一次Checkpoint,将所有的状态数据写入一个Checkpoint文件

一致性模型

GFS一致性保障机制

件命名空间的修改(例如,文件创建)是原子性的。它们仅由Master节点的控制:命名空间锁提供了原子性和正确性的保障;Master节点的操作日志定义了这些操作在全局的顺序。

如果所有客户端,无论从哪个副本读取,读到的数据都一样,那么我们认为文件region是“一致的”;如果对文件的数据修改之后,region是一致的,并且客户端能够看到写入操作全部的内容,那么这个region是“已定义的”。

程序的实现

尽量采用追加写入而不是覆盖,Checkpoint,自验证的写入操作,自标识的记录。

系统交互

一个重要的原则是最小化所有操作和Master节点的交互。

租约(lease)和变更顺序

image

数据流

为了提高网络效率,我们采取了把数据流控制流分开的措施。在控制流从客户机到主Chunk、然后再到所有二级副本的同时,数据以管道的方式,顺序的沿着一个精心选择的Chunk服务器链推送。我们的目标是充分利用每台机器的带宽,避免网络瓶颈和高延时的连接,最小化推送所有数据的延时。

我们利用基于TCP连接的、管道式数据推送方式来最小化延迟。

原子的记录追加

GFS提供了一种原子的数据追加操作–记录追加。

快照

快照操作几乎可以瞬间完成对一个文件或者目录树(“源”)做一个拷贝,并且几乎不会对正在进行的其它操作造成任何干扰。

Master节点的操作

Master节点执行所有的名称空间操作。此外,它还管理着整个系统里所有Chunk的副本:它决定Chunk的存储位置,创建新Chunk和它的副本,协调各种各样的系统活动以保证Chunk被完全复制,在所有的Chunk服务器之间的进行负载均衡,回收不再使用的存储空间。

名称空间管理和锁

Master节点的很多操作会花费很长的时间:比如,快照操作必须取消Chunk服务器上快照所涉及的所有的Chunk的租约。我们不希望在这些操作的运行时,延缓了其它的Master节点的操作。因此,我们允许多个操作同时进行,使用名称空间的region上的锁来保证执行的正确顺序。

副本的位置

Chunk副本位置选择的策略服务两大目标:最大化数据可靠性和可用性,最大化网络带宽利用率。另一方面,写操作必须和多个机架上的设备进行网络通信,但是这个代价是我们愿意付出的。

创建,重新复制,重新负载均衡

垃圾回收

度量(benchmark)

上一篇: 大数据技术关键字
下一篇: Bigtable:结构化数据的分布式存储系统

Hadoop:离线批处理MapReduce任务执行过程详解

MapReduce简介

MapReduce模型简介

  • MapReduce采用“分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理
  • MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销
  • MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave。Master上运行JobTracker,Slave上运行TaskTracker

MapReduce1.0的体系结构

MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task。

image

Client

  • 用户编写的MapReduce程序通过Client提交到JobTracker端
  • 用户可通过Client提供的一些接口查看作业运行状态

JobTracker

  • JobTracker负责资源监控和作业调度
  • JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点
  • JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源

TaskTracker

  • TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)
  • TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用

Task

Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动

MapReduce工作流程

输入->Map任务->shuffle->Reduce任务->输出

MapReduce可以很好地应用于各种计算问题:

  • 关系代数运算(选择、投影、并、交、差、连接)
  • 分组与聚合运算
  • 矩阵-向量乘法
  • 矩阵乘法

MapReduce:大型集群上的简化数据处理

简介

MapReduce是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个Map函数处理一个基于key/value pair的数据集合,输出中间的基于key/value pair的数据集合;然后再创建一个Reduce函数用来合并所有的具有相同中间key值的中间value值。

MapReduce架构的程序能够在大量的普通配置的计算机上实现并行化处理。这个系统在运行时只关心:如何分割输入数据,在大量计算机组成的集群上的调度,集群中计算机的错误处理,管理集群中计算机之间必要的通信。采用MapReduce架构可以使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源。

我们的MapReduce实现运行在规模可以灵活调整的由普通机器组成的集群上:一个典型的MapReduce计算往往由几千台机器组成、处理以TB计算的数据。程序员发现这个系统非常好用:已经实现了数以百计的MapReduce程序,在Google的集群上,每天都有1000多个MapReduce程序在执行。

编程模型

MapReduce编程模型的原理是:利用一个输入key/value pair集合来产生一个输出的key/value pair集合。MapReduce库的用户用两个函数表达这个计算:Map和Reduce。

用户自定义的Map函数接受一个输入的key/valuepair值,然后产生一个中间key/value pair值的集合。MapReduce库把所有具有相同中间key值I的中间value值集合在一起后传递给reduce函数。

用户自定义的Reduce函数接受一个中间key的值I和相关的一个value值的集合。Reduce函数合并这些value值,形成一个较小的value值的集合。一般的,每次Reduce函数调用只产生0或1个输出value值。通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。

类型

map(k1,v1) -> list(k2,v2)
reduce(k2,list(v2)) -> list(v2)

比如,输入的key和value值与输出的key和value值在类型上推导的域不同。此外,中间key和value值与输出key和value值在类型上推导的域相同。

实现

执行概括

通过将Map调用的输入数据自动分割为M个数据片段的集合,Map调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将Map调用产生的中间key值分成R个不同分区(例如,hash(key) mod R),Reduce调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。

image

图上展示了我们的MapReduce实现中操作的全部流程。当用户调用MapReduce函数时,将发生下面的一系列动作(下面的序号和图1中的序号一一对应):

  1. 用户程序首先调用的MapReduce库将输入文件分成M个数据片度,每个数据片段的大小一般从16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。

  2. 这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序,由master分配任务。有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker。

  3. 被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并缓存在内存中。

  4. 缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。

  5. 当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。

  6. Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。

  7. 当所有的Map和Reduce任务都完成之后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。

在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这R个输出文件合并成一个文件–他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。

Master数据结构

Master持有一些数据结构,它存储每一个Map和Reduce任务的状态(空闲、工作中或完成),以及Worker机器(非空闲任务的机器)的标识。

Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。

容错

work故障

master周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。

当一个Map任务首先被worker A执行,之后由于worker A失效了又被调度到worker B执行,这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。

master失败

一个简单的解决办法是让master周期性的将上面描述的数据结构写入磁盘,即检查点(checkpoint)。如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。然而,由于只有一个master进程,master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止MapReduce运算。客户可以检查到这个状态,并且可以根据需要重新执行MapReduce操作。

在失效方面的处理机制

我们依赖对Map和Reduce任务的输出是原子提交的来完成这个特性。每个工作中的任务把它的输出写到私有的临时文件中。每个Reduce任务生成一个这样的文件,而每个Map任务则生成R个这样的文件(一个Reduce任务对应一个文件)。当一个Map任务完成的时,worker发送一个包含R个临时文件名的完成消息给master。如果master从一个已经完成的Map任务再次接收到到一个完成消息,master将忽略这个消息;否则,master将这R个文件的名字记录在数据结构里。

存储位置

我们通过尽量把输入数据(由GFS管理)存储在集群中机器的本地磁盘上来节省网络带宽。GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)。当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。

技巧

分区函数

MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。我们在中间key上使用分区函数来对数据进行分区,之后再输入到后续任务执行进程。一个缺省的分区函数是使用hash方法(比如,hash(key) mod R)进行分区。hash方法能产生非常平衡的分区。然而,有的时候,其它的一些分区函数对key值进行的分区将非常有用。比如,输出的key值是URLs,我们希望每个主机的所有条目保持在同一个输出文件中。为了支持类似的情况,MapReduce库的用户需要提供专门的分区函数。例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的URLs保存在同一个输出文件中。

Combiner函数

每个Map任务将产生成千上万个这样的记录<the,1>。所有的这些记录将通过网络被发送到一个单独的Reduce任务,然后由这个Reduce任务把所有这些记录累加起来产生一个数字。我们允许用户指定一个可选的combiner函数,combiner函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出去。

Combiner函数在每台执行Map任务的机器上都会被执行一次。一般情况下,Combiner和Reduce函数是一样的。Combiner函数和Reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出。Reduce函数的输出被保存在最终的输出文件里,而Combiner函数的输出被写到中间文件里,然后被发送给Reduce任务。

跳过损坏的记录

每个worker进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。在执行Map或者Reduce操作之前,MapReduce库通过全局变量保存记录序号。如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号。当master看到在处理某条特定记录不止失败一次时,master就标志着条记录需要被跳过,并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录。

状态信息

处于最顶层的状态页面显示了哪些worker失效了,以及他们失效的时候正在运行的Map和Reduce任务。这些信息对于调试用户代码中的bug很有帮助。

相关工作

MapReduce的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的集群上分布和运行用户任务。虽然这个不是本论文的重点,但是有必要提一下,这个集群管理系统在理念上和其它系统,如Condor是一样。

MapReduce库的排序机制和NOW-Sort的操作上很类似。读取输入源的机器(map workers)把待排序的数据进行分区后,发送到R个Reduce worker中的一个进行处理。每个Reduce worker在本地对数据进行排序(尽可能在内存中排序)。当然,NOW-Sort没有给用户自定义的Map和Reduce函数的机会,因此不具备MapReduce库广泛的实用性。

Spark内核设计的艺术: 第8章 计算引擎

8.1 计算引擎概述

Spark的计算引擎主要包括执行内存Shuffle两部分

  1. 执行内存

    执行内存主要包括执行内存、任务内存管理器(TaskMemoryManager)、内存消费者(MemoryCosumer)等内容。执行内存包括在JVM堆上进行分配的执行内存池(ExecutionMemoryPool)和在操作系统的内存中进行分配的Tungsten。内存管理器将提供API对执行内存和Tungsten进行管理(包括申请内存、释放内存等)。因为同一节点上能够运行多次任务尝试,所以需要每一次任务尝试都有单独的任务内存管理器为其服务。任务尝试通过任务内存管理器与内存管理器交互,以申请任务尝试所需要的执行内存,并在任务尝试结束后释放使用的执行内存。一次任务尝试过程中会有多个组件需要使用执行内存,这些组件统称为内存消费者。内存消费者多种多样,有对map任务的中间输出数据在JVM堆上进行缓存、聚合、溢出、持久化等处理的ExternalSorter,也有在操作系统内存中进行缓存、溢出、持久化处理的ShuffleExternalSorter,还有将key/value对存储到连续的内存块中的RowBasedKeyValueBatch。消费者需要的执行内存都是向任务内存管理器所申请的。从执行内存的角度来看,计算引擎的整体架构如下图所示:
    image

  2. 什么是Shuffle

    Shuffle是所有MapReduce计算框架必须面临的执行阶段,Shuffle用于打通map任务的输出与reduce任务的输入,map任务的中间输出结果按照指定的分区策略(例如:按照key值哈希)分配给处理某一个分区的reduce任务。

image

在早期Spark版本中,Shuffle如下图所示:

image

步骤如下:

1)map任务会为每一个reduce任务创建一个bucket。假设有M个map任务,R个reduce任务,则map阶段一共会创建M * R个桶(bucket)
2)map任务会将产生的中间结果按照分区(partition)写入到不同的bucket中
3)reduce任务从本地或者远端的map任务所在的BlockManager获取相应的bucket作为输入。

Spark早期版本的Shuffle过程存在以下问题:

1)map任务的中间结果首先存入内存,然后才写入磁盘。这对于内存的开销很大,当一个节点上map任务的输出结果集很大时,很容易导致内存紧张,进而发生内存溢出
2)每个map任务都会输出R(reduce任务数量)个bucket。假设M等于1000,R也等于1000,那么共计生成100万个bucket,在bucket本身不大,但是Shuffle很频繁的情况下,磁盘I/O将成为性能瓶颈。

Hadoop MapReduce的Shuffle过程存在以下问题:

1)reduce任务获取到map任务的中间输出后,会对这些数据在磁盘上进行合并(merge)和排序(sort),虽然占用内存很小,但是却产生了更多的磁盘I/O

2)当数据量很小,但是map任务和reduce任务数目很多时,会产生很多网络I/O

为了解决以上MapReduce和早期spark的Shuffle过程中的性能问题,目前Spark已经对Shuffle做了多种性能优化,主要解决方法如下:

1)将map任务给每个partition的reduce任务输出的bucket合并到同一个文件中,这解决了bucket数量很多,但是数据本身的体积不大时,造成Shuffle频繁,磁盘I/O成为性能瓶颈的问题
2)map任务逐条输出计算结果,而不是一次性输出到内存,并使用AppendOnlyMap缓存及其聚合算法对中间结果进行聚合,这大大减小了中间结果所占的内存大小
3)对SizeTrackingAppendOnlyMap、SizeTrackingPairBuffer及Tungsten的Page进行溢出判断,当超出溢出限制的大小时,将数据写入磁盘,防止内存溢出
4)reduce任务将要拉取的Block按照BlockManager地址划分,然后将同一BlockManager地址中的Block累积为少量网络请求,减少网络I/O。

经过以上优化,目前Spark实现的Shuffle的过程大致为:map任务在输出时会进行分区计算并生成数据文件和索引文件等步骤,可能还伴随有缓存、排序、聚合、溢出、合并等操作。reduce任务将map任务输出的Block划分为本地和远端的Block,对于远端的Block,需要使用ShuffleClient从远端节点下载,而对于 本地的BLock,只需要从本地的存储体系中读取即可。reduce任务读取到map任务输出的数据后,可能进行缓存 、排序、聚合、溢出、合并等操作,最终输出结果。

8.2 内存管理器和执行内存

第6章介绍MemoryManager时只讲了存储内存,执行内存放到此章节来讲解。

8.2.1 ExecutionMemoryPool 详情

ExecutionMemoryPool继承了MemoryPool,是执行内存池的具体实现。Execution
MemoryPooI只是逻辑上的执行内存池,并不是堆上和堆外的实际内存。要理解Execution
MemoryPool,应该从其属性开始。ExecutionMemoryPool继承了MemoryPool的lock和
_poolSize两个属性,还增加了一些特有的属性。

  1. acquireMemory: 用于给TaskAttemptId对应的任务尝试获取指定大小(numBytes)的内存。
  2. releaseMemory: 用于给TaskAttemptId对应的任务尝试释放指定大小(numBytes)的内存。
  3. releaseAllMemoryForTask: 用于释放taskAttemptId对应的任务尝试所消费的所有内存。

8.2.2 MemoryManager 模型与执行内存

在6.5.3节曾经详细介绍了MemoryManager的内存模型,但是没有介绍其与执行内存相关的方法。本节将介绍MemoryManager中与执行相关的方法。
1)acquireExecutionMemory:为执行taskAttemptId对应的任务尝试,从堆内存或堆外内存获取所需大小(即numBytes)的内存。
2) releaseExecutionMemory:从堆内存或堆外内存释放taskAttemptId对应的任务尝试所消费的指定大小(即numBytes)的执行内存。
3)releaseAllExecutionMemoryForTask:从堆内存及堆外内存释放taskAttemptId代表的任务尝试所消费的所有执行内存。
4)executionMemoryUsed:获取堆上执行内存池与堆外执行内存池已经使用的执行内存之和
5)getExecutionMemoryUsageForTask:获取taskAttemptId代表的任务尝试在堆上执行内存池与堆外执行内存池所消费的执行内存之和。

8.2.3 UnifiedMemoryManager与执行内存

在6.5.4节介绍UnifiedMemoryManager时,只介绍了与存储体系相关的方法,本节将介绍UnifiedMemoryManager提供的与执行内存相关的acquireExecutionMemory方法。在MemoryManager中定义了acquireExecutionMemory方法的接口,需要子类去实现。

8.3 内存管理器与Tungsten

什么是Tungsten?翻译为中文是“钨”的意思。Tungsten最早是由Databricks公司提出的对spark的内存和CPU使用进行优化的计划,但本书限定Tungsten是一种内存分配与释放的实现。Tungsten使用sun.misc.UnsafeAPI直接操作系统内存,避免了在JVM中加载额外的CIass.也不用创建额外的对象,因而减少了不必要的内存开销,降低了GC扫描和回收的频率,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内存来说降低了管理的难度,也降低了误差。

8.3.1 MemoryBIock 详解

操作系统中的Page是一个内存块,在Page中可以存放数据,操作系统中会有多种不同的Page。操作系统对数据的读取,往往是先确定数据所在的Page,然后使用Page的偏移量(offset)和所读取数据的长度(length)从Page中读取数据。
在Tungsten中实现了一种与操作系统的内存Page非常相似的数据结构,这个对象就是MemoryBlock。MemoryBlock中的数据可能位于JVM的堆上,也可能位于JVM的堆外内存(操作系统内存)中。
由于MemoryBlock继承自MemoryLocation,所以分析MemoryBlock之前,应该首先弄清楚MemoryLocation。MemoryLocation用于表示内存的位置信息。Tungsten如果是堆外模式,那么MemoryLocation的实现如下。

public class MemoryLocation {

  @Nullable
  Object obj;

  long offset;

  public MemoryLocation(@Nullable Object obj, long offset) {
    this.obj = obj;
    this.offset = offset;
  }

  public MemoryLocation() {
    this(null, 0);
  }

  public void setObjAndOffset(Object newObj, long newOffset) {
    this.obj = newObj;
    this.offset = newOffset;
  }

  public final Object getBaseObject() {
    return obj;
  }

  public final long getBaseOffset() {
    return offset;
  }
}

可以看到,MemoryLocation主要由obj和offset两个属性及其读写方法组成。有些读者可能会发现,obj属性由注解NuIlable来标注,这是为什么?Tungsten处于堆内存模式时,数据作为对象存储在JVM的堆上,此时的obj不为空。Tungsten处于堆外内存模式时,数据存储在JVM的堆外内存(操作系统内存)中,因而不会在JVM中存在对象。offset属性主要用来定位数据。当Tungsten处于堆内存模式时,首先从堆内找到对象,然后使用offset定位数据的具体位置。当Tungsten处于堆外内存模式时,则直接使用。offset从堆外内存中定位。定位到数据的位置后,该怎样读取数据呢?MemoryBlock继承自MemoryLocation,代表从obj和offt定位的起始位置开始,固定长度(由MemoryBlock的性确定)的连续内存块。

8.3.2 MemoryManager模型与Tungsten

MemoryManager中除了存储内存和执行内存外,还定义了几个与Tungsten优化相关的
常量。
1)tungstenMemoryMode:Tungsten的内存模式。tungstenMemoryMode也采用枚举类型MemoryMode来表示堆内存和堆外内存。当Tungsten在堆内存模式下,数据存储在JVM堆上,这时Tungsten选择onHeapExecutionMemoryPool作为内存池。当Tungsten在堆外内存模式下,数据则会存储在堆外内存(操作系统内存)中,这时Tungsten选择offHeapExecutionMemoryPool作为内存池。可以通过spark.memory.offHeapenabled属性(默认为false)来配置是否启用Tungsten的堆外内存。
2)pageSizeBytes:Tungsten采用的Page的默认大小(单位为字节)。可通过sparkbuffer.pageSize属性进行配置。如果未指定spark.buffer.pageSize属性,则计算pageSizeBytes。
3)tungstenMemoryAllocator:Tungsten采用的内存分配器(MemoryAllocator)。如果
tungstenMemoryModeMemoryMode.ON_HEAP,那么tungstenMemoryAllocator为堆内存分配器(HeapMemoryAllocator),否则为使用sun.migc.Unsafe的API分配操作系统内存的分配器UnsafeMemoryAllocator

8.3.3 Tungsten的内存分配器

MemoryAllocator是Tungsten的内存分配器的接口规范。

public interface MemoryAllocator {

  /**
   * Whether to fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes respectively.
   * This helps catch misuse of uninitialized or freed memory, but imposes some overhead.
   */
  boolean MEMORY_DEBUG_FILL_ENABLED = Boolean.parseBoolean(
    System.getProperty("spark.memory.debugFill", "false"));

  // Same as jemalloc's debug fill values.
  byte MEMORY_DEBUG_FILL_CLEAN_VALUE = (byte)0xa5;
  byte MEMORY_DEBUG_FILL_FREED_VALUE = (byte)0x5a;

  /**
   * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
   * to be zeroed out (call `fill(0)` on the result if this is necessary).
   */
  MemoryBlock allocate(long size) throws OutOfMemoryError;

  void free(MemoryBlock memory);

  MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();

  MemoryAllocator HEAP = new HeapMemoryAllocator();
}

HeapMemoryAllocator的工作原理:
image

UnsafeMemoryAllocator的工作原理:

image

8.4 任务内存管理器

任务内存管理器(TaskMemoryManager)用于管理单个任务尝试的内存分配与释放。TaskMemoryManager实际上依赖于MemoryManager提供的内存管理能力,多个TaskMemoryManager将共享MemoryManager所管理的内存。一次任务尝试有很多组件需要使用内存,这些组件都借助于TaskMemoryManager提供的服务对实际的物理内存进行消费,它们统称为内存消费者(MemoryConsumer)。本节首先详细分析TaskMemoryManager,然后介绍MemoryConsumer。

8.4.1 TaskMemoryManager 详解

  1. acquireExecutionMemory:用于为内存消费者获得指定大小(单位为字节)的内存。当Task没有足够的内存时,将调用MemoryConsumer的spill方法释放内存。
  2. releaseExecutionMemory: 用于为消费者释放指定大小的内存(单位为字节)。
  3. ShowMemoryUsage:将任务尝试、各个MemoryConsumer及MemoryManager管理的执行内存和存储内存的使用情况打印到日志。
  4. pageSizeBytes: 获取Page的大小(单位为字节)。其实际为MemoryManager的pageSizeBytes属性。
  5. allocatePage: 用于给MemoryConsumer分配指定大小(单位为字节)的MemoryBlock。
  6. freePage: 释放给MemoryConsumer分配的MemoryBlock。
  7. encodePageNumberAndOffset: 用于根据给定的Page(即MemoryBlock)和Page中的偏移量的地址,返回页号和相对于内存块的起始地址的偏移量(64位长整型)。
  8. decodePageNumber: 用于将64位长整型右移51位(只剩下页号),然后转换为整型以获得Page的页号。
  9. decodeOffset: 用于将64位长整型与51位的掩码按位进行与运算,以获得在Page中的偏移量。
  10. getPage: 用于通过64位的长整型,获取Page在内存中的对象。此方法在Tungsten采用堆内存模式时才有效,否则返回null。
  11. getOffsetInPage: 将64位的长整型,获取在Page中的偏移量。

8.4.2 内存消费者

抽象类MemoryConsumer定义了内存消费者的规范,它通过TaskMemoryManager在执行内存(堆内存或堆外内存)上申请或释放内存。
MemoryConsumer是抽象类,其抽象方法spill需要子类去实现,spark中有很多MemoryConsumer的子类。

image

8.4.3 执行内存整体架构

image

图中,从下往上展示了Spark执行内存的各个组成部分,上层的组件依赖于下层提供的服务或支持。操作系统内存(OS Memory)是整个架构的基础,无论执行内存如何分配,都离不开系统内存的支持。Java虚拟机(JVM)的堆内存(Heap)提供了对Java对象的存储支持,其实质依然是从操作系统申请获得的内存。内存管理器(MemoryManager)提供了四种逻辑上的内存池,分别为堆外执行内存池(offHeapExecutionMemorypool)、堆上执行内存池(onHeapExecutionMemoryPooI)、堆外存储内存池(offHeapStorageMemoryPool)、堆上存储内存池(onHeapStorageMemoryPooI)。内存管理器提供了在Tungsten的堆外内存上分配内存的UnsafeMemoryAllocator和在Tungsten的堆内存上分配内存的HeapMemoryAllocatorUnsafeMemoryAllocator通过sun.misc.Unsafe的各种API操纵操作系统内存,HeapMemoryAllocator则通过在JVM Heap分配对象的方式操纵JVM Heap。由于每个节点只有一个MemoryManager,而每个任务尝试都会有一个TaskMemoryManager为其管理内存,所以多个TaskMemoryManager将分享MemoryManager管理的内存。每个TaskMemoryManager管理的任务内存又会有多个内存消费者(MemoryConsumer)进行消费。

8.5 Task 详解

Task的实现细节与计算引擎息息相关。

8.5.1 任务上下文 TaskContext

TaskContext维护了Task执行时的上下文信息,所以我们需要对TaskContext的功能进行分析。抽象类TaskContext中定义了一系列抽象方法,由于TaskContext只有一个实现类TaskContextImpl,所以我们将直接介绍TaskContextImpl。TaskContext还有一个利用ThreadLocaI技术的伴生对象,用于维护每个Task线程的TaskcontextImplcl。

  1. TaskContextImpl 详解
    TaskContextImpI是抽象类的唯一实现,我们从了解它的属性信息开始,逐步深人TaskContextImpl的实现。

    • stageId
    • partitionId
    • metricsSystem
  2. TaskContext 的伴生对象

    提供了将TaskContext保存到ThreadLocal中,用于将每个任务尝试线程的TaskContextImpl的线程安全性。

8.5.2 Task 的定义

Task是spark中作业运行的最小单位,为了容错,每个Task可能会有一到多次任务尝试。Task主要包括ShuffleMapTask和ResuItTask两种。每次任务尝试都会申请单独的连续内存,以执行计算。

抽象类Task定义了spark中的规范,我们首先了解Task自身的属性,然后介绍Task提供的基本方法及需要子类实现的抽象方法,最后分析Task定义的模板方法。

  def runTask(context: TaskContext): T

  def preferredLocations: Seq[TaskLocation] = Nil
  // 程序入口
  final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem): T 
  ....

8.5.3 ShuffleMapTask 的实现

8.5.4 ResultTask 的实现

8.6 IndexShuffleBlockResolver

特质ShuffleBlockResolver定义了对ShuffleBlock进行解析的规范,包括获取Shuffle数据文件、获取Shuffle索引文件、删除指定的Shuffle数据文件和索引文件、生成Shuffle索引文件、获取Shume块的数据等。ShuffleBlockResolver目前只有IndexShuffleBlockResolver这唯一的实现类。IndexShuffeBlockResolver用于创建和维护ShuffleBlock与物理文件位置之间的映射关系。

  1. getDataFile: 获取shuffle数据文件。
  2. getIndexFile:获取Shuffle索引文件。
  3. removeDataByMap:删除Shuffle过程中的包含指定map任务输出数据的Shuffle数据文件和索引文件。
  4. writeIndexFileAndCommit:将每个Block的偏移量写入索引文件,并最后增加一个表示输出文件末尾的偏移量。
  5. getBlockData:获取指定ShuffleBlockId对应的数据。

8.7 采样与估算

Spark在Shuffle阶段,给map任务的输出增加了缓存、聚合的数据结构。这些数据结构将使用各种执行内存,为了对这些数据结构的大小进行计算,以便于扩充大小或在没有足够内存时溢出到磁盘,特质SizeTracker定义了对集合进行采样和估算的规范。

8.7.1 SizeTracker 的实现分析

  • takeSample
  • resetSamples
  • afterupdate
  • estimateSize:估算集合的当前大小

8.7.2 SizeTracker 的工作原理

经过对SizeTracker的实现分析,本节来一起总结SizeTracker的工作原理。为便于说明,本节假设采样增长的速率(SAMPLE_GROWTH_RATE)的值不是1.1,而是2。我们将会针对初始估算大小为100的集合进行4次采样。为了便于计算,添加到集合中的元素都只选择10、20、30这样的整数。本节假设SizeEstimator的方法对集合大小的估算非常精确,即与集合的实际大小一致。SizeTracker的工作原理如图8.8所示。

image

根据图 8-8,对各个标记进行说明。

标记①:SizeTracker所追踪的集合处于初始化状态,集合中没有任何元素,此时Size-Tracker在初始化时会调用resetsamples方法,因而会调用takeSample方法进行采样。此时samples中只有一次采样,各个属性分别为bytesPerUpdate—0.0;estimateSize=100;nextsample-Num=2;numUpdates=1。

标记②:向集合中添加了10这个元素,此时的numUpdates增加为2。由于numUpdates与nextSampleNum相同,因而调用takeSample方法进行采样。经过计算,samples中已经有两次采样,各个属性分别为bytesPerUpdate=10.0;estimateSize=110;nextSamp1eNum=4;numUpdates=2。

标记③:向集合中添加了20这个元素,此时的numUpdates增加为3。由于numUpdates不等于nextSampleNum,所以不会进行采样。向集合中继续添加了10这个元素,此时的numUpdates增加为4。由于numUpdates与nextSampleNum相同,因而调用takeSample方法进行采样。经过计算,sample保留最新的两次采样,各个属性分别为bytesPerUpdate=15.0;estimateSize=140;nextSampleNum=8;numUpdates=4。

标记④:向集合中添加了10、30、20这几个元素,此时的numUpdates经过三次增加后值为7。由于numUpdates不等于nextSampIeNum,所以不会进行采样。向集合中继续添加了20这个元素,此时的numUpdates增加为8。由于nurnUpdates与nextSampleNum相同,因而调用takesample方法进行采样。经过计算,samples保留最新的两次采样,各个属性分别为bytesPerUpdate=20.0;estimateSize=220;nextSampleNum=16;numUpdates=8。

8.8 特质 WritablePartitionedPairCollection

WritablePartitionedPairCollection是对由键值对构成的集合进行大小跟踪的通用接口。这里的每个键值对都有相关联的分区,例如,key为(0,#),value为1的键值对,真正的键实际是#,而0则是键#的分区ID。WritablePartitionedPairCollection支持基于内存进行有效排序,并可以创建将集合内容按照字节写人磁盘的Writab1ePartitionedIterator。

8.9 AppendOnlyMap 的实现分析

8.10 PartitionedPairBuffer 的实现分析

map任务除了采用AppendOnlyMap对键值对在内存中进行更新或聚合,spark还提供了一种将键值对缓存在内存中,并支持对元素进行排序的数据结构。AppendOnlyMap的表现行为类似于Map,而这种数据结构类似于Collection,它就是PartitionedPairBuffer。PartitionedPairBuffer最大支持1073741823(即2^30-1)个元素。

PartitionedPairBuffer同时继承了WritablePartitionedPairCollection和SizeTracker这两
个特质。

8.11 外部排序器

Spark中的外部排序器用于对map任务的输出数据在map端或reduce端进行排序。Spark中有两个外部排序器。分别是ExternaISorter和ShuffleExternaISorter,本节将分别展开介绍。

8.11.1 ExternalSorter 详解

ExternalSortersss是SortShuffleManager的底层组件,它提供了很多功能,包括将map任务的输出存储到JVM的堆中,如果指定了聚合函数,则还会对数据进行聚合;使用分区计算器首先将Key分组到各个分区中,然后使用自定义比较器对每个分区中的键进行可选的排序;可以将每个分区输出到单个文件的不同字节范围中,便于reduce端的Shuffle获取。

8.11.2 ShuffleExternaISorter 详解

ShumeExternalSorter是专门用于对Shuffle数据进行排序的外部排序器,用于将map任务的输出存储到Tungsten中;在记录超过限制时,将数据溢出到磁盘。与ExternalSorter不同,ShuffleExternaISorter本身并没有实现数据的持久化功能,具体的持久化将由ShuffleExternalSorter的调用者UnsafeShuffleWriter来实现。

8.12 Shuffle管理器

8.12.1 ShuffleWriter 详解

SortShuffleManager依赖于ShuffleWriter 提供的服务,抽象类ShuffleWriter定义了将map任务的中间结果输出到磁盘上的功能规范,包括将数据写入磁盘和关闭ShuffleWriter 。

  1. ShuffleHandle
  2. MapStatus
  3. SortShuffleWriter
  4. BypassMergeSortShuffleWriter map端不需要在持久化数据之前聚合、排序等操作
  5. UnsafeShuffleWriter

8.12.2 ShuffleBlockFetcherIterator 详解

ShuffleBlockFetcherIterator 是用于获取对个Block的迭代器。如果Block在本地,那么从本地的BlockManager获取;如果Block在远端,那么通过ShuffleClient请求远端节点上的BlockTransferService获取。

  1. initialize:初始化
  2. splitLocalRemoteBlocks: 划分本地与远端Block。
  3. fetchUpToMaxBytes: 向远端发起请求,以获取Block。
  4. fetchLocalBlocks: 获取本地Block。
  5. hasNext, next

8.12.3 BlockStoreShuffleReader 详解

BlockStoreShuffleReader 用于Shuffle执行过程中,reduce任务从其他节点的Block文件中读取起始分区(startPatient)和结束分区(endPartition)指定范围内的数据。

它只有read一个方法。

8.12.4 SortShuffleManager 详解

SortShuffleManager 管理基于排序的Shuffle-----输入的记录按照目标分区ID排序,然后输出到一个单独的map输出文件中。reduce为了读取map输出,需要获取map输出文件的连续内容。当map的输出数据太大已经不适合放在内存中时,排序后的输出子集将被溢出到文件中,这些磁盘上的文件将被合并生成最终的输出文件。

  1. registerShuffle: 用于根据条件创建不同的ShuffleHandle实例。
  2. unregisterShuffle 删除Shuffle过程中的所有map任务数据文件和索引文件。
  3. getWriter: 根据ShuffleHandle获取ShuffleWriter。
  4. getReader: 用于获取对map任务输出的分区数据文件中从startPatient到endPartition-1范围内的数据进行读取,供reduce任务使用。

8.13 map端与reduce端的Shuffle组合

经过对map端和reduce端执行代码的分析,读者对Shuffle的整个过程应该有了更深的理解,并且能够对Shuffle进行性能调优。

  1. map端和reduce端都进行聚合
  2. map端缓存和reduce端聚合
  3. map端缓存和reduce端不聚合
  4. map端绕开聚合、排序和reduce端不聚合

Spark内核设计的艺术: 第9章 部署模式

9.1 心跳接收器 HeartbeatReceiver

HeartbeatReceiver 运行在Driver上,用以接收各个Executor的心跳消息,对各个Executor的"生死”进行监控。

  • 注册Executor
    HeartbeatReceiver继承了SparkListener,并实现了onExecutorAdded方法(见代码清单9.1)。根据3.3节的内容,我们知道事件总线在接收到SparkListenerExecutorAdded消息后,将调用HeartbeatReceiver的onExecutorAdded方法,这样HeartbeatReceiver将监听到Executor的添加。

      /**
       * If the heartbeat receiver is not stopped, notify it of executor registrations.
       */
      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
        addExecutor(executorAdded.executorId)
      }

    addExecutor实现如下:

      /**
       * Send ExecutorRegistered to the event loop to add a new executor. Only for test.
       *
       * @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
       *         indicate if this operation is successful.
       */
      def addExecutor(executorId: String): Option[Future[Boolean]] = {
        Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
      }
  • 移除 Executor

  • TaskSchedulerIsSet 消息

  • 检查超时的Executor

  • Executor的心跳

    image

9.2 Executor 的实现分析

9.2.1 Executor的心跳报告

在初始化Executor的过程中,Executor会调用自己的startDriverHeartbeater方法启动心跳报告的定时任务。

9.2.2 运行Task

Executor 的launchTask方法用于运行Task。

9.3 local 部署模式

本书在讲解各章内容时,主要以local模式为例。local部署模式只有Driver,没有Master和Worker,执行任务的Executor与Driver在同一个JVM进程内。local模式中使用的ExecutorBackend和SchedulerBackend的实现类都是LocalSchedulerBackend。在第7章介绍调度系统时已经分析过local部署模式下的LocalSchedulerBackend、LocalEndpoint等组件的实现,本节将以图形的方式展现local部署模式的启动、local部署模式下的任务提交与执行等流程。

local部署模式的启动过程如图9.2所示。

image

9.4 持久化引擎 PersistentEngine

PersistenceEngine用于当Master发生故障后,通过领导选举选择其他Master接替整个集群的管理工作时,能够使得新激活的Master有能力从故障中恢复整个集群的状态信息,进而恢复对集群资源的管理和分配。抽象类PersistenceEngine定义了对Master必需的任何状态信息进行持久化的接口规范。实现PersistenceEngine必须满足以下机制。

  • 在完成新的Application的注册之前,addApplication方法必须被调用。
  • 在完成新的Worker的注册之前,addWorker方法必须被调用。
  • removeApplication方法和removeWorker方法可以在任何时候调用。

在以上机制的保证下,整个集群的Worker、Driver和Application的信息都被持久化,集群因此可以在领导选举和主Master的切换后,对集群状态进行恢复。

image

BlackHolePersistentEngine都是空实现,CustomPersistentEngine是用于单元测试的实现。其他两个是用于生产环境的实现类。

9.4.1 基于文件系统的持久化引擎

FileSystemPersistenceEngine是基于文件系统的持久化引擎。对于ApplicationInfo、WorkerInfo及DriverInfo,FiIeSystemPersistenceEngine会将它们的数据存储到磁盘上的单个文件夹中,当要移除它们时,这些磁盘文件将被删除。由于不同的Master往往不在同一个机器节点上,因此在使用FileSystemPersistenceEngine时,底层的文件系统应该是分布式的。

9.4.2 基于ZooKeeperPersistenceEngine的久化引擎

ZooKeeperPersistenceEngine是基于ZooKeeper的持久化引擎。对于ApplicationInfo、WorkerInfo及DriverInfo,ZooKeeperPersistenceEngine会将它们的数据存储到ZooKeeper的不同节点(也称为Znode)中,当要移除它们时,这些节点将被删除。

ZooKeeperPersistenceEngine有以下属性。

  • conf: 即SparkConf。
  • serializer:持久化时使用的序列化器。
  • WORKING_DIR:ZooKeeperPersistenceEngine在ZooKeeper上的工作目录,是spark基于ZooKeeper进行热备的根节点(可通过spark.deploy.ZooKeeper.dir属性配置,默认为spark)的子节点master_status。
  • zk:连接ZooKeeper的客户端类型为CuratorFramework。

小贴士Curator是Netflix公司开源的一个ZooKeeper客户端,与ZooKeeper提供的原生客户端相比,Curator的抽象层次更高,其核心目标是帮助工程师管理zooKeeper的相关操作,简化ZooKeeper客户端的开发量。Curator现已提升为Apache的顶级项目。Curator-Framework就是Curator提供的APIO

9.5 领导选举代理

领导选举机制(Leader Election)可以保证集群虽然存在多个Master,但是只有一个Master处于激活(Active)状态,其他的Master处于支持(Standby)状态。当Active状态的Master出现故障时,会选举出一个standby状态的Master作为新的Active状态的Master。由于整个集群的worker,Driver和Application的信息都已经通过持久化引擎持久化,因此切换Master时只会影响新任务的提交,对于正在运行中的任务没有任何影响。

特质LeaderElectionAgent定义了对当前的Master进行跟踪和领导选举代理的通用接口,其定义如下。

/**
 * :: DeveloperApi ::
 *
 * A LeaderElectionAgent tracks current master and is a common interface for all election Agents.
 */
@DeveloperApi
trait LeaderElectionAgent {
  val masterInstance: LeaderElectable
  def stop() {} // to avoid noops in implementations.
}

masterInstance 属性类型是LeaderElectable,特质LeaderElectable的定义如下:

@DeveloperApi
trait LeaderElectable {
  def electedLeader(): Unit   // 被选举为领导
  def revokedLeadership(): Unit  // 撤销领导关系
}
  1. MonarchyLeaderAgent 详解
  2. ZookeeperLeadeElectionrAgent 详解

9.6 Master 详解

Master是local-cluster部署模式和Standalone部署模式中,整个Spark集群最为重要的组件之一,它的设计将直接决定整个集群的可扩展性、可用性和容错性。Master的职责包括worker的管理、Application的管理、Driver的管理等。Master负责对整个集群中所有资源的统一管理和分配,它接收各个worker的注册、更新状态、心跳等消息,也接收Driver和Application的注册。

worker向Master注册时会携带自身的身份和资源信息(如ID、host、P酾、内核数、内存大小等),这些资源将按照一定的资源调度策略分配给或Application。Master给Driver分配了资源后,将向Worker发送启动Driver的命令,后者在接收到启动Driver的命令后启动Driver。Master给Application分配了资源后,将向Worker发送启动Executor的命令,后者在接收到启动Executor的命令后启动Executor。

Master接收Worker的状态更新消息,用于“杀死”不匹配的Driver或Application。worker向Master发送的心跳消息有两个目的:一是告知Master自己还“活着”,另外则是某个出现故障后,通过领导选举选择了其他Master负责对整个集群的管理,此时被激活的Master可能并没有缓存worker的相关信息,因此需要告知Worker重新向新的
Master注册。

本节主要对Master进行详细分析,理解local-cluster部署模式和StandaIone部署模式下Master如何对整个集群的资源进行管理和分配,但在此之前先需要按部就班地了解Master包含的属性。

****** 识别结果 1******

9.6.1 启动Master

启动Master有作为JVM进程内的对象启动和作为单独的进程启动的两种方式。以对象启动的方式主要用于local-cluster模式,而作为进程启动则用于standalone模式。

  1. 对象方式启动
    Master的伴生对象的startRpcEnvAndEndpoint方法用于创建Master对象,并将Master对象注册到RpcEnv中完成对Master对象的启动。

      /**
       * Start the Master and return a three tuple of:
       *   (1) The Master RpcEnv
       *   (2) The web UI bound port
       *   (3) The REST server bound port, if any
       */
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    }
    1. 进程方式启动

      Master的伴生对象中实现了main方法,这样就可以作为单独的JVM进程启动了。

        def main(argStrings: Array[String]) {
          Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
            exitOnUncaughtException = false))
          Utils.initDaemon(log)
          val conf = new SparkConf
          val args = new MasterArguments(argStrings, conf)
          val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
          rpcEnv.awaitTermination()
        }

9.6.2 检查worker超时

经过上一小节对启动Master的分析,我们知道定时任务checkForWorkerTimeOutTask是以WORKER_TIMEOUT_MS为时间间隔,通过不断向Master自身发送CheckForWorkerTime0ut消息来实现对worker的超时检查的。Master也继承自RpcEndpoint,Master实现的receive方法中处理CheckForWorkerTimeOut消息的代码如下。

 case CheckForWorkerTimeOut =>
   timeOutDeadWorkers()
  /** Check for, and remove, any timed-out workers */
  private def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
    for (worker <- toRemove) {
      if (worker.state != WorkerState.DEAD) {
        val workerTimeoutSecs = TimeUnit.MILLISECONDS.toSeconds(workerTimeoutMs)
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, workerTimeoutSecs))
        removeWorker(worker, s"Not receiving heartbeat for $workerTimeoutSecs seconds")
      } else {
        if (worker.lastHeartbeat < currentTime - ((reaperIterations + 1) * workerTimeoutMs)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }

9.6.3 被选举为领导时的处理

Master基于高可用性的考虑,可以同时启动多个Master。这些中只有一个是激活(Active)状态的,其余的都是支持(Standby)状态。根据9.5节的介绍,Master为了具备故障迁移的能力,它实现了LeaderElectable接囗,因此当Master被选举为领导时,领导选举代理(LeaderElectionAgent)将会调用Master的electedLeader方法。electedLeader方法的实现如下所示。

  override def electedLeader() {
    self.send(ElectedLeader)
  }

9.6.4 一级资源调度

9.6.5 注册Worker

在Spark集群中,Master接收到提交的应用程序后,需要根据应用的资源需求,将应用分配到worker上去运行。一个集群刚开始的时候只有Master,为了让后续启动的worker加人到Master的集群中,每个Worker都需要在启动的时候向Master注册,Master接收到worker的注册信息后,将把的各种重要信息(如ID、host、port、内核数、内存大小等信息)缓存起来,以便进行资源的分配与调度。Master为了容灾,还将worker的信息通过持久化引擎进行持久化,以便经过领导选举出的新Mar能够将集群的状态从错误或灾难中恢复。

Master的receiveAndReply方法中实现了对Worker发送的RegisterWorker消息进行处理的实现。

9.6.6 更新Worker的最新状态

Worker在向Master注册成功后,会向Master发送WorkerLatestState消息。WorkerLatestState消息将携带Worker的身份标识、worker节点的所有Executor的描述信息、调度到当前Worker的所有Driver的身份标识。Master接收到WorkerLatestState消息的处理。

9.6.7 处理Worker的心跳

向Master注册Worker,可以让Master知道worker的资源配置,进而通过资源调度使得Driver及Executor可以在Worker上执行。如果Worker的JVM进程发生了崩溃或者Worker所在的机器宕机或网络不通,那么Master所维护的关于worker的注册信息将变得不可用。

为了让Master及时得知Worker的最新状态,需要向Master发送心跳,Master将根据worker的心跳更新Worker的最后心跳时间,以便为整个集群的健康工作提供参考。Master的receive方法中实现了对Worker的心跳的处理。

    case Heartbeat(workerId, worker) =>
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }

根据代码清单,Master接收到Heartbeat消息后,将从idToWorker中找出缓存的WorkerInfo,并将workerlnfo的最后心跳时间(lastHeartbeat)更新为系统当前时间的时间戳。如果idToWorker中没有缓存的workerlnfo,且workers中有对应的Workerinfo(这说明定时任务checkForWorkerTime0utTask检查到worker超时,但是workerlnfo的状态不是DEAD,那么在调用removeworker方法时将workerlnfo从idToWorker中清除,此时的workers中仍然持有WorkerInfo),那么向Worker发送ReconnectWorker消息。如果idToWorker中没有缓存的WorkerInfo,且workers中也没有对应的WorkerInfo,那么说明checkForWorkerTimeOutTask已经发现Worker很长时间没有心跳,并且WorkerInfo的状态为DEAD后,将WorkerInfo从workers中也移除了。

9.6.8 注册Application

9.6.9 处理Executor的申请

9.6.10 处理Executor的状态变化

9.6.11 Master的常用方法

9.7 Worker详解

worker是Spark在local-cluster部署模式和Standa10ne部署模式中对工作节点的资源和Executor进行管理的服务。Worker一方面向Master汇报自身所管理的资源信息,一方面接收Master的命令运行Drrver或者为Apphcatlon运行Executor。同一个机器上可以同时部署多个Worker服务,一个worker也可以启动多个Executor。当Executor完成后,Worker将回收使用的资源。

9.7.1 启动Worker

启动Worker有作为JVM进程内的对象启动和作为单独的进程启动的两种方式。以对象启动的方式主要用于local-cluster模式,而作为进程启动则用于standalone模式。

9.7.2 向Master注册Worker

worker在启动后,需要加人到Master管理的整个集群中,以参与Dnver、Executor的资源调度。Worker要加入Mar管理的集群,就必须将注册到Mastero在启动Worker的过程中需要调用registerWithMaster方法向Master注册Worker。

9.7.3 向Master发送心跳

为了让Mar得知Worker依然健康运行着,就需要不断地告诉Master:“我活着”,这个过程是通过发送心跳实现的。

根据之前的内容我们知道,当Worker向Mar注册成功后会接收到Master回复的RegisteredWorker消息,Worker使用handleRegisterResponse方法处理RegisteredWorker消息时,将会向forwordMessageScheduler提交以HEARTBEATMILLIS作为间隔向Worker自身发送SendHeaflbeat消息的定时任务。Worker的receive方法实现了对SendHeartbeac消息的处理。

9.7.4 Worker与领导选举

9.7.5 运行 Driver

在介绍 Master对 Driver的资源调度和运行时,我们知道 Master将向 Worker发送LaunchDriver消息以运行 Driver。下面一起来看 Worker接收到 LaunchDriver消息后,是如何运行 Driver的。

9.7.6 运行 Executor

9.7.7 处理Executor的状态变化

9.8 StandloneAppClient 实现

StandloneAppClient 是在standalone模式下,Application与集群管理器进行对话的客户端。

9.8.1 ClientEndpoint 的实现分析

Spark各个组件之间的通信离不开RpcEnv以及RpcEndpoint。ClientEndpoint 继承自ThreadSafeRpcEndpoint,也是StandloneAppClient 的内部类,StandloneAppClient 依赖于ClientEndpoint 与集群管理器进行通信。

9.8.2 StandloneAppClient 的实现分析

StandloneAppClient 最为核心的功能是向集群管理器请求或“杀死”Executor。

9.9 StandloneSchedulerBackend 的实现分析

在7.8.2节我们曾经介绍了loca部署模式下, SchedulerBackend的实现类 LocalSchedulerBackend,本节将介绍在local- cluster模式和 Standalone模式下, SchedulerBackend的另个实现类 StandaloneScheduler Backend。由于 Standalonescheduler Backend继承自 CoarseGrainedSchedulerBackend,本节还需要介绍 CoarseGrainedSchedulerBackend及其与其他组件通信的内部类 DriverEndpoint。

Spark内核设计的艺术: 第3章 Spark基础设施

Spark配置

SparkConf 是Spark的配置类,Spark中的每一个组件都直接或者间接的使用这个类存储的属性.

SparkConf中,使用ConcurrentHaskMap来存储这些属性,其中key以及value都是String类型的.

/** 线程安全的,用于存储配置的各种属性 */
private val settings = new ConcurrentHashMap[String, String]()

从系统属性中加载

SparkConf的构造器中有一个布尔类型的loadDefaults,当loadDefaults为true时,将会从系统属性中加载Spark配置,而这些配置的key都是以spark.开头的属性:

if (loadDefaults) {
  // 加载系统中以spark.开头的系统属性
  loadFromSystemProperties(false)
}

/**
  * 加载系统中以spark.开始的系统属性
  *
  * @param silent 是否检查过时属性并打印警告️信息 true:不检查  false 检查
  * @return
  */
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
  // Load any spark.* system properties
  for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
    set(key, value, silent)
  }
  this
}

我们可以从loadFromSystemProperties方法中可以看到,使用Utils工具类获取到系统属性后,进行遍历,遍历时如果是以spark.开头的,就调用SparkConf的set方法存储到setting属性中.set方法的源码如下:

private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
  // 检查key和value,保证key和value都不为null
  if (key == null) {
    throw new NullPointerException("null key")
  }
  if (value == null) {
    throw new NullPointerException("null value for " + key)
  }
  // 是否检查过时警告:false 检查
  if (!silent) {
    logDeprecationWarning(key)
  }
  // 如果key和value都不为null,将key和value存储到settings中
  settings.put(key, value)
  // 返回当前SparkConf的实例
  this
}

使用SparkConf配置的API

从SparkConf的源码中可以看到,set方法被重载了多个,但是,最终都下面这一个set方法:

/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
  set(key, value, false)
}

常用的通过SparkConf设置setMaster, setAppName等属性:

/**
  * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
  * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
  */
def setMaster(master: String): SparkConf = {
  set("spark.master", master)
}

/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
  set("spark.app.name", name)
}

/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
  for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
  set("spark.jars", jars.filter(_ != null).mkString(","))
}

克隆SparkConf配置

在有些情况下,同一个SparkConf实例中的配置信息需要被Spark中的多个组件共用,在SparkConf的源码中可以看到,其继承了Cloneable特质并实现了clone方法,功能与java的一样,就是可以通过克隆来创建.

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable{
		// 省略无关代码
		def this() = this(true)
}

配置读取

Spark提供了设置属性,当然也提供了诸多获取属性的方法,但是最终调用的也都是下面这一个:

/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
		Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
}

在SparkConf的伴生对象中,还将对应版本号过时的配置信息存储到deprecatedConfigs中,对应版本可选参数存储到configsWithAlternatives中,更多关于配置信息,请参见源码.

Spark内置的RPC框架

在Spark中很多地方都涉及到网络通信,比如Spark各个组件间的消息互通,用户文件与Jar包上传,节点间的Shuffle过程,Block数据的复制与备份等.

在Spark2.0.0版本中节点间Shuffle过程和Block数据的复制与备份依然使用Netty.通过对接口和程序接口的重新设计,将各个组件间的消息互通,用户文件与Jar包上传等内容统一纳入Spark的RPC框架体系中.

Spark内置RPC框架的基本架构

image

TransportContext内部包含传输上下文的配置信息TransportConf和对客户端请求消息进行处理的RpcHandler

TransportConf在创建TransportClientFactoryTransportServer时都是必须的,而Rpc.Handler只用于创建TransportServer

①. TransportClientFactory是RPC客户端的工厂类。

②. TransportServer是RPC服务端的实现.

图中记号的含义如下:

① 表示通过调用TransportContextcreateClientFactory方法创建传输客户端工厂TransportClientFactory的实例.

在构造TransportClientFactory的时候,还会传递客户端引导程序TransportClientBootstrap的列表。

此外,TransportClientFactory内部还存在针对每一个Socket地址的连接池ClientPool,这个连接池的定义如下:

private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;

ClientPool定义如下:

private static class ClientPool {
        // ClientPool由TransportClient构成
        TransportClient[] clients;
        // 与每一个TransportClient 一一对应的锁对象,
        // 通过对每个TransportClient分别采用不同的锁,降低并发情况下线程间对锁的争用,减少阻塞,提高并发度
        Object[] locks;

        ClientPool(int size) {
            clients = new TransportClient[size];
            locks = new Object[size];
            for (int i = 0; i < size; i++) {
                locks[i] = new Object();
            }
        }
 }

② 标示通过调用TransportContextcreateServer方法创建传输服务端TransportServer的实例.在构造 TransportServer的实例时,需要传递TransportContext,host,port,RpcHandler以及服务端引导程序TransportServerBootstrap的列表.

Spark RPC框架所包含的各个组件

  • TransportContext: 传输上下文,包含了用于创建传输服务端(Transportserver)和传输客户端工厂(TransportClientFactory)的上下文信息,并支持使用TransportChannelHandler设置Netty提供的SocketChannelPipeline的实现.
  • TransportConf: 传输上下文的配置信息
  • RpcHandler: 对调用传输客户端(TransportClient)的sendRPC方法发送的消息进行处理的程序.
  • MessageEncoder: 在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误.
  • MessageDecoder: 对从管道中读取的ByteBuf进行解析,防止丢包和解析错误.
  • TransportFreameDecoder: 对从管道中读取的ByteBuf按照数据帧进行解析.
  • RpcResponseCallBack: RpcHandler对请求的消息处理完毕后进行回掉的接口.
  • TransportClientFactory: 创建TransportClient的传输客户端工厂类.
  • ClientPool : 在两个对等节点间维护的关于TransportClient的池子.ClientPoolTransportClientFacctory的内部组件.
  • TransportClient: RPC框架的客户端,用于获取预先协商好的流中的连续块.TransportClient旨在允许有效传输大量的数据,这些数据将被拆分成几百KB到MB的块.TransportClient处理从流中获取的块时,实际的设置是在传输层之外完成的.sendRPC方法能够在客户端和服务端的统一水平线进行这些设置.
  • TransportClintBootstrap: 当服务端响应客户端连接时在客户端执行一次的引导程序.
  • TransportRequestHandler: 用于处理客户端的请求并写完块数据后返回的处理程序.
  • TransportChannelHandler: 代理由TransportRequestHandler处理的请求和由TransportResponseHandler处理的响应,并传入传输层的处理.
  • TransportServerBootstrap: 当客户端连接到服务端时在服务端执行一次的引导程序.
  • TransportServer: RPC框架的服务端,提供高效,低级别的流服务

TransportConf

TransportConf: 传输上下文的配置信息

Spark通常使用SparkTransportConf创建TransportConf,可以通过SparkTransportConffromSparkConf方法获取TransportConf实例,获取时需要SparkConf实例,module名称,用于处理网络传输的内核数numUsableCores

SparkTransportConf源码:

/**
 * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
 * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
 * like the number of cores that are allocated to this JVM.
 */
object SparkTransportConf {defaultNumThreads

  /**
   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
   * @param _conf the [[SparkConf]]
   * @param module the module name
   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
   *                       use the given number of cores, rather than all of the machine's cores.
   *                       This restriction will only occur if these properties are not already set.
   * @param role           optional role, could be driver, executor, worker and master. Default is
   *                      [[None]], means no role specific configurations.
   */
  def fromSparkConf(
      _conf: SparkConf,
      module: String,
      numUsableCores: Int = 0,
      role: Option[String] = None): TransportConf = {
    val conf = _conf.clone
    // specify default thread configuration based on our JVM's allocation of cores (rather than
    // necessarily assuming we have all the machine's cores).
    val numThreads = NettyUtils.(numUsableCores)
    // override threads configurations with role specific values if specified
    // config order is role > module > default
    Seq("serverThreads", "clientThreads").foreach { suffix =>
      val value = role.flatMap { r => conf.getOption(s"spark.$r.$module.io.$suffix") }
        .getOrElse(
          conf.get(s"spark.$module.io.$suffix", numThreads.toString))
      conf.set(s"spark.$module.io.$suffix", value)
    }

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
      override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
      override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
        conf.getAll.toMap.asJava.entrySet()
      }
    })
  }
}

fromSparkConf最终构造TransportConf对象时传递的ConfigProvider为实现get方法的匿名内部类,get的实现实际是代理了SparkConf的get方法。

TransportClientFactory

TransportClientFactory是创建TransportClient的工厂类。

TransportContextcreateClientFactory方法可以创建出TransportClientFactory实例。

代码清单:

  /**
   * Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
   * a new Client. Bootstraps will be executed synchronously, and must run successfully in order
   * to create a Client.
   */
   public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

   public TransportClientFactory createClientFactory() {
    return createClientFactory(new ArrayList<>());
  }

TransportClientFactory的函数实现比较重要,理解其中的参数的含义。

客户端引导程序TransportClientBootstrap

TransportClientFactoryclientBootstraps属性是TransportClientBootstrap的列表。TransportClientBootstrap是在TranportClient上执行的客户端引导程序,主要对连接建立时进行一些初始化的准备(例如验证、加密)。TransportClientBootstrap所做的操作往往是昂贵的,好在建立的连接可以重用。

public interface TransportClientBootstrap {
  void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

创建RPC客户端TransportClient

有了TransportClientFactory,Spark的各个模块就可以使用它创建RPC客户端TransportClient。每个TransportClient实例只能和一个远端的RPC服务通信,所以Spark中的组件如果想要和多个RPC服务通信,就需要持有多个TransportClient实例。创建TransportClient的方法如下(实际为从缓存中获取TransportClient):

public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
  //创建InetSocketAddress
  final InetSocketAddress unresolvedAddress =
    InetSocketAddress.createUnresolved(remoteHost, remotePort);
 
  // Create the ClientPool if we don't have it yet.
  ClientPool clientPool = connectionPool.get(unresolvedAddress);
  if (clientPool == null) {
    connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
    clientPool = connectionPool.get(unresolvedAddress);
	.......
}

1)调用InetSocketAddress的静态方法createUnresolved构建InetSocketAddress(这种方式创建InetSocketAddress,可以在缓存中已经有TransportClient时避免不必要的域名解析),然后从connectionPool中获取与此地址对应的ClientPool,如果没有则需要新建ClientPool,并放入缓存connectionPool中。

2)根据numConnectionsPerPeer的大小(使用“spark.+模块名+.io.numConnectionsPerPeer”属性配置),从ClientPool中随机选择一个TransportClient

3)如果ClientPool的clients数组中在随机产生的索引位置不存在TransportClient或者TransportClient没有激活,则进入第5步,否则对此TransportClient进行第4步的检查。

4)更新TransportClient的channel中配置的TransportChannelHandler的最后一次使用时间,确保channel没有超时,然后检查TransportClient是否是激活状态,最后返回此TransportClient给调用方。

5)由于缓存中没有TransportClient可用,于是调用InetSocketAddress的构造器创建InetSocketAddress对象(直接使用InetSocketAddress的构造器创建InetSocketAddress会进行域解析),在这一步骤多个线程可能会产生竞态条件(由于没有同步处理,所以多个线程极有可能同时执行到此处,都发现缓存中没有TransportClient可用,于是都使用InetSocketAddress的构造器创建InetSocketAddress)

6)第5步创建InetSocketAddress的过程中产生的竞态条件如果不妥善处理,会产生线程安全问题,所以到了ClientPool的locks数组发挥作用的时候了。按照随机产生的数组索引,locks数组中的锁对象可以对clients数组中的TransportClient一对一进行同步。即使之前产生了竞态条件,但是这一步只能有一个线程进入临界区。在临界区内,先进入的线程调用重载的createClient方法创建TransportClient对象并放入ClientPool的clients数组中。当率先进入临界区的线程退出临界区后,其它线程才能进入,此时发现ClientPool的clients数组中已经存在了TransportClient对象,那么将不再创建TransportClient,而是直接使用它。

上述代码整个执行过程实际解决了TransportClient缓存的使用及createClient方法的线程安全问题,并没有涉及创建TransportClient的实现。TransportClient的创建过程在重载的createClient方法:

创建TransportClient步骤如下:

1)构建根引导程序Bootstrap并对其进行配置

2)为根引导程序设置管道初始化回调函数,此回调函数将调用TransportContext的initializePipeLine方法初始化Channel的pipeline。

3)使用根引导程序连接远程服务器,当连接成功对管道初始化时会回调初始化回调函数,将TransportClient和Channel对象分别设置到原子引用clientRef与channelRef中

4)给TransportClient设置客户端引导程序,即设置TransportClientFactory中的TransportClientBootstrap列表

5)返回此TransportClient对象

RPC服务端TransportServer

TransportServer是RPC框架的服务端,可提供高效、低级别的流服务,TransportContextcreateServer方法用于创建TransportServer,其实现如下:

public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, null, port, rpcHandler, bootstraps);
}

管道初始化

在创建TransportClient和对TranportServer初始化的实现中,都在管道初始化回调函数中调用了TranportContextinitializePipeline方法,initializePipeline方法将调用Netty的API对管道初始化。

image

TransportChannelHandler详解

image

服务端RpcHandler详解

由于TransportRequestHandler实际是把请求消息交给RpcHandler进行处理,RpcHandler是一个抽象类,定义了一些RPC处理器的规范,代码下:

public abstract class RpcHandler {
  private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
  //抽象方法,用来接收单一的RPC消息,具体处理逻辑需要子类去实现
  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);
  //获取Streammanager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被TransportClient获取的流的状态
  public abstract StreamManager getStreamManager();
  //重载receive方法,RpcResponseCallback为默认的ONE_WAY_CALLBACK
  public void receive(TransportClient client, ByteBuffer message) {
    receive(client, message, ONE_WAY_CALLBACK);
  }
  //当与给定客户端相关联的channel处于活动状态时调用
  public void channelActive(TransportClient client) { }
  //当与给定客户端相关联的channel处于非活动状态时调用
  public void channelInactive(TransportClient client) { }
  //当channel产生异常时调用
  public void exceptionCaught(Throwable cause, TransportClient client) { }
}

服务端引导程序TransportServerBootstrap

TransportServer的构造器中的bootstrapsTranportServerBootstrap的列表。接口TransportServerBootstrap定义了服务端引导程序的规范,服务端引导程序旨在当客户端与服务端建立连接之后,在服务端持有的客户端管道上执行的引导程序。TransportServerBootstrap的定义如下:

public interface TransportServerBootstrap {
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

客户端TransportClient详解

学习完服务端RpcHandler对请求消息的处理后,接下来学习客户端发送RPC请求的原理。 TransportContext的createChannelHandler方法中调用了TransportClient的构造器,其中TranportResponseHandler的引用将赋给handler属性。

public TransportClient(Channel channel, TransportResponseHandler handler) {
  this.channel = Preconditions.checkNotNull(channel);
  this.handler = Preconditions.checkNotNull(handler);
  this.timedOut = false;
}

TranportClient一共有5个方法用于发送请求,分别如下:

1)fetchChunk:从远端协商好的流中请求单个块

2)stream:使用流的ID,从远端获取流数据

3)sendRpc:向服务端发送RPC的请求,通过At least Once Delivery原则保证请求不会丢失

4)sendRpcSync:向服务端发送异步的RPC请求,并根据指定的超时时间等待响应

5)send:向服务端发送RPC的请求,但是并不期望能获取响应,因此不能保证投递的可靠性

事件总线

事件总线介绍

Spark 定义了一个特质 ListenerBus,可以接受事件并且将事件提交到对应事件的监听器。

该特征主要有一个 listeners 成员,用于维护所有注册的监听器,其数据结构是一个线程安全的 CopyOnWriteArrayList[L]

该特征还有几个主要的函数:

  • addListener:添加 listener
  • doPostEvent:给特定 listener 发送事件,该方法具体需要子类实现
  • findListenersByClass:根据类型查找 listener 列表
  • postToAll: 把事件发送给所有的 listener,虽然 CopyOnWriteArrayList 是线程安全的,但 postAll 引入了“先检查后运行”的逻辑,因此该方法不是线程安全的。
  • removeListener:删除 listener
  • removeListenerOnError:内部调用 removeListener,可由子类覆盖

ListenerBus 继承体系

image

每个 ListenerBus 用于将不同的 Event 投递到不同的Listener 中,下面以主要分析下 LiveListenerBus

LiveListenerBus 详解

LiveListenerBus 继承 SparkListenerBus,和其他 ListenerBus 不同的是, LiveListenerBus 是将事件都放到一个队列中,然后另外一个线程不断从队列获取事件,将事件异步投递给监听器,达到实时刷新UI界面数据的效果。

LiveListenerBus 中的属性:

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

private def validateAndGetQueueSize(): Int = {
    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    if (queueSize <= 0) {
      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    }
    queueSize
  }

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
  .intConf
  .createWithDefault(10000)
  • eventQueue:是 SparkListenerEvent 事件的阻塞队列,队列大小可以通过 Spark 属性 spark.scheduler.listenerbus.eventqueue.size 进行配置,默认为 10000;

  • started:标记 LiveListenerBus 的启动状态的 AtomicBoolean 类型的变量;

  • stopped:标记LiveListenerBus的停止状态的 AtomicBoolean 类型的变量;

  • droppedEventsCounter:使用 AtomicLong 类型对删除的事件进行计数,每当日志打印了 droppedEventsCounter 后,会将 droppedEventsCounter 重置为0;

  • lastReportTimestamp:记录最后一次日志打印 droppedEventsCounter 的时间戳;

  • processingEvent:暗示当前正有事件在被 listenerThread 线程处理;

  • logDroppedEvent:标记是否由于 eventQueue 已满,导致新的事件被删除;

  • eventLock:表示队列中事件产生和消费的一个计数器,当有新的事件到来时释放信号量,当对事件进行处理时获取信号量,eventLock = new Semaphore(0);

  • listenerThread:异步处理事件的线程;

异步事件处理线程

private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      LiveListenerBus.withinListenerThread.withValue(true) {
        while (true) {
          eventLock.acquire()
          self.synchronized {
            processingEvent = true
          }
          try {
            val event = eventQueue.poll
            if (event == null) {
              // Get out of the while loop and shutdown the daemon thread
              if (!stopped.get) {
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              return
            }
            postToAll(event)
          } finally {
            self.synchronized {
              processingEvent = false
            }
          }
        }
      }
    }
  }

代码不算复杂,主要逻辑是:

  • 设置为 daemon thread;
  • 不断获取信号量,如果没有就会阻塞,有信号释放才会往下运行(这是依靠 new Semaphore(0)实现的,在 spark 后面的版本中,是直接用阻塞队列的 take() 方法实现。);
  • 同步控制,将 processingEvent 设置为 true;
  • 从 eventQueue 中获取事件;
  • 调用超类 ListenerBus 的 postToAll 方法,对监听器进行遍历,并调用 SparkListenerBus 的 doPostEvent 方法对事件进行匹配后执行监听器的相应方法;;
  • 每次循环结束同步控制,将 processingEvent 设置为 false;

异步事件处理线程的事件来源

DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源,它们都是通过调用 LiveListenerBus 的 post 方法将消息交给异步线程 listenerThread 处理的。

 def post(event: SparkListenerEvent): Unit = {
    if (stopped.get) {
      // Drop further events to make `listenerThread` exit ASAP
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
      eventLock.release()
    } else {
      onDropEvent(event)
      droppedEventsCounter.incrementAndGet()
    }

    val droppedEvents = droppedEventsCounter.get
    if (droppedEvents > 0) {
      // Don't log too frequently
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        // There may be multiple threads trying to decrease droppedEventsCounter.
        // Use "compareAndSet" to make sure only one thread can win.
        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
        // then that thread will update it.
        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
            new java.util.Date(prevLastReportTimestamp))
        }
      }
    }
  }
  • 先判断 LiveListenerBus 是否停止,停止记录错误日志,返回;

  • 向eventQueue中添加事件:

    • 如果成功,就释放信号量,这时 listenerThread 中的 eventLock.acquire() 就可以后去信号量,从队列取出事件进行后续操作;
    • 如果失败,则移除事件 onDropEvent,并对删除事件计数器进行自增 droppedEventsCounter.incrementAndGet();
  • 如果有事件被删除,并且当前系统时间距离上一次打印 droppedEventsCounter 超过了 60 秒则重置 droppedEventsCounter 计算为0,并更新 lastReportTimestamp 为当前系统时间

流程总结

image

Spark系列: Transformations算子讲解

Value数据类型的Transformation 

输入分区与输出分区一对一型

map

将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素。源码中 map 算子相当于初始化一个 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。

下图每个方框表示一个 RDD 分区,左侧的分区经过用户自定义函数 f:T->U 映射为右侧的新 RDD 分区。但是,实际只有等到 Action算子触发后,这个 f 函数才会和其他函数在一个stage 中对数据进行运算。

image

flatMap

将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将生成的 RDD 的每个集合中的元素合并为一个集合,内部创建 FlatMappedRDD(this,sc.clean(f))。

下图 表 示 RDD 的 一 个 分 区 ,进 行 flatMap函 数 操 作, flatMap 中 传 入 的 函 数 为 f:T->U, T和 U 可以是任意的数据类型。将分区中的数据通过用户自定义函数 f 转换为新的数据。外部大方框可以认为是一个 RDD 分区,小方框代表一个集合。 V1、 V2、 V3 在一个集合作为 RDD 的一个数据项,可能存储为数组或其他容器,转换为V’1、 V’2、 V’3 后,将原来的数组或容器结合拆散,拆散的数据形成为 RDD 中的数据项。

image

mapPartitions

mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 行 操 作。 内 部 实 现 是 生 成MapPartitionsRDD。

下图中的方框代表一个 RDD 分区。图中,用户通过函数 f (iter)=>iter.filter(_>=3) 对分区中所有数据进行过滤,大于和等于 3 的数据保留。一个方块代表一个 RDD 分区,含有 1、 2、 3 的分区过滤只剩下元素 3。

image

glom

glom函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。

图中的每个方框代表一个RDD分区。该图表示含有V1、 V2、 V3的分区通过函数glom形成一数组Array[(V1),(V2),(V3)]

image

输入分区与输出分区多对一型

union

使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重可以使用 distinct()。同时 Spark 还提供更为简洁的使用 union 的 API,通过 ++ 符号相当于 union 函数操作。

图中左侧大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。含有V1、V2、U1、U2、U3、U4的RDD和含有V1、V8、U5、U6、U7、U8的RDD合并所有元素形成一个RDD。V1、V1、V2、V8形成一个分区,U1、U2、U3、U4、U5、U6、U7、U8形成一个分区。

image

>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd1
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
>>> rdd2 = sc.parallelize([4, 5, 6])
>>> rdd3 = sc.parallelize([7, 8, 9])
>>> rdd = sc.union([rdd1, rdd2, rdd3])
>>> rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> first = rdd1.union(rdd2)
>>> first.collect()
[1, 2, 3, 4, 5, 6]

cartesian

对 两 个 RDD 内 的 所 有 元 素 进 行 笛 卡 尔 积 操 作。 操 作 后, 内 部 实 现 返 回CartesianRDD

图中左侧大方框代表两个RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。

例 如: V1 和 另 一 个 RDD 中 的 W1、 W2、 Q5 进 行 笛 卡 尔 积 运 算 形 成 (V1,W1)、(V1,W2)、 (V1,Q5)。

image

>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd2 = sc.parallelize([4, 5, 6])
>>> rdd = rdd1.cartesian(rdd2)
>>> rdd.collect()
[(1, 4), (1, 5), (1, 6), (2, 4), (2, 5), (2, 6), (3, 4), (3, 5), (3, 6)]

输入分区与输出分区多对多型

groupByKey(Avoid)

将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。
函数实现如下:

  • 将用户函数预处理:val cleanF = sc.clean(f)

  • 对数据进行map函数操作,最后再进行 groupByKey 分组操作。

    this.map(t => (cleanF(t), t)).groupByKey(p) 其中, p 确定了分区个数和分区函数,也就决定了并行化的程度。

    image

输出分区为输入分区子集型

filter

filter 函数功能是对元素进行过滤,对每个元素应用 f 函数, 返回值为true的元素在RDD中保留,返回值为false 的元素将被过滤掉。内部实现相当于生成FilteredRDD(this,sc.clean(f))。下面代码为函数的本质实现:

def filter(f:T=>Boolean):
	RDD[T] = new FilteredRDD(this, sc.clean(f))

image

distinct

distinct将RDD中的元素进行去重操作。

image

subtract

subtract相当于进行集合的操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。

图中左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。 右侧大方框
代表合并后的RDD,大方框内的小方框代表分区。 V1在两个RDD中均有,根据差集运算规则,新RDD不保留,V2在第一个RDD有,第二个RDD没有,则在新RDD元素中包含V2。

image

sample

sample 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。

函数参数设置:

  • withReplacement=true,表示有放回的抽样。
  • withReplacement=false,表示无放回的抽样。

通 过 sample 函 数, 采 样 50% 的 数 据。V1、 V2、 U1、 U2、U3、U4 采样出数据 V1 和 U1、 U2 形成新的 RDD。

image

takeSample

takeSample函数和上面的sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组。

通过takeSample对数据采样,设置为采样一份数据,返回结果为V1。

image

Cache型

cache

cache 将 RDD 元素从磁盘缓存到内存。相当于persist(MEMORY_ONLY) 函数的功能。

图中每个方框代表一个 RDD 分区,左侧相当于数据分区都存储在磁盘,通过 cache 算子将数据缓存在内存。

image

persist

persist 函数对 RDD 进行缓存操作。数据缓存在哪里依据 StorageLevel 这个枚举类型进行确定。 有以下几种类型的组合: DISK 代表磁盘,MEMORY 代表内存, SER 代表数据是否进行序列化存储。

下面为函数定义, StorageLevel 是枚举类型,代表存储模式,用户可以通过图 按需进行选择。 persist(newLevel:StorageLevel)
例如,MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他同理。

image

Key-Value数据类型的Transfromation

输入分区与输出分区一对一(???)

mapValues

针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。

图中的方框代表 RDD 分区。 a=>a+2 代表对 (V1,1) 这样的(Key Value)数据对,数据只对 Value 中的 1 进行加 2 操作,返回结果为 3。

image

对单个RDD聚集(PairRDDFunctions)

combineByKey

手动标星

查看源码

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    	combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      	partitioner, mapSideCombine, serializer)(null)
  }

说明:

  • createCombiner: V => C: C 不存在的情况下,比如通过 V 创建 seq C。

  • mergeValue: (C, V) => C: 当 C 已经存在的情况下,需要 merge,比如把 item V 加到 seq C 中,或者叠加。

  • mergeCombiners: (C, C) => C,合并两个 C。

  • partitioner: Partitioner:Shuffle 时需要的 Partitioner。

  • mapSideCombine : Boolean = true: 为了减小传输量,很多 combine 可以在 map端先做,比如叠加,可以先在一个 partition 中把所有相同的 key 的 value 叠加,再 shuffle。

  • serializerClass: String = null,传输需要序列化,用户可以自定义序列化类

    image

reduceByKey

查看源码

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =     
	self.withScope {
    	combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

reduceByKey 是比 combineByKey 更简单的一种情况,只是两个值合并成一个值,( Int, Int V)to (Int, Int C),比如叠加。所以 createCombiner reduceBykey 很简单,就是直接返回 v,而 mergeValue和 mergeCombiners 逻辑是相同的,没有区别。

image

partitionBy

查看源码

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
}

对RDD进行分区操作。

如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。

图的方框代表RDD分区。 通过新的分区策略将原来在不同分区的V1、 V2数据都合并到了一个分区。

image

对两个RDD聚集

cogroup

查看源码

def cogroup[W](
      other: RDD[(K, W)],
      numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
      cogroup(other, new HashPartitioner(numPartitions))
}

连接(PairRDDFunctions)

join

查看源码

join 对两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。

下面代码为 join 的函数实现,本质是通 过 cogroup 算 子 先 进 行 协 同 划 分, 再通过flatMapValues将合并的数据打散。

this.cogroup(other,partitioner).f latMapValues{case(vs,ws) => for(v<-vs;w<-ws)yield(v,w) }

图 是对两个 RDD 的 join 操作示意图。大方框代表 RDD,小方框代表 RDD 中的分区。函数对相同 key 的元素,如 V1 为 key 做连接后结果为 (V1,(1,1)) 和 (V1,(1,2))。

image

leftOutJoin 和 rightOutJoin

Spark内核设计的艺术: 第7章 调度系统

7.1 调度系统概述

第一层:Cluster Manager(在YARN模式下为ResourceManager,在Mesos模式下是Mesos Master,在Standlone模式下是Master)将资源分配给Application

第二层:Application进一步将资源分配给Application的各个Task。

我们这里主要讲解第二层。

image

7.2 RDD 详情

更多RDD的内容,请查看 #1

7.3 Stage 详情

DAGScheduler会将Job的RDD划分到不同的stage,并构建这些stage的依赖关系。这样可以使得没有依赖关系的stage并行执行,有依赖关系的stage顺序执行。

7.3.1 ResultStage 的实现

ResultStage 可以使用指定的函数对RDD中的分区进行计算并得出最终结果。ResultStage 是最后执行的stage,此阶段主要进行作业的收尾工作。

  /**
   * The active job for this result stage. Will be empty if the job has already finished
   * (e.g., because the job was cancelled).
   */
  private[this] var _activeJob: Option[ActiveJob] = None

  def activeJob: Option[ActiveJob] = _activeJob

  def setActiveJob(job: ActiveJob): Unit = {
    _activeJob = Option(job)
  }

  def removeActiveJob(): Unit = {
    _activeJob = None
  }

  /**
   * Returns the sequence of partition ids that are missing (i.e. needs to be computed).
   *
   * This can only be called when there is an active job.
   */
  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }

7.3.2 ShuffleMapStage 的实现

ShuffleMapStage是DAG调度流程的中间stage,他可以包括一到多个ShuffleMapTask,这些ShuffleMapTask将生成用于Shuffle的数据。

7.3.3 StageInfo

StageInfo用于描述stage信息,并可以传递给SparkListener。StageInfo包括以下属性。

  • stageId
  • attemptId
  • name
  • numTasks
  • rddInfos

........

StageInfo 提供了一个当stage失败时要调用的方法,stageFailed

  def stageFailed(reason: String) {
    failureReason = Some(reason)
    completionTime = Some(System.currentTimeMillis)
  }

StageInfo 伴生对象提供了 构建stageInfo的方法。

7.4 面向DAG的调度器 DAGScheduler

JobListener 用于对作业中的每个Task执行成功或失败进行监听,JobWaiter实现了JobListener 并最终确定作业的成功或失败。

7.4.1 JobListenerJobWaiter

JobListener 定义了所有Job的监听器的接口规范:

/**
 * Interface used to listen for job completion or failure events after submitting a job to the
 * DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole
 * job fails (and no further taskSucceeded events will happen).
 */
private[spark] trait JobListener {
  def taskSucceeded(index: Int, result: Any): Unit
  def jobFailed(exception: Exception): Unit
}

JobWaiter实现了JobListener 。

7.4.2 ActiveJob 详解

ActiveJob用来表示已经激活的Job,即被DAGScheduler接收处理的Job。

7.4.3 DAGSchedulerEventProcessLoop 的简要介绍

DAGSchedulerEventProcessLoop 是DAGScheduler内部的事件循环处理器,用于处理DAGSchedulerEvent类型的事件。它能够处理的事件包括:

源码

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val workerLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, workerLost)

    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case SpeculativeTaskSubmitted(task) =>
      dagScheduler.handleSpeculativeTaskSubmitted(task)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

7.4.4 DAGScheduler 的组成

7.4.5 DAGScheduler 提供的常用方法

  • clearCacheLocs(): Unit 清空cacheLocs中的缓存的各个rdd的所有分区的位置信息。
  • updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit 更新Job的身份标识与stage及其祖先的映射关系。
  • activeJobForStage(stage: Stage): Option[Int] 找到stage所有已经激活的job的身份标识。
  • getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized 获取rdd的各个分区的TaskLocation序列
  • getPreferredLocsInternal 获取rdd的指定分区的偏好位置
  • getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] 获取rdd的指定分区的偏好位置
  • handleExecutorAdded(execId: String, host: String) 用于将Executor的身份标识从failedEpoch中移除
  • executorAdded(execId: String, host: String): Unit 用于投递 ExecutorAdded事件

7.4.6 DAGScheduler 与 Job的提交

  1. 提交Job

用户提交的Job首先会被转换成一系列RDD,然后才交给DAGScheduler 进行处理。DAGScheduler 的runJob是这一过程的入口。

  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrase(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

submitJob的实现。

  1. 处理Job的提交

DAGSchedulerEventProcessLoop 接收到JobSubmitted事件后,将调用DAGScheduler 的handleJobSubmitted方法。

7.4.7 构建Stage

Job中所有Stage提交过程包括反向驱动和正向提交。

  1. 构建ResultStage
  2. 获取或创建父Stage列表
  /**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

7.4.7 提交ResultStage

7.4.9 提交还未计算的 Task

submitMissingTasks方法。此方法在Stage中没有不可用的父Stage时,提交当前Stage还未提交的任务。

7.4.10 DAGScheduler 的调度流程

7.4.11 Task 执行结果的处理

  1. ResultTask的结果处理
  2. ShuffleMapTask的结果处理

7.5 调度池 Pool

TaskSchedulerImpl对Task的调度依赖于调度池Pool。

7.5.1 调度算法

特质 SchedulingAlgorithm定义了调度算法的规范。

/**
 * An interface for sort algorithm
 * FIFO: FIFO algorithm between TaskSetManagers
 * FS: FS algorithm between Pools, and FIFO or FS within Pools
 */
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
  1. FIFOSchedulingAlgorithm 先进先出
  2. FairSchedulingAlgorithm 公平调度

7.5.2 Pool的实现

Pool是对TaskSet进行调度的调度池。调度池内部有一个根调度队列,包含了多个子调度池。子调度池自身的调度队列中还包含其他调度池或者TaskSetManager,所以整个调度池是一个多层次的调度队列。

  • addTaskSetManager 该方法将TaskSetManager装载到rootPool中。直接调用的方法是Pool#addSchedulable()。
  • removeSchedulable

7.5.3 调度池构建器

SchedulableBuilder定义了调度池构建器的行为规范。

/**
 * An interface to build Schedulable tree
 * buildPools: build the tree nodes(pools)
 * addTaskSetManager: build the leaf nodes(TaskSetManagers)
 */
private[spark] trait SchedulableBuilder {
  def rootPool: Pool

  def buildPools(): Unit

  def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}

7.6 任务集合管理器 TaskSetManager

TaskSetManager也实现了Schedulable特质,并参与到调度池的调度当中。

7.6.1 Task集合

DAGScheduler 将Task提交到TaskScheduler 时,需要将多个Task打包问TaskSet。TaskSet是整个调度池中对Task进行调度管理的基本单位,由调度池中的TaskSetManager来管理。

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}

7.6.2 TaskSetManager 的成员属性

7.6.3 调度池与推断执行

  • checkSpeculatableTasks 检测当前TaskSetManager 中是否存在需要推断的任务。
  • dequeueSpeculativeTask 根据指定的Host、Executor和本地性级别,从可推断的Task中找出可推断的Task在TaskSet中的索引和相应的本地性级别。

7.6.4 Task本地性

7.6.5 TaskSetManager 的常用的方法

7.7 运行器后端接口 LauncherBackend

7.7.1 BackendConnection的实现

BackendConnection是LauncherBackend的内部组件,用于保持与LauncherServer的Socket连接,并通过此Socket收发消息。

private class BackendConnection(s: Socket) extends LauncherConnection(s) 
  • handle 处理LauncherServer发送的消息

        override protected def handle(m: Message): Unit = m match {
          case _: Stop =>
            fireStopRequest()
    
          case _ =>
            throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
        }
  • close 关闭socket链接

        override def close(): Unit = {
          try {
            super.close()
          } finally {
            onDisconnected() // 空方法
            _isConnected = false
          } 
        }

7.7.2 LauncherBackend 的实现

LauncherBackend 是 SchedulerBackend与LauncherServer通信的组件。

  • connect: 与LauncherServer建立连接。
  • setAppId: 向LauncherServer发送SetAPPId消息。此消息携带者应用程序的身份标识。
  • setState: 发送SetState消息,此消息携带着LauncherBackend 最后一次的状态。
  • isConnected: 返回clientThread是否与LauncherServer已经简历了Socket连接的状态。
  • onStopRequest : LauncherBackend定义处理LauncherServer的停止消息的抽象方法。
  • onDisconnected
  • fireStopRequest

7.8 调度后端接口 SchedulerBackend

SchedulerBackend是TaskScheduler的调度后端接口,TaskScheduler给Task分配资源实际上是通过SchedulerBackend来完成的,SchedulerBackend给Task分配完资源后将与分配给Task的Executor通信,并要求后者运行Task。

7.8.1 SchedulerBackend 的定义

7.8.2 LocalSchedulerBackend 的实现分析

7.9 任务结果获取器 TaskResultGetter

7.10 任务调度器 TaskScheduler

Spark系列: Action算子讲解

Action

本质上在 Action 算子中通过 SparkContext 进行了提交作业的 runJob 操作,触发了RDD DAG 的执行。
例如, collect,感兴趣的读者可以顺着这个入口进行源码剖析

无输出(RRD)

foreach

查看源码

def foreach(f: T => Unit): Unit = withScope {
	val cleanF = sc.clean(f)
	sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

foreach 对 RDD 中的每个元素都应用 f 函数操作,不返回 RDD 和 Array, 而是返回Uint。

图表示 foreach 算子通过用户自定义函数对每个数据项进行操作。本例中自定义函数为 println(),控制台打印所有数据项。

image

HDFS (RRD)

saveAsTextFile

查看源码

 /**
 * Save this RDD as a compressed(压缩过的) text file, using string representations of elements.
 */
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
    this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        require(x != null, "text files do not allow null rows")
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}

函数将数据输出,存储到 HDFS 的指定目录。通过调用saveAsHadoopFile,将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS。

image

saveAsObjectFile

查看源码

/**
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String): Unit = withScope {
	this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
		.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
		.saveAsSequenceFile(path)
}

saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。

图左侧方框代表RDD分区,右侧方框代表HDFS的Block。 通过函数将RDD的每个分区存储为HDFS上的一个Block。

image

Scala集合和数据类型(RRD)

collect

查看源码

/**
* Return an array that contains all of the elements in this RDD.
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collect(): Array[T] = withScope {
	val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
	Array.concat(results: _*)
}

collectAsMap

collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。 对于重复K的RDD元素,后面的元素覆盖前面的元素。

图中的左侧方框代表RDD分区,右侧方框代表单机数组。 数据通过collectAsMap函数返回给Driver程序计算结果,结果以HashMap形式存储。

image

reduceByKeyLocally

实现是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。

lookup

def lookup(key: K): Seq[V]

lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。返回一个WrappedArray【包装类数组】

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21

scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)

scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)

count

count 返回整个 RDD 的元素个数。

/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

image

top

 /**
 * Returns the top k (largest) elements from this RDD as defined by the specified
 * implicit Ordering[T] and maintains the ordering. This does the opposite of
 * [[takeOrdered]]. For example:
 * {{{
 *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
 *   // returns Array(12)
 *
 *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
 *   // returns Array(6, 5)
 * }}}
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 *
 * @param num k, the number of top elements to return
 * @param ord the implicit ordering for T
 * @return an array of top elements
 */
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  takeOrdered(num)(ord.reverse)
}

top可返回最大的k个元素。

相近函数说明如下。

  • top(n), 降序排序,返回RDD中从0到n-1下标的元素,即返回最大的n个元素。
  • take(n)用于获取RDD中从0到n-1下标的元素,不排序。
  • takeOrdered(n), 和top(n)相反,返回最小的n个元素,并且在返回的数组中保持元素的顺序。
  • first() 返回整个RDD中的前1个元素。

reduce

查看源码

/**
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。

reduceLeft先对两个元素<K,V>进行reduce函数操作,然后将结果和迭代器取出的下一个元素<k,V>进行reduce函数操作,直到迭代器遍历完所有元素,得到最后结果。在RDD中,先对每个分区中的所有元素<K,V>的集合分别进行reduceLeft。 每个分区形成的结果相当于一个元素<K,V>,再对这个结果集合进行reduceleft操作。

fold

查看源码

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using a
   * given associative function and a neutral "zero value". The function
   * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
   * allocation; however, it should not modify t2.
   *
   * This behaves somewhat differently from fold operations implemented for non-distributed
   * collections in functional languages like Scala. This fold operation may be applied to
   * partitions individually, and then fold those results into the final result, rather than
   * apply the fold to each element sequentially in some defined ordering. For functions
   * that are not commutative, the result may differ from that of a fold applied to a
   * non-distributed collection.
   *
   * @param zeroValue the initial value for the accumulated result of each partition for the `op`
   *                  operator, and also the initial value for the combine results from different
   *                  partitions for the `op` operator - this will typically be the neutral
   *                  element (e.g. `Nil` for list concatenation or `0` for summation)
   * @param op an operator used to both accumulate results within a partition and combine results
   *                  from different partitions
   */
  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

fold和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一个元素是zeroValue。

aggregate

查看源码

/**
   * Aggregate the elements of each partition, and then the results for all the partitions, using
   * given combine functions and a neutral "zero value". This function can return a different result
   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
   * allowed to modify and return their first argument instead of creating a new U to avoid memory
   * allocation.
   *
   * @param zeroValue the initial value for the accumulated result of each partition for the
   *                  `seqOp` operator, and also the initial value for the combine results from
   *                  different partitions for the `combOp` operator - this will typically be the
   *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
   * @param seqOp an operator used to accumulate results within a partition
   * @param combOp an associative operator used to combine results from different partitions
   */
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。

aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。 而在fold和reduce函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。

最后,介绍两个计算模型中的两个特殊变量。

  • 广播(broadcast)变量:其广泛用于广播Map Side Join中的小表,以及广播大变量等场景。 这些数据集合在单节点内存能够容纳,不需要像RDD那样在节点之间打散存储。
    Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算可以复用。 相比Hadoo的distributed cache,广播的内容可以跨作业共享。 Broadcast的底层实现采用了BT机制。
  • accumulator变量:允许做全局累加操作,如accumulator变量广泛使用在应用中记录当前的运行指标的情景。

Spark快速大数据分析: 第5章 数据读取与保存

5.1 动机

Spark 支持很多种输入输出源。一部分原因是 Spark 本身是基于 Hadoop 生态圈而构建,特别是 Spark 可以通过 Hadoop MapReduce 所使用的 InputFormat 和 OutputFormat 接口访问数据,而大部分常见的文件格式与存储系统(例如 S3、HDFS、Cassandra、HBase 等)都支持这种接口。

  • 文件格式与文件系统

    对于存储在本地文件系统或分布式文件系统(比如 NFS、HDFS、Amazon S3 等)中的 数据,Spark 可以访问很多种不同的文件格式,包括文本文件、JSON、SequenceFile, 以及 protocol buffer。我们会展示几种常见格式的用法,以及 Spark 针对不同文件系统 的配置和压缩选项。

  • Spark SQL中的结构化数据源

    第 9 章会介绍 Spark SQL 模块,它针对包括 JSON 和 Apache Hive 在内的结构化数据 源,为我们提供了一套更加简洁高效的 API。此处会粗略地介绍一下如何使用 Spark SQL,而大部分细节将留到第 9 章讲解。

  • 数据库与键值存储

    本章还会概述 Spark 自带的库和一些第三方库,它们可以用来连接 Cassandra、HBase、 Elasticsearch 以及 JDBC 源。

5.2 文件格式

表5-1: Spark支持的一些常见格式

格式名称 结构化 备注
文本文件 普通的文本文件,每行一条记录
JSON 半结构化 常见的基于文本的格式,半结构化;大多数库都要求每行一条记录
CSV 非常常见的基于文本的格式,通常在电子表格应用中使用
SequenceFiles 一种用于键值对数据的常见 Hadoop 文件格式
Protocol buffers 一种快速、节约空间的跨语言格式
对象文件 用来将 Spark 作业中的数据存储下来以让共享的代码读取。改变类的时候 它会失效,因为它依赖于 Java 序列化

5.2.1 文本文件

  1. 读取文本文件

python:

input = sc.textFile("file:///home/holden/repos/spark/README.md")

scala:

val input = sc.textFile("file:///home/holden/repos/spark/README.md")

java:

JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")

希望同时处理整个文件,使用 SparkContext. wholeTextFiles() 方法 。Spark 支持读取给定目录中的所有文件,以及在输入路径中使用通配字符 (如 part-*.txt)

  1. 保存文本文件
result.saveAsTextFile(outputFile)

5.2.2 JSON

  1. 读取JSON

python:

import json
data = input.map(lambda x: json.loads(x))
  1. 保存JSON

写出 JSON 文件比读取它要简单得多,因为不需要考虑格式错误的数据,并且也知道要写 出的数据的类型。可以使用之前将字符串 RDD 转为解析好的 JSON 数据的库,将由结构 化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件 API 写出去。

python:

(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x))
.saveAsTextFile(outputFile))

scala:

result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)

5.2.3 CSV和TSV

  1. 读取CSV

读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,再对数据进行处理。由于格式标准的缺失,同一个库的不同版本有时也会用不同的方式处理输入数据。

python:

import csv
import StringIO

def loadRecord(line):
"""解析一行CSV记录"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

scala:

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile(inputFile)
val result = input.map{ line =>
       val reader = new CSVReader(new StringReader(line));
	   reader.readNext();
}

在 Python 中完整读取 CSV :

def loadRecords(fileNameContents):
    """读取给定文件中的所有记录"""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

在 Scala 中完整读取 CSV:

case class Person(name: String, favoriteAnimal: String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
       val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x => Person(x(0), x(1))) }
  1. 保存CSV

python:

def writeRecords(records):
    """写出一些CSV记录"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"]) 
    for record in records:
    	writer.writerow(record) return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

scala:

pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray) .mapPartitions{people =>
    val stringWriter = new StringWriter();
    val csvWriter = new CSVWriter(stringWriter); 
    csvWriter.writeAll(people.toList)
    Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)

5.2.4 SequenceFile

Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile(path, keyClass, valueClass, minPartitions)。前面提到过,SequenceFile 使用 Writable 类,因 此 keyClass 和 valueClass 参数都必须使用正确的 Writable 类。

  1. 读取SequenceFile

python:

data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")

scala:

val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
map{case (x, y) => (x.toString, y.get())}
  1. 保存SequenceFile

scala:

val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)

5.2.5 对象文件

5.2.6 Hadoop输入输出格式

  1. 读取其他Hadoop输入格式

scala:

新式API

val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat], classOf[LongWritable], classOf[MapWritable], conf)
// "输入"中的每个MapWritable代表一个JSON对象
  1. 保存Hadoop输出格式

saveAsNewAPIHadoopFile

5.2.7 文件压缩

5.3 文件系统

5.3.1 本地/“常规”文件系统

Spark 支持从本地文件系统中读取文件,不过它要求文件在集群中所有节点的相同路径下都可以找到。

5.3.2 Amazon S3

5.3.3 HDFS

在 Spark 中使用 HDFS 只需要将输入输出路径指定为 hdfs://master:port/path 就够了。 需要考虑Hadoop和spark的版本。

5.4 Spark SQL中的结构化数据

5.4.1 Apache Hive

Apache Hive 是 Hadoop 上的一种常见的结构化数据源。Hive 可以在 HDFS 内或者在其他存储系统上存储多种格式的表。这些格式从普通文本到列式存储格式,应有尽有。Spark SQL 可以读取 Hive 支持的任何表。

要把 Spark SQL 连接到已有的 Hive 上,你需要提供 Hive 的配置文件。你需要将 hive-site.xml 文件复制到 Spark 的 ./conf/ 目录下。这样做好之后,再创建出 HiveContext 对象,也就是 Spark SQL 的入口,然后你就可以使用Hive 查询语言(HQL)来对你的表进行查询, 并以由行组成的 RDD 的形式拿到返回数据。

python:

from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users") 
firstRow = rows.first()
print(firstRow.name)

scala:

import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // 字段0是name字段

5.4.2 JSON

json推文:

{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}

python:

tweets = hiveCtx.jsonFile("tweets.json") 
tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name, text FROM tweets")

scala:

val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")

5.5 数据库

通过数据库提供的 Hadoop 连接器或者自定义的 Spark 连接器,Spark 可以访问一些常用的数据库系统。

5.5.1 Java数据库连接

5.5.2 Cassandra

5.5.3 HBase

5.5.4 Elasticsearch

Spark 可以使用 Elasticsearch-Hadoop从 Elasticsearch

中读写数据。Elasticsearch 是一个开源的、基于 Lucene 的搜索系统。

Spark系列: 流计算Spark Streaming

流计算概念

  • 流计算:实时获取来自不同数据源的海量数据,经过实时分析处理,获得有价值的信息。
  • 流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低,如用户点击流。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎。
  • 对于一个流计算系统来说,它应达到如下需求:
    • 高性能:处理大数据的基本要求,如每秒处理几十万条数据。
    • 海量式:支持TB级甚至是PB级的数据规模。
    • 实时性:必须保证一个较低的延迟时间,达到秒级别,甚至是毫秒级别。
    • 分布式:支持大数据的基本架构,必须能够平滑扩展。
    • 易用性:能够快速进行开发和部署。
    • 可靠性:能可靠地处理流数据。
  • 流计算处理过程包括数据实时采集、数据实时计算和实时查询服务。

image

Spark Streaming简介

Spark Streaming是构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。Spark Streaming可结合批处理和交互查询,适合一些需要对历史数据和实时数据进行结合分析的应用场景。

Spark Streaming设计

Spark Streaming是Spark的核心组件之一,为Spark提供了可拓展、高吞吐、容错的流计算能力。如下图所示,Spark Streaming可整合多种输入数据源,如Kafka、Flume、HDFS,甚至是普通的TCP套接字。经处理后的数据可存储至文件系统、数据库,或显示在仪表盘里。

image

Spark Streaming的基本原理是将实时输入数据流以**时间片(秒级)**为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据,执行流程如下图所示。

image

Spark Streaming最主要的抽象是DStream(Discretized Stream,离散化数据流),表示连续不断的数据流。在内部实现上,Spark Streaming的输入数据按照时间片(如1秒)分成一段一段的DStream,每一段数据转换为Spark中的RDD,并且对DStream的操作都最终转变为对相应的RDD的操作。例如,下图展示了进行单词统计时,每个时间片的数据(存储句子的RDD)经flatMap操作,生成了存储单词的RDD。整个流式计算可根据业务的需求对这些中间的结果进一步处理,或者存储到外部设备中。

image

DStream操作

概述

Spark Streaming工作原理

在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行task。

在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上。每个Receiver都会负责一个input DStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等)。Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据。

Spark Streaming程序基本步骤

  • 通过创建输入DStream来定义输入源
  • 通过对DStream应用转换操作和输出操作来定义流计算。
  • 用streamingContext.start()来开始接收数据和处理流程。
  • 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。
  • 可以通过streamingContext.stop()来手动结束流计算进程。

创建StreamingContext对象

如果要运行一个Spark Streaming程序,就需要首先生成一个StreamingContext对象,它是Spark Streaming程序的主入口。因此,在定义输入之前,我们首先介绍如何创建StreamingContext对象。我们可以从一个SparkConf对象创建一个StreamingContext对象。

>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> sc = SparkContext.getOrCreate()
>>> ssc = StreamingContext(sc, 1)  // 1表示每隔1秒钟就自动执行一次流计算

输入源类型

基本输入源

示例程序:文件流(DStream)

示例程序:套接字流(DStream)

示例程序:RDD队列流(DStream)

高级数据源

示例程序:Kafka

示例程序:Flume

转换操作

DStream转换操作包括无状态转换有状态转换

  • 无状态转换:每个批次的处理不依赖于之前批次的数据。
  • 有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。

DStream无状态转换操作

  • map(func) :对源DStream的每个元素,采用func函数进行转换,得到一个新的DStream;
  • flatMap(func): 与map相似,但是每个输入项可用被映射为0个或者多个输出项;
  • filter(func): 返回一个新的DStream,仅包含源DStream中满足函数func的项;
  • repartition(numPartitions): 通过创建更多或者更少的分区改变DStream的并行程度;
  • union(otherStream): 返回一个新的DStream,包含源DStream和其他DStream的元素;
  • count():统计源DStream中每个RDD的元素数量;
  • reduce(func):利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素RDDs的新DStream;
  • countByValue():应用于元素类型为K的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数;
  • reduceByKey(func, [numTasks]):当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key的值均由给定的recuce函数(func)聚集起来;
  • join(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新DStream;
  • cogroup(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组;
  • transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作。

DStream有状态转换操作

对于DStream有状态转换操作而言,当前批次的处理需要使用之前批次的数据或者中间结果。有状态转换包括基于滑动窗口的转换追踪状态变化(updateStateByKey)的转换。

滑动窗口转换操作

滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框入窗口内,形成一个小段的DStream,这时,就可以启动对这个小段DStream的计算。

image

下面给给出一些窗口转换操作的含义:

  • window(windowLength, slideInterval) 基于源DStream产生的窗口化的批数据,计算得到一个新的DStream;
  • countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数;
  • reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算;
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数;
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce”操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入);
  • countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。

updateStateByKey操作

当我们需要在跨批次之间维护状态时,就必须使用updateStateByKey操作。

下面我们就给出一个具体实例。以“套接字流”为例子来介绍,我们统计单词词频采用的是无状态转换操作,也就是说,每个批次的单词发送给NetworkWordCount程序处理时,NetworkWordCount只对本批次内的单词进行词频统计,不会考虑之前到达的批次的单词,所以,不同批次的单词词频都是独立统计的。

对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
 
while True:
    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    ssc.checkpoint("file:///usr/local/spark/mycode/streaming/")
 
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
 
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
 
    lines = ssc.socketTextStream("127.0.0.1", 4444))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
                          .map(lambda word: (word, 1))\
                          .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
 
    running_counts.pprint()
 
    ssc.start()
    ssc.awaitTermination()
$ nc -lk 4444
hadoop
spark
hadoop
spark
hadoop
spark

Spark快速大数据分析: 第7章 在集群上运行Spark

7.1 简介

Spark 可以在各种各样的集群管理器(Hadoop YARN、Apache Mesos,还有 Spark 自带的独立集群管理器)上运行,所以 Spark 应用既能够适应专用集群,又能用于 共享的云计算环境。

7.2 Spark运行时架构

Spark 在分布式环境中的架构:

image

在分布式环境下,Spark 集群采用的是主 / 从结构。在一个 Spark 集群中,有一个节点负责**协调,调度各个分布式工作节点。这个**协调节点被称为驱动器(Driver)节点,与之对应的工作节点被称为执行器(executor)节点。驱动器节点可以和大量的执行器节点进行通信,它们也都作为独立的 Java 进程运行。驱动器节点和所有的执行器节点一起被称为一个 Spark 应用(application)。

Spark 应用通过一个叫作集群管理器(Cluster Manager)的外部服务在集群中的机器上启 动。Spark 自带的集群管理器被称为独立集群管理器。Spark 也能运行在 Hadoop YARN 和 Apache Mesos 这两大开源集群管理器上。

7.2.1 驱动器节点

Spark 驱动器是执行你的程序中的 main() 方法的进程。它执行用户编写的用来创建 SparkContext、创建 RDD,以及进行 RDD 的转化操作和行动操作的代码.。

驱动器程序在 Spark 应用中有下述两个职责。

  • 把用户程序转为任务

Spark 程序其实是隐式地创建出了一个由操作组成的逻辑上的有向无环图。当驱动器程序运行时,它会把这个逻辑图转为物理执行计划。并且做一些优化。这样 Spark 就把逻辑计划转为一系列步骤(stage) 。任务是 Spark 中最小的工作单元,用户程序通常要启动成百上千的独立任务。

  • 为执行器节点调度任务

有了物理执行计划之后,Spark 驱动器程序必须在各执行器进程间协调任务的调度。 执行器进程启动后,会向驱动器进程注册自己。因此,驱动器进程始终对应用中所有的执行 器节点有完整的记录。每个执行器节点代表一个能够处理任务和存储 RDD 数据的进程。

7.2.2 执行器节点

Spark 执行器节点是一种工作进程,负责在 Spark 作业中运行任务,任务间相互独立。执行器进 程有两大作用: 第一,它们负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程; 第二,它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。

7.2.3 集群管理器

Spark 依赖于集群管理器来启动执行器节点,而在某些特殊情况下,也依赖集群管理器来启动驱动器节点。

7.2.4 启动一个程序

不论你使用的是哪一种集群管理器,你都可以使用 Spark 提供的统一脚本 spark-submit 将你的应用提交到那种集群管理器上。通过不同的配置选项,spark-submit 可以连接到相应的集群管理器上,并控制应用所使用的资源数量。

7.2.5 小结

回顾在集群上运行 Spark 应用的详细过程,可把本节的主要概念作如下总结。

(1) 用户通过 spark-submit 脚本提交应用。

(2) spark-submit 脚本启动驱动器程序,调用用户定义的 main() 方法。

(3) 驱动器程序与集群管理器通信,申请资源以启动执行器节点。

(4) 集群管理器为驱动器程序启动执行器节点。

(5) 驱动器进程执行用户应用中的操作。根据程序中所定义的对 RDD 的转化操作和行动操作,驱动器节点把工作以任务的形式发送到执行器进程。

(6) 任务在执行器程序中进行计算并保存结果。

(7) 如果驱动器程序的 main() 方法退出,或者调用了 SparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源。

7.3 使用spark-submit部署应用

前面学习过,Spark 为各种集群管理器提供了统一的工具来提交作业,这个工具是 spark-submit。

提交 Python 应用

$ bin/spark-submit my_script.py

如果在调用 spark-submit 时除了脚本或 JAR 包的名字之外没有别的参数,那么这个 Spark 程序只会在本地执行。当我们希望将应用提交到 Spark 独立集群上的时候,可以将独立集群的地址和希望启动的每个执行器进程的大小作为附加标记提供。

提交应用时添加附加参数

$ bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.py

--master 标记指定要连接的集群 URL;在这个示例中,spark:// 表示集群使用独立模式

spark-submit的--master标记可以接收的值 :

image

你可以运行 spark-submit --help 列出所有可以接收的标记。

7.4 打包代码与依赖

7.4.1 使用Maven构建的用Java编写的Spark应用

Maven 的 pom.xml 文件;Maven 中默认的用户代码在工程根目录(该目录应包含 pom. xml 文件)下的 src/main/java 目录中。

使用 Maven 构建的 Spark 应用的 pom.xml 文件

```
	4.0.0
     
    com.databricks 
    example-build 
    Simple Project 
    jar 
    1.0
    
         
        
            org.apache.spark 
            spark-core_2.10
            1.2.0 
            provided
        
         
        
    		net.sf.jopt-simple 
    		jopt-simple 
    		4.3
    	
    	 
        
    	joda-time 
        joda-time 
        2.0
        
    
    
         
             
             
                 org.apache.maven.plugins
                 maven-shade-plugin 
                 2.3
    			 
                     
                         package
                         
                             shade
                         
                     
                 
             
          
    

```

这个工程声明了两个传递依赖:jopt-simple 和 joda-time,前者用来作选项解析,而后者是一个用来处理时间日期转换的工具库。

打包使用 Maven 构建的 Spark 应用

$ mvn package
# 在目标路径中,可以看到超级JAR包和原版打包方法生成的JAR包
$ ls target/
example-build-1.0.jar
original-example-build-1.0.jar
# 展开超级JAR包可以看到依赖库中的类
$ jar tf target/example-build-1.0.jar
...
joptsimple/HelpFormatter.class
...
org/joda/time/tz/UTCProvider.class
...
# 超级JAR可以直接传给spark-submit
$ /path/to/spark/bin/spark-submit --master local ... target/example-build-1.0.jar

7.4.2 使用sbt构建的用Scala编写的Spark应用

sbt 是一个通常在 Scala 工程中使用的比较新的构建工具。sbt 预期的目录结构和 Maven 相 似。在工程的根目录中,你要创建出一个叫作 build.sbt 的构建文件,源代码则应该放在 src/main/scala 中。

使用 sbt 0.13 的 Spark 应用的 build.sbt 文件

import AssemblyKeys._
name := "Simple Project"
version := "1.0"
organization := "com.databricks" 
scalaVersion := "2.10.3"
libraryDependencies ++= Seq( // Spark依赖
    "org.apache.spark" % "spark-core_2.10" % "1.2.0" % "provided", // 第三方库
    "net.sf.jopt-simple" % "jopt-simple" % "4.3",
    "joda-time" % "joda-time" % "2.0"
)
// 这条语句打开了assembly插件的功能 
assemblySettings
// 配置assembly插件所使用的JAR
jarName in assembly := "my-project-assembly.jar"

// 一个用来把Scala本身排除在组合JAR包之外的特殊选项,因为Spark // 已经包含了Scala
assemblyOption in assembly :=
	(assemblyOption in assembly).value.copy(includeScala = false)

我 们只需要创建出 project/assembly.sbt 文件,并在其中加入 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")。sbt-assembly 的实际版本可能会因使用的 sbt 版本不同而变 化。

在 sbt 0.13 工程构建中添加 assembly 插件

# 显示project/assembly.sbt的内容
$ cat project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

打包使用 sbt 构建的 Spark 应用

$ sbt assembly
# 在目标路径中,可以看到一个组合JAR包
$ ls target/scala-2.10/
my-project-assembly.jar
# 展开组合JAR包可以看到依赖库中的类
$ jar tf target/scala-2.10/my-project-assembly.jar ...
joptsimple/HelpFormatter.class
...
org/joda/time/tz/UTCProvider.class
...
# 组合JAR可以直接传给spark-submit
$ /path/to/spark/bin/spark-submit --master local ... target/scala-2.10/my-project-assembly.jar

7.4.3 依赖冲突

7.5 Spark应用内与应用间调度

Spark 有一系列调度策略来保障资源不会被过度使用,还允许 工作负载设置优先级。

在调度多用户集群时,Spark 主要依赖集群管理器来在 Spark 应用间共享资源。 当 Spark 应用向集群管理器申请执行器节点时,应用收到的执行器节点个数可能比它申请的更多或者 更少,这取决于集群的可用性与争用。许多集群管理器支持队列,可以为队列定义不同优先级或容量限制,这样 Spark 就可以把作业提交到相应的队列中。请查看你所使用的集群管理器的文档获取详细信息。

Spark 应用有一种特殊情况,就是那些长期运行(long lived)的应用。 这意味着这些应用 从不主动退出。Spark SQL 中的 JDBC 服务器就是一个长期运行的 Spark 应用。当 JDBC 服务器启动后,它会从集群管理器获得一系列执行器节点,然后就成为用户提交 SQL 查询 的永久入口。由于这个应用本身就是为多用户调度工作的,所以它需要一种细粒度的调度 机制来强制共享资源。Spark 提供了一种用来配置应用内调度策略的机制。Spark 内部的**公平调度器(Fair Scheduler)**会让长期运行的应用定义调度任务的优先级队列。本书对这部 分内容未作深入探讨,若想了解详情,请参考公平调度器的官方文档:(http://spark.apache.org/docs/latest/job-scheduling.html)。

7.6 集群管理器

Spark 可以运行在各种集群管理器上 ,并通过集群管理器访问集群中的机器。

7.6.1 独立集群管理器

Spark 独立集群管理器提供在集群上运行应用的简单方法。这种集群管理器由一个主节点和几个工作节点组成,各自都分配有一定量的内存和 CPU 核心。

  1. 启动独立集群管理器

你既可以通过手动启动一个主节点和多个工作节点来实现,也可 以使用 Spark 的sbin 目录中的启动脚本来实现。

  1. 提交应用
$ spark-submit --master spark://masternode:7077 yourapp

你可以使用 --master 参数以同样的方式启动 spark-shell 或 pyspark,来连接到该集群上:

$ spark-shell --master spark://masternode:7077
$ pyspark --master spark://masternode:7077
  1. 配置资源用量
  • 执行器进程内存(default: 1G)
  • 占用核心总数的最大值
  1. 高度可用性

Spark 还支持使用 Apache ZooKeeper(一个分布式协调 系统)来维护多个备用的主节点,并在一个主节点失败时切换到新的主节点上。为独立集 群配置 ZooKeeper 不在本书的探讨范围内,不过在 Spark 官方文档(https://spark.apache.org/docs/latest/spark-standalone.html#high-availability)中有所描述。

7.6.2 Hadoop YARN

YARN 是在 Hadoop 2.0 中引入的集群管理器,它可以让多种数据处理框架运行在一个共享的资源池上,并且通常安装在与 Hadoop 文件系统(简称 HDFS)相同的物理节点上。在 这样配置的 YARN 集群上运行 Spark 是很有意义的,它可以让 Spark 在存储数据的物理节点上运行,以快速访问 HDFS 中的数据。

在 Spark 里使用 YARN 很简单:你只需要设置指向你的 Hadoop 配置目录的环境变量,然 后使用 spark-submit 向一个特殊的主节点 URL 提交作业即可。

第一步是找到你的 Hadoop 的配置目录,并把它设为环境变量 HADOOP_CONF_DIR。这个目录 包含 yarn-site.xml 和其他配置文件;如果你把 Hadoop 装到 HADOOP_HOME 中,那么这 个目录通常位于 HADOOP_HOME/conf 中,否则可能位于系统目录 /etc/hadoop/conf 中。然后用如下方式提交你的应用:

$ export HADOOP_CONF_DIR="..." 
$ spark-submit --master yarn yourapp

配置资源用量

当在 YARN 上运行时,根据你在 spark-submit 或 spark-shell 等脚本的 --num-executors 标记中设置的值,Spark 应用会使用固定数量的执行器节点。默认情况下,这个值仅为 2, 所以你可能需要提高它。你也可以设置通过 --executor-memory 设置每个执行器的内存用量,通过 --executor-cores 设置每个执行器进程从 YARN 中占用的核心数目。对于给定 的硬件资源,Spark 通常在用量较大而总数较少的执行器组合(使用多核与更多内存)上 表现得更好,因为这样 Spark 可以优化各执行器进程间的通信。然而,需要注意的是,一些集群限制了每个执行器进程的最大内存(默认为 8 GB),不让你使用更大的执行器进程。

出于资源管理的目的,某些 YARN 集群被设置为将应用调度到多个队列中。使用 --queue 选项来选择你的队列的名字。

7.6.3 Apache Mesos

Apache Mesos 是一个通用集群管理器,既可以运行分析型工作负载又可以运行长期运行的服务(比如网页服务或者键值对存储)。要在 Mesos 上使用 Spark,需要把一个 mesos:// 的 URI 传给 spark-submit:

spark-submit --master mesos://masternode:5050 yourapp

在运行多个主节点时,你可以使用 ZooKeeper 来为 Mesos 集群选出一个主节点。在这种情况下,应该使用mesos://zk:// 的 URI 来指向一个 ZooKeeper 节点列表。比如,你有三个 ZooKeeper 节点(node1、node2 和 node3),并且 ZooKeeper 分别运行在三台机器的 2181 端口上时,你应该使用如下 URI:

mesos://zk://node1:2181/mesos,node2:2181/mesos,node3:2181/mesos
  1. Mesos调度模式

在“细粒度”模式(默认)中,执行器进程占用的 CPU 核心数会在它们执行任务时 动态变化,因此一台运行了多个执行器进程的机器可以动态共享 CPU 资源。而在“粗粒 度”模式中,Spark 提前为每个执行器进程分配固定数量的 CPU 数目,并且在应用结束前 绝不释放这些资源,哪怕执行器进程当前不在运行任务。

  1. 客户端和集群模式
  2. 配置资源用量

7.6.4 Amazon EC2

大数据技术关键字

平时在流浪大数据以及产品架构的时候,出现很多技术关键字,先记录下:

未分类的关键字

  • elastic
  • zeppelin
  • kylin
  • canal
  • codis
  • S3
  • Apache Beam
  • Avro 格式
  • Kanban
  • Google Cloud Data Flow
  • StreamSets
  • Apache NiFi
  • Druid
  • LinkedIn WhereHows
  • Microsoft Cognitive Services
  • Thirft
  • gRpc
  • protobuf
  • jenkins

1、计算框架

实时计算

  • Storm
  • JStorm
  • Spark Streaming
  • Flink
  • Samza
  • S4
  • Heron

批处理/离线计算

  • Hadoop
  • Spark

2、大数据工具

  • MapReduce
  • Pig
  • Sqoop
  • ZooKeeper

3、消息队列

  • Kafka
  • RabbitMQ
  • ActiveMQ

4、日志采集

  • Flume-NG

5、 存储

  • Hive
  • HBase
  • HDFS
  • ES
  • Redis
  • MongoDB
  • MySQL

6、虚拟化

  • Docker
  • Kubernetes

7、SQL 引擎

  • Streaming CQL

8、发行版本

  • CDH(Cloudera)

9、负载均衡

  • Nginx
  • Nginx + Lua
  • LVS
  • Keepalived
  • haproxy

10、运维工具

  • puppet
  • ansible
  • monit

11、人工智能

  • TensorFlow

Spark快速大数据分析: 第6章 Spark编程进阶

6.1 简介

两种类型的共享变量: 累加器(accumulator)与广播变量(broadcast variable)。

累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。在已有的 RDD 转化操作的基础上,我们为类似查询数据库这样需要很大配置代价的任务引入了批操作。为了扩展可用的工具范围,本章会介绍 Spark 与外部程序交互的方式,比如如何与用 R 语言编写的脚本进行交互。

本章会使用业余无线电操作者的呼叫日志作为输入,构建出一个完整的示例应用。

6.2 累加器

累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法。 累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。

在 Python 中累加空行

file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0 
blankLines = sc.accumulator(0)

def extractCallSigns(line):
	global blankLines # 访问全局变量 
    if (line == ""):
		blankLines += 1 
    return line.split(" ")
callSigns = file.flatMap(extractCallSigns) 
callSigns.saveAsTextFile(outputDir + "/callsigns") 
print("Blank lines: %d" % blankLines.value)

在 Scala 中累加空行

val sc = new SparkContext(...)
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) // 创建Accumulator[Int]并初始化为0 
val callSigns = file.flatMap(line => {
	if (line == "") {
		blankLines += 1 // 累加器加1
	}
	line.split(" ") 
})
callSigns.saveAsTextFile("output.txt") 
println("Blank lines: " + blankLines.value)

我们创建了一个叫作 blankLinesAccumulator[Int] 对象,然后在输入 中看到一个空行时就对其加 1

总结起来,累加器的用法如下所示。

  • 通 过 在 驱 动 器 中 调 用 SparkContext.accumulator(initialValue) 方 法, 创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型。
  • Spark 闭包里的执行器代码可以使用累加器的 += 方法(在 Java 中是 add)增加累加器的值。
  • 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue())来访问累加器的值。

注意,工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。在这种模式下,累加器的实现可以更加高效,不需要对每次更新操作进行复杂的通信。

6.2.1 累加器与容错性

Spark 会自动重新执行失败的或较慢的任务来应对有错误的或者比较慢的机器。 即使该节点没有崩溃,而只是处理速度比别的节点慢很多,Spark 也可以抢占式地在另一个节点上启动一个“投机”(speculative)型的任务副本,如果该任务更早结束就可以直接获取结果。即使没有节点失败,Spark 有时也需要重新运行任务来获取缓存中被移除出内存的数据。

**对于要在action操作中使用的累加器,Spark 只会把每个任务对各累加器的修改应用一次。**因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的action操作中。

对于在 RDD transformation 操作中使用的累加器,就不能保证有这种情况了。 转化操作中累加器可 能会发生不止一次更新。举个例子,当一个被缓存下来但是没有经常使用的 RDD 在第一 次从 LRU 缓存中被移除并又被重新用到时,这种非预期的多次更新就会发生。这会强制 RDD 根据其谱系进行重算,而副作用就是这也会使得谱系中的转化操作里的累加器进行更新,并再次发送到驱动器中。在转化操作中,累加器通常只用于调试目的。

6.2.2 自定义累加器

自定义累加器需要扩展 AccumulatorParam,这在 Spark API 文档(http://spark.apache.org/docs/latest/api/scala/index.html#package)中有所介绍。 只要该操作同时满足交换律和结合律。

6.3 广播变量

Spark 的第二种共享变量类型是广播变量,它可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。

在 Python 中使用广播变量查询国家

# 查询RDD contactCounts中的呼号的对应位置。将呼号前缀 
# 读取为国家代码来进行查询
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
	country = lookupCountry(sign_count[0], signPrefixes.value) 
    count = sign_count[1]
	return (country, count)
countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey((lambda x, y: x+ y))) 

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

在 Scala 中使用广播变量查询国家

// 查询RDD contactCounts中的呼号的对应位置。将呼号前缀
// 读取为国家代码来进行查询
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
	val country = lookupInArray(sign, signPrefixes.value)
	(country, count)
}.reduceByKey((x, y) => x + y) 
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

广播的优化

序列化格式

6.4 基于分区进行操作

Spark 提供基于分区的 map 和 foreach,让你的部分代码只对 RDD 的每个分区运行 一次,这样可以帮助降低这些操作的代价。

回到呼号的示例程序中来,我们有一个在线的业余电台呼号数据库,可以用这个数据库查 询日志中记录过的联系人呼号列表。通过使用基于分区的操作,可以在每个分区内共享一 个数据库连接池,来避免建立太多连接,同时还可以重用 JSON 解析器。如例 6-10 至例 6-12 所示,使用 mapPartitions 函数获得输入 RDD 的每个分区中的元素迭代器,而需要返 回的是执行结果的序列的迭代器。

在 Python 中使用共享连接池

def processCallSigns(signs):
	"""使用连接池查询呼号"""
	# 创建一个连接池
	http = urllib3.PoolManager()
	# 与每条呼号记录相关联的URL
	urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs) 
    # 创建请求(非阻塞)
	requests = map(lambda x: (x, http.request('GET', x)), urls)
	# 获取结果
	result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
    # 删除空的结果并返回
    return filter(lambda x: x[1] is not None, result)

def fetchCallSigns(input): 
    """获取呼号"""
	return input.mapPartitions(lambda callSigns : processCallSigns(callSigns)) 

contactsContactList = fetchCallSigns(validSigns)

在 Scala 中使用共享连接池与 JSON 解析器

val contactsContactLists = validSigns.distinct().mapPartitions{ 
    signs =>
	val mapper = createMapper() 
    val client = new HttpClient() 
    client.start()
	// 创建http请求
	signs.map{sign =>
		createExchangeForSign(sign) // 获取响应
	}.map{ case (sign, exchange) =>
		(sign, readExchangeCallLog(mapper, exchange)) 
    }.filter(x => x._2 != null) // 删除空的呼叫日志
}

按分区执行的操作符

image

除了避免重复的配置工作,也可以使用 mapPartitions() 避免创建对象的开销。有时需要创建一个对象来将不同类型的数据聚合起来。当计算平均值时,一种方法是将数值 RDD 转为二元组 RDD,以在归约过程中追踪所处理的元素个数。现在,可以为每个分区只创建一次二元组,而不用为每个元素都执行这个操作。

在 Python 中不使用 mapPartitions() 求平均值

def combineCtrs(c1, c2):
    return (c1[0] + c2[0], c1[1] + c2[1])

def basicAvg(nums):
    """计算平均值"""
	nums.map(lambda num: (num, 1)).reduce(combineCtrs)

在 Python 中使用 mapPartitions() 求平均值

def partitionCtr(nums): 
    """计算分区的sumCounter""" 
    sumCount = [0, 0]
	for num in nums:
        sumCount[0] += num
        sumCount[1] += 1
    return [sumCount]

def fastAvg(nums): 
    """计算平均值"""
	sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs) 
    return sumCount[0] / float(sumCount[1])

6.5 与外部程序间的管道

可以将数据通过管道传给用其他语言编写的程序,比如 R 语言脚本。

Spark 在 RDD 上提供 pipe() 方法。 通过 pipe(),你可以 将 RDD 中的各元素从标准输入流中以字符串形式读出,并对这些元素执行任何你需要的操作,然后把结果以字符串的形式写入标准输出——这个过程就是 RDD 的转化操作过 程。

我们使用一个 R 语言的库来计算所有联系人的距离。 程序会把 RDD 的每 个元素都以换行符作为分隔符写出去,而那个 R 程序输出的每一行都是字符串,用来构 成结果 RDD 中的元素。 我们会把数据以 mylat, mylon, theirlat, theirlon 的格式重新组织。这里使用逗号作为分隔符。

R 语言的距离程序

#!/usr/bin/env Rscript
library("Imap")
f <- file("stdin")
open(f)
while(length(line <- readLines(f, n=1)) > 0) {
	# 处理行
	contents <- Map(as.numeric, strsplit(line, ",")) 
    mydist <- gdist(contents[[1]][1],
                    contents[[1]][2],
                    contents[[1]][3], 
                    contents[[1]][4],
					units="m", a=6378137.0, b=6356752.3142, verbose = FALSE) 
    write(mydist, stdout())
}
$ ./src/R/finddistance.R 
37.75889318222431,-122.42683635321838,37.7614213,-122.4240097
349.2602
coffee
NA
ctrl-d

可以将 stdin 中的每一行数据都转为 stdout 中的输出了。现在需要做的事情是让每个工作节点都能访问 finddistance.R,并调用这个脚本来对 RDD 进行实际的转化操作。这两个任务在 Spark 中都很容易完成,

在 Python 中使用 pipe() 调用 finddistance.R 的驱动器程序

# 使用一个R语言外部程序计算每次呼叫的距离 
distScript = "./src/R/finddistance.R" 
distScriptName = "finddistance.R" 
sc.addFile(distScript)
def hasDistInfo(call):
	"""验证一次呼叫是否有计算距离时必需的字段"""
	requiredFields = ["mylat", "mylong", "contactlat", "contactlong"] 
	return all(map(lambda f: call[f], requiredFields))

def formatCall(call): 
	"""将呼叫按新的格式重新组织以使之可以被R程序解析""" 
	return "{0},{1},{2},{3}".format(
			call["mylat"], 
			call["mylong"],
            call["contactlat"],
            call["contactlong"])

pipeInputs = contactsContactList.values().flatMap(
	lambda calls: map(formatCall, filter(hasDistInfo, calls)))
distances = pipeInputs.pipe(SparkFiles.get(distScriptName)) 
print(distances.collect())

6.6 数值RDD的操作

Spark 的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些 统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以 StatsCounter 对象返回。

StatsCounter中可用的汇总统计数据

StatsCounter

如果你只想计算这些统计数据中的一个,也可以直接对 RDD 调用对应的方法,比如 rdd. mean() 或者 rdd.sum()。

用 Python 移除异常值

# 要把String类型RDD转为数字数据,这样才能
# 使用统计函数并移除异常值
import math
distanceNumerics = distances.map(lambda string: float(string)) 
stats = distanceNumerics.stats()
stddev = std.stdev()
mean = stats.mean()
reasonableDistances = distanceNumerics.filter(
	lambda x: math.fabs(x - mean) < 3 * stddev) 
print(reasonableDistances.collect())

用 Scala 移除异常值

// 现在要移除一些异常值,因为有些地点可能是误报的
// 首先要获取字符串RDD并将它转换为双精度浮点型
val distanceDouble = distances.map(string => string.toDouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev
val mean = stats.mean
val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev) println(reasonableDistance.collect().toList)

用 Java 移除异常值

// 首先要把String类型RDD转为DoubleRDD,这样才能使用统计函数
JavaDoubleRDD distanceDoubles = distances.mapToDouble(new DoubleFunction<String>() {
	public double call(String value) { 
        return Double.parseDouble(value);
	}});

final StatCounter stats = distanceDoubles.stats(); 
final Double stddev = stats.stdev();
final Double mean = stats.mean();5214
    
JavaDoubleRDD reasonableDistances =
distanceDoubles.filter(new Function<Double, Boolean>() {
    public Boolean call(Double x) {
		return (Math.abs(x-mean) < 3 * stddev);
    }}); 
System.out.println(StringUtils.join(reasonableDistance.collect(), ","));

Spark内核设计的艺术: 第10章 Spark API

10.1 基本概念

  1. DataType 简介

    DataType是 Spark SQL的所有数据类型的基本类型, Spark SQL的所有数据类型都继承自 DataType, DataType的继承体系如图所示。

    image

    从图中可以看到, Spark SQL中定义的数据类型与Java的基本数据类型大部分都是一致的。由于 Spark SQL不是本书要讲解的内容,所以读者在这里只需要了解这些内容
    即可。

  2. Metadata简介
    Metadata用来保存 StructField的元数据信息,其本质是底层的 Map[string, Any] Metadata可以对Boolean、Long、 Double、 String、 Metadata、Array[ Boolean]、 Array[Long]、Array[Double]、 Array[String]和Array [Metadata]等类型的元数据进行存储或读取。 Metadata属于 Spark SQL中的内容,这里不多介绍。

  3. StructType 与 StructField
    样例类 StructType与样例类 Structfield共同构建起数据源的数据结构。StructField**定义了4个属性:字段名称(name)、数据类型( data Type)、是否允许为null( nullable)、元数据( metadata)。 StructField的定义如下:

  case class StructField(
      name: String,
      dataType: DataType,
      nullable: Boolean = true,
      metadata: Metadata = Metadata.empty)

10.2 数据源 DataSource

从 Spark1.3.0开始, Spark推出了 Data Frame的API,与此同时 Data Source也被引入到Spark中。 Spark将文本文件、CSV文件、JSON文件等一系列格式的输入数据都作为数据源。特质 DataSourceRegister是对所有数据源的抽象, DataSourceRegister的所有具体实现都被注册到了 DataSource中。 DataSource将负责对不同类型数据源的查找、创建不同类型数据源的信息、解析不同数据源的关系等。

10.2.1 DataSourceRegister 详解

DataSourceRegister是数据源的抽象,所有数据源都应该实现它。 DataSourceRegister的定义非常简单,代码如下。

trait DataSourceRegister {
	def shortName(): String
}

shortName方法意在获取数据源提供者使用的格式或格式的别名Spark中实现了大量的数据源提供者,如图所示。

image

这里以 TextFileFormat实现的 shortName方法为例。

override def shorName(): String = "text"

10.2.2 DataSource详解

DataSource表示在Spark SQL中可插拔的数据源。

10.3 检查点的实现

检查点是很多分布式系统为了容灾容错引入的机制,其实质是将系统运行期的内存数据结构和状态持久化到磁盘上,在需要时通过对这些持久化数据的读取,重新构造出之前的运行期状态。 Spark使用检查点主要是为了将RDD的执行状态保留下来,在重新执行时就不用重新计算,而直接从检查点读取。 CheckpointRDD是对检查点数据进行操作的RDD,例如,读写检查点数据。 RDDCheckpointData表示RDD检查点的数据。 Spark的检查点离不开 CheckpointRDD和 RDDCheckpointData的支持,本节将对它们的代码实现进行分析。

10.3.1 CheckpointRDD 的实现

CheckpointRDD 是用来存储体系中回复检查点的数据。

/**
 * An RDD that recovers checkpointed data from storage.
 */
private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
  extends RDD[T](sc, Nil) {

  // CheckpointRDD should not be checkpointed again
  override def doCheckpoint(): Unit = { }
  override def checkpoint(): Unit = { }
  override def localCheckpoint(): this.type = this

  // Note: There is a bug in MiMa that complains about `AbstractMethodProblem`s in the
  // base [[org.apache.spark.rdd.RDD]] class if we do not override the following methods.
  // scalastyle:off
  protected override def getPartitions: Array[Partition] = ???
  override def compute(p: Partition, tc: TaskContext): Iterator[T] = ???
  // scalastyle:on

}
  • compute 实际上是从恢复点恢复数据

10.3.2 RDDCheckpointData 的实现

RDDCheckpointData 用来保存和检查点相关的信息。每个RDDCheckpointData 实例都与一个RDD实例相关联。RDDCheckpointData中一共3个属性:

  • rdd
  • cpState: 检查点状态
  • cpRDD: 保存检查点数据的RDD,即CheckpointRDD的实现类。

10.3.3 ReliableRDDCheckpointData 的实现

本节以 RDDCheckpointData 的子类 ReliableRDDCheckpointData为例,来看看RDDCheckpointData的具体实现。

ReliableRDDCheckpointData除继承了 RDDCheckpointData 的属性外,还有自身的一个属性 cpDir。 cpDir是保存ReliableRDDCheckpointData所关联的RDD数据的检查点目录,是通过调用 ReliablerDDCheckpointData的伴生对象的 checkpointPath方法生成的。

10.4 RDD 的再次分析

笔者早在7.2.2节就对RDD的实现进行了分析,但当时只介绍了与调度系统相关的API。RDD还提供了很多其他类型的API,包括对RDD进行转换的API、对RDD进行计算(动作)的API及RDD检查点相关的API。转换API里的计算是延迟的,也就是说调用转换API不会向 Spark集群提交Job,更不会执行转换计算。只有调用了动作API,才会提交Job并触发对转换计算的执行。由于RDD提供的AP非常多,本书不可能一一展示。由于在10.8节将要介绍 word count的例子,因此本节主要挑选 word count例子中使用到的API进行分析。

10.4.1 转换 API

转换(transform)是指对现有RDD执行某个函数后转换为新的RDD的过程。转换前的RDD与转换后的RDD之间具有依赖和血缘关系。RDD的多次转换将创建出多个RDD这些RDD构成了一张单向依赖的图,也就是DAG。下面挑选10.8节的 word count例子所涉及的转换AP进行介绍。

  1. mapPartitions
    mapPartitions方法用于将RDD转换为 MapPartitionsRDD。
  2. mapPartitionsWithIndex

用于创建爱你一个将与分区索引先关的函数应用到RDD的每个分区的MapPartitionsRDD。mapPartitionsWithIndex与 mapAritions相似,区别在于多接收分区索引的参数

  1. mapPartitionsWithIndexInternal

    用于创建一个将函数应用到RDD的每个分区的MapPartitionsRDD。此方法是私有的,只有Spark SQL内部可使用。

  2. flatMap

  3. map

  4. toJavaRDD

    用于将RDD自己转换成JavaRDD。

10.4.2 动作 API

由于转换API都是预先编织好,但是不会执行的,所以Spak需要一些API来触发对转换的执行。动作API触发对数据的转换后,将接收到一些结果数据,动作AP因此还具备对这些数据进行收集、遍历、叠加的功能。下面挑选10.8节的 word count例子使用的动作 API-collect进行介绍。此外再介绍 foreach和 reduce两个动作API。

  1. collect
    collect方法将调用 Spark Context的 runJob方法提交基于RDD的所有分区上的作业,并返回数组形式的结果。

  2. foreach

  3. reduce

    reduce 方法按照指定的函数对RDD中的元素进行叠加操作。

10.4.3 检查点API的实现分析

RDD中提供了很多与检查点相关的API,通过对这些AP的使用,Spark应用程序才能够启用、保存及使用检查点,提高应用程序的容灾和容错能力。下面进行介绍。

  1. 检查点的启用
    用户提交的 Spark作业必须主动调用RDD的 checkpoint方法,才会启动检查点功能。给 Spark Context指定 checkpointDir是启用检查点机制的前提。可以使用 Spark Context的 setcheckpointDir方法设置checkpointDir。如果没有指定 RDDCheckpointData,那么创建 ReliableRDDCheckpointData
  2. 检查点的保存
    RDD的 doCheckpoint方法用于将RDD的数据保存到检查点。由于此方法是私有的,只能在RDD内部使用。
  3. 检查点的使用

10.4.4 迭代计算

在8.3.3节和8.5.4节分析 ShuffleMapTask和 ResultTask的 runtask方法时已经看到,Task的执行离不开对RDD的iterator方法的调用。RDD的 Iterator方法是迭代计算的人口。

10.5 数据集合 Dataset

Dataset是特定领域对象的强类型集合,可通过功能或关系操作进行并行转换。当Dataset的泛型类型是Row时, Dataset还可以作为 DataFrame。 DataFrame的类型定义如下。

type DataFrame Dataset [Row]

有了行的数据集合, Dataframe看起来更像是关系数据库中的表。 Data Frame是专门为了数据科学应用设计,支持从KB到PB级的数据量。 Spark支持从文本文件、CSV文件、Oracle脚本文件、JSON文件及所有支持JDBC的数据源(例如, MySQL和Hive)转换为Dataframe。

Dataset I中提供的API非常丰富,本书不可能逐一进行源码分析。

  1. ofRows

    ofRows是Dataset的伴生对象中提供的方法,用于将逻辑执行计划LogicalPlan转换为泛型是Row的Dataset(即DataFrame)。

  2. 多种多样的select

    Dataset提供了多个重载的 select方法,以实现类似于SQL中的 SELECT语句的选择功能。 Dataset虽然提供了多个 select操作的API,但这些API最终都将转换为统一的select方法。

  3. rdd

    rdd是 Dataset的属性之一,由于被关键字lazy修饰,因此在需要rdd的值时才会进行“懒”执行。

10.6 DataFrameReader

DataFrameReader用于通过外部数据源的格式(如text、cvs等)和数据结构加载Dataset, DataFrameReader还提供了非常丰富的操作DataFrame的API。有了DataReader,我们就可以将各种格式的数据源转换为Dataset或Dataframe,进而以面向关系型数据的方式操纵各种格式的数据。

DataFrameReader 只有三个属性,分别如下。

  • source:输入数据源的格式。可通过 spark. sql. sources. default属性配置,默认为 parquet

小贴士: Apache Parquet是 Hadoop生态圈中一种新型列式存储格式,它可以兼容 Hadoop生态圈中大多数计算框架( Hadoop、 Spark等),被多种查询引擎支持(Hive、Impala、Drill等),并且是数据处理框架、数据模型和语言无关的。Parquet最初是由Twitter和 Cloudera(由于Impala的缘故)合作开发完成并开源,2015年5月从 Apache的孵化器里华业成为 Apache顶级项目。

  • userSpecifiedSchema:用户指定的 Schema,实际的类型是 StructType
    口 extraOptions:类型为 HashMap[String, String],用于保存额外的选项
  • extraOptions:类型为 HashMap[String, String],用于保存额外的选项Q

DataFrameReader提供了大量的API,本书限于篇幅不可能全部都进行分析。

  1. format
    format方法用于设置输入数据源的格式。

  2. schema

    用于设置用户指定的结构类型(StructType)。

  3. 多种多样的option

    DataFrameReader提供了多个重载的 option方法,用于向 extraCtions中添加选项。无论哪个 option方法,都将转换为对最基本的 option方法的调用

  4. 多种多用的load

    DataFrameReader提供了多个重载的load方法,用于将数据源的数据加载到DataFrame这些load方法实际都会转换为调用基本的load方法来加载数据。

  5. 重载的text

  6. 重载的textFIle

10.7 SparkSession

Spark2.0引入了 SparkSession,为用户提供了一个统一的切入点来使用 Spark的各项功能。 SparkSession还提供了 DataFrame和 Dataset相关的API来编写Spark应用程序。SparkSession降低了学习曲线,使得工程师可以更容易地使用 Spark
SparkSession的属性如下。

  • spark Context: 即 Spark context。
  • sharedState: 在多个 SparkSession之间共享的状态(包括 Spark context、缓存的数
    据、监听器及与外部系统交互的字典信息)。
  • sessionState: SparkSession的状态( SessionState) SessionState中保存着 SparkSession
    指定的状态信息。
  • sqlContext: 即 SQLContext。 SQLContext是 Spark SQL的上下文信息。
  • conf: 类型为 RuntimeConfig,是 Spark运行时的配置接口类。

根据 SparkSession的属性,笔者认为 SparkSession并非是一个新创造的东西,它不过是对 SparkContext、 SQLContext及 Dataframe等的一层封装。 SparkSession的创建离不开构建器。

SparkSession的伴生对象提供了很多属性和方法,以便于我们构造Spaark Session。SparkSession的伴生对象中包含以下属性。

  • sqlListener: 类型定义为 AtomicReference[SQLListener],用于持有 SQLListenerliStener主要用于 SQL UI。
  • active ThreadSession: 类型定义为 InheritableThreadLocal[ SparkSession],用于持有当前线程的激活的 SparkSession。 SparkSession的伴生对象提供了setActiveSession方法、 getActiveSessior方法、 clearActivesession方法分别用于设置、获取、清空activeThreadSessic中持有的 SparkSession。
  • defaultsession: 类型定义为 AtomicReference[SparkSession],用于持有默认的spakSession。 SparkSession的伴生对象提供了 setDefaultsession方法、 getDefaultSession方法、 clearDefaultsession方法分别用于设置、获取、清空defaultsession中持有的SparkSession 。

10.7.1 SparkSession的构建器 Builder

Builder是 SparkSession实例的构建器,对 SparkSession实例的构造都依赖于它。Builder中的 options(类型为 HashMap[String, Stringl)用于缓存构建 SparkConf所需的属性配置。 userSuppliedcontext属性用于持有用户提供的 SparkContext,可以通过 Builder的sparkContext方法来设置。 Builder中提供了很多向 options中增加属性配置的方法,此外还提供了获取或创建 SparkSession的方法。

  1. 多种多样的config

  2. getOrCreate 方法

    getOrCreate 方法用于获取或创建SparkSession。

10.7.2 SparkSession 的API

  1. builder
    SparkSession的伴生对象中提供了 builder方法用于创建 Builder,其实现如下

    def builder (): Builder new Builder
  2. read
    sparkSession的read方法用于创建 DataFrameReader。

  3. baseRelationToDataFrame
    SparkSession的 baseRelationToDataFrame方法用于将 BaseRelation转换为 DataFrame。将 BaseRelation转换为 DataFrame后,就可以用操作关系数据的方式来开发。

10.8 word count 例子

Spark系列: 理解Catalyst优化器原理

认识Catalyst优化器

Spark引擎的Catalyst优化器编译并优化了逻辑计划,而且还有一个能够确保生成最有效的物理计划的成本优化器。

image

预备知识-Tree&Rule

在介绍SQL优化器工作原理之前,有必要首先介绍两个重要的数据结构:Tree和Rule。相信无论对SQL优化器有无了解,都肯定知道SQL语法树这个概念,不错,SQL语法树就是SQL语句通过编译器之后会被解析成一棵树状结构。这棵树会包含很多节点对象,每个节点都拥有特定的数据类型,同时会有0个或多个孩子节点(节点对象在代码中定义为TreeNode对象),下图是个简单的示例:

image

如上图所示,箭头左边表达式有3种数据类型(Literal表示常量、Attribute表示变量、Add表示动作),表示x+(1+2)。映射到右边树状结构后,每一种数据类型就会变成一个节点。另外,Tree还有一个非常重要的特性,可以通过一定的规则进行等价变换,如下图:

image

上图定义了一个等价变换规则(Rule):两个Integer类型的常量相加可以等价转换为一个Integer常量,这个规则其实很简单,对于上文中提到的表达式x+(1+2)来说就可以转变为x+3。对于程序来讲,如何找到两个Integer常量呢?其实就是简单的二叉树遍历算法,每遍历到一个节点,就模式匹配当前节点为Add、左右子节点是Integer常量的结构,定位到之后将此三个节点替换为一个Literal类型的节点。

上面用一个最简单的示例来说明等价变换规则以及如何将规则应用于语法树。在任何一个SQL优化器中,通常会定义大量的Rule(后面会讲到),SQL优化器会遍历语法树中每个节点,针对遍历到的节点模式匹配所有给定规则(Rule),如果有匹配成功的,就进行相应转换,如果所有规则都匹配失败,就继续遍历下一个节点。

Catalyst优化器

任何一个优化器工作原理都大同小异:SQL语句首先通过Parser模块被解析为语法树,此棵树称为Unresolved Logical Plan;Unresolved Logical Plan通过Analyzer模块借助于元数据解析为Logical Plan;此时再通过各种基于规则的优化策略进行深入优化,得到Optimized Logical Plan;优化后的逻辑执行计划依然是逻辑的,并不能被Spark系统理解,此时需要将此逻辑执行计划转换为Physical Plan;为了更好的对整个过程进行理解,下文通过一个简单示例进行解释。

Parser

Parser简单来说是将SQL字符串切分成一个一个Token,再根据一定语义规则解析为一棵语法树。Parser模块目前基本都使用第三方类库ANTLR进行实现,比如Hive、 Presto、SparkSQL等。下图是一个示例性的SQL语句(有两张表,其中people表主要存储用户基本信息,score表存储用户的各种成绩),通过Parser解析后的AST语法树如下图所示:

image

Analyzer

通过解析后的逻辑执行计划基本有了骨架,但是系统并不知道score、sum这些都是些什么鬼,此时需要基本的元数据信息来表达这些词素,最重要的元数据信息主要包括两部分:表的Scheme和基本函数信息,表的scheme主要包括表的基本定义(列名、数据类型)、表的数据格式(Json、Text)、表的物理位置等,基本函数信息主要指类信息。

Analyzer会再次遍历整个语法树,对树上的每个节点进行数据类型绑定以及函数绑定,比如people词素会根据元数据表信息解析为包含age、id以及name三列的表,people.age会被解析为数据类型为int的变量,sum会被解析为特定的聚合函数,如下图所示:

image

SparkSQL中Analyzer定义了各种解析规则,有兴趣深入了解的童鞋可以查看Analyzer类,其中定义了基本的解析规则,如下:

img

Optimizer(优化器)

优化器是整个Catalyst的核心,上文提到优化器分为基于规则优化和基于代价优化两种,当前SparkSQL 2.1依然没有很好的支持基于代价优化(下文细讲),此处只介绍基于规则的优化策略,基于规则的优化策略实际上就是对语法树进行一次遍历,模式匹配能够满足特定规则的节点,再进行相应的等价转换。因此,基于规则优化说到底就是一棵树等价地转换为另一棵树。SQL中经典的优化规则有很多,下文结合示例介绍三种比较常见的规则:谓词下推(Predicate Pushdown)、常量累加(Constant Folding)和列值裁剪(Column Pruning)。

image

上图左边是经过Analyzer解析前的语法树,语法树中两个表先做join,之后再使用age>10对结果进行过滤。大家知道join算子通常是一个非常耗时的算子,耗时多少一般取决于参与join的两个表的大小,如果能够减少参与join两表的大小,就可以大大降低join算子所需时间。谓词下推就是这样一种功能,它会将过滤操作下推到join之前进行,上图中过滤条件age>0以及id!=null两个条件就分别下推到了join之前。这样,系统在扫描数据的时候就对数据进行了过滤,参与join的数据量将会得到显著的减少,join耗时必然也会降低。

image

常量累加其实很简单,就是上文中提到的规则 x+(1+2) -> x+3,虽然是一个很小的改动,但是意义巨大。示例如果没有进行优化的话,每一条结果都需要执行一次100+80的操作,然后再与变量math_score以及english_score相加,而优化后就不需要再执行100+80操作。

image

列值裁剪是另一个经典的规则,示例中对于people表来说,并不需要扫描它的所有列值,而只需要列值id,所以在扫描people之后需要将其他列进行裁剪,只留下列id。这个优化一方面大幅度减少了网络、内存数据量消耗,另一方面对于列存数据库(Parquet)来说大大提高了扫描效率。

Optimizer源码

至此,逻辑执行计划已经得到了比较完善的优化,然而,逻辑执行计划依然没办法真正执行,他们只是逻辑上可行,实际上Spark并不知道如何去执行这个东西。比如Join只是一个抽象概念,代表两个表根据相同的id进行合并,然而具体怎么实现这个合并,逻辑执行计划并没有说明。

image

此时就需要将逻辑执行计划转换为物理执行计划,将逻辑上可行的执行计划变为Spark可以真正执行的计划。比如Join算子,Spark根据不同场景为该算子制定了不同的算法策略,有BroadcastHashJoin、ShuffleHashJoin以及SortMergeJoin等(可以将Join理解为一个接口,BroadcastHashJoin是其中一个具体实现),物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现,这个过程涉及到基于代价优化策略,后续文章细讲。

Spark SQL执行计划

通过一个简单的示例完整的介绍了Catalyst的整个工作流程,包括Parser阶段、Analyzer阶段、Optimize阶段以及Physical Planning阶段。有同学可能会比较感兴趣Spark环境下如何查看一条具体的SQL的整个过程,在此介绍两种方法:

queryExecution

使用queryExecution方法查看逻辑执行计划,使用explain方法查看物理执行计划,分别如下所示:

image

image

Spark WebUI

使用Spark WebUI进行查看,如下图所示:

image

Spark系列: 深入理解RDD

RDD介绍

RDD参考论文

RDD的全称是:Resilient Distributed Dataset (弹性分布式数据集),它有几个关键的特性:

  • RDD是只读的,表示它的不可变性。
  • 可以并行的操作分区集合上的所有元素。
  • 天生具有容错机制的特殊集。
  • 只能通过在稳定的存储器或其他RDD上的确定性操作(转换)来创建。

每一种RDD类型都包含以下五种特性:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

为解决资源不能在内存中,并且跨集群重复使用的问题,我们抽象出了RDD的概念,可以支持在大量的应用之间高效的重复利用数据。RDD是一种具备容错性、并行的数据结构,可以在内存中持久化,以便在大量的算子之间操作。

Resilient Distributed Datasets (RDDs)

RDD的抽象

  • 只读的分区记录的集合
  • RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建,RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage),据此可以从物理存储的数据计算出相应的RDD分区
  • 用户可控制RDDs的两个方面:持久化和分区性

Spark编程接口

在Spark中,RDD被表示为对象,通过这些对象上的方法(或函数)调用转换。

定义RDD之后,程序员就可以在动作中使用RDD了。动作是向应用程序返回值,或向存储系统导出数据的那些操作,例如,count(返回RDD中的元素个数),collect(返回元素本身),save(将RDD输出到存储系统)。在Spark中,只有在动作第一次使用RDD时,才会计算RDD(即延迟计算)。这样在构建RDD的时候,运行时通过管道的方式传输多个转换。

程序员还可以从两个方面控制RDD,即缓存和分区。用户可以请求将RDD缓存,这样运行时将已经计算好的RDD分区存储起来,以加速后期的重用。缓存的RDD一般存储在内存中,但如果内存不够,可以写到磁盘上。

另一方面,RDD还允许用户根据关键字(key)指定分区顺序,这是一个可选的功能。目前支持哈希分区和范围分区。例如,应用程序请求将两个RDD按照同样的哈希分区方式进行分区(将同一机器上具有相同关键字的记录放在一个分区),以加速它们之间的join操作。

控制台日志挖掘

lines = spark.textFile("hdfs://...")

errors = lines.filter(_.startsWith("ERROR"))

errors.cache()

// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS")).map(_.split(’\t’)(3)).collect()

image

为了说明模型的容错性,给出了3个查询的Lineage图。Spark调度器以流水线的方式执行后两个转换,向拥有errors分区缓存的节点发送一组任务。此外,如果某个errors分区丢失,Spark只在相应的lines分区上执行filter操作来重建该errors分区。

RDD与分布式共享内存

image

注意,通过备份任务的拷贝,RDD还可以处理落后任务(即运行很慢的节点),这点与MapReduce[12]类似。而DSM则难以实现备份任务,因为任务及其副本都需要读写同一个内存位置。

与DSM相比,RDD模型有两个好处:

  • 对于RDD中的批量操作,运行时将根据数据存放的位置来调度任务,从而提高性能。(计算向数据靠拢)
  • 对于基于扫描的操作,如果内存不足以缓存整个RDD,就进行部分缓存。把内存放不下的分区存储到磁盘上,此时性能与现有的数据流系统差不多。

最后看一下读操作的粒度。RDD上的很多动作(如count和collect)都是批量读操作,即扫描整个数据集,可以将任务分配到距离数据最近的节点上。同时,RDD也支持细粒度操作,即在哈希或范围分区的RDD上执行关键字查找。

RDD模型的优势

Spark中的RDD操作

下面是Spark中的RDD转换和动作。每个操作都给出了标识,其中方括号表示类型参数。前面说过转换是延迟操作,用于定义新的RDD;而动作启动计算操作,并向用户程序返回值或向外部存储写数据。

image

应用程序示例

逻辑回归(Logistic Regression)

例如下面的程序是逻辑回归的实现。逻辑回归是一种常见的分类算法,即寻找一个最佳分割两组点(即垃圾邮件和非垃圾邮件)的超平面w。算法采用梯度下降的方法:开始时w为随机值,在每一次迭代的过程中,对w的函数求和,然后朝着优化的方向移动w。

val points = spark.textFile(...).map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) { 
	val gradient = points.map{ 
			p =>p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
		}.reduce((a,b) => a+b)
	w -= gradient
}

首先定义一个名为points的缓存RDD,这是在文本文件上执行map转换之后得到的,即将每个文本行解析为一个Point对象。然后在points上反复执行map和reduce操作,每次迭代时通过对当前w的函数进行求和来计算梯度。

PageRank

该算法通过合计链接到自身页面的page对其的贡献,进而迭代式地更新自身的排名。每一次的迭代中,每一个文件(page)都将r/m的贡献发送给他的邻居,其中r是它的rank,n是它邻居页面的的数目,之后,它会将自身的排名更新为α/N + (1 − α)∑ci,其中 ∑ci是是当前页面获取到的贡献的总和,N是文件(page)的总数

image

RDD的描述

我们希望在不修改调度器的前提下,支持RDD上的各种转换操作,同时能够从这些转换获取Lineage信息。为此,我们为RDD设计了一组小型通用的内部接口。

简单地说,每个RDD都包含(RDD的内部实现接口):

Operation Meaning
partitions() Return a list of Partition objects
preferredLocations(p) List nodes where partition p can be accessed faster due to data locality
dependencies() Return a list of dependencies
iterator(p, parentIters) Compute the elements of partition pgiven iterators for its parent partitions
partitioner() Return metadata specifying whether the RDD is hash/range partitioned

设计接口的一个关键问题就是,如何表示RDD之间的依赖。我们发现RDD之间的依赖关系可以分为两类,即:

  • 窄依赖(narrow dependencies):子RDD的每个分区依赖于常数个父分区(即与数据规模无关);
  • 宽依赖(wide dependencies):子RDD的每个分区依赖于所有父RDD分区。例如,map产生窄依赖,而join则是宽依赖(除非父RDD被哈希分区)。

image

区分这两种依赖很有用。

  • 窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。例如,逐个元素地执行map、然后filter操作;
  • 宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。
  • 窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;
  • 对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。

Spark任务调度器

调度器根据RDD的结构信息为每个动作确定有效的执行计划。调度器的接口是runJob函数,参数为RDD及其分区集,和一个RDD分区上的函数。该接口足以表示Spark中的所有动作(即count、collect、save等)。

总的来说,我们的调度器跟Dryad类似,但我们还考虑了哪些RDD分区是缓存在内存中的。调度器根据目标RDD的Lineage图创建一个由stage构成的无回路有向图(DAG)每个stage内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化(pipeline)

stage的边界有两种情况

  • 宽依赖上的Shuffle操作;
  • 已缓存分区,它可以缩短父RDD的计算过程。

父RDD完成计算后,可以在stage内启动一组任务计算丢失的分区。

image

支持检查点(Checkpointing)

尽管RDD中的Lineage信息可以用来故障恢复,但对于那些Lineage链较长的RDD来说,这种恢复可能很耗时。如果将Lineage链存到物理存储中,再定期对RDD执行检查点操作就很有效。

一般来说,Lineage链较长、宽依赖的RDD需要采用检查点机制。这种情况下,集群的节点故障可能导致每个父RDD的数据块丢失,因此需要全部重新计算。将窄依赖的RDD数据存到物理存储中可以实现优化,将数据点和不变的顶点状态存储起来,就不再需要检查点操作。

当前Spark版本提供检查点API,但由用户决定是否需要执行检查点操作。今后我们将实现自动检查点,根据成本效益分析确定RDD Lineage图中的最佳检查点位置。

值得注意的是,因为RDD是只读的,所以不需要任何一致性维护(例如写复制策略,分布式快照或者程序暂停等)带来的开销,后台执行检查点操作。

缓存管理

Worker节点将RDD分区以Java对象的形式缓存在内存中。由于大部分操作是基于扫描的,采取RDD级的LRU(最近最少使用)替换策略(即不会为了装载一个RDD分区而将同一RDD的其他分区替换出去)。

目前这种简单的策略适合大多数用户应用。另外,使用带参数的cache操作可以设定RDD的缓存优先级。

相关工作

分布式共享内存(DSM)。RDD可以看成是一个基于DSM研究得到的抽象。RDD提供了一个比DSM限制更严格的编程模型,并能在节点失效时高效地重建数据集。DSM通过检查点实现容错,而Spark使用Lineage重建RDD分区,这些分区可以在不同的节点上重新并行处理,而不需要将整个程序回退到检查点再重新运行。RDD能够像MapReduce一样将计算推向数据,并通过推测执行来解决某些任务计算进度落后的问题,推测执行在一般的DSM系统上是很难实现的。

Lineage。在科学计算和数据库领域,对于一些应用,如需要解释结果以及允许被重新生成、工作流中发现了bug或者数据集丢失需要重新处理数据,表示数据的Lineage和原始信息一直以来都是一个研究课题。RDD提供了一个受限的编程模型,在这个模型中使用细粒度的Lineage来表示是非常容易的,因此它可以被用于容错。

Spark内核设计的艺术: 第4章 SparkContext的初始化

4.1 SparkContext概述

创建sc的代码入口都主要放在sparkContext

Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端。了解Spark Driver的初始化,有助于读者理解用户应用程序在客户端的处理过程。

Spark Driver的初始化始终围绕着SparkContext的初始化。SparkContext可以算得上是所有Spark应用程序的发动机引擎,轿车要想跑起来,发动机首先要启动。SparkContext初始化完毕,才能向Spark集群提交任务。在平坦的公路上,发动机只需以较低的转速,较低的功率就可以游刃有余;在山区,你可能需要一台能够提供大功率的发动机,这样才能满足你转山的体验。这些参数都是通过驾驶员操作油门、档位等传送给发动机的,而SparkContext的配置参数则由SparkConf负责,SparkConf就是你的操作面板。

4.2 创建Spark环境

4.2.1 创建sparkEnv

变量定义

private var _env: SparkEnv = _

创建和初始化SparkEnv变量

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

sparkContext中的创建函数

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
	isLocal: Boolean,
	listenerBus: LiveListenerBus): SparkEnv = {
	SparkEnv.createDriverEnv(conf, isLocal, listenerBus,      		         											SparkContext.numDriverCores(master, conf)
                            )
}

SparkEnv.createDriverEnv可以查看源代码的实现。

4.2.2 SparkEnv的使用

当要访问SparkEnv的信息时,一般需要先调用get函数,来获取一个SparkEnv对象。然后再调用执行环境中创建好的各个组件的对象,比如:BroadcastManager对象,shufflerManager对象,BlockManager对象,BlockManagerMaster对象,等。

4.3 SparkUI的实现

任何系统都需要提供监控功能,用浏览器能访问具有样式及布局,并提供丰富监控数据的页面无疑是一种简单、高效的方式。SparkUI就是这样的服务,它的构成如图所示。

image

在大型分布式系统中,采用事件监听机制是最常见的。为什么要使用事件监听机制?假如SparkUI采用Scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到Driver所在JVM的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况。由于函数调用多数情况下是同步调用,这就导致线程被阻塞,在分布式环境中,还可能因为网络问题,导致线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。

DAGScheduler是主要的产生各类SparkListenerEvent的源头,它将各种SparkListenerEvent发送到listenerBus的事件队列中,listenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,改变SparkListener中的统计监控数据,最终由SparkUI的界面展示。从上图中可以看到Spark里定义了很多监听器SparkListener的实现,包括JobProgressListener、EnvironmentListener、StorageListener、ExecutorsListener。

4.4 创建心跳接收器

SparkContext创建_heartbeatReceiver的代码:

// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

4.5 创建和启动调度系统(*)

TaskScheduler是SparkContext的重要组成部分,负责请求集群管理器给应用程序分配并允许Executor(一级调度)和给任务分配Executor并运行任务(二级调度)。DAGScheduler主要用于在任务正式交到TaskSchedulerImpl提交之前做一些准备工作,包括创建Job、将DAG中的RDD划分到不同的Stage、提交Stage等。

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

createTaskScheduler的具体实现请看源码, 这个函数很重要

4.6 初始化块管理器BlockManager

它囊括了存储体系中所有的组件和功能,是存储体系中重要的组件。

 _env.blockManager.initialize(_applicationId)

4.7 启动度量系统

Metrics-SystemSourceSink进行封装,将Source的数据输出到不同的Sink。Metrics-SystemSparkEnv内部组件之一,是整个Spark应用程序的度量系统。

// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler=>ui.foreach(_.attachHandler(handler)))

4.8 创建事件日志监听器

EventLoggingListener是讲事件持久化到存储的监听器。

_eventLogger =
  if (isEventLogEnabled) {
    val logger =
      new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
        _conf, _hadoopConfiguration)
    logger.start()
    listenerBus.addToEventLogQueue(logger)
    Some(logger)
  } else {
}

4.9 创建和启动ExecutorAllocationManager

ExecutorAllocationManager是基于工作负载和删除Executor的代理。ExecutorAllocationManager和集群的关系如图:

image

val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
          cleaner = cleaner))
      case _ =>
        None
    }
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())

下图是Excutor的动态分配的过程。

image

4.10 ContentCleaner的创建与启动

_cleaner =
 if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
	Some(new ContextCleaner(this))
 } else {
	None
 }
_cleaner.foreach(_.start())

在文件ContextCleaner.scala中,start的定义如下:

  /** Start the cleaner. */
  def start(): Unit = {
    cleaningThread.setDaemon(true)
    cleaningThread.setName("Spark Context Cleaner")
    cleaningThread.start()
    periodicGCService.scheduleAtFixedRate(() => System.gc(),
      periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
  }

4.11 额外的SparkListener 与启动事件总线

SparkContext提供了用于自定义SparkListner的地方:

setupAndStartListenerBus()

setupAndStartListenerBus的实现:

  /**
   * Registers listeners specified in spark.extraListeners, then starts the listener bus.
   * This should be called after all internal listeners have been registered with the listener bus
   * (e.g. after the web UI and event logging listeners have been registered).
   */
  private def setupAndStartListenerBus(): Unit = {
    try {
      conf.get(EXTRA_LISTENERS).foreach { classNames =>
        val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
        listeners.foreach { listener =>
          listenerBus.addToSharedQueue(listener)
          logInfo(s"Registered listener ${listener.getClass().getName()}")
        }
      }
    } catch {
      case e: Exception =>
        try {
          stop()
        } finally {
          throw new SparkException(s"Exception when registering SparkListener", e)
        }
    }

    listenerBus.start(this, _env.metricsSystem)
    _listenerBusStarted = true
  }

4.12 Spark 环境更新

在SparkContext的初始化过程中会读取用户指定的Jar文件或者其他文件。

 _jars = Utils.getUserJars(_conf) 
_files =     _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten

4.13 SparkContext 初始化的收尾

postApplicationStart() // 向事件总线投递`SparkListnerApplicationStart`事件

// Post init
_taskScheduler.postStartHook()  // 等待ScudulerBackend准备完成
// 向度量系统注册Source
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_env.metricsSystem.registerSource(new JVMCPUSource())
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
appStatusSource.foreach(_env.metricsSystem.registerSource(_))

...

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
SparkContext.setActiveContext(this)

4.14 SparkContext提供的常用方法

4.14.1 broadcast

broadcast方法用于广播给定的对象。

  /**
   * Broadcast a read-only variable to the cluster, returning a
   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
   * The variable will be sent to each cluster only once.
   *
   * @param value value to broadcast to the Spark nodes
   * @return `Broadcast` object, a read-only variable cached on each machine
   */
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

4.14.2 addSparkListener

此方法用于向LiveListenerBus中添加实现了特制SparkListnerInterface的监听器。

  /**
   * Add a file to be downloaded with this Spark job on every node.
   *
   * If a file is added during execution, it will not be available until the next TaskSet starts.
   *
   * @param path can be either a local file, a file in HDFS (or other Hadoop-supported
   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
   * use `SparkFiles.get(fileName)` to find its download location.
   *
   * @note A path can be added only once. Subsequent additions of the same path are ignored.
   */
  def addFile(path: String): Unit = {
    addFile(path, false)
  }

4.14.3 多种多样的runJob

Spark提供了多个重载的runJob的方法,但是这些方法都最终会调用下面的runJob方法

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

4.14.4 SetCheckpointDir

用于给罪业中的RDD指定检查点保存的目录。指定检查点目录式启动检查点机制的前提。

4.15 SparkContext 的伴生对象

4.15.1 setActiveContext

用于将SparkContext作为激活的SparkContext,保存到activeContext中。

4.15.2 getOrCreate(config: SparkConf)

如果没有激活的SparkContext,则构造SparkContext,并调用setActiveContext方法保存到activeContext中,最后返回激活的SparkContext

还提供了无参的getOrCreate()方法,其实现与getOrCreate(config: SparkConf)类似。

Spark系列: DataFrame介绍

简介

DataFrame是一种不可变的分布式数据集,它被组织成指定的列,类似于关系型数据库中的表。

可以很方便的使用Spark SQL来查询结构化的数据或者使用Spark表达式方法。

DataFrame与RDD的区别

DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。

image

从上面的图中可以看出DataFrame和RDD的区别。RDD是分布式的 Java对象的集合,比如,RDD[Person]是以Person为类型参数,但是,Person类的内部结构对于RDD而言却是不可知的。DataFrame是一种以RDD为基础的分布式数据集,也就是分布式的Row对象的集合(每个Row对象代表一行记录),提供了详细的结构信息,也就是我们经常说的模式(schema),Spark SQL可以清楚地知道该数据集中包含哪些列、每列的名称和类型。
和RDD一样,DataFrame的各种变换操作也采用惰性机制,只是记录了各种转换的逻辑转换路线图(是一个DAG图),不会发生真正的计算,这个DAG图相当于一个逻辑查询计划,最终,会被翻译成物理查询计划,生成RDD DAG,按照之前介绍的RDD DAG的执行方式去完成最终的计算得到结果。

DataFrame的创建

从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。

SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。

DataFrame的操作

从RDD转换成DataFrame

Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是,利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。

利用反射机制推断RDD模式

在利用反射机制推断RDD模式时,我们会用到toDF()方法

查看示例

使用编程方式定义RDD模式

使用sparkSession的createDataFrame(rdd, schema)编程方式定义RDD模式。

查看示例

把DataFrame保存成文件

第1种保存方法

>>> df.select("name", "age").write.format("csv").save("file:///home/shilinlee/newperson.csv")

另外,write.format()支持输出 json,parquet, jdbc, orc, libsvm, csv, text等格式文件,如果要输出文本文件,可以采用write.format(“text”),但是,需要注意,只有select()中只存在一个列时,才允许保存成文本文件,如果存在两个列,比如select(“name”, “age”),就不能保存成文本文件。

第2种保存方法

>>> df.rdd.saveAsTextFile("file:///home/shilinlee/newpeople.txt")

可以看出,我们是把DataFrame转换成RDD,然后调用saveAsTextFile()保存成文本文件。

如果我们要再次把newpeople.txt中的数据加载到RDD中,可以直接使用newpeople.txt目录名称,而不需要使用part-00000文件,如下:

>>> textFile = sc.textFile("file:///home/shilinlee/newpeople.txt")

读写Parquet

读Parquet

InfoQ: 深入分析 Parquet 列式存储格式

Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合的组件有:

  • 查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
  • 计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
  • 数据模型: Avro, Thrift, Protocol Buffers, POJOs
    Spark已经为我们提供了parquet样例数据,就保存在“/usr/local/spark/examples/src/main/resources/”这个目录下,有个users.parquet文件,这个文件格式比较特殊,如果你用vim编辑器打开,或者用cat命令查看文件内容,肉眼是一堆乱七八糟的东西,是无法理解的。只有被加载到程序中以后,Spark会对这种格式进行解析,然后我们才能理解其中的数据。
    下面代码演示了如何从parquet文件中加载数据生成DataFrame。

代码演示

写Parquet

下面介绍如何将DataFrame保存成parquet文件。

>>> peopleDF = spark.read.json("file:///home/shilinlee/people.json")
>>> peopleDF.write.parquet("file:///home/shilinlee/newpeople.parquet")

现在问题来了,如果我们要再次把这个刚生成的数据又加载到DataFrame中,应该加载哪个文件呢?很简单,只要加载newpeople.parquet目录即可,而不是加载这2个文件,语句如下:

>>> val users = spark.read.parquet("file:///home/shilinlee/newpeople.parquet")

略:JDBC(MySQL)、Hive。。。

大数据处理框架Hadoop的学习

Hadoop简介

  • Hadoop是基于Java语言开发的,具有很好的跨平台特性,并且可以部署在廉价的计算机集群中
  • Hadoop的核心是分布式文件系统HDFS(Hadoop Distributed File System)和MapReduce。
  • 经过多年发展,Hadoop项目已经变得非常成熟和完善,包括Common、Avro、Zookeeper、HDFS、MapReduce、HBase、Hive、Chukwa、Pig等子项目,其中,HDFS和MapReduce是Hadoop的两大核心组件。

Hadoop现状

  • 应用架构

image

  • 项目结构

    Hadoop的项目结构不断丰富发展,已经形成一个丰富的Hadoop生态系统

image

组件 功能
HDFS 分布式文件系统
MapReduce 分布式并行编程模型
YARN 资源管理和调度器
Tez 运行在YARN之上的下一代Hadoop查询处理框架
Hive Hadoop上的数据仓库
HBase Hadoop上的非关系型的分布式数据库
Pig 一个基于Hadoop的大规模数据分析平台,提供类似SQL的查询语言Pig Latin
Sqoop 用于在Hadoop与传统数据库之间进行数据传递
Oozie Hadoop上的工作流管理系统
Zookeeper 提供分布式协调一致性服务
Storm 流计算框架
Flume 一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统
Ambari Hadoop快速部署工具,支持Apache Hadoop集群的供应、管理和监控
Kafka 一种高吞吐量的分布式发布订阅消息系统,可以处理消费者规模的网站中的所有动作流数据
Spark 类似于Hadoop MapReduce的通用并行框架

Hadoop集群的部署与使用

集群节点类型

  • Hadoop框架中最核心的设计是为海量数据提供存储的HDFS和对数据进行计算的MapReduce

  • MapReduce的作业主要包括:

    • 从磁盘或从网络读取数据,即IO密集工作;
    • 计算数据,即CPU密集工作
  • 一个基本的Hadoop集群中的节点主要有

    • NameNode:负责协调集群中的数据存储
    • DataNode:存储被拆分的数据块
    • JobTracker:协调数据计算任务
    • TaskTracker:负责执行由JobTracker指派的任务
    • SecondaryNameNode:帮助NameNode收集文件系统运行的状态信息

集群的建立与安装

  • 为了缓解安装和维护每个节点上相同的软件的负担,可以使用一个自动化方法实现完全自动化安装,比如Red Hat Linux’ Kickstart、Debian或者Docker

  • 自动化安装部署工具,会通过记录在安装过程中对于各个选项的回答来完成自动化安装过程。

benchmark

  • Hadoop自带有一些基准测试程序,被打包在测试程序JAR文件中

  • 用TestDFSIO基准测试,来测试HDFS的IO性能

  • 用排序测试MapReduce:Hadoop自带一个部分排序的程序,这个测试过程的整个数据集都会通过洗牌(Shuffle)传输至Reducer,可以充分测试MapReduce的性能

Hadoop的发展

改进核心组件

  • 自身核心两大组件MapReduceHDFS的架构设计改进
组件 Hadoop1.0****的问题 Hadoop2.0****的改进
HDFS 单一名称节点,存在单点失效问题 设计了HDFS HA,提供名称节点热备机制
HDFS 单一命名空间,无法实现资源隔离 设计了HDFS Federation,管理多个命名空间
MapReduce 资源管理效率低 设计了新的资源管理框架YARN

其他组件不断丰富

  • Pig
  • Tez
  • Spark
  • Kafka
组件 功能 解决Hadoop中存在的问题
Pig 处理大规模数据的脚本语言,用户只需要编写几条简单的语句,系统会自动转换为MapReduce作业 抽象层次低,需要手工编写大量代码
Spark 基于内存的分布式并行编程框架,具有较高的实时性,并且较好支持迭代计算 延迟高,而且不适合执行迭代计算
Oozie 工作流和协作服务引擎,协调Hadoop上运行的不同任务 没有提供作业(Job)之间依赖关系管理机制,需要用户自己处理作业之间依赖关系
Tez 支持DAG作业的计算框架,对作业的操作进行重新分解和组合,形成一个大的DAG作业,减少不必要操作 不同的MapReduce任务之间存在重复操作,降低了效率
Kafka 分布式发布订阅消息系统,一般作为企业大数据分析平台的数据交换枢纽,不同类型的分布式系统可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实时高效交换 Hadoop生态系统中各个组件和其他产品之间缺乏统一的、高效的数据交换中介

HDFS HA(high availiability)

HDFS1.0

  • NameNode

    • 在磁盘上:FsImage和EditLog

    • 在内存中:映射信息,即文件包含哪些块,每个块存储在哪个数据节点

  • SecondryNameNode

    • SecondaryNameNode会定期和NameNode通信

    • 从NameNode上获取到FsImage和EditLog文件,并下载到本地的相应目录下

    • 执行EditLog和FsImage文件合并

    • 将新的FsImage文件发送到NameNode节点上

    • NameNode使用新的FsImage和EditLog(缩小了)

HDFS 2.0

  • HDFS HA(High Availability)是为了解决单点故障问题

  • HA集群设置两个名称节点,“活跃(Active)”和“待命(Standby)”

  • 两种名称节点的状态同步,可以借助于一个共享存储系统来实现

  • 一旦活跃名称节点出现故障,就可以立即切换到待命名称节点

  • Zookeeper确保一个名称节点在对外服务

  • 名称节点维护映射信息,数据节点同时向两个名称节点汇报信息

image

共享存储系统: EditLog实时同步

HDFS Federation

  • HDFS集群扩展性
  • 性能更高效
  • 良好的隔离性

HDFS Federation并不能解决单点故障问题

YARN

MapReduce1.0

image

YARN设计思路

image

  • 到了Hadoop2.0以后,MapReduce1.0中的资源管理调度功能,被单独分离出来形成了YARN,它是一个纯粹的资源管理调度框架,而不是一个计算框架

  • 被剥离了资源管理调度功能的MapReduce 框架就变成了MapReduce2.0,它是运行在YARN之上的一个纯粹的计算框架,不再自己负责资源调度管理服务,而是由YARN为其提供资源管理调度服务

YARN体系结构

ResourceManager

  • ResourceManager(RM)是一个全局的资源管理器,负责整个系统的资源管理和分配,主要包括两个组件,即调度器(Scheduler)和应用程序管理器(Applications Manager)

  • 调度器接收来自ApplicationMaster的应用程序资源请求,把集群中的资源以“容器”的形式分配给提出申请的应用程序,容器的选择通常会考虑应用程序所要处理的数据的位置,进行就近选择,从而实现“计算向数据靠拢”

  • 容器(Container)作为动态资源分配单位,每个容器中都封装了一定数量的CPU、内存、磁盘等资源,从而限定每个应用程序可以使用的资源量

  • 调度器被设计成是一个可插拔的组件,YARN不仅自身提供了许多种直接可用的调度器,也允许用户根据自己的需求重新设计调度器

  • 应用程序管理器(Applications Manager)负责系统中所有应用程序的管理工作,主要包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动等

RM管理的对象是Applications Master

Applications Master

ResourceManager接收用户提交的作业,按照作业的上下文信息以及从NodeManager收集来的容器状态信息,启动调度过程,为用户作业启动一个ApplicationMaster。

ApplicationMaster的主要功能是:

  • 当用户作业提交时,ApplicationMaster与ResourceManager协商获取资源,ResourceManager会以容器的形式为ApplicationMaster分配资源;
  • 把获得的资源进一步分配给内部的各个任务(Map任务或Reduce任务),实现资源的“二次分配”;
  • 与NodeManager保持交互通信进行应用程序的启动、运行、监控和停止,监控申请到的资源的使用情况,对所有任务的执行进度和状态进行监控,并在任务发生失败时执行失败恢复(即重新申请资源重启任务);
  • 定时向ResourceManager发送“心跳”消息,报告资源的使用情况和应用的进度信息;
  • 当作业完成时,ApplicationMaster向ResourceManager注销容器,执行周期完成。

NodeManager

NodeManager是驻留在一个YARN集群中的每个节点上的代理,主要负责:

  • 容器生命周期管理
  • 监控每个容器的资源(CPU、内存等)使用情况
  • 跟踪节点健康状况
  • 以“心跳”的方式与ResourceManager保持通信
  • 向ResourceManager汇报作业的资源使用情况和每个容器的运行状态
  • 接收来自ApplicationMaster的启动/停止容器的各种请求

YARN和Hadoop平台其他组件的统一部署

image

YARN的目标

  • YARN的目标就是实现“一个集群多个框架”。

  • 由YARN为这些计算框架提供统一的资源调度管理服务,并且能够根据各种计算框架的负载需求,调整各自占用的资源,实现集群资源共享和资源弹性收缩

  • 可以实现一个集群上的不同应用负载混搭,有效提高了集群的利用率

  • 不同计算框架可以共享底层存储,避免了数据集跨集群移动

Hadoop生态系统

Pig

Pig用途

  • 提供了类似SQL的Pig Latin语言(包含Filter、GroupBy、Join、OrderBy等操作,同时也支持用户自定义函数)
  • 允许用户通过编写简单的脚本来实现复杂的数据分析,而不需要编写复杂的MapReduce应用程序
  • Pig会自动把用户编写的脚本转换成MapReduce作业在Hadoop集群上运行,而且具备对生成的MapReduce程序进行自动优化的功能
  • 用户在编写Pig程序的时候,不需要关心程序的运行效率,这就大大减少了用户编程时间

Pig书写格式

Pig语句通常按照如下的格式来编写:

  • 通过LOAD语句从文件系统读取数据

  • 通过一系列“转换”语句对数据进行处理

  • 通过一条STORE语句把处理结果输出到文件系统中,或者使用DUMP语句把处理结果输出到屏幕上

下面是一个采用Pig Latin语言编写的应用程序实例,实现对用户访问网页情况的统计分析:

visits             = load ‘/data/visits’ as (user, url, time);

gVisits          = group visits by url;
visitCounts  = foreach gVisits generate url, count(visits);
//得到的表的结构visitCounts(url,visits)
urlInfo          = load ‘/data/urlInfo’ as (url, category, pRank);
visitCounts  = join visitCounts by url, urlInfo by url;
//得到的连接结果表的结构visitCounts(url,visits,category,pRank)
gCategories = group visitCounts by category;
topUrls = foreach gCategories generate top(visitCounts,10);

store topUrls into ‘/data/topUrls’;

Pig应用场景

快速分析,不用直接写MapReduce任务。提高了效率,简化了流程。

Tez

  • 支持DAG作用的计算框架,核心是将Map和Reduce两个操作进一步划分,行程一个大的DAG作业。使用在数据仓库Hive中的话,性能提升100倍。
  • Tez可以优化MapReduce、Pig和Hive的性能。
  • Tez仅能优化Map和Reduce作业。

Spark

  • 内存计算,效率更高
  • 基于DAG的任务调度执行机制,优于MapReduce的迭代执行机制。

Kafka

  • Kafka是一种高吞吐量的分布式发布订阅消息系统,用户通过Kafka系统可以发布大量的消息,同时也能实时订阅消费消息。
  • Kafka可以同时满足在线实时处理和批量离线处理。

Automatically clean up expired schema data

元数据老化设计方案

问题

问题1:

由于一些业务,Tag 和 Field 会持续增加,导致元数据膨胀,而很多Tag和Field可能随着业务调整,不会再有新数据写入。导致这些元数据一直存在meta中,浪费空间,可能会影响性能。故需要设计一个安全删除过期元数据的方案。

问题2:

Tag 和 Field 不支持同名字段,需要兼容吗? Influxdb支持。

问题3:

某个表数据已经全部过期,是否需要删除整个measurement元数据。--- RP过期是否会删除?

本方案目标:

  • 清理过期元数据,主要是Tag和Field
  • Tag 和 Field 支持同名字段吗 // TODO:Thinking
  • 如果measurement没有任何shardkey,IndexRelation,则可以删除。

Tag Field支持同名

将Tag和Field拆成2个map

type MeasurementInfo struct {
	Name          string
  ......
  
  // 删除
  Schema map[string]int32
  
  // 新增
  Tags        map[string]int32
  Fields      map[string]int32
}

IndexGroup 引用 schema id

基于引用计数的清理。

每一个表的tag, field会被分配一个全局唯一的ID,按顺序递增1,2,3......,schema id就是上面说的ID。

  • tag, field中保存有IndexGroup的ref数量
  • IndexGroup中记录所有的scheme id

当其中一个IndexGroup过期后,对应的所有tag/field的引用计数 ref - 1,当其中的tag/field引用计数ref减为0了,则可以清理了。

// TODO:写互斥

meta元数据存在如下更新

IndexGroupInfo

type IndexGroupInfo struct {
	ID        uint64
	StartTime time.Time
	EndTime   time.Time
	Indexes   []IndexInfo
	DeletedAt time.Time
  
	// 新增
	SchemaBitmap uint64
}

MeasurementInfo

type KeyInfo struct {
	ID    uint64 // unique key id
  Ref   int    // IndexGroupInfo ref count
	Type  int32  // data type
}


type MeasurementInfo struct {
	Name          string
  ......
  
  // 删除
  Schema map[string]int32
  
  // 新增
	Tags        map[string]KeyInfo
  Fields      map[string]KeyInfo
}


"MeasurementInfo": {
   "name": "mst_0001",
   "Schema": 
           {
            "tag1": {
              "id": 1,
              "ref": 1,
              "typ": String
            },
            "tag2": {
              "id": 2,
              "ref": 2,
              "typ": String
            },
            "field1": {
              "id": 3,
              "ref": 1,
              "typ": Integer
            },
            "field2": {
              "id": 4,
              "ref": 2,
              "typ": Float
            }
				}
}

RetentionPolicyInfo

type RetentionPolicyInfo struct {
  IndexGroups          []IndexGroupInfo            // each IndexGroupInfo contains Indexes that expire at the same time for all nodes
  Measurements         map[string]*MeasurementInfo // {"cpu_0001": *MeasurementInfo}
  
  // 新增
  KeyID   uint64 // 此RP,保证每一个tag/field能分配一个递增的ID
}

写流程

image

  • 写IndexGroup1(IG1), 写入新tag/field

    • SQL创建IndexGroup,记录Tag/Field的ID。

    • SQL创建Tag/Field,ref加1

  • 已存在IndexGroup1(IG1),写入新tag/field

    • 获取IndexGroup1 ID,增加记录Tag/Field的ID
    • SQL创建Tag/Field,ref加1
  • 已存在IndexGroup1(IG1),写入旧 tag/field

    • 获取IndexGroup ID,此IndexGroup未引用过此tag/field的ID,需要频繁和meta交互。
    • 无需更改meta元数据
  • 写新IndexGroup2,写入新/旧 tag/field

    • SQL创建IndexGroup,记录Tag/Field的ID。
    • SQL【创建】Tag/Field,ref加1
  • 已存在IndexGroup2(IG2),旧 tag/field

schema id

写入时,由meta统一递增分配

tag/field过期老化

随着IndexGroup过期,他所记录的所有tag/field相关的ref都减1,当有ref==0时,可删除。

删除和写互斥判断:TODO

schama查询

  • show tag keys [ condition ]
  • show field keys [ condition ]

修改点

![image-20230406204905577](/Users/xiangyu/Library/Application Support/typora-user-images/image-20230406204905577.png)

Q&As

Spark内核设计的艺术: 第6章 存储体系

6.1 存储体系概述

简单来讲,Spark存储体系是各个Driver与Executor实例中的BlockManager所组成的;但是从一个整体来看,把各个节点的BlockManager看成存储体系的一部分,那存储体系就有了更多衍生的内容,比如块传输服务、map任务输出跟踪器、Shuffle管理器等。

image

  • BlockManagerMaster:代理BlockManager与Driver上的BlockManagerMasterEndpoint通信。记号①表示Executor节点上的BlockManager通过BockManagerMasterBlockManagerMasterEndpoint进行通信,记号②表示Driver节点上的BlockManager通过BlockManagerMasterBlockManagerMasterEndpoint进行通信。这些通信的内容有很多,例如,注册BlockManager、更新Block信息、获取Block的位置(即Block所在的BlockManager)、删除Executor等。BlockManagerMaster之所以能够和BlockManagerMasterEndpoint通信,是因为它持有了BlockManagerMasterEndpointRpcEndpointRef
  • BlockManagerMasterEndpoint:由Driver上的SparkEnv负责创建和注册到Driver的RpcEnv中。BlockManagerMasterEndpoint只存在于Driver的SparkEnv中,Driver或Executor上BlockManagerMaster的driverEndpoint属性将持有BlockManagerMasterEndpointRpcEndpointRefBlockManagerMasterEndpoint主要对各个节点上的BlockManagerBlockManager与Executor的映射关系及Block位置信息(即Block所在的BlockManager)等进行管理。
  • BlockManagerSlaveEndpoint:每个Executor或Driver的SparkEnv中都有属于自己的BlockManagerSlaveEndpoint,分别由各自的SparkEnv负责创建和注册到各自的RpcEnv中。Driver或Executor都存在各自的BlockManagerSlaveEndpoint,并由各自BlockManagerslaveEndpoint属性持有各自BlockManagerSlaveEndpoint下发的命令。记号③表示BlockManagerMasterEndpoint向Driver节点上的BlockManagerSlaveEndpoint下发命令,记号④表示BlockManagerMasterEndpoint向Executor节点上的BlockManagerSlaveEndpoint下发命令。例如,删除Block、获取Block状态、获取匹配的BlockId等。
  • SerializerManager:序列化管理器
  • MemoryManager:内存管理器。负责对单个节点上内存的分配与回收
  • MapOutPutTracker:map任务输出跟踪器。
  • ShuffleManager:Shuffle管理器
  • BlockTransferService:块传输服务。此组件也与Shuffle相关联,主要用于不同阶段的任务之间的Block数据的传输与读写。
  • shuffleClinet:Shuffle的客户端。与BlockTransferService配合使用。记号⑤表示Executor上的shuffleClient通过Driver上的BlockTransferService提供的服务上传和下载Block,记号⑥表示Driver上的shuffleClient通过Executor上的BlockTransferService提供的服务上传和下载Block。此外,不同Executor节点上的BlockTransferServiceshuffleClient之间也可以互相上传、下载Block。
  • SecurityManager:安全管理器
  • DiskBlockManager:磁盘块管理器。对磁盘上的文件及目录的读写操作进行管理
  • BlockInfoManager:块信息管理器。负责对Block的元数据及锁资源进行管理
  • MemoryStore:内存存储。依赖于MemoryManager,负责对Block的内存存在
  • DiskStore:磁盘存储。依赖于DiskBlockManager,负责对Block的磁盘存储

6.2 Block信息管理器

BlockInfoManager将主要对Block的锁资源进行管理。

6.2.1 Block锁的基本概念

BlockInfoManagerBlockManager内部子组件之一。它对Block的锁管理,读锁是共享锁,写锁是排它锁。

image

6.2.2 Block锁的实现

BlockInfoManager提供的方法实现Block的锁管理机制。

  • registerTask: 注册TaskAttenptId
  • currentTaskAttemptId: 获取上下文TaskContext中当前值横在执行的任务尝试的TaskAttenptId
  • lockForReading: 锁定读。
  • lockForWriting: 锁定写。
  • get: 获取BlockId对应的BlockInfo
  • unlock: 释放BlockId对应的Block上的锁。
  • downgradeLock: 锁降级。
  • lockNewBlockForWriting: 写新Block时获得写锁。
  • releaseAllLocksForTask: 释放给定的任务尝试线程所占用的所有Block的锁,并通知所有等待获取锁的线程。
  • size: 返回Infos的大小,即所有Block的数量。
  • entries: 以迭代器形式返回Infos
  • removeBlock: 移除BlockId对用的·。
  • clear: 清除BlockInfoManager中的所有信息,并通知所有在BlockInfoManager管理的Block的锁上等待的线程。

6.3 磁盘Block管理器

DiskBlockManager是存储体系的成员之一。它来创建和控制逻辑上的映射关系(逻辑上的 block 和物理盘上的 locations)。 一个 block 被映射为一个文件 File,这个文件的文件名由指定的 BlockId.name 指定。

6.3.1 本地目录结构

localDirsDiskBlockManager管理的本地目录数组,是通过调用createLocalDirs方法创建的本地目录数组。

image

6.3.2 DiskBlockManager 提供的方法

  • getFile(filename: String)
  • getFile(blockId: BlockId)
  • containsBlock(blockId: BlockId): 检查本地localDirs目录中是否包含BlockId对应的文件。
  • getAllFiles: 获取本地localDirs目录中所有文件。
  • getAllBlocks: 获取本地localDirs目录中所有Block的BlockId
  • createTempLocalBlock: 为中间结果创建唯一的BlockId和文件,此文件将用于保存本地Block的数据。
  • createTempShuffleBlock: 创建唯一的BlockId和文件,用来存储Shuffle中间结果(即map任务的输出)。
  • stop: 正常停止DiskBlockManager

6.4 磁盘存储 DiskStore

DiskStore 将负责将Block存储到磁盘。

属性:

  • conf
  • diskManager: DiskBlockManager
  • minMemoryMapBytes: 读取磁盘中的Block时,是直接读取还是使用FileChannel的内存镜像映射方法读取的阈值。

方法:

  • getSize(blockId: BlockId)
  • contaions(blockId: BlockId): 判断本地磁盘存储路径下是否包含给定的BlockId所对应的Block文件。
  • remove(blockId: BlockId)
  • putBytes(blockId: BlockId, bytes: ChunkedBytesBuffer): 将BlockId所对应的Block写入磁盘,Block的内容已经封装为ChunkedBytesBuffer
  • getBytes(blockId: BlockId): 读取对应的Block,并封装为ChunkedBytesBuffer返回。

6.5 内存管理

Spark与Hadoop的重要区别之一就是对于内存的使用。

6.5.1 内存池模型

Spark将内存从逻辑上区分为堆内存堆外内存,称为内存模式(MemoryModel)。

@Private
public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}

Spark一共有两种MemoryPool的实现,分别是StorageMemoryPool(存储体系用的内存池)和ExecutionMemoryPool(计算引擎用到的内存池)

6.5.2 StorageMemoryPool详解

StorageMemoryPool是对用于存储的物理内存的逻辑抽象,通过对存储内存的逻辑管理,提高Spark存储体系对内存的使用效率。

  • acquireMemory(blockId: BlockId, numBytes: Long): Boolean 用于给定BlockId的Block获取numBytes指定大小的内存。
  • releaseMemory(size: Long): Unit
  • freeSpaceToShrinkPool(spaceToFree: Long): Long 用于释放指定大小的空间,缩小内存池的大小。

6.5.3 MemoryManager 模型

MemoryManager定义了内存管理的接口规范。

MemoryManager有两个子类,分别是StaticMemoryManagerUnifiedMemoryManager

StaticMemoryManager是早期遗留的机制,不存在堆外内存模型,并且存储内存和执行内存的大小均为固定的。UnifiedMemoryManager从Spark 1.6.0版本开始,作为默认的内存管理器。

6.5.4 UnifiedMemoryManager

UnifiedMemoryManager将计算内存和存储内存之间的边界修改为“软”边界,即任何一方可以向另一方借用空闲的内存。

  • maxOnHeapStorageMemory: Long
  • maxOffHeapStorageMemory: Long
  • acquireExecutionMemory : 为存储BlockId对应的Block,从堆内存或堆外内存获取所需大小的内存。
  • acquireUnrollMemory: 为展开BlockId的Block,从堆内存或者堆外内存获取所需大小的内存。

6.6 内存存储 MemoryStore

MemoryStore负责将Block存储到内存。Spark通过将广播数据、RDD、Shuffle数据存储到内存,减少了对磁盘I/O的依赖,提高了程序的读写效率。

6.6.1 MemoryStore 的内存模型

Block在内存中以什么形式存在呢?是将文件直接缓存到内存?Spark将内存中的Block抽象为特质MemoryEntry。

private sealed trait MemoryEntry[T] {
  def size: Long
  def memoryMode: MemoryMode
  def classTag: ClassTag[T]
}

MemoryStore 内存模型:

image

6.6.1 MemoryStore 提供的方法

便于对Block数据的存储和读取。

  • getSize(blockId: BlockId): Long : 用于获取BlockId对应MemoryEntry(即Block的内存形式)所占用的大小
  • putBytes[T: ClassTag]: 将BlockId对应的Block(已经封装为ChunkedByteBuffer)写入内存
  • reserveUnrollMemoryForThisTask: 用于为展开尝试执行任务给定的Block保留指定内存模式上指定大小的内存。
  • releaseUnrollMemoryForThisTask: 用于释放任务尝试线程占用的内存
  • putIteratorAsValues: 此方法将BlockId对应的Block(已经转换为Iterator)写入内存。有时候放入内存的Block很大,所以一次性将此对象写入内存可能将引发OOM异常。为了避免这种情况的发生,首先需要将Block转换为Iterator,然后渐进式地展开此Iterator,并且周期性地检查是否有足够的展开内存。此方法涉及很多变量。
  • getBytes: 从内存中读取BlockId对应的Block(已经封装为ChunkedByteBuffer),getBytes只能获取序列化的Block。
  • getValues:从内存中读取BlockId对应的Block(已经封装为Iterator),getValues只能获取没有序列化的Block。
  • remove(blockId: BlockId): Boolean : 从内存中移除BlockId对应的Block
  • evictBlocksToFreeSpace: 用于驱逐Block,以便释放一些空间来存储新的Block。
  • contains: 用于判断本地MemoryStore中是否包含给定的BlockId所应对的Block文件。

6.7 块管理器 BlockManager

BlockManager运行在每个节点上(包括Driver和Executor),提供对本地或远端节点上的内存、磁盘及堆外内存中Block的管理。存储体系从狭义上来说指的就是BlockManager,从广义上来说,则包括整个Spark集群中的各个 BlockManagerBlockInfoManagerDiskBlockManagerDiskStoreMemoryManagerMemoryStore、对集群中的所有BlockManager进行管理的BlockManagerMaster及各个节点上对外提供Block上传与下载服务的BlockTransferService

6.7.1 BlockManager的初始化

每个Driver或Executor在创建自身的SparkEnv时都会创建BlockManager,BlockManager只有在其initialize方法被调用后才能发挥作用。

6.7.2 BlockManager提供的方法

  • reregister(): Unit: 用于向BlockManagerMaster重新注册BlockManager,并向BlockManagerMaster报告所有的Block信息。
  • getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer]:用于存储体系获取BlockId所对应Block的数据,并封装为ChunkedByteBuffer后返回。
  • getBlockData(blockId: BlockId): ManagedBuffer : 此方法用于获取本地Block的数据。
  • putBytesputBytes实际调用的是doPutBytes方法
  • putBlockData: 用于将Block数据写入本地
  • getStatus(blockId: BlockId): Option[BlockStatus]: 用于获取Block的状态
  • getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId]: 用于获取匹配过滤器条件的BlockId 的序列, 代码中除了从BlockInfoManager的entries缓存中获取BlockId外,还需要从DiskBlockManager中获取,这是因为DiskBlockManager中可能存在BlockInfoManager不知道的Block。
  • getLocalValues(blockId: BlockId): Option[BlockResult]:用于从本地的BlockManager中获取Block数据。
  • getLocations(blockId: BlockId): Seq[BlockManagerId]getRemoteBytes方法的作用为从远端的BlockManager以序列化的字节形式获取Block数据。
  • get[T: ClassTag](blockId: BlockId): Option[BlockResult] :用于优先从本地获取Block数据,当本地获取不到所需的Block数据,再从远端获取Block数据。
  • downgradeLock(blockId: BlockId): Unit:将当前线程持有的Block的写锁降级为读锁。
  • releaseLock(blockId: BlockId): Unit:用于当前线程对持有的Block的锁进行释放
  • registerTask(taskAttemptId: Long): Unit:用于将任务尝试线程注册到BlockInfoManager
  • releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId]: 用于任务尝试线程对持有的所有Block的锁进行释放。
  • getOrElseUpdate: 用于获取Block。如果Block存在,则获取此Block并返回BlockResult,否则调用makeIterator方法计算Block,并持久化后返回BlockResultIterator
  • putIterator:此方法用于将Block数据写入存储体系。
  • getDiskWriter:用于创建并获取DiskBlockObjectWriter,通过DiskBlockObjectWriter可以跳过对DiskStore的使用,直接将数据写入磁盘。
  • dropFromMemory:用于从内存中删除Block,当Block的存储级别允许写入磁盘,Block将被写入磁盘。此方法主要在内存不足,需要从内存腾出空闲空间时使用。
  • removeRdd(rddId: Int): Int :移除属于指定RDD的所有Block。
  • removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int:移除属于指定Broadcast的所有Block。

6.8 BlockManagerMaster对BlockManager的管理

BlockManagerMaster 的作用是对存在于Executor或Driver上的BlockManager进行统一管理。Executor与Driver关于BlockManager的交互都依赖于BlockManagerMaster,比如Executor需要向Driver发送注册BlockManager、更新Executor上Block的最新信息、询问所需要Block目前所在的位置及当Executor运行结束需要将此Executor移除等。但是Driver与Executor却位于不同机器中,该怎么实现呢?

在Spark执行环境一节中有介绍过,Driver上的BlockManagerMaster 会实例化并且注册BlockManagerMasterEndpoint。无论是Driver还是Executor,它们的BlockManagerMasterdriverEndpoint属性都将持有BlockManagerMasterEndpointRpcEndpiointRef。无论是Driver还是Executor,每个BlockManager都拥有自己的BlockManagerSlaveEndpoint,且BlockManagerslaveEndpoint属性保存着各自BlockManagerSlaveEndpointRpcEndpointRefBlockManagerMaster负责发送消息,BlockManagerMasterEndpoint负责消息的接收与处理,BlockManagerSlaveEndpoint则接收BlockManagerMasterEndpoint下发的命令。

6.8.1 BlockManagerMaster的职责

BlockManagerMaster负责发送各种与存储体系相关的信息,这些消息的类型如下:

  • RemoveExecutor(移除Executor)
  • RegisterBlockManager(注册BlockManager
  • UpdateBlockInfo(更新Block信息)
  • GetLocations(获取Block的位置)
  • GetLocationsMultipleBlockIds(获取多个Block的位置)
  • GetPeers(获取其它BlockManagerBlockManagerId
  • GetExecutorEndpointRef(获取Executor的EndpointRef引用)
  • RemoveBlock(移除Block)
  • RemoveRdd(移除Rdd Block)
  • RemoveShuffle(移除Shuffle Block)
  • RemoveBroadcast(移除Broadcast Block)
  • GetMemoryStatus(获取指定的BlockManager的内存状态)
  • GetStorageStatus(获取存储状态)
  • GetMatchingBlockIds(获取匹配过滤条件的Block)
  • HasCachedBlocks(指定的Executor上是否有缓存的Block)
  • StopBlockManagerMaster(停止BlockManagerMaster

6.8.2 BlockManagerMasterEndpoint详解

BlockManagerMasterEndpoint接收Driver或Executor上BlockManagerMaster发送的消息,对所有的BlockManager统一管理BlockManager的属性。

BlockManagerMasterEndpoint接收的消息类型正好与BlockManagerMaster所发送的消息一一对应。选取RegisterBlockManager消息来介绍BlockManagerMasterEndpoint是如何接收和处理RegisterBlockManager消息。

6.8.3 BlockManagerSlaveEndpoint详解

BlockManagerSlaveEndpoint用于接收BlockManagerMasterEndpoint的命令并执行相应的操作。BlockManagerSlaveEndpoint也重写了RpcEndpointreceiveAndReply方法。

6.9 Block传输服务

BlockTransferServiceBlockManager的子组件之一,抽象类BlockTransferService有个实现类:

  • 用于测试的MockBlockTransferService
  • NettyBlockTransferService

BlockManager实际采用了NettyBlockTransferService提供的Block传输服务。

为什么要把由Netty实现的网络服务组件也放到存储体系里,由于Spark是分布式部署的,每个Task(准确说是任务尝试)最终都运行在不同的机器节点上。map任务的输出结果直接存储到map任务所在机器的存储体系中,reduce任务极有可能不在同一机器上运行,所以需要远程下载map任务的中间输出。NettyBlockTransferService提供了可以被其它节点的客户端访问的Shuffle服务。

有了Shuffle的服务端,那么也需要相应的Shuffle客户端,以便当前节点将Block上传到其它节点或者从其它节点下载Block到本地。BlockManager中创建Shuffle客户端的代码如下:

//org.apache.spark.storage.BlockManager
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
    securityManager.isSaslEncryptionEnabled())
} else {
  blockTransferService
}

6.9.1 初始化NettyBlockTransferService

NettyBlockTransferService只有在其init方法被调用 ,即被初始化后才提供服务。根据块管理器BlockManager可知,BlockManager在初始化的时候,将调用NettyBlockTransferServiceinit方法。

6.9.2 NettyBlockRpcServer详解

下面重点来看看NettyBlockTransferServiceNettyRpcEnv的最大区别——使用的RpcHandler实现类不同,NettyRpcEnv采用了NettyRpcHandler,而NettyBlockTransferService则采用了NettyBlockRpcServer

  • OneForOneStreamManager的实现

    NettyBlockRpcServer中使用OneForOneStreamManager来提供一对一的流服务。OneForOneStreamManager实现StreamManagerregisterChannelgetChunkconnectionTerminatedcheckAuthorizationregisterStream五个方法。OneForOneStreamManager将处理ChunkFetchRequest类型的消息。

  • NettyBlockRpcServer的实现

6.9.3 Shuffle客户端

如果没有部署外部的Shuffle服务,即spark.shuffle.service.enabled属性为false时,NettyBlockTransferService不但通过OneForOneStreamManagerNettyBlockRpcServer对外提供Block上传与下载的服务,也将作为默认的Shuffle客户端。NettyBlockTransferService作为Shuffle客户端,具有发起上传和下载请求并接收服务端响应的能力。NettyBlockTransferService的两个方法——fetchBlocksuploadBlock将具有此功能。

  • 发送下载远端Block的请求
  • 同步下载远端Block
  • 发送想远端上传Block的请求
  • 同步向远端上传Block

6.10 DiskBlockObjectWriter 详解

BlockManagergetDiskWriter方法用于创建DiskBlockObjectWriterDiskBlockObjectWriter 将在Shuffle阶段将map任务的输出写入磁盘,这样reduce任务就能从磁盘中获取map任务的中间输出了。

DiskBlockObjectWriter 用于将JVM中的对象直接写入磁盘文件中。DiskBlockObjectWriter 允许将数据追加到现有Block。为了提高效率,DiskBlockObjectWriter 保留了跨多个提交的底层文件通道。

  • open: 用于打开要写入文件的各种输出流及管道。
  • recordWritten:用于对写入的记录数进行统计和度量
  • write: 用于向输出流中写入键值对。
  • commitAdnGet:用于将输出流中的数据写入到磁盘。

Spark内核设计的艺术: 第5章 Spark执行环境

5.1 SparkEnv 概述

SparkEnv的私有方法create用于创建SparkEnv。其内部组件如图:

image

名称 说明
SecurityManager 主要对账户、权限及身份认证进行设置与管理。
RpcEnv 各个组件之间通信的执行环境。
SerializerManager Spark 中很多对象在通用网络传输或者写入存储体系时,都需要序列化。
BroadcastManager 用于将配置信息和序列化后的RDD、Job以及ShuffleDependency等信息在本地存储。
MapOutputTracker 用于跟踪Map阶段任务的输出状态,此状态便于Reduce阶段任务获取地址及中间结果。
ShuffleManager 负责管理本地及远程的Block数据的shuffle操作。
MemoryManager 一个抽象的内存管理器,用于执行内存如何在执行和存储之间共享。
NettyBlockTransferService 使用Netty提供的异步事件驱动的网络应用框架,提供Web服务及客户端,获取远程节点上Block的集合。
BlockManagerMaster 负责对BlockManager的管理和协调。
BlockManager 负责对Block的管理,管理整个Spark运行时的数据读写的,当然也包含数据存储本身,在这个基础之上进行读写操作。
MetricsSystem 一般是为了衡量系统的各种指标的度量系统。
OutputCommitCoordinator 确定任务是否可以把输出提到到HFDS的管理者,使用先提交者胜的策略。

5.2 安全管理器 SecurityManager

SecurityManager主要对帐号、权限以及身份认证进行设置和管理。如果 Spark 的部署模式为 YARN,则需要生成 secret key (密钥)并存储 Hadoop UGI。而在其他模式下,则需要设置环境变量 _SPARK_AUTH_SECRET(优先级更高)或者 spark.authenticate.secret 属性指定 secret key (密钥)。最后SecurityManager 中设置了默认的口令认证实例 Authenticator,此实例采用匿名内部类实现,用于每次使用 HTTP client 从 HTTP 服务器获取用户的用户和密码。这是由于 Spark 的节点间通信往往需要动态协商用户名、密码,这种方式灵活地支持了这种需求。

val securityManager = new SecurityManager(conf, ioEncryptionKey, authSecretFileConf)

SecurityManager内部有很多属性。

  • authOn:是否开启认证。
  • aclsOn:是否对账号进行授权检查。
  • adminAcls:管理员账号集合。

5.3 RPC 环境

RpcEnv组件肩负着替代Spark 2.x.x以前版本中采用的Akka。SparkEnv创建RpcEnv代码如下:

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), 				conf, securityManager, numUsableCores, !isDriver)

在RpcEnv的create方法中也只有如下代码:

val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port,                            securityManager, numUsableCores, clientMode)
new NettyRpcEnvFactory().create(config)

5.3.1 RPC 端点 RpcEndpoint

RPC端点是对Spark的PRC通信实体的统一抽象, 所有运行与RPC框架至上的实体都应该继承RpcEndpointRpcEndpoint是替代AkkaActorRpcEndpoint是能够处理RPC请求,给一特定服务提供本地调用及跨节点调用的RPC组件的抽象。

5.3.1.1 RpcEndpoint 的定义

查看源码,自带注释。

RpcEndpoint的一些接口非常类似于Akka的Actor

5.3.1.2 特质 RpcEndpoint 的继承体系

RpcEndpoint 只是一个特质,除了对接口的定义,并没有任何实现的逻辑。下图展示了那些子类实现了RpcEndpoint `。

image

其中灰色的子类型DummyMaster(Mummy意为虚拟的、假的),它只是用来测试的。ThreadSafeRpcEndpoint适用于必须是线程安全的场景,被很多继承者实现,比如HeartbeatReceiverMaster,遇到了再做具体分析。

5.3.2 RPC 端点引用 RpcEndpointRef

RpcEndpointRefActorRef的替代品。要向远端的RpcEndpoint发起请求,必须持有这个RpcEndpointRpcEndpointRef

image

下面介绍什么事消息投递规则。

5.3.2.1 消息投递规则

  • at-most-once: 意味着每条应用了这种机制的消息会被投递0次或1次。可以说这条消息可能会丢失。
  • at-least-once: 潜在的存在多次投递尝试并保证至少成功一次。
  • exaxctly-once: 只会准确发送一次,这种消息不会丢失、也不会重复。

5.3.2.2 RpcEndpointRef 的定义

它定义了所有RpcEndpoint引用的属性与接口。代码请看源码

/**
* Sends a one-way asynchronous message. Fire-and-forget semantics.
* 属于 at-most-once 投递规则
*/
def send(message: Any): Unit

  /**
   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within a default timeout.
   * 这个就要等待服务端的返回了
   * This method only sends the message once and never retries.
   */
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

5.3.3 创建传输上下文 TransportConf

由于RPC环境RpcEnv的底层需要依赖于数据总线,因此需要创建传输上下文TransportConf

源代码

private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

5.3.4 消息调度器 Dispatcher (*)

Dispatcher 是有效提法哦NettyRpcEnv对消息异步处理并最大提升并行性处理能力的前提。Dispatcher负责将RPC消息路由到要该对此消息处理的RPCEndpoint。

5.3.5 传输上下文 TransportContext

NettyRpcEnv中,创建TransportContext的代码如下:

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

5.3.5.1 NettyStreamManager

NettyStreamManager专门用来提供文件服务的能力。定义了3个文件与目录缓存。

5.3.5.2 NettyRpcHandler

两个重载的receive的方法。

  • 对客户端进行响应的receive方法。

      override def receive(
          client: TransportClient,
          message: ByteBuffer,
          callback: RpcResponseCallback): Unit = {
        val messageToDispatch = internalReceive(client, message)
        dispatcher.postRemoteMessage(messageToDispatch, callback)
      }
  • 对客户端进行响应的receive重载方法。

      override def receive(
          client: TransportClient,
          message: ByteBuffer): Unit = {
        val messageToDispatch = internalReceive(client, message)
        dispatcher.postOneWayMessage(messageToDispatch)
      }

5.3.6 创建传输客户端工厂 TransportClientFactory

Spark与远端RpcEnv进行通信都依赖于TransportClientFactory 创建的TransportClient

5.3.7 创建TransportServer

  @volatile private var server: TransportServer = _

  private val stopped = new AtomicBoolean(false)

TransportServer并未在这实例化,而是在启动EpcEnv的偏函数StartNerryRpcEnv,它负责调用NettyRpcEnv的startServer方法。

5.3.8 客户端请求发送

5.3.9 NettyRpcEnv 中常用方法

  • 获取RpcEndPoint的引用对象RpcEndPointRef

      override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.getRpcEndpointRef(endpoint)
      }
  • 得到对应的RPCEndpointRef

  • 设置Endpoint

       override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.registerRpcEndpoint(name, endpoint)
      }

5.4 序列化管理器 SerializerManager

SparkEnv中有两个序列化的组件:

  • SerializerManager
  • closureSerializer
    val serializer = instantiateClassFromConf[Serializer](SERIALIZER)
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

5.5 广播管理器 BroadcastManager

BroadcastManager用于将配置信息和序列化后的RDD、Job及ShuffleDependency等信息在本地存储。如果为了容灾,也会复制到其他节点上。在sparkEnv.scala中:

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

5.6 map任务输出跟踪器

mapOutputTracker用于跟踪map任务的输出状态,此状态便于reduce任务定位map输出结果所在节点地址,进而获取中间输出结果。每个map任务或者reduce任务都会有其唯一的表示,分别为mapIdreduceId。每个reduce任务的输入可能是多个map任务的输出,reduce会到各个map任务所在节点上拉取Block,这一过程叫Shuffle。每次shuffle都有唯一标识shuffleId

5.6.1 mapOutputTracker 的实现

5.6.2 mapOutputTrackerMaster 的实现

mapOutputTrackerWorkermap任务的跟踪信息,通过mapOutputTrackerMasterEndpointRPCEndpointRef发送给mapOutputTrackerMaster ,由mapOutputTrackerMaster负责管理和维护所有的map任务的输出跟踪信息。

5.7 构架存储体系

  • Shuffle管理器 ShuffleManager
  • 内存管理器 MemoryManager
  • 块传输服务 BlockTransferService
  • BlockManagerMaster
  • 磁盘块管理器 DiskBlockManager
  • 块锁管理器 BlockInfoManager
  • 块管理器 BlockManager

第6章会讲每个组件的功能。

5.8 创建度量系统

SparkEnv中,度量系统也是必不可少的一个子组件。

    val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set(EXECUTOR_ID, executorId)
      val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
        securityManager)
      ms.start()
      ms
    }

metrics中,createMetricsSystem是MetricsSystem伴生对象提供的实现:

  def createMetricsSystem(
      instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
    new MetricsSystem(instance, conf, securityMgr)
  }
}

5.9 输出提交协调器

当Spark应用程序用了Spark SQL(包括Hive)或者需要将任务的输出保存到HDFS时,就会用到输出提交协调器 OutputCommitCoordinator,它将决定任务是否可以提交输出到HDFS。无论是Driver还是Executor,在SparkEnv中都包含子组件OutputCommitCoordinator

SparkEnv中,创建OutputCommitCoordinator的代码如下:

    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

5.9.1 OutputCommitCoordinatorEndpoint的实现

5.9.2 OutputCommitCoordinator的实现

5.9.2 OutputCommitCoordinator的工作原理

image

5.10 创建SparkEnv

当SparkEnv内部的所有组件都是丽华完毕,将正式创建SparkEnv。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.