Comments (17)
I've started on an implementation for this, it appears to work as expected in my test application. I'm happy to open a PR with tests, documentation, etc. after getting some feedback here.
How does this look so far?
class WebSocketEndpoint:
def __init__(self, scope: Scope):
self.scope = scope
async def __call__(self, receive: Receive, send: Send):
websocket = WebSocket(self.scope, receive=receive, send=send)
kwargs = self.scope.get("kwargs", {})
await self.on_connect(websocket, **kwargs)
try:
while True:
message = await websocket.receive()
websocket._raise_on_disconnect(message)
if message["type"] == "websocket.receive":
await self.on_receive(websocket, message)
elif message["type"] == "websocket.disconnect":
await self.on_disconnect(websocket)
except WebSocketDisconnect as exc:
await self.on_disconnect(websocket)
async def on_connect(self, websocket, **kwargs):
"""Override to handle an incoming websocket connection"""
await websocket.accept()
async def on_receive(self, websocket, message):
"""Override to handle an incoming websocket message"""
async def on_disconnect(self, websocket):
"""Override to handle a disconnecting websocket"""
await websocket.close(code=1000)
from starlette.
Yeah, looks about right.
It's not 100% clear if want to send websocket.close()
by default in the on_disconnect
by default, since the disconnect might occur because we sent a close.
Also the WebSocketDisconnect
isn't necessarily what we want there - really we ought to be getting a websocket.disconnect
message, and break
'ing out of the while True
.
Plus I think we should additionally store the websocket
and the kwargs
on the instance so that they're available everywhere if needed.
All of those are things that we could deal with in review tho.
from starlette.
This is a good step.
I'm concerned this class/endpoint enforces a protocol pattern though: always receiving.
I'd like to see a class much like this, but does not assume any kind of protocol patterns.
So in this base class call would finish the upgrade and accept, but that's it.
Then there could be sub classes for common patterns like WebSocketReceiveSendEndpoint/WebSocketPublisherEndpoint,Etc...
Which could be included in Starlette or a separate package and of course enable easy extension by users.
from starlette.
@jeffbuttars I could update the current PR with a base class and the current implementation could exist as a subclass, but I'm unsure what the base behavior might look like because the implementation I've put forward is the only pattern of this sort I'm familiar enough with to define.
Would you share an example of what you're thinking?
from starlette.
Personally I think the event-driven style is what we want by default.
A sensible pattern for running some sub-task off of this would be to start in on accept, and cancel it on disconnect.
from starlette.
I was thinking something like this (I'm not proposing this exact code)
class WebSocketBaseEndpoint:
def __init__(self, scope: Scope):
self.scope = scope
async def __call__(self, receive: Receive, send: Send):
self.websocket = WebSocket(self.scope, receive=receive, send=send)
kwargs = self.scope.get("kwargs", {})
await self.on_connect(self.websocket, **kwargs)
async def on_connect(self, websocket, **kwargs):
"""Override to handle an incoming websocket connection"""
await websocket.accept()
async def on_receive(self, websocket, message):
"""Override to handle an incoming websocket message"""
async def on_disconnect(self, websocket):
"""Override to handle a disconnecting websocket"""
await websocket.close(code=1000)
class WebSocketEndpoint(WebSocketBaseEndpoint):
# A receive driven endpoint by default
async def __call__(self, receive: Receive, send: Send):
super().__call__(receive, send)
try:
while True:
message = await self.websocket.receive()
websocket._raise_on_disconnect(message)
if message["type"] == "websocket.receive":
await self.on_receive(websocket, message)
elif message["type"] == "websocket.disconnect":
await self.on_disconnect(websocket)
except WebSocketDisconnect as exc:
await self.on_disconnect(websocket)
class WebSocketPubSubEndpoint:
# A user defined pub/sub implementation that only sends after connection is established
async def __call__(self, receive: Receive, send: Send):
super().__call__(receive, send)
while True:
message = message_generator()
self.websocket.send(message)
async def on_connect(self, websocket, **kwargs):
# Accept and setup the subscription here
The crux of if it being the instantiation of WebSocket and the accept completion will likely happen the same way regardless of the pattern/protocol used. Would be nice to have a base class to cover those and any other boilerplate that arises.
On the other hand, if the vast majority of users are just going to be using a receive driven WebSocket and that is the best base class, it's an easy thing to override.
Either way, I like the class based WebSocketEndpoint.
from starlette.
Here's the thing... the implementation has to be message driven, because if it isn't there's no way it'll ever get the disconnect message, and know when to exit.
It's okay if there's also a some separately driven aspect to that, but if so then it needs to be treated as a secondary task that'll get cancelled once disconnect is received.
Aside: I think there's a bit of sharpening up of both the ASGI spec and the websocket server implementations in that I'm pretty sure that send
shouldn't raise an error once disconnected, but that clients must listen to the incoming messages for the disconnect.
from starlette.
@tomchristie I think you pinpointed the real issue here, there is not currently a good way in ASGI to gracefully get an unexpected disconnect from the client without checking the receive message. In practice, I've found an error is raised on send in that situation. The problem is worsened when the server can't check receive before every send. Which I personally wouldn't like anyway even if that did work (an endless "Did it close?" check).
I think we disagree on how to handle the situation: clients must listen vs. catch a disconnected error .
I would be fine with 'ASGI clients must listen' if it could be done in a way that didn't break a send only websocket. I hope I'm making sense.
Anyway, I don't want to get into the weeds here about ASGI. Maybe somewhere else (is there virtual beer?).
I fully trust @tomchristie decision on this. @tomchristie has way more experience in building these things than I and I think I've made my case in a couple places on this.
from starlette.
I'm moderately sure about this... Exceptions from send
/receive
ought to be treated as opaque, indicating an error condition. An exception ought to end up getting logged with a traceback.
Conversely disconnect is a normal, expected state. We don't want it to be logged as an error state, and assuming exceptions are opaque, then the right way to signal them is on the receive channel, rather than via exceptions.
We can still perfectly have send-only websockets, it's just that the pattern for "how do I disconnect this" is "cancel the child task once you get disconnect" rather than "wait for a DisconnectOccurred-type exception".
from starlette.
I'm not sure what you mean by "cancel the child task once you get disconnect", could you elaborate? Do you mean that the ASGI service should cancel the task?
What would that look like to Starlette's WebSocket class?
I agree with you on the opacity and Exceptions being for errors here. Looking at it that way helps.
from starlette.
This is the sort of pattern I mean...
class WebSocketTicks(WebSocketEndpoint):
async def on_connect(self, websocket):
await websocket.accept()
self.ticker_task = asyncio.create_task(self.tick)
async def on_disconnect(self):
self.ticker_task.cancel()
async def tick(self):
counter = 0
while True:
self.websocket.send_text("Counter %d" % counter)
counter += 1
await asyncio.sleep(1)
from starlette.
Interesting. I'll try that out.
Much better than an Exception.
from starlette.
I was thinking again about how we are passing the WebSocket
instance throughout the WebSocketEndpoint
instance in the current proposed pattern, and I was wondering if it may be better to store the websocket state on the endpoint instead as @tomchristie initially suggested. This also made me wonder if there is any reason not to inherit from the WebSocket
class and expose its methods directly.
Here are two potential ways of implementing that seem to be a bit more flexible based on how I've been using it:
class WebSocketEndpoint(WebSocket):
encoding = None
async def __call__(self, receive: Receive, send: Send):
self._receive = receive
self._send = send
close_code = 1000
try:
while True:
message = await self.receive()
if message["type"] == "websocket.connect":
await self.on_connect(message)
elif message["type"] == "websocket.receive":
data = await self.decode(message)
await self.on_receive(data)
elif message["type"] == "websocket.disconnect":
close_code = message["code"]
return
finally:
if close_code == 1006:
raise RuntimeError("WebSocket error, connection closed abnormally")
await self.on_disconnect(close_code=close_code)
async def decode(self, message):
if self.encoding == "text":
if "text" not in message:
await self.close(code=1003)
raise RuntimeError("Expected text websocket messages, but got bytes")
return message["text"]
elif self.encoding == "bytes":
if "bytes" not in message:
await self.close(code=1003)
raise RuntimeError("Expected bytes websocket messages, but got text")
return message["bytes"]
elif self.encoding == "json":
if "bytes" not in message:
await self.close(code=1003)
raise RuntimeError(
"Expected JSON to be transferred as bytes websocket messages, but got text"
)
return json.loads(message["bytes"].decode("utf-8"))
assert (
self.encoding is None
), f"Unsupported 'encoding' attribute {self.encoding}"
return message["text"] if "text" in message else message["bytes"]
async def on_connect(self, message):
"""Override to handle an incoming websocket connection"""
await self.accept()
async def on_receive(self, data):
"""Override to handle an incoming websocket message"""
await self.receive(data)
async def on_disconnect(self, close_code):
"""Override to handle a disconnecting websocket"""
await self.close(code=close_code)
Alternatively, we could do the same as above except like this:
class WebSocketEndpoint:
def __init__(self, scope: Scope) -> None:
self.scope = scope
async def __call__(self, receive: Receive, send: Send):
self.websocket = WebSocket(self.scope, receive=receive, send=send)
...
async def on_connect(self, message):
"""Override to handle an incoming websocket connection"""
await self.websocket.accept()
async def on_receive(self, data):
"""Override to handle an incoming websocket message"""
await self.websocket.receive(data)
async def on_disconnect(self, close_code):
"""Override to handle a disconnecting websocket"""
await self.websocket.close(code=close_code)
Likely a bit of finessing required. Also, I haven't been across all of the discussion surrounding the disconnect/close behaviour, so maybe there is something from there that could help inform this as well.
Thoughts?
from starlette.
Neat idea. One reason to prefer not subclassing WebSocket is that it makes ongoing comparability more awkward in new versions - every time we introduce extra API on the class thereโs a chance of that clashing with some existing user code. For that reason we should probably prefer composition rather than inheritance.
from starlette.
Good point, I was initially setting out to explore the question of storing the websocket on the instance generally and figured the inheritance approach was worth looking into as well.
What about the alternative of storing the websocket state on the class in favor of passing it to the on_*
methods?
I was thinking about how a broadcast (pub/sub, groups, etc.) middleware may work applied to this endpoint, and it seems that exposing the websocket as an attribute on the endpoint instance would provide a convenient interface for the wrapping application.
EDIT: Thinking about this some more, maybe that would be better as a subclass for this kind of endpoint?
from starlette.
@ERM I think the broadcast question is kinda seperate (eg. we'll need to handle it for HTTP requests as well, since eg. a POST to an HTTP endpoint could result in a broadcast notification to all attached websockets)
Anyways, I like what we have here now, I reckon it's good enough for the moment.
from starlette.
I was thinking again about how we are passing the
WebSocket
instance throughout theWebSocketEndpoint
instance in the current proposed pattern, and I was wondering if it may be better to store the websocket state on the endpoint instead as @tomchristie initially suggested. This also made me wonder if there is any reason not to inherit from theWebSocket
class and expose its methods directly.Here are two potential ways of implementing that seem to be a bit more flexible based on how I've been using it:
class WebSocketEndpoint(WebSocket): encoding = None async def __call__(self, receive: Receive, send: Send): self._receive = receive self._send = send close_code = 1000 try: while True: message = await self.receive() if message["type"] == "websocket.connect": await self.on_connect(message) elif message["type"] == "websocket.receive": data = await self.decode(message) await self.on_receive(data) elif message["type"] == "websocket.disconnect": close_code = message["code"] return finally: if close_code == 1006: raise RuntimeError("WebSocket error, connection closed abnormally") await self.on_disconnect(close_code=close_code) async def decode(self, message): if self.encoding == "text": if "text" not in message: await self.close(code=1003) raise RuntimeError("Expected text websocket messages, but got bytes") return message["text"] elif self.encoding == "bytes": if "bytes" not in message: await self.close(code=1003) raise RuntimeError("Expected bytes websocket messages, but got text") return message["bytes"] elif self.encoding == "json": if "bytes" not in message: await self.close(code=1003) raise RuntimeError( "Expected JSON to be transferred as bytes websocket messages, but got text" ) return json.loads(message["bytes"].decode("utf-8")) assert ( self.encoding is None ), f"Unsupported 'encoding' attribute {self.encoding}" return message["text"] if "text" in message else message["bytes"] async def on_connect(self, message): """Override to handle an incoming websocket connection""" await self.accept() async def on_receive(self, data): """Override to handle an incoming websocket message""" await self.receive(data) async def on_disconnect(self, close_code): """Override to handle a disconnecting websocket""" await self.close(code=close_code)Alternatively, we could do the same as above except like this:
class WebSocketEndpoint: def __init__(self, scope: Scope) -> None: self.scope = scope async def __call__(self, receive: Receive, send: Send): self.websocket = WebSocket(self.scope, receive=receive, send=send) ... async def on_connect(self, message): """Override to handle an incoming websocket connection""" await self.websocket.accept() async def on_receive(self, data): """Override to handle an incoming websocket message""" await self.websocket.receive(data) async def on_disconnect(self, close_code): """Override to handle a disconnecting websocket""" await self.websocket.close(code=close_code)Likely a bit of finessing required. Also, I haven't been across all of the discussion surrounding the disconnect/close behaviour, so maybe there is something from there that could help inform this as well.
Thoughts?
Does this work? im having trouble with " RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'."
from starlette.
Related Issues (20)
- Enable `branch = true` for `coverage.run` HOT 4
- Fetching API docs occasionally fails at starlette framework middleware issue with streaming large responses
- links on the release-notes not found. HOT 1
- Add `Partitioned` cookie attribute HOT 2
- `RuntimeError("No response returned")` in `BaseHTTPMiddleware` HOT 11
- Incompatibility with anyio v3
- TestClient DeprecationWarning with httpx 0.27.0 HOT 5
- Bug: `_TemplateResponse` is still relying on `request` key from `context` which maybe different from `request` argument to `TemplateResponse`. HOT 4
- How to change the value of request body?
- A `RuntimeError: Stream consumed` error is raised when `request.body()` is read in the custom middleware
- CORSMiddleware always add access-control-allow-credentials regardless of Origin
- ValueError when null byte in URL HOT 2
- responses.py - appears UTF incompatible (Line 58 and probably other places)
- Double path unquote inside _TestClientTransport.__init__ method
- [FeatureRequest] Cant configure request.stream() chunk size -- always 128kb per chunk read HOT 1
- 0.35.0 refactor of root_path handling is potentially returning incorrect route
- pathsend causing issues with BaseHTTPMiddleware
- Python 3.13.0b2: KeyError: 'content-type' in `test_debug_html[asyncio]` and `test_debug_html[trio]`
- Python 3.13.0b2: `test_gzip_ignored_for_responses_with_encoding_set[trio]` fails with a `ValueError: I/O operation on closed file` HOT 3
- [Websockets] RuntimeError: Cannot call "receive" once a disconnect message has been received. HOT 4
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. ๐๐๐
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google โค๏ธ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from starlette.