Git Product home page Git Product logo

aiocometd's People

Contributors

robertmrk avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

aiocometd's Issues

Service channels & websockets only

Hi @robertmrk ,

I'm the author and maintainer of another repo where I've had to hardcode my own Bayeux implementation. I'd really like to switch to using your aiocometd as it looks fantastic but after days of toiling I've discovered 2 things:

  1. There is no support for websocket-only connections.
  2. There is no support for service channels.

I'd like to help implement these if possible; point 1 is relatively easy for me as I've been digging around and modifying the code but point 2 has me stumped and I keep getting 402 :: Unknown client from the remote server when I try to 'fake' the service channel requests through publish().

Example below of what my traffic looks like:

2019-02-20 23:39:03,391 Connecting to url: wss://tasty.dxfeed.com/live/cometd
2019-02-20 23:39:03,393 Opening client with connection types ['websocket', 'long-polling'] ...
2019-02-20 23:39:04,325 [bob] sending: [{'channel': <MetaChannel.HANDSHAKE: '/meta/handshake'>, 'version': '1.0', 'supportedConnectionTypes': ['websocket', 'long-polling'], 'minimumVersion': '1.0', 'id': '0', 'ext': {'com.devexperts.auth.AuthToken': '==REDACTED=='}}]
2019-02-20 23:39:04,557 [bob] received: [{"minimumVersion":"1.0","clientId":"==REDACTED==","supportedConnectionTypes":["websocket"],"advice":{"interval":0,"timeout":30000,"reconnect":"retry"},"channel":"/meta/handshake","id":"0","version":"1.0","successful":true}]
2019-02-20 23:39:04,557 Connection types supported by the server: ['websocket']
2019-02-20 23:39:05,982 [bob] sending: [{'channel': <MetaChannel.CONNECT: '/meta/connect'>, 'clientId': '==REDACTED==', 'connectionType': 'websocket', 'id': '0'}]
2019-02-20 23:39:06,213 [bob] received: [{"channel":"/meta/connect","id":"0","successful":true}]
2019-02-20 23:39:06,213 Connect task finished with: {'channel': '/meta/connect', 'id': '0', 'successful': True}
2019-02-20 23:39:06,214 Client opened with connection_type 'websocket'
2019-02-20 23:39:06,214 Connection setup completed!
2019-02-20 23:39:06,214 [bob] sending: [{'channel': <MetaChannel.CONNECT: '/meta/connect'>, 'clientId': '==REDACTED==', 'connectionType': 'websocket', 'id': '1'}]
2019-02-20 23:39:08,525 Adding subscription: {'Quote': ['/ES']}
2019-02-20 23:39:08,525 [dxFeed] sending: {'reset': True, 'add': {'Quote': ['/ES']}}
2019-02-20 23:39:09,459 [bob] sending: [{'channel': '/service/sub', 'clientId': '==REDACTED==', 'data': {'reset': True, 'add': {'Quote': ['/ES']}}, 'id': '2'}]
2019-02-20 23:39:09,690 [bob] received: [{"channel":"/service/sub","id":"2","error":"402::Unknown client","successful":false}]

Any help and advice is greatly appreciated 😄

Strange typing issue with JsonObject

We encountered a bug today that I would have expected mypy to catch. Our code looks something like this:

from aiocometd.typing import JsonObject

def some_func(response_message: JsonObject) -> None:
    reveal_type(response_message)

Mypy output:

file.py: note: In member "some_func":
file.py:203:25: note: Revealed type is 'Any'

I can see that aiocometd defines JsonObjectto be a Dict[str, Any], so why does mypy think the response_message is of type Any?

This might be a mypy bug, but I thought I'd ask here first.

TypeError: outgoing() takes 2 positional argument but 3 were given

Hi,

I'm not too much familiar with Python or cometD but I was trying my way out and when trying to implement the AuthExtension, for the outgoing function I'm getting this type error

TypeError: outgoing() takes 2 positional argument but 3 were given

Here is the code for that class

`class MyAuthExtension(AuthExtension):

async def incoming(payload, headers=None):
    pass

async def outgoing(payload, headers):
    pass    

async def authenticate():
    return <SOME_VALUE>`

Client is not reconnecting after handshake.

Hi,

i'm using your cometd implementation with your aiofstream to connect to a salesforce streaming api. It seems that the reconnect advice "handshake" is broken.

