"""
Maaza Nano-Orchestrator 9.6M - Custom BPE Tokenizer

Train a tool-focused tokenizer with an 8k vocabulary.

Key goal: tool names become single tokens (maaza_extract_json = 1 token, not 5).
"""

import argparse
import json
import re
from collections import Counter
from typing import Dict, List, Tuple

# Control and chat-format tokens, reserved at fixed IDs.
SPECIAL_TOKENS = [
    "<|pad|>",
    "<|unk|>",
    "<|bos|>",
    "<|eos|>",
    "<|tool_start|>",
    "<|tool_end|>",
    "<|param_start|>",
    "<|param_end|>",
    "<|user|>",
    "<|assistant|>",
    "<|system|>",
]

# Tool names registered as whole tokens so routing targets never fragment.
TOOL_TOKENS = [
    # Maaza / MCP ecosystem
    "maaza_extract_json",
    "mcpbodega_deploy",
    "mcpbodega_list",
    "doom_mcp",
    "bitchat_send",
    "crypto_lookup",
    "scratchpad_mcp",
    "voice_mcp",
    # Web and browser automation
    "web_search",
    "web_fetch",
    "puppeteer_navigate",
    "puppeteer_click",
    "puppeteer_screenshot",
    "puppeteer_extract",
    # Files and data
    "file_read",
    "file_write",
    "database_query",
    "csv_parse",
    "json_validate",
    "image_caption",
    # Code execution and utilities
    "code_execute_python",
    "code_execute_js",
    "calculator",
    "regex_match",
    "shell_command",
    # External services
    "weather_lookup",
    "stock_lookup",
    "news_fetch",
    "email_send",
    "calendar_add",
    # Miscellaneous integrations
    "mcpbodega_chat",
    "health_check",
    "slmbench_query",
    "slack_send",
    "github_issue",
    "cyclecore_terminal",
]

# JSON fragments that recur constantly in tool-call output.
JSON_TOKENS = [
    '{"tool"',
    '"params"',
    '"action"',
    '"retry"',
    '"fallback"',
    "true",
    "false",
    "null",
]

# Error-recovery vocabulary used in retry and fallback flows.
RECOVERY_TOKENS = [
    "retry",
    "fallback",
    "timeout",
    "rate_limit",
    "unavailable",
    "max_retries",
    "backoff",
    "exponential",
    "alternative",
]
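
# Seeding these strings into the vocabulary before BPE training guarantees
# each one a stable, single ID regardless of corpus statistics. Illustrative
# check (holds even on an untrained tokenizer, since reserved tokens bypass BPE):
#
#   tok = BPETokenizer()
#   assert tok.encode("maaza_extract_json") == [tok.vocab["maaza_extract_json"]]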


class BPETokenizer:
    """Custom BPE tokenizer optimized for tool routing."""

    def __init__(self, vocab_size: int = 8000):
        self.vocab_size = vocab_size
        self.vocab: Dict[str, int] = {}
        self.inverse_vocab: Dict[int, str] = {}
        self.merges: List[Tuple[str, str]] = []

        # Reserve special/tool/JSON/recovery tokens plus a character-level base.
        self._init_special_tokens()

    def _init_special_tokens(self):
        """Initialize the vocabulary with reserved tokens and a character base."""
        idx = 0

        for token in SPECIAL_TOKENS:
            self.vocab[token] = idx
            self.inverse_vocab[idx] = token
            idx += 1

        for token in TOOL_TOKENS:
            self.vocab[token] = idx
            self.inverse_vocab[idx] = token
            idx += 1

        for token in JSON_TOKENS:
            self.vocab[token] = idx
            self.inverse_vocab[idx] = token
            idx += 1

        for token in RECOVERY_TOKENS:
            self.vocab[token] = idx
            self.inverse_vocab[idx] = token
            idx += 1

        # Character base: printable ASCII as literal characters, everything
        # else as <0xNN> placeholder strings.
        for i in range(256):
            char = chr(i) if 32 <= i < 127 else f"<0x{i:02X}>"
            if char not in self.vocab:
                self.vocab[char] = idx
                self.inverse_vocab[idx] = char
                idx += 1

        self.base_vocab_size = idx

    def _get_pairs(self, word: List[str]) -> Counter:
        """Count all adjacent symbol pairs in a word."""
        pairs = Counter()
        for i in range(len(word) - 1):
            pairs[(word[i], word[i + 1])] += 1
        return pairs

    def _merge_pair(self, pair: Tuple[str, str], word: List[str]) -> List[str]:
        """Merge every occurrence of a specific pair in the word.

        Example: _merge_pair(("l", "o"), ["l", "o", "w"]) -> ["lo", "w"]
        """
        new_word = []
        i = 0
        while i < len(word):
            if i < len(word) - 1 and word[i] == pair[0] and word[i + 1] == pair[1]:
                new_word.append(pair[0] + pair[1])
                i += 2
            else:
                new_word.append(word[i])
                i += 1
        return new_word

    def _tokenize_word(self, word: str) -> List[str]:
        """Split a word into initial symbols, keeping known tool names intact."""
        # Whole word is already a reserved or learned token.
        if word in self.vocab:
            return [word]

        # If a tool name is embedded in the word, keep it atomic and split
        # only the surrounding text into characters.
        for tool in TOOL_TOKENS:
            if tool in word:
                parts = word.split(tool)
                result = []
                for i, part in enumerate(parts):
                    if part:
                        result.extend(list(part))
                    if i < len(parts) - 1:
                        result.append(tool)
                return result

        # Default: individual characters.
        return list(word)

    def train(self, texts: List[str], verbose: bool = True):
        """Train BPE merges on a corpus of texts."""
        if verbose:
            print(f"Training BPE tokenizer (target vocab: {self.vocab_size})")
            print(f"  Base vocab size: {self.base_vocab_size}")

        # Count word frequencies (lower-cased; punctuation kept as separate words).
        word_freqs = Counter()
        for text in texts:
            words = re.findall(r"\w+|[^\w\s]", text.lower())
            word_freqs.update(words)

        # Initial split of every word into symbols.
        splits = {}
        for word, freq in word_freqs.items():
            splits[word] = (self._tokenize_word(word), freq)

        num_merges = self.vocab_size - len(self.vocab)
        if verbose:
            print(f"  Performing {num_merges} merges...")

        for merge_idx in range(num_merges):
            # Count pair frequencies across the whole corpus.
            pair_freqs = Counter()
            for word, (split, freq) in splits.items():
                pairs = self._get_pairs(split)
                for pair, count in pairs.items():
                    pair_freqs[pair] += count * freq

            if not pair_freqs:
                break

            # Record the most frequent pair as the next merge rule.
            best_pair = pair_freqs.most_common(1)[0][0]
            self.merges.append(best_pair)

            # Add the merged symbol to the vocabulary.
            merged = best_pair[0] + best_pair[1]
            if merged not in self.vocab:
                idx = len(self.vocab)
                self.vocab[merged] = idx
                self.inverse_vocab[idx] = merged

            # Apply the merge to every word split.
            for word in splits:
                split, freq = splits[word]
                splits[word] = (self._merge_pair(best_pair, split), freq)

            if verbose and (merge_idx + 1) % 500 == 0:
                print(f"  Merge {merge_idx + 1}: '{best_pair[0]}' + '{best_pair[1]}' -> '{merged}'")

        if verbose:
            print(f"  Final vocab size: {len(self.vocab)}")

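    # Illustrative walk-through (toy numbers, not from a real run): on the
    # corpus ["low", "low", "lower"], the pair ("l", "o") occurs three times,
    # so the first merge creates the symbol "lo"; later merges build on it
    # ("lo" + "w" -> "low") until the target vocab size is reached or no
    # pairs remain.
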
    def encode(self, text: str) -> List[int]:
        """Encode text to token IDs."""
        tokens = []

        # Split out special and tool tokens first so they stay atomic.
        special_pattern = "|".join(re.escape(t) for t in SPECIAL_TOKENS)
        tool_pattern = "|".join(re.escape(t) for t in TOOL_TOKENS)
        combined_pattern = f"({special_pattern}|{tool_pattern})"

        # re.split with a capture group keeps the matched tokens in the output.
        parts = re.split(combined_pattern, text)

        for part in parts:
            if not part:
                continue

            # Whole part is a special or tool token.
            if part in self.vocab:
                tokens.append(self.vocab[part])
                continue

            # Split the remainder into words, punctuation, and whitespace.
            words = re.findall(r"\w+|[^\w\s]|\s+", part)

            for word in words:
                if word in self.vocab:
                    tokens.append(self.vocab[word])
                    continue

                # Fall back to the lower-cased form (training lower-cases text).
                word_lower = word.lower()
                if word_lower in self.vocab:
                    tokens.append(self.vocab[word_lower])
                    continue

                # A tool name embedded inside a longer word stays atomic.
                found_tool = False
                for tool in TOOL_TOKENS:
                    if tool in word_lower:
                        parts_inner = word_lower.split(tool)
                        for i, p in enumerate(parts_inner):
                            if p:
                                tokens.extend(self._encode_subword(p))
                            if i < len(parts_inner) - 1:
                                tokens.append(self.vocab[tool])
                        found_tool = True
                        break

                if found_tool:
                    continue

                # Otherwise apply learned BPE merges.
                tokens.extend(self._encode_subword(word_lower))

        return tokens

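    # e.g. encode("use crypto_lookup now") keeps "crypto_lookup" as a single
    # ID even though it appears mid-sentence.
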
    def _encode_subword(self, word: str) -> List[int]:
        """Apply BPE merges to encode a subword."""
        if not word:
            return []

        if word in self.vocab:
            return [self.vocab[word]]

        # Start from individual characters.
        word_tokens = list(word)

        # Apply each learned merge, in training order, greedily left to right.
        for pair in self.merges:
            i = 0
            while i < len(word_tokens) - 1:
                if word_tokens[i] == pair[0] and word_tokens[i + 1] == pair[1]:
                    word_tokens = word_tokens[:i] + [pair[0] + pair[1]] + word_tokens[i + 2:]
                else:
                    i += 1

        # Map symbols to IDs; anything unseen becomes <|unk|>.
        ids = []
        for token in word_tokens:
            if token in self.vocab:
                ids.append(self.vocab[token])
            else:
                ids.append(self.vocab["<|unk|>"])

        return ids

    def decode(self, ids: List[int]) -> str:
        """Decode token IDs back to text."""
        tokens = [self.inverse_vocab.get(i, "<|unk|>") for i in ids]
        text = "".join(tokens)

        # Strip special tokens from the output.
        for special in SPECIAL_TOKENS:
            text = text.replace(special, "")

        return text

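    # Note: encode() lower-cases any word not found in the vocab verbatim, so
    # decode(encode(text)) is an exact round-trip only for lower-case input.
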
    def save(self, path: str):
        """Save tokenizer to a JSON file."""
        data = {
            "vocab_size": self.vocab_size,
            "vocab": self.vocab,
            "merges": self.merges,
            "special_tokens": SPECIAL_TOKENS,
            "tool_tokens": TOOL_TOKENS,
        }
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
        print(f"Tokenizer saved to {path}")

    @classmethod
    def load(cls, path: str) -> "BPETokenizer":
        """Load a tokenizer from a JSON file."""
        with open(path) as f:
            data = json.load(f)

        tokenizer = cls(vocab_size=data["vocab_size"])
        tokenizer.vocab = data["vocab"]
        # Rebuild the inverse map and re-tuple merges (JSON stores them as lists).
        tokenizer.inverse_vocab = {int(v): k for k, v in data["vocab"].items()}
        tokenizer.merges = [tuple(m) for m in data["merges"]]

        return tokenizer

    def __len__(self):
        return len(self.vocab)

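
# Minimal usage sketch (illustrative; the toy corpus is made up):
#
#   tok = BPETokenizer(vocab_size=600)
#   tok.train(["retry with exponential backoff"] * 10, verbose=False)
#   ids = tok.encode("retry with exponential backoff")
#   assert tok.decode(ids) == "retry with exponential backoff"
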

def train_from_dataset(dataset_path: str, output_path: str = "tokenizer.json", vocab_size: int = 8000):
    """Train a tokenizer from a JSONL dataset file."""
    print(f"Loading dataset from {dataset_path}")

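    # Each JSONL line is assumed to hold a "prompt" string and a "tool_calls"
    # object; an illustrative line:
    #   {"prompt": "look up btc", "tool_calls": [{"tool": "crypto_lookup", "params": {"symbol": "BTC"}}]}
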
    texts = []
    with open(dataset_path) as f:
        for line in f:
            data = json.loads(line)
            texts.append(data["prompt"])
            # Serialize the expected tool calls so JSON structure is learned too.
            texts.append(json.dumps(data["tool_calls"]))

    print(f"Loaded {len(texts)} text samples")

    tokenizer = BPETokenizer(vocab_size=vocab_size)
    tokenizer.train(texts, verbose=True)
    tokenizer.save(output_path)

    # Smoke-test tokenization on representative inputs.
    print("\n=== Tokenization Tests ===")
    test_cases = [
        "extract the invoice details",
        '{"tool": "maaza_extract_json", "params": {"text": "test"}}',
        "puppeteer_navigate to google.com",
        "The crypto_lookup tool failed with timeout",
        "retry with exponential backoff",
    ]

    for text in test_cases:
        ids = tokenizer.encode(text)
        decoded = tokenizer.decode(ids)
        print(f"\nInput: '{text}'")
        print(f"Tokens: {ids}")
        print(f"Decoded: '{decoded}'")
        print(f"Length: {len(ids)} tokens")

    # Verify that tool names really encode to single tokens.
    print("\n=== Tool Token Verification ===")
    for tool in TOOL_TOKENS[:5]:
        ids = tokenizer.encode(tool)
        if len(ids) == 1:
            print(f"✓ {tool} = single token (ID: {ids[0]})")
        else:
            print(f"✗ {tool} = {len(ids)} tokens: {ids}")

    return tokenizer


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train custom BPE tokenizer")
    parser.add_argument("--input", required=True, help="Input dataset (JSONL)")
    parser.add_argument("--output", default="tokenizer.json", help="Output path")
    parser.add_argument("--vocab-size", type=int, default=8000, help="Vocabulary size")

    args = parser.parse_args()

    train_from_dataset(
        dataset_path=args.input,
        output_path=args.output,
        vocab_size=args.vocab_size,
    )

    print(f"\n✓ Tokenizer trained and saved to {args.output}")
    print("Next step: python model.py")