Git Product home page Git Product logo

eventsource's People

Contributors

abh avatar bcampbell avatar byxorna avatar donovanhide avatar gambol99 avatar m3co-code avatar mlafeldt avatar spring1843 avatar suin avatar timmmm avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

eventsource's Issues

server disconnect causes stream to spew out continuous errors

If an SSE server breaks the connection, the eventsource client just streams out continuous errors.
(I actually once ended up with about 400GB of superfastmatch log files, using up all available disk on the server because of this ;- )

To replicate:

  1. start up some SSE server endpoint
  2. use the test program below to connect:
    $ go run foo.go
  3. kill the SSE server

expected:
stream should output an error, then try again after a delay

observed:
continuous output of ErrUnexpectedEOF errors

foo.go:

package main

import (
    "fmt"
    "github.com/donovanhide/eventsource"
    "os"
)

func main() {

    if len(os.Args) < 2 {
        fmt.Printf("no feed url specified\n")
        os.Exit(1)
    }

    feedUrl := os.Args[1]
    lastEventId := ""

    if len(os.Args) > 2 {
        lastEventId = os.Args[2]
    }

    stream, err := eventsource.Subscribe(feedUrl, lastEventId)
    if err != nil {
        panic(err)
    }

    for {
        select {
        case event := <-stream.Events:
            fmt.Printf("event %s (%d bytes)\n", event.Id(), len(event.Data()))
        case err := <-stream.Errors:
            fmt.Printf("error: '%v'\n", err)
        }
    }
}

Contributing preferences

Hi, We use this library at work and i have made a bunch of somewhat hacky changes to our fork that i would like to eventually upstream. These have mainly been around ensuring it doesn't leak goroutines and supports contexts for canceling requests when complete or no longer required.

I am just wondering if you have any preference on how i submit PRs (or if i do)

Doesn't work with redirects.

It seems like redirects eventually lead to this code in Go's http package (client.go):

