souvik-databricks / dlt-with-debug

A lightweight helper utility which allows developers to do interactive pipeline development by having a unified source code for both DLT runs and non-DLT interactive notebook runs.

Home Page: https://pypi.org/project/dlt-with-debug/

License: MIT License

Python 100.00%
big-data big-data-processing databricks delta-live-tables dlt etl etl-pipeline python3 spark

dlt-with-debug's Introduction



DLT with Debug

Running DLT workflows from interactive notebooks.

Table of Contents

  1. About the project
  2. Demo Notebook
  3. Installation
  4. Usage
  5. Sample Pipeline Example
  6. Quick API guide
  7. Functionalities
  8. Limitation

About The Project

Delta Live Tables (DLTs) are a great way to design data pipelines while focusing only on the core business logic. They make the life of data engineers easier, but although development workflows are streamlined in DLT, debugging and seeing how the data looks after each transformation step in a typical DLT pipeline becomes very tedious, because the dlt package is not available in our interactive environment.

Enter dlt-with-debug, a lightweight decorator utility which allows developers to do interactive pipeline development by having a unified source code for both DLT runs and non-DLT interactive notebook runs.

(back to top)

Built With

(back to top)

Sample Demo Notebook

Click here to go to a sample notebook that you can import into your workspace to see the utility in action.

Installation

pip install it in your Databricks notebook:

PyPI

%pip install dlt-with-debug

(back to top)

Prerequisites

(back to top)

Usage

  • In notebooks containing DLT jobs, the imports change slightly, as shown below, and the extra decorator @dltwithdebug(globals()) is added to the functions:

    # Imports
    from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
    
    if pipeline_id:
      import dlt
    else:
      from dlt_with_debug import dlt
    
    
    # Now define your dlt code with one extra decorator "@dltwithdebug(globals())" added to it
    
    @dlt.create_table(comment = "dlt pipeline example")
    @dltwithdebug(globals())
    def click_raw_bz():
        return (
            spark.read.option("header", "true").csv("dbfs:/FileStore/souvikpratiher/click.csv")
        )
    
    # See the output
    showoutput(click_raw_bz)
    
    # Get the output data to a dataframe
    df = click_raw_bz()

Note:

  1. Use the dlt.create_table() API instead of dlt.table(), as dlt.table() sometimes gets mixed up with spark.table() in the global namespace.
  2. Always pass the globals() namespace to the dltwithdebug decorator, like this: @dltwithdebug(globals()).

(back to top)


Sample DLT-with-debug pipeline example

Code:

Cmd 1

%pip install -e git+https://github.com/souvik-databricks/dlt-with-debug.git#"egg=dlt_with_debug"

Cmd 2

from pyspark.sql.functions import *
from pyspark.sql.types import *

# We are importing:
# dltwithdebug as that's the entry point to interactive DLT workflows
# pipeline_id to ensure we import the dlt package based on the environment
# showoutput as a helper function for seeing the output result along with expectation metrics, if any are specified
from dlt_with_debug import dltwithdebug, pipeline_id, showoutput

if pipeline_id:
  import dlt
else:
  from dlt_with_debug import dlt

Cmd 3

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

Cmd 4

# Notice we are using dlt.create_table instead of dlt.table

@dlt.create_table(
  comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
  table_properties={
    "quality": "bronze"
  }
)
@dltwithdebug(globals())
def clickstream_raw():
  return (
    spark.read.option("inferSchema", "true").json(json_path)
  )

Cmd 5

# for displaying the result of the transformation 
# use showoutput(func_name)
# for example here we are using showoutput(clickstream_raw) 
showoutput(clickstream_raw)

[Screenshot: showoutput(clickstream_raw) output]

Cmd 6

@dlt.create_table(
  comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
  table_properties={
    "quality": "silver"
  }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
@dlt.expect_all({'valid_prev_page_id': "previous_page_id IS NOT NULL"})
@dltwithdebug(globals())
def clickstream_clean():
  return (
    dlt.read("clickstream_raw")
      .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
  )

Cmd 7

showoutput(clickstream_clean)

[Screenshot: showoutput(clickstream_clean) output with expectation metrics]


It is important to note that here we can also see how many records the expectations will affect.


(back to top)

The same DLT-with-debug pipeline executed as part of a Delta Live Tables pipeline

[Screenshot: the same pipeline running as a Delta Live Tables pipeline]

Below we can see that the expectation results also match up with the expectation metrics we got earlier from dltwithdebug via showoutput(clickstream_clean).

[Screenshot: expectation results in the DLT pipeline UI]

(back to top)

Quick API guide

Table syntax

@dlt.create_table(   # <-- Notice we are using the dlt.create_table() instead of dlt.table()
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
@dltwithdebug(globals())    # <-- This dltwithdebug(globals()) needs to be added
def <function-name>():
    return (<query>)

View syntax

@dlt.create_view(    # <-- Notice we are using the dlt.create_view() instead of dlt.view()
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
@dltwithdebug(globals())    # <-- This dltwithdebug(globals()) needs to be added
def <function-name>():
    return (<query>)

Getting results syntax

showoutput(function_name)  # <-- showoutput(function_name) 
                           # Notice we are only passing the function name
                           # The name of the function which is wrapped by the dlt decorators
                           
                           # For example:
                           # @dlt.create_table()
                           # @dltwithdebug(globals())
                           # def step_one():
                           #    return spark.read.csv()

                           # showoutput(step_one)

Import syntax

# We are importing:
# dltwithdebug as that's the entry point to interactive DLT workflows
# pipeline_id to ensure we import the dlt package based on the environment
# showoutput as a helper function for seeing the output result along with expectation metrics, if any are specified
from dlt_with_debug import dltwithdebug, pipeline_id, showoutput

if pipeline_id:
  import dlt
else:
  from dlt_with_debug import dlt

(back to top)

Functionality

As of now the following DLT APIs are covered for interactive use:

  1. Currently Available:

    • dlt.read
    • dlt.read_stream
    • dlt.create_table
    • dlt.create_view
    • dlt.table <-- This one sometimes gets overridden by spark.table, so use dlt.create_table instead.
    • dlt.view
    • dlt.expect
    • dlt.expect_or_fail
    • dlt.expect_or_drop
    • dlt.expect_all
    • dlt.expect_all_or_drop
    • dlt.expect_all_or_fail
  2. Will be covered in the upcoming release:

    • dlt.create_target_table
    • dlt.create_streaming_live_table
    • dlt.apply_changes

Limitation

DLT with Debug is a fully Python-based utility and as such it doesn't support the spark.table("LIVE.func_name") syntax.

So instead of spark.table("LIVE.func_name"), use dlt.read("func_name") or dlt.read_stream("func_name"), as shown below.
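
For example (a minimal sketch; the table name events_raw is hypothetical):

# Not supported in an interactive (non-DLT) run:
# df = spark.table("LIVE.events_raw")

# Works in both DLT runs and interactive runs:
df = dlt.read("events_raw")          # batch read
sdf = dlt.read_stream("events_raw")  # streaming read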

License

Distributed under the MIT License.

(back to top)

Drop a ⭐️ if you liked the project and it helped you have a smoother experience while working with DLTs.

dlt-with-debug's People

Contributors

souvik-databricks


dlt-with-debug's Issues

create_stream_live_table

Has this been implemented yet? Also, how different is this from the standard dlt.create_table or dlt.create_view?

approxQuantile works in DLT debug, but does not work within the Databricks website itself.

Hi,

First of all, please allow me to thank you greatly for this package; it is very convenient to be able to debug the code meant to create a pipeline of Delta Live Tables without having to run the entire thing. However, I am currently experiencing some problems that I find hard to resolve.

To give more context, please check the code below:

@dlt.table(name = "customer_order_silver_v2")
def capping_unitPrice_Qt():
    df =  dlt.read("customer_order_silver")
    boundary_unit = [0,0]
    boundary_qty = [0,0]
    boundary_unit = df.select(col("UnitPrice")).approxQuantile('UnitPrice',[0.05,0.95], 0.25)

    boundary_qty = df.select(col("Quantity")).approxQuantile('Quantity',[0.05,0.95], 0.25)
    print(boundary_unit)
    print(boundary_unit[0])
    print(boundary_unit[1])
    

    df = df.withColumn('UnitPrice', F.when(col('UnitPrice') > boundary_unit[1], boundary_unit[1])
                                       .when(col('UnitPrice') < boundary_unit[0], boundary_unit[0])
                                       .otherwise(col('UnitPrice')))
    
    df = df.withColumn('Quantity', F.when(col('Quantity') > boundary_qty[1], boundary_qty[1])
                                       .when(col('Quantity') < boundary_qty[0], boundary_qty[0])
                                       .otherwise(col('Quantity')))
                                          
    return df

When I run the code for this DLT, the approxQuantile() in it does not seem to work. This is what I get after running it:

[Screenshot: error output]

Yet somehow, after I use the debug package and rewrite the code into:

#this way of writing might be too complex. An alternative solution is to write the DLT as a general function and then pass it as a function. 
@dlt.create_table(name = "customer_order_silver_v2")
@dltwithdebug(globals())
# @dlt.table(name = "customer_order_silver_v2")
def capping_unitPrice_Qt():
    df =  dlt.read("wtchk_customer_order_filtered")


    boundary_unit = [0,0]
    boundary_qty = [0,0]
    boundary_unit = df.select(col("UnitPrice")).approxQuantile('UnitPrice',[0.05,0.95], 0.25)
 

    boundary_qty = df.select(col("Quantity")).approxQuantile('Quantity',[0.05,0.95], 0.25)
    print(boundary_unit)
    print(boundary_unit[0])
    print(boundary_unit[1])
    



    df = df.withColumn('UnitPrice', F.when(col('UnitPrice') > boundary_unit[1], boundary_unit[1])
                                       .when(col('UnitPrice') < boundary_unit[0], boundary_unit[0])
                                       .otherwise(col('UnitPrice')))
    
    df = df.withColumn('Quantity', F.when(col('Quantity') > boundary_qty[1], boundary_qty[1])
                                       .when(col('Quantity') < boundary_qty[0], boundary_qty[0])
                                       .otherwise(col('Quantity')))
                                          
    return df

showoutput(capping_unitPrice_Qt)

The code runs and produces the table, as well as the value that I need:

[Screenshot: successful output with the computed quantile values]

I really cannot wrap my head around what is not written correctly. I would appreciate any kind of input or advice. Thank you very much!

Add tests

Add unit tests for the functions and get coverage.

Ability to import dlt signatures without a current SparkSession

I've been happily using the dlt-with-debug library, but I'm running into an issue when importing the dlt signatures without an active SparkSession. I'm trying:

import dlt_with_debug.dlt_signatures as dlt

But __init__.py calls the v2.py file, which in turn tries to get the pipeline_id from the current SparkSession, and this fails when there is no active SparkSession.

I think my use case can be achieved with an extra check in

pipeline_id = spark.conf.get("pipelines.id", None)
that first confirms there is an actual SparkSession before calling .conf.get() on it.
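
(For illustration, a minimal sketch of the kind of guard being proposed, assuming PySpark's SparkSession.getActiveSession() is available; this is not the library's actual implementation.)

from pyspark.sql import SparkSession

# Only read "pipelines.id" from the Spark conf if a SparkSession is actually active
_active = SparkSession.getActiveSession()
pipeline_id = _active.conf.get("pipelines.id", None) if _active is not None else None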

@souvik-databricks curious what you think! And I'd be happy to contribute via PR
