starlake-ai / starlake

Declarative text based tool for data analysts and engineers to extract, load, transform and orchestrate their data pipelines.

Home Page: https://starlake-ai.github.io/starlake/

License: Apache License 2.0

Dockerfile 0.09% Scala 87.54% Shell 0.51% Java 2.17% Python 3.32% Mustache 0.16% Jinja 5.77% Batchfile 0.24% PowerShell 0.20%
scala spark etl bigquery hdfs redshift snowflake synapse

starlake's Introduction


Starlake is a declarative, text-based tool that enables analysts and engineers to extract, load, transform and orchestrate their data pipelines.

Starlake is a configuration-only, declarative Extract, Load, Transform and Orchestration data pipeline tool. The workflow below is a typical use case:

  • Extract your data as a set of fixed-position, DSV (delimiter-separated values), JSON or XML files
  • Define or infer table schemas from text files (CSV, JSON, XML, fixed-width ...)
  • Load: Define transformations at load time using YAML and start loading files into your data warehouse.
  • Transform: Build aggregates using regular SQL SELECT statements and let Starlake build your tables according to your selected write strategy (Append, Overwrite, Merge ...).
  • Orchestrate: Let Starlake handle your data lineage and run your data pipelines on your favorite orchestrator (Airflow, Dagster ...).

You may use Starlake for Extract, Load and Transform steps or any combination of these steps.

How it works

The advent of declarative programming, exemplified by tools like Ansible and Terraform, has revolutionized infrastructure deployment by allowing developers to express intended goals without specifying the order of code execution. This paradigm shift brings benefits such as fewer error-prone coding tasks, significantly shorter development cycles, enhanced code readability, and increased accessibility for developers of all levels.

Starlake is a YAML-based declarative tool designed for expressing Extract, Load, Transform, and Orchestration tasks. Drawing inspiration from the successes of declarative programming in infrastructure, Starlake aims to bring similar advantages to the realm of data engineering.

This paradigm shift encourages a focus on defining goals for data warehouses, rather than the intricacies of implementation details.

The YAML DSL is self-explanatory and easy to understand. This is best explained with an example:

Extract

Let's say we want to extract data from a PostgreSQL database on a daily basis:

extract:
  connectionRef: "pg-adventure-works-db" # or mssql-adventure-works-db if extracting from SQL Server
  jdbcSchemas:
    - schema: "sales"
      tables:
        - name: "salesorderdetail"              # table name or simple "*" to extract all tables
          partitionColumn: "salesorderdetailid" # (optional)  you may parallelize the extraction based on this field
          fetchSize: 100                        # (optional)  the number of rows to fetch at a time
          timestamp: salesdatetime              # (optional) the timestamp field to use for incremental extraction
      tableTypes:
        - "TABLE"
        #- "VIEW"
        #- "SYSTEM TABLE"
        #- "GLOBAL TEMPORARY"
        #- "LOCAL TEMPORARY"
        #- "ALIAS"
        #- "SYNONYM"

That's it, we have defined our extraction pipeline.

Load

Let's say we want to load the data extracted in the previous example into our data warehouse:

---
table:
  pattern: "salesorderdetail.*.psv" # This property is a regular expression that will be used to match the file name.
  schedule: "when_available"        # (optional) cron expression or "when_available" to schedule the loading
  metadata:
    mode: "FILE"
    format: "CSV"       # (optional) auto-detected if not specified
    encoding: "UTF-8"
    withHeader: yes     # (optional) auto-detected if not specified
    separator: "|"      # (optional) auto-detected if not specified
    writeStrategy:      
      type: "UPSERT_BY_KEY_AND_TIMESTAMP"
      timestamp: signup
      key: [id]         
                        # Replace the pattern above with the adequate file pattern, e.g. customers-.*.psv, if required
  attributes:           # Description of the fields to recognize
    - name: "id"        # attribute name and column name in the destination table if no rename attribute is defined
      type: "string"    # expected type
      required: false   # Is this field required in the source? (false by default, change it accordingly)
      privacy: "NONE"   # Should we encrypt this field before loading it to the warehouse? (no encryption by default)
      ignore: false     # Should this field be excluded? (false by default)
    - name: "signup"    # second attribute
      type: "timestamp" # auto-detected if  specified
    - name: "contact"
      type: "string"
      ...

That's it, we have defined our loading pipeline.

Transform

Let's say we want to build aggregates from the previously loaded data:

transform:
  default:
    writeStrategy: 
      type: "OVERWRITE"
  tasks:
    - name: most_profitable_products
      writeStrategy:
        type: "UPSERT_BY_KEY_AND_TIMESTAMP"
        timestamp: signup
        key: [id]
-- The SQL query below will be translated into the appropriate MERGE INTO or INSERT OVERWRITE statement
SELECT
    productid,
    SUM(unitprice * orderqty) AS total_revenue
FROM salesorderdetail
GROUP BY productid
ORDER BY total_revenue DESC

Starlake will automatically apply the right merge strategy (INSERT OVERWRITE or MERGE INTO) based on the writeStrategy property and the input/output tables.

Orchestrate

Starlake will take care of generating the corresponding DAG (Directed Acyclic Graph) and will run it whenever the tables referenced in the SQL query are updated.

Starlake comes with a set of DAG templates that can be used to orchestrate your data pipelines on your favorite orchestrator (Airflow, Dagster, ...). Simply reference them in your YAML files and optionally customize them to your needs.

The dependencies extracted from your SQL query are used to generate the corresponding DAG.

[Dependency graph and resulting DAG diagrams not reproduced here.]

Supported platforms

The Load & Transform steps support multiple configurations for inputs and outputs.


Documentation

Complete documentation is available at https://starlake-ai.github.io/starlake/

starlake's People

Contributors

abdelhakimbendjabeur, amine-hmila, aminesagaama, aureliesalmon, bounkongk, cchepelov, elarib, fanirisoa, fred904, fupelaqu, github-actions[bot], gitter-badger, hayssams, malon64, mazenhoballah, mchabouni, mhdkassir, mmenestret, pchalcol, quentinsalais, rayan958, sabino, scala-steward, seyguai, tiboun, yannbug, zedach


starlake's Issues

[FEATURE] - Check validity

Description

This ticket lists all validity checks that should be made on the project before executing it.

  • Check that connection references are valid and point to existing connections (see the sketch after this list)
  • Make sure that disposition and merge options are compatible
  • Make sure that every expected input file is defined, depending on the task launched. E.g. an error is thrown in Attribute ("Should never happen: empty list of attributes"), but it actually relates to the call ai.starlake.schema.model.Attribute.sparkType(Attribute.scala:249) and doesn't help the user understand what the mistake is and how to resolve it.
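
As a purely illustrative example of the first check, here is a minimal Scala sketch (object and method names are hypothetical, not Starlake code) that reports connectionRef values pointing to connections that are not declared in the project configuration:

object ConnectionRefCheckSketch {
  // Hypothetical pre-flight check: every connectionRef used in the project
  // must point to a connection declared in the application configuration.
  def undefinedConnectionRefs(
      declaredConnections: Set[String],
      referencedConnections: Seq[String]
  ): Seq[String] =
    referencedConnections.distinct.filterNot(declaredConnections.contains)

  def main(args: Array[String]): Unit = {
    val declared   = Set("pg-adventure-works-db", "bq")
    val referenced = Seq("pg-adventure-works-db", "mssql-adventure-works-db")
    // Prints List(mssql-adventure-works-db): this should be reported before execution
    println(undefinedConnectionRefs(declared, referenced))
  }
}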

[FEATURE] - Improve performance when loading a huge number of files in GROUPED mode

Description

This feature concerns data loading jobs with a huge number of files to ingest (1000+) in GROUPED mode (true). In such cases, a major part of the time is spent moving files between zones (landing to pending, ingesting to archive), because files are moved sequentially (a simple foreach). In some cases, 30 minutes are spent moving files whereas the actual ingestion process takes around 5 minutes.

Expected behavior

Parallelize file handling when possible.

Possible implementation

Use Scala parallel collections, for example in the ingest() method of IngestionWorkflow (see the sketch below):
ingestingPath.par.foreach { path => /* back up file in archive */ }
instead of
ingestingPath.foreach { path => /* back up file in archive */ }
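
A minimal, self-contained Scala sketch of the idea (not the actual IngestionWorkflow code; moveToArchive is a hypothetical stand-in for the real archive step):

// Sketch only: parallelize the per-file archive step with Scala parallel collections.
// On Scala 2.13+ the parallel collections live in the separate
// scala-parallel-collections module, hence the import below.
import scala.collection.parallel.CollectionConverters._

object ParallelArchiveSketch {
  // Hypothetical stand-in for the real "back up file in archive" logic.
  def moveToArchive(path: String): Unit =
    println(s"archiving $path")

  def archiveAll(ingestingPaths: List[String]): Unit =
    // .par switches the sequential foreach to a parallel one backed by a thread pool.
    ingestingPaths.par.foreach(moveToArchive)
}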

Complexity estimation

T-Shirt Size: S

[BUG] - BigQuery - Missing `audit` table when using a custom sink name

Description

When I set a custom audit sink name using COMET_AUDIT_SINK_NAME, the audit table is missing. I am running load jobs on Google BigQuery.

Expected behavior

The job should insert into the two audit tables, audit and rejected.

Current Behavior

The audit table is no longer created/populated. See the attached screenshot (missing_audit_table) for a more visual illustration of the current behaviour.

Steps to reproduce

Steps to reproduce the behavior:

  1. Run a Load into a BigQuery table
  2. Make sure you have both tables (audit and rejected) with data in them
  3. Set the variable COMET_AUDIT_SINK_NAME to custom_audit
  4. Make sure you have created the custom_audit dataset in your BigQuery project
  5. Run the same Load as in (1)
  6. See that the audit table is missing from the custom_audit dataset

Context

GCP - BigQuery

Possible implementation

TBD

Complexity estimation

TBD

[BUG] - Accepted row counting is incorrect in Audit table

Description

I was looking at my audit table and something was wrong in the countAccepted column for my JSON files (see the attached screenshot).

Expected behavior

On my last row I have 5 rows processed, 5 accepted and 3 rejected. When looking at my data I actually have just 2 accepted rows, so countAccepted has a problem.

Possible implementation

Looking at the source code, I think I found the problem: in ai/starlake/job/ingest/JsonIngestionJob.scala, line 164, you return (rejectedDS, toValidate) instead of (rejectedDS, validationResult). I don't understand your whole code architecture, but this seems to be the problem.

Thank you in advance.

[FEATURE] - Support Schema on write

Description

Many databases, including Snowflake and other relational databases, require the table schema to be created upfront.
Even when targeting BigQuery, some users would like the schema to be created upfront, even though it is not required.

This feature would add a new command that creates the required DDL. We call this command infer-ddl.
Databases usually evolve over time, so this command should also address altering the table schema if it already exists in the database.

// Use the BigQuery-specific mapping and generate create/alter table statements
starlake.sh infer-ddl --datawarehouse bigquery --connection bq

// Use the default mapping and generate create/alter table statements
starlake.sh infer-ddl --connection bq
starlake.sh infer-ddl --datawarehouse sql --connection bq --output ./sql

// Use the default mapping, then generate and apply create/alter table statements
starlake.sh infer-ddl --datawarehouse sql --connection bq --apply

// Use the default mapping, then generate and apply create/alter table statements using the provided mustache template
starlake.sh infer-ddl --datawarehouse sql --template my_template.mustache --connection bq --apply

Solution proposition

We augment the types.yml file to include a mapping section with entries specific to each data warehouse when necessary:

type:
  name: int
  primitiveType: long
  mapping:
    bigquery: LONG # Specific to BigQuery
    synapse: INTEGER # Specific to Synapse
    sql: NUMERIC # used by default when no specific mapping is defined for the target data warehouse

From the schema, we generate the final DDL statements and apply them if requested, using the provided connection.
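
For illustration only, here is a minimal Scala sketch of the fallback rule described above (the object and method names are hypothetical, not Starlake code):

object TypeMappingSketch {
  // Pick the warehouse-specific DDL type if one is declared,
  // otherwise fall back to the generic "sql" entry.
  def ddlType(mapping: Map[String, String], datawarehouse: String): Option[String] =
    mapping.get(datawarehouse).orElse(mapping.get("sql"))

  def main(args: Array[String]): Unit = {
    val intMapping = Map("bigquery" -> "LONG", "synapse" -> "INTEGER", "sql" -> "NUMERIC")
    println(ddlType(intMapping, "bigquery")) // Some(LONG)    -> warehouse-specific mapping
    println(ddlType(intMapping, "redshift")) // Some(NUMERIC) -> default "sql" mapping
  }
}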

Complexity estimation

T-Shirt Size: M

[BUG] - JQ not installed on user host

Description

On startup, starlake.sh checks the latest version on the Maven repository and parses the result using "jq".

This fails if jq is not already installed.

Steps to reproduce

Steps to reproduce the behavior:

  1. Launch a fresh WSL on Windows
  2. Launch starlake.sh for the first time

Complexity estimation

XS

[FEATURE] - Add more fields in xlsx files to get around the Excel sheet name limit and have more descriptive names

Description

Hello, in our process we use xlsx files to generate our schemas, and then from these schemas we generate SQL extraction scripts.
We are facing two limiting cases in this process (sometimes both at the same time):

  • the source names are not really descriptive and we would like to have a meaningful name for our tables once ingested into BQ.
  • the source names are too long for Excel sheet names (limited to 31 characters).

Solution proposition

In order to keep the original source name (so the correct SQL extraction scripts can be generated from the schemas) while also changing the table name in BQ during the ingestion process, I propose to add two fields in the xlsx files:

  • rename_source: optional value to replace the name and thus bypass the Excel sheet name limit.
  • rename_target: optional value to replace the name in the ingestion process.

Considered alternatives

I went through the code base and documentation but couldn't find any information to solve my problem. I'm also not sure whether it's technically possible in the current implementation, so feel free to correct me or suggest a better solution and I'll be happy to take it.

[TU] - Enrich Unit test po

Description

The ingestion job is tested, but not all details are currently covered, such as the content of the audit output.

This issue is related to #498.

[BUG] - Databricks absolute path must be written differently

Description

I am trying to distribute my ingestion areas across several containers in an Azure data lake mounted on a DBFS path on the Databricks platform (e.g. /mnt/starlake).
For that I set the environment variable SL_AREA_PENDING=dbfs://mnt/pending_area, for example, for the pending area.
The problem: when you specify an absolute path on Databricks with the filesystem included in it, you have to write it with a single '/'.

%fs ls dbfs://mnt/starlake gives us this error:
IllegalArgumentException: Hostname not allowed in dbfs uri. Please use 'dbfs:/' instead of 'dbfs://' in uri: dbfs://mnt/starlake

While %fs ls dbfs:/mnt/starlake works correctly.

However, I can't change the absolute path to SL_AREA_PENDING=dbfs:/mnt/pending_area because, according to your code in DatasetArea.scala line 49, a path is recognized as absolute only if it contains "://".
So if I set the variable this way, Starlake will treat the path as relative and concatenate SL_DATASETS with SL_AREA_PENDING: "dbfs:/mnt/datasets_area/dbfs:/mnt/pending_area".
This will cause an error at runtime.

So it might be useful to change this condition to contains(":/"), or just create a specific condition for Databricks.
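
For illustration, a minimal Scala sketch of the behaviour as reported and of the proposed relaxation (not the actual DatasetArea code):

object PathCheckSketch {
  // Behaviour as reported: only "scheme://..." URIs are treated as absolute,
  // so "dbfs:/mnt/pending_area" is considered relative and gets prefixed.
  def isAbsoluteCurrent(path: String): Boolean = path.contains("://")

  // Proposed relaxation: also accept single-slash URIs such as "dbfs:/mnt/...".
  def isAbsoluteProposed(path: String): Boolean = path.contains(":/")

  def main(args: Array[String]): Unit = {
    println(isAbsoluteCurrent("dbfs:/mnt/pending_area"))  // false -> triggers the concatenation issue
    println(isAbsoluteProposed("dbfs:/mnt/pending_area")) // true
  }
}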

[BUG] - Infer-schema has an input file error while running on Databricks

Description

I'm trying to run Starlake on Databricks, and it works fine when using the import and watch command lines.
However, I'm unable to use infer-schema, which is pretty useful. The bootstrap command has some problems too, but that is not really important for the moment.
For now I'm using manually edited YAML domain files, but I would like to generate them more quickly.

Expected behavior

[Screenshot of the infer-schema command line and its arguments not reproduced here.]

I'm using a command line that works perfectly fine on my local CLI, and I have tested my access to my mounted Azure storage containers. I'm supposed to get my domain.yml file in my output-dir, specified here as an absolute path.

Current Behavior

I get this error in my spark logs :

23/05/04 08:37:24 ERROR IngestionWorkflow: java.io.FileNotFoundException: dbfs:/mnt/sample-data/sales/customers-2018-01-01.psv (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at scala.io.Source$.fromFile(Source.scala:94)
at scala.io.Source$.fromFile(Source.scala:79)
at scala.io.Source$.fromFile(Source.scala:57)
at ai.starlake.job.infer.InferSchemaJob.$anonfun$infer$1(InferSchemaJob.scala:217)

I looked at the code and didn't find any visible error, so it may come from how you manage DBFS.
I tried to use {{incoming_path}}, a variable that I set in my env.comet.yml, but it didn't change anything.
My path is correct and I don't have any authorization issues, so I'm asking here.

Steps to reproduce

Steps to reproduce the behavior:

  1. Create a job with your assembly jar file (the Jackson YAML jar is already included in my cluster) and pass the arguments as shown in my screenshot.
  2. Mount an Azure Blob Storage or ADLS Gen2 container with the actual jar files. I mounted two different containers, but the error occurs even when I put an incoming directory in my Starlake root project.
  3. Execute the job.
  4. When it fails, go to spark/log and look at the Log4j output.

[DOC] - Add documentation for all ENV VARS

Description

Settings and their corresponding env variables are not yet documented.
We will add documentation inside the reference-*.conf files and generate the HTML page based on the comments.
Documentation should start with a # and document the top-level properties.

Complexity estimation

T-Shirt Size: M

[DOC] - Databricks on Azure

Description

Where is documentation missing? What could be added or simplified?
What's the type of documentation requested? (scaladoc, technical on readthedocs, user guide, ...)
Please provide guidelines, package or classes if appropriate.

Complexity estimation

T-Shirt Size: XS, S, M, L. If you would recommend XL or more, we advise you to discuss with the community how to split this issue.
Don't forget to tag this issue as a 'good first issue' if you think it is appropriate!

[BUG] - Path error

Description

The error occurs when the cmd script tries to run spark-submit.

Current Behavior

Launch starlake.cmd for the first time.

See the complete error:

Le chemin d’accès spécifié est introuvable. ("The specified path cannot be found.")

Complexity estimation

S
