Comments (5)
Thanks for the feedback I will add an example that does exactly that. Sharing them first here below:
Example consumer receiving messages forever:
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN
async def run(loop):
nc = NATS()
sc = STAN()
# Start session with NATS Streaming cluster using
# the established NATS connection.
await nc.connect(io_loop=loop)
await sc.connect("test-cluster", "client-123", nats=nc)
# Example async subscriber
async def cb(msg):
print("Received a message (seq={}): {}".format(msg.seq, msg.data))
# Subscribe to get all messages from the beginning.
await sc.subscribe("greetings", start_at='first', cb=cb)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
Example producer sending messages forever:
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN
async def run(loop):
nc = NATS()
sc = STAN()
# First connect to NATS, then start session with NATS Streaming.
await nc.connect(io_loop=loop)
await sc.connect("test-cluster", "client-456", nats=nc)
# Periodically send a message
while True:
await sc.publish("greetings", b'Hello World!')
await asyncio.sleep(1, loop=loop)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
from stan.py.
@wallyqs thanks a lot for your prompt response! I've tried your Example consumer receiving messages forever but it's not working, python client is trying to subscribe and unsubscribe a lot of times (probably coz the infinite loop), we can see the six tries of subscriptions on nats server logs:
2018/06/28 23:10:35.286569 [DBG] 172.18.0.1:43394 - cid:7 - Deferring actual UNSUB(_INBOX.lqc06jDlxe63OS3rz3jN8O): 1 max, 0 received
2018/06/28 23:10:35.286581 [TRC] 172.18.0.1:43394 - cid:7 - ->> [PUB _STAN.close.FUlqxXDk0C3F29nclnZ5Kh _INBOX.lqc06jDlxe63OS3rz3jN8O 21]
2018/06/28 23:10:35.286585 [TRC] 172.18.0.1:43394 - cid:7 - ->> MSG_PAYLOAD: [python-client-andry]
2018/06/28 23:10:35.286590 [TRC] 127.0.0.1:40514 - cid:2 - <<- [MSG _STAN.close.FUlqxXDk0C3F29nclnZ5Kh 7 _INBOX.lqc06jDlxe63OS3rz3jN8O 21]
2018/06/28 23:10:35.286657 [TRC] 127.0.0.1:40514 - cid:2 - ->> [PUB _INBOX.lqc06jDlxe63OS3rz3jN8O 0]
2018/06/28 23:10:35.286664 [TRC] 127.0.0.1:40514 - cid:2 - ->> MSG_PAYLOAD: []
2018/06/28 23:10:35.286672 [DBG] 127.0.0.1:40514 - cid:2 - Auto-unsubscribe limit of 1 reached for sid '6'
2018/06/28 23:10:35.286676 [TRC] 172.18.0.1:43394 - cid:7 - <<- [MSG _INBOX.lqc06jDlxe63OS3rz3jN8O 6 0]
2018/06/28 23:10:35.286681 [TRC] 172.18.0.1:43394 - cid:7 - <-> [DELSUB 6]
2018/06/28 23:10:35.286960 [DBG] 172.18.0.1:43394 - cid:7 - Client connection closed
Any idea of how to solve this? Thanks in advance
from stan.py.
@yanpozka which versions of the asyncio-nats-client
and asyncio-nats-streaming
clients are you using? Sharing a gif below of how it works if using v0.7.0
version of the NATS client and latest asyncio-nats-streaming. Are you not receiving the messages in the consumer client?
from stan.py.
@wallyqs I'm using the latest version of server and client, I found the problem I was closing the connections at the end of run() method, it's working 👍 Thank you very much for your help!
from stan.py.
@wallyqs If the nats server were to restart while the client is running, how can the client become aware of that and reconnect?
It seems like there is already a heartbeat mechanism implement (https://github.com/nats-io/stan.py/blob/d5858b91501fc067ace49da6ea2bcde6ea1da7f5/stan/aio/client.py), but I believe I might be missing something to make use of it.
from stan.py.
Related Issues (20)
- Do NATS has asynchronous publisher in another language besides Python? HOT 3
- Inconsistent naming of subscription inbox compare to other clients
- pending_limits implications
- Support for PINGs from client to server to detect connection loss
- How to handle max_payload limit? HOT 1
- Retrieve GUID from subscription msg HOT 3
- Document connection lost handler usage and reconnection examples
- Cannot reconnect after connection is lost due to PING HOT 1
- Do you have plans and timelines for supporting sync subscriptions? HOT 4
- Canonical example for systemd
- Lost data for subscription
- any examples how to send binary data (with protobuf) ? HOT 2
- How to using with Django?
- [DBG] Skipping redelivery to subid=<> due to missed client heartbeat
- ModuleNotFoundError: No module named 'nats.aio' HOT 3
- how to get all available messages and exit ? HOT 1
- Reconnection problem after server reboot HOT 2
- Using synchronous sc.publish ?
- unittest.makeSuite() is deprecated and will be removed in Python 3.13
- Not compatible with Python 3.10 (loop parameter for async.Queue has been removed) HOT 5
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from stan.py.