go-astisrt's Introduction

SRT server, client and socket in Go

First off, a big thanks to the official Go bindings, which were an amazing source of inspiration for this project.

However, a few things bothered me in them, so I decided to write my own bindings with a few goals in mind:

  • split the API between high level entities, similar to Go's net/http entities, with very guided methods (ListenAndServe, Shutdown, Dial, Read, Write etc.) and low level entities, closer to C, with I-should-know-what-I-am-doing-before-using-them methods (Socket, etc.)
  • provide named and typed option setters and getters
  • make sure all errors are handled properly, since they are stored per thread, and ban the use of runtime.LockOSThread()
  • make sure there's a context specific to each connection in high level methods
  • make sure pointers are the same between the ListenCallback and Accept(), and between the ConnectCallback and Connect()
  • only use blocking mode in high level entities

astisrt has been tested with srtlib v1.5.0.

Examples

Examples are located in the examples directory.

WARNING: the code below doesn't handle errors for readability purposes. However you SHOULD!

Server

Go to full example

// Capture the interrupt signal (SIGINT, e.g. Ctrl+C)
doneSignal := make(chan os.Signal, 1)
signal.Notify(doneSignal, os.Interrupt)

// Create server
s, _ := astisrt.NewServer(astisrt.ServerOptions{
    // Provide options that will be passed to accepted connections
    ConnectionOptions: []astisrt.ConnectionOption{
        astisrt.WithLatency(300),
        astisrt.WithTranstype(astisrt.TranstypeLive),
    },

    // Specify how an incoming connection should be handled before being accepted
    // When false is returned, the connection is rejected.
    OnBeforeAccept: func(c *astisrt.Connection, version int, streamID string) bool {
        // Check stream id
        if streamID != "test" {
            // Set reject reason
            c.SetPredefinedRejectReason(http.StatusNotFound)
            return false
        }

        // Update passphrase
        c.Options().SetPassphrase("passphrase")

        // Add stream id to context
        *c = *c.WithContext(context.WithValue(c.Context(), ctxKeyStreamID, streamID))
        return true
    },

    // Similar to http.Handler behavior, specify how a connection
    // will be handled once accepted
    Handler: astisrt.ServerHandlerFunc(func(c *astisrt.Connection) {
        // Get stream id from context
        if v := c.Context().Value(ctxKeyStreamID); v != nil {
            log.Printf("main: handling connection with stream id %s\n", v.(string))
        }

        // Loop
        for {
            // Read
            b := make([]byte, 1500)
            n, _ := c.Read(b)

            // Log
            log.Printf("main: read `%s`\n", b[:n])

            // Get stats
            s, _ := c.Stats(false, false)

            // Log
            log.Printf("main: %d total bytes received\n", s.ByteRecvTotal())
        }
    }),

    // Addr the server should be listening to
    Host: "127.0.0.1",
    Port: 4000,
})
defer s.Close()

// Listen and serve in a goroutine
doneListenAndServe := make(chan error)
go func() { doneListenAndServe <- s.ListenAndServe(1) }()

// Wait for the interrupt signal
<-doneSignal

// Create shutdown context with a timeout to make sure it's cancelled if it takes too much time
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Shutdown
s.Shutdown(ctx)

// Wait for listen and serve to be done
<-doneListenAndServe

Client

Go to full example

// Capture the interrupt signal (SIGINT, e.g. Ctrl+C)
doneSignal := make(chan os.Signal, 1)
signal.Notify(doneSignal, os.Interrupt)

// Dial
doneWrite := make(chan error)
c, _ := astisrt.Dial(astisrt.DialOptions{
    // Provide options to the connection
    ConnectionOptions: []astisrt.ConnectionOption{
        astisrt.WithLatency(300),
        astisrt.WithPassphrase("passphrase"),
        astisrt.WithStreamid("test"),
    },

    // Callback when the connection is disconnected
    OnDisconnect: func(c *astisrt.Connection, err error) { doneWrite <- err },

    // Addr that should be dialed
    Host: "127.0.0.1",
    Port: 4000,
})
defer c.Close()