func (c *Client) doFollowingRedirects(ireq *Request, shouldRedirect func(int) bool) (resp *Response, err error) {

    // ...
        if redirect != 0 {
            nreq := new(Request)
            nreq.Method = ireq.Method
            if ireq.Method == "POST" || ireq.Method == "PUT" {
                nreq.Method = "GET"
            }
            nreq.Header = make(Header)
            nreq.URL, err = base.Parse(urlStr)
            if err != nil {
                break
            }
            if len(via) > 0 {
                // Add the Referer header.
                lastReq := via[len(via)-1]
                if ref := refererForURL(lastReq.URL, nreq.URL); ref != "" {
                    nreq.Header.Set("Referer", ref)
                }

                err = redirectChecker(nreq, via)
                if err != nil {
                    redirectFailed = true
                    break
                }
            }
            reqmu.Lock()
            req = nreq
            reqmu.Unlock()
        }

        urlStr = req.URL.String()
        if resp, err = c.send(req); err != nil {
            if wasCanceled() {
                err = &httpError{
                    err:     err.Error() + " (Client.Timeout exceeded while awaiting headers)",
                    timeout: true,
                }
            }
            break
        }

Basically it means that the Accept: text/content-stream header is lost. I don't know how to fix this but it means that redirects don't work at all. In my case I get an EOF error because the server closes the connection. This is probably a bug in Go, but can it be worked around?

Make max backoff duration configurable

Currently the duration keeps increasing, and after a while of server downtime the client will be unusable since it'll take ages for it to retry. Both the initial and maximum wait time should be configurable.

Authenticate a request

I need to be be able to verify a JWT token before the request gets added to the poll. Currently I don't think you can do this, unless I create my own handler. Is this right?

Logging of messages to stdout

Hi,

We're using eventsource for streaming logs from a remote API to a local command-line client, which works great in general.

Unfortunately, the library currently logs to stdout at two places:

At least the first one is annoying since the message will be printed along the normal logs we're streaming.

What do you think about either removing the log message or making it optional? Both would be fine for us.

/cc @seiffert

`http: multiple response.WriteHeader calls` messages when used with `negroni`

I just replaced antage/eventsource with donovanhide/eventsource in my application to benefit from the replay feature. ๐Ÿ‘Œ

As of now, everything works just fine: events are pushed on demand and replayed as needed, however I noticed some intriguing 2016/01/26 16:08:09 http: multiple response.WriteHeader calls messages in the logs.

I am using the Handler("channel") method through a codegangsta/negroni middleware and I created a minimal app to reproduce the problem.

This could be related to urfave/negroni@1398c5e but I am not quite sure how I could fix / workaround this issue.

Have you tried to use eventsource in some kind of middleware system?

Steps to reproduce

package main

import (
    "fmt"
    "net/http"
    "time"

    "github.com/codegangsta/negroni"
    "github.com/donovanhide/eventsource"
)

type TimeEvent time.Time

func (t TimeEvent) Id() string    { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string  { return time.Time(t).String() }

const (
    TICK_COUNT = 1000
)

func TimePublisher(srv *eventsource.Server) {
    start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
    ticker := time.NewTicker(time.Second)
    for i := 0; i < TICK_COUNT; i++ {
        <-ticker.C
        srv.Publish([]string{"time"}, TimeEvent(start))
        start = start.Add(time.Second)
    }
}

func main() {
    router := http.NewServeMux()
    SSE := eventsource.NewServer()
    defer SSE.Close()
    go TimePublisher(SSE)
    router.HandleFunc("/stream", SSE.Handler("time"))

    n := negroni.Classic()
    n.UseHandler(router)
    n.Run("127.0.0.1:3456")
}

Connect with:

curl 'http://127.0.0.1:3456/stream'

Observe logs:

go run test.go
[negroni] listening on 127.0.0.1:3456
[negroni] Started GET /stream
2016/01/26 16:16:46 http: multiple response.WriteHeader calls

After call stream.Close(), panic: send on closed channel

Error line:
stream.Errors <- err; in receiveEvents() in stream.go.

In go multi-threading environment, it's possible one thread runs to
stream.Errors <- err;
Before running this line,
close(stream.Errors); is called in stream.Close()

Close() for subscriptions.

I notice there isn't a way to Close() a subscription at the moment (or am I missing something?). I'm happy to do the work and submit a pull request but I thought I'd ask here first if you had an approach in mind since Decode() blocks and you may not want to wait till after the next event to close the connection and exit the stream() loop.

Client will hang there if the server side crashed after the connection has already been established.

Let's see the example below:

Server side:

package main

import (
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/donovanhide/eventsource"
)

type TimeEvent time.Time

func (t TimeEvent) Id() string    { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string  { return time.Time(t).String() }

const (
	TICK_COUNT = 5
)

func TimePublisher(srv *eventsource.Server) {
	start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <- ticker.C:
		}
		srv.Publish([]string{"time"}, TimeEvent(start))
		start = start.Add(time.Second)
	}
}

func main() {
	srv := eventsource.NewServer()
	srv.Gzip = true
	defer srv.Close()
	l, err := net.Listen("tcp", "127.0.0.1:8099")
	if err != nil {
		return
	}
	defer l.Close()
	http.HandleFunc("/time", srv.Handler("time"))
	go http.Serve(l, nil)
	go TimePublisher(srv)
	fmt.Println("event source started.")
	select {}
}

Client side:

package main

import (
	"fmt"
	"github.com/donovanhide/eventsource"
)

func main() {
	stream, err := eventsource.Subscribe("http://127.0.0.1:8099/time", "")
	if err != nil {
		return
	}
	for ev := range stream.Events{
		fmt.Println(ev.Id(), ev.Event(), ev.Data())
	}
}

You'll find that after the connection has been extablished, and the client side has print logs like this:

1356998406000000000 Tick 2013-01-01 00:00:06 +0000 UTC
1356998407000000000 Tick 2013-01-01 00:00:07 +0000 UTC
1356998408000000000 Tick 2013-01-01 00:00:08 +0000 UTC
1356998409000000000 Tick 2013-01-01 00:00:09 +0000 UTC
1356998410000000000 Tick 2013-01-01 00:00:10 +0000 UTC
1356998411000000000 Tick 2013-01-01 00:00:11 +0000 UTC

But, if you kill the server side process now, you can find that client just hang there, and no errors occurs.

And there is no retry actions.

How to solve this, guys ?

eventsource stream does not handle "101:Switching Protocols"

Some server implementations of SSE/eventsource will send a 101:Switching Protocols followed by a 200:OK , followed by the stream of events.

This does not play well with this implementation because

if resp.StatusCode != 200 {
        message, _ := ioutil.ReadAll(resp.Body)
        err = SubscriptionError{
            Code:    resp.StatusCode,
            Message: string(message),
        }
    }

will choke on the first 101 response.

I believe this is something clients should be able to handle because this does not seem to violate the spec.

Unfortunately I don't think there is an easy way to rework this using golang's http client. Other language implementations that I've seen (notably, ruby) seem to rely on parsing the http response themselves, thereby handling the possible 101 response that precedes the 200.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.