nikipore / stompest
This project forked from mozes/stompest
STOMP client library for Python including both synchronous and Twisted implementations.
License: Other
I'm trying to use the async stompest client (2.3.0) with RabbitMQ (3.7.18), and almost everything works except the failover feature. I configured the client with a failover URI, then restarted the broker, and the client did not reconnect.
The DisconnectListener printed error messages about the connection closing, but I have not seen any attempt by StompFailoverTransport to reconnect.
I put a breakpoint in StompFailoverTransport.__iter__; it is triggered once (on client connect) and never again.
After upgrading stompest from 2.1.6 to 2.3.0 I get 'Interrupted system call' whenever the application is waiting in canRead and I send a signal to the application.
I think this is due to the Python 3 updates here: 75c5692#diff-715c90981f31edb3e6e79120fa0c7e91L35
These types of exceptions were previously handled by stompest, but I don't think that code is triggered anymore: the check if code == errno.EINTR is no longer true because code is no longer a number.
I'm running on Python 2.7.13.
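A minimal sketch of an errno check that tolerates both exception shapes, assuming the failure described above comes from comparing something other than an integer with errno.EINTR. The helper names error_code and is_interrupted are hypothetical and are not part of stompest.

```python
import errno


def error_code(exc):
    """Best-effort errno lookup (hypothetical helper, not stompest API)."""
    # OSError and its subclasses expose the number as .errno
    code = getattr(exc, 'errno', None)
    if code is None and exc.args:
        # some older exception types only carry the number in args[0]
        code = exc.args[0]
    return code


def is_interrupted(exc):
    """True if the exception signals an interrupted system call."""
    return error_code(exc) == errno.EINTR
```

A caller waiting in a select-style loop could retry when is_interrupted(exc) is true and re-raise otherwise.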
This is a drive-by bug report, I don't use this lib, but I noticed that it will be impossible to use on 3.7+ since async becomes a real keyword then. Ref https://www.python.org/dev/peps/pep-0492/
In my case, I set ack=True (which is also the default value) when creating a stompest.async.listener.SubscriptionListener instance. The onMessage function then calls connection.ack(frame) after the message handler finishes (stompest.async.listener.py line 220). A thread-safety issue comes up if I let the Twisted reactor use more threads, e.g. reactor.suggestThreadPoolSize(size=4).
The reason is that yield connection.ack(frame) can cause the ack to be executed in a thread other than the main thread. If you trace the ack, it ultimately calls twisted.internet.abstract.FileDescriptor.write(self, data). This function is not thread-safe, so some data gets corrupted (twisted.internet.abstract.py line 357).
As a result, some of the acks are in fact missing, although "Sending ACK frame..." is logged.
For now, I set ack=False and send the ack from the main thread to work around this problem.
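A minimal sketch of the "ack in the main thread" workaround, assuming the message handler really runs in a worker thread. reactor.callFromThread is Twisted's documented way to hand a call over to the reactor thread; the function name ack_in_reactor_thread is hypothetical, and the reactor is passed in as a parameter only to keep the sketch self-contained.

```python
def ack_in_reactor_thread(reactor, connection, frame):
    """Schedule connection.ack(frame) on the reactor thread.

    Hypothetical helper: 'reactor' is expected to be the Twisted reactor,
    'connection' the stompest connection passed to the handler.
    """
    # callFromThread(f, *args) queues f(*args) to run in the reactor thread,
    # so the transport write never happens in a worker thread
    reactor.callFromThread(connection.ack, frame)
```

With ack=False on the listener, a handler running in a worker thread would call this helper instead of calling connection.ack(frame) directly.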
stompest throws this error. However, according to the spec the session header is optional, both in versions 1.1 and 1.2.
~$ ./consumer.py
Traceback (most recent call last):
  File "./consumer.py", line 11, in <module>
    client.connect()
  File "/usr/local/lib/python2.7/dist-packages/stompest/sync/client.py", line 100, in connect
    self._connect(headers, versions, host, heartBeats, connectedTimeout)
  File "/usr/local/lib/python2.7/dist-packages/stompest/sync/client.py", line 113, in _connect
    self.session.connected(frame)
  File "/usr/local/lib/python2.7/dist-packages/stompest/protocol/session.py", line 213, in connected
    (self.version, self._server, self._id, (self._serverHeartBeat, self._clientHeartBeat)) = commands.connected(frame, versions=self._versions)
  File "/usr/local/lib/python2.7/dist-packages/stompest/protocol/commands.py", line 241, in connected
    raise StompProtocolError('Invalid %s frame (%s header is missing) [headers=%s]' % (StompSpec.CONNECTED, StompSpec.SESSION_HEADER, headers))
stompest.error.StompProtocolError: Invalid CONNECTED frame (session header is missing) [headers={'version': '1.2'}]
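A minimal sketch of a lenient CONNECTED header check that treats the session header as optional, as the report argues it should be. The function parse_connected_headers is hypothetical and works on a plain dict; it is not stompest's commands.connected.

```python
def parse_connected_headers(headers):
    """Return (version, server, session) from CONNECTED headers.

    Hypothetical helper: missing optional headers come back as None
    instead of raising a protocol error.
    """
    # a STOMP 1.0 broker may not send a version header at all
    version = headers.get('version', '1.0')
    server = headers.get('server')    # optional
    session = headers.get('session')  # optional
    return version, server, session
```

With the headers from the traceback above, parse_connected_headers({'version': '1.2'}) yields a version and no session id rather than an error.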
I'm attempting to authenticate to STOMP on ActiveMQ, which requires SSL clients to present an X.509 key pair in order to connect.
For the stompest sync client it is really simple: I just have to provide the public cert and key to my SSL context with load_cert_chain():
context = ssl.create_default_context()
context.load_cert_chain(certfile="kdreyer.pem", keyfile='kdreyer.key')
...
CONFIG = StompConfig(BROKER, sslContext=context)
... and then I can receive messages in my queue, etc.
Unfortunately this does not work for the stompest async client. Here's the error I'm getting:
INFO:stompest.async.protocol:Connecting to server.example.com:61612 ...
DEBUG:stompest.async.protocol:Sending CONNECT frame [version=1.0]
Unhandled error in Deferred:
INFO:stompest.async.listener:Disconnected: [('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')]
ERROR:stompest.async.listener:Disconnect because of failure: Unexpected connection loss [[('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')]]
DEBUG:stompest.async.listener:Calling disconnected errback: Unexpected connection loss [[('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')]]
I've been looking over Twisted's docs for client cert auth, but I'm a bit lost as to where I would set those options in stompest.async. Somewhere in util.py?
see http://stomp.github.com/stomp-specification-1.2.html#Changes_in_the_Protocol
Hi,
Sorry for asking here, but google group has no members...
I'm trying to convert the following java example to python using stompest:
StompConnection connection = new StompConnection();
connection.open("datafeeds.networkrail.co.uk", 61618);
connection.connect("system", "manager");
StompFrame connect = connection.receive();
if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
    throw new Exception ("Not connected");
}
connection.subscribe("/topic/TRAIN_MVT_ALL_TOC", Subscribe.AckModeValues.CLIENT);
connection.begin("tx2");
StompFrame message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
connection.commit("tx2");
connection.disconnect();
How do I pass a username and password when connecting using stompest?
cheers,
Chris
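For orientation, a sketch of what the credentials look like on the wire: in STOMP they travel as the login and passcode headers of the CONNECT frame. Another report on this page constructs StompConfig with login= and passcode= keyword arguments, which appears to be where stompest takes them from. The helper connect_frame below is hypothetical and only builds the raw bytes.

```python
def connect_frame(login, passcode, host=None):
    """Build a raw STOMP CONNECT frame (hypothetical helper, not stompest API)."""
    lines = ['CONNECT', 'login:%s' % login, 'passcode:%s' % passcode]
    if host is not None:
        lines.append('host:%s' % host)
    # headers end with a blank line; the frame is terminated by a NUL byte
    return ('\n'.join(lines) + '\n\n').encode('utf-8') + b'\x00'


print(connect_frame('system', 'manager'))
```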
http://activemq.apache.org/exclusive-consumer.html
The Java code is:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
I have tried to configure the URI:
failover:(tcp://xxxx:61613,tcp://xxx)?exclusive=true,randomize=false,maxReconnectDelay=30000
There is an error:
[invalid options: 'exclusize']
Exception AttributeError: "'ActiveMQConsumer' object has no attribute 'client'" in <bound method
I would like to know how many times a message has been redelivered, so that if it has been redelivered more than 2 or 3 times, I can update a status in the DB or something like that.
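One client-side option is to count deliveries per message-id yourself. This is a sketch under the assumption that the broker keeps the same message-id when it redelivers a message; the class DeliveryTracker is hypothetical, and nothing on this page shows a broker header that carries an actual delivery count.

```python
from collections import Counter


class DeliveryTracker(object):
    """Count deliveries per message-id (hypothetical helper)."""

    def __init__(self, max_deliveries=3):
        self.max_deliveries = max_deliveries
        self._seen = Counter()

    def record(self, message_id):
        """Count one delivery; return True once the limit is exceeded."""
        self._seen[message_id] += 1
        return self._seen[message_id] > self.max_deliveries

    def forget(self, message_id):
        """Drop the counter after the message was processed successfully."""
        self._seen.pop(message_id, None)
```

A message handler could call record(frame.headers['message-id']) first and update the DB status when it returns True. The counter lives in memory only, so it resets when the consumer restarts.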
Factor a Listener API out of the async client which has callbacks for all events like onMessage(), onError(), onConnected(), ... One could add and remove per queue any number of Listener instances (observer pattern). The client would then more clearly represent the connection with its associated StompSession. This is a generalization of the current handlers, which just deal with onMessage() and cannot be removed.
Right now, for STOMP's host header to work at all, host= must be set manually when calling connect. If you don't, stompest will always leave it empty.
Instead of leaving it empty when unset, the hostname of the server being connected to should be used.
For example, when you use tcp://localhost:61613 in the config and leave host as None, then host: localhost should be used.
Likewise, when you use failover:(tcp://a.stomp.local:61613,tcp://b.stomp.local:61613), leave host as None, and stompest connects to the second server, then host: b.stomp.local should be used.
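The proposed default could be sketched like this, assuming the URI of the broker that was actually connected to is known at that point. default_host_header is a hypothetical helper, not stompest code.

```python
from urllib.parse import urlsplit


def default_host_header(broker_uri, host=None):
    """Pick the STOMP host header value (hypothetical helper).

    An explicitly given host wins; otherwise fall back to the hostname
    of the broker URI that was connected to.
    """
    if host is not None:
        return host
    return urlsplit(broker_uri).hostname or ''


print(default_host_header('tcp://b.stomp.local:61613'))
```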
I am facing an issue with ActiveMQ. The consumer is not receiving messages, even though it is connected to the ActiveMQ queue. This starts happening when the queue has been idle for some time (~3 hours).
I am using synchronous client.
A stompest client can try to write to a temporary topic, but the client listening on that topic might already be disconnected. ActiveMQ will respond with an error. stompest.async.client will then call the onError handlers in method _onError, one of which disconnects the client.
Right now it doesn't look like stompest properly supports version negotiation.
StompConfig only supports a single version. And trying to use the versions parameter to connect on the sync client results in: StompProtocolError: Invalid versions: ['1.1'] [version=1.0]
Also, ideally, when version(s) is left as None we should not default to 1.0. Instead, in that case we should include every version natively supported by stompest in accept-version, and then use the version specified by the server in the CONNECTED frame, so that we properly use the most recent version supported by both the server and stompest.
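A minimal sketch of the negotiation proposed above, written against plain dicts. The names SUPPORTED_VERSIONS, accept_version_header and negotiated_version are hypothetical; the logic follows the STOMP rule that the client offers a comma-separated accept-version list and the server answers with one version in the CONNECTED frame.

```python
SUPPORTED_VERSIONS = ('1.0', '1.1', '1.2')  # assumption: versions the client implements


def accept_version_header(versions=None):
    """Offer every supported version unless the caller restricts the list."""
    return ','.join(versions or SUPPORTED_VERSIONS)


def negotiated_version(connected_headers, offered=None):
    """Use the version chosen by the server in the CONNECTED frame."""
    offered = tuple(offered or SUPPORTED_VERSIONS)
    # a 1.0 broker may omit the version header entirely
    version = connected_headers.get('version', '1.0')
    if version not in offered:
        raise ValueError('server chose version %s, offered %s' % (version, ','.join(offered)))
    return version
```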
collections.MutableMapping is deprecated since Python 3.3:
DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.10 it will stop working
In Python 3.10 it is not usable at all, so stompest does not work with py310 (reported at https://bugzilla.redhat.com/1926350).
It would be great to merge #54 and then change the parent class of InFlightOperations to collections.abc.MutableMapping.
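A common way to make such a base class import work across Python versions is a guarded import. The class below is a hypothetical stand-in for illustration, not the real InFlightOperations.

```python
try:
    from collections.abc import MutableMapping  # Python 3.3 and later
except ImportError:
    from collections import MutableMapping  # Python 2 fallback


class InFlightOperationsSketch(MutableMapping):
    """Hypothetical stand-in: a dict-backed mapping built on the ABC."""

    def __init__(self):
        self._store = {}

    def __getitem__(self, key):
        return self._store[key]

    def __setitem__(self, key, value):
        self._store[key] = value

    def __delitem__(self, key):
        del self._store[key]

    def __iter__(self):
        return iter(self._store)

    def __len__(self):
        return len(self._store)
```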
Using STOMP version 1.1, the subscribe function does not work. STOMP 1.1 requires an id field in the header, which the function does not set, so the server responds with an error saying so. My current workaround for this issue is shown below. The code is set up to connect to a SimSig session with the interface gateway enabled and to output all messages that arrive, in a format similar to the National Rail data feeds. Copy of the protocol specification is here
from stompest.config import StompConfig
from stompest.protocol import StompSpec, frame
from stompest.sync import Stomp

CONFIG = StompConfig('tcp://localhost:51515', version=StompSpec.VERSION_1_1)
QUEUE = '/topic/TD_ALL_SIG_AREA'

if __name__ == '__main__':
    client = Stomp(CONFIG)
    client.connect()
    # client.subscribe(QUEUE, {StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL})
    # Workaround: build the SUBSCRIBE frame by hand so that it carries the id header.
    subscribeFrame = frame.StompFrame(command='SUBSCRIBE', rawHeaders=[('id', '0'), ('destination', QUEUE), ('ack', 'auto')])
    client.sendFrame(subscribeFrame)
    try:
        while True:
            # Use a separate name here: "frame" would shadow the imported module.
            message = client.receiveFrame()
            print(dict(message))
    finally:
        # Reached when the loop is interrupted, e.g. by Ctrl-C.
        client.disconnect()
if self.client.canRead(0):
    self.frame = self.client.receiveFrame()
    self.client.ack(self.frame)
I use this code with ActiveMQ, and I found that the stompest consumer is slow to consume messages: the rate is 1 message per second.
Why? Is it a Python problem?
Python 3 support would really be awesome, but there are some caveats:
Hi,
I want to dynamically increase the number of consumers as the number of messages in the queues increases. What approach should I consider to achieve this? Any help would be appreciated.
I am using activemq, with stompest.async
Thanks.
Sample Code from stompest used :
class Consumer(object):
    QUEUE = '/queue/testIn'
    ERROR_QUEUE = '/queue/testConsumerError'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('tcp://localhost:61613')
        self.config = config

    @defer.inlineCallbacks
    def run(self):
        client = Stomp(self.config)
        yield client.connect()
        headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on at the same time
            'activemq.prefetchSize': '100',
        }
        client.subscribe(self.QUEUE, headers, listener=SubscriptionListener(self.consume, errorDestination=self.ERROR_QUEUE))

    def consume(self, client, frame):
        """
        NOTE: you can return a Deferred here
        """
        data = json.loads(frame.body.decode())
        print('Received frame with count %d' % data['count'])

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    Consumer().run()
    reactor.run()
Hi,
When I set version=1.2 to use heart-beats, the stompest client connects to ActiveMQ successfully, but it never seems to send the frame to subscribe to the topic. In the ActiveMQ UI, I do not see the consumer.
The same code works fine when I remove the heartBeats option and set version=1.0.
Any idea?
Here is my relevant code:
# coding: utf8
from __future__ import unicode_literals
import json
import logging
import sys, traceback, time

from twisted.internet import reactor, defer
from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.config import StompConfig
from stompest.protocol import StompSpec
from fr.sihm.grabInventoryToSupervision.service.InventoryServerListener import InventoryServerListener

logger = logging


class ConsumerServer(object):
    ERROR_QUEUE = '/queue/testConsumerError'

    def connect(self, IP, port=61613, login=None, password=None):
        if IP is None or IP == "":
            # raising a plain string is itself an error, so raise a real exception
            raise ValueError("IP can't be null and can't be empty")
        if login is not None and password is not None:
            self._config = StompConfig("tcp://%s:%s" % (IP, str(port)), login=login,
                                       passcode=password, version="1.2")
        else:
            self._config = StompConfig("tcp://%s:%s" % (IP, str(port)), version="1.2")

    @defer.inlineCallbacks
    def run(self, destinations):
        if isinstance(destinations, dict) is False:
            raise ValueError("destination can't be null and can't be empty")
        if "serverInventory" not in destinations:
            raise ValueError("You must set the destination to consume db inventory AMQP")
        headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER:
            StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on at the same time
            'activemq.prefetchSize':
            '2000'
        }
        try:
            client = yield Stomp(self._config).connect(headers=headers, heartBeats=(10000, 10000))
            client.disconnected.addCallbacks(
                lambda _: client.disconnected, lambda _: self.reconnect(
                    destinations))
            client.subscribe(
                destinations["serverInventory"],
                headers,
                listener=SubscriptionListener(
                    InventoryServerListener.consume,
                    onMessageFailed=self.manageError))
        except Exception as e:
            logger.warn(
                "Can't connect to ActiveMQ. Retry connexion in 60 seconds. Error: %s",
                str(e))
            logger.warn(str(traceback.format_exc()))
            # NOTE: time.sleep() blocks the Twisted reactor while it waits
            time.sleep(60)
            self.run(destinations)

    def reconnect(self, destinations):
        logger.warn(
            "The connexion with ActivMQ is lost. Retry connexion in 60 seconds"
        )
        # NOTE: time.sleep() blocks the Twisted reactor while it waits
        time.sleep(60)
        self.run(destinations)

    def manageError(self, connection, failure, frame, errorDestination):
        logging.error("Error : " + str(failure))
The __str__ method must return a string in Python 3 but for StompFrame and StompHeartBeat it returns bytes. For example:
>>> from stompest.protocol import StompFrame, StompSpec
>>> frame = StompFrame(StompSpec.SEND, rawHeaders=[('foo', 'bar1'), ('foo', 'bar2')])
>>> bytes(frame)
b'SEND\nfoo:bar1\nfoo:bar2\n\n\x00'
>>> str(frame)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: __str__ returned non-string (type bytes)
Maybe __str__ should just call info()?
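A sketch of the suggested split between the two conversions, using a hypothetical stand-in class rather than the real StompFrame: __bytes__ produces the wire format and __str__ returns a human-readable text built by info().

```python
class FrameSketch(object):
    """Hypothetical stand-in for a frame class; not stompest's StompFrame."""

    def __init__(self, command, raw_headers=(), body=b''):
        self.command = command
        self.raw_headers = list(raw_headers)
        self.body = body

    def __bytes__(self):
        # wire format: command, header lines, blank line, body, NUL byte
        head = '\n'.join([self.command] + ['%s:%s' % h for h in self.raw_headers])
        return head.encode('utf-8') + b'\n\n' + self.body + b'\x00'

    def info(self):
        return '%s frame [headers=%r]' % (self.command, self.raw_headers)

    def __str__(self):
        # always text, so str(frame) works on Python 3
        return self.info()


frame = FrameSketch('SEND', [('foo', 'bar1'), ('foo', 'bar2')])
print(str(frame))
```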
Some integration tests do not adapt to all brokers (ActiveMQ, RabbitMQ, Apollo), and some which used to work in former versions of those brokers are broken with the current versions.
It's possible that ActiveMQ Artemis will eventually take over and become ActiveMQ 6. Sync messaging works well, but when I try the async examples they don't seem to work properly on Artemis.
I also tried changing to the naming used on Artemis for JMS mapping, ie: /queue/testIn to jms.queue.testIn
I've tried it on Artemis 1.5.3 and 2.1.0. Here is a snippet of the transfer.py executing.
ActiveMQ Example:
DEBUG:stompest.async.listener:Handler for message ID:host-63961-1495574070738-3:1:-1:1:9 complete.
DEBUG:stompest.async.protocol:Received MESSAGE frame [headers={u'priority': u'4', u'timestamp': u'1495574098692', u'destination': u'/queue/testIn', u'message-id': u'ID:host-63961-1495574070738-3:1:-1:1:10', u'expires': u'0'}, body='{"count": 9}', version=1.0]
Artemis Example:
DEBUG:stompest.async.protocol:Received MESSAGE frame [headers={u'expires': u'0', u'timestamp': u'1495575970275', u'receipt': u'message-9', u'persistent': u'false', u'priority': u'4', u'destination': u'/queue/testIn', u'redelivered': u'false', u'message-id': u'2147483999', u'subscription': u'subscription//queue/testIn'}, body='{"count": 9}', version=1.0]
ERROR:stompest.async.client:Ignoring message (no handler found): 2147483999 [MESSAGE frame [headers={u'expires': u'0', u'timestamp': u'1495575970275', u'receipt': u'message-9', u'persistent': u'false', u'priority': u'4', u'destination': u'/queue/testIn', u'redelivered': u'false', u'message-id': u'2147483999', u'subscription': u'subscription//queue/testIn'}, body='{"count": 9}', version=1.0]]
According to the STOMP 1.2 protocol, a client SHOULD keep the first header entry and MAY ignore repeated header entries.
Currently, the stompest API uses a dictionary to store headers, so the internal structure of StompFrame.headers would have to be changed, ideally without breaking too much of existing client code (although breaking changes are allowed, that's why stompest 2 is still alpha). @theduderog suggested to implement a structure similar to werkzeug's MultiDict.
I am fine without since it is a MAY feature which I don't like too much because in my opinion it mainly complicates things, except for the very appealing usage as routing envelopes à la ZeroMQ, but if anybody insists on a full implementation with repeated headers, I'm willing to assist.
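For reference, the "keep the first entry" rule can be applied while collapsing raw header pairs into a dictionary. The helper first_wins is hypothetical; it takes a list of (name, value) pairs such as the rawHeaders used elsewhere on this page.

```python
def first_wins(raw_headers):
    """Collapse (name, value) pairs into a dict, keeping the first entry per name."""
    headers = {}
    for name, value in raw_headers:
        # setdefault only stores the value if the name is not present yet
        headers.setdefault(name, value)
    return headers


print(first_wins([('foo', 'bar1'), ('foo', 'bar2')]))
```

This keeps a plain dictionary as the public structure; the repeated entries stay available only in the raw list.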
The producer in the example disconnects before receiving the RECEIPT frames from the server:
avsytar@Watcher:~/проекты/Print_System$ python producer.py
INFO:stompest.async.protocol:Connecting to 192.168.1.214:61613 ...
DEBUG:stompest.async.protocol:Sending CONNECT frame [headers={u'passcode': 'guest', u'login': 'guest', u'host': '', u'accept-version': '1.0,1.1,1.2'}, version=1.0]
DEBUG:stompest.async.protocol:Received CONNECTED frame [headers={u'session': u'ID:UVM-PG-PROD-TEST-47225-1362589067666-9:2783', u'heart-beat': u'0,5000', u'version': u'1.2', u'server': u'ActiveMQ/5.8.0'}, version=1.0]
INFO:stompest.async.client:Connected to stomp broker [session=ID:UVM-PG-PROD-TEST-47225-1362589067666-9:2783, version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-0'}, body='{"count": 0}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-1'}, body='{"count": 1}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-2'}, body='{"count": 2}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-3'}, body='{"count": 3}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-4'}, body='{"count": 4}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-5'}, body='{"count": 5}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-6'}, body='{"count": 6}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-7'}, body='{"count": 7}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-8'}, body='{"count": 8}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-9'}, body='{"count": 9}', version=1.2]
INFO:stompest.async.listener:Disconnecting ...
DEBUG:stompest.async.protocol:Sending DISCONNECT frame [version=1.2]
avsytar@Watcher:~/проекты/Print_System$
But if it does not disconnect:
avsytar@Watcher:~/проекты/Print_System$ python producer.py
INFO:stompest.async.protocol:Connecting to 192.168.1.214:61613 ...
DEBUG:stompest.async.protocol:Sending CONNECT frame [headers={u'passcode': 'guest', u'login': 'guest', u'host': '', u'accept-version': '1.0,1.1,1.2'}, version=1.0]
DEBUG:stompest.async.protocol:Received CONNECTED frame [headers={u'session': u'ID:UVM-PG-PROD-TEST-47225-1362589067666-9:2787', u'heart-beat': u'0,5000', u'version': u'1.2', u'server': u'ActiveMQ/5.8.0'}, version=1.0]
INFO:stompest.async.client:Connected to stomp broker [session=ID:UVM-PG-PROD-TEST-47225-1362589067666-9:2787, version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-0'}, body='{"count": 0}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-1'}, body='{"count": 1}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-2'}, body='{"count": 2}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-3'}, body='{"count": 3}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-4'}, body='{"count": 4}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-5'}, body='{"count": 5}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-6'}, body='{"count": 6}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-7'}, body='{"count": 7}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-8'}, body='{"count": 8}', version=1.2]
DEBUG:stompest.async.protocol:Sending SEND frame [headers={u'destination': '/queue/testIn', u'receipt': 'message-9'}, body='{"count": 9}', version=1.2]
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-0'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-1'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-2'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-3'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-4'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-5'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-6'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-7'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-8'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Received RECEIPT frame [headers={u'receipt-id': u'message-9'}, version=1.2]
DEBUG:stompest.async.protocol:Received heart-beat
DEBUG:stompest.async.protocol:Sending heart-beat
^C
This would make it possible to use Thrift (RPC and cross-language serialization) via STOMP.
Currently the synchronous version of stompest does not accept a timeout parameter for the method receiveFrame(). While one could use the method canRead() on a queue with a single consumer, execution can still be blocked indefinitely should another consumer fetch a frame in between the calls to canRead and receiveFrame. Notably, both connect() and canRead() accept a timeout parameter, although neither is as inherently blocking as receiveFrame().
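The workaround mentioned above can be wrapped as follows. It uses only the two client methods named in the report, canRead(timeout) and receiveFrame(); the function name receive_frame_or_none is hypothetical, and the sketch does not remove the blocking concern the report raises for the gap between the two calls.

```python
def receive_frame_or_none(client, timeout):
    """Return a frame if data shows up within timeout, else None.

    Hypothetical helper: 'client' is expected to offer canRead(timeout)
    and receiveFrame() like the sync client discussed above.
    """
    if client.canRead(timeout):
        # still a blocking call once we get here
        return client.receiveFrame()
    return None
```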
Does not work with IPv6 addresses:
  File "/usr/lib/python2.6/site-packages/stompest-2.1.6-py2.6.egg/stompest/protocol/failover.py", line 189, in _parse
    raise ValueError('invalid uri: %s [%s]' % (self.uri, msg))
exceptions.ValueError: invalid uri: tcp://xxx:xxx:c081:20::70:65001 [invalid broker(s): 'NoneType' object has no attribute 'groupdict']
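For comparison, the standard library splits host and port unambiguously when the IPv6 literal is written in brackets, as RFC 3986 requires. The sketch below uses urllib.parse and a documentation-range address; whether stompest's own URI parser accepts the bracket form is not confirmed here, and split_broker_uri is a hypothetical helper.

```python
from urllib.parse import urlsplit


def split_broker_uri(uri):
    """Return (scheme, host, port) for a broker URI (hypothetical helper)."""
    parts = urlsplit(uri)
    # .hostname strips the brackets around an IPv6 literal
    return parts.scheme, parts.hostname, parts.port


print(split_broker_uri('tcp://[2001:db8::70]:65001'))
```

Without brackets, a string such as a:b::70:65001 cannot be split reliably because the port separator looks like part of the address.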
The heartbeat negotiation logic is not STOMP compliant. It just takes the server's proposals which is fine for ActiveMQ but not for ApolloMQ.
http://stomp.github.io/stomp-specification-1.1.html#Heart-beating
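The rule from the linked section of the specification can be written down in a few lines. The client sends heart-beat:cx,cy, the server answers heart-beat:sx,sy, and each direction uses the larger of the two relevant values unless one of them is 0. negotiate_heart_beats is a hypothetical helper, not stompest code.

```python
def negotiate_heart_beats(client, server):
    """Return (client_to_server_ms, server_to_client_ms); 0 means no heart-beats.

    client is (cx, cy): what the client can send / would like to receive.
    server is (sx, sy): what the server can send / would like to receive.
    """
    cx, cy = client
    sx, sy = server
    # client -> server: none if cx or sy is 0, otherwise every max(cx, sy) ms
    client_to_server = max(cx, sy) if cx and sy else 0
    # server -> client: none if sx or cy is 0, otherwise every max(sx, cy) ms
    server_to_client = max(sx, cy) if sx and cy else 0
    return client_to_server, server_to_client


print(negotiate_heart_beats((10000, 10000), (0, 5000)))
```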
Even though I subscribe to a topic using a header such as { StompSpec.SELECTOR_HEADER: "" }, all messages are received, which means that no filtering occurs. I tested with RabbitMQ 3.0.4.
Someone asked you about it here, and you said:
you would never want to send two CONNECT frames over the same wire-level connection, which is why there is an @exclusive wrapper around the connect() method. If you want to run more than one STOMP session, then you'll have to instantiate a dedicated async.Stomp connection for each STOMP session
But the @exclusive wrapper doesn't pay any attention to what connection it's being called on. It prevents all concurrent calls to the connect method, in all objects.
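A per-object variant of such a guard could keep its state on the instance rather than in the decorator. This is a simplified synchronous sketch with hypothetical names (exclusive, AlreadyRunningError, Connection); the real stompest wrapper works with Twisted Deferreds and is not reproduced here.

```python
import functools


class AlreadyRunningError(Exception):
    pass


def exclusive(method):
    """Allow one running call of 'method' per instance (hypothetical sketch)."""
    flag = '_exclusive_%s_running' % method.__name__

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # the flag lives on the instance, so other objects are unaffected
        if getattr(self, flag, False):
            raise AlreadyRunningError('%s is already running' % method.__name__)
        setattr(self, flag, True)
        try:
            return method(self, *args, **kwargs)
        finally:
            setattr(self, flag, False)
    return wrapper


class Connection(object):
    @exclusive
    def connect(self, nested=None):
        # 'nested' simulates another connect() starting while this one runs
        if nested is not None:
            nested.connect()
        return 'connected'
```

Here a.connect(nested=b) succeeds because a and b are guarded separately, while a.connect(nested=a) raises AlreadyRunningError.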
When the STOMP server is stopped, the consumer raises an exception but does not reconnect:
2013-02-03 23:12:34+0400 [StompProtocol,client] No handlers could be found for logger "stompest.async.client"
2013-02-03 23:12:34+0400 [StompProtocol,client] Unhandled error in Deferred:
2013-02-03 23:12:34+0400 [StompProtocol,client] Unhandled Error
Traceback (most recent call last):
Failure: stompest.error.StompConnectionError: Unexpected connection loss [Connection was closed cleanly.]
2013-02-03 23:12:34+0400 [StompProtocol,client] Stopping factory <stompest.async.protocol.StompFactory instance at 0x1814908>
It would be nice to have explicit gevent support. Right now stompest will work fine in gevent if you monkey patch python's socket and select. But it would be nice to have stompest work without using monkey patching.
Here's a demo script showing stompest with gevent:
https://gist.github.com/dantman/5290438
Running that script right now will work due to the monkey patch lines. But see what happens when you comment out the two monkey.patch_*() lines.
Ideally stompest would either have explicit gevent support in the sync client or there would be a stompest.gevent that could be imported instead of stompest.sync.
Just in case anyone wants to emulate a browser, this uses websocket-client to implement the transport for stompest. It's a bit hacky, but it works. You provide a message receive handler, and messages may be sent normally with the client.
usage:
from stompest.protocol import StompSpec
from wsstomp import WSStompClient

verbose = False  # set to True to trace the websocket traffic

def receive_message(client, command, data, headers):
    """Handle a STOMP message. data is parsed JSON"""
    print("Message received")

ws_client = WSStompClient("<ws-url>", on_receive=receive_message, host="<ws-host>", origin="<origin>", headers={
    "Sec-WebSocket-Protocol": "v12.stomp"
}, debug=verbose)
ws_client.connect(versions=[StompSpec.VERSION_1_2], heartBeats=(0, 0), host="<hostname>")
token = ws_client.subscribe("/user/messages")
wsstomp.py:
import json
import logging
import time
from urllib import parse
from threading import Thread

import websocket
from stompest.config import StompConfig
from stompest.protocol import StompParser, StompSpec
from stompest.sync import client
from websocket import ABNF


class WSTransport(object):
    """Stomp client handles connect only"""

    def __init__(self, url, host=None, origin=None, headers=None, debug=False,
                 on_frame_received=None, **kwargs):
        if debug:
            websocket.enableTrace(True)
        else:
            logging.getLogger("websocket").setLevel(logging.ERROR)
        self.ws = websocket.WebSocketApp(url, on_open=self._on_open,
                                         on_message=self._on_message,
                                         on_error=self._on_error,
                                         on_close=self._on_close,
                                         on_data=self._on_data,
                                         header=[f"{k}: {v}" for k, v in (headers or {}).items()],
                                         **kwargs)
        self.url = url
        self.debug = debug
        self.host = host or parse.urlparse(url).hostname
        self.origin = origin
        self.opened = False
        self.connected = False
        self._parser = StompParser()
        self.on_frame_received = on_frame_received

    def _connect(self, timeout=None):
        thread = Thread(target=lambda: self.ws.run_forever(host=self.host, origin=self.origin), daemon=True)
        thread.start()
        start = time.time()
        while not self.opened:
            time.sleep(0.25)
            if timeout and (time.time() - start) * 1000 >= timeout:
                raise TimeoutError(f"Connection to {self.url} timed out")

    def _on_open(self, ws):
        self.opened = True

    def _on_close(self, ws, status_code, message):
        self.connected = False

    def _on_error(self, ws, error):
        print(f"Error: {error}")

    def _on_message(self, ws, data):
        self._parser.add(data)
        if not self.connected:
            self.connected = True
        elif self.on_frame_received:
            frame = self._parser.get()
            self.on_frame_received(frame)

    def _on_data(self, ws, data, data_type, cont_flag):
        pass

    def canRead(self, timeout=None):
        return True

    def connect(self, timeout=None):
        self._connect(timeout)

    def disconnect(self):
        self.ws.on_close = None
        self.ws.close()
        self.connected = False

    def receive(self):
        # Used on connect only
        return self._parser.get()

    def send(self, stomp_frame):
        # Incoming: StompFrame => outgoing ABNF
        ws_frame = ABNF.create_frame(bytes(stomp_frame), ABNF.OPCODE_BINARY)
        self.ws.sock.send_frame(ws_frame)

    def setVersion(self, version):
        self._parser.version = version


class WSStompClient(client.Stomp):
    def __init__(self, url, url_params=None, config=None, debug=False, on_receive=None, **kwargs):
        pstr = "&".join([f"{parse.quote(key)}={parse.quote(str(value))}" for key, value in (url_params or {}).items()])
        url = f"{url.replace('https', 'wss')}?{pstr}"
        if debug:
            print(f"url {url}")
        parsed = parse.urlparse(url)
        # Provide a dummy config; websocket-client does the actual connection
        super().__init__(config or StompConfig(f"tcp://{parsed.hostname}:443", version=StompSpec.VERSION_1_2))
        self.debug = debug

        def handle_frame(frame):
            if frame.command == StompSpec.MESSAGE:
                self.ack(frame)
            if on_receive:
                on_receive(self, frame.command, json.loads(frame.body.decode() or "{}"), frame.headers)

        factory = lambda host, port, sslContext: WSTransport(url, debug=debug,
                                                             on_frame_received=handle_frame, **kwargs)
        self._transportFactory = factory
I tried to run the consumer example, but I always get this error:
ERROR:stompest.async.client:Ignoring message (no handler found)
How to fix this?
The SubscriptionListener class of the listeners API is broken as follows: the onSubscribe() method reacts to all new subscriptions, but it should only react to its first invocation. The fix looks as follows:
class SubscriptionListener(listener.SubscriptionListener):
    def onSubscribe(self, connection, frame, context):
        if self._headers is not None:
            return
        return super(SubscriptionListener, self).onSubscribe(connection, frame, context)
Currently, async.Stomp._onFrame and async.Stomp._onMessage call self._notify to dispatch the callbacks for the message. self._notify wraps the notification in a task and dispatches it asynchronously from the creation of the StompFrame in StompProtocol.dataReceived. If messages arrive fast enough, stompest can end up receiving them and creating StompFrames for them faster than the event loop dispatches the notifications. This can cause the Python process's memory use to balloon very quickly as things back up more and more. The gc starts kicking in very aggressively, slowing everything down even further.
I've experimented with synchronous notifications in _onFrame and _onMessage, and that makes the pipeline keep up much better, with flat memory use and much less message backup.
I have seen that there is support for the Twisted framework. Are there plans to move to asyncio in Python 3.x?
If the server goes down unexpectedly, stompest throws an "Unexpected connection loss" error. Is there a way to make stompest try to reconnect to the server periodically after a connection loss?
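Independent of what the client does internally, a periodic retry usually follows a capped exponential backoff schedule. The sketch below only computes the delays; the default values mirror the failover options quoted elsewhere on this page (initialReconnectDelay=10, maxReconnectDelay=30000, backOffMultiplier=2.0, all in milliseconds), and it is an assumption that stompest's own schedule looks like this. reconnect_delays is a hypothetical helper.

```python
def reconnect_delays(initial=10, maximum=30000, multiplier=2.0, attempts=5):
    """Yield capped exponential backoff delays in milliseconds."""
    delay = initial
    for _ in range(attempts):
        # never wait longer than the configured maximum
        yield min(delay, maximum)
        delay = delay * multiplier


for delay in reconnect_delays(attempts=4):
    print(delay)
```

A reconnect loop would sleep for each delay in turn, try to connect, and stop at the first success.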
The URI scheme supports only TCP, no SSL (the author doesn't need it because the client is run in "safe" production environments). For the async client, it should be straightforward to support SSL by means of the Endpoint API. Contributions are welcome!
so that it stays consistent with what is described in the ActiveMQ server documentation:
http://activemq.apache.org/failover-transport-reference.html
I tried to use failover:(xxx)?timeout=xxx, but stompest does not support this. Then I read the source code and found that some transport options are commented out in src/core/stompest/protocol/failover.py
_SUPPORTED_OPTIONS = {
    'initialReconnectDelay': _configurationOption(int, 10)
    , 'maxReconnectDelay': _configurationOption(int, 30000)
    , 'useExponentialBackOff': _configurationOption(_bool, True)
    , 'backOffMultiplier': _configurationOption(float, 2.0)
    , 'maxReconnectAttempts': _configurationOption(int, -1)
    , 'startupMaxReconnectAttempts': _configurationOption(int, 0)
    , 'reconnectDelayJitter': _configurationOption(int, 0)
    , 'randomize': _configurationOption(_bool, True)
    , 'priorityBackup': _configurationOption(_bool, False)
    #, 'backup': _configurationOption(_bool, False), # initialize and hold a second transport connection - to enable fast failover
    #, 'timeout': _configurationOption(int, -1), # enables timeout on send operations (in miliseconds) without interruption of reconnection process
    #, 'trackMessages': _configurationOption(_bool, False), # keep a cache of in-flight messages that will flushed to a broker on reconnect
    #, 'maxCacheSize': _configurationOption(int, 131072), # size in bytes for the cache, if trackMessages is enabled
    #, 'updateURIsSupported': _configurationOption(_bool, True), # determines whether the client should accept updates to its list of known URIs from the connected broker
}
The timeout parameter is simply commented out.
I could not find the reason for this.
Could someone explain it to me?