摘要:celery,rabbitmq,django,db,计划任务,后台管理 。基本的命令和代码。
celery,rabbitmq,django,db,计划任务,后台管理 。基本的命令和代码。
0. 安装
0.1 install
1. celery 基础命令
1.1. 启动 celery
1.2. 后台进程启动celery worker
1.3. 重启 celery worker 后台进程
1.4. 停止 celery worker 后台进程
1.5. 等待 停止 celery worker 后台进程
1.6. 指定 pidfile & logfile
1.7. 启动 多个 worker 并且指定不同参数
1.8. 手动杀死所有worker进程
1.9. 较完整的celery的启动命令
2. 管理相关命令
2.1. 启动 flower
2.2. 需要 目录
2.3. 使用 librabbitmq
0.1 install
sudoapt-getinstallbuild-essentialpython-dev sudopipinstallcelery sudopipinstalllibrabbitmq sudopipinstallflower sudoapt-getinstallrabbitmq-server sudorabbitmq-pluginsenablerabbitmq_management sudoservicerabbitmq-serverrestart sudorabbitmq-pluginsdisablerabbitmq_management
访问 http://host:15672 即可进入管理界面。
默认用户名,密码都是 guest
1.1. 启动 celery
celery-Ataskworker-linfo
启动时指定要使用的 queue
celery-Ataskworker-linfo-Qbrand_queue,new_queue-E
1.2. 后台进程启动celery worker
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue-E
可通过命令查看后台启动的进程:
ps-aux|grepcelery [celeryd:w1@x:MainProcess]-active-(worker-E-Atask-linfo-Qbrand_queue,new_queue--logfile=w1.log--pidfile=w1.pid--hostname=w1@x)
可以看到默认添加了几个参数:
--logfile=w1.log默认在当前文件夹新建w1.log文件 --pidfile=w1.pid默认在当前文件夹新建w1.pid文件 --hostname=w1@x默认实例名woker_name/机器名
1.3. 重启 celery worker 后台进程
celerymultirestartw1-Atask-linfo-Qbrand_queue,new_queue-E
1.4. 停止 celery worker 后台进程
celerymultistopw1-Atask-linfo-Qbrand_queue,new_queue-E
stop 命令是异步的,worker 会立即停止,即使当时仍然有任务在执行,
并且不会写停止worker相关的日志
1.5. 等待 停止 celery worker 后台进程
celerymultistopwaitw1-Atask-linfo-Qbrand_queue,new_queue-E
这个停止命令,会等待正在运行的所有任务都完成再停止。
1.6. 指定 pidfile & logfile
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue-E--pidfile=/var/www/api/space/run/%n.pid--logfile=/var/www/api/space/logs/%n%I.log
1.7. 启动 多个 worker 并且指定不同参数
celerymultistart10-Atask-linfo-E-Q:1-3images,video-Q:4,5data-Qdefault-L:4,5debug
启动了10个worker:
worker 1,2,3 使用了队列 images, video
worker 4,5 使用了队列 data
worker 其他 使用了队列 default
-L 是什么参数?
1.8. 手动杀死所有worker进程
psauxww|grep\'celeryworker\'|awk\'{print$2}\'|xargskill-9
1.9. 较完整的celery的启动命令
celerymultistartw1-Atask-linfo-Qbrand_queue,new_queue,time_queue,cron_queue-E-B-s/var/www/api/space/run/celerybeat-schedule--pidfile=/var/www/api/space/run/%n.pid--logfile=/var/www/api/space/logs/%n%I.log
2.1 启动 flower
celeryflower--broker=amqp://tiger:tiger@192.168.0.6:5672/vr_tiger--address=192.168.0.4--port=5555--broker_api=http://tiger:tiger@192.168.0.6:15672/api/--basic_auth=tiger:tiger rabbitmq主机地址:192.168.0.6 本机地址:192.168.0.4 本地监听端口:5555
2.2 需要目录
run/ log/
2.3 使用 librabbitmq
Ifyou’reusingRabbitMQ(AMQP)asthebrokerthenyoucaninstallthelibrabbitmqmoduletouseanoptimizedclientwritteninC: $pipinstalllibrabbitmq
注意
1. 后台运行 celery 的是否,worker 信息不会保存。所以,每次对 worker 操作时都需要加上相同的参数。特别是 pidfile 和 logfile 需要相同。
2. 我没有使用 celery 提供的任务结果存储。我在业务中自己处理 过程及结果。
代码示例
配置文件:
#-*-coding=utf-8-*- #FileName:task_conf.py from__future__importabsolute_import fromceleryimportCelery fromdatetimeimporttimedelta fromcelery.schedulesimportcrontab \'\'\' task_random:是任务的名称 broker:通过amqp://用户名:密码@ip/虚拟主机连接amqp include:任务程序 \'\'\' #消息队列配置 mq_host=\'192.168.0.6\' mq_name=\'tiger\' mq_pass=\'tiger\' mq_vr=\'vr_tiger\' broker=\'amqp://%s:%s@%s/%s\'%(mq_name,mq_pass,mq_host,mq_vr) #初始化app app=Celery(\'name_wash\',broker=broker,include=[\'task\']) #指定任务存储队列 app.conf.update( CELERY_ROUTES={ \'task.exe_task\':{\'queue\':\'brand_queue\'}, \'task.task_sms_send\':{\'queue\':\'new_queue\'}, \'task.task_sec\':{\'queue\':\'time_queue\'}, \'task.task_cron\':{\'queue\':\'cron_queue\'} }, CELERYBEAT_SCHEDULE={ \'exe-every-10-seconds\':{ \'task\':\'task.task_sec\', \'schedule\':timedelta(seconds=30), \'args\':[1], }, \'add-every-monday-morning\':{ \'task\':\'task.task_cron\', \'schedule\':crontab(hour=15,minute=47,day_of_week=5), \'args\':(15232897835,), }, }, #CELERY_TASK_SERIALIZER=\'json\', #CELERY_ACCEPT_CONTENT=[\'json\'],#Ignoreothercontent #CELERY_RESULT_SERIALIZER=\'json\', CELERY_EVENT_QUEUE_TTL=5, CELERY_TIMEZONE=\'Asia/Shanghai\', CELERY_ENABLE_UTC=True, CELERY_DISABLE_RATE_LIMITS=True, CELERY_IGNORE_RESULT=True ) if__name__==\'__main__\': app.start()
任务文件:
#-*-coding=utf-8-*- #FileName:task.py \'\'\' task \'\'\' from__future__importabsolute_import importtime importtraceback fromjob.task_confimportapp @app.task(ignore_result=True) defexe_task(task_id,number): \'\'\'根据参数执行任务\'\'\' try: print\'exetask:\',task_id time.sleep(number) except: traceback.print_exc() return(task_id,number,-1) return\'true:)\' @app.task deftask_sms_send(mobile,content): \'\'\'任务-发送短信\'\'\' try: print\'sendsms:mobile->%s,content->%s\'%(mobile,content) except: traceback.print_exc() return\'Fail:(\' return\'Success:)\' @app.task deftask_sec(mobile): \'\'\'测试任务时间定制\'\'\' try: print\'sendsms:mobile->%s.\'%mobile except: traceback.print_exc() return\'F\' return\'S\' @app.task deftask_cron(mobile): \'\'\'测试任务时间定制Cron\'\'\' try: print\'sendsms:mobile->%s.\'%mobile except: traceback.print_exc() return\'F-cron\' return\'S-cron\' defmain(): res=exe_task(2,2) print\'res:\',res if__name__==\'__main__\': main()
添加任务代码:
#-*-coding=utf-8-*- #FileName:add_task.py importtime importtraceback importrandom fromtaskimportexe_task,task_sms_send defaction(): tries=0 while1: try: tries+=1 iftries>=20: break task_id=tries number=random.randint(1,5) exe_task.apply_async(args=[task_id,number],queue=\'brand_queue\') print\'addedonetask\' time.sleep(1) except: traceback.print_exc() pass print\'addtaskdone\' defadd_task_by_django(task_id,number): \'\'\'测试从django添加任务\'\'\' exe_task.apply_async(args=[task_id,number],queue=\'brand_queue\') defadd_task_sms(mobile,content): \'\'\'添加发送短信任务\'\'\' #列表参数或者字典参数 #task.apply_async(args=[arg1,arg2],kwargs={\'kwarg1\':\'x\',\'kwarg2\':\'y\'}) task_sms_send.apply_async(args=[mobile,content],queue=\'new_queue\') print\'taskadded:sms\' defmain(): action() if__name__==\'__main__\': main()
相关文章推荐
虚拟主机的专业参数,分别都是什么意思?2022-09-09
中非域名注册规则是怎样的?注册域名有什么用处? 2022-01-10
HostEase新年活动促销 美国/香港主机全场低至五折2021-12-28
HostGator下载完整备份教程分享2021-12-28
Flink中有界数据与无界数据的示例分析2021-12-28