
airflow-provider-fivetran-async's Introduction

Fivetran Async Provider for Apache Airflow

This package provides an async operator, sensor, and hook that integrate Fivetran into Apache Airflow. FivetranSensor allows you to monitor a Fivetran sync job for completion before running downstream processes. FivetranOperator submits a Fivetran sync job and polls for its status on the triggerer. Since an async sensor or operator frees up a worker slot while polling happens on the triggerer, they consume fewer resources than traditional "sync" sensors and operators.

Fivetran automates your data pipeline, and Airflow automates your data processing.

Installation

Prerequisites: An environment running apache-airflow.

pip install airflow-provider-fivetran-async

Configuration

In the Airflow user interface, configure a Connection for Fivetran. Most of the Connection config fields will be left blank. Configure the following fields:

  • Conn Id: fivetran
  • Conn Type: Fivetran
  • Login: Fivetran API Key
  • Password: Fivetran API Secret

Find the Fivetran API Key and Secret in the Fivetran Account Settings, under the API Config section. See our documentation for more information on Fivetran API Authentication.

The sensor assumes the Conn Id is set to fivetran; however, if you are managing multiple Fivetran accounts, you can set it to anything you like. See the DAG in examples for how to specify a custom Conn Id.
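
For example, both the operator and sensor accept a fivetran_conn_id argument (a sketch, assuming a second connection named fivetran_team_b and a placeholder connector_id):

fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_sync",
    fivetran_conn_id="fivetran_team_b",  # any Conn Id you configured; "fivetran" is the default
    connector_id="my_connector_id",  # placeholder
)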

Modules

from fivetran_provider_async.operators import FivetranOperator

FivetranOperator submits a Fivetran sync job and monitors it on the triggerer for completion.

FivetranOperator requires that you specify the connector_id of the Fivetran connector you wish to trigger. You can find connector_id in the Settings page of the connector you configured in the Fivetran dashboard.

The FivetranOperator will wait for the sync to complete as long as wait_for_completion=True (the default). It is recommended that you run in deferrable mode (also the default). If wait_for_completion=False, the operator will return the timestamp of the last sync.
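
A minimal usage sketch, reusing the example connector_id that appears in the sensor snippets below:

sync_task = FivetranOperator(
    task_id="fivetran_sync",
    connector_id="bronzing_largely",
    # wait_for_completion=True and deferrable=True are the defaults
)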

Import into your DAG via:

from fivetran_provider_async.sensors import FivetranSensor

FivetranSensor monitors a Fivetran sync job for completion. Monitoring with FivetranSensor allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.

FivetranSensor requires that you specify the connector_id of the Fivetran connector you want to wait for. You can find connector_id in the Settings page of the connector you configured in the Fivetran dashboard.

You can use multiple instances of FivetranSensor to monitor multiple Fivetran connectors.

FivetranSensor is most commonly useful in two scenarios:

  1. Fivetran is using a separate scheduler from the Airflow scheduler.
  2. You set wait_for_completion=False in the FivetranOperator, and you need to await the FivetranOperator task later. (You may want to do this if you want to arrange your DAG such that some tasks are dependent on starting a sync and other tasks are dependent on completing a sync).

If you are doing the 1st pattern, you may find it useful to set the completed_after_time to data_interval_end, or data_interval_end with some buffer:

fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_externally_scheduled_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
    completed_after_time="{{ data_interval_end + macros.timedelta(minutes=1) }}",
)

If you are doing the 2nd pattern, you can use XComs to pass the target completed time to the sensor:

fivetran_op = FivetranOperator(
    task_id="fivetran_sync_my_db",
    connector_id="bronzing_largely",
    wait_for_completion=False,
)

fivetran_sensor = FivetranSensor(
    task_id="wait_for_fivetran_db_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
    completed_after_time="{{ task_instance.xcom_pull('fivetran_sync_op', key='return_value') }}",
)

fivetran_op >> fivetran_sensor

You may also specify the FivetranSensor without a completed_after_time. In this case, the sensor will note the most recent completed time and wait for a new completed time.
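
For example, this sketch waits for whatever the next completed sync happens to be:

fivetran_sensor = FivetranSensor(
    task_id="wait_for_next_fivetran_sync",
    connector_id="bronzing_largely",
    poke_interval=5,
)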

Examples

See the examples directory for an example DAG.

Issues

Please submit issues and pull requests in our official repo: https://github.com/astronomer/airflow-provider-fivetran-async

We are happy to hear from you. Please email any feedback to the authors at [email protected].

airflow-provider-fivetran-async's People

Contributors

csaller, denimalpaca, dwreeves, johnhoran, josh-fell, kaxil, pankajastro, pankajkoti, phanikumv, pre-commit-ci[bot], pubchimps, sunank200, utkarsharma2, virajmparekh


airflow-provider-fivetran-async's Issues

Fivetran Connection does not show API Key and Secret slots

Hi,

I configured the Airflow connection to Fivetran as specified in the docs. I get the Fivetran connection type, but the API Key and API Secret fields are missing. I have attached a screenshot. Please advise on next steps to resolve the issue.

[Screenshot attached: 2023-09-21 14:31:54]

Timing Delay

Hi Folks,

We recently started using the FivetranOperatorAsync operator because the FivetranOperator is deprecated and cannot handle reschedules.

We are observing large delays between the task initialization and the execution. Here is an excerpt from the logs:

Run ID: scheduled__2023-06-27T17:00:00+00:00

...
[2023-06-27, 20:00:03 CEST] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=transactions_speed_data_to_snowflake_dag, task_id=sync_mysql_c24_main_speed, execution_date=20230627T170000, start_date=20230627T180002
[2023-06-27, 20:00:03 CEST] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-06-27, 22:17:21 CEST] {base.py:73} INFO - Using connection ID 'fivetran_chronolytics' for task execution.
[2023-06-27, 22:17:23 CEST] {hooks.py:129} INFO - Connector "plied_presenting": sync_state = "scheduled"
[2023-06-27, 22:17:23 CEST] {triggers.py:89} INFO - sync is still running...
[2023-06-27, 22:17:23 CEST] {triggers.py:90} INFO - sleeping for 15 seconds.
[2023-06-27, 22:17:41 CEST] {hooks.py:129} INFO - Connector "plied_presenting": sync_state = "scheduled"
...

[2023-06-27, 22:46:57 CEST] {triggerer_job_runner.py:608} INFO - Trigger transactions_speed_data_to_snowflake_dag/scheduled__2023-06-27T17:00:00+00:00/sync_mysql_c24_main_speed/-1/1 (ID 452) fired: TriggerEvent<{'status': 'error', 'message': "Cannot connect to host api.fivetran.com:443 ssl:default [Connect call failed ('35.236.237.87', 443)]"}>
[2023-06-27, 22:47:00 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T17:00:00+00:00 [queued]>
[2023-06-27, 22:47:00 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T17:00:00+00:00 [queued]>
[2023-06-27, 22:47:00 CEST] {taskinstance.py:1306} INFO - Resuming after deferral
[2023-06-27, 22:47:00 CEST] {taskinstance.py:1327} INFO - Executing <Task(FivetranOperatorAsync): sync_mysql_c24_main_speed> on 2023-06-27 17:00:00+00:00
[2023-06-27, 22:47:00 CEST] {standard_task_runner.py:57} INFO - Started process 1655824 to run task
[2023-06-27, 22:47:00 CEST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'transactions_speed_data_to_snowflake_dag', 'sync_mysql_c24_main_speed', 'scheduled__2023-06-27T17:00:00+00:00', '--job-id', '331794', '--raw', '--subdir', 'DAGS_FOLDER/extract/fivetran_data_to_snowflake_dag.py', '--cfg-path', '/tmp/tmpdhc5q_08']
[2023-06-27, 22:47:00 CEST] {standard_task_runner.py:85} INFO - Job 331794: Subtask sync_mysql_c24_main_speed
[2023-06-27, 22:47:00 CEST] {task_command.py:410} INFO - Running <TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T17:00:00+00:00 [running]> on host airflow
[2023-06-27, 22:47:00 CEST] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/fivetran_provider_async/operators.py", line 92, in execute_complete
    raise AirflowException(msg)
airflow.exceptions.AirflowException: error: Cannot connect to host api.fivetran.com:443 ssl:default [Connect call failed ('35.236.237.87', 443)]
...

Run ID: scheduled__2023-06-27T18:00:00+00:00

...
[2023-06-27, 21:00:05 CEST] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=transactions_speed_data_to_snowflake_dag, task_id=sync_mysql_c24_main_speed, execution_date=20230627T180000, start_date=20230627T190002
[2023-06-27, 21:00:05 CEST] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-06-27, 22:17:21 CEST] {base.py:73} INFO - Using connection ID 'fivetran_chronolytics' for task execution.
[2023-06-27, 22:17:23 CEST] {hooks.py:129} INFO - Connector "plied_presenting": sync_state = "scheduled"
[2023-06-27, 22:17:23 CEST] {triggers.py:89} INFO - sync is still running...
[2023-06-27, 22:17:23 CEST] {triggers.py:90} INFO - sleeping for 15 seconds.
[2023-06-27, 22:17:41 CEST] {hooks.py:129} INFO - Connector "plied_presenting": sync_state = "scheduled"
...

[2023-06-27, 22:47:02 CEST] {triggerer_job_runner.py:608} INFO - Trigger transactions_speed_data_to_snowflake_dag/scheduled__2023-06-27T18:00:00+00:00/sync_mysql_c24_main_speed/-1/1 (ID 455) fired: TriggerEvent<{'status': 'error', 'message': "Cannot connect to host api.fivetran.com:443 ssl:default [Connect call failed ('35.236.237.87', 443)]"}>
[2023-06-27, 22:47:04 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T18:00:00+00:00 [queued]>
[2023-06-27, 22:47:04 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T18:00:00+00:00 [queued]>
[2023-06-27, 22:47:04 CEST] {taskinstance.py:1306} INFO - Resuming after deferral
[2023-06-27, 22:47:04 CEST] {taskinstance.py:1327} INFO - Executing <Task(FivetranOperatorAsync): sync_mysql_c24_main_speed> on 2023-06-27 18:00:00+00:00
[2023-06-27, 22:47:04 CEST] {standard_task_runner.py:57} INFO - Started process 1655892 to run task
[2023-06-27, 22:47:04 CEST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'transactions_speed_data_to_snowflake_dag', 'sync_mysql_c24_main_speed', 'scheduled__2023-06-27T18:00:00+00:00', '--job-id', '331796', '--raw', '--subdir', 'DAGS_FOLDER/extract/fivetran_data_to_snowflake_dag.py', '--cfg-path', '/tmp/tmp7y56_u6y']
[2023-06-27, 22:47:04 CEST] {standard_task_runner.py:85} INFO - Job 331796: Subtask sync_mysql_c24_main_speed
[2023-06-27, 22:47:04 CEST] {task_command.py:410} INFO - Running <TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T18:00:00+00:00 [running]> on host airflow
[2023-06-27, 22:47:04 CEST] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/fivetran_provider_async/operators.py", line 92, in execute_complete
    raise AirflowException(msg)
airflow.exceptions.AirflowException: error: Cannot connect to host api.fivetran.com:443 ssl:default [Connect call failed ('35.236.237.87', 443)]
...

Run ID: scheduled__2023-06-27T19:00:00+00:00

...
[2023-06-27, 22:00:04 CEST] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=transactions_speed_data_to_snowflake_dag, task_id=sync_mysql_c24_main_speed, execution_date=20230627T190000, start_date=20230627T200002
[2023-06-27, 22:00:04 CEST] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-06-27, 22:17:21 CEST] {base.py:73} INFO - Using connection ID 'fivetran_chronolytics' for task execution.
[2023-06-27, 22:17:23 CEST] {hooks.py:129} INFO - Connector "plied_presenting": sync_state = "scheduled"
[2023-06-27, 22:17:23 CEST] {triggers.py:89} INFO - sync is still running...
[2023-06-27, 22:17:23 CEST] {triggers.py:90} INFO - sleeping for 15 seconds.
[2023-06-27, 22:17:41 CEST] {hooks.py:129} INFO - Connector "plied_presenting": sync_state = "scheduled"
...

[2023-06-27, 22:47:35 CEST] {triggerer_job_runner.py:608} INFO - Trigger transactions_speed_data_to_snowflake_dag/scheduled__2023-06-27T19:00:00+00:00/sync_mysql_c24_main_speed/-1/1 (ID 458) fired: TriggerEvent<{'status': 'error', 'message': "Cannot connect to host api.fivetran.com:443 ssl:default [Connect call failed ('35.236.237.87', 443)]"}>
[2023-06-27, 22:47:36 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T19:00:00+00:00 [queued]>
[2023-06-27, 22:47:36 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T19:00:00+00:00 [queued]>
[2023-06-27, 22:47:36 CEST] {taskinstance.py:1306} INFO - Resuming after deferral
[2023-06-27, 22:47:36 CEST] {taskinstance.py:1327} INFO - Executing <Task(FivetranOperatorAsync): sync_mysql_c24_main_speed> on 2023-06-27 19:00:00+00:00
[2023-06-27, 22:47:36 CEST] {standard_task_runner.py:57} INFO - Started process 1656304 to run task
[2023-06-27, 22:47:36 CEST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'transactions_speed_data_to_snowflake_dag', 'sync_mysql_c24_main_speed', 'scheduled__2023-06-27T19:00:00+00:00', '--job-id', '331798', '--raw', '--subdir', 'DAGS_FOLDER/extract/fivetran_data_to_snowflake_dag.py', '--cfg-path', '/tmp/tmpe4dkt18r']
[2023-06-27, 22:47:36 CEST] {standard_task_runner.py:85} INFO - Job 331798: Subtask sync_mysql_c24_main_speed
[2023-06-27, 22:47:36 CEST] {task_command.py:410} INFO - Running <TaskInstance: transactions_speed_data_to_snowflake_dag.sync_mysql_c24_main_speed scheduled__2023-06-27T19:00:00+00:00 [running]> on host airflow
[2023-06-27, 22:47:36 CEST] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/fivetran_provider_async/operators.py", line 92, in execute_complete
    raise AirflowException(msg)
airflow.exceptions.AirflowException: error: Cannot connect to host api.fivetran.com:443 ssl:default [Connect call failed ('35.236.237.87', 443)]
...

I have built a small working example from the code we use. Since we need the operator several times, I have factored the initialisation out into a method.

import pendulum
from airflow import DAG
from fivetran_provider_async.operators import FivetranOperatorAsync

FIVETRAN_CONN_ID = 'xxx'
FIVETRAN_DAG_NAME_TEMPLATE = "{}_data_to_snowflake_dag"
FIVETRAN_DAG_ARGS = {
    "start_date": pendulum.today('UTC').add(days=-1),
    "catchup": False
}
CONNECTORS = [{'name':'xxx', 'id': 'xxx'}]

def sync_connectors(connectors, dag=None):

    for connector in connectors:

        run_fivetran_sync = FivetranOperatorAsync(
            task_id=f"sync_{connector['name'].lower()}",
            fivetran_conn_id=FIVETRAN_CONN_ID,
            connector_id=connector['id'],
            dag=dag
        )

    return run_fivetran_sync

sync_transactions_hourly_data_dag = DAG(
    dag_id=FIVETRAN_DAG_NAME_TEMPLATE.format('transactions_speed'),
    schedule="0 * * * *",
    **FIVETRAN_DAG_ARGS
)

sync_connectors(CONNECTORS, dag=sync_transactions_hourly_data_dag)

We host Airflow ourselves and have not used deferrable tasks before, so I cannot rule out a problem with the configuration of our environment. Can anyone help here?

Best,
Fabio

FivetranOperatorAsync waits forever in case of reschedule

We ran into a case where we triggered a Fivetran sync of Hubspot and it hit a Hubspot API error:

From the fivetran logs:

{
  "event" : "warning",
  "data" : {
    "type" : "retry_api_call",
    "message" : "Retrying last failed API call after 78 seconds, Reason :HTTP 500 Internal Server Error"
  },
  "created" : "2023-03-20T20:01:14.545Z",
  "connector_type" : "hubspot",
  "connector_id" : "feeling_against",
  "connector_name" : "hubspot_fivetran",
  "sync_id" : "c316efc5-92a6-46a9-b759-60dbe5972b27"
}

and later:

{
  "event" : "status",
  "data" : {
    "status" : "RESCHEDULED",
    "reason" : "Rescheduled for 2023-03-20T20:54:53.894Z, reason: Rescheduled the sync due to server error",
    "rescheduledAt" : "2023-03-20T20:54:54.041Z"
  },
  "created" : "2023-03-20T20:25:47.512Z",
  "connector_type" : "hubspot",
  "connector_id" : "feeling_against",
  "connector_name" : "hubspot_fivetran",
  "sync_id" : "c316efc5-92a6-46a9-b759-60dbe5972b27"
}

According to fivetran docs:

Fivetran reschedules syncs in two situations:

  1. Fivetran has reached the request limit for your application's API.
  2. Fivetran has exceeded the set number of API calls we have provisioned for your connector.

In either rescheduling scenario, the sync automatically pauses and resumes the following day, once the daily API limit has been restored.

In our case, the triggerer on our end just kept polling forever. Almost a day later, we noticed this, and cancelled the run, letting the next run trigger again.

It would be better if the fivetran sensor failed the task right away, and then we could use our own retry or schedule logic to decide what to do from there.
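
Until then, one stopgap (a workaround sketch, not provider behavior) is to bound the task with Airflow's standard execution_timeout and retries so it fails and re-arms instead of polling indefinitely:

from datetime import timedelta

from fivetran_provider_async.sensors import FivetranSensor

wait_for_sync = FivetranSensor(
    task_id="wait_for_hubspot_sync",
    connector_id="feeling_against",  # connector_id from the logs above
    poke_interval=60,
    # Fail rather than wait forever; note that whether execution_timeout is
    # enforced while the task is deferred depends on your Airflow version.
    execution_timeout=timedelta(hours=2),
    retries=1,
)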

FivetranSensor throws TypeError when sync concludes too quickly

My work has a few Fivetran connectors that finish very fast -- less than half a minute. When this happens, I get the following error:

Using connection ID 'fivetran' for task execution.
[2023-10-30, 16:21:58 UTC] Connector <connector name hidden>: sync_state = scheduled
[2023-10-30, 16:21:58 UTC] Task failed with exception
Traceback (most recent call last):
  File "/<path hidden>/python3.9/site-packages/fivetran_provider_async/sensors.py", line 73, in execute
    elif not self.poke(context):
  File "/<path hidden>/python3.9/site-packages/fivetran_provider_async/sensors.py", line 101, in poke
    return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time)
  File "/<path hidden>/python3.9/site-packages/fivetran_provider_async/hooks.py", line 413, in get_sync_status
    self.log.info("Connector %s: succeeded_at: %s, connector_id", succeeded_at.to_iso8601_string())
  File "/usr/local/lib/python3.9/logging/__init__.py", line 1446, in info
    self._log(INFO, msg, args, **kwargs)
... 
 TypeError: not enough arguments for format string

I noticed that the string has two %s placeholders but only a single variable, succeeded_at.to_iso8601_string(), to format into it. I think the closing double quote is misplaced: it should come before connector_id, not after, so that connector_id is passed as an argument.

self.log.info("Connector %s: succeeded_at: %s", connector_id, succeeded_at.to_iso8601_string())

Below is the corresponding code.

# Check if sync started by FivetranOperator has finished
# indicated by new 'succeeded_at' timestamp
if current_completed_at > previous_completed_at:
    self.log.info("Connector %s: succeeded_at: %s, connector_id", succeeded_at.to_iso8601_string())
    return True
else:
    return False
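
The mismatch is easy to reproduce, since the logging module applies %-formatting to the message:

# Two %s placeholders but only one argument -> the same TypeError
"Connector %s: succeeded_at: %s, connector_id" % ("2023-10-30T16:21:58+00:00",)
# TypeError: not enough arguments for format string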

FivetranOperatorAsync fails when using AWS SystemsManagerParameterStoreBackend secrets backend for fivetran_conn_id

Apache Airflow version
2.4.2

Deployment
Astronomer Runtime 6.0.3 based on Airflow 2.4.2+astro.1

Provider
airflow-provider-fivetran-async==1.0.0a5

What happened

  • We have our fivetran connection in aws parameter store
  • The FivetranOperatorAsync operator kicks off the sync in fivetran for our connector
  • The task is deferred
  • The connection to fivetran fails when the deferrable task is triggered to restart, with error "The conn_id fivetran isn't defined".
  • The dag fails
  • The fivetran sync continues to completion

NOTE: if we put the connection in the Airflow UI instead, then the deferrable task works as expected.

What you think should happen instead
The FivetranOperatorAsync should support aws parameter store as a secrets backend, or have a callout in the documentation that it only supports the two default secrets backends.

How to reproduce

  • Configure airflow to use aws param store secrets backend
[secrets]
backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "/airflow/connections", "variables_prefix": "/airflow/variables", "profile_name": "default"}
  • Add a fivetran connection in aws param store called "/airflow/connections/fivetran"
  • Create a dag that uses the FivetranOperatorAsync to trigger a fivetran connector sync using the fivetran connection in param store.
fivetran_async = FivetranOperatorAsync(
    task_id="fivetran_async",
    connector_id="fivetran_connector",
    fivetran_conn_id="fivetran",
)
  • Run the dag
  • The sync will start. You can verify that it is running in fivetran.
  • The dag will fail when the deferrable operator restarts
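
For reference (an assumption about the setup, not stated in the report): with this backend, the value stored at /airflow/connections/fivetran is typically an Airflow connection URI whose scheme is the connection type, roughly of the form:

fivetran://<API_KEY>:<API_SECRET>@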

[Feature Request] Support DateTimeSensor-like behavior for FivetranSensor

Overview of existing FivetranSensor behavior

The FivetranSensor does the following:

  • At start, retrieves the last completed time of a Fivetran sync job
  • Waits around for a new completed time of a Fivetran sync job

I feel that this control flow is not what a majority of users will find optimal for a couple reasons.

First, any user triggering Fivetran via the FivetranOperator should probably prefer an ExternalTaskSensor over the FivetranSensor. This means the FivetranSensor is at its best when waiting on jobs scheduled by Fivetran automatically, rather than jobs scheduled by Airflow (and thus scheduled in Fivetran "manually").

For that reason, all examples and issues described below assume the user's system is that they are using the Sensor to wait on Fivetran jobs scheduled by Fivetran rather than jobs scheduled by Airflow. Many of the issues I will be describing still apply when scheduling Fivetran jobs via Airflow (and see my Implementation proposal section where I discuss a feature that would make sense to be added to FivetranOperator), but these issues are more pronounced and easier to understand when thinking about the Fivetran scheduler interoperating with the Airflow scheduler.

The main issue: backfilling (with 2 examples)

The main issue is that when waiting on Fivetran scheduled jobs, a DAG that is being backfilled does not necessarily need to wait for a new Fivetran job.

Scenario A (Last fail > Last success)

Imagine the following situation (Scenario A):

  • Today is January 15th
  • The last successful Fivetran sync was January 10th
  • The last unsuccessful Fivetran sync was January 12th
  • We are backfilling an Airflow dag with a @daily schedule starting January 1st

The user wants the following behavior to occur:

  • Successes up through January 10th because the data has been successfully synced as of those dates.
  • Failures on Jan 11th and 12th because a sync was attempted, but the job failed.
  • The ability to select between one of two control flow behaviors:
    • (1) Waiting on Jan 14th and 15th because the system has not received a completed status since the Jan 12th failure.
    • (2) Failures on Jan 14th and 15th because previous Fivetran sensor tasks failed, and they don't want their DAG to have any potential gaps that come out of a temporal inconsistency of when the jobs were run. (The user can also do depends_on_past=True, but there is a subtle difference between that and this.)

This will allow the user to backfill data up to January 10th, at which point the DAG will fail:

[Calendar illustration omitted]

If the user in this case implements the FivetranSensor, their DAG instead waits for the next Fivetran job to trigger, even though the backfilled runs don't need to wait for anything.

For this small backfill and with Fivetran jobs that aren't failing, this isn't a huge deal, but when backfilling years of data in daily chunks, this can unnecessarily slow down a backfill a lot. Imagine you sync your Fivetran data once a day, and your Airflow DAG's max active DagRuns is 10. In this case, implementing a FivetranSensor is bottlenecking your backfill by 10 DagRuns a day, and the only way around it is to implement complex control flow logic (e.g. BranchDateTimeOperator), or to implement a custom implementation, or to implement the backfill as a separate DAG.

Scenario B (Last success > Last fail)

Scenario A was designed to exercise the full range of behaviors, but a far more typical scenario is that the last success occurred much more recently than the last failure.

Imagine the following situation (Scenario B):

  • Today is January 15th
  • The last successful Fivetran sync was January 10th
  • The last unsuccessful Fivetran sync was January 2nd
  • We are backfilling an Airflow dag with a @daily schedule starting January 1st

The user wants the following behavior to occur:

  • Successes up through January 10th because the data has been successfully synced as of those dates.
  • Waiting between Jan 11th to Jan 15th because there are no further system updates since then.

The desired behavior when the last fail time > last success time was open to interpretation, but here it is more straightforward: we should just be waiting.

[Calendar illustration omitted]

Other issues

Fault tolerance

The FivetranSensor is not fault tolerant in the sense that, if the FivetranSensor is restarted, the new instance of the sensor may end up waiting on a different datetime than the previous instance.

For example, imagine a user has some sort of error that causes the task to fail (it doesn't even need to come from the sensor itself; it could be, e.g., an OOM error on the worker running the job). If a Fivetran job completes between the time the task failed and the time it resumed, the sensor will now be waiting on a different Fivetran job to complete.

Race condition (into cascading failures)

Imagine a user syncs the Fivetran job once every hour on the hour, and their Airflow DAG also syncs once every hour on the hour. One of the tasks within the DAG is a FivetranSensor.

Imagine the Fivetran job typically takes 55 seconds after the hour to run. This means that if the FivetranSensor is not executed by 00:00:55, then the sensor will end up waiting a whole hour to run, i.e. it completes at around 01:00:55.

It gets worse from here. Imagine the whole DAG is configured with default_args={"depends_on_past": True}. The FivetranSensor with execution date 00:00:00 that got stuck for an hour will end up blocking the next Fivetran sensor task from getting scheduled, meaning that the FivetranSensor with execution date 01:00:00 won't finish until 02:00:55. This is a cascading failure!

Implementation proposal

FivetranDateTimeSensor

The core functionality I am proposing is a FivetranDateTimeSensor.

  • For backwards compatibility reasons, this should be a separate sensor and should not replace the existing FivetranSensor.
  • Please provide feedback on the name, as I don't know what it should be called. I do believe this is a reasonable name because the behavior of this Sensor is very similar to how the DateTimeSensor works, which is that it waits for a time to pass but passes immediately on backfills.

The majority of the functionality can just become a new method in the hook.

Additional control flow kwargs:

  • target_time: datetime | str = "{{ data_interval_end }}". (Note: this is a templated field). This kwarg name comes directly from DateTimeSensor, albeit here it is optional. This is the timestamp that the Fivetran completed time is compared to.
  • propagate_failures_forward: bool = True - The behavior of this should be: when this flag is True, it makes it so the sensor fails when context["data_interval_end"] > fivetran_failed_at > fivetran_succeeded_at. If the flag is False, then instead it will wait around until there is a completed at time.
    • By default this should be True because (1) Fivetran shows the status of a connector as its latest status (2) it is more likely in practice to be a good "better safe than sorry" option for people.
  • always_wait_when_syncing: bool = False - Fivetran syncs in chunks and this can cause issues when reading from a database currently being synced in some situations. Imagine for example a transform job that "increments" using select max(write_time) from tbl. Fivetran writes in chunks not necessarily ordered by write_time, meaning doing stuff while Fivetran is syncing can cause you to skip data. (This is not the best example because you probably wouldn't have a backfill job touching the most recent write_time data, but this can still happen in other contexts).
    • By default this should be False because (1) it is not an obvious behavior / it is an added layer of complexity over the default and simple advertised behavior of the sensor (2) it is not typical or ideal that data being backfilled should be impacted by most recent syncs.

I see it as uncommon in most Airflow provider packages to create large inheritance trees, so it is reasonable to simply implement all of this as a subclass of BaseSensorOperator and let the FivetranHook abstraction do most of the heavy lifting.
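
A hypothetical usage sketch of the proposed sensor (the class name and kwargs are part of this proposal, not an existing API):

wait_for_sync = FivetranDateTimeSensor(
    task_id="wait_for_fivetran_sync",
    connector_id="bronzing_largely",
    target_time="{{ data_interval_end }}",  # templated; passes immediately on backfills
    propagate_failures_forward=True,  # fail when data_interval_end > failed_at > succeeded_at
    always_wait_when_syncing=False,  # optionally hold while a sync is in flight
)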

Additional kwarg for FivetranOperator

On the topic of supporting backfills in a sensible manner, the FivetranOperator should also have a kwarg that skips running the Fivetran job when context["data_interval_end"] > fivetran_succeeded_at.

I'm not sure what a good name for this kwarg may be. skip_if_succeeded_after_data_interval_end is a little on the verbose side, but is an accurate description. I'd love to hear if anyone has ideas for a snappier name though.

For backwards compatibility, it should be introduced as having a default of False, albeit I do believe this would be a good default in a future release.
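
As a sketch (the kwarg name is hypothetical and part of this proposal):

sync_task = FivetranOperator(
    task_id="fivetran_sync",
    connector_id="bronzing_largely",
    skip_if_succeeded_after_data_interval_end=True,  # hypothetical; skip when data is already synced
)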

Other notes

  • I already have a version of this implemented in a production system, and am willing to implement a version of this functionality in this open source library.

  • Regardless of what happens, the documentation for the existing FivetranSensor should be more clear about what is happening.

  • EDIT: I realize I should be using the data_interval_end and not the logical_date. Logical date would be sensible if we knew when Fivetran last syncs start, but we do not have that info available; we only know when a sync ends.

Unable to select Fivetran connection in the Airflow Dashboard

My customer is trying to create a connection in the Astronomer Dashboard but is not able to see the Fivetran option.


They have installed the following packages for the FivetranOperator API calls, but the connection type is not available when creating a DAG:
airflow-provider-fivetran-async
airflow-provider-fivetran

They are referencing the following documents.
https://registry.astronomer.io/providers/airflow-provider-fivetran-async/versions/2.0.2/modules/FivetranOperator
https://docs.astronomer.io/learn/airflow-fivetran

They are using AWS MWAA with Airflow version 2.4.3.

Can you help determine why the Fivetran option is not appearing? What configuration step are they missing?
Thank you!

Make code more idiomatic, clean, and better enforced

I was working on a separate issue and I started cleaning up the code. I realized that the code needs a little more love to be in tip-top shape and it would be inappropriate to combine a PR that both adds a feature and does a lot of cleanup, so I believe this should be a separate issue.

Example issues with existing code style

  • Incomplete typing, most notably in hooks.py.
  • Mypy should be part of pre-commit
  • pre-commit is not run as part of CI, but should be
  • When I substitute some types from the docstrings into Python types, I get a couple type errors in Mypy. Two examples:
    • FivetranHook.set_schedule_type says it returns a string, but it clearly returns a dict, and mypy makes this clear. (Upon closer inspection, the return type doesn't actually matter because the return value is never used.)
    • FivetranHook._do_api_call's json arg says it takes a dict, but it clearly takes a string, or more specifically a string representation of a JSON.
  • Some unnecessary calls to the API. For example, FivetranHook.prep_connector makes 3 total API calls when it really only needs to make 2.
  • Some un-idiomatic Airflow patterns. For example, _get_hook() instead of hook as a cached_property.
  • FivetranHook._do_api_call has some unnecessary weird behavior. Notably, it has a parameter called json that maps to either the data=json or params=json kwargs in the requests.request() call. This seems less desirable than just passing data=data, params=params, or json=json. Specifically, the json kwarg is confusing from the perspective of the requests and aiohttp APIs: in those APIs, json= expects a dict and data= expects a string, but in the FivetranHook API, json= is a string that gets mapped to data=.
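
To make the last point concrete, a requests-like signature might look roughly like this (a sketch of the suggestion, not the current API):

def _do_api_call(self, method, endpoint, *, params=None, data=None, json=None):
    # Mirror requests.request(): json= takes a dict, data= takes an encoded body,
    # params= takes query parameters; no remapping between them.
    ...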

API Breakage?

It's unclear how much API breakage would be tolerated here. We can discuss in a PR, but TLDR I believe for a minor version bump that:

  • absolutely zero breakage should be tolerated for the FivetranSensor and FivetranOperator.
  • very minor breakage can be tolerated for FivetranHook. Specifically for private methods, and methods where the return type is never actually used by anything in FivetranSensor or FivetranOperator:
    • _do_api_call and _do_api_call_async should be changed to look and feel like the requests.request and aiohttp.ClientSession.request APIs.
      • That said, an error message should clearly state to other users what the issue is by checking whether the first arg to these calls is a tuple. I imagine there are some people who are poking under the hood of this code and are calling _do_api_call() directly.
    • FivetranHook.check_connector should return the connector_details dict object. (So that prep_connector can avoid making an unnecessary 3rd API call.)
    • FivetranHook.prep_connector should return dict[str, Any] | None. (Note that the returned value is never used anywhere in the code).

Inconsistency in the Hooks Class

We are using the new Async Operator for our Fivetran Sync in AWS MWAA

airflow-provider-fivetran-async==2.0.0

and we are facing the below warning

WARNING - Inconsistency! The hook class '<class 'fivetran_provider_async.hooks.FivetranHookAsync'>' declares connection type 'fivetran' but it is added by provider 'airflow-provider-fivetran-async' as connection_type 'Fivetran' in provider info. This should be fixed!

Is this anything we need to worry about, or can it be ignored? Please advise.

fivetran_provider_async still has a reference to fivetran_provider

This is caused by #33

fivetran_provider_async still has a reference to fivetran_provider on this line:

"extra-links": ["fivetran_provider.operators.fivetran.RegistryLink"],

This is causing import errors in the Airflow webserver logs:

[2023-08-10T14:39:57.174+0000] {providers_manager.py:253} WARNING - Exception when importing 'fivetran_provider.operators.fivetran.RegistryLink' from 'airflow-provider-fivetran-async' package

ModuleNotFoundError: No module named 'fivetran_provider'

Transient Networking Errors & Timing Delay

Hello,

I am running into a failure in our production environment with the async sensor. I have observed a few odd behaviors in this and I'm looking for direction on troubleshooting steps. Ultimately we were able to clear the task and it ran successfully.

  1. Network issue:
[2023-05-12, 09:22:00 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/fivetran_provider_async/sensors.py", line 55, in execute_complete
    raise AirflowException(msg)
airflow.exceptions.AirflowException: error: Cannot connect to host api.fivetran.com:443 ssl:default [Connect call failed ('35.236.237.87', 443)]
[2023-05-12, 11:14:02 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/fivetran_provider_async/sensors.py", line 55, in execute_complete
    raise AirflowException(msg)
airflow.exceptions.AirflowException: error: API requests to Fivetran failed 3 times. Giving up.

It would appear that this is a networking issue that might have been caused by a service outage from Fivetran. However, we were able to trigger the sync in an upstream task and the sync completed without error in Fivetran. The Fivetran status page isn't showing any server issues on the day. Any troubleshooting suggestions would be appreciated.

  2. Timing Delay

We are observing large delays between the task initialization and the execution. Any information you can provide about why we might be seeing these timing delays would be appreciated.

Attempt 1: 1 hr 22 minutes

[2023-05-12, 08:00:16 UTC] {taskinstance.py:1465} INFO - Pausing task as DEFERRED. dag_id=marketing_app_data, task_id=sensor_fivetran_marketing_app, execution_date=20230511T080000, start_date=20230512T080016
[2023-05-12, 08:00:16 UTC] {local_task_job.py:164} INFO - Task exited with return code 0
[2023-05-12, 08:00:16 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

[2023-05-12, 09:22:00 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: marketing_app_data.sensor_fivetran_marketing_app scheduled__2023-05-11T08:00:00+00:00 [queued]>

Attempt 2: 1 hr

[2023-05-12, 09:23:01 UTC] {taskinstance.py:1465} INFO - Pausing task as DEFERRED. dag_id=marketing_app_data, task_id=sensor_fivetran_marketing_app, execution_date=20230511T080000, start_date=20230512T092301
[2023-05-12, 09:23:01 UTC] {local_task_job.py:164} INFO - Task exited with return code 0
[2023-05-12, 09:23:01 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

[2023-05-12, 10:22:40 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: marketing_app_data.sensor_fivetran_marketing_app scheduled__2023-05-11T08:00:00+00:00 [queued]>
[2023-05-12, 10:22:40 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: marketing_app_data.sensor_fivetran_marketing_app scheduled__2023-05-11T08:00:00+00:00 [queued]>

Attempt 3: 2 minutes

[2023-05-12, 10:23:42 UTC] {taskinstance.py:1465} INFO - Pausing task as DEFERRED. dag_id=marketing_app_data, task_id=sensor_fivetran_marketing_app, execution_date=20230511T080000, start_date=20230512T102342
[2023-05-12, 10:23:42 UTC] {local_task_job.py:164} INFO - Task exited with return code 0
[2023-05-12, 10:23:42 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

[2023-05-12, 10:25:49 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: marketing_app_data.sensor_fivetran_marketing_app scheduled__2023-05-11T08:00:00+00:00 [queued]>
[2023-05-12, 10:25:49 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: marketing_app_data.sensor_fivetran_marketing_app scheduled__2023-05-11T08:00:00+00:00 [queued]>

Attempt 4: 47 minutes

[2023-05-12, 10:26:51 UTC] {taskinstance.py:1465} INFO - Pausing task as DEFERRED. dag_id=marketing_app_data, task_id=sensor_fivetran_marketing_app, execution_date=20230511T080000, start_date=20230512T102650
[2023-05-12, 10:26:51 UTC] {local_task_job.py:164} INFO - Task exited with return code 0
[2023-05-12, 10:26:51 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

[2023-05-12, 11:14:01 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: marketing_app_data.sensor_fivetran_marketing_app scheduled__2023-05-11T08:00:00+00:00 [queued]>
[2023-05-12, 11:14:01 UTC] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: marketing_app_data.sensor_fivetran_marketing_app scheduled__2023-05-11T08:00:00+00:00 [queued]>

Merge sync and async operator sensor

Depends on #33

  • Merge FivetranOperator and FivetranOperatorAsync, keep operator name FivetranOperator and add param deferrable=True|False
  • Merge FivetranSensor and FivetranSensorAsync, keep sensor name FivetranSensor and add param deferrable=True|False
  • Use pytest across unit tests
  • Deprecate existing FivetranSensorAsync and FivetranOperatorAsync
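
Under the merged API, usage would look roughly like this (a sketch of the proposal, not the final interface):

sync = FivetranOperator(task_id="sync", connector_id="my_connector", deferrable=True)
wait = FivetranSensor(task_id="wait", connector_id="my_connector", deferrable=True)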

Operator link for FivetranOperatorAsync tasks is incorrect

In the Task Instance modal and Grid page, the operator link titled "Astronomer Registry" points to a non-existent page:
https://registry.astronomer.io/providers/fivetran/modules/fivetranoperator

Instead the link should point to this page since the sync version of the provider is no longer displayed on the Astronomer Registry: https://registry.astronomer.io/providers/airflow-provider-fivetran-async/versions/latest/modules/FivetranOperatorAsync

Although, maybe it makes sense to update the operator link to point to the Fivetran dashboard or logs instead?

Fivetran Connection type not enabled in AWS MWAA with airflow version 2.6.3

Hi,

I am trying to set up the Fivetran connection type in AWS MWAA with Airflow version 2.6.3, but I am not able to see Fivetran as an option in the connection type drop-down.

Steps:
I have installed airflow-provider-fivetran-async==1.1.2 and verified the installation was successful.

Error:

DeprecationWarning: The provider airflow-provider-fivetran-async uses hook-class-names property in provider-info and has no connection-types one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use both in case you want to have backwards compatibility with Airflow < 2.2..

Could you please advise?
