AIP-39: Airflow 'schedule_interval'의 변신, 그리고 'execution_date'의 종말

Apache Top-level 프로젝트인 Apache Airflow (아파치 에어플로우)는 이제 많은 곳에서 사랑을 받고 있는 오픈소스 워크플로우 오케스트레이션 엔진이 되었습니다.
다른 오케스트레이션 엔진과는 다르게 파이썬으로 쉽게 코드를 작성할 수 있고, 작업 현황을 한눈에 확인할 수 있는 UI가 제공되기 때문에 많은 분들이 활용하고 계신데요, Airflow를 처음 접하시는 분들에게 가장 이해하기 어려운 개념 중 하나가 바로 execution_date입니다.

모든 문제의 근원 execution_date

execution_date는 그 이름과는 다르게, Task가 실제로 실행되는 시간이 아니라 '해당 Task가 실행하고자 하는 Time Window의 시작 지점'을 의미합니다. 도대체 무슨 소리냐고요? 제가 쓰면서도 무슨 말인지 모르겠습니다. 이해를 돕기 위해 아주 간단한 예시를 들어보겠습니다.

다음과 같이 2021년 1월 1일부터 daily로 스케줄되는 DAG를 정의해 봅니다. Airflow는 Python 코드를 이용해서 DAG를 작성하기 때문에 유연하고 확장성 있게 작업을 정의할 수 있습니다.

혹시나 Airflow의 개념에 아직 익숙하지 않으신 분들은 Airflow 공식 문서에 기본 개념이 잘 설명되어 있으니 정독해 보시기를 추천드립니다.

# 2021-01-01 부터 하루에 한번 실행되는 DAG
# 빠른 이해를 위해 시간과 관계없는 다른 파라메터들과 타임존은 모두 생략했습니다
DAG(  
    dag_id='test_dag',
    # '@daily', '0 0 * * *' or datetime.timedelta(days=1)
    schedule_interval='@daily',
    start_date=datetime.datetime(2021, 1, 1),
    # end_date=datetime.datetime(2021, 12, 31),
    ...
)

이 DAG의 스케줄을 그림으로 표현해 보면 아래와 같습니다.

DAG의 시작시점(start_date)인 2021-01-01 새벽 0시부터 1일 간격(schedule_interval)으로 task가 실행됩니다. 여기까지는 누구나 쉽게 이해할 수 있는 부분이죠. 부푼 마음을 안고 1월 1일 자정까지 기다려 봅니다. 첫번째 작업이 실행됐을까요? 정답은 아니오 입니다.

Airflow를 처음 접하시는 분들이 하는 가장 흔한 실수 중 하나입니다. 실제 첫번째 작업은 2021년 1월 2일 자정에 시작됩니다. 분명 start_date를 1월 1일로 했는데 대체 왜 그런걸까요? 이 규칙을 이해하기 위해서는 먼저 타임 윈도우(Time Window)의 개념을 이해해야 합니다.

전통적인 배치 데이터 처리 방식은 작업의 대상이 되는 데이터의 기간이 있고, 그 대상 기간이 지나고 나서 해당 기간에 발생한 데이터를 처리하게 됩니다. 하루에 한번 실행되는 작업이라면 하루가 끝나고 날짜가 바뀔 때 비로소 작업을 실행하게 되는 것이죠.

더 쉬운 예시를 들어 보겠습니다. 쇼핑몰을 운영하고 있다고 해 볼까요. 2021년 1월 3일의 일일 매출을 계산하고 싶다면, 3일이 끝나고 하루가 넘어가는 1월 4일 자정(0시)에 01/03 00:00:00 ~ 01/03 23:59:59에 발생한 매출을 계산해야 합니다. 여기서 작업 대상이 되는 데이터는 1월 3일자 데이터(=execution_date)가 되고, 실제 작업이 실행되는 시간은 1월 4일 0시가 됩니다.

