今年触った技術を雑多に

なんとなく。

AWS

GCPはちょっと触ってたけど、AWSは間違ってお金かかってしまいそうという謎の不安があって社会に出るまで敬遠していた。 EC2, ELB, S3, Beanstalkのような基本的なサービスに加えてRedshiftとEMRをかなり触ることになった。 覚えたり調べたりすることが永遠に尽きない。

Ruby

今年一番書いた言語。 もともとPythonに慣れていたので、スコープに由来のわからない変数やメソッドが現れるのに未だ違和感を覚えてしまう。 ライブラリが豊富すぎて技術選択で迷いがちなPythonに比べ、Bundler, Rake, RSpecなどのほぼ必須のエコシステムに乗っておけばそれほど道を踏み外さないのは楽だった。

Python

意外と自分で書くことは少なく、どちらかというとレビューする事が多かった気がする。 自分にとって書いてて一番しっくりくるのは相変わらず。 ちょっとしたスクリプトなどはまずPythonで書いてる。

Scala

今年後半から使い始めた。 静的型付の言語は久々だったけど、以前経験したC, C#, Javaに比べて圧倒的に馴染みやすかった。 心地よい学習曲線に乗って学べる印象。

ES2016

ほんの少しだけ触った。 大したことをしてないので大した感想はない。 JavaScriptって今こんな感じなんだ、ふーん。

Ansible

ちょっとした環境の構成管理ツールとして使った。 最初に覚えなければいけない概念が少し多かったが、公式のベストプラクティスを参考にするとうまく軌道に乗れた気がする。

Fluentd

使うプラグインのコードを読む機会が多かった。 よく使われてメンテされてるプラグインとそうでないものでの品質の落差が激しいのを実感した。

Spark

今年触ったものの中で一番面白かった。 分散処理でスレッドセーフ性が問題になったり、遅延評価が影響してちょっとしたコードの変更が予期しない結果を産んだりといろいろ悩んだけど、その分学びは多かった。 来年はSpark StreamingやSpark MLも本格的に使ってみたい。

Apache Sparkの3つのAPI: RDD, DataFrameからDatasetへ

はじめに

Livesense Advent Calendar 2016の11日目の記事です。

昨今ではAmazon Elastic Mapreduce (EMR)などのマネージドサービスの登場により、分散データ処理基盤を構築・運用するハードルは劇的に下がっています。
ソフトウェアの選択肢も広がり、特にApache Sparkはオンメモリ処理を基本とした高速で汎用的なフレームワークとして人気があります。

spark.apache.org

2016年の初頭には新しいDataset APIを搭載したSpark 2.0がリリースされました。
Spark 1.0系では主にRDDとDataFrameという2つのAPIを処理の特性に応じて使い分けていましたが、Spark 2.0では多くの処理をDataset APIひとつで実現できるようになっています。

従来のRDDやDataFrameを用いた処理は、大抵の場合そのままか多少工夫することでDatasetでも実現できます。
しかしWeb上にはまだDatasetに関する情報は多くなく、Sparkについて検索するとRDDやDataFrameについて書かれた記事の方がよく見つかります。

この記事では、RDDやDataFrameに関するノウハウをDatasetに応用できるよう、従来のAPIとDatasetを比較しながら紹介していきます。
私自身もSparkについてはまだまだ勉強中のため、内容に間違いがあればぜひコメント等でご指摘ください。

Sparkの基本的な仕組み

Sparkでは多数のレコードからなるデータを幾つかのパーティションに分割し、複数のコンピューティングノードに分配して展開します。
その上でパーティション毎に独立に適用できる処理はノード内で完結し、複数のパーティションのレコードを参照・集約する処理はその都度ノード間でデータの再分割・再分配を行うことでデータを処理していきます。

Sparkの仕組みはHadoopMapReduceと似ていますが、Hadoopが毎回のステップ終了時にHDFSにデータを書き出すのに対し、Sparkではデータをメモリ上に展開したまま複数のステップを連続して実行できるのが特徴です。
この仕組みにより、ステップ数の多い処理では一般的にHadoopよりSparkのほうがパフォーマンスが高いとされています。

