# Listing metadata (from the code viewer this file was copied from): 399 lines, 16 KiB, Python
import os
import re
import shutil
import threading
import tkinter as tk
from datetime import datetime
from pathlib import Path
from tkinter import filedialog, messagebox, ttk

import piexif
from colorama import init, Fore, Style
from PIL import Image
from PIL.ExifTags import TAGS

# Initialise colorama once at import time. autoreset=True is relied on by the
# console logger, which emits a colour prefix without an explicit reset code.
init(autoreset=True)


class PhotoMetadataRepair:
    """Add missing EXIF capture-date and camera fields to photos in a directory tree.

    Photos are either modified in place (optionally keeping a ``.bak`` copy next
    to each file) or first copied into ``output_dir`` -- mirroring the layout of
    ``source_dir`` -- and modified there.

    Args:
        source_dir: Directory that is scanned recursively for photos.
        output_dir: Optional destination root. ``None`` (or empty) means the
            source files themselves are modified.
        backup: When modifying in place, keep a ``<file>.bak`` copy.
        verbose: Emit ``'debug'`` level log messages.
        progress_callback: Optional ``callback(payload, kind)``. It receives
            plain-text log lines with the log level as ``kind``, a percentage
            (float) with ``kind == "progress"`` after each file, and a final
            message with ``kind == "complete"``.
    """

    # Plain-text tag for each known log level; unknown levels get no prefix.
    _LEVEL_LABELS = {'info': 'INFO', 'warning': 'WARN', 'error': 'ERROR', 'debug': 'DEBUG'}

    # Filename date patterns in priority order: compact YYYYMMDD first, then
    # YYYY-MM-DD / YYYY_MM_DD.
    _FILENAME_DATE_PATTERNS = (
        re.compile(r'(\d{4})(\d{2})(\d{2})'),
        re.compile(r'(\d{4})[-_](\d{2})[-_](\d{2})'),
    )

    def __init__(self, source_dir, output_dir=None, backup=True, verbose=False, progress_callback=None):
        self.source_dir = os.path.abspath(source_dir)
        self.output_dir = os.path.abspath(output_dir) if output_dir else None
        self.backup = backup
        self.verbose = verbose
        self.progress_callback = progress_callback
        self.supported_formats = ('.jpg', '.jpeg', '.png', '.tiff')
        self.repaired_count = 0
        self.skipped_count = 0
        self.failed_count = 0
        self.total_files = 0
        self.processed_files = 0

        # Create the output root up front (no-op when it already exists).
        if self.output_dir:
            os.makedirs(self.output_dir, exist_ok=True)

    def log(self, message, level='info'):
        """Emit ``message`` at ``level`` to the callback, or to stdout in colour.

        ``'debug'`` messages are dropped unless ``verbose`` is set. The callback
        receives plain text: terminal colour codes are only added on the
        ``print`` path, because a non-terminal consumer (such as a Tk text
        widget) would display them as garbage.
        """
        if level == 'debug' and not self.verbose:
            return

        label = self._LEVEL_LABELS.get(level)
        prefix = f"[{label}] " if label else ''

        if self.progress_callback:
            self.progress_callback(f"{prefix}{message}", level)
            return

        colors = {
            'info': Fore.GREEN,
            'warning': Fore.YELLOW,
            'error': Fore.RED,
            'debug': Fore.BLUE,
        }
        print(f"{colors.get(level, '')}{prefix}{message}")

    def repair_metadata(self):
        """Repair every supported photo under ``source_dir`` and report progress.

        Counters are reset on each call, so the instance can be reused.
        """
        self.log(f"开始修复照片元数据,源目录: {self.source_dir}")

        self.repaired_count = 0
        self.skipped_count = 0
        self.failed_count = 0
        self.processed_files = 0

        # Snapshot the file list once: the total and the processed set then
        # always agree, and files written while processing are not picked up.
        photo_paths = self._collect_photos()
        self.total_files = len(photo_paths)
        self.log(f"找到 {self.total_files} 个支持的照片文件", 'info')

        for file_path in photo_paths:
            self._process_file(file_path)
            self.processed_files += 1
            if self.progress_callback:
                progress = (self.processed_files / self.total_files) * 100
                self.progress_callback(progress, "progress")

        self.log(f"处理完成!修复: {self.repaired_count}, 跳过: {self.skipped_count}, 失败: {self.failed_count}")
        if self.progress_callback:
            self.progress_callback("处理完成!", "complete")

    def _collect_photos(self):
        """Return the paths of all supported photos under ``source_dir``."""
        skip_dir = os.path.normcase(self.output_dir) if self.output_dir else None
        photo_paths = []
        for root, dirs, files in os.walk(self.source_dir):
            if skip_dir is not None:
                # Prune the output tree when it lives inside the source tree so
                # previously written copies are not treated as new input.
                dirs[:] = [
                    name for name in dirs
                    if os.path.normcase(os.path.join(root, name)) != skip_dir
                ]
            photo_paths.extend(
                os.path.join(root, name)
                for name in files
                if name.lower().endswith(self.supported_formats)
            )
        return photo_paths

    def _process_file(self, file_path):
        """Prepare and repair one file, updating the repaired/skipped/failed counters."""
        try:
            target_path = self._prepare_target(file_path)
            changed = self._fix_metadata(target_path)
        except Exception as e:
            # Per-file boundary: one bad file must not abort the whole run.
            self.failed_count += 1
            self.log(f"处理文件失败: {file_path}, 错误: {str(e)}", 'error')
            return

        if changed:
            self.repaired_count += 1
        else:
            self.skipped_count += 1
            self.log(f"元数据完整,跳过: {file_path}", 'debug')

    def _prepare_target(self, file_path):
        """Return the path to modify, copying or backing up ``file_path`` first.

        With an output directory the file is copied there (directories are
        created as needed). Otherwise the file is modified in place and, when
        ``backup`` is set, a ``.bak`` copy is made unless one already exists --
        an existing backup is kept so a re-run never replaces the pristine
        original with an already modified file.
        """
        if self.output_dir:
            rel_path = os.path.relpath(file_path, self.source_dir)
            target_path = os.path.join(self.output_dir, rel_path)
        else:
            target_path = file_path

        if target_path != file_path:
            os.makedirs(os.path.dirname(target_path), exist_ok=True)
            shutil.copy2(file_path, target_path)
            self.log(f"复制文件到: {target_path}", 'debug')
        elif self.backup:
            backup_path = f"{file_path}.bak"
            if not os.path.exists(backup_path):
                shutil.copy2(file_path, backup_path)
                self.log(f"已备份原始文件: {backup_path}", 'debug')

        return target_path

    def _fix_metadata(self, file_path):
        """Fill in missing date/camera EXIF fields of ``file_path``.

        Returns:
            ``True`` if the file was rewritten, ``False`` if nothing was missing
            and the file was left untouched.

        Raises:
            Exception: any error from reading, encoding or saving is logged and
            re-raised for the caller to count as a failure.
        """
        try:
            # Capture timestamps before writing so they can be restored after.
            original_stat = os.stat(file_path)

            # The context manager guarantees the file handle is released.
            with Image.open(file_path) as img:
                try:
                    exif_dict = piexif.load(img.info["exif"])
                    has_exif = True
                except (KeyError, ValueError, TypeError):
                    # No (or unreadable) EXIF block: start from an empty one.
                    exif_dict = {"0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "Interop": {}}
                    has_exif = False

                changed = False

                if not self._has_taken_date(exif_dict):
                    date_from_filename = self._extract_date_from_filename(os.path.basename(file_path))
                    if date_from_filename:
                        self._set_date_taken(exif_dict, date_from_filename)
                        self.log(f"从文件名提取日期: {date_from_filename}", 'debug')
                    else:
                        # Fall back to the file's modification time.
                        date_from_file = datetime.fromtimestamp(
                            original_stat.st_mtime).strftime("%Y:%m:%d %H:%M:%S")
                        self._set_date_taken(exif_dict, date_from_file)
                        self.log(f"从文件系统时间设置日期: {date_from_file}", 'debug')
                    changed = True

                if not self._has_camera_info(exif_dict):
                    self._set_default_camera_info(exif_dict)
                    self.log("设置默认相机信息", 'debug')
                    changed = True

                if not changed:
                    # Nothing missing: do not rewrite the image at all.
                    return False

                exif_bytes = piexif.dump(exif_dict)
                # NOTE(review): Image.save re-encodes the pixel data; for JPEG
                # input a lossless EXIF insert may be preferable -- confirm.
                img.save(file_path, exif=exif_bytes)

            # os.utime takes (atime, mtime); restore both original values.
            os.utime(file_path, (original_stat.st_atime, original_stat.st_mtime))

            if not has_exif:
                self.log(f"为文件添加了元数据: {file_path}", 'info')
            else:
                self.log(f"修复了文件的元数据: {file_path}", 'info')
            return True

        except Exception as e:
            self.log(f"修复元数据失败: {file_path}, 错误: {str(e)}", 'error')
            raise

    def _has_taken_date(self, exif_dict):
        """Return whether the Exif IFD already holds DateTimeOriginal."""
        return piexif.ExifIFD.DateTimeOriginal in (exif_dict.get("Exif") or {})

    def _extract_date_from_filename(self, filename):
        """Return ``"YYYY:MM:DD 00:00:00"`` for the first real date in ``filename``.

        Candidates are digit groups shaped like YYYYMMDD, YYYY-MM-DD or
        YYYY_MM_DD. A candidate is only accepted if it is a valid calendar
        date, so arbitrary numbers such as ``IMG_12345678`` are ignored.
        Returns ``None`` when no valid date is found.
        """
        for pattern in self._FILENAME_DATE_PATTERNS:
            for match in pattern.finditer(filename):
                year, month, day = match.groups()
                try:
                    datetime(int(year), int(month), int(day))
                except ValueError:
                    continue
                return f"{year}:{month}:{day} 00:00:00"
        return None

    def _set_date_taken(self, exif_dict, date_str):
        """Write ``date_str`` to DateTimeOriginal, DateTimeDigitized and DateTime."""
        exif_ifd = exif_dict.setdefault("Exif", {})
        exif_ifd[piexif.ExifIFD.DateTimeOriginal] = date_str
        exif_ifd[piexif.ExifIFD.DateTimeDigitized] = date_str
        exif_dict.setdefault("0th", {})[piexif.ImageIFD.DateTime] = date_str

    def _has_camera_info(self, exif_dict):
        """Return whether both Make and Model are present in the 0th IFD."""
        zeroth_ifd = exif_dict.get("0th") or {}
        return piexif.ImageIFD.Make in zeroth_ifd and piexif.ImageIFD.Model in zeroth_ifd

    def _set_default_camera_info(self, exif_dict):
        """Write placeholder Make/Model values into the 0th IFD."""
        zeroth_ifd = exif_dict.setdefault("0th", {})
        zeroth_ifd[piexif.ImageIFD.Make] = "Unknown Device"
        zeroth_ifd[piexif.ImageIFD.Model] = "Metadata Repair Tool"


class MetadataRepairGUI:
    """Tkinter front end that collects options and runs PhotoMetadataRepair.

    The repair itself runs on a worker thread; all widget updates are handed
    back to the Tk event loop through ``root.after``.

    Args:
        root: The ``tk.Tk`` window that hosts the widgets.
    """

    # ANSI colour (SGR) escape sequences, which a Tk text widget cannot render.
    _ANSI_SGR = re.compile(r'\x1b\[[0-9;]*m')

    # Text-widget tag name -> foreground colour for log lines.
    _LOG_TAG_COLORS = {
        "info": "green",
        "warning": "orange",
        "error": "red",
        "normal": "black",
    }

    def __init__(self, root):
        self.root = root
        self.root.title("照片元数据修复工具")
        self.root.geometry("800x600")
        self.root.resizable(True, True)

        # Use a font with CJK glyphs so the Chinese labels render correctly.
        self.style = ttk.Style()
        self.style.configure("TLabel", font=("SimHei", 10))
        self.style.configure("TButton", font=("SimHei", 10))
        self.style.configure("TCheckbutton", font=("SimHei", 10))

        self.main_frame = ttk.Frame(root, padding="10")
        self.main_frame.pack(fill=tk.BOTH, expand=True)

        # Source directory row.
        ttk.Label(self.main_frame, text="源目录:").grid(row=0, column=0, sticky=tk.W, pady=5)
        self.source_var = tk.StringVar()
        ttk.Entry(self.main_frame, textvariable=self.source_var, width=60).grid(row=0, column=1, pady=5)
        ttk.Button(self.main_frame, text="浏览...", command=self.browse_source).grid(
            row=0, column=2, padx=5, pady=5)

        # Output directory row plus the "modify in place" switch.
        ttk.Label(self.main_frame, text="输出目录:").grid(row=1, column=0, sticky=tk.W, pady=5)
        self.output_var = tk.StringVar()
        ttk.Entry(self.main_frame, textvariable=self.output_var, width=60).grid(row=1, column=1, pady=5)
        ttk.Button(self.main_frame, text="浏览...", command=self.browse_output).grid(
            row=1, column=2, padx=5, pady=5)
        self.output_check = tk.BooleanVar(value=True)
        ttk.Checkbutton(self.main_frame, text="直接修改源文件(不使用输出目录)", variable=self.output_check,
                        command=self.toggle_output).grid(row=2, column=1, sticky=tk.W, pady=5)

        # Backup option.
        self.backup_check = tk.BooleanVar(value=True)
        ttk.Checkbutton(self.main_frame, text="创建备份文件", variable=self.backup_check).grid(
            row=3, column=1, sticky=tk.W, pady=5)

        # Verbose logging option.
        self.verbose_check = tk.BooleanVar(value=True)
        ttk.Checkbutton(self.main_frame, text="显示详细日志", variable=self.verbose_check).grid(
            row=4, column=1, sticky=tk.W, pady=5)

        # Start button.
        self.process_btn = ttk.Button(self.main_frame, text="开始修复", command=self.start_repair)
        self.process_btn.grid(row=5, column=1, pady=10)

        # Progress bar, driven through progress_var (0-100).
        ttk.Label(self.main_frame, text="进度:").grid(row=6, column=0, sticky=tk.W, pady=5)
        self.progress_var = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(self.main_frame, variable=self.progress_var, length=500)
        self.progress_bar.grid(row=6, column=1, pady=5)

        # Log area with a vertical scrollbar.
        ttk.Label(self.main_frame, text="日志:").grid(row=7, column=0, sticky=tk.NW, pady=5)
        self.log_text = tk.Text(self.main_frame, height=20, width=70, wrap=tk.WORD)
        self.log_text.grid(row=7, column=1, pady=5, sticky=tk.NSEW)
        scrollbar = ttk.Scrollbar(self.main_frame, command=self.log_text.yview)
        scrollbar.grid(row=7, column=2, sticky=tk.NS)
        self.log_text.config(yscrollcommand=scrollbar.set)

        # Tag colours are static, so configure them once instead of per message.
        for tag, color in self._LOG_TAG_COLORS.items():
            self.log_text.tag_config(tag, foreground=color)

        # Let the log row/column absorb extra space when the window is resized.
        self.main_frame.grid_rowconfigure(7, weight=1)
        self.main_frame.grid_columnconfigure(1, weight=1)

        # True while a repair thread is active.
        self.is_running = False

    @classmethod
    def _strip_ansi(cls, text):
        """Return ``text`` with ANSI colour escape sequences removed."""
        return cls._ANSI_SGR.sub('', text)

    def browse_source(self):
        """Ask for the source directory and store the choice."""
        directory = filedialog.askdirectory(title="选择源目录")
        if directory:
            self.source_var.set(directory)

    def browse_output(self):
        """Ask for the output directory and store the choice."""
        directory = filedialog.askdirectory(title="选择输出目录")
        if directory:
            self.output_var.set(directory)

    def toggle_output(self):
        """Clear the output directory field when "modify in place" is checked."""
        if self.output_check.get():
            self.output_var.set("")

    def start_repair(self):
        """Validate the form and start the repair on a background thread."""
        if self.is_running:
            return

        source_dir = self.source_var.get()
        in_place = self.output_check.get()
        output_dir = None if in_place else self.output_var.get()
        backup = self.backup_check.get()
        verbose = self.verbose_check.get()

        if not source_dir:
            messagebox.showerror("错误", "请选择源目录")
            return

        if not os.path.isdir(source_dir):
            messagebox.showerror("错误", "源目录不存在")
            return

        if not in_place:
            # Without this check an empty field would fall back to modifying
            # the originals, which is exactly what the user opted out of.
            if not output_dir or not output_dir.strip():
                messagebox.showerror("错误", "请选择输出目录")
                return
            try:
                os.makedirs(output_dir, exist_ok=True)
            except OSError:
                messagebox.showerror("错误", "无法创建输出目录")
                return

        # Reset the log and progress display for the new run.
        self.log_text.delete("1.0", tk.END)
        self.progress_var.set(0)

        self.is_running = True
        self.process_btn.config(text="正在修复...", state=tk.DISABLED)

        # Run off the UI thread so the window stays responsive.
        repair_thread = threading.Thread(
            target=self.run_repair,
            args=(source_dir, output_dir, backup, verbose),
            daemon=True,
        )
        repair_thread.start()

    def run_repair(self, source_dir, output_dir, backup, verbose):
        """Worker-thread body: run the repair and always signal completion."""
        try:
            repair_tool = PhotoMetadataRepair(
                source_dir=source_dir,
                output_dir=output_dir,
                backup=backup,
                verbose=verbose,
                progress_callback=self.update_progress,
            )
            repair_tool.repair_metadata()
        except Exception as e:
            self.update_progress(f"修复过程中发生错误: {str(e)}", "error")
        finally:
            # Re-enable the UI from the Tk event loop.
            self.root.after(0, self.repair_complete)

    def update_progress(self, message, level):
        """Schedule a UI update for a progress value, completion, or log line.

        Args:
            message: A percentage when ``level == "progress"``; otherwise text.
            level: ``"progress"``, ``"complete"``, or a log level.
        """
        if level == "progress":
            self.root.after(0, lambda: self.progress_var.set(message))
        elif level == "complete":
            self.root.after(0, lambda: self.progress_var.set(100))
        else:
            self.root.after(0, lambda: self.add_log(message, level))

    def add_log(self, message, level):
        """Append ``message`` to the log area, coloured by ``level``."""
        tag = level if level in ("info", "warning", "error") else "normal"
        # Terminal colour codes would show up as stray characters in the widget.
        self.log_text.insert(tk.END, self._strip_ansi(message) + "\n", tag)
        self.log_text.see(tk.END)

    def repair_complete(self):
        """Restore the idle UI state and notify the user that the run ended."""
        self.is_running = False
        self.process_btn.config(text="开始修复", state=tk.NORMAL)
        messagebox.showinfo("完成", "元数据修复已完成!")


def main():
    """Create the Tk root window, attach the repair GUI and run the event loop."""
    window = tk.Tk()
    # The widgets' command callbacks keep the GUI object alive; the local name
    # only mirrors that for readability.
    gui = MetadataRepairGUI(window)
    window.mainloop()


# Launch the GUI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()