
disdat's People

Contributors

eaddingtonwhite, jcessna, karenlo2, kyocum, pmdaly, sayantansatpati, seanr15, tmwong2003

disdat's Issues

Can't run through tutorial steps successfully

I took dsdt for a quick spin and tried to run the Short Test Drive instructions. I followed the mkvirtualenv/git clone instructions on my local Mac, then tried it in a Docker container to make it easier to reproduce:

# disdat Dockerfile
FROM python:3
RUN pip install disdat
RUN pip install awscli
$ docker run --rm -it -v $PWD:/disdat -t disdat /bin/bash
# dsdt init
# dsdt context mycontext
Disdat created data context None/mycontext at object dir /root/.disdat/context/mycontext/objects.
# dsdt switch mycontext
Switched to context mycontext
# dsdt add my.first.bundle /disdat/examples/hello_data.csv
disdat.api.Register: Source file /usr/local/lib/python3.7/site-packages/luigi/task_register.py not under git version control
# dsdt ls -v
NAME                	PROC_NAME           	OWNER   	DATE              	COMMITTED	UUID                                    	TAGS
# dsdt cat
['/root/.disdat/context/mycontext/objects/fde8963b-f3c5-48e0-93f7-52bbfa5c3f0b/hello_data.csv']

The output doesn't look like the one in README.rst, but perhaps I'm not running this correctly? I verified I have access to the public files on S3.

Really interesting work, and I'm excited to make it run 😄

Localize pull only latest

dsdt pull --localize should grab only the latest bundle version(s) unless you pass a flag like --all-versions.

Proposed third name for bundle: content hash

Provide a third name to allow users and the system to identify "same data."

For each Frame in the bundle HyperFrame, hash key and contents. If it is a link frame, then hash the files as well.

Further research needed to determine what "same" means. That is, does a content hash of a link frame also hash the file name? Does the content hash include the frame's (columns) name?
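
A minimal sketch of what such a content hash might look like, assuming a frame exposes its (column) name, its raw bytes, and any linked file paths (these attribute choices are illustrative, not the actual HyperFrame API):

import hashlib

def frame_content_hash(frame_name, frame_bytes, linked_files=None, hash_name=True):
    """Hash the frame (column) name (optionally), the frame contents, and the
    bytes of any linked files."""
    h = hashlib.sha256()
    if hash_name:
        h.update(frame_name.encode('utf-8'))
    h.update(frame_bytes)
    for path in (linked_files or []):
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(1 << 20), b''):
                h.update(chunk)
    return h.hexdigest()

Whether hash_name is True, and whether linked file names (rather than just their bytes) participate, is exactly the open "what does same mean" question above.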

bundle unable to present as a scalar

A task returns a value, but when we try to present the bundle as a scalar, it is packed in an array, and we get the following error:

Traceback (most recent call last):
  File "/Users/bkang1/.virtualenvs/cstream/bin/dsdt", line 10, in <module>
    sys.exit(main())
  File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/dsdt.py", line 132, in main
    args.func(args)
  File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/fs.py", line 1884, in <lambda>
    cat_p.set_defaults(func=lambda args: _cat(fs, args))
  File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/fs.py", line 1781, in _cat
    result = fs.cat(args.bundle, uuid=args.uuid, tags=common.parse_args_tags(args.tag), file=args.file)
  File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/fs.py", line 748, in cat
    other = data_context.present_hfr(hfr)
  File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/data_context.py", line 1345, in present_hfr
    return self.convert_hfr2scalar(hfr)
  File "/Users/bkang1/.virtualenvs/cstream/lib/python3.6/site-packages/disdat/data_context.py", line 1268, in convert_hfr2scalar
    return nda.item()
ValueError: can only convert an array of size 1 to a Python scalar

Allow API push/pull without `dsdt init`

Sometimes lightweight functions (like AWS Lambda's) need to read bundles from remotes.
However, the API currently requires Disdat to be initialized (i.e., have a ~/.disdat directory for local contexts). Initialization shouldn't be necessary if you only want to pull a bundle into memory (or push to a remote).
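
A hypothetical sketch of the call a Lambda-style consumer would like to make; the function name and signature below are illustrative of the request, not the current disdat.api:

import disdat.api as api

# Desired: read a bundle straight from its remote without a local ~/.disdat.
# pull_to_memory, the remote URL, and the bundle name are all hypothetical.
b = api.pull_to_memory(remote='s3://my-bucket/context/mycontext',
                       bundle_name='my.first.bundle')
print(b.data)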

Support `None` return type in argument and in add_dependency

Bug: Create a base class C that implements a pipe_run function that just has a return statement. Create a task A that inherits from C, with no pipe_run function and an empty pipe_requires. Create a second task B that requires A and also inherits from C.

The code doesn't expect pipe_run to receive None arguments. Second, when you declare an upstream dependency, Disdat tries to place its output in the downstream task's argument list. So we need to make two changes: 1.) allow None as a return value in prepare_pipe_kwargs:461, and 2.) allow add_dependency to add a dependency that does not add to the pipe_run kwargs.
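
A minimal repro sketch of the setup described above (class and argument names are illustrative):

from disdat.pipe import PipeTask

class C(PipeTask):
    def pipe_run(self, **kwargs):
        return  # returns None

class A(C):
    def pipe_requires(self, **kwargs):
        pass

class B(C):
    def pipe_requires(self, **kwargs):
        # Disdat currently tries to inject A's (None) output into the
        # inherited pipe_run's kwargs, which the code does not expect.
        self.add_dependency('a_output', A, {})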

Add tags dynamically

Currently, tags are part of the bundle definition. One cannot add tags to an existing bundle.

Note that some system tags (i.e., the param tags) are used to form the unique processing name of the bundle. One must not be able to remove or alter those.

However, if someone wants to add a tag to an existing bundle (e.g., "I reviewed this data and marked it as good"), should that be allowed? At the moment, one would have to create an entirely new bundle with the additional tag.
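
A hypothetical sketch of what adding a tag after the fact could look like; the add_tags method does not exist today and only illustrates the request:

import disdat.api as api

b = api.search('mycontext', 'my.first.bundle')[0]
b.add_tags({'quality': 'good'})   # hypothetical; system/param tags would be rejected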

Is there a way to install system packages in the dockerize step?

I need to have my workflow reference a system command that is installed via apt-get install ....
Is there a way to allow the dockerize step to build a custom docker image?

Another option that would work in my case is to use a different base image. I'm not sure whether that is possible either.

Successful container with retries still appears failed on Batch

===== Luigi Execution Summary =====

Scheduled 566 tasks of which:
* 41 present dependencies were encountered:
    - 12 xxx
    - 12 yyyy
* 525 ran successfully:
    - 1 Backfill(...)
    - 3 Sqlish(...)
    - 220 Delta(...)

This progress looks :) because there were failed tasks but they all succeeded in a retry

===== Luigi Execution Summary =====

2019-03-13 19:23:01,742 - main - ERROR - Failed to run pipeline: ApplyException Disdat Apply ran, but one or more tasks failed.

Disdat loses index when returning DataFrame

When a DataFrame with a non-default index (i.e., not 0, 1, 2, ...) is returned from pipe_run, the original index is not present when the DataFrame is loaded into memory for the next task or accessed from the API. This issue can be recreated using the code below:

import disdat.api as dsdt
from disdat.pipe import PipeTask
import pandas as pd


class TestIndex(PipeTask):
    def pipe_requires(self, pipeline_input=None):
        self.set_bundle_name('test_index')

    def pipe_run(self, pipeline_input=None):

        data = {
            'a': [2, 3, 4],
            'b': [5, 6, 7]
        }

        df = pd.DataFrame(data)
        print('Index should be 0, 1, 2')
        print(df)

        df.index = [7, 8, 9]
        print('Index should be 7, 8, 9')

        print(df)

        return df

if __name__ == '__main__':
    dsdt.apply('tt', '-', '-', 'TestIndex', params={}, force=True)
    print(dsdt.search('tt', 'test_index')[0].data.index.values)

This code correctly updates and prints the index before returning; however, once the DataFrame is retrieved by Disdat, the index is back to the default.

Output:

2018-12-18 15:24:25,151 - luigi-interface - INFO - Loaded []
2018-12-18 15:24:25,175 - luigi-interface - INFO - Informed scheduler that task   DriverTask_True______a92c94fa32   has status   PENDING
2018-12-18 15:24:25,175 - luigi-interface - INFO - Informed scheduler that task   TestIndex_None_None____71e4869f25   has status   PENDING
2018-12-18 15:24:25,175 - luigi-interface - INFO - Done scheduling tasks
2018-12-18 15:24:25,175 - luigi-interface - INFO - Running Worker with 1 processes
2018-12-18 15:24:25,176 - luigi-interface - INFO - [pid 53226] Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) running   TestIndex(closure_bundle_proc_name_root=None, closure_bundle_uuid_root=None, output_tags={})
Index should be 0, 1, 2
   a  b
0  2  5
1  3  6
2  4  7
Index should be 7, 8, 9
   a  b
7  2  5
8  3  6
9  4  7
2018-12-18 15:24:25,211 - disdat.pipe_base - WARNING - __main__.TestIndex: Source file /Users/srowan/Development/ds/turbotiles/turbotiles/pipeline/test_index.py not under git version control
2018-12-18 15:24:25,237 - luigi-interface - INFO - [pid 53226] Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) done      TestIndex(closure_bundle_proc_name_root=None, closure_bundle_uuid_root=None, output_tags={})
2018-12-18 15:24:25,238 - luigi-interface - INFO - Informed scheduler that task   TestIndex_None_None____71e4869f25   has status   DONE
2018-12-18 15:24:25,239 - luigi-interface - INFO - [pid 53226] Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) running   DriverTask(input_bundle=-, output_bundle=-, pipe_params={}, pipe_cls=TestIndex, input_tags={}, output_tags={}, force=True)
2018-12-18 15:24:25,240 - luigi-interface - INFO - [pid 53226] Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) done      DriverTask(input_bundle=-, output_bundle=-, pipe_params={}, pipe_cls=TestIndex, input_tags={}, output_tags={}, force=True)
2018-12-18 15:24:25,240 - luigi-interface - INFO - Informed scheduler that task   DriverTask_True______a92c94fa32   has status   DONE
2018-12-18 15:24:25,245 - luigi-interface - INFO - Worker Worker(salt=715866385, workers=1, host=sdgl141c3d83b, username=srowan, pid=53226) was stopped. Shutting down Keep-Alive thread
2018-12-18 15:24:25,248 - luigi-interface - INFO - 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 DriverTask(...)
    - 1 TestIndex(...)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

Index: [0 1 2]
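
Until this is fixed, one workaround is to carry the index as an ordinary column and restore it after the bundle is loaded. A minimal sketch (pandas only, independent of Disdat; the column name is arbitrary):

import pandas as pd

def pack_index(df, name='__index__'):
    """Store the index as a regular column before returning the DataFrame."""
    out = df.copy()
    out[name] = out.index
    return out.reset_index(drop=True)

def unpack_index(df, name='__index__'):
    """Restore the saved index after the DataFrame comes back from Disdat."""
    return df.set_index(name).rename_axis(None)

df = pd.DataFrame({'a': [2, 3, 4], 'b': [5, 6, 7]}, index=[7, 8, 9])
restored = unpack_index(pack_index(df))
assert list(restored.index) == [7, 8, 9]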

Support secrets in dockerizer

Baking in passwords, etc., into a docker image is generally a bad idea. We need a way to allow users to pass existing secrets through to their running containers.
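
One common pattern is to resolve the secret at container run time instead of at image-build time, e.g., from an environment variable injected by the scheduler or from AWS Secrets Manager. A minimal sketch; the variable and secret names are illustrative:

import os
import boto3

def get_db_password(secret_id='my-app/db-password'):
    """Prefer an injected environment variable; fall back to Secrets Manager."""
    if 'DB_PASSWORD' in os.environ:
        return os.environ['DB_PASSWORD']
    client = boto3.client('secretsmanager')
    return client.get_secret_value(SecretId=secret_id)['SecretString']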

small issue in Disdat Architecture and Design doc

Under "Bundle Collections", it says:
● status: Report current context and branch.
● branch: Create a new local branch from some context.
● checkout: Move to a different local branch.
● remote: Attach an s3 path to this local context.
But this doesn't match the text below, which describes the "status", "context", "switch", and "remote" commands.
Is there still such a thing as branches? I don't see them referenced anywhere else in the doc. Maybe there are just contexts?

dockerize creates container even if pip install fails

Issue: Sometimes pip fails to install a dependency when installing the user's source distribution. However, the dockerize command continues and produces a container anyway. When trying to run the pipeline, execution complains that the user module or package cannot be found.
Possible solution: Check that a failed Dockerfile command propagates up to the dockerize output.
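
A sketch of that check, assuming the dockerizer shells out to docker build (the function, tag, and paths are illustrative):

import subprocess

def build_image(context_dir, tag):
    # check=True raises CalledProcessError if any Dockerfile step fails,
    # including the pip install of the user's source distribution.
    subprocess.run(['docker', 'build', '-t', tag, context_dir], check=True)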

dsdt remote does not attach a remote context as expected

I have a local context:

$ dsdt --version
Running Disdat version 0.9.13
$ dsdt context
*	test	[None@None]

I am trying to attach a remote context to an existing bucket with a non-existing key but I get a RuntimeError:

$ dsdt remote new s3://my-bucket/non-existing-key
Error code 404
Unable to bind context new because URL s3://my-bucket/non-existing-key does not exist.
Traceback (most recent call last):
  File "/mnt/data/.virtualenvs/F7DBigBang/bin/dsdt", line 11, in <module>
    load_entry_point('disdat==0.9.13', 'console_scripts', 'dsdt')()
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/dsdt.py", line 91, in main
    args.func(args)
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1579, in <lambda>
    remote_p.set_defaults(func=lambda args: _remote(fs, args))
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1363, in _remote
    fs.remote_add(args.context, args.s3_url)
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1348, in remote_add
    ctxt_obj.bind_remote_ctxt(remote_context, s3_url)
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/data_context.py", line 151, in bind_remote_ctxt
    raise RuntimeError
RuntimeError

It fails in the same way if the key exists:

$ touch test && aws s3 cp test s3://my-bucket/non-existing-key/test
upload: ./test to s3://my-bucket/non-existing-key/test
 $ dsdt remote new s3://my-bucket/non-existing-key
Error code 404
Unable to bind context new because URL s3://my-bucket/non-existing-key does not exist.
Traceback (most recent call last):
  File "/mnt/data/.virtualenvs/F7DBigBang/bin/dsdt", line 11, in <module>
    load_entry_point('disdat==0.9.13', 'console_scripts', 'dsdt')()
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/dsdt.py", line 91, in main
    args.func(args)
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1579, in <lambda>
    remote_p.set_defaults(func=lambda args: _remote(fs, args))
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1363, in _remote
    fs.remote_add(args.context, args.s3_url)
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/fs.py", line 1348, in remote_add
    ctxt_obj.bind_remote_ctxt(remote_context, s3_url)
  File "/mnt/data/.virtualenvs/F7DBigBang/lib/python3.6/site-packages/disdat/data_context.py", line 151, in bind_remote_ctxt
    raise RuntimeError
RuntimeError

It succeeds in attaching a remote context if I point it at the file I uploaded for the previous test:

 $ dsdt remote new s3://my-bucket/non-existing-key/test
 $ dsdt context
*	test	[new@s3://my-bucket/non-existing-key/test]

Am I using it wrong? It doesn't seem to be the expected behavior.

api.push fails silently

If api.push is unable to connect and push to a remote, it simply prints (rather than logs) the error.

Instead, it should raise an exception that the user can catch.
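
A minimal sketch of the desired behavior; PushError and the _do_push helper are hypothetical, the point is only that the failure surfaces to the caller:

class PushError(Exception):
    """Raised when a bundle cannot be pushed to the remote."""

def push(bundle, remote):
    try:
        _do_push(bundle, remote)   # hypothetical internal transfer helper
    except Exception as e:
        raise PushError("Failed to push {} to {}".format(bundle, remote)) from e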

Default remote context

When a user wants to create a remote, Disdat should supply a command that creates an S3 bucket to use as the remote. The user could also provide parameters to point to an existing bucket.

OS X, multiple workers fork issue.

Problem: Run a task graph with one root requiring N upstream tasks, each issuing a SQL query, then use workers > 1.

objc[96714]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[96714]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
INFO: Task AnalystSingleHistoric____2019_02_27_analyst_historic_674f680a2c died unexpectedly with exit code -6

One can "fix" this by setting the variable below. However, this feels bad so recording.

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
see ansible/ansible#32499

Allow mix of local and remote files in bundle.

This is a feature request. Currently you must return all local files or all remote (S3) files in a bundle column. That is, b.data = ["/some/file", "/some/otherfile"] is valid, but b.data = ["/some/file", "s3://bucket/key/file"] is not. Note that it is currently possible to get around this limitation by returning separate bundle columns: b.data = {'local': ["/local/file"], 'remote': ["s3://bucket/file"]}.

At a minimum we should provide a useful error message instead of silently failing, and consider supporting the mix. At the moment, one can use the Bundle object to selectively localize and delocalize file links, but this only works for pre-existing bundles. The above request would align that feature with bundle creation as well.
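
A hedged sketch of what the minimal check could look like (a standalone helper, not the actual data_context code):

def check_uniform_scheme(paths):
    """Raise a clear error if a bundle column mixes local and s3:// links."""
    schemes = {'s3' if str(p).startswith('s3://') else 'local' for p in paths}
    if len(schemes) > 1:
        raise ValueError(
            'Bundle columns must be all local or all s3:// links; got a mix: {}'.format(sorted(schemes)))

check_uniform_scheme(['/some/file', 's3://bucket/key/file'])  # raises ValueError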

Remote bundle delete

Allow dsdt rm to remove bundles on the remote.
At the moment, dsdt ls plus aws s3 rm is the workaround, but it's dangerous.

uncommitted history has unexpected implications for bundle naming

The uncommitted history currently maintains a maximum of N uncommitted bundles with the same human name. This can have the unintended consequence of deleting bundles needed by downstream tasks if users do not assign unique human names to their bundles. Note that if the user does not set the human name in pipe_requires, this shouldn't be an issue. What we need to determine is a.) whether an uncommitted history is important to keep as a feature, and b.) if so, whether there is a more intuitive mode of operation.

remove print statements in favor of logging

Although much of Disdat's output uses Python logging, there are still many print statements that make it difficult for users of the disdat library to filter out unwanted verbose output.
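
The usual fix is a module-level logger so library consumers can control verbosity. A minimal sketch of the pattern (the function is a stand-in for any spot that currently prints):

import logging

_logger = logging.getLogger(__name__)

def localize(bundle):
    # was: print("localizing {}".format(bundle))
    _logger.info('Localizing %s', bundle)

Applications can then silence Disdat with logging.getLogger('disdat').setLevel(logging.WARNING).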

tasks that return `None` have un-presentable bundles

When a task returns None, it creates a bundle whose presentation is HF (i.e., HyperFrame) and which has zero frames (no named columns). When trying to present this bundle, an assertion checking for frames>=1 fails.

please add ExamplePipeline.py to examples/pipelines

Please add the example from Ken's Vimeo talk (ExamplePipeline.py) to examples/pipelines. It's nice since it's really simple. What worked for me is:

import numpy as np
from disdat.pipe import PipeTask

class GenData(PipeTask):

    def pipe_requires(self, **kwargs):
        self.set_bundle_name("GenData")

    def pipe_run(self, **kwargs):
        return np.array([77, 100, 88])

class Average(PipeTask):

    def pipe_requires(self, **kwargs):
        self.add_dependency("my_input_data", GenData, {})

    def pipe_run(self, my_input_data=None, **kwargs):
        print(my_input_data, type(my_input_data))
        return {'average': [np.average(my_input_data)]}

Please add "run" shell scripts to examples/pipelines folder

Can we please add a "run" shell script to each example py file in examples/pipelines?

For example, for "ExamplePipeline.py" mentioned in the previous issue, we would add a "run_ExamplePipeline.sh" shell script, with the following contents:

dsdt apply - average ExamplePipeline.Average

Having the run scripts is really helpful for a new person, rather than having to guess at what the apply parameters should be for each py script.

`ls` to list all bundles by default, instead of final bundles

Currently dsdt ls lists only the final output bundles of pipelines; one must use dsdt ls -i to see both final and intermediate outputs. Proposal: dsdt ls lists everything, dsdt ls -f prints only final outputs, and dsdt ls -i prints only intermediate outputs.

references to 'dsdt checkout' command

There are still several messages that reference the 'dsdt checkout' command, which I believe is deprecated:

$ grep -r --include *.py 'dsdt checkout' .
./build/lib/disdat/fs.py: print "No current local context, please change context with 'dsdt checkout'"
./build/lib/disdat/fs.py: print("Disdat on context {}, please 'dsdt checkout ' before deleting.".format(local_context))
./disdat/fs.py: print "No current local context, please change context with 'dsdt checkout'"
./disdat/fs.py: print("Disdat on context {}, please 'dsdt checkout ' before deleting.".format(local_context))

fix error reporting when no aws region

Disdat checks for the ability to reach DynamoDB, which requires an AWS region. For now we don't use DynamoDB, but we're also emitting an error that the region isn't found. We should simply remove this message.

db table link fails on pull

2018-10-10 20:09:28,680 - disdat.data_context - ERROR - Unable to use a string-based database reference[db://Analytics.DATA_SCIENCE.DISDAT_careops3_tto_daily_forecast_blend_77e7ddf1fac24617984f19eac1ce0338@pprddaavth-vip.ie.intuit.net], please return DBTarget object instead.
Traceback (most recent call last):
  File "/Users/crivera5/.virtualenvs/careops/bin/dsdt", line 11, in <module>
    sys.exit(main())
  File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/dsdt.py", line 122, in main
    args.func(args)
  File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/fs.py", line 1727, in <lambda>
    pull_p.set_defaults(func=lambda args: _pull(fs, args))
  File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/fs.py", line 1565, in _pull
    fs.pull(bundle, uuid, localize=args.localize)
  File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/fs.py", line 1474, in pull
    s3_uuid, data_context)
  File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/fs.py", line 1312, in _localize_hfr
    DataContext.copy_in_files(f, managed_path)
  File "/Users/crivera5/.virtualenvs/careops/lib/python2.7/site-packages/disdat/data_context.py", line 1116, in copy_in_files
    raise Exception("data_context:copy_in_files error trying to copy in string-based database reference.")
Exception: data_context:copy_in_files error trying to copy in string-based database reference.

fix logging for bundle error cleanups

I get this a lot when I have an error (on my side) creating a bundle:

MOONSCAPE INFO - creating bundle "inputs" with job_id=e549985059fa49008b3ecbaa239b8191
disdat ERROR - Removal of hyperframe directory 7edd576a-fd35-482f-917b-47dae6f12559 failed with error [Errno 2] No such file or directory: '/Users/jcessna/.disdat/context/moonscape-dev/objects/7edd576a-fd35-482f-917b-47dae6f12559'. Continuing removal...

It seems that if there is an error, Disdat tries to do some cleanup, but then logs an error if the hyperframe directory cannot be found, probably because it was never created before the error happened.
This should not be a Disdat error; it is confusing and misleading. Instead, could there be a log message along the lines of "user error, cleaning up corrupted bundle", with perhaps a warning if it doesn't find things?
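
A sketch of the suggested behavior: treat a missing hyperframe directory as expected during rollback and only warn on real failures (the function name and use of shutil are illustrative of the cleanup code):

import logging
import shutil

_logger = logging.getLogger(__name__)

def cleanup_failed_bundle(hframe_dir):
    _logger.info('User task failed; cleaning up partially created bundle at %s', hframe_dir)
    try:
        shutil.rmtree(hframe_dir)
    except FileNotFoundError:
        # The directory was never created before the user error; nothing to remove.
        pass
    except OSError as e:
        _logger.warning('Could not remove %s during cleanup: %s', hframe_dir, e)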

remove pipeline input

All tasks in a pipeline (and their requires) take an optional pipeline_input parameter. However, this is a.) not often used and b.) can be emulated by passing the bundle name as a task parameter via standard Luigi patterns.

`dsdt run` local container to not require remote

Currently dsdt run requires a remote to be set even when the container runs locally. Instead, only require a remote if the user pushes or pulls (whether the run is local or remote).
Also change the default behavior when running locally to no-push and no-pull, just as if the user had called dsdt apply.
