Git Product home page Git Product logo

yc-data-streaming-flink-with-yds's Introduction

Использование Apache Flink c топиками YDB

В этом решении показана возможность использования сервисов Yandex Managed Service for YDB и Yandex Data Streams, как замены Apache Kafka при работе с Flink SQL stream processing. В качестве задачи взят простейший пример фильтрации платежей с суммой более 600000.

Для этого будут созданы

  • Инфраструктура для примера: сеть, подсети, группы доступа и сервисный аккаунт.
  • Бессерверная СУБД YDB и два топика в ней.
  • VM, на которой будет развернут Apache Flink.

Рисунок 1

В решении используется Flink SQL, во многом совместимый со стандартом ANSI SQL. Это один из вариантов использования Apache Flink, в котором можно обойтись без создания процедур обработки данных, а только декларативно описать источник, приемник и правило обработки потока данных.

Подготовка инфраструктуры

Для подготовки инфраструктуры используется Terraform configuration. При необходимости ознакомиться с работой Terraform в Yandex Cloud можно здесь

Указать корректные настройки в terraform.tfvars для развертывания инфраструктуры с помощью Terraform

Получить IAM токен

export TF_VAR_yc_token=$(yc iam create-token)

Инициализировать провайдеров для работы с конфигурацией

cd tf
terraform init

Запустить процесс создания облачной инфраструктуры

terraform apply

Установить переменные среды для использования в kafkacat

export API_KEY=$(terraform output -raw api_key)  
export DATABASE_PATH=$(terraform output -raw database_path)

Здесь необходмо учесть, что после создания VM сloud-init agent ставит и конфигурирует Apache Flink. Это занимает некоторое время - до нескольких минут. К следующим шагам стоит переходить только после окончания процесса установки. Дождаться его можно, например, запустив следующий сценарий

until curl -s -f -o /dev/null "http://$(terraform output -raw instance_external_ip):8081"
do
  sleep 5
done
Если не удается дождаться его окончания, то имеет смысл проверить логи cloud-init в /var/log/cloud-init-output.log

Создание задания обработки данных

На этом этапе можно зайти браузером в WebUI. В дальнейшем таким образом можно контролировать работу SQL Streaming job

http://<instance_external_ip>:8081/#/overview

Подключиться к VM через ssh

ssh ubuntu@$(terraform output -raw instance_external_ip)

Запустить Flink SQL client

/opt/flink-1.19.0/bin/sql-client.sh

Создать Flink таблицы - здесь заменить плейсхолдеры <> на соответствующие значения, полученные из terraform output

create table PaymentTable (  
    id INT,  
    accdt INT,  
    acckt INT,  
    amount FLOAT,  
    ts TIMESTAMP(3) METADATA FROM 'timestamp'  
) with (  
    'connector' = 'kafka',  
    'topic' = 'bank/payment-in',  
    'value.format' = 'json',  
    'key.format' = 'raw',  
    'key.fields' = 'id',  
    'properties.group.id' = 'test-group',  
    'scan.startup.mode' = 'earliest-offset',  
    'properties.bootstrap.servers' = 'ydb-01.serverless.yandexcloud.net:9093',  
    'properties.acks' = 'all',  
    'properties.check.crcs' = 'false',  
    'properties.security.protocol' = 'SASL_SSL',  
    'properties.sasl.mechanism' = 'PLAIN',  
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="@<DATABASE_PATH>" password="<API_KEY>";'
);
create table FilteredPaymentTable (  
    id INT,  
    accdt INT,  
    acckt INT,  
    amount FLOAT,  
    ts TIMESTAMP(3) METADATA FROM 'timestamp'  
) with (  
    'connector' = 'kafka',  
    'topic' = 'bank/payment-out',  
    'value.format' = 'json',  
    'key.format' = 'json',  
    'key.fields' = 'id',  
    'properties.group.id' = 'test-group',  
    'scan.startup.mode' = 'earliest-offset',  
    'properties.bootstrap.servers' = 'ydb-01.serverless.yandexcloud.net:9093',  
    'properties.acks' = 'all',  
    'properties.security.protocol' = 'SASL_SSL',  
    'properties.sasl.mechanism' = 'PLAIN',  
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="@<DATABASE_PATH>" password="<API_KEY>";'
);

Запустить SQL job фильтрации

insert into FilteredPaymentTable select * from PaymentTable where amount > 600000;

Проверка работоспособности

Сымитировать поступление данных - добавить несколько записей в топик payment-in

echo '{"id":1,"accdt":1,"acckt":2,"amout":10.3}' | kcat -P \
    -b ydb-01.serverless.yandexcloud.net:9093 \
    -t bank/payment-in \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanism=PLAIN \
    -X sasl.username="@$DATABASE_PATH" \
    -X sasl.password="$API_KEY"
echo '{"id":3,"accdt":10,"acckt":11,"amount":600001.3}' | kcat -P \
    -b ydb-01.serverless.yandexcloud.net:9093 \
    -t bank/payment-in \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanism=PLAIN \
    -X sasl.username="@$DATABASE_PATH" \
    -X sasl.password="$API_KEY"

Проверить корректность работы решения. Для этого убедиться, что в исходящем топике payment-out появляются только записи с amount > 600000

kafkacat -C \
    -b ydb-01.serverless.yandexcloud.net:9093 \
    -t bank/payment-out \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanism=PLAIN \
    -X sasl.username="@$DATABASE_PATH" \
    -X sasl.password="$API_KEY"  -Z

Примерный вид созданного задания в Web UI Рисунок 2

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.