kestra-io / plugin-debezium
License: Apache License 2.0
For now, there is no convenient way to handle deletes and updates on the destination.
It is simple to append-only the data with format: RAW, but when you need to keep the structure on the destination and remove duplicate rows (applying updates and deletes), there is no convenient way to do it.
You can easily use a FileTransform to generate multiple SQL queries, but the JDBC Batch task will not accept them (since it uses the same query for all operations): kestra-io/plugin-jdbc#81
Do we need to add a new format: SQL?
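One direction, sketched here purely as a proposal (format: SQL is not an existing option of the plugin today), would be to let the trigger emit one ready-to-apply statement per change event:

```yaml
# Hypothetical proposal only: `format: SQL` does not exist in the plugin today.
triggers:
  - id: capture
    type: io.kestra.plugin.debezium.postgres.Trigger
    # ...connection properties elided...
    format: SQL   # would emit one INSERT / UPDATE / DELETE statement per change event,
                  # so a downstream JDBC task could replay them against the destination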
Currently, Debezium logs are restricted to ERROR only.
This is an issue for debugging Debezium-related problems, see kestra-io/kestra#1728.
We should have a way to capture Debezium logs for debugging purposes.
Also, the basic setup may be better restricted to WARNING instead of ERROR.
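As a sketch, assuming Kestra's logger.levels configuration reaches the embedded Debezium engine, something like the following in the Kestra configuration file could surface the missing logs:

```yaml
# Assumption: these package prefixes cover the embedded Debezium engine and the plugin.
logger:
  levels:
    io.debezium: DEBUG
    io.kestra.plugin.debezium: DEBUG
```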
The Debezium trigger should see all new records added during the interval. I have two flows with Debezium triggers connected to different Postgres servers. It seems quite stable with only one flow that includes a Debezium trigger.
It worked fine after setup. However, after a few days one of the triggers kept evaluating but did not see any records. I see in the logs: Ended after receiving 0 records: {}
even though I added a new record to the Postgres database. Restarting the Kestra worker helped it resume catching new records (almost immediately the trigger found the records that had been added before the restart). My second flow with a Debezium trigger worked correctly the whole time, so the issue affected only one flow.
There was another case where the Debezium Postgres trigger stopped evaluating and did not produce any logs; a restart helped again. See the screenshot for an example of this case: Debezium ran at 04:13:05.901 and then only at 22:23:47.637 (after my restart).
It is also possible for the trigger to find new records only after some time. I set interval: PT30S,
but the trigger reported 0 records received several times and only caught the new records after roughly 5 minutes.
I tried to solve the issues myself with the following steps.
Maybe there is a conflict between the Debezium triggers, and something like a different stateName per trigger could help.
triggers:
  - id: listen-debezium-identityserver
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    hostname: "{{vars.sourceServer}}"
    port: '5432'
    username: "{{vars.replicationUser}}"
    password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
    database: "{{vars.databaseName}}"
    sslMode: REQUIRE
    snapshotMode: INITIAL
    slotName: "{{vars.databaseName}}"
    includedTables: public.identifications,public.external_identifications
    pluginName: PGOUTPUT
    format: INLINE
    metadata: ADD_FIELD
    deleted: DROP
    interval: PT60S
tasks:
  - id: parallel
    type: io.kestra.core.tasks.flows.EachParallel
    value: "{{vars.tableNameList}}"
    tasks:
      - id: if
        type: io.kestra.core.tasks.flows.If
        condition: "{{trigger.uris[taskrun.value] ?? false }}"
        then:
          - id: launch
            type: io.kestra.core.tasks.flows.Flow
            namespace: common
            flowId: syncTable
            inputs:
              fullTableName: "{{parents[0].taskrun.value}}"
              debeziumFile: "{{trigger.uris[parents[0].taskrun.value]}}"
              databaseUserName: "{{vars.destinationDatabaseUserName}}"
              databaseUserSecret: "{{vars.destinationDatabaseUserSecret}}"
              serverName: "{{vars.destinationServer}}"
              databaseName: "{{vars.databaseName}}"
            labels:
              destinationServer: "{{vars.destinationServer}}"
              sourceServer: "{{vars.sourceServer}}"
              tableName: "{{parents[0].taskrun.value}}"
            wait: true
            transmitFailed: true
outputs:
  sql: "{{ outputs.node.outputFiles['script.sql'] }}"
  json: "{{ outputs.json.uri }}"
errors:
  - id: send_email
    type: io.kestra.plugin.notifications.mail.MailSend
    from: "{{vars.fromEmail}}"
    to: "{{vars.toEmail}}"
    username: "{{ secret('EMAIL_USERNAME') }}"
    password: "{{ secret('EMAIL_PASSWORD') }}"
    host: smtp.postmarkapp.com
    port: 587
    transportStrategy: SMTP
    subject: "Synchronization of {{flow.id}} database failed"
    htmlTextContent: |
      Synchronization of database from {{vars.sourceServer}} to {{vars.destinationServer}} failed.<br />
      Please check Kestra server for more details.
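If the stateName guess above is right, one mitigation to try (a sketch, not a confirmed fix) is giving each flow's trigger its own state key; the plugin's stateName defaults to debezium-state, so two triggers in one namespace may otherwise contend for the same offsets:

```yaml
triggers:
  - id: listen-debezium-identityserver
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    # ...same connection properties as above...
    stateName: "debezium-state-identityserver"   # assumption: a unique name per flow avoids shared offsets
```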
Note: logical decoding requires wal_level = logical on the Postgres server.
After the initial snapshot, I would like Debezium to pick up changes as they happen.
But the changes are never captured.
Below is a snippet of the logs received even after updating the tables multiple times.
2024-03-01 11:53:14.764 trigger_cdc_events_captured Ended after receiving 0 records: {}
2024-03-01 11:53:14.291 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2024-03-01 11:51:46.702 trigger_cdc_events_captured Ended after receiving 0 records: {}
2024-03-01 11:51:46.206 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2024-03-01 11:50:18.952 trigger_cdc_events_captured Ended after receiving 0 records: {}
2024-03-01 11:50:18.485 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2024-03-01 11:48:48.316 trigger_cdc_events_captured Ended after receiving 0 records: {}
2024-03-01 11:48:47.818 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2024-03-01 11:47:29.771 trigger_sub_flow_for_cdc_events_executable [26bPjyBH131x1cQU6mus20] Create new execution for flow 'engineering'.'process_cdc_events' with id '15juNMOI3WrGlpU7mVe3xc'
2024-03-01 11:47:25.003 trigger_sub_flow_for_cdc_events_executable [26bPjyBH131x1cQU6mus20] Create new execution for flow 'engineering'.'process_cdc_events' with id '7ms5luSBYk0AWz9NCblLLE'
2024-03-01 11:47:21.765 trigger_cdc_events_captured Ended after receiving 11 records: {boot_staging.hosts=5, boot_staging.statuses=6}
2024-03-01 11:47:20.857 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
Followed this guide: AWS EC2 with Amazon RDS and S3, to set up the environment.
Used MySQL instead of Postgres for the database.
Created this flow:
id: debezium-mysql-connector-1
namespace: engineering

tasks:
  - id: loop_cdc_tables
    type: io.kestra.core.tasks.flows.EachSequential
    value: "{{ trigger.uris | keys }}"
    tasks:
      - id: trigger_sub_flow_for_cdc_events
        type: io.kestra.core.tasks.flows.ForEachItem
        items: "{{ trigger.uris[parent.taskrun.value] }}"
        batch:
          rows: 1000
        namespace: engineering
        flowId: process_cdc_events
        wait: true
        transmitFailed: true
        inputs:
          data: "{{ taskrun.items }}"

triggers:
  - id: trigger_cdc_events_captured
    type: io.kestra.plugin.debezium.mysql.Trigger
    hostname: host
    password: "pwd"
    username: username
    port: "3306"
    ignoreDdl: true
    format: INLINE
    snapshotMode: INITIAL
    includedTables:
      - boot_staging.statuses
      - boot_staging.hosts
    disabled: false
Table creation:
CREATE TABLE `statuses` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
`class` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
`order` int(11) NOT NULL,
`created_at` timestamp NULL DEFAULT NULL,
`updated_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE,
KEY `statuses_order_index` (`order`) USING BTREE
)
INSERT INTO `statuses` (`id`, `name`, `class`, `order`, `created_at`, `updated_at`) VALUES (1, 'Draft', 'default', 1, '2017-07-07 19:58:21', '2017-07-07 19:58:21');
Once the trigger is enabled, it grabs the details of the included tables.
But then I trigger an update:
update statuses
set class = 'Test'
where id = 1;
I then expect the change to be tracked, but minutes go by and nothing is tracked.
MySQL Debezium snapshot is completed successfully
During the initial run, Debezium takes a snapshot of the table data and the database schema.
With the default maxWait of 10 seconds, the plugin does not wait for the snapshot to complete and interrupts it.
The same issue occurs if a user sets maxRecords to a value smaller than the row count of the captured tables.
To provide data consistency with the source database, the plugin should wait until the snapshot is done.
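Until the plugin waits for snapshot completion on its own, a possible workaround (a sketch; pick a duration large enough for your tables) is to raise maxWait and avoid a small maxRecords during the initial run:

```yaml
triggers:
  - id: capture
    type: io.kestra.plugin.debezium.mysql.Trigger
    # ...connection properties elided...
    snapshotMode: INITIAL
    maxWait: PT5M    # assumption: long enough for the initial snapshot to finish
    # maxRecords left unset so the snapshot is not cut off partway through
```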
2023-07-12 23:11:06,494 INFO stra-change-event-source-coordinator d.c.m.MySqlSnapshotChangeEventSource Reading structure of database 'dynamodb-clone'
2023-07-12 23:11:13,126 INFO WorkerThread io.debezium.embedded.EmbeddedEngine Stopping the embedded engine
2023-07-12 23:11:13,127 INFO WorkerThread io.debezium.embedded.EmbeddedEngine Waiting for PT5M for connector to stop
2023-07-12 23:11:13,610 INFO Capture_0 io.debezium.embedded.EmbeddedEngine Stopping the task and engine
2023-07-12 23:11:13,611 INFO Capture_0 i.d.connector.common.BaseSourceTask Stopping down connector
2023-07-12 23:12:09,054 INFO stra-change-event-source-coordinator .RelationalSnapshotChangeEventSource Snapshot step 6 - Persisting schema history
2023-07-12 23:12:09,180 WARN stra-change-event-source-coordinator .s.AbstractSnapshotChangeEventSource Snapshot was interrupted before completion
2023-07-12 23:12:09,180 INFO stra-change-event-source-coordinator .s.AbstractSnapshotChangeEventSource Snapshot - Final stage
2023-07-12 23:12:09,180 WARN stra-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator Change event source executor was interrupted
java.lang.InterruptedException: Interrupted while processing event SchemaChangeEvent [database=, schema=null, ddl=SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci, tables=[], type=DATABASE]
at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.createSchemaChangeEventsForTables(MySqlSnapshotChangeEventSource.java:532)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:121)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-07-12 23:12:09,313 INFO pool-15-thread-1 io.debezium.jdbc.JdbcConnection Connection gracefully closed
2023-07-12 23:12:09,314 INFO Capture_0 o.a.k.c.s.FileOffsetBackingStore Stopped FileOffsetBackingStore
2023-07-12 23:12:09,314 INFO Capture_0 f.t.4.12AIsOBFwmo07iIlg1WqQz Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2023-07-12 23:12:09,319 INFO WorkerThread f.t.4.12AIsOBFwmo07iIlg1WqQz Ended after receiving 0 records: {}
id: test-pipeline
namespace: dev
tasks:
For now, we rely on Debezium 1.x.