mendhak / airflow-ms-teams-operator Goto Github PK
View Code? Open in Web Editor NEWAirflow operator that can send messages to MS Teams
Home Page: https://code.mendhak.com/Airflow-MS-Teams-Operator/
Airflow operator that can send messages to MS Teams
Home Page: https://code.mendhak.com/Airflow-MS-Teams-Operator/
With the current code I get a warning and an error when I try to execute it:
[2023-11-27, 21:14:48 UTC] {warnings.py:109} WARNING - /opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py:394: AirflowProviderDeprecationWarning: Class
SimpleHttpOperator is deprecated and will be removed in a future release. Please use ``HttpOperator`` instead.
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1544, in _run_finished_callback
callback(context)
File "/home/airflow/gcs/dags/LikeIT/src/dags_utils.py", line 19, in send_teams_message
teams_notification = MSTeamsWebhookOperator(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 394, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/decorators.py", line 54, in wrapper
return func(*args, **kwargs)
File "/home/airflow/gcs/dags/LikeIT/src/ms_teams_webhook_operator.py", line 74, in __init__
self.hook = None
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 981, in __setattr__
super().__setattr__(key, value)
AttributeError: can't set attribute
Fixed already, will create a PR with the changes.
Hi I'm trying to follow your example but without success. The logs says it is trying to POST to webhook URL, but then right after it says that GET is not allowed. Any advice?
Task definition
notify_teams_success = MSTeamsWebhookOperator(
task_id='ms_team_notification',
http_conn_id='msteams_webhook_url',
message = f"Airflow: Dag {dag_name} finished succesfully.",
subtitle = "What a **great** achievement.",
theme_color = "00FF00",
button_text = "Link to Airflow UI",
button_url = "http://localhost:8080/admin/airflow/tree",
#proxy = "https://yourproxy.domain:3128/",
dag=dag
)
Log
[2021-07-06 13:59:37,184] {base.py:78} INFO - Using connection to: id: msteams_webhook_url. Host: maersk.webhook.office.com/webhookb2/xxx, Port: None, Schema: http, Login: , Password: None, extra: {}
[2021-07-06 13:59:37,252] {base.py:78} INFO - Using connection to: id: msteams_webhook_url. Host: maersk.webhook.office.com/webhookb2/xxx, Port: None, Schema: http, Login: , Password: None, extra: {}
[2021-07-06 13:59:37,254] {logging_mixin.py:104} INFO - {}
[2021-07-06 13:59:37,258] {logging_mixin.py:104} INFO - Proxy is :
[2021-07-06 13:59:37,326] {base.py:78} INFO - Using connection to: id: msteams_webhook_url. Host: maersk.webhook.office.com/webhookb2/xxx, Port: None, Schema: http, Login: , Password: None, extra: {}
[2021-07-06 13:59:37,378] {http.py:140} INFO - Sending 'POST' to url: http://maersk.webhook.office.com/webhookb2/xxx
[2021-07-06 13:59:41,910] {http.py:154} ERROR - HTTP error: Method Not Allowed
[2021-07-06 13:59:41,920] {http.py:155} ERROR - Invalid webhook request - GET not supported
[2021-07-06 13:59:42,004] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/http/hooks/http.py", line 152, in check_response
response.raise_for_status()
File "/home/airflow/.local/lib/python3.6/site-packages/requests/models.py", line 943, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 405 Client Error: Method Not Allowed for url: https://maersk.webhook.office.com/webhookb2/xxx
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/opt/airflow/plugins/operators/ms_teams_webhook_operator.py", line 90, in execute
self.hook.execute()
File "/opt/airflow/plugins/hooks/ms_teams_webhook_hook.py", line 136, in execute
extra_options={'proxies': proxies})
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/http/hooks/http.py", line 141, in run
return self.run_and_check(session, prepped_request, extra_options)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/http/hooks/http.py", line 191, in run_and_check
self.check_response(response)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/http/hooks/http.py", line 156, in check_response
raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 405:Method Not Allowed
[2021-07-06 13:59:42,069] {taskinstance.py:1531} INFO - Marking task as FAILED. dag_id=ms_teams_notification, task_id=ms_team_notification, execution_date=20151012T000000, start_date=20210706T135936, end_date=20210706T135942
[2021-07-06 13:59:42,283] {local_task_job.py:151} INFO - Task exited with return code 1
Button is not showing up in the notification
from ms_teams_webhook_operator import MSTeamsWebhookOperator
def on_failure(context):
dag_id = context['dag_run'].dag_id
task_id = context['task_instance'].task_id
context['task_instance'].xcom_push(key=dag_id, value=True)
logs_url = "file:///Users/shubamsachdeva/airflow/logs/{}/{}/{}".format(
dag_id, task_id, context['ts'])
teams_notification = MSTeamsWebhookOperator(
task_id="msteams_notify_failure",
trigger_rule="all_done",
#message="`{}` has failed on task: `{}`. View Logs at: `{}`".format(dag_id, task_id, logs_url),
message = "**Hello from Airflow!**",
subtitle = "`{}` has failed on task: `{}`".format(dag_id, task_id),
button_text="View log",
button_url=logs_url,
theme_color="FF0000",
http_conn_id="msteams-webhook-url")
teams_notification.execute(context)
print('Error Notification is sent to your MS Team Channel')
are you simply adding "MSTeamsWebhookOperator" to requirements.txt ? I don't see this mentioned in the instructions anywhere. Best to copy/paste the raw code for the operator & hook into my plugins directory?
I have 100 table loads, running every 10min, if the source system goes down, I get 100 alerts, and I miss other alerts from different DAG which might come between these alerts.
Is it possible to configure that the 2nd alert from the dag for an execution date is sent as a reply to the 1st alert?
I am trying to create a Connection in my managed Airflow service - GCP's Composer. However, when trying to create the connection, Airflow is letting me know that the data is too long in the column 'host'. Here is part of the error without the actual Webhook URI attached:
Do you know if there is a way to fix this in Composer? I cannot access the database since it is managed by GCP.
Hi, I'm using Airflow 1.10.15, hosted on a GCP composer, so I was trying to make a http connection to MSTeams, while adding the webook url into the Host field got this error:
Failed to create record. (_mysql_exceptions.DataError) (1406, "Data too long for column 'host' at row 1") [SQL: INSERT INTO connection (conn_id, conn_type, host, schema
, login, password, port, is_encrypted, is_extra_encrypted, extra) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)] [parameters: ('http_teams_hook', 'http', 'https://.webhook.office.com/webhookb2/29762ac4----bc12c6fb788e@efce8346----0fd07bd5e442/IncomingWebhook/32287185d66a4813b4cbd78914e01cab/8094d090----************', None, None, None, None, 0, 0, '')] (Background on this error at: http://sqlalche.me/e/13/9h9h)
used the below script to send teams alert. It's sending alert as expected but color is not displayed on the Card's topline.
teams_notification = MSTeamsWebhookOperator(
task_id="msteams_notify_batch_start", trigger_rule="all_done",
message=f"**Dag Running**",
subtitle=f"Task-1 is completed successfully",
button_text=f'{dag_id}',
button_url=f'{dag_url}',
theme_color="00FF00",
http_conn_id='prashanth-webhook')
teams_notification.execute(context)
Any chance on implementation of @mentions in msteams?
Hi,
After upgrading the cloud composer and airflow versions from v.1.10.10 to 1.10.15
Detailed logs are attached here.
Can you please help ?
[2022-01-05 14:34:40,780] {base_hook.py:89} INFO - Using connection to: id: msteam-test. Host: https://XXXXXXX.webhook.office.com/webhookb2/76735d08-3051-4315-9636-09590681eb21@68283f3b-8487-4c86-adb3-a5228f18b893/IncomingWebhook/7475361be6614c4e81e9ea09e4393f4e/bebbf3a3-76c5-4a28-9802-4e734e03b60e, Port: None, Schema: https, Login: None, Password: None, extra: None
[2022-01-05 14:34:40,785] {http_hook.py:136} INFO - Sending 'POST' to url: https://XXXXX.webhook.office.com/webhookb2/76735d08-3051-4315-9636-09590681eb21@68283f3b-8487-4c86-adb3-a5228f18b893/IncomingWebhook/7475361be6614c4e81e9ea09e4393f4e/bebbf3a3-76c5-4a28-9802-4e734e03b60e
[2022-01-05 14:34:40,815] {http_hook.py:185} WARNING - ('Connection aborted.', OSError(0, 'Error')) Tenacity will retry to execute the operation
[2022-01-05 14:34:40,816] {taskinstance.py:1152} ERROR - ('Connection aborted.', OSError(0, 'Error'))
Traceback (most recent call last):
File "/opt/python3.6/lib/python3.6/site-packages/urllib3/connectionpool.py", line 706, in urlopen
chunked=chunked,
File "/opt/python3.6/lib/python3.6/site-packages/urllib3/connectionpool.py", line 382, in _make_request
self.validate_conn(conn)
File "/opt/python3.6/lib/python3.6/site-packages/urllib3/connectionpool.py", line 1010, in validate_conn
conn.connect()
File "/opt/python3.6/lib/python3.6/site-packages/urllib3/connection.py", line 421, in connect
tls_in_tls=tls_in_tls,
File "/opt/python3.6/lib/python3.6/site-packages/urllib3/util/ssl.py", line 429, in ssl_wrap_socket
sock, context, tls_in_tls, server_hostname=server_hostname
File "/opt/python3.6/lib/python3.6/site-packages/urllib3/util/ssl.py", line 472, in _ssl_wrap_socket_impl
return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
File "/opt/python3.6/lib/python3.6/ssl.py", line 407, in wrap_socket
_context=self, _session=session)
File "/opt/python3.6/lib/python3.6/ssl.py", line 817, in init
self.do_handshake()
File "/opt/python3.6/lib/python3.6/ssl.py", line 1077, in do_handshake
self._sslobj.do_handshake()
File "/opt/python3.6/lib/python3.6/ssl.py", line 689, in do_handshake
self._sslobj.do_handshake()
OSError: [Errno 0] Error
Traceback (most recent call last):
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/airflow/gcs/dags/test_ms_teams.zip/test_ms_teams.py", line 43, in ms_team_notification
op1.execute(dict())
File "/home/airflow/gcs/dags/test_ms_teams.zip/msteams/ms_teams_webhook_operator.py", line 89, in execute
self.hook.execute()
File "/home/airflow/gcs/dags/test_ms_teams.zip/msteams/ms_teams_webhook_hook.py", line 132, in execute
extra_options={'proxies': proxies})
File "/usr/local/lib/airflow/airflow/hooks/http_hook.py", line 137, in run
return self.run_and_check(session, prepped_request, extra_options)
File "/usr/local/lib/airflow/airflow/hooks/http_hook.py", line 186, in run_and_check
raise ex
File "/usr/local/lib/airflow/airflow/hooks/http_hook.py", line 178, in run_and_check
allow_redirects=extra_options.get("allow_redirects", True))
File "/opt/python3.6/lib/python3.6/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/requests/adapters.py", line 498, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', OSError(0, 'Error'))
airflow.hooks.http_hook is deprecated , I can't seem to get the replacement to import.
If only the webhook_token
parameter is provided and no http_conn_id
, the following error occurs in the method get_proxy
:
File ".../ms_teams_webhook_operator.py", line 90, in execute
self.hook.execute()
File ".../ms_teams_webhook_hook.py", line 129, in execute
proxy_url = self.get_proxy(self.http_conn_id)
File ".../ms_teams_webhook_hook.py", line 72, in get_proxy
conn = self.get_connection(http_conn_id)
File ".../lib/python3.8/site-packages/airflow/hooks/base.py", line 67, in get_connection
conn = Connection.get_connection_from_secrets(conn_id)
File ".../lib/python3.8/site-packages/airflow/models/connection.py", line 376, in get_connection_from_secrets
conn = secrets_backend.get_connection(conn_id=conn_id)
File ".../lib/python3.8/site-packages/airflow/secrets/base_secrets.py", line 64, in get_connection
conn_uri = self.get_conn_uri(conn_id=conn_id)
File ".../lib/python3.8/site-packages/airflow/secrets/environment_variables.py", line 34, in get_conn_uri
environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
AttributeError: 'NoneType' object has no attribute 'upper'
This can be easily fixed by adding proxy information only if http_conn_id
is provided:
# ms_teams_webhook_hook.py, l. 127ff
proxies = {}
if self.http_conn_id is not None:
proxy_url = self.get_proxy(self.http_conn_id)
print("Proxy is : " + proxy_url)
if len(proxy_url) > 5:
proxies = {'https': proxy_url}
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.