prefect-soda-core's People

Contributors

alessandrolollo, dependabot[bot], fakhavan, tintamarre, vijaykiran

prefect-soda-core's Issues

Retrieve Soda Core check outputs as JSON/Dict

Hi there,

I'm migrating from Prefect 1 to Prefect 2 and have been exploring Prefect 2 together with your prefect-soda-core package. I'm impressed with the block abstraction in Prefect 2, and I noticed that your package has been updated to support it for running Soda checks.

However, your package runs Soda through a shell command, so the scan output is only available as text on stdout. Since I use this library independently of Soda Cloud, I would prefer a dictionary/JSON output that I can easily manipulate.

I think this could be achieved by changing your package to call soda-core methods directly, so that the task can return a dictionary containing the Soda check results.
Alternatively, an option could be added to store the JSON output at a specified location using soda-core's --scan-results-file parameter.

I'd be happy to contribute to this change if it makes sense to you.
Thanks!
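The --scan-results-file alternative could look roughly like the sketch below. It is a minimal sketch under stated assumptions: the soda CLI is on PATH and accepts -d, -c and --scan-results-file, and the resulting JSON has a top-level "checks" list whose entries carry an "outcome" field (e.g. "pass", "warn", "fail"). Verify both against the soda-core version in use. The helpers build_scan_command, summarize_outcomes and run_scan_to_dict are hypothetical names, not part of prefect-soda-core.

```python
import json
import subprocess
from collections import Counter
from pathlib import Path
from typing import Dict, List


def build_scan_command(
    data_source: str, configuration: str, checks: str, results_file: str
) -> List[str]:
    """Build a `soda scan` invocation that also writes the results as JSON."""
    return [
        "soda", "scan",
        "-d", data_source,
        "-c", configuration,
        "--scan-results-file", results_file,
        checks,
    ]


def summarize_outcomes(results: Dict) -> Dict[str, int]:
    """Count check outcomes in a parsed scan-results dict.

    Assumes a top-level "checks" list whose items carry an "outcome" key;
    checks without one are counted under "unknown".
    """
    return dict(Counter(c.get("outcome", "unknown") for c in results.get("checks", [])))


def run_scan_to_dict(
    data_source: str,
    configuration: str,
    checks: str,
    results_file: str = "scan_results.json",
) -> Dict:
    """Run the scan, then load the JSON file it wrote (requires soda-core installed)."""
    cmd = build_scan_command(data_source, configuration, checks, results_file)
    # check=False: soda exits non-zero when checks warn or fail, and the
    # results file is expected to be written anyway.
    subprocess.run(cmd, check=False)
    return json.loads(Path(results_file).read_text())
```

A task could then return run_scan_to_dict(...) or just its summarize_outcomes(...) counts, both of which are plain, serializable Python data.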

soda_logs used before being declared causes UnboundLocalError

@AlessandroLollo The issue above was fixed by this PR: https://github.com/sodadata/prefect-soda-core/pull/21

However, there is a new error:

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1730, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 292, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 346, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_soda_core/tasks.py", line 148, in soda_scan_execute
    return soda_logs
           ^^^^^^^^^
UnboundLocalError: cannot access local variable 'soda_logs' where it is not associated with a value

Finished in state Failed("Task run encountered an exception UnboundLocalError: cannot access local variable 'soda_logs' where it is not associated with a value")

Originally posted by @DVD-99 in #18 (comment)
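The traceback shows `return soda_logs` being reached on a path where the assignment inside the `try` never completed, typically because an exception was caught and not re-raised. A common way to rule out this class of bug is to bind the variable before the `try` and make every `except` branch either re-raise or leave the variable in a defined state. The sketch below is a generic illustration, not the package's actual code; `execute_scan`, `run_command` and `tolerate_failure` are hypothetical names, with `run_command` standing in for the shell call.

```python
from typing import Callable, List, Optional


def execute_scan(
    run_command: Callable[[], List[str]], tolerate_failure: bool
) -> Optional[List[str]]:
    """Return the command's log lines; never raises UnboundLocalError."""
    # Bound before the try, so the return below is safe on every path.
    soda_logs: Optional[List[str]] = None
    try:
        soda_logs = run_command()
    except RuntimeError:
        if not tolerate_failure:
            raise  # propagate instead of falling through to the return
        # Tolerated failure: soda_logs stays None rather than being unbound.
    return soda_logs
```

Without the initial `soda_logs = None`, the tolerated-failure path would reproduce exactly the UnboundLocalError above.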

Error when some Soda checks do not pass

When all checks pass, no error occurs.
Libraries used:
prefect==2.4.2
prefect_shell==0.1.3
prefect-soda-core[postgres]
prefect-soda-core[athena]
soda-core
soda-core-postgres
soda-core-athena
MarkupSafe==2.0.1
PyAthena

[17:01:56]       c20 nulos [PASSED]
[17:01:56]       C20 deve ser maior ou igual a 0 [PASSED]
[17:01:56]       C40 nulos [PASSED]
[17:01:56]       C40 deve ser maior ou igual a 0 [PASSED]
[17:01:56]     db_maritimo [Duplicated] in cabotagem
[17:01:56]       CEs duplicados [PASSED]
[17:01:56]     db_maritimo [Today] in cabotagem
[17:01:56]       Sem registros criados na data de hoje [PASSED]
[17:01:56]       Data Operação null [PASSED]
[17:01:56] 2/30 checks WARNED: 
[17:01:56]     db_maritimo [Five] in cabotagem
[17:01:56]       tipo_fcl nulos [WARNED]
[17:01:56]         check_value: 1.03
[17:01:56]         row_count: 7749
[17:01:56]         missing_count: 80
[17:01:56]     db_maritimo [Duplicated] in cabotagem
[17:01:56]       Embarque duplicados (id_porto_carga,id_porto_origem,nrblconhecimento,lista_fcl iguais) [WARNED]
[17:01:56]         check_value: 2368
[17:01:56] Only 2 warnings. 0 failures. 0 errors. 28 pass.
[17:01:56] Sending results to Soda Cloud
[17:02:03] Soda Cloud Trace: 7899457579506349419
Saving scan results to 2023-06-30T20:00:37.181228+00:00--soda_scan_execute-1578bf38-0.json
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1214, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 142, in soda_scan_execute
    raise e
  File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 134, in soda_scan_execute
    soda_logs = await shell_run_command.fn(
  File "/usr/local/lib/python3.8/site-packages/prefect_shell/commands.py", line 103, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 1:
Saving scan results to 2023-06-30T20:00:37.181228+00:00--soda_scan_execute-1578bf38-0.json
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 117, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 97, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "flows/Governanca/soda/soda_cabotagem_flow.py", line 26, in soda_cabotagem_captura_flow
    run_soda_scan('psql',
  File "/tmp/tmpsv7sxzh_prefect/flows/lib_classes/libsoda.py", line 30, in run_soda_scan
    return soda_scan_execute(
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 295, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 736, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 137, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.8/site-packages/anyio/from_thread.py", line 47, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 906, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/usr/local/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1214, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 142, in soda_scan_execute
    raise e
  File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 134, in soda_scan_execute
    soda_logs = await shell_run_command.fn(
  File "/usr/local/lib/python3.8/site-packages/prefect_shell/commands.py", line 103, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 1:
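The scan summary above reports only warnings ("Only 2 warnings. 0 failures. 0 errors. 28 pass."), yet the shell command fails with exit code 1. Soda's documentation describes the `soda scan` exit codes roughly as 0 = all checks passed, 1 = warnings, 2 = failures, 3 = runtime error with results sent to Soda Cloud, 4 = runtime error without results sent (verify for your version). A generic shell runner that treats any non-zero exit code as an error will therefore fail the task on mere warnings. The sketch below uses `subprocess` rather than prefect_shell, maps the exit code to an outcome, and raises only above a chosen threshold; `run_soda_command` and `max_ok_code` are hypothetical names.

```python
import subprocess
import sys
from typing import List, Tuple

# Exit-code meanings as described in Soda's documentation (verify for your version).
SODA_EXIT_CODES = {
    0: "all checks passed",
    1: "warnings",
    2: "failures",
    3: "runtime error, results sent to Soda Cloud",
    4: "runtime error, results not sent to Soda Cloud",
}


def run_soda_command(cmd: List[str], max_ok_code: int = 1) -> Tuple[int, str, str]:
    """Run cmd and raise only if its exit code is outside 0..max_ok_code.

    Returns (exit_code, outcome_description, combined_stdout_and_stderr).
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    output = proc.stdout + proc.stderr
    outcome = SODA_EXIT_CODES.get(proc.returncode, "unknown exit code")
    # Negative codes (killed by a signal on POSIX) are treated as errors too.
    if not 0 <= proc.returncode <= max_ok_code:
        raise RuntimeError(
            f"Command failed with exit code {proc.returncode} ({outcome}):\n{output}"
        )
    return proc.returncode, outcome, output


if __name__ == "__main__":
    # Simulate a scan that only warned: exit code 1 is tolerated by default.
    print(run_soda_command([sys.executable, "-c", "import sys; sys.exit(1)"])[:2])
    # prints (1, 'warnings')
```

Setting max_ok_code=0 restores the strict behavior, so warnings would fail the task again.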

Issues with the example code

I ran into some issues with the example code on the docs page:

  1. A comma is missing on line 26, after verbose=True.
  2. Running the code initially fails with: NameError: name 'get_run_context' is not defined

Pickling error related to psycopg2 when running task soda_scan_execute against PostgreSQL/Redshift

def soda_scan_execute(

When running the new soda_scan_execute task, Prefect raises a pickling error related to psycopg2.
The following traceback was obtained by running the task locally against a PostgreSQL database:

09:18:29.634 | INFO    | prefect.engine - Created flow run 'fiery-nightingale' for flow 'test-flow'
09:18:29.802 | INFO    | Flow run 'fiery-nightingale' - Created task run 'soda_scan_execute-1578bf38-0' for task 'soda_scan_execute'
09:18:29.803 | INFO    | Flow run 'fiery-nightingale' - Executing 'soda_scan_execute-1578bf38-0' immediately...
09:18:29.880 | ERROR   | Flow run 'fiery-nightingale' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 566, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/var/folders/f0/p934kz610pb_sssp_zsmc7qr0000gp/T/ipykernel_50931/1336752104.py", line 3, in test_flow
    return soda_scan_execute(data_source_name="soda_test", configuration=soda_configuration, checks=sodacl_check, disable_telemetry=True, verbose=True, variables=None)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 705, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 822, in create_task_run_then_submit
    return await future._result()
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/futures.py", line 227, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/task_runners.py", line 214, in submit
    result = await run_fn(**run_kwargs)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 963, in begin_task_run
    return await orchestrate_task_run(
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py", line 1081, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/states.py", line 129, in return_value_to_state
    return Completed(data=DataDocument.encode(serializer, result))
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/data.py", line 42, in encode
    blob = lookup_serializer(encoding).dumps(data, **kwargs)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/serializers.py", line 59, in dumps
    data_bytes = cloudpickle.dumps(data)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/alessandro.lollo/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'psycopg2.extensions.connection' object
09:18:29.882 | INFO    | Task run 'soda_scan_execute-1578bf38-0' - Crash detected! Execution was interrupted by an unexpected exception.
09:18:29.942 | ERROR   | Flow run 'fiery-nightingale' - Finished in state Failed('Flow run encountered an exception.')

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [35], in <cell line: 1>()
----> 1 result = test_flow()

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/flows.py:384, in Flow.__call__(self, return_state, *args, **kwargs)
    380 parameters = get_call_parameters(self.fn, args, kwargs)
    382 return_type = "state" if return_state else "result"
--> 384 return enter_flow_run_engine_from_flow_call(
    385     self, parameters, return_type=return_type
    386 )

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:149, in enter_flow_run_engine_from_flow_call(flow, parameters, return_type)
    145 if in_async_main_thread():
    146     # An event loop is already running and we must create a blocking portal to
    147     # run async code from this synchronous context
    148     with start_blocking_portal() as portal:
--> 149         return portal.call(begin_run)
    150 else:
    151     # An event loop is not running so we will create one
    152     return anyio.run(begin_run)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
    268 def call(
    269     self,
    270     func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
    271     *args: object
    272 ) -> T_Retval:
    273     """
    274     Call the given function in the event loop thread.
    275 
   (...)
    281 
    282     """
--> 283     return cast(T_Retval, self.start_task_soon(func, *args).result())

File ~/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
    216             else:
    217                 future.add_done_callback(callback)
--> 219             retval = await retval
    220 except self._cancelled_exc_class:
    221     future.cancel()

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/client.py:105, in inject_client.<locals>.with_injected_client(*args, **kwargs)
    103 async with client_context as client:
    104     kwargs.setdefault("client", client)
--> 105     return await fn(*args, **kwargs)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:228, in create_then_begin_flow_run(flow, parameters, return_type, client)
    226     return state
    227 elif return_type == "result":
--> 228     return state.result()
    229 else:
    230     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/states.py:145, in State.result(self, raise_on_failure)
    143 if (self.is_failed() or self.is_crashed()) and raise_on_failure:
    144     if isinstance(data, Exception):
--> 145         raise data
    146     elif isinstance(data, BaseException):
    147         warnings.warn(
    148             f"State result is a {type(data).__name__!r} type and is not safe "
    149             "to re-raise, it will be returned instead."
    150         )

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:566, in orchestrate_flow_run(flow, flow_run, parameters, interruptible, client, partial_flow_run_context)
    560             else:
    561                 run_sync = (
    562                     run_sync_in_interruptible_worker_thread
    563                     if interruptible or timeout_scope
    564                     else run_sync_in_worker_thread
    565                 )
--> 566                 result = await run_sync(flow_call)
    568         waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
    569             flow_run_context.task_run_futures, client=client
    570         )
    572 except Exception as exc:

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:56, in run_sync_in_worker_thread(__fn, *args, **kwargs)
     45 """
     46 Runs a sync function in a new worker thread so that the main thread's event loop
     47 is not blocked
   (...)
     53 thread may continue running — the outcome will just be ignored.
     54 """
     55 call = partial(__fn, *args, **kwargs)
---> 56 return await anyio.to_thread.run_sync(call, cancellable=True)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/to_thread.py:31, in run_sync(func, cancellable, limiter, *args)
     10 async def run_sync(
     11     func: Callable[..., T_Retval],
     12     *args: object,
     13     cancellable: bool = False,
     14     limiter: Optional[CapacityLimiter] = None
     15 ) -> T_Retval:
     16     """
     17     Call the given function with the given arguments in a worker thread.
     18 
   (...)
     29 
     30     """
---> 31     return await get_asynclib().run_sync_in_worker_thread(
     32         func, *args, cancellable=cancellable, limiter=limiter
     33     )

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread(func, cancellable, limiter, *args)
    935 context.run(sniffio.current_async_library_cvar.set, None)
    936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:867, in WorkerThread.run(self)
    865 exception: Optional[BaseException] = None
    866 try:
--> 867     result = context.run(func, *args)
    868 except BaseException as exc:
    869     exception = exc

Input In [34], in test_flow()
      1 @flow
      2 def test_flow():
----> 3     return soda_scan_execute(data_source_name="soda_test", configuration=soda_configuration, checks=sodacl_check, disable_telemetry=True, verbose=True, variables=None)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/tasks.py:294, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
    290 parameters = get_call_parameters(self.fn, args, kwargs)
    292 return_type = "state" if return_state else "result"
--> 294 return enter_task_run_engine(
    295     self,
    296     parameters=parameters,
    297     wait_for=wait_for,
    298     task_runner=SequentialTaskRunner(),
    299     return_type=return_type,
    300     mapped=False,
    301 )

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:705, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
    703 # Async or sync task run in sync flow run
    704 elif not flow_run_context.flow.isasync:
--> 705     return run_async_from_worker_thread(begin_run)
    707 # Sync task run in async flow run
    708 else:
    709     # Call out to the sync portal since we are not in a worker thread
    710     return flow_run_context.sync_portal.call(begin_run)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:136, in run_async_from_worker_thread(__fn, *args, **kwargs)
    131 """
    132 Runs an async function in the main thread's event loop, blocking the worker
    133 thread until completion
    134 """
    135 call = partial(__fn, *args, **kwargs)
--> 136 return anyio.from_thread.run(call)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/from_thread.py:49, in run(func, *args)
     46 except AttributeError:
     47     raise RuntimeError("This function can only be run from an AnyIO worker thread")
---> 49 return asynclib.run_async_from_thread(func, *args)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:970, in run_async_from_thread(func, *args)
    964 def run_async_from_thread(
    965     func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
    966 ) -> T_Retval:
    967     f: concurrent.futures.Future[T_Retval] = asyncio.run_coroutine_threadsafe(
    968         func(*args), threadlocals.loop
    969     )
--> 970     return f.result()

File ~/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
    444     raise CancelledError()
    445 elif self._state == FINISHED:
--> 446     return self.__get_result()
    447 else:
    448     raise TimeoutError()

File ~/.pyenv/versions/3.9.10/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:822, in create_task_run_then_submit(task, flow_run_context, parameters, wait_for, return_type, task_runner)
    820     return await future._wait()
    821 elif return_type == "result":
--> 822     return await future._result()
    823 else:
    824     raise ValueError(f"Invalid return type for task engine {return_type!r}.")

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/futures.py:227, in PrefectFuture._result(self, timeout, raise_on_failure)
    225 if not final_state:
    226     raise TimeoutError("Call timed out before task finished.")
--> 227 return final_state.result(raise_on_failure=raise_on_failure)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/states.py:145, in State.result(self, raise_on_failure)
    143 if (self.is_failed() or self.is_crashed()) and raise_on_failure:
    144     if isinstance(data, Exception):
--> 145         raise data
    146     elif isinstance(data, BaseException):
    147         warnings.warn(
    148             f"State result is a {type(data).__name__!r} type and is not safe "
    149             "to re-raise, it will be returned instead."
    150         )

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/task_runners.py:214, in SequentialTaskRunner.submit(self, task_run, run_key, run_fn, run_kwargs, asynchronous)
    204 async def submit(
    205     self,
    206     task_run: TaskRun,
   (...)
    211 ) -> PrefectFuture[R, A]:
    212     # Run the function immediately and store the result in memory
    213     try:
--> 214         result = await run_fn(**run_kwargs)
    215     except BaseException as exc:
    216         result = exception_to_crashed_state(exc)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:963, in begin_task_run(task, task_run, parameters, wait_for, result_filesystem, settings)
    957     raise RuntimeError(
    958         f"Cannot orchestrate task run '{task_run.id}'. "
    959         f"Failed to connect to API at {client.api_url}."
    960     ) from connect_error
    962 try:
--> 963     return await orchestrate_task_run(
    964         task=task,
    965         task_run=task_run,
    966         parameters=parameters,
    967         wait_for=wait_for,
    968         result_filesystem=result_filesystem,
    969         interruptible=interruptible,
    970         client=client,
    971     )
    972 except Abort:
    973     # Task run already completed, just fetch its state
    974     task_run = await client.read_task_run(task_run.id)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/engine.py:1081, in orchestrate_task_run(task, task_run, parameters, wait_for, result_filesystem, interruptible, client)
   1076     terminal_state = Failed(
   1077         message="Task run encountered an exception.",
   1078         data=DataDocument.encode("cloudpickle", exc),
   1079     )
   1080 else:
-> 1081     terminal_state = await return_value_to_state(
   1082         result, serializer="cloudpickle"
   1083     )
   1085     # for COMPLETED tasks, add the cache key and expiration
   1086     if terminal_state.is_completed():

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/states.py:129, in return_value_to_state(result, serializer)
    122     return State(
    123         type=new_state_type,
    124         message=message,
    125         data=DataDocument.encode(serializer, result),
    126     )
    128 # Otherwise, they just gave data and this is a completed result
--> 129 return Completed(data=DataDocument.encode(serializer, result))

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/orion/schemas/data.py:42, in DataDocument.encode(cls, encoding, data, **kwargs)
     36 """
     37 Create a new data document
     38 
     39 A serializer must be registered for the given `encoding`
     40 """
     41 # Dispatch encoding
---> 42 blob = lookup_serializer(encoding).dumps(data, **kwargs)
     44 inst = cls(blob=blob, encoding=encoding)
     45 inst._cache_data(data)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/prefect/serializers.py:59, in PickleSerializer.dumps(data)
     57 @staticmethod
     58 def dumps(data: Any) -> bytes:
---> 59     data_bytes = cloudpickle.dumps(data)
     61     return base64.encodebytes(data_bytes)

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
     69 with io.BytesIO() as file:
     70     cp = CloudPickler(
     71         file, protocol=protocol, buffer_callback=buffer_callback
     72     )
---> 73     cp.dump(obj)
     74     return file.getvalue()

File ~/.pyenv/versions/3.9.10/envs/playground/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py:633, in CloudPickler.dump(self, obj)
    631 def dump(self, obj):
    632     try:
--> 633         return Pickler.dump(self, obj)
    634     except RuntimeError as e:
    635         if "recursion" in e.args[0]:

TypeError: cannot pickle 'psycopg2.extensions.connection' object

Steps to reproduce:

  1. Start a local PostgreSQL database, e.g. docker run -d --name soda_postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=soda postgres:latest
  2. Define the Soda configuration in configuration.yml:
data_source soda_test:
  type: postgres
  connection:
    host: localhost
    port: 5432
    username: postgres
    password: postgres
    database: soda
  schema: information_schema
  3. Define the Soda checks in checks.yml:
checks for tables:
- row_count = 0 
  4. Clone prefect-soda-core, check out the add-soda-scan-execute-task branch, then install prefect-soda-core[postgres] with pip install /path/to/prefect-soda-core[postgres]
  5. Define a Prefect flow as follows:
from prefect_soda_core.tasks import soda_scan_execute
from prefect_soda_core.soda_configuration import SodaConfiguration
from prefect_soda_core.sodacl_check import SodaCLCheck
from prefect import flow

soda_configuration = SodaConfiguration(configuration_yaml_file="configuration.yml")
sodacl_check = SodaCLCheck(sodacl_yaml_file="checks.yml")

@flow
def test_flow():
    return soda_scan_execute(data_source_name="soda_test", configuration=soda_configuration, checks=sodacl_check, disable_telemetry=True, verbose=True, variables=None)

result = test_flow()
  6. Run the script; you should get a traceback similar to the one above.
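The traceback locates the failure in Prefect's result handling: return_value_to_state cloudpickles the task's return value, and that value (directly or through something it references) holds a live psycopg2 connection. Live connections, sockets and locks generally cannot be pickled, so the usual remedy is to return plain data (strings, dicts) from the task instead of the object that owns the connection. The sketch below reproduces the pattern with the standard library only, under loud assumptions: `threading.Lock` stands in for the psycopg2 connection, stdlib `pickle` stands in for cloudpickle, and `FakeScan`, `to_result` and `is_picklable` are hypothetical names, not soda-core's Scan API.

```python
import pickle
import threading
from typing import Dict, List


class FakeScan:
    """Hypothetical stand-in for an object that keeps a live DB connection."""

    def __init__(self) -> None:
        self.connection = threading.Lock()  # unpicklable, like a psycopg2 connection
        self.logs: List[str] = ["1/1 check PASSED"]

    def to_result(self) -> Dict[str, List[str]]:
        # Return only plain, picklable data instead of the scan object itself.
        return {"logs": list(self.logs)}


def is_picklable(obj: object) -> bool:
    """True if obj survives pickle.dumps, False on the usual pickling errors."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True
```

Here is_picklable(FakeScan()) is False because of the lock attribute, while is_picklable(FakeScan().to_result()) is True, which mirrors returning logs or a results dict from the task rather than the scan object.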

cc @vijaykiran

Cannot pickle 'classmethod' object

Hello,

I'm getting this error with the latest Prefect version (2.13), whereas it works with the previous version.

The error occurs when importing a prefect-soda-core class while deploying the flow:
prefect.exceptions.ScriptError: Script at 'src/my_soda_flow.py' encountered an exception: TypeError("cannot pickle 'classmethod' object")

I'll investigate on my side and let you know what I find.
