knaufk / flink-faker
A data generator source connector for Flink SQL based on data-faker.
License: Apache License 2.0
faker tables can already be used in temporal table joins (FOR SYSTEM_TIME AS OF) with processing time. To support event-time temporal table joins with faker tables directly, faker tables would need to be interpreted as a changelog in these cases (very similar to the upsert-kafka connector). This could be done by an upsert-faker connector.
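Why a changelog matters for event-time joins: once a table is read as an upsert stream, each key has a history of versions, and an event-time temporal join returns the version that was valid at the probe row's event time. The following is a minimal in-memory sketch of that lookup, assuming hypothetical names (VersionedTable, upsert, lookupAsOf); it illustrates the semantics only and is not how Flink or an upsert-faker connector would implement it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: an upsert changelog materialized as a versioned table.
// Each key keeps every version it has seen, ordered by event time.
class VersionedTable {
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    // Apply an upsert: from `eventTime` on, the row for `key` is `value`.
    void upsert(String key, long eventTime, String value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(eventTime, value);
    }

    // Event-time lookup, i.e. FOR SYSTEM_TIME AS OF t: return the latest
    // version whose timestamp is <= t, or null if the key did not exist yet.
    String lookupAsOf(String key, long t) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(t);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedTable rates = new VersionedTable();
        rates.upsert("EUR", 10L, "1.10");
        rates.upsert("EUR", 20L, "1.12");
        System.out.println(rates.lookupAsOf("EUR", 15L)); // 1.10
        System.out.println(rates.lookupAsOf("EUR", 25L)); // 1.12
        System.out.println(rates.lookupAsOf("EUR", 5L));  // null
    }
}
```

With processing time, only the current value per key is needed, which is why plain faker tables already work in that case.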
FLIP-33 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) standardizes a set of connector metrics; some of them, such as numRecordsIn, could be implemented for flink-faker, too.
Hello,
I use the faker plugin in the Flink SQL client on my standalone cluster.
I created the following table:
CREATE TEMPORARY TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT,
WATERMARK FOR log_time AS log_time - INTERVAL '15' SECONDS
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
Creating the table succeeded, but when I then ran a simple select on it:
SELECT * FROM server_logs
the following error occurred:
Flink SQL> select * from server_logs;
[ERROR] Could not execute SQL statement. Reason:
java.time.format.DateTimeParseException: Text 'Mon Dec 14 17:29:11 CET 2020' could not be parsed at index 2
The date format shown there is the one recommended in your README, so what is causing the failure?
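The text in the error has the shape produced by java.util.Date#toString(), which always uses English day and month names. One plausible cause, not confirmed in the thread, is that this string is parsed with a locale-sensitive pattern while the JVM default locale is not English. A minimal sketch of a locale-independent parse, assuming the hypothetical helper name parseDateToString, pins the locale to Locale.US:

```java
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Locale;

class DateStringParsing {
    // Pattern of java.util.Date#toString(), e.g. "Mon Dec 14 17:29:11 CET 2020".
    // Day and month names are English, so the locale is set explicitly
    // instead of relying on the JVM default locale.
    static final DateTimeFormatter DATE_TO_STRING =
            DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss zzz yyyy", Locale.US);

    static Instant parseDateToString(String text) {
        return ZonedDateTime.parse(text, DATE_TO_STRING).toInstant();
    }

    public static void main(String[] args) {
        // CET is UTC+1 in December, so 17:29:11 CET is 16:29:11 UTC.
        System.out.println(parseDateToString("Mon Dec 14 17:29:11 CET 2020"));
        // 2020-12-14T16:29:11Z

        // When a Date object is available, converting it directly avoids
        // the string round trip (and its locale and zone-name pitfalls).
        Instant direct = new Date().toInstant();
        System.out.println(direct);
    }
}
```

Converting the Date with toInstant() is the more robust choice, because three-letter zone names such as CST or IST are ambiguous when parsed back.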
Support for ROW, MAP, ARRAY and MULTISET
Executed in Flink SQL:
CREATE TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search.html|/login.html|/prod.html|cart.html|/order.html){1}''} #{regexify ''(HTTP/1.1|HTTP/2|/HTTP/1.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
CREATE TABLE client_errors (
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT
)
WITH (
'connector' = 'blackhole'
);
INSERT INTO client_errors
SELECT
log_time,
request_line,
status_code,
size
FROM server_logs
WHERE
status_code SIMILAR TO '4[0-9][0-9]'
;
select * from server_logs ;
-- How do I solve this? Do I really have to use your platform?
A column with a null rate of 0.0 can still occasionally produce null values. This is because FlinkFakerSourceFunction#generateNextRow contains the following code:
float fieldNullRate = fieldNullRates[i];
if (rand.nextFloat() > fieldNullRate) {
String value = faker.expression(fieldExpressions[i]);
row.setField(i, FakerUtils.stringValueToType(value, typeRoot));
} else {
row.setField(i, null);
}
It should be rand.nextFloat() >= fieldNullRate: rand.nextFloat() returns a value in [0, 1), so with the current check a column with a null rate of zero still receives a null value whenever nextFloat() returns exactly 0.0.
The following test code verifies this:
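import java.util.Random;

public class Test {
    public static void main(String[] args) {
        Random random = new Random();
        // Spin until nextFloat() returns exactly 0.0f. For that draw the check
        // "x > 0.0f" is false, i.e. the current code would emit a null.
        while (random.nextFloat() > 0.0f);
        System.out.println("!!!\n");
    }
}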
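The program terminates after a short time and prints !!!, because nextFloat() returns exactly 0.0f with probability 2^-24 per call (about once in 16.8 million draws).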
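The proposed fix can be isolated in a small predicate so the boundary cases are easy to check. This is a minimal sketch with a hypothetical helper name (emitValue); r stands for the value returned by Random#nextFloat().

```java
import java.util.Random;

class NullRateCheck {
    // Hypothetical helper: decides whether a field gets a generated value.
    // r is assumed to come from Random#nextFloat(), i.e. r is in [0, 1).
    // With ">=", nullRate 0.0 means never null and nullRate 1.0 means always null.
    static boolean emitValue(float r, float nullRate) {
        return r >= nullRate;
    }

    public static void main(String[] args) {
        // The boundary case from the report: nextFloat() returned exactly 0.0f.
        System.out.println(0.0f > 0.0f);           // false: old check emits null
        System.out.println(emitValue(0.0f, 0.0f)); // true: value is generated

        Random rand = new Random(42);
        int nulls = 0;
        for (int i = 0; i < 1_000_000; i++) {
            if (!emitValue(rand.nextFloat(), 0.0f)) {
                nulls++;
            }
        }
        System.out.println(nulls); // 0
    }
}
```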
A newbie question: why is there a ~ at the end of the id produced by faker?
CREATE TABLE subscriptions (
id STRING,
user_id INT,
type STRING,
start_date TIMESTAMP(3),
end_date TIMESTAMP(3),
payment_expiration TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'faker',
'fields.id.expression' = '#{Internet.uuid}',
'fields.user_id.expression' = '#{number.numberBetween ''1'',''50''}',
'fields.type.expression'= '#{regexify ''(basic|premium|platinum){1}''}',
'fields.start_date.expression' = '#{date.past ''30'',''DAYS''}',
'fields.end_date.expression' = '#{date.future ''365'',''DAYS''}',
'fields.payment_expiration.expression' = '#{date.future ''365'',''DAYS''}'
);
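A likely explanation, stated here as an assumption rather than confirmed by the thread: the ~ is added by the SQL client's table view when a value is wider than the displayed column, not by faker itself. A UUID has 36 characters, and the lorem output further down shows values cut to 29 characters plus ~, which suggests a 30-character column. A minimal sketch of that truncation rule (MAX_WIDTH and truncate are hypothetical names):

```java
class ColumnTruncation {
    // Assumed display width of 30 characters, inferred from the lorem
    // output in this thread, not taken from Flink's source code.
    static final int MAX_WIDTH = 30;

    // Keep the first MAX_WIDTH - 1 characters and mark the cut with '~'.
    static String truncate(String value) {
        if (value.length() <= MAX_WIDTH) {
            return value;
        }
        return value.substring(0, MAX_WIDTH - 1) + "~";
    }

    public static void main(String[] args) {
        String uuid = "123e4567-e89b-12d3-a456-426614174000"; // 36 characters
        System.out.println(truncate(uuid)); // 123e4567-e89b-12d3-a456-42661~
    }
}
```

If this is the cause, the stored value is the full UUID; only its rendering in the client is shortened.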
Hi there, my name is Erik Pragt, and sorry for the plug, but I'm one of the authors of Datafaker (https://www.datafaker.net), and I was wondering whether you'd consider migrating to Datafaker instead of Javafaker.
Datafaker is a fork of Javafaker, but it's a bit more modern (Java 8/11), has almost zero dependencies (since 1.1.0), has a lot of new data providers and quite a few fixed issues, and I'm planning to release roughly every month. I created Datafaker mostly because of the lack of releases of Javafaker and its inactivity on open PRs, which is something I'm hoping to do better with Datafaker.
Datafaker is mostly a drop-in replacement for Javafaker, so migrating should take little effort. I'm happy to submit a PR if you are open to the idea, but no worries if not! Cheers, Erik
java.lang.AbstractMethodError: com.github.knaufk.flink.faker.FlinkFakerTableSource$1.produceDataStream(Lorg/apache/flink/streaming/api/environment/StreamExecutionEnvironment;)Lorg/apache/flink/streaming/api/datastream/DataStream;
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan.translateToPlanInternal(CommonExecTableSourceScan.java:106)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:114)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
Although sequences are not supported by data-faker, it would be nice to support them in flink-faker, e.g., for primary key columns.
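One way such a sequence could stay unique under parallelism is to give each source subtask a disjoint arithmetic progression. The following is a minimal sketch under that assumption; SequenceGenerator and its parameters are hypothetical and not part of flink-faker.

```java
class SequenceGenerator {
    private long next;
    private final long step;

    // Hypothetical sketch: subtask `subtaskIndex` out of `parallelism` emits
    // start + subtaskIndex, start + subtaskIndex + parallelism, ...
    // so no two subtasks ever produce the same value.
    SequenceGenerator(long start, int subtaskIndex, int parallelism) {
        this.next = start + subtaskIndex;
        this.step = parallelism;
    }

    long nextValue() {
        long value = next;
        next += step;
        return value;
    }

    public static void main(String[] args) {
        SequenceGenerator s0 = new SequenceGenerator(1L, 0, 2);
        SequenceGenerator s1 = new SequenceGenerator(1L, 1, 2);
        for (int i = 0; i < 3; i++) {
            System.out.println(s0.nextValue() + " " + s1.nextValue());
        }
        // 1 2
        // 3 4
        // 5 6
    }
}
```

In a real source, the current value of next would also have to be part of checkpointed state so that a restored job continues the sequence instead of restarting it.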
Run the following SQL in SQL client:
create temporary table faker (
a string,
b string,
c string,
d string
) with (
'connector' = 'faker',
'number-of-rows' = '3',
'fields.a.expression' = '#{lorem.sentence}',
'fields.b.expression' = '#{lorem.sentence}',
'fields.c.expression' = '#{lorem.paragraph}',
'fields.d.expression' = '#{lorem.paragraph}'
);
select * from faker;
Result:
a b c d
Fugit id praesentium soluta n~ Fugit id praesentium soluta n~ A tempore numquam inventore c~ A tempore numquam inventore c~
Fugit id praesentium soluta n~ Fugit id praesentium soluta n~ A tempore numquam inventore c~ A tempore numquam inventore c~
Fugit id praesentium soluta n~ Fugit id praesentium soluta n~ A tempore numquam inventore c~ A tempore numquam inventore c~
It looks like the demo web application page is broken and clicking on it shows the following application error:
Application error An error occurred in the application and your page could not be served. If you are the application owner, [check your logs for details](https://devcenter.heroku.com/articles/logging#view-logs). You can do this from the Heroku CLI with the command
For some queries, it would be convenient to generate null values in some columns.
A per-field null rate would determine how often a field is null. It could be configured for each field via the table options and would default to 0.0.
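The proposal can be sketched as: read an optional per-field rate from the table options, default it to 0.0, validate it, and draw once per field and row. The option key fields.<name>.null-rate and all method names below are hypothetical placeholders, not a confirmed flink-faker API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

class NullRateOptions {
    // Hypothetical option key; the connector's real option name may differ.
    static String nullRateKey(String field) {
        return "fields." + field + ".null-rate";
    }

    // Read each field's null rate once, defaulting to 0.0 when not configured.
    static float[] nullRates(Map<String, String> options, String[] fields) {
        float[] rates = new float[fields.length];
        for (int i = 0; i < fields.length; i++) {
            String raw = options.get(nullRateKey(fields[i]));
            float rate = raw == null ? 0.0f : Float.parseFloat(raw);
            if (rate < 0.0f || rate > 1.0f) {
                throw new IllegalArgumentException("null rate must be in [0, 1]: " + rate);
            }
            rates[i] = rate;
        }
        return rates;
    }

    // Generate one row: each field is null with probability equal to its rate.
    static String[] generateRow(float[] rates, String[] fields, Random rand) {
        String[] row = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            // nextFloat() is in [0, 1), so ">=" makes 0.0 "never null"
            // and 1.0 "always null".
            if (rand.nextFloat() >= rates[i]) {
                row[i] = "value-" + fields[i]; // placeholder for a faker expression
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(nullRateKey("b"), "1.0");
        String[] fields = {"a", "b"};
        float[] rates = nullRates(options, fields);
        System.out.println(Arrays.toString(generateRow(rates, fields, new Random())));
        // [value-a, null]
    }
}
```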
Run the following SQL in SQL client:
create temporary table faker ( a string ) with ( 'connector' = 'faker', 'number-of-rows' = '10', 'fields.a.expression' = '#{address.city}' );
select * from faker;
Exception:
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.faker'.
Table options are:
'connector'='faker'
'fields.a.expression'='#{address.city}'
'number-of-rows'='10'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:159) ~[flink-table-api-java-uber-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184) ~[flink-table-api-java-uber-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:178) ~[?:?]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:116) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[?:?]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:198) ~[?:?]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:190) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[?:?]
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) ~[?:?]
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid expression for column "a".
at com.github.knaufk.flink.faker.FlinkFakerTableSourceFactory.readAndValidateFieldExpression(FlinkFakerTableSourceFactory.java:187) ~[flink-faker-0.4.1.jar:?]
at com.github.knaufk.flink.faker.FlinkFakerTableSourceFactory.createDynamicTableSource(FlinkFakerTableSourceFactory.java:90) ~[flink-faker-0.4.1.jar:?]
at com.github.knaufk.flink.faker.FlinkFakerTableSourceFactory.createDynamicTableSource(FlinkFakerTableSourceFactory.java:21) ~[flink-faker-0.4.1.jar:?]
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156) ~[flink-table-api-java-uber-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184) ~[flink-table-api-java-uber-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:178) ~[?:?]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:116) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[?:?]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:198) ~[?:?]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:190) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[?:?]
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) ~[?:?]
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
... 11 more
Caused by: java.lang.RuntimeException: Unable to resolve #{city_suffix} directive.
at net.datafaker.service.FakeValuesService.resolveExpression(FakeValuesService.java:446) ~[flink-faker-0.4.1.jar:?]
at net.datafaker.service.FakeValuesService.lambda$resolveExpression$1(FakeValuesService.java:450) ~[flink-faker-0.4.1.jar:?]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_252]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_252]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_252]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_252]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_252]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_252]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_252]
at net.datafaker.service.FakeValuesService.resolveExpression(FakeValuesService.java:457) ~[flink-faker-0.4.1.jar:?]
at net.datafaker.service.FakeValuesService.expression(FakeValuesService.java:407) ~[flink-faker-0.4.1.jar:?]
at net.datafaker.Faker.expression(Faker.java:713) ~[flink-faker-0.4.1.jar:?]
at com.github.knaufk.flink.faker.FlinkFakerTableSourceFactory.readAndValidateFieldExpression(FlinkFakerTableSourceFactory.java:185) ~[flink-faker-0.4.1.jar:?]
at com.github.knaufk.flink.faker.FlinkFakerTableSourceFactory.createDynamicTableSource(FlinkFakerTableSourceFactory.java:90) ~[flink-faker-0.4.1.jar:?]
at com.github.knaufk.flink.faker.FlinkFakerTableSourceFactory.createDynamicTableSource(FlinkFakerTableSourceFactory.java:21) ~[flink-faker-0.4.1.jar:?]
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156) ~[flink-table-api-java-uber-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:184) ~[flink-table-api-java-uber-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:178) ~[?:?]
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:116) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[?:?]
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[?:?]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:198) ~[?:?]
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:190) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1240) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1188) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:345) ~[?:?]
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[?:?]
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105) ~[?:?]
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:172) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
... 11 more