
plugin-debezium's People

Contributors

anna-geller, dependabot[bot], fhussonnois, loicmathieu, skraye, smantri-moveworks, tchiotludo


plugin-debezium's Issues

Use Debezium CDC to replicate from MySQL to Oracle, MySQL to MySQL, ...

For now, there is no convenient way to handle deletes and updates on the destination.
Appending data with format: RAW is simple, but when you need to preserve the table structure on the destination and remove duplicate rows (via UPDATE and DELETE), there is no convenient way to do it.
You can easily use a FileTransform to generate multiple SQL queries, but the JDBC Batch task will not accept them, since it uses the same query for all operations: kestra-io/plugin-jdbc#81

Do we need to add a new format: SQL?
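A hypothetical format: SQL could emit one statement per change event, keyed on the source row's primary key. A sketch of what such generated output might look like (table and column names are invented for illustration; this is not an existing plugin feature):

```sql
-- Hypothetical output of a future `format: SQL`, one statement per event.

-- For a create (`c`) or update (`u`) event, emit an upsert:
INSERT INTO customers (id, name, email)
VALUES (42, 'Alice', 'alice@example.com')
ON DUPLICATE KEY UPDATE name = VALUES(name), email = VALUES(email);

-- For a delete (`d`) event, emit a delete keyed on the primary key:
DELETE FROM customers WHERE id = 42;
```

Emitting per-event statements like these would sidestep the JDBC Batch limitation, since each operation carries its own query.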

Provides a way to capture debezium logs

Feature description

Currently, Debezium logs are restricted to ERROR level only.
This is an issue when debugging Debezium-related problems, see kestra-io/kestra#1728

We should have a way to capture Debezium logs for debugging purposes.

Also, the default setup could perhaps restrict to WARNING instead of ERROR.
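As an interim workaround, and assuming Kestra's Logback configuration can be overridden (how the override is wired into a deployment is an assumption, not something this issue confirms), raising the io.debezium logger level might surface the missing logs:

```xml
<!-- Sketch of a Logback override raising Debezium verbosity.
     The logger name io.debezium is standard; integrating this file
     into a Kestra deployment is an assumption. -->
<configuration>
  <logger name="io.debezium" level="INFO"/>
</configuration>
```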

Debezium Postgres trigger is evaluating but doesn't see any records

Expected Behavior

The Debezium trigger should see all new records added during the interval. I have two flows with Debezium triggers connected to different Postgres servers. The setup seems quite stable when only one flow includes a Debezium trigger.

Actual Behaviour

It worked fine after setup. However, after a few days one of the triggers was still evaluating but no longer saw any records. The logs show Ended after receiving 0 records: {} even though I added a new record to the Postgres database. Restarting the Kestra worker resumed capture (the trigger almost immediately found the records that had been added before the restart). My second flow with a Debezium trigger worked correctly the whole time, so the issue affected only one flow.

There was another occurrence where the Debezium Postgres trigger stopped evaluating entirely and produced no logs; again, a restart helped. See the screenshot for an example of this case: Debezium ran at 04:13:05.901 and then not again until 22:23:47.637 (after my restart).

It is also possible for the trigger to find new records only after some delay. I set interval: PT30S, but the trigger reported 0 records several times and only caught the new records after roughly 5 minutes.

I tried to solve the issues myself with the following steps:

  1. Changed the Kestra architecture from standalone to a webserver, executor, scheduler, worker setup, hoping it would improve performance.
  2. Renamed the Debezium trigger to something unique (previously the trigger ids in the different flows were identical).

Maybe there is a conflict between the Debezium triggers, and something like a distinct stateName for each could help.
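If the two triggers do share offset state, giving each flow's trigger its own stateName might avoid the conflict. A sketch, assuming the trigger exposes a stateName property for its Debezium offset storage (the property name and value below are illustrative, not confirmed by this issue):

```yaml
# Sketch: a distinct stateName per flow so the two triggers'
# Debezium offsets cannot collide (value is illustrative).
triggers:
  - id: listen-debezium-identityserver
    type: io.kestra.plugin.debezium.postgres.Trigger
    stateName: debezium-identityserver   # unique per flow
    # ...connection properties as in the flow above
```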

[screenshot: trigger evaluation gap between 04:13:05.901 and 22:23:47.637]

Steps To Reproduce

  1. Create PUBLICATION in 2 different Postgres servers for some tables.
  2. Create 2 Kestra flows with Debezium Postgres trigger.
    For example:
triggers:
  - id: listen-debezium-identityserver
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    hostname: "{{vars.sourceServer}}"
    port: '5432'
    username: "{{vars.replicationUser}}"
    password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
    database: "{{vars.databaseName}}"
    sslMode: REQUIRE
    snapshotMode: INITIAL
    slotName: "{{vars.databaseName}}"
    includedTables: public.identifications,public.external_identifications
    pluginName: PGOUTPUT
    format: INLINE
    metadata: ADD_FIELD
    deleted: DROP
    interval: PT60S
  3. Run some SQL commands against the Postgres servers connected to the Debezium triggers.
  4. Check that the Debezium evaluations work fine.
  5. Leave the Kestra flows running for a few days (ideally the Postgres servers should be in use all this time).
  6. Run SQL commands again and check the logs in the Kestra flows.
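The publication from step 1 can be created with standard Postgres DDL, using the tables listed in includedTables above. The publication name here is illustrative (Debezium's pgoutput plugin uses its own default unless one is configured):

```sql
-- Run on each source Postgres server; logical decoding
-- requires wal_level = logical (see the flow comment below).
CREATE PUBLICATION kestra_publication
  FOR TABLE public.identifications, public.external_identifications;
```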

Environment Information

  • Kestra Version: 0.13.1
  • Plugin version: 0.13.0
  • Operating System (OS / Docker / Kubernetes): Kubernetes helm chart (webserver, executor, scheduler, worker and Postgres setup)

Example flow

tasks:
  - id: parallel
    type: io.kestra.core.tasks.flows.EachParallel
    value: "{{vars.tableNameList}}"
    tasks:
    - id: if
      type: io.kestra.core.tasks.flows.If
      condition: "{{trigger.uris[taskrun.value] ?? false }}"
      then:  
      - id: launch
        type: io.kestra.core.tasks.flows.Flow
        namespace: common
        flowId: syncTable
        inputs:
          fullTableName: "{{parents[0].taskrun.value}}"
          debeziumFile: "{{trigger.uris[parents[0].taskrun.value]}}"
          databaseUserName: "{{vars.destinationDatabaseUserName}}"
          databaseUserSecret: "{{vars.destinationDatabaseUserSecret}}"
          serverName: "{{vars.destinationServer}}"
          databaseName: "{{vars.databaseName}}"
        labels:
          destinationServer: "{{vars.destinationServer}}"
          sourceServer: "{{vars.sourceServer}}"
          tableName: "{{parents[0].taskrun.value}}"
        wait: true
        transmitFailed: true
        outputs:
          sql: "{{ outputs.node.outputFiles['script.sql'] }}"
          json: "{{ outputs.json.uri }}"

errors:
  - id: send_email
    type: io.kestra.plugin.notifications.mail.MailSend
    from: "{{vars.fromEmail}}"
    to: "{{vars.toEmail}}"
    username: "{{ secret('EMAIL_USERNAME') }}"
    password: "{{ secret('EMAIL_PASSWORD') }}"
    host: smtp.postmarkapp.com
    port: 587
    transportStrategy: SMTP
    subject: "Synchronization of {{flow.id}} database failed"
    htmlTextContent: |
      Synchronization of database from {{vars.sourceServer}} to {{vars.destinationServer}} failed.<br />
      Please check Kestra server for more details.

#logical decoding requires wal_level = logical
triggers:
  - id: listen-debezium-identityserver
    type: "io.kestra.plugin.debezium.postgres.Trigger"
    hostname: "{{vars.sourceServer}}"
    port: '5432'
    username: "{{vars.replicationUser}}"
    password: "{{secret('PG_PROD_IDP_REPLICAION_USER_PASSWORD')}}"
    database: "{{vars.databaseName}}"
    sslMode: REQUIRE
    snapshotMode: INITIAL
    slotName: "{{vars.databaseName}}"
    includedTables: public.identifications,public.external_identifications
    pluginName: PGOUTPUT
    format: INLINE
    metadata: ADD_FIELD
    deleted: DROP
    interval: PT60S
  

Debezium captures the initial snapshot but not subsequent changes

Expected Behavior

After the initial snapshot, I would like Debezium to pick up changes as they happen.
But the changes are never captured.

Actual Behaviour

Below is a snippet of the logs received even after updating the tables multiple times.

2024-03-01 11:53:14.764 trigger_cdc_events_captured Ended after receiving 0 records: {}
2024-03-01 11:53:14.291 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2024-03-01 11:51:46.702 trigger_cdc_events_captured Ended after receiving 0 records: {}
2024-03-01 11:51:46.206 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2024-03-01 11:50:18.952 trigger_cdc_events_captured Ended after receiving 0 records: {}
2024-03-01 11:50:18.485 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2024-03-01 11:48:48.316 trigger_cdc_events_captured Ended after receiving 0 records: {}
2024-03-01 11:48:47.818 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2024-03-01 11:47:29.771 trigger_sub_flow_for_cdc_events_executable [26bPjyBH131x1cQU6mus20] Create new execution for flow 'engineering'.'process_cdc_events' with id '15juNMOI3WrGlpU7mVe3xc'
2024-03-01 11:47:25.003 trigger_sub_flow_for_cdc_events_executable [26bPjyBH131x1cQU6mus20] Create new execution for flow 'engineering'.'process_cdc_events' with id '7ms5luSBYk0AWz9NCblLLE'
2024-03-01 11:47:21.765 trigger_cdc_events_captured Ended after receiving 11 records: {boot_staging.hosts=5, boot_staging.statuses=6}
2024-03-01 11:47:20.857 trigger_cdc_events_captured Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'

Steps To Reproduce

Followed this guide to set up the environment: AWS EC2 with Amazon RDS and S3.

Used MySQL instead of Postgres for the database.

Created this flow:

id: debezium-mysql-connector-1
namespace: engineering

tasks:
  - id: loop_cdc_tables
    type: io.kestra.core.tasks.flows.EachSequential
    value: "{{ trigger.uris | keys }}"
    tasks:
      - id: trigger_sub_flow_for_cdc_events
        type: io.kestra.core.tasks.flows.ForEachItem
        items: "{{ trigger.uris[parent.taskrun.value] }}"
        batch:
          rows: 1000
        namespace: engineering
        flowId: process_cdc_events
        wait: true
        transmitFailed: true
        inputs:
          data: "{{ taskrun.items }}"

triggers:
  - id: trigger_cdc_events_captured
    type: io.kestra.plugin.debezium.mysql.Trigger
    hostname: host
    password: "pwd"
    username: username
    port: "3306"
    ignoreDdl: true
    format: INLINE
    snapshotMode: INITIAL
    includedTables:
      - boot_staging.statuses
      - boot_staging.hosts

disabled: false

Table creation:

CREATE TABLE `statuses` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
  `class` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
  `order` int(11) NOT NULL,
  `created_at` timestamp NULL DEFAULT NULL,
  `updated_at` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  KEY `statuses_order_index` (`order`) USING BTREE
)

INSERT INTO `statuses` (`id`, `name`, `class`, `order`, `created_at`, `updated_at`) VALUES (1, 'Draft', 'default', 1, '2017-07-07 19:58:21', '2017-07-07 19:58:21');

Once the trigger is enabled, it captures the initial snapshot of the included tables.

I then run an update:

update statuses 
   set class = 'Test'
 where id = 1;

I then expect the change to be tracked... but minutes go by and nothing is tracked.

(The same log output as above repeats: Debezium ends normally and reports 'Ended after receiving 0 records: {}' on every interval.)
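Since the snapshot succeeds but streaming yields nothing, one thing worth verifying on the MySQL side is that the binary log is enabled and row-based, which Debezium's MySQL connector requires for streaming; on Amazon RDS these depend on automated backups being enabled and on the binlog_format parameter:

```sql
-- Check on the source MySQL server; Debezium streaming requires
-- log_bin = ON and binlog_format = ROW.
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
```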

Environment Information

  • Kestra Version: v0.16.0 full and v0.15.0 full
  • Plugin version: Latest-Full
  • Operating System (OS / Docker / Kubernetes): Docker, AWS EC2 Instance
  • Java Version (If not docker): N/A

Example flow

(Same flow as shown under Steps To Reproduce above.)

MySQL Debezium snapshot is interrupted by maxWait

Expected Behavior

The MySQL Debezium snapshot completes successfully.

Actual Behaviour

During the initial run, Debezium takes a snapshot of the table data and the database schema.
With the default maxWait of 10 seconds, the plugin does not wait for the snapshot to complete and interrupts it.
The same issue occurs if a user sets maxRecords to a value lower than the row count of the captured tables.
To keep the data consistent with the source database, the plugin should wait until the snapshot is done.
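Until the plugin waits for snapshot completion itself, a possible workaround is to raise maxWait and avoid a small maxRecords, so the snapshot has time to finish. A sketch, assuming maxWait accepts an ISO-8601 duration as its 10-second default suggests (the value chosen here is illustrative):

```yaml
# Sketch: give the initial snapshot time to complete.
- id: capture
  type: io.kestra.plugin.debezium.mysql.Capture
  snapshotMode: INITIAL
  maxWait: PT15M   # default PT10S is too short for a large snapshot
  # maxRecords deliberately unset so the snapshot is not cut off mid-way
  hostname: db.host.com
  port: "3306"
  username: user
  password: pass
```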

2023-07-12 23:11:06,494 INFO  stra-change-event-source-coordinator d.c.m.MySqlSnapshotChangeEventSource Reading structure of database 'dynamodb-clone'
2023-07-12 23:11:13,126 INFO  WorkerThread io.debezium.embedded.EmbeddedEngine Stopping the embedded engine
2023-07-12 23:11:13,127 INFO  WorkerThread io.debezium.embedded.EmbeddedEngine Waiting for PT5M for connector to stop
2023-07-12 23:11:13,610 INFO  Capture_0    io.debezium.embedded.EmbeddedEngine Stopping the task and engine
2023-07-12 23:11:13,611 INFO  Capture_0    i.d.connector.common.BaseSourceTask Stopping down connector
2023-07-12 23:12:09,054 INFO  stra-change-event-source-coordinator .RelationalSnapshotChangeEventSource Snapshot step 6 - Persisting schema history
2023-07-12 23:12:09,180 WARN  stra-change-event-source-coordinator .s.AbstractSnapshotChangeEventSource Snapshot was interrupted before completion
2023-07-12 23:12:09,180 INFO  stra-change-event-source-coordinator .s.AbstractSnapshotChangeEventSource Snapshot - Final stage
2023-07-12 23:12:09,180 WARN  stra-change-event-source-coordinator i.d.p.ChangeEventSourceCoordinator Change event source executor was interrupted
java.lang.InterruptedException: Interrupted while processing event SchemaChangeEvent [database=, schema=null, ddl=SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci, tables=[], type=DATABASE]
	at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.createSchemaChangeEventsForTables(MySqlSnapshotChangeEventSource.java:532)
	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:121)
	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
2023-07-12 23:12:09,313 INFO  pool-15-thread-1 io.debezium.jdbc.JdbcConnection Connection gracefully closed
2023-07-12 23:12:09,314 INFO  Capture_0    o.a.k.c.s.FileOffsetBackingStore Stopped FileOffsetBackingStore
2023-07-12 23:12:09,314 INFO  Capture_0    f.t.4.12AIsOBFwmo07iIlg1WqQz Debezium ended successfully with 'Connector 'io.debezium.connector.mysql.MySqlConnector' completed normally.'
2023-07-12 23:12:09,319 INFO  WorkerThread f.t.4.12AIsOBFwmo07iIlg1WqQz Ended after receiving 0 records: {}

Steps To Reproduce

  1. Run Kestra.
  2. Set up a flow to sync from a MySQL database with a) hundreds of tables or b) one table with many rows.
  3. Trigger an execution.

Environment Information

  • Kestra Version: 0.10.0
  • Operating System (OS / Docker / Kubernetes): MacOS 13.4 /Docker
  • Java Version (If not docker): N/A

Example flow

id: test-pipeline
namespace: dev
tasks:
  - id: "capture"
    type: "io.kestra.plugin.debezium.mysql.Capture"
    snapshotMode: INITIAL
    properties:
      snapshot.locking.mode: none
    hostname: db.host.com
    port: "3306"
    username: user
    password: pass
    maxRecords: 10
    deleted: ADD_FIELD
    deletedFieldName: "is_deleted"
    metadata: ADD_FIELD
    metadataFieldName: "metadata"
    key: ADD_FIELD
    ignoreDdl: true
    format: WRAP
