Airflow 시작하기

데이터 추출, 가공, 저장, 분석 파이프라인을 자동화하기 위해 위해 Airflow를 빠르게 배워보고자 강의를 시청했다. 강의는 Windows OS 기준으로 진행되었지만, 본 글은 Mac OS를 사용해 진행했다. 3강까지 진행하면서, 필요한 개념과 몇몇 이슈들을 해결한 과정을 함께 정리한다.

1. 환경설정

  • Prerequisite:
    • docker (airflow를 docker로 설치하는 방법으로 진행)
    • docker compose
    • vscode
  • 강의는 windows를 기준으로 진행하지만, 본 문서에서는 Mac 기준 진행함
  • 작업 폴더 내에 Dockerfile을 아래와 같이 작성한 후 이미지 빌드
    • 커맨드: docker build -t airflow-tutorial .
FROM apache/airflow:latest-python3.12
USER root

RUN apt-get update && \\
apt-get -y install git && \\
apt-get clean

USER airflow

ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages"
ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"

  • docker-compose.yaml 을 아래와 같이 작성하고 compose up
    • 커맨드: docker compose -f "docker-compose.yaml" up -d --build
version: '3'

services:
	airflow_tutorial:
		image: airflow-tutorial:latest
		volumes:
			- ./airflow:/opt/airflow
		ports:
			- "8080:8080"
		command: airflow standalone

  • http://localhost:8080/ 로 접속해 로그인
    • id: admin
    • pass: standalone_admin_password.txt 파일에 적혀있음
  • 종료: docker compose down

2. DAG 기초

  • alrflow/dags/welcome_dag.py 파일 생성 후 아래와 같이 작성
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import datetime
import requests

def print_welcome():
	print('Welcome to Airflow!')

def print_date():
	print('Today is {}'.format(datetime.today().date()))

def print_random_quote():
	response = requests.get('<https://api.quotable.io/random>')
	quote = response.json()['content']
	print('Quote of the day: "{}"'.format(quote))

dag = DAG(
	'welcome_dag',
	default_args={'start_date': days_ago(1)},
	schedule_interval='0 23 * * *', # 매일 오후 11시에 DAG 실행
	catchup=False
	)

print_welcome_task = PythonOperator(
	task_id='print_welcome',
	python_callable=print_welcome,
	dag=dag
	)

print_date_task = PythonOperator(
	task_id='print_date',
	python_callable=print_date,
	dag=dag
	)

print_random_quote = PythonOperator(
	task_id='print_random_quote',
	python_callable=print_random_quote,
	dag=dag
	)

# Set the dependencies between the tasks
print_welcome_task >> print_date_task >> print_random_quote

  • Airflow가 실행되어있는 상태애서 dag를 추가하고 조금 시간이 지나면 UI에서 DAG 확인 가능
    • UI 상에서 직접 트리거 가능
    • UI의 Logs 에서 결과 확인

3. Provider VS Operator VS Hooks

  • Coverage: Provider ⊃ Operator, Hooks
  • Provider: 외부 시스템과의 연동을 가능하게 하는 플러그인 형태의 모듈
  • Operator: DAGs 내에서 개별 작업을 정의하고, 재사용 가능한 작업 단위를 제공
    • Action: 특정 작업을 실행
      • ex) PythonOperator, BashOperator 등
    • Transfer: 데이터를 옮김
      • ex) S3ToRedshiftOperator, LocalFilesystemToGCSOperator, S3ToGCSOperator 등
    • Sensor: 특정 조건이 충족될 때까지 대기
      • ex) S3KeySensor, RedshiftClusterSensor 등
    • 참고: Operator는 Blueprint (Python의 Class), Task는 Implementation (Python의 Object)에 해당
    • Operator 사용 예시
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id='bash_operator_dag', start_date=datetime (2023, 8, 1),
schedule_interval="@daily") as dag:
	task1 = BashOperator (
		task_id='command_example',
		bash_command='echo "Airflow is running!"*
	)

	task2 = BashOperator(
		task_id='execute_script',
		bash_command='/path/to/your/script.sh',
		env={'ENV_VAR': 'value'}
	)

	first_task >> second_task

  • Hook: 외부 시스템과의 인터페이스를 추상화하는 클래스
    • Hook 사용 예시
from airflow.providers.postgres.hooks.postgres import PostgresHook

pg_hook = PostgresHook(conn_id='my_postgres_connection')
result = pg_hook.get_records 'SELECT * FROM table')

4. Task, DAGs, airflow.cfg

  • airflow.cfg 파일에서 airflow 운용을 위한 전반적인 설정을 커스텀할 수 있음
    • 예를 들어 smtp 서버나 dags folder 같은 것들
  • tasks 간 dependency를 만드는 방법 4 가지
# 1. Bitshift operations (>>, <<)
task1 >> task2 >> task3

# 2. set-upstream and set-downstream function
task1.set_upstream(task2)
task3.set_downstream(task2)

# 3. Chain funetion:
chain(task1, task2, task3)

