pgwire's People

Contributors: exe-dealer

pgwire's Issues

TypeScript support

First off, this is awesome work - we've been looking for a simpler alternative to Debezium that supports pgoutput for logical decoding. This fits the bill perfectly (and it's Node with 0 deps!).

You knew this was coming, but what are your thoughts on converting this library to TypeScript? I would love to open some PRs to help.

Error.stack message overwrite

Object.assign(this._fetchError, err),

Error
    at new Response (/opt/web/node_modules/pgwire/lib/connection.js:476:24)
    at Connection._request (/opt/web/node_modules/pgwire/lib/connection.js:262:22)
    at Connection._queryExtended (/opt/web/node_modules/pgwire/lib/connection.js:62:17)
    at Connection.query (/opt/web/node_modules/pgwire/lib/connection.js:37:17)
    at Pool.query (/opt/web/node_modules/pgwire/lib/pool.js:19:19)
    at page_route (file:///opt/web/server.js:257:14)
    at convey (file:///opt/web/valves.js:36:10)
    at file:///opt/web/valves.js:63:32
    at convey (file:///opt/web/valves.js:36:10)
    at next (file:///opt/web/valves.js:37:36)
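Likely cause, shown as a minimal repro (assuming V8's Error semantics): message and stack are non-enumerable own properties of an Error, so Object.assign does not copy them from err, and the caller ends up seeing the placeholder error that was created at new Response, complete with its internal stack:

const target = new Error();                 // plays the role of this._fetchError
const source = new Error('actual failure'); // plays the role of err
Object.assign(target, source);              // copies only enumerable own properties
console.log(target.message); // '' — message was not copied over
console.log(target.stack);   // still points at where `target` was constructed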

Add option to not use `JSON.parse` to decode `json` or `jsonb`

Thanks for the great lib

We're currently using the logical replication feature to publish messages from a table to an SNS topic.
But some messages are big, and JSON.parse blocks the Node process; to publish them we then need to JSON.stringify the data again, resulting in performance issues.

We could benefit a lot from disabling JSON.parse for jsonb fields, because in this case we only want to move the message to the destination, regardless of what is in it.

It could be an option in the LogicalReplicationOptions interface.

I'd be willing to help with the necessary modifications.
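A sketch of what the proposed option might look like; the option and helper names here (parseJson, publishToSns) are illustrative assumptions, not pgwire's current API:

const replstream = await client.logicalReplication({
  slotName: 'sns_publisher',
  parseJson: false, // hypothetical: deliver json/jsonb values as raw text
});
for await (const pgomsg of replstream.pgoutput()) {
  await publishToSns(pgomsg); // payload is forwarded untouched, no parse/stringify
  replstream.ack(pgomsg.lsn);
}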

How to close replication?

Hello!
I would like to close the connection gracefully on some event. How can I do that?

const events = replstream.pgoutput();
for await (const pgomsg of events) {
  const saveSuccess = await sendToHana(hanaClient, pgomsg);
  if (saveSuccess) {
    replstream.ack(pgomsg.lsn);
  }
}
events.then(() => {
  // ???
});
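One way to stop the loop cleanly, sketched under some assumptions: breaking out of for await releases the iterator, and replstream.destroy() (mentioned in other issues on this page) performs the final teardown; the shutdown trigger here is illustrative:

let shouldStop = false;
process.once('SIGTERM', () => { shouldStop = true; }); // illustrative trigger

for await (const pgomsg of replstream.pgoutput()) {
  const saveSuccess = await sendToHana(hanaClient, pgomsg);
  if (saveSuccess) {
    replstream.ack(pgomsg.lsn);
  }
  if (shouldStop) break; // leaving the loop invokes the iterator's return()
}
replstream.destroy();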

Should .ack after .destroy throw an error?

Calling replicationStream.ack after replicationStream.destroy throws an error in the current implementation. But now I think this case is not exceptional, and .ack after .destroy should be silently ignored:

for await (const msg of replicationStream) {
  await handleMessage(msg); // replicationStream.destroy() was called during this await
  replicationStream.ack(msg.lsn);
}
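A minimal sketch of the proposed no-throw behavior (the _destroyed flag is an assumption about the internals):

ack(lsn) {
  if (this._destroyed) return; // silently ignore acks that arrive after destroy
  // ...existing ack logic...
}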

Feature request: Make ackImmediate() return a Promise

It would be a simple change:

  ackImmediate(lsn) {
    this.ack(lsn);
    this._ackTimer.refresh();
    return new Promise(resolve => {
      if (this._tx.writableHighWaterMark > this._tx.writableLength) {
        this._tx.write(updateStatusMessage(incLsn(this._ackingLsn)), resolve);
      } else {
        resolve(false);
      }
    });
  }
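With this change a caller could await the flush; note that the promise resolves with false when the write is skipped due to backpressure, and with the write callback's argument (undefined on success) once the status update is actually written. Hypothetical usage:

const flushed = await replstream.ackImmediate(msg.lsn);
if (flushed === false) {
  // status update skipped: the socket's write buffer was already full
}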

Feature request: Add Pool.newConnection() which always creates a new connection, similar to Pool.logicalReplication()

Currently, the pgwire.pool() method doesn't require awaiting (i.e. it returns a non-Promise), and this is great because I can call it in e.g. objects' constructors.

There is also the Pool.logicalReplication() method, which is very convenient: it always creates a new connection for streaming changes.

But Pool can't create connections for another type of use case: when I run a huge SELECT * FROM tbl query and want to stream its rows for minutes (the table is big). This also requires creating a new connection, and Pool can't do it.

I propose to close the gap and add newConnection() to the Pool class. Otherwise there is no way to create a connection without awaiting it (pgwire.connect() returns a Promise), and non-async connecting is a good thing to have.

Below is a code example of what I have to do: use the Connection class directly (although it's not default-exported by the library). I could have used pgwire.pool() for this (with poolMaxConnections=1), but it doesn't have newConnection().

[screenshot: code constructing pgwire's internal Connection class directly]
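A hedged reconstruction of that workaround (the screenshot itself isn't preserved; the require path matches the one shown in the "Non-async pgwire.connect()" issue below, and the options object is elided):

const Connection = require('pgwire/lib/connection'); // internal, not default-exported

class Worker {
  constructor(options) {
    // no await is possible in a constructor, hence the direct Connection use
    this.conn = new Connection(options);
  }
}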

incorrect lastLsn

The .endLsn of a PrimaryKeepaliveMessage can be greater than the lsn of pending XLogData messages. This causes the replication slot to be moved forward before those XLogData messages are actually consumed.

pgwire/mod.js, line 1611 at commit 12826ad:

if (lastLsn < msg.endLsn) lastLsn = msg.endLsn;
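A possible direction for a fix, sketched (the message-handling shape and tag values are assumptions about the internals): advance lastLsn only from XLogData positions that were actually delivered, never from keepalive endLsn values:

if (msg.tag === 'XLogData' && lastLsn < msg.lsn) {
  lastLsn = msg.lsn; // data the consumer has really seen
}
// PrimaryKeepaliveMessage.endLsn may point past undelivered data,
// so it must not advance lastLsn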

Sending multiple single-queries through the same connection?

The docs don't say whether it's possible to do something like this:

const conn = await pgwire.connect(...);
await Promise.all([
  conn.query("first query"),
  conn.query("second query"),
  ...
  conn.query("nth query")
]);

(Of course, it's an artificial example, in real life all these queries come from different parts of the code, and some of them may spawn while other queries are still running.)

  1. I suspect this is a totally working case, right? I.e. is it supported?
  2. If the answer to (1) is "yes", then what about stream-like queries? Imagine I run two huge "SELECT * FROM tbl1" and "SELECT * FROM tbl2" queries on the same connection and then for await over both of the resulting streams. Will it work? Or will the 2nd stream not start until the 1st one finishes?
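Worth noting, as a protocol-level fact independent of pgwire: PostgreSQL executes queries on a single connection one at a time, and with the simple query protocol the second result set is sent only after the first completes. (The extended protocol can interleave partial fetches via named portals, but that requires explicit portal management, which a high-level query API typically does not expose.)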

Feature request: support for "eager filtering" while reading from a replication slot

Preface: in our setup, we have 5 replication slots and 5 independent workers reading the same publication (i.e. the same set of tables). All 5 workers basically read the same streamed rows from Postgres, then decide whether to process each row by computing id % 5 == N (where N is the worker number). This helps to achieve better parallelism.

But in some sense it's a waste of CPU, because in all 5 workers pgwire decodes the whole row, all the attributes, even though in 4 cases out of 5 it only needs to unpack the 1st column (id), do the % 5 comparison and, if the result doesn't match the worker number, skip the row and save the CPU spent decoding the other attributes.

It would be great to have a callback which, if passed, is fed the packed (raw) row buffer, and which skips attribute decoding for that buffer when it returns false. A sketch follows.
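A hypothetical shape for that callback; every name here (slotName, rowFilter, decodeFirstInt) is illustrative, not pgwire API:

const replstream = await client.logicalReplication({
  slotName: 'worker_3',
  rowFilter(rawTupleBuffer) {
    const id = decodeFirstInt(rawTupleBuffer); // hypothetical: unpack only the 1st attribute
    return id % 5 === 3; // false => skip decoding the rest of the row
  },
});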

unsafe int8 json

select to_json(int8(9007199254740991 + 1))

JSON.parse parses all numbers as IEEE-754 doubles, which can represent integers exactly only up to 2^53. When a number is too big, JSON.parse silently loses precision, which is not safe. We need to either fix the JSON.parse behavior (which will degrade performance) or document this pitfall.
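A quick demonstration of the pitfall in plain Node, no pgwire involved:

JSON.parse('9007199254740993');             // => 9007199254740992, silently off by one
Number.isSafeInteger(9007199254740991 + 1); // => false: 2^53 is the first unsafe integer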

Logical decoding using pgoutput: fetch the before data for all updates

I am trying to do logical replication using the pgoutput decoder. While handling tables with TOASTable columns, if there is an update to a column which is not TOASTed, we see that the after event does not have all the column information. We also see that the before event is always null. Is it possible to get the before event as well for all updates?

Example:

postgres@cdctest:postgres> describe publicationtest1.toast_types;
+--------------+-----------------------+----------------------------------------------------------------------------+
| Column       | Type                  | Modifiers                                                                  |
|--------------+-----------------------+----------------------------------------------------------------------------|
| id           | integer               |  not null default nextval('publicationtest1.toast_types_id_seq'::regclass) |
| an_integer   | integer               |                                                                            |
| varchar_col  | character varying(10) |                                                                            |
| toasted_col1 | text                  |                                                                            |
| toasted_col2 | text                  |                                                                            |
| toasted_col3 | json                  |                                                                            |
| toasted_col4 | jsonb                 |                                                                            |
+--------------+-----------------------+----------------------------------------------------------------------------+
Indexes:
    "toast_types_pkey" PRIMARY KEY, btree (id)

Time: 0.498s

update publicationtest1.toast_types set an_integer = 10;

The event received using logical decoding (pgoutput decoder). We see that the after event does not have all the columns and the before event is null. Is there any way to get all the before data too?

{
  "tag": "update",
  "lsn": "00000538/A80004D8",
  "endLsn": "00000538/A80004D8",
  "time": {
    "type": "Buffer",
    "data": [0, 2, 99, 126, 193, 93, 62, 184]
  },
  "relation": {
    "relationid": 4181745,
    "schema": "publicationtest1",
    "name": "toast_types",
    "replicaIdentity": "d",
    "attrs": [{
      "flags": 1,
      "name": "id",
      "typeid": 23,
      "typemod": -1
    }, {
      "flags": 0,
      "name": "an_integer",
      "typeid": 23,
      "typemod": -1
    }, {
      "flags": 0,
      "name": "varchar_col",
      "typeid": 1043,
      "typemod": 14
    }, {
      "flags": 0,
      "name": "toasted_col1",
      "typeid": 25,
      "typemod": -1
    }, {
      "flags": 0,
      "name": "toasted_col2",
      "typeid": 25,
      "typemod": -1
    }, {
      "flags": 0,
      "name": "toasted_col3",
      "typeid": 114,
      "typemod": -1
    }, {
      "flags": 0,
      "name": "toasted_col4",
      "typeid": 3802,
      "typemod": -1
    }]
  },
  "before": null,
  "after": {
    "id": 3,
    "an_integer": 10,
    "varchar_col": "test"
  }
}
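For reference, this is standard pgoutput behavior rather than a pgwire limitation: the event above shows replicaIdentity: "d" (default), under which Postgres emits a before image only when replica-identity (key) columns change, and omits unchanged TOASTed values from the after image. Running ALTER TABLE publicationtest1.toast_types REPLICA IDENTITY FULL; makes the before image contain all columns, including TOASTed ones, at the cost of larger WAL records.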

pool.session

Implement connection pooling for the pool.session accessor. Currently it creates a new connection per call.

Question: close the connection of the streamed query before the results are fully read

Hi. Is there a proper (clean) way to close the connection while a SELECT query is being streamed through it? I tried connection.end(), and it does not terminate the connection if the query is still active.

E.g. imagine we run a SELECT on 100M rows and, after reading from it for 1 minute, we want to stop reading, preserve the last read position, and close the connection (to reschedule later).

Currently, I have to use a hack and call conn.destroy() inside a stream's _destroy handler:

[screenshot: conn.destroy() called from a stream's _destroy handler]

I suspect the fact that connection.end() doesn't help here may be a bug in the library, because for replication streams it actually works fine:

[screenshot: connection.end() cleanly terminating a replication stream]

P.S.
To list open connections on the PG side: select application_name, pid, state, query from pg_stat_activity

pgoutput does not support message_cb, so pgwire can't work with pg_logical_emit_message() even theoretically

This is FYI, just for the record, for people who'll google it in the future: pgoutput doesn't define any message_cb callback (since it's not needed for pub-sub replication purposes).

Proof: https://github.com/postgres/postgres/blob/9de77b5453130242654ff0b30a551c9c862ed661/src/backend/replication/pgoutput/pgoutput.c#L111-L117

In comparison to this, wal2json does support messages:
https://github.com/eulerto/wal2json/blob/master/wal2json.c#L221

Feature request: Non-async pgwire.connect() version

Currently, one has to await pgwire.connect() (because it returns a Promise). This makes it impossible to call in e.g. the constructor of some other class, so I have to do the following hack:

const Connection = require("pgwire/lib/connection");
...
constructor(...) {
  this.connection = new Connection({...});
}

I noticed that pgwire.pool() does NOT require awaiting, and this is very convenient. I'd use pgwire.pool() everywhere instead of the above example, but unfortunately Pool doesn't allow creating new connections (see #14 for more details).
