
node-sinek's Introduction

High Level Node.js Kafka Client


The most advanced Kafka Client.

Features

  • easy promise based API
  • a lot of Kafka pitfalls already taken care of
  • backpressure and stream consume modes
  • secure committing in backpressure (1:n, batch) mode
  • plain JavaScript implementation based on kafka-node and a super fast native implementation based on node-rdkafka
  • SSL, SASL & Kerberos support
  • auto reconnects
  • auto partition recognition and deterministic spreading for producers
  • intelligent health-checks and analytic events for consumers and producers

You might also like

Latest Changes

Can be found here

Install

npm install --save sinek

Usage

Usage - JS Client (based on kafkajs)

const {
  JSConsumer,
  JSProducer
} = require("sinek");

const jsProducerConfig = {
  clientId: "my-app",
  brokers: ["kafka1:9092"]
};

// the consumer config mirrors the producer config;
// "my-group" is an assumed example consumer group id
const jsConsumerConfig = {
  clientId: "my-app",
  brokers: ["kafka1:9092"],
  groupId: "my-group"
};

(async () => {

  const topic = "my-topic";

  const producer = new JSProducer(jsProducerConfig);
  const consumer = new JSConsumer(topic, jsConsumerConfig);

  producer.on("error", error => console.error(error));
  consumer.on("error", error => console.error(error));

  await consumer.connect();

  // Consume messages from a topic.
  consumer.consume(async (messages) => {
    messages.forEach((message) => {
      console.log(message);
    });
  });

  // Produce messages to a topic.
  await producer.connect();
  await producer.send(topic, "a message");
})().catch(console.error);

Further Docs

"make it about them, not about you"

  • Simon Sinek

node-sinek's People

Contributors

alkiskal, atd-schubert, c24w, darky, dependabot[bot], elmarx, gilesbradshaw, greenkeeper[bot], holgeradam, imnotjames, juriwiens, krystianity, maikelmclauflin, rd-johannes-jambor, rjmasikome, rob3000, sondremare, ubershmekel, vidhill, vuza, yacut, zakjholt


node-sinek's Issues

An in-range update of async is breaking the build 🚨

The dependency async was updated from 3.1.0 to 3.1.1.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

async is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of eslint is breaking the build 🚨

The devDependency eslint was updated from 5.15.1 to 5.15.2.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

eslint is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

Release Notes for v5.15.2
  • 29dbca7 Fix: implicit-arrow-linebreak adds extra characters (fixes #11268) (#11407) (Mark de Dios)
  • 5d2083f Upgrade: [email protected] (#11513) (Teddy Katz)
  • a5dae7c Fix: Empty glob pattern incorrectly expands to "/**" (#11476) (Ben Chauvette)
  • 448e8da Chore: improve crash reporting (fixes #11304) (#11463) (Alex Zherdev)
  • 0f56dc6 Chore: make config validator params more consistent (#11435) (薛定谔的猫)
  • d6c1122 Docs: Add working groups to maintainer guide (#11400) (Nicholas C. Zakas)
  • 5fdb4d3 Build: compile deps to ES5 when generating browser file (fixes #11504) (#11505) (Teddy Katz)
  • 06fa165 Build: update CI testing configuration (#11500) (Reece Dunham)
  • 956e883 Docs: Fix example in no-restricted-modules docs (#11454) (Paul O'Shannessy)
  • 2c7431d Docs: fix json schema example dead link (#11498) (kazuya kawaguchi)
  • e7266c2 Docs: Fix invalid JSON in "Specifying Parser Options" (#11492) (Mihira Jayasekera)
  • 6693161 Sponsors: Sync README with website (ESLint Jenkins)
  • 62fee4a Chore: eslint-config-eslint enable comma-dangle functions: "never" (#11434) (薛定谔的猫)
  • 34a5382 Build: copy bundled espree to website directory (#11478) (Pig Fang)
  • f078f9a Chore: use "file:" dependencies for internal rules/config (#11465) (Teddy Katz)
  • 0756128 Docs: Add visualstudio to formatter list (#11480) (Patrick Eriksson)
  • 44de9d7 Docs: Fix typo in func-name-matching rule docs (#11484) (Iulian Onofrei)
Commits

The new version differs by 19 commits.

  • f354770 5.15.2
  • cada7a1 Build: changelog update for 5.15.2
  • 29dbca7 Fix: implicit-arrow-linebreak adds extra characters (fixes #11268) (#11407)
  • 5d2083f Upgrade: [email protected] (#11513)
  • a5dae7c Fix: Empty glob pattern incorrectly expands to "/**" (#11476)
  • 448e8da Chore: improve crash reporting (fixes #11304) (#11463)
  • 0f56dc6 Chore: make config validator params more consistent (#11435)
  • d6c1122 Docs: Add working groups to maintainer guide (#11400)
  • 5fdb4d3 Build: compile deps to ES5 when generating browser file (fixes #11504) (#11505)
  • 06fa165 Build: update CI testing configuration (#11500)
  • 956e883 Docs: Fix example in no-restricted-modules docs (#11454)
  • 2c7431d Docs: fix json schema example dead link (#11498)
  • e7266c2 Docs: Fix invalid JSON in "Specifying Parser Options" (#11492)
  • 6693161 Sponsors: Sync README with website
  • 62fee4a Chore: eslint-config-eslint enable comma-dangle functions: "never" (#11434)

There are 19 commits in total.

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Throw an error when Consumer can't connect

Right now, if the consumer fails to connect for whatever reason, it never throws an error, so there is no way to know that this happened; it is indistinguishable from simply waiting.

You should be able to provide a connection timeout parameter and have the lib throw an error when that timeout has elapsed.
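In the meantime, an application-level workaround is to race the connect call against a timer. A minimal sketch; connectWithTimeout is a hypothetical helper, not part of sinek:

// Hypothetical helper (not part of sinek): reject if connect() has not
// resolved within `ms` milliseconds.
const connectWithTimeout = (consumer, ms) => {
  const timeout = new Promise((_, reject) =>
    setTimeout(() => reject(new Error(`connect timed out after ${ms} ms`)), ms)
  );
  return Promise.race([consumer.connect(), timeout]);
};

// usage, inside an async function:
await connectWithTimeout(consumer, 5000);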

Implementing Murmur2 partitioner, matching the Java client's partitioning logic

Hi,

thanks for the awesome library! The default Kafka Java client uses the Murmur2 hash function for its partitioning algorithm. Since our system uses different clients, it is important for us that they use consistent partitioning functions. Unfortunately, neither this library, nor node-rdkafka, nor librdkafka implements the algorithm used by the default Java client. Although there is a pull request for a Murmur2 partitioner pending at librdkafka (confluentinc/librdkafka#1468), we are going to implement our own partitioning function and maybe switch to the librdkafka implementation once it is live and supported by node-sinek and the underlying library node-rdkafka.
Would a pull request adding support for the Murmur2 partitioning function to node-sinek be welcome, if well implemented? If so, we will fork this library and send a pull request; otherwise we will solve this problem at the application level.

Thank you,
Marlon

Weird behaviour with _firstMessageConsumed

I'm using the NConsumer as suggested with the following settings
batchSize: 3,
noBatchCommits: true (I want to commit every batch regardless of how many messages it handled; e.g. if batchSize is 3 but there are only 2 messages, I want to commit after handling those 2)
manualBatching: true (I want all the messages of the batch to be passed to my sync event)

The behaviour I'm seeing: I have 3 messages in my topic, and I would expect my syncEvent to be called exactly once with three messages. Looking at the code, I discovered _firstMessageConsumed and the first-drain-message event. What this does is:

  • it calls my syncEvent once for the first message and a second time with all three messages, including the first message.
  • After every batch (regardless of the actual message count) I commit manually (synchronously).

So I have three messages, and batchSize 3.

  • syncEvent called with the first message (let's call it A)
  • commit
  • checking the Kafka offset in the GUI, the current offset goes from 0 to 3 (although I handled only one message)
  • syncEvent called with all three messages (including message A)

This is very odd. What am I missing?

  1. Why is the first message passed twice?
  2. Why does the consumer's current offset go to 3 when I only handled one message?

An in-range update of sinon is breaking the build 🚨

The devDependency sinon was updated from 7.2.3 to 7.2.4.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

sinon is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Commits

The new version differs by 13 commits.

  • 06fc27d Update docs/changelog.md and set new release id in docs/_config.yml
  • 54da371 Add release documentation for v7.2.4
  • e5de1fe 7.2.4
  • d158672 Update CHANGELOG.md and AUTHORS for new release
  • 1431c78 minor package updates
  • 37c955d Merge pull request #1979 from fatso83/update-npm-deps
  • fc2a32a Merge pull request #1975 from ehmicky/master
  • 85f2fcd Update eslint-plugin-mocha
  • 707e068 Fix high prio audit warnings
  • 8282bc0 Update nise to use @sinonjs/text-encoding
  • c1d9625 Make all properties non-enumerable in spies, stubs, mocks and fakes
  • 894951c Merge pull request #1973 from mgred/default-sandbox-example
  • 876aebb docs(sandbox): add example for default sandbox

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Cannot install node-rdkafka because you updated to Mojave?

Here is the fix:

  1. xcode-select --install
  2. (make sure you have the Xcode app installed; if not, get it from the App Store)
  3. brew reinstall openssl (optional)
  4. LDFLAGS='-L/usr/local/opt/openssl/lib' CPPFLAGS='-I/usr/local/opt/openssl/include' yarn

OpenSSL might still be installed but is no longer linked after updating to Mojave, so yarn or npm won't be able to install (compile) node-rdkafka via node-gyp. Passing the flags above solves this issue.

An in-range update of mocha is breaking the build 🚨

The devDependency mocha was updated from 6.2.0 to 6.2.1.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

mocha is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Commits

The new version differs by 11 commits.

  • ef6c820 Release v6.2.1
  • 9524978 updated CHANGELOG for v6.2.1 [ci skip]
  • dfdb8b3 Update yargs to v13.3.0 (#3986)
  • 18ad1c1 treat '--require esm' as Node option (#3983)
  • fcffd5a Update yargs-unparser to v1.6.0 (#3984)
  • ad4860e Remove extraGlobals() (#3970)
  • b269ad0 Clarify effect of .skip() (#3947)
  • 1e6cf3b Add Matomo to website (#3765)
  • 91b3a54 fix style on mochajs.org (#3886)
  • 0e9d8ad tty.getWindowSize is not a function inside a "worker_threads" worker (#3955)
  • 48da42e Remove jsdoc index.html placeholder from eleventy file structure and fix broken link in jsdoc tutorial (#3966)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of lodash.merge is breaking the build 🚨

The dependency lodash.merge was updated from 4.6.1 to 4.6.2.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

lodash.merge is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

AutoFlush for buffer in publisher?

Hi,

I really like your package, great work. It would be even more pleasant if the Publisher class had an auto-flush option when using bufferPublishMessage. That way I would not have to track the total number of messages myself.

Any chance of adding that feature?

Cheers!

An in-range update of bluebird is breaking the build 🚨

The dependency bluebird was updated from 3.5.2 to 3.5.3.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

bluebird is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

Release Notes for v3.5.3

Bugfixes:

  • Update acorn dependency
Commits

The new version differs by 7 commits.

  • a5a5b57 Release v3.5.3
  • c8a7714 update packagelock
  • 8a765fd Update getting-started.md (#1561)
  • f541801 deps: update acorn and add acorn-walk (#1560)
  • 247e512 Update promise.each.md (#1555)
  • e2756e5 fixed browser cdn links (#1554)
  • 7cfa9f7 Changed expected behaviour when promisifying (#1545)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Error when passing empty array to NConsumer#adjustSubscription

When passing an empty array (removing all subscriptions) to NConsumer#adjustSubscription(), the following error occurs.

 Error: Local: Invalid argument or configuration
      at Function.createLibrdkafkaError [as create] (node_modules/node-rdkafka/lib/error.js:260:10)
      at KafkaConsumer.Client._errorWrap (node_modules/node-rdkafka/lib/client.js:470:29)
      at KafkaConsumer.subscribe (node_modules/node-rdkafka/lib/kafka-consumer.js:290:8)
      at NConsumer.adjustSubscription (node_modules/sinek/lib/librdkafka/NConsumer.js:669:19)

I worked around this by calling the unsubscribe() method of the underlying consumer - like adjustSubscription() does before setting the new subscription - whenever the new topic list is empty; see the sketch below.
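A minimal sketch of that workaround, assuming the underlying node-rdkafka consumer is exposed as nconsumer.consumer (as the close() code quoted further down suggests):

const safeAdjustSubscription = (nconsumer, topics) => {
  if (topics.length === 0) {
    // adjustSubscription() fails on an empty list, so drop all
    // subscriptions directly on the underlying consumer instead
    nconsumer.consumer.unsubscribe();
    return;
  }
  nconsumer.adjustSubscription(topics);
};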

Thanks and have a nice day!

No way to commit one message at a time when many are in the queue

Hi @krystianity it seems I'm losing data. I was hoping to use node-sinek after reading this statement:

auto-commit / manual drain commit in backpressure-mode (dont loose data)

My use-case is a minute-long process for every message. It's important for me to commit every time I finish processing a single message, and not before I'm done processing. Currently, because of Drainer._q, I'm buffering tens of messages, and the queue is only drained and committed after many are processed. If I try consumer.consumer._commit, then all of my queued messages are committed.
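For what it's worth, with the native NConsumer the batch options quoted elsewhere in this README can approximate one-at-a-time commits. This is a sketch, not a confirmed fix for the kafka-node based Drainer; minuteLongProcess is a hypothetical handler:

consumer.consume(async (message, callback) => {
  await minuteLongProcess(message); // finish processing first
  callback();                       // only then signal completion
}, true, false, {
  batchSize: 1,         // fetch one message per batch round
  commitEveryNBatch: 1  // commit after every batch, i.e. every message
});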

How does `concurrency` setting behave for partitions?

Hi there!

I'm looking into processing partitions in parallel, but each message in a partition one at a time in order.

In the README, I see:

const options = {
  batchSize: 500, //grab up to 500 messages per batch round
  commitEveryNBatch: 5, //commit all offsets on every 5th batch
  concurrency: 2, //calls syncFunction in parallel * 2 for messages in batch
  // ...
};

myNConsumer.consume(syncFunction, true, false, options);

If I have 4 partitions, will the concurrency: 2 setting mean that I risk processing 2 messages from the same partition in parallel?

Thanks!

TypeScript

Are you open to someone contributing TypeScript definitions to sinek?

SSL example?

I have a kafka cluster that is configured using SSL. And what I have for properties are:

  • SSL_TRUSTSTORE_LOCATION
  • SSL_TRUSTSTORE_PASSWORD
  • BROKER_LIST (with SSL://<...> based URLs)

I'm not entirely clear on how I would set up a client to consume from that Kafka cluster...
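For the native client, SSL is configured through librdkafka properties in noptions. A minimal sketch with example values; note that librdkafka cannot read JKS truststores, so the CA certificate has to be exported to PEM first (e.g. with keytool -exportcert -rfc):

const consumerConfig = {
  noptions: {
    // from BROKER_LIST, without the SSL:// scheme prefix
    "metadata.broker.list": "broker1:9093,broker2:9093",
    "group.id": "my-group",
    "security.protocol": "ssl",
    // PEM export of the truststore's CA certificate
    "ssl.ca.location": "/path/to/ca-cert.pem"
  }
};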

Alpine Dockerfile is broken

$ docker build .
Sending build context to Docker daemon  3.584kB
Step 1/7 : FROM node:alpine
 ---> 1df31d366c81
Step 2/7 : RUN apk add --upgrade --no-cache      alpine-sdk     libc6-compat     bash     make     gcc     g++     python     cyrus-sasl-dev     libressl2.5-libcrypto --repository http://dl-3.alpinelinux.org/alpine/edge/main/ --allow-untrusted     libressl2.5-libssl --repository http://dl-3.alpinelinux.org/alpine/edge/main/ --allow-untrusted     librdkafka-dev --repository http://dl-3.alpinelinux.org/alpine/edge/community/ --allow-untrusted     dumb-init --repository http://dl-3.alpinelinux.org/alpine/edge/community/ --allow-untrusted
 ---> Running in 5edf1454954d
fetch http://dl-3.alpinelinux.org/alpine/edge/community/x86_64/APKINDEX.tar.gz
fetch http://dl-3.alpinelinux.org/alpine/edge/main/x86_64/APKINDEX.tar.gz
fetch http://dl-cdn.alpinelinux.org/alpine/v3.8/main/x86_64/APKINDEX.tar.gz
fetch http://dl-cdn.alpinelinux.org/alpine/v3.8/community/x86_64/APKINDEX.tar.gz
ERROR: unsatisfiable constraints:
  libressl2.5-libcrypto (missing):
    required by: world[libressl2.5-libcrypto]
  libressl2.5-libssl (missing):
    required by: world[libressl2.5-libssl]
The command '/bin/sh -c apk add --upgrade --no-cache      alpine-sdk     libc6-compat     bash     make     gcc     g++     python     cyrus-sasl-dev     libressl2.5-libcrypto --repository http://dl-3.alpinelinux.org/alpine/edge/main/ --allow-untrusted     libressl2.5-libssl --repository http://dl-3.alpinelinux.org/alpine/edge/main/ --allow-untrusted     librdkafka-dev --repository http://dl-3.alpinelinux.org/alpine/edge/community/ --allow-untrusted     dumb-init --repository http://dl-3.alpinelinux.org/alpine/edge/community/ --allow-untrusted' returned a non-zero code: 2

Incorrect error message

Hi.

Sinek can produce incorrect error messages while loading the native rdkafka client. E.g. here https://github.com/nodefluent/node-sinek/blob/master/lib/librdkafka/NProducer.js#L73 the original error is swallowed and a new custom error is thrown.

However, as rdkafka is a native module, it can throw more than just "Module not found" exceptions. It may also throw exceptions like "Error relocating... symbol not found", so it is important to preserve the original error message, or at least to show it in debug output.

setInterval in native Kafka uses wrong delay

In node-sinek-master/lib/librdkafka/NConsumer.js

    lagFetchInterval = lagFetchInterval || 1e6 * 60 * 3; //3 minutes

    this._analyticsIntv = setInterval(this._runAnalytics.bind(this), analyticsInterval);
    this._lagCheckIntv = setInterval(this._runLagCheck.bind(this), lagFetchInterval);

setInterval and setTimeout use milliseconds, not microseconds. Using a parameter like 45000000 will mean "run this every 45000 seconds".
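Expressed in milliseconds, the intended default would be:

lagFetchInterval = lagFetchInterval || 1000 * 60 * 3; // 3 minutes in milliseconds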

An in-range update of node-rdkafka is breaking the build 🚨

The optionalDependency node-rdkafka was updated from 2.4.1 to 2.4.2.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

node-rdkafka is an optionalDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

Commits

The new version differs by 4 commits.

  • 4d7ecea Version bump to 2.4.2
  • e82bb2e ConsumeNum returns messages on partition EOF
  • 8687ff1 Point to the correct CONFIGURATION.md
  • 2a4e078 Add isConnected definition (#476)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of uuid is breaking the build 🚨

The dependency uuid was updated from 3.3.2 to 3.3.3.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

uuid is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Commits

The new version differs by 3 commits.

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Murmur2 partitioner fails for 'ö', 'ä', 'ü', etc. to be consistent with Kafka's default Java client

... example: The message with the key "thingäöüÄÖÜ" and a partition count of 10 should be written to partition 2, but gets written to partition 4 by the current implementation. Although it is not used here, this is due to the same bug described in this issue: b3nj4m/murmurhash-js#1

I propose to use the npm package murmur-hash-js instead of murmurhash, and to pass a buffer instead of a string, which leads to results consistent with Kafka's Java implementation.

The described bug was tested together with bug #39 and was therefore assumed to be already resolved.

For reference see the following, tested, implementation: https://github.com/vuza/murmur2-partitioner

Broker transport failure

I'm playing around with your module, but when I try the sasl-ssl-example by calling

DEBUG=* node producer.js

I'm getting back:

sinek:error Error: Local: Broker transport failure
sinek:error at Function.createLibrdkafkaError [as create] (/media/gitprodenv/Elements1/Coding/ApacheKafka/node-sinek/examples/sasl-ssl-example/node_modules/node-rdkafka/lib/error.js:334:10)
sinek:error at /media/gitprodenv/Elements1/Coding/ApacheKafka/node-sinek/examples/sasl-ssl-example/node_modules/node-rdkafka/lib/client.js:339:28 +0ms
sinek:error Error: Local: Broker transport failure
sinek:error at Function.createLibrdkafkaError [as create] (/media/gitprodenv/Elements1/Coding/ApacheKafka/node-sinek/examples/sasl-ssl-example/node_modules/node-rdkafka/lib/error.js:334:10)
sinek:error at /media/gitprodenv/Elements1/Coding/ApacheKafka/node-sinek/examples/sasl-ssl-example/node_modules/node-rdkafka/lib/client.js:339:28 +2ms

Apache Kafka and Zookeeper are set up by calling npm run kafka:start as described in the docs.

Any help is highly appreciated!

An in-range update of kafka-node is breaking the build 🚨

Version 3.0.1 of kafka-node was just published.

Branch Build failing 🚨
Dependency kafka-node
Current Version 3.0.0
Type dependency

This version is covered by your current version range and after updating it in your project the build failed.

kafka-node is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Commits

The new version differs by 5 commits.

  • 2185767 3.0.1 (#1078)
  • bfcc7ff Fix checking new partitions for topics with dots (#1076)
  • 79a49ec Use double ended queue instead of Array for message buffer (#1067)
  • 76f6939 Add test for PR #1066 (#1070)
  • 4e0477e Only set committing=true if it can be set false later (#1066)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

How can I pass a commit callback

Hi!
Is there a specific reason why you overwrite the commit callback? How can I receive commit errors? Wouldn't it be nice to pass an optional callback to the commit methods, which overwrites the default callback? Or am I overlooking something?

noptions = Object.assign({}, config, noptions, overwriteConfig);

Thank you!

Possible race condition in NConsumer#close

Hi,

I am using sinek's NConsumer to connect to Kafka and came across faulty behavior when trying to disconnect from the broker. It seems that 8fd1846 introduced a race condition between the disconnected event listener (when it calls NConsumer#reset) and the check of the abort condition in NConsumer#_singleConsumeRecursive.

The result is that the NConsumer keeps trying to read messages from the disconnected node-rdkafka consumer.

Error: KafkaConsumer is not connected
    at Function.createLibrdkafkaError [as create] (/home/node/event-client/node_modules/node-rdkafka/lib/error.js:261:10)
    at /home/node/event-client/node_modules/node-rdkafka/lib/kafka-consumer.js:448:29
  message: 'KafkaConsumer is not connected',
  code: -172,
  errno: -172,
  origin: 'kafka'

Thanks in advance!

An in-range update of bluebird is breaking the build 🚨

The dependency bluebird was updated from 3.5.3 to 3.5.4.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

bluebird is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

Release Notes for v3.5.4
  • Proper version check supporting VSCode(#1576)
Commits

The new version differs by 6 commits.

  • e0222e3 Release v3.5.4
  • 4b9fa33 missing --expose-gc flag (#1586)
  • 63b15da docs: improve and compare Promise.each and Promise.mapSeries (#1565)
  • 9dcefe2 .md syntax fix for coming-from-other-languages.md (#1584)
  • b97c0d2 added proper version check supporting VSCode (#1576)
  • 499cf8e Update jsdelivr url in docs (#1571)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of bluebird is breaking the build 🚨

The dependency bluebird was updated from 3.7.1 to 3.7.2.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

bluebird is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Release Notes for v3.7.2

Bugfixes:

  • Fixes firefox settimeout not initialized error (#1623)
Commits

The new version differs by 2 commits.

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of node-rdkafka is breaking the build 🚨

The dependency node-rdkafka was updated from 2.7.0 to 2.7.1.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

node-rdkafka is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Commits

The new version differs by 14 commits.

  • 5a15a6c Fix typo in test description
  • 28143db Fix producer documentation (#643)
  • 8cd1e9e Avoid a memory leak when assigning fails
  • 2bd2b42 Update binding.gyp
  • f964298 Update librdkafka to 1.1.0 (#639)
  • 3d5d790 Fix make lib rule
  • e4adf30 VSCode and call for collaborators added to README
  • 14f9dc2 Update librdkafka to 1.0.1 (#627)
  • d2dd2a4 Enhance the travis-ci script (#609)
  • 19c27ea Specify the change to the offset.commit event in the typings and README (#626)
  • 6e22631 Support for Node.js 12.x (#610)
  • e5d3d0b Add travis environment variable to version check
  • 9d0c2d5 Add librdkafka version and presence checks
  • 01dde7d Some pre-flight ci stuff in prep for better ci

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Implemented (murmur2) partitioning logic does not match the Java Kafka client due to missing hash seed

Thank you for implementing both the murmur2 and 3 partitioning logic. Unfortunately it does not match the default Java Kafka client's logic.

Example: Key "wu" with a partition count of 10 should be written to partition 0, the current logic returns 5.

The Java client's seed for the murmur2 algorithm wasn't applied; see the Java implementation: https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L355

For murmur2, changing https://github.com/nodefluent/node-sinek/blob/master/lib/librdkafka/NProducer.js#L275 to return this._murmur(key, 0x9747b28c) % partitionCount; fixes the issue.

For reference, I've implemented a JS murmur2 partitioner including tests against the Java partitioner's results: https://github.com/vuza/murmur2-partitioner
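For illustration, a minimal JavaScript port of the Java client's partitioning (murmur2 with seed 0x9747b28c over the key's UTF-8 bytes, masked to a positive int). This is a sketch based on the Java implementation linked above, not sinek's shipped code:

// Port of org.apache.kafka.common.utils.Utils.murmur2 (32-bit, seeded).
function murmur2(buf) {
  const seed = 0x9747b28c;
  const m = 0x5bd1e995;
  let h = seed ^ buf.length;
  const len4 = Math.floor(buf.length / 4);
  for (let i = 0; i < len4; i++) {
    const i4 = i * 4;
    let k = buf[i4] | (buf[i4 + 1] << 8) | (buf[i4 + 2] << 16) | (buf[i4 + 3] << 24);
    k = Math.imul(k, m);
    k ^= k >>> 24;
    k = Math.imul(k, m);
    h = Math.imul(h, m);
    h ^= k;
  }
  const tail = buf.length & ~3;
  switch (buf.length % 4) {
    case 3: h ^= (buf[tail + 2] & 0xff) << 16; // falls through
    case 2: h ^= (buf[tail + 1] & 0xff) << 8;  // falls through
    case 1: h ^= buf[tail] & 0xff; h = Math.imul(h, m);
  }
  h ^= h >>> 13;
  h = Math.imul(h, m);
  h ^= h >>> 15;
  return h;
}

// Matches the Java client: hash the key bytes, mask to positive, modulo.
function partitionForKey(key, partitionCount) {
  return (murmur2(Buffer.from(key, "utf8")) & 0x7fffffff) % partitionCount;
}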

Is there a way to skip messages conditionally without halting the consumer?

Here is what I'm trying to do:

I have a topic recipes (3 partitions) and 2 consumer groups, sweet and savoury. When I'm working with sweet, I don't want to consume / process / commit any messages that belong to savoury; I simply want to skip those messages, and vice versa.

Here is my code:

recipesListener({ groupId: "group-sweet", topicName: "recipes", groupType: "sweet" });
recipesListener({ groupId: "group-savoury", topicName: "recipes", groupType: "savoury" });

const { NConsumer } = require("sinek");
const { logger } = require("../logger");

const recipesListener = ({ groupId, topicName, groupType }) => {
  const consumerConfig = {
    logger: {
      debug: (msg) => logger.debug(msg),
      info: (msg) => logger.info(msg),
      warn: (msg) => logger.warn(msg),
      error: (msg) => logger.error(msg),
    },
    noptions: {
      "metadata.broker.list": "localhost:9092",
      "group.id": groupId,
      "socket.keepalive.enable": true,
      "queued.min.messages": 1000,
      "queued.max.messages.kbytes": 5000,
      "fetch.message.max.bytes": 524288,
    },
    tconf: {
      "auto.offset.reset": "latest",
    },
  };

  (async () => {
    const consumer = new NConsumer([topicName], consumerConfig);

    consumer.on("error", (error) => logger.error(error));
    consumer.on("ready", () =>
      logger.info(`Consumer ready to consume ${topicName} for ${groupId}`)
    );

    await consumer.connect();

    consumer.consume(
      async (messages, callback) => {
        /*
         *   messages are produced with headers that can have either sweet or savoury
         */
        if (messages.headers !== groupType) {
          return;
        }

        callback();
      },
      true,
      false,
      { noBatchCommits: true }
    );
  })().catch(logger.error);
};

module.exports = {
  recipesListener,
};

Currently the lines below halt the consumer completely. Is there a workaround for this? Please let me know.

        /*
         *   messages are produced with headers that can have either sweet or savoury
         */
        if (messages.headers !== groupType) {
          return;
        }
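In this callback-based consume mode the consumer waits for the callback before fetching further messages, so returning early without invoking it is most likely what halts consumption. A sketch of a possible fix:

consumer.consume(
  async (messages, callback) => {
    // Skip messages of the other group type, but still invoke the
    // callback; otherwise the consumer stalls waiting for it.
    if (messages.headers !== groupType) {
      return callback();
    }

    // ...process the message here...

    callback();
  },
  true,
  false,
  { noBatchCommits: true }
);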

I'm using

Node v12.8.2
"node-rdkafka": "^2.9.1",
"sinek": "^9.1.0",

I'm a Kafka noob. Please let me know if any more details are needed.

Consumer Backpressure example is needed

Hello @krystianity
Can you please provide any pointers on how to implement backpressure with this client? We would like to control how many messages we consume. Is there a way to commit them later, once processing is done, and then get more messages?
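A minimal sketch of the callback-based (1:n) consume mode from the feature list, which gives exactly this behaviour: further messages are only fetched after the callback signals that processing is done (handleMessage and the option values are example assumptions):

consumer.consume(async (message, callback) => {
  await handleMessage(message); // your long-running processing
  callback();                   // done: allow commit and fetch more
}, true, false, {
  batchSize: 50,        // grab up to 50 messages per batch round
  commitEveryNBatch: 1  // commit offsets after every batch
});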

Resetting offsets to reprocess all messages

Is it possible to "seek" to the beginning of a topic for a particular consumer group and partition?

The use case is being able to reprocess messages from the start. 😄

An in-range update of debug is breaking the build 🚨

The dependency debug was updated from 4.1.0 to 4.1.1.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

debug is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build could not complete due to an error (Details).

Commits

The new version differs by 4 commits.

  • 68b4dc8 4.1.1
  • 7571608 remove .coveralls.yaml
  • 57ef085 copy custom logger to namespace extension (fixes #646)
  • d0e498f test: only run coveralls on travis

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

An in-range update of kafka-node is breaking the build 🚨

The dependency kafka-node was updated from 4.0.2 to 4.0.3.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

kafka-node is a direct dependency of this project, and it is very likely causing it to break. If other packages depend on yours, this update is probably also breaking those in turn.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

Commits

The new version differs by 6 commits.

  • fe5a64e Bump version and update changelog for 4.0.3 release (#1215)
  • 29a1016 Improve consumer recovery (#1214)
  • 42d61e9 Add magic byte error to doc closes #1208 (#1212)
  • e2e2fa4 Update example consumer.js (#1211)
  • a693704 Update dependencies (#1204)
  • 0f15012 Add connect event for HighLevelConsumer in types/index.d.ts (#1201)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Graceful shutdown

Hello.

I'm wondering how to terminate a consumer gracefully, for instance when a syncEvent is still running but consumer.close() has been called.

In the code below, I couldn't find anything that waits for running tasks to finish.

close(commit = false) {
  this.haltAnalytics();
  if (this.consumer) {
    this._inClosing = true;
    this._resume = false; //abort any running recursive consumption
    if (!commit) {
      this.consumer.disconnect();
      //this.consumer = null;
    } else {
      this.consumer.commit();
      this.config.logger.info("Committing on close.");
      process.nextTick(() => {
        this.consumer.disconnect();
        //this.consumer = null;
      });
    }
  }
}

Is there any graceful-termination code in rdkafka or somewhere else? Does it just terminate without a graceful shutdown? Or should I implement the cleanup of resources before shutdown myself?
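Absent built-in support, one application-level approach is to track the in-flight work and only close once it has settled. A sketch; handleMessage is a hypothetical handler:

let inFlight = Promise.resolve();

consumer.consume((message, callback) => {
  // remember the currently running task so shutdown can await it
  inFlight = handleMessage(message).then(() => callback());
});

process.on("SIGTERM", async () => {
  await inFlight;       // let the running syncEvent finish
  consumer.close(true); // then commit and disconnect
  process.exit(0);
});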

Thank you!

An in-range update of sinon is breaking the build 🚨

The devDependency sinon was updated from 7.3.1 to 7.3.2.

🚨 View failing branch.

This version is covered by your current version range and after updating it in your project the build failed.

sinon is a devDependency of this project. It might not break your production code or affect downstream projects, but probably breaks your build or test tools, which may prevent deploying or publishing.

Status Details
  • โŒ continuous-integration/travis-ci/push: The Travis CI build failed (Details).

Commits

The new version differs by 6 commits.

  • 15a9d65 Update docs/changelog.md and set new release id in docs/_config.yml
  • 5d770e0 Add release documentation for v7.3.2
  • 585a1e9 7.3.2
  • b51901d Update CHANGELOG.md and AUTHORS for new release
  • 83861a7 Update Lolex to bring in fix for sinonjs/lolex#232
  • 2430fd9 Documentation (#2004)

See the full diff

FAQ and help

There is a collection of frequently asked questions. If those don't help, you can always ask the humans behind Greenkeeper.


Your Greenkeeper Bot 🌴

Documentation?

This might very well be the most advanced Kafka client for JS/TS. However, to be honest, it's very hard to even get started due to the near-total lack of documentation. You should really take a week off from writing code and just work on docs, in my not-so-humble opinion. For instance, I struggle to see the difference between a Producer's send, buffer and bufferFormat methods. Please update the docs so we can use this driver!

Apparently there's a lack of good Kafka drivers in the Node world in general, so if this one is as good as you claim, please write documentation that shows and underpins it.

Warning: enable.auto.commit has no effect

if (this._isAutoCommitting !== null && typeof this._isAutoCommitting !== "undefined") {
  this.config.logger.warn("enable.auto.commit has no effect in 1:n consume-mode, set to null or undefined to remove this message." +
    "You can pass 'noBatchCommits' as true via options to .consume(), if you want to commit manually.");
}

I'm passing noBatchCommits as true in the consume options, and I do not pass anything for enable.auto.commit. If I pass it as null, then Kafka responds with an error.

NConsumer: Support custom headers

NProducer send: async send(topicName, message, _partition = null, _key = null, _partitionKey = null, _opaqueKey = null)

rdkafka produce: Producer.prototype.produce = function(topic, partition, message, key, timestamp, opaque, headers)

There is no way to send custom headers although the underlying implementation supports it.
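As a stop-gap, the produce() call quoted above can be reached through the underlying node-rdkafka client, which sinek appears to expose as producer.producer (an assumption; topic, key, and header payload below are example values):

producer.producer.produce(
  "my-topic",
  null,                           // let librdkafka choose the partition
  Buffer.from("a message"),
  "my-key",
  Date.now(),
  null,                           // opaque
  [{ "my-header": "some-value" }] // custom headers
);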

Should/would node-sinek affect process.exit behaviour?

I am replacing kafka-node with node-sinek in a project.

We had some fail-on-error logic (the Netflix philosophy: let the pod die in an error scenario).

My calls to process.exit() no longer seem to cause the Node application to exit.

I am using NConsumer. Up until the point where I call the NConsumer constructor, I can exit the process as normal.

Thanks in advance,
D
