- Библиотека schedule – CRON на Python
- Celery
- Examples¶
- Run a job every x minute¶
- Use a decorator to schedule a job¶
- Pass arguments to a job¶
- Cancel a job¶
- Run a job once¶
- Get all jobs¶
- Cancel all jobs¶
- Get several jobs, filtered by tags¶
- Cancel several jobs, filtered by tags¶
- Run a job at random intervals¶
- Run a job until a certain time¶
- Time until the next execution¶
- Run all jobs now, regardless of their scheduling¶
Библиотека schedule – CRON на Python
Вам приходилось работать с CRON? Это такой сервис в nix-системах, который позволяет регулярно в определенные моменты времени запускать скрипты или программы. Штука с долгой историей, в наследство которой достался странный синтаксис для описания правил:
Что если бы мы хотели иметь свой CRON внутри программы Python, чтобы в нужные моменты времени вызывать функции? Да еще, чтобы у него был человеческий синтаксис? Такая библиотека есть и называется schedule.
import schedule import time def job(): print("Работаю") schedule.every(10).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at("10:30").do(job) schedule.every(5).to(10).minutes.do(job) schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) # нужно иметь свой цикл для запуска планировщика с периодом в 1 секунду: while True: schedule.run_pending() time.sleep(1)
Как видите, правила для задания временных интервалов прекрасно читаются, словно они предложения на английском языке. Перевод пары примеров:
# спланируй.каждые(10).минут.сделать(работу) schedule.every(10).minutes.do(job) # спланируй.каждый().день.в(10:30).сделать(работу) schedule.every().day.at("10:30").do(job)
В задания можно передавать параметры вот так:
def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice')
Если по какой-то причине нужно отменить задание, это делается так:
def job1(): # возвращаем такой токен, и это задание снимается с выполнения в будущем return schedule.CancelJob schedule.every().day.at('22:30').do(job1)
Если нужно отменить группу заданий, то к ним добавляют тэги:
schedule.every().day.do(greet, 'Monica').tag('daily-tasks') schedule.every().day.do(greet, 'Derek').tag('daily-tasks') schedule.clear('daily-tasks') # массовая отмена по тэгу
Метод to позволяет задать случайный интервал для выполнения задания, например от 5 до 10 секунд:
schedule.every(5).to(10).seconds.do(my_job)
Библиотека сама не обрабатывает сама исключения в ваших задачах, поэтому, возможно, понадобится создать подкласс планировщика, как в этом примере. Или декоратор, который будет отменять работу, если произошло исключение. Вот так:
import functools # декоратор для ловли исключений def catch_exceptions(cancel_on_failure=False): def catch_exceptions_decorator(job_func): @functools.wraps(job_func) def wrapper(*args, **kwargs): try: return job_func(*args, **kwargs) except: import traceback print(traceback.format_exc()) if cancel_on_failure: return schedule.CancelJob return wrapper return catch_exceptions_decorator @catch_exceptions(cancel_on_failure=True) def bad_task(): # даст исключение, но декоратор просто отменит эту задачу return 1 / 0 schedule.every(5).minutes.do(bad_task)
Если задания занимают продолжительное время или должны выполняться параллельно, то вам самостоятельно придется организовать их выполнение в отдельных потоках.
import threading import time import schedule # код задания def job(): print("Выполняюсь в отдельном потоке") def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) # бесконечный цикл, проверяющий каждую секунду, не пора ли запустить задание while 1: schedule.run_pending() time.sleep(1)
Celery
Если вы используете в проекте Celery, то, вероятно, вам не нужен schedule. В Celery и так есть отличный CRON:
from celery import Celery from celery.schedules import crontab app = Celery() @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # эта функция выполнится при запуске - настроим вызовы задачи test sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') sender.add_periodic_task(30.0, test.s('world'), expires=10) # в 7 30 по понедельникам sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg)
Специально для канала @pyway. Подписывайтесь на мой канал в Телеграм @pyway 👈
Examples¶
Eager to get started? This page gives a good introduction to Schedule. It assumes you already have Schedule installed. If you do not, head over to Installation .
Run a job every x minute¶
import schedule import time def job(): print("I'm working. ") # Run job every 3 second/minute/hour/day/week, # Starting 3 second/minute/hour/day/week from now schedule.every(3).seconds.do(job) schedule.every(3).minutes.do(job) schedule.every(3).hours.do(job) schedule.every(3).days.do(job) schedule.every(3).weeks.do(job) # Run job every minute at the 23rd second schedule.every().minute.at(":23").do(job) # Run job every hour at the 42nd minute schedule.every().hour.at(":42").do(job) # Run jobs every 5th hour, 20 minutes and 30 seconds in. # If current time is 02:00, first execution is at 06:20:30 schedule.every(5).hours.at("20:30").do(job) # Run job every day at specific HH:MM and next HH:MM:SS schedule.every().day.at("10:30").do(job) schedule.every().day.at("10:30:42").do(job) schedule.every().day.at("12:42", "Europe/Amsterdam").do(job) # Run job on a specific day of the week schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
Use a decorator to schedule a job¶
Use the @repeat to schedule a function. Pass it an interval using the same syntax as above while omitting the .do() .
from schedule import every, repeat, run_pending import time @repeat(every(10).minutes) def job(): print("I am a scheduled job") while True: run_pending() time.sleep(1)
The @repeat decorator does not work on non-static class methods.
Pass arguments to a job¶
do() passes extra arguments to the job function
import schedule def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') from schedule import every, repeat @repeat(every().second, "World") @repeat(every().day, "Mars") def hello(planet): print("Hello", planet)
Cancel a job¶
To remove a job from the scheduler, use the schedule.cancel_job(job) method
import schedule def some_task(): print('Hello world') job = schedule.every().day.at('22:30').do(some_task) schedule.cancel_job(job)
Run a job once¶
Return schedule.CancelJob from a job to remove it from the scheduler.
import schedule import time def job_that_executes_once(): # Do some work that only needs to happen once. return schedule.CancelJob schedule.every().day.at('22:30').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)
Get all jobs¶
To retrieve all jobs from the scheduler, use schedule.get_jobs()
import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs()
Cancel all jobs¶
To remove all jobs from the scheduler, use schedule.clear()
import schedule def greet(name): print('Hello <>'.format(name)) schedule.every().second.do(greet) schedule.clear()
Get several jobs, filtered by tags¶
You can retrieve a group of jobs from the scheduler, selecting them by a unique identifier.
import schedule def greet(name): print('Hello <>'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend')
Will return a list of every job tagged as friend .
Cancel several jobs, filtered by tags¶
You can cancel the scheduling of a group of jobs selecting them by a unique identifier.
import schedule def greet(name): print('Hello <>'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') schedule.clear('daily-tasks')
Will prevent every job tagged as daily-tasks from running again.
Run a job at random intervals¶
def my_job(): print('Foo') # Run every 5 to 10 seconds. schedule.every(5).to(10).seconds.do(my_job)
every(A).to(B).seconds executes the job function every N seconds such that A
Run a job until a certain time¶
import schedule from datetime import datetime, timedelta, time def job(): print('Boo') # run job until a 18:30 today schedule.every(1).hours.until("18:30").do(job) # run job until a 2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job) # Schedule a job to run for the next 8 hours schedule.every(1).hours.until(timedelta(hours=8)).do(job) # Run my_job until today 11:33:42 schedule.every(1).hours.until(time(11, 33, 42)).do(job) # run job until a specific datetime schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
The until method sets the jobs deadline. The job will not run after the deadline.
Time until the next execution¶
Use schedule.idle_seconds() to get the number of seconds until the next job is scheduled to run. The returned value is negative if the next scheduled jobs was scheduled to run in the past. Returns None if no jobs are scheduled.
import schedule import time def job(): print('Hello') schedule.every(5).seconds.do(job) while 1: n = schedule.idle_seconds() if n is None: # no more jobs break elif n > 0: # sleep exactly the right amount of time time.sleep(n) schedule.run_pending()
Run all jobs now, regardless of their scheduling¶
To run all jobs regardless if they are scheduled to run or not, use schedule.run_all() . Jobs are re-scheduled after finishing, just like they would if they were executed using run_pending() .
import schedule def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # Add the delay_seconds argument to run the jobs with a number # of seconds delay in between. schedule.run_all(delay_seconds=10)