안녕하세요 오늘은 BESPIN GLOBAL AI실 박성환님이 작성해주신 ‘AWS LAMBDA를 활용한 Account Helth Event 발생 정보 슬랙 전송(2)’에 대해 소개해드리도록 하겠습니다.
목차
- Lambda 함수 생성
- API GATEWAY생성
- Slack APP 설정
- 이벤트 관리테스트
지난 포스팅 ‘AWS LAMBDA를 활용한 EVENT발생 정보 슬랙 전송(1)’에 이어 발생된 이벤트를 슬랙에서 관리하도록 설정해보겠습니다.
1. Lambda 함수 생성
1.1 환경변수
- 이벤트발생 Lambda(z-psh-event-test)와 동일하게 환경변수를 설정합니다.
1.2 소스 코드
- Lambda명 z_psh_test2으로 아래와 같이 코드를 작성합니다.
import json
import os
from slack_sdk import WebClient
from urllib.parse import parse_qs
from io import StringIO
import requests
from base64 import b64decode
from tabulate import tabulate
import pandas as pd
from botocore.exceptions import ClientError
import boto3
from urllib.parse import unquote
SLACK_APP_TOKEN = os.environ['slackToken']
SLACK_CHANNEL = os.environ['slackChannel']
s3_bucket = os.environ['BUCKET']
s3_full_key = os.environ['FULL_KEY']
s3_client = boto3.client('s3')
def send_slack_message(token, channel, message):
client = WebClient(token=token)
try:
response = client.chat_postMessage(
channel=channel,
text=message
)
return response
except Exception as e:
return str(e)
def lambda_handler(event, context):
# Slack으로부터 전송된 데이터 파싱
body = parse_qs(event.get("body", ""))
command = body.get("command", [""])
command = command[0]
text_values = body.get('text', [""])[0].split()
s3_df_full = s3_df_full_download()
if command == "/update_event_complete":
if len(text_values) == 2:
text_values = ' '.join(text_values)
ID = int(text_values.split()[0])
complete = text_values.split()[1]
if (s3_df_full['ID'] == ID).any() and complete.upper() in ['Y', 'N']:
s3_df_full.loc[s3_df_full['ID'] == ID, 'Process_comple'] = complete.upper()
csv_file = '/tmp/result_full.csv'
s3_df_full.to_csv(csv_file, index=False, encoding='utf-8')
with open(csv_file, 'rb') as upload_file:
s3_client.upload_file(csv_file, s3_bucket, s3_full_key)
message = f"이벤트번호: {ID}, 완료여부(Y/N): {complete}"
response = send_slack_message(SLACK_APP_TOKEN, SLACK_CHANNEL, message)
else:
message = "TEST : ID&VALUE 순서로 사용가능합니다."
response = send_slack_message(SLACK_APP_TOKEN, SLACK_CHANNEL, message)
elif len(text_values) == 1:
event_list = ''.join(text_values)
if event_list == 'list':
# 수정된 부분: s3_df_full 변수에 s3_df_full_download 함수의 반환값을 할당
s3_df_full = s3_df_full[s3_df_full['Process_comple'] != 'Y']
message = "TEST : 미처리된 이벤트 내역입니다."
event_list_info(s3_df_full, message)
elif event_list == 'full_list':
message = "TEST : 전체 이벤트 내역입니다."
event_list_info(s3_df_full, message)
elif event_list == 'del_list':
filtered_df = s3_df_full[s3_df_full['Process_comple'] == 'Y']
print(filtered_df)
if not filtered_df.empty:
s3_df_full.drop(filtered_df.index, inplace=True)
csv_file = '/tmp/result_full.csv'
s3_df_full.to_csv(csv_file, index=False, encoding='utf-8')
with open(csv_file, 'rb') as upload_file:
s3_client.upload_file(csv_file, s3_bucket, s3_full_key)
message ="TEST : 완료된 이벤트정보 삭제 되었습니다."
response = send_slack_message(SLACK_APP_TOKEN, SLACK_CHANNEL, message)
else:
message ="TEST : 삭제대상의 이벤트정보가 없습니다."
response = send_slack_message(SLACK_APP_TOKEN, SLACK_CHANNEL, message)
else:
message = "TEST : [list], [full_list], [del_list] 만 사용가능합니다."
response = send_slack_message(SLACK_APP_TOKEN, SLACK_CHANNEL, message)
return {
'statusCode': 200,
'body': json.dumps({'message': 'Lambda function executed successfully'})
}
def s3_df_full_download():
try:
response = s3_client.head_object(Bucket=s3_bucket, Key=s3_full_key)
except ClientError as e:
if e.response['Error']['Code'] == '404':
response = None
raise
if response:
# S3에서 파일 다운로드
response = s3_client.get_object(Bucket=s3_bucket, Key=s3_full_key)
csv_content = response['Body'].read().decode('utf-8')
# 로컬에서 pandas DataFrame으로 읽기
s3_df_full = pd.read_csv(StringIO(csv_content))
return s3_df_full # 수정된 부분: DataFrame을 반환하도록 수정
def event_list_info(s3_df_full, message):
s3_df_full = s3_df_full.drop(columns=['arn', 'eventScopeCode', 'eventTypeCategory', 'latestDescription', 'entityValue'])
new_order = ['ID', 'service', 'eventTypeCode', 'Region', 'startTime', 'lastUpdatedTime', 'statusCode', 'deprecated_versions', 'Process_comple']
s3_df_full = s3_df_full.loc[:, new_order]
s3_df_full = s3_df_full.reset_index(drop=True)
result_text = tabulate(s3_df_full, headers='keys', tablefmt='pretty', stralign='left')
txt_file = '/tmp/result.txt'
with open(txt_file, 'w', encoding='utf-8') as file:
file.write(result_text)
slack_upload_url = "https://slack.com/api/files.upload"
headers = {
"Authorization": f"Bearer {SLACK_APP_TOKEN}"
}
# Slack API 업로드 매개변수
params = {
"channels": SLACK_CHANNEL,
"filename": txt_file,
"initial_comment": message
}
# 파일 업로드
with open(txt_file, "rb") as file_content:
files = {"file": (txt_file, file_content)}
response = requests.post(slack_upload_url, headers=headers, params=params, files=files)
2. API GATEWAY생성
2.1 API & Resource 생성
- Slack과 AWS Lambda함수통신을 위해 API Gateway를 사용하도록 합니다.
- HTTP API나 REST API를 사용하여 생성합니다.
- 생성된 API에 리소스생성 작업을 진행합니다.(리소스이름은 test로 생성)

