"""Training script: fit a ``LitModule`` with PyTorch Lightning.

The script tokenizes JSON/JSONL text corpora into fixed-size blocks and also
defines a small synthetic integer dataset (``SpecialDataset``), which is what
the dataloaders are currently built from.
"""

import argparse
from functools import partial
from itertools import chain
from typing import Dict, Tuple

import datasets
import pytorch_lightning as pl
import torch
from torch.utils.data import ConcatDataset, DataLoader, Dataset
from transformers import (
    BatchEncoding,
    DefaultDataCollator,
    PreTrainedTokenizer,
    set_seed,
)

from modelscope import snapshot_download
from lit_module import LitModule
from tokenization_qwen import QWenTokenizer

# ---------------------------------------------------------------------------
# Run configuration (plain module-level constants; no CLI parsing is wired up).
# ---------------------------------------------------------------------------
model_name = "qwen/Qwen-1_8B-Chat"
learning_rate = 0.0001
use_tril_attention_mask = None
precision = "32-true"  # one of: "bf16-mixed", "16-mixed", "32-true"
tokenizer_name_or_path = None
# Single shard of the corpus. To use the whole directory instead, set this to
# ["/home/colin/develop/dataset/liwu/MNBVC/wiki"].
dataset_name = ["/home/colin/develop/dataset/liwu/MNBVC/wiki/20230198/58.jsonl.gz"]
train_batch_size = 256
val_batch_size = 16
num_proc = 8
max_epochs = 1000
strategy = "fsdp"
resume_from_ckpt_path = None
seed = 42


