반응형
간단한 Airflow DAG를 살펴보자.
extract라는 파이썬 함수를 호출하는 task 1개로 이루어져 있다.
extract 함수를 호출할 때 argument를 같이 전달해야 한다.
그러면 Airflow DAG에서 python 함수를 호출할 때 어떻게 argument를 전달할까?
1) context 사용하는 방법
DAG를 먼저 작성하자. (/var/lib/airflow/dags/params_test.py)
##############
#DAG Setting
##############
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = "sunnytest",
start_date = datetime(2021,1,31),
schedule_interval = '@once'
)
#############
#Python code
#############
import requests
import logging
# csv파일을 str로 저장
def extract(**context):
url = context["params"]["url"]
logging.info(url)
f = requests.get(url)
return (f.text)
####################
# Dag Task Setting
####################
exec_extract = PythonOperator(
task_id = 'exec_extract',
python_callable = extract,
params={'url': 'https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv'},
provide_context=True,
dag = dag
)
- params 옵션의 값은 key, value 형태이다.
- provide_context = True 옵션도 같이 설정해주어야 한다.
- extract 파이썬 함수에서는 context["params"]["url"] 로 DAG에서 전달한 매개변수 값을 사용할 수 있다.
그 다음 DAG를 실행해본다.
터미널에서 실행하거나
#airflow run <dag_id> <task_id> <execution_date>
airflow run sunnytest exec_extract 2021-02-18
Airflow 웹에서 실행한다.
정상적으로 수행된 것을 확인할 수 있다.
task log도 확인해보자.
- 매개변수로 전달한 url 값을 로깅하도록 함수를 짜놨는데 logging.info(url)
- url 값이 제대로 로그에 남는 걸 확인할 수 있다.
2) op_kwargs 옵션 사용하는 방법
##############
#DAG Setting
##############
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = "sunnytest",
start_date = datetime(2021,1,31),
schedule_interval = '@once'
)
#############
#Python code
#############
import requests
# csv파일을 str로 저장
def extract(url):
f = requests.get(url)
return (f.text)
####################
# Dag Task Setting
####################
exec_extract = PythonOperator(
task_id = 'exec_extract',
python_callable = extract,
op_kwargs={'url': 'https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv'},
dag = dag
)
- op_kwargs 옵션의 값은 key, value 형태이다.
- extract 파이썬 함수에서는 python함수에서 파라미터 사용하듯이 사용하면 된다.
참고링크
*args
- argument가 여러 개 전달할 때 사용함. 튜플 형태로 들어옴.
**kwargs
- argument를 key, value 형태로 전달할 때 사용함. 딕셔너리 형태로 들어옴.
주의점
- 파이썬은 argument가 몇 개 들어올지 모른다. 함수 parameter 작성시, 변수->*args->**kwargs 순서로 작성해야 한다.
반응형
'Side Project > Airflow로 ETL 구축하기' 카테고리의 다른 글
[Airflow] Airflow 서버에서 S3 접근하기 (S3Hook) (4) | 2021.03.13 |
---|---|
[Airflow] task에서 return한 값 사용하기 (XCom) (6) | 2021.02.19 |
Airflow 스케쥴시간 설정 (execution_date, start_date) (3) | 2021.01.31 |
Airflow 설치 및 DAG 실행하기 (4) | 2021.01.30 |
댓글