Local AirFlow 설치하기

     

    Airflow Local 설치 (OSX)

    https://airflow.apache.org/

    예전에는 실행하고자 하는 job들을 crontab으로만 구성하였으나, 이제는 rundeck, oozie, airflow 등 많은 툴들이 사용되고 있다.
    이와 같은 툴들은 각 job들의 실행 순서를 정의하고, 언제 실행될지, 실행시간이 얼마나 걸리는지 등에 대한 정보도 제공하기 때문에 유지보수나 관리, 성능개선 트래킹 관점에서도 crontab과는 비교할 수 없을만큼 유용하다.

    AirFlow의 공식 사이트는 https://airflow.apache.org/ 이다.
    다른 어떤 문서보다 공식사이트내 설치, 설정 활용 등에 대한 가이드를 먼저 참고하는 것이 좋다.

    pip로 airflow를 설치할 것이므로, 다음과 같이 pip를 설치한다.

     

    예전 설치 방식

    sudo easy_install pip

    설치시 문제가 발생하여, pip를 하위 버전으로 다운그레이드하고,

    pip install --pip==9.0.3
    pip install airflow

    와 같이 설치를 진행하였다. (당시 별 문제 없었음)

     

    반응형

     

    최근 설치 방식

    그런데, 2018/12/04 기준으로 위와 같이 진행하니 다른 에러가 발생하여, 아래와 같이 변경 설치하였다. 설치 환경에 따라 다를 수 있으므로, 에러가 나는 경우에만 관련 내용을 참고하면 될 듯 하다.

    sudo pip install apache-airflow

    만약, 다음과 같은 에러가 뜬다면

    File "/private/tmp/pip-build-pq6f_5/apache-airflow/setup.py", line 49, in verify_gpl_dependency
    
     raise RuntimeError("By default one of Airflow's dependencies installs a GPL "
    
     RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE

    맨 마지막 줄에 뜨는 메시지를 반영하여, 다음과 같이 실행한다.

    sudo SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow

    또, 만약 dateutil과 관련하여 다음과 같은 에러가 뜬다면, (예전엔 못봤던 에러)

     Found existing installation: python-dateutil 1.5
        Uninstalling python-dateutil-1.5:
    Could not install packages due to an EnvironmentError:

    최종적으로 다음과 같이 실행하여 airflow 설치를 완료하였다.

     sudo SLUGIFY_USES_TEXT_UNIDECODE=yes pip install apache-airflow --ignore-installed python-dateutil

     

     

    Airflow 설정 DB 초기화

    설치 후, 1회만 해주면 되는 job으로 airflow가 내부 참조하는 sqlite db 초기화 작업이 있다.

    airflow initdb

     

    Airflow Web Server 실행

    airflow webserver -p {포트}

    웹 서버를 띄우면, 위와 같이 여러 샘플 workflow들이 포함되어 있다.

     

    Example

    제일 위의 example_bash_operator를 살펴보면, 세부 내역을 확인할 수 있다.

    각 job의 실행 순서가 다소 헷갈릴 수 있는데, leaf 부터 root로 올라가는 실행 구조이다.
    이는 graph view에서 보면 좀더 직관적으로 표현된다. (화살표가 보여서)

    DAG(Directed Acyclic Graph)는 말 그대로, 방향성이 있으면서 사이클이 없는 그래프 즉, 일방통행 그래프라 할 수 있다. 임의의 노드를 클릭하면, context menu가 뜨는데 해당 노드별 세부 내용 조회 및 지시를 할 수 있다.

    상단의 코드 탭을 클릭하면, 이 DAG의 소스를 볼 수 있다.

    # -*- coding: utf-8 -*-
    #
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements.  See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership.  The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License.  You may obtain a copy of the License at
    #
    #   http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied.  See the License for the
    # specific language governing permissions and limitations
    # under the License.
    
    from builtins import range
    from datetime import timedelta
    
    import airflow
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator
    
    args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
    }
    
    dag = DAG(
        dag_id='example_bash_operator',
        default_args=args,
        schedule_interval='0 0 * * *',
        dagrun_timeout=timedelta(minutes=60),
    )
    
    run_this_last = DummyOperator(
        task_id='run_this_last',
        dag=dag,
    )
    
    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id='run_after_loop',
        bash_command='echo 1',
        dag=dag,
    )
    # [END howto_operator_bash]
    
    run_this >> run_this_last
    
    for i in range(3):
        task = BashOperator(
            task_id='runme_' + str(i),
            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
            dag=dag,
        )
        task >> run_this
    
    # [START howto_operator_bash_template]
    also_run_this = BashOperator(
        task_id='also_run_this',
        bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
        dag=dag,
    )
    # [END howto_operator_bash_template]
    also_run_this >> run_this_last
    
    if __name__ == "__main__":
        dag.cli()

     

     


     

    Workflow 실행순서 지정

    위의 tree view 또는 graph view에 표현된 바와 같이, job의 실행 순서를 지정하는 부분이
    task >> run_this와 같이 표기된 부분이다.

    Airflow 공식 가이드에도 소개된 바와 같이 두 가지의 표현방식을 사용할 수 있다.

     

    1. using set_upstream(), set_downstream() methods

    op1.set_downstream(op2)
    op2.set_downstream(op3)
    op3.set_upstream(op4)

     

    2. using bitwise operators

    op1 >> op2 >> op3 << op4

    두 표현식 모두 실행순서는 동일하다.
    op1을 먼저 실행하고, op2, op3의 순서대로 실행된다.

     

    Scheduler 실행

    DAG의 실행주기를 설정한 후, scheduler daemon을 띄워서 주기적으로 실행되도록 한다.

    airflow scheduler

    댓글

    Designed by JB FACTORY