
mysqlsmom's People

Contributors

m358807551

mysqlsmom's Issues

mysqlsmom thread error

[root@03AF15-A14 video]# mom run -c cron_config.py
2020-03-31 12:38:00,710 apscheduler.scheduler INFO Adding job tentatively -- it will be properly scheduled when the scheduler starts
2020-03-31 12:38:00,711 apscheduler.scheduler INFO Added job "do_one_task" to job store "default"
2020-03-31 12:38:00,711 apscheduler.scheduler INFO Scheduler started
2020-03-31 12:38:00,712 apscheduler.executors.default INFO Running job "do_one_task (trigger: interval[0:00:10], next run at: 2020-03-31 12:38:00 CST)" (scheduled at 2020-03-31 12:38:00.617124+08:00)
2020-03-31 12:38:01,113 apscheduler.executors.default ERROR Job "do_one_task (trigger: interval[0:00:10], next run at: 2020-03-31 12:38:10 CST)" raised an exception
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/apscheduler/executors/base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 321, in do_one_task
    query = MyModel.raw(task["stream"]["sql"].replace("?", "%s"), (last_start_time,)).dicts().iterator()
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1637, in iterator
    return iter(self.execute(database).iterator())
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1560, in inner
    return method(self, database, *args, **kwargs)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1631, in execute
    return self._execute(database)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1680, in _execute
    cursor = database.execute(self)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2638, in execute
    return self.execute_sql(sql, params, commit=commit)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2632, in execute_sql
    self.commit()
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2424, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2625, in execute_sql
    cursor.execute(sql, params or ())
  File "/usr/lib/python2.7/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/usr/lib/python2.7/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 516, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 727, in _read_query_result
    result.read()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1073, in read
    self._read_result_packet(first_packet)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1143, in _read_result_packet
    self._read_rowdata_packet()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1177, in _read_rowdata_packet
    packet = self.connection._read_packet()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 670, in _read_packet
    % (packet_number, self._next_seq_id))
InternalError: Packet sequence number wrong - got 1 expected 16
How can this be resolved?

How to configure mysqlsmom when Elasticsearch requires authentication

When ES has authentication enabled, the following error is raised:
2019-12-10 16:54:19,241 elasticsearch WARNING Undecodable raw error response from server: No JSON object could be decoded: line 1 column 0 (char 0)
Traceback (most recent call last):
  File "/usr/local/bin/mom", line 11, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 416, in run
    handle_init_stream(config_)
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 193, in handle_init_stream
    to_dest.upload_docs()
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 137, in upload_docs
    helpers.bulk(es, self.docs)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 300, in bulk
    for ok, item in streaming_bulk(client, actions, *args, **kwargs):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 230, in streaming_bulk
    **kwargs
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 116, in _process_bulk_chunk
    raise e
elasticsearch.exceptions.AuthenticationException: AuthenticationException(401, u'Unauthorized')

The template configuration is:
NODES = [{"host": "10.10.10.10", "port": 8080}]

With authentication enabled, changing it to the following works:
NODES = ["http://user:password@10.10.10.10:8080"]
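If the user name or password contains characters such as `@` or `:`, the URL form above needs them percent-encoded. The following is a minimal sketch of building such a node URL; the helper name `es_node_url` and the sample credentials are hypothetical, and it assumes mysqlsmom hands `NODES` to the Elasticsearch client unchanged, as the report above suggests.

```python
from urllib.parse import quote


def es_node_url(user, password, host, port, scheme="http"):
    """Build a node URL with percent-encoded credentials (hypothetical helper)."""
    return "{}://{}:{}@{}:{}".format(
        scheme,
        quote(user, safe=""),      # encode every reserved character
        quote(password, safe=""),
        host,
        port,
    )


# Sample credentials are placeholders, not values from the original report.
NODES = [es_node_url("user", "p@ss:word", "10.10.10.10", 8080)]
```

Encoding the credentials keeps the client from misreading a `@` inside the password as the start of the host part.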

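Some suggestions

Hello, we plan to use your tool to sync MySQL to ES. While using it, we would like to have the following features:
1. Initialization: for large tables, fetch the data in ranges;
2. Incremental sync: use the maximum timestamp of the previous batch as the checkpoint instead of the last run time, and support turning "select id, name from person where update_time >= ?"
into "select id, name from person where update_time >= ? and _id > ?"

That is, record both the maximum timestamp and the primary key id reached by the last incremental run (in some of our scenarios, several hundred thousand rows are occasionally updated within a single microsecond). With this, a large update can be fetched a fixed number of rows at a time, recording the timestamp and record id that each round reached.
3. Support nested queries
4. Support delete in incremental mode

Do you plan to implement these features? Alternatively, we could implement them and submit the changes to you.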
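The checkpoint idea in item 2 can be sketched as keyset pagination over `(update_time, id)`. This is an illustration only, not mysqlsmom code: it uses an in-memory SQLite table as a stand-in for MySQL, and the function names are hypothetical. Note that it uses the predicate `update_time > ? OR (update_time = ? AND id > ?)` rather than the one quoted above, because `update_time >= ? and id > ?` would skip rows that have a newer timestamp but a smaller id.

```python
import sqlite3


def fetch_batch(conn, last_time, last_id, limit):
    """Fetch the next `limit` rows strictly after the (last_time, last_id) checkpoint."""
    sql = (
        "SELECT id, name, update_time FROM person "
        "WHERE update_time > ? OR (update_time = ? AND id > ?) "
        "ORDER BY update_time, id LIMIT ?"
    )
    return conn.execute(sql, (last_time, last_time, last_id, limit)).fetchall()


def sync_all(conn, limit):
    """Walk the table in fixed-size batches, advancing the checkpoint each round."""
    checkpoint = ("", 0)  # sorts before every real (update_time, id) pair
    synced_ids = []
    while True:
        rows = fetch_batch(conn, checkpoint[0], checkpoint[1], limit)
        if not rows:
            return synced_ids, checkpoint
        synced_ids.extend(row[0] for row in rows)
        # The last row of the batch becomes the new checkpoint.
        checkpoint = (rows[-1][2], rows[-1][0])


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE person (id INTEGER PRIMARY KEY, name TEXT, update_time TEXT)")
conn.executemany(
    "INSERT INTO person VALUES (?, ?, ?)",
    [
        (1, "a", "2019-01-01 00:00:00"),
        (2, "b", "2019-01-01 00:00:00"),
        (3, "c", "2019-01-01 00:00:00"),  # many rows can share one timestamp
        (4, "d", "2019-01-02 00:00:00"),
        (5, "e", "2019-01-02 00:00:00"),
    ],
)
synced_ids, final_checkpoint = sync_all(conn, limit=2)
```

Because the checkpoint includes the id, a batch boundary that falls in the middle of rows sharing one timestamp neither repeats nor drops rows.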

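Sync error on MySQL UPDATE

Updating only some of the fields of a row in a MySQL table makes mysqlsmom raise an exception, after which it cannot continue running.
SQL: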
update tb_emp set message="update" where id=20190715101404;
error:
elasticsearch.exceptions.RequestError: RequestError(400, u'action_request_validation_exception', u'Validation Failed: 1: id is missing;')

Sync problem with ES 6.6

2019-07-08 17:31:42,149 root INFO {"status": "\u0001", "lang": -1, "update_time": "2019-07-08 17:23:37", "github": null, "star": 5, "descr": "api", "tags": "["api", "springboot"]", "favicon": "/favicon/spring.png", "title": "SpringBoot", "show": "\u0001", "gitee": null, "link": "https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/api/", "insert_time": "2016-11-28 16:14:01", "indexed": "springboot", "sorted": 1110, "_id": 423, "type": 100026, "id": 423}
2019-07-08 17:31:42,161 elasticsearch INFO POST http://localhost:9200/_bulk [status:200 request:0.011s]
2019-07-08 17:31:42,164 apscheduler.executors.default ERROR Job "do_one_task (trigger: interval[0:01:00], next run at: 2019-07-08 17:32:42 CST)" raised an exception
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/apscheduler/executors/base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 339, in do_one_task
to_dest.upload_docs()
File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 141, in upload_docs
raise e
BulkIndexError: (u'1 document(s) failed to index.', [{u'update': {u'status': 400, u'_type': u'open', u'_index': u'admin', u'error': {u'caused_by': {u'reason': u'Failed to parse value [\x01] as only [true] or [false] are allowed.', u'type': u'illegal_argument_exception'}, u'reason': u'failed to parse field [show] of type [boolean]', u'type': u'mapper_parsing_exception'}, u'_id': u'423', u'data': {'doc': {u'status': '\x01', u'lang': -1, u'update_time': datetime.datetime(2019, 7, 8, 17, 23, 37), u'github': None, u'star': 5, u'descr': u'api', u'tags': u'["api", "springboot"]', u'favicon': u'/favicon/spring.png', u'title': u'SpringBoot', u'show': '\x01', u'gitee': None, u'link': u'https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/api/', u'insert_time': datetime.datetime(2016, 11, 28, 16, 14, 1), u'indexed': u'springboot', u'sorted': 1110, u'type': 100026, u'id': 423}, 'doc_as_upsert': True}}}])
^C
Aborted!
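The error above says that the `show` field is mapped as boolean in ES while the row carries the raw byte `\x01`. One way to avoid it is to convert such values before upload. The sketch below assumes the column is a MySQL BIT(1) and that the conversion can be hooked in before the documents are sent; the function name `bit_to_bool` is hypothetical and not part of mysqlsmom.

```python
def bit_to_bool(row, fields):
    """Return a copy of `row` with the named single-byte fields converted to bool."""
    converted = dict(row)
    for field in fields:
        value = converted.get(field)
        if value in (b"\x01", "\x01"):
            converted[field] = True
        elif value in (b"\x00", "\x00"):
            converted[field] = False
        # Any other value (None, numbers, text) is left untouched.
    return converted


row = {"id": 423, "show": "\x01", "star": 5}
clean = bit_to_bool(row, ["show"])
```

An alternative that needs no Python change is to cast the column in the stream SQL itself so that the driver returns a number instead of a byte.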

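Request for a Python 3 version

The package has been removed from PyPI and can no longer be found there. Cloning the source and running python setup.py install also fails with various Python 2.7 errors. Could a Python 3 version be developed?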

Error when the id column in MySQL is a string

Traceback (most recent call last):
File "mysqlsmom.py", line 213, in <module>
handle_init_stream(config_module)
File "mysqlsmom.py", line 113, in handle_init_stream
for row in query:
File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3672, in iterator
yield self.iterate(False)
File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3660, in iterate
result = self.process_row(row)
File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 6348, in process_row
result[attr] = converters[i](row[i])
File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3896, in python_value
return value if value is None else self.adapt(value)
ValueError: invalid literal for int() with base 10: 'stff01'

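After configuring, the program reports Killed after running for a while

I am using the latest Elasticsearch, 7.4.
The installation instructions had no option for this version, so I chose: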
pip install --upgrade elasticsearch==7.1.0
mom new test_mom/init_config.py -t init --force
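vim ./test_mom/init_config.py # edit the configuration as the comments instruct
After running for a while, roughly a few minutes, it reports Killed with no other information.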
[root@luyaodev local]# mom run -c ./test_mom/init_config.py
Killed
[root@luyaodev local]#

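The program no longer reads the binlog

The recorded binlog position has been stuck at 100105408. No more logs are printed and no error is reported. It has been in this state for a long time, while the binlog position in the database has already reached 126084862. Restarting the program changes nothing and still prints no logs. I do not know how to troubleshoot this.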

TypeError: 'dict_items' object does not support indexing

Traceback (most recent call last):
File "mysqlsmom.py", line 223, in <module>
handle_init_stream(config_module)
File "mysqlsmom.py", line 131, in handle_init_stream
rows = do_pipeline(job["pipeline"], event["values"])
File "mysqlsmom.py", line 65, in do_pipeline
func_name, kwargs = line.items()[0]
TypeError: 'dict_items' object does not support indexing
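In Python 3, `dict.items()` returns a view that cannot be indexed, which is what the traceback shows for `line.items()[0]`. A minimal sketch of an equivalent that works on both Python 2 and 3 follows; the helper name `first_item` is hypothetical, and the sample `line` value is taken from the pipeline entry in the example configuration on this page.

```python
def first_item(mapping):
    """Return the first (key, value) pair of a dict without indexing the view."""
    return next(iter(mapping.items()))


line = {"set_id": {"field": "id"}}
func_name, kwargs = first_item(line)
```

Writing `list(line.items())[0]` would also work; `next(iter(...))` simply avoids building a throwaway list.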

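Tokenizing Chinese fields with the ik plugin

Hello, where should I change the code or the mapping settings so that a Chinese field is indexed and searched with the ik analysis plugin?

For example, my question field needs this setting: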
"question": {
"type": "text",
"analyzer": "ik_smart",
"search_analyzer": "ik_max_word"
}

But I could not find a place to add this anywhere in the code directory structure.

Looking forward to the author's reply, thank you!
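One option that does not require touching mysqlsmom at all is to create the index with the desired mapping before the first sync, so that later documents are indexed under it. The sketch below is an assumption-laden illustration: the helper names are hypothetical, the index name `test_index` and type `test` are borrowed from the example configuration on this page, and the typed mapping layout shown is the ES 6.x form (7.x drops the type level). The ik plugin must already be installed on the cluster.

```python
import json


def build_index_body(doc_type, field, analyzer, search_analyzer):
    """Build an index-creation body that maps one text field to the given analyzers."""
    properties = {
        field: {
            "type": "text",
            "analyzer": analyzer,
            "search_analyzer": search_analyzer,
        }
    }
    # ES 6.x layout: mappings are nested under the document type.
    return {"mappings": {doc_type: {"properties": properties}}}


def create_index(nodes, index, body):
    """Create the index on a live cluster (not called in this sketch)."""
    from elasticsearch import Elasticsearch  # imported lazily; needs elasticsearch-py

    es = Elasticsearch(nodes)
    es.indices.create(index=index, body=body)


body = build_index_body("test", "question", "ik_smart", "ik_max_word")
print(json.dumps(body, indent=2))
```

Calling `create_index(NODES, "test_index", body)` once, before running the init sync, would apply the mapping; the analyzers are copied as written in the question.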

Error when running python mysqlsmom.py config/example_cron.py

Error:

Traceback (most recent call last):
  File "mysqlsmom.py", line 357, in <module>
    handle_cron_stream(config_module)
  File "mysqlsmom.py", line 324, in handle_cron_stream
    scheduler = BlockingScheduler()
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 83, in __init__
    self.configure(gconfig, **options)
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 122, in configure
    self._configure(config)
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 691, in _configure
    self.timezone = astimezone(config.pop('timezone', None)) or get_localzone()
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/tzlocal/unix.py", line 131, in get_localzone
    _cache_tz = _get_localzone()
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/tzlocal/unix.py", line 56, in _get_localzone
    with open(tzpath, 'rb') as tzfile:
IOError: [Errno 21] Is a directory: '/etc/timezone'
Configuration:
# Edit the database connection
CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': ''
}

# Redis stores the last sync time and other state
REDIS = {
    "host": "192.168.1.240",
    "port": 6379,
    "db": 0,
    "password": "",  # comment out or delete this line if no password is needed
}

# Sync BULK_SIZE rows to elasticsearch at a time; defaults to 1 if not set
BULK_SIZE = 1

# Edit the elasticsearch nodes
NODES = [{"host": "192.168.1.240", "port": 9200}]

TASKS = [
    {
        "stream": {
            "database": "welian_commodity",  # run the SQL statement in this database
            "sql": "select * from commodity where modity_time >= ?",  # rows selected by this SQL statement are synced to elasticsearch
            "seconds": 10,  # sync once every `seconds` seconds
            "init_time": "2018-08-29 15:00:00"  # only loaded on the first sync
        },
        "jobs": [
            {
                "pipeline": [
                    {"set_id": {"field": "id"}}  # by default, use the value of the id field as the elasticsearch document id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test_index",   # set the index
                        "type": "test"           # set the type
                    }
                }
            }
        ]
    }
]
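The IOError in the traceback above comes from tzlocal trying to open `/etc/timezone`, which on this machine is a directory (this can happen, for example, when a container bind-mounts a path that does not exist on the host). The traceback also shows that APScheduler only calls `get_localzone()` when no `timezone` option is given, so passing one explicitly avoids the lookup. The sketch below assumes APScheduler 3.x; the helper names and the `Asia/Shanghai` default are hypothetical choices, not values from the report.

```python
import os


def timezone_file_is_directory(path="/etc/timezone"):
    """Report whether the timezone path is a directory instead of a file."""
    return os.path.isdir(path)


def make_scheduler(timezone="Asia/Shanghai"):
    """Create a scheduler with an explicit timezone so tzlocal is never consulted."""
    # Imported lazily so the check above works without APScheduler installed.
    from apscheduler.schedulers.blocking import BlockingScheduler

    return BlockingScheduler(timezone=timezone)


if timezone_file_is_directory():
    print("/etc/timezone is a directory; pass an explicit timezone to the scheduler")
```

Replacing the directory with a proper `/etc/timezone` file is the other obvious fix and needs no code change.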

BULK_SIZE set to 1000

If bulk_size is set and the number of changed rows is smaller than bulk_size, is there a way to sync the remainder?
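A common way to handle such a remainder is to flush the buffer either when it is full or when a maximum wait time has passed since the previous flush. The following is a minimal sketch of that idea, not mysqlsmom's implementation: the class `BulkBuffer`, its parameters, and the injected `upload` callable are all hypothetical.

```python
import time


class BulkBuffer:
    """Collect documents and flush them when the buffer is full or has waited too long."""

    def __init__(self, upload, bulk_size, max_wait_seconds, clock=time.monotonic):
        self.upload = upload              # callable that receives a list of documents
        self.bulk_size = bulk_size
        self.max_wait_seconds = max_wait_seconds
        self.clock = clock                # injectable so the timing can be tested
        self.docs = []
        self.last_flush = clock()

    def add(self, doc):
        self.docs.append(doc)
        self.flush_if_due()

    def flush_if_due(self):
        """Flush when the buffer is full, or non-empty and older than the wait limit."""
        full = len(self.docs) >= self.bulk_size
        stale = bool(self.docs) and (self.clock() - self.last_flush) >= self.max_wait_seconds
        if full or stale:
            self.flush()

    def flush(self):
        if self.docs:
            self.upload(self.docs)
            self.docs = []                # start a fresh list; the uploaded one is not reused
        self.last_flush = self.clock()
```

Calling `flush_if_due()` from a periodic timer, and `flush()` on shutdown, would push out a partial batch even when fewer than `bulk_size` rows have changed.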
