본문 바로가기
반응형

Side Project/Airflow로 ETL 구축하기5

[Airflow] Airflow 서버에서 S3 접근하기 (S3Hook) python으로 S3Hook을 사용하여 S3에 접근해보자. 그 전에 Airflow 서버에 AWS 인증설정이 되어 있어야 한다. (참고) 그리고 Airflow 서버에서(EC2 사용) S3에 접근권한을 설정해야 한다. IAM에 역할 설정 (EC2 to S3 Access) EC2에 해당 역할 적용 from airflow.hooks.S3_hook import S3Hook bucket = '버켓이름' s3_hook = S3Hook(bucket) #S3 디렉토리 리스팅 keys = s3_hook.list_keys(bucket_name=bucket) # s3에 업로드 s3_key = 'sunnydir/sunnytest.csv' s3_hook.load_file(filename='/var/lib/airflow/test/.. 2021. 3. 13.
[Airflow] task에서 return한 값 사용하기 (XCom) 이전 글에서 task 1개로 이루어진 DAG를 실행해봤다. 이 task는 extract라는 파이썬 함수를 호출한다. 여기에다가 task 1개를 추가해보자. 이 task는 transform이라는 파이썬 함수를 호출한다. transform은 extract함수가 return한 값을 처리하는 함수이다. 이런 경우, Airflow DAG를 어떻게 짜야할까? Airflow XCom를 사용하면 된다. Task간 데이터를 주고 받을때 사용됨 Airflow 웹UI에서 task instance 로그에서도 XCom으로 어떤 데이터를 주고받는지 확인 가능 python task의 경우, python_callable로 호출한 함수에서 return하는 값이 있을 경우 자동으로 xcom_push() 가 실행됨. 즉 return 값이.. 2021. 2. 19.
[Airflow] python 함수 호출시 argument 넘기기 간단한 Airflow DAG를 살펴보자. extract라는 파이썬 함수를 호출하는 task 1개로 이루어져 있다. extract 함수를 호출할 때 argument를 같이 전달해야 한다. 그러면 Airflow DAG에서 python 함수를 호출할 때 어떻게 argument를 전달할까? 1) context 사용하는 방법 DAG를 먼저 작성하자. (/var/lib/airflow/dags/params_test.py) ############## #DAG Setting ############## from airflow import DAG from airflow.operators import PythonOperator from datetime import datetime dag = DAG( dag_id = "sunn.. 2021. 2. 19.
Airflow 스케쥴시간 설정 (execution_date, start_date) Airflow 스케쥴링 컨셉 일배치면 하루 전 기준으로 돌고, 시간배치면 시간 전 기준으로 도는 컨셉이다. 하루에 한 번 도는 스케쥴 (0 15 * * * *) 일 때 2021-03-08 15:00 에 2021-03-07 15:00 기준으로 실행된다. 2021-03-09 15:00 에 2021-03-08 15:00 기준으로 실행된다. ... 매 1시간마다 도는 스케쥴 (* */1 * * * *) 일 때 2021-03-08 15:00에 2021-03-08 14:00 기준으로 실행된다. 2021-03-08 16:00에 2021-03-08 15:00 기준으로 실행된다. ... 그럼 DAG를 보면서 Airflow 스케쥴 시간을 좀 더 자세히 알아보자. 예제 DAG from airflow import DAG fr.. 2021. 1. 31.
Airflow 설치 및 DAG 실행하기 테스트환경 ec2 서버 (ubuntu) airflow 싱글노드로 구성 Airflow 설치하기 1) ec2 서버 접속 pem 파일 다운로드 Windows Power shell 실행 pem 파일 있는 디렉토리로 이동 cd C:\Users\GRAM14\Desktop\study\programmers_de ssh 접속 ssh -i ubuntu@ 2) python 업데이트 및 설치 sudo apt-get update sudo apt-get install -y python3-pip 3) airflow 및 python 모듈 설치 sudo pip3 install apache-airflow==1.10.13 sudo pip3 install cryptography psycopg2-binary boto3 botocore 4) .. 2021. 1. 30.
반응형