今年触った技術を雑多に
なんとなく。
AWS
GCPはちょっと触ってたけど、AWSは間違ってお金かかってしまいそうという謎の不安があって社会に出るまで敬遠していた。 EC2, ELB, S3, Beanstalkのような基本的なサービスに加えてRedshiftとEMRをかなり触ることになった。 覚えたり調べたりすることが永遠に尽きない。
Ruby
今年一番書いた言語。 もともとPythonに慣れていたので、スコープに由来のわからない変数やメソッドが現れるのに未だ違和感を覚えてしまう。 ライブラリが豊富すぎて技術選択で迷いがちなPythonに比べ、Bundler, Rake, RSpecなどのほぼ必須のエコシステムに乗っておけばそれほど道を踏み外さないのは楽だった。
Python
意外と自分で書くことは少なく、どちらかというとレビューする事が多かった気がする。 自分にとって書いてて一番しっくりくるのは相変わらず。 ちょっとしたスクリプトなどはまずPythonで書いてる。
Scala
今年後半から使い始めた。 静的型付の言語は久々だったけど、以前経験したC, C#, Javaに比べて圧倒的に馴染みやすかった。 心地よい学習曲線に乗って学べる印象。
ES2016
ほんの少しだけ触った。 大したことをしてないので大した感想はない。 JavaScriptって今こんな感じなんだ、ふーん。
Ansible
ちょっとした環境の構成管理ツールとして使った。 最初に覚えなければいけない概念が少し多かったが、公式のベストプラクティスを参考にするとうまく軌道に乗れた気がする。
Fluentd
使うプラグインのコードを読む機会が多かった。 よく使われてメンテされてるプラグインとそうでないものでの品質の落差が激しいのを実感した。
Spark
今年触ったものの中で一番面白かった。 分散処理でスレッドセーフ性が問題になったり、遅延評価が影響してちょっとしたコードの変更が予期しない結果を産んだりといろいろ悩んだけど、その分学びは多かった。 来年はSpark StreamingやSpark MLも本格的に使ってみたい。
Apache Sparkの3つのAPI: RDD, DataFrameからDatasetへ
はじめに
Livesense Advent Calendar 2016の11日目の記事です。
昨今ではAmazon Elastic Mapreduce (EMR)などのマネージドサービスの登場により、分散データ処理基盤を構築・運用するハードルは劇的に下がっています。
ソフトウェアの選択肢も広がり、特にApache Sparkはオンメモリ処理を基本とした高速で汎用的なフレームワークとして人気があります。
2016年の初頭には新しいDataset APIを搭載したSpark 2.0がリリースされました。
Spark 1.0系では主にRDDとDataFrameという2つのAPIを処理の特性に応じて使い分けていましたが、Spark 2.0では多くの処理をDataset APIひとつで実現できるようになっています。
従来のRDDやDataFrameを用いた処理は、大抵の場合そのままか多少工夫することでDatasetでも実現できます。
しかしWeb上にはまだDatasetに関する情報は多くなく、Sparkについて検索するとRDDやDataFrameについて書かれた記事の方がよく見つかります。
この記事では、RDDやDataFrameに関するノウハウをDatasetに応用できるよう、従来のAPIとDatasetを比較しながら紹介していきます。
私自身もSparkについてはまだまだ勉強中のため、内容に間違いがあればぜひコメント等でご指摘ください。
Sparkの基本的な仕組み
Sparkでは多数のレコードからなるデータを幾つかのパーティションに分割し、複数のコンピューティングノードに分配して展開します。
その上でパーティション毎に独立に適用できる処理はノード内で完結し、複数のパーティションのレコードを参照・集約する処理はその都度ノード間でデータの再分割・再分配を行うことでデータを処理していきます。
Sparkの仕組みはHadoopのMapReduceと似ていますが、Hadoopが毎回のステップ終了時にHDFSにデータを書き出すのに対し、Sparkではデータをメモリ上に展開したまま複数のステップを連続して実行できるのが特徴です。
この仕組みにより、ステップ数の多い処理では一般的にHadoopよりSparkのほうがパフォーマンスが高いとされています。
データコレクションの操作のためのAPI
Hadoopのプログラミングモデルは、入力データから出力データへの変換をMapReduceとして記述するというものでした。
MapおよびReduceは単一レコードあるいは同一キーでグループ化されたレコードに対する処理であり、データ全体をひとつのまとまりとして扱うための抽象化層は別に用意する必要がありました。
対してSparkでは、各ノードに分散しているデータをプログラムから単一のオブジェクトとして操作できるAPIを提供しています。
これにより、複数の中間的な状態を経るような複雑な処理であっても、より抽象化された形で実装できるようになっています。
Sparkにおいてデータのまとまりを扱うAPIとしては、RDD, DataFrame, Datasetの3つが存在します。
当初からSparkには最も基本的なAPIであるRDDが用意されており、Spark 1.3ではDataFrameが追加されました。
Spark1.6ではこれらに加えてDatasetが試験的に導入されました。
さらにSpark2.0ではDatasetが正式なAPIとなり、DataFrameはDatasetに統合されました。
以降ではこれらのAPIの特徴を順を追って紹介していきます。
言語はSparkのネイティブ言語であるScalaを用いています。
1. RDD - ネイティブなオブジェクトのコレクション
RDDではデータをネイティブな(JVM)オブジェクトのコレクションとして扱います。
プログラムのイメージは次のようになります。
class Person(name: String, age: Int, country: String) { def isAdult = age >= 20 } val rdd = sc.textFile("people.csv") // name,age,country .map(s => s.split(",")) .map(t => new Person(t(0), t(1).toInt, t(2))) val result = rdd .filter(_.isAdult) .groupBy(_.country) .countByKey print(result) /* Map(JP-> 50, US -> 20) */
RDDの型は RDD[T]
であり、 T
は任意の型をとることができます。
RDD[T]
には次のようなメソッドが定義されています。
map[U](f: (T) ⇒ U): RDD[U]
reduce(f: (T, T) ⇒ T): T
groupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])]
このように、RDDに対する処理でにはレコードの型を与えることができます。
すなわちRDDに対する処理は型安全であり、コンパイル時の型検査によって実行時に型エラーが発生しないことが保証できます。
これは複雑データ変換処理などにおいて、データ形式の不一致によるエラーを防ぐ上で大変便利です。
その反面、それぞれのレコードに対応するオブジェクトは処理のたびにシリアライズ/デシリアライズされるため、オーバーヘッドは必然的に大きくなってしまいます。
特に他のオブジェクトを参照するオブジェクトをレコードとするような場合、シリアライズの際に参照先のオブジェクトが一緒に複製されてしまい、メモリを大量に消費してしまうこともあります。
2. DataFrame - 基本的な型の値からなるテーブル
DataFrameではデータを基本的な型の値からなるテーブルとして扱います。
DataFrameの各レコードは1つ以上の名前付きカラムで構成され、各カラムには Int
や String
などのプリミティブ型および Array
や Map
などの原始的なコレクション型のみを用いることができます。
プログラムのイメージは次のようになります。
// Spark 1.6.0 import org.apache.spark.sql.SQLContext val sqc = new SQLContext(sc) // {"name": name, "age": age, "country": country} val df = sqc.read.json("people.jsonl") val result = df .filter(df("age") >= 20) .groupBy(df("country")).count result.show /* +-----+--------+ |value|count(1)| +-----+--------+ | JP| 50| | US| 20| +-----+--------+ */
Spark SQLを用いると、DataFrameに対する処理をSQLとして記述することもできます。
df.registerTempTable("people") df.sql(""" SELECT country, COUNT(*) FROM people WHERE age >= 20 GROUP BY country; """).show
DataFrameに含まれる値は専用のエンコーダによって最適化されるため、通常のオブジェクトに比べてシリアライズ/デシリアライズのオーバーヘッドやメモリ使用量は大幅に小さくなっています。
またDataFrameでは、一連の処理全体の実行計画がオプティマイザによって予め最適化されます。
これらの違いにより、RDDとDataFrameで同等の処理を実行した場合、DataFrameが数倍から数十倍のパフォーマンスを発揮することがあります。
一方、DataFrameはカラム名や各カラムの型についての情報を型で指定できません。
DataFrameのメソッドの型定義は次のようになっています。
filter(condition: Column): DataFrame
agg(expr: Column, exprs: Column*): DataFrame
groupBy(cols: Column*): GroupedData
Column
は df("country")
や df("age") >= 20
のように作成できますが、これらの実際の値の型は実行時にしかわかりません。
またDataFrameの型は DataFrame
であり、 RDD[T]
のようにレコードの型パラメータを持たないため、カラムの名前や型などの情報をDataFrameの型情報から得ることができません。
このため、存在しないカラムへの参照や型の異なる関数の適用がある場合、それらをコンパイル時に検出できず、実行時エラーが発生してしまいます。
加えてDataFrameが扱える型や関数は限定されており、RDDのようにユーザ定義の型やそれに対する任意の関数を扱うことは難しくなっています。
RDD v.s. DataFrame
これまで見てきたように、RDDとDataFrameには次のように一長一短があります。
このような違いのため、RDDとDataFrameは用途に応じて使い分ける必要がありました。
例えば時間がかかり繰り返し実行されるバッチ処理では型エラーを未然に防ぐためにRDDを用い、アドホックな分析クエリでは応答速度とクエリの書きやすさからDataFrameを用いる、といった要領です。
3. Dataset - RDDとDataFrameの長所を併せ持つコレクション
DatasetはいわばRDDとDataFrameの良い所どりを目指したAPIです。
プログラムのイメージは次のようになります。
// Spark 2.0.0 import org.apache.spark.sql.SQLContext val sqc = new SQLContext(sc) // {"name": name, "age": age, "country": country} val ds = sqc.read.json("people.jsonl") val result = ds .filter(ds("age") >= 20) .groupBy(ds("country")) .count result.show /* +-----+--------+ |value|count(1)| +-----+--------+ | JP| 50| | US| 20| +-----+--------+ */
気づかれたかもしれませんが、Spark 2.0のDatasetはSpark 1.6のDataFrameと全く同じように扱うことができます。
当然Datasetに対してもDataFrameと同様にSQLを発行することが可能です。
ここで上のDatasetのデータ型に適合するcase classを作成し、 as
によってDatasetに適用してみます。
すると、Datasetに対してRDDと同様のmapやreduceなどを実行することができます。
case class Person(name: String, age: Long, country: String) { def isAdult = age >= 20 } val peopleDs = ds.as[Person] val result = peopleDs .filter(_.isAdult) .groupByKey(_.country) .count result.show /* +-----+--------+ |value|count(1)| +-----+--------+ | JP| 50| | US| 20| +-----+--------+ */
これは以下のような仕組みによって実現されています。
まず、Datasetの型は Dataset[T]
であり、DataFrameは Dataset[Row]
と等価です。
Row
はカラムの名前や実際の値の型を自身の型情報としては持ちませんが、Row
型のオブジェクトはそのスキーマを属性として持つことができます。
Row
のコレクションである Dataset[Row]
も同様にスキーマを持つことができ、特にJSONなどから読み出されたDatasetには自動的にスキーマが付与されます。
// {"name": name, "age": age, "country": country} val ds = sqc.read.json("people.jsonl") ds.printSchema /* root |-- age: long (nullable = true) |-- country: string (nullable = true) |-- name: string (nullable = true) */
ここで Dataset[Row]
のスキーマに適合する型 U
を .as[U]
として適用すると、 Dataset[U]
型のDatasetが得られます。
scala > val peopleDs = ds.as[Person]
res: Dataset[Person] = ...
.as[U]
の U
に元のDatasetのスキーマと同じカラム名・型のフィールドを持つクラスを指定した場合、Rowの各カラムが同名のフィールドにマッピングされます。
特にScalaではデータモデルをcase classで定義することが多いため、通常はUとしてcase classを用います。
as
によって得られた Dataset[U]
型には、 RDD[U]
と同様の型安全な操作を適用することができます。
さらに U
の各フィールドの型が、DataFrameで扱えたような基本的な型である場合、レコードの内部表現はDataFrameと同様に最適化されるため、処理のオーバーヘッドはRDDに比べて大幅に小さくなります。
このようにDatasetはRDDの型安全性とDataFrameのパフォーマンスを持ち合わせており、これまでRDDおよびDataFrameを使い分けていたようなケースを統一的なAPIで記述することができます。
RDD, DataFrameからDatasetへの書き換え
ここまでSparkの3種類のAPIについて紹介しました。
ここからは、RDDまたはDataFrameを用いたコードをDatasetで書き直す際のポイントを簡単に紹介します。
これにより、既存のコードやノウハウをDatasetに応用しやすくなると思います。
DataFrameからDatasetへ
まずDataFrameですが、前述のようにDataseにはDataFrameとほぼ同じ処理が適用可能です。
加えてSpark 2.0では DataFrame
が Dataset[Row]
へのエイリアスとなっており、多くの場合は既存のコードを書き換えずにそのまま動作させることができます。
RDDからDatasetへ
次にRDDですが、こちらは多少の注意が必要です。
データの入力については、 SparkContext.textFile
の代わりに SQLCotext.text
を使用できます。
map
や filter
などの処理の型定義は RDD[T]
と Dataset[T]
で同じですが、集約処理については少し違いがあります。
例えば RDD[T]
の groupBy
は groupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])]
のように、キーと集約されたレコードのイテラブルのペアをレコードとするRDDを返します。
対して Dataset[T]
の groupByKey
は groupByKey[K](func: (T) ⇒ K): KeyValueGroupedDataset[K, T]
のように、 KeyValueGroupedDataset[K, T]
という集約処理専用の型を持ちます。
またDatasetに対する処理が効率的に実行されるためには、レコードの各カラムの型がDataFrameでサポートされていたような基本型である必要があります。
対応する型については下記に記載されています。
Spark SQL and DataFrames - Spark 2.0.2 Documentation
おわりに
この記事ではSparkのRDD, DataFrame, Datasetの違いと、最も新しいAPIであるDatasetを使う際のポイントについて説明しました。
記事を書くにあたっては、下記の書籍や記事を参考にしました。
- 詳解 Apache Spark:書籍案内|技術評論社
- Introducing Apache Spark Datasets - The Databricks Blog
- A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets - The Databricks Blog
特に最後の記事は、Datasetの特徴をRDD,DataFrameと比較して理解するために非常に参考になりました。
今後は実際の利用事例についても発信していきたいと思います。
1974年の仮想化技術の論文についてLTしました
8月に毎月恒例の社内LT大会があり、テーマがVMだったので、1974年の仮想化技術の論文を紹介しました。 そのときのスライドを公開します。
取り上げたのは次の論文です。
この論文は、コンピュータアーキテクチャの効率的な仮想化が可能であるために命令セットが満たすべき必要条件を定理として形式的に証明したもので、仮想化技術の界隈では知る人はいないほどの重要な論文とのことです。
テーマがVMに決まった時はネタが思い浮かばず、しばらくWikpediaを眺めていたら面白そうな内容でしかも発表が1974年(!)ということで、LTの題材に決めました。
仮想化技術については全くの素人が付け焼き刃で勉強して論文を読んだだけなので、専門家から見ればいろいろツッコミが入りそうな内容ですがご容赦ください。
参考
■
修論と学会終わって成績表も確認したのでやっと気分が落ち着いた。
普通はこういうとき旅行とか行くのだろうけど、高校~大学で日本の8割ぐらいの都道府県は行ったし、海外は面倒なので、特に大きな計画はしてない。 近場で鳥取だけまだ訪れたことがないので、日帰りか1泊ぐらいでふらっと行こうかと思ってる。 地元にいるのもあと3週間くらいなのでのんびりしたい。
最近はiPad miniのKindleでマンガ読むのが楽しみで、ランキングの無料のやつを片っ端からダウンロードして気に入ったシリーズを全巻大人買い、という完全にAmazonの家畜みたいな消費してる。 気が付いたら沈黙の艦隊と太陽の黙示録がライブラリに揃っててやばい。
勉強はしてて、ちょっと前に話題になったGoogleのTensorFlowを使ったコースを始めて今3つ目の課題に取り掛かったところ。 最近のDeepLearningについては、シグモイド関数の代わりにReLUを活性化関数にした多層NNという程度の理解しかなくて、まとまった時間がとれたら勉強しようと思ってたのでちょうど良かった。 前やったCourseraのやつと違ってクリアしたらなんかもらえるみたいなのはないので少しモチベーションはちょっと下がる。
あとは引越に向けて持ち物の整理してる。 minikuraっていうサービス使ってて、段ボール一箱月250円ぐらいで預かってくれるのですこぶる便利。
学習アルゴリズム以外のscikit-learn便利機能と連携ライブラリ
Pythonで機械学習を使う場合、scikit-learnには何かとお世話になる。 豊富な学習アルゴリズムの実装を利用できるのが長所だが、実はアルゴリズムそのもの以外にも、前処理や評価のための様々な便利機能を有している。 これらを知らずに使っていると,車輪の再発明をしてしまうことになる。
また、機械学習関連のPythonライブラリはscikit-learnと連携できるAPIをもつものも多い。 scikit-learnを中心とするエコシステムが成立しているとも言える。 中にはコードをほとんど書かずに簡単なモデリングができるようなツールまである。
この記事では個々の学習アルゴリズムではなく、scikit-learnに予め用意されている便利機能やscikit-learnと連携できるライブラリなどを紹介する。
便利モジュール・パッケージ
前処理や評価で使われる一般的なテクニックの多くは既に実装されているので、自分で実装する前にまずは API Reference を眺めてみるとよい。 たいていの道具は見つかると思う。
自分がよく使うモジュール・パッケージをいくつか紹介する。
feature_extraction
画像やテキストなどの生データからの特徴抽出・ベクトル化を提供する。 自分の場合、テキストをよく扱うので次のようなクラスをよく使う。
- DictVectorizer: dictをarrayに変換
- FeatureHasher: feature hashingによってdictをarrayに変換(テキストデータのオンライン学習等で使う)
- CountVectorizer: テキストを語の出現回数でBoWベクトル化
- TfidfVectorizer: テキストをTF-IDFでBoWベクトル化
preprocessing
特徴ベクトルに対して前処理を適用する。
- scale: 標準正規分布にスケーリング
- minmax_scale, maxabs_scale:
[0, 1]
[-1, 1]
などの範囲にスケーリング - normalize: 各ベクトルのノルムを正規化
- label_binalize: カテゴリ特徴を二値化
- LabelEncoder: ラベルを数値にエンコード
- PolynomialFeatures: 多項式による組み合わせ特徴を生成
cross_validation
(k-fold) cross validationを手軽に扱う。
- train_test_split: データセットを訓練セットとテストセットに分割
- cross_val_score: k-fold cross validation により各foldの評価スコアを算出
metrics
各種評価指標の実装。
- accuracy_score: 精度
- f1_score: F値
- precision_score, recall_score: 適合率, 再現率
- auc: AUC
- mean_squared_error: MSE
grid_search
グリッドサーチによるパラメータを最適化する。
- GridSearchCV: 与えられたグリッドから最適なパラメータを自動で選ぶモデルを生成
外部ライブラリ
外部ライブラリは細かく挙げていくとキリがないので、主なものだけ紹介する。 もっと調べたい場合は以下のドキュメントを参照。
Related Projects — scikit-learn 0.17.1 documentation
Scikit-Learn Laboratory(SKLL)
用意した設定ファイルに基づいて、モデルの学習や評価などを自動実行する。 簡単な評価程度ならPythonコードを全く書かなくてもいいほど。
設定ファイルはPython configuration file形式で記述する。 以下はTitanicデータセットに対してRandomForest, 決定木, SVM, 等で学習を行い評価するタスクの設定ファイル一例(ここから引用)。
[General] experiment_name = Titanic_Evaluate_Tuned task = evaluate [Input] train_directory = train test_directory = dev featuresets = [["family.csv", "misc.csv", "socioeconomic.csv", "vitals.csv"]] learners = ["RandomForestClassifier", "DecisionTreeClassifier", "SVC", "MultinomialNB"] label_col = Survived id_col = PassengerId [Tuning] grid_search = true objective = accuracy
タスクの実行はCLIで行う。
$ run_experiment evaluate.cfg
結果は次のように出力される。
Fold: Model Parameters: {"max_depth": 10, "compute_importances": null, "min_density": null, "bootstrap": true, "n_estimators": 500, "verbose": 0, "min_samples_split": 2, "max_features": "auto", "min_samples_leaf": 1, "criterion": "gini", "random_state": 123456789, "max_leaf_nodes": null, "n_jobs": 1, "oob_score": false} Grid Objective Score (Train) = 0.8089887640449438 +---+-------+------+-----------+--------+-----------+ | | 0 | 1 | Precision | Recall | F-measure | +---+-------+------+-----------+--------+-----------+ | 0 | [101] | 14 | 0.871 | 0.878 | 0.874 | +---+-------+------+-----------+--------+-----------+ | 1 | 15 | [49] | 0.778 | 0.766 | 0.772 | +---+-------+------+-----------+--------+-----------+ (row = reference; column = predicted) Accuracy = 0.8379888268156425 Objective Function Score (Test) = 0.8379888268156425
sklearn-pandas
pandasのDataFrameを上記のsklearn.preprocessing等を用いて学習用の入力データに変換する。
これを
data = pd.DataFrame({ 'pet': ['cat', 'dog', 'dog', 'fish', 'cat', 'dog', 'cat', 'fish'], 'children': [4., 6, 3, 3, 2, 3, 5, 4], 'salary': [90, 24, 44, 27, 32, 59, 36, 27] })
こうして
mapper = DataFrameMapper([ ('pet', sklearn.preprocessing.LabelBinarizer()), (['children'], sklearn.preprocessing.StandardScaler()) ]) np.round(mapper.fit_transform(data.copy()), 2)
こうじゃ!
array([[ 1. , 0. , 0. , 0.21], [ 0. , 1. , 0. , 1.88], [ 0. , 1. , 0. , -0.63], [ 0. , 0. , 1. , -0.63], [ 1. , 0. , 0. , -1.46], [ 0. , 1. , 0. , -0.63], [ 1. , 0. , 0. , 1.04], [ 0. , 0. , 1. , 0.21]])
(ここから引用)。
auto-sklearn
データを与えるだけでアルゴリズム選択・パラメータチューニングを全自動で行い、適切なモデルを生成してくれる。
サンプルコードにあるとおり、人間がやるのは AutoSklearnClassifier
のインスタンスを生成して fit
するだけという便利なのか乱暴なのかよくわからない代物。
手っ取り早くベースラインのモデルを作るにはいいかもしれない。
その他
scikit-learnにはないトピックモデルや構造化学習などのアルゴリズム実装もいろいろある。
自分ではあまり使ったことのないライブラリも含んでるので参考程度にどうぞ。
- https://github.com/ariddell/lda/:lda: LDA
- PyStruct: CRF等の構造化学習
- HMMLearn: 隠れマルコフモデル
- sklearn-compiledtrees: 決定木のアンサンブルモデル(RandomForest, GradientBoostedTree)をコンパイルして高速化
追記
2016-02-26: scikit-learnにもトピックモデルの実装は含まれているとのこと。 id:another16javac さん、ご指摘ありがとうございます。
学習アルゴリズム以外のscikit-learn便利機能と連携ライブラリ - yubessy.hatenablog.com実はscikit-learnにもトピックモデルあります http://scikit-learn.org/stable/modules/generated/sklearn.decomposition.LatentDirichletAllocation.html
2016/02/25 01:56