Git Product home page Git Product logo

celery-daemon's Introduction

Celery daemon

[TOC]

celery 在官方文档中已经说明了如何将一个worker进行daemon,这里结合项目本身的一些情况来说明如何让cmdb中运行的各种worker在后台运行起来.

现在的cmdb已经有了多个worker了,通过监听不同的队列来分别处理不同的后台任务,比如说有一个发送邮件的后台任务,一个定时收取邮件的后台任务,一个自动添加svn权限的后台任务。

之前的做法是通过screen来将这个前台运行的worker来当做后台任务处理,但是这样子有一个问题,就是每次更新了一些相应的代码以后,需要进入到不同的screen中,重新启动对应的worker,对于以后越来越多的worker,显然很不方便。

好在官方有一个daemon的方法,这里说明下如何处理

Init-script: celeryd

celeryconfig.py 和tasks.py

celeryconfig.py配置好celery tasks.py配置好任务

cat /data/www/cmdb/celeryconfig.py

# -*- encoding: utf-8 -*-

CELERY_TIMEZONE = 'Asia/Shanghai'


BROKER_URL = 'amqp://localhost//'
CELERY_IMPORTS = ('tasks', )

CELERY_RESULT_BACKEND = 'amqp'
CELERY_RESULT_PERSISTENT = True
CELERY_TASK_RESULT_EXPIRES = None

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
    'default': {
        'binding_key': 'task.#',
    },
    'send_mail': {
        'binding_key': 'send_mail.#',
    },
    'recieve_mail': {
        'binding_key': 'recieve_mail.#',
    },
    'workflow_add_server_permission': {
        'binding_key': 'workflow_add_server_permission.#',
    },
    'set_user_host': {
        'binding_key': 'set_user_host.#',
    },
    'add_svn_workflow': {
        'binding_key': 'add_svn_workflow.#',
    },
}
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
CELERY_ROUTES = {
    'tasks.send_mail': {
        'queue': 'send_mail',
        'routing_key': 'send_mail.a_mail'
    },
    'tasks.recieve_mail': {
        'queue': 'recieve_mail',
        'routing_key': 'recieve_mail.a_mail'
    },
    'tasks.workflow_add_server_permission': {
        'queue': 'workflow_add_server_permission',
        'routing_key': 'workflow_add_server_permission.a_server_permission'
    },
    'tasks.set_user_host': {
        'queue': 'set_user_host',
        'routing_key': 'set_user_host.a_set_user_host'
    },
    'tasks.add_svn_workflow': {
        'queue': 'add_svn_workflow',
        'routing_key': 'add_svn_workflow.a_set_user_host'
    },
}


from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'recieve_mail': {
        'task': 'tasks.recieve_mail',
        'schedule': timedelta(seconds=60),
    },
    'set_user_host': {
        'task': 'tasks.set_user_host',
        'schedule': crontab(minute=0, hour='*/4'),
    }
}

cat /data/www/cmdb/tasks.py

# -*- encoding: utf-8 -*-

from celery import Celery

app = Celery()

import celeryconfig

app.config_from_object(celeryconfig)

# from myworkflows.mails import SendEmail
# from myworkflows.mails import RecieveMail


import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

import imapy
from imapy.query_builder import Q

import re

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cmdb.settings")

import django
django.setup()

from myworkflows.models import WorkflowStateEvent
from myworkflows.utils import do_transition, get_state_user, make_email_notify


from django.contrib.auth.models import User

from cmdb.logs import *
from myworkflows.utils import make_email
from myworkflows.utils import SVN_EXCUTORS
from myworkflows.utils import format_svn
from myworkflows.models import *
from myworkflows.config import *
from assets.models import *
from users.models import UserProfileHost

import requests

ml = MailLog()

from django.db import connections

import json
from datetime import datetime

import traceback


def close_old_connections():
    for conn in connections.all():
        conn.close_if_unusable_or_obsolete()


@app.task(ignore_result=True)
def set_user_host():

    shost_log = SHostLog()

    try:
        # 重新连接数据库
        close_old_connections()

        temporary_permission = UserProfileHost.objects.filter(temporary=1, is_valid=1)

        for t in temporary_permission:
            now = datetime.now()
            if now > t.end_time:
                t.is_valid = 0
                t.save()
        shost_log.logger.info('set host status ok')
    except Exception as e:
        shost_log.logger.error('%s' % (str(e)))