다시 이전에 예시로 돌아와 볼까요. 위의 매출 계산에서 이해한 방식으로 2021년 1월 3일자 Task가 실행되는 방법을 그림으로 표현해 보면 이렇습니다.

즉, execution_date는 해당 Task가 처리해야 할 데이터 기간의 시작 시점인 2021년 1월 3일을 의미합니다. execution_date라는 이름을 직역하면 실행 날짜가 되기 때문에 그 의미와 실제 의미가 달라 자꾸 혼란이 발생하는 것이죠. 이제 start_date2021-01-01임에도 불구하고 1월 1일 자정에 작업이 실행되지 않은 이유가 이해되셨나요?

헷갈리지만 기본적인 개념을 살펴봤습니다. 언뜻 보면 개념만 잘 이해한다면 여러가지 작업들을 스케줄링 하는 데 크게 문제가 되지 않을 것 같은데요, 실제로도 그렇습니다. 그렇다면 이런 방식으로 표현되지 않는 작업은 어떤 것들이 있을까요?

1. 정해진 시점의 데이터를 기록하는 스냅샷(Snapshot)을 찍고 싶어요

스냅샷 데이터는 작업이 실행되는 그 시점의 데이터를 기록하는 것이기 때문에, 일반적으로 정해진 시간에 스냅샷을 찍고 스냅샷을 찍을 당시의 시간으로 기록해 두게 됩니다. 만약 하루에 한번 스냅샷을 기록하는 작업이라면 schedule_interval을 1일로 지정하고, 스냅샷 시간을 작업 당시의 시간 혹은 execution_date + timedelta(days=1)로 기록해야 합니다.

2. 스케줄 간격과 처리해야 하는 데이터의 기간이 달라요

매일 자정에 현재 시점부터 과거 7일치의 데이터를 가지고 계산하는 작업 같은 경우는 어떨까요? 대표적인 예로 사용자가 방문한 후 7일 이내에 다시 방문하는지를 알아보는 D7 리텐션(Retention) 같은 것이 있습니다. (일반적으로 'Rolling window'라고 부릅니다.) 현재 방법으로는 하루에 한번 작업을 실행하기 위해 schedule_interval을 1일로 지정하고, execution_date와 상관없이 과거 7일 데이터를 보도록 해야 합니다. (execution_date - timedelta(days=6))

두 가지 경우만 살펴봤는데도 벌써 execution_date 가 제 역할을 못 하고 있네요. 이 외에도 현재 schedule_interval은 Cron expression과 timedelta만 사용할 수 있기 때문에 '공휴일을 제외한 영업일에만 작업' 과 같이 복잡한 형태의 스케줄을 정의할 수 없다는 문제도 있습니다.

AIP-39, schedule_interval 의 변신

Airflow Improvement Proposal: AIP-39 Richer scheduler_interval

앞서 말씀드린 모든 문제는 스케줄 간격타임 윈도우라는 각기 다른 2가지의 개념이 섞여 있기 때문에 발생했습니다.

지금의 Airflow 구조에서는 시작 시점과 스케줄 간격만을 정할 수 있기 때문에, 작업이 언제 실행될 지를 결정하는 스케줄작업의 대상이 되는 데이터의 Time Window를 분리해서 정의할 수 없습니다. schedule_interval이 스케줄 간격과 타임 윈도우 2가지의 역할을 동시에 하고 있기 때문입니다.

이 문제를 해결하기 위해 PMC 멤버인 Ash Berlin-Taylor와 최근 Airflow 개발에 많이 기여하고 있는 Astronomer의 James Timmins의 주도로 AIP-39 Richer scheduler_interval이 제안되었습니다. AIP-39의 핵심은 바로 작업의 스케줄 간격과 작업의 대상이 되는 타임 윈도우를 분리하는 것입니다. 지금부터 AIP-39의 내용을 자세히 살펴보겠습니다.

