bireme's Introduction

bireme

Chinese Documentation

Getting Started Guide

Bireme is an incremental synchronization tool for the Greenplum / HashData data warehouse. It currently supports MySQL, PostgreSQL and MongoDB data sources.

Greenplum is an advanced, fully featured open-source data warehouse that provides powerful and rapid analytics on petabyte-scale data volumes. It is uniquely geared toward big data analytics, is powered by the world's most advanced cost-based query optimizer, and delivers high query performance over large volumes of data.

HashData is a flexible cloud data warehouse built on Greenplum.

Bireme uses DELETE + COPY to synchronize change records from the data source to Greenplum / HashData. This approach is considerably faster than INSERT + UPDATE + DELETE.
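
As a rough illustration of the pattern (the table and column names below are hypothetical; bireme generates its own statements internally and stages each batch in a temporary table):

-- delete the old versions of rows whose keys appear in the batch
DELETE FROM public.target
WHERE EXISTS (SELECT 1 FROM tmp_batch
              WHERE tmp_batch.id = public.target.id);

-- bulk-load the newest version of every row in the batch
COPY public.target FROM STDIN WITH CSV;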

Features and Constraints:

  • Uses small-batch loading to improve data synchronization performance. The default load delay is 10 seconds.
  • All tables must have primary keys in the target database.

1.1 Data Flow

[Figure: data flow]

Bireme supports synchronizing from multiple data sources: it reads records from several data sources in parallel and loads them into the target database.

1.2 Data Source

1.2.1 Maxwell + Kafka

Maxwell + Kafka is a data source type that bireme currently supports. The structure is as follows:

[Figure: Maxwell + Kafka data flow]

  • Maxwell is an application that reads MySQL binlogs and writes row updates to Kafka as JSON.

1.2.2 Debezium + Kafka

Debezium + Kafka is another data source type that bireme currently supports. The structure is as follows:

[Figure: Debezium + Kafka data flow]

  • Debezium is a distributed platform that turns your existing databases into event streams, so that applications can see and respond immediately to each row-level change in the databases.

1.3 How does bireme work

Bireme reads records from the data source and delivers them into separate pipelines. In each pipeline, bireme converts the records into an internal format and caches them. When the cached records reach a certain amount, they are merged into a task. Each task contains two collections, a delete collection and an insert collection. Finally, bireme applies the records to the target database.

Each data source may have several pipelines. For Maxwell, each Kafka partition corresponds to a pipeline; for Debezium, each Kafka topic corresponds to a pipeline.

[Figure: bireme architecture]

The following picture depicts how change data is processed in a pipeline.

[Figure: change data processing in a pipeline]

1.4 Introduction to configuration files

The configuration files consist of two parts:

  • Basic configuration file: The default is config.properties, which contains the basic configuration of bireme.
  • Table mapping file: <source_name>.properties. Each data source corresponds to one file, which specifies the tables to be synchronized and the corresponding tables in the target database. <source_name> is specified in the config.properties file.

1.4.1 config.properties

Required parameters

  • target.url: Address of the target database, in the format jdbc:postgresql://<ip>:<port>/<database>
  • target.user: The user name used to connect to the target database
  • target.passwd: The password used to connect to the target database
  • data.source: The data sources, as a comma-separated list of <source_name> values; whitespace is ignored
  • <source_name>.type: The type of the data source, for example maxwell

Note: The data source name is just a label chosen for convenience. It can be changed as needed.

Parameters for Maxwell data source

  • <source_name>.kafka.server: Kafka address, in the format <ip>:<port>
  • <source_name>.kafka.topic: The topic corresponding to the data source
  • <source_name>.kafka.groupid: Kafka consumer group id; the default value is bireme

Parameters for Debezium data source

  • <source_name>.kafka.server: Kafka address, in the format <ip>:<port>
  • <source_name>.kafka.groupid: Kafka consumer group id; the default value is bireme
  • <source_name>.kafka.namespace: Debezium's name, which is the prefix of its Kafka topics

Other parameters

  • pipeline.thread_pool.size: Thread pool size for Pipeline. Default: 5
  • transform.thread_pool.size: Thread pool size for Transform. Default: 10
  • merge.thread_pool.size: Thread pool size for Merge. Default: 10
  • merge.interval: Maximum interval between Merges, in milliseconds. Default: 10000
  • merge.batch.size: Maximum number of Rows in one Merge. Default: 50000
  • loader.conn_pool.size: Number of connections to the target database, which must be less than or equal to the number of Change Loaders. Default: 10
  • loader.task_queue.size: Length of the task queue in each Change Loader. Default: 2
  • metrics.reporter: Monitoring mode, either console or jmx; specify none to disable monitoring. Default: jmx
  • metrics.reporter.console.interval: Time interval between metrics outputs, in seconds. Only effective when metrics.reporter is console. Default: 10
  • state.server.port: Port for the state server. Default: 8080
  • state.server.addr: IP address for the state server. Default: 0.0.0.0
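
For example, a minimal config.properties for a single Maxwell data source could look like this (the addresses, credentials and topic name are placeholders):

target.url = jdbc:postgresql://127.0.0.1:5432/demo
target.user = gpadmin
target.passwd = changeme

data.source = mysql
mysql.type = maxwell
mysql.kafka.server = 127.0.0.1:9092
mysql.kafka.topic = maxwell
mysql.kafka.groupid = bireme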

1.4.2 <source_name>.properties

In the configuration file for each data source, specify the tables the data source includes and their corresponding tables in the target database.

<OriginTable_1> = <MappedTable_1>
<OriginTable_2> = <MappedTable_2>
...
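
For example, assuming the data source is named mysql and its database demo contains a table test that should be loaded into public.test in the target database, mysql.properties would contain:

demo.test = public.test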

1.5 Monitoring

HTTP Server

Bireme starts a lightweight HTTP server for querying the current Load State.

When the HTTP server is started, the following endpoints are exposed:

  • /: Get the load state for all data sources.
  • /<data source>: Get the load state for the given data source.

The result is returned in JSON format. Adding the parameter pretty prints a human-readable result.
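
For example, assuming bireme runs locally with the default state server port, the full state can be fetched with:

curl "http://localhost:8080/?pretty"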

Example

The following is an example of Load State:

{
  "source_name": "XXX",
  "type": "XXX",
  "pipelines": [
    {
      "name": "XXXXXX",
      "latest": "yyyy-MM-ddTHH:mm:ss.SSSZ",
      "delay": XX.XXX,
      "state": "XXXXX"
    },
    {
      "name": "XXXXXX",
      "latest": "yyyy-MM-ddTHH:mm:ss.SSSZ",
      "delay": XX.XXX,
      "state": "XXXXX"
    }
  ]
}
  • source_name is the name of the queried data source, as designated in the configuration file.
  • type is the type of the data source.
  • pipelines is an array in which every element corresponds to a pipeline. (Every data source may have several separate pipelines.)
  • name is the pipeline's name.
  • latest is the produce time of the latest change data that has been successfully loaded into the target database.
  • delay is the time taken by change data from entering bireme to being committed to the target database.
  • state is the pipeline's state.

1.6 Reference

Maxwell Reference
Debezium Reference
Kafka Reference

bireme's People

Contributors: rebeccazxy, rucfisher, wangzw, zhouyuze

bireme's Issues

Bireme starts normally and Maxwell syncs MySQL to Greenplum, but inserting data into MySQL produces no change in Greenplum

12:56:50 INFO Bireme - Start getting metadata of target tables from target database.
12:56:50 INFO Bireme - Finish getting metadata of target tables from target database.
12:56:50 INFO Bireme - Start establishing connections for loaders.
12:56:50 INFO Bireme - Finishing establishing 1 connections for loaders.
12:56:50 INFO ChangeLoader - Loader Start, corresponding table stg.test.
12:56:50 INFO TaskGenerator - TaskGenerator Start.
12:56:50 INFO Dispatcher - Dispatcher Start.
12:56:50 INFO Provider mysql - Provider mysql Start.

Where could the problem be?

Bireme synchronization fails

When Bireme is started together with Maxwell, data synchronizes normally, but after a Bireme restart synchronization stops, even though Bireme's startup log looks normal. How can this be resolved?

Failed to start bireme service

Starting bireme failed with the log below. How can this be resolved?

12:47:26 FATAL Bireme - Stack Trace:
java.lang.NullPointerException: null
at cn.hashdata.bireme.Config.fetchProviderAndTableMap(Config.java:168) ~[bireme-1.0.0.jar:?]
at cn.hashdata.bireme.Config.dataSourceConfig(Config.java:150) ~[bireme-1.0.0.jar:?]
at cn.hashdata.bireme.Config.<init>(Config.java:83) ~[bireme-1.0.0.jar:?]
at cn.hashdata.bireme.Bireme.parseCommandLine(Bireme.java:88) ~[bireme-1.0.0.jar:?]
at cn.hashdata.bireme.Bireme.init(Bireme.java:219) [bireme-1.0.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_65]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_65]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_65]
at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_65]
at org.apache.commons.daemon.support.DaemonLoader.load(DaemonLoader.java:207) [commons-daemon-1.0.15.jar:1.0.15]
12:47:26 INFO Bireme - start Bireme daemon.
12:47:26 INFO Bireme - Start getting metadata of target tables from target database.
12:47:48 INFO Bireme - initialize Bireme daemon
12:47:48 FATAL Bireme - start failed. Message: null.

Support numeric type for Debezium

Debezium uses the predefined Kafka Connect org.apache.kafka.connect.data.Decimal logical type for all DECIMAL and NUMERIC columns. Bireme needs to parse these values first, then load the data.
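
For background, Kafka Connect's Decimal logical type carries a BigDecimal's unscaled value as bytes, which the JSON converter base64-encodes, while the scale is recorded in the field schema. A minimal decoding sketch in Java (an illustration of the parsing step, not bireme's actual code; the class name and sample value are made up):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class ConnectDecimalDecode {
    // Rebuild a BigDecimal from the base64-encoded unscaled bytes plus the
    // scale taken from the field schema's parameters.
    static BigDecimal decode(String base64UnscaledBytes, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(base64UnscaledBytes);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        // "AMiv" is the base64 encoding of the big-endian bytes of 51375;
        // with scale 2 this decodes to 513.75.
        System.out.println(decode("AMiv", 2));
    }
}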

NullPointerException when getConnection

13:29:53 INFO Maxwell-mysql-edad-earth_market-0 - Loader for public.tcustomer_service failed. Message: Loader failed. .
13:29:53 INFO Maxwell-mysql-edad-earth_market-0 - Stack Trace:
cn.hashdata.bireme.BiremeException: Loader failed.
at cn.hashdata.bireme.RowCache.startLoad(RowCache.java:147) ~[bireme-1.0.0.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine.call(PipeLine.java:141) [bireme-1.0.0.jar:?]
at cn.hashdata.bireme.pipeline.PipeLine.call(PipeLine.java:39) [bireme-1.0.0.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.lang.NullPointerException
at cn.hashdata.bireme.ChangeLoader.getConnection(ChangeLoader.java:176) ~[bireme-1.0.0.jar:?]
at cn.hashdata.bireme.ChangeLoader.call(ChangeLoader.java:111) ~[bireme-1.0.0.jar:?]
at cn.hashdata.bireme.ChangeLoader.call(ChangeLoader.java:42) ~[bireme-1.0.0.jar:?]
... 4 more

Could bireme log the DDL operations parsed from the binlog?

When many databases and tables are synchronized, column changes are easily missed when applying them manually in the target database. If bireme recorded these DDL operations, we would know which tables have changed and could then handle them manually.

How does bireme guarantee data consistency?

When the Kafka consumer is assigned multiple partitions, can updates be applied out of order?

If an exception occurs while bireme is consuming, the data does not appear to be written back to Kafka, which could cause data loss and inconsistency.

Deduplication problem when inserting data

bireme was terminated mid-run and then executed again.

The database reports: duplicate key value violates unique constraint "external_order_result_log_pkey"

Question

Hello,

While using bireme to load data from MySQL into a Greenplum database, the statement

DELETE FROM public.bireme_test WHERE EXISTS (SELECT 1 FROM public_bireme_test,bireme_test WHERE public.bireme_test.id=public_bireme_test.id)

which deletes rows via the temporary table, hung in the Greenplum database and never finished. I could only work around it by changing the source code to

DELETE FROM " + table + " WHERE id in (" + sbt.substring(0,sbt.length()-1) + ");";

which takes the primary key values directly from the batch and deletes by them.

Question 1: Could you help diagnose why the first DELETE statement hangs?
Question 2: Is there a performance difference between the two approaches?

Frequent "Unable to get Connection" errors

Connection problems occur frequently. Which parameter settings need tuning, or is this a bug in the program?

2018-03-15 11:31:25 FATAL cn.hashdata.bireme.ChangeLoader getConnection - Unable to get Connection.

Rewrote the table and primary key validation logic in the Table class to speed up initialization

Our production tables are large, with many inserts, updates and deletes every day. When many tables need to be synchronized, or the system catalogs have grown, the calls

dbMetaData = conn.getMetaData();
rs = dbMetaData.getTables();
rs = dbMetaData.getPrimaryKeys();

spend more than ten minutes just validating tables and primary keys, which hurts synchronization efficiency.

After studying how getTables() and getPrimaryKeys() are implemented in the PostgreSQL JDBC driver, I validate the tables and primary keys with batch queries instead, cutting initialization from 12 minutes to 12 seconds. The code is provided for reference.

Code segment for the getTableInfo() method in the Bireme class:
protected void getTableInfo() throws BiremeException {
    logger.info("Start getting metadata of target tables from target database.");

    Map<String, JSONObject> tableInfoMap = null;
    tableMap = cxt.tableMap;
    String[] strArray;
    Connection conn = BiremeUtility.jdbcConn(cxt.conf.target);

    try {
        tableInfoMap = GetPrimaryKeys.getPrimaryKeys(cxt.tableMap, conn);
    } catch (Exception e) {
        String message = "The method written by Dong Jianzhi has a problem!";
        throw new BiremeException(message, e);
    }

    for (String fullname : cxt.tableMap.values()) {
        if (cxt.tablesInfo.containsKey(fullname)) {
            continue;
        }
        strArray = fullname.split("\\.");
        cxt.tablesInfo.put(fullname, new Table(strArray[1], tableInfoMap, conn));
    }

    try {
        conn.close();
    } catch (SQLException ignore) {
    }

    logger.info("Finish getting metadata of target tables from target database.");
}

Rewritten code segment in the Table class:
/**
 * Get metadata of a specific table using a given connection and construct a new {@code Table}.
 *
 * @param tableName Name of the target table
 * @param tableMap Map from table name to its primary key metadata
 * @param conn Connection to the database
 * @throws BiremeException - Wrap and throw Exception which cannot be handled.
 */
public Table(String tableName, Map<String, JSONObject> tableMap, Connection conn)
        throws BiremeException {
    this.ncolumns = 0;
    this.columnName = new ArrayList();
    this.columnType = new ArrayList();
    this.columnPrecision = new ArrayList();
    this.columnScale = new ArrayList();
    this.keyNames = new ArrayList();
    this.keyIndexs = new ArrayList();

    Statement statement = null;
    ResultSet rs = null;
    ResultSetMetaData rsMetaData = null;

    try {
        this.keyIndexs.add(tableMap.get(tableName).getInteger("keyindexs"));
        this.keyNames.add(tableMap.get(tableName).getString("column_name"));

        statement = conn.createStatement();

        // An always-false predicate returns no rows but still exposes the
        // column metadata through the ResultSet.
        String queryTableInfo = "select * from public." + tableName + " where 1=2";
        rs = statement.executeQuery(queryTableInfo);
        rsMetaData = rs.getMetaData();
        this.ncolumns = rsMetaData.getColumnCount();

        for (int i = 0, len = rsMetaData.getColumnCount(); i < len; i++) {
            this.columnName.add(rsMetaData.getColumnName(i + 1));
            this.columnType.add(rsMetaData.getColumnType(i + 1));
            this.columnPrecision.add(rsMetaData.getPrecision(i + 1));
            this.columnScale.add(rsMetaData.getScale(i + 1));
        }
    } catch (SQLException e) {
        try {
            conn.close();
        } catch (SQLException ignore) {
        }
        String message = "Could not get metadata for public." + tableName + ".\n";
        throw new BiremeException(message, e);
    }
}

The class I wrote to validate tables and primary keys:
public class GetPrimaryKeys {
    private static Logger logger = LogManager.getLogger("Bireme." + GetPrimaryKeys.class);

    public static Map<String, JSONObject> getPrimaryKeys(
            HashMap<String, String> tableMap, Connection conn) throws Exception {
        Statement statement = null;
        ResultSet resultSet = null;
        ResultSet tableRs = null;
        Map<String, JSONObject> table_map = new HashMap<>();
        List<String> checkTableMap = new ArrayList<>();
        String[] strArray;
        StringBuilder sb = new StringBuilder();
        sb.append("(");
        for (String fullname : tableMap.values()) {
            strArray = fullname.split("\\.");
            sb.append("'").append(strArray[1]).append("',");
        }
        String tableList = sb.toString().substring(0, sb.toString().length() - 1) + ")";
        String tableSql = "select tablename from pg_tables where schemaname='public' and tablename in " + tableList;
        String prSql = "SELECT NULL AS TABLE_CAT, "
                + "n.nspname  AS TABLE_SCHEM, "
                + "ct.relname AS TABLE_NAME, "
                + "a.attname  AS COLUMN_NAME, "
                + "(i.keys).n AS KEY_SEQ, "
                + "ci.relname AS PK_NAME "
                + "FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) "
                + "JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) "
                + "JOIN ( SELECT i.indexrelid, i.indrelid, i.indisprimary, information_schema._pg_expandarray(i.indkey) AS KEYS FROM pg_catalog.pg_index i) i ON (a.attnum = (i.keys).x AND a.attrelid = i.indrelid) "
                + "JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE TRUE AND n.nspname = 'public' AND ct.relname in " + tableList + " AND i.indisprimary ORDER BY TABLE_NAME, pk_name, key_seq";
        try {
            statement = conn.createStatement();
            tableRs = statement.executeQuery(tableSql);

            while (tableRs.next()) {
                checkTableMap.add(tableRs.getString("tablename"));
            }

            resultSet = statement.executeQuery(prSql);
            while (resultSet.next()) {
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("keyindexs", resultSet.getInt("KEY_SEQ") - 1);
                jsonObject.put("column_name", resultSet.getString("COLUMN_NAME"));
                table_map.put(resultSet.getString("TABLE_NAME"), jsonObject);
            }

            // Check that every mapped table exists in the target database.
            if (checkTableMap.size() != tableMap.size()) {
                String message = "The Greenplum tables do not match the MySQL tables!";
                throw new BiremeException(message);
            } else {
                logger.info("MySQL/Greenplum table mapping validated; status normal!");
            }

            // Check that every table has a primary key.
            if (table_map.size() != tableMap.size()) {
                String message = "Some table has no primary key!";
                throw new BiremeException(message);
            } else {
                logger.info("Greenplum primary key validation finished; status normal!");
            }

        } catch (SQLException e) {
            try {
                statement.close();
                conn.close();
            } catch (SQLException ignore) {
            }
            String message = "Could not get PrimaryKeys";
            throw new BiremeException(message, e);
        }

        return table_map;
    }
}

Bireme daemon exits with NullPointerException

10:51:45 FATAL Dispatcher - Dispatcher exit on error: java.lang.NullPointerException
10:51:45 INFO Bireme - stop Bireme daemon
10:51:45 INFO Bireme - set stop flag to true
10:51:45 INFO ChangeLoader - Loader exit, corresponding table public.tbpaymentorder.
10:51:45 INFO Bireme - Bireme exit
10:51:45 ERROR Provider maxwell1 - Provider maxwell1 exit on error. Message null
10:51:45 INFO Bireme - destroy Bireme daemon

Table mapping:
testpayment.tbpaymentorder= public.tbpaymentorder

Make Debezium's name configurable

Debezium can be configured with a name that serves as the prefix of its Kafka topics. Currently we use the data source's name for this; we should make it configurable.
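
This appears to correspond to the <source_name>.kafka.namespace parameter described in section 1.4.1; a hypothetical example entry (the value dbserver1 is made up):

mysql.kafka.namespace = dbserver1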

Exception while importing data

cn.hashdata.bireme.BiremeException: Fail to create tmporary table.
        at cn.hashdata.bireme.ChangeLoader.createTemporaryTable(ChangeLoader.java:503) ~[bireme-1.0.0.jar:?]
        at cn.hashdata.bireme.ChangeLoader.getConnection(ChangeLoader.java:204) ~[bireme-1.0.0.jar:?]
        at cn.hashdata.bireme.ChangeLoader.call(ChangeLoader.java:116) [bireme-1.0.0.jar:?]
        at cn.hashdata.bireme.ChangeLoader.call(ChangeLoader.java:44) [bireme-1.0.0.jar:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: org.postgresql.util.PSQLException: ERROR: Interconnect error reading register message from 180.163.8.254:41787: format not recognized  (seg1 edaddw05:40000 pid=2699)
  Detail: msgBytes=65 expected=32 sockfd=26 local=10.141.39.211:1981
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2102) ~[postgresql-9.1-901.jdbc4.jar:?]
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1835) ~[postgresql-9.1-901.jdbc4.jar:?]
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:257) ~[postgresql-9.1-901.jdbc4.jar:?]
        at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:500) ~[postgresql-9.1-901.jdbc4.jar:?]
        at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:374) ~[postgresql-9.1-901.jdbc4.jar:?]
        at org.postgresql.jdbc2.AbstractJdbc2Statement.executeUpdate(AbstractJdbc2Statement.java:302) ~[postgresql-9.1-901.jdbc4.jar:?]
        at cn.hashdata.bireme.ChangeLoader.createTemporaryTable(ChangeLoader.java:500) ~[bireme-1.0.0.jar:?]
        ... 9 more

Hello, bireme reported this error during execution. We traced it to the following loop:

while (iterator.hasNext() && !cxt.stop) {
    data = iterator.next().getBytes("UTF-8");
    pipeOut.write(data);
}

The problem appears at the getBytes call. Could you help diagnose the cause? Thanks!

Processing time issue with Maxwell and bireme

When there are many databases and tables and the Kafka queue holds a very large number of messages, configuring maxwell.properties to synchronize a single table makes bireme's load time very long.

table.keyNames holds the primary key, but old does not necessarily contain the primary key values, so formatColumns throws an exception

Example update event:
{"database":"test","table":"test","type":"update","ts":1521807645,"xid":10638,"commit":true,"data":{"id":3,"name":"2额","val":4312},"old":{"val":4212}}

Primary key: table.keyNames = new String[] {"id"};
The code takes the primary key's old value from old in the update event, but old only contains the primary key when the primary key itself has changed.

if (Objects.equals(row.type, RowType.UPDATE)) {
    row.oldKeys = formatColumns(record, table, table.keyNames, true);

    if (row.keys.equals(row.oldKeys)) {
        row.oldKeys = null;
    }
}


   

Failing call: formatColumns(record, table, table.keyNames, true) > field = BiremeUtility.jsonGetIgnoreCase(old, fieldName);
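
One possible workaround sketch (hypothetical, not a committed fix): fall back to data when a key column is absent from old, since Maxwell's old object only lists columns whose values changed:

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

public class KeyFallback {
    // Return the pre-update value of a key column: prefer "old" (the key
    // itself changed), otherwise take the unchanged value from "data".
    static JsonElement keyBeforeUpdate(JsonObject data, JsonObject old, String keyName) {
        if (old != null && old.has(keyName)) {
            return old.get(keyName);
        }
        return data.get(keyName);
    }
}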

Too many synchronized tables prevent data synchronization

When synchronization spans more than 10 databases with over 200 tables in total, data synchronization stops working.

Is there a good way to handle this?

Can the problem be solved by starting additional bireme instances?
When will version 2.0 be released?

When too many tables are synchronized, data cannot be synchronized.

I can't make sense of these logs at all

[admin@bireme ~]$ tail -f app/bireme/logs/bireme.err 
18:04:56,062 DEBUG QueuedThreadPool - queue acceptor-0@73163d48
18:04:56,063 INFO  AbstractConnector - Started ServerConnector@1e6a3214{HTTP/1.1,[http/1.1]}{10.62.2.8:9090}
18:04:56,063 DEBUG AbstractLifeCycle - STARTED @9794ms ServerConnector@1e6a3214{HTTP/1.1,[http/1.1]}{10.62.2.8:9090}
18:04:56,063 INFO  Server - Started @9794ms
18:04:56,063 DEBUG AbstractLifeCycle - STARTED @9794ms org.eclipse.jetty.server.Server@49049a04
18:04:56,064 DEBUG QueuedThreadPool - run acceptor-0@73163d48
18:07:47,905 TRACE ConsumerCoordinator - Sending OffsetCommit request with {maxwell_topic-0=OffsetAndMetadata{offset=28341, metadata=''}} to coordinator localhost:9091 (id: 2147483647 rack: null) for group bireme
18:07:47,906 TRACE NetworkClient - Sending OFFSET_COMMIT {group_id=bireme,group_generation_id=-1,member_id=,retention_time=-1,topics=[{topic=maxwell_topic,partitions=[{partition=0,offset=28341,metadata=}]}]} to node 2147483647.
18:07:47,908 TRACE NetworkClient - Completed receive from node 2147483647, for key 8, received {throttle_time_ms=0,responses=[{topic=maxwell_topic,partition_responses=[{partition=0,error_code=0}]}]}
18:07:47,909 DEBUG ConsumerCoordinator - Group bireme committed offset 28341 for partition maxwell_topic-0


[admin@bireme ~]$ tail -f app/bireme/logs/bireme.out 
18:04:54 INFO  ChangeLoader - Loader Start, corresponding table vm_report.report_record_attr.
18:04:54 INFO  ChangeLoader - Loader Start, corresponding table vm_campaign.campaign_instance_attr.
18:04:54 INFO  ChangeLoader - Loader Start, corresponding table vm_cs.service_order_category.
18:04:54 INFO  ChangeLoader - Loader Start, corresponding table vm_auth.privilege_resource.
18:04:54 INFO  ChangeLoader - Loader Start, corresponding table vm_cs.device_sales_registration.
18:04:54 INFO  ChangeLoader - Loader Start, corresponding table vm_layout.share_info.
18:04:54 INFO  TaskGenerator - TaskGenerator Start.
18:04:54 INFO  ChangeLoader - Loader Start, corresponding table vm_campaign.redpack_record.
18:04:54 INFO  Dispatcher - Dispatcher Start.
18:04:54 INFO  Provider maxwell1 - Provider maxwell1 Start.

===============================================

[admin@bireme ~]$ tail -f app/bireme/logs/bireme.err 
18:14:17,050 INFO  ContextHandler - Stopped o.e.j.s.h.ContextHandler@3401a114{/vm_store.mijia_order_sku,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - STOPPED o.e.j.s.h.ContextHandler@3401a114{/vm_store.mijia_order_sku,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - stopping o.e.j.s.h.ContextHandler@655ef322{/vm_campaign.campaign_present,null,SHUTDOWN}
18:14:17,050 DEBUG AbstractHandler - stopping o.e.j.s.h.ContextHandler@655ef322{/vm_campaign.campaign_present,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - stopping cn.hashdata.bireme.StateServer$StateHandler@7e276594
18:14:17,050 DEBUG AbstractHandler - stopping cn.hashdata.bireme.StateServer$StateHandler@7e276594
18:14:17,050 DEBUG AbstractLifeCycle - STOPPED cn.hashdata.bireme.StateServer$StateHandler@7e276594
18:14:17,050 INFO  ContextHandler - Stopped o.e.j.s.h.ContextHandler@655ef322{/vm_campaign.campaign_present,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - STOPPED o.e.j.s.h.ContextHandler@655ef322{/vm_campaign.campaign_present,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - stopping o.e.j.s.h.ContextHandler@73173f63{/vm_channel.channel_delivery_info,null,SHUTDOWN}
18:14:17,050 DEBUG AbstractHandler - stopping o.e.j.s.h.ContextHandler@73173f63{/vm_channel.channel_delivery_info,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - stopping cn.hashdata.bireme.StateServer$StateHandler@55562aa9
18:14:17,050 DEBUG AbstractHandler - stopping cn.hashdata.bireme.StateServer$StateHandler@55562aa9
18:14:17,050 DEBUG AbstractLifeCycle - STOPPED cn.hashdata.bireme.StateServer$StateHandler@55562aa9
18:14:17,050 INFO  ContextHandler - Stopped o.e.j.s.h.ContextHandler@73173f63{/vm_channel.channel_delivery_info,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - STOPPED o.e.j.s.h.ContextHandler@73173f63{/vm_channel.channel_delivery_info,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - stopping o.e.j.s.h.ContextHandler@25e2ab5a{/vm_wares.unilife_prod_info_attr,null,SHUTDOWN}
18:14:17,050 DEBUG AbstractHandler - stopping o.e.j.s.h.ContextHandler@25e2ab5a{/vm_wares.unilife_prod_info_attr,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - stopping cn.hashdata.bireme.StateServer$StateHandler@35e5d0e5
18:14:17,050 DEBUG AbstractHandler - stopping cn.hashdata.bireme.StateServer$StateHandler@35e5d0e5
18:14:17,050 DEBUG AbstractLifeCycle - STOPPED cn.hashdata.bireme.StateServer$StateHandler@35e5d0e5
18:14:17,050 INFO  ContextHandler - Stopped o.e.j.s.h.ContextHandler@25e2ab5a{/vm_wares.unilife_prod_info_attr,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - STOPPED o.e.j.s.h.ContextHandler@25e2ab5a{/vm_wares.unilife_prod_info_attr,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - stopping o.e.j.s.h.ContextHandler@3d9f6567{/vm_channel.channel_purchase_after_service_info,null,SHUTDOWN}
18:14:17,050 DEBUG AbstractHandler - stopping o.e.j.s.h.ContextHandler@3d9f6567{/vm_channel.channel_purchase_after_service_info,null,UNAVAILABLE}
18:14:17,050 DEBUG AbstractLifeCycle - stopping cn.hashdata.bireme.StateServer$StateHandler@c055c54
18:14:17,050 DEBUG AbstractHandler - stopping cn.hashdata.bireme.StateServer$StateHandler@c055c54
18:14:17,050 DEBUG AbstractLifeCycle - STOPPED cn.hashdata.bireme.StateServer$StateHandler@c055c54

[admin@bireme ~]$ tail -f app/bireme/logs/bireme.err 
......................................
...............................
lue=199 bytes)), (record=DefaultRecord(offset=82670, timestamp=1520391951184, key=72 bytes, value=169 bytes)), (record=DefaultRecord(offset=82671, timestamp=1520391951184, key=72 bytes, value=186 bytes)), (record=DefaultRecord(offset=82672, timestamp=1520391951184, key=72 bytes, value=199 bytes)), (record=DefaultRecord(offset=82673, timestamp=1520391951184, key=72 bytes, value=186 bytes)), (record=DefaultRecord(offset=82674, timestamp=1520391951184, key=72 bytes, value=169 bytes)), (record=DefaultRecord(offset=82675, timestamp=1520391951184, key=72 bytes, value=169 bytes))]}]}]}
18:14:28,424 DEBUG Fetcher - Fetch READ_UNCOMMITTED at offset 78640 for partition maxwell_topic-0 returned fetch data (error=NONE, highWaterMark=572082, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=1048576)
18:14:28,424 TRACE Fetcher - Preparing to read 1048576 bytes of data for partition maxwell_topic-0 with offset 78640
18:14:28,424 TRACE Fetcher - Updating high watermark for partition maxwell_topic-0 to 572082
18:14:28,425 TRACE Fetcher - Returning fetched records at offset 78640 for assigned partition maxwell_topic-0 and update position to 79140
18:14:28,425 TRACE Fetcher - Returning fetched records at offset 79140 for assigned partition maxwell_topic-0 and update position to 79640
18:14:28,426 TRACE Fetcher - Returning fetched records at offset 79640 for assigned partition maxwell_topic-0 and update position to 80140

[admin@bireme ~]$ tail -f app/bireme/logs/bireme.out
18:15:46 INFO  ChangeLoader - Chang to passimistic mode.
18:16:16 INFO  ChangeLoader - Chang to passimistic mode.
18:16:31 INFO  ChangeLoader - Chang to passimistic mode.
18:16:37 INFO  ChangeLoader - Chang to passimistic mode.
18:16:46 INFO  ChangeLoader - Chang to passimistic mode.
18:16:53 INFO  ChangeLoader - Chang to passimistic mode.
18:16:55 INFO  ChangeLoader - Chang to passimistic mode.
18:17:20 INFO  ChangeLoader - Chang to passimistic mode.
18:17:51 INFO  ChangeLoader - Chang to passimistic mode.
18:17:52 INFO  ChangeLoader - Chang to passimistic mode.
18:19:23 INFO  ChangeLoader - Chang to optimistic mode.

I want to change the JSON format at bireme's entry point

Because I use OGG to Kafka, the data format does not quite match what bireme accepts, so I want to convert the data format after it is received from Kafka. My idea is to rewrite the data structure when the record is obtained, imitating the Maxwell format. I only modified MaxwellProvider.java to convert the format before the data is parsed, but after recompiling and starting it, bireme does not copy any data into Greenplum. What else needs to be changed? I am using bireme-1.0.0.

The code is as follows:
public class MaxwellChangeTransformer extends KafkaTransformer {
    public class MaxwellRecord implements Record {
        public String dataSource;
        public String database;
        public String table;
        public Long produceTime;
        public RowType type;
        public JsonObject data;
        public JsonObject old;

        public MaxwellRecord(String changeValue) {
            JsonParser jsonParser = new JsonParser();
            JsonObject value = (JsonObject) jsonParser.parse(changeValue);

            // First convert to the format bireme expects
            value = changeJsonForOracle(jsonParser, value);

            this.dataSource = getProviderName();
            this.database = value.get("database").getAsString();
            this.table = value.get("table").getAsString();
            this.produceTime = value.get("ts").getAsLong();
            this.data = value.get("data").getAsJsonObject();

            if (value.has("old") && !value.get("old").isJsonNull()) {
                this.old = value.get("old").getAsJsonObject();
            }

            switch (value.get("type").getAsString()) {
                case "insert":
                    type = RowType.INSERT;
                    break;

                case "update":
                    type = RowType.UPDATE;
                    break;

                case "delete":
                    type = RowType.DELETE;
                    break;
            }
        }

        /**
         * Convert an OGG (Oracle) change record into the format bireme expects.
         *
         * @param jsonParser the JSON parser
         * @param value the original record
         */
        private JsonObject changeJsonForOracle(JsonParser jsonParser, JsonObject value) {
            // Extract the database name and store it in the JSON
            value.addProperty("database", value.get("table").getAsString().split("\\.")[0]);
            // Extract the table name and store it in the JSON
            value.addProperty("table", value.get("table").getAsString().split("\\.")[1]);
            // Extract the operation type and store it in the JSON
            String typeStr = "";
            switch (value.get("op_type").getAsString()) {
                case "I":
                    typeStr = "insert";
                    break;

                case "U":
                    typeStr = "update";
                    break;

                case "D":
                    typeStr = "delete";
                    break;
            }
            value.addProperty("type", typeStr);
            // Remove the op_type field
            value.remove("op_type");
            // For update operations, parse before and after
            if (typeStr.equals("update")) {
                JsonObject valueBefore = (JsonObject) jsonParser.parse(value.get("before").toString());
                JsonObject valueAfter = (JsonObject) jsonParser.parse(value.get("after").toString());
                JsonObject old = new JsonObject();
                for (Map.Entry<String, JsonElement> entry : valueBefore.entrySet()) {
                    if (!valueBefore.get(entry.getKey()).equals(valueAfter.get(entry.getKey()))) {
                        old.add(entry.getKey(), valueBefore.get(entry.getKey()));
                    }
                }
                value.add("old", old);
                value.remove("before");
                value.add("data", valueAfter);
                value.remove("after");
            } else {
                // Otherwise just rename the after field to data
                value.add("data", value.get("after"));
                value.remove("after");
            }
            // Convert the timestamp to a long value
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
            Date date = null;
            try {
                date = (Date) sdf.parse(value.get("current_ts").getAsString());
            } catch (ParseException e) {
                e.printStackTrace();
            }
            value.addProperty("ts", date.getTime() / 1000);
            value.remove("current_ts");
            return value;
        }
    }
}
