hysds's Introduction

HySDS

Core component for the Hybrid Science Data System

Background

For more information, check out the HySDS documentation.

Prerequisites

  • pip 9.0.1+
  • setuptools 36.0.1+
  • virtualenv 1.10.1+
  • prov-es 0.1.1+
  • osaka 0.0.1+
  • hysds-commons 0.1+

Installation

  1. Create a virtual environment and activate it:
virtualenv env
source env/bin/activate
  2. Update pip and setuptools:
pip install -U pip
pip install -U setuptools
  3. Install prov-es:
git clone https://github.com/pymonger/prov_es.git
cd prov_es
pip install .
cd ..
  4. Install hysds:
git clone https://github.com/hysds/hysds.git
cd hysds
pip install -r requirements.txt
pip install .
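
As a quick sanity check (a minimal extra step, not part of the original instructions), confirm the package imports cleanly from the activated virtualenv:

python -c "import hysds; print(hysds.__file__)"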

hysds's People

Contributors

anaerobia, buggtb, dependabot[bot], drewm-jpl, dustinklo, fgreg, hookhua, joshgarde, marjo-luc, mcayanan, mkarim2017, pymonger

hysds's Issues

add datasets.json templating support for fields in dataset.json and met.json

Use fields defined in a dataset's dataset.json or met.json to template values in that dataset's entry in datasets.json. For example:

  {
    "ipath": "smap::data/L0B_Radiometer",
    "version": "${crid}",
    "level": "l0",
    "type": "L0B_Radiometer",
    "id_template": "${id}",
    "match_pattern": "/(?P<id>SMAP_L0B_RADIOMETER_\\w+\\.bin)$",
    "alt_match_pattern": null,
    "extractor": "/path/to/core_extractor.py",
    "publish": {
      "location": "s3://{{ DATASET_S3_ENDPOINT }}:80/{{ DATASET_BUCKET }}/LOM/PRODUCTS/L0B_Radiometer/${met.SeriesID}/${year}/${month}/${day}/${id}",
      "urls": [
        "http://{{ DATASET_BUCKET }}.{{ DATASET_S3_WEBSITE_ENDPOINT }}/LOM/PRODUCTS/L0B_Radiometer/${met.SeriesID}/${year}/${month}/${day}/${id}",
        "s3://{{ DATASET_S3_ENDPOINT }}:80/{{ DATASET_BUCKET }}/LOM/PRODUCTS/L0B_Radiometer/${met.SeriesID}/${year}/${month}/${day}/${id}"
      ]
    },
    "browse": {
      "s3-profile-name": "default",
      "location": "s3://{{ DATASET_S3_ENDPOINT }}:80/{{ DATASET_BUCKET }}/LOM/PRODUCTS/L0B_Radiometer/${met.SeriesID}/${year}/${month}/${day}/${id}",
      "urls": [
        "http://{{ DATASET_BUCKET }}.{{ DATASET_S3_WEBSITE_ENDPOINT }}/LOM/PRODUCTS/L0B_Radiometer/${met.SeriesID}/${year}/${month}/${day}/${id}"
      ]
    }
  }

The ${met.SeriesID} would be templated from the SeriesID field in met.json. Similarly, fields from dataset.json would be templated as well, e.g. ${dataset.label}.
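
A minimal sketch of how that substitution could work (the render_template helper and the field values are hypothetical, not an existing HySDS API):

import re

def render_template(template, met, dataset):
    """Substitute ${met.X} and ${dataset.Y} placeholders from the dataset's
    met.json and dataset.json contents (hypothetical helper)."""
    def repl(match):
        source, field = match.group(1), match.group(2)
        values = met if source == "met" else dataset
        return str(values[field])
    return re.sub(r"\$\{(met|dataset)\.(\w+)\}", repl, template)

met = {"SeriesID": "D12345"}           # parsed from met.json
dataset = {"label": "L0B_Radiometer"}  # parsed from dataset.json

print(render_template("s3://bucket/LOM/PRODUCTS/${dataset.label}/${met.SeriesID}/", met, dataset))
# -> s3://bucket/LOM/PRODUCTS/L0B_Radiometer/D12345/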

give tosca and figaro on-demand jobs their own queue

On-demand jobs submitted from tosca and figaro could be waiting behind many queued jobs on user_rules_dataset and user_rules_job queues, respectively. Task is to give them their own set of queues and workers, e.g. on_demand_dataset and on_demand_job.
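
A minimal sketch of what declaring the new queues could look like in a celeryconfig (the queue names follow the example above; the routing keys and where this lives in the HySDS configs are assumptions):

from kombu import Queue

# hypothetical additions to the mozart/verdi celeryconfig
task_queues = (
    Queue("on_demand_dataset", routing_key="on_demand_dataset"),
    Queue("on_demand_job", routing_key="on_demand_job"),
)

Dedicated workers would then subscribe to these queues so on-demand submissions no longer sit behind the user_rules_* backlog.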

HotFix: time value: unit is missing or unrecognized error

[2023-06-15 12:35:08,714: INFO/ForkPoolWorker-1] Backing off search_es(...) for 9.5s (elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', 'failed to parse setting [timeout] with value [30] as a time value: unit is missing or unrecognized'))
[2023-06-15 12:35:18,246: WARNING/ForkPoolWorker-1] POST http://100.104.10.49:9200/grq/_search?timeout=30 [status:400 request:0.002s]
[2023-06-15 12:35:18,246: INFO/ForkPoolWorker-1] Backing off search_es(...) for 48.7s (elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', 'failed to parse setting [timeout] with value [30] as a time value: unit is missing or unrecognized'))
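
The error indicates the query-level timeout is being sent as a bare number (timeout=30) instead of a duration with a unit. A minimal sketch of the fix, assuming the search_es call above uses the elasticsearch-py client (endpoint and query are illustrative):

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://100.104.10.49:9200"])

# ES expects a unit on the query-level timeout, e.g. "30s" rather than 30
res = es.search(index="grq", body={"query": {"match_all": {}}}, timeout="30s")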

user/password should not be passed on command line to sync_ec2* scripts

The sync_ec2 scripts need to communicate with RabbitMQ admin console, which requires authentication.

Currently, they take the credentials via command-line arguments, which means the credentials are visible to anyone who can view processes on Mozart.

It would be better if the credentials were read in from a config file or a .netrc file.
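
A minimal sketch of reading the credentials from ~/.netrc instead of the command line (the host name is illustrative):

import netrc

# a matching ~/.netrc entry would look like:
#   machine mozart.example.com login admin password s3cret
auth = netrc.netrc().authenticators("mozart.example.com")
if auth is None:
    raise RuntimeError("no .netrc entry found for the RabbitMQ admin host")
login, _, password = auth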

add feature to override harikiri shutdown on verdi job execution

Currently:

  1. harikiri.py sees that no jobs have come in for the last 10 minutes and flags the instance for termination
  2. harikiri.py does a random sleep to smear out thundering-herd API calls to AWS
  3. during the sleep, a job is queued on the queue the verdi worker is pulling from, and the worker starts processing that job
  4. after waking from the sleep, harikiri.py gracefully shuts down all docker containers and supervisord
  5. the shutdown of the docker containers is caught by verdi and results in a job-failed with exit code 143

Task is to add a feature at the start of each job that checks for and cancels harikiri if it is in the middle of termination (see the sketch below).
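
One possible shape for this, as a rough sketch only: harikiri would drop a marker when it flags the instance, and the job runner would clear it (cancelling termination) at the start of each job, before any processing. The marker path and protocol below are hypothetical, not existing HySDS behavior:

import os

# hypothetical marker written by harikiri.py when it flags the instance for termination
HARIKIRI_MARKER = "/tmp/harikiri.pending"

def cancel_harikiri_if_pending():
    """Call at the start of each job: if harikiri has flagged this instance but not
    yet shut it down, remove the marker so harikiri aborts the termination."""
    if os.path.exists(HARIKIRI_MARKER):
        os.remove(HARIKIRI_MARKER)  # harikiri would re-check the marker before terminating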

ensure unique celery worker hostname

On Google Compute Engine, preempted instances may leave lingering celery/rabbitmq connections to the mozart rabbitmq. A replacement instance can get the same IP address, and when verdi starts up this results in the following error:

[2018-12-21 23:02:44,596: INFO/MainProcess] Connected to amqp://guest:**@10.138.0.2:5672//
[2018-12-21 23:02:44,619: WARNING/MainProcess] /home/ops/verdi/lib/python2.7/site-packages/kombu/pidbox.py:75: UserWarning: A node named celery@grfn-job_worker-small.10.138.0.49 is already using this process mailbox!

Maybe you forgot to shutdown the other node or did not do so properly?
Or if you meant to start multiple nodes on the same host please make sure
you give each node a unique node name!

  warnings.warn(W_PIDBOX_IN_USE.format(node=self))
[2018-12-21 23:02:44,623: WARNING/MainProcess] celery@grfn-job_worker-small.10.138.0.49 ready.

When this worker picks up a job, the job gets redelivered via RabbitMQ's redelivery mechanism (no ack) and is orphaned in the job-started state on figaro. When the job is picked up by another worker, the redelivered_job_dup() function processes it as a job-dedup, but job_worker.py does not update ES. The pertinent change is something to the effect of:

    # redelivered job dedup
    if redelivered_job_dup(job):
        logger.info("Encountered duplicate redelivered job: %s" % json.dumps(job))
        job_status_json = {
            'uuid': job['task_id'],
            'job_id': job['job_id'],
            'payload_id': payload_id,
            'payload_hash': payload_hash,
            'dedup': dedup,
            'status': 'job-deduped',
            'celery_hostname': run_job.request.hostname,
        }
        log_job_status(job_status_json)  # push the job-deduped status to ES
        return job_status_json

improve disk usage of L1 caching feature

Currently, the L1 cache copies files from the cache into the work directory to ensure that a job's execution cannot modify the cache. This can result in significantly increased disk usage when caching very large files. One possible solution is to make the cached files read-only and hard-link them into the work directories, but hard links don't guarantee immutability. Another option is to mount each cached file into the docker container as read-only (see the sketch below).
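
A minimal sketch of the read-only mount option using docker-py (image, command, and paths are illustrative; this is not how job_worker.py currently wires up containers):

import docker

client = docker.from_env()

cached_file = "/data/work/cache/large_input.h5"  # illustrative cached file

# bind-mount the cached file into the container read-only, so the job reads it
# in place (no copy into the work directory) and cannot modify the cache
client.containers.run(
    "my-job-image:latest",
    command=["ls", "-l", "/inputs/large_input.h5"],
    volumes={cached_file: {"bind": "/inputs/large_input.h5", "mode": "ro"}},
    remove=True,
)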

add watchdog for job periodicity

Add a watchdog script that runs from cron and monitors for successful execution of a configured job type and, optionally, job version. It should be able to notify via email or Slack.

configure docker daemon to use dm.min_free_space to allow auto docker image purge

On a verdi autoscale instance with a 10GB docker volume, the following error occurs when two docker images totalling over 10GB are loaded:

Traceback (most recent call last):
  File "/home/ops/verdi/ops/hysds/hysds/job_worker.py", line 859, in run_job
    image_info = ensure_image_loaded(image_name, image_url, cache_dir_abs)
  File "/home/ops/verdi/ops/hysds/hysds/container_utils.py", line 133, in ensure_image_loaded
    raise(RuntimeError("Failed to load image %s (%s): %s" % (image_file, image_name, stderr)))
RuntimeError: Failed to load image /data/work/cache/container-hysds_lightweight-jobs:master.tar.gz (container-hysds_lightweight-jobs:master): devmapper: Thin Pool has 0 free data blocks which is less than minimum required 1944 free data blocks. Create more free space in thin pool or use dm.min_free_space option to change behavior
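
A possible /etc/docker/daemon.json for instances using the devicemapper storage driver; the 10% threshold is illustrative. With dm.min_free_space set, docker pull/load fails early with a clear error instead of driving the thin pool to 0 free blocks, which is the hook this issue proposes using to trigger an automatic image purge:

{
  "storage-driver": "devicemapper",
  "storage-opts": [
    "dm.min_free_space=10%"
  ]
}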

localize/load integration docker images up-front

Currently, docker images are loaded after data localization. They should be loaded up front so that image errors surface early and to prevent potentially wasted egress cost when localizing large numbers of dataset inputs (e.g. time series).

add watchdog script to monitor for mozart jobs that have not successfully completed in timeout period

Use Case: Jobs may fail to run for a variety of reasons, including verdi issues with docker, disk space, etc., or external dependencies such as remote data sources being down or having changed their interfaces. We need the value-added ability for Mozart to automatically do a first-order sanity check that a job type has successfully run (job-successful / exit code 0) at least once within a given time period.

Task is to add a cron-level script that periodically (e.g. once per hour) checks, for specific JobSpec types/versions, that Mozart has had at least one job-successful instance within the last n seconds (the window varies by JobSpec). If none within this period, then notify ops (e.g. slack bot to a channel and/or email to the ops account).

An important point is that this should be kept low-level and treated as core tooling that runs independently of mozart jobs, in case those fail too ("who polices the police?"). So keeping this simple and lightweight is key.
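
A rough sketch of such a cron script, kept lightweight as noted above (the ES endpoint, index name, status/field names, and Slack webhook are assumptions for illustration, not the actual mozart schema):

import datetime
import requests

ES_URL = "http://mozart-es.example.com:9200"             # assumed mozart ES endpoint
SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX"   # assumed ops channel webhook

def check_job_type(job_type, max_age_seconds):
    """Alert if no successful instance of job_type ran in the last max_age_seconds."""
    since = (datetime.datetime.utcnow()
             - datetime.timedelta(seconds=max_age_seconds)).isoformat() + "Z"
    query = {
        "query": {"bool": {"must": [
            {"term": {"status": "job-completed"}},       # assumed status value for success
            {"term": {"type": job_type}},                # assumed job type field
            {"range": {"@timestamp": {"gte": since}}},   # assumed timestamp field
        ]}},
        "size": 0,
    }
    resp = requests.post("%s/job_status-current/_search" % ES_URL, json=query)
    resp.raise_for_status()
    total = resp.json()["hits"]["total"]
    count = total["value"] if isinstance(total, dict) else total  # ES 7.x vs older
    if count == 0:
        requests.post(SLACK_WEBHOOK, json={
            "text": "watchdog: no successful '%s' job in the last %d seconds"
                    % (job_type, max_age_seconds)})

if __name__ == "__main__":
    # e.g. run hourly from cron with the JobSpec type/version and window to enforce
    check_job_type("job-hello_world:develop", 3600)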

bypass time.sleep(10) in evaluate_user_rules_[dataset|job]

We call the queue_dataset_evaluation celery task, which in turn runs time.sleep(10) before evaluation (to give the ES index time to refresh; see the explanation of the ES index refresh_interval).

def queue_dataset_evaluation(info):
    """Queue dataset id for user_rules_dataset evaluation."""
    payload = {
        "type": "user_rules_dataset",
        "function": "hysds.user_rules_dataset.evaluate_user_rules_dataset",
        "args": [info["id"], info["system_version"]],
    }
    hysds.task_worker.run_task.apply_async((payload,), queue=app.conf.USER_RULES_DATASET_QUEUE)

https://github.com/hysds/hysds/blob/develop/hysds/user_rules_dataset.py#L159-L166

def evaluate_user_rules_dataset(
    objectid, system_version, alias=DATASET_ALIAS, job_queue=JOBS_PROCESSED_QUEUE
):
    """
    Process all user rules in ES database and check if this objectid matches.
    If so, submit jobs. Otherwise do nothing.
    """

    time.sleep(10)  # sleep for 10 seconds; let any documents finish indexing in ES
    ...

https://github.com/hysds/hysds/blob/develop/hysds/user_rules_dataset.py#L164

This can become a bottleneck if there are many datasets in the queue, because each worker has to wait 10 seconds before evaluation.

Maybe we can add an extra arg/kwarg to the evaluate_user_rules_dataset function and set it to the timestamp of when the task was queued.

Then evaluate_user_rules_dataset can compute the time difference between when the task was queued and when it actually starts:

  • if > 10 seconds, there is no need to time.sleep(10)
  • else time.sleep(10)

This can speed up user rules evaluation: less sleeping means more open workers. A sketch of the idea follows.
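
A minimal sketch of the proposed change, mirroring the snippets above (the queue_time argument is hypothetical, and this version sleeps only for whatever remains of the 10-second window):

import time

REFRESH_WINDOW = 10  # seconds to let ES finish indexing/refreshing

def queue_dataset_evaluation(info):
    """Queue dataset id for user_rules_dataset evaluation, tagging the submit time."""
    payload = {
        "type": "user_rules_dataset",
        "function": "hysds.user_rules_dataset.evaluate_user_rules_dataset",
        "args": [info["id"], info["system_version"], time.time()],  # pass queue timestamp
    }
    hysds.task_worker.run_task.apply_async((payload,), queue=app.conf.USER_RULES_DATASET_QUEUE)

def evaluate_user_rules_dataset(objectid, system_version, queue_time=None, **kwargs):
    # sleep only for the unexpired portion of the refresh window
    elapsed = (time.time() - queue_time) if queue_time else 0
    if elapsed < REFRESH_WINDOW:
        time.sleep(REFRESH_WINDOW - elapsed)
    ...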

Also, should we reduce the sleep between each query?
https://github.com/hysds/hysds/blob/develop/hysds/user_rules_dataset.py#L115
Maybe use a random jitter to speed things up a little more:

time.sleep(random.uniform(0.3, 0.75))
