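Introduction

The tokenizer is the component that converts raw text into the token sequences a model can process. Its design directly affects model quality, training efficiency, and inference speed. This document gives a systematic overview of how tokenizers work, how they are trained, and the design strategies behind them.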

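The core role of the tokenizer

Raw text → Tokenizer → Token sequence → LLM → Output sequence → Tokenizer → Output text
"Hello world"          [1234, 5678]           [1234, 5678]                  "Hello world"

A tokenizer is more than a mapping from text to integers. It also determines:

  • Vocabulary size: affects the parameter count through the size of the embedding matrix (the embedding dimension itself is a model hyperparameter)
  • Subword segmentation: affects how well out-of-vocabulary words are handled
  • Encoding efficiency: affects sequence length and compute cost
  • Multilingual support: affects cross-lingual transfer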

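1. Comparison of tokenizer types

1.1 Common tokenizer types

| Type | Characteristics | Strengths | Weaknesses | Representative models |
|---|---|---|---|---|
| Byte-level BPE | BPE merges over UTF-8 bytes | Most general; no OOV | Longer sequences | GPT-2, RoBERTa |
| WordPiece | Likelihood-based subword merges | Suits multilingual text | More complex to train | BERT, DistilBERT |
| Unigram | Probabilistic (language-model) segmentation | Flexible; supports multiple languages | Higher encoding cost | T5, ALBERT |
| SentencePiece | Unified framework (a library rather than an algorithm) | Supports BPE/Unigram/Char | Requires training a separate model | LLaMA 1/2, T5; common for Japanese and Chinese models |
| TokenLLM | Semantics-aware tokens | Preserves semantic boundaries | High training cost | Recent research |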

1.2 Byte-Pair Encoding (BPE)

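BPE is a frequency-based greedy algorithm. It began as a data-compression technique and was later adopted in NLP for subword segmentation.

How the algorithm works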

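from collections import Counter
from typing import Dict, List, Tuple


def merge_pair(tokens: List[str], pair: Tuple[str, str]) -> List[str]:
    """Replace every adjacent occurrence of `pair` with the merged token."""
    merged, i = [], 0
    while i < len(tokens):
        if i + 1 < len(tokens) and (tokens[i], tokens[i + 1]) == pair:
            merged.append(tokens[i] + tokens[i + 1])
            i += 2
        else:
            merged.append(tokens[i])
            i += 1
    return merged


def train_bpe(corpus: List[str], vocab_size: int) -> Dict[str, int]:
    """
    Train a character-level BPE tokenizer.

    Args:
        corpus: raw texts
        vocab_size: target vocabulary size

    Returns:
        Vocabulary: token -> id. Merged tokens receive ids in merge order.
    """
    # 1. Initialise: reserve <unk>, then add every character as a token
    vocab = {"<unk>": 0}
    for text in corpus:
        for char in text:
            if char not in vocab:
                vocab[char] = len(vocab)

    # Keep the corpus as token sequences so merges can be applied to it
    sequences = [list(text) for text in corpus]

    # 2. Repeatedly merge the most frequent adjacent pair
    while len(vocab) < vocab_size:
        pair_freq = Counter()
        for tokens in sequences:
            pair_freq.update(zip(tokens, tokens[1:]))

        # 3. Stop when no pair is left to merge
        if not pair_freq:
            break
        best_pair = max(pair_freq, key=pair_freq.get)

        # 4. Add the merged token to the vocabulary
        vocab.setdefault(best_pair[0] + best_pair[1], len(vocab))

        # 5. Apply the merge to the whole corpus
        sequences = [merge_pair(tokens, best_pair) for tokens in sequences]

    return vocab


def tokenize_bpe(text: str, vocab: Dict[str, int]) -> List[int]:
    """Tokenize with a BPE vocabulary by replaying merges in training order."""
    tokens = list(text)  # start from single characters

    while len(tokens) > 1:
        # A lower id means an earlier merge, so the lowest-id candidate goes first
        candidates = [
            (vocab[tokens[i] + tokens[i + 1]], i)
            for i in range(len(tokens) - 1)
            if tokens[i] + tokens[i + 1] in vocab
        ]
        if not candidates:
            break
        _, pos = min(candidates)
        tokens = tokens[:pos] + [tokens[pos] + tokens[pos + 1]] + tokens[pos + 2:]

    unk_id = vocab["<unk>"]
    return [vocab.get(t, unk_id) for t in tokens]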
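
To make the merge loop concrete, the sketch below runs three BPE merges on a toy word-frequency table. It is a minimal illustration under stated assumptions: the helper name `learn_merges`, the toy counts, and the choice to pre-split text into words (the classic formulation) are all illustrative and not part of the listing above.

```python
from collections import Counter


def learn_merges(word_freqs, num_merges):
    """Learn BPE merges from a {word: count} table (hypothetical helper)."""
    # Every word starts as a tuple of single-character symbols.
    words = {tuple(word): count for word, count in word_freqs.items()}
    merges = []
    for _ in range(num_merges):
        # Count adjacent symbol pairs, weighted by word frequency.
        pair_freq = Counter()
        for symbols, count in words.items():
            for pair in zip(symbols, symbols[1:]):
                pair_freq[pair] += count
        if not pair_freq:
            break
        # Ties are resolved in favour of the pair that was counted first.
        best = max(pair_freq, key=pair_freq.get)
        merges.append(best)
        # Rewrite every word with the chosen pair fused into one symbol.
        new_words = {}
        for symbols, count in words.items():
            out, i = [], 0
            while i < len(symbols):
                if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == best:
                    out.append(symbols[i] + symbols[i + 1])
                    i += 2
                else:
                    out.append(symbols[i])
                    i += 1
            key = tuple(out)
            new_words[key] = new_words.get(key, 0) + count
        words = new_words
    return merges, words


toy = {"low": 5, "lower": 2, "newest": 6, "widest": 3}
merges, segmented = learn_merges(toy, num_merges=3)
print(merges)     # [('e', 's'), ('es', 't'), ('l', 'o')]
print(segmented)  # 'newest' is now ('n', 'e', 'w', 'est')
```

The suffix "est" is learned before any whole word because it is shared by "newest" and "widest", which is how BPE ends up with reusable subword units.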

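Strengths and weaknesses of BPE

| Strengths | Weaknesses |
|---|---|
| Simple and efficient algorithm | Greedy merging may not be globally optimal |
| Handles out-of-vocabulary words effectively | Training time grows with vocabulary size |
| Sits between word-level and character-level tokenization | Limited awareness of semantic boundaries |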

1.3 WordPiece

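WordPiece was developed at Google, originally for Japanese and Korean voice search, and was later adopted by BERT. The core idea is similar to BPE, but the merge criterion differs: instead of merging the most frequent pair, it merges the pair that most increases the likelihood of the training data.

How the algorithm works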

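import math
from collections import Counter
from typing import Dict, List, Tuple


def wordpiece_score(pair: Tuple[str, str], token_freq: Counter, pair_freq: Counter) -> float:
    """
    WordPiece merge score, computed in log space.

    score = freq(A, B) / (freq(A) * freq(B))
    Up to a constant, this is the pointwise mutual information of A and B.
    """
    a, b = pair
    f_a, f_b, f_ab = token_freq[a], token_freq[b], pair_freq[pair]

    # log(0) is undefined, so pairs with a missing count can never win
    if f_a == 0 or f_b == 0 or f_ab == 0:
        return float('-inf')

    return math.log(f_ab) - math.log(f_a) - math.log(f_b)


def _merge(tokens: List[str], pair: Tuple[str, str]) -> List[str]:
    """Replace every adjacent occurrence of `pair` with the merged token."""
    out, i = [], 0
    while i < len(tokens):
        if i + 1 < len(tokens) and (tokens[i], tokens[i + 1]) == pair:
            out.append(tokens[i] + tokens[i + 1])
            i += 2
        else:
            out.append(tokens[i])
            i += 1
    return out


def train_wordpiece(corpus: List[str], vocab_size: int) -> Dict[str, int]:
    """Train a simplified WordPiece vocabulary (no "##" continuation prefix)."""
    # Initialise the vocabulary with the characters seen in the corpus
    sequences = [list(text) for text in corpus]
    vocab: Dict[str, int] = {}
    for tokens in sequences:
        for token in tokens:
            vocab.setdefault(token, len(vocab))

    # Merge iteratively
    while len(vocab) < vocab_size:
        # Recount tokens and adjacent pairs under the current segmentation
        token_freq = Counter(t for tokens in sequences for t in tokens)
        pair_freq = Counter(p for tokens in sequences for p in zip(tokens, tokens[1:]))

        # Score every pair that occurs at least twice
        candidates = [pair for pair, freq in pair_freq.items() if freq >= 2]
        if not candidates:
            break
        best_pair = max(candidates, key=lambda p: wordpiece_score(p, token_freq, pair_freq))

        # Add the merged token and apply the merge to the corpus
        vocab.setdefault(best_pair[0] + best_pair[1], len(vocab))
        sequences = [_merge(tokens, best_pair) for tokens in sequences]

    return vocab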
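
The difference between the two merge criteria is easiest to see on counts where they disagree. The sketch below is a toy comparison, not a full trainer: the word table and the helper name `pair_stats` are assumptions made for illustration.

```python
import math
from collections import Counter


def pair_stats(words):
    """Count symbols and adjacent symbol pairs, weighted by word frequency."""
    token_freq, pair_freq = Counter(), Counter()
    for symbols, count in words.items():
        for symbol in symbols:
            token_freq[symbol] += count
        for pair in zip(symbols, symbols[1:]):
            pair_freq[pair] += count
    return token_freq, pair_freq


# Toy corpus: "the"/"then" are frequent, "quiz" is rare but its letters
# never appear anywhere else.
words = {tuple("the"): 50, tuple("then"): 20, tuple("quiz"): 4}
token_freq, pair_freq = pair_stats(words)

# BPE criterion: raw pair frequency.
bpe_choice = max(pair_freq, key=pair_freq.get)


# WordPiece criterion: freq(A, B) / (freq(A) * freq(B)), in log space.
def wp_score(pair):
    a, b = pair
    return math.log(pair_freq[pair]) - math.log(token_freq[a]) - math.log(token_freq[b])


wp_choice = max(pair_freq, key=wp_score)

print(bpe_choice)  # ('t', 'h'): the most frequent pair
print(wp_choice)   # ('q', 'u'): rare, but 'q' and 'u' only ever occur together
```

BPE rewards sheer frequency, while the WordPiece score rewards pairs whose parts rarely occur apart, even when the pair itself is uncommon.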

1.4 Unigram Language Model

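The Unigram model is the default model type in SentencePiece. It segments text with a unigram language model: every token has a probability, and the preferred segmentation is the one whose token probabilities have the highest product.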

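import math
from collections import Counter
from typing import Dict, List


class UnigramTokenizer:
    """Simplified Unigram language-model tokenizer.

    This sketch uses hard EM: counts come from the single best (Viterbi)
    segmentation rather than from forward-backward expectations.
    """

    def __init__(self, corpus: List[str], vocab_size: int, max_token_len: int = 8):
        self.corpus = corpus
        self.vocab_size = vocab_size
        self.max_token_len = max_token_len
        self.token_freqs: Dict[str, float] = {}
        self.token_probs: Dict[str, float] = {}
        self.unk_logp = math.log(1e-10)  # penalty for characters never seen in training

    def train(self, max_iterations: int = 10, keep_ratio: float = 0.75):
        """Train by alternating segmentation, pruning and re-estimation."""
        # 1. Seed vocabulary: every substring up to max_token_len
        self.token_freqs = self._init_seed_vocab()
        self._estimate_parameters()

        for _ in range(max_iterations):
            # 2. E-step (hard): segment the corpus with Viterbi and count tokens
            counts = Counter()
            for text in self.corpus:
                counts.update(self._viterbi_segment(text))

            # 3. M-step: prune rarely used tokens, then re-estimate probabilities
            self._update_vocab(counts, keep_ratio)
            self._estimate_parameters()

            # 4. Stop once the target size is reached
            if len(self.token_freqs) <= self.vocab_size:
                break

    def _init_seed_vocab(self) -> Dict[str, float]:
        """Count every substring of length 1..max_token_len."""
        counts = Counter()
        for text in self.corpus:
            for i in range(len(text)):
                for j in range(i + 1, min(len(text), i + self.max_token_len) + 1):
                    counts[text[i:j]] += 1
        return dict(counts)

    def _update_vocab(self, counts: Counter, keep_ratio: float):
        """Keep all single characters plus the most used multi-character tokens."""
        chars = {t for t in self.token_freqs if len(t) == 1}
        multi = sorted((t for t in self.token_freqs if len(t) > 1),
                       key=lambda t: counts[t], reverse=True)
        keep = max(self.vocab_size - len(chars), int(len(multi) * keep_ratio))
        kept = chars | set(multi[:keep])
        # Add-one smoothing keeps unused single characters at non-zero probability
        self.token_freqs = {t: counts[t] + 1 for t in kept}

    def _viterbi_segment(self, text: str) -> List[str]:
        """Viterbi segmentation: find the most probable segmentation."""
        n = len(text)

        # dp[i] = best log-probability of any segmentation of text[i:]
        dp = [float('-inf')] * (n + 1)
        dp[n] = 0.0
        backptr = [1] * n

        for i in range(n - 1, -1, -1):
            for length in range(1, min(self.max_token_len, n - i) + 1):
                token = text[i:i + length]
                if token in self.token_probs:
                    logp = math.log(self.token_probs[token])
                elif length == 1:
                    logp = self.unk_logp  # unseen character: fall back to a single character
                else:
                    continue

                score = logp + dp[i + length]
                if score > dp[i]:
                    dp[i], backptr[i] = score, length

        # Follow the back-pointers from the start of the text
        tokens, pos = [], 0
        while pos < n:
            tokens.append(text[pos:pos + backptr[pos]])
            pos += backptr[pos]
        return tokens

    def _estimate_parameters(self):
        """Estimate token probabilities from the current counts."""
        total = sum(self.token_freqs.values())
        self.token_probs = {
            token: freq / total
            for token, freq in self.token_freqs.items()
        }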
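
Viterbi keeps only the single best segmentation, whereas the expectation step of full Unigram training weights every possible segmentation by its probability. As a hedged illustration of that idea, the sketch below computes the total probability of all segmentations with the forward recursion; the function name `log_marginal` and the toy probabilities are assumptions.

```python
import math


def log_marginal(text, token_logp, max_token_len=8):
    """Log of the summed probability of every segmentation of `text`."""
    n = len(text)
    # alpha[i] = log-probability mass of all segmentations of text[:i]
    alpha = [float("-inf")] * (n + 1)
    alpha[0] = 0.0
    for end in range(1, n + 1):
        terms = []
        for start in range(max(0, end - max_token_len), end):
            piece = text[start:end]
            if piece in token_logp and alpha[start] > float("-inf"):
                terms.append(alpha[start] + token_logp[piece])
        if terms:
            # log-sum-exp keeps the sum numerically stable
            peak = max(terms)
            alpha[end] = peak + math.log(sum(math.exp(t - peak) for t in terms))
    return alpha[n]


probs = {"a": 0.2, "b": 0.2, "ab": 0.6}
token_logp = {token: math.log(p) for token, p in probs.items()}

# "ab" can be read as ["a", "b"] (0.2 * 0.2) or as ["ab"] (0.6).
total = math.exp(log_marginal("ab", token_logp))
print(round(total, 6))  # 0.64
```

Working in log space matters here for the same reason as in Viterbi: products of many small probabilities underflow quickly on real text.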

2. The effect of vocabulary size

2.1 The vocabulary-size trade-off

Vocabulary size is one of the core hyperparameters of tokenizer design:

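Vocabulary size
      │
      │   ┌──────────────────────────────────────────────┐
32768 ├───┤ Vocabulary too large:                        │
      │   │  - shorter sequences                         │
      │   │  - sparsely updated embeddings               │
      │   │  - few training examples per token           │
      │   └──────────────────────────────────────────────┘
      │
      │   ┌──────────────────────────────────────────────┐
 4096 ├───┤ Moderate vocabulary:                         │
      │   │  - balances sequence length and coverage     │
      │   │  - common words get a token of their own     │
      │   │  - unseen words fall back to subwords        │
      │   └──────────────────────────────────────────────┘
      │
      │   ┌──────────────────────────────────────────────┐
 1024 ├───┤ Vocabulary too small:                        │
      │   │  - very long sequences                       │
      │   │  - low encoding efficiency                   │
      │   │  - higher inference cost                     │
      │   └──────────────────────────────────────────────┘
      │
      └──────────────────────────────────────────────────► Model parameters

The sizes on the axis are illustrative: the point at which a vocabulary becomes too large depends on the amount of training data and on the size of the model.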
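
The link between encoding efficiency and compute cost can be sketched numerically. The example assumes that self-attention cost grows with the square of the sequence length; the characters-per-token figures and the function names are illustrative assumptions, not measurements.

```python
import math


def sequence_length(num_chars: int, chars_per_token: float) -> int:
    """Tokens needed for a text, given an average compression rate."""
    return math.ceil(num_chars / chars_per_token)


def relative_attention_cost(len_a: int, len_b: int) -> float:
    """Cost of sequence A relative to B under quadratic self-attention."""
    return (len_a / len_b) ** 2


# Assumed compression rates: a small vocabulary averages 2 characters
# per token, a larger one averages 4.
small_vocab_len = sequence_length(4000, chars_per_token=2.0)
large_vocab_len = sequence_length(4000, chars_per_token=4.0)
ratio = relative_attention_cost(small_vocab_len, large_vocab_len)

print(small_vocab_len, large_vocab_len, ratio)  # 2000 1000 4.0
```

Halving the compression rate doubles the sequence length but quadruples the attention cost, which is why a vocabulary that is too small is costly at inference time.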

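2.2 Vocabulary size and model performance

| Vocabulary size | Typical use | Notes |
|---|---|---|
| 1K-4K | Character-level / small vocabularies | Long sequences, but strong generalization |
| 8K-16K | Classic models | A balanced choice |
| 32K-64K | Modern LLMs | GPT-3 (about 50K), LLaMA (32K) |
| 100K+ | Multilingual / multimodal | ChatGPT (cl100k_base, about 100K), LLaMA 3 (128K) |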

# Relationship between vocabulary size and parameter count
def calculate_embedding_params(vocab_size, embedding_dim):
    """Number of parameters in the input embedding layer."""
    return vocab_size * embedding_dim

# Embedding parameters for a few published configurations
# (an untied output projection adds the same amount again)
configs = [
    (32000, 4096, "LLaMA-7B"),
    (32000, 5120, "LLaMA-13B"),
    (50257, 12288, "GPT-3 175B"),
]

for vocab_size, dim, model in configs:
    params = calculate_embedding_params(vocab_size, dim)
    print(f"{model}: vocab={vocab_size}, embedding params={params:,}")

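2.3 Vocabulary design principles

from typing import Callable, Dict, List, Tuple


def design_vocabulary(
    corpus: List[str],
    train_tokenizer: Callable[[List[str], int], Dict[str, int]],
    calculate_coverage: Callable[[List[str], List[str]], float],
    target_size: int = 32000,
) -> Tuple[List[str], float]:
    """Vocabulary design workflow.

    `train_tokenizer(corpus, vocab_size)` and `calculate_coverage(corpus, vocab)`
    are supplied by the caller (for example a BPE or Unigram trainer).
    """
    # 1. Analyse the corpus distribution first (language mix, scripts,
    #    text lengths); that step is project-specific and omitted here

    # 2. Decide on the special tokens
    special_tokens = [
        "<pad>",      # padding
        "<s>",        # start of sentence
        "</s>",       # end of sentence
        "<unk>",      # unknown token
        "<mask>",     # mask
        # application-specific special tokens
        "[USER]",     # user turn
        "[BOT]",      # bot reply
    ]

    # 3. Reserve room for them
    effective_target = target_size - len(special_tokens)

    # 4. Train BPE/Unigram
    base_vocab = train_tokenizer(corpus, effective_target)

    # 5. Merge: special tokens come first so their ids stay stable;
    #    duplicates from the trained vocabulary are skipped
    full_vocab = special_tokens + [t for t in base_vocab if t not in special_tokens]

    # 6. Validate coverage
    coverage = calculate_coverage(corpus, full_vocab)

    return full_vocab, coverage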
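
The coverage check in the last step is not specified further, so the following is one possible reading, offered as a sketch: coverage as the share of character occurrences in the corpus that exist as single-character tokens. This `calculate_coverage` is a hypothetical implementation written for illustration.

```python
from collections import Counter
from typing import Iterable, List


def calculate_coverage(corpus: List[str], vocab: Iterable[str]) -> float:
    """Share of character occurrences that exist as single-character tokens."""
    single_chars = {token for token in vocab if len(token) == 1}
    char_counts = Counter(ch for text in corpus for ch in text)
    total = sum(char_counts.values())
    if total == 0:
        return 1.0  # an empty corpus is trivially covered
    covered = sum(n for ch, n in char_counts.items() if ch in single_chars)
    return covered / total


# 'd' is missing from the vocabulary, so 1 of 7 characters is uncovered.
coverage = calculate_coverage(["abca", "abd"], ["a", "b", "c", "ab"])
print(round(coverage, 4))  # 0.8571
```

Character-level coverage is a lower bound on what the tokenizer can represent without `<unk>`: a character with no token of its own can still be covered if it only ever appears inside a longer token.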

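3. Multilingual tokenizer design

3.1 Multilingual challenges

| Language | Characteristics | Challenge |
|---|---|---|
| English | Space-delimited | Relatively simple |
| Chinese | No spaces | Requires word segmentation |
| Japanese | No spaces, several scripts | The hardest case |
| Arabic | Right-to-left | Bidirectional text |
| Thai | No spaces | Segmentation ambiguity |
| German | Compound words | Very long words |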

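3.2 A unified-vocabulary strategy

import random
import unicodedata
from typing import Callable, Dict, List


class MultilingualTokenizer:
    """Unified multilingual tokenizer."""

    def __init__(self, trainer: Callable[[List[str], int], Dict[str, int]],
                 vocab_size=32000, seed=0):
        # `trainer(corpus, vocab_size)` returns token -> id, e.g. a BPE trainer
        self.trainer = trainer
        self.vocab_size = vocab_size
        self.rng = random.Random(seed)
        self.vocab: Dict[str, int] = {}
        self.lang_specific_rules = {}

    def train(self, corpora: Dict[str, List[str]]):
        """
        Train a multilingual tokenizer.

        Args:
            corpora: {"en": [...], "zh": [...], "ja": [...]}
        """
        # 1. Preprocess each language
        processed_corpora = {
            lang: self.preprocess_language(corpus, lang)
            for lang, corpus in corpora.items()
        }

        # 2. Merge the corpora with per-language weights.
        #    A weight of 1.2 means one full copy plus a random 20% sample.
        combined = []
        for lang, corpus in processed_corpora.items():
            whole, frac = divmod(self.get_language_weight(lang), 1.0)
            combined.extend(corpus * int(whole))
            combined.extend(self.rng.sample(corpus, int(len(corpus) * frac)))

        # 3. Train one shared vocabulary
        self.vocab = self.trainer(combined, self.vocab_size)

        # 4. Store language-specific rules
        self.lang_specific_rules = {
            lang: self.extract_lang_rules(corpus)
            for lang, corpus in corpora.items()
        }

    def extract_lang_rules(self, corpus):
        """Placeholder hook: which rules to extract is project-specific."""
        return {}

    def preprocess_language(self, corpus, lang):
        """Language-specific preprocessing (apply the same steps at inference time)."""
        if lang == "zh":
            # Chinese: pre-segment with jieba
            import jieba
            return [" ".join(jieba.cut(text)) for text in corpus]

        elif lang == "ja":
            # Japanese: segment with MeCab through the fugashi wrapper
            # (needs an installed dictionary such as unidic-lite)
            import fugashi
            tagger = fugashi.Tagger("-Owakati")
            return [tagger.parse(text) for text in corpus]

        elif lang == "ar":
            # Arabic: fold presentation-form ligatures (NFKC) and drop the tatweel
            return [unicodedata.normalize("NFKC", text).replace("\u0640", "")
                    for text in corpus]

        return corpus

    def get_language_weight(self, lang):
        """Language weights that compensate for differences in data volume (illustrative values)."""
        weights = {
            "en": 1.0,
            "zh": 1.2,   # Chinese carries more information per character
            "ja": 1.1,
            "ko": 1.1,
            "ar": 0.9,
        }
        return weights.get(lang, 1.0)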
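
Hand-picked weights are one option. A common alternative heuristic derives sampling probabilities from corpus sizes with an exponent below 1, which boosts low-resource languages without making the mix uniform. The sketch below illustrates this; the function name, the line counts, and `alpha=0.5` are assumptions rather than recommended values.

```python
def smoothed_language_probs(line_counts, alpha=0.5):
    """Sampling probability per language: p ∝ n ** alpha.

    alpha=1 keeps the raw proportions; alpha=0 samples languages uniformly.
    """
    scaled = {lang: n ** alpha for lang, n in line_counts.items()}
    total = sum(scaled.values())
    return {lang: s / total for lang, s in scaled.items()}


counts = {"en": 900_000, "zh": 90_000, "ko": 10_000}
probs = smoothed_language_probs(counts, alpha=0.5)

for lang, p in probs.items():
    raw = counts[lang] / sum(counts.values())
    print(f"{lang}: raw share {raw:.3f} -> sampling probability {p:.3f}")
```

With these counts Korean makes up 1% of the raw data but is sampled about 7% of the time, so its characters and subwords get a fairer chance of earning merges in a shared vocabulary.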

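3.3 Advantages of byte-level tokenizers

from collections import Counter
from typing import List, Tuple


class ByteLevelTokenizer:
    """Byte-level tokenizer: a general-purpose multilingual baseline."""

    def __init__(self):
        # The base vocabulary is always the 256 byte values; train_merges()
        # adds tokens on top. No pad/unk ids are reserved here, because
        # id 0 is already the NUL byte.
        self.id_to_bytes: List[bytes] = [bytes([i]) for i in range(256)]
        self.merges: List[Tuple[int, int]] = []  # (left_id, right_id) in training order
        self.unk_token_id = None  # never needed: every byte is in the vocabulary

    @staticmethod
    def _apply_merge(ids: List[int], pair: Tuple[int, int], new_id: int) -> List[int]:
        """Replace every adjacent occurrence of `pair` with `new_id`."""
        out, i = [], 0
        while i < len(ids):
            if i + 1 < len(ids) and (ids[i], ids[i + 1]) == pair:
                out.append(new_id)
                i += 2
            else:
                out.append(ids[i])
                i += 1
        return out

    def encode(self, text: str) -> List[int]:
        """Encode: text -> UTF-8 bytes -> tokens."""
        ids = list(text.encode('utf-8'))
        # Replay the learned merges in training order
        for offset, pair in enumerate(self.merges):
            ids = self._apply_merge(ids, pair, 256 + offset)
        return ids

    def decode(self, tokens: List[int]) -> str:
        """Decode: tokens -> bytes -> text."""
        bytes_sequence = b''.join(self.id_to_bytes[t] for t in tokens)
        return bytes_sequence.decode('utf-8', errors='replace')

    def train_merges(self, corpus: List[str], num_merges=2000):
        """
        Learn BPE merge rules on top of the byte vocabulary.
        A small vocabulary can then represent any Unicode text.
        """
        sequences = [list(text.encode('utf-8')) for text in corpus]

        for _ in range(num_merges):
            pair_freq = Counter(p for ids in sequences for p in zip(ids, ids[1:]))
            if not pair_freq:
                break
            best_pair = max(pair_freq, key=pair_freq.get)

            new_id = len(self.id_to_bytes)
            self.id_to_bytes.append(self.id_to_bytes[best_pair[0]] + self.id_to_bytes[best_pair[1]])
            self.merges.append(best_pair)

            # Apply the merge to the training sequences
            sequences = [self._apply_merge(ids, best_pair, new_id) for ids in sequences]


# Coverage guarantee of a byte-level tokenizer
def test_coverage(tokenizer, text):
    """The OOV rate of a byte-level tokenizer is 0."""
    tokens = tokenizer.encode(text)

    # Count unknown tokens; a tokenizer without an unk id cannot produce any
    unk_id = getattr(tokenizer, "unk_token_id", None)
    oov_count = 0 if unk_id is None else sum(1 for t in tokens if t == unk_id)
    oov_rate = oov_count / len(tokens) if tokens else 0.0

    return {
        "total_tokens": len(tokens),
        "oov_tokens": oov_count,
        "oov_rate": oov_rate,
        "coverage": 1.0 - oov_rate,
    }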
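
The price of byte-level coverage is sequence length, and that price is not the same for every script. The short sketch below measures UTF-8 bytes per character for a few sample strings; the samples are arbitrary and chosen only for illustration.

```python
def bytes_per_char(text: str) -> float:
    """Average number of UTF-8 bytes per Unicode character."""
    return len(text.encode("utf-8")) / len(text)


samples = {
    "English": "tokenizer",
    "Arabic": "لغة",
    "Chinese": "分词器",
    "Emoji": "🙂",
}

for name, text in samples.items():
    print(f"{name}: {bytes_per_char(text):.1f} bytes per character")
```

Before any merges are learned, a Chinese sentence therefore costs about three times as many tokens per character as an English one. Learned merges recover much of that gap, provided the training corpus contains enough text in the script.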

4. Tokenizer training in practice

4.1 Training workflow

import json
import os

from sentencepiece import SentencePieceTrainer, SentencePieceProcessor


def train_sentencepiece_tokenizer(
    corpus_path: str,
    model_prefix: str = "tokenizer",
    vocab_size: int = 32000,
    character_coverage: float = 0.9995,
    model_type: str = "unigram",  # "unigram", "bpe", "char", "word"
    max_sentence_length: int = 16384,
    num_threads: int = 8
):
    """
    Train a tokenizer with SentencePiece.

    Args:
        corpus_path: path to the corpus file (one sentence per line)
        model_prefix: output prefix; writes <prefix>.model and <prefix>.vocab
        vocab_size: vocabulary size
        character_coverage: share of characters the model must cover
            (rarer characters fall outside the vocabulary)
        model_type: model type
        max_sentence_length: maximum sentence length in bytes
        num_threads: number of training threads
    """

    # Build the training arguments
    # (paths must not contain spaces, because the arguments are joined by spaces)
    train_args = [
        f"--input={corpus_path}",
        f"--model_prefix={model_prefix}",
        f"--vocab_size={vocab_size}",
        f"--character_coverage={character_coverage}",
        f"--model_type={model_type}",
        f"--max_sentence_length={max_sentence_length}",
        f"--num_threads={num_threads}",
        "--pad_id=0",           # padding token
        "--unk_id=1",           # unknown token
        "--bos_id=2",           # start of sentence
        "--eos_id=3",           # end of sentence
        "--pad_piece=[PAD]",    # surface form of the padding token
        "--unk_piece=[UNK]",    # surface form of the unknown token
        "--bos_piece=[BOS]",    # surface form of the start token
        "--eos_piece=[EOS]",    # surface form of the end token
        # Segmentation options
        "--split_by_whitespace=true",
        "--split_by_number=true",
        "--split_by_unicode_script=true",
        "--allow_whitespace_only_pieces=true",
    ]

    # Train
    SentencePieceTrainer.train(" ".join(train_args))

    # Load the trained model
    spm = SentencePieceProcessor()
    spm.load(f"{model_prefix}.model")

    return spm


def prepare_corpus_for_tokenizer(corpus_dir: str, output_path: str):
    """
    Prepare the corpus: clean it and write one text per line.
    """
    count = 0

    with open(output_path, 'w', encoding='utf-8') as out:
        for file in sorted(os.listdir(corpus_dir)):
            # A whole-file .json document has no line structure,
            # so only .txt and .jsonl files are read
            if not file.endswith(('.txt', '.jsonl')):
                continue

            filepath = os.path.join(corpus_dir, file)

            with open(filepath, 'r', encoding='utf-8') as f:
                for line in f:
                    if file.endswith('.jsonl'):
                        try:
                            data = json.loads(line)
                        except json.JSONDecodeError:
                            continue  # skip blank or malformed records
                        if not isinstance(data, dict):
                            continue
                        text = data.get('text', data.get('content', ''))
                    else:
                        text = line

                    # Each output line is one training sentence,
                    # so newlines inside a record are replaced by spaces
                    text = ' '.join(str(text).splitlines()).strip()
                    if text:
                        out.write(text + '\n')
                        count += 1

    return count

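4.2 Multilingual training example

import random

# Train a multilingual tokenizer
multilingual_corpus = {
    "en": "/data/corpus/en.txt",
    "zh": "/data/corpus/zh.txt",
    "ja": "/data/corpus/ja.txt",
    "ko": "/data/corpus/ko.txt",
}

# Merged corpus (sampled according to the language ratios)
combined_corpus = "/data/corpus/combined.txt"

# Target share of each language in the merged corpus
ratios = {"en": 0.4, "zh": 0.3, "ja": 0.15, "ko": 0.15}
total_lines = 1_000_000  # illustrative target size of the merged corpus

with open(combined_corpus, 'w', encoding='utf-8') as out:
    for lang, path in multilingual_corpus.items():
        with open(path, 'r', encoding='utf-8') as f:
            lines = [line.rstrip('\n') for line in f if line.strip()]
        # Sample against the shared target so the ratios describe the final mix;
        # a language with too few lines contributes all of them
        sample_size = min(int(total_lines * ratios[lang]), len(lines))
        for line in random.sample(lines, sample_size):
            out.write(line + '\n')

# Train
tokenizer = train_sentencepiece_tokenizer(
    corpus_path=combined_corpus,
    model_prefix="/models/multilingual_tokenizer",
    vocab_size=48000,  # a slightly larger vocabulary to accommodate several languages
    model_type="unigram"
)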

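4.3 Preparing the training data

import hashlib
from typing import List


class CorpusPreprocessor:
    """Corpus preprocessing."""

    def __init__(self, min_length=10, max_length=4096):
        self.min_length = min_length
        self.max_length = max_length

    def preprocess(self, texts: List[str]) -> List[str]:
        """Preprocess a list of texts."""
        cleaned = []

        for text in texts:
            # 1. Basic cleaning
            text = self.clean_text(text)

            # 2. Length filter (in characters)
            if not self.min_length <= len(text) <= self.max_length:
                continue

            # 3. Language identification (optional)
            if self.is_valid_language(text):
                cleaned.append(text)

        return cleaned

    def is_valid_language(self, text: str) -> bool:
        """Hook for language identification; accepts every text by default."""
        return True

    def clean_text(self, text: str) -> str:
        """Clean a single text."""
        # Remove control characters (newlines and tabs survive until the next step)
        text = ''.join(ch for ch in text if ch.isprintable() or ch in '\n\t')

        # Normalise whitespace: every run of whitespace becomes one space
        text = ' '.join(text.split())

        # Remove special characters (enable if required; needs `import re`)
        # text = re.sub(r'[^\w\s\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff]', '', text)

        return text

    def deduplicate(self, texts: List[str]) -> List[str]:
        """Exact deduplication that keeps the first occurrence."""
        seen = set()
        unique = []

        for text in texts:
            # A content digest is stable across runs and far less
            # collision-prone than the built-in hash()
            digest = hashlib.sha1(text.encode('utf-8')).digest()
            if digest not in seen:
                seen.add(digest)
                unique.append(text)

        return unique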
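
Cleaning usually also includes Unicode normalization, so that visually identical strings do not end up as different tokens. The sketch below applies NFKC with the standard-library `unicodedata` module and then collapses whitespace; the helper name `normalize_text` is an assumption, and whether NFKC is appropriate depends on the corpus (it is lossy for some typographic distinctions).

```python
import unicodedata


def normalize_text(text: str) -> str:
    """Apply NFKC normalization, then collapse runs of whitespace."""
    return " ".join(unicodedata.normalize("NFKC", text).split())


# Full-width letters, an ideographic space and full-width digits
print(normalize_text("Ｔｏｋｅｎ　１２３"))  # Token 123

# The "fi" ligature (U+FB01) is folded into two plain letters
print(normalize_text("ﬁne"))  # fine
```

SentencePiece applies an NFKC-based normalization by default, so a preprocessing step like this mainly matters when the cleaned corpus is also used with other tokenizer implementations.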

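5. Managing special tokens

5.1 Types of special tokens

| Token | ID | Purpose |
|---|---|---|
| <pad> | 0 | Sequence padding |
| <unk> | 1 | Unknown token |
| <bos> | 2 | Start of sentence |
| <eos> | 3 | End of sentence |
| <sep> | 4 | Separator |
| <mask> | 5 | Mask (MLM) |
| [CLS] | 6 | Classification token |
| [SEP] | 7 | BERT separator |
| <user> | | |
| <assistant> | | |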

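5.2 Strategy for adding special tokens

import json
from typing import Dict


def add_special_tokens(
    base_vocab_path: str,
    special_tokens: Dict[str, str],
    output_path: str
):
    """
    Add special tokens to a base vocabulary.

    Args:
        base_vocab_path: path to the base vocabulary, a flat JSON object {token: id}
        special_tokens: special tokens as {token: description}; only the keys are added
        output_path: output path
    """
    # Load the base vocabulary
    with open(base_vocab_path, 'r', encoding='utf-8') as f:
        base_vocab = json.load(f)

    # Extend the vocabulary: append after the highest existing id
    # so that no existing id changes
    next_id = max(base_vocab.values(), default=-1) + 1
    for token in special_tokens:
        if token not in base_vocab:
            base_vocab[token] = next_id
            next_id += 1

    # Save (the model's embedding matrix must be resized to match)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(base_vocab, f, ensure_ascii=False, indent=2)

    return base_vocab


# Example: add role tokens for a chat model
chat_special_tokens = {
    "<|system|>": "system prompt",
    "<|user|>": "user input",
    "<|assistant|>": "assistant reply",
    "<|endoftext|>": "end of text",
    # Tool calling
    "<|tool_call|>": "start of a tool call",
    "<|tool_response|>": "tool response",
    "<|function|>": "function definition",
}

extended_vocab = add_special_tokens(
    base_vocab_path="/models/base_tokenizer.json",
    special_tokens=chat_special_tokens,
    output_path="/models/chat_tokenizer.json"
)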
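
Adding entries to the vocabulary is only half of the job: at encoding time the special tokens must be matched as whole units before the subword model sees the text, otherwise `<|user|>` would be split into ordinary pieces. One way to do this is sketched below. The function name, the ids, and the byte-based stand-in for the real encoder are all assumptions.

```python
import re
from typing import Callable, Dict, List


def encode_with_specials(
    text: str,
    special_ids: Dict[str, int],
    encode_plain: Callable[[str], List[int]],
) -> List[int]:
    """Map special tokens to their ids and encode everything else normally."""
    if not special_ids:
        return list(encode_plain(text))
    # Longest markers first, so a marker that contains another wins the match.
    ordered = sorted(special_ids, key=len, reverse=True)
    pattern = "(" + "|".join(re.escape(token) for token in ordered) + ")"
    ids: List[int] = []
    # The capturing group makes re.split keep the matched markers.
    for chunk in re.split(pattern, text):
        if not chunk:
            continue
        if chunk in special_ids:
            ids.append(special_ids[chunk])
        else:
            ids.extend(encode_plain(chunk))
    return ids


special_ids = {"<|user|>": 1000, "<|assistant|>": 1001}  # hypothetical ids


def toy_encode(chunk: str) -> List[int]:
    """Stand-in for a real subword encoder: raw UTF-8 bytes."""
    return list(chunk.encode("utf-8"))


ids = encode_with_specials("<|user|>hi<|assistant|>", special_ids, toy_encode)
print(ids)  # [1000, 104, 105, 1001]
```

In a production system, text that comes from users is normally encoded without special-token matching, so that a user cannot inject role markers by typing them.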

6. Evaluating and analysing tokenizers

6.1 Evaluation metrics

from collections import Counter
from typing import Dict, List


class TokenizerEvaluator:
    """Tokenizer evaluation helper.

    Assumes `tokenizer` exposes encode(text) -> List[int], vocab_size() -> int
    and, optionally, an `unk_token_id` attribute.
    """

    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
        self.unk_id = getattr(tokenizer, "unk_token_id", None)

    def evaluate(self, test_corpus: List[str]) -> Dict:
        """Run every metric on the test corpus."""
        return {
            "compression_ratio": self.compression_ratio(test_corpus),
            "coverage": self.vocabulary_coverage(test_corpus),
            "oov_rate": self.oov_rate(test_corpus),
            "avg_tokens_per_text": self.avg_tokens_per_text(test_corpus),
        }

    def compression_ratio(self, corpus: List[str]) -> float:
        """
        Compression ratio: number of characters / number of tokens.
        Higher means better compression.
        """
        total_chars = sum(len(text) for text in corpus)
        total_tokens = sum(len(self.tokenizer.encode(text)) for text in corpus)

        return total_chars / total_tokens if total_tokens > 0 else 0

    def _token_counts(self, corpus: List[str]) -> Counter:
        """Frequency of every token id produced on the corpus."""
        token_freq = Counter()
        for text in corpus:
            token_freq.update(self.tokenizer.encode(text))
        return token_freq

    def vocabulary_coverage(self, corpus: List[str]) -> Dict:
        """
        Vocabulary coverage: how much of the vocabulary the corpus uses,
        and what share of the produced tokens is not <unk>.
        """
        token_freq = self._token_counts(corpus)
        total_tokens = sum(token_freq.values())
        unk_tokens = token_freq[self.unk_id] if self.unk_id is not None else 0
        vocab_size = self.tokenizer.vocab_size()

        return {
            "unique_tokens": len(token_freq),
            "vocab_size": vocab_size,
            "unique_coverage": len(token_freq) / vocab_size,
            "token_coverage": 1 - unk_tokens / total_tokens if total_tokens > 0 else 0
        }

    def oov_rate(self, corpus: List[str]) -> float:
        """
        Out-of-vocabulary rate: share of produced tokens that are <unk>.
        """
        if self.unk_id is None:
            return 0.0  # no unknown token, e.g. a byte-level tokenizer

        token_freq = self._token_counts(corpus)
        total_tokens = sum(token_freq.values())

        return token_freq[self.unk_id] / total_tokens if total_tokens > 0 else 0

    def avg_tokens_per_text(self, corpus: List[str]) -> float:
        """Average number of tokens per text."""
        lengths = [len(self.tokenizer.encode(text)) for text in corpus]
        return sum(lengths) / len(lengths) if lengths else 0
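
Characters per token is hard to compare across scripts, because a Chinese character carries more content than a Latin letter. A complementary metric that is sometimes easier to compare is bytes per token. The sketch below is one possible formulation; the function name and the whitespace stand-in encoder are assumptions.

```python
from typing import Callable, List, Sequence


def bytes_per_token(corpus: List[str], encode: Callable[[str], Sequence]) -> float:
    """UTF-8 bytes of the corpus divided by the number of tokens produced."""
    total_bytes = sum(len(text.encode("utf-8")) for text in corpus)
    total_tokens = sum(len(encode(text)) for text in corpus)
    return total_bytes / total_tokens if total_tokens else 0.0


def whitespace_encode(text: str) -> List[str]:
    """Stand-in encoder for the example: split on whitespace."""
    return text.split()


# 11 bytes + 13 bytes over 4 tokens
score = bytes_per_token(["hello world", "你好 世界"], whitespace_encode)
print(score)  # 6.0
```

Reporting the metric per language, rather than as one global average, makes it easier to spot languages that the vocabulary serves poorly.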

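6.2 Vocabulary analysis

from collections import Counter
from typing import List


def analyze_vocabulary(tokenizer, corpus: List[str]):
    """Analyse the structure of the vocabulary.

    Assumes `tokenizer` exposes encode(text) -> List[int], vocab_size() -> int,
    id_to_token(id) -> str and is_special(token) -> bool.
    """
    # Counts per token type
    token_stats = {
        "single_char": 0,
        "word": 0,      # word-initial pieces
        "subword": 0,   # word-internal continuation pieces
        "special": 0
    }

    # Token frequencies on the corpus
    freq_dist = Counter()

    for text in corpus:
        tokens = tokenizer.encode(text)
        freq_dist.update(tokens)

    # Classify every vocabulary entry
    for token_id in range(tokenizer.vocab_size()):
        token = tokenizer.id_to_token(token_id)

        if tokenizer.is_special(token):
            token_stats["special"] += 1
        elif len(token) == 1:
            token_stats["single_char"] += 1
        elif token.startswith("▁"):  # SentencePiece marks the start of a word with "▁"
            token_stats["word"] += 1
        else:
            token_stats["subword"] += 1

    ranked = freq_dist.most_common()

    return {
        "stats": token_stats,
        "unused_tokens": tokenizer.vocab_size() - len(freq_dist),
        "top_20_tokens": [tokenizer.id_to_token(i) for i, _ in ranked[:20]],
        # least frequent tokens among those that occur at all
        "bottom_20_tokens": [tokenizer.id_to_token(i) for i, _ in ranked[-20:]]
    }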

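7. Trade-offs between subword and character-level tokenization

7.1 Comparing granularities

# Comparison of tokenization granularities
# (illustrative orders of magnitude, not measurements)
comparison = {
    "char_level": {
        "vocab_size": 256,
        "seq_length_multiplier": 4,  # relative to word level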
        "oov_rate": 0,
        "semantic_preservation": "low",
        "training_data_efficiency": "low",
    },
    "subword_bpe": {
        "vocab_size": 30000,
        "seq_length_multiplier": 1.5,
        "oov_rate": 0.02,
        "semantic_preservation": "high",
        "training_data_efficiency": "high",
    },
    "word_level": {
        "vocab_size": 100000,
        "seq_length_multiplier": 1.0,
        "oov_rate": 0.15,
        "semantic_preservation": "very_high",
        "training_data_efficiency": "medium",
    }
}

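7.2 Recommendations

| Scenario | Recommended granularity | Rationale |
|---|---|---|
| Low-resource languages | Byte level | Guarantees zero OOV |
| Unified multilingual model | Subword level | Subwords are shared across languages |
| Code models | Subword + byte hybrid | Handles unusual symbols |
| Specialised domains | Word + subword | Keeps domain terminology intact |
| Mostly Chinese | Character + word hybrid | Balances efficiency and semantics |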

8. Practical guide

8.1 Training a tokenizer from scratch

# Train with SentencePiece
spm_train \
    --input=corpus.txt \
    --model_prefix=my_tokenizer \
    --vocab_size=32000 \
    --character_coverage=0.9995 \
    --model_type=unigram \
    --pad_id=0 \
    --unk_id=1 \
    --bos_id=2 \
    --eos_id=3 \
    --add_dummy_prefix=false \
    --split_by_whitespace=true \
    --split_by_number=true

8.2 Loading and using a tokenizer

from transformers import AutoTokenizer

# Load a tokenizer in Hugging Face format
tokenizer = AutoTokenizer.from_pretrained("/path/to/tokenizer")

# Encode (return_tensors="pt" requires PyTorch)
text = "Hello, world!"
tokens = tokenizer.encode(text, return_tensors="pt")

# Decode
decoded = tokenizer.decode(tokens[0])

# Batch encoding (padding=True requires the tokenizer to define a pad token)
batch = ["Hello", "World", "你好"]
encoded = tokenizer(batch, padding=True, truncation=True, return_tensors="pt")

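8.3 Checking tokenizer quality

def quality_check(tokenizer, test_texts):
    """Tokenizer quality check.

    Assumes encode(text) -> List[int], decode(ids) -> str and, optionally,
    an `unk_token_id` attribute. If encode() adds special tokens such as
    BOS/EOS, disable that first, or the round-trip check will always fail.
    """
    issues = []
    failed_texts = 0
    unk_id = getattr(tokenizer, "unk_token_id", None)

    for text in test_texts:
        text_issues = []
        tokens = tokenizer.encode(text)

        # 1. Round-trip consistency
        decoded = tokenizer.decode(tokens)
        if decoded.strip() != text.strip():
            text_issues.append(f"Round-trip mismatch: {text} -> {decoded}")

        # 2. Characters outside the Basic Multilingual Plane (emoji, rare CJK)
        #    must not collapse into the unknown token
        if any(ord(c) > 0xFFFF for c in text) and unk_id is not None and unk_id in tokens:
            text_issues.append(f"Special chars not handled: {text}")

        # 3. Abnormal length: more than 3 tokens per character
        if text and len(tokens) / len(text) > 3:
            text_issues.append(f"Poor compression: {len(text)} chars -> {len(tokens)} tokens")

        issues.extend(text_issues)
        failed_texts += bool(text_issues)

    return {
        "issues": issues,
        "pass": len(issues) == 0,
        # share of texts without any issue, always within [0, 1]
        "quality_score": 1.0 - failed_texts / len(test_texts) if test_texts else 1.0
    }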

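9. Summary

Tokenizer design is a key step in LLM training and has to balance several factors:

  1. Vocabulary size: balance model parameters, sequence length, and coverage
  2. Segmentation granularity: choose according to the languages involved and the task
  3. Multilingual support: a unified vocabulary versus language-specific vocabularies
  4. Special tokens: support both the model architecture and application needs
  5. Training data: data quality and distribution determine vocabulary quality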
