
dlt-meta's People

Contributors

howardwu-db, msdotnetclr, nfx, ravi-databricks, ravi-db, rtdtwo


dlt-meta's Issues

provide SQL Query support

Introduce a SQL query string (sqlstmt) via onboarding that can include joins to other static tables.
That way, after readStream we can run spark.sql(sqlstmt).
This could also open up gold-layer possibilities.
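
A minimal sketch of what this could look like, assuming the onboarding file carries a sqlstmt string and the streaming dataframe is registered as a temp view (the view name, paths, and join are illustrative, not part of the current framework):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Stream the raw source, then expose it to SQL via a temp view.
raw_df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/Volumes/vol/landing/raw"))
raw_df.createOrReplaceTempView("source_view")

# sqlstmt would come from onboarding and may join static tables,
# which also opens up gold-layer style aggregations.
sqlstmt = """
    SELECT s.*, d.region_name
    FROM source_view s
    JOIN dim_region d ON s.region_id = d.region_id
"""
transformed_df = spark.sql(sqlstmt)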

Getting ValueError while trying to launch DLT-META using interactive notebook

Hello,

Here's the code:
onboarding_params_map = {
    "database": "dlt_demo",
    "onboarding_file_path": "onboarding_files/onboarding.json",
    "bronze_dataflowspec_table": "bronze_dataflowspec_table",
    "bronze_dataflowspec_path": "dbfs:/FileStore/bronze",
    "silver_dataflowspec_table": "silver_dataflowspec_table",
    "silver_dataflowspec_path": "dbfs:/FileStore/silver",
    "overwrite": "True",
    "onboard_layer": "bronze_silver",
    "env": "prod",
    "version": "v1",
    "import_author": "MANTAS"
}

OnboardDataflowspec(spark, onboarding_params_map).onboard_dataflow_specs()

I'm getting "ValueError: missing attributes : set()" even though all parameters are specified.

support for adding file metadata to bronze dataframe using autoloader reader (_metadata option support)

Need to provide the ability to add file metadata to the dataframe, e.g.:

import dlt

@dlt.table
def bronze():
    return (spark.readStream.format("cloudFiles")
        # Define the schema for the ~6 common columns across files. All other input
        # fields will be "rescued" into a JSON string column that can be queried via
        # dot notation. _file_path is a hidden auto-field but shows up in the rescued
        # data column JSON with this method; spoofing it as an input column lets us
        # drop the spoofed column later.
        .schema("Common1 string, Common2 string, _file_path string")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        # Override the default _rescued_data column name with whatever you want to call this column.
        .option("cloudFiles.rescuedDataColumn", "extraFields")
        .option("header", "true")
        .load("/Volumes/vol/data/*.txt")
        # Add file metadata information to the output.
        .select("*", "_metadata")
        # Discard the dummy input column to keep _file_path out of the extraFields rescue data column.
        .drop("_file_path")
    )

Add non-Delta as Sink

Support non-Delta sinks using the metadata approach.

  • If the metadata marks a sink as non-Delta, use a Structured Streaming approach with foreachBatch (see the sketch below)
  • Use DAB to deploy the resulting non-DLT pipelines to the Databricks workspace
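
A rough sketch of the foreachBatch route, assuming a JDBC target; the connection options, table names, and checkpoint path below are placeholders, not existing DLT-META behaviour:

# Hypothetical non-Delta sink: write each micro-batch out via foreachBatch.
def write_to_jdbc(batch_df, batch_id):
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://host:5432/db")
        .option("dbtable", "target_table")
        .option("user", "user")
        .option("password", "password")
        .mode("append")
        .save())

(spark.readStream
    .table("bronze_events")  # read the bronze Delta table as a stream
    .writeStream
    .foreachBatch(write_to_jdbc)
    .option("checkpointLocation", "/Volumes/vol/checkpoints/jdbc_sink")
    .start())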

Merge option for onboarding json into metadata table

Feature request, not a bug. It would be good to be able to merge the onboarding JSON into the metadata tables by key (data_flow_id). That way we don't have to overwrite the table and can maintain central metadata tables for all the source entities we're ingesting. This could of course be done manually, but it would be nice to have it baked into the framework.
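
A sketch of what this could look like with the Delta Lake MERGE API; the table name, key column spelling, and new_onboarding_df are placeholders, and the actual dataflowspec column names may differ:

from delta.tables import DeltaTable

# Upsert freshly parsed onboarding rows keyed on data_flow_id instead of overwriting the table.
target = DeltaTable.forName(spark, "dlt_demo.bronze_dataflowspec_table")
(target.alias("t")
    .merge(new_onboarding_df.alias("s"), "t.data_flow_id = s.data_flow_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())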

Handling historical silver transformations with Full Refresh

I've built a process to generate our silver_transformations.json file. I have a question/future issue about what a DLT "full refresh" would do if the silver_transformations for a table change over time. If a full refresh is done, I believe it would only pick up what is currently in the silver dataflowspecTable, which could miss historical transformations that were in place at their respective times. Am I missing something architecturally with DLT or DLT-META that can handle this use case? I appreciate any help/conversation on this topic.

Not able to onboard and deploy in interactive python terminal

Hello,

I'm trying to onboard and deploy DLT-META with all the default parameters from the terminal, but I'm getting this error:

Traceback (most recent call last):
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 500, in <module>
    main(*sys.argv[1:])
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 496, in main
    MAPPING[command](...)
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 467, in onboard
    dltmeta.onboard(cmd)
  File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 158, in onboard
    self._ws.dbfs.upload(cmd.dbfs_path + f"/dltmeta_conf/{onboarding_filename}", ob_file, overwrite=True)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 320, in upload
    with self.open(path, write=True, overwrite=overwrite) as dst:
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 316, in open
    return _DbfsIO(self, path, read=read, write=write, overwrite=overwrite)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 38, in __init__
    elif write: self._created = api.create(path, overwrite=overwrite)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/service/files.py", line 382, in create
    res = self._api.do('POST', '/api/2.0/dbfs/create', body=body, headers=headers)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/core.py", line 128, in do
    return retryable(self._perform)(method,
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/retries.py", line 54, in wrapper
    raise err
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/retries.py", line 33, in wrapper
    return func(*args, **kwargs)
  File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/core.py", line 221, in _perform
    raise self._make_nicer_error(response=response, **payload) from None
databricks.sdk.errors.platform.ResourceAlreadyExists: A file or directory already exists at the input path dbfs:/dlt-meta_cli_demo/dltmeta_conf/onboarding.json.
Error: exit status 1

Facing an issue launching the DLT-META pipeline through the command line

Parsing argument complete. args=Namespace(cloud_provider_name='aws', dbr_version='12.2.x-scala2.12', dbfs_path='dbfs:/dais-dlt-meta-demo-automated/')
Traceback (most recent call last):
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 345, in <module>
    main()
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 244, in main
    api_client = get_api_client()
  File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 17, in get_api_client
    api_client = _get_api_client(config, command_name="labs_dlt-meta")
  File "C:\Users\nehamjain\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\databricks_cli\configure\config.py", line 102, in _get_api_client
    verify = config.insecure is None
AttributeError: 'NoneType' object has no attribute 'insecure'

No such file or directory: '/demo' regardless of input arguments for databricks labs dlt-meta onboard

Hi, when running dlt-meta onboard I was stuck on issue #51, which is not yet released, so I switched to your branch 51-dlt-meta-cli-onboard-command-issue-for-azure-databrickssdkerrorsplatformresourcealreadyexists

However, there are more issues there: regardless of the input arguments for dlt-meta onboard, it tries to read from '/demo', which does not exist. Since I am on a Mac, it is not possible for me to move the demo folder to the root: https://support.apple.com/en-gb/101400

I would assume that the dlt-meta onboard command would read from the folder specified in 'Provide onboarding files local directory', but that is not the case.

See my command and stack trace:

sudo databricks labs dlt-meta onboard --debug --profile=test
15:50:54 INFO start pid=94725 version=0.223.1 args="databricks, labs, dlt-meta, onboard, --debug, --profile=test"
15:50:54 INFO Overriding login profile: test pid=94725
15:50:54 DEBUG Loading test profile from /Users/dev/.databrickscfg pid=94725 sdk=true
15:50:54 DEBUG Passing down environment variables: DATABRICKS_HOST, DATABRICKS_TOKEN pid=94725
15:50:54 DEBUG Forwarding subprocess: /Users/dev/.databricks/labs/dlt-meta/state/venv/bin/python3 /Users/dev/dlt-meta/src/cli.py {"command":"onboard","flags":{"log_level":"debug"},"output_type":""} pid=94725
15:50:54 DEBUG starting: /Users/dev/.databricks/labs/dlt-meta/state/venv/bin/python3 /Users/dev/dlt-meta/src/cli.py {"command":"onboard","flags":{"log_level":"debug"},"output_type":""} pid=94725
Provide onboarding file path (default: demo/conf/onboarding.template):
Provide onboarding files local directory (default: demo/):
Provide dbfs path (default: dbfs:/dlt-meta_cli_demo): dbfs:/dlt-meta_cli_demo_100
Provide databricks runtime version (default: 15.3.x-scala2.12):
Run onboarding with unity catalog enabled?
[0] False
[1] True
Enter a number between 0 and 1: 1
Provide unity catalog name: dlt_test
Provide dlt meta schema name (default: dlt_meta_dataflowspecs_f5eda329176c4678bd2cf2c499c270e2):
Provide dlt meta bronze layer schema name (default: dltmeta_bronze_2a2e2b8c465244e5a22743b4d0013eed):
Provide dlt meta silver layer schema name (default: dltmeta_silver_481fbe8078ef4267aec8966d4efb9ec1):
Provide dlt meta layer
[0] bronze
[1] bronze_silver
[2] silver
Enter a number between 0 and 2: 1
Provide bronze dataflow spec table name (default: bronze_dataflowspec):
Provide silver dataflow spec table name (default: silver_dataflowspec):
Overwrite dataflow spec?
[0] False
[1] True
Enter a number between 0 and 1: 1
Provide dataflow spec version (default: v1):
Provide environment name (default: prod): test
Provide import author name (default: dev): test_auth
Provide cloud provider name
[0] aws
[1] azure
[2] gcp
Enter a number between 0 and 2: 1
Update workspace/dbfs paths, unity catalog name, bronze/silver schema names in onboarding file?
[0] False
[1] True
Enter a number between 0 and 1: 1
Traceback (most recent call last):
  File "/Users/dev/dlt-meta/src/cli.py", line 503, in <module>
    main(*sys.argv[1:])
  File "/Users/dev/dlt-meta/src/cli.py", line 499, in main
    MAPPING[command](...)
  File "/Users/dev/dlt-meta/src/cli.py", line 470, in onboard
    dltmeta.onboard(cmd)
  File "/Users/dev/dlt-meta/src/cli.py", line 160, in onboard
    self._ws.dbfs.copy(cmd.onboarding_files_dir_path,
  File "/Users/dev/.databricks/labs/dlt-meta/state/venv/lib/python3.9/site-packages/databricks/sdk/mixins/files.py", line 603, in copy
    with child.open(read=True) as reader:
  File "/Users/dev/.databricks/labs/dlt-meta/state/venv/lib/python3.9/site-packages/databricks/sdk/mixins/files.py", line 351, in open
    return self._path.open(mode='wb' if overwrite else 'rb' if read else 'xb')
  File "/Users/dev/.pyenv/versions/3.9.6/lib/python3.9/pathlib.py", line 1242, in open
    return io.open(self, mode, buffering, encoding, errors, newline,
  File "/Users/dev/.pyenv/versions/3.9.6/lib/python3.9/pathlib.py", line 1110, in _opener
    return self._accessor.open(self, flags, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/demo'
Error: exit status 1
15:52:15 ERROR failed execution pid=94725 exit_code=1 error="exit status 1"

Thank you for any help.

java.lang.RuntimeException: non-nullable field authBytes was serialized as null

With an Event Hubs source configuration like the following:

  "source_format": "eventhub",
  "source_details": {
     "source_schema_path": "{dbfs_path}/integration-tests/resources/eventhub_iot_schema.ddl",
     "eventhub.accessKeyName": "{eventhub_accesskey_name}",
     "eventhub.name": "{eventhub_name}",
     "eventhub.secretsScopeName": "{eventhub_secrets_scope_name}",
     "kafka.sasl.mechanism": "PLAIN",
     "kafka.security.protocol": "SASL_SSL",
     "eventhub.namespace": "{eventhub_nmspace}",
     "eventhub.port": "{eventhub_port}"
  },

The pipeline may fail with the following error:

Connection to node -1 ({eventhub_nmspace}.servicebus.windows.net/xx.xx.xx.xx:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
...
Unexpected error from {eventhub_nmspace}.servicebus.windows.net/xx.xx.xx.xx (channelId=-1); closing connection
java.lang.RuntimeException: non-nullable field authBytes was serialized as null

Delta source_format key error

When using the delta option for the bronze source_format, the read_dlt_delta method fails with a key error:

.{bronze_dataflow_spec.sourceDetails["table"]}
KeyError: 'table'

Switching the key to source_table worked for me locally.
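
Roughly, the change that worked locally looks like this; the reader call and the source_database key are illustrative assumptions, only the source_table key comes from this issue:

# read_dlt_delta currently does roughly:
#   f'...{bronze_dataflow_spec.sourceDetails["table"]}'   -> KeyError: 'table'
# Using the key the onboarding JSON actually populates worked locally:
source_details = bronze_dataflow_spec.sourceDetails
df = spark.readStream.table(
    f'{source_details["source_database"]}.{source_details["source_table"]}'
)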

Create demo for append_flow and auto loader metadata

Create a demo which demonstrates the following DLT capabilities (see the sketch after this list):

  1. The Append Flow DLT API
  2. The Auto Loader _metadata column, to add additional columns like file_name and file_path to the bronze table
  3. Bring your own custom transformations for the entire bronze layer
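
A rough sketch of what the demo pipeline could exercise; the table name, paths, and backfill source are placeholders:

import dlt
from pyspark.sql.functions import col

# Target bronze table fed by multiple append flows.
dlt.create_streaming_table("bronze_events")

@dlt.append_flow(target="bronze_events")
def events_landing():
    # Auto Loader plus the _metadata column to carry file name / path into bronze.
    return (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/vol/landing/events")
        .select("*",
                col("_metadata.file_name").alias("source_file_name"),
                col("_metadata.file_path").alias("source_file_path")))

@dlt.append_flow(target="bronze_events")
def events_backfill():
    # A second source appended into the same bronze table.
    return (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/Volumes/vol/landing/events_backfill"))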

Point DLT-META to Source System

  • Point DLT-META at a source system's database/catalog and have it launch DLT pipelines that create the ingestion flow
  • This will create the DLT-META config automatically
  • Launch the DLT pipelines

Bring your own custom transformations for bronze/silver layer

Need to support custom transformations for the bronze and silver layers. E.g. after reading from the source, the pipeline reader returns a dataframe; if the customer wants to transform that dataframe, we need support for calling a custom transformation function.
E.g. the transformations below need to be applied to the input dataframe:

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
def custom_transform_func1(input_df) -> DataFrame:
  return input_df.withColumn('custom_col', lit('test1'))

def custom_transform_func2(input_df) -> DataFrame:
  return input_df.withColumn('custom_col', lit('test2'))

dlt-meta needs a placeholder in onboarding.json to declare the transformation functions and their order,
e.g. "transformation_functions": ["custom_transform_func1", "custom_transform_func2"]

You need to attach these functions to the DLT notebook, either by adding them to the notebook before calling the dlt-meta generic pipeline or by pip installing your custom function library, so that DLT can resolve the functions at runtime. A sketch of how the framework could apply them in order follows.
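
A minimal sketch, assuming the configured names resolve to callables available in the pipeline's Python environment; the lookup and chaining below are illustrative, not the current dlt-meta implementation:

from functools import reduce
from pyspark.sql import DataFrame

def apply_custom_transformations(input_df: DataFrame, function_names: list) -> DataFrame:
    # Resolve each configured name to a callable (attached notebook function or
    # pip-installed library function), then apply them in the configured order.
    funcs = [globals()[name] for name in function_names]
    return reduce(lambda df, f: f(df), funcs, input_df)

# e.g. with "transformation_functions": ["custom_transform_func1", "custom_transform_func2"]
# bronze_df = apply_custom_transformations(bronze_df, ["custom_transform_func1", "custom_transform_func2"])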

Unity dataflowspec tables

Hello,

If the dataflowspec tables and the bronze/silver tables are stored in a schema managed by Unity Catalog, what is the reason to set paths for these tables?
I'm unable to create the dataflowspec tables if I want them stored in a UC-managed schema. If I store them in hive_metastore, it runs fine.

The main issue is that users don't know the paths of tables managed by UC; they should only reference the table names in a specific schema. Or am I missing something?

"silver_dataflowspec_table": "silver_dataflowspec_table",
"silver_dataflowspec_path": "dbfs:/onboarding_tables_cdc/silver",
"bronze_dataflowspec_table": "bronze_dataflowspec_table",
"bronze_dataflowspec_path": "dbfs:/onboarding_tables_cdc/bronze",

Thanks.

expect_or_quarantine

Hi, not an error, but I was looking at this and noticed you defined

"expect_or_quarantine": {
    "valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
}

in one of the data quality expectation (dqe) files. But when I tried it, it throws

AttributeError: module 'dlt' has no attribute 'expect_or_quarantine',Map(),Map(),List(),List(),Map())

Is there something I missed?
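
For what it's worth, the dlt Python module itself only exposes decorators such as expect, expect_or_drop, and expect_or_fail, so a quarantine is usually built by inverting the rule and routing failing rows to a separate table. A minimal sketch, with illustrative table and rule names:

import dlt
from pyspark.sql.functions import expr

quarantine_rule = "NOT (operation IN ('APPEND', 'DELETE', 'UPDATE'))"

@dlt.table(name="bronze_events_quarantine")
def bronze_events_quarantine():
    # Rows that violate the expectation land here.
    return dlt.read_stream("bronze_events_raw").where(expr(quarantine_rule))

@dlt.table(name="bronze_events")
def bronze_events():
    # Rows that satisfy the expectation continue downstream.
    return dlt.read_stream("bronze_events_raw").where(expr(f"NOT ({quarantine_rule})"))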

Silver Transformations Became a Dependency

TypeError: __init__() missing 1 required positional argument: 'dataQualityExpectations',None,Map(),Map(),List(),List(),Map())

When a Delta Live Tables pipeline is onboarded through a previous version of DLT-META without silver data quality expectations, the code fails when the launch notebook is invoked.

Ability to add columns to bronze tables similar to silver table query

It's not an issue but a feature request. This would be useful if we want to add the source file name or a processing timestamp. Example:
val df = spark.readStream.format("cloudFiles")
  .schema(schema)
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.region", "ap-south-1")
  .load("path")
  .withColumn("filePath", input_file_name())

Multiple select expression support

Hello,

I ingest JSON data into the bronze layer and then try to apply some transformations to promote it to the silver layer.

Here's the problem: when I try to explode the ingested nested JSON and then select all columns, I get the following error:


from pyspark.sql.types import StructType, StructField, StringType, ArrayType

data = [("id1", [("a", "111"), ("b", "222")]),
        ("id2", [("c", "333"), ("d", "444")])]

schema = StructType([
    StructField("id", StringType()),
    StructField("payload", ArrayType(
        StructType([
            StructField("c1", StringType()),
            StructField("c2", StringType())
        ])
    ))
])

df = spark.createDataFrame(data, schema)

df2 = df.selectExpr(
    "explode(payload) as temp",
    "temp.*"
)

display(df2.schema)

AnalysisException: Cannot resolve 'temp.*' given input columns ''.; line 1 pos 0

However, if I select all columns in a separate selectExpr, all is good:

df2 = df.selectExpr(
    "explode(payload) as temp"
).selectExpr(
    "temp.*"
)

display(df2.schema)

df2: pyspark.sql.dataframe.DataFrame
    c1: string
    c2: string
StructType([StructField('c1', StringType(), True), StructField('c2', StringType(), True)])

Now suppose I want to drop the unwanted columns from the result:

df2 = df.selectExpr(
    "explode(payload) as temp"
).selectExpr(
    "temp.*",
    "except(c1)"
)

display(df2.schema)

Which gives the error:

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `c1` cannot be resolved. Did you mean one of the following? [`temp`].; line 1 pos 0

However, if I add another selectExpr on top of the previous one, it works:

df2 = df.selectExpr(
    "explode(payload) as temp"
).selectExpr(
    "temp.*"
).selectExpr(
    "* except(c1)"
)

display(df2.schema)

StructType([StructField('c2', StringType(), True)])

As far as I understood from the DLT-Meta source code:

    def get_silver_schema(self):
        """Get Silver table Schema."""
        silver_dataflow_spec: SilverDataflowSpec = self.dataflowSpec
        # source_database = silver_dataflow_spec.sourceDetails["database"]
        # source_table = silver_dataflow_spec.sourceDetails["table"]
        select_exp = silver_dataflow_spec.selectExp
        where_clause = silver_dataflow_spec.whereClause
        raw_delta_table_stream = self.spark.read.load(
            path=silver_dataflow_spec.sourceDetails["path"],
            format="delta"
            # f"{source_database}.{source_table}"
        ).selectExpr(*select_exp)

the selectExpr is applied once to the full array of select expressions.

Can we apply it separately to each select expression, to avoid the errors above and make the transformations more flexible?
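
A small sketch of the requested behaviour, chaining one selectExpr call per configured expression (illustrative only, not current dlt-meta behaviour):

from functools import reduce

# raw_delta_table_stream and select_exp as in get_silver_schema above.
select_exp = ["explode(payload) as temp", "temp.*", "* except(c1)"]

# Chain one selectExpr per expression so later expressions can reference
# columns produced by earlier ones.
transformed_df = reduce(lambda df, exp: df.selectExpr(exp),
                        select_exp,
                        raw_delta_table_stream)

One caveat: expressions that today form a single projection (e.g. ["id", "payload as p"]) would behave differently when chained, so this would probably need to be opt-in per dataflow.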

Thank you

How to chain multiple silver tables after bronze table?

Hi, is it possible to have more than one silver table after a bronze table? For example, after customers_cdc, can I have a customers silver table reading from customers_cdc and another customers_clean silver table reading from customers? If so, how do I define these in onboarding.json?
