Spark配置
SparkConf 是Spark的配置类,Spark中的每一个组件都直接或者间接的使用这个类存储的属性.
SparkConf中,使用ConcurrentHaskMap来存储这些属性,其中key以及value都是String
类型的.
/** 线程安全的,用于存储配置的各种属性 */
private val settings = new ConcurrentHashMap[String, String]()
从系统属性中加载
SparkConf的构造器中有一个布尔类型的loadDefaults,当loadDefaults为true时,将会从系统属性中加载Spark配置,而这些配置的key都是以spark.
开头的属性:
if (loadDefaults) {
// 加载系统中以spark.开头的系统属性
loadFromSystemProperties(false)
}
/**
* 加载系统中以spark.开始的系统属性
*
* @param silent 是否检查过时属性并打印警告️信息 true:不检查 false 检查
* @return
*/
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value, silent)
}
this
}
我们可以从loadFromSystemProperties方法中可以看到,使用Utils工具类获取到系统属性后,进行遍历,遍历时如果是以spark.
开头的,就调用SparkConf的set方法存储到setting属性中.set方法的源码如下:
private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
// 检查key和value,保证key和value都不为null
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
// 是否检查过时警告:false 检查
if (!silent) {
logDeprecationWarning(key)
}
// 如果key和value都不为null,将key和value存储到settings中
settings.put(key, value)
// 返回当前SparkConf的实例
this
}
使用SparkConf配置的API
从SparkConf的源码中可以看到,set方法被重载了多个,但是,最终都下面这一个set方法:
/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
set(key, value, false)
}
常用的通过SparkConf设置setMaster
, setAppName
等属性:
/**
* The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
*/
def setMaster(master: String): SparkConf = {
set("spark.master", master)
}
/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
set("spark.app.name", name)
}
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
set("spark.jars", jars.filter(_ != null).mkString(","))
}
克隆SparkConf配置
在有些情况下,同一个SparkConf实例中的配置信息需要被Spark中的多个组件共用,在SparkConf的源码中可以看到,其继承了Cloneable
特质并实现了clone
方法,功能与java的一样,就是可以通过克隆来创建.
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable{
// 省略无关代码
def this() = this(true)
}
配置读取
Spark提供了设置属性,当然也提供了诸多获取属性的方法,但是最终调用的也都是下面这一个:
/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
}
在SparkConf的伴生对象中,还将对应版本号过时的配置信息存储到deprecatedConfigs
中,对应版本可选参数存储到configsWithAlternatives
中,更多关于配置信息,请参见源码.
Spark内置的RPC框架
在Spark中很多地方都涉及到网络通信,比如Spark各个组件间的消息互通,用户文件与Jar包上传,节点间的Shuffle过程,Block数据的复制与备份等.
在Spark2.0.0版本中节点间Shuffle过程和Block数据的复制与备份依然使用Netty.通过对接口和程序接口的重新设计,将各个组件间的消息互通,用户文件与Jar包上传等内容统一纳入Spark的RPC框架体系中.
Spark内置RPC框架的基本架构
TransportContext
内部包含传输上下文的配置信息TransportConf
和对客户端请求消息进行处理的RpcHandler
。
TransportConf
在创建TransportClientFactory
和TransportServer
时都是必须的,而Rpc.Handler
只用于创建TransportServer
。
①. TransportClientFactory
是RPC客户端的工厂类。
②. TransportServer
是RPC服务端的实现.
图中记号的含义如下:
① 表示通过调用TransportContext
的createClientFactory
方法创建传输客户端工厂TransportClientFactory
的实例.
在构造TransportClientFactory
的时候,还会传递客户端引导程序TransportClientBootstrap
的列表。
此外,TransportClientFactory
内部还存在针对每一个Socket地址的连接池ClientPool
,这个连接池的定义如下:
private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
ClientPool
定义如下:
private static class ClientPool {
// ClientPool由TransportClient构成
TransportClient[] clients;
// 与每一个TransportClient 一一对应的锁对象,
// 通过对每个TransportClient分别采用不同的锁,降低并发情况下线程间对锁的争用,减少阻塞,提高并发度
Object[] locks;
ClientPool(int size) {
clients = new TransportClient[size];
locks = new Object[size];
for (int i = 0; i < size; i++) {
locks[i] = new Object();
}
}
}
② 标示通过调用TransportContext
的createServer
方法创建传输服务端TransportServer
的实例.在构造 TransportServer
的实例时,需要传递TransportContext
,host
,port
,RpcHandler
以及服务端引导程序TransportServerBootstrap
的列表.
Spark RPC框架所包含的各个组件
TransportContext
: 传输上下文,包含了用于创建传输服务端(Transportserver
)和传输客户端工厂(TransportClientFactory
)的上下文信息,并支持使用TransportChannelHandler
设置Netty
提供的SocketChannel
的Pipeline
的实现.
TransportConf
: 传输上下文的配置信息
RpcHandler
: 对调用传输客户端(TransportClient
)的sendRPC
方法发送的消息进行处理的程序.
MessageEncoder
: 在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误.
MessageDecoder
: 对从管道中读取的ByteBuf
进行解析,防止丢包和解析错误.
TransportFreameDecoder
: 对从管道中读取的ByteBuf
按照数据帧进行解析.
RpcResponseCallBack
: RpcHandler
对请求的消息处理完毕后进行回掉的接口.
TransportClientFactory
: 创建TransportClient
的传输客户端工厂类.
ClientPool
: 在两个对等节点间维护的关于TransportClient
的池子.ClientPool
时TransportClientFacctory
的内部组件.
TransportClient
: RPC框架的客户端,用于获取预先协商好的流中的连续块.TransportClient
旨在允许有效传输大量的数据,这些数据将被拆分成几百KB到MB的块.TransportClient
处理从流中获取的块时,实际的设置是在传输层之外完成的.sendRPC
方法能够在客户端和服务端的统一水平线进行这些设置.
TransportClintBootstrap
: 当服务端响应客户端连接时在客户端执行一次的引导程序.
TransportRequestHandler
: 用于处理客户端的请求并写完块数据后返回的处理程序.
TransportChannelHandler
: 代理由TransportRequestHandler
处理的请求和由TransportResponseHandler
处理的响应,并传入传输层的处理.
TransportServerBootstrap
: 当客户端连接到服务端时在服务端执行一次的引导程序.
TransportServer
: RPC框架的服务端,提供高效,低级别的流服务
TransportConf
TransportConf
: 传输上下文的配置信息
Spark通常使用SparkTransportConf
创建TransportConf
,可以通过SparkTransportConf
的fromSparkConf
方法获取TransportConf
实例,获取时需要SparkConf
实例,module
名称,用于处理网络传输的内核数numUsableCores
SparkTransportConf
源码:
/**
* Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
* Driver, or a standalone shuffle service) into a TransportConf with details on our environment
* like the number of cores that are allocated to this JVM.
*/
object SparkTransportConf {defaultNumThreads
/**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
* @param _conf the [[SparkConf]]
* @param module the module name
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
* This restriction will only occur if these properties are not already set.
* @param role optional role, could be driver, executor, worker and master. Default is
* [[None]], means no role specific configurations.
*/
def fromSparkConf(
_conf: SparkConf,
module: String,
numUsableCores: Int = 0,
role: Option[String] = None): TransportConf = {
val conf = _conf.clone
// specify default thread configuration based on our JVM's allocation of cores (rather than
// necessarily assuming we have all the machine's cores).
val numThreads = NettyUtils.(numUsableCores)
// override threads configurations with role specific values if specified
// config order is role > module > default
Seq("serverThreads", "clientThreads").foreach { suffix =>
val value = role.flatMap { r => conf.getOption(s"spark.$r.$module.io.$suffix") }
.getOrElse(
conf.get(s"spark.$module.io.$suffix", numThreads.toString))
conf.set(s"spark.$module.io.$suffix", value)
}
new TransportConf(module, new ConfigProvider {
override def get(name: String): String = conf.get(name)
override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
conf.getAll.toMap.asJava.entrySet()
}
})
}
}
fromSparkConf
最终构造TransportConf
对象时传递的ConfigProvider
为实现get方法的匿名内部类,get的实现实际是代理了SparkConf的get方法。
TransportClientFactory
TransportClientFactory
是创建TransportClient
的工厂类。
TransportContext
的createClientFactory
方法可以创建出TransportClientFactory
实例。
代码清单:
/**
* Initializes a ClientFactory which runs the given TransportClientBootstraps prior to returning
* a new Client. Bootstraps will be executed synchronously, and must run successfully in order
* to create a Client.
*/
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
return new TransportClientFactory(this, bootstraps);
}
public TransportClientFactory createClientFactory() {
return createClientFactory(new ArrayList<>());
}
TransportClientFactory
的函数实现比较重要,理解其中的参数的含义。
客户端引导程序TransportClientBootstrap
TransportClientFactory
的clientBootstraps
属性是TransportClientBootstrap
的列表。TransportClientBootstrap
是在TranportClient
上执行的客户端引导程序,主要对连接建立时进行一些初始化的准备(例如验证、加密)。TransportClientBootstrap
所做的操作往往是昂贵的,好在建立的连接可以重用。
public interface TransportClientBootstrap {
void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}
创建RPC客户端TransportClient
有了TransportClientFactory
,Spark的各个模块就可以使用它创建RPC客户端TransportClient
。每个TransportClient
实例只能和一个远端的RPC服务通信,所以Spark中的组件如果想要和多个RPC服务通信,就需要持有多个TransportClient
实例。创建TransportClient
的方法如下(实际为从缓存中获取TransportClient
):
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
//创建InetSocketAddress
final InetSocketAddress unresolvedAddress =
InetSocketAddress.createUnresolved(remoteHost, remotePort);
// Create the ClientPool if we don't have it yet.
ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) {
connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
clientPool = connectionPool.get(unresolvedAddress);
.......
}
1)调用InetSocketAddress
的静态方法createUnresolved
构建InetSocketAddress
(这种方式创建InetSocketAddress
,可以在缓存中已经有TransportClient时避免不必要的域名解析),然后从connectionPool
中获取与此地址对应的ClientPool
,如果没有则需要新建ClientPool
,并放入缓存connectionPool
中。
2)根据numConnectionsPerPeer的大小(使用“spark.+模块名+.io.numConnectionsPerPeer”属性配置),从ClientPool中随机选择一个TransportClient
3)如果ClientPool的clients数组中在随机产生的索引位置不存在TransportClient或者TransportClient没有激活,则进入第5步,否则对此TransportClient进行第4步的检查。
4)更新TransportClient的channel中配置的TransportChannelHandler的最后一次使用时间,确保channel没有超时,然后检查TransportClient是否是激活状态,最后返回此TransportClient给调用方。
5)由于缓存中没有TransportClient可用,于是调用InetSocketAddress的构造器创建InetSocketAddress对象(直接使用InetSocketAddress的构造器创建InetSocketAddress会进行域解析),在这一步骤多个线程可能会产生竞态条件(由于没有同步处理,所以多个线程极有可能同时执行到此处,都发现缓存中没有TransportClient可用,于是都使用InetSocketAddress的构造器创建InetSocketAddress)
6)第5步创建InetSocketAddress的过程中产生的竞态条件如果不妥善处理,会产生线程安全问题,所以到了ClientPool的locks数组发挥作用的时候了。按照随机产生的数组索引,locks数组中的锁对象可以对clients数组中的TransportClient一对一进行同步。即使之前产生了竞态条件,但是这一步只能有一个线程进入临界区。在临界区内,先进入的线程调用重载的createClient方法创建TransportClient对象并放入ClientPool的clients数组中。当率先进入临界区的线程退出临界区后,其它线程才能进入,此时发现ClientPool的clients数组中已经存在了TransportClient对象,那么将不再创建TransportClient,而是直接使用它。
上述代码整个执行过程实际解决了TransportClient
缓存的使用及createClient
方法的线程安全问题,并没有涉及创建TransportClient
的实现。TransportClient
的创建过程在重载的createClient
方法:
创建TransportClient步骤如下:
1)构建根引导程序Bootstrap并对其进行配置
2)为根引导程序设置管道初始化回调函数,此回调函数将调用TransportContext的initializePipeLine方法初始化Channel的pipeline。
3)使用根引导程序连接远程服务器,当连接成功对管道初始化时会回调初始化回调函数,将TransportClient和Channel对象分别设置到原子引用clientRef与channelRef中
4)给TransportClient设置客户端引导程序,即设置TransportClientFactory中的TransportClientBootstrap列表
5)返回此TransportClient对象
RPC服务端TransportServer
TransportServe
r是RPC框架的服务端,可提供高效、低级别的流服务,TransportContext
的createServer
方法用于创建TransportServer
,其实现如下:
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, null, port, rpcHandler, bootstraps);
}
管道初始化
在创建TransportClient
和对TranportServer
初始化的实现中,都在管道初始化回调函数中调用了TranportContext
的initializePipeline
方法,initializePipeline
方法将调用Netty的API对管道初始化。
TransportChannelHandler详解
服务端RpcHandler详解
由于TransportRequestHandler
实际是把请求消息交给RpcHandler进行处理,RpcHandler是一个抽象类,定义了一些RPC处理器的规范,代码下:
public abstract class RpcHandler {
private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
//抽象方法,用来接收单一的RPC消息,具体处理逻辑需要子类去实现
public abstract void receive(
TransportClient client,
ByteBuffer message,
RpcResponseCallback callback);
//获取Streammanager,StreamManager可以从流中获取单个的块,因此它也包含着当前正在被TransportClient获取的流的状态
public abstract StreamManager getStreamManager();
//重载receive方法,RpcResponseCallback为默认的ONE_WAY_CALLBACK
public void receive(TransportClient client, ByteBuffer message) {
receive(client, message, ONE_WAY_CALLBACK);
}
//当与给定客户端相关联的channel处于活动状态时调用
public void channelActive(TransportClient client) { }
//当与给定客户端相关联的channel处于非活动状态时调用
public void channelInactive(TransportClient client) { }
//当channel产生异常时调用
public void exceptionCaught(Throwable cause, TransportClient client) { }
}
服务端引导程序TransportServerBootstrap
TransportServer
的构造器中的bootstraps
是TranportServerBootstrap
的列表。接口TransportServerBootstrap
定义了服务端引导程序的规范,服务端引导程序旨在当客户端与服务端建立连接之后,在服务端持有的客户端管道上执行的引导程序。TransportServerBootstrap的定义如下:
public interface TransportServerBootstrap {
RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}
客户端TransportClient详解
学习完服务端RpcHandler对请求消息的处理后,接下来学习客户端发送RPC请求的原理。 TransportContext的createChannelHandler方法中调用了TransportClient的构造器,其中TranportResponseHandler的引用将赋给handler属性。
public TransportClient(Channel channel, TransportResponseHandler handler) {
this.channel = Preconditions.checkNotNull(channel);
this.handler = Preconditions.checkNotNull(handler);
this.timedOut = false;
}
TranportClient一共有5个方法用于发送请求,分别如下:
1)fetchChunk:从远端协商好的流中请求单个块
2)stream:使用流的ID,从远端获取流数据
3)sendRpc:向服务端发送RPC的请求,通过At least Once Delivery原则保证请求不会丢失
4)sendRpcSync:向服务端发送异步的RPC请求,并根据指定的超时时间等待响应
5)send:向服务端发送RPC的请求,但是并不期望能获取响应,因此不能保证投递的可靠性
事件总线
事件总线介绍
Spark 定义了一个特质 ListenerBus
,可以接受事件并且将事件提交到对应事件的监听器。
该特征主要有一个 listeners
成员,用于维护所有注册的监听器,其数据结构是一个线程安全的 CopyOnWriteArrayList[L]
。
该特征还有几个主要的函数:
- addListener:添加 listener
- doPostEvent:给特定 listener 发送事件,该方法具体需要子类实现
- findListenersByClass:根据类型查找 listener 列表
- postToAll: 把事件发送给所有的 listener,虽然 CopyOnWriteArrayList 是线程安全的,但 postAll 引入了“先检查后运行”的逻辑,因此该方法不是线程安全的。
- removeListener:删除 listener
- removeListenerOnError:内部调用 removeListener,可由子类覆盖
ListenerBus 继承体系
每个 ListenerBus
用于将不同的 Event
投递到不同的Listener
中,下面以主要分析下 LiveListenerBus
。
LiveListenerBus 详解
LiveListenerBus 继承 SparkListenerBus,和其他 ListenerBus 不同的是, LiveListenerBus 是将事件都放到一个队列中,然后另外一个线程不断从队列获取事件,将事件异步投递给监听器,达到实时刷新UI界面数据的效果。
LiveListenerBus 中的属性:
// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private def validateAndGetQueueSize(): Int = {
val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
if (queueSize <= 0) {
throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
}
queueSize
}
private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
.createWithDefault(10000)
-
eventQueue:是 SparkListenerEvent 事件的阻塞队列,队列大小可以通过 Spark 属性 spark.scheduler.listenerbus.eventqueue.size 进行配置,默认为 10000;
-
started:标记 LiveListenerBus 的启动状态的 AtomicBoolean 类型的变量;
-
stopped:标记LiveListenerBus的停止状态的 AtomicBoolean 类型的变量;
-
droppedEventsCounter:使用 AtomicLong 类型对删除的事件进行计数,每当日志打印了 droppedEventsCounter 后,会将 droppedEventsCounter 重置为0;
-
lastReportTimestamp:记录最后一次日志打印 droppedEventsCounter 的时间戳;
-
processingEvent:暗示当前正有事件在被 listenerThread 线程处理;
-
logDroppedEvent:标记是否由于 eventQueue 已满,导致新的事件被删除;
-
eventLock:表示队列中事件产生和消费的一个计数器,当有新的事件到来时释放信号量,当对事件进行处理时获取信号量,eventLock = new Semaphore(0);
-
listenerThread:异步处理事件的线程;
异步事件处理线程
private val listenerThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
LiveListenerBus.withinListenerThread.withValue(true) {
while (true) {
eventLock.acquire()
self.synchronized {
processingEvent = true
}
try {
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
postToAll(event)
} finally {
self.synchronized {
processingEvent = false
}
}
}
}
}
}
代码不算复杂,主要逻辑是:
- 设置为 daemon thread;
- 不断获取信号量,如果没有就会阻塞,有信号释放才会往下运行(这是依靠 new Semaphore(0)实现的,在 spark 后面的版本中,是直接用阻塞队列的 take() 方法实现。);
- 同步控制,将 processingEvent 设置为 true;
- 从 eventQueue 中获取事件;
- 调用超类 ListenerBus 的 postToAll 方法,对监听器进行遍历,并调用 SparkListenerBus 的 doPostEvent 方法对事件进行匹配后执行监听器的相应方法;;
- 每次循环结束同步控制,将 processingEvent 设置为 false;
异步事件处理线程的事件来源
DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint 及 LocalSchedulerBackend 都是 LiveListenerBus 的事件来源,它们都是通过调用 LiveListenerBus 的 post 方法将消息交给异步线程 listenerThread 处理的。
def post(event: SparkListenerEvent): Unit = {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logError(s"$name has already stopped! Dropping event $event")
return
}
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}
val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}
流程总结