Source code for lm_polygraph.utils.dataset

import os
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from datasets import load_dataset, Dataset as hf_dataset

from typing import Iterable, Tuple, List, Union, Optional


[docs]class Dataset: """ Seq2seq dataset for calculating quality of uncertainty estimation method. """ def __init__(self, x: List[str], y: List[str], batch_size: int): """ Parameters: x (List[str]): a list of input texts. y (List[str]): a list of output (target) texts. Must have the same length as `x`. batch_size (int): the size of the texts batch. """ self.x = x self.y = y self.batch_size = batch_size def __iter__(self) -> Iterable[Tuple[List[str], List[str]]]: """ Returns: Iterable[Tuple[List[str], List[str]]]: iterates over batches in dataset, returns list of input texts and list of corresponding output texts. """ for i in range(0, len(self.x), self.batch_size): yield ( self.x[i : i + self.batch_size], self.y[i : i + self.batch_size], ) def __len__(self) -> int: """ Returns: int: number of batches in the dataset. """ return (len(self.x) + self.batch_size - 1) // self.batch_size
[docs] def select(self, indices: List[int]): """ Shrinks the dataset down to only texts with the specified index. Parameters: indices (List[int]): indices to left in the dataset.Must have the same length as input texts. """ self.x = [self.x[i] for i in indices] self.y = [self.y[i] for i in indices] return self
[docs] def train_test_split(self, test_size: int, seed: int, split: str = "train"): """ Samples dataset into train and test parts. Parameters: test_size (int): size of test dataset, seed (int): seed to perform random splitting with, split (str): either 'train' or 'test'. If 'train', lefts only train data in the current dataset object. If 'test', left only test data. Default: 'train'. Returns: Tuple[List[str], List[str], List[str], List[str]]: train input and target texts list, test input and target texts list. """ X_train, X_test, y_train, y_test = train_test_split( np.array(self.x), np.array(self.y), test_size=test_size, random_state=seed, ) if split == "train": self.x = X_train.tolist() self.y = y_train.tolist() else: self.x = X_test.tolist() self.y = y_test.tolist() return ( X_train.tolist(), X_test.tolist(), y_train.tolist(), y_test.tolist(), )
[docs] def subsample(self, size: int, seed: int): """ Subsamples the dataset to the provided size. Parameters: size (int): size of the resulting dataset, seed (int): seed to perform random subsampling with. """ np.random.seed(seed) if len(self.x) < size: indices = list(range(len(self.x))) else: if size < 1: size = int(size * len(self.x)) indices = np.random.choice(len(self.x), size, replace=False) self.select(indices)
[docs] @staticmethod def from_csv( csv_path: str, x_column: str, y_column: str, batch_size: int, prompt: str = "", **kwargs, ): """ Creates the dataset from .CSV table. Parameters: csv_path (str): path to .csv table, x_column (str): name of column to take input texts from, y_column (str): name of column to take target texts from, batch_size (int): the size of the texts batch. """ csv = pd.read_csv(csv_path) x = csv[x_column].tolist() y = csv[y_column].tolist() if len(prompt): x = [prompt.format(text=text) for text in x] return Dataset(x, y, batch_size)
[docs] @staticmethod def load_hf_dataset( path: Union[str, List[str]], split: str, **kwargs, ): load_from_disk = kwargs.pop("load_from_disk", False) if load_from_disk: dataset_name = path dataset = hf_dataset.load_from_disk(path) elif isinstance(path, str): dataset_name = path dataset = load_dataset(path, split=split, **kwargs) else: dataset_name = path[0] dataset = load_dataset(*path, split=split, **kwargs) return dataset_name, dataset
[docs] @staticmethod def from_datasets( dataset_path: Union[str, List[str]], x_column: str, y_column: str, batch_size: int, prompt: str = "", description: str = "", mmlu_max_subject_size: int = 100, n_shot: int = 0, few_shot_split: str = "train", few_shot_prompt: Optional[str] = None, instruct: bool = False, split: str = "test", size: int = None, **kwargs, ): """ Creates the dataset from Huggingface datasets. Parameters: dataset_path (str): HF path to dataset, x_column (str): name of column to take input texts from, y_column (str): name of column to take target texts from, batch_size (int): the size of the texts batch, prompt (str): prompt template to use for input texts (default: ''), split (str): dataset split to take data from (default: 'text'), size (Optional[int]): size to subsample dataset to. If None, the full dataset split will be taken. Default: None. """ dataset_name, dataset = Dataset.load_hf_dataset(dataset_path, split, **kwargs) few_shot_dataset = None if n_shot > 0: _, few_shot_dataset = Dataset.load_hf_dataset( dataset_path, few_shot_split, **kwargs ) if size is not None and size < len(dataset): dataset = dataset.select(range(size)) if "translation" in dataset.column_names: x, y = [], [] source_lang = ( "German" if x_column == "de" else "French" if x_column == "fr" else "English" ) target_lang = ( "German" if y_column == "de" else "French" if y_column == "fr" else "English" ) for inst in dataset["translation"]: x.append( prompt.format( source_lang=source_lang, target_lang=target_lang, text=inst[x_column], ) ) y.append(inst[y_column]) elif ("xsum" in dataset_name.lower()) and len(prompt): x, y = [], [] for inst in dataset: x.append(prompt.format(text=inst[x_column])) y.append(inst[y_column]) elif ("wiki" in dataset_name.lower()) and len(prompt): x, y = [], [] for sample in dataset[x_column]: x.append(prompt.format(context=sample["context"].strip())) y.append("") elif "person" in dataset_name.lower(): x = dataset[x_column] if len(prompt): for i in range(len(x)): x[i] = prompt.format(text=x[i]) y = [] for _ in x: y.append("") elif ("coqa" in dataset_name.lower()) and len(prompt): def doc_to_text(doc, prompt, i=0): # Given a passage p, the conversation history {q1, a1, . . . qi−1, ai−1} # and a question qi, the task is to predict the answer ai doc_text = "" for q, a in zip(doc["questions"][:i], doc["answers"]["input_text"][:i]): doc_text += "\n\n" + prompt.format(question=q, answer=a) return doc_text x, y = [], [] for inst in dataset: formatted_description = description.format(story=inst["story"]) for j, (question, answer) in enumerate( zip(inst[x_column], inst[y_column]["input_text"]) ): if instruct: assert ( few_shot_prompt is not None ), "separate few_shot_prompt must be provided for instruction mode." few_shot_section = doc_to_text(inst, few_shot_prompt, j) if few_shot_section != "": few_shot_section = ( "\n\nHere are a few examples of questions and answers:" + few_shot_section + "\n\nNow answer the following question in the same format.\n\n" ) else: few_shot_section = "\n\n" else: few_shot_section = doc_to_text(inst, prompt, j) + "\n\n" formatted_prompt = ( formatted_description + few_shot_section + prompt.format( question=question, answer="", ) ) x.append(formatted_prompt) y.append(answer) elif ("babi_qa" in dataset_name.lower()) and len(prompt): x, y = [], [] for inst in dataset: inst = inst["story"] context = "" for text, answer in zip(inst[x_column], inst[y_column]): if answer == "": context += text + " " else: x.append(prompt.format(context=context.strip(), question=text)) y.append(answer) elif ("mmlu" in dataset_name.lower()) and len(prompt): answers = ["A", "B", "C", "D"] subjects = np.array(dataset["subject"]) few_shot_subjects = np.array(few_shot_dataset["subject"]) x, y = [], [] for subject in np.unique(subjects): formatted_description = description.format( subject=subject.replace("_", " ") ) if n_shot > 0: few_shot_subject = few_shot_dataset.select( np.argwhere(few_shot_subjects == subject).flatten() ) few_shot_ids = np.random.choice( len(few_shot_subject), n_shot, replace=False ) few_shot_data = few_shot_subject.select(few_shot_ids) if instruct: assert ( few_shot_prompt is not None ), "separate few_shot_prompt must be provided for instruction mode." formatted_few_shot_prompt = ( "Here are a few examples of questions and answers:\n\n" ) for inst in few_shot_data: formatted_few_shot_prompt += ( few_shot_prompt.format( choices=inst["choices"], question=inst["question"].strip(), answer=answers[inst["answer"]], ) + "\n\n" ) formatted_few_shot_prompt += ( "Now answer the following question in the same format:\n\n" ) else: formatted_few_shot_prompt = "" for inst in few_shot_data: formatted_few_shot_prompt += ( prompt.format( choices=inst["choices"], question=inst["question"].strip(), answer=answers[inst["answer"]], ) + "\n" ) subject_data = dataset.select( np.argwhere(subjects == subject).flatten() ) if len(subject_data) > mmlu_max_subject_size: subject_data = subject_data.select(range(mmlu_max_subject_size)) for inst in subject_data: formatted_prompt = prompt.format( choices=inst["choices"], question=inst["question"].strip(), answer="", ) x.append( formatted_description + "\n\n" + formatted_few_shot_prompt + formatted_prompt ) y.append(answers[inst[y_column]]) elif ("gsm8k" in dataset_name.lower()) and len(prompt): x, y = [], [] for inst in dataset: x.append(prompt.format(question=inst[x_column])) y.append(inst[y_column]) elif ("trivia_qa" in dataset_name.lower()) and len(prompt): x, y = [], [] formatted_few_shot_prompt = description if n_shot > 0: few_shot_ids = np.random.choice( len(few_shot_dataset), n_shot, replace=False ) few_shot_data = few_shot_dataset.select(few_shot_ids) if instruct: assert ( few_shot_prompt is not None ), "separate few_shot_prompt must be provided for instruction mode." formatted_few_shot_prompt += ( "\n\nHere are a few examples of questions and answers:\n\n" ) for inst in few_shot_data: formatted_few_shot_prompt += ( few_shot_prompt.format( question=inst["question"].strip(), answer=inst["answer"]["normalized_value"], ) + "\n\n" ) formatted_few_shot_prompt += ( "Now answer the following question in the same format:\n\n" ) else: formatted_few_shot_prompt = "" for inst in few_shot_data: formatted_few_shot_prompt += ( prompt.format( question=inst["question"].strip(), answer=inst["answer"]["normalized_value"], ) + "\n" ) else: formatted_few_shot_prompt += "\n" for inst in dataset: if instruct: x.append( formatted_few_shot_prompt + prompt.format(question=inst["question"]) ) else: x.append( formatted_few_shot_prompt + prompt.format(question=inst["question"], answer="") ) y.append([alias for alias in inst["answer"]["aliases"]]) elif "allenai/c4" in dataset_name.lower(): x, y = [], [] for inst in dataset: if len(inst[x_column]) <= 1024: x.append(inst[x_column]) y.append(inst[y_column]) else: x = dataset[x_column] y = dataset[y_column] return Dataset(x, y, batch_size)
[docs] @staticmethod def load(path_or_path_and_files: Union[str, List[str]], *args, **kwargs): """ Creates the dataset from either local .csv path (if such exists) or Huggingface datasets. See `from_csv` and `from_datasets` static functions for the description of *args and **kwargs arguments. Parameters: path_or_path_and_files (str or List[str]): local path to .csv table or HF path to dataset. """ if isinstance(path_or_path_and_files, str) and os.path.isfile( path_or_path_and_files ): return Dataset.from_csv(path_or_path_and_files, *args, **kwargs) return Dataset.from_datasets(path_or_path_and_files, *args, **kwargs)