2.2 Method 생성
- 생성된 API에 메서드 생성을 진행합니다.
- 메서드 생성 : POST, 통합유형 : Lambda 함수, Lambda 프록시 통합 : on, Lambda함수 : z_psh_test2
- 메서드를 생성하면 lambda의 트리거에 연결됩니다.
- 메서드의 통합요청을 선택하고 편집을하여 매핑탬플릿을 아래와 같이 설정합니다.

3. Slack APP 설정
- 메세지 전송시 사용했던 슬랙 app을 계속 사용 하도록 합니다.(z_psh_test_event)
- z_psh_test_event을 선택하고 메뉴에서 Slash Commands 선택하여 새로운 명령을 생성합니다.
- Command : /update_event_complete, Request URL : Lambda함수의 api gateway 트리거 API Endpoint 입력하고 생성합니다.

4. 이벤트 관리테스트

- 9:00 미처리 이벤트 내역 remind 메세지를 자동 발신합니다.

- 조치 완료 이벤트(AWS_EKS_PLANNED_LIFECYCLE_EVENT)

- 전체 이벤트 조회

- 조치 완료 이벤트(AWS_EKS_PLANNED_LIFECYCLE_EVENT) 삭제

- 전체 이벤트 조회

- 미처리 이벤트 조회

여기까지 ‘AWS LAMBDA를 활용한 Account Helth Event 발생 정보 슬랙 전송(2)’에 대해 소개해드렸습니다. 유익한 정보가 되셨길 바랍니다. 감사합니다.
Written by 박 성환
BESPIN GLOBAL