반응형
이전 글에서 task 1개로 이루어진 DAG를 실행해봤다. 이 task는 extract라는 파이썬 함수를 호출한다.
여기에다가 task 1개를 추가해보자. 이 task는 transform이라는 파이썬 함수를 호출한다.
transform은 extract함수가 return한 값을 처리하는 함수이다.
이런 경우, Airflow DAG를 어떻게 짜야할까?
Airflow XCom를 사용하면 된다.
- Task간 데이터를 주고 받을때 사용됨
- Airflow 웹UI에서 task instance 로그에서도 XCom으로 어떤 데이터를 주고받는지 확인 가능
- python task의 경우, python_callable로 호출한 함수에서 return하는 값이 있을 경우 자동으로 xcom_push() 가 실행됨. 즉 return 값이 자동으로 xcom에 저장됨.
- 다음 task에서 이전 task의 return 값을 사용하려면 xcom_pull()을 사용함
- provide_context=True로 설정해야 함
##############
#DAG Setting
##############
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = "xcomtest",
start_date = datetime(2021,1,31),
schedule_interval = '@once'
)
#############
#Python code
#############
import requests
import logging
# csv파일을 str로 저장
def extract(**context):
url = context["params"]["url"]
logging.info(url)
f = requests.get(url)
return (f.text)
# extract 함수에서 얻어온 text를 xcom_pull로 가져와 처리함
def transform(**context):
text = context['task_instance'].xcom_pull(task_ids='exec_extract')
lines = text.split("\n")
return lines
####################
# Dag Task Setting
####################
exec_extract = PythonOperator(
task_id = 'exec_extract',
python_callable = extract,
params={'url': 'https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv'},
provide_context=True,
dag = dag
)
exec_transform = PythonOperator(
task_id = 'exec_transform',
python_callable = transform,
provide_context=True,
dag = dag
)
exec_extract >> exec_transform
exec_extract Task 로그를 확인해보자. (extract 파이썬 함수를 호출하는 task)
- XCom에 return값(f.text)이 저장되어 있는 것을 확인할 수 있다.
- 원래는 xcom_push() 함수로 데이터를 XCom에 저장해야 되는데
- 위에 적어놓은대로 python_callable로 함수를 호출하면, 자동으로 xcom_push()가 호출되어 XCom에 return값이 저장된다.
반응형
'Side Project > Airflow로 ETL 구축하기' 카테고리의 다른 글
[Airflow] Airflow 서버에서 S3 접근하기 (S3Hook) (4) | 2021.03.13 |
---|---|
[Airflow] python 함수 호출시 argument 넘기기 (4) | 2021.02.19 |
Airflow 스케쥴시간 설정 (execution_date, start_date) (3) | 2021.01.31 |
Airflow 설치 및 DAG 실행하기 (4) | 2021.01.30 |
댓글