google / weather-tools
Tools to make weather data accessible and useful.
Home Page: https://weather-tools.readthedocs.io/
License: Apache License 2.0
Similar to #13, except the manifest should be a BigQuery table. This makes sense if users choose to use weather-mv
and want their data queryable from one place.
Extend our manifest system to store download statuses in a relational table, namely in PostgreSQL dialect.
Optional: Document using the Postgres interface on top of Cloud Spanner: https://cloud.google.com/blog/topics/developers-practitioners/postgresql-interface-adds-familiarity-and-portability-cloud-spanner
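A hedged sketch of what a BigQuery-backed manifest table for download statuses could look like, using the google-cloud-bigquery client (the table and column names are illustrative, not the actual manifest schema):

from google.cloud import bigquery

client = bigquery.Client()

# One row per download, keyed by the target URI (illustrative schema).
schema = [
    bigquery.SchemaField('uri', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('status', 'STRING'),        # e.g. 'scheduled', 'in-progress', 'done'
    bigquery.SchemaField('selection', 'STRING'),     # JSON-encoded request parameters
    bigquery.SchemaField('user', 'STRING'),
    bigquery.SchemaField('download_finished_time', 'TIMESTAMP'),
]
table = bigquery.Table('my-project.weather.download_manifest', schema=schema)
client.create_table(table, exists_ok=True)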
I tried with an A1Dxxxxxxxxx.bz2 file; extract_rows completed without error, but writing to BQ shows the error below:
Traceback (most recent call last):
File "/opt/conda/bin/weather-mv", line 74, in <module>
cli(['--extra_package', pkg_archive])
File "/opt/conda/lib/python3.7/site-packages/weather_mv/loader_pipeline/__init__.py", line 23, in cli
run(sys.argv + extra)
File "/opt/conda/lib/python3.7/site-packages/weather_mv/loader_pipeline/pipeline.py", line 363, in run
custom_gcs_temp_location=known_args.temp_location)
File "/opt/conda/lib/python3.7/site-packages/apache_beam/pipeline.py", line 586, in __exit__
self.result.wait_until_finish()
File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1635, in wait_until_finish
self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 578, in wait_for_bq_job
job_reference.jobId, job.status.errorResult))
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_43_6c0ec7fd665d0e72361b31b2c0dff157_57b5577d56954d57ab20e7d97c1fdca6 failed. Error Result: <ErrorProto
location: 'gs://ecmwf-xxxxx/bq_load/10a305afebc345c7bff3632233370d22/xxxxxxx-xxx.xxxxx.ecmwf_realtime_test_bz2/4ed537d9-66f5-4e64-8927-95a0bfce62f3'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'
reason: 'invalid'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 877, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 730, in process
self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 578, in wait_for_bq_job
job_reference.jobId, job.status.errorResult))
RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_43_6c0ec7fd665d0e72361b31b2c0dff157_57b5577d56954d57ab20e7d97c1fdca6 failed. Error Result: <ErrorProto
location: 'gs://ecmwf-xxxx/bq_load/10a305afebc345c7bff3632233370d22/xxxxx-xxxx.xxxx.ecmwf_realtime_test_bz2/4ed537d9-66f5-4e64-8927-95a0bfce62f3'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details.'
reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/WaitForDestinationLoadJobs']
Internal tracker: b/173814654
Our initial use case of weather-sp was to split monolithic files by variable. Now, we've encountered a case where we'd like to split atmospheric data by model level.
To better generalize to all future uses of the splitter, let's re-design the tool to support arbitrary divisions of the data hypercube. This, ultimately, would help us improve usage of the weather-dl tool: if we didn't have to worry about the final file structure (because we could make it uniform), we could better optimize downloads. Specifically, we could fetch as much data as possible under the memory limit, which is ~75 GB for MARS as of writing.
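A minimal illustrative sketch of the kind of arbitrary splitting described above, using xarray (the file names, the 'level' coordinate, and the cfgrib engine are assumptions, not weather-sp's actual implementation):

import xarray as xr

ds = xr.open_dataset('input.grib', engine='cfgrib')  # assumes cfgrib is installed

# Split by variable (the original use case)...
for name in ds.data_vars:
    ds[[name]].to_netcdf(f'split_var_{name}.nc')

# ...or by model level (the new use case); any dimension could serve here.
for level, subset in ds.groupby('level'):
    subset.to_netcdf(f'split_level_{int(level)}.nc')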
As discussed, it should be possible to not specify a partition key and download all data as one chunk. Currently, the absence of partition_keys in the config file causes a KeyError.
A request from Emma Pidduck from ECMWF:
It would be great if you could add the licence conditions for each of the datasets and also the attribution to the datasets, if possible.
CDS/ERA5 has a different licence to the Archive Catalogue: https://cds.climate.copernicus.eu/api/v2/terms/static/licence-to-use-copernicus-products.pdf
Archive catalogue licence & Terms of Use: https://apps.ecmwf.int/datasets/licences/general/
I observe that our CI takes a long time to complete. It is too early in the lifetime of this project for our unit tests to be this slow (certainly, the codebase is small enough). Having fast tests and an expedient CI system increases the pace of development and bug fixes.
This bug involves investigating why our unit tests take so long to run. This may be related to #53.
Ran the below command for one grib bz2 file for batch processing:
weather-mv --uris "gs://xxxxxx/A4D0113xxxxxxx.bz2" --output_table xxxx-xxx.xxxx.ecmwf_realtime_test_a4 --region us-central1 --runner DataflowRunner --setup_file ./setup.py --project xxxx-xxx --temp_location gs://xxxxxx-test/tmp
The table did not yet exist in BQ, so the pipeline created it on the fly.
Below is the error traceback:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1299, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1256, in process
return self._flush_batch(destination)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1312, in _flush_batch
skip_invalid_rows=True)
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1136, in insert_rows
for r in rows
File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1136, in
for r in rows
TypeError: Type is not JSON serializable: numpy.float64 [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
As a user, when I run a newer version of weather-mv on a dataset and the schema of the BigQuery table has changed (i.e. changed with respect to the dataset), I would like weather-mv to automatically migrate the table. I need this, as a user, so that my data pipelines don't break when new versions of weather-mv are released, and so I can get the latest tool features.
Last, as a user, I would like some control over whether migrations are applied, and which migrations to make. Ideally, the CLI should let me opt out of migrations if I know best. Otherwise, I want the tools to default to the recommended case.
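A hedged sketch of what an automatic, additive migration could look like with the BigQuery client (the table name and new column are illustrative; the real weather-mv behavior would be driven by the dataset's schema):

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table('my-project.weather.forecasts')  # hypothetical table

# Columns present in the new dataset but missing from the table (illustrative).
new_fields = [bigquery.SchemaField('valid_time', 'TIMESTAMP')]

existing = {field.name for field in table.schema}
to_add = [f for f in new_fields if f.name not in existing]
if to_add:
    # BigQuery allows appending nullable columns in place, so this migration
    # is non-destructive.
    table.schema = list(table.schema) + to_add
    client.update_table(table, ['schema'])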
After #7 is complete, we should prepare a "cookbook" of common dev-ops workflows that use weather-tools. These should include:
To foster better community support of these tools, let's schedule a weekly 30-minute meeting for weather-tools developers. If you are interested in contributing to this codebase, please let us know what time(s) you are available to meet here: http://whenisgood.net/weather-tools-meeting
Once a date is found, we'll update the contributing docs with the time and steps to attend the weekly meetings.
target_path (and target_filename) should be totally compliant with Python's standard string formatting. This includes being able to use named arguments (e.g. 'gs://bucket/{year}/{month}/{day}') as well as specifying formats for strings (e.g. 'gs://bucket/{year:04d}/{month:02d}/{day:02d}').
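For illustration, here is how such a target path would behave with Python's str.format (the values are hypothetical):

target_path = 'gs://bucket/{year:04d}/{month:02d}/{day:02d}'
print(target_path.format(year=2021, month=3, day=7))
# gs://bucket/2021/03/07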
Add a command line flag to control whether the in-memory copy should be enabled or not. If disabled, GRIB files should set cache=True during open_dataset() to speed up reads.
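A minimal sketch of how that flag could be wired up (the flag name and the cfgrib engine are assumptions, not the tool's actual interface):

import argparse
import xarray as xr

parser = argparse.ArgumentParser()
parser.add_argument('--no-in-memory-copy', action='store_true',
                    help='Open the dataset lazily from disk instead of copying it into memory.')
args = parser.parse_args(['--no-in-memory-copy'])  # example invocation

if args.no_in_memory_copy:
    # cache=True keeps decoded values in memory after first access, which
    # speeds up repeated reads from a file-backed dataset.
    ds = xr.open_dataset('input.grib', engine='cfgrib', cache=True)
else:
    # Default: eagerly load the whole dataset into memory.
    ds = xr.open_dataset('input.grib', engine='cfgrib').load()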
Similar to the *.cfg configuration file examples found in /configs, create example JSON config(s).
Check out the TODO in unit_test.py.
All CLIs should follow the same conventions, including:
I just noticed that in an e2e run, the pipeline doesn't fail but issues warnings about failing to insert data into BigQuery:
There were errors inserting to BigQuery. Will retry. Errors were [{'index': 0, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 1, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 2, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 3, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 4, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 5, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 6, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 7, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 8, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 9, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 10, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]}, {'index': 11, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]}, {'index': 12, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 13, 'errors': [{'reason': 'invalid', 'location': 'surface', 'debugInfo': '', 'message': 'no such field: surface.'}]}, {'index': 14, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 15, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]}, {'index': 16, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 17, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 18, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 19, 'errors': [{'reason': 'invalid', 'location': 'number', 'debugInfo': '', 'message': 'no such field: number.'}]}, {'index': 20, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 21, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}, {'index': 22, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 23, 'errors': [{'reason': 'invalid', 'location': 'step', 'debugInfo': '', 'message': 'no such field: step.'}]}, {'index': 24, 'errors': [{'reason': 'invalid', 'location': 'valid_time', 'debugInfo': '', 'message': 'no such field: valid_time.'}]}]
I think the issue is this logic in extract_rows:
After running weather-dl for a long time (~25 days), I hit this error:
line 1099, in read
return self._sslobj.read(len, buffer)
socket.timeout: The read operation timed out
My suspicion is that reading or writing large files to GCS is timing out. While we have retries for this, we filter out certain errors for retrying, including this one. A fix might look like revisiting that retry logic and making sure to include these types of timeout errors.
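A minimal sketch of a retry wrapper that treats socket timeouts as transient (the names and backoff policy are assumptions; the actual weather-dl retry logic may differ):

import socket
import time

RETRYABLE_ERRORS = (socket.timeout, TimeoutError, ConnectionResetError)

def with_retries(fn, max_attempts=5, base_delay=1.0):
    # Call fn(), retrying on transient network errors with exponential backoff.
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RETRYABLE_ERRORS:
            if attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))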
I registered on ECMWF site (https://apps.ecmwf.int/registration/) and got an API key from https://api.ecmwf.int/v1/key/ and created ~/.ecmwfapirc as shown on the page.
When I run:
weather-dl mars_example_config.cfg --local-run
I get:
File "/opt/conda/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1635, in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
File "/opt/conda/lib/python3.7/site-packages/weather_dl/download_pipeline/pipeline.py", line 241, in fetch_data
client.retrieve(dataset, selection, temp.name)
File "/opt/conda/lib/python3.7/site-packages/weather_dl/download_pipeline/clients.py", line 172, in retrieve
self.c.execute(req=selection, target=output)
File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 645, in execute
quiet=self.quiet,
File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 418, in __init__
info = self.connection.call("%s/%s/%s" % (self.url, self.service, "info")).get(
File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 148, in wrapped
return func(self, *args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/ecmwfapi/api.py", line 350, in call
raise APIException("ecmwf.API error 1: %s" % (self.last["error"],))
ecmwfapi.api.APIException: "ecmwf.API error 1: User 'lak@...' has no access to services/mars [while running 'FetchData']"
A helpful step towards fixing #21.
ticket updated: 2022-03-24
As a weather tools user, I would like to be able to preview the effect of each tool before incurring the cost of data movement and infrastructure. These light-weight previews will help me test pipelines before deployment, lowering the number of iterations needed to set up a data pipeline. For this issue, I want to be able to perform dry runs with the weather mover.
When a user passes the -d or --dry-run flag to the weather-mv CLI, this feature will be activated. Dry-run behavior that is meaningful to weather-mv users is preferred.
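A hypothetical sketch of the flag wiring (the real weather-mv CLI may differ): with --dry-run, rows are extracted and previewed but the BigQuery write stage is skipped.

import argparse

parser = argparse.ArgumentParser(prog='weather-mv')
parser.add_argument('-d', '--dry-run', action='store_true',
                    help='Preview the rows that would be written without loading them into BigQuery.')
args = parser.parse_args(['--dry-run'])  # example invocation

if args.dry_run:
    print('Dry run: skipping the WriteToBigQuery stage.')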
In each tool, the documentation refers to Dataflow options and then links to docs on how to set programmatic pipeline options. Instead, it should link to these docs:
https://cloud.google.com/dataflow/docs/reference/pipeline-options
Looking at the upload-to-pypi logs, it looks like testdata is included in the tarball (I see .grib and .nc files in the logs). This bug involves updating the MANIFEST.in to exclude testdata in the tars. That way, they will be lightweight / quick-to-install.
In an oversight, a recent feature of weather-dl has not been documented. Let's make it known to readers of the tool's README that the flag exists and describe what it does. Further, let's add a usage example.
In the weather downloader, we underutilize licenses due to the current way they are distributed. Worse, if we use near the max number of queued workers (like we do by default for the MARS downloads), pipelines will fail with a "MaxAPIRequests" error from ECMWF.
My leading hypothesis about what's causing this is that our license distribution scheme depends on a particular order of future requests. However, we've made recent changes to better work with the Dataflow autoscaling algorithm to parallelize download requests, which allocates the fetch_data step in a random order.
To reproduce: run weather-dl with multiple licenses (CDS is fine).
Outcome: There will be an uneven distribution of download requests. Some may have duplicates while others don't have any work dispatched. This leads to underutilization of licenses and, in some cases, brittle pipelines.
Expected: Each worker should at least have their request limit maxed out (licenses fully utilized). Requests, in general, should be evenly distributed. Pipelines should never hit the "Too many requests" error.
The strategy to get robust yet high utilization of downloads is to split data requests into multiple configs and run weather-dl twice (or multiple times). It especially helps to leverage the -n flag to limit the number of data requests to the access limit (for CDS, the defaults are currently fine; for MARS, set -n 2).
A good strategy would be to restructure the pipeline such that licenses and download configs are created as separate collections (beam.Create() steps) and then paired together (like zip in Python) after a reshuffle, so licenses are evenly distributed, as sketched below.
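A minimal sketch under those assumptions (license and request names are illustrative; this is not the actual weather-dl pipeline):

import itertools
import apache_beam as beam

licenses = ['license-0', 'license-1']           # hypothetical license IDs
requests = [f'request-{i}' for i in range(10)]  # hypothetical download requests

# Pair every request with a license round-robin (like zip in Python), then
# reshuffle so pairs are spread evenly across workers instead of depending on
# the order in which futures happen to be scheduled.
pairs = list(zip(itertools.cycle(licenses), requests))

with beam.Pipeline() as p:
    _ = (
        p
        | 'CreatePairs' >> beam.Create(pairs)
        | 'Reshuffle' >> beam.Reshuffle()
        | 'Fetch' >> beam.Map(lambda pair: f'{pair[0]} fetches {pair[1]}')
    )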
This might be just an issue for me on Windows, but I am unable to run the command line tools after installing them via
pip install git+http://github.com/google/weather-tools.git#egg=weather-tools
E.g.
C:\Users\Cillian>weather-dl configs/era5_example_config_local_run.cfg --local-run
'weather-dl' is not recognized as an internal or external command, operable program or batch file.
Edition: Windows 10 Home
Version: 21H1
OS build 19043.1415
64-bit Operating System
Python version: 3.8.5
A lot of the names, like partition_keys, were selected without knowledge of the weather-data ecosystem (e.g. the terms used in XArray). In this issue, we need to come up with proper names in preparation for the 1.0 release.
Right now, weather-dl takes in one config file at a time and starts a download for it. Instead, the CLI could be updated to take multiple config files, such that once one config is finished, it starts the next one.
I used the weather splitter on a file system before it had templates, and now in its current version. The original file system I have looks like:
.../2020/202001_hres_pcp.grb2_surface_d2m.grib
Using the new templating system in the weather splitter, I cannot format a template string to match this file path. This is because I don't have control over the position of the level and short_name variables. It seems like the templating system is oriented towards setting variable file paths rather than templating the placement of the variable and level within the string. The latter is more needed than the former; a hypothetical template is sketched below.
As a user, I want to be able to load real-time forecasts from cloud storage to BigQuery. I want to do this in the simplest, most efficient computational means. For example, ECMWF disseminates data 4x per day into cloud storage buckets. Once an upload event occurs, I want data to automatically start ingesting into BigQuery.
Support a new argument --topic that lets users specify a GCP Pub/Sub topic of CREATE or FINALIZE events for a GCS bucket. Either this argument or --uris will be required. Usage of --topic will deploy a streaming Dataflow job that subscribes to Pub/Sub events, windows them into 1-minute buckets, and performs the extract_rows operation on incoming data.
Note: for other clouds, users can bridge other notification systems to GCP's PubSub in the short term. Apache Kafka support can come later.
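A minimal sketch of this streaming mode (the topic name and the extract_rows stand-in are assumptions; this is not the actual weather-mv implementation):

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

def to_gcs_uri(message: bytes) -> str:
    # Turn a GCS OBJECT_FINALIZE notification payload into a gs:// URI.
    event = json.loads(message.decode('utf-8'))
    return f"gs://{event['bucket']}/{event['name']}"

def extract_rows(uri: str):
    # Stand-in for weather-mv's extract_rows step.
    yield {'uri': uri}

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | 'ReadEvents' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/gcs-finalize')
        | 'ToUri' >> beam.Map(to_gcs_uri)
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))  # 1-minute buckets
        | 'ExtractRows' >> beam.FlatMap(extract_rows)
    )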
The current config-value parser treats all values as strings. Instead, we need to try to infer types in a best-effort way. This is a pre-requisite step in order to fix #4.
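A best-effort inference could look something like this sketch (the rules here are illustrative, not a spec):

def parse_value(raw: str):
    # Convert a config string into a typed Python value where possible.
    text = raw.strip()
    for caster in (int, float):
        try:
            return caster(text)
        except ValueError:
            pass
    if text.lower() in ('true', 'false'):
        return text.lower() == 'true'
    return text  # fall back to the original string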
With a global forecast dataset, I have the need to extract data with ~3,244.6k coordinate values (time x lat x lng). As of today, it takes about ~9 seconds to extract 1k rows, so 9 * 3244.6 / 60 / 60 ≈ 8 hours per file.
While streaming pipelines write data as soon as it's available, we'd ideally like all of the data to be processed within ~1 hour, so the first forecast is actionable.
Users should be able to read documentation without looking at the source repository. We can either use Github pages or readthedocs. I have a preference for the latter.
The testing docs have a missing word: "pytype checking types" should read "pytype for checking types".
Internal tracker: b/190200288
Hey @CillianFn and @pranay101, this issue was inspired by #52 and #47. If either of you would like to pick this up, that would be a great help!
To summarize: we should start treating our configs like code and include them in our automated checks. For this issue to be fixed, we should add a step to our CI build that tests that all the example configs parse correctly (check for correct syntax). For now, I think it's sufficient to run each config with the -d, --dry-run flag. Later, we can check that the semantics of the configs are correct.
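A small sketch of such a check as a pytest-style test (the configs/ layout comes from this repo; using configparser for a syntax-only check is an assumption):

import configparser
import glob

def test_example_configs_parse():
    for path in glob.glob('configs/*.cfg'):
        parser = configparser.ConfigParser()
        assert parser.read(path), f'Could not parse {path}'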
A suggestion from @lakshmanok:
It would be good if weather-mv were to add an ST_GeogPoint(longitude, latitude) column instead of (or in addition to) the longitude and latitude columns. This precomputes the S2 location and makes querying faster. Even better if the column was a polygon with the extent of the pixel. That way ST_INTERSECTS etc. will also work.
ticket updated: 2022-03-24
weather-mv can make use of the Geography capabilities of BigQuery. Add a column of type GEOGRAPHY, which uses "point geography" from the raw data's lat/lng values. BigQuery can build this from existing columns: namely, ST_GEOGPOINT is a function that takes lat/lng FLOAT64 columns and creates a GEOGRAPHY Point value from them.
Even better if the column was a polygon with the extent of the pixel. That way ST_INTERSECTS etc. will also work.
Implementing extra columns to include the polygon of the (grid) area where the values are relevant is a great bonus feature. However, it will not be possible for all types of input data. This experience is only possible if the xarray Dataset includes extra information, like a coordinate or variable attribute.
For now, while we implement the first part of this ticket, let's also investigate whether such metadata exists on our happy-path data sources. If it does, we'll create a follow-up ticket to implement this.
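For reference, a sketch of the kind of query the point-geography column would enable (the project, table, and column names are hypothetical):

from google.cloud import bigquery

client = bigquery.Client()
query = """
SELECT ST_GEOGPOINT(longitude, latitude) AS geo_point, temperature
FROM `my-project.weather.forecasts`
WHERE ST_INTERSECTS(
  ST_GEOGPOINT(longitude, latitude),
  ST_GEOGFROMTEXT('POLYGON((-10 35, 30 35, 30 60, -10 60, -10 35))'))
"""
for row in client.query(query):
    print(row.geo_point, row.temperature)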
The scheme or template approach is less powerful than having the output that would result from a given input printed out for review.
In the future, we should write a script to generate these testdata files. Checking them in slows down how fast we can clone the repo.
Originally posted by @alxmrs in #40 (comment)
Trying to spin up a streaming dataflow job with the below command:
weather-mv --uris "gs://ecmwf-xxxxx/A1D*.bz2" \
--output_table xxxx-xx.xxxx.xxxx \
--topic "projects/xxxx/topics/ecmwf-xxxxx" \
--runner DataflowRunner \
--project xxx-xxxx \
--region us-central1 \
--temp_location gs://xxxxx-test \
--job_name ecmwf-xxxxxx-test2
Below is the error. The dataflow job hasn't been created.
Can you add a flag so the user can control if the dataset should be opened in memory? I can see this tool being used on O(100GB) files, where users would like to turn this off.
Originally posted by @alxmrs in #74 (comment)
This will help with distribution of the tool. Python users would only have to run pip install weather-tools to get the source and updates.
Ran the below weather-mv command to import a batch of GRIB bz2 files into BQ.
weather-mv --uris "gs://xxxxx/A4D0113*.bz2" --output_table xxxx.xxxxx.ecmwf_realtime_test_a4 --runner DataflowRunner --project xxxxxx --temp_location gs://xxxxx/tmp --job_name ecmwf_xxxxx_test_a4
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 870, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 238, in extract_rows
with open_dataset(uri) as ds:
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 94, in open_dataset
xr_dataset: xr.Dataset = __open_dataset_file(dest_file.name)
File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 62, in __open_dataset_file
return xr.open_dataset(filename)
File "/usr/local/lib/python3.7/site-packages/xarray/backends/api.py", line 479, in open_dataset
engine = plugins.guess_engine(filename_or_obj)
File "/usr/local/lib/python3.7/site-packages/xarray/backends/plugins.py", line 155, in guess_engine
raise ValueError(error_msg)
ValueError: did not find a match in any of xarray's currently installed IO backends ['scipy']. Consider explicitly selecting one of the installed engines via the engine parameter, or installing additional IO dependencies, see: http://xarray.pydata.org/en/stable/getting-started-guide/installing.html http://xarray.pydata.org/en/stable/user-guide/io.html
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1234, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1232, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 870, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1368, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 238, in extract_rows
with open_dataset(uri) as ds:
File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 94, in open_dataset
xr_dataset: xr.Dataset = __open_dataset_file(dest_file.name)
File "/usr/local/lib/python3.7/site-packages/loader_pipeline/pipeline.py", line 62, in __open_dataset_file
return xr.open_dataset(filename)
File "/usr/local/lib/python3.7/site-packages/xarray/backends/api.py", line 479, in open_dataset
engine = plugins.guess_engine(filename_or_obj)
File "/usr/local/lib/python3.7/site-packages/xarray/backends/plugins.py", line 155, in guess_engine
raise ValueError(error_msg)
ValueError: did not find a match in any of xarray's currently installed IO backends ['scipy']. Consider explicitly selecting one of the installed engines via the engine parameter, or installing additional IO dependencies, see: http://xarray.pydata.org/en/stable/getting-started-guide/installing.html http://xarray.pydata.org/en/stable/user-guide/io.html [while running 'ExtractRows']
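As the error message suggests, the workers are missing a GRIB-capable IO backend. A minimal sketch of selecting the engine explicitly (assuming cfgrib is added to the worker dependencies; the file name is illustrative):

import xarray as xr

ds = xr.open_dataset('example.grib', engine='cfgrib')
print(ds)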
Issue:
When running the following setup command on Windows 10 (Python 3.8.5),
pip install -e ."[dev]"
Getting the following error
ERROR: Command errored out with exit status 1:
command: 'C:\Users\Cillian\Anaconda3\envs\py385\python.exe' -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'D:\\Projects\\weather-tools\\weather-tools\\setup.py'"'"'; __file__='"'"'D:\\Projects\\weather-tools\\weather-tools\\setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base 'C:\Users\Cillian\AppData\Local\Temp\pip-pip-egg-info-ewy709ue'
cwd: D:\Projects\weather-tools\weather-tools\
Complete output (7 lines):
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "D:\Projects\weather-tools\weather-tools\setup.py", line 66, in <module>
long_description=open('README.md', 'r').read(),
File "C:\Users\Cillian\Anaconda3\envs\py385\lib\encodings\cp1252.py", line 23, in decode
return codecs.charmap_decode(input,self.errors,decoding_table)[0]
UnicodeDecodeError: 'charmap' codec can't decode byte 0x8f in position 2775: character maps to <undefined>
----------------------------------------
Explanation of issue: https://dev.to/methane/python-use-utf-8-mode-on-windows-212i
Expected:
Run command with no issue
Fix:
Update the following line https://github.com/google/weather-tools/blob/main/setup.py#L66 to include encoding
long_description=open('README.md', 'r', encoding='utf-8').read(),
@alxmrs I have a branch locally with this fix, happy to put up a PR but I seem to not have write permissions for this repo. Would appreciate getting those as I'd like to make more (interesting 😉 ) contributions in the future!