dubin555 / blog Goto Github PK
View Code? Open in Web Editor NEWJust use the issue of the repo as a blog
Just use the issue of the repo as a blog
如果是静态数据, 例如要算一个List的topK, 最简单的方法就是用优先队列, 这里用PriorityQueue代替, 如果是topK大, 那就是最小堆来解决
for (int i: list) {
if (queue.size() < k) {
queue.offer(i);
} else {
if (i > queue.peek()) {
queue.poll();
queue.offer(i);
}
}
}
如果问题变成了, 一个动态的字符串流, 需要计算实时的当前word count的topK,该怎么实现呢?
string stream --> hashmap --> priorityqueue
受限于单机内存
没有持久化
对于每一个word, 都有一个对应的count, 当接到一个word时, 相应的count自增
对于QPS不高的应用, 足够了
Master --> hash --> Slave, 将word count表进行水平拆分, 对于client想获得topK时, 要merge各个分表的topK
QPS取决于各个分表的读写QPS
针对于单机HashMap方案受限于单机内存, 可进行如下的改进:
Client --> put word: Master --> hash --> slave --> update slave local hashmap
Client --> get topK: Merge every local hashmap to get topK
针对于单机hashmap不能扩容的问题, 改进成根据hash%(N台slave)的方式。
仍然有容错的问题
一般实时的topK问题, 是没有强一致的需求的,并且K一般都是可变的,有可能是3, 有可能是5, 这时可以有一些改进。
首先针对K可变的问题, 将hashmap + priorityqueue 方案换成treemap实现即可。
针对容错的问题, 可以将内存的实现替换成cache --> async task --> storage.
针对于没有强一致的需求, 可定期将内存数据同步到持久化存储, 这个思路跟接下来的checkpoint类似。
如果遇到slave下线的情况, 这里有两个思路
这里讨论了两个容错的方案, 从实现的角度难度来说, 利用现有开源方案是更好的选择, 例如方案
如果是大部分的word, 都只出现1次或者很少, 要怎么优化
可以通过Bloom Filter先过滤一部分, 这样会有一部分误判的概率, 不过不影响最终结果。
生产者和消费者的速度不匹配, 这时候就需要回压机制(也有翻译成背压)
Producer —> Queue —> Consumer
100event/s vs 50event/s
如果说想做一个回压系统, 自然对producer和consumer的逻辑侵入越少越好。首先能想到的就是利用缓冲, 这样producer和consumer的逻辑都不会进行任何变动。
当设计缓冲时, 单机的缓冲可以是简单的内存队列或者分布式的kafka, 对于缓冲, 最理想的肯定是无限的缓冲, 但是一旦缓冲是无限的, 并且长期生产者速率大于消费者, 这样的话, 部分消息永远都不会消费, 失去了无限缓冲的意义, 同时由于有限的IO, DISK资源, 无限的缓冲也是不可能的, 所以缓冲也是有限的, 最终还是要做相应的控制和丢弃策略。
我所知道的几个常见的大数据系统在回压时所用的不同的解决方案
在实际生产中可以参考以上成熟系统的方式来定义自己的回压处理。
先说下场景, 我们有实时数据需要用Structured Streaming 写入ElasticSearch, 写入峰值速度大概10,000条/s,某晚开始, 出现大量Request timeout报警。
class Flush implements Runnable {
@Override
public void run() {
synchronized (BulkProcessor.this) {
if (closed) {
return;
}
if (bulkRequest.numberOfActions() == 0) {
return;
}
execute();
}
}
}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
});
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
toRelease.run();
}
}
}
其中的latch.await() 无超时的阻塞, 初步怀疑是这里导致的, 某种原因, onResponse 和 onFailure都没有回调。
可以看到执行写入操作的是Retry这个类, 其中的execute方法是在当前线程运行的, retry方法是在scheduler中执行的,scheduler是线程为1 的线程池,
execute方法
public void execute(BulkRequest bulkRequest) {
this.currentBulkRequest = bulkRequest;
consumer.accept(bulkRequest, this);
}
private void retry(BulkRequest bulkRequestForRetry) {
assert backoff.hasNext();
TimeValue next = backoff.next();
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME);
}
也就是说当每条数据需要写入到ES的时候, 检查当前满足写入条件(大小, 条数)--> 当前线程执行写入操作 --> 如果出错并且配置了retry policy的话 --> 由core size为1的scheduler来执行重试操作
接下来看Flush操作, Flush 也是由scheduler来执行的
race condition
client线程 --- add -- 当前线程尝试写入-- 失败 -- 由scheduler执行retry操作, 自己阻塞在countdownlatch的await方法
时间到达fixed rate的时间, Flush在scheduler开始运行, 但是没有获得对象锁, 等待add方法释放锁。死锁发生。
条件发生比较苛刻, 我们发生的时候也是由于ES 频繁GC导致了大量的超时和重试。
也有人已经发现了这个bug, es的issue 47599 和 44556
按照社区的建议, 关掉了retry policy
日志有相应的timestamp_date和arrive_date, timestamp_date表示该记录发生在哪一天, arrive_date表示日志到达日期, 目前每天的到达记录统一放在一张表, 以日期结尾, 例如, XXX_2018-07-01的表中包含了所有arrive_date是2018-07-01的日期, 但因为数据延迟的原因, 其中包含了很多timestamp是其他天的数据,例如2018-06-XX的数据。 我们处理时间用process_date来表示, 目前数据是T+1的, 即今天处理昨天的全量数据。
处理的日志需要做些聚合操作, 举例,{“timestamp”:”2018-06-30”, “name”:”db”,”age”:29, “category”:”Genius”, “arrive_date”:”2018-07-01"}
这样的一条日志, 会落在XXX_2018-07-01的表中, 如果希望统计的是对category进行聚合操作得到avg(age), 我们希望得到的相应地聚合结果的primary key包含timestamp, arrive_date和category, 增量的落到DB当中。 当数据延迟出现时, 也不会对记录造成影响, 会增量的补数据进去。
那怎么证明这样做是对的呢? 用循环不变式来证明一下。证明这样做是没错的, 即证明每条日志都不会对其他同类别的日志产生影响, 每当同类别的其他日志出现的时候, 该日志都会参与计算, 并覆盖最终的结果。
针对任一条日志, 当处理系统(例如Spark), 未读取该条记录的时候(若实时读取, 有可能该记录还未写入), 该条记录不会影响现有结果(会缺数据, 但不会导致错误)。
当该条日志到达时, 该条日志与其他同类别日志都参与了计算,因为相同的arrive_date的数据都在一张表中,当process_date = arrive_date + 1时, 该表是完整的, 并覆盖了最终结果(靠primary key或者rowkey保证)。
当process_date > arrive_date + 1 时, 即数据到达的时间的数据表已经处理过了,新到的数据的arrive_date > 该条日志的arrive_date, 即跟该条日志同类别的数据不会再到达。
近期完成了公司实时作业平台的构建, 把一些经验先记录一下。
Server 端采用的是Flume 和 Logstash
Native端采用的自研的一个go service
公认的Kafka
鉴于公司对于Spark技术栈比较熟, 批处理也有相应的积累, 所以大部分常用的场景采用的都是Structured streaming, 一小部分需要session window 或者 CEP的业务采用Flink完成。
Kafka, Phonenix, Ignite, Oracle
目前日志量大概每天几T, 大概可以分为用量日志, 质量日志, 行为日志等等。 需要用这些日志来进行实时统计, 告警。 需要流-流join, 流-维度表join的场景。
大概使用了 Akka + Zookeeper + Redis + Spark + Spring Boot.
每个Actor都将自己的地址向Zookeeper注册,用户提交job配置的时候, 会通过一个query balancer 来找到自己合适的spark session来执行任务, 用户停止job或者查看job状态的时候也会通过query balancer。
这里面还是有不少坑的, 例如
等等, 其他的也说不明白了, 只能说坑不少。做完是费了很多劲的。
谢谢我自己, 之前也做过类似批处理的, 所以这次做实时的第一版大概一周就写好了, 结果发现,实时的要比批的难多了。。。改了2周多。。。半个月前完成的差不多了。前端还写了一周多, 写不明白了, 放弃了React, 找前端同事写了。
已经上线了。
下次把做批处理的和Flink的也记录下。
如何设计一个带有ttl功能的HashMap, 例如一个ttl为5s的HashMap, 当put(key, value) 之后, 5秒内, get(key)为value, 5秒后,get(key) 为空, 其中ttl的时间不是那个严格, 主要是为了限制HashMap所用的内存不是无限增长。
按照环形时间轮的思路, 将一个HashMap,分成12个HashMap, curBucket指向当前的桶。
当put元素时, 根据当前的seconds%12, 将元素插入到指定的HashMap当中,同时更新当前的curBucket。
当get元素时, 查找curBucket及其之前的一个bucket。
在此基础上, 异步删除过期数据(不属于curBucket和之前一个bucket的数据)
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TtlMap<K, V> {
private final int N; // count of bucket
private final int ttl;
private volatile int curBucket = -1;
private volatile int prevBucket = -1;
private final List<Map<K, V>> maps;
private final ScheduledExecutorService cleanService;
public TtlMap(int ttl) {
this.N = 60 / ttl;
this.ttl = ttl;
this.maps = new ArrayList<>(this.N);
for (int i = 0; i < N; i++) {
maps.add(i, null);
}
cleanService = Executors.newSingleThreadScheduledExecutor();
cleanService.scheduleAtFixedRate(
() -> cleanOldData()
, 0, ttl, TimeUnit.SECONDS);
}
public void put(K key, V value) {
long seconds = System.currentTimeMillis()/1000;
int bucket = (int) seconds % N;
if (bucket != curBucket) {
// move forward the curBucket
curBucket = bucket;
prevBucket = getPrev(curBucket);
}
if (maps.get(curBucket) == null) {
maps.add(curBucket, new HashMap<>());
}
maps.get(curBucket).put(key, value);
}
public V get(K key) {
if (curBucket == -1 || prevBucket == -1) return null;
if (maps.get(curBucket) != null && maps.get(curBucket).containsKey(key)) {
return maps.get(curBucket).get(key);
}
if (maps.get(prevBucket) != null && maps.get(prevBucket).containsKey(key)) {
return maps.get(prevBucket).get(key);
}
return null;
}
private int getPrev(int i) {
return i != 0? i - 1: N - 1;
}
private void cleanOldData() {
for (int i = 0; i < N; i++) {
// bypass the current and prev bucket
if (i == curBucket || i == prevBucket) continue;
this.maps.set(i, null);
}
}
public void stop() {
cleanService.shutdown();
for (int i = 0; i < N; i++) {
// help GC
maps.set(i, null);
}
}
public static void main(String[] args) throws Exception {
TtlMap<String, String> map = new TtlMap<>(5);
map.put("key1", "value1");
map.put("key2", "value2");
System.out.println("0 seconds, get the key1 and key2");
System.out.println("key1 --> " + map.get("key1") + "\tkey2--> " + map.get("key2"));
Thread.sleep(5000);
System.out.println("5 seconds, update key2");
map.put("key2", "value2");
Thread.sleep(2000);
System.out.println("7 seconds, get the key1 and key2");
System.out.println("key1 --> " + map.get("key1") + "\tkey2--> " + map.get("key2"));
map.put("key3", "value3");
Thread.sleep(5000);
System.out.println("12 seconds");
System.out.println("key1 --> " + map.get("key1") + "\tkey2--> " + map.get("key2"));
map.stop();
}
}
0 seconds, get the key1 and key2
key1 --> value1 key2--> value2
5 seconds, update key2
7 seconds, get the key1 and key2
key1 --> null key2--> value2
12 seconds
key1 --> null key2--> null
微信公众号:进击的大数据
关注大数据方面技术。问题或建议,请公众号留言。
Monitoring 指的是监控, 重点强调系统是不是正在工作。
Observability 更强调可观测, 例如某一个组件或者servie挂了, 能不能追踪到根源。
本文的起因也是因为看到了一个不错的视频,这里主要是照顾下没法fq的同志, 文章会对视频里的内容有小部分的不同, 有些是自己的理解。
能fq的同志可以看下原版的视频。
地址:https://www.youtube.com/watch?v=ACL_YVPD3gw
主要分为四个部分
健康监控主要的目的在于:
完成healthcheck的方式有很多种,可以是广播模式,例如一个负责监控的service定期向所有的agent发请求,得到所有agent的health情况,再做进一步汇总,也可以是Register模式,例如利用Zookeeper或者etcd来检测健康状况。
这两种我都做过,区别在于:
GET http://1.1.1.1:8080/health
200 OK
{
"service": "registration-service",
"healthy":true,
"workload":{"healthy":true},
"dependencies":[
{"name":"cassandra","healthy":true},
{"name":"billing-service","healthy":true}
]
}
metric的scope可以是从底层到更上层的:
System metrics(CPU, memory) -> Application metrics(Error rates) -> Business metrics
metric收集:
Metrics --> Metrics collector && Metrics query engine <-- Dashboard, Alert
开源的方案可以参照Prometheus & Grafana
这里有两点需要说明一下:
ERROR [SVC=A][trace=a1b2c3] Failed to process order cause:xxxx
ERROR [SVC=F][trace=a1b2c3] Failed to process order cause:xxxx
ERROR [SVC=G][trace=a1b2c3] Failed to process order cause:xxxx
ERROR [SVC=B][trace=a1b2c3] Failed to process order cause:DB timeout
结构化的日志才能有威力, 例如某个版本的代码, 或者某个数据中心挂了导致的metric下降, 总体来说体现不出来, 要具体的细化才能反应出来
很早以前就看过airflow的代码, 由于公司提供的离线平台功能少, 部分任务依赖的解决方案其实是参照airflow的, 当时就粗略读过airflow 0.1版的代码, 最近又重新看了看, 这里记录一下。
这里我主要关心4个问题
首先看到airflow的run方法, 跟进去, 里面TaskInstance的run方法,TaskInstance的属性
__tablename__ = "task_instance"
task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, primary_key=True)
start_date = Column(DateTime)
end_date = Column(DateTime)
duration = Column(Integer)
state = Column(String(20))
try_number = Column(Integer)
hostname = Column(String(1000))
unixname = Column(String(1000))
基本是表达当前任务的状态。
TI的run方法有点长, 基本是各种判断,最后走到 task_copy.execute(self.execution_date) , 里面是 各个operator的execute,这里对于普通的operator来说, 只要执行就可以了, 看到这里就可以回答第四个问题, 对于0.1版本来说, MySQL, Hive operator会把连接信息存到DB里, 这里利用Python得到MySQL或者Hive的记录。
对于第二个问题, 外部依赖,可以看到这样的代码
class BaseSensorOperator(BaseOperator):
@apply_defaults
def __init__(
self,
poke_interval=60,
timeout=60*60*24*7,
*args, **kwargs):
super(BaseSensorOperator, self).__init__(*args, **kwargs)
self.poke_interval = poke_interval
self.timeout = timeout
# Since a sensor pokes in a loop, no need for higher level retries
self.retries = 0
def poke(self):
'''
Function that the sensors defined while deriving this class should
override.
'''
raise Exception('Override me.')
def execute(self, execution_date):
started_at = datetime.now()
while not self.poke():
sleep(self.poke_interval)
if (datetime.now() - started_at).seconds > self.timeout:
raise Exception('Snap. Time is OUT.')
logging.info("Success criteria met. Exiting.")
如果poke返回false的话, 会等待直到poke返回true。
以上是对于单个dag里的单个task的执行流程。
那如果是调度任务呢, 需要一直运行的?
这里是一个MasterJob, 会进到_execute方法,里面大概是
while True:
for dag in dags:
for task in dag.tasks:
if task.is_runnable():
execute()....
这里就可以回答其余的问题, job间依赖和重试怎么解决
def is_runnable(self):
"""
Returns a boolean on whether the task instance has met all dependencies
and is ready to run. It considers the task's state, the state
of its dependencies, depends_on_past and makes sure the execution
isn't in the future.
"""
if self.execution_date > datetime.now() - self.task.schedule_interval:
return False
elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
return False
elif self.state in State.runnable() and self.are_dependencies_met():
return True
else:
return False
会检查DB中的上游task是不是已经完成, 还有执行的合法与否。
结束。
在数据平台部门中, 当需求井喷了之后, 对每个需求做定制化的编码已经不现实了。这时候一般都需要与业务部门合作,这时最好有一个可视化的开发UI, 后端有个service层, 用户所有对数据处理的逻辑通过配置文件,或者纯SQL来表达。这时用户有看实时结果的需求来验证代码或者配置的正确性。
在其他博客有看到通过websocket来拿当前的spark日志, 包括spark运行时的日志, 这样就可以把streaming的sink改为console模式来获得一部分的日志。
不足之处在于, console模式获得的日志其实是dataframe.show()的结果, 对于前端的交互可视化会比较差。那可不可以做到运行时获得一部分数据并以类似json的方式传回来
先看下console的实现方式,从start方法入手:
def start(): StreamingQuery = {
...
if (source == "memory") {
...
} else if (source == "foreach") {
...
} else {
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
val sink = ds.newInstance() match {
case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
case _ =>
val ds = DataSource(
df.sparkSession,
className = source,
options = extraOptions.toMap,
partitionColumns = normalizedParCols.getOrElse(Nil))
ds.createSink(outputMode)
}
...
其中有个lookupDataSource方法,
case sources =>
// There are multiple registered aliases for the input. If there is single datasource
// that has "org.apache.spark" package in the prefix, we use it considering it is an
// internal datasource within Spark.
val sourceNames = sources.map(_.getClass.getName)
val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
if (internalSources.size == 1) {
logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
internalSources.head.getClass
} else {
throw new AnalysisException(s"Multiple sources found for $provider1 " +
s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
}
看来类名要以org.apache.spark开头才行, 按console的方式, 实现一下自己的Debug sink,
class DebugWriter(schema: StructType, options: DataSourceOptions) extends StreamWriter {
assert(SparkSession.getActiveSession.isDefined)
protected val spark = SparkSession.getActiveSession.get
implicit val formats = org.json4s.DefaultFormats
override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
printRows(messages, schema, s"Batch: $epochId")
}
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
protected def printRows(
commitMessages: Array[WriterCommitMessage],
schema: StructType,
printMessage: String): Unit = {
val rows = commitMessages.collect {
case PackedRowCommitMessage(rs) => rs
}.flatten
val sample = spark
.createDataFrame(rows.take(10).toList.asJava, schema)
.toJSON
.collect()
val message = Serialization.writePretty(Map("message" -> sample))
// scalastyle:off println
println("-------------------------------------------")
println(message)
println("-------------------------------------------")
// scalastyle:off println
}
}
checkpoint 的地址必须要指定, 因为源代码中可以看到useTempCheckpointLocation = source == "console",
随机一个/tmp下的目录即可
这样json格式的debug用的部分dataframe就可以打印出来了, 我们可以上传到redis或者kafka用于前端画图, 方便用户debug, 查看列类型等等。
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.