【Python】BERTの実装方法|文章分類, pytorch

Python
スポンサーリンク

・自然言語処理コンペに参加したい
・コンペでも戦える優秀なモデルを知りたい
・文章の分類や単語予測をしてみたい

こんな方におススメなモデルがBERTです。

現在の自然言語処理コンペではみんながBERTを利用しています。

自然言語処理について、過去にLightGBMとtf-idfを使って文章分類をする方法を解説しました。

このモデルでの正解率は70%強くらいですが、BERTを使えばもっと正解率が上がります。

計算が重いのでGoogleColabでGPUを使うといいですよ!

・Tokenizerを作る
・モデルにinput_ids, attention_mask, token_type_idsを入れる
・BERTの出力は複数ある(最初のpooler_outputを使うのが簡単)

以上3点に注意すれば大丈夫です。

データ準備

インポート

from sklearn.datasets import fetch_20newsgroups
import pandas as pd

train_data = fetch_20newsgroups(subset = 'train')
valid_data = fetch_20newsgroups(subset = 'test')

train = pd.DataFrame({"text" : train_data["data"], "target" : train_data["target"]})
valid = pd.DataFrame({"text" : valid_data["data"], "target" : valid_data["target"]})
print(train.shape, valid.shape)
train.head()

今回使う文章データを準備します。

scikit-learnから文章のデータセットをインポートしましょう。

“fetch_20newsgroups”は文章が20個のグループに分けられたデータセットです。

引数”subset”を”train”にすると学習データ、”valid”にすると評価データが得られます。

このデータをpandasのデータフレームに入れましょう。

こんな感じで文章(text)と予測したい値(target)が手に入りました。

“target”は0~19の数値で、20個のグループを表現しています。

データを見てみましょう。

text = train["text"].values[0]
print(text)

メールの文章みたいですね。

target = train["target"].values[0]
print(target)

1行目の”target”は7です。

print(train_data.target_names)
print(train_data.target_names[7])

“target”の種類は”target_names”に入っています。

これの7個目は”rec.autos”で、つまり車関係の文章という意味です。

確かに文章には車関係ぽい単語がありますね。

こんな感じで文章とそのグループが与えられています。

今回は、文章からグループを予測する分類モデルを作りましょう!

Text Cleaning

このままでは文章が汚いので少し処理しましょう。

print(text)

今はコロンやカンマなどの記号や、改行も多いのでスッキリさせましょう。

“re”という正規表現ライブラリを使えばうまく処理できます。

import re
def cleaning(text):
    text = re.sub("\n", " ", text) # 改行削除
    text = re.sub("[^A-Za-z0-9]", " ", text) # 記号削除
    text = re.sub("[' ']+", " ", text) # スペース統一
    return text.lower() # 小文字で出力
・改行削除
・カンマやコロンなどの記号削除
・スペースを半角1個に統一
・小文字に統一

この処理方法が必ず正しいわけではありませんので、参考程度にしてください。

train["cleaned_text"] = train["text"].map(cleaning)
valid["cleaned_text"] = valid["text"].map(cleaning)

pandasのmapを使って各データに関数を実行しました。

text = train["cleaned_text"].values[0]
print(text)

こんな感じで改行がない小文字になっていればOKです。

Dataset, DataLoader

今回はpytorchでモデルを作ります。

pytorchの使い方は以下の記事で解説しています。

・データセット
・データローダー
・モデル

以上3つを作る必要があります。

まずはデータセットを作りましょう。

import torch
from torch.utils.data import Dataset, DataLoader

class Data(Dataset):
    def __init__(self, data):
        super().__init__()
        self.data = data
    
    def __len__(self):
        return self.data.shape[0]
    
    def __getitem__(self, idx):
        text = self.data["cleaned_text"].values[idx]
        label = self.data["target"].values[idx]
        return text, torch.tensor(label, dtype = torch.float)

クラスとしてデータセットを作りました。

・init:初期化条件
・len:データセットの長さを定義
・getitem:実際に取り出すときの処理(行番号idxが渡される)

initではデータ全体を渡します。

lenはデータの行サイズで大丈夫です。

getitemでは行番号idxが指定され、データを出力します。

たとえば0が指定されたときidx = 0です。

ds = Data(train)
print(ds[0])    # 0行目のデータ
print(ds[0][0]) # 0行目のcleaned_text
print(ds[0][1]) # 0行目のtarget

1行目の文章とラベルが出力されていますね。

次にデータローダーを作成しましょう。

dl = DataLoader(ds, batch_size = 4) # データローダー作成
batch = next(iter(dl))
print(len(batch[0]))
print(batch[1])

