Skip to main content

Cloud Trail 로그를 Splunk Enterprise로 적재하기

Object Storage에 저장된 Cloud Trail 로그를 Splunk Enterprise로 수집 및 분석하는 방법을 안내합니다.

안내

시나리오 소개

이 시나리오는 Object Storage에 저장된 Cloud Trail 로그를 Splunk Enterprise로 적재하는 방법을 자세히 설명합니다. 이를 통해 실시간 모니터링 및 분석 환경을 구축하고, 시스템 상태를 시각적으로 확인하며 이상 징후를 빠르게 감지해 장애에 신속하게 대응할 수 있습니다.

이 튜토리얼의 주요 내용은 다음과 같습니다.

  1. Splunk Enterprise 인스턴스 설정
  2. Splunk Universal Forwarder 에이전트 설정: Forwarder 에이전트를 설치하고, 로그 파일울 Splunk Enterprise로 포워딩하는 설정
  3. 로그 저장 자동화: Object Storage에 저장된 Cloud Trail 로그를 Forwarder 에이전트의 특정 디렉토리에 저장하는 자동 스크립트 작성
    • 오류 발생 시, 누락된 로그를 자동으로 다운로드하도록 설정
  4. 백그라운드 프로세스 등록: 스크립트를 매시간 실행되는 백그라운드 프로세스로 등록
  5. Splunk Enterprise에 적재된 로그 확인
Splunk 서비스 소개
  • Splunk Enterprise: 기업용 데이터 분석 플랫폼으로, 다양한 로그 데이터를 실시간으로 수집·분석하여 시스템 운영 현황을 모니터링하고 이상 징후를 신속하게 감지합니다.
  • Splunk Universal Forwarder: 경량 데이터 수집 에이전트로, 원격 서버의 로그와 데이터를 수집하여 Splunk Indexer로 전송하는 역할을 합니다. 이를 활용하면 Object Storage에서 로그를 자동으로 가져와 Splunk Enterprise로 전송할 수 있으며, 이를 통해 실시간 분석 및 검색이 가능합니다.

architecture 아키텍처

시작하기 전에

Splunk 서버와 Forwarder 에이전트의 설정 및 로그 처리 환경을 구축하기 위해 필요한 사전 작업을 안내합니다.

1. Object Storage 버킷 생성 및 Cloud Trail 로그 저장 활성화

Cloud Trail 로그를 활용하려면 먼저 로그를 저장할 Object Storage 버킷을 생성해야 합니다. 이 버킷은 Cloud Trail 로그를 저장할 장소로 사용되며, 이후에 해당 데이터를 Splunk 서버로 전달하여 분석에 활용할 수 있습니다. 또한, Cloud Trail 로그가 Object Storage에 자동으로 저장되도록 Cloud Trail 로그 저장 기능 활성화 설정을 합니다.

2. 네트워크 환경 구축

Splunk Enterprise 서버와 Forwarder 에이전트 간 원활한 통신을 위한 VPC와 서브넷을 구성합니다.

VPC 및 서브넷: tutorial
  1. 카카오클라우드 콘솔 > Beyond Networking Service > VPC 메뉴로 이동합니다.

  2. 우측의 [+ VPC 생성] 버튼을 클릭한 후, 다음과 같이 VPC 및 서브넷을 생성합니다.

    구분항목설정/입력값
    VPC 정보VPC 이름tutorial
    VPC IP CIDR 블록10.0.0.0/16
    Availability Zone가용 영역 개수1
    첫 번째 AZkr-central-2-a
    서브넷 설정가용 영역당 퍼블릭 서브넷 개수1
    kr-central-2-a

    퍼블릭 서브넷 IPv4 CIDR 블록: 10.0.0.0/20

  3. 하단에 생성되는 토폴로지를 확인 후, 이상이 없다면 [생성] 버튼을 클릭합니다.

    • 서브넷의 상태는 Pending Create > Pending Update > Active 순서로 변경됩니다. Active 상태가 되어야 다음 단계를 진행할 수 있습니다.

3. 보안 그룹 설정

보안 그룹을 설정하면 외부 접근을 차단하고 필요한 트래픽만 허용하여 Splunk 서버와 Forwarder 에이전트 간 보안을 강화할 수 있습니다.

