rsocket-rust's Introduction

rsocket-rust

rsocket-rust is an implementation of the RSocket protocol in Rust (1.39+). It is an alpha version under active development. Do not use it in a production environment!

Example

The examples below show how to use RSocket in Rust.

Dependencies

Add the dependencies to your Cargo.toml.

[dependencies]
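# #[tokio::main] needs the "macros" and runtime features; "full" enables both.
tokio = { version = "1.0.3", features = ["full"] }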
rsocket_rust = "0.7"

# add transport dependencies:
# rsocket_rust_transport_tcp = "0.7"
# rsocket_rust_transport_websocket = "0.7"

Server

// `#[macro_use]` brings the `info!` macro into scope.
#[macro_use]
extern crate log;

use futures::executor::block_on;
use rsocket_rust::prelude::*;
use rsocket_rust::utils::EchoRSocket;
use rsocket_rust::Result;
use rsocket_rust_transport_tcp::*;

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::builder().format_timestamp_millis().init();

    RSocketFactory::receive()
        .transport(TcpServerTransport::from("127.0.0.1:7979"))
        .acceptor(Box::new(|setup, _sending_socket| {
            info!("incoming socket: setup={:?}", setup);
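            // Open an upstream connection to 127.0.0.1:7878 and use it as the
            // responder for this socket, so incoming requests are forwarded upstream.
            Ok(Box::new(block_on(async move {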
                RSocketFactory::connect()
                    .transport(TcpClientTransport::from("127.0.0.1:7878"))
                    .acceptor(Box::new(|| Box::new(EchoRSocket)))
                    .setup(Payload::from("I'm Rust!"))
                    .start()
                    .await
                    .unwrap()
            })))
        }))
        .serve()
        .await
}

Client

// `#[macro_use]` brings the `info!` and `error!` macros into scope.
#[macro_use]
extern crate log;

use rsocket_rust::prelude::*;
use rsocket_rust::utils::EchoRSocket;
use rsocket_rust::Result;
use rsocket_rust_transport_tcp::TcpClientTransport;

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::builder().format_timestamp_millis().init();

    let client = RSocketFactory::connect()
        .transport(TcpClientTransport::from("127.0.0.1:7878"))
        .acceptor(Box::new(|| {
            // Return a responder.
            Box::new(EchoRSocket)
        }))
        .start()
        .await
        .expect("Connect failed!");

    let req = Payload::builder().set_data_utf8("Ping!").build();

    match client.request_response(req).await {
        Ok(res) => info!("{:?}", res),
        Err(e) => error!("{}", e),
    }

    Ok(())
}

Implement RSocket trait

Example: accessing Redis with the redis crate.

NOTICE: add this dependency to Cargo.toml: redis = { version = "0.19.0", features = [ "aio" ] }

use std::str::FromStr;

use redis::Client as RedisClient;
use rsocket_rust::async_trait;
use rsocket_rust::prelude::*;
use rsocket_rust::Result;

#[derive(Clone)]
pub struct RedisDao {
    inner: RedisClient,
}

// Create RedisDao from str.
// Example: RedisDao::from_str("redis://127.0.0.1").expect("Connect redis failed!");
impl FromStr for RedisDao {
    type Err = redis::RedisError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let client = redis::Client::open(s)?;
        Ok(RedisDao { inner: client })
    }
}

#[async_trait]
impl RSocket for RedisDao {
    async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
        let client = self.inner.clone();
        let mut conn = client.get_async_connection().await?;
        let value: redis::RedisResult<Option<String>> = redis::cmd("GET")
            .arg(&[req.data_utf8()])
            .query_async(&mut conn)
            .await;
        match value {
            Ok(Some(value)) => Ok(Some(Payload::builder().set_data_utf8(&value).build())),
            Ok(None) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    async fn metadata_push(&self, _req: Payload) -> Result<()> {
        todo!()
    }

    async fn fire_and_forget(&self, _req: Payload) -> Result<()> {
        todo!()
    }

    fn request_stream(&self, _req: Payload) -> Flux<Result<Payload>> {
        todo!()
    }

    fn request_channel(&self, _reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
        todo!()
    }
}

TODO

  • Operations
    • METADATA_PUSH
    • REQUEST_FNF
    • REQUEST_RESPONSE
    • REQUEST_STREAM
    • REQUEST_CHANNEL
  • More Operations
    • Error
    • Cancel
    • Fragmentation
    • Resume
    • Keepalive
  • QoS
    • RequestN
    • Lease
  • Transport
    • TCP
    • Websocket
    • WASM
  • Reactor
    • ...
  • High Level APIs
    • Client
    • Server

rsocket-rust's People

Contributors

adoerr, hplewis, huahouye, jjeffcaii, kingljl, kuronyago, seal90, yuriykulikov

rsocket-rust's Issues

Support CANCEL Frames for Request-Stream

RSocket CANCEL frames can be used to close (abort, short-circuit, ⏹️ ) streams from the client, when the client is no longer interested in consuming the rest of the stream items.

Motivation

Closing a stream when a CANCEL frame is received frees its resources (for example, memory allocated for buffers).

Desired solution

CANCEL Frame support should be implemented on both the server and the client side in the DuplexSocket.

Desired solution (client)

On the client side, the DuplexSocket has to send a CANCEL frame when the client code drops the stream handle.

There are several ways to achieve that. One is to check the error returned when sending values to the Handler::ReqRS(sender) sender. Unfortunately, this sends the CANCEL frame only when the server sends the next value after the stream was dropped. Another possible solution is to check tx.is_closed() periodically. I don't know which one is better, and most likely there is some other way.
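
As a rough illustration of the first option, the sketch below uses a std::sync::mpsc channel as a stand-in for the Handler::ReqRS(sender) sender. The Frame enum and the forward_item function are hypothetical names, not part of rsocket-rust. It also shows the drawback mentioned above: the dropped receiver is noticed only when the next item arrives.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical outgoing frame type; only CANCEL matters for this sketch.
#[derive(Debug, PartialEq)]
enum Frame {
    Cancel { sid: u32 },
}

/// Forwards one received stream item to the local consumer.
/// Returns a CANCEL frame to send if the consumer has dropped its receiver.
fn forward_item(sid: u32, tx: &Sender<String>, item: String) -> Option<Frame> {
    match tx.send(item) {
        // The consumer is still listening: nothing to cancel.
        Ok(()) => None,
        // `send` fails only when the receiving half has been dropped.
        Err(_) => Some(Frame::Cancel { sid }),
    }
}
```

Calling forward_item after the receiver has been dropped yields Some(Frame::Cancel { sid }), which the socket could then write to the transport.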

Desired solution (server)

On the server side, the responder RSocket has to be notified about the fact that the stream is closed by the client.

To achieve that, the stream which is returned by the responder can be wrapped in an Abortable:

let (abort_handle, abort_registration) = AbortHandle::new_pair();
let mut payloads = Abortable::new(responder.request_stream(input), abort_registration);

The handle can be stored in a map for future use:

abort_handles.insert(sid, abort_handle);

When the client sends a CANCEL frame and it is received on the server side, DuplexSocket looks up the handle in the map and aborts the stream:

if let Some((_, abort_handle)) = self.abort_handles.remove(&sid) {
    abort_handle.abort();
}

This will allow the RSocket responder implementation to perform a tx.is_closed() check (which will return true) and free up the resources.
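
The lookup-and-abort flow can be sketched with the standard library alone. This is a simplified stand-in, not the DuplexSocket code: an AtomicBool flag plays the role of the AbortHandle, and CancelRegistry, register, on_cancel and produce are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical registry of per-stream cancel flags, keyed by stream ID.
#[derive(Default)]
struct CancelRegistry {
    flags: HashMap<u32, Arc<AtomicBool>>,
}

impl CancelRegistry {
    /// Registers a new stream and returns the flag its producer should check.
    fn register(&mut self, sid: u32) -> Arc<AtomicBool> {
        let flag = Arc::new(AtomicBool::new(false));
        self.flags.insert(sid, Arc::clone(&flag));
        flag
    }

    /// Handles an incoming CANCEL frame; returns true if the stream was known.
    fn on_cancel(&mut self, sid: u32) -> bool {
        match self.flags.remove(&sid) {
            Some(flag) => {
                flag.store(true, Ordering::SeqCst);
                true
            }
            None => false,
        }
    }
}

/// Emits up to `limit` items, stopping early once the stream is cancelled.
fn produce(cancelled: &AtomicBool, limit: usize) -> Vec<usize> {
    let mut out = Vec::new();
    for i in 0..limit {
        if cancelled.load(Ordering::SeqCst) {
            break; // The client is no longer interested: stop and free resources.
        }
        out.push(i);
    }
    out
}
```

Removing the entry on cancel keeps the map from growing; a real implementation would also remove it when a stream completes normally.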

Considered alternatives

One alternative solution would involve changing the RSocket API to expose the AbortHandle to the RSocket implementation. In this case the client could use this handle to abort explicitly instead of dropping the stream. The server would get the AbortHandle and could use it to check whether the stream has been aborted. However, it seems that AbortHandle cannot be polled, which makes this alternative pretty useless.

Additional context

A TODO is present in the DuplexSocket code:
// TODO: support cancel

I would be glad to implement a PR if this issue is accepted 😄

WebAssembly support

Construct a transport layer that delegates requests to the Wasm runtime, so that compiling Rust to WebAssembly requires no API change. For now, only a client transport is required. For example, a WasmClientTransport could accept "ws://xxx:8080/rsocket" or "tcp://xxx:42252", and requests would be delegated to the Wasm runtime (browser, Node) by calling imported functions. An example follows:

RSocketFactory::connect()
    .acceptor(|| Box::new(EchoRSocket))
    .transport(WasmClientTransport::new("ws://localhost:8080/rsocket"))
    .setup(Payload::from("I am Wasm Requester!"))
    .mime_type("text/plain", "text/plain")

Convert Result to Option when using std::str::from_utf8

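Quite a few places in the code convert a Result into an Option. Using expect there causes a panic; in such cases, converting to None may better match the semantics, for example std::str::from_utf8(b.as_ref()).ok().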

pub fn data_utf8(&self) -> Option<&str> {
    match &self.d {
        Some(b) => Some(std::str::from_utf8(b.as_ref()).expect("Invalid UTF-8 bytes.")),
        None => None,
    }
}
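
A minimal sketch of the suggested change, assuming a simplified Payload that stores its data as Option<Vec<u8>> (the real type may use a different buffer type). Invalid UTF-8 yields None instead of a panic:

```rust
/// Simplified stand-in for the payload type; `d` mirrors the field in the snippet above.
struct Payload {
    d: Option<Vec<u8>>,
}

impl Payload {
    /// Returns the data as UTF-8 text, or `None` if it is absent or not valid UTF-8.
    fn data_utf8(&self) -> Option<&str> {
        self.d
            .as_deref()
            .and_then(|b| std::str::from_utf8(b).ok())
    }
}
```

The trade-off is that callers can no longer tell missing data apart from invalid data; returning a Result would keep that distinction.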

Unable to run on Windows

System version: Windows 11 Home

rsocket version:
rsocket_rust = "0.7.2"
rsocket_rust_transport_tcp = "0.7.2"

Compile-time error:

error[E0432]: unresolved import `tokio::net::UnixStream`
 use tokio::net::UnixStream;
  |     ^^^^^^^^^^^^^^^^^^^^^^ no `UnixStream` in `net`
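
The error suggests that a Unix-only type is imported unconditionally. One common way to handle this in Rust is conditional compilation with #[cfg(unix)]. The sketch below only illustrates that attribute; available_transports is a hypothetical function, and this is not necessarily how the crate addresses the problem.

```rust
/// Lists the transports compiled into this build (hypothetical helper).
fn available_transports() -> Vec<&'static str> {
    #[allow(unused_mut)] // `mut` is only needed when the Unix branch is compiled in.
    let mut transports = vec!["tcp"];

    // Unix domain sockets exist only on Unix-like targets, so this statement
    // is compiled out entirely on Windows.
    #[cfg(unix)]
    transports.push("unix");

    transports
}
```

The same attribute can be placed on a `use` item or a whole module, so that Unix-only imports never reach the Windows compiler.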

Support headers on WebsocketClientTransport

It would be good to support setting headers when using the WebsocketClientTransport. I imagine this would work much like it does in the rsocket-java project: you can add any number of custom headers when setting up the connection, and they are used when the connection is established.

Motivation

This would make it possible to write an RSocket client in Rust that connects to an existing RSocket server requiring Authorization headers.

Desired solution

Either like:

    let client = RSocketFactory::connect()
        .transport(WebsocketClientTransport::from("127.0.0.1:8080"))
        .header("Authorization", "abc123")
        .setup(Payload::from("READY!"))
        .mime_type("text/plain", "text/plain")
        .start()
        .await?;

or

    let client = RSocketFactory::connect()
        .transport(WebsocketClientTransport::from("127.0.0.1:8080").header("Authorization", "abc123"))
        .setup(Payload::from("READY!"))
        .mime_type("text/plain", "text/plain")
        .start()
        .await?;

I'd need to look a little closer to see which would make more sense.
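
To make the second variant concrete, here is a small builder sketch. WsTransportConfig and its fields are hypothetical and only model how chained header calls could accumulate; the actual WebsocketClientTransport API may differ.

```rust
/// Hypothetical transport configuration; not the crate's actual type.
#[derive(Debug, Default)]
struct WsTransportConfig {
    addr: String,
    headers: Vec<(String, String)>,
}

impl WsTransportConfig {
    /// Creates a configuration for the given address with no extra headers.
    fn new(addr: &str) -> Self {
        WsTransportConfig {
            addr: addr.to_string(),
            headers: Vec::new(),
        }
    }

    /// Adds one header; may be called any number of times.
    fn header(mut self, name: &str, value: &str) -> Self {
        self.headers.push((name.to_string(), value.to_string()));
        self
    }
}
```

Keeping the headers on the transport rather than on the factory confines them to the WebSocket transport, since a handshake header has no meaning for plain TCP.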

Considered alternatives

None really that would make much sense.

Additional context

N/A

Websocket TLS Support (wss)

Motivation

TLS is an important feature for a network library to support and required for nearly any real world use-case.

Desired solution

Add the native-tls feature for tokio-tungstenite and wrap any TcpStreams with MaybeTlsStream<TcpStream>.

Considered alternatives

None considered at the moment.

Additional context

I have a very simple proof-of-concept branch here, but it would need a bit more work before it's mergeable. I'm unsure how much time I'll have to get it cleaned up, but if I do, I'll submit it as a PR.

Closure support for server acceptor

Currently the acceptor setup for the server takes a function pointer (FnAcceptorWithSetup) and doesn't accept closures, so there's no way to create and pass a dynamic or configurable acceptor. I have modified it for this locally (basically fn -> Fn, plus boxing it). I see you've made similar changes to the server's startup function, so is this something you already have underway or planned, or can I open a PR?
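
The difference between the two kinds of acceptor can be sketched as follows. The type aliases and the &str/String signature are simplified assumptions for illustration; the real FnAcceptorWithSetup takes a setup payload and a socket.

```rust
/// A plain function pointer: it cannot capture any runtime configuration.
type FnAcceptor = fn(&str) -> String;

/// A boxed closure: it can own captured state and still be stored or passed around.
type BoxedAcceptor = Box<dyn Fn(&str) -> String + Send + Sync>;

/// Works as a function pointer because it captures nothing.
fn static_acceptor(setup: &str) -> String {
    format!("accepted: {}", setup)
}

/// Builds an acceptor configured at runtime; `prefix` is moved into the closure.
fn make_acceptor(prefix: String) -> BoxedAcceptor {
    Box::new(move |setup: &str| format!("{}: {}", prefix, setup))
}
```

Existing plain functions remain usable after such a change, because a function item can itself be boxed as a `dyn Fn`.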

thiserror & anyhow

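In Rust error handling, anyhow and thiserror work well together: anyhow::Error can serve as the root error, and the RSocketError enum can be extended through thiserror::Error. anyhow::Result<T> can of course be used as well. This approach makes error handling clearer. Deno also adopted anyhow::Error for error handling in its latest release, 1.4.1. Code such as async fn main() -> Result<(), Box<dyn Error + Send + Sync>> is somewhat harder to read.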

use thiserror::Error;

#[derive(Error, Debug)]
#[error("RS-{code:#010x}: {msg}")]
pub struct RSocketError {
    code: u32,
    msg: String,
}

For now, a struct is probably still the more suitable way to represent an RSocket error.
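
For comparison, the following sketch writes out by hand roughly what the derive above provides, using only the standard library. It assumes the same two fields; the new constructor and the fail function are hypothetical helpers.

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
pub struct RSocketError {
    code: u32,
    msg: String,
}

impl RSocketError {
    /// Hypothetical constructor, added for illustration.
    pub fn new(code: u32, msg: impl Into<String>) -> Self {
        RSocketError { code, msg: msg.into() }
    }
}

impl fmt::Display for RSocketError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same format string as the #[error(...)] attribute:
        // `#010x` prints the code as 0x-prefixed hex, zero-padded to 10 characters.
        write!(f, "RS-{:#010x}: {}", self.code, self.msg)
    }
}

impl Error for RSocketError {}

/// The struct converts into a boxed error, as it would into anyhow::Error.
fn fail() -> Result<(), Box<dyn Error + Send + Sync>> {
    Err(RSocketError::new(0x0201, "application error").into())
}
```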

Non-Trivial Examples?

As I'm quite new to async programming in Rust, I would really appreciate some non-trivial examples with a real backend. (I tried tokio_postgres, but failed to get it right.)

I implemented the RSocket trait, but when I try to Box::pin() my result from the database, the compiler complains about a missing Sync trait in the tokio_postgres code. I have absolutely no clue how to solve that.

Receiving error when server tries to request client.

Thanks for this nice crate,

Sending a request to the client in the callback of the acceptor method fails.

Expected Behavior

I expected the request_response method to return the response for the request.

Actual Behavior

The request_response method returns an error:

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: APPLICATION_ERROR: TODO: should be error details'

Steps to Reproduce

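// Imports assumed for rsocket_rust 0.6.0; they were not part of the original report.
use std::error::Error;

use rsocket_rust::prelude::*;
use rsocket_rust::utils::EchoRSocket;
use rsocket_rust_transport_tcp::TcpServerTransport;

#[tokio::main]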
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    RSocketFactory::receive()
        .transport(TcpServerTransport::from("127.0.0.1:7878"))
        .acceptor(Box::new(|setup, socket| {
            println!("socket establish: setup={:?}", setup);
            tokio::spawn(async move {
                let req = Payload::builder().set_data_utf8("Hello World!").build();
                let res = socket.request_response(req).await.unwrap();
                println!("SERVER request CLIENT success: response={:?}", res);
            });
            // Return a responder.
            // You can write your own responder by implementing the `RSocket` trait.
            Ok(Box::new(EchoRSocket))
        }))
        .on_start(Box::new(|| println!("echo server start success!")))
        .serve()
        .await
}

Your Environment

Darwin Mahdis-MacBook-Pro.local 17.7.0 Darwin Kernel Version 17.7.0: Thu Jun 21 22:53:14 PDT 2018; root:xnu-4570.71.2~1/RELEASE_X86_64 x86_64

Cargo.toml dependencies

[dependencies]
tokio = { version = "0.3.6", features = ["full"] }
rsocket_rust = "0.6.0"
rsocket_rust_transport_tcp = "0.6.0"

I also tried the "0.5.1" with the appropriate tokio version and also the git master branch.

Any suggestions?