The following happens:

  • after several successful '/meta/connect' the client is receiving a client-id unknown message and is advised to do a handshake
  • the client performs a handshake, but salesforce does not seem to return a response with a new advice ('connect/retry'), so the client is sending handshakes over and over again
  • then after 2000 handshake requests (unlimited edition) salesforce responses with a '403::Organization concurrent user limit exceeded'

I'm not sure if it is a salesforce bug not sending a new advice, or there is some issue with your client, not reconnecting after a handshake.

Here are some log entries:

2019-02-11 13:06:52,761 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '36', 'successful': True}

2019-02-11 13:08:43,359 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '37', 'successful': True}

2019-02-11 13:10:33,381 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '38', 'successful': True}

2019-02-11 13:12:23,401 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '39', 'successful': True}

2019-02-11 13:14:13,434 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '40', 'successful': True}

2019-02-11 13:15:20,519 - WARNING:aiocometd.transports.long_polling:Failed to send payload, None

2019-02-11 13:15:20,520 - DEBUG:aiocometd.transports.base:Connect task finished with: TransportError('None')

2019-02-11 13:15:24,738 - DEBUG:aiocometd.transports.base:Connect task finished with: {'advice': {'interval': 0, 'reconnect': 'handshake'}, 'channel': '/meta/connect', 'id': '43', 'error': '403::Unknown client', 'successful': False}

2019-02-11 13:15:24,770 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxxx1', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}

2019-02-11 13:15:24,791 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxxx2', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}

.
. 2000 requests later
.

2019-02-11 13:16:03,756 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxxx1999', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}

2019-02-11 13:16:03,777 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxxx2000', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}

2019-02-11 13:16:04,018 - DEBUG:aiocometd.transports.base:Connect task finished with: KeyError('interval')

2019-02-11 13:16:04,018 - WARNING:aiocometd.transports.base:No reconnect advice provided, no more operations will be scheduled.

2019-02-11 13:16:04,019 - INFO:aiocometd.client:Closing client...

2019-02-11 13:16:04,276 - INFO:aiocometd.client:Client closed.

aiosfstream.exceptions.ServerError: ('Connection closed by the server', {'ext': {'sfdc': {'failureReason': '403::Organization concurrent user limit exceeded'}, 'replay': True, 'payload.format': True}, 'advice': {'reconnect': 'none'}, 'channel': '/meta/handshake', 'id': '1', 'error': '403::Handshake denied', 'successful': False})

Regards Christoph

AuthExtension Example Request

I can't find any other discussion forum for this, but I am looking for an example of how to authenticate, because I can't seem to figure it out. I need to include an Authorization header with a "Bearer tokenvalue" and I am stumped. If anyone has an example, I would appreciate it.

BUG: does not handle own CancelledError on client.close()

  1. great project, thank you

  2. in v 0.4.5: BUG: does not handle own CancelledError on client.close():

2020-01-19 08:54:02 INFO aiocometd.client:287  close                   Closing client...
2020-01-19 08:54:02 ERRO      asyncio:199  exception_reactor       Exception in callback <bound method TransportBase._connect_done of <aiocometd.transports.websocket.WebSocketTransport object at 0x7f9e62e90fa0>>
handle: <Handle TransportBase._connect_done>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 70, in uvloop.loop.Handle._run
  File "/usr/lib/python3.8/site-packages/aiocometd/transports/base.py", line 541, in _connect_done
    result: Union[JsonObject, Exception] = future.result()
asyncio.exceptions.CancelledError
2020-01-19 08:54:02 ERRO      asyncio:199  exception_reactor       Exception in callback <bound method WebSocketTransport._receive_done of <aiocometd.transports.websocket.WebSocketTransport object at 0x7f9e62e90fa0>>
handle: <Handle WebSocketTransport._receive_done>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 70, in uvloop.loop.Handle._run
  File "/usr/lib/python3.8/site-packages/aiocometd/transports/websocket.py", line 261, in _receive_done
    result = future.result()
asyncio.exceptions.CancelledError
2020-01-19 08:54:03 INFO aiocometd.client:298  close                   Client closed.
  1. in aiocometd/transports/base.py:
    https://github.com/robertmrk/aiocometd/blob/0.4.5/aiocometd/transports/base.py#L541

  2. in aiocometd/transports/websocket.py:
    https://github.com/robertmrk/aiocometd/blob/0.4.5/aiocometd/transports/websocket.py#L261

  3. in both cases these are internal tasks, which are both created and canceled by aiocometd,
    and they both propagate own aiocometd initiated CancelledError to the event loop
    exception handler

