apache / seatunnel

SeaTunnel is a next-generation, super high-performance, distributed, massive data integration tool.
Home Page: https://seatunnel.apache.org/
License: Apache License 2.0
RDD vs DataFrame vs Dataset:
https://stackoverflow.com/questions/37301226/difference-between-dataset-api-and-dataframe
https://dzone.com/articles/the-dominant-apis-of-spark-datasets-dataframes-and
http://spark.apache.org/docs/latest/streaming-programming-guide.html
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
http://www.csdn.net/article/2015-04-03/2824407
https://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark
https://indatalabs.com/blog/data-engineering/convert-spark-rdd-to-dataframe-dataset
https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
Config parsing
Plugin pipeline architecture
Input, Filter, Output plugin development
End-to-end workflow simplification
Chinese and English documentation
[Go live at this milestone]
IP libraries: besides geoip2, we need to pick one China-focused IP library to support:
ipip.net (accurate inside China, inaccurate abroad)
CZ88 (纯真) IP database (accurate inside China, inaccurate abroad): http://www.cz88.net/, http://www.cnblogs.com/anpengapple/p/5384985.html
geoip2 (accurate abroad, inaccurate inside China)
filterConfObj.foreach does not iterate the filters in the top-to-bottom order in which the plugins appear in the config file.
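A likely cause is that the parsed config stores plugin sections in an unordered map. One way to keep file order is to parse each object into an ordered list of pairs instead; a Python sketch with the standard json module (the config keys here just mirror the example config below, the real parser is HOCON-based):

```python
import json

# A SeaTunnel-style config where filter order matters (Split must run
# before Sql). object_pairs_hook=list keeps every JSON object as an
# ordered list of (key, value) pairs, so file order survives parsing.
conf_text = '{"filter": {"Split": {"source_field": "raw_message"}, "Sql": {"table_name": "test"}}}'

pairs = json.loads(conf_text, object_pairs_hook=list)
filters = dict(pairs)["filter"]
print([name for name, _ in filters])  # ['Split', 'Sql'] -- file order kept
```

Iterating `filters` then applies the plugins in exactly the order the user wrote them.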
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://www.slideshare.net/julesdamji/a-deep-dive-into-structured-streaming-in-apache-spark
example:
if [a.b.c != 3] {
} else {
}
references:
logstash configuration
http://enear.github.io/2016/03/31/parser-combinators/
https://github.com/sirthias/parboiled2
http://www.lihaoyi.com/fastparse/
https://gist.github.com/nicerobot/4189552
https://dzone.com/articles/rolling-your-own-dsl-in-scala
https://www.slideshare.net/abhijit.sharma/writing-dsls-in-scala
https://www.slideshare.net/holograph/a-field-guide-to-dsl-design-in-scala
http://jmespath.org/
http://www.antlr.org/
Search keyword: Grammars, workflows, DSL, expression, eval
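Whatever parser generator is chosen, evaluating a condition like `[a.b.c != 3]` boils down to resolving a dotted field path against the nested config and applying an operator. A minimal Python sketch of that evaluation step (the operators and path syntax are assumptions, not the final grammar):

```python
# Minimal sketch of evaluating a condition like [a.b.c != 3] against a
# nested config, in the spirit of the DSL above.
def lookup(conf, dotted_path):
    """Resolve 'a.b.c' against nested dicts."""
    node = conf
    for key in dotted_path.split("."):
        node = node[key]
    return node

def eval_condition(conf, path, op, value):
    left = lookup(conf, path)
    return left != value if op == "!=" else left == value

conf = {"a": {"b": {"c": 3}}}
print(eval_condition(conf, "a.b.c", "!=", 3))  # False: a.b.c equals 3
```

The parser's job is then only to produce the `(path, op, value)` triples and the if/else branch structure.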
https://interestinglab.github.io/waterdrop/#/
Description
The plugins mainly involved are the Split and Sql filters.
In the filter.Sql plugin, the sql field loses its last character when read.
app.conf
spark {
spark.streaming.batchDuration = 5
spark.master = "local[2]"
spark.app.name = "Waterdrop-1"
spark.ui.port = 13000
}
input {
kafka {
topics = "sinabip_test"
consumer.auto.offset.reset = "largest"
}
}
filter {
Split {
source_field = "raw_message"
fields = ["times", "info"]
}
Sql {
table_name = "test",
sql = "select info from test where info='hello'"
}
}
output {
Stdout {}
}
Config printed in the Main function:
[INFO] Parsed Config:
{
"filter" : [
{
"entries" : {
"fields" : [
"times",
"info"
],
"source_field" : "raw_message"
},
"name" : "Split"
},
{
"entries" : {
"sql" : "select info from test where info='hello",
"table_name" : "test"
},
"name" : "Sql"
}
],
"input" : [
{
"name" : "kafka"
}
],
"output" : [
{
"entries" : {},
"name" : "Stdout"
}
],
"spark" : {
"spark" : {
"app" : {
"name" : "Waterdrop-1"
},
"master" : "local[2]",
"streaming" : {
"batchDuration" : 5
},
"ui" : {
"port" : 13000
}
}
}
}
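The truncated sql value in the parsed output above (the closing quote of 'hello' is gone) is the classic signature of an off-by-one slice when stripping surrounding quotes. A hypothetical Python illustration of the symptom only, not the actual Waterdrop code:

```python
# Hypothetical: if quote-stripping code slices one character too many,
# the value's real last character is dropped.
sql = "select info from test where info='hello'"
truncated = sql[:-1]   # off-by-one slice chops the closing quote
print(truncated)       # select info from test where info='hello
```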
Antlr tutorial:
https://tomassetti.me/antlr-mega-tutorial/
http://sqtds.github.io/tags/antlr4/
https://alexecollins.com/antlr4-and-maven-tutorial/
http://meri-stuff.blogspot.com/2011/09/antlr-tutorial-expression-language.html#LexerBasics
http://progur.com/2016/09/how-to-create-language-using-antlr4.html
https://yq.aliyun.com/articles/11366
http://www.cnblogs.com/sld666666/p/6145854.html
http://blog.csdn.net/dc_726/article/details/45399371
https://github.com/antlr/antlr4/blob/master/doc/index.md
https://github.com/antlr/grammars-v4/blob/master/json/JSON.g4
https://plugins.jetbrains.com/plugin/7358-antlr-v4-grammar-plugin
https://stackoverflow.com/questions/21534316/is-there-a-simple-example-of-using-antlr4-to-create-an-ast-from-java-source-code
https://stackoverflow.com/questions/23092081/antlr4-visitor-pattern-on-simple-arithmetic-example
https://stackoverflow.com/questions/6487593/what-does-fragment-mean-in-antlr
http://floris.briolas.nl/floris/2008/10/antlr-common-pittfals/
https://github.com/odiszapc/nginx-java-parser
https://codevomit.wordpress.com/2015/04/25/antlr4-project-with-maven-tutorial-episode-3/
https://stackoverflow.com/questions/1931307/antlr-is-there-a-simple-example
https://stackoverflow.com/questions/29971097/how-to-create-ast-with-antlr4
Listener vs Vistor:
https://stackoverflow.com/questions/20714492/antlr4-listeners-and-visitors-which-to-implement?rq=1
http://jakubdziworski.github.io/java/2016/04/01/antlr_visitor_vs_listener.html
ANTLRv4: How to read double quote escaped double quotes in string?
nested boolean expression parsing:
https://stackoverflow.com/questions/25096713/parser-lexer-logical-expression
https://stackoverflow.com/questions/30976962/nested-boolean-expression-parser-using-antlr
parsing comment:
https://stackoverflow.com/questions/7070763/parse-comment-line?rq=1
https://stackoverflow.com/questions/28674875/antlr-4-how-to-parse-comments
http://meri-stuff.blogspot.com/2012/09/tackling-comments-in-antlr-compiler.html
design pattern: visitor
https://dzone.com/articles/design-patterns-visitor
books:
"The Definitive Antlr4 Reference"
Things we could do this week:
(1) Filter UDFs:
a) Find the full list of UDFs that ship with Spark SQL and see what they can do; later the Waterdrop docs can reference these UDFs, which users can use directly;
b) For the Filters we plan to implement, can we also provide matching UDFs, and if so, how?
c) Do our Filters overlap with Spark SQL's built-in UDFs, and can we reuse them?
(2) Finalize the BaseFilter interface definition;
one approach is to go through all the Filter plugins and see what BaseFilter interface they need;
the other is to think through how a future user would use the BaseFilter interface to develop a plugin of their own.
(3) Support multiple inputs and outputs in the pipeline code.
(4) [Only after the first three are done] Finalize the BaseInput and BaseOutput interface definitions; this involves a few tricky technical points, more on that next week.
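For (2), the shape of the interface can be prototyped before committing to Scala signatures. A Python sketch of the contract under discussion (all names and the row-batch model are hypothetical, chosen to match the Split example config elsewhere in these notes):

```python
from abc import ABC, abstractmethod

class BaseFilter(ABC):
    """Hypothetical plugin contract: each filter receives its config
    entries and transforms one batch of rows into another."""

    def __init__(self, entries):
        self.entries = entries   # this plugin's section from the config

    @abstractmethod
    def process(self, rows):
        """Transform a batch of row dicts; return the new batch."""

class Split(BaseFilter):
    def process(self, rows):
        src = self.entries["source_field"]
        fields = self.entries["fields"]
        # split the source field and fan the parts out into named fields
        return [dict(row, **dict(zip(fields, row[src].split(" ", len(fields) - 1))))
                for row in rows]

rows = [{"raw_message": "2018-01-01 hello"}]
out = Split({"source_field": "raw_message", "fields": ["times", "info"]}).process(rows)
print(out[0]["info"])  # hello
```

A user-written plugin would then only subclass BaseFilter and implement process; the pipeline code stays unchanged.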
markdown + docsify
For the plugin architecture and concrete implementations, see:
logstash
hangout
Spark SQL UDF,UDAF
Hive UDF
flume
fluent
kafka stream
https://github.com/onurakpolat/awesome-bigdata#data-ingestion
Filter plugin requests:
Distinct, Sample
User request:
repartition, to increase or decrease parallelism and the number of output files.
scala code format:
http://scalameta.org/scalafmt/
travis.ci
Debug mode: make it easy for users to see how the data changes at each stage.
Local mode: use Spark's local mode so users can debug and develop locally.
Visualize the Filter pipeline.
Work out the application scenarios for users, and simplify the corresponding deployment and run workflow.
You can use the pipe() function on RDDs to call external code. It passes data to an external program through stdin / stdout. For Spark Streaming, you would do dstream.transform(rdd => rdd.pipe(...)) to call it on each RDD.
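The stdin/stdout contract that rdd.pipe() relies on can be shown with a plain subprocess, no Spark required: lines go to the child's stdin, transformed lines come back on its stdout (the child program here is a stand-in for any external tool):

```python
import subprocess, sys

# Same contract as rdd.pipe(): feed lines to the child's stdin, read
# transformed lines back from its stdout. The child here is a trivial
# Python one-liner standing in for an arbitrary external program.
child = [sys.executable, "-c",
         "import sys\nfor line in sys.stdin: print(line.strip().upper())"]
result = subprocess.run(child, input="hello\nworld\n",
                        capture_output=True, text=True, check=True)
print(result.stdout.split())  # ['HELLO', 'WORLD']
```

In Spark itself this becomes `rdd.pipe("your-program")`, with the same one-line-in, one-line-out convention.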
Flume, Http
Clone, Dict, Geoip
Http
https://docs.databricks.com/spark/latest/data-sources/index.html
https://www.slideshare.net/databricks/yin-huai-20150325meetupwithdemos
https://mapr.com/blog/spark-data-source-api-extending-our-spark-sql-query-engine/
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-datasource-api.html
http://www.spark.tc/exploring-the-apache-spark-datasource-api/
https://mapr.com/blog/how-integrate-custom-data-sources-apache-spark/
https://blog.cloudera.com/blog/2017/02/working-with-udfs-in-apache-spark/
Support passing Spark settings (e.g. resource sizes) as start-waterdrop.sh command-line arguments, taking precedence over application.conf, similar to spark-submit.
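The precedence rule itself is a simple merge: command-line values override file values, the same way spark-submit --conf does. A sketch (flag syntax and keys are illustrative):

```python
# Sketch of the precedence rule: settings given on the
# start-waterdrop.sh command line override application.conf.
def merge_conf(file_conf, cli_conf):
    merged = dict(file_conf)
    merged.update(cli_conf)   # command line wins
    return merged

file_conf = {"spark.executor.memory": "1g", "spark.app.name": "Waterdrop-1"}
cli_conf = {"spark.executor.memory": "4g"}
print(merge_conf(file_conf, cli_conf)["spark.executor.memory"])  # 4g
```

Keys absent from the command line keep their application.conf values.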
Chinese documentation progress:
Configuration
(Garyelephant) Common options
Input plugins
Filter plugins
Output plugins
English documentation progress:
5 important future directions for Waterdrop:
Support the Flink/Beam compute engines
Support [stateful] [real-time] [aggregate] computation on Flink (user-specified time granularity, dimensions, and metrics)
Interactive UI (interactive pipeline building, interactive SQL execution, visual tools for feature and performance diagnosis)
Performance optimization deep in Spark/Flink internals, driven by application scenarios
Grow the number of companies running Waterdrop in production (technical support for companies in China, English community promotion)
Self-service, interactive problem diagnosis and performance tuning, see Alibaba Arthas
Add page comments/discussion, e.g. Disqus
Add real-time feedback:
https://www.hotjar.com
Applies to filters such as grok and split.
With \s:
pattern = "\s"
running fails with an error.
With \\s:
pattern = "\\s"
it works, because configParse unescapes the backslash.
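What configParse does to backslashes can be shown with JSON, whose quoted-string escaping HOCON shares: the parser consumes one level of escaping, so the file must say "\\s" for the regex engine to see \s.

```python
import json, re

# The parser consumes one level of escaping: "\\s" in the file becomes
# the two-character regex \s after parsing.
good = json.loads(r'{"pattern": "\\s"}')["pattern"]
print(good)                           # \s  (a valid whitespace regex)
print(bool(re.search(good, "a b")))   # True: matches the space

try:
    json.loads(r'{"pattern": "\s"}')  # single backslash: invalid escape
except json.JSONDecodeError:
    print("a lone \\s in the file is rejected by the parser")
```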
Current directory structure:
filter/
├── BaseFilter.scala
└── Split.scala
└── Sql.scala
Wouldn't it be better to add a directory level above the concrete files? Some plugins cannot be implemented in a single file, and keeping everything in one flat directory is hard to manage. Taking grok as an example:
filter/
├── BaseFilter.scala
├── grok
│ ├── Grok.scala
│ └── PatternGrok.scala
├── split
│ └── Split.scala
└── sql
└── Sql.scala
When target_field is ***ROOT***, the schema must be derived from the raw data, and there is currently no solution for that.
After upgrading Spark to 2.2, the accompanying Scala upgrade to 2.11 breaks compatibility with Spark 1.6.
JDK 1.7 compatibility.
Wanted to create a test directory src/test/java and a class with a Main method, org.interestinglab.waterdrop.WaterDropTest; running it fails with:
Error:(13, 14) BoolExprBaseVisitor is already defined as object BoolExprBaseVisitor
public class BoolExprBaseVisitor<T> extends AbstractParseTreeVisitor<T> implements BoolExprVisitor<T> {
^