3-3) 파티션 분석 도구 만들기(기초)

Python 을 이용한 소스 코드 작성 및 실행 방법

0-2) Python 을 이용한 개발 환경 구성

우리가 알고 있는 것

  1. 0번 섹터에서 파티션 테이블에서, MBR 파티션 테이블, GPT 파티션 테이블을 확인 가능

  2. 파티션테이블은 MBR에서 확인 가능하며, GPT 파티션테이블의 경우 GPT 헤더 이 후 파티션테이블을 저장하는 공간이 따로 있다.

파티션 분석 주요 요구사항

  1. 물리 드라이브를 리스트를 표시하고 선택한 물리디스크의 파티션을 분석한다.

  2. MBR을 분석하여 GPT 파티션테이블을 포함하고 있는지 여부를 확인하고 분석 한다.

  3. 파티션테이블을 분석하고, 파티션별 시작 섹터, 마지막섹터, 총 섹터 수, 파티션별 용량을 GUI로 표시하자.

1. 파티션 분석 도구 사용 방법 및 최종 소스

  • 아래 소스는 ChatGPT에 MBR, GPT 파티션 구조를 알려주소 확인 할 수 있는 GUI 코드를 짜달라고 하였더니 해준 것입니다.

  • 파이썬 소스를 이용하여 실습에 참고로 활용하도록 합시다.

최종 Python 소스
import ctypes
import subprocess
import logging
import tkinter as tk
from tkinter import messagebox, ttk
import json

# Windows 상수 정의
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 0x00000001
FILE_SHARE_WRITE = 0x00000002
OPEN_EXISTING = 3
IOCTL_DISK_GET_DRIVE_GEOMETRY = 0x00070000
IOCTL_DISK_GET_LENGTH_INFO = 0x0007405C

# ctypes.wintypes 불러오기
from ctypes import wintypes

# DISK_GEOMETRY 구조체
class DISK_GEOMETRY(ctypes.Structure):
    _fields_ = [
        ("Cylinders", wintypes.LARGE_INTEGER),
        ("MediaType", wintypes.DWORD),
        ("TracksPerCylinder", wintypes.DWORD),
        ("SectorsPerTrack", wintypes.DWORD),
        ("BytesPerSector", wintypes.DWORD)
    ]

# GET_LENGTH_INFORMATION 구조체
class GET_LENGTH_INFORMATION(ctypes.Structure):
    _fields_ = [("Length", wintypes.LARGE_INTEGER)]

# MBR 파티션 항목 구조체
class PARTITION_ENTRY(ctypes.Structure):
    _fields_ = [
        ("boot_flag", ctypes.c_ubyte),
        ("start_chs", ctypes.c_ubyte * 3),
        ("partition_type", ctypes.c_ubyte),
        ("end_chs", ctypes.c_ubyte * 3),
        ("start_sector", ctypes.c_uint32),
        ("num_sectors", ctypes.c_uint32)
    ]

# MBR 구조체
class MBR(ctypes.Structure):
    _fields_ = [
        ("boot_code", ctypes.c_ubyte * 446),
        ("partitions", PARTITION_ENTRY * 4),
        ("signature", ctypes.c_ushort)
    ]

    _pack_ = 1  # 패딩을 사용하지 않도록 설정

# GPT 헤더 구조체
class GPT_HEADER(ctypes.Structure):
    _fields_ = [
        ("signature", ctypes.c_char * 8),
        ("revision", ctypes.c_uint32),
        ("header_size", ctypes.c_uint32),
        ("header_crc32", ctypes.c_uint32),
        ("reserved", ctypes.c_uint32),
        ("current_lba", ctypes.c_uint64),
        ("backup_lba", ctypes.c_uint64),
        ("first_usable_lba", ctypes.c_uint64),
        ("last_usable_lba", ctypes.c_uint64),
        ("disk_guid", ctypes.c_byte * 16),
        ("partition_entry_lba", ctypes.c_uint64),
        ("num_partition_entries", ctypes.c_uint32),
        ("sizeof_partition_entry", ctypes.c_uint32),
        ("partition_entry_array_crc32", ctypes.c_uint32)
    ]