AIP-39는 현재 투표 중인 Proposal로 그 내용이 아직 확정되지 않았기 때문에 구체적인 구현체의 이름들은 변경될 수 있으며, 최종 확정안에 따라 해당 게시글을 업데이트 할 예정입니다. (현재 투표 상황으로 Proposal 자체는 문제없이 통과 될 예정입니다.)

execution_date의 종말

가장 큰 변화는 혼란의 중심에 있는 execution_date을 없애고 좀 더 명확히 타임 윈도우의 개념을 표현하기 위해 date_interval이라는 개념을 도입합니다. 타임 윈도우의 시작은 data_interval_start, 타임 윈도우의 끝은 data_interval_end가 됩니다. 또, 실제로 작업이 실행되는 시점을 표현하기 위해 새로운 변수인 schedule_date을 도입합니다.

이해를 돕기 위해 위에서 예시로 들었던 그림에 변경되는 변수들을 맨 아래에 표시해봤습니다.

기존에 사용되던 매크로/변수들과 새로 변경되는 변수들을 표로 정리하면 다음과 같습니다.

기존 이름 새로운 이름
execution_date data_interval_start
next_execution_date data_interval_end
tomorrow_ds deprecate
tomorrow_ds_nodash deprecate
yesterday_ds deprecate
yesterday_ds_nodash deprecate
prev_execution_date prev_schedule_date
next_execution_date next_schedule_date

그 외에도 Airflow DB에서도 execution_date가 사용되고 있기 때문에 DB테이블 구조의 변경 등 더 자세한 사항을 확인하려면 AIP-39 문서를 참고해 주세요.

Timetable 개념 도입

앞서 현재 schedule_interval은 크론 표현식(Cron expression)과 timedelta로만 표현할 수 있기 때문에 '공휴일을 제외한 영업일에만 작업' 과 같이 복잡한 형태의 스케줄을 정의할 수 없다고 말씀드렸습니다.

AIP-39에서는 이런 문제를 해결하기 위해 복잡한 시간을 표현할 수 있는 Timetable이라는 새로운 클래스를 제안했습니다. 현재 제안되어 있는 인터페이스는 앞으로 언제든지 바뀔 수 있기 때문에, 간단하게 살펴보겠습니다.

참고로, schedule_interval은 여전히 범용적으로 잘 사용되고 있기 때문에 사라지지 않고 계속 유지될 예정입니다.

class AbstractTimetable(ABC):  
    @abstractmethod
    def next_dagrun_info(
        date_last_automated_dagrun: Optional[pendulum.DateTime],

        session: Session,
    ) -> Optional[DagRunInfo]:
        """
        스케줄러가 이 함수를 호출해 `date_last_automated_dagrun` 이후
        Dag가 실행되어야 하는 스케줄(DagRun) 중 가장 빠른 시점을 확인합니다.

        :param date_last_automated_dagrun: 이미 실행된 스케줄(DagRun) 중
        가장 마지막에 실행된 시간 = max(execution_date)
        """

먼저 스케줄 시점을 결정할 Timetable 의 추상 클래스를 살펴봅니다. 스케줄러는 일정한 간격으로 Timetablenext_dagrun_info를 찔러서 Dag의 다음 스케줄이 필요한지 확인합니다. 이 함수에서 Dag가 마지막으로 실행된 시간을 보고, 바로 다음에 실행되어야 할 스케줄(DagRunInfo)을 리턴하면 스케줄러가 그 시간에 Dag를 실행하게 됩니다.

기존과는 달리 스케줄 시점을 파이썬 코드를 이용해서 작성할 수 있기 때문에, DAG의 스케줄 시점을 입맛에 맞게 자유롭게 설정할 수 있습니다.