보안 그룹: tutorial-splunk-sg
  1. 카카오클라우드 콘솔 > VPC > 보안 그룹 메뉴로 이동합니다. 아래 표를 참조하여 보안 그룹을 생성합니다.

    이름설명(선택)
    tutorial-splunk-sgSplunk 서버의 보안정책
  2. 하단의 [+ 추가하기] 버튼을 클릭 후, 인바운드 조건을 아래와 같이 설정하고 [적용] 버튼을 클릭합니다.

    추가할 인바운드 규칙항목설정값
    splunk inbound policy 1프로토콜TCP
    패킷 출발지{사용자 공인 IP}/32
    포트 번호22
    정책 설명(선택)SSH 접속 허용
    splunk inbound policy 2프로토콜TCP
    패킷 출발지{사용자 공인 IP}/32
    포트 번호8000
    정책 설명(선택)splunk enterprise web (관리 페이지) 접속 허용
보안 그룹: tutorial-forwarder-sg
  1. 카카오클라우드 콘솔 > VPC > 보안 그룹 메뉴로 이동합니다. 아래 표를 참조하여 보안 그룹을 생성합니다.

    이름설명(선택)
    tutorial-forwarder-sgForwarder 서버의 보안정책
  2. 하단의 [+ 추가하기] 버튼을 클릭 후, 인바운드 조건을 아래와 같이 설정하고 [적용] 버튼을 클릭합니다.

    추가할 인바운드 규칙항목설정값
    forwarder inbound policy 1프로토콜TCP
    패킷 출발지{사용자 공인 IP}/32
    포트 번호22
    정책 설명(선택)SSH 접속 허용

시작하기

Splunk Enterprise와 Forwarder를 활용하여 로그 데이터를 수집하고 분석하는 환경을 구성합니다. 각 단계는 Splunk 인스턴스 생성, Forwarder 설정, 자동 로그 전송 스크립트 작성으로 구성됩니다.

Step 1. Splunk 인스턴스 구성

Splunk Enterprise를 설치할 인스턴스를 생성하고, 초기 설정을 통해 로그 수집 및 분석을 위한 기본 환경을 구성합니다.

  1. Splunk 공식 사이트에서 무료 평가판 Splunk Enterprise 라이센스를 다운로드합니다. 이번 실습에서는 Linux > .tgz 파일의 wget 링크 복사를 클릭합니다.

  2. 카카오클라우드 Virtual Machine 서비스에서 Splunk Enterprise 서버로 사용할 인스턴스를 생성합니다.

    Splunk 인스턴스: tutorial-splunk
    1. 카카오클라우드 콘솔 > Beyond Compute Service > Virtual Machine 메뉴로 이동합니다.

    2. 아래 표의 항목과 값을 참조하여 Splunk Enterprise 서버의 역할을 할 VM 인스턴스를 생성합니다.

      구분항목설정/입력값비고
      기본 정보이름tutorial-splunk
      개수1
      이미지Ubuntu 24.04
      인스턴스 타입m2a.large
      볼륨루트 볼륨50
      키 페어{USER_KEYPAIR}⚠️ 키 페어는 최초 1회 안전하게 보관해야 합니다.
      잃어버린 키는 복구할 수 없으며, 재발급이 필요합니다.
      네트워크VPCtutorial
      보안 그룹tutorial-splunk-sg
      네트워크 인터페이스 1새 인터페이스
      서브넷main (10.0.0.0/20)
      IP 할당 방식자동
    3. 생성된 Splunk 인스턴스에 퍼블릭 IP를 연결합니다.

  3. 생성된 Splunk 인스턴스에 SSH 접속을 하여 아래 명령어를 통해 Splunk Enterprise를 설치합니다.

    # 다운로드
    1번에서 복사한 wget 명령어 입력

    # 다운로드한 파일 압축 해제
    tar xvzf splunk-9.4.0-6b4ebe426ca6-linux-amd64.tgz

    # Splunk 서버 시작
    sudo ./splunk/bin/splunk start --accept-license
    # 이때, 로그인할 username과 password를 입력합니다.

    # 이후 정상 출력 예시
    Waiting for web server at http://127.0.0.1:8000 to be available............ Done


    If you get stuck, we're here to help.
    Look for answers here: http://docs.splunk.com

    The Splunk web interface is at http://host-172-16-0-32:8000
  4. 브라우저에서 http://{SPLUNK_인스턴스_퍼블릭_IP}:8000에 접속 후, Splunk 서버 시작 시 입력한 username과 password로 로그인합니다.

  5. 접속한 Splunk Enterprise 페이지에서 설정 > 전달 및 수신 > 데이터 수신에서 새 수신 포트 버튼을 클릭하여 9997 포트를 생성합니다. (아래 그림 참고) port_setting 참고 그림

