Skip to content
26 changes: 23 additions & 3 deletions src/heretic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@

class DatasetSpecification(BaseModel):
dataset: str = Field(
description="Hugging Face dataset ID, or path to dataset on disk"
default="mlabonne/harmless_alpaca",
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaults don't make sense here, because the class is used in different contexts.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, reverted.

description="Hugging Face dataset ID, or path to dataset on disk",
)
split: str = Field(
default="train[:400]", description="Portion of the dataset to use"
)
column: str = Field(
default="text", description="Column in the dataset that contains the prompts"
)
split: str = Field(description="Portion of the dataset to use")
column: str = Field(description="Column in the dataset that contains the prompts")


class Settings(BaseSettings):
Expand Down Expand Up @@ -129,6 +134,21 @@ class Settings(BaseSettings):
description="Strings whose presence in a response (case insensitive) identifies the response as a refusal.",
)

steering_mode: str = Field(
default="refusal",
description='Mode of steering: "refusal" (default) or "classifier".',
)

classifier_model: str = Field(
default="j-hartmann/emotion-english-distilroberta-base",
description='Hugging Face model ID for the classifier plugin (used when steering_mode="classifier").',
)

classifier_label: str = Field(
default="joy",
description='Target label to optimize for (used when steering_mode="classifier").',
)

