kyocum / disdat
Data science tool for creating and deploying pipelines with versioned data
License: Apache License 2.0
I took dsdt for a quick spin and tried to run the Short Test Drive instructions. I followed the mkvirtualenv/git clone instructions on my local Mac, then tried it in a Docker container to make it easier to reproduce:
# disdat Dockerfile
FROM python:3
RUN pip install disdat
RUN pip install awscli
$ docker build -t disdat .
$ docker run --rm -it -v $PWD:/disdat disdat /bin/bash
# dsdt init
# dsdt context mycontext
Disdat created data context None/mycontext at object dir /root/.disdat/context/mycontext/objects.
# dsdt switch mycontext
Switched to context mycontext
# dsdt add my.first.bundle /disdat/examples/hello_data.csv
disdat.api.Register: Source file /usr/local/lib/python3.7/site-packages/luigi/task_register.py not under git version control
# dsdt ls -v
NAME PROC_NAME OWNER DATE COMMITTED UUID TAGS
# dsdt cat
['/root/.disdat/context/mycontext/objects/fde8963b-f3c5-48e0-93f7-52bbfa5c3f0b/hello_data.csv']
The output doesn't look like the one in README.rst, but perhaps I'm not running this correctly? I verified I have access to the public files on S3.
Really interesting work, and I'm excited to make it run 😄
dsdt pull --localize should grab only the latest bundle version(s) unless you pass something like --all-versions.
Provide a third name to allow users and the system to identify "same data."
For each Frame in the bundle HyperFrame, hash key and contents. If it is a link frame, then hash the files as well.
Further research needed to determine what "same" means. That is, does a content hash of a link frame also hash the file name? Does the content hash include the frame's (columns) name?
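The hashing step above can be sketched as follows; hash_frame and hash_link_frame are hypothetical helpers (not Disdat's actual code), and whether the frame (column) name and file names are mixed into the digest is exactly the open question, so both are parameters here:

```python
import hashlib
import os

def hash_frame(name, values, include_name=True):
    """Hash a frame's contents; optionally mix in the frame (column) name."""
    h = hashlib.sha256()
    if include_name:
        h.update(name.encode("utf-8"))
    for v in values:
        h.update(repr(v).encode("utf-8"))
    return h.hexdigest()

def hash_link_frame(name, paths, include_filenames=True):
    """Hash a link frame by hashing each file's bytes; optionally the file names too."""
    h = hashlib.sha256()
    h.update(name.encode("utf-8"))
    for p in paths:
        if include_filenames:
            h.update(os.path.basename(p).encode("utf-8"))
        with open(p, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)
    return h.hexdigest()
```

With include_name=False, two frames with different column names but identical values hash the same, which makes the "does the content hash include the frame's name?" question concrete.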
A task returns a value, but when we try to present the bundle as a scalar, it is packed in an array, and we get the following error:
Traceback (most recent call last):
File "/Users/bkang1/.virtualenvs/cstream/bin/dsdt", line 10, in <module>
sys.exit(main())
File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/dsdt.py", line 132, in main
args.func(args)
File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/fs.py", line 1884, in <lambda>
cat_p.set_defaults(func=lambda args: _cat(fs, args))
File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/fs.py", line 1781, in _cat
result = fs.cat(args.bundle, uuid=args.uuid, tags=common.parse_args_tags(args.tag), file=args.file)
File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/fs.py", line 748, in cat
other = data_context.present_hfr(hfr)
File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/data_context.py", line 1345, in present_hfr
return self.convert_hfr2scalar(hfr)
File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/data_context.py", line 1268, in convert_hfr2scalar
return nda.item()
ValueError: can only convert an array of size 1 to a Python scalar
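The ValueError comes from NumPy's ndarray.item(), which only converts size-1 arrays. A guard along these lines (a sketch, not Disdat's actual fix) would fall back to returning the array when a scalar presentation is impossible:

```python
import numpy as np

def present_as_scalar(nda):
    """Return a Python scalar for size-1 arrays; otherwise return the array unchanged."""
    if nda.size == 1:
        return nda.item()
    return nda  # too many elements to present as a scalar
```

present_as_scalar(np.array([42])) yields a plain Python int, while a multi-element array passes through untouched instead of raising.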
Sometimes lightweight functions (like AWS Lambdas) need to read bundles from remotes.
However, the API currently requires Disdat to be initialized (i.e., to have a ~/.disdat directory for local contexts). This shouldn't be required if you only want to pull a bundle into memory (or push to a remote).
Bug: Create a base class C that implements a pipe_run function that just has a return statement. Create a task A that inherits from C, with no pipe_run function and an empty pipe_requires. Create a second task B that requires A and also inherits from C.
First, the code doesn't expect pipe_run to receive None arguments. Second, when you declare an upstream dependency, Disdat tries to place it in the downstream task's argument list. So we need to make two changes: 1.) allow None as a return value in prepare_pipe_kwargs:461, and 2.) allow add_dependency to add a dependency that doesn't add to pipe_run's kwargs.
Currently, tags are part of the bundle definition. One cannot add tags to an existing bundle.
Note that some system tags (i.e., the param tags) are used to form the unique processing name of the bundle. One must not be able to remove or alter those.
However, if someone wanted to add a tag to an existing bundle, i.e., I saw this data and I've marked it as good, should that be allowed? At the moment, one would do so by making an entirely new bundle with the additional tag.
Running api.apply vs. dsdt apply yields different driver tasks, which makes it look like we don't re-use existing computation.
The proposal is for calls like api.local_context_exists(<ctxt>) and api.remote_context_exists(<ctxt>, <remote>).
I need to have my workflow reference a system command that is installed via apt-get install ....
Is there a way to allow the dockerize step to build a custom Docker image?
Another option that would work in my case would be to have the base image be something else; not sure if this is possible either.
===== Luigi Execution Summary =====
Scheduled 566 tasks of which:
2019-03-13 19:23:01,742 - main - ERROR - Failed to run pipeline: ApplyException Disdat Apply ran, but one or more tasks failed.
When a DataFrame with an index that is not standard (0, 1, 2...) is returned from run, the original index is not present when the DataFrame is loaded into memory for the next task or accessed from the API. This issue can be recreated using the code below:
import disdat.api as dsdt
from disdat.pipe import PipeTask
import pandas as pd

class TestIndex(PipeTask):
    def pipe_requires(self, pipeline_input=None):
        self.set_bundle_name('test_index')

    def pipe_run(self, pipeline_input=None):
        data = {
            'a': [2, 3, 4],
            'b': [5, 6, 7]
        }
        df = pd.DataFrame(data)
        print('Index should be 0, 1, 2')
        print(df)
        df.index = [7, 8, 9]
        print('Index should be 7, 8, 9')
        print(df)
        return df

if __name__ == '__main__':
    dsdt.apply('tt', '-', '-', 'TestIndex', params={}, force=True)
    print(dsdt.search('tt', 'test_index')[0].data.index.values)
This code correctly updates and prints the index before it returns; however, once the DataFrame is retrieved by Disdat, the index is back to the default.
Output:
2018-12-18 15:24:25,151 - luigi-interface - INFO - Loaded []
2018-12-18 15:24:25,175 - luigi-interface - INFO - Informed scheduler that task DriverTask_True______a92c94fa32 has status PENDING
2018-12-18 15:24:25,175 - luigi-interface - INFO - Informed scheduler that task TestIndex_None_None____71e4869f25 has status PENDING
2018-12-18 15:24:25,175 - luigi-interface - INFO - Done scheduling tasks
2018-12-18 15:24:25,175 - luigi-interface - INFO - Running Worker with 1 processes
2018-12-18 15:24:25,176 - luigi-interface - INFO - [pid 53226] Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) running TestIndex(closure_bundle_proc_name_root=None, closure_bundle_uuid_root=None, output_tags={})
Index should be 0, 1, 2
a b
0 2 5
1 3 6
2 4 7
Index should be 7, 8, 9
a b
7 2 5
8 3 6
9 4 7
2018-12-18 15:24:25,211 - disdat.pipe_base - WARNING - __main__.TestIndex: Source file /Users/srowan/Development/ds/turbotiles/turbotiles/pipeline/test_index.py not under git version control
2018-12-18 15:24:25,237 - luigi-interface - INFO - [pid 53226] Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) done TestIndex(closure_bundle_proc_name_root=None, closure_bundle_uuid_root=None, output_tags={})
2018-12-18 15:24:25,238 - luigi-interface - INFO - Informed scheduler that task TestIndex_None_None____71e4869f25 has status DONE
2018-12-18 15:24:25,239 - luigi-interface - INFO - [pid 53226] Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) running DriverTask(input_bundle=-, output_bundle=-, pipe_params={}, pipe_cls=TestIndex, input_tags={}, output_tags={}, force=True)
2018-12-18 15:24:25,240 - luigi-interface - INFO - [pid 53226] Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) done DriverTask(input_bundle=-, output_bundle=-, pipe_params={}, pipe_cls=TestIndex, input_tags={}, output_tags={}, force=True)
2018-12-18 15:24:25,240 - luigi-interface - INFO - Informed scheduler that task DriverTask_True______a92c94fa32 has status DONE
2018-12-18 15:24:25,245 - luigi-interface - INFO - Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) was stopped. Shutting down Keep-Alive thread
2018-12-18 15:24:25,248 - luigi-interface - INFO -
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 2 ran successfully:
- 1 DriverTask(...)
- 1 TestIndex(...)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
Index: [0 1 2]
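Until the index survives the round trip, one possible workaround (assuming only the default index is preserved by Disdat's serialization) is to store the index as an explicit column before returning the DataFrame and restore it after retrieval:

```python
import pandas as pd

df = pd.DataFrame({'a': [2, 3, 4], 'b': [5, 6, 7]}, index=[7, 8, 9])

# Before returning from pipe_run: keep the index as a regular column.
saved = df.reset_index()  # the index becomes a column named 'index'

# After retrieving the bundle: restore the original index.
restored = saved.set_index('index')
restored.index.name = None
```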
Currently pushing is a one-by-one affair. Instead, support pushing everything at once.
At the moment, if you use the CLI or the API to refer to classes in other modules, you have to add their path to your PYTHONPATH environment variable. Can we remove this dependency?
Currently it requires a named bundle that has been pushed. For running locally this shouldn't be required.
In df_dup.py, how is the following parameter used?
stop = luigi.BoolParameter(default=False)
The behavior should be: if it is not a file, then ignore it (treat it as a string).
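A minimal sketch of that rule, with a hypothetical interpret_value helper (not Disdat's actual code): only treat a value as a file link if it actually exists on disk, otherwise pass it through as a plain string.

```python
import os

def interpret_value(value):
    """Treat an existing path as a file link; anything else is just a string."""
    if isinstance(value, str) and os.path.isfile(value):
        return ('file', value)
    return ('string', value)
```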
Baking in passwords, etc., into a docker image is generally a bad idea. We need a way to allow users to pass existing secrets through to their running containers.
Under "Bundle Collections", it says:
● status: Report current context and branch.
● branch: Create a new local branch from some context.
● checkout: Move to a different local branch.
● remote: Attach an s3 path to this local context.
But this doesn't match the text below, which describes the "status", "context", "switch", and "remote" commands.
Is there still such a thing as branches? I don't see them referenced anywhere else in the doc. Maybe there are just contexts?
Issue: Sometimes pip fails to install a dependency when installing the user's source distribution, but the dockerize command continues on and produces a container. When the pipeline is run, execution complains that the user's module or package cannot be found.
Possible solution: check that a failed Dockerfile command propagates up to the dockerize output.
During bundle creation, check for uuid uniqueness in local context.
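A sketch of that check, assuming bundles live in an objects directory keyed by UUID (the layout and helper name here are assumptions):

```python
import os
import uuid

def new_unique_uuid(objects_dir, max_tries=5):
    """Draw a UUID and verify no bundle directory in the local context already uses it."""
    for _ in range(max_tries):
        candidate = str(uuid.uuid4())
        if not os.path.exists(os.path.join(objects_dir, candidate)):
            return candidate
    raise RuntimeError("could not generate a unique bundle UUID")
```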
I have a local context:
$ dsdt --version
Running Disdat version 0.9.13
$ dsdt context
* test [None@None]
I am trying to attach a remote context to an existing bucket with a non-existing key, but I get a RuntimeError:
$ dsdt remote new s3://my-bucket/non-existing-key
Error code 404
Unable to bind context new because URL s3://my-bucket/non-existing-key does not exist.
Traceback (most recent call last):
File "/mnt/data/.virtualenvs/F7DBigBang/bin/dsdt", line 11, in <module>
load_entry_point('disdat==0.9.13', 'console_scripts', 'dsdt')()
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/dsdt.py", line 91, in main
args.func(args)
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1579, in <lambda>
remote_p.set_defaults(func=lambda args: _remote(fs, args))
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1363, in _remote
fs.remote_add(args.context, args.s3_url)
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1348, in remote_add
ctxt_obj.bind_remote_ctxt(remote_context, s3_url)
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/data_context.py", line 151, in bind_remote_ctxt
raise RuntimeError
RuntimeError
It fails in the same way if the key exists:
$ touch test && aws s3 cp test s3://my-bucket/non-existing-key/test
upload: ./test to s3://my-bucket/non-existing-key/test
$ dsdt remote new s3://my-bucket/non-existing-key
Error code 404
Unable to bind context new because URL s3://my-bucket/non-existing-key does not exist.
Traceback (most recent call last):
File "/mnt/data/.virtualenvs/F7DBigBang/bin/dsdt", line 11, in <module>
load_entry_point('disdat==0.9.13', 'console_scripts', 'dsdt')()
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/dsdt.py", line 91, in main
args.func(args)
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1579, in <lambda>
remote_p.set_defaults(func=lambda args: _remote(fs, args))
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1363, in _remote
fs.remote_add(args.context, args.s3_url)
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1348, in remote_add
ctxt_obj.bind_remote_ctxt(remote_context, s3_url)
File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/data_context.py", line 151, in bind_remote_ctxt
raise RuntimeError
RuntimeError
It succeeds in attaching a remote context if I point to the file that I uploaded for the previous test:
$ dsdt remote new s3://my-bucket/non-existing-key/test
$ dsdt context
* test [new@s3://my-bucket/non-existing-key/test]
Am I using it wrong? It doesn't seem to be the expected behavior.
Remove the deprecated boto3-session-cache library. https://github.com/mixja/boto3-session-cache
If api.push is unable to connect and push to a remote, it simply prints (not logs) the error. Instead, it should throw an exception that can be caught by the user.
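A sketch of the proposed behavior, with a hypothetical PushError that wraps the underlying failure so callers can catch and inspect it:

```python
class PushError(Exception):
    """Raised when a bundle cannot be pushed to the remote."""

def push(bundle, remote_push):
    """Attempt a push; surface failures as exceptions instead of printed messages."""
    try:
        return remote_push(bundle)
    except Exception as e:
        raise PushError(f"unable to push {bundle!r} to remote") from e
```

A caller can then write try/except PushError around the call and decide whether to retry, log, or abort, which printed errors make impossible.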
When a user wants to create a remote, they should be able to supply a command that creates an S3 bucket to use as the remote, with parameters to point to an existing bucket if desired.
Problem: Run a task graph with one root requiring N upstream tasks each issuing a sql query, then use workers > 1.
objc[96714]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[96714]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
INFO: Task AnalystSingleHistoric____2019_02_27_analyst_historic_674f680a2c died unexpectedly with exit code -6
One can "fix" this by setting the variable below; however, this feels bad, so recording it here.
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
see ansible/ansible#32499
This is a feature request. Currently you must return all local files or all remote (S3) files in a bundle column. I.e., b.data = ["/some/file", "/some/otherfile"] is valid, but b.data = ["/some/file", "s3://bucket/key/file"] is not. Note that it is currently possible to get around this limitation by returning separate bundle columns: b.data = {'local': ["/local/file"], 'remote': ["s3://bucket/file"]}
At a minimum we should provide a useful error message instead of silently failing, and consider supporting this mix. At the moment, one can use the Bundle object to selectively localize and delocalize file links, but this only works for pre-existing bundles. The above request would align that feature with creation as well.
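At a minimum, the error message could name the problem explicitly. A sketch of such a validation pass (a hypothetical helper, not Disdat's API) that classifies each link by URL scheme and rejects mixtures:

```python
from urllib.parse import urlparse

def check_link_schemes(paths):
    """Reject a mix of local and s3 links in one bundle column with a clear error."""
    schemes = {urlparse(p).scheme or 'file' for p in paths}
    if len(schemes) > 1:
        raise ValueError(
            f"bundle column mixes link schemes {sorted(schemes)}; "
            "use one scheme per column (e.g. separate 'local' and 'remote' columns)"
        )
    return schemes.pop()
```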
Allow dsdt rm to remove bundles on the remote. At the moment, dsdt ls plus aws s3 rm is the workaround, but it's dangerous.
The uncommitted history currently maintains a maximum of N uncommitted bundles with the same human name. This can have the unintended consequence of deleting bundles needed by downstream tasks if users do not assign unique human names to the bundles. Note that, if the user does not set the human name in pipe_requires, this shouldn't be an issue. But we need to determine a.) whether an uncommitted history is important to keep as a feature and b.) if so, whether there is a more intuitive mode of operation.
To re-run the last task you currently have to either
a.) remove the last bundle, or
b.) change an input parameter (if possible).
Replace pull with localize, and add de-localize to remove the files if they are already available on the cloud provider / remote.
Although much of Disdat's output uses Python logging, there are still many print statements, which make it difficult for users of the Disdat library to filter out unwanted verbose output.
When a task returns None, it creates a bundle whose presentation is HF (i.e., HyperFrame) and which has zero frames (no named columns). When trying to present this bundle, an assertion checking for frames >= 1 fails.
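A sketch of a friendlier presentation rule, assuming a zero-frame HyperFrame simply means the task returned None (the helper name is hypothetical):

```python
def present_bundle(frames):
    """Present a bundle's frames; zero frames means the task returned None."""
    if len(frames) == 0:
        return None  # instead of tripping an assert on frames >= 1
    if len(frames) == 1:
        return frames[0]
    return frames
```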
Push the bundle, then drop the links locally because they're on the remote context.
Please add the example from Ken's Vimeo talk (ExamplePipeline.py) to examples/pipelines. It's nice since it's really simple. What worked for me is:
import numpy as np
from disdat.pipe import PipeTask

class GenData(PipeTask):
    def pipe_requires(self, **kwargs):
        self.set_bundle_name("GenData")

    def pipe_run(self, **kwargs):
        return np.array([77, 100, 88])

class Average(PipeTask):
    def pipe_requires(self, **kwargs):
        self.add_dependency("my_input_data", GenData, {})

    def pipe_run(self, my_input_data=None, **kwargs):
        print(my_input_data, type(my_input_data))
        return {'average': [np.average(my_input_data)]}
Can we please add a "run" shell script to each example py file in examples/pipelines?
For example, for "ExamplePipeline.py" mentioned in the previous issue, we would add a "run_ExamplePipeline.sh" shell script, with the following contents:
dsdt apply - average ExamplePipeline.Average
Having the run scripts is really helpful for a new person, rather than having to guess at what the apply parameters should be for each py script.
Currently dsdt ls lists only the final output bundles of pipelines; one must use dsdt ls -i to see all the final and intermediate outputs. Proposal: dsdt ls -f prints only final outputs, dsdt ls spills everything, and dsdt ls -i spills only intermediates.
There are still several messages that reference the 'dsdt checkout' command, which I believe is deprecated:
$ grep -r --include *.py 'dsdt checkout' .
./build/lib/disdat/fs.py: print "No current local context, please change context with 'dsdt checkout'"
./build/lib/disdat/fs.py: print("Disdat on context {}, please 'dsdt checkout ' before deleting.".format(local_context))
./disdat/fs.py: print "No current local context, please change context with 'dsdt checkout'"
./disdat/fs.py: print("Disdat on context {}, please 'dsdt checkout ' before deleting.".format(local_context))
Disdat checks for the ability to find DynamoDB, but that requires an AWS region. For now, we don't use DynamoDB, yet we're also spitting out an error when the region isn't found. We should simply remove this message.
2018-10-10 20:09:28,680 - disdat.data_context - ERROR - Unable to use a string-based database reference[db://Analytics.DATA_SCIENCE.DISDAT_careops3_tto_daily_forecast_blend_77e7ddf1fac24617984f19eac1ce0338@pprddaavth-vip.ie.intuit.net], please return DBTarget object instead.
Traceback (most recent call last):
File "/Users/crivera5/.virtualenvs/careops/bin/dsdt", line 11, in <module>
sys.exit(main())
File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/dsdt.py", line 122, in main
args.func(args)
File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/fs.py", line 1727, in <lambda>
pull_p.set_defaults(func=lambda args: _pull(fs, args))
File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/fs.py", line 1565, in _pull
fs.pull(bundle, uuid, localize=args.localize)
File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/fs.py", line 1474, in pull
s3_uuid, data_context)
File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/fs.py", line 1312, in _localize_hfr
DataContext.copy_in_files(f, managed_path)
File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/data_context.py", line 1116, in copy_in_files
raise Exception("data_context:copy_in_files error trying to copy in string-based database reference.")
Exception: data_context:copy_in_files error trying to copy in string-based database reference.
We sys.exit(1) if we try to initialize an already-initialized Disdat home directory. That shouldn't be an error; it should just print a message and return.
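The idempotent behavior could look like this sketch, assuming init only needs to create the home directory tree (path and message wording are assumptions):

```python
import os

def init(disdat_home):
    """Create the Disdat home directory; a second call is a no-op with a message."""
    if os.path.isdir(disdat_home):
        print(f"Disdat already initialized at {disdat_home}")
        return
    os.makedirs(disdat_home)
    print(f"Initialized Disdat at {disdat_home}")
```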
I get this a lot when I have an error (on my side) creating a bundle:
MOONSCAPE INFO - creating bundle "inputs" with job_id=e549985059fa49008b3ecbaa239b8191
disdat ERROR - Removal of hyperframe directory 7edd576a-fd35-482f-917b-47dae6f12559 failed with error [Errno 2] No such file or directory: '/Users/jcessna/.disdat/context/moonscape-dev/objects/7edd576a-fd35-482f-917b-47dae6f12559'. Continuing removal...
It seems that, if there is an error, Disdat tries to do some cleanup, but then it logs an error if the hyperframe directory cannot be found, probably because it wasn't created before the error happened.
This should not be a Disdat error; it is confusing and misleading. Instead, could there be a log message that says something like "user error, cleaning up corrupted bundle", and then maybe a warning if it doesn't find things?
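The cleanup could look like this sketch, which warns instead of erroring when the hyperframe directory was never created (helper name and log wording are assumptions):

```python
import logging
import shutil

logger = logging.getLogger("disdat")

def cleanup_bundle_dir(path):
    """Remove a partially created bundle directory, warning (not erroring) if absent."""
    try:
        shutil.rmtree(path)
        logger.info("Cleaned up corrupted bundle at %s", path)
    except FileNotFoundError:
        logger.warning("Bundle directory %s was never created; nothing to clean up", path)
```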
While the Disdat container entrypoint supports both -f and --force-all, we need to add --force-all to the CLI.
All tasks in a pipeline (and requires) have an optional pipeline_inputs parameter. However, this is a.) not often used and b.) can be emulated by passing the bundle name in a parameter and passing it through via standard Luigi patterns.
Currently dsdt run requires a remote to be set when running a container locally. Instead, only require a remote if the user either pushes or pulls (whether running locally or remotely). Also change the default behavior when running locally to no-push and no-pull, just as if the user had called dsdt apply.