Copy import ctypes
from ctypes import wintypes
import subprocess
import logging
from tkinter import messagebox
import tkinter as tk
from tkinter import filedialog, ttk
import threading
import os
import hashlib
import atexit
import sys
# Windows 상수 정의
GENERIC_READ = 0x80000000
FILE_SHARE_READ = 0x00000001
FILE_SHARE_WRITE = 0x00000002
OPEN_EXISTING = 3
IOCTL_DISK_GET_DRIVE_GEOMETRY = 0x00070000
IOCTL_DISK_GET_LENGTH_INFO = 0x0007405C
# DISK_GEOMETRY 구조체
class DISK_GEOMETRY(ctypes.Structure):
_fields_ = [
("Cylinders", wintypes.LARGE_INTEGER),
("MediaType", wintypes.DWORD),
("TracksPerCylinder", wintypes.DWORD),
("SectorsPerTrack", wintypes.DWORD),
("BytesPerSector", wintypes.DWORD)
]
# GET_LENGTH_INFORMATION 구조체
class GET_LENGTH_INFORMATION(ctypes.Structure):
_fields_ = [("Length", wintypes.LARGE_INTEGER)]
# Kernel32 DLL 로드
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
# CreateFile 함수 정의
def create_file(filename, access, mode, creation, flags):
return kernel32.CreateFileW(
ctypes.c_wchar_p(filename),
ctypes.c_ulong(access),
ctypes.c_ulong(mode),
None,
ctypes.c_ulong(creation),
ctypes.c_ulong(flags),
None
)
# DeviceIoControl 함수 정의
def device_io_control(device, io_control_code, in_buffer, out_buffer):
bytes_returned = wintypes.DWORD(0)
status = kernel32.DeviceIoControl(
ctypes.wintypes.HANDLE(device),
wintypes.DWORD(io_control_code),
None,
0,
ctypes.byref(out_buffer),
wintypes.DWORD(ctypes.sizeof(out_buffer)),
ctypes.byref(bytes_returned),
None
)
return status
# 마지막 섹터 번호를 계산하는 함수
def get_last_sector_number(disk_number):
drive_path = f"\\\\.\\PhysicalDrive{disk_number}"
h_drive = create_file(drive_path, GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, OPEN_EXISTING, 0)
if h_drive == ctypes.wintypes.HANDLE(-1).value:
raise ctypes.WinError(ctypes.get_last_error())
try:
length_info = GET_LENGTH_INFORMATION()
if not device_io_control(h_drive, IOCTL_DISK_GET_LENGTH_INFO, None, length_info):
raise ctypes.WinError(ctypes.get_last_error())
disk_geometry = DISK_GEOMETRY()
if not device_io_control(h_drive, IOCTL_DISK_GET_DRIVE_GEOMETRY, None, disk_geometry):
raise ctypes.WinError(ctypes.get_last_error())
total_length = length_info.Length
bytes_per_sector = disk_geometry.BytesPerSector
last_sector_number = (total_length // bytes_per_sector) - 1
return last_sector_number
finally:
kernel32.CloseHandle(ctypes.wintypes.HANDLE(h_drive))
def is_admin():
try:
return ctypes.windll.shell32.IsUserAnAdmin()
except:
return False
# 물리 드라이브 정보를 가져오는 함수 (수정된 버전)
def get_physical_drives():
logging.info("Fetching physical drives")
try:
# PowerShell 명령어 사용 (Windows 10/11 호환)
powershell_cmd = "Get-WmiObject Win32_DiskDrive | Select-Object DeviceID, Model, Size | ConvertTo-Csv -NoTypeInformation"
result = subprocess.check_output(
["powershell", "-Command", powershell_cmd],
shell=True
).decode('cp949', errors='ignore').strip()
logging.debug(f"Raw PowerShell output: {result}")
lines = result.split('\n')[1:] # 첫 번째 줄은 헤더이므로 제외
drives_info = []
for line in lines:
if not line.strip():
continue
# CSV 형식 파싱 (따옴표 제거)
parts = [p.strip('"') for p in line.split(',')]
if len(parts) < 3:
continue
drive = parts[0]
model = parts[1]
size_str = parts[2]
try:
# 마지막 섹터 번호를 기반으로 크기 계산
disk_number = int(drive.split('PhysicalDrive')[-1])
last_sector_number = get_last_sector_number(disk_number)
bytes_per_sector = 512
total_size = (last_sector_number + 1) * bytes_per_sector
# Size 변환 및 표시 단위 결정
size_in_gb = total_size / (1024 ** 3)
if size_in_gb < 1:
size_in_mb = size_in_gb * 1024
display_size = f"{size_in_mb:.2f} MB"
else:
display_size = f"{size_in_gb:.2f} GB"
except Exception as e:
logging.error(f"Failed to get size for {drive}: {e}")
display_size = "Unknown"
drives_info.append((drive, model, display_size))
logging.debug(f"Drive: {drive}, Model: {model}, Size: {display_size}")
return drives_info
except subprocess.CalledProcessError as e:
logging.error("Failed to get physical drives using PowerShell: %s", str(e))
# 대체 방법: wmic 시도 (구형 Windows용)
try:
result = subprocess.check_output(
"wmic diskdrive get DeviceID, Model, Size",
shell=True
).decode('cp949').strip()
lines = result.split('\n')[1:]
drives_info = []
for line in lines:
parts = line.strip().split()
if len(parts) < 3:
continue
drive = parts[0]
model = ' '.join(parts[1:-1])
try:
disk_number = int(drive[-1])
last_sector_number = get_last_sector_number(disk_number)
bytes_per_sector = 512
total_size = (last_sector_number + 1) * bytes_per_sector
size_in_gb = total_size / (1024 ** 3)
if size_in_gb < 1:
size_in_mb = size_in_gb * 1024
display_size = f"{size_in_mb:.2f} MB"
else:
display_size = f"{size_in_gb:.2f} GB"
except Exception as e:
logging.error(f"Failed to get size for {drive}: {e}")
display_size = "Unknown"
drives_info.append((drive, model, display_size))
logging.debug(f"Drive: {drive}, Model: {model}, Size: {display_size}")
return drives_info
except Exception as wmic_error:
logging.error("Failed to get physical drives using wmic: %s", str(wmic_error))
messagebox.showerror("오류", "물리 드라이브를 조회하는 중 오류가 발생했습니다.")
return []
except Exception as e:
messagebox.showerror("오류", "예상치 못한 오류가 발생했습니다.")
logging.error("Unexpected error: %s", str(e))
return []
# 로깅 설정 (수정된 버전)
# 실행 파일의 디렉토리 경로 가져오기
if getattr(sys, 'frozen', False):
# PyInstaller로 만든 실행 파일인 경우
application_path = os.path.dirname(sys.executable)
else:
# 일반 Python 스크립트인 경우
application_path = os.path.dirname(os.path.abspath(__file__))
# 로그 파일 경로 설정
log_file_path = os.path.join(application_path, 'imaging_process.log')
logging.basicConfig(
filename=log_file_path,
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# GUI 애플리케이션 정의
class App:
def __init__(self, root):
self.root = root
self.root.title("물리 이미징 도구")
self.current_process = None # 현재 실행 중인 이미징 프로세스 객체를 저장하기 위한 변수
self.progress_var = tk.DoubleVar()
tk.Label(root, text="물리 드라이브 선택:").pack(padx=10, pady=5)
self.drive_list = tk.Listbox(root, width=60, height=10)
self.drive_list.pack(padx=10, pady=5)
# 드라이브 정보를 가져와서 리스트 박스에 표시합니다.
self.drives = get_physical_drives()
for idx, (drive, model, size) in enumerate(self.drives):
self.drive_list.insert(tk.END, f"{drive} - {model} - {size}")
tk.Label(root, text="저장할 이미지 파일 경로:").pack(padx=10, pady=5)
self.file_path = tk.Entry(root, width=50)
self.file_path.pack(padx=10, pady=5)
tk.Button(root, text="파일 선택", command=self.select_output_file).pack(padx=10, pady=5)
self.progress_bar = ttk.Progressbar(root, variable=self.progress_var, maximum=100, length=400)
self.progress_bar.pack(padx=10, pady=10)
self.status_label = tk.Label(root, text="물리이미지를 획득할 드라이브를 선택해주세요.\n(용량이 충분한지 확인!!)")
self.status_label.pack(padx=10, pady=5)
button_frame = tk.Frame(root)
button_frame.pack(padx=10, pady=10)
self.start_button = tk.Button(button_frame, text="이미징 시작", command=self.start_imaging)
self.start_button.pack(side=tk.LEFT, padx=5)
self.stop_button = tk.Button(button_frame, text="이미징 중지", command=self.stop_imaging)
self.stop_button.pack(side=tk.LEFT, padx=5)
self.quit_button = tk.Button(root, text="종료", command=root.quit)
self.quit_button.pack(padx=10, pady=10)
def select_output_file(self):
logging.info("Prompting user to select an output file")
file_path = filedialog.asksaveasfilename(filetypes=[("Image Files", "*.img"), ("All Files", "*.*")])
if file_path:
self.file_path.delete(0, tk.END)
self.file_path.insert(0, file_path)
logging.info(f"Output file selected: {file_path}")
def update_progress(self, progress):
self.progress_var.set(progress)
self.progress_bar['value'] = progress
def update_status_label(self, status):
self.status_label.config(text=status)
def start_imaging(self):
if self.current_process and self.current_process.is_alive():
messagebox.showinfo("알림", "이미지 작업이 이미 진행 중입니다.")
return
if not self.drive_list.curselection():
messagebox.showerror("오류", "물리 드라이브가 선택되지 않았습니다.")
logging.error("No physical drive was selected by the user")
return
selected_index = self.drive_list.curselection()[0]
selected_drive, _, display_size = self.drives[selected_index]
output_file = self.file_path.get()
if not output_file:
messagebox.showerror("오류", "출력 파일 경로가 지정되지 않았습니다.")
logging.error("No output file path was provided")
return
logging.info(f"Selected drive: {selected_drive}, Output file: {output_file}")
self.current_process = threading.Thread(target=self.create_image, args=(selected_drive, output_file), daemon=True)
self.current_process.start()
def stop_imaging(self):
if self.current_process and self.current_process.is_alive():
self.current_process = None
# 이미지 생성이 중지되면 이미지 파일과 로그 파일을 삭제하기 위해 atexit 모듈을 사용합니다.
output_file = self.file_path.get()
def delete_files():
if os.path.exists(output_file):
os.remove(output_file)
logging.info(f"Deleted image file: {output_file}")
# 프로그램이 종료될 때 파일 삭제 함수를 실행하도록 예약합니다.
atexit.register(delete_files)
else:
messagebox.showinfo("알림", "진행 중인 이미지 작업이 없습니다.")
def create_image(self, source_drive, output_file):
logging.info(f"Starting imaging process for {source_drive} to {output_file}")
h_drive = None
try:
# Windows 물리 드라이브를 올바른 경로로 열기
drive_name = source_drive.split('\\')[-1]
drive_path = f"\\\\.\\{drive_name}"
logging.info(f"Opening drive: {drive_path}")
# CreateFile을 사용하여 물리 드라이브 열기
h_drive = create_file(
drive_path,
GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE,
OPEN_EXISTING,
0
)
if h_drive == ctypes.wintypes.HANDLE(-1).value:
raise ctypes.WinError(ctypes.get_last_error())
logging.info("Opened the physical drive successfully")
with open(output_file, 'wb') as img:
logging.info(f"Attempting to write to the output file: {output_file}")
total_bytes = 0
try:
total_size = self.get_drive_size(source_drive)
except Exception as e:
logging.error(f"Error getting total size: {e}")
total_size = 0
buffer_size = 1024 * 1024 # 1MB 버퍼
buffer = ctypes.create_string_buffer(buffer_size)
bytes_read = wintypes.DWORD(0)
while total_bytes < total_size or total_size == 0:
if not self.current_process:
logging.info("Imaging process was stopped by the user")
self.update_status_label("이미지 작업이 중지되었습니다.\n(진행중이던 파일은 프로그램 종료 시 삭제됩니다. 먼저 수동 삭제 하셔도 됩니다.)")
return
# ReadFile을 사용하여 데이터 읽기
result = kernel32.ReadFile(
ctypes.wintypes.HANDLE(h_drive),
buffer,
buffer_size,
ctypes.byref(bytes_read),
None
)
if not result or bytes_read.value == 0:
logging.info("No more data to read from source drive, end of file reached.")
break
# 읽은 데이터를 파일에 쓰기
img.write(buffer.raw[:bytes_read.value])
total_bytes += bytes_read.value
if total_size > 0:
progress = (total_bytes / total_size) * 100
self.root.after(0, self.update_progress, progress)
self.root.after(0, self.update_status_label,
f"이미지 생성중: {total_bytes} / {total_size} bytes\n({progress:.3f}%)")
else:
self.root.after(0, self.update_status_label,
f"이미지 생성중: {total_bytes} bytes")
self.root.update_idletasks()
logging.info(f"Data written to the image file successfully: {total_bytes} bytes")
# MD5 해시 계산
try:
with open(output_file, 'rb') as img_file:
md5_hash = hashlib.md5()
while True:
chunk = img_file.read(8192)
if not chunk:
break
md5_hash.update(chunk)
md5_result = md5_hash.hexdigest()
with open(f"{output_file}.log", "a") as log_file:
log_file.write(f"Image file: {output_file}\nMD5: {md5_result}\n")
logging.info("Image file name and MD5 hash logged successfully.")
except Exception as e:
logging.error(f"Error logging image file name and MD5 hash: {e}")
md5_result = "Error calculating MD5"
self.update_status_label(f"이미지가 생성되었습니다.\n이미지 파일명: {output_file}\nMD5: {md5_result}\n추가로 이미지 획득 가능합니다.")
messagebox.showinfo("완료", "이미징이 완료되었습니다.")
logging.info("Imaging completed successfully and user informed")
except PermissionError as e:
logging.error(f"PermissionError during read/write operation: {e}")
messagebox.showerror("권한 오류", f"드라이브에 접근할 권한이 없습니다: {e}")
except Exception as e:
logging.error("Error during imaging: %s", str(e))
messagebox.showerror("오류", f"이미징 중 오류가 발생했습니다: {str(e)}")
finally:
if h_drive and h_drive != ctypes.wintypes.HANDLE(-1).value:
kernel32.CloseHandle(ctypes.wintypes.HANDLE(h_drive))
logging.info("Closed the file descriptor for the drive")
def get_drive_size(self, source_drive):
for drive, model, display_size in self.drives:
if drive == source_drive:
size_str = display_size.split()[0]
unit = display_size.split()[1]
size_num = float(size_str)
if unit == "GB":
return size_num * (1024 ** 3)
elif unit == "MB":
return size_num * (1024 ** 2)
return 0
if __name__ == "__main__":
if not is_admin():
messagebox.showerror("권한 오류", "관리자 권한으로 실행해 주세요.")
else:
root = tk.Tk()
app = App(root)
root.mainloop()