안녕하세요 오늘은 BESPIN GLOBAL AI실 박성환님이 작성해주신 ‘AWS LAMBDA를 활용한 Account Helth Event 발생 정보 슬랙 전송(1)’ 에 대해 소개해드리도록 하겠습니다.
목차
- 개요
- 환경 설정 및 구성
- Lambda 코드 소스
- 결과
1. 개요
1.1 목적
- AWS 이벤트 발생 시 이벤트 정보를 빠르게 담당자에 전달 될 수 있는 기능 추가가 필요합니다.
- 모든 담당자가 사용 중인 슬랙에 이벤트 정보를 전달 및 슬랙 명령어를 통한 이벤트 정보를 관리합니다.
1.2 요구 사항
- AWS Health Dashboard의 Your account health 전달되는 이벤트를 담당자에게 전달합니다.
- AWS 이벤트 발생 정보 최대한 빠르게 담당자에 전달 될 수 있도록 합니다.
- 매일 09시에 처리 되지 않은 이벤트를 담당자에게 재 전송 하고 처리 완료된 이벤트는 재 전송하지 않습니다.
- 미 처리된 이벤트 정보는 담당자가 수시로 확인 가능토록 하고 처리 완료된 이벤트는 필요에 따라 영구 삭제 가능하지만 보관도 가능하도록 합니다.
2. 환경 설정 및 구성
2.1 Lambda 함수 생성
- 함수를 새로 작성합니다.
- 함수 이름을 지정합니다.
- 함수를 작성하는 사용할 언어를 선택합니다. – python으로 작성
- 함수를 실행하기 위해 사용되는 컴퓨터 프로세서의 유형을 결정합니다.
- 함수에 대한 Role을 설정합니다.
- 설정 완료되면 함수를 생성합니다.

2.2 Trigger 생성 및 설정
2.2.1 EventBridge Event pattern 트리거 생성
- EventBridge 추가된 내용은 무시합니다.( 처음 lambda 생성 시 EventBridge 없음)
- (1) 트리거를 추가 선택합니다.
- (2) 트리거 소스를 선택합니다. ⇒ EventBridge 선택
- (3) 새로운 rule을 선택합니다.
- (4) rule을 (트리거명) 설정합니다.
- (5) TYPE을 Event Parrern으로 설정합니다.
- (6) Event Pattern preview에서 (7) json 붙여넣기 후 (8) 추가 버튼으로 트리거를 생성합니다.

2.2.2 EventBridge Schedule expression 트리거 생성
- (1) 트리거 추가 선택 ~ (4) rule (트리거명) 설정까지는 동일 한 방법으로 진행합니다.
- rule “z_psh_event_remind” 으로 생성합니다.
- (5) Schedule expression을 선택합니다.
- (8) 추가 버튼으로 트리거를 생성합니다.
- z_psh_event_remind 규칙 설정
- Amazon EventBridge서비스 규칙
- 일정패턴을 반복일정으로 선택하고 이벤트 일정을 추가합니다. (매일 09시 수행되도록 설정)

