Oasistブログ

言語学、エンジニアリング、ライフ記事を気まぐれにお届け

Python + Seleniumでウェブスクレイピング Vol.2 - より複雑なDOM -

f:id:oasist:20210128231504p:plain
ウェブスクレイピング

Contents

1. 成果物

ランキング - Webスクレイピング入門上の全ページからツアーレビュー情報を取得し、CSVファイルにエクスポートする。

観光地 総合評価 楽しさ 人混みの多さ 景色 アクセス
0 観光地 1 4.7 4.6 4.5 4.9 4.2
1 観光地 2 4.7 4.6 4.5 4.9 4.2
2 観光地 3 4.6 4.5 4.4 4.8 4.1
3 観光地 4 4.5 4.4 4.4 4.8 4.0
4 観光地 5 4.5 4.4 4.3 4.7 4.0
5 観光地 6 4.4 4.3 4.3 4.7 3.9
6 観光地 7 4.3 4.2 4.2 4.6 3.8
7 観光地 8 4.3 4.2 4.2 4.6 3.8
8 観光地 9 4.2 4.1 4.1 4.5 3.7
9 観光地 10 4.1 4.0 4.1 4.4 3.6
10 観光地 11 4.1 4.0 4.0 4.4 3.6
11 観光地 12 4.0 3.9 4.0 4.3 3.5
12 観光地 13 3.9 3.8 3.9 4.3 3.4
13 観光地 14 3.9 3.8 3.9 4.2 3.4
14 観光地 15 3.8 3.7 3.8 4.2 3.3
15 観光地 16 3.7 3.6 3.8 4.1 3.2
16 観光地 17 3.7 3.6 3.7 4.1 3.2
17 観光地 18 3.6 3.5 3.7 4.0 3.1
18 観光地 19 3.5 3.4 3.6 3.9 3.0
19 観光地 20 3.5 3.4 3.6 3.9 3.0
20 観光地 21 3.4 3.3 3.5 3.8 2.9
21 観光地 22 3.3 3.2 3.5 3.8 2.8
22 観光地 23 3.3 3.2 3.4 3.7 2.8
23 観光地 24 3.2 3.1 3.4 3.7 2.7
24 観光地 25 3.1 3.0 3.3 3.6 2.6
25 観光地 26 3.1 3.0 3.3 3.6 2.6
26 観光地 27 3.0 2.9 3.2 3.5 2.5
27 観光地 28 2.9 2.8 3.2 3.4 2.4
28 観光地 29 2.9 2.8 3.1 3.4 2.4
29 観光地 30 2.8 2.7 3.1 3.3 2.3

2. 実装

2-1. 情報取得

