SQL で Matrix Factorization を実装しました

こんにちは データを眺めている森藤です

TVer ではたくさんのデータがあって、どこをほっくり返してもなんか有用な知見が出てくるので毎日楽しいです。

現状

さて、現状はまだまだこれからなのですが、レコメンドのアルゴリズムや実装について考えるのも仕事の一つです。 レコメンドといえば、いろいろな手法やベースとなる考え方、タイミングなど様々なものがありますが、そのうちの一つとして、協調フィルタリングというものがあります。 これは端的に言うと、「これを見ている人はこれも見ています」的なやつです。

ただ、協調フィルタリングは実世界において適用が難しく、TVer でも多分にもれずそのまま適用するには課題があります。 大きな課題が「ユーザは限られたコンテンツ(エピソード)しか閲覧しないため、これを見た人はこれも見ています」と適用することが難しい、というものです

user_id series_id
1 3
1 12
2 9
2 34
3 61
3 123
3 612

みたいなデータが有ったとして、よし、 user_id=2 にコンテンツ 72 を推薦してみよう!ってならないですよね

対策

スパース(疎)なデータに対してなんとか協調フィルタリングをするために考えられたアルゴリズムとして、 Matrix Factorization というアルゴリズムがあります。 端的に言うと、先に挙げたスパースな行列をなんとかして密な行列にする方法と言えます。

スパースな行列を密な行列にする方法として、1) 欠損値を推定する (見てないけど、こいつ、見そうだな、というのを補完する)、2) そもそもコンテンツを圧縮して「似たコンテンツの潜在要因」にまとめる(ある種の主成分分析に近い処理)、という2つの方法があり、 Matrix Factorization は2をすることで、結果的には1をするアルゴリズムです。

具体的なアルゴリズムの概要

例えば user_num 人 x item_num シリーズによる行列があり、これを R とします

user_id \ item_id 0 1 2 3 ... item_num - 1
0 1 1
1 1
2 1 1
...
user_num - 1 1 1 1

こんな感じで、 user_id = i が item_id = j を視聴していた場合に 1 を立てると、欠損値だらけの行列となります。

で、このままだとシンプルな協調フィルタリングをしてもなかなか難しいわけです。 ここで Matrix Factorization では以下のようにします。

user_num、item_num よりも小さい値 k_num というものをおいて

  • 行列 U: user_num * k_num
  • 行列 V: item_num * k_num

という2つの行列を用意し、 UVTという行列演算が R に近くなるように学習します (学習方法はなんでもいいんですがまあ SGD とか) 実際のアルゴリズムWikipedia とか見てください

私は横着して こちら の実装を参考にしました (若干逐次処理のところが違いますのでご注意ください)

BigQuery Scripting

Matrix Factorization ですが、実は BigQuery ML ではすでに公開されていて、利用することができます。ただし、札束で殴る flex slot という料金体系に切り替える必要があります。

料金体系をいじらずになんとかしたい!と考えたところ、 BigQuery には、 Scripting という、 LOOP とか IF などの制御構文、 DECLARE による変数定義などが使える機能があります。

これを使えば、 SGD とか実装できそうだな、と思い当たってコリコリと実装してみたのが最後に記載した SQL です。

なお BigQuery Scripting での実装における/実適用上のハマりどころとしては以下のとおりです

  • 複数のクエリを実行したり、 SQL の結果を変数に格納できるが、 CTE (WITH 句で使える Common Table Expression) は、同一のクエリ内でしか使えないので、クエリをまたぎたいときは SET によって変数に格納する必要がある
  • 変数は BigQuery の 1 レコードの制限 100MB に制限される (ので、結論から言うと大規模データの学習には今回の SQL は使えない)
    • お金をかけられるなら、毎度 Temp Table に格納する、とかで解決可能です
  • 変数は通常の行列としては保存できないので ARRAY<> として保存する必要がある

実行結果

max_step = 10 とした結果が下記の通りです

実行結果
実行結果

actual_value が推定された値、expect_value が実際のスパースな行列です。

学習完了とは言えませんが、 expect_value が 0 に対する actual_value が 0 に近づき、 1 の場合は大きな値になっているのが見て取れます。

結論

  • SQL だけで Matrix Factorization (というか SGD 全般) が実装できる
  • BigQuery Scripting は Turing 完全
  • flex slot を買って、 Temp Table を Step ごとに読み込むという富豪のような演算をしたい
    • flex slot 買ったら、 BigQuery ML の Matrix Factorization 使え!

最後に

TVerでは、地上波テレビの視聴データ・ TVer サービスにおける視聴データ、お気に入り・いいね・あとで見るなどのユーザデータなどを集約するデータエンジニア、分析を行うアナリスト、分析結果を元に施策立案や経営・サービス運営の指針を提示するデータサイエンティストを絶賛採用中です。

データいっぱいあるのでホックリ返す山しか有りません。

もしご興味があれば一度カジュアルにお話をするところからでもウェルカムです。ぜひご連絡ください、お待ちしております。

www.wantedly.com

出来上がった SQL

-- ユーザ数
DECLARE user_num INT64 DEFAULT 4;
-- アイテム数
DECLARE item_num INT64 DEFAULT 5;
-- 分解数
DECLARE d_num INT64 DEFAULT 3;
-- サンプルデータ作成におけるスパース度合い
DECLARE fill_rate FLOAT64 DEFAULT 0.3;
-- 学習率
DECLARE learning_ratio FLOAT64 DEFAULT 0.3;
-- モーメント
DECLARE momentum FLOAT64 DEFAULT 0.1;
-- 最大学習ステップ数
DECLARE max_step INT64 DEFAULT 10;
-- 誤差しきい値 (今回は省略)
DECLARE mean_square_error FLOAT64 DEFAULT 0.1;

