Git Product home page Git Product logo

ydf0509 / funboost Goto Github PK

View Code? Open in Web Editor NEW
663.0 15.0 131.0 71.32 MB

pip install funboost,python全功能分布式函数调度框架,funboost的功能是全面性重量级,用户能想得到的功能99%全都有;funboost的使用方式是轻量级,只有@boost一行代码需要写。支持python所有类型的并发模式和一切知名消息队列中间件,支持如 celery dramatiq等框架整体作为funboost中间件,python函数加速器,框架包罗万象,用户能想到的控制功能全都有。一统编程思维,兼容50% python业务场景,适用范围广。只需要一行代码即可分布式执行python一切函数,99%用过funboost的pythoner 感受是 简易 方便 强劲 强大,相见恨晚 。

License: Apache License 2.0

Python 97.11% CSS 1.26% JavaScript 0.08% HTML 1.54% Batchfile 0.02%
celery kafka nsq rabbitmq redis rocketmq sqlite tcp zeromq asyncio

funboost's Introduction

1.python万能分布式函数调度框架简funboost简介

pip install funboost ,python全功能分布式函数调度框架。  demo用法例子见文档1.3

只需要一行@boost代码即可分布式执行python一切任意函数,99%用过funboost的pythoner 感受是 方便 快速 强大。
支持python所有类型的并发模式,消息队列方面支持全球一切知名消息队列中间件和模拟的实现消息队列,
同时funboost支持celery整个框架作为核心来发布和消费消息,使用funboost的极简api方式来自动化配置和利用celery调度,
也支持huey dramatiq rq等任务队列框架作为funboost的broker。 

python函数加速器,框架包罗万象,一统编程思维,兼容50% python编程业务场景,适用范围广。
python万能分布式函数调度框架,支持5种并发模式,30+种消息队列中间件(或任务队列框架),
30种任务控制功能。给任意python函数赋能。
用途概念就是常规经典的 生产者 + 消息队列中间件 + 消费者 编程**。

框架只需要学习@boost这一个装饰器的入参就可以,所有用法几乎和1.3例子一摸一样,非常简化简单。
框架对代码没有入侵,可以加到任意已有项目而对项目python文件目录结构0要求,
不像 celery django scrapy 这样的框架,要从一开始就开始规划好项目目录结构,如果不想用框架了,
或者想改变使用其他框架框架,那么已经所写的代码组织形式就几乎成了废物,需要大改特改. 
但是funboost完全不会这样,加上或去掉@boost装饰器,对你的项目影响为0,用户照常使用,
所以用户可以对任意项目,任意时候,引入使用funboost或者去掉使用funboost,代码组织形式不需要发生变化.

框架评价

95%的用户在初步使用后,都表示赞不绝口、相见恨晚、两眼放光。认为funboost框架使用简单但功能强大和丰富。

第一次听说此框架的,100%用户会质疑框架性能不行,功能少,表示学习celery的教程文档上的所有功能
已近非常费劲头疼折磨,各种报错不知道如何解决。
一般python用户听到一个新的python框架,脚都软了,学习类似django celery scrapy意味着要学习
几个月文档才只能掌握框架的一小部分用法了,
尤其是celery这种框架,代码在pycharm完全不能自动补全提示,用户连@task装饰的函数有什么方法,
每个方法有什么入参都不知道,配置文件能写哪些配置都不知道,如果不按照博客上的celery目录结构写celery任务
,连celery命令行运行起来都要反复猜测尝试。
正因为如此用户从心理已近十分惧怕学习一种叫python框架的东西了,用户顶多愿意学习一个python包或者模块,
学习一个框架会非常害怕觉得难度高且耗时,所以非常反感尝试新的框架。
用过的99%都说funboost比celery简单方便太多,看都不看的人第一秒就是开始质疑重复造轮子.

funboost只有一个@boost装饰器,@boost入参能自动补全,更重要的是被@boost装饰的函数,
有哪些方法,每个方法入参是什么都能自动补全。funboost的中间件配置文件自当生成在用户当前项目根目录,
用户无需到处找文档,能配置什么东西,框架功能怎么配置。
因为funboost非常注重代码补全提示,所以不存在上面celery的那些复杂高难度缺点。

funboost的旧框架名字是function_scheduling_distributed_framework , 关系和兼容性见1.0.3介绍。 旧框架地址: https://github.com/ydf0509/distributed_framework

1.0 github地址和文档地址

查看分布式函数调度框架文档 https://funboost.readthedocs.io/zh-cn/latest/index.html

funboost依赖的nb_log日志文档 https://nb-log-doc.readthedocs.io/zh_CN/latest/articles/c9.html#id2

文档很长,但归根结底只需要学习 1.3 里面的这1个例子就行,主要是修改下@boost的各种参数,
通过不同的入参,实践测试下各种控制功能。
其中文档第四章列举了所有用法举例,

对比 celery 有20种改善,其中之一是无依赖文件夹层级和文件夹名字 文件名字。
首先能把  https://github.com/ydf0509/celery_demo
这个例子的已经写好的不规则目录层级和文件名字的函数用celery框架玩起来,才能说是了解celery,
否则如果项目文件夹层级和文件名字不规矩,后期再用celery,会把celery新手折磨得想死,
很多新手需要小心翼翼模仿网上说的项目目录结构,以为不按照那么规划目录和命名就玩不起来,本身说明celery很坑。

查看分布式函数调度框架github项目

1.0.3 funboost 框架 和 function_scheduling_distributed_framework 框架 关系说明

funboost 是 function_scheduling_distributed_framework的包名更新版本

旧框架地址:function_scheduling_distributed_framework框架地址链接

1.1 安装方式

pip install funboost --upgrade