データコレクションの操作のためのAPI

Hadoopのプログラミングモデルは、入力データから出力データへの変換をMapReduceとして記述するというものでした。
MapおよびReduceは単一レコードあるいは同一キーでグループ化されたレコードに対する処理であり、データ全体をひとつのまとまりとして扱うための抽象化層は別に用意する必要がありました。

対してSparkでは、各ノードに分散しているデータをプログラムから単一のオブジェクトとして操作できるAPIを提供しています。
これにより、複数の中間的な状態を経るような複雑な処理であっても、より抽象化された形で実装できるようになっています。

Sparkにおいてデータのまとまりを扱うAPIとしては、RDD, DataFrame, Datasetの3つが存在します。
当初からSparkには最も基本的なAPIであるRDDが用意されており、Spark 1.3ではDataFrameが追加されました。
Spark1.6ではこれらに加えてDatasetが試験的に導入されました。
さらにSpark2.0ではDatasetが正式なAPIとなり、DataFrameはDatasetに統合されました。

以降ではこれらのAPIの特徴を順を追って紹介していきます。
言語はSparkのネイティブ言語であるScalaを用いています。

1. RDD - ネイティブなオブジェクトのコレクション

RDDではデータをネイティブな(JVM)オブジェクトのコレクションとして扱います。
プログラムのイメージは次のようになります。

class Person(name: String, age: Int, country: String) {
  def isAdult = age >= 20
}

val rdd = sc.textFile("people.csv") // name,age,country
  .map(s => s.split(","))
  .map(t => new Person(t(0), t(1).toInt, t(2)))

val result = rdd
  .filter(_.isAdult)
  .groupBy(_.country)
  .countByKey

print(result)
/*
Map(JP-> 50, US -> 20)
*/

RDDの型は RDD[T] であり、 T は任意の型をとることができます。
RDD[T] には次のようなメソッドが定義されています。

  • map[U](f: (T) ⇒ U): RDD[U]
  • reduce(f: (T, T) ⇒ T): T
  • groupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])]

このように、RDDに対する処理でにはレコードの型を与えることができます。
すなわちRDDに対する処理は型安全であり、コンパイル時の型検査によって実行時に型エラーが発生しないことが保証できます。
これは複雑データ変換処理などにおいて、データ形式の不一致によるエラーを防ぐ上で大変便利です。

その反面、それぞれのレコードに対応するオブジェクトは処理のたびにシリアライズ/デシリアライズされるため、オーバーヘッドは必然的に大きくなってしまいます。
特に他のオブジェクトを参照するオブジェクトをレコードとするような場合、シリアライズの際に参照先のオブジェクトが一緒に複製されてしまい、メモリを大量に消費してしまうこともあります。

2. DataFrame - 基本的な型の値からなるテーブル

DataFrameではデータを基本的な型の値からなるテーブルとして扱います。
DataFrameの各レコードは1つ以上の名前付きカラムで構成され、各カラムには IntString などのプリミティブ型および ArrayMap などの原始的なコレクション型のみを用いることができます。
プログラムのイメージは次のようになります。

// Spark 1.6.0
import org.apache.spark.sql.SQLContext

val sqc = new SQLContext(sc)

 // {"name": name, "age": age, "country": country}
val df = sqc.read.json("people.jsonl")

val result = df
  .filter(df("age") >= 20)
  .groupBy(df("country")).count

result.show
/*
+-----+--------+
|value|count(1)|
+-----+--------+
|   JP|       50|
|   US|      20|
+-----+--------+
*/

Spark SQLを用いると、DataFrameに対する処理をSQLとして記述することもできます。

df.registerTempTable("people")

df.sql("""
  SELECT country, COUNT(*)
  FROM people
  WHERE age >= 20
  GROUP BY country;
""").show

