bigflow's Introduction

BigFlow

Documentation

  1. What is BigFlow?
  2. Getting started
  3. Installing BigFlow
  4. Help me
  5. BigFlow tutorial
  6. CLI
  7. Configuration
  8. Project structure and build
  9. Deployment
  10. Workflow & Job
  11. Starter
  12. Technologies
  13. Development

Cookbook

What is BigFlow?

BigFlow is a Python framework for data processing pipelines on GCP.

The main features are:

Getting started

Start by installing BigFlow on your local machine. Next, go through the BigFlow tutorial.

Installing BigFlow

Prerequisites. Before you start, make sure you have the following software installed:

  1. Python >= 3.8
  2. Google Cloud SDK
  3. Docker Engine

You can install the bigflow package globally, but we recommend installing it locally with venv, in your project's folder:

python -m venv .bigflow_env
source .bigflow_env/bin/activate

Install the bigflow PIP package:

pip install bigflow[bigquery,dataflow]

Test it:

bigflow -h

Read more about BigFlow CLI.

To interact with GCP you need to set a default project and log in:

gcloud config set project <your-gcp-project-id>
gcloud auth application-default login

Finally, check if your Docker is running:

docker info

Help me

You can ask questions on our Gitter channel or on Stack Overflow.


bigflow's Issues

Beam + BigQuery E2E testing

Setting up an automated E2E test is tedious. We need a helper TestCase class which covers the most common case: reading data from BQ, transforming it, and writing data back to BQ. A rough sketch follows the list below.

The BQ TestCase helper needs to:

  • easily create test input tables
  • clear leftovers after the test
  • easily assert table content
  • allow parallel test running
  • integrate with BigFlow configuration tool and BigQuery tools
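
A rough sketch of what such a helper could look like, using the google-cloud-bigquery client directly; every name below (class, project id, methods) is hypothetical:

import unittest
import uuid

from google.cloud import bigquery


class BigQueryTestCase(unittest.TestCase):
    """Sketch: isolates each E2E test in its own dataset (parallel-safe)
    and drops it afterwards."""

    PROJECT_ID = "my-gcp-project"  # assumed; would come from BigFlow Config

    def setUp(self):
        self.client = bigquery.Client(project=self.PROJECT_ID)
        self.dataset_id = f"{self.PROJECT_ID}.e2e_{uuid.uuid4().hex}"
        self.client.create_dataset(self.dataset_id)

    def tearDown(self):
        # Clear leftovers after the test.
        self.client.delete_dataset(self.dataset_id, delete_contents=True)

    def create_input_table(self, table_name, schema, rows):
        # Easily create a test input table and fill it with dict records.
        table = self.client.create_table(
            bigquery.Table(f"{self.dataset_id}.{table_name}", schema=schema))
        self.client.insert_rows_json(table, rows)
        return table

    def assert_table_content(self, table_name, expected_rows):
        # Easily assert table content, ignoring row order.
        rows = self.client.query(
            f"SELECT * FROM `{self.dataset_id}.{table_name}`").result()
        self.assertCountEqual([dict(r) for r in rows], expected_rows)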

BigFlow installer

Our users are not necessarily Python developers, so installing Python, managing virtual environments, installing requirements, and so on are not obvious tasks. We need an installation script (or some other solution) for the scaffold.

Errors when migrating to new version 1.1.3

Hello, I have encountered an issue while migrating to the new version of bigflow, 1.1.3. I attach the stack traces of the errors below.
With Apache Beam 2.26.0:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 288, in _execute
response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 336, in
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 612, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 441, in get
self.data_channel_factory)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 875, in init
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 932, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 931, in
get_operation(transform_id))) for transform_id in sorted(
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 819, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 913, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 913, in
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 911, in
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 819, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 916, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1205, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1553, in create_par_do
parameter)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1589, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 294, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute '_JsonToDictCoder' on <module 'apache_beam.io.gcp.bigquery_read_internal' from '/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_read_internal.py'>

With Apache Beam 2.27.0:
Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 638, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 467, in get
self.data_channel_factory)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in init
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 925, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in
get_operation(transform_id))) for transform_id in sorted(
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1546, in create_par_do
parameter)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1623, in _create_pardo_operation
output_coders = factory.get_output_coders(transform_proto)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1245, in get_output_coders
pcoll_id in transform_proto.outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1245, in
pcoll_id in transform_proto.outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1230, in get_windowed_coder
coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1222, in get_coder
return self.context.coders.get_by_id(coder_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 370, in from_runner_api
[context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 370, in
[context.coders.get_by_id(c) for c in coder_proto.component_coder_ids],
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 116, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 371, in from_runner_api
context)
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 405, in _pickle_from_runner_api_parameter
return deserialize_coder(payload.value)
File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 116, in deserialize_coder
return pickler.loads(serialized.split(b'$', 1)[1], use_zlib=True)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 282, in loads
s = zlib.decompress(c)
zlib.error: Error -3 while decompressing data: incorrect header check

Pass dict instead of 'PipelineOptions' to 'BeamJob'

The BeamJob class can create PipelineOptions internally by calling from_dictionary; this eliminates the need for the end user to import and use this (sort of internal) class. Variants (see the sketch after this list):

  • pass options via dict (typing.TypedDict for py38 for typechecking)
  • pass options via named args of BeamJob
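
A minimal sketch of the first variant, assuming BeamJob gains an options dict parameter (the BeamJob signature below is hypothetical); today the conversion has to be done by the user with PipelineOptions.from_dictionary:

from apache_beam.options.pipeline_options import PipelineOptions

# Example option values, for illustration only.
options_dict = {
    "project": "my-gcp-project",
    "region": "europe-west1",
    "temp_location": "gs://my-bucket/beam-temp",
}

# What the user has to do today:
pipeline_options = PipelineOptions.from_dictionary(options_dict)

# What the proposal would allow (hypothetical signature):
# job = BeamJob(id="my_job", entry_point=run_pipeline, options=options_dict)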

Broken retry_count and delay

The new bigflow.Job interface uses the retries and retries_delay attributes, while the DAG builder expects retry_count and retry_pause_sec (see the sketch below).
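
A minimal sketch of the mismatch: a hypothetical glue function translating the new bigflow.Job attribute names to the keywords the DAG builder currently expects.

def dag_builder_retry_kwargs(job):
    # New interface exposes `retries` / `retries_delay`,
    # while the DAG builder reads `retry_count` / `retry_pause_sec`.
    return {
        "retry_count": getattr(job, "retries", None),
        "retry_pause_sec": getattr(job, "retries_delay", None),
    }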

Fail 'bigflow build-dags' when BF can't import module

Current behaviour: DAG generation for a single workflow is skipped when its Python package cannot be imported. This leads to incomplete deploys (some DAGs are generated, some are not).
Expected: fail the whole 'build-dags' subcommand (fail-fast strategy); a sketch follows below.
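
A minimal sketch of the fail-fast behaviour; the module-name iteration around it is assumed, only importlib is standard:

import importlib


def import_workflow_modules(module_names):
    workflows = []
    for name in module_names:
        try:
            workflows.append(importlib.import_module(name))
        except ImportError as e:
            # Instead of skipping this workflow (current behaviour), abort the
            # whole 'build-dags' command so no partial DAG set gets deployed.
            raise SystemExit(f"build-dags failed: cannot import {name}: {e}")
    return workflows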

Add scaffold commands 'add-job' etc

The bigflow CLI might provide commands like 'add-job' (or similar) which ask a few questions about the job (Beam/PySpark/etc., package, id, etc.) and generate code for such a job with appropriate placeholders.

Tools for BigQuery E2E tests

We need:

  • A tool for creating a BigQuery table from an avro schema (and BigQuery client schema*)
  • A tool for inserting records to a BigQuery table in the form of a dict

We need the tools available from this interface.
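
A minimal sketch of the second tool, using the standard google-cloud-bigquery client; the function name is hypothetical:

from typing import Dict, List

from google.cloud import bigquery


def insert_records(table_id: str, records: List[Dict]) -> None:
    # Streaming-insert dict-shaped records into an existing BigQuery table.
    client = bigquery.Client()
    errors = client.insert_rows_json(table_id, records)
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")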

Non-deterministic get_version function

Currently, the bigflow.version.get_version function adds a random suffix to the version if the repository is dirty. There are 2 problems with that fact:

  • A developer has to call the function only once; otherwise they can get 2 different versions
  • When a user runs build-image/dags/package commands separately, each command might generate a different version

We need to make the suffix generator deterministic (see the sketch below).
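
A minimal sketch of one way to make the suffix deterministic: derive it from the working-tree diff instead of randomness (this ignores untracked files, so it is only an approximation):

import hashlib
import subprocess


def deterministic_snapshot_suffix() -> str:
    # Repeated calls on the same dirty working tree return the same value.
    diff = subprocess.run(["git", "diff", "HEAD"],
                          capture_output=True, check=True).stdout
    return hashlib.sha1(diff).hexdigest()[:8]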

Dill serialization/deserialization error

Importing pandas in a Dataflow process causes a dill.load failure. It looks like a race condition, because the error occurs more often on bigger machines (tested on n1-standard-16) than on smaller ones (n1-standard-4). Maybe the issue is malformed bytecode (overwritten concurrently?).

Example stack trace:

Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 381, in get
    processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 279, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
  File "/usr/local/lib/python3.7/site-packages/listing_analytics/listing_data_processing/workflow.py", line 6, in <module>
    from listing_analytics.commons.result_datasets import listing_dataset_config, LISTING_REQUESTS_TABLE_NAME, LISTING_ITEMS_TABLE_NAME
  File "/usr/local/lib/python3.7/site-packages/listing_analytics/commons/result_datasets.py", line 1, in <module>
    from bigflow.bigquery import DatasetConfig
  File "/usr/local/lib/python3.7/site-packages/bigflow/bigquery/__init__.py", line 1, in <module>
    from .interactive import interactive_component as component
  File "/usr/local/lib/python3.7/site-packages/bigflow/bigquery/interactive.py", line 7, in <module>
    import pandas as pd
  File "/usr/local/lib/python3.7/site-packages/pandas/__init__.py", line 52, in <module>
    from pandas.core.api import (
  File "/usr/local/lib/python3.7/site-packages/pandas/core/api.py", line 15, in <module>
    from pandas.core.arrays import Categorical
  File "/usr/local/lib/python3.7/site-packages/pandas/core/arrays/__init__.py", line 9, in <module>
    from pandas.core.arrays.integer import IntegerArray, integer_array
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 724, in exec_module
  File "<frozen importlib._bootstrap_external>", line 857, in get_code
  File "<frozen importlib._bootstrap_external>", line 525, in _compile_bytecode
EOFError: marshal data too short

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 315, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 513, in process_bundle
    instruction_id, request.process_bundle_descriptor_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 387, in get
    self.data_channel_factory)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 854, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in <listcomp>
    (transform_id, get_operation(transform_id)) for transform_id in sorted(
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 798, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 890, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 798, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 890, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 798, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in get_operation
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in <dictcomp>
    pcoll_id in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 890, in <listcomp>
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 798, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 895, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1182, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1526, in create_par_do
    parameter)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1562, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 283, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
  File "/usr/local/lib/python3.7/site-packages/listing_analytics/listing_data_processing/workflow.py", line 6, in <module>
    from listing_analytics.commons.result_datasets import listing_dataset_config, LISTING_REQUESTS_TABLE_NAME, LISTING_ITEMS_TABLE_NAME
  File "/usr/local/lib/python3.7/site-packages/listing_analytics/commons/result_datasets.py", line 1, in <module>
    from bigflow.bigquery import DatasetConfig
  File "/usr/local/lib/python3.7/site-packages/bigflow/bigquery/__init__.py", line 1, in <module>
    from .interactive import interactive_component as component
  File "/usr/local/lib/python3.7/site-packages/bigflow/bigquery/interactive.py", line 7, in <module>
    import pandas as pd
  File "/usr/local/lib/python3.7/site-packages/pandas/__init__.py", line 52, in <module>
    from pandas.core.api import (
  File "/usr/local/lib/python3.7/site-packages/pandas/core/api.py", line 15, in <module>
    from pandas.core.arrays import Categorical
  File "/usr/local/lib/python3.7/site-packages/pandas/core/arrays/__init__.py", line 9, in <module>
    from pandas.core.arrays.integer import IntegerArray, integer_array
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 724, in exec_module
  File "<frozen importlib._bootstrap_external>", line 857, in get_code
  File "<frozen importlib._bootstrap_external>", line 525, in _compile_bytecode
EOFError: marshal data too short

Removing on-the-fly generation of the Beam job `setup.py`

Currently, we have a util that generates setup.py on the fly for Beam job execution. The generated setup.py is not the same as the project_setup.py, which causes various problems (e.g. not being able to reach resources from a Beam job).

I suggest that we add the project .whl to the resources (during build), so a Beam process can install a ready-to-use package instead of building it from the generated setup.py.

Allow to pass all arguments/options to scaffold via command line

The scaffold may be used by wrapper scripts or in an automation environment.
Real use case: it may be integrated into a script which regenerates a static project template (such a script needs to be re-executed manually when a new version of bigflow is released).

To do this, 'start-project' (and presumably any scaffold-like command) should accept all parameters via the command line. For example:

bigflow start-project --non-interactive -Pproject_id='my_project' -Pgcp_bucket='some_gcp-bucket'

Alternative 1 - pass parameters via a separate file (JSON, YAML?):

echo ...  > params.json
bigflow start-project --parameters-file params.json

Alternative 2 - provide custom cmd-line option for all parameters:

bigflow start-project --project-id "my-project" --gcp-bucket "..."

Alternative 2b - migrate to https://click.palletsprojects.com/en/7.x/options/#option-prompting #156

Consider switching to 'click' instead of 'argparse'

Better Docker integration

Currently, BigFlow drives Docker via the CLI. It's convenient, but it makes error handling and testing hard. We could start using the Python client instead (see the sketch below).
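
A minimal sketch with the docker-py client (pip install docker); the image tag is just an example:

import docker

client = docker.from_env()

# Build an image from the project directory and tag it.
image, build_logs = client.images.build(path=".", tag="my-project:0.1.0")
for entry in build_logs:
    print(entry.get("stream", ""), end="")

# Errors surface as typed exceptions (docker.errors.BuildError,
# docker.errors.APIError), which is easier to handle and test than
# parsing CLI output.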

find_or_create_setup_for_main_project_package default arguments value

The find_or_create_setup_for_main_project_package function takes 2 parameters. Both arguments can be optional.

Make the search_start_file argument optional by providing a default value. The default value should be Path to a caller module file.

Make the project_name argument optional by providing a default value. The default value should be a top-level package name of a caller. For example, let us say that you use the find_or_create_setup_for_main_project_package function from the caller_module in the following package tree:

my_top_package/
    __init__.py
    my_bottom_package/
        __init__.py
        caller_module.py

In that case, the default project_name value should be 'my_top_package'. A sketch of deriving both defaults from the caller follows below.
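
A minimal sketch of how both defaults could be derived from the caller with the standard inspect module (the helper name is hypothetical):

import inspect
from pathlib import Path


def caller_defaults():
    caller = inspect.stack()[1]                # frame of the calling module
    search_start_file = Path(caller.filename)  # default for search_start_file
    module = inspect.getmodule(caller.frame)
    # Default for project_name: the caller's top-level package, e.g.
    # 'my_top_package' for 'my_top_package.my_bottom_package.caller_module'.
    project_name = (module.__name__.split(".")[0]
                    if module and module.__name__ != "__main__" else None)
    return search_start_file, project_name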

Faster Dataflow start

Currently, running a Dataflow job has a long initial phase for downloading and preparing dependencies. It is especially long on Airflow workers. We need to find a way to make a job start faster.

Ask about instructions for migration to new version

Hello, may I kindly ask you to provide some instructions for migrating to the new version, to facilitate this process for other developers?
In particular, I found that config.py should be modified in the following way:
options.view_as(SetupOptions).setup_file = str(reflect.materialize_setuppy())
instead of:
setup_file_path = resolve(find_or_create_setup_for_main_project_package())
requirements_file_path = resolve(get_resource_absolute_path('requirements.txt'))
The second thing is that in setup.py the bigflow.build.setup method should be used, to avoid deprecation warnings.
Hope it will be useful, thanks!

Refactor variables naming / meaning in cli.py

At the moment, in 'cli.py', "root_package" in many places means "directory name" (with type Path), "project_dir" means "package name" (with type str), and the function 'find_root_package' returns a path. Overall, this makes the codebase hard to read and maintain.

E2E base class for test cases

To run each test case in a separate process, we need a base class (one that overrides the basic TestCase behavior and runs each def test_something(self): in a fresh process). Thanks to that, we can resolve a bigflow.Workflow object in a safe way (each time with a freshly resolved configuration, no import caching). A sketch of such a base class follows below.

Epic: #145
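
A minimal sketch of such a base class: each test re-runs itself in a fresh interpreter via the unittest CLI, so imports and configuration are resolved from scratch (the class name and env variable are hypothetical):

import os
import subprocess
import sys
import unittest


class IsolatedTestCase(unittest.TestCase):
    """Runs each test method in a fresh Python process."""

    _IN_CHILD_ENV = "BF_E2E_IN_CHILD"

    def run(self, result=None):
        if os.environ.get(self._IN_CHILD_ENV):
            # Already inside the child process: run the test body normally.
            return super().run(result)

        def delegate_to_child():
            completed = subprocess.run(
                [sys.executable, "-m", "unittest", self.id()],
                env={**os.environ, self._IN_CHILD_ENV: "1"},
            )
            assert completed.returncode == 0, f"{self.id()} failed in child process"

        # Replace the test body with the child-process delegation and let the
        # standard unittest machinery record the outcome.
        setattr(self, self._testMethodName, delegate_to_child)
        return super().run(result)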

Beam requirements cache

Currently, when you start a Beam process, it needs to fetch all the requirements every time. It takes a lot of time. We need some mechanism that makes Beam start faster.

Optional workflow step

Sometimes, a user wants to omit the execution of a specific job. For example, a user wants to skip a particular table sensor on the test environment (because very often there is no data on the test environment).

We need some tool that can skip a job execution at runtime, based on a user-defined condition (see the sketch below).
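
A minimal sketch of one possible tool: a wrapper that consults a user-supplied predicate before delegating to the real job (all names here are hypothetical):

class OptionalJob:
    """Wraps a job and skips execution when should_run(context) is False."""

    def __init__(self, job, should_run):
        self.id = job.id
        self._job = job
        self._should_run = should_run  # callable: job context -> bool

    def execute(self, context):
        if not self._should_run(context):
            print(f"Skipping job {self.id} for this run")
            return None
        return self._job.execute(context)

# Usage sketch: skip a table sensor outside production
# (assuming the environment name is available to the predicate):
# sensor = OptionalJob(table_sensor_job,
#                      should_run=lambda ctx: current_env() == 'prod')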

Speedup docker image generation

Speedup docker image generation by moving 'static' layers on top:

  • run apt-get ... before adding dist/*
  • add && apt-get clean
  • manually preinstall pip dependencies before adding dist/* via pip install -r

Optionally: don't export the docker image if a file with the same hash already exists in .images

Independent executions

Currently, we use the default Airflow behavior: if a job fails, execution will not go further in time. Sometimes job executions are independent, so we need a flag on bigflow.Workflow that allows independent execution.

Secure Dataflow job timeout on Airflow

Currently, if a Dataflow job is running from a Cloud Composer worker (Kubernetes pod) and Composer kills the task due to a timeout, the Dataflow job keeps running (the timeout does not kill it). We need to find a way of handling timeouts reliably, especially for Dataflow jobs.

InteractiveComponent does not return results

@log_syntax_error
def run(self, runtime=DEFAULT_RUNTIME, operation_name=None):
    _, component_callable = decorate_component_dependencies_with_operation_level_dataset_manager(
        self._standard_component, operation_name=operation_name)
    job = Job(component_callable, **self._dependency_config)
    job.execute(bigflow.JobContext.make(runtime=runtime))  # <--- missing return
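
Presumably the fix is a one-line change in the quoted method, returning the job result instead of discarding it:

    return job.execute(bigflow.JobContext.make(runtime=runtime))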

Use 'pip-tools' in scaffolded projects

It is good practice to recommend 'pip-tools' (or another similar tool) to our clients for dependency management.
We should add this to our scaffold.

Removing the optional env variables prefix from Config class

Currently, a user can set a custom prefix for the environment variables read by the Config class. The problem is that in many places we assume the prefix has its default value. It makes no sense to keep this customizable, so it should be removed.

Moving Cloud Composer to Artifact Registry

We should think about what this means for BigFlow:

Hello Cloud Composer Customer,

We are writing to let you know that in the December 2020 timeframe, the Artifact Registry API will be enabled for all your projects that contain Cloud Composer environments. We are making this adjustment to prepare for the upcoming transition of Cloud Composer's container image storage from Container Registry to Artifact Registry. This transition will happen gradually throughout 2021. Some new upcoming Composer features might also require usage of Artifact Registry.

When you install new PyPI dependencies, Composer creates Airflow container images with your PyPI dependencies and they are stored in Google Container Registry (as it is described in the Composer pricing documentation). We are planning to transition from Google Container Registry to Artifact Registry as the main container registry for Composer. When the transition is complete, you may start incurring charges for Artifact Registry instead of Container Registry for the above-mentioned container images used by your Cloud Composer environments. See Artifact Registry Pricing for details.

No action is required on your part at this time. If you have any questions or require assistance, please contact Google Cloud Composer Support. Thanks for choosing Cloud Composer.

"Dotify" artefacts

Currently, not all of the deployment artifacts have a . prefix. We want to apply the . prefix to all of them.

BigQuery serialization/deserialization framework for Beam

There are a few issues when it comes to reading and writing to BQ in a Beam process.

  1. When you read a table, you typically want to map the dictionaries you get as input to an object which represents your domain. It's also nice to have type checking when you deserialize BQ input. We need a tool that can map a dictionary to an object of the specified dataclass and perform type checking.

  2. When you write records to a BQ table, you need to map them to dictionaries first. Again, it's nice to have out-of-the-box type checking in this scenario. We need a tool that can map an object of the specified dataclass to a dictionary and perform type checking.

  3. When you write to a BQ table, you need to specify the table schema. We need a tool that can generate a BQ schema from the dataclass. Combined with the serializer from pt. 2, we have a single dataclass which describes the final output table.

A sketch of the (de)serialization part follows below.
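
A minimal sketch of points 1 and 2, using only the standard library; the names are hypothetical and real type checking would need to be more thorough:

import dataclasses
from typing import Any, Dict, Type, TypeVar

T = TypeVar("T")


def row_to_dataclass(row: Dict[str, Any], cls: Type[T]) -> T:
    # Map a BigQuery row (dict) to a dataclass instance, with basic type checks.
    kwargs = {}
    for field in dataclasses.fields(cls):
        value = row.get(field.name)
        if (value is not None and isinstance(field.type, type)
                and not isinstance(value, field.type)):
            raise TypeError(f"{field.name}: expected {field.type}, got {type(value)}")
        kwargs[field.name] = value
    return cls(**kwargs)


def dataclass_to_row(obj: Any) -> Dict[str, Any]:
    # Map a dataclass instance to a dict ready to be written to BigQuery.
    return dataclasses.asdict(obj)


@dataclasses.dataclass
class ListingEvent:  # example domain object, for illustration only
    user_id: str
    clicks: int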

Use PEP 440 compliant versions for snapshots

During a build on a dirty git repo:

dist.py:476: UserWarning: The version specified ('0.1.0SNAPSHOT6d2bde52') is an invalid version, this may not work as expected with newer versions of setuptools, pip, and PyPI. Please see PEP 440 for more details.
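
For reference, a PEP 440 compliant alternative would be to encode the snapshot marker as a local version label; a quick check with the packaging library (the version string is just an example):

from packaging.version import Version

# '0.1.0SNAPSHOT6d2bde52' is rejected by PEP 440 parsers,
# while a local version label parses fine:
print(Version("0.1.0+snapshot.6d2bde52"))  # -> 0.1.0+snapshot.6d2bde52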

Cleaning `setup.py` before run

Currently, the setup.py generated for a Beam job is not cleaned up before bf run. It should be, because the setup.py content may change from version to version.

--no-snapshot flag for `bf build`

Currently, the versioning system adds a unique id if the repository is dirty (so you don't have to commit every time you make a local change and build). On CI/CD servers it would be useful to turn this behavior off, so you don't have to add files that appear during the build to .gitignore.

The --no-snapshot flag would prevent adding a unique id on build:

bf build --no-snapshot

Jobs draining after deployment of new workflow

Problem
At the moment we remove old DAGs from the Composer bucket when we deploy new DAGs. This can result in gaps in data, because in this flow it is possible to remove a workflow that is still in progress.

How to fix
We should use some kind of draining mechanism that lets in-progress workflows finish their current interval, then starts the freshly deployed workflows and removes the old ones.

Pre-command Docker validation

We need to check whether the user has Docker running (on bf run/build/deploy); if not, bf should tell the user to install or start Docker (see the sketch below).
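
A minimal sketch of such a pre-command check, shelling out the same way the CLI already does (the messages are just examples):

import shutil
import subprocess


def ensure_docker_available() -> None:
    # Fail fast with a friendly message when Docker is missing or not running.
    if shutil.which("docker") is None:
        raise SystemExit("Docker is not installed. Install Docker Engine and try again.")
    if subprocess.run(["docker", "info"], capture_output=True).returncode != 0:
        raise SystemExit("Docker is installed but not running. Start the Docker daemon and try again.")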
