AIP-39: Airflow 'schedule_interval'의 변신, 그리고 'execution_date'의 종말

AIP-39: Airflow 'schedule_interval'의 변신, 그리고 'execution_date'의 종말
Source: Apache Airflow(https://airflow.apache.org/)

Apache Top-level 프로젝트인 Apache Airflow (아파치 에어플로우)는 이제 많은 곳에서 사랑을 받고 있는 오픈소스 워크플로우 오케스트레이션 엔진이 되었습니다.
다른 오케스트레이션 엔진과는 다르게 파이썬으로 쉽게 코드를 작성할 수 있고, 작업 현황을 한눈에 확인할 수 있는 UI가 제공되기 때문에 많은 분들이 활용하고 계신데요, Airflow를 처음 접하시는 분들에게 가장 이해하기 어려운 개념 중 하나가 바로 execution_date입니다.

AIP-39가 드디어 Airflow 2.2에서 배포되었습니다. Airflow 2.2 버전에서 변경된 최근 내용만 보고 싶으시다면 AIP-39, schedule_interval 의 변신 부분부터 봐주시면 됩니다.

모든 문제의 근원 execution_date

execution_date는 그 이름과는 다르게, Task가 실제로 실행되는 시간이 아니라 '해당 Task가 실행하고자 하는 Time Window의 시작 지점' 을 의미합니다. 도대체 무슨 소리냐고요? 제가 쓰면서도 무슨 말인지 모르겠습니다. 이해를 돕기 위해 아주 간단한 예시를 들어보겠습니다.

다음과 같이 2021년 1월 1일부터 daily로 스케줄되는 DAG를 정의해 봅니다. Airflow는 Python 코드를 이용해서 DAG를 작성하기 때문에 유연하고 확장성 있게 작업을 정의할 수 있습니다.

혹시나 Airflow의 개념에 아직 익숙하지 않으신 분들은 Airflow 공식 문서에 기본 개념이 잘 설명되어 있으니 정독해 보시기를 추천드립니다.

# 2021-01-01 부터 하루에 한번 실행되는 DAG
# 빠른 이해를 위해 시간과 관계없는 다른 파라메터들과 타임존은 모두 생략했습니다
DAG(
    dag_id='test_dag',
    # '@daily', '0 0 * * *' or datetime.timedelta(days=1)
    schedule_interval='@daily',
    start_date=datetime.datetime(2021, 1, 1),
    # end_date=datetime.datetime(2021, 12, 31),
    ...
)

이 DAG의 스케줄을 그림으로 표현해 보면 아래와 같습니다.

DAG의 시작시점(start_date)인 2021-01-01 새벽 0시부터 1일 간격(schedule_interval)으로 task가 실행됩니다. 여기까지는 누구나 쉽게 이해할 수 있는 부분이죠. 부푼 마음을 안고 1월 1일 자정까지 기다려 봅니다. 첫번째 작업이 실행됐을까요? 정답은 아니오 입니다.

Airflow를 처음 접하시는 분들이 하는 가장 흔한 실수 중 하나입니다. 실제 첫번째 작업은 2021년 1월 2일 자정에 시작됩니다. 분명 start_date를 1월 1일로 했는데 대체 왜 그런걸까요? 이 규칙을 이해하기 위해서는 먼저 *타임 윈도우(Time Window)*의 개념을 이해해야 합니다.

전통적인 배치 데이터 처리 방식은 작업의 대상이 되는 데이터의 기간이 있고, 그 대상 기간이 지나고 나서 해당 기간에 발생한 데이터를 처리하게 됩니다. 하루에 한번 실행되는 작업이라면 하루가 끝나고 날짜가 바뀔 때 비로소 작업을 실행하게 되는 것이죠.

더 쉬운 예시를 들어 보겠습니다. 쇼핑몰을 운영하고 있다고 해 볼까요. 2021년 1월 3일의 일일 매출을 계산하고 싶다면, 3일이 끝나고 하루가 넘어가는 1월 4일 자정(0시)에 01/03 00:00:00 ~ 01/03 23:59:59에 발생한 매출을 계산해야 합니다. 여기서 작업 대상이 되는 데이터는 1월 3일자 데이터(=execution_date)가 되고, 실제 작업이 실행되는 시간은 1월 4일 0시가 됩니다.

다시 이전에 예시로 돌아와 볼까요. 위의 매출 계산에서 이해한 방식으로 2021년 1월 3일자 Task가 실행되는 방법을 그림으로 표현해 보면 이렇습니다.

즉, execution_date는 해당 Task가 처리해야 할 데이터 기간의 시작 시점인 2021년 1월 3일을 의미합니다. execution_date라는 이름을 직역하면 실행 날짜가 되기 때문에 그 의미와 실제 의미가 달라 자꾸 혼란이 발생하는 것이죠.
이제 start_date2021-01-01임에도 불구하고 1월 1일 자정에 작업이 실행되지 않은 이유가 이해되셨나요?

헷갈리지만 기본적인 개념을 살펴봤습니다. 언뜻 보면 개념만 잘 이해한다면 여러가지 작업들을 스케줄링 하는 데 크게 문제가 되지 않을 것 같은데요, 실제로도 그렇습니다. 그렇다면 이런 방식으로 표현되지 않는 작업은 어떤 것들이 있을까요?

1. 정해진 시점의 데이터를 기록하는 스냅샷(Snapshot)을 찍고 싶어요

스냅샷 데이터는 작업이 실행되는 그 시점의 데이터를 기록하는 것이기 때문에, 일반적으로 정해진 시간에 스냅샷을 찍고 스냅샷을 찍을 당시의 시간으로 기록해 두게 됩니다. 만약 하루에 한번 스냅샷을 기록하는 작업이라면 schedule_interval을 1일로 지정하고, 스냅샷 시간을 작업 당시의 시간 혹은 execution_date + timedelta(days=1)로 기록해야 합니다.

2. 스케줄 간격과 처리해야 하는 데이터의 기간이 달라요

매일 자정에 현재 시점부터 과거 7일치의 데이터를 가지고 계산하는 작업 같은 경우는 어떨까요? 대표적인 예로 사용자가 방문한 후 7일 이내에 다시 방문하는지를 알아보는 D7 리텐션(Retention) 같은 것이 있습니다. (일반적으로 'Rolling window'라고 부릅니다.)
현재 방법으로는 하루에 한번 작업을 실행하기 위해 schedule_interval을 1일로 지정하고, execution_date와 상관없이 과거 7일 데이터를 보도록 해야 합니다. (execution_date - timedelta(days=6))

두 가지 경우만 살펴봤는데도 벌써 execution_date 가 제 역할을 못 하고 있네요. 이 외에도 현재 schedule_interval은 Cron expression과 timedelta만 사용할 수 있기 때문에 '공휴일을 제외한 영업일에만 작업' 과 같이 복잡한 형태의 스케줄을 정의할 수 없다는 문제도 있습니다.

AIP-39, schedule_interval 의 변신

Proposal 문서: Airflow Improvement Proposal: AIP-39 Richer scheduler_interval
배포 버전: Airflow 2.2.0

AIP-39는 현재 투표 중인 Proposal로 그 내용이 아직 확정되지 않았기 때문에 구체적인 구현체의 이름들은 변경될 수 있으며, 최종 확정안에 따라 해당 게시글을 업데이트 할 예정입니다. (현재 투표 상황으로 Proposal 자체는 문제없이 통과 될 예정입니다.)

2021.12.02 AIP-39가 확정되어 드디어 Airflow 2.2에서 배포되었습니다. 관련 내용에 따라 글을 최신으로 업데이트했습니다.

앞서 말씀드린 모든 문제는 스케줄 간격타임 윈도우라는 각기 다른 2가지의 개념이 섞여 있기 때문에 발생했습니다.

지금의 Airflow 구조에서는 시작 시점과 스케줄 간격만을 정할 수 있기 때문에, 작업이 언제 실행될 지를 결정하는 스케줄작업의 대상이 되는 데이터의 Time Window를 분리해서 정의할 수 없습니다. schedule_interval이 스케줄 간격과 타임 윈도우 2가지의 역할을 동시에 하고 있기 때문입니다.

이 문제를 해결하기 위해 PMC 멤버인 Ash Berlin-Taylor와 최근 Airflow 개발에 많이 기여하고 있는 Astronomer의 James Timmins의 주도로 AIP-39 Richer scheduler_interval이 제안되었습니다. AIP-39의 핵심은 바로 작업의 스케줄 간격과 작업의 대상이 되는 타임 윈도우를 분리 하는 것입니다. 지금부터 AIP-39의 내용을 자세히 살펴보겠습니다.

execution_date의 종말

가장 큰 변화는 혼란의 중심에 있는 execution_date을 없애고 좀 더 명확히 타임 윈도우의 개념을 표현하기 위해 data_interval이라는 개념을 도입합니다. 타임 윈도우의 시작은 data_interval_start, 타임 윈도우의 끝은 data_interval_end가 됩니다.

또, 실제로 작업이 실행되는 시점을 표현하기 위해 기존에 오해의 여지가 많았던 변수인 execution_date라는 표현은 이제 사용하지 않고, 새로운 이름인 logical_date로 표현합니다. (하위 호환성 유지를 위해 기존에 사용되던 execution_date는 계속 유지합니다.)

즉, 작업이 실행되는 시점logical_date로 표현하고, 해당 시점에 실행되는 작업이 참고해야 할 데이터의 날짜 범위data_interval_startdata_interval_end로 각각 나누어 표현할 수 있게 된 것입니다.

이해를 돕기 위해 위에서 예시로 들었던 그림에 변경되는 변수들을 맨 아래에 표시해봤습니다.

기존에 사용되던 매크로/변수들과 새로 변경되는 변수들을 표로 정리하면 다음과 같습니다.

기존 이름 새로운 이름
execution_date data_interval_start, logical_date
next_execution_date data_interval_end
tomorrow_ds deprecate
tomorrow_ds_nodash deprecate
yesterday_ds deprecate
yesterday_ds_nodash deprecate
prev_execution_date prev_logical_date
next_execution_date next_logical_date

그 외에도 execution_date는 Airflow 스케줄링의 핵심 개념이기 때문에, Airflow Metadata DB에서도 이미 폭넓게 사용되고 있었습니다. 이에 따라 변경되는 DB테이블 구조의 변경 등 더 자세한 사항을 확인하려면 AIP-39 문서를 참고해 주세요.

Timetable 개념 도입

앞서 현재 schedule_interval은 크론 표현식(Cron expression)과 timedelta로만 표현할 수 있기 때문에 '공휴일을 제외한 영업일에만 작업' 과 같이 스케줄 중간중간에 구멍이 뚫려 있는 복잡한 형태의 스케줄을 정의할 수 없다고 말씀드렸습니다.

AIP-39에서는 이런 문제를 해결하기 위해 복잡한 시간을 표현할 수 있는 Timetable(https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html)이라는 새로운 클래스가 도입되었습니다. 여기서 모든 내용을 다룰 수는 없기에 간단하게 살펴보겠습니다. 더 자세한 내용은 공식 문서에서 확인할 수 있습니다.

참고로, 크론 표현식은 여전히 널리 쓰이기 때문에 사라지지 않고 계속 유지될 예정입니다. 사용자들은 상황에 맞추어 schedule_interval에 크론 표현식 혹은 Timetable 객체를 사용하면 됩니다.

class Timetable(Protocol):
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        """
        스케줄러가 이 함수를 호출해 가장 마지막에 처리된 `data_interval`를 확인하고,
        이에 따라 Dag가 실행되어야 하는 다음 스케줄(`DagRun`)을 리턴합니다.
 
        :param last_automated_data_interval: 이미 실행된 스케줄(`DagRun`) 중
            가장 마지막에 실행된 스케줄에서 처리된 `data_interval` (manual run 제외)
        
        :param restriction: DAG 스케줄을 결정할 때 고려해야 할 제약사항을 담고 있습니다.
            DAG를 정의할 때 설정하는 `start_date`, `end_date`, `catchup` 등의 
            정보가 담겨 있습니다.
        """

먼저 스케줄 시점을 결정할 Timetable 의 추상 클래스를 살펴봅니다. 스케줄러는 일정한 간격으로 Timetablenext_dagrun_info를 찔러서 Dag의 다음 스케줄이 필요한지 확인합니다. 이 함수에서 마지막으로 실행된 Dag 스케줄(DagRun)에서 처리된 데이터의 범위(date_interval)을 보고, 바로 다음에 실행되어야 할 Dag 스케줄(DagRunInfo)을 리턴하면 스케줄러가 그 시간에 Dag를 실행하게 됩니다.

기존과는 달리 1. Dag의 마지막 실행 날짜(DagRun)가 아닌 해당 Dag에서 처리한 데이터의 범위(date_interval)를 기반으로 다음 Dag가 실행될 스케줄을 결정하게 되고, 2. 스케줄 시점을 파이썬 코드를 이용해서 작성할 수 있기 때문에 DAG의 스케줄 시점을 입맛에 맞게 자유롭게 설정할 수 있습니다.

class DagRunInfo(NamedTuple):
    run_date: DateTime
    """
    Dag가 실행되어야 하는 스케줄(DagRun) 중 가장 빠른 시점을 나타냅니다.
    만약 None이라면 Dag의 실행 스케줄이 아직 결정되지 않았다는 것을 의미합니다.
    """
 
    data_interval: DataInterval
    """
    Dag가 실행될 때 처리해야 하는 데이터의 기간을 표현합니다. ``[start, end)``
    """

다음으로 Dag의 스케줄 정보를 나타내는 DagRunInfo 클래스를 살펴봅니다. 스케줄러가 Timetable에 다음 실행 스케줄을 물어보면 이 클래스가 리턴됩니다. run_date는 Dag가 실행되어야 할 다음 스케줄을 알려주고, data_interval은 해당 스케줄에서 처리해야 할 데이터 기간의 범위를 표현합니다.

DAG가 실행되어야 하는 시점(run_date)과 처리해야 하는 데이터의 범위(data_interval)가 명확하게 분리되어 있는 것을 확인할 수 있습니다.

그렇다면 새로운 인터페이스가 좋은 것은 알겠는데, 기존과 다르게 뭘 할 수 있는걸까요? 앞서 들어보았던 예시로 살펴보겠습니다.

1. 정해진 시점의 데이터를 기록하는 스냅샷을 찍고 싶어요

스냅샷을 찍을 때는 특정 시점의 데이터를 기록하는 것이기 때문에 data_interval의 개념이 필요 없습니다. 이 경우 data_interval_start, data_interval_end 그리고 run_date 모두 같은 날짜가 됩니다.

2. 스케줄 간격과 처리해야 하는 데이터의 기간이 달라요

'하루에 한번, 자정에 가장 최근 7일 데이터를 처리'해야 하는 경우 run_date는 매일 자정으로 설정하고 data_interval을 7일로 설정합니다. 이제 스케줄 시점과 데이터 처리 기간을 따로 명시할 수 있기 때문에, 불필요한 계산 없이 작업 시점과 범위를 명확하게 표현할 수 있게 되었습니다.

또 다른 활용 예는 '하루에 한번 전날의 데이터를 자정이 아니라 새벽 2시에 처리'해야 하는 경우입니다. 예를 들면 서드파티에서 데이터를 가져오는데, 하루가 끝나고 데이터가 처리되는 데 약간의 시간이 걸려서 자정에 바로 데이터가 들어오는 것이 아니라 약간의 딜레이가 있어 새벽 2시 쯤에 들어오는 것이죠.

물론 기존에는 TimeSensor를 이용해 2시간을 기다렸다가 실행하거나, 크론 표현식을 0 2 * * * 와 같이 설정해, 작업이 자정이 아니라 새벽 2시에 시작되게 하는 방법을 사용했습니다. 하지만 전자는 스케줄 자원이 낭비되고, 후자는 execution_date가 2시로 설정돼 실제 데이터의 시간과 맞지 않는 문제가 있었습니다.

3. '공휴일을 제외한 영업일'과 같은 복잡한 스케줄을 하고 싶어요

Timetable을 구현할 때 휴일 정보를 제공하는 확인하는 API 등을 이용해 복잡한 스케줄을 자유롭게 구성할 수 있습니다. 다만, 스케줄을 판단하는 로직이 너무 오래 걸릴 경우 스케줄러의 퍼포먼스에 영향을 줄 수 있다는 점을 고려해야 합니다. 2.0 버전부터 지원되는 스케줄러 HA(고가용성)를 구성하면 이런 위험에서 조금 더 자유로울 수 있습니다.

마무리하며

지금까지 Airflow의 역사 중 가장 큰 변화라고 할 수 있는 AIP-39 Richer scheduler_interval에서 제안된 schedule_interval의 변신과 execution_date의 종말, 그리고 새로 도입된 Timetable에 대해서 살펴봤습니다. 기존에 혼재되어 있었던 개념인 작업의 실행 시점과 처리해야 하는 데이터의 범위가 명확하게 분리되었고, 파이썬 코드를 이용해서 DAG의 스케줄 시점을 입맛에 맞게 자유롭게 설정할 수 있게 되었습니다.

더 자세한 내용은 Airflow 위키에 있는 AIP-39 Richer scheduler_interval 문서를 참고해 주세요.

참고