long2ice / synch Goto Github PK
View Code? Open in Web Editor NEW
Sync data from the other DB to ClickHouse(cluster)
Home Page: https://github.com/long2ice/synch
License: Apache License 2.0
Sync data from the other DB to ClickHouse(cluster)
Home Page: https://github.com/long2ice/synch
License: Apache License 2.0
[root@k8smaster site-packages]# synch --config /etc/synch.ini etl --schema test --renew
2020-06-30 10:39:16 - synch.reader:19 - DEBUG - select COLUMN_NAME from information_schema.COLUMNS where TABLE_SCHEMA='test' and TABLE_NAME='hexin_erp_product' and COLUMN_KEY='PRI'
2020-06-30 10:39:16 - synch.replication.clickhouse:27 - DEBUG - drop table test.hexin_erp_product
2020-06-30 10:39:16 - synch.reader:43 - INFO - drop table success:test.hexin_erp_product
2020-06-30 10:39:16 - synch.replication.clickhouse:27 - DEBUG - select count(*)from system.tables where database = 'test' and name = 'hexin_erp_product'
2020-06-30 10:39:16 - synch.replication.clickhouse:27 - DEBUG - CREATE TABLE test.hexin_erp_product ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('192.168.66.33:3306', 'test', 'hexin_erp_product', 'root', 'Hexin2007')
2020-06-30 10:39:17 - synch.replication.clickhouse:38 - DEBUG - select COLUMN_NAME, COLUMN_TYPE from information_schema.COLUMNS where TABLE_NAME = 'hexin_erp_product' and COLUMN_TYPE like '%decimal%'and TABLE_SCHEMA = 'test'
2020-06-30 10:39:17 - synch.replication.clickhouse:27 - DEBUG - alter table test.hexin_erp_product modify column purchase_minprice Decimal(10,2)
2020-06-30 10:39:17 - synch.replication.clickhouse:27 - DEBUG - alter table test.hexin_erp_product modify column purchase_maxprice Decimal(10,2)
2020-06-30 10:39:17 - synch.replication.clickhouse:27 - DEBUG - alter table test.hexin_erp_product modify column apply_price Decimal(10,2) #一直卡在这边有10分钟,最后报如下错误
Traceback (most recent call last):
File "/usr/local/bin/synch", line 11, in
sys.exit(cli())
File "/usr/local/lib/python3.6/site-packages/synch/cli.py", line 66, in cli
parse_args.run(parse_args)
File "/usr/local/lib/python3.6/site-packages/synch/cli.py", line 19, in run
args.func(args)
File "/usr/local/lib/python3.6/site-packages/synch/replication/etl.py", line 12, in make_etl
Global.reader.etl_full(Global.writer, schema, tables, renew)
File "/usr/local/lib/python3.6/site-packages/synch/reader/__init__.py", line 50, in etl_full
writer.fix_table_column_type(self, schema, table)
File "/usr/local/lib/python3.6/site-packages/synch/replication/clickhouse.py", line 45, in fix_table_column_type
self.execute(fix_sql)
File "/usr/local/lib/python3.6/site-packages/synch/replication/clickhouse.py", line 28, in execute
return self._client.execute(sql, params=params, *args, **kwargs)
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 224, in execute
columnar=columnar
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 347, in process_ordinary_query
columnar=columnar)
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 89, in receive_result
return result.get_result()
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/result.py", line 50, in get_result
for packet in self.packet_generator:
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 101, in packet_generator
packet = self.receive_packet()
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/client.py", line 115, in receive_packet
packet = self.connection.receive_packet()
File "/usr/local/lib64/python3.6/site-packages/clickhouse_driver/connection.py", line 409, in receive_packet
packet.type = packet_type = read_varint(self.fin)
File "clickhouse_driver/varint.pyx", line 40, in clickhouse_driver.varint.read_varint
File "clickhouse_driver/bufferedreader.pyx", line 55, in clickhouse_driver.bufferedreader.BufferedReader.read_one
File "clickhouse_driver/bufferedreader.pyx", line 188, in clickhouse_driver.bufferedreader.BufferedSocketReader.read_into_buffer
socket.timeout: timed out
这个表(hexin_erp_product)的字段如下:
CREATE TABLE hexin_erp_product
(
id
INT(11) NOT NULL DEFAULT '0' COMMENT '主键ID',
cate_id
INT(11) NULL DEFAULT '0' COMMENT '分类ID',
supplier_id
INT(11) NULL DEFAULT '0' COMMENT '供应商ID',
brand_id
INT(11) NULL DEFAULT '0',
product_cname
VARCHAR(128) NULL DEFAULT '' COMMENT '中文名称' COLLATE 'utf8mb4_general_ci',
product_ename
VARCHAR(128) NULL DEFAULT '' COMMENT '英文名称' COLLATE 'utf8mb4_general_ci',
product_status
TINYINT(4) NULL DEFAULT '1' COMMENT '商品状态(已审核通过的商品) 1正常 2停售 3清仓 4打折',
old_parent_sku
VARCHAR(100) NULL DEFAULT '' COMMENT '旧的商品编码(兼容新旧数据)' COLLATE 'utf8_general_ci',
parent_sku
VARCHAR(100) NULL DEFAULT '' COMMENT '商品sku' COLLATE 'utf8_general_ci',
purchase_day
INT(11) NULL DEFAULT '0' COMMENT '采购到货天数',
purchaser
VARCHAR(50) NULL DEFAULT '' COMMENT '采购人员' COLLATE 'utf8_general_ci',
purchase_minprice
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '最低采购价',
purchase_maxprice
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '最高采购价',
product_image
LONGTEXT NULL COMMENT '主图' COLLATE 'utf8mb4_general_ci',
purchase_link
VARCHAR(255) NULL DEFAULT '' COMMENT '采购链接' COLLATE 'utf8mb4_general_ci',
is_electric
TINYINT(1) NULL DEFAULT '1' COMMENT '是否带电(1否2是)',
is_powder
TINYINT(1) NULL DEFAULT '1' COMMENT '是否粉末(1否2是)',
is_liquid
TINYINT(1) NULL DEFAULT '1' COMMENT '是否液体(1否2是)',
is_magnetic
TINYINT(1) NULL DEFAULT '1' COMMENT '是否带磁(1否2是)',
is_tort
TINYINT(1) NULL DEFAULT '1' COMMENT '是否侵权(1否2是)',
is_knowledge
TINYINT(1) NULL DEFAULT '1' COMMENT '是否知识产权(1否2是)',
material
VARCHAR(255) NULL DEFAULT '' COMMENT '材质' COLLATE 'utf8_general_ci',
unit
VARCHAR(20) NULL DEFAULT '' COMMENT '单位' COLLATE 'utf8_general_ci',
season
VARCHAR(10) NULL DEFAULT '' COMMENT '季节(春季、夏季、秋季、冬季)' COLLATE 'utf8_general_ci',
apply_cname
VARCHAR(50) NULL DEFAULT NULL COMMENT '申报中文' COLLATE 'utf8_general_ci',
apply_ename
VARCHAR(50) NULL DEFAULT '' COMMENT '申报英文' COLLATE 'utf8_general_ci',
apply_price
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '申报价值',
apply_code
VARCHAR(50) NULL DEFAULT '' COMMENT '申报海关编码' COLLATE 'utf8_general_ci',
storage_id
INT(11) NULL DEFAULT '0' COMMENT '默认发货仓库',
origin_country
VARCHAR(50) NULL DEFAULT '' COMMENT '原产国二字代码' COLLATE 'utf8_general_ci',
origin_country_code
VARCHAR(20) NULL DEFAULT '' COLLATE 'utf8_general_ci',
max_stock
INT(11) NULL DEFAULT '0' COMMENT '库存上限',
min_stock
INT(11) NULL DEFAULT '0' COMMENT '最小库存',
cost_price
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '成本价格',
out_box_single_weight
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱净重(单位:克)',
out_box_height
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱高(单位:cm)',
out_box_length
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱长(单位:cm)',
out_box_width
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱宽(单位:cm)',
out_box_gross_weight
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '外箱毛重(单位:克)',
box_single_weight
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒净重(单位:克)',
box_height
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒高(单位:cm)',
box_length
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒长(单位:cm)',
box_width
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒宽(单位:cm)',
box_gross_weight
DECIMAL(10,2) NULL DEFAULT '0.00' COMMENT '内盒毛重(单位:克)',
checker
VARCHAR(50) NULL DEFAULT '' COMMENT '审核人的uuid' COLLATE 'utf8mb4_general_ci',
check_status
TINYINT(1) NULL DEFAULT '1' COMMENT '审核状态(1待审核 2审核通过 3审核不通过4待发布)',
check_time
INT(11) NULL DEFAULT '0' COMMENT '审核时间',
check_info
VARCHAR(255) NULL DEFAULT '' COMMENT '审核时间' COLLATE 'utf8mb4_general_ci',
developer
VARCHAR(50) NULL DEFAULT '' COMMENT '开发者uuid 业绩归属人' COLLATE 'utf8_general_ci',
create_time
INT(11) NULL DEFAULT '0',
modify_time
INT(11) NULL DEFAULT '0' COMMENT '修改时间',
del_flag
TINYINT(1) NULL DEFAULT '1' COMMENT '是否删除(1否2是)',
product_sub_images
LONGTEXT NULL COMMENT '商品图片明细,多个以逗号隔开' COLLATE 'utf8_general_ci',
property_data
TEXT NULL COMMENT '属性信息' COLLATE 'utf8_general_ci',
pid
VARCHAR(50) NULL DEFAULT '' COMMENT '平台商品ID' COLLATE 'utf8_general_ci',
description
TEXT NULL COMMENT '描述' COLLATE 'utf8_general_ci',
unsale_time
INT(11) NULL DEFAULT '0' COMMENT '停售时间',
comment
TEXT NULL COMMENT '商品链接标题' COLLATE 'utf8mb4_general_ci',
comment2
TEXT NULL COMMENT '授权标记' COLLATE 'utf8_general_ci',
tag_id
VARCHAR(255) NOT NULL DEFAULT '' COMMENT '自定义标签id' COLLATE 'utf8_general_ci',
product_link
VARCHAR(255) NOT NULL DEFAULT '' COMMENT '商品链接' COLLATE 'utf8_general_ci',
fabric_weight
INT(11) NULL DEFAULT '0' COMMENT '面料克重',
size_img_str
TEXT NULL COMMENT '尺码表图片路径' COLLATE 'utf8_general_ci',
is_model
TINYINT(4) NULL DEFAULT '0' COMMENT '是否有模型图(0否1是)',
is_real
TINYINT(4) NULL DEFAULT '0' COMMENT '是否有实物图(0否1是)',
model_from
TINYINT(4) NULL DEFAULT '0' COMMENT '模型图来源(1和新自拍2工厂自拍3其他来源)',
real_from
TINYINT(4) NULL DEFAULT '0' COMMENT '模型图来源(1和新自拍2工厂自拍3其他来源)',
is_order
TINYINT(4) NULL DEFAULT '0' COMMENT '是否订做(0否1是)',
first_arrive_time
INT(11) NULL DEFAULT '0' COMMENT '首单到货时间',
first_order_num
VARCHAR(255) NULL DEFAULT '' COMMENT '首单订做数量' COLLATE 'utf8_general_ci',
start_order_num
VARCHAR(255) NULL DEFAULT '0' COMMENT '起订量' COLLATE 'utf8_general_ci',
other_comment
TEXT NULL COMMENT '其他备注' COLLATE 'utf8_general_ci',
is_paste
TINYINT(4) NULL DEFAULT '1' COMMENT '是否膏体(1否2是)',
un_sale_reason
TEXT NULL COMMENT '停售原因' COLLATE 'utf8_general_ci',
is_new
TINYINT(1) NULL DEFAULT '0' COMMENT 'new(0 fou 1 shi )',
package_size
VARCHAR(255) NULL DEFAULT '' COMMENT '产品包装袋尺寸' COLLATE 'utf8_general_ci',
publish_time
INT(11) NOT NULL DEFAULT '0' COMMENT '发布时间',
tort_reason
VARCHAR(255) NOT NULL COMMENT '侵权原因' COLLATE 'utf8_general_ci',
tort_time
INT(11) NOT NULL DEFAULT '0' COMMENT '侵权时间',
size_adress
VARCHAR(70) NOT NULL DEFAULT '' COMMENT '尺寸表名字' COLLATE 'utf8_general_ci',
edit_status
TINYINT(4) NULL DEFAULT '1' COMMENT '编辑状态 1未编辑 2已编辑',
original_cate_id
INT(11) NOT NULL DEFAULT '0' COMMENT '生成商品子sku的分类原始id(0-无做过移动)',
edit_time
INT(11) NOT NULL DEFAULT '0' COMMENT '编辑商品的时间记录',
warehouse_entry_time
INT(11) NOT NULL DEFAULT '0' COMMENT '入库时间',
tag_attribute
INT(1) NOT NULL DEFAULT '1' COMMENT '1-普货;2-敏感货;3-液体',
image_tag
VARCHAR(20) NOT NULL DEFAULT '' COMMENT '图片标签以","分隔开(可多选),1-工厂自拍实物图,2-和新自拍实物图,,3-工厂自拍模特图,4-和新自拍模特图,5.其他来源' COLLATE 'utf8_general_ci',
version
INT(11) NOT NULL DEFAULT '0' COMMENT '版本号',
PRIMARY KEY (id
)
)
COLLATE='latin1_swedish_ci'
ENGINE=InnoDB
;
synch配置文件如下:
[core]
debug = True
broker_type = redis
source_db = mysql
skip_delete_tables =
skip_update_tables =
skip_dmls =
insert_num = 1
insert_interval = 1
auto_full_etl = True
[sentry]
environment = development
dsn =
[redis]
host = 127.0.0.1
port = 6379
password =
db = 0
prefix = synch
sentinel = False
sentinel_hosts = 127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002
sentinel_master = master
queue_max_len = 200000
[mysql]
server_id = 33
# read from `show master status` result if empty
init_binlog_file = mysql-bin.000992
# read from `show master status` result if empty
init_binlog_pos = 66134615
host = 192.168.66.33
port = 3306
user = root
password = 123456
[mysql.test]
tables = hexin_erp_product
kafka_partition = 0
[postgres]
host = postgres
port = 5432
user = postgres
password =
[postgres.postgres]
tables = test
kafka_partition = 0
[clickhouse]
host = 127.0.0.1
port = 9000
user = default
password =
[kafka]
servers = 127.0.0.1:9092
topic = synch
能帮忙看一下是什么原因吗?
mysql同步Clickhouse 如果Clickhouse想使用分片 怎么让clickhouse的表名和MySQL的表名不一样
Centos 7.3 ClickHouse 21.3.5.42 最新的synch 报如下错误,不知是什么原因?
[root@webserver synch]# synch --alias mysql_db etl --schema fmmp --tables T_AGENCY
Traceback (most recent call last):
File "/usr/local/python/bin/synch", line 8, in
sys.exit(cli())
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 1137, in __call__
return self.main(*args, **kwargs)
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 1062, in main
rv = self.invoke(ctx)
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 1665, in invoke
super().invoke(ctx)
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/python/lib/python3.9/site-packages/click/core.py", line 763, in invoke
return __callback(*args, **kwargs)
File "/usr/local/python/lib/python3.9/site-packages/click/decorators.py", line 26, in new_func
return f(get_current_context(), *args, **kwargs)
File "/usr/local/python/lib/python3.9/site-packages/synch/cli.py", line 36, in cli
init(config)
File "/usr/local/python/lib/python3.9/site-packages/synch/factory.py", line 161, in init
dsn = Settings.get("sentry", "dsn")
File "/usr/local/python/lib/python3.9/site-packages/synch/settings.py", line 105, in get
c = c.get(arg)
AttributeError: 'NoneType' object has no attribute 'get'
mysql binlog_format must be ROW ?
thanks your great repo.
[root@localhost bin]# mysql2ch etl -h
usage: mysql2ch etl [-h] --schema SCHEMA [--tables TABLES] [--renew]
optional arguments:
-h, --help show this help message and exit
--schema SCHEMA Schema to full etl.
--tables TABLES Tables to full etl,multiple tables split with comma,default
read from environment.
--renew Etl after try to drop the target tables.
配置文件:
[core]
mysql_server_id = 1
queue_max_len = 200000
init_binlog_file = binlog.000024
init_binlog_pos = 252563
skip_delete_tables =
skip_update_tables =
skip_dmls =
insert_num = 1
insert_interval = 1
[sentry]
environment = development
dsn = https://[email protected]/1
[redis]
host = 127.0.0.1
port = 6379
password =
db = 0
prefix = mysql2ch
sentinel = false
sentinel_hosts = 127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002
sentinel_master = master
[mysql]
host = 127.0.0.1
port = 3306
user = root
password =
[mysql.saas_im]
tables = letter_record_back
[clickhouse]
host = 127.0.0.1
port = 9000
user = default
password =
按照您的最新配置文件,
mysql_server_id = 1
init_binlog_file = binlog.000088
init_binlog_pos = 2238
运行produce报错:
File "/usr/local/bin/mysql2ch", line 8, in
sys.exit(cli())
File "/usr/local/lib/python3.6/site-packages/mysql2ch/cli.py", line 66, in cli
parse_args.run(parse_args)
File "/usr/local/lib/python3.6/site-packages/mysql2ch/cli.py", line 27, in run
args.func(args)
File "/usr/local/lib/python3.6/site-packages/mysql2ch/producer.py", line 43, in produce
skip_update_tables=settings.skip_update_tables,
File "/usr/local/lib/python3.6/site-packages/mysql2ch/reader.py", line 93, in binlog_reading
for binlog_event in stream:
File "/usr/local/lib/python3.6/site-packages/pymysqlreplication/binlogstream.py", line 430, in fetchone
pkt = self._stream_connection._read_packet()
File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 684, in _read_packet
packet.check_error()
File "/usr/local/lib/python3.6/site-packages/pymysql/protocol.py", line 220, in check_error
err.raise_mysql_exception(self._data)
File "/usr/local/lib/python3.6/site-packages/pymysql/err.py", line 109, in raise_mysql_exception
raise errorclass(errno, errval)
pymysql.err.InternalError: (1236, 'Could not find first log file name in binary log index file')
请求帮助!
你好,看到你的作品真的很赞!
小白初学clickhouse,请问再不使用kafka和redis情况下,只是想把mysql表同步到clickhouse,config配置文件怎么配呢?感觉config配置写的文档不是很详细。请求帮助,谢谢!
When I ran this
[eric@localhost mysetup]$ synch -c synch.yaml
Traceback (most recent call last):
File "/home/wenching/.local/bin/synch", line 11, in <module>
sys.exit(cli())
File "/home/wenching/.local/lib/python3.6/site-packages/synch/cli.py", line 78, in cli
parse_args.run(parse_args)
AttributeError: 'Namespace' object has no attribute 'run'
If I look thru the codes, I believe it's expecting the command line parameters
parse_args = parser.parse_args()
parse_args.run(parse_args)
Any help? Thanks.
尝试了一下在mysql新建表,clickhouse这边监听的时候不会同步建表
- synch.replication.clickhouse:57 - DEBUG - alter table sheme.table delete where id in (xxxxx)
主键id是string
正确的sql应该是 alter table epsandbox.q_order delete where id in (‘xxxxx’)
File "/opt/app/python3/clickhouse_synch/lib/python3.9/site-packages/pymysqlreplication/binlogstream.py", line 9, in
from pymysql.util import int2byte
ModuleNotFoundError: No module named 'pymysql.util'
2020-12-11 19:25:11 - synch.replication.continuous:65 - INFO - Block 1 seconds timeout, insert current 0 events
mysql bit类型数据 无法同步到clickhouse。
是否支持表通配符配置
source_dbs:
- db_type: mysql
databases:
- database: synch_mysql_test
auto_create: true
tables:
- table: log_*
I noticed in the synch.yaml
insert_num: 1 # how many num to submit,recommend set 20000 when production
insert_interval: 1 # how many seconds to submit,recommend set 60 when production
Why can't I keep both set to 1 in production? If I set the interval to 60, it won't be real-time or near-real-time updates, will it?
Same to insert_num, it seems in production after hitting 20,000 records will get the updates.
If I keep both settings at 1 in production, will the program crash?
作者,你好,请问下 同步多张表的yaml或者 conf配置文件 应该怎么写 我用 逗号分隔,程序直接连两张表都取出来了。
2021-08-16 15:45:29 - synch.replication.etl:84 - WARNING - No pk found in clickhouse.order_goods,invoice_order, skip
请教一下,目前遇到了这么一种情况:
kafka出错导致索引文件和日志文件时间戳不一致(可能是重启或zookeeper出问题或内存不足等原因)
producer.send正常发送,但是消费者无法消费。
网上搜索,大都是说删除索引文件,重启kafka。按照这种方式做了以后会导致消息(mysql数据)丢失,因为代码中producer.send正常(错误好像不影响send,暂未复现该情况),redis中的binlog位置会更新。
怎么才能让producer感知到kafka(容器单机部署非集群)出问题了,使其发送消息失败且不更新redis中的binlog的位置。
我试着将producer改成同步,但是感觉好像没关系。
报错信息
ARN [Log partition=mysql2ch-4, dir=/var/lib/kafka/data] Found a corrupted index file corresponding to log file /var/lib/kafka/data/mysql2ch-4/00000000000000000000.log due to Corrupt time index found, time index file (/var/lib/kafka/data/mysql2ch-4/00000000000000000000.timeindex) has non-zero size but the last timestamp is 0 which is less than the first timestamp 1587959542575}, recovering segment and rebuilding index files... (kafka.log.Log)
我处理的方式是 删除每个分区中的.timeindex和.index文件,重启kafka。
结果导致了数据丢失,因为redis更新了位置
接受到的消息里有数字类型的。encode失败?
"/usr/local/python3/lib/python3.7/site-packages/synch/replication/continuous.py" 171L, 6023C 1
return f(get_current_context(), *args, **kwargs)
File "/usr/local/python3/lib/python3.7/site-packages/synch/cli.py", line 82, in consume
alias, schema, tables_pk, table_dict, last_msg_id, skip_error,
File "/usr/local/python3/lib/python3.7/site-packages/synch/replication/continuous.py", line 130, in continuous_etl
f"insert event error,error: {e}", exc_info=True, stack_info=True
Message: "insert event error,error: 'int' object has no attribute 'encode'"
如下是消息
2020-11-24 20:41:53 - synch.reader.mysql:126 - DEBUG - send to queue success: key:ipark,event:{'table': 'indicators_statistical', 'schema': 'ipark', 'action': 'insert', 'values': {'id': 467, 'tenant_id': '1', 'indicators_type': '12', 'unique_key': '50189.1154933992', 'indicators_number': 1, 'indicators_date': datetime.datetime(2020, 11, 24, 20, 41, 52), 'create_time': datetime.datetime(2020, 11, 24, 20, 41, 52), 'modify_time': datetime.datetime(2020, 11, 24, 20, 41, 52)}, 'event_unixtime': 1606221713103948, 'action_seq': 2}
使用docker-compose部署时,synch镜像没有kafka模块
mysql添加新列并在clickhouse中也添加该列后,依然报错。
因为拼接的插入clickhouse的sql语句是不带列名的,而insert数据有部分没有新列,有部分有,比如insert into tb values(...),(...,‘新列’)
是不是可以考虑获取clickhouse列名来进行添加,或者有其他更好的办法吗
功能正常可用,运行时报错
Traceback (most recent call last):
File "/synch/synch/replication/continuous.py", line 126, in continuous_etl
writer.insert_events(schema, table, insert_events)
File "/synch/synch/writer/__init__.py", line 82, in insert_events
self.execute(insert_sql, list(map(lambda x: x.get("values"), insert_data)))
File "/synch/synch/writer/__init__.py", line 55, in execute
return self._client.execute(sql, params=params, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/client.py", line 214, in execute
rv = self.process_insert_query(
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/client.py", line 371, in process_insert_query
rv = self.send_data(sample_block, data,
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/client.py", line 407, in send_data
self.connection.send_data(block)
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/connection.py", line 509, in send_data
self.block_out.write(block)
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/streams/native.py", line 38, in write
write_column(self.context, col_name, col_type, items,
File "/usr/local/lib/python3.9/site-packages/clickhouse_driver/columns/service.py", line 112, in write_column
raise errors.TypeMismatchError(
clickhouse_driver.errors.TypeMismatchError: Code: 53. Type mismatch in VALUES section. Repeat query with types_check=True for detailed info. Column PUBLISH_DATE: 'I' format requires 0 <= number <= 4294967295
字段“PUBLISH_DATE”是datatime类型,大部分数据同步成功,应该是有个别数据报错了
请教:这是什么原因的报错?
作者你好,Docker可否支持一下Mysql更高版本比如8.x 感谢
merge_tree.py line 80
这个地方 tuple(values[pk[i]] for i in pk) 是不是要改成下面的?
tuple(values[i] for i in pk)
HI there,
Like to install pip for the dev version.
I don't want to use docker.
Any help? Thanks.
使用consume时出错,提示
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/service.py", line 100, in write_column
column.write_data(items, buf)
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 77, in write_data
self._write_data(items, buf)
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 80, in _write_data
prepared = self.prepare_items(items)
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 61, in prepare_items
check_item_type(x)
File "/home/mysql2ch/venv/lib/python3.7/site-packages/clickhouse_driver/columns/base.py", line 37, in check_item_type
raise exceptions.ColumnTypeMismatchException(value)
clickhouse_driver.columns.exceptions.ColumnTypeMismatchException: 0.00
During handling of the above exception, another exception occurred:
writer.py 第208->212行 应改为
if 'delete' not in skip_dmls_all and skip_dml_table_name not in skip_delete_tb_name:
否则正常添加或更新可能会跟删除一起过滤掉
[[email protected] mysql2ch]# /usr/bin/mysql2ch
Traceback (most recent call last):
File "/usr/bin/mysql2ch", line 11, in
load_entry_point('mysql2ch==0.3.0', 'console_scripts', 'mysql2ch')()
File "/usr/lib/python2.7/site-packages/pkg_resources/__init__.py", line 489, in load_entry_point
return get_distribution(dist).load_entry_point(group, name)
File "/usr/lib/python2.7/site-packages/pkg_resources/__init__.py", line 2852, in load_entry_point
return ep.load()
File "/usr/lib/python2.7/site-packages/pkg_resources/__init__.py", line 2443, in load
return self.resolve()
File "/usr/lib/python2.7/site-packages/pkg_resources/__init__.py", line 2449, in resolve
module = __import__(self.module_name, fromlist=['__name__'], level=0)
File "build/bdist.linux-x86_64/egg/mysql2ch/__init__.py", line 4, in <module>
File "build/bdist.linux-x86_64/egg/mysql2ch/settings.py", line 57, in <module>
File "build/bdist.linux-x86_64/egg/mysql2ch/settings.py", line 19, in parse_schema_table
AttributeError: 'NoneType' object has no attribute 'split'
CentOS7 执行 pip install synch 异常(已安装 python 3.7 和 pip3)
WARNING: Discarding https://files.pythonhosted.org/packages/2d/df/5440ee86bbb248325cdd6e1fc9cbbba365018a0d2d57f673610e020e6b1d/mysqlclient-1.3.2.tar.gz#sha256=e44025830501b9f70f8c2fe8eeff66f0df2df198802f7295801dac199b8236ef (from https://pypi.org/simple/mysqlclient/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
Downloading mysqlclient-1.3.1.tar.gz (76 kB)
|████████████████████████████████| 76 kB 299 kB/s
ERROR: Command errored out with exit status 1:
command: /usr/local/python3/bin/python3.7 -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup.py'"'"'; __file__='"'"'/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-lyq28ain
cwd: /tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/
Complete output (10 lines):
/bin/sh: mysql_config: 未找到命令
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup.py", line 17, in <module>
metadata, options = get_config()
File "/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup_posix.py", line 47, in get_config
libs = mysql_config("libs_r")
File "/tmp/pip-install-c31l6e8r/mysqlclient_7f635c4393eb4ce88ef9a713c1ffb032/setup_posix.py", line 29, in mysql_config
raise EnvironmentError("%s not found" % (mysql_config.path,))
OSError: mysql_config not found
----------------------------------------
WARNING: Discarding https://files.pythonhosted.org/packages/6b/ba/4729d99e85a0a35bb46d55500570de05b4af10431cef174b6da9f58a0e50/mysqlclient-1.3.1.tar.gz#sha256=3549e8a61f10c8cd8eac6581d3f44d0594f535fb7b29e6090db3a0bc547b25ad (from https://pypi.org/simple/mysqlclient/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
Downloading mysqlclient-1.3.0.tar.gz (76 kB)
|████████████████████████████████| 76 kB 336 kB/s
ERROR: Command errored out with exit status 1:
command: /usr/local/python3/bin/python3.7 -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup.py'"'"'; __file__='"'"'/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-7v_13f8p
cwd: /tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/
Complete output (10 lines):
/bin/sh: mysql_config: 未找到命令
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup.py", line 17, in <module>
metadata, options = get_config()
File "/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup_posix.py", line 47, in get_config
libs = mysql_config("libs_r")
File "/tmp/pip-install-c31l6e8r/mysqlclient_8f4f36bd64564c3095482e0cb4995727/setup_posix.py", line 29, in mysql_config
raise EnvironmentError("%s not found" % (mysql_config.path,))
OSError: mysql_config not found
----------------------------------------
WARNING: Discarding https://files.pythonhosted.org/packages/6a/91/bdfe808fb5dc99a5f65833b370818161b77ef6d1e19b488e4c146ab615aa/mysqlclient-1.3.0.tar.gz#sha256=06eb5664e3738b283ea2262ee60ed83192e898f019cc7ff251f4d05a564ab3b7 (from https://pypi.org/simple/mysqlclient/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
Collecting synch
Downloading synch-0.6.7-py3-none-any.whl (38 kB)
Collecting sqlparse
Downloading sqlparse-0.4.1-py3-none-any.whl (42 kB)
|████████████████████████████████| 42 kB 392 kB/s
Collecting synch
Downloading synch-0.6.6-py3-none-any.whl (37 kB)
Downloading synch-0.6.1-py3-none-any.whl (36 kB)
Collecting pydantic
Downloading pydantic-1.8.2-cp37-cp37m-manylinux2014_x86_64.whl (10.1 MB)
|████████████████████████████████| 10.1 MB 740 kB/s
Collecting synch
Downloading synch-0.6.0-py3-none-any.whl (35 kB)
ERROR: Cannot install synch==0.6.0, synch==0.6.1, synch==0.6.6, synch==0.6.7 and synch==0.7.1 because these package versions have conflicting dependencies.
The conflict is caused by:
synch 0.7.1 depends on mysqlclient
synch 0.6.7 depends on mysqlclient
synch 0.6.6 depends on mysqlclient
synch 0.6.1 depends on mysqlclient
synch 0.6.0 depends on mysqlclient
To fix this you could try to:
1. loosen the range of package versions you've specified
2. remove package versions to allow pip attempt to solve the dependency conflict
ERROR: ResolutionImpossible: for help visit https://pip.pypa.io/en/latest/user_guide/#fixing-conflicting-dependencies
可以不填写redis和kafka进行启动吗
Hi, i'm getting the following error from the producer (using docker compose, with dev tag, as sentry breaks latest tag)
what might cause it?
i see producer is connected, and pulling data + sending to queue
some records have this error, is there a way to overcome it?
producer_1 | Traceback (most recent call last):
producer_1 | File "/usr/local/bin/synch", line 5, in <module>
producer_1 | cli()
producer_1 | File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1137, in __call__
producer_1 | return self.main(*args, **kwargs)
producer_1 | File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1062, in main
producer_1 | rv = self.invoke(ctx)
producer_1 | File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1668, in invoke
producer_1 | return _process_result(sub_ctx.command.invoke(sub_ctx))
producer_1 | File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
producer_1 | return ctx.invoke(self.callback, **ctx.params)
producer_1 | File "/usr/local/lib/python3.9/site-packages/click/core.py", line 763, in invoke
producer_1 | return __callback(*args, **kwargs)
producer_1 | File "/usr/local/lib/python3.9/site-packages/click/decorators.py", line 26, in new_func
producer_1 | return f(get_current_context(), *args, **kwargs)
producer_1 | File "/synch/synch/cli.py", line 91, in produce
producer_1 | reader.start_sync(broker)
producer_1 | File "/synch/synch/reader/mysql.py", line 109, in start_sync
producer_1 | for schema, table, event, file, pos in self._binlog_reading(
producer_1 | File "/synch/synch/reader/mysql.py", line 178, in _binlog_reading
producer_1 | for row in binlog_event.rows:
producer_1 | File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 433, in rows
producer_1 | self._fetch_rows()
producer_1 | File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 428, in _fetch_rows
producer_1 | self.__rows.append(self._fetch_one_row())
producer_1 | File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 481, in _fetch_one_row
producer_1 | row["values"] = self._read_column_data(self.columns_present_bitmap)
producer_1 | File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 132, in _read_column_data
producer_1 | values[name] = self.__read_string(1, column)
producer_1 | File "/usr/local/lib/python3.9/site-packages/pymysqlreplication/row_event.py", line 224, in __read_string
producer_1 | string = string.decode(encoding)
producer_1 | UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte
假设用来同步日志的话,mysql表里面的日志是定期删除的,这样click house是不会同步删除数据的吧?
环境:centos 7, pip version = 20.1.1 , python -V = 3.6.5
pip install synch[all]后没有生成bin目录下的synch执行文件。所有依赖安装好了的。
同样的synch,读同样的表 。mysql8.0.13 版本的 slave 就不报错 ,换成mysql 8.0.20的 slave 就报这个错
File "/usr/local/python383/lib/python3.8/site-packages/synch/reader/mysql.py", line 185, in _binlog_reading
for row in binlog_event.rows:
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 433, in rows
self._fetch_rows()
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 428, in _fetch_rows
self.__rows.append(self._fetch_one_row())
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 481, in _fetch_one_row
row["values"] = self._read_column_data(self.columns_present_bitmap)
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 132, in _read_column_data
values[name] = self.__read_string(1, column)
File "/usr/local/python383/lib/python3.8/site-packages/pymysqlreplication/row_event.py", line 224, in __read_string
string = string.decode(encoding)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa6 in position 2: invalid start byte
支持windows调试吗
运行:
synch --alias mysql_db etl --renew
报错信息:
Traceback (most recent call last):
File "f:\python38\lib\runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "f:\python38\lib\runpy.py", line 87, in _run_code
exec(code, run_globals)
File "F:\Python38\Scripts\synch.exe\__main__.py", line 7, in <module>
File "f:\python38\lib\site-packages\click\core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "f:\python38\lib\site-packages\click\core.py", line 782, in main
rv = self.invoke(ctx)
File "f:\python38\lib\site-packages\click\core.py", line 1256, in invoke
Command.invoke(self, ctx)
File "f:\python38\lib\site-packages\click\core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "f:\python38\lib\site-packages\click\core.py", line 610, in invoke
return callback(*args, **kwargs)
File "f:\python38\lib\site-packages\click\decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "f:\python38\lib\site-packages\synch\cli.py", line 36, in cli
init(config)
File "f:\python38\lib\site-packages\synch\factory.py", line 159, in init
Settings.init(config_file)
File "f:\python38\lib\site-packages\synch\settings.py", line 12, in init
with open(file_path, "r") as f:
FileNotFoundError: [Errno 2] No such file or directory: './synch.yaml'
我是从 mysql-clickhouse-replication看到mysql2ch的,如果是小规模业务使用,勉强够用。
为什么这么说呢,因为clickhouse的优势是批量写入和OLAP,在数据的修复和删除方面,性能非常的差,在这个项目中,只是从功能上满足的同步的需求,但性能方面有较大的问题。
基于MergeTree引擎来做,如何做到大数据量下的数据实时同步,通过实践来看,MergeTree家族中VersionedCollapsingMergeTree是一个更适合同步场景的方案。官方链接:https://clickhouse.tech/docs/zh/engines/table-engines/mergetree-family/versionedcollapsingmergetree/#versionedcollapsingmergetree
版本折叠合并树
这个引擎:
允许快速写入不断变化的对象状态。
删除后台中的旧对象状态。 这显着降低了存储体积。
有关详细信息,请参阅官方文档中的“折叠(Collapsing)”部分。
引擎继承自 MergeTree 并将折叠行的逻辑添加到合并数据部分的算法中。 VersionedCollapsingMergeTree 用于相同的目的 折叠树 但使用不同的折叠算法,允许以多个线程的任何顺序插入数据。 特别是, Version 列有助于正确折叠行,即使它们以错误的顺序插入。
非常适合数据库同步,特别是mysql数据在不断变化的同步,因此基于这个引擎实现了我们内部有mysql--->clickhouse同步方案。
此方案是将DML全部转换成insert语句,依靠clickhouse的批量写入优势,再结合VersionedCollapsingMergeTree引擎的特点,基于version进行数据版本控制,最终形成升级的方案。
从原理上来看,实现思路是一致的,如:
1、kafka、redis支持;
2、多线程支持;
3、数据库、数据表的自定义配置;
4、基于binlog;
5、ETL外部动态加载方法;
6、高实时,TPS相较于此方案,有数量级的提升,特别是数据库高并发场景下;
缺点:查询会稍微麻烦一点,需要进行查询的聚合,官方给出的sql如下:
SELECT
UserID,
sum(PageViews * Sign) AS PageViews,
sum(Duration * Sign) AS Duration,
Version
FROM UAct
GROUP BY UserID, Version
HAVING sum(Sign) > 0
目前此方案应用于地铁领域的实时数据交易处理,用于大数据报表,主要处理金融业务,目前使用良好,欢迎交流!
这个框架可以加入Mongo sync to Clickhouse的支持吗?直接加个Mongo Reader就可以了吧?
INSERT_NUMS=20000
INSERT_INTERVAL=60
即使过了60s,只要INSERT_NUMS没有达到设定的值,就不会提交!
Synch installation failed, do you have detailed steps?
> synch --alias mysql_db etl
-h: no synch command found
命令:synch --alias mysql_db etl --schema flink_t --table sych --renew
错误:ImportError: cannot import name 'charset_to_encoding' from 'pymysql.charset' (/usr/local/python3/lib/python3.7/site-packages/pymysql/charset.py)
source database: hexin
source database: hexin_storage
target database:hexin_count
说明:
hexin,hexin_storage这两个源数据库中的表需要时时同步到clickhouse的hexin_count数据库
我在操作过程中报以下错误:
cat /etc/synch.ini
[mysql]
server_id = 33
# show master status result if empty
init_binlog_file = mysql-bin.001000
# show master status result if empty
init_binlog_pos = 2930840
host = 192.168.66.33
port = 3306
user = root
password = 123456
[mysql.hexin]
tables = hexin_erp_storage_out_detail,hexin_erp_storage_out_main,hexin_erp_order,hexin_erp_order_goods
kafka_partition = 0
[root@k8smaster ~]# synch --config /etc/synch.ini etl --schema hexin_count --renew
Traceback (most recent call last):
File "/usr/local/bin/synch", line 11, in <module>
sys.exit(cli())
File "/usr/local/lib/python3.6/site-packages/synch/cli.py", line 66, in cli
parse_args.run(parse_args)
File "/usr/local/lib/python3.6/site-packages/synch/cli.py", line 19, in run
args.func(args)
File "/usr/local/lib/python3.6/site-packages/synch/replication/etl.py", line 12, in make_etl
Global.reader.etl_full(Global.writer, schema, tables, renew)
File "/usr/local/lib/python3.6/site-packages/synch/reader/__init__.py", line 32, in etl_full
for table in tables:
TypeError: 'NoneType' object is not iterable
可以实现不同库中的表同步到同一个clickhouse的同一个数据库中吗?
(sync2) [root@hadoop1 sync2]# synch -c synch.yaml --alias mysql_db produce
Traceback (most recent call last):
File "/data/sync2/sync2/bin/synch", line 8, in <module>
sys.exit(cli())
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/cli.py", line 91, in produce
broker = get_broker(alias)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/factory.py", line 87, in get_broker
b = KafkaBroker(alias)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/broker/kafka.py", line 27, in __init__
key_serializer=lambda x: x.encode(),
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/producer/kafka.py", line 383, in __init__
**self.config)
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/client_async.py", line 244, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/client_async.py", line 900, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
(sync2) [root@hadoop1 sync2]#
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.