Local AirFlow 설치하기
- Development/Hadoop, NoSQL, BigData
- 2020. 12. 9.
Airflow Local 설치 (OSX)
예전에는 실행하고자 하는 job들을 crontab으로만 구성하였으나, 이제는 rundeck, oozie, airflow 등 많은 툴들이 사용되고 있다.
이와 같은 툴들은 각 job들의 실행 순서를 정의하고, 언제 실행될지, 실행시간이 얼마나 걸리는지 등에 대한 정보도 제공하기 때문에 유지보수나 관리, 성능개선 트래킹 관점에서도 crontab과는 비교할 수 없을만큼 유용하다.
AirFlow의 공식 사이트는 https://airflow.apache.org/ 이다.
다른 어떤 문서보다 공식사이트내 설치, 설정 활용 등에 대한 가이드를 먼저 참고하는 것이 좋다.
pip로 airflow를 설치할 것이므로, 다음과 같이 pip를 설치한다.
예전 설치 방식
sudo easy_install pip
설치시 문제가 발생하여, pip를 하위 버전으로 다운그레이드하고,
pip install --pip==9.0.3
pip install airflow
와 같이 설치를 진행하였다. (당시 별 문제 없었음)
최근 설치 방식
그런데, 2018/12/04 기준으로 위와 같이 진행하니 다른 에러가 발생하여, 아래와 같이 변경 설치하였다. 설치 환경에 따라 다를 수 있으므로, 에러가 나는 경우에만 관련 내용을 참고하면 될 듯 하다.
sudo pip install apache-airflow
만약, 다음과 같은 에러가 뜬다면
File "/private/tmp/pip-build-pq6f_5/apache-airflow/setup.py", line 49, in verify_gpl_dependency
raise RuntimeError("By default one of Airflow's dependencies installs a GPL "
RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE
맨 마지막 줄에 뜨는 메시지를 반영하여, 다음과 같이 실행한다.
sudo SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow
또, 만약 dateutil과 관련하여 다음과 같은 에러가 뜬다면, (예전엔 못봤던 에러)
Found existing installation: python-dateutil 1.5
Uninstalling python-dateutil-1.5:
Could not install packages due to an EnvironmentError:
최종적으로 다음과 같이 실행하여 airflow 설치를 완료하였다.
sudo SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow --ignore-installed python-dateutil
Airflow 설정 DB 초기화
설치 후, 1회만 해주면 되는 job으로 airflow가 내부 참조하는 sqlite db 초기화 작업이 있다.
airflow initdb
Airflow Web Server 실행
airflow webserver -p {포트}
웹 서버를 띄우면, 위와 같이 여러 샘플 workflow들이 포함되어 있다.
Example
제일 위의 example_bash_operator를 살펴보면, 세부 내역을 확인할 수 있다.
각 job의 실행 순서가 다소 헷갈릴 수 있는데, leaf 부터 root로 올라가는 실행 구조이다.
이는 graph view에서 보면 좀더 직관적으로 표현된다. (화살표가 보여서)
DAG(Directed Acyclic Graph)는 말 그대로, 방향성이 있으면서 사이클이 없는 그래프 즉, 일방통행 그래프라 할 수 있다. 임의의 노드를 클릭하면, context menu가 뜨는데 해당 노드별 세부 내용 조회 및 지시를 할 수 있다.
상단의 코드 탭을 클릭하면, 이 DAG의 소스를 볼 수 있다.
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from builtins import range
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='example_bash_operator',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
run_this_last = DummyOperator(
task_id='run_this_last',
dag=dag,
)
# [START howto_operator_bash]
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
dag=dag,
)
# [END howto_operator_bash]
run_this >> run_this_last
for i in range(3):
task = BashOperator(
task_id='runme_' + str(i),
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
dag=dag,
)
task >> run_this
# [START howto_operator_bash_template]
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag,
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
if __name__ == "__main__":
dag.cli()
Workflow 실행순서 지정
위의 tree view 또는 graph view에 표현된 바와 같이, job의 실행 순서를 지정하는 부분이
task >> run_this와 같이 표기된 부분이다.
Airflow 공식 가이드에도 소개된 바와 같이 두 가지의 표현방식을 사용할 수 있다.
1. using set_upstream(), set_downstream() methods
op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)
2. using bitwise operators
op1 >> op2 >> op3 << op4
두 표현식 모두 실행순서는 동일하다.
op1을 먼저 실행하고, op2, op3의 순서대로 실행된다.
Scheduler 실행
DAG의 실행주기를 설정한 후, scheduler daemon을 띄워서 주기적으로 실행되도록 한다.
airflow scheduler
'Development > Hadoop, NoSQL, BigData' 카테고리의 다른 글
Redis 개요 (0) | 2021.04.28 |
---|---|
Avro 개요 (0) | 2021.04.23 |
Redis 설치 방법 세 가지 (0) | 2021.04.21 |
Docker기반 Spark Cluster 설치하기 (6) | 2020.12.15 |
brew로 local zeppelin 설치하기 (0) | 2020.11.30 |
The Hadoop Distributed File System : Architecture and Design 요약 (0) | 2008.07.29 |
HBase에서 HQL 사용하기 (0) | 2008.07.25 |
HBase 설치/설정하기 (0) | 2008.07.25 |