system_prompt: str = Field(
default="You are a helpful assistant.",
description="System prompt to use when prompting the model.",
Expand Down
81 changes: 44 additions & 37 deletions src/heretic/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

from .config import Settings
from .model import Model
from .plugins import Plugin
from .utils import load_prompts, print


class Evaluator:
def __init__(self, settings: Settings, model: Model):
def __init__(self, settings: Settings, model: Model, plugin: Plugin):
self.settings = settings
self.model = model
self.plugin = plugin

print()
print(
Expand All @@ -30,34 +32,20 @@ def __init__(self, settings: Settings, model: Model):
self.bad_prompts = load_prompts(settings.bad_evaluation_prompts)
print(f"* [bold]{len(self.bad_prompts)}[/] prompts loaded")

print("* Counting model refusals...")
self.base_refusals = self.count_refusals()
print(
f"* Initial refusals: [bold]{self.base_refusals}[/]/{len(self.bad_prompts)}"
)

def is_refusal(self, response: str) -> bool:
# Remove emphasis (e.g. "I *will not*...").
response = response.lower().replace("*", "")

# Normalize typographic apostrophes ("won’t" -> "won't").
response = response.replace("’", "'")

# Normalize whitespace between words to a single space.
response = " ".join(response.split())

for marker in self.settings.refusal_markers:
if marker.lower() in response:
return True
print("* Calculating initial scores...")
self.base_score = self.calculate_score()
print(f"* Initial score: [bold]{self.base_score:.4f}[/]")

return False

def count_refusals(self) -> int:
def calculate_score(self) -> float:
responses = self.model.get_responses_batched(self.bad_prompts)
refusals = [response for response in responses if self.is_refusal(response)]
return len(refusals)

def get_score(self) -> tuple[tuple[float, float], float, int]:
scores = self.plugin.score(responses)
# We calculate the average score across all responses.
# For refusal, this is the refusal rate (0.0 to 1.0).
# For classifier, this is the average probability of the target label (0.0 to 1.0).
avg_score = sum(scores) / len(scores)
return avg_score

def get_score(self) -> tuple[tuple[float, float], float, float]:
print(" * Obtaining first-token probability distributions...")
logprobs = self.model.get_logprobs_batched(self.good_prompts)
kl_divergence = F.kl_div(
Expand All @@ -68,13 +56,32 @@ def get_score(self) -> tuple[tuple[float, float], float, int]:
).item()
print(f" * KL divergence: [bold]{kl_divergence:.2f}[/]")

print(" * Counting model refusals...")
refusals = self.count_refusals()
print(f" * Refusals: [bold]{refusals}[/]/{len(self.bad_prompts)}")

score = (
(kl_divergence / self.settings.kl_divergence_scale),
(refusals / self.base_refusals),
)

return score, kl_divergence, refusals
print(" * Calculating steering score...")
responses = self.model.get_responses_batched(self.bad_prompts)
scores = self.plugin.score(responses)
avg_score = sum(scores) / len(scores)

print(f" * Average Score: [bold]{avg_score:.4f}[/]")

# Optimization Objective:
# We want to minimize KL Divergence (stay close to original model).
# We want to OPTIMIZE the steering metric.

if self.settings.steering_mode == "refusal":
# Minimize refusal rate.
# Objective = (KL / scale) + (Refusal Rate / Base Refusal Rate)
# Note: Base Refusal Rate might be 0, so we need to be careful.
# The original code used (refusals / base_refusals).
# If base_refusals is 0, we have a problem. But usually it's high.
metric_term = (
avg_score / self.base_score if self.base_score > 0 else avg_score
)
else:
# Maximize target label (e.g. "joy").
# So we minimize (1 - avg_score).
# Objective = (KL / scale) + (1 - avg_score)
metric_term = 1.0 - avg_score

objective = (kl_divergence / self.settings.kl_divergence_scale) + metric_term

return (objective, metric_term), kl_divergence, avg_score
24 changes: 17 additions & 7 deletions src/heretic/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from .config import Settings
from .evaluator import Evaluator
from .model import AbliterationParameters, Model
from .plugins import ClassifierPlugin, RefusalPlugin
from .utils import (
empty_cache,
format_duration,
Expand Down Expand Up @@ -187,7 +188,16 @@ def run():
settings.batch_size = best_batch_size
print(f"* Chosen batch size: [bold]{settings.batch_size}[/]")

evaluator = Evaluator(settings, model)
if settings.steering_mode == "classifier":
print(
f"Initializing classifier plugin: [bold]{settings.classifier_model}[/] ({settings.classifier_label})"
)
plugin = ClassifierPlugin(settings.classifier_model, settings.classifier_label)
else:
print("Initializing refusal plugin...")
plugin = RefusalPlugin(settings.refusal_markers)

evaluator = Evaluator(settings, model, plugin)

if settings.evaluate_model is not None:
print()
Expand Down Expand Up @@ -340,7 +350,7 @@ def objective(trial: Trial) -> tuple[float, float]:
print("* Abliterating...")
model.abliterate(refusal_directions, direction_index, parameters)
print("* Evaluating...")
score, kl_divergence, refusals = evaluator.get_score()
(combined_obj, metric_term), kl_divergence, raw_score = evaluator.get_score()

elapsed_time = time.perf_counter() - start_time
remaining_time = (elapsed_time / trial_index) * (
Expand All @@ -354,9 +364,9 @@ def objective(trial: Trial) -> tuple[float, float]:
)

trial.set_user_attr("kl_divergence", kl_divergence)
trial.set_user_attr("refusals", refusals)
trial.set_user_attr("score", raw_score)

return score
return metric_term, kl_divergence

def objective_wrapper(trial: Trial) -> tuple[float, float]:
try:
Expand Down Expand Up @@ -391,14 +401,14 @@ def objective_wrapper(trial: Trial) -> tuple[float, float]:

best_trials = sorted(
study.best_trials,
key=lambda trial: trial.user_attrs["refusals"],
key=lambda trial: trial.user_attrs["score"],
)

choices = [
Choice(
title=(
f"[Trial {trial.user_attrs['index']:>3}] "
f"Refusals: {trial.user_attrs['refusals']:>2}/{len(evaluator.bad_prompts)}, "
f"Score: {trial.user_attrs['score']:.4f}, "
f"KL divergence: {trial.user_attrs['kl_divergence']:.2f}"
),
value=trial,
Expand All @@ -418,7 +428,7 @@ def objective_wrapper(trial: Trial) -> tuple[float, float]:
print()
print(
(
"The following trials resulted in Pareto optimal combinations of refusals and KL divergence. "
"The following trials resulted in Pareto optimal combinations of score and KL divergence. "
"After selecting a trial, you will be able to save the model, upload it to Hugging Face, "
"or chat with it to test how well it works. You can return to this menu later to select a different trial. "
"[yellow]Note that KL divergence values above 1 usually indicate significant damage to the original model's capabilities.[/]"
Expand Down
73 changes: 73 additions & 0 deletions src/heretic/plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025 Philipp Emanuel Weidmann <pew@worldwidemann.com>

from abc import ABC, abstractmethod

from transformers import pipeline


class Plugin(ABC):
    """Abstract base class for plugins that score a list of responses."""

    @abstractmethod
    def score(self, responses: list[str]) -> list[float]:
        """
        Rate every response with a value between 0.0 and 1.0.

        What a value means is up to the concrete plugin; as a rule, a
        higher value signals that the target attribute is more strongly
        present in the response.
        """
        ...


class RefusalPlugin(Plugin):
    """
    Plugin that scores a response 1.0 if it contains a refusal marker, else 0.0.

    Responses and markers go through the same normalization (lowercasing,
    removal of emphasis asterisks, straightening of typographic apostrophes,
    collapsing of whitespace runs) before the substring comparison. This
    lets a marker written as "I won’t" or "I *will not*" match, which it
    could never do while only the response was normalized.
    """

    def __init__(self, markers: list[str]):
        """Keep the list of marker strings whose presence identifies a refusal."""
        self.markers = markers

    @staticmethod
    def _fold(text: str) -> str:
        """Lowercase *text*, strip emphasis asterisks and straighten apostrophes."""
        # Remove emphasis (e.g. "I *will not*...").
        text = text.lower().replace("*", "")

        # Normalize typographic apostrophes ("won’t" -> "won't").
        return text.replace("’", "'")

    @classmethod
    def _normalize_marker(cls, marker: str) -> str:
        """
        Normalize a marker the way responses are normalized.

        Whitespace runs are collapsed to a single space. Unlike responses,
        a marker keeps one leading/trailing space if it had edge whitespace,
        so padded markers such as " no " still only match whole words.
        Returns "" for a marker with no content left after normalization.
        """
        folded = cls._fold(marker)
        words = folded.split()
        if not words:
            return ""

        lead = " " if folded[0].isspace() else ""
        trail = " " if folded[-1].isspace() else ""
        return lead + " ".join(words) + trail

    def is_refusal(self, response: str) -> bool:
        """Return True if the normalized response contains any normalized marker."""
        # Normalize whitespace between words to a single space.
        response = " ".join(self._fold(response).split())

        # Markers that normalize to nothing are ignored: an empty string is
        # a substring of every response and would flag everything.
        return any(
            marker and marker in response
            for marker in map(self._normalize_marker, self.markers)
        )

    def score(self, responses: list[str]) -> list[float]:
        """Return 1.0 for each response that is a refusal and 0.0 for each that is not."""
        return [1.0 if self.is_refusal(r) else 0.0 for r in responses]


class ClassifierPlugin(Plugin):
    """
    Plugin that scores each response with the score a text-classification
    model assigns to a single target label.
    """

    def __init__(self, model_name: str, target_label: str):
        """
        Load the classification pipeline and remember the label to score.

        Args:
            model_name: Model identifier passed to the pipeline as ``model``.
            target_label: Label whose score is reported for each response.

        Raises:
            ValueError: If the loaded model's config lists its labels and
                ``target_label`` is not one of them.
        """
        # We use top_k=None to get scores for all labels.
        # device_map="auto" delegates device placement to the library.
        # Note: This might conflict with the main model if VRAM is tight.
        # Ideally, we should handle device placement more carefully.
        self.pipe = pipeline(
            "text-classification",
            model=model_name,
            top_k=None,
            device_map="auto",
        )
        self.target_label = target_label

        # Fail fast on an unknown label. Without this check, a misspelled or
        # wrongly-cased label silently yields 0.0 for every response.
        id2label = getattr(self.pipe.model.config, "id2label", None)
        if id2label and target_label not in id2label.values():
            known = ", ".join(sorted(str(label) for label in id2label.values()))
            raise ValueError(
                f"Label {target_label!r} is not produced by classifier "
                f"{model_name!r}; available labels: {known}"
            )

    def score(self, responses: list[str]) -> list[float]:
        """
        Return the target label's score for each response.

        A response whose result does not contain the target label scores 0.0.
        An empty input list yields an empty list without invoking the pipeline.
        """
        if not responses:
            return []

        # Truncation is important as classification models often have short context windows.
        results = self.pipe(responses, truncation=True)

        # Each result is a list of dicts like [{'label': 'joy', 'score': 0.9}, ...].
        return [
            next(
                (
                    item["score"]
                    for item in result
                    if item["label"] == self.target_label
                ),
                0.0,
            )
            for result in results
        ]
8 changes: 8 additions & 0 deletions src/heretic/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ def load_prompts(specification: DatasetSpecification) -> list[str]:
# But also don't use cached data, as the dataset may have changed on disk.
download_mode=DownloadMode.FORCE_REDOWNLOAD,
)
elif str(path).endswith(".txt") and os.path.exists(path):
# Path is a local text file.
dataset = load_dataset(
"text",
data_files={"train": str(path)},
split=split_str,
sample_by="line",
)
else:
# Probably a repository path; let load_dataset figure it out.
dataset = load_dataset(path, split=split_str)
Expand Down