こんにちは データを眺めている森藤です
TVer ではたくさんのデータがあって、どこをほっくり返してもなんか有用な知見が出てくるので毎日楽しいです。
現状
さて、現状はまだまだこれからなのですが、レコメンドのアルゴリズムや実装について考えるのも仕事の一つです。 レコメンドといえば、いろいろな手法やベースとなる考え方、タイミングなど様々なものがありますが、そのうちの一つとして、協調フィルタリングというものがあります。 これは端的に言うと、「これを見ている人はこれも見ています」的なやつです。
ただ、協調フィルタリングは実世界において適用が難しく、TVer でも多分にもれずそのまま適用するには課題があります。 大きな課題が「ユーザは限られたコンテンツ(エピソード)しか閲覧しないため、これを見た人はこれも見ています」と適用することが難しい、というものです
user_id | series_id |
---|---|
1 | 3 |
1 | 12 |
2 | 9 |
2 | 34 |
3 | 61 |
3 | 123 |
3 | 612 |
みたいなデータが有ったとして、よし、 user_id=2 にコンテンツ 72 を推薦してみよう!ってならないですよね
対策
スパース(疎)なデータに対してなんとか協調フィルタリングをするために考えられたアルゴリズムとして、 Matrix Factorization というアルゴリズムがあります。 端的に言うと、先に挙げたスパースな行列をなんとかして密な行列にする方法と言えます。
スパースな行列を密な行列にする方法として、1) 欠損値を推定する (見てないけど、こいつ、見そうだな、というのを補完する)、2) そもそもコンテンツを圧縮して「似たコンテンツの潜在要因」にまとめる(ある種の主成分分析に近い処理)、という2つの方法があり、 Matrix Factorization は2をすることで、結果的には1をするアルゴリズムです。
具体的なアルゴリズムの概要
例えば user_num
人 x item_num
シリーズによる行列があり、これを R
とします
user_id \ item_id | 0 | 1 | 2 | 3 | ... | item_num - 1 |
---|---|---|---|---|---|---|
0 | 1 | 1 | ||||
1 | 1 | |||||
2 | 1 | 1 | ||||
... | ||||||
user_num - 1 | 1 | 1 | 1 |
こんな感じで、 user_id = i が item_id = j を視聴していた場合に 1 を立てると、欠損値だらけの行列となります。
で、このままだとシンプルな協調フィルタリングをしてもなかなか難しいわけです。 ここで Matrix Factorization では以下のようにします。
user_num、item_num よりも小さい値 k_num というものをおいて
- 行列 U: user_num * k_num
- 行列 V: item_num * k_num
という2つの行列を用意し、 UVTという行列演算が R に近くなるように学習します (学習方法はなんでもいいんですがまあ SGD とか) 実際のアルゴリズムは Wikipedia とか見てください
私は横着して こちら の実装を参考にしました (若干逐次処理のところが違いますのでご注意ください)
BigQuery Scripting
Matrix Factorization ですが、実は BigQuery ML ではすでに公開されていて、利用することができます。ただし、札束で殴る flex slot という料金体系に切り替える必要があります。
料金体系をいじらずになんとかしたい!と考えたところ、 BigQuery には、 Scripting という、 LOOP
とか IF
などの制御構文、 DECLARE
による変数定義などが使える機能があります。
これを使えば、 SGD とか実装できそうだな、と思い当たってコリコリと実装してみたのが最後に記載した SQL です。
なお BigQuery Scripting での実装における/実適用上のハマりどころとしては以下のとおりです
- 複数のクエリを実行したり、 SQL の結果を変数に格納できるが、 CTE (WITH 句で使える Common Table Expression) は、同一のクエリ内でしか使えないので、クエリをまたぎたいときは SET によって変数に格納する必要がある
- 変数は BigQuery の 1 レコードの制限 100MB に制限される (ので、結論から言うと大規模データの学習には今回の SQL は使えない)
- お金をかけられるなら、毎度 Temp Table に格納する、とかで解決可能です
- 変数は通常の行列としては保存できないので
ARRAY<>
として保存する必要がある
実行結果
max_step = 10 とした結果が下記の通りです
actual_value が推定された値、expect_value が実際のスパースな行列です。
学習完了とは言えませんが、 expect_value が 0 に対する actual_value が 0 に近づき、 1 の場合は大きな値になっているのが見て取れます。
結論
- SQL だけで Matrix Factorization (というか SGD 全般) が実装できる
- BigQuery Scripting は Turing 完全
- flex slot を買って、 Temp Table を Step ごとに読み込むという富豪のような演算をしたい
flex slot 買ったら、 BigQuery ML の Matrix Factorization 使え!
最後に
TVerでは、地上波テレビの視聴データ・ TVer サービスにおける視聴データ、お気に入り・いいね・あとで見るなどのユーザデータなどを集約するデータエンジニア、分析を行うアナリスト、分析結果を元に施策立案や経営・サービス運営の指針を提示するデータサイエンティストを絶賛採用中です。
データいっぱいあるのでホックリ返す山しか有りません。
もしご興味があれば一度カジュアルにお話をするところからでもウェルカムです。ぜひご連絡ください、お待ちしております。
出来上がった SQL
-- ユーザ数 DECLARE user_num INT64 DEFAULT 4; -- アイテム数 DECLARE item_num INT64 DEFAULT 5; -- 分解数 DECLARE d_num INT64 DEFAULT 3; -- サンプルデータ作成におけるスパース度合い DECLARE fill_rate FLOAT64 DEFAULT 0.3; -- 学習率 DECLARE learning_ratio FLOAT64 DEFAULT 0.3; -- モーメント DECLARE momentum FLOAT64 DEFAULT 0.1; -- 最大学習ステップ数 DECLARE max_step INT64 DEFAULT 10; -- 誤差しきい値 (今回は省略) DECLARE mean_square_error FLOAT64 DEFAULT 0.1; -- 状態変数 DECLARE step INT64 DEFAULT 0; DECLARE mse FLOAT64 DEFAULT 0; -- 変数に格納したいのは、 STEP のたびに読み込みコストを発生させたくないため -- ステップごとにデータソースを読み込むコストを支払えるなら、直接テーブルを参照すればいいと思います DECLARE data ARRAY<STRUCT<i INT64, j INT64, value FLOAT64>> DEFAULT ( SELECT ARRAY_AGG(STRUCT(user_id AS i, item_id AS j, IF(RAND() < fill_rate, 1.0, 0.0) AS value)) FROM UNNEST(GENERATE_ARRAY(0, user_num - 1)) AS user_id CROSS JOIN UNNEST(GENERATE_ARRAY(0, item_num - 1)) AS item_id ); -- U/Vの両方を更新するときの効率を上げるための一次領域 DECLARE df ARRAY<STRUCT<i INT64, j INT64, df FLOAT64>> DEFAULT []; -- チューニングされるのはこの2つ DECLARE m_U ARRAY<STRUCT<id INT64, u FLOAT64>> DEFAULT (SELECT ARRAY_AGG(STRUCT(id, RAND() AS u)) FROM UNNEST(GENERATE_ARRAY(0, user_num * d_num - 1)) AS id); DECLARE m_V ARRAY<STRUCT<id INT64, v FLOAT64>> DEFAULT (SELECT ARRAY_AGG(STRUCT(id, RAND() AS v)) FROM UNNEST(GENERATE_ARRAY(0, item_num * d_num - 1)) AS id); BEGIN LOOP SET step = step + 1; IF step > max_step THEN BREAK; END IF; SET df = ( WITH -- 行列演算 uv AS ( SELECT CAST(FLOOR(u.id / d_num) AS INT64) AS i, CAST(FLOOR(v.id / d_num) AS INT64) AS j, SUM(u.u * v.v) AS uv FROM UNNEST(m_U) AS u INNER JOIN UNNEST(m_V) AS v ON MOD(u.id, d_num) = MOD(v.id, d_num) GROUP BY i, j ) -- 差分を配列で保存 SELECT ARRAY_AGG(STRUCT(i, j, v)) FROM ( SELECT r.i, r.j, r.value - uv.uv AS df FROM UNNEST(data) AS r INNER JOIN uv ON r.i = uv.i AND r.j = uv.j ) ); -- U/V 行列の更新 SET m_U = ( SELECT ARRAY_AGG(STRUCT( i * d_num + k AS id, next_u AS u ) ORDER BY i * d_num + k) FROM ( SELECT CAST(FLOOR(u.id / d_num) AS INT64) AS i, MOD(u.id, d_num) AS k, u.u + SUM(learning_ratio * df.df * v.v) - momentum * u.u AS next_u FROM UNNEST(m_U) AS u INNER JOIN UNNEST(m_V) AS v ON MOD(u.id, d_num) = MOD(v.id, d_num) INNER JOIN UNNEST(df) AS df ON df.i = CAST(FLOOR(u.id / d_num) AS INT64) AND df.j = CAST(FLOOR(v.id / d_num) AS INT64) GROUP BY i, k, u.u ) ); SET m_V = ( SELECT ARRAY_AGG(STRUCT( j * d_num + k AS id, next_v AS v ) ORDER BY j * d_num + k) FROM ( SELECT CAST(FLOOR(v.id / d_num) AS INT64) AS j, MOD(u.id, d_num) AS k, v.v + SUM(learning_ratio * df.df * u.u) - momentum * v.v AS next_v FROM UNNEST(m_U) AS u INNER JOIN UNNEST(m_V) AS v ON MOD(u.id, d_num) = MOD(v.id, d_num) INNER JOIN UNNEST(df) AS df ON df.i = CAST(FLOOR(u.id / d_num) AS INT64) AND df.j = CAST(FLOOR(v.id / d_num) AS INT64) GROUP BY j, k, v.v ) ); -- データとの平均自乗誤差を算出して、しきい値以下だったら学習終了 -- 今回は割愛 END LOOP; -- 学習結果の表示 SELECT uv.*, r.value FROM ( SELECT CAST(FLOOR(u.id / d_num) AS INT64) AS i, CAST(FLOOR(v.id / d_num) AS INT64) AS j, SUM(u.u * v.v) AS next_r, FROM UNNEST(m_U) AS u INNER JOIN UNNEST(m_V) AS v ON MOD(u.id, d_num) = MOD(v.id, d_num) GROUP BY i, j ) AS uv INNER JOIN UNNEST(data) AS r ON uv.i = r.i AND uv.j = r.j ; END;