
Comments (17)

jordaneremieff commented on July 23, 2024

I've started on an implementation for this, and it appears to work as expected in my test application. I'm happy to open a PR with tests, documentation, etc. after getting some feedback here.

How does this look so far?

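from starlette.types import Receive, Scope, Send
from starlette.websockets import WebSocket, WebSocketDisconnect


class WebSocketEndpoint: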
    def __init__(self, scope: Scope):
        self.scope = scope

    async def __call__(self, receive: Receive, send: Send):
        websocket = WebSocket(self.scope, receive=receive, send=send)
        kwargs = self.scope.get("kwargs", {})
        await self.on_connect(websocket, **kwargs)
        try:
            while True:
                message = await websocket.receive()
                websocket._raise_on_disconnect(message)
                if message["type"] == "websocket.receive":
                    await self.on_receive(websocket, message)
                elif message["type"] == "websocket.disconnect":
                    await self.on_disconnect(websocket)
        except WebSocketDisconnect as exc:
            await self.on_disconnect(websocket)

    async def on_connect(self, websocket, **kwargs):
        """Override to handle an incoming websocket connection"""
        await websocket.accept()

    async def on_receive(self, websocket, message):
        """Override to handle an incoming websocket message"""

    async def on_disconnect(self, websocket):
        """Override to handle a disconnecting websocket"""
        await websocket.close(code=1000)

from starlette.

tomchristie commented on July 23, 2024

Yeah, looks about right.

It's not 100% clear if we want to send websocket.close() by default in on_disconnect, since the disconnect might occur because we sent a close.

Also the WebSocketDisconnect isn't necessarily what we want there - really we ought to be getting a websocket.disconnect message, and break'ing out of the while True.

Plus I think we should additionally store the websocket and the kwargs on the instance so that they're available everywhere if needed.

All of those are things that we could deal with in review tho.
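
To make the disconnect suggestion concrete, here is a minimal sketch of a receive loop that treats `websocket.disconnect` as an ordinary message and breaks out of the loop, rather than catching `WebSocketDisconnect`. It works on plain ASGI message dicts and uses a hypothetical `make_receive` helper in place of a real server; `run_endpoint`, `make_receive`, and `demo` are illustrative names, not Starlette API.

```python
import asyncio


async def run_endpoint(receive, on_receive, on_disconnect):
    """Dispatch ASGI websocket messages until a disconnect message arrives."""
    while True:
        message = await receive()
        if message["type"] == "websocket.receive":
            await on_receive(message)
        elif message["type"] == "websocket.disconnect":
            # Disconnect is a normal message: notify the handler and leave the loop.
            await on_disconnect(message.get("code", 1000))
            break


def make_receive(messages):
    """Hypothetical stand-in for the server's `receive` callable."""
    queue = list(messages)

    async def receive():
        return queue.pop(0)

    return receive


async def demo():
    events = []

    async def on_receive(message):
        events.append(("receive", message["text"]))

    async def on_disconnect(code):
        # No close() is sent here, since the connection is already gone.
        events.append(("disconnect", code))

    receive = make_receive(
        [
            {"type": "websocket.receive", "text": "hello"},
            {"type": "websocket.disconnect", "code": 1001},
        ]
    )
    await run_endpoint(receive, on_receive, on_disconnect)
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that `on_disconnect` in this sketch does not call `close()`, which sidesteps the question of whether the disconnect was caused by a close we sent ourselves.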

jeffbuttars commented on July 23, 2024

This is a good step.
I'm concerned this class/endpoint enforces a protocol pattern though: always receiving.

I'd like to see a class much like this, but one that does not assume any kind of protocol pattern.
So in this base class, __call__ would finish the upgrade and accept, but that's it.

Then there could be subclasses for common patterns like WebSocketReceiveSendEndpoint, WebSocketPublisherEndpoint, etc.,
which could be included in Starlette or a separate package and of course enable easy extension by users.

jordaneremieff commented on July 23, 2024

@jeffbuttars I could update the current PR with a base class and the current implementation could exist as a subclass, but I'm unsure what the base behavior might look like because the implementation I've put forward is the only pattern of this sort I'm familiar enough with to define.

Would you share an example of what you're thinking?

tomchristie commented on July 23, 2024

Personally I think the event-driven style is what we want by default.

A sensible pattern for running some sub-task off of this would be to start it on accept, and cancel it on disconnect.

jeffbuttars commented on July 23, 2024

I was thinking something like this (I'm not proposing this exact code)

class WebSocketBaseEndpoint:
    def __init__(self, scope: Scope):
        self.scope = scope

    async def __call__(self, receive: Receive, send: Send):
        self.websocket = WebSocket(self.scope, receive=receive, send=send)
        kwargs = self.scope.get("kwargs", {})
        await self.on_connect(self.websocket, **kwargs)

    async def on_connect(self, websocket, **kwargs):
        """Override to handle an incoming websocket connection"""
        await websocket.accept()

    async def on_receive(self, websocket, message):
        """Override to handle an incoming websocket message"""

    async def on_disconnect(self, websocket):
        """Override to handle a disconnecting websocket"""
        await websocket.close(code=1000)

class WebSocketEndpoint(WebSocketBaseEndpoint):
    # A receive-driven endpoint by default
    async def __call__(self, receive: Receive, send: Send):
        await super().__call__(receive, send)
        try:
            while True:
                message = await self.websocket.receive()
                self.websocket._raise_on_disconnect(message)
                if message["type"] == "websocket.receive":
                    await self.on_receive(self.websocket, message)
                elif message["type"] == "websocket.disconnect":
                    await self.on_disconnect(self.websocket)
        except WebSocketDisconnect:
            await self.on_disconnect(self.websocket)

class WebSocketPubSubEndpoint(WebSocketBaseEndpoint):
    # A user-defined pub/sub implementation that only sends after the connection is established
    async def __call__(self, receive: Receive, send: Send):
        await super().__call__(receive, send)
        while True:
            message = message_generator()
            await self.websocket.send(message)

    async def on_connect(self, websocket, **kwargs):
        # Accept and set up the subscription here
        await websocket.accept()

The crux of it being that the instantiation of WebSocket and the completion of the accept will likely happen the same way regardless of the pattern/protocol used. It would be nice to have a base class to cover those and any other boilerplate that arises.

On the other hand, if the vast majority of users are just going to be using a receive driven WebSocket and that is the best base class, it's an easy thing to override.

Either way, I like the class based WebSocketEndpoint.

tomchristie commented on July 23, 2024

Here's the thing... the implementation has to be message driven, because if it isn't there's no way it'll ever get the disconnect message, and know when to exit.

It's okay if there's also some separately driven aspect to that, but if so then it needs to be treated as a secondary task that'll get cancelled once disconnect is received.

Aside: I think there's a bit of sharpening up of both the ASGI spec and the websocket server implementations in that I'm pretty sure that send shouldn't raise an error once disconnected, but that clients must listen to the incoming messages for the disconnect.

jeffbuttars commented on July 23, 2024

@tomchristie I think you pinpointed the real issue here, there is not currently a good way in ASGI to gracefully get an unexpected disconnect from the client without checking the receive message. In practice, I've found an error is raised on send in that situation. The problem is worsened when the server can't check receive before every send. Which I personally wouldn't like anyway even if that did work (an endless "Did it close?" check).

I think we disagree on how to handle the situation: clients must listen vs. catch a disconnected error.
I would be fine with 'ASGI clients must listen' if it could be done in a way that didn't break a send only websocket. I hope I'm making sense.

Anyway, I don't want to get into the weeds here about ASGI. Maybe somewhere else (is there virtual beer?).

I fully trust @tomchristie's decision on this. @tomchristie has way more experience in building these things than I do, and I think I've made my case in a couple of places on this.

tomchristie commented on July 23, 2024

I'm moderately sure about this... Exceptions from send/receive ought to be treated as opaque, indicating an error condition. An exception ought to end up getting logged with a traceback.

Conversely disconnect is a normal, expected state. We don't want it to be logged as an error state, and assuming exceptions are opaque, then the right way to signal them is on the receive channel, rather than via exceptions.

We can still perfectly well have send-only websockets, it's just that the pattern for "how do I disconnect this" is "cancel the child task once you get disconnect" rather than "wait for a DisconnectOccurred-type exception".

jeffbuttars commented on July 23, 2024

I'm not sure what you mean by "cancel the child task once you get disconnect", could you elaborate? Do you mean that the ASGI service should cancel the task?
What would that look like to Starlette's WebSocket class?

I agree with you on the opacity and Exceptions being for errors here. Looking at it that way helps.

tomchristie commented on July 23, 2024

This is the sort of pattern I mean...

import asyncio


class WebSocketTicks(WebSocketEndpoint):
    async def on_connect(self, websocket):
        await websocket.accept()
        # Start the ticker as a secondary task once the connection is accepted.
        self.ticker_task = asyncio.create_task(self.tick())

    async def on_disconnect(self):
        # Disconnect arrived on the receive channel, so stop the secondary task.
        self.ticker_task.cancel()

    async def tick(self):
        counter = 0
        while True:
            await self.websocket.send_text("Counter %d" % counter)
            counter += 1
            await asyncio.sleep(1)
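
For anyone who wants to try the pattern without a server, the following is a rough, self-contained sketch of the same idea using only asyncio and plain ASGI message dicts. The names `ticker`, `endpoint`, and `demo` are made up for illustration, and an `asyncio.Queue` stands in for the server's receive channel; none of this is Starlette API.

```python
import asyncio


async def ticker(send, interval):
    """Secondary task: keeps sending until it is cancelled."""
    counter = 0
    while True:
        await send({"type": "websocket.send", "text": "Counter %d" % counter})
        counter += 1
        await asyncio.sleep(interval)


async def endpoint(receive, send, interval=0.01):
    """Message-driven loop that owns the ticker task's lifetime."""
    ticker_task = None
    while True:
        message = await receive()
        if message["type"] == "websocket.connect":
            await send({"type": "websocket.accept"})
            ticker_task = asyncio.create_task(ticker(send, interval))
        elif message["type"] == "websocket.disconnect":
            break
    if ticker_task is not None:
        # Disconnect is signalled on the receive channel; cancel the child task.
        ticker_task.cancel()
        try:
            await ticker_task
        except asyncio.CancelledError:
            pass
    return ticker_task


async def demo():
    sent = []
    incoming = asyncio.Queue()  # stand-in for the server's receive channel

    async def send(message):
        sent.append(message)

    await incoming.put({"type": "websocket.connect"})
    task = asyncio.create_task(endpoint(incoming.get, send))
    await asyncio.sleep(0.05)  # let the ticker send a few messages
    await incoming.put({"type": "websocket.disconnect", "code": 1000})
    ticker_task = await task
    return sent, ticker_task.cancelled()


if __name__ == "__main__":
    messages, cancelled = asyncio.run(demo())
    print(len(messages), cancelled)
```

The send-only behaviour lives entirely in `ticker`, while the loop in `endpoint` keeps listening so that it can see the disconnect and stop the ticker without relying on an exception from `send`.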

jeffbuttars commented on July 23, 2024

Interesting. I'll try that out.
Much better than an Exception.

jordaneremieff commented on July 23, 2024

I was thinking again about how we are passing the WebSocket instance throughout the WebSocketEndpoint instance in the current proposed pattern, and I was wondering if it may be better to store the websocket state on the endpoint instead as @tomchristie initially suggested. This also made me wonder if there is any reason not to inherit from the WebSocket class and expose its methods directly.

Here are two potential ways of implementing that seem to be a bit more flexible based on how I've been using it:

import json


class WebSocketEndpoint(WebSocket):

    encoding = None

    async def __call__(self, receive: Receive, send: Send):
        self._receive = receive
        self._send = send
        close_code = 1000

        try:
            while True:
                message = await self.receive()
                if message["type"] == "websocket.connect":
                    await self.on_connect(message)
                elif message["type"] == "websocket.receive":
                    data = await self.decode(message)
                    await self.on_receive(data)
                elif message["type"] == "websocket.disconnect":
                    close_code = message["code"]
                    return
        finally:
            if close_code == 1006:
                raise RuntimeError("WebSocket error, connection closed abnormally")
            await self.on_disconnect(close_code=close_code)

    async def decode(self, message):
        if self.encoding == "text":
            if "text" not in message:
                await self.close(code=1003)
                raise RuntimeError("Expected text websocket messages, but got bytes")
            return message["text"]

        elif self.encoding == "bytes":
            if "bytes" not in message:
                await self.close(code=1003)
                raise RuntimeError("Expected bytes websocket messages, but got text")
            return message["bytes"]

        elif self.encoding == "json":
            if "bytes" not in message:
                await self.close(code=1003)
                raise RuntimeError(
                    "Expected JSON to be transferred as bytes websocket messages, but got text"
                )
            return json.loads(message["bytes"].decode("utf-8"))

        assert (
            self.encoding is None
        ), f"Unsupported 'encoding' attribute {self.encoding}"

        return message["text"] if "text" in message else message["bytes"]

    async def on_connect(self, message):
        """Override to handle an incoming websocket connection"""
        await self.accept()

    async def on_receive(self, data):
        """Override to handle an incoming websocket message"""

    async def on_disconnect(self, close_code):
        """Override to handle a disconnecting websocket"""
        await self.close(code=close_code)

Alternatively, we could do the same as above except like this:

class WebSocketEndpoint:
    def __init__(self, scope: Scope) -> None:
        self.scope = scope

    async def __call__(self, receive: Receive, send: Send):
        self.websocket = WebSocket(self.scope, receive=receive, send=send)
        ...

    async def on_connect(self, message):
        """Override to handle an incoming websocket connection"""
        await self.websocket.accept()

    async def on_receive(self, data):
        """Override to handle an incoming websocket message"""

    async def on_disconnect(self, close_code):
        """Override to handle a disconnecting websocket"""
        await self.websocket.close(code=close_code)

Likely a bit of finessing required. Also, I haven't been across all of the discussion surrounding the disconnect/close behaviour, so maybe there is something from there that could help inform this as well.

Thoughts?

tomchristie commented on July 23, 2024

Neat idea. One reason to prefer not subclassing WebSocket is that it makes ongoing compatibility more awkward in new versions - every time we introduce extra API on the class there's a chance of that clashing with some existing user code. For that reason we should probably prefer composition rather than inheritance.
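
As a rough illustration of the clash being described, the sketch below uses two hypothetical stand-in classes, `WebSocketV1` and `WebSocketV2`, to play the role of successive library releases; they are not Starlette's real classes, and `broadcast` is an invented method name. User code that inherits and happens to define `broadcast` breaks once the library adds its own `broadcast`, while the composed version is unaffected.

```python
class WebSocketV1:
    """Hypothetical library class, first release."""

    def __init__(self):
        self.sent = []

    def send_text(self, text):
        self.sent.append(text)


class WebSocketV2(WebSocketV1):
    """Hypothetical later release that introduces extra API."""

    def broadcast(self, text):
        self.send_text("[all] " + text)

    def close(self):
        # The library now relies on its own broadcast() internally.
        self.broadcast("closing")


def make_inheriting_endpoint(base):
    """User code written against V1, using inheritance."""

    class Endpoint(base):
        # Same name as the method V2 adds, but with a different signature.
        def broadcast(self, text, room):
            self.send_text("[%s] %s" % (room, text))

    return Endpoint()


class ComposedEndpoint:
    """The same user code, holding the websocket as an attribute instead."""

    def __init__(self, websocket):
        self.websocket = websocket

    def broadcast(self, text, room):
        self.websocket.send_text("[%s] %s" % (room, text))


if __name__ == "__main__":
    try:
        make_inheriting_endpoint(WebSocketV2).close()
    except TypeError as exc:
        print("inheritance clash:", exc)

    composed = ComposedEndpoint(WebSocketV2())
    composed.websocket.close()
    print(composed.websocket.sent)
```

With inheritance, the library's internal call to `self.broadcast("closing")` lands on the user's override and fails; with composition, the two `broadcast` methods live on different objects and never meet.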

jordaneremieff commented on July 23, 2024

Good point, I was initially setting out to explore the question of storing the websocket on the instance generally and figured the inheritance approach was worth looking into as well.

What about the alternative of storing the websocket state on the class in favor of passing it to the on_* methods?

I was thinking about how a broadcast (pub/sub, groups, etc.) middleware may work applied to this endpoint, and it seems that exposing the websocket as an attribute on the endpoint instance would provide a convenient interface for the wrapping application.

EDIT: Thinking about this some more, maybe that would be better as a subclass for this kind of endpoint?

tomchristie commented on July 23, 2024

@ERM I think the broadcast question is kinda separate (e.g. we'll need to handle it for HTTP requests as well, since a POST to an HTTP endpoint could result in a broadcast notification to all attached websockets).

Anyways, I like what we have here now, I reckon it's good enough for the moment.

vlori2k commented on July 23, 2024


Does the implementation @jordaneremieff posted above work? I'm having trouble with "RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'."