# GPT 파티션 엔트리 구조체
class GPT_PARTITION_ENTRY(ctypes.Structure):
    _fields_ = [
        ("partition_type_guid", ctypes.c_byte * 16),
        ("unique_partition_guid", ctypes.c_byte * 16),
        ("starting_lba", ctypes.c_uint64),
        ("ending_lba", ctypes.c_uint64),
        ("attributes", ctypes.c_uint64),
        ("partition_name", ctypes.c_wchar * 36)
    ]

# OVERLAPPED 구조체 정의
class OVERLAPPED(ctypes.Structure):
    _fields_ = [
        ("Internal", ctypes.c_ulong),
        ("InternalHigh", ctypes.c_ulong),
        ("Offset", ctypes.c_uint32),
        ("OffsetHigh", ctypes.c_uint32),
        ("hEvent", wintypes.HANDLE)
    ]

# Kernel32 DLL 로드
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

# CreateFile 함수 정의
def create_file(filename, access, mode, creation, flags):
    handle = kernel32.CreateFileW(
        ctypes.c_wchar_p(filename),
        ctypes.c_ulong(access),
        ctypes.c_ulong(mode),
        None,
        ctypes.c_ulong(creation),
        ctypes.c_ulong(flags),
        None
    )
    if handle == wintypes.HANDLE(-1).value:
        error_code = ctypes.get_last_error()
        logging.error(f"Failed to open {filename}: {ctypes.WinError(error_code)}")
        raise ctypes.WinError(error_code)
    logging.info(f"Successfully opened {filename} with handle {handle}")
    return handle

# DeviceIoControl 함수 정의
def device_io_control(device, io_control_code, in_buffer, out_buffer):
    bytes_returned = wintypes.DWORD(0)
    status = kernel32.DeviceIoControl(
        wintypes.HANDLE(device),
        wintypes.DWORD(io_control_code),
        None,
        0,
        ctypes.byref(out_buffer),
        wintypes.DWORD(ctypes.sizeof(out_buffer)),
        ctypes.byref(bytes_returned),
        None
    )
    return status

# ReadFile 함수 정의
def read_file(handle, num_bytes, offset=0):
    logging.info(f"Reading {num_bytes} bytes from handle {handle} at offset {offset}")
    if handle == wintypes.HANDLE(-1).value:
        logging.error(f"Invalid handle: {handle}")
        raise ValueError("Invalid handle")
    
    data = ctypes.create_string_buffer(num_bytes)
    bytes_read = wintypes.DWORD(0)
    overlapped = OVERLAPPED()
    overlapped.Offset = offset & 0xFFFFFFFF
    overlapped.OffsetHigh = offset >> 32

    success = kernel32.ReadFile(
        wintypes.HANDLE(handle),
        data,
        num_bytes,
        ctypes.byref(bytes_read),
        None  # 동기적 호출에서는 overlapped 매개변수를 None으로 설정
    )
    logging.info(f"Success: {success}")
    if not success:
        error_code = ctypes.get_last_error()
        logging.error(f"Failed to read file: {ctypes.WinError(error_code)}")
        raise ctypes.WinError(error_code)
    logging.info(f"Successfully read {num_bytes} bytes from file")
    return data.raw

