不用sklearn,手把手教你用Python和TF-IDF实现垃圾邮件分类器(附完整代码) 从零构建中文垃圾邮件分类器基于TF-IDF与朴素贝叶斯的纯Python实现在信息爆炸的时代垃圾邮件已成为困扰用户的主要问题之一。传统基于规则的关键词过滤方法已难以应对日益复杂的垃圾邮件变体而机器学习算法则展现出强大的适应性。本文将带你从零开始不依赖任何现成的机器学习库如scikit-learn仅用Python标准库和基础数学知识实现一个基于TF-IDF特征提取和朴素贝叶斯分类的中文垃圾邮件过滤系统。1. 理解核心算法原理1.1 朴素贝叶斯分类器的工作机制朴素贝叶斯算法源于概率论中的贝叶斯定理其核心思想是通过已知特征来预测样本所属类别的概率。在垃圾邮件分类场景中我们需要计算P(垃圾邮件|特征词集合) ∝ P(特征词集合|垃圾邮件) × P(垃圾邮件) P(正常邮件|特征词集合) ∝ P(特征词集合|正常邮件) × P(正常邮件)其中朴素的假设在于各特征词之间相互独立这使得联合概率可以分解为各特征词条件概率的乘积P(词1,词2,...,词n|类别) P(词1|类别) × P(词2|类别) × ... × P(词n|类别)虽然现实中词语之间存在关联性但这一简化假设在实践中往往能取得不错的效果特别是在文本分类任务中。1.2 TF-IDF特征表示法TF-IDFTerm Frequency-Inverse Document Frequency是一种统计方法用于评估一个词对于一个文件集或语料库中的其中一份文件的重要程度。其计算公式为TF-IDF TF × IDF (词在文档中出现的次数/文档总词数) × log(总文档数/(包含该词的文档数1))TF-IDF的主要优势在于TF部分捕捉词语在单个文档中的重要性IDF部分降低常见词如的、是等的权重提升特征词区分度注意实际实现时会对公式做适当调整如添加平滑项避免除零错误。2. 数据准备与预处理2.1 获取与解析邮件数据集我们使用公开的中文垃圾邮件数据集trec06c包含约6万封已标注邮件65%垃圾邮件35%正常邮件。数据集结构如下trec06c/ ├── data/ │ ├── 000/ │ │ ├── 000001 │ │ ├── 000002 │ │ └── ... │ ├── 001/ │ └── ... └── index解析邮件的关键步骤包括import os import re def extract_email_content(filepath): 提取邮件正文并清洗非中文字符 with open(filepath, r, encodinggbk, errorsignore) as f: lines f.readlines() # 定位正文起始行第一个空行之后 content_start 0 for i, line in enumerate(lines): if not line.strip(): content_start i 1 break # 合并正文并清洗 content .join(lines[content_start:]) content re.sub(r[^\u4e00-\u9fa5], , content) # 移除非中文字符 return content2.2 构建标注数据集将邮件内容与标签对应起来def build_labeled_dataset(data_dir, index_file): dataset [] # 解析索引文件 with open(index_file, r, encodinggbk, errorsignore) as f: for line in f: label, path line.strip().split() label 1 if label spam else 0 filename os.path.basename(path) dataset.append((filename, label)) # 提取邮件内容 labeled_data [] for filename, label in dataset: dir_num filename[:3] filepath os.path.join(data_dir, dir_num, filename) content extract_email_content(filepath) labeled_data.append({content: content, label: label}) return labeled_data3. 特征工程实现3.1 中文分词与停用词处理使用jieba进行中文分词并去除停用词import jieba def load_stopwords(stopwords_file): with open(stopwords_file, r, encodingutf-8) as f: return set(line.strip() for line in f) def tokenize(content, stopwords): words jieba.lcut(content) return [word for word in words if word not in stopwords and len(word) 1]3.2 手动实现TF-IDF计算不依赖scikit-learn我们自行实现TF-IDF计算from collections import defaultdict import math class TFIDFVectorizer: def __init__(self, max_features5000, min_df5, max_df0.6): self.max_features max_features self.min_df min_df self.max_df max_df self.vocabulary_ None self.idf_ None def fit(self, documents): # 计算文档频率(DF) df defaultdict(int) total_docs len(documents) for doc in documents: unique_words set(doc) for word in unique_words: df[word] 1 # 过滤词汇 df_items [(word, count) for word, count in df.items() if min_df (count/total_docs) max_df] # 按DF排序并选择top特征 df_items.sort(keylambda x: -x[1]) self.vocabulary_ {word: i for i, (word, _) in enumerate(df_items[:self.max_features])} # 计算IDF self.idf_ {} vocab_size len(self.vocabulary_) for word, idx in self.vocabulary_.items(): self.idf_[word] math.log((total_docs 1)/(df[word] 1)) 1 def transform(self, documents): # 初始化结果矩阵 n_docs len(documents) n_features len(self.vocabulary_) X [[0.0]*n_features for _ in range(n_docs)] # 计算TF for i, doc in enumerate(documents): word_counts defaultdict(int) total_words len(doc) for word in doc: if word in self.vocabulary_: word_counts[word] 1 # 计算TF-IDF for word, count in word_counts.items(): tf count / total_words idx self.vocabulary_[word] X[i][idx] tf * self.idf_[word] return X4. 朴素贝叶斯分类器实现4.1 训练阶段计算条件概率import numpy as np class NaiveBayesClassifier: def __init__(self, alpha1.0): self.alpha alpha # 平滑系数 self.class_prior_ None self.feature_prob_ None def fit(self, X, y): n_samples, n_features len(X), len(X[0]) classes np.unique(y) n_classes len(classes) # 计算类先验概率 self.class_prior_ {} for c in classes: self.class_prior_[c] (np.sum(y c) self.alpha) / (n_samples self.alpha * n_classes) # 计算条件概率 self.feature_prob_ {} for c in classes: # 获取当前类别的样本 X_c [X[i] for i in range(n_samples) if y[i] c] # 计算每个特征的总TF-IDF值加平滑 feature_sum np.zeros(n_features) self.alpha for doc in X_c: feature_sum np.array(doc) # 归一化得到概率 total np.sum(feature_sum) self.feature_prob_[c] feature_sum / total def predict(self, X): predictions [] for doc in X: max_prob -1 best_class -1 for c, prior in self.class_prior_.items(): # 计算对数概率避免下溢 log_prob np.log(prior) feature_prob self.feature_prob_[c] for i, value in enumerate(doc): if value 0: # 只考虑文档中出现的特征 log_prob np.log(feature_prob[i]) if log_prob max_prob: max_prob log_prob best_class c predictions.append(best_class) return predictions4.2 模型评估指标实现def evaluate(y_true, y_pred): tp fp tn fn 0 for true, pred in zip(y_true, y_pred): if true 1 and pred 1: tp 1 elif true 1 and pred 0: fn 1 elif true 0 and pred 1: fp 1 else: tn 1 metrics { accuracy: (tp tn) / (tp tn fp fn), precision: tp / (tp fp) if (tp fp) 0 else 0, recall: tp / (tp fn) if (tp fn) 0 else 0, f1: 2 * tp / (2 * tp fp fn) if (2 * tp fp fn) 0 else 0 } return metrics5. 完整流程与性能优化5.1 端到端训练流程# 1. 加载数据 labeled_data build_labeled_dataset(trec06c/data, trec06c/full/index) # 2. 分词处理 stopwords load_stopwords(stopwords.txt) tokenized_data [tokenize(item[content], stopwords) for item in labeled_data] labels [item[label] for item in labeled_data] # 3. 划分训练测试集 split_idx int(0.7 * len(tokenized_data)) X_train tokenized_data[:split_idx] y_train labels[:split_idx] X_test tokenized_data[split_idx:] y_test labels[split_idx:] # 4. 特征提取 vectorizer TFIDFVectorizer(max_features5000) vectorizer.fit(X_train) X_train_tfidf vectorizer.transform(X_train) X_test_tfidf vectorizer.transform(X_test) # 5. 训练模型 model NaiveBayesClassifier() model.fit(X_train_tfidf, y_train) # 6. 评估 y_pred model.predict(X_test_tfidf) metrics evaluate(y_test, y_pred) print(评估结果:) for name, value in metrics.items(): print(f{name}: {value:.4f})5.2 性能优化技巧特征选择优化使用卡方检验选择信息量最大的特征词动态调整max_features参数找到性价比最高的特征数量计算效率提升使用稀疏矩阵存储TF-IDF特征对概率计算使用对数空间避免数值下溢算法改进实现伯努利朴素贝叶斯变体适用于短文本加入n-gram特征捕捉词语组合信息# 稀疏矩阵实现的示例 from scipy.sparse import lil_matrix class SparseTFIDFVectorizer(TFIDFVectorizer): def transform(self, documents): n_docs len(documents) n_features len(self.vocabulary_) X lil_matrix((n_docs, n_features)) for i, doc in enumerate(documents): word_counts defaultdict(int) total_words len(doc) for word in doc: if word in self.vocabulary_: word_counts[word] 1 for word, count in word_counts.items(): tf count / total_words idx self.vocabulary_[word] X[i, idx] tf * self.idf_[word] return X.tocsr()通过这种从零开始的实现方式我们不仅深入理解了TF-IDF和朴素贝叶斯的数学原理还掌握了如何将理论转化为实际可运行的代码。虽然性能可能不及优化过的库实现但这种实践对于理解机器学习底层原理具有不可替代的价值。