“batch_size”で指定した数だけデータを小出ししてくれます。

試しにnext(iter())で1個目のバッチを出しましょう。

データ数が”batch_size”と同じ4になっていればOKです。

本来は学習データと評価データそれぞれでデータローダーが必要なので、分けて作りましょう。

train_ds = Data(train)
valid_ds = Data(valid)
train_dl = DataLoader(train_ds, batch_size = 32, shuffle = True, drop_last = True)
valid_dl = DataLoader(valid_ds, batch_size = 32 * 2, shuffle = False, drop_last = False)

Tokenizer

Tokenizerは簡単に言うと”文章をBERT専用の形式に変換してくれるやつ”です。

!pip install transformers -q
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

まずBERTを使うには”transformers”が必要なのでpipでインストールします。

次にTokenizerを作るライブラリ”AutoTokenizer”をインポートしましょう。

そして”from_pretrained”で”bert-base-uncased”をダウンロードします。

・Tokenizrer:BERTに入れる形式に変換するやつ
・AutoTokenizer:Tokenizerの原型になるやつ
・from_pretrained:指定した種類のモデルをダウンロードする
・bert-base-uncased:サイズが小さくかつ十分強力なBERT
“bert-base-uncased”という形式のTokenizerを作った

と理解しておけば大丈夫です。

Tokenizerで文章を変換してみましょう。

text = train["cleaned_text"].values[0]
print(text)

“cleaned_text”の1つを”encode_plus”に入れて変換します。↓

encoded = tokenizer.encode_plus(text)
print(encoded.keys())

辞書型データになっており、キーは以下の通りです。

・input_ids:単語をidに変換したもの
・attention_mask:使用するidの部分は1, 使用しない部分は0(今は全部1)
・token_type_ids:1つ目の文章なら0, 2つ目の文章なら1(今は全部0)

input_ids

input_idsは文章をトークンと呼ばれるidに変換したものです。

print(len(encoded["input_ids"]))
print(encoded["input_ids"])

154個の数字の羅列になっていますね。

つまり、文章⇒数字の羅列に変換されているということです。

これは逆変換であるdecodeを使えば確かめられます。

input_ids = encoded["input_ids"]
print(input_ids)
decoded = tokenizer.decode(input_ids)
print(decoded)

文章⇒トークン⇒文章の順で元の文章にしました。

[CLS]というのは、文章全体の特徴を表現するためにあります。

詳しいことは長くなるのでググってください。

ここで大事なことは、Tokenizerで文章数値埋め込みたということだけです。

attention_mask

attention_maskは学習時に使用するトークンと無視するトークンを識別するものです。

1なら使用、0なら無視されます。

例えば後で解説するパディングで追加されたダミーの数値は学習時に不要になります。

なのでattention_mask = 0にして不要なデータは無視できるようにします。

token_type_ids

文章を2つ入力するときに使用されます。

1つ目の文章にあるトークンは0、2つ目の文章にあれば1になります。

2つの文章をインプットしたい場合に活用されますが、今回は1つだけなので意味ありません。

パディング

モデル学習時には、入れるデータの列サイズを統一する必要があります。

text1 = train_ds[0][0]
text2 = train_ds[1][0]
print(len(tokenizer.encode_plus(text1)["input_ids"]))
print(len(tokenizer.encode_plus(text2)["input_ids"]))

例えば、1つ目のデータの文章をTokenizerで変換すると154個のデータ。

2つ目の文章では171個になっており、それぞれデータの数が違いますね。

このままだとサイズを統一したデータをモデルに入れることができません。

なのでパディング(サイズの統一)をしましょう。

まず各データの変換後の長さを確認します。

import matplotlib.pyplot as plt

length = []
for text in train["cleaned_text"].values:
    encoded = tokenizer.encode_plus(text.lower())
    length.append(len(encoded["input_ids"]))
plt.hist(length)
plt.show()

ほとんどが5000以下にいますね。

import numpy as np
length = np.array(length)
print(np.quantile(length, q = 0.5))
print(np.quantile(length, q = 0.75))

numpyで中央値と75%四分位を見ます。

中央値が233、75%が368くらいのはずです。

今回は計算を軽くしたいので全部256に統一しましょう。

欲張って10000にしてもいいですが、そうするとデータサイズが大きすぎてメモリが吹っ飛びます。

MAX_LEN = 256
encoded = tokenizer.encode_plus(text, padding = "max_length", max_length = MAX_LEN, truncation = True)
print(len(encoded["input_ids"]))
print(encoded["attention_mask"][:10])
print(encoded["attention_mask"][-10:])
・padding:パディングの方法。”max_length”にすると数値で指定できる。
・max_length:最大のデータ長さ
・truncation:Trueにするとmax_lengthを超えた分はカットされる

