soheilhy / cmux
Connection multiplexer for GoLang: serve different services on the same port!
License: Apache License 2.0
Hi, cmux has been working great for us. Recently we added a PATCH endpoint to our application and were surprised that the HTTP1Fast matcher doesn't automatically match on PATCH.
Is there a particular reason for not including PATCH, or is this an oversight?
All other methods are in there except PATCH and OPTIONS.
Lines 46 to 55 in 5ec6847
I'm surprised that golint doesn't complain; perhaps a better name is cmux.Server?
Hi there,
Situation: my project runs on Istio + k8s. When an HTTP/1.1 request comes into Istio, it is upgraded to HTTP/2 and passed to the k8s pod. I also need to run a non-TLS gRPC server and an HTTP server on the same port, e.g. :9000.
Issue: I tried to use cmux to create matchers that split gRPC and REST requests, but I failed.
Could you please provide an example of how to achieve that?
Thanks,
You're probably on it but not sure how to communicate with you otherwise :-) - can we get a release for the latest changes? I want to get off pinning to master and use SemVer.
The connection multiplexer leaks memory under long-term usage. I ran pprof, and this is what I found:
(cmux.serve) -> HTTP2MatchHeaderFieldSendSettings->matchHTTP2Field->ReadFrame->readFrameHeader->io.ReadFull->ReadAtLeast->bufferedReader.Read->s.buffer.Write->bytes.(*Buffer).grow->bytes.makeSlice
I have seen this slice keep growing. This is in-use space I am talking about, and it doesn't get freed. That causes our application to use too much memory, and after a couple of days it eventually shuts down.
Is there something we can do to address this?
I was thinking that once we have determined whether we have that SETTINGS frame or not, we should release this buffer, right? But for some reason that doesn't happen.
Thanks
example code:
package main
import (
"fmt"
"net"
"os"
"os/signal"
"syscall"
"time"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
)
func main() {
lis, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
mux := cmux.New(lis)
grpcL := mux.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
_ = grpcL
s := grpc.NewServer()
// xxx.RegisterXXXServiceServer(s, XXXIMPL)
go func() {
// FIXME: using muxed grpcL here can not graceful stop
err = s.Serve(lis)
fmt.Println(err)
}()
fmt.Println("wait for sig")
c := make(chan os.Signal, 1)
signal.Notify(c, []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT}...)
<-c
// FIXME: hangs on when start server with grpcL
s.GracefulStop()
fmt.Println("stopped")
time.Sleep(time.Second * 1)
}
Apologies for this, but due to weirdness of go modules, see here golang/go#29323, when vendoring cmux as a dependency it also vendors google.golang.org/grpc purely because it is a test dependency.
Would you perhaps consider moving the example test to its own package?
github.com/cockroachdb/cmux is a cmux fork incorporated into etcd v3. Needless forking is harmful. Perhaps both projects could join efforts and benefit from working together?
Are there material downsides to supporting TLS gRPC + plaintext http using a configuration as below?
listener, _ := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%v", port))
combinedMux := cmux.New(listener)
routeListener := combinedMux.Match(cmux.HTTP1Fast())
grpcListener := combinedMux.Match(cmux.Any())
creds, _ := credentials.NewServerTLSFromFile(serverPemFilePath, serverKeyFilePath)
grpcServer := grpc.NewServer(grpc.Creds(creds))
routeMux := http.NewServeMux()
routeMux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
//some handler
})
s := &http.Server{
Handler: routeMux,
}
go grpcServer.Serve(grpcListener)
go s.Serve(routeListener)
combinedMux.Serve()
Hi
I am trying to serve an HTTP server and a TCP server with cmux. The HTTP server works great, but I cannot fix a problem with the TCP server.
The TCP server for testing is a simple echo service. The problem is that after the client connects and sends its first message, the server sees a fragmented message (it takes two successive conn.Read() calls to read it).
The source code is attached.
Server: cmux_http_tcp.go
package main
import (
"github.com/soheilhy/cmux"
"net"
"net/http"
"fmt"
"time"
)
func tcpServer (l net.Listener) {
// echo service
for {
conn, _ := l.Accept()
fmt.Println("tcp connected")
go func() {
for {
data := make([]byte, 100)
conn.SetReadDeadline(time.Now().Add(100*time.Second))
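n, err := conn.Read(data)
if err != nil {
// stop on EOF or timeout instead of spinning on a dead connection
conn.Close()
return
}
fmt.Println(string(data[:n]))
// echo back only the bytes that were actually read
conn.Write(data[:n])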
}
}()
}
}
type exampleHTTPHandler struct{}
func (h *exampleHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "example http response")
}
func serveHTTP(l net.Listener) {
s := &http.Server{
Handler: &exampleHTTPHandler{},
}
if err := s.Serve(l); err != cmux.ErrListenerClosed {
panic(err)
}
}
func both(l net.Listener) {
m := cmux.New(l)
httpL := m.Match(cmux.HTTP1Fast())
tcpL := m.Match(cmux.Any())
go tcpServer(tcpL)
//go httpServer(httpL)
go serveHTTP(httpL)
m.Serve()
}
func main() {
l, err := net.Listen("tcp", ":3999")
if err!=nil {
panic(err.Error())
}
fmt.Println("server started")
// only tcp
//tcpServer(l)
// only http
//serveHTTP(l)
// both
both(l)
}
Client: cmux_tcp_client.go
package main
import (
"net"
"fmt"
"time"
)
func keepWriting(conn net.Conn) {
for {
// request - send something
_, err := conn.Write([]byte("hello tcp server"))
fmt.Printf("sent: hello tcp server\n")
if err != nil {
fmt.Println(err.Error())
}
time.Sleep(5*time.Second)
}
}
func keepReading(conn net.Conn) {
for {
response := make([]byte, 100)
n, err := conn.Read(response)
if err != nil {
fmt.Println(err.Error())
}
fmt.Printf("received: %v\n", string(response[:n]))
}
}
func main() {
// make address
tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:3999")
if err!=nil {
fmt.Println(err.Error())
}
// connect
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
fmt.Println(err.Error())
}
fmt.Println("connected to server")
go keepWriting(conn)
keepReading(conn)
}
the client log:
➜ networking git:(master) ✗ go run cmux_tcp_client.go
connected to server
//*** the first message is fragmented ***
sent: hello tcp server
received: hello tc
received: p server
//******* following messages are complete *******************
sent: hello tcp server
received: hello tcp server
^Csignal: interrupt
BTW, I tested the TCP server separately (by uncommenting tcpServer(l) on the server side) and everything works well. Please help.
Edit: I know TCP is not a packet protocol and does not provide a 1:1 packet relationship between server and client. My question is whether the fragmentation is done by cmux. Why is only the first message fragmented and not the following ones?
Some protocols are private and unusual, so I'd like a way to write a custom protocol Matcher.
Hi,
I'm trying to use cmux to serve gRPC + SSH connections, but I cannot make it work.
lis, _ := net.Listen("tcp", ":8080")
l := cmux.New(lis)
grpcL := l.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
sshL := l.Match(cmux.Any())
It serves gRPC well, but it blocks when I connect with an SSH client.
If I comment out the gRPC Match, the SSH connections work. Am I doing something wrong?
Thanks
I have a gRPC server behind cmux. In this server, I follow graceful shutdown practices.
So when the program receives a termination signal, I stop servers in this order:
go func() {
<-ctx.Done()
grpcServer.GracefulStop()
mux.Close()
}()
However, my grpcServer.Serve(grpcListener) receives an error like:
mux: Server closed
This is not great, because I am trying to do the following, and I seemingly have no way of checking this error with errors.Is or with something like err == cmux.ErrServerClosed, because no such type is defined.
go func() {
defer wg.Done()
if err := grpcServer.Serve(grpcLis); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatal("grpc: server failed", zap.Error(err))
}
log.Debug("grpc: server closed without error")
}()
I know the docs say users can just do go grpcServer.Serve(grpcListener), but I do not find that to be reliable enough.
Methods exist for
But there is not a SendSettings option for HTTP2(). Does the existing HTTP2 method not have the same conflict with java gRPC clients as HTTP2MatchHeaderField and HTTP2MatchHeaderFieldPrefix (which require the SendSettings alternatives)?
I would like to match only on the protocol, but need to maintain support for java clients. Is this possible?
Hi,
I can't find the precise libs and respective versions required to run this project, so I've saved a godeps package with my current ones.
However when I run the tests, I'm facing the following compilation problems.
Could you advise me which libs I should have installed in order to get past this?
Cheers.
$ godep go test
# cmux/vendor/google.golang.org/grpc/transport
vendor/google.golang.org/grpc/transport/http2_client.go:840: undefined: http2.MetaHeadersFrame
vendor/google.golang.org/grpc/transport/http2_client.go:943: undefined: http2.MetaHeadersFrame
vendor/google.golang.org/grpc/transport/http2_server.go:145: undefined: http2.MetaHeadersFrame
vendor/google.golang.org/grpc/transport/http2_server.go:281: undefined: http2.MetaHeadersFrame
vendor/google.golang.org/grpc/transport/http_util.go:382: f.fr.ReadMetaHeaders undefined (type *http2.Framer has no field or method ReadMetaHeaders)
vendor/google.golang.org/grpc/transport/http_util.go:512: f.fr.ErrorDetail undefined (type *http2.Framer has no field or method ErrorDetail)
FAIL cmux [build failed]
godep: go exit status 2
This is the godeps I've got after git clone from master and run godep save:
{
"ImportPath": "cmux",
"GoVersion": "go1.7",
"GodepVersion": "v74",
"Deps": [
{
"ImportPath": "github.com/golang/protobuf/proto",
"Rev": "7cc19b78d562895b13596ddce7aafb59dd789318"
},
{
"ImportPath": "github.com/soheilhy/cmux",
"Comment": "v0.1.2",
"Rev": "bf4a8ede9e87c006fe1d4278c6c7f2b8be1fa84c"
},
{
"ImportPath": "golang.org/x/net/context",
"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
},
{
"ImportPath": "golang.org/x/net/http2",
"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
},
{
"ImportPath": "golang.org/x/net/http2/hpack",
"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
},
{
"ImportPath": "golang.org/x/net/internal/timeseries",
"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
},
{
"ImportPath": "golang.org/x/net/trace",
"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
},
{
"ImportPath": "golang.org/x/net/websocket",
"Rev": "cbbbe2bc0f2efdd2afb318d93f1eadb19350e4a3"
},
{
"ImportPath": "google.golang.org/grpc",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/codes",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/credentials",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/examples/helloworld/helloworld",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/grpclog",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/internal",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/metadata",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/naming",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/peer",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
},
{
"ImportPath": "google.golang.org/grpc/transport",
"Comment": "v1.0.2-30-g9eaed1a",
"Rev": "9eaed1a74af580b44448989c8ed830bc210bddf4"
}
]
}
If I change https://github.com/grpc/grpc-go/blob/81b95b1854d7caf3cc21aed316fc222e1749cf31/examples/helloworld/greeter_server/main.go#L53 to:
m := cmux.New(lis)
go func() {
if err := s.Serve(m.Match(cmux.Any())); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
m.Serve()
This works as expected. The client side prints 2020/12/12 09:37:44 Greeting: Hello world. But if I add the content-type matcher,
m := cmux.New(lis)
go func() {
if err := s.Serve(m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
m.Serve()
The client cannot connect and blocks forever. In this switch
Line 241 in 8a8ea3c
the case *http2.SettingsFrame: branch is being taken, but not case *http2.ContinuationFrame: or case *http2.HeadersFrame:, and I am guessing one of those two branches is supposed to be taken. Unit tests in this repo are passing. I'm not sure what is wrong. It could be related to #29 and #30.
I have a server (gRPC, HTTP, and HTTPS) that is intended to run on Kubernetes. The relevant code:
// code in main() simplified.
var serverClosed chan bool
var listenAddr = ":8091"
func main() {
serverClosed = make(chan bool)
srvListener, _ := net.Listen("tcp", listenAddr)
mainServer := &http.Server{TLSConfig: &tls.Config{}} // tls configured
healthCheckServer := &http.Server{}
go startServer(mainServer, healthCheckServer, srvListener)
// wait for the server to exit
// this is actually done because I have another goroutine that updates the certificate
// through Let's Encrypt and then closes srvListener, changes the TLSConfig on the
// mainServer and calls startServer again. It's not visible here because the server is crashing
// before that goroutine even runs (it runs twice a day).
<- serverClosed
}
func startServer(mainServer, healthCheckServer *http.Server, srvListener net.Listener) {
log.Printf("starting the HTTP/HTTPS/gRPC server bound to %q", listenAddr)
// create a new connection multiplexer.
m := cmux.New(srvListener)
// we first match on HTTP 1.1 methods.
httpl := m.Match(cmux.HTTP1Fast())
// if not matched, we assume that it's TLS.
tlsl := m.Match(cmux.Any())
// start the mainServer
go mainServer.Serve(tls.NewListener(tlsl, mainServer.TLSConfig))
// start the healthCheckServer
go healthCheckServer.Serve(httpl)
// boot the connection multiplexer, this should return once the srvListener
// is closed.
m.Serve()
select {
case serverClosed <- true:
default:
}
}
Having a pod health check setup as follows:
livenessProbe:
tcpSocket:
port: 8091
initialDelaySeconds: 5
timeoutSeconds: 1
readinessProbe:
httpGet:
port: 8091
path: /healthz
initialDelaySeconds: 5
timeoutSeconds: 1
Given the initialDelaySeconds=5, I assume the server has no time to set up all of the matchers, and:
panic: runtime error: index out of range
goroutine 86 [running]:
panic(0xf203a0, 0xc820014010)
/usr/local/go/src/runtime/panic.go:481 +0x3e6
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*ptNode).match(0xc820405590, 0xc82051d6b8, 0x0, 0x8, 0x1, 0x0)
/go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/patricia.go:162 +0x1f0
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*patriciaTree).matchPrefix(0xc82040ea00, 0x7f5a1e877b08, 0xc820272550, 0x40cfca)
/go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/patricia.go:52 +0x9f
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*patriciaTree).(github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.matchPrefix)-fm(0x7f5a1e877b08, 0xc820272550, 0x7f5a1e8675b0)
/go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/matchers.go:37 +0x34
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.matchersToMatchWriters.func1(0x7f5a1e8675b0, 0xc820022508, 0x7f5a1e877b08, 0xc820272550, 0xc820022508)
/go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/cmux.go:112 +0x32
github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*cMux).serve(0xc82046a200, 0x7f5a1e866e90, 0xc820022508, 0xc8204691a0, 0xc82051d700)
/go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/cmux.go:168 +0x33e
created by github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux.(*cMux).Serve
/go/src/github.com/publica-project/grpc-logger/vendor/github.com/soheilhy/cmux/cmux.go:158 +0x188
Changing initialDelaySeconds to 20 seconds solves the problem.
Feature request:
Right now, once we start cmux.Serve, is there a way to cleanly shut down the cmux listeners and all the internal listeners it starts? That way, during a whole-service shutdown, we could call cmux.Stop and make sure all the listeners are shut down cleanly.
Line 37 in 5ec6847
cmux is very useful.
Thank you!
Google Cloud Run is a serverless solution built atop Knative and it limits deployed services to a single port.
This is a challenge when running, e.g. gRPC services that also expose Prometheus metrics.
Using cmux, I'm able to multiplex both services onto the single port and serve gRPC and Prometheus metrics with little effort.
I wrote up the solution here: Multiplexing gRPC and HTTP (Prometheus) endpoints with Cloud Run
NOTE: I did need to use the Java gRPC client solution to get this to work (locally and when deployed to Cloud Run) with Golang clients (my own and gRPCurl).
Background: Services on Cloud Run can only expose 1 port. If you want to expose a combination of gRPC and a regular REST API, you have to multiplex them. (e.g. using cmux)
In a standard setup, Cloud Run downgrades HTTP/2 requests to HTTP/1, and the setup with cmux seems to work fine.
However, if you want to support gRPC streaming, you have to enable End-to-end HTTP/2 support on your Cloud Run service. The Cloud Run instructions say:
Your Cloud Run service must handle requests in HTTP/2 cleartext (h2c) format, because TLS is still terminated automatically by Cloud Run. To confirm that your service supports h2c requests, test the service locally using this cURL command:
curl -i --http2-prior-knowledge http://localhost:PORT
So, my current setup is:
gsrv := grpc.NewServer(...)
hsrv := &http.Server{ Handler: h2c.NewHandler(..., &http2.Server{})}
srv := cmux.New(conn)
go gsrv.Serve(srv.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")))
go hsrv.Serve(srv.Match(cmux.Any()))
This unfortunately does not work (even locally, no Cloud Run involved):
[...] hsrv, using curl --http2-prior-knowledge) works fine
[...] curl --http2-prior-knowledge) no longer works: I get the error http2: server connection error from [::1]:64054: connection error: PROTOCOL_ERROR
Without knowing the code or HTTP/2, my guess from what I'm seeing is that the cmux matcher sends an HTTP/2 SETTINGS frame during negotiation, but when it then fails to match gRPC and falls back to the next 'regular' HTTP/2 handler, that handler starts negotiation from scratch and sends its own SETTINGS frame, which causes problems.
Your work is very helpful to me.
Do you have any plans to support WebSocket in cmux?
If possible, please give examples using HTTP/1, HTTPS, HTTP/2, and WebSocket.
cockroachdb/cmux is a strange mix of dubious changes and fixes
Would you mind looking at it and merging back what's worthwhile? I'd love to forget cockroachdb/cmux ever existed
Documenting my findings debugging a production issue:
tl;dr is that a client can mess with stopping of a server, because the sniffing mechanism has no notion of draining for connections that have yet to be matched to a sub-listener. The specific scenario I encountered is:
Net effect is that grpc.Server.Stop/GracefulStop() & cmux.Serve() can't return until the client connection is remotely closed.
Not entirely sure what the right behavior here is. My gut take is that cmux Accept() should preserve the exit semantics of the wrapped listener Accept, and return its error even though there are outstanding, still-to-be-sniffed connections.
Collected traces:
cmux.Serve has found that the wrapped listener's Accept has errored.
It's trying to return, but is blocked on its own WG within a defer:
goroutine 1798 [semacquire, 5 minutes]:
sync.runtime_Semacquire(0xc00066c008)
/usr/local/go/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00066c000)
/usr/local/go/src/sync/waitgroup.go:130 +0x64
github.com/soheilhy/cmux.(*cMux).Serve.func1(0xc00012a4b0, 0xc00066c000)
/gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:150 +0x55
github.com/soheilhy/cmux.(*cMux).Serve(0xc00012a4b0, 0x1497b00, 0xc006278640)
/gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:165 +0x120
go.gazette.dev/core/server.(*Server).QueueTasks.func1(0x20, 0x20)
/gazette/server/server.go:124 +0x40
go.gazette.dev/core/task.(*Group).GoRun.func1(0x0, 0x0)
/gazette/task/group.go:72 +0x43
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000530a80, 0xc001390080)
/gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
created by golang.org/x/sync/errgroup.(*Group).Go
/gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66
That WG can't finish because a connection thread is stuck waiting to sniff an HTTP/2 header:
goroutine 491775429 [IO wait, 31 minutes]:
internal/poll.runtime_pollWait(0x7f7a20ed50f8, 0x72, 0xffffffffffffffff)
/usr/local/go/src/runtime/netpoll.go:203 +0x55
internal/poll.(*pollDesc).wait(0xc0065afb98, 0x72, 0x0, 0x9, 0xffffffffffffffff)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Read(0xc0065afb80, 0xc001e66498, 0x9, 0x9, 0x0, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_unix.go:169 +0x19b
net.(*netFD).Read(0xc0065afb80, 0xc001e66498, 0x9, 0x9, 0x865b8e, 0x10401, 0xc000000000)
/usr/local/go/src/net/fd_unix.go:202 +0x4f
net.(*conn).Read(0xc0013b8020, 0xc001e66498, 0x9, 0x9, 0x0, 0x0, 0x0)
/usr/local/go/src/net/net.go:184 +0x8e
github.com/soheilhy/cmux.(*bufferedReader).Read(0xc004b59820, 0xc001e66498, 0x9, 0x9, 0xc000184a80, 0x7f7a9f3327d0, 0x0)
/gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/buffer.go:53 +0x12c
io.ReadAtLeast(0x1496d40, 0xc004b59820, 0xc001e66498, 0x9, 0x9, 0x9, 0x85db95, 0xc0091bf440, 0xc0091b0004)
/usr/local/go/src/io/io.go:310 +0x87
io.ReadFull(...)
/usr/local/go/src/io/io.go:329
golang.org/x/net/http2.readFrameHeader(0xc001e66498, 0x9, 0x9, 0x1496d40, 0xc004b59820, 0x0, 0x0, 0xc0091bf440, 0x0)
/gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/http2/frame.go:237 +0x87
golang.org/x/net/http2.(*Framer).ReadFrame(0xc001e66460, 0x14a4d60, 0xc0091bf440, 0x0, 0x0)
/gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/http2/frame.go:492 +0xa1
github.com/soheilhy/cmux.matchHTTP2Field(0x1497b80, 0xc0013b8020, 0x1496d40, 0xc004b59820, 0x121d9ea, 0xc, 0xc0054e9ec0, 0x415285)
/gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/matchers.go:236 +0x148
github.com/soheilhy/cmux.HTTP2MatchHeaderFieldSendSettings.func1(0x1497b80, 0xc0013b8020, 0x1496d40, 0xc004b59820, 0xc0013b8020)
/gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/matchers.go:173 +0xca
github.com/soheilhy/cmux.(*cMux).serve(0xc00012a4b0, 0x14c1ea0, 0xc0013b8020, 0xc00015c120, 0xc00066c000)
/gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:184 +0x1d3
created by github.com/soheilhy/cmux.(*cMux).Serve
/gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:171 +0x19d
Meanwhile, gRPC Serve() is blocked waiting for Accept to return. It must do so before it can notify the gRPC server’s own WG, which is a prerequisite for GracefulStop or Stop to return:
goroutine 1800 [chan receive, 5 minutes]:
github.com/soheilhy/cmux.muxListener.Accept(...)
/gazette/.build/go-path/pkg/mod/github.com/soheilhy/[email protected]/cmux.go:229
google.golang.org/grpc.(*Server).Serve(0xc0001661a0, 0x14b2060, 0xc000113820, 0x0, 0x0)
/gazette/.build/go-path/pkg/mod/google.golang.org/[email protected]/server.go:621 +0x210
go.gazette.dev/core/server.(*Server).QueueTasks.func3(0x0, 0x1)
/gazette/server/server.go:136 +0x44
go.gazette.dev/core/task.(*Group).GoRun.func1(0x0, 0x0)
/gazette/task/group.go:72 +0x43
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000530a80, 0xc0013900c0)
/gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
created by golang.org/x/sync/errgroup.(*Group).Go
/gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66
For completeness, here's where GracefulStop is wedged waiting on its WG, held hostage by grpc.Serve:
goroutine 1802 [semacquire, 5 minutes]:
sync.runtime_Semacquire(0xc00016631c)
/usr/local/go/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00016631c)
/usr/local/go/src/sync/waitgroup.go:130 +0x64
google.golang.org/grpc.(*Server).GracefulStop(0xc0001661a0)
/gazette/.build/go-path/pkg/mod/google.golang.org/[email protected]/server.go:1551 +0x1b1
go.gazette.dev/core/broker.(*Service).QueueTasks.func2(0xc00030eb90, 0x50)
/gazette/broker/service.go:71 +0xa7
go.gazette.dev/core/task.(*Group).GoRun.func1(0x14b6660, 0xc0004b03c0)
/gazette/task/group.go:72 +0x43
golang.org/x/sync/errgroup.(*Group).Go.func1(0xc000530a80, 0xc001390100)
/gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:57 +0x59
created by golang.org/x/sync/errgroup.(*Group).Go
/gazette/.build/go-path/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:54 +0x66
I have a server that receives traffic from a standard tcp listener, but doesn't receive traffic from a cmux.Any() connection derived from the same listener. I have a small, reproducible case:
This works (i.e. can receive traffic):
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
logger.Fatal("Could not listen on port.")
}
go grpcServer.Start(listener) // Works - connections are received and processed.
This doesn't work (i.e. connection hangs when making a request):
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
logger.Fatal("Could not listen on port.")
}
m := cmux.New(listener)
grpcListener := m.Match(cmux.Any())
go grpcServer.Start(grpcListener) // Doesn't work - connections hang.
many thanks for this mux - extremely useful and bulletproof.
I have a framework that muxes grpc and http over plaintext and TLS.
I'd like to add prometheus metrics to count the number of incoming connections on each mux but I can't figure out where to add that.
Can anyone steer me in the right direction?
See #32
Hi,
Thanks for this library. I am trying to serve gRPC and grpc-gateway (HTTPS only) on the same port and terminate SSL for both at the Go server. Do you mind showing an example of how to do that? Here is a code example of what I am trying to do:
https://github.com/appscode/appstream/blob/master/cmd/appstream/app/server.go#L48
Thanks.
I was looking through the fork graph and found a few commits that might be of interest that should perhaps be merged in.
Once a TCP connection is established but no data is sent (for example, nc localhost 8080 with no further operations), the matching process may block forever waiting for a minimum length of data, and the connection may never receive a stop signal (donec). The line at issue:
Line 184 in 8a8ea3c
I am partial to interfaces as well, but two things here:
Thoughts?
Looks like there has been no activity since 2021, can we upgrade dependencies and Go versions?
Your example has a tiny bug, which tripped me up for a few days. Your matchers are case sensitive. So
// Match connections in order:
// First grpc, then HTTP, and otherwise Go RPC/TCP.
- grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
+ grpcL := m.Match(cmux.HTTP2HeaderField("Content-type", "application/grpc"))
httpL := m.Match(cmux.HTTP1Fast())
trpcL := m.Match(cmux.Any()) // Any means anything that is not yet matched.
As the title
We are using cmux with gRPC and grpc-gateway like this example. But we found out that using
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
consumes too much CPU.
$ ~> ps axo pid,etime,%cpu,%mem,cmd | grep 'server-combine' | grep -v grep
7054 00:07 123 0.1 server-combine
Using m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) is OK, but if we try to use the L28 this takes too much CPU.
Is there any fix for this?
Please consider assigning version numbers and tagging releases. Tags/releases are useful for downstream package maintainers (in Debian and other distributions) to export source tarballs, automatically track new releases and to declare dependencies between packages. Read more in the Debian Upstream Guide.
Versioning provides additional benefits to encourage vendoring of a particular (e.g. latest stable) release contrary to random unreleased snapshots.
Thank you.
See also
I created a cmux server as described in the example to match the application/grpc content-type HTTP2 header tag. It successfully matches against C++ and Go gRPC clients.
However, the Java client from https://github.com/grpc/grpc-java is not matched. The implementation of matchHTTP2Field blocks on framer.ReadFrame(). It reads a SETTINGS and a WINDOW_UPDATE frame, but doesn't get to the HEADERS frame. Perhaps it needs to negotiate the HTTP/2 connection before it gets to the HEADERS frame?
This is a notice that grpc-go intends to change in a way that we know will break the way cmux currently works by default. This will bring grpc-go in line with grpc-java's behavior, and C/wrapped languages will be following suit as well. Details and justification for the change can be found in grpc/grpc#17006. grpc-go's migration plan is proposed in grpc/grpc-go#2406. Please feel free to comment in the appropriate PR/issue for questions or concerns about this. Apologies in advance for the breaking change.
Hey there, we're using cmux to serve HTTP and gRPC together but our application is receiving a very large amount of requests (both gRPC and HTTP). We see tons of goroutines leaking around like:
goroutine 754 [IO wait, 215 minutes]:
internal/poll.runtime_pollWait(0x7eff86b282b0, 0x72, 0x0)
/usr/lib/golang/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc420329718, 0x72, 0xffffffffffffff00, 0x341d980, 0x340d888)
/usr/lib/golang/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc420329718, 0xc42172a000, 0x1000, 0x1000)
/usr/lib/golang/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc420329700, 0xc42172a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/lib/golang/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc420329700, 0xc42172a000, 0x1000, 0x1000, 0x13ce, 0xc420956000, 0x13ce)
/usr/lib/golang/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc4207b2158, 0xc42172a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
/usr/lib/golang/src/net/net.go:176 +0x6d
github.com/kubernetes-incubator/cri-o/vendor/github.com/soheilhy/cmux.(*bufferedReader).Read(0xc4207be380, 0xc42172a000, 0x1000, 0x1000, 0xc42098f8c8, 0x74db11, 0xc4202e9688)
/root/go/src/github.com/kubernetes-incubator/cri-o/_output/src/github.com/kubernetes-incubator/cri-o/vendor/github.com/soheilhy/cmux/buffer.go:53 +0x144
github.com/kubernetes-incubator/cri-o/vendor/github.com/soheilhy/cmux.(*MuxConn).Read(0xc4207be370, 0xc42172a000, 0x1000, 0x1000, 0x0, 0x13ce, 0x1c6e)
/root/go/src/github.com/kubernetes-incubator/cri-o/_output/src/github.com/kubernetes-incubator/cri-o/vendor/github.com/soheilhy/cmux/cmux.go:259 +0x4f
net/http.(*connReader).Read(0xc4202e9680, 0xc42172a000, 0x1000, 0x1000, 0xc420460380, 0x1cee4a0, 0xc420460380)
/usr/lib/golang/src/net/http/server.go:753 +0x105
bufio.(*Reader).fill(0xc4200599e0)
/usr/lib/golang/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4200599e0, 0x46860a, 0x48d7dd, 0xc42098fa20, 0xfffffffe1104c0ba, 0xc42098fa60, 0x48da77)
/usr/lib/golang/src/bufio/bufio.go:338 +0x2c
bufio.(*Reader).ReadLine(0xc4200599e0, 0x100, 0xf8, 0x2157540, 0x468676, 0x22002098faa8, 0xf8)
/usr/lib/golang/src/bufio/bufio.go:367 +0x34
net/textproto.(*Reader).readLineSlice(0xc4202e96b0, 0xc42098fb20, 0xc42098fb20, 0x41b6d8, 0x100, 0x2157540)
/usr/lib/golang/src/net/textproto/reader.go:55 +0x70
net/textproto.(*Reader).ReadLine(0xc4202e96b0, 0xc420750800, 0x0, 0x0, 0xc42098fba8)
/usr/lib/golang/src/net/textproto/reader.go:36 +0x2b
net/http.readRequest(0xc4200599e0, 0x0, 0xc420750800, 0x0, 0x0)
/usr/lib/golang/src/net/http/request.go:925 +0x99
net/http.(*conn).readRequest(0xc42096c280, 0x3435900, 0xc4207e4880, 0x0, 0x0, 0x0)
/usr/lib/golang/src/net/http/server.go:933 +0x17c
net/http.(*conn).serve(0xc42096c280, 0x3435900, 0xc4207e4880)
/usr/lib/golang/src/net/http/server.go:1739 +0x50e
created by net/http.(*Server).Serve
/usr/lib/golang/src/net/http/server.go:2720 +0x288
Do you guys know what's happening?
There is no copyright documentation anywhere in the source. Please consider adding a copyright statement to the README (e.g. Copyright 2015-2016 Name Surname <[email protected]>).
Thanks.
I think that s.Register(&ExampleRPCRcvr{}) should be trpcS.Register(&ExampleRPCRcvr{}) in README.md.
I can't get gRPC and HTTP/2 health checks (both with TLS) to work. I believe the prescribed approach is to follow the RecursiveCmux example: we need to handle the TLS handshake before we can match on HTTP2HeaderFieldPrefix for gRPC (all other TLS traffic would be routed to the HTTP/2 health check server).
The problem then arises that, for the HTTP/2 health check server, we can't use http.ListenAndServeTLS since the TLS is already resolved using tls.NewListener. So we must use http.Serve. http.Serve documentation says:
HTTP/2 support is only enabled if the Listener returns *tls.Conn connections and they
were configured with "h2" in the TLS Config.NextProtos.
Adding "h2" to the TLS Config.NextProtos is straightforward enough. However, even though tls.NewListener returns a listener that yields *tls.Conn connections, it appears that once this listener is wrapped in the CMux object, the connections it yields are MuxConn rather than *tls.Conn, so http.Serve does not enable HTTP/2. When I execute:
curl https://localhost:443 --http2 -Ik
I get a response of
curl: (16) Error in the HTTP2 framing layer
Is it possible to serve gRPC and HTTP/2 health checks (both with TLS) using the cmux package?
Is it possible to Match based on URL? Like:
cmux.MatchURL("/example")
Can you give me demo client code for gRPC? Thanks.
I saw the documentation on the matchers: https://github.com/soheilhy/cmux/blob/master/matchers.go#L137 ("HTTP1HeaderFieldPrefix returns a matcher matching the header fields of the first request of an HTTP 1 connection. If the header with key name has a value prefixed with valuePrefix, this will match."). This causes issues with the HTTP Keep-Alive option and with some connection pools; I am already experiencing the problem with Chrome and istio-proxy/envoy.
Explanation:
Server 1 is matched by cmux.HTTP1HeaderFieldPrefix("User-Agent", "xxx/m.n") and has the endpoint /api/a.
Server 2 is matched by cmux.HTTP1Fast() and has the endpoint /api/b.
Server 3 is matched by cmux.HTTP2HeaderField("content-type", "application/grpc").
When the client turns on Keep-Alive, or has a connection pool that tags connections only by host+port, the underlying TCP connection is reused and this happens:
1. The client sends "User-Agent": "xxx/m.n" and calls /api/a. The TCP connection is established and routed to server 1, and the call works fine.
2. The client sends "User-Agent": "xxx/m.n" and calls /api/b, which is supposed to go to server 2. Because the client directly reuses the first connection (host+port is the same, and no matcher check happens), the request hits server 1, which results in a 404.
I know the root cause is that the client doesn't tag the connection properly. For our own connection pool we can do that, but for public clients like Chrome or sidecar proxy products, if how a connection is tagged is not configurable, we can do nothing.
What is the solution for this kind of case? What are your suggestions? Please help.
The leak happened again, under the same conditions.
I ran pprof and this is what I found:
(cmux.serve) -> HTTP2MatchHeaderFieldSendSettings->matchHTTP2Field->ReadFrame->readFrameHeader->io.ReadFull->ReadAtLeast->bufferedReader.Read->s.buffer.Write->bytes.(*Buffer).grow->bytes.makeSlice
It is the same path as before.
Do you think a rogue client could cause a leak even after its connection is closed? Or could many rogue clients try to connect, get rejected because of bad authorization, and still leave their bufferedReader bytes uncollected by the garbage collector after the rejection?
The usage I have seen is in-use space, not alloc space, which means the memory is still held and has not been collected by the Go garbage collector. What is hard to reproduce is when this happens. The last fix you gave should terminate bad connections immediately, since we now check the continuation frame as well. But we have still seen the leak in production, with server nodes shutting down after 10 days due to running out of memory.
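I wrote a SOCKS5 matcher modeled on the TLS one; it matches just one byte:
// SOCKS5 matches connections whose first byte is one of the given
// protocol versions (default: 0x05).
func SOCKS5(versions ...int) Matcher {
	if len(versions) == 0 {
		versions = []int{
			0x05,
		}
	}
	// Build one single-byte prefix per version.
	prefixes := [][]byte{}
	for _, v := range versions {
		prefixes = append(prefixes, []byte{byte(v)})
	}
	return prefixByteMatcher(prefixes...)
}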
When I register the SOCKS5 matcher after the HTTP1Fast matcher, it does not work: the connection never reaches the SOCKS5 server handler. When I register it before the HTTP matcher, it works.
// failed!
httpL := m.Match(cmux.HTTP1FastOptions())
socksL := m.Match(cmux.SOCKS5())
Under https://github.com/soheilhy/cmux#performance there is a TODO to add benchmarks.
I recently used the cmux project in another open source project SeldonIO/seldon-core#1772
Questions were raised in the community about any possible performance hit from using the package, specifically asking for benchmarks.
I will look into this and hope to submit a PR with benchmarks soon.