
Comments (6)

jamesgorman2 commented on August 11, 2024

from rxnetty.

flyaruu commented on August 11, 2024

I've tried this, and it does get better (in the sense that it writes more before freezing) but the behaviour is the same.

If I run this snippet:

	@Test
	public void testRxNettyIssue596() {
		Observable<ByteBuf> body = Bytes.from(new File("/Users/frank/git/reactive-servlet/rxjava-extras-0.8.0.8.jar"))
				.doOnNext(e -> System.err.println("Read some data"))
				.doOnCompleted(() -> System.err.println("All data complete"))
				.observeOn(Schedulers.io())
				.map(b -> Unpooled.copiedBuffer(b));

		HttpClient.newClient("localhost", 8080).createPost("/reactive-servlet/reactive")
				.writeContentAndFlushOnEach(body)
				.map(response -> response.getContent().asObservable())
				.concatMap(e -> e)
				.observeOn(Schedulers.io())
				.doOnNext(e -> System.err.println("Something written back"))
				.doOnCompleted(() -> System.err.println("All data received"))
				.toBlocking()
				.subscribe();
	}

It works out (the reactive-servlet just echoes the same data as it comes in) because this file is small enough (400k), but if I look at the output:

Read some data
.....
Read some data
Read some data
Read some data
All data complete
Something written back
Something written back
Something written back
Something written back
......
Something written back
All data received

I see that the data only starts streaming back after all data has been sent. For bigger files, my server won't accept all data if it can't write back.
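To illustrate why this stalls for larger payloads: if the server only starts echoing after the full upload, the transport buffer between client and server eventually fills, and the writer can make no further progress. This is a minimal, hypothetical sketch (plain Java, not RxNetty) that models the transport as a small bounded queue; the class and capacity are invented for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the "transport" is a tiny bounded buffer (like a TCP
// window). The reader waits for "all data sent" and never drains it, so the
// writer stalls as soon as the buffer is full.
public class BufferedEchoSketch {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Integer> transport = new ArrayBlockingQueue<>(2); // small "window"

        int sent = 0;
        for (int i = 0; i < 5; i++) {
            // offer with a timeout instead of put(), so the sketch terminates
            if (transport.offer(i, 100, TimeUnit.MILLISECONDS)) {
                sent++;
            } else {
                // nobody is reading until the upload completes, so the buffer never drains
                System.out.println("writer stalled after " + sent + " of 5 chunks");
                break;
            }
        }
    }
}
```

With a real socket the writer would block inside the kernel rather than time out, which is the freeze described above.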


jamesgorman2 commented on August 11, 2024

I see what you mean now. I certainly find this behaviour unexpected. I have a repro below. It will log each block written or read.

writeContent... does not appear to emit the response until it has been fully returned. Looking at the chunk example from 0.4.x, I wonder if something similar is happening on 0.5.x. I haven't been able to identify what is causing it yet. More investigation is required.

import io.netty.buffer.ByteBuf;
import io.netty.handler.logging.LogLevel;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.server.HttpServer;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;

public class TestLargePayloads {
  private static AtomicLong clientWrote = new AtomicLong(0);
  private static AtomicLong serverRead = new AtomicLong(0);
  private static AtomicLong serverWrote = new AtomicLong(0);
  private static AtomicLong clientRead = new AtomicLong(0);

//  private static AtomicLong iteration = new AtomicLong(0);

  public static void main(String[] args) throws Exception {

    HttpServer<ByteBuf, ByteBuf> server = HttpServer.newServer()
      .enableWireLogging("FOO", LogLevel.WARN)
      .start(
        (request, response) ->
          response.writeStringAndFlushOnEach( // force flush server output
            request.getContent().autoRelease() // release bytebufs
              .subscribeOn(Schedulers.computation())
              .map((byteBuf) -> byteBuf.toString(Charset.defaultCharset()))
              .doOnNext(s -> update(serverRead, s.length()))
              .compose(TestLargePayloads::incrementChars)
              .doOnNext(s -> update(serverWrote, s.length()))
              .doOnNext(s -> sleep()) // force thread switching
          )
      );

    TestSubscriber<String> subscriber = TestSubscriber.create();

    printStatus();

    HttpClient.newClient("localhost", server.getServerPort())
      .enableWireLogging("TMP", LogLevel.INFO)
      .createPost("/")
      .writeStringContent(
        Observable.range(0, 10)
          .map(i -> String.format("%010d", i))
          .doOnNext(s -> update(clientWrote, s.length()))
          .doOnNext(s -> sleep()), // force thread switching
        s -> true // force flush client output
      )
      .flatMap(
        response -> {
          System.out.println("Got response");
          return response.getContent().autoRelease()
            .map((byteBuf) -> byteBuf.toString(Charset.defaultCharset()));
        }
      )
      .doOnNext(s -> update(clientRead, s.length()))
      .subscribe(subscriber);

    subscriber.awaitTerminalEvent();

//    printStatus();

    System.err.println();

    subscriber.assertNoErrors();

    server.shutdown();
  }

  private static void sleep() {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }


  static Observable<String> incrementChars(Observable<String> ss) {
    // busy work
    return ss.map(
      s ->
        s.chars()
          .map(i -> i + 1)
          .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
          .toString()
    );
  }

  static void update(AtomicLong v, int by) {
    v.addAndGet(by);

//    if (iteration.getAndIncrement() % 1000 == 0) {
      printStatus();
//    }
  }

  private static void printStatus() {
    System.out.println(
      String.format(
        "Client wrote %s, Server read %s, Server wrote %s, Client read %s [%s]",
        clientWrote,
        serverRead,
        serverWrote,
        clientRead,
        Thread.currentThread().getName()
      )
    );
  }
}


flyaruu commented on August 11, 2024

I've honestly never seen an HTTP server do this in the wild, and I really don't know whether this behaviour is normal (or even allowed by the HTTP standard; I couldn't find any references to it), but for me it would be the most efficient.

While researching this, I do get the impression that this is at least a less-trodden path (on the server side I ran into issues in Jetty and Undertow). On the client side I discovered that cURL handles this well and starts streaming immediately, but wget behaves like RxNetty: it doesn't stream anything until the upload is done.


frankbolander commented on August 11, 2024

@flyaruu If you change your observeOn(Schedulers.io()) to subscribeOn(Schedulers.io()), I think you will get the behavior you want.

You may want to do subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()) to ensure io/computation being done on separate threads.
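For anyone unclear on why this distinction might matter: subscribeOn moves the work of the subscription itself (and hence the source's emissions) onto another thread, while observeOn only re-dispatches the downstream callbacks. Below is a hand-rolled, hypothetical sketch in plain Java (not RxJava's actual implementation; the `Source` interface and both operator analogues are invented for illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical mini-Rx sketch of subscribeOn vs observeOn.
public class SchedulerSketch {
    interface Source { void subscribe(Consumer<String> onNext); }

    // subscribeOn analogue: the whole subscription (and thus the emission
    // work) runs on the executor's thread.
    static Source subscribeOn(Source src, ExecutorService exec) {
        return onNext -> exec.execute(() -> src.subscribe(onNext));
    }

    // observeOn analogue: the source still emits on the caller's thread;
    // only the downstream onNext is re-dispatched to the executor.
    static Source observeOn(Source src, ExecutorService exec) {
        return onNext -> src.subscribe(item -> exec.execute(() -> onNext.accept(item)));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService exec =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // The source records which thread it emits on.
        Source src = onNext -> onNext.accept("emitted on " + Thread.currentThread().getName());

        CountDownLatch done = new CountDownLatch(2);
        subscribeOn(src, exec).subscribe(s -> {
            System.out.println("subscribeOn: " + s); // emission happened on io-thread
            done.countDown();
        });
        observeOn(src, exec).subscribe(s -> {
            System.out.println("observeOn: " + s); // emission happened on main
            done.countDown();
        });
        done.await();
        exec.shutdown();
    }
}
```

Under subscribeOn the source reads the file on the io thread, which is what lets it proceed independently of the response side; observeOn alone leaves the emission on the original thread.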


flyaruu commented on August 11, 2024

I've tried doing that, but I don't see the behaviour changing. If I add the observeOn / subscribeOn to James' example, it still uploads the entire POST body first.

