Python 비동기 스케줄링

서론

지속적으로 데이터를 수집하고 학습되는 모델을 관리해야하는 ML 엔지니어라면, 데이터 수집, 전처리, 모델 학습, 추론 등 실행 시간이 몇 초부터 몇 시간에 이르는 작업들을 관리하기 위해 비동기 스키줄링을 구현해야 할 필요가 있습니다. 이 글에서는 Python을 통한 비동기 프로그래밍과 비동기 스케줄링에 대한 개념과 간단한 구현을 진행해봅니다.

I/O 작업 요약

 I/O models ([출처](https://black7375.tistory.com/90?fbclid=IwAR0s7vbA0G3tC42sIl0ocq2oAHWELXblgWEyyXsI2zhaNwibMe3UzdPbaLY_aem_AUj7xGIBeJWM1Wa1xUy899IlN_A6numNnfCU95poLpFxylCrkhMPWh4vNPUMVnVd8YU))

                                                                                        I/O models ([출처](https://black7375.tistory.com/90?fbclid=IwAR0s7vbA0G3tC42sIl0ocq2oAHWELXblgWEyyXsI2zhaNwibMe3UzdPbaLY_aem_AUj7xGIBeJWM1Wa1xUy899IlN_A6numNnfCU95poLpFxylCrkhMPWh4vNPUMVnVd8YU))

I/O 작업(Input/Output 작업)은 데이터를 입력하고 결과를 출력하는 모든 프로세스를 통칭하는 개념이며, 파일 시스템 작업, 네트워크 요청, DB 쿼리 등의 작업을 포함합니다. User space의 프로세스가 I/O 작업을 요청했을 경우, 시스템 호출이 어떤 방식으로 이루어지는지에 대해 blocking/non-blocking, Synchronous/Asynchronous 로 구분해 4가지의 케이스로 구분할 수 있습니다.

  • blocking: I/O 작업이 진행되는 동안 다른 작업을 수행할 수 없음
  • non-blocking: I/O 작업이 완료되지 않더라도 다음 작업으로 넘어갈 수 있음
  • Synchronous: 한 작업이 완료된 후 순차적으로 다음 작업 진행
  • Asynchronous: 한 작업이 완료되지 않더라도 다음 작업 진행 가능

비동기 프로그래밍

동기 프로그래밍은 작업을 순차적으로 실행하며, 한 작업이 완료될 때까지 다음 작업이 대기하는 방식입니다. 반면, 비동기 프로그래밍은 여러 작업이 동시에 실행될 수 있으며, 각 작업은 서로의 완료를 기다리지 않습니다. 즉, non-blocking & Asynchronous 영역의 프로그래밍 방식이며, 주로 싱글 스레드 환경에서 사용됩니다(멀티 스레딩과는 다른 개념).

Python에서는 asyncio 모듈을 통해 비동기 프로그래밍을 구현할 수 있습니다. asyncio는 비동기 I/O, 이벤트 루프, 코루틴 및 태스크를 지원하며, asyncawait 를 사용해 비동기 함수를 쉽게 정의하고 관리할 수 있습니다.

  • 이벤트 루프(Event Loop)
    • 프로그램 실행 동안 발생하는 이벤트를 관리하고, 프로그램의 메인 루프를 제어
    • asyncio.run() 함수를 사용해 자동으로 이벤트 루프를 생성하고 시작할 수 있음
  • 코루틴(Coroutine)
    • 비동기 프로그래밍에서 사용되는 동시 실행 가능한 코드의 단위
    • 비동기 작업의 시작점이며, 이벤트 루프에 의해 실행되고 관리됨
    • async def로 정의되며, await 표현식을 사용하여 실행을 일시 중단하고, 이벤트 루프가 다른 작업을 수행할 수 있도록 허용
  • 태스크(Task)
    • 코루틴을 이벤트 루프에서 실행할 수 있도록 래핑하는 클래스
    • 코루틴을 태스크로 스케줄링함으로써, 코루틴이 비동기적으로 실행될 수 있음
    • asyncio.create_task() 함수를 사용하여 코루틴을 태스크로 만듦

비동기 스케줄링

스케줄링은 작업을 특정 시간에 실행되도록 예약하는 것이고, Python에서는 APScheduler 라이브러리를 통해 스케줄링을 구현할 수 있습니다. APScheduler 에서 제공하는 AsyncIOSchedulerasyncio를 함께 사용하면 비동기 스케줄링을 구현할 수 있습니다. AsyncIOScheduler는 APScheduler의 비동기 스케줄러로, asyncio 이벤트 루프와 함께 작동하도록 설계되어 있습니다.

from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio
import time

async def async_task():
    print("메인 비동기 작업: ", time.strftime("%Y-%m-%d %H:%M:%S"))
    await asyncio.sleep(2)
    print("메인 비동기 작업 완료: ", time.strftime("%Y-%m-%d %H:%M:%S"))

async def additional_async_task():
    while True: 
        print("추가 비동기 작업: ", time.strftime("%Y-%m-%d %H:%M:%S"))
        await asyncio.sleep(1) 

async def main():
    scheduler = AsyncIOScheduler()
    scheduler.add_job(async_task, 'interval', seconds=5)
    
    scheduler.start()
    
    task = asyncio.create_task(additional_async_task())

    try:
        await task
    except asyncio.CancelledError:
        pass

if __name__ == '__main__':
    asyncio.run(main())

#### 결과 예시 ####

# 추가 비동기 작업:  2024-03-17 01:37:17
# 추가 비동기 작업:  2024-03-17 01:37:18
# 추가 비동기 작업:  2024-03-17 01:37:19
# 추가 비동기 작업:  2024-03-17 01:37:20
# 추가 비동기 작업:  2024-03-17 01:37:21
# 메인 비동기 작업:  2024-03-17 01:37:22
# 추가 비동기 작업:  2024-03-17 01:37:22
# 추가 비동기 작업:  2024-03-17 01:37:23
# 메인 비동기 작업 완료:  2024-03-17 01:37:24
# 추가 비동기 작업:  2024-03-17 01:37:24
# 추가 비동기 작업:  2024-03-17 01:37:25
# ...

async_task 함수는 2초 동안 비동기적으로 대기하도록 하며, AsyncIOScheduler를 통해 5초 간격으로 실행되도록 했습니다.additional_async_task 함수는 1초 간격으로 작업이 이루어지는 태스크를 모방하도록 했습니다.asynciocreate_task 메서드를 사용하여 additional_async_task를 반복적으로 실행하는 새로운 비동기 작업을 생성했고, 코드를 실행하면 두 작업이 비동기로 실행되는 것을 확인할 수 있습니다.

References


© 2024. All rights reserved.

Powered by Hydejack v9.2.1