つまり、MAX_LEN = 256に足りない分は埋めて、超えた分はカットしています。

変換後のデータサイズが256になってればOK。

ここでattention_maskを確認すると、最初は1が多く最後の方は0だけになります。

これは、パディングで埋められた分のデータは学習に使わないので、01で区別するためです。

print(tokenizer.decode(encoded["input_ids"]))

データを逆変換すると、埋められた位置は[PAD]として表現されています。

この[PAD]は学習にいらない部分なので、”attention_mask”の0として区別しているわけですね。

for key in encoded.keys():
    print(key)
    print(encoded[key])
    print(len(encoded[key]))
    print("=" * 100)

最終的にはこんな感じです。

モデル学習時にTokenizerでの変換とパディングをするので、覚えておきましょう。

モデル作成

モデルをクラスで作ります。

from torch import nn
from transformers import AutoModel

class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.bert = AutoModel.from_pretrained("bert-base-uncased")
        self.classifier = nn.Linear(in_features = 768, out_features = 20)
    
    def forward(self, input_ids, attention_mask, token_type_ids):
        outputs = self.bert(input_ids = input_ids, attention_mask = attention_mask, token_type_ids = token_type_ids)
        pooler_output = outputs.pooler_output
        logits = self.classifier(pooler_output).squeeze(-1)
        return logits
・init:初期化条件
・forward:モデルの構造

“AutoModel”の”from_pretrained”から”bert-base-uncased”のモデルを入れます。

このとき、TokenizerとModelとで同じBERTの種類にしておきましょう。

BERTの出力には複数の形式があります。今回は“pooler_output”にしました。

この出力では768個のデータがあるので、Linear層で受け取ってラベル数20にしましょう。

model = Model().cuda()

これで”model”としてモデルが作られました。

cuda()はGPUをONにしている場合に付け足してください。

CPUで実行するならcuda()は不要ですが、計算が重くて終わらないのでおススメしません。

学習

最適化手法と損失関数を定義します。

optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
最適化手法:とりあえずAdamを使っとけば無難。
損失関数:分類モデルではベーシックなCrosEntropyLoss。

以下のコードで学習させます。↓

model.train() # 学習モード
train_loss = 0

for batch in train_dl:
    optimizer.zero_grad() # 勾配リセット
    text = batch[0] # 文章
    label = batch[1].long().cuda() # 正解ラベル

    encoded = tokenizer.batch_encode_plus(
        list(text), # 文章
        padding = "max_length", # パディング方法
        max_length = MAX_LEN, # 最大長さ
        truncation = True, # 最大長さを超えたら切り捨て
        return_tensors = "pt", # pytorchで出力
        return_attention_mask = True,
        return_token_type_ids = True
        ) # バッチごと変換する。文章はリストに入れる必要あり。

    input_ids = encoded["input_ids"].cuda() # cudaはGPUがONなら書く
    attention_mask = encoded["attention_mask"].cuda()
    token_type_ids = encoded["token_type_ids"].cuda()
    preds = model(input_ids, attention_mask, token_type_ids) # 予測
    loss = criterion(preds, label) # 誤差計算

    loss.backward() # 誤差伝播
    optimizer.step() # パラメータ更新
    train_loss += loss.item()
train_loss /= len(train_dl)
print(train_loss)

文章はリストに入れて”batch_encode_plus”で変換しましょう。

“return_tensors”を”pt”つまり”pytorch”にするとtensor型となります。

“input_ids”, “attention_mask”, “token_type_ids”をモデルに入れると予測結果が出ます。

この結果を損失関数に入れて誤差を計算し、”backward”と”step”でモデルを改善します。

全バッチをfor文で繰り返したら1サイクルの学習が終わりです。

学習時間はGPUでも長いです。CPUだともはや終わりません。

次に評価データで性能を確認します。↓

model.eval() # 評価モード
valid_loss = 0

with torch.no_grad(): # 必須。忘れがち。
    for batch in valid_dl:
        text = batch[0]
        label = batch[1].long().cuda()
        encoded = tokenizer.batch_encode_plus(
            list(text),
            padding = "max_length",
            max_length = MAX_LEN,
            truncation = True,
            return_tensors = "pt",
            return_attention_mask = True,
            return_token_type_ids = True
            )
        input_ids = encoded["input_ids"].cuda()
        attention_mask = encoded["attention_mask"].cuda()
        token_type_ids = encoded["token_type_ids"].cuda()
        preds = model(input_ids, attention_mask, token_type_ids)
        loss = criterion(preds, label)

        valid_loss += loss.item()
    valid_loss /= len(valid_dl)