-- 状態変数
DECLARE step INT64 DEFAULT 0;
DECLARE mse FLOAT64 DEFAULT 0;

-- 変数に格納したいのは、 STEP のたびに読み込みコストを発生させたくないため
-- ステップごとにデータソースを読み込むコストを支払えるなら、直接テーブルを参照すればいいと思います
DECLARE data ARRAY<STRUCT<i INT64, j INT64, value FLOAT64>> DEFAULT (
    SELECT
        ARRAY_AGG(STRUCT(user_id AS i, item_id AS j, IF(RAND() < fill_rate, 1.0, 0.0) AS value))
    FROM
        UNNEST(GENERATE_ARRAY(0, user_num - 1)) AS user_id
    CROSS JOIN
        UNNEST(GENERATE_ARRAY(0, item_num - 1)) AS item_id
);

-- U/Vの両方を更新するときの効率を上げるための一次領域
DECLARE df ARRAY<STRUCT<i INT64, j INT64, df FLOAT64>> DEFAULT [];
-- チューニングされるのはこの2つ
DECLARE m_U ARRAY<STRUCT<id INT64, u FLOAT64>> DEFAULT (SELECT ARRAY_AGG(STRUCT(id, RAND() AS u)) FROM UNNEST(GENERATE_ARRAY(0, user_num * d_num - 1)) AS id);
DECLARE m_V ARRAY<STRUCT<id INT64, v FLOAT64>> DEFAULT (SELECT ARRAY_AGG(STRUCT(id, RAND() AS v)) FROM UNNEST(GENERATE_ARRAY(0, item_num * d_num - 1)) AS id);


BEGIN
LOOP
    SET step = step + 1;
    IF step > max_step THEN 
        BREAK;
    END IF;
    
    SET df = (
        WITH 
        -- 行列演算
        uv AS (
            SELECT 
                CAST(FLOOR(u.id / d_num) AS INT64) AS i,
                CAST(FLOOR(v.id / d_num) AS INT64) AS j,
                SUM(u.u * v.v) AS uv
            FROM
                UNNEST(m_U) AS u
            INNER JOIN 
                UNNEST(m_V) AS v
            ON
                MOD(u.id, d_num) = MOD(v.id, d_num)
            GROUP BY 
                i,
                j
        )
        -- 差分を配列で保存
        SELECT 
            ARRAY_AGG(STRUCT(i, j, v))
        FROM (
            SELECT
                r.i,
                r.j,
                r.value - uv.uv AS df
            FROM
                UNNEST(data) AS r
            INNER JOIN 
                uv
            ON
                r.i = uv.i
                AND r.j = uv.j
        )
    );
    
    -- U/V 行列の更新
    SET m_U = (
        SELECT 
            ARRAY_AGG(STRUCT(
                i * d_num + k AS id,
                next_u AS u
            ) ORDER BY i * d_num + k)
        FROM (
            SELECT
                CAST(FLOOR(u.id / d_num) AS INT64) AS i,
                MOD(u.id, d_num) AS k,
                u.u + SUM(learning_ratio * df.df * v.v) - momentum * u.u AS next_u
            FROM
                UNNEST(m_U) AS u
            INNER JOIN
                UNNEST(m_V) AS v
            ON
                MOD(u.id, d_num) = MOD(v.id, d_num)
            INNER JOIN 
                UNNEST(df) AS df
            ON
                df.i = CAST(FLOOR(u.id / d_num) AS INT64)
                AND df.j = CAST(FLOOR(v.id / d_num) AS INT64)
            GROUP BY 
                i,
                k,
                u.u
        )
    );
    
    SET m_V = (
        SELECT 
            ARRAY_AGG(STRUCT(
                j * d_num + k AS id,
                next_v AS v
            ) ORDER BY j * d_num + k)
        FROM (
            SELECT
                CAST(FLOOR(v.id / d_num) AS INT64) AS j,
                MOD(u.id, d_num) AS k,
                v.v + SUM(learning_ratio * df.df * u.u) - momentum * v.v AS next_v
            FROM
                UNNEST(m_U) AS u
            INNER JOIN
                UNNEST(m_V) AS v
            ON
                MOD(u.id, d_num) = MOD(v.id, d_num)
            INNER JOIN 
                UNNEST(df) AS df
            ON
                df.i = CAST(FLOOR(u.id / d_num) AS INT64)
                AND df.j = CAST(FLOOR(v.id / d_num) AS INT64)
            GROUP BY 
                j,
                k,
                v.v
        )
    );
    -- データとの平均自乗誤差を算出して、しきい値以下だったら学習終了
    -- 今回は割愛
END LOOP;

-- 学習結果の表示
SELECT 
    uv.*,
    r.value
FROM (
    SELECT 
        CAST(FLOOR(u.id / d_num) AS INT64) AS i,
        CAST(FLOOR(v.id / d_num) AS INT64) AS j,
        SUM(u.u * v.v) AS next_r,
    FROM
        UNNEST(m_U) AS u
    INNER JOIN 
        UNNEST(m_V) AS v
    ON
        MOD(u.id, d_num) = MOD(v.id, d_num)
    GROUP BY 
        i,
        j
) AS uv
INNER JOIN 
    UNNEST(data) AS r
ON
    uv.i = r.i
    AND uv.j = r.j
;
END;