시작하며

프로그래머스 데이터 엔지니어링 강의 6강에서는 OLTP 테이블을 DW인 AWS Redshift로 복사하는 방법과, Summary 테이블을 Airflow DAG로 구현하는 방법을 다룬다. 또한 Airflow를 프로덕션 환경에 배포하는 방법과 Slack 연동에 대해서도 정리한다.

OLTP 테이블을 Redshift로 복사하기

flowchart LR
    OLTP["`Production MySQL Tables
    (OLTP)`"]
    OLAP["`DataWareHouse AWS RedShift
    (OLAP)`"]
    OLTP --> OLAP

PRODUCTION TABLE을 DW인 AWS Redshift로 옮기는 방법은 두 가지가 있다.

    1. OLTP → Local Disk(Airflow) → RedShift (Bulk Update)
    1. OLTP → Local Disk(Airflow) → AWS S3 → RedShift (Insert)
    • SqlToS3Operator
    • S3ToRedshiftOperator
flowchart LR
    OLTP["`Production MySQL Tables
    (OLTP, Source)`"]
    OLAP["`DataWareHouse AWS RedShift
    (OLAP, Target)`"]
    LD["`Local Disk
    (Airflow Server)`"]
    S3["`AWS S3`"]
    OLTP --> LD
    LD --> S3
    S3 --> |RedShift Bulk Copy|OLAP
    LD --> |Insert|OLAP

v1: Full Replace 방식

# v1
 
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
 
from datetime import datetime
from datetime import timedelta
 
import requests
import logging
import psycopg2
import json
 
 
dag = DAG(
    dag_id = 'MySQL_to_Redshift',
    start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)
 
schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
 
mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = "SELECT * FROM prod.nps",
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True, # 이미 존재하는 경우 덮어쓴다.
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)
 
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    method = 'REPLACE',
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",
    dag = dag
)
 
mysql_to_s3_nps >> s3_to_redshift_nps

v2: Incremental Upsert 방식

# v2
 
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
 
from datetime import datetime
from datetime import timedelta
 
import requests
import logging
import psycopg2
import json
 
dag = DAG(
    dag_id = 'MySQL_to_Redshift_v2',
    start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
    schedule = '0 9 * * *',  # 적당히 조절
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)
 
schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table       # s3_key = schema + "/" + table
 
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    query = sql,
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)
 
s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",    
    method = "UPSERT",
    upsert_keys = ["id"], # primary key 로 지정
    dag = dag
)
 
mysql_to_s3_nps >> s3_to_redshift_nps

Backfill 및 Summary 테이블 구현

Backfill을 커맨드 라인에서 실행하는 방법

airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
  • Full Refresh가 가능하다면 Backfill을 굳이 할 이유는 없다.
  • 데이터 크기가 굉장히 커지면 Backfill 기능을 구현해 두는 것이 필수이다.

Summary 테이블 구현

  • 이 부분을 dbt로 구현하는 회사들이 많다(Analytics Engineer).
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta
 
from airflow import AirflowException
 
import requests
import logging
import psycopg2
 
from airflow.exceptions import AirflowException
 
def get_Redshift_connection():
    hook = PostgresHook(postgres_conn_id = 'redshift_dev_db')
    return hook.get_conn().cursor()
 
 
def execSQL(**context):
 
    schema = context['params']['schema'] 
    table = context['params']['table']
    select_sql = context['params']['sql']
 
    logging.info(schema)
    logging.info(table)
    logging.info(select_sql)
 
    cur = get_Redshift_connection()
 
    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """
    sql += select_sql
    cur.execute(sql)
 
    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")
 
    try:
        sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};"""
        sql += "COMMIT;"
        logging.info(sql)
        cur.execute(sql)
    except Exception as e:
        cur.execute("ROLLBACK")
        logging.error('Failed to sql. Completed ROLLBACK!')
        raise AirflowException("")
 
 
dag = DAG(
    dag_id = "Build_Summary",
    start_date = datetime(2021,12,10),
    schedule = '@once',
    catchup = False
)
 
execsql = PythonOperator(
    task_id = 'mau_summary',
    python_callable = execSQL,
    params = {
        'schema' : 'keeyong',
        'table': 'mau_summary',
        'sql' : """SELECT 
  TO_CHAR(A.ts, 'YYYY-MM') AS month,
  COUNT(DISTINCT B.userid) AS mau
FROM raw_data.session_timestamp A
JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1 
;"""
    },
    dag = dag
)

정리하며

Airflow를 프로덕션 환경에 배포할 때는 .cfg 설정 파일을 통해 환경변수를 관리하며, 예를 들어 AIRFLOW__CORE__EXECUTOR=CeleryExecutor와 같이 작성하면 CORE 섹션의 EXECUTOR 키값을 CeleryExecutor로 변경할 수 있다. 프로덕션 배포 방법으로는 Metadata DB, Cloud 환경, Kubernetes 기반의 세 가지가 있다.

Slack 연동을 통해 DAG 실행 중 에러가 발생하면 지정된 Slack 워크스페이스 채널로 알림을 보낼 수 있다.

from plugins import slack ...
default_args= {
    'on_failure_callback': slack.on_failure_callback,
}

이번 강의에서는 MySQL OLTP 테이블을 S3를 경유하여 Redshift로 복사하는 두 가지 패턴(Full Replace, Incremental Upsert)과 Summary 테이블 구현 방법을 실습했다. 데이터 규모가 커질수록 Incremental Update와 Backfill 전략이 중요해진다.