
nitroml's Introduction

NitroML

NitroML is a framework for benchmarking Automated Machine Learning (AutoML) pipeline model-quality.

Our mission is to provide model-quality benchmarking tools to accelerate AutoML research and development.

More generally, NitroML enables AutoML research teams to iterate more quickly on their highly-customized, heterogeneous, and multi-stage pipelines. It offers machine-learning model-quality benchmarking best practices out of the box, curates raw datasets, and leverages TFX:OSS to scale with Cloud resources. Its benchmark database and analysis tools ensure that AutoML teams can be data-driven as they modify their systems.

This is not an officially supported Google product.

For more information, see go/nitroml.

nitroml's People

Contributors

cweill, josefigueroa168, michelleliu1, nikhil-dce, scottyak, tirthdarji


nitroml's Issues

Run 3 datasets in a single NitroML DAG

class TitanicBenchmark(nitroml.Benchmark):
  r"""Demos a NitroML benchmark on the 'Titanic' dataset from OpenML."""

  def benchmark(self):
    # NOTE: For convenience, we fetch the OpenML task from the AutoTFX
    # tasks repository.
    datasets = [
        tfds_dataset.TFDSDataset(tfds.builder('titanic')),
        tfds_dataset.TFDSDataset(tfds.builder('adult')),
    ]
    for dataset in datasets:
      with self.sub_benchmark(dataset.name):
        # Compute dataset statistics.
        statistics_gen = tfx.StatisticsGen(examples=dataset.examples)

        # Infer the dataset schema.
        schema_gen = tfx.SchemaGen(
            statistics=statistics_gen.outputs.statistics,
            infer_feature_shape=True)

        # Apply global transformations and compute vocabularies.
        transform = tfx.Transform(
            examples=dataset.examples,
            schema=schema_gen.outputs.schema,
            module_file=os.path.join(
                os.path.dirname(__file__), 'auto_transform.py'))

        # Define a tf.estimator.Estimator-based trainer.
        trainer = tfx.Trainer(
            module_file=os.path.join(
                os.path.dirname(__file__), 'auto_estimator_trainer.py'),
            custom_executor_spec=executor_spec.ExecutorClassSpec(
                trainer_executor.GenericExecutor),
            transformed_examples=transform.outputs.transformed_examples,
            schema=schema_gen.outputs.schema,
            transform_graph=transform.outputs.transform_graph,
            train_args=trainer_pb2.TrainArgs(num_steps=10000),
            eval_args=trainer_pb2.EvalArgs(num_steps=5000))

        # Collect the pipeline components to benchmark.
        pipeline = dataset.components + [
            statistics_gen, schema_gen, transform, trainer
        ]

        # Finally, call evaluate() on the workflow DAG outputs. This will
        # automatically append Evaluators to compute metrics from the given
        # SavedModel and 'eval' TF Examples.
        self.evaluate(
            pipeline, examples=dataset.examples, model=trainer.outputs.model)

Programmatic skipping

class SkipWrapper:

  def __init__(self, wrappee):
    self._wrappee = wrappee
    self._resolver = tfx.ResolverNode(
        instance_name=f'skip_{wrappee.id}',
        resolver_class=latest_artifacts_resolver.LatestArtifactsResolver,
        **wrappee.outputs)

  @property
  def outputs(self) -> Dict[str, types.Channel]:
    return self._resolver.outputs

  @property
  def resolver(self) -> tfx.ResolverNode:
    return self._resolver

  def __getattr__(self, attr):
    return getattr(self._wrappee, attr)


class Benchmark(abc.ABC):
  """A benchmark which can be composed of several benchmark methods.

  The Benchmark object design is inspired by `unittest.TestCase`.

  A benchmark file can contain multiple Benchmark subclasses to compose a suite
  of benchmarks.
  """

  def __init__(self):
    self._benchmark = self  # The sub-benchmark stack.
    self._result = None
    self._seen_benchmarks = None
    self._pipeline = []
    self._within_skip_context = False

  @abc.abstractmethod
  def benchmark(self, **kwargs):
    """Benchmark method to be overridden by subclasses.

    Args:
      **kwargs: Keyword args that are propagated from the call to
        nitroml.run(...).
    """

  def _run_component(self, component):
    if self._within_skip_context:
      logging.warning(
          'Skipping "%s". Downstream components will use most recent artifacts.',
          component.id)
      skip_wrapper = SkipWrapper(component)
      self._pipeline.append(skip_wrapper.resolver)
    else:
      if hasattr(component, 'components'):
        for c in component.components:
          self._pipeline.append(c)
      else:
        self._pipeline.append(component)
      skip_wrapper = component
    return skip_wrapper

    # if self._within_skip_context:
    #   self._pipeline.append(component)
    #   skip_wrapper = SkipWrapper(component)
    #   self._pipeline.append(skip_wrapper.resolver)
    #   return skip_wrapper

    # self._pipeline.append(component)
    # return component

  def _rename_component(self, component):
    # pylint: disable=protected-access
    component._instance_name = _qualified_name(component._instance_name,
                                               self.id())
    for _, ch in component.outputs.items():
      ch.producer_component_id = component.id
    # pylint: enable=protected-access

  def run(self, component):
    if hasattr(component, 'components'):
      for c in component.components:
        self._rename_component(c)
    else:
      self._rename_component(component)
    return self._run_component(component)

  @contextlib.contextmanager
  def skip(self):
    old_within_skip_context = self._within_skip_context
    self._within_skip_context = True
    try:
      yield
    finally:
      self._within_skip_context = old_within_skip_context
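
For illustration, a minimal sketch of how a benchmark author might use skip(); the components and the `examples` channel here are placeholders, not part of the proposal:

class MyBenchmark(Benchmark):
  """Illustrative benchmark; the `examples` channel is a placeholder."""

  def benchmark(self):
    statistics_gen = self.run(tfx.StatisticsGen(examples=examples))
    with self.skip():
      # SchemaGen is not executed; a ResolverNode is added instead, so
      # downstream components consume its most recent cached artifacts.
      schema_gen = self.run(
          tfx.SchemaGen(statistics=statistics_gen.outputs.statistics))
    # SkipWrapper.outputs is a plain dict, hence the key-style access.
    transform = self.run(
        tfx.Transform(
            examples=examples,
            schema=schema_gen.outputs['schema'],
            module_file='auto_transform.py'))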

Add regression tasks

  • Add a regression dataset.
  • Add functionality for regression tasks in auto_estimator_trainer (see the sketch below).
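
As a starting point, the trainer could pick an Estimator head from the task type; a rough sketch in which the `task` object and its fields are hypothetical:

import tensorflow as tf

def make_head(task):
  """Returns an Estimator head matching the task's prediction type."""
  # `task.type` and `task.n_classes` are hypothetical fields.
  if task.type == 'regression':
    return tf.estimator.RegressionHead(label_dimension=1)
  return tf.estimator.MultiClassHead(n_classes=task.n_classes)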

Importing nitroml gives errors from ml_metadata

Importing nitroml fails on the following imports:
from ml_metadata.google.services.mlmd_service.proto import mlmd_service_pb2
from ml_metadata.google.tfx import metadata_store
from ml_metadata.proto import metadata_store_pb2

When I checked, the ml_metadata package does not contain those files.

Starter Project

3 week starter project:

  • Put together an end-to-end example running locally using BeamDagRunner.
  • Create a Dataset object that downloads the 72 OpenML-CC18 datasets from openml.org (see the sketch after this list).
  • The Dataset should produce ExampleGen components.
  • It should have a components property that returns the TFX components to add to the pipeline.
  • It should cache downloads (not download again if already present).
  • Final pipeline = example_gens + [stats_gen, schema_gen, (transform), trainer, evaluator]
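
A rough sketch of such a Dataset object, assuming the TFX 0.2x-era CsvExampleGen/external_input API; the download helper and cache layout are placeholders:

import os

from tfx.components import CsvExampleGen
from tfx.utils.dsl_utils import external_input


class OpenMLCC18Dataset:
  """Wraps one cached OpenML-CC18 dataset in an ExampleGen component."""

  def __init__(self, name, cache_dir='/tmp/openml_cache'):
    self.name = name
    data_dir = os.path.join(cache_dir, name)
    if not os.path.exists(data_dir):
      _download_from_openml(name, data_dir)  # Hypothetical download helper.
    self._example_gen = CsvExampleGen(input=external_input(data_dir))

  @property
  def components(self):
    """TFX components this dataset contributes to the pipeline."""
    return [self._example_gen]

  @property
  def examples(self):
    return self._example_gen.outputs['examples']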

Create a test that runs a pipeline using the Beam runner.

Create an absltest.TestCase subclass that makes it easy to define pipeline tests. It should take a tfx.Pipeline, run it using the Beam runner, and fail if any of the components fail.

While this can currently be limited to the starter project, eventually we will want to add this to
a nitroml.testing package to help benchmark authors.
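
A minimal sketch of such a helper, assuming BeamDagRunner surfaces component failures as exceptions (which in turn fails the test):

from absl.testing import absltest
from tfx.orchestration import pipeline as pipeline_lib
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner


class PipelineTestCase(absltest.TestCase):
  """Base class for tests that run a tfx.Pipeline on the Beam runner."""

  def run_pipeline(self, pipeline: pipeline_lib.Pipeline):
    # If any component raises, the exception propagates and the test fails.
    BeamDagRunner().run(pipeline)


if __name__ == '__main__':
  absltest.main()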

MetaLearning Evaluation

Extract the Area under the Learning Curve (ALC) from Keras Tuner for evaluation.

The plot should show trial_number on the x axis and the objective on the y axis, where the objective is the best AUC or accuracy found so far; the curve should therefore be monotonically increasing. It can be produced in a Colab notebook, or the custom Tuner can emit the plot directly.
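
For example, the curve could be assembled from completed Keras Tuner trials roughly as below; that `oracle.trials` holds trials in execution order and that the objective is maximized are assumptions:

import matplotlib.pyplot as plt


def plot_alc(tuner):
  """Plots the best objective value found so far against trial number."""
  scores = [t.score for t in tuner.oracle.trials.values()
            if t.score is not None]
  best_so_far = []
  best = float('-inf')
  for score in scores:
    best = max(best, score)  # Running maximum => monotone curve.
    best_so_far.append(best)
  plt.plot(range(1, len(best_so_far) + 1), best_so_far)
  plt.xlabel('trial_number')
  plt.ylabel('best objective so far')
  plt.show()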

[MetaLearning] Custom Tuner

We want to find good configurations early on, so the custom tuner contains either one or two Keras Tuners:

  1. If Keras Tuner supports warm-starting (suggested trials): use those first, then fall back to the original search space.
  2. Otherwise, create one Keras Tuner for the meta-learner-defined search space and a second for the original search space.

This ensures that if our meta-learner fails to find good configurations, it won't harm final performance, provided the user requests sufficient trials.
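
A sketch of option 2 under stated assumptions: `meta_hparams` is a meta-learner-defined kerastuner.HyperParameters search space, the objective is maximized, and the 50/50 budget split is arbitrary:

import kerastuner as kt


def search_with_fallback(build_model, meta_hparams, original_hparams,
                         max_trials, **fit_kwargs):
  """Spends half the budget on the meta-learned space, the rest on the original."""
  meta_tuner = kt.RandomSearch(
      build_model,
      objective='val_accuracy',
      hyperparameters=meta_hparams,
      max_trials=max_trials // 2,
      project_name='meta_space')
  original_tuner = kt.RandomSearch(
      build_model,
      objective='val_accuracy',
      hyperparameters=original_hparams,
      max_trials=max_trials - max_trials // 2,
      project_name='original_space')
  meta_tuner.search(**fit_kwargs)
  original_tuner.search(**fit_kwargs)
  # Keep whichever search space produced the better trial.
  best_meta = meta_tuner.oracle.get_best_trials(1)
  best_original = original_tuner.oracle.get_best_trials(1)
  if best_meta and (not best_original or
                    best_meta[0].score >= best_original[0].score):
    return meta_tuner
  return original_tuner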

Add a `problem_statement` field on `Task`

Task should have a problem_statement property that returns a ProblemStatement proto. Several TFX components consume this proto, so we should use it.

A task is essentially a two-tuple of (dataset, problem_statement).

We only need to populate the minimal fields on the proto. A minimalist proto (in text format) would look like:

owner: "nitroml"
tasks {
  type {
    multi_class_classification {
      label: "Contraceptive_method_used"
      n_classes: 3
    }
  }
  name: "openml_cmc"
}
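
A sketch of the property; the import path for the ProblemStatement proto is an assumption here, and `Task` is the base class proposed above:

from google.protobuf import text_format
# NOTE: the import path below is an assumption; adjust to the real location.
from tensorflow_metadata.proto.v0 import problem_statement_pb2


class OpenMLCMCTask(Task):

  @property
  def problem_statement(self) -> problem_statement_pb2.ProblemStatement:
    # Populates only the minimal fields, matching the text proto above.
    return text_format.Parse(
        """
        owner: "nitroml"
        tasks {
          type {
            multi_class_classification {
              label: "Contraceptive_method_used"
              n_classes: 3
            }
          }
          name: "openml_cmc"
        }
        """, problem_statement_pb2.ProblemStatement())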

Keras Trainer and AutoData adapter

Examples:

Trainer

# Lint as: python3
import os

import tensorflow.compat.v2 as tf
import tensorflow.google.compat.v2 as tfg

import data_provider
from tfx.components.trainer import executor as trainer_executor


def get_hparams() -> tfg.HParams:
  """Defines the set of hyper parameters recognized by this model.

  NOTE these hyperparameters are just for exemplifying purposes,
  and should be tuned.

  Returns:
    An tf.HParams instance.
  """
  return tfg.HParams(
      train_batch_size=128,
      eval_batch_size=128,
  )


# TFX Trainer will call this function.
def run_fn(fn_args: trainer_executor.TrainerFnArgs):
  """Train a Keras Model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
      - train_files: A list of uris for train files.
      - transform_output: An optional single uri for transform graph produced by
        TFT. Will be None if not specified.
      - serving_model_dir: A single uri for the output directory of the serving
        model.
      - eval_model_dir: A single uri for the output directory of the eval model.
        Note that this is for estimator only, Keras doesn't require it for TFMA.
      - eval_files:  A list of uris for eval files.
      - schema_file: A single uri for schema file.
      - train_steps: Number of train steps.
      - eval_steps: Number of eval steps.
      - base_model: Base model that will be used for this training job.
      - hyperparameters: An optional kerastuner.HyperParameters config.
  """

  model_hparams = get_hparams()

  # NOTE: Use a distinct name to avoid shadowing the `data_provider` module;
  # reassigning it would raise an UnboundLocalError on this line.
  provider = data_provider.DataProvider(
      transform_graph_dir=fn_args.transform_output)

  feature_columns = (
      provider.get_numeric_feature_columns() +
      provider.get_embedding_feature_columns())
  input_layers = provider.get_input_layers()

  # All input_layers must be consumed for the Keras Model to work.
  assert len(feature_columns) >= len(input_layers)

  x = tf.keras.layers.DenseFeatures(feature_columns)(input_layers)
  for numnodes in [64, 64]:
    x = tf.keras.layers.Dense(numnodes)(x)
  output = tf.keras.layers.Dense(
      provider.forecast_horizon, activation=None, name='logits')(x)

  model = tf.keras.Model(input_layers, output)
  model.compile(
      loss=tf.keras.losses.MeanSquaredError(),
      optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
      metrics=[
          tf.keras.metrics.RootMeanSquaredError(),
          tf.keras.metrics.MeanSquaredError(),
          tf.keras.metrics.MeanAbsoluteError(),
          tf.keras.metrics.MeanSquaredLogarithmicError(),
      ])
  model.summary()

  # This log path might change in the future.
  log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir, update_freq='batch')

  train_dataset = provider.get_dataset(
      file_pattern=fn_args.train_files,
      batch_size=model_hparams.train_batch_size,
      num_epochs=None,
      shuffle=True)
  eval_dataset = provider.get_dataset(
      file_pattern=fn_args.eval_files,
      batch_size=model_hparams.eval_batch_size,
      num_epochs=1,
      shuffle=False)

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      epochs=1,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  signatures = {
      'serving_default':
          provider.get_serve_tf_examples_fn(model).get_concrete_function(
              tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

DataProvider/AutoData adapter

# Lint as: python3
"""An data provider for Keras Models.

The consumed artifacts include:
 * Dataset schema.
 * Dataset statistics.
 * TensorFlow Transform outputs.
"""

from typing import Any, Dict, List, Optional, Text

import tensorflow.compat.v2 as tf
import tensorflow_transform as tft

from tensorflow_metadata.proto.v0 import schema_pb2

FeatureColumn = Any


class DataProvider():
  """Creates feature columns and specs from TFX artifacts."""

  def __init__(self, transform_graph_dir: Text):
    """Initializes the DataProvider from TFX artifacts.

    Args:
      transform_graph_dir: Path to the TensorFlow Transform graph artifacts.
    """

    # Parse transform.
    self._tf_transform_output = tft.TFTransformOutput(transform_graph_dir)

    # Parse schema.
    self._dataset_schema = self._tf_transform_output.transformed_metadata.schema

  @property
  def raw_label_keys(self) -> List[Text]:
    """The raw label key as defined in the ProblemStatement."""

    # TODO(weill): Make this label configurable.
    return ['future_sales']

  @property
  def transformed_label_keys(self) -> List[Text]:
    """The label key after applying TensorFlow Transform to the Examples."""

    return self.raw_label_keys

  @property
  def forecast_horizon(self) -> int:
    """The int forecast horizon for future sales."""

    # 28 days.
    return 28

  def get_input_layers(self) -> Dict[Text, tf.keras.layers.Input]:
    """Returns input layers for a Keras Model."""

    feature_spec = self._tf_transform_output.transformed_feature_spec().copy()
    feature_spec.pop(self.transformed_label_keys[0])
    input_layers = {}
    for colname, spec in feature_spec.items():
      input_layers[colname] = tf.keras.layers.Input(
          name=colname, shape=spec.shape, dtype=spec.dtype)
    return input_layers

  def get_numeric_feature_columns(self,
                                  include_integer_columns: bool = False
                                 ) -> List[FeatureColumn]:
    """Creates a set of feature columns.

    Args:
      include_integer_columns: Whether integer columns in the examples should be
        included in the numeric columns as floats.

    Returns:
      A list of feature columns.
    """

    numeric_columns = []
    for feature in self._dataset_schema.feature:

      feature_name = feature.name
      if feature_name in self.raw_label_keys:
        continue

      feature_storage_type = _get_feature_storage_type(self._dataset_schema,
                                                       feature_name)

      if feature_storage_type == tf.int64 and not include_integer_columns:
        continue

      # NOTE: Int features are treated as both numerical and categorical. For
      # example, MNIST stores its pixel values as int features even though
      # they are continuous.
      if feature_storage_type == tf.float32 or feature_storage_type == tf.int64:

        # Numerical feature.
        dim = _get_feature_dim(self._dataset_schema, feature_name)

        # Numerical feature normalized in 0-1.
        current_feature = tf.feature_column.numeric_column(
            feature_name, shape=(dim,), dtype=feature_storage_type)
        numeric_columns.append(current_feature)
    return numeric_columns

  def get_sparse_categorical_feature_columns(
      self, include_integer_columns: bool = True) -> List[FeatureColumn]:
    """Creates a set of sparse categorical feature columns.

    Args:
      include_integer_columns: Whether integer columns in the examples should be
        included in the categorical columns.

    Returns:
      A list of feature columns.
    """

    feature_columns = []
    for feature in self._dataset_schema.feature:

      feature_name = feature.name
      if feature_name in self.raw_label_keys:
        continue

      feature_storage_type = _get_feature_storage_type(self._dataset_schema,
                                                       feature_name)

      if feature_storage_type == tf.float32:
        continue

      if feature_storage_type == tf.int64:
        if not include_integer_columns:
          continue

        # Categorical or categorical-set feature stored as an integer(s).
        num_buckets = (
            self._tf_transform_output.num_buckets_for_transformed_feature(
                feature_name))
        new_feature_column = tf.feature_column.categorical_column_with_identity(
            feature_name, num_buckets=num_buckets)
      else:
        # Note TFT automatically converts string columns to int columns.
        raise ValueError('Unsupported dtype.')
      feature_columns.append(new_feature_column)
    return feature_columns

  def get_embedding_feature_columns(self,
                                    include_integer_columns: bool = True
                                   ) -> List[FeatureColumn]:
    """Creates a set of embedding feature columns.

    Args:
      include_integer_columns: Whether integer columns in the examples should be
        included in the embeddings.

    Returns:
      A list of feature columns.
    """

    return [
        tf.feature_column.embedding_column(column, dimension=10) for column in
        self.get_sparse_categorical_feature_columns(include_integer_columns)
    ]

  def get_dataset(self,
                  file_pattern: Text,
                  batch_size: int,
                  num_epochs: Optional[int] = None,
                  shuffle: Optional[bool] = True,
                  shuffle_buffer_size: int = 10000,
                  shuffle_seed: Optional[int] = None,
                  prefetch_buffer_size: Optional[int] = None,
                  reader_num_threads: Optional[int] = None,
                  parser_num_threads: Optional[int] = None,
                  sloppy_ordering: bool = False,
                  drop_final_batch: bool = False) -> tf.data.Dataset:
    """Returns an input_fn that returns a `tf.data.Dataset` from Examples.

    Args:
      file_pattern: List of files or patterns of file paths containing Example
        records. See tf.io.gfile.glob for pattern rules.
      batch_size: An int representing the number of records to combine in a
        single batch.
      num_epochs: Integer specifying the number of times to read through the
        dataset. If None, cycles through the dataset forever. Defaults to None.
      shuffle: A boolean, indicates whether the input should be shuffled.
        Defaults to True.
      shuffle_buffer_size: Buffer size of the ShuffleDataset. A large capacity
        ensures better shuffling but would increase memory usage and startup
        time.
      shuffle_seed: Randomization seed to use for shuffling.
      prefetch_buffer_size: Number of feature batches to prefetch in order to
        improve performance. Recommended value is the number of batches consumed
        per training step. Defaults to auto-tune.
      reader_num_threads: Number of threads used to read Example records. If >1,
        the results will be interleaved. Defaults to 1.
      parser_num_threads: Number of threads to use for parsing Example tensors
        into a dictionary of Feature tensors. Defaults to 2.
      sloppy_ordering: If True, reading performance will be improved at the cost
        of non-deterministic ordering. If False, the order of elements produced
        is deterministic prior to shuffling (elements are still randomized if
        shuffle=True. Note that if the seed is set, then order of elements after
        shuffling is deterministic). Defaults to False.
      drop_final_batch: If True, and the batch size does not evenly divide the
        input dataset size, the final smaller batch will be dropped. Defaults to
        False.

    Returns:
      A `tf.data.Dataset` of (features, labels) tuples.
    """

    # Since we're not applying the transform graph here, we're using Transform
    # materialization.
    feature_spec = self._tf_transform_output.transformed_feature_spec().copy()

    def _pop_labels(features):
      label_keys = self.transformed_label_keys
      labels = []
      for key in label_keys:
        labels.append(features.pop(key))
      return features, tf.concat(labels, axis=1)

    def _gzip_reader_fn(files):
      return tf.data.TFRecordDataset(files, compression_type='GZIP')

    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern,
        batch_size,
        feature_spec,
        reader=_gzip_reader_fn,
        num_epochs=num_epochs,
        shuffle=shuffle,
        shuffle_buffer_size=shuffle_buffer_size,
        shuffle_seed=shuffle_seed,
        prefetch_buffer_size=prefetch_buffer_size,
        reader_num_threads=reader_num_threads,
        parser_num_threads=parser_num_threads,
        sloppy_ordering=sloppy_ordering,
        drop_final_batch=drop_final_batch)
    return dataset.map(_pop_labels)

  def get_serve_tf_examples_fn(self, model: tf.keras.Model):
    """Returns a function that parses a serialized tf.Example and applies TFT."""

    model.tft_layer = self._tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
      """Returns the output to be used in the serving signature."""
      feature_spec = self._tf_transform_output.raw_feature_spec()
      feature_spec.pop(self.transformed_label_keys[0])
      parsed_features = tf.io.parse_example(serialized_tf_examples,
                                            feature_spec)
      transformed_features = model.tft_layer(parsed_features)
      return model(transformed_features)

    return serve_tf_examples_fn


def _get_feature_storage_type(schema: schema_pb2.Schema,
                              feature_name: Text) -> tf.dtypes.DType:
  """Get the storage type of at tf.Example feature."""

  for feature in schema.feature:
    if feature.name == feature_name:
      if feature.type == schema_pb2.FeatureType.BYTES:
        return tf.string
      if feature.type == schema_pb2.FeatureType.FLOAT:
        return tf.float32
      if feature.type == schema_pb2.FeatureType.INT:
        return tf.int64
  raise ValueError('Feature not found: {}'.format(feature_name))


def _get_feature_dim(schema: schema_pb2.Schema, feature_name: Text) -> int:
  """Get the dimension of the tf.Example feature."""

  for feature in schema.feature:
    if feature.name == feature_name:
      return feature.shape.dim[0].size
  raise ValueError('Feature not found: {}'.format(feature_name))

Create pip package for nitroml

We need to claim the name nitroml on PyPI. This requires a script that builds a .whl file from the nitroml source.
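
A minimal setup.py sketch (version, description, and dependencies are placeholders), after which `python setup.py bdist_wheel` writes the .whl under dist/:

# setup.py
import setuptools

setuptools.setup(
    name='nitroml',
    version='0.0.1',  # Placeholder version.
    description='A framework for benchmarking AutoML pipeline model-quality.',
    packages=setuptools.find_packages(),
    install_requires=['tfx'],  # Placeholder dependency list.
)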
