Cloud Trail 로그를 Splunk Enterprise로 적재하기
Object Storage에 저장된 Cloud Trail 로그를 Splunk Enterprise로 수집 및 분석하는 방법을 안내합니다.
- 예상 소요 시간: 40분
- 사용자 환경
- 권장 운영 체제: MacOS, Ubuntu
- Region: kr-central-2
- 사전 준비 사항
시나리오 소개
이 시나리오는 Object Storage에 저장된 Cloud Trail 로그를 Splunk Enterprise로 적재하는 방법을 자세히 설명합니다. 이를 통해 실시간 모니터링 및 분석 환경을 구축하고, 시스템 상태를 시각적으로 확인하며 이상 징후를 빠르게 감지해 장애에 신속하게 대응할 수 있습니다.
이 튜토리얼의 주요 내용은 다음과 같습니다.
- Splunk Enterprise 인스턴스 설정
- Splunk Universal Forwarder 에이전트 설정: Forwarder 에이전트를 설치하고, 로그 파일울 Splunk Enterprise로 포워딩하는 설정
- 로그 저장 자동화: Object Storage에 저장된 Cloud Trail 로그를 Forwarder 에이전트의 특정 디렉토리에 저장하는 자동 스크립트 작성
- 오류 발생 시, 누락된 로그를 자동으로 다운로드하도록 설정
- 백그라운드 프로세스 등록: 스크립트를 매시간 실행되는 백그라운드 프로세스로 등록
- Splunk Enterprise에 적재된 로그 확인
- Splunk Enterprise: 기업용 데이터 분석 플랫폼으로, 다양한 로그 데이터를 실시간으로 수집·분석하여 시스템 운영 현황을 모니터링하고 이상 징후를 신속하게 감지합니다.
- Splunk Universal Forwarder: 경량 데이터 수집 에이전트로, 원격 서버의 로그와 데이터를 수집하여 Splunk Indexer로 전송하는 역할을 합니다. 이를 활용하면 Object Storage에서 로그를 자동으로 가져와 Splunk Enterprise로 전송할 수 있으며, 이를 통해 실시간 분석 및 검색이 가능합니다.
아키텍처
시작하기 전에
Splunk 서버와 Forwarder 에이전트의 설정 및 로그 처리 환경을 구축하기 위해 필요한 사전 작업을 안내합니다.
1. Object Storage 버킷 생성 및 Cloud Trail 로그 저장 활성화
Cloud Trail 로그를 활용하려면 먼저 로그를 저장할 Object Storage 버킷을 생성해야 합니다. 이 버킷은 Cloud Trail 로그를 저장할 장소로 사용되며, 이후에 해당 데이터를 Splunk 서버로 전달하여 분석에 활용할 수 있습니다. 또한, Cloud Trail 로그가 Object Storage에 자동으로 저장되도록 Cloud Trail 로그 저장 기능 활성화 설정을 합니다.
2. 네트워크 환경 구축
Splunk Enterprise 서버와 Forwarder 에이전트 간 원활한 통신을 위한 VPC와 서브넷을 구성합니다.
VPC 및 서브넷: tutorial
-
카카오클라우드 콘솔 > Beyond Networking Service > VPC 메뉴로 이동합니다.
-
우측의 [+ VPC 생성] 버튼을 클릭한 후, 다음과 같이 VPC 및 서브넷을 생성합니다.
구분 항목 설정/입력값 VPC 정보 VPC 이름 tutorial VPC IP CIDR 블록 10.0.0.0/16 Availability Zone 가용 영역 개수 1 첫 번째 AZ kr-central-2-a 서브넷 설정 가용 영역당 퍼블릭 서브넷 개수 1 kr-central-2-a 퍼블릭 서브넷 IPv4 CIDR 블록: 10.0.0.0/20
-
하단에 생성되는 토폴로지를 확인 후, 이상이 없다면 [생성] 버튼을 클릭합니다.
- 서브넷의 상태는
Pending Create
>Pending Update
>Active
순서로 변경됩니다.Active
상태가 되어야 다음 단계를 진행할 수 있습니다.
- 서브넷의 상태는
3. 보안 그룹 설정
보안 그룹을 설정하면 외부 접근을 차단하고 필요한 트래픽만 허용하여 Splunk 서버와 Forwarder 에이전트 간 보안을 강화할 수 있습니다.
보안 그룹: tutorial-splunk-sg
-
카카오클라우드 콘솔 > VPC > 보안 그룹 메뉴로 이동합니다. 아래 표를 참조하여 보안 그룹을 생성합니다.
이름 설명(선택) tutorial-splunk-sg Splunk 서버의 보안정책 -
하단의 [+ 추가하기] 버튼을 클릭 후, 인바운드 조건을 아래와 같이 설정하고 [적용] 버튼을 클릭합니다.
추가할 인바운드 규칙 항목 설정값 splunk inbound policy 1 프로토콜 TCP
패킷 출발지 {사용자 공인 IP}/32
포트 번호 22 정책 설명(선택) SSH 접속 허용 splunk inbound policy 2 프로토콜 TCP
패킷 출발지 {사용자 공인 IP}/32
포트 번호 8000 정책 설명(선택) splunk enterprise web (관리 페이지) 접속 허용
보안 그룹: tutorial-forwarder-sg
-
카카오클라우드 콘솔 > VPC > 보안 그룹 메뉴로 이동합니다. 아래 표를 참조하여 보안 그룹을 생성합니다.
이름 설명(선택) tutorial-forwarder-sg Forwarder 서버의 보안정책 -
하단의 [+ 추가하기] 버튼을 클릭 후, 인바운드 조건을 아래와 같이 설정하고 [적용] 버튼을 클릭합니다.
추가할 인바운드 규칙 항목 설정값 forwarder inbound policy 1 프로토콜 TCP
패킷 출발지 {사용자 공인 IP}/32
포트 번호 22 정책 설명(선택) SSH 접속 허용
시작하기
Splunk Enterprise와 Forwarder를 활용하여 로그 데이터를 수집하고 분석하는 환경을 구성합니다. 각 단계는 Splunk 인스턴스 생성, Forwarder 설정, 자동 로그 전송 스크립트 작성으로 구성됩니다.
Step 1. Splunk 인스턴스 구성
Splunk Enterprise를 설치할 인스턴스를 생성하고, 초기 설정을 통해 로그 수집 및 분석을 위한 기본 환경을 구성합니다.
-
Splunk 공식 사이트에서 무료 평가판 Splunk Enterprise 라이센스를 다운로드합니다. 이번 실습에서는 Linux >
.tgz
파일의wget 링크 복사
를 클릭합니다. -
카카오클라우드 Virtual Machine 서비스에서 Splunk Enterprise 서버로 사용할 인스턴스를 생성합니다.
Splunk 인스턴스: tutorial-splunk
-
카카오클라우드 콘솔 > Beyond Compute Service > Virtual Machine 메뉴로 이동합니다.
-
아래 표의 항목과 값을 참조하여 Splunk Enterprise 서버의 역할을 할 VM 인스턴스를 생성합니다.
구분 항목 설정/입력값 비고 기본 정보 이름 tutorial-splunk 개수 1 이미지 Ubuntu 24.04 인스턴스 타입 m2a.large 볼륨 루트 볼륨 50 키 페어 {USER_KEYPAIR}
⚠️ 키 페어는 최초 1회 안전하게 보관해야 합니다.
잃어버린 키는 복구할 수 없으며, 재발급이 필요합니다.네트워크 VPC tutorial 보안 그룹 tutorial-splunk-sg
네트워크 인터페이스 1 새 인터페이스 서브넷 main (10.0.0.0/20) IP 할당 방식 자동 -
생성된 Splunk 인스턴스에 퍼블릭 IP를 연결합니다.
-
-
생성된 Splunk 인스턴스에 SSH 접속을 하여 아래 명령어를 통해 Splunk Enterprise를 설치합니다.
# 다운로드
1번에서 복사한 wget 명령어 입력
# 다운로드한 파일 압축 해제
tar xvzf splunk-9.4.0-6b4ebe426ca6-linux-amd64.tgz
# Splunk 서버 시작
sudo ./splunk/bin/splunk start --accept-license
# 이때, 로그인할 username과 password를 입력합니다.
# 이후 정상 출력 예시
Waiting for web server at http://127.0.0.1:8000 to be available............ Done
If you get stuck, we're here to help.
Look for answers here: http://docs.splunk.com
The Splunk web interface is at http://host-172-16-0-32:8000 -
브라우저에서
http://{SPLUNK_인스턴스_퍼블릭_IP}:8000
에 접속 후, Splunk 서버 시작 시 입력한 username과 password로 로그인합니다. -
접속한 Splunk Enterprise 페이지에서 설정 > 전달 및 수신 > 데이터 수신에서 새 수신 포트 버튼을 클릭하여 9997 포트를 생성합니다. (아래 그림 참고)
참고 그림
Step 2. Forwarder 인스턴스 구성
Object Storage에 저장된 로그를 쉽게 포워딩 하기 위해 Splunk Universal Forwarder 에이전트를 설치합니다. 이 튜토리얼에서는 해당 에이전트 서버를 Forwarder 인스턴스라고 하겠습니다.
-
Forwarder 인스턴스를 생성합니다.
Forwarder 인스턴스: tutorial-forwarder
-
카카오클라우드 콘솔 > Beyond Compute Service > Virtual Machine 메뉴로 이동합니다.
-
아래 표의 항목과 값을 참조하여 Forwarder 에이전트의 역할을 할 VM 인스턴스를 생성합니다.
구분 항목 설정/입력값 비고 기본 정보 이름 tutorial-forwarder 개수 1 이미지 Ubuntu 24.04 인스턴스 타입 m2a.large 볼륨 루트 볼륨 50 키 페어 {USER_KEYPAIR}
⚠️ 키 페어는 최초 1회 안전하게 보관해야 합니다.
잃어버린 키는 복구할 수 없으며, 재발급이 필요합니다.네트워크 VPC tutorial 보안 그룹 tutorial-forwarder-sg
네트워크 인터페이스 1 새 인터페이스 서브넷 main (10.0.0.0/20) IP 할당 방식 자동 -
생성된 Splunk 인스턴스에 퍼블릭 IP를 연결합니다.
-
생성된 Splunk 인스턴스에 SSH 접속을 합니다.
-
-
Forwarder 인스턴스에 SSH로 접속한 후, Splunk 공식 문서를 참고하여 Universal Forwarder 에이전트를 설치합니다.
-
아래의 설정을 통해 Splunk Enterprise 서버로 로그 데이터를 전송할 준비를 합니다.
# Splunk Enterprise로 데이터를 보낼 파일이 저장되어 있을 디렉토리 생성
sudo mkdir /home/ubuntu/cloudtrail/processed_data
# Splunk에서 해당 디렉토리 내의 로그 파일을 모니터링하도록 설정
sudo /home/ubuntu/splunkforwarder/bin/splunk add monitor /home/ubuntu/cloudtrail/processed_data/
# 모니터링하는 로그 데이터를 Splunk Enterprise 서버로 전송하도록 설정
sudo /home/ubuntu/splunkforwarder/bin/splunk add forward-server ${SPLUNK_PRIVATE_IP}:${SPLUNK_PORT}환경변수 설명 SPLUNK_PRIVATE_IP🖌︎ Splunk 인스턴스 Private IP SPLUNK_PORT🖌︎ Splunk 수신 포트 9997 팁- Splunk Universal Forwarder가 정상적으로 작동하는 경우,
processed_data
디렉토리에 저장된 모든 로그 파일은 자동으로 Splunk 서버로 전송됩니다. - 네트워크 문제로 Forwarder와 Splunk 서버 간의 통신이 일시적으로 끊길 수 있습니다. 이 경우 로그 파일이 서버로 전송되지 않거나 반영되지 않을 수 있습니다. 이때는
/splunkforwarder/var/log/splunk/splunkd.log
파일을 확인하여 통신이 끊긴 시간대를 파악할 수 있습니다. 해당 파일에는 Forwarder와 서버 간 연결 문제에 대한 상세 정보가 기록됩니다. - 만약 통신 장애로 인해 전송되지 않은 로그 파일이 있다면, 해당 로그 파일을 다시
processed_data
디렉토리에 복사하세요. Splunk Forwarder가 이를 자동으로 감지하여, 누락된 로그를 다시 서버로 전송합니다. - 네트워크 문제로 인해 로그가 유실되더라도, 원본 데이터를
processed_data
디렉토리에 보관하고 있다면 후속 조치로 복구가 가능합니다.
- Splunk Universal Forwarder가 정상적으로 작동하는 경우,
-
위에서 설정한 내용이 제대로 반영되었는지 확인합니다. 아래 파일은 Splunk Universal Forwarder의
outputs.conf
파일로, 데이터를 전송할 대상 서버에 대한 설정을 포함하고 있습니다.$ sudo cat /home/ubuntu/splunkforwarder/etc/system/local/outputs.conf
# 출력 예시
[tcpout]
defaultGroup = default-autolb-group
[tcpout:default-autolb-group]
server = ${SPLUNK_PRIVATE_IP}:${SPLUNK_PORT}
[tcpout-server://${SPLUNK_PRIVATE_IP}:${SPLUNK_PORT}환경변수 설명 SPLUNK_PRIVATE_IP🖌︎ Splunk 인스턴스 Private IP SPLUNK_PORT🖌︎ Splunk 수신 포트 9997 -
Splunk 인스턴스를 위한 보안 그룹(tutorial-splunk-sg)에 아래 인바운드 규칙을 추가합니다.
보안 그룹: tutorial-splunk-sg
-
카카오클라우드 콘솔 > VPC > 보안 그룹 메뉴로 이동합니다. 아래 표를 참조하여 보안 그룹을 생성합니다.
이름 설명(선택) tutorial-splunk-sg Splunk 서버의 보안정책 -
하단의 [+ 추가하기] 버튼을 클릭 후, 인바운드 규칙을 추가하고 [적용] 버튼을 클릭합니다.
추가할 인바운드 규칙 항목 설정값 splunk inbound policy 3 프로토콜 TCP
패킷 출발지 {Forwarder Server Private IP}/32
포트 번호 9997 정책 설명(선택) UF에서 로그를 수집하는 포트
-
Step 3. 자동 로그 저장 스크립트 작성
Cloud Trail의 로그 저장 기능을 사용하면, 매시간마다 로그가 Object Storage 버킷 내에 하나의 파일로 저장됩니다. 이 로그를 매시간 Splunk Universal Forwarder 에이전트로 읽어 Splunk Enterprise 서버로 전송하는 자동 스크립트를 작성할 수 있습니다.
- 최초 스크립트 실행 시, Object Storage의 로그 파일과 Forwarder 에이전트의 로컬 로그 파일을 비교하여 모든 파일을 다운로드합니다.
- 가장 최근에 수정된 파일을 Object Storage에서 다운로드하고 압축을 해제합니다.
- 압축 해제된 파일을 JSON 객체 리스트로 변환하여
processed_data
디렉토리에 저장한 후, 해당 JSON 객체 리스트를 Splunk로 이벤트 전송합니다. - 중간에 오류가 발생하면, 누락된 로그 파일을 자동으로 확인하고 다운로드합니다.
- 참고 사항: 2~4번 과정은 매시간 10분마다 반복됩니다.
-
Forwarder 인스턴스에 SSH로 접속하여 Python 환경을 구성합니다. 아래 명령어를 통해 Python 가상 환경을 생성하고, 필요한 라이브러리를 설치합니다.
# Python 가상환경 생성 및 활성화
$ sudo apt install python3-venv
$ python3 -m venv myenv
$ source myenv/bin/activate
# 필요한 Python 라이브러리 설치
$ pip3 install flask
$ pip3 install boto3
$ pip3 install subprocess32 -
자동 스크립트에서 AWS CLI를 사용하려면, 먼저 AWS CLI를 설치하고 설정합니다.
AWS CLI 설치$ sudo apt update
$ sudo apt install unzip
$ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64-2.15.41.zip" -o "awscliv2.zip"
$ unzip awscliv2.zip
$ sudo ./aws/install
$ aws --version
aws-cli/2.15.41 Python/3.11.8 Linux/6.8.0-39-generic exe/x86_64.ubuntu.24 prompt/off주의카카오클라우드에서 호환 가능한 S3 CLI의 최신 버전은
2.22.0
입니다. 이 보다 상위 버전을 사용할 경우, S3 명령 요청의 정상 작동이 보장되지 않을 수 있습니다. 본 가이드에서는2.15.41
버전을 설치하였습니다.사전 준비에서 S3 API 사용을 위해 발급받은 크리덴셜을 확인하고, 아래 명령어로 AWS CLI를 설정합니다.
$ aws configure
AWS Access Key ID: ${CREDENTIAL_ACCESS_KEY}
AWS Secret Access Key: ${CREDENTIAL_SECRET_ACCESS_KEY}
Default region name: kr-central-2
Default output format:환경변수 설명 CREDENTIAL_ACCESS_KEY🖌︎ S3 API 사용에 필요한 액세스 키 CREDENTIAL_SECRET_ACCESS_KEY🖌︎ S3 API 사용에 필요한 비밀 액세스 키 -
스크립트 파일을 엽니다.
스크립트 파일 열기sudo vi /home/ubuntu/cloudtrail/script.py
-
아래 스크립트의 변수를 수정하여 자동 스크립트를 작성합니다.
안내아래 스크립트에 작성된
IAM 엔드포인트 URL
과Object Storage 엔드포인트 URL
은 추후 프라이빗 엔드포인트가 제공될 경우, 변경하여 사용할 수 있습니다.import requests
import zipfile
import os
from dateutil import parser
import json
import subprocess
import sys
import datetime
import logging
import time
# 전역 변수 선언
BUCKET_NAME = ${BUCKET_NAME}
ACCESS_ID = ${ACCESS_KEY_ID}
SECRET_KEY = ${ACCESS_SECRET_KEY}
PROJECT_ID = ${PROJECT_ID}
# 로그 설정
logging.basicConfig(filename='/home/ubuntu/cloudtrail/process_log.log', level=logging.INFO)
LOG_FILE = "/home/ubuntu/cloudtrail/process_log.log"
# 로그 함수 정의 (시간 포함)
def log(level, message):
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"{current_time} [{level}] {message}", file=open(LOG_FILE, "a"))
# API 토큰을 가져오는 함수
def get_access_token():
global ACCESS_ID
global SECRET_KEY
url = "https://iam.kakaocloud.com/identity/v3/auth/tokens"
headers = {'Content-Type': 'application/json'}
body = {
"auth": {
"identity": {
"methods": ["application_credential"],
"application_credential": {
"id": ACCESS_ID,
"secret": SECRET_KEY
}
}
}
}
try:
response = requests.post(url, json=body, headers=headers)
response.raise_for_status()
return response.headers['X-Subject-Token']
except requests.exceptions.RequestException as e:
logging.info(f"Error getting access token: {e}")
return None
# 로컬 파일 목록 가져오기 (compare)
def compare_get_local_zip_files(download_dir):
try:
local_files = [f for f in os.listdir(download_dir) if f.endswith('.zip')]
return local_files
except Exception as e:
print(f"Error getting local files: {e}")
return []
# Object Storage에서 .zip 파일 목록 가져오기 (compare)
def compare_fetch_api_zip_files(PROJECT_ID, api_token, BUCKET_NAME):
api_url = f"https://objectstorage.kr-central-2.kakaocloud.com/v1/{PROJECT_ID}/{BUCKET_NAME}?format=json"
headers = {"X-Auth-Token": api_token}
try:
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json()
file_paths = [item["name"] for item in data if item["name"].endswith(".zip")]
return file_paths
except requests.exceptions.RequestException as e:
print(f"Error fetching API data: {e}")
return []
# 다운로드 및 로그 기록 (compare)
def compare_download_file_from_objectstorage(BUCKET_NAME, object_name, file_name):
endpoint = "https://objectstorage.kr-central-2.kakaocloud.com"
download_dir = "/home/ubuntu/cloudtrail/"
if not os.path.exists(download_dir):
os.makedirs(download_dir)
full_file_path = os.path.join(download_dir, file_name)
try:
command = [
"aws",
"--endpoint-url", endpoint,
"s3",
"cp",
f"s3://{BUCKET_NAME}/{object_name}",
full_file_path
]
logging.info(f"Attempting to download from: s3://{BUCKET_NAME}/{object_name}")
result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
logging.info(f"Downloaded: {file_name}")
compare_decompress_and_process_file(full_file_path, file_name)
else:
logging.error(f"Error during download: {result.stderr}")
except subprocess.CalledProcessError as e:
logging.error(f"Subprocess failed: {e.stderr}")
return
except Exception as e:
logging.error(f"Unexpected error: {e}")
return
# 파일 압축 해제 후 JSON 변환 처리 (compare)
def compare_decompress_and_process_file(zip_file_path, file_name):
try:
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
zip_ref.extractall('/home/ubuntu/cloudtrail/')
decompressed_file_name = file_name.rsplit('.', 1)[0]
json_list = compare_convert_to_json_list(f"/home/ubuntu/cloudtrail/{decompressed_file_name}")
if json_list is not None:
compare_save_json_to_file(json_list, file_name)
except zipfile.BadZipFile as e:
print(f"Error decompressing file {zip_file_path}: {e}")
except Exception as e:
print(f"Error processing file {zip_file_path}: {e}")
# 압축 해제된 파일을 읽고 JSON 객체 리스트로 변환 후 새로운 파일로 저장 (compare)
def compare_convert_to_json_list(decompressed_file_name):
json_list = []
try:
with open(decompressed_file_name, 'r') as file:
lines = file.readlines()
for line in lines:
try:
json_obj = json.loads(line)
json_list.append(json_obj)
except json.JSONDecodeError as e:
print(f"Error decoding line to JSON: {e}")
continue
return json_list
except Exception as e:
print(f"Error reading file {decompressed_file_name}: {e}")
return None
# JSON 리스트를 파일로 저장하는 함수 (별도 'processed_data' 디렉터리에 저장)
def compare_save_json_to_file(json_list, file_name):
output_dir = 'processed_data'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_file_name = os.path.join(output_dir, file_name.rsplit('.', 1)[0] + '.json')
try:
with open(output_file_name, 'w') as outfile:
json.dump(json_list, outfile, indent=4)
print(f"Saved the JSON list to {output_file_name}")
except Exception as e:
print(f"Error saving JSON to file {output_file_name}: {e}")
# 초기 다운로드 스크립트 시작 (compare)
def compare_for_download(api_token, local_download_dir):
global BUCKET_NAME
global PROJECT_ID
local_files = compare_get_local_zip_files(local_download_dir)
logging.info(f"로컬 디렉토리에서 가져온 .zip 파일들: {local_files}")
remote_files = compare_fetch_api_zip_files(PROJECT_ID, api_token, BUCKET_NAME)
logging.info(f"API에서 가져온 .zip 파일들: {remote_files}")
files_to_download = [f for f in remote_files if f.split('/')[-1] not in local_files]
logging.info(f"다운로드할 파일 목록: {files_to_download}")
for file in files_to_download:
file_name = file.split('/')[-1]
compare_download_file_from_objectstorage(BUCKET_NAME, file, file_name)
# JSON 리스트를 파일로 저장하는 함수 (별도 'processed_data' 디렉터리에 저장) (main)
def main_save_json_to_file(json_list, file_name):
output_dir = 'processed_data'
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_file_name = os.path.join(output_dir, file_name.rsplit('.', 1)[0] + '.json')
try:
with open(output_file_name, 'w') as outfile:
json.dump(json_list, outfile, indent=4)
log("SUCCESS", {output_file_name})
except Exception as e:
error_check(f"Error saving JSON to file {output_file_name}: {e}")
# 가장 최근의 last_modified 값을 가진 파일 선택 (main)
def main_get_most_recent_file(files):
if not files:
error_check("No files found.")
most_recent_file = max(files, key=lambda x: parser.parse(x['last_modified']))
return most_recent_file
# 압축 해제된 파일을 읽고 JSON 객체 리스트로 변환 후 새로운 파일로 저장 (main)
def main_convert_to_json_list(decompressed_file_name):
json_list = []
try:
with open(decompressed_file_name, 'r') as file:
lines = file.readlines()
for line in lines:
try:
json_obj = json.loads(line)
json_list.append(json_obj)
except json.JSONDecodeError as e:
log(f"ERROR", f"Error decoding line to JSON: {e}")
continue
return json_list
except Exception as e:
error_check(f"Error reading file {decompressed_file_name}: {e}")
return None
# 파일 압축 해제 후 JSON 변환 처리 (main)
def main_decompress_and_process_file(zip_file_path, file_name):
try:
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
zip_ref.extractall('/home/ubuntu/cloudtrail/')
decompressed_file_name = file_name.rsplit('.', 1)[0]
json_list = main_convert_to_json_list(f"/home/ubuntu/cloudtrail/{decompressed_file_name}")
if json_list:
main_save_json_to_file(json_list, file_name)
except zipfile.BadZipFile as e:
error_check(f"Error decompressing file {zip_file_path}: {e}")
except Exception as e:
error_check(f"Error processing file {zip_file_path}: {e}")
# object storage로부터 파일 다운로드 (main)
def main_download_file_from_objectstorage(BUCKET_NAME, object_name, file_name):
endpoint = "https://objectstorage.kr-central-2.kakaocloud.com"
download_dir = "/home/ubuntu/cloudtrail/"
if not os.path.exists(download_dir):
os.makedirs(download_dir)
full_file_path = os.path.join(download_dir, file_name)
try:
command = [
"aws",
"--endpoint-url", endpoint,
"s3",
"cp",
f"s3://{BUCKET_NAME}/{object_name}",
full_file_path
]
result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
log("INFO", {full_file_path})
main_decompress_and_process_file(full_file_path, file_name)
except subprocess.CalledProcessError as e:
error_check(f"Error occurred while downloading the file: {e.stderr}")
# Object Storage API를 통해 파일 목록 가져오기
def main_fetch_files_from_objectstorage(PROJECT_ID, BUCKET_NAME, access_token):
url = f'https://objectstorage.kr-central-2.kakaocloud.com/v1/{PROJECT_ID}/{BUCKET_NAME}?format=json'
headers = {
'X-Auth-Token': access_token
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
error_check(f"Error fetching files: {e}")
return []
# Zip 파일 압축 해제 (main)
def main_decompress_zip(file_name):
file_dir = os.path.dirname(file_name) # 압축 파일이 있는 디렉토리
decompressed_file_name = file_name.rstrip('.zip') # .zip 제거한 파일 이름
try:
# 압축 해제
with zipfile.ZipFile(file_name, 'r') as zip_ref:
zip_ref.extractall(file_dir) # 파일이 있는 디렉토리에 압축 해제
# 압축 해제 후 생성된 파일 경로
decompressed_file_path = os.path.join(file_dir, decompressed_file_name)
return decompressed_file_path
except Exception as e:
error_check(f"Error decompressing file: {e}")
# 파일 다운로드 및 압축 해제 후 경로 반환 (main)
def main_process_file_for_download(PROJECT_ID, api_token, BUCKET_NAME):
files = main_fetch_files_from_objectstorage(PROJECT_ID, BUCKET_NAME, api_token)
if not files:
error_check("No files found.")
most_recent_file = main_get_most_recent_file(files)
if not most_recent_file:
error_check("No files found.")
return None, None
object_name = most_recent_file['name']
file_name = object_name.split('/')[-1]
main_download_file_from_objectstorage(BUCKET_NAME, object_name, file_name)
if file_name.endswith('.zip'):
decompressed_file_name = main_decompress_zip(file_name)
if decompressed_file_name:
return decompressed_file_name, file_name
else:
error_check("Failed to decompress the file.")
else:
return file_name, None
# 메인 스크립트 (main)
def main_script(api_token):
global BUCKET_NAME
global PROJECT_ID
decompressed_file_name, original_file_name = main_process_file_for_download(PROJECT_ID, api_token, BUCKET_NAME)
if not decompressed_file_name:
return error_check("No file was processed.")
json_list = main_convert_to_json_list(decompressed_file_name)
if not json_list:
return error_check("No JSON data found.")
main_save_json_to_file(json_list, original_file_name)
# 에러 발생 시 호출되는 함수 (main)
def error_check(error_message):
global BUCKET_NAME
log("ERROR", error_message)
api_token = get_access_token()
local_download_dir = "/home/ubuntu/cloudtrail/"
BUCKET_NAME = BUCKET_NAME
compare_for_download(api_token, local_download_dir)
# 매시각 10분에 실행되도록 하기 위한 함수
def wait_until_next_hour_10():
now = datetime.datetime.now()
next_hour_10 = now.replace(minute=10, second=0, microsecond=0)
if now > next_hour_10:
next_hour_10 += datetime.timedelta(hours=1)
wait_time = (next_hour_10 - now).total_seconds()
log("INFO", f"다음 실행까지 {wait_time}초 대기 중...")
time.sleep(wait_time)
# 메인 함수
if __name__ == "__main__":
# 초기 다운로드 스크립트
api_token = get_access_token()
local_download_dir = "/home/ubuntu/cloudtrail/"
compare_for_download(api_token, local_download_dir)
# 기본 로그 저장 스크립트
log("INFO", "메인 스크립트 시작")
main_script(api_token)
log("INFO", "메인 스크립트 종료")
while True:
# 매시각 10분까지 대기
wait_until_next_hour_10()
# 10분마다 메인 스크립트 실행
log("INFO", "메인 스크립트 시작")
api_token = get_access_token()
main_script(api_token)
log("INFO", "메인 스크립트 종료")환경변수 설명 BUCKET_NAME🖌︎ Object Storage Bucket 이름 ACCESS_KEY_ID🖌︎ 사용자 액세스 키 ID 수정 ACCESS_SECRET_KEY🖌︎ 사용자 액세스 보안 키 수정 PROJECT_ID🖌︎ 프로젝트 ID
Step 4. 백그라운드 프로세스 실행
-
스크립트와 로그 파일에 적절한 권한을 추가합니다.
# 로그 파일 생성
sudo touch /home/ubuntu/cloudtrail/process_log.log
# 로그 파일 권한 설정
sudo chmod 666 /home/ubuntu/cloudtrail/process_log.log
# 스크립트 파일 권한 설정
sudo chmod +x /home/ubuntu/cloudtrail/script.py -
백그라운드 프로세스로 스크립트를 실행합니다.
nohup bash /home/ubuntu/cloudtrail/script.py > /dev/null 2>&1 &
-
결과를 확인합니다.
- 스크립트가 매시간 10분마다 최신 로그 파일을
/home/ubuntu/cloudtrail/processed_data
디렉토리에 json 형식으로 생성합니다. ls /home/ubuntu/cloudtrail/processed_data
명령어로 로그 파일이 생성되었는지 확인합니다.tail -f /home/ubuntu/cloudtrail/process_log.log
명령어로 스크립트 실행 상태를 확인하며, 로그 파일에 출력되는 내용을 확인합니다.
예시: /home/ubuntu/cloudtrail/process_log.log 파일 내용INFO:root:로컬 디렉토리에서 가져온 .zip 파일들: ['trail_XXXX-XX-XX-XX.zip', 'trail_XXXX-XX-XX-XX.zip', ...]
INFO:root:API에서 가져온 .zip 파일들: ['trail_XXXX-XX-XX-XX.zip', 'trail_XXXX-XX-XX-XX.zip', ...]
INFO:root:다운로드할 파일 목록: []
[INFO] 메인 스크립트 시작
[INFO] {'/home/ubuntu/cloudtrail/trail_XXXX-XX-XX-XX.zip'}
[SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
[SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
[INFO] 메인 스크립트 종료
[INFO] 다음 실행까지 453.866015초 대기 중...
[INFO] 메인 스크립트 시작
[INFO] {'/home/ubuntu/cloudtrail/trail_XXXX-XX-XX-XX.zip'}
[SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
[SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
[INFO] 메인 스크립트 종료
[INFO] 다음 실행까지 3597.749401초 대기 중...
[INFO] 메인 스크립트 시작
[INFO] {'/home/ubuntu/cloudtrail/trail_XXXX-XX-XX-XX.zip'}
[SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
[SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
[INFO] 메인 스크립트 종료
[INFO] 다음 실행까지 3598.177826초 대기 중... - 스크립트가 매시간 10분마다 최신 로그 파일을
Step 5. Splunk Enterprise에 적재된 로그 확인
Splunk Enterprise 웹 UI에 접속 및 로그인을 하여 Cloud Trail 로그를 확인합니다.
예시