An introductory passage from the cs336 assignment:

> While the Unicode standard defines a mapping from characters to code points (integers), it’s impractical to train tokenizers directly on Unicode codepoints, since the vocabulary would be prohibitively large (around 150K items) and sparse (since many characters are quite rare). Instead, we’ll use a Unicode encoding, which converts a Unicode character into a sequence of bytes.

To encode a Unicode string into UTF-8, we can use the encode() function in Python:
```python
>>> test_string = "hello! こんにちは!"
>>> utf8_encoded = test_string.encode("utf-8")
>>> print(utf8_encoded)
b'hello! \xe3\x81\x93\xe3\x82\x93\xe3\x81\xab\xe3\x81\xa1\xe3\x81\xaf!'
>>> print(type(utf8_encoded))
<class 'bytes'>
>>> # Get the byte values for the encoded string (integers from 0 to 255).
>>> list(utf8_encoded)
[104, 101, 108, 108, 111, 33, 32, 227, 129, 147, 227, 130, 147, 227, 129, 171, 227, 129, 161, 227, 129, 175, 33]
>>> # One byte does not necessarily correspond to one Unicode character!
>>> print(len(test_string))
13
>>> print(len(utf8_encoded))
23
>>> print(utf8_encoded.decode("utf-8"))
hello! こんにちは!
```
By converting our Unicode codepoints into a sequence of bytes (e.g.,
via the UTF-8 encoding), we are essentially taking a sequence of
codepoints (integers in the range 0 to 154,997) and transforming it into
a sequence of byte values (integers in the range 0 to 255).
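To make the code-point-versus-byte distinction concrete, here is a small illustrative snippet (not from the assignment):

```python
>>> ord("こ")                 # the Unicode code point: one possibly large integer
12371
>>> "こ".encode("utf-8")      # the UTF-8 encoding: three bytes, each in 0..255
b'\xe3\x81\x93'
>>> list("こ".encode("utf-8"))
[227, 129, 147]
```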
I won't go into the specific UTF-8 encoding rules here.
Subword Tokenization
While byte-level tokenization can alleviate the out-of-vocabulary issues faced by word-level tokenizers, tokenizing text into bytes results in extremely long input sequences.
Subword tokenization is a midpoint between word-level tokenizers and
byte-level tokenizers.
For example, if the byte sequence b'the' often occurs in our raw text training data, assigning it an entry in the vocabulary would reduce this 3-token sequence to a single token.
BPE Tokenizer Training
Training a BPE tokenizer consists of three main steps:
Vocabulary initialization:
The vocabulary is a one-to-one mapping between bytestrings and integer IDs. Since we are training a byte-level BPE tokenizer, the initial vocabulary size is 256 (one entry per possible byte value).
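The training code further down calls a get_basic_vocab helper that isn't shown in this post. Here is a minimal sketch of what it might look like, assuming the vocabulary maps integer IDs to bytes and that the special tokens are reserved up front (so that vocab_size budgets for them):

```python
def get_basic_vocab(special_tokens: list[str] | None) -> dict[int, bytes]:
    # IDs 0..255 map to the 256 single-byte values
    vocab = {i: bytes([i]) for i in range(256)}
    # Reserve IDs for the special tokens right after the byte values
    for token in special_tokens or []:
        vocab[len(vocab)] = token.encode("utf-8")
    return vocab
```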
Pre-tokenization:
In principle, once we have the initial vocabulary, we could repeatedly scan all adjacent byte pairs, merge the most frequent pair into a new subword, and add it to the vocabulary. But doing this naively is very inefficient, because every merge would require another pass over the entire corpus. Moreover, merging bytes directly can produce tokens that cross word boundaries, or create redundant tokens that differ only in punctuation (e.g., dog and dog! would be treated as completely unrelated tokens), which wastes vocabulary space and ignores the structure of the language. Pre-tokenization therefore splits the text into word-like pre-tokens first and counts them, so that merges only ever happen inside a pre-token (see the sketch below).
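The parallel pre-tokenization code below hands each chunk to a process_chunk helper that isn't shown in this post. Here is a minimal sketch of what it could look like, assuming the GPT-2-style pre-tokenization pattern (which needs the third-party regex package for \p{...} classes) and assuming each pre-token is stored as a tuple of single bytes; treat the details as assumptions rather than the original implementation:

```python
import regex as re  # third-party `regex`, needed for \p{L} / \p{N}
from collections import Counter

# GPT-2 style pre-tokenization pattern
PAT = r"""'(?:[sdmt]|ll|ve|re)| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""

def process_chunk(chunk: str, special_tokens: list[str] | None) -> Counter:
    """Count pre-tokens in one chunk; each pre-token is a tuple of single bytes."""
    word_cnt = Counter()
    # Split on special tokens first so no pre-token (and hence no merge) crosses them
    if special_tokens:
        parts = re.split("|".join(re.escape(t) for t in special_tokens), chunk)
    else:
        parts = [chunk]
    for part in parts:
        for match in re.finditer(PAT, part):
            word = match.group(0).encode("utf-8")
            word_cnt[tuple(bytes([b]) for b in word)] += 1
    return word_cnt
```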
```python
# The following is a serial implementation, but you can parallelize this
# by sending each start/end pair to a set of processes.
chunks = []
for start, end in zip(boundaries[:-1], boundaries[1:]):
    f.seek(start)
    chunk = f.read(end - start).decode("utf-8", errors="ignore")
    # Run pre-tokenization on your chunk and store the counts for each pre-token
    chunks.append(chunk)

args = [(chunk, special_tokens) for chunk in chunks]
with multiprocessing.Pool(processes=num_processes) as pool:
    results = pool.starmap(process_chunk, args)

word_cnt = Counter()
for result in results:
    word_cnt.update(result)

vocab = get_basic_vocab(special_tokens)
base_vocab_size = len(vocab)
num_merges = vocab_size - base_vocab_size

pair_cnt, pair2word_bytes = count_pair(word_cnt)
```
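count_pair and MaxHeap are also helpers the post doesn't show. One possible sketch, assuming word_cnt maps each pre-token (a tuple of single bytes) to its count: count_pair returns the pair frequencies together with an inverted index from each pair to the words containing it, and MaxHeap is a thin max-heap wrapper over heapq. With (count, pair) entries, ties on equal counts resolve to the lexicographically greater pair, i.e. the usual deterministic BPE tie-breaking rule:

```python
import heapq
from collections import Counter, defaultdict

def count_pair(word_cnt):
    """Count adjacent pairs and record which words each pair occurs in."""
    pair_cnt = Counter()
    pair2word_bytes = defaultdict(set)
    for word_bytes, cnt in word_cnt.items():
        for pair in zip(word_bytes[:-1], word_bytes[1:]):
            pair_cnt[pair] += cnt
            pair2word_bytes[pair].add(word_bytes)
    return pair_cnt, pair2word_bytes

class _Reversed:
    """Wrapper that inverts comparison, turning heapq's min-heap into a max-heap."""
    def __init__(self, item):
        self.item = item
    def __lt__(self, other):
        return self.item > other.item

class MaxHeap:
    def __init__(self):
        self._heap = []
    def push(self, item):
        heapq.heappush(self._heap, _Reversed(item))
    def pop(self):
        return heapq.heappop(self._heap).item
```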
Compute BPE merges:

```python
merges = []
for i in range(num_merges):
    if i % 100 == 0:
        # Rebuild the heap every 100 merges so it doesn't grow too large
        heap = MaxHeap()
        for pair, cnt in pair_cnt.items():
            heap.push((cnt, pair))
    # Lazy deletion: pop until the popped count matches the current count
    while True:
        cnt, pair = heap.pop()
        if pair in pair_cnt and pair_cnt[pair] == cnt:
            max_pair = pair
            break
        # Otherwise the entry is stale; discard it and keep popping
    merges.append(max_pair)
    vocab[base_vocab_size + i] = max_pair[0] + max_pair[1]

    # Update word_cnt: only the words that contain max_pair are affected
    affected_word_bytes = pair2word_bytes[max_pair]  # set of words
    pair2word_bytes.pop(max_pair)
    affected_pairs = set()
    for word_bytes in affected_word_bytes:
        cnt = word_cnt[word_bytes]
        word_cnt.pop(word_bytes)

        # Remove this word's contribution to the old pair counts
        for pair in zip(word_bytes[:-1], word_bytes[1:]):
            pair_cnt[pair] -= cnt
            if pair_cnt[pair] == 0:
                del pair_cnt[pair]
            pair2word_bytes[pair].discard(word_bytes)
            if not pair2word_bytes[pair]:
                del pair2word_bytes[pair]
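        # (Assumed step, not shown in the original snippet:) rewrite the word
        # with every occurrence of max_pair merged, so that new_word_bytes and
        # its count are defined for the update below.
        new_word = []
        j = 0
        while j < len(word_bytes):
            if j < len(word_bytes) - 1 and (word_bytes[j], word_bytes[j + 1]) == max_pair:
                new_word.append(word_bytes[j] + word_bytes[j + 1])
                j += 2
            else:
                new_word.append(word_bytes[j])
                j += 1
        new_word_bytes = tuple(new_word)
        word_cnt[new_word_bytes] += cnt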
        # Add the rewritten word's contribution to the new pair counts
        for pair in zip(new_word_bytes[:-1], new_word_bytes[1:]):
            pair_cnt[pair] += cnt
            pair2word_bytes[pair].add(new_word_bytes)
            affected_pairs.add(pair)

    # Re-push every affected pair with its updated count (lazy deletion)
    for pair in affected_pairs:
        if pair in pair_cnt:
            heap.push((pair_cnt[pair], pair))
```
```python
# Make sure every special token has an entry in the vocabulary
if special_tokens:
    for special_token in special_tokens:
        byte_encoded_special_token = special_token.encode("utf-8")
        if byte_encoded_special_token not in set(vocab.values()):
            vocab[len(vocab)] = byte_encoded_special_token

# Normalize vocabulary values and merge pairs to plain `bytes`
vocab = {
    vocab_index: bytes([c for c in vocab_item])
    for vocab_index, vocab_item in vocab.items()
}
merges = [
    (
        bytes([token for token in merge_token_1]),
        bytes([token for token in merge_token_2]),
    )
    for merge_token_1, merge_token_2 in merges
]

tokenizer = BPETokenizer(vocab, merges, special_tokens)
return tokenizer
```
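The encode method below relies on a pre_tokenize(text, special_tokens) helper (returning the pre-tokens of a text in order, as bytes) and on a byte_special_tokens set, neither of which is shown; byte_special_tokens is presumably just the set of UTF-8-encoded special tokens. A possible sketch of pre_tokenize, reusing the PAT pattern and the regex import from the pre-tokenization sketch above, is given here as an assumption rather than the original code:

```python
def pre_tokenize(text: str, special_tokens: list[str] | None) -> list[bytes]:
    """Split text into pre-tokens (as UTF-8 bytes), keeping special tokens whole."""
    if special_tokens:
        # Longest special tokens first, so overlapping ones are matched greedily;
        # the capturing group makes re.split keep the special tokens in the output.
        pattern = "(" + "|".join(
            re.escape(t) for t in sorted(special_tokens, key=len, reverse=True)
        ) + ")"
        parts = re.split(pattern, text)
    else:
        parts = [text]
    result = []
    for part in parts:
        if special_tokens and part in special_tokens:
            result.append(part.encode("utf-8"))  # keep special tokens intact
        else:
            result.extend(m.group(0).encode("utf-8") for m in re.finditer(PAT, part))
    return result
```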
```python
def encode(self, text: str) -> list[int]:
    vocab_reversed = {v: k for k, v in self.vocab.items()}
    token_bytes_list = pre_tokenize(text, self.special_tokens)  # list[bytes]

    new_token_bytes_list = []
    for token_bytes in token_bytes_list:  # one pre-token, e.g. b'the'
        # Special tokens are emitted as-is; no merges are applied to them
        if token_bytes in byte_special_tokens:
            new_token_bytes_list.append(token_bytes)
            continue
        # Split the pre-token into a list of single bytes
        token_bytes = [bytes([byte]) for byte in token_bytes]

        # Merges must be applied in the order they appear in the merge list
        for merge in self.merges:
            new_token_bytes = []
            j = 0
            while j < len(token_bytes):
                if j < len(token_bytes) - 1 and (token_bytes[j], token_bytes[j + 1]) == merge:
                    new_token_bytes.append(token_bytes[j] + token_bytes[j + 1])
                    j += 2
                else:
                    new_token_bytes.append(token_bytes[j])
                    j += 1
            token_bytes = new_token_bytes

        new_token_bytes_list.extend(token_bytes)

    new_token_ids_list = [vocab_reversed[tok] for tok in new_token_bytes_list]
    return new_token_ids_list
```
```python
def encode_iterable(self, iterable: Iterable[str]) -> Iterator[int]:
    """
    Given an iterable of strings (e.g., a Python file handle), return a
    generator that lazily yields token IDs. This is required for
    memory-efficient tokenization of large files that we cannot directly
    load into memory.
    """
    for line in iterable:
        for idx in self.encode(line):
            yield idx
```
```python
def decode(self, ids: list[int]) -> str:
    result = []
    for id in ids:
        # Collect the raw bytes for each token ID
        result.extend(self.vocab.get(id, None))
    # Decode all bytes at once; invalid sequences become U+FFFD
    result = bytes(result).decode("utf-8", errors="replace")
    return result
```
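Finally, a quick sanity check. The train_bpe name, file path, and arguments below are placeholders (assuming the training code above is wrapped in such a function); the point is that encode/decode should round-trip and that encode_iterable streams a file without loading it into memory:

```python
tokenizer = train_bpe("corpus.txt", vocab_size=1000, special_tokens=["<|endoftext|>"])

text = "hello! こんにちは!<|endoftext|>"
ids = tokenizer.encode(text)
assert tokenizer.decode(ids) == text  # byte-level BPE round-trips any text

# Lazily tokenize a large file line by line
with open("corpus.txt") as f:
    for token_id in tokenizer.encode_iterable(f):
        pass  # consume the stream of token IDs
```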