Python 비동기 스케줄링
서론
지속적으로 데이터를 수집하고 학습되는 모델을 관리해야하는 ML 엔지니어라면, 데이터 수집, 전처리, 모델 학습, 추론 등 실행 시간이 몇 초부터 몇 시간에 이르는 작업들을 관리하기 위해 비동기 스키줄링을 구현해야 할 필요가 있습니다. 이 글에서는 Python을 통한 비동기 프로그래밍과 비동기 스케줄링에 대한 개념과 간단한 구현을 진행해봅니다.
I/O 작업 요약
)](Python%20%EB%B9%84%EB%8F%99%EA%B8%B0%20%EC%8A%A4%EC%BC%80%EC%A4%84%EB%A7%81/Untitled.png)
I/O models ([출처](https://black7375.tistory.com/90?fbclid=IwAR0s7vbA0G3tC42sIl0ocq2oAHWELXblgWEyyXsI2zhaNwibMe3UzdPbaLY_aem_AUj7xGIBeJWM1Wa1xUy899IlN_A6numNnfCU95poLpFxylCrkhMPWh4vNPUMVnVd8YU))
I/O 작업(Input/Output 작업)은 데이터를 입력하고 결과를 출력하는 모든 프로세스를 통칭하는 개념이며, 파일 시스템 작업, 네트워크 요청, DB 쿼리 등의 작업을 포함합니다. User space의 프로세스가 I/O 작업을 요청했을 경우, 시스템 호출이 어떤 방식으로 이루어지는지에 대해 blocking/non-blocking, Synchronous/Asynchronous 로 구분해 4가지의 케이스로 구분할 수 있습니다.
- blocking: I/O 작업이 진행되는 동안 다른 작업을 수행할 수 없음
- non-blocking: I/O 작업이 완료되지 않더라도 다음 작업으로 넘어갈 수 있음
- Synchronous: 한 작업이 완료된 후 순차적으로 다음 작업 진행
- Asynchronous: 한 작업이 완료되지 않더라도 다음 작업 진행 가능
비동기 프로그래밍
동기 프로그래밍은 작업을 순차적으로 실행하며, 한 작업이 완료될 때까지 다음 작업이 대기하는 방식입니다. 반면, 비동기 프로그래밍은 여러 작업이 동시에 실행될 수 있으며, 각 작업은 서로의 완료를 기다리지 않습니다. 즉, non-blocking & Asynchronous 영역의 프로그래밍 방식이며, 주로 싱글 스레드 환경에서 사용됩니다(멀티 스레딩과는 다른 개념).
Python에서는 asyncio 모듈을 통해 비동기 프로그래밍을 구현할 수 있습니다. asyncio는 비동기 I/O, 이벤트 루프, 코루틴 및 태스크를 지원하며, async와 await 를 사용해 비동기 함수를 쉽게 정의하고 관리할 수 있습니다.
- 이벤트 루프(Event Loop)
- 프로그램 실행 동안 발생하는 이벤트를 관리하고, 프로그램의 메인 루프를 제어
asyncio.run()함수를 사용해 자동으로 이벤트 루프를 생성하고 시작할 수 있음
- 코루틴(Coroutine)
- 비동기 프로그래밍에서 사용되는 동시 실행 가능한 코드의 단위
- 비동기 작업의 시작점이며, 이벤트 루프에 의해 실행되고 관리됨
async def로 정의되며,await표현식을 사용하여 실행을 일시 중단하고, 이벤트 루프가 다른 작업을 수행할 수 있도록 허용
- 태스크(Task)
- 코루틴을 이벤트 루프에서 실행할 수 있도록 래핑하는 클래스
- 코루틴을 태스크로 스케줄링함으로써, 코루틴이 비동기적으로 실행될 수 있음
asyncio.create_task()함수를 사용하여 코루틴을 태스크로 만듦
비동기 스케줄링
스케줄링은 작업을 특정 시간에 실행되도록 예약하는 것이고, Python에서는 APScheduler 라이브러리를 통해 스케줄링을 구현할 수 있습니다. APScheduler 에서 제공하는 AsyncIOScheduler와 asyncio를 함께 사용하면 비동기 스케줄링을 구현할 수 있습니다. AsyncIOScheduler는 APScheduler의 비동기 스케줄러로, asyncio 이벤트 루프와 함께 작동하도록 설계되어 있습니다.
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio
import time
async def async_task():
print("메인 비동기 작업: ", time.strftime("%Y-%m-%d %H:%M:%S"))
await asyncio.sleep(2)
print("메인 비동기 작업 완료: ", time.strftime("%Y-%m-%d %H:%M:%S"))
async def additional_async_task():
while True:
print("추가 비동기 작업: ", time.strftime("%Y-%m-%d %H:%M:%S"))
await asyncio.sleep(1)
async def main():
scheduler = AsyncIOScheduler()
scheduler.add_job(async_task, 'interval', seconds=5)
scheduler.start()
task = asyncio.create_task(additional_async_task())
try:
await task
except asyncio.CancelledError:
pass
if __name__ == '__main__':
asyncio.run(main())
#### 결과 예시 ####
# 추가 비동기 작업: 2024-03-17 01:37:17
# 추가 비동기 작업: 2024-03-17 01:37:18
# 추가 비동기 작업: 2024-03-17 01:37:19
# 추가 비동기 작업: 2024-03-17 01:37:20
# 추가 비동기 작업: 2024-03-17 01:37:21
# 메인 비동기 작업: 2024-03-17 01:37:22
# 추가 비동기 작업: 2024-03-17 01:37:22
# 추가 비동기 작업: 2024-03-17 01:37:23
# 메인 비동기 작업 완료: 2024-03-17 01:37:24
# 추가 비동기 작업: 2024-03-17 01:37:24
# 추가 비동기 작업: 2024-03-17 01:37:25
# ...
async_task 함수는 2초 동안 비동기적으로 대기하도록 하며, AsyncIOScheduler를 통해 5초 간격으로 실행되도록 했습니다.additional_async_task 함수는 1초 간격으로 작업이 이루어지는 태스크를 모방하도록 했습니다.asyncio의 create_task 메서드를 사용하여 additional_async_task를 반복적으로 실행하는 새로운 비동기 작업을 생성했고, 코드를 실행하면 두 작업이 비동기로 실행되는 것을 확인할 수 있습니다.
References
- https://developer.ibm.com/articles/l-async/
- https://scoutapm.com/blog/async-javascript
- https://tolerblanc.github.io/os/sync-async-blocking-nonblocking/
- https://black7375.tistory.com/90?fbclid=IwAR0s7vbA0G3tC42sIl0ocq2oAHWELXblgWEyyXsI2zhaNwibMe3UzdPbaLY_aem_AUj7xGIBeJWM1Wa1xUy899IlN_A6numNnfCU95poLpFxylCrkhMPWh4vNPUMVnVd8YU
- https://www.bytesizeponderings.com/asynchronous-programming-1
- ChatGPT