using aiocometd to connect the CometD cluster of nginx reverse proxy error

When using aiocometd to connect the CometD cluster of nginx reverse proxy, the server returns 402,When using aiocometd to connect CometD single server of nginx reverse proxy, the server returns normal data。

/root/PycharmProjects/tomorrow/venv/bin/python /root/PycharmProjects/client/tt.py
Traceback (most recent call last):
File "/root/PycharmProjects/client/tt.py", line 58, in
loop.run_until_complete(chat())
File "/usr/local/python3.6/lib/python3.6/asyncio/base_events.py", line 473, in run_until_complete
return future.result()
File "/root/PycharmProjects/client/tt.py", line 32, in chat
await client.open()
File "/root/PycharmProjects/tomorrow/venv/lib/python3.6/site-packages/aiocometd/client.py", line 264, in open
self._verify_response(response)
File "/root/PycharmProjects/tomorrow/venv/lib/python3.6/site-packages/aiocometd/client.py", line 357, in _verify_response
self._raise_server_error(response)
File "/root/PycharmProjects/tomorrow/venv/lib/python3.6/site-packages/aiocometd/client.py", line 373, in _raise_server_error
raise ServerError(message, response)
aiocometd.exceptions.ServerError: ('Connect request failed.', {'id': '0', 'error': '402::Unknown client', 'successful': False, 'advice': {'interval': 0, 'reconnect': 'handshake'}, 'channel': '/meta/connect'})

Add check on status code to long-polling

Hi there, We're using this to connect to salesforce. One thing we've observed is that there is no exception when salesforce runs out of platformevents for the day. We feel it would make sense for some kind of exception to be thrown when this happens. (But we're certainly open to other ways to detect and manage any error codes returned from the server). Does it make sense to check the response.status in transports/long_polling.py and throw an exception if it is not a 200 after the session.post which collects the payload from salesforce? The code we are thinking of looks like:

            if response.status != 200:
                raise TransportError(f"Session post for payload failed with: {response.status}: {response.reason}")

Support for Python 3.10

Usage of the "loop" variable within asyncio was deprecated in 3.8. Now in 3.10 it appears the use of the variable now errors.

Good news seems to be simply removing the use of loop allows the code to work "as is". I'm still trying to wrap my brain around the library, however at least wanted to drop a message that support for 3.10+ may be easy to implement.

Long Polling flow

Hi @robertmrk

I'm trying using your library to receive notifications from a platform that only supports long-polling. I am rather new to Cometd but I did testing using the platform's example through Postman. It seems like the order of messages to set up that they are suggesting is /meta/handshake, /meta/subscribe, then /meta/connect, where the actual long-polling is held at the connect message until a notification is received.

However as I adapted your example to receive notifications from the platform, I can see the outgoing handshake message, followed by the incoming handshake message and finally the outgoing connect message where the long-polling is held. As the subscribe method is not
called until client is set up, I'm never able to receive any notification from the channel I'm interested in.

I am not sure if the order was defined in the protocol spec or whether there's something I've missed, but this is what I found in Salesforce's documentation (not the platform I'm referring to), where 'subscribe' happens before 'connect'.
https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/using_streaming_api_client_connection.htm

Code:

async def chat():
    # connect to the server
    async with Client(f'https://platform.com/notifications', extensions=[MyExtension]) as client:
        
        await client.subscribe("/DecodedEventListener/trigger")
        
        # listen for incoming messages
        async for message in client:
            print(message)

Output:

Outgoing messages: [{'channel': <MetaChannel.HANDSHAKE: '/meta/handshake'>, 'version': '1.0', 'supportedConnectionTypes': ['websocket', 'long-polling'], 'minimumVersion': '1.0', 'id': '0'}]

response message: {'id': '0', 'minimumVersion': '1.0', 'supportedConnectionTypes': ['long-polling', 'smartrest-long-polling'], 'successful': True, 'channel': '/meta/handshake', 'ext': {'ack': True}, 'clientId': '2918jmzu83wu9gwi7av3thi1h5s', 'version': '1.0'}

Outgoing messages: [{'channel': <MetaChannel.CONNECT: '/meta/connect'>, 'clientId': '2918jmzu83wu9gwi7av3thi1h5s', 'connectionType': 'long-polling', 'id': '1'}]

TypeError: string indices must be integers