DataFrameに含まれる値は専用のエンコーダによって最適化されるため、通常のオブジェクトに比べてシリアライズ/デシリアライズのオーバーヘッドやメモリ使用量は大幅に小さくなっています。
またDataFrameでは、一連の処理全体の実行計画がオプティマイザによって予め最適化されます。
これらの違いにより、RDDとDataFrameで同等の処理を実行した場合、DataFrameが数倍から数十倍のパフォーマンスを発揮することがあります。

一方、DataFrameはカラム名や各カラムの型についての情報を型で指定できません。
DataFrameのメソッドの型定義は次のようになっています。

  • filter(condition: Column): DataFrame
  • agg(expr: Column, exprs: Column*): DataFrame
  • groupBy(cols: Column*): GroupedData

Columndf("country")df("age") >= 20 のように作成できますが、これらの実際の値の型は実行時にしかわかりません。
またDataFrameの型は DataFrame であり、 RDD[T] のようにレコードの型パラメータを持たないため、カラムの名前や型などの情報をDataFrameの型情報から得ることができません。
このため、存在しないカラムへの参照や型の異なる関数の適用がある場合、それらをコンパイル時に検出できず、実行時エラーが発生してしまいます。

加えてDataFrameが扱える型や関数は限定されており、RDDのようにユーザ定義の型やそれに対する任意の関数を扱うことは難しくなっています。

RDD v.s. DataFrame

これまで見てきたように、RDDとDataFrameには次のように一長一短があります。

  • RDD
    • ✅ データ処理が型安全である
    • ✅ ユーザ定義の型を扱うことができる
    • ❌ パフォーマンスが低い
    • アドホックなクエリ処理が書きにくい
  • DataFrame
    • ✅ パフォーマンスが高い
    • SQLを用いてアドホックな処理が書ける
    • ❌ データ処理が型安全でない
    • ❌ 一部の基本的な型の値しか扱うことができない

このような違いのため、RDDとDataFrameは用途に応じて使い分ける必要がありました。
例えば時間がかかり繰り返し実行されるバッチ処理では型エラーを未然に防ぐためにRDDを用い、アドホックな分析クエリでは応答速度とクエリの書きやすさからDataFrameを用いる、といった要領です。

3. Dataset - RDDとDataFrameの長所を併せ持つコレクション

DatasetはいわばRDDとDataFrameの良い所どりを目指したAPIです。
プログラムのイメージは次のようになります。

// Spark 2.0.0
import org.apache.spark.sql.SQLContext

val sqc = new SQLContext(sc)

// {"name": name, "age": age, "country": country}
val ds = sqc.read.json("people.jsonl")

val result = ds
  .filter(ds("age") >= 20)
  .groupBy(ds("country"))
  .count

result.show
/*
+-----+--------+
|value|count(1)|
+-----+--------+
|   JP|       50|
|   US|      20|
+-----+--------+
*/

気づかれたかもしれませんが、Spark 2.0のDatasetはSpark 1.6のDataFrameと全く同じように扱うことができます。
当然Datasetに対してもDataFrameと同様にSQLを発行することが可能です。

ここで上のDatasetのデータ型に適合するcase classを作成し、 as によってDatasetに適用してみます。
すると、Datasetに対してRDDと同様のmapやreduceなどを実行することができます。

case class Person(name: String, age: Long, country: String) {
  def isAdult = age >= 20
}

val peopleDs = ds.as[Person]

val result = peopleDs
  .filter(_.isAdult)
  .groupByKey(_.country)
  .count

result.show
/*
+-----+--------+
|value|count(1)|
+-----+--------+
|   JP|       50|
|   US|      20|
+-----+--------+
*/

これは以下のような仕組みによって実現されています。

まず、Datasetの型は Dataset[T] であり、DataFrameは Dataset[Row] と等価です。
Row はカラムの名前や実際の値の型を自身の型情報としては持ちませんが、Row 型のオブジェクトはそのスキーマを属性として持つことができます。
Row のコレクションである Dataset[Row] も同様にスキーマを持つことができ、特にJSONなどから読み出されたDatasetには自動的にスキーマが付与されます。

