astronomer / astronomer-providers
Airflow Providers containing Deferrable Operators & Sensors from Astronomer
Home Page: https://astronomer-providers.rtfd.io/
License: Apache License 2.0
Some of the rules that can be added are:
This story is the technical debt story for the pending work to use the OSS BigQueryHook within the BigQueryInsertJobOperatorAsync.
The code change needs to
Note
Ensure that the PR apache/airflow#21385 is released before this story is implemented.
Based on the research in #32 - Async RedshiftOperator
Acceptance Criteria:
Create a utility script to automatically terminate cloud resources that have been unused for the last hour. This will improve our resource usage efficiency and reduce costs.
For instance, we could create a Lambda function on AWS that checks whether a particular resource (say, an EMR cluster) has been unused for the last hour and terminates it automatically, as sketched below.
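A hedged sketch of that Lambda idea, assuming boto3 and EMR as the example resource; the one-hour threshold and the "ready time" heuristic are illustrative (real idle detection would likely also inspect cluster steps or CloudWatch metrics):

```python
import datetime

import boto3


def lambda_handler(event, context):
    """Terminate EMR clusters that have been sitting idle for over an hour."""
    emr = boto3.client("emr")
    now = datetime.datetime.now(datetime.timezone.utc)
    idle_threshold = datetime.timedelta(hours=1)

    # WAITING clusters have finished their steps and are just burning money.
    for cluster in emr.list_clusters(ClusterStates=["WAITING"])["Clusters"]:
        ready_at = cluster["Status"]["Timeline"].get("ReadyDateTime")
        # Approximation: treat "ready for over an hour" as "unused"; a real
        # check would also confirm no steps ran in that window.
        if ready_at and now - ready_at > idle_threshold:
            emr.terminate_job_flows(JobFlowIds=[cluster["Id"]])
```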
Describe the bug
If you try to install airflow==2.2.4 with astronomer-providers==1.0.0, it will run into a dependency version conflict.
To Reproduce
Steps to reproduce the behavior:
pyenv virtualenv 3.9.7 airflow-test
pyenv activate airflow-test
pip install "apache-airflow[s3]==2.2.4" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.4/constraints-3.9.txt" "astronomer-providers==1.0.0"
ERROR: Cannot install astronomer-providers because these package versions have conflicting dependencies.
The conflict is caused by:
aiobotocore 2.1.1 depends on botocore<1.23.25 and >=1.23.24
The user requested (constraint) botocore==1.24.2
To fix this you could try to:
1. loosen the range of package versions you've specified
2. remove package versions to allow pip attempt to solve the dependency conflict
Expected behavior
It should install cleanly.
Additional context
Airflow 2.2.4 constraints: botocore==1.24.2
astronomer-providers 1.0.0 requirements: aiobotocore>=2.1.1
aiobotocore 2.1.1 requirements: botocore>=1.23.24,<1.23.25
There might be similar issues with the other boto dependencies... I have tried with 2.2.3 and 2.2.2, but there are similar issues with different packages:
e.g. 2.2.3:
The conflict is caused by:
astronomer-providers 1.0.0 depends on apache-airflow-providers-cncf-kubernetes>=3
The user requested (constraint) apache-airflow-providers-cncf-kubernetes==2.2.0
e.g. 2.2.2:
The conflict is caused by:
apache-airflow[s3] 2.2.2 depends on apache-airflow-providers-amazon; extra == "s3"
astronomer-providers 1.0.0 depends on apache-airflow-providers-amazon>=3.0.0
The user requested (constraint) apache-airflow-providers-amazon==2.4.0
Providers:
Testing & Production Readiness
- Set up dev environments
- Write and run actual DAGs as tests, apart from the unit tests in the repo
AzureCosmosDocumentSensor. This task can be done after the research task in #185.
Acceptance Criteria:
Versions of Apache Airflow Providers: apache-airflow-providers-google==6.3.0
Apache Airflow version: 2.2.3
Operating System: Any
Deployment: Docker-Compose
Anything else: No response
The operator airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator fails with the following error when the table whose data is being fetched has a DATE field.
2022-02-24, 06:16:45 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/providers/google/cloud/operators/bigquery.py:475: DeprecationWarning: The bigquery_conn_id parameter has been deprecated. You should pass the gcp_conn_id parameter.
hook = BigQueryHook(
[2022-02-24, 06:16:47 UTC] {bigquery.py:489} INFO - Total extracted rows: 10
[2022-02-24, 06:16:47 UTC] {xcom.py:333} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config.
[2022-02-24, 06:16:47 UTC] {taskinstance.py:1700} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1329, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1455, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
self.xcom_push(key=XCOM_RETURN_KEY, value=result)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2135, in xcom_push
XCom.set(
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 100, in set
value = XCom.serialize_value(value)
File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 331, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.9/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.9/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.9/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.9/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type date is not JSON serializable
[2022-02-24, 06:16:47 UTC] {taskinstance.py:1267} INFO - Marking task as FAILED. dag_id=example_async_bigquery_queries, task_id=get_data, execution_date=20220224T061606, start_date=20220224T061644, end_date=20220224T061647
[2022-02-24, 06:16:47 UTC] {standard_task_runner.py:89} ERROR - Failed to execute job 86 for task get_data
The data has been fetched, but pushing it to XCom fails with "Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your config."
Expected behavior: all the records should be returned properly.
To reproduce: create a table with a DATE column and try to fetch the records using BigQueryGetDataOperator.
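For context, a minimal reproduction of the failure and one possible workaround (coercing date/datetime values to ISO strings before the operator's return value hits XCom); the helper below is illustrative, not the proposed fix:

```python
import datetime
import json

# Shape of what BigQueryGetDataOperator returns for a table with a DATE column.
rows = [["alice", datetime.date(2022, 2, 24)]]

# json.dumps(rows) raises: TypeError: Object of type date is not JSON serializable


def json_safe(value):
    """Coerce date/datetime values to ISO-8601 strings; pass others through."""
    if isinstance(value, (datetime.date, datetime.datetime)):
        return value.isoformat()
    return value


safe_rows = [[json_safe(v) for v in row] for row in rows]
print(json.dumps(safe_rows))  # [["alice", "2022-02-24"]]
```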
We need a way to highlight to the developer whenever an incorrect trigger classpath is used in the serialize method.
We spent an unreasonable amount of time detecting this last time, because the task only surfaces the exception as "Trigger failure" with no other information.
Either write a unit test to detect this, or come up with an alternative approach; a sketch of the unit-test idea follows.
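A sketch of that unit-test idea, assuming each trigger can be constructed with dummy arguments in the test; the helper name and the use of import_string are illustrative:

```python
from airflow.utils.module_loading import import_string


def assert_serialized_classpath_is_valid(trigger):
    """Fail with a clear message if serialize() advertises a classpath that
    does not resolve back to the trigger's own class."""
    classpath, _kwargs = trigger.serialize()
    cls = import_string(classpath)  # raises ImportError on a bogus path
    assert cls is type(trigger), (
        f"serialize() returned {classpath!r}, expected "
        f"{type(trigger).__module__}.{type(trigger).__name__}"
    )
```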
Follow up of #31
Implement async versions for the following sensors:
- BigQueryTableExistenceSensor -> @rajaths010494
- BigQueryTablePartitionExistenceSensor --> Addressed in #173
Acceptance Criteria:
SnowflakeOperatorAsync
The Python connector for Snowflake supports asynchronous queries: https://docs.snowflake.com/en/user-guide/python-connector-example.html#performing-an-asynchronous-query
So we should be able to make the SnowflakeOperator async by creating a run_async method in the SnowflakeHook, which will be used by SnowflakeOperatorAsync. Take the DatabricksSubmitRunOperatorAsync as an example.
The SnowflakeOperator in open-source Airflow (code) executes a query, then waits and blocks an Airflow worker slot until the query is completed. Our job will be to replace the blocking call with execute_async and a time-based poll using asyncio.sleep() (similar to DatabricksTrigger). Deliverables (a trigger-side sketch follows the list):
- SnowflakeHookAsync
- SnowflakeTrigger
- SnowflakeOperatorAsync that uses SnowflakeHookAsync and SnowflakeTrigger
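A minimal sketch of the trigger-side polling under the deliverables above; the classpath, the hook call stubbed in _is_still_running, and the poll interval are assumptions, not settled interfaces:

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple

from airflow.triggers.base import BaseTrigger, TriggerEvent


class SnowflakeTrigger(BaseTrigger):
    """Polls a query submitted with the connector's execute_async."""

    def __init__(self, query_id: str, poll_interval: float = 5.0):
        super().__init__()
        self.query_id = query_id
        self.poll_interval = poll_interval

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # Classpath is illustrative; it must match wherever the class lives.
        return (
            "astronomer.providers.snowflake.triggers.SnowflakeTrigger",
            {"query_id": self.query_id, "poll_interval": self.poll_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Yield control to the triggerer's event loop between polls instead
        # of blocking a worker slot the way the sync operator does.
        while await self._is_still_running():
            await asyncio.sleep(self.poll_interval)
        yield TriggerEvent({"query_id": self.query_id, "status": "success"})

    async def _is_still_running(self) -> bool:
        # Stub: the real implementation would ask SnowflakeHookAsync, e.g. via
        # conn.is_still_running(conn.get_query_status(self.query_id)).
        raise NotImplementedError
```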
Follow up of #35 to implement async versions of the remaining GCS sensors:
- GCSObjectUpdateSensor -> @bharanidharan14
- GCSObjectsWithPrefixExistenceSensor -> @rajaths010494
- GCSUploadSessionCompleteSensor -> @rajaths010494
Acceptance Criteria:
Acceptance Criteria:
We want to have a private PyPI to publish the wheel package (packaging this repo) that has the following features:
Based on the research in #32 - Async RedshiftOperator.
Acceptance Criteria:
Please look at the docs to see if there are other operators in the Azure provider we should make async (principle: only create async operators for the sync versions of operators that do some level of polling or take more than a few seconds to complete).
Acceptance Criteria:
Get the numbers for a rough sizing (how many) in each category from the OSS?
Database (Postgres, Snowflake, …)
File (Local, GCS, S3, …)
Job submit (Spark, Databricks?, …)
Notification / Alerting (Slack, …)
REST / JSON API (…) Not sure how many of these exist in Airflow, but big market for these.
Monitoring (Datadog, ..)
Data Quality (Great Expectations, …)
Observability / Lineage (Datakin, …)
We want to run all the example DAGs in this repo on a weekly basis at least to start with (we can change frequency later on).
There is also a recent related AIP worth reading and adding your thoughts to: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-47+New+design+of+Airflow+System+Tests
Acceptance Criteria:
Note: The DAG should contain cleanup tasks at the end to destroy all the resources, e.g. deleting the BigQuery table as the last step.
Possible Solution:
Acceptance Criteria:
We want to write a version of the KubernetesPodOperator where the core "waiting for the pod to finish" part of the operator is offloaded to a trigger, as sketched below.
Acceptance Criteria:
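A sketch of the deferral shape under that requirement; PodTrigger, its polling stub, and the event payload are assumptions for illustration only:

```python
import asyncio

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.triggers.base import BaseTrigger, TriggerEvent


class PodTrigger(BaseTrigger):  # hypothetical trigger watching the pod phase
    def __init__(self, pod_name: str, namespace: str):
        super().__init__()
        self.pod_name, self.namespace = pod_name, namespace

    def serialize(self):
        return (
            f"{type(self).__module__}.PodTrigger",
            {"pod_name": self.pod_name, "namespace": self.namespace},
        )

    async def run(self):
        while not await self._pod_finished():  # stub: would poll the k8s API
            await asyncio.sleep(10)
        yield TriggerEvent({"status": "succeeded", "pod_name": self.pod_name})

    async def _pod_finished(self) -> bool:
        raise NotImplementedError


class KubernetesPodOperatorAsync(KubernetesPodOperator):
    def execute(self, context):
        # A real implementation would launch the pod here (cheap), then hand
        # the long "wait for the pod to finish" part to the triggerer and
        # free the worker slot.
        self.defer(
            trigger=PodTrigger(pod_name=self.name, namespace=self.namespace),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        # Resumed by the scheduler once the trigger fires.
        if event.get("status") != "succeeded":
            raise RuntimeError(f"Pod {event.get('pod_name')} failed")
```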
Add Compatibility matrix with Airflow versions
Async version of https://github.com/apache/airflow/blob/1008d8bf8acf459dbc692691a589c27fa4567123/airflow/providers/google/cloud/sensors/gcs.py#L30 using one of the following libraries:
Official Python client does not support it yet: googleapis/google-cloud-python#3103
Acceptance Criteria:
As a follow-up to #14 we should complete the remaining operators/sensors in the s3 module for the Amazon provider. Please look at docs to see if there are other operators we should make Async (Principle - Only create Async operators for the “sync-version” of operators that do some level of polling; take more than a few seconds to complete).
Acceptance Criteria:
Tests fail (link) if we upgrade to apache-airflow-providers-databricks>2.0.2
We should bump the version and fix the failing tests.
Implement async version of DatabricksSqlOperator
Acceptance Criteria:
Unit test coverage in the PR (90% code coverage; we will need to add Codecov separately to measure it), with all tests passing
Example DAG using the async Operator that can be used to run Integration tests that are parametrized via Environment variables. Example - https://github.com/apache/airflow/blob/8a03a505e1df0f9de276038c5509135ac569a667/airflow/providers/google/cloud/example_dags/example_bigquery_to_gcs.py#L33-L35
Add proper docstrings for each of the methods and functions, including an example DAG showing how it should be used (populate
Exception Handling in case of errors
Improve the OSS Docs to make sure it covers the following:
Has an example DAG for the sync version
How to add a connection via Environment Variable & explain each of the fields. Example - https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/connections/postgres.html
How to use Guide for the Operator - example: https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html
We'd like a pair of async operators that mirror the functionality of Airflow's Databricks operators.
Add Type Hints and Enable mypy with strict checking.
Preparatory work in https://github.com/astronomer/astronomer-providers/tree/mypy-strict
Running make run-mypy will show the errors that need fixing.
Acceptance Criteria:
Code coverage on the Snowflake and Redshift operators is low. Let's bring it above 90%.
Acceptance Criteria:
Document the possible options, and the reason for selecting a particular library, in this GitHub issue via a summary comment.
Find a Python library that supports async implementation for boto3.
Some quick research points to the following:
Official Python client does not support it yet: boto/botocore#458
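For reference, a quick sketch of aiobotocore (one candidate, which wraps botocore with aiohttp); the bucket and key names are placeholders:

```python
import asyncio

from aiobotocore.session import get_session


async def s3_key_exists(bucket: str, key: str) -> bool:
    """Non-blocking HEAD on an S3 object, usable from a trigger's event loop."""
    session = get_session()
    async with session.create_client("s3") as client:
        try:
            await client.head_object(Bucket=bucket, Key=key)
            return True
        except client.exceptions.ClientError:
            return False


print(asyncio.run(s3_key_exists("example-bucket", "example-key")))
```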
astronomer.providers.XYZ (#57)
Use Sphinx and the Sphinx AutoAPI extension to autogenerate docs to achieve the following:
To better control what we expose in the docs, we could take the Docker project for inspiration: its docs and source code.
Feel free to add more tickets for main code completion/tests, or just expand this one to cover them all.
Follow up of #34 to implement async version of S3ToRedshiftOperator.
Acceptance Criteria:
Acceptance Criteria:
Acceptance Criteria:
Get Certification
The HTTPSensorAsync does not defer the task to the trigger. It is essentially calling the HTTPSensor run method. Pasting the log from the example DAG for reference.
[2022-02-28, 13:43:41 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/utils/context.py:152: AirflowContextDeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2022-02-28, 13:43:46 UTC] {http.py:101} INFO - Poking:
[2022-02-28, 13:43:46 UTC] {base.py:70} INFO - Using connection to: id: http_default. Host: randomuser.me, Port: None, Schema: , Login: , Password: None, extra: {}
[2022-02-28, 13:43:46 UTC] {http.py:140} INFO - Sending 'GET' to url: http://randomuser.me
[2022-02-28, 13:43:52 UTC] {http.py:101} INFO - Poking:
[2022-02-28, 13:43:52 UTC] {base.py:70} INFO - Using connection to: id: http_default. Host: randomuser.me, Port: None, Schema: , Login: , Password: None, extra: {}
[2022-02-28, 13:43:52 UTC] {http.py:140} INFO - Sending 'GET' to url: http://randomuser.me
[2022-02-28, 13:43:57 UTC] {http.py:101} INFO - Poking:
[2022-02-28, 13:43:57 UTC] {base.py:70} INFO - Using connection to: id: http_default. Host: randomuser.me, Port: None, Schema: , Login: , Password: None, extra: {}
[2022-02-28, 13:43:58 UTC] {http.py:140} INFO - Sending 'GET' to url: http://randomuser.me
[2022-02-28, 13:44:03 UTC] {http.py:101} INFO - Poking:
[2022-02-28, 13:44:03 UTC] {base.py:70} INFO - Using connection to: id: http_default. Host: randomuser.me, Port: None, Schema: , Login: , Password: None, extra: {}
[2022-02-28, 13:44:03 UTC] {http.py:140} INFO - Sending 'GET' to url: http://randomuser.me
Get the Databricks operator coded up and working in manual testing, but without a full unit test suite
Is your feature request related to a problem? Please describe.
I would like to use only a part of this provider; in my case I only need the CNCF one (so I do not want all the aws, gcp, etc. dependencies).
Describe the solution you'd like
If I could do a pip install of:
astronomer-providers[cncf]==1.0.0
Describe alternatives you've considered
Copy paste the CNCF folder of this repo inside my private dags repo
Additional context
I guess having all these great operators and hooks out of the official Airflow project is on purpose, so I guess there is no plan to merge these operators and hooks into the existing Airflow providers?
Thanks for all your work and open sourcing this great code 👍
The example DAG for S3 sensors is still not self-sufficient. For example, when the task is deferred it keeps waiting for the expected file to arrive in S3, and it currently does not complete without the manual intervention of uploading the file to S3. We should create the file automatically in the expected location through a parallel task that runs, say, a minute after the deferral, so that this action moves the deferred task to completion (see the sketch below).
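One possible shape for that parallel task, sketched with the TaskFlow API; the bucket, key, and one-minute delay are placeholders:

```python
import time

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
def upload_expected_file(bucket: str = "example-bucket", key: str = "example-key"):
    """Runs alongside the sensor: wait out the deferral, then create the file
    the deferred S3 sensor is waiting for so the DAG completes on its own."""
    time.sleep(60)  # give the sensor time to defer first
    S3Hook().load_string("done", key=key, bucket_name=bucket, replace=True)
```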
PostgresOperatorAsync
If not, there are potentially other libraries we can use.
So we should be able to make the PostgresOperator async by creating a run_async method in the PostgresHook, which will be used by PostgresOperatorAsync. Take the DatabricksSubmitRunOperatorAsync as an example.
The PostgresOperator in open-source Airflow (code) executes a query, then waits and blocks an Airflow worker slot until the query is completed. Our job will be to replace the blocking call with an async call and a time-based poll using asyncio.sleep() (similar to DatabricksTrigger). Deliverables (a hook-side sketch follows the list):
- PostgresHookAsync
- PostgresTrigger
- PostgresOperatorAsync that uses PostgresHookAsync and PostgresTrigger
An integration test with an example should cover all the actual testing.
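A minimal sketch of what the hook-side call could look like, assuming asyncpg as the library (it is only one candidate); the DSN and SQL are placeholders:

```python
import asyncio

import asyncpg


async def run_query(dsn: str, sql: str) -> str:
    """Execute a statement without blocking the event loop; returns the
    command status tag (e.g. 'SELECT 1')."""
    conn = await asyncpg.connect(dsn)
    try:
        return await conn.execute(sql)
    finally:
        await conn.close()


status = asyncio.run(
    run_query("postgresql://user:pass@localhost:5432/db", "SELECT 1")
)
```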
Research and integrate a pre-commit hook that checks the files in this repository for bad naming conventions.
An example of a bad naming convention would be naming a file (or class) AsyncBigQueryInsertJobOperator rather than BigQueryInsertJobOperatorAsync. A sketch of such a check follows.
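A sketch of the check the hook could run (pre-commit passes the changed file paths as arguments); the "Async prefix vs. suffix" rule shown is the one from the example above:

```python
import ast
import sys


def misnamed_classes(path: str) -> list:
    """Classes named with an 'Async' prefix instead of the expected suffix."""
    with open(path) as f:
        tree = ast.parse(f.read())
    return [
        node.name
        for node in ast.walk(tree)
        if isinstance(node, ast.ClassDef) and node.name.startswith("Async")
    ]


if __name__ == "__main__":
    failed = False
    for path in sys.argv[1:]:
        for name in misnamed_classes(path):
            print(f"{path}: rename {name} to {name[len('Async'):]}Async")
            failed = True
    sys.exit(1 if failed else 0)
```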
Add the triggers' common methods to the hooks, as these methods can be reused by other sensors and triggers.
Acceptance Criteria:
Describe the bug
The env variable AIRFLOW__KUBERNETES__CONFIG_FILE is not used by the KubernetesPodOperatorAsync.
To Reproduce
set
AIRFLOW__KUBERNETES__CONFIG_FILE=/usr/local/airflow/include/.kube/config
then it will log
Config not found: ~/.kube/config
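A sketch of the lookup the operator could do instead (AIRFLOW__KUBERNETES__CONFIG_FILE populates the [kubernetes] config_file setting), shown with Airflow's conf API; the fallback path mirrors the log above:

```python
from airflow.configuration import conf

# Honor the env-var-backed setting before assuming the default kubeconfig.
config_file = conf.get("kubernetes", "config_file", fallback=None) or "~/.kube/config"
```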
Build async version of https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/bigquery.py
Acceptance Criteria:
Async version of AzureDataFactoryPipelineRunStatusSensor: https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/stable/_api/airflow/providers/microsoft/azure/sensors/data_factory/index.html#airflow.providers.microsoft.azure.sensors.data_factory.AzureDataFactoryPipelineRunStatusSensor
Acceptance Criteria:
Based on the research in #13 - Async GCSObjectExistenceSensor.
Async version of https://github.com/apache/airflow/blob/1008d8bf8acf459dbc692691a589c27fa4567123/airflow/providers/google/cloud/sensors/gcs.py#L30 using one of the following libraries:
https://github.com/talkiq/gcloud-aio/blob/master/storage/README.rst
https://github.com/omarryhan/aiogoogle
Official Python client does not support it yet: googleapis/google-cloud-python#3103
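A quick sketch with gcloud-aio (the first library listed above), assuming its download_metadata call per the project's README; bucket and object names are placeholders:

```python
import asyncio

from gcloud.aio.storage import Storage


async def object_exists(bucket: str, name: str) -> bool:
    """Existence check via a metadata lookup; a 404 surfaces as an exception."""
    client = Storage()
    try:
        await client.download_metadata(bucket, name)
        return True
    except Exception:  # the library raises an HTTP error on missing objects
        return False
    finally:
        await client.close()


print(asyncio.run(object_exists("example-bucket", "example-object")))
```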
Acceptance Criteria:
Acceptance Criteria:
Get Certification
We're going to run this primarily through the beta docs UI - Jake W will handle the final formatting and markup; we just need to get him a reasonable draft in Notion.