class DagRunInfo(NamedTuple):  
    run_date: Optional[DateTime]
    """
    Dag가 실행되어야 하는 스케줄(DagRun) 중 가장 빠른 시점을 나타냅니다.
    만약 None이라면 Dag의 실행 스케줄이 아직 결정되지 않았다는 것을 의미합니다.
    """

    data_interval: Optional[Tuple[DateTime, DateTime]]
    """
    Dag가 실행될 때 처리해야 하는 데이터의 기간을 표현합니다. ``[start, end)``
    """

다음으로 Dag의 스케줄 정보를 나타내는 DagRunInfo 클래스를 살펴봅니다. 스케줄러가 Timetable에 다음 실행 스케줄을 물어보면 이 클래스가 리턴됩니다. run_date는 Dag가 실행되어야 할 다음 스케줄을 알려주고, data_interval은 해당 스케줄에서 처리해야 할 데이터 기간의 범위를 표현합니다.

그렇다면 새로운 인터페이스가 좋은 것은 알겠는데, 기존과 다르게 뭘 할 수 있는걸까요? 앞서 들어보았던 예시로 살펴보겠습니다.

1. 정해진 시점의 데이터를 기록하는 스냅샷을 찍고 싶어요

스냅샷을 찍을 때는 특정 시점의 데이터를 기록하는 것이기 때문에 data_interval의 개념이 필요 없습니다. 이 경우 data_interval_start, data_interval_end 그리고 run_date 모두 같은 날짜가 됩니다.

2. 스케줄 간격과 처리해야 하는 데이터의 기간이 달라요

'하루에 한번, 자정에 가장 최근 7일 데이터를 처리'해야 하는 경우 run_date는 매일 자정으로 설정하고 data_interval을 7일로 설정합니다. 이제 스케줄 시점과 데이터 처리 기간을 따로 명시할 수 있기 때문에, 불필요한 계산 없이 작업 시점과 범위를 명확하게 표현할 수 있게 되었습니다.

또 다른 활용 예는 '하루에 한번 전날의 데이터를 자정이 아니라 새벽 2시에 처리'해야 하는 경우입니다. 예를 들면 서드파티에서 데이터를 가져오는데, 하루가 끝나고 데이터가 처리되는 데 약간의 시간이 걸려서 자정에 바로 데이터가 들어오는 것이 아니라 약간의 딜레이가 있어 새벽 2시 쯤에 들어오는 것이죠.

물론 기존에는 TimeSensor를 이용해 2시간을 기다렸다가 실행하거나, 크론 표현식을 0 2 * * * 와 같이 설정해, 작업이 자정이 아니라 새벽 2시에 시작되게 하는 방법을 사용했습니다. 하지만 전자는 스케줄 자원이 낭비되고, 후자는 execution_date가 2시로 설정돼 실제 데이터의 시간과 맞지 않는 문제가 있었습니다.

3. '공휴일을 제외한 영업일'과 같은 복잡한 스케줄을 하고 싶어요

Timetable을 구현할 때 휴일 정보를 제공하는 확인하는 API 등을 이용해 복잡한 스케줄을 자유롭게 구성할 수 있습니다. 다만, 스케줄을 판단하는 로직이 너무 오래 걸릴 경우 스케줄러의 퍼포먼스에 영향을 줄 수 있다는 점을 고려해야 합니다. 2.0 버전부터 지원되는 스케줄러 HA(고가용성)를 구성하면 이런 위험에서 조금 더 자유로울 수 있습니다.

마무리하며

지금까지 Airflow의 역사 중 가장 큰 변화라고 할 수 있는 schedule_interval의 변신과 execution_date의 종말에 대해서 살펴봤습니다. 앞서 여러번 말씀드렸지만 AIP-39는 아직 확정 단계가 아니기 때문에 전반적인 틀은 변하지 않겠지만, 세세한 사항들은 언제든지 변경될 수 있습니다.

더 자세한 내용은 Airflow 위키에 있는 AIP-39 Richer scheduler_interval 문서를 참고해 주시고, 추후 업데이트가 있다면 이 글에 업데이트 해 보겠습니다.