class SpecialDataset(Dataset):
    """Synthetic dataset whose rows are ``[a, 2a, 3a, 4a]``.

    ``a`` is sampled with ``torch.randint(start, end, [size])``, i.e. from the
    half-open interval ``[start, end)``.

    Args:
        start: Inclusive lower bound for the sampled base value.
        end: Exclusive upper bound for the sampled base value.
        size: Number of rows to generate.
    """

    def __init__(self, start, end, size=65536):
        self.size = size
        self.features = []
        base = torch.randint(start, end, [size])
        # Stack the four multiples as rows, then transpose to (size, 4).
        self.data = torch.stack([base, base * 2, base * 3, base * 4]).permute(1, 0)

    def __len__(self):
        """Return the number of rows."""
        return self.size

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors.

        ``input_ids`` and ``labels`` are the same tensor object (the row at
        ``idx``); ``token_type_ids`` is an all-zero tensor of the same shape.
        """
        row = self.data[idx]
        # NOTE(review): torch.zeros defaults to a floating dtype here; confirm
        # LitModule does not require integer token_type_ids.
        return {
            "input_ids": row,
            "labels": row,
            "token_type_ids": torch.zeros(row.shape),
        }


def split_raw_dataset(
    raw_dataset: datasets.DatasetDict,
) -> Tuple[datasets.Dataset, datasets.Dataset]:
    """Return ``(train, validation)`` splits from ``raw_dataset``.

    If a ``"validation"`` split exists it is used as-is; otherwise 5% of the
    ``"train"`` split is held out, using the module-level ``seed``.
    """
    if "validation" in raw_dataset:
        return raw_dataset["train"], raw_dataset["validation"]

    held_out = raw_dataset["train"].train_test_split(test_size=0.05, seed=seed)
    return held_out["train"], held_out["test"]


def process_dataset(dataset: datasets.Dataset, tokenizer: PreTrainedTokenizer) -> datasets.Dataset:
    """Turn raw records into fixed-size token blocks with labels.

    Three ``Dataset.map`` passes are applied in order:

    1. ``format_inputs`` - merge each record's paragraph contents into ``text``.
    2. ``tokenize_inputs`` - run ``tokenizer`` over the ``text`` column.
    3. ``group_texts`` - concatenate the batch and cut it into blocks of
       ``tokenizer.model_max_length`` tokens, copying ``input_ids`` to ``labels``.

    Args:
        dataset: Records with a "段落" field holding items that each have a
            "内容" string (as read by ``format_inputs``).
        tokenizer: Callable tokenizer exposing ``model_max_length``.

    Returns:
        The mapped dataset.
    """

    def group_texts(examples: Dict[str, list], block_size: int = 512) -> BatchEncoding:
        """Concatenate every column of the batch and split it into blocks."""
        concatenated = {
            key: list(chain.from_iterable(examples[key])) for key in examples
        }
        # All columns are assumed to have the same length; measure the first.
        total_length = len(next(iter(concatenated.values())))
        # Drop the trailing remainder that does not fill a whole block.
        total_length = (total_length // block_size) * block_size
        blocks = {
            key: [
                tokens[i : i + block_size]
                for i in range(0, total_length, block_size)
            ]
            for key, tokens in concatenated.items()
        }
        blocks["labels"] = blocks["input_ids"].copy()
        return BatchEncoding(blocks)

    def format_inputs(examples):
        """Join the "内容" of every paragraph, each followed by a newline."""
        merged = "".join(line["内容"] + "\n" for line in examples["段落"])
        return {"text": merged}

    def tokenize_inputs(
        examples: Dict[str, list],
        tokenizer: PreTrainedTokenizer,
        column_name: str = "text",
    ) -> BatchEncoding:
        """Tokenize ``examples[column_name]`` without attention masks."""
        encoded = tokenizer(examples[column_name], return_attention_mask=False)
        return encoded

    # Pass 1: raw records -> {"text": ...}; drop every original column.
    dataset = dataset.map(
        format_inputs,
        batched=False,
        num_proc=num_proc,
        remove_columns=list(dataset.features),
    )
    # Pass 2: text -> token columns; drop the text column.
    dataset = dataset.map(
        partial(tokenize_inputs, tokenizer=tokenizer),
        batched=True,
        num_proc=num_proc,
        remove_columns=list(dataset.features),
    )
    # Pass 3: regroup tokens into fixed-size blocks.
    dataset = dataset.map(
        partial(group_texts, block_size=tokenizer.model_max_length),
        batched=True,
        num_proc=num_proc,
    )
    return dataset


if __name__ == "__main__":
    if tokenizer_name_or_path is None:
        tokenizer_name_or_path = model_name

    set_seed(seed)

    # lightning module
    model_dir = snapshot_download(model_name)
    lit_module = LitModule(model_dir, learning_rate, use_tril_attention_mask)

    tokenizer = QWenTokenizer("./wit_b64.tiktoken", "./wit_char.tiktoken")

    train_dataset_list = []
    val_dataset_list = []
    for dn in dataset_name:
        # JSON / gzipped JSONL files go through the generic "json" builder;
        # anything else is handed to load_dataset as a dataset name or path.
        if dn.endswith((".jsonl.gz", ".json")):
            raw_dataset = datasets.load_dataset("json", data_files=dn)
        else:
            raw_dataset = datasets.load_dataset(dn)
        train_dataset, val_dataset = split_raw_dataset(raw_dataset)
        train_dataset_list.append(process_dataset(train_dataset, tokenizer))
        val_dataset_list.append(process_dataset(val_dataset, tokenizer))
    train_dataset = ConcatDataset(train_dataset_list)
    val_dataset = ConcatDataset(val_dataset_list)

    # NOTE(review): the processed corpora above are replaced by synthetic data
    # here, so training currently runs on SpecialDataset only. Confirm this is
    # intended before relying on the text pipeline.
    train_dataset = SpecialDataset(0, 1000, 65536)
    val_dataset = SpecialDataset(1000, 1024, 1024)

    train_dataloader = DataLoader(
        train_dataset,
        batch_size=train_batch_size,
        num_workers=num_proc,
        collate_fn=DefaultDataCollator(),
        persistent_workers=True,
        shuffle=True,
    )
    val_dataloader = DataLoader(
        val_dataset,
        batch_size=val_batch_size,
        num_workers=num_proc,
        collate_fn=DefaultDataCollator(),
        persistent_workers=True,
        # Validation order should be fixed so metrics are comparable across
        # epochs; Lightning warns when a validation loader shuffles.
        shuffle=False,
    )

    torch.set_float32_matmul_precision("medium")
    lit_trainer = pl.Trainer(
        accelerator="gpu",
        precision=precision,
        strategy=strategy,
        max_epochs=max_epochs,
    )
    lit_trainer.fit(
        lit_module,
        train_dataloaders=train_dataloader,
        val_dataloaders=val_dataloader,
        ckpt_path=resume_from_ckpt_path,
    )