
scratchdata's Introduction

Scratch Data

Scratch Data is a wrapper that lets you stream data into and out of your analytics database. It takes arbitrary JSON as input and lets you perform analytical queries.

Quickstart

1. Run the server

Clone the repo:

$ git clone git@github.com:scratchdata/scratchdata.git
$ cd scratchdata

Start the service:

$ go run . 

With no configuration, this will automatically set up a local DuckDB database ready for reading and writing.

Run with custom config

Create a config.yaml file with all of your settings and run:

$ go run . config.yaml

2. Insert JSON data

$ curl -X POST "http://localhost:8080/api/data/insert/events?api_key=local" \
    --json '{"user": "alice", "event": "click"}'

The "events" table and columns are automatically created.

3. Query

$ curl -G "http://localhost:8080/api/data/query" \
     --data-urlencode "api_key=local" \
     --data-urlencode "query=select * from events"

Other Features

Share Data

You can share data as CSV or JSON by creating "share links".

$ curl -X POST "http://localhost:8080/api/data/query/share?api_key=local" \
    --json '{"query": "select * from events", "duration": 120}'

This will produce a query ID that expires in 120 seconds. From there, send the following link to users:

http://localhost:8080/share/<query_id>/data.csv
http://localhost:8080/share/<query_id>/data.json

Copy Data

You can set up multiple databases and copy data between them. You can run a SQL query against your source database and Scratch will automatically create a table and insert data into a destination.

$ curl -X POST "http://localhost:8080/api/data/copy?api_key=local" \
    --json '{"query": "select * from events", "destination_id": 3, "destination_table": "events"}'

Next Steps

To see the full list of options, look at: https://docs.scratchdata.com


scratchdata's Issues

Dashboard

Dashboard to see usage, billing, etc

Ignore incompatible data

Following the completion of #1, we want to ignore data when its type does not match the column type.

Requirements

  • We should never fail to insert data.
  • If the types are completely incompatible (for example, an Int and Array type) then we should default to the zero value.

Example does not work

After inserting a record, querying my_table produces an error:

127.0.0.1 | GET     | /query           | Code: 60. DB::Exception: Table default.my_table does not exist. (UNKNOWN_TABLE) (version 23.10.1.1976 (official build))

Support for non-string data types

Today, all data is stored as strings. With this enhancement, we want to be able to support primitive data types.

Requirements

  1. We want to be able to support primitive data types in clickhouse: numbers, booleans, datetime, and null types. We will not change how we handle arrays and maps.
  2. In each user's config, we should have a new config value, string_types. If this is false, then the code will use our new typing logic. If it is true, then we will continue to store all data as strings.
  3. When creating new columns, they should not be nullable in clickhouse. They should have a default zero value.
  4. We should attempt to determine if string columns are formatted as dates. If so, we should create columns as datetime.
  5. We should never fail to insert data. If the types are completely incompatible (for example, an Int and Array type) then we should default to the zero value.
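
A minimal sketch of the coercion rules in requirements 4 and 5, using only the standard library; the helper names and supported date layouts are illustrative, not the project's actual schema code:

package main

import (
	"fmt"
	"time"
)

// coerceToInt64 returns the value as int64, or 0 if the types are incompatible
// (requirement 5: never fail the insert, fall back to the zero value).
func coerceToInt64(v any) int64 {
	switch x := v.(type) {
	case float64: // encoding/json decodes all JSON numbers as float64
		return int64(x)
	case bool:
		if x {
			return 1
		}
		return 0
	case string:
		// a fuller version could attempt strconv.ParseInt here
		return 0
	default: // arrays, maps, null: incompatible, use the zero value
		return 0
	}
}

// coerceToDateTime tries a few common layouts (requirement 4) and falls back
// to the zero time if none match.
func coerceToDateTime(s string) time.Time {
	for _, layout := range []string{time.RFC3339, "2006-01-02 15:04:05", "2006-01-02"} {
		if t, err := time.Parse(layout, s); err == nil {
			return t
		}
	}
	return time.Time{}
}

func main() {
	fmt.Println(coerceToInt64(42.0), coerceToInt64([]any{1, 2})) // 42 0
	fmt.Println(coerceToDateTime("2024-01-02"))
}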

Guidelines

  • There are many ways to have Clickhouse read JSON data. Clickhouse has a JSONEachRow format which could be useful since it automatically infers types. Clickhouse also has guidelines for JSON here.
  • This set of features should be broken into small incremental self-contained PRs.
  • Please include automated tests.
  • I have started this work here, but the code is incomplete, does not compile, and behaves differently than the above requirements.

Insert data to Clickhouse

Implement the InsertBatchFromNDJson() function in clickhouse.go. This will take a stream of JSON as input and insert into the underlying clickhouse database.

  • The logic should be copied from the original code here. This should be the default behavior. https://github.com/scratchdata/ScratchDB/blob/main/importer/import_types.go
  • For backwards-compatibility, we should ensure that if an existing table only has String columns, that data will still be uploaded as expected. This is because the original version of the code used only String types and we want to make sure our logic will still work.

Implement S3 for Storage

Implement an S3 interface for Storage.

  • The settings should be configurable via config.go
  • We should instantiate this object in the main function if the s3 interface is present
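
A rough sketch of what the S3 implementation could look like with the AWS SDK for Go v2; the Upload/Download method names are assumptions about the Storage interface, and the bucket name would come from the settings in config.go:

package blobstore

import (
	"context"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

type S3Storage struct {
	client *s3.Client
	bucket string
}

func NewS3Storage(ctx context.Context, bucket string) (*S3Storage, error) {
	cfg, err := awsconfig.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, err
	}
	return &S3Storage{client: s3.NewFromConfig(cfg), bucket: bucket}, nil
}

// Upload streams the reader to S3 without buffering the whole object.
func (s *S3Storage) Upload(ctx context.Context, key string, r io.Reader) error {
	_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
		Body:   r,
	})
	return err
}

// Download returns a reader for the object; the caller must Close it.
func (s *S3Storage) Download(ctx context.Context, key string) (io.ReadCloser, error) {
	out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, err
	}
	return out.Body, nil
}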

Multiple Clickhouse Servers

Today, there is a single Clickhouse configuration in the file but that does not allow me to add more servers in order to scale.

Therefore we want to do the following:

  1. Have an interface in Go to get information about clickhouse server configurations. Right now that interface will read data from a config file, but in the future we will store information about servers in a database.

  2. The configuration and interface should be able to support multiple servers, rather than just 1.

  3. For now, we can just have our software choose the first Clickhouse server in a list when issuing queries. (In the future we may want to use round-robin or something else)

  4. Finally, the ClickhouseServerConfig should be rolled into this new scheme for getting information about Clickhouse servers.
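
A sketch of the interface described above, under the assumption that the type and function names are free to choose; a config-file-backed provider satisfies it today, and callers take the first server in the list per point 3:

package servers

import "errors"

type ClickhouseServer struct {
	Host     string
	Port     int
	Username string
	Password string
}

// ServerProvider hides whether server configs come from a config file today
// or a database later.
type ServerProvider interface {
	ClickhouseServers() ([]ClickhouseServer, error)
}

// ConfigFileProvider is the current implementation: servers parsed from the config file.
type ConfigFileProvider struct {
	Servers []ClickhouseServer
}

func (p *ConfigFileProvider) ClickhouseServers() ([]ClickhouseServer, error) {
	return p.Servers, nil
}

// pickServer implements point 3: use the first server in the list for now;
// round-robin or load-aware selection can replace this later.
func pickServer(p ServerProvider) (ClickhouseServer, error) {
	servers, err := p.ClickhouseServers()
	if err != nil {
		return ClickhouseServer{}, err
	}
	if len(servers) == 0 {
		return ClickhouseServer{}, errors.New("no clickhouse servers configured")
	}
	return servers[0], nil
}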

Improve ScratchDB log

Hi, ScratchDB now uses Go 1.21, so we can use the new standard-library log/slog package to produce better logs than the log package.
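
For illustration, a minimal example of what the switch to log/slog could look like (the handler choice and fields are only examples):

package main

import (
	"log/slog"
	"os"
)

func main() {
	// Structured JSON logs instead of the unstructured log package output.
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	slog.SetDefault(logger)

	slog.Info("inserted batch", "table", "events", "rows", 128)
	slog.Error("insert failed", "table", "events", "err", "connection refused")
}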

Create cross-platform releases with CGO enabled

We want a github action which will cross-compile the package when a new tag is pushed and create a github release with the artifacts.

  • The package uses cgo modules, so we must be able to cross-compile for other platforms. This is probably done most easily with docker containers.
  • We may use goreleaser for this. I have an example goreleaser file already but it does not work for cross-compilation. However, we are not required to use goreleaser.
  • We already have the start of a github workflow, release.yml which uses goreleaser. You will want to modify this.

Memory leak for data insert daemon

There is a memory leak somewhere when consuming messages from the queue and inserting to the DB. Memory should basically be constant (proportional to the number of worker threads.)

Controlled ingest

Provide a URL that allows data insertion. Make sure the schema is strict; the URL could be pre-signed or have some columns pre-filled. To be used by unauthenticated frontend clients.

Redshift Destination

We want to be able to read and write data from RedShift in bulk. Here are the steps to accomplish that:

Step 1: Create new RedShift Destination interface

  • Add a new package in pkg/destinations called redshift
  • Update destinations.go to be able to connect to the new redshift destination.
  • Implement the functions in the Destination interface for the new redshift package. For this first step, you may stub these functions (panic("not implemented"))

Step 2: Implement Queries from RedShift

Set up configuration parameters for RedShift.

  • The YAML configuration should look like this. If there are any fields that we need, then add them:
databases:
  - type: redshift
    api_keys:
      - test_api_key
    settings:
      delete_from_s3: true

      # Redshift connection
      redshift_host: myredshiftcluster.amazonaws.com 
      redshift_port: 5439 
      redshift_user: myuser 
      redshift_password: mypassword 
      redshift_dbname: mydb

      # S3 bucket
      s3_region: "us-east-1"
      s3_access_key_id: "ACCESS_KEY_ID"
      s3_secret_access_key: "SECRET_ACCESS_KEY"
      s3_bucket: "bucket_name"
  • Implement QueryJSON() and QueryCSV() functions. This should execute the input query and output the result in the right format. You may find it useful to create a generic private query() function, and then have the JSON/CSV functions format data while returning.
  • Data should be streamed to the writer and avoid allocations as much as possible. Do not buffer all data into memory before returning to the client. Do not generate the entire JSON/CSV payload in memory.
  • The Close() function should clean up the RedShift connection, and we should also close any open handlers related to executing the query.
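
As a sketch of the streaming approach for QueryCSV(): Redshift speaks the Postgres wire protocol, so a database/sql connection via a Postgres driver (lib/pq or pgx) is assumed here, and rows are written to the csv.Writer one at a time instead of being buffered. The function signature is illustrative:

package redshift

import (
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
)

func queryCSV(db *sql.DB, query string, w io.Writer) error {
	rows, err := db.Query(query)
	if err != nil {
		return err
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return err
	}

	cw := csv.NewWriter(w)
	if err := cw.Write(cols); err != nil {
		return err
	}

	// Scan into a slice of interface pointers so we can handle any column type.
	values := make([]any, len(cols))
	ptrs := make([]any, len(cols))
	for i := range values {
		ptrs[i] = &values[i]
	}

	record := make([]string, len(cols))
	for rows.Next() {
		if err := rows.Scan(ptrs...); err != nil {
			return err
		}
		for i, v := range values {
			if b, ok := v.([]byte); ok {
				record[i] = string(b)
			} else {
				record[i] = fmt.Sprint(v)
			}
		}
		// Write each row as soon as it is scanned; nothing is buffered in memory.
		if err := cw.Write(record); err != nil {
			return err
		}
	}
	cw.Flush()
	if err := cw.Error(); err != nil {
		return err
	}
	return rows.Err()
}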

Step 3: Implement Table Creation

Next, when the user inserts new data into RedShift, we want to create tables and columns based on the input data. Refer to the other packages in pkg/destinations for how to do this.

  • Implement the CreateEmptyTable() function. This should create a table with the given name along with an int64 column called __row_id. You may use this as a reference. This should only create the table if it doesn't already exist.
  • Implement the CreateColumns() function. This should read the input file and alter the RedShift table to add columns if they do not exist. Here is a reference for how to do this.

Implement Data Insertion

We want to bulk upload data to RedShift from the input file. This means implementing the InsertFromNDJsonFile() function. We will do this by uploading data to S3, loading from S3 into RedShift, and then optionally deleting the file from S3.

  • Data should be streamed to S3 at the location provided by the S3 settings. Do not load the data into memory when uploading.
  • Insert data with the COPY sql command. The input type is newline-delimited JSON.
  • If the delete_from_s3 flag is true, then delete the file from S3 if the copy command has been successful.
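
A sketch of the COPY step, assuming the file has already been streamed to S3 and that an IAM role is used for credentials; the access-key form from the settings above would work equally well, and the optional S3 cleanup is left out:

package redshift

import (
	"database/sql"
	"fmt"
)

// copyFromS3 loads newline-delimited JSON from S3 into the target table
// using Redshift's COPY command.
func copyFromS3(db *sql.DB, table, s3Path, iamRole string) error {
	stmt := fmt.Sprintf(
		`COPY %s FROM '%s' IAM_ROLE '%s' FORMAT AS JSON 'auto'`,
		table, s3Path, iamRole,
	)
	_, err := db.Exec(stmt)
	return err
}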

Use a AWS Secret Manager for DB credentials

Today, the pkg/storage/database package is responsible for fetching the credentials for a database. That interface contains a function called GetDestinationCredentials().

We want to refactor the code to use a separate package for managing secrets.

Step 1: Create pkg/storage/vault package

This will follow the same pattern as the pkg/storage/blobstore package, where different storage types can be configured.

  • The new vault package should have an interface which has two functions: GetCredential(name string) string and SetCredential(name, value string)
  • There should be a default implementation called memory which just returns credentials from config.Destination in YAML. The implementation should basically be identical to the existing functionality here.
  • There should be a new configuration section in our yaml file called vault which has the same format as the database section (type and settings.)
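
A sketch of the Vault interface and the default memory implementation from Step 1 (names follow the bullets above; loading the initial map from config.Destination is omitted):

package vault

type Vault interface {
	GetCredential(name string) string
	SetCredential(name, value string)
}

// MemoryVault returns credentials loaded from the YAML destinations config.
type MemoryVault struct {
	credentials map[string]string
}

func NewMemoryVault(initial map[string]string) *MemoryVault {
	if initial == nil {
		initial = map[string]string{}
	}
	return &MemoryVault{credentials: initial}
}

func (m *MemoryVault) GetCredential(name string) string {
	return m.credentials[name]
}

func (m *MemoryVault) SetCredential(name, value string) {
	m.credentials[name] = value
}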

Step 2: Refactor code to use the new vault package instead of database

  • Delete the GetDestinationCredentials() function from the Database interface.
  • Refactor the code to use our new vault instead. This means updating GetStorageServices() to include a Vault as part of the struct and replacing any use of GetDestinationCredentials() to use our new vault instead

Step 3: AWS Secrets Manager

Create a new implementation of the Vault interface which uses AWS Secrets manager

  • Create a new package under vault for AWS.
  • Use the go-v2 API to implement the vault interface
  • The configuration file should take AWS credentials (access id, secret key) along with a prefix. When keys are created or retrieved, we should add this prefix.
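
A rough sketch of an AWS Secrets Manager implementation of the same Vault interface using the aws-sdk-go-v2 secretsmanager client; error handling is collapsed because the interface returns plain strings, and the package name is illustrative:

package awsvault

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
)

type SecretsManagerVault struct {
	client *secretsmanager.Client
	prefix string // prepended to every key per the configuration bullet above
}

func NewSecretsManagerVault(cfg aws.Config, prefix string) *SecretsManagerVault {
	return &SecretsManagerVault{client: secretsmanager.NewFromConfig(cfg), prefix: prefix}
}

func (v *SecretsManagerVault) GetCredential(name string) string {
	out, err := v.client.GetSecretValue(context.TODO(), &secretsmanager.GetSecretValueInput{
		SecretId: aws.String(v.prefix + name),
	})
	if err != nil || out.SecretString == nil {
		return ""
	}
	return *out.SecretString
}

func (v *SecretsManagerVault) SetCredential(name, value string) {
	// CreateSecret fails if the secret already exists; a fuller version would
	// fall back to PutSecretValue in that case.
	_, _ = v.client.CreateSecret(context.TODO(), &secretsmanager.CreateSecretInput{
		Name:         aws.String(v.prefix + name),
		SecretString: aws.String(value),
	})
}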

Implement data ingest in v2 branch

In the v2 refactor branch, implement the Write(), StartProducer(), and StopProducer() functions in queuestorage.go.

This functionality will mostly be copied from writer.go in the main branch.

We do not have to use the exact code from the existing functions if there are improvements in correctness.

Write()

  1. Takes a []byte which is JSON as input and returns (rowID string, error)
  2. The rowID is automatically generated as ulid.Make().String()
  3. This function will take the input JSON, add the fields __row_id and __batch_file, and write to a local file
  4. This will also rotate the file when it gets too big. The file will be rotated when any of the following conditions is met:
    1. The file is bigger than maxFileSize
    2. The file is older than maxFileAge
    3. The file has more than maxRows added (this is new functionality for v2)
  5. The rotation should be thread-safe
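
A condensed sketch of the Write() flow described above: annotate the JSON with __row_id and __batch_file, append it to the current file under a mutex, and rotate when any of the three thresholds is exceeded. The struct fields are assumptions and the rotate() body is stubbed:

package queuestorage

import (
	"encoding/json"
	"os"
	"sync"
	"time"

	"github.com/oklog/ulid/v2"
)

type FileWriter struct {
	mu          sync.Mutex
	file        *os.File
	fileOpened  time.Time
	rows        int64
	bytes       int64
	maxFileSize int64
	maxFileAge  time.Duration
	maxRows     int64
}

func (w *FileWriter) Write(data []byte) (string, error) {
	var record map[string]any
	if err := json.Unmarshal(data, &record); err != nil {
		return "", err
	}

	rowID := ulid.Make().String()

	w.mu.Lock()
	defer w.mu.Unlock()

	record["__row_id"] = rowID
	record["__batch_file"] = w.file.Name()

	line, err := json.Marshal(record)
	if err != nil {
		return "", err
	}
	line = append(line, '\n')

	n, err := w.file.Write(line)
	if err != nil {
		return "", err
	}
	w.bytes += int64(n)
	w.rows++

	// Rotate when any of the three conditions from the ticket is met.
	if w.bytes >= w.maxFileSize ||
		time.Since(w.fileOpened) >= w.maxFileAge ||
		w.rows >= w.maxRows {
		if err := w.rotate(); err != nil {
			return "", err
		}
	}
	return rowID, nil
}

func (w *FileWriter) rotate() error {
	// Close the current file, open a fresh one, reset the counters, and
	// notify the producer goroutine (omitted) so it can upload and enqueue the file.
	return nil
}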

StartProducer()

  1. When files are rotated, we want to do the following in the background:
    1. Upload them to S3
    2. Place a message on a queue with information about that file

StopProducer()

  1. When stop is called, we want to make sure that we stop performing any more writes (the Write() function should return an error)
  2. We then want to make sure any remaining data is flushed to disk and then uploaded to S3 and the Queue before returning

BigQuery Destination

We want to be able to read and write data from BigQuery in bulk. Here are the steps to accomplish that:

Step 1: Create new BigQuery Destination interface

  • Add a new package in pkg/destinations called bigquery
  • Update destinations.go to be able to connect to the new bigquery destination.
  • Implement the functions in the Destination interface for the new bigquery package. For this first step, you may stub these functions (panic("not implemented"))

Step 2: Implement Queries from BigQuery

Set up configuration parameters for BigQuery.

  • The YAML configuration should look like this. If there are any fields that we need, then add them:
databases:
  - type: bigquery
    api_keys:
      - test_api_key
    settings:
      json_credentials: >
        {
          "type": "service_account",
          "project_id": "example-123",
          "private_key_id": "...",
          "private_key": "-----BEGIN PRIVATE KEY-----...",
          ... 
        }
  • Implement QueryJSON() and QueryCSV() functions. This should execute the input query and output the result in the right format. You may find it useful to create a generic private query() function, and then have the JSON/CSV functions format data while returning.
  • Data should be streamed to the writer and avoid allocations as much as possible. Do not buffer all data into memory before returning to the client. Do not generate the entire JSON/CSV payload in memory.
  • The Close() function should clean up the BigQuery connection, and we should also close any open handlers related to executing the query.

Step 3: Implement Table Creation

Next, when the user inserts new data to big query, we want to create tables and columns based on the input data. Refer to the other packages in pkg/destinations for how to do this.

  • Implement the CreateEmptyTable() function. This should create a table with the given name along with an int64 column called __row_id. You may use this as a reference. This should only create the table if it doesn't already exist.
  • Implement the CreateColumns() function. This should read the input file and alter the BigQuery table to add columns if they do not exist. Here is a reference for how to do this.

Implement Data Insertion

We want to bulk upload data to BigQuery from the input file. This means implementing the InsertFromNDJsonFile() function.

  • Data should be streamed to BigQuery. Do not load the entire data set into memory when uploading to the database.
  • Implement this with the Load API. The input type is newline-delimited JSON.
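
A sketch of InsertFromNDJsonFile() using the Load API from cloud.google.com/go/bigquery; the client is assumed to have been created from the json_credentials setting, and the dataset/table names are placeholders. The file is handed to the load job as a reader so it is never read fully into memory:

package bigquery

import (
	"context"
	"os"

	bq "cloud.google.com/go/bigquery"
)

// insertFromNDJSONFile streams the file to BigQuery with a load job.
func insertFromNDJSONFile(ctx context.Context, client *bq.Client, dataset, table, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	source := bq.NewReaderSource(f)
	source.SourceFormat = bq.JSON // newline-delimited JSON

	loader := client.Dataset(dataset).Table(table).LoaderFrom(source)
	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	return status.Err()
}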

Update configuration to be able to specify a transporter

In pkg/transport/ we have different implementations to move data from the source JSON to the destination. We want this to be configurable.

  1. In config.toml, we should be able to support different transport implementations. Today, we just have two:
    • Memory
    • QueueStorage
  2. The QueueStorage transport takes two additional parameters: a Queue type (SQS, Redis, RabbitMQ, etc) and a Storage type (S3, FTP, etc.)
  3. Thus, the software should be able to specify different types of configurations for transport, queue, and storage types and correctly instantiate their respective objects.

Right now, the Memory struct does not take any parameters, and QueueStorage relies on two (a queue and a storage backend.) However, this is not the only way we could design it.

Another option could be to have a single implementation for transport. In the case of a memory-only transport, we would create a MemoryQueue and MemoryStorage type that is passed in.

Postgrest Syntax Support

It would be great to support a subset of the Postgrest syntax. This ticket is to implement basic query support for Clickhouse.

There is a skeleton here, which includes the start of a parser and scaffolding: #96

The goal of this ticket is to be able to support the section "Column Filters" in this document.

The above draft pull request already parses the query into a struct. Now the code just needs to parse that struct, create a SQL statement, and perform the query.

For this iteration, do not implement other types of filters, such as AND, OR, or limit/offset. Assume the query is just SELECT * WHERE ... and we will be implementing the WHERE clause for different parameters.

Cache and clean up database connections

Today, every time we perform any DB operation, we create a new connection. Instead we should cache them.

Today, we do the following to get a database connection:

conn := s.DB.GetDatabaseConnection(dbID)
destinations.GetDestination(conn)

Is there a cleaner way to do this?

Second, once we establish a connection with a destination, we should try and maintain it, rather than closing the connection every time. Connection management should be transparent to anyone who is doing an insert/query to that destination.
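
A minimal sketch of connection caching with a mutex-guarded map keyed by destination ID; the Destination interface and the open callback stand in for the real pkg/destinations types:

package destinations

import "sync"

type Destination interface {
	Close() error
}

type ConnectionPool struct {
	mu    sync.Mutex
	conns map[int64]Destination
	open  func(dbID int64) (Destination, error) // e.g. wraps GetDatabaseConnection + GetDestination
}

func NewConnectionPool(open func(int64) (Destination, error)) *ConnectionPool {
	return &ConnectionPool{conns: map[int64]Destination{}, open: open}
}

// Get returns a cached connection for the destination, creating it on first use.
// Callers no longer manage connections themselves.
func (p *ConnectionPool) Get(dbID int64) (Destination, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if conn, ok := p.conns[dbID]; ok {
		return conn, nil
	}
	conn, err := p.open(dbID)
	if err != nil {
		return nil, err
	}
	p.conns[dbID] = conn
	return conn, nil
}

// CloseAll tears down every cached connection, e.g. on shutdown.
func (p *ConnectionPool) CloseAll() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for id, conn := range p.conns {
		_ = conn.Close()
		delete(p.conns, id)
	}
}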

Sharable URLs for Querying Data

Today, you can query the API by calling an endpoint like so:

http://localhost:8080/api/data/query?api_key=local&query=select * from events

We want to be able to create "sharable URLs." This is how it should work:

New Endpoint

  • Create a new POST endpoint for creating a query. It should be POST /api/data/query/share?api_key=local. The query should be part of the POST request body.
  • The POST request body should also take a duration, which is the number of seconds for the link to be valid for.
  • This endpoint should be authenticated via API key
  • The return value should be a new UUID which represents the query.

Save query to cache

  • We have a cache interface which has no implementation. Implement an in-memory cache. You may use a library such as https://github.com/eko/gocache
  • The cache key should be a UUID.
  • The cache value should store the database ID and query. It should also respect the expiration directive.
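
A sketch of the cache behaviour described above without the optional gocache dependency: values hold the database ID and query, keyed by UUID, and expire after the requested duration. Names are illustrative:

package cache

import (
	"sync"
	"time"
)

type SharedQuery struct {
	DatabaseID int64
	Query      string
}

type entry struct {
	value     SharedQuery
	expiresAt time.Time
}

type MemoryCache struct {
	mu    sync.Mutex
	items map[string]entry
}

func NewMemoryCache() *MemoryCache {
	return &MemoryCache{items: map[string]entry{}}
}

// Set stores the value under the key for the given TTL.
func (c *MemoryCache) Set(key string, value SharedQuery, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = entry{value: value, expiresAt: time.Now().Add(ttl)}
}

// Get returns the value if it exists and has not expired.
func (c *MemoryCache) Get(key string) (SharedQuery, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.items[key]
	if !ok || time.Now().After(e.expiresAt) {
		delete(c.items, key)
		return SharedQuery{}, false
	}
	return e.value, true
}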

Endpoint to Query Data

  • Create a new endpoint: GET /share/<uuid>/data.<format>
  • This endpoint should not be authenticated
  • It should run the query for the given <uuid> and return the format specified by <format>. This will either be csv or json.
  • You should refactor the existing Select endpoint in order to reuse code for streaming data to the client

go vet: loop variable command captured by func literal

Full error: ./main.go:103:11: loop variable command captured by func literal

It means that if new commands are added in the future, the goroutines will all run the same (likely the last) command in the slice and crash.
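
For reference, the usual fix before Go 1.22 is to re-declare the loop variable (or pass it as an argument) so each goroutine captures its own copy; Go 1.22 scopes loop variables per iteration and removes the problem. The commands slice below is illustrative:

package main

import (
	"fmt"
	"sync"
)

func main() {
	commands := []string{"ingest", "insert", "api"}

	var wg sync.WaitGroup
	for _, command := range commands {
		command := command // shadow the loop variable so the closure gets its own copy
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("running", command)
		}()
	}
	wg.Wait()
}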

Separate queue vs s3 configs

Be able to use different vendors (AWS vs B2) for storage configs. Right now there is a single AWS config that is tightly coupled to both the queue and storage.

Permissions

Create API keys with permissions

  • read only
  • read/write
  • filtered by table, user, row

File Uploads to insert data

We want users to be able to upload files which can be inserted to Clickhouse.

Requirements

  1. The /data endpoint should accept file uploads in addition to a query parameter: ?file_format=x
  2. Data should be uploaded to S3. The file should stream directly from the user to S3 without buffering the entire file into memory or on disk.
  3. We should be able to configure a maximum file size. This should be a global configuration. If the file exceeds this size we should return an error and discard the partial upload on S3.
  4. Once the file is uploaded to S3, we should add a new message to the queue so that a worker can download the data and insert to the DB. We will need to change the payload of the message we enqueue to include the file path, size, type, and query parameters.
  5. When a worker consumes this message, we should download the file locally and transform it into the JSON format that the regular upload process uses. We should respect any query parameters sent by the user (for example, ?flatten=explode for JSON arrays.)
  6. When uploading data, we should add a new field to clickhouse, __uploaded_file with the file name.
  7. These transformers should have a common interface, so we can implement new file formats.
  8. Consider using [clickhouse-local](https://clickhouse.com/docs/en/operations/utilities/clickhouse-local) for this. For example: ./clickhouse local -q "SELECT * FROM file('data.csv', 'CSV') FORMAT JSONEachRow"

Go Interface

We should have a common interface for transforming data. That interface should be:

type FileUploadFormat interface {
   GetAbbreviation() string // "csv"
   GetDescription() string  // "Comma-Delimited Files"
   Convert(inputFile string, options map[string]string) (outputFile string) // Converts inputFile and saves to outputFile. "options" might have configuration specific to that file format. It will be unused for now.
}

The first file format we want to support is csv. That will take CSV files where the first row is a header and outputs JSON, which will then be consumed by the rest of the pipeline. For example:

Input

name,event
Picard,click
Data,click

Output

{"__row_id": "123", "__uploaded_file": "input.csv", "name": "Picard", "event": "click"}
{"__row_id": "456", "__uploaded_file": "input.csv", "name": "Data", "event": "click"}
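
A sketch of a CSV implementation of the interface above: the first row is treated as the header and every following row becomes one JSON object. Generating __row_id with ulid and writing a .ndjson file next to the input are assumptions about how this slots into the rest of the pipeline:

package fileformats

import (
	"encoding/csv"
	"encoding/json"
	"io"
	"os"
	"path/filepath"

	"github.com/oklog/ulid/v2"
)

type CSVFormat struct{}

func (CSVFormat) GetAbbreviation() string { return "csv" }
func (CSVFormat) GetDescription() string  { return "Comma-Delimited Files" }

// Convert reads the CSV, uses the first row as column names, and writes one
// JSON object per row to an NDJSON file next to the input.
func (CSVFormat) Convert(inputFile string, options map[string]string) (outputFile string) {
	in, err := os.Open(inputFile)
	if err != nil {
		return ""
	}
	defer in.Close()

	outputFile = inputFile + ".ndjson"
	out, err := os.Create(outputFile)
	if err != nil {
		return ""
	}
	defer out.Close()

	reader := csv.NewReader(in)
	header, err := reader.Read()
	if err != nil {
		return ""
	}

	enc := json.NewEncoder(out)
	for {
		record, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return ""
		}
		row := map[string]string{
			"__row_id":        ulid.Make().String(),
			"__uploaded_file": filepath.Base(inputFile),
		}
		for i, col := range header {
			if i < len(record) {
				row[col] = record[i]
			}
		}
		if err := enc.Encode(row); err != nil {
			return ""
		}
	}
	return outputFile
}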

Table optimizer

Look at query logs, table cardinality and produce updated DB schemas which are optimized

  • Automatic sorting key
  • Materialized views
  • Data types/column compression
  • Join optimizations/subqueries

Implement SQS for Queue

In v2, we want an implementation of the Queue interface for SQS.

  • The settings for the queue should live in config.go
  • We should instantiate the queue in main.go if the config file is present
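
A rough sketch of an SQS-backed queue using aws-sdk-go-v2; the Enqueue/Dequeue method names are assumptions about the Queue interface, and the queue URL would come from config.go:

package sqsqueue

import (
	"context"
	"errors"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

type SQSQueue struct {
	client   *sqs.Client
	queueURL string
}

func NewSQSQueue(cfg aws.Config, queueURL string) *SQSQueue {
	return &SQSQueue{client: sqs.NewFromConfig(cfg), queueURL: queueURL}
}

func (q *SQSQueue) Enqueue(ctx context.Context, body string) error {
	_, err := q.client.SendMessage(ctx, &sqs.SendMessageInput{
		QueueUrl:    aws.String(q.queueURL),
		MessageBody: aws.String(body),
	})
	return err
}

// Dequeue long-polls for a single message and deletes it once returned.
func (q *SQSQueue) Dequeue(ctx context.Context) (string, error) {
	out, err := q.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(q.queueURL),
		MaxNumberOfMessages: 1,
		WaitTimeSeconds:     10,
	})
	if err != nil {
		return "", err
	}
	if len(out.Messages) == 0 {
		return "", errors.New("no messages")
	}
	msg := out.Messages[0]
	_, err = q.client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(q.queueURL),
		ReceiptHandle: msg.ReceiptHandle,
	})
	if err != nil {
		return "", err
	}
	return aws.ToString(msg.Body), nil
}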

Have separate "admin" vs individual users

When a user runs any query via HTTP, it should use their own DB credentials with limits. Admin tasks should be performed by the service itself with a more privileged Clickhouse user.

Implement DB insert for QueueStorage

When new data is uploaded to our service, we want to download that data from S3 and insert into the database. This ticket is to implement the StartConsumer() and StopConsumer() functions of queuestorage.go.

  • StartConsumer() will start a background job which does the following:
    • Creates a pool of workers (the number of workers should be specified in a config file)
    • Consumes messages from the queue
    • For each message, downloads the JSON file from storage
    • Calls the "WriteNDJson..." function for the database
    • After the file has been uploaded, clean up the file from local disk. If there is an error, we should log it and not delete the file.

When we stop the Go process, StopConsumer() should be called and the process should gracefully shut down. We should immediately stop consuming new messages, and any data that is in progress should be uploaded.

The current logic also makes sure we have a minimum amount of space left on disk before trying to download.

There is already a basic in-memory implementation here (though it does not ensure graceful shutdowns.)

For reference, here is the v1 logic for this process.

In the future, we will want a better failsafe to retry uploads or requeue them if there is a failure.
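
A condensed sketch of what StartConsumer()/StopConsumer() could look like: a fixed pool of workers pulls messages until the context is cancelled, and StopConsumer waits for in-flight work. The download/insert/cleanup step is collapsed into a stubbed handleMessage, and the disk-space check is omitted:

package queuestorage

import (
	"context"
	"log/slog"
	"sync"
)

type Consumer struct {
	workers int
	dequeue func(ctx context.Context) (string, error)
	cancel  context.CancelFunc
	wg      sync.WaitGroup
}

func (c *Consumer) StartConsumer() {
	ctx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	for i := 0; i < c.workers; i++ {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				default:
				}
				msg, err := c.dequeue(ctx)
				if err != nil {
					continue // nothing to do or a queue error; a real version would back off
				}
				if err := c.handleMessage(msg); err != nil {
					slog.Error("insert failed; keeping local file", "err", err)
				}
			}
		}()
	}
}

// StopConsumer stops pulling new messages and waits for in-flight inserts.
func (c *Consumer) StopConsumer() {
	c.cancel()
	c.wg.Wait()
}

func (c *Consumer) handleMessage(msg string) error {
	// Download the JSON file from storage, call the "WriteNDJson..." function
	// on the destination, and delete the local copy only on success.
	return nil
}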

Named queries

Be able to save a query and call it via an endpoint

Interface for Users and additional user data

The [users] section of the config file only has key/value pairs (API key -> database name). We need to be able to associate more data with users.

  1. We should have an interface for managing users (for example, func GetUserByAPIKey() User)
  2. We should have a proper User interface. Right now that will only contain a function GetDatabase() but we will add more user fields in the future.
  3. That interface should read data from a config file like it is today. In the future we'll want to use a database for managing user information.
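
A small sketch of the two interfaces described above; the config-backed implementations are naming assumptions, and only GetDatabase() exists for now:

package users

import "errors"

type User interface {
	GetDatabase() string
}

type UserStore interface {
	GetUserByAPIKey(apiKey string) (User, error)
}

// configUser is the config-file-backed user: today it only carries the
// database name that an API key maps to.
type configUser struct {
	database string
}

func (u configUser) GetDatabase() string { return u.database }

// ConfigUserStore reads the [users] section of the config file.
type ConfigUserStore struct {
	byAPIKey map[string]configUser
}

func (s *ConfigUserStore) GetUserByAPIKey(apiKey string) (User, error) {
	u, ok := s.byAPIKey[apiKey]
	if !ok {
		return nil, errors.New("unknown api key")
	}
	return u, nil
}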

genericize everything

Interfaces for getting config values, getting users, api keys, etc. Then use proper db for this.

Clickhouse Cluster Support

We want to be able to support cluster operations for creating tables.

  1. When you create a database/table in Clickhouse, you have to specify an "ON CLUSTER" parameter in order to make sure the command is replicated across all database servers. You will need to run clickhouse-keeper as part of your docker setup. keeper is Clickhouse's version of Zookeeper which allows different DB servers to coordinate.

  2. When we run a CREATE DATABASE/TABLE, or ALTER TABLE command in Clickhouse, we should be able to specify an "ON CLUSTER" parameter. We should add a new configuration param on the User object, GetCluster() string and use that as the cluster name. If that user does not have a cluster assigned (if the cluster is blank) then we should omit the ON CLUSTER query clause.
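
A sketch of how the optional ON CLUSTER clause could be injected into DDL via the proposed GetCluster() accessor; the table engine and columns are placeholders (a replicated setup would likely use a Replicated* engine):

package clickhouse

import "fmt"

type User interface {
	GetCluster() string
}

// onCluster returns " ON CLUSTER <name>", or an empty string when the user
// has no cluster assigned, so single-node setups are unaffected.
func onCluster(u User) string {
	if cluster := u.GetCluster(); cluster != "" {
		return fmt.Sprintf(" ON CLUSTER %s", cluster)
	}
	return ""
}

func createTableDDL(u User, table string) string {
	return fmt.Sprintf(
		"CREATE TABLE IF NOT EXISTS %s%s (__row_id String) ENGINE = MergeTree ORDER BY __row_id",
		table, onCluster(u),
	)
}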

Enable and disable services via config

The program does three things:

  1. Exposes an API and runs a web server
  2. Continually uploads new data (from the POST endpoint)
  3. Continually inserts new data into the database
  • The config.toml file should have an option to enable or disable each of these independently. For example, the API already has a config to do this here.
  • The API should have a new "read-only" mode whereby GET requests are enabled, but POST requests always return an error.
  • If the API is enabled, but Uploads are disabled, then the POST endpoint should return an error. Additionally, the /healthcheck endpoint should also return an error.
  • Right now, the upload service is started from api.go (here). Instead, we should start it from main.go as we do with the other daemons.

Implement endpoint for data ingest

The goal is to implement the logic here into v2: https://github.com/scratchdata/ScratchDB/blob/main/ingest/ingest.go#L119

This endpoint is the main API which consumes data from the user. It does the following:

  1. The user must specify the table for data to be inserted. It can come from an X-SCRATCHDB-TABLE header, a ?table query param, or the JSON body itself {"table": "mytable"}. Check for the table name in that order.
  2. If the table is specified in a header or query param, then we assume the POST body contains our data. If the table is specified in the JSON body, we assume the user's data is contained in {"data": {...}}
  3. Validate that the input is valid JSON
  4. If the data is an array of objects, then call the Write() function for each object individually
  5. Before writing JSON, we want to normalize it. This is "flattening". We have two ways of doing so, based on the X-SCRATCHDB-FLATTEN or ?flatten parameter. See: https://github.com/scratchdata/ScratchDB/blob/main/ingest/ingest.go#L148
  6. Once validation is done and we have determined the right configurations (table, api key, how to flatten) then we should write the json to the Writer.
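
A sketch of the table-name resolution order from step 1, using net/http for illustration (the real endpoint may use a different router); flattening and validation are left out:

package ingest

import (
	"encoding/json"
	"io"
	"net/http"
)

type bodyEnvelope struct {
	Table string          `json:"table"`
	Data  json.RawMessage `json:"data"`
}

// resolveTable returns the table name and the JSON payload to insert,
// checking the header, then the query param, then the JSON body.
func resolveTable(r *http.Request) (table string, payload []byte, err error) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		return "", nil, err
	}

	if t := r.Header.Get("X-SCRATCHDB-TABLE"); t != "" {
		return t, body, nil // table in header: the body is the data itself
	}
	if t := r.URL.Query().Get("table"); t != "" {
		return t, body, nil // table in query param: the body is the data itself
	}

	// Otherwise the table name and data both live in the JSON body.
	var env bodyEnvelope
	if err := json.Unmarshal(body, &env); err != nil {
		return "", nil, err
	}
	return env.Table, env.Data, nil
}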
