Apache Beam pipelines to make weather data accessible and useful.
This project contributes a series of command-line tools to make common data engineering tasks easier for researchers in climate and weather. These solutions were born out of the need to improve repeated work performed by research teams across Alphabet.
The first tool created was the weather downloader (`weather-dl`). It makes it easier to ingest data from the European Centre for Medium-Range Weather Forecasts (ECMWF). `weather-dl` lets users describe precisely what data they would like to ingest from ECMWF's catalogs, and it offers control over how to parallelize requests, empowering users to retrieve data efficiently. Downloads are driven from a configuration file, which can be reviewed (and version-controlled) independently of pipeline or analysis code.
We also provide two additional tools to aid climate and weather researchers: the weather mover (`weather-mv`) and the weather splitter (`weather-sp`). These CLIs are still in their alpha stages of development, yet they have already been used in production workflows by several partner teams.
We created the weather mover (`weather-mv`) to load geospatial data from cloud buckets into Google BigQuery. This enables rapid exploratory analysis and visualization of weather data: from BigQuery, scientists can load arbitrary climate data fields into a Pandas or XArray dataframe via a simple SQL query.
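For instance, a minimal sketch of that last step with the google-cloud-bigquery client library might look like the following; the project, dataset, table, and column names are hypothetical placeholders.

```python
from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical table and columns: substitute your own project, dataset, and fields.
query = """
    SELECT time, latitude, longitude, temperature
    FROM `my-project.weather.era5_sample`
    WHERE time BETWEEN '2020-01-01' AND '2020-01-02'
"""

# Run the query and materialize the result as a Pandas dataframe.
df = client.query(query).to_dataframe()
print(df.head())
```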
The weather splitter (`weather-sp`) helps normalize how archival weather data is stored in cloud buckets: whether you're trying to merge two datasets with overlapping variables, or you simply need to open GRIB data from XArray, it's useful to split datasets into their component variables.
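To illustrate the underlying idea (a sketch only, not `weather-sp`'s implementation), splitting a dataset into per-variable files with XArray might look like this; the input path is a placeholder.

```python
import xarray as xr

# Hypothetical input file; in practice this would live in a cloud bucket.
ds = xr.open_dataset("era5_sample.nc")

# Write each data variable to its own file so downstream tools can read them independently.
for name in ds.data_vars:
    ds[[name]].to_netcdf(f"era5_sample.{name}.nc")
```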
It is currently recommended that you create a local Python environment (with Anaconda) and install the sources as follows:
conda env create --name weather-tools --file=environment.yml
conda activate weather-tools
Note: Due to its use of third-party binary dependencies such as GDAL and MetView, `weather-tools` is transitioning from PyPI to Conda for its main release channel. The instructions above are a temporary workaround before our Conda-forge release.
From here, you can use the `weather-*` tools from your Python environment. Currently, the following tools are available:

- ⛈ `weather-dl` (beta) – Download weather data (namely, from ECMWF's API).
- ⛅️ `weather-mv` (alpha) – Load weather data into analytics engines, like BigQuery.
- 🌪 `weather-sp` (alpha) – Split weather data by arbitrary dimensions.
In this tutorial, we will download the ERA5 pressure level dataset and ingest it into Google BigQuery using `weather-dl` and `weather-mv`, respectively.
- Register for a license from ECMWF's Copernicus (CDS) API.
- Install your license by copying your API url & key from this page to a new file `$HOME/.cdsapirc`.¹ The file should look like this:

  url: https://cds.climate.copernicus.eu/api/v2
  key: <YOUR_USER_ID>:<YOUR_API_KEY>
- If you do not already have a Google Cloud project, create one by following these steps. If you are working on an existing project, make sure your user has the BigQuery Admin role. To learn more about granting IAM roles to users in Google Cloud, visit the official docs.
- Create an empty BigQuery dataset. This can be done using the Google Cloud Console or via the `bq` CLI tool. For example:

  bq mk --project_id=$PROJECT_ID $DATASET_ID
- Follow these steps to create a bucket for staging temporary files in Google Cloud Storage.
For the purpose of this tutorial, we will use your local machine to run the data pipelines. Note that all `weather-tools` can also be run on Cloud Dataflow, which is easier to scale and fully managed.
- Use `weather-dl` to download the ERA5 pressure level dataset:

  weather-dl configs/era5_example_config_local_run.cfg \
      --local-run  # Use the local machine
  Recommendation: Pass the `-d, --dry-run` flag to any of these commands to preview the effects.

  NOTE: By default, local downloads are saved to the `./local_run` directory unless another file system is specified. The recommended output location for `weather-dl` is Cloud Storage. The source and destination of the download are configured using the `.cfg` configuration file which is passed to the command. To learn more about this configuration file's format and features, see this reference. To learn more about the `weather-dl` command, visit here.

- (optional) Split your downloaded dataset up with `weather-sp`:

  weather-sp --input-pattern "./local_run/era5-*.nc" \
      --output-dir "split_data"
  Visit the `weather-sp` docs for more information.

- Use `weather-mv` to ingest the downloaded data into BigQuery, in a structured format:

  weather-mv bigquery --uris "./local_run/**.nc" \  # or "./split_data/**.nc" if weather-sp is used
      --output_table "$PROJECT.$DATASET_ID.$TABLE_ID" \  # The path to the destination BigQuery table
      --temp_location "gs://$BUCKET/tmp" \  # Needed to stage temporary files before writing to BigQuery
      --direct_num_workers 2
  See these docs for more about the `weather-mv` command.

That's it! After the pipeline completes, you should be able to query the ingested dataset in the BigQuery SQL workspace and analyze it using BigQuery ML.
The weather tools are under active development, and contributions are welcome! Please check out our guide to get started.
This is not an official Google product.
Copyright 2021 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Footnotes
1. Note that you need to be logged in for the CDS API page to actually show your user ID and API key. Otherwise, it will display a placeholder, which is confusing to some users. ↩
weather-tools Issues
`weather-sp`: Support arbitrary hypercube splits
Our initial use case of `weather-sp` was to split monolithic files by variable. Now, we've encountered a case where we'd like to split atmospheric data by model level.

To better generalize to all future uses of the splitter, let's re-design the tool to support arbitrary divisions of the data hypercube. This, ultimately, would help us improve usage of the `weather-dl` tool: if we didn't have to worry about the final file structure (because we could make it uniform), we could better optimize downloads. Specifically, we could request as much data as possible under the memory limit, which is ~75 GB for MARS as of writing.

Unit tests are running slow / CI takes a long time.
I observe that our CI takes a long time to complete. It is early in the lifetime of this project for our unit tests to be slow (certainly, the codebase is small enough). Having fast tests and an expedient CI system increases the pace of development & bugfixes.

This bug involves investigating why our unit tests take so long to run. This may be related to #53.
`weather-dl`: Implement BigQuery manifest
Similar to #13, except the manifest should be a BigQuery table. This makes sense if users choose to use `weather-mv` and want their data queryable from one place.

Publish Sphinx docs to webpage
Users should be able to read documentation without looking at the source repository. We can either use Github pages or readthedocs. I have a preference for the latter.
Correct Link to Dataflow docs
In each tool, the documentation refers to Dataflow options and then links to docs on how to set programmatic pipeline options. Instead, it should link to these docs:
https://cloud.google.com/dataflow/docs/reference/pipeline-options
The weather splitter only matches `**` blobs, not `*` blobs
Splitter template syntax is not expressive enough for backwards compatibility.
I used the weather splitter on a file system before it had templates, and now in its current version. The original file system I have looks like:
.../2020/202001_hres_pcp.grb2_surface_d2m.grib
Using the new templating system in the weather splitter, I cannot format a template string to match this file path. This is because I don't have control over the position of the `level` and `short_name` variables. It seems like the templating system is oriented towards setting variable file paths instead of templating the variable and level placement in the string. The latter templating option is more needed than the former.

Exclude test data from tarball
Looking at the upload-to-pypi logs, it looks like testdata is included in the tarball (I see .grib and .nc files in the logs). This bug involves updating the MANIFEST.in to exclude testdata in the tars. That way, they will be lightweight / quick-to-install.
`weather-mv`: Errors inserting into BigQuery during streaming uploads.
I just noticed in an e2e run, the pipeline doesn't fail but issues warnings about failure to insert data into BigQuery:
There were errors inserting to BigQuery. Will retry. Errors were [{'index': 0, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 1, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 2, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 3, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 4, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 5, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 6, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 7, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 8, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 9, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 10, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]}, {'index': 11, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]}, {'index': 12, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 13, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 14, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 15, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]}, {'index': 16, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 17, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 18, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 19, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]}, {'index': 20, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 21, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 22, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 23, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 24, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}]
I think the issue is this logic in `extract_rows`: these lines don't match anything in the schema.
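One possible mitigation, sketched here as an assumption rather than the project's actual fix, is to drop any extracted keys that the destination table's schema doesn't contain before insertion:

```python
def filter_to_schema(row: dict, schema_fields: set) -> dict:
    """Drop keys that are not present in the destination table's schema.

    `schema_fields` is assumed to be the set of column names from the
    BigQuery table schema; `row` is one extracted record.
    """
    return {key: value for key, value in row.items() if key in schema_fields}


# Hypothetical usage: coordinates like 'step', 'surface', and 'valid_time'
# reported in the errors above would be silently dropped.
schema_fields = {'time', 'latitude', 'longitude', 'temperature'}
row = {'time': '2020-01-01', 'latitude': 1.0, 'longitude': 2.0,
       'temperature': 280.4, 'step': 0, 'surface': 0.0}
print(filter_to_schema(row, schema_fields))
```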
Parse CDS Limits page to set data-specific worker limits.
line 1099, in read return self._sslobj.read(len, buffer) socket.timeout: The read operation timed out
After running weather-dl for a long time (~25 days), I hit this error:
line 1099, in read return self._sslobj.read(len, buffer) socket.timeout: The read operation timed out
My suspicion is that reading or writing large files to GCS is timing out. While we have retries for this, we filter out certain errors from retrying, including this one. A fix for this might look like revisiting that retry logic and making sure to include these types of timeout errors.
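As an illustration only (not the downloader's actual retry code), a retry helper that treats socket timeouts as retryable might look like:

```python
import socket
import time


def with_retries(fn, max_attempts=5, base_delay=1.0):
    """Call `fn`, retrying on transient timeout errors.

    Hypothetical helper: the real pipeline's retry logic and its set of
    retryable exceptions live elsewhere in weather-dl.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except (socket.timeout, TimeoutError):
            if attempt == max_attempts:
                raise
            # Exponential backoff before the next attempt.
            time.sleep(base_delay * 2 ** (attempt - 1))
```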
TypeError: Type is not JSON serializable: numpy.float64
Ran the below command for one grib bz2 file for batch processing:
weather-mv --uris "gs://xxxxxx/A4D0113xxxxxxx.bz2" --output_table xxxx-xxx.xxxx.ecmwf_realtime_test_a4 --region us-central1 --runner DataflowRunner --setup_file ./setup.py --project xxxx-xxx --temp_location gs://xxxxxx-test/tmp
The table was not available in BigQuery, so the pipeline created it on the fly.
Below is the error traceback:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1256, in process
return self._flush_batch(destination)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1312, in _flush_batch
skip_invalid_rows=True)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1136, in insert_rows
for r in rows
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1136, in
for r in rows
TypeError: Type is not JSON serializable: numpy.float64 [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
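This error typically means NumPy scalar types (such as numpy.float64) are being handed to the BigQuery client, which only accepts JSON-serializable Python natives. As a hedged illustration of the usual workaround (not necessarily how weather-mv resolved it), values can be converted before insertion:

```python
import numpy as np


def to_json_serializable(value):
    """Convert NumPy scalars and arrays into plain Python types for JSON/BigQuery."""
    if isinstance(value, np.generic):   # e.g. np.float64, np.int64
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    return value


row = {'temperature': np.float64(280.4), 'latitude': np.float32(52.1)}
clean_row = {k: to_json_serializable(v) for k, v in row.items()}
```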
IO backends error

Ran the below command of weather-mv to import a batch of grib bz2 files into BQ.
weather-mv --uris "gs://xxxxx/A4D0113*.bz2" --output_table xxxx.xxxxx.ecmwf_realtime_test_a4 --runner DataflowRunner --project xxxxxx --temp_location gs://xxxxx/tmp --job_name ecmwf_xxxxx_test_a4
Traceback (most recent call last): File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 870, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 238, in extract_rows with open_dataset(uri) as ds: File "/usr/local/lib/python3.7/contextlib.py", line 112, in enter return next(self.gen) File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 94, in open_dataset xr_dataset: xr.Dataset = __open_dataset_file(dest_file.name) File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 62, in __open_dataset_file return xr.open_dataset(filename) File "/usr/local/lib/python3.7/site-packages/xarray/backends/api.py", line 479, in open_dataset engine = plugins.guess_engine(filename_or_obj) File "/usr/local/lib/python3.7/site-packages/xarray/backends/plugins.py", line 155, in guess_engine raise ValueError(error_msg) ValueError: did not find a match in any of xarray's currently installed IO backends ['scipy']. Consider explicitly selecting one of the installed engines via the
engine
parameter, or installing additional IO dependencies, see: http://xarray.pydata.org/en/stable/getting-started-guide/installing.html http://xarray.pydata.org/en/stable/user-guide/io.html During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 870, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 238, in extract_rows with open_dataset(uri) as ds: File "/usr/local/lib/python3.7/contextlib.py", line 112, in enter return next(self.gen) File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 94, in open_dataset xr_dataset: xr.Dataset = __open_dataset_file(dest_file.name) File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 62, in __open_dataset_file return xr.open_dataset(filename) File "/usr/local/lib/python3.7/site-packages/xarray/backends/api.py", line 479, in open_dataset engine = plugins.guess_engine(filename_or_obj) File "/usr/local/lib/python3.7/site-packages/xarray/backends/plugins.py", line 155, in guess_engine raise ValueError(error_msg) ValueError: did not find a match in any of xarray's currently installed IO backends ['scipy']. Consider explicitly selecting one of the installed engines via theengine
parameter, or installing additional IO dependencies, see: http://xarray.pydata.org/en/stable/getting-started-guide/installing.html http://xarray.pydata.org/en/stable/user-guide/io.html [while running 'ExtractRows']
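The error message itself points at the likely fix: the worker environment lacks a GRIB-capable xarray backend. As a hedged sketch (assuming the cfgrib backend and its ecCodes dependency are installed in the pipeline environment, and using a hypothetical local file path), explicitly selecting an engine looks like:

```python
import xarray as xr

# Hypothetical path to an already-decompressed GRIB file; assumes the `cfgrib`
# backend (and the ecCodes library it wraps) is installed on the workers.
ds = xr.open_dataset("example_forecast.grib", engine="cfgrib")
print(ds.data_vars)
```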
Skipping variable error

Trying to spin up a streaming dataflow job with the below command:
weather-mv --uris "gs://ecmwf-xxxxx/A1D*.bz2" \ --output_table xxxx-xx.xxxx.xxxx \ --topic "projects/xxxx/topics/ecmwf-xxxxx" \ --runner DataflowRunner \ --project xxx-xxxx \ --region us-central1 \ --temp_location gs://xxxxx-test \ --job_name ecmwf-xxxxxx-test2
Below is the error. The dataflow job hasn't been created.
Add license conditions and dataset attribution to the documentation.
A request from Emma Pidduck from ECMWF:
It would be great if you could add the licence conditions for each of datasets and also the attribution to the datasets, if possible.
CDS/ERA5 has a different licence to the Archive Catalogue: https://cds.climate.copernicus.eu/api/v2/terms/static/licence-to-use-copernicus-products.pdf
Archive catalogue licence & Terms of Use: https://apps.ecmwf.int/datasets/licences/general/

Create example JSON config(s)
Similar to the `*.cfg` configuration file examples found in `/configs`, create example JSON config(s).

`weather-mv`: Create generic way to pass arguments to xarray.open_dataset
`weather-dl`: Implement PostgreSQL Manifest
Extend our manifest system to store download statuses in a relational table, namely in PostgreSQL dialect.
Optional: Document using the Postgres interface on top of Cloud Spanner: https://cloud.google.com/blog/topics/developers-practitioners/postgresql-interface-adds-familiarity-and-portability-cloud-spanner
Rename config concepts to better map into weather data ecosystem.
A lot of the names, like `partition_keys`, were selected without knowledge of the weather data ecosystem (e.g. the terms used in XArray). In this issue, we need to come up with proper names in preparation for the 1.0 release.

`weather-mv`: Automatic database migrations
As a user, when I run a newer version of `weather-mv` on a dataset and the schema of the BigQuery table has changed (i.e. changed with respect to the dataset), I would like `weather-mv` to automatically migrate the table. I need this so that my data pipelines don't break when new versions of `weather-mv` are released, and so I can get the latest tool features.

Last, as a user, I would like some control over whether migrations are applied, and what migrations to make. Ideally, the CLI should let me opt out of migrations if I know best. Otherwise, I want the tools to default to the recommended case.

Generate weather-mv testdata
Generate weather-mv testdata
In the future, we should write a script to generate these testdata files. Checking them in slows down how fast we can clone the repo.
Originally posted by @alxmrs in #40 (comment)
Publish weather-tools on PyPI
This will help with distribution of the tool. Python users would only have to run `pip install weather-tools` to get the source and updates.

Update config parser's handling of MARS range syntax to allow users to express reverse orders.
`weather-dl`: Benchmark performance.
Internal tracker: b/190200288
Document common workflows of weather-tools
After #7 is complete, we should prepare a "cookbook" of common dev-ops workflows that use weather-tools. These should include:
- A walkthrough of using each tool in context with each other on GCP.
- How to use weather-dl to download priority data first.
- How to schedule dataflow jobs in the future (if possible).
- How to set up Alerts and monitoring Dashboards in GCP.
- (reach) using a Cloud Monitoring dashboard to view results from the Download Manifest.
Promote secure licenses handling by adding Secrets Manager support.
Internal tracker: b/173814654
Improve speed of extracting rows
With a global forecast dataset, I need to extract data with ~3,244.6k coordinate values (time x lat x lng). As of today, it takes about ~9 seconds to extract 1k rows. So: 9*3243.6/60/60 = ~8 hours per file.

While in streaming pipelines data is written as soon as it's available, we'd ideally like all of the data to be processed within ~1 hour, so the first forecast is actionable.
Thoroughly unit test JSON serialization
Check out the TODO in `unit_test.py`.

`weather-dl`: Out-of-order license distribution error
In the weather downloader, we underutilize licenses due to the current way they are distributed. Worse, if we use near the max number of queued workers (like we do by default for the MARS downloads), pipelines will fail with a "MaxAPIRequests" error from ECMWF.
My leading hypothesis about what's causing this is that our license distribution scheme depends on a particular ordering of future requests. However, we've made recent changes to better work with the Dataflow autoscaling algorithm to parallelize download requests, which allocates the `fetch_data` step in a random order.

How to reproduce
- Run `weather-dl` with multiple licenses (CDS is fine).
- Look up the request queue for the client (the docstrings for the client have an appropriate URL) for each license / account.

Outcome: There will be an uneven distribution of download requests. Some accounts may have duplicates while others don't have any work dispatched. This leads to underutilization of licenses and, in some cases, brittle pipelines.
Expected: Each worker should at least have their request limit maxed out (licenses fully utilized). Requests, in general, should be evenly distributed. Pipelines should never hit the "Too many requests" error.

Workarounds
The strategy to get robust yet high utilization of downloads is to split data requests into multiple configs and run `weather-dl` twice (or multiple times). It especially helps to leverage the `-n` flag to limit the number of data requests to the access limit (for CDS, the defaults are currently fine; for MARS, set `-n 2`).

Potential fix
A good strategy would be to restructure the pipeline such that:

- License creation is separate from config partitioning (i.e. different `beam.Create()` steps).
- The two streams (licenses and partitions) get joined together (like a `zip` in Python) after a reshuffle, so licenses are evenly distributed (see the sketch below).
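To make the pairing idea concrete, here is a plain-Python sketch (not the Beam pipeline itself) of distributing a small pool of licenses evenly over download partitions; the partition and license names are placeholders:

```python
import itertools

# Hypothetical inputs: partitions come from the parsed config, licenses from the user.
partitions = [f'partition-{i}' for i in range(10)]
licenses = ['license-A', 'license-B', 'license-C']

# Round-robin assignment: each partition gets the next license in the cycle,
# so no license sits idle while another accumulates duplicate requests.
assignments = list(zip(partitions, itertools.cycle(licenses)))
for partition, license_name in assignments:
    print(partition, '->', license_name)
```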
Problems downloading MARS data
I registered on ECMWF site (https://apps.ecmwf.int/registration/) and got an API key from https://api.ecmwf.int/v1/key/ and created ~/.ecmwfapirc as shown on the page.
When I run:
weather-dl mars_example_config.cfg --local-run
I get:
File "/opt/conda/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1635, in <lambda> wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] File "/opt/conda/lib/python3.7/site-packages/weather_dl/download_pipeline/pipeline.py", line 241, in fetch_data client.retrieve(dataset, selection, temp.name) File "/opt/conda/lib/python3.7/site-packages/weather_dl/download_pipeline/clients.py", line 172, in retrieve self.c.execute(req=selection, target=output) File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 645, in execute quiet=self.quiet, File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 418, in __init__ info = self.connection.call("%s/%s/%s" % (self.url, self.service, "info")).get( File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 148, in wrapped return func(self, *args, **kwargs) File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 350, in call raise APIException("ecmwf.API error 1: %s" % (self.last["error"],)) ecmwfapi.api.APIException: "ecmwf.API error 1: User 'lak@...' has no access to services/mars [while running 'FetchData']"
Write guide to help users write efficient queries (downloads).
Extend CI to test example configs are valid
Hey @CillianFn and @pranay101, this issue was inspired by #52 and #47. If either of you would like to pick this up, that would be a great help!
To summarize: we should start treating our configs like code and include them in our automated checks. For this issue to be fixed, we should add to our CI build a test that all the example configs parse correctly (i.e. check for correct syntax). For now, I think it's sufficient to run each config with the `-d, --dry-run` flag. Later, we can check that the semantics of the configs are correct.

Config values need to be typed on parsing.
The current config-value parser treats all values as strings. Instead, we need to try to infer types in a best-effort way. This is a pre-requisite step in order to fix #4.
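A best-effort inference pass could look like the following sketch; this illustrates the idea rather than the project's actual parser:

```python
def infer_type(value: str):
    """Best-effort conversion of a raw config string into a typed value."""
    lowered = value.strip().lower()
    if lowered in ('true', 'false'):
        return lowered == 'true'
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value  # Fall back to the original string.


assert infer_type('42') == 42
assert infer_type('0.25') == 0.25
assert infer_type('true') is True
assert infer_type('2020-01-01') == '2020-01-01'
```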
`weather-mv`: Support streaming conversions to BigQuery.
Use Case
As a user, I want to be able to load real-time forecasts from cloud storage to BigQuery. I want to do this in the simplest, most efficient computational means. For example, ECMWF disseminates data 4x per day into cloud storage buckets. Once an upload event occurs, I want data to automatically start ingesting into BigQuery.
Proposal
Support a new argument `--topic` that lets users specify a GCP PubSub topic of `CREATE` or `FINALIZE` events for a GCS bucket. Either this argument or `--uris` will be required. Usage of `--topic` will deploy a streaming Dataflow job that subscribes to PubSub events, windows them into 1-minute buckets, and performs the `extract_rows` operation on incoming data.

Note: for other clouds, users can bridge other notification systems to GCP's PubSub in the short term. Apache Kafka support can come later.
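For illustration only, a minimal Beam sketch of the proposed streaming shape might look like the following; the topic name, window size, and the stand-in extract_rows function are assumptions rather than weather-mv's actual code:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window


def extract_rows(message: bytes):
    """Placeholder for the real extract_rows step; parses one PubSub message."""
    yield {'raw': message.decode('utf-8')}


options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | 'ReadEvents' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-bucket-events')
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))  # 1-minute windows
        | 'ExtractRows' >> beam.FlatMap(extract_rows)
        | 'Log' >> beam.Map(print)  # The real tool would write rows to BigQuery here.
    )
```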
Limit grib in-memory copy
Add a command line flag to control whether in-memory copy should be enabled or not.
If disabled, grib files should set `cache=True` during `open_dataset()` to speed up reads.
JSON files do not support MARS range syntax.
Splitter dry runs should preview the output file, not the output scheme
Printing the scheme or template is less informative than printing the actual output paths that would be produced from the given input.
`weather-mv`: Add columns to Schema for faster geolocation queries
A suggestion from @lakshmanok:
It would be good to if weather-mv were to add a
ST_GeogPoint(longitude, latitude)
column instead of (or in addition to) the longitude, latitude columns. This precomputes the S2 location and makes querying faster.Even better if the column was a polygon with the extent of the pixel. That way
ST_INTERSECTS
etc. will also workticket updated: 2022-03-24
Acceptance Criteria
- Users of the resulting BigQuery table produced by
weather-mv
can make use of the Geography capabilities of BigQuery.- Every row of the BigQuery table (and thus, all the available variables of that row) has a column of type
GEOGRAPHY
, which uses "point geography" from the raw data's lat/lng data.
- Every row of the BigQuery table (and thus, all the available variables of that row) has a column of type
- Users can still acquire the underlying float values for latitude and longitude.
Implementation Notes
- One approach for implementing this feature would be to use BQ's SQL functions to add perform the conversion to
GEOGRAPHY
type. Namely,ST_GEOPOINT
is a function that can take lat/lng FLOAT64 columns and create aGEOGRAPHY
Point value from it. - Another simple approach would be to handle the Geography conversion from lat/lng in Python (instead of SQL). The BQ Geospatial docs list at least two pathways to do this: Either by writing WKT / WKB data or by writing in the GeoJSON spec.
- Please note that the GEOGRAPHY values must fall within certain lat/lng ranges. This means that data outside of these ranges will need to be converted to those ranges (e.g. something like this – see the example section).
- This will likely require a 2 part code change.
- First, the BQ schema will need to be updated.
- Second, the processing of the data into Dicts / JSON rows will need to change.
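As a hedged sketch of the Python-side approach (writing WKT), a row could carry a point built from the raw coordinates, with longitudes normalized into the [-180, 180] range BigQuery expects; the field names are illustrative:

```python
def to_wkt_point(latitude: float, longitude: float) -> str:
    """Build a WKT POINT string suitable for a BigQuery GEOGRAPHY column.

    Longitudes in (180, 360] are shifted into [-180, 180]; note that WKT
    order is (longitude latitude).
    """
    if longitude > 180:
        longitude -= 360
    return f'POINT({longitude} {latitude})'


row = {'latitude': 52.1, 'longitude': 355.75, 'temperature': 280.4}
row['geo_point'] = to_wkt_point(row['latitude'], row['longitude'])
# row['geo_point'] == 'POINT(-4.25 52.1)'
```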
Extended feature to delight users
Even better if the column was a polygon with the extent of the pixel. That way
ST_INTERSECTS
etc. will also work.

Implementing extra columns to include the polygon of the (grid) area where the values are relevant is a great bonus feature. However, it will not be possible for all types of input data. This experience is only possible if the XArray DataFrame includes extra information, like a coordinate or variable attribute.
For now, while we implement the first part of this ticket, let's also investigate if such metadata exists on our happy-path data sources. If it does, we'll create a follow up ticket to implement this.
Acceptance Criteria
- Investigate the underlying real-time data source in XArray to determine if variables along coordinates are associated with a specific area.
- Demonstrate results in an interactive report (e.g. a Colab notebook).
- If this is possible with the data, create a design doc to further extend the BQ ingestion to include this feature, taking into account edge cases (the metadata is not available, users may want to override these columns, complexity in types of grid, etc.).
Add support for dry-runs to `weather-mv`.
A helpful step towards fixing #21.
ticket updated: 2022-03-24
As a weather tools user, I would like to be able to preview the effect of each tool before incurring the cost of data movement and infrastructure. These light-weight previews will help me test pipelines before deployment, lowering the number of iterations needed to set up a data pipeline. For this issue, I want to be able to perform dry runs with the weather mover.
Acceptance Criteria
- Provide a common interface for exercising dry runs for every Data Sink
- When a user passes the `-d` or `--dry-run` flag to the `weather-mv` CLI, this feature will be activated (see the sketch after this list).
- When a user checks tool documentation (the README or CLI help message), they will have a good understanding of what the feature does.
- As a user, I will still have some way to monitor the execution flow of the tool during a dry run
- Log messages from non-dry runs will remain the same as those within a dry-run
- As a user, I can inspect the kinds of messages that would have been written to BigQuery.
- (optional) Maybe more log messages are needed to see what's happening?
- As a user, I can execute dry runs locally or remotely on Dataflow
- Where appropriate, data is simulated in memory. No data is written to disk or cloud storage during a dry run.
- As a user, I still would like to validate execution on actual user-supplied URIs.
- Where there are contradictions in requirements, the ergonomic option for `weather-mv` users is preferred.
- All code should be completely covered by tests.
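A minimal sketch of how such a flag could be wired up (names and structure are assumptions for illustration, not weather-mv's actual code):

```python
import argparse
import logging


def run(argv=None):
    parser = argparse.ArgumentParser(description='Move weather data into a sink.')
    parser.add_argument('--uris', required=True)
    parser.add_argument('-d', '--dry-run', action='store_true',
                        help='Preview the rows that would be written, without writing them.')
    args = parser.parse_args(argv)

    rows = extract_rows(args.uris)  # hypothetical extraction step
    if args.dry_run:
        # Dry run: log what would have been written to BigQuery.
        for row in rows:
            logging.info('dry-run: would write %s', row)
    else:
        write_to_bigquery(rows)  # hypothetical sink


def extract_rows(uris):
    """Stand-in for the real extraction logic."""
    return [{'uri': uris, 'value': 1.0}]


def write_to_bigquery(rows):
    """Stand-in for the real BigQuery sink."""
    raise NotImplementedError
```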
Implementation Notes
- The best place to provide a common interface for dry runs is
- Most code changes will happen in this class
`weather-dl`: Document the -n argument
In an oversight, a recent feature of `weather-dl` has not been documented. Let's make it known to readers of the tool's README that the flag exists and describe what it does. Further, let's add a usage example.

`weather-mv`: Add flag to control if grib data is opened in memory.
Can you add a flag so the user can control if the dataset should be opened in memory? I can see this tool being used on O(100GB) files, where users would like to turn this off.
Originally posted by @alxmrs in #74 (comment)
Unable to run command line tools on Windows
This might be just an issue for me on Windows but I am unable to run the command line tools after installing them via
pip install git+http://github.com/google/weather-tools.git#egg=weather-tools
E.g.
C:\Users\Cillian>weather-dl configs/era5_example_config_local_run.cfg --local-run 'weather-dl' is not recognized as an internal or external command, operable program or batch file.
Edition: Windows 10 Home
Version: 21H1
OS build: 19043.1415
64-bit Operating System
Python version: 3.8.5
Error in handling grib file
I have tried with A1Dxxxxxxxxx.bz2 file, extract_rows completed without error but writing to BQ shows the below error:
Traceback (most recent call last): File "/opt/conda/bin/weather-mv", line 74, in <module> cli(['--extra_package', pkg_archive]) File "/opt/conda/lib/python3.7/site-packages/weather_mv/loader_pipeline/__init__.py", line 23, in cli run(sys.argv + extra) File "/opt/conda/lib/python3.7/site-packages/weather_mv/loader_pipeline/pipeline.py", line 363, in run custom_gcs_temp_location=known_args.temp_location) File "/opt/conda/lib/python3.7/site-packages/apache_beam/pipeline.py", line 586, in __exit__ self.result.wait_until_finish() File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1635, in wait_until_finish self) apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0) File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 578, in wait_for_bq_job job_reference.jobId, job.status.errorResult)) RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_43_6c0ec7fd665d0e72361b31b2c0dff157_57b5577d56954d57ab20e7d97c1fdca6 failed. Error Result: <ErrorProto location: 'gs://ecmwf-xxxxx/bq_load/10a305afebc345c7bff3632233370d22/xxxxxxx-xxx.xxxxx.ecmwf_realtime_test_bz2/4ed537d9-66f5-4e64-8927-95a0bfce62f3' message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.' 
reason: 'invalid'> During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0) File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 578, in wait_for_bq_job job_reference.jobId, job.status.errorResult)) RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_43_6c0ec7fd665d0e72361b31b2c0dff157_57b5577d56954d57ab20e7d97c1fdca6 failed. Error Result: <ErrorProto location: 'gs://ecmwf-xxxx/bq_load/10a305afebc345c7bff3632233370d22/xxxxx-xxxx.xxxx.ecmwf_realtime_test_bz2/4ed537d9-66f5-4e64-8927-95a0bfce62f3' message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.' reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/WaitForDestinationLoadJobs']
Absence of partition_keys in config file should not cause an exception
As discussed, it should be possible to not specify a partition key and download all data as one chunk.
Currently, the absence of `partition_keys` in the config file causes a `KeyError`.
Fix Windows setup
Issue:
When running the following setup command on Windows 10 (Python 3.8.5),
pip install -e ."[dev]"
Getting the following error
ERROR: Command errored out with exit status 1: command: 'C:\Users\Cillian\Anaconda3\envs\py385\python.exe' -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'D:\\Projects\\weather-tools\\weather-tools\\setup.py'"'"'; __file__='"'"'D:\\Projects\\weather-tools\\weather-tools\\setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base 'C:\Users\Cillian\AppData\Local\Temp\pip-pip-egg-info-ewy709ue' cwd: D:\Projects\weather-tools\weather-tools\ Complete output (7 lines): Traceback (most recent call last): File "<string>", line 1, in <module> File "D:\Projects\weather-tools\weather-tools\setup.py", line 66, in <module> long_description=open('README.md', 'r').read(), File "C:\Users\Cillian\Anaconda3\envs\py385\lib\encodings\cp1252.py", line 23, in decode return codecs.charmap_decode(input,self.errors,decoding_table)[0] UnicodeDecodeError: 'charmap' codec can't decode byte 0x8f in position 2775: character maps to <undefined> ----------------------------------------
Explanation of issue: https://dev.to/methane/python-use-utf-8-mode-on-windows-212i
Expected:
Run command with no issue

Fix:
Update the following line https://github.com/google/weather-tools/blob/main/setup.py#L66 to include encoding
long_description=open('README.md', 'r', encoding='utf-8').read(),
@alxmrs I have a branch locally with this fix, happy to put up a PR but I seem to not have write permissions for this repo. Would appreciate getting those as I'd like to make more (interesting 😉 ) contributions in the future!
weather-dl: Update CLI to accept multiple configs so we can queue downloads.
Right now, `weather-dl` takes in one config file at a time and starts a download for it. Instead, the CLI could be updated to take multiple config files, such that once one config is finished, it will start the next one.

Make CLI arguments more uniform.
All CLIs should follow the same conventions, including:
- snake case or kebab case flags
- Common arguments (log verbosity, dry-runs, local runs, etc.)
- interactions with Dataflow
Schedule open weather-tools developer meetings.
To foster better community support of these tools, let's schedule a weekly 30-minute meeting for `weather-tools` developers. If you are interested in contributing to this codebase, please let us know what time(s) you are available to meet here: http://whenisgood.net/weather-tools-meeting

Once a date is found, we'll update the contributing docs with the time and steps to attend the weekly meetings.
`target_path` templates should be totally compatible with Python format strings
`target_path` (and `target_filename`) should be totally compliant with Python's standard string formatting. This includes being able to use named arguments (e.g. `'gs://bucket/{year}/{month}/{day}'`) as well as specifying formats for strings (e.g. `'gs://bucket/{year:04d}/{month:02d}/{day:02d}'`).
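For reference, the desired behaviour matches plain Python str.format semantics:

```python
template = 'gs://bucket/{year:04d}/{month:02d}/{day:02d}'
path = template.format(year=2020, month=1, day=5)
assert path == 'gs://bucket/2020/01/05'
```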
Typo: missing "for" in Contributing Documentation
The testing docs have a missing word: `pytype checking types` should read `pytype for checking types`.