My code:

    async def genesys():
        # connect to the server
        async with Client("https://myserver") as client:

                # subscribe to channels to receive chat messages and
                # notifications about new members
                await client.subscribe("/api/v2/interactions")

                # send initial message
                await client.publish("/api/v2/interactions", {
                    "data": ref,
                })

                # listen for incoming messages
                async for message in client:
                    #if message["channel"] == "/api/v2/interactions":
                    data = message["data"]
                    print(f"{data}: {data}")

    loop = asyncio.get_event_loop()
    loop.run_until_complete(genesys())

Getting error:

>>> CometD Response...
Traceback (most recent call last):
  File "test3.py", line 253, in <module>
    get_me()
  File "test3.py", line 247, in get_me
    get_contact_history(id)
  File "test3.py", line 233, in get_contact_history
    get_cometd(ref)
  File "test3.py", line 196, in get_cometd
    loop.run_until_complete(genesys())
  File "C:\Program Files\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
    return future.result()
  File "test3.py", line 178, in genesys
    async with Client("https://myserver") as client:
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\client.py", line 432, in __aenter__
    await self.open()
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\client.py", line 273, in open
    self._transport = await self._negotiate_transport()
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\client.py", line 209, in _negotiate_transport
    response = await transport.handshake(self._connection_types)
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 252, in handshake
    supportedConnectionTypes=connection_type_values
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 303, in _send_message
    return await self._send_payload_with_auth([message])
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 318, in _send_payload_with_auth
    response = await self._send_payload(payload)
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 345, in _send_payload
    return await self._send_final_payload(payload, headers=headers)
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\long_polling.py", line 43, in _send_final_payload
    find_response_for=payload[0]
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 451, in _consume_payload
    self._update_subscriptions(message)
  File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 394, in _update_subscriptions
    if response_message["channel"] == MetaChannel.SUBSCRIBE:
TypeError: string indices must be integers

Any idea?

No response message received for the first message in the payload

Hi, I'm trying to setup a connection to the cometd interface of a Logitech Media Server.
I don't get passed the handshake with an Exception:
No response message received for the first message in the payload

I used the Extension to intercept the messages and this is the response:

outgoing payload: [{'channel': <MetaChannel.HANDSHAKE: '/meta/handshake'>, 'version': '1.0', 'supportedConnectionTypes': ['websocket', 'long-polling'], 'minimumVersion': '1.0', 'id': '0'}]

outgoing headers: {}

incoming payload: [{'clientId': 'effe08c7', 'version': '1.0', 'supportedConnectionTypes': ['long-polling', 'streaming'], 'channel': '/meta/handshake', 'advice': {'timeout': 60000, 'reconnect': 'retry', 'interval': 0}, 'successful': True}]

incoming headers: <CIMultiDictProxy('Server': 'Logitech Media Server (7.9.2 - 1554701435)', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'Vary': 'Accept-Encoding', 'Content-Encoding': 'gzip', 'Content-Length': '174', 'Content-Type': 'application/json', 'Expires': '-1', 'X-Time-To-Serve': '0.00304794311523438')>
Traceback (most recent call last):
  File "main.py", line 61, in lms_events
    async with Client("http://192.168.1.1:9006/cometd", extensions=[MyExtension()]) as client:
  File "/usr/local/lib/python3.7/site-packages/aiocometd/client.py", line 432, in __aenter__
    await self.open()
  File "/usr/local/lib/python3.7/site-packages/aiocometd/client.py", line 273, in open
    self._transport = await self._negotiate_transport()
  File "/usr/local/lib/python3.7/site-packages/aiocometd/client.py", line 209, in _negotiate_transport
    response = await transport.handshake(self._connection_types)
  File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/base.py", line 252, in handshake
    supportedConnectionTypes=connection_type_values
  File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/base.py", line 303, in _send_message
    return await self._send_payload_with_auth([message])
  File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/base.py", line 318, in _send_payload_with_auth
    response = await self._send_payload(payload)
  File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/base.py", line 345, in _send_payload
    return await self._send_final_payload(payload, headers=headers)
  File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/long_polling.py", line 50, in _send_final_payload
    raise TransportError(error_message)
aiocometd.exceptions.TransportError: No response message received for the first message in the payload

Any clue what is going on ?

Error: Attempt to decode JSON

hi guys,

I try to request HTTP Long Polling .. but, I receive that error

raise TransportError(str(error)) from error aiocometd.exceptions.TransportError: 0, message='Attempt to decode JSON with unexpected mimetype

asinco

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.