
overwatch's People

Contributors

alexott, geeksheikh, gueniai, itaiw, jiteshsoni-db, mohanbaabu1996, nfx, piyushagarwalla-db, sawankulkarni-db, souravbaner-da, sriram251-code, uucico


overwatch's Issues

Bronze job fails with `AnalysisException: No such struct field existing_cluster_id`

Just for tracking....

I have an Azure setup with cluster logs enabled and events coming into Event Hubs, but the job fails with org.apache.spark.sql.AnalysisException: No such struct field existing_cluster_id in aclPermissionSet, autoscale, autotermination_minutes, clusterId, clusterName, clusterOwnerUserId, clusterState, clusterWorkers, cluster_id, cluster_log_conf, cluster_name, containerId, data_length, driver_node_type_id, enable_elastic_disk, handle, idInJob, idempotency_token, instanceId, jobClusterType, jobId, jobTaskType, jobTerminalState, jobTriggerType, job_id, name, new_cluster, new_settings, node_type_id, notebookId, orgId, overwrite, path, port, publicKey, recursive, resourceId, runId, shardName, spark_conf, spark_version, targetUserId, tokenId, user, userName, validate_cluster_name_uniqueness;

This happens in the following code snippet:

val jobsInteractiveClusterIDs = auditDFBase
  .filter('serviceName === "jobs")
  .select($"requestParams.existing_cluster_id".alias("cluster_id"))
  .filter('cluster_id.isNotNull)
  .distinct

I'm not sure how to correctly fix it. There are two options:

  1. Change .select($"requestParams.existing_cluster_id".alias("cluster_id")) to .select($"requestParams.cluster_id".alias("cluster_id"))
  2. Check the schema of the incoming data and perform this operation only if requestParams.existing_cluster_id is present (see the sketch below)
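
A possible shape for option 2 (an untested sketch; it assumes auditDFBase is the DataFrame from the snippet above and that requestParams is a struct column):

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Only reference requestParams.existing_cluster_id if the audit schema actually contains it;
// otherwise substitute an all-null string column so the downstream logic still works.
val requestParamsFields = auditDFBase.schema("requestParams").dataType match {
  case st: StructType => st.fieldNames.toSet
  case _ => Set.empty[String]
}
val existingClusterIdCol =
  if (requestParamsFields.contains("existing_cluster_id")) col("requestParams.existing_cluster_id")
  else lit(null).cast("string")

val jobsInteractiveClusterIDs = auditDFBase
  .filter(col("serviceName") === "jobs")
  .select(existingClusterIdCol.alias("cluster_id"))
  .filter(col("cluster_id").isNotNull)
  .distinct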

Refactor - Scopes, Modules, Pipeline

Refactor so that pipelines have scopes, which in turn have modules. This allows for a natural flow in which modules can declare their requirements, including sources, business rules, etc. Currently, due to time constraints, module dependencies and requirements must be maintained outside of the Pipeline architecture, in the global config.

Restructure EventLog Bronze Ingest

It's become clear that the event logs are a jumbled mess of several schemas by Event (which wasn't clear at the beginning). As such, as time permits, the original bronze ingest from eventLogs should be restructured to ingest by Event before any explicit schema is applied. This will greatly improve supportability and reduce maintenance requirements.

Better costs analysis for SQL Analytics

SQL Analytics comes with its own pricing and should be handled differently from interactive and automated workloads. Also, SQLA is currently reported as a non-automated workload, so we calculate its price as interactive.

Future Value - sparkEvents - Research & Enable

Additional events for future research. These events seem to carry high-value data; review later.

'Event === "org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveSQLMetricUpdates"
--> select sqlPlanMetrics, ExecutionID
'Event === "SparkListenerEnvironmentUpdate"
--> select JVMInformation
'Event.isin("SparkListenerBlockManagerAdded", "SparkListenerBlockManagerRemoved")
--> select BlockManagerID, MaximumMemory, MaximumOnheapMemory, Timestamp
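
A rough sketch of how these could be pulled for review, assuming a raw event-log DataFrame with the Event column and payload fields named as in the notes above (the column names may need adjusting to the actual event-log schema):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Exploratory selects only -- column names follow the notes above.
def sqlAdaptiveMetricUpdates(eventLogRaw: DataFrame): DataFrame = eventLogRaw
  .filter(col("Event") === "org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveSQLMetricUpdates")
  .select(col("sqlPlanMetrics"), col("ExecutionID"))

def environmentUpdates(eventLogRaw: DataFrame): DataFrame = eventLogRaw
  .filter(col("Event") === "SparkListenerEnvironmentUpdate")
  .select(col("JVMInformation"))

def blockManagerChanges(eventLogRaw: DataFrame): DataFrame = eventLogRaw
  .filter(col("Event").isin("SparkListenerBlockManagerAdded", "SparkListenerBlockManagerRemoved"))
  .select(col("BlockManagerID"), col("MaximumMemory"), col("MaximumOnheapMemory"), col("Timestamp"))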

Continuity Across Runs

Validate continuity across runs. Ensure that no data is missed due to not closing an event that begins in one pipeline run and ends in the next.

For example, clusterStateFact -- a cluster state begins in run 1 and ends in run 2 -- is the continuity of that state properly maintained across the runs?

The same validations are necessary in all ETLs that rely on joins/windows to match state a+1 to state a.

Add Range Join Optimization

At present both sides of this join are relatively small -- so adding a range join hint here may not have much effect, but it's worth exploring when/if there's time.

val jobRunIntermediateStates = newTerminatedJobRuns.alias("jr")
  .join(clusterPotentialIntermediateStates.alias("cpot"),
    $"jr.organization_id" === $"cpot.organization_id" &&
      $"jr.cluster_id" === $"cpot.cluster_id" &&
      $"cpot.unixTimeMS_state_start" > $"jr.job_runtime.startEpochMS" && // only states beginning after job start and ending before
      $"cpot.unixTimeMS_state_end" < $"jr.job_runtime.endEpochMS"
  )
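
For when it is explored: a minimal sketch of where a Databricks range join hint could sit in the snippet above. The 60000 ms bin size is purely illustrative, and whether the optimizer recognizes this containment predicate as a range join would need verifying.

val jobRunIntermediateStates = newTerminatedJobRuns.alias("jr")
  .join(clusterPotentialIntermediateStates.hint("range_join", 60000).alias("cpot"), // illustrative bin size
    $"jr.organization_id" === $"cpot.organization_id" &&
      $"jr.cluster_id" === $"cpot.cluster_id" &&
      $"cpot.unixTimeMS_state_start" > $"jr.job_runtime.startEpochMS" &&
      $"cpot.unixTimeMS_state_end" < $"jr.job_runtime.endEpochMS"
  )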

Module 1005 - API call to Cluster Events - Access Denied

Overwatch must have read access to all clusters or the data retrieved will be limited. I would like to persist all cluster IDs that hit an API error during the retrieval attempt to a table in the ETL database. This will enable us to understand which cluster events were missed due to API errors.

This is critical because once the data expires from the API it cannot be retrieved programmatically.

Decrease usage of the dbutils

There are a number of places where the use of dbutils is not necessary:

  • use of dbutils.fs.ls could be replaced with the Hadoop FileSystem API (I have changes already; see the sketch below)
  • the organization ID could be extracted from the Spark conf:
spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId")
  • the cloud provider could also be extracted from the Spark conf, without relying on the domain name (which breaks with custom domains or when using GCP):
spark.conf.get("spark.databricks.clusterUsageTags.cloudProvider")

etc.
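
A rough sketch of the first two replacements (the log directory path is illustrative; the conf keys are the ones listed above):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// dbutils.fs.ls replacement via the Hadoop FileSystem API
val logDir = new Path("dbfs:/cluster-logs") // illustrative path
val fs = logDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
val files = fs.listStatus(logDir).map(_.getPath.toString)

// organization ID and cloud provider from the Spark conf instead of dbutils / domain parsing
val orgId = spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId")
val cloudProvider = spark.conf.get("spark.databricks.clusterUsageTags.cloudProvider")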

InstanceDetails - Initialization Improvement

Currently, database initialization creates the instanceDetails table in the target and appends the organizationId and other Overwatch metadata. This requires a pre-initialization step and customization for each workspace.

Change the logic of Pipeline.scala createStaticDatasets: add a config option providing the path to a pre-created CSV of compute cost detail by organizationID (workspace), and use that if it exists (see the sketch below).

This should be completed in conjunction with issue 49 if possible.
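
A rough sketch of the proposed behavior (the config option and names are illustrative, not the real ones):

import org.apache.spark.sql.{DataFrame, SparkSession}

// If a path to a pre-created compute-cost CSV is configured, load it; otherwise fall back
// to whatever the initializer builds today. One row per node type per organization_id (workspace).
def instanceDetailsSource(spark: SparkSession, customCostPath: Option[String], defaultInstanceDetails: DataFrame): DataFrame =
  customCostPath match {
    case Some(path) => spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    case None => defaultInstanceDetails
  }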

azure setup documentation points to AWS docs about billing

In the With Databricks Billable Usage Delivery Logs section of EnvironmentSetup/azure.md, we point to the AWS docs about billing. On Azure, billing information isn't provided this way. We need to either remove this section or rework it according to the Azure documentation.

JsonUtils.objToJson doesn't obey includeNulls and includeEmpty options

JsonUtils.objToJson, called as follows,

val map = Map[String, Any]("sfield1" -> "value", "sfield2" -> "",
  "sfield3" -> null, "ifield" -> 0, "bfield" -> true)
val js = JsonUtils.objToJson(map)

produces the following output, although it should ignore null and empty values with the default options:

{"ifield":0,"sfield3":null,"sfield2":"","sfield1":"value","bfield":true}

instanceDetails - Price SCD

Prices aren't static, and neither should be the table that holds them. The instanceDetails table should be converted to an SCD to capture price through time (see the sketch below).
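
One possible SCD Type 2 shape, as a sketch (column names are illustrative, not the project's):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, current_timestamp, lit}

// Add a validity window plus a current flag so each node type can carry multiple price rows over time.
def toPriceScd(instanceDetails: DataFrame): DataFrame = instanceDetails
  .withColumn("activeFrom", current_timestamp())          // when this price row became effective
  .withColumn("activeUntil", lit(null).cast("timestamp")) // open until superseded by a new price
  .withColumn("isCurrent", col("activeUntil").isNull)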

Pipeline -- Enable Pipeline Chaining -- Slight refactor -- low hanging fruit

Currently each pipeline is set to run in a vacuum, such as Bronze(workspace)... With almost no effort this could be refactored so that a single Pipeline could be instantiated and run multiple layers. Currently Bronze / Silver / Gold are technically three different pipelines, but they should all be able to run as one pipeline in sequence or individually.

Bronze(workspace).run()
OR

Pipeline(workspace)
    .Bronze()
    .Silver()
    .Gold()
    .run()

Delaying this implementation as it's more likely we will switch to Delta Live Tables.

Exception Handling -- Refactor

Refactor the exception structures to ensure that all failures are handled in the desired order and that all exceptions bubble up to the logs properly.

Additionally, implement all pipeline failures through exception handlers.

Module Dependency Map Required

Given a failure in a child module, a parent may continue to succeed with null data. A dependency map needs to be implemented such that if a requisite module fails, so do all modules that depend on it.

Another way to approach it is to not progress the timestamps, but if a module is not fixed within the "maxDays" period there can be a gap in the data. Will review options (see the sketch below).
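
A minimal sketch of what such a map could look like (module IDs are illustrative):

// If a parent module fails, every module that lists it as a requirement is skipped
// rather than run against null data.
val moduleDependencies: Map[Int, Seq[Int]] = Map(
  2010 -> Seq(1004),       // e.g. a silver module depending on a bronze module
  3002 -> Seq(2010, 2011)  // e.g. a gold module depending on two silver modules
)

def shouldRun(moduleId: Int, failedModules: Set[Int]): Boolean =
  moduleDependencies.getOrElse(moduleId, Seq.empty).forall(dep => !failedModules.contains(dep))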

[EPIC] - Enable support for GCP

As we get closer to the GCP release, we'll need to consider adding support for GCP (a first step would be to simply throw an error during initialization saying that GCP is not supported).

job fails to execute when `coverageEnabled` is set to true

When the sbt-scoverage plugin is added to the project and coverageEnabled is set to true, the plugin code gets into the resulting jar, and this leads to the job failing with the following message:

ERROR Uncaught throwable from user code: java.lang.NoClassDefFoundError: scoverage/Invoker$

Force Scale down cluster during clusterEvents Module

The cluster events API call can take quite some time, and some process lineages can result in the cluster not scaling down all the way. As such, it's safer to force-downsize the cluster when the module starts if there are more than some (n) batches (see the sketch below).
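
A minimal sketch of forcing the downsize via the clusters/resize REST endpoint directly (not Overwatch's ApiCall helper; workspaceUrl, token, and targetWorkers are placeholders):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// POST /api/2.0/clusters/resize to shrink the cluster before the batches start.
def forceScaleDown(workspaceUrl: String, token: String, clusterId: String, targetWorkers: Int): Int = {
  val conn = new URL(s"$workspaceUrl/api/2.0/clusters/resize").openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("POST")
  conn.setRequestProperty("Authorization", s"Bearer $token")
  conn.setRequestProperty("Content-Type", "application/json")
  conn.setDoOutput(true)
  val payload = s"""{"cluster_id": "$clusterId", "num_workers": $targetWorkers}"""
  conn.getOutputStream.write(payload.getBytes(StandardCharsets.UTF_8))
  conn.getResponseCode // 200 on success
}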

[TESTS] - Global Organization ID Filter

Ensure that all bronze input tables are filtering for the current organization_id. I want to ensure that customers using Overwatch across workspaces do not wind up with duplicate data in the bronze layer.

A simple test, for example: given an input of audit_log_raw containing data for multiple organizations, validate that the output of a module contains only data for the relevant org_id (a sketch of this test follows the code below).

Note that when input DFs are retrieved using asIncrementalDF, the globalFilters get applied, which include the filter for org_id. I noticed that several bronze modules do not use asIncrementalDF to retrieve their input DF. This is likely because bronze was created before the incrementalDF was as mature as it is now. Whether or not the bronze layer should always use incremental filters everywhere possible is certainly up for debate. Either way, it's critical that we filter out input data from outside the current workspace.

Additionally, I'm not certain when/where/how bronze could actually pull in data from multiple workspaces, since it should be acquiring data FROM the workspace on which it's executing, but I think this could still become an issue in some cases.

An example is bronze module 1005 not using the asIncrementalDF function to retrieve auditLogDF; a date filter is applied inside the function instead (this should change, but we're trying to minimize code changes).

lazy private val appendClusterEventLogsProcess = ETLDefinition(
  prepClusterEventLogs(
    BronzeTargets.auditLogsTarget,
    clusterEventLogsModule.fromTime,
    clusterEventLogsModule.untilTime,
    config.apiEnv,
    config.organizationId
  ),
  append(BronzeTargets.clusterEventsTarget)
)

globalFilters Code:

def withIncrementalFilters(
  df: DataFrame,
  module: Module,
  filters: Seq[IncrementalFilter],
  globalFilters: Seq[Column] = Seq(),
  dataFrequency: Frequency
): DataFrame = {
  val parsedFilters = filters.map(filter => {
    val c = filter.cronColName
    val low = filter.low
    val high = filter.high
    val dt = df.schema.fields.filter(_.name == c).head.dataType
    dt match {
      case _: TimestampType =>
        col(c).between(PipelineFunctions.addOneTick(low, dataFrequency), high)
      case _: DateType => {
        col(c).between(
          PipelineFunctions.addOneTick(low.cast(DateType), dataFrequency, DateType),
          if (dataFrequency == Frequency.daily) date_sub(high.cast(DateType), 1) else high.cast(DateType)
        )
      }
      case _: LongType =>
        col(c).between(PipelineFunctions.addOneTick(low, dataFrequency, LongType), high.cast(LongType))
      case _: IntegerType =>
        col(c).between(PipelineFunctions.addOneTick(low, dataFrequency, IntegerType), high.cast(IntegerType))
      case _: DoubleType =>
        col(c).between(PipelineFunctions.addOneTick(low, dataFrequency, DoubleType), high.cast(DoubleType))
      case _ =>
        throw new IllegalArgumentException(s"IncreasingID Type: ${dt.typeName} is Not supported")
    }
  })
  val allFilters = parsedFilters ++ globalFilters
  applyFilters(df, allFilters, Some(module))
}
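
The shape of the proposed org_id test, as a sketch against a local SparkSession (synthetic data; in the real test the current org_id would come from the Overwatch config):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[1]").appName("orgIdFilterTest").getOrCreate()
import spark.implicits._

// Synthetic audit input spanning two organizations
val auditLogRaw = Seq(
  ("org-a", "clusters", 1L),
  ("org-b", "clusters", 2L)
).toDF("organization_id", "serviceName", "timestamp")

val currentOrgId = "org-a"
val filtered = auditLogRaw.filter($"organization_id" === currentOrgId)

// Only rows for the current workspace should survive the global filter
assert(filtered.count() == 1)
assert(filtered.select("organization_id").as[String].collect().forall(_ == currentOrgId))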

Module 1005 - API Custom Batching - Tests

API batching for Module 1005, Bronze_ClusterEventLogs. This is such an important component because the ONLY place to get this information (which drives cost, cluster utilization, everything) is the API. Some customers have hundreds of thousands of cluster event states per day. That's too much data to pull to the driver all at once, so I batch it (not talking about API pagination, that's working): I built a custom batcher that queries the number of cluster events between x and y dates and then batches them to 500K. Pull 500K events, write to the table, repeat. This data expires after 60 days (so I'm told), and once the data expires in the API it's not accessible programmatically anywhere else.

Historical loads and large customers will hit this upper bound, and it's critical to ensure the process is working. I envision several tests. I believe unit tests can be used to simulate a small upper bound (set 20 as the upper bound to batch for testing) and ensure that the data is pulled, written, and that the next batch then continues to be pulled (see the sketch below).

This test isn't as important as the higher-level tests you're working on now, as errors here would likely present themselves later, but in some cases they may not. I wanted to put this on the radar as a required bronze test set.
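
The shape such a unit test could take, sketched independently of the real buildClusterEventBatches helper (synthetic event counts, a tiny upper bound of 20):

// Batch synthetic (clusterId, eventCount) pairs under the bound and assert that no batch
// exceeds it and that every cluster lands in exactly one batch.
val eventCounts = Seq("c1" -> 7, "c2" -> 12, "c3" -> 9, "c4" -> 3, "c5" -> 15)
val upperBound = 20

val batches = eventCounts.foldLeft(Vector(Vector.empty[(String, Int)])) { case (acc, kv) =>
  val current = acc.last
  if (current.map(_._2).sum + kv._2 <= upperBound) acc.init :+ (current :+ kv)
  else acc :+ Vector(kv)
}

assert(batches.forall(_.map(_._2).sum <= upperBound))
assert(batches.flatten.map(_._1).toSet == eventCounts.map(_._1).toSet)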

Relevant code is below.
The clusterEvents API call requires the clusterID, so a list of all cluster IDs with new events must be created and sent to the API calls as part of the API query. The section below is responsible for ensuring all clusters with new events are accounted for; this is a VERY CRITICAL component of Overwatch.

protected def prepClusterEventLogs(auditLogsTable: PipelineTable,
                                   start_time: TimeTypes, end_time: TimeTypes,
                                   apiEnv: ApiEnv,
                                   organizationId: String): DataFrame = {
  val extraQuery = Map(
    "start_time" -> start_time.asUnixTimeMilli, // 1588935326000L, //
    "end_time" -> end_time.asUnixTimeMilli, // 1589021726000L //
    "limit" -> 500
  )
  // TODO -- upgrade to incrementalDF
  val auditDFBase = auditLogsTable.asDF
    .filter(
      'date.between(start_time.asColumnTS.cast("date"), end_time.asColumnTS.cast("date")) &&
        'timestamp.between(lit(start_time.asUnixTimeMilli), lit(end_time.asUnixTimeMilli))
    )
  val existingClusterIds = auditDFBase
    .filter('serviceName === "clusters" && 'actionName.like("%Result"))
    .select($"requestParams.clusterId".alias("cluster_id"))
    .filter('cluster_id.isNotNull)
    .distinct
  val newClusterIds = auditDFBase
    .filter('serviceName === "clusters" && 'actionName === "create")
    .select(get_json_object($"response.result", "$.cluster_id").alias("cluster_id"))
    .filter('cluster_id.isNotNull)
    .distinct
  val clusterIDs = existingClusterIds
    .unionByName(newClusterIds)
    .distinct
    .as[String]
    .collect()

Batching Code:

val batchSize = 500000D
// TODO -- remove hard-coded path
val tmpClusterEventsPath = "/tmp/overwatch/bronze/clusterEventsBatches"
val clusterEventsBuffer = buildClusterEventBatches(apiEnv, batchSize, start_time.asUnixTimeMilli, end_time.asUnixTimeMilli, clusterIDs)
logger.log(Level.INFO, s"NUMBER OF BATCHES: ${clusterEventsBuffer.length} \n" +
  s"ESTIMATED EVENTS: ${clusterEventsBuffer.length * batchSize.toInt}")
var batchCounter = 0
clusterEventsBuffer.foreach(clusterIdsBatch => {
  batchCounter += 1
  logger.log(Level.INFO, s"BEGINNING BATCH ${batchCounter} of ${clusterEventsBuffer.length}")
  val clusterEvents = apiByID("clusters/events", apiEnv, "post",
    clusterIdsBatch, "cluster_id", Some(extraQuery))
  try {
    val tdf = SchemaTools.scrubSchema(
      spark.read.json(Seq(clusterEvents: _*).toDS()).select(explode('events).alias("events"))
        .select(col("events.*"))
    )
    val changeInventory = Map[String, Column](
      "details.attributes.custom_tags" -> SchemaTools.structToMap(tdf, "details.attributes.custom_tags"),
      "details.attributes.spark_conf" -> SchemaTools.structToMap(tdf, "details.attributes.spark_conf"),
      "details.attributes.spark_env_vars" -> SchemaTools.structToMap(tdf, "details.attributes.spark_env_vars"),
      "details.previous_attributes.custom_tags" -> SchemaTools.structToMap(tdf, "details.previous_attributes.custom_tags"),
      "details.previous_attributes.spark_conf" -> SchemaTools.structToMap(tdf, "details.previous_attributes.spark_conf"),
      "details.previous_attributes.spark_env_vars" -> SchemaTools.structToMap(tdf, "details.previous_attributes.spark_env_vars")
    )
    SchemaTools.scrubSchema(tdf.select(SchemaTools.modifyStruct(tdf.schema, changeInventory): _*))
      .withColumn("organization_id", lit(organizationId))
      .write.mode("append").format("delta")
      .option("mergeSchema", "true")
      .save(tmpClusterEventsPath)
  } catch {
    case e: Throwable => {
      logger.log(Level.WARN, s"While attempting to grab events data for clusters below, an error occurred" +
        s"\n${clusterIdsBatch.foreach(println)}", e)
    }
  }
})

Audit Logs Missing clusterID - ES-74247

The job cluster cannot be derived from the audit logs in a specific, but common, case. The Jobs team has built a fix and the PR is merged. This should be available in platform 3.42, but it will be quite a bit different in that it will populate the clusterID field in the audit logs. This could simplify several pipelines, and it should also fix this going forward.

More details in ES-74247
PR is here

Remove this very nasty code and reference the audit logs.

/**
 * ES-74247 requires hit to API for notebook jobs running on existing clusters. PR submitted and will be
 * rolled into 3.42 but until then, this is the hack around.
 */
val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(24))
val jrWFilledClusterIDs = jobRunsBase
  .filter('jobClusterType === "existing")
  .filter('jobTaskType === "notebook" && 'clusterId.isNull)
  .filter(datediff(
    from_unixtime(lit(System.currentTimeMillis().toDouble / 1000)).cast("timestamp").cast("date"),
    $"jobRunTime.startTS".cast("date")) <= 67
  )
  .select('runId)
  .distinct()
  .as[Long]
  .collect()
  .par
jrWFilledClusterIDs.tasksupport = taskSupport
logger.log(Level.INFO, s"RUN_IDs staged for api grabber: ${jrWFilledClusterIDs.mkString(", ")}")
val runIdStrings = jrWFilledClusterIDs
  .flatMap(runId => {
    try {
      val q = Map(
        "run_id" -> runId
      )
      val runIDString = ApiCall("jobs/runs/get", apiEnv, Some(q), paginate = false)
        .executeGet().asStrings
      logger.log(Level.INFO, s"API CALL SUCCESS: RUN_ID == $runId")
      runIDString
    } catch {
      case e: Throwable =>
        logger.log(Level.ERROR, s"API CALL FAILED: RUN_ID == $runId --> ${e.getMessage}")
        Array[String]()
    }
  })
  .toArray
val notebookJobsWithClusterIDs = spark.read.json(Seq(runIdStrings: _*).toDS())
  .select('run_id.alias("runId"), $"cluster_spec.existing_cluster_id".alias("cluster_id_apiLookup"))

Improve df.joinWithLag func - Add option Lag Runs

The df.joinWithLag transform function is necessary in the pipeline to allow incremental data to be joined with data from previous runs while also subsetting these very large datasets. joinWithLag satisfies the need when the pipeline runs daily (or more frequently), but if it runs less frequently, days are not a valid lagging join method -- the pipeline should instead use the previous n runs, not days, as the lagging side.

enable gold view to be separated from etl, layered tables

The config should allow the user to specify a separate database for the consumer-facing gold-layer views. The DB is quite cluttered with all the ETL tables in the same database as the user-facing tables. Additionally, this will provide more security granularity (see the sketch below).
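
A rough sketch of the split (database and table names are illustrative, not the real config values):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
val etlDB = "overwatch_etl"  // hypothetical config value for the ETL database
val consumerDB = "overwatch" // hypothetical config value for the consumer-facing database

// ETL tables stay in etlDB; only the gold views land in the consumer database.
spark.sql(s"CREATE DATABASE IF NOT EXISTS $consumerDB")
spark.sql(s"CREATE VIEW IF NOT EXISTS $consumerDB.cluster AS SELECT * FROM $etlDB.cluster_gold")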

Initial Timestamp Unit Tests Required

The following functions need unit tests -- these are what ensure the proper flow of data throughout Overwatch and must be, and remain, correct.

Identifies From --> Until timestamps for all modules

val lastRunDetail = spark.table(s"${config.databaseName}.pipeline_report")

Creates TimeDetail for use everywhere

def createTimeDetail(tsMilli: Long): TimeTypes = {

Sets the Pipeline Snap Time

Gets the StartTime for a provided Module

def fromTime(moduleID: Int): TimeTypes = {

Gets the UntilTime for a provided Module

def untilTime(moduleID: Int): TimeTypes = {

Adds the proper "one" value to a time/date type since between is inclusive on both sides

def addOneTick(ts: Column, dt: DataType = TimestampType): Column = {

Applies Incremental Filters to a DataFrame

def withIncrementalFilters(df: DataFrame, filters: Seq[IncrementalFilter], globalFilters: Option[Seq[Column]] = None): DataFrame = {
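
As an example of the shape these tests could take, a sketch for addOneTick on a LongType column. The expected value assumes "one tick" for a long means incrementing by exactly 1, per the inclusive-between rationale above; that assumption is exactly what the test would verify.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

val spark = SparkSession.builder.master("local[1]").appName("addOneTickTest").getOrCreate()
import spark.implicits._

val df = Seq(1588935326000L).toDF("ts")
// addOneTick comes from the pipeline helpers; the +1 expectation is an assumption to confirm.
val result = df.select(addOneTick(col("ts"), LongType).alias("ts")).as[Long].head()
assert(result == 1588935326001L)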

Improve ApiCall.scala

We still need to use open source API helpers, but this code got pretty messy during development. There are many challenges with APIs and escape characters when using DBConnect, with differences between mac/linux/windows. I believe this could be GREATLY simplified, but I didn't have the time to work it out.

Enable Usage Tracking

Enable metadata to capture usage.

Each API call should pass a header with a unique key/value to show the call came from Overwatch.

It’s user-agent http header - put a product name and version - eg “databricks-terraform/0.3.2”. Just make a request to list clusters or tokens to an api and it’ll show up in logs with workspace id. Prod.http_access_logs are pretty much real-time, so you can test it.
All of the labs must provide usage tracking, otherwise it’s not possible to know if labs is used or not.
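
A minimal sketch of tagging a request with the product User-Agent (URL, token, and version string are placeholders):

import java.net.{HttpURLConnection, URL}

// List clusters with a product User-Agent so the call shows up in the workspace HTTP access logs.
def listClustersWithUserAgent(workspaceUrl: String, token: String): Int = {
  val conn = new URL(s"$workspaceUrl/api/2.0/clusters/list").openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("GET")
  conn.setRequestProperty("Authorization", s"Bearer $token")
  conn.setRequestProperty("User-Agent", "databricks-overwatch/0.4.0") // product-name/version, per the guidance above
  conn.getResponseCode
}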

Enable Pools SCD

Enable pools to be pulled through from audit_log_bronze and enhanced as a slowly changing dimension.

SkewJoin Spark Events

Enable skew join hints for the larger spark join events -- some clusters/sparkContextIDs are MUCH larger than others. Utilizing the skew hint here will greatly improve join performance (see the sketch below).
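
A minimal sketch of where the hint would sit (DataFrame and column names are illustrative; whether the explicit skew hint or AQE is the right tool depends on the runtime in use):

import org.apache.spark.sql.DataFrame

// Apply the Databricks skew join hint on the heavier spark-events side, keyed by the skewed identifier.
def joinWithSkewHint(sparkEvents: DataFrame, other: DataFrame): DataFrame =
  sparkEvents
    .hint("skew", "SparkContextID")
    .join(other, Seq("SparkContextID"))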

Tests - Cluster Logging Capture

Let's ensure that all clusters with logging enabled are captured. The cluster_id must be found for all scenarios.

Code to validate is here.

val incrementalClusterBase = df
  .selectExpr("*", "requestParams.*").drop("requestParams")
  .filter('serviceName === "clusters" && 'actionName.isin("create", "edit"))
  .withColumn("cluster_id", when('actionName === "create", get_json_object($"response.result", "$.cluster_id"))
    .when('actionName =!= "create" && 'cluster_id.isNull, 'clusterId)
    .otherwise('cluster_id).alias("cluster_id")
  )
  .select('organization_id, 'timestamp, 'cluster_id, 'cluster_name, 'cluster_log_conf)

// Get incremental snapshot of clusters during current run
// This captures clusters that have not been edited/restarted (still not terminated) since the last run with
// log confs as they will not be in the audit logs
val latestSnapW = Window.partitionBy('organization_id).orderBy('Pipeline_SnapTS.desc)
val currentlyDefinedClustersWithLogging = clusterSnapshot.asDF
  .withColumn("snapRnk", rank.over(latestSnapW))
  .filter('snapRnk === 1)
  .withColumn("cluster_log_conf", to_json('cluster_log_conf))
  .filter('cluster_id.isNotNull && 'cluster_log_conf.isNotNull)
  .select('cluster_id, 'cluster_log_conf)
