- AWS에서의 침해사고 자동 대응: 실시간 SSRF 감지 및 대응 자동화2023년 09월 11일
- 59lee
- 작성자
- 2023.09.11.:25
1. 침해사고 자동 대응의 필요성:
- 침해사고 발생 시의 전통적인 대응 방식
- 전통적인 방식은 수동으로 로그 분석, 악성 코드 탐지 및 시스템 복구 작업을 수행합니다.
- 자동화의 장점 및 이점
- 실시간 대응 가능하고 시간 및 리소스 절약됩니다.
2. SSRF(서버 측 요청 위조)란?:
SSRF 그림 - SSRF에 대한 간략한 설명
- SSRF 취약점은 직접적으로 백엔드 서버를 접근할 수 없지만, 서버를 통해서 백엔드 서버 반응값으로 공격을 찾는 방법입니다.
- SSRF를 통한 침해사고 예시
캐피탈 원 SSRF 취약점이 존재하는 IMDS v1 AWS 클라우드 환경에서 이미지 업로드 발생하는 취약 파라미터 발견해서 엑세스, 토큰 탈취하고 IAM 권한을 이용해 S3 버킷 데이터 탈취에 악용한 사례이다.
3. AWS 환경에서의 자동 대응 전략:
이미지 순서 대 - 인스턴스의 메타데이터를 캡처하기 위해 초기 단계로 인스턴스 종료 보호를 활성화하고, 아웃바운드 트래픽을 미허용하는 방식으로 인스턴스를 격리합니다.
- 인스턴스와 관련된 서비스 연결 해제하고 ASG에서 인스턴스를 분리하고, ELB에서 등록을 해제합니다.
- EBS 볼륨의 스냅샷을 생성하여 분석을 준비합니다.
- AWS의 람다와 SSM 실행 명령을 통해 자동화할 수 있습니다 (파이썬 통해서 가능)
4. 실제 구현 예제:
현재 인스턴스 보안 규칙 - 현재 웹 서버가 설정되어 있는 인스턴스의 보안 그룹의 규칙 상태이다
정상적인 접근 - 정상적인 접근이 일 때 아무 문제 없이 접근이 가능하다.
플라스크에서 감지 - 하지만 메타데이터로 접근하려고 하는 순간 SSRF 시도 감지할 수 있도록 플라스크 코드를 만들었다
SSRF 감지 격리시작이라고 알려준다 - 브라우저 상으로도 SSRF 시도를 감지했다고 알림을 준다.
더 안전한 보안 그룹으로 옮긴 상태 - SSRF를 인지하고 현재 더 안전한 보안 그룹으로 옮긴 상태이다.
SSRF 인지하는 코드
#!/usr/bin/env python3 from flask import Flask, request import os import urllib import logging import boto3 import requests import hashlib import subprocess # Configurable Variables FORENSIC_INSTANCE_ID_ENV_VAR = 'FORENSIC_INSTANCE_ID' FORENSIC_INSTANCE_ID = os.environ.get(FORENSIC_INSTANCE_ID_ENV_VAR, "i-01") REGION = 'ap-northeast-2' SG_ID = 'sg-' WEB_SERVER_PORT = 5001 FORENSIC_SCRIPT_PATH = '/home/ec2-user/shss.py' # 실제 shs.sh 스크립트의 경로로 수정하세요. app = Flask(__name__) logging.basicConfig(level=logging.INFO) ec2 = boto3.client('ec2', region_name=REGION) autoscaling = boto3.client('autoscaling', region_name=REGION) ssm = boto3.client('ssm', region_name=REGION) if not FORENSIC_INSTANCE_ID: logging.error("Forensic instance ID environment variable not set!") raise Exception("Forensic instance ID environment variable not set!") def get_current_instance_id(): try: response = requests.get('<http://169.254.169.254/latest/meta-data/instance-id>') return response.text except Exception as e: logging.error(f"Fetching instance ID failed: {e}") raise def detect_ssrf(url): if "169.254.169.254" in url: logging.warning(f"Possible SSRF attempt detected: {url}") respond_to_ssrf() return True return False def enable_termination_protection(instance_id): ec2.modify_instance_attribute(InstanceId=instance_id, DisableApiTermination={'Value': True}) def isolate_instance(instance_id, sg_id): ec2.modify_instance_attribute(InstanceId=instance_id, Groups=[sg_id]) def detach_from_asg(instance_id): asg_name = get_asg_name(instance_id) if asg_name: autoscaling.detach_instances(AutoScalingGroupName=asg_name, InstanceIds=[instance_id], ShouldDecrementDesiredCapacity=True) def snapshot_and_attach_to_forensic(instance_id): volumes = ec2.describe_volumes(Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}]) for volume in volumes['Volumes']: snapshot = ec2.create_snapshot(VolumeId=volume['VolumeId']) snapshot_id = snapshot['SnapshotId'] waiter = ec2.get_waiter('snapshot_completed') waiter.wait(SnapshotIds=[snapshot_id]) new_volume = ec2.create_volume(AvailabilityZone='ap-northeast-2c', SnapshotId=snapshot_id) volume_id = new_volume['VolumeId'] waiter = ec2.get_waiter('volume_available') waiter.wait(VolumeIds=[volume_id]) TARGET_DEVICE_NAME = get_next_available_device() ec2.attach_volume(InstanceId=FORENSIC_INSTANCE_ID, VolumeId=volume_id, Device=TARGET_DEVICE_NAME) def tag_instance(instance_id, tag_key, tag_value): ec2.create_tags(Resources=[instance_id], Tags=[{'Key': tag_key, 'Value': tag_value}]) def execute_ssm_command(instance_id): commands = ["sudo dmesg > /tmp/logs.txt"] response = ssm.send_command( InstanceIds=[instance_id], DocumentName="AWS-RunShellScript", Parameters={'commands': commands} ) return response def run_forensic_script(): try: result = subprocess.run([FORENSIC_SCRIPT_PATH], check=True, text=True, capture_output=True) return result.stdout except subprocess.CalledProcessError as e: logging.error(f"Forensic script execution failed with error: {e}") return None def respond_to_ssrf(): try: instance_id = get_current_instance_id() logging.info(f"SSRF detected on instance: {instance_id}") enable_termination_protection(instance_id) isolate_instance(instance_id, SG_ID) detach_from_asg(instance_id) snapshot_and_attach_to_forensic(instance_id) tag_instance(instance_id, "InvestigationTicket", "SSRF_Detected") execute_ssm_command(instance_id) forensic_report = get_forensic_summary() logging.info(f"Forensic Analysis Report:\\n{forensic_report}") except Exception as e: logging.error(f"SSRF response failed with error: {e}") # 포렌식 스크립트 실행 forensic_result = run_forensic_script() if forensic_result: logging.info(f"Forensic script result:\\n{forensic_result}") def get_asg_name(instance_id): asg_descriptions = autoscaling.describe_auto_scaling_instances(InstanceIds=[instance_id]) for asg_instance in asg_descriptions['AutoScalingInstances']: if asg_instance['InstanceId'] == instance_id: return asg_instance['AutoScalingGroupName'] return None def get_next_available_device(): used_devices = [] try: forensic_volumes = ec2.describe_volumes(Filters=[{'Name': 'attachment.instance-id', 'Values': [FORENSIC_INSTANCE_ID]}]) used_devices = [vol['Attachments'][0]['Device'] for vol in forensic_volumes['Volumes'] if vol['Attachments']] except Exception as e: logging.error(f"Fetching used devices failed with error: {e}") raise e for char in "bcdefghijklmnopqrstuvwxyz": device_name = f"/dev/xvd{char}" if device_name not in used_devices: return device_name raise Exception("All device names are in use.") def verify_integrity(file_path, original_hash): with open(file_path, 'rb') as f: file_data = f.read() current_hash = hashlib.sha256(file_data).hexdigest() return original_hash == current_hash def run_forensic_analysis_on_instance(instance_id): forensic_commands = [ "sudo /path/to/shs.sh" # 실제 포렌식 분석 스크립트의 경로로 변경 ] ssm.send_command( InstanceIds=[instance_id], DocumentName="AWS-RunShellScript", Parameters={'commands': forensic_commands} ) def get_forensic_summary(): forensic_summary = "" integrity_violations = [] # TODO: 이 해시 값들은 외부에서 안전하게 관리해야 합니다. # 이는 예제로 제공된 것이며 실제 파일 경로와 해시 값을 반영해야 합니다. original_hashes = { "/path/to/important/file1": "sample_hash_value1", "/path/to/important/file2": "sample_hash_value2" } for file_path, expected_hash in original_hashes.items(): if not verify_integrity(file_path, expected_hash): integrity_violations.append(f"File {file_path} has been modified!") # 주의: 애플리케이션이 해당 파일에 대한 읽기 권한이 있는지 확인해야 합니다. with open("/var/log/forensic_analysis.log", 'r') as file: lines = file.readlines() start_time = lines[0].split("시작: ")[1].strip() end_time = lines[-1].split("완료: ")[1].strip() forensic_summary = f"포렌식 분석 시작 시간: {start_time}\\n포렌식 분석 완료 시간: {end_time}\\n" if integrity_violations: forensic_summary += "\\n무결성 검증 침해사고:\\n" for violation in integrity_violations: forensic_summary += f"- {violation}\\n" return forensic_summary # Inside your respond_to_ssrf function... snapshot_and_attach_to_forensic(instance_id) tag_instance(instance_id, "InvestigationTicket", "SSRF_Detected") execute_ssm_command(instance_id) # After forensic analysis script execution forensic_report = get_forensic_summary() print(f"Forensic Analysis Report:\\n{forensic_report}") @app.route("/", methods=["GET"]) def home(): return "플라스크 웹" @app.route("/go", methods=["GET"]) def route(): url = request.args.get("url") if not url: return "URL 파라미터 요구.", 400 if detect_ssrf(url): return "SSRF 감지 격리 시작." try: req = urllib.request.Request(url) with urllib.request.urlopen(req) as response: data = response.read() return data except Exception as e: logging.error(f"Error in route function: {e}") return "Error" if __name__ == "__main__": app.run(host='0.0.0.0', port=WEB_SERVER_PORT)
5. 테스트 및 결과:
정상적인 인스턴스 - 정상적인 인스턴스는 볼륨 ID를 하나만 부여받은 경우가 많다.포렌식 전용 인스턴스는 볼륨 ID를 여러가지 부여받은 경우가 많다.
포렌식 EBS - EBS 상태를 확인했다(가장 최근에 붙은 xvdc로 추정된다)
EBS 확인 포렌식 자동화하는 코드
#!/usr/bin/env python3 import os import time from datetime import datetime MOUNTED_DEVICE = "/dev/xvdc" # 분석할 볼륨 디바이스 MOUNT_POINT = "/mnt/forensic" # 마운트 포인트 LOG_FILE = "/var/log/forensic_analysis.log" # 로그 파일 경로 LOCK_FILE = "/var/lock/forensic_analysis.lock" # 락 파일 경로 LOCK_TIMEOUT = 60 # 락 파일 타임아웃 시간 (초) def log(message): current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") log_message = f"{current_time}: {message}" with open(LOG_FILE, 'a') as log_file: log_file.write(f"{log_message}\\n") print(log_message) def acquire_lock(): start_time = time.time() while True: try: with open(LOCK_FILE, 'w') as lock_fd: lock_fd.write("lock") lock_fd.flush() os.fsync(lock_fd) log("Lock acquired.") return True except IOError: if time.time() - start_time > LOCK_TIMEOUT: log("Lock acquisition timed out.") return False time.sleep(1) def mount_device(): if not os.path.exists(MOUNT_POINT): os.makedirs(MOUNT_POINT, exist_ok=True) if acquire_lock(): if not os.path.ismount(MOUNT_POINT): log(f"Mounting {MOUNTED_DEVICE} on {MOUNT_POINT}") mount_command = f'mount {MOUNTED_DEVICE} {MOUNT_POINT}' mount_result = os.system(mount_command) log(f"Mount command result: {mount_result}") log("3. 포렌식 분석 수행") # 포렌식 분석 작업을 수행하는 코드 추가 # 예: forensic_analysis(MOUNT_POINT) log("4. 마운트 해제") umount_command = f'umount {MOUNT_POINT}' umount_result = os.system(umount_command) log(f"Umount command result: {umount_result}") def cleanup(): if os.path.exists(LOCK_FILE): os.remove(LOCK_FILE) def main(): log("포렌식 분석 스크립트 시작") try: mount_device() except Exception as e: log(f"Error: {str(e)}") finally: cleanup() log("5. 작업 완료 후 정리") # 필요한 리소스 정리 작업을 추가 log("포렌식 분석 스크립트 완료") def display_forensic_results(log_file): try: with open(log_file, 'r') as log_file: lines = log_file.readlines() print("포렌식 결과:") for line in lines: print(line.strip()) except Exception as e: print(f"포렌식 결과를 읽어오는 중 오류 발생: {str(e)}") if __name__ == '__main__': main() display_forensic_results(LOG_FILE)
이러한 식으로 로그 기록이 남은다.
이러한 식으로 파이썬을 이용해서 할 수 있다. 'AHSS[1기]' 카테고리의 다른 글
AWS에서 진행할 수 있는 자동화 포렌식 (0) 2023.09.17 5주차(9.24) : AWS Penetration Testing (0) 2023.08.25 4주차(9.17) : AWS WA Labs : Security (0) 2023.08.25 3주차(9.10) - 웹 취약점 및 보안 (0) 2023.08.25 2주차(9.3) : IAM 취약점 및 보안 (0) 2023.08.25 다음글이전글이전 글이 없습니다.댓글
스킨 업데이트 안내
현재 이용하고 계신 스킨의 버전보다 더 높은 최신 버전이 감지 되었습니다. 최신버전 스킨 파일을 다운로드 받을 수 있는 페이지로 이동하시겠습니까?
("아니오" 를 선택할 시 30일 동안 최신 버전이 감지되어도 모달 창이 표시되지 않습니다.)