adjust / kafka_fdw
Kafka foreign database wrapper for PostgreSQL
License: PostgreSQL License
I sometimes see the error
Failed to stop consuming partition 706: Local: Unknown partition
raised on Stop here
https://github.com/adjust/kafka_fdw/blob/master/src/kafka_fdw.c#L1026
However, there is no such partition, and it should never have been consumed.
My guess is that the error was introduced with parallel support, which uses shared memory, and that it happens when two concurrent scans interfere with each other.
CREATE TYPE abc AS (a INT, b TEXT, c TIMESTAMP);
CREATE FOREIGN TABLE kafka_json (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
x abc)
SERVER kafka_server
OPTIONS (format 'json', topic 'json', batch_size '30', buffer_delay '100');
INSERT INTO kafka_json (x) VALUES ((1, 'test', current_timestamp));
SELECT * FROM kafka_json;
ERROR: malformed record literal: "{"a":1,"b":"test","c":"2018-07-04T11:16:55.671986"}"
DETAIL: Missing left parenthesis.
Postgres expects an input string formatted like:
(1,"test","2018-07-04T11:16:55.671986")
for a composite type. The easy solution would be to just remove the keys from the input string and replace {} with (). But this won't work if the JSON document has a different key order, or extra or missing keys. So it makes sense to write a simple JSON parser to collect the individual key-value pairs, reorder them if needed and fill the gaps with NULLs. It's also possible to use the built-in JSONB facilities from Postgres to parse and manage JSON documents, but I anticipate that this would be less efficient than a custom-designed solution.
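The reordering idea can be sketched in plain C as follows. This is only an illustration under stated assumptions, not code from kafka_fdw: the names `json_to_record`, `Span` and `MAX_FIELDS` are hypothetical, only flat objects with scalar values are handled, and values are copied verbatim (so JSON escapes such as `\"` and `\\` carry over, while `\n` or `\uXXXX` would need real translation). Missing keys and JSON `null` become empty fields, which the composite input syntax reads as NULL; extra keys are dropped.

```c
#include <ctype.h>
#include <stddef.h>
#include <string.h>

#define MAX_FIELDS 16   /* hypothetical limit for this sketch */

/* A raw token inside the JSON text: start pointer and length. */
typedef struct { const char *start; size_t len; } Span;

static const char *skip_ws(const char *p)
{
    while (*p && isspace((unsigned char) *p)) p++;
    return p;
}

/* Scan a JSON string (quotes included); return the position after it or NULL. */
static const char *scan_string(const char *p, Span *s)
{
    const char *q;
    if (*p != '"') return NULL;
    for (q = p + 1; *q && *q != '"'; q++)
        if (*q == '\\' && q[1] != '\0') q++;      /* skip escaped character */
    if (*q != '"') return NULL;
    s->start = p;
    s->len = (size_t) (q - p) + 1;
    return q + 1;
}

/* Scan a string or a bare token (number, true, false, null); no nesting. */
static const char *scan_scalar(const char *p, Span *s)
{
    const char *q = p;
    if (*p == '"') return scan_string(p, s);
    if (*p == '{' || *p == '[') return NULL;
    while (*q && *q != ',' && *q != '}' && !isspace((unsigned char) *q)) q++;
    if (q == p) return NULL;
    s->start = p;
    s->len = (size_t) (q - p);
    return q;
}

/* Rewrite a flat JSON object as "(v1,v2,...)" in the order given by fields.
 * Returns 0 on success, -1 on malformed input or insufficient buffer space. */
int json_to_record(const char *json, const char *const *fields, size_t nfields,
                   char *out, size_t outlen)
{
    Span vals[MAX_FIELDS];
    size_t i, used = 0;
    const char *p = skip_ws(json);

    if (nfields > MAX_FIELDS || outlen < 3 || *p++ != '{') return -1;
    for (i = 0; i < nfields; i++) { vals[i].start = NULL; vals[i].len = 0; }

    p = skip_ws(p);
    while (*p != '}')
    {
        Span key, val;
        if ((p = scan_string(p, &key)) == NULL) return -1;
        p = skip_ws(p);
        if (*p++ != ':') return -1;
        p = skip_ws(p);
        if ((p = scan_scalar(p, &val)) == NULL) return -1;
        for (i = 0; i < nfields; i++)             /* unknown keys are ignored */
            if (strlen(fields[i]) == key.len - 2 &&
                strncmp(fields[i], key.start + 1, key.len - 2) == 0)
                vals[i] = val;
        p = skip_ws(p);
        if (*p == ',') p = skip_ws(p + 1);
        else if (*p != '}') return -1;
    }

    out[used++] = '(';
    for (i = 0; i < nfields; i++)
    {
        int is_null = vals[i].start == NULL ||
            (vals[i].len == 4 && strncmp(vals[i].start, "null", 4) == 0);
        size_t len = is_null ? 0 : vals[i].len;
        if (used + (i > 0) + len + 2 > outlen) return -1;  /* keep room for ")" and NUL */
        if (i > 0) out[used++] = ',';
        if (len > 0) { memcpy(out + used, vals[i].start, len); used += len; }
    }
    out[used++] = ')';
    out[used] = '\0';
    return 0;
}
```

With the field order `a, b, c`, the document from the error message above comes out as the record literal Postgres expects, and a document whose keys arrive in a different order, or with extras, is still mapped to the right columns. Values are not validated here; that is left to the type input functions.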
rm -f kafka_fdw.so libkafka_fdw.a libkafka_fdw.pc
rm -f src/connection.o src/kafka_fdw.o src/kafka_expr.o src/option.o src/parser.o src/planning.o src/utils.o src/connection.bc src/kafka_fdw.bc src/kafka_expr.bc src/option.bc src/parser.bc src/planning.bc src/utils.bc
rm -rf kafka_fdw--0.0.2.sql
rm -rf results/ regression.diffs regression.out tmp_check/ tmp_check_iso/ log/ output_iso/
x86_64-pc-linux-gnu-gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -pipe -ggdb -march=haswell -fPIC -std=c99 -Wall -Wextra -Wno-unused-parameter -I. -I./ -I/usr/include/postgresql-13/server -I/usr/include/postgresql-13/internal -D_GNU_SOURCE -c -o src/connection.o src/connection.c
x86_64-pc-linux-gnu-gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -pipe -ggdb -march=haswell -fPIC -std=c99 -Wall -Wextra -Wno-unused-parameter -I. -I./ -I/usr/include/postgresql-13/server -I/usr/include/postgresql-13/internal -D_GNU_SOURCE -c -o src/kafka_fdw.o src/kafka_fdw.c
STDERR:
Makefile:57: warning: overriding recipe for target 'installcheck'
/usr/lib64/postgresql-13/lib64/pgxs/src/makefiles/pgxs.mk:420: warning: ignoring old recipe for target 'installcheck'
Makefile:57: warning: overriding recipe for target 'installcheck'
/usr/lib64/postgresql-13/lib64/pgxs/src/makefiles/pgxs.mk:420: warning: ignoring old recipe for target 'installcheck'
src/kafka_fdw.c: In function ‘kafkaPlanForeignModify’:
src/kafka_fdw.c:1187:15: warning: implicit declaration of function ‘heap_open’ [-Wimplicit-function-declaration]
1187 | rel = heap_open(rte->relid, NoLock);
| ^~~~~~~~~
src/kafka_fdw.c:1187:13: warning: assignment to ‘Relation’ {aka ‘struct RelationData *’} from ‘int’ makes pointer from integer without a cast [-Wint-conversion]
1187 | rel = heap_open(rte->relid, NoLock);
| ^
src/kafka_fdw.c:1207:5: warning: implicit declaration of function ‘heap_close’ [-Wimplicit-function-declaration]
1207 | heap_close(rel, NoLock);
| ^~~~~~~~~~
src/kafka_fdw.c: In function ‘kafkaBeginForeignModify’:
src/kafka_fdw.c:1297:28: warning: passing argument 1 of ‘lnext’ from incompatible pointer type [-Wincompatible-pointer-types]
1297 | next = lnext(lc);
| ^~
| |
| ListCell * {aka union ListCell *}
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
from /usr/include/postgresql-13/server/access/htup_details.h:19,
from src/kafka_fdw.h:12,
from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:321:19: note: expected ‘const List *’ {aka ‘const struct List *’} but argument is of type ‘ListCell *’ {aka ‘union ListCell *’}
321 | lnext(const List *l, const ListCell *c)
| ~~~~~~~~~~~~^
src/kafka_fdw.c:1297:22: error: too few arguments to function ‘lnext’
1297 | next = lnext(lc);
| ^~~~~
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
from /usr/include/postgresql-13/server/access/htup_details.h:19,
from src/kafka_fdw.h:12,
from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:321:1: note: declared here
321 | lnext(const List *l, const ListCell *c)
| ^~~~~
src/kafka_fdw.c:1301:35: error: too many arguments to function ‘list_delete_cell’
1301 | festate->attnumlist = list_delete_cell(festate->attnumlist, lc, prev);
| ^~~~~~~~~~~~~~~~
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
from /usr/include/postgresql-13/server/access/htup_details.h:19,
from src/kafka_fdw.h:12,
from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:538:14: note: declared here
538 | extern List *list_delete_cell(List *list, ListCell *cell);
| ^~~~~~~~~~~~~~~~
make: *** [<builtin>: src/kafka_fdw.o] Error 1
We already query some watermark information when scanning, here:
Line 1127 in 9dc5eaa
It would be great to have a helper function that presents this data to the end user, e.g.
SELECT * FROM kafka_get_watermarks('table_name');
partition | offset_low | offset_high
-----------+------------+-------------
1 | 1001 | 3001
2 | 1002 | 3002
3 | 1003 | 3003
4 | 1004 | 3004
5 | 1005 | 3005
6 | 1006 | 3006
7 | 1007 | 3007
8 | 1008 | 3008
9 | 1009 | 3009
10 | 1010 | 3010
11 | 1011 | 3011
12 | 1012 | 3012
(12 rows)
I tried:
delete from test_kafka where id=102;
update test_kafka SET modified_date='now()' WHERE restaurant_id=12153;
and I get the errors:
ERROR: cannot update foreign table "foregin_table"
ERROR: cannot delete foreign table "foregin_table"
I also get another error when I roll back the transaction:
ERROR: 42P01: relation "kafka.broker" does not exist
thanks
If there is no connection to the Kafka broker, an attempt to establish such a connection hangs indefinitely and doesn't check for interrupts.
Unfortunately I don't have a full backtrace, but here's the librdkafka part of it:
#0 0x00007f127898d96a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00007f1279e61e59 in cnd_timedwait (cond=cond@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, ts=ts@entry=0x7ffe3d7ca300) at tinycthread.c:462
#2 0x00007f1279e62273 in cnd_timedwait_abs (cnd=cnd@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, tspec=tspec@entry=0x7ffe3d7ca300) at tinycthread_extra.c:100
#3 0x00007f1279e2bdff in rd_kafka_q_serve (rkq=rkq@entry=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN,
callback=callback@entry=0x7f1279df67f0 <rd_kafka_consume_cb>, opaque=opaque@entry=0x7ffe3d7ca410) at rdkafka_queue.h:475
#4 0x00007f1279df6f51 in rd_kafka_consume_callback0 (rkq=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=<optimized out>, consume_cb=<optimized out>, opaque=<optimized out>)
at rdkafka.c:2617
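One common way to keep such a wait cancellable is to split it into short slices and check an interrupt flag between them. The sketch below shows only that pattern in standalone C; it is not taken from kafka_fdw or librdkafka. `wait_interruptibly`, `PollFn`, `WaitResult` and the `FakeBroker` test double are hypothetical names, the callback stands in for whatever consume call is used, and inside a PostgreSQL backend the flag check would presumably be `CHECK_FOR_INTERRUPTS()` instead.

```c
#include <signal.h>
#include <stddef.h>

/* Hypothetical poll callback: >0 means data arrived, 0 timeout, <0 error. */
typedef int (*PollFn)(int timeout_ms, void *arg);

typedef enum { WAIT_GOT_DATA, WAIT_TIMED_OUT, WAIT_INTERRUPTED, WAIT_FAILED } WaitResult;

/* Wait up to total_timeout_ms, but never block longer than slice_ms
 * without looking at the interrupt flag. */
WaitResult wait_interruptibly(PollFn poll_once, void *arg,
                              int total_timeout_ms, int slice_ms,
                              const volatile sig_atomic_t *interrupted)
{
    int waited = 0;

    if (slice_ms <= 0)
        slice_ms = 1;                       /* avoid a loop that never advances */

    while (waited < total_timeout_ms)
    {
        int remaining = total_timeout_ms - waited;
        int slice = remaining < slice_ms ? remaining : slice_ms;
        int rc;

        if (*interrupted)
            return WAIT_INTERRUPTED;        /* user cancelled: stop waiting */

        rc = poll_once(slice, arg);
        if (rc > 0) return WAIT_GOT_DATA;
        if (rc < 0) return WAIT_FAILED;
        waited += slice;
    }
    return *interrupted ? WAIT_INTERRUPTED : WAIT_TIMED_OUT;
}

/* Test double for an unreachable broker: never delivers data and
 * simulates a cancel request arriving during the third poll. */
struct FakeBroker { int calls; volatile sig_atomic_t interrupted; };

int fake_poll_unreachable(int timeout_ms, void *arg)
{
    struct FakeBroker *b = arg;
    (void) timeout_ms;
    if (++b->calls == 3)
        b->interrupted = 1;
    return 0;
}
```

The slice length trades responsiveness against wake-ups: with a 100 ms slice a cancel request is noticed within roughly 100 ms, at the cost of up to ten polls per second while idle.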
In my input handling branch I have been able to cause certain misbehaviours on bad inputs into Kafka.
So far there are no segmentation faults or similar errors, but these could still cause serious issues reading the Kafka data. The major difficulties have to do with handling bad representations of expected data types.
So for example, if there is an integer field in a CSV-based foreign table and I write a record to Kafka with that field set to "foo" or "100000000000", I get an error such as "Integer out of range" or "Invalid representation of integer."
This means that we have an important case we have to address, which is what happens when the data is actually unexpected or bad in Kafka. Do we skip over those records? Do we ignore the problem and assume we fix it on the Kafka side? Do we do something else?
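Whichever policy is chosen, it helps to classify a bad field without raising an error first, so the caller can decide whether to skip the record or fail. The following is a minimal standalone sketch of that idea for a 32-bit integer column; `parse_int32_field` and `FieldStatus` are hypothetical names, and this is not how kafka_fdw currently parses values.

```c
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>

typedef enum { FIELD_OK, FIELD_INVALID, FIELD_OUT_OF_RANGE } FieldStatus;

/* Classify the text of one field as a valid int32, a malformed value,
 * or a well-formed number that does not fit. *out is set only on FIELD_OK. */
FieldStatus parse_int32_field(const char *text, int32_t *out)
{
    char *end;
    long long v;

    errno = 0;
    v = strtoll(text, &end, 10);

    if (end == text)
        return FIELD_INVALID;               /* no digits at all, e.g. "foo" */

    while (*end == ' ' || *end == '\t')
        end++;                              /* tolerate trailing blanks */
    if (*end != '\0')
        return FIELD_INVALID;               /* trailing garbage, e.g. "122.22" */

    if (errno == ERANGE || v < INT32_MIN || v > INT32_MAX)
        return FIELD_OUT_OF_RANGE;          /* e.g. "100000000000" */

    *out = (int32_t) v;
    return FIELD_OK;
}
```

A scan could then count or log records that return anything other than `FIELD_OK` and move on, or turn the status into an error, depending on which of the behaviours above is wanted.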
For test cases you can insert into the Kafka queue CSV examples like:
100000000000,100000000000,1000000000,1000000000
"Test", 123, 122.22
"Test, the game", 1222, 122,22
122,22
Foo,Bar,Baz,123
{"foo" 123, "Bar": 134}