Contributors: jasperhg90

dagster-infra's Issues

[Known issue]: 💦 Dagster daemon memory leak

🐞 Issue

Over time, the Dagster daemon pod consumes more and more memory, and we have not identified the cause. The memory footprint drops again when we redeploy the Dagster services with Terraform from the "dagster-infra" repository.

💭 What happens?

Over time, we see an increase in the memory footprint of the Dagster daemon pod:

[Image: Pasted image 20240218132427]

This is visible in the GCP monitor tab of the dagster-daemon Kubernetes workload.

Eventually, the memory will saturate, which will probably either kill the daemon or leave a zombie pod. We don't know this for sure yet.

💥 What's the impact?

The impact depends on what happens when the memory of the daemon pod is saturated. It might crash the entire server, or it might reboot.

🔧 Can we solve it?

No, this is a Dagster issue.

🤷 How will we deal with it?

  • Set up a resource alert so that we know when the pod is reaching its memory limit.
  • Increase the resource request for the pod. We reduced this in #20, but the current resource requests are probably too low.
  • See what happens when memory saturates. If the daemon restarts, this could be fine as long as no work is lost.
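
As a rough illustration of the alerting idea above, the sketch below checks whether a pod's memory usage is close to its limit. It is not part of the dagster-infra deployment: the function names, the 90% threshold, and the example quantities are hypothetical, and only a subset of Kubernetes quantity suffixes is handled. The real alert would be configured in the monitoring system rather than in Python.

```python
# Hypothetical helper: decide whether memory usage is close to a pod's limit.
# Only a subset of Kubernetes quantity suffixes is supported (assumption).
_BINARY_SUFFIXES = {"Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4}
_DECIMAL_SUFFIXES = {"k": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4}


def parse_memory(quantity: str) -> int:
    """Convert a quantity such as '512Mi' or '1G' to a number of bytes."""
    quantity = quantity.strip()
    # Binary suffixes are checked first so that 'Mi' is not mistaken for 'M'.
    for suffixes in (_BINARY_SUFFIXES, _DECIMAL_SUFFIXES):
        for suffix, factor in suffixes.items():
            if quantity.endswith(suffix):
                return int(float(quantity[: -len(suffix)]) * factor)
    return int(quantity)  # plain number of bytes


def should_alert(usage: str, limit: str, threshold: float = 0.9) -> bool:
    """Return True when usage is at or above `threshold` times the limit."""
    return parse_memory(usage) / parse_memory(limit) >= threshold


if __name__ == "__main__":
    print(should_alert("950Mi", "1Gi"))
    print(should_alert("512Mi", "1Gi"))
```

A fixed threshold is the simplest choice here; since the leak grows slowly, an alert on the growth rate could be an alternative worth considering.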

🔍 How can we monitor it?

  • The alert should allow us to monitor this.
  • We are researching how to monitor an open-source (OSS) Dagster deployment on GKE so that we can track whether this is a problem.

👷‍♀️ Is someone working on it?

Yes. See this GitHub issue. A fix was proposed there, but it did not seem to solve the problem for another user. We could try it.

📝 Checklist

  • I've linked all relevant issues, PRs, and RFCs where required
  • I've checked if someone else is working on it

[ADR]: ⚙️ Limiting concurrency for Dagster jobs

✍️ Context

We want to limit concurrency on Dagster jobs because, when we trigger lots of jobs (e.g. a job with many partitions), we don't want to open too many connections to the PostgreSQL database or start too many pods at the same time on the GKE cluster.

🤝 Decision

We found that the easiest way to limit concurrency is to impose a cluster-wide limit at the job level (all options are described here). The limit can also be scoped to the type of job (e.g. backfills versus other runs; see the example below).

Important

You can limit concurrency at different levels. This ADR only describes limiting job concurrency. It does not describe how you can limit asset concurrency (e.g. if you have lots of assets running in a single run). This should be configured in the job specification of a DAG. More information can be found here.

Limiting concurrency using this approach is achieved by setting the following property when deploying the Dagster Helm chart:

  set {
    name  = "dagsterDaemon.runCoordinator.config.queuedRunCoordinator.maxConcurrentRuns"
    value = 5
  }

This uses the default QueuedRunCoordinator. See the Dagster documentation for more information.
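
To make the property path above more concrete, the following sketch expands a dotted Helm `set` path into the nested structure it corresponds to in the chart values. The helper `expand_set_path` is hypothetical and only approximates Helm's parsing (for example, escaped dots are not handled).

```python
import re

# One path segment: a key, optionally followed by a list index such as "[0]".
_SEGMENT = re.compile(r"^([^\[\]]+)(?:\[(\d+)\])?$")


def expand_set_path(values: dict, path: str, value) -> dict:
    """Write `value` into `values` at a dotted path (hypothetical helper)."""
    node = values
    segments = path.split(".")
    for position, segment in enumerate(segments):
        match = _SEGMENT.match(segment)
        if match is None:
            raise ValueError(f"Unsupported path segment: {segment!r}")
        key, index = match.group(1), match.group(2)
        is_last = position == len(segments) - 1
        if index is None:
            if is_last:
                node[key] = value
            else:
                node = node.setdefault(key, {})
        else:
            items = node.setdefault(key, [])
            while len(items) <= int(index):
                items.append({})  # pad the list up to the requested index
            if is_last:
                items[int(index)] = value
            else:
                node = items[int(index)]
    return values


if __name__ == "__main__":
    values = {}
    expand_set_path(
        values,
        "dagsterDaemon.runCoordinator.config.queuedRunCoordinator.maxConcurrentRuns",
        5,
    )
    print(values)
```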

🏷️ Tag-based concurrency limits

Dagster allows you to set concurrency limits based on tags. For now, this is only configured for backfills, which are limited to three concurrent runs.

  set {
    name  = "dagsterDaemon.runCoordinator.config.queuedRunCoordinator.tagConcurrencyLimits[0].key"
    value = "dagster/backfill"
  }

  set {
    name  = "dagsterDaemon.runCoordinator.config.queuedRunCoordinator.tagConcurrencyLimits[0].limit"
    value = 3
  }
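
The combined effect of the global limit and the tag limit can be sketched with a simplified model. This is not Dagster's implementation: the `Run` class and `select_runs_to_launch` function are hypothetical, and priorities and other coordinator features are ignored. It assumes that a tag limit given with only a key applies to every run carrying that tag key.

```python
from dataclasses import dataclass, field


@dataclass
class Run:
    run_id: str
    tags: dict = field(default_factory=dict)


def select_runs_to_launch(queued, in_progress, max_concurrent_runs=5, tag_limits=None):
    """Pick queued runs to launch without exceeding the global or per-tag limits."""
    tag_limits = tag_limits or {}
    # Count runs already in progress for every limited tag key.
    tag_counts = {
        key: sum(1 for run in in_progress if key in run.tags) for key in tag_limits
    }
    free_slots = max_concurrent_runs - len(in_progress)
    launched = []
    for run in queued:
        if free_slots <= 0:
            break  # global limit reached
        limited_keys = [key for key in tag_limits if key in run.tags]
        if any(tag_counts[key] >= tag_limits[key] for key in limited_keys):
            continue  # tag limit reached: this run stays in the queue
        for key in limited_keys:
            tag_counts[key] += 1
        launched.append(run)
        free_slots -= 1
    return launched


if __name__ == "__main__":
    queue = [Run(f"b{i}", {"dagster/backfill": "example"}) for i in range(4)]
    queue += [Run(f"p{i}") for i in range(3)]
    chosen = select_runs_to_launch(queue, [], 5, {"dagster/backfill": 3})
    print([run.run_id for run in chosen])
```

In this model, a fourth backfill run waits in the queue while regular runs can still use the remaining slots up to the global limit of five.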

💥 Impact

This impacts everyone deploying DAGs on the deployed server, since we are limiting their concurrency. If they need to run more jobs at the same time we will need to re-evaluate this decision.

☝️ Consequences

Easier:

  • Don't have to worry about concurrency at the DAG level since this has been taken care of by the system.
  • Can use tags to change default concurrency limit of five jobs if required.

📝 Checklist (after ADR has been accepted)

  • I've set the appropriate status label
  • I've linked relevant issues, PRs, and RFCs
  • If the ADR supersedes a previous ADR, I've updated the previous ADR to reflect this

[ADR]: 🤫 Adding secrets to dagster GKE deployment

✍️ Context

We want to use secrets in code locations so that we can interface with systems outside of GKE. For example, we want to send Slack messages when a job fails or succeeds.

We need a structured approach to add the secrets to the GKE namespace.

🤝 Decision

Secrets added by the Terraform deployment

Some secrets are generated in the 'dagster-infra' Terraform deployment. These secrets are added as 'Opaque' Kubernetes secrets where appropriate. See example here:

locals {
  environment_toupper = upper(var.environment)
  credentials = {
    "KUBERNETES_${local.environment_toupper}_SA_JSON_KEY_B64"     = google_service_account_key.kubernetes.private_key
    "CONTAINERADMIN_${local.environment_toupper}_SA_JSON_KEY_B64" = google_service_account_key.containeradmin.private_key
    "DATAREADWRITER_${local.environment_toupper}_HMAC_ACCESS_ID"  = google_storage_hmac_key.key.access_id
    "DATAREADWRITER_${local.environment_toupper}_HMAC_SECRET"     = google_storage_hmac_key.key.secret
  }
  credential_ids = {
    for key, value in local.credentials : key => google_secret_manager_secret.credentials[key].id
  }
  gcp_secret_manager_kubernetes_secret_names = toset([
    for secret in data.google_secret_manager_secrets.secrets.secrets : secret["secret_id"]
  ])
  kubernetes_secrets = merge(
    {
      "GCS_SECRET_ACCESS_KEY" = google_storage_hmac_key.key.secret
      "GCS_ACCESS_KEY_ID"     = google_storage_hmac_key.key.access_id
    },
    {
      for secret in data.google_secret_manager_secrets.secrets.secrets : secret["secret_id"] => data.google_secret_manager_secret_version.secrets[secret["secret_id"]].secret_data
    }
  )
}

resource "google_secret_manager_secret" "credentials" {
  for_each = local.credentials

  secret_id = each.key

  labels = {
    terraform = true
  }

  replication {
    auto {}
  }
}

resource "google_secret_manager_secret_version" "credentials" {
  for_each = local.credentials

  secret      = local.credential_ids[each.key]
  secret_data = each.value
}

resource "kubernetes_secret" "credentials" {
  for_each = local.kubernetes_secrets

  metadata {
    name      = replace(lower(each.key), "_", "-")
    namespace = "dagster-${var.environment}"
  }

  data = {
    "${each.key}" = each.value
  }

  type = "Opaque"
}

Adding secrets not generated by Terraform

To add a secret and make it available to code locations, we have created a GitHub Actions pipeline.

# dagster-infra/.github/workflows/add_secrets.yml
name: 'Add secrets'

on:
  workflow_dispatch:

permissions:
  contents: read

jobs:
  add_secrets:
    name: 'Add secrets'
    runs-on: ubuntu-latest

    steps:
    - name: Checkout
      uses: actions/checkout@v3
    - uses: actions/setup-python@v5
      with:
        python-version: '3.10'
        cache: 'pip'
    - id: 'auth'
      uses: 'google-github-actions/auth@v2'
      with:
        credentials_json: '${{ secrets.GOOGLE_CREDENTIALS }}'
    - name: 'Set up Cloud SDK'
      uses: 'google-github-actions/setup-gcloud@v2'
    - name: 'Set project'
      run: gcloud config set project jasperg-dagster
    - name: 'Install dependencies'
      run: pip install -r requirements_scripts.txt
    - name: 'Add secrets'
      run: python scripts/add_secret.py from-prefix DAGSTER_SECRET
      env:
        DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN: '${{ secrets.DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN }}'

The GitHub Actions workflow uses the following Python script:

"""
Add a secret from an environment variable to the gcp secrets manager.

Required: gcloud installed and authenticated.
"""

import os
import json
import logging
import tempfile
import pathlib as plb
from dataclasses import dataclass

import typer
import sh

logger = logging.getLogger("add_secret")
handler = logging.StreamHandler()
# Named 'formatter' to avoid shadowing the built-in format().
formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

app = typer.Typer()


@dataclass
class ProjectConfig:
    region: str
    account: str
    project: str

    def __post_init__(self):
        if self.project is None:
            raise ValueError(
                "Project name not found. Set it using 'gcloud config set project <PROJECT_NAME>'"
            )

    def log(self):
        logger.debug(f"Project: {self.project}")
        logger.debug(f"Account: {self.account}")
        logger.debug(f"Region: {self.region}")


def _get_project_config():
    project_config = json.loads(sh.gcloud.config.list("--format", "json"))
    compute = project_config.get("compute")
    core = project_config.get("core")
    return ProjectConfig(
        region=None if compute is None else compute.get("region"),
        account=None if core is None else core.get("account"),
        project=None if core is None else core.get("project"),
    )


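def _get_secret(secret_name: str):
    logger.debug(f"Filtering for secret with name='{secret_name}'.")
    candidates = json.loads(sh.gcloud.secrets.list("--format", "json", "--filter", secret_name))
    # The gcloud filter may match more than the exact name, so keep only secrets
    # whose ID (the last segment of the resource name) equals secret_name.
    secrets = [s for s in candidates if s["name"].rsplit("/", 1)[-1] == secret_name]
    logger.debug(f"Found {len(secrets)} secrets.")
    return secrets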


def _create_secret(secret_name: str):
    sh.gcloud.secrets.create(secret_name, "--replication-policy", "automatic")
    logger.debug(f"Created secret with name='{secret_name}'.")


def _add_secret_version(secret_name: str, secret_value: str):
    with tempfile.TemporaryDirectory() as tmpdir:
        _secret_file = plb.Path(tmpdir) / "secret.txt"
        with _secret_file.open("w") as f:
            f.write(secret_value)
        sh.gcloud.secrets.versions.add(secret_name, "--data-file", str(_secret_file))
    logger.debug(f"Added secret version to secret with name='{secret_name}'.")


def _from_env_var(env_var_name: str):
    """Add a secret from an environment variable to the gcp secrets manager."""
    logger.debug(f"Looking for environment variable '{env_var_name}'.")
    _value = os.getenv(env_var_name)
    if _value is None:
        raise KeyError(f"Environment variable {env_var_name} not found.")
    _project_config = _get_project_config()
    _project_config.log()
    if len(_get_secret(env_var_name)) == 0:
        _create_secret(env_var_name)
    _add_secret_version(env_var_name, _value)


@app.command()
def from_env_var(
    env_var_name: str = typer.Argument(
        ..., help="Name of the environment variable to add to the secrets manager."
    )
):
    _from_env_var(env_var_name)


@app.command()
def from_prefix(
    prefix: str = typer.Argument(
        ..., help="Prefix of the environment variables to add to the secrets manager."
    )
):
    for k in os.environ.keys():
        if k.startswith(prefix):
            _from_env_var(k)


if __name__ == "__main__":
    app()

A user can add secrets to the GKE deployment by adding them to the 'dagster-infra' Secrets and variables GitHub settings.

Secrets must be prefixed with "DAGSTER_SECRET", and will be added to the GKE 'dagster-prd' namespace as an 'Opaque' secret. The name of the Kubernetes secret is the lower-cased secret name with underscores replaced by hyphens.

For example:
DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN --> dagster-secret-slack-bot-oauth-token
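
The naming rule can be reproduced in a few lines of Python. This is only a sketch that mirrors the `replace(lower(each.key), "_", "-")` expression from the Terraform configuration; the function name `kubernetes_secret_name` is hypothetical.

```python
def kubernetes_secret_name(secret_id: str) -> str:
    """Lower-case the secret ID and replace underscores with hyphens."""
    return secret_id.lower().replace("_", "-")


if __name__ == "__main__":
    print(kubernetes_secret_name("DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN"))
```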

Note that, when the secret is used in a Python Dagster DAG definition, it should still be referred to by the name defined in the GitHub secret settings (see below for an example).
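
A minimal sketch of such a lookup is shown here. It assumes that the Kubernetes secret is exposed to the code location as an environment variable under its original GitHub name; how the secret is mounted is not described in this ADR, and the helper `get_slack_token` is hypothetical.

```python
import os

# Name as defined in the GitHub secret settings, not the Kubernetes secret name.
SLACK_TOKEN_ENV_VAR = "DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN"


def get_slack_token(environ=None) -> str:
    """Read the Slack token from the environment (assumed to be injected there)."""
    environ = os.environ if environ is None else environ
    token = environ.get(SLACK_TOKEN_ENV_VAR)
    if token is None:
        raise RuntimeError(f"Environment variable {SLACK_TOKEN_ENV_VAR} is not set.")
    return token


if __name__ == "__main__":
    # Example with an explicit mapping instead of the real environment.
    print(get_slack_token({SLACK_TOKEN_ENV_VAR: "example-token"}))
```

Passing the environment as an optional argument keeps the helper easy to test without touching the real process environment.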

💥 Impact

This impacts end-users who want to use their own secrets in DAGs.

☝️ Consequences

  • Users should not add secrets manually, but should add them using the GitHub Actions pipeline.
  • Users don't have to add secrets manually. They can simply refer to the secrets they want and use the secrets in their own pipelines.

📝 Checklist (after ADR has been accepted)

  • I've set the appropriate status label
  • I've linked relevant issues, PRs, and RFCs
  • If the ADR supersedes a previous ADR, I've updated the previous ADR to reflect this