2.3 계층 및 Layer 생성
- python개발을 위해 필요한 모듈을 설치하여 사용합니다.
- Lambda 계층은 추가 코드 또는 데이터를 포함하는 .zip 파일 아카이브. 계층에는 일반적으로 라이브러리 종속 항목, 사용자 지정 런타임 또는 구성 파일이 포함됩니다.
- AWS Lambda 서비스에서 계층을 선택하고 계층 생성 을 클릭 하여 생성 가능합니다.
2.3.1 계층 생성
- 계층 생성을 하기 위해서는 .zip 파일 아카이브파일이 필요합니다.
- 해당 작업을 위해 python의 tabulate, requests, pandas, slack_sdk 모듈을 설치합니다.
- Request 모듈 추가를 위해, 관련 모듈을 다운로드한다. 다운로드를 하였으면 zip 파일로 압축해줍니다.
- request version은 2.29 사용 : 최신 버전 사용 시 에러가 발생되어 2.29 사용하였습니다.
- windows PowerShell을 활용하여 모듈은 다운로드하고 압축합니다.
- 계층생성에서 계층이름, 파일을 업로드 (업로드 파일은 powershell로 압축한 request229.zip 지정합니다.

2.3.2 계층 추가 (Layer 생성)
- Lambda 함수의 코드 탭 맨 아래에 위치한 계층부분에 Add a Layre를 선택하여 Layer를 추가할 수 있습니다.
- Add a Layre를 이용하여 필요한 모듈 3개를 설치합니다.

2.4 Slack 설정
- Lambda 함수와 slack 연동을 위해 Slack Workspace가 필요합니다.
- 생성되어 있는 Slack Workspace를 사용해서 테스트를 진행합니다.
2.4.1 신규 Slack app 생성
- slack api 사이트에 접속합니다. (https://api.slack.com/)
- Your Apps -> Create New App -> From Scratch 순으로 선택합니다.
- 생성할 app이름과 Workspace를 선택하면 기본정보로 이동됩니다.

2.4.2 Incoming Webhooks 설정
- slack과 Lambda연동은 Incoming Webhooks으로 진행함으로 슬랙 api 메뉴에 있는 Incoming Webhooks을 활성화합니다.
- Incoming Webhooks을 활성화하면 Add New Webhook to Workspace 버튼이 생성됩니다. -> 버튼 클릭
- 채널 선택 화면에서 사용할 채널을 선택하고 허용합니다.
- 이전화면으로 돌아오면 Webhooks URL이 하나 생성됩니다. (lambda에서 사용 예정)

2.4.3 OAuth & Permissions 설정
- slack api 메뉴에서 OAuth & Permissions 선택하고 Bot User Oauth Token을 확인합니다. (lambda 환경변수에서 사용)
- Scopes에서 생성한 slack app 권한을 부여합니다. (추후 chat기능을 사용할 계획임으로 chat 권한도 추가)
- 설정 완료 후
로 재 배포를 진행합니다.

2.4.4 slack app 추가
- slack application의 앱에서 관리 -> 앱찾아보기 -> 생성한 app 선택합니다.
- slack 메세지전송이 가능하도록 slack api 메뉴에서 App Home에 있는 Messages Tab ON
- Slash commands를 사용할 계획이니 Allow users to send Slash commands and messages from the messages tab 체크합니다.

2.4.5 s3 bucket 설정
- 사용하고자 하는 s3의 bucket을 추가하고 권한을 부여합니다.(z_psh_event_test를 사용)
2.4.6 lambda 환경변수 설정
- lambda 함수의 구성탭에서 환경변수를 추가합니다.
- BUCKET : S3 bucket 명
- FULL_KEY : S3 bucket에 저장될 파일명
- HookUrl : 2.4.2에서 확인한 Webhooks URL
- KEY : 사용하지 않음
- slackChannel: 메세지를 주고받을 슬랙 채널명
- slackToken : 2.4.2에서 확인한 Bot User Oauth Token

3. Lambda 코드 소스
3.1 테스트 코드 생성
- Lambda 함수의 테스트 탭을 선택하면 이벤트 json을 저장하면 테스트 가능합니다. (저장 후 사용)

- 테스트 JSON 전문
{
"version": "0",
"id": "121345678-1234-1234-1234-123456789012",
"detail-type": "AWS Health Event",
"source": "aws.health",
"account": "123456789012",
"time": "2022-06-03T06:27:57Z",
"region": "ap-northeast-2",
"resources": [
"i-abcd1111"
],
"detail": {
"eventArn": "arn:aws:health:ap-northeast-2::event/RDS/AWS_RDS_OPERATIONAL_NOTIFICATION/AWS_RDS_OPERATIONAL_NOTIFICATION_4e5bf2fdf79629da628449486dccea0ff10989e2a30424b51236ffa9f2100bcb",
"service": "RDS",
"eventTypeCode": "AWS_RDS_OPERATIONAL_NOTIFICATION",
"eventTypeCategory": "accountNotification",
"eventScopeCode": "ACCOUNT_SPECIFIC",
"communicationId": "01b0993207d81a09dcd552ebd1e633e36cf1f09a-1",
"startTime": "Fri, 3 Jun 2022 05:01:10 GMT",
"endTime": "Fri, 3 Jun 2022 05:30:57 GMT",
"statusCode": "open",
"eventRegion": "ap-northeast-2",
"eventDescription": [
{
"language": "en_US",
"latestDescription": "A description of the event will be provided here"
}
],
"affectedEntities": [
{
"entityValue": "i-abcd1111"
},
{
"entityValue": "i-abcd2222"
}
],
"page": "1",
"totalPages": "1",
"affectedAccount": "123456789012"
}
}
3.2 Lambda 코드 작성
- Lambda 코드는 아래와 같이 작성합니다.
import boto3
import json
import os
import pandas as pd
from datetime import datetime as dt, timezone, timedelta
from dateutil.relativedelta import relativedelta
from dateutil.tz import tzlocal
from io import StringIO
import requests
from base64 import b64decode
from tabulate import tabulate
from botocore.exceptions import ClientError
ENCRYPTED_HOOK_URL = os.environ['HookUrl']
SLACK_CHANNEL = os.environ['slackChannel']
SLACK_TOKEN = os.environ['slackToken']
s3_bucket = os.environ['BUCKET']
s3_full_key = os.environ['FULL_KEY']
s3_client = boto3.client('s3')
health_client = boto3.client('health')
elasticache_client = boto3.client('elasticache')
def lambda_handler(event, context):
# 매일 09시remind 메세지 전달
if (
'source' in event and event['source'] == 'aws.events' and
'detail-type' in event and event['detail-type'] == 'Scheduled Event'
):
event_remind()
else:
describe_health_event(event)
def describe_health_event(event):
latestDescription = event['detail']['eventDescription'][0]['latestDescription'][:150]
entityValue = [entity['entityValue'].split(':')[-1] for entity in event['detail']['affectedEntities']]
# entityValue = event['detail']['affectedEntities'][0]['entityValue']
# 문자열 제거
latestDescription = latestDescription.replace('English follows Korean | 한국어버전 뒤에 영어버전이 있습니다', '')
# 이벤트 정보 담을 리스트 생성
event_list = []
# 이벤트 정보 항목
event_data = {
'arn': event['detail'].get('eventArn', ''),
'service': event['detail'].get('service', ''),
'eventTypeCode': event['detail'].get('eventTypeCode', ''),
'eventTypeCategory': event['detail'].get('eventTypeCategory', ''),
'eventScopeCode': event['detail'].get('eventScopeCode', ''),
'startTime': event['detail'].get('startTime', ''),
'endTime': event['detail'].get('endTime', ''),
'statusCode': event['detail'].get('statusCode', ''),
'Region': event['detail'].get('eventRegion', ''),
'latestDescription': latestDescription,
'entityValue': entityValue
}
# 이벤트 정보 리스트에 추가
event_list.append(event_data)
df = pd.DataFrame(event_list)
# startTime 열을 datetime 형식으로 변환
df['startTime'] = pd.to_datetime(df['startTime'])
df['startTime'] = df['startTime'].dt.strftime("%Y-%m-%d %H:%M:%S")
df = describe_health_event_details(df)
df.loc[:, 'Process_comple'] = 'N'
result_fullfile_process(df)
def describe_health_event_details(df):
# 이벤트 정보 담을 리스트 생성
event_detail_list = []
arn = df['arn'].iloc[0]
# 이벤트의 ARN을 사용하여 이벤트의 상세 정보 가져오기
event_details = health_client.describe_event_details(eventArns=[arn])
for event_details in event_details.get('successfulSet', []):
event_data = event_details.get('event', {})
eventMetadata_data = event_details.get('eventMetadata', {})
event_details = {
'arn': event_data.get('arn', 'Unknown'),
'lastUpdatedTime': event_data.get('lastUpdatedTime', 'Unknown'),
'deprecated_versions': eventMetadata_data.get('deprecated_versions', '')
}
# 이벤트 정보를 리스트에 추가
event_detail_list.append(event_details)
df2 = pd.DataFrame(event_detail_list)
df2['lastUpdatedTime'] = pd.to_datetime(df2['lastUpdatedTime'])
df2['lastUpdatedTime'] = df2['lastUpdatedTime'].dt.strftime("%Y-%m-%d %H:%M:%S")
df = pd.merge(df, df2, on='arn', how='outer')
return df
def result_fullfile_process(df):
df_full = df.copy()
try:
response = s3_client.head_object(Bucket=s3_bucket, Key=s3_full_key)
except ClientError as e:
if e.response['Error']['Code'] == '404':
response = None # 파일이 없으면 response를 None으로 설정
else:
raise # 다른 예외는 다시 발생시킴
if response:
# S3에서 파일 다운로드
response = s3_client.get_object(Bucket=s3_bucket, Key=s3_full_key)
csv_content = response['Body'].read().decode('utf-8')
# 로컬에서 pandas DataFrame으로 읽기
s3_df_full = pd.read_csv(StringIO(csv_content))
# 'arn'을 비교하여 차이가 있는 경우에만 새로운 데이터 추가
if not s3_df_full['arn'].isin(df_full['arn']).any():
df_full['ID'] = s3_df_full['ID'].max() + 1
df['ID'] = df_full['ID'].copy()
s3_df_full = pd.concat([s3_df_full, df_full])
csv_file = '/tmp/result_full.csv'
# CSV 파일로 저장
s3_df_full.to_csv(csv_file, index=False, encoding='utf-8')
with open(csv_file, 'rb') as upload_file:
s3_client.upload_fileobj(upload_file, s3_bucket, s3_full_key)
else:
matching_id = s3_df_full.loc[s3_df_full['arn'].isin(df_full['arn']), 'ID'].iloc[0]
df['ID'] = matching_id
else:
csv_file = '/tmp/result_full.csv'
# CSV 파일로 저장
df_full['ID'] = 1
df['ID'] = df_full['ID'].copy()
df_full.to_csv(csv_file, index=False, encoding='utf-8')
with open(csv_file, 'rb') as upload_file:
s3_client.upload_fileobj(upload_file, s3_bucket, s3_full_key)
result_file_create(df)
def result_file_create(df):
df = df.drop(columns = ['arn'])
new_order = ['ID', 'service', 'eventTypeCode', 'eventScopeCode', 'eventTypeCategory', 'Region', 'startTime', 'lastUpdatedTime', 'statusCode',
'deprecated_versions', 'entityValue', 'Process_comple', 'latestDescription']
df = df.loc[:, new_order]
df = df.T
df.columns = ['Event_Info']
df.index.name = 'Category'
result_text = tabulate(df, headers='keys', tablefmt='pretty', stralign='left')
txt_file = '/tmp/result.txt'
with open(txt_file, 'w', encoding='utf-8') as file:
file.write(result_text)
message = "TEST : 새로운 이벤트가 발생되었습니다.."
send_file_to_slack(txt_file, message)
def send_file_to_slack(txt_file, message):
# Slack API 업로드 URL
slack_upload_url = "https://slack.com/api/files.upload"
# Slack API 헤더
headers = {
"Authorization": f"Bearer {SLACK_TOKEN}"
}
# Slack API 업로드 매개변수
params = {
"channels": SLACK_CHANNEL,
# "filename": s3_key1,
"filename": txt_file,
"initial_comment": message
}
# 파일 업로드
with open(txt_file, "rb") as file_content:
files = {"file": (txt_file, file_content)}
response = requests.post(slack_upload_url, headers=headers, params=params, files=files)
# 응답 확인
if response.status_code == 200 and response.json().get("ok"):
print("파일이 Slack에 성공적으로 업로드되었습니다.")
else:
print(f"업로드 실패: {response.json()}")
def event_remind():
try:
response = s3_client.head_object(Bucket=s3_bucket, Key=s3_full_key)
except ClientError as e:
if e.response['Error']['Code'] == '404':
response = None # 파일이 없으면 response를 None으로 설정
else:
raise # 다른 예외는 다시 발생시킴
if response:
# S3에서 파일 다운로드
response = s3_client.get_object(Bucket=s3_bucket, Key=s3_full_key)
csv_content = response['Body'].read().decode('utf-8')
# 로컬에서 pandas DataFrame으로 읽기
s3_remind_df = pd.read_csv(StringIO(csv_content))
s3_remind_df = s3_remind_df[s3_remind_df['Process_comple'] != 'Y']
s3_remind_df = s3_remind_df.drop(columns = ['arn', 'eventScopeCode', 'eventTypeCategory', 'latestDescription','entityValue'])
new_order = ['ID', 'service', 'eventTypeCode', 'Region', 'startTime', 'lastUpdatedTime', 'statusCode', 'deprecated_versions', 'Process_comple']
s3_remind_df = s3_remind_df.loc[:, new_order]
s3_remind_df = s3_remind_df.reset_index(drop=True)
result_text = tabulate(s3_remind_df, headers='keys', tablefmt='pretty', stralign='left')
# 저장할 파일 경로 정의
txt_file = '/tmp/result.txt'
# 텍스트를 파일로 저장
with open(txt_file, 'w', encoding='utf-8') as file:
file.write(result_text)
message = "TEST : 미처리된 이벤트 내역입니다."
else:
message = "TEST : 미처리된 이벤트는 없습니다."
send_file_to_slack(txt_file, message)
4. 결과
4.1 이벤트 발생 시
- 이벤트 발생 시 아래와 같이 슬랙에 전달됩니다.

- 매일 09시에는 처리되지 이벤트 정보를 전달합니다.

여기까지 ‘AWS LAMBDA를 활용한 Account Helth Event 발생 정보 슬랙 전송(1)’에 대해 소개해드렸습니다. 유익한 정보가 되셨길 바랍니다. 감사합니다.
Written by 박 성환
BESPIN GLOBAL