CommonCrawlから有益な日本語の記事と文章を機械学習で抽出するスクリプトのプロトタイプ

はじめに

大規模言語の事前学習には、Webデータを片っ端からダウンロードしたサイト(CommonCrawl, CC)が大活躍します。
普通はCCを使いやすい形で加工したコーパスを用いるのですが、意外と低品質だったり、最新の情報が含まれていなかったり、諸々の問題があります。

そこで、独自に日本語コーパスを作る動きも出ています。

本記事は、以下の記事の続きとして、CCからWebデータをダウンロードして、良い感じの日本語を抜き出すまでのメモです。突貫工事のコードなのでご了承ください。

2/21追記 githubで作業中です(未完成)


CCからコーパスを作る難しさ・容易さ

CCから本気でコーパスを作るのは、とても大変です。
まず第一に1 snapshotあたりデータのサイズ(圧縮状態, WET)が100TiBと超巨大です。これを正攻法で真面目にやるには、相応のリソースが必要です。

また、データ中にはゴミテキストが大量に含まれるので、それらを適切に処理するアルゴリズムも必要です。

ただ、以下の条件下では、意外と素人でコーパスを作れそうな手応えが見えてきました。理由は以下の通りです。

  • ファイルを分散処理できる

    • CCは数万件程度のファイルに分割されています

      • WARCファイルは1つあたり数GB程度(圧縮時は<1GB)と、そこまで大きくないです

      • 普通のPCでも処理できるサイズ感です

      • これらの分割ファイルのダウンロードやデータ処理は、独立した複数のPCで行うことが可能です

  • 律速段階は多くのケースで、ダウンロードになります

    • ダウンロードには時間がかかるので、その間に、収集済みのファイルをデータ処理する猶予があります

      • ダウンロードに1ファイルあたり数分はかかるとすると

      • 処理スクリプトの実行時間も数分程度まで許されることになります

      • つまり、超高速のアルゴリズムを書く必要は無さそうです

  • 適当に作ったテキスト分類のアルゴリズムでも、それなりに日本語文章を抽出できることがわかりました(本記事)

    • 品質にそこまでこだわらなければ、相応のコーパスを作れる可能性があるかもしれません

WARCファイルのダウンロード

以下、WARCファイルのパスリストを参照にしながら、順次、ファイルをダウンロードするスクリプトの例です。

パスリストの取得

target_path="path/warc.paths"

with open(target_path,"r") as f:
    path_list=f.readlines()

path_list=[path.strip() for path in path_list]
path_list[0]

ダウンロードや解凍 (gzは削除)

import gzip
import shutil

import requests
import os
def download_gz(url, save_path):
    if os.path.exists(save_path):
        print(f"ファイルがすでに存在します: {save_path}")
        return
    response = requests.get(url, stream=True)
    
    if response.status_code == 200:
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=128):
                f.write(chunk)
        print(f"ファイルが正常にダウンロードされました: {save_path}")
    else:
        print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}")

