marsupialtail / quokka
Making data lake work for time series
Home Page: https://marsupialtail.github.io/quokka/
License: Apache License 2.0
The Quokka execution plan should eventually support CBO and join ordering. This is a great issue for someone who wants to impress a Snowflake or Redshift interviewer! You can say you implemented a CBO join optimizer almost from scratch haha.
What needs to be done:
Currently Quokka's optimizer implements predicate pushdown, early projection, and map fusion.
After these three steps, most of the nodes in the system should be MapNodes and StatefulNodes (aka joins). Some of those joins might already be broadcast joins with materialized arguments; that's okay.
Join Fusion. We are now going to replace trees of binary join nodes with single join nodes that take multiple operands. This will greatly simplify the tree and expose different choices of join order. We will do predicate pull-up from the individual join nodes. This should be a pretty straightforward optimization pass.
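A minimal sketch of what such a fusion pass could look like, with made-up node classes (not Quokka's actual logical.py types):

```python
# Hypothetical sketch of a join-fusion pass: collapse trees of binary
# join nodes into single multi-operand join nodes, pulling the child
# predicates up. Class and field names are illustrative only.

class JoinNode:
    def __init__(self, operands, predicates):
        self.operands = operands      # child plan nodes (or base tables)
        self.predicates = predicates  # join predicates pulled up from children

def fuse_joins(node):
    """Recursively replace a tree of JoinNodes with one multi-way JoinNode."""
    if not isinstance(node, JoinNode):
        return node
    operands, predicates = [], []
    for child in node.operands:
        fused = fuse_joins(child)
        if isinstance(fused, JoinNode):
            # Pull the child's operands and predicates up into this node.
            operands.extend(fused.operands)
            predicates.extend(fused.predicates)
        else:
            operands.append(fused)
    return JoinNode(operands, predicates + node.predicates)

# ((A join B) join C) becomes one join over [A, B, C].
tree = JoinNode([JoinNode(["A", "B"], ["a.x = b.x"]), "C"], ["b.y = c.y"])
fused = fuse_joins(tree)
print(fused.operands)    # ['A', 'B', 'C']
print(fused.predicates)  # ['a.x = b.x', 'b.y = c.y']
```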
Cardinality Estimation. Input nodes should now have all the relevant early projections and pushed-down filters, and can estimate how much data will be produced based on this. We are going to estimate the number of unique values for each column as well as the total expected size of the output from this node. This estimate will be pushed up the tree as far as possible.
If the estimated size is smaller than some threshold, the entire input will be materialized as a constant variable in the query plan. The relevant joins will be converted to broadcast joins at this stage.
This should also be a pretty straightforward pass on the graph. The hardest part will be implementing the actual CBO that answers cardinality-estimation queries given a predicate.
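As an illustration of the kind of estimate involved, here is a toy selectivity calculation under the usual independence assumptions; the threshold, names, and numbers are invented:

```python
# Illustrative cardinality estimation for conjunctive equality predicates.
# Each equality predicate on column c is assumed to have selectivity
# 1 / NDV(c); independence lets us multiply selectivities.

BROADCAST_THRESHOLD = 10_000  # rows; below this, materialize and broadcast

def estimate_filter_output(num_rows, ndv_per_col, equality_cols):
    """Estimate rows surviving conjunctive equality predicates."""
    selectivity = 1.0
    for col in equality_cols:
        selectivity *= 1.0 / ndv_per_col[col]
    return num_rows * selectivity

est = estimate_filter_output(1_000_000,
                             {"l_returnflag": 3, "l_linestatus": 2},
                             ["l_returnflag", "l_linestatus"])
print(est)                        # 1_000_000 / 6 expected rows
print(est < BROADCAST_THRESHOLD)  # False -> keep streaming, no broadcast join
```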
Join order selection. The join node will now figure out the order in which to do the joins. If some of its inputs are materialized, it will join them in the main thread first. We are going to restrict ourselves to left-deep plans and enumerate all n! possible plans. We will use all the nice independence assumptions to compute the join cost of each ordering. If some of the tables have already been materialized, they will be joined first to produce bigger/smaller materialized tables. This could potentially lead to a size blowup, which we don't address.
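The enumeration above can be sketched in a few lines, assuming (purely for simplicity) that all tables share a single join key and that cost is the total size of intermediate results; all names and statistics are made up:

```python
# Toy left-deep join-order enumeration over all n! permutations, costing
# each order with the standard independence assumption:
#   |R join S on k| ~= |R| * |S| / max(NDV_R(k), NDV_S(k))
from itertools import permutations

def best_left_deep_order(tables, ndv):
    """tables: {name: row_count}; ndv: {name: NDV of the shared join key}."""
    best_order, best_cost = None, float("inf")
    for order in permutations(tables):
        rows, cur_ndv, cost = tables[order[0]], ndv[order[0]], 0.0
        for name in order[1:]:
            rows = rows * tables[name] / max(cur_ndv, ndv[name])
            cur_ndv = min(cur_ndv, ndv[name])  # join-key NDV can only shrink
            cost += rows                       # cost = sum of intermediate sizes
        if cost < best_cost:
            best_order, best_cost = order, cost
    return best_order, best_cost

tables = {"A": 1000, "B": 100, "C": 10}
ndv = {"A": 100, "B": 100, "C": 10}
best_order, best_cost = best_left_deep_order(tables, ndv)
print(best_order, best_cost)  # small tables end up joined first
```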
The "schema" of the DataStream after the aggregation is missing the mean
columns and is thus incorrect.
Run the hello world example:
from pyquokka.df import QuokkaContext
qc = QuokkaContext()
lineitem = qc.read_csv("lineitem.tbl.named", sep="|", has_header=True)
d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
print(len(d.schema))
d = d.with_column("disc_price", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]), required_columns ={"l_extendedprice", "l_discount"})
print(len(d.schema))
d = d.with_column("charge", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]) * (1 + x["l_tax"]), required_columns={"l_extendedprice", "l_discount", "l_tax"})
print(len(d.schema))
f = d.groupby(["l_returnflag", "l_linestatus"], orderby=["l_returnflag","l_linestatus"]).agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "charge":"sum", "l_discount":"avg","*":"count"})
print(f.schema)
print(len(f.schema)) # <--- this is incorrect and only has 8 columns
df = f.collect()
print(df.columns)
print(len(df.columns)) # <--- this has 10 columns and is correct
The expectation is that when I get a DataStream, the "schema" it has should reflect what it would actually produce.
This needs to happen at some point.
The main issue is statically linking Apache Arrow. Ideally we just copy a binary. We could also copy a binary plus the Arrow shared objects; the problem then is that we have to find the right Arrow shared objects for the target cloud environment. This is probably fine, since installing pyarrow ships the shared objects.
Getting this to work is critical for performance.
@savebuffer
Steps:
This could be very beneficial if it significantly reduces the number of output rows. Good first issue for optimizer-related things.
Likely will need to make a new node type in logical.py to accomplish this.
How do we track progress and get estimated job completion time with Quokka?
In Quokka we have input actors whose progress can easily be tracked (we know the upper limit of the iteration). We also have stateful executors downstream for which we have no idea how many inputs they have to process.
Either way we need to have a centralized actor that tracks all the progress and displays the progress bar. This actor can also be used to host a dashboard in the future.
One plan could be as follows: first we track all the input nodes. Then, after those are done, the stateful executor nodes in the next stage should have all the inputs they will ever need in their mailboxes, which gives an upper limit on how fast they can go. The batching of inputs can potentially create a problem, as it corresponds to big jumps in the progress bar, but it should be okay.
Some simple version of this is very good to have.
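A minimal single-process sketch of such a tracker (in Quokka this would presumably live in a Ray actor; class and method names here are hypothetical):

```python
# Hypothetical centralized progress tracker. Input nodes register a known
# total; executors can register once their mailbox gives an upper bound.

class ProgressTracker:
    def __init__(self):
        self.totals = {}  # node -> total work units (known for input nodes)
        self.done = {}    # node -> completed work units

    def register(self, node, total_units):
        self.totals[node] = total_units
        self.done[node] = 0

    def report(self, node, units):
        self.done[node] += units

    def overall_progress(self):
        total = sum(self.totals.values())
        return sum(self.done.values()) / total if total else 0.0

tracker = ProgressTracker()
tracker.register("input-lineitem", 100)  # input actors: upper limit is known
tracker.register("input-orders", 50)
tracker.report("input-lineitem", 60)
tracker.report("input-orders", 50)
print(tracker.overall_progress())  # 110/150
```

A dashboard could poll `overall_progress()` periodically to drive a progress bar.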
Hi! I did not find any CI setup. It’d be good to have some unit tests and developer installation instructions.
Currently backpressuring through the executor watermark table is only supported during normal execution. This should be supported during fault recovery too.
Not a priority.
the projection attribute in api.py is sometimes a list and sometimes a set. This needs to be cleared up.
This can be done by changing the right_col handling logic in join in datastream... needs tests to make sure it works properly.
It is useful to have a notion of ordering among input datastreams to a node, to support things like build-probe joins that use less memory than the streaming two-sided join Quokka uses today.
The ordering requirements at each node can probably just be translated down into a requirement of which input sources have to be completed before other input sources start.
Once the algorithm for the above is completed, it shouldn't be that hard to change the Quokka runtime to support this.
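One way to sketch the translation step: express each requirement as "source A must complete before source B starts" and topologically sort. The source names here are invented:

```python
# Translate per-node ordering requirements (e.g. "build side before
# probe side") into a global completion order over input sources.
from graphlib import TopologicalSorter

# Each key maps a source to the set of sources that must finish first.
requirements = {
    "lineitem_scan": {"orders_scan"},  # orders (build) before lineitem (probe)
    "orders_scan": {"customer_scan"},  # customer before orders
}
order = list(TopologicalSorter(requirements).static_order())
print(order)  # ['customer_scan', 'orders_scan', 'lineitem_scan']
```

A cycle in the requirements would raise `graphlib.CycleError`, which would signal an unsatisfiable set of ordering constraints.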
The current data plane uses Ray for same machine comms and Redis for inter-machine comms.
Both should be replaced by a custom Arrow Flight server, with two benefits:
I am following this tutorial and I get the following exception:
exceptions.ShapeError: 17 column names provided for a dataframe of width 16
lineitem.count()
also throws another exception, namely,
exceptions.ShapeError: 17 column names provided for a dataframe of width 16
and here is the code:
from pyquokka.df import *
qc = QuokkaContext()
disk_path = "Downloads/demo-tpch/"
# the last column is called NULL, because the TPC-H data generator likes to put a | at the end of each row, making it appear as if there is a final column
# with no values. Don't worry, we can drop this column.
lineitem_scheme = ["l_orderkey","l_partkey","l_suppkey","l_linenumber","l_quantity","l_extendedprice", "l_discount","l_tax","l_returnflag","l_linestatus","l_shipdate","l_commitdate","l_receiptdate","l_shipinstruct","l_shipmode","l_comment", "null"]
#lineitem = qc.read_csv(disk_path + "lineitem.tbl", sep="|", has_header=True)
lineitem = qc.read_csv(disk_path + "lineitem.tbl", lineitem_scheme, sep="|")
orders = qc.read_csv(disk_path + "orders.tbl", sep="|", has_header=True)
customer = qc.read_csv(disk_path + "customer.tbl",sep = "|", has_header=True)
part = qc.read_csv(disk_path + "part.tbl", sep = "|", has_header=True)
supplier = qc.read_csv(disk_path + "supplier.tbl", sep = "|", has_header=True)
partsupp = qc.read_csv(disk_path + "partsupp.tbl", sep = "|", has_header=True)
nation = qc.read_csv(disk_path + "nation.tbl", sep = "|", has_header=True)
region = qc.read_csv(disk_path + "region.tbl", sep = "|", has_header=True)
lineitem.count()
I am using Arch Linux and redis-7.0.10-1.
DuckDB can be used in many places in Quokka, mostly replacing Polars. This can be approached in stages.
===== SQL predicates =======
Currently Quokka maintains an interpreter that executes SQL predicates with Polars or Pandas (https://github.com/marsupialtail/quokka/blob/master/pyquokka/sql_utils.py#L19)
Perhaps we should just execute this predicate with DuckDB.
Pros of switching:
Cons of switching:
====== Aggregations and groupbys ======
Currently Quokka uses Apache Arrow to do aggregations and groupbys.
Perhaps we should also just use DuckDB.
======= Executor kernels ===========
Quokka kernels today almost exclusively use Polars. Some can probably be switched to DuckDB.
Pros of switching:
Cons of switching:
This is a long running issue on trying to make Quokka work on Kubernetes.
Currently Quokka works on a Dockerized setup on a Ray cluster, so the leap to Kubernetes is probably not big. We can also refer to how Spark works on Kubernetes: https://spark.apache.org/docs/latest/running-on-kubernetes.html
We can probably get started by assuming that Quokka pods will not get pre-empted. If pods do get pre-empted, note that Quokka does not tolerate coordinator failures.
Currently the correctness of the staged execution hinges on:
This works just fine in normal execution. When there is a failure, there could be an intricate scenario as follows. Consider a left-deep join tree, where all intermediates and probe inputs have stage 0 and build inputs have stage -1. Consider the join node at the top, which has stage 0. Upon normal execution, all build inputs have finished and thus the stage counter has incremented to 0. Now the join node on the top has not yet executed anything.
Now if it is to execute, the Arrow Flight server will preferentially serve up the build side first. Great.
However, if the machine it is on dies and it is resurrected on another machine, it will ask for any inputs from the build and intermediate nodes before it, which could be replayed. Now the global stage counter is 0. If the probe-side replayer finishes fast, and this node executes before the build-side replayer is done, then we will have a problem.
Need to figure out why groupbys are distributed; perhaps every one should be?
SAP HANA is a proprietary DB that doesn't speak MySQL or Postgres dialects and is a columnar store, so I'd love to see the ability to use Quokka to query it.
Currently it is not possible to run the redis server for local development in Docker:
Initializing local Quokka cluster.
2023-06-08 08:01:39,652 INFO worker.py:1625 -- Started a local Ray instance.
Traceback (most recent call last):
File "/Users/x/Documents/Projects/quokka-test/main.py", line 8, in <module>
local_cluster = LocalCluster()
File "/Users/x/Documents/Projects/quokka-test/venv/lib/python3.10/site-packages/pyquokka/utils.py", line 148, in __init__
self.redis_process = subprocess.Popen(["redis-server" , pyquokka_loc + "redis.conf", "--port 6800", "--protected-mode no"], preexec_fn=preexec_function)
File "/Users/x/.pyenv/versions/3.10.5/lib/python3.10/subprocess.py", line 969, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "/Users/x/.pyenv/versions/3.10.5/lib/python3.10/subprocess.py", line 1845, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
FileNotFoundError: [Errno 2] No such file or directory: 'redis-server'
Add optional **kwargs to LocalCluster, so it is backward compatible.
options = {
'docker_redis_enabled': True,
}
local_cluster = LocalCluster(**options)
qc = QuokkaContext(local_cluster)
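A hypothetical sketch of the constructor side of this change (the attribute and option names are just the ones proposed above, not an actual Quokka API):

```python
# Sketch only: LocalCluster grows an optional **kwargs and skips
# spawning its own redis-server when docker_redis_enabled is set.

class LocalCluster:
    def __init__(self, **kwargs):
        # Default False keeps the current behavior, so this is
        # backward compatible.
        self.docker_redis_enabled = kwargs.get("docker_redis_enabled", False)
        if not self.docker_redis_enabled:
            # Original behavior would go here, e.g. spawning redis-server
            # with subprocess.Popen; omitted in this sketch.
            pass

print(LocalCluster(docker_redis_enabled=True).docker_redis_enabled)  # True
print(LocalCluster().docker_redis_enabled)                           # False
```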
Delta Lake is a common OSS table format that would be useful to support with Quokka.
Handle situations where a table is used in two places in a pipelined plan. Not common in normal SQL, but note:
d1 = customer.join(nation, left_on = "c_nationkey", right_on = "n_nationkey")
d1 = d1.join(orders, left_on = "c_custkey", right_on = "o_custkey", suffix = "_3")
d2 = supplier.join(nation, left_on="s_nationkey", right_on = "n_nationkey")
d2 = lineitem.join(d2, left_on = "l_suppkey", right_on = "s_suppkey", suffix = "_3")
d = d1.join(d2, left_on = "o_orderkey", right_on = "l_orderkey",suffix="_4")
Currently it is necessary to do self.PFT.get after self.PFT.set to ensure that the PFT table has been properly updated before invoking the Ray RPCs, which will cause the PFT table to be read across the workers.
If we don't do this, I observe failures in properly initializing the workers. Adding a delay immediately after the self.PFT.set also works, which leads me to suspect a Redis timing issue.
This theoretically should not happen, as self.PFT.set should be blocking in the client, i.e. it should finish updating Redis atomically before returning; thus the Redis state should be good before the RPCs are launched.
This is very odd. Though this doesn't pose a problem right now we should understand this behavior.
Code:
class MyClass {
#value: number;
[Symbol.iterator]() {
}
}
new MyClass();
Output:
Quokka PRO 'Untitled-1.ts' (node: v18.14.2, TypeScript: v5.1.6, plugins: jsdom-quokka-plugin)
.\quokka.js:8
[_MyClass_value = new WeakMap(), Symbol.iterator]() {
^
SyntaxError: Unexpected token ','
at internalCompileFunction (node:internal/vm:73:18)
at wrapSafe (node:internal/modules/cjs/loader:1176:20)
at Module._compile (node:internal/modules/cjs/loader:1218:27)
https://pysimdjson.tkte.ch/native.html
Good first issue.
Quokka currently has very high initialization overhead for launching input reader actors.
This should go away when we migrate towards an architecture where we only have one actor per machine, but before that happens it would be good to have some sort of way to reduce this initialization overhead.
This is similar to Spark's optimization of moving input partitioning to parallel across the workers instead of on the coordinator in the olden days.
The problem is mainly twofold:
Our architecture change should take care of the second one but won't solve the first one. We should probably parallelize this process and have the actual worker nodes, which should be alive when all this is happening anyway, do a lot of this work and communicate the results back to the master via the Ray object store or something similar.
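A toy sketch of fanning partition discovery out in parallel (here with a local thread pool; in Quokka the work would presumably run on worker nodes and return via the Ray object store). All paths and helper names are invented:

```python
# Parallelize input-partition discovery instead of doing it serially
# on the coordinator. discover_partitions is a stand-in for listing
# files or computing byte ranges for one table prefix.
from concurrent.futures import ThreadPoolExecutor

def discover_partitions(prefix):
    # Fake listing: pretend each prefix has three Parquet files.
    return [f"{prefix}/part-{i}.parquet" for i in range(3)]

prefixes = ["s3://bucket/lineitem", "s3://bucket/orders"]
with ThreadPoolExecutor(max_workers=4) as pool:
    partitions = [p for plist in pool.map(discover_partitions, prefixes)
                  for p in plist]
print(len(partitions))  # 6
```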