print(valid_loss)

“torch.no_grad”は必須なので注意しましょう。

ここでは”backward”や”step”は不要です。

以上で1サイクルの学習と評価が終わりました。

実際にはこれを何度か繰り返します。

model = Model().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr = 1e-4) # 学習率変更
criterion = nn.CrossEntropyLoss()

best_loss = np.inf # 無限大
for epoch in range(5):
    model.train()
    train_loss = 0
    for batch in train_dl:
        optimizer.zero_grad()
        text = batch[0]
        label = batch[1].long().cuda()
        encoded = tokenizer.batch_encode_plus(
            list(text),
            padding = "max_length",
            max_length = MAX_LEN,
            truncation = True,
            return_tensors = "pt",
            return_attention_mask = True,
            return_token_type_ids = True
            )
        input_ids = encoded["input_ids"].cuda()
        attention_mask = encoded["attention_mask"].cuda()
        token_type_ids = encoded["token_type_ids"].cuda()
        preds = model(input_ids, attention_mask, token_type_ids)
        loss = criterion(preds, label)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(train_dl)

    model.eval()
    valid_loss = 0
    with torch.no_grad():
        for batch in valid_dl:
            text = batch[0]
            label = batch[1].long().cuda()
            encoded = tokenizer.batch_encode_plus(
                list(text),
                padding = "max_length",
                max_length = MAX_LEN,
                truncation = True,
                return_tensors = "pt",
                return_attention_mask = True,
                return_token_type_ids = True
                )
            input_ids = encoded["input_ids"].cuda()
            attention_mask = encoded["attention_mask"].cuda()
            token_type_ids = encoded["token_type_ids"].cuda()
            preds = model(input_ids, attention_mask, token_type_ids)
            loss = criterion(preds, label)
            valid_loss += loss.item()
        valid_loss /= len(valid_dl)

    print(f"EPOCH[{epoch}]")
    print(train_loss)
    print(valid_loss)
    if valid_loss < best_loss:
        best_loss = valid_loss
        torch.save(model.state_dict(), "bert.pth")
        print("saved ...")

学習と検証を5回繰り返し、最も誤差の小さいモデルを保存するようにしました。

Adamの学習率(learning_rate)を低く設定しないとうまく進まないので注意してください。

このように良くなったり悪化したりしています。

2回目の学習結果が最も良いみたいですね。

たぶん、もう少し学習率を下げてもよかったです。

予測結果

model.load_state_dict(torch.load("bert.pth", map_location = "cpu")) # 重みの読み込み
model.eval()
oof = []
with torch.no_grad():
    for batch in valid_dl:
        text = batch[0]
        encoded = tokenizer.batch_encode_plus(
            list(text),
            padding = "max_length",
            max_length = MAX_LEN,
            truncation = True,
            return_tensors = "pt",
            return_attention_mask = True,
            return_token_type_ids = True
            )
        input_ids = encoded["input_ids"].cuda()
        attention_mask = encoded["attention_mask"].cuda()
        token_type_ids = encoded["token_type_ids"].cuda()
        preds = model(input_ids, attention_mask, token_type_ids)
        oof.append(preds.cpu().numpy()) # 予測結果を入れる。cpuデータにする。
oof = np.concatenate(oof, axis = 0) # 予測結果を1次元にまとめる

モデルの予測結果をリストに入れているだけです。

print(oof.argmax(axis = 1)[:20])
print("=" * 100)
print(valid["target"].values[:20])

予測結果の列数はラベル数(20)と同じです。

最も値の大きい列つまりラベルが最も該当する確率が高いことを意味します。

なのでargmaxで最大の列番号を取り出しましょう。

元々の正解データと比較してみると、ソコソコあっていそうですね。

from sklearn.metrics import accuracy_score
acc = accuracy_score(valid["target"], oof.argmax(axis = 1))
print(acc)

正解率を計算しましょう。80%を超えているかと思います。

もっとスコアを上げたいなら、
・文章の前処理を変える
・学習率や最適化手法を変える
・モデルの種類を変える(robertaやlargeなど)
など色々やりようがあります!

まとめ:BERTを使ってみよう

今回はBERTの使い方を解説しました。

現状、自然言語処理コンペに出るなら必須と言っていいほどのモデルです。

ぜひコンペで使ってみてください!

もし計算が重すぎて手元でできない場合は、CPUでLightGBMを使うといいですよ。

コメント

タイトルとURLをコピーしました