エンジニアとしての機械学習との付き合いかた
Word Mover's Distance: word2vecの文書間距離への応用
word2vecによって得られる語の分散表現を用いて文書間の距離(非類似度)を計算する手法についての論文を読みました。 せっかくなので解説してみます。
TL;DR
この論文では Word Mover’s Distance(WMD) という文書間距離の計算手法を提案しています。 提案手法は手っ取り早く言うと次のようなものです。
- 文書A, B間の距離 =
A, Bの語同士を対応付けることでAをBに変換するとき、
対応付けのコストが最も低い場合のコストの総和 - 語xを語yに対応付けるコスト =
x, yの分散表現ベクトルの距離
例えば次の文書 と または との距離を考えてみます。
ストップワードを除いた上で、各文書の意味の似ている語同士を対応付けます。 さらに各対応付けのコストを語同士の分散表現の距離として算出し、その総和を求めます。
下の図は語の対応付けとそのコストを矢印とそれに付与された値で示しています。
(Figure 2. (Top) from [1])
このコストの総和を文書間の距離と考える、というのがWMDのアイデアとなります。 図では に対する距離が のほうが よりも小さくなっています。
論文では文書の近傍を求めるタスクにおいて、多くのデータセットで提案手法により良好な結果が得られたとしています。
(Figure 3. from [1])
特に twitter のような語数の比較的短い文書からなるデータセットでは、従来手法より良好な結果を残しているようです。
以下では主にWMDの計算アルゴリズムについて、もう少し詳しく解説していきます。
従来の文書類似度計算における課題
単純な文書類似度としては Bag-of-Words (BoW) 表現のコサイン値がよく用いられます。 しかし文書同士の共通語が少ない場合、BoWでは文書間の意味的な類似度を測ることが困難です。
例えば上記の 文書 , の単純なBoW表現は次のようになりますが、これらは共通語を持たないためベクトル同士のコサイン値はゼロとなります。
従来ではLSIやLDAなどの次元削減手法を利用してこの問題に対処することが試みられていましたが、精度面でそれほど大きな改善を実現するには至っていませんでした。
最近ではニューラルネットの発達に伴い、文書自体の分散表現を計算する手法も提案されています。 これらの手法は高い精度を実現できると主張されていますが、NN特有のハイパーパラメータチューニングが必要となる場合もあります。
WMDの考え方
WMDは上記の問題を別のアプローチで解決しようとしています。
上記のとおり文書 , には共通語こそありませんが、人間の目には media と press のようなそれぞれの語同士は非常に近い意味をもつように見えます。 word2vec を用いれば、このような語の意味類似度を捉えた分散表現ベクトルが得られます。
WMDではこの性質を利用して文書間の距離を求めます。 その考え方は大雑把に言えば、 文書Aの語を類似する(=分散表現間の距離が小さい)語で置き換えて文書Bに変換できるならば、文書A, Bの類似度は大きい(=距離が小さい) というようなものです。
このような考え方に基づいて、WMDでは文書A,B間の距離を、A,Bの語同士を対応付けて文書Aを文書Bに変換するのにかかる「コスト」として計算します。
語の対応付けのコスト
まずある語を別の語に対応付ける場合のコストについて考えます。 意味の似ている語同士の対応付けはコストが低い、という特性を満たすよう、語同士の分散表現のL2距離をコストとします。
すなわち、語 を語 に対応付けるコスト を次のように定義します。
文書の変換コスト
次に、語の対応付けに基づく置き換えにより、ある文書を別の文書に変換することを考えてみます。 この置き換えにおいては文書中の意味語(ストップワード以外の語)のみに注目し、語順などは考慮しないものとします。
先程の を に変換する場合、各語を次のように対応付ければよさそうです。
この変換のコストは、単純に考えれば各対応付けのコストの総和とするのが良さそうです。
イメージとしては、この総和が小さいほど文書は意味的に類似すると言えそうです。
問題の一般化
これまでの例では意味語の数が同じで、各語も意味的にほぼ1対1に対応づけることができました。 しかし次のような文書同士では、単純に語を置換することでは文書を変換できません。
任意の2文書間の距離を一貫した基準のもとで計算するには、単純な置き換えによる変換ではなくより一般化された方法を考える必要があります。
そこでまず文書の特徴表現として語の頻度分布を用います。 例えば , の特徴ベクトルは次のようになります。
ここでベクトルの各成分は、文書中の語の正規化された出現頻度(TF)となっています。
さらに各語に対応する成分を変換先の文書の複数の語に分配することで、語の頻度分布全体を変換先の分布に移すことを考えます。 この変換を次のような行列 で表します。
例えばこの行列の7行目は、 における語 speak の重み1/3のうち、それぞれ1/4と1/12をそれぞれ の語 greets と press に移すことを表します。
この対応付全体のコストは、それぞれの語同士の対応コストに分配量に応じた重みかけた値の総和として計算できます。
ここで、上の行列はあくまで考えられる変換のひとつでしかありません。 WMDではそのような変換の中でコストの総和が最小となるものを求め、このコストの総和が文書間の距離とします。
最適化問題としての定式化と解法
上記の内容を最適化問題として定式化すると次のようになります。
行列 の各行ベクトルは、元の文書中の各語に対応します。 その各成分は、元の文書の語分布におけるその語の成分を、変換先の文書の各語にどのように分配するかを表します。 よって行の成分の総和は、元の文書の語分布における各語の成分と一致します。
同様に行列の各列ベクトルの成分の総和は、変換先の文書の語分布に各語の成分と一致します。
コストが最小となる変換のコストを文書間の距離とするので、上式を制約条件とする次の最適化問題を解くことができれば文書間の距離が求まります。
実は上の最適化問題は Earth Mover Distance (EMD) と呼ばれる最適化問題の特殊な形式になっています。 EMDを求めるアルゴリズムは既に知られているので、それを用いればWMDを計算できます。
この記事ではEMDについて詳しく解説することはしませんが、以下の記事が詳しいですので興味のある方はご参照ください。
WMDの長所と短所
WMDの長所としては次のような点が挙げられます。
- 精度が高い
- 直感的にわかりやすい
- ハイパーパラメータが存在しないためチューニングが不要
逆に短所としては次のような点が挙げられます。
- 計算量が大きい(類似度計算1回で語数 に対して [tex: O(p3 \log p)] )
このような性質から、単なるBoWの類似度では望ましい精度が得られず、かつ計算量が多くても許容できるようなケースでWMDが有効であると考えられます。
なお、ある文書に対する近傍文書を上位k件まで取得できればよいようなケース(文書検索など)では、WMDよりも計算量の小さい下界を利用し、計算量を大幅に抑えることができます。 今回はその詳細については割愛しますが、論文には精度の変動の評価も含めて詳しく書かれているので、知りたい方は論文を読まれることをお薦めします。
まとめ
語の分散表現を文書類似度や文書特徴量に応用する手法は多いですが、ハイパーパラメータチューニングなしに高い精度を実現できるWMDは特に魅力的に写ります。 また最近ではWMDを教師あり学習に拡張した論文なども出ているようです。
機会があれば実問題で試してみたいものです。
追記
2017/04/05
gensimを使うと学習済みモデルを用いて簡単にWMDを計算できるようです。
今年触った技術を雑多に
なんとなく。
AWS
GCPはちょっと触ってたけど、AWSは間違ってお金かかってしまいそうという謎の不安があって社会に出るまで敬遠していた。 EC2, ELB, S3, Beanstalkのような基本的なサービスに加えてRedshiftとEMRをかなり触ることになった。 覚えたり調べたりすることが永遠に尽きない。
Ruby
今年一番書いた言語。 もともとPythonに慣れていたので、スコープに由来のわからない変数やメソッドが現れるのに未だ違和感を覚えてしまう。 ライブラリが豊富すぎて技術選択で迷いがちなPythonに比べ、Bundler, Rake, RSpecなどのほぼ必須のエコシステムに乗っておけばそれほど道を踏み外さないのは楽だった。
Python
意外と自分で書くことは少なく、どちらかというとレビューする事が多かった気がする。 自分にとって書いてて一番しっくりくるのは相変わらず。 ちょっとしたスクリプトなどはまずPythonで書いてる。
Scala
今年後半から使い始めた。 静的型付の言語は久々だったけど、以前経験したC, C#, Javaに比べて圧倒的に馴染みやすかった。 心地よい学習曲線に乗って学べる印象。
ES2016
ほんの少しだけ触った。 大したことをしてないので大した感想はない。 JavaScriptって今こんな感じなんだ、ふーん。
Ansible
ちょっとした環境の構成管理ツールとして使った。 最初に覚えなければいけない概念が少し多かったが、公式のベストプラクティスを参考にするとうまく軌道に乗れた気がする。
Fluentd
使うプラグインのコードを読む機会が多かった。 よく使われてメンテされてるプラグインとそうでないものでの品質の落差が激しいのを実感した。
Spark
今年触ったものの中で一番面白かった。 分散処理でスレッドセーフ性が問題になったり、遅延評価が影響してちょっとしたコードの変更が予期しない結果を産んだりといろいろ悩んだけど、その分学びは多かった。 来年はSpark StreamingやSpark MLも本格的に使ってみたい。
Apache Sparkの3つのAPI: RDD, DataFrameからDatasetへ
はじめに
Livesense Advent Calendar 2016の11日目の記事です。
昨今ではAmazon Elastic Mapreduce (EMR)などのマネージドサービスの登場により、分散データ処理基盤を構築・運用するハードルは劇的に下がっています。
ソフトウェアの選択肢も広がり、特にApache Sparkはオンメモリ処理を基本とした高速で汎用的なフレームワークとして人気があります。
2016年の初頭には新しいDataset APIを搭載したSpark 2.0がリリースされました。
Spark 1.0系では主にRDDとDataFrameという2つのAPIを処理の特性に応じて使い分けていましたが、Spark 2.0では多くの処理をDataset APIひとつで実現できるようになっています。
従来のRDDやDataFrameを用いた処理は、大抵の場合そのままか多少工夫することでDatasetでも実現できます。
しかしWeb上にはまだDatasetに関する情報は多くなく、Sparkについて検索するとRDDやDataFrameについて書かれた記事の方がよく見つかります。
この記事では、RDDやDataFrameに関するノウハウをDatasetに応用できるよう、従来のAPIとDatasetを比較しながら紹介していきます。
私自身もSparkについてはまだまだ勉強中のため、内容に間違いがあればぜひコメント等でご指摘ください。
Sparkの基本的な仕組み
Sparkでは多数のレコードからなるデータを幾つかのパーティションに分割し、複数のコンピューティングノードに分配して展開します。
その上でパーティション毎に独立に適用できる処理はノード内で完結し、複数のパーティションのレコードを参照・集約する処理はその都度ノード間でデータの再分割・再分配を行うことでデータを処理していきます。
Sparkの仕組みはHadoopのMapReduceと似ていますが、Hadoopが毎回のステップ終了時にHDFSにデータを書き出すのに対し、Sparkではデータをメモリ上に展開したまま複数のステップを連続して実行できるのが特徴です。
この仕組みにより、ステップ数の多い処理では一般的にHadoopよりSparkのほうがパフォーマンスが高いとされています。
データコレクションの操作のためのAPI
Hadoopのプログラミングモデルは、入力データから出力データへの変換をMapReduceとして記述するというものでした。
MapおよびReduceは単一レコードあるいは同一キーでグループ化されたレコードに対する処理であり、データ全体をひとつのまとまりとして扱うための抽象化層は別に用意する必要がありました。
対してSparkでは、各ノードに分散しているデータをプログラムから単一のオブジェクトとして操作できるAPIを提供しています。
これにより、複数の中間的な状態を経るような複雑な処理であっても、より抽象化された形で実装できるようになっています。
Sparkにおいてデータのまとまりを扱うAPIとしては、RDD, DataFrame, Datasetの3つが存在します。
当初からSparkには最も基本的なAPIであるRDDが用意されており、Spark 1.3ではDataFrameが追加されました。
Spark1.6ではこれらに加えてDatasetが試験的に導入されました。
さらにSpark2.0ではDatasetが正式なAPIとなり、DataFrameはDatasetに統合されました。
以降ではこれらのAPIの特徴を順を追って紹介していきます。
言語はSparkのネイティブ言語であるScalaを用いています。
1. RDD - ネイティブなオブジェクトのコレクション
RDDではデータをネイティブな(JVM)オブジェクトのコレクションとして扱います。
プログラムのイメージは次のようになります。
class Person(name: String, age: Int, country: String) { def isAdult = age >= 20 } val rdd = sc.textFile("people.csv") // name,age,country .map(s => s.split(",")) .map(t => new Person(t(0), t(1).toInt, t(2))) val result = rdd .filter(_.isAdult) .groupBy(_.country) .countByKey print(result) /* Map(JP-> 50, US -> 20) */
RDDの型は RDD[T]
であり、 T
は任意の型をとることができます。
RDD[T]
には次のようなメソッドが定義されています。
map[U](f: (T) ⇒ U): RDD[U]
reduce(f: (T, T) ⇒ T): T
groupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])]
このように、RDDに対する処理でにはレコードの型を与えることができます。
すなわちRDDに対する処理は型安全であり、コンパイル時の型検査によって実行時に型エラーが発生しないことが保証できます。
これは複雑データ変換処理などにおいて、データ形式の不一致によるエラーを防ぐ上で大変便利です。
その反面、それぞれのレコードに対応するオブジェクトは処理のたびにシリアライズ/デシリアライズされるため、オーバーヘッドは必然的に大きくなってしまいます。
特に他のオブジェクトを参照するオブジェクトをレコードとするような場合、シリアライズの際に参照先のオブジェクトが一緒に複製されてしまい、メモリを大量に消費してしまうこともあります。
2. DataFrame - 基本的な型の値からなるテーブル
DataFrameではデータを基本的な型の値からなるテーブルとして扱います。
DataFrameの各レコードは1つ以上の名前付きカラムで構成され、各カラムには Int
や String
などのプリミティブ型および Array
や Map
などの原始的なコレクション型のみを用いることができます。
プログラムのイメージは次のようになります。
// Spark 1.6.0 import org.apache.spark.sql.SQLContext val sqc = new SQLContext(sc) // {"name": name, "age": age, "country": country} val df = sqc.read.json("people.jsonl") val result = df .filter(df("age") >= 20) .groupBy(df("country")).count result.show /* +-----+--------+ |value|count(1)| +-----+--------+ | JP| 50| | US| 20| +-----+--------+ */
Spark SQLを用いると、DataFrameに対する処理をSQLとして記述することもできます。
df.registerTempTable("people") df.sql(""" SELECT country, COUNT(*) FROM people WHERE age >= 20 GROUP BY country; """).show
DataFrameに含まれる値は専用のエンコーダによって最適化されるため、通常のオブジェクトに比べてシリアライズ/デシリアライズのオーバーヘッドやメモリ使用量は大幅に小さくなっています。
またDataFrameでは、一連の処理全体の実行計画がオプティマイザによって予め最適化されます。
これらの違いにより、RDDとDataFrameで同等の処理を実行した場合、DataFrameが数倍から数十倍のパフォーマンスを発揮することがあります。
一方、DataFrameはカラム名や各カラムの型についての情報を型で指定できません。
DataFrameのメソッドの型定義は次のようになっています。
filter(condition: Column): DataFrame
agg(expr: Column, exprs: Column*): DataFrame
groupBy(cols: Column*): GroupedData
Column
は df("country")
や df("age") >= 20
のように作成できますが、これらの実際の値の型は実行時にしかわかりません。
またDataFrameの型は DataFrame
であり、 RDD[T]
のようにレコードの型パラメータを持たないため、カラムの名前や型などの情報をDataFrameの型情報から得ることができません。
このため、存在しないカラムへの参照や型の異なる関数の適用がある場合、それらをコンパイル時に検出できず、実行時エラーが発生してしまいます。
加えてDataFrameが扱える型や関数は限定されており、RDDのようにユーザ定義の型やそれに対する任意の関数を扱うことは難しくなっています。
RDD v.s. DataFrame
これまで見てきたように、RDDとDataFrameには次のように一長一短があります。
このような違いのため、RDDとDataFrameは用途に応じて使い分ける必要がありました。
例えば時間がかかり繰り返し実行されるバッチ処理では型エラーを未然に防ぐためにRDDを用い、アドホックな分析クエリでは応答速度とクエリの書きやすさからDataFrameを用いる、といった要領です。
3. Dataset - RDDとDataFrameの長所を併せ持つコレクション
DatasetはいわばRDDとDataFrameの良い所どりを目指したAPIです。
プログラムのイメージは次のようになります。
// Spark 2.0.0 import org.apache.spark.sql.SQLContext val sqc = new SQLContext(sc) // {"name": name, "age": age, "country": country} val ds = sqc.read.json("people.jsonl") val result = ds .filter(ds("age") >= 20) .groupBy(ds("country")) .count result.show /* +-----+--------+ |value|count(1)| +-----+--------+ | JP| 50| | US| 20| +-----+--------+ */
気づかれたかもしれませんが、Spark 2.0のDatasetはSpark 1.6のDataFrameと全く同じように扱うことができます。
当然Datasetに対してもDataFrameと同様にSQLを発行することが可能です。
ここで上のDatasetのデータ型に適合するcase classを作成し、 as
によってDatasetに適用してみます。
すると、Datasetに対してRDDと同様のmapやreduceなどを実行することができます。
case class Person(name: String, age: Long, country: String) { def isAdult = age >= 20 } val peopleDs = ds.as[Person] val result = peopleDs .filter(_.isAdult) .groupByKey(_.country) .count result.show /* +-----+--------+ |value|count(1)| +-----+--------+ | JP| 50| | US| 20| +-----+--------+ */
これは以下のような仕組みによって実現されています。
まず、Datasetの型は Dataset[T]
であり、DataFrameは Dataset[Row]
と等価です。
Row
はカラムの名前や実際の値の型を自身の型情報としては持ちませんが、Row
型のオブジェクトはそのスキーマを属性として持つことができます。
Row
のコレクションである Dataset[Row]
も同様にスキーマを持つことができ、特にJSONなどから読み出されたDatasetには自動的にスキーマが付与されます。
// {"name": name, "age": age, "country": country} val ds = sqc.read.json("people.jsonl") ds.printSchema /* root |-- age: long (nullable = true) |-- country: string (nullable = true) |-- name: string (nullable = true) */
ここで Dataset[Row]
のスキーマに適合する型 U
を .as[U]
として適用すると、 Dataset[U]
型のDatasetが得られます。
scala > val peopleDs = ds.as[Person]
res: Dataset[Person] = ...
.as[U]
の U
に元のDatasetのスキーマと同じカラム名・型のフィールドを持つクラスを指定した場合、Rowの各カラムが同名のフィールドにマッピングされます。
特にScalaではデータモデルをcase classで定義することが多いため、通常はUとしてcase classを用います。
as
によって得られた Dataset[U]
型には、 RDD[U]
と同様の型安全な操作を適用することができます。
さらに U
の各フィールドの型が、DataFrameで扱えたような基本的な型である場合、レコードの内部表現はDataFrameと同様に最適化されるため、処理のオーバーヘッドはRDDに比べて大幅に小さくなります。
このようにDatasetはRDDの型安全性とDataFrameのパフォーマンスを持ち合わせており、これまでRDDおよびDataFrameを使い分けていたようなケースを統一的なAPIで記述することができます。
RDD, DataFrameからDatasetへの書き換え
ここまでSparkの3種類のAPIについて紹介しました。
ここからは、RDDまたはDataFrameを用いたコードをDatasetで書き直す際のポイントを簡単に紹介します。
これにより、既存のコードやノウハウをDatasetに応用しやすくなると思います。
DataFrameからDatasetへ
まずDataFrameですが、前述のようにDataseにはDataFrameとほぼ同じ処理が適用可能です。
加えてSpark 2.0では DataFrame
が Dataset[Row]
へのエイリアスとなっており、多くの場合は既存のコードを書き換えずにそのまま動作させることができます。
RDDからDatasetへ
次にRDDですが、こちらは多少の注意が必要です。
データの入力については、 SparkContext.textFile
の代わりに SQLCotext.text
を使用できます。
map
や filter
などの処理の型定義は RDD[T]
と Dataset[T]
で同じですが、集約処理については少し違いがあります。
例えば RDD[T]
の groupBy
は groupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])]
のように、キーと集約されたレコードのイテラブルのペアをレコードとするRDDを返します。
対して Dataset[T]
の groupByKey
は groupByKey[K](func: (T) ⇒ K): KeyValueGroupedDataset[K, T]
のように、 KeyValueGroupedDataset[K, T]
という集約処理専用の型を持ちます。
またDatasetに対する処理が効率的に実行されるためには、レコードの各カラムの型がDataFrameでサポートされていたような基本型である必要があります。
対応する型については下記に記載されています。
Spark SQL and DataFrames - Spark 2.0.2 Documentation
おわりに
この記事ではSparkのRDD, DataFrame, Datasetの違いと、最も新しいAPIであるDatasetを使う際のポイントについて説明しました。
記事を書くにあたっては、下記の書籍や記事を参考にしました。
- 詳解 Apache Spark:書籍案内|技術評論社
- Introducing Apache Spark Datasets - The Databricks Blog
- A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets - The Databricks Blog
特に最後の記事は、Datasetの特徴をRDD,DataFrameと比較して理解するために非常に参考になりました。
今後は実際の利用事例についても発信していきたいと思います。
1974年の仮想化技術の論文についてLTしました
8月に毎月恒例の社内LT大会があり、テーマがVMだったので、1974年の仮想化技術の論文を紹介しました。 そのときのスライドを公開します。
取り上げたのは次の論文です。
この論文は、コンピュータアーキテクチャの効率的な仮想化が可能であるために命令セットが満たすべき必要条件を定理として形式的に証明したもので、仮想化技術の界隈では知る人はいないほどの重要な論文とのことです。
テーマがVMに決まった時はネタが思い浮かばず、しばらくWikpediaを眺めていたら面白そうな内容でしかも発表が1974年(!)ということで、LTの題材に決めました。
仮想化技術については全くの素人が付け焼き刃で勉強して論文を読んだだけなので、専門家から見ればいろいろツッコミが入りそうな内容ですがご容赦ください。