// {"name": name, "age": age, "country": country}
val ds = sqc.read.json("people.jsonl")

ds.printSchema
/*
root
 |-- age: long (nullable = true)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
*/

ここで Dataset[Row]スキーマに適合する型 U.as[U] として適用すると、 Dataset[U] 型のDatasetが得られます。

scala > val peopleDs = ds.as[Person]
res: Dataset[Person] = ... 

.as[U]U に元のDatasetのスキーマと同じカラム名・型のフィールドを持つクラスを指定した場合、Rowの各カラムが同名のフィールドにマッピングされます。
特にScalaではデータモデルをcase classで定義することが多いため、通常はUとしてcase classを用います。

as によって得られた Dataset[U] 型には、 RDD[U] と同様の型安全な操作を適用することができます。
さらに U の各フィールドの型が、DataFrameで扱えたような基本的な型である場合、レコードの内部表現はDataFrameと同様に最適化されるため、処理のオーバーヘッドはRDDに比べて大幅に小さくなります。

このようにDatasetはRDDの型安全性とDataFrameのパフォーマンスを持ち合わせており、これまでRDDおよびDataFrameを使い分けていたようなケースを統一的なAPIで記述することができます。

RDD, DataFrameからDatasetへの書き換え

ここまでSparkの3種類のAPIについて紹介しました。
ここからは、RDDまたはDataFrameを用いたコードをDatasetで書き直す際のポイントを簡単に紹介します。
これにより、既存のコードやノウハウをDatasetに応用しやすくなると思います。

DataFrameからDatasetへ

まずDataFrameですが、前述のようにDataseにはDataFrameとほぼ同じ処理が適用可能です。
加えてSpark 2.0では DataFrameDataset[Row] へのエイリアスとなっており、多くの場合は既存のコードを書き換えずにそのまま動作させることができます。

RDDからDatasetへ

次にRDDですが、こちらは多少の注意が必要です。

データの入力については、 SparkContext.textFile の代わりに SQLCotext.text を使用できます。

mapfilter などの処理の型定義は RDD[T]Dataset[T] で同じですが、集約処理については少し違いがあります。
例えば RDD[T]groupBygroupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])] のように、キーと集約されたレコードのイテラブルのペアをレコードとするRDDを返します。
対して Dataset[T]groupByKeygroupByKey[K](func: (T) ⇒ K): KeyValueGroupedDataset[K, T] のように、 KeyValueGroupedDataset[K, T] という集約処理専用の型を持ちます。

またDatasetに対する処理が効率的に実行されるためには、レコードの各カラムの型がDataFrameでサポートされていたような基本型である必要があります。
対応する型については下記に記載されています。

Spark SQL and DataFrames - Spark 2.0.2 Documentation

おわりに

この記事ではSparkのRDD, DataFrame, Datasetの違いと、最も新しいAPIであるDatasetを使う際のポイントについて説明しました。
記事を書くにあたっては、下記の書籍や記事を参考にしました。

特に最後の記事は、Datasetの特徴をRDD,DataFrameと比較して理解するために非常に参考になりました。

今後は実際の利用事例についても発信していきたいと思います。

1974年の仮想化技術の論文についてLTしました

8月に毎月恒例の社内LT大会があり、テーマがVMだったので、1974年の仮想化技術の論文を紹介しました。 そのときのスライドを公開します。

取り上げたのは次の論文です。

Popek, Gerald J., and Robert P. Goldberg. "Formal requirements for virtualizable third generation architectures." Communications of the ACM 17.7 (1974): 412-421.

この論文は、コンピュータアーキテクチャの効率的な仮想化が可能であるために命令セットが満たすべき必要条件を定理として形式的に証明したもので、仮想化技術の界隈では知る人はいないほどの重要な論文とのことです。

テーマがVMに決まった時はネタが思い浮かばず、しばらくWikpediaを眺めていたら面白そうな内容でしかも発表が1974年(!)ということで、LTの題材に決めました。