def analyze_partition(disk_number):
    drive_path = f"\\\\.\\PhysicalDrive{disk_number}"
    try:
        h_drive = create_file(drive_path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, OPEN_EXISTING, 0)
    except Exception as e:
        logging.error(f"Failed to open handle for {drive_path}: {e}")
        return []

    try:
        # MBR을 읽음
        logging.info(f"Reading MBR from {drive_path}")
        mbr_data = read_file(h_drive, 512)
        mbr = MBR.from_buffer_copy(mbr_data)
        
        # MBR signature 확인
        if mbr.signature != 0xAA55:
            raise ValueError("Invalid MBR signature")

        # GPT 확인
        logging.info(f"Reading GPT header from {drive_path}")
        gpt_header_data = read_file(h_drive, 512, 512)
        gpt_header = GPT_HEADER.from_buffer_copy(gpt_header_data)
        
        if gpt_header.signature != b'EFI PART':
            # MBR 파티션 분석
            partitions = []
            for partition in mbr.partitions:
                if partition.partition_type != 0:
                    start_sector = partition.start_sector
                    num_sectors = partition.num_sectors
                    end_sector = start_sector + num_sectors - 1
                    size = num_sectors * 512
                    size_gb = size / (1024 ** 3)
                    size_display = f"{size_gb:.2f} GB" if size_gb >= 1 else f"{size / (1024 ** 2):.2f} MB"
                    partitions.append({
                        "Start Sector": start_sector,
                        "End Sector": end_sector,
                        "Total Sectors": num_sectors,
                        "Size": size_display
                    })
            return partitions

        # GPT 파티션 분석
        partitions = []
        partition_entries_data = read_file(h_drive, gpt_header.num_partition_entries * gpt_header.sizeof_partition_entry, gpt_header.partition_entry_lba * 512)
        for i in range(gpt_header.num_partition_entries):
            offset = i * gpt_header.sizeof_partition_entry
            partition_entry = GPT_PARTITION_ENTRY.from_buffer_copy(partition_entries_data[offset:offset + gpt_header.sizeof_partition_entry])
            if partition_entry.starting_lba != 0:
                start_sector = partition_entry.starting_lba
                end_sector = partition_entry.ending_lba
                num_sectors = end_sector - start_sector + 1
                size = num_sectors * 512
                size_gb = size / (1024 ** 3)
                size_display = f"{size_gb:.2f} GB" if size_gb >= 1 else f"{size / (1024 ** 2):.2f} MB"
                partitions.append({
                    "Start Sector": start_sector,
                    "End Sector": end_sector,
                    "Total Sectors": num_sectors,
                    "Size": size_display
                })

        return partitions

    finally:
        kernel32.CloseHandle(wintypes.HANDLE(h_drive))

def is_admin():
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except:
        return False

# 물리 드라이브 정보를 가져오는 함수 (PowerShell 사용)
def get_physical_drives():
    logging.info("Fetching physical drives using PowerShell")
    try:
        # PowerShell 명령어로 디스크 정보 가져오기
        powershell_cmd = [
            "powershell", "-Command",
            "Get-CimInstance -ClassName Win32_DiskDrive | Select-Object DeviceID, Model, Size | ConvertTo-Json"
        ]
        
        result = subprocess.check_output(powershell_cmd, shell=True, text=True, encoding='utf-8')
        logging.debug(f"Raw PowerShell output: {result}")
        
        # JSON 파싱
        drives_data = json.loads(result)
        
        # 단일 드라이브인 경우 리스트로 변환
        if isinstance(drives_data, dict):
            drives_data = [drives_data]
        
        drives_info = []
        for drive_data in drives_data:
            device_id = drive_data.get('DeviceID', 'Unknown')
            model = drive_data.get('Model', 'Unknown Model')
            size = drive_data.get('Size')
            
            if size and size != "null":
                try:
                    size_bytes = int(size)
                    size_gb = size_bytes / (1024 ** 3)
                    if size_gb < 1:
                        size_mb = size_gb * 1024
                        display_size = f"{size_mb:.2f} MB"
                    else:
                        display_size = f"{size_gb:.2f} GB"
                except (ValueError, TypeError):
                    display_size = "Unknown"
            else:
                display_size = "Unknown"
            
            drives_info.append((device_id, model, display_size))
            logging.debug(f"Drive: {device_id}, Model: {model}, Size: {display_size}")
        
        return drives_info
        
    except subprocess.CalledProcessError as e:
        logging.error(f"PowerShell command failed: {e}")
        # PowerShell이 실패한 경우 WMI 직접 사용 시도
        return get_physical_drives_fallback()
    except json.JSONDecodeError as e:
        logging.error(f"Failed to parse JSON: {e}")
        return get_physical_drives_fallback()
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        return get_physical_drives_fallback()

