Git Product home page Git Product logo

nats-python's Introduction

nats-python

Build Status codecov PyPI PyPI - Downloads

Python client for NATS messaging system.

This project is a replacement for abandoned pynats. nats-python supports only Python 3.6+ and fully covered with typings.

Go to the asyncio-nats project, if you're looking for asyncio implementation.

Installation

$ pip install nats-python

Usage

from pynats import NATSClient

with NATSClient() as client:
    # Connect
    client.connect()

    # Subscribe
    def callback(msg):
        print(f"Received a message with subject {msg.subject}: {msg}")

    client.subscribe(subject="test-subject", callback=callback)

    # Publish a message
    client.publish(subject="test-subject", payload=b"test-payload")

    # wait for 1 message
    client.wait(count=1)

Contributing

To work on the nats-python codebase, you'll want to clone the project locally and install the required dependencies via poetry:

$ git clone [email protected]:Gr1N/nats-python.git
$ make install

To run tests and linters use command below:

$ make lint && make test

If you want to run only tests or linters you can explicitly specify which test environment you want to run, e.g.:

$ make lint-black

License

nats-python is licensed under the MIT license. See the license file for details.

nats-python's People

Contributors

gr1n avatar richard78917 avatar yosshy avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

nats-python's Issues

request for sample program

Hi,
could you provide a sample program to connect and publish the message.
i tried the below sample program but its failed.

import os
import socket
import threading
import time

from pynats import NATSClient
from pynats.exceptions import NATSInvalidSchemeError, NATSReadSocketError

def test():
nats_plain_url=socket.gethostbyname(socket.gethostname())+":4222"
client = NATSClient(nats_plain_url, socket_timeout=2)
client.connect()
client.ping()
client.close()

test()

File "test1.py", line 12, in test
client.connect()
File "/RMS/nrsp/nsptnaz1/nrsp/python/venv/lib/python3.8/site-packages/pynats/client.py", line 173, in connect
sock.connect((self._conn_options.hostname, self._conn_options.port))
TypeError: str, bytes or bytearray expected, not NoneType

[Errno 107] Transport endpoint is not connected

Hello,

When trying to post messages to a nats server, I am getting regular errors of the form:

[Errno 107] Transport endpoint is not connected

I set up the connection as follows:

self.nats_client = NATSClient(
            url=self.nats_url, socket_timeout=60., socket_keepalive=True, verbose=True
        )
        self.nats_client.connect()

After I receive the error, I try setting up the connection again:

        try:
            self.nats_client.publish(
                subject=self.nats_subject,
                payload=json.dumps(
                    self.nats_msg, indent=2
                ).encode('utf-8')
            )
        except Exception as e:
            self.nats_client.close()
            self.nats_client.connect()

Am I expected to run self.nats_client.connect() each time before I try publish? There is a performance cost for doing so, which is why I have tried to avoid doing it regularly.

Thanks
Gabriel

Auto reconnect

Hi, thank you for the nats-python.
It would be great if there will be an option to set auto reconnect to server.
I have found solution to detect disconnects when publish:
`

def publish(self, subject, data):
    try:
        self._nats.publish(subject, payload=bytes(json.dumps(data), encoding='utf-8'))
    except BrokenPipeError:
        while True:
            try:
                self._nats.reconnect()
                self.publish(subject, data)
                break
            except:
                pass

`

But I don't know how to detect disconnects when subscribe, program just freezes

Timeout option on request

Great work with the library!

It would be very beneficial if there was a "timeout" option on client.wait and client.request.
As it is now they will just block forever in case of an error.

Thanks!

Subscribe does not pass queue to nats-server

git diff
diff --git a/pynats/client.py b/pynats/client.py
index 9b23311..8083063 100644
--- a/pynats/client.py
+++ b/pynats/client.py
@@ -166,7 +166,7 @@ class NATSClient:
sub = NATSSubscription(
sid=self._ssid,
subject=subject,

  •        queue="",
    
  •        queue=queue,
           callback=callback,
           max_messages=max_messages,
       )
    

Does this client work out of the box with a postgres backend?

I'm running the streaming server locally with

nats-streaming-server --debug -store sql -sql_driver postgres --sql_source="user=postgres password=postgres host=localhost port=5432 sslmode=disable database=nats"

In one process I write messages with:

