Comments (6)
from rxnetty.
I've tried this, and it does get better (in the sense that it writes more before freezing), but the behaviour is the same.
If I run this snippet:
@Test
public void testRxNettyIssue596() {
    Observable<ByteBuf> body = Bytes.from(new File("/Users/frank/git/reactive-servlet/rxjava-extras-0.8.0.8.jar"))
            .doOnNext(e -> System.err.println("Read some data"))
            .doOnCompleted(() -> System.err.println("All data complete"))
            .observeOn(Schedulers.io())
            .map(b -> Unpooled.copiedBuffer(b));

    HttpClient.newClient("localhost", 8080).createPost("/reactive-servlet/reactive")
            .writeContentAndFlushOnEach(body)
            .map(response -> response.getContent().asObservable())
            .concatMap(e -> e)
            .observeOn(Schedulers.io())
            .doOnNext(e -> System.err.println("Something written back"))
            .doOnCompleted(() -> System.err.println("All data received"))
            .toBlocking()
            .subscribe();
}
It works out (the reactive-servlet just echoes the same data as it comes in), because this file is small enough (400k), but if I look at the output:
Read some data
.....
Read some data
Read some data
Read some data
All data complete
Something written back
Something written back
Something written back
Something written back
......
Something written back
All data received
I see that the data only starts streaming back after all data has been sent. For bigger files, my server won't accept all data if it can't write back.
from rxnetty.
I see what you mean now. I certainly find this behaviour unexpected. I have a repro below; it will log each block written or read. writeContent... does not appear to emit the response until it has been fully returned. Looking at the chunk example from 0.4.x, I wonder if something similar is happening on 0.5.x. I haven't been able to identify what is causing it yet. More investigation is required.
import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;

public class TestLargePayloads {

    private static AtomicLong clientWrote = new AtomicLong(0);
    private static AtomicLong serverRead = new AtomicLong(0);
    private static AtomicLong serverWrote = new AtomicLong(0);
    private static AtomicLong clientRead = new AtomicLong(0);
    // private static AtomicLong iteration = new AtomicLong(0);

    public static void main(String[] args) throws Exception {
        HttpServer<ByteBuf, ByteBuf> server = HttpServer.newServer()
                .enableWireLogging("FOO", LogLevel.WARN)
                .start(
                        (request, response) ->
                                response.writeStringAndFlushOnEach( // force flush server output
                                        request.getContent().autoRelease() // release bytebufs
                                                .subscribeOn(Schedulers.computation())
                                                .map(byteBuf -> byteBuf.toString(Charset.defaultCharset()))
                                                .doOnNext(s -> update(serverRead, s.length()))
                                                .compose(TestLargePayloads::incrementChars)
                                                .doOnNext(s -> update(serverWrote, s.length()))
                                                .doOnNext(s -> sleep()) // force thread switching
                                )
                );

        TestSubscriber<String> subscriber = TestSubscriber.create();
        printStatus();

        HttpClient.newClient("localhost", server.getServerPort())
                .enableWireLogging("TMP", LogLevel.INFO)
                .createPost("/")
                .writeStringContent(
                        Observable.range(0, 10)
                                .map(i -> String.format("%010d", i))
                                .doOnNext(s -> update(clientWrote, s.length()))
                                .doOnNext(s -> sleep()), // force thread switching
                        s -> true // force flush client output
                )
                .flatMap(
                        response -> {
                            System.out.println("Got response");
                            return response.getContent().autoRelease()
                                    .map(byteBuf -> byteBuf.toString(Charset.defaultCharset()));
                        }
                )
                .doOnNext(s -> update(clientRead, s.length()))
                .subscribe(subscriber);

        subscriber.awaitTerminalEvent();
        // printStatus();
        System.err.println();
        subscriber.assertNoErrors();
        server.shutdown();
    }

    private static void sleep() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static Observable<String> incrementChars(Observable<String> ss) {
        // busy work
        return ss.map(
                s -> s.chars()
                        .map(i -> i + 1)
                        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
                        .toString()
        );
    }

    static void update(AtomicLong v, int by) {
        v.addAndGet(by);
        // if (iteration.getAndIncrement() % 1000 == 0) {
        printStatus();
        // }
    }

    private static void printStatus() {
        System.out.println(
                String.format(
                        "Client wrote %s, Server read %s, Server wrote %s, Client read %s [%s]",
                        clientWrote,
                        serverRead,
                        serverWrote,
                        clientRead,
                        Thread.currentThread().getName()
                )
        );
    }
}
from rxnetty.
I've honestly never seen an HTTP server do this in the wild, and I really don't know whether this behaviour is normal (or even allowed by the HTTP standard; I couldn't find any references to it), but for me it would be the most efficient.
While researching this, I do get the impression that this is at least a less-trodden path (on the server side I ran into issues with Jetty and Undertow). On the client side, I discovered that cURL handles this well and starts streaming immediately, but wget behaves like RxNetty: it doesn't stream anything back until the upload is done.
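For what it's worth, nothing at the transport level forces the upload to finish before the echo can flow back: a TCP connection is full-duplex, and a plain-socket echo round-trips each chunk immediately. A minimal plain-Java sketch (no RxNetty, no HTTP; all names illustrative) where the client reads the echo of each chunk back before it writes the next one:

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;

public class FullDuplexEcho {
    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(0);

        // Echo server: copies bytes back as they arrive, never buffering the whole request.
        Thread echo = new Thread(() -> {
            try (Socket s = server.accept();
                 InputStream in = s.getInputStream();
                 OutputStream out = s.getOutputStream()) {
                byte[] buf = new byte[64];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                    out.flush();
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        echo.start();

        try (Socket client = new Socket("localhost", server.getLocalPort())) {
            InputStream in = client.getInputStream();
            OutputStream out = client.getOutputStream();
            StringBuilder echoed = new StringBuilder();
            byte[] buf = new byte[64];
            int chunkLen = "chunk-0;".length(); // every chunk is 8 bytes
            for (int i = 0; i < 5; i++) {
                out.write(String.format("chunk-%d;", i).getBytes(StandardCharsets.UTF_8));
                out.flush();
                // Read this chunk's echo BEFORE writing the next chunk:
                // proof that the response can stream while the upload is still in flight.
                while (echoed.length() < (i + 1) * chunkLen) {
                    int n = in.read(buf);
                    echoed.append(new String(buf, 0, n, StandardCharsets.UTF_8));
                }
                System.out.println("echo after chunk " + i + ": " + echoed);
            }
        }
        echo.join();
        server.close();
    }
}
```

So the wget-like "upload everything first" behaviour is a choice (or limitation) in the client/server stack, not something the network imposes.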
from rxnetty.
@flyaruu If you change your observeOn(Schedulers.io()) to subscribeOn(Schedulers.io()), I think you will get the behavior you want.
You may want to do subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()) to ensure io/computation being done on separate threads.
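As a rough plain-Java analogy for that suggestion (hand-rolled executors, not RxJava internals; the thread names are made up for illustration): subscribeOn controls which thread the source is subscribed on and produces from, while observeOn hops each emitted item onto a different thread for downstream work, so production and consumption stop competing for the same thread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class SubscribeOnVsObserveOn {
    public static void main(String[] args) throws Exception {
        // "subscribeOn" analogue: the thread the source produces items on.
        ExecutorService source = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // "observeOn" analogue: the thread downstream consumers run on.
        ExecutorService downstream = Executors.newSingleThreadExecutor(r -> new Thread(r, "computation-thread"));

        CountDownLatch done = new CountDownLatch(3);
        Consumer<String> consumer = item -> {
            System.out.println("consumed " + item + " on " + Thread.currentThread().getName());
            done.countDown();
        };

        source.submit(() -> {
            for (int i = 0; i < 3; i++) {
                String item = "item-" + i;
                System.out.println("produced " + item + " on " + Thread.currentThread().getName());
                // Hop to the downstream thread, as observeOn would.
                downstream.submit(() -> consumer.accept(item));
            }
        });

        done.await();
        source.shutdown();
        downstream.shutdown();
    }
}
```

With only observeOn (and no subscribeOn), the source still produces on whatever thread subscribed it, which is the situation the advice above is trying to fix.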
from rxnetty.
I've tried doing that but I don't see the behaviour changing. If I add the observeOn / subscribeOn to James' example, it still uploads the entire post first.
from rxnetty.