Comments (4)
I extended the pk11 router for 0.5 (https://github.com/Trunkplatform/rxnetty-router), and we have used it extensively. I'll work up a POST example for you shortly.
from rxnetty.
Thanks a lot! Appreciated.
Here's the example (with some Gradle stuff at the bottom). It's also worth checking through the issues for more info on releasing ByteBufs. Memory leaks are one of the biggest problems people have when receiving data.
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.util.CollectBytes;
import org.pk11.rxnetty.router.Router;
import rx.Observable;

import java.nio.charset.Charset;

import static org.pk11.rxnetty.router.Dispatch.using;

public class PostExample {
    public static void main(String[] args) {
        HttpServer<ByteBuf, ByteBuf> server = HttpServer.newServer()
            .start(
                using(
                    new Router<ByteBuf, ByteBuf>()
                        .POST(
                            "/echo",
                            new EchoHandler()
                        )
                        .notFound(new Handler404())
                )
            );

        HttpClient.newClient("localhost", server.getServerPort())
            .createPost("/echo")
            .writeStringContent(
                Observable.range(1, 10).map(i -> "This is test " + i + "\n"),
                s -> true // flush on each selector, see below
            )
            .flatMap(
                response ->
                    response.getContent().compose(CollectBytes.all())
                        .map(
                            b -> {
                                String s = b.toString(Charset.defaultCharset());
                                b.release();
                                return s;
                            }
                        )
            )
            .subscribe(
                System.out::println,
                System.err::println
            );

        server.awaitShutdown();
    }

    private static class EchoHandler implements io.reactivex.netty.protocol.http.server.RequestHandler<ByteBuf, ByteBuf> {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
            // Three notes here:
            // 1) we have to release the content manually. Where you only need to know
            //    about the completion of processing, you can also use autoRelease()
            // 2) flushing is also manual - we are using a naive flush algorithm to
            //    prevent the response blocking once the write buffer is full.
            // 3) it is not guaranteed the ByteBuf block boundaries are the same as those sent
            return response.writeStringAndFlushOnEach(
                request.getContent()
                    .map(
                        b -> {
                            String s = b.toString(Charset.defaultCharset());
                            b.release();
                            return s;
                        }
                    )
            );
        }
    }

    private static class Handler404 implements io.reactivex.netty.protocol.http.server.RequestHandler<ByteBuf, ByteBuf> {
        @Override
        public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
            System.out.println("Not found: " + request.getUri());
            return response.setStatus(HttpResponseStatus.NOT_FOUND)
                .writeString(Observable.just("Not found"));
        }
    }
}
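On note 3 in the handler (block boundaries are not preserved): if you need whole lines rather than whatever chunks happen to arrive, you have to reassemble them yourself. Below is a minimal sketch in plain Java with no RxNetty types. The LineReassembler class and its feed/remainder methods are hypothetical names made up for illustration, not part of RxNetty or the router. It also assumes each chunk has already been decoded to a String and that the content is single-byte text such as the ASCII used above; multi-byte characters split across chunks would need byte-level buffering instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: buffers decoded chunks and emits only complete lines.
public class LineReassembler {
    // Text received so far that has not yet been terminated by '\n'.
    private final StringBuilder pending = new StringBuilder();

    /** Feeds one decoded chunk and returns every line that it completes. */
    public List<String> feed(String chunk) {
        List<String> lines = new ArrayList<>();
        pending.append(chunk);
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            lines.add(pending.substring(0, newline));
            pending.delete(0, newline + 1); // drop the line and its terminator
        }
        return lines;
    }

    /** Returns whatever is left unterminated, e.g. once the stream completes. */
    public String remainder() {
        return pending.toString();
    }

    public static void main(String[] args) {
        LineReassembler reassembler = new LineReassembler();
        // Two lines were "sent", but they arrive split at arbitrary points.
        String[] chunks = {"This is te", "st 1\nThis is ", "test 2\n"};
        for (String chunk : chunks) {
            for (String line : reassembler.feed(chunk)) {
                System.out.println(line);
            }
        }
    }
}
```

In a handler like the one above, something of this shape could sit after the map that decodes and releases each ByteBuf, so the buffers are still released as soon as they are decoded and only Strings are held between chunks.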
Gradle requires manually adding a repository to get the router code:
plugins {
    id 'java'
}

group 'io.reactivex.netty'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven {
        url 'https://dl.bintray.com/trunkplatform/trunk-java-oss'
    }
}

dependencies {
    implementation 'io.reactivex:rxnetty-http:0.5.3'
    // 'compile' and 'testCompile' are deprecated and were removed in Gradle 7
    implementation 'org.pk11.rxnetty:rxnetty-router-core:1.4.2'
    testImplementation group: 'junit', name: 'junit', version: '4.12'
}
Thanks, @jamesgorman2.