仮想化技術については全くの素人が付け焼き刃で勉強して論文を読んだだけなので、専門家から見ればいろいろツッコミが入りそうな内容ですがご容赦ください。

参考

修論と学会終わって成績表も確認したのでやっと気分が落ち着いた。

普通はこういうとき旅行とか行くのだろうけど、高校~大学で日本の8割ぐらいの都道府県は行ったし、海外は面倒なので、特に大きな計画はしてない。 近場で鳥取だけまだ訪れたことがないので、日帰りか1泊ぐらいでふらっと行こうかと思ってる。 地元にいるのもあと3週間くらいなのでのんびりしたい。

最近はiPad miniKindleでマンガ読むのが楽しみで、ランキングの無料のやつを片っ端からダウンロードして気に入ったシリーズを全巻大人買い、という完全にAmazonの家畜みたいな消費してる。 気が付いたら沈黙の艦隊太陽の黙示録がライブラリに揃っててやばい。

勉強はしてて、ちょっと前に話題になったGoogleのTensorFlowを使ったコースを始めて今3つ目の課題に取り掛かったところ。 最近のDeepLearningについては、シグモイド関数の代わりにReLUを活性化関数にした多層NNという程度の理解しかなくて、まとまった時間がとれたら勉強しようと思ってたのでちょうど良かった。 前やったCourseraのやつと違ってクリアしたらなんかもらえるみたいなのはないので少しモチベーションはちょっと下がる。

あとは引越に向けて持ち物の整理してる。 minikuraっていうサービス使ってて、段ボール一箱月250円ぐらいで預かってくれるのですこぶる便利。

学習アルゴリズム以外のscikit-learn便利機能と連携ライブラリ

Python機械学習を使う場合、scikit-learnには何かとお世話になる。 豊富な学習アルゴリズムの実装を利用できるのが長所だが、実はアルゴリズムそのもの以外にも、前処理や評価のための様々な便利機能を有している。 これらを知らずに使っていると,車輪の再発明をしてしまうことになる。

また、機械学習関連のPythonライブラリはscikit-learnと連携できるAPIをもつものも多い。 scikit-learnを中心とするエコシステムが成立しているとも言える。 中にはコードをほとんど書かずに簡単なモデリングができるようなツールまである。

この記事では個々の学習アルゴリズムではなく、scikit-learnに予め用意されている便利機能やscikit-learnと連携できるライブラリなどを紹介する。

便利モジュール・パッケージ

前処理や評価で使われる一般的なテクニックの多くは既に実装されているので、自分で実装する前にまずは API Reference を眺めてみるとよい。 たいていの道具は見つかると思う。

自分がよく使うモジュール・パッケージをいくつか紹介する。

feature_extraction

画像やテキストなどの生データからの特徴抽出・ベクトル化を提供する。 自分の場合、テキストをよく扱うので次のようなクラスをよく使う。

  • DictVectorizer: dictをarrayに変換
  • FeatureHasher: feature hashingによってdictをarrayに変換(テキストデータのオンライン学習等で使う)
  • CountVectorizer: テキストを語の出現回数でBoWベクトル化
  • TfidfVectorizer: テキストをTF-IDFでBoWベクトル化

preprocessing

特徴ベクトルに対して前処理を適用する。

cross_validation

(k-fold) cross validationを手軽に扱う。

metrics

各種評価指標の実装。

grid_search

グリッドサーチによるパラメータを最適化する。

  • GridSearchCV: 与えられたグリッドから最適なパラメータを自動で選ぶモデルを生成

外部ライブラリ

外部ライブラリは細かく挙げていくとキリがないので、主なものだけ紹介する。 もっと調べたい場合は以下のドキュメントを参照。

Related Projects — scikit-learn 0.17.1 documentation

Scikit-Learn Laboratory(SKLL)

用意した設定ファイルに基づいて、モデルの学習や評価などを自動実行する。 簡単な評価程度ならPythonコードを全く書かなくてもいいほど。

