databrickslabs / dlt-meta Goto Github PK
View Code? Open in Web Editor NEWMetadata driven Databricks Delta Live Tables framework for bronze/silver pipelines
Home Page: https://databrickslabs.github.io/dlt-meta/
License: Other
Metadata driven Databricks Delta Live Tables framework for bronze/silver pipelines
Home Page: https://databrickslabs.github.io/dlt-meta/
License: Other
Integrate append_flow API for following use cases:
Introduce sql query string sqlstmt
via onboarding which can have joins to other static tables.
This way after readStream we can do spark.sql(sqlstmt)
This can also open gold layer possibilities
Hello,
Here's a code:
onboarding_params_map = {
"database": "dlt_demo",
"onboarding_file_path": "onboarding_files/onboarding.json",
"bronze_dataflowspec_table": "bronze_dataflowspec_table",
"bronze_dataflowspec_path": "dbfs:/FileStore/bronze",
"silver_dataflowspec_table": "silver_dataflowspec_table",
"silver_dataflowspec_path": "dbfs:/FileStore/silver".format("silver"),
"overwrite": "True",
"onboard_layer": "bronze_silver",
"env": "prod",
"version": "v1",
"import_author":"MANTAS"
}
OnboardDataflowspec(spark, onboarding_params_map).onboard_dataflow_specs()
I'm getting "ValueError: missing attributes : set()" while all parameters are specified
Need to provide ability so that file metadata can be added to dataframe
e.g
import dlt
@dlt.table
def bronze():
return (spark.readStream.format("cloudFiles")
# define the schema for the ~6 common columns across files. All other input fields will be "rescued" into a JSON string column that can be queried via dot notation.
.schema("Common1 string, Common2 string, _file_path string") # _file_path is a hidden auto-field but shows up in rescueData column JSON with this method. Spoofing that I have the same column in my input file so i can drop this spoofed column later
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaEvolutionMode", "rescue")
.option("cloudFiles.rescuedDataColumn","extraFields") # override default _rescuedData column name with whatever you want to call this column
.option("header","true")
.load("/Volumes/vol/data/*.txt")
.select("*","_metadata") # add file metadata information to output
.drop("_file_path") # discard dummy input column to keep _file_path out of extraFields rescue data column
)
Support non delta as sink using metadata approach.
Feature request, not bug. It would be good to be able to merge the onboarding Json into the metadata tables by key (data_flow_id). This way we don't have to overwrite the table and we could maintain central metadata tables for all source entities that we're ingesting. This could be done manually of course but would be nice to have it baked in into the framework.
Currently apply changes assumes version column as Integer types, we need to infer data types from sequence_by
Hello,
I am getting this error while initializing the dlt pipeline for bronze table. Please suggest what might be the cause. My source is eventhub.
"no tables are defined by the library of this pipeline. This error usually occurs when there are view definitions"
Thanks
I've built a process to generate our silver_transformations.json file. I have a question/future issue for what a DLT "Full refresh" could do if you have silver_transformations that might change for a table over time. If a "Full refresh" is done, then I believe it would only grab what is current in the dataflowspecTable for silver, which could miss any historical transformations put in place at their respective times. Am I missing something architecturally with DLT or DLT-meta that can handle this use case? I appreciate any help/conversation surrounding this topic.
Hello,
I'm trying to onboard and deploy DLT-META with all the default parameters in terminal, but getting this error:
Traceback (most recent call last):
File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 500, in
main(*sys.argv[1:])
File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 496, in main
MAPPINGcommand
File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 467, in onboard
dltmeta.onboard(cmd)
File "/Users/mantasmicikevicius/dlt-meta/src/cli.py", line 158, in onboard
self._ws.dbfs.upload(cmd.dbfs_path + f"/dltmeta_conf/{onboarding_filename}", ob_file, overwrite=True)
File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 320, in upload
with self.open(path, write=True, overwrite=overwrite) as dst:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 316, in open
return _DbfsIO(self, path, read=read, write=write, overwrite=overwrite)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/mixins/files.py", line 38, in init
elif write: self._created = api.create(path, overwrite=overwrite)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/service/files.py", line 382, in create
res = self._api.do('POST', '/api/2.0/dbfs/create', body=body, headers=headers)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/core.py", line 128, in do
return retryable(self._perform)(method,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/retries.py", line 54, in wrapper
raise err
File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/retries.py", line 33, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/mantasmicikevicius/dlt-meta/.venv/lib/python3.11/site-packages/databricks/sdk/core.py", line 221, in _perform
raise self._make_nicer_error(response=response, **payload) from None
databricks.sdk.errors.platform.ResourceAlreadyExists: A file or directory already exists at the input path dbfs:/dlt-meta_cli_demo/dltmeta_conf/onboarding.json.
Error: exit status 1
Parsing argument complete. args=Namespace(cloud_provider_name='aws', dbr_version='12.2.x-scala2.12', dbfs_path='dbfs:/dais-dlt-meta-demo-automated/')
Traceback (most recent call last):
File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 345, in
main()
File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 244, in main
api_client = get_api_client()
^^^^^^^^^^^^^^^^
File "C:\Users\nehamjain\dlt-meta\dlt-meta-demo\launch_demo.py", line 17, in get_api_client
api_client = _get_api_client(config, command_name="labs_dlt-meta")
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\nehamjain\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\LocalCache\local-packages\Python311\site-packages\databricks_cli\configure\config.py", line 102, in _get_api_client
verify = config.insecure is None
^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'insecure'
Hi, when running dlt-meta onboard, I was stuck on issue #51 that is not yet released, so I switched to your branch 51-dlt-meta-cli-onboard-command-issue-for-azure-databrickssdkerrorsplatformresourcealreadyexists
However, there are more issues there - regardless of input arguments for dlt-meta onboard, it tries to read from '/demo' that does not exist. Since I am on a mac, it is not possible for me to move the demo folder to root https://support.apple.com/en-gb/101400
I would assume that the 'dlt-meta onboard cmd' would try to read from folder specified in 'Provide onboarding files local directory', but that is not the case.
See my use of cmd and stacktrace:
sudo databricks labs dlt-meta onboard --debug --profile=test
15:50:54 INFO start pid=94725 version=0.223.1 args="databricks, labs, dlt-meta, onboard, --debug, --profile=test"
15:50:54 INFO Overriding login profile: test pid=94725
15:50:54 DEBUG Loading test profile from /Users/dev/.databrickscfg pid=94725 sdk=true
15:50:54 DEBUG Passing down environment variables: DATABRICKS_HOST, DATABRICKS_TOKEN pid=94725
15:50:54 DEBUG Forwarding subprocess: /Users/dev/.databricks/labs/dlt-meta/state/venv/bin/python3 /Users/dev/dlt-meta/src/cli.py {"command":"onboard","flags":{"log_level":"debug"},"output_type":""} pid=94725
15:50:54 DEBUG starting: /Users/dev/.databricks/labs/dlt-meta/state/venv/bin/python3 /Users/dev/dlt-meta/src/cli.py {"command":"onboard","flags":{"log_level":"debug"},"output_type":""} pid=94725
Provide onboarding file path (default: demo/conf/onboarding.template):
Provide onboarding files local directory (default: demo/):
Provide dbfs path (default: dbfs:/dlt-meta_cli_demo): dbfs:/dlt-meta_cli_demo_100
Provide databricks runtime version (default: 15.3.x-scala2.12):
Run onboarding with unity catalog enabled?
[0] False
[1] True
Enter a number between 0 and 1: 1
Provide unity catalog name: dlt_test
Provide dlt meta schema name (default: dlt_meta_dataflowspecs_f5eda329176c4678bd2cf2c499c270e2):
Provide dlt meta bronze layer schema name (default: dltmeta_bronze_2a2e2b8c465244e5a22743b4d0013eed):
Provide dlt meta silver layer schema name (default: dltmeta_silver_481fbe8078ef4267aec8966d4efb9ec1):
Provide dlt meta layer
[0] bronze
[1] bronze_silver
[2] silver
Enter a number between 0 and 2: 1
Provide bronze dataflow spec table name (default: bronze_dataflowspec):
Provide silver dataflow spec table name (default: silver_dataflowspec):
Overwrite dataflow spec?
[0] False
[1] True
Enter a number between 0 and 1: 1
Provide dataflow spec version (default: v1):
Provide environment name (default: prod): test
Provide import author name (default: dev): test_auth
Provide cloud provider name
[0] aws
[1] azure
[2] gcp
Enter a number between 0 and 2: 1
Update workspace/dbfs paths, unity catalog name, bronze/silver schema names in onboarding file?
[0] False
[1] True
Enter a number between 0 and 1: 1
Traceback (most recent call last):
File "/Users/dev/dlt-meta/src/cli.py", line 503, in
main(*sys.argv[1:])
File "/Users/dev/dlt-meta/src/cli.py", line 499, in main
MAPPINGcommand
File "/Users/dev/dlt-meta/src/cli.py", line 470, in onboard
dltmeta.onboard(cmd)
File "/Users/dev/dlt-meta/src/cli.py", line 160, in onboard
self._ws.dbfs.copy(cmd.onboarding_files_dir_path,
File "/Users/dev/.databricks/labs/dlt-meta/state/venv/lib/python3.9/site-packages/databricks/sdk/mixins/files.py", line 603, in copy
with child.open(read=True) as reader:
File "/Users/dev/.databricks/labs/dlt-meta/state/venv/lib/python3.9/site-packages/databricks/sdk/mixins/files.py", line 351, in open
return self._path.open(mode='wb' if overwrite else 'rb' if read else 'xb')
File "/Users/dev/.pyenv/versions/3.9.6/lib/python3.9/pathlib.py", line 1242, in open
return io.open(self, mode, buffering, encoding, errors, newline,
File "/Users/dev/.pyenv/versions/3.9.6/lib/python3.9/pathlib.py", line 1110, in _opener
return self._accessor.open(self, flags, mode)
FileNotFoundError: [Errno 2] No such file or directory: '/demo'
Error: exit status 1
15:52:15 ERROR failed execution pid=94725 exit_code=1 error="exit status 1"
thank you for any help.
With Event Hubs source configuration like the following:
"source_format": "eventhub",
"source_details": {
"source_schema_path": "{dbfs_path}/integration-tests/resources/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "{eventhub_accesskey_name}",
"eventhub.name": "{eventhub_name}",
"eventhub.secretsScopeName": "{eventhub_secrets_scope_name}",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"eventhub.namespace": "{eventhub_nmspace}",
"eventhub.port": "{eventhub_port}"
},
The pipeline may fail with the following error:
Connection to node -1 ({eventhub_nmspace}.servicebus.windows.net/xx.xx.xx.xx:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
...
Unexpected error from {eventhub_nmspace}.servicebus.windows.net/xx.xx.xx.xx (channelId=-1); closing connection
java.lang.RuntimeException: non-nullable field authBytes was serialized as null
When using the delta option for the bronze source_format the read_dlt_delta method will fail with a key error:
.{bronze_dataflow_spec.sourceDetails["table"]}
KeyError: 'table'
Switching the key to source_table
worked for me locally.
Create demo which can demonstrate DLT's following capabilities
Following the default docs with default argument values, It fails with the same error, regardless the dbfs folder:
databricks.sdk.errors.platform.ResourceAlreadyExists: A file or directory already exists at the input path dbfs:/dlt-meta_cli_demo/dltmeta_conf/onboarding.json
.
Integrate databricks labs blueprint into code base for CLI and unit tests
Need to support custom transformations for bronze and silver layer. e.g after reading from source pipeline reader returns dataframe, if customer want to transform dataframe need support to call custom function for transformation
e.g below transformation needs to be applied on input dataframe
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
def custom_transform_func1(input_df) -> DataFrame:
return input_df.withColumn('custom_col', lit('test1'))
def custom_transform_func2(input_df) -> DataFrame:
return input_df.withColumn('custom_col', lit('test2'))
dlt-meta needs placehold to put tranformation functions and their order in onboarding.json
e.g "transformation_functions":["custom_transform_func_test1","custom_transform_func2"]
you need to attach these function to DLT notebook via either add it to notebook before calling dlt-meta generic pipeline or pip install your custom function lib. This way DLT will get functions at runtime
I am trying to use cdc_apply_changes with a bronze table that we are using schema evolution (so no defined schema). Having a schema isn't required by Databricks for this operation, but it appears that the code is checking for a schema ddl before you can run use cdc_apply_changes.
The create_target_table() and create_streaming_live_table() functions are deprecated. Databricks recommends updating existing code to use the create_streaming_table() function
Provide support for dlt.apply_changes_from_snapshot
Hello,
if the dataflowspecs tables + bronze/silver tables are stored in schema managed by UC, what is the reason to use/set paths for these tables?
I'm unable to create dataflowspec tables if I want them to be stored in schema managed by UC. If I store it in hive_metastore, it runs fine.
The main issue is that users don't know the paths for the tables managed by UC, they should only reference the tables names in specific schema. Or am I missing something?
"silver_dataflowspec_table": "silver_dataflowspec_table",
"silver_dataflowspec_path": "dbfs:/onboarding_tables_cdc/silver",
"bronze_dataflowspec_table": "bronze_dataflowspec_table",
"bronze_dataflowspec_path": "dbfs:/onboarding_tables_cdc/bronze",
Thanks.
Hi, not an error but I was looking at this and noticed you defined
"expect_or_quarantine":{
"valid_operation":"operation IN ('APPEND', 'DELETE', 'UPDATE')"
}
in one of the dqe. But when I tried, it throws
AttributeError: module 'dlt' has no attribute 'expect_or_quarantine',Map(),Map(),List(),List(),Map())
Is there something I missed?
TypeError: __init__() missing 1 required positional argument: 'dataQualityExpectations',None,Map(),Map(),List(),List(),Map())
When a Delta Live Table is onboarded thru a previous version of DLT-META without silver data quality expectations, the code will fail when invoking the launch notebook.
Could we add a warning to the onboarding process that would throw a warning when a table in the onboarding file is used but the corresponding silver transformation table couldn't be found? It may also make sense for the source_schema_path to be listed, but either the file isn't found or is blank. That might add some QoL when prototyping and testing new configurations.
Hi Team, thank you so much for the UC branch merge. We are now able to work with UC without any issues or workarounds. However, when i tried running the pipeline for a different dataflow group, this new group overwrites whatever existed before it, instead of writing alongside the other tables. Could you let me know how to fix this?
It's not an issue but a feature request. This would be useful if we want to add the name of the source file name or put in processing time. Example:
val df = spark.readStream.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("cloudFiles.region","ap-south-1")
.load("path")
.withColumn("filePath",input_file_name())
In dlt-meta CLI it should not build whl file locally but take version and do pip install dlt-meta
Create CLI documentation which shows gif or videos on how to use dlt-meta using databricks labs cli.
Hello,
I ingest JSON data into bronze layer and then try to apply some transformations on it to promote it to the silver layer.
here's the problem: when I try to explode the ingested nested json and then select all columns I get the following error:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
data = [("id1", [("a", "111"), ("b", "222")]),
("id2", [("c", "333"), ("d", "444")])]
schema = StructType([
StructField("id", StringType()),
StructField("payload", ArrayType(
StructType([
StructField("c1", StringType()),
StructField("c2", StringType())
])
))
])
df = spark.createDataFrame(data, schema)
df2 = df.selectExpr(
"explode(payload) as temp",
"temp.*"
)
display(df2.schema)
AnalysisException: Cannot resolve 'temp.*' given input columns ''.; line 1 pos 0
However if I select all columns in a separate selectExpr, all is good:
df2 = df.selectExpr(
"explode(payload) as temp").selectExpr(
"temp.*"
)
display(df2.schema)
df2:pyspark.sql.dataframe.DataFrame
c1:string
c2:string
StructType([StructField('c1', StringType(), True), StructField('c2', StringType(), True)])
Now suppose I want to drop the unwanted columns from the result:
df2 = df.selectExpr(
"explode(payload) as temp").selectExpr(
"temp.*",
"except(c1)"
)
display(df2.schema)
Which gives the error:
AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `c1` cannot be resolved. Did you mean one of the following? [`temp`].; line 1 pos 0
However if add another selectExpr on top of a previous one, it works!
df2 = df.selectExpr(
"explode(payload) as temp").selectExpr(
"temp.*").selectExpr(
"* except(c1)"
)
display(df2.schema)
StructType([StructField('c2', StringType(), True)])
As far as I understood from the DLT Meta source code:
def get_silver_schema(self):
"""Get Silver table Schema."""
silver_dataflow_spec: SilverDataflowSpec = self.dataflowSpec
# source_database = silver_dataflow_spec.sourceDetails["database"]
# source_table = silver_dataflow_spec.sourceDetails["table"]
select_exp = silver_dataflow_spec.selectExp
where_clause = silver_dataflow_spec.whereClause
raw_delta_table_stream = self.spark.read.load(
path=silver_dataflow_spec.sourceDetails["path"],
format="delta"
# #f"{source_database}.{source_table}"
).selectExpr(*select_exp)
the selectExpr is applied once on an array of select expressions.
Can we apply it separately on each of the select expression so as to avoid the above errors and make it more flexible in transforming?
Thank you
Hi, is it possible to have more than one silver tables after a bronze table? For example, after customers_cdc
, can I have customers
silver table reading from customers_cdc
and another customers_clean
silver table reading from customers
? If so, how do I define these in onboarding.json?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.