
vertx-grpc's People

Contributors

cescoffier, eclipsewebmaster, franz1981, jckhoe, kureuil, lwlee2608, sg-o, tsegismont, vietj, yeikel


vertx-grpc's Issues

ProtoReflectionService "server" is null exception

Version

4.3.5

Context

Encountering an exception when querying the gRPC reflection service.

Testing with grpcurl:
grpcurl -plaintext localhost:8080 list

Exception:
15:01:35.948 [vert.x-eventloop-thread-2] ERROR io.vertx.core.impl.ContextBase - Unhandled exception
java.lang.NullPointerException: Cannot invoke "io.grpc.Server.getImmutableServices()" because "server" is null
	at io.grpc.protobuf.services.ProtoReflectionService.getRefreshedIndex(ProtoReflectionService.java:93)
	at io.grpc.protobuf.services.ProtoReflectionService.serverReflectionInfo(ProtoReflectionService.java:134)
	at io.grpc.reflection.v1alpha.ServerReflectionGrpc$MethodHandlers.invoke(ServerReflectionGrpc.java:208)
	at io.grpc.stub.ServerCalls$StreamingServerCallHandler.startCall(ServerCalls.java:235)
	at io.vertx.grpc.server.impl.GrpcServiceBridgeImpl.lambda$bind$2(GrpcServiceBridgeImpl.java:62)
	at io.vertx.grpc.server.impl.GrpcServerImpl$MethodCallHandler.handle(GrpcServerImpl.java:95)
	at io.vertx.grpc.server.impl.GrpcServerImpl.handle(GrpcServerImpl.java:62)

val grpcServer: GrpcServer = GrpcServer.server(vertx)
val mainBridge = GrpcServiceBridge.bridge(MainBridge())
val reflectionBridge = GrpcServiceBridge.bridge(ProtoReflectionService.newInstance())
mainBridge.bind(grpcServer)
reflectionBridge.bind(grpcServer)

httpServer = vertx
  .createHttpServer(httpOptions)
  .requestHandler(grpcServer)
  //.requestHandler(router)
  .listen(appConfig.webPort).await()

Using SSL with the client results in "handle me"

See the VertxClientCall class.

To reproduce

    public void testNewSmoke() throws Exception {
        Vertx vertx = vertx();
        try {
            HttpClientOptions options = new HttpClientOptions();
            Buffer buffer;
            try (InputStream stream = getClass().getClassLoader().getResourceAsStream("tls/ca.pem")) {
                buffer = Buffer.buffer(stream.readAllBytes());
            }
            options.setTrustOptions(new PemTrustOptions().addCertValue(buffer));
            GrpcClient client = GrpcClient.client(vertx, options);
            try {
                GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(port(), "localhost"));
                GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
                HelloReply reply = stub.sayHello(HelloRequest.newBuilder().setName("neo-blocking").build());

Client builder API

We need a client builder API to let users configure load balancing / discovery in a friendly fashion.
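
A hypothetical shape for such a builder (purely illustrative; none of these methods exist today):

GrpcClient client = GrpcClient.builder(vertx)                      // hypothetical entry point
    .with(new HttpClientOptions().setHttp2ClearTextUpgrade(false))
    .withLoadBalancingPolicy("round_robin")                        // hypothetical
    .withAddressResolver(serviceDiscoveryResolver)                 // hypothetical discovery hook
    .build();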

RST_STREAM frame after some client request

Using this client code:

client
  .request(server, GreeterGrpc.getSayHelloMethod()).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

I have observed that RST_STREAM (8) is being sent after each request. However, my understanding of the RST_STREAM frame is that it should only be used when there is an error that prevents the sender or receiver from completing a particular stream.

To confirm my understanding, I also used the older VertxGrpcStub and found no instances of RST_STREAM being used.

I attempted to change request.end() to request.write() but this did not resolve the issue. Is there a way to send a request without including the RST_STREAM frame?

To gain further insights, I also performed a debug session using this unit test to trace the code that produces the RST_STREAM frame. I think it probably comes from here.

Protoc: generated stub cannot be used with interceptors

gRPC interceptors are useful for platform code like logging, monitoring or tracing.

The futurized stubs provided by the Vert.x protoc plugin cannot be integrated with gRPC Java interceptor code because they don't implement BindableService.

Users should be able to write interceptors for their futurized gRPC services.
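
For comparison, a service that implements BindableService can be wrapped with plain grpc-java interceptors and then bound through the bridge; a minimal sketch (the logging interceptor is illustrative, and service is a GreeterGrpc.GreeterImplBase):

ServerInterceptor logging = new ServerInterceptor() {
  @Override
  public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
      ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
    // log every incoming call before delegating to the actual handler
    System.out.println("Calling " + call.getMethodDescriptor().getFullMethodName());
    return next.startCall(call, headers);
  }
};

ServerServiceDefinition intercepted = ServerInterceptors.intercept(service, logging);
GrpcServiceBridge.bridge(intercepted).bind(grpcServer);

The futurized stubs offer no equivalent hook because they don't produce a ServerServiceDefinition.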

The GrpcClient constructor should add http2ClearTextUpgrade(false) to the passed HttpClientOptions

Version

4.3.7

Context

The GrpcClient constructors allow you to pass in custom HttpClientOptions.

Because httpOptions.setHttp2ClearTextUpgrade(false) must be set, the constructor should do it for you, like the default constructor does.

Steps to reproduce

  1. Create a client with GrpcClient.client(vertx, new HttpClientOptions()).
  2. Observe the server replies with an unexpected HTTP/1.1 protocol error.
  3. Observe GrpcClient.client(vertx, new HttpClientOptions().setHttp2ClearTextUpgrade(false)) resolves the issue.

This is a known issue. It is never valid to leave this set to true (the default) on a GrpcClient.
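
In the meantime, the workaround from step 3 spelled out (setting the protocol version explicitly is an extra precaution, not strictly required):

HttpClientOptions options = new HttpClientOptions()
    .setHttp2ClearTextUpgrade(false)          // gRPC needs a direct HTTP/2 connection, not the h2c upgrade
    .setProtocolVersion(HttpVersion.HTTP_2);
GrpcClient client = GrpcClient.client(vertx, options);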

Missing request/response header/trailer keys

Headers/trailers are not always handled correctly. gRPC-specific metadata keys (beginning with grpc-) are sometimes skipped, and binary keys (ending with -bin) are not properly handled.

`GrpcServiceBridgeImpl` should set exception handling callback

Questions

When using GrpcServiceBridgeImpl, some of the behavior of vertx-grpc is not as expected when the network fails.

Expect Behavior

void init(ServerCall.Listener<Req> listener) {
  this.listener = listener;
  req.errorHandler(error -> {
    if (error == GrpcError.CANCELLED && !closed) {
      listener.onCancel();
    }
  });
  readAdapter.init(req, new BridgeMessageDecoder<>(methodDef.getMethodDescriptor().getRequestMarshaller(), decompressor));
  writeAdapter.init(req.response(), new BridgeMessageEncoder<>(methodDef.getMethodDescriptor().getResponseMarshaller(), compressor));
}

If the network between client and server is interrupted, listener.onCancel() should be invoked.

Actual Behavior

Nothing happens.

Version

4.5.7


The errorHandler is a method of GrpcReadStream; it is only invoked when the HTTP stream fails with a StreamResetException, but when the client is killed an IOException occurs instead:

public void init() {
  stream.handler(this);
  stream.endHandler(v -> queue.write(END_SENTINEL));
  stream.exceptionHandler(err -> {
    if (err instanceof StreamResetException) {
      handleReset(((StreamResetException) err).getCode());
    } else {
      handleException(err);
    }
  });
  queue.drainHandler(v -> stream.resume());
}

java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.SocketDispatcher.writev0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:66)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:227)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:158)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:574)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:430)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:359)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:935)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:197)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:941)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
	at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
	at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.checkFlush(VertxHttp2ConnectionHandler.java:247)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.writeData(VertxHttp2ConnectionHandler.java:242)
	at io.vertx.core.http.impl.VertxHttp2Stream.doWriteData(VertxHttp2Stream.java:244)
	at io.vertx.core.http.impl.VertxHttp2Stream.writeData(VertxHttp2Stream.java:216)
	at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:469)
	at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:347)
	at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:48)
	at io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:242)
	at io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:112)
	at io.vertx.grpc.server.impl.GrpcServerResponseImpl.write(GrpcServerResponseImpl.java:102)
	at io.vertx.grpc.server.impl.GrpcServerResponseImpl.write(GrpcServerResponseImpl.java:248)
	at io.vertx.core.streams.impl.PipeImpl.lambda$to$1(PipeImpl.java:81)
	at io.vertx.grpc.server.impl.GrpcServerRequestImpl.lambda$handler$0(GrpcServerRequestImpl.java:79)
	at io.vertx.grpc.common.impl.GrpcReadStreamBase.handleMessage(GrpcReadStreamBase.java:207)
	at io.vertx.grpc.common.impl.GrpcReadStreamBase.lambda$init$3(GrpcReadStreamBase.java:89)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
	at io.vertx.grpc.common.impl.GrpcReadStreamBase.handle(GrpcReadStreamBase.java:125)
	at io.vertx.grpc.common.impl.GrpcReadStreamBase.handle(GrpcReadStreamBase.java:39)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
	at io.vertx.core.http.impl.HttpEventHandler.handleChunk(HttpEventHandler.java:51)
	at io.vertx.core.http.impl.Http2ServerRequest.handleData(Http2ServerRequest.java:148)
	at io.vertx.core.http.impl.Http2ServerStream.handleData(Http2ServerStream.java:206)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:75)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
	at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:242)
	at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:295)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
	at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
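
A sketch of what this suggests: broaden the exception handler so non-reset failures are also reported as a cancellation (handleError is a hypothetical hook; the actual fix may differ):

stream.exceptionHandler(err -> {
  if (err instanceof StreamResetException) {
    handleReset(((StreamResetException) err).getCode());
  } else {
    // e.g. java.io.IOException: Broken pipe when the peer dies abruptly
    handleError(GrpcError.CANCELLED); // hypothetical: would fire the errorHandler, hence listener.onCancel()
    handleException(err);
  }
});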

How to get the remoteAddress from io.vertx.grpc.server.GrpcServerRequest

How do I get the remote address from io.vertx.grpc.server.GrpcServerRequest? I can't find any method that exposes it.

GrpcServer grpcServer = GrpcServer.server(vertx);
grpcServer.callHandler(StreamingGrpc.getPipeMethod(), request -> {
  // how to get remoteAddress from request
});

With a WebSocket I can use this method:
io.vertx.core.http.WebSocketBase#remoteAddress

withCredentials / authorization header support with stubs

Version

4.3.7

Context

Add support for withCallCredentials on gRPC stubs.

Do you have a reproducer?

SomeStub.newStub(new GrpcClientChannel(...)).withCallCredentials does not propagate call credentials.

Is this a bug or a feature?

It's both.
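
For context, this is the standard grpc-java usage that users expect to work with the Vert.x channel (the token handling is illustrative):

CallCredentials creds = new CallCredentials() {
  @Override
  public void applyRequestMetadata(RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) {
    // attach an authorization header to every call made with these credentials
    Metadata headers = new Metadata();
    headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), "Bearer <token>");
    applier.apply(headers);
  }

  @Override
  public void thisUsesUnstableApi() {
  }
};

GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
    .newBlockingStub(new GrpcClientChannel(client, server))
    .withCallCredentials(creds); // currently the credentials never reach the request headers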

Investigate suspicious ByteBuf allocation

Caught this in CI:

SEVERE: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:96)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
	io.netty.handler.codec.base64.Base64.encode(Base64.java:108)
	io.netty.handler.codec.base64.Base64.encode(Base64.java:100)
	io.netty.handler.codec.base64.Base64.encode(Base64.java:80)
	io.netty.handler.codec.base64.Base64.encode(Base64.java:74)
	io.vertx.grpc.server.impl.GrpcServerResponseImpl.encodeMessage(GrpcServerResponseImpl.java:274)
	io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:267)
	io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:123)
	io.vertx.grpc.common.impl.WriteStreamAdapter.write(WriteStreamAdapter.java:47)
	io.vertx.grpc.server.impl.GrpcServiceBridgeImpl$ServerCallImpl.sendMessage(GrpcServiceBridgeImpl.java:169)
	io.grpc.ForwardingServerCall.sendMessage(ForwardingServerCall.java:32)
	io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:380)
	io.vertx.grpc.server.web.TestServiceImpl.streamingCall(TestServiceImpl.java:58)
	io.vertx.grpcweb.TestServiceGrpc$MethodHandlers.invoke(TestServiceGrpc.java:335)
	io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
	io.vertx.grpc.server.impl.GrpcServiceBridgeImpl$ServerCallImpl$1.handleClose(GrpcServiceBridgeImpl.java:100)
	io.vertx.grpc.common.impl.ReadStreamAdapter.lambda$init$1(ReadStreamAdapter.java:33)
	io.vertx.grpc.common.impl.GrpcReadStreamBase.handleEnd(GrpcReadStreamBase.java:195)
	io.vertx.grpc.common.impl.GrpcReadStreamBase.lambda$init$3(GrpcReadStreamBase.java:83)
	io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
	io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
	io.vertx.grpc.common.impl.GrpcReadStreamBase.lambda$init$0(GrpcReadStreamBase.java:72)
	io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:237)
	io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:219)
	io.vertx.core.http.impl.HttpEventHandler.handleEnd(HttpEventHandler.java:76)
	io.vertx.core.http.impl.Http1xServerRequest.onEnd(Http1xServerRequest.java:541)
	io.vertx.core.http.impl.Http1xServerRequest$1.handleMessage(Http1xServerRequest.java:104)
	io.vertx.core.net.impl.InboundMessageQueue.test(InboundMessageQueue.java:73)
	io.vertx.core.streams.impl.InboundReadQueue.drain(InboundReadQueue.java:250)
	io.vertx.core.streams.impl.InboundReadQueue.drain(InboundReadQueue.java:224)
	io.vertx.core.net.impl.InboundMessageQueue.drainInternal(InboundMessageQueue.java:164)
	io.vertx.core.net.impl.InboundMessageQueue.drain(InboundMessageQueue.java:144)
	io.vertx.core.http.impl.Http1xServerRequest.handleEnd(Http1xServerRequest.java:153)
	io.vertx.core.http.impl.Http1xServerConnection.onEnd(Http1xServerConnection.java:197)
	io.vertx.core.http.impl.Http1xServerConnection.onContent(Http1xServerConnection.java:188)
	io.vertx.core.http.impl.Http1xServerConnection.handleOther(Http1xServerConnection.java:173)
	io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:166)
	io.vertx.core.net.impl.VertxConnection.read(VertxConnection.java:239)
	io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:87)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.vertx.core.http.impl.Http1xOrH2CHandler.end(Http1xOrH2CHandler.java:61)
	io.vertx.core.http.impl.Http1xOrH2CHandler.channelRead(Http1xOrH2CHandler.java:38)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:829)

Bug when reading metadata

The recent changes regarding metadata/trailers seem to be causing issues:

2023-02-13T13:15:31.1187617Z 2023-02-13 13:15:31,036 ERROR [io.qua.ver.cor.run.VertxCoreRecorder] (vert.x-eventloop-thread-1) Uncaught exception received by Vert.x: java.lang.ArrayIndexOutOfBoundsException: Index 10 out of bounds for length 10
2023-02-13T13:15:31.1188475Z 	at io.vertx.grpc.common.impl.Utils.readMetadata(Utils.java:34)
2023-02-13T13:15:31.1189648Z 	at io.vertx.grpc.client.VertxClientCall.lambda$null$2(VertxClientCall.java:90)
2023-02-13T13:15:31.1190216Z 	at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
2023-02-13T13:15:31.1190754Z 	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
2023-02-13T13:15:31.1191353Z 	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
2023-02-13T13:15:31.1192254Z 	at io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:40)
2023-02-13T13:15:31.1192716Z 	at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
2023-02-13T13:15:31.1193146Z 	at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
2023-02-13T13:15:31.1193585Z 	at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
2023-02-13T13:15:31.1193972Z 	at io.vertx.core.Promise.complete(Promise.java:66)
2023-02-13T13:15:31.1194411Z 	at io.vertx.core.http.impl.HttpClientRequestImpl.handleResponse(HttpClientRequestImpl.java:346)
2023-02-13T13:15:31.1194955Z 	at io.vertx.core.http.impl.HttpClientRequestBase.handleResponse(HttpClientRequestBase.java:182)
2023-02-13T13:15:31.1195481Z 	at io.vertx.core.http.impl.HttpClientRequestBase.lambda$new$0(HttpClientRequestBase.java:71)
2023-02-13T13:15:31.1195926Z 	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
2023-02-13T13:15:31.1196306Z 	at io.vertx.core.impl.ContextBase.emit(ContextBase.java:239)
2023-02-13T13:15:31.1196736Z 	at io.vertx.core.http.impl.Http2ClientConnection$Stream.onHeaders(Http2ClientConnection.java:331)
2023-02-13T13:15:31.1197244Z 	at io.vertx.core.http.impl.Http2ClientConnection.onHeadersRead(Http2ClientConnection.java:189)
2023-02-13T13:15:31.1197749Z 	at io.vertx.core.http.impl.Http2ConnectionBase.onHeadersRead(Http2ConnectionBase.java:206)
2023-02-13T13:15:31.1198331Z 	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onHeadersRead(Http2FrameListenerDecorator.java:48)
2023-02-13T13:15:31.1198958Z 	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onHeadersRead(Http2EmptyDataFrameListener.java:63)
2023-02-13T13:15:31.1199599Z 	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:409)
2023-02-13T13:15:31.1200202Z 	at io.netty.handler.codec.http2.DefaultHttp2FrameReader$1.processFragment(DefaultHttp2FrameReader.java:450)
2023-02-13T13:15:31.1200773Z 	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:457)
2023-02-13T13:15:31.1201603Z 	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:253)
2023-02-13T13:15:31.1202200Z 	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
2023-02-13T13:15:31.1202797Z 	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
2023-02-13T13:15:31.1203487Z 	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
2023-02-13T13:15:31.1204102Z 	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:393)
2023-02-13T13:15:31.1204641Z 	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:453)
2023-02-13T13:15:31.1205217Z 	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
2023-02-13T13:15:31.1205789Z 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
2023-02-13T13:15:31.1206292Z 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
2023-02-13T13:15:31.1206840Z 	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.channelRead(VertxHttp2ConnectionHandler.java:408)

IndexOutOfBoundsException in GrpcMethodCall

The method serviceName() in GrpcMethodCall can throw an IndexOutOfBoundsException if the service path doesn't contain a dot, i.e. when the service is declared without a package.

I encountered this bug in 4.4.2.

The path I tried it with is "/Auth/GetChallenge".
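
An illustration of the failure mode, assuming serviceName() strips the package by looking for the last dot (a reconstruction, not the actual GrpcMethodCall source):

String path = "/Auth/GetChallenge";            // service declared without a package
String fullMethodName = path.substring(1);     // "Auth/GetChallenge"
String service = fullMethodName.substring(0, fullMethodName.lastIndexOf('/')); // "Auth"
// For a packaged service such as "helloworld.Greeter", lastIndexOf('.') is >= 0; here it is -1:
String pkg = service.substring(0, service.lastIndexOf('.')); // StringIndexOutOfBoundsException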

Context storage based on Vert.x context

gRPC provides a hook io.grpc.override.ContextStorageOverride to let implementations provide their own storage.

This should be in a standalone jar, to let users decide whether or not to use this storage (as io.grpc.override.ContextStorageOverride is a hardcoded class name loaded by gRPC at the moment).
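
A minimal sketch of such a storage, assuming the gRPC Context is kept in the Vert.x context's local data with a ThreadLocal fallback off-context (the key name and fallback strategy are assumptions):

package io.grpc.override;

import io.grpc.Context;
import io.vertx.core.Vertx;

// gRPC reflectively loads a class with exactly this name, if present.
public class ContextStorageOverride extends Context.Storage {

  private static final String LOCAL_KEY = "io.grpc.Context"; // assumed key name
  private static final ThreadLocal<Context> FALLBACK = new ThreadLocal<>();

  @Override
  public Context doAttach(Context toAttach) {
    Context current = current();
    io.vertx.core.Context ctx = Vertx.currentContext();
    if (ctx != null) {
      ctx.putLocal(LOCAL_KEY, toAttach); // store on the Vert.x context
    } else {
      FALLBACK.set(toAttach);            // not on an event loop: plain ThreadLocal
    }
    return current;
  }

  @Override
  public void detach(Context toDetach, Context toRestore) {
    io.vertx.core.Context ctx = Vertx.currentContext();
    if (ctx != null) {
      ctx.putLocal(LOCAL_KEY, toRestore);
    } else {
      FALLBACK.set(toRestore);
    }
  }

  @Override
  public Context current() {
    io.vertx.core.Context ctx = Vertx.currentContext();
    Context current = ctx != null ? ctx.getLocal(LOCAL_KEY) : FALLBACK.get();
    return current == null ? Context.ROOT : current;
  }
}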

Deadline setting in vertx grpc stub client is ignored

Version

4.4.1

Context

When using the Vert.x gRPC client, it seems that the callOptions holding the deadline configuration are not propagated to VertxClientCall.

Steps to reproduce

Create a vertx stub:

GrpcClientChannel channel = new GrpcClientChannel(
    GrpcClient.client(vertx),
    SocketAddress.inetSocketAddress(port, host));
SomeVertxStub stub = VertxSomeServiceGrpc.newVertxStub(channel);

Use this stub to make calls with a deadline:

  stub
      .withDeadlineAfter(1, TimeUnit.MILLISECONDS)

The deadline, which is part of the callOptions, is not sent to VertxClientCall and not taken into account when making gRPC calls, so there is no way to set a request timeout for gRPC client calls.
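
For reference, this is the behavior being requested: once the deadline passes, the call should fail with DEADLINE_EXCEEDED, as it does with a plain grpc-java channel:

try {
  GreeterGrpc.newBlockingStub(channel)
      .withDeadlineAfter(1, TimeUnit.MILLISECONDS)
      .sayHello(HelloRequest.newBuilder().setName("Bob").build());
} catch (StatusRuntimeException e) {
  // expected once the deadline passes
  assert e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED;
}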

GrpcServer can't be mounted in a Vert.x Web router

The documentation and the 4.3 release blog post indicate that a GrpcServer can be mounted in a Vert.x Web router.

In fact, it is not possible, because the gRPC server is a Handler<HttpServerRequest>, not a Handler<RoutingContext>. So this example from the blog post is wrong:

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
    responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
    responseObserver.onCompleted();
  }
};

GrpcServer grpcServer = GrpcServer.server(vertx);
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(service);
serverStub.bind(grpcServer);

router.consumes("application/grpc").handler(grpcServer); // does not compile
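
A possible workaround, since GrpcServer is a Handler<HttpServerRequest>, is a thin adapter handler (caveat: this assumes the router does not consume the request body first, and the server must still speak HTTP/2):

router.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request()));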

Using the Vert.x gRPC server with a non-Vert.x gRPC client: a loop of 10 calls blocks on the first

Questions

My gRPC server is built with the Vert.x gRPC server, but the client does not use the Vert.x gRPC client. When I make 10 calls in a loop, the first call blocks.

Version

4.3.7

Context

  • Server
public class VertxStart extends AbstractVerticle {
    @Override
    public void start() {
        final var server = GrpcServer.server(vertx);
        final var greeter = new GreeterGrpc.GreeterImplBase() {
            @Override
            public void sayHello(final HelloRequest request, final StreamObserver<HelloReply> responseObserver) {
                System.out.println("Hello " + request.getName());
                final var reply = HelloReply.newBuilder().setMessage(request.getName()).build();
                responseObserver.onNext(reply);
                responseObserver.onCompleted();
            }

        };
        GrpcServiceBridge
                .bridge(greeter)
                .bind(server);
        vertx.createHttpServer().requestHandler(server).listen(4240);
    }
}
  • Client
public class GreeterClient {

    public static void main(String[] args) {
        new GreeterClient().start();
    }

    public void start() {
        final var channel = NettyChannelBuilder.forAddress("127.0.0.1", 4240).usePlaintext()
                .build();
        final var stub = GreeterGrpc.newBlockingStub(channel);
        for (int i = 0; i < 10; i++) {
            System.out.println("start idx\t" + i);
            final var data = HelloRequest.newBuilder().setName("qwx" + i).build();
            final var resp = stub.sayHello(data);
            System.out.println("resp\t" + resp.getMessage());
            System.out.println("end idx\t" + i);
        }
        channel.shutdown();
    }
}

Extra

  • JDK 11

When the server is unavailable, the client does not have any exception logs.

Version

4.3.1 and 4.3.3-SNAPSHOT

Context

My server is interrupted while running, and the client does not produce any exceptions or logs, which makes it hard to locate the problem in time.
I looked at the source code and found that in the vertx-grpc-client module, io.vertx.grpc.client.VertxClientCall#start() does not handle the client.request error.
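
A sketch of the missing handling (assumed shape: a connection failure should close the call so the application sees an error; the listener wiring is illustrative):

client.request(server, methodDescriptor)
    .onFailure(err -> listener.onClose(Status.UNAVAILABLE.withCause(err), new Metadata()));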

The gRPC server bidirectional streams do not receive exceptions when the client loses connectivity

Version

4.3.7

Context

Using request.exceptionHandler(...), bidirectional streams can respond to ungraceful stream closure, caused by an app being killed, a browser crashing, a phone running out of battery, a network disconnection, or another failure.

This package does not surface exceptions correctly the way the Netty server does. It lacks the keep-alive configuration that the Netty server has. This appears to be the root issue.

Steps to reproduce

  1. Generate a bidi streaming method stub using the original Netty-based Vert.x package's jprotoc-based code generator. You will have a method with the signature:

     void bidiCall(ReadStream<X> request, WriteStream<Y> response);

  2. Bind it to the Netty-based VertxServer. Configure it with timeouts:

     var builder = VertxServerBuilder.forPort(vertx, port);
     var nettyServerBuilder = builder.nettyBuilder();
     nettyServerBuilder
         .maxConnectionIdle(Long.MAX_VALUE, TimeUnit.NANOSECONDS)
         .maxConnectionAge(29, TimeUnit.DAYS)
         .keepAliveTime(400, TimeUnit.MILLISECONDS)
         .keepAliveTimeout(8000, TimeUnit.MILLISECONDS)
         .permitKeepAliveTime(100, TimeUnit.MILLISECONDS)
         .permitKeepAliveWithoutCalls(true);
     builder.build();

  3. Proxy the server with ToxiProxy.
  4. Connect to the bidi RPC endpoint.
  5. Observe you are connected.
  6. Disconnect the network using ToxiProxy.
  7. After a short while, observe that a StatusRuntimeException with status Status.CANCELLED is delivered to request.exceptionHandler.
  8. Bind the same service to a GrpcServer.
  9. Proxy it with ToxiProxy.
  10. Connect to the bidi RPC endpoint.
  11. Observe you are connected.
  12. Disconnect the network using ToxiProxy.
  13. Observe that no exception is received and the endHandler is not called within a short while.

Extra

This is reproduced in the tests of my own open-source program; I can share that one. Otherwise I will need help authoring tests in this package, since Maven in my IntelliJ does not appear to generate or reference the protos correctly, making it impossible to code against them. Additionally, I suggest adding Testcontainers as a dependency for using ToxiProxy.

io.vertx.grpc.client.GrpcClientResponse.errorHandler is not called when grpc response code is not 0

Questions

Thanks to your awesome work, I can now build a gRPC reverse proxy using this library.

But proxyResponse.errorHandler is not called when the inner gRPC service returns a non-zero code such as 2. Is this a bug or expected behavior? I can use the code:

              // if (proxyResponse.status() != null) {
              //   clientReq.response().status(proxyResponse.status());
              // }

to work around it, but it doesn't seem ideal.

  private void startGrpcGatewayService() {
    GrpcServer server = GrpcServer.server(vertx);
    GrpcClient client = GrpcClient.client(vertx);
    server.callHandler(clientReq -> {
      clientReq.pause();
      System.out.println(clientReq.fullMethodName());
      SocketAddress address = SocketAddress.inetSocketAddress(8081, "127.0.0.1");
      client.request(address)
        .onSuccess(proxyRequest -> {
          System.out.println("on grpc proxy success");
          proxyRequest.response()
            .onSuccess(proxyResponse -> {
              System.out.println("proxy response success: " + proxyResponse.status());
              // if (proxyResponse.status() != null) {
              //   clientReq.response().status(proxyResponse.status());
              // }
              proxyResponse.messageHandler(clientReq.response()::writeMessage);
              proxyResponse.errorHandler(error -> {
                // TODO why we cannot get proxy call fail code here?
                System.out.println("proxy call error happened: " + proxyResponse.status());
                clientReq.response().status(error.status);
              });
              proxyResponse.endHandler(voids -> clientReq.response().end());
            })
            .onFailure(exception -> {
              System.out.println("proxy response fail: " + exception.getMessage());
              exception.printStackTrace();
              clientReq.response().status(GrpcStatus.UNKNOWN).end();
            });

          proxyRequest.fullMethodName(clientReq.fullMethodName());
          clientReq.messageHandler(proxyRequest::writeMessage);
          clientReq.endHandler(v -> proxyRequest.end());
          clientReq.resume();
        })
        .onFailure(exception -> {
          System.out.println("on grpc proxy fail");
          clientReq.response().status(GrpcStatus.UNKNOWN).end();
          clientReq.resume();
        });
    });
  }

Version

  • vertxVersion = "4.3.1"

Do you have a reproducer?

https://github.com/LangInteger/vertx-grpc-reversed-proxy

Steps to reproduce

git clone git@github.com:LangInteger/vertx-grpc-reversed-proxy.git
cd vertx-grpc-reversed-proxy
./gradlew clean build

Take a look at the helloWorldBrokenTest.

Grpc transcoding

Describe the feature

This feature request proposes gRPC transcoding capabilities for the server. This would allow the server to accept JSON-formatted gRPC requests and transcode them according to the specification defined in google.api.http.

Use cases

While gRPC-Web is already implemented, it uses its own wire format that differs from the transcoding of REST requests into gRPC requests. Implementing this feature would be particularly useful when publicly exposing a gRPC service, where supporting web-based requests or providing users with a REST API option is desired.
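
For reference, the mapping is declared per method in the proto file with a google.api.http option, for example:

import "google/api/annotations.proto";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {
    // POST /v1/greeter/hello with a JSON body transcodes to a SayHello call
    option (google.api.http) = {
      post: "/v1/greeter/hello"
      body: "*"
    };
  }
}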

Contribution

I am willing to contribute to the implementation of this feature. I have already implemented this in the Quarkus repository. After discussion in this PR, the maintainers concluded that it should be implemented directly in the gRPC server implementation.

Protoc: let users decide if the plugin should generate clients, server or both

The protoc plugin is not configurable. Consequently, it generates code for both clients and servers, forcing users to add dependencies on both vertx-grpc-client and vertx-grpc-server.

The plugin should be configurable; in particular, it should be easy to switch off code generation for clients or servers from Maven.

Support custom Compression/Decompression

Describe the feature

We are using the older (Netty-based) version of the gRPC server and implemented custom Snappy compression/decompression. We extended the compressor/decompressor registries and registered them with the gRPC server builder via

  • compressorRegistry
  • decompressorRegistry

I couldn't find any option to register custom (de)compressors in the new Vert.x gRPC server.
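
For comparison, this is how the Netty-based grpc-java server accepts custom codecs (SnappyCodec stands in for a user-supplied io.grpc.Codec implementation):

CompressorRegistry compressors = CompressorRegistry.getDefaultInstance();
compressors.register(new SnappyCodec()); // hypothetical custom codec
DecompressorRegistry decompressors = DecompressorRegistry.getDefaultInstance()
    .with(new SnappyCodec(), true);      // advertise it in grpc-accept-encoding

NettyServerBuilder.forPort(8080)
    .compressorRegistry(compressors)
    .decompressorRegistry(decompressors)
    .build();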

Use cases

This feature is useful when you need to compress the payload and don't want to use gzip because it's very heavy on CPU. In our case we decided to use Snappy because it comes with a nice balance between CPU usage and compression ratio.

Contribution

I'm happy to implement this feature. It might be enough to allow a custom compressor/decompressor registry to be added to the GrpcServiceBridge and use those instead of the default registries to find the needed compressor, but I'm not sure.

Thanks,
Markus
