Coverage for transformer_lens/HookedEncoder.py: 77%
222 statements
« prev ^ index » next coverage.py v7.6.1, created at 2026-03-24 16:35 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2026-03-24 16:35 +0000
1"""Hooked Encoder.
3Contains a BERT style model. This is separate from :class:`transformer_lens.HookedTransformer`
4because it has a significantly different architecture to e.g. GPT style transformers.
5"""
7from __future__ import annotations
9import logging
10import os
11from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union, overload
13import torch
14import torch.nn as nn
15from einops import repeat
16from jaxtyping import Float, Int
17from transformers.models.auto.tokenization_auto import AutoTokenizer
18from typing_extensions import Literal
20import transformer_lens.loading_from_pretrained as loading
21from transformer_lens.ActivationCache import ActivationCache
22from transformer_lens.components import (
23 MLP,
24 Attention,
25 BertBlock,
26 BertEmbed,
27 BertMLMHead,
28 BertNSPHead,
29 BertPooler,
30 Unembed,
31)
32from transformer_lens.FactoredMatrix import FactoredMatrix
33from transformer_lens.hook_points import HookedRootModule, HookPoint
34from transformer_lens.HookedTransformerConfig import HookedTransformerConfig
35from transformer_lens.utilities import devices
37T = TypeVar("T", bound="HookedEncoder")
40class HookedEncoder(HookedRootModule):
41 """
42 This class implements a BERT-style encoder using the components in ./components.py, with HookPoints on every interesting activation. It inherits from HookedRootModule.
44 Limitations:
45 - The model does not include dropouts, which may lead to inconsistent results from training or fine-tuning.
47 Like HookedTransformer, it can have a pretrained Transformer's weights loaded via `.from_pretrained`. There are a few features you might know from HookedTransformer which are not yet supported:
48 - There is no preprocessing (e.g. LayerNorm folding) when loading a pretrained model
49 """
51 def __init__(
52 self,
53 cfg: Union[HookedTransformerConfig, Dict],
54 tokenizer: Optional[Any] = None,
55 move_to_device: bool = True,
56 **kwargs: Any,
57 ):
58 super().__init__()
59 if isinstance(cfg, Dict): 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true
60 cfg = HookedTransformerConfig(**cfg)
61 elif isinstance(cfg, str): 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true
62 raise ValueError(
63 "Please pass in a config dictionary or HookedTransformerConfig object. If you want to load a pretrained model, use HookedEncoder.from_pretrained() instead."
64 )
65 self.cfg = cfg
67 assert self.cfg.n_devices == 1, "Multiple devices not supported for HookedEncoder"
68 if tokenizer is not None:
69 self.tokenizer = tokenizer
70 elif self.cfg.tokenizer_name is not None:
71 huggingface_token = os.environ.get("HF_TOKEN", "")
72 self.tokenizer = AutoTokenizer.from_pretrained(
73 self.cfg.tokenizer_name,
74 token=huggingface_token if len(huggingface_token) > 0 else None,
75 )
76 else:
77 self.tokenizer = None
79 if self.cfg.d_vocab == -1:
80 # If we have a tokenizer, vocab size can be inferred from it.
81 assert self.tokenizer is not None, "Must provide a tokenizer if d_vocab is not provided"
82 self.cfg.d_vocab = max(self.tokenizer.vocab.values()) + 1
83 if self.cfg.d_vocab_out == -1:
84 self.cfg.d_vocab_out = self.cfg.d_vocab
86 self.embed = BertEmbed(self.cfg)
87 self.blocks = nn.ModuleList([BertBlock(self.cfg) for _ in range(self.cfg.n_layers)])
88 self.mlm_head = BertMLMHead(self.cfg)
89 self.unembed = Unembed(self.cfg)
90 self.nsp_head = BertNSPHead(self.cfg)
91 self.pooler = BertPooler(self.cfg)
93 self.hook_full_embed = HookPoint()
95 if move_to_device:
96 if self.cfg.device is None: 96 ↛ 97line 96 didn't jump to line 97 because the condition on line 96 was never true
97 raise ValueError("Cannot move to device when device is None")
98 self.to(self.cfg.device)
100 self.setup()
102 def to_tokens(
103 self,
104 input: Union[str, List[str]],
105 move_to_device: bool = True,
106 truncate: bool = True,
107 ) -> Tuple[
108 Int[torch.Tensor, "batch pos"],
109 Int[torch.Tensor, "batch pos"],
110 Int[torch.Tensor, "batch pos"],
111 ]:
112 """Converts a string to a tensor of tokens.
113 Taken mostly from the HookedTransformer implementation, but does not support default padding
114 sides or prepend_bos.
115 Args:
116 input (Union[str, List[str]]): The input to tokenize.
117 move_to_device (bool): Whether to move the output tensor of tokens to the device the model lives on. Defaults to True
118 truncate (bool): If the output tokens are too long, whether to truncate the output
119 tokens to the model's max context window. Does nothing for shorter inputs. Defaults to
120 True.
121 """
123 assert self.tokenizer is not None, "Cannot use to_tokens without a tokenizer"
125 encodings = self.tokenizer(
126 input,
127 return_tensors="pt",
128 padding=True,
129 truncation=truncate,
130 max_length=self.cfg.n_ctx if truncate else None,
131 )
133 tokens = encodings.input_ids
134 token_type_ids = encodings.token_type_ids
135 attention_mask = encodings.attention_mask
137 if move_to_device: 137 ↛ 142line 137 didn't jump to line 142 because the condition on line 137 was always true
138 tokens = tokens.to(self.cfg.device)
139 token_type_ids = token_type_ids.to(self.cfg.device)
140 attention_mask = attention_mask.to(self.cfg.device)
142 return tokens, token_type_ids, attention_mask
144 def encoder_output(
145 self,
146 tokens: Int[torch.Tensor, "batch pos"],
147 token_type_ids: Optional[Int[torch.Tensor, "batch pos"]] = None,
148 one_zero_attention_mask: Optional[Int[torch.Tensor, "batch pos"]] = None,
149 ) -> Float[torch.Tensor, "batch pos d_vocab"]:
150 """Processes input through the encoder layers and returns the resulting residual stream.
152 Args:
153 input: Input tokens as integers with shape (batch, position)
154 token_type_ids: Optional binary ids indicating segment membership.
155 Shape (batch_size, sequence_length). For example, with input
156 "[CLS] Sentence A [SEP] Sentence B [SEP]", token_type_ids would be
157 [0, 0, ..., 0, 1, ..., 1, 1] where 0 marks tokens from sentence A
158 and 1 marks tokens from sentence B.
159 one_zero_attention_mask: Optional binary mask of shape (batch_size, sequence_length)
160 where 1 indicates tokens to attend to and 0 indicates tokens to ignore.
161 Used primarily for handling padding in batched inputs.
163 Returns:
164 resid: Final residual stream tensor of shape (batch, position, d_model)
166 Raises:
167 AssertionError: If using string input without a tokenizer
168 """
170 if tokens.device.type != self.cfg.device: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true
171 tokens = tokens.to(self.cfg.device)
172 if one_zero_attention_mask is not None:
173 one_zero_attention_mask = one_zero_attention_mask.to(self.cfg.device)
175 resid = self.hook_full_embed(self.embed(tokens, token_type_ids))
177 large_negative_number = -torch.inf
178 mask = (
179 repeat(1 - one_zero_attention_mask, "batch pos -> batch 1 1 pos")
180 if one_zero_attention_mask is not None
181 else None
182 )
183 additive_attention_mask = (
184 torch.where(mask == 1, large_negative_number, 0) if mask is not None else None
185 )
187 for block in self.blocks:
188 resid = block(resid, additive_attention_mask)
190 return resid
192 @overload
193 def forward(
194 self,
195 input: Union[
196 str,
197 List[str],
198 Int[torch.Tensor, "batch pos"],
199 ],
200 return_type: Union[Literal["logits"], Literal["predictions"]],
201 token_type_ids: Optional[Int[torch.Tensor, "batch pos"]] = None,
202 one_zero_attention_mask: Optional[Int[torch.Tensor, "batch pos"]] = None,
203 ) -> Union[Float[torch.Tensor, "batch pos d_vocab"], str, List[str]]:
204 ...
206 @overload
207 def forward(
208 self,
209 input: Union[
210 str,
211 List[str],
212 Int[torch.Tensor, "batch pos"],
213 ],
214 return_type: Literal[None],
215 token_type_ids: Optional[Int[torch.Tensor, "batch pos"]] = None,
216 one_zero_attention_mask: Optional[Int[torch.Tensor, "batch pos"]] = None,
217 ) -> Optional[Union[Float[torch.Tensor, "batch pos d_vocab"], str, List[str]]]:
218 ...
220 def forward(
221 self,
222 input: Union[
223 str,
224 List[str],
225 Int[torch.Tensor, "batch pos"],
226 ],
227 return_type: Optional[Union[Literal["logits"], Literal["predictions"]]] = "logits",
228 token_type_ids: Optional[Int[torch.Tensor, "batch pos"]] = None,
229 one_zero_attention_mask: Optional[Int[torch.Tensor, "batch pos"]] = None,
230 ) -> Optional[Union[Float[torch.Tensor, "batch pos d_vocab"], str, List[str]]]:
231 """Forward pass through the HookedEncoder. Performs Masked Language Modelling on the given input.
233 Args:
234 input: The input to process. Can be one of:
235 - str: A single text string
236 - List[str]: A list of text strings
237 - torch.Tensor: Input tokens as integers with shape (batch, position)
238 return_type: Optional[str]: The type of output to return. Can be one of:
239 - None: Return nothing, don't calculate logits
240 - 'logits': Return logits tensor
241 - 'predictions': Return human-readable predictions
242 token_type_ids: Optional[torch.Tensor]: Binary ids indicating whether a token belongs
243 to sequence A or B. For example, for two sentences:
244 "[CLS] Sentence A [SEP] Sentence B [SEP]", token_type_ids would be
245 [0, 0, ..., 0, 1, ..., 1, 1]. `0` represents tokens from Sentence A,
246 `1` from Sentence B. If not provided, BERT assumes a single sequence input.
247 This parameter gets inferred from the the tokenizer if input is a string or list of strings.
248 Shape is (batch_size, sequence_length).
249 one_zero_attention_mask: Optional[torch.Tensor]: A binary mask which indicates
250 which tokens should be attended to (1) and which should be ignored (0).
251 Primarily used for padding variable-length sentences in a batch.
252 For instance, in a batch with sentences of differing lengths, shorter
253 sentences are padded with 0s on the right. If not provided, the model
254 assumes all tokens should be attended to.
255 This parameter gets inferred from the tokenizer if input is a string or list of strings.
256 Shape is (batch_size, sequence_length).
258 Returns:
259 Optional[torch.Tensor]: Depending on return_type:
260 - None: Returns None if return_type is None
261 - torch.Tensor: Returns logits if return_type is 'logits' (or if return_type is not explicitly provided)
262 - Shape is (batch_size, sequence_length, d_vocab)
263 - str or List[str]: Returns predicted words for masked tokens if return_type is 'predictions'.
264 Returns a list of strings if input is a list of strings, otherwise a single string.
266 Raises:
267 AssertionError: If using string input without a tokenizer
268 """
270 if isinstance(input, str) or isinstance(input, list):
271 assert self.tokenizer is not None, "Must provide a tokenizer if input is a string"
272 tokens, token_type_ids_from_tokenizer, attention_mask = self.to_tokens(input)
274 # If token_type_ids or attention mask are not provided, use the ones from the tokenizer
275 token_type_ids = (
276 token_type_ids_from_tokenizer if token_type_ids is None else token_type_ids
277 )
278 one_zero_attention_mask = (
279 attention_mask if one_zero_attention_mask is None else one_zero_attention_mask
280 )
282 else:
283 tokens = input
285 resid = self.encoder_output(tokens, token_type_ids, one_zero_attention_mask)
287 # MLM requires an unembedding step
288 resid = self.mlm_head(resid)
289 logits = self.unembed(resid)
291 if return_type == "predictions":
292 assert (
293 self.tokenizer is not None
294 ), "Must have a tokenizer to use return_type='predictions'"
295 # Get predictions for masked tokens
296 logprobs = logits[tokens == self.tokenizer.mask_token_id].log_softmax(dim=-1)
297 predictions = self.tokenizer.decode(logprobs.argmax(dim=-1))
299 # If input was a list of strings, split predictions into a list
300 if " " in predictions: 300 ↛ 302line 300 didn't jump to line 302 because the condition on line 300 was never true
301 # Split along space
302 predictions = predictions.split(" ")
303 predictions = [f"Prediction {i}: {p}" for i, p in enumerate(predictions)]
304 return predictions
306 elif return_type == None: 306 ↛ 307line 306 didn't jump to line 307 because the condition on line 306 was never true
307 return None
309 return logits
311 @overload
312 def run_with_cache(
313 self, *model_args: Any, return_cache_object: Literal[True] = True, **kwargs: Any
314 ) -> Tuple[Float[torch.Tensor, "batch pos d_vocab"], ActivationCache]:
315 ...
317 @overload
318 def run_with_cache(
319 self, *model_args: Any, return_cache_object: Literal[False], **kwargs: Any
320 ) -> Tuple[Float[torch.Tensor, "batch pos d_vocab"], Dict[str, torch.Tensor]]:
321 ...
323 def run_with_cache(
324 self,
325 *model_args: Any,
326 return_cache_object: bool = True,
327 remove_batch_dim: bool = False,
328 **kwargs: Any,
329 ) -> Tuple[
330 Float[torch.Tensor, "batch pos d_vocab"],
331 Union[ActivationCache, Dict[str, torch.Tensor]],
332 ]:
333 """
334 Wrapper around run_with_cache in HookedRootModule. If return_cache_object is True, this will return an ActivationCache object, with a bunch of useful HookedTransformer specific methods, otherwise it will return a dictionary of activations as in HookedRootModule. This function was copied directly from HookedTransformer.
335 """
336 out, cache_dict = super().run_with_cache(
337 *model_args, remove_batch_dim=remove_batch_dim, **kwargs
338 )
339 if return_cache_object: 339 ↛ 343line 339 didn't jump to line 343 because the condition on line 339 was always true
340 cache = ActivationCache(cache_dict, self, has_batch_dim=not remove_batch_dim)
341 return out, cache
342 else:
343 return out, cache_dict
345 def to( # type: ignore
346 self,
347 device_or_dtype: Union[torch.device, str, torch.dtype],
348 print_details: bool = True,
349 ):
350 return devices.move_to_and_update_config(self, device_or_dtype, print_details)
352 def cuda(self: T, device: Optional[Union[int, torch.device]] = None) -> T:
353 if isinstance(device, int):
354 return self.to(f"cuda:{device}")
355 elif device is None:
356 return self.to("cuda")
357 else:
358 return self.to(device)
360 def cpu(self: T) -> T:
361 return self.to("cpu")
363 def mps(self: T) -> T:
364 """Warning: MPS may produce silently incorrect results. See #1178."""
365 return self.to(torch.device("mps"))
367 @classmethod
368 def from_pretrained(
369 cls,
370 model_name: str,
371 checkpoint_index: Optional[int] = None,
372 checkpoint_value: Optional[int] = None,
373 hf_model: Optional[Any] = None,
374 device: Optional[str] = None,
375 tokenizer: Optional[Any] = None,
376 move_to_device: bool = True,
377 dtype: torch.dtype = torch.float32,
378 **from_pretrained_kwargs: Any,
379 ) -> HookedEncoder:
380 """Loads in the pretrained weights from huggingface. Currently supports loading weight from HuggingFace BertForMaskedLM. Unlike HookedTransformer, this does not yet do any preprocessing on the model."""
381 logging.warning(
382 "Support for BERT in TransformerLens is currently experimental, until such a time when it has feature "
383 "parity with HookedTransformer and has been tested on real research tasks. Until then, backward "
384 "compatibility is not guaranteed. Please see the docs for information on the limitations of the current "
385 "implementation."
386 "\n"
387 "If using BERT for interpretability research, keep in mind that BERT has some significant architectural "
388 "differences to GPT. For example, LayerNorms are applied *after* the attention and MLP components, meaning "
389 "that the last LayerNorm in a block cannot be folded."
390 )
392 assert not (
393 from_pretrained_kwargs.get("load_in_8bit", False)
394 or from_pretrained_kwargs.get("load_in_4bit", False)
395 ), "Quantization not supported"
397 if "torch_dtype" in from_pretrained_kwargs: 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true
398 dtype = from_pretrained_kwargs["torch_dtype"]
400 official_model_name = loading.get_official_model_name(model_name)
402 cfg = loading.get_pretrained_model_config(
403 official_model_name,
404 checkpoint_index=checkpoint_index,
405 checkpoint_value=checkpoint_value,
406 fold_ln=False,
407 device=device,
408 n_devices=1,
409 dtype=dtype,
410 **from_pretrained_kwargs,
411 )
413 state_dict = loading.get_pretrained_state_dict(
414 official_model_name, cfg, hf_model, dtype=dtype, **from_pretrained_kwargs
415 )
417 model = cls(cfg, tokenizer, move_to_device=False)
419 model.load_state_dict(state_dict, strict=False)
421 if move_to_device: 421 ↛ 424line 421 didn't jump to line 424 because the condition on line 421 was always true
422 model.to(cfg.device)
424 print(f"Loaded pretrained model {model_name} into HookedEncoder")
426 return model
428 @property
429 def W_U(self) -> Float[torch.Tensor, "d_model d_vocab"]:
430 """
431 Convenience to get the unembedding matrix (ie the linear map from the final residual stream to the output logits)
432 """
433 return self.unembed.W_U
435 @property
436 def b_U(self) -> Float[torch.Tensor, "d_vocab"]:
437 """
438 Convenience to get the unembedding bias
439 """
440 return self.unembed.b_U
442 @property
443 def W_E(self) -> Float[torch.Tensor, "d_vocab d_model"]:
444 """
445 Convenience to get the embedding matrix
446 """
447 return self.embed.embed.W_E
449 @property
450 def W_pos(self) -> Float[torch.Tensor, "n_ctx d_model"]:
451 """
452 Convenience function to get the positional embedding. Only works on models with absolute positional embeddings!
453 """
454 return self.embed.pos_embed.W_pos
456 @property
457 def W_E_pos(self) -> Float[torch.Tensor, "d_vocab+n_ctx d_model"]:
458 """
459 Concatenated W_E and W_pos. Used as a full (overcomplete) basis of the input space, useful for full QK and full OV circuits.
460 """
461 return torch.cat([self.W_E, self.W_pos], dim=0)
463 @property
464 def W_K(self) -> Float[torch.Tensor, "n_layers n_heads d_model d_head"]:
465 """Stacks the key weights across all layers"""
466 for block in self.blocks:
467 assert isinstance(block.attn, Attention)
468 return torch.stack([block.attn.W_K for block in self.blocks], dim=0)
470 @property
471 def W_Q(self) -> Float[torch.Tensor, "n_layers n_heads d_model d_head"]:
472 """Stacks the query weights across all layers"""
473 for block in self.blocks:
474 assert isinstance(block.attn, Attention)
475 return torch.stack([block.attn.W_Q for block in self.blocks], dim=0)
477 @property
478 def W_V(self) -> Float[torch.Tensor, "n_layers n_heads d_model d_head"]:
479 """Stacks the value weights across all layers"""
480 for block in self.blocks:
481 assert isinstance(block.attn, Attention)
482 return torch.stack([block.attn.W_V for block in self.blocks], dim=0)
484 @property
485 def W_O(self) -> Float[torch.Tensor, "n_layers n_heads d_head d_model"]:
486 """Stacks the attn output weights across all layers"""
487 for block in self.blocks:
488 assert isinstance(block.attn, Attention)
489 return torch.stack([block.attn.W_O for block in self.blocks], dim=0)
491 @property
492 def W_in(self) -> Float[torch.Tensor, "n_layers d_model d_mlp"]:
493 """Stacks the MLP input weights across all layers"""
494 for block in self.blocks:
495 assert isinstance(block.mlp, MLP)
496 return torch.stack([block.mlp.W_in for block in self.blocks], dim=0)
498 @property
499 def W_out(self) -> Float[torch.Tensor, "n_layers d_mlp d_model"]:
500 """Stacks the MLP output weights across all layers"""
501 for block in self.blocks:
502 assert isinstance(block.mlp, MLP)
503 return torch.stack([block.mlp.W_out for block in self.blocks], dim=0)
505 @property
506 def b_K(self) -> Float[torch.Tensor, "n_layers n_heads d_head"]:
507 """Stacks the key biases across all layers"""
508 for block in self.blocks:
509 assert isinstance(block.attn, Attention)
510 return torch.stack([block.attn.b_K for block in self.blocks], dim=0)
512 @property
513 def b_Q(self) -> Float[torch.Tensor, "n_layers n_heads d_head"]:
514 """Stacks the query biases across all layers"""
515 for block in self.blocks:
516 assert isinstance(block.attn, Attention)
517 return torch.stack([block.attn.b_Q for block in self.blocks], dim=0)
519 @property
520 def b_V(self) -> Float[torch.Tensor, "n_layers n_heads d_head"]:
521 """Stacks the value biases across all layers"""
522 for block in self.blocks:
523 assert isinstance(block.attn, Attention)
524 return torch.stack([block.attn.b_V for block in self.blocks], dim=0)
526 @property
527 def b_O(self) -> Float[torch.Tensor, "n_layers d_model"]:
528 """Stacks the attn output biases across all layers"""
529 for block in self.blocks:
530 assert isinstance(block.attn, Attention)
531 return torch.stack([block.attn.b_O for block in self.blocks], dim=0)
533 @property
534 def b_in(self) -> Float[torch.Tensor, "n_layers d_mlp"]:
535 """Stacks the MLP input biases across all layers"""
536 for block in self.blocks:
537 assert isinstance(block.mlp, MLP)
538 return torch.stack([block.mlp.b_in for block in self.blocks], dim=0)
540 @property
541 def b_out(self) -> Float[torch.Tensor, "n_layers d_model"]:
542 """Stacks the MLP output biases across all layers"""
543 for block in self.blocks:
544 assert isinstance(block.mlp, MLP)
545 return torch.stack([block.mlp.b_out for block in self.blocks], dim=0)
547 @property
548 def QK(self) -> FactoredMatrix: # [n_layers, n_heads, d_model, d_model]
549 """Returns a FactoredMatrix object with the product of the Q and K matrices for each layer and head.
550 Useful for visualizing attention patterns."""
551 return FactoredMatrix(self.W_Q, self.W_K.transpose(-2, -1))
553 @property
554 def OV(self) -> FactoredMatrix: # [n_layers, n_heads, d_model, d_model]
555 """Returns a FactoredMatrix object with the product of the O and V matrices for each layer and head."""
556 return FactoredMatrix(self.W_V, self.W_O)
558 def all_head_labels(self) -> List[str]:
559 """Returns a list of strings with the format "L{l}H{h}", where l is the layer index and h is the head index."""
560 return [f"L{l}H{h}" for l in range(self.cfg.n_layers) for h in range(self.cfg.n_heads)]