with NATSClient(url='nats://0.0.0.0:4222', socket_timeout=2) as client:
    client.publish(
        'message-name',
        payload=json.dumps({'sample_payload_id': 1})
     )

In another process I am trying to read them with:

with NATSClient(url='nats://0.0.0.0:4222', socket_timeout=2) as client:
   client.subscribe(
        'message-name',
        callback=message_callback
    )
   client.wait(count=1)

In this second process I keep getting socket timeouts. I've also checked my database, and don't see any messages in the table.

How to set properties other than the subject and queue when subscribing

I would like to subscribe with following properties

  1. a durable queue
  2. set the start at sequence
  3. close the connection without unsubscribing

The above properties are available in GOLANG client. I'm wondering they are already available or planned to add support in this client.

Set start at sequence in GO

sub, err := sc.Subscribe("foo",
  func(m *stan.Msg) {...},
  stan.StartAtSequence(22))

Set durable in GO
sc.Subscribe("foo", func(m *stan.Msg) {...}, stan.DurableName("my-durable"))

How to create a client without the "with" statement ?

Hi, thanks a lot for nats-python, that is a great project.
I have a very small class with static methods: the first one to initialize the connection and save it in a class variable, the second one to publish a message. It basically looks like the following.

class NATS(object):
    NATS_URL = os.getenv('NATS_URL', 'nats://nats:4222')
    client = None

    @staticmethod
    def initialize():
        while True:
            time.sleep(1)
            try:
                with NATSClient(NATS.NATS_URL) as client:
                    NATS.client = client
                    logging.debug("-> connected to NATS")
                return
            except Exception as e:
                logging.debug("-> cannot connect to NATS, retrying...")
                pass

    @staticmethod
    def publish(event):
        try:
           NATS.client.publish(">", payload=json.dumps(event).encode())
        except Exception as e:
             logging.error(str(e))

It is called from another module with:

NATS.initialize()
NATS.publish({"test":"1"})

But, as the connection is done with NATSClient(NATS.NATS_URL) as client, the file descriptor is closed right after the with instruction. I then get the following error when trying to publish a message.

ERROR:root:[Errno 9] Bad file descriptor

Is there a way to create a client without using "with" so the socket handler is not closed ?
Thanks a lot

Option to specify Cluster ID

Hello,

I'm trying to interact with Python and Go through NATs. I'm able to publish and subscribe with Python just fine. And I can also publish and subscribe with Go just fine.

The issue comes when trying to publish with Python and subscribe with Go. Or Vesa Versa.

I'm pretty sure I've tracked this down to nats-python not supporting the Cluster ID parameter.

The Go code requires a Cluster ID and using the default test-cluster doesn't seem to get Go on the same page as Python.

Here is the Trace and Debug output from GO:

