Comments (32)
I'm trying to put together a simple working example of server-sent events in Starlette, without luck. Maybe if you could check what's wrong with my logic, I could refine it and then raise a PR for a new EventResponse class.
This is my code:
```python
from asyncio import Queue

import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, StreamingResponse


class SSE:
    def __init__(self, data, event=None, event_id=None, retry=None):
        self.data = data
        self.event = event
        self.id = event_id
        self.retry = retry

    def encode(self):
        message = f"data: {self.data}"
        if self.event is not None:
            message += f"\nevent: {self.event}"
        if self.id is not None:
            message += f"\nid: {self.id}"
        if self.retry is not None:
            message += f"\nretry: {self.retry}"
        message += "\r\n\r\n"
        return message.encode("utf-8")


app = Starlette(debug=True)
app.queues = []


@app.route("/subscribe", methods=["GET"])
async def subscribe(request: Request):
    async def event_publisher():
        while True:
            event = await queue.get()
            yield event.encode()

    queue = Queue()
    app.queues.append(queue)
    headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(content=event_publisher(), headers=headers)


@app.route("/publish", methods=["POST"])
async def publish(request: Request):
    payload = await request.json()
    data = payload["data"]
    for queue in app.queues:
        event = SSE(data)
        await queue.put(event)
    return JSONResponse({"message": "ok"})


if __name__ == "__main__":
    uvicorn.run("__main__:app", host="0.0.0.0", port=4321, reload=True)
```
Obviously it's a naive implementation at the moment, but the main thing is that whenever I publish a new event, it doesn't get broadcast to the subscribers. When debugging I can see that the event is added to the queue, and also that the generator can fetch it from the queue, but I never see it streamed to the client.
There is a third-party package that implements SSE for starlette: https://github.com/sysid/sse-starlette
@jacopofar No, it's not specific to Starlette - it's a constraint of how SSE works.
You could potentially compress the content of the individual messages themselves, if they were large enough for that to matter, but you can't compress the stream itself. (It wouldn't be a valid SSE response if you did, since you'd be scrambling the framing that indicates "here's a new message".)
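To illustrate that distinction, here's a stdlib-only sketch of compressing just the payload of large messages while leaving the SSE framing intact. This is a hypothetical scheme, not part of SSE itself: the client would have to know to base64-decode and decompress such payloads, and the `compressed` event name and size threshold are made up.

```python
import base64
import zlib


def encode_sse_message(payload: str, compress_threshold: int = 1024) -> str:
    """Frame one SSE message, compressing only the payload when it is large.

    The "data:"/blank-line framing stays plain text, so the stream remains a
    valid SSE response; only the message body is compressed.
    """
    if len(payload) >= compress_threshold:
        body = base64.b64encode(zlib.compress(payload.encode("utf-8"))).decode("ascii")
        return f"event: compressed\ndata: {body}\n\n"
    return f"data: {payload}\n\n"


def decode_payload(frame: str) -> str:
    """Reverse of encode_sse_message for a single frame (client-side sketch)."""
    fields = dict(line.split(": ", 1) for line in frame.strip().splitlines())
    data = fields["data"]
    if fields.get("event") == "compressed":
        return zlib.decompress(base64.b64decode(data)).decode("utf-8")
    return data
```

Small messages pass through untouched, so the overhead only applies where compression actually pays off.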
I think I found a legitimate solution for the cleanup as well using background tasks.
E.g.:
```python
@app.route("/subscribe", methods=["GET"])
async def subscribe(request: Request):
    async def remove_subscriptions():
        app.subscriptions.remove(queue)

    async def event_iterator():
        while True:
            ...  # yielding events here

    queue = Queue()
    app.subscriptions.add(queue)
    return EventSourceResponse(
        content=event_iterator(), background=BackgroundTask(remove_subscriptions)
    )
```
Since background tasks are executed whenever the response is disconnected or finished it kinda feels appropriate to do cleanups with them.
Hi, it would be nice if the GZipMiddleware, or maybe middlewares in general, had a set of routes (strings or regex patterns) to ignore.
Any update on this? It would be really helpful for building realtime applications
Not yet - tho I'd be happy to help guide anyone who's interested in taking on a pull request for it.
Oh, to be honest it seems like it's working, I tried it first from Firefox and it tries to download the stream by default as a file, but with Chrome it works just fine.
Interesting, @Kamforka do you have the frontend code as well? It should definitely be supported on Firefox
@jacopofar There is no code, usually I just navigate to the url localhost:4321/subscribe and in Chrome it starts listening to the event stream and displays the messages published by the backend.
Seems like Firefox lacks this feature.
This is how it looks in Chrome (pretty convenient for debugging):
So at the moment I created these POC classes to enable starlette to send server-sent events:
```python
class SSE:
    def __init__(self, data, event=None, event_id=None, retry=None):
        self.data = data
        self.event = event
        self.id = event_id
        self.retry = retry

    def encode(self, charset="utf-8"):
        message = f"data: {self.data}"
        if self.event is not None:
            message += f"\nevent: {self.event}"
        if self.id is not None:
            message += f"\nid: {self.id}"
        if self.retry is not None:
            message += f"\nretry: {self.retry}"
        message += "\r\n\r\n"
        return message.encode(charset)


class EventSourceResponse(StreamingResponse):
    def __init__(
        self, content, headers=None, media_type=None, status_code=200, background=None,
    ):
        default_headers = {
            **(headers or {}),
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
        super().__init__(
            content=content,
            status_code=status_code,
            headers=default_headers,
            media_type=media_type,
            background=background,
        )

    async def __call__(self, scope, receive, send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        async for event in self.body_iterator:
            if not isinstance(event, SSE):
                raise Exception("Event source body must be an SSE instance")
            await send(
                {
                    "type": "http.response.body",
                    "body": event.encode(self.charset),
                    "more_body": True,
                }
            )
        await send({"type": "http.response.body", "body": b"", "more_body": False})
        if self.background is not None:
            await self.background()
```
It works just fine, but I identified two pain points:
- If the client disconnects (i.e. the request is disconnected), the response object will just keep hanging and streaming events to the already-gone request. However, this one can be solved in the generator logic (more or less):

```python
async def event_publisher():
    while True:
        if not await request.is_disconnected():
            try:
                event = await asyncio.wait_for(queue.get(), 1.0)
            except asyncio.TimeoutError:
                continue
            yield event
        else:
            return
```
- When one wants to shut down the `uvicorn` worker while there is still event streaming going on, the shutdown process will first be stuck at `Waiting for connections to close.`, and if the client disconnects in the meantime it will again be stuck at `Waiting for background tasks to complete.`. (It seems like `uvicorn` just won't kill streamed responses unless it's forced to do so, which is really annoying during `reload`, because you have to press `Ctrl + C` every time a file has changed.)

Any thoughts on the most idiomatic way to overcome these issues within `starlette` or maybe `uvicorn`?
Ah, I didn't know that it was possible to see them in Chrome by just visiting the address.
I extended the code with a minimal JS to react to the events, and can confirm it works on Firefox.
Here's the gist: https://gist.github.com/jacopofar/b328948c018dc360da8471a930140c06
MDN reports it's not supported only by IE and Edge (probably the new Blink-based Edge will support it).
I really like it, it seems much easier to manage than websockets.
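(As an aside, for poking at a stream from Python rather than a browser, a tiny parser for the wire format can be handy. This is a rough stdlib-only sketch that ignores comments, multi-line `data:` fields, and the other details a real SSE client handles:)

```python
def parse_sse_stream(text: str):
    """Split a raw SSE stream into messages of {field: value} dicts.

    Minimal sketch: assumes "\n\n" message separators and single-line fields;
    a full client must also handle comments, CRLF framing, and multi-line data.
    """
    messages = []
    for block in text.split("\n\n"):
        fields = {}
        for line in block.strip().splitlines():
            if ": " in line:
                key, value = line.split(": ", 1)
                fields[key] = value
        if fields:
            messages.append(fields)
    return messages
```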
So, I think we ought to change StreamingResponse so that it listens in the background for disconnects, and breaks out of the iteration when one occurs.
Something along these lines...
```python
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    disconnected = False

    async def listen_for_disconnect():
        while True:
            message = await receive()
            if message['type'] == 'http.disconnect':
                disconnected = True
                break

    task = asyncio.create_task(listen_for_disconnect())
    try:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        async for chunk in self.body_iterator:
            if not isinstance(chunk, bytes):
                chunk = chunk.encode(self.charset)
            await send({"type": "http.response.body", "body": chunk, "more_body": True})
            if disconnected:
                break
        if not disconnected:
            await send({"type": "http.response.body", "body": b"", "more_body": False})
    finally:
        if task.done():
            task.result()
        else:
            task.cancel()
    if self.background is not None:
        await self.background()
```
@tomchristie I think the above implementation would still block until a new value is yielded from the `body_iterator`, and only after that could it check the value of the `disconnected` flag.
Also the nested `listen_for_disconnect` doesn't have access to the `disconnected` flag defined in the scope of the `__call__` method.
I think we should somehow cancel the `async for` when `listen_for_disconnect` receives the `'http.disconnect'` event. I'll try to look into the possibilities for that.
While it should solve the problem with disconnected clients, I'm still not sure it will solve the hang issue of the server shutdown process. What do you think?
> I think the above implementation would still block until a new value is yielded from the `body_iterator`, and only after that could it check the value of the `disconnected` flag.

Sure - tweakable by pushing the streaming into a `stream_response` task, and cancelling it if required. Probably something similar to this...

```python
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    async def stream_response():
        nonlocal self, send
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )
        async for chunk in self.body_iterator:
            if not isinstance(chunk, bytes):
                chunk = chunk.encode(self.charset)
            await send({"type": "http.response.body", "body": chunk, "more_body": True})
        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def listen_for_disconnect(task):
        nonlocal self, receive
        while True:
            message = await receive()
            if message['type'] == 'http.disconnect':
                if not task.done():
                    task.cancel()
                break

    stream_task = asyncio.create_task(stream_response())
    disconnect_task = asyncio.create_task(listen_for_disconnect(stream_task))
    await stream_task
    disconnect_task.result() if disconnect_task.done() else disconnect_task.cancel()
    stream_task.result()
    if self.background is not None:
        await self.background()
```

> While it should solve the problem with disconnected clients, I'm still not sure it will solve the hang issue of the server shutdown process. What do you think?

I've not looked into it - depends on whether uvicorn (or daphne/hypercorn) sends disconnect events during shutdown, right?
Wow I like this one! I reworked it a bit though:
```python
async def stream_response(self, send):
    await send(
        {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        }
    )
    async for chunk in self.body_iterator:
        if not isinstance(chunk, bytes):
            chunk = chunk.encode(self.charset)
        await send({"type": "http.response.body", "body": chunk, "more_body": True})
    await send({"type": "http.response.body", "body": b"", "more_body": False})

async def listen_for_disconnect(self, receive):
    while True:
        message = await receive()
        if message["type"] == "http.disconnect":
            break

async def __call__(self, scope, receive, send):
    done, pending = await asyncio.wait(
        [self.stream_response(send), self.listen_for_disconnect(receive)],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()
    if self.background is not None:
        await self.background()
```
Tested and works. Thoughts?
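(A side note for readers on newer Python versions: `asyncio.wait` no longer accepts bare coroutines, so the two coroutines need to be wrapped in tasks first. A stdlib-only sketch of the same pattern, with a placeholder body in place of the real chunk iteration:)

```python
import asyncio


async def stream_response(send):
    # placeholder for the real body-streaming logic
    for chunk in (b"one", b"two"):
        await send({"type": "http.response.body", "body": chunk, "more_body": True})
    await send({"type": "http.response.body", "body": b"", "more_body": False})


async def listen_for_disconnect(receive):
    while True:
        message = await receive()
        if message["type"] == "http.disconnect":
            break


async def run(receive, send):
    # wrap the coroutines in tasks explicitly; recent Python versions
    # reject bare coroutines passed to asyncio.wait
    stream_task = asyncio.create_task(stream_response(send))
    disconnect_task = asyncio.create_task(listen_for_disconnect(receive))
    done, pending = await asyncio.wait(
        {stream_task, disconnect_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
```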
That's a nice implementation, yup.
My one other concern here would be cases where we might inadvertently end up with multiple readers listening for the disconnect event. For example, the HTTP base middleware returns a `StreamingResponse`... https://github.com/encode/starlette/blob/master/starlette/middleware/base.py#L58 ...which I assume would break our proposals here, since we'd end up with more than one task listening on `receive`, with only one of them ending up with the message.
@tomchristie I will try adding a middleware like that to my setup and check what happens.
Also I found another caveat specific to event source subscriptions. With the above implementation there is no way to check when a response was cancelled.
What do you think about implementing a callback, e.g. `on_disconnect`, for the `StreamingResponse`, so one could pass clean-up logic there?
I mean something like this:

```python
return StreamingResponse(content=event_publisher(), on_disconnect=lambda: app.subscriptions.remove(queue))
```

And then the listener logic could call it when it receives a disconnect:

```python
if message["type"] == "http.disconnect":
    self.on_disconnect()
    break
```
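(A framework-free sketch of how that hypothetical `on_disconnect` hook could slot into the disconnect listener. `DisconnectNotifier` is a toy stand-in for illustration, not a Starlette API:)

```python
import asyncio


class DisconnectNotifier:
    """Toy stand-in for the proposed on_disconnect hook on StreamingResponse."""

    def __init__(self, on_disconnect=None):
        self.on_disconnect = on_disconnect

    async def listen_for_disconnect(self, receive):
        # invoke the user-supplied clean-up callback exactly once, when the
        # client goes away
        while True:
            message = await receive()
            if message["type"] == "http.disconnect":
                if self.on_disconnect is not None:
                    self.on_disconnect()
                break
```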
> Also I found another caveat specific to event source subscriptions. With the above implementation there is no way to check when a response was cancelled.

You'll get a cancelled exception raised within the streaming code. The sensible thing to do here would be to use `with` context blocks or `try ... finally:` statements, which will end up executing the clean-up code.
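(Applied to the subscription example from earlier comments, that advice could look like the following sketch; `subscriptions` stands in for the `app.subscriptions` set used above:)

```python
import asyncio

subscriptions = set()  # stands in for app.subscriptions from the earlier examples


async def event_publisher():
    queue = asyncio.Queue()
    subscriptions.add(queue)
    try:
        while True:
            event = await queue.get()
            yield event
    finally:
        # executed when the streaming task is cancelled (client disconnected),
        # so the queue never leaks
        subscriptions.discard(queue)
```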
@tomchristie however that requires adding `__aenter__` and `__aexit__` to `StreamingResponse`, right?
And then you could do something like this:

```python
async def hello(request):
    async with StreamingResponse() as resp:
        while True:
            await resp.send(data)
            await asyncio.sleep(1)
    return resp
```

So in this case you could try-catch the exception when the disconnect cancels, right?
Or maybe I'm overthinking something? Because my problem here is that I need to do the clean-up logic from the view function and not inside the response object.
> however that requires adding `__aenter__` and `__aexit__` to `StreamingResponse`, right?

The async iterator that gets passed to the response instance will have the exception raised there.
Hmm, are you sure about that?
I don't get any exceptions in my iterator when I run my previously posted code.
Which python version are you running?
3.7.x and 3.8.x
> That's a nice implementation, yup.
> My one other concern here would be cases where we might inadvertently end up with multiple readers listening for the disconnect event. For example, the HTTP base middleware returns a `StreamingResponse`... https://github.com/encode/starlette/blob/master/starlette/middleware/base.py#L58 ...which I assume would break our proposals here, since we'd end up with more than one task listening on `receive`, with only one of them ending up with the message.

I tested the proposal using the CustomHeaderMiddleware and it works just fine.
It seems like both the middleware's streaming response and the event source's streaming response got the message from `receive`.
So at the moment I think I'd like to see any implementation here tackled as a third-party package, as per the comment on #757.
I'm trying to keep Starlette's scope down to a minimum, and having an `SSEResponse` maintained independently would be really helpful here.
So basically there was an already worked-out PR on this the whole time?
Btw I think that so far we've discussed an alternative implementation for the `StreamingResponse`, so most of the brainstorming we had here should go directly into `starlette`, don't you think?
Potentially. Let's wait and see what any pull request here looks like, then we'd be in a better position to take a call on it.
It says "Caveat: SSE streaming does not work in combination with GZipMiddleware." - is that because of #919?
I guess the summary here is:
- Starlette will not implement SSE internally.
- `sse-starlette` is the official recommendation for using SSE with Starlette.
As per @tomchristie's reply on #51 (comment) 3 months ago, I'm closing this.