def decompress_gz(gz_path, output_path):
    with gzip.open(gz_path, 'rb') as f_in:
        with open(output_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    print(f"{gz_path}が解凍され、{output_path}に保存されました。")
    os.remove(gz_path)

実際のダウンロード

base_url="https://data.commoncrawl.org/"
i=1
#download

for i in range(2,4,1):  #ダウンロードするファイルはここで指定
    path=path_list[i]
    url=base_url+path

    path_=path.replace("/","_")
    save_path = f"data/{path_}"
    download_gz(url, save_path)
    decompress_gz(save_path, save_path.replace(".gz",""))

WARCファイルからの日本語テキストの抽出

WARCファイルからざっくり日本語のテキストページを抽出するスクリプトです。

補助関数

from warcio.archiveiterator import ArchiveIterator
from bs4 import BeautifulSoup
from tqdm import tqdm
import json
import glob
ja_soup_list=[]

def halfwidth_ratio(s):
    if len(s) == 0:  # 空の文字列の場合は0を返す
        return 0
    halfwidth_count = sum(
        1 for char in s
        if '\u0020' <= char <= '\u007E' or  # 基本的なASCII範囲
           '\uFF61' <= char <= '\uFF9F' or  # 半角カタカナ
           char in ('\u0009', '\u000A', '\u000D')  # タブ、改行、復帰
    )
    return halfwidth_count / len(s)

def pre_clean(soup):
    texts_with_tags = []
    for tag in soup.find_all(True):
        # 特定のタグを除外する場合
        # if tag.name not in ['html', 'body', 'ul']:
        text = tag.get_text(separator="\n", strip=True)
        spl_text = text.split("\n")
        spl_text = [i.strip() for i in spl_text if i.strip()]  # 空の文字列を除外
        for item in spl_text:
            if tag.name=="script" or tag.name=="style":
                continue
            texts_with_tags.append((item, tag.name))  # テキストとタグの名前をタプルとして追加
    return texts_with_tags

メイン処理

import os
def extract_japanese_from_warc(path,
                               save_dir="json",
                               ja_soup_list=[],
                               max_num=10**10,
                               ):

    path=path.replace("\\","/")#for windows env
    filename=path.split("/")[-1].replace(".warc",".json")
    if os.path.exists(f"{save_dir}/{filename}"):
        print("already done")
        return

    #途中から再開する用の位置情報の取得
    if len(ja_soup_list)>0:
        fin_record_id=ja_soup_list[-1]["record_id"]
    else:
        fin_record_id=0

    # WARCファイルを開く
    record_id = 0
    with open(path, 'rb') as stream:
        for record in tqdm(ArchiveIterator(stream)):
            record_id += 1
            if record_id<=fin_record_id:
                continue
            try:
                if record.rec_type == 'response':
                    if record.http_headers.get_header('Content-Type') == 'text/html':
                        content = record.content_stream().read()
                        soup = BeautifulSoup(content, 'html.parser')
                        
                        # <html>タグからlang属性を取得
                        html_tag = soup.find('html')
                        if html_tag and html_tag.has_attr('lang'):
                            lang = html_tag['lang']
                            #print(f"Found language: {lang}")
                            texts=pre_clean(soup)
                            if len(texts)==0:
                                continue
                            if lang=="ja":
                                d={
                                    "record_id":record_id,
                                    "url":record.rec_headers.get_header('WARC-Target-URI'),
                                    "title":soup.title.string,
                                    "timestamp":record.rec_headers.get_header('WARC-Date'),
                                    "text":texts,
                                }
                                ja_soup_list.append(d)
                                print(f"Found Japanese: {d['url']}")

                            if len(ja_soup_list)>max_num:
                                break
            except:
                continue            
    with open(f"{save_dir}/{filename}", "w") as f:
        json.dump(ja_soup_list, f, indent=4, ensure_ascii=False)

処理ループ

import glob
for path in glob.glob("data/*.warc"):

    extract_japanese_from_warc(path,
                                save_dir="json",
                                ja_soup_list=[],
                                #max_num=10
                                max_num=10**10
                                )

機械学習によるページの自動分類

Webページの半分くらいは、販売・求人・公序良俗に反するサイトのようです。
これらを、「title + url + 本文の一部の情報」から、クラス分けして除外してみます。

生成した上記jsonの読み込み

import glob
import json
import random
title_url_list = []

for filename in glob.glob('json/*.json'):
    with open(filename, 'r') as f:
        data = json.load(f)
        for page in data:
            texts=page['text']
            texts=[i[0] for i in texts]
            text=' '.join(texts)[:300]
            title_url_list.append((page['title'], page['url'], text))

アノテーション用のデータ生成

page_title_path="annot/pages/titles.txt"
random.shuffle(title_url_list)

def gen_problem(title,url,text):
    return f"{title}_-_-_-{url}_-_-_-_-{text}".strip()

problem_list=[]
with open(page_title_path, 'w') as f:
    for title, url,text in title_url_list:
        problem=gen_problem(title,url,text)
        f.write(problem)
        problem_list.append(problem)

このコードを実行すると、タイトル一覧が生成されます。
それぞれのページのgood/badについて、アノテーションしていきます。
アノテーションには以下のサイトが便利です。
とりあえず100件ほどやりました。

アノテーションデータの読み込みとクラス分け

import pandas as pd
texts=[]
labels=[]
#annotの読み込み
annot_path="annot/pages/output.txt"


df=pd.read_csv(annot_path,delimiter="\t", header=None,
on_bad_lines ="warn",
                names=["text", "label"])

df["label"]=df["label"].str.replace("bad","1")
df["label"]=df["label"].str.replace("good","0").astype(int)


#textがnanのものを削除
df=df.dropna(subset=["text"])

texts+=df["text"].tolist()
labels+=df["label"].tolist()

機械学習
適当にlogistic regressionを使いました。


import MeCab
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.calibration import CalibratedClassifierCV


# テキストデータの前処理とトークン化のための関数
def tokenize(text):
    mecab = MeCab.Tagger()
    mecab.parse('')  # MeCabのバグ対策
    node = mecab.parseToNode(text)
    tokens = []
    while node:
        if node.surface != '':
            tokens.append(node.surface)
        node = node.next
    return tokens

# データセットの分割
X_train, X_test, y_train, y_test = train_test_split(texts, labels, test_size=0.2, random_state=42)

# TF-IDFベクトル化とSVMを用いた分類モデルのパイプライン
pipeline = Pipeline([
    ('tfidf', TfidfVectorizer(tokenizer=tokenize)),
    ('clf', LinearSVC()),
])

pipeline = Pipeline([
    ('tfidf', TfidfVectorizer(tokenizer=tokenize)),
    #('clf', CalibratedClassifierCV(LinearSVC(), method='sigmoid')),
    #('clf', CalibratedClassifierCV(RandomForestClassifier())),
    ('clf', CalibratedClassifierCV(LogisticRegression())),
])
# モデルの訓練
pipeline.fit(X_train, y_train)

# モデルの評価
predictions = pipeline.predict(X_test)
print(classification_report(y_test, predictions))

評価結果: 精度は改善の余地がありますね。

追加でアノテーションする場合は、予測の自信が低いデータを選ぶと良さそうです。全部で300件ほどannotationしました。

#未知問題の抽出
unlabeled_problem_list=[i for i in problem_list if i not in texts]
len(unlabeled_problem_list)
annotations = pipeline.predict(unlabeled_problem_list)
predicted_probabilities = pipeline.predict_proba(unlabeled_problem_list)

import pandas as pd
import time
#新たな問題の生成
cut_threshold=0.3

df = pd.DataFrame({ "label": annotations, "probability": predicted_probabilities[:, 1],
"text": unlabeled_problem_list,
                   
                   })
df=df.sort_values(by="probability", ascending=False)
df=df[df["probability"]>cut_threshold]
df=df[df["probability"]<1-cut_threshold]

texts=df["text"].tolist()
ct=int(time.time())
with open(f"annot/pages/{ct}.text", "w") as f:
    f.write("\n".join(texts))

最後に、classifierでページを分別していきます。

#classifierでjsonを整理
good_count=0
bad_count=0
for file_path in glob.glob('json/*.json'):
    cleaned_data=[]
    with open(file_path, 'r') as f:
        data = json.load(f)
        for page in data:
            texts=page['text']
            texts=[i[0] for i in texts]
            text=' '.join(texts)[:300]
            problem=gen_problem(page['title'], page['url'], text)

            genre=pipeline.predict([problem])[0]
            proba=pipeline.predict_proba([problem])[0][1]

            if genre==0:
                cleaned_data.append(page)
                page["text"]=[i[0]  for i in page["text"]]
                print("good",page['title'],proba)
                good_count+=1
            else:
                print("bad",page['title'],proba)
                bad_count+=1

    cleaned_file_path=file_path.replace("\\","/")
    cleaned_file_path=cleaned_file_path.replace("json/","json/cleaned/")
    with open(cleaned_file_path, 'w') as f:
        json.dump(cleaned_data, f, ensure_ascii=False, indent=4)

以下のような分類結果(抜粋)となりました。わりといい感じにクラス分けできたと思います。

一連の処理を経て、元のWARCファイル(5GB)が、5MB程度まで清掃されました。

テキストのクリーニング(途中まで)

最後にテキストをクリーニングしていきます。
やることはページ分類とほぼ同じで、htmlタグによって分割された
「テキスト1」ー「テキスト2」ー「テキスト3」の連なりが、日本語としてgoodかbadかを判定していきます。

例えば
「ようこそ」ー「サイトマップ」ー「プライバシーポリシー」
のような羅列は、ほぼ間違いなく自動挿入されたリンクなので、badなテキストとして判定します。
それに対し、
「こんにちは、今日は」ー「Wikipedia」ー「を使います」
のような羅列は、日本語と自然なので、残します。
(「wikipedia」にhrefタグでリンクが貼られていたりすると、上記のようにタグレベルでテキストが分割されてしまいます)

このアノテーション作業を500件くらい行いました。
コード説明はページ分類とおおむね同じなので、割愛します。

コード自体は以下のリンクから落とせます。
down: ダウンロードスクリプト
page_classification: ページの分別
clean_text: テキストの掃除

(アノテーションデータは著作権の問題があるので、uploadできません)

結果

元のWARCファイル(5GB)が、500 kb程度までスリムになりました。

定性的な性能検証として、以下の経産省のサイト

https://www.enecho.meti.go.jp/about/special/johoteikyo/co2_plastics.html

を諸々処理した結果、、、

メルマガ登録
地球温暖化の一因として、いまやすっかりやっかいものの代名詞のようになっているCO2。削減しようという取り組みはよく聞きますが、むしろCO2を積極的に有効活用する方法はないの?そうすれば、「脱炭素化社会」の実現もより加速するはず!そこで、CO2を使うことで役立つモノを生み出そうという研究が進められています。生み出そうとしているのは、ずばり、プラスチックの原料。そんな夢物語にも聞こえる研究が、ここ日本で、実用化に向け一歩ずつ動き出しているのです。
CO2を使ってモノを生み出すしくみのお手本は、「植物」?!
やっかいもののCO2を、逆に資源として活用する―。この考え方は「カーボンリサイクル」と呼ばれ、さまざまな分野で研究が進められています。
CO2を使ってモノをつくるしくみには、お手本があります。それは、植物がおこなう「光合成」です。植物はCO2と水を吸収して、でんぷんと酸素を生み出しています。それと同じように、人の手によって、CO2から有益なモノを生み出すことはできないものでしょうか?
こうした考えから研究されているのが、「人工光合成」と呼ばれる技術です。
「人工光合成」とは、植物がおこなっている光合成のように、人工的に化学品を合成する技術。植物の光合成に太陽光が不可欠なように、人工光合成でも太陽エネルギーを活用します。さまざまな化学品をつくることが期待されていますが、その中のひとつが、プラスチックなどの原料となる「オレフィン」です。
太陽と水とCO2から、プラスチックができる!
人工光合成の技術でプラスチックの原料をつくるプロセスについて、もう少し詳しく説明しましょう。
人工光合成では、まず「光触媒」と呼ばれる、太陽光に反応する物質を使って、水(H2O)を水素(H2)と酸素(O2)に分解します。ただし、ここではまだ、水素と酸素は混合している状態です。
次に、水素のみが通過できる「分離膜」を使って、水素と酸素を分離し、水素を取り出します。最後に、この水素と、工場などから排出されたCO2とを合わせ、化学合成をうながす「合成触媒」を使って、「オレフィン」を作ります。
大きい画像で見る
このように人工光合成では、「光触媒」「分離膜」「合成触媒」の3つの技術が重要な役割をはたします。特に触媒は、いかに効率的に水素を取り出せるか、いかに効率的に化学合成物を生み出せるかの決め手になる技術。日本はこの触媒技術で国際的に強みをもっていることから、人工光合成の実現に期待がかかっています。
たとえばレジ袋やラップはオレフィンの一種である「エチレン」から、ストローや医療機器は同じくオレフィンの一種「プロピレン」から作られています。そのほかにも、食品トレイやペットボトルなどもプラスチックからつくられています。これだけ生活に身近にあるプラスチック製品に、排出されたCO2が使われるようになるとしたら!どれだけのCO2削減につながるのか、考えただけでもワクワクしますね。
さまざまなプラスチック製品

....

というテキストが得られました。元のサイトには、「ご意見」、「お問い合わせ」などの自動挿入リンクが多量に貼られているのですが、一連のスクリプトで、概ね消せたようです。

(「メルマガ登録」、「大きい画像で見る」あたりの文字列はクリーニング不足なので、もう少しアノテーションデータを増やす必要がありそうです)

以上、わりといい加減な実装ながら、100点満点中、80点くらいの精度でクリーニングができたと思います。
アノテーションデータを増やしたり、分類アルゴリズムをfasttextやdeep learning系に変えれば、もっと良い精度でコーパスを作れそうです。


この記事が気に入ったらサポートをしてみませんか?