Each of the individual arrakis processing stages is currently a dask-based pipeline. The workflow is constructed from dask.delayed functions, which build a task graph that is then traversed by the dask workers.
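As a minimal illustration of that pattern (the function names here are hypothetical stand-ins, not the actual arrakis stages), the graph is built lazily and nothing executes until compute() is called:

```python
import dask


# Hypothetical stand-ins for an arrakis stage's work functions.
@dask.delayed
def load(source):
    # In a real stage this would read data from disk.
    return list(range(source))


@dask.delayed
def reduce_stage(parts):
    # Combine the per-source results into a single value.
    return sum(sum(p) for p in parts)


# Building the graph is cheap; the work only happens on .compute(),
# when the scheduler hands tasks to the dask workers.
graph = reduce_stage([load(n) for n in (3, 4, 5)])
result = graph.compute()
```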
The previous prefect version 1 pipeline has been updated to version 2. This pipeline sequentially calls each of the individual dask workflows. The intent is that some of the prefect 2 features (agents for remotely starting pipelines, the RESTful API) can be leveraged when full RACS processing commences.
We are creating a single Client/Cluster pair which is supplied to the main prefect workflow, and are relying on the prefect_dask.get_dask_client context manager to pass this compute resource through to the dask workflows started by each of the separate arrakis stages.
This setup, though, is losing information emitted by the arrakis logger. The supported methods of attaching prefect log handlers to a module (defined via export PREFECT_LOGGING_EXTRA_LOGGERS="arrakis" or the prefect logging configuration file) are not being propagated through to the code executed by the @dask.delayed functions. Similarly, the logs that should be printed to stdout in these functions are not appearing there. This makes debugging difficult. Grrr.
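The underlying stdlib behaviour can be seen without prefect or dask at all: a named logger only emits records if a handler is attached and its effective level allows them, and both of those are per-process state that a worker process does not inherit automatically. A minimal sketch (the logger name is illustrative):

```python
import logging

captured = []


class ListHandler(logging.Handler):
    """Collects record messages so we can inspect what was emitted."""

    def emit(self, record):
        captured.append(record.getMessage())


logger = logging.getLogger("arrakis.demo")

# With no handler attached and the default (inherited WARNING) level,
# an INFO record is simply dropped.
logger.info("lost message")

# Attaching a handler and lowering the level -- roughly what
# PREFECT_LOGGING_EXTRA_LOGGERS arranges in the client process --
# makes subsequent records visible. A fresh worker process starts
# without this state, which is why records vanish there.
logger.addHandler(ListHandler())
logger.setLevel(logging.INFO)
logger.info("kept message")
```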
I have tried to hijack the arrakis logger while the delayed functions are executing and manually attach the prefect orion logger handler. However, by this point there is no prefect context that can be obtained (e.g. the flow-run/task-run ID). Unless we want to explicitly pass this context information into the dask workflow, as I understand things, relying on the prefect helpers (e.g. get_run_context and get_run_logger) is not possible.
The way I have managed to get information out of the encapsulated dask workflows is to retrieve the distributed.worker logger and use that, similar to the below:

```python
@delayed
def cutout(...):
    logger = logging.getLogger('distributed.worker')
    logger.setLevel(logging.INFO)
```

This seems to work reliably, although it emits messages using the distributed.worker formatter.
And in typing all this up as an MWE, I think this works just as well:

```python
from arrakis.logger import logger

@delayed
def cutout(...):
    logger.setLevel(logging.INFO)
```
The real gotcha in all of this seems to be the logging level. There needs to be a setLevel(logging.INFO) call inside the dask.delayed functions; placing it at the top level, outside the delayed functions, does not seem to work. The logger object created in arrakis.logger also has this level set. All of this is to say I am not sure why this is needed. It might be related to how serialisation works? Since the dask workers are scaled up and down with the Cluster.adapt method, there might be a need to reset this often?
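This would be consistent with (though I have not confirmed) dask shipping the delayed function by reference: the worker re-imports the module and so gets a logger in its default state, and any setLevel done in the client process never reaches it. The effective-level mechanics can be shown with the stdlib alone (logger name illustrative):

```python
import logging

worker_side = logging.getLogger("arrakis.worker_demo")

# A logger created with no explicit level is NOTSET and inherits the
# root logger's WARNING, so INFO records are filtered out. A worker
# process re-importing the module sees exactly this default state.
assert not worker_side.isEnabledFor(logging.INFO)

# Calling setLevel inside the delayed function runs it in the worker
# process, where the filtering actually happens -- which would explain
# why only the in-function setLevel has any effect.
worker_side.setLevel(logging.INFO)
assert worker_side.isEnabledFor(logging.INFO)
```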
Not sure where the problem is coming from.
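One possible way to avoid repeating setLevel in every delayed function (untested here, and worth checking against the distributed docs) is a worker plugin: distributed calls its setup() on each worker when the plugin is registered and again on every worker that joins later, which would cover workers added by Cluster.adapt. A duck-typed sketch (subclassing distributed.WorkerPlugin is the conventional form):

```python
import logging


class ArrakisLoggingPlugin:
    """Dask worker plugin that reconfigures the arrakis logger on
    every worker as it starts, so newly adapted workers also log."""

    def setup(self, worker):
        logger = logging.getLogger("arrakis")
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)

    def teardown(self, worker):
        pass


# Registration would look like (requires a distributed Client):
#     client.register_worker_plugin(ArrakisLoggingPlugin())
```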