# 대체 방법: WMI 직접 사용
def get_physical_drives_fallback():
    logging.info("Using fallback method with WMI")
    try:
        import wmi
        c = wmi.WMI()
        drives_info = []
        
        for disk in c.Win32_DiskDrive():
            device_id = disk.DeviceID
            model = disk.Model or "Unknown Model"
            size = disk.Size
            
            if size:
                try:
                    size_bytes = int(size)
                    size_gb = size_bytes / (1024 ** 3)
                    if size_gb < 1:
                        size_mb = size_gb * 1024
                        display_size = f"{size_mb:.2f} MB"
                    else:
                        display_size = f"{size_gb:.2f} GB"
                except (ValueError, TypeError):
                    display_size = "Unknown"
            else:
                display_size = "Unknown"
            
            drives_info.append((device_id, model, display_size))
            logging.debug(f"Drive: {device_id}, Model: {model}, Size: {display_size}")
        
        return drives_info
        
    except ImportError:
        logging.error("WMI module not available. Please install with: pip install WMI")
        messagebox.showerror("오류", "WMI 모듈이 필요합니다. 'pip install WMI' 명령으로 설치하세요.")
        return []
    except Exception as e:
        logging.error(f"WMI fallback failed: {e}")
        messagebox.showerror("오류", f"드라이브 정보를 가져오는 중 오류가 발생했습니다: {str(e)}")
        return []

