practo / tipoca-stream

Near real-time, cloud-native data pipeline in AWS (CDC + Sink). Hosts the code for RedshiftSink: an RDS to Redshift pipeline with masking and reloading support.

Home Page: https://towardsdatascience.com/open-sourcing-tipoca-stream-f261cdcc3a13

License: Apache License 2.0

Dockerfile 0.67% Makefile 2.36% Shell 4.31% Go 92.65%
cdc kafka redshift realtime data

tipoca-stream's Introduction

tipoca-stream



A near-real-time, cloud-native data pipeline using Kafka, KafkaConnect, and RedshiftSink in AWS. RedshiftSink is a high-performance, low-overhead data loader for Redshift, open-sourced by Practo. It comes with rich data masking support so you can provide universal data access across your organization while preserving your customers' privacy!

Release blog.

Tipoca Stream is the successor to an internal, non-realtime data warehousing project called Tipoca, which itself derives its name from Tipoca City, home of the clones in the Star Wars universe.

Install

The pipeline is a combination of services deployed independently. This repo holds the code for RedshiftSink only.

  • RedshiftSink Please follow REDSHIFTSINK.md to install the RedshiftSink Kubernetes Operator. Creating the RedshiftSink resource installs Batcher and Loader pods in the cluster. These pods sink the data from Kafka topics to Redshift and take care of database migrations when required. RedshiftSink has rich masking support and also supports table reloads in Redshift when masking configurations are modified in GitHub.
      kubectl get redshiftsink
  • Kafka Install Kafka using Strimzi CRDs, or use a self-hosted or managed Kafka.
      kubectl get kafka
  • Producer Install the Producer using Strimzi CRDs and Debezium. Creating the KafkaConnect and KafkaConnector resources creates a KafkaConnect pod in the cluster which starts streaming data from the source (MySQL, RDS, etc.) to Kafka.
      kubectl get kafkaconnect
      kubectl get kafkaconnector

The project has pluggable libraries which can be composed to solve other data pipeline use cases as well.

Contribute

Please follow this guide to contribute a change.

Thanks

tipoca-stream's People

Contributors

alok87, mayankdharwa


tipoca-stream's Issues

Kafka commit problem due to session closure

The loader consumer group can take a long time to load, depending on the batch size and on how loaded the Redshift cluster is.

What we have seen: when the cluster is resource starved, 100+ topics are being loaded into Redshift concurrently, and the loader is operating on a big batch (lakhs, i.e. hundreds of thousands, of rows in one load), the commit to Kafka does not happen and the batches keep getting reprocessed.

What does not work

  1. Big batch (msgBuf)
  2. Auto commit = false

What works

  1. Smaller batch (msgBuf)
  2. Auto commit = true (from false, default of loader)

The cause is still not known

    if len(msgBuf) > 0 {
        // Mark the offset of the message after the last one in the batch.
        lastMessage := msgBuf[len(msgBuf)-1]
        b.session.MarkOffset(
            lastMessage.Topic,
            lastMessage.Partition,
            lastMessage.Offset+1,
            "",
        )

        // With auto-commit disabled, flush the marked offset synchronously.
        if !b.autoCommit {
            b.session.Commit()
            klog.V(2).Infof("%s, Committed (autoCommit=false)", lastMessage.Topic)
        }

        b.lastCommittedOffset = lastMessage.Offset

    } else {
        klog.Warningf("%s, markOffset not possible for empty batch", b.topic)
    }

b.session.Commit() does not return an error even though it is a synchronous call. So we are not really sure at present why this happens: both marking the offset and the commit call happen, but the committed offset does not advance.

We are upgrading the redshift cluster as a short term fix.

Long term: discuss and debug to find the root cause.
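
A minimal sketch of the consumer settings that match the workaround above, assuming the Sarama client (which the session/MarkOffset API in the snippet suggests); the timeout values here are assumptions, not the loader's actual configuration:

package loader

import (
    "time"

    "github.com/Shopify/sarama" // the repo now lives at github.com/IBM/sarama
)

// loaderConsumerConfig sketches consumer settings matching the workaround
// above: auto-commit enabled, starting from the oldest offset as the loader
// does today, with generous processing windows for long Redshift loads.
func loaderConsumerConfig() *sarama.Config {
    c := sarama.NewConfig()

    // Flush marked offsets periodically in the background instead of
    // depending on a single synchronous Commit() at the end of a batch.
    c.Consumer.Offsets.AutoCommit.Enable = true
    c.Consumer.Offsets.AutoCommit.Interval = 5 * time.Second

    // Loader default: start from the oldest offset when nothing was committed.
    c.Consumer.Offsets.Initial = sarama.OffsetOldest

    // Keep the processing window generous so a long Redshift load does not
    // trigger a rebalance that throws the pending commit away (values are
    // assumptions).
    c.Consumer.Group.Session.Timeout = 60 * time.Second
    c.Consumer.MaxProcessingTime = 5 * time.Minute

    return c
}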

Graceful shutdown bug for the batcher

SIGTERM is not leading to pod shutdown.


I0218 06:30:54.409376       1 main.go:102] SIGTERM signal received
I0218 06:30:54.409400       1 main.go:120] Cancelling context to trigger graceful shutdown...
I0218 06:30:54.409418       1 main.go:126] Waiting the some routines to gracefully shutdown (some don't)
I0218 06:30:54.663943       1 consumer.go:118] ConsumeClaim ended for topic: XX partition: 0
W0218 06:30:54.669826       1 schemaregistry.go:153] Retrying. Error getting latest schema, topic:X not found.
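
For reference while debugging, a minimal sketch of the intended shutdown flow, assuming the batcher cancels a shared context on SIGTERM and then waits on a WaitGroup; the 30s deadline and the structure are assumptions, not the batcher's actual code. The deadline matters because, as the log above shows, the schema registry retry loop keeps running after the context is cancelled.

package main

import (
    "context"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup

    // Stand-in for the consumer/claim routines: each exits when the shared
    // context is cancelled.
    wg.Add(1)
    go func() {
        defer wg.Done()
        <-ctx.Done()
    }()

    // Block until SIGTERM (what Kubernetes sends on pod deletion).
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGTERM, syscall.SIGINT)
    <-sigterm

    // Cancel the context so ConsumeClaim loops return...
    cancel()

    // ...then wait with a deadline, so a routine that ignores the context
    // (e.g. a schema registry retry loop) cannot block pod shutdown forever.
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
    case <-time.After(30 * time.Second):
        // Give up waiting; Kubernetes will SIGKILL after the grace period anyway.
    }
    os.Exit(0)
}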

Kafkaconnect does not auto recover on failures

Kubernetes is designed to restart things when they fail so that they can recover. We still need to look at why something failed, but a failure that just stays failed is more of a problem, and a k8s restart does help.

Strimzi's KafkaConnect does not implement any probes, here is a discussion. strimzi/strimzi-kafka-operator#3850

We should do something so that a k8s restart takes effect if it cannot be done from Strimzi. It is very much required.

SQL "LONGTEXT" transformation to redshift datatype "VARCHAR(MAX)" is not happening

mysql schema

 `Info` longtext,

debezium schema for the same thing

{
     "name": "Info",
      "type": [
          "null",
          "string"
       ],
      "default": null
},

When this data is loaded into Redshift it fails, as the column needs the large data type VARCHAR(MAX) but is getting VARCHAR(255), since Debezium is not transforming longtext into a type that signals a long value.

Stackoverflow: https://stackoverflow.com/questions/63880976/debezium-mysql-longtext-to-debezium-data-type-conversions-is-not-correct
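
The gap is that the Avro schema only says "string", so a sink that maps Avro types one-to-one has no way to pick a larger column. A hypothetical sketch of the mapping involved; the function, the default length, and the use of Debezium's column.propagate.source.type are assumptions, not the redshiftsink code:

package main

import "fmt"

// redshiftType sketches mapping a Debezium/Avro field to a Redshift column
// type. A MySQL LONGTEXT arrives as a plain Avro "string" with no length
// hint, so it falls into the short-VARCHAR default unless the source column
// type is propagated alongside the schema (e.g. via Debezium's
// column.propagate.source.type) and handled explicitly.
func redshiftType(avroType, sourceType string) string {
    switch {
    case sourceType == "LONGTEXT" || sourceType == "MEDIUMTEXT":
        return "VARCHAR(MAX)" // 65535 bytes, Redshift's maximum
    case avroType == "long":
        return "BIGINT"
    case avroType == "string":
        return "VARCHAR(255)" // the default that breaks for long values
    default:
        return "VARCHAR(MAX)"
    }
}

func main() {
    fmt.Println(redshiftType("string", "LONGTEXT")) // VARCHAR(MAX)
    fmt.Println(redshiftType("string", ""))         // VARCHAR(255)
}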

Error and retry instead of Fatal, major requirement for loader

Every topic runs in its own goroutine, so a Fatal in one routine impacts all the other goroutines working on the data.
This is very bad behaviour for the loader: loads take a lot of Redshift resources, and an issue in one table's load impacts all the tables because the Fatal shuts everything down. We should error, not Fatal.

Fatal was for the development and stability phase and is no longer required. We should migrate to errors and retries, particularly for the loader.
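
A minimal sketch of that direction, replacing a Fatal with a returned error plus bounded retry; the helper and backoff policy are assumptions:

package loader

import (
    "fmt"
    "time"
)

// withRetry sketches replacing klog.Fatalf with error + bounded retry so a
// failing table load only affects its own goroutine instead of shutting the
// whole loader down.
func withRetry(attempts int, backoff time.Duration, fn func() error) error {
    var err error
    for i := 0; i < attempts; i++ {
        if err = fn(); err == nil {
            return nil
        }
        // Previously a Fatal here killed every topic's routine; instead wait,
        // retry, and finally return the error to the caller.
        time.Sleep(backoff)
        backoff *= 2
    }
    return fmt.Errorf("load failed after %d attempts: %w", attempts, err)
}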

Operators wrapping and managing the complete solution.

Build Kubernetes CRDs and wrap the complete solution in them.

kubectl get datapipeline
kubectl get redshiftsink

https://twitter.com/alsingh87/status/1301892718000394241?s=20

Current Requirements

  • Resource Management: Manage the batcher and loader Kubernetes Deployments with all their configuration. The CRD should wrap the batcher and loader Deployments and manage the sink by treating both as one sink group (see the sketch after this list).
  • Controller vs Resource: Separate out the controller part of the configuration for a clean CRD resource specification; for example, the Redshift conf moves out of the Loader conf into the controller.
  • [Mask Reloader] Parallel sink for mask configuration updates: Create two sink groups, master and reloader. Master always runs the current sink with the current mask configuration, while the reloader sink group sinks in parallel with a table suffix to update the mask configuration in the tables.
  • [Mask Reloader] Switch Sink Group: When the reloader's consumed offsets come close to the master's offsets, a switch should happen.
  • Scaling batcher and loader: Need to scale the batcher's memory based on the number of entries being processed. When a lot of topics are present in the batcher and all perform at high RPM, the batcher runs out of memory. Need to solve this.
  • GRANTs: Handle Redshift grants.
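
A hypothetical sketch of what the wrapped spec could look like as kubebuilder-style Go types; the field names are illustrative assumptions, not the actual RedshiftSink API:

package v1

import corev1 "k8s.io/api/core/v1"

// RedshiftSinkSpec is an illustrative sketch of a spec that treats the
// batcher and loader Deployments as one sink group managed by the operator.
type RedshiftSinkSpec struct {
    // Kafka topic regexes this sink consumes.
    KafkaTopicRegexes string `json:"kafkaTopicRegexes"`

    // Batcher and Loader deployments managed together as one sink group.
    Batcher DeploymentSpec `json:"batcher"`
    Loader  DeploymentSpec `json:"loader"`
}

// DeploymentSpec holds the per-component deployment configuration.
type DeploymentSpec struct {
    Suspend   bool                         `json:"suspend,omitempty"`
    Replicas  *int32                       `json:"replicas,omitempty"`
    Resources *corev1.ResourceRequirements `json:"resources,omitempty"`
}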

Skip merge when possible

Merge needs to be performed only when the batch has a combination of update and delete operations. Otherwise, skip the merge and COPY directly into the target table.

This is a performance improvement: first loads are slow because of this unnecessary merge.
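
A minimal sketch of the decision (the helper name and operation labels are assumptions):

package main

import "fmt"

// needsMerge sketches the check described above: the staging-table merge is
// only required when the batch contains update or delete operations;
// insert-only batches (e.g. a first load) can be COPYed straight into the
// target table.
func needsMerge(operations []string) bool {
    for _, op := range operations {
        if op == "UPDATE" || op == "DELETE" {
            return true
        }
    }
    return false
}

func main() {
    fmt.Println(needsMerge([]string{"CREATE", "CREATE"})) // false: COPY directly
    fmt.Println(needsMerge([]string{"CREATE", "UPDATE"})) // true: merge needed
}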

Geometry source conversion handling

F0119 06:58:03.527475       1 batch_processor.go:326] Transforming schema:8 => inputTable failed: DebeziumType: record, SourceType: geometry, not handled

Need to handle conversion for geometry in mysql.

ALTER COLUMN is not supported in redshift for all cases

Stackoverflow 1

StackOverflow 2

Docs https://docs.aws.amazon.com/redshift/latest/dg/r_ALTER_TABLE_examples_basic.html

Find a way around this.

Works:

type changes

ALTER TABLE "datapipe.inventory"."customers" ALTER COLUMN first_name TYPE VARCHAR(300);

Does not work:

not null not supported

ALTER TABLE "datapipe.inventory"."customers" ALTER COLUMN first_name SET NOT NULL;
ERROR:  ALTER COLUMN SET NOT NULL is not supported

set and drop are not supported

ALTER TABLE "datapipe.inventory"."customers" ALTER COLUMN first_name SET DEFAULT "alok";
ERROR:  ALTER COLUMN SET/DROP DEFAULT is not supported

First schema creation unnecessarily retries

Do not retry on the first attempt: at present things only proceed after 10 retry failures, and the retry is not needed in this case since the subject simply does not exist yet.

W0120 12:59:39.182145       1 schemaregistry.go:153] Retrying. Error getting latest schema, topic:loader-06e960-ts.inventory.customers, err:404 Not Found: Subject 'loader-06e960-ts.inventory.customers-value' not found.
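
A minimal sketch of the suggested behaviour, assuming the registry client surfaces the 404 in its error; the names are assumptions:

package schemaregistry

import (
    "strings"
    "time"
)

// latestSchema sketches the suggested behaviour: a 404 "subject not found"
// means the schema simply has not been created yet, so return immediately
// instead of burning 10 retries; retry only on transient errors.
func latestSchema(fetch func() (string, error)) (schema string, found bool, err error) {
    for attempt := 0; attempt < 10; attempt++ {
        schema, err = fetch()
        if err == nil {
            return schema, true, nil
        }
        if strings.Contains(err.Error(), "404") {
            return "", false, nil // subject not created yet: no retry needed
        }
        time.Sleep(2 * time.Second)
    }
    return "", false, err
}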

allowShuffle improvements

Devise a better algorithm to prevent repeated shuffling of sink groups without impacting the rollout time. Rollout time gets impacted because releases happen only at a 30-minute interval (the default).

Consumer group offsets got reset to -2 (oldest) on their own.

  • OffsetOldest (-2) stands for the oldest offset available on the broker for a partition.

  • Initial: the initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest; defaults to OffsetNewest. The loader has it configured as OffsetOldest.

The initial offset for many consumer group claims started showing as -2 (OffsetOldest), i.e. no data found, nothing was ever committed for these topics. But the last committed offset for the topic does show numbers > 0, just not for the same consumer group and topic.

A drop in the topic count was seen after a certain date. This looks related to a Kafka data loss issue, but investigation is required.
It could be related to rebalancing as described in IBM/sarama#1181.

(Screenshot from 2020-11-23 attached in the original issue.)

ALTER COLUMN in a transaction block is a problem

Error running schema migration, err: cmd failed, cmd:
ALTER TABLE "oneness_stream"."kill_log_prod_oneness_acl" ALTER COLUMN kill_id TYPE VARCHAR(MAX)
err: pq: ALTER TABLE ALTER COLUMN cannot run inside a transaction block
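
The likely fix direction is to run such statements on the plain connection (implicit autocommit) rather than inside the migration's explicit transaction. A minimal sketch assuming database/sql with the pq driver (as the "pq:" prefix in the error suggests); the wrapper is an assumption:

package migrate

import (
    "database/sql"
    "fmt"

    _ "github.com/lib/pq" // the "pq:" error prefix suggests this driver
)

// alterColumnType sketches running ALTER COLUMN ... TYPE on the bare
// connection (implicit auto-commit) instead of inside the migration's
// BEGIN/COMMIT block, which Redshift rejects for this statement.
func alterColumnType(db *sql.DB, schema, table, column, newType string) error {
    stmt := fmt.Sprintf(
        `ALTER TABLE %q.%q ALTER COLUMN %s TYPE %s`,
        schema, table, column, newType,
    )
    _, err := db.Exec(stmt) // note: no db.Begin() around this statement
    return err
}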

Invalid timestamp bug

The guess is that one of the Debezium timestamp, date, or datetime types is still not handled as expected.

raw_line        | "XX","XX","XX","XX","2020-06-13 06:49:48","XX","","","2020-06-13 06:49:49","2020-06-13 06:50:05"
err_code        | 1206
err_reason      | Invalid timestamp format or value [YYYY-MM-DD HH24:MI:SS]
colname         | received_at

Support to run Grants

Support to run Grants on newly created tables for groups.

This helps the users of the database: with table replacements the grants get lost, and grants should also be applied the first time a table is created.

Fix table column order mismatch migration failures

F1223 05:47:10.258633       1 load_processor.go:561] Schema migration failed, err: 1 error occurred:
	* mismatch col: settlement_plan, prop: Name, input: settlement_plan, target: reach_plan_enabled

When a column mismatch happens, the migration fails instead of applying a table reload. Big migrations like these should be handled by the operator. #37

Error in loads due to staging table not found / offset not committing

Intermittently seeing this:

I0224 12:11:26.424636       1 redshift.go:805] Running: UNLOAD from table_staged to s3
F0224 12:11:27.279014       1 load_processor.go:418] Error dropping staging table: cmd failed, cmd:DROP TABLE "schema"."table_adx_reload
_staged";, err: pq: Table "table_staged" does not exist

Deduplication not happening when duplicate kafkaoffset

When two rows are present for the same Kafka offset then deduplication does not happen.

practowarehouse=# select * from "inventory"."duplicate_staged";
-[ RECORD 1 ]----------------------
kafkaoffset | 1
operation   | CREATE
id          | [email protected]
-[ RECORD 2 ]----------------------
kafkaoffset | 2
operation   | CREATE
id          | [email protected]
-[ RECORD 3 ]----------------------
kafkaoffset | 1
operation   | CREATE
id          | [email protected]
delete from "inventory"."duplicate_staged" where kafkaoffset in (
select t1.kafkaoffset from "inventory"."duplicate_staged" t1 join "inventory"."duplicate_staged" t2 on t1.id=t2.id where t1.kafkaoffset < t2.kafkaoffset);

The above query won't deduplicate the data, since the duplication exists for the same Kafka offset.

Too many schema registry calls. Add cache

Failure due to too many parallel schema registry calls.

W0220 11:09:28.775278       1 schemaregistry.go:127] Retrying. Error fetching schema by id: 15 err:Get "https://schemareg.example.com/schemas/ids/15": context deadline exceeded (Client.Timeout exceeded while awaiting headers

Add cache.

The client already has a cache. Please check that first.
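
If a cache does need to be added (or the existing one verified), a minimal sketch of an in-memory schema-by-ID cache guarded by an RWMutex; the client interface is an assumption:

package schemaregistry

import "sync"

// schemaCache sketches an in-memory cache keyed by schema ID so that
// hundreds of parallel topic routines do not each hit the registry for the
// same schema (the cause of the context-deadline errors above).
type schemaCache struct {
    mu        sync.RWMutex
    byID      map[int]string
    fetchByID func(id int) (string, error) // the underlying registry call
}

// SchemaByID returns the cached schema when present and only falls back to
// the registry on a miss.
func (c *schemaCache) SchemaByID(id int) (string, error) {
    c.mu.RLock()
    schema, ok := c.byID[id]
    c.mu.RUnlock()
    if ok {
        return schema, nil
    }

    schema, err := c.fetchByID(id)
    if err != nil {
        return "", err
    }

    c.mu.Lock()
    c.byID[id] = schema
    c.mu.Unlock()
    return schema, nil
}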

Error sometimes in getting the mask info from the git repo

E0216 10:38:37.854692       1 controller.go:267] controller-runtime/manager/controller/redshiftsink
 "msg"="Reconciler error" "error"="Failed to reconcile: Error doing mask diff, err: Error copying! src: /tmp/maskdir041755531/rdsconfig/inventory.yml, dest: /maskdiff/inventory.yml, err:stat /tmp/maskdir041755531/rdsconfig/inventory.yml: no such file or directory\n"
 "name"="inventory" "namespace"="ts-redshiftsink-latest" 
"reconciler group"="tipoca.k8s.practo.dev" "reconciler kind"="RedshiftSink"

Batcher is masking all data and not reading config sometimes

This only happens when 100s of topics are operated by the batcher at once. It masks all the data.

It is most likely due to the mask config read not happening as expected, and the error also not crashing the batcher.

Redshift stl_load_errors shows the following, since all fields are masked:
err_reason | Invalid timestamp format or value [YYYY-MM-DD HH24:MI:SS]

Batcher first time memory problem

During the first-time sink of a database's topics, the batcher gets 25k+ requests per second. The batcher keeps messages in memory before flushing them to the batch processor.

This leads to a very high memory requirement for the batcher during the first-time sink.

Need to work on this problem and make the batcher work at very high speed while keeping the memory footprint low.
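
One possible direction, sketched under assumptions about the batcher's internals: flush the in-memory buffer on either a message-count or a byte-size threshold, so a high-rate first sink cannot grow the buffer unbounded.

package batcher

// Thresholds are illustrative; flushing on either message count or byte
// size keeps the in-memory buffer bounded even at 25k+ messages/second.
const (
    maxBatchMessages = 1000
    maxBatchBytes    = 10 * 1024 * 1024 // 10 MiB
)

type batch struct {
    messages [][]byte
    bytes    int
}

// add appends a message and flushes the batch as soon as either threshold
// is crossed, instead of waiting for a count-only limit to be hit.
func (b *batch) add(msg []byte, flush func([][]byte)) {
    b.messages = append(b.messages, msg)
    b.bytes += len(msg)
    if len(b.messages) >= maxBatchMessages || b.bytes >= maxBatchBytes {
        flush(b.messages)
        b.messages = nil
        b.bytes = 0
    }
}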

Broker not connected

Getting "broker not connected" at the time of a release:

I0218 16:09:13.335796       1 sink_group_controller.go:615] ts.XX.X: waiting to reach realtime
E0218 16:09:14.030355       1 controller.go:267] controller-runtime/manager/controller/redshiftsink "msg"="Reconciler error" "error"="Failed to reconcile: kafka: broker not connected" "name"="XX" "namespace"="ts-redshiftsink-latest" "reconciler group"="tipoca.k8s.practo.dev" "reconciler kind"="RedshiftSink"

Schema migration failed due to syntax

I1123 07:58:54.498683       1 redshift.go:466] Preparing: ALTER TABLE "splits" ALTER SORTKEY();
F1123 07:58:54.501599       1 load_processor.go:561] Schema migration failed, err: pq: syntax error at or near ")"
goroutine 181 [running]:
github.com/practo/klog/v2.stacks(0xc00000e001, 0xc000c460c0, 0x73, 0xb5)
        /src/vendor/github.com/practo/klog/v2/klog.go:1008 +0xb9
github.com/practo/klog/v2.(*loggingT).output(0x14526e0, 0xc000000003, 0x0, 0x0, 0xc000d8c150, 0x13eeb93, 0x11, 0x231, 0x0)
        /src/vendor/github.com/practo/klog/v2/klog.go:957 +0x191
github.com/practo/klog/v2.(*loggingT).printf(0x14526e0, 0xc000000003, 0x0, 0x0, 0xe4e808, 0x21, 0xc000d679d0, 0x1, 0x1)
        /src/vendor/github.com/practo/klog/v2/klog.go:743 +0x187
github.com/practo/klog/v2.Fatalf(...)

API CRD: error building openapi models

The API server reports errors when installing the redshiftsink CRD. It is due to the podTemplate part of the generated CRD.

E0115 14:09:54.584777       1 customresource_handler.go:652] error building openapi models for redshiftsinks.tipoca.k8s.practo.dev: ERROR $root.definitions.dev.practo.k8s.tipoca.v1.RedshiftSink.properties.spec.properties.batcher.properties.podTemplate.properties.resources.properties.limits.additionalProperties.schema has invalid property: anyOf
