aliyun / aliyun-log-flink-connector
flink log connector
License: Apache License 2.0
When will metrics be supported?
The configuration is as follows:
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "write_to_es");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "1000");
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
DataStream logTestStream = env.addSource(
        new FlinkLogConsumer<>(deserializer, configProps));
logTestStream.addSink(new ProcessLog());
env.execute();
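Currently, Flink checkpointing must be enabled for the connector to save checkpoints. However, users may want to run without Flink checkpointing, or to update the checkpoint as soon as possible.
When will ordered consumption be supported?
When consuming logs with this connector, the fields read from RawLog are missing information such as the topic.
Consuming with loghub-client-lib does not have this problem.
How can I consume data from a specific time range? For example, how can I consume today the data from 2:00 to 3:00 yesterday?
An NPE is thrown from here:
The current logic for assigning shards to subtasks is similar to Kafka's partition assignment: the shard ID modulo the number of subtasks is matched against the subtask index. However, Kafka partitions are always numbered sequentially, whereas shard IDs can hardly be guaranteed to be 100% sequential. The assignment code is as follows: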
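One way to approach the time-range question is to compute the window boundaries as Unix timestamps and filter records against them. The sketch below only shows that arithmetic. The class and method names are hypothetical, it assumes log times are expressed in epoch seconds, and it makes no claim about which begin or stop position options the connector itself accepts.

```java
import java.time.LocalDate;
import java.time.ZoneId;

// Hypothetical helper, not part of the connector.
class TimeRangeSketch {

    // Returns {fromEpochSeconds, toEpochSeconds} for [startHour, endHour)
    // on the day before `today`, interpreted in the given time zone.
    static long[] yesterdayWindow(LocalDate today, int startHour, int endHour, ZoneId zone) {
        LocalDate yesterday = today.minusDays(1);
        long from = yesterday.atTime(startHour, 0).atZone(zone).toEpochSecond();
        long to = yesterday.atTime(endHour, 0).atZone(zone).toEpochSecond();
        return new long[] {from, to};
    }

    // True when a log time (epoch seconds, an assumption) falls inside the half-open window.
    static boolean inWindow(long logTimeSeconds, long[] window) {
        return logTimeSeconds >= window[0] && logTimeSeconds < window[1];
    }

    public static void main(String[] args) {
        long[] window = yesterdayWindow(LocalDate.now(), 2, 3, ZoneId.systemDefault());
        System.out.println("from=" + window[0] + " to=" + window[1]);
    }
}
```

A predicate like `inWindow` could be applied in a downstream filter step, with the job stopped once records newer than the upper bound appear on every shard.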
private List<LogstoreShardMeta> listAssignedShards() throws Exception {
    List<String> logstores = getLogstores();
    List<LogstoreShardMeta> shardMetas = new ArrayList<>();
    for (String logstore : logstores) {
        List<Shard> shards = logClient.listShards(project, logstore);
        for (Shard shard : shards) {
            LogstoreShardMeta shardMeta = new LogstoreShardMeta(logstore, shard.GetShardId(), shard.getStatus());
            if (shardAssigner.assign(shardMeta, totalNumberOfSubtasks) % totalNumberOfSubtasks == indexOfThisSubtask) {
                shardMetas.add(shardMeta);
            }
        }
    }
    return shardMetas;
}
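When shard IDs are not contiguous, the assignment becomes unbalanced. For example, the shard IDs of our production SLS logstore are as follows:
This leaves task-0, task-5, and task-6 running idle, for example:
Currently, the request-throttling logic is unreasonable for customers whose logstore returns only a few log groups per fetch.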
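The shard list and task output from the original report are not reproduced here. As a rough illustration of the effect, the sketch below counts shards per subtask under the modulo rule and, for comparison, under a rank-based rule that sorts the shard IDs first. The shard IDs, class name, and method names are hypothetical, and the rank-based rule is only one possible alternative, not something the connector is known to implement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation, not connector code.
class ShardAssignmentSketch {

    // Shards per subtask when subtask = shardId % numSubtasks.
    static int[] assignByModulo(List<Integer> shardIds, int numSubtasks) {
        int[] counts = new int[numSubtasks];
        for (int id : shardIds) {
            counts[id % numSubtasks]++;
        }
        return counts;
    }

    // Shards per subtask when subtask = (position in sorted ID list) % numSubtasks.
    static int[] assignByRank(List<Integer> shardIds, int numSubtasks) {
        List<Integer> sorted = new ArrayList<>(shardIds);
        Collections.sort(sorted);
        int[] counts = new int[numSubtasks];
        for (int rank = 0; rank < sorted.size(); rank++) {
            counts[rank % numSubtasks]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up, non-contiguous shard IDs.
        List<Integer> shardIds = Arrays.asList(1, 5, 9, 13, 2, 6);
        System.out.println("modulo: " + Arrays.toString(assignByModulo(shardIds, 4)));
        System.out.println("rank:   " + Arrays.toString(assignByRank(shardIds, 4)));
    }
}
```

With these made-up IDs, the modulo rule leaves two of the four subtasks without shards, while the rank-based rule spreads them more evenly. A rank-based rule would require every subtask to see the same shard list, which may not hold while shards are being split or merged.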
boolean genFetchTask = true;
if (mLastFetchRawSize < 1024 * 1024 && mLastFetchCount < 100 && mLastFetchCount < mMaxFetchLogGroupSize)
{
    genFetchTask = (System.currentTimeMillis() - mLastFetchTime > 500);
}
else if (mLastFetchRawSize < 2 * 1024 * 1024 && mLastFetchCount < 500 && mLastFetchCount < mMaxFetchLogGroupSize)
{
    genFetchTask = (System.currentTimeMillis() - mLastFetchTime > 200);
}
else if (mLastFetchRawSize < 4 * 1024 * 1024 && mLastFetchCount < 1000 && mLastFetchCount < mMaxFetchLogGroupSize)
{
    genFetchTask = (System.currentTimeMillis() - mLastFetchTime > 50);
}
if (genFetchTask)
{
    mLastFetchTime = System.currentTimeMillis();
    LogHubFetchTask task = new LogHubFetchTask(mLogHubClientAdapter, mShardId, mNextFetchCursor, mMaxFetchLogGroupSize);
    mFetchDataFuture = mExecutorService.submit(task);
}
else
{
    mFetchDataFuture = null;
}
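To make the effect of these thresholds easier to see, the branches above can be restated as a pure function that returns the minimum wait before the next fetch. This is a sketch that mirrors the quoted conditions. The class and method names are hypothetical and do not exist in the library.

```java
// Hypothetical restatement of the quoted throttling branches.
class FetchThrottleSketch {

    // Minimum milliseconds that must elapse since the last fetch before a new
    // fetch task is generated; 0 means the next fetch may start immediately.
    static long minFetchIntervalMillis(long lastFetchRawSize, int lastFetchCount, int maxFetchLogGroupSize) {
        boolean belowMax = lastFetchCount < maxFetchLogGroupSize;
        if (lastFetchRawSize < 1024 * 1024 && lastFetchCount < 100 && belowMax) {
            return 500;
        } else if (lastFetchRawSize < 2 * 1024 * 1024 && lastFetchCount < 500 && belowMax) {
            return 200;
        } else if (lastFetchRawSize < 4 * 1024 * 1024 && lastFetchCount < 1000 && belowMax) {
            return 50;
        }
        return 0;
    }

    public static void main(String[] args) {
        // A fetch that returned only a few small log groups falls into the slowest tier.
        System.out.println(minFetchIntervalMillis(10_000, 5, 1000));
    }
}
```

A logstore whose fetches return only a handful of log groups always lands in the first branch, so each shard is polled at most about twice per second no matter how the fetch interval is configured.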
LOG_ACCESSSKEYID should be LOG_ACCESSKEYID