futures-rs's Introduction

futures-rs

Zero-cost asynchronous programming in Rust


Documentation | Website

futures-rs is a library providing the foundations for asynchronous programming in Rust. It includes key trait definitions like Stream, as well as utilities like join!, select!, and various futures combinator methods which enable expressive asynchronous control flow.

Usage

Add this to your Cargo.toml:

[dependencies]
futures = "0.3"

The current futures release requires Rust 1.56 or later.

Feature std

Futures-rs works without the standard library, such as in bare metal environments. However, it has a significantly reduced API surface. To use futures-rs in a #[no_std] environment, use:

[dependencies]
futures = { version = "0.3", default-features = false }

License

Licensed under either of Apache License, Version 2.0 or MIT license at your option.

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

futures-rs's People

Contributors

46bit, alexcrichton, aturon, carllerche, cramertj, diggsey, ebkalderon, ibraheemdev, kentfredric, khuey, killercup, liranringel, luciofranco, majorbreakfast, marwes, matthias247, najamelan, nemo157, olegnn, ralith, seanmonstar, srijs, stbuehler, stepancheg, taiki-e, tailhook, thomasdezeeuw, tinaun, tmccombs, tmiasko


futures-rs's Issues

Stream: Pipe, back pressure

Be warned: I didn't do much research before writing this and was just wondering if the abstractions provided here allow implementing some of the nice features of node's streams. Commencing brain dump…


I saw that the only place where back pressure in streams is addressed is the documentation for futures::stream::channel, which says

This channel is unique in that it implements back pressure to ensure that the sender never outpaces the receiver. The Sender::send method will only allow sending one message and the next message can only be sent once the first was consumed.

This sounds like it's very specific to that one implementation. Is there a plan to generalize this? I/O streams using internal buffers would also need some kind of 'high water mark' (node.js calls it that way IIRC) or a way to pause the streams they depend on when their buffer is full.


This brings me to my next question: is there a plan to introduce a pipe method, i.e. one that works like pipe in a shell by combining multiple stream processors? and_then allows transforming each item of a stream, but pipe would go a bit further than that, using the current stream's output as the input for the next stream.

For example: A stream of data read from an HTTP request (the body of the request) is piped into a (streaming) JSON parser whose output is piped into CSV serialization which then writes to a file. Ideally, this would 'just' be http::get("https://some.example/data").pipe(Json::array_deserializer::<SomeStruct>()).pipe(Csv::from).pipe(File::writeStream(filename)). I guess there is a lot of work involved to make something like this work (and a lot more crates, and implicit worker/event loop handling somehow).

A Pipeable trait might become pretty complicated if you wanted it to handle back pressure across n piped streams. If I'm not mistaken, it could abstract over map and and_then as well (implementing it for closures that return Self::Item or Future<Item, Error>, respectively).
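
For illustration, here is a minimal sketch (stages and values are invented) of how the later futures 0.3 StreamExt adapters already express this kind of chaining: every adapter consumes the previous stream by value and yields a new stream, so a chain of adapters is effectively a pipe, and back pressure falls out naturally because each stage only polls its input when it is itself polled.

use futures::stream::{self, StreamExt};

// A sketch of pipe-style composition with futures 0.3 StreamExt adapters.
async fn demo() {
    let lines = stream::iter(vec!["1", "2", "not a number", "3"]);

    let total: i32 = lines
        .filter_map(|raw| async move { raw.parse::<i32>().ok() }) // "parser" stage
        .map(|n| n * 10)                                          // "transform" stage
        .fold(0, |acc, n| async move { acc + n })                 // "sink" stage
        .await;

    assert_eq!(total, 60);
}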

Dependency Updates

Dependencies in many of the sub-libraries are pinned to exact versions, down to the patch level. This can cause problems when new versions of those libraries are released, or when a downstream user needs an older one for various reasons.

For example, in the Cargo.toml of futures-curl, the version string of curl should be changed to 0.3 if that works, or failing that to ^0.3.3, since 0.3.4 is already out. At the moment I'm unable to compile by specifying 0.3.4 in the Cargo.toml of my binary because of a version conflict. Similarly, the version of crossbeam in futures-cpupool should be changed to ^0.2.10, and the same goes for scoped-tls and slab in futures-mio. Finally, the Cargo.toml of futures itself specifies the version of various sub-libraries as 0.1.0, whereas the sub-libraries reference futures as 0.1; I think the former should be changed to 0.1.

e9381c2d doesn't compile for me

I get the following error:

src/lib.rs:427:5: 431:6 error: reached the recursion limit while instantiating `<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<forget::ThunkFuture<empty::Empty<(), ()>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as Future>::boxed````

Using futures::stream::Sender in a loop

I'm working on a small program that reads lines from stdin and passes them off to another function, using a stream. From the tutorial document and the rustdoc, I gather that the way to implement this is to use futures::stream::channel and spawn a thread that loops through lines from stdin and sends them through the Sender.

Sender is consumed when a value is sent, so you must use a future combinator to send another value. It'd be really helpful to show an example of this. From what I can tell, this behavior means that the loop cannot be an iteration, but must use recursion because the logic must be passed as a closure/function to the combinator.

The ergonomics of that aren't ideal, but I'm not even sure I'm going about it the right way because I can't get my program to compile using that approach. I'm also concerned about memory usage. If I'm allocating a string for each line read, then recursively calling a function for the next iteration, will the strings from previous loops stay in memory until the loop finally ends? Or is this optimized somehow?

To explain this in pseudo-Rust, I'd like to do something like this:

fn run() -> Box<Stream<Item = String, Error = ::std::io::Error>> {
    let (tx, rx) = channel();

    spawn(move || {
        for line in stdin.lock().lines() {
            match line {
                Ok(line) => {
                    tx.send(Ok(line));
                }
                Err(error) => {
                    tx.send(Err(error));
                }
            }
        }
    });

    rx.boxed()
}

But I think I have to do something like this:

fn run(self) -> Box<Stream<Item = String, Error = ::std::io::Error>> {
    let (tx, rx) = channel();

    spawn(move || {
        fn dispatch_line(tx: Sender<String, ::std::io::Error>) {
            let mut buffer = String::new();

            match stdin().read_line(&mut buffer) {
                Ok(bytes_read) if bytes_read > 0 => {
                    tx.send(Ok(buffer)).and_then(|tx| dispatch_line(tx));
                }
                Err(error) => {
                    tx.send(Err(Error::Io(error))).and_then(|tx| dispatch_line(tx));
                }
            }
        }

        dispatch_line(tx);
    });

    rx.boxed()
}

I can't get the latter form to compile exactly as written because dispatch_line should be returning a future, but it's getting ugly quickly, so I'm probably going about this really wrong.
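
For reference, a minimal sketch of the same idea with the futures 0.3 API (assuming an ordinary thread is acceptable for the blocking stdin loop): an unbounded channel gives a plain, non-future sender, and the receiver implements Stream.

use std::io::{self, BufRead};
use std::thread;

use futures::channel::mpsc;
use futures::stream::Stream;

// Reads lines on a dedicated thread and exposes them as a Stream.
fn run() -> impl Stream<Item = io::Result<String>> {
    let (tx, rx) = mpsc::unbounded();

    thread::spawn(move || {
        let stdin = io::stdin();
        for line in stdin.lock().lines() {
            // unbounded_send only fails if the receiver was dropped.
            if tx.unbounded_send(line).is_err() {
                break;
            }
        }
    });

    rx
}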

Future ~ Result, Stream ~ Iterator

@alexcrichton, in the tutorial (and other places) you liken Future to Iterator, and Stream to… errr, nothing, IIRC.

But isn't the correlation more like this?

# items | Sync          | Async         | Common operations
1       | Result<T, E>  | Future<T, E>  | map, and_then, joinN (a.k.a. zip!)
0..n    | Iterator<_>   | Stream<_>     | map, fold, collect, flatten

Many operators on Option/Result/Iterator have the same/similar names and semantics (… monads…), so the key differentiator is the number of items they can yield. (With respect to Stream's error handling, the sync side should be Iterator<Item=Result<T, E>>.)

Or am I missing some key insight of the implementation? I started reading the tutorial on this earlier, and my background is JS async primitives. (That Streams are actually Futures yielding (val, next_future) is a pretty nice property, btw.)

Confusing order of tuple returned by promise()

The promise function returns a tuple of (Promise, Complete).

The Promise object is similar to the Receiver object of the channels in the stdlib, while the Complete object is similar to the Sender object.

However in the stdlib, the channel() function returns a tuple of (Sender, Receiver).

I find it confusing that the order of what promise() returns is the opposite of what channel() returns.
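
For reference, the later futures 0.3 oneshot channel settled on the std ordering; a minimal sketch:

use futures::channel::oneshot;
use futures::executor::block_on;

// oneshot::channel returns (Sender, Receiver), sender first, like std's channel().
fn main() {
    let (tx, rx) = oneshot::channel::<u32>();
    tx.send(42).expect("receiver was dropped");
    assert_eq!(block_on(rx).unwrap(), 42);
}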

Timer wheel needs optimizing

The timer wheel used for timeouts is suboptimal in a few cases:

  • If a server is asleep for more than the duration of the entire wheel it'll scan the wheel buckets more than once. Should ensure that "one round of polling" only looks at each bucket at most once.
  • Figuring out the next timeout is currently a scan-the-whole-world operation, which can be slow.
  • Scanning forward involves traversing the entire list of timeouts up to the point you're going towards, but that seems... bad?

Send + 'static trait requirements?

Are these traits really required on Future and Stream?
I don't really see how Futures or Streams in general absolutely need to be Send or 'static.

If mio requires them, they should probably be enforced in the code of the futures-mio crate and not in futures itself.
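
For context, the trait that eventually shipped carries neither bound; a minimal sketch (assuming futures 0.3 and its single-threaded block_on) showing a !Send value living inside a future:

use std::rc::Rc;

use futures::executor::block_on;

// Rc is !Send, yet the future holding it can be driven by block_on,
// because Future itself imposes no Send or 'static requirement.
fn main() {
    let data = Rc::new(5);
    let fut = async move { *data + 1 };
    assert_eq!(block_on(fut), 6);
}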

Panic when "forgetting" ReadTask.

When experimenting with futures and task, I got the weird panic:

thread 'main' panicked at 'assertion failed: `(left == right)` (left: `4567965872`, right: `4567966032`)', /Users/admytrenko/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task.rs:202
...
  7:        0x10fc46e01 - futures::task::Task::get::h11c78cea13602c1a
   8:        0x10fc44056 - _<futures_io..task..TaskIoTake<'a, 'b, T>>::new::h733b3b84a5e2dd45
   9:        0x10fc460b6 - _<futures_io..task..TaskIoRead<T> as futures_io..ReadTask>::read::h8e19612329de43fb
  10:        0x10fc4a451 - _<futures_io..read_exact..ReadExact<A, T> as futures..Future>::poll::ha3f0ac7f0692a919

I understand that "forgetting" the task here probably does not make much sense, but "safe" Rust should not panic unless I'm calling functions that may panic (unwrap, etc.).

futures-rs version: f672963 (latest master at this time).
The smallest example I came up with, which crashes when you try to connect to the server:

extern crate futures;
extern crate futures_io;
extern crate futures_mio;

use std::env;
use std::net::SocketAddr;

use futures::Future;
use futures_io::{copy, TaskIo, read_exact, write_all};
use futures::stream::Stream;

fn main() {
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse::<SocketAddr>().unwrap();
    let mut l = futures_mio::Loop::new().unwrap();
    let server = l.handle().tcp_listen(&addr);
    let done = server.and_then(move |socket| {
        // Once we've got the TCP listener, inform that we have it
        println!("Listening on: {}", addr);
        socket.incoming().for_each(|(socket, addr)| {
            let io = TaskIo::new(socket);
            let pair = io.map(|io| io.split());
            let amt = pair.map(|(reader, writer)| {
                read_exact(reader, vec![0; 1]).forget()
            }).forget();
            Ok(())
        })
    });
    l.run(done).unwrap();
}

Cannot "forget" a future with `Sender`

extern crate futures_mio;
extern crate futures;

use std::io;
use futures::*;

use futures_mio::Sender;

fn foo(s: Sender<()>) {
    let result: Result<(), io::Error> = Ok(());
    done(result)
        .map(|x| {
            s.send(x);
            ()
        })
        .forget();
}

Error is:

tests/channel.rs:16:10: 16:16 error: the trait bound `std::sync::mpsc::Sender<()>: std::marker::Sync` is not satisfied [E0277]
tests/channel.rs:16         .forget();
                             ^~~~~~
tests/channel.rs:16:10: 16:16 help: run `rustc --explain E0277` to see a detailed explanation
tests/channel.rs:16:10: 16:16 note: `std::sync::mpsc::Sender<()>` cannot be shared between threads safely
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `mio::channel::Sender<()>`
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `futures_mio::Sender<()>`
tests/channel.rs:16:10: 16:16 note: required because of the requirements on the impl of `std::marker::Send` for `&futures_mio::Sender<()>`
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `[closure@tests/channel.rs:12:14: 15:10 s:&futures_mio::Sender<()>]`
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `std::option::Option<[closure@tests/channel.rs:12:14: 15:10 s:&futures_mio::Sender<()>]>`
tests/channel.rs:16:10: 16:16 note: required because it appears within the type `futures::Map<futures::Done<(), std::io::Error>, [closure@tests/channel.rs:12:14: 15:10 s:&futures_mio::Sender<()>]>`
error: aborting due to previous error

Add loop combinators

In order to conveniently deal with loops whose bodies complete asynchronously, we should ensure that there is a suite of combinators for working with multiple kinds of loops. Especially in light of #77, where tailcall is being removed (it never really worked anyway), it'll be important to be able to express looping without recursion (see the sketch after the list below).

Also see #62

Current ideas for loops:

  • Finite loops -- stream::iter(some_iterator).fold(...)
  • Infinite loops stream::iter(iter::repeat(()).map(Ok)).fold(...)
  • @aturon mentions we could work on a fixed point combinator
  • @carllerche suggests the trampoline function in clojure
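
A minimal sketch of the loop-as-combinator idea using the futures 0.3 API: stream::unfold turns explicit loop state into a stream, so iteration never goes through recursion.

use futures::stream::{self, StreamExt};

// Counts n down to 1 without any recursive calls.
async fn count_down(n: u32) {
    let counter = stream::unfold(n, |state| async move {
        if state == 0 {
            None
        } else {
            Some((state, state - 1))
        }
    });

    counter
        .for_each(|i| async move { println!("{}", i) })
        .await;
}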

compile without nightly compiler

The current futures code doesn't compile on stable due to its use of scoped_tls.
I tried to use the code with just futures-rs and futures-cpupool.

future::Fuse is a bit weird and has out of date documentation

The documentation talks about Some/None, which are nowhere in sight. Instead, it returns NotReady if the value has already been returned. That's probably fine, although returning Some(value) and then None forever would be more in line with Iterator::fuse.

I know this issue is a bit vague but, beyond fixing the documentation, I don't really know the correct thing to do here.


Just to be clear, this is talking about this Fuse.
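
For what it's worth, a small sketch of how the futures 0.3 Fuse behaves, which matches the "NotReady forever" behaviour described above rather than the Some/None wording in the docs:

use futures::future::{self, FusedFuture, FutureExt};

// After the inner future completes, the fused wrapper reports is_terminated()
// and any further poll returns Pending instead of polling the inner future again.
async fn demo() {
    let mut fut = future::ready(7).fuse();
    assert!(!fut.is_terminated());
    assert_eq!((&mut fut).await, 7);
    assert!(fut.is_terminated());
}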

Taking a CpuPool

Suppose a library wants to do some work on a thread pool and return a future for the result. For concreteness, it might be:

  • Wrapping a blocking IO API (in this case #threads > #cpus makes sense).
  • Doing CPU intensive work (in this case #threads > #cpus does not make sense).

It does not sound like a good idea for each such library to create its own thread pool (CpuPool). Hence, the library should take the thread pool by dependency injection. This is good for sharing a thread pool among multiple libraries, testing, leaving control in the hands of the library user, etc.

My question is: what type should the library take for the thread pool? Taking a CpuPool is an option but seems overly restrictive. Does it make sense to have a trait for that, and the library can then take the trait instead of a concrete struct?

For example, in Java the standard approach for this is the "executor framework", particularly the interfaces Executor, ExecutorService, etc. This was also adopted by Python and ruby-concurrency, and maybe others.
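
A minimal sketch of the dependency-injection idea with the futures 0.3 task traits (the function name is invented): the library accepts any Spawn implementor rather than a concrete pool type, and hands back a future for the result.

use futures::future::RemoteHandle;
use futures::task::{Spawn, SpawnExt};

// Works with any executor handle that implements Spawn, e.g. a shared thread pool.
fn spawn_work<S: Spawn>(spawner: &S) -> RemoteHandle<u64> {
    spawner
        .spawn_with_handle(async {
            // blocking or CPU-heavy work would go here
            42u64
        })
        .expect("executor has shut down")
}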

Get an ironclad guarantee about safety of TaskData

Right now the safety of TaskData relies on the fact that the id field of Task is never reused. Thinking through this now I'm not actually sure that there's any reasonable way we can reuse it, however, as we would have to otherwise invalidate all existing TaskData handles.

We may need to alter the scheme to make TaskData resilient when used with a recycled Task. I have a few ideas.

cc @aturon

adding certificates using backend ssl context object

let tls_handshake = socket.and_then(|socket| {
        let mut cx = ClientContext::new().unwrap();
        {
            let backend_cx = cx.ssl_context_mut();
            backend_cx.set_CA_file(ca);
            backend_cx.set_certificate_file(crt,  X509FileType::PEM);
            backend_cx.set_certificate_file(key, X509FileType::PEM);
        }
        cx.handshake(host_name, socket)
    });

I need to use this type

use futures_tls::backend::openssl::x509::X509FileType;

But it seems like not everything from the backend module is exposed.

http://alexcrichton.com/futures-rs/futures_tls/backend/openssl/index.html

Do you think these should be exposed?

Publish/Broadcast stream transformer

One thing which I noticed was missing from the Streams implementation was some sort of "broadcast" transformer which allowed a stream to be subscribed to multiple times (i.e. polled/scheduled by multiple tasks in the language of futures-rs).

I was wondering if there was any interest in having this as I've been prototyping this and would love to upstream it!

unbounded channel with non-blocking non-future sender and Stream receiver

I need to send messages to the event loop from other threads. The event loop works like this (in pseudocode):

loop {
    let item = channel.receive();
    write(socket, serialize(item));
}

futures-rs provides only a synchronous channel, which is not suitable for this case, because the sender should not block indefinitely if the socket is not available for writing.

It seems like futures-rs could provide another building block: a channel with unbounded size, a non-blocking sender (as in std::sync::mpsc::channel), and a Stream interface on the receiver.
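
A minimal sketch of that building block as it exists in futures 0.3: unbounded_send never blocks, and the receiver implements Stream so the event loop can consume it directly.

use futures::channel::mpsc;
use futures::stream::StreamExt;

fn main() {
    let (tx, rx) = mpsc::unbounded::<String>();

    std::thread::spawn(move || {
        // Returns immediately even if the consumer is busy or the socket is not writable.
        tx.unbounded_send("message from another thread".to_string()).ok();
    });

    // The event loop side: serialize and write each item to the socket here.
    let consume = rx.for_each(|item| async move { println!("{}", item) });
    futures::executor::block_on(consume);
}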

only epoll blocking?

Question for the comment in locks.rs

" As a futures library the eventual call to epoll should be the only thing
that ever blocks"

Does that mean that only epoll will be supported for network calls (at least on Linux)?

A Future could hold a blocking network call or any other longer-running computation.

e.g.

let str: Future = count_words_in_rust_doc();

Maybe I do not understand the comment correctly. I assume that futures are meant to be generic.

I like this project a lot and I also like that that Rust community is not rushing this. It took Java many years and JDK 8 to finally get usable futures (CompletableFuture).
Similar for Python and C++.
And Scala is still improving its futures:
https://github.com/viktorklang/blog/blob/master/Futures-in-Scala-2.12-part-1.md

So it is good to design this carefully, maybe together with non-blocking IO. It will be crucial for Rust and very important for anyone working on stuff like databases, web servers, big data analytics and many other distributed systems.

Minihttp as library

Hi,
first of all, great work; futures was what I was waiting for! Is there any plan to keep the HTTP lib, maybe as a separate crate, in a stable form rather than as a proof of concept?
It would be interesting, and I can also try to help with that!

Thnx.

Bye,
Paolo

Add support for not blocking indefinitely

The following sorts of things would be nice:

  • Functionality to poll a future without worrying about what machinery must be set up for tasks to work. The documentation says that users shouldn't call poll themselves. I'm not sure how this API would work precisely.
  • Functionality to wait on a future with a timeout, arranged such that the timeout is never late. This is important for games and similar applications which must attempt to run loops at a steady rate. wait blocks indefinitely.
  • An iterator that pulls elements from streams, but only if those elements arrive within a specific time period. That is, something like Stream::iter_timeout(duration).
  • The ability to spin the Mio event loop once, indicating if the future I passed it completed or not. This doesn't appear to be exposed, but spinning the loop once with a timeout does exist in Mio.

My goal here is to potentially rewrite my UDP networking protocol project on top of futures. At the moment, it's using an API based around callbacks that get called on a background thread, leaving any inter-thread communication up to the end user. This obviously makes for a horrible API. I could force all projects that want to use it to opt into Mio instead, but mio doesn't appear to have game-friendly timing and games are one of the primary applications.

I'm not sure which of the above are possible, but this does appear to be a set of missing functionality. Even my first item would make Channel drastically more useful.
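
A minimal sketch addressing the first point, assuming futures 0.3: now_or_never polls a future exactly once with a no-op waker, so no task machinery needs to be set up and the call never blocks.

use futures::future::{self, FutureExt};

fn main() {
    // A ready future resolves immediately...
    assert_eq!(future::ready(5).now_or_never(), Some(5));
    // ...while a pending one reports None instead of blocking.
    assert_eq!(future::pending::<i32>().now_or_never(), None);
}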

Reference counting in CpuPool and Inner

What's the reasoning behind using custom reference counting for Inner in futures-cpupool? The inner value is already wrapped in an Arc, and sending the close messages on drop could be implemented for Inner itself instead of CpuPool.

Atomically draining & dropping a consumer of Channel / Oneshot

Channel and Oneshot are multithreaded "message passing" constructs. Both include some level of internal storage for in-flight messages.

There would be utility in providing a way for the receiving end to atomically drain & drop (shutdown) their handles. This would provide a way to guarantee that values don't get "lost" in the internal storage.

I implemented this initially as a try_release fn, but this may or may not be the best way to expose the functionality:

https://github.com/tokio-rs/tokio/blob/master/src/util/future/channel.rs#L126
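
For reference, the mpsc receiver in futures 0.3 grew this kind of operation; a minimal sketch of the drain-and-shutdown pattern:

use futures::channel::mpsc;
use futures::stream::StreamExt;

// close() stops any further sends while the values already buffered stay
// available, so nothing in flight is lost before the receiver goes away.
async fn shutdown(mut rx: mpsc::Receiver<u32>) -> Vec<u32> {
    rx.close();
    rx.collect().await
}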

empty.rs:37 drops a `&mut Task`

If I understand the specifics of std::mem::drop(_), this only drops the &mut reference, not the Task. Unfortunately, it is impossible to drop a value by its mutable borrow. I am not sure how to handle this, perhaps the Task needs a drop(&mut self) method that can be called instead.

A question about `future-rs` regarding immutable borrows

I am not sure if that is the right place to ask this question, so feel free to ignore it.

I have created a proof of concept task system before I even knew about future-rs. It is not even remotely polished and I haven't touched it in a while. I had a look at future-rs and I am not sure if it would satisfy my needs.

Let me quickly show how my task system works

fn main() {
    let res: Future<i32> = TASK.submit(move || {
        println!("Before long running task");
        let short_running_task: Future<i32> = TASK.submit(|| 24);
        // Submits a long running task
        let long_running_task: Future<i32> = TASK.submit(|| {
            std::thread::sleep(Duration::from_secs(10));
            return 42;
        });
        // Waits for the short running task to complete, does not block other tasks!
        println!("After short running task {}", short_running_task.await());
        // Waits for the long running task to complete, does not block other tasks!
        println!("After long running task {}", long_running_task.await());
        42
    });

    let v: Vec<Future<()>> = (0 .. 20).map(|i|{
        TASK.submit(move || println!("Another Task {}", i))
    }).collect();

    println!("{}", res.await());
}

which prints

Another Task 2
Another Task 0
Before long running task
Another Task 3
Another Task 5
Another Task 8
Another Task 6
Another Task 17
Another Task 7
Another Task 9
Another Task 1
Another Task 10
Another Task 4
Another Task 13
Another Task 11
Another Task 14
Another Task 12
Another Task 19
Another Task 15
Another Task 16
Another Task 18
After short running task 24
After long running task 42
42

I basically create n-1 local task queues, each of which receives a task, wraps it in a fiber/coroutine, and continuously tries to do some work.

Now I had a very rough look at future-rs. http://alexcrichton.com/futures-rs/src/futures/src/lib.rs.html#260

And you require a 'static lifetime; I think that means I basically can't have references in my task, right?

I am currently designing a Vulkan library and I want to be able to do some work on different threads. Satisfying 'static would basically mean that I would have to wrap a lot of stuff in Arc. I also made a reddit post recently about this problem.

The thing is, it complicates the library and I don't think it is actually needed. In my head I want to do something like this:

let physical_device = ...;
let ref_physical_device = &physical_device;
let future_something: Future<Something> = TASK.submit(move || {
    ref_physical_device.create_something()
});
//Blocks
let something: Something = future_something.await();

The only thing I have to worry about is that the reference lives long enough. But I know that it will live long enough because I am calling await which will block the main thread or reschedule the fiber (otherwise I could call await in the destructor to make sure that it always lives long enough). I haven't currently implemented this because I wanted to see if future-rs might also allow this.

The TLS connection was non-properly terminated

Compiling and executing techempower1 (cargo run --release) works fine, but I get the following error message from curl:

$ curl https://localhost:8080/json
curl: (35) gnutls_handshake() failed: The TLS connection was non-properly terminated.

Not clear how to make composition of futures with `CpuPool::execute`

I use CpuPool to perform parallel computations in streams, here is my snippet of code:

let entries: Vec<DirEntry> = ...
let cpu_pool = CpuPool::new(num_cpus::get() as u32);
let stream = futures::stream::iter(entries.into_iter().map(|entry| Ok(entry)))
    .map(|entry| {
        cpu_pool.execute(move || {
            // - open file using `File::open` - returns `Result`
            // - read file using `futures_io::read_to_end` - returns `ReadToEnd` future
            // - parse content of a file using `ReadToEnd::and_then` (this is a heavy task)
        })
    })
    .buffered(num_cpus::get() * 2)
    .filter(...)
    .collect();
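
A sketch of the same pipeline shape with the futures 0.3 API (this assumes the futures thread-pool feature, and file length stands in for the real parsing work):

use futures::executor::ThreadPool;
use futures::stream::{self, StreamExt};
use futures::task::SpawnExt;

// Each entry is handed to the pool via spawn_with_handle; the resulting
// handles are futures, so buffer_unordered drives them concurrently.
async fn parse_all(paths: Vec<std::path::PathBuf>) -> Vec<usize> {
    let pool = ThreadPool::new().expect("failed to build thread pool");

    stream::iter(paths)
        .map(|path| {
            pool.spawn_with_handle(async move {
                std::fs::read(&path).map(|bytes| bytes.len()).unwrap_or(0)
            })
            .expect("pool has shut down")
        })
        .buffer_unordered(8)
        .collect()
        .await
}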

thread 'main' panicked at 'assertion failed: `(left == right)`

I'm getting a bizarre error at runtime:

thread 'main' panicked at 'assertion failed: (left == right) (left: 140241379868944, right: 140241379869200)', /home/srwalter/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/lib.rs:208

If this is caused by an error in my program, I'm at a loss to understand what I'm doing wrong. Attached is a test case that should reproduce the error. At a high level I'm trying to adapt a socket into a Stream of messages. From the backtrace it seems to be failing in read_exact() called by my poll() implementation, but I can't find anything that looks like the assertion printed.

main2.txt

Note that you'll need a listening socket at localhost:1234 for the program to connect to; I just used netcat: nc -l 127.0.0.1:1234

Q: how to attach a callback to a future?

Hello,

Just seen

One of the most powerful features of Guava futures and Akka is the possibility to attach callbacks to a future, which are called immediately as soon as the future operation completes. Lack of this operation in the original Java futures libraries made the library almost useless for us as we wanted a full non-blocking system.

Is this feature implemented or planned? It's definitely a deal-breaker otherwise.

Thanks!
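
For reference, the combinator-based answer looks roughly like this sketch (assuming futures 0.3): map and inspect attach a closure that runs as soon as the future completes, without blocking the caller.

use futures::future::FutureExt;

async fn demo() {
    let fut = futures::future::ready(42)
        .inspect(|value| println!("completed with {}", value)) // side-effecting callback
        .map(|value| value + 1);                                // transforming callback

    assert_eq!(fut.await, 43);
}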

BoxFuture.wait() does not work unless futures::Future is imported

xx.rs:

use futures::*;

pub fn yy() -> BoxFuture<u32, ::std::io::Error> {
    done(Ok(1)).boxed()
}

main.rs:

extern crate futures;

mod xx;

use futures::Future;

fn main() {
    let f = xx::yy();

    println!("wait: {:?}", f.wait());
}

This code works.

If use futures::Future is commented out in main.rs, compiler complains:

src/main.rs:10:30: 10:34 error: the trait bound `futures::Future<Error=std::io::Error, Item=u32> + Send: std::marker::Sized` is not satisfied [E0277]
src/main.rs:10     println!("wait: {:?}", f.wait());
                                            ^~~~
<std macros>:2:27: 2:58 note: in this expansion of format_args!
<std macros>:3:1: 3:54 note: in this expansion of print! (defined in <std macros>)
src/main.rs:10:5: 10:38 note: in this expansion of println! (defined in <std macros>)
src/main.rs:10:30: 10:34 help: run `rustc --explain E0277` to see a detailed explanation
src/main.rs:10:30: 10:34 note: `futures::Future<Error=std::io::Error, Item=u32> + Send` does not have a constant size known at compile-time
error: aborting due to previous error
error: Could not compile `futures-rs-td`.

I'm not sure whether it is a bug or just a usability problem, or whether it lies in futures-rs or in the Rust language, but the error message is hard to understand.

rustc 1.11.0 (9b21dcd6a 2016-08-15), futures-rs from master

Extract io2 out of minihttp and make public more structs

I was trying to implement a non-HTTP server using futures-mio, and the abstractions in io2.rs look quite useful for any sort of server; none of that code is specific to HTTP.

I was wondering if there's a plan to extract those outside of minihttp and make other structs like ParseStream publicly available, or if there's a better suggestion for handling these cases.

Thanks

busy loop when waiting on Stream::buffered()

Consider the following program:

extern crate futures;
extern crate futures_io;
extern crate futures_mio;

use futures::Future;
use futures::stream::Stream;
use futures_io::{copy, TaskIo};

pub fn main() {
    let mut l = futures_mio::Loop::new().unwrap();
    let srv = l.handle().tcp_listen(&"127.0.0.1:5000".parse().unwrap());

    let future = srv.and_then(move |server| {
        server.incoming()
            .and_then(|s| TaskIo::new(s.0))
            .map(|i| i.split())
            .map(|(a,b)| copy(a,b).map(|_| ()))
            .buffered(10)
            .collect()
    });

    let _ = l.run(future);
}

Before 9804cec, this program successfully acted as a TCP echo server. Now it hangs, pegs a CPU, and fails to echo any data sent to it.

no_std support

The core futures crate is very close to being buildable in no_std crates (with liballoc and libcollections). I've applied all necessary changes in an experimental branch and it compiles without problems.

There are only two rough spots:

  • The LIMITED executor can't be used as it relies on a thread local variable. To circumvent this, we can set the default executor to Inline.
  • We can't catch_unwind.

I'd love to use this library in my no_std crates. Are there any plans to add no_std support, e.g. through a cargo feature?

DNS resolution

This is a feature that's missing from mio and futures-rs. All the APIs take a SocketAddr, and do not offer a way of resolving host names. The lookup_host method from the standard library is only available on nightly, making it "technically" impossible to resolve host names on stable. I found a hack around this using TcpStream::connect followed by a call to peer_addr() but it's far from ideal and it's not asynchronous.

I think a really simple implementation could be offered using a thread pool and a call to getaddrinfo. Eventually there could be a better solution, such as bindings to the getdns library, or a home-made solution.
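
A minimal sketch of the thread-plus-getaddrinfo idea (the function name is invented, and the futures 0.3 oneshot channel is assumed): std's ToSocketAddrs wraps getaddrinfo and works on stable, so the blocking lookup runs on a helper thread and is exposed as a future.

use std::io;
use std::net::{SocketAddr, ToSocketAddrs};

use futures::channel::oneshot;

// `addr` must include a port, e.g. "rust-lang.org:443".
fn resolve(addr: String) -> oneshot::Receiver<io::Result<Vec<SocketAddr>>> {
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        let result = addr.to_socket_addrs().map(|addrs| addrs.collect());
        let _ = tx.send(result);
    });
    rx
}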

Stack overflow when using streams

The issue is that streams require recursion in order to loop. The current assumption in the futures library is that, at some point, a future or stream will not be ready, forcing a callback to be registered (essentially a defer). However, this assumption is not always true. The following example demonstrates the problem with immediately ready futures, but the issue exists in any situation where futures become ready faster than they are consumed.

Example:

extern crate futures;

use futures::Future;
use futures::stream::{channel, Stream, Sender};

fn count_down(i: u32, f: Sender<u32, ()>)
    -> Box<Future<Item = (), Error = ()>>
{
    let busy = f.send(Ok(i));
    if i > 0 {
        busy
            .map_err(|_| ())
            .and_then(move |sender| {
                count_down(i - 1, sender)
            })
            .boxed()
    } else {
        Box::new(futures::finished::<(), ()>(()))
    }
}

fn main() {
    let (tx, rx) = channel::<u32, ()>();

    rx.for_each(move |v| {
        Ok(())
    }).forget();

    count_down(100_000, tx).forget();
}
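
For comparison, a sketch of the same count-down with the futures 0.3 API: the producer is an ordinary async loop rather than a recursive combinator chain, so no stack depth builds up even when the receiver is always ready.

use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

async fn count_down(n: u32) {
    let (mut tx, rx) = mpsc::channel::<u32>(1);

    // The sender side is a plain loop; send().await yields when the buffer is full.
    let producer = async move {
        for i in (0..=n).rev() {
            if tx.send(i).await.is_err() {
                break;
            }
        }
    };
    let consumer = rx.for_each(|_| async {});

    futures::join!(producer, consumer);
}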

Never vs. PhantomData

An RFC was recently approved for the ! (Never) type, which represents the type of a value that can never exist.

Several structs in futures-rs use PhantomData to mark the type of values that can't exist, such as the Error type of Finished, the Item type of Failed, and the Item and Error types of Empty. I believe that, once this PR lands (and then makes its way to stable), these structs should be rewritten to use the Never type. Empty, for example, could be written as follows:

pub struct Empty {}

pub fn empty() -> Empty { // Is this even necessary any more?
    Empty {}
}

impl Future for Empty {
    type Item = !;
    type Error = !;

    fn poll(&mut self, _: &mut Task) -> Poll<Self::Item, Self::Error> {
        Poll::NotReady
    }

    fn schedule(&mut self, task: &mut Task) {
        drop(task);
    }
}

Unfortunately, this feature has yet to land on nightly, let alone stable. Therefore, a transition to Never is likely a ways off. As this library is already getting lots of attention, I believe there should be a plan for how and if to structure the library so that a future (hehe 😃 ) transition to Never is possible.

In order to be more consistent with terminology, I also think that Empty should be renamed to Never. ! is called Never rather than Empty or Void as a result of the discussion at the end of the Never PR. The gist of it is that Empty isn't a future for an Empty value, it's a future for a value that can Never exist (i.e. it Never completes).

P.S. Great work on this library, @alexcrichton & company! I'm excited to use this in my future Rust projects.
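
A small sketch of the direction this eventually took (using std's Infallible as a stand-in until ! is stable): the "value that can never exist" is expressed directly in the output type, with no PhantomData needed.

use std::convert::Infallible;

use futures::future::{self, Pending};

// A future whose output type is uninhabited, i.e. one that never completes.
fn forever() -> Pending<Infallible> {
    future::pending()
}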

compilation of futures-tls fails due to dependence on openssl release

Hi Alex

You probably know about this already, but when trying to use futures-tls the compilation fails with this message:

futures-tls-0.1.0/src/openssl.rs:66:32: 66:63 error: failed to resolve. Could not find `HandshakeError` in `openssl::openssl::ssl` [E0433]

Digging into it, it looks like it's because the changes you made to the openssl crate (HandshakeError) haven't made it into a release yet. Give me a shout when there's an updated dependency that fixes this, or if you know of an easy workaround (just learning Rust). Looks like a really promising library, so thanks!

Regards

Tom

Rename forget to ensure.

When I first saw the forget method, I immediately assumed that this was a way to cancel the future (forget about it). While I understand it was named after the std function mem::forget and means "don't clean this up but let it finish", assuming that the user will know about this relatively obscure std method is probably not a good idea.

Personally, I recommend renaming it to ensure like the python method asyncio.ensure_future as it seems to do something fairly similar.
