Overview of existing `FivetranSensor` behavior

The `FivetranSensor` does the following:
- At start, retrieves the last completed time of a Fivetran sync job
- Waits for a new completed time of a Fivetran sync job
I feel that this control flow is not what the majority of users will find optimal, for a couple of reasons.
First, any user triggering Fivetran via the `FivetranOperator` should probably prefer an `ExternalTaskSensor` over the `FivetranSensor`. This means the `FivetranSensor` is at its best when waiting on jobs scheduled by Fivetran automatically, rather than jobs scheduled by Airflow (and thus scheduled in Fivetran "manually").

For that reason, all examples and issues described below assume the user is using the sensor to wait on Fivetran jobs scheduled by Fivetran rather than by Airflow. Many of the issues I describe still apply when scheduling Fivetran jobs via Airflow (see my implementation proposal section, where I discuss a feature that would make sense to add to the `FivetranOperator`), but these issues are more pronounced and easier to understand when thinking about the Fivetran scheduler interoperating with the Airflow scheduler.
The main issue: backfilling (with 2 examples)
The main issue is that when waiting on Fivetran scheduled jobs, a DAG that is being backfilled does not necessarily need to wait for a new Fivetran job.
Scenario A (Last fail > Last success)
Imagine the following situation (Scenario A):
- Today is January 15th
- The last successful Fivetran sync was January 10th
- The last unsuccessful Fivetran sync was January 12th
- We are backfilling an Airflow DAG with a `@daily` schedule starting January 1st
The user wants the following behavior to occur:
- Successes up through January 10th because the data has been successfully synced as of those dates.
- Failures on Jan 11th and 12th because a sync was attempted, but the job failed.
- The ability to select between one of two control flow behaviors:
  - (1) Waiting on Jan 14th and 15th because the system has not received a completed status since the Jan 12th failure.
  - (2) Failures on Jan 14th and 15th because previous Fivetran sensor tasks failed, and they don't want their DAG to have any potential gaps arising from a temporal inconsistency in when the jobs were run. (The user can also use `depends_on_past=True`, but there is a subtle difference between that and this.)
This will allow the user to backfill data up to January 10th, at which point the DAG will fail:
| Su | Mo | Tu | We | Th | Fr | Sa |
| --- | --- | --- | --- | --- | --- | --- |
| ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| ✅ | ✅ | ✅ | ❌ | ❌ | ❓ | ❓ |
| ❓ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ |
| ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ |
| ⬜ | ⬜ | ⬜ |  |  |  |  |
Instead, when the user implements the `FivetranSensor`, their DAG waits for the next Fivetran job to complete, even though the backfilled runs don't need to wait for anything.
For this small backfill, and with Fivetran jobs that aren't failing, this isn't a huge deal, but when backfilling years of data in daily chunks it can slow down a backfill considerably. Imagine you sync your Fivetran data once a day, and your Airflow DAG's max active DagRuns is 10. In this case, implementing a `FivetranSensor` limits your backfill to 10 DagRuns per day, and the only ways around it are to implement complex control flow logic (e.g. `BranchDateTimeOperator`), write a custom implementation, or run the backfill as a separate DAG.
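To put rough numbers on this, here is a back-of-the-envelope sketch; the three-year backfill size and once-daily sync cadence are assumed figures for illustration only:

```python
import math

# Assumed illustration: backfilling three years of @daily DagRuns where
# each run's FivetranSensor only clears when a new Fivetran sync
# completes (once per day) and at most 10 DagRuns are active at a time.
total_runs = 3 * 365      # ~1095 daily DagRuns to backfill
max_active_runs = 10      # the DAG's max active DagRuns
syncs_per_day = 1         # Fivetran sync cadence

# Each sync completion releases at most `max_active_runs` waiting
# sensors, so the backfill takes roughly this many days:
backfill_days = math.ceil(total_runs / (max_active_runs * syncs_per_day))
print(backfill_days)  # → 110
```

Without the sensor (or with a sensor that passes immediately on backfilled intervals), the same backfill is bounded only by worker capacity.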
Scenario B (Last success > Last fail)
Scenario A was designed to exercise the full range of behaviors, but a far more typical scenario is that the last success occurred much more recently than the last failure.
Imagine the following situation (Scenario B):
- Today is January 15th
- The last successful Fivetran sync was January 10th
- The last unsuccessful Fivetran sync was January 2nd
- We are backfilling an Airflow DAG with a `@daily` schedule starting January 1st
The user wants the following behavior to occur:
- Successes up through January 10th because the data has been successfully synced as of those dates.
- Waiting from Jan 11th through Jan 15th because there have been no further system updates since then.
The desired behavior when the last fail time > last success time was up to interpretation, but here it is more straightforward: we should just be waiting.
| Su | Mo | Tu | We | Th | Fr | Sa |
| --- | --- | --- | --- | --- | --- | --- |
| ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| ✅ | ✅ | ✅ | ⌛ | ⌛ | ⌛ | ⌛ |
| ⌛ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ |
| ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ |
| ⬜ | ⬜ | ⬜ |  |  |  |  |
Other issues
Fault tolerance
The `FivetranSensor` is not fault tolerant in the sense that, if it is restarted, the new instance of the sensor may end up waiting on a different datetime than the previous instance.
For example, imagine some error causes the task to fail (it doesn't even need to come from the sensor itself; it could be e.g. an OOM error on the worker running the job). If a Fivetran job completes between the time the task failed and the time it was retried, the sensor will now be waiting on a different Fivetran job to complete.
Race condition (into cascading failures)
Imagine a user syncs the Fivetran job once every hour on the hour, and their Airflow DAG also runs once every hour on the hour. One of the tasks within the DAG is a `FivetranSensor`.

Imagine the Fivetran job typically completes 55 seconds after the hour. This means that if the `FivetranSensor` is not executed by 00:00:55, the sensor will end up waiting a whole hour, i.e. it completes at around 01:00:55.
It gets worse from here. Imagine the whole DAG is configured with `default_args={"depends_on_past": True}`. The `FivetranSensor` with execution date 00:00:00 that got stuck for an hour will block the next sensor task from being scheduled, meaning the `FivetranSensor` with execution date 01:00:00 won't finish until 02:00:55. This is a cascading failure!
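The timing above can be sketched with a tiny simulation. The second-level timings and the "each sensor starts as soon as its slot opens and its predecessor finishes" scheduling model are simplifying assumptions, not real Airflow scheduler behavior:

```python
# Model: Fivetran syncs complete at hh:00:55; a sensor succeeds at the
# first sync completion strictly after it starts; with
# depends_on_past=True, sensor h cannot start before sensor h-1 finishes.

def next_sync_completion(start_s: int) -> int:
    """First sync completion (seconds since midnight) strictly after start_s."""
    candidate = (start_s // 3600) * 3600 + 55
    return candidate if candidate > start_s else candidate + 3600

def sensor_finish_times(first_delay_s: int, hours: int) -> list:
    """Finish time of each hourly sensor; only the first one starts late."""
    finishes, prev_finish = [], 0
    for h in range(hours):
        scheduled = h * 3600 + (first_delay_s if h == 0 else 0)
        start = max(scheduled, prev_finish)
        prev_finish = next_sync_completion(start)
        finishes.append(prev_finish)
    return finishes

# The 00:00 sensor starts 60s late (just after the 00:00:55 sync):
fins = sensor_finish_times(first_delay_s=60, hours=3)
print([f"{t // 3600:02d}:{(t % 3600) // 60:02d}:{t % 60:02d}" for t in fins])
# → ['01:00:55', '02:00:55', '03:00:55']
```

One 60-second delay on the first sensor makes every subsequent sensor an hour late as well; the chain never recovers.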
Implementation proposal
FivetranDateTimeSensor
The core functionality I am proposing is a `FivetranDateTimeSensor`.

- For backwards compatibility reasons, this should be a separate sensor and should not replace the existing `FivetranSensor`.
- Please provide feedback on the name, as I don't know what it should be called. I do believe this is a reasonable name because this sensor behaves very similarly to the `DateTimeSensor`: it waits for a time to pass but passes immediately on backfills.
The majority of the functionality can just become a new method in the hook.
Additional control flow kwargs:
- `target_time: datetime | str = "{{ data_interval_end }}"`. (Note: this is a templated field.) This kwarg name comes directly from `DateTimeSensor`, albeit here it is optional. This is the timestamp that the Fivetran completed time is compared to.
- `propagate_failures_forward: bool = True`
  - When this flag is True, the sensor fails when `context["data_interval_end"] > fivetran_failed_at > fivetran_succeeded_at`. When the flag is False, the sensor instead waits until there is a new completed time.
  - This should default to True because (1) Fivetran shows the status of a connector as its latest status, and (2) in practice it is more likely to be a good "better safe than sorry" option for people.
- `always_wait_when_syncing: bool = False`
  - Fivetran syncs in chunks, and in some situations this can cause issues when reading from a database that is currently being synced. Imagine, for example, a transform job that "increments" using `select max(write_time) from tbl`. Fivetran writes in chunks that are not necessarily ordered by `write_time`, meaning that doing work while Fivetran is syncing can cause you to skip data. (This is not the best example because you probably wouldn't have a backfill job touching the most recent `write_time` data, but this can still happen in other contexts.)
  - This should default to False because (1) it is not an obvious behavior and adds complexity over the simple advertised behavior of the sensor, and (2) it is neither typical nor ideal for data being backfilled to be affected by the most recent syncs.
It is uncommon in most Airflow provider packages to create large inheritance trees, so it is reasonable to simply implement all of this as a subclass of `BaseSensorOperator` and let the `FivetranHook` abstraction do most of the heavy lifting.
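As a sketch of the control flow only (the function name and the use of `None` for "no sync recorded" are my own; the real sensor would pull these timestamps from `FivetranHook` and return True/False from `poke`), the `propagate_failures_forward` decision could look like:

```python
from datetime import datetime
from typing import Optional

def decide_poke(
    target_time: datetime,
    succeeded_at: Optional[datetime],
    failed_at: Optional[datetime],
    propagate_failures_forward: bool,
) -> str:
    """Return 'success', 'fail', or 'wait' for one poke."""
    # A sync already completed at or after the target time: done.
    if succeeded_at is not None and succeeded_at >= target_time:
        return "success"
    # The proposal's failure condition:
    # data_interval_end > fivetran_failed_at > fivetran_succeeded_at
    if (
        propagate_failures_forward
        and failed_at is not None
        and (succeeded_at is None or failed_at > succeeded_at)
        and target_time > failed_at
    ):
        return "fail"
    # Otherwise keep waiting for a newer completed time.
    return "wait"

# Scenario A: last success Jan 10th, last failure Jan 12th.
succeeded, failed = datetime(2022, 1, 10), datetime(2022, 1, 12)
print(decide_poke(datetime(2022, 1, 9), succeeded, failed, True))    # → success
print(decide_poke(datetime(2022, 1, 13), succeeded, failed, True))   # → fail
print(decide_poke(datetime(2022, 1, 13), succeeded, failed, False))  # → wait
```

Under this sketch, Scenario B (last failure before the last success) falls through to "wait" for any target after Jan 10th, since `failed_at > succeeded_at` never holds.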
Additional kwarg for FivetranOperator
On the topic of supporting backfills in a sensible manner, the `FivetranOperator` should also have a kwarg that skips running the Fivetran job when `fivetran_succeeded_at > context["data_interval_end"]`.
I'm not sure what a good name for this kwarg would be. `skip_if_succeeded_after_data_interval_end` is a little on the verbose side, but it is an accurate description. I'd love to hear if anyone has ideas for a snappier name.
For backwards compatibility, it should be introduced with a default of `False`, though I do believe enabling it by default would be sensible in a future release.
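A minimal sketch of that check (the helper name is hypothetical; in the real operator the last-success timestamp would come from `FivetranHook` and `data_interval_end` from the task context). It skips when the last success is after the interval end, matching the proposed name `skip_if_succeeded_after_data_interval_end`:

```python
from datetime import datetime
from typing import Optional

def should_skip_sync(
    data_interval_end: datetime,
    fivetran_succeeded_at: Optional[datetime],
) -> bool:
    """True when a sync already succeeded after this run's data interval,
    so re-triggering it during a backfill would be redundant."""
    return (
        fivetran_succeeded_at is not None
        and fivetran_succeeded_at > data_interval_end
    )

# Backfilled run for Jan 5th while the last success was Jan 10th: skip.
print(should_skip_sync(datetime(2022, 1, 5), datetime(2022, 1, 10)))   # → True
# Run whose interval ends after the last success: trigger as usual.
print(should_skip_sync(datetime(2022, 1, 15), datetime(2022, 1, 10)))  # → False
```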
Other notes
- I already have a version of this implemented in a production system, and I am willing to implement a version of this functionality in this open source library.
- Regardless of what happens, the documentation for the existing `FivetranSensor` should be clearer about what is happening.
- EDIT: I realize I should be using `data_interval_end` and not `logical_date`. The logical date would be sensible if we knew when Fivetran syncs start, but we do not have that information available; we only know when a sync ends.