設定ファイルはPython configuration file形式で記述する。 以下はTitanicデータセットに対してRandomForest, 決定木, SVM, 等で学習を行い評価するタスクの設定ファイル一例(ここから引用)。

[General]
experiment_name = Titanic_Evaluate_Tuned
task = evaluate

[Input]
train_directory = train
test_directory = dev
featuresets = [["family.csv", "misc.csv", "socioeconomic.csv", "vitals.csv"]]
learners = ["RandomForestClassifier", "DecisionTreeClassifier", "SVC", "MultinomialNB"]
label_col = Survived
id_col = PassengerId

[Tuning]
grid_search = true
objective = accuracy

タスクの実行はCLIで行う。

$ run_experiment evaluate.cfg

結果は次のように出力される。

Fold:
Model Parameters: {"max_depth": 10, "compute_importances": null, "min_density": null, "bootstrap": true, "n_estimators": 500, "verbose": 0, "min_samples_split": 2, "max_features": "auto", "min_samples_leaf": 1, "criterion": "gini", "random_state": 123456789, "max_leaf_nodes": null, "n_jobs": 1, "oob_score": false}
Grid Objective Score (Train) = 0.8089887640449438
+---+-------+------+-----------+--------+-----------+
|   |     0 |    1 | Precision | Recall | F-measure |
+---+-------+------+-----------+--------+-----------+
| 0 | [101] |   14 |     0.871 |  0.878 |     0.874 |
+---+-------+------+-----------+--------+-----------+
| 1 |    15 | [49] |     0.778 |  0.766 |     0.772 |
+---+-------+------+-----------+--------+-----------+
(row = reference; column = predicted)
Accuracy = 0.8379888268156425
Objective Function Score (Test) = 0.8379888268156425

sklearn-pandas

pandasのDataFrameを上記のsklearn.preprocessing等を用いて学習用の入力データに変換する。

これを

data = pd.DataFrame({
    'pet':      ['cat', 'dog', 'dog', 'fish', 'cat', 'dog', 'cat', 'fish'],
    'children': [4., 6, 3, 3, 2, 3, 5, 4],
    'salary':   [90, 24, 44, 27, 32, 59, 36, 27]
})

こうして

mapper = DataFrameMapper([
    ('pet', sklearn.preprocessing.LabelBinarizer()),
    (['children'], sklearn.preprocessing.StandardScaler())
])
np.round(mapper.fit_transform(data.copy()), 2)

こうじゃ!

array([[ 1.  ,  0.  ,  0.  ,  0.21],
       [ 0.  ,  1.  ,  0.  ,  1.88],
       [ 0.  ,  1.  ,  0.  , -0.63],
       [ 0.  ,  0.  ,  1.  , -0.63],
       [ 1.  ,  0.  ,  0.  , -1.46],
       [ 0.  ,  1.  ,  0.  , -0.63],
       [ 1.  ,  0.  ,  0.  ,  1.04],
       [ 0.  ,  0.  ,  1.  ,  0.21]])

ここから引用)。

auto-sklearn

データを与えるだけでアルゴリズム選択・パラメータチューニングを全自動で行い、適切なモデルを生成してくれる。 サンプルコードにあるとおり、人間がやるのは AutoSklearnClassifierインスタンスを生成して fit するだけという便利なのか乱暴なのかよくわからない代物。 手っ取り早くベースラインのモデルを作るにはいいかもしれない。

その他

scikit-learnにはないトピックモデルや構造化学習などのアルゴリズム実装もいろいろある。 自分ではあまり使ったことのないライブラリも含んでるので参考程度にどうぞ。

追記

2016-02-26: scikit-learnにもトピックモデルの実装は含まれているとのこと。 id:another16javac さん、ご指摘ありがとうございます。

学習アルゴリズム以外のscikit-learn便利機能と連携ライブラリ - yubessy.hatenablog.com

実はscikit-learnにもトピックモデルあります http://scikit-learn.org/stable/modules/generated/sklearn.decomposition.LatentDirichletAllocation.html

2016/02/25 01:56