为编程爱好者分享易语言教程源码的资源网
好用的代理IP,游戏必备 ____广告位招租____ 服务器99/年 ____广告位招租____ ____广告位招租____ 挂机,建站服务器
好用的代理IP,游戏必备 ____广告位招租____ 服务器低至38/年 ____广告位招租____ ____广告位招租____ 挂机,建站服务器

网站首页 > 网络编程 > 其它综合 正文

Python Tornado集成Apscheduler定时任务(oracle创建定时任务)

三叶资源网 2022-10-20 19:18:50 其它综合 326 ℃ 0 评论

熟悉Python的人可能都知道,Apscheduler是python里面一款非常优秀的任务调度框架,这个框架是从鼎鼎大名的Quartz移植而来。

之前有用过Flask版本的Apscheduler做定时任务。刚好前不久接触了Tornado,顺便玩玩Tornado版本的Apscheduler。

本篇做了一个简单的Wdb页面,用于添加和删除定时任务,小伙伴们可以基于这个做一些扩展,比如把定时定时任务写入数据库,改变cron规则等等。

主要功能点如下:

#新增任务(需要动态改变job_id的值)

http://localhost:8888/scheduler?job_id=1&action=add

#删除任务(需要动态改变job_id的值)

http://localhost:8888/scheduler?job_id=1&action=remov

执行结果可以在console看到

from datetime import datetime
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import RequestHandler, Application
from apscheduler.schedulers.tornado import TornadoScheduler


scheduler = None
job_ids   = []

# 初始化
def init_scheduler():
    global scheduler
    scheduler = TornadoScheduler()
    scheduler.start()
    print('[Scheduler Init]APScheduler has been started')

# 要执行的定时任务在这里
def task1(options):
    print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))


class MainHandler(RequestHandler):
    def get(self):
        self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')


class SchedulerHandler(RequestHandler):
    def get(self):
        global job_ids
        job_id = self.get_query_argument('job_id', None)
        action = self.get_query_argument('action', None)
        if job_id:
            # add
            if 'add' == action:
                if job_id not in job_ids:
                    job_ids.append(job_id)
                    scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))
                    self.write('[TASK ADDED] - {}'.format(job_id))
                else:
                    self.write('[TASK EXISTS] - {}'.format(job_id))
            # remove
            elif 'remove' == action:
                if job_id in job_ids:
                    scheduler.remove_job(job_id)
                    job_ids.remove(job_id)
                    self.write('[TASK REMOVED] - {}'.format(job_id))
                else:
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))
        else:
            self.write('[INVALID PARAMS] INVALID job_id or action')


if __name__ == "__main__":
    routes    = [
        (r"/", MainHandler),
        (r"/scheduler/?", SchedulerHandler),
    ]
    init_scheduler()
    app       = Application(routes, debug=True)
    app.listen(8888)
    IOLoop.current().start()

来源:三叶资源网,欢迎分享,公众号:iisanye,(三叶资源网⑤群:21414575

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

百度站内搜索
关注微信公众号
三叶资源网⑤群:三叶资源网⑤群

网站分类
随机tag
失败代码按键模拟发送邮件无限加好友API例程EDB数据库字库验证码识别MCI指令LOL无限视距md5效验寻找文件ETCP界面设计express万能快递单打印防撤回系统服务键盘敲音乐异步套接字源码批量下载
最新评论