donovanhide / eventsource Goto Github PK
View Code? Open in Web Editor NEWServer Side Events client and server for Go
Home Page: http://godoc.org/github.com/donovanhide/eventsource
License: Other
Server Side Events client and server for Go
Home Page: http://godoc.org/github.com/donovanhide/eventsource
License: Other
If an SSE server breaks the connection, the eventsource client just streams out continuous errors.
(I actually once ended up with about 400GB of superfastmatch log files, using up all available disk on the server because of this ;- )
To replicate:
expected:
stream should output an error, then try again after a delay
observed:
continuous output of ErrUnexpectedEOF errors
foo.go:
package main
import (
"fmt"
"github.com/donovanhide/eventsource"
"os"
)
func main() {
if len(os.Args) < 2 {
fmt.Printf("no feed url specified\n")
os.Exit(1)
}
feedUrl := os.Args[1]
lastEventId := ""
if len(os.Args) > 2 {
lastEventId = os.Args[2]
}
stream, err := eventsource.Subscribe(feedUrl, lastEventId)
if err != nil {
panic(err)
}
for {
select {
case event := <-stream.Events:
fmt.Printf("event %s (%d bytes)\n", event.Id(), len(event.Data()))
case err := <-stream.Errors:
fmt.Printf("error: '%v'\n", err)
}
}
}
Currently there's no way to override the http.Client used for requests, i.e for override TLS settings for instance, among others
Hi, We use this library at work and i have made a bunch of somewhat hacky changes to our fork that i would like to eventually upstream. These have mainly been around ensuring it doesn't leak goroutines and supports contexts for canceling requests when complete or no longer required.
I am just wondering if you have any preference on how i submit PRs (or if i do)
It seems like redirects eventually lead to this code in Go's http package (client.go
):
func (c *Client) doFollowingRedirects(ireq *Request, shouldRedirect func(int) bool) (resp *Response, err error) {
// ...
if redirect != 0 {
nreq := new(Request)
nreq.Method = ireq.Method
if ireq.Method == "POST" || ireq.Method == "PUT" {
nreq.Method = "GET"
}
nreq.Header = make(Header)
nreq.URL, err = base.Parse(urlStr)
if err != nil {
break
}
if len(via) > 0 {
// Add the Referer header.
lastReq := via[len(via)-1]
if ref := refererForURL(lastReq.URL, nreq.URL); ref != "" {
nreq.Header.Set("Referer", ref)
}
err = redirectChecker(nreq, via)
if err != nil {
redirectFailed = true
break
}
}
reqmu.Lock()
req = nreq
reqmu.Unlock()
}
urlStr = req.URL.String()
if resp, err = c.send(req); err != nil {
if wasCanceled() {
err = &httpError{
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
break
}
Basically it means that the Accept: text/content-stream
header is lost. I don't know how to fix this but it means that redirects don't work at all. In my case I get an EOF error because the server closes the connection. This is probably a bug in Go, but can it be worked around?
Currently the duration keeps increasing, and after a while of server downtime the client will be unusable since it'll take ages for it to retry. Both the initial and maximum wait time should be configurable.
I need to be be able to verify a JWT token before the request gets added to the poll. Currently I don't think you can do this, unless I create my own handler. Is this right?
Hi,
We're using eventsource for streaming logs from a remote API to a local command-line client, which works great in general.
Unfortunately, the library currently logs to stdout at two places:
At least the first one is annoying since the message will be printed along the normal logs we're streaming.
What do you think about either removing the log message or making it optional? Both would be fine for us.
/cc @seiffert
I just replaced antage/eventsource
with donovanhide/eventsource
in my application to benefit from the replay feature. ๐
As of now, everything works just fine: events are pushed on demand and replayed as needed, however I noticed some intriguing 2016/01/26 16:08:09 http: multiple response.WriteHeader calls
messages in the logs.
I am using the Handler("channel")
method through a codegangsta/negroni middleware and I created a minimal app to reproduce the problem.
This could be related to urfave/negroni@1398c5e but I am not quite sure how I could fix / workaround this issue.
Have you tried to use eventsource
in some kind of middleware system?
package main
import (
"fmt"
"net/http"
"time"
"github.com/codegangsta/negroni"
"github.com/donovanhide/eventsource"
)
type TimeEvent time.Time
func (t TimeEvent) Id() string { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string { return time.Time(t).String() }
const (
TICK_COUNT = 1000
)
func TimePublisher(srv *eventsource.Server) {
start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
ticker := time.NewTicker(time.Second)
for i := 0; i < TICK_COUNT; i++ {
<-ticker.C
srv.Publish([]string{"time"}, TimeEvent(start))
start = start.Add(time.Second)
}
}
func main() {
router := http.NewServeMux()
SSE := eventsource.NewServer()
defer SSE.Close()
go TimePublisher(SSE)
router.HandleFunc("/stream", SSE.Handler("time"))
n := negroni.Classic()
n.UseHandler(router)
n.Run("127.0.0.1:3456")
}
Connect with:
curl 'http://127.0.0.1:3456/stream'
Observe logs:
go run test.go
[negroni] listening on 127.0.0.1:3456
[negroni] Started GET /stream
2016/01/26 16:16:46 http: multiple response.WriteHeader calls
Error line:
stream.Errors <- err; in receiveEvents() in stream.go.
In go multi-threading environment, it's possible one thread runs to
stream.Errors <- err;
Before running this line,
close(stream.Errors); is called in stream.Close()
I notice there isn't a way to Close()
a subscription at the moment (or am I missing something?). I'm happy to do the work and submit a pull request but I thought I'd ask here first if you had an approach in mind since Decode()
blocks and you may not want to wait till after the next event to close the connection and exit the stream()
loop.
Let's see the example below:
Server side:
package main
import (
"fmt"
"net"
"net/http"
"time"
"github.com/donovanhide/eventsource"
)
type TimeEvent time.Time
func (t TimeEvent) Id() string { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string { return time.Time(t).String() }
const (
TICK_COUNT = 5
)
func TimePublisher(srv *eventsource.Server) {
start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
ticker := time.NewTicker(time.Second)
for {
select {
case <- ticker.C:
}
srv.Publish([]string{"time"}, TimeEvent(start))
start = start.Add(time.Second)
}
}
func main() {
srv := eventsource.NewServer()
srv.Gzip = true
defer srv.Close()
l, err := net.Listen("tcp", "127.0.0.1:8099")
if err != nil {
return
}
defer l.Close()
http.HandleFunc("/time", srv.Handler("time"))
go http.Serve(l, nil)
go TimePublisher(srv)
fmt.Println("event source started.")
select {}
}
Client side:
package main
import (
"fmt"
"github.com/donovanhide/eventsource"
)
func main() {
stream, err := eventsource.Subscribe("http://127.0.0.1:8099/time", "")
if err != nil {
return
}
for ev := range stream.Events{
fmt.Println(ev.Id(), ev.Event(), ev.Data())
}
}
You'll find that after the connection has been extablished, and the client side has print logs like this:
1356998406000000000 Tick 2013-01-01 00:00:06 +0000 UTC
1356998407000000000 Tick 2013-01-01 00:00:07 +0000 UTC
1356998408000000000 Tick 2013-01-01 00:00:08 +0000 UTC
1356998409000000000 Tick 2013-01-01 00:00:09 +0000 UTC
1356998410000000000 Tick 2013-01-01 00:00:10 +0000 UTC
1356998411000000000 Tick 2013-01-01 00:00:11 +0000 UTC
But, if you kill the server side process now, you can find that client just hang there, and no errors occurs.
And there is no retry actions.
How to solve this, guys ?
Some server implementations of SSE/eventsource will send a 101:Switching Protocols followed by a 200:OK , followed by the stream of events.
This does not play well with this implementation because
if resp.StatusCode != 200 {
message, _ := ioutil.ReadAll(resp.Body)
err = SubscriptionError{
Code: resp.StatusCode,
Message: string(message),
}
}
will choke on the first 101 response.
I believe this is something clients should be able to handle because this does not seem to violate the spec.
Unfortunately I don't think there is an easy way to rework this using golang's http client. Other language implementations that I've seen (notably, ruby) seem to rely on parsing the http response themselves, thereby handling the possible 101 response that precedes the 200.
Unfortunately, CloseNotify() is deprecated:
"Deprecated: the CloseNotifier interface predates Go's context package. New code should use Request.Context instead." (https://golang.org/pkg/net/http/#CloseNotifier)
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.