
airflow-provider-grafana-loki's Introduction


Airflow Grafana Loki Provider

Log handler for pushing Airflow task logs to Grafana Loki


This package provides a Hook and a LogHandler that integrate with Grafana Loki. LokiTaskHandler is a Python log handler that writes and reads task instance logs. It extends Airflow's FileTaskHandler and uploads logs to, and reads them from, Grafana Loki.

Installation

Install using pip:

pip install airflow-provider-grafana-loki

Configuring Airflow to write logs to Grafana Loki

Airflow can be configured to read and write task logs in Grafana Loki. It uses an existing Airflow connection to read and write logs. If you don't have a connection properly set up, this process will fail.

Follow the steps below to enable Grafana Loki logging:

  1. Airflow's logging system requires a custom .py file to be located in the PYTHONPATH, so that it's importable from Airflow. Start by creating a directory to store the config file; $AIRFLOW_HOME/config is recommended.
  2. Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py.
  3. Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file created in Step 2.
  4. Customize the following portions of the template:
     elif REMOTE_BASE_LOG_FOLDER.startswith('loki'):
         LOKI_HANDLER: Dict[str, Dict[str, Union[str, bool]]] = {
             'task': {
                 'class': 'grafana_loki_provider.log.loki_task_handler.LokiTaskHandler',
                 'formatter': 'airflow',
                 'name': 'airflow_task',
                 'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
                 'filename_template': FILENAME_TEMPLATE,
             },
         }

         DEFAULT_LOGGING_CONFIG['handlers'].update(LOKI_HANDLER)
     else:
         raise AirflowException(
             "Incorrect remote log configuration. Please check the configuration of option 'host' in "
             "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
             "'remote_base_log_folder' option in the 'logging' section."
         )




  5. Make sure a Grafana Loki connection has been defined in Airflow. The hook should have read and write access to the Grafana Loki API (a hedged example of defining such a connection follows this list).

  6. Update $AIRFLOW_HOME/airflow.cfg to contain:


        [logging]
        remote_logging = True
        remote_base_log_folder = loki
        logging_config_class = log_config.DEFAULT_LOGGING_CONFIG
        remote_log_conn_id = <name of the Grafana Loki connection>
  7. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
  8. Verify that logs for newly executed tasks show up in the Airflow UI.
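
A hedged sketch (not from this README) of one way to define that connection: Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables into connections, so the connection can be supplied without touching the metadata database. The connection id and Loki URL below are assumptions; use whatever you set as remote_log_conn_id.

    # Hedged sketch: Airflow reads AIRFLOW_CONN_<CONN_ID> environment variables
    # as connection URIs. The connection id and URL here are illustrative.
    import os

    os.environ["AIRFLOW_CONN_GRAFANA_LOKI_DEFAULT"] = "http://localhost:3100"

In practice you would export this variable in the environment that starts the scheduler and webserver rather than setting it from Python.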

In case you are using the gevent worker class, you might face a "RecursionError: maximum recursion depth exceeded" error while reading logs from Loki. Please refer to the following issues for more info: gevent/gevent#1016, apache/airflow#9118.

The current workaround is to add gevent monkey patching at the top of the Airflow log settings file, in this case $AIRFLOW_HOME/config/log_config.py, e.g.:

""Airflow logging settings."""
from __future__ import annotations

import gevent.monkey
gevent.monkey.patch_all()
import os

Note: The provider is in an active development stage. All sorts of feedback and bug reports are welcome. I will try to address and resolve all issues to the best of my ability.

If you run into any issue or need any help, please feel free to open an issue.

Your contributions to the project are highly appreciated and welcome.

airflow-provider-grafana-loki's People

Contributors

snjypl

airflow-provider-grafana-loki's Issues

Plugin does not work with Python 3.7 (cannot import name 'cached_property' from 'functools')

Hi,

I'm using Apache Airflow 2.3.3 with Python 3.7 and I get the following error when using this provider:

apache-airflow-airflow-worker-1 | LOGGING_CLASS_PATH = configure_logging()
apache-airflow-airflow-worker-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/logging_config.py", line 73, in configure_logging
apache-airflow-airflow-worker-1 | File "/home/airflow/.local/lib/python3.7/site-packages/airflow/logging_config.py", line 68, in configure_logging
apache-airflow-airflow-worker-1 | dictConfig(logging_config)
apache-airflow-airflow-worker-1 | File "/usr/local/lib/python3.7/logging/config.py", line 800, in dictConfig
apache-airflow-airflow-worker-1 | File "/usr/local/lib/python3.7/logging/config.py", line 571, in configure
apache-airflow-airflow-worker-1 | File "/home/airflow/.local/lib/python3.7/site-packages/grafana_loki_provider/log/loki_task_handler.py", line 2, in <module>
apache-airflow-airflow-worker-1 | ValueError: Cannot resolve 'grafana_loki_provider.log.loki_task_handler.LokiTaskHandler': cannot import name 'cached_property' from 'functools' (/usr/local/lib/python3.7/functools.py)

This is expected, as 'cached_property' was introduced in Python 3.8.

Do you plan to add support for older Python versions (3.7 in my case)?

BTW. Great work! Makes integration with Loki super easy :)
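
A possible workaround, as a hedged sketch (not provider code), is an import fallback to the cached-property backport package on Python 3.7:

    # Hedged sketch of a Python 3.7 fallback: functools.cached_property is
    # new in Python 3.8, but the cached-property backport package provides
    # an equivalent (pip install cached-property).
    try:
        from functools import cached_property  # Python 3.8+
    except ImportError:
        from cached_property import cached_property  # backport for 3.7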

Error viewing logs in UI

Hello @snjypl ,

I used this plugin with Airflow version 2.4.2. It worked fine and I am able to see logs in the Loki UI, but I couldn't see the logs for DAGs and tasks in the Airflow UI when using Loki for remote logging.

How can I get those logs from Loki into the Airflow UI?

Please help me find the solution.

Different subdomain to query Loki

The Loki setup our infra team has is a bit different than usual. My logs reach Loki correctly and are available for me to query in Grafana.
However, in the Airflow UI, when I go to a specific task instance and navigate to the logs screen, the read request to Loki times out. It turns out the infra team routes reads and writes to Loki as two different services (for scalability, I think): the writes go to
https://loki-distributor.abccompany.net
however the reads must come from
https://loki-query-frontend.abccompany.net/loki/api/v1/query_range

I am not really sure how to go about solving it, but to me it seems like a second connection/hook is needed to query the logs so they show up back in Airflow.

Any suggestions?
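
For context, Loki's read path uses the standard query_range endpoint, so the read URL can at least be verified outside Airflow. A hedged sketch using plain requests (not provider code); the label values are illustrative:

    # Hedged sketch: query Loki's /loki/api/v1/query_range API directly.
    # The URL is from this issue; the label values are assumptions.
    import requests

    resp = requests.get(
        "https://loki-query-frontend.abccompany.net/loki/api/v1/query_range",
        params={
            "query": '{dag_id="example_dag",task_id="example_task"}',
            "limit": 5000,
            "direction": "forward",
        },
        timeout=30,
    )
    resp.raise_for_status()
    # Each stream carries its label set plus (timestamp, line) pairs.
    for stream in resp.json()["data"]["result"]:
        for _ts, line in stream["values"]:
            print(line)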

TypeError: unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'

Hi again,

I get the following error when retrieving logs in the UI:

apache-airflow-airflow-webserver-1   | [2022-12-21 08:02:36,557] {app.py:1456} ERROR - Exception on /get_logs_with_metadata [GET]
apache-airflow-airflow-webserver-1   | Traceback (most recent call last):
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 2077, in wsgi_app
apache-airflow-airflow-webserver-1   |     response = self.full_dispatch_request()
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 1525, in full_dispatch_request
apache-airflow-airflow-webserver-1   |     rv = self.handle_user_exception(e)
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 1523, in full_dispatch_request
apache-airflow-airflow-webserver-1   |     rv = self.dispatch_request()
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/flask/app.py", line 1509, in dispatch_request
apache-airflow-airflow-webserver-1   |     return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/www/auth.py", line 46, in decorated
apache-airflow-airflow-webserver-1   |     return func(*args, **kwargs)
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/www/decorators.py", line 80, in wrapper
apache-airflow-airflow-webserver-1   |     return f(*args, **kwargs)
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper
apache-airflow-airflow-webserver-1   |     return func(*args, session=session, **kwargs)
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/www/views.py", line 1577, in get_logs_with_metadata
apache-airflow-airflow-webserver-1   |     logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/log_reader.py", line 57, in read_log_chunks
apache-airflow-airflow-webserver-1   |     logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
apache-airflow-airflow-webserver-1   |   File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/log/file_task_handler.py", line 267, in read
apache-airflow-airflow-webserver-1   |     log, metadata = self._read(task_instance, try_number_element, metadata)
apache-airflow-airflow-webserver-1   |   File "/opt/airflow/plugins/grafana_loki_provider/log/loki_task_handler.py", line 119, in _read
apache-airflow-airflow-webserver-1   |     end = ti.end_date + timedelta(hours=1)
apache-airflow-airflow-webserver-1   | TypeError: unsupported operand type(s) for +: 'NoneType' and 'datetime.timedelta'

This is a running task so indeed end_date is not set yet. I guess in such a case we need to use the current date.
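
A hedged sketch of the fix suggested above, assuming ti is the TaskInstance available in the handler's _read:

    # Hedged sketch: fall back to "now" while the task is still running and
    # ti.end_date is not yet set; timezone.utcnow() is timezone-aware.
    from datetime import timedelta

    from airflow.utils import timezone

    end_date = ti.end_date or timezone.utcnow()
    end = end_date + timedelta(hours=1)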

Add custom labels to pushed logs

Hi

Would it be possible to add custom labels to pushed logs, configured somewhere in the config?

Our Loki has a lot of stuff in it, and it would be difficult to find these logs in it.

What labels are added to the pushed logs as-is?
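
Judging by the query logged in the "Not pushing logs to Loki" issue below, the handler labels streams with dag_id and task_id and carries try_number, map_index, and run_id as JSON fields in the log line. For illustration only, this is the shape of Loki's push payload, with a hypothetical custom label added alongside:

    # Hedged illustration of the POST /loki/api/v1/push payload shape.
    # dag_id/task_id are inferred from the handler's query; "env" is a
    # hypothetical custom label. Timestamps are nanosecond strings.
    payload = {
        "streams": [
            {
                "stream": {
                    "dag_id": "example_dag",
                    "task_id": "example_task",
                    "env": "prod",  # hypothetical custom label
                },
                "values": [["1700000000000000000", "a log line"]],
            }
        ]
    }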

Date intervals in Loki queries too wide?

Our logs are taking quite a lot of time to load from Loki. I suppose it's caused by a very wide time range in which they are searched for in Loki:

start = ti.start_date - timedelta(days=15)
# if the task is running or queued, it will not have an end_date; in that
# case, we use a reasonable interval of 5 days
end_date = ti.end_date or ti.start_date + timedelta(days=5)

end = end_date + timedelta(hours=1)

Why do we need to search for logs up to 15 days before the task's start_date? Shouldn't it be just the start_date?

Also, it would be nice to be able to parametrize the end_date for non-finished tasks (currently hard-coded to 5 days, which may be too much depending on the use case).
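
A hedged sketch of what that parametrization could look like inside the handler; the option names are hypothetical, with the current values kept as fallbacks:

    # Hedged sketch: read hypothetical window options from airflow.cfg,
    # falling back to the hard-coded 15-day / 5-day windows.
    from airflow.configuration import conf

    lookback_days = conf.getint("logging", "loki_lookback_days", fallback=15)
    running_window_days = conf.getint("logging", "loki_running_window_days", fallback=5)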

No module named 'airflow.providers.grafana_loki_provider'

Hello,

I tried to send logs from airflow to grafana loki, but I am getting this error:

ValueError: Cannot resolve 'grafana_loki_provider.log.loki_task_handler.LokiTaskHandler': No module named 'grafana_loki_provider'

I installed this module via pip install airflow-provider-grafana-loki and am running standalone apache-airflow 2.4.2 locally.

These are the changes I made in the airflow.cfg file:

remote_logging = True
remote_base_log_folder = loki
logging_config_class= log_config.DEFAULT_LOGGING_CONFIG
remote_log_conn_id = http://localhost:3101/loki/api/v1/push

And this is the code snippet I added in the log_config.py file:

    elif REMOTE_BASE_LOG_FOLDER.startswith('loki'):
        LOKI_HANDLER: Dict[str, Dict[str, Union[str, bool]]] = {
            'task': {
                'class': 'grafana_loki_provider.log.loki_task_handler.LokiTaskHandler',
                'formatter': 'airflow',
                'name': 'airflow_task',
                'base_log_folder': str(os.path.expanduser(BASE_LOG_FOLDER)),
                'filename_template': FILENAME_TEMPLATE,
            },
        }

        DEFAULT_LOGGING_CONFIG['handlers'].update(LOKI_HANDLER)
    else:
        raise AirflowException(
            "Incorrect remote log configuration. Please check the configuration of option 'host' in "
            "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
            "'remote_base_log_folder' option in the 'logging' section."
        )

Please help me understand if I am missing something.
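
Two hedged first checks: per the README above, remote_log_conn_id expects the name of an Airflow connection rather than a URL, which may also be relevant here; and the package must be importable by the interpreter that runs the scheduler and webserver:

    # Hedged sanity check: run with the same interpreter Airflow uses.
    # An ImportError here means the package is missing from that environment;
    # note the handler path has no "airflow.providers." prefix.
    import grafana_loki_provider

    print(grafana_loki_provider.__file__)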

Not pushing logs to Loki

Hi, I have attempted to use this provider with Airflow version 2.6.2, and it seems to me like it doesn't push the logs to Loki.

Once I open the logs view for a task, it loads the local logs, but I can see in the Docker logs that it is trying to load them from Loki.

airflow-airflow-webserver-1  | [2023-07-08T06:39:49.086+0000] {loki_task_handler.py:134} INFO - loki log query params {'query': ' {dag_id="crm-elastic-dag",task_id="hello"}\n | json try_number="try_number",map_index="map_index",run_id="run_id"\n | try_number="1" and\n map_index="-1" and\n run_id="manual__2023-07-08T06:39:32.209086+00:00"\n | __error__!="JSONParserErr"\n ', 'start': '2023-06-23T06:39:34.475096+00:00', 'end': '2023-07-08T07:39:34.724944+00:00', 'limit': 5000, 'direction': 'forward'}

Using the provider manually in a task works fine, like this:

    from grafana_loki_provider.hooks.loki import LokiHook

    loki_hook = LokiHook(loki_conn_id=loki_conn_id)

Logs from task execution:

airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.041+0000] {scheduler_job_runner.py:412} INFO - 1 tasks up for execution:
airflow-airflow-scheduler-1  |  <TaskInstance: crm-elastic-dag.hello manual__2023-07-08T06:46:49.915800+00:00 [scheduled]>
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.041+0000] {scheduler_job_runner.py:480} INFO - DAG crm-elastic-dag has 0/16 running and queued tasks
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.042+0000] {scheduler_job_runner.py:587} INFO - Setting the following tasks to queued state:
airflow-airflow-scheduler-1  |  <TaskInstance: crm-elastic-dag.hello manual__2023-07-08T06:46:49.915800+00:00 [scheduled]>
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.046+0000] {scheduler_job_runner.py:625} INFO - Sending TaskInstanceKey(dag_id='crm-elastic-dag', task_id='hello', run_id='manual__2023-07-08T06:46:49.915800+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.047+0000] {base_executor.py:147} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'crm-elastic-dag', 'hello', 'manual__2023-07-08T06:46:49.915800+00:00', '--local', '--subdir', 'DAGS_FOLDER/crm-elastig-dag.py']
airflow-airflow-worker-1     | [2023-07-08 06:46:51,056: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[8a3bb8d5-9a63-4495-b234-ef7762a1a788] received
airflow-airflow-worker-1     | [2023-07-08 06:46:51,066: INFO/ForkPoolWorker-15] [8a3bb8d5-9a63-4495-b234-ef7762a1a788] Executing command in Celery: ['airflow', 'tasks', 'run', 'crm-elastic-dag', 'hello', 'manual__2023-07-08T06:46:49.915800+00:00', '--local', '--subdir', 'DAGS_FOLDER/crm-elastig-dag.py']
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.107+0000] {scheduler_job_runner.py:677} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='crm-elastic-dag', task_id='hello', run_id='manual__2023-07-08T06:46:49.915800+00:00', try_number=1, map_index=-1)
airflow-airflow-scheduler-1  | [2023-07-08T06:46:51.131+0000] {scheduler_job_runner.py:703} INFO - Setting external_id for <TaskInstance: crm-elastic-dag.hello manual__2023-07-08T06:46:49.915800+00:00 [queued]> to 8a3bb8d5-9a63-4495-b234-ef7762a1a788
airflow-airflow-worker-1     | [2023-07-08 06:46:51,151: INFO/ForkPoolWorker-15] Filling up the DagBag from /opt/airflow/dags/crm-elastig-dag.py
airflow-airflow-worker-1     | [2023-07-08 06:46:51,191: INFO/ForkPoolWorker-15] This is a log message
airflow-airflow-worker-1     | [2023-07-08 06:46:52,330: INFO/ForkPoolWorker-15] Running <TaskInstance: crm-elastic-dag.hello manual__2023-07-08T06:46:49.915800+00:00 [queued]> on host d5752f79742d
airflow-airflow-webserver-1  | 127.0.0.1 - - [08/Jul/2023:06:46:53 +0000] "GET /health HTTP/1.1" 200 243 "-" "curl/7.74.0"
airflow-airflow-worker-1     | [2023-07-08 06:46:53,394: INFO/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[8a3bb8d5-9a63-4495-b234-ef7762a1a788] succeeded in 2.335277110338211s: None
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.344+0000] {dagrun.py:616} INFO - Marking run <DagRun crm-elastic-dag @ 2023-07-08 06:46:49.915800+00:00: manual__2023-07-08T06:46:49.915800+00:00, state:running, queued_at: 2023-07-08 06:46:49.935860+00:00. externally triggered: True> successful
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.344+0000] {dagrun.py:682} INFO - DagRun Finished: dag_id=crm-elastic-dag, execution_date=2023-07-08 06:46:49.915800+00:00, run_id=manual__2023-07-08T06:46:49.915800+00:00, run_start_date=2023-07-08 06:46:50.980423+00:00, run_end_date=2023-07-08 06:46:54.344670+00:00, run_duration=3.364247, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-07-07 00:00:00+00:00, data_interval_end=2023-07-08 00:00:00+00:00, dag_hash=c848848d668b428fd5345193e82ebc08
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.354+0000] {dag.py:3490} INFO - Setting next_dagrun for crm-elastic-dag to 2023-07-08T00:00:00+00:00, run_after=2023-07-09T00:00:00+00:00
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.391+0000] {scheduler_job_runner.py:677} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='crm-elastic-dag', task_id='hello', run_id='manual__2023-07-08T06:46:49.915800+00:00', try_number=1, map_index=-1)
airflow-airflow-scheduler-1  | [2023-07-08T06:46:54.398+0000] {scheduler_job_runner.py:733} INFO - TaskInstance Finished: dag_id=crm-elastic-dag, task_id=hello, run_id=manual__2023-07-08T06:46:49.915800+00:00, map_index=-1, run_start_date=2023-07-08 06:46:52.740642+00:00, run_end_date=2023-07-08 06:46:53.212918+00:00, run_duration=0.472276, state=success, executor_state=success, try_number=1, max_tries=0, job_id=67, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2023-07-08 06:46:51.043588+00:00, queued_by_job_id=63, pid=8798
airflow-airflow-webserver-1  | 172.29.0.23 - - [08/Jul/2023:06:46:55 +0000] "GET /get_logs_with_metadata?dag_id=crm-elastic-dag&task_id=hello&map_index=-1&execution_date=2023-07-08T06%3A39%3A32.209086%2B00%3A00&try_number=1&metadata=null HTTP/1.1" 200 2451 "https://airflow.local.lab.com/log?dag_id=crm-elastic-dag&task_id=hello&execution_date=2023-07-08T06%3A39%3A32.209086%2B00%3A00&map_index=-1" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36

Has anyone tried it recently for automatic pushing to Loki? Or does anyone have an example of a DAG task that works with it?
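
For what it's worth, with remote logging configured as in the README above, the log handler (not the task code) is what pushes logs, so the DAG itself needs nothing Loki-specific. A minimal hedged sketch of such a DAG; the dag_id and schedule are illustrative:

    # Hedged sketch: a plain DAG; with remote logging configured, its task
    # logs are handled by LokiTaskHandler without any Loki code in the DAG.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def hello():
        print("This is a log message")


    with DAG(dag_id="loki_logging_demo", start_date=datetime(2023, 1, 1), schedule=None) as dag:
        PythonOperator(task_id="hello", python_callable=hello)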
