-
[Aws + Spring] Spring Boot + Aws Glue + S3를 활용한 방문자 통계 구축 (3)Aws 2024. 1. 2. 22:49반응형
저번 포스팅에 이어서 AWS Glue Job을 생성하기 전에 AWS Glue에 대한 간략하게 설명을 하겠습니다.
🚀 AWS Glue 란 ?
AWS Glue는 데이터 ETL(Extract, Transform, Load) 작업을 수행하는 완전관리형 서비스로써, 데이터를 추출하고 변환한 후 다양한 저장소에 로드하는 데 사용됩니다. 이는 다양한 데이터 소스와 대상 간의 데이터 이동 및 변환을 쉽게 수행할 수 있게 해줍니다.
아래는 AWS Glue의 주요 개념에 대한 간단한 설명입니다.
- Job(잡):
- 잡은 AWS Glue에서 데이터 변환을 위해 실행되는 작업을 나타냅니다. PySpark 또는 Scala로 작성된 스크립트를 사용하여 데이터를 추출, 변환 및 로드할 수 있습니다. 여러 입력 및 출력을 처리하며, 이러한 작업들은 일정에 따라 주기적으로 실행할 수 있습니다.
- 스크립트 및 PySpark:
- Glue Job은 PySpark를 기반으로 동작하며, PySpark 스크립트를 사용하여 데이터 변환 작업을 정의합니다. 이 스크립트는 PySpark의 DataFrame API를 활용하여 데이터를 처리하고 변환합니다.
- Trigger(트리거):
- 트리거는 AWS Glue Job이 실행되는 시점을 지정하는 데 사용됩니다. 특정 이벤트가 발생하거나 주기적으로 Job을 실행하도록 구성할 수 있습니다.
- Scheduler(스케줄러):
- 스케줄러는 트리거된 Job을 주기적으로 실행하는 데 사용됩니다. 일정에 따라 주기적으로 Job을 실행할 수 있습니다.
각각의 개념을 조합하여, AWS Glue를 사용하여 S3 파일을 읽어와서 데이터를 변환하고, 그 결과를 Spring Boot Server에
주기적으로 전송함으로써 통계 기능을 구현할 수 있습니다. 이를 위해서는 AWS Glue Job에서 필요한 데이터 추출 및 변환 작업을
정의하고, 이를 주기적으로 실행할 트리거와 스케줄러를 설정해야 합니다.
궁극적으로, 해당 포스팅에서 구축하려는 방문자 통계 프로세스는 아래와 같습니다.
1. 사용자의 방문일을 세션에 저장합니다.
2. 세션에 방문일이 없거나 저장 된 방문일과 다른 날짜(이전 날)인 경우, 방문자의 디바이스 정보를 DB와 S3에 저장합니다.
3. 일일 방문 통계 테이블에 유니크한 오늘 일자의 데이터를 추가 하거나, 방문자 수를 업데이트 합니다.
4. 페이지뷰(PV)는 Rest API를 추가하여 Client Side에서 API를 호출합니다.
5. 회원가입 수는 회원가입 API 내부에 로직을 추가합니다.
4. AWS Glue 작업 및 스크립트 작성
먼저 Glue Job에서 사용할 사용자 역할을 생성합니다.
AWS Console에서 IAM > 역할 > 역할 생성 클릭하여 'Glue 사용 사례'를 선택합니다.다음 페이지에서 AmazonRDSDataFullAccess, AmazonS3ReadOnlyAccess, AWSGlueServiceRole 권한을 추가 합니다.
이후, 다음 페이지를 누르면 요약 내용이 위 사진처럼 확인이 된다면, 생성 해주시면 됩니다.
일일 디바이스 정보 통계 Glue Job 생성
먼저 디바이스 정보 통계를 합산하는 Glue Job을 생성 해보겠습니다.
Glue Visual ETL 페이지로 이동하여 Script editor를 클릭하고 Start fresh 를 선택합니다.
먼저 Script를 작성합니다. Script의 내용이 긴 관계로 아래 더보기를 참고 해주시기 바랍니다.더보기import boto3
import sys
import requests
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import StructType, StructField, StringType
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.functions import to_json
try:
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'date'])
except:
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
# Define your schema
schema = StructType([
StructField("dateTime", StringType(), False),
StructField("date", StringType(), False),
StructField("device", StringType(), False),
StructField("osFamily", StringType(), False),
])
# Read JSON files directly from S3
# Get the date from the arguments, if provided
date = args.get('date', None)
logger.info(f"date from args: {date}")
# If the date is not provided, set it to yesterday's date
if date is None:
yesterday = datetime.now() - timedelta(days=1)
date = yesterday.strftime('%Y-%m-%d')
bucket = 'Your Bucket Name'
s3_paths = [
f"s3://{bucket}/log/site-visit/{date}",
]
logger.info(f"Read S3 path is: {s3_paths}")
# Read data from multiple S3 paths
data_frames = []
for s3_path in s3_paths:
try:
data_frame = spark.read.schema(schema).json(s3_path)
data_frames.append(data_frame)
except:
logger.info(f"Skipped reading from {s3_path}")
# Union all the data frames
data_frame = data_frames[0]
for df in data_frames[1:]:
data_frame = data_frame.union(df)
logger.info(f"Record count after filtering: {data_frame.count()}")
# Count records per channelId
counts = data_frame.groupBy("date").agg( \
F.sum(F.when(F.col("osFamily").isin(["Mac OS X", "Windows", "Linux"]), 1).otherwise(0)).alias("desktopCount"), \
F.sum(F.when(F.col("osFamily").isin(["Android"]), 1).when(F.col("device") == "iPhone", 1).otherwise(0)).alias("mobileCount"), \
F.sum(F.when(F.col("device").isin(["iPad"]), 1).otherwise(0)).alias("tabletCount"), \
F.sum(F.when(~F.col("osFamily").isin(["Mac OS X", "Windows", "Linux", "iOS", "Android"]), 1).otherwise(0)).alias("unknownCount") \
)
json_counts = counts.select(to_json(F.struct("*")).alias("json"))
json_rows = json_counts.collect()
payload = [json.loads(row.json) for row in json_rows]
logger.info(f"Payload: {payload}")
url = f"https://Your Host/api/statistics/daily-device-stats"
response = requests.post(url, json=payload)
if response.status_code == 200:
logger.info("Data sent successfully")
else:
logger.error(f"Failed to send data: {response.status_code} - {response.text}")
sys.exit()
job.commit()이 스크립트는 주어진 날짜에 대한 log/site-visit S3 경로에서 JSON 파일을 읽어와, 운영체제 패밀리와 디바이스 타입에 따라 레코드를 필터링하고, 각 카테고리에 대한 레코드 수를 집계한 후, 해당 데이터를 외부 API로 전송하는 ETL 작업을 수행합니다.
Job details를 작성합니다.IAM Role은 위에서 생성했던 glue-logging-to-stats를 선택합니다.
ETL Job Type은 Spark를 선택하고 Language는 Python 3을 선택 하겠습니다.
Worker Type은 원하시는 유형을 선택 해주시면 됩니다.
Requested number of workers는 AWS Glue Job을 실행하는 데 할당하려는 워커(Workers)의 개수를 지정하는 옵션입니다.
여기서 워커는 AWS Glue Job이 데이터 추출, 변환, 로딩 (ETL) 작업을 수행하는 데 사용되는 병렬 처리 단위를 나타냅니다. 더 많은 워커를 할당하면 일반적으로 작업이 빨리 완료되지만, 추가 비용이 발생할 수 있습니다. 반면 적은 워커를 할당하면 비용은 줄어들지만 작업 속도가 감소할 수 있습니다.
Glue ETL 작업의 기본값은 2,880분(48시간)입니다. 따라서 Job Timeout은 10분으로 설정했으며,
S3에 해당 스크립트를 저장하기 위한 경로를 입력 합니다.
나머지는 Default 값을 설정하고, Temporary path를 지정합니다.
AWS Glue에서 Temporary Path(임시 경로)는 Glue Job 실행 중에 일시적으로 생성되는 데이터 및 작업에 대한 일시적인 저장소의 위치를 나타냅니다. 임시 경로는 Glue Job이 실행되는 동안 중간 결과, 캐시 파일 및 다른 임시 데이터를 저장하는 데 사용됩니다. Glue Job이 완료되면 이러한 임시 데이터는 자동으로 삭제됩니다.
Save를 눌러서 Job 추가를 완료합니다.
해당 Job은 매일 AM 12:00에 한번씩 실행 할 예정입니다.
Schedule을 추가하기 위해 다시 Visual ETL 페이지를 와서 생성한 Job을 클릭합니다.
Create schedule를 클릭하여 아래처럼 매일 오전 12시에 실행 되는 scheduler를 추가 합니다.
이제 방문자의 디바이스 정보 통계 Glue Job 설정이 끝났습니다.
일일 사이트 방문 통계 Glue Job 생성
이제 사이트 방문 통계를 위한 Glue Job을 추가 하겠습니다.
과정은 디바이스 정보 통계와 같으며, Script만 다릅니다. Script의 내용이 긴 관계로 아래 더보기를 참고 해주시기 바랍니다.더보기import boto3
import sys
import requests
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import StructType, StructField, StringType
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.functions import to_json
try:
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'date'])
except:
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()
# Define your schema
schema = StructType([
StructField("dateTime", StringType(), False),
StructField("date", StringType(), False),
StructField("type", StringType(), False),
])
# Read JSON files directly from S3
# Get the date from the arguments, if provided
date = args.get('date', None)
logger.info(f"date from args: {date}")
# If the date is not provided, set it to yesterday's date
if date is None:
yesterday = datetime.now() - timedelta(days=1)
date = yesterday.strftime('%Y-%m-%d')
bucket = 'Your Bucket Name'
s3_paths = [
f"s3://{bucket}/log/site-visit/{date}",
f"s3://{bucket}/log/site-signup/{date}",
f"s3://{bucket}/log/site-page-view/{date}",
]
logger.info(f"Read S3 path is: {s3_paths}")
# Read data from multiple S3 paths
data_frames = []
for s3_path in s3_paths:
try:
data_frame = spark.read.schema(schema).json(s3_path)
data_frames.append(data_frame)
except:
logger.info(f"Skipped reading from {s3_path}")
# Union all the data frames
data_frame = data_frames[0]
for df in data_frames[1:]:
data_frame = data_frame.union(df)
logger.info(f"Record count after filtering: {data_frame.count()}")
# Count records per channelId
counts = data_frame \
.groupBy("date") \
.agg( \
F.sum(F.when(F.col("type") == "VISIT", 1).otherwise(0)).alias("visitorCount"), \
F.sum(F.when(F.col("type") == "SIGN_UP", 1).otherwise(0)).alias("signUpCount"), \
F.sum(F.when(F.col("type") == "PAGE_VIEW", 1).otherwise(0)).alias("pageViewCount"), \
)
json_counts = counts.select(to_json(F.struct("*")).alias("json"))
json_rows = json_counts.collect()
payload = [json.loads(row.json) for row in json_rows]
logger.info(f"Payload: {payload}")
url = f"https://Your Host/api/statistics/daily-site-stats"
response = requests.post(url, json=payload)
if response.status_code == 200:
logger.info("Data sent successfully")
else:
logger.error(f"Failed to send data: {response.status_code} - {response.text}")
sys.exit()
job.commit()
이 스크립트는 site-visit / site-signup / site-page-view S3 Log 경로에서 JSON 파일을 읽어와
방문자 수, 가입자 수, 페이지 뷰 수를 집계한 후, 해당 데이터를 외부 API로 전송하는 ETL 작업을 수행합니다.Job Details와 scheduler도 위에서 생성한 일일 디바이스 정보 통계 Glue Job과 동일하게 설정하면 됩니다.
이렇게 일일 디바이스 정보 통계 , 일일 사이트 방문 통계에 대한 Glue Job 설정을 완료 했습니다.
반응형'Aws' 카테고리의 다른 글
[Aws] S3 파일 업로드 확장자 제한 및 에러 페이지 처리 - 웹 취약점 보안 (0) 2024.02.14 [AWS] EC2 Instance Loadbalancer (0) 2022.10.24 [AWS] EB Deploy Issue (Chrome) (0) 2022.10.24 - Job(잡):