kagis / pgwire
PostgreSQL client library for Deno and Node.js that exposes all features of the wire protocol.
License: MIT License
First off, this is awesome work - we've been looking for a simpler alternative to Debezium that supports pgoutput for logical decoding. This fits the bill perfectly (and it's Node with 0 deps!).
You knew this was coming, but what are your thoughts on converting this library to TypeScript? I would love to open some PRs to help.
pgwire is not able to serialize bigint if the primary key is bigint
Line 493 in dad026a
Error
at new Response (/opt/web/node_modules/pgwire/lib/connection.js:476:24)
at Connection._request (/opt/web/node_modules/pgwire/lib/connection.js:262:22)
at Connection._queryExtended (/opt/web/node_modules/pgwire/lib/connection.js:62:17)
at Connection.query (/opt/web/node_modules/pgwire/lib/connection.js:37:17)
at Pool.query (/opt/web/node_modules/pgwire/lib/pool.js:19:19)
at page_route (file:///opt/web/server.js:257:14)
at convey (file:///opt/web/valves.js:36:10)
at file:///opt/web/valves.js:63:32
at convey (file:///opt/web/valves.js:36:10)
at next (file:///opt/web/valves.js:37:36)
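Until BigInt serialization is supported, one workaround is to send the value as its decimal string and let Postgres cast it. A minimal sketch; the query/params call shape in the comments is an assumption about pgwire's API, not something confirmed by this issue:

```javascript
// A JavaScript BigInt larger than Number.MAX_SAFE_INTEGER can't survive a
// trip through Number, but its decimal string round-trips exactly.
const pk = 9007199254740993n; // bigint primary key value

const param = pk.toString(); // '9007199254740993' - no precision loss

// Hypothetical call shape (an assumption, check pgwire's actual API):
// await pool.query({
//   statement: 'select * from t where id = $1::int8',
//   params: [{ type: 'text', value: param }],
// });
```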
Thanks for the great lib.
We're currently using the logical replication feature to publish messages from a table to an SNS topic. But some messages are big, and the JSON.parse call blocks the Node process; to publish the data we then need to JSON.stringify it again, resulting in performance issues.
We could benefit a lot from disabling JSON.parse for the jsonb field, because in this case we only want to move the message to the destination, regardless of what is in it.
It could be an option in the LogicalReplicationOptions interface.
I'd be willing to help with the necessary modifications.
Hello!
I would like to close the connection gracefully on some event. How can I do that?
const events = replstream.pgoutput();
for await (const pgomsg of events) {
  const saveSuccess = await sendToHana(hanaClient, pgomsg);
  if (saveSuccess) {
    replstream.ack(pgomsg.lsn);
  }
}
event.then(() => {
  // ??? how do I break out of the loop above and close replstream cleanly?
})
Calling replicationStream.ack after replicationStream.destroy throws an error in the current implementation. But now I think this case is not exceptional, and .ack after .destroy should be silently ignored:
for await (const msg of replicationStream) {
  await handleMessage(msg); // replicationStream.destroy() was called during this await
  replicationStream.ack(msg.lsn);
}
It would be a simple change:
ackImmediate(lsn) {
  this.ack(lsn);
  this._ackTimer.refresh();
  return new Promise(resolve => {
    if (this._tx.writableHighWaterMark > this._tx.writableLength) {
      this._tx.write(updateStatusMessage(incLsn(this._ackingLsn)), resolve);
    } else {
      resolve(false);
    }
  });
}
Currently, the pgwire.pool() method doesn't require awaiting (i.e. it returns a non-Promise), and this is great, because I can call it in e.g. objects' constructors.
Also, there is the Pool.logicalReplication() method, which is also very convenient: it always creates a new connection for streaming the changes.
But Pool can't create connections for another type of use case: when I run a huge SELECT * FROM tbl query and want to stream its rows for minutes (the table is big). This also requires creating a new connection, and Pool can't do it.
I propose to close the gap and add newConnection() to the Pool class. Otherwise, there is no way to create a connection without awaiting on it (pgwire.connect() returns a Promise), and non-async connecting is a good thing to have.
Below is a code example of what I have to do - use the Connection class directly (although it's not default-exported by the library). I could've used pgwire.pool() for this (with poolMaxConnections=1), but it doesn't have newConnection().
because Postgres encodes \t, \n, and other special characters differently than JSON does.
Line 2309 in 0dc927f
.endLsn of PrimaryKeepaliveMessage is greater than the lsn of XLogData messages. This causes the replication slot to be moved to a position before the XLogData messages are actually consumed.
Line 1611 in 12826ad
The doc doesn't say if it's possible to do something like:
const conn = await pgwire.connect(...);
await Promise.all([
  conn.query("first query"),
  conn.query("second query"),
  ...
  conn.query("nth query")
]);
(Of course, it's an artificial example, in real life all these queries come from different parts of the code, and some of them may spawn while other queries are still running.)
for await on both of the resulting streams - will it work, or will the 2nd stream not start until the 1st one finishes?

Preface: in our setup, we have 5 replication slots and 5 independent workers reading the same publication (i.e. the same set of tables in the same publication). All of those 5 workers basically read the same streamed rows from Postgres, but then decide whether to process each row by doing id % 5 == N (where N is the worker number). This helps to achieve better parallelism.
But in some sense it's a waste of CPU, because in all of those 5 workers, pgwire decodes the whole row, all the attributes, although in 4 cases out of 5 it only needs to unpack the 1st column (id), do the % 5 comparison and, if the result doesn't match the worker number, skip the row and save the CPU spent decoding the other attributes.
It would be great to have a callback which, if passed, is fed the packed (raw) row buffer, and if it returns false, skips attribute decoding for this buffer.
select to_json(int8(9007199254740991 + 1))
JSON.parse parses all numbers as IEEE 754 doubles, which represent integers exactly only up to 2^53 - 1. When a number is bigger, JSON.parse silently loses precision, which is not safe. We need to either fix the JSON.parse behavior (which will degrade performance) or document this pitfall.
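The pitfall is easy to reproduce in plain Node, no database needed:

```javascript
// 2**53 - 1 is the largest integer a double represents exactly.
console.log(Number.MAX_SAFE_INTEGER);        // 9007199254740991

// to_json(int8(9007199254740991 + 1)) arrives as the text '9007199254740992';
// one more than that, and JSON.parse silently rounds:
console.log(JSON.parse('9007199254740992')); // 9007199254740992 (still exact)
console.log(JSON.parse('9007199254740993')); // 9007199254740992 - precision lost!

// BigInt keeps the value, but only if the text never goes through JSON.parse:
console.log(BigInt('9007199254740993'));     // 9007199254740993n
```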
Subj, FYI.
Also, streaming of SELECT queries is super-useful, because this is the essential thing to do before switching to incremental (pgoutput) replication - first read all the rows from the tables. The code is super-sharp.
(Just wanted to say "thanks".)
I am trying to do logical replication using the pgoutput decoder. While handling tables having TOASTable columns, if there is an update to a column which is not TOASTed, we see that the after event does not have all the column information. We also see that the before event is always null. Is it possible to get the before event as well for all the updates?
Example:
postgres@cdctest:postgres> describe publicationtest1.toast_types;
+--------------+-----------------------+----------------------------------------------------------------------------+
| Column | Type | Modifiers |
|--------------+-----------------------+----------------------------------------------------------------------------|
| id | integer | not null default nextval('publicationtest1.toast_types_id_seq'::regclass) |
| an_integer | integer | |
| varchar_col | character varying(10) | |
| toasted_col1 | text | |
| toasted_col2 | text | |
| toasted_col3 | json | |
| toasted_col4 | jsonb | |
+--------------+-----------------------+----------------------------------------------------------------------------+
Indexes:
"toast_types_pkey" PRIMARY KEY, btree (id)
Time: 0.498s
update publicationtest1.toast_types set an_integer = 10;
The event received using logical decoding (pgoutput decoder) - we see that the after event does not have all the columns and the before event is null. Is there any way we can get all the before data too?
{
  "tag": "update",
  "lsn": "00000538/A80004D8",
  "endLsn": "00000538/A80004D8",
  "time": {
    "type": "Buffer",
    "data": [0, 2, 99, 126, 193, 93, 62, 184]
  },
  "relation": {
    "relationid": 4181745,
    "schema": "publicationtest1",
    "name": "toast_types",
    "replicaIdentity": "d",
    "attrs": [
      { "flags": 1, "name": "id", "typeid": 23, "typemod": -1 },
      { "flags": 0, "name": "an_integer", "typeid": 23, "typemod": -1 },
      { "flags": 0, "name": "varchar_col", "typeid": 1043, "typemod": 14 },
      { "flags": 0, "name": "toasted_col1", "typeid": 25, "typemod": -1 },
      { "flags": 0, "name": "toasted_col2", "typeid": 25, "typemod": -1 },
      { "flags": 0, "name": "toasted_col3", "typeid": 114, "typemod": -1 },
      { "flags": 0, "name": "toasted_col4", "typeid": 3802, "typemod": -1 }
    ]
  },
  "before": null,
  "after": {
    "id": 3,
    "an_integer": 10,
    "varchar_col": "test"
  }
}
Implement connection pooling for the pool.session accessor. Currently it creates a new connection per call.
Hi. Is there a proper (clean) way to close the connection while a SELECT query is being streamed through it? I tried connection.end(), and it does not terminate the connection if the query is still active.
E.g. imagine we run a SELECT on 100M rows and, after reading from it for 1 minute, we want to stop reading, preserve the last read position, and close the connection (to reschedule later).
Currently, I have to use a hack: calling conn.destroy() inside a stream's _destroy handler:
I suspect the fact that connection.end() doesn't help here may be a bug in the library. Because for replication streams it actually works fine:
P.S.
List of open connections on the PG side: select application_name, pid, state, query from pg_stat_activity
This is FYI, just for the history, for people who'll google it in the future: pgoutput doesn't define any message_cb callback (since it's not needed for pub-sub replication purposes).
In comparison, wal2json does support messages:
https://github.com/eulerto/wal2json/blob/master/wal2json.c#L221
Currently, one has to await on pgwire.connect() (because it returns a Promise). This makes it impossible to call in e.g. the constructor of some other class, so I have to do the following hack:
const Connection = require("pgwire/lib/connection");
...
constructor(...) {
  this.connection = new Connection({...});
}
I noticed that pgwire.pool() does NOT require awaiting, and this is very convenient. I'd use pgwire.pool() everywhere instead of the above example, but unfortunately Pool doesn't allow creating new connections (see #14 for more details).