본문 바로가기
Side Project/Airflow로 ETL 구축하기

[Airflow] task에서 return한 값 사용하기 (XCom)

by 잇서니 2021. 2. 19.
반응형

 

 

이전 글에서 task 1개로 이루어진 DAG를 실행해봤다. 이 task는 extract라는 파이썬 함수를 호출한다.

여기에다가 task 1개를 추가해보자. 이 task는 transform이라는 파이썬 함수를 호출한다.

transform은 extract함수가 return한 값을 처리하는 함수이다.

이런 경우, Airflow DAG를 어떻게 짜야할까?

 

Airflow XCom를 사용하면 된다.

  • Task간 데이터를 주고 받을때 사용됨
  • Airflow 웹UI에서 task instance 로그에서도 XCom으로 어떤 데이터를 주고받는지 확인 가능
  • python task의 경우, python_callable로 호출한 함수에서 return하는 값이 있을 경우 자동으로 xcom_push() 가 실행됨. 즉 return 값이 자동으로 xcom에 저장됨.
  • 다음 task에서 이전 task의 return 값을 사용하려면 xcom_pull()을 사용함
  • provide_context=True로 설정해야 함

 

##############
#DAG Setting
##############
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime

dag = DAG(
        dag_id = "xcomtest",
        start_date = datetime(2021,1,31),
        schedule_interval = '@once'
    )




#############
#Python code
#############


import requests
import logging

# csv파일을 str로 저장
def extract(**context):
	url = context["params"]["url"]
    logging.info(url)
    f = requests.get(url)
    return (f.text)

# extract 함수에서 얻어온 text를 xcom_pull로 가져와 처리함
def transform(**context):
    text = context['task_instance'].xcom_pull(task_ids='exec_extract')
    lines = text.split("\n")
    return lines

####################
# Dag Task Setting
####################

exec_extract = PythonOperator(
        task_id = 'exec_extract',
        python_callable = extract,       
        params={'url': 'https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv'},
        provide_context=True,
        dag = dag
        )
        
exec_transform = PythonOperator(
        task_id = 'exec_transform',
        python_callable = transform,
        provide_context=True,
        dag = dag
        )
        
exec_extract >> exec_transform

 

 

exec_extract Task 로그를 확인해보자. (extract 파이썬 함수를 호출하는 task)

  • XCom에 return값(f.text)이 저장되어 있는 것을 확인할 수 있다.
  • 원래는 xcom_push() 함수로 데이터를 XCom에 저장해야 되는데
  • 위에 적어놓은대로 python_callable로 함수를 호출하면, 자동으로 xcom_push()가 호출되어 XCom에 return값이 저장된다.

 

 

반응형

댓글