prefecthq / prefect-kubernetes
Prefect integrations for interacting with Kubernetes.
Home Page: https://prefecthq.github.io/prefect-kubernetes/
License: Apache License 2.0
After a Job responsible for a flow run completes, TCP connections on the worker pod should close and release sockets. Instead, one TCP connection per flow run persists in state CLOSE_WAIT. Eventually, the worker pod will run out of sockets and flow runs will begin to fail during calls to create_namespaced_job.
Observed in prefect-kubernetes==0.3.1.
Failed to submit flow run 'c34aed0d-0396-424c-bc88-b8497b79ba63' to infrastructure.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 174, in _new_conn
conn = connection.create_connection(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 95, in create_connection
raise err
File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 85, in create_connection
sock.connect(sa)
OSError: [Errno 99] Cannot assign requested address
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 715, in urlopen
httplib_response = self._make_request(
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 404, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 1058, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 363, in connect
self.sock = conn = self._new_conn()
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 186, in _new_conn
raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x7f8e2acb28d0>: Failed to establish a new connection: [Errno 99] Cannot assign requested address
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
result = await self.run(
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 567, in run
job = await run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 763, in _create_job
job = batch_client.create_namespaced_job(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
return self.create_namespaced_job_with_http_info(namespace, body, **kwargs) # noqa: E501
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
return self.api_client.call_api(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
return self.__call_api(resource_path, method,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
response_data = self.request(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 391, in request
return self.rest_client.POST(url,
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 279, in POST
return self.request("POST", url,
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 172, in request
r = self.pool_manager.request(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 81, in request
return self.request_encode_body(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 173, in request_encode_body
return self.urlopen(method, url, **extra_kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/poolmanager.py", line 376, in urlopen
response = conn.urlopen(method, u.request_uri, **kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
return self.urlopen(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
return self.urlopen(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
return self.urlopen(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 799, in urlopen
retries = retries.increment(
^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/urllib3/util/retry.py", line 592, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='172.20.0.1', port=443): Max retries exceeded with url: /apis/batch/v1/namespaces/prefect2-flows/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f8e2acb28d0>: Failed to establish a new connection: [Errno 99] Cannot assign requested address'))
This can be reproduced by starting a Kubernetes worker using the helm chart with all default configs, and then running some flows. After the flows complete, the line count reported by cat /proc/net/tcp | wc -l on the worker pod will have increased by exactly the number of flow runs. Running cat /proc/net/tcp shows these connections in state 08:
23: 08013C0A:EBD2 0100400A:01BB 08 00000000:00000019 00:00000000 00000000 1001 0 137269 1 0000000000000000 20 4 12 10 -1
24: 08013C0A:9874 0100400A:01BB 08 00000000:00000019 00:00000000 00000000 1001 0 156620 1 0000000000000000 20 4 12 10 -1
25: 08013C0A:98D6 0100400A:01BB 08 00000000:00000019 00:00000000 00000000 1001 0 171986 1 0000000000000000 20 4 12 10 -1
26: 08013C0A:98EA 0100400A:01BB 08 00000000:00000019 00:00000000 00000000 1001 0 172205 1 0000000000000000 20 4 12 10 -1
from tcp_states.h:
enum {
TCP_ESTABLISHED = 1,
TCP_SYN_SENT,
TCP_SYN_RECV,
TCP_FIN_WAIT1,
TCP_FIN_WAIT2,
TCP_TIME_WAIT,
TCP_CLOSE,
TCP_CLOSE_WAIT,
TCP_LAST_ACK,
TCP_LISTEN,
TCP_CLOSING, /* Now a valid state */
TCP_NEW_SYN_RECV,
TCP_MAX_STATES /* Leave at the end! */
};
08 is CLOSE_WAIT.
The CLOSE_WAIT state indicates that the remote end of the connection has finished transmitting data and that the remote application has issued a close(2) or shutdown(2) call. The local TCP stack is now waiting for the local application that owns the socket to close(2) the local socket as well.
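The manual check above can be scripted. The following is a minimal sketch, not part of the original report, that tallies socket states from text in the /proc/net/tcp format; the function name count_tcp_states and the SAMPLE constant are illustrative, and the state table mirrors the tcp_states.h enum quoted above.

```python
from collections import Counter

# Hex state codes as they appear in the "st" column of /proc/net/tcp,
# following the order of the tcp_states.h enum (TCP_ESTABLISHED = 1).
TCP_STATES = {
    "01": "ESTABLISHED", "02": "SYN_SENT", "03": "SYN_RECV",
    "04": "FIN_WAIT1", "05": "FIN_WAIT2", "06": "TIME_WAIT",
    "07": "CLOSE", "08": "CLOSE_WAIT", "09": "LAST_ACK",
    "0A": "LISTEN", "0B": "CLOSING", "0C": "NEW_SYN_RECV",
}


def count_tcp_states(proc_net_tcp_text):
    """Return a Counter of TCP state names found in /proc/net/tcp text."""
    counts = Counter()
    for line in proc_net_tcp_text.splitlines():
        fields = line.split()
        # Data rows start with an index such as "23:"; the header row does not.
        if len(fields) < 4 or not fields[0].rstrip(":").isdigit():
            continue
        code = fields[3].upper()
        counts[TCP_STATES.get(code, code)] += 1
    return counts


# Header plus two of the rows shown in the report above.
SAMPLE = """\
  sl  local_address rem_address   st tx_queue rx_queue tr tm->when retrnsmt   uid  timeout inode
  23: 08013C0A:EBD2 0100400A:01BB 08 00000000:00000019 00:00000000 00000000 1001 0 137269 1 0000000000000000 20 4 12 10 -1
  24: 08013C0A:9874 0100400A:01BB 08 00000000:00000019 00:00000000 00000000 1001 0 156620 1 0000000000000000 20 4 12 10 -1
"""

sample_counts = count_tcp_states(SAMPLE)
```

On a worker pod, passing the contents of /proc/net/tcp to count_tcp_states and watching the CLOSE_WAIT entry between flow runs should show the same growth described above.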
Here are some issues reporting the same behavior for async and multi-threaded applications that use the Python Kubernetes client:
Like agents on Prefect 1, when using Prefect 2 kube workers, make kube events that occur before the pod starts visible via Prefect.
At the moment all we get via Prefect is:
Worker 'KubernetesWorker edf7261c-e388-46f8-a0c5-9eb7de8c7c0f' submitting flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' 02:20:25 PM prefect.flow_runs.worker
Creating Kubernetes job... 02:20:26 PM prefect.flow_runs.worker
Completed submission of flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' 02:20:26 PM prefect.flow_runs.worker
Job 'beige-stingray-tzkrv': Pod has status 'Pending'. 02:20:27 PM prefect.flow_runs.worker
Job 'beige-stingray-tzkrv': Pod never started. 02:21:26 PM prefect.flow_runs.worker
Reported flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' as crashed: Flow run infrastructure exited with non-zero status code -1. 02:21:27 PM prefect.flow_runs.worker
It would be useful to get the kube events so that we can diagnose this from Prefect without having to use kubectl etc.
prefect-kubernetes 0.2.8
Example events that are available via kubectl but not prefect:
17m Normal SuccessfulCreate job/beige-stingray-tzkrv Created pod: beige-stingray-tzkrv-vglpm
16m Normal TriggeredScaleUp pod/beige-stingray-tzkrv-vglpm pod triggered scale-up: [{gpu-accelerated-us-east-1a 1->2 (max: 5)}]
16m Warning FailedScheduling pod/beige-stingray-tzkrv-vglpm 0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 1 node(s) were unschedulable, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 7 Insufficient memory.
16m Warning FailedScheduling pod/beige-stingray-tzkrv-vglpm 0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 1 node(s) were unschedulable, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 6 Insufficient memory.
15m Warning FailedScheduling pod/beige-stingray-tzkrv-vglpm 0/51 nodes are available: 1 node(s) had taint {node.kubernetes.io/not-ready: }, that the pod didn't tolerate, 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 6 Insufficient memory.
15m Warning FailedScheduling pod/beige-stingray-tzkrv-vglpm 0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 45 Insufficient cpu, 50 Insufficient nvidia.com/gpu, 6 Insufficient memory.
14m Normal Scheduled pod/beige-stingray-tzkrv-vglpm Successfully assigned awesome-app/beige-stingray-tzkrv-vglpm to ip-10-144-171-199.ec2.internal
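As an illustration of where such events come from, here is a minimal sketch that reads a pod's events through the Python Kubernetes client's CoreV1Api.list_namespaced_event call with a field selector. It is not how prefect-kubernetes works; the helper names and the _FakeCoreV1 stand-in (used only so the sketch runs without a cluster) are assumptions.

```python
from types import SimpleNamespace


def pod_event_selector(pod_name):
    """Field selector that restricts events to a single pod."""
    return f"involvedObject.kind=Pod,involvedObject.name={pod_name}"


def describe_pod_events(core_v1, namespace, pod_name):
    """Return one readable line per event recorded for the pod.

    core_v1 is expected to behave like kubernetes.client.CoreV1Api.
    """
    events = core_v1.list_namespaced_event(
        namespace=namespace,
        field_selector=pod_event_selector(pod_name),
    )
    return [f"{e.type} {e.reason}: {e.message}" for e in events.items]


class _FakeCoreV1:
    """Hypothetical stand-in for CoreV1Api, for running the sketch offline."""

    def list_namespaced_event(self, namespace, field_selector):
        self.last_selector = field_selector
        return SimpleNamespace(items=[
            SimpleNamespace(
                type="Warning",
                reason="FailedScheduling",
                message="0/51 nodes are available",
            ),
        ])


fake = _FakeCoreV1()
lines = describe_pod_events(fake, "awesome-app", "beige-stingray-tzkrv-vglpm")
```

A worker could forward lines like these to the flow run logger while the pod is still Pending, which is the behavior this issue asks for.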
Instead of forcing users towards KubernetesClusterConfig to produce authenticated kubernetes clients, we should allow client authentication by updating client configuration headers with an API key, as was done in the 1.0 task library.
Add CustomObjectsAPI to manage CRUD operations on Kubernetes Custom Resource Definitions. This gives end users the flexibility to create their own operator-specific resources and watch over them.
patch_namespaced_deployment is not working.
Error message: 'CoreV1Api' object has no attribute 'patch_namespaced_deployment'.
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import patch_namespaced_deployment
from kubernetes.client.models import V1Deployment

@flow
def kubernetes_orchestrator():
    v1_deployment_metadata = patch_namespaced_deployment(
        kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
        deployment_name="test-deployment",
        deployment_updates=V1Deployment(metadata={"labels": {"foo": "bar"}}),
    )
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1581, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/deployments.py", line 186, in patch_namespaced_deployment
core_v1_client.patch_namespaced_deployment,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'CoreV1Api' object has no attribute 'patch_namespaced_deployment'
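The traceback shows the method being looked up on a CoreV1Api client. In the Python Kubernetes client, Deployment operations such as patch_namespaced_deployment are defined on AppsV1Api instead. The sketch below shows the call shape against an apps client; the helper name and the _RecordingAppsV1 stand-in are hypothetical, and this is not the fix applied in prefect-kubernetes.

```python
def patch_deployment_labels(apps_v1, name, namespace, labels):
    """Patch a Deployment's labels.

    apps_v1 is expected to behave like kubernetes.client.AppsV1Api,
    which is where patch_namespaced_deployment lives.
    """
    body = {"metadata": {"labels": labels}}
    return apps_v1.patch_namespaced_deployment(
        name=name, namespace=namespace, body=body
    )


class _RecordingAppsV1:
    """Hypothetical stand-in for AppsV1Api that records the call it receives."""

    def patch_namespaced_deployment(self, name, namespace, body):
        self.call = (name, namespace, body)
        return body


recorder = _RecordingAppsV1()
result = patch_deployment_labels(
    recorder, "test-deployment", "default", {"foo": "bar"}
)
```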
In the 1.0 task library, we had a task for retrieving kubernetes secrets, KubernetesSecret. We'd like to recreate this functionality with a similar task for a 2.0 context.
Migrate the create, delete, update, patch, read, replace job tasks from the 1.0 task library.
The KubernetesEventsReplicator._replicate_pod_events method occasionally raises the following exception:
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
The exception happens randomly and is difficult to reproduce. A similar issue was raised related to streaming Kubernetes logs. This PR seems to have fixed the problem in that case. Hopefully the fix here is to use the same handling strategy in this part of the code.
Exception in thread Thread-5 (_replicate_pod_events):
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
self.chunk_left = int(line, 16)
^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
yield
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
self._update_chunk_length()
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.11/threading.py", line 975, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/events.py", line 93, in _replicate_pod_events
for event in self._watch.stream(
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
for line in iter_resp_lines(resp):
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
for seg in resp.stream(amt=None, decode_content=False):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
with self._error_catcher():
File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
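The handling strategy used in the referenced PR is not reproduced here. As one generic possibility, the sketch below restarts a streaming iterator when it breaks with a chosen exception type, up to a limit. The names resilient_stream and _flaky_stream_factory are illustrative, and ConnectionError stands in for urllib3.exceptions.ProtocolError so the example has no third-party imports.

```python
def resilient_stream(make_stream, retry_on, max_restarts=3):
    """Yield items from make_stream(), restarting it when it breaks.

    make_stream: zero-argument callable returning a fresh iterator.
    retry_on: tuple of exception types that trigger a restart.
    """
    restarts = 0
    while True:
        try:
            for item in make_stream():
                yield item
            return  # the stream ended normally
        except retry_on:
            restarts += 1
            if restarts > max_restarts:
                raise


def _flaky_stream_factory():
    """Build a stream that breaks once mid-way, then succeeds (demo only)."""
    calls = {"n": 0}

    def make_stream():
        calls["n"] += 1
        if calls["n"] == 1:
            yield "event-1"
            raise ConnectionError("simulated broken chunked response")
        yield "event-2"

    return make_stream


received = list(
    resilient_stream(_flaky_stream_factory(), retry_on=(ConnectionError,))
)
```

For a Kubernetes watch, make_stream would wrap the watch's stream call and retry_on would be (urllib3.exceptions.ProtocolError,). A restarted watch may deliver events that were already seen, so a real implementation would likely also need to track the resource version.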
We would like to reach parity with the 1.0 task library as it relates to interacting with kubernetes deployments via this collection. The following tasks should be re-implemented as 2.0 tasks in deployments.py:
CreateNamespacedDeployment
DeleteNamespacedDeployment
ListNamespacedDeployment
PatchNamespacedDeployment
ReadNamespacedDeployment
ReplaceNamespacedDeployment
When using the KubernetesFlowRunner, and the agent fails to create a pod because of an invalid image, the UI shows "Pending" with no further indication of the failure.
from prefect import flow, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner

@flow
def test_flow() -> None:
    # show in prefect ui
    logger = get_run_logger()
    logger.info("Hello Prefect UI from Kubernetes!")

DeploymentSpec(
    name="test-deployment",
    flow=test_flow,
    flow_runner=KubernetesFlowRunner(image="localhost:5550/flow-this-does-not-exist", stream_output=True)
)
As expected, the pod fails to start. This can be seen via kubectl:
$ kubectl describe pod berserk-lionfish2b48r--1-9tqtq
Name: berserk-lionfish2b48r--1-9tqtq
Namespace: default
Priority: 0
Node: k3d-orion-server-0/172.20.0.3
Start Time: Mon, 18 Apr 2022 15:35:53 +1000
Labels: controller-uid=359d3bbd-e5da-472b-bc48-5afeb42d6790
job-name=berserk-lionfish2b48r
Annotations: <none>
Status: Pending
IP: 10.42.0.24
IPs:
IP: 10.42.0.24
Controlled By: Job/berserk-lionfish2b48r
Containers:
job:
Container ID:
Image: localhost:5550/flow-this-does-not-exist:latest
Image ID:
Port: <none>
Host Port: <none>
Command:
python
-m
prefect.engine
37dd67bb-0bb0-4c77-91f4-e31082ec415b
State: Waiting
Reason: ImagePullBackOff
Ready: False
Restart Count: 0
Environment:
PREFECT_API_URL: http://orion:4200/api
Mounts:
/var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-4mgps (ro)
Conditions:
Type Status
Initialized True
Ready False
ContainersReady False
PodScheduled True
Volumes:
kube-api-access-4mgps:
Type: Projected (a volume that contains injected data from multiple sources)
TokenExpirationSeconds: 3607
ConfigMapName: kube-root-ca.crt
ConfigMapOptional: <nil>
DownwardAPI: true
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 5m46s default-scheduler Successfully assigned default/berserk-lionfish2b48r--1-9tqtq to k3d-orion-server-0
Normal Pulling 4m15s (x4 over 5m45s) kubelet Pulling image "localhost:5550/flow-this-does-not-exist:latest"
Warning Failed 4m15s (x4 over 5m45s) kubelet Failed to pull image "localhost:5550/flow-this-does-not-exist:latest": rpc error: code = Unknown desc = failed to pull and unpack image "localhost:5550/flow-this-does-not-exist:latest": failed to resolve reference "localhost:5550/flow-this-does-not-exist:latest": failed to do request: Head "http://localhost:5550/v2/flow/manifests/latest": dial tcp 127.0.0.1:5550: connect: connection refused
Warning Failed 4m15s (x4 over 5m45s) kubelet Error: ErrImagePull
Warning Failed 4m3s (x6 over 5m45s) kubelet Error: ImagePullBackOff
Normal BackOff 37s (x21 over 5m45s) kubelet Back-off pulling image "localhost:5550/flow-this-does-not-exist:latest"
Agent logs:
05:35:53.375 | INFO | prefect.agent - Submitting flow run '37dd67bb-0bb0-4c77-91f4-e31082ec415b'
05:35:53.423 | INFO | prefect.flow_runner.kubernetes - RUNNING
05:35:53.427 | INFO | prefect.flow_runner.kubernetes - Flow run 'berserk-lionfish' has job settings = {'metadata': {'generateName': 'berserk-lionfish', 'namespace': 'default', 'labels': {'io.prefect.flow-run-id': '37dd67bb-0bb0-4c77-91f4-e31082ec415b', 'io.prefect.flow-run-name': 'berserk-lionfish', 'app': 'orion'}}, 'spec': {'template': {'spec': {'restartPolicy': 'Never', 'containers': [{'name': 'job', 'image': 'localhost:5550/flow:latest', 'command': ['python', '-m', 'prefect.engine', '37dd67bb-0bb0-4c77-91f4-e31082ec415b'], 'env': [{'name': 'PREFECT_API_URL', 'value': 'http://orion:4200/api'}]}]}}, 'backoff_limit': 4}}
05:35:53.470 | INFO | prefect.agent - Completed submission of flow run '37dd67bb-0bb0-4c77-91f4-e31082ec415b'
05:35:53.481 | INFO | prefect.flow_runner.kubernetes - Flow run job 'berserk-lionfish2b48r' has status {'active': None,
'completed_indexes': None,
'completion_time': None,
'conditions': None,
'failed': None,
'ready': None,
'start_time': None,
'succeeded': None,
'uncounted_terminated_pods': None}
05:35:53.481 | INFO | prefect.flow_runner.kubernetes - Starting watch for pod to start. Job: berserk-lionfish2b48r
05:35:58.459 | ERROR | prefect.flow_runner.kubernetes - Pod never started. Job: berserk-lionfish2b48r
Reporting the ErrImagePull in the Prefect UI, and setting the status to "Failed", would aid debugging and improve observability.
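To make the request concrete, here is a rough sketch of how an image pull failure could be detected from a pod object as returned by the Python Kubernetes client (a V1Pod, whose container statuses carry a waiting reason). The function name and the SimpleNamespace pod used to exercise it are assumptions; this is not the flow runner's actual code.

```python
from types import SimpleNamespace

# Waiting reasons seen in the kubectl output above.
IMAGE_PULL_FAILURES = {"ErrImagePull", "ImagePullBackOff"}


def image_pull_failure(pod):
    """Return "<container>: <reason>" if a container cannot pull its image.

    pod is expected to look like a V1Pod: status.container_statuses[*]
    with .name and .state.waiting.reason. Returns None otherwise.
    """
    for container in pod.status.container_statuses or []:
        waiting = container.state.waiting if container.state else None
        if waiting is not None and waiting.reason in IMAGE_PULL_FAILURES:
            return f"{container.name}: {waiting.reason}"
    return None


# Stand-in pod shaped like the one described above (container "job").
stuck_pod = SimpleNamespace(
    status=SimpleNamespace(
        container_statuses=[
            SimpleNamespace(
                name="job",
                state=SimpleNamespace(
                    waiting=SimpleNamespace(reason="ImagePullBackOff")
                ),
            )
        ]
    )
)
not_yet_scheduled = SimpleNamespace(
    status=SimpleNamespace(container_statuses=None)
)

failure = image_pull_failure(stuck_pod)
```

A runner that polls the pod while it is Pending could surface this string in the UI and fail the flow run instead of only logging "Pod never started".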
When invoking a job via run_namespaced_job, pod logs are retrieved when the pod transitions away from "Pending", but this is too early, as the next state is generally "Running". Capturing the logs at this point grabs only the logs up to that point in the execution, so any log messages generated during the rest of the job are omitted.
Expected behavior: all of the logs generated during the execution of the job are captured.
Logs should be captured when the job is completed, not when it transitions away from Pending.
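The expected ordering can be sketched as follows: poll the pod until it reaches a terminal phase and only then read its logs. The sketch assumes a client shaped like CoreV1Api (read_namespaced_pod and read_namespaced_pod_log); the function name, the timeout values, and the _FakeCoreV1 stand-in are illustrative and do not describe the library's implementation.

```python
import time
from types import SimpleNamespace

TERMINAL_PHASES = {"Succeeded", "Failed"}


def logs_after_completion(core_v1, pod_name, namespace,
                          poll_seconds=5.0, timeout_seconds=600.0):
    """Wait for the pod to finish, then return its full log text."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        pod = core_v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        if pod.status.phase in TERMINAL_PHASES:
            # Reading logs only now means output written while the pod
            # was Running is included.
            return core_v1.read_namespaced_pod_log(
                name=pod_name, namespace=namespace
            )
        if time.monotonic() >= deadline:
            raise TimeoutError(f"pod {pod_name} did not finish in time")
        time.sleep(poll_seconds)


class _FakeCoreV1:
    """Hypothetical stand-in that walks through a fixed list of phases."""

    def __init__(self, phases):
        self._phases = iter(phases)

    def read_namespaced_pod(self, name, namespace):
        return SimpleNamespace(status=SimpleNamespace(phase=next(self._phases)))

    def read_namespaced_pod_log(self, name, namespace):
        return "line 1\nline 2\n"


captured = logs_after_completion(
    _FakeCoreV1(["Pending", "Running", "Succeeded"]), "demo-pod", "my-ns",
    poll_seconds=0,
)
```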
Similar to PrefectHQ/prefect#8640
Kubernetes job runs OK but the agent fails to get its status: KeyError: 'controller-uid'
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job  # this is a flow
from prefect_kubernetes.jobs import KubernetesJob
import asyncio

k8s_creds = KubernetesCredentials.load("my-cerd")

job = KubernetesJob.from_yaml_file(  # or create in the UI with a dict manifest
    credentials=k8s_creds,
    manifest_path="helloworld.yaml",
    namespace="my-ns"
)

if __name__ == "__main__":
    asyncio.run(run_namespaced_job(job))
03:18:41.197 | ERROR | Flow run 'amazing-rottweiler' - Finished in state Failed("Flow run encountered an exception. KeyError: 'controller-uid'\n")
Traceback (most recent call last):
File "/mnt/k8s_run_pod4.py", line 18, in <module>
asyncio.run(run_namespaced_job(job))
File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 160, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 258, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 700, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/flows.py", line 41, in run_namespaced_job
await task(kubernetes_job_run.wait_for_completion.aio)(kubernetes_job_run)
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 160, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1168, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1581, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/jobs.py", line 419, in wait_for_completion
"controller-uid=" f"{v1_job_status.metadata.labels['controller-uid']}"
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
KeyError: 'controller-uid'
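The failing line indexes the job's labels directly, so a missing key raises. One defensive alternative is sketched below under assumptions: that newer clusters may expose the label under the batch.kubernetes.io/ prefix, and that pods created by a Job carry a job-name label (as in the kubectl output earlier on this page). The function name is hypothetical and the actual cause of the missing label in this report is not established.

```python
def pod_selector_for_job(job_name, labels):
    """Build a label selector for a Job's pods without raising KeyError.

    labels is the Job's metadata.labels mapping (may be None or empty).
    """
    labels = labels or {}
    # Try the legacy label first, then the prefixed variant (assumption).
    for key in ("controller-uid", "batch.kubernetes.io/controller-uid"):
        if key in labels:
            return f"{key}={labels[key]}"
    # Fall back to the job-name label that the Job controller puts on pods.
    return f"job-name={job_name}"


legacy = pod_selector_for_job("hello", {"controller-uid": "abc"})
prefixed = pod_selector_for_job(
    "hello", {"batch.kubernetes.io/controller-uid": "abc"}
)
fallback = pod_selector_for_job("hello", None)
```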
Related issue:
e.g.:
env:
  - name: AWS_ACCESS_KEY_ID
    valueFrom:
      secretKeyRef:
        name: minio
        key: root-user
  - name: AWS_SECRET_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: minio
        key: root-password
Hello,
I found myself in the place that I basically have to write a worker and thus infrastructure block from scratch just to support the whole range of the KubernetesJob spec. I think this should be the default and I don't understand why the decision has been made to limit this library to only a very limited subset of options such as env, args, labels, name... Much of the flexibility of kubernetes is lost due to that, e.g. we cannot map secrets to the pod as environment variables, see #83.
I propose to add all the available options from the KubernetesJob spec to the KubernetesWorkerVariables, KubernetesWorkerJobConfiguration and KubernetesJob classes. I have already done the majority of the work for myself and am happy to create a PR for this.
For example, the version of KubernetesWorkerVariables used in my custom worker simply extends the current class:
class KASWorkerVariables(KubernetesWorkerVariables):
    ports: list[Port] | None = Field(
        default=None,
        description="Ports exposed on the container",
    )
    labels: dict[str, str] | None = Field(
        default=_get_default_labels(),
        description="Container labels",
    )
    command: list[str] | None = Field(
        default=None,
        description="Container command",
    )
    args: list[str] | None = Field(
        default=None,
        description="Container arguments appended to container command",
    )
    volumes: list[Volume] | None = Field(
        default=None,
        description="List of volumes to expose",
    )
    affinity: Affinity | None = Field(
        default=None,
        description="Pod Affinity",
    )
    env_from: list[EnvRef] | None = Field(
        default=None,
        description="Kubernetes References that are mapped as environment variables in the KubernetesJob",
    )
    resources: Resources | None = Field(
        default=Resources(
            limits=Resource(memory="2Gi"),
            requests=Resource(cpu="1", memory="2Gi"),
        ),
        description="Pod Kubernetes Resources",
    )
    annotations: dict[str, str] | None = Field(
        default=None,
        description="Pod's annotation",
    )
    volume_mounts: list[VolumeMount] | None = Field(
        default=None,
        description="List of volumes to mount into the pod",
    )
    image_pull_secrets: list[ImagePullSecret] | None = Field(
        default=[ImagePullSecret(name="my-pull-secret")],
        description="List of image pull secrets",
    )
You can see that in non-trivial cases I added Pydantic models that mimic the KubernetesJob spec, such as Resources, Port, Volume, VolumeMount, EnvRef, Affinity.
Add examples and explanations of how to use the KubernetesWorker on the front page of the docs. Improve the documentation of the worker module to include more detailed examples.
We would like to reach parity with the 1.0 task library as it relates to interacting with kubernetes services via this collection. The following tasks should be re-implemented as 2.0 tasks in services.py:
CreateNamespacedService
DeleteNamespacedService
ListNamespacedService
PatchNamespacedService
ReadNamespacedService
ReplaceNamespacedService
Code like this:
for i in job:
    log = run_namespaced_job(i)
If one job fails, self._completed in wait_for_completion() in jobs.py stays False and never becomes True, so the while loop keeps running. It then hits an error and retries the same job. But because the failed job was not deleted, Kubernetes returns an error like this: HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"bidding-hunter-cjwiif\" already exists","reason":"AlreadyExists","details":{"name":"bidding-hunter-cjwiif","group":"batch","kind":"jobs"},"code":409}
As a temporary fix, I modified jobs.py to let the program exit so that the other jobs can run.
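The reporter's change to jobs.py is not shown. One way to avoid waiting forever on a failed job is to treat both terminal Job conditions as "finished", as in the sketch below. It assumes a status object shaped like the client's V1JobStatus, whose conditions carry a type ("Complete" or "Failed") and a status string; the function name is hypothetical.

```python
from types import SimpleNamespace


def job_terminal_condition(job_status):
    """Return "Complete" or "Failed" once the Job has finished, else None.

    job_status is expected to look like V1JobStatus: .conditions is a
    list of objects with .type and .status, or None while running.
    """
    for condition in job_status.conditions or []:
        if condition.type in ("Complete", "Failed") and condition.status == "True":
            return condition.type
    return None


running = SimpleNamespace(conditions=None)
failed = SimpleNamespace(
    conditions=[SimpleNamespace(type="Failed", status="True")]
)
succeeded = SimpleNamespace(
    conditions=[SimpleNamespace(type="Complete", status="True")]
)
```

A wait loop that exits when this returns a value, rather than only on success, would let the caller report the failure and move on to the next job in the list.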
I sometimes hit an issue where the kubernetes worker is running (deployed using Helm) and it suddenly stops polling the work pool. When I check the pod there are no logs indicating anything went wrong; it simply stops/hangs. It is likely related to #96 but not the same.
I'm using AKS with k8s version 1.27.7.
Moved over from PrefectHQ/prefect#11561.
Version: 2.14.3
API version: 0.8.4
Python version: 3.11.6
Git commit: f1ff9257
Built: Thu, Nov 2, 2023 4:12 PM
OS/Arch: linux/x86_64
Profile: default
Server type: cloud
Sometimes it recovers from the hang, and then I see this in the logs (where it appears to have been hanging for 10 minutes):
Exception in thread Thread-3 (_replicate_pod_events):
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
self.chunk_left = int(line, 16)
^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
yield
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
self._update_chunk_length()
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.11/threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/events.py", line 93, in _replicate_pod_events
for event in self._watch.stream(
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
for line in iter_resp_lines(resp):
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
for seg in resp.stream(amt=None, decode_content=False):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
with self._error_catcher():
File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
22:39:01.985 | ERROR | prefect.flow_runs.worker - An error occurred while monitoring flow run 'f95bef47-c688-4fd1-85af-aa9177cae6b7'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
self.chunk_left = int(line, 16)
^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
yield
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
self._update_chunk_length()
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
result = await self.run(
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 568, in run
status_code = await run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 841, in _watch_job
for event in watch.stream(
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
for line in iter_resp_lines(resp):
File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
for seg in resp.stream(amt=None, decode_content=False):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
with self._error_catcher():
File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
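Both tracebacks above end in a broken chunked watch stream. One possible mitigation, sketched here under assumptions, is to reopen the stream when that error occurs instead of letting it escape. The names `resilient_stream` and `open_stream` are hypothetical and are not part of prefect-kubernetes; with the real client, `retry_on` would presumably be `urllib3.exceptions.ProtocolError`, and a restarted watch may replay events unless a resource version is tracked, which this sketch does not do.

```python
import time
from typing import Callable, Iterable, Iterator, Tuple, Type


def resilient_stream(
    open_stream: Callable[[], Iterable[object]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_restarts: int = 3,
    backoff_seconds: float = 0.0,
) -> Iterator[object]:
    """Yield events from open_stream(), reopening it after a retryable error.

    Hypothetical helper: open_stream stands in for a call that starts a new
    watch and returns an iterable of events.
    """
    restarts = 0
    while True:
        try:
            for event in open_stream():
                yield event
            return  # the stream ended normally
        except retry_on:
            restarts += 1
            if restarts > max_restarts:
                raise  # give up and surface the last error
            time.sleep(backoff_seconds)


# Usage with a fake stream that breaks once, then ends cleanly.
attempts = {"count": 0}


def flaky_stream() -> Iterator[str]:
    attempts["count"] += 1
    yield f"event-{attempts['count']}"
    if attempts["count"] == 1:
        raise ConnectionError("Connection broken")


received = list(resilient_stream(flaky_stream))
```

The restart limit keeps a permanently broken connection from looping forever; the final error is re-raised so the caller can still log it.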
My k8s namespace has resource limits (max total CPU and RAM). If starting a Prefect job pod would go over those limits, it just never starts, and the flow on the UI hangs indefinitely. The agent logs still say Completed deployment of flow run. When I increased the limits, the job pod started immediately and the flow completed as normal.
It would be nice if this scenario could be detected and result in a useful error on the UI or agent level.
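A rough sketch of how such detection might look, assuming the Job controller reports the blocked pod as an event with reason `FailedCreate` and a message containing `exceeded quota`. Both strings are assumptions about cluster behavior rather than anything confirmed in this issue, and `quota_blocked_messages` is a hypothetical helper. The events would come from the cluster (for example via `CoreV1Api.list_namespaced_event`); stand-in objects are used here so the example runs without one.

```python
from types import SimpleNamespace
from typing import Iterable, List


def quota_blocked_messages(events: Iterable[object]) -> List[str]:
    """Return messages of events that look like a quota-blocked pod creation.

    Assumption: such events carry reason "FailedCreate" and mention
    "exceeded quota" in their message.
    """
    blocked = []
    for event in events:
        reason = getattr(event, "reason", "") or ""
        message = getattr(event, "message", "") or ""
        if reason == "FailedCreate" and "exceeded quota" in message:
            blocked.append(message)
    return blocked


# Stand-in events; real ones would be read from the job's namespace.
sample_events = [
    SimpleNamespace(reason="SuccessfulCreate", message="Created pod: job-abc"),
    SimpleNamespace(
        reason="FailedCreate",
        message="Error creating: pods is forbidden: exceeded quota: compute",
    ),
]

problems = quota_blocked_messages(sample_events)
```

A worker could surface any returned message as the flow run's failure reason instead of leaving the run pending.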
I think this collection might be outdated with cruft
Kubernetes Workers do not easily support private registry images. A private image can be specified, but it's not possible to provide credentials through the default configuration, and it's necessary to dip into the advanced configuration, and reference credentials already stored on the cluster.
As a comparison, the Azure Container Instance worker supports this:
The expectation would be that I can specify a private image, and then supply an existing block for authentication:
This behavior is possible by creating a registry credential secret like here:
https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
Then referencing this secret in the advanced config like this:
"spec": {
"template": {
"spec": {
"containers": [
{
"env": "{{ env }}",
"args": "{{ command }}",
"name": "prefect-job",
"image": "{{ image }}",
"imagePullPolicy": "{{ image_pull_policy }}"
}
],
"restartPolicy": "Never",
"imagePullSecrets": [
{
"name": "regcred"
}
],
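The same change can be expressed programmatically. The sketch below assumes the job manifest is available as a plain dictionary and that a secret named `regcred` already exists in the cluster; `with_image_pull_secret` is a hypothetical helper, not a prefect-kubernetes function.

```python
import copy
from typing import Any, Dict


def with_image_pull_secret(
    job_manifest: Dict[str, Any], secret_name: str
) -> Dict[str, Any]:
    """Return a copy of a Job manifest whose pod spec references the secret."""
    manifest = copy.deepcopy(job_manifest)  # leave the caller's dict untouched
    pod_spec = (
        manifest.setdefault("spec", {})
        .setdefault("template", {})
        .setdefault("spec", {})
    )
    secrets = pod_spec.setdefault("imagePullSecrets", [])
    if {"name": secret_name} not in secrets:
        secrets.append({"name": secret_name})
    return manifest


base_manifest = {
    "spec": {
        "template": {
            "spec": {
                "containers": [{"name": "prefect-job", "image": "{{ image }}"}],
                "restartPolicy": "Never",
            }
        }
    }
}

patched = with_image_pull_secret(base_manifest, "regcred")
```

Copying the manifest first keeps the helper safe to call on a shared template, and the membership check makes it idempotent.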
The KubernetesWorker does not respect job_watch_timeout_seconds when stream_output is True but no logs are emitted by the flow run. The deadline for when a job has run too long is only checked when new logs are emitted, which can lead to jobs running long past their expected timeout.
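One way to decouple the deadline check from log emission is to read the log stream in a helper thread and have the consumer wake up periodically to check the clock. This is only a sketch of that idea, not the worker's actual code; `stream_with_deadline` and its parameters are hypothetical names.

```python
import queue
import threading
import time
from typing import Iterable, Iterator


def stream_with_deadline(
    lines: Iterable[str],
    timeout_seconds: float,
    poll_seconds: float = 0.1,
) -> Iterator[str]:
    """Yield lines from a blocking source, enforcing the deadline even when
    the source stays silent."""
    buffer: "queue.Queue[object]" = queue.Queue()
    done = object()  # sentinel marking the end of the source

    def pump() -> None:
        try:
            for line in lines:
                buffer.put(line)
        finally:
            buffer.put(done)

    # Daemon thread so a stuck source cannot keep the process alive.
    threading.Thread(target=pump, daemon=True).start()
    deadline = time.monotonic() + timeout_seconds

    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no completion within {timeout_seconds} seconds")
        try:
            item = buffer.get(timeout=min(poll_seconds, remaining))
        except queue.Empty:
            continue  # nothing logged yet; loop to re-check the deadline
        if item is done:
            return
        yield item


def silent_source() -> Iterator[str]:
    """A source that emits nothing for a while, like a quiet flow run."""
    time.sleep(2)
    yield "late line"


try:
    for _ in stream_with_deadline(silent_source(), timeout_seconds=0.3):
        pass
    timed_out = False
except TimeoutError:
    timed_out = True
```

Errors raised by the source inside the helper thread are not forwarded to the consumer in this sketch; a fuller version would pass them through the queue.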
CPU and memory are not default fields in the k8s work pool, although they were in the k8s infra block.
I ran a Kubernetes job using the run_namespaced_job function, but the Prefect UI did not show the job's log.
I looked at the code in jobs.py and found the snippet below. Suppose interval_seconds is 5 s, the pod succeeds after 10 s, spends 2 s pending, and first writes log output at 7 s. At the first polling interval the pod is added to pod_logs.keys() with no log yet, so the dict looks like {'bidding-hunter-test-tc6xs': ''}. At the second polling interval the pod has log output, but it is never added to the dict because the pod name is already a key:
for pod in v1_pod_list.items:
    pod_name = pod.metadata.name
    if pod.status.phase == "Pending" or pod_name in self.pod_logs.keys():
        continue
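A possible adjustment, sketched under assumptions: keep refreshing a pod's log on every poll until the pod reaches a terminal phase, rather than skipping it as soon as its name is a key. `collect_pod_logs`, `finalized`, and `read_log` are hypothetical names and do not reflect the actual jobs.py; stand-in pod objects replace the Kubernetes client models.

```python
from types import SimpleNamespace
from typing import Callable, Dict, Iterable, Set

TERMINAL_PHASES = {"Succeeded", "Failed"}


def collect_pod_logs(
    pods: Iterable[object],
    pod_logs: Dict[str, str],
    finalized: Set[str],
    read_log: Callable[[str], str],
) -> Dict[str, str]:
    """Refresh logs for every started pod until it reaches a terminal phase."""
    for pod in pods:
        pod_name = pod.metadata.name
        phase = pod.status.phase
        # Skip pods that have not started or whose final log is already stored.
        if phase == "Pending" or pod_name in finalized:
            continue
        pod_logs[pod_name] = read_log(pod_name)
        if phase in TERMINAL_PHASES:
            finalized.add(pod_name)
    return pod_logs


def make_pod(name: str, phase: str) -> SimpleNamespace:
    """Build a stand-in for a pod object with the two fields used above."""
    return SimpleNamespace(
        metadata=SimpleNamespace(name=name),
        status=SimpleNamespace(phase=phase),
    )


logs: Dict[str, str] = {}
final: Set[str] = set()

# First poll: the pod is running but has written nothing yet.
collect_pod_logs([make_pod("demo-pod", "Running")], logs, final, lambda name: "")
after_first_poll = dict(logs)

# Second poll: the pod has finished and its log is available.
collect_pod_logs([make_pod("demo-pod", "Succeeded")], logs, final, lambda name: "hello")
```

Re-reading the full log on each poll costs extra API calls, but it avoids freezing an empty or partial log in the dictionary.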
A relatively heavily used task from the 1.0 library, this task should be migrated into the 2.0 collection.
The interface should improve usability from 1.0 versions, allowing easier specification of Kubernetes Jobs.
The Kubernetes worker hangs at the end of a flow run execution when tearing down the Event Replicator. The main thread gets blocked on this line when joining the replicator thread. Adding a timeout to the .join() call prevents the blocking, but the replicator thread still hangs around, and then the worker hangs on shutdown.
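A common pattern for this kind of hang is to make the background loop wake up at a bounded interval and check a stop flag, so that joining it cannot block indefinitely. The sketch below illustrates the pattern with a queue standing in for the Kubernetes watch stream; `StoppableReplicator` is a hypothetical class, not the real replicator. In the real code a bounded timeout on the watch would presumably play the role of the queue timeout, which is an assumption here.

```python
import queue
import threading
import time
from typing import List


class StoppableReplicator:
    """Background consumer that can always be stopped within a bounded time."""

    def __init__(self, events: "queue.Queue[str]", poll_seconds: float = 0.1):
        self._events = events
        self._poll_seconds = poll_seconds
        self._stop_requested = threading.Event()
        self.seen: List[str] = []
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        while not self._stop_requested.is_set():
            try:
                # Bounded wait so the stop flag is re-checked regularly.
                event = self._events.get(timeout=self._poll_seconds)
            except queue.Empty:
                continue
            self.seen.append(event)

    def start(self) -> None:
        self._thread.start()

    def stop(self, join_timeout: float = 2.0) -> bool:
        """Request shutdown; return True if the thread actually exited."""
        self._stop_requested.set()
        self._thread.join(timeout=join_timeout)
        return not self._thread.is_alive()


source: "queue.Queue[str]" = queue.Queue()
replicator = StoppableReplicator(source)
replicator.start()
source.put("pod-event")
time.sleep(0.5)  # give the thread time to consume the event
stopped_cleanly = replicator.stop()
```

Marking the thread as a daemon is a second safeguard: even if it fails to exit, it will not keep the process alive at shutdown.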
Migrate the primary remaining tasks for interacting with pod resources, ensuring we have complete test coverage.