Airflow 스케쥴링 컨셉
일배치면 하루 전 기준으로 돌고, 시간배치면 시간 전 기준으로 도는 컨셉이다.
하루에 한 번 도는 스케쥴 (0 15 * * * *) 일 때
2021-03-08 15:00 에 2021-03-07 15:00 기준으로 실행된다.
2021-03-09 15:00 에 2021-03-08 15:00 기준으로 실행된다.
...
매 1시간마다 도는 스케쥴 (* */1 * * * *) 일 때
2021-03-08 15:00에 2021-03-08 14:00 기준으로 실행된다.
2021-03-08 16:00에 2021-03-08 15:00 기준으로 실행된다.
...
그럼 DAG를 보면서 Airflow 스케쥴 시간을 좀 더 자세히 알아보자.
예제 DAG
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
#DAG 설정
dag = DAG(
dag_id = 'my_first_dag_2',
start_date = datetime(2021,3,7),
schedule_interval = '*/10 * * * *'
)
#DAG Task 작성
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag
)
#print_goodbye = PythonOperator(
# task_id = 'print_goodbye',
# python_callable = print_goodbye,
# dag = dag
# )
#Assign the order of the tasks in our DAG
#print_hello >> print_goodbye
print_hello
- 현재시간 : 2021-03-08 08:34 (UTC)
- start_date : 2021-03-07
- schedule_interval : '*/10 * * * *' (매 10분마다 실행)
1) start_date
DAG가 시작되는 기준 시점이다. 고정값이다. (start_date 날짜에 실행된다는 의미가 아니다.)
start_date가 2021-03-07이면 DAG는 2021-03-07 00:00 기준으로 시작되는 것으로 스케쥴링 된다. 그리고 매 10분 기준마다 돌 것이다. 2021-03-07 00:10, 2021-03-07 00:20, ...
그러면 실제 DAG가 도는 시간을 생각해보자. 위에서 정리했듯이 Airflow 스케쥴링 컨셉은 일배치면 하루 전 기준으로 돌고, 시간배치면 시간 전 기준으로 돌고, 분배치면 분 전 기준으로 돈다. 그러니 2021-03-07 00:10 에 2021-03-07 00:00 기준으로 돈다. 그 다음 2021-03-07 00:20이 되면 2021-03-07 00:10 기준으로 돈다 ...
참고
현재 시간이 start_date보다 이전이면 DAG가 실행되지 않는다. start_date가 2021-03-09이면 DAG는 2021-03-09 00:00 기준으로 도는 것으로 스케쥴링 된다. 그리고 매 10분 기준마다 돌 것이다. 00:10, 00:20, ... 근데 현재 시간 2021-03-08 이라면? 스케쥴링 된 시간보다 이전이다. 그러면 DAG 안 돈다. |
매일 1번씩 실행되는 DAG를 생각해보자. |
DAG 코드의 start_date와 Airflow task 로그에서 start_date는 의미가 다르다.
전자는 DAG가 처음 시작하는 기준 날짜 (고정값)
후자는 task가 실제 실행되는 날짜
2) execution_date
- 실제 실행날짜가 아님. 로지컬한 시간이라고 봐야 한다.
- 일종의 주문번호임. DAG 실행될 때마다 바뀐다.
- 타임존은 UTC 기준이다
위의 DAG 예제를 따르면 execution_date는 2021-03-07 00:00 부터 ~ 2021-03-08 08:20 까지 바뀌면서 DAG가 여러개 돌 것이다. (현재시간 2021-03-08 08:34)
현재시간이 2021-03-08 08:40이 되면 execution_date가 2021-03-08 08:30인 DAG가 돈다.
이런식으로 10분마다 돈다.
start_date가 2021-03-08 이었다면? 2021-03-08 00:00 부터 ~ 2021-03-08 08:20 까지 바뀌면서 DAG가 돌 것이다.
- Airflow DAG 실행시 세팅할 수 있는 유일한 parameter이다.
- context variable을(ds,yesterday_ds, tomorrow_ds 등) execution_date로 사용할 수 있다.
- run, test, backfill, trigger 등 CLI 환경에서는 execution_date를 반드시 명시해야 한다.
- 스케쥴러로 DAG를 실행할 때는 execution_date가 자동으로 들어간다.
3) 주의점 (타임존이 달라서 날짜가 넘어가는 경우)
현재 날짜는 2021-03-08 16:00 이고, start_date는 2021-03-07 이라고 하자.
매일 15시 31분(UTC)에 DAG가 돌도록 세팅했다고 하자. schedule_interval="31 15 * * *" (daily 작업)
그럼 2021-03-07 15:31 기준부터 DAG가 실행될 것이다.
그다음 현재 날짜가 2021-03-09 15:31이 되면 2021-03-08 15:31 기준으로 DAG가 돌 것이다.
아무튼 2021-03-07 15:31 기준으로 DAG가 실행되는 케이스를 생각해보자.
Airflow의 컨셉상 2021-03-08 15:31 에 DAG가 실제 실행된다.
그럼 한국시간으로는 2021-03-09 00:31분이다. 즉 자정이 넘어서 하루가 지나간 날짜이다.
- 실제 실행된 시간: "2021-03-08 15:31" (=한국 시간 2021/03/09 00:31)
- execution_date: "2021-03-07 15:31" (= ds = 주문번호)
2021-03-09 에 2021-03-07 일자 작업을 하게 된 것이다.
우리가 원하는 동작은 2021-03-09에 2021-03-08 일자 작업을 하는 것이다. 그러니 한국시간으로 자정이 넘어가서 날짜가 바뀌는 경우에는 execution_date를 하루 빠르게 설정해준다. execution_date를 tomorrow_ds로 설정하여 사용하면 된다. ( execution_date가 2021-03-08로 잡힘)
-----------------
참고링크
'Side Project > Airflow로 ETL 구축하기' 카테고리의 다른 글
[Airflow] Airflow 서버에서 S3 접근하기 (S3Hook) (4) | 2021.03.13 |
---|---|
[Airflow] task에서 return한 값 사용하기 (XCom) (6) | 2021.02.19 |
[Airflow] python 함수 호출시 argument 넘기기 (4) | 2021.02.19 |
Airflow 설치 및 DAG 실행하기 (4) | 2021.01.30 |
댓글