或pip install funboost[all]  一次性安装所有小众三方中间件

1.2 框架功能介绍

分布式函数调度框架,支持5种并发模式,20+种消息中间件,30种任务控制功能。
用途概念就是常规经典的 生产者 + 消息队列中间件 + 消费者 编程**。

有了这个框架,用户再也无需亲自手写操作进程、线程、协程的并发的代码了。

有了这个框架,用户再也无需亲自手写操作redis rabbitmq socket kafka celery nameko了。

funboost示图: pkFFghj.png

也就是这种非常普通的流程图,一样的意思

pkFFcNQ.png

1.2.1 框架支持5种并发模式

threading (使用的是可变线程池,可以智能自动缩小和扩大线程数量,也可以运行async def的函数)
gevent
eventlet
asyncio (框架可以直接支持async 定义的携程函数作为任务,celery不支持)
single_thread

除此之外,直接内置方便的支持 多进程multiprocess 叠加 以上5种并发,多进程和以上细粒度并发是叠加的而不是平行的二选一关系。


总结一下那就是此框架可以适应所有编程场景,无论是io密集 cpu密集 还是cpu io双密集场景,框架能非常简便的应对任意场景。
框架的 单线程  多线程  gevent eventlet  asyncio 多进程  这些并发模型,囊括了目前python界所有的并发方式。
框架能自动实现 单线程  ,多线程, gevent , eventlet ,asyncio ,多进程 并发 ,
多进程 + 单线程 ,多进程 + 多线程,多进程 + gevent,  多进程 + eventlet  ,多进程 + asyncio 的组合并发
这么多并发方式能够满足任意编程场景。

以下两种方式,都是10线程加python内存queue方式运行f函数,有了此框架,用户无需代码手写手动操作线程 协程 asyncio 进程 来并发。

1)手动开启线程池方式

import time
from concurrent.futures import ThreadPoolExecutor


def f(x):
    time.sleep(3)
    print(x)


pool = ThreadPoolExecutor(10)

if __name__ == '__main__':
    for i in range(100):
        pool.submit(f, i)

2)funboost 使用内存队列,设置10线程并发

import time
from funboost import BoosterParams, BrokerEnum


@BoosterParams(queue_name="test_insteda_thread_queue", broker_kind=BrokerEnum.MEMORY_QUEUE, concurrent_num=10, is_auto_start_consuming_message=True)
def f(x):
    time.sleep(3)
    print(x)


if __name__ == '__main__':
    for i in range(100):
        f.push(i)

1.2.2 框架支持30种中间件或三方框架

框架支持 rabbitmq redis python自带的queue.Queue sqlite sqlachemy kafka pulsar mongodb 直接socket celery nameko 等作为消息中间件。

同时此框架也支持操作 kombu 库作为中间件,所以此框架能够支持的中间件类型只会比celery更多。

框架支持的中间件种类大全和选型见文档3.1章节的介绍:

3.1 各种中间件选择的场景和优势

1.2.3 框架对任务支持30种控制功能。

python通用分布式函数调度框架。适用场景范围广泛, 框架非常适合io密集型(框架支持对函数自动使用 thread gevent eventlet asyncio 并发)
框架非常适合cpu密集型(框架能够在线程 协程基础上 叠加 多进程 multi_process 并发 ,不仅能够多进程执行任务还能多机器执行任务)。
不管是函数需要消耗时io还是消耗cpu,用此框架都很合适,因为任务都是在中间件里面,可以自动分布式分发执行。 此框架是函数的辅助控制倍增器。

框架不适合的场景是 函数极其简单,例如函数只是一行简单的 print hello,函数只需要非常小的cpu和耗时,运行一次函数只消耗了几十hz或者几纳秒,
此时那就采用直接调用函数就好了,因为框架施加了很多控制功能,当框架的运行逻辑耗时耗cpu 远大于函数本身 时候,使用框架反而会使函数执行变慢。

(python框架从全局概念上影响程序的代码组织和运行,包和模块是局部的只影响1个代码文件的几行。)

可以一行代码分布式并发调度起一切任何老代码的旧函数和新项目的新函数,并提供数十种函数控制功能。

还是不懂框架能做什么是什么,就必须先去了解下celery rq。如果连celery rq类似这种的用途概念听都没听说, 那就不可能知道框架的概念和功能用途。