[1] 2020/09/18 05:17:20.694421 [DBG] 127.0.0.1:48332 - cid:5 - Client connection created
[1] 2020/09/18 05:17:20.696551 [TRC] 127.0.0.1:48332 - cid:5 - <<- [CONNECT {"verbose":false,"pedantic":false,"auth_token":"defaultFissionAuthToken","tls_required":false,"name":"NATS Streaming Example Publisher","lang":"go","version":"1.9.1","protocol":1,"echo":true}]
[1] 2020/09/18 05:17:20.696599 [TRC] 127.0.0.1:48332 - cid:5 - <<- [PING]
[1] 2020/09/18 05:17:20.696607 [TRC] 127.0.0.1:48332 - cid:5 - ->> [PONG]
[1] 2020/09/18 05:17:20.698750 [TRC] 127.0.0.1:48332 - cid:5 - <<- [SUB _INBOX.pcTKl0ZQpfrPR1xTXq9Biu  1]
[1] 2020/09/18 05:17:20.698864 [TRC] 127.0.0.1:48332 - cid:5 - <<- [SUB _INBOX.pcTKl0ZQpfrPR1xTXq9Bo9  2]
[1] 2020/09/18 05:17:20.698940 [TRC] 127.0.0.1:48332 - cid:5 - <<- [SUB _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.*  3]
[1] 2020/09/18 05:17:20.698998 [TRC] 127.0.0.1:48332 - cid:5 - <<- [PUB _STAN.discover.fissionMQTrigger _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.Q3com5rU 72]
[1] 2020/09/18 05:17:20.699044 [TRC] 127.0.0.1:48332 - cid:5 - <<- MSG_PAYLOAD: ["\n\tclientPub\x12\x1d_INBOX.pcTKl0ZQpfrPR1xTXq9Biu\x18\x01\"\x16pcTKl0ZQpfrPR1xTXq9Bdf(\x050\x03"]
[1] 2020/09/18 05:17:20.699098 [TRC] 127.0.0.1:40994 - cid:2 - ->> [MSG _STAN.discover.fissionMQTrigger 2 _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.Q3com5rU 72]
[1] 2020/09/18 05:17:20.699393 [TRC] 127.0.0.1:40994 - cid:2 - <<- [PUB _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.Q3com5rU 224]
[1] 2020/09/18 05:17:20.699450 [TRC] 127.0.0.1:40994 - cid:2 - <<- MSG_PAYLOAD: ["\n _STAN.pub.8mwG6lwarlU4fIK08vltmD\x12 _STAN.sub.8mwG6lwarlU4fIK08vltmD\x1a\"_STAN.unsub.8mwG6lwarlU4fIK08vltmD\"\"_STAN.close.8mwG6lwarlU4fIK08vltmD2%_STAN.subclose.8mwG6lwarlU4fIK08vltmD:%_STAN.discover.fissionMQTrigger.pings@\x05H\x03P\x01"]
[1] 2020/09/18 05:17:20.699481 [TRC] 127.0.0.1:48332 - cid:5 - ->> [MSG _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.Q3com5rU 3 224]
[1] 2020/09/18 05:17:20.702273 [TRC] 127.0.0.1:48332 - cid:5 - <<- [SUB _STAN.acks.pcTKl0ZQpfrPR1xTXq9Byd  4]
[1] 2020/09/18 05:17:20.702303 [TRC] 127.0.0.1:48332 - cid:5 - <<- [PUB _STAN.pub.8mwG6lwarlU4fIK08vltmD.tv-request _STAN.acks.pcTKl0ZQpfrPR1xTXq9Byd 88]
[1] 2020/09/18 05:17:20.702313 [TRC] 127.0.0.1:48332 - cid:5 - <<- MSG_PAYLOAD: ["\n\tclientPub\x12\x162X8mH7xCDBwpJGbL1oujwD\x1a\ntv-request*\x0fTesting with Go2\x16pcTKl0ZQpfrPR1xTXq9Bdf"]
[1] 2020/09/18 05:17:20.702345 [TRC] 127.0.0.1:40994 - cid:2 - ->> [MSG _STAN.pub.8mwG6lwarlU4fIK08vltmD.tv-request 3 _STAN.acks.pcTKl0ZQpfrPR1xTXq9Byd 88]
[1] 2020/09/18 05:17:20.702467 [INF] STREAM: Channel "tv-request" has been created
[1] 2020/09/18 05:17:20.702661 [TRC] 127.0.0.1:40992 - cid:1 - <<- [PUB _STAN.acks.pcTKl0ZQpfrPR1xTXq9Byd 24]
[1] 2020/09/18 05:17:20.702672 [TRC] 127.0.0.1:40992 - cid:1 - <<- MSG_PAYLOAD: ["\n\x162X8mH7xCDBwpJGbL1oujwD"]
[1] 2020/09/18 05:17:20.702708 [TRC] 127.0.0.1:48332 - cid:5 - ->> [MSG _STAN.acks.pcTKl0ZQpfrPR1xTXq9Byd 4 24]
[1] 2020/09/18 05:17:20.705680 [TRC] 127.0.0.1:48332 - cid:5 - <<- [UNSUB 1 ]
[1] 2020/09/18 05:17:20.705751 [TRC] 127.0.0.1:48332 - cid:5 - <-> [DELSUB 1]
[1] 2020/09/18 05:17:20.705785 [TRC] 127.0.0.1:48332 - cid:5 - <<- [UNSUB 2 ]
[1] 2020/09/18 05:17:20.705831 [TRC] 127.0.0.1:48332 - cid:5 - <-> [DELSUB 2]
[1] 2020/09/18 05:17:20.705903 [TRC] 127.0.0.1:48332 - cid:5 - <<- [UNSUB 4 ]
[1] 2020/09/18 05:17:20.705954 [TRC] 127.0.0.1:48332 - cid:5 - <-> [DELSUB 4]
[1] 2020/09/18 05:17:20.706006 [TRC] 127.0.0.1:48332 - cid:5 - <<- [PUB _STAN.close.8mwG6lwarlU4fIK08vltmD _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.S1IbDJVg 11]
[1] 2020/09/18 05:17:20.706089 [TRC] 127.0.0.1:48332 - cid:5 - <<- MSG_PAYLOAD: ["\n\tclientPub"]
[1] 2020/09/18 05:17:20.706170 [TRC] 127.0.0.1:40994 - cid:2 - ->> [MSG _STAN.close.8mwG6lwarlU4fIK08vltmD 7 _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.S1IbDJVg 11]
[1] 2020/09/18 05:17:20.706432 [TRC] 127.0.0.1:40996 - cid:3 - <<- [PING]
[1] 2020/09/18 05:17:20.706464 [TRC] 127.0.0.1:40996 - cid:3 - ->> [PONG]
[1] 2020/09/18 05:17:20.706694 [TRC] 127.0.0.1:40994 - cid:2 - <<- [PUB _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.S1IbDJVg 0]
[1] 2020/09/18 05:17:20.706719 [TRC] 127.0.0.1:40994 - cid:2 - <<- MSG_PAYLOAD: [""]
[1] 2020/09/18 05:17:20.706736 [TRC] 127.0.0.1:48332 - cid:5 - ->> [MSG _INBOX.pcTKl0ZQpfrPR1xTXq9BtO.S1IbDJVg 3 0]
[1] 2020/09/18 05:17:20.712840 [DBG] 127.0.0.1:48332 - cid:5 - Client connection closed: Client Closed
[1] 2020/09/18 05:17:20.712958 [TRC] 127.0.0.1:48332 - cid:5 - <-> [DELSUB 3]

