Hello,
I'm trying to figure out how to use Tranquility properly in a Spark Streaming context.
The Tranquility Spark documentation says nothing about the batchDuration of the Spark StreamingContext.
I'd like to know if there is a recommended practice for that batchDuration, i.e. should it be smaller or larger than the segmentGranularity (+ windowPeriod?) or the queryGranularity?
From what I understand, they should not need to be correlated (thinking about avoiding too much data loss or duplication if a task fails), but I'd like to get some confirmation of that.
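For context, this is roughly the setup I have in mind (the app name and granularity values are just placeholders from my own config, not a recommendation):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("tranquility-spark-example")
// Spark side: the micro-batch interval in question
val ssc = new StreamingContext(conf, Seconds(10))
// Tranquility side (set on the DruidBeams builder), e.g.:
//   segmentGranularity = HOUR
//   windowPeriod       = PT10M
// The question is whether Seconds(10) above should be constrained
// by these two, or whether they are independent.
```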
This brings me to the recommended and documented pattern for creating a BeamFactory (i.e. makeBeam as a lazy val) in a StreamingContext:
- The BeamFactory instance is created on the Spark driver
- It is then serialized and sent to the Spark executors
Then, for each partition of each RDD:
- That BeamFactory is deserialized
- BeamFactory.makeBeam creates a new Curator/ZooKeeper connection (I wasn't sure about that, thinking all RDDs on a given executor shared the same deserialized object, but adding a log line in the lazy val makeBeam = { ... } shows that it is executed for each partition of each new RDD)
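To double-check that lazy val behaviour outside Spark, I used a minimal serialization round-trip (plain Scala, my own test code, no Spark involved): a lazy val that has not been evaluated before serialization (or is marked @transient) is re-initialized in every deserialized copy on first access, which matches one connection per task:

```scala
import java.io._

class DemoFactory extends Serializable {
  // Stands in for makeBeam; @transient ensures each deserialized
  // copy starts uninitialized, like an unevaluated lazy val does.
  @transient lazy val beam: String = {
    println("Make Beam") // fires once per copy, on first access
    "beam-instance"
  }
}

def roundTrip[T](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(obj)
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}

val original = new DemoFactory
// Two deserialized copies, like two tasks landing on an executor:
roundTrip(original).beam // prints "Make Beam"
roundTrip(original).beam // prints "Make Beam" again
```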
Now, given a batchDuration of 1 second and events filling 4 partitions, we get 4 new Curator/ZooKeeper connections every second:
- Is that the desired behavior?
- If yes, are these Curator/ZooKeeper clients closed properly by the tranquilizer? (According to the curator.close() call in the Scala example, it is the user's responsibility to close the curator. How can that be done with the lazy val pattern?) And why ask the user to start the curator in a lazy val, when it might be better to start it in the foreachPartition of the BeamRDD.propagate method and close it properly at the end?
- If no, it seems we have an issue here. I'm thinking about writing a CuratorPool object to avoid this problem (not sure it will work).
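To make the CuratorPool idea concrete, here is the kind of per-JVM sharing I have in mind (a rough sketch of my own, not from the Tranquility docs; the SharedCurator object and its shutdown-hook cleanup are hypothetical):

```scala
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.BoundedExponentialBackoffRetry

object SharedCurator {
  // One Curator client per executor JVM, shared by every task/partition.
  private var client: CuratorFramework = null

  def get(connectString: String): CuratorFramework = synchronized {
    if (client == null) {
      client = CuratorFrameworkFactory.newClient(
        connectString,
        new BoundedExponentialBackoffRetry(100, 3000, 5))
      client.start()
      // Best-effort cleanup when the executor JVM exits.
      sys.addShutdownHook { client.close() }
    }
    client
  }
}
```

makeBeam would then call SharedCurator.get(...) instead of creating its own client, so repeated deserializations of the factory reuse a single ZooKeeper session per executor.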
For the record, here are some Spark executor logs (two batches, with batchDuration = 10 seconds) showing the problem (each RDD is empty).
Please take a look at the "EventBeamFactory: Make Beam" and "EventBeamFactory: Start Curator" messages, which are log lines added respectively at the very start of the lazy val makeBeam initialization and just before the curator.start() call.
After a while, we get a "too many connections" error on ZooKeeper:
16/02/05 11:41:30 INFO CoarseGrainedExecutorBackend: Got assigned task 41
16/02/05 11:41:30 INFO Executor: Running task 1.0 in stage 10.0 (TID 41)
16/02/05 11:41:30 INFO TorrentBroadcast: Started reading broadcast variable 10
16/02/05 11:41:30 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 2.7 KB, free 33.7 KB)
16/02/05 11:41:30 INFO TorrentBroadcast: Reading broadcast variable 10 took 73 ms
16/02/05 11:41:30 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.0 KB, free 38.7 KB)
16/02/05 11:41:30 INFO KafkaRDD: Beginning offset 0 is the same as ending offset skipping example-topic1 0
16/02/05 11:41:30 INFO EventBeamFactory: Make Beam
16/02/05 11:41:30 INFO EventBeamFactory: Start Curator
16/02/05 11:41:30 INFO CuratorFrameworkImpl: Starting
16/02/05 11:41:30 INFO ZooKeeper: Initiating client connection, connectString=zookeeper:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@2752d6ff
16/02/05 11:41:30 INFO ClientCnxn: Opening socket connection to server zookeeper_1/172.17.0.6:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/05 11:41:30 INFO ClientCnxn: Socket connection established to zookeeper_1/172.17.0.6:2181, initiating session
16/02/05 11:41:30 INFO ClientCnxn: Session establishment complete on server zookeeper_1/172.17.0.6:2181, sessionid = 0x152b0a3979807d1, negotiated timeout = 40000
16/02/05 11:41:30 INFO ConnectionStateManager: State change: CONNECTED
16/02/05 11:41:30 INFO LoggingEmitter: Start: started [true]
16/02/05 11:41:30 INFO Executor: Finished task 1.0 in stage 10.0 (TID 41). 875 bytes result sent to driver
16/02/05 11:41:30 INFO CoarseGrainedExecutorBackend: Got assigned task 42
16/02/05 11:41:30 INFO Executor: Running task 2.0 in stage 10.0 (TID 42)
16/02/05 11:41:30 INFO KafkaRDD: Beginning offset 0 is the same as ending offset skipping example-topic2 0
16/02/05 11:41:30 INFO EventBeamFactory: Make Beam
16/02/05 11:41:30 INFO EventBeamFactory: Start Curator
16/02/05 11:41:30 INFO CuratorFrameworkImpl: Starting
16/02/05 11:41:30 INFO ZooKeeper: Initiating client connection, connectString=zookeeper:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@77b39141
16/02/05 11:41:30 INFO ClientCnxn: Opening socket connection to server zookeeper_1/172.17.0.6:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/05 11:41:30 INFO ClientCnxn: Socket connection established to zookeeper_1/172.17.0.6:2181, initiating session
16/02/05 11:41:30 INFO LoggingEmitter: Start: started [true]
16/02/05 11:41:30 INFO ClientCnxn: Session establishment complete on server zookeeper_1/172.17.0.6:2181, sessionid = 0x152b0a3979807d3, negotiated timeout = 40000
16/02/05 11:41:30 INFO ConnectionStateManager: State change: CONNECTED
16/02/05 11:41:30 INFO Executor: Finished task 2.0 in stage 10.0 (TID 42). 875 bytes result sent to driver
16/02/05 11:41:30 INFO BlockManager: Removing RDD 29
16/02/05 11:41:30 INFO BlockManager: Removing RDD 28
16/02/05 11:41:30 INFO BlockManager: Removing RDD 27
16/02/05 11:41:40 INFO CoarseGrainedExecutorBackend: Got assigned task 45
16/02/05 11:41:40 INFO Executor: Running task 1.0 in stage 11.0 (TID 45)
16/02/05 11:41:40 INFO TorrentBroadcast: Started reading broadcast variable 11
16/02/05 11:41:40 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 2.7 KB, free 41.4 KB)
16/02/05 11:41:40 INFO TorrentBroadcast: Reading broadcast variable 11 took 96 ms
16/02/05 11:41:40 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 5.0 KB, free 46.4 KB)
16/02/05 11:41:40 INFO KafkaRDD: Beginning offset 0 is the same as ending offset skipping example-topic1 0
16/02/05 11:41:40 INFO EventBeamFactory: Make Beam
16/02/05 11:41:40 INFO EventBeamFactory: Start Curator
16/02/05 11:41:40 INFO CuratorFrameworkImpl: Starting
16/02/05 11:41:40 INFO ZooKeeper: Initiating client connection, connectString=zookeeper:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@364ae98f
16/02/05 11:41:40 INFO ClientCnxn: Opening socket connection to server zookeeper_1/172.17.0.6:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/05 11:41:40 INFO LoggingEmitter: Start: started [true]
16/02/05 11:41:40 INFO ClientCnxn: Socket connection established to zookeeper_1/172.17.0.6:2181, initiating session
16/02/05 11:41:40 INFO ClientCnxn: Session establishment complete on server zookeeper_1/172.17.0.6:2181, sessionid = 0x152b0a3979807d6, negotiated timeout = 40000
16/02/05 11:41:40 INFO ConnectionStateManager: State change: CONNECTED
16/02/05 11:41:40 INFO Executor: Finished task 1.0 in stage 11.0 (TID 45). 875 bytes result sent to driver
16/02/05 11:41:40 INFO CoarseGrainedExecutorBackend: Got assigned task 47
16/02/05 11:41:40 INFO Executor: Running task 3.0 in stage 11.0 (TID 47)
16/02/05 11:41:40 INFO KafkaRDD: Beginning offset 0 is the same as ending offset skipping example-topic2 0
16/02/05 11:41:40 INFO EventBeamFactory: Make Beam
16/02/05 11:41:40 INFO EventBeamFactory: Start Curator
16/02/05 11:41:40 INFO CuratorFrameworkImpl: Starting
16/02/05 11:41:40 INFO ZooKeeper: Initiating client connection, connectString=zookeeper:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@2dd6535a
16/02/05 11:41:40 INFO ClientCnxn: Opening socket connection to server zookeeper_1/172.17.0.6:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/05 11:41:40 INFO LoggingEmitter: Start: started [true]
16/02/05 11:41:40 INFO ClientCnxn: Socket connection established to zookeeper_1/172.17.0.6:2181, initiating session
16/02/05 11:41:40 INFO ClientCnxn: Session establishment complete on server zookeeper_1/172.17.0.6:2181, sessionid = 0x152b0a3979807d8, negotiated timeout = 40000
16/02/05 11:41:40 INFO ConnectionStateManager: State change: CONNECTED
16/02/05 11:41:40 INFO Executor: Finished task 3.0 in stage 11.0 (TID 47). 875 bytes result sent to driver
16/02/05 11:41:40 INFO BlockManager: Removing RDD 32
16/02/05 11:41:40 INFO BlockManager: Removing RDD 31
16/02/05 11:41:41 INFO BlockManager: Removing RDD 30
And here is some of the BeamFactory code:
class EventBeamFactory extends BeamFactory[Map[String, Any]] {
  @transient lazy val log = Logger.getLogger(getClass.getName)

  lazy val makeBeam: Beam[Map[String, Any]] = {
    log.info("Make Beam")
    val curator = CuratorFrameworkFactory.newClient(
      ...,
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    log.info("Start Curator")
    curator.start()
    DruidBeams.builder(...)
      .druidBeamConfig(new DruidBeamConfig(randomizeTaskId = true))
      .curator(curator)
      .discoveryPath(...)
      .location(...)
      .rollup(...)
      .tuning(...)
      .buildBeam()
  }
}
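A variant I'm considering (again my own sketch, not from the docs) is to memoize the Beam itself in the companion object, so all deserialized factory instances on an executor share a single Beam, and therefore a single Curator connection:

```scala
object EventBeamFactory {
  // Built at most once per executor JVM; the class below stays
  // serializable but carries no per-instance connection state.
  lazy val beam: Beam[Map[String, Any]] = {
    val curator = CuratorFrameworkFactory.newClient(
      ..., // same connect string as above (elided)
      new BoundedExponentialBackoffRetry(100, 3000, 5))
    curator.start()
    DruidBeams.builder(...) // same builder settings as above (elided)
      .curator(curator)
      .buildBeam()
  }
}

class EventBeamFactory extends BeamFactory[Map[String, Any]] {
  def makeBeam: Beam[Map[String, Any]] = EventBeamFactory.beam
}
```

With this shape, the per-partition deserialization would still happen, but makeBeam would only return the shared Beam rather than opening a new connection each time.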