quokka's People

Contributors

a-jm, ea-coding, fabioibanez, gkaretka, jobhdez, marsupialtail, robcxyz, romainr, sarahfujimori, skrawcz

quokka's Issues

Streaming algorithms

  • Covariance (one-pass Welford; see the sketch below)
  • Approximate count distinct (HyperLogLog sketch)
  • Approximate quantiles (Greenwald-Khanna)
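
For the covariance item, a minimal sketch of the one-pass (Welford-style) update, written against plain Python rather than Quokka's executor interface:

class OnlineCovariance:
    """One-pass (Welford-style) covariance over a stream of (x, y) pairs."""

    def __init__(self):
        self.n = 0
        self.mean_x = 0.0
        self.mean_y = 0.0
        self.comoment = 0.0                      # running sum of (x - mean_x) * (y - mean_y)

    def update(self, x, y):
        self.n += 1
        dx = x - self.mean_x
        self.mean_x += dx / self.n
        self.mean_y += (y - self.mean_y) / self.n
        self.comoment += dx * (y - self.mean_y)  # uses the already-updated mean_y

    def finalize(self):
        return self.comoment / (self.n - 1)      # sample covariance

cov = OnlineCovariance()
for x, y in [(1.0, 2.0), (2.0, 4.1), (3.0, 6.2)]:
    cov.update(x, y)
print(cov.finalize())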

join order and CBO meta-thread

The Quokka execution plan should eventually support cost-based optimization (CBO) and join ordering. This is a great issue for someone who wants to impress a Snowflake or Redshift interviewer! You can say you implemented a CBO join optimizer almost from scratch haha.

What needs to be done:

Currently Quokka's optimizer implements predicate pushdown, early projection, and map fusion.

After these three steps, most of the nodes in the system should be MapNodes and StatefulNodes (aka joins). Some of those joins might already be broadcast joins with materialized arguments; that's okay.

  1. Join Fusion. We are now going to replace trees of join nodes with single join nodes that have multiple operands. This will greatly simplify the tree and expose different choices of join order. We will pull predicates up from the individual join nodes. This should be a pretty straightforward optimization pass.

  2. Cardinality Estimation. Input nodes should now have all the relevant early projections and pushed-down filters, and can estimate how much data they will produce. We are going to estimate the number of unique values for each column as well as the total expected size of the output from each node. These estimates will be pushed up the tree as far as possible.
    If the estimated size is smaller than some threshold, the entire input will be materialized as a constant variable in the query plan. The relevant joins will be converted to broadcast joins at this stage.
    This should also be a pretty straightforward pass on the graph. The hardest part will be implementing the actual CBO machinery that answers cardinality estimates given a predicate.

  3. Join order selection. The join node will now figure out the order in which to do the joins (a sketch of the enumeration follows this list). If some of its inputs are materialized, it will join them in the main thread first. We are going to restrict ourselves to left-deep plans and enumerate all n! possible orders, using the usual independence assumptions to compute the join cost of each ordering. If some of the tables have already been materialized, they will be joined first to produce bigger/smaller materialized tables. This could potentially lead to a size blowup, which we don't address.
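
A sketch of what step 3 could look like: left-deep enumeration with a textbook cardinality model under independence assumptions. The table statistics and join graph below are made up; in the real planner they would come from step 2.

from itertools import permutations

# table -> (row count, {column: number of distinct values}); hypothetical numbers
stats = {
    "lineitem": (6_000_000, {"l_orderkey": 1_500_000}),
    "orders":   (1_500_000, {"o_orderkey": 1_500_000, "o_custkey": 100_000}),
    "customer": (150_000,   {"c_custkey": 150_000}),
}
# equi-join predicates as (left table, left column, right table, right column)
joins = [
    ("lineitem", "l_orderkey", "orders", "o_orderkey"),
    ("orders", "o_custkey", "customer", "c_custkey"),
]

def extend(size, joined, table):
    # Estimated size of joining the current left-deep intermediate with `table`.
    out = size * stats[table][0]                  # start from the cross product
    for lt, lc, rt, rc in joins:
        other = rt if table == lt else lt if table == rt else None
        if other in joined:                       # predicate connects new table to the intermediate
            out /= max(stats[lt][1][lc], stats[rt][1][rc])   # independence assumption
    return out

best_cost, best_order = float("inf"), None
for order in permutations(stats):                 # all n! left-deep orders
    joined, size, cost = {order[0]}, stats[order[0]][0], 0.0
    for table in order[1:]:
        size = extend(size, joined, table)
        joined.add(table)
        cost += size                              # cost = total intermediate rows produced
    if cost < best_cost:
        best_cost, best_order = cost, order

print(best_order, best_cost)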

Schema not correct after aggregation

what

The "schema" of the DataStream after the aggregation is missing the mean columns and is thus incorrect.

to reproduce

Run the hello world example:

from pyquokka.df import QuokkaContext
qc = QuokkaContext()
lineitem = qc.read_csv("lineitem.tbl.named", sep="|", has_header=True)
d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
print(len(d.schema))
d = d.with_column("disc_price", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]), required_columns ={"l_extendedprice", "l_discount"})
print(len(d.schema))
d = d.with_column("charge", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]) * (1 + x["l_tax"]), required_columns={"l_extendedprice", "l_discount", "l_tax"})
print(len(d.schema))
f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag","l_linestatus"]).agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "charge":"sum", "l_discount":"avg","*":"count"})
print(f.schema)
print(len(f.schema)) # <--- this is incorrect and only has 8 columns
df = f.collect()
print(df.columns)  
print(len(df.columns)) # <--- this has 10 columns and is correct

what I expect

When I get a DataStream, the "schema" it reports should reflect what it will actually produce.

Write Apache Arrow Flight server in C++

This needs to happen at some point.

The main issue is statically linking Apache Arrow; ideally we just ship a single binary. We could also ship a binary plus the Arrow shared object, but then we have to find the right shared object for the target cloud environment. This is probably fine: if you install pyarrow, it comes with the shared objects.

Getting this to work is critical for performance.

add predicate pushdown to broadcast join operator.

This could be very beneficial if it significantly reduces the number of output rows. Good first issue for optimizer-related things.

Likely will need to make a new node type in logical.py to accomplish this.

Adding progress tracking/UI to Quokka meta-thread

How do we track progress and get estimated job completion time with Quokka?

In Quokka we have input actors whose progress can easily be tracked (we know the upper limit of the iteration). We also have stateful executors downstream, for which we have no idea how many inputs they will have to process.

Either way we need to have a centralized actor that tracks all the progress and displays the progress bar. This actor can also be used to host a dashboard in the future.

One plan could be as follows: first we track all the input nodes. Then, after those are done, the stateful executor nodes in the next stage should have all the inputs they will ever need in their mailboxes, which gives a rough upper bound on how much work they have left. The batching of inputs can potentially create a problem, as it corresponds to big jumps in the progress bar, but it should be ok.

Some simple version of this is very good to have.
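
A minimal sketch of what the centralized tracking actor could look like with Ray; the actor name and reporting protocol here are made up, not Quokka's actual API:

import ray

@ray.remote
class ProgressTracker:
    def __init__(self):
        self.total = {}   # channel -> expected number of batches, or None if unknown
        self.done = {}    # channel -> batches completed so far

    def register(self, channel, expected_batches=None):
        self.total[channel] = expected_batches
        self.done[channel] = 0

    def report(self, channel, batches=1):
        self.done[channel] += batches

    def progress(self):
        # Channels with unknown totals (downstream stateful executors) report None.
        return {c: (self.done[c] / self.total[c] if self.total[c] else None)
                for c in self.done}

# Input actors would call tracker.report.remote(channel) after every batch, and a
# driver loop would poll ray.get(tracker.progress.remote()) to draw the progress bar.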

Absence of CI

Hi! I did not find any CI setup. It would be good to have some unit tests and developer installation instructions.
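
As a starting point, a minimal pytest-style smoke test built only on the read_csv/filter/schema APIs shown elsewhere in these issues (the file path is a placeholder); a CI job would then just install the package and run pytest:

# test_smoke.py, run with `pytest`
from pyquokka.df import QuokkaContext

def test_filter_keeps_column_count():
    qc = QuokkaContext()
    lineitem = qc.read_csv("lineitem.tbl.named", sep="|", has_header=True)
    filtered = lineitem.filter("l_quantity < 24")
    # Filtering rows should not change the number of columns in the schema.
    assert len(filtered.schema) == len(lineitem.schema)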

Support ordering

It is useful to have a notion of ordering among the input datastreams to a node, to support things like build-probe joins that use less memory than the streaming two-sided join Quokka uses today.

The ordering requirements at each node can probably just be translated down into a requirement of which input sources have to be completed before other input sources start.

Once the algorithm for the above is worked out, it shouldn't be that hard to change the Quokka runtime to support this.
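
A sketch of that translation, turning per-node input orderings into a global "finish these sources before starting those" schedule. The plan encoding is made up; the real structure would come from Quokka's logical plan.

from graphlib import TopologicalSorter

# node -> its input streams in the order they must be consumed, e.g. a
# build-probe join lists the build side before the probe side.
ordered_inputs = {
    "join1": ["customer_scan", "orders_scan"],   # build customer, then probe orders
    "join2": ["join1", "lineitem_scan"],
}

def sources_of(node):
    # A leaf (input reader) is its own source.
    if node not in ordered_inputs:
        return {node}
    return set().union(*(sources_of(i) for i in ordered_inputs[node]))

# Every source feeding an earlier operand must finish before any source
# feeding a later operand starts.
precedence = {}   # source -> set of sources that must finish first
for node, inputs in ordered_inputs.items():
    for earlier, later in zip(inputs, inputs[1:]):
        for s in sources_of(later):
            precedence.setdefault(s, set()).update(sources_of(earlier))

# A topological order over the precedence graph gives the order in which
# input sources are allowed to start.
print(list(TopologicalSorter(precedence).static_order()))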

Remove Redis as a dependency

The current data plane uses Ray for same machine comms and Redis for inter-machine comms.

Both should be replaced by a custom Arrow Flight server (see the sketch after this list), with several benefits:

  • No more disk flushing and reading on demand on the same machine
  • No more pickle.loads or pickle.dumps
  • Possibility for more custom behavior
  • Better control from Python side
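
For reference, a minimal in-memory Flight server sketch in Python using pyarrow.flight; the eventual Quokka server would be written in C++ (see the issue above) and use its own descriptors and tickets, so the key scheme and port here are only illustrative:

import pyarrow.flight as flight

class InMemoryFlightServer(flight.FlightServerBase):
    # Holds Arrow tables in memory, keyed by the raw bytes of the descriptor/ticket.

    def __init__(self, location="grpc://0.0.0.0:8815"):
        super().__init__(location)
        self._tables = {}

    def do_put(self, context, descriptor, reader, writer):
        # A producer pushes a stream of record batches; store them under the key.
        self._tables[descriptor.command] = reader.read_all()

    def do_get(self, context, ticket):
        # A consumer pulls the batches back by the same key (no pickle, no disk).
        return flight.RecordBatchStream(self._tables[ticket.ticket])

if __name__ == "__main__":
    InMemoryFlightServer().serve()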

exceptions.ShapeError: 17 column names provided for a dataframe of width 16

I am following this tutorial and I get the following exception:

   exceptions.ShapeError: 17 column names provided for a dataframe of width 16

lineitem.count() also throws the same exception, namely,

   exceptions.ShapeError: 17 column names provided for a dataframe of width 16

and here is the code:

from pyquokka.df import * 
qc = QuokkaContext()

disk_path = "Downloads/demo-tpch/"
# the last column is called NULL, because the TPC-H data generator likes to put a | at the end of each row, making it appear as if there is a final column
# with no values. Don't worry, we can drop this column. 
lineitem_scheme = ["l_orderkey","l_partkey","l_suppkey","l_linenumber","l_quantity","l_extendedprice", "l_discount","l_tax","l_returnflag","l_linestatus","l_shipdate","l_commitdate","l_receiptdate","l_shipinstruct","l_shipmode","l_comment", "null"]
#lineitem = qc.read_csv(disk_path + "lineitem.tbl", sep="|", has_header=True)
lineitem = qc.read_csv(disk_path + "lineitem.tbl", lineitem_scheme, sep="|")
orders = qc.read_csv(disk_path + "orders.tbl", sep="|", has_header=True)
customer = qc.read_csv(disk_path + "customer.tbl",sep = "|", has_header=True)
part = qc.read_csv(disk_path + "part.tbl", sep = "|", has_header=True)
supplier = qc.read_csv(disk_path + "supplier.tbl", sep = "|", has_header=True)
partsupp = qc.read_csv(disk_path + "partsupp.tbl", sep = "|", has_header=True)
nation = qc.read_csv(disk_path + "nation.tbl", sep = "|", has_header=True)
region = qc.read_csv(disk_path + "region.tbl", sep = "|", has_header=True)

lineitem.count()

I am using Arch Linux and redis-7.0.10-1.

Explore using DuckDB as computation engine

DuckDB can be used in many places in Quokka, mostly replacing Polars. This can be approached in stages.

===== SQL predicates =======

Currently Quokka maintains an interpreter that executes SQL predicates with Polars or Pandas (https://github.com/marsupialtail/quokka/blob/master/pyquokka/sql_utils.py#L19)

Perhaps we should just execute this predicate with DuckDB (see the sketch below).

Pros of switching:

  • No need to maintain this interpreter!
  • Possibly better performance (this needs to be validated)

Cons of switching:

  • It might be better to maintain this interpreter if eventually we want Quokka to generate SIMD code in Gandiva fashion. Then the predicate can just be compiled down into a shared object library that can be loaded at runtime.
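
A sketch of what the DuckDB path could look like for a single predicate over an Arrow batch; the predicate and data are just examples, and DuckDB's replacement scan lets the SQL refer to the local variable batch by name:

import datetime
import duckdb
import pyarrow as pa

batch = pa.table({
    "l_quantity": [17, 36, 8],
    "l_shipdate": [datetime.date(1998, 9, 1), datetime.date(1998, 12, 15), datetime.date(1995, 1, 2)],
})

predicate = "l_quantity < 24 AND l_shipdate <= DATE '1998-12-01' - INTERVAL 90 DAY"

# DuckDB scans the in-memory Arrow table directly and returns Arrow back.
filtered = duckdb.sql(f"SELECT * FROM batch WHERE {predicate}").arrow()
print(filtered)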

====== Aggregations and groupbys ======

Currently Quokka uses Apache Arrow to do aggregations and groupbys.

Perhaps we should also just use DuckDB.
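
A quick sketch of the same groupby expressed against Arrow's compute layer and against DuckDB; the column names are illustrative and the Arrow call is only an approximation of what the current path does:

import duckdb
import pyarrow as pa

t = pa.table({"l_returnflag": ["A", "N", "A"], "l_quantity": [17, 36, 8]})

# Arrow's hash aggregation.
arrow_result = t.group_by("l_returnflag").aggregate([("l_quantity", "sum")])

# The same aggregation pushed into DuckDB, returning Arrow again.
duck_result = duckdb.sql(
    "SELECT l_returnflag, SUM(l_quantity) AS sum_qty FROM t GROUP BY l_returnflag"
).arrow()
print(arrow_result, duck_result)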

======= Executor kernels ===========

Quokka kernels today almost exclusively use Polars. Some can probably be switched to DuckDB.

Pros of switching:

  • Possibly better out-of-core support

Cons of switching:

  • We may want to wait until Arrow 10.0, with its fast out-of-core hash join support.

Kubernetes support

This is a long running issue on trying to make Quokka work on Kubernetes.

Currently Quokka works on a Dockerized setup on a Ray cluster, so the leap to Kubernetes is probably not big. We can also refer to how Spark works on Kubernetes: https://spark.apache.org/docs/latest/running-on-kubernetes.html

We can probably get started by assuming that Quokka pods will not get preempted. If pods do get preempted, note that Quokka does not tolerate coordinator failures.

Very subtle bug in fault tolerance for staged execution (not in release yet, fix before next release)

Currently the correctness of the staged execution hinges on:

  • The global stage counter in Redis increments past X only after all the actors in stage X have pushed data to their targets
  • The Arrow Flight server serves up data corresponding to stage X-1 before serving up data corresponding to stage X to whoever asks.

This works just fine in normal execution. When there is a failure, there could be an intricate scenario as follows. Consider a left-deep join tree, where all intermediates and the probe input have stage 0 and the build inputs have stage -1. Consider the join node at the top, which has stage 0. Under normal execution all build inputs have finished, and thus the stage counter has incremented to 0. Now the join node at the top has not yet executed anything.

Now if it is to execute, the Arrow Flight server will preferentially serve up the build side first. Great.

However, if the machine it is on dies and the node is resurrected on another machine, it will ask for any inputs from the build and intermediate nodes before it, which may have to be replayed. The global stage counter is now 0, so if the probe-side replayer finishes fast and this node executes before the build-side replayer is done, we will have a problem.

Data source challenge: SAP HANA

SAP HANA is a proprietary columnar store that doesn't speak the MySQL or Postgres dialects, so I'd love to see the ability to use Quokka to query it.

Unable to use redis running in docker

Description

Currently it is not possible to run the Redis server for local development in Docker:

Initializing local Quokka cluster.
2023-06-08 08:01:39,652 INFO worker.py:1625 -- Started a local Ray instance.
Traceback (most recent call last):
  File "/Users/x/Documents/Projects/quokka-test/main.py", line 8, in <module>
    local_cluster = LocalCluster()
  File "/Users/x/Documents/Projects/quokka-test/venv/lib/python3.10/site-packages/pyquokka/utils.py", line 148, in __init__
    self.redis_process = subprocess.Popen(["redis-server" , pyquokka_loc + "redis.conf", "--port 6800", "--protected-mode no"], preexec_fn=preexec_function)
  File "/Users/x/.pyenv/versions/3.10.5/lib/python3.10/subprocess.py", line 969, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/Users/x/.pyenv/versions/3.10.5/lib/python3.10/subprocess.py", line 1845, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'redis-server'

Solution

Add an optional **kwargs parameter to LocalCluster so the change is backward compatible:

options = {
    'docker_redis_enabled': True,
}

local_cluster = LocalCluster(**options)
qc = QuokkaContext(local_cluster)

handle multi-target outputs

Handle situations where a table (here, nation) is used in two places in a pipelined plan. This is not common in normal SQL, but consider:

d1 = customer.join(nation, left_on = "c_nationkey", right_on = "n_nationkey")
d1 = d1.join(orders, left_on = "c_custkey", right_on = "o_custkey", suffix = "_3")
d2 = supplier.join(nation, left_on="s_nationkey", right_on = "n_nationkey")
d2 = lineitem.join(d2, left_on = "l_suppkey", right_on = "s_suppkey", suffix = "_3")

d = d1.join(d2, left_on = "o_orderkey", right_on = "l_orderkey",suffix="_4")

Investigation needed, odd Redis behavior

Currently it is necessary to call self.PFT.get after self.PFT.set to ensure that the PFT table has been properly updated before invoking the Ray RPCs that cause the PFT table to be read across the workers.

If we don't do this, I observe failures in properly initializing the workers. Adding a delay immediately after the self.PFT.set also works, which leads me to suspect a Redis timing issue.

This theoretically should not happen: self.PFT.set should be blocking in the client, i.e. it should finish updating Redis before returning, so the Redis state should be good before the RPCs are launched.

This is very odd. Though it doesn't pose a problem right now, we should understand this behavior.
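
For concreteness, the workaround pattern with plain redis-py; the key name and payload below are illustrative, not Quokka's actual PFT wrapper:

import pickle
import redis

r = redis.Redis(host="localhost", port=6800)
pft = {"channel_0": "worker_1"}      # stand-in for the real partition/function table
r.set("PFT", pickle.dumps(pft))      # should already block until Redis has the value...
r.get("PFT")                         # ...yet this extra read is currently needed before the RPCs
# launch_ray_rpcs()                  # workers then read "PFT" out of Redis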

Unexpected token ',' when using an iterator together with a private field

Code:

class MyClass {
  #value: number;
  [Symbol.iterator]() {

  }
}
new MyClass();

Output:

Quokka PRO 'Untitled-1.ts' (node: v18.14.2, TypeScript: v5.1.6, plugins: jsdom-quokka-plugin)
.\quokka.js:8 
    [_MyClass_value = new WeakMap(), Symbol.iterator]() { 
                                   ^ 
 
SyntaxError: Unexpected token ',' 
    at internalCompileFunction (node:internal/vm:73:18) 
    at wrapSafe (node:internal/modules/cjs/loader:1176:20) 
    at Module._compile (node:internal/modules/cjs/loader:1218:27) 

reduce initialization overhead - meta thread

Quokka currently has very high initialization overhead for launching input reader actors.

This should go away when we migrate towards an architecture where we only have one actor per machine, but before that happens it would be good to have some sort of way to reduce this initialization overhead.

This is similar to Spark's old optimization of moving input partitioning off the coordinator and parallelizing it across the workers.

The problem is mainly twofold:

  • Most of the initialization is done in Python and serially, and might involve network calls, e.g. instantiating a pyarrow parquet S3 dataset and dividing up the work.
  • Launching actors in Ray is expensive.

Our architecture change should take care of the second problem but won't solve the first. We should probably parallelize this process: have the actual worker nodes, which should be alive while all this is happening anyway, do a lot of this work and communicate the results back to the master through the Ray object store or something similar. A sketch of this idea follows.
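
A minimal sketch of that idea with Ray tasks; the dataset layout and partitioning logic are hypothetical and just stand in for the expensive S3/parquet inspection work:

import ray

@ray.remote
def plan_partitions(prefix, files):
    # Runs on a worker: inspect its slice of the files (possibly making network
    # calls to S3) and return the small work-unit descriptions it discovered.
    return [(prefix + f, 0) for f in files]      # (file, starting row group)

def parallel_initialize(prefix, all_files, num_workers):
    chunks = [all_files[i::num_workers] for i in range(num_workers)]
    futures = [plan_partitions.remote(prefix, chunk) for chunk in chunks]
    # The coordinator only gathers the small partition descriptions; the heavy
    # lifting happened in parallel on the workers.
    return [unit for plan in ray.get(futures) for unit in plan]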
