본문 바로가기
Side Project/Airflow로 ETL 구축하기

[Airflow] python 함수 호출시 argument 넘기기

by 잇서니 2021. 2. 19.
반응형

 

 

간단한 Airflow DAG를 살펴보자.

extract라는 파이썬 함수를 호출하는 task  1개로 이루어져 있다.

 

extract 함수를 호출할 때 argument를 같이 전달해야 한다.

그러면 Airflow DAG에서 python 함수를 호출할 때 어떻게 argument를 전달할까? 

 

 

 

1) context 사용하는 방법

 

DAG를 먼저 작성하자. (/var/lib/airflow/dags/params_test.py)

##############
#DAG Setting
##############
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime

dag = DAG(
        dag_id = "sunnytest",
        start_date = datetime(2021,1,31),
        schedule_interval = '@once'
    )




#############
#Python code
#############


import requests
import logging

# csv파일을 str로 저장
def extract(**context):
	url = context["params"]["url"]
    logging.info(url)
    f = requests.get(url)
    return (f.text)



####################
# Dag Task Setting
####################

exec_extract = PythonOperator(
        task_id = 'exec_extract',
        python_callable = extract,       
        params={'url': 'https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv'},
        provide_context=True,
        dag = dag
        )
        
        
  • params 옵션의 값은 key, value 형태이다.
  • provide_context = True 옵션도 같이 설정해주어야 한다.
  • extract 파이썬 함수에서는 context["params"]["url"] 로 DAG에서 전달한 매개변수 값을 사용할 수 있다.

 

그 다음 DAG를 실행해본다.

 

터미널에서 실행하거나

#airflow run <dag_id> <task_id> <execution_date>

airflow run sunnytest exec_extract 2021-02-18

 

Airflow 웹에서 실행한다.

 

정상적으로 수행된 것을 확인할 수 있다.

task log도 확인해보자.

  • 매개변수로 전달한 url 값을 로깅하도록 함수를 짜놨는데 logging.info(url)
  • url 값이 제대로 로그에 남는 걸 확인할 수 있다.

 

 

2) op_kwargs 옵션 사용하는 방법

##############
#DAG Setting
##############
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime

dag = DAG(
        dag_id = "sunnytest",
        start_date = datetime(2021,1,31),
        schedule_interval = '@once'
    )




#############
#Python code
#############


import requests

# csv파일을 str로 저장
def extract(url):
    f = requests.get(url)
    return (f.text)



####################
# Dag Task Setting
####################

exec_extract = PythonOperator(
        task_id = 'exec_extract',
        python_callable = extract,
        op_kwargs={'url': 'https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv'},
        dag = dag
        )
        
        
  • op_kwargs 옵션의 값은 key, value 형태이다.
  • extract 파이썬 함수에서는 python함수에서 파라미터 사용하듯이 사용하면 된다.

 

참고링크

 

[나름 중급 파이썬1] *args와 **kwargs

항상 헷갈리는 두 가지 다시 한번 살펴보자 | 이 글은 파이썬의 문법을 모르면 이해하기 어렵습니다. python의 함수 작성 요령, 인자(argument)와 파라미터를 이해한다면 도움이 되는 내용입니다. 아

brunch.co.kr

*args

- argument가 여러 개 전달할 때 사용함. 튜플 형태로 들어옴.

**kwargs

- argument를 key, value 형태로 전달할 때 사용함. 딕셔너리 형태로 들어옴. 

주의점

- 파이썬은 argument가 몇 개 들어올지 모른다. 함수 parameter 작성시, 변수->*args->**kwargs 순서로 작성해야 한다.

반응형

댓글