// Write in a goroutine
go func() {
    defer func() { close(doneWrite) }()

    // Loop
    r := bufio.NewReader(os.Stdin)
    for {
        // Read from stdin
        t, _ := r.ReadString('\n')

        // Write to the server
        c.Write([]byte(t))
    }
}()

// Wait for either the interrupt signal or write end
select {
case <-doneSignal:
    c.Close()
case err := <-doneWrite:
    log.Println(err)
}

// Make sure write is done
select {
case <-doneWrite:
default:
}

Socket

Listen

Go to full example

// Create socket
s, _ := astisrt.NewSocket()
defer s.Close()

// Set listen callback
s.SetListenCallback(func(s *astisrt.Socket, version int, addr *net.UDPAddr, streamID string) bool {
    // Check stream id
    if streamID != "test" {
        // Set reject reason
        s.SetRejectReason(1404)
        return false
    }

    // Update passphrase
    s.Options().SetPassphrase("passphrase")
    return true
})

// Bind
s.Bind("127.0.0.1", 4000)

// Listen
s.Listen(1)

// Accept
as, _, _ := s.Accept()

// Receive message
b := make([]byte, 1500)
n, _ := as.ReceiveMessage(b)

// Log
log.Printf("main: received `%s`\n", b[:n])

Connect

Go to full example

// Create socket
s, _ := astisrt.NewSocket()
defer s.Close()

// Set connect callback
doneConnect := make(chan error)
s.SetConnectCallback(func(s *astisrt.Socket, addr *net.UDPAddr, token int, err error) {
    doneConnect <- err
})

// Set passphrase
s.Options().SetPassphrase("passphrase")

// Set stream id
s.Options().SetStreamid("test")

// Connect
s.Connect("127.0.0.1", 4000)

// Send message
s.SendMessage([]byte("this is a test message"))

// Give time to the message to be received
time.Sleep(500 * time.Millisecond)

// Close socket
s.Close()

// Wait for disconnect
<-doneConnect

Install srtlib from source

You can find the instructions to install srtlib here.

However if you don't feel like doing it manually you can use the following command:

$ make install-srt

srtlib will be built from source in a directory named tmp located in your working directory.

For your Go code to pick up the srtlib dependency automatically, you'll need to add the following environment variables:

