sseclient's Introduction

Python SSE Client

This is a Python client library for iterating over HTTP Server-Sent Events (SSE) streams (also known as EventSource, after the name of the JavaScript interface inside browsers). The SSEClient class accepts a URL on init and is then an iterator over the messages coming from the server.

Maintenance

I (btubbs) haven't been using this library in my own work for years, so I put limited time into maintaining it. I will check in on pull requests and issues once per month. If you are interested in providing more active support for the library, please reach out.

Installation

Use pip:

pip install sseclient

Usage

from sseclient import SSEClient

messages = SSEClient('http://mysite.com/sse_stream/')
for msg in messages:
    do_something_useful(msg)

Each message object will have a 'data' attribute, as well as optional 'event', 'id', and 'retry' attributes.
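To make the four attributes concrete, here is a minimal sketch of how one raw SSE event block could map onto them. The `parse_event` function is hypothetical and written only for illustration; it is not the library's own parser, and it assumes the block has already been separated from the stream.

```python
def parse_event(block):
    """Parse one raw SSE event block into a dict of fields (illustrative only)."""
    fields = {"data": [], "event": "message", "id": None, "retry": None}
    for line in block.splitlines():
        if not line or line.startswith(":"):
            continue  # blank lines and comment lines carry no field
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one optional space may follow the colon
        if name == "data":
            fields["data"].append(value)  # several data lines are joined below
        elif name == "retry":
            fields["retry"] = int(value) if value.isdigit() else None
        elif name in ("event", "id"):
            fields[name] = value
    fields["data"] = "\n".join(fields["data"])
    return fields


raw = "event: greeting\nid: 7\nretry: 3000\ndata: hello\ndata: world"
print(parse_event(raw))
```

An event without an explicit `event` line keeps the default type `message` in this sketch.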

Optional init parameters:

  • last_id: If provided, this parameter will be sent to the server to tell it to return only messages more recent than this ID.
  • retry: Number of milliseconds to wait after disconnects before attempting to reconnect. The server may change this by including a 'retry' line in a message. Retries are handled automatically by the SSEClient object.

You may also provide any additional keyword arguments supported by the Requests library, such as a 'headers' dict and a (username, password) tuple for 'auth'.
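As a rough sketch of how these options might be combined, the helper below collects the keyword arguments described above. `stream_kwargs` and `open_stream` are hypothetical names introduced here, and the bearer-token header is only an example of a 'headers' dict.

```python
def stream_kwargs(last_id=None, retry=3000, token=None, auth=None):
    """Collect SSEClient keyword arguments; token is a hypothetical bearer token."""
    kwargs = {"retry": retry}  # milliseconds to wait before reconnecting
    if last_id is not None:
        kwargs["last_id"] = last_id  # ask the server to resume after this ID
    if token is not None:
        kwargs["headers"] = {"Authorization": "Bearer " + token}
    if auth is not None:
        kwargs["auth"] = auth  # (username, password) tuple, handled by Requests
    return kwargs


def open_stream(url, **kwargs):
    # Imported lazily so the helper above can be tried without the package installed.
    from sseclient import SSEClient
    return SSEClient(url, **kwargs)


kwargs = stream_kwargs(last_id="42", retry=5000, auth=("user", "secret"))
print(kwargs)
# messages = open_stream("http://mysite.com/sse_stream/", **kwargs)
```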

Development

Install the library in editable mode:

pip install -e .

Install the test dependencies:

pip install pytest backports.unittest_mock

Run the tests with py.test:

(sseclient)vagrant sseclient $ py.test
===================== test session starts ======================
platform linux2 -- Python 2.7.6 -- py-1.4.30 -- pytest-2.7.2
rootdir: /vagrant/code/sseclient, inifile: 
plugins: backports.unittest-mock
collected 11 items 

test_sseclient.py ...........

================== 11 passed in 0.19 seconds ===================

There are a couple of TODO items in the code for getting the implementation completely in line with the finer points of the SSE spec.

sseclient's People

Contributors

bachp, btubbs, count-count, cyprienc, davidmstraub, dotlambda, dunglas, felixb, jaraco, mutantmonkey, nuclon, ondrejslamecka, ottomata, radarhere, scop, shoelace, stj, thesanddoctor, toabi, xqt

sseclient's Issues

TypeError on Python 3

When deploying Velociraptor's web proc on Python 3, we see this error in the logs:

Traceback (most recent call last):
  File "/app/.heroku/venv/lib/python3.6/site-packages/gunicorn/workers/base_async.py", line 56, in handle
    self.handle_request(listener_name, req, client, addr)
  File "/app/.heroku/venv/lib/python3.6/site-packages/gunicorn/workers/ggevent.py", line 160, in handle_request
    addr)
  File "/app/.heroku/venv/lib/python3.6/site-packages/gunicorn/workers/base_async.py", line 114, in handle_request
    for item in respiter:
  File "/app/.heroku/venv/lib/python3.6/site-packages/newrelic/api/web_transaction.py", line 646, in __iter__
    for item in self.generator:
  File "/app/.heroku/venv/lib/python3.6/site-packages/newrelic/api/web_transaction.py", line 1053, in __iter__
    for data in self.iterable:
  File "/app/.heroku/venv/lib/python3.6/site-packages/vr/server/events.py", line 51, in __iter__
    yield self.next()
  File "/app/.heroku/venv/lib/python3.6/site-packages/vr/server/events.py", line 61, in next
    return ev.dump()
  File "/app/.heroku/venv/lib/python3.6/site-packages/sseclient.py", line 116, in dump
    lines.extend('data: %s' % d for d in self.data.split('\n'))
TypeError: a bytes-like object is required, not 'str'
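The error message suggests that `self.data` is a bytes object at this point, since on Python 3 `bytes.split` rejects a str separator. A minimal sketch of one possible workaround, assuming the data is UTF-8 encoded, is to decode before splitting. `dump_data_lines` is a hypothetical helper, not the library's `dump` method.

```python
def dump_data_lines(data, encoding="utf-8"):
    """Return 'data: ...' lines for text or bytes input (encoding is an assumption)."""
    if isinstance(data, bytes):
        data = data.decode(encoding)  # normalise to str before splitting
    return ["data: %s" % d for d in data.split("\n")]


try:
    b"a\nb".split("\n")  # reproduces the failure from the traceback
except TypeError as exc:
    print("bytes.split with a str separator fails:", exc)

print(dump_data_lines(b"a\nb"))
```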

sseclient can't detect end of next event?

I am connecting to a server and not receiving any server-sent events.

After looking into the code, this seems to be because, in the next() function in sseclient.py, the loop beginning with while not self._event_complete() never finishes. As a result, no event is ever completed and processed; incoming data is only appended to the internal buffer.

Any hints why this might be happening? Thanks.

Do retry on urllib3.ProtocolErrors

When reading responses at self.resp.raw.read, a protocol error might occur, for example, when the connection is somehow disturbed. On such an error, we want to retry and connect to the server again, as with the other errors. Presently this is not the case, since the protocol error isn't caught.

Stacktrace

  File "/usr/local/lib/python2.7/dist-packages/sseclient.py", line 92, in next
    next_chunk = next(self.resp_iterator)
  File "/usr/local/lib/python2.7/dist-packages/sseclient.py", line 76, in generate
    chunk = self.resp.raw.read(self.chunk_size)
  File "/usr/lib/python2.7/dist-packages/urllib3/response.py", line 459, in read
    raise IncompleteRead(self._fp_bytes_read, self.length_remaining)
  File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/lib/python2.7/dist-packages/urllib3/response.py", line 378, in _error_catcher
    raise ProtocolError('Connection broken: %r' % e, e)
ProtocolError: ('Connection broken: IncompleteRead(831 bytes read)',
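The requested behaviour can be sketched as a generic reconnect loop around any function that opens the stream. This is an illustration under assumptions rather than the library's code: `iter_with_reconnect`, `connect`, and `retry_on` are hypothetical names, and the built-in `ConnectionError` stands in for the urllib3 exception in the demo.

```python
import time


def iter_with_reconnect(connect, retry_on, retry_seconds=3.0, max_attempts=5,
                        sleep=time.sleep):
    """Yield items from connect(), reconnecting when a listed exception occurs."""
    failures = 0
    while True:
        try:
            for item in connect():
                failures = 0  # a delivered item resets the failure counter
                yield item
            return  # the stream ended cleanly
        except retry_on:
            failures += 1
            if failures >= max_attempts:
                raise  # give up after too many consecutive failures
            sleep(retry_seconds)


attempts = []


def flaky_connect():
    """Stand-in stream: breaks once after the first item, then works."""
    attempts.append(1)
    if len(attempts) == 1:
        yield "first"
        raise ConnectionError("Connection broken")
    yield "second"


print(list(iter_with_reconnect(flaky_connect, (ConnectionError,),
                               sleep=lambda seconds: None)))
```

With a real stream, `retry_on` could include `urllib3.exceptions.ProtocolError`, and the `connect` callable would need to resend the last seen event ID to avoid replaying messages.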

Messages truncated at 2032 characters

I'm using the sseclient library on Python 3.7.11, and the SSE messages are truncated at 2032 characters. This is not the case when accessing the data by other means. Is there an argument to extend the character length?

I haven't found much in the source code other than chunk_size but that does not seem to be the issue. Could it be something about line endings?

How to pass timeout seconds.

There are cases when a client cannot know whether the connection is still healthy. I encountered such a case when the server closed the connection but the client kept listening. In such cases I would want to close the connection (after a timeout) and reopen it.
Basically, I should be able to do something like:
self.resp = requester.get(self.url, timeout=120, stream=True, **self.requests_kwargs)
But the library doesn't give me a way to pass a timeout value.
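For illustration, the call from the post can be wrapped so that a timeout is always supplied. Requests accepts `timeout` either as a single number or as a `(connect, read)` tuple, where the read timeout limits how long the client waits between bytes from the server. `open_stream` and its `get` parameter are hypothetical; `get` exists only so the sketch can be tried with a stub instead of a live server.

```python
def open_stream(url, get=None, connect_timeout=5.0, read_timeout=120.0,
                **requests_kwargs):
    """Open a streaming GET with a default (connect, read) timeout."""
    if get is None:
        import requests  # lazy import; pass get= to use a stub instead
        get = requests.get
    # Only fill in the timeout if the caller did not choose one.
    requests_kwargs.setdefault("timeout", (connect_timeout, read_timeout))
    return get(url, stream=True, **requests_kwargs)


def fake_get(url, **kwargs):
    """Stub that returns the keyword arguments it received."""
    return kwargs


print(open_stream("http://mysite.com/sse_stream/", get=fake_get))
```

When the read timeout expires on a silent connection, Requests raises an exception, which gives the caller a chance to close and reopen the stream.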

Changing the encoding?

The requests library provides a way to change the encoding used to read a response:

r = requests.get('http://my.url/')
r.encoding = 'utf-8' # this forces the encoding used to decode the response

However as a user of sseclient I cannot use this directly to fix the encoding used by SSEClient to read a stream.

Sometimes reading a stream can fail because of a wrong encoding:

  File "/data/project/editgroups/www/python/src/store/stream.py", line 13, in stream
    for event in EventSource(url):
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/sseclient.py", line 98, in __next__
    self._connect()
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/sseclient.py", line 56, in _connect
    self.resp = requester.get(self.url, stream=True, **self.requests_kwargs)
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/requests/api.py", line 75, in get
    return request('get', url, params=params, **kwargs)
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/requests/api.py", line 60, in request
    return session.request(method=method, url=url, **kwargs)
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/data/project/editgroups/listener_venv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 354, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/lib/python3.5/http/client.py", line 1107, in request
    self._send_request(method, url, body, headers)
  File "/usr/lib/python3.5/http/client.py", line 1147, in _send_request
    self.putheader(hdr, value)
  File "/usr/lib/python3.5/http/client.py", line 1079, in putheader
    values[i] = one_value.encode('latin-1')
UnicodeEncodeError: 'latin-1' codec can't encode characters in position 0-1: ordinal not in range(256)

So it'd be nice if we could have an option in the SSEClient constructor to set the encoding, which would be passed down to the requests library. Would you be open to that?
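A minimal sketch of what a configurable encoding could look like on the reading side, assuming the stream arrives as byte chunks: an incremental decoder from the standard `codecs` module handles multi-byte characters that are split across chunk boundaries. `decode_chunks` is a hypothetical helper, not an existing SSEClient option.

```python
import codecs


def decode_chunks(chunks, encoding="utf-8"):
    """Decode byte chunks to text, tolerating characters split across chunks."""
    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    for chunk in chunks:
        text = decoder.decode(chunk)
        if text:
            yield text
    tail = decoder.decode(b"", final=True)  # flush any incomplete trailing bytes
    if tail:
        yield tail


payload = "data: Schulteißenamt\n\n".encode("utf-8")
chunks = [payload[:15], payload[15:]]  # the split falls inside the two-byte 'ß'
print("".join(decode_chunks(chunks)))
```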

Linefeed related regression in 0.0.17

0.0.17 introduces a regression that is related to extra linefeeds between events. Here's a reproducer "stream": https://gist.githubusercontent.com/scop/c9b8face7b0c0ad0808b155a9611e8e4/raw/78276bc59c6c2c2f8845d8f56a65b15259636c52/test.txt

With 0.0.15 all is ok (let it loop for a few seconds):

$ python -c $'import sseclient\nfor msg in sseclient.SSEClient("https://gist.githubusercontent.com/scop/c9b8face7b0c0ad0808b155a9611e8e4/raw/78276bc59c6c2c2f8845d8f56a65b15259636c52/test.txt"): print("event:'{msg.event}' data:{msg.data}".format(msg=msg))'
event:hello data:hello1
event:hello data:hello2
event:hello data:hello3
event:hello data:hello1
event:hello data:hello2
event:hello data:hello3

...but with 0.0.17, things go south:

$ python -c $'import sseclient\nfor msg in sseclient.SSEClient("https://gist.githubusercontent.com/scop/c9b8face7b0c0ad0808b155a9611e8e4/raw/78276bc59c6c2c2f8845d8f56a65b15259636c52/test.txt"): print("event:'{msg.event}' data:{msg.data}".format(msg=msg))'
event:hello data:hello1
event:hello data:hello2event: hello
hello1
event:hello data:hello2event: hello
hello1

@btubbs, @ottomata: thoughts?
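One way to tolerate extra linefeeds between events is to treat any run of two or more line endings as a single event boundary. The sketch below assumes the reproducer contains an additional blank line between events (the gist content is not reproduced here), and `split_events` is a hypothetical helper that works on text that has already been received in full.

```python
import re

# Two or more consecutive line endings count as one event boundary.
EVENT_BREAK = re.compile(r"(?:\r\n|\r|\n){2,}")


def split_events(text):
    """Split received text into raw event blocks, ignoring empty blocks."""
    return [block for block in EVENT_BREAK.split(text) if block.strip()]


stream = "event: hello\ndata: hello1\n\n\nevent: hello\ndata: hello2\n\n"
print(split_events(stream))
```

A streaming version would also need to keep the unfinished tail of the buffer between reads.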

Idea: Is it possible to wait for a given/ hardcoded custom line ending "end_of_field" vs chunk_size

Hello, a newbie question.

Idea: would it be possible to wait for a given (or hardcoded) custom line ending, "end_of_field", instead of relying on chunk_size?
For example, this one:
end_of_field = re.compile(r'\n\n\r\n\x27|\r\n\x27') # \n\n\r\n' or \r\n'

My Signify Hue Bridge sends event message data of variable length, which ends with \n\n\r\n'.
For long event message data, the Hue Bridge splits it into chunks of around 4096 bytes, each followed by \r\n'.
The end of any event message data is always indicated by \n\n\r\n'.

Maybe chunk_size could be set to None to skip it completely, or be treated as a maximum safeguard value such as 10000.
For now I set chunk_size to 9 \n\n\r\n'
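The idea can be sketched as a generator that buffers incoming text and emits an event whenever a terminator pattern matches, regardless of how the chunks were cut. `iter_events` is a hypothetical function; the default pattern covers the usual blank-line terminators, and the custom pattern in the demo is an assumption chosen to resemble the bridge output described above.

```python
import re

# Default: an event ends at a blank line, in any of the usual line-ending styles.
SSE_END = re.compile(r"\r\n\r\n|\r\r|\n\n")


def iter_events(chunks, end_of_event=SSE_END):
    """Yield complete events from text chunks of arbitrary size."""
    buf = ""
    for chunk in chunks:
        buf += chunk
        while True:
            match = end_of_event.search(buf)
            if match is None:
                break  # no complete event yet; wait for more data
            yield buf[:match.start()]
            buf = buf[match.end():]


print(list(iter_events(["data: par", "tial\n\ndata: sec", "ond\n\n"])))

custom_end = re.compile(r"\n\n\r\n")  # hypothetical device-specific terminator
print(list(iter_events(["data: a\n", "\n\r\ndata: b\n\n\r\n"], custom_end)))
```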

decoder setting

Hi,
I use sseclient to send a request to https://api.trafikinfo.trafikverket.se/v2/data.json, and got the following error:

decoder = lookup(encoding).incrementaldecoder

TypeError: lookup() argument must be str, not None

But when I look at the test scripts or the examples from programcreek.com, I don't see any example where an encoder or decoder is set in the request.

Can someone help? Thanks a lot.
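The quoted line comes from the standard library's `codecs.getincrementaldecoder`, which raises this error when it is given `None` instead of an encoding name. A plausible cause, stated here as an assumption, is that the response declares no charset. A minimal sketch of a fallback follows; `make_decoder` is a hypothetical helper and UTF-8 is an assumed default.

```python
import codecs


def make_decoder(encoding, default="utf-8"):
    """Build an incremental decoder, falling back when no encoding is declared."""
    return codecs.getincrementaldecoder(encoding or default)(errors="replace")


try:
    codecs.getincrementaldecoder(None)  # reproduces the reported TypeError
except TypeError as exc:
    print("without a fallback:", exc)

print(make_decoder(None).decode(b"data: ok"))
```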

vers. 0.0.25

Please bump the master branch to vers. 0.0.25 and upload it to pypi.

How to close the sseclient?

Hi,
That's more a question than an issue: how do we close the sseclient? I mean how do we close the connection?
Sorry, it might be a python newbie question...
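One possible approach, sketched under the assumption that the client keeps its underlying Requests response in a `resp` attribute (as the `self.resp = requester.get(...)` line quoted in other issues suggests), is to stop iterating and close that response. `read_some` and the fake classes are hypothetical and only stand in for a live client.

```python
def read_some(client, limit):
    """Read up to `limit` messages, then close the underlying response."""
    messages = []
    try:
        for msg in client:
            messages.append(msg)
            if len(messages) >= limit:
                break
    finally:
        resp = getattr(client, "resp", None)  # assumed attribute name
        if resp is not None:
            resp.close()  # releases the HTTP connection
    return messages


class FakeResponse:
    closed = False

    def close(self):
        self.closed = True


class FakeClient:
    """Stand-in for a client object; yields three canned messages."""

    def __init__(self):
        self.resp = FakeResponse()

    def __iter__(self):
        return iter(["one", "two", "three"])


fake = FakeClient()
print(read_some(fake, 2), fake.resp.closed)
```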

issue updating 0.0.12 to 0.0.14

Had my project running with 0.0.12, and as I installed again it got bumped up to 0.0.14 and I started getting this error:

File "xxxx/lib/python2.7/site-packages/sseclient.py", line 64, in __next__
    nextline = self.resp_file.readline()
AttributeError: 'SSEClient' object has no attribute 'resp_file'

It seems to be an issue with the request not having raw or something, as the code has the line
self.resp_file = self.resp.raw

The requirement is for requests>2.0.0 and I'm using 2.11.1.

Should the requests requirement be higher?

For now, I downgraded to 0.0.12, but looking for a solution to this issue.
Thanks

Suggestion for simplification and ease of use

Overview
The current algorithm used by this client is to read a certain number of bytes from the response stream until an end of event is detected. At that point the event is parsed and emitted and the remainder of the bytes is added to a buffer. The number of bytes to read defaults to 1024 and is customizable by the user.

At one point in this client's history, a problem was noted when the number of bytes is greater than the size of an event. For example, imagine there is a single event in the stream of 1023 bytes. The read will then block until another byte is emitted from the server, which can cause significant delays in the first event being emitted by the client.

In order to remedy this, a change was made to reach for a private variable of the requests library (response.raw._fp), in order to do short reads (read1), instead of reading from the main raw stream. This remedies the above because, now, read1 will attempt to read up to 1024 bytes in a single call and will not block if it cannot read that much, allowing events shorter than this size to be emitted as they are received.
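The difference between a full read and a short read can be demonstrated with a local pipe standing in for the network socket. This is only a sketch of the buffering behaviour using the standard library's `BufferedReader.read1`; `short_read_demo` is a hypothetical helper and does not touch Requests at all.

```python
import os


def short_read_demo(payload, chunk_size=1024):
    """Write payload to a pipe and fetch it with a single short read."""
    read_fd, write_fd = os.pipe()
    try:
        os.write(write_fd, payload)  # the "server" sends one small event, then goes quiet
        with os.fdopen(read_fd, "rb") as reader:
            # read1 returns whatever is available, up to chunk_size bytes.
            # reader.read(chunk_size) would wait here for 1024 bytes or end of stream.
            return reader.read1(chunk_size)
    finally:
        os.close(write_fd)


print(short_read_demo(b"data: hi\n\n"))
```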

Problems
This all works mostly but there are a number of problems with this approach:

  1. This library now depends on an internal piece of the requests library which makes this library more brittle
  2. response.raw._fp has not been processed by the requests library's chunked transfer encoding handling. This means it cannot be used if this encoding is being used and the client must fall back to using blocking reads
  3. As a user, it is hard to know what chunk size I should specify, especially in the case of blocking reads and events that differ significantly in size.

Proposal
I believe all 3 problems can be fixed by instead reading the stream line by line, instead of with a given number of bytes.

  1. response.raw supports readline so there is no longer any need to rely on response.raw._fp
  2. response.raw has the chunked transfer encoding removed so there is no need for any specific handling when a server emits this.
  3. There is no longer any need to specify a chunk size as the client can just block until an entire line is read.

Since Server Sent Events is a line oriented format, we can be sure it is always safe to block until an entire line is emitted. Consider the case where a server only emits some bytes but without any new lines. In that case, we know that the content cannot contain the end of an event and thus blocking until more bytes are read cannot result in any delayed event deliveries.

Alternatively, for backwards compatibility, a chunk size could continue to be accepted and passed into readline, causing it to return even earlier if lines exceed that length. However, I don't think this would have any practical benefit, and the option should be discouraged going forward.

If you are happy with this suggestion, please let me know and I will submit a PR.
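A minimal sketch of the line-by-line approach, with `io.BytesIO` standing in for `response.raw` and UTF-8 assumed as the encoding. `iter_events_by_line` is a hypothetical name, and the sketch only recognises line endings that contain `\n`.

```python
import io


def iter_events_by_line(fp, encoding="utf-8"):
    """Yield raw event blocks by reading one line at a time from a binary file object."""
    lines = []
    while True:
        raw = fp.readline()  # blocks until a full line (or end of stream) is available
        if not raw:
            break  # end of stream; an unterminated trailing event is dropped
        line = raw.decode(encoding).rstrip("\r\n")
        if line:
            lines.append(line)
        elif lines:
            yield "\n".join(lines)  # a blank line terminates the current event
            lines = []


stream = io.BytesIO(b"event: hello\ndata: hello1\n\ndata: hello2\n\n")
print(list(iter_events_by_line(stream)))
```

No chunk size appears anywhere, which is the simplification the proposal is aiming for.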

Asynchronous events

Hello,
I have seen from your code that the events from the server are parsed in a for loop.
I guess this means that the program will block on that loop.
Is there any possibility to use it in an asynchronous way?
That way I could consume the events as they arrive.

Why is self.resp not set in __init__, yet def iter_content(self) can use it?

self.resp is not set in the __init__ of class SSEClient,
but def iter_content(self) can use it.
This problem has puzzled me for a long time. How is this achieved?
Here is a quick example:

class A(object):
    def __init__(self, name):
        self.name = name
        self._add()

    def _add(self):
        self.b = 1
        self.resp = "haha"
        while self.b < 10:
            yield self.b
            self.b += 1

    def p(self):
        chunk = self.resp
        print(chunk)

a = A("a")
a.p()

AttributeError                            Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 a.p()

Input In [1], in A.p(self)
     13 def p(self):
---> 14     chunk = self.resp
     15     print(chunk)

AttributeError: 'A' object has no attribute 'resp'
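A likely explanation for the example is that `_add` contains `yield`, which makes it a generator function: calling it only creates a generator object, and none of its body (including the assignment to `self.resp`) runs until the generator is advanced. The sketch below keeps the generator and advances it once to show the difference. Whether SSEClient sets `self.resp` in a plain method called from `__init__` is an assumption, not something shown in the post.

```python
class A:
    def __init__(self, name):
        self.name = name
        self.gen = self._add()  # creates a generator object; the body has not run yet

    def _add(self):
        self.b = 1
        self.resp = "haha"
        while self.b < 10:
            yield self.b
            self.b += 1


a = A("a")
print(hasattr(a, "resp"))  # the attribute does not exist before the first next()
first = next(a.gen)        # runs the body up to the first yield
print(first, a.resp)
```

If the attribute were assigned in an ordinary method without `yield`, it would exist as soon as `__init__` returned.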

Disconnected server only detected if graceful disconnected or server restarted

I sometimes have servers that die without any notice. Sometimes they are restarted afterwards, sometimes not.
These disconnected servers are not detected. The session still exists and the SSEClient stays connected even though no more data is received.

I would like to be notified and have the possibility to react...

sseclient 0.0.24 loses events due to Unterminated string... ValueError

sseclient 0.0.24 fails randomly when json.load is applied to the EventSource's data. Randomly means that you are pretty sure to hit the failure if you process, say, 50 events from the stream. The problem occurs with Python 3.7; I have not tested other Python releases. This problem does not occur with sseclient 0.0.22.

I have implemented a test suite to check this behavior, see https://gerrit.wikimedia.org/r/#/c/pywikibot/core/+/509132/2/tests/eventstreams_tests.py

Additional details can be found on our bug tracker: https://phabricator.wikimedia.org/T222885

0.0.18 lags

I updated my package from 0.0.12 to 0.0.18 and now have lags of up to 1 minute.

I use it with Firebase and have no lags with 0.0.12. Not sure why.

Default chunk size of 1024 is inappropriate (regression in 0.0.16)

With the changes made in 6820dc8, event handling has severely regressed. Instead of messages being processed as they come in, they are buffered until the chunk size is hit and then processed. For my use case, I need messages to be processed immediately, and I believe this would be the expected behavior for most other use cases of this library.

I've found that setting the chunk size to 1 fixes this issue, and I believe that to be the only acceptable chunk size for this library; buffering messages until the chunk size is reached is simply not appropriate in a protocol where one or two characters are the difference between a message being processed or buffered. For example, assume that I have events that happen to be exactly 1024 bytes long, not including the message separator; until the next message arrives and fills up the buffer containing the \r\n, the first message will not be processed.

Another potential solution is to continue to use a large chunk size to avoid high CPU load on noisy endpoints, but with a short timeout on the read so that quieter SSE endpoints do not suffer from this issue.

I'm not sure if you want to back out the changes made in that commit or simply change the default chunk size, but please let me know if I can be of any assistance in getting this resolved.

0.0.24 doesn't process large events

I'm not sure exactly where the problem is here, but I just had to downgrade to 0.0.22 (as the last version I know was working, not sure if the issue is in .23 or .24). Events that were particularly large seemed to not be coming through when looping over an EventSource feed. I'll see if I get some time to dig into this a bit further and provide more information soon.

The first event retrieved by sseclient is always empty

`
>>> from sseclient import SSEClient
>>> client = SSEClient(url='https://stream.wikimedia.org/v2/stream/recentchange')
>>> data = next(client)
>>> data.__dict__
{'data': '', 'event': 'message', 'id': None, 'retry': None}
>>> data = next(client)
>>> data.__dict__
{'data': '{"$schema":"/mediawiki/recentchange/1.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/File:Geschichte_des_Schultei%C3%9Fenamts_und_der_Stadt_Neumarkt_014.jpg","request_id":"Xj6I2ApAED4AADHcxFEAAABL","id":"7573678b-87b2-4325-81f8-e338613e537b","dt":"2020-02-08T10:09:30Z","domain":"commons.wikimedia.org","stream":"mediawiki.recentchange","topic":"eqiad.mediawiki.recentchange","partition":0,"offset":2157854683},"id":1327149461,"type":"log","namespace":6,"title":"File:Geschichte des Schulteißenamts und der Stadt Neumarkt 014.jpg","comment":"User created page with UploadWizard","timestamp":1581156570,"user":"DALIBRI","bot":false,"log_id":290782528,"log_type":"upload","log_action":"upload","log_params":{"img_sha1":"5oe8esm1e76smmt6zbv811c0m7ay1jo","img_timestamp":"20200208100930"},"log_action_comment":"uploaded "[[File:Geschichte des Schulteißenamts und der Stadt Neumarkt 014.jpg]]": User created page with UploadWizard","server_url":"https://commons.wikimedia.org","server_name":"commons.wikimedia.org","server_script_path":"/w","wiki":"commonswiki","parsedcomment":"User created page with UploadWizard"}', 'event': 'message', 'id': '[{"topic":"eqiad.mediawiki.recentchange","partition":0,"timestamp":1581156570001},{"topic":"codfw.mediawiki.recentchange","partition":0,"offset":-1}]', 'retry': None}

`