Notice on line 8 that it mentions the Cluster ID (which is: fissionMQTrigger)

Now here is the very small Python Trace and Debug output:

[1] 2020/09/18 05:15:46.950857 [DBG] 10.42.0.223:50942 - cid:4 - Client connection created
[1] 2020/09/18 05:15:46.951992 [TRC] 10.42.0.223:50942 - cid:4 - <<- [CONNECT {"name": "nats-python", "lang": "python", "protocol": 0, "version": "0.8.0", "verbose": false, "pedantic": false, "auth_token": "defaultFissionAuthToken"}]
[1] 2020/09/18 05:15:46.952640 [TRC] 10.42.0.223:50942 - cid:4 - <<- [PUB tv-request  45]
[1] 2020/09/18 05:15:46.952689 [TRC] 10.42.0.223:50942 - cid:4 - <<- MSG_PAYLOAD: ["{\"user\": null, \"text\": \"Testing with Python\"}"]
[1] 2020/09/18 05:15:46.952736 [DBG] 10.42.0.223:50942 - cid:4 - Client connection closed: Client Closed

Any advice on being able to specify the Cluster ID to make these guys line up?

Thank you,
Cody Hill

How to perform request-response inside one NATSClient object?

I really appreciate your project, and glad to see it developing,
But I'm wondering, is it possible to do request-response inside one NATSClient?
I've copied your example from README.md & modified it a bit:

from pynats import NATSClient

with NATSClient() as client:
    # Connect
    client.connect()

    # Subscribe
    def callback(msg):
        print(f"Received a message with subject {msg.subject}: {msg}")
        client.publish(subject=msg.reply, payload=b"test-response")

    client.subscribe(subject="test-subject", callback=callback)

    # Request a message
    print(client.request(subject="test-subject", payload=b"test-payload"))

Btw, I was waiting to see the response, but I've got an error instead:

Traceback (most recent call last):
    client.request(subject="test-subject", payload=b"test-payload")
  File ".../python3.8/site-packages/pynats/client.py", line 291, in request
    return reply_messages[sub.sid]
KeyError: 1
Received a message with subject test-subject: NATSMessage(sid=0, subject='test-subject', reply='_INBOX.jR2XP1kZIIGMIz3ldUYbYX ', payload=b'test-payload')

I'm using Ubuntu 20.04 with python 3.8.5 & nats-python==0.8.0.
Will really appreciate your support

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.