시작하며
2023년 4월 한국금융연수원에서 진행된 클라우드 데이터 인프라 과정의 4일차 내용이다. AWS EMR(Elastic MapReduce)에서 Spark 코드를 실행하는 실습과 함께 Hive, Pig 코드 예제, MLOps 오픈소스, Hadoop 생태계 개요를 다룬다.

EMR과 Hadoop 개요
EMR (Elastic Map Reduce)
- Hadoop은 병렬처리(PPM)와 Object-Storage(HDFS)를 의미한다.
- AWS는 병렬처리는 지원하지만 HDFS는 지원하지 않고자 했기 때문에(지금은 지원), 데이터 저장소보다는 프로세싱 엔진으로 생각하는 편이 좋다.
Hadoop 개요
Hadoop: 대용량 파일을 분산저장(HDFS)하여 MapReduce를 통해 원하는 정보를 처리한다.
- Distributed, Scalable, Fault-tolerant를 특징으로 갖는다.
- 리소스 관리는 YARN이 담당한다.
- Hadoop HDFS, Hadoop MapReduce, Hadoop YARN이 주요 구성요소이다.
- KUDU: Object Storage의 immutable storage 특징을 극복할 수 있게 도와주는 SW
- HBASE: HDFS 기반의 컬럼너 NoSQL DB이다.
- IMPALA: C++로 구성되어 Spark보다 빠르게 동작하지만, only-memory라 메모리의 양이 결국 처리할 수 있는 데이터의 양이라는 단점이 있다.
- HUE: UI이나 오히려 Zeppelin을 사용하기도 한다.
Hadoop Stack
- 최근에는 네트워크 성능이 많이 좋아져서 Hadoop Storage와 Computing 엔진을 별도로 두는 경우가 많다.
EMR 노드 구성
- Master Node: 클러스터를 관리한다.
- Core Node: 데이터 노드
- Task Node: 선택적인 노드이다.
EMR 실습
실습 1. EMR에서 Spark 코드 실행하기
1. EMR 구성하기
- EMR 클러스터 생성
- AWS Glue 에서 Hive 데이터 메타데이터에서 사용한다.
- 추후 EMR을 죽거나 필요할 때 켜서 쓰는 데 유용하다.
- 비용절감 측면에서 스팟 형태로도 많이 사용한다.
2. EMR에 보안 설정
- EMR > Summary > security groups for master > edit inbound rules
- c9 private ip 허용
- local ip address 허용
3. ssh로 c9 혹은 로컬(inbound 22포트 추가로 추가) 에서 접속
- ssh hadoop@ec2-43-201-36-199.ap-northeast-2.compute.amazonaws.com -i EmrKeyPair.pem
4. S3 에 버킷 및 하위 디렉토리 생성
- data, files, input, logs, output 폴더 생성
- 요건에 맞게 폴더별 데이터 업로드
5. EMR에 접속하여 wget으로 실행하고자 하는 파일 저장
- wget https://raw.githubusercontent.com/iamtaewan/workstation-demo/main/spark-etl.py
6. EMR에서 spark 코드 실행
- spark-submit spark-etl.py s3://<YOUR-BUCKET>/input/ s3://<YOUR-BUCKET>/output/spark
- (spark-submit spark-etl.py s3://emrdemo-kbi0127/input/ s3://emrdemo-kbi0127/output/spark)
7. Spark History Server
+ 동일한 방식으로 step 에서도 실행할 수 있고
+ Jupyter Lab 에서도 실행할 수 있다.
실습 2. Spark 코드 예제(1)
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
print(len(sys.argv))
if (len(sys.argv) != 3):
print("Usage: spark-etl [input-folder] [output-folder]")
sys.exit(0)
spark = SparkSession\
.builder\
.appName("SparkETL")\
.getOrCreate()
nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])
updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
updatedNYTaxi.printSchema()
print(updatedNYTaxi.show())
print("Total number of records: " + str(updatedNYTaxi.count()))
updatedNYTaxi.write.parquet(sys.argv[2])실습 3. Spark 코드 예제(2)
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, col
input_path = "s3://emrdemo-kbi0127/data/sales.csv"
output_path = "s3://emrdemo-kbi0127/output/jupyter/"
df = (spark.read.format('csv').option('header', 'True').option(
"inferSchema", "true").load(input_path))
df.show()
df_final = (
df.withColumn("order_id", df["Order ID"]).drop("Order ID").withColumn(
"order_date",
to_date(col("Order Date"), "M/d/yyyy")).drop("Order Date").withColumn(
"item_type", df["Item Type"]).drop("Item Type").withColumn(
"sales_channel",
df["Sales Channel"]).drop("Sales Channel").withColumn(
"units_sold",
df["Units Sold"].cast('float')).drop("Units Sold").
withColumn("unit_price",
df["Unit Price"].cast('float')).drop("Unit Price").withColumn(
"total_cost", df["Total Cost"].cast('float')).
drop("Total Cost").withColumn(
"total_profit",
df["Total Profit"].cast('float')).drop("Total Profit").withColumn(
"total_revenue",
df["Total Revenue"].cast("float")).drop("Total Revenue").drop(
"Order Priority", "Ship Date", "Unit Cost").distinct())
df_final.show(5)
df_final.createOrReplaceTempView('df_final_View')
spark.sql("select * from df_final_View").show(5)
df_final.repartition(2).write.mode("overwrite").save(
"s3://emrdemo-kbi0127/data/output/sales/sales_final_parquet") # 뭉쳐서 파티션을 두개로 나눈다.
df_final.write.partitionBy("region").parquet(
"s3://emrdemo-kbi0127/output/sales/sales_region_final_parquet") # 뭉쳐서 파티션을 두개로 나눈다.
실습 4. Hive 코드 예제
Hive: 동작시키는 엔진이 MR → Spark로 바뀌었다.
- Managed Table: 내부 테이블
- External Table: 일반적인 External과 살짝 다른 의미이다.
CREATE EXTERNAL TABLE ny_taxi_test (
vendor_id int,
lpep_pickup_datetime string,
lpep_dropoff_datetime string,
store_and_fwd_flag string,
rate_code_id smallint,
pu_location_id int,
do_location_id int,
passenger_count int,
trip_distance double,
fare_amount double,
mta_tax double,
tip_amount double,
tolls_amount double,
ehail_fee double,
improvement_surcharge double,
total_amount double,
payment_type smallint,
trip_type smallint
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION "s3://emrdemo-kbi0127/input/";
실습 5. Pig 코드 예제
DEFINE CSVLoader org.apache.pig.piggybank.storage.CSVLoader();
NY_TAXI = LOAD '$INPUT' USING CSVLoader(',') AS
(vendor_id:int,
lpep_pickup_datetime:chararray,
lpep_dropoff_datetime:chararray,
store_and_fwd_flag:chararray,
rate_code_id:int,
pu_location_id:int,
do_location_id:int,
passenger_count:int,
trip_distance:double,
fare_amount:double,
mta_tax:double,
tip_amount:double,
tolls_amount:double,
ehail_fee:double,
improvement_surcharge:double,
total_amount:double,
payment_type:int,
trip_type:int);
STORE NY_TAXI into '$OUTPUT' USING PigStorage('\t');정리하며
AWS EMR을 통해 Spark, Hive, Pig 등 다양한 빅데이터 처리 프레임워크를 클라우드 환경에서 손쉽게 실행할 수 있다. Hadoop은 HDFS를 통한 분산 스토리지와 YARN을 통한 리소스 관리를 핵심으로 하며, 최근에는 스토리지와 컴퓨팅 엔진을 분리하는 방향으로 발전하고 있다.
MLOps 오픈소스로는 Airflow(Airbnb, Uber 관리), Kubeflow(컨테이너 기반), MLflow(빅데이터 기반)가 대표적이며, ONNX를 통해 ML 모델을 C++ 또는 Java 등 다양한 언어로 변환할 수 있다.