@app.task(ignore_result=True)
def add_svn_workflow(wse_id):
    """自动添加svn权限"""
    wse = WorkflowStateEvent.objects.get(id=wse_id)
    content_object = wse.content_object
    svn_log = SVNLog()

    if isinstance(content_object, SVNWorkflow):
        try:
            # 重新连接数据库
            close_old_connections()

            payload = format_svn(content_object)
            url = 'https://192.168.40.11/api/addprivilege/'
            headers = {
                'Accept': 'application/json',
                'Authorization': 'Token d11205fc792d2d2def44ca55e5252dcbdcea6961',
                'Connection': 'keep-alive',
            }
            r = requests.post(url, headers=headers, data=payload, verify=False)
            svn_log.logger.info('%s: %d: %s' % (content_object.title, r.status_code, r.text))
            msg = r.json()
            result = msg['result']
            data = msg['data']
            if result:
                content_object.status = 0
                svn_log.logger.info('ok')
            else:
                content_object.status = 2
                svn_log.logger.info('%s' % (data))
            content_object.save()
        except requests.exceptions.ConnectionError:
            svn_log.logger.info('time_out')
            content_object.status = 2
            content_object.save()
        except Exception as e:
            svn_log.logger.info('%s' % (str(e)))
            content_object.status = 2
            content_object.save()

celery worker启动脚本

1 创建/etc/init.d/celeryd,内容在celery repo

2 修改权限 chmod 755 /etc/init.d/celeryd chown root:root /etc/init.d/celeryd

配置

具体的配置需要根据你的项目的实际情况,可以参考doc

/etc/default目录下,创建celeryd文件

这里列出我的关于执行svn的配置

CELERY_BIN="/data/code/cy_devops/bin/celery"

CELERY_APP="tasks:app"

CELERYD_CHDIR="/data/www/cmdb/"

CELERYD_OPTS="--time-limit=300 --concurrency=8 --hostname=1 -Q add_svn_workflow"

CELERYD_LOG_FILE="/var/log/celery/svn.log"

CELERYD_PID_FILE="/var/run/celery/svn.pid"

CELERYD_USER="root"
CELERYD_GROUP="root"

CELERY_CREATE_DIRS=1

修改启动脚本名称和指定配置文件

/etc/init.d/celeryd的说明中可以看到下面这句话:

# Short-Description: celery task worker daemon
### END INIT INFO
#
#
# To implement separate init-scripts, copy this script and give it a different
# name.  That is, if your new application named "little-worker" needs an init,
# you should use:
#
#   cp /etc/init.d/celeryd /etc/init.d/little-worker
#
# You can then configure this by manipulating /etc/default/little-worker.

很明显,如果你有多个不同的worker, 你可以分别将/etc/init.d/celeryd和/etc/default/celeryd命名为相同的名称,这样子celeryd会读取同样名称的配置文件,这样子就能很好分别对每个不同的worker进行配置。

以我这里的svn为例,那么/etc/init.d/celeryd重命名为/etc/init.d/svn-worker /etc/default/celeryd重命名为/etc/default/svn-worker

启动celery worker daemon /etc/init.d/svn-worker start

启动 celery beat daemon

celery beat按照我的理解,是一个任务调度器,通过这个celery beat来读取定时配置文件里面的内容来发送task

然后启动另外的worker来获取这些定时任务去执行。

拷贝官方的启动脚本到目录/etc/init.d/

/etc/default/目录下,编辑如下配置文件

vim /etc/default/celerybeat

CELERY_BIN="/data/code/cy_devops/bin/celery"

CELERY_APP="tasks:app"

CELERYD_CHDIR="/data/www/cmdb/"


CELERYD_LOG_FILE="/var/log/celery/celerybeat.log"

CELERYD_PID_FILE="/var/run/celery/celerybeat.pid"

CELERYD_USER="root"
CELERYD_GROUP="root"

CELERY_CREATE_DIRS=1

现在可以启动celery beat来发送定时任务了 /etc/init.d/celerybeat

启动执行定时任务的worker

cmdb还有一个worker,用来定时修改服务器权限的记录

配置启动脚本 cp /etc/init.d/svn-worker /etc/init.d/shost-beat-worker

修改配置文件

这里的配置文件和普通的worker类似,但是需要注意的是不要加上-B参数,因为你已经单独启动了celery beat

cat /etc/default/shost-beat-worker

CELERY_BIN="/data/code/cy_devops/bin/celery"

CELERY_APP="tasks:app"

CELERYD_CHDIR="/data/www/cmdb/"

CELERYD_OPTS="--time-limit=300 --concurrency=8 --hostname=shost-beat-worker -Q set_user_host -l debug"

CELERYD_LOG_FILE="/var/log/celery/shost-beat-worker.log"

CELERYD_PID_FILE="/var/run/celery/shost-beat-worker.pid"

CELERYD_USER="root"
CELERYD_GROUP="root"

CELERY_CREATE_DIRS=1

启动worker /etc/init.d/shost-beat-worker start

celery-daemon's People

Contributors

page1990 avatar

Stargazers

 avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.