lamaani / kubernetesjoboperator
An Airflow operator that executes a task in a Kubernetes cluster, given a Kubernetes YAML configuration or an image reference.
We use the official Helm chart apache-airflow/airflow. When we try this operator, starting the pod fails with the following error:
airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.operations.CreateNamespaceResource, Forbidden: jobs.batch is forbidden: User "system:serviceaccount:airflow:airflow-worker" cannot create resource "jobs" in API group "batch" in the namespace "airflow"
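The error says the airflow-worker service account cannot create jobs in the batch API group in the airflow namespace. Below is a minimal sketch of the missing RBAC objects. They are built as plain Python dicts so they can be dumped to JSON and applied with kubectl apply -f. The role name airflow-job-runner is hypothetical. The exact verbs and resources the operator needs are an assumption, based on the runner logs further down, which show it watching pods, services, jobs and deployments (and, in a later version, configmaps and secrets). Trim the list to what your job bodies actually create.

```python
import json
from typing import Dict, List, Tuple

MANAGE_VERBS = ["create", "get", "list", "watch", "delete"]


def job_runner_rbac(
    namespace: str, service_account: str, role_name: str = "airflow-job-runner"
) -> Tuple[Dict, Dict]:
    """Build a Role and RoleBinding (plain dicts) for a job-launching service account.

    The resource/verb list is an assumption, not taken from the operator's docs.
    """
    rules: List[Dict] = [
        # The Forbidden error names this one: resource "jobs" in API group "batch".
        {"apiGroups": ["batch"], "resources": ["jobs"], "verbs": list(MANAGE_VERBS)},
        # Core-group kinds the runner watches or may create.
        {"apiGroups": [""], "resources": ["pods", "services", "configmaps", "secrets"],
         "verbs": list(MANAGE_VERBS)},
        # Container logs are read through the pods/log subresource.
        {"apiGroups": [""], "resources": ["pods/log"], "verbs": ["get"]},
        # Deployments live in the "apps" group.
        {"apiGroups": ["apps"], "resources": ["deployments"], "verbs": list(MANAGE_VERBS)},
    ]
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": role_name, "namespace": namespace},
        "rules": rules,
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": role_name, "namespace": namespace},
        "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "Role", "name": role_name},
        "subjects": [{"kind": "ServiceAccount", "name": service_account, "namespace": namespace}],
    }
    return role, binding


if __name__ == "__main__":
    # Values taken from the error message above.
    for manifest in job_runner_rbac("airflow", "airflow-worker"):
        print(json.dumps(manifest, indent=2))
```

Save each printed document to a file and apply it with kubectl apply -f. A namespaced Role, rather than a ClusterRole, keeps the grant limited to the namespace the jobs run in.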
Hello again,
I nearly have a working job workflow. One problem I have left is that completed/failed pods are not cleaned up according to the JobRunnerDeletePolicy I use.
Simply put, when I use JobRunnerDeletePolicy.IfSucceeded, the job is properly deleted but my pods are still present, in the Completed state. I would like the operator to clean up my pods as well once the job is done. Of course, this should depend on the cleanup policy; it's useful to be able to inspect pods for failed job runs.
Thanks, and sorry for my many requests :)
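When a Job is deleted but its pods remain in the Completed state, the delete request did not cascade. For batch/v1 Jobs, an API delete without a propagationPolicy has historically orphaned the pods, and newer API servers return a warning to that effect. If you control the job body, one workaround is the one used in the test_job.yaml of a later report in this list: add the foregroundDeletion finalizer to the Job metadata so the pods are removed before the Job object goes away. The helper name with_foreground_deletion is hypothetical. Check the behaviour on your cluster version before relying on it.

```python
import copy
from typing import Dict


def with_foreground_deletion(job_body: Dict) -> Dict:
    """Return a copy of a Job manifest that lists the foregroundDeletion finalizer.

    Hypothetical helper: the input dict is not modified, and calling it twice
    does not add the finalizer twice.
    """
    body = copy.deepcopy(job_body)
    finalizers = body.setdefault("metadata", {}).setdefault("finalizers", [])
    if "foregroundDeletion" not in finalizers:
        finalizers.append("foregroundDeletion")
    return body
```

The returned dict can be passed as the operator's body. Working on a copy keeps a shared template manifest unchanged across tasks.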
Describe the bug
I am getting this error in our DAGs that run KubernetesJobOperator:
To Reproduce
Steps to reproduce the behavior:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from airflow.utils.dates import days_ago
dag_name = 'logs-job-operator'
default_args = {"owner": "leandro", "start_date": days_ago(2), "retries": 0}
with DAG(
    dag_name,
    default_args=default_args,
    description='dag_anomaly',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['ml'],
) as dag:
    start = EmptyOperator(
        task_id="start",
        dag=dag
    )
    complete = EmptyOperator(
        task_id="complete",
        dag=dag
    )
    manifest_job = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": "job-operator-example",
            "namespace": "default"
        },
        "spec": {
            "completions": 10,
            "parallelism": 10,
            "backoffLimit": 10,
            "template": {
                "spec": {
                    "nodeSelector": {
                        "agentpool": "userpool"
                    },
                    "containers": [
                        {
                            "name": "job-operator-example",
                            "image": "sikwan/random-json-logger:latest"
                        }
                    ],
                    "restartPolicy": "OnFailure"
                }
            }
        }
    }
    k8sJobOperator = KubernetesJobOperator(task_id="test-job-success", body=manifest_job, dag=dag)
    start >> k8sJobOperator >> complete
Expected behavior
No error is raised during job execution.
Environment
Airflow is deployed on Azure Kubernetes Service with the KubernetesExecutor, which spawns the worker pods in the same AKS cluster.
Kubernetes Version 1.24.9
Airflow Version 2.4.3
airflow_kubernetes_job_operator-2.0.12
Log
[2023-02-23, 14:40:56 UTC] {queries.py:79} INFO - [anomaly/pods/test-job-success-job-operator-example-of4vaqie-q8rwm]: {"@timestamp": "2023-02-23T14:40:56+0000", "level": "DEBUG", "message": "first loop completed."}
[2023-02-23, 14:40:56 UTC] {queries.py:79} INFO - [anomaly/pods/test-job-success-job-operator-example-of4vaqie-vvn6v]: {"@timestamp": "2023-02-23T14:40:56+0000", "level": "ERROR", "message": "something happened in this execution."}
[2023-02-23, 14:40:56 UTC] {client.py:485} ERROR - Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 443, in _error_catcher
yield
File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 815, in read_chunked
self._update_chunk_length()
File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 745, in _update_chunk_length
line = self._fp.fp.readline()
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.7/ssl.py", line 1071, in recv_into
return self.read(nbytes, buffer)
File "/usr/local/lib/python3.7/ssl.py", line 929, in read
return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _execute_query
self.query_loop(client)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 346, in query_loop
raise ex
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 339, in query_loop
return super().query_loop(client)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 390, in query_loop
raise ex
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 348, in query_loop
for line in self._read_response_stream_lines(response):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 183, in _read_response_stream_lines
for chunk in response.stream(decode_content=False):
File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 623, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 844, in read_chunked
self._original_response.close()
File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 460, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))
Complete log:
dag_id=logs-job-operator_run_id=manual__2023-02-23T14_31_39.933191+00_00_task_id=test-job-success_attempt=1.log
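The traceback shows the log stream (response.stream inside _read_response_stream_lines) dying with ConnectionResetError, which urllib3 re-raises as ProtocolError. Below is a generic sketch of the reconnect-and-resume pattern a log follower can use. It is not the operator's code. follow_with_reconnect and its open_stream callable are hypothetical, and the skip-on-replay logic assumes each reopened stream starts again from the beginning.

```python
import time
from typing import Callable, Iterable, Iterator, Tuple, Type


def follow_with_reconnect(
    open_stream: Callable[[], Iterable[str]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionResetError,),
    max_retries: int = 5,
    backoff_seconds: float = 1.0,
) -> Iterator[str]:
    """Yield lines from a stream, reopening it after a dropped connection.

    Assumes open_stream() replays the stream from the start on every call
    (as a pod log read without sinceTime would), so lines already yielded
    are skipped after a reconnect.
    """
    delivered = 0  # lines already handed to the caller
    retries = 0
    while True:
        try:
            for index, line in enumerate(open_stream()):
                if index < delivered:
                    continue  # replayed line, already yielded before the drop
                delivered += 1
                yield line
            return  # stream finished normally
        except retry_on:
            retries += 1
            if retries > max_retries:
                raise
            time.sleep(backoff_seconds * retries)  # linear backoff between attempts
```

With urllib3-backed streams you would pass retry_on=(ConnectionResetError, urllib3.exceptions.ProtocolError). Replaying and skipping is simple but downloads the log again. The Kubernetes log API's sinceTime parameter would avoid that, at the cost of tracking timestamps.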
Hello,
I am trying to run a long job (long = over 5 minutes) using the Kubernetes Job Operator. After 5 minutes, the DAG crashes because of a client issue. In my real-life setup I receive a "Connection reset by peer" error. I wrote a sample to illustrate the issue. It also crashes after 5 minutes but it yields a different error (see here).
My questions:
To Reproduce
Here is a DAG and a job YAML file to reproduce the issue. The DAG only has one task that echoes a first message ("start"), sleeps for 10 minutes and then echoes a second message ("done").
DAG:
import datetime as dt
from typing import Union
import pendulum
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import JobRunnerDeletePolicy, KubernetesJobOperator
def create_dag(
    dag_name: str,
    environment: str,
    start_date: dt.datetime,
    schedule_interval: Union[dt.timedelta, str],
):
    default_args = {
        "owner": "Airflow",
        "start_date": start_date,
        "retries": 0,
    }
    dag = DAG(
        f"{dag_name}_{environment}",
        default_args=default_args,
        description="Test a 'connection reset by peer' issue with K8S jobs.",
        schedule_interval=schedule_interval,
        max_active_runs=1,
    )
    k8s_task = KubernetesJobOperator(
        task_id=f"{dag_name}_task",
        dag=dag,
        body_filepath="dags/templates/test_job.yaml",
        config_file="<my-cluster-config>",
        namespace="<my-namespace>",
        in_cluster=False,
        delete_policy=JobRunnerDeletePolicy.IfSucceeded,
    )
    dag >> k8s_task
    return dag
k8s_dag = create_dag(
    dag_name="test_connection_reset",
    environment="dev",
    start_date=dt.datetime(2021, 2, 15, tzinfo=pendulum.timezone("Europe/Brussels")),
    schedule_interval="0 6 * * *",
)
YAML file:
apiVersion: batch/v1
kind: Job
metadata:
  finalizers:
    - foregroundDeletion
spec:
  backoffLimit: 2
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: test-connection-reset-by-peer
          image: python:3.8.6-slim
          imagePullPolicy: Always
          command: [ "bash" ]
          args: [ "-c", "echo 'start' && sleep 600 && echo 'done'" ]
I use Airflow 1.10.13 and Kubernetes Operator 1.0.14.
Describe the bug
[2021-08-03 22:14:52,663] WARNING - /usr/local/lib/python3.8/dist-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use kubernetes.client.models.V1Volume.
[2021-08-03 22:14:52,665] WARNING - /usr/local/lib/python3.8/dist-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use kubernetes.client.models.V1VolumeMount.
To Reproduce
Steps to reproduce the behavior:
Look at the scheduler logs.
Expected behavior
No deprecation warnings should be shown.
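The warning itself names kubernetes.client.models.V1Volume and V1VolumeMount as replacements. Until the operator stops importing the deprecated backcompat helpers, the warnings can be filtered with the standard library. Below is a minimal sketch. It assumes the filter is installed in the same process before the operator is imported, for example at the top of a DAG file. The module regex targets the module that the log attributes the warning to. This hides the symptom; it does not remove the deprecated import.

```python
import warnings

# Regex matched (from the start) against the module the warning is attributed to;
# per the log above, that is the provider's backcompat converters module.
BACKCOMPAT_MODULE = r"airflow\.providers\.cncf\.kubernetes\.backcompat\..*"


def silence_backcompat_deprecations() -> None:
    """Ignore DeprecationWarnings raised from the cncf.kubernetes backcompat package only."""
    warnings.filterwarnings("ignore", category=DeprecationWarning, module=BACKCOMPAT_MODULE)
```

Scoping the filter by module keeps other deprecation warnings visible, including those from your own DAG code.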
ORIGINAL POST FROM: @teaglebuilt
I am looking for a way to execute each task without restarting pods for every task, even if it is a job. Do you know how this could be possible?
Originally posted by @teaglebuilt in #40 (comment)
I'm working with this operator to launch Spark applications on an AWS Kubernetes cluster.
The operator needs to get the Succeeded or Failed state from the SparkApplication and delete it via the API.
An example of the kubectl command is:
kubectl delete sparkapplications.sparkoperator.k8s.io -n fargate application-test-7g3yjf3k
Where application-test-7g3yjf3k is the name of the application.
An example of code that registers those applications with the operator:
def parse_my_resource_state(body) -> KubeResourceState:
    if "status" in body:
        state = body["status"]["applicationState"]["state"]
        if state == "COMPLETED":
            return KubeResourceState.Succeeded
        elif state == "FAILED":
            return KubeResourceState.Failed
    return KubeResourceState.Running
KubeApiConfiguration.register_kind(
    name="SparkApplication",
    api_version="sparkoperator.k8s.io/v1beta2",
    parse_kind_state=parse_my_resource_state,
    auto_include_in_watch=True,
)
Finally, I think the changes related to deleting the Spark application could be implemented in job_runner.py (https://github.com/LamaAni/KubernetesJobOperator/blob/f63f10330affa6c66bb59fd1bad3b5533c56aec3/airflow_kubernetes_job_operator/job_runner.py). The runner would validate the final state of the job and filter the resources to get the application name, so that a SparkApplication is deleted differently from a Job.
If you need more information about how the Spark operator works, I can explain in more detail.
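The proposal has two parts: decide from the final state and the delete policy whether to delete, then issue the delete that the kubectl command above performs. Both can be sketched as follows. DeletePolicy is a hypothetical local stand-in for JobRunnerDeletePolicy. Only IfSucceeded and Always appear in this thread; IfFailed and Never are assumed to exist alongside them.

```python
from enum import Enum


class DeletePolicy(str, Enum):
    """Hypothetical local mirror of the operator's JobRunnerDeletePolicy values."""
    Never = "Never"
    Always = "Always"
    IfFailed = "IfFailed"
    IfSucceeded = "IfSucceeded"


def should_delete(policy: DeletePolicy, succeeded: bool) -> bool:
    """Decide whether a finished resource should be deleted under the given policy."""
    if policy is DeletePolicy.Always:
        return True
    if policy is DeletePolicy.IfSucceeded:
        return succeeded
    if policy is DeletePolicy.IfFailed:
        return not succeeded
    return False  # DeletePolicy.Never


def custom_object_path(api_version: str, namespace: str, plural: str, name: str) -> str:
    """REST path of a namespaced custom resource such as a SparkApplication."""
    group, version = api_version.split("/", 1)
    return f"/apis/{group}/{version}/namespaces/{namespace}/{plural}/{name}"
```

For the kubectl example above, custom_object_path("sparkoperator.k8s.io/v1beta2", "fargate", "sparkapplications", "application-test-7g3yjf3k") gives the path that a DELETE request targets. With the official kubernetes Python client, the equivalent call is CustomObjectsApi().delete_namespaced_custom_object(group, version, namespace, plural, name).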
I developed a pipeline using this lovely package locally and truly loved it. However, after moving all my code to run on Kubernetes, I have not had much success.
I get this error (official Airflow Docker image + pip3 install airflow_kubernetes_job_operator):
Broken DAG: [/git/repo/components/airflow/dags/full_refresh_dag.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py", line 15, in <module>
from airflow_kubernetes_job_operator.kubernetes_legacy_pod_generators import (
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kubernetes_legacy_pod_generators.py", line 21, in <module>
from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
ModuleNotFoundError: No module named 'airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env'
I have tried different Airflow and Python versions, to no avail.
I have tried to patch these files (the files in the backcompat directory) by loading them into a Dockerfile. Although this is not optimal, it did bypass this error. However, upon running the task I got:
return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]
TypeError: argument of type 'NoneType' is not iterable
(queries.py)
I have tried different files, pods, and jobs; nothing works.
Everything still works smoothly with the same code, synced from the same GitHub repo, on my Windows PC and my Linux laptop. That makes me suspect that the image itself might be the issue, but I haven't had time to test that theory. I have sadly reverted to the KubernetesPodOperator, but I hope you find this helpful.
Thank you for your wonderful work.
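A ModuleNotFoundError at import time suggests that the installed cncf.kubernetes provider no longer ships the backcompat module that kubernetes_legacy_pod_generators.py imports. That would also explain why the same code works on machines with a different provider version. Below is a small standard-library check you can run inside the image to confirm which modules exist. module_available is a hypothetical helper.

```python
import importlib.util


def module_available(dotted_name: str) -> bool:
    """True if dotted_name can be found; False if it or one of its parents is missing.

    Note: find_spec imports the parent packages to locate the submodule.
    """
    try:
        return importlib.util.find_spec(dotted_name) is not None
    except ImportError:
        # A missing parent package (or a parent that is not a package) raises instead of returning None.
        return False


if __name__ == "__main__":
    print(module_available("airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env"))
```

Running it in the Docker image and on the laptop where the DAG works should show whether the two environments differ on this module.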
Describe the bug
Trying a simple batch job using docker-desktop failed with the following error:
ERROR - ('Namespace was not provided in yaml and auto namespace resolution failed.', Exception('Could not resolve current namespace, you must provide a namespace or a context file', TypeError("argument of type 'NoneType' is not iterable")))
Traceback (most recent call last):
File "/usr/lib/python3.7/site-packages/airflow_kubernetes_job_operator/job_runner.py", line 81, in get_current_namespace
contexts, active_context = kubernetes.config.list_kube_config_contexts()
File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 621, in list_kube_config_contexts
loader = _get_kube_config_loader_for_yaml_file(config_file)
File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 613, in _get_kube_config_loader_for_yaml_file
**kwargs)
File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 153, in __init__
self.set_active_context(active_context)
File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 173, in set_active_context
context_name = self._config['current-context']
File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 495, in __getitem__
v = self.safe_get(key)
File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 491, in safe_get
key in self.value):
TypeError: argument of type 'NoneType' is not iterable
Task configuration:
job_task = KubernetesJobOperator(
    task_id="test-job",
    dag=dag,
    image="ubuntu",
    in_cluster=False,
    cluster_context='docker-desktop',
    config_file='/usr/local/airflow/include/.kube/config',
    command=["bash", "-c", 'echo "all ok"'],
)
I am running Airflow in a Docker Compose setup (using the Astronomer CLI). docker-desktop uses https://kubernetes.docker.internal:6443 as the server endpoint, which is reachable from inside the Airflow container.
KubernetesBatchJob Library Version:
0.2.12
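The traceback shows get_current_namespace calling kubernetes.config.list_kube_config_contexts() with no arguments. Namespace resolution therefore reads the default kubeconfig (KUBECONFIG or ~/.kube/config), not the config_file given to the operator. The TypeError is consistent with that loaded config being None, for example because the file is empty. The simplest workaround is to pass namespace explicitly, as another report in this list does. For reference, here is a sketch of how a namespace is resolved from a parsed kubeconfig dict. namespace_for_context is hypothetical, and the fallback to "default" follows kubectl's convention.

```python
from typing import Any, Dict, Optional


def namespace_for_context(kubeconfig: Optional[Dict[str, Any]], context: Optional[str] = None) -> Optional[str]:
    """Return the namespace of `context` (or of current-context) in a parsed kubeconfig.

    Returns None when the config is empty or the context is not found.
    """
    if not kubeconfig:
        return None  # empty or unparsed file: the case that surfaces as "'NoneType' is not iterable"
    name = context or kubeconfig.get("current-context")
    for entry in kubeconfig.get("contexts") or []:
        if entry.get("name") == name:
            # A context without an explicit namespace means "default".
            return (entry.get("context") or {}).get("namespace", "default")
    return None
```

Returning None rather than raising lets the caller print an error that names the kubeconfig path it actually read.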
Describe the bug
Airflow: 2.5.0
kubernetesjoboperator: 2.0.12
Description:
The task's log output is incomplete. When a task runs for more than 4 hours, the Airflow task log stops updating and shows the error "Task is not able to be run", while the container in Kubernetes still outputs logs normally.
Hello,
I am currently trying to use this operator for my use cases. Thanks for the work; I really like the possibility of using jobs rather than pods. I use Helm regularly and wanted to propose a feature that would make this operator work nicely with Helm/Go-templated specification files.
Templated specification file
Allow fields to be specified as Go-templated values, à la Helm. The following example is exactly how I currently specify my config/secret volumes in Helm:
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      ...
      volumes:
        - name: config-volume
          configMap:
            name: {{ .Values.image.releaseName }}
        - name: secrets-volume
          secret:
            secretName: {{ .Values.image.releaseName }}-secrets
The values themselves could be passed to the constructor of the operator. For example:
job_task = KubernetesJobOperator(
    ...
    body_filepath="my_job.yaml",
    template_values={"image": {"releaseName": "my-first-release"}},
)
Advantages
Interoperability between Helm and Airflow: take your Helm job template file and move it to Airflow. No need for additional work/testing.
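Below is a minimal sketch of the proposed substitution. It supports only plain {{ .Values.path.to.key }} references, with no pipelines, conditionals or functions, so it is not a Go template engine. render_values is hypothetical, and the values dict is the one from the template_values example above. Rendering happens on the raw text before YAML parsing, so values that contain YAML-significant characters would need quoting.

```python
import re
from typing import Any, Dict

# Matches {{ .Values.a.b.c }} with optional inner whitespace.
_VALUE_REF = re.compile(r"\{\{\s*\.Values\.([A-Za-z0-9_.]+)\s*\}\}")


def render_values(template: str, values: Dict[str, Any]) -> str:
    """Replace each {{ .Values.x.y }} reference with values["x"]["y"].

    A missing key raises KeyError instead of silently rendering an empty string.
    """
    def lookup(match):
        node: Any = values
        for key in match.group(1).split("."):
            node = node[key]
        return str(node)

    return _VALUE_REF.sub(lookup, template)
```

Raising on a missing key is a deliberate choice: a half-rendered manifest that still parses as YAML is harder to debug than an immediate error.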
Alternatives I considered
When using KubernetesJobOperator in Apache Airflow, Jinja templating of jinja_job_args treats Airflow time macros as strings. Could we provide a mechanism similar to templates_dict in KubernetesPodOperator?
e.g.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='@daily',
)
my_task = KubernetesPodOperator(
    task_id='my_task',
    namespace='my_namespace',
    image='my_image:latest',
    cmds=['/bin/bash', '-c'],
    arguments=['echo {{ ds }}'],  # Time macro used directly; bash -c expects the whole command as one string
    templates_dict={'my_jinja_template': '{{ ds }}'},  # Template for Jinja templating (templates_dict is a PythonOperator argument; check that your KubernetesPodOperator version accepts it)
    dag=dag,
)
Ability to pass environment variables through the Kubernetes job operator into the container
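The operator accepts the job body as a dict (body=manifest_job in the first report above). One way to pass variables today is therefore to inject them into every container before handing the body to the operator. with_env is a hypothetical helper. As a design choice, variables already set on a container are left untouched.

```python
import copy
from typing import Dict


def with_env(job_body: Dict, env: Dict[str, str]) -> Dict:
    """Return a copy of a Job manifest with `env` added to every container.

    Variables a container already defines are not overridden.
    """
    body = copy.deepcopy(job_body)
    containers = body["spec"]["template"]["spec"]["containers"]
    for container in containers:
        existing = container.setdefault("env", [])
        defined = {item["name"] for item in existing}
        for name, value in env.items():
            if name not in defined:
                # Kubernetes env values must be strings.
                existing.append({"name": name, "value": str(value)})
    return body
```

Explicit per-container settings win, so a shared default can be injected without breaking containers that need a different value.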
Describe the bug
The job can't start; it fails with the following error:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kubernetes_job_operator.py", line 368, in execute
on_kube_api_event=lambda event: self.handle_kube_api_event(
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/job_runner.py", line 302, in execute_job
watchable_kinds = GetAPIVersions.get_existing_api_kinds(self.client, all_kinds)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 410, in get_existing_api_kinds
return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]
File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 410, in <listcomp>
return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]
TypeError: argument of type 'NoneType' is not iterable
To Reproduce
Run any job
Env:
Airflow docker 2.3.3
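The failing expression is the list comprehension at queries.py line 410: apis is None, so k.api_version in apis raises. Below is a sketch of a defensive version of that filter. It assumes None means API-group discovery failed, for example because the service account cannot list API groups. Kind and existing_kinds are hypothetical stand-ins, not the operator's classes.

```python
from typing import Iterable, List, NamedTuple, Optional


class Kind(NamedTuple):
    """Hypothetical stand-in for the operator's resource-kind objects."""
    name: str
    api_version: str


def existing_kinds(all_kinds: Iterable[Kind], apis: Optional[Iterable[str]]) -> List[Kind]:
    """Keep core (v1) kinds plus kinds whose api_version the cluster reports."""
    if apis is None:
        # Fail with a readable message instead of "argument of type 'NoneType' is not iterable".
        raise RuntimeError(
            "API discovery returned no group versions; check that the service account "
            "can list API groups (GET /apis)"
        )
    available = set(apis)
    return [k for k in all_kinds if k.api_version == "v1" or k.api_version in available]
```

Raising an explicit error is a deliberate choice. Silently falling back to v1 kinds would stop the runner from watching the Job itself (batch/v1).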
Hello,
I'm running into an issue where pod logs are written twice to the Airflow logs. I run a simple "hello world" command and get the expected output twice in the Airflow logs. The pod logs show only one line as expected. Here is my config and output:
** Spec file **
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      imagePullSecrets:
        - name: my-private-registry
      restartPolicy: Never
      containers:
        - name: hello-python
          image: python:3.8.6-slim
          command: ["python", "-c", "print('Hello Python!')"]
          imagePullPolicy: Always
** kubectl logs from-image-aaggosqx-9jtkf **
Hello Python
** Airflow logs **
*** Reading local file: /opt/airflow/logs/job-tester/from-image/2020-11-10T10:14:29.942399+00:00/1.log
[2020-11-10 10:14:43,708] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: job-tester.from-image 2020-11-10T10:14:29.942399+00:00 [queued]>
[2020-11-10 10:14:43,795] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: job-tester.from-image 2020-11-10T10:14:29.942399+00:00 [queued]>
[2020-11-10 10:14:43,797] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-11-10 10:14:43,799] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-11-10 10:14:43,802] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-11-10 10:14:43,870] {taskinstance.py:900} INFO - Executing <Task(KubernetesJobOperator): from-image> on 2020-11-10T10:14:29.942399+00:00
[2020-11-10 10:14:43,880] {standard_task_runner.py:53} INFO - Started process 376 to run task
[2020-11-10 10:14:44,168] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: job-tester.from-image 2020-11-10T10:14:29.942399+00:00 [running]> 3248fd2ede4d
[2020-11-10 10:14:44,622] {job_runner.py:255} INFO - {job-runner}: Executing context: my-cluster
[2020-11-10 10:14:44,623] {job_runner.py:255} INFO - {job-runner}: Executing cluster: my-cluster
[2020-11-10 10:14:44,626] {job_runner.py:255} INFO - {job-runner}: Started watcher for kinds: pod, service, job, deployment
[2020-11-10 10:14:44,628] {job_runner.py:255} INFO - {job-runner}: Watching namespaces: my-namespace
[2020-11-10 10:14:44,638] {job_runner.py:255} INFO - {job-runner}: Waiting for my-namespace/from-image-aaggosqx to finish...
[2020-11-10 10:14:44,922] {operations.py:62} INFO - [my-namespace/jobs/from-image-aaggosqx] created
[2020-11-10 10:14:44,940] {watchers.py:276} INFO - [my-namespace/pods/from-image-aaggosqx-9jtkf] Pending
[2020-11-10 10:14:45,085] {watchers.py:276} INFO - [my-namespace/jobs/from-image-aaggosqx] Pending
[2020-11-10 10:14:45,088] {watchers.py:276} INFO - [my-namespace/jobs/from-image-aaggosqx] Running
[2020-11-10 10:14:49,576] {watchers.py:276} INFO - [my-namespace/pods/from-image-aaggosqx-9jtkf] Succeeded
[2020-11-10 10:14:49,600] {watchers.py:276} INFO - [my-namespace/jobs/from-image-aaggosqx] Succeeded
[2020-11-10 10:14:49,605] {job_runner.py:255} INFO - {job-runner}: Job Succeeded
[2020-11-10 10:14:49,606] {job_runner.py:255} INFO - {job-runner}: Deleting resources due to policy: IfSucceeded
[2020-11-10 10:14:49,608] {job_runner.py:255} INFO - {job-runner}: Deleting job..
[2020-11-10 10:14:49,609] {job_runner.py:255} INFO - {job-runner}: Deleting objects: my-namespace/jobs/from-image-aaggosqx
[2020-11-10 10:14:49,770] {queries.py:57} INFO - [my-namespace/pods/from-image-aaggosqx-9jtkf]: Hello Python
[2020-11-10 10:14:49,856] {queries.py:57} INFO - [my-namespace/pods/from-image-aaggosqx-9jtkf]: Hello Python
[2020-11-10 10:14:49,858] {operations.py:62} INFO - [my-namespace/jobs/from-image-aaggosqx] deleted
[2020-11-10 10:14:49,861] {job_runner.py:255} INFO - {job-runner}: Job deleted
[2020-11-10 10:14:49,863] {job_runner.py:255} INFO - {job-runner}: Client stopped, execution completed.
[2020-11-10 10:14:49,903] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=job-tester, task_id=from-image, execution_date=20201110T101429, start_date=20201110T101443, end_date=20201110T101449
[2020-11-10 10:14:53,589] {logging_mixin.py:112} INFO - [2020-11-10 10:14:53,588] {local_task_job.py:103} INFO - Task exited with return code 0
This feels like an issue where two watchers would be attached instead of one, but I'm not sure. Any idea?
We are facing this issue in the Kubernetes Python client:
It has been open for more than a year. As far as I understand, once the client is instantiated there is no way to refresh its credentials, because it caches them. As a result, when the credentials expire, client calls respond with 401. Meanwhile, people have developed workarounds in their own solutions. The most popular seems to be creating a new client with fresh credentials every time the client is needed.
That is what I'm proposing. Because this workaround could be ugly for operator users who are not affected by this issue, we should consider putting it behind a feature toggle, for example an operator parameter.
This is a naive proposal given my limited knowledge of the topic, and any other proposal would be appreciated.
In our case, we can't run any job that lasts more than 15 minutes. Our pipelines crash, and our pods aren't deleted and become stale, which increases our infrastructure costs since we are using AWS Fargate.
Describe alternatives you've considered
Catch 401 responses and refresh the credentials at that point.
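The catch-and-refresh alternative can be sketched as a wrapper that rebuilds the client once when a call fails with HTTP 401. RefreshingClient is hypothetical. It relies only on the fact that the Kubernetes client's ApiException carries the HTTP status code in .status.

```python
from typing import Callable, Generic, TypeVar

C = TypeVar("C")
T = TypeVar("T")


class RefreshingClient(Generic[C]):
    """Hold an API client and rebuild it once when a call fails with HTTP 401."""

    def __init__(self, factory: Callable[[], C]) -> None:
        self._factory = factory
        self._client = factory()

    def call(self, fn: Callable[[C], T]) -> T:
        try:
            return fn(self._client)
        except Exception as exc:
            # kubernetes.client.exceptions.ApiException exposes the HTTP code as `.status`.
            if getattr(exc, "status", None) != 401:
                raise
        # Credentials were rejected: build a fresh client and retry exactly once.
        self._client = self._factory()
        return fn(self._client)
```

With the official client, factory could be lambda: kubernetes.config.new_client_from_config(config_file=kube_config_yaml). Loading the config again re-runs the exec plugin (here aws eks get-token) and so picks up a fresh token. Long-lived watch and log streams opened before the refresh would still need to be reopened.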
Additional context
We are running everything on AWS. We have MWAA (Airflow 2.0) with DAGs running tasks like this:
KubernetesLegacyJobOperator(
    task_id="task1",
    namespace="fargate",
    config_file=kube_config_yaml,
    get_logs=True,
    startup_timeout_seconds=300,
    body_filepath="/usr/local/airflow/dags/config/task1.yaml",
    dag=pipeline,
    is_delete_operator_pod=True,
    delete_policy="Always",
    execution_timeout=timedelta(hours=1)
)
Yaml file looks like
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  annotations:
    kubernetes_job_operator.main_container: "spark-kubernetes-driver"
  name: "foo"
  namespace: fargate
spec:
  arguments:
    ...
  deps:
    files:
      - "local:///etc/spark/conf/default-logstash-fields.properties"
  driver:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    env:
      ....
    initContainers:
      - command:
          - sh
          - "-c"
          - "echo hi"
        image: busybox
        name: "volume-mount-hack"
        volumeMounts:
          - mountPath: /tmp/committer
            name: "staging-vol"
            readOnly: false
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 4086M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
      - command:
          - "/fluent-bit/bin/fluent-bit"
          - "-c"
          - "/tmp/fluent-bit/fluent-bit-custom.conf"
        image: "fluent/fluent-bit:1.7"
        name: "fluent-bit"
        resources:
          limits:
            cpu: 50m
            memory: 60Mi
          requests:
            cpu: 5m
            memory: 10Mi
        volumeMounts:
          - mountPath: "/tmp/spark-logs"
            name: "spark-logs"
            readOnly: false
          - mountPath: "/tmp/fluent-bit"
            name: "fluent-bit"
            readOnly: false
    volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
      - mountPath: /tmp/committer
        name: "staging-vol"
        readOnly: false
  dynamicAllocation:
    enabled: true
    initialExecutors: 4
    maxExecutors: 4
    minExecutors: 2
  executor:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    deleteOnTermination: true
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 6134M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
      - command:
          - "/fluent-bit/bin/fluent-bit"
          - "-c"
          - "/tmp/fluent-bit/fluent-bit-custom.conf"
        image: "fluent/fluent-bit:1.7"
        name: "fluent-bit"
        resources:
          limits:
            cpu: 50m
            memory: 60Mi
          requests:
            cpu: 5m
            memory: 10Mi
        volumeMounts:
          - mountPath: "/tmp/spark-logs"
            name: "spark-logs"
            readOnly: false
          - mountPath: "/tmp/fluent-bit"
            name: "fluent-bit"
            readOnly: false
    volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
      - mountPath: /tmp/committer
        name: "staging-vol"
        readOnly: false
  hadoopConf:
    fs.s3.maxRetries: '10'
    fs.s3a.aws.credentials.provider: ...
    fs.s3a.block.size: 64M
    fs.s3a.buffer.dir: /tmp/committer/buffer
    fs.s3a.committer.magic.enabled: 'false'
    fs.s3a.committer.name: partitioned
    fs.s3a.committer.staging.abort.pending.uploads: 'false'
    "fs.s3a.committer.staging.conflict-mode": replace
    fs.s3a.committer.staging.tmp.path: "file:///tmp/committer/staging"
    fs.s3a.connection.ssl.enabled: 'false'
    fs.s3a.experimental.fadvise: random
    fs.s3a.fast.upload.buffer: disk
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.multipart.purge: 'false'
    fs.s3a.retry.throttle.interval: 10000ms
  image: "spark:3.1.1-v1.0.7"
  imagePullPolicy: IfNotPresent
  mainApplicationFile: "s3a://foo.jar"
  mainClass: com.foo.FooApp
  mode: cluster
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      ...
  restartPolicy:
    type: Never
  sparkConf:
    ...
  sparkConfigMap: "spark-conf-map-foo"
  sparkVersion: '3.1.1'
  template:
    metadata:
      labels:
        app: "foo-pod"
  type: Scala
  volumes:
    - emptyDir: {}
      name: "spark-logs"
    - configMap:
        name: "fluent-bit-conf-map"
      name: "fluent-bit"
    - name: "staging-vol"
      persistentVolumeClaim:
        claimName: "data-staging-share"
As you can see, we are running spark applications.
Here is the kube conf file:
apiVersion: v1
clusters:
  - cluster:
      certificate-authority-data: ***
    name: ****
contexts:
  - context:
      cluster: ****
      user: ****
    name: aws
current-context: aws
kind: Config
preferences: {}
users:
  - name: ****
    user:
      exec:
        apiVersion: ****
        args:
          - --region
          - ***
          - eks
          - get-token
          - --cluster-name
          - ****
        command: /usr/local/airflow/.local/bin/aws
Here is how we get the token, and the reason it lasts only 15 minutes:
https://awscli.amazonaws.com/v2/documentation/api/latest/reference/eks/get-token.html
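aws eks get-token prints an ExecCredential JSON document whose status carries the token and its expirationTimestamp, which is why the credentials only last about 15 minutes. Below is a sketch that checks that timestamp so a caller can refresh the client before the token expires. token_expires_soon is hypothetical. The timestamp format (UTC, ending in Z) is an assumption to verify against your CLI version.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Optional


def token_expires_soon(
    exec_credential_json: str,
    margin: timedelta = timedelta(minutes=2),
    now: Optional[datetime] = None,
) -> bool:
    """True if the ExecCredential's token expires within `margin` of `now`."""
    status = json.loads(exec_credential_json).get("status") or {}
    expires_raw = status.get("expirationTimestamp")
    if not expires_raw:
        return False  # no expiry advertised; nothing to refresh proactively
    # Assumed format, e.g. "2021-09-15T10:45:00Z".
    expires = datetime.strptime(expires_raw, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return expires - now <= margin
```

Checking ahead of time avoids losing a DELETE call, such as the one in the log below, to a 401. A reactive retry on 401 is still useful as a fallback.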
And here is an example of a failed client call in the log:
[2021-09-15 10:31:15,682] {{client.py:483}} ERROR - Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 334, in query_loop
collection_formats={},
File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
_preload_content, _request_timeout, _host)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
_request_timeout=_request_timeout)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 420, in request
body=body)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 270, in DELETE
body=body)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 233, in request
raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '49ea69bb-f188-4620-aa0e-e4e57ba77e95', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 15 Sep 2021 10:31:15 GMT', 'Content-Length': '129'})
HTTP response body: {'kind': 'Status', 'apiVersion': 'v1', 'metadata': {}, 'status': 'Failure', 'message': 'Unauthorized', 'reason': 'Unauthorized', 'code': 401}
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.7/site-packages/zthreading/tasks.py", line 173, in _run_as_thread
rslt = self.action(*args, **kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _exdcute_query
self.query_loop(client)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 375, in query_loop
raise err
airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.operations.DeleteNamespaceResource, Unauthorized: Unauthorized
Describe the bug
When secrets are passed, the pod is unable to find them.
Hi,
We are using KubernetesLegacyJobOperator to manage our Spark-based applications with Airflow. In our setup we allocate two containers (spark-kubernetes-driver and fluent-bit) within the same pod, and this causes the following error:
airflow_kubernetes_job_operator.kube_api.queries.GetPodLogs, Bad Request: a container name must be specified for pod application-name, choose one of: [spark-kubernetes-driver fluent-bit]
We managed to make it work by setting the get_logs parameter to False, but we would like to have access to the logs in the Airflow task.
Is there any way to get them? I looked but had no luck finding an answer.
Thanks in advance!
A fragment of the stack trace:
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'cef97a1d-ee58-4bc1-b220-63338af4f3ee', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Mon, 26 Apr 2021 11:14:05 GMT', 'Content-Length': '259'})
HTTP response body: {'kind': 'Status', 'apiVersion': 'v1', 'metadata': {}, 'status': 'Failure', 'message': 'a container name must be specified for pod application-y761881y, choose one of: [spark-kubernetes-driver fluent-bit]', 'reason': 'BadRequest', 'code': 400}
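The Kubernetes logs API needs an explicit container name when a pod has more than one container, and that is what this 400 error says. The SparkApplication spec in the credentials report above sets a kubernetes_job_operator.main_container annotation to spark-kubernetes-driver, the same container named here. Below is a sketch that adds that annotation to a body dict. with_main_container is hypothetical. Whether your operator version reads the annotation for this kind of resource is an assumption to check.

```python
import copy
from typing import Dict

# Annotation key as it appears in the SparkApplication spec earlier in this thread.
MAIN_CONTAINER_ANNOTATION = "kubernetes_job_operator.main_container"


def with_main_container(body: Dict, container_name: str) -> Dict:
    """Return a copy of `body` annotated with the container whose logs should be followed."""
    out = copy.deepcopy(body)
    annotations = out.setdefault("metadata", {}).setdefault("annotations", {})
    annotations[MAIN_CONTAINER_ANNOTATION] = container_name
    return out
```

Keeping the sidecar (fluent-bit) out of the log stream is usually what you want here, since its output goes to its own log pipeline anyway.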
Hello.
There are no logs from jobs in our environment, although 'get_logs' is True.
We use a service account with some permissions granted; maybe it needs more?
Logs
[2022-10-06, 04:21:04 UTC] {taskinstance.py:1397} INFO - Executing <Task(KubernetesJobOperator): dbt_eds_daily> on 2022-10-05 04:21:00+00:00
[2022-10-06, 04:21:04 UTC] {standard_task_runner.py:52} INFO - Started process 28952 to run task
[2022-10-06, 04:21:04 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_daily_1', 'dbt_eds_daily', 'scheduled__2022-10-05T04:21:00+00:00', '--job-id', '260441', '--raw', '--subdir', 'DAGS_FOLDER/dags/eds/dbt_daily.py', '--cfg-path', '/tmp/tmprbn4so5r', '--error-file', '/tmp/tmpqbjlb0e3']
[2022-10-06, 04:21:04 UTC] {standard_task_runner.py:80} INFO - Job 260441: Subtask dbt_eds_daily
[2022-10-06, 04:21:04 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:528: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
[2022-10-06, 04:21:04 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:528: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
[2022-10-06, 04:21:04 UTC] {task_command.py:371} INFO - Running <TaskInstance: dbt_daily_1.dbt_eds_daily scheduled__2022-10-05T04:21:00+00:00 [running]> on host airflow-etl-worker-1.airflow-etl-worker.dapa.svc.dapak
[2022-10-06, 04:21:04 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:202: AirflowContextDeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds }}' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2022-10-06, 04:21:04 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:202: AirflowContextDeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2022-10-06, 04:21:05 UTC] {taskinstance.py:1589} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=dbt_daily_1
AIRFLOW_CTX_TASK_ID=dbt_eds_daily
AIRFLOW_CTX_EXECUTION_DATE=2022-10-05T04:21:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-10-05T04:21:00+00:00
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Executing context: dapak
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Executing cluster: dapak
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Started watcher for kinds: pod, service, job, deployment, configmap, secret
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Watching namespaces: dapa-jobs
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Waiting for dapa-jobs/dbt-eds-daily to finish...
[2022-10-06, 04:21:05 UTC] {watchers.py:330} INFO - [dapa-jobs/jobs/dbt-eds-daily] Pending
[2022-10-06, 04:21:05 UTC] {watchers.py:330} INFO - [dapa-jobs/secrets/dbt-spark-profiles] Active
[2022-10-06, 04:21:05 UTC] {operations.py:62} INFO - [dapa-jobs/secrets/dbt-spark-profiles] created
[2022-10-06, 04:21:05 UTC] {operations.py:62} INFO - [dapa-jobs/jobs/dbt-eds-daily] created
[2022-10-06, 04:21:05 UTC] {watchers.py:330} INFO - [dapa-jobs/jobs/dbt-eds-daily] Running
[2022-10-06, 04:21:16 UTC] {watchers.py:330} INFO - [dapa-jobs/jobs/dbt-eds-daily] Succeeded
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Job Succeeded
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Deleting resources due to policy: IfSucceeded
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Deleting job..
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Deleting objects: dapa-jobs/jobs/dbt-eds-daily, dapa-jobs/secrets/dbt-spark-profiles
[2022-10-06, 04:21:16 UTC] {operations.py:62} INFO - [dapa-jobs/jobs/dbt-eds-daily] deleted
[2022-10-06, 04:21:16 UTC] {operations.py:62} INFO - [dapa-jobs/secrets/dbt-spark-profiles] deleted
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Job deleted
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Client stopped, execution completed.
[2022-10-06, 04:21:16 UTC] {taskinstance.py:1415} INFO - Marking task as SUCCESS. dag_id=dbt_daily_1, task_id=dbt_eds_daily, execution_date=20221005T042100, start_date=20221006T042104, end_date=20221006T042116
[2022-10-06, 04:21:16 UTC] {local_task_job.py:156} INFO - Task exited with return code 0
[2022-10-06, 04:21:16 UTC] {local_task_job.py:273} INFO - 1 downstream tasks scheduled from follow-on schedule check
Versions (please complete the following information):
Additional context
K8s ACLs:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: {{ airflow_role_name }}
  namespace: {{ airflow_jobs_namespace }}
rules:
  - verbs:
      - create
      - get
      - delete
      - watch
      - list
      - patch
      - update
    apiGroups:
      - batch
    resources:
      - jobs
  - verbs:
      - create
      - get
      - delete
      - watch
      - list
      - patch
      - update
    apiGroups:
      - ''
    resources:
      - configmaps
  - verbs:
      - create
      - get
      - delete
      - watch
      - list
      - patch
      - update
    apiGroups:
      - ''
    resources:
      - secrets
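One thing worth checking: in Kubernetes RBAC, reading a pod's log requires the `get` verb on the `pods/log` subresource, and watching pods needs `get`/`list`/`watch` on `pods`. The Role above grants access only to `jobs`, `configmaps` and `secrets`, while the job-runner log shows a watcher started for `pod`. Below is a minimal sketch that evaluates a Role's `rules`, transcribed by hand into Python, against a requested verb and resource. `allows` is a hypothetical helper. It handles `*` wildcards but ignores `resourceNames`, `*/subresource` patterns, ClusterRoles and RoleBinding subjects. To ask the cluster directly, use `kubectl auth can-i get pods --subresource=log --as=system:serviceaccount:<namespace>:<name> -n <namespace>`.

```python
from typing import Dict, List

Rule = Dict[str, List[str]]


def _matches(values: List[str], wanted: str) -> bool:
    # A "*" entry in a rule field matches any value.
    return "*" in values or wanted in values


def allows(rules: List[Rule], verb: str, api_group: str, resource: str) -> bool:
    """Hypothetical helper: True if any rule grants `verb` on `resource` in `api_group`.

    `resource` may be a subresource such as "pods/log"; RBAC lists those
    explicitly, so granting "pods" alone does not grant "pods/log".
    """
    return any(
        _matches(rule.get("verbs", []), verb)
        and _matches(rule.get("apiGroups", []), api_group)
        and _matches(rule.get("resources", []), resource)
        for rule in rules
    )


ALL_VERBS = ["create", "get", "delete", "watch", "list", "patch", "update"]

# Rules of the Role above ("" is the core API group).
role_rules = [
    {"verbs": ALL_VERBS, "apiGroups": ["batch"], "resources": ["jobs"]},
    {"verbs": ALL_VERBS, "apiGroups": [""], "resources": ["configmaps"]},
    {"verbs": ALL_VERBS, "apiGroups": [""], "resources": ["secrets"]},
]

if __name__ == "__main__":
    checks = [("create", "batch", "jobs"), ("watch", "", "pods"), ("get", "", "pods/log")]
    for verb, group, res in checks:
        print(f"{verb} {group or 'core'}/{res}: {allows(role_rules, verb, group, res)}")
```

With these rules, `create` on `jobs` is allowed but `watch` on `pods` and `get` on `pods/log` are not. The rule one would typically try adding is `verbs: [get, list, watch]`, `apiGroups: ['']`, `resources: [pods, pods/log]`. Whether that alone restores the operator's log output is an assumption to verify.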
Feel free to describe here how you use this operator, what case you needed it for and, if you can, how it was implemented.