
ralf's Introduction

ralf

Ralf is a feature store for rapidly changing data. Ralf incrementally propagates raw data changes to derived feature tables which are queryable by downstream applications such as model training and inference.

Installation

ralf can be installed from source with pip install -e . and then imported with import ralf.

Quickstart

Ralf ingests data from user-defined Source objects, which can connect to streaming services to receive new data or generate data synthetically. Source data is transformed by operators to define downstream feature tables. In the example below, we create a simple feature pipeline which generates random numbers for a set of keys ["sam", "bob", "sarah"] and computes the average value for each key over a sliding window.

import time
import random

import ray
from ralf import Ralf, Operator, Source, Record

# define a custom source which generates synthetic data (random numbers for each key)
@ray.remote 
class FakeSource(Source): 
  def __init__(self, keys): 
    self.keys = keys 
    self.sleep_time = 0.01

  def next(self): 
    records = [Record(key=key, value=random.random()) for key in self.keys]
    time.sleep(self.sleep_time)
    return records

@ray.remote 
class Average(Operator):
  def on_record(self, record: Record) -> Record:
    return Record(key=record.key, value=sum(record.values) / len(record.values))

ralf_server = Ralf()
source = ralf_server.create_source(FakeSource, args=(["sam", "bob", "sarah"],))
window = source.window(window_size=100, slide_size=50)
average = window.map(Average).as_queryable("average")
ralf_server.deploy()

Once we've deployed the server, we can query feature tables from a RalfClient.

from ralf import RalfClient

ralf_client = RalfClient()
res = ralf_client.point_query(table="average", key="bob") # query a single key 
res = ralf_client.bulk_query(table="average") # query all keys 

Concepts

Ralf's API is centered around Table objects, which can be treated like database tables or dataframes. Tables are defined in terms of one or more parent tables and an Operator, which defines how new parent data is processed to update the table's values.

Sources

Sources feed raw data into ralf. Sources can read from streams (e.g. Kafka) or files (e.g. CSV files), or be custom sources (e.g. synthetically generated data).

ralf_server = Ralf()

# Read data from kafka stream 
kafka_source = ralf_server.create_kafka_source(topic="kafka_topic") 

# Read data from CSV file 
csv_source = ralf_server.create_csv_source(filename="csv_filename")

# Ingest data from a custom source 
custom_source = ralf_server.create_source(SourceOperator, args=(...))

Operators

Operators are the featurization transformations used to derive features. Operators define how parent table data should be transformed into child table data. For example, we can define a feature table containing the results of running source data through a model:

class ModelOperator(Operator):

  def __init__(self, model_file):
    # load the trained model from disk (e.g. torch.load)
    self.model = load(model_file)

  def on_record(self, record: Record):
    # featurization logic
    feature = self.model(record.value)
    return Record(key=record.key, value=feature)

feature_table = source.map(ModelOperator, args=("model.pt",))
    

Processing Policies

Ralf enables customizable load shedding and prioritization policies at the per-table level.
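The policy interface is still being designed (see the refactoring notes below), so the sketch here is purely illustrative: the SampleKeys class and the processing_policy argument are assumed names, not existing ralf API.

import random

from ralf import Record

class SampleKeys:
  # Hypothetical load-shedding policy that randomly drops a fraction of incoming records.

  def __init__(self, keep_fraction: float = 0.5):
    self.keep_fraction = keep_fraction

  def process(self, record: Record) -> bool:
    # Return True to process the record, False to shed it.
    return random.random() < self.keep_fraction

# A per-table policy might then be attached when the table is defined, e.g.
# average = window.map(Average, processing_policy=SampleKeys(0.25)).as_queryable("average")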

ralf's People

Contributors

narang-amit, sarahwooders, simon-mo


ralf's Issues

Sharded Input Data Streams

We want to have multiple replicas for the source, so we need to shard data streams with either Kafka or a database (Postgres, Redis).
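One possible approach, sketched below purely as an illustration (shard_for_key is a hypothetical helper, not existing ralf code), is to hash each record key to a shard id so that each source replica reads a disjoint partition of the stream.

import hashlib

def shard_for_key(key: str, num_shards: int) -> int:
  # Map a record key to a shard id deterministically so every replica
  # agrees on which shard owns which key.
  digest = hashlib.md5(key.encode("utf-8")).hexdigest()
  return int(digest, 16) % num_shards

# Source replica i would then only emit records whose keys satisfy
# shard_for_key(key, num_shards) == i, or subscribe to Kafka partition i
# when Kafka handles the partitioning.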

State Backend

ralf currently uses Python dictionaries to manage internal state. We should persist state across sessions and allow flexibility in where features are stored by creating connectors to different state backends.

To start with, it probably makes sense to store state in SQLite or on disk, then move to RocksDB/Redis based on what works best for Python. We might want to later extend this connector to also work with Feast or other feature stores.

Attributes

  • Versioned (i.e. save historical data) vs. not
  • Can combine pre-filled data + updates
    • e.g. if you filled features with your offline pipeline, you can connect to the same StateConnector in the online pipeline to have pre-filled values that continue to be incrementally updated
    • persistence between runs
  • StateConnector should basically replace the internal Python dictionary we are using now, so the interface for making tables queryable (adding .as_queryable(…)) should stay the same.

Object Outline

class StateConnector:

  def __init__(self, name: str, connection, historical: bool = False):
    """
    Custom sink that stores feature table updates in an external store.

    :name: Unique handle
    :connection: Connection to Redis/Hive/Feast/etc. instance to write updates
    :historical: Whether to store all historical versions (e.g. for an offline store) or only the current version (e.g. for an online store)
    """
    pass

  def on_record(self, record: Record):
    self.connection.update(record)

Example Usage

source_conn = StateConnector("source", hive_connection, historical=True)
embedding_conn = StateConnector("embeddings", redis_connection, historical=False)

source = ralf.from_csv(filename="...", connector=source_conn)
embeddings = source.map(MyOperator, connector=embedding_conn)
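Since SQLite is suggested as a starting point, here is a rough sketch (not existing ralf code) of what a non-historical SQLite-backed connector could look like, assuming the Record fields from the Quickstart:

import sqlite3

class SQLiteConnector(StateConnector):

  def __init__(self, name: str, path: str = "ralf_state.db"):
    self.name = name
    self.db = sqlite3.connect(path)
    self.db.execute(
      f"CREATE TABLE IF NOT EXISTS {name} (key TEXT PRIMARY KEY, value TEXT)"
    )

  def on_record(self, record: Record):
    # Upsert the latest value for the key; a historical connector would
    # INSERT a new row per update instead of replacing.
    self.db.execute(
      f"INSERT OR REPLACE INTO {self.name} (key, value) VALUES (?, ?)",
      (record.key, str(record.value)),
    )
    self.db.commit()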

Improve SQLite3 Connector Latency

Current benchmarks show SQLite3 lagging far behind the other connectors.

Look into optimizing this connection, perhaps by not flushing to disk on every write.
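One way to reduce the per-write cost, assuming the connector uses Python's built-in sqlite3 module, is to relax SQLite's durability settings so each commit no longer forces a flush to disk (acceptable if features can be recomputed after a crash):

import sqlite3

db = sqlite3.connect("ralf_state.db")
# WAL journaling plus synchronous=OFF avoids an fsync on every commit.
db.execute("PRAGMA journal_mode=WAL")
db.execute("PRAGMA synchronous=OFF")
# Batching many record updates into a single transaction also amortizes commit cost.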

HTTPSource

Add a built-in HTTPSource that extends the Source object. The goal is to allow streaming data sources without requiring the user to install Kafka (currently required by the KafkaSource).

Recommended task breakdown (subject to change; a rough sketch follows the list):

  • Specify the endpoint that the source listens to

  • Add a .listen(self) method to receive POST requests at the endpoint

  • Update the .next(self) function to return records that have been received from the endpoint

  • Write a test script which posts events from the client side and ensures the events are received by the HTTPSource and passed through ralf.
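A minimal sketch of what this could look like, using only the standard library for the HTTP handling; the Source/Record interface is taken from the Quickstart, and the host/port defaults and JSON payload format are assumptions:

import json
import queue
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

from ralf import Record, Source

class HTTPSource(Source):

  def __init__(self, host: str = "0.0.0.0", port: int = 8000):
    self.buffer = queue.Queue()
    source = self

    class Handler(BaseHTTPRequestHandler):
      def do_POST(self):
        # Parse the posted JSON event and buffer it as a Record.
        length = int(self.headers.get("Content-Length", 0))
        event = json.loads(self.rfile.read(length))
        source.buffer.put(Record(key=event["key"], value=event["value"]))
        self.send_response(200)
        self.end_headers()

    # Serve POST requests in the background so next() stays non-blocking.
    self.server = HTTPServer((host, port), Handler)
    threading.Thread(target=self.server.serve_forever, daemon=True).start()

  def next(self):
    # Drain whatever events have arrived since the last call.
    records = []
    while not self.buffer.empty():
      records.append(self.buffer.get())
    return records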

[Offline Pipeline] Static sources / one-time execution

The offline pipeline is built for offline development of features for experimentation and model training. The offline pipeline works with a static data source and writes the resulting features to either a CSV or an external DB connection.

The offline pipeline should be easily transferable to the online setting by changing the source type to be dynamic (e.g. a kafka stream). The offline pipeline will complete execution once all the data is read and processed.

Example

# define connectors 
source_conn = TableConnector(hive_connection, historical=True)
embedding_conn = TableConnector(redis_connection, historical=False) 

# offline pipeline requires that all sources are static
pipeline = Ralf(offline=True) 
source = pipeline.csv_source(filename="data.csv", static=True, connector=source_conn)
user_clicks = source.groupby(key="user") 
embedding = user_clicks.map(MyOperator, args=(...), connector=embedding_conn)
pipeline.run()

Feast integration

Write updates (inserts/deletes) into the Feast online and offline stores.

This could potentially be implemented by reading the providers in the Feast config file and writing to them directly. e.g.

from ralf import Ralf, FeastConnector 

ralf = Ralf()
connector = FeastConnector(config="feature_store.yaml") # read providers and connect
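A rough sketch of the connector body, assuming Feast's FeatureStore.write_to_online_store API and an already-registered feature view named "ralf_features" (both assumptions for illustration):

import pandas as pd
from feast import FeatureStore

class FeastConnector:

  def __init__(self, config: str = "feature_store.yaml", feature_view: str = "ralf_features"):
    # FeatureStore picks up the providers/online store from the repo's feature_store.yaml.
    self.store = FeatureStore(repo_path=".")
    self.feature_view = feature_view

  def on_record(self, record: Record):
    # The DataFrame must match the feature view's schema (entity key, features, timestamp).
    df = pd.DataFrame([{"key": record.key, "value": record.value}])
    self.store.write_to_online_store(self.feature_view, df)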

[Offline Pipeline] Batch processing

Offline processing can be made more efficient by batch-processing data, so we should offer an option to define a batch update function for the operators.

  • user-defined on_records(self, records: List[Record]) → List[Record]
  • set batch size for operators
from typing import List

class Operator:

  def __init__(self, batch_size):
    self.batch_size = batch_size

  # user-defined transformation
  def on_record(self, record: Record):
    pass

  # can be overridden by the user
  def on_records(self, records: List[Record]) -> List[Record]:
    return [self.on_record(record) for record in records]

ralf v2 client

Current ralf v2 does not have support for a client which can query features, post feedback, etc.

The client should be able to call the following methods (a rough sketch follows the list):

  • server.point_query(key, table_name)
  • server.bulk_query(table_name)
  • server.feedback(key, table_name)
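A sketch of what the client could look like; the HTTP endpoint paths, the requests dependency, and the feedback payload are all assumptions rather than existing ralf v2 API:

import requests

class RalfClient:

  def __init__(self, host: str = "localhost", port: int = 8000):
    self.base = f"http://{host}:{port}"

  def point_query(self, key, table_name: str):
    return requests.get(f"{self.base}/tables/{table_name}/{key}").json()

  def bulk_query(self, table_name: str):
    return requests.get(f"{self.base}/tables/{table_name}").json()

  def feedback(self, key, table_name: str, payload: dict):
    return requests.post(f"{self.base}/tables/{table_name}/{key}/feedback", json=payload)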

Improve Logging

The on_record() function in Operator sometimes fails silently, so you currently need to add a try/catch yourself. We should make sure that errors are printed and cause the server to terminate.

We could do this by adding a try/catch whenever on_record is called by the parent Operator class, and potentially add a log and error file in the metrics folder.
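A sketch of that try/catch wrapper, assuming the parent Operator class can route calls through a wrapper method and that the error file lives under the metrics folder (both assumptions):

import logging
import traceback

logging.basicConfig(filename="metrics/errors.log", level=logging.ERROR)

class Operator:

  def _safe_on_record(self, record):
    # Called by the framework instead of invoking on_record directly.
    try:
      return self.on_record(record)
    except Exception:
      # Log the full traceback, then re-raise so the server terminates
      # loudly instead of failing silently.
      logging.error("on_record failed for %r:\n%s", record, traceback.format_exc())
      raise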

ralf refactoring

  • Generate docstrings for API documentation
  • Create separate client/ folder for RalfClient
  • Remove schemas
  • Set primary key (or select columns) for Tables
  • Move simulation code to separate repo
  • Renaming
    • Operator -> Transform (configure policy)
  • ProcessingPolicy object - TBD
