import traceback
import logging
from typing import List, Dict
import numpy as np
import torch
import torch.nn as nn
from transformers import BartTokenizer, BartForConditionalGeneration
from .generation_metric import GenerationMetric
log = logging.getLogger(__name__)
SCORE_TYPES = ["rh"]
[docs]class BartScoreSeqMetric(GenerationMetric):
"""
Calculates BARTScore metric (https://arxiv.org/abs/2106.11520)
between model-generated texts and ground truth texts.
"""
def __init__(
self,
score_type: str = "rh",
device=None,
max_length=256,
checkpoint="facebook/bart-large-cnn",
):
assert score_type in SCORE_TYPES
self.score_type = score_type
self.model = None
self.max_length = max_length
self.checkpoint = checkpoint
self.device = device
if device is None:
self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
self.tokenizer = None
self.model = None
self.loss_fct = None
self.lsm = None
super().__init__(["greedy_texts", "input_texts"], "sequence")
def __str__(self):
return "BARTScoreSeq-" + self.score_type
def _setup(self):
self.tokenizer = BartTokenizer.from_pretrained(self.checkpoint)
self.model = BartForConditionalGeneration.from_pretrained(self.checkpoint)
self.model.eval()
self.model.to(self.device)
# Set up loss
self.loss_fct = nn.NLLLoss(
reduction="none", ignore_index=self.model.config.pad_token_id
)
self.lsm = nn.LogSoftmax(dim=1)
[docs] def load(self, path=None):
"""Load model from paraphrase finetuning"""
if path is None:
path = "models/bart.pth"
if self.model is None:
self._setup()
self.model.load_state_dict(torch.load(path, map_location=self.device))
[docs] def score(self, srcs, tgts, batch_size=4):
"""Score a batch of examples"""
if self.model is None:
self._setup()
score_list = []
for i in range(0, len(srcs), batch_size):
src_list = srcs[i : i + batch_size]
tgt_list = tgts[i : i + batch_size]
try:
with torch.no_grad():
encoded_src = self.tokenizer(
src_list,
max_length=self.max_length,
truncation=True,
padding=True,
return_tensors="pt",
)
encoded_tgt = self.tokenizer(
tgt_list,
max_length=self.max_length,
truncation=True,
padding=True,
return_tensors="pt",
)
src_tokens = encoded_src["input_ids"].to(self.device)
src_mask = encoded_src["attention_mask"].to(self.device)
tgt_tokens = encoded_tgt["input_ids"].to(self.device)
tgt_mask = encoded_tgt["attention_mask"]
tgt_len = tgt_mask.sum(dim=1).to(self.device)
output = self.model(
input_ids=src_tokens, attention_mask=src_mask, labels=tgt_tokens
)
logits = output.logits.view(-1, self.model.config.vocab_size)
loss = self.loss_fct(self.lsm(logits), tgt_tokens.view(-1))
loss = loss.view(tgt_tokens.shape[0], -1)
loss = loss.sum(dim=1) / tgt_len
curr_score_list = [-x.item() for x in loss]
score_list += curr_score_list
except RuntimeError:
traceback.print_exc()
log.error(f"source: {src_list}")
log.error(f"target: {tgt_list}")
exit(0)
return score_list
[docs] def test(self, batch_size=3):
"""Test"""
if self.model is None:
self._setup()
src_list = [
"This is a very good idea. Although simple, but very insightful.",
"Can I take a look?",
"Do not trust him, he is a liar.",
]
tgt_list = ["That's stupid.", "What's the problem?", "He is trustworthy."]
log.info(self.score(src_list, tgt_list, batch_size))
def __call__(
self,
stats: Dict[str, np.ndarray],
target_texts: List[str],
) -> np.ndarray:
"""
Calculates BARTScore(https://arxiv.org/abs/2106.11520) between
stats['greedy_texts'] and target_texts.
Parameters:
stats (Dict[str, np.ndarray]): input statistics, which for multiple samples includes:
* model-generated texts in 'greedy_texts'
target_texts (List[str]): ground-truth texts
Returns:
np.ndarray: list of BART Scores for each sample in input.
"""
if self.model is None:
self._setup()
scores = self.score(stats["greedy_texts"], target_texts)
return np.array(scores)