(don't forget to replace {{ path to your working directory }} with the absolute path to your working directory)

export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:{{ path to your working directory }}/tmp/v1.5.0/lib/"
export CGO_LDFLAGS="-L{{ path to your working directory }}/tmp/v1.5.0/lib/"
export CGO_CFLAGS="-I{{ path to your working directory }}/tmp/v1.5.0/include/"
export PKG_CONFIG_PATH="{{ path to your working directory }}/tmp/v1.5.0/lib/pkgconfig"

go-astisrt's Issues

After enough socket connections, `OnBeforeAccept` starts getting ignored

No clue how this is happening, but it's a pretty rough bug. After an unpredictable number of handled connections (usually 10-20), I see the OnBeforeAccept callback getting skipped. Here are a few logs:

Working:

2023/02/08 11:31:30 11:31:30.448548/SRT:RcvQ:w1.N:SRT.cn: PASSING request from: 10.169.1.52:34472 to agent:949957560

2023/02/08 11:31:30 11:31:30.448801/SRT:RcvQ:w1.N:SRT.cn: Listener managed the connection request from: 10.169.1.52:34472 result:waveahand

2023/02/08 11:31:30 11:31:30.449051/SRT:RcvQ:w1.N:SRT.cn: PASSING request from: 10.169.1.52:34472 to agent:949957560

2023/02/08 11:31:30 11:31:30.449103/SRT:RcvQ:w1 D:SRT.sm: generateSocketID: : @949957541

2023/02/08 11:31:30 conn request
2023/02/08 11:31:30 [onBeforeAccept] New connection request from `10.169.1.52` for account ID `1001` and destination ID `x2KasaV5awqZVhPekJvwHU`.
2023/02/08 11:31:30 11:31:30.451088/SRT:RcvQ:w1.N:SRT.cn: HSREQ/rcv: cmd=1(HSREQ) len=12 vers=0x10501 opts=0xbf delay=120

2023/02/08 11:31:30 11:31:30.451205/SRT:RcvQ:w1.N:SRT.cn: listen ret: -1 - conclusion

2023/02/08 11:31:30 11:31:30.451256/SRT:RcvQ:w1.N:SRT.cn: Listener managed the connection request from: 10.169.1.52:34472 result:waveahand

Broken after a number of sockets handled:

2023/02/08 11:36:22 11:36:22.169017/SRT:RcvQ:w1.N:SRT.cn: PASSING request from: 10.169.1.52:37752 to agent:949957560

2023/02/08 11:36:22 11:36:22.169273/SRT:RcvQ:w1.N:SRT.cn: Listener managed the connection request from: 10.169.1.52:37752 result:waveahand

2023/02/08 11:36:22 11:36:22.169564/SRT:RcvQ:w1.N:SRT.cn: PASSING request from: 10.169.1.52:37752 to agent:949957560

2023/02/08 11:36:22 11:36:22.169655/SRT:RcvQ:w1 D:SRT.sm: generateSocketID: : @949957535

2023/02/08 11:36:22 11:36:22.169867/SRT:RcvQ:w1.N:SRT.cn: HSREQ/rcv: cmd=1(HSREQ) len=12 vers=0x10501 opts=0xbf delay=120

2023/02/08 11:36:22 11:36:22.169957/SRT:RcvQ:w1.N:SRT.cn: listen ret: -1 - conclusion

2023/02/08 11:36:22 11:36:22.169998/SRT:RcvQ:w1.N:SRT.cn: Listener managed the connection request from: 10.169.1.52:37752 result:waveahand

"astisrt: Connection setup failure: connection timed out" while connecting for the second time

Hi folks 👋,

Thanks for the library. I'm not sure if this is an issue or even related to the lib. Anyway, I'm using your library to learn a little bit about SRT and WebRTC.

A user inputs information about the SRT stream, and the browser creates a P2P connection with the server. The server connects to the SRT source, reads its stream and sends it back over the WebRTC channel.

sequenceDiagram
    actor User

    box Navy Browser
        participant browser
        participant donut-video
    end

    locahost8080->>+locahost8080: setup local ICE 8081/udp and 8081/tcp
    browser->>+locahost8080: GET /
    locahost8080->>+browser: 200 /index.html
    User->>+browser: feed SRT host, port, and id
    User->>+browser: click on [connect]
    browser->>+donut-video: play

    Note over locahost8080,donut-video: WebRTC connection setup

    donut-video->>+donut-video: web rtc createOffer
    donut-video->>+locahost8080: POST /doSignaling {srtOffer}
    locahost8080->>+locahost8080: process {srtOffer}
    locahost8080->>+locahost8080: create video track
    locahost8080->>+locahost8080: set remote {srtOffer}
    locahost8080->>+locahost8080: set local {answer}
    locahost8080->>+donut-video: {local description}

    Note over locahost8080,donut-video: WebRTC connection setup

    locahost8080->>+SRT: connect

    loop SRT to WebRTC
        locahost8080-->SRT: SRT | WebRTC
        locahost8080-->browser: WebRTC.WriteSample(SRT.PES.Data)
    end

    donut-video-->>donut-video: WebRTC.ontrack(video)
    donut-video-->>browser: renders video at the <video> tag
    browser-->>User: show frames

You can test it right now; it works for the first client.

git clone https://github.com/leandromoreira/donut
cd donut
git checkout refactor
make run
# open chrome on http://localhost:8080 and click on [connect]

But when I try to connect from another browser tab, or even refresh the current page, it doesn't work: it raises what seems to be a timeout error.

astisrt: connecting failed: astisrt: connecting failed: astisrt: Connection setup failure: connection timed out

Just to help you see how the code flows:

  1. Here's where I connect to the SRT server
  2. Then I run a goroutine to stream the data (from SRT to WebRTC). Notice that I finish the request cycle for the browser, but the goroutine keeps running in the background, streaming the data.
  3. Here's the detached goroutine that keeps pulling data from SRT and sending it to the WebRTC peer.

My hypothesis is an unclosed stream: does SRT automatically close an open stream after the listener stops pulling data? Or is there any limit on the number of clients connected at the same time?

Do you have any idea why the program raises an SRT timeout on the second connection?

Thanks