20种控制功能包括:

     分布式:
        支持数十种最负盛名的消息中间件.(除了常规mq,还包括用不同形式的如 数据库 磁盘文件 redis等来模拟消息队列)

     并发:
        支持threading gevent eventlet asyncio 单线程 5种并发模式 叠加 多进程。
        多进程不是和前面四种模式平行的,是叠加的,例如可以是 多进程 + 协程,多进程 + 多线程。
     
     控频限流:
        例如十分精确的指定1秒钟运行30次函数或者0.02次函数(无论函数需要随机运行多久时间,都能精确控制到指定的消费频率;
       
     分布式控频限流:
        例如一个脚本反复启动多次或者多台机器多个容器在运行,如果要严格控制总的qps,能够支持分布式控频限流。
      
     任务持久化:
        消息队列中间件天然支持
     
     断点接续运行:
        无惧反复重启代码,造成任务丢失。消息队列的持久化 + 消费确认机制 做到不丢失一个消息
        (此框架很重视消息的万无一失,就是执行函数的机器支持在任何时候随时肆无忌惮反复粗暴拉电闸断电,或者强制硬关机,
        或者直接用锄头把执行函数代码的机器砸掉,只要不是暴力破坏安装了消息队列中间件的机器就行,消息就万无一失,
        现在很多人做的简单redis list消息队列,以为就叫做分布式断点接续,那是不正确的,因为这种如果把消息从reidis brpop取出来后,
        如果消息正在被执行,粗暴的kill -9脚本或者直接强制关机,那么正在运行的消息就丢失了,如果是多线程同时并发运行很多消息,粗暴重启
        会丢失几百个大量消息,这种简单的redis list根本就不能叫做安全的断点续传。
        分布式函数调度框架的消费确认机制,保证函数运行完了才确认消费,正在运行突然强制关闭进程不会丢失一个消息,
        下次启动还会消费或者被别的机器消费。
        此框架的消息万无一失特性,不仅支持rabbbitmq因为原生支持,也支持redis,框架对redis的实现机制是因为客户端加了一层保障)。
     
     定时:
        可以按时间间隔、按指定时间执行一次、按指定时间执行多次,使用的是apscheduler包的方式。
     
     延时任务:
         例如规定任务发布后,延迟60秒执行,或者规定18点执行。这个概念和定时任务有一些不同。
              
     指定时间不运行:
        例如,有些任务你不想在白天运行,可以只在晚上的时间段运行
     
     消费确认:
        这是最为重要的一项功能之一,有了这才能肆无忌惮的任性反复重启代码也不会丢失一个任务。
        (常规的手写 redis.lpush + redis.blpop,然后并发的运行取出来的消息,随意关闭重启代码瞬间会丢失大量任务,
        那种有限的 断点接续 完全不可靠,根本不敢随意重启代码)
     
     立即重试指定次数:
        当函数运行出错,会立即重试指定的次数,达到最大次重试数后就确认消费了
     
     重新入队:
        在消费函数内部主动抛出一个特定类型的异常ExceptionForRequeue后,消息重新返回消息队列
     
     超时杀死:
        例如在函数运行时间超过10秒时候,将此运行中的函数kill
     
     计算消费次数速度:
        实时计算单个进程1分钟的消费次数,在日志中显示;当开启函数状态持久化后可在web页面查看消费次数
     
     预估消费时间:
        根据前1分钟的消费次数,按照队列剩余的消息数量来估算剩余的所需时间
     
     函数运行日志记录:
        使用自己设计开发的 控制台五彩日志(根据日志严重级别显示成五种颜色;使用了可跳转点击日志模板)
        + 多进程安全切片的文件日志 + 可选的kafka elastic日志
                   
     任务过滤:
        例如求和的add函数,已经计算了1 + 2,再次发布1 + 2的任务到消息中间件,可以让框架跳过执行此任务。
        任务过滤的原理是使用的是函数入参判断是否是已近执行过来进行过滤。
     
     任务过滤有效期缓存:
        例如查询深圳明天的天气,可以设置任务过滤缓存30分钟,30分钟内查询过深圳的天气,则不再查询。
        30分钟以外无论是否查询过深圳明天的天气,则执行查询。
        
     任务过期丢弃:
        例如消息是15秒之前发布的,可以让框架丢弃此消息不执行,防止消息堆积,
        在消息可靠性要求不高但实时性要求高的高并发互联网接口中使用
                
     函数状态和结果持久化:
        可以分别选择函数状态和函数结果持久化到mongodb,使用的是短时间内的离散mongo任务自动聚合成批量
        任务后批量插入,尽可能的减少了插入次数
                      
     消费状态实时可视化:
        在页面上按时间倒序实时刷新函数消费状态,包括是否成功 出错的异常类型和异常提示 
        重试运行次数 执行函数的机器名字+进程id+python脚本名字 函数入参 函数结果 函数运行消耗时间等
                     
     消费次数和速度生成统计表可视化:
        生成echarts统计图,主要是统计最近60秒每秒的消费次数、最近60分钟每分钟的消费次数
        最近24小时每小时的消费次数、最近10天每天的消费次数
                                
     rpc:
        生产端(或叫发布端)获取消费结果。各个发布端对消费结果进行不同步骤的后续处理更灵活,而不是让消费端对消息的处理一干到底。

     远程服务器部署消费函数:
        代码里面 task_fun.fabric_deploy('192.168.6.133', 22, 'xiaomin', '123456', process_num=2) 只需要这样就可以自动将函数部署在远程机器运行,
        无需任何额外操作,不需要借助阿里云codepipeline发版工具 和 任何运维发版管理工具,就能轻松将函数运行在多台远程机器。task_fun指的是被@boost装饰的函数

     暂停消费:
        支持从python解释器外部/远程机器 ,控制暂停消息消费和继续消费。

     优先级队列:
         支持队列优先级消息。

     远程杀死(取消)任务:
         支持在发布端杀死正在运行的消息,发送杀死命令时候对还未取出的消息则放弃运行消息。
    
     funboost支持命令行操作:
         使用fire实现的命令行,见文档第12章

关于稳定性和性能,一句话概括就是直面百万c端用户(包括app和小程序), 已经连续超过三个季度稳定高效运行无事故,从没有出现过假死、崩溃、内存泄漏等问题。 windows和linux行为100%一致,不会像celery一样,相同代码前提下,很多功能在win上不能运行或出错。

1.3 框架使用例子

使用之前先学习 PYTHONPATH的概念 https://github.com/ydf0509/pythonpathdemo

win cmd和linux 运行时候,设置 PYTHONPATH 为项目根目录,是为了自动生成或读取到项目根目录下的 funboost_config.py文件作为配置。

以下这只是简单求和例子,实际情况换成任意函数里面写任意逻辑,框架可没有规定只能用于 求和函数 的自动调度并发。
而是根据实际情况函数的参数个数、函数的内部逻辑功能,全部都由用户自定义,函数里面想写什么就写什么,想干什么就干什么,极端自由。
也就是框架很容易学和使用,把下面的task_fun函数的入参和内部逻辑换成你自己想写的函数功能就可以了,框架只需要学习boost这一个函数的参数就行。
测试使用的时候函数里面加上sleep模拟阻塞,从而更好的了解框架的并发和各种控制功能。

有一点要说明的是框架的消息中间件的ip 端口 密码 等配置是在你第一次运行代码时候,在你当前项目的根目录下生成的 funboost_config.py 按需设置。
import time
from funboost import boost, BrokerEnum,BoosterParams

# BoosterParams 代码自动补全请看文档4.1.3
@boost(BoosterParams(queue_name="task_queue_name1", qps=5, broker_kind=BrokerEnum.SQLITE_QUEUE))  # 入参包括20种,运行控制方式非常多,想得到的控制都会有。
def task_fun(x, y):
    print(f'{x} + {y} = {x + y}')
    time.sleep(3)  # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 5 次 这个 task_fun 函数的目的。


if __name__ == "__main__":
    for i in range(100):
        task_fun.push(i, y=i * 2)  # 发布者发布任务
    task_fun.consume()  # 消费者启动循环调度并发消费任务
tips: sqlite作为消息队列,如果linux或mac运行报错read-only文件夹权限,需修改SQLLITE_QUEUES_PATH 就好啦,见文档10.3 
"""
对于消费函数,框架内部会生成发布者(生产者)和消费者。
1.推送。 task_fun.push(1,y=2) 会把 {"x":1,"y":2} (消息也自动包含一些其他辅助信息) 发送到中间件的 task_queue_name1 队列中。
2.消费。 task_fun.consume() 开始自动从中间件拉取消息,并发的调度运行函数,task_fun(**{"x":1,"y":2}),每秒运行5次
整个过程只有这两步,清晰明了,其他的控制方式需要看 boost 的中文入参解释,全都参数都很有用。


这个是单个脚本实现了发布和消费,一般都是分离成两个文件的,任务发布和任务消费无需在同一个进程的解释器内部,
因为是使用了中间件解耦消息和持久化消息,不要被例子误导成了,以为发布和消费必须放在同一个脚本里面


框架使用方式基本上只需要练习这一个例子就行了,其他举得例子只是改了下broker_kind和其他参数而已,
而且装饰器的入参已近解释得非常详细了,框架浓缩到了一个装饰器,并没有用户需要从框架里面要继承什么组合什么的复杂写法。
用户可以修改此函数的sleep大小和@boost的数十种入参来学习 验证 测试框架的功能。
"""

运行截图:

pkFkP4H.png

pkFkCUe.png

pkE6IYR.png

1.4 python分布式函数执行为什么重要?

python比其他语言更需要分布式函数调度框架来执行函数,有两点原因

1 python有gil,
  直接python xx.py启动没有包括multipricsessing的代码,在16核机器上,cpu最多只能达到100%,也就是最高使用率1/16,
  别的语言直接启动代码最高cpu可以达到1600%。如果在python代码里面亲自写多进程将会十分麻烦,对代码需要改造需要很大
  ,多进程之间的通讯,多进程之间的任务共享、任务分配,将会需要耗费大量额外代码,
  而分布式行函数调度框架天生使用中间件解耦的来存储任务,使得单进程的脚本和多进程在写法上
  没有任何区别都不需要亲自导入multiprocessing包,也不需要手动分配任务给每个进程和搞进程间通信,
  因为每个任务都是从中间件里面获取来的。
  
2 python性能很差,不光是gil问题,只要是动态语言无论是否有gil限制,都比静态语言慢很多。
 那么就不光是需要跨进程执行任务了,例如跨pvm解释器启动脚本共享任务(即使是同一个机器,把python xx.py连续启动多次)、
 跨docker容器、跨物理机共享任务。只有让python跑在更多进程的cpu核心 跑在更多的docker容器 跑在更多的物理机上,
 python才能获得与其他语言只需要一台机器就实现的执行速度。分布式函数调度框架来驱动函数执行针对这些不同的场景,
 用户代码不需要做任何变化。
 
所以比其他语言来说,python是更需要分布式函数调度框架来执行任务。
  

1.5 框架学习方式

把1.3的求和例子,通过修改boost装饰器额参数和sleep大小反复测试两数求和,
从而体会框架的分布式 并发 控频。

这是最简单的框架,只有@boost 1行代码需要学习。说的是这是最简单框架,这不是最简单的python包。
如果连只有一个重要函数的框架都学不会,那就学不会学习得了更复杂的其他框架了,大部分框架都很复杂比学习一个包难很多。
大部分框架,都要深入使用里面的很多个类,还需要继承组合一顿。

1.6 funboost支持支持celery框架整体作为funboost的broker (2023.4新增)

见11.1章节代码例子,celery框架整体作为funboost的broker,funboost的发布和消费将只作为极简api,核心的消费调度和发布和定时功能,都是由celery框架来完成,funboost框架的发布和调度代码不实际起作用。
用户操作funboost的api,语法和使用其他消息队列中间件类型一样,funboost自动化操作celery。

用户无需操作celery本身,无需敲击celery难记的命令行启动消费、定时、flower;
用户无需小心翼翼纠结亲自使用celery时候怎么规划目录结构 文件夹命名 需要怎么在配置写include 写task_routes,
完全不存在需要固定的celery目录结构,不需要手动配置懵逼的任务路由,不需要配置每个函数怎么使用不同的队列名字,funboost自动搞定这些。

用户只需要使用简单的funboost语法就能操控celery框架了。funboost使用celery作为broker_kind,远远的暴击亲自使用无法ide下代码补全的celery框架的语法。
funboost通过支持celery作为broker_kind,使celer框架变成了funboost的一个子集

查看分布式函数调度框架完整文档

funboost's People

Contributors

leec20 avatar t880216t avatar ydf0509 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

funboost's Issues

funboost mysql

我不打算用 celery,因为他没办法和mysql完美的结合,我打算测试一下你这个 产品

cannot set 'is_timeout' attribute of immutable type 'TimeoutError'

python 3.10.3 运行会出现这个哦
Traceback (most recent call last):
File "", line 1027, in _find_and_load
File "", line 1006, in find_and_load_unlocked
File "", line 688, in load_unlocked
File "", line 883, in exec_module
File "", line 241, in call_with_frames_removed
File "D:\python\python3.10\lib\site-packages\funboost_init
.py", line 11, in
from funboost.assist.user_custom_broker_register import register_custom_broker
File "D:\python\python3.10\lib\site-packages\funboost\assist\user_custom_broker_register.py", line 2, in
from funboost.publishers.base_publisher import AbstractPublisher
File "D:\python\python3.10\lib\site-packages\funboost\publishers\base_publisher.py", line 22, in
from funboost.concurrent_pool import CustomThreadPoolExecutor
File "D:\python\python3.10\lib\site-packages\funboost\concurrent_pool_init
.py", line 14, in
from .custom_evenlet_pool_executor import CustomEventletPoolExecutor
File "D:\python\python3.10\lib\site-packages\funboost\concurrent_pool\custom_evenlet_pool_executor.py", line 7, in
from eventlet import greenpool, monkey_patch, patcher, Timeout
File "D:\python\python3.10\lib\site-packages\eventlet_init.py", line 17, in
from eventlet import convenience
File "D:\python\python3.10\lib\site-packages\eventlet\convenience.py", line 7, in
from eventlet.green import socket
File "D:\python\python3.10\lib\site-packages\eventlet\green\socket.py", line 4, in
import('eventlet.green.socket_nodns')
File "D:\python\python3.10\lib\site-packages\eventlet\green_socket_nodns.py", line 11, in
from eventlet import greenio
File "D:\python\python3.10\lib\site-packages\eventlet\greenio_init
.py", line 3, in
from eventlet.greenio.base import * # noqa
File "D:\python\python3.10\lib\site-packages\eventlet\greenio\base.py", line 32, in
socket_timeout = eventlet.timeout.wrap_is_timeout(socket.timeout)
File "D:\python\python3.10\lib\site-packages\eventlet\timeout.py", line 166, in wrap_is_timeout
base.is_timeout = property(lambda _: True)
TypeError: cannot set 'is_timeout' attribute of immutable type 'TimeoutError'

是否可以多任务依赖执行

很有幸了解到这个项目。我想问一下,多任务依赖,怎么配备优先级,以及等待任务执行完成之后 再执行。
比如: 当前时间为 2点, 任务参数为1 点
那么我在2点执行的时候是1点数据,因为别的原因导致任务超时或者失败, 重新执行2点的任务。 但是可能任务参数 为3点了。 是否会保存当时执行的参数。 并且可以判断多任务依赖关系。 A->b/d/f->C

结合tornado使用

hi
想请问一下funboost如何结合tornado实现异步非阻塞啊? 我想使用tornado通过funboost向队列中发消息 然后异步的获取执行结果 然后结束响应。
thks

能否支持降级

支持多种中间件 能否通过配置支持降级 [ kafka -> MQ - > redis -> 本地文件 ] 这种

在django项目中使用时,会阻塞django进程(用户自己问题)

大佬,我在django项目中使用时,会阻塞django进程(可能我用的不对?),
我觉得应该不会阻塞才对,不知怎么改进了。。

  1. 定义一个任务
from funboost import boost, BrokerEnum

@boost('run_queue', broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE, is_using_rpc_mode=True, log_level=50)
def backup_task(device_info):
    backup = BackupNetConfTask(device_info)
    return "success"
  1. 在视图函数中调用任务
from tasks import backup_task
    ... ...
    @action(methods=['post'], detail=False)
    def run_backup_task(self, request, *args, **kwargs):
        """
        api/v1/devices/run_backup_task/
        :param request: 请求对象
        :param args: 元组参数
        :param kwargs: 字典参数
        :return:
        """
        device_infos = get_device_infos() 
        for device_info in device_infos:
            backup_task.pub(dict(device_info=device_info))  <------------------ 程序阻塞在这里了
        return CustomResponse(status=status.HTTP_200_OK, msg="run backup task success")
  1. 启动消费者
from tasks import backup_task
backup_task.consume()

mongodb 作为队列报错

报错运行截图

image

python version: 3.9
funboost version: 14.8

代码

from funboost import boost, BrokerEnum, ConcurrentModeEnum, FunctionResultStatusPersistanceConfig


@boost(
    "queue2",
    broker_kind=BrokerEnum.MONGOMQ,
    function_result_status_persistance_conf=FunctionResultStatusPersistanceConfig(True, True, 7*25*36000)
)
def print_hi(name):
    # Use a breakpoint in the code line below to debug your script.
    print(f'Hi, {name}')  # Press ⌘F8 to toggle the breakpoint.


# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    for i in range(100):
        pass
    print_hi.push(f"Hello")

    print_hi.consume()
    # print_hi.consume()
    # print_hi.consume()

monbodb docker-compose

version: '3.1'

services:

  mongo:
    image: mongo
    restart: always
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    volumes:
      - ./data:/data/db

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
      ME_CONFIG_MONGODB_URL: mongodb://root:example@mongo:27017/

mongo version:

➜  mongo docker exec cb5a5c4c8932 mongo --version
MongoDB shell version v5.0.5
Build Info: {
    "version": "5.0.5",
    "gitVersion": "d65fd89df3fc039b5c55933c0f71d647a54510ae",
    "openSSLVersion": "OpenSSL 1.1.1f  31 Mar 2020",
    "modules": [],
    "allocator": "tcmalloc",
    "environment": {
        "distmod": "ubuntu2004",
        "distarch": "x86_64",
        "target_arch": "x86_64"
    }
}

运行示例程序有报错?(用户自己的问题)

import time
from funboost import boost, BrokerEnum,get_consumer,run_consumer_with_multi_process

def f(a, b):
    return a+b

consumer = get_consumer('queue_test_f01', consuming_function=f,qps=0.2, broker_kind=2)

if __name__ == "__main__":
    # 需要手动指定consuming_function入参的值。
    for i in range(10, 2000):
        consumer.publisher_of_same_queue.publish(dict(a=i, b=i * 2))
        
    consumer.start_consuming_message()
    run_consumer_with_multi_process(consumer,8)
    

运行后,报错:

Traceback (most recent call last):
  File "main.py", line 16, in <module>
    run_consumer_with_multi_process(consumer,8)
  File "/home/project/test_mul_thread/venv/lib/python3.8/site-packages/funboost/helpers.py", line 45, in run_consumer_with_multi_process
    if not getattr(task_fun, 'is_decorated_as_consume_function'):
AttributeError: 'RedisConsumer' object has no attribute 'is_decorated_as_consume_function'

但不影响程序执行,程序会正常继续运行,想知道这个错误是否可以规避?

使用延时任务发布的时候,任务是async的就会出错,是不支持吗还是我写错了

@boost("register", broker_kind=BrokerEnum.REDIS_STREAM,
       concurrent_mode=ConcurrentModeEnum.ASYNC, log_level=20)
async def register(cookie_dict: dict, meta_dict: dict, mobile: str):
       pass

register.publish({'cookie_dict': cookie_dict, 'meta_dict': meta_dict, 'mobile': mobile},
                                 priority_control_config=PriorityConsumingControlConfig(countdown=40))

报错

Error submitting job "AsyncPoolExecutor.submit (trigger: date[2022-04-09 17:55:16 CST], next run at: 2022-04-09 17:55:16 CST)" to executor "default"
Traceback (most recent call last):
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\schedulers\base.py", line 979, in _process_jobs
    executor.submit_job(job, run_times)
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\executors\base.py", line 71, in submit_job
    self._do_submit_job(job, run_times)
  File "C:\ProgramData\Miniconda3\lib\site-packages\apscheduler\executors\pool.py", line 28, in _do_submit_job
    f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
  File "C:\ProgramData\Miniconda3\lib\concurrent\futures\thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

python 3.10的Callable导入错误

使用python3.10版本运行测试案例报错

问题已解决

报错情况:

  File "C:\Users\90957\Documents\2022-1\fastapitest\test.py", line 2, in <module>
    from funboost import boost, BrokerEnum
  File "C:\software\Anaconda\envs\funboost\lib\site-packages\funboost\__init__.py", line 12, in <module>
    from funboost.consumers.base_consumer import (ExceptionForRequeue, ExceptionForRetry,
  File "C:\software\Anaconda\envs\funboost\lib\site-packages\funboost\consumers\base_consumer.py", line 25, in <module>
    from collections import Callable
ImportError: cannot import name 'Callable' from 'collections' (C:\software\Anaconda\envs\funboost\lib\collections\__init__.py)

原因:Deprecated since version 3.9: collections.abc.Callable Callable在python3.9已弃用
解决方案:使用低于3.9的python

出错的环境的配置完整命令:

conda create -n env_name python
conda activate env_name 
pip install funboost --upgrade

使用的代码

import time
from funboost import boost, BrokerEnum


@boost("task_queue_name1", qps=5, broker_kind=BrokerEnum.PERSISTQUEUE)  # 入参包括20种,运行控制方式非常多,想得到的控制都会有。
def task_fun(x, y):
    print(f'{x} + {y} = {x + y}')
    time.sleep(3)  # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 5 次 这个 task_fun 函数的目的。


if __name__ == "__main__":
    for i in range(100):
        task_fun.push(i, y=i * 2)  # 发布者发布任务
    task_fun.consume()  # 消费者启动循环调度并发消费任务

求教,案例代码跑,出现这个,是中间件连接不对吗(不支持python3.10现在)

截屏2022-08-21 22 10 53

File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/eventlet/green/_socket_nodns.py", line 11, in from eventlet import greenio File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/eventlet/greenio/__init__.py", line 3, in from eventlet.greenio.base import * # noqa File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/eventlet/greenio/base.py", line 32, in socket_timeout = eventlet.timeout.wrap_is_timeout(socket.timeout) File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/eventlet/timeout.py", line 166, in wrap_is_timeout base.is_timeout = property(lambda _: True) TypeError: cannot set 'is_timeout' attribute of immutable type 'TimeoutError'

复制文档中《4.4 演示如何定时运行。》代码直接给我报错了

15:28:17 "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/tasks/test.py:7" queue_test_666 的消费者
2022-02-07 15:28:20 - apscheduler.scheduler - "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/venv/lib/python3.9/site-packages/apscheduler/schedulers/base.py:988" - _process_jobs - ERROR - Error submitting job "timing_publish_deco.._deco (trigger: interval[0:00:03], next run at: 2022-02-07 15:28:20 CST)" to executor "default"
Traceback (most recent call last):
File "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/venv/lib/python3.9/site-packages/apscheduler/schedulers/base.py", line 979, in _process_jobs
executor.submit_job(job, run_times)
File "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/venv/lib/python3.9/site-packages/apscheduler/executors/base.py", line 71, in submit_job
self._do_submit_job(job, run_times)
File "/Users/zhangxiaodong/PycharmProjects/GameFiBridge/venv/lib/python3.9/site-packages/apscheduler/executors/pool.py", line 28, in _do_submit_job
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 163, in submit
raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown

mqtt项目实例

大佬能否出一个mqtt项目实例,文档中没有找到相关使用说明。

Small wishes

非常仰慕,非常好用
一个小愿望:setup中是否考虑使用extras_require
默认还可以安装所有组件,可以按需选装组件。
非常感谢作者的贡献

win11 python3.10.0版本安装gevent报错,要求c++环境

安装报错,我截几个关键的位置,不知道如何解决
Building wheels for collected packages: gevent
Building wheel for gevent (pyproject.toml) ... error
error: subprocess-exited-with-error

× Building wheel for gevent (pyproject.toml) did not run successfully.
│ exit code: 1
╰─> [289 lines of output]
.................
error: Microsoft Visual C++ 14.0 or greater is required. Get it with "Microsoft C++ Build Tools": https://visualstudio.microsoft.com/visual-cpp-build-tools/
[end of output]

note: This error originates from a subprocess, and is likely not a problem with pip.
ERROR: Failed building wheel for gevent
Failed to build gevent
ERROR: Could not build wheels for gevent, which is required to install pyproject.toml-based projects

关于日志的询问

根据指引 把实际项目附加到funboost项目下,方便调试
实际生产项目中使用了阿里云 SLS日志服务。

工程项目使用了dictConfig和get_logger(name)
funbootst使用了推荐的PYTHONPATH的方式导入工程
根据funboost的规划,如我我需要继续使用 阿里云 sls服务,我似乎应该导入ng_log.get_logger 使用logger.addHandler()
logger.addFilter()去实现,是这样的么?

nb 克拉斯

确实nb,但是吧,引入了好多项目用不上的包。flask、kafka、mogodb…

关于setup install后部署运行问题?

大佬好,我将funboost集成到自己写的一个包里,位置在/root/rdown。这个包是我写的命令行,需要Python setup.py install后安装到/root/anaconda3/lib下进行命令行使用,这时喔该如何启动消费者呢?是在/root/rdown下还是/root/ansconda3/lib/site-packages/rdown下呢?

其实celery也是一样的问题,但是我各种调试不好celery的后台任务,每次命令行执行都OK,一旦按照官方文档systemctl后台部署就不对了

请问支持task按组限制qps吗

其实就是一个需求, 我们调用某saas服务, 最高100qps 超过就会封禁一直断时间, 而且此saas的接口的qps限制是共用的

也就是我们写的 一部分task 加起来的qps 不得超过100 但是我们又得让他尽量靠近100 这样可以最大化利用资源

文档我看了两遍
但是好像没有 按组 限制qps的

恳请解答 感谢作者开源,优秀项目

push任务之后想要阻塞进程弹出如下错误?(没有设置rpc模式)

大佬, 我创建了2个任务,第一个负责下载,第二个负责处理;但是负责处理的任务依赖于第一个,而处理任务会有大量的IO,因此我设置了下载2个线程处理,而处理任务只开启了1个。于是在提交任务的函数中设置了获取下载的运行状态,要求回调结果,但是弹出下面的错误,简化代码如下:

from tasks import download_task, compress_task

def request(url, tempfn):
    '''
    url 为下载地址
    tempfn 为下载的本地路径
    '''
    download_status = download_task.push(url)
    if download_status.result:
        compress_task.push(tempfn)

错误:

raceback (most recent call last):

  File "/root/anaconda3/lib/python3.9/threading.py", line 930, in _bootstrap
    self._bootstrap_inner()
    │    └ <function Thread._bootstrap_inner at 0x7f3b3c808af0>
    └ <DummyProcess(Thread-8, started daemon 139891555944192)>
  File "/root/anaconda3/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
    │    └ <function Thread.run at 0x7f3b3c808820>
    └ <DummyProcess(Thread-8, started daemon 139891555944192)>
  File "/root/anaconda3/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <DummyProcess(Thread-8, started daemon 139891555944192)>
    │    │        │    └ (<_queue.SimpleQueue object at 0x7f3b2179af40>, <_queue.SimpleQueue object at 0x7f3b2179aa40>, None, (), None, False)
    │    │        └ <DummyProcess(Thread-8, started daemon 139891555944192)>
    │    └ <function worker at 0x7f3b217c71f0>
    └ <DummyProcess(Thread-8, started daemon 139891555944192)>
  File "/root/anaconda3/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (('reanalysis-era5-pressure-levels', {'variable': 'geopotential', 'year': '2020', 'month': '4', 'format': 'netcdf', 'area': '...
                    └ <function only_request at 0x7f3b23864d30>
> File "/root/anaconda3/lib/python3.9/site-packages/era5/cli.py", line 111, in only_request
    if downloas_status.result: # 为了阻塞线程
       │               └ <property object at 0x7f3b24a51450>
       └ <funboost.publishers.base_publisher.AsyncResult object at 0x7f3b2078e2b0>
  File "/root/anaconda3/lib/python3.9/site-packages/funboost/publishers/base_publisher.py", line 68, in result
    return self.get()
           │    └ <function AsyncResult.get at 0x7f3b24a52280>
           └ <funboost.publishers.base_publisher.AsyncResult object at 0x7f3b2078e2b0>
  File "/root/anaconda3/lib/python3.9/site-packages/funboost/publishers/base_publisher.py", line 64, in get
    raise HasNotAsyncResult
          └ <class 'funboost.publishers.base_publisher.HasNotAsyncResult'>

多进程锁竞争严重

  • 提示
"C:\Python38\lib\site-packages\funboost\consumers\confirm_mixin.py:70" 这行代码没有获得redis锁 fsdf_lock__requeue_tasks_which_unconfirmed:class_sch_group_check
  • 程序情况
  1. 开了五个实例(进程)
  2. 每个进程 qps=1, concurrent_num=1 (因为这个任务为CPU密集型
  3. push很多任务,但同时运行的进程大约为 2-4
  4. 后面竞争起来了,有效进程就为2了,然后参与锁竞争的进程好像还会满载cpu

supporting python3.10+ maybe?

非常好用的框架,之前一直在用celery,发现这个框架之后,我觉得无论从项目设计规划和上手难易度来说,这个框架都非常棒。

目前问题是,这个框架在使用python 3.10+的时候,存在一些不兼容,请问作者是否有计划加入pyhton 3.10的支持。

提个建议

下载完之后100多m,能不能精简精简,把不必要的库都删掉,只保留必须的库,只是建议啊,希望国产这库越来越好!

引入的包太多了,可以优化包的管理吗?

刚才重新创建了环境,专门用于学习测试这个框架,装一个包,发现有很多依赖包:
Successfully installed AMQPStorm-2.7.1 Automat-20.2.0 Flask-2.1.1 Jinja2-3.1.1 MarkupSafe-2.1.1 Werkzeug-2.1.1 aiohttp-3.8.0 aiosignal-1.2.0 amqp-5.1.0 apscheduler-3.7.0 async-timeout-4.0.2 attrs-21.4.0 bcrypt-3.2.0 blinker-1.4 certifi-2021.10.8 cffi-1.15.0 charset-normalizer-2.0.12 click-8.1.2 concurrent-log-handler-0.9.19 constantly-15.1.0 cryptography-36.0.2 decorator-4.4.0 deprecated-1.2.13 dnspython-1.16.0 dominate-2.6.0 elastic-transport-8.1.1 elasticsearch-8.1.2 eventlet-0.31.0 fabric2-2.6.0 flask-bootstrap-3.3.7.1 flask-login-0.6.0 flask-wtf-1.0.1 frozenlist-1.3.0 funboost-15.9 gevent-21.1.2 gnsq-1.0.1 greenlet-1.1.2 hyperlink-21.0.0 idna-3.3 importlib-metadata-4.11.3 incremental-21.3.0 invoke-1.7.0 itsdangerous-2.1.2 kafka-python-2.0.2 kombu-5.2.4 multidict-6.0.2 nats-python-0.8.0 nb-filelock-0.7 nb-log-7.5 packaging-21.3 paho-mqtt-1.6.1 pamqp-2.3.0 paramiko-2.10.3 pathlib2-2.3.7.post1 persist-queue-0.7.0 pika-1.2.0 pikav0-0.1.23 pikav1-1.0.13 portalocker-2.4.0 psutil-5.9.0 pyasn1-0.4.8 pyasn1-modules-0.2.8 pycparser-2.21 pymongo-4.0.2 pynacl-1.5.0 pyparsing-3.0.7 pysnooper-1.1.0 python-json-logger-0.1.10 pytz-2022.1 pyzmq-22.3.0 rabbitpy-2.0.1 redis-4.2.1 redis2-2.10.6.3 redis3-3.5.2.3 requests-2.27.1 rocketmq-0.4.4 semantic-version-2.9.0 service-identity-21.1.0 setuptools-rust-1.2.0 six-1.16.0 sqlalchemy-1.3.10 sqlalchemy-utils-0.36.1 tomorrow3-1.1.0 tornado-6.1 twisted-22.2.0 typing-extensions-4.1.1 tzlocal-2.1 urllib3-1.26.9 vine-5.0.0 visitor-0.1.3 wrapt-1.14.0 wtforms-3.0.1 yarl-1.7.2 zipp-3.7.0 zmq-0.0.0 zope.event-4.5.0 zope.interface-5.4.0

建议:

  1. 把不必要的依赖去掉;
  2. 如果实在需要,是否可以按需引入;

pulsar怎么使用

粗略看了下,好像只有constant定义了,配置、工厂和其他地方都没有相关代码,是需要自己拓展吗?还是我没有留意到?

NameError: name 'sqlalchemy' is not defined

lib/python3.9/site-packages/funboost/queues/sqla_queue.py", line 66, in SessionContext
def init(self, session: sqlalchemy.orm.session.Session):
NameError: name 'sqlalchemy' is not defined

我在运行例子的时候报错了
Uploading 截屏2022-10-19 15.06.04.png…

建议优化相关包管理

验证了一下,准备投入使用试试,发现还是一大堆不兼容的问题,主要体现在包上。
image

  1. 建议使用 poetry 来进行依赖管理,及时更新对新版本的支持。
  2. 建议移除无关的包,或者把一些中间件做成可选安装的。

兼容性问题用起来确实有点头大。

consumers/base_consumer.py 中的一处疑惑

在 consumers/base_consumer.py 文件的 schedulal_task_with_no_block() 中,第 1097 行代码:
elif self._concurrent_mode == ConcurrentModeEnum.GEVENT:
是不是应该改为:
elif self._concurrent_mode == ConcurrentModeEnum.EVENTLET:

设置任务60s执行一次,但是看log发现10s内执行了两次?

定时任务写法:
fsdf_background_scheduler.add_job(timing_publish_deco(xxx_spider), 'interval', id='xxx_add_queue', seconds=300)
fsdf_background_scheduler.add_job(timing_publish_deco(xxx_result), 'interval', id='xxx_result_queue', seconds=60)

日志:RedisConsumer--xxx_result_queue - "base_consumer.py:697" - INFO - 10 秒内执行了 2 次函数 [xxx_result ] ,函数平均运行耗时 0.009 秒

消费者:
xxx_spider.consume()
xxx_result.consume()

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.