Git Product home page Git Product logo

concurrency-utils's Introduction

Hi there ๐Ÿ‘‹

Keybase PGP

Work Status

I'm Charles, a Quebec-based software developer and tech enthusiast. I'm currently most proficient coding in Python(~5 years of professional experience), but I also have some practical knowledge and coding experience in various other languages.

  • Lisps(Scheme, Common Lisp, Racket)
  • Javascript/Typescript
  • Haskell
  • C
  • Lua.

And I love to read and learn on all kinds of programming languages(e.g. Erlang, Elixir, Go, Rust, Scala, Clojure, Ruby, SmallTalk).

I'm a big fan of the functional programming paradigm and try to apply its principles as often as possible(to the extent that the language and context allows) to manage complexity and ensure reliability, modulability and extensibility of software.

I have a particular interest in the open web, open protocols and open standards, networking, language design, developer ergonomics, decentralized and distributed systems and related movements.

I believe in the value of open software and open collaboration in technology(as well as other domains), both for technology developers and owners as well as for technology users(which often is pretty much everyone, as technology shapes and interacts with so much aspects of the social realities).

concurrency-utils's People

Contributors

drpyser avatar

Watchers

 avatar  avatar

concurrency-utils's Issues

Channel

Implement async and sync versions of go-like channels. Also look at haskell semantics. Basically, a bounded queue that can be shared between tasks(or units of concurrency). Closing a channel should somehow unblock any task waiting on it(perhaps raising an exception).

actor system: abstract model/interface & compatibility

Abstract interfaces for actor, system, mailbox, the various features, should be extracted from the current implementation(based on asyncio and threading). That way, alternative implementations could be made compatible.

actor system: low-level signal layer

The actor system in concurrency/threads/thread_actor.py needs a low-level signal interface to handle such things as exit signals.
Basically, there needs to be a side-channel beside the mailbox to handle exit signals. Maybe another queue?
The trick is to simultaneously handle events from this low-level signal queue and run user code, which may block waiting on mailbox events.
This suggests the user code has to cooperate with the underlying framework and allow it to process this signal queue alongside running the user code. Two models come to mind:

  • Use an async/await coroutine framework like asyncio/trio as basis for framework. blocking queues would be replaced with async equivalent from those libraries, and interacting with mailbox or signal queue would be done with async code.
    • Pro: can exploit existing async/await-friendly libraries and tools
    • Con: Adds complexity when dealing with blocking sync code(must be executed in separate thread)
    • Con: adds overall complexity to system by adding another layer of concurrency
  • Use custom coroutine framework: user code must be a coroutine generator. Interaction with the mailbox and actor system could be done by yielding requests objects, giving the framework a chance to also check for low-level signals. e.g.:
    def ping(pong):
        yield send("ping", pong, self())
        while True:
            message, key = yield receive([
                ("pong", lambda m: m.message == "pong"), 
                ("terminate", lambda m: m.message == "terminate")
            ])
            if key == "pong":
                print(f"Received pong from {message.sender}")
            elif key == "terminate":
                print(f"Received terminate message from {message.sender}")
                # we could ignore the message and continue running
                break
    On each yield, the underlying framework running this code could first check if a signal has been received in the signal queue. For example, if an exit signal is received, the framework would stop running the user code and terminate the thread.
    It could also call throw(ExitSignalError(signal.reason)) on the generator to give user code a chance to do cleanup before terminating. Or like in erlang, different type of exit signals could have different behaviors, and e.g. a "kill" signal would just break out of the loop and terminate as quickly as possible without giving user code opportunity to cleanup.
    • Pro: Custom made to use case, could mean minimal complexity
      Con: Harder to integrate with async/await code.
      Pro: Simple and flexible: can add new features to framework by adding handler for yielded requests.

actor system: name registration

Something to implement for the actor system, is a name registration mechanism, allowing actors to be addressed using a logical name instead of by their ActorRef.

Usage of registered names could be transparent, being implicitly resolved by system calls that would require an ActorRef, or require manual/explicit resolving by user code using an additional system call. For example,

  • Manual/explicit:
    ...
    pong = yield System.Resolve("pong")
    # pong is an ActorRef
    yield System.Send("ping", pong)
  • Implicit/automatic:
    ...
    yield System.Send("ping", "pong")
    # equivalently
    yield System.Send("ping", pong_ref)

Alternatively, using an abstraction layer over manual resolving:

def send(target_name_or_ref, message):
    if isinstance(target_name_or_ref, ActorRef):
        yield System.Send(message, target_name_or_ref)
    else:
        ref = yield System.Resolve(target_name_or_ref)
        yield System.Send(message, ref)

def ping():
    ...
    yield from send("pong", "ping")
    ...

An implementation concern would be to maintain an actor-local cache of resolved names, perhaps with automatic updates. The Resolve system call would first lookup the name in the actor's cache, before asking the system's name registry.

Task tree and supervisor

Implement something similar to Erlang's supervisor tree.

A supervisor is an entity that has the responsibility to start(spawn) a set of child tasks, and react to their lifecycle events(e.g. restart on exceptions, consume/log/store outputs).
Child tasks can themselves be supervisors, thus forming a supervisor tree.
Like a task group, a supervisor defines a scope for the execution of the child tasks, such that child tasks cannot outlive their supervisor(so cancellation propagates to child tasks, and an exception cancel child tasks).

Find best api to customize handling of task group "events"

Ideally, one could customize how a task group deals with task events when "joining" on context exit. That is, how to handle cancellations and exceptions, and whether to wait for all tasks to finish no matter what, cancel remaining tasks on the first exceptional completion, accumulate exceptions and reraise them together, etc.

However, its important to maintain the invariant that spawned tasks cannot outlive the scope of the task group context manager.

actor system: join_all

System.join_all needs to loop through all alive actors and poll-join them.
A simple for actor in self._actors.values() does not work if new actors can be spawned during the loop.

  • alternative way of iterating through alive actors?
  • need to maintain set of alive actors and dead actors, and check if new actors have appeared on each iteration?

actor system: request namespacing

Logical namespaces should be defined for "requests".
Basic requests could use a namespace like Actor, i.e.

  • Actor.Receive
  • Actor.Send
  • Actor.Wait
  • Actor.Self
  • Actor.Run
  • Actor.Link
  • Actor.Exit
  • Actor.Signal

Other well-scoped features would use their own namespace for extended requests, e.g. Port.Open, Port.Close, Port.*.

actor system: Port

A port is an interface for interacting with external processes, or programs running as subprocesses, or file descriptors.
A port has much the same interface as an actor. Communication can be done by message passing(ports should be addressable):

  • messages can be sent to a port, as to an actor
  • ports can send data to the mailbox of an owner process

Propagate all exceptions instead of just first in TaskGroup

Currently, awaiting a TaskGroup uses asyncio.gather, which will reraise the first exception raised from a task, without waiting for other tasks to finish. Alternatively, the implementation could wait for all tasks to finish, then, if any task failed, raise an exception representing all exceptions from tasks.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.