扫描并处理重重复文件importosimportshutilimporthashlibimportchardetimportpandasaspdfromdocximportDocumentfromcollectionsimportdefaultdict# 全局配置区按需修改 SCAN_FOLDERrD:\data\mydata\t_data\test# 需要扫描查重的根目录BACKUP_FOLDERrD:\data\mydata\t_data\_重复文件备份# 重复文件移动备份目录OUTPUT_REPORTrD:\测试文件目录_查重清单.xlsx# 输出查重清单ExcelLOG_PATHrD:\文件扫描日志.txt# 异常日志保存路径MOVE_DUPLICATETrue# 是否自动移动重复文件副本到备份文件夹MAX_FILE_SIZE_MB500# 大于500MB的超大文件跳过哈希计算仅记录SIMILAR_THRESHOLD0.85# 文本相似度阈值大于等于判定高度相似# 跳过扫描的隐藏/系统目录SKIP_DIR_KEYWORDS{$recycle.bin,system volume information,.git,__pycache__}# 不处理的快捷/链接后缀SKIP_SUFFIX{.lnk,.symlink}# 支持提取文本做相似比对的后缀TEXT_SUFFIX{.txt,.csv,.md,.docx,.xlsx,.xls}# 模块1工具基础函数 defget_file_md5(file_path:str,block_size65536)-str: 计算文件MD5哈希值分块读取适配大文件 :param file_path: 文件绝对路径 :param block_size: 单次读取缓冲区大小 :return: md5字符串读取失败返回空字符串 try:md5_objhashlib.md5()withopen(file_path,rb)asf:whilechunk:f.read(block_size):md5_obj.update(chunk)returnmd5_obj.hexdigest()exceptExceptionase:log_msgf【哈希计算失败】{file_path}| 错误{str(e)}write_log(log_msg)returndefget_file_size_mb(file_path:str)-float:获取文件大小MBbyte_sizeos.path.getsize(file_path)returnround(byte_size/1024/1024,3)defwrite_log(msg:str):全局日志写入追加模式withopen(LOG_PATH,a,encodingutf-8)asf:f.write(msg\n)print(msg)defsafe_move_file(src:str,dst_dir:str): 安全移动文件目标存在自动重命名避免冲突 :param src: 原文件路径 :param dst_dir: 目标文件夹 os.makedirs(dst_dir,exist_okTrue)file_nameos.path.basename(src)dst_pathos.path.join(dst_dir,file_name)# 文件存在则循环加后缀区分idx1whileos.path.exists(dst_path):name_no_ext,extos.path.splitext(file_name)dst_pathos.path.join(dst_dir,f{name_no_ext}_副本{idx}{ext})idx1try:shutil.move(src,dst_path)write_log(f【已移动重复文件】原路径:{src}新路径:{dst_path})exceptExceptionase:write_log(f【文件移动失败】{src}错误:{str(e)})# 模块2文本提取函数用于相似文件比对 defextract_file_text(file_path:str,suffix:str)-str: 提取txt/docx/xlsx/csv内全部文本用于相似度判断 :param file_path: 文件路径 :param suffix: 文件后缀小写 :return: 拼接后的纯文本读取失败返回空字符串 full_texttry:ifsuffix.txtorsuffix.csvorsuffix.md:# 自动识别编码读取文本withopen(file_path,rb)asf:raw_dataf.read()encode_infochardet.detect(raw_data)encodeencode_info.get(encoding,utf-8)full_textraw_data.decode(encode,errorsignore)elifsuffix.docx:docDocument(file_path)para_text[p.textforpindoc.paragraphsifp.text.strip()]full_text .join(para_text)elifsuffixin(.xlsx,.xls):dfpd.read_excel(file_path,sheet_nameNone)all_sheet_text[]forsheet_dataindf.values():text_linesheet_data.to_string()all_sheet_text.append(text_line)full_text .join(all_sheet_text)exceptExceptionase:write_log(f【文本提取失败】{file_path}错误:{str(e)})return# 清理空白字符压缩文本减少比对开销full_textfull_text.replace(\n,).replace( ,)returnfull_textdefcalc_text_similarity(text1:str,text2:str)-float:简易文本相似度字符重合度0~1set1set(text1)set2set(text2)ifnotset1andnotset2:return1.0interlen(set1set2)unionlen(set1|set2)returninter/unionifunion!0else0# 模块3目录扫描函数 defscan_all_files(root_dir:str)-list:遍历目录过滤无效文件返回全部有效文件信息列表file_info_list[]forroot,dirs,filesinos.walk(root_dir):# 过滤系统隐藏目录原地修改dirs阻止进入dirs[:][dfordindirsifd.lower()notinSKIP_DIR_KEYWORDS]forfile_nameinfiles:full_pathos.path.abspath(os.path.join(root,file_name))suffixos.path.splitext(file_name)[1].lower()# 跳过快捷方式/链接ifsuffixinSKIP_SUFFIX:continuetry:byte_sizeos.path.getsize(full_path)mb_sizeround(byte_size/1024/1024,3)modify_timeos.path.getmtime(full_path)file_info_list.append({file_path:full_path,file_name:file_name,suffix:suffix,size_mb:mb_size,size_byte:byte_size,modify_time:modify_time})exceptExceptionase:write_log(f【文件信息读取失败】{full_path}错误:{str(e)})write_log(f目录扫描完成共获取有效文件{len(file_info_list)}个)returnfile_info_list# 模块4查重核心分组逻辑 defgroup_duplicate_files(file_info_list:list): 核心查重分组1.按大小分组 2.同大小计算MD5分完全重复组 3.文本相似分组 :return: duplicate_groups 完全重复组列表; similar_groups 高度相似组列表 # 1. 先按文件字节大小分组大小不同直接不可能重复size_groupdefaultdict(list)forinfoinfile_info_list:size_group[info[size_byte]].append(info)# 2. MD5哈希分组 完全重复文件组hash_groupsdefaultdict(list)forsize_byte,info_listinsize_group.items():# 单个文件无重复跳过iflen(info_list)1:continueforinfoininfo_list:# 超大文件跳过哈希计算ifinfo[size_mb]MAX_FILE_SIZE_MB:continuemd5_valget_file_md5(info[file_path])ifmd5_val:info[md5]md5_val hash_groups[md5_val].append(info)# 过滤仅单个文件的组duplicate_groups[gforginhash_groups.values()iflen(g)2]# 3. 文本类文件相似度分组非完全重复但内容近似similar_groups[]# 提取所有支持文本比对、且不在完全重复组内的文件text_candidate[]all_dup_pathset()forgroupinduplicate_groups:foritemingroup:all_dup_path.add(item[file_path])forinfoinfile_info_list:ifinfo[suffix]inTEXT_SUFFIXandinfo[file_path]notinall_dup_path:info[text_content]extract_file_text(info[file_path],info[suffix])text_candidate.append(info)# 两两比对相似度used_idxset()total_textlen(text_candidate)foriinrange(total_text):ifiinused_idx:continuegroup_temp[text_candidate[i]]text_itext_candidate[i][text_content]forjinrange(i1,total_text):ifjinused_idx:continuetext_jtext_candidate[j][text_content]simcalc_text_similarity(text_i,text_j)ifsimSIMILAR_THRESHOLD:group_temp.append(text_candidate[j])used_idx.add(j)iflen(group_temp)2:similar_groups.append(group_temp)write_log(f查重完成完全重复文件组{len(duplicate_groups)}组高度相似文件组{len(similar_groups)}组)returnduplicate_groups,similar_groups# 模块5生成输出清单 移动重复文件 defgenerate_report_and_handle_dup(duplicate_groups,similar_groups): 1. 生成简易Excel查重清单 2. 可选移动重复副本至备份文件夹 report_rows[]group_id1# 写入完全重复组forgroupinduplicate_groups:base_filegroup[0]# 保留第一个原始文件dup_file_listgroup[1:]# 其余全部判定为重复副本base_pathbase_file[file_path]dup_paths; .join([f[file_path]forfindup_file_list])# 写入报表行row{分组ID:group_id,重复类型:完全重复(二进制一致),主文件(保留):base_path,重复副本文件:dup_paths,文件大小MB:base_file[size_mb],MD5哈希:base_file[md5],文件后缀:base_file[suffix]}report_rows.append(row)# 移动重复副本文件ifMOVE_DUPLICATE:fordup_infoindup_file_list:safe_move_file(dup_info[file_path],BACKUP_FOLDER)group_id1# 写入高度相似组不移动仅人工复核forgroupinsimilar_groups:all_path; .join([f[file_path]forfingroup])row{分组ID:group_id,重复类型:高度相似(文本内容接近),主文件(保留):all_path,重复副本文件:无自动移动建议人工复核,文件大小MB:group[0][size_mb],MD5哈希:不相同,文件后缀:group[0][suffix]}report_rows.append(row)group_id1# 导出简易Excel清单df_reportpd.DataFrame(report_rows)df_report.to_excel(OUTPUT_REPORT,indexFalse)write_log(f查重清单已导出至{OUTPUT_REPORT})# 程序入口主函数 defmain():# 清空历史日志withopen(LOG_PATH,w,encodingutf-8)asf:f.write( 文件查重扫描日志 开始 \n)# 1. 扫描全部文件file_infoscan_all_files(SCAN_FOLDER)# 2. 查重分组dup_groups,sim_groupsgroup_duplicate_files(file_info)# 3. 生成报表处理重复文件移动generate_report_and_handle_dup(dup_groups,sim_groups)write_log( 文件查重全部执行完毕 )print(f\n执行完成\n清单文件{OUTPUT_REPORT}\n操作日志{LOG_PATH})ifMOVE_DUPLICATE:print(f重复副本已移动至备份目录{BACKUP_FOLDER})if__name____main__:main()
python扫描并处理重复文件
发布时间:2026/6/26 5:17:11
扫描并处理重重复文件importosimportshutilimporthashlibimportchardetimportpandasaspdfromdocximportDocumentfromcollectionsimportdefaultdict# 全局配置区按需修改 SCAN_FOLDERrD:\data\mydata\t_data\test# 需要扫描查重的根目录BACKUP_FOLDERrD:\data\mydata\t_data\_重复文件备份# 重复文件移动备份目录OUTPUT_REPORTrD:\测试文件目录_查重清单.xlsx# 输出查重清单ExcelLOG_PATHrD:\文件扫描日志.txt# 异常日志保存路径MOVE_DUPLICATETrue# 是否自动移动重复文件副本到备份文件夹MAX_FILE_SIZE_MB500# 大于500MB的超大文件跳过哈希计算仅记录SIMILAR_THRESHOLD0.85# 文本相似度阈值大于等于判定高度相似# 跳过扫描的隐藏/系统目录SKIP_DIR_KEYWORDS{$recycle.bin,system volume information,.git,__pycache__}# 不处理的快捷/链接后缀SKIP_SUFFIX{.lnk,.symlink}# 支持提取文本做相似比对的后缀TEXT_SUFFIX{.txt,.csv,.md,.docx,.xlsx,.xls}# 模块1工具基础函数 defget_file_md5(file_path:str,block_size65536)-str: 计算文件MD5哈希值分块读取适配大文件 :param file_path: 文件绝对路径 :param block_size: 单次读取缓冲区大小 :return: md5字符串读取失败返回空字符串 try:md5_objhashlib.md5()withopen(file_path,rb)asf:whilechunk:f.read(block_size):md5_obj.update(chunk)returnmd5_obj.hexdigest()exceptExceptionase:log_msgf【哈希计算失败】{file_path}| 错误{str(e)}write_log(log_msg)returndefget_file_size_mb(file_path:str)-float:获取文件大小MBbyte_sizeos.path.getsize(file_path)returnround(byte_size/1024/1024,3)defwrite_log(msg:str):全局日志写入追加模式withopen(LOG_PATH,a,encodingutf-8)asf:f.write(msg\n)print(msg)defsafe_move_file(src:str,dst_dir:str): 安全移动文件目标存在自动重命名避免冲突 :param src: 原文件路径 :param dst_dir: 目标文件夹 os.makedirs(dst_dir,exist_okTrue)file_nameos.path.basename(src)dst_pathos.path.join(dst_dir,file_name)# 文件存在则循环加后缀区分idx1whileos.path.exists(dst_path):name_no_ext,extos.path.splitext(file_name)dst_pathos.path.join(dst_dir,f{name_no_ext}_副本{idx}{ext})idx1try:shutil.move(src,dst_path)write_log(f【已移动重复文件】原路径:{src}新路径:{dst_path})exceptExceptionase:write_log(f【文件移动失败】{src}错误:{str(e)})# 模块2文本提取函数用于相似文件比对 defextract_file_text(file_path:str,suffix:str)-str: 提取txt/docx/xlsx/csv内全部文本用于相似度判断 :param file_path: 文件路径 :param suffix: 文件后缀小写 :return: 拼接后的纯文本读取失败返回空字符串 full_texttry:ifsuffix.txtorsuffix.csvorsuffix.md:# 自动识别编码读取文本withopen(file_path,rb)asf:raw_dataf.read()encode_infochardet.detect(raw_data)encodeencode_info.get(encoding,utf-8)full_textraw_data.decode(encode,errorsignore)elifsuffix.docx:docDocument(file_path)para_text[p.textforpindoc.paragraphsifp.text.strip()]full_text .join(para_text)elifsuffixin(.xlsx,.xls):dfpd.read_excel(file_path,sheet_nameNone)all_sheet_text[]forsheet_dataindf.values():text_linesheet_data.to_string()all_sheet_text.append(text_line)full_text .join(all_sheet_text)exceptExceptionase:write_log(f【文本提取失败】{file_path}错误:{str(e)})return# 清理空白字符压缩文本减少比对开销full_textfull_text.replace(\n,).replace( ,)returnfull_textdefcalc_text_similarity(text1:str,text2:str)-float:简易文本相似度字符重合度0~1set1set(text1)set2set(text2)ifnotset1andnotset2:return1.0interlen(set1set2)unionlen(set1|set2)returninter/unionifunion!0else0# 模块3目录扫描函数 defscan_all_files(root_dir:str)-list:遍历目录过滤无效文件返回全部有效文件信息列表file_info_list[]forroot,dirs,filesinos.walk(root_dir):# 过滤系统隐藏目录原地修改dirs阻止进入dirs[:][dfordindirsifd.lower()notinSKIP_DIR_KEYWORDS]forfile_nameinfiles:full_pathos.path.abspath(os.path.join(root,file_name))suffixos.path.splitext(file_name)[1].lower()# 跳过快捷方式/链接ifsuffixinSKIP_SUFFIX:continuetry:byte_sizeos.path.getsize(full_path)mb_sizeround(byte_size/1024/1024,3)modify_timeos.path.getmtime(full_path)file_info_list.append({file_path:full_path,file_name:file_name,suffix:suffix,size_mb:mb_size,size_byte:byte_size,modify_time:modify_time})exceptExceptionase:write_log(f【文件信息读取失败】{full_path}错误:{str(e)})write_log(f目录扫描完成共获取有效文件{len(file_info_list)}个)returnfile_info_list# 模块4查重核心分组逻辑 defgroup_duplicate_files(file_info_list:list): 核心查重分组1.按大小分组 2.同大小计算MD5分完全重复组 3.文本相似分组 :return: duplicate_groups 完全重复组列表; similar_groups 高度相似组列表 # 1. 先按文件字节大小分组大小不同直接不可能重复size_groupdefaultdict(list)forinfoinfile_info_list:size_group[info[size_byte]].append(info)# 2. MD5哈希分组 完全重复文件组hash_groupsdefaultdict(list)forsize_byte,info_listinsize_group.items():# 单个文件无重复跳过iflen(info_list)1:continueforinfoininfo_list:# 超大文件跳过哈希计算ifinfo[size_mb]MAX_FILE_SIZE_MB:continuemd5_valget_file_md5(info[file_path])ifmd5_val:info[md5]md5_val hash_groups[md5_val].append(info)# 过滤仅单个文件的组duplicate_groups[gforginhash_groups.values()iflen(g)2]# 3. 文本类文件相似度分组非完全重复但内容近似similar_groups[]# 提取所有支持文本比对、且不在完全重复组内的文件text_candidate[]all_dup_pathset()forgroupinduplicate_groups:foritemingroup:all_dup_path.add(item[file_path])forinfoinfile_info_list:ifinfo[suffix]inTEXT_SUFFIXandinfo[file_path]notinall_dup_path:info[text_content]extract_file_text(info[file_path],info[suffix])text_candidate.append(info)# 两两比对相似度used_idxset()total_textlen(text_candidate)foriinrange(total_text):ifiinused_idx:continuegroup_temp[text_candidate[i]]text_itext_candidate[i][text_content]forjinrange(i1,total_text):ifjinused_idx:continuetext_jtext_candidate[j][text_content]simcalc_text_similarity(text_i,text_j)ifsimSIMILAR_THRESHOLD:group_temp.append(text_candidate[j])used_idx.add(j)iflen(group_temp)2:similar_groups.append(group_temp)write_log(f查重完成完全重复文件组{len(duplicate_groups)}组高度相似文件组{len(similar_groups)}组)returnduplicate_groups,similar_groups# 模块5生成输出清单 移动重复文件 defgenerate_report_and_handle_dup(duplicate_groups,similar_groups): 1. 生成简易Excel查重清单 2. 可选移动重复副本至备份文件夹 report_rows[]group_id1# 写入完全重复组forgroupinduplicate_groups:base_filegroup[0]# 保留第一个原始文件dup_file_listgroup[1:]# 其余全部判定为重复副本base_pathbase_file[file_path]dup_paths; .join([f[file_path]forfindup_file_list])# 写入报表行row{分组ID:group_id,重复类型:完全重复(二进制一致),主文件(保留):base_path,重复副本文件:dup_paths,文件大小MB:base_file[size_mb],MD5哈希:base_file[md5],文件后缀:base_file[suffix]}report_rows.append(row)# 移动重复副本文件ifMOVE_DUPLICATE:fordup_infoindup_file_list:safe_move_file(dup_info[file_path],BACKUP_FOLDER)group_id1# 写入高度相似组不移动仅人工复核forgroupinsimilar_groups:all_path; .join([f[file_path]forfingroup])row{分组ID:group_id,重复类型:高度相似(文本内容接近),主文件(保留):all_path,重复副本文件:无自动移动建议人工复核,文件大小MB:group[0][size_mb],MD5哈希:不相同,文件后缀:group[0][suffix]}report_rows.append(row)group_id1# 导出简易Excel清单df_reportpd.DataFrame(report_rows)df_report.to_excel(OUTPUT_REPORT,indexFalse)write_log(f查重清单已导出至{OUTPUT_REPORT})# 程序入口主函数 defmain():# 清空历史日志withopen(LOG_PATH,w,encodingutf-8)asf:f.write( 文件查重扫描日志 开始 \n)# 1. 扫描全部文件file_infoscan_all_files(SCAN_FOLDER)# 2. 查重分组dup_groups,sim_groupsgroup_duplicate_files(file_info)# 3. 生成报表处理重复文件移动generate_report_and_handle_dup(dup_groups,sim_groups)write_log( 文件查重全部执行完毕 )print(f\n执行完成\n清单文件{OUTPUT_REPORT}\n操作日志{LOG_PATH})ifMOVE_DUPLICATE:print(f重复副本已移动至备份目录{BACKUP_FOLDER})if__name____main__:main()