# 4. TaskFlow API: dependencies are automatically inferred based on the sequence of task function calls)
task1()
task2()
task3()

  • 첫 번째 예제: dags/exchange_rate_pipeline.py 파일 작성 (여러 Operator 활용해보는 목적)
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from hooks.clean_data import clean_data

# Define or Instantiate DAG
dag = DAG(
	'exchange_rate_etl',
	start_date=datetime(2023, 10, 1),
	end_date=datetime(2023, 12, 31),
	schedule_interval='0 22 * * *',
	default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
	catchup=False
	)

# Define or Instantiate Tasks
download_task = BashOperator(
	task_id='download_file',
	bash_command='curl -o xrate.csv <https://data-api.ecb.europa.eu/service/data/EXR/M.USD.EUR.SP00.A?format=csvdata>',
	cwd='/tmp',
	dag=dag,
	)

clean_data_task = PythonOperator(
	task_id='clean_data',
	python_callable=clean_data,
	dag=dag,
	)

send_email_task = EmailOperator(
	task_id='send_email',
	to='sleekdatasolutions@gmail.com',
	subject='Exchange Rate Download - Successful',
	html_content='The Exchange Rate data has been successfully downloaded, cleaned, and loaded.',
	dag=dag,
	)

# Define Task Dependencies
download_task >> clean_data_task >> send_email_task

  • airflow/plugins/hooks/clean_data.py 파일 작성
import os
import pandas as pd
from airflow.hooks.base_hook import BaseHook

class CleanDataHook(BaseHook):

	def __init__(self, path='/tmp/xrate.csv'):
		self.path = path

	def clean_data(self):
		data = pd.read_csv(self.path, header=None)
		default_values = {
			int: 0,
			float: 0.0,
			str: '',
		}
		cleaned_data = data.fillna(value=default_values)

		now = pd.Timestamp.now()
		year = now.year
		month = now.month
		day = now.day

		data_dir = f'/opt/airflow/data/xrate_cleansed/{year}/{month}/{day}'
		os.makedirs(data_dir, exist_ok=True)

		cleaned_data.to_csv(f'{data_dir}/xrate.csv', index=False)

FROM apache/airflow:latest-python3.12
USER root

RUN apt-get update && \\
apt-get -y install git && \\
apt-get clean

USER airflow

COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt

ENV PYTHONPATH="${PYTHONPATH}:/home/airflow/.local/lib/python3.12/site-packages"
ENV PYTHONPATH="${PYTHONPATH}:/opt/airflow/plugins"

  • docker-compose.yaml 수정
version: '3'

services:
	airflow_tutorial:
		image: airflow-tutorial:latest
		volumes:
			- ./airflow:/opt/airflow
		ports:
			- "8080:8080"
		command: airflow standalone
	my_smtp:
		image: ixdotai/smtp
		restart: always

  • airflow.cfg 수정
...
smtp_host = my_smtp
...

  • requirements.txt 작성
pandas

  • 강의의 bytemark/smtp 이미지는 ARM 아키텍처 기반 운영체제에서 동작하지 않으니 수정하고 진행
  • smtp가 포함된 DAG를 실행시킨 후 docker compose down 시 리소스를 계속 사용 중이라는 메시지
    • compose up 시 다음과 같이 옵션 주기: docker compose -f "docker-compose.yaml" up -d --build --remove-orphans
  • 두 번째 예제: git_repo_dag.py 파일 작성 (connection 설정을 위한 예제)
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.github.operators.github import GithubOperator
from airflow.operators.dummy import DummyOperator
import logging

# Define the DAG
dag = DAG(
	'git_repo_dag',
	default_args={'start_date': days_ago(1)},
	schedule_interval='0 21 * * *',
	catchup=False
	)

# Start Dummy Operator
start = DummyOperator(task_id='start', dag=dag)

# List GitRepository Tags
list_repo_tags = GithubOperator(
	task_id="list_repo_tags",
	github_method="get_repo",
	github_method_args={"full_name_or_id": "ycseong07/minimal-streamlit-lecture"},
	result_processor=lambda repo: logging.info(list(repo.get_tags())),
	dag=dag,
	)

# End Dummy Operator
end = DummyOperator(task_id='end', dag=dag)

# Define task dependencies
start >> list_repo_tags >> end

  • requirements.txt 수정
pandas
apache-airflow-providers-github

  • UI 상에서 Admin - Connections 에서 아래와 같이 connection 추가
    • Connection Id: github_default
    • Connection Type: Github
    • GitHub Access Token: xxxxx

이슈 리스트

  • bytemark/smtp 이미지가 Mac OS(ARM)에서 동작하지 않음
    • ixdotai/smtp 이미지로 대체
  • plugins/clean_data.py 를 import해오지 못하는 문제
  • 이메일이 전송되지 않음. bytemark/smtp 이미지와 ixdotai/smtp 이미지의 동작방식이 다른 것 같은데, 이건 따로 구글 이메일로 발송 시키는 방법에 대한 자료가 많아 우선 패스함

References


© 2024. All rights reserved.

Powered by Hydejack v9.2.1