Step 2. Forwarder 인스턴스 구성

Object Storage에 저장된 로그를 쉽게 포워딩 하기 위해 Splunk Universal Forwarder 에이전트를 설치합니다. 이 튜토리얼에서는 해당 에이전트 서버를 Forwarder 인스턴스라고 하겠습니다.

  1. Forwarder 인스턴스를 생성합니다.

    Forwarder 인스턴스: tutorial-forwarder
    1. 카카오클라우드 콘솔 > Beyond Compute Service > Virtual Machine 메뉴로 이동합니다.

    2. 아래 표의 항목과 값을 참조하여 Forwarder 에이전트의 역할을 할 VM 인스턴스를 생성합니다.

      구분항목설정/입력값비고
      기본 정보이름tutorial-forwarder
      개수1
      이미지Ubuntu 24.04
      인스턴스 타입m2a.large
      볼륨루트 볼륨50
      키 페어{USER_KEYPAIR}⚠️ 키 페어는 최초 1회 안전하게 보관해야 합니다.
      잃어버린 키는 복구할 수 없으며, 재발급이 필요합니다.
      네트워크VPCtutorial
      보안 그룹tutorial-forwarder-sg
      네트워크 인터페이스 1새 인터페이스
      서브넷main (10.0.0.0/20)
      IP 할당 방식자동
    3. 생성된 Splunk 인스턴스에 퍼블릭 IP를 연결합니다.

    4. 생성된 Splunk 인스턴스에 SSH 접속을 합니다.

  2. Forwarder 인스턴스에 SSH로 접속한 후, Splunk 공식 문서를 참고하여 Universal Forwarder 에이전트를 설치합니다.

  3. 아래의 설정을 통해 Splunk Enterprise 서버로 로그 데이터를 전송할 준비를 합니다.

    # Splunk Enterprise로 데이터를 보낼 파일이 저장되어 있을 디렉토리 생성
    sudo mkdir /home/ubuntu/cloudtrail/processed_data

    # Splunk에서 해당 디렉토리 내의 로그 파일을 모니터링하도록 설정
    sudo /home/ubuntu/splunkforwarder/bin/splunk add monitor /home/ubuntu/cloudtrail/processed_data/

    # 모니터링하는 로그 데이터를 Splunk Enterprise 서버로 전송하도록 설정
    sudo /home/ubuntu/splunkforwarder/bin/splunk add forward-server ${SPLUNK_PRIVATE_IP}:${SPLUNK_PORT}
    환경변수설명
    SPLUNK_PRIVATE_IP🖌 Splunk 인스턴스 Private IP
    SPLUNK_PORT🖌 Splunk 수신 포트 9997
    tip
    • Splunk Universal Forwarder가 정상적으로 작동하는 경우, processed_data 디렉토리에 저장된 모든 로그 파일은 자동으로 Splunk 서버로 전송됩니다.
    • 네트워크 문제로 Forwarder와 Splunk 서버 간의 통신이 일시적으로 끊길 수 있습니다. 이 경우 로그 파일이 서버로 전송되지 않거나 반영되지 않을 수 있습니다. 이때는 /splunkforwarder/var/log/splunk/splunkd.log 파일을 확인하여 통신이 끊긴 시간대를 파악할 수 있습니다. 해당 파일에는 Forwarder와 서버 간 연결 문제에 대한 상세 정보가 기록됩니다.
    • 만약 통신 장애로 인해 전송되지 않은 로그 파일이 있다면, 해당 로그 파일을 다시 processed_data 디렉토리에 복사하세요. Splunk Forwarder가 이를 자동으로 감지하여, 누락된 로그를 다시 서버로 전송합니다.
    • 네트워크 문제로 인해 로그가 유실되더라도, 원본 데이터를 processed_data 디렉토리에 보관하고 있다면 후속 조치로 복구가 가능합니다.
  4. 위에서 설정한 내용이 제대로 반영되었는지 확인합니다. 아래 파일은 Splunk Universal Forwarder의 outputs.conf 파일로, 데이터를 전송할 대상 서버에 대한 설정을 포함하고 있습니다.

    $ sudo cat /home/ubuntu/splunkforwarder/etc/system/local/outputs.conf

    # 출력 예시
    [tcpout]
    defaultGroup = default-autolb-group

    [tcpout:default-autolb-group]
    server = ${SPLUNK_PRIVATE_IP}:${SPLUNK_PORT}

    [tcpout-server://${SPLUNK_PRIVATE_IP}:${SPLUNK_PORT}
    환경변수설명
    SPLUNK_PRIVATE_IP🖌 Splunk 인스턴스 Private IP
    SPLUNK_PORT🖌 Splunk 수신 포트 9997
  5. Splunk 인스턴스를 위한 보안 그룹(tutorial-splunk-sg)에 아래 인바운드 규칙을 추가합니다.

    보안 그룹: tutorial-splunk-sg
    1. 카카오클라우드 콘솔 > VPC > 보안 그룹 메뉴로 이동합니다. 아래 표를 참조하여 보안 그룹을 생성합니다.

      이름설명(선택)
      tutorial-splunk-sgSplunk 서버의 보안정책
    2. 하단의 [+ 추가하기] 버튼을 클릭 후, 인바운드 규칙을 추가하고 [적용] 버튼을 클릭합니다.

      추가할 인바운드 규칙항목설정값
      splunk inbound policy 3프로토콜TCP
      패킷 출발지{Forwarder Server Private IP}/32
      포트 번호9997
      정책 설명(선택)UF에서 로그를 수집하는 포트

Step 3. 자동 로그 저장 스크립트 작성

Cloud Trail의 로그 저장 기능을 사용하면, 매시간마다 로그가 Object Storage 버킷 내에 하나의 파일로 저장됩니다. 이 로그를 매시간 Splunk Universal Forwarder 에이전트로 읽어 Splunk Enterprise 서버로 전송하는 자동 스크립트를 작성할 수 있습니다.

자동 스크립트 동작 원리
  1. 최초 스크립트 실행 시, Object Storage의 로그 파일과 Forwarder 에이전트의 로컬 로그 파일을 비교하여 모든 파일을 다운로드합니다.
  2. 가장 최근에 수정된 파일을 Object Storage에서 다운로드하고 압축을 해제합니다.
  3. 압축 해제된 파일을 JSON 객체 리스트로 변환하여 processed_data 디렉토리에 저장한 후, 해당 JSON 객체 리스트를 Splunk로 이벤트 전송합니다.
  4. 중간에 오류가 발생하면, 누락된 로그 파일을 자동으로 확인하고 다운로드합니다.
  • 참고 사항: 2~4번 과정은 매시간 10분마다 반복됩니다.
  1. Forwarder 인스턴스에 SSH로 접속하여 Python 환경을 구성합니다. 아래 명령어를 통해 Python 가상 환경을 생성하고, 필요한 라이브러리를 설치합니다.

    # Python 가상환경 생성 및 활성화
    $ sudo apt install python3-venv
    $ python3 -m venv myenv
    $ source myenv/bin/activate

    # 필요한 Python 라이브러리 설치
    $ pip3 install flask
    $ pip3 install boto3
    $ pip3 install subprocess32
  2. 자동 스크립트에서 AWS CLI를 사용하려면, 먼저 AWS CLI를 설치하고 설정합니다.

    AWS CLI 설치
    $ sudo apt update
    $ sudo apt install unzip

    $ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64-2.15.41.zip" -o "awscliv2.zip"

    $ unzip awscliv2.zip

    $ sudo ./aws/install

    $ aws --version
    aws-cli/2.15.41 Python/3.11.8 Linux/6.8.0-39-generic exe/x86_64.ubuntu.24 prompt/off
    주의

    카카오클라우드에서 호환 가능한 S3 CLI의 최신 버전은 2.22.0입니다. 이 보다 상위 버전을 사용할 경우, S3 명령 요청의 정상 작동이 보장되지 않을 수 있습니다. 본 가이드에서는 2.15.41 버전을 설치하였습니다.

    사전 준비에서 S3 API 사용을 위해 발급받은 크리덴셜을 확인하고, 아래 명령어로 AWS CLI를 설정합니다.

    $ aws configure
    AWS Access Key ID: ${CREDENTIAL_ACCESS_KEY}
    AWS Secret Access Key: ${CREDENTIAL_SECRET_ACCESS_KEY}
    Default region name: kr-central-2
    Default output format:
    환경변수설명
    CREDENTIAL_ACCESS_KEY🖌 S3 API 사용에 필요한 액세스 키
    CREDENTIAL_SECRET_ACCESS_KEY🖌S3 API 사용에 필요한 비밀 액세스 키
  3. 스크립트 파일을 엽니다.

    스크립트 파일 열기
    sudo vi /home/ubuntu/cloudtrail/script.py
  4. 아래 스크립트의 변수를 수정하여 자동 스크립트를 작성합니다.

    안내

    아래 스크립트에 작성된 IAM 엔드포인트 URLObject Storage 엔드포인트 URL은 추후 프라이빗 엔드포인트가 제공될 경우, 변경하여 사용할 수 있습니다.

    import requests
    import zipfile
    import os
    from dateutil import parser
    import json
    import subprocess
    import sys
    import datetime
    import logging
    import time

    # 전역 변수 선언
    BUCKET_NAME = ${BUCKET_NAME}
    ACCESS_ID = ${ACCESS_KEY_ID}
    SECRET_KEY = ${ACCESS_SECRET_KEY}
    PROJECT_ID = ${PROJECT_ID}

    # 로그 설정
    logging.basicConfig(filename='/home/ubuntu/cloudtrail/process_log.log', level=logging.INFO)
    LOG_FILE = "/home/ubuntu/cloudtrail/process_log.log"

    # 로그 함수 정의 (시간 포함)
    def log(level, message):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{current_time} [{level}] {message}", file=open(LOG_FILE, "a"))

    # API 토큰을 가져오는 함수
    def get_access_token():
    global ACCESS_ID
    global SECRET_KEY

    url = "https://iam.kakaocloud.com/identity/v3/auth/tokens"
    headers = {'Content-Type': 'application/json'}
    body = {
    "auth": {
    "identity": {
    "methods": ["application_credential"],
    "application_credential": {
    "id": ACCESS_ID,
    "secret": SECRET_KEY
    }
    }
    }
    }


    try:
    response = requests.post(url, json=body, headers=headers)
    response.raise_for_status()
    return response.headers['X-Subject-Token']
    except requests.exceptions.RequestException as e:
    logging.info(f"Error getting access token: {e}")
    return None

    # 로컬 파일 목록 가져오기 (compare)
    def compare_get_local_zip_files(download_dir):
    try:
    local_files = [f for f in os.listdir(download_dir) if f.endswith('.zip')]
    return local_files
    except Exception as e:
    print(f"Error getting local files: {e}")
    return []

    # Object Storage에서 .zip 파일 목록 가져오기 (compare)
    def compare_fetch_api_zip_files(PROJECT_ID, api_token, BUCKET_NAME):
    api_url = f"https://objectstorage.kr-central-2.kakaocloud.com/v1/{PROJECT_ID}/{BUCKET_NAME}?format=json"
    headers = {"X-Auth-Token": api_token}

    try:
    response = requests.get(api_url, headers=headers)
    response.raise_for_status()
    data = response.json()

    file_paths = [item["name"] for item in data if item["name"].endswith(".zip")]
    return file_paths
    except requests.exceptions.RequestException as e:
    print(f"Error fetching API data: {e}")
    return []

    # 다운로드 및 로그 기록 (compare)
    def compare_download_file_from_objectstorage(BUCKET_NAME, object_name, file_name):
    endpoint = "https://objectstorage.kr-central-2.kakaocloud.com"
    download_dir = "/home/ubuntu/cloudtrail/"

    if not os.path.exists(download_dir):
    os.makedirs(download_dir)

    full_file_path = os.path.join(download_dir, file_name)

    try:
    command = [
    "aws",
    "--endpoint-url", endpoint,
    "s3",
    "cp",
    f"s3://{BUCKET_NAME}/{object_name}",
    full_file_path
    ]

    logging.info(f"Attempting to download from: s3://{BUCKET_NAME}/{object_name}")
    result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    if result.returncode == 0:
    logging.info(f"Downloaded: {file_name}")
    compare_decompress_and_process_file(full_file_path, file_name)
    else:
    logging.error(f"Error during download: {result.stderr}")

    except subprocess.CalledProcessError as e:
    logging.error(f"Subprocess failed: {e.stderr}")
    return
    except Exception as e:
    logging.error(f"Unexpected error: {e}")
    return


    # 파일 압축 해제 후 JSON 변환 처리 (compare)
    def compare_decompress_and_process_file(zip_file_path, file_name):
    try:
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall('/home/ubuntu/cloudtrail/')

    decompressed_file_name = file_name.rsplit('.', 1)[0]
    json_list = compare_convert_to_json_list(f"/home/ubuntu/cloudtrail/{decompressed_file_name}")

    if json_list is not None:
    compare_save_json_to_file(json_list, file_name)

    except zipfile.BadZipFile as e:
    print(f"Error decompressing file {zip_file_path}: {e}")
    except Exception as e:
    print(f"Error processing file {zip_file_path}: {e}")

    # 압축 해제된 파일을 읽고 JSON 객체 리스트로 변환 후 새로운 파일로 저장 (compare)
    def compare_convert_to_json_list(decompressed_file_name):
    json_list = []

    try:
    with open(decompressed_file_name, 'r') as file:
    lines = file.readlines()

    for line in lines:
    try:
    json_obj = json.loads(line)
    json_list.append(json_obj)
    except json.JSONDecodeError as e:
    print(f"Error decoding line to JSON: {e}")
    continue
    return json_list
    except Exception as e:
    print(f"Error reading file {decompressed_file_name}: {e}")
    return None

    # JSON 리스트를 파일로 저장하는 함수 (별도 'processed_data' 디렉터리에 저장)
    def compare_save_json_to_file(json_list, file_name):
    output_dir = 'processed_data'
    if not os.path.exists(output_dir):
    os.makedirs(output_dir)

    output_file_name = os.path.join(output_dir, file_name.rsplit('.', 1)[0] + '.json')

    try:
    with open(output_file_name, 'w') as outfile:
    json.dump(json_list, outfile, indent=4)
    print(f"Saved the JSON list to {output_file_name}")
    except Exception as e:
    print(f"Error saving JSON to file {output_file_name}: {e}")

    # 초기 다운로드 스크립트 시작 (compare)
    def compare_for_download(api_token, local_download_dir):
    global BUCKET_NAME
    global PROJECT_ID
    local_files = compare_get_local_zip_files(local_download_dir)
    logging.info(f"로컬 디렉토리에서 가져온 .zip 파일들: {local_files}")

    remote_files = compare_fetch_api_zip_files(PROJECT_ID, api_token, BUCKET_NAME)
    logging.info(f"API에서 가져온 .zip 파일들: {remote_files}")

    files_to_download = [f for f in remote_files if f.split('/')[-1] not in local_files]
    logging.info(f"다운로드할 파일 목록: {files_to_download}")

    for file in files_to_download:
    file_name = file.split('/')[-1]
    compare_download_file_from_objectstorage(BUCKET_NAME, file, file_name)

    # JSON 리스트를 파일로 저장하는 함수 (별도 'processed_data' 디렉터리에 저장) (main)
    def main_save_json_to_file(json_list, file_name):
    output_dir = 'processed_data'
    if not os.path.exists(output_dir):
    os.makedirs(output_dir)

    output_file_name = os.path.join(output_dir, file_name.rsplit('.', 1)[0] + '.json')

    try:
    with open(output_file_name, 'w') as outfile:
    json.dump(json_list, outfile, indent=4)
    log("SUCCESS", {output_file_name})
    except Exception as e:
    error_check(f"Error saving JSON to file {output_file_name}: {e}")

    # 가장 최근의 last_modified 값을 가진 파일 선택 (main)
    def main_get_most_recent_file(files):
    if not files:
    error_check("No files found.")
    most_recent_file = max(files, key=lambda x: parser.parse(x['last_modified']))
    return most_recent_file

    # 압축 해제된 파일을 읽고 JSON 객체 리스트로 변환 후 새로운 파일로 저장 (main)
    def main_convert_to_json_list(decompressed_file_name):
    json_list = []

    try:
    with open(decompressed_file_name, 'r') as file:
    lines = file.readlines()

    for line in lines:
    try:
    json_obj = json.loads(line)
    json_list.append(json_obj)
    except json.JSONDecodeError as e:
    log(f"ERROR", f"Error decoding line to JSON: {e}")
    continue
    return json_list
    except Exception as e:
    error_check(f"Error reading file {decompressed_file_name}: {e}")
    return None

    # 파일 압축 해제 후 JSON 변환 처리 (main)
    def main_decompress_and_process_file(zip_file_path, file_name):
    try:
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall('/home/ubuntu/cloudtrail/')

    decompressed_file_name = file_name.rsplit('.', 1)[0]
    json_list = main_convert_to_json_list(f"/home/ubuntu/cloudtrail/{decompressed_file_name}")

    if json_list:
    main_save_json_to_file(json_list, file_name)

    except zipfile.BadZipFile as e:
    error_check(f"Error decompressing file {zip_file_path}: {e}")
    except Exception as e:
    error_check(f"Error processing file {zip_file_path}: {e}")

    # object storage로부터 파일 다운로드 (main)
    def main_download_file_from_objectstorage(BUCKET_NAME, object_name, file_name):
    endpoint = "https://objectstorage.kr-central-2.kakaocloud.com"
    download_dir = "/home/ubuntu/cloudtrail/"

    if not os.path.exists(download_dir):
    os.makedirs(download_dir)

    full_file_path = os.path.join(download_dir, file_name)

    try:
    command = [
    "aws",
    "--endpoint-url", endpoint,
    "s3",
    "cp",
    f"s3://{BUCKET_NAME}/{object_name}",
    full_file_path
    ]

    result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    log("INFO", {full_file_path})

    main_decompress_and_process_file(full_file_path, file_name)

    except subprocess.CalledProcessError as e:
    error_check(f"Error occurred while downloading the file: {e.stderr}")

    # Object Storage API를 통해 파일 목록 가져오기
    def main_fetch_files_from_objectstorage(PROJECT_ID, BUCKET_NAME, access_token):
    url = f'https://objectstorage.kr-central-2.kakaocloud.com/v1/{PROJECT_ID}/{BUCKET_NAME}?format=json'

    headers = {
    'X-Auth-Token': access_token
    }

    try:
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()
    except requests.exceptions.RequestException as e:
    error_check(f"Error fetching files: {e}")
    return []

    # Zip 파일 압축 해제 (main)
    def main_decompress_zip(file_name):
    file_dir = os.path.dirname(file_name) # 압축 파일이 있는 디렉토리
    decompressed_file_name = file_name.rstrip('.zip') # .zip 제거한 파일 이름
    try:
    # 압축 해제
    with zipfile.ZipFile(file_name, 'r') as zip_ref:
    zip_ref.extractall(file_dir) # 파일이 있는 디렉토리에 압축 해제

    # 압축 해제 후 생성된 파일 경로
    decompressed_file_path = os.path.join(file_dir, decompressed_file_name)

    return decompressed_file_path

    except Exception as e:
    error_check(f"Error decompressing file: {e}")

    # 파일 다운로드 및 압축 해제 후 경로 반환 (main)
    def main_process_file_for_download(PROJECT_ID, api_token, BUCKET_NAME):

    files = main_fetch_files_from_objectstorage(PROJECT_ID, BUCKET_NAME, api_token)
    if not files:
    error_check("No files found.")

    most_recent_file = main_get_most_recent_file(files)
    if not most_recent_file:
    error_check("No files found.")
    return None, None

    object_name = most_recent_file['name']
    file_name = object_name.split('/')[-1]

    main_download_file_from_objectstorage(BUCKET_NAME, object_name, file_name)

    if file_name.endswith('.zip'):
    decompressed_file_name = main_decompress_zip(file_name)
    if decompressed_file_name:
    return decompressed_file_name, file_name
    else:
    error_check("Failed to decompress the file.")
    else:
    return file_name, None

    # 메인 스크립트 (main)
    def main_script(api_token):
    global BUCKET_NAME
    global PROJECT_ID

    decompressed_file_name, original_file_name = main_process_file_for_download(PROJECT_ID, api_token, BUCKET_NAME)

    if not decompressed_file_name:
    return error_check("No file was processed.")

    json_list = main_convert_to_json_list(decompressed_file_name)
    if not json_list:
    return error_check("No JSON data found.")

    main_save_json_to_file(json_list, original_file_name)

    # 에러 발생 시 호출되는 함수 (main)
    def error_check(error_message):
    global BUCKET_NAME
    log("ERROR", error_message)

    api_token = get_access_token()
    local_download_dir = "/home/ubuntu/cloudtrail/"
    BUCKET_NAME = BUCKET_NAME
    compare_for_download(api_token, local_download_dir)

    # 매시각 10분에 실행되도록 하기 위한 함수
    def wait_until_next_hour_10():
    now = datetime.datetime.now()
    next_hour_10 = now.replace(minute=10, second=0, microsecond=0)
    if now > next_hour_10:
    next_hour_10 += datetime.timedelta(hours=1)

    wait_time = (next_hour_10 - now).total_seconds()
    log("INFO", f"다음 실행까지 {wait_time}초 대기 중...")
    time.sleep(wait_time)

    # 메인 함수
    if __name__ == "__main__":

    # 초기 다운로드 스크립트
    api_token = get_access_token()
    local_download_dir = "/home/ubuntu/cloudtrail/"
    compare_for_download(api_token, local_download_dir)

    # 기본 로그 저장 스크립트
    log("INFO", "메인 스크립트 시작")
    main_script(api_token)
    log("INFO", "메인 스크립트 종료")

    while True:
    # 매시각 10분까지 대기
    wait_until_next_hour_10()

    # 10분마다 메인 스크립트 실행
    log("INFO", "메인 스크립트 시작")
    api_token = get_access_token()
    main_script(api_token)
    log("INFO", "메인 스크립트 종료")
    환경변수설명
    BUCKET_NAME🖌 Object Storage Bucket 이름
    ACCESS_KEY_ID🖌 사용자 액세스 키 ID 수정
    ACCESS_SECRET_KEY🖌 사용자 액세스 보안 키 수정
    PROJECT_ID🖌 프로젝트 ID

Step 4. 백그라운드 프로세스 실행

  1. 스크립트와 로그 파일에 적절한 권한을 추가합니다.

    # 로그 파일 생성
    sudo touch /home/ubuntu/cloudtrail/process_log.log
    # 로그 파일 권한 설정
    sudo chmod 666 /home/ubuntu/cloudtrail/process_log.log
    # 스크립트 파일 권한 설정
    sudo chmod +x /home/ubuntu/cloudtrail/script.py
  2. 백그라운드 프로세스로 스크립트를 실행합니다.

    nohup bash /home/ubuntu/cloudtrail/script.py > /dev/null 2>&1 &
  3. 결과를 확인합니다.

    • 스크립트가 매시간 10분마다 최신 로그 파일을 /home/ubuntu/cloudtrail/processed_data 디렉토리에 json 형식으로 생성합니다.
    • ls /home/ubuntu/cloudtrail/processed_data 명령어로 로그 파일이 생성되었는지 확인합니다.
    • tail -f /home/ubuntu/cloudtrail/process_log.log 명령어로 스크립트 실행 상태를 확인하며, 로그 파일에 출력되는 내용을 확인합니다.
    예시: /home/ubuntu/cloudtrail/process_log.log 파일 내용
    INFO:root:로컬 디렉토리에서 가져온 .zip 파일들: ['trail_XXXX-XX-XX-XX.zip', 'trail_XXXX-XX-XX-XX.zip', ...]
    INFO:root:API에서 가져온 .zip 파일들: ['trail_XXXX-XX-XX-XX.zip', 'trail_XXXX-XX-XX-XX.zip', ...]
    INFO:root:다운로드할 파일 목록: []
    [INFO] 메인 스크립트 시작
    [INFO] {'/home/ubuntu/cloudtrail/trail_XXXX-XX-XX-XX.zip'}
    [SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
    [SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
    [INFO] 메인 스크립트 종료
    [INFO] 다음 실행까지 453.866015초 대기 중...
    [INFO] 메인 스크립트 시작
    [INFO] {'/home/ubuntu/cloudtrail/trail_XXXX-XX-XX-XX.zip'}
    [SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
    [SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
    [INFO] 메인 스크립트 종료
    [INFO] 다음 실행까지 3597.749401초 대기 중...
    [INFO] 메인 스크립트 시작
    [INFO] {'/home/ubuntu/cloudtrail/trail_XXXX-XX-XX-XX.zip'}
    [SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
    [SUCCESS] {'processed_data/trail_XXXX-XX-XX-XX.json'}
    [INFO] 메인 스크립트 종료
    [INFO] 다음 실행까지 3598.177826초 대기 중...

Step 5. Splunk Enterprise에 적재된 로그 확인

Splunk Enterprise 웹 UI에 접속 및 로그인을 하여 Cloud Trail 로그를 확인합니다.

check 예시