weather-tools's Introduction

weather-tools

Apache Beam pipelines to make weather data accessible and useful.

Introduction

This project contributes a series of command-line tools to make common data engineering tasks easier for researchers in climate and weather. These solutions were born out of the need to improve repeated work performed by research teams across Alphabet.

The first tool created was the weather downloader (weather-dl). It makes it easier to ingest data from the European Centre for Medium-Range Weather Forecasts (ECMWF). weather-dl lets users describe very specifically what data they'd like to ingest from ECMWF's catalogs. It also gives them control over how to parallelize requests, so they can retrieve data efficiently. Downloads are driven from a configuration file, which can be reviewed (and version-controlled) independently of pipeline or analysis code.

We also provide two additional tools to aid climate and weather researchers: the weather mover (weather-mv) and the weather splitter (weather-sp). These CLIs are still in their alpha stages of development, yet they have already been used in production workflows by several partner teams.

We created the weather mover (weather-mv) to load geospatial data from cloud buckets into Google BigQuery. This enables rapid exploratory analysis and visualization of weather data: From BigQuery, scientists can load arbitrary climate data fields into a Pandas or XArray dataframe via a simple SQL query.

The weather splitter (weather-sp) helps normalize how archival weather data is stored in cloud buckets. Whether you're trying to merge two datasets with overlapping variables or you simply need to open Grib data from XArray, it's really useful to split datasets into their component variables.

Installing

We currently recommend that you create a local Python environment (with Anaconda) and install the sources as follows:

conda env create --name weather-tools --file=environment.yml
conda activate weather-tools

Note: Due to its use of 3rd-party binary dependencies such as GDAL and MetView, weather-tools is transitioning from PyPI to Conda for its main release channel. The instructions above are a temporary workaround before our Conda-forge release.

From here, you can use the weather-* tools from your python environment. Currently, the following tools are available:

  • weather-dl (beta) – Download weather data (namely, from ECMWF's API).
  • ⛅️ weather-mv (alpha) – Load weather data into analytics engines, like BigQuery.
  • 🌪 weather-sp (alpha) – Split weather data by arbitrary dimensions.

Quickstart

In this tutorial, we will download the ERA5 pressure level dataset and ingest it into Google BigQuery using weather-dl and weather-mv, respectively.

Prerequisites

  1. Register for a license from ECMWF's Copernicus (CDS) API.
  2. Install your license by copying your API URL and key from this page to a new file, $HOME/.cdsapirc (see footnote 1). The file should look like this:
    url: https://cds.climate.copernicus.eu/api/v2
    key: <YOUR_USER_ID>:<YOUR_API_KEY>
    
  3. If you do not already have a Google Cloud project, create one by following these steps. If you are working on an existing project, make sure your user has the BigQuery Admin role. To learn more about granting IAM roles to users in Google Cloud, visit the official docs.
  4. Create an empty BigQuery Dataset. This can be done using the Google Cloud Console or via the bq CLI tool. For example:
    bq mk --project_id=$PROJECT_ID $DATASET_ID
  5. Follow these steps to create a bucket for staging temporary files in Google Cloud Storage.
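The license file from step 2 can be sanity-checked before starting a download. The following is a minimal sketch, assuming the file contains only simple `name: value` lines as shown above; the helper names `read_cdsapirc` and `missing_fields` are hypothetical and not part of weather-tools.

```python
from pathlib import Path


def read_cdsapirc(path):
    """Parse 'name: value' lines from a CDS API license file into a dict."""
    settings = {}
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or ":" not in line:
            continue
        # Split only once: the URL and the key both contain ':' themselves.
        name, value = line.split(":", 1)
        settings[name.strip()] = value.strip()
    return settings


def missing_fields(settings, required=("url", "key")):
    """Return the required field names that are absent or empty."""
    return [name for name in required if not settings.get(name)]
```

For example, `missing_fields(read_cdsapirc(Path.home() / ".cdsapirc"))` returns an empty list when both `url` and `key` are present.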

Steps

For the purpose of this tutorial, we will use your local machine to run the data pipelines. Note that all weather-tools can also be run in Cloud Dataflow, which is easier to scale and fully managed.

  1. Use weather-dl to download the ERA5 pressure level dataset.

    weather-dl configs/era5_example_config_local_run.cfg \
       --local-run # Use the local machine

    Recommendation: Pass the -d, --dry-run flag to any of these commands to preview the effects.

    NOTE: By default, local downloads are saved to the ./local_run directory unless another file system is specified. The recommended output location for weather-dl is Cloud Storage. The source and destination of the download are configured using the .cfg configuration file which is passed to the command. To learn more about this configuration file's format and features, see this reference. To learn more about the weather-dl command, visit here.

  2. (optional) Split your downloaded dataset up with weather-sp:

     weather-sp --input-pattern "./local_run/era5-*.nc" \
        --output-dir "split_data" 

    Visit the weather-sp docs for more information.

  3. Use weather-mv to ingest the downloaded data into BigQuery, in a structured format.

    # Pass "./split_data/**.nc" to --uris instead if weather-sp was used.
    # --output_table is the path to the destination BigQuery table.
    # --temp_location is needed to stage temporary files before writing to BigQuery.
    weather-mv bigquery --uris "./local_run/**.nc" \
       --output_table "$PROJECT.$DATASET_ID.$TABLE_ID" \
       --temp_location "gs://$BUCKET/tmp" \
       --direct_num_workers 2

    See these docs for more about the weather-mv command.

That's it! After the pipeline completes, you should be able to query the ingested dataset in the BigQuery SQL workspace and analyze it using BigQuery ML.

Contributing

The weather tools are under active development, and contributions are welcome! Please check out our guide to get started.

License

This is not an official Google product.

Copyright 2021 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Footnotes

  1. Note that you need to be logged in for the CDS API page to actually show your user ID and API key. Otherwise, it will display a placeholder, which is confusing to some users.

weather-tools's People

Contributors

alxmrs, aniketsinghrawat, arunsathiya, blackvvine, cillianfn, dabhicusp, darshansp19, deepgabani8, dependabot[bot], dlowell12, j9sh264, ksic8, lakshmanok, mahrsee1997, mt467, pbattaglia, piyush-ingale, pramodg, pranay101, raspstephan, roshinifernando, sangamswadik, saveriogzz, seancampbell, sgreenberg, uhager

weather-tools's Issues

Publish weather-tools on PyPI

This will help with distribution of the tool. Python users would only have to run pip install weather-tools to get the source and updates.

`weather-mv`: Automatic database migrations

As a user, when I run a newer version of weather-mv on a dataset, and the schema of the BigQuery database has changed (i.e. changed with respect to the dataset), I would like weather-mv to automatically migrate the table. I need this, as a user, so my data pipelines don't break when new versions of weather-mv are released, and so I can get the latest tool features.

Last, as a user, I would like some control over whether migrations are applied, and which migrations to make. Ideally, the CLI should let me opt out of migrations if I know best. Otherwise, I want the tools to default to the recommended case.

Add support for dry-runs to `weather-mv`.

A helpful step towards fixing #21.

ticket updated: 2022-03-24

As a weather tools user, I would like to be able to preview the effect of each tool before incurring the cost of data movement and infrastructure. These light-weight previews will help me test pipelines before deployment, lowering the number of iterations needed to set up a data pipeline. For this issue, I want to be able to perform dry runs with the weather mover.

Acceptance Criteria

  • Provide a common interface for exercising dry runs for every Data Sink
  • When a user passes the -d or --dry-run flag to the weather-mv cli, this feature will be activated.
  • When a user checks tool documentation (the README or CLI help message), they will have a good understanding of what the feature does
  • As a user, I will still have some way to monitor the execution flow of the tool during a dry run
    • Log messages from non-dry runs will remain the same as those within a dry-run
    • As a user, I can inspect the kinds of messages that would have been written to BigQuery.
    • (optional) Maybe more log messages are needed to see what's happening?
  • As a user, I can execute dry runs locally or remotely on Dataflow
  • Where appropriate, data is simulated in memory. No data is written to disk or cloud storage during a dry run.
    • As a user, I still would like to validate execution on actual user-supplied URIs.
  • Where there are contradictions in requirements, the ergonomic option for weather-mv users is preferred.
  • All code should be completely covered by tests.
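One way to read the "common interface" criterion is a shared base class whose write path checks a single dry-run switch. The sketch below is only an illustration under that assumption: `Sink`, `InMemorySink`, and `parse_args` are hypothetical names, not weather-mv's actual classes. Only the `-d` / `--dry-run` flag comes from the criteria above.

```python
import abc
import argparse
import logging

logger = logging.getLogger(__name__)


class Sink(abc.ABC):
    """Hypothetical base class: every data sink shares one dry-run switch."""

    def __init__(self, dry_run=False):
        self.dry_run = dry_run

    def write(self, rows):
        """Log each row; only touch the real destination outside a dry run."""
        written = 0
        for row in rows:
            logger.info("row: %s", row)  # identical message in both modes
            if not self.dry_run:
                self._write_row(row)
                written += 1
        return written

    @abc.abstractmethod
    def _write_row(self, row):
        """Persist a single row to the destination."""


class InMemorySink(Sink):
    """Stand-in for a real sink such as BigQuery, used only for illustration."""

    def __init__(self, dry_run=False):
        super().__init__(dry_run)
        self.stored = []

    def _write_row(self, row):
        self.stored.append(row)


def parse_args(argv):
    """Parse the dry-run flag named in the acceptance criteria."""
    parser = argparse.ArgumentParser(prog="weather-mv")
    parser.add_argument("-d", "--dry-run", action="store_true",
                        help="Preview the pipeline without writing any data.")
    return parser.parse_args(argv)
```

Because logging happens before the dry-run check, the log output is the same in both modes, which lines up with the criterion that messages should not differ between dry and non-dry runs.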

Implementation Notes

Problems downloading MARS data

I registered on the ECMWF site (https://apps.ecmwf.int/registration/), got an API key from https://api.ecmwf.int/v1/key/, and created ~/.ecmwfapirc as shown on the page.

When I run:

weather-dl mars_example_config.cfg --local-run

I get:

File "/opt/conda/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1635, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/opt/conda/lib/python3.7/site-packages/weather_dl/download_pipeline/pipeline.py", line 241, in fetch_data
    client.retrieve(dataset, selection, temp.name)
  File "/opt/conda/lib/python3.7/site-packages/weather_dl/download_pipeline/clients.py", line 172, in retrieve
    self.c.execute(req=selection, target=output)
  File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 645, in execute
    quiet=self.quiet,
  File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 418, in __init__
    info = self.connection.call("%s/%s/%s" % (self.url, self.service, "info")).get(
  File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 148, in wrapped
    return func(self, *args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 350, in call
    raise APIException("ecmwf.API error 1: %s" % (self.last["error"],))
ecmwfapi.api.APIException: "ecmwf.API error 1: User 'lak@...' has no access to services/mars [while running 'FetchData']"

Error in handling grib file

I tried this with an A1Dxxxxxxxxx.bz2 file. extract_rows completed without error, but writing to BigQuery shows the error below:

Traceback (most recent call last):
  File "/opt/conda/bin/weather-mv", line 74, in <module>
    cli(['--extra_package', pkg_archive])
  File "/opt/conda/lib/python3.7/site-packages/weather_mv/loader_pipeline/__init__.py", line 23, in cli
    run(sys.argv + extra)
  File "/opt/conda/lib/python3.7/site-packages/weather_mv/loader_pipeline/pipeline.py", line 363, in run
    custom_gcs_temp_location=known_args.temp_location)
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/pipeline.py", line 586, in __exit__
    self.result.wait_until_finish()
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1635, in wait_until_finish
    self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
    self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 578, in wait_for_bq_job
    job_reference.jobId, job.status.errorResult))
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_43_6c0ec7fd665d0e72361b31b2c0dff157_57b5577d56954d57ab20e7d97c1fdca6 failed. Error Result: <ErrorProto
 location: 'gs://ecmwf-xxxxx/bq_load/10a305afebc345c7bff3632233370d22/xxxxxxx-xxx.xxxxx.ecmwf_realtime_test_bz2/4ed537d9-66f5-4e64-8927-95a0bfce62f3'
 message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'
 reason: 'invalid'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
    self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 578, in wait_for_bq_job
    job_reference.jobId, job.status.errorResult))
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_43_6c0ec7fd665d0e72361b31b2c0dff157_57b5577d56954d57ab20e7d97c1fdca6 failed. Error Result: <ErrorProto
 location: 'gs://ecmwf-xxxx/bq_load/10a305afebc345c7bff3632233370d22/xxxxx-xxxx.xxxx.ecmwf_realtime_test_bz2/4ed537d9-66f5-4e64-8927-95a0bfce62f3'
 message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'
 reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/WaitForDestinationLoadJobs']

Exclude test data from tarball

Looking at the upload-to-pypi logs, it looks like testdata is included in the tarball (I see .grib and .nc files in the logs). This bug involves updating the MANIFEST.in to exclude testdata in the tars. That way, they will be lightweight / quick-to-install.

Improve speed of extracting rows

With a global forecast dataset, I need to extract data with ~3,244.6k coordinate values (time x lat x lng). As of today, it takes ~9 seconds to extract 1k rows. So: 9 * 3,244.6 / 60 / 60 = ~8 hours per file.

While in streaming pipelines, data is written as soon as it's available, we'd ideally like all of the data to be processed within ~1 hour, so the first forecast is actionable.
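The estimate above, and the gap to the one-hour target, can be reproduced with a few lines of arithmetic. This is a back-of-the-envelope sketch that assumes a single sequential worker and uses the row count and extraction speed quoted in this issue; the function name is illustrative.

```python
SECONDS_PER_1K_ROWS = 9    # observed extraction speed quoted in the issue
TOTAL_ROWS = 3_244_600     # ~3,244.6k coordinate values (time x lat x lng)
TARGET_HOURS = 1           # desired time to process one file


def hours_to_extract(total_rows, seconds_per_1k_rows):
    """Estimate the wall-clock hours for a single sequential worker."""
    return total_rows / 1_000 * seconds_per_1k_rows / 3_600


estimated_hours = hours_to_extract(TOTAL_ROWS, SECONDS_PER_1K_ROWS)
required_speedup = estimated_hours / TARGET_HOURS

print(f"Estimated: {estimated_hours:.1f} hours per file")
print(f"Speedup needed for a {TARGET_HOURS}-hour target: {required_speedup:.1f}x")
```

Under these assumptions, extraction would need to be roughly eight times faster (or spread across about eight parallel workers) to meet the target.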

`weather-dl`: Out-of-order license distribution error

In the weather downloader, we underutilize licenses due to the current way they are distributed. Worse, if we use near the max number of queued workers (like we do by default for the MARS downloads), pipelines will fail with a "MaxAPIRequests" error from ECMWF.

My leading hypothesis is that our license distribution scheme depends on a particular order of future requests. However, we've made recent changes to better work with the Dataflow autoscaling algorithm to parallelize download requests, which allocates the fetch_data step in a random order.

How to reproduce

  1. Run weather-dl with multiple licenses (CDS is fine).
  2. Look up the request queue for the client (the docstrings for the client have an appropriate URL) for each license / account.

Outcome: There will be an uneven distribution of download requests. Some may have duplicates while others don't have any work dispatched. This leads to underutilization of licenses and in some cases brittle pipelines.
Expected: Each worker should at least have their request limit maxed out (licenses fully utilized). Requests, in general, should be evenly distributed. Pipelines should never hit the "Too many requests" error.

Workarounds

The strategy to get robust yet high utilization of downloads is to split data requests into multiple configs and run weather-dl twice (or multiple times). It especially helps to use the -n flag to limit the number of data requests to the access limit (for CDS, the defaults are currently fine; for MARS, set -n 2).

Potential fix

A good strategy would be to restructure the pipeline such that:

  • License creation is separate from config partitioning (i.e. different beam.Create() steps).
  • The two streams (licenses and partitions) get joined together (like a zip in Python) after a reshuffle so licenses are evenly distributed.
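The zip-style join proposed above can be illustrated in plain Python. This is a sketch of the idea only, not Beam code and not the project's implementation: `assign_licenses` and `requests_per_license` are hypothetical helpers, and the license and partition names are made up.

```python
from itertools import cycle


def assign_licenses(partitions, licenses):
    """Pair each partition with a license in round-robin order.

    Zipping against a cycling iterator of licenses spreads requests evenly,
    regardless of the order in which the partitions arrive.
    """
    return list(zip(partitions, cycle(licenses)))


def requests_per_license(assignments):
    """Count how many partitions were assigned to each license."""
    counts = {}
    for _, license_name in assignments:
        counts[license_name] = counts.get(license_name, 0) + 1
    return counts


partitions = [f"partition-{i}" for i in range(7)]   # made-up partition names
licenses = ["license-a", "license-b", "license-c"]  # made-up license names
counts = requests_per_license(assign_licenses(partitions, licenses))
```

With this pairing, the per-license request counts never differ by more than one, which is the even distribution described under "Expected".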

`weather-mv`: Support streaming conversions to BigQuery.

Use Case

As a user, I want to be able to load real-time forecasts from cloud storage to BigQuery. I want to do this using the simplest, most computationally efficient means. For example, ECMWF disseminates data 4x per day into cloud storage buckets. Once an upload event occurs, I want data to automatically start ingesting into BigQuery.

Proposal

Support a new argument, --topic, that lets users specify a GCP PubSub topic of CREATE or FINALIZE events for a GCS bucket. Either this argument or --uris will be required. Using --topic will deploy a streaming Dataflow job that subscribes to PubSub events, windows them into 1-minute buckets, and performs the extract_rows operation on incoming data.

Note: for other clouds, users can bridge other notification systems to GCP's PubSub in the short term. Apache Kafka support can come later.
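To make the windowing step in the proposal concrete, here is a plain-Python sketch of grouping events into fixed 1-minute buckets. It only illustrates the bucketing arithmetic; it is not Beam code, and the event tuples of `(timestamp_in_seconds, uri)` are an assumed shape rather than the real PubSub message format.

```python
from collections import defaultdict

WINDOW_SECONDS = 60


def window_start(timestamp, size=WINDOW_SECONDS):
    """Return the start of the fixed window containing `timestamp` (seconds)."""
    return timestamp - (timestamp % size)


def group_into_windows(events, size=WINDOW_SECONDS):
    """Group (timestamp, uri) events by the start of their fixed window."""
    windows = defaultdict(list)
    for timestamp, uri in events:
        windows[window_start(timestamp, size)].append(uri)
    return dict(windows)


# Made-up upload events: (seconds since some epoch, object URI).
events = [
    (5, "gs://bucket/a"),
    (59, "gs://bucket/b"),
    (60, "gs://bucket/c"),
    (125, "gs://bucket/d"),
]
windows = group_into_windows(events)
```

Each bucket would then be handed to the row-extraction step as one batch of URIs.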

Document common workflows of weather-tools

After #7 is complete, we should prepare a "cookbook" of common dev-ops workflows that use weather-tools. These should include:

  • A walkthrough of using each tool in context with each other on GCP.
  • How to use weather-dl to download priority data first.
  • How to schedule dataflow jobs in the future (if possible).
  • How to set up Alerts and monitoring Dashboards in GCP.
  • (reach) Using a Cloud Monitoring dashboard to view results from the Download Manifest.

TypeError: Type is not JSON serializable: numpy.float64

Ran the below command for one grib bz2 file for batch processing:

weather-mv --uris "gs://xxxxxx/A4D0113xxxxxxx.bz2" --output_table xxxx-xxx.xxxx.ecmwf_realtime_test_a4 --region us-central1 --runner DataflowRunner --setup_file ./setup.py --project xxxx-xxx --temp_location gs://xxxxxx-test/tmp

The table did not exist in BigQuery, so the pipeline created it on the fly.

Below is the error traceback:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1256, in process
return self._flush_batch(destination)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1312, in _flush_batch
skip_invalid_rows=True)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1136, in insert_rows
for r in rows
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1136, in
for r in rows
TypeError: Type is not JSON serializable: numpy.float64 [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']

Publish Sphinx docs to webpage

Users should be able to read documentation without looking at the source repository. We can either use GitHub Pages or Read the Docs. I have a preference for the latter.

line 1099, in read return self._sslobj.read(len, buffer) socket.timeout: The read operation timed out

After running weather-dl for a long time (~25 days), I hit this error:

line 1099, in read return self._sslobj.read(len, buffer) socket.timeout: The read operation timed out

My suspicion is that reading or writing large files to GCS is timing out. While we have retries for this, we exclude certain errors from being retried, including this one. A fix might be to revisit that retry logic and make sure it includes these types of timeout errors.
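A sketch of what such a retry filter could look like follows. It assumes that `socket.timeout` and connection errors are transient; `RETRYABLE_ERRORS`, `is_retryable`, and `with_retries` are hypothetical names and do not reflect the retry code that weather-dl actually uses.

```python
import socket
import time

# Error types assumed to be transient and therefore worth retrying.
RETRYABLE_ERRORS = (socket.timeout, ConnectionError)


def is_retryable(error):
    """Decide whether an exception should trigger another attempt."""
    return isinstance(error, RETRYABLE_ERRORS)


def with_retries(fn, max_attempts=3, base_delay=1.0):
    """Call `fn`, retrying retryable errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as error:
            # Give up on non-retryable errors and on the final attempt.
            if not is_retryable(error) or attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))
```

Keeping the list of retryable types in one tuple makes it easy to audit which errors are retried and which are not.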

Unit tests are running slow / CI takes a long time.

I observe that our CI takes a long time to complete. It is early in the lifetime of this project for our unit tests to be slow (certainly, the codebase is small enough). Having fast tests and an expedient CI system increases the pace of development & bugfixes.

This bug involves investigating why our unit tests take a long time to run. This may be related to #53.

`weather-dl`: Document the -n argument

In an oversight, a recent feature of weather-dl has not been documented. Let's make known to readers of the tool's README that the flag exists and describe what it does. Further, let's add a usage example.

Splitter template syntax is not expressive enough for backwards compatibility.

I used the weather splitter on a file system before it had templates, and now in its current version. The original file system I have looks like:
.../2020/202001_hres_pcp.grb2_surface_d2m.grib

Using the new templating system in the weather splitter, I cannot format a template string to match this file path. This is because I don't have control over the position of the level and short_name variables. It seems like the templating system is oriented towards setting variable file paths instead of templating the variable and level placement in the string. The latter templating options are more needed than the former.
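The kind of control requested here can be illustrated with Python's built-in `str.format`. This is a hypothetical template, not weather-sp's actual syntax: the placeholder names `input_name`, `level`, and `short_name` are assumptions chosen to reproduce the file name shown above.

```python
# Hypothetical template: the author decides where each variable appears.
TEMPLATE = "{input_name}_{level}_{short_name}.grib"


def output_name(template, **variables):
    """Fill a str.format-style template with the split variables."""
    return template.format(**variables)


legacy_name = output_name(
    TEMPLATE,
    input_name="202001_hres_pcp.grb2",
    level="surface",
    short_name="d2m",
)
print(legacy_name)
```

With named placeholders, reordering `level` and `short_name` only requires editing the template string.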

Fix Windows setup

Issue:
When running the following setup command on Windows 10 (Python 3.8.5),
pip install -e ."[dev]"

Getting the following error
ERROR: Command errored out with exit status 1:
 command: 'C:\Users\Cillian\Anaconda3\envs\py385\python.exe' -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'D:\\Projects\\weather-tools\\weather-tools\\setup.py'"'"'; __file__='"'"'D:\\Projects\\weather-tools\\weather-tools\\setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base 'C:\Users\Cillian\AppData\Local\Temp\pip-pip-egg-info-ewy709ue'
     cwd: D:\Projects\weather-tools\weather-tools\
Complete output (7 lines):
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "D:\Projects\weather-tools\weather-tools\setup.py", line 66, in <module>
    long_description=open('README.md', 'r').read(),
  File "C:\Users\Cillian\Anaconda3\envs\py385\lib\encodings\cp1252.py", line 23, in decode
    return codecs.charmap_decode(input,self.errors,decoding_table)[0]
UnicodeDecodeError: 'charmap' codec can't decode byte 0x8f in position 2775: character maps to <undefined>
----------------------------------------

Explanation of issue: https://dev.to/methane/python-use-utf-8-mode-on-windows-212i

Expected:
Run command with no issue

Fix:
Update the following line https://github.com/google/weather-tools/blob/main/setup.py#L66 to include encoding
long_description=open('README.md', 'r', encoding='utf-8').read(),
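The failure can be reproduced on any platform by decoding with cp1252 explicitly. The sketch below assumes the offending character is an emoji variation selector (U+FE0F), whose UTF-8 encoding ends in byte 0x8f. That is one plausible source of the byte in the traceback, not a confirmed diagnosis of the README.

```python
import os
import tempfile

# An emoji followed by U+FE0F; in UTF-8 the selector is b"\xef\xb8\x8f".
text = "weather-mv \u26c5\ufe0f"

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "README.md")
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)

    # Reading with the Windows default codec fails on byte 0x8f.
    try:
        with open(path, "r", encoding="cp1252") as f:
            f.read()
        cp1252_failed = False
    except UnicodeDecodeError:
        cp1252_failed = True

    # Reading with an explicit UTF-8 encoding round-trips the text.
    with open(path, "r", encoding="utf-8") as f:
        utf8_text = f.read()
```

Passing `encoding='utf-8'` explicitly removes the dependence on the platform's default codec.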

@alxmrs I have a branch locally with this fix, happy to put up a PR but I seem to not have write permissions for this repo. Would appreciate getting those as I'd like to make more (interesting 😉 ) contributions in the future!

Extend CI to test example configs are valid

Hey @CillianFn and @pranay101, this issue was inspired by #52 and #47. If either of you would like to pick this up, that would be a great help!

To summarize: we should start treating our configs like code and include them in our automated checks. For this issue to be fixed, we should add to our CI build to test that all the example configs parse correctly (check for correct syntax). For now, I think it's sufficient to run each config with the -d, --dry-run flag. Later, we can check that the semantics of the configs are correct.
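A CI step for this could be as small as the sketch below. It assumes the example configs are `.cfg` files in a single directory and that `weather-dl` exits with a non-zero status when a config fails to parse. Both are assumptions, and `dry_run_commands` and `check_configs` are hypothetical helper names.

```python
import subprocess
from pathlib import Path


def dry_run_commands(config_dir):
    """Build one `weather-dl <config> --dry-run` command per example config."""
    configs = sorted(Path(config_dir).glob("*.cfg"))
    return [["weather-dl", str(config), "--dry-run"] for config in configs]


def check_configs(config_dir, run=subprocess.run):
    """Run every command and return the configs whose dry run failed."""
    failures = []
    for command in dry_run_commands(config_dir):
        result = run(command)
        if result.returncode != 0:  # assumed to signal an invalid config
            failures.append(command[1])
    return failures
```

A CI job could call `check_configs("configs")` and fail the build when the returned list is not empty. The `run` parameter exists so the check can be exercised in tests without invoking the real CLI.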

Skipping variable error

Trying to spin up a streaming dataflow job with the below command:

weather-mv --uris "gs://ecmwf-xxxxx/A1D*.bz2" \
  --output_table xxxx-xx.xxxx.xxxx \
  --topic "projects/xxxx/topics/ecmwf-xxxxx" \
  --runner DataflowRunner \
  --project xxx-xxxx \
  --region us-central1 \
  --temp_location gs://xxxxx-test \
  --job_name ecmwf-xxxxxx-test2

Below is the error. The dataflow job hasn't been created.

[image: screenshot of the error]

Limit grib in-memory copy

Add a command line flag to control whether in-memory copy should be enabled or not.

If disabled, grib files should set cache=True during open_dataset() to speed up reads.
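
A small sketch of how the flag could be wired up. The flag name `--disable_in_memory_copy` and both helper functions are assumptions made for illustration; the issue does not specify them. The `cache` keyword is an existing parameter of `xarray.open_dataset()`.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(prog='weather-mv')
    # Hypothetical flag name; the issue does not prescribe one.
    parser.add_argument(
        '--disable_in_memory_copy', action='store_true',
        help='Open grib files directly instead of copying them into memory.')
    return parser


def open_dataset_kwargs(disable_in_memory_copy):
    """Keyword arguments to forward to xarray.open_dataset()."""
    # Per the issue: with the in-memory copy disabled, turn on xarray's
    # cache so repeated reads are faster.
    return {'cache': True} if disable_in_memory_copy else {}


args = build_parser().parse_args(['--disable_in_memory_copy'])
kwargs = open_dataset_kwargs(args.disable_in_memory_copy)
```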

`weather-mv`: Errors inserting into BigQuery during streaming uploads.

I just noticed in an e2e run, the pipeline doesn't fail but issues warnings about failure to insert data into BigQuery:

There were errors inserting to BigQuery. Will retry. Errors were [
{'index': 0, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 1, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 2, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]},
{'index': 3, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]},
{'index': 4, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]},
{'index': 5, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]},
{'index': 6, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]},
{'index': 7, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]},
{'index': 8, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]},
{'index': 9, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 10, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]},
{'index': 11, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]},
{'index': 12, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]},
{'index': 13, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]},
{'index': 14, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]},
{'index': 15, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]},
{'index': 16, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 17, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 18, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 19, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]},
{'index': 20, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 21, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]},
{'index': 22, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 23, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]},
{'index': 24, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}]

I think the issue is this logic in extract_rows:

row[c] = to_json_serializable_type(ensure_us_time_resolution(row_ds[c].values))
These lines don't match anything in the schema.
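
To make the mismatch concrete, here is a small diagnostic sketch. The helper name, the sample row, and the schema field list are all made up for illustration; they are not taken from the pipeline. It only shows how row keys that are absent from the table schema (such as `step` or `valid_time` in the log above) could be surfaced before the insert.

```python
def fields_missing_from_schema(row, schema_fields):
    """Return the row keys that have no matching column in the schema."""
    return sorted(set(row) - set(schema_fields))


# Illustrative values only.
row = {
    'latitude': 49.0,
    'longitude': 2.5,
    'step': 0,
    'valid_time': '2021-12-01T00:00:00',
    't2m': 280.1,
}
schema_fields = ['latitude', 'longitude', 't2m']

missing = fields_missing_from_schema(row, schema_fields)
```

Whether the right fix is to drop those keys from the row or to add them to the schema is a separate design decision.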

Unable to run command line tools on Windows

This might be just an issue for me on Windows but I am unable to run the command line tools after installing them via
pip install git+http://github.com/google/weather-tools.git#egg=weather-tools

E.g.
C:\Users\Cillian>weather-dl configs/era5_example_config_local_run.cfg --local-run
'weather-dl' is not recognized as an internal or external command, operable program or batch file.

Edition: Windows 10 Home
Version: 21H1
OS build 19043.1415

64-bit Operating System

Python version: 3.8.5

IO backends error

Ran the below command of weather-mv to import a batch of grib bz2 files into BQ.

weather-mv --uris "gs://xxxxx/A4D0113*.bz2" --output_table xxxx.xxxxx.ecmwf_realtime_test_a4 --runner DataflowRunner --project xxxxxx --temp_location gs://xxxxx/tmp --job_name ecmwf_xxxxx_test_a4

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 870, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 238, in extract_rows
    with open_dataset(uri) as ds:
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 94, in open_dataset
    xr_dataset: xr.Dataset = __open_dataset_file(dest_file.name)
  File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 62, in __open_dataset_file
    return xr.open_dataset(filename)
  File "/usr/local/lib/python3.7/site-packages/xarray/backends/api.py", line 479, in open_dataset
    engine = plugins.guess_engine(filename_or_obj)
  File "/usr/local/lib/python3.7/site-packages/xarray/backends/plugins.py", line 155, in guess_engine
    raise ValueError(error_msg)
ValueError: did not find a match in any of xarray's currently installed IO backends ['scipy']. Consider explicitly selecting one of the installed engines via the engine parameter, or installing additional IO dependencies, see:
http://xarray.pydata.org/en/stable/getting-started-guide/installing.html
http://xarray.pydata.org/en/stable/user-guide/io.html

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 870, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 238, in extract_rows
    with open_dataset(uri) as ds:
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 94, in open_dataset
    xr_dataset: xr.Dataset = __open_dataset_file(dest_file.name)
  File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 62, in __open_dataset_file
    return xr.open_dataset(filename)
  File "/usr/local/lib/python3.7/site-packages/xarray/backends/api.py", line 479, in open_dataset
    engine = plugins.guess_engine(filename_or_obj)
  File "/usr/local/lib/python3.7/site-packages/xarray/backends/plugins.py", line 155, in guess_engine
    raise ValueError(error_msg)
ValueError: did not find a match in any of xarray's currently installed IO backends ['scipy']. Consider explicitly selecting one of the installed engines via the engine parameter, or installing additional IO dependencies, see:
http://xarray.pydata.org/en/stable/getting-started-guide/installing.html
http://xarray.pydata.org/en/stable/user-guide/io.html [while running 'ExtractRows']
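
The message lists only the 'scipy' backend, which suggests the worker has no GRIB-capable engine installed. A hedged sketch of a pre-flight check follows. It assumes the GRIB files are meant to be read through the `cfgrib` engine, which the issue does not state, and the helper name is hypothetical.

```python
import importlib.util


def missing_modules(module_names):
    """Return the top-level modules from `module_names` that cannot be found."""
    return [name for name in module_names
            if importlib.util.find_spec(name) is None]


# Assumption: GRIB inputs are read through xarray's `cfgrib` engine.
REQUIRED_MODULES = ['xarray', 'cfgrib']

# Example use on a worker, before opening any file:
#   missing = missing_modules(REQUIRED_MODULES)
#   if missing:
#       raise RuntimeError(f'Missing IO dependencies: {missing}')
#
# With cfgrib available, the engine can also be selected explicitly,
# as the error message suggests:
#   xr.open_dataset(filename, engine='cfgrib')
```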

Make CLI arguments more uniform.

All CLIs should follow the same conventions, including:

  • snake case or kebab case flags
  • common arguments (log verbosity, dry runs, local runs, etc.)
  • interactions with Dataflow
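
One possible way to share conventions across the CLIs is an `argparse` parent parser, sketched below. The `--dry-run` and `--local-run` flags appear elsewhere in these issues; the short options, the `--log-verbosity` flag, and the function names are assumptions made for illustration.

```python
import argparse


def common_parser():
    """Flags that every CLI would inherit (kebab case chosen for the sketch)."""
    parent = argparse.ArgumentParser(add_help=False)
    parent.add_argument('-d', '--dry-run', action='store_true',
                        help='Validate inputs without doing any work.')
    parent.add_argument('-l', '--local-run', action='store_true',
                        help='Run locally instead of on Dataflow.')
    # Hypothetical flag: an integer log level, higher means more output.
    parent.add_argument('--log-verbosity', type=int, default=2)
    return parent


def build_cli(prog):
    """Create a tool-specific parser that reuses the shared flags."""
    return argparse.ArgumentParser(prog=prog, parents=[common_parser()])


args = build_cli('weather-sp').parse_args(['--dry-run'])
```

Each tool would then add only its own arguments on top of the parser returned by `build_cli`.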

`weather-sp`: Support arbitrary hypercube splits

Our initial use case of weather-sp was to split monolithic files by variable. Now, we've encountered a case where we'd like to split atmospheric data by model level.

To better generalize to all future uses of the splitter, let's re-design the tool to support arbitrary divisions of the data hypercube. Ultimately, this would help us improve usage of the weather-dl tool: If we didn't have to worry about the final file structure (because we could make it uniform), we could better optimize downloads. Specifically, each request could fetch as much data as fits under the memory limit, which is ~75 GB for MARS as of writing.
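
As a rough illustration of "arbitrary divisions of the data hypercube", the sketch below enumerates one selection per output file for any chosen set of dimensions. The function name, dimension names, and sample coordinates are hypothetical; this is not the splitter's implementation.

```python
import itertools


def split_keys(coords, split_dims):
    """Yield one {dimension: value} selection per output file."""
    values = [coords[dim] for dim in split_dims]
    for combo in itertools.product(*values):
        yield dict(zip(split_dims, combo))


# Illustrative coordinates of a small hypercube.
coords = {'variable': ['t', 'q'], 'level': [1, 2, 3], 'time': ['00', '12']}

by_level = list(split_keys(coords, ['level']))
by_variable_and_level = list(split_keys(coords, ['variable', 'level']))
```

Splitting by variable is then just the special case `split_dims=['variable']`, and model-level splits use `['level']`.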

`weather-mv`: Add columns to Schema for faster geolocation queries

A suggestion from @lakshmanok:

It would be good if weather-mv were to add a ST_GeogPoint(longitude, latitude) column instead of (or in addition to) the longitude, latitude columns. This precomputes the S2 location and makes querying faster.

Even better if the column was a polygon with the extent of the pixel. That way ST_INTERSECTS etc. will also work

ticket updated: 2022-03-24

Acceptance Criteria

  • Users of the resulting BigQuery table produced by weather-mv can make use of the Geography capabilities of BigQuery.
    • Every row of the BigQuery table (and thus, all the available variables of that row) has a column of type GEOGRAPHY, which uses "point geography" from the raw data's lat/lng data.
  • Users can still acquire the underlying float values for latitude and longitude.

Implementation Notes

  • One approach for implementing this feature would be to use BQ's SQL functions to perform the conversion to the GEOGRAPHY type. Namely, ST_GEOGPOINT is a function that takes longitude and latitude FLOAT64 values and creates a GEOGRAPHY point value from them.
  • Another simple approach would be to handle the Geography conversion from lat/lng in Python (instead of SQL). The BQ Geospatial docs list at least two pathways to do this: Either by writing WKT / WKB data or by writing in the GeoJSON spec.
  • Please note that the GEOGRAPHY values must fall within certain lat/lng ranges. This means that data outside of these ranges will need to be converted to those ranges (e.g. something like this – see the example section).
  • This will likely require a two-part code change.
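
The Python-side approach from the notes above could be sketched as follows. It assumes the source data may use longitudes in the 0 to 360 range and that the target range is [-180, 180) for longitude and [-90, 90] for latitude; the function names are hypothetical.

```python
def normalize_longitude(lon):
    """Wrap a longitude in degrees into the range [-180, 180)."""
    return ((lon + 180.0) % 360.0) - 180.0


def to_wkt_point(lon, lat):
    """Format a lon/lat pair as a WKT point (x = longitude, y = latitude)."""
    if not -90.0 <= lat <= 90.0:
        raise ValueError(f'latitude out of range: {lat}')
    return f'POINT({normalize_longitude(lon)} {lat})'


# A grid point at 270 degrees east maps to 90 degrees west.
example = to_wkt_point(270.0, 45.0)
```

The resulting string could be written to a GEOGRAPHY column, while the original float latitude and longitude columns stay in the row unchanged.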

Extended feature to delight users

Even better if the column was a polygon with the extent of the pixel. That way ST_INTERSECTS etc. will also work

Implementing extra columns to include the polygon of the (grid) area where the values are relevant is a great bonus feature. However, it will not be possible for all types of input data. It is only possible if the XArray Dataset includes extra information, such as a coordinate or variable attribute.

For now, while we implement the first part of this ticket, let's also investigate if such metadata exists on our happy-path data sources. If it does, we'll create a follow-up ticket to implement this.

Acceptance Criteria

  • Investigate the underlying real-time data source in XArray to determine if variables along coordinates are associated with a specific area.
  • Demonstrate results in an interactive report (e.g. a Colab notebook).
  • If this is possible with the data, create a design doc to further extend the BQ ingestion to include this feature, taking into account edge cases (the metadata is not available, users may want to override these columns, complexity in types of grid, etc.).
