sodadata / prefect-soda-core
License: Apache License 2.0
Hi there,
I'm in the process of migrating from Prefect 1 to Prefect 2 and have been exploring Prefect 2 with your prefect-soda-core package. I'm impressed with the block abstraction in Prefect 2, and I noticed that your package has been updated to support this abstraction for running Soda checks.
However, your package runs the Soda code through a shell command, which means that the output of the jobs is only available as stdout. Since I use this library independently of Soda Cloud, I'd prefer a dictionary/JSON output that I can easily manipulate and use.
I think this could be achieved by changing your package to call soda-core methods directly, so that the task can return dictionaries containing the Soda check results.
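A minimal sketch of what calling soda-core directly could look like. It assumes soda-core's programmatic `Scan` class (`soda.scan.Scan`) with its `set_data_source_name`, `add_configuration_yaml_file`, `add_sodacl_yaml_file`, `execute` and `get_scan_results` methods; verify these against the installed soda-core version. The function name `run_soda_scan_programmatically` is hypothetical and not part of prefect-soda-core.

```python
from typing import Any, Dict


def run_soda_scan_programmatically(
    data_source_name: str,
    configuration_yaml_file: str,
    checks_yaml_file: str,
) -> Dict[str, Any]:
    """Run a Soda scan in-process and return its results as a dictionary.

    ASSUMPTION: soda-core's `soda.scan.Scan` API; check the method names
    against the soda-core version you have installed.
    """
    # Imported lazily so this module can be loaded without soda-core installed.
    from soda.scan import Scan

    scan = Scan()
    scan.set_data_source_name(data_source_name)
    scan.add_configuration_yaml_file(file_path=configuration_yaml_file)
    scan.add_sodacl_yaml_file(checks_yaml_file)
    exit_code = scan.execute()
    # Keep the exit code next to the results so callers can tell warnings from failures.
    return {"exit_code": exit_code, "results": scan.get_scan_results()}
```

Wrapped in a Prefect task, the returned dictionary could be handed to downstream tasks instead of parsing stdout.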
Alternatively, an option could be added to store the JSON-based output in a specified location, using the --scan-results-file parameter of the soda-core library.
I'd be happy to contribute to this change if it makes sense to you.
Thanks!
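For the --scan-results-file route, here is a sketch that builds the CLI call and reads the JSON file back into a dictionary. The helper names are hypothetical, and the assumption that the results file has a top-level "checks" list whose entries carry an "outcome" field (such as "pass", "warn" or "fail") should be checked against a real results file.

```python
import json
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List


def build_soda_scan_command(
    data_source: str, configuration_file: str, checks_file: str, results_file: str
) -> List[str]:
    """Build a `soda scan` invocation that also writes its results as JSON."""
    return [
        "soda", "scan",
        "-d", data_source,
        "-c", configuration_file,
        "--scan-results-file", results_file,
        checks_file,
    ]


def load_scan_results(results_file: str) -> Dict[str, Any]:
    """Read the JSON file written by --scan-results-file into a dict."""
    return json.loads(Path(results_file).read_text())


def summarize_outcomes(scan_results: Dict[str, Any]) -> Dict[str, int]:
    """Count checks per outcome.

    ASSUMPTION: entries in scan_results["checks"] have an "outcome" key.
    """
    return dict(Counter(check.get("outcome") for check in scan_results.get("checks", [])))
```

The command can be run with subprocess.run(cmd, check=False) before loading the file; not raising on a non-zero exit code matters because the soda CLI can exit non-zero even when the scan itself completed.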
@AlessandroLollo The above issue has been fixed with this PR https://github.com/sodadata/prefect-soda-core/pull/21
But there is this new error:
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1730, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect_soda_core/tasks.py", line 148, in soda_scan_execute
return soda_logs
^^^^^^^^^
UnboundLocalError: cannot access local variable 'soda_logs' where it is not associated with a value
Finished in state Failed("Task run encountered an exception UnboundLocalError: cannot access local variable 'soda_logs' where it is not associated with a value")
Originally posted by @DVD-99 in #18 (comment)
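The task source is not shown here, but the traceback says `return soda_logs` was reached without `soda_logs` ever being assigned. A common way this happens is an exception handler that swallows the error and falls through to the return. Below is a minimal sketch of that pattern and one way to avoid it; `run_command`, `raise_on_failure` and both function names are hypothetical, not the package's actual code.

```python
from typing import Callable, Optional


def run_scan_buggy(run_command: Callable[[], str], raise_on_failure: bool = True) -> str:
    """Reproduces the failure mode: the except branch can fall through with `logs` unbound."""
    try:
        logs = run_command()
    except RuntimeError:
        if raise_on_failure:
            raise
        # Nothing is assigned on this path, so the return below raises UnboundLocalError.
    return logs


def run_scan_fixed(run_command: Callable[[], str], raise_on_failure: bool = True) -> Optional[str]:
    """Binds `logs` on every path before returning it."""
    logs: Optional[str] = None
    try:
        logs = run_command()
    except RuntimeError as exc:
        if raise_on_failure:
            raise
        # Keep the error text (e.g. the command's output) instead of losing it.
        logs = str(exc)
    return logs
```

Whether the fixed version should return the error text, None, or re-raise is a design choice; what removes the UnboundLocalError is binding the variable on every path.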
When all checks pass, no error occurs.
Some of the libraries used:
prefect==2.4.2
prefect_shell==0.1.3
prefect-soda-core[postgres]
prefect-soda-core[athena]
soda-core
soda-core-postgres
soda-core-athena
MarkupSafe==2.0.1
PyAthena
[17:01:56] c20 nulos [PASSED]
[17:01:56] C20 deve ser maior ou igual a 0 [PASSED]
[17:01:56] C40 nulos [PASSED]
[17:01:56] C40 deve ser maior ou igual a 0 [PASSED]
[17:01:56] db_maritimo [Duplicated] in cabotagem
[17:01:56] CEs duplicados [PASSED]
[17:01:56] db_maritimo [Today] in cabotagem
[17:01:56] Sem registros criados na data de hoje [PASSED]
[17:01:56] Data Operação null [PASSED]
05:01:56 PM
soda_scan_execute-1578bf38-0
[17:01:56] 2/30 checks WARNED:
05:01:56 PM
soda_scan_execute-1578bf38-0
[17:01:56] db_maritimo [Five] in cabotagem
[17:01:56] tipo_fcl nulos [WARNED]
[17:01:56] check_value: 1.03
[17:01:56] row_count: 7749
[17:01:56] missing_count: 80
[17:01:56] db_maritimo [Duplicated] in cabotagem
[17:01:56] Embarque duplicados (id_porto_carga,id_porto_origem,nrblconhecimento,lista_fcl iguais) [WARNED]
[17:01:56] check_value: 2368
05:01:56 PM
soda_scan_execute-1578bf38-0
[17:01:56] Only 2 warnings. 0 failures. 0 errors. 28 pass.
[17:01:56] Sending results to Soda Cloud
05:01:56 PM
soda_scan_execute-1578bf38-0
[17:02:03] Soda Cloud Trace: 7899457579506349419
05:02:03 PM
soda_scan_execute-1578bf38-0
Saving scan results to 2023-06-30T20:00:37.181228+00:00--soda_scan_execute-1578bf38-0.json
05:02:03 PM
soda_scan_execute-1578bf38-0
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1214, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 142, in soda_scan_execute
raise e
File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 134, in soda_scan_execute
soda_logs = await shell_run_command.fn(
File "/usr/local/lib/python3.8/site-packages/prefect_shell/commands.py", line 103, in shell_run_command
raise RuntimeError(msg)
RuntimeError: Command failed with exit code 1:
Saving scan results to 2023-06-30T20:00:37.181228+00:00--soda_scan_execute-1578bf38-0.json
05:02:08 PM
soda_scan_execute-1578bf38-0
Encountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 117, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 33, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 807, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 97, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "flows/Governanca/soda/soda_cabotagem_flow.py", line 26, in soda_cabotagem_captura_flow
run_soda_scan('psql',
File "/tmp/tmpsv7sxzh_prefect/flows/lib_classes/libsoda.py", line 30, in run_soda_scan
return soda_scan_execute(
File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 295, in __call__
return enter_task_run_engine(
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 736, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 137, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/usr/local/lib/python3.8/site-packages/anyio/from_thread.py", line 47, in run
return asynclib.run_async_from_thread(func, *args)
File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 906, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
raise self._exception
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
return await future._result()
File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/usr/local/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1214, in orchestrate_task_run
result = await task.fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 142, in soda_scan_execute
raise e
File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 134, in soda_scan_execute
soda_logs = await shell_run_command.fn(
File "/usr/local/lib/python3.8/site-packages/prefect_shell/commands.py", line 103, in shell_run_command
raise RuntimeError(msg)
RuntimeError: Command failed with exit code 1:
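The log above reports "Only 2 warnings. 0 failures." right before the RuntimeError with exit code 1, which matches the observation that no error occurs when every check passes. As I understand the Soda documentation, the soda CLI reports outcomes through its exit code (0: all checks passed, 1: warnings, 2: failures, 3: errors), while prefect_shell's shell_run_command raises on any non-zero exit code. A sketch that runs the command without raising and interprets the code instead; the names are hypothetical and the exit-code table is an assumption to verify against the Soda docs.

```python
import subprocess
from typing import List, Tuple

# ASSUMPTION: soda CLI exit codes as described in the Soda documentation.
SODA_EXIT_CODES = {
    0: "all checks passed",
    1: "one or more checks warned",
    2: "one or more checks failed",
    3: "the scan encountered an error",
}


def describe_exit_code(code: int) -> str:
    """Translate a soda CLI exit code into a readable outcome."""
    return SODA_EXIT_CODES.get(code, f"unrecognized exit code {code}")


def run_soda_scan(command: List[str]) -> Tuple[int, str, str]:
    """Run `soda scan` without raising on non-zero exit codes.

    Returns (exit code, outcome description, captured stdout).
    """
    # check=False: a non-zero code may only mean "warnings", so decide afterwards.
    completed = subprocess.run(command, capture_output=True, text=True, check=False)
    return completed.returncode, describe_exit_code(completed.returncode), completed.stdout
```

A caller could then fail the flow only for exit codes of 2 or higher (or 3 or higher), depending on how strictly warnings should be treated.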
With the example code on the docs page, I ran into some issues:
verbose=True
NameError: name 'get_run_context' is not defined
When running the new soda_scan_execute task, Prefect raises a pickle error related to psycopg2.
The following is the traceback obtained by running the task locally against a PostgreSQL database:
09:18:29.634 | INFO | prefect.engine - Created flow run 'fiery-nightingale' for flow 'test-flow'
09:18:29.802 | INFO | Flow run 'fiery-nightingale' - Created task run 'soda_scan_execute-1578bf38-0' for task 'soda_scan_execute'
09:18:29.803 | INFO | Flow run 'fiery-nightingale' - Executing 'soda_scan_execute-1578bf38-0' immediately...
09:18:29.880 | ERROR | Flow run 'fiery-nightingale' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 566, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/var/folders/f0/p934kz610pb_sssp_zsmc7qr0000gp/T/ipykernel_50931/1336752104.py", line 3, in test_flow
return soda_scan_execute(data_source_name="soda_test", configuration=soda_configuration, checks=sodacl_check, disable_telemetry=True, verbose=True, variables=None)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/tasks.py", line 294, in __call__
return enter_task_run_engine(
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 705, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 822, in create_task_run_then_submit
return await future._result()
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/futures.py", line 227, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/task_runners.py", line 214, in submit
result = await run_fn(**run_kwargs)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 963, in begin_task_run
return await orchestrate_task_run(
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 1081, in orchestrate_task_run
terminal_state = await return_value_to_state(
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/states.py", line 129, in return_value_to_state
return Completed(data=DataDocument.encode(serializer, result))
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/data.py", line 42, in encode
blob = lookup_serializer(encoding).dumps(data, **kwargs)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/serializers.py", line 59, in dumps
data_bytes = cloudpickle.dumps(data)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'psycopg2.extensions.connection' object
09:18:29.882 | INFO | Task run 'soda_scan_execute-1578bf38-0' - Crash detected! Execution was interrupted by an unexpected exception.
09:18:29.942 | ERROR | Flow run 'fiery-nightingale' - Finished in state Failed('Flow run encountered an exception.')
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [35], in <cell line: 1>()
----> 1 result = test_flow()
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/flows.py:384, in Flow.__call__(self, return_state, *args, **kwargs)
380 parameters = get_call_parameters(self.fn, args, kwargs)
382 return_type = "state" if return_state else "result"
--> 384 return enter_flow_run_engine_from_flow_call(
385 self, parameters, return_type=return_type
386 )
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:149, in enter_flow_run_engine_from_flow_call(flow, parameters, return_type)
145 if in_async_main_thread():
146 # An event loop is already running and we must create a blocking portal to
147 # run async code from this synchronous context
148 with start_blocking_portal() as portal:
--> 149 return portal.call(begin_run)
150 else:
151 # An event loop is not running so we will create one
152 return anyio.run(begin_run)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
268 def call(
269 self,
270 func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
271 *args: object
272 ) -> T_Retval:
273 """
274 Call the given function in the event loop thread.
275
(...)
281
282 """
--> 283 return cast(T_Retval, self.start_task_soon(func, *args).result())
File ~/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
444 raise CancelledError()
445 elif self._state == FINISHED:
--> 446 return self.__get_result()
447 else:
448 raise TimeoutError()
File ~/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
216 else:
217 future.add_done_callback(callback)
--> 219 retval = await retval
220 except self._cancelled_exc_class:
221 future.cancel()
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/client.py:105, in inject_client.<locals>.with_injected_client(*args, **kwargs)
103 async with client_context as client:
104 kwargs.setdefault("client", client)
--> 105 return await fn(*args, **kwargs)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:228, in create_then_begin_flow_run(flow, parameters, return_type, client)
226 return state
227 elif return_type == "result":
--> 228 return state.result()
229 else:
230 raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/states.py:145, in State.result(self, raise_on_failure)
143 if (self.is_failed() or self.is_crashed()) and raise_on_failure:
144 if isinstance(data, Exception):
--> 145 raise data
146 elif isinstance(data, BaseException):
147 warnings.warn(
148 f"State result is a {type(data).__name__!r} type and is not safe "
149 "to re-raise, it will be returned instead."
150 )
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:566, in orchestrate_flow_run(flow, flow_run, parameters, interruptible, client, partial_flow_run_context)
560 else:
561 run_sync = (
562 run_sync_in_interruptible_worker_thread
563 if interruptible or timeout_scope
564 else run_sync_in_worker_thread
565 )
--> 566 result = await run_sync(flow_call)
568 waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
569 flow_run_context.task_run_futures, client=client
570 )
572 except Exception as exc:
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:56, in run_sync_in_worker_thread(__fn, *args, **kwargs)
45 """
46 Runs a sync function in a new worker thread so that the main thread's event loop
47 is not blocked
(...)
53 thread may continue running — the outcome will just be ignored.
54 """
55 call = partial(__fn, *args, **kwargs)
---> 56 return await anyio.to_thread.run_sync(call, cancellable=True)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/to_thread.py:31, in run_sync(func, cancellable, limiter, *args)
10 async def run_sync(
11 func: Callable[..., T_Retval],
12 *args: object,
13 cancellable: bool = False,
14 limiter: Optional[CapacityLimiter] = None
15 ) -> T_Retval:
16 """
17 Call the given function with the given arguments in a worker thread.
18
(...)
29
30 """
---> 31 return await get_asynclib().run_sync_in_worker_thread(
32 func, *args, cancellable=cancellable, limiter=limiter
33 )
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread(func, cancellable, limiter, *args)
935 context.run(sniffio.current_async_library_cvar.set, None)
936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:867, in WorkerThread.run(self)
865 exception: Optional[BaseException] = None
866 try:
--> 867 result = context.run(func, *args)
868 except BaseException as exc:
869 exception = exc
Input In [34], in test_flow()
1 @flow
2 def test_flow():
----> 3 return soda_scan_execute(data_source_name="soda_test", configuration=soda_configuration, checks=sodacl_check, disable_telemetry=True, verbose=True, variables=None)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/tasks.py:294, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
290 parameters = get_call_parameters(self.fn, args, kwargs)
292 return_type = "state" if return_state else "result"
--> 294 return enter_task_run_engine(
295 self,
296 parameters=parameters,
297 wait_for=wait_for,
298 task_runner=SequentialTaskRunner(),
299 return_type=return_type,
300 mapped=False,
301 )
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:705, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
703 # Async or sync task run in sync flow run
704 elif not flow_run_context.flow.isasync:
--> 705 return run_async_from_worker_thread(begin_run)
707 # Sync task run in async flow run
708 else:
709 # Call out to the sync portal since we are not in a worker thread
710 return flow_run_context.sync_portal.call(begin_run)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:136, in run_async_from_worker_thread(__fn, *args, **kwargs)
131 """
132 Runs an async function in the main thread's event loop, blocking the worker
133 thread until completion
134 """
135 call = partial(__fn, *args, **kwargs)
--> 136 return anyio.from_thread.run(call)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/from_thread.py:49, in run(func, *args)
46 except AttributeError:
47 raise RuntimeError("This function can only be run from an AnyIO worker thread")
---> 49 return asynclib.run_async_from_thread(func, *args)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:970, in run_async_from_thread(func, *args)
964 def run_async_from_thread(
965 func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
966 ) -> T_Retval:
967 f: concurrent.futures.Future[T_Retval] = asyncio.run_coroutine_threadsafe(
968 func(*args), threadlocals.loop
969 )
--> 970 return f.result()
File ~/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
444 raise CancelledError()
445 elif self._state == FINISHED:
--> 446 return self.__get_result()
447 else:
448 raise TimeoutError()
File ~/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:822, in create_task_run_then_submit(task, flow_run_context, parameters, wait_for, return_type, task_runner)
820 return await future._wait()
821 elif return_type == "result":
--> 822 return await future._result()
823 else:
824 raise ValueError(f"Invalid return type for task engine {return_type!r}.")
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/futures.py:227, in PrefectFuture._result(self, timeout, raise_on_failure)
225 if not final_state:
226 raise TimeoutError("Call timed out before task finished.")
--> 227 return final_state.result(raise_on_failure=raise_on_failure)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/states.py:145, in State.result(self, raise_on_failure)
143 if (self.is_failed() or self.is_crashed()) and raise_on_failure:
144 if isinstance(data, Exception):
--> 145 raise data
146 elif isinstance(data, BaseException):
147 warnings.warn(
148 f"State result is a {type(data).__name__!r} type and is not safe "
149 "to re-raise, it will be returned instead."
150 )
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/task_runners.py:214, in SequentialTaskRunner.submit(self, task_run, run_key, run_fn, run_kwargs, asynchronous)
204 async def submit(
205 self,
206 task_run: TaskRun,
(...)
211 ) -> PrefectFuture[R, A]:
212 # Run the function immediately and store the result in memory
213 try:
--> 214 result = await run_fn(**run_kwargs)
215 except BaseException as exc:
216 result = exception_to_crashed_state(exc)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:963, in begin_task_run(task, task_run, parameters, wait_for, result_filesystem, settings)
957 raise RuntimeError(
958 f"Cannot orchestrate task run '{task_run.id}'. "
959 f"Failed to connect to API at {client.api_url}."
960 ) from connect_error
962 try:
--> 963 return await orchestrate_task_run(
964 task=task,
965 task_run=task_run,
966 parameters=parameters,
967 wait_for=wait_for,
968 result_filesystem=result_filesystem,
969 interruptible=interruptible,
970 client=client,
971 )
972 except Abort:
973 # Task run already completed, just fetch its state
974 task_run = await client.read_task_run(task_run.id)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:1081, in orchestrate_task_run(task, task_run, parameters, wait_for, result_filesystem, interruptible, client)
1076 terminal_state = Failed(
1077 message="Task run encountered an exception.",
1078 data=DataDocument.encode("cloudpickle", exc),
1079 )
1080 else:
-> 1081 terminal_state = await return_value_to_state(
1082 result, serializer="cloudpickle"
1083 )
1085 # for COMPLETED tasks, add the cache key and expiration
1086 if terminal_state.is_completed():
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/states.py:129, in return_value_to_state(result, serializer)
122 return State(
123 type=new_state_type,
124 message=message,
125 data=DataDocument.encode(serializer, result),
126 )
128 # Otherwise, they just gave data and this is a completed result
--> 129 return Completed(data=DataDocument.encode(serializer, result))
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/data.py:42, in DataDocument.encode(cls, encoding, data, **kwargs)
36 """
37 Create a new data document
38
39 A serializer must be registered for the given `encoding`
40 """
41 # Dispatch encoding
---> 42 blob = lookup_serializer(encoding).dumps(data, **kwargs)
44 inst = cls(blob=blob, encoding=encoding)
45 inst._cache_data(data)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/serializers.py:59, in PickleSerializer.dumps(data)
57 @staticmethod
58 def dumps(data: Any) -> bytes:
---> 59 data_bytes = cloudpickle.dumps(data)
61 return base64.encodebytes(data_bytes)
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
69 with io.BytesIO() as file:
70 cp = CloudPickler(
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py:633, in CloudPickler.dump(self, obj)
631 def dump(self, obj):
632 try:
--> 633 return Pickler.dump(self, obj)
634 except RuntimeError as e:
635 if "recursion" in e.args[0]:
TypeError: cannot pickle 'psycopg2.extensions.connection' object
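The traceback shows where this comes from: after the task returns, the Prefect version used here encodes the return value with cloudpickle (return_value_to_state, then DataDocument.encode, then cloudpickle.dumps), and the returned object still references a psycopg2 connection, which cannot be pickled. A stdlib-only sketch of the same failure and the usual workaround of returning plain data only. It uses `pickle` instead of cloudpickle and a `threading.Lock` as a stand-in for the database connection (locks also raise TypeError when pickled); `ScanLike` and the helpers are hypothetical.

```python
import pickle
import threading
from typing import Any, Dict


class ScanLike:
    """Hypothetical stand-in for a scan object that keeps an open resource."""

    def __init__(self) -> None:
        self.results: Dict[str, Any] = {"checks": [{"name": "row_count = 0", "outcome": "pass"}]}
        # Stand-in for a psycopg2 connection: lock objects cannot be pickled either.
        self.connection = threading.Lock()


def is_picklable(obj: Any) -> bool:
    """Return True if `obj` survives pickle.dumps, False on the usual pickling errors."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


def to_task_result(scan: ScanLike) -> Dict[str, Any]:
    """Return only plain, serializable data from the scan, never the scan object itself."""
    return dict(scan.results)
```

Applied to the task, this means returning the scan results (or logs) as plain dictionaries or strings rather than any object that holds the data source connection.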
Steps to reproduce:
docker run -d --name soda_postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=soda postgres:latest
configuration.yml:
data_source soda_test:
  type: postgres
  connection:
    host: localhost
    port: 5432
    username: postgres
    password: postgres
  database: soda
  schema: information_schema
checks.yml:
checks for tables:
  - row_count = 0
Clone prefect-soda-core, check out the branch add-soda-scan-execute-task, install prefect-soda-core[postgres] with pip install /path/to/prefect-soda-core[postgres], then run:
from prefect_soda_core.tasks import soda_scan_execute
from prefect_soda_core.soda_configuration import SodaConfiguration
from prefect_soda_core.sodacl_check import SodaCLCheck
from prefect import flow

soda_configuration = SodaConfiguration(configuration_yaml_file="configuration.yml")
sodacl_check = SodaCLCheck(sodacl_yaml_file="checks.yml")

@flow
def test_flow():
    return soda_scan_execute(
        data_source_name="soda_test",
        configuration=soda_configuration,
        checks=sodacl_check,
        disable_telemetry=True,
        verbose=True,
        variables=None,
    )

result = test_flow()
cc @vijaykiran
Hello,
I'm getting this error with the latest Prefect version (2.13), while it works with the previous version.
The error happens when we import the prefect-soda-core class during the deployment of the flow:
prefect.exceptions.ScriptError: Script at 'src/my_soda_flow.py' encountered an exception: TypeError("cannot pickle 'classmethod' object")
I'll investigate on my side and let you know what I find.