Flink-ClickHouse-Sink

Description

Flink sink for the ClickHouse database, powered by Async Http Client.

A high-performance library for loading data into ClickHouse.

It has two triggers for loading data: a timeout and a buffer size limit. Buffered data is sent when either one fires.
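To make the two triggers concrete, here is a minimal sketch of a buffer that flushes when it reaches a size limit or when a timeout has passed since the last flush. `SizeOrTimeoutBuffer` is a hypothetical class written for illustration, not the library's own buffer; it takes the current time in milliseconds as an argument (the library's setting is `timeout-sec`) so its behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the two load triggers (buffer size and timeout);
// this is not the library's own buffer class.
class SizeOrTimeoutBuffer {
    private final int maxBufferSize;
    private final long timeoutMillis;
    private final Consumer<List<String>> flusher;
    private final List<String> records = new ArrayList<>();
    private long lastFlushMillis;

    SizeOrTimeoutBuffer(int maxBufferSize, long timeoutMillis, long startMillis,
                        Consumer<List<String>> flusher) {
        this.maxBufferSize = maxBufferSize;
        this.timeoutMillis = timeoutMillis;
        this.flusher = flusher;
        this.lastFlushMillis = startMillis;
    }

    // Adds one record, then checks both triggers. The current time is passed in
    // explicitly so the behaviour is deterministic in tests.
    synchronized void add(String record, long nowMillis) {
        records.add(record);
        flushIfNeeded(nowMillis);
    }

    // Meant to be called on every add and also periodically (e.g. by a scheduler),
    // so the timeout fires even when no new records arrive.
    synchronized void flushIfNeeded(long nowMillis) {
        boolean sizeReached = records.size() >= maxBufferSize;
        boolean timedOut = nowMillis - lastFlushMillis >= timeoutMillis;
        if (!records.isEmpty() && (sizeReached || timedOut)) {
            flusher.accept(new ArrayList<>(records)); // hand over a copy of the batch
            records.clear();
            lastFlushMillis = nowMillis;
        }
    }
}
```

With a size limit of 2, the second `add` flushes by size; a lone third record is only flushed once `flushIfNeeded` observes that the timeout has elapsed.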

Version map

Flink version    flink-clickhouse-sink version
1.3.*            1.0.0
1.9.*            1.3.1

Install

Maven Central
<dependency>
  <groupId>ru.ivi.opensource</groupId>
  <artifactId>flink-clickhouse-sink</artifactId>
  <version>1.3.1</version>
</dependency>

Usage

Properties

The flink-clickhouse-sink uses two groups of configuration properties: common properties, and properties for each sink in your operator chain.

The common part (set once, as global job parameters):

clickhouse.sink.num-writers - number of writers that build and send requests,

clickhouse.sink.queue-max-capacity - maximum capacity (in batches) of the queue of pending requests,

clickhouse.sink.timeout-sec - timeout, in seconds, for loading data,

clickhouse.sink.retries - maximum number of retries,

clickhouse.sink.failed-records-path - path for failed records,

clickhouse.sink.ignoring-clickhouse-sending-exception-enabled - required boolean parameter that controls whether a ClickHouse sending exception is raised in the main thread (false) or not (true). If it is true, exceptions while sending to ClickHouse are ignored and the failed data automatically goes to disk. If it is false, the sending exception is thrown in the "main" thread (the thread that called ClickHouseSink::invoke), and the data also goes to disk.
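The interplay of `retries`, `failed-records-path` and `ignoring-clickhouse-sending-exception-enabled` can be sketched as follows. This is a hypothetical, synchronous illustration, not the library's implementation (which sends from writer threads and raises the error later in the thread calling `invoke`). `RetryingSender`, its `Sender` interface and the `appendToFile` helper are invented names, and "one initial attempt plus up to `maxRetries` retries" is an assumption about how the retry count is interpreted.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch (not the library's code) of the documented semantics of
// retries, failed-records-path and ignoring-clickhouse-sending-exception-enabled.
class RetryingSender {
    // Stand-in for the HTTP request that sends one batch to ClickHouse.
    interface Sender {
        void send(List<String> batch) throws Exception;
    }

    private final Sender sender;
    private final int maxRetries;
    private final Consumer<List<String>> failedRecordsWriter;
    private final boolean ignoreSendingException;

    RetryingSender(Sender sender, int maxRetries,
                   Consumer<List<String>> failedRecordsWriter,
                   boolean ignoreSendingException) {
        this.sender = sender;
        this.maxRetries = maxRetries;
        this.failedRecordsWriter = failedRecordsWriter;
        this.ignoreSendingException = ignoreSendingException;
    }

    // Appends failed records, one per line, to a file (e.g. under failed-records-path).
    static Consumer<List<String>> appendToFile(Path file) {
        return records -> {
            try {
                Files.write(file, records, StandardCharsets.UTF_8,
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    // One initial attempt plus up to maxRetries retries; returns true on success.
    // On final failure the batch always goes to failedRecordsWriter; the error is
    // then swallowed (ignore == true) or rethrown (ignore == false).
    boolean sendBatch(List<String> batch) {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                sender.send(batch);
                return true;
            } catch (Exception e) {
                last = e;
            }
        }
        failedRecordsWriter.accept(batch);
        if (!ignoreSendingException) {
            throw new IllegalStateException("ClickHouse sending failed after retries", last);
        }
        return false;
    }
}
```

In both modes the failed batch is persisted first; the flag only decides whether the caller also sees the exception, which matches the description above.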

The sink part (set separately for each sink in the chain):

clickhouse.sink.target-table - target table in ClickHouse,

clickhouse.sink.max-buffer-size - buffer size (maximum number of records collected before a batch is sent).

In code

The main thing: the clickhouse-sink works with events as strings in ClickHouse insert format (similar to CSV). You have to convert each event into a row of the kind you would write in an ordinary INSERT statement.

For example, suppose you have this event POJO:

class A {
   public final String str;
   public final int integer;
   
   public A(String str, int i){
       this.str = str;
       this.integer = i;
   }
}

Convert it like this:

public static String convertToCsv(A a) {
    StringBuilder builder = new StringBuilder();
    builder.append("(");

    // add a.str as a quoted string literal; backslashes and single quotes
    // are escaped so that a value such as O'Brien does not break the INSERT
    builder.append("'");
    builder.append(a.str.replace("\\", "\\\\").replace("'", "\\'"));
    builder.append("', ");

    // add a.integer
    builder.append(a.integer);
    builder.append(")");
    return builder.toString();
}

Then pass the resulting string to the sink.
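The reason each string must be one complete, parenthesized row is that buffered rows are sent together in a single insert. The sketch below assumes the sink joins rows into one `INSERT INTO <table> VALUES row1, row2, ...` statement; that is an assumption about the library's internals, and `InsertQueryBuilder` is a hypothetical name used only here.

```java
import java.util.List;

// Hypothetical sketch: how a batch of converted rows could be combined into one
// INSERT statement. The exact query the library builds is an assumption.
class InsertQueryBuilder {
    static String build(String targetTable, List<String> rows) {
        // Each row is already a VALUES tuple such as "('abc', 1)".
        return "INSERT INTO " + targetTable + " VALUES " + String.join(", ", rows);
    }
}
```

For example, the rows `('a', 1)` and `('b', 2)` for table `db.t` become `INSERT INTO db.t VALUES ('a', 1), ('b', 2)`.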

You have to set the global parameters on the Flink execution environment:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
Map<String, String> globalParameters = new HashMap<>();

// ClickHouse cluster properties
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, ...);

// sink common
globalParameters.put(ClickHouseSinkConsts.TIMEOUT_SEC, ...);
globalParameters.put(ClickHouseSinkConsts.FAILED_RECORDS_PATH, ...);
globalParameters.put(ClickHouseSinkConsts.NUM_WRITERS, ...);
globalParameters.put(ClickHouseSinkConsts.NUM_RETRIES, ...);
globalParameters.put(ClickHouseSinkConsts.QUEUE_MAX_CAPACITY, ...);
globalParameters.put(ClickHouseSinkConsts.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, ...);

// set global parameters
ParameterTool parameters = ParameterTool.fromMap(globalParameters);
environment.getConfig().setGlobalJobParameters(parameters);
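Because the common settings are plain strings, a typo or a missing required flag only surfaces once the job runs. A hypothetical pre-flight check, not part of the library, can catch this before the map is passed to `ParameterTool.fromMap`. It uses the literal property names listed in this README; whether the `ClickHouseSinkConsts` constants hold exactly these strings, and the minimum values chosen below, are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check, not part of the library. The keys are the
// property names listed in this README for the common part of the configuration;
// the minimum values are illustrative assumptions.
class SinkParamsCheck {
    static List<String> problems(Map<String, String> params) {
        List<String> errors = new ArrayList<>();
        requireInt(params, "clickhouse.sink.num-writers", 1, errors);
        requireInt(params, "clickhouse.sink.queue-max-capacity", 1, errors);
        requireInt(params, "clickhouse.sink.timeout-sec", 1, errors);
        requireInt(params, "clickhouse.sink.retries", 0, errors);

        String path = params.get("clickhouse.sink.failed-records-path");
        if (path == null || path.trim().isEmpty()) {
            errors.add("clickhouse.sink.failed-records-path is required");
        }

        // The README describes this flag as a required boolean.
        String ignore = params.get("clickhouse.sink.ignoring-clickhouse-sending-exception-enabled");
        if (!"true".equals(ignore) && !"false".equals(ignore)) {
            errors.add("clickhouse.sink.ignoring-clickhouse-sending-exception-enabled must be true or false");
        }
        return errors;
    }

    // Records an error unless the value parses as an integer >= min.
    private static void requireInt(Map<String, String> params, String key, int min, List<String> errors) {
        try {
            if (Integer.parseInt(params.getOrDefault(key, "").trim()) < min) {
                errors.add(key + " must be >= " + min);
            }
        } catch (NumberFormatException e) {
            errors.add(key + " must be an integer >= " + min);
        }
    }
}
```

An empty result means the map looks usable; otherwise the job can fail fast with the listed problems instead of at the first flush.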

Then add your sink like this:

// create a converter; the method is static so it can be used as a method reference
public class YourEventConverter {
    public static String toClickHouseInsertFormat(YourEvent yourEvent) {
        String chFormat = ...;
        ....
        return chFormat;
    }
}

// create props for sink
Properties props = new Properties();
props.put(ClickHouseSinkConsts.TARGET_TABLE_NAME, "your_table");
props.put(ClickHouseSinkConsts.MAX_BUFFER_SIZE, "10000");

// build chain
DataStream<YourEvent> dataStream = ...;
dataStream.map(YourEventConverter::toClickHouseInsertFormat)
          .name("convert YourEvent to ClickHouse table format")
          .addSink(new ClickHouseSink(props))
          .name("your_table ClickHouse sink");
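As an illustration of what a filled-in converter might look like, here is a sketch for a hypothetical `PageView` event with a string, a number, a date and a nullable field. The event type, the converter name and the target column types (for instance `Nullable(Int32)` for the duration) are assumptions. The converter escapes string literals, writes dates as quoted `YYYY-MM-DD` values and emits `NULL` for missing values. Its method is static so that `PageViewConverter::toClickHouseInsertFormat` can be passed to `map`, as in the chain above.

```java
import java.time.LocalDate;

// Hypothetical event type used only for this illustration.
class PageView {
    final String url;
    final long userId;
    final LocalDate date;
    final Integer durationMs; // may be null; assumes a Nullable(Int32) column

    PageView(String url, long userId, LocalDate date, Integer durationMs) {
        this.url = url;
        this.userId = userId;
        this.date = date;
        this.durationMs = durationMs;
    }
}

// Hypothetical converter producing one VALUES row: ('url', userId, 'date', duration or NULL)
class PageViewConverter {
    // Quotes a ClickHouse string literal, escaping backslashes and single quotes.
    static String quote(String s) {
        return "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }

    static String toClickHouseInsertFormat(PageView e) {
        return "(" + quote(e.url) + ", "
                + e.userId + ", "
                + quote(e.date.toString()) + ", " // LocalDate.toString() gives YYYY-MM-DD
                + (e.durationMs == null ? "NULL" : e.durationMs.toString())
                + ")";
    }
}
```

For example, a view of `/a'b` by user 7 on 2020-01-31 with no duration becomes `('/a\'b', 7, '2020-01-31', NULL)`.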

Roadmap

  • reading files from "failed-records-path"
  • migrate to Gradle

Contributors

mchernyakov, ashulenko, aleksanchezz, dependabot[bot], eksd