databrickslabs / overwatch
Capture deep metrics on one or all assets within a Databricks workspace
License: Other
The column names are not uniform across all layers, but they should be. The gold layer's model is uniform; its naming conventions should be pulled back into the bronze and silver layers.
Just for tracking....
I have an Azure setup with cluster logs enabled, and events coming to the EventHubs. But the job fails with org.apache.spark.sql.AnalysisException: No such struct field existing_cluster_id in aclPermissionSet, autoscale, autotermination_minutes, clusterId, clusterName, clusterOwnerUserId, clusterState, clusterWorkers, cluster_id, cluster_log_conf, cluster_name, containerId, data_length, driver_node_type_id, enable_elastic_disk, handle, idInJob, idempotency_token, instanceId, jobClusterType, jobId, jobTaskType, jobTerminalState, jobTriggerType, job_id, name, new_cluster, new_settings, node_type_id, notebookId, orgId, overwrite, path, port, publicKey, recursive, resourceId, runId, shardName, spark_conf, spark_version, targetUserId, tokenId, user, userName, validate_cluster_name_uniqueness;
This happens in the following code snippet:
val jobsInteractiveClusterIDs = auditDFBase
.filter('serviceName === "jobs")
.select($"requestParams.existing_cluster_id".alias("cluster_id"))
.filter('cluster_id.isNotNull)
.distinct
I'm not sure how to correctly fix it. There are two options: either change
.select($"requestParams.existing_cluster_id".alias("cluster_id"))
to
.select($"requestParams.cluster_id".alias("cluster_id"))
or first check whether
requestParams.existing_cluster_id
is present in the schema.
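The second option could be sketched as a defensive column resolver (a sketch only; the helper name is hypothetical, not Overwatch code). If the requestParams struct contains existing_cluster_id, it is selected as usual; otherwise a typed null is emitted so the downstream .isNotNull filter simply drops every row instead of throwing AnalysisException:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Hypothetical helper: resolve requestParams.existing_cluster_id only
// when the audit schema actually contains that field; otherwise fall
// back to a null string column so downstream filters drop all rows.
def existingClusterIdCol(requestParams: StructType): Column =
  if (requestParams.fieldNames.contains("existing_cluster_id"))
    col("requestParams.existing_cluster_id")
  else lit(null).cast("string")
```

The snippet above would then use `.select(existingClusterIdCol(requestParamsSchema).alias("cluster_id"))` instead of selecting the field unconditionally.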
Pipelines have scopes which have modules. This allows for a natural flow wherein modules may have requirements including sources, business rules, etc. Currently, due to time constraints, module dependencies and requirements must be maintained outside of the Pipeline architecture in the global config.
It's become clear that the event logs are a jumbled mess of several schemas by Event (which wasn't clear at the beginning). As such, as time permits, the original bronze ingest from eventLogs should be restructured to ingest by Event before any explicit schema is applied. This will greatly improve supportability and maintenance requirements.
TODO -- Add partition filters to Gold Spark Incremental DFs
Originally posted by @GeekSheikh in #42 (comment)
SQL Analytics has its own pricing and should be handled differently from interactive and automated workloads. Also, right now SQLA is reported as a non-automated workload, so we calculate its price as interactive.
Additional events for future research. These events seem to have high-value data, review later.
'Event === "org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveSQLMetricUpdates"
--> select sqlPlanMetrics, ExecutionID
'Event === "SparkListenerEnvironmentUpdate"
--> select JVMInformation
'Event.isin("SparkListenerBlockManagerAdded", "SparkListenerBlockManagerRemoved")
--> select BlockManagerID, MaximumMemory, MaximumOnheapMemory, Timestamp
The DBU metric was left off of instance details for Azure.
Initially, the users/groups/secrets were very rudimentary. The depth of these contexts can be greatly improved upon without a lot of effort.
JDBC logic was abandoned for the V1.0 release date due to time constraints. It is a very valuable piece of sparkEvents for many customers; this should be implemented from silver --> consumer.
Validate continuity across runs. Ensure that no data is missed due to not closing an event that begins in one pipeline run and ends in the next.
For example, clusterStateFact: a cluster state begins in run 1 and ends in run 2 -- is the continuity of that state properly maintained across the runs?
The same validations are necessary in all ETLs that rely on joins/windows to match state a+1 to state a
At present both sides of this join are relatively small -- so adding a range join hint here may not have much effect but it's worth exploring when/if there's time.
Overwatch must have read access to all clusters or the data retrieved will be limited. I would like to persist all cluster IDs that received an API error during retrieval attempt to a table in the ETL database. This will enable us to understand which cluster events were missed due to API errors.
This is critical because once data expires from here it cannot be retrieved programmatically.
There are a number of places where usage of dbutils is not necessary:
dbutils.fs.ls
could be replaced with the Hadoop FileSystem API (I have changes already);
spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId")
spark.conf.get("spark.databricks.clusterUsageTags.cloudProvider")
etc.
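The dbutils.fs.ls replacement could look like the following sketch (helper name assumed), which works both inside and outside a Databricks notebook context because it only depends on the Hadoop FileSystem API:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: list the entries of a directory via the Hadoop FileSystem API
// instead of dbutils.fs.ls. The Path resolves its own FileSystem, so
// the same code handles dbfs:/, abfss://, s3a://, or local paths.
def listPaths(dir: String): Seq[String] = {
  val path = new Path(dir)
  val fs = path.getFileSystem(new Configuration())
  fs.listStatus(path).map(_.getPath.toString).toSeq
}
```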
The SparkApplication events were abandoned. From what I recall, it didn't really add any value. Need to review and confirm.
Currently, the database initialization will create the instanceDetails table in the target and append the organizationId and other Overwatch metadata. This requires a pre-initialization step and customization for each workspace.
Change logic of Pipeline.scala createStaticDatasets. Add config to provide path to pre-created csv of compute cost detail by organizationID (workspace) and use that if it exists
This should be completed in conjunction with issue 49 if possible
The view definitions currently only accept a select statement. These views should also accept a where clause and perhaps a few other clauses.
In the section "With Databricks Billable Usage Delivery Logs" in EnvironmentSetup/azure.md, we point to the AWS docs about billing. On Azure, billing information isn't provided this way. We need to either remove this section or rework it according to the Azure documentation.
Use batch streams to monitor the source[s] for new event logs.
JsonUtils.objToJson
called as follows:
val map = Map[String, Any]("sfield1" -> "value", "sfield2" -> "",
"sfield3" -> null, "ifield" -> 0, "bfield" -> true)
val js = JsonUtils.objToJson(map)
produces the following output, although it should ignore null and empty values with default options:
{"ifield":0,"sfield3":null,"sfield2":"","sfield1":"value","bfield":true}
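The expected default behavior could be sketched as a pre-serialization filter (a hypothetical helper for illustration, not the project's JsonUtils implementation):

```scala
// Hypothetical sketch: drop null and empty-string values from the map
// before serializing, which is what the default options should do.
def dropNullAndEmpty(m: Map[String, Any]): Map[String, Any] =
  m.filter {
    case (_, null) => false
    case (_, "")   => false
    case _         => true
  }
```

Applied to the map above, only sfield1, ifield, and bfield would survive serialization.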
Prices aren't static, and neither should be the table that holds them. The table instanceDetails should be converted to an SCD to capture price through time.
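SCD Type 2 semantics could be sketched as follows (field names are illustrative, not the actual instanceDetails schema): when a price changes, the open record is closed at the effective timestamp and a new open record is appended, so cost can be joined point-in-time.

```scala
// Illustrative SCD2 model: activeUntil = None marks the current record.
case class InstancePrice(nodeType: String, pricePerHour: Double,
                         activeFrom: Long, activeUntil: Option[Long])

// Close the open record for the node type (if any) and append the new
// price as the open record, preserving the full price history.
def applyPriceChange(history: Seq[InstancePrice], nodeType: String,
                     newPrice: Double, effectiveTs: Long): Seq[InstancePrice] = {
  val (open, rest) = history.partition(p => p.nodeType == nodeType && p.activeUntil.isEmpty)
  val closed = open.map(_.copy(activeUntil = Some(effectiveTs)))
  rest ++ closed :+ InstancePrice(nodeType, newPrice, effectiveTs, None)
}
```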
As per Michael -- plus some findings during large table alterations -- managed tables are challenging and inefficient to manage and are not first-class in Databricks going forward. We will migrate away from managed tables.
Currently each pipeline is set to run in a vacuum such as Bronze(workspace)... with almost no effort this could be refactored so that a single Pipeline could be instantiated and run multiple layers. Currently Bronze / Silver / Gold are technically three different pipelines but they should all be able to run as one pipeline in sequence or individually.
Bronze(workspace).run()
OR
Pipeline(workspace)
.Bronze()
.Silver()
.Gold()
.run()
Delaying this implementation as it's more likely we will switch to Delta Live Tables
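The chained API sketched above could be implemented as a fluent builder along these lines (names and structure assumed; a sketch, not the planned implementation):

```scala
case class Workspace(name: String)

// Each layer call registers a stage; run() would execute them in order.
// Here run() just returns the registered layers as a placeholder.
class PipelineRunner(workspace: Workspace, layers: Vector[String] = Vector.empty) {
  def Bronze(): PipelineRunner = new PipelineRunner(workspace, layers :+ "bronze")
  def Silver(): PipelineRunner = new PipelineRunner(workspace, layers :+ "silver")
  def Gold(): PipelineRunner = new PipelineRunner(workspace, layers :+ "gold")
  def run(): Seq[String] = layers
}

object Pipeline {
  def apply(workspace: Workspace): PipelineRunner = new PipelineRunner(workspace)
}
```

A single layer could still run alone (Pipeline(workspace).Bronze().run()), preserving today's per-layer entry points.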
Forgot to apply the partitions and Z-orders to the gold tables -- re-add for performance.
Refactor exception structures to ensure that all failures are handled in the order desired and that all exceptions bubble up to the logs properly.
Additionally, implement all pipeline failures through exception handlers
Given a failure in a child module a parent may continue to succeed with null data. A dependency map needs to be implemented such that if a requisite module fails so do all others that depend on it.
Another way to approach it is to not progress timestamps but if a module is not fixed within "maxDays" period there can be a gap in data. Will review options.
As we get closer to the GCP release, we'll need to consider adding the support for GCP (first step would be just to throw an error during initialization, saying that GCP is not supported)
When the sbt-scoverage plugin is added to the project and coverageEnabled is set to true, the plugin code gets into the resulting jar, and this leads to the job failing with the following message:
ERROR Uncaught throwable from user code: java.lang.NoClassDefFoundError: scoverage/Invoker$
AND
can be combined to accomplish all schema scrubbing needs (I think).
Additionally, MapTypes should be added to supported validations.
The cluster events API call can take quite some time, and some process lineages can result in the cluster not scaling down all the way. As such, it's safer to force-downsize the cluster when the module starts if there are > some (n) batches.
Ensure that all bronze input tables are filtering for the current organization_id. I want to ensure that customers using Overwatch across workspaces do not wind up with duplicated data in the bronze layer.
Simple test, for example, given input of audit_log_raw containing data for multiple organizations, validate that the output of a module contains only data for the relevant org_id.
Note that when input DFs are retrieved using asIncrementalDF, globalFilters get applied, which include a filter for org_id. I noticed that several bronze modules do not utilize asIncrementalDF to retrieve their input DF. This is likely because bronze was created before the incrementalDF was as mature as it is now. Whether or not the bronze layer should always use incremental filters everywhere possible is certainly up for debate. Either way, it's critical that we filter out input data outside of the current workspace.
Additionally, I'm not certain when/where/how bronze can actually pull in data from multiple workspaces since it should be acquiring data FROM the workspace on which it's executing but I think this could still become an issue in some cases.
Example: bronze module 1005 does not utilize the asIncrementalDF function to retrieve auditLogDF; a date filter is being applied inside the function instead (this should change, but we're trying to minimize code changes).
globalFilters Code:
Overwatch works across orgs/workspaces but it's a bit clunky. This should be simplified and streamlined.
API batching for Module 1005, Bronze_ClusterEventLogs. The reason this is such an important component is that the ONLY place to get this information (which drives cost and cluster utilization / everything) is the API. Some customers have hundreds of thousands of cluster event states per day. This is too much data to pull to the driver all at once, so I batch it (not talking about API pagination, that's working): I built a custom batcher to query the number of cluster events between x and y dates and then batch them to 500K. Pull 500K events, write to table, repeat. This data expires after 60d (so I'm told), and once the data expires in the API, it's not accessible programmatically anywhere else again.
Historical loads and large customers will hit this upper bound and it's critical to ensure the process is working. I envision several tests. I believe unit tests can be used to simulate a small upper bound (set 20 as the upper bound to batch for testing) and ensure that the data is pulled, written and then continues to pull the next batch.
This test isn't as important as the higher-level tests you're working on now as errors here would likely present themselves later but in some cases they may not. I wanted to put this on the radar as a required bronze test set need.
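The unit test idea above -- setting a tiny upper bound and verifying pull/write/repeat -- could exercise batch-boundary logic like the following sketch (hypothetical helper, not the project's batcher):

```scala
// Split a total event count into [start, end) offset windows of at most
// batchSize, so each window can be pulled, written, then released.
def batchBoundaries(totalEvents: Long, batchSize: Long): Seq[(Long, Long)] =
  (0L until totalEvents by batchSize).map(start =>
    (start, math.min(start + batchSize, totalEvents)))
```

With batchSize set to 20 in a test, 45 events must yield exactly three windows covering every event once, which is the invariant the real 500K batcher needs to hold.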
Relevant code is below.
The clusterEvents API call requires the clusterID, thus a list of all clusterIDs with new events must be created to send to the API calls as part of the API query. The section below is the section responsible for ensuring all clusters with new events are accounted for, this is a VERY CRITICAL component of Overwatch.
Batching Code:
A job cluster cannot be derived from audit logs in a specific, but common, case. The Jobs team has built a fix and the PR is merged. This should be available in platform 3.42, but it will be quite a bit different in that it will populate the clusterID field in the audit logs. This could simplify several pipelines, and it should also fix this issue going forward.
More details in ES-74247
PR is here
Remove this very nasty code and reference the audit logs.
Integrate passthrough logs in a clear, legible way similar to the example in this notebook
The df.joinWithLag transform function is necessary in the pipeline to allow incremental data to be joined with data from previous runs while also subsetting these very large datasets. joinWithLag satisfies the need when the pipeline runs daily (or more frequently), but if it were to run less frequently, days would not be a valid lagging join method -- thus the pipeline should utilize the n previous runs as the lagging side, not days.
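Deriving the lag boundary from runs rather than days could be sketched like this (hypothetical helper; assumes run start timestamps are tracked, e.g. in the pipeline_report):

```scala
// Given the start timestamps of completed runs, the lagging side of the
// join is everything at or after the n-th most recent run's start --
// independent of how many days apart the runs happen to be.
def lagBoundaryByRuns(runStartTimes: Seq[Long], nRuns: Int): Long = {
  val newestFirst = runStartTimes.sorted(Ordering[Long].reverse)
  newestFirst.take(nRuns).lastOption.getOrElse(0L)
}
```

The boundary would then replace the fixed day offset when filtering the lagging DataFrame.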
The config should allow the user to specify a separate database for the consumer-facing gold-layered views. The DB is quite cluttered with all the etl tables in the same db as the user tables. Additionally, this will provide more security granularity.
The following functions need to have unit tests -- these are what ensure the proper flow of data throughout overwatch and must be and remain correct.
Identifies From --> Until timestamps for all modules
Creates TimeDetail for use everywhere
Sets the Pipeline Snap Time
Gets the StartTime for a provided Module
Gets the UntilTime for a provided Module
Adds the proper "one" value to a time/date type since between is inclusive on both sides
Applies Incremental Filters to a DataFrame
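The "one" increment in the list above exists because BETWEEN is inclusive on both sides: the next window must start one unit after the previous end. A minimal sketch (hypothetical helpers, assuming millisecond timestamps and date-grained partitions):

```scala
import java.time.LocalDate

// One millisecond for timestamp bounds, one day for date bounds --
// otherwise consecutive inclusive windows double-count the boundary.
def addOneMilli(ts: Long): Long = ts + 1L
def addOneDay(d: LocalDate): LocalDate = d.plusDays(1)
```

Unit tests here are cheap and guard against the classic off-by-one duplicate at window boundaries.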
We still need to use open-source API helpers, but this code got pretty messy in development. There are many challenges with APIs and escape characters using DBConnect, with differences between Mac/Linux/Windows. I believe this could be GREATLY simplified but didn't have the time to work it out.
Enable metadata to capture usage.
Each API call should pass a header with unique k/v to show a call from Overwatch.
It's the User-Agent HTTP header -- put in a product name and version, e.g. "databricks-terraform/0.3.2". Just make a request to list clusters or tokens to an API and it'll show up in the logs with the workspace id. prod.http_access_logs are pretty much real-time, so you can test it.
All of the labs must provide usage tracking, otherwise it’s not possible to know if labs is used or not.
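Attaching the header could be sketched as follows (the product string "databricks-overwatch/&lt;version&gt;" is an assumption modeled on the databricks-terraform example above; the helper name is hypothetical):

```scala
import java.net.URI
import java.net.http.HttpRequest

// Build an API request carrying a product-identifying User-Agent header
// so Overwatch calls are distinguishable in the access logs.
def overwatchRequest(url: String, version: String): HttpRequest =
  HttpRequest.newBuilder(URI.create(url))
    .header("User-Agent", s"databricks-overwatch/$version")
    .GET()
    .build()
```

Every API call site would route through a builder like this so the header is applied uniformly.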
Enable pools to be pulled through from audit_log_bronze and enhanced as a slow changing dim
Enable skewjoin hints for larger spark join events -- some clusters/sparkContextIDs are MUCH larger than others. Utilizing skew hint here will greatly improve join performance.
Let's ensure that all clusters with logging enabled are captured. The cluster_id must be found for all scenarios.
Code to validate is here.
We must market this idea and identify dollar values associated to get spark engineering to support a link between user and spark job. If this happens we can
A recent enhancement captures the fileCreateTS and metadata about the logfile. Enhance this metadata further and capture it in the fileName group detail struct.
Improve and simplify the pipeline by having the EH write directly to storage account. This will simplify and optimize the Overwatch Pipeline as well as reduce risks of backup on Azure EH.