引言
Tokenizer(分词器)是将原始文本转换为模型可处理的token序列的关键组件。Tokenizer的设计直接影响模型的性能、训练效率和推理速度。本文档系统介绍Tokenizer的原理、训练流程和设计策略。
Tokenizer的核心作用
原始文本 → Tokenizer → Token序列 → LLM → 输出序列 → Tokenizer → 输出文本
"Hello world" [1234, 5678] [1234, 5678] "Hello world"
Tokenizer不仅是文本到数字的映射,更决定了:
- 词表大小:影响模型参数量和embedding维度
- 子词切分:影响未登录词处理能力
- 编码效率:影响序列长度和计算成本
- 多语言支持:影响跨语言迁移能力
1. Tokenizer类型对比
1.1 常见Tokenizer类型
| 类型 | 特点 | 优点 | 缺点 | 代表模型 |
|---|---|---|---|---|
| Byte-level BPE | 字节级分词 | 通用性最强、无OOV | 序列长度增加 | GPT-2, LLaMA |
| WordPiece | 基于频率的子词 | 适合多语言 | 训练复杂 | BERT, 日语模型 |
| Unigram | 概率模型分词 | 灵活、支持多语言 | 推理开销高 | T5, AlBERT |
| SentencePiece | 统一框架 | 支持BPE/Unigram/Char | 需额外训练 | 日语/中文模型 |
| TokenLLM | 语义感知的token | 保留语义边界 | 训练成本高 | 最新研究 |
1.2 Byte-Pair Encoding (BPE)
BPE是一种基于频率的贪心算法,最初用于数据压缩,后来被引入NLP。
算法原理
def train_bpe(corpus: List[str], vocab_size: int) -> Dict[str, int]:
"""
训练BPE分词器
Args:
corpus: 原始语料
vocab_size: 最终词表大小
Returns:
词表:token -> id
"""
# 1. 初始化:每个字符作为一个token
vocab = {}
for text in corpus:
for char in text:
if char not in vocab:
vocab[char] = len(vocab)
# 2. 迭代合并频率最高的pair
while len(vocab) < vocab_size:
# 统计所有pair的频率
pair_freq = {}
for text in corpus:
tokens = tokenize_text(text, vocab)
for i in range(len(tokens) - 1):
pair = (tokens[i], tokens[i + 1])
pair_freq[pair] = pair_freq.get(pair, 0) + 1
# 3. 找到频率最高的pair
if not pair_freq:
break
best_pair = max(pair_freq, key=pair_freq.get)
# 4. 合并该pair
new_token = best_pair[0] + best_pair[1]
vocab[new_token] = len(vocab)
# 5. 更新语料中的所有pair
corpus = [merge_pair(text, best_pair, new_token) for text in corpus]
return vocab
def tokenize_bpe(text: str, vocab: Dict[str, int]) -> List[int]:
"""使用BPE词表进行分词"""
tokens = list(text) # 初始化为字符序列
while len(tokens) > 1:
# 找到可合并的最长匹配
best_pos = -1
best_len = 1
for i in range(len(tokens) - 1):
pair = tokens[i] + tokens[i + 1]
if pair in vocab:
if best_pos == -1 or len(pair) > best_len:
best_pos = i
best_len = len(pair)
if best_pos == -1:
break
# 合并
new_token = tokens[best_pos] + tokens[best_pos + 1]
tokens = tokens[:best_pos] + [new_token] + tokens[best_pos + 2:]
return [vocab.get(t, vocab['<unk>']) for t in tokens]BPE的优缺点
| 优点 | 缺点 |
|---|---|
| 算法简单高效 | 贪心策略可能非最优 |
| 有效处理未登录词 | 训练时间随词表大小增长 |
| 平衡词级和字符级分词 | 对语义边界的感知有限 |
1.3 WordPiece
WordPiece是Google为BERT开发的分词器,核心思想与BPE类似,但使用不同的合并标准。
算法原理
import math
def wordpiece_score(pair: tuple, char_freq: dict, bigram_freq: dict) -> float:
"""
计算WordPiece的合并得分
WordPiece使用似然比作为合并标准:
score = freq(A,B) / (freq(A) * freq(B))
这等价于计算互信息
"""
a, b = pair
ab = a + b
# 频率
f_a = char_freq.get(a, 0)
f_b = char_freq.get(b, 0)
f_ab = bigram_freq.get(ab, 0)
# 互信息
if f_a == 0 or f_b == 0:
return 0
return math.log(f_ab) - math.log(f_a) - math.log(f_b)
def train_wordpiece(corpus: List[str], vocab_size: int) -> Dict[str, int]:
"""训练WordPiece分词器"""
# 初始化词表
vocab = {chr(i): i for i in range(256)} # 字节级
# 统计频率
char_freq = Counter()
bigram_freq = Counter()
for text in corpus:
chars = list(text)
for char in chars:
char_freq[char] += 1
for i in range(len(chars) - 1):
bigram_freq[chars[i] + chars[i+1]] += 1
# 迭代合并
while len(vocab) < vocab_size:
# 计算所有pair的得分
best_pair = None
best_score = float('-inf')
for bigram, freq in bigram_freq.items():
if freq < 2:
continue
a, b = bigram[0], bigram[1]
score = wordpiece_score((a, b), char_freq, bigram_freq)
if score > best_score:
best_score = score
best_pair = bigram
if best_pair is None:
break
# 合并
new_token = best_pair
vocab[new_token] = len(vocab)
# 更新频率统计(省略具体实现)
update_frequencies(bigram_freq, char_freq, best_pair)
return vocab1.4 Unigram Language Model
Unigram模型是SentencePiece的默认选项,基于语言模型进行分词。
from collections import Counter
import random
class UnigramTokenizer:
"""Unigram语言模型分词器"""
def __init__(self, corpus: List[str], vocab_size: int, num_threads=4):
self.corpus = corpus
self.vocab_size = vocab_size
self.vocab = {}
self.token_freqs = {}
self.total_tokens = 0
def train(self, max_iterations=10):
"""训练Unigram分词器"""
# 1. 初始化:使用BPE构建初始词表
self.vocab = self._init_vocab_bpe()
for iteration in range(max_iterations):
# 2. 计算每个token的出现频率
self._count_token_frequencies()
# 3. E步:使用维特比算法找最优分词
segmentations = self._forward_backward()
# 4. M步:调整词表(删除低频token)
self._update_vocab(segmentations)
# 5. 重新估计语言模型参数
self._estimate_parameters()
# 检查收敛
if self._check_convergence():
break
def _viterbi_segment(self, text: str) -> List[str]:
"""维特比分词:找到概率最高的分词方案"""
n = len(text)
# dp[i] = 以text[i:]开头的最优分词方案的概率
dp = [0.0] * (n + 1)
dp[n] = 1.0
backptr = [-1] * n
for i in range(n - 1, -1, -1):
best_prob = float('-inf')
best_len = 0
for length in range(1, min(50, n - i + 1)): # 最大token长度
token = text[i:i+length]
if token not in self.token_probs:
continue
prob = self.token_probs[token] * dp[i + length]
if prob > best_prob:
best_prob = prob
best_len = length
dp[i] = best_prob
backptr[i] = best_len
# 回溯
tokens = []
pos = 0
while pos < n:
length = backptr[pos]
if length == 0: # 未找到有效分词,使用单字符
tokens.append(text[pos])
pos += 1
else:
tokens.append(text[pos:pos+length])
pos += length
return tokens
def _estimate_parameters(self):
"""估计token概率参数"""
total = sum(self.token_freqs.values())
self.token_probs = {
token: freq / total
for token, freq in self.token_freqs.items()
}2. 词表大小的影响
2.1 词表大小的权衡
词表大小是Tokenizer设计的核心超参数之一:
词表大小
│
│ ┌─────────────────────────────────┐
32768 ├────┤ │
│ │ 词表过大: │
│ │ - 序列长度短 │
│ │ - embedding稀疏 │
│ │ - 训练数据稀疏 │
│ └─────────────────────────────────┘
│
│ ┌─────────────────────────────────┐
4096 ├────┤ 词表适中: │
│ │ - 平衡序列长度与覆盖率 │
│ │ - 常见词有直接token │
│ │ - 未登录词通过子词处理 │
│ └─────────────────────────────────┘
│
│ ┌─────────────────────────────────┐
1024 ├────┤ 词表过小: │
│ │ - 序列长度过长 │
│ │ - 序列编码效率低 │
│ │ - 推理成本增加 │
│ └─────────────────────────────────┘
│
└─────────────────────────────────────────────► 模型参数
2.2 词表大小与模型性能
| 词表大小 | 典型应用 | 特点 |
|---|---|---|
| 1K-4K | 字符级/小词表 | 序列长,但泛化能力强 |
| 8K-16K | 经典模型 | 平衡选择 |
| 32K-64K | 现代LLM | GPT-3, LLaMA等 |
| 100K+ | 多语言/多模态 | ChatGPT, Claude |
# 词表大小与参数量的关系
def calculate_embedding_params(vocab_size, embedding_dim):
"""计算embedding层参数量"""
return vocab_size * embedding_dim
# 不同词表大小的embedding参数量
configs = [
(30000, 4096, "LLaMA-7B"),
(32000, 4096, "LLaMA-13B"),
(50000, 4096, "ChatGPT"),
]
for vocab_size, dim, model in configs:
params = calculate_embedding_params(vocab_size, dim)
print(f"{model}: vocab={vocab_size}, embedding params={params:,}")2.3 词表设计原则
def design_vocabulary(corpus, target_size=32000):
"""词表设计流程"""
# 1. 分析语料分布
stats = analyze_corpus_distribution(corpus)
# 2. 确定特殊token
special_tokens = [
"<pad>", # 填充
"<s>", # 句子开始
"</s>", # 句子结束
"<unk>", # 未知词
"<mask>", # 掩码
# 业务相关特殊token
"[USER]", # 用户提及
"[BOT]", # 机器人回复
]
# 3. 预留位置
num_special = len(special_tokens)
effective_target = target_size - num_special
# 4. 训练BPE/Unigram
base_vocab = train_tokenizer(corpus, vocab_size=effective_target)
# 5. 合并词表
full_vocab = special_tokens + list(base_vocab.keys())
# 6. 验证覆盖率
coverage = calculate_coverage(corpus, full_vocab)
return full_vocab, coverage3. 多语言Tokenizer设计
3.1 多语言挑战
| 语言 | 特点 | 挑战 |
|---|---|---|
| 英语 | 空格分隔 | 相对简单 |
| 中文 | 无空格 | 需要分词 |
| 日语 | 无空格+多种字符集 | 最大挑战 |
| 阿拉伯语 | 从右到左 | 双向问题 |
| 泰语 | 无空格 | 分词歧义 |
| 德语 | 复合词 | 长词问题 |
3.2 统一词表策略
class MultilingualTokenizer:
"""多语言统一Tokenizer"""
def __init__(self, vocab_size=32000):
self.vocab_size = vocab_size
self.lang_specific_rules = {}
def train(self, corpora: Dict[str, List[str]]):
"""
训练多语言Tokenizer
Args:
corpora: {"en": [...], "zh": [...], "ja": [...]}
"""
# 1. 预处理各语言
processed_corpora = {}
for lang, corpus in corpora.items():
processed = self.preprocess_language(corpus, lang)
processed_corpora[lang] = processed
# 2. 合并语料(可加权)
combined = []
for lang, corpus in processed_corpora.items():
weight = self.get_language_weight(lang)
combined.extend(corpus * weight)
# 3. 训练统一词表
self.vocab = self.train_bpe(combined, self.vocab_size)
# 4. 保存语言特定规则
self.lang_specific_rules = {
lang: self.extract_lang_rules(corpus)
for lang, corpus in corpora.items()
}
def preprocess_language(self, corpus, lang):
"""语言特定预处理"""
if lang == "zh":
# 中文:使用jieba进行初步分词
import jieba
return [" ".join(jieba.cut(text)) for text in corpus]
elif lang == "ja":
# 日语:使用mecab分词
import fugashi
return [self.mecab_tokenize(text) for text in corpus]
elif lang == "ar":
# 阿拉伯语:处理RTL和连写
return [self.normalize_arabic(text) for text in corpus]
return corpus
def get_language_weight(self, lang):
"""语言权重:平衡数据量差异"""
weights = {
"en": 1.0,
"zh": 1.2, # 中文信息密度高
"ja": 1.1,
"ko": 1.1,
"ar": 0.9,
}
return weights.get(lang, 1.0)3.3 字节级Tokenizer的优势
class ByteLevelTokenizer:
"""字节级Tokenizer:多语言通用方案"""
def __init__(self, vocab_size=256):
# 词表大小固定为256(字节数)
self.vocab_size = 256
self.pad_token_id = 0
def encode(self, text: str) -> List[int]:
"""编码:文本 -> 字节 -> token"""
# UTF-8编码转字节
bytes_sequence = text.encode('utf-8')
return list(bytes_sequence)
def decode(self, tokens: List[int]) -> str:
"""解码:token -> 字节 -> 文本"""
bytes_sequence = bytes(tokens)
return bytes_sequence.decode('utf-8', errors='replace')
def train_merges(self, corpus, num_merges=2000):
"""
训练BPE合并规则
这允许用小词表处理任意Unicode字符
"""
# 初始化:每个字节一个token
vocab = {bytes([i]): i for i in range(256)}
# 训练合并规则
self.merges = []
for _ in range(num_merges):
pair_freq = self.count_pair_freq(corpus)
best_pair = max(pair_freq, key=pair_freq.get)
new_id = len(vocab)
vocab[best_pair[0] + best_pair[1]] = new_id
self.merges.append(best_pair)
# 应用合并
corpus = [self.apply_merge(text, best_pair) for text in corpus]
self.vocab = vocab
# 字节级Tokenizer的覆盖率保证
def test_coverage(tokenizer, text):
"""字节级Tokenizer的OOV率为0"""
tokens = tokenizer.encode(text)
# 检查是否有OOV
oov_count = sum(1 for t in tokens if t == tokenizer.unk_token_id)
return {
"total_tokens": len(tokens),
"oov_tokens": oov_count,
"oov_rate": oov_count / len(tokens) if tokens else 0,
"coverage": 1.0 - (oov_count / len(tokens) if tokens else 0)
}4. Tokenizer训练实践
4.1 训练流程
from sentencepiece import SentencePieceTrainer, SentencePieceProcessor
import os
def train_sentencepiece_tokenizer(
corpus_path: str,
model_prefix: str = "tokenizer",
vocab_size: int = 32000,
character_coverage: float = 0.9995,
model_type: str = "unigram", # "unigram", "bpe", "char", "word"
max_sentence_length: int = 16384,
num_threads: int = 8
):
"""
使用SentencePiece训练Tokenizer
Args:
corpus_path: 语料文件路径
model_prefix: 模型前缀
vocab_size: 词表大小
character_coverage: 字符覆盖率(用于处理生僻字)
model_type: 模型类型
"""
# 构建训练命令
train_args = [
f"--input={corpus_path}",
f"--model_prefix={model_prefix}",
f"--vocab_size={vocab_size}",
f"--character_coverage={character_coverage}",
f"--model_type={model_type}",
f"--max_sentence_length={max_sentence_length}",
f"--num_threads={num_threads}",
f"--pad_id=0", # 填充token
f"--unk_id=1", # 未知token
f"--bos_id=2", # 句子开始
f"--eos_id=3", # 句子结束
f"--pad_piece=[PAD]", # 填充表示
f"--unk_piece=[UNK]", # 未知表示
f"--bos_piece=[BOS]", # 开始表示
f"--eos_piece=[EOS]", # 结束表示
# 训练参数
f"--split_by_whitespace=true",
f"--split_by_number=true",
f"--split_by_unicode_script=true",
f"--allow_whitespace_only_pieces=true",
]
# 训练
SentencePieceTrainer.train(" ".join(train_args))
# 加载模型
spm = SentencePieceProcessor()
spm.load(f"{model_prefix}.model")
return spm
def prepare_corpus_for_tokenizer(corpus_dir: str, output_path: str):
"""
准备语料:清洗和格式化
"""
texts = []
for file in os.listdir(corpus_dir):
if not file.endswith(('.txt', '.json', '.jsonl')):
continue
filepath = os.path.join(corpus_dir, file)
with open(filepath, 'r', encoding='utf-8') as f:
if file.endswith('.jsonl'):
for line in f:
data = json.loads(line)
text = data.get('text', data.get('content', ''))
if text:
texts.append(text.strip())
else:
for line in f:
if line.strip():
texts.append(line.strip())
# 写入合并文件
with open(output_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(texts))
return len(texts)4.2 多语言训练示例
# 训练多语言Tokenizer
multilingual_corpus = {
"en": "/data/corpus/en.txt",
"zh": "/data/corpus/zh.txt",
"ja": "/data/corpus/ja.txt",
"ko": "/data/corpus/ko.txt",
}
# 合并语料(可按语言比例采样)
combined_corpus = "/data/corpus/combined.txt"
# 语料混合比例
ratios = {"en": 0.4, "zh": 0.3, "ja": 0.15, "ko": 0.15}
with open(combined_corpus, 'w') as out:
for lang, path in multilingual_corpus.items():
ratio = ratios[lang]
with open(path, 'r') as f:
lines = f.readlines()
# 按比例采样
sample_size = int(len(lines) * ratio)
sampled = random.sample(lines, min(sample_size, len(lines)))
out.writelines(sampled)
# 训练
tokenizer = train_sentencepiece_tokenizer(
corpus_path=combined_corpus,
model_prefix="/models/multilingual_tokenizer",
vocab_size=48000, # 稍大词表容纳多语言
model_type="unigram"
)4.3 训练数据准备
class CorpusPreprocessor:
"""语料预处理"""
def __init__(self, min_length=10, max_length=4096):
self.min_length = min_length
self.max_length = max_length
def preprocess(self, texts: List[str]) -> List[str]:
"""预处理文本"""
cleaned = []
for text in texts:
# 1. 基本清洗
text = self.clean_text(text)
# 2. 长度过滤
if not self.min_length <= len(text) <= self.max_length:
continue
# 3. 语言检测(可选)
if self.is_valid_language(text):
cleaned.append(text)
return cleaned
def clean_text(self, text: str) -> str:
"""清洗文本"""
# 移除控制字符
text = ''.join(ch for ch in text if ch.isprintable() or ch in '\n\t')
# 规范化空白
text = ' '.join(text.split())
# 移除特殊字符(根据需求)
# text = re.sub(r'[^\w\s\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]', '', text)
return text
def deduplicate(self, texts: List[str]) -> List[str]:
"""去重"""
seen = set()
unique = []
for text in texts:
# 使用hash去重
h = hash(text)
if h not in seen:
seen.add(h)
unique.append(text)
return unique5. 特殊Token管理
5.1 特殊Token类型
| Token | ID | 用途 |
|---|---|---|
<pad> | 0 | 序列填充 |
<unk> | 1 | 未知词 |
<bos> | 2 | 句子开始 |
<eos> | 3 | 句子结束 |
<sep> | 4 | 分隔符 |
<mask> | 5 | 掩码(MLM) |
[CLS] | 6 | 分类token |
[SEP] | 7 | BERT分隔符 |
| `< | user | >` |
| `< | assistant | >` |
5.2 特殊Token添加策略
def add_special_tokens(
base_vocab_path: str,
special_tokens: Dict[str, str],
output_path: str
):
"""
向基础词表添加特殊Token
Args:
base_vocab_path: 基础词表路径
special_tokens: 特殊token字典 {name: value}
output_path: 输出路径
"""
# 加载基础词表
with open(base_vocab_path, 'r') as f:
base_vocab = json.load(f)
# 扩展词表
for name, value in special_tokens.items():
if name not in base_vocab:
base_vocab[name] = len(base_vocab)
# 保存
with open(output_path, 'w') as f:
json.dump(base_vocab, f, ensure_ascii=False, indent=2)
return base_vocab
# 示例:为对话模型添加角色token
chat_special_tokens = {
"<|system|>": "系统提示",
"<|user|>": "用户输入",
"<|assistant|>": "助手回复",
"<|endoftext|>": "文本结束",
# 工具调用
"<|tool_call|>": "工具调用开始",
"<|tool_response|>": "工具响应",
"<|function|>": "函数定义",
}
extended_vocab = add_special_tokens(
base_vocab_path="/models/base_tokenizer.json",
special_tokens=chat_special_tokens,
output_path="/models/chat_tokenizer.json"
)6. Tokenizer评估与分析
6.1 评估指标
class TokenizerEvaluator:
"""Tokenizer评估工具"""
def __init__(self, tokenizer):
self.tokenizer = tokenizer
def evaluate(self, test_corpus: List[str]) -> Dict:
"""全面评估Tokenizer"""
return {
"compression_ratio": self.compression_ratio(test_corpus),
"coverage": self.vocabulary_coverage(test_corpus),
"oov_rate": self.oov_rate(test_corpus),
"avg_token_length": self.avg_token_length(test_corpus),
"special_token_ratio": self.special_token_ratio(test_corpus),
"chinese_coverage": self.chinese_coverage(test_corpus),
}
def compression_ratio(self, corpus: List[str]) -> float:
"""
压缩率:原始字符数 / token数
越高表示压缩效果越好
"""
total_chars = sum(len(text) for text in corpus)
total_tokens = sum(len(self.tokenizer.encode(text)) for text in corpus)
return total_chars / total_tokens if total_tokens > 0 else 0
def vocabulary_coverage(self, corpus: List[str]) -> Dict:
"""
词表覆盖率
"""
vocab = set()
total_tokens = 0
for text in corpus:
tokens = self.tokenizer.encode(text)
vocab.update(tokens)
total_tokens += len(tokens)
# 计算每个token的覆盖率
token_freq = Counter()
for text in corpus:
tokens = self.tokenizer.encode(text)
token_freq.update(tokens)
covered_tokens = sum(token_freq.values())
return {
"unique_tokens": len(vocab),
"vocab_size": self.tokenizer.vocab_size(),
"unique_coverage": len(vocab) / self.tokenizer.vocab_size(),
"token_coverage": covered_tokens / total_tokens if total_tokens > 0 else 0
}
def oov_rate(self, corpus: List[str]) -> float:
"""
未登录词率
"""
total_chars = 0
oov_chars = 0
for text in corpus:
# 检查未登录字符
for char in text:
total_chars += 1
if not self.tokenizer.is_in_vocab(char):
oov_chars += 1
return oov_chars / total_chars if total_chars > 0 else 0
def avg_token_length(self, corpus: List[str]) -> float:
"""平均token长度"""
lengths = [len(self.tokenizer.encode(text)) for text in corpus]
return sum(lengths) / len(lengths) if lengths else 06.2 词表分析
def analyze_vocabulary(tokenizer, corpus: List[str]):
"""分析词表结构"""
# 统计token类型
token_stats = {
"single_char": 0,
"word": 0,
"subword": 0,
"special": 0
}
# token频率统计
freq_dist = Counter()
for text in corpus:
tokens = tokenizer.encode(text)
freq_dist.update(tokens)
# 分类统计
for token_id in range(tokenizer.vocab_size()):
token = tokenizer.id_to_token(token_id)
if tokenizer.is_special(token):
token_stats["special"] += 1
elif len(token) == 1:
token_stats["single_char"] += 1
elif token.startswith("▁"): # SentencePiece标记
token_stats["subword"] += 1
else:
token_stats["word"] += 1
return {
"stats": token_stats,
"top_20_tokens": tokenizer.decode(list(range(20))),
"bottom_20_tokens": [tokenizer.id_to_token(i) for i in range(tokenizer.vocab_size()-20, tokenizer.vocab_size())]
}7. 子词vs字符级Tokenization权衡
7.1 不同粒度的比较
# 不同Tokenization粒度的比较
comparison = {
"char_level": {
"vocab_size": 256,
"seq_length_multiplier": 4, # vs词级
"oov_rate": 0,
"semantic_preservation": "low",
"training_data_efficiency": "low",
},
"subword_bpe": {
"vocab_size": 30000,
"seq_length_multiplier": 1.5,
"oov_rate": 0.02,
"semantic_preservation": "high",
"training_data_efficiency": "high",
},
"word_level": {
"vocab_size": 100000,
"seq_length_multiplier": 1.0,
"oov_rate": 0.15,
"semantic_preservation": "very_high",
"training_data_efficiency": "medium",
}
}7.2 选择建议
| 场景 | 推荐粒度 | 理由 |
|---|---|---|
| 低资源语言 | 字节级 | 保证零OOV |
| 多语言统一 | 子词级 | 跨语言共享子词 |
| 代码模型 | 子词+字节混合 | 处理特殊符号 |
| 专用领域 | 词+子词 | 保留专业术语 |
| 中文为主 | 字符+词混合 | 平衡效率与语义 |
8. 实践指南
8.1 从头训练Tokenizer
# 使用SentencePiece训练
spm_train \
--input=corpus.txt \
--model_prefix=my_tokenizer \
--vocab_size=32000 \
--character_coverage=0.9995 \
--model_type=unigram \
--pad_id=0 \
--unk_id=1 \
--bos_id=2 \
--eos_id=3 \
--add_dummy_prefix=false \
--split_by_whitespace=true \
--split_by_number=true8.2 加载和使用Tokenizer
from transformers import AutoTokenizer
# 加载HuggingFace格式的Tokenizer
tokenizer = AutoTokenizer.from_pretrained("/path/to/tokenizer")
# 编码
text = "Hello, world!"
tokens = tokenizer.encode(text, return_tensors="pt")
# 解码
decoded = tokenizer.decode(tokens[0])
# 批量编码
batch = ["Hello", "World", "你好"]
encoded = tokenizer(batch, padding=True, truncation=True, return_tensors="pt")8.3 检查Tokenizer质量
def quality_check(tokenizer, test_texts):
"""Tokenizer质量检查"""
issues = []
for text in test_texts:
# 1. 往返一致性
tokens = tokenizer.encode(text)
decoded = tokenizer.decode(tokens)
if decoded.strip() != text.strip():
issues.append(f"Round-trip mismatch: {text} -> {decoded}")
# 2. 特殊字符处理
if any(ord(c) > 0xFFFF for c in text):
if not tokenizer.can_encode_special_chars():
issues.append(f"Special chars not handled: {text}")
# 3. 长度异常
if len(tokens) / len(text) > 3: # 压缩率过低
issues.append(f"Poor compression: {len(text)} chars -> {len(tokens)} tokens")
return {
"issues": issues,
"pass": len(issues) == 0,
"quality_score": 1.0 - len(issues) / len(test_texts)
}9. 总结
Tokenizer设计是LLM训练的关键环节,需要综合考虑:
- 词表大小:平衡模型参数、序列长度和覆盖率
- 分词粒度:根据语言特性和任务需求选择
- 多语言支持:统一词表 vs 语言特定词表
- 特殊Token:支持模型架构和业务需求
- 训练数据:数据质量和分布影响词表质量