Comments (16)
Redis and Kafka are required components; you can deploy both in one step with docker-compose.yml.
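For reference, a minimal sketch of what such a compose file could contain (an illustration only, not the repository's actual docker-compose.yml; image tags and ports are assumptions):

version: "3"
services:
  redis:
    image: redis:5
    ports:
      - "6379:6379"
  zookeeper:
    image: bitnami/zookeeper:3
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # clients outside the compose network also need an advertised listener, e.g.:
      # - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://your.host:9092

Running docker-compose up -d then brings up all three services in the background.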
Thanks. Is there a chat group where we can discuss this?
Not yet.
Hello. Both with docker-compose.yml and with Kafka and Redis set up manually on my server, running mysql2ch -c config.json produce always fails with the following error:
[root@mysql mysql2ch]# mysql2ch -c config.json produce
Traceback (most recent call last):
  File "/usr/local/python372/bin/mysql2ch", line 11, in <module>
    load_entry_point('mysql2ch==0.4.5', 'console_scripts', 'mysql2ch')()
  File "/usr/local/python372/lib/python3.7/site-packages/mysql2ch/cli.py", line 86, in cli
    parse_args.run(parse_args)
  File "/usr/local/python372/lib/python3.7/site-packages/mysql2ch/cli.py", line 39, in run
    args.func(args)
  File "/usr/local/python372/lib/python3.7/site-packages/mysql2ch/producer.py", line 22, in produce
    init_partitions(settings)
  File "/usr/local/python372/lib/python3.7/site-packages/mysql2ch/common.py", line 137, in init_partitions
    client = KafkaAdminClient(bootstrap_servers=settings.kafka_server,)
  File "/usr/local/python372/lib/python3.7/site-packages/kafka/admin/client.py", line 214, in __init__
    self._refresh_controller_id()
  File "/usr/local/python372/lib/python3.7/site-packages/kafka/admin/client.py", line 274, in _refresh_controller_id
    controller_version = self._client.check_version(controller_id)
  File "/usr/local/python372/lib/python3.7/site-packages/kafka/client_async.py", line 907, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/usr/local/python372/lib/python3.7/site-packages/kafka/conn.py", line 1230, in check_version
    raise Errors.NodeNotReadyError()
kafka.errors.NodeNotReadyError: NodeNotReadyError
Sentry is attempting to send 2 pending error messages
Waiting up to 2 seconds
Press Ctrl-C to quit
I haven't been able to solve this; please help. @long2ice
Kafka did not start properly.
> Kafka did not start properly.

Thanks, the Kafka issue is solved. The cause was that I was running the latest Kafka. Following the solution in https://github.com/dpkp/kafka-python/issues/1861, I added the setting to the config/server.properties file and restarted, and that fixed it.
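For anyone else hitting this NodeNotReadyError: the workaround discussed in that kafka-python issue is typically to declare explicit listener addresses in the broker's config/server.properties, along these lines (hostnames below are placeholders; use an address your client can actually reach):

# placeholders; adjust to your environment
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your.kafka.host:9092

Then restart the broker.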
Now I've run into the following problems:
1. I removed the skip entries from the "skip_dmls" field, but it hasn't taken effect: update and delete operations are still not applied.
2. Every time I insert multiple rows into MySQL, only the first one is written to ClickHouse (if I insert 3 rows into MySQL at once, the remaining 2 rows only reach ClickHouse on the next insert).
There shouldn't be a problem like that; check your config file.
> There shouldn't be a problem like that; check your config file.

Here are the messages emitted by the producer:
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 74, 'pay_money': Decimal('60.21'), 'pay_day': datetime.date(2019, 5, 25), 'pay_time': datetime.datetime(2019, 5, 25, 14, 0)}, 'event_unixtime': 1591780549887858, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 75, 'pay_money': Decimal('61.21'), 'pay_day': datetime.date(2019, 5, 26), 'pay_time': datetime.datetime(2019, 5, 26, 14, 0)}, 'event_unixtime': 1591780549892432, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 76, 'pay_money': Decimal('62.21'), 'pay_day': datetime.date(2019, 5, 24), 'pay_time': datetime.datetime(2019, 9, 24, 14, 0)}, 'event_unixtime': 1591780549894906, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 77, 'pay_money': Decimal('63.21'), 'pay_day': datetime.date(2020, 5, 3), 'pay_time': datetime.datetime(2020, 5, 3, 14, 0)}, 'event_unixtime': 1591780549899752, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
2020-06-10 17:15:49 - mysql2ch.producer:70 - DEBUG - send to kafka success: key:etldb,event:{'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 78, 'pay_money': Decimal('64.21'), 'pay_day': datetime.date(2020, 5, 4), 'pay_time': datetime.datetime(2020, 5, 4, 14, 0)}, 'event_unixtime': 1591780549905187, 'action_core': '2'}
2020-06-10 17:15:49 - mysql2ch.producer:73 - DEBUG - success set binlog pos:mysql-bin.000001:13922
And here are the messages consumed by the consumer; the second line below shows that only one row was inserted successfully:
2020-06-10 17:15:50 - mysql2ch.writer:23 - DEBUG - INSERT INTO etldb.test VALUES [{'id': 74, 'pay_money': Decimal('60.21'), 'pay_day': datetime.datetime(2019, 5, 25, 0, 0), 'pay_time': datetime.datetime(2019, 5, 25, 14, 0)}]
2020-06-10 17:15:50 - mysql2ch.writer:236 - INFO - etldb.test:success insert 1 rows!
2020-06-10 17:15:50 - mysql2ch.consumer:111 - INFO - commit success 136 events!
2020-06-10 17:15:50 - mysql2ch.consumer:53 - DEBUG - kafka msg:ConsumerRecord(topic='etl', partition=0, offset=221, timestamp=1591780549892, timestamp_type=0, key='etldb', value={'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 75, 'pay_money': Decimal('61.21'), 'pay_day': datetime.datetime(2019, 5, 26, 0, 0), 'pay_time': datetime.datetime(2019, 5, 26, 14, 0)}, 'event_unixtime': 1591780549892432, 'action_core': '2'}, headers=[], checksum=None, serialized_key_size=5, serialized_value_size=316, serialized_header_size=-1)
2020-06-10 17:15:50 - mysql2ch.consumer:53 - DEBUG - kafka msg:ConsumerRecord(topic='etl', partition=0, offset=222, timestamp=1591780549895, timestamp_type=0, key='etldb', value={'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 76, 'pay_money': Decimal('62.21'), 'pay_day': datetime.datetime(2019, 5, 24, 0, 0), 'pay_time': datetime.datetime(2019, 9, 24, 14, 0)}, 'event_unixtime': 1591780549894906, 'action_core': '2'}, headers=[], checksum=None, serialized_key_size=5, serialized_value_size=316, serialized_header_size=-1)
2020-06-10 17:15:50 - mysql2ch.consumer:53 - DEBUG - kafka msg:ConsumerRecord(topic='etl', partition=0, offset=223, timestamp=1591780549900, timestamp_type=0, key='etldb', value={'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 77, 'pay_money': Decimal('63.21'), 'pay_day': datetime.datetime(2020, 5, 3, 0, 0), 'pay_time': datetime.datetime(2020, 5, 3, 14, 0)}, 'event_unixtime': 1591780549899752, 'action_core': '2'}, headers=[], checksum=None, serialized_key_size=5, serialized_value_size=316, serialized_header_size=-1)
2020-06-10 17:15:50 - mysql2ch.consumer:53 - DEBUG - kafka msg:ConsumerRecord(topic='etl', partition=0, offset=224, timestamp=1591780549905, timestamp_type=0, key='etldb', value={'table': 'test', 'schema': 'etldb', 'action': 'insert', 'values': {'id': 78, 'pay_money': Decimal('64.21'), 'pay_day': datetime.datetime(2020, 5, 4, 0, 0), 'pay_time': datetime.datetime(2020, 5, 4, 14, 0)}, 'event_unixtime': 1591780549905187, 'action_core': '2'}, headers=[], checksum=None, serialized_key_size=5, serialized_value_size=316, serialized_header_size=-1)
The table in the database now contains:
clichouse :) select * from test;
SELECT *
FROM test
┌─id─┬─pay_money─┬────pay_day─┬────────────pay_time─┐
│ 74 │ 60.21 │ 2019-05-25 │ 2019-05-25 14:00:00 │
└────┴───────────┴────────────┴─────────────────────┘
1 rows in set. Elapsed: 0.004 sec.
My config file is as follows:
{
  "debug": true,
  "environment": "development",
  "mysql_host": "xxx",
  "mysql_port": 3306,
  "mysql_user": "xxxx",
  "mysql_password": "xxxx",
  "mysql_server_id": 1,
  "redis_host": "xxx",
  "redis_port": 6379,
  "redis_password": "xxxxxx",
  "redis_db": 0,
  "clickhouse_host": "xxxx",
  "clickhouse_port": 9000,
  "clickhouse_user": "xxx",
  "clickhouse_password": "xxxx",
  "kafka_server": "xxxxx:9092",
  "kafka_topic": "xxx",
  "sentry_dsn": "https://[email protected]/1",
  "schema_table": {
    "xxxxdb": {
      "tables": [
        "test"
      ],
      "kafka_partition": 0
    }
  },
  "skip_delete_tables": [
    "test.test2"
  ],
  "skip_update_tables": [
    "test.test2"
  ],
  "skip_dmls": [],
  "init_binlog_file": "mysql-bin.000001",
  "init_binlog_pos": 2179,
  "log_pos_prefix": "mysql2ch",
  "insert_num": 20000,
  "insert_interval": 60
}
Did you delete something?
> Did you delete something?

I removed "delete" and "update", leaving:
"skip_dmls": []
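For context: judging from that description, the field presumably read as follows before the change (reconstructed, not copied from the user's earlier config):

"skip_dmls": ["delete", "update"]

which, as I understand it, tells mysql2ch to skip all delete and update events globally.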
So you deleted all of the skip-related settings.
> So you deleted all of the skip-related settings.

I deleted them because I found that delete and update operations in MySQL were not being synced to ClickHouse, and I thought removing them might make the sync happen promptly, but it didn't!
Also, every time multiple rows are inserted into MySQL, only the first one is written to ClickHouse (if 3 rows are inserted into MySQL at once, the remaining 2 rows only reach ClickHouse on the next insert).
Try setting both insert_num and insert_interval to 1.
> Try setting both insert_num and insert_interval to 1.

Thanks! That solved it perfectly!
Yes. In production it's best to keep them at the default values, which is more efficient.
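For anyone reading along: judging by the names and the behavior in this thread, insert_num looks like the number of events buffered before a bulk insert into ClickHouse and insert_interval like the flush interval in seconds, so with the defaults a few rows can sit in the buffer until the next flush, which would explain why only the first of several rows appeared immediately above. A debugging fragment that flushes every event at once (keys as in the config shown earlier) would be:

"insert_num": 1,
"insert_interval": 1

while the defaults (20000 and 60) batch events for much better throughput in production.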