
kafka_fdw's Introduction

Kafka Foreign Data Wrapper for PostgreSQL


At this point the project is not yet production ready. Use with care. Pull requests are welcome.

A simple foreign data wrapper for Kafka that allows a Kafka topic to be treated as a table.

Currently kafka_fdw supports message parsing in CSV and JSON format. More formats might come in a future release.

Build

The FDW uses the librdkafka C client library (https://github.com/edenhill/librdkafka). To build against an installed librdkafka and PostgreSQL, run

make && make install

To run the tests:

make installcheck

Note that this runs an integration test against a Kafka broker assumed to be running on localhost:9092 with ZooKeeper on localhost:2181; see test/init_kafka.sh.

Usage
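
Before defining a server, the extension itself has to be created in the target database (this assumes make install has been run against that PostgreSQL installation):

CREATE EXTENSION kafka_fdw;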

CREATE SERVER must specify a broker list using the brokers option:

CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (brokers 'localhost:9092');

CREATE USER MAPPING

CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;

CREATE FOREIGN TABLE must specify the two meta columns for partition and offset. They can be named arbitrarily, but must be flagged as the partition and offset columns via column OPTIONS. Note that offset is a SQL reserved keyword, so a column actually named offset needs to be quoted when used. The remaining columns must match the expected CSV message format. For more usage options see test/expected.

CREATE FOREIGN TABLE kafka_test (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_int int,
    some_text text,
    some_date date,
    some_time timestamp
)
SERVER kafka_server OPTIONS
    (format 'csv', topic 'contrib_regress', batch_size '30', buffer_delay '100');

The offset and partition columns are special. Due to the way Kafka works, they should be specified as conditions in every query (see Querying below).

Notes on Supported Formats

CSV

CSV, like a PostgreSQL relation, represents data as a series of tuples. In this respect the mapping is fairly straightforward: position is used to map values to columns. What CSV lacks, however, is any sort of schema enforcement between rows, such as ensuring that all values of a particular column have the same data type, and the other schema checks we expect from a relational database. For this reason, it is important to ask how much one trusts the schema enforcement of the writers. If the schema enforcement is trusted, you can assume that bad data should throw an error. If it is not, the error handling options documented here should be used to enforce schema on read and to skip but flag malformed rows.

On one side, you can use strict 'true' if the format will never change and you fully trust the writer to properly enforce the schema. If you trust the writer to always be correct but want to allow new columns to be added onto the end, however, you should leave this setting off.

If you do not trust the writer and wish to enforce schema on read only, then add a column with the option junk 'true' and another with the option junk_error 'true' (see Error handling below).

JSON

JSON has many of the same schema validation issues as CSV, although there are tools and standards for validating JSON documents against schema specifications; such validation happens on the writer side, not in the FDW. The same error handling recommendations that apply to CSV above therefore apply here as well.

Mapping JSON fields to relation fields is somewhat less straightforward than it is with CSV. JSON objects represent key/value mappings in arbitrary order. For JSON we therefore map the tuple attribute name to the JSON object key name; the json column option is used to specify the JSON property a column maps to.

The example in our test script is:

CREATE FOREIGN TABLE kafka_test_json (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_int int OPTIONS (json 'int_val'),
    some_text text OPTIONS (json 'text_val'),
    some_date date OPTIONS (json 'date_val'),
    some_time timestamp OPTIONS (json 'time_val')
)
SERVER kafka_server OPTIONS
    (format 'json', topic 'contrib_regress_json', batch_size '30', buffer_delay '100');

Here you can see that a message on partition 2, with an offset of 53, containing the document:

{
   "text_val": "Some arbitrary text, apparently",
   "date_val": "2011-05-04",
   "int_val": 3,
   "time_val": "2011-04-14 22:22:22"
}

would be turned into

(2, 53, 3, "Some arbitrary text, apparently", 2011-05-04, "2011-04-14 22:22:22")

as a row in the above table.

Currently the Kafka FDW does not support messages that are JSON arrays, only JSON objects. JSON arrays nested inside objects can, however, be presented as text or JSON/JSONB fields.
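
For example, a nested array inside each message could be mapped to a jsonb column along these lines (an illustrative sketch; the array_val key and the some_array column are invented for this example):

CREATE FOREIGN TABLE kafka_test_json_array (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_text text OPTIONS (json 'text_val'),
    some_array jsonb OPTIONS (json 'array_val')   -- e.g. "array_val": [1, 2, 3] in the message
)
SERVER kafka_server OPTIONS
    (format 'json', topic 'contrib_regress_json', batch_size '30', buffer_delay '100');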

Querying

With the defined meta columns you can query like so:

SELECT * FROM kafka_test WHERE part = 0 AND offs > 1000 LIMIT 60;

Here offs is the offset column; without an offset condition the scan defaults to the beginning of the topic. If no partition is specified, all partitions will be scanned.

Querying across partitions can be done as well:

SELECT * FROM kafka_test WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300) OR (part = 3 AND offs > 700);

Error handling

The default for consuming Kafka data is not very strict: too few columns will be assumed to be NULL and too many will be ignored. If you don't like this behaviour you can enable strictness via the table option strict 'true'; any such mismatch will then error out the query. However, invalid or unparsable data, e.g. text in a numeric column or an invalid date, will still error out by default. To ignore such data you can pass ignore_junk 'true' as a table option and the affected columns will be set to NULL. Alternatively you can add table columns with the attributes junk 'true' and/or junk_error 'true'. While fetching data kafka_fdw will then put the whole payload into the junk column and/or the error message(s) into the junk_error column. See test/sql/junk_test.sql for a usage example.
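
For illustration, a foreign table using these options might look like the following (a sketch only; the junk_payload and junk_err column names are made up here, the actual regression example lives in test/sql/junk_test.sql):

CREATE FOREIGN TABLE kafka_test_junk (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    some_int int,
    some_text text,
    junk_payload text OPTIONS (junk 'true'),    -- receives the whole raw payload of a malformed row
    junk_err text OPTIONS (junk_error 'true')   -- receives the parse error message(s)
)
SERVER kafka_server OPTIONS
    (format 'csv', topic 'contrib_regress', batch_size '30', buffer_delay '100');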

Producing

Inserting data into Kafka works with INSERT statements. If you provide the partition as a value, that partition will be used; otherwise Kafka's built-in partitioner will select the partition.

Add the partition as a value:

INSERT INTO kafka_test(part, some_int, some_text)
VALUES
    (0, 5464565, 'some text goes into partition 0'),
    (1, 5464565, 'some text goes into partition 1'),
    (0, 5464565, 'some text goes into partition 0'),
    (3, 5464565, 'some text goes into partition 3'),
    (NULL, 5464565, 'some text goes into partition selected by kafka');

Use the built-in partitioner:

INSERT INTO kafka_test(some_int, some_text)
VALUES
    (5464565, 'some text goes into partition selected by kafka');

Testing

Testing is currently broken; I can't manage to get a properly repeatable topic setup.

Development

Although it works when used properly, we need far more error handling. Basically, more tests are needed for inappropriate usage such as no topic specified, topic doesn't exist, no partition or offset column defined, wrong format specification, and whatever else might come up.

Future

The idea is to make the FDW more flexible in usage:

  • support other formats such as protobuf or binary

  • specify encoding

  • optimize performance with check_selective_binary_conversion, i.e. when just a single column is projected, as in SELECT one_col FROM foreign_table WHERE ..., we won't need to take the effort to convert all columns

  • better cost and row estimates

  • some analyze options would be nice

  • parallelism: with multiple partitions we could theoretically consume them in parallel

kafka_fdw's People

Contributors

dmolik, einhverfr, marwahaha, rapimo, rekgrpth, za-arthur, zilder


kafka_fdw's Issues

Reading composite types from Kafka is broken

CREATE TYPE abc AS (a INT, b TEXT, c TIMESTAMP);

CREATE FOREIGN TABLE kafka_json (
	part int OPTIONS (partition 'true'),
	offs bigint OPTIONS (offset 'true'),
	x abc)
SERVER kafka_server
OPTIONS (format 'json', topic 'json', batch_size '30', buffer_delay '100');

INSERT INTO kafka_json (x) VALUES ((1, 'test', current_timestamp));

SELECT * FROM kafka_json;
ERROR:  malformed record literal: "{"a":1,"b":"test","c":"2018-07-04T11:16:55.671986"}"
DETAIL:  Missing left parenthesis.

Postgres expects an input string formatted like:

(1,"test","2018-07-04T11:16:55.671986")

for a composite type. The easy solution would be to just remove the keys from the input string and replace {} with (). But this won't work if the JSON document has a different key order or extra or missing keys. So it makes sense to write a simple JSON parser to collect individual key-value pairs, reorder them if needed, and fill the gaps with NULLs. It's also possible to use the built-in JSONB facilities from Postgres to parse and manage JSON documents, but I anticipate that it would be less efficient than a custom-designed solution.
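
As a possible interim workaround (a sketch only, assuming nested objects can be mapped to a json/jsonb column the same way nested arrays can, as noted in the README above), the composite value could be read as jsonb and unpacked with ordinary SQL:

CREATE FOREIGN TABLE kafka_json_jsonb (
    part int OPTIONS (partition 'true'),
    offs bigint OPTIONS (offset 'true'),
    x jsonb)
SERVER kafka_server
OPTIONS (format 'json', topic 'json', batch_size '30', buffer_delay '100');

-- cast the extracted keys back to the composite's member types
SELECT part, offs,
       (x->>'a')::int       AS a,
       x->>'b'              AS b,
       (x->>'c')::timestamp AS c
FROM kafka_json_jsonb;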

Cases not handled well

In my input handling branch I have been able to cause certain misbehaviours on bad inputs into Kafka.

So far there are no segmentation faults or similar errors, but these could still cause serious issues reading the Kafka data. The major difficulties have to do with handling bad representations of expected data types.

So, for example, if there is an integer field in a CSV-based foreign table and I write a record to Kafka with that field set to a value of "foo" or "100000000000", I will get an error such as "Integer out of range" or "Invalid representation of integer".

This means that we have an important case we have to address, which is what happens when the data is actually unexpected or bad in Kafka. Do we skip over those records? Do we ignore the problem and assume we fix it on the Kafka side? Do we do something else?

For test cases you can insert into the Kafka queue CSV examples like:

100000000000,100000000000,1000000000,1000000000
"Test", 123, 122.22
"Test, the game", 1222, 122,22
122,22
Foo,Bar,Baz,123
{"foo" 123, "Bar": 134}

Failed to stop consuming partition 706: Local: Unknown partition

I sometimes see the error
Failed to stop consuming partition 706: Local: Unknown partition
raised on Stop here
https://github.com/adjust/kafka_fdw/blob/master/src/kafka_fdw.c#L1026

However, there is no such partition and this one should never have been consumed.
My thought is that the error was introduced with the parallel support using shared memory, and happens if two concurrent scans mess things up.

No connection timeout

If there is no connection to the Kafka broker, an attempt to establish such a connection hangs indefinitely and doesn't check for interrupts.

Unfortunately I don't have a full backtrace, but here's the librdkafka part of it:

#0  0x00007f127898d96a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f1279e61e59 in cnd_timedwait (cond=cond@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, ts=ts@entry=0x7ffe3d7ca300) at tinycthread.c:462
#2  0x00007f1279e62273 in cnd_timedwait_abs (cnd=cnd@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, tspec=tspec@entry=0x7ffe3d7ca300) at tinycthread_extra.c:100
#3  0x00007f1279e2bdff in rd_kafka_q_serve (rkq=rkq@entry=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, 
    callback=callback@entry=0x7f1279df67f0 <rd_kafka_consume_cb>, opaque=opaque@entry=0x7ffe3d7ca410) at rdkafka_queue.h:475
#4  0x00007f1279df6f51 in rd_kafka_consume_callback0 (rkq=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=<optimized out>, consume_cb=<optimized out>, opaque=<optimized out>)
    at rdkafka.c:2617

Compilation failure with postgresql-13

rm -f kafka_fdw.so   libkafka_fdw.a  libkafka_fdw.pc
rm -f src/connection.o src/kafka_fdw.o src/kafka_expr.o src/option.o src/parser.o src/planning.o src/utils.o src/connection.bc src/kafka_fdw.bc src/kafka_expr.bc src/option.bc src/parser.bc src/planning.bc src/utils.bc
rm -rf kafka_fdw--0.0.2.sql
rm -rf results/ regression.diffs regression.out tmp_check/ tmp_check_iso/ log/ output_iso/
x86_64-pc-linux-gnu-gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -pipe -ggdb -march=haswell -fPIC -std=c99 -Wall -Wextra -Wno-unused-parameter -I. -I./ -I/usr/include/postgresql-13/server -I/usr/include/postgresql-13/internal  -D_GNU_SOURCE   -c -o src/connection.o src/connection.c
x86_64-pc-linux-gnu-gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -pipe -ggdb -march=haswell -fPIC -std=c99 -Wall -Wextra -Wno-unused-parameter -I. -I./ -I/usr/include/postgresql-13/server -I/usr/include/postgresql-13/internal  -D_GNU_SOURCE   -c -o src/kafka_fdw.o src/kafka_fdw.c
STDERR:
Makefile:57: warning: overriding recipe for target 'installcheck'
/usr/lib64/postgresql-13/lib64/pgxs/src/makefiles/pgxs.mk:420: warning: ignoring old recipe for target 'installcheck'
Makefile:57: warning: overriding recipe for target 'installcheck'
/usr/lib64/postgresql-13/lib64/pgxs/src/makefiles/pgxs.mk:420: warning: ignoring old recipe for target 'installcheck'
src/kafka_fdw.c: In function ‘kafkaPlanForeignModify’:
src/kafka_fdw.c:1187:15: warning: implicit declaration of function ‘heap_open’ [-Wimplicit-function-declaration]
 1187 |     rel     = heap_open(rte->relid, NoLock);
      |               ^~~~~~~~~
src/kafka_fdw.c:1187:13: warning: assignment to ‘Relation’ {aka ‘struct RelationData *’} from ‘int’ makes pointer from integer without a cast [-Wint-conversion]
 1187 |     rel     = heap_open(rte->relid, NoLock);
      |             ^
src/kafka_fdw.c:1207:5: warning: implicit declaration of function ‘heap_close’ [-Wimplicit-function-declaration]
 1207 |     heap_close(rel, NoLock);
      |     ^~~~~~~~~~
src/kafka_fdw.c: In function ‘kafkaBeginForeignModify’:
src/kafka_fdw.c:1297:28: warning: passing argument 1 of ‘lnext’ from incompatible pointer type [-Wincompatible-pointer-types]
 1297 |         next       = lnext(lc);
      |                            ^~
      |                            |
      |                            ListCell * {aka union ListCell *}
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
                 from /usr/include/postgresql-13/server/access/htup_details.h:19,
                 from src/kafka_fdw.h:12,
                 from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:321:19: note: expected ‘const List *’ {aka ‘const struct List *’} but argument is of type ‘ListCell *’ {aka ‘union ListCell *’}
  321 | lnext(const List *l, const ListCell *c)
      |       ~~~~~~~~~~~~^
src/kafka_fdw.c:1297:22: error: too few arguments to function ‘lnext’
 1297 |         next       = lnext(lc);
      |                      ^~~~~
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
                 from /usr/include/postgresql-13/server/access/htup_details.h:19,
                 from src/kafka_fdw.h:12,
                 from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:321:1: note: declared here
  321 | lnext(const List *l, const ListCell *c)
      | ^~~~~
src/kafka_fdw.c:1301:35: error: too many arguments to function ‘list_delete_cell’
 1301 |             festate->attnumlist = list_delete_cell(festate->attnumlist, lc, prev);
      |                                   ^~~~~~~~~~~~~~~~
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
                 from /usr/include/postgresql-13/server/access/htup_details.h:19,
                 from src/kafka_fdw.h:12,
                 from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:538:14: note: declared here
  538 | extern List *list_delete_cell(List *list, ListCell *cell);
      |              ^~~~~~~~~~~~~~~~
make: *** [<builtin>: src/kafka_fdw.o] Error 1 

Add a get_watermark helper function

We already query some watermark info when scanning, here:

err = rd_kafka_query_watermark_offsets(

What would be great is having some helper function to present this data to the end user

e.g.

SELECT  * FROM kafka_get_watermarks('table_name');
 partition | offset_low | offset_high
-----------+------------+--------------
         1 |       1001 |         3001
         2 |       1002 |         3002
         3 |       1003 |         3003
         4 |       1004 |         3004
         5 |       1005 |         3005
         6 |       1006 |         3006
         7 |       1007 |         3007
         8 |       1008 |         3008
         9 |       1009 |         3009
        10 |       1010 |         3010
        11 |       1011 |         3011
        12 |       1012 |         3012
(12 rows)

Hello, can I update the foreign table?

I try with

delete from test_kafka where id=102;
update test_kafka SET modified_date='now()' WHERE restaurant_id=12153;

and I get the errors

ERROR: cannot update foreign table "foregin_table"
ERROR: cannot delete foreign table "foregin_table"

and I get another error when I call transaction rollback:
ERROR: 42P01: relation "kafka.broker" does not exist

thanks
