Comments (9)
Hi @cmetz
Thanks for the detailed error report!
From a quick glance at the log messages I see that everything worked fine until 2019-02-11 13:15:20,519
, when a network issue occured and Salesforce decided to terminate the server side session and requested a new handshake from the client. All of this is normal, and it's part of the connection recovery strategy used by the cometd server.
However, according to the specs a successful handshake response must contain an advice
field, and it's missing from the response returned by Salesforce. So the handshake
advice remained the most recent advice, and because the client has to follow the advices given by the server, it sent handshake requests until the client limit was reached.
At the moment, from what I see the server implementation at Salesforce doesn't fully conforms to the specs. Unfortunately I have no idea how to report a technical issues like this for them, or if they would respond to it at all, so I will adapt the library to this issue instead. I think it shouldn't take more than a few days if I can reliably reproduce the issue.
from aiocometd.
Hi @robertmrk,
following the specs https://docs.cometd.org/current/reference/#_bayeux_meta_handshake
A successful handshake response MAY contain the message fields: ... "advice".
so it seems to be optional sending an advice.
For debugging purpose i could force the behavior this way:
async with SalesforceStreamingClient(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
username="<username>",
password="<password>") as client:
client._transport._client_id = "forcehandshake"
...
from aiocometd.
Hi @GULP-CM
Oh, you're right! I missed the line which separates the MUST/MAY message fields...
It looks like I'll have to rethink this part of the client's implementation.
Thanks for your help!
from aiocometd.
Hi @cmetz
I released a new version of aiocometd which contains the fix for the reconnection issue.
Can you confirm that your code works as expected with the new version?
from aiocometd.
Hi @robertmrk,
it's also me GULP-CM , i'll test it and give you my results. From what i have seen in your code changes it looks fine. Thanks for you effort and your wonderful client.
from aiocometd.
It works 👍 , but there is one minor issue with the replay_fallback=ReplayOption.ALL_EVENTS option. if the client never got a new replay and a new handshake is done the subscriptions are failing, maybe that also happens if the channel was idle for more then 24 hours, but i think that is a more likely a problem in the aiofstream replay extension. i can open an issue there, if you want.
Update:
Ok i did some investigation on your code and i do not see any easy solution for this Problem. The initially subscriptions have an exception handler to capture this failure and then retry it once with a fallback replay id, but if the ComendClient subscribes by itself it does not handle this kind of fallback stuff. You might want to set the fallback replay id (-2 / -1) as new replay_id in the ReplayStore, but this will not prevent failures, if the replay id got outdated during a handshake/reconnect. Maybe there is a way to provide the parent subscribe function/behavior to the _transport subscribe function and use this to resubscribe the channels.
2019-02-12 21:45:34,038 - INFO:aiocometd.client:Connection types supported by the server: ['long-polling']
2019-02-12 21:45:34,055 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxx1', 'advice': {'interval': 0, 'timeout': 110000, 'reconnect': 'retry'}, 'channel': '/meta/connect', 'id': '1', 'successful': True}
2019-02-12 21:45:34,055 - INFO:aiocometd.client:Client opened with connection_type 'long-polling'
2019-02-12 21:45:37,292 - WARNING:aiosfstream.client:Subscription failed with message: 'The replayId {19} you provided was invalid. Please provide a valid ID, -2 to replay all events, or -1 to replay only new events.', retrying subscription with <ReplayOption.ALL_EVENTS: -2>.
2019-02-12 21:45:37,334 - INFO:aiocometd.client:Subscribed to channel /topic/Account
2019-02-12 21:47:27,277 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxx1', 'channel': '/meta/connect', 'id': '3', 'successful': True}
2019-02-12 21:47:27,535 - DEBUG:aiocometd.transports.base:Connect task finished with: {'advice': {'interval': 0, 'reconnect': 'handshake'}, 'channel': '/meta/connect', 'id': '10', 'error': '403::Unknown client', 'successful': False}
2019-02-12 21:47:27,555 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxx2', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}
2019-02-12 21:47:27,622 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': '4ao14gvtz7jubk4513j879diyhob4', 'advice': {'interval': 0, 'timeout': 110000, 'reconnect': 'retry'}, 'channel': '/meta/connect', 'id': '1', 'successful': True}
2019-02-12 21:47:27,626 - INFO:aiocometd.client:Closing client...
2019-02-12 21:47:27,628 - DEBUG:aiocometd.transports.base:Connect task finished with: CancelledError()
2019-02-12 21:47:27,972 - INFO:aiocometd.client:Client closed.
aiocometd.exceptions.ServerError: ('Subscribe request failed.', {'clientId': 'xxx2', 'channel': '/meta/subscribe', 'id': '3', 'subscription': '/topic/Account', 'error': '400::The replayId {19} you provided was invalid. Please provide a valid ID, -2 to replay all events, or -1 to replay only new events.', 'successful': False})
The above exception was the direct cause of the following exception:
.....
from aiocometd.
Yes, the replay_fallback
is only used for the initial subscription at the moment. Which is still pretty useful but not a complete solution that covers all possible cases.
The problem is that replay
is just an extension, it's not a core part of the protocol, that's why it's implemented in aiosfstream along with the fallback feature, but the automatic resubscription to all channels after a new handshake should be handled in aiocometd...
I would still like to keep the core part of the protocol separated from the replay extensions, and I think I have pretty good idea how to achieve this in aiosfstream.
Until this change is implemented, the next best thing would be to catch ServerError
exceptions and restart the client to achieve a more robust behavior
while True:
try:
async with SalesforceStreamingClient(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
username="<username>",
password="<password>",
replay=replay_storage,
replay_fallback=ReplayOption.ALL_EVENTS) as client:
await client.subscribe("/topic/foo")
async for message in client:
# process message
except ServerError as error:
LOGGER.debug("Handling server side error by restarting the client")
from aiocometd.
yeah you are right, i'm using my own storage_provider which keeps the replay ids tracked in AWS - S3 so maybe i can handle this stuff by observing the stored replay timestamps anyway. so in my case i'll try to catch this specific error and restart the client, otherwise let it fail.
so i'll think we can close this now.
Thanks for your support.
from aiocometd.
I agree, checking the timestamp of replay ids before they're returned is a good solution as well.
Thanks for the error report!
from aiocometd.
Related Issues (15)
- Error: Attempt to decode JSON
- TypeError: outgoing() takes 2 positional argument but 3 were given HOT 1
- Strange typing issue with JsonObject HOT 1
- Support for Python 3.10 HOT 4
- Is there a hook for reconnection?
- AuthExtension Example Request
- TypeError: Semaphore.__init__() got an unexpected keyword argument 'loop' in transport/long_polling.py file HOT 2
- Service channels & websockets only HOT 13
- using aiocometd to connect the CometD cluster of nginx reverse proxy error HOT 11
- No response message received for the first message in the payload HOT 5
- Add check on status code to long-polling HOT 7
- Long Polling flow
- BUG: does not handle own CancelledError on client.close() HOT 3
- TypeError: string indices must be integers
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from aiocometd.