
calloop's Introduction

Smithay


A smithy for rusty wayland compositors

Goals

Smithay aims to provide building blocks to create wayland compositors in Rust. While not a full-blown compositor itself, it provides objects and interfaces implementing common functionality that pretty much any compositor will need, in a generic fashion.

It supports the core Wayland protocols, the official protocol extensions, and some external extensions, such as those made by and for wlroots and KDE.

Also:

  • Documented: Smithay strives to maintain clear and detailed documentation of its API and functionality. Compiled documentation is available on docs.rs for released versions, and here for the master branch.
  • Safety: Smithay aims to be safe to use, leveraging Rust's guarantees.
  • Modularity: Smithay is not a framework, and will not be constraining. If there is a part you don't want to use, you should not be forced to use it.
  • High-level: You should not have to worry about gory low-level details (but Smithay won't stop you if you really want to dive into them).

Anvil

Smithay as a compositor library has its own sample compositor: anvil.

For information about it and how to run it, see the anvil README.

Other compositors that use Smithay

  • Cosmic: Next generation Cosmic desktop environment
  • Catacomb: A Wayland Mobile Compositor
  • MagmaWM: A versatile and customizable Wayland Compositor
  • Niri: A scrollable-tiling Wayland compositor
  • Strata: A cutting-edge, robust and sleek Wayland compositor
  • Pinnacle: A WIP Wayland compositor, inspired by AwesomeWM
  • Sudbury: Compositor designed for ChromeOS

System Dependencies

(This list can depend on features you enable)

  • libwayland
  • libxkbcommon
  • libudev
  • libinput
  • libgbm
  • libseat
  • xwayland

Contact us

If you have questions or want to discuss the project with us, our main chatroom is on Matrix: #smithay:matrix.org.

calloop's People

Contributors

13r0ck, atouchet, chrisduerr, daxpedda, dependabot[bot], detly, djmcnab, drakulix, elinorbgr, fengalin, guhwanbae, hafeoz, i509vcb, ids1024, kchibisov, notgull, rtzoeller, vberger


calloop's Issues

Can callback functions be nested?

I'm trying to write a server with calloop that handles read and write events.
First, the event loop should listen on the server socket and wait for a client to connect.
Second, if a client connects to this socket, the event loop should process read and write events from the client.

1 fn main() {
2    let mut event_loop: EventLoop<i32> = EventLoop::try_new().expect("Failed to initialize the event loop");
3    // server socket
4    let socket = bind_ipc_socket("/xxx");
5    // wait for client to connect
6    event_loop.handle().insert_source(
7        Generic::new(socket, Interest::READ, Mode::Edge),
8        |event, metadata, shared_data| {
9            // accept the client
10            let rw_socket = accept_client(*metadata);
11            // wait for the message from the connected client
12            event_loop.handle().insert_source(
13               Generic::new(rw_socket, Interest::READ, Mode::Edge),
14                |event, metadata, shared_data| {
15                    // process read or write event from client
16                    process_message(*metadata);
17                    core::result::Result::Ok(PostAction::Continue)
18                }
19            );
20            core::result::Result::Ok(PostAction::Continue)
21        }
22    );
23 }
fn bind_ipc_socket(path: &str) -> i32 {}
fn accept_client(socket: i32) -> i32 {}
fn process_message(socket: i32) {}

I get the following error when running:

error[E0597]: `event_loop` does not live long enough
  --> src/main.rs:12:13
   |
8 |         |event, metadata, shared_data| {
   |         ------------------------------ value captured here
...
12 |             event_loop.handle().insert_source(
   |             ^^^^^^^^^^ borrowed value does not live long enough
...
23 | }
   | -
   | |
   | `event_loop` dropped here while still borrowed
   | borrow might be used here, when `event_loop` is dropped and runs the destructor for type `EventLoop<'_, i32>`

I'm confused about that. Can callback functions be nested? Or is there another way to deal with this?
Thanks a lot.
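The usual calloop answer here is to not capture the EventLoop itself inside a callback, but a clone of its LoopHandle, which is cheap to clone and owned by the closure. The underlying Rust pattern can be sketched with std types alone (the Handle type below is a hypothetical stand-in for calloop's LoopHandle, not its real API):

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for calloop's LoopHandle: a cheap, clonable
// handle to shared loop state (here, just a list of registered sources).
#[derive(Clone)]
struct Handle(Rc<RefCell<Vec<&'static str>>>);

impl Handle {
    fn insert_source(&self, name: &'static str) {
        self.0.borrow_mut().push(name);
    }
}

fn main() {
    let handle = Handle(Rc::new(RefCell::new(Vec::new())));

    // Clone the handle *before* moving it into the outer callback,
    // instead of capturing the event loop itself.
    let inner_handle = handle.clone();
    let outer_callback = move || {
        // The nested registration goes through the owned clone, so no
        // borrow of the event loop outlives the closure.
        inner_handle.insert_source("client-socket");
    };

    handle.insert_source("server-socket");
    outer_callback();

    assert_eq!(*handle.0.borrow(), ["server-socket", "client-socket"]);
}
```

In the original code this means calling event_loop.handle() once outside the outer closure, cloning the handle, and moving the clone in, so the inner insert_source never borrows event_loop.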

Use kqueue `EVFILT_SIGNAL` to support signals on BSD

The fact that signals are only supported on Linux is currently one of the issues using Smithay on BSDs. It looks like it should be possible to implement this with kqueue instead of signalfd, though I'm not currently familiar with kqueue.

Token sub-ID system does not allow more than one level of composition

Consider two custom event sources:

struct SourceOne {
    ping_one_a: calloop::ping::PingSource,
    ping_one_b: calloop::ping::PingSource,
}

struct SourceTwo {
    ping_two_a: calloop::ping::PingSource,
    ping_two_b: calloop::ping::PingSource,
}

Following the composition technique suggested in #36, you might create either of these with their own token sub-IDs:

impl SourceOne {
    const ID_A: u32 = 1;
    const ID_B: u32 = 2;
}

impl SourceTwo {
    const ID_A: u32 = 1;
    const ID_B: u32 = 2;
}

This works fine: you can use the ID_* sub-IDs to differentiate the ping sources in either custom event source.

Now consider what happens if you want to create an event source that uses either or both of those. Where should an API user start their token sub-IDs?

I am making the assumption here that any token sub-IDs used by a custom source are implementation details. If that's the case, then a writer of a new source SourceThree that contains SourceOne plus another source would not know to start their sub-IDs at 3 instead of 1. If they don't, then they will collide: they only get the one token in register(), and so in process_events() there is no way to distinguish eg. SourceOne::ping_one_b from whichever SourceThree source has sub-ID 2.

Even if we don't make that assumption, there's still a problem. If SourceThree wants to combine both SourceOne and SourceTwo, the sub-IDs will always collide, and SourceThree::process_events() will have no way to distinguish whether a given event was due to SourceOne::ping_one_a or SourceTwo::ping_two_a. So even if sub-IDs are considered part of the API of an event source, any given event source needs to know about every other kind of event source, even any created and published by 3rd parties, to avoid collisions.

Or have I missed something?

Consider retrying on EINTR

If you'll get EINTR during dispatch it could make sense to handle it internally in calloop and retry behind the scenes.
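The retry itself is a small loop; a minimal sketch of the pattern calloop could apply internally around its poll call (retry_on_eintr is a hypothetical helper, shown with a simulated interrupted call):

```rust
use std::io;

// Retry an operation as long as it fails with EINTR, which std maps to
// io::ErrorKind::Interrupted. Any other outcome is returned as-is.
fn retry_on_eintr<T>(mut op: impl FnMut() -> io::Result<T>) -> io::Result<T> {
    loop {
        match op() {
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            other => return other,
        }
    }
}

fn main() {
    // Simulate a syscall interrupted twice by a signal before succeeding.
    let mut attempts = 0;
    let result = retry_on_eintr(|| {
        attempts += 1;
        if attempts < 3 {
            Err(io::Error::from(io::ErrorKind::Interrupted))
        } else {
            Ok(42)
        }
    });
    assert_eq!(result.unwrap(), 42);
    assert_eq!(attempts, 3);
}
```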

Possible soundness improvements for recent slotmap changes

I noticed the changes to use slotmap a while back, and the introduction of a few unsafe code blocks and functions. What mainly caught my eye were a couple of safety notices eg.

Safety

You must ensure that the *const Token pointer provided remains alive until your event source is re-registered or unregistered.

(I'll stress that what follows is my analysis, which could be mistaken, so if you're a user take it with a grain of salt until a maintainer of the project provides their feedback.)

The functions in question are marked unsafe, and so there's a "contract" with the caller that they Not Do The Thing you warn against. However, it's not a difficult thing to do, especially when using transient sources (I've certainly done it by mistake myself during development). If it does happen, it's a pretty dire consequence - epoll() now has callback data that's effectively a dangling pointer into arbitrary memory.

I think there's a way to change the nature of this, so that instead of a dangling pointer, the result is a leak of a file descriptor and token. Still sub-optimal, but not as bad and technically safe by Rust's contract. Basically it follows the "leak amplification" pattern for other unsafe code in the standard library. An upshot is that it also "pushes back" the unsafe boundary a bit further into the internals of Calloop.

It still uses the same token mechanics, but the initial, invalid token is boxed and converted to a raw pointer as early as possible (see generic.rs). The Poll type is now responsible for creating a new, valid token at the time of registration, and the source that uses it (probably Generic) is responsible for freeing/dropping it. This means that if it is not freed, it leaks, instead of the Generic freeing the Box when it's dropped but leaving the pointer in the event loop.

You can take a look at my changes in my slotmap-tweaks branch. They are definitely not finished (as in, they don't even compile yet), because I haven't quite sorted out the type changes required to avoid a leak in the unregister() process. If you agree with my analysis, I can open an initial PR and you can review it more fully. If not, I'm happy to discuss it, since I'm certainly not an expert in Rust soundness and safety.

Not all tasks scheduled with `Executor` are run

Not sure exactly what's going on, but some code I have trying to use calloop's Executor doesn't always end up running the scheduled tasks. Adding a print to the start of the async { block shows it never starts. Using a different executor doesn't have this problem.

Here's a minimal case where I see an issue like this. It's not really reflective of exactly what I was doing where I ran into this, but it's an interesting case, where "Baz" surely should be printed, but isn't:

fn main() {
    let (executor, scheduler) = calloop::futures::executor().unwrap();
    let scheduler_clone = scheduler.clone();
    scheduler.schedule(async move {
        println!("Foo");
        scheduler_clone.schedule(async {
            println!("Bar");
        });
    });
    let mut event_loop =  calloop::EventLoop::try_new().unwrap();
    event_loop.handle().insert_source(executor, |_: (), _, _| {}).unwrap();
    event_loop.run(None::<std::time::Duration>, &mut (), |_| {
        scheduler.schedule(async {
            println!("Baz");
        });
    });
}

Update dependencies

Making this issue to not forget to bump all relevant dependencies when making the 0.12 release

Build failure on macOS Big Sur

When I try to build calloop (as a dependency of client-toolkit) on macOS Big Sur 10.1, I get the following error messages:

MacBook :: Desktop/client-toolkit » cargo check
    Checking calloop v0.6.5
error[E0432]: unresolved import `nix::unistd::pipe2`
  --> /Users/cadenhaustein/.cargo/registry/src/github.com-1ecc6299db9ec823/calloop-0.6.5/src/sources/ping.rs:18:21
   |
18 |     unistd::{close, pipe2, read, write},
   |                     ^^^^^
   |                     |
   |                     no `pipe2` in `unistd`
   |                     help: a similar name exists in the module: `pipe`

error[E0433]: failed to resolve: use of undeclared type `Poller`
   --> /Users/cadenhaustein/.cargo/registry/src/github.com-1ecc6299db9ec823/calloop-0.6.5/src/sys/mod.rs:172:21
    |
172 |             poller: Poller::new()?,
    |                     ^^^^^^ use of undeclared type `Poller`

error[E0412]: cannot find type `Poller` in this scope
   --> /Users/cadenhaustein/.cargo/registry/src/github.com-1ecc6299db9ec823/calloop-0.6.5/src/sys/mod.rs:166:13
    |
166 |     poller: Poller,
    |             ^^^^^^ not found in this scope

error: aborting due to 3 previous errors

Some errors have detailed explanations: E0412, E0432, E0433.
For more information about an error, try `rustc --explain E0412`.
error: could not compile `calloop`

Calloop cannot be shared between objects safely

Hi,
I was experimenting with calloop to run a task periodically. I added a method to the event loop and execute it after some time, wrapping the event loop inside multi-threaded tokio code, but I get the error
Rc<calloop::loop_logic::LoopInner<'a, ()>> cannot be shared between threads safely. It would be very helpful if someone could explain how to share a calloop object across threads safely.
The multi threaded tokio code looks like
tokio::spawn(async move {
    loop {
        tokio::time::sleep(idle_duration).await;

        let sem = semaphore.clone().acquire_owned().await;
        tokio::task::spawn_blocking(move || {
            // Event loop operations
            drop(sem);
        }).await.unwrap();
    }
});

Storing registration token in structure

Hi,
I want to store the RegistrationToken returned by insert_source in a struct, as in the example below. What is the right way to initialize the token field to a default value?

struct Test {
    token: RegistrationToken,
}

impl Test {
    fn new() -> Test {
        Test {
            token: // how to initialize to a dummy/default RegistrationToken value?
        }
    }
}

Any help on this will be appreciated.
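One common workaround, sketched below with a hypothetical FakeToken standing in for calloop::RegistrationToken (which has no public constructor or Default impl), is to store an Option and treat None as "not registered yet":

```rust
// Hypothetical stand-in for calloop::RegistrationToken, which cannot be
// constructed by user code.
#[derive(Debug, PartialEq)]
struct FakeToken(u32);

struct Test {
    // Option sidesteps the need for a dummy token value entirely.
    token: Option<FakeToken>,
}

impl Test {
    fn new() -> Test {
        // "Not registered yet" is simply None; no default token needed.
        Test { token: None }
    }

    // Called once insert_source has returned a real token.
    fn register(&mut self, token: FakeToken) {
        self.token = Some(token);
    }
}

fn main() {
    let mut t = Test::new();
    assert!(t.token.is_none());
    t.register(FakeToken(7));
    assert_eq!(t.token, Some(FakeToken(7)));
}
```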

Add self-repeating timers

In some contexts, there are use cases like "start a timer for XX seconds and then repeat it every YY seconds until cancelled" (for example keyboard repeat in Wayland); we could provide an API for that for convenience.

Advice on Metadata in EventSource of Timer

impl EventSource for Timer {
    type Event = Instant;
    type Metadata = ();
    type Ret = TimeoutAction;
    type Error = std::io::Error;

    // ...
}

Because Metadata of Timer is (), specific data cannot be passed in the callback function of Timer.
Why not add a generic to Timer to support specific Metadata?
Thanks.

Use own error type for API

One of the little-but-common annoyances I run into with Calloop is that eg. process_events() returns a std::io::Result, but std::io::Error is just about the only Rust error type in existence that doesn't have a single-argument constructor. So I always have to tack on .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)). This is compounded by the fact that you can use any type you like for EventSource's associated Ret, which is usually going to be a Result<T, E> where E: std::error::Error rather than std::io::Error (precisely because std::io::Error is a pain to use in your own API and arguably is not intended to be).

Should Calloop have its own error (and corresponding result) for process_events() (and possibly other trait methods on EventSource)? It would be possible to have From implementations to ensure that ? still works. Then (a) using ? on std::io::Results from fd operations would still be fine, and (b) so would using ? on core::result::Result or anyhow::Result or errors from thiserror etc.

I would be happy to make such a change, but it is potentially a breaking one so I wanted to see if it's something you'd be open to.
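A sketch of what such a crate-local error could look like, using hypothetical names (CalloopError is not calloop's real type): the From impls are what keep ? working for both io::Error and boxed user errors.

```rust
use std::fmt;

// Hypothetical crate-local error type for process_events() and friends.
#[derive(Debug)]
enum CalloopError {
    Io(std::io::Error),
    Other(Box<dyn std::error::Error>),
}

impl fmt::Display for CalloopError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CalloopError::Io(e) => write!(f, "io error: {}", e),
            CalloopError::Other(e) => write!(f, "error: {}", e),
        }
    }
}

// `?` on an io::Result converts automatically, no map_err boilerplate.
impl From<std::io::Error> for CalloopError {
    fn from(e: std::io::Error) -> Self {
        CalloopError::Io(e)
    }
}

// `?` also works for user error types already boxed as dyn Error.
impl From<Box<dyn std::error::Error>> for CalloopError {
    fn from(e: Box<dyn std::error::Error>) -> Self {
        CalloopError::Other(e)
    }
}

fn process_events() -> Result<(), CalloopError> {
    // An fd-style io operation: `?` converts io::Error to CalloopError.
    let bytes = std::fs::read("/dev/null")?;
    assert!(bytes.is_empty());
    Ok(())
}

fn main() {
    assert!(process_events().is_ok());
}
```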

Clarifications on how the Event Loop works and how to avoid blocking the event loop.

I am new to concurrent programming. I read the Calloop book and I am struggling to understand how this enables concurrency.

From the documentation of the run method it seems like it will wait for timeout seconds for a new event. The moment we get a new event it dispatches the associated callback. I have a few questions

  1. Will I get no new events when the callback is being executed?
  2. What happens if we wait for timeout seconds and there is no new event?
  3. Maybe some extreme examples would help. What's the difference between setting timeout to zero vs setting it to None?

Here is a potential scenario:
If I have potentially blocking code in a callback such as reading a huge file, what should I do? Let's say I have an event that indicates that a large file is ready to be read. We will dispatch an associated callback for it. In the callback I want to read the file. But that's a blocking operation now. How would I do this? Will I need to spawn a thread to do this operation so that I can immediately return from the callback? If so it seems like any parallelism/concurrency comes from other code and not calloop itself?

I know this library was written with Smithay in mind. But a full concrete example would be helpful. The ZeroMQ example in the book is good for understanding event sources, but I am still struggling to understand the concurrency aspect. Maybe add to that example with a full main function that also handles some unix signals etc. and some actual processing of the messages: request/response etc.

Double borrows when registering sources and idle callbacks

Inside of pre_run and post_run, registering an event source will cause a double borrow.

This is also possible inside of the callback provided to insert_idle since registering an idle callback inside of an idle callback will double borrow.

I reproduced the double borrow with the following code:

use calloop::{
    ping::PingSource,
    EventLoop, EventSource, LoopHandle, Poll, PostAction, Readiness, Token, TokenFactory,
};

struct State {
    loop_handle: LoopHandle<'static, Self>,
}

#[test]
fn insert_in_dispatch() {
    let mut event_loop = EventLoop::try_new().unwrap();
    let handle = event_loop.handle();

    let mut state = State {
        loop_handle: handle.clone(),
    };

    let timer = OneshotRemove::new();

    handle.insert_source(timer, handle_source).unwrap();

    event_loop.run(None, &mut state, |_| {}).unwrap();
}

fn handle_source(_: (), _: &mut (), state: &mut State) {
    state
        .loop_handle
        .insert_source(OneshotRemove::new(), handle_source)
        .unwrap();
}

struct OneshotRemove {
    source: PingSource,
}

impl OneshotRemove {
    pub fn new() -> Self {
        let (ping, source) = calloop::ping::make_ping().unwrap();
        let source = Self { source };

        // Immediately ping the source to dispatch on register
        ping.ping();
        source
    }
}

impl EventSource for OneshotRemove {
    type Event = ();
    type Metadata = ();
    type Ret = ();
    type Error = calloop::Error;

    fn pre_run<F>(&mut self, mut callback: F) -> calloop::Result<()>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        callback((), &mut ());
        Ok(())
    }

    fn process_events<F>(
        &mut self,
        readiness: Readiness,
        token: Token,
        _callback: F,
    ) -> Result<PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        self.source
            .process_events(readiness, token, |_, _| {})
            .unwrap();

        Ok(PostAction::Remove)
    }

    fn register(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> calloop::Result<()> {
        self.source.register(poll, token_factory)
    }

    fn reregister(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> calloop::Result<()> {
        self.source.reregister(poll, token_factory)
    }

    fn unregister(&mut self, poll: &mut Poll) -> calloop::Result<()> {
        self.source.unregister(poll)
    }
}

The following backtrace is produced:
https://gist.github.com/i509VCB/0b27d3a5eb57ae7594243cd5b4833cf7

generic::Fd wrapper is not necessary as of Rust 1.48

As of Rust 1.48, RawFd implements AsRawFd. This seems to be the only purpose served by calloop::generic::Fd (and Generic::from_fd()), so it could be removed. (It's a bit confusing having it there with no knowledge of why.)

As mentioned on Matrix, this is probably not a good reason on its own to bump the MSRV from 1.41 to 1.48. But if it ever gets bumped for other reasons, this issue can remind you that it can be removed.
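The claim is easy to check with plain std: since Rust 1.48, a bare RawFd satisfies an AsRawFd bound directly, with no wrapper needed (fd_of below is just an illustrative helper):

```rust
use std::os::unix::io::{AsRawFd, RawFd};

// Any API bounded on AsRawFd, like Generic's, accepts this.
fn fd_of(source: &impl AsRawFd) -> RawFd {
    source.as_raw_fd()
}

fn main() {
    // A bare RawFd (here, stdin's fd number) satisfies the bound itself.
    let stdin_fd: RawFd = 0;
    assert_eq!(fd_of(&stdin_fd), 0);

    // So does a real file object, as before.
    let file = std::fs::File::open("/dev/null").unwrap();
    assert!(fd_of(&file) >= 0);
}
```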

(I don't know if you do deprecation for your APIs. If so, maybe that or a note in the docs would be good? Happy to do PR for either.)

insert_source_no_interest fails if stdin is redirected

I ran into this while trying to package calloop for Debian. The automated test environment runs with stdin redirected from /dev/null, causing the test to fail.

I added the following diff to shed light on the specific error:

diff --git i/src/loop_logic.rs w/src/loop_logic.rs
index bb3a6d3..49e53f1 100644
--- i/src/loop_logic.rs
+++ w/src/loop_logic.rs
@@ -617,7 +617,7 @@ mod tests {
             crate::sources::generic::Generic::from_fd(0, Interest::EMPTY, Mode::Level),
             |_, _, _| Ok(PostAction::Continue),
         );
-        assert!(ret.is_ok());
+        assert!(ret.is_ok(), "{:?}", ret);
     }
 
     #[test]
$ cargo test -- no_interest </dev/null
    Finished test [unoptimized + debuginfo] target(s) in 0.02s
     Running unittests (target/debug/deps/calloop-035331c4b39aed70)

running 1 test
test loop_logic::tests::insert_source_no_interest ... FAILED

failures:

---- loop_logic::tests::insert_source_no_interest stdout ----
thread 'loop_logic::tests::insert_source_no_interest' panicked at 'Err(Os { code: 1, kind: PermissionDenied, message: "Operation not permitted" })', src/loop_logic.rs:620:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    loop_logic::tests::insert_source_no_interest

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 26 filtered out; finished in 0.00s

Remove the thiserror dependency

It shouldn't be that hard, and it would remove some of the proc-macro crates (syn, quote) from the dependencies of winit.

Enforce IO safety

It is possible to drop a file descriptor registered in the Poll before it's deregistered, which violates I/O safety.
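One direction (a sketch only, using std's I/O-safety types rather than calloop's actual API) is to have registrations borrow the descriptor as a BorrowedFd<'fd>, so the borrow checker rejects dropping the owner while the registration is alive:

```rust
use std::fs::File;
use std::os::fd::{AsFd, AsRawFd, BorrowedFd};

// Hypothetical registration that borrows the fd rather than copying a
// RawFd: the lifetime ties it to the fd's owner.
struct Registration<'fd> {
    fd: BorrowedFd<'fd>,
}

fn register(source: &impl AsFd) -> Registration<'_> {
    Registration { fd: source.as_fd() }
}

fn main() {
    let file = File::open("/dev/null").unwrap();
    let reg = register(&file);
    // drop(file); // rejected by the compiler: `file` is borrowed by `reg`
    assert!(reg.fd.as_raw_fd() >= 0);
}
```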

Unstable feature is used when checking coverage in upstream crate

We have recently introduced a transitive dependency on calloop for one of our crates we test coverage for.

After introducing this dependency we cannot use the stable compiler to test for coverage due to the error message.

error[E0554]: `#![feature]` may not be used on the stable release channel
   --> /root/.cargo/registry/src/github.com-1ecc6299db9ec823/calloop-0.10.3/src/lib.rs:146:23
    |
146 | #![cfg_attr(coverage, feature(no_coverage))]
    |                       ^^^^^^^^^^^^^^^^^^^^

I assume this feature exists for testing the coverage of calloop itself, and there is not much reason to use it when calloop is pulled in as a dependency?

Would it be acceptable to make a similar solution where features are not enabled for upstream crates? For instance by creating a feature on the crate itself?

Windows support

This is likely a giant undertaking, but this crate looks amazing, and I can't lock my UI library into not working on Windows. I use Linux daily, but you still have to use libraries that work on every platform.

Rethinking calloop

Over the last few days it appeared that mio's behavior, and notably its focus on edge-triggered events, is an undesirable property. And as such, it may be relevant to drop it from being a calloop dependency. That would also mean focusing calloop on unix platforms, which I'm pretty confident it is only used on anyway.

On this process, I'm considering re-shaping calloop to, at the same time, fix the long-standing issue of ownership weirdness regarding the event sources. This issue is a design draft to gather feedback.


Internally, calloop would contain a small abstraction around epoll and the appropriate equivalent for other platforms. This is a mostly-internal API, and only relevant for users that are going to implement their own event sources.

pub struct Poll { /* ... */ }

impl Poll {
    /// Register a new FD to the poll structure
    pub fn register(&mut self,
        fd: RawFd, /* the FD to register for polling */
        interest: Interest, /* Read / Write / Other... bitflag */
        mode: PollMode, /* oneshot, level, edge */
        token: Token, /* opaque token to recognize the source */
    ) -> io::Result<()> {
        /* ... */
    }

    /// Update an FD registration
    pub fn update_registration(&mut self,
        fd: RawFd, i: Interest, m: PollMode, t: Token,
    ) -> io::Result<()> {
        /* ... */
    }

    /// Remove a registration
    pub fn unregister(&mut self, fd: RawFd) -> io::Result<()> {
        /* ... */
    }
}

Now comes the central trait of the crate: Source, representing an event source:

pub trait Source {
    /// The type of events this event source generates
    type Event;
    /// The return type expected from the callback closure, if the user
    /// needs to communicate back to the source.
    /// Set to () if not needed.
    type Ret;

    /// Process readiness of the source and translate it as events to the
    /// provided closure
    fn process_events<F>(&mut self, readiness: Readiness, callback: F) -> io::Result<()>
        where F: FnMut(Self::Event) -> Self::Ret;

    /// Make the source register itself to the `Poll`
    fn register(&mut self, poll: &mut Poll, token: Token) -> io::Result<()> {
        /* ... */
    }

    /// Make the source update its registration
    fn update_registration(&mut self, poll: &mut Poll, token: Token) -> io::Result<()> {
        /* ... */
    }

    /// Unregister the source
    fn unregister(&mut self, poll: &mut Poll) -> io::Result<()> {
        /* ... */
    }
}

From this, to insert a new event source in the loop, the calloop user needs to provide an implementation of the Source trait and a closure with a matching signature. The loop takes ownership of both and provides a SourceToken<S> in exchange.

This SourceToken<S> can be used to remove the source from the EventLoop. Removal requires the same access as inserting a new source (so likely a LoopToken), and the user is given the source S back. The closure is dropped.

Dropping the SourceToken<S> does nothing.


Remaining questions:

  • Is it worth introducing a Waker mechanism to remotely wake-up the event loop?

I'm thinking a special Source that just generates () events and can be remotely made ready should do that well, without requiring a specific Waker mechanism.

  • This API draft allows a source to register more than one FD to the poll instance, however it does not allow the source to know which FD was made ready. Should we handle that?

A possibility would be to introduce a notion of sub-token (by splitting the u64 token into two u32s for example). Another would be to provide a Multiplexer source, which can aggregate several event sources by internally creating its own epoll instance and nesting them.

  • Is it worth giving remote control of the source via the SourceToken? By allowing it to distantly disarm/rearm an event source for example, or destroy it (so that it is dropped) ?
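The sub-token split mentioned above can be sketched in a few lines (make_token/split_token are hypothetical helpers, not calloop API): the source ID lives in the high 32 bits and the sub-ID in the low 32 bits.

```rust
// Pack a source ID and a sub-ID into one u64 token.
fn make_token(source_id: u32, sub_id: u32) -> u64 {
    ((source_id as u64) << 32) | (sub_id as u64)
}

// Recover (source_id, sub_id) from a token.
fn split_token(token: u64) -> (u32, u32) {
    ((token >> 32) as u32, token as u32)
}

fn main() {
    let token = make_token(7, 2);
    assert_eq!(split_token(token), (7, 2));

    // Distinct sources using the same sub-ID no longer collide.
    assert_ne!(make_token(1, 2), make_token(2, 2));
}
```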

Enhancement: PostAction combinator

In #78 I mentioned a possible "combinator" for Calloop's PostAction enum, to make managing child sources a bit smoother. It's possible this is not as necessary now that TransientSource is a part of the library, but I figured I'd propose something to gauge interest and flesh out the details. I keep calling it a combinator, but you suggested bit-or (|) in that thread. I'm only keeping it open in case it turns out to not quite work the way we expect.

Notes:

  • This is for combining PostActions from child sources at the same level, not for combining a child action with the parent's
  • I'm assuming it's symmetric, I think it would be quite surprising if it weren't (hence the empty lower left side)
  • the parent source might want to do something different, this operator is mainly for "if I had no logic apart from capturing events from my child sources, what would I need to return to be correct"

Here's a "truth" table for PostAction:

op          Continue    Reregister  Disable     Remove
Continue    Continue    Reregister  Reregister  Reregister
Reregister              Reregister  Reregister  Reregister
Disable                             Disable?
Remove

The reason I'm unsure about the diagonals is because of that last bullet point above. If a parent has several child sources and they ALL want to be disabled, should the operator return Disabled and let the parent source decide whether it, itself, should be disabled or reregistered? Does the disabled status need to "propagate" downwards through the tree?
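The operator described by the table can be sketched on a local mirror of PostAction (the Remove | Remove case is not filled in by the table, so the choice below is an assumption, as is the questioned Disable diagonal):

```rust
use std::ops::BitOr;

// Local mirror of calloop's PostAction, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PostAction {
    Continue,
    Reregister,
    Disable,
    Remove,
}

impl BitOr for PostAction {
    type Output = PostAction;

    // Transcribes the "truth" table: only the Continue/Continue and
    // agreeing-diagonal cases avoid a Reregister of the parent.
    fn bitor(self, rhs: PostAction) -> PostAction {
        use PostAction::*;
        match (self, rhs) {
            (Continue, Continue) => Continue,
            // The diagonal the issue marks with a '?': all children agree.
            (Disable, Disable) => Disable,
            // Not specified by the table; assumed to follow the diagonal.
            (Remove, Remove) => Remove,
            // Any other mix forces the parent to reregister.
            _ => Reregister,
        }
    }
}

fn main() {
    use PostAction::*;
    assert_eq!(Continue | Continue, Continue);
    assert_eq!(Continue | Disable, Reregister);
    assert_eq!(Disable | Disable, Disable);
    // Symmetric, as assumed in the issue.
    assert_eq!(Continue | Remove, Remove | Continue);
}
```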

Need clarification on why `Generic::get_mut` is unsafe

I understand that dropping the underlying handle of a Generic source would lead to undefined behavior. But is it even possible to do that without requiring some unsafe code (besides the call to get_mut itself)? For example, File and OwnedFd don't have a close method, or anything equivalent as far as I can tell. An example illustrating why get_mut itself has to be unsafe, that is, how one would be able to drop the underlying handle without any other unsafe code, would be helpful.

support for nix 0.27

Hi.

I am one of the Debian rust team, and we are currently investigating upgrading the rust-nix crate to 0.27. While we do have a mechanism for packaging multiple versions of a crate, it is something we try to avoid where possible. So I started going through the reverse dependencies seeing what it would take to update them to nix 0.27. In most cases this just means some tweaks to data types.

However, I ran into a brick wall with calloop. Taking the latest git version of calloop, bumping the nix dependency to 0.27 and running "cargo test --all --all-targets --features block_on,executor,signals" results in

thread 'main' panicked at 'assertion failed: `(left == right)`
  left: `None`,
 right: `Some(SIGUSR2)`', tests/signals.rs:88:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
I have no idea where to start debugging this.

Cancelling an earlier timeout prevents later ones from firing

I'm trying to reset a timeout by cancelling it first and then adding a new one, but the new timeout then never fires.
It seems to me that cancelling any earlier timeout prevents a later one from firing, as seen below.
I expected the following example to print "timeout" first but it only prints "dispatched":

use calloop::timer::Timer;
use calloop::EventLoop;
use std::time::Duration;

pub fn main() {
        let mut event_loop: EventLoop<()> = EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let timer_source = Timer::new().unwrap();
        let timers = timer_source.handle();

        handle.insert_source(timer_source, |_, _, _| println!("timeout")).unwrap();
        timers.add_timeout(Duration::new(3, 0), ());

        let sooner = timers.add_timeout(Duration::new(2, 0), ());
        timers.cancel_timeout(&sooner);

        event_loop.run(None, &mut (), |_| println!("dispatched")).unwrap();
}

Allow event source to mark itself as being ready from the `pre_run` or similar

Some event sources can be read outside of calloop's own polling.

An example of such a source is a WaylandSource. The wayland display can be read by other libraries, like mesa, to check whether there are events; wl_display_prepare_read is used for this. This issue led to a workaround inside the winit library.

One possible solution would be to add a way for an event source to set an indicator requesting an instant wake-up, so that when calloop next tries to poll on it, it will wake up immediately and run the event/read path instead.

cc @rib

System specific implementation for Ping event sources

Currently Ping is implemented using a pipe, which is a generic unix way of implementing it.

However some platforms provide other primitives that could be used more efficiently for this, such as eventfd on Linux.

"Transient" source wrapper (includes channel CPU spike issue)

I accidentally triggered a 100% CPU spike in my code, and managed to reduce it to this situation:

use calloop::{
    channel::{channel, Channel},
    signals::{Signal, Signals},
    EventLoop, EventSource, LoopSignal, PostAction,
};

struct HasChannel(Channel<()>);

impl HasChannel {
    fn new() -> Self {
        Self(channel().1)
    }
}

impl EventSource for HasChannel {
    type Event = calloop::channel::Event<()>;
    type Metadata = ();
    type Ret = ();

    fn process_events<F>(
        &mut self,
        readiness: calloop::Readiness,
        token: calloop::Token,
        callback: F,
    ) -> std::io::Result<calloop::PostAction>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        self.0.process_events(readiness, token, callback)?;
        Ok(PostAction::Continue)
    }

    fn register(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> std::io::Result<()> {
        self.0.register(poll, token_factory)
    }

    fn reregister(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> std::io::Result<()> {
        self.0.reregister(poll, token_factory)
    }

    fn unregister(&mut self, poll: &mut calloop::Poll) -> std::io::Result<()> {
        self.0.unregister(poll)
    }
}

fn main() -> std::io::Result<()> {
    // Event loop initialisation.
    let mut event_loop: EventLoop<LoopSignal> = EventLoop::try_new()?;

    const SIGNALS_TO_STOP_ON: &[Signal] = &[Signal::SIGINT, Signal::SIGTERM];

    let handle = event_loop.handle();

    let mut signaller = event_loop.get_signal();

    handle.insert_source(
        Signals::new(SIGNALS_TO_STOP_ON)?,
        |_, _metadata, signaller| signaller.stop(),
    )?;

    handle.insert_source(HasChannel::new(), |_, _, _| {})?;

    event_loop.run(None, &mut signaller, |_| {})?;

    Ok(())
}

(This can run as a main.rs with only calloop = "0.9.3" as a dependency.)

Running this, you should see the CPU hit 100% on one core. It's arguably not a bug! What's happening is that the channel's sending end is dropped, so the process_events() method on the channel returns PostAction::Remove. The process_events() method on HasChannel erroneously ignores this return value; what it should do is check for it, and return PostAction::Reregister instead of PostAction::Continue. Then in the reregister() method, it should unregister the channel while keeping itself registered (if there's any reason to).

You could make a case that the channel should be coded to try to avoid spiking the CPU like this even if it's not unregistered, but I couldn't see an easy way to do that (I didn't look very hard though).

On issue #77 you mentioned maybe having a way for such sources to remove themselves from the loop. The problem is that "top level" sources can be removed by the code in loop_logic itself, but "child" sources can't. Their PostAction return value is only seen by another event source, not the loop logic itself. There are other strategies though. Here's a summary of what I can think of, maybe you have other ideas:

  1. A wrapper type. I actually have one that I could either add to Calloop itself or publish as a crate. It's called TransientSource; the API and struct look like this:

    //! Wrapper for a transient Calloop event source. If you have a high-level
    //! event source that you expect to remain in the event loop indefinitely,
    //! and another event source nested inside that one that you expect to
    //! require removal or disabling from time to time, this module can handle
    //! it for you.

    /// A [`TransientSource`] wraps a Calloop event source and manages its
    /// registration. A user of this type only needs to perform the usual
    /// Calloop calls (`process_events()` and `*register()`) and check the
    /// return value of [`process_events()`].
    ///
    /// Rather than needing to check for the full set of [`calloop::PostAction`]
    /// values returned from [`process_events()`], you can just check for `Continue`
    /// or `Reregister` and pass that back out through your own `process_events()`
    /// implementation. In your registration functions, you then only need to call
    /// the same function on this type ie. `register()` inside `register()` etc.
    pub enum TransientSource<T> {
        Keep(Box<T>),
        Register(Box<T>),
        Disable(Box<T>),
        Remove(Box<T>),
        None,
    }

    Then in my HasChannel source you'd just have TransientSource<Channel> instead of Channel. If you like that idea I can open a PR with it.

  2. This works even better if you have a simple combinator/operator for the PostAction values so that eg. PostAction::Continue ⊕ PostAction::Remove => PostAction::Reregister or something like that. I haven't fully thought through the truth table/edge cases for such an operator, so you'd need to review it carefully.

  3. Alternatively to 1&2, you could have a kind of shim layer in between parent and child event sources, such that a parent event source would call this shim code, which would then call process_events() on the child and handle the return value, removing it if necessary. This gets complicated though, because you'd then also want to handle the (re|un)register() methods, and the lifetimes might get tricky.

  4. Ditch that altogether, flatten the recursive composition structure, and have a less direct way for event sources to have child sources. Maybe based on the tokens that already exist? This is the option I've thought about the least, TBH.

Let me know if you have other thoughts, because I basically picked the TransientSource approach and ran with it due to time constraints. It works well though!
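As an aside on option 2, here is a heavily hedged sketch of what such a combinator might look like, with `PostAction` redefined locally for illustration (this is not calloop's type, and the truth table is a guess that would need careful review, as noted above):

```rust
// PostAction redefined locally for illustration; NOT calloop's type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PostAction {
    Continue,
    Reregister,
    Disable,
    Remove,
}

// Guessed truth table: a child asking to be removed or disabled while the
// parent wants to continue must surface as Reregister, so the parent gets
// a chance to unregister the child without removing itself. Otherwise any
// non-Continue request wins, with the parent's taking precedence.
fn combine(parent: PostAction, child: PostAction) -> PostAction {
    use PostAction::*;
    match (parent, child) {
        (Continue, Remove) | (Continue, Disable) => Reregister,
        (Continue, child) => child,
        (parent, _) => parent,
    }
}
```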

API suggestion: clone sender vs. take sender vs. send

There are a few inconsistencies in Calloop's higher-level event sources, and even though they are extremely minor, I thought I'd make the suggestion since I've coded up an alternative for the event sources I've made for ZeroMQ and USB.

Take for example:

  • Ping has make_ping() to construct, which returns a (sender, source) pair. Using it requires calling ping() on the sender.
  • Channel has channel() (not make_channel()!) which returns a (sender, source) pair. Using it requires calling send() on the sender.
  • Timer has Timer::new(). Using it requires getting a handle from the Timer itself.

Both Ping and Channel have handles that close on drop. Timer does not.

This all quickly becomes apparent, and kind of unwieldy at times, if you have a composite event source that uses multiple kinds of these. For example, if your composite source has both a ping and a channel for internal reasons, you need four fields to use them.

Here is an API we stabilised on that kind of gives the best of both worlds:

/// This event source also allows you to use different event sources to publish
/// messages over the same writeable ZeroMQ socket (usually PUB or PUSH).
/// Messages should be sent over the Calloop MPSC channel sending end. This end
/// can be cloned and used by multiple senders. Common use cases are:
///
/// - One sender: just use `send()`. Useful for any socket kind except `PULL` or
///   `SUB`.
/// - Multiple senders: clone the sending end of the channel with
///   `clone_sender()`. Useful for `PUSH`, `PUB`, `DEALER`, but probably not at
///   all useful for `REQ`/`REP`.
/// - No senders: take the sending end of the channel with `take_sender()` and
///   drop it. Useful for `SUB` and `PULL`.

pub struct ZeroMQSource<T> {
    /// Sending end of channel.
    mpsc_sender: Option<calloop::channel::Sender<T>>,
    // ...
}

impl<T> ZeroMQSource<T> {
    // Send a message via the ZeroMQ socket. If the sending end has been
    // taken then this will return an error (as well as for any other error
    // condition on a still-existing channel sending end).
    pub fn send(&self, msg: T) -> Result<(), std::sync::mpsc::SendError<T>> {
        if let Some(sender) = &self.mpsc_sender {
            sender.send(msg)
        } else {
            Err(std::sync::mpsc::SendError(msg))
        }
    }

    // Clone the channel sending end from this ZeroMQ source. Returns [`None`]
    // if it's already been removed via [`take_sender()`]; otherwise the sender
    // can be safely cloned more and sent between threads.
    pub fn clone_sender(&self) -> Option<calloop::channel::Sender<T>> {
        self.mpsc_sender.clone()
    }

    // Remove the channel sending end from this ZeroMQ source and return it. If
    // you do not want to use it, you can drop it and the receiving end will be
    // disposed of too. This is useful for ZeroMQ sockets that are only for
    // incoming messages.
    pub fn take_sender(&mut self) -> Option<calloop::channel::Sender<T>> {
        self.mpsc_sender.take()
    }
}

Disadvantages:

  • more complicated
  • new API
  • extra checks in methods

Advantages:

  • if the sources are used internally, no extra senders need to be kept in fields; you can just call self.pinger.ping()
  • if you want to close a channel you can just call self.channel.take_sender() instead of needing to keep (a) the sender and (b) an option wrapper
  • reflects the API of the std type Option: (send/clone_sender/take_sender) == (map/clone/take)
  • the constructor is the more familiar source::Source::new() -> Result<Source> instead of source::make_source() -> Result<(sender, source)>
  • can be made backwards compatible easily, eg. make_source() just becomes let source = Source::new(); (source.take_sender(), source)

Let me know what you think, and if you're interested I'll code something up for the existing types that have sending handles.
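To make the backwards-compatibility bullet concrete, here is a sketch using std's mpsc channel in place of calloop's; `Source` and `make_source` are hypothetical names for illustration, not calloop API.

```rust
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

// Hypothetical stand-ins for calloop's channel types.
struct Source {
    sender: Option<Sender<u32>>,
    receiver: Receiver<u32>,
}

impl Source {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self { sender: Some(tx), receiver: rx }
    }

    // Errors if the sending end has already been taken.
    fn send(&self, msg: u32) -> Result<(), SendError<u32>> {
        match &self.sender {
            Some(s) => s.send(msg),
            None => Err(SendError(msg)),
        }
    }

    fn take_sender(&mut self) -> Option<Sender<u32>> {
        self.sender.take()
    }
}

// The old-style (sender, source) constructor, expressed on top of the new API.
fn make_source() -> (Sender<u32>, Source) {
    let mut source = Source::new();
    let sender = source.take_sender().expect("a fresh source always has a sender");
    (sender, source)
}
```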

bug: epoll module is not imported on #[cfg(target_os = "android")]

Hi ^^

Just found this lovely problem today on x86_64-linux-android:

error[E0433]: failed to resolve: use of undeclared type `Poller`
   --> /var/home/fortressia/.cargo/registry/src/github.com-1ecc6299db9ec823/calloop-0.10.0/src/sys/mod.rs:192:21
    |
192 |             poller: Poller::new(high_precision)?,
    |                     ^^^^^^ use of undeclared type `Poller`

error[E0412]: cannot find type `Poller` in this scope
   --> /var/home/fortressia/.cargo/registry/src/github.com-1ecc6299db9ec823/calloop-0.10.0/src/sys/mod.rs:163:13
    |
163 |     poller: Poller,
    |             ^^^^^^ not found in this scope

From the source code it seems Poller is only imported if the operating system is Linux. In Rust's cfg implementation, Linux and Android are differentiated, so if calloop is running on Android it won't pass #[cfg(target_os = "linux")], even though epoll should be usable since Android's kernel is based on Linux! <3

#[cfg(target_os = "linux")]
mod epoll;
#[cfg(target_os = "linux")]
use epoll::Epoll as Poller;
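A likely fix (sketched here with a stub module for illustration; the real module lives in src/sys/epoll.rs) is to gate on Android as well as Linux, since Android's kernel also provides epoll:

```rust
// Stub module for illustration only; the real one is src/sys/epoll.rs.
#[cfg(any(target_os = "linux", target_os = "android"))]
mod epoll {
    pub struct Epoll;
}

// Gating the alias the same way keeps Poller available on Android too.
#[cfg(any(target_os = "linux", target_os = "android"))]
use epoll::Epoll as Poller;
```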

API: TransientSource requires the loop to run again before the source can be replaced

Even though I wrote the TransientSource wrapper, I've hit a bit of an API/design problem that I could use some help with.

I'll use a Timer as an example, but this is not specifically a problem with timers, I have the same problem with custom sources that fire a finite number of times, expire and need to be replaced.

Let's say I have a source with a timer, but I don't want to start the timer unless something else happens. In the struct I might have:

struct TimedThing {
    is_sending: bool,
    // Only runs if is_sending is true
    timer: calloop::timer::Timer,
}

My constructor then has this problem:

impl TimedThing {
    fn new() -> Self {
        Self {
            is_sending: false,
            timer: todo!("?????"),
        }
    }
}

This is more or less what TransientSource is designed for, so I try:

struct TimedThing {
    is_sending: bool,
    // Only runs if is_sending is true
    timer: calloop::transient::TransientSource<calloop::timer::Timer>,
}
impl TimedThing {
    fn new() -> Self {
        Self {
            is_sending: false,
            timer: calloop::transient::TransientSource::None,
        }
    }
}

...and then at some point call self.timer = Timer::from_duration(...) (wrapped back into a TransientSource) when is_sending becomes true. Except that this only works if the timer does not already exist. If it does exist, this will leak whatever underlying thing keeps track of registration (a wheel entry for Timers, an epoll entry for fd-based sources), because the previous source will be dropped without being unregistered.

That's the problem with the TransientSource design that I'm trying to solve ie. there's no way to unregister a source without waiting for another iteration of the event loop.

Here's an attempt to work around it (pseudo-ish code):

fn process_events(...) {
    if (turn_off_sending) {
        self.is_sending = false;
        self.timer = TransientSource::None;
        // Oh no! Even if we return PostAction::Reregister, self.timer
        // no longer owns the timer we just replaced! It has no way to
        // unregister it now. If this were an event source that generated
        // continuous events, it would keep waking the loop forever.
    }
}

Let's try being more diligent. The first thing that's apparent is that TransientSource needs to have a cancel() or remove() method - otherwise we're dependent on the source itself having another event fire, so that we can return PostAction::Remove from it and the TransientSource "knows" its child has asked to be unregistered. In the past this was fine, because all the sources I used it with would eventually, definitely, return Remove. Timer does not do that, so note the workaround for that below. (Again, this is pseudo code!)

#[derive(Clone, Copy, PartialEq)]
enum CleanupPhase {
    Leave, // <- start in this state
    UnregisterTimer,
    DropTimer,
}

struct TimedThing {
    is_sending: bool,
    // Only runs if is_sending is true
    timer: calloop::transient::TransientSource<calloop::timer::Timer>,
    cleanup: CleanupPhase,
    cleanup_rx: PingSource,
    cleanup_tx: Ping,
}

fn process_events(...) {
    let mut cleanup = CleanupPhase::Leave;

    if (turn_off_sending) {
        self.is_sending = false;
        self.timer.map(|t| {
            // 't' is the Timer itself. We need to force it to
            // generate another event so that we can make it return
            // Remove to TransientSource.
            t.set_deadline(std::time::Instant::now());
            // Now we need to keep track of whether we need another
            // loop, because the timer won't be unregistered until
            // after this method returns!
            cleanup = CleanupPhase::UnregisterTimer;
        });
    }

    // This won't happen until the next call to process_events()!

    self.timer.process_events(..., |...| {
        if (self.cleanup == CleanupPhase::UnregisterTimer) {
            cleanup = CleanupPhase::DropTimer;
            self.cleanup_tx.ping();
            TimeoutAction::Drop
        } else {
            // usual timer code
        }
    })?;

    // We STILL can't drop the timer. TransientSource has it in
    // the Remove variant and will have unregister() called after
    // this method exits. So we need an extra ping source to
    // wake the loop up a third time!

    // The state has to be part of our struct too, so that it
    // survives this function returning and firing again.
    self.cleanup = cleanup;

    // The TransientSource has now unregistered the Timer!
    if self.cleanup == CleanupPhase::DropTimer {
        self.timer = TransientSource::None;
        self.cleanup = CleanupPhase::Leave;
    }

    // Drain ping source, do other things, etc.
}

Yikes.

I have come up with some tentative options (not mutually exclusive) but there are some things I can't currently think of a solution to.

  • Add a cancel() method to TransientSource that triggers the reregistration of the wrapper and unregistration of the nested source. This still requires another go-around of the event loop.
  • Add a replace() or set() method to TransientSource so that the nested sources can be replaced without needing to drop and assign in the parent-of-TransientSource source.
  • Add a "trash can" field to TransientSource where a source set for removal can be stashed when the above method is called.

Maybe there's something else I'm missing here, so please think about it and let me know if you have other ideas. Fundamentally I'm stuck on the fact that you can't complete the unregistration of a source inside process_events() (easily, anyway). But I can't see a way to change that, and I'm not even sure that you should, because it's quite a change to Calloop's design.

Is there a way for a source to remove itself within its own process_events()?

I have recently found the need to have event sources remove themselves from the loop after they complete a task. Mostly this is with timers and subprocesses (via futures), but it can be demonstrated quite simply with an example. Here's an event source that starts a timer when it's added to the loop, and when the timer expires, prints a message and never runs again.

use calloop::{EventLoop, EventSource, Poll, Readiness, Token};

struct ThingOne {
    timer: calloop::timer::Timer<()>,
    done: bool,
}

impl ThingOne {
    fn new() -> Self {
        Self {
            timer: calloop::timer::Timer::new().unwrap(),
            done: false,
        }
    }
}

impl EventSource for ThingOne {
    type Event = ();
    type Metadata = ();
    type Ret = ();

    fn process_events<F>(&mut self, readiness: Readiness, token: Token, _: F) -> std::io::Result<()>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        if !self.done {
            let done = &mut self.done;
            self.timer.process_events(readiness, token, |_, _| {
                println!("Done!");
                *done = true;
            })?;
        }

        Ok(())
    }

    fn register(&mut self, poll: &mut Poll, token: Token) -> std::io::Result<()> {
        let hdl = self.timer.handle();
        hdl.add_timeout(std::time::Duration::from_secs(1), ());
        self.timer.register(poll, token)
    }

    fn reregister(&mut self, poll: &mut Poll, token: Token) -> std::io::Result<()> {
        let hdl = self.timer.handle();
        hdl.add_timeout(std::time::Duration::from_secs(1), ());
        self.timer.reregister(poll, token)
    }

    fn unregister(&mut self, poll: &mut Poll) -> std::io::Result<()> {
        self.timer.unregister(poll)
    }
}

fn main() {
    let mut event_loop = EventLoop::try_new().unwrap();

    let handle = event_loop.handle();

    let _loop_token = handle.insert_source(ThingOne::new(), |_, _, _| {}).unwrap();

    event_loop
        .run(None, &mut (), |_| {})
        .expect("Error during event loop!");
}

It would be good if I could actually have ThingOne remove itself from the loop when it's done. In this case, it's not a big deal. But when you're dealing with open file descriptors and sockets, it's good to have cleanup. But I can't figure out how.

  • I can't remove it from inside the process_events() method, because I don't have the registration token or the poll argument for self.unregister().
  • I can't remove it via main(), because by the time I have the registration token, the loop has taken ownership of the source.

Is there a way to do this that I'm missing?

Non blocking event loop with calloop

Hi,
I want to add functions to the event loop from different parts of the code, and I want execution to continue without blocking. Is there a way to use calloop (the dispatch method blocks until the callbacks have executed) so that it does not block further execution of the code? It would be very helpful if some example code with this functionality were provided.

epoll timeout rounding logic is broken on 0.10.x

This is faulty:

calloop/src/sys/epoll.rs

Lines 83 to 85 in e404c80

// add 1 to the millisecond wait, to round up for timer tracking. If the high precision timer is set up
// it'll fire before that timeout
let timeout = timeout.map(|d| (d.as_millis() + 1) as isize).unwrap_or(-1);

Unconditionally adding 1ms to round up is wrong: the 1 should not be added if the duration is already an exact number of milliseconds, especially if the provided duration is 0, which happens either when the user passed 0 as a timeout, or if there is a timer that has already fired when the method is invoked.
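One possible correction (a sketch, not the actual patch): round up only when the duration has a sub-millisecond remainder, so exact values, including zero, pass through unchanged.

```rust
use std::time::Duration;

// Sketch of a fix: round up only when there is a sub-millisecond
// remainder; exact values (including zero) pass through unchanged.
// -1 keeps epoll_wait's "block indefinitely" convention for None.
fn epoll_timeout_ms(timeout: Option<Duration>) -> isize {
    match timeout {
        None => -1,
        Some(d) => {
            let whole = d.as_millis() as isize;
            if d.subsec_nanos() % 1_000_000 == 0 {
                whole
            } else {
                whole + 1
            }
        }
    }
}
```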

Ownership issues with LoopHandle::insert_source

LoopHandle::insert_source takes ownership of E and moves it to the return value. This behavior is undocumented and might be confusing. A minimal example:

use calloop::EventLoop;
use calloop::signals::{Signal, Signals};
use std::process;

fn main() {
    println!("pid: {}", process::id());
    let mut event_loop = EventLoop::<()>::new().unwrap();
    let handle = event_loop.handle();
    let signals = Signals::new(&[Signal::SIGUSR1]).unwrap();
    handle.insert_source(signals, |_, _| {
        println!("SIGUSR1");
    }).unwrap();
    loop {
        event_loop.dispatch(None, &mut ());
    }
}

Expected behavior: Prints "SIGUSR1" upon receiving SIGUSR1.
Actual behavior: The process terminates at SIGUSR1. Since signals has been moved to the return value of insert_source and dropped, the signals are unmasked.

The fix here is to bind the return value of insert_source. I think this is rather counter-intuitive. Perhaps insert_source should borrow the source instead?

100% CPU after adding an executor to loop

I was trying to integrate async_process into calloop when I found my CPU usage would spike. I initially thought it was perhaps an incompatible combination, but then I reduced it to this example without async_process:

use calloop::futures::executor;
use calloop::signals::{Signal, Signals};
use calloop::{EventLoop, LoopHandle, LoopSignal};
use env_logger::Env;
use log::*;

fn add_future_to_loop<T>(handle: &LoopHandle<T>) {
    let (exec, _) = executor::<()>().unwrap();
    handle.insert_source(exec, |_, _, _| {}).unwrap();
}

fn main() {
    // Set up logging (trace level by default).
    env_logger::Builder::from_env(Env::default().default_filter_or("trace"))
        .format_timestamp(None)
        .init();

    // Event loop initialisation.
    let mut event_loop: EventLoop<LoopSignal> =
        EventLoop::try_new().expect("Could not initialise event loop");

    // Exit on interrupt (for Ctrl-C when running in console) and terminate (for
    // OpenWrt's service control).
    const SIGNALS_TO_STOP_ON: &[Signal] = &[Signal::SIGINT, Signal::SIGTERM];

    let handle = event_loop.handle();

    let mut signaller = event_loop.get_signal();

    handle
        .insert_source(
            Signals::new(SIGNALS_TO_STOP_ON).unwrap(),
            |event, _metadata, signaller| {
                info!("Signal received: {}", event.signal());
                signaller.stop();
            },
        )
        .unwrap();

    add_future_to_loop(&handle);

    event_loop
        .run(None, &mut signaller, |_| {})
        .expect("Could not run event loop");
}

Have I missed something in how to use this? I'm running this on Ubuntu 20.10 with rustc 1.49 via rustup. Calloop is v0.7.1.

Result<(), ()> (from Scheduler::schedule()) does not work well with other error handling

calloop::futures::Scheduler::schedule() returns a Result<(), ()>. Most approaches to error handling will not integrate easily with this, because they expect the second type parameter of a Result to implement std::error::Error. For example, anyhow cannot easily wrap this result and needs an extra map_err() (or similar) before it can even add context or extract a stack trace.

I notice that it's mostly due to the .map_err(|_| ()) tacked on to the send() call. Is there a reason not to either pass up the error from send() or wrap it in a "real" error type?
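To make the "real" error type option concrete, here is a sketch: a dedicated type implementing std::error::Error composes cleanly with anyhow-style handling. The name ScheduleError and the demo function are hypothetical, not calloop API.

```rust
use std::fmt;

// Hypothetical error type; schedule() could return something like this
// instead of a bare ().
#[derive(Debug)]
struct ScheduleError;

impl fmt::Display for ScheduleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "the executor for this scheduler was destroyed")
    }
}

// Implementing std::error::Error is what lets anyhow wrap it, add
// context, and capture a backtrace without a manual map_err().
impl std::error::Error for ScheduleError {}

// Stand-in for Scheduler::schedule() to show the Result shape.
fn schedule_demo(executor_alive: bool) -> Result<(), ScheduleError> {
    if executor_alive { Ok(()) } else { Err(ScheduleError) }
}
```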

valgrind: possible memory leak

When running the example code given in the README.md with valgrind, a possible memory leak is encountered:

valgrind --leak-check=full target/debug/calloop_demo
==4237== Memcheck, a memory error detector
==4237== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==4237== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==4237== Command: target/debug/calloop_leak
==4237== 
Event fired: Timeout reached!
==4237== 
==4237== HEAP SUMMARY:
==4237==     in use at exit: 630 bytes in 11 blocks
==4237==   total heap usage: 35 allocs, 24 frees, 4,375 bytes allocated
==4237== 
==4237== 288 bytes in 1 blocks are possibly lost in loss record 11 of 11
==4237==    at 0x483DD99: calloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==4237==    by 0x40149CA: allocate_dtv (dl-tls.c:286)
==4237==    by 0x40149CA: _dl_allocate_tls (dl-tls.c:532)
==4237==    by 0x488B322: allocate_stack (allocatestack.c:622)
==4237==    by 0x488B322: pthread_create@@GLIBC_2.2.5 (pthread_create.c:660)
==4237==    by 0x1588C4: std::sys::unix::thread::Thread::new (thread.rs:85)
==4237==    by 0x12BA99: std::thread::Builder::spawn_unchecked (mod.rs:505)
==4237==    by 0x12C134: std::thread::Builder::spawn (mod.rs:388)
==4237==    by 0x13516C: calloop::sources::timer::TimerScheduler::new (timer.rs:300)
==4237==    by 0x117026: calloop::sources::timer::Timer<T>::new (timer.rs:36)
==4237==    by 0x122345: calloop_leak::main (main.rs:20)
==4237==    by 0x117F1A: core::ops::function::FnOnce::call_once (function.rs:227)
==4237==    by 0x114F0D: std::sys_common::backtrace::__rust_begin_short_backtrace (backtrace.rs:125)
==4237==    by 0x11E820: std::rt::lang_start::{{closure}} (rt.rs:63)
==4237== 
==4237== LEAK SUMMARY:
==4237==    definitely lost: 0 bytes in 0 blocks
==4237==    indirectly lost: 0 bytes in 0 blocks
==4237==      possibly lost: 288 bytes in 1 blocks
==4237==    still reachable: 342 bytes in 10 blocks
==4237==         suppressed: 0 bytes in 0 blocks
==4237== Reachable blocks (those to which a pointer was found) are not shown.
==4237== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==4237== 
==4237== For lists of detected and suppressed errors, rerun with: -s
==4237== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)

Used versions:

  • calloop version: 0.9.3
  • cargo version: cargo 1.56.0 (4ed5d137b 2021-10-04)
  • rust version: rustc 1.56.1 (59eed8a2a 2021-11-01)
  • operating system: 20.04.1-Ubuntu

Repeating future gets stuck forever

I'm writing code that polls an API repeatedly to observe the status of a background job. I realized that my code starts polling normally, then at some point the future does not complete anymore but keeps calling the run callback. I reduced the async code to use a simple sleep call to illustrate:

use std::{time::Duration};
use calloop::{EventLoop, futures::executor};

#[tokio::main]
async fn main() {
    let mut app = App::default();
    let mut event_loop = EventLoop::<App>::try_new().unwrap();
    let handle = event_loop.handle();
    let (executor, scheduler) = executor::<()>().unwrap();

    scheduler.schedule(async_operation()).unwrap();
    
    handle
        .insert_source(executor, |event, _metadata, data| {
            data.tick = Some(event);
        }).unwrap();

    event_loop
        .run(Duration::from_millis(1000), &mut app, |data| {
            if let Some(_) = data.tick.take() {
                data.counter += 1;
                scheduler.schedule(async_operation()).unwrap();
            }
            eprintln!("Counter: {}", data.counter);
        }).unwrap();
}

pub async fn async_operation() {
    tokio::time::sleep(Duration::from_millis(100)).await;
}

#[derive(Default)]
struct App {
    tick: Option<()>,
    counter: u64,
}

It prints the counter value on each run call, and when the sleep is done it increments the counter. The same counter value is printed three or four times, normally, but when it reaches 128 it stops incrementing the counter and begins to call the run callback faster.

Switching tokio for async_std, it works properly. Am I missing something?

Struggling to integrate with socket-based code ⇒ API suggestions

I've been trying to integrate some (ZeroMQ) socket based code with Calloop, but I'm running up against lifetime errors around the same kind of pattern over and over again. Here's an example (without any ZeroMQ code at all):

use std::collections::VecDeque;

use calloop::generic::{Fd, Generic};
use calloop::signals::{Signal, Signals};
use calloop::{EventLoop, Interest, LoopHandle, LoopSignal, Mode, Readiness};

fn main() {
    // EventLoop and the handles it produces are parameterised by both a type
    // and lifetime. The lifetime is related to the lifetime of the stored
    // callbacks and sources. The definitions look like:
    //     pub struct EventLoop<'l, Data> {
    //         handle: LoopHandle<'l, Data>,
    //         ...
    let mut event_loop: EventLoop<LoopSignal> = EventLoop::try_new().unwrap();

    // Parameterised with same lifetime 'l as event_loop.
    let handle = event_loop.handle();

    // Exit on interrupt (for Ctrl-C when running in console) and terminate (for
    // OpenWrt's service control).
    const SIGNALS_TO_STOP_ON: &[Signal] = &[Signal::SIGINT, Signal::SIGTERM];

    let handle = event_loop.handle();

    handle
        .insert_source(
            Signals::new(SIGNALS_TO_STOP_ON).unwrap(),
            |event, _metadata, signaller: &mut LoopSignal| {
                println!("Signal received: {}", event.signal());
                signaller.stop();
            },
        )
        .unwrap();

    let mqueue = MessageQueue::new();
    mqueue.register(&handle);

    event_loop
        .run(None, &mut event_loop.get_signal(), |_| {})
        .expect("Could not run event loop");
}

struct Message<'f> {
    message: String,
    callback: Option<Box<dyn FnOnce() + 'f>>,
}

struct MessageQueue<'q> {
    queue: VecDeque<Message<'q>>,
}

impl<'q> MessageQueue<'q> {
    fn new() -> Self {
        MessageQueue {
            queue: VecDeque::new(),
        }
    }

    fn register<'l, D>(&self, handle: &LoopHandle<'l, D>) {
        let event_source = Generic::from_fd(0, Interest::READ, Mode::Edge);

        let callback = |_: Readiness, _: &mut Fd, _: &mut D| {
            self.on_socket_writeable();
            Ok(())
        };

        handle
            .insert_source(event_source, callback)
            .expect("Could not add control socket to event loop");
    }

    fn on_socket_writeable(&self) {
        while !self.queue.is_empty() {
            let _message = self.queue.pop_front().unwrap();
            // Real code sends message here.
        }
    }

    fn enqueue(&self, message: String) {
        self.queue.push_back(Message { message, callback: None });
        // Flush message queue manually.
        self.on_socket_writeable();
    }
}

Attempting to compile this results in the error:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/main.rs:61:24
   |
61 |           let callback = |_: Readiness, _: &mut Fd, _: &mut D| {
   |  ________________________^
62 | |             self.on_socket_writeable();
63 | |             Ok(())
64 | |         };
   | |_________^
   |
note: first, the lifetime cannot outlive the lifetime `'q` as defined on the impl at 51:6...
  --> src/main.rs:51:6
   |
51 | impl<'q> MessageQueue<'q> {
   |      ^^
note: ...so that the types are compatible
  --> src/main.rs:61:24
   |
61 |           let callback = |_: Readiness, _: &mut Fd, _: &mut D| {
   |  ________________________^
62 | |             self.on_socket_writeable();
63 | |             Ok(())
64 | |         };
   | |_________^
   = note: expected `(&&MessageQueue<'_>,)`
              found `(&&MessageQueue<'q>,)`
note: but, the lifetime must be valid for the lifetime `'l` as defined on the method body at 58:17...
  --> src/main.rs:58:17
   |
58 |     fn register<'l, D>(&self, handle: &LoopHandle<'l, D>) {
   |                 ^^
note: ...so that the types are compatible
  --> src/main.rs:67:14
   |
67 |             .insert_source(event_source, callback)
   |              ^^^^^^^^^^^^^
   = note: expected `&LoopHandle<'_, D>`
              found `&LoopHandle<'l, D>`

Basically, when the socket is writeable I want to send messages from the queue. However, I can't actually create the closure that does this, because of lifetime issues that I don't quite understand. It doesn't seem like a circular problem on the face of it, since the MessageQueue captured by the closure in register() could happily outlive the event loop. Indeed, I tried to express this with:

    fn register<'l, D>(&self, handle: &LoopHandle<'l, D>)
    where 'q: 'l

But that results in:

error[E0623]: lifetime mismatch
  --> src/main.rs:70:14
   |
59 |     fn register<'l, D>(&self, handle: &LoopHandle<'l, D>)
   |                        -----           -----------------
   |                        |
   |                        these two types are declared with different lifetimes...
...
70 |             .insert_source(event_source, callback)
   |              ^^^^^^^^^^^^^ ...but data from `self` flows into `handle` here

Anyway, the way I would break this stalemate in another language (eg. C + libuv) would be to pass self (ie. the MessageQueue) as the data given to the callback, instead of capturing it in a closure. By that I mean the data given to this specific callback. Calloop only takes a single instance of callback data, which gets passed to every function registered with the loop, so my queue structure would need to constrain the type of loop handle it accepts, eg. register(&self, handle: &LoopHandle<'l, MessageQueue>). Then any code that wants to use this queue would need to construct a loop that only passes a MessageQueue to callbacks, so I'd have to incorporate extra callback data into my queue API, etc. etc.
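To make that constraint concrete, here is a toy model in plain std Rust (this is not calloop's actual API; `LoopHandle`, `AppState`, and `dispatch` here are my own simplified stand-ins). The queue lives inside the single shared data type `D`, so the callback borrows nothing and there is no lifetime conflict, but `D` is now baked into the queue's registration signature:

```rust
use std::collections::VecDeque;

// Toy stand-in for calloop's LoopHandle<'l, D>: every callback
// registered with it receives the same shared data type `D`.
struct LoopHandle<D> {
    callbacks: Vec<Box<dyn FnMut(&mut D)>>,
}

impl<D> LoopHandle<D> {
    fn new() -> Self {
        LoopHandle { callbacks: Vec::new() }
    }

    fn insert_source(&mut self, cb: impl FnMut(&mut D) + 'static) {
        self.callbacks.push(Box::new(cb));
    }

    // Pretend every registered source became ready at once.
    fn dispatch(&mut self, data: &mut D) {
        for cb in &mut self.callbacks {
            cb(data);
        }
    }
}

// The queue is part of the shared data instead of being captured
// by the closure, so the closure is trivially 'static.
struct AppState {
    queue: VecDeque<String>,
}

fn main() {
    let mut handle: LoopHandle<AppState> = LoopHandle::new();
    handle.insert_source(|state| {
        // "Socket writeable": drain the queue via the shared data.
        while let Some(msg) = state.queue.pop_front() {
            println!("sending: {msg}");
        }
    });

    let mut state = AppState { queue: VecDeque::from(vec!["hello".to_string()]) };
    handle.dispatch(&mut state);
    assert!(state.queue.is_empty());
}
```

The cost is exactly the one described above: `AppState` becomes part of the queue's public contract, and every caller has to thread their own extra data through it.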

So my suggestions, the implications of which I have not really thought through at all, are:

  • let Calloop callbacks take per-registration data (perhaps by adding an extra type parameter to LoopHandle so that event_loop.handle() lets you register callbacks of a particular type?)
  • Make Calloop callbacks also take a reference to the loop itself or a handle, so they can add timeouts etc. without having to create a closure over a loop handle

(If you do not or cannot implement this I will absolutely not take issue with that. They're just ideas I had while trying to puzzle this out.)

And if you can see anything I can do with my original code to break the stalemate, please let me know.
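For what it's worth, one workaround I can see that needs neither suggestion is to give the queue shared ownership, so the closure moves an owned `Rc` clone instead of borrowing `self` with lifetime `'q`. A minimal std-only sketch (this `MessageQueue` is a simplified stand-in for the one above; `RefCell` supplies the interior mutability that calling `pop_front` through `&self` already implies, and the plain function call at the end stands in for the event loop firing):

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

struct MessageQueue {
    queue: RefCell<VecDeque<String>>,
}

impl MessageQueue {
    fn on_socket_writeable(&self) {
        while let Some(_message) = self.queue.borrow_mut().pop_front() {
            // Real code sends the message here.
        }
    }
}

fn main() {
    let queue = Rc::new(MessageQueue {
        queue: RefCell::new(VecDeque::new()),
    });

    // The closure moves an owned Rc clone, so it captures no
    // references and can satisfy whatever lifetime bound the
    // loop handle demands.
    let queue_for_cb = Rc::clone(&queue);
    let callback = move || {
        queue_for_cb.on_socket_writeable();
    };

    queue.queue.borrow_mut().push_back("hello".to_string());
    callback(); // stands in for the loop invoking the source's callback
    assert!(queue.queue.borrow().is_empty());
}
```

The queue then has to be constructed as `Rc<MessageQueue>` everywhere, but register() no longer ties the closure's lifetime to `'q`.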