初めに、Google Chromeを立ち上げて基底URLを取得する必要がある。
この一連の処理は InfoCollector クラスのインスタンス生成時に呼び出される。
コンストラクタは基底URL(https://scraping-for-beginner.herokuapp.com/ranking/)を引数に取る。

InfoCollector#__init__

def __init__(self, base_url):
    self.chrome = webdriver.Chrome(executable_path="../exec/chromedriver.exe")
    self.base_url = base_url

InfoCollector#_get_urlInfoCollector#export_csv を除き、各メソッドで呼び出される。
このメソッドは基底URLとクエリ文字列("?page={page_num}")を結合し、URLにアクセスする。
InfoCollector#_get_categirues はクエリ文字列を必要としないため、デフォルト値は "" に設定されている。
get メソッドを使うことで指定したURLにアクセス出来るが、あらかじめ webdriver をインポートするのを忘れないように。

InfoCollector#_get_url is called in each method except for InfoCollector#export_csv.
This method combine the base URL with a query string("?page={page_num}") and get to the URL.
InfoCollector#_get_categirues need no query string, so its default value is defined as "".
get method enables you to get to the URL, but please do not forget to import webdriver in advance.

InfoCollector#_get_url

def _get_url(self, query_str=""):
    url = self.base_url + query_str
    self.chrome.get(url)

f:id:oasist:20210129164536p:plain
観光地

InfoCollector#get_titlesfind_elements_by_class_name メソッドで .u_title クラス要素のリストを取得し、以下の手順で各要素からテキスト情報を取り出す。

  1. 各要素のテキスト情報を挿入するための空のリストを生成する。
  2. for 文でループを回し、各要素の .text でテキスト情報を取り出す。
  3. テキスト情報にエスケープシーケンスが含まれている場合、split 関数で分割した文字列の配列の最後の要素を文字情報を取り出す。。
  4. 手順1で生成したリストにテキスト情報を追加する。

InfoCollector#get_titles

def get_titles(self, query_str):
    self._get_url(query_str)
    elem_titles = self.chrome.find_elements_by_class_name("u_title")
    titles = []
    for elem_title in elem_titles:
        titles.append(elem_title.text.split("\n")[-1])
    return titles

f:id:oasist:20210129164656p:plain
総合評価

InfoCollector#get_evaluations.u_rankBox クラス要素のリストを取得し、以下の手順で各要素から少数を取り出す。

  1. 各要素のテキスト情報を挿入するための空のリストを生成する。
  2. for 文でループを回して各要素を取り出し、find_element_by_class_name 関数で .evaluateNumber クラス要素を取得する。
  3. クラス要素から文字情報を取り出す。
  4. float 関数で取得した文字列を少数に変換する。
  5. 手順1で生成したリストに少数を追加する。

InfoCollector#get_evaluations

def get_evaluations(self, query_str):
    self._get_url(query_str)
    elem_rank_boxes = self.chrome.find_elements_by_class_name("u_rankBox")
    evaluations = []
    for elem_rank_box in elem_rank_boxes:
        evaluations.append(float(elem_rank_box.find_element_by_class_name("evaluateNumber").text))
    return evaluations

InfoCollector#get_evaluations.u_categoryTipsItem クラス要素のリストを返す。
このプライベートメソッドは他のパブリックメソッドで使いまわしている。

InfoCollector#_get_ranking_items

def _get_ranking_items(self):
    elem_ranking_items = self.chrome.find_elements_by_class_name("u_categoryTipsItem")
    return elem_ranking_items

f:id:oasist:20210129165009p:plain
カテゴリー

InfoCollector#get_categories カテゴリーのリストを返す。
重複を排除するため、リストの最初の要素のみ参照する。

InfoCollector#get_categories

def get_categories(self):
    self._get_url()
    elem_ranking_items = self._get_ranking_items()
    categories = tag_elems_list(elem_ranking_items, "dt")
    return categories[0]

tag_elems_list 関数は以下のロジックを持つ。

  1. 各要素のテキスト情報の配列を挿入するための空のリストを生成する。
  2. for 文でループを回して elem_ranking_items の各要素を取り出す。
  3. 各要素のテキスト情報を挿入するための一時保存用の空のリストを生成する。
  4. for 文でループを回して find_elements_by_tag_name 関数が返すリストからタグ要素を取り出す。
  5. .text で文字情報を取り出し、手順3で生成した一時保存用のリストに追加する。
  6. 一時保存用のリストを手順1で生成したリストに追加する。

list_hander.tag_elems_list

def tag_elems_list(items, tag):
    elems_list = []
    for item in items:
        _elems_list = []
        for elem in item.find_elements_by_tag_name(tag):
            _elems_list.append(elem.text)
        elems_list.append(_elems_list)
    return elems_list

f:id:oasist:20210129165042p:plain
評価

InfoCollector#get_rankings は評価の二次元配列を返す。

InfoCollector#get_rankings

def get_rankings(self, query_str):
    self._get_url(query_str)
    elem_ranking_items = self._get_ranking_items()
    rankings = class_elems_list(elem_ranking_items, "is_rank")
    return rankings

f:id:oasist:20210129165106p:plain
コメント

InfoCollector#get_comments はコメントの二次元配列を返す。

InfoCollector#get_comments

def get_comments(self, query_str):
    self._get_url(query_str)
    elem_ranking_items = self._get_ranking_items()
    comments = class_elems_list(elem_ranking_items, "comment")
    return comments
  1. 各要素のテキスト情報の配列を挿入するための空のリストを生成する。
  2. for 文でループを回して elem_ranking_items の各要素を取り出す。
  3. 各要素のテキスト情報を挿入するための一時保存用の空のリストを生成する。
  4. for 文でループを回して find_elements_by_class_name 関数が返すリストからクラス要素を取り出す。
  5. .text で文字情報を取り出し、is_float 関数が True を返す場合は少数として、False を返す場合は文字列として手順3で生成した一時保存用のリストに追加する。
  6. 一時保存用のリストを手順1で生成したリストに追加する。

list_hander.class_elems_list

def class_elems_list(items, klass):
    elems_list = []
    for item in items:
        _elems_list = []
        for elem in item.find_elements_by_class_name(klass):
            if isfloat(elem.text):
                _elems_list.append(float(elem.text))
            else:
                _elems_list.append(elem.text)
        elems_list.append(_elems_list)
    return elems_list

is_float 関数は、isdecimalがパラメーターを整数として評価しない場合、かつ float 関数が例外 ValueError を発生させない限り True を返す。

decimal_handler.isfloat

def isfloat(param):
    if not param.isdecimal():
        try:
            float(param)
            return True
        except ValueError:
            return False
    else:
        return False

2-2. CSVエクスポート

TextExtractor#export_csv は観光地、総合評価、カテゴリー、評価、ファイルパスを引数に取る。

  1. DataFrame メソッドで空のデータフレームを生成する(pandas のインポートをあらかじめしておくこと)。
  2. titlesevaluationsdf[ラベル] に代入する。
  3. rankings を引数に新たにデータフレームを生成する。
  4. categoriesdf_rankingscolumns 属性に代入する。
  5. dfdf_rankings を結合し、変数 df に代入する。
  6. to_csv メソッドにファイルパスを渡し、CSVファイルをエクスポートする。

InfoCollector#export_csv

def export_csv(self, titles, evaluations, categories, rankings, path):
    df = pd.DataFrame()
    df["観光地"] = titles
    df["総合評価"] = evaluations
    df_rankings = pd.DataFrame(rankings)
    df_rankings.columns = categories
    df = pd.concat([df, df_rankings], axis=1)
    df.to_csv(path)

3. ユニットテスト

  • TestInfoCollector#setUpGoogle Chromeを立ち上げ、全観光地、全総合評価、全評価を全てのページから、全カテゴリーを最初のページからのみ取得する。
  • TestInfoCollector#test_get_titlesInfoCollector#get_titles が全観光地を取得しているかを検証する。
  • TestInfoCollector#test_get_evaluationsInfoCollector#get_evaluations全総合評価を取得しているか検証する。
  • TestInfoCollector#test_get_categoriesInfoCollector#get_categories が全カテゴリーを重複がない状態で取得しているかを検証する。
  • TestInfoCollector#test_get_rankingsInfoCollector#get_rankings が全評価を取得しているかを検証する。
  • TestInfoCollector#test_get_commentsInfoCollector#get_comments が全コメントを取得しているかを検証する。
  • TestInfoCollector#test_export_csvInfoCollector#export_csvCSVファイルを指定したパスにエクスポートしているか検証する。

test/test_text_extractor.py

import unittest
import sys
sys.path.append("../lib")
sys.path.append("../lib/concerns")
import os.path
from os import path
from info_collector import InfoCollector

class TestInfoCollector(unittest.TestCase):
    def setUp(self):
        self.info_collector = InfoCollector("https://scraping-for-beginner.herokuapp.com/ranking/")
        titles = []
        evaluations = []
        rankings = []
        for i in range(1, 4):
            titles.append(self.info_collector.get_titles("?page={}".format(i)))
            evaluations.append(self.info_collector.get_evaluations("?page={}".format(i)))
            rankings.append(self.info_collector.get_rankings("?page={}".format(i)))
        self.titles = sum(titles, [])
        self.evaluations = sum(evaluations, [])
        self.rankings = sum(rankings, [])
        self.categories = self.info_collector.get_categories()

    def test_get_titles(self):
        self.assertEqual([
            "観光地 1",
            "観光地 2",
            "観光地 3",
            "観光地 4",
            "観光地 5",
            "観光地 6",
            "観光地 7",
            "観光地 8",
            "観光地 9",
            "観光地 10",
            "観光地 11",
            "観光地 12",
            "観光地 13",
            "観光地 14",
            "観光地 15",
            "観光地 16",
            "観光地 17",
            "観光地 18",
            "観光地 19",
            "観光地 20",
            "観光地 21",
            "観光地 22",
            "観光地 23",
            "観光地 24",
            "観光地 25",
            "観光地 26",
            "観光地 27",
            "観光地 28",
            "観光地 29",
            "観光地 30",
        ], self.titles)

    def test_get_evaluations(self):
        self.assertEqual([
            4.7, 4.7, 4.6, 4.5, 4.5, 4.4, 4.3, 4.3, 4.2, 4.1,
            4.1, 4.0, 3.9, 3.9, 3.8, 3.7, 3.7, 3.6, 3.5, 3.5,
            3.4, 3.3, 3.3, 3.2, 3.1, 3.1, 3.0, 2.9, 2.9, 2.8
        ], self.evaluations)

    def test_get_categories(self):
        self.categories = self.info_collector.get_categories()
        self.assertEqual(["楽しさ", "人混みの多さ", "景色", "アクセス"], self.categories)

    def test_get_rankings(self):
        self.assertEqual([
            [4.6, 4.5, 4.9, 4.2],
            [4.6, 4.5, 4.9, 4.2],
            [4.5, 4.4, 4.8, 4.1],
            [4.4, 4.4, 4.8, 4.0],
            [4.4, 4.3, 4.7, 4.0],
            [4.3, 4.3, 4.7, 3.9],
            [4.2, 4.2, 4.6, 3.8],
            [4.2, 4.2, 4.6, 3.8],
            [4.1, 4.1, 4.5, 3.7],
            [4.0, 4.1, 4.4, 3.6],
            [4.0, 4.0, 4.4, 3.6],
            [3.9, 4.0, 4.3, 3.5],
            [3.8, 3.9, 4.3, 3.4],
            [3.8, 3.9, 4.2, 3.4],
            [3.7, 3.8, 4.2, 3.3],
            [3.6, 3.8, 4.1, 3.2],
            [3.6, 3.7, 4.1, 3.2],
            [3.5, 3.7, 4.0, 3.1],
            [3.4, 3.6, 3.9, 3.0],
            [3.4, 3.6, 3.9, 3.0],
            [3.3, 3.5, 3.8, 2.9],
            [3.2, 3.5, 3.8, 2.8],
            [3.2, 3.4, 3.7, 2.8],
            [3.1, 3.4, 3.7, 2.7],
            [3.0, 3.3, 3.6, 2.6],
            [3.0, 3.3, 3.6, 2.6],
            [2.9, 3.2, 3.5, 2.5],
            [2.8, 3.2, 3.4, 2.4],
            [2.8, 3.1, 3.4, 2.4],
            [2.7, 3.1, 3.3, 2.3]
        ], self.rankings)

    # Comments are shown at random every time the browser is booted, so the value of each element cannot be tested.
    def test_get_comments(self):
        comments = []
        for i in range(1, 4):
            comments.append(self.info_collector.get_comments("?page={}".format(i)))
        self.assertEqual(30, len(sum(comments, [])))

    def test_export_csv(self):
        self.info_collector.export_csv(self.titles, self.evaluations, self.rankings, self.categories, "../csv/tour_reviews.csv")
        self.assertEqual(True, path.exists("../csv/tour_reviews.csv"))

if __name__ == "__main__":
    unittest.main()

4. ソースコード

oasis-forever/web_scraping_tutorial