m358807551 / mysqlsmom
A tool for syncing MySQL data to Elasticsearch: feature-rich, simple to use, flexibly configurable, and highly extensible.
Will Python 3 support be considered?
Thanks for open-sourcing this project; it solved a sync problem that had troubled me for a long time.
[root@03AF15-A14 video]# mom run -c cron_config.py
2020-03-31 12:38:00,710 apscheduler.scheduler INFO Adding job tentatively -- it will be properly scheduled when the scheduler starts
2020-03-31 12:38:00,711 apscheduler.scheduler INFO Added job "do_one_task" to job store "default"
2020-03-31 12:38:00,711 apscheduler.scheduler INFO Scheduler started
2020-03-31 12:38:00,712 apscheduler.executors.default INFO Running job "do_one_task (trigger: interval[0:00:10], next run at: 2020-03-31 12:38:00 CST)" (scheduled at 2020-03-31 12:38:00.617124+08:00)
2020-03-31 12:38:01,113 apscheduler.executors.default ERROR Job "do_one_task (trigger: interval[0:00:10], next run at: 2020-03-31 12:38:10 CST)" raised an exception
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/apscheduler/executors/base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 321, in do_one_task
    query = MyModel.raw(task["stream"]["sql"].replace("?", "%s"), (last_start_time,)).dicts().iterator()
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1637, in iterator
    return iter(self.execute(database).iterator())
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1560, in inner
    return method(self, database, *args, **kwargs)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1631, in execute
    return self._execute(database)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1680, in _execute
    cursor = database.execute(self)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2638, in execute
    return self.execute_sql(sql, params, commit=commit)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2632, in execute_sql
    self.commit()
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2424, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2625, in execute_sql
    cursor.execute(sql, params or ())
  File "/usr/lib/python2.7/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/usr/lib/python2.7/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 516, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 727, in _read_query_result
    result.read()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1073, in read
    self._read_result_packet(first_packet)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1143, in _read_result_packet
    self._read_rowdata_packet()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1177, in _read_rowdata_packet
    packet = self.connection._read_packet()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 670, in _read_packet
    % (packet_number, self._next_seq_id))
InternalError: Packet sequence number wrong - got 1 expected 16
How can this be resolved?
As the title says: how do I dynamically set the index (and the index name) for the data?
When Elasticsearch has authentication enabled, an error is raised:
2019-12-10 16:54:19,241 elasticsearch WARNING Undecodable raw error response from server: No JSON object could be decoded: line 1 column 0 (char 0)
Traceback (most recent call last):
  File "/usr/local/bin/mom", line 11, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 416, in run
    handle_init_stream(config_)
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 193, in handle_init_stream
    to_dest.upload_docs()
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 137, in upload_docs
    helpers.bulk(es, self.docs)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 300, in bulk
    for ok, item in streaming_bulk(client, actions, *args, **kwargs):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 230, in streaming_bulk
    **kwargs
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 116, in _process_bulk_chunk
    raise e
elasticsearch.exceptions.AuthenticationException: AuthenticationException(401, u'Unauthorized')
The template configuration was:
NODES= [{"host": "10.10.10.10", "port": 8080}]
With authentication enabled, changing it to the following fixes it:
NODES = ["http://user:[email protected]:8080"]
Hello, we plan to use this tool to sync MySQL with ES, and we would like the following features:
1. For the initial full sync of large tables, support range-partitioned queries;
2. For incremental sync, use the maximum timestamp of the previous batch instead of the last run time, and support extending "select id, name from person where update_time >= ?"
into "select id, name from person where update_time >= ? and _id > ?",
i.e. record both the maximum timestamp and the primary-key id reached by the previous incremental batch (in some of our scenarios, hundreds of thousands of rows are occasionally updated within the same microsecond). That way even a large update can be fetched a fixed number of rows at a time, recording the timestamp and row id each pass reached.
3. Support nested queries;
4. Support delete in incremental mode.
Do you plan to implement these? Alternatively, we could implement them and submit the changes to you.
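The (timestamp, id) cursor in point 2 is essentially keyset pagination. A minimal sketch under my own assumptions (the `BATCH_SQL` string and `advance_cursor` helper are illustrations, not mysqlsmom code; table and column names come from the example SQL above):

```python
# Keyset pagination: instead of "update_time >= ?", page with a
# (update_time, id) cursor so that many rows sharing one timestamp
# are not re-read or skipped between batches.
BATCH_SQL = (
    "SELECT id, name, update_time FROM person "
    "WHERE (update_time, id) > (%s, %s) "
    "ORDER BY update_time, id LIMIT %s"
)

def advance_cursor(rows, cursor):
    """Return the (last_ts, last_id) cursor for the next batch.

    rows must already be ordered by (update_time, id); an empty
    batch leaves the cursor unchanged.
    """
    if not rows:
        return cursor
    last = rows[-1]
    return (last["update_time"], last["id"])
```

Each pass would run BATCH_SQL with the stored cursor, upload the rows, then persist `advance_cursor(rows, cursor)` (for example in Redis) before the next pass.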
Written in pure Python;
Updating only some columns of a MySQL table causes a mysqlsmom exception, after which it cannot continue.
SQL:
update tb_emp set message="update" where id=20190715101404;
Error:
elasticsearch.exceptions.RequestError: RequestError(400, u'action_request_validation_exception', u'Validation Failed: 1: id is missing;')
2019-07-08 17:31:42,149 root INFO {"status": "\u0001", "lang": -1, "update_time": "2019-07-08 17:23:37", "github": null, "star": 5, "descr": "api", "tags": "["api", "springboot"]", "favicon": "/favicon/spring.png", "title": "SpringBoot", "show": "\u0001", "gitee": null, "link": "https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/api/", "insert_time": "2016-11-28 16:14:01", "indexed": "springboot", "sorted": 1110, "_id": 423, "type": 100026, "id": 423}
2019-07-08 17:31:42,161 elasticsearch INFO POST http://localhost:9200/_bulk [status:200 request:0.011s]
2019-07-08 17:31:42,164 apscheduler.executors.default ERROR Job "do_one_task (trigger: interval[0:01:00], next run at: 2019-07-08 17:32:42 CST)" raised an exception
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/apscheduler/executors/base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 339, in do_one_task
to_dest.upload_docs()
File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 141, in upload_docs
raise e
BulkIndexError: (u'1 document(s) failed to index.', [{u'update': {u'status': 400, u'_type': u'open', u'_index': u'admin', u'error': {u'caused_by': {u'reason': u'Failed to parse value [\x01] as only [true] or [false] are allowed.', u'type': u'illegal_argument_exception'}, u'reason': u'failed to parse field [show] of type [boolean]', u'type': u'mapper_parsing_exception'}, u'_id': u'423', u'data': {'doc': {u'status': '\x01', u'lang': -1, u'update_time': datetime.datetime(2019, 7, 8, 17, 23, 37), u'github': None, u'star': 5, u'descr': u'api', u'tags': u'["api", "springboot"]', u'favicon': u'/favicon/spring.png', u'title': u'SpringBoot', u'show': '\x01', u'gitee': None, u'link': u'https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/api/', u'insert_time': datetime.datetime(2016, 11, 28, 16, 14, 1), u'indexed': u'springboot', u'sorted': 1110, u'type': 100026, u'id': 423}, 'doc_as_upsert': True}}}])
^C
Aborted!
The package has been removed from PyPI and can no longer be found; cloning the source and running python setup.py install also fails with various Python 2.7 errors. Could a Python 3 version be developed?
Traceback (most recent call last):
  File "mysqlsmom.py", line 213, in <module>
    handle_init_stream(config_module)
  File "mysqlsmom.py", line 113, in handle_init_stream
    for row in query:
  File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3672, in iterator
    yield self.iterate(False)
  File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3660, in iterate
    result = self.process_row(row)
  File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 6348, in process_row
    result[attr] = converters[i](row[i])
  File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3896, in python_value
    return value if value is None else self.adapt(value)
ValueError: invalid literal for int() with base 10: 'stff01'
Running the tool with
mom run -c ./test_mom/cron_config.py
raises:
AttributeError: 'RawQuery' object has no attribute 'iterator'
Elasticsearch is the latest version, 7.4. That exact version was not available when installing the client, so I chose:
pip install --upgrade elasticsearch==7.1.0
mom new test_mom/init_config.py -t init --force
vim ./test_mom/init_config.py # edit the configuration as the comments instruct
It runs for a while, maybe a few minutes, then just prints Killed with no information at all.
[root@luyaodev local]# mom run -c ./test_mom/init_config.py
Killed
[root@luyaodev local]#
mysqlsmom stops on its own shortly after a manual run, with no logs and no error.
I mean, the documentation does not cover how to use my_filters.py at all; do you plan to update it?
The package cannot be found when running pip install mysqlsmom.
The binlog position has been stuck at 100105408 for a long time; logging has stopped, with no errors, while the database's binlog position has already reached 126084862. Restarting the program changes nothing; there is no log output at all. How should I troubleshoot this?
Traceback (most recent call last):
  File "mysqlsmom.py", line 223, in <module>
    handle_init_stream(config_module)
  File "mysqlsmom.py", line 131, in handle_init_stream
    rows = do_pipeline(job["pipeline"], event["values"])
  File "mysqlsmom.py", line 65, in do_pipeline
    func_name, kwargs = line.items()[0]
TypeError: 'dict_items' object does not support indexing
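This error is a Python 2/3 difference: `dict.items()` returned an indexable list in Python 2 but returns a non-indexable view in Python 3. A minimal sketch of the likely fix (my assumption about the intended behavior, not a patch from the project):

```python
# Python 2: line.items()[0] worked because items() returned a list.
# Python 3: dict.items() returns a view that does not support indexing,
# so take the first (and only) key/value pair explicitly:
line = {"set_id": {"field": "id"}}  # example single-step pipeline entry
func_name, kwargs = next(iter(line.items()))
```

Since each pipeline step is a one-key dict, `next(iter(...))` deterministically yields that single pair in both Python versions.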
playhouse/_sqlite_ext.c:5386:21: error: 'sqlite3_index_info' has no member named 'estimatedRows'
__pyx_v_pIdxInfo->estimatedRows = 0x186A0;
Hello, where in the code should I modify the mapping so that a Chinese text field is indexed and searched with the ik analyzer plugin?
For example, my question field needs:
"question": {
"type": "text",
"analyzer": "ik_smart",
"search_analyzer": "ik_max_word",
}
But I could not find where to add this in the code tree.
Looking forward to the author's reply, thanks!
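mysqlsmom's configuration does not appear to expose mapping settings, so one workaround, offered as an assumption rather than the project's documented approach, is to create the index with the desired mapping directly in Elasticsearch before the first sync. A sketch of the ES 7-style request body (field name and analyzers taken from the snippet above):

```python
import json

# Request body to PUT to http://<es-host>:9200/<index> before syncing;
# the "question" field and ik analyzers come from the issue above.
mapping_body = json.dumps({
    "mappings": {
        "properties": {
            "question": {
                "type": "text",
                "analyzer": "ik_smart",
                "search_analyzer": "ik_max_word"
            }
        }
    }
})
```

Because the index already exists with this mapping, the sync's bulk upserts then index `question` through the ik analyzers instead of ES's dynamic-mapping defaults.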
Error:
Traceback (most recent call last):
File "mysqlsmom.py", line 357, in <module>
handle_cron_stream(config_module)
File "mysqlsmom.py", line 324, in handle_cron_stream
scheduler = BlockingScheduler()
File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 83, in __init__
self.configure(gconfig, **options)
File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 122, in configure
self._configure(config)
File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 691, in _configure
self.timezone = astimezone(config.pop('timezone', None)) or get_localzone()
File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/tzlocal/unix.py", line 131, in get_localzone
_cache_tz = _get_localzone()
File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/tzlocal/unix.py", line 56, in _get_localzone
with open(tzpath, 'rb') as tzfile:
IOError: [Errno 21] Is a directory: '/etc/timezone'
Configuration:
# database connection
CONNECTION = {
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'passwd': ''
}
# Redis stores state such as the last sync time
REDIS = {
"host": "192.168.1.240",
"port": 6379,
"db": 0,
"password": "", # 不需要密码则注释或删掉该行
}
# sync BULK_SIZE rows to Elasticsearch per batch; defaults to 1 if this option is unset
BULK_SIZE = 1
# Elasticsearch nodes
NODES = [{"host": "192.168.1.240", "port": 9200}]
TASKS = [
{
"stream": {
"database": "welian_commodity", # 在此数据库执行sql语句
"sql": "select * from commodity where modity_time >= ?", # 将该sql语句选中的数据同步到 elasticsearch
"seconds": 10, # 每隔 seconds 秒同步一次,
"init_time": "2018-08-29 15:00:00" # 只有第一次同步会加载
},
"jobs": [
{
"pipeline": [
{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "test_index", # 设置 index
"type": "test" # 设置 type
}
}
}
]
}
]
If BULK_SIZE is set and the number of changed rows is smaller than BULK_SIZE, is there a way to flush the remainder?