
kafka_fdw's Issues

Failed to stop consuming partition 706: Local: Unknown partition

I sometimes see the error
Failed to stop consuming partition 706: Local: Unknown partition
raised on stop, here:
https://github.com/adjust/kafka_fdw/blob/master/src/kafka_fdw.c#L1026

However, there is no such partition, and it should never have been consumed.
My suspicion is that the error was introduced with parallel support using shared memory, and that it occurs when two concurrent scans interfere with each other.

Reading composite types from Kafka is broken

CREATE TYPE abc AS (a INT, b TEXT, c TIMESTAMP);

CREATE FOREIGN TABLE kafka_json (
	part int OPTIONS (partition 'true'),
	offs bigint OPTIONS (offset 'true'),
	x abc)
SERVER kafka_server
OPTIONS (format 'json', topic 'json', batch_size '30', buffer_delay '100');

INSERT INTO kafka_json (x) VALUES ((1, 'test', current_timestamp));

SELECT * FROM kafka_json;
ERROR:  malformed record literal: "{"a":1,"b":"test","c":"2018-07-04T11:16:55.671986"}"
DETAIL:  Missing left parenthesis.

Postgres expects an input string formatted like:

(1,"test","2018-07-04T11:16:55.671986")

for a composite type. The easy solution would be to strip the keys from the input string and replace {} with (). But this won't work if the JSON document has a different key order, or has extra or missing keys. So it makes sense to write a simple JSON parser that collects the individual key-value pairs, reorders them as needed, and fills the gaps with NULLs. It is also possible to use Postgres' built-in JSONB facilities to parse and manage JSON documents, but I anticipate that would be less efficient than a custom-designed solution.
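As a point of comparison, here is a minimal sketch of the JSONB route, using the abc type from above: the built-in jsonb_populate_record() already performs the key matching, reordering, and NULL-filling described here.

-- jsonb_populate_record() matches keys by name, tolerates reordered or
-- extra keys (here "extra"), and fills missing keys (here "b") with NULL.
SELECT jsonb_populate_record(
         NULL::abc,
         '{"c":"2018-07-04T11:16:55.671986","a":1,"extra":true}'::jsonb);
-- Result: (1,,"2018-07-04 11:16:55.671986")

Whether calling into this machinery from the FDW's row-parsing path is fast enough is exactly the efficiency question raised above.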

Compilation failure with postgresql-13

rm -f kafka_fdw.so   libkafka_fdw.a  libkafka_fdw.pc
rm -f src/connection.o src/kafka_fdw.o src/kafka_expr.o src/option.o src/parser.o src/planning.o src/utils.o src/connection.bc src/kafka_fdw.bc src/kafka_expr.bc src/option.bc src/parser.bc src/planning.bc src/utils.bc
rm -rf kafka_fdw--0.0.2.sql
rm -rf results/ regression.diffs regression.out tmp_check/ tmp_check_iso/ log/ output_iso/
x86_64-pc-linux-gnu-gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -pipe -ggdb -march=haswell -fPIC -std=c99 -Wall -Wextra -Wno-unused-parameter -I. -I./ -I/usr/include/postgresql-13/server -I/usr/include/postgresql-13/internal  -D_GNU_SOURCE   -c -o src/connection.o src/connection.c
x86_64-pc-linux-gnu-gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Werror=vla -Wendif-labels -Wmissing-format-attribute -Wimplicit-fallthrough=3 -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -Wno-format-truncation -Wno-stringop-truncation -O2 -pipe -ggdb -march=haswell -fPIC -std=c99 -Wall -Wextra -Wno-unused-parameter -I. -I./ -I/usr/include/postgresql-13/server -I/usr/include/postgresql-13/internal  -D_GNU_SOURCE   -c -o src/kafka_fdw.o src/kafka_fdw.c
STDERR:
Makefile:57: warning: overriding recipe for target 'installcheck'
/usr/lib64/postgresql-13/lib64/pgxs/src/makefiles/pgxs.mk:420: warning: ignoring old recipe for target 'installcheck'
Makefile:57: warning: overriding recipe for target 'installcheck'
/usr/lib64/postgresql-13/lib64/pgxs/src/makefiles/pgxs.mk:420: warning: ignoring old recipe for target 'installcheck'
src/kafka_fdw.c: In function ‘kafkaPlanForeignModify’:
src/kafka_fdw.c:1187:15: warning: implicit declaration of function ‘heap_open’ [-Wimplicit-function-declaration]
 1187 |     rel     = heap_open(rte->relid, NoLock);
      |               ^~~~~~~~~
src/kafka_fdw.c:1187:13: warning: assignment to ‘Relation’ {aka ‘struct RelationData *’} from ‘int’ makes pointer from integer without a cast [-Wint-conversion]
 1187 |     rel     = heap_open(rte->relid, NoLock);
      |             ^
src/kafka_fdw.c:1207:5: warning: implicit declaration of function ‘heap_close’ [-Wimplicit-function-declaration]
 1207 |     heap_close(rel, NoLock);
      |     ^~~~~~~~~~
src/kafka_fdw.c: In function ‘kafkaBeginForeignModify’:
src/kafka_fdw.c:1297:28: warning: passing argument 1 of ‘lnext’ from incompatible pointer type [-Wincompatible-pointer-types]
 1297 |         next       = lnext(lc);
      |                            ^~
      |                            |
      |                            ListCell * {aka union ListCell *}
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
                 from /usr/include/postgresql-13/server/access/htup_details.h:19,
                 from src/kafka_fdw.h:12,
                 from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:321:19: note: expected ‘const List *’ {aka ‘const struct List *’} but argument is of type ‘ListCell *’ {aka ‘union ListCell *’}
  321 | lnext(const List *l, const ListCell *c)
      |       ~~~~~~~~~~~~^
src/kafka_fdw.c:1297:22: error: too few arguments to function ‘lnext’
 1297 |         next       = lnext(lc);
      |                      ^~~~~
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
                 from /usr/include/postgresql-13/server/access/htup_details.h:19,
                 from src/kafka_fdw.h:12,
                 from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:321:1: note: declared here
  321 | lnext(const List *l, const ListCell *c)
      | ^~~~~
src/kafka_fdw.c:1301:35: error: too many arguments to function ‘list_delete_cell’
 1301 |             festate->attnumlist = list_delete_cell(festate->attnumlist, lc, prev);
      |                                   ^~~~~~~~~~~~~~~~
In file included from /usr/include/postgresql-13/server/access/tupdesc.h:19,
                 from /usr/include/postgresql-13/server/access/htup_details.h:19,
                 from src/kafka_fdw.h:12,
                 from src/kafka_fdw.c:1:
/usr/include/postgresql-13/server/nodes/pg_list.h:538:14: note: declared here
  538 | extern List *list_delete_cell(List *list, ListCell *cell);
      |              ^~~~~~~~~~~~~~~~
make: *** [<builtin>: src/kafka_fdw.o] Error 1 
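For reference, all three failures stem from PostgreSQL 13 API changes: heap_open()/heap_close() are gone (table_open()/table_close() replace them), lnext() gained a List argument when Lists became arrays, and list_delete_cell() lost its prev parameter. A minimal sketch of the PG13-side idioms follows; the variable names are illustrative, not taken from kafka_fdw:

#include "postgres.h"
#include "access/table.h"   /* table_open()/table_close() replace heap_open()/heap_close() */
#include "nodes/pg_list.h"
#include "utils/rel.h"

static void
pg13_idioms_sketch(Oid relid, List **attnumlist)
{
    /* PG13: heap_open() is gone; table_open() has the same semantics. */
    Relation  rel = table_open(relid, NoLock);
    ListCell *lc;

    foreach (lc, *attnumlist)
    {
        /* PG13: lnext() now needs the List itself as its first argument. */
        ListCell *next = lnext(*attnumlist, lc);
        (void) next;        /* kept only to mirror the failing code above */

        /* PG13: list_delete_cell() dropped "prev"; inside a foreach loop,
         * foreach_delete_current() is the safe way to delete the current
         * cell without invalidating the iterator. */
        if (lfirst_int(lc) < 0)
            *attnumlist = foreach_delete_current(*attnumlist, lc);
    }

    table_close(rel, NoLock);
}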

Add get_watermark helper function

We already query watermark info during scanning, here:

err = rd_kafka_query_watermark_offsets(

It would be great to have a helper function that presents this data to the end user,

e.g.

SELECT * FROM kafka_get_watermarks('table_name');
 partition | offset_low | offset_high
-----------+------------+-------------
         1 |       1001 |         3001
         2 |       1002 |         3002
         3 |       1003 |         3003
         4 |       1004 |         3004
         5 |       1005 |         3005
         6 |       1006 |         3006
         7 |       1007 |         3007
         8 |       1008 |         3008
         9 |       1009 |         3009
        10 |       1010 |         3010
        11 |       1011 |         3011
        12 |       1012 |         3012
(12 rows)
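A possible shape for such a helper, wrapping rd_kafka_query_watermark_offsets() per partition. The declaration below is purely hypothetical; nothing like it is implemented yet:

-- Hypothetical declaration only; the C implementation would call
-- rd_kafka_query_watermark_offsets() for each partition of the topic
-- behind the given foreign table.
CREATE FUNCTION kafka_get_watermarks(rel regclass)
RETURNS TABLE (partition int, offset_low bigint, offset_high bigint)
AS 'MODULE_PATHNAME', 'kafka_get_watermarks'
LANGUAGE C STRICT;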

Hello, can I update the foreign table?

I tried
delete from test_kafka where id=102; update test_kafka SET modified_date='now()' WHERE restaurant_id=12153;
and I get the errors

ERROR: cannot update foreign table "foreign_table"
ERROR: cannot delete foreign table "foreign_table"

and I get another error when I roll back the transaction:
ERROR: 42P01: relation "kafka.broker" does not exist

thanks

No connection timeout

If the Kafka broker is unreachable, an attempt to establish a connection hangs indefinitely and never checks for interrupts.

Unfortunately I don't have a full backtrace, but here's the librdkafka part of it:

#0  0x00007f127898d96a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f1279e61e59 in cnd_timedwait (cond=cond@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, ts=ts@entry=0x7ffe3d7ca300) at tinycthread.c:462
#2  0x00007f1279e62273 in cnd_timedwait_abs (cnd=cnd@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, tspec=tspec@entry=0x7ffe3d7ca300) at tinycthread_extra.c:100
#3  0x00007f1279e2bdff in rd_kafka_q_serve (rkq=rkq@entry=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, 
    callback=callback@entry=0x7f1279df67f0 <rd_kafka_consume_cb>, opaque=opaque@entry=0x7ffe3d7ca410) at rdkafka_queue.h:475
#4  0x00007f1279df6f51 in rd_kafka_consume_callback0 (rkq=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=<optimized out>, consume_cb=<optimized out>, opaque=<optimized out>)
    at rdkafka.c:2617
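One way to address the interrupt problem, sketched below under the assumption that the scan path can poll in a loop: use short librdkafka timeouts and let Postgres check for interrupts between polls instead of blocking indefinitely. The function name is illustrative.

#include "postgres.h"
#include "miscadmin.h"              /* CHECK_FOR_INTERRUPTS() */
#include <librdkafka/rdkafka.h>

/* Illustrative sketch: consume with a short timeout in a loop so that a
 * statement cancel or backend termination is honoured promptly instead of
 * blocking inside librdkafka.  Assumes rd_kafka_consume_start() was
 * already called for this topic/partition. */
static rd_kafka_message_t *
consume_interruptible(rd_kafka_topic_t *rkt, int32_t partition)
{
    for (;;)
    {
        /* Wait at most 100 ms per call ... */
        rd_kafka_message_t *msg = rd_kafka_consume(rkt, partition, 100);

        if (msg != NULL)
            return msg;     /* a message, or an error wrapped as one */

        /* ... then give Postgres a chance to process interrupts. */
        CHECK_FOR_INTERRUPTS();
    }
}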

Cases not handled well

In my input-handling branch I have been able to trigger certain misbehaviours with bad input written to Kafka.

So far there are no segmentation faults or similar errors, but these inputs can still cause serious problems when reading the Kafka data. The major difficulty is handling bad representations of the expected data types.

For example, if the CSV-based foreign table has an integer field and I write a record to Kafka with that field set to "foo" or "100000000000", I get an error such as "integer out of range" or "invalid input syntax for type integer".
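For reference, these are the errors Postgres itself raises for those two values (wording from recent versions):

SELECT '100000000000'::int;
-- ERROR:  value "100000000000" is out of range for type integer
SELECT 'foo'::int;
-- ERROR:  invalid input syntax for type integer: "foo"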

This means there is an important case we have to address: what happens when the data in Kafka really is unexpected or malformed? Do we skip over those records? Do we ignore the problem and assume it gets fixed on the Kafka side? Do we do something else?

For test cases, you can insert CSV examples like these into the Kafka queue:

100000000000,100000000000,1000000000,1000000000
"Test", 123, 122.22
"Test, the game", 1222, 122,22
122,22
Foo,Bar,Baz,123
{"foo" 123, "Bar": 134}
