kubernetesjoboperator's People

Contributors

dependabot[bot], jvalencia-cbs, lamaani, mgoldbas

kubernetesjoboperator's Issues

Cannot create resource "jobs"

We use the official apache-airflow/airflow Helm chart. When trying this operator, starting the pod fails with the following error:

airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.operations.CreateNamespaceResource, Forbidden: jobs.batch is forbidden: User "system:serviceaccount:airflow:airflow-worker" cannot create resource "jobs" in API group "batch" in the namespace "airflow"

BUG: Delete pods after job completion

Hello again,

I nearly have a working job workflow. One problem I have left is that completed/failed pods are not cleaned up according to the JobRunnerDeletePolicy I use.

Simply put, when using JobRunnerDeletePolicy.IfSucceeded, the job is properly deleted but my pods are still present, in the Completed state. I would like the operator to clean up my pods as well once the job is done. Of course this should depend on the cleanup policy; it's useful to be able to inspect pods for failed job runs.
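
In the meantime, here is a rough workaround sketch (an assumption on my side, not the operator's delete policy): remove the Completed pods a finished job leaves behind by selecting them through the standard job-name label that the Job controller adds to its pods. The namespace and job name below are placeholders.

from kubernetes import client, config

config.load_kube_config()  # or config.load_incluster_config() when running inside the cluster
core = client.CoreV1Api()

# Delete every pod created by the (hypothetical) finished job "my-finished-job".
core.delete_collection_namespaced_pod(
    namespace="default",                        # assumption: the job ran in "default"
    label_selector="job-name=my-finished-job",  # standard label set by the Job controller
)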

Thanks, and sorry for my many requests :)

ConnectionResetError: [Errno 104] Connection reset by peer

Describe the bug
I am getting this error in our DAGs running KubernetesJobOperator:

To Reproduce
Steps to reproduce the behavior:

  1. Create an Airflow DAG with the following code:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from airflow.utils.dates import days_ago

dag_name = 'logs-job-operator'

default_args = {"owner": "leandro", "start_date": days_ago(2), "retries": 0}

with DAG(
    dag_name,
    default_args=default_args,
    description='dag_anomaly',
    schedule_interval= None,
    start_date=days_ago(1),
    tags=['ml'],
) as dag:

    start = EmptyOperator(
        task_id="start",
        dag=dag
    )
    complete = EmptyOperator(
        task_id="complete",
        dag=dag
    )


    manifest_job = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": "job-operator-example",
            "namespace": "default"
        },
        "spec": {
            "completions": 10,
            "parallelism": 10,
            "backoffLimit": 10,
            "template": {
                "spec": {
                    "nodeSelector": {
                        "agentpool": "userpool"
                    },
                    "containers": [
                        {
                            "name": "job-operator-example",
                            "image": "sikwan/random-json-logger:latest"
                        }
                    ],
                    "restartPolicy": "OnFailure"
                }
            }
        }
    }


    k8sJobOperator = KubernetesJobOperator(task_id="test-job-success", body=manifest_job, dag=dag)

    start >> k8sJobOperator >> complete
  2. Run the DAG
  3. In our environment the error occurs after about 5 minutes of execution

Expected behavior
No error raised during job execution.

Screenshots
If applicable, add screenshots to help explain your problem.

Environment
Airflow is deployed on Azure Kubernetes Service with the KubernetesExecutor, which spawns the worker pods in the same AKS cluster.
Kubernetes Version 1.24.9
Airflow Version 2.4.3
airflow_kubernetes_job_operator-2.0.12

Log

[2023-02-23, 14:40:56 UTC] {queries.py:79} INFO - [anomaly/pods/test-job-success-job-operator-example-of4vaqie-q8rwm]: {"@timestamp": "2023-02-23T14:40:56+0000", "level": "DEBUG", "message": "first loop completed."}
[2023-02-23, 14:40:56 UTC] {queries.py:79} INFO - [anomaly/pods/test-job-success-job-operator-example-of4vaqie-vvn6v]: {"@timestamp": "2023-02-23T14:40:56+0000", "level": "ERROR", "message": "something happened in this execution."}
[2023-02-23, 14:40:56 UTC] {client.py:485} ERROR - Traceback (most recent call last):

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 443, in _error_catcher
    yield

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 815, in read_chunked
    self._update_chunk_length()

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 745, in _update_chunk_length
    line = self._fp.fp.readline()

  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)

  File "/usr/local/lib/python3.7/ssl.py", line 1071, in recv_into
    return self.read(nbytes, buffer)

  File "/usr/local/lib/python3.7/ssl.py", line 929, in read
    return self._sslobj.read(len, buffer)

ConnectionResetError: [Errno 104] Connection reset by peer


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _execute_query
    self.query_loop(client)

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 346, in query_loop
    raise ex

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 339, in query_loop
    return super().query_loop(client)

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 390, in query_loop
    raise ex

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 348, in query_loop
    for line in self._read_response_stream_lines(response):

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 183, in _read_response_stream_lines
    for chunk in response.stream(decode_content=False):

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 623, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 844, in read_chunked
    self._original_response.close()

  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 460, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)

urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))

Complete log:
dag_id=logs-job-operator_run_id=manual__2023-02-23T14_31_39.933191+00_00_task_id=test-job-success_attempt=1.log

Connection reset by peer on long job run

Hello,

I am trying to run a long job (long = over 5 minutes) using the Kubernetes Job Operator. After 5 minutes, the DAG crashes because of a client issue. In my real-life setup I receive a "Connection reset by peer" error. I wrote a sample to illustrate the issue. It also crashes after 5 minutes but it yields a different error (see here).

My questions:

  • Why does the connection crash 5 minutes after the pod starts?
  • Is there anything I can do to prevent that?

To Reproduce
Here is a DAG and a job YAML file to reproduce the issue. The DAG only has one task that echoes a first message ("start"), sleeps for 10 minutes and then echoes a second message ("done").

DAG:

import datetime as dt
from typing import Union

import pendulum
from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import JobRunnerDeletePolicy, KubernetesJobOperator

def create_dag(
    dag_name: str,
    environment: str,
    start_date: dt.datetime,
    schedule_interval: Union[dt.datetime, str],
):
    default_args = {
        "owner": "Airflow",
        "start_date": start_date,
        "retries": 0,
    }
    dag = DAG(
        f"{dag_name}_{environment}",
        default_args=default_args,
        description="Test a 'connection reset by peer' issue with K8S jobs.",
        schedule_interval=schedule_interval,
        max_active_runs=1,
    )

    k8s_task = KubernetesJobOperator(
        task_id=f"{dag_name}_task",
        dag=dag,
        body_filepath="dags/templates/test_job.yaml",
        config_file="<my-cluster-config>",
        namespace="<my-namespace>",
        in_cluster=False,
        delete_policy=JobRunnerDeletePolicy.IfSucceeded,
    )

    dag >> k8s_task

    return dag


k8s_dag = create_dag(
    dag_name="test_connection_reset",
    environment="dev",
    start_date=dt.datetime(2021, 2, 15, tzinfo=pendulum.timezone("Europe/Brussels")),
    schedule_interval="0 6 * * *",
)

YAML file:

apiVersion: batch/v1
kind: Job
metadata:
  finalizers:
    - foregroundDeletion
spec:
  backoffLimit: 2
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: test-connection-reset-by-peer
          image: python:3.8.6-slim
          imagePullPolicy: Always
          command: [ "bash" ]
          args: [ "-c", "echo 'start' && sleep 600 && echo 'done'" ]

I use Airflow 1.10.13 and Kubernetes Operator 1.0.14.

Investigate warnings

Describe the bug
[2021-08-03 22:14:52,663] WARNING - /usr/local/lib/python3.8/dist-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use kubernetes.client.models.V1Volume.
[2021-08-03 22:14:52,665] WARNING - /usr/local/lib/python3.8/dist-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use kubernetes.client.models.V1VolumeMount.

To Reproduce
Steps to reproduce the behavior:
Look at scheduler logs

Expected behavior
should not see warnings

FEATURE: Delete all completed tasks, regardless of the final result of the task (successful or failed).

Feature description

I'm working with this operator to launch Spark applications on an AWS Kubernetes cluster.

The operator needs to get the Succeeded or Failed state from the SparkApplication and delete it via the API.
An equivalent kubectl command is:

kubectl delete sparkapplications.sparkoperator.k8s.io -n fargate application-test-7g3yjf3k

Where application-test-7g3yjf3k is the name of the application.
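
For reference, here is a hedged sketch of the same deletion done from Python with the official kubernetes client's CustomObjectsApi (not the operator's implementation, just the API call the feature would need to make):

from kubernetes import client, config

config.load_kube_config()  # or config.load_incluster_config() inside the cluster
custom_api = client.CustomObjectsApi()

# Delete the completed SparkApplication by name, mirroring the kubectl command above.
custom_api.delete_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="fargate",
    plural="sparkapplications",
    name="application-test-7g3yjf3k",
)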

An example of code to register those applications on the operator:

def parse_my_resource_state(body) -> KubeResourceState:
    if "status" in body:
        state = body["status"]["applicationState"]["state"]
        if state == "COMPLETED":
            return KubeResourceState.Succeeded
        elif state == "FAILED":
            return KubeResourceState.Failed
    return KubeResourceState.Running


KubeApiConfiguration.register_kind(
    name="SparkApplication",
    api_version="sparkoperator.k8s.io/v1beta2",
    parse_kind_state=parse_my_resource_state,
    auto_include_in_watch=True,
)

Finally, I think the changes related to deleting the SparkApplication could be implemented in job_runner.py (https://github.com/LamaAni/KubernetesJobOperator/blob/f63f10330affa6c66bb59fd1bad3b5533c56aec3/airflow_kubernetes_job_operator/job_runner.py), with a check on the final state of the job and a filter over the resources to get the application name, so that a job defined as a SparkApplication can be deleted through its own path.

If you need more information about how the Spark operator works, I can explain in more detail.

Any specific setup on kubernetes?

I developed a pipeline using this lovely package locally and truly loved it; however, upon transitioning all my code to run on Kubernetes, I have not had much success.
I get this error (official Airflow Docker image + pip3 install airflow_kubernetes_job_operator):

Broken DAG: [/git/repo/components/airflow/dags/full_refresh_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py", line 15, in <module>
    from airflow_kubernetes_job_operator.kubernetes_legacy_pod_generators import (
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kubernetes_legacy_pod_generators.py", line 21, in <module>
    from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
ModuleNotFoundError: No module named 'airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env'

Now I have tried using different Airflow and Python versions, to no avail.
I have tried to patch these files (the files in the backcompat dir) by loading them into a Dockerfile; although not optimal, it did in fact bypass this error. However, upon running the task I got:

return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]
TypeError: argument of type 'NoneType' is not iterable

(in queries.py)
I have tried different files, pods, and jobs; nothing works.

Everything still works smoothly with the same code synced from the same GitHub repo on my Windows PC and my Linux laptop, which makes me suspect that the image itself might be the issue; however, I've had no time to test that theory. I have sadly reverted to the KubernetesPodOperator, but I hope you find this helpful.

Thank you for your wonderful work.

Namespace resolution failed

Describe the bug
Trying a simple batch job using docker-desktop failed with the following error:

ERROR - ('Namespace was not provided in yaml and auto namespace resolution failed.', Exception('Could not resolve current namespace, you must provide a namespace or a context file', TypeError("argument of type 'NoneType' is not iterable")))
Traceback (most recent call last):
  File "/usr/lib/python3.7/site-packages/airflow_kubernetes_job_operator/job_runner.py", line 81, in get_current_namespace
    contexts, active_context = kubernetes.config.list_kube_config_contexts()
  File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 621, in list_kube_config_contexts
    loader = _get_kube_config_loader_for_yaml_file(config_file)
  File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 613, in _get_kube_config_loader_for_yaml_file
    **kwargs)
  File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 153, in __init__
    self.set_active_context(active_context)
  File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 173, in set_active_context
    context_name = self._config['current-context']
  File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 495, in __getitem__
    v = self.safe_get(key)
  File "/usr/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 491, in safe_get
    key in self.value):
TypeError: argument of type 'NoneType' is not iterable

Task configuration:

job_task=KubernetesJobOperator(
    task_id="test-job",
    dag=dag,
    image="ubuntu",
    in_cluster=False,
    cluster_context='docker-desktop',
    config_file = '/usr/local/airflow/include/.kube/config',
    command=["bash", "-c", 'echo "all ok"'],
)

I am running Airflow in a Docker Compose setup (using Astronomer's CLI). docker-desktop uses https://kubernetes.docker.internal:6443 as the server endpoint, which was reachable from inside the Airflow container.
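
A hedged workaround sketch, based on the error message ("you must provide a namespace or a context file"): pass the namespace explicitly so auto namespace resolution from the kube config is not needed. Whether this bypasses the failing config load entirely is an assumption.

job_task = KubernetesJobOperator(
    task_id="test-job",
    dag=dag,
    image="ubuntu",
    namespace="default",  # explicit namespace, so auto-resolution is skipped
    in_cluster=False,
    cluster_context="docker-desktop",
    config_file="/usr/local/airflow/include/.kube/config",
    command=["bash", "-c", 'echo "all ok"'],
)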

KubernetesBatchJob Library Version:
0.2.12

To Reproduce
Steps to reproduce the behavior:

  1. Start a kubernetes cluster on docker-desktop
  2. Create a DAG with the kubernetes batch job operator (see above).
  3. Trigger DAG (e.g. via Airflow UI)
  4. Check Logs of Task
  5. See stack trace

Desktop:

  • OS: MacOs

Running task logs is incomplete

Describe the bug
Airflow: 2.5.0
kubernetesjoboperator: 2.0.12
Description:
The task's running log output is incomplete. When the task runs for more than 4 hours, the Airflow task log stops updating and shows the error "Task is not able to be run", but the container in Kubernetes still outputs logs normally.

FEATURE: templated specification file / override specific fields

Hello,

I am currently trying to use this operator for my use cases. Thanks for the work, I really like the possibility of using jobs rather than pods. I use Helm on a regular basis and I wanted to propose a feature that would make this operator work nicely with Helm/Go-templated specification files.

Feature description

Templated specification file
Allow specifying fields as Go-templated values, à la Helm. The following example is exactly how I specify my config/secret volumes in Helm at the moment:

apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      ...
      volumes:
        - name: config-volume
          configMap:
            name: {{ .Values.image.releaseName }}
        - name: secrets-volume
          secret:
            secretName: {{ .Values.image.releaseName }}-secrets

The values themselves could be passed to the constructor of the operator. For example:

job_task = KubernetesJobOperator(
    ...
    body_filepath="my_job.yaml",
    template_values={"image": {"releaseName": "my-first-release"}},
)

Advantages

Interoperability between Helm and Airflow: take your Helm job template file and move it to Airflow. No need for additional work/testing.

Alternatives I considered

  1. Implement this myself, using Jinja: very doable. I'm just suggesting something generic and familiar for all Helm users (there are dozens of us!). It's probably what I'll end up doing in the meantime; see the sketch after this list.
  2. Specify the body in Python directly: again doable, but this feels exotic coming from Helm. It reduces the interoperability between the two environments and forces the use of two formats while this is (kinda) standardised.
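
A minimal sketch of alternative 1, assuming a Jinja2-templated manifest (the Go-style .Values.image.releaseName would become Values.image.releaseName in Jinja) and the jinja2 and PyYAML packages; the template path and task id below are placeholders:

import yaml
from jinja2 import Template
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator

def render_job_body(template_path: str, values: dict) -> dict:
    # Render the Jinja-templated job manifest into a Python dict.
    with open(template_path) as f:
        rendered = Template(f.read()).render(Values=values)
    return yaml.safe_load(rendered)

body = render_job_body(
    "dags/templates/my_job.yaml.j2",  # hypothetical template path
    {"image": {"releaseName": "my-first-release"}},
)

job_task = KubernetesJobOperator(
    task_id="templated-job",
    body=body,  # pass the pre-rendered manifest instead of body_filepath
    dag=dag,
)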

FEATURE: need 'templates_dict' to ensure proper rendering of time macros.

When using KubernetesJobOperator in Apache Airflow, Jinja templating for jinja_job_args treats Airflow time macros as plain strings. Should we provide a mechanism similar to templates_dict in KubernetesPodOperator?
e.g.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

my_task = KubernetesPodOperator(
    task_id='my_task',
    namespace='my_namespace',
    image='my_image:latest',
    cmds=['/bin/bash', '-c'],
    arguments=['echo', '{{ ds }}'],  # Using time macro directly
    templates_dict={'my_jinja_template': '{{ ds }}'},  # Template for Jinja templating
    dag=dag,
)
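
One possible workaround sketch (hedged; not the project's official API): expose jinja_job_args as an Airflow-templated field by subclassing the operator, so that time macros such as {{ ds }} are rendered by Airflow before the job body is built. This assumes jinja_job_args is a plain attribute holding the dict of values passed to the job template.

from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator

class TemplatedKubernetesJobOperator(KubernetesJobOperator):
    # Extend the parent's templated fields (if any) with jinja_job_args so
    # Airflow's Jinja engine renders macros inside it at runtime.
    template_fields = (
        *getattr(KubernetesJobOperator, "template_fields", ()),
        "jinja_job_args",
    )

my_task = TemplatedKubernetesJobOperator(
    task_id="my_task",
    body_filepath="dags/templates/my_job.yaml",  # hypothetical manifest path
    jinja_job_args={"run_date": "{{ ds }}"},     # rendered to the execution date
    dag=dag,
)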

Error after upgrading to airflow 2.3.3

Describe the bug

The job can't start, with the following error:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kubernetes_job_operator.py", line 368, in execute
    on_kube_api_event=lambda event: self.handle_kube_api_event(
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/job_runner.py", line 302, in execute_job
    watchable_kinds = GetAPIVersions.get_existing_api_kinds(self.client, all_kinds)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 410, in get_existing_api_kinds
    return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 410, in <listcomp>
    return [k for k in all_kinds if k.api_version == "v1" or k.api_version in apis]
TypeError: argument of type 'NoneType' is not iterable

To Reproduce

Run any job

Env:
Airflow docker 2.3.3

Duplicate pod logs

Hello,

I'm running into an issue where pod logs are written twice to the Airflow logs. I run a simple "hello world" command and get the expected output twice in the Airflow logs. The pod logs show only one line as expected. Here is my config and output:

** Spec file **

apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      imagePullSecrets:
        - name: my-private-registry
      restartPolicy: Never
      containers:
        - name: hello-python
          image: python:3.8.6-slim
          command: ["python", "-c", "print('Hello Python!')"]
          imagePullPolicy: Always

** kubectl logs from-image-aaggosqx-9jtkf **

Hello Python

** Airflow logs **

*** Reading local file: /opt/airflow/logs/job-tester/from-image/2020-11-10T10:14:29.942399+00:00/1.log
[2020-11-10 10:14:43,708] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: job-tester.from-image 2020-11-10T10:14:29.942399+00:00 [queued]>
[2020-11-10 10:14:43,795] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: job-tester.from-image 2020-11-10T10:14:29.942399+00:00 [queued]>
[2020-11-10 10:14:43,797] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2020-11-10 10:14:43,799] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-11-10 10:14:43,802] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2020-11-10 10:14:43,870] {taskinstance.py:900} INFO - Executing <Task(KubernetesJobOperator): from-image> on 2020-11-10T10:14:29.942399+00:00
[2020-11-10 10:14:43,880] {standard_task_runner.py:53} INFO - Started process 376 to run task
[2020-11-10 10:14:44,168] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: job-tester.from-image 2020-11-10T10:14:29.942399+00:00 [running]> 3248fd2ede4d
[2020-11-10 10:14:44,622] {job_runner.py:255} INFO - {job-runner}: Executing context: my-cluster
[2020-11-10 10:14:44,623] {job_runner.py:255} INFO - {job-runner}: Executing cluster: my-cluster
[2020-11-10 10:14:44,626] {job_runner.py:255} INFO - {job-runner}: Started watcher for kinds: pod, service, job, deployment
[2020-11-10 10:14:44,628] {job_runner.py:255} INFO - {job-runner}: Watching namespaces: my-namespace
[2020-11-10 10:14:44,638] {job_runner.py:255} INFO - {job-runner}: Waiting for my-namespace/from-image-aaggosqx to finish...
[2020-11-10 10:14:44,922] {operations.py:62} INFO - [my-namespace/jobs/from-image-aaggosqx] created
[2020-11-10 10:14:44,940] {watchers.py:276} INFO - [my-namespace/pods/from-image-aaggosqx-9jtkf] Pending
[2020-11-10 10:14:45,085] {watchers.py:276} INFO - [my-namespace/jobs/from-image-aaggosqx] Pending
[2020-11-10 10:14:45,088] {watchers.py:276} INFO - [my-namespace/jobs/from-image-aaggosqx] Running
[2020-11-10 10:14:49,576] {watchers.py:276} INFO - [my-namespace/pods/from-image-aaggosqx-9jtkf] Succeeded
[2020-11-10 10:14:49,600] {watchers.py:276} INFO - [my-namespace/jobs/from-image-aaggosqx] Succeeded
[2020-11-10 10:14:49,605] {job_runner.py:255} INFO - {job-runner}: Job Succeeded
[2020-11-10 10:14:49,606] {job_runner.py:255} INFO - {job-runner}: Deleting resources due to policy: IfSucceeded
[2020-11-10 10:14:49,608] {job_runner.py:255} INFO - {job-runner}: Deleting job..
[2020-11-10 10:14:49,609] {job_runner.py:255} INFO - {job-runner}: Deleting objects: my-namespace/jobs/from-image-aaggosqx
[2020-11-10 10:14:49,770] {queries.py:57} INFO - [my-namespace/pods/from-image-aaggosqx-9jtkf]: Hello Python
[2020-11-10 10:14:49,856] {queries.py:57} INFO - [my-namespace/pods/from-image-aaggosqx-9jtkf]: Hello Python
[2020-11-10 10:14:49,858] {operations.py:62} INFO - [my-namespace/jobs/from-image-aaggosqx] deleted
[2020-11-10 10:14:49,861] {job_runner.py:255} INFO - {job-runner}: Job deleted
[2020-11-10 10:14:49,863] {job_runner.py:255} INFO - {job-runner}: Client stopped, execution completed.
[2020-11-10 10:14:49,903] {taskinstance.py:1065} INFO - Marking task as SUCCESS.dag_id=job-tester, task_id=from-image, execution_date=20201110T101429, start_date=20201110T101443, end_date=20201110T101449
[2020-11-10 10:14:53,589] {logging_mixin.py:112} INFO - [2020-11-10 10:14:53,588] {local_task_job.py:103} INFO - Task exited with return code 0

This feels like an issue where two watchers would be attached instead of one, but I'm not sure. Any idea?

FEATURE: Refresh aws token

Feature description

  • what is the functionality you are trying to add/what is the problem you are trying to solve?

So basically we are facing this issue located in the k8s python client:

kubernetes-client/python#741

This has been open for more than a year. As far as I understand, once the client is instantiated there is no way to refresh its credentials, since it caches them. Because of that, when the credentials expire, client calls respond with 401. Meanwhile, people have developed workarounds in their own solutions; the most popular seems to be creating a new client with fresh credentials every time the client is called.

So that's basically what I'm proposing. Because this workaround could be ugly for people using the operator who are not affected by this issue, we should consider some kind of feature toggle via operator params or something like that (a rough sketch follows after the alternatives below).

This is my naive proposal given my limited knowledge here, but any other proposal would be appreciated.

  • what's the importance? what does it affect?

In our case, we can't run any job that lasts more than 15 minutes, so our pipelines crash and our pods aren't deleted and become stale (increasing our infra costs, since we are using AWS Fargate).

Describe alternatives you've considered
Try to catch 401 responses and refresh credentials there somehow.
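
A rough, hedged sketch of the two ideas above combined (an assumption about how it could be wired, not the operator's current behaviour): rebuild the kubernetes client from the kube config, so the exec-based EKS token is regenerated, and retry once on a 401.

from kubernetes import client, config
from kubernetes.client.exceptions import ApiException

def fresh_api_client(config_file: str) -> client.ApiClient:
    # Reload the kube config; the exec credential plugin runs again and yields a fresh token.
    conf = client.Configuration()
    config.load_kube_config(config_file=config_file, client_configuration=conf)
    return client.ApiClient(configuration=conf)

def call_with_refresh(do_call, config_file: str):
    # Run a request; on 401, rebuild the client once with fresh credentials and retry.
    api = fresh_api_client(config_file)
    try:
        return do_call(api)
    except ApiException as err:
        if err.status != 401:
            raise
        api = fresh_api_client(config_file)
        return do_call(api)

# Example usage (hypothetical): list jobs in the fargate namespace.
# call_with_refresh(lambda api: client.BatchV1Api(api).list_namespaced_job("fargate"), kube_config_yaml)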

Additional context

We are running everything on aws. We have MWAA (airflow 2.0) with dags running tasks like this:

KubernetesLegacyJobOperator(
    task_id="task1",
    namespace="fargate",
    config_file=kube_config_yaml,
    get_logs=True,
    startup_timeout_seconds=300,
    body_filepath="/usr/local/airflow/dags/config/task1.yaml",
    dag=pipeline,
    is_delete_operator_pod=True,
    delete_policy="Always",
    execution_timeout=timedelta(hours=1),
)

The YAML file looks like:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  annotations:
    kubernetes_job_operator.main_container: "spark-kubernetes-driver"
  name: "foo"
  namespace: fargate
spec:
  arguments:
  ...
  deps:
    files:
    - "local:///etc/spark/conf/default-logstash-fields.properties"
  driver:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    env:
    ....
    initContainers:
    - command:
      - sh
      - "-c"
      - "echo hi"
      image: busybox
      name: "volume-mount-hack"
      volumeMounts:
      - mountPath: /tmp/committer
        name: "staging-vol"
        readOnly: false
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 4086M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
    - command:
      - "/fluent-bit/bin/fluent-bit"
      - "-c"
      - "/tmp/fluent-bit/fluent-bit-custom.conf"
      image: "fluent/fluent-bit:1.7"
      name: "fluent-bit"
      resources:
        limits:
          cpu: 50m
          memory: 60Mi
        requests:
          cpu: 5m
          memory: 10Mi
      volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
    volumeMounts:
    - mountPath: "/tmp/spark-logs"
      name: "spark-logs"
      readOnly: false
    - mountPath: "/tmp/fluent-bit"
      name: "fluent-bit"
      readOnly: false
    - mountPath: /tmp/committer
      name: "staging-vol"
      readOnly: false
  dynamicAllocation:
    enabled: true
    initialExecutors: 4
    maxExecutors: 4
    minExecutors: 2
  executor:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    deleteOnTermination: true
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 6134M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
    - command:
      - "/fluent-bit/bin/fluent-bit"
      - "-c"
      - "/tmp/fluent-bit/fluent-bit-custom.conf"
      image: "fluent/fluent-bit:1.7"
      name: "fluent-bit"
      resources:
        limits:
          cpu: 50m
          memory: 60Mi
        requests:
          cpu: 5m
          memory: 10Mi
      volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
    volumeMounts:
    - mountPath: "/tmp/spark-logs"
      name: "spark-logs"
      readOnly: false
    - mountPath: "/tmp/fluent-bit"
      name: "fluent-bit"
      readOnly: false
    - mountPath: /tmp/committer
      name: "staging-vol"
      readOnly: false
  hadoopConf:
    fs.s3.maxRetries: '10'
    fs.s3a.aws.credentials.provider: ...
    fs.s3a.block.size: 64M
    fs.s3a.buffer.dir: /tmp/committer/buffer
    fs.s3a.committer.magic.enabled: 'false'
    fs.s3a.committer.name: partitioned
    fs.s3a.committer.staging.abort.pending.uploads: 'false'
    "fs.s3a.committer.staging.conflict-mode": replace
    fs.s3a.committer.staging.tmp.path: "file:///tmp/committer/staging"
    fs.s3a.connection.ssl.enabled: 'false'
    fs.s3a.experimental.fadvise: random
    fs.s3a.fast.upload.buffer: disk
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.multipart.purge: 'false'
    fs.s3a.retry.throttle.interval: 10000ms
  image: "spark:3.1.1-v1.0.7"
  imagePullPolicy: IfNotPresent
  mainApplicationFile: "s3a://foo.jar"
  mainClass: com.foo.FooApp
  mode: cluster
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      ...
  restartPolicy:
    type: Never
  sparkConf:
    ...
  sparkConfigMap: "spark-conf-map-foo"
  sparkVersion: '3.1.1'
  template:
    metadata:
      labels:
        app: "foo-pod"
  type: Scala
  volumes:
  - emptyDir: {}
    name: "spark-logs"
  - configMap:
      name: "fluent-bit-conf-map"
    name: "fluent-bit"
  - name: "staging-vol"
    persistentVolumeClaim:
      claimName: "data-staging-share"

As you can see, we are running spark applications.

Here is the kube conf file:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: ***
  name: ****
contexts:
- context:
    cluster: ****
    user: ****
  name: aws
current-context: aws
kind: Config
preferences: {}
users:
- name: ****
  user:
    exec:
      apiVersion: ****
      args:
      - --region
      - ***
      - eks
      - get-token
      - --cluster-name
      - ****
      command: /usr/local/airflow/.local/bin/aws

Here is the way we get the token, and the reason why it lasts for just 15 minutes:

https://awscli.amazonaws.com/v2/documentation/api/latest/reference/eks/get-token.html

And here is an example of what a failed client call log looks like:

[2021-09-15 10:31:15,682] {{client.py:483}} ERROR - Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 334, in query_loop
    collection_formats={},

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 420, in request
    body=body)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 270, in DELETE
    body=body)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)

kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '49ea69bb-f188-4620-aa0e-e4e57ba77e95', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 15 Sep 2021 10:31:15 GMT', 'Content-Length': '129'})
HTTP response body: {'kind': 'Status', 'apiVersion': 'v1', 'metadata': {}, 'status': 'Failure', 'message': 'Unauthorized', 'reason': 'Unauthorized', 'code': 401}



During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/zthreading/tasks.py", line 173, in _run_as_thread
    rslt = self.action(*args, **kwargs)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _exdcute_query
    self.query_loop(client)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 375, in query_loop
    raise err

airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.operations.DeleteNamespaceResource, Unauthorized: Unauthorized

Secrets not found

Describe the bug
When passing secrets, the pod is unable to find them.

Get pod logs when multiple containers are used

In short:

Hi,

We are using KubernetesLegacyJobOperator to manage our Spark-based applications with Airflow. In our setup we are allocating two containers (spark-kubernetes-driver and fluent-bit) within the same pod and this is causing the following error:

airflow_kubernetes_job_operator.kube_api.queries.GetPodLogs, Bad Request: a container name must be specified for pod application-name, choose one of: [spark-kubernetes-driver fluent-bit]

We managed to make it work by setting the value of the get_logs parameter to False, but we would like to have access to the logs in the airflow task.

Is there any way to get this? I looked around but had no luck finding an answer.

Thanks in advance!

More information

A fragment of the stack trace:

kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'cef97a1d-ee58-4bc1-b220-63338af4f3ee', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Mon, 26 Apr 2021 11:14:05 GMT', 'Content-Length': '259'})
HTTP response body: {'kind': 'Status', 'apiVersion': 'v1', 'metadata': {}, 'status': 'Failure', 'message': 'a container name must be specified for pod application-y761881y, choose one of: [spark-kubernetes-driver fluent-bit]', 'reason': 'BadRequest', 'code': 400}
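
A hedged workaround sketch in the meantime (outside the operator's GetPodLogs query): read the logs of one named container directly with the kubernetes client, which is effectively what kubectl logs <pod> -c spark-kubernetes-driver does. The pod name and namespace below are placeholders.

from kubernetes import client, config

config.load_kube_config()
core = client.CoreV1Api()

logs = core.read_namespaced_pod_log(
    name="application-y761881y-driver",   # hypothetical driver pod name
    namespace="fargate",
    container="spark-kubernetes-driver",  # pick one of the two containers in the pod
)
print(logs)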

Is KubernetesJobOperator compatible with Apache Airflow >= v2.0?

In short:

I would like to upgrade my MWAA environment to Apache Airflow 2.0.2, but there are some incompatibilities between KubernetesJobOperator and the new version of the providers in the operator's imports.

More information

A screenshot of the import errors is attached to the issue.

No logs from pod

Hello.
There are no logs from jobs in our environment. 'get_logs' is True.
We use a service account with some rights; maybe we need more permissions?

Logs

2022-10-06, 04:21:04 UTC] {taskinstance.py:1397} INFO - Executing <Task(KubernetesJobOperator): dbt_eds_daily> on 2022-10-05 04:21:00+00:00
[2022-10-06, 04:21:04 UTC] {standard_task_runner.py:52} INFO - Started process 28952 to run task
[2022-10-06, 04:21:04 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'dbt_daily_1', 'dbt_eds_daily', 'scheduled__2022-10-05T04:21:00+00:00', '--job-id', '260441', '--raw', '--subdir', 'DAGS_FOLDER/dags/eds/dbt_daily.py', '--cfg-path', '/tmp/tmprbn4so5r', '--error-file', '/tmp/tmpqbjlb0e3']
[2022-10-06, 04:21:04 UTC] {standard_task_runner.py:80} INFO - Job 260441: Subtask dbt_eds_daily
[2022-10-06, 04:21:04 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:528: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
  option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)

[2022-10-06, 04:21:04 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py:528: DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the sql_alchemy_conn option in [database] - the old setting has been used, but please update your config.
  option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)

[2022-10-06, 04:21:04 UTC] {task_command.py:371} INFO - Running <TaskInstance: dbt_daily_1.dbt_eds_daily scheduled__2022-10-05T04:21:00+00:00 [running]> on host airflow-etl-worker-1.airflow-etl-worker.dapa.svc.dapak
[2022-10-06, 04:21:04 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:202: AirflowContextDeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds }}' instead.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2022-10-06, 04:21:04 UTC] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:202: AirflowContextDeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
  warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))

[2022-10-06, 04:21:05 UTC] {taskinstance.py:1589} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=dbt_daily_1
AIRFLOW_CTX_TASK_ID=dbt_eds_daily
AIRFLOW_CTX_EXECUTION_DATE=2022-10-05T04:21:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-10-05T04:21:00+00:00
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Executing context: dapak
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Executing cluster: dapak
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Started watcher for kinds: pod, service, job, deployment, configmap, secret
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Watching namespaces: dapa-jobs
[2022-10-06, 04:21:05 UTC] {job_runner.py:257} INFO - {job-runner}: Waiting for dapa-jobs/dbt-eds-daily to finish...
[2022-10-06, 04:21:05 UTC] {watchers.py:330} INFO - [dapa-jobs/jobs/dbt-eds-daily] Pending
[2022-10-06, 04:21:05 UTC] {watchers.py:330} INFO - [dapa-jobs/secrets/dbt-spark-profiles] Active
[2022-10-06, 04:21:05 UTC] {operations.py:62} INFO - [dapa-jobs/secrets/dbt-spark-profiles] created
[2022-10-06, 04:21:05 UTC] {operations.py:62} INFO - [dapa-jobs/jobs/dbt-eds-daily] created
[2022-10-06, 04:21:05 UTC] {watchers.py:330} INFO - [dapa-jobs/jobs/dbt-eds-daily] Running
[2022-10-06, 04:21:16 UTC] {watchers.py:330} INFO - [dapa-jobs/jobs/dbt-eds-daily] Succeeded
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Job Succeeded
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Deleting resources due to policy: IfSucceeded
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Deleting job..
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Deleting objects: dapa-jobs/jobs/dbt-eds-daily, dapa-jobs/secrets/dbt-spark-profiles
[2022-10-06, 04:21:16 UTC] {operations.py:62} INFO - [dapa-jobs/jobs/dbt-eds-daily] deleted
[2022-10-06, 04:21:16 UTC] {operations.py:62} INFO - [dapa-jobs/secrets/dbt-spark-profiles] deleted
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Job deleted
[2022-10-06, 04:21:16 UTC] {job_runner.py:257} INFO - {job-runner}: Client stopped, execution completed.
[2022-10-06, 04:21:16 UTC] {taskinstance.py:1415} INFO - Marking task as SUCCESS. dag_id=dbt_daily_1, task_id=dbt_eds_daily, execution_date=20221005T042100, start_date=20221006T042104, end_date=20221006T042116
[2022-10-06, 04:21:16 UTC] {local_task_job.py:156} INFO - Task exited with return code 0
[2022-10-06, 04:21:16 UTC] {local_task_job.py:273} INFO - 1 downstream tasks scheduled from follow-on schedule check

Versions (please complete the following information):

  • Airflow 2.3.3
  • K8s 1.22.8
  • KubernetesJobOperator 2.0.8

Additional context
K8s acls:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: {{ airflow_role_name }}
  namespace: {{ airflow_jobs_namespace }}
rules:
  - verbs:
      - create
      - get
      - delete
      - watch
      - list
      - patch
      - update
    apiGroups:
      - batch
    resources:
      - jobs
  - verbs:
      - create
      - get
      - delete
      - watch
      - list
      - patch
      - update
    apiGroups:
      - ''
    resources:
      - configmaps
  - verbs:
      - create
      - get
      - delete
      - watch
      - list
      - patch
      - update
    apiGroups:
      - ''
    resources:
      - secrets

Use cases and testimonials

Feel free to write here how you use this operator, what use case you needed it for, and, if you can, how it was implemented.
