prefect-kubernetes's Issues

Kubernetes Worker does not release sockets after Job completes

Expectation / Proposal

After a Job responsible for a flow run completes, TCP connections on the worker pod should close and release sockets. Instead, one TCP connection per flow run persists in state CLOSE_WAIT. Eventually, the worker pod will run out of sockets and flow runs will begin to fail during calls to create_namespaced_job.

Observed in prefect-kubernetes==0.3.1.

Traceback / Example

Failed to submit flow run 'c34aed0d-0396-424c-bc88-b8497b79ba63' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
OSError: [Errno 99] Cannot assign requested address

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
                       ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 404, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 1058, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 363, in connect
    self.sock = conn = self._new_conn()
                       ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 186, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x7f8e2acb28d0>: Failed to establish a new connection: [Errno 99] Cannot assign requested address

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 567, in run
    job = await run_sync_in_worker_thread(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 763, in _create_job
    job = batch_client.create_namespaced_job(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
    return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 279, in POST
    return self.request("POST", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 172, in request
    r = self.pool_manager.request(
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 81, in request
    return self.request_encode_body(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 173, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/poolmanager.py", line 376, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 799, in urlopen
    retries = retries.increment(
              ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='172.20.0.1', port=443): Max retries exceeded with url: /apis/batch/v1/namespaces/prefect2-flows/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f8e2acb28d0>: Failed to establish a new connection: [Errno 99] Cannot assign requested address'))

This can be reproduced by starting a Kubernetes worker from the Helm chart with all default configs and then running some flows. After the flows complete, the output of `cat /proc/net/tcp | wc -l` on the worker pod will have increased by exactly the number of flow runs, and `cat /proc/net/tcp` shows these connections in state 08:

23: 08013C0A:EBD2 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 137269 1 0000000000000000 20 4 12 10 -1                   
24: 08013C0A:9874 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 156620 1 0000000000000000 20 4 12 10 -1                   
25: 08013C0A:98D6 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 171986 1 0000000000000000 20 4 12 10 -1                   
26: 08013C0A:98EA 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 172205 1 0000000000000000 20 4 12 10 -1

from tcp_states.h:

enum {
    TCP_ESTABLISHED = 1,
    TCP_SYN_SENT,
    TCP_SYN_RECV,
    TCP_FIN_WAIT1,
    TCP_FIN_WAIT2,
    TCP_TIME_WAIT,
    TCP_CLOSE,
    TCP_CLOSE_WAIT,
    TCP_LAST_ACK,
    TCP_LISTEN,
    TCP_CLOSING,    /* Now a valid state */
    TCP_NEW_SYN_RECV,

    TCP_MAX_STATES  /* Leave at the end! */
};

State 08 is CLOSE_WAIT:

The CLOSE_WAIT state indicates that the remote end of the connection has finished transmitting data and that the remote application has issued a close(2) or shutdown(2) call. The local TCP stack is now waiting for the local application that owns the socket to close(2) the local socket as well.
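
The usual mitigation for this kind of leak with the official Python Kubernetes client is to close the ApiClient, and with it the urllib3 connection pool holding these sockets, once it is no longer needed. A minimal sketch, not the worker's actual code (the namespace and job_manifest are placeholders):

from kubernetes import client, config

config.load_incluster_config()

job_manifest = {  # placeholder manifest
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"generateName": "example-"},
    "spec": {
        "template": {
            "spec": {
                "containers": [{"name": "job", "image": "busybox", "command": ["true"]}],
                "restartPolicy": "Never",
            }
        }
    },
}

# Using ApiClient as a context manager closes its connection pool on exit,
# releasing sockets that would otherwise linger in CLOSE_WAIT.
with client.ApiClient() as api_client:
    batch_v1 = client.BatchV1Api(api_client)
    batch_v1.create_namespaced_job(namespace="prefect2-flows", body=job_manifest)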

Several existing issues report the same CLOSE_WAIT behavior for async and multi-threaded applications that use the Python Kubernetes client.

Replicate kube events to Prefect that occur before the pod starts

Expectation / Proposal

As with agents on Prefect 1, Kubernetes events that occur before the pod starts should be made visible via Prefect when using Prefect 2 Kubernetes workers.

At the moment all we get via Prefect is:

Worker 'KubernetesWorker edf7261c-e388-46f8-a0c5-9eb7de8c7c0f' submitting flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' 02:20:25 PM prefect.flow_runs.worker
Creating Kubernetes job... 02:20:26 PM prefect.flow_runs.worker
Completed submission of flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' 02:20:26 PM prefect.flow_runs.worker
Job 'beige-stingray-tzkrv': Pod has status 'Pending'. 02:20:27 PM prefect.flow_runs.worker
Job 'beige-stingray-tzkrv': Pod never started. 02:21:26 PM prefect.flow_runs.worker
Reported flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' as crashed: Flow run infrastructure exited with non-zero status code -1. 02:21:27 PM prefect.flow_runs.worker

It would be useful to get the kube events so that we can diagnose this from Prefect without having to use kubectl etc.

Observed with prefect-kubernetes 0.2.8.

Traceback / Example

Example events that are available via kubectl but not prefect:

17m         Normal    SuccessfulCreate    job/beige-stingray-tzkrv              Created pod: beige-stingray-tzkrv-vglpm
16m         Normal    TriggeredScaleUp    pod/beige-stingray-tzkrv-vglpm        pod triggered scale-up: [{gpu-accelerated-us-east-1a 1->2 (max: 5)}]
16m         Warning   FailedScheduling    pod/beige-stingray-tzkrv-vglpm        0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 1 node(s) were unschedulable, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 7 Insufficient memory.
16m         Warning   FailedScheduling    pod/beige-stingray-tzkrv-vglpm        0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 1 node(s) were unschedulable, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 6 Insufficient memory.
15m         Warning   FailedScheduling    pod/beige-stingray-tzkrv-vglpm        0/51 nodes are available: 1 node(s) had taint {node.kubernetes.io/not-ready: }, that the pod didn't tolerate, 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 6 Insufficient memory.
15m         Warning   FailedScheduling    pod/beige-stingray-tzkrv-vglpm        0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 45 Insufficient cpu, 50 Insufficient nvidia.com/gpu, 6 Insufficient memory.
14m         Normal    Scheduled           pod/beige-stingray-tzkrv-vglpm        Successfully assigned awesome-app/beige-stingray-tzkrv-vglpm to ip-10-144-171-199.ec2.internal
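
For reference, these events can be fetched with the official Python client, which is roughly what the worker would need to do to replicate them (the namespace and pod name below come from the example above):

from kubernetes import client, config

config.load_incluster_config()
core_v1 = client.CoreV1Api()

# List events whose involved object is the pending pod.
events = core_v1.list_namespaced_event(
    namespace="awesome-app",
    field_selector="involvedObject.name=beige-stingray-tzkrv-vglpm",
)
for event in events.items:
    print(event.last_timestamp, event.type, event.reason, event.message)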

support api key authentication for `KubernetesCredentials`

Instead of forcing users toward KubernetesClusterConfig to produce authenticated Kubernetes clients, we should allow client authentication by updating the client configuration headers with an API key, as was done in the 1.0 task library.
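
A rough sketch of what this could look like with the official client, assuming a bearer-token style API key (the host and token are placeholders):

from kubernetes import client

configuration = client.Configuration()
configuration.host = "https://my-cluster.example.com:6443"  # placeholder API server
# Authenticate by setting an API key header instead of loading a kubeconfig.
configuration.api_key["authorization"] = "<service-account-token>"
configuration.api_key_prefix["authorization"] = "Bearer"

core_v1 = client.CoreV1Api(client.ApiClient(configuration))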

Support Custom Objects API

Expectation / Proposal

Add CustomObjectsApi to manage CRUD operations on Kubernetes Custom Resource Definitions. This gives end users the flexibility to create their own operator-specific resources and watch over them.
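
A sketch of the kind of call such a wrapper would expose, using the official client's CustomObjectsApi (the group/version/plural values are placeholders for an arbitrary CRD):

from kubernetes import client, config

config.load_kube_config()
custom_objects = client.CustomObjectsApi()

# List custom resources of a hypothetical CRD in a namespace.
resources = custom_objects.list_namespaced_custom_object(
    group="example.com",
    version="v1",
    namespace="default",
    plural="mycrds",
)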

Traceback / Example

patch_namespaced_deployment is not working

Error message: 'CoreV1Api' object has no attribute 'patch_namespaced_deployment'.

Expectation / Proposal

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import patch_namespaced_deployment
from kubernetes.client.models import V1Deployment

@flow
def kubernetes_orchestrator():
    v1_deployment_metadata = patch_namespaced_deployment(
        kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
        deployment_name="test-deployment",
        deployment_updates=V1Deployment(metadata={"labels": {"foo": "bar"}}),
    )

Traceback / Example

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1581, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/deployments.py", line 186, in patch_namespaced_deployment
    core_v1_client.patch_namespaced_deployment,
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'CoreV1Api' object has no attribute 'patch_namespaced_deployment'
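
For context, patch_namespaced_deployment belongs to AppsV1Api rather than CoreV1Api in the official client, so the library appears to be constructing the wrong API client here. A direct call looks roughly like this (the namespace is a placeholder):

from kubernetes import client, config

config.load_kube_config()
apps_v1 = client.AppsV1Api()  # deployments live under apps/v1, not core/v1

apps_v1.patch_namespaced_deployment(
    name="test-deployment",
    namespace="default",
    body={"metadata": {"labels": {"foo": "bar"}}},
)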

add `KubernetesSecret` functionality

In the 1.0 task library, we had a KubernetesSecret task for retrieving Kubernetes secrets. We'd like to recreate this functionality with a similar task in a 2.0 context.
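
A minimal sketch of what such a task might wrap, assuming the official client (the secret name and namespace are placeholders):

import base64

from kubernetes import client, config

config.load_kube_config()
core_v1 = client.CoreV1Api()

secret = core_v1.read_namespaced_secret(name="my-secret", namespace="default")
# Values in V1Secret.data are base64-encoded and must be decoded before use.
decoded = {key: base64.b64decode(value).decode() for key, value in secret.data.items()}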

add `job` tasks

Migrate the create, delete, update, patch, read, and replace job tasks from the 1.0 task library.

Worker raises InvalidChunkLength when replicating pod events

The KubernetesEventsReplicator._replicate_pod_events method occasionally raises the following exception:

urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

The exception happens randomly and is difficult to reproduce. A similar issue was raised about streaming Kubernetes logs, and a PR appears to have fixed the problem in that case. Hopefully the fix here is to apply the same handling strategy in this part of the code.

Traceback / Example

Exception in thread Thread-5 (_replicate_pod_events):
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
                      ^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.11/threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/events.py", line 93, in _replicate_pod_events
    for event in self._watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
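
One possible handling strategy, sketched on the assumption that the broken chunked read is transient (this is not the merged fix): catch the ProtocolError and re-establish the watch. Production code would also want to track resourceVersion so restarted watches do not replay old events.

import urllib3
from kubernetes import watch

def stream_with_restarts(list_func, **kwargs):
    """Yield watch events, restarting the watch if the connection breaks."""
    while True:
        w = watch.Watch()
        try:
            for event in w.stream(list_func, **kwargs):
                yield event
            return  # stream ended cleanly
        except urllib3.exceptions.ProtocolError:
            continue  # connection broken mid-stream; open a new watch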

add `deployment` tasks

We would like to reach parity with the 1.0 task library as it relates to interacting with kubernetes deployments via this collection. The following tasks should be re-implemented as 2.0 tasks in deployments.py:

  • CreateNamespacedDeployment
  • DeleteNamespacedDeployment
  • ListNamespacedDeployment
  • PatchNamespacedDeployment
  • ReadNamespacedDeployment
  • ReplaceNamespacedDeployment

report ErrImagePull in Prefect UI to improve observability

Description

When using the KubernetesFlowRunner, if the agent fails to create a pod because of an invalid image, the UI shows "Pending" with no further indication of the failure.

[Screenshot: the flow run stuck in "Pending" in the Prefect UI]

Reproduction / Example

from prefect import flow, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner


@flow
def test_flow() -> None:
    # show in prefect ui
    logger = get_run_logger()
    logger.info("Hello Prefect UI from Kubernetes!")


DeploymentSpec(
    name="test-deployment",
    flow=test_flow,
    flow_runner=KubernetesFlowRunner(image="localhost:5550/flow-this-does-not-exist", stream_output=True)
)

As expected, the pod fails to start. This can be seen via kubectl:

$ kubectl describe pod berserk-lionfish2b48r--1-9tqtq
Name:         berserk-lionfish2b48r--1-9tqtq
Namespace:    default
Priority:     0
Node:         k3d-orion-server-0/172.20.0.3
Start Time:   Mon, 18 Apr 2022 15:35:53 +1000
Labels:       controller-uid=359d3bbd-e5da-472b-bc48-5afeb42d6790
              job-name=berserk-lionfish2b48r
Annotations:  <none>
Status:       Pending
IP:           10.42.0.24
IPs:
  IP:           10.42.0.24
Controlled By:  Job/berserk-lionfish2b48r
Containers:
  job:
    Container ID:
    Image:         localhost:5550/flow-this-does-not-exist:latest
    Image ID:
    Port:          <none>
    Host Port:     <none>
    Command:
      python
      -m
      prefect.engine
      37dd67bb-0bb0-4c77-91f4-e31082ec415b
    State:          Waiting
      Reason:       ImagePullBackOff
    Ready:          False
    Restart Count:  0
    Environment:
      PREFECT_API_URL:  http://orion:4200/api
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-4mgps (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  kube-api-access-4mgps:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:                   BestEffort
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason     Age                    From               Message
  ----     ------     ----                   ----               -------
  Normal   Scheduled  5m46s                  default-scheduler  Successfully assigned default/berserk-lionfish2b48r--1-9tqtq to k3d-orion-server-0
  Normal   Pulling    4m15s (x4 over 5m45s)  kubelet            Pulling image "localhost:5550/flow-this-does-not-exist:latest"
  Warning  Failed     4m15s (x4 over 5m45s)  kubelet            Failed to pull image "localhost:5550/flow-this-does-not-exist:latest": rpc error: code = Unknown desc = failed to pull and unpack image "localhost:5550/flow-this-does-not-exist:latest": failed to resolve reference "localhost:5550/flow-this-does-not-exist:latest": failed to do request: Head "http://localhost:5550/v2/flow/manifests/latest": dial tcp 127.0.0.1:5550: connect: connection refused
  Warning  Failed     4m15s (x4 over 5m45s)  kubelet            Error: ErrImagePull
  Warning  Failed     4m3s (x6 over 5m45s)   kubelet            Error: ImagePullBackOff
  Normal   BackOff    37s (x21 over 5m45s)   kubelet            Back-off pulling image "localhost:5550/flow-this-does-not-exist:latest"

Agent logs:

05:35:53.375 | INFO    | prefect.agent - Submitting flow run '37dd67bb-0bb0-4c77-91f4-e31082ec415b'
05:35:53.423 | INFO    | prefect.flow_runner.kubernetes - RUNNING
05:35:53.427 | INFO    | prefect.flow_runner.kubernetes - Flow run 'berserk-lionfish' has job settings = {'metadata': {'generateName': 'berserk-lionfish', 'namespace': 'default', 'labels': {'io.prefect.flow-run-id': '37dd67bb-0bb0-4c77-91f4-e31082ec415b', 'io.prefect.flow-run-name': 'berserk-lionfish', 'app': 'orion'}}, 'spec': {'template': {'spec': {'restartPolicy': 'Never', 'containers': [{'name': 'job', 'image': 'localhost:5550/flow:latest', 'command': ['python', '-m', 'prefect.engine', '37dd67bb-0bb0-4c77-91f4-e31082ec415b'], 'env': [{'name': 'PREFECT_API_URL', 'value': 'http://orion:4200/api'}]}]}}, 'backoff_limit': 4}}
05:35:53.470 | INFO    | prefect.agent - Completed submission of flow run '37dd67bb-0bb0-4c77-91f4-e31082ec415b'
05:35:53.481 | INFO    | prefect.flow_runner.kubernetes - Flow run job 'berserk-lionfish2b48r' has status {'active': None,
 'completed_indexes': None,
 'completion_time': None,
 'conditions': None,
 'failed': None,
 'ready': None,
 'start_time': None,
 'succeeded': None,
 'uncounted_terminated_pods': None}
05:35:53.481 | INFO    | prefect.flow_runner.kubernetes - Starting watch for pod to start. Job: berserk-lionfish2b48r
05:35:58.459 | ERROR   | prefect.flow_runner.kubernetes - Pod never started. Job: berserk-lionfish2b48r

Reporting the ErrImagePull in the Prefect UI, and setting the status to "Failed", would aid debugging and improve observability.
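
One way to surface this, sketched with the official client (the pod name and namespace come from the example above): the waiting reason is exposed on the pod's container statuses.

from kubernetes import client, config

config.load_kube_config()
core_v1 = client.CoreV1Api()

pod = core_v1.read_namespaced_pod(
    name="berserk-lionfish2b48r--1-9tqtq", namespace="default"
)
for status in pod.status.container_statuses or []:
    waiting = status.state.waiting
    if waiting and waiting.reason in ("ErrImagePull", "ImagePullBackOff"):
        # This is the message the UI could report instead of a bare "Pending".
        print(f"container {status.name}: {waiting.reason}: {waiting.message}")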

Pod logs are retrieved before job is completed

When invoking a job via run_namespaced_job, pod logs are retrieved as soon as the pod transitions away from "Pending". This is too early: the next state is generally "Running", so capturing the logs at that point grabs only the output produced so far, and any log messages generated during the rest of the job's execution are omitted.

Expectation / Proposal

All of the logs generated during the execution of the job are captured.

Logs should be captured when the job is completed, not when it transitions away from Pending.
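
A sketch of the proposed ordering with the official client (the job name, namespace, and poll interval are placeholders): wait for the Job to finish, then read each pod's logs once.

import time

from kubernetes import client, config

config.load_kube_config()
batch_v1 = client.BatchV1Api()
core_v1 = client.CoreV1Api()

# Poll until the Job reports success or failure.
while True:
    job = batch_v1.read_namespaced_job(name="my-job", namespace="default")
    if job.status.succeeded or job.status.failed:
        break
    time.sleep(5)

# Only now capture the logs, so nothing emitted mid-run is missed.
pods = core_v1.list_namespaced_pod(namespace="default", label_selector="job-name=my-job")
for pod in pods.items:
    print(core_v1.read_namespaced_pod_log(name=pod.metadata.name, namespace="default"))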

Kubernetes job runs OK but agent fails to get status: KeyError: 'controller-uid'

Expectation / Proposal

from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job # this is a flow
from prefect_kubernetes.jobs import KubernetesJob
import asyncio
k8s_creds = KubernetesCredentials.load("my-cerd")

job = KubernetesJob.from_yaml_file( # or create in the UI with a dict manifest
    credentials=k8s_creds,
    manifest_path="helloworld.yaml",
    namespace="my-ns"
)

if __name__ == "__main__":
    asyncio.run(run_namespaced_job(job))

Traceback / Example

03:18:41.197 | ERROR   | Flow run 'amazing-rottweiler' - Finished in state Failed("Flow run encountered an exception. KeyError: 'controller-uid'\n")
Traceback (most recent call last):
  File "/mnt/k8s_run_pod4.py", line 18, in <module>
    asyncio.run(run_namespaced_job(job))
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 160, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 258, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 700, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/flows.py", line 41, in run_namespaced_job
    await task(kubernetes_job_run.wait_for_completion.aio)(kubernetes_job_run)
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 160, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1168, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1581, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/jobs.py", line 419, in wait_for_completion
    "controller-uid=" f"{v1_job_status.metadata.labels['controller-uid']}"
                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
KeyError: 'controller-uid'
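
Kubernetes 1.27 added the prefixed batch.kubernetes.io/controller-uid label alongside the legacy key, so one hypothetical workaround is to try both (get_controller_uid below is an illustrative helper, not part of the library):

def get_controller_uid(v1_job):
    """Hypothetical helper: prefer the prefixed label (Kubernetes >= 1.27),
    falling back to the legacy unprefixed key."""
    labels = v1_job.metadata.labels or {}
    return labels.get("batch.kubernetes.io/controller-uid") or labels.get("controller-uid")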

Related issue:

Support entirety of KubernetesJob Spec out of the box

Hello,

I found myself in a position where I basically had to write a worker, and thus an infrastructure block, from scratch just to support the whole range of the KubernetesJob spec. I think this should be the default, and I don't understand why the decision was made to limit this library to only a very limited subset of options such as env, args, labels, and name. Much of the flexibility of Kubernetes is lost as a result; for example, we cannot map secrets into the pod as environment variables, see #83.

Expectation / Proposal

I propose adding all the available options from the KubernetesJob spec to the KubernetesWorkerVariables, KubernetesWorkerJobConfiguration, and KubernetesJob classes. I have already done the majority of the work for myself and am happy to create a PR for this.

Traceback / Example

For example, the version of KubernetesWorkerVariables used in my custom worker simply extends the current class:

class KASWorkerVariables(KubernetesWorkerVariables):
    ports: list[Port] | None = Field(
        default=None,
        description="Ports exposed on the container",
    )
    labels: dict[str, str] | None = Field(
        default=_get_default_labels(),
        description="Container labels",
    )
    command: list[str] | None = Field(
        default=None,
        description="Container command",
    )
    args: list[str] | None = Field(
        default=None,
        description="Container arguments appended to container command",
    )
    volumes: list[Volume] | None = Field(
        default=None,
        description="List of volumes to expose",
    )
    affinity: Affinity | None = Field(
        default=None,
        description="Pod Affinity",
    )
    env_from: list[EnvRef] | None = Field(
        default=None,
        description="Kubernetes References that are mapped as environment variables in the KubernetesJob",
    )
    resources: Resources | None = Field(
        default=Resources(
            limits=Resource(memory="2Gi"),
            requests=Resource(cpu="1", memory="2Gi"),
        ),
        description="Pod Kubernetes Ressources",
    )
    annotations: dict[str, str] | None = Field(
        default=None,
        description="Pod's annotation",
    )
    volume_mounts: list[VolumeMount] | None = Field(
        default=None,
        description="List of volumes to mount into the pod",
    )
    image_pull_secrets: list[ImagePullSecret] | None = Field(
        default=[ImagePullSecret(name="my-pull-secret")],
        description="List of image pull secrets",
    )

You can see that in the non-trivial cases I added Pydantic models that mimic the KubernetesJob spec, such as Resources, Port, Volume, VolumeMount, EnvRef, and Affinity.

add `service` tasks

We would like to reach parity with the 1.0 task library as it relates to interacting with kubernetes services via this collection. The following tasks should be re-implemented as 2.0 tasks in services.py:

  • CreateNamespacedService
  • DeleteNamespacedService
  • ListNamespacedService
  • PatchNamespacedService
  • ReadNamespacedService
  • ReplaceNamespacedService

Multiple run_namespaced_job runs together: if one job errors, the other jobs error too

Code like this:

for i in job:
    log = run_namespaced_job(i)

If one job fails, wait_for_completion() in jobs.py never flips self._completed from False to True, so its while loop keeps running. The flow then raises an error and retries the same job, but because the previous job was never deleted, Kubernetes rejects the retry with a 409 conflict: HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"bidding-hunter-cjwiif\" already exists","reason":"AlreadyExists","details":{"name":"bidding-hunter-cjwiif","group":"batch","kind":"jobs"},"code":409}

Traceback / Example

As a temporary fix, I modified jobs.py to let the program exit on failure so the other jobs can run.
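
Another possible mitigation, sketched with the official client (the job name comes from the error above; the namespace is a placeholder): delete the failed Job before retrying, so the create call does not hit the 409 AlreadyExists.

from kubernetes import client, config

config.load_kube_config()
batch_v1 = client.BatchV1Api()

batch_v1.delete_namespaced_job(
    name="bidding-hunter-cjwiif",
    namespace="default",
    propagation_policy="Background",  # also clean up the Job's pods
)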

Kubernetes Worker stops polling work pool

Expectation / Proposal

I sometimes hit an issue where the Kubernetes worker is running (deployed using Helm) and it suddenly stops polling the work pool. When I check the pod there are no logs indicating anything went wrong; it simply stops/hangs. It is likely related to #96 but not the same.

I'm using AKS with Kubernetes version 1.27.7.

Moved over from PrefectHQ/prefect#11561.

Output of the version command run inside the worker pod:

Version:             2.14.3
API version:         0.8.4
Python version:      3.11.6
Git commit:          f1ff9257
Built:               Thu, Nov 2, 2023 4:12 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

Traceback / Example

Sometimes it recovers from the hang, and then I see this in the logs (where it appears to have been hanging for 10 minutes):

Exception in thread Thread-3 (_replicate_pod_events):
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
                      ^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/events.py", line 93, in _replicate_pod_events
    for event in self._watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
22:39:01.985 | ERROR   | prefect.flow_runs.worker - An error occurred while monitoring flow run 'f95bef47-c688-4fd1-85af-aa9177cae6b7'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
                      ^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 568, in run
    status_code = await run_sync_in_worker_thread(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 841, in _watch_job
    for event in watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

Request: detect when jobs are blocked from starting due to k8s limits

Description

My k8s namespace has resource limits (max total CPU and RAM). If starting a Prefect job pod would exceed those limits, the pod simply never starts and the flow hangs indefinitely in the UI; the agent logs still say Completed deployment of flow run. When I increased the limits, the job pod started immediately and the flow completed as normal.

Expected Behavior

It would be nice if this scenario could be detected and result in a useful error on the UI or agent level.
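
One possible detection approach, sketched with the official client (the namespace is a placeholder): compare the namespace's ResourceQuota usage against its hard limits when a submitted pod fails to start.

from kubernetes import client, config

config.load_kube_config()
core_v1 = client.CoreV1Api()

for quota in core_v1.list_namespaced_resource_quota(namespace="default").items:
    for resource, hard in (quota.status.hard or {}).items():
        used = (quota.status.used or {}).get(resource)
        # If used has reached hard for CPU or memory, new pods cannot start.
        print(f"{quota.metadata.name}: {resource} used {used} of {hard}")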

Cruft update

I think this collection might be out of date with respect to its cruft template.

Support Docker Image Registry Block for Kubernetes Worker

Expectation / Proposal

Kubernetes workers do not easily support private registry images. A private image can be specified, but it's not possible to provide credentials through the default configuration; it's necessary to dip into the advanced configuration and reference credentials already stored on the cluster.

As a comparison, the Azure Container Instance worker supports this:
[Screenshot: the Azure Container Instance worker's image registry credentials field]

The expectation would be that I can specify a private image and then supply an existing block for authentication:
[Screenshot: the proposed registry credentials block selection]

This behavior is possible by creating a registry credential secret as described here:
https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
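
For example, the secret from that guide is created with (the registry values are placeholders):

kubectl create secret docker-registry regcred \
  --docker-server=<registry-host> \
  --docker-username=<username> \
  --docker-password=<password>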

Then referencing this secret in the advanced config like this:

      "spec": {
        "template": {
          "spec": {
            "containers": [
              {
                "env": "{{ env }}",
                "args": "{{ command }}",
                "name": "prefect-job",
                "image": "{{ image }}",
                "imagePullPolicy": "{{ image_pull_policy }}"
              }
            ],
            "restartPolicy": "Never",
            "imagePullSecrets": [
              {
                "name": "regcred"
              }
            ],

Traceback / Example

`KubernetesWorker` does not respect `job_watch_timeout_seconds` in some cases

Expectation / Proposal

The KubernetesWorker does not respect job_watch_timeout_seconds when stream_output is True but the flow run emits no logs. The deadline for a job running too long is only checked when new log lines arrive, which can let jobs run long past their expected timeout.
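
A sketch of deadline enforcement that does not depend on new log lines arriving (illustrative only, not the worker's actual watch loop): bound each underlying read with a request timeout and check a monotonic deadline on every event.

import time

from kubernetes import watch

def watch_with_deadline(list_func, deadline_seconds, **kwargs):
    """Stop watching once the wall-clock deadline passes; _request_timeout
    bounds each read so a silent stream times out instead of blocking."""
    deadline = time.monotonic() + deadline_seconds
    w = watch.Watch()
    for event in w.stream(list_func, _request_timeout=deadline_seconds, **kwargs):
        yield event
        if time.monotonic() >= deadline:
            w.stop()
            break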

Traceback / Example

Kubernetes job logs are not shown in the Prefect UI

Expectation / Proposal

I run a Kubernetes job using the run_namespaced_job flow, but the Prefect UI doesn't show the job's logs.

Traceback / Example

Looking at jobs.py, I found the code below. Suppose interval_seconds is 5s, my pod succeeds in 10s, spends 2s pending, and starts emitting log output at 7s. During the first polling interval the pod is added to pod_logs.keys() while it still has no logs, so the dict looks like {'bidding-hunter-test-tc6xs': ''}. During the second interval the pod does have logs, but because its name is already in pod_logs, the logs are never added to the dict.

for pod in v1_pod_list.items:
    pod_name = pod.metadata.name

    if pod.status.phase == "Pending" or pod_name in self.pod_logs.keys():
        continue
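
A hedged fix sketch based on the snippet above (core_v1_client and namespace are assumed to be in scope in the surrounding method): keep re-reading logs for pods that are already tracked instead of skipping them, so output produced after the first interval is captured.

for pod in v1_pod_list.items:
    pod_name = pod.metadata.name

    if pod.status.phase == "Pending":
        continue

    # Overwrite on every poll instead of skipping already-seen pods.
    self.pod_logs[pod_name] = core_v1_client.read_namespaced_pod_log(
        name=pod_name, namespace=namespace
    )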

add `run_namespaced_job` task

This relatively heavily used task from the 1.0 library should be migrated into the 2.0 collection.

The interface should improve usability from 1.0 versions, allowing easier specification of Kubernetes Jobs.

Kubernetes worker hangs at the end of flow run execution

Expectation / Proposal

The Kubernetes worker hangs at the end of a flow run when tearing down the event replicator. The main thread blocks on the line that joins the replicator thread. Adding a timeout to the .join() call prevents the blocking, but the replicator thread still hangs around, and the worker then hangs on shutdown.
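
A sketch of the mitigation described above (the thread target is a stand-in for the replicator's watch loop): join with a timeout and mark the thread as a daemon so a stuck watch cannot block interpreter shutdown.

import threading
import time

def replicate_events():  # stand-in for KubernetesEventsReplicator's loop
    while True:
        time.sleep(1)

replicator = threading.Thread(target=replicate_events, daemon=True)
replicator.start()

# Teardown: bounded wait instead of blocking forever on .join().
replicator.join(timeout=5)
if replicator.is_alive():
    # daemon=True means a still-running thread won't block process exit.
    pass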

Traceback / Example

add `pod` tasks

Migrate the primary remaining tasks for interacting with pod resources, ensuring we have complete test coverage.
