jasperhg90 / dagster-infra
🚧 [Under construction] Terraform deployment of Dagster OS on GCP using Kubernetes.
License: MIT License
Please approve or deny the deployment
Workflow is pending manual review.
URL: https://github.com/JasperHG90/dagster-infra/actions/runs/7379196919
Required approvers: [JasperHG90]
Respond "approved", "approve", "lgtm", "yes" to continue workflow or "denied", "deny", "no" to cancel.
Over time, the dagster-daemon pod consumes more and more memory, without a clear reason. The memory footprint drops again when we redeploy the Dagster services with Terraform from the 'dagster-infra' repository.
The memory footprint of the dagster-daemon pod keeps increasing over time.
This is visible in the GCP monitoring tab of the dagster-daemon Kubernetes workload.
Eventually, memory will saturate, which will probably kill the daemon or leave a zombie pod. We have not confirmed this yet.
The impact depends on what happens when the daemon pod runs out of memory: it might crash the entire server, or the pod might simply restart.
No, this is a Dagster issue.
Yes. See this GitHub issue. A fix was proposed there, but it did not resolve the problem for at least one other user. We could still try it.
We want to limit concurrency on Dagster jobs. When we trigger many jobs at once (e.g. a job with many partitions), we do not want to open too many connections to the PostgreSQL database or start too many pods at the same time on the GKE cluster.
We found that the easiest way to limit concurrency is to impose a cluster-wide limit at the job (run) level (all options are described here). These limits can also be scoped to a type of run (e.g. backfills versus other runs; see the example below).
Important
You can limit concurrency at different levels. This ADR only describes limiting job (run) concurrency. It does not describe how to limit asset concurrency (e.g. when many assets execute within a single run); that should be configured in the job specification of a DAG. More information can be found here.
Limiting concurrency using this approach is achieved by setting the following property when deploying the Dagster Helm chart:
```hcl
set {
  name  = "dagsterDaemon.runCoordinator.config.queuedRunCoordinator.maxConcurrentRuns"
  value = 5
}
```
This uses the default `QueuedRunCoordinator`. See the Dagster documentation for more information.
Dagster allows you to set concurrency limits based on tags. For now, this is only configured for backfills, which are limited to three concurrent runs.
```hcl
set {
  name  = "dagsterDaemon.runCoordinator.config.queuedRunCoordinator.tagConcurrencyLimits[0].key"
  value = "dagster/backfill"
}
set {
  name  = "dagsterDaemon.runCoordinator.config.queuedRunCoordinator.tagConcurrencyLimits[0].limit"
  value = 3
}
```
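To make the interplay between the global limit and the backfill tag limit concrete, here is a small Python model of how a queue with `maxConcurrentRuns = 5` and a `dagster/backfill` limit of 3 could decide which runs to start. This is a simplified sketch, not Dagster's `QueuedRunCoordinator` code: the `Run` class and `select_runs_to_launch` function are hypothetical, and we assume that a key-only tag limit counts every active run carrying that tag, and that a run blocked by a tag limit does not hold up runs behind it in the queue.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Run:
    # Hypothetical stand-in for a queued Dagster run.
    run_id: str
    tags: Dict[str, str] = field(default_factory=dict)


def select_runs_to_launch(
    queued: List[Run],
    in_progress: List[Run],
    max_concurrent_runs: int = 5,
    tag_limits: Optional[Dict[str, int]] = None,
) -> List[Run]:
    """Return the queued runs that may start now, in queue order.

    Simplified model of the limits configured above, not Dagster's implementation.
    A tag limit without a value counts every active run that carries the tag key.
    """
    tag_limits = tag_limits or {}
    active = list(in_progress)
    launched: List[Run] = []
    for run in queued:
        if len(active) >= max_concurrent_runs:
            break  # global limit reached; everything else stays queued
        blocked = any(
            key in run.tags and sum(1 for r in active if key in r.tags) >= limit
            for key, limit in tag_limits.items()
        )
        if blocked:
            continue  # this run waits, but runs further back may still start
        launched.append(run)
        active.append(run)
    return launched


backfill = [Run(f"b{i}", {"dagster/backfill": "bf-1"}) for i in range(1, 7)]
regular = [Run("r1"), Run("r2")]
launched = select_runs_to_launch(
    backfill + regular,
    in_progress=[],
    max_concurrent_runs=5,
    tag_limits={"dagster/backfill": 3},
)
print([r.run_id for r in launched])  # ['b1', 'b2', 'b3', 'r1', 'r2']
```

In this model, with six queued backfill runs and two regular runs, only three backfill runs start and the two remaining slots go to the regular runs.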
This affects everyone deploying DAGs to the server, because their runs are subject to these limits. If teams need to run more jobs concurrently, we will need to revisit this decision.
Easier:
See the ADR in the dagster-dags repository.
No response
We want to use secrets in code locations so that we can interface with systems outside of GKE. For example, we want to send Slack messages when a job fails or succeeds.
We need a structured approach to add the secrets to the GKE namespace.
Some secrets are generated in the 'dagster-infra' Terraform deployment. These secrets are added as 'Opaque' Kubernetes secrets where appropriate. See example here:
```hcl
locals {
  environment_toupper = upper(var.environment)
  credentials = {
    "KUBERNETES_${local.environment_toupper}_SA_JSON_KEY_B64"     = google_service_account_key.kubernetes.private_key
    "CONTAINERADMIN_${local.environment_toupper}_SA_JSON_KEY_B64" = google_service_account_key.containeradmin.private_key
    "DATAREADWRITER_${local.environment_toupper}_HMAC_ACCESS_ID"  = google_storage_hmac_key.key.access_id
    "DATAREADWRITER_${local.environment_toupper}_HMAC_SECRET"     = google_storage_hmac_key.key.secret
  }
  credential_ids = {
    for key, value in local.credentials : key => google_secret_manager_secret.credentials[key].id
  }
  gcp_secret_manager_kubernetes_secret_names = toset([
    for secret in data.google_secret_manager_secrets.secrets.secrets : secret["secret_id"]
  ])
  kubernetes_secrets = merge(
    {
      "GCS_SECRET_ACCESS_KEY" = google_storage_hmac_key.key.secret
      "GCS_ACCESS_KEY_ID"     = google_storage_hmac_key.key.access_id
    },
    {
      for secret in data.google_secret_manager_secrets.secrets.secrets : secret["secret_id"] => data.google_secret_manager_secret_version.secrets[secret["secret_id"]].secret_data
    }
  )
}

resource "google_secret_manager_secret" "credentials" {
  for_each  = local.credentials
  secret_id = each.key
  labels = {
    terraform = true
  }
  replication {
    auto {}
  }
}

resource "google_secret_manager_secret_version" "credentials" {
  for_each    = local.credentials
  secret      = local.credential_ids[each.key]
  secret_data = each.value
}

resource "kubernetes_secret" "credentials" {
  for_each = local.kubernetes_secrets
  metadata {
    name      = replace(lower(each.key), "_", "-")
    namespace = "dagster-${var.environment}"
  }
  data = {
    "${each.key}" = each.value
  }
  type = "Opaque"
}
```
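For readers less familiar with the Kubernetes Terraform provider, the sketch below shows in Python roughly what one `kubernetes_secret.credentials` instance amounts to as a plain Secret manifest. The helper names are hypothetical and the example value is made up. The provider accepts plain-text `data` values and base64-encodes them itself, so the encoding step here only illustrates what ends up in the cluster.

```python
import base64
from typing import Dict


def kubernetes_secret_name(key: str) -> str:
    # Mirrors replace(lower(each.key), "_", "-") in the Terraform above.
    return key.lower().replace("_", "-")


def render_opaque_secret(key: str, value: str, environment: str) -> Dict:
    """Build a Secret manifest roughly equivalent to one kubernetes_secret resource.

    Hypothetical helper for illustration only; Terraform creates the real object.
    """
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "type": "Opaque",
        "metadata": {
            "name": kubernetes_secret_name(key),
            "namespace": f"dagster-{environment}",
        },
        # The data key keeps the original (upper-case, underscored) name;
        # manifests store values base64-encoded.
        "data": {key: base64.b64encode(value.encode("utf-8")).decode("ascii")},
    }


manifest = render_opaque_secret("GCS_ACCESS_KEY_ID", "example-id", "prd")
print(manifest["metadata"])  # {'name': 'gcs-access-key-id', 'namespace': 'dagster-prd'}
```

Note that the object name and the data key differ: only the object name is lower-cased and hyphenated.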
To add a secret and make it available to code locations, we have created a GitHub Actions pipeline.
```yaml
# dagster-infra/.github/workflows/add_secrets.yml
name: 'Add secrets'

on:
  workflow_dispatch:

permissions:
  contents: read

jobs:
  add_secrets:
    name: 'Add secrets'
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      - uses: actions/setup-python@v5
        with:
          python-version: '3.10'
          cache: 'pip'
      - id: 'auth'
        uses: 'google-github-actions/auth@v2'
        with:
          credentials_json: '${{ secrets.GOOGLE_CREDENTIALS }}'
      - name: 'Set up Cloud SDK'
        uses: 'google-github-actions/setup-gcloud@v2'
      - name: 'Set project'
        run: gcloud config set project jasperg-dagster
      - name: 'Install dependencies'
        run: pip install -r requirements_scripts.txt
      - name: 'Add secrets'
        run: python scripts/add_secret.py from-prefix DAGSTER_SECRET
        env:
          DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN: '${{ secrets.DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN }}'
```
The workflow runs the following Python script, `scripts/add_secret.py`:
```python
"""
Add a secret from an environment variable to the GCP Secret Manager.

Required: gcloud installed and authenticated.
"""
import os
import json
import logging
import tempfile
import pathlib as plb
from dataclasses import dataclass
from typing import List, Optional

import typer
import sh

logger = logging.getLogger("add_secret")
handler = logging.StreamHandler()
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

app = typer.Typer()


@dataclass
class ProjectConfig:
    region: Optional[str]
    account: Optional[str]
    project: Optional[str]

    def __post_init__(self):
        # A project is required; region and account are informational only.
        if self.project is None:
            raise ValueError(
                "Project name not found. Set it using 'gcloud config set project <PROJECT_NAME>'"
            )

    def log(self):
        logger.debug(f"Project: {self.project}")
        logger.debug(f"Account: {self.account}")
        logger.debug(f"Region: {self.region}")


def _get_project_config() -> ProjectConfig:
    """Read the active gcloud configuration."""
    project_config = json.loads(sh.gcloud.config.list("--format", "json"))
    compute = project_config.get("compute")
    core = project_config.get("core")
    return ProjectConfig(
        region=None if compute is None else compute.get("region"),
        account=None if core is None else core.get("account"),
        project=None if core is None else core.get("project"),
    )


def _get_secret(secret_name: str) -> List[dict]:
    """Return the Secret Manager secrets whose ID is exactly `secret_name`."""
    logger.debug(f"Filtering for secret with name='{secret_name}'.")
    secrets = json.loads(sh.gcloud.secrets.list("--format", "json", "--filter", secret_name))
    # --filter is not an exact match on the secret ID, so compare against the last
    # component of the full resource name ('projects/<project>/secrets/<secret_id>').
    secrets = [s for s in secrets if s["name"].split("/")[-1] == secret_name]
    logger.debug(f"Found {len(secrets)} secrets.")
    return secrets


def _create_secret(secret_name: str):
    sh.gcloud.secrets.create(secret_name, "--replication-policy", "automatic")
    logger.debug(f"Created secret with name='{secret_name}'.")


def _add_secret_version(secret_name: str, secret_value: str):
    # Write the value to a temporary file so it is not passed on the command line.
    with tempfile.TemporaryDirectory() as tmpdir:
        _secret_file = plb.Path(tmpdir) / "secret.txt"
        with _secret_file.open("w") as f:
            f.write(secret_value)
        sh.gcloud.secrets.versions.add(secret_name, "--data-file", str(_secret_file))
    logger.debug(f"Added secret version to secret with name='{secret_name}'.")


def _from_env_var(env_var_name: str):
    """Add a secret from an environment variable to the GCP Secret Manager."""
    logger.debug(f"Looking for environment variable '{env_var_name}'.")
    _value = os.getenv(env_var_name)
    if _value is None:
        raise KeyError(f"Environment variable {env_var_name} not found.")
    _project_config = _get_project_config()
    _project_config.log()
    # Create the secret on first use, then always add a new version with the current value.
    if len(_get_secret(env_var_name)) == 0:
        _create_secret(env_var_name)
    _add_secret_version(env_var_name, _value)


@app.command()
def from_env_var(
    env_var_name: str = typer.Argument(
        ..., help="Name of the environment variable to add to the secrets manager."
    )
):
    _from_env_var(env_var_name)


@app.command()
def from_prefix(
    prefix: str = typer.Argument(
        ..., help="Prefix of the environment variables to add to the secrets manager."
    )
):
    for k in os.environ.keys():
        if k.startswith(prefix):
            _from_env_var(k)


if __name__ == "__main__":
    app()
```
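Because `from_prefix` uses a plain `str.startswith` check, any environment variable that merely begins with the prefix string is uploaded. The hypothetical dry-run helper below mirrors that loop without calling `gcloud`, which can be handy for checking which variables a workflow run would pick up. The variable names in the example are made up.

```python
import os
from typing import Dict, Mapping, Optional


def select_prefixed_env_vars(
    prefix: str, environ: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    """Return the environment variables that `from-prefix` would upload (dry run).

    Hypothetical helper mirroring the loop in from_prefix(); it does not call gcloud.
    """
    environ = os.environ if environ is None else environ
    return {k: v for k, v in environ.items() if k.startswith(prefix)}


env = {
    "DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN": "example-token",
    "DAGSTER_SECRETS_FILE": "/tmp/example",  # hypothetical; also matches the plain prefix
    "GOOGLE_CREDENTIALS": "{}",
}
print(sorted(select_prefixed_env_vars("DAGSTER_SECRET", env)))
# ['DAGSTER_SECRETS_FILE', 'DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN']
```

Passing `DAGSTER_SECRET_` (with a trailing underscore) would exclude names such as `DAGSTER_SECRETS_FILE`; whether that stricter prefix fits the convention described below is a judgment call.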
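Users can add secrets to the GKE deployment by adding them to the 'dagster-infra' repository's GitHub settings under 'Secrets and variables'. Each new secret must also be mapped to an environment variable in the `env` block of the 'Add secrets' workflow step, because GitHub Actions only exposes repository secrets to a job when they are referenced explicitly. The workflow is then triggered manually (`workflow_dispatch`).
Secrets must be prefixed with "DAGSTER_SECRET" and are added to the GKE 'dagster-prd' namespace as 'Opaque' secrets. The name of the Kubernetes secret is lower-cased, and underscores are replaced with hyphens.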
For example:
DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN
--> dagster-secret-slack-bot-oauth-token
Note that when the secret is used in a Python Dagster DAG definition, it should still be referred to by the name defined in the GitHub secret settings (e.g. DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN), because that original name is kept as the data key of the Kubernetes secret.
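The snippet below is a minimal sketch of reading such a secret from code-location code using only the standard library. It assumes the Kubernetes secret is exposed to the code-location pod as environment variables (for example through the `envSecrets` list of a deployment in the Dagster Helm chart); the helper `get_dagster_secret` is hypothetical. In a Dagster resource you would more typically pass `dagster.EnvVar("DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN")` as config, which relies on the same environment variable name.

```python
import os


def get_dagster_secret(name: str) -> str:
    """Read a secret by its GitHub secret name inside a code-location pod.

    Assumes the Kubernetes secret is mounted as environment variables, so the
    variable is named after the secret's data key (e.g.
    DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN), not after the Kubernetes object name
    (e.g. dagster-secret-slack-bot-oauth-token).
    """
    value = os.environ.get(name)
    if value is None:
        k8s_name = name.lower().replace("_", "-")
        raise KeyError(
            f"Environment variable {name!r} is not set; is the Kubernetes secret "
            f"{k8s_name!r} attached to this code location?"
        )
    return value
```

Usage, inside a DAG definition: `token = get_dagster_secret("DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN")`.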
This impacts end-users who want to use their own secrets in DAGs.
- Look into more advanced orchestration limits, e.g. using [tags defined on jobs](https://docs.dagster.io/guides/limiting-concurrency-in-data-pipelines#limiting-concurrency-using-tags)
Originally posted by @JasperHG90 in #18 (comment)