wangyang0918 / flink-native-k8s-operator
Flink native Kubernetes Operator is a Java-based control plane for running Apache Flink native applications on Kubernetes.
License: Apache License 2.0
This is an umbrella issue to collect features that users would like to see supported.
kubectl annotation
imageName, jarURI
the job source code for test items
Hello Yang,
I noticed a problem in the current implementation.
In https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L270
and
https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L241
you create a new clusterClient without closing it. The stuck connections pile up until the container is eventually killed with exit code 137 and restarts. This can take 10~20 hours.
I understand that clusterClient implements the AutoCloseable interface. However, we can't simply use try-with-resources to release it, because the trigger-savepoint action involves a nested CompletableFuture. Wrapping the call in try-with-resources breaks this logic and produces errors like the following.
2021-09-21 00:26:27,960 WARN org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel [] - Force-closing a channel whose registration task was not accepted by an event loop: [id: 0xadcdcf10]
java.util.concurrent.RejectedExecutionException: event executor terminated
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:471) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:421) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:344) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:258) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$23(RestClusterClient.java:777) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) [?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649) [?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
2021-09-21 00:26:27,968 ERROR org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:183) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:425) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:344) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:258) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$23(RestClusterClient.java:777) ~[flink-native-k8s-operator-1.0-SNAPSHOT.jar:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) [?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) [?:?]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649) [?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
I tried adding a Thread.sleep() inside the try body, and that makes the error go away. However, the savepoint completion time is not deterministic, for various reasons, so Thread.sleep() is not a good solution. Also, if an exception is thrown, an explicit close() statement in the try body is never reached.
I am not sure how to solve this elegantly. I would like to hear your input on this. Thanks!
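One possible direction, sketched below with a stand-in client rather than Flink's real ClusterClient: instead of try-with-resources, attach the close() call to the future itself with whenComplete, so the client is released only once the asynchronous work has finished, whether it succeeded or failed. FakeClient, triggerSavepoint() and callAndClose() are hypothetical names used only for this illustration. Whether it is safe to close the real client from the callback thread has not been verified here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class CloseAfterFuture {

    /** Hypothetical stand-in for a cluster client; this is NOT Flink's ClusterClient. */
    static final class FakeClient implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        final CompletableFuture<String> pending = new CompletableFuture<>();

        // Simulates an asynchronous call such as triggering a savepoint.
        CompletableFuture<String> triggerSavepoint() {
            return pending;
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Runs an async action and closes the client only after the returned future completes. */
    static <C extends AutoCloseable, T> CompletableFuture<T> callAndClose(
            C client, Function<C, CompletableFuture<T>> action) {
        CompletableFuture<T> future;
        try {
            future = action.apply(client);
        } catch (Throwable t) {
            // The action failed before returning a future, so close right away.
            closeQuietly(client);
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
        // whenComplete runs on both success and failure of the async work.
        return future.whenComplete((result, error) -> closeQuietly(client));
    }

    private static void closeQuietly(AutoCloseable resource) {
        try {
            resource.close();
        } catch (Exception e) {
            // A real operator would log this instead of ignoring it.
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        CompletableFuture<String> result =
                callAndClose(client, FakeClient::triggerSavepoint);
        System.out.println("closed before completion: " + client.closed.get());
        client.pending.complete("savepoint-path");
        System.out.println("closed after completion: " + client.closed.get());
        System.out.println("result: " + result.join());
    }
}
```

If closing on the callback thread turns out to be a problem, whenCompleteAsync with a separate executor is one variation to try.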
Hello Yang,
https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/flink-native-k8s-operator.yaml#L113
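I don't quite understand this apiGroup. Where is it defined, and where does it need to be used? Thanks.
Hello Yang,
I am currently evaluating options for running Flink on k8s and came across your open source project. Will the approach taken in this project become the one the Flink project mainly promotes in the future? How does it differ from open source Flink's own native k8s support? And what does this operator improve on compared with the operators from Google and other companies? Thanks.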
I've been investigating Flink on k8s recently and have seen some enhancements to Flink native k8s in Flink 1.12. I have some questions. Which of the current open source Flink operators do you think is the more complete implementation, for example Lyft's or Google's (GoogleCloudPlatform/flink-on-k8s-operator)? Are there any plans for the Flink project to maintain a Flink operator itself in the future? Thank you.
I think that with the current configuration, the operator can be deployed in one namespace (the default namespace in the current implementation) and watches all the CRD changes in that namespace. Is there a way to let the operator watch the FlinkApplication CRD in all namespaces and deploy Flink applications to different namespaces?
Hello Yang, thanks for sharing the code. After adding an ImagePullSecrets property to the code, I was able to verify all the functionality mentioned in the code except for the Ingress way of accessing the Flink WebUI.
The ADDRESS column is empty when I run kubectl get ingress.
$ kubectl get ingress
NAME                        CLASS    HOSTS                                               ADDRESS   PORTS   AGE
flink-native-k8s-operator   <none>   flink-demo-2.flink.k8s.io,flink-demo.flink.k8s.io             80      38m
$ kubectl describe ingress flink-native-k8s-operator
Name:             flink-native-k8s-operator
Namespace:        default
Address:
Default backend:  default-http-backend:80 (<error: endpoints "default-http-backend" not found>)
Rules:
  Host                       Path  Backends
  ----                       ----  --------
  flink-demo-2.flink.k8s.io
                                   flink-demo-2-rest:8081 (10.244.0.207:8081)
  flink-demo.flink.k8s.io
                                   flink-demo-rest:8081 (10.244.0.206:8081)
Annotations:      <none>
Events:           <none>
The backend IPs here are within the pod CIDR of the cluster.
I am not sure whether I missed something. Do we need to deploy an additional ingress controller (for example, nginx) besides the code you provided here?
This is probably not a smart question, given my limited knowledge of k8s. Thanks in advance for your patience.
For example, I guess we need to deploy something like this?
https://kubernetes.github.io/ingress-nginx/deploy/#bare-metal
If so, is there anything I should modify in that yaml to make it compatible with the flink.k8s.io suffix? Thanks!
When I start a Flink application, with either the Flink CLI or the operator, I get the sed errors shown further down at the beginning of the logs.
When I start the Flink application with the Flink CLI, the logs are still available. However, when I use this operator, the logs are not available.
When I create the Flink application using the Flink CLI, the flink-config-volume contains three files: flink-conf.yaml, log4j-console.properties and logback-console.xml.
However, when I create the Flink application using this operator with the same image, the auto-generated Flink ConfigMap only contains flink-conf.yaml; there is no log4j-console.properties or logback-console.xml.
Here are the arguments in the JM pod. As you can see, the command of the operator-created pod is missing the log-related options.
Flink CLI:
Args:
native-k8s
$JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
Operator created JM:
Args:
native-k8s
$JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx3462817376 -Xms3462817376 -XX:MaxMetaspaceSize=268435456 org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=429496736b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=3462817376b -D jobmanager.memory.jvm-overhead.max=429496736b
I did keep the command template in the flinkConfig part of the cr.yaml file, but for some unknown reason it doesn't help.
kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
Please share some ideas. Thanks!
I also checked this thread but didn't find anything useful. Any ideas on this? Thanks.
http://apache-flink.147419.n8.nabble.com/flink-1-11-on-kubernetes-td4586.html
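One way to read the difference between the two Args lists above is that the %logging% placeholder in the start command template expanded to nothing in the operator case. The toy sketch below shows how that would happen with plain placeholder substitution. It is not Flink's implementation; the class and method names are hypothetical, and the rule that logging options are emitted only when a log config file was found is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StartCommandTemplate {

    // Illustrative assumption: logging options are produced only if a log4j file was found.
    static String loggingOptions(boolean hasLog4j, String confDir) {
        if (!hasLog4j) {
            return "";
        }
        String file = "file:" + confDir + "/log4j-console.properties";
        return "-Dlog4j.configuration=" + file + " -Dlog4j.configurationFile=" + file;
    }

    // Replaces each %key% placeholder literally, then collapses leftover whitespace.
    static String render(String template, Map<String, String> values) {
        String command = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            command = command.replace("%" + entry.getKey() + "%", entry.getValue());
        }
        return command.trim().replaceAll("\\s+", " ");
    }

    static String startCommand(boolean hasLog4j) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("java", "$JAVA_HOME/bin/java");
        values.put("classpath", "-classpath $FLINK_CLASSPATH");
        values.put("jvmmem", "-Xmx1073741824 -Xms1073741824");
        values.put("jvmopts", "");
        values.put("logging", loggingOptions(hasLog4j, "/opt/flink/conf"));
        values.put("class",
                "org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint");
        values.put("args", "");
        return render("%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%", values);
    }

    public static void main(String[] args) {
        System.out.println("with log config:    " + startCommand(true));
        System.out.println("without log config: " + startCommand(false));
    }
}
```

Under this reading, keeping the template in cr.yaml would not help by itself, because the template is intact and only the value substituted for %logging% is empty.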
$ kubectl logs -f flink-demo-56f487f446-5w86h
sed: couldn't open temporary file /opt/flink/conf/sedx3urji: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/seduhOSTg: Read-only file system
/docker-entrypoint.sh: line 75: /opt/flink/conf/flink-conf.yaml: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/sedrn0ZFj: Read-only file system
/docker-entrypoint.sh: line 88: /opt/flink/conf/flink-conf.yaml: Read-only file system
/docker-entrypoint.sh: line 90: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Start command: $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx3462817376 -Xms3462817376 -XX:MaxMetaspaceSize=268435456 org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=429496736b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=3462817376b -D jobmanager.memory.jvm-overhead.max=429496736b
ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist_2.11-1.12.1.jar) to field java.util.Properties.serialVersionUID
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Hello @wangyang0918, I noticed an issue with exposing the Flink WebUI through the Ingress implementation.
https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkApplicationController.java#L239
If the operator is deployed in the default namespace and the application is deployed in the flink-test namespace, the Ingress is created in the default namespace along with the operator. However, such an Ingress cannot reach the rest endpoint in the flink-test namespace. Maybe we should maintain one Ingress for each namespace that has a Flink application?
Another small question: do you know how to retrieve the operator deployment name inside the operator's Java code? I can't find an API for it in the Fabric8 documentation. I want this because I am using a Helm chart, and the operator name is a dynamic value injected via {{ .Release.Name }}. I tried using an env variable, like FLINK_CONF_DIR (https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/flink-native-k8s-operator.yaml#L21), to pass the value into the code. Not sure if there are better solutions. Thanks!
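For reference, the env variable approach described above could look roughly like this on the Java side. This is only a sketch: the variable name OPERATOR_DEPLOYMENT_NAME is hypothetical and would have to be set in the Helm template of the operator Deployment, and the fallback name is an assumption taken from the Ingress name shown earlier in this thread.

```java
import java.util.Map;
import java.util.Optional;

public class OperatorName {

    // Hypothetical variable name; it must match whatever the Helm chart injects.
    static final String ENV_KEY = "OPERATOR_DEPLOYMENT_NAME";

    // Assumed fallback for deployments that do not set the variable.
    static final String DEFAULT_NAME = "flink-native-k8s-operator";

    /** Resolves the operator name from the given environment, falling back to the default. */
    static String resolve(Map<String, String> env) {
        return Optional.ofNullable(env.get(ENV_KEY))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .orElse(DEFAULT_NAME);
    }

    public static void main(String[] args) {
        // In the operator, the real process environment would be passed in.
        System.out.println(resolve(System.getenv()));
    }
}
```

Taking the environment as a Map parameter, rather than calling System.getenv() inside resolve, keeps the lookup easy to unit test.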
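Hi, I'm @abbykimi. When I ran this project in my IDE, it reported several vulnerabilities. The project uses 92 open source components, including junit:junit, and has 1 security vulnerability. I suggest upgrading.
Vulnerability title: JUnit information disclosure vulnerability
Vulnerability ID: CVE-2020-15250
Description:
JUnit is an open source Java testing framework.
JUnit4 versions before 4.13.1 contain an information disclosure vulnerability that originates in the TemporaryFolder test rule. On Unix-like systems, the system temporary directory is shared among all users of that system. As a result, files and directories written to this directory are, by default, readable by other users on the same system. This vulnerability does not allow other users to overwrite the contents of these directories or files; it is purely an information disclosure vulnerability. It affects you if your JUnit tests write sensitive information.
Affected versions: [4.7, 4.13.1)
Minimum fixed version: 4.13.1
Dependency path: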
org.apache.flink:[email protected]>io.fabric8:[email protected]>io.fabric8:[email protected]>com.squareup.okhttp3:[email protected]>junit:[email protected]
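There are several other vulnerabilities. It is too much information to paste here, so please see the full report: https://www.mfsec.cn/jr?p=i395f0
If you have any questions about this issue, feel free to reply to me (@abbykimi) and I will respond promptly.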
ping @FuyaoLi2017, I have created a new branch two-k8s-client-version showing how to use a different fabric8 Kubernetes client version.
So far I have only gotten it to compile successfully; you will need to verify the behavior.