Comments (16)

long2ice commented on June 1, 2024

Redis and Kafka are required components; you can deploy them in one step with docker-compose.yml.
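
For anyone setting the services up by hand instead, a minimal sketch of the required middleware is shown below. This is illustrative only: the image names, versions, and ports are assumptions, and the project's shipped docker-compose.yml may differ.

# Illustrative docker-compose sketch, NOT the project's shipped file.
# Image names/versions and ports are assumptions; adjust to your environment.
version: "3"
services:
  redis:
    image: redis:5
    ports:
      - "6379:6379"
  zookeeper:
    image: bitnami/zookeeper:3
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2
    depends_on:
      - zookeeper
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    ports:
      - "9092:9092"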

imxintian commented on June 1, 2024

Thanks! Is there a chat group for discussion?

long2ice commented on June 1, 2024

Not yet.

imxintian commented on June 1, 2024

Hello. Both when using docker-compose.yml and when setting up Kafka and Redis myself on a server, running mysql2ch -c config.json produce always fails with the following error:

[root@mysql mysql2ch]# mysql2ch -c config.json produce
Traceback (most recent call last):
  File "/usr/local/python372/bin/mysql2ch", line 11, in <module>
    load_entry_point('mysql2ch==0.4.5', 'console_scripts', 'mysql2ch')()
  File "/usr/local/python372/lib/python3.7/site-packages/mysql2ch/cli.py", line 86, in cli
    parse_args.run(parse_args)
  File "/usr/local/python372/lib/python3.7/site-packages/mysql2ch/cli.py", line 39, in run
    args.func(args)
  File "/usr/local/python372/lib/python3.7/site-packages/mysql2ch/producer.py", line 22, in produce
    init_partitions(settings)
  File "/usr/local/python372/lib/python3.7/site-packages/mysql2ch/common.py", line 137, in init_partitions
    client = KafkaAdminClient(bootstrap_servers=settings.kafka_server,)
  File "/usr/local/python372/lib/python3.7/site-packages/kafka/admin/client.py", line 214, in __init__
    self._refresh_controller_id()
  File "/usr/local/python372/lib/python3.7/site-packages/kafka/admin/client.py", line 274, in _refresh_controller_id
    controller_version = self._client.check_version(controller_id)
  File "/usr/local/python372/lib/python3.7/site-packages/kafka/client_async.py", line 907, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/usr/local/python372/lib/python3.7/site-packages/kafka/conn.py", line 1230, in check_version
    raise Errors.NodeNotReadyError()
kafka.errors.NodeNotReadyError: NodeNotReadyError
Sentry is attempting to send 2 pending error messages
Waiting up to 2 seconds
Press Ctrl-C to quit

I haven't been able to resolve this; please help. @long2ice

long2ice commented on June 1, 2024

Kafka did not start properly.
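
A quick way to verify this before running the producer is to force a metadata fetch with kafka-python, the same client library shown in the traceback. A minimal sketch; the address is a placeholder for the kafka_server value in config.json:

from kafka import KafkaConsumer

# Placeholder address; substitute the kafka_server value from config.json.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")

# topics() forces a metadata round-trip and raises NoBrokersAvailable
# if the broker is down or unreachable.
print(consumer.topics())
consumer.close()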

imxintian commented on June 1, 2024

Kafka did not start properly.

Thanks, the Kafka problem is solved now.
The cause was that I was running the latest Kafka release.
Following the fix in https://github.com/dpkp/kafka-python/issues/1861, I added the setting to the config/server.properties file and restarted, and that was enough.
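
The exact property isn't quoted above. The broker-side fix usually discussed for this error is to advertise a listener address that clients can actually reach; purely as an illustration, with a placeholder host:

# config/server.properties (illustrative; the host below is a placeholder)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your.server.ip:9092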

imxintian commented on June 1, 2024

Now I've run into the following problems:
1. I removed the skip statements from the "skip_dmls" field, but it doesn't take effect; update and delete operations are still not applied.
2. Each time I insert multiple rows into MySQL, only the first one is inserted into ClickHouse (if I insert 3 rows into MySQL at once, the remaining 2 rows are only inserted into ClickHouse on the next insert).

long2ice commented on June 1, 2024

That shouldn't happen; please check your config file.

imxintian commented on June 1, 2024

That shouldn't happen; please check your config file.

Here are the messages produced by the producer:

2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 74, 'pay_money': Decimal('60.21'), 'pay_day': datetime.date(2019, 5, 25), 'pay_time': datetime.datetime(2019, 5, 25, 14, 0)}, 'event_unixtime': 1591780549887858, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 75, 'pay_money': Decimal('61.21'), 'pay_day': datetime.date(2019, 5, 26), 'pay_time': datetime.datetime(2019, 5, 26, 14, 0)}, 'event_unixtime': 1591780549892432, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 76, 'pay_money': Decimal('62.21'), 'pay_day': datetime.date(2019, 5, 24), 'pay_time': datetime.datetime(2019, 9, 24, 14, 0)}, 'event_unixtime': 1591780549894906, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 77, 'pay_money': Decimal('63.21'), 'pay_day': datetime.date(2020, 5, 3), 'pay_time': datetime.datetime(2020, 5, 3, 14, 0)}, 'event_unixtime': 1591780549899752, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 78, 'pay_money': Decimal('64.21'), 'pay_day': datetime.date(2020, 5, 4), 'pay_time': datetime.datetime(2020, 5, 4, 14, 0)}, 'event_unixtime': 1591780549905187, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922

Here are the messages consumed by the consumer; the second line below shows that only one row was inserted successfully:

2020-06-10 17:15:50 - mysql2ch.writer:23 - DEBUG - INSERT INTO etldb.test VALUES  [{'id': 74, 'pay_money': Decimal('60.21'), 'pay_day': datetime.datetime(2019, 5, 25, 0, 0), 'pay_time': datetime.datetime(2019, 5, 25, 14, 0)}]
2020-06-10 17:15:50 - mysql2ch.writer:236 - INFO - etldb.test:success insert 1 rows!
2020-06-10 17:15:50 - mysql2ch.consumer:111 - INFO - commit success 136 events!
2020-06-10 17:15:50 - mysql2ch.consumer:53 - DEBUG - kafka msg:ConsumerRecord(topic='etl', partition=0, offset=221, timestamp=1591780549892, timestamp_type=0, key='etldb', value={'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 75, 'pay_money': Decimal('61.21'), 'pay_day': datetime.datetime(2019, 5, 26, 0, 0), 'pay_time': datetime.datetime(2019, 5, 26, 14, 0)}, 'event_unixtime': 1591780549892432, 'action_core': '2'}, headers=[], checksum=None, serialized_key_size=5, serialized_value_size=316, serialized_header_size=-1)
2020-06-10 17:15:50 - mysql2ch.consumer:53 - DEBUG - kafka msg:ConsumerRecord(topic='etl', partition=0, offset=222, timestamp=1591780549895, timestamp_type=0, key='etldb', value={'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 76, 'pay_money': Decimal('62.21'), 'pay_day': datetime.datetime(2019, 5, 24, 0, 0), 'pay_time': datetime.datetime(2019, 9, 24, 14, 0)}, 'event_unixtime': 1591780549894906, 'action_core': '2'}, headers=[], checksum=None, serialized_key_size=5, serialized_value_size=316, serialized_header_size=-1)
2020-06-10 17:15:50 - mysql2ch.consumer:53 - DEBUG - kafka msg:ConsumerRecord(topic='etl', partition=0, offset=223, timestamp=1591780549900, timestamp_type=0, key='etldb', value={'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 77, 'pay_money': Decimal('63.21'), 'pay_day': datetime.datetime(2020, 5, 3, 0, 0), 'pay_time': datetime.datetime(2020, 5, 3, 14, 0)}, 'event_unixtime': 1591780549899752, 'action_core': '2'}, headers=[], checksum=None, serialized_key_size=5, serialized_value_size=316, serialized_header_size=-1)
2020-06-10 17:15:50 - mysql2ch.consumer:53 - DEBUG - kafka msg:ConsumerRecord(topic='etl', partition=0, offset=224, timestamp=1591780549905, timestamp_type=0, key='etldb', value={'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 78, 'pay_money': Decimal('64.21'), 'pay_day': datetime.datetime(2020, 5, 4, 0, 0), 'pay_time': datetime.datetime(2020, 5, 4, 14, 0)}, 'event_unixtime': 1591780549905187, 'action_core': '2'}, headers=[], checksum=None, serialized_key_size=5, serialized_value_size=316, serialized_header_size=-1)

The table contents are as follows:

clickhouse :) select * from test;

SELECT *
FROM test

┌─id─┬─pay_money─┬────pay_day─┬────────────pay_time─┐
│ 74 │     60.21 │ 2019-05-25 │ 2019-05-25 14:00:00 │
└────┴───────────┴────────────┴─────────────────────┘

1 rows in set. Elapsed: 0.004 sec.

My config file is as follows:

{
  "debug": true,
  "environment": "development",
  "mysql_host": "xxx",
  "mysql_port": 3306,
  "mysql_user": "xxxx",
  "mysql_password": "xxxx",
  "mysql_server_id": 1,
  "redis_host": "xxx",
  "redis_port": 6379,
  "redis_password": "xxxxxx",
  "redis_db": 0,
  "clickhouse_host": "xxxx",
  "clickhouse_port": 9000,
  "clickhouse_user": "xxx",
  "clickhouse_password": "xxxx",
  "kafka_server": "xxxxx:9092",
  "kafka_topic": "xxx",
  "sentry_dsn": "https://[email protected]/1",
  "schema_table": {
    "xxxxdb": {
      "tables": [
        "test"
      ],
      "kafka_partition": 0
    }
  },
  "skip_delete_tables": [
    "test.test2"
  ],
  "skip_update_tables": [
    "test.test2"
  ],
   "skip_dmls": [
   ],
  "init_binlog_file": "mysql-bin.000001",
  "init_binlog_pos": 2179,
  "log_pos_prefix": "mysql2ch",
  "insert_num": 20000,
  "insert_interval": 60
}

long2ice commented on June 1, 2024

Did you delete something?

imxintian commented on June 1, 2024

Did you delete something?

I removed "delete" and "update":

   "skip_dmls": [
   ]

long2ice commented on June 1, 2024

You deleted all the skip-related settings.

imxintian commented on June 1, 2024

You deleted all the skip-related settings.

Because I found that delete and update operations in MySQL were not being synced to ClickHouse, I thought removing those entries might make them sync promptly, but it did not!
Also, each time I insert multiple rows into MySQL, only the first one is inserted into ClickHouse (if I insert 3 rows into MySQL at once, the remaining 2 rows are only inserted into ClickHouse on the next insert).

long2ice commented on June 1, 2024

Try setting both insert_num and insert_interval to 1.
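
Judging from the option names and the behavior above, the consumer appears to buffer events and flush a batch to ClickHouse only once it has collected insert_num events or insert_interval seconds have passed, so low-volume test inserts sit in the buffer until the next flush. The change in config.json would be:

  "insert_num": 1,
  "insert_interval": 1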

imxintian commented on June 1, 2024

Try setting both insert_num and insert_interval to 1.

Thanks! That solved it perfectly!

long2ice commented on June 1, 2024

Yep, but in production it's best to set them back to the defaults; that's more efficient.