# 마지막 섹터 번호를 계산하는 함수
def get_last_sector_number(disk_number):
    drive_path = f"\\\\.\\PhysicalDrive{disk_number}"
    try:
        h_drive = create_file(drive_path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, OPEN_EXISTING, 0)
    except Exception as e:
        logging.error(f"Failed to open handle for {drive_path}: {e}")
        return -1

    try:
        length_info = GET_LENGTH_INFORMATION()
        if not device_io_control(h_drive, IOCTL_DISK_GET_LENGTH_INFO, None, length_info):
            raise ctypes.WinError(ctypes.get_last_error())

        disk_geometry = DISK_GEOMETRY()
        if not device_io_control(h_drive, IOCTL_DISK_GET_DRIVE_GEOMETRY, None, disk_geometry):
            raise ctypes.WinError(ctypes.get_last_error())

        total_length = length_info.Length
        bytes_per_sector = disk_geometry.BytesPerSector
        last_sector_number = (total_length // bytes_per_sector) - 1

        return last_sector_number

    finally:
        kernel32.CloseHandle(wintypes.HANDLE(h_drive))

class DiskAnalyzerApp(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Disk Analyzer")
        self.geometry("800x600")
        self.create_widgets()

    def create_widgets(self):
        # 상단 프레임
        top_frame = tk.Frame(self)
        top_frame.pack(fill=tk.X, padx=10, pady=5)
        
        tk.Label(top_frame, text="물리 드라이브 목록:").pack(anchor=tk.W)
        
        # 드라이브 목록
        self.drive_listbox = tk.Listbox(self, height=8)
        self.drive_listbox.pack(fill=tk.X, padx=10, pady=5)

        # 버튼 프레임
        button_frame = tk.Frame(self)
        button_frame.pack(fill=tk.X, padx=10, pady=5)
        
        self.refresh_button = tk.Button(button_frame, text="새로고침", command=self.load_drives)
        self.refresh_button.pack(side=tk.LEFT, padx=(0, 5))
        
        self.analyze_button = tk.Button(button_frame, text="디스크 분석", command=self.analyze_selected_disk)
        self.analyze_button.pack(side=tk.LEFT)

        # 결과 표시 영역
        result_frame = tk.Frame(self)
        result_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
        
        tk.Label(result_frame, text="파티션 정보:").pack(anchor=tk.W)
        
        # 스크롤바가 있는 텍스트 위젯
        text_frame = tk.Frame(result_frame)
        text_frame.pack(fill=tk.BOTH, expand=True)
        
        self.partition_info_text = tk.Text(text_frame, wrap=tk.WORD)
        scrollbar = tk.Scrollbar(text_frame, orient=tk.VERTICAL, command=self.partition_info_text.yview)
        self.partition_info_text.configure(yscrollcommand=scrollbar.set)
        
        self.partition_info_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

        self.load_drives()

    def load_drives(self):
        self.drive_listbox.delete(0, tk.END)
        self.partition_info_text.delete(1.0, tk.END)
        
        try:
            drives = get_physical_drives()
            if not drives:
                self.drive_listbox.insert(tk.END, "드라이브를 찾을 수 없습니다.")
                return
                
            for drive in drives:
                display_text = f"{drive[0]} - {drive[1]} - {drive[2]}"
                self.drive_listbox.insert(tk.END, display_text)
        except Exception as e:
            logging.error(f"Error loading drives: {e}")
            self.drive_listbox.insert(tk.END, f"오류: {str(e)}")

    def analyze_selected_disk(self):
        selection = self.drive_listbox.curselection()
        if not selection:
            messagebox.showwarning("경고", "분석할 디스크를 선택하세요.")
            return

        selected_text = self.drive_listbox.get(selection[0])
        if "드라이브를 찾을 수 없습니다" in selected_text or "오류:" in selected_text:
            messagebox.showwarning("경고", "유효한 드라이브를 선택하세요.")
            return

        try:
            # DeviceID에서 숫자 추출 (예: \\.\PHYSICALDRIVE0에서 0 추출)
            device_id = selected_text.split(" - ")[0]
            disk_number = int(device_id.split("PHYSICALDRIVE")[1])
            
            self.partition_info_text.delete(1.0, tk.END)
            self.partition_info_text.insert(tk.END, f"디스크 {disk_number} 분석 중...\n\n")
            self.update()  # UI 업데이트
            
            partitions = analyze_partition(disk_number)
            
            self.partition_info_text.delete(1.0, tk.END)
            if not partitions:
                self.partition_info_text.insert(tk.END, "파티션을 찾을 수 없거나 분석할 수 없습니다.\n")
                return
                
            self.partition_info_text.insert(tk.END, f"디스크 {disk_number} 파티션 정보:\n")
            self.partition_info_text.insert(tk.END, "=" * 50 + "\n\n")
            
            for i, partition in enumerate(partitions, 1):
                self.partition_info_text.insert(tk.END, f"파티션 {i}:\n")
                self.partition_info_text.insert(tk.END, f"  시작 섹터: {partition['Start Sector']:,}\n")
                self.partition_info_text.insert(tk.END, f"  끝 섹터: {partition['End Sector']:,}\n")
                self.partition_info_text.insert(tk.END, f"  총 섹터 수: {partition['Total Sectors']:,}\n")
                self.partition_info_text.insert(tk.END, f"  크기: {partition['Size']}\n")
                self.partition_info_text.insert(tk.END, "\n")
                
        except Exception as e:
            logging.error(f"Error analyzing disk: {e}")
            self.partition_info_text.delete(1.0, tk.END)
            self.partition_info_text.insert(tk.END, f"분석 중 오류가 발생했습니다: {str(e)}\n")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    
    if not is_admin():
        messagebox.showerror("오류", "이 프로그램을 실행하려면 관리자 권한이 필요합니다.")
    else:
        try:
            app = DiskAnalyzerApp()
            app.mainloop()
        except Exception as e:
            logging.error(f"Application error: {e}")
            messagebox.showerror("오류", f"애플리케이션 실행 중 오류가 발생했습니다: {str(e)}")

사용법

  1. cmd 창을 관리자 권한으로 열고 파이썬코드를 실행 해보도록 합시다. > cmd [파이썬 소스코드.py]

  2. 해당 물리매체를 선택 후 Analyze Disk 를 선택하면 시작 섹터, 마지막 섹터 번호, 전체 섹터 수 그리고 전체 용량이 표시됩니다. 우리가 GPT 로 파티션 나눠둔 영역 그리고 그냥 MBR 파티션 영역도 모두 확인이 가능합니다.

  3. FTK Imager에서 디스크를 인식 후 비교해 봅시다. 우리가 만든 도구에서는 섹터관련 정보만 있으나, 우리는 GUID 등의 정보도 어떤 값을 통해 만들었는지 알 수 있기 때문에 소스를 수정하면 다른 영역의 값도 충분히 추가하여 도구를 만들 수 있습니다.

3줄 요약

  1. 저장매체에서 파티션 별로 시작 섹터와 마지막 섹터번호, 파티션의 총 섹터 수를 가진다.

  2. 대부분 파티션의 상세한 정보를 이미 많이 알려져 있기 때문에 우리가 모든 영역을 일일히 직접 만들 수고는 많이 줄었습니다. 대신 물리적으로 0번 섹터부터 접근하는 경우는 거의 없기 때문에 소스에서 그 부분을 잘 하고 있는지, 어떤 방법으로 하고 있는지 잘 알아둘 필요가 있습니다.

  3. 물리이미징 수집에서 활용했던 소스도 잘 활용해보고, 이번에 파티션에 사용했던 소스도 추후에 잘 이용해보도록 합시다.

Last updated