AHSS[1기]
AWS에서의 침해사고 자동 대응: 실시간 SSRF 감지 및 대응 자동화
59lee
2023. 9. 11. 21:25
1. 침해사고 자동 대응의 필요성:
- 침해사고 발생 시의 전통적인 대응 방식
- 전통적인 방식은 수동으로 로그 분석, 악성 코드 탐지 및 시스템 복구 작업을 수행합니다.
- 자동화의 장점 및 이점
- 실시간 대응 가능하고 시간 및 리소스 절약됩니다.
2. SSRF(서버 측 요청 위조)란?:
- SSRF에 대한 간략한 설명
- SSRF 취약점은 직접적으로 백엔드 서버를 접근할 수 없지만, 서버를 통해서 백엔드 서버 반응값으로 공격을 찾는 방법입니다.
- SSRF를 통한 침해사고 예시
SSRF 취약점이 존재하는 IMDS v1 AWS 클라우드 환경에서 이미지 업로드 발생하는 취약 파라미터 발견해서 엑세스, 토큰 탈취하고 IAM 권한을 이용해 S3 버킷 데이터 탈취에 악용한 사례이다.
3. AWS 환경에서의 자동 대응 전략:
- 인스턴스의 메타데이터를 캡처하기 위해 초기 단계로 인스턴스 종료 보호를 활성화하고, 아웃바운드 트래픽을 미허용하는 방식으로 인스턴스를 격리합니다.
- 인스턴스와 관련된 서비스 연결 해제하고 ASG에서 인스턴스를 분리하고, ELB에서 등록을 해제합니다.
- EBS 볼륨의 스냅샷을 생성하여 분석을 준비합니다.
- AWS의 람다와 SSM 실행 명령을 통해 자동화할 수 있습니다 (파이썬 통해서 가능)
4. 실제 구현 예제:
- 현재 웹 서버가 설정되어 있는 인스턴스의 보안 그룹의 규칙 상태이다
- 정상적인 접근이 일 때 아무 문제 없이 접근이 가능하다.
- 하지만 메타데이터로 접근하려고 하는 순간 SSRF 시도 감지할 수 있도록 플라스크 코드를 만들었다
- 브라우저 상으로도 SSRF 시도를 감지했다고 알림을 준다.
- SSRF를 인지하고 현재 더 안전한 보안 그룹으로 옮긴 상태이다.
SSRF 인지하는 코드
#!/usr/bin/env python3
from flask import Flask, request
import os
import urllib
import logging
import boto3
import requests
import hashlib
import subprocess
# Configurable Variables
FORENSIC_INSTANCE_ID_ENV_VAR = 'FORENSIC_INSTANCE_ID'
FORENSIC_INSTANCE_ID = os.environ.get(FORENSIC_INSTANCE_ID_ENV_VAR, "i-01")
REGION = 'ap-northeast-2'
SG_ID = 'sg-'
WEB_SERVER_PORT = 5001
FORENSIC_SCRIPT_PATH = '/home/ec2-user/shss.py' # 실제 shs.sh 스크립트의 경로로 수정하세요.
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
ec2 = boto3.client('ec2', region_name=REGION)
autoscaling = boto3.client('autoscaling', region_name=REGION)
ssm = boto3.client('ssm', region_name=REGION)
if not FORENSIC_INSTANCE_ID:
logging.error("Forensic instance ID environment variable not set!")
raise Exception("Forensic instance ID environment variable not set!")
def get_current_instance_id():
try:
response = requests.get('<http://169.254.169.254/latest/meta-data/instance-id>')
return response.text
except Exception as e:
logging.error(f"Fetching instance ID failed: {e}")
raise
def detect_ssrf(url):
if "169.254.169.254" in url:
logging.warning(f"Possible SSRF attempt detected: {url}")
respond_to_ssrf()
return True
return False
def enable_termination_protection(instance_id):
ec2.modify_instance_attribute(InstanceId=instance_id, DisableApiTermination={'Value': True})
def isolate_instance(instance_id, sg_id):
ec2.modify_instance_attribute(InstanceId=instance_id, Groups=[sg_id])
def detach_from_asg(instance_id):
asg_name = get_asg_name(instance_id)
if asg_name:
autoscaling.detach_instances(AutoScalingGroupName=asg_name, InstanceIds=[instance_id], ShouldDecrementDesiredCapacity=True)
def snapshot_and_attach_to_forensic(instance_id):
volumes = ec2.describe_volumes(Filters=[{'Name': 'attachment.instance-id', 'Values': [instance_id]}])
for volume in volumes['Volumes']:
snapshot = ec2.create_snapshot(VolumeId=volume['VolumeId'])
snapshot_id = snapshot['SnapshotId']
waiter = ec2.get_waiter('snapshot_completed')
waiter.wait(SnapshotIds=[snapshot_id])
new_volume = ec2.create_volume(AvailabilityZone='ap-northeast-2c', SnapshotId=snapshot_id)
volume_id = new_volume['VolumeId']
waiter = ec2.get_waiter('volume_available')
waiter.wait(VolumeIds=[volume_id])
TARGET_DEVICE_NAME = get_next_available_device()
ec2.attach_volume(InstanceId=FORENSIC_INSTANCE_ID, VolumeId=volume_id, Device=TARGET_DEVICE_NAME)
def tag_instance(instance_id, tag_key, tag_value):
ec2.create_tags(Resources=[instance_id], Tags=[{'Key': tag_key, 'Value': tag_value}])
def execute_ssm_command(instance_id):
commands = ["sudo dmesg > /tmp/logs.txt"]
response = ssm.send_command(
InstanceIds=[instance_id],
DocumentName="AWS-RunShellScript",
Parameters={'commands': commands}
)
return response
def run_forensic_script():
try:
result = subprocess.run([FORENSIC_SCRIPT_PATH], check=True, text=True, capture_output=True)
return result.stdout
except subprocess.CalledProcessError as e:
logging.error(f"Forensic script execution failed with error: {e}")
return None
def respond_to_ssrf():
try:
instance_id = get_current_instance_id()
logging.info(f"SSRF detected on instance: {instance_id}")
enable_termination_protection(instance_id)
isolate_instance(instance_id, SG_ID)
detach_from_asg(instance_id)
snapshot_and_attach_to_forensic(instance_id)
tag_instance(instance_id, "InvestigationTicket", "SSRF_Detected")
execute_ssm_command(instance_id)
forensic_report = get_forensic_summary()
logging.info(f"Forensic Analysis Report:\\n{forensic_report}")
except Exception as e:
logging.error(f"SSRF response failed with error: {e}")
# 포렌식 스크립트 실행
forensic_result = run_forensic_script()
if forensic_result:
logging.info(f"Forensic script result:\\n{forensic_result}")
def get_asg_name(instance_id):
asg_descriptions = autoscaling.describe_auto_scaling_instances(InstanceIds=[instance_id])
for asg_instance in asg_descriptions['AutoScalingInstances']:
if asg_instance['InstanceId'] == instance_id:
return asg_instance['AutoScalingGroupName']
return None
def get_next_available_device():
used_devices = []
try:
forensic_volumes = ec2.describe_volumes(Filters=[{'Name': 'attachment.instance-id', 'Values': [FORENSIC_INSTANCE_ID]}])
used_devices = [vol['Attachments'][0]['Device'] for vol in forensic_volumes['Volumes'] if vol['Attachments']]
except Exception as e:
logging.error(f"Fetching used devices failed with error: {e}")
raise e
for char in "bcdefghijklmnopqrstuvwxyz":
device_name = f"/dev/xvd{char}"
if device_name not in used_devices:
return device_name
raise Exception("All device names are in use.")
def verify_integrity(file_path, original_hash):
with open(file_path, 'rb') as f:
file_data = f.read()
current_hash = hashlib.sha256(file_data).hexdigest()
return original_hash == current_hash
def run_forensic_analysis_on_instance(instance_id):
forensic_commands = [
"sudo /path/to/shs.sh" # 실제 포렌식 분석 스크립트의 경로로 변경
]
ssm.send_command(
InstanceIds=[instance_id],
DocumentName="AWS-RunShellScript",
Parameters={'commands': forensic_commands}
)
def get_forensic_summary():
forensic_summary = ""
integrity_violations = []
# TODO: 이 해시 값들은 외부에서 안전하게 관리해야 합니다.
# 이는 예제로 제공된 것이며 실제 파일 경로와 해시 값을 반영해야 합니다.
original_hashes = {
"/path/to/important/file1": "sample_hash_value1",
"/path/to/important/file2": "sample_hash_value2"
}
for file_path, expected_hash in original_hashes.items():
if not verify_integrity(file_path, expected_hash):
integrity_violations.append(f"File {file_path} has been modified!")
# 주의: 애플리케이션이 해당 파일에 대한 읽기 권한이 있는지 확인해야 합니다.
with open("/var/log/forensic_analysis.log", 'r') as file:
lines = file.readlines()
start_time = lines[0].split("시작: ")[1].strip()
end_time = lines[-1].split("완료: ")[1].strip()
forensic_summary = f"포렌식 분석 시작 시간: {start_time}\\n포렌식 분석 완료 시간: {end_time}\\n"
if integrity_violations:
forensic_summary += "\\n무결성 검증 침해사고:\\n"
for violation in integrity_violations:
forensic_summary += f"- {violation}\\n"
return forensic_summary
# Inside your respond_to_ssrf function...
snapshot_and_attach_to_forensic(instance_id)
tag_instance(instance_id, "InvestigationTicket", "SSRF_Detected")
execute_ssm_command(instance_id)
# After forensic analysis script execution
forensic_report = get_forensic_summary()
print(f"Forensic Analysis Report:\\n{forensic_report}")
@app.route("/", methods=["GET"])
def home():
return "플라스크 웹"
@app.route("/go", methods=["GET"])
def route():
url = request.args.get("url")
if not url:
return "URL 파라미터 요구.", 400
if detect_ssrf(url):
return "SSRF 감지 격리 시작."
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req) as response:
data = response.read()
return data
except Exception as e:
logging.error(f"Error in route function: {e}")
return "Error"
if __name__ == "__main__":
app.run(host='0.0.0.0', port=WEB_SERVER_PORT)
5. 테스트 및 결과:
- 정상적인 인스턴스는 볼륨 ID를 하나만 부여받은 경우가 많다.포렌식 전용 인스턴스는 볼륨 ID를 여러가지 부여받은 경우가 많다.
- EBS 상태를 확인했다(가장 최근에 붙은 xvdc로 추정된다)
포렌식 자동화하는 코드
#!/usr/bin/env python3
import os
import time
from datetime import datetime
MOUNTED_DEVICE = "/dev/xvdc" # 분석할 볼륨 디바이스
MOUNT_POINT = "/mnt/forensic" # 마운트 포인트
LOG_FILE = "/var/log/forensic_analysis.log" # 로그 파일 경로
LOCK_FILE = "/var/lock/forensic_analysis.lock" # 락 파일 경로
LOCK_TIMEOUT = 60 # 락 파일 타임아웃 시간 (초)
def log(message):
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
log_message = f"{current_time}: {message}"
with open(LOG_FILE, 'a') as log_file:
log_file.write(f"{log_message}\\n")
print(log_message)
def acquire_lock():
start_time = time.time()
while True:
try:
with open(LOCK_FILE, 'w') as lock_fd:
lock_fd.write("lock")
lock_fd.flush()
os.fsync(lock_fd)
log("Lock acquired.")
return True
except IOError:
if time.time() - start_time > LOCK_TIMEOUT:
log("Lock acquisition timed out.")
return False
time.sleep(1)
def mount_device():
if not os.path.exists(MOUNT_POINT):
os.makedirs(MOUNT_POINT, exist_ok=True)
if acquire_lock():
if not os.path.ismount(MOUNT_POINT):
log(f"Mounting {MOUNTED_DEVICE} on {MOUNT_POINT}")
mount_command = f'mount {MOUNTED_DEVICE} {MOUNT_POINT}'
mount_result = os.system(mount_command)
log(f"Mount command result: {mount_result}")
log("3. 포렌식 분석 수행")
# 포렌식 분석 작업을 수행하는 코드 추가
# 예: forensic_analysis(MOUNT_POINT)
log("4. 마운트 해제")
umount_command = f'umount {MOUNT_POINT}'
umount_result = os.system(umount_command)
log(f"Umount command result: {umount_result}")
def cleanup():
if os.path.exists(LOCK_FILE):
os.remove(LOCK_FILE)
def main():
log("포렌식 분석 스크립트 시작")
try:
mount_device()
except Exception as e:
log(f"Error: {str(e)}")
finally:
cleanup()
log("5. 작업 완료 후 정리")
# 필요한 리소스 정리 작업을 추가
log("포렌식 분석 스크립트 완료")
def display_forensic_results(log_file):
try:
with open(log_file, 'r') as log_file:
lines = log_file.readlines()
print("포렌식 결과:")
for line in lines:
print(line.strip())
except Exception as e:
print(f"포렌식 결과를 읽어오는 중 오류 발생: {str(e)}")
if __name__ == '__main__':
main()
display_forensic_results(LOG_FILE)
이러한 식으로 로그 기록이 남은다.