fluxus's People

Contributors

breakbotz, j-ittner


fluxus's Issues

Add progress bar to the run function

The challenge is that there is no straightforward way of telling how many results a flow will produce, since individual steps or parallel steps can create multiple outputs for any one input. Thus, while it is easy to count how many run results have already completed, we cannot easily translate that count into a percentage of the total.

For a meaningful progress bar, we need to come up with progress estimates for all steps and propagate these along the flow. That is, once we know the number of inputs, we initially estimate that every step produces one output per input, then update that estimate as the observed input/output ratio starts to differ. In most cases this will lead to a reasonable progress bar; in some cases, estimates may need to be revised and progress can drop from, say, 80% back to 60%.
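As a rough illustration of how these estimates could be maintained (a sketch only; none of these names exist in fluxus today), each step would track its observed output/input ratio, and the expected total would be the product of those ratios along the flow:

# Hypothetical sketch of the proposed estimation logic; not part of fluxus.

class StepProgressEstimate:
    """Tracks a running estimate of how many outputs a step produces per input."""

    def __init__(self) -> None:
        self.inputs_seen = 0
        self.outputs_seen = 0

    @property
    def outputs_per_input(self) -> float:
        # Start with the optimistic assumption of one output per input,
        # then switch to the observed ratio once data is available.
        if self.inputs_seen == 0:
            return 1.0
        return self.outputs_seen / self.inputs_seen

    def update(self, n_inputs: int, n_outputs: int) -> None:
        self.inputs_seen += n_inputs
        self.outputs_seen += n_outputs


def estimate_total_outputs(
    n_flow_inputs: int, steps: list[StepProgressEstimate]
) -> float:
    """Propagate the per-step estimates along the flow to get an expected total."""
    total = float(n_flow_inputs)
    for step in steps:
        total *= step.outputs_per_input
    return total

As the observed ratios change, the expected total changes with them, which is why the displayed percentage can occasionally move backwards.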

We should implement this as an optional argument to the run function, called progress_bar, which defaults to False. This approach ensures that the progress bar is only shown when it is actually useful, avoiding unnecessary clutter when the pipeline runs quickly due to cached results or because the user is testing out a simple pipeline.
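For illustration, a minimal sketch of the proposed call, reusing the toy pipeline style from the bug report further down this page (progress_bar is the argument proposed in this issue and does not exist yet):

import artkit.api as ak

def toy(input: int):
    return [{"input": i} for i in range(3)]

pipeline = ak.chain(
    ak.step("toy", toy),
    ak.step("toy", toy),
)

# progress_bar is the keyword proposed in this issue; it does not exist yet
# and would default to False, so existing calls remain unchanged.
ak.run(pipeline, input={"input": 1}, progress_bar=True).to_frame()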

Add Badges to Repo

Issue

Currently, the README.rst file does not contain any displayed badges that represent the repository.

Additional details on the .rst file format are located here

Specifically, we'd like to add badges for the following:

  • pypi
  • anaconda version
  • python versions
  • code style (black)
  • sphinx (doc build)
  • license (apache)
  • GitHub Action build status
  • test coverage
  • Contributor Covenant

Solution

  • Update the README.rst file with the relevant badges
    • Add the following where the badges should be displayed (example)
.. Begin-Badges

|pypi| |conda| |python_versions| |code_style| |made_with_sphinx_doc| |license_badge| |GHA| |TestCov| |Contributor_Covenant|

.. End-Badges
    • Add the targets for the URLs where the badges should point to (example; a sketch for the three remaining badge definitions follows after this list)
.. Begin-Badges

.. |conda| image:: https://anaconda.org/bcg_gamma/gamma-facet/badges/version.svg
    :target: https://anaconda.org/BCG_Gamma/fluxus

.. |pypi| image:: https://badge.fury.io/py/artkit.svg
    :target: https://pypi.org/project/fluxus/

.. |python_versions| image:: https://img.shields.io/badge/python-3.10%7C3.11%7C3.12-blue.svg
   :target: https://www.python.org/downloads/release/python-3100/

.. |code_style| image:: https://img.shields.io/badge/code%20style-black-000000.svg
   :target: https://github.com/psf/black

.. |made_with_sphinx_doc| image:: https://img.shields.io/badge/Made%20with-Sphinx-1f425f.svg
   :target: https://bcg-x-official.github.io/facet/index.html

.. |license_badge| image:: https://img.shields.io/badge/License-Apache%202.0-olivegreen.svg
   :target: https://opensource.org/licenses/Apache-2.0

.. End-Badges
  • Update the RELEASE_NOTES.rst file:
    • Increment a minor version here
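The example above defines only six of the nine requested badges. A possible sketch for the remaining three substitutions (GHA build, test coverage, Contributor Covenant), with the organisation, workflow file, branch, and coverage service left as placeholders to be confirmed:

.. Begin-Badges

.. |GHA| image:: https://github.com/<org>/fluxus/actions/workflows/<workflow>.yml/badge.svg
   :target: https://github.com/<org>/fluxus/actions

.. |TestCov| image:: https://codecov.io/gh/<org>/fluxus/branch/<branch>/graph/badge.svg
   :target: https://codecov.io/gh/<org>/fluxus

.. |Contributor_Covenant| image:: https://img.shields.io/badge/Contributor%20Covenant-2.1-4baaaa.svg
   :target: https://www.contributor-covenant.org/

.. End-Badges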

BUG: Error when returning an Iterable of Mappings in an asynchronous step

Description: When using an async function in a fluxus pipeline that returns a plain (synchronous) iterable, an error is thrown saying that a Mapping was expected. However, other combinations, such as an async iterable returned from an async function, are allowed.

Expected behaviour: Either the pipeline runs through as it would with an async iterable, or an error message is thrown that explains this combination is not allowed.

Consider the following three examples:
(Runs) Synchronous function, synchronous iterable:

import artkit.api as ak

def toy(input: int):
    return [{"input": i} for i in range(3)]


pipeline = ak.chain(
    ak.step("toy", toy),
    ak.step("toy", toy),
)

ak.run(pipeline, input={"input": 1}).to_frame()

(Runs) Asynchronous function, asynchronous iterable:

import artkit.api as ak

async def toy(input: int):
    for i in range(3):
        yield {"input": i}


pipeline = ak.chain(
    ak.step("toy", toy),
    ak.step("toy", toy),
)

ak.run(pipeline, input={"input": 1}).to_frame()

(Throws error) Asynchronous function, synchronous Iterable:

import artkit.api as ak

async def toy(input: int):
    return [{"input": i} for i in range(3)]


pipeline = ak.chain(
    ak.step("toy", toy),
    ak.step("toy", toy),
)
ak.run(pipeline, input={"input": 1}).to_frame()

Full error stacktrace:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[154], line 11
      4     return [{"input": i} for i in range(3)]
      7 pipeline = ak.chain(
      8     ak.step("toy", toy),
      9     ak.step("toy", toy),
     10 )
---> 11 ak.run(pipeline, input={"input": 1}).to_frame()

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/fluxus/functional/_functions.py:753, in run(steps, input, timestamps)
    750     return arun((steps >> consumer).arun())
    752 elif isinstance(steps, BaseTransformer):
--> 753     return arun(
    754         (
    755             DictProducer(lambda: {} if input is None else input, name="input")
    756             >> steps
    757             >> consumer
    758         ).arun()
    759     )
    761 else:
    762     message = (
    763         f"arg steps must be a step or composition of steps, but got a "
    764         f"{type(steps).__qualname__}"
    765     )

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:252, in arun(coroutine)
    247 try:
    248     # Use ThreadPoolExecutor to run the event loop in a new thread
    249     with ThreadPoolExecutor() as executor:
    250         # Schedule the execution of the coroutine in the new event loop,
    251         # and return the result once the coroutine has completed
--> 252         return executor.submit(_run_event_loop, new_loop, coroutine).result()
    253 except ExceptionGroup as e:
    254     # One or more coroutines raised exceptions. These exceptions are combined
    255     # into a (possibly nested) ExceptionGroup, which we catch here.
    256     # Traverse the nested exception groups to find the first atomic
    257     # sub-exception.
    258     exception_group = e

File ~/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:456, in Future.result(self, timeout)
    454     raise CancelledError()
    455 elif self._state == FINISHED:
--> 456     return self.__get_result()
    457 else:
    458     raise TimeoutError()

File ~/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:220, in arun.<locals>._run_event_loop(loop, awaitable)
    218 def _run_event_loop(loop: asyncio.AbstractEventLoop, awaitable: Awaitable[T]) -> T:
    219     asyncio.set_event_loop(loop)
--> 220     return loop.run_until_complete(awaitable)

File ~/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py:654, in BaseEventLoop.run_until_complete(self, future)
    651 if not future.done():
    652     raise RuntimeError('Event loop stopped before Future completed.')
--> 654 return future.result()

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/fluxus/core/producer/_chained_.py:126, in _ProducerFlow.arun(self)
    123 """[see superclass]"""
    125 # noinspection PyProtectedMember
--> 126 return await _aconsume(producer=self._producer, consumer=self._consumer)

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/fluxus/core/producer/_chained_.py:274, in _aconsume(producer, consumer)
    271         yield (producer_index, product)
    273 # noinspection PyTypeChecker
--> 274 return await consumer.aconsume(
    275     async_flatten(
    276         _annotate(producer_index, producer)
    277         async for producer_index, producer in (
    278             aenumerate(producer.aiter_concurrent_conduits())
    279         )
    280     )
    281 )

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/fluxus/functional/conduit/_consumer.py:92, in DictConsumer.aconsume(self, products)
     89 run_start = time.perf_counter()
     90 cumulative_time = 0.0
---> 92 async for group, end_product in products:
     93     # Get the lineage attributes for the end product, which is a dictionary
     94     # of dictionaries. The outer dictionary maps product names to their
     95     # attributes, and the inner dictionaries map attribute names to their
     96     # values.
     97     attributes = end_product.get_lineage_attributes()
     99     # Iterate over products and their attribute dicts

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:166, in _async_flatten_310(async_iter_of_iters)
    163     yield value
    165 # Wait for the producer task to complete
--> 166 await producer_task

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:145, in _async_flatten_310.<locals>._producer()
    141 async def _producer() -> None:
    142     try:
    143 
    144         # Wait for all tasks to complete, propagating any exceptions.
--> 145         await asyncio.gather(
    146             *[
    147                 asyncio.create_task(_process_nested(async_iter))
    148                 async for async_iter in async_iter_of_iters
    149             ]
    150         )
    151     finally:
    152         # Signal the end of processing by putting a sentinel value in the queue.
    153         await queue.put(cast(T, _END))

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:138, in _async_flatten_310.<locals>._process_nested(async_iter)
    137 async def _process_nested(async_iter: AsyncIterable[T]) -> None:
--> 138     async for _value in async_iter:
    139         await queue.put(_value)

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/fluxus/core/producer/_chained_.py:270, in _aconsume.<locals>._annotate(producer_index, producer_)
    267 async def _annotate(
    268     producer_index: int, producer_: SerialProducer[T_SourceProduct_arg]
    269 ) -> AsyncIterator[tuple[int, T_SourceProduct_arg]]:
--> 270     async for product in producer_:
    271         yield (producer_index, product)

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:166, in _async_flatten_310(async_iter_of_iters)
    163     yield value
    165 # Wait for the producer task to complete
--> 166 await producer_task

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:145, in _async_flatten_310.<locals>._producer()
    141 async def _producer() -> None:
    142     try:
    143 
    144         # Wait for all tasks to complete, propagating any exceptions.
--> 145         await asyncio.gather(
    146             *[
    147                 asyncio.create_task(_process_nested(async_iter))
    148                 async for async_iter in async_iter_of_iters
    149             ]
    150         )
    151     finally:
    152         # Signal the end of processing by putting a sentinel value in the queue.
    153         await queue.put(cast(T, _END))

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:138, in _async_flatten_310.<locals>._process_nested(async_iter)
    137 async def _process_nested(async_iter: AsyncIterable[T]) -> None:
--> 138     async for _value in async_iter:
    139         await queue.put(_value)

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:166, in _async_flatten_310(async_iter_of_iters)
    163     yield value
    165 # Wait for the producer task to complete
--> 166 await producer_task

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:146, in _async_flatten_310.<locals>._producer()
    141 async def _producer() -> None:
    142     try:
    143 
    144         # Wait for all tasks to complete, propagating any exceptions.
    145         await asyncio.gather(
--> 146             *[
    147                 asyncio.create_task(_process_nested(async_iter))
    148                 async for async_iter in async_iter_of_iters
    149             ]
    150         )
    151     finally:
    152         # Signal the end of processing by putting a sentinel value in the queue.
    153         await queue.put(cast(T, _END))

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/pytools/asyncio/_asyncio.py:146, in <listcomp>(.0)
    141 async def _producer() -> None:
    142     try:
    143 
    144         # Wait for all tasks to complete, propagating any exceptions.
    145         await asyncio.gather(
--> 146             *[
    147                 asyncio.create_task(_process_nested(async_iter))
    148                 async for async_iter in async_iter_of_iters
    149             ]
    150         )
    151     finally:
    152         # Signal the end of processing by putting a sentinel value in the queue.
    153         await queue.put(cast(T, _END))

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/fluxus/core/transformer/_chained_.py:196, in <genexpr>(.0)
    193 """[see superclass]"""
    195 # noinspection PyTypeChecker
--> 196 return async_flatten(
    197     self.second.atransform(tx_first)
    198     async for tx_first in self.first.atransform(source_product)
    199 )

File ~/.pyenv/versions/3.11.9/envs/rai_red_ft/lib/python3.11/site-packages/fluxus/functional/conduit/_step.py:270, in Step.atransform(self, source_product)
    267 end_time = time.perf_counter()
    269 if not isinstance(attributes, Mapping):
--> 270     raise TypeError(
    271         f"Expected function {self._function.__name__}() of step "
    272         f"{self.name!r} to return a Mapping or dict, but got: "
    273         f"{attributes!r}"
    274     )
    276 log.debug(
    277     f"Completed step {self.name!r} in {end_time - start_time:g} "
    278     f"seconds:\n"
   (...)
    285     )
    286 )
    288 yield DictProduct(
    289     name=self.name,
    290     product_attributes=attributes,
   (...)
    293     end_time=end_time,
    294 )

TypeError: Expected function toy() of step 'toy' to return a Mapping or dict, but got: [{'input': 0}, {'input': 1}, {'input': 2}]
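Until this is addressed, a workaround consistent with the second (working) example above is to have the async function yield its items one by one instead of returning the list. This is a sketch of the failing example rewritten that way, not a fix to fluxus itself:

import artkit.api as ak

async def toy(input: int):
    # Workaround: yield the items from an async generator rather than
    # returning the list, matching the combination that is known to work.
    for item in [{"input": i} for i in range(3)]:
        yield item

pipeline = ak.chain(
    ak.step("toy", toy),
    ak.step("toy", toy),
)

ak.run(pipeline, input={"input": 1}).to_frame()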
