Apache Sparkの3つのAPI: RDD, DataFrameからDatasetへ

はじめに

Livesense Advent Calendar 2016の11日目の記事です。

昨今ではAmazon Elastic Mapreduce (EMR)などのマネージドサービスの登場により、分散データ処理基盤を構築・運用するハードルは劇的に下がっています。
ソフトウェアの選択肢も広がり、特にApache Sparkはオンメモリ処理を基本とした高速で汎用的なフレームワークとして人気があります。

spark.apache.org

2016年の初頭には新しいDataset APIを搭載したSpark 2.0がリリースされました。
Spark 1.0系では主にRDDとDataFrameという2つのAPIを処理の特性に応じて使い分けていましたが、Spark 2.0では多くの処理をDataset APIひとつで実現できるようになっています。

従来のRDDやDataFrameを用いた処理は、大抵の場合そのままか多少工夫することでDatasetでも実現できます。
しかしWeb上にはまだDatasetに関する情報は多くなく、Sparkについて検索するとRDDやDataFrameについて書かれた記事の方がよく見つかります。

この記事では、RDDやDataFrameに関するノウハウをDatasetに応用できるよう、従来のAPIとDatasetを比較しながら紹介していきます。
私自身もSparkについてはまだまだ勉強中のため、内容に間違いがあればぜひコメント等でご指摘ください。

Sparkの基本的な仕組み

Sparkでは多数のレコードからなるデータを幾つかのパーティションに分割し、複数のコンピューティングノードに分配して展開します。
その上でパーティション毎に独立に適用できる処理はノード内で完結し、複数のパーティションのレコードを参照・集約する処理はその都度ノード間でデータの再分割・再分配を行うことでデータを処理していきます。

Sparkの仕組みはHadoopMapReduceと似ていますが、Hadoopが毎回のステップ終了時にHDFSにデータを書き出すのに対し、Sparkではデータをメモリ上に展開したまま複数のステップを連続して実行できるのが特徴です。
この仕組みにより、ステップ数の多い処理では一般的にHadoopよりSparkのほうがパフォーマンスが高いとされています。

データコレクションの操作のためのAPI

Hadoopのプログラミングモデルは、入力データから出力データへの変換をMapReduceとして記述するというものでした。
MapおよびReduceは単一レコードあるいは同一キーでグループ化されたレコードに対する処理であり、データ全体をひとつのまとまりとして扱うための抽象化層は別に用意する必要がありました。

対してSparkでは、各ノードに分散しているデータをプログラムから単一のオブジェクトとして操作できるAPIを提供しています。
これにより、複数の中間的な状態を経るような複雑な処理であっても、より抽象化された形で実装できるようになっています。

Sparkにおいてデータのまとまりを扱うAPIとしては、RDD, DataFrame, Datasetの3つが存在します。
当初からSparkには最も基本的なAPIであるRDDが用意されており、Spark 1.3ではDataFrameが追加されました。
Spark1.6ではこれらに加えてDatasetが試験的に導入されました。
さらにSpark2.0ではDatasetが正式なAPIとなり、DataFrameはDatasetに統合されました。

以降ではこれらのAPIの特徴を順を追って紹介していきます。
言語はSparkのネイティブ言語であるScalaを用いています。

1. RDD - ネイティブなオブジェクトのコレクション

RDDではデータをネイティブな(JVM)オブジェクトのコレクションとして扱います。
プログラムのイメージは次のようになります。

class Person(name: String, age: Int, country: String) {
  def isAdult = age >= 20
}

val rdd = sc.textFile("people.csv") // name,age,country
  .map(s => s.split(","))
  .map(t => new Person(t(0), t(1).toInt, t(2)))

val result = rdd
  .filter(_.isAdult)
  .groupBy(_.country)
  .countByKey

print(result)
/*
Map(JP-> 50, US -> 20)
*/

RDDの型は RDD[T] であり、 T は任意の型をとることができます。
RDD[T] には次のようなメソッドが定義されています。

  • map[U](f: (T) ⇒ U): RDD[U]
  • reduce(f: (T, T) ⇒ T): T
  • groupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])]

このように、RDDに対する処理でにはレコードの型を与えることができます。
すなわちRDDに対する処理は型安全であり、コンパイル時の型検査によって実行時に型エラーが発生しないことが保証できます。
これは複雑データ変換処理などにおいて、データ形式の不一致によるエラーを防ぐ上で大変便利です。

その反面、それぞれのレコードに対応するオブジェクトは処理のたびにシリアライズ/デシリアライズされるため、オーバーヘッドは必然的に大きくなってしまいます。
特に他のオブジェクトを参照するオブジェクトをレコードとするような場合、シリアライズの際に参照先のオブジェクトが一緒に複製されてしまい、メモリを大量に消費してしまうこともあります。

2. DataFrame - 基本的な型の値からなるテーブル

DataFrameではデータを基本的な型の値からなるテーブルとして扱います。
DataFrameの各レコードは1つ以上の名前付きカラムで構成され、各カラムには IntString などのプリミティブ型および ArrayMap などの原始的なコレクション型のみを用いることができます。
プログラムのイメージは次のようになります。

// Spark 1.6.0
import org.apache.spark.sql.SQLContext

val sqc = new SQLContext(sc)

 // {"name": name, "age": age, "country": country}
val df = sqc.read.json("people.jsonl")

val result = df
  .filter(df("age") >= 20)
  .groupBy(df("country")).count

result.show
/*
+-----+--------+
|value|count(1)|
+-----+--------+
|   JP|       50|
|   US|      20|
+-----+--------+
*/

Spark SQLを用いると、DataFrameに対する処理をSQLとして記述することもできます。

df.registerTempTable("people")

df.sql("""
  SELECT country, COUNT(*)
  FROM people
  WHERE age >= 20
  GROUP BY country;
""").show

DataFrameに含まれる値は専用のエンコーダによって最適化されるため、通常のオブジェクトに比べてシリアライズ/デシリアライズのオーバーヘッドやメモリ使用量は大幅に小さくなっています。
またDataFrameでは、一連の処理全体の実行計画がオプティマイザによって予め最適化されます。
これらの違いにより、RDDとDataFrameで同等の処理を実行した場合、DataFrameが数倍から数十倍のパフォーマンスを発揮することがあります。

一方、DataFrameはカラム名や各カラムの型についての情報を型で指定できません。
DataFrameのメソッドの型定義は次のようになっています。

  • filter(condition: Column): DataFrame
  • agg(expr: Column, exprs: Column*): DataFrame
  • groupBy(cols: Column*): GroupedData

Columndf("country")df("age") >= 20 のように作成できますが、これらの実際の値の型は実行時にしかわかりません。
またDataFrameの型は DataFrame であり、 RDD[T] のようにレコードの型パラメータを持たないため、カラムの名前や型などの情報をDataFrameの型情報から得ることができません。
このため、存在しないカラムへの参照や型の異なる関数の適用がある場合、それらをコンパイル時に検出できず、実行時エラーが発生してしまいます。

加えてDataFrameが扱える型や関数は限定されており、RDDのようにユーザ定義の型やそれに対する任意の関数を扱うことは難しくなっています。

RDD v.s. DataFrame

これまで見てきたように、RDDとDataFrameには次のように一長一短があります。

  • RDD
    • ✅ データ処理が型安全である
    • ✅ ユーザ定義の型を扱うことができる
    • ❌ パフォーマンスが低い
    • アドホックなクエリ処理が書きにくい
  • DataFrame
    • ✅ パフォーマンスが高い
    • SQLを用いてアドホックな処理が書ける
    • ❌ データ処理が型安全でない
    • ❌ 一部の基本的な型の値しか扱うことができない

このような違いのため、RDDとDataFrameは用途に応じて使い分ける必要がありました。
例えば時間がかかり繰り返し実行されるバッチ処理では型エラーを未然に防ぐためにRDDを用い、アドホックな分析クエリでは応答速度とクエリの書きやすさからDataFrameを用いる、といった要領です。

3. Dataset - RDDとDataFrameの長所を併せ持つコレクション

DatasetはいわばRDDとDataFrameの良い所どりを目指したAPIです。
プログラムのイメージは次のようになります。

// Spark 2.0.0
import org.apache.spark.sql.SQLContext

val sqc = new SQLContext(sc)

// {"name": name, "age": age, "country": country}
val ds = sqc.read.json("people.jsonl")

val result = ds
  .filter(ds("age") >= 20)
  .groupBy(ds("country"))
  .count

result.show
/*
+-----+--------+
|value|count(1)|
+-----+--------+
|   JP|       50|
|   US|      20|
+-----+--------+
*/

気づかれたかもしれませんが、Spark 2.0のDatasetはSpark 1.6のDataFrameと全く同じように扱うことができます。
当然Datasetに対してもDataFrameと同様にSQLを発行することが可能です。

ここで上のDatasetのデータ型に適合するcase classを作成し、 as によってDatasetに適用してみます。
すると、Datasetに対してRDDと同様のmapやreduceなどを実行することができます。

case class Person(name: String, age: Long, country: String) {
  def isAdult = age >= 20
}

val peopleDs = ds.as[Person]

val result = peopleDs
  .filter(_.isAdult)
  .groupByKey(_.country)
  .count

result.show
/*
+-----+--------+
|value|count(1)|
+-----+--------+
|   JP|       50|
|   US|      20|
+-----+--------+
*/

これは以下のような仕組みによって実現されています。

まず、Datasetの型は Dataset[T] であり、DataFrameは Dataset[Row] と等価です。
Row はカラムの名前や実際の値の型を自身の型情報としては持ちませんが、Row 型のオブジェクトはそのスキーマを属性として持つことができます。
Row のコレクションである Dataset[Row] も同様にスキーマを持つことができ、特にJSONなどから読み出されたDatasetには自動的にスキーマが付与されます。

// {"name": name, "age": age, "country": country}
val ds = sqc.read.json("people.jsonl")

ds.printSchema
/*
root
 |-- age: long (nullable = true)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)
*/

ここで Dataset[Row]スキーマに適合する型 U.as[U] として適用すると、 Dataset[U] 型のDatasetが得られます。

scala > val peopleDs = ds.as[Person]
res: Dataset[Person] = ... 

.as[U]U に元のDatasetのスキーマと同じカラム名・型のフィールドを持つクラスを指定した場合、Rowの各カラムが同名のフィールドにマッピングされます。
特にScalaではデータモデルをcase classで定義することが多いため、通常はUとしてcase classを用います。

as によって得られた Dataset[U] 型には、 RDD[U] と同様の型安全な操作を適用することができます。
さらに U の各フィールドの型が、DataFrameで扱えたような基本的な型である場合、レコードの内部表現はDataFrameと同様に最適化されるため、処理のオーバーヘッドはRDDに比べて大幅に小さくなります。

このようにDatasetはRDDの型安全性とDataFrameのパフォーマンスを持ち合わせており、これまでRDDおよびDataFrameを使い分けていたようなケースを統一的なAPIで記述することができます。

RDD, DataFrameからDatasetへの書き換え

ここまでSparkの3種類のAPIについて紹介しました。
ここからは、RDDまたはDataFrameを用いたコードをDatasetで書き直す際のポイントを簡単に紹介します。
これにより、既存のコードやノウハウをDatasetに応用しやすくなると思います。

DataFrameからDatasetへ

まずDataFrameですが、前述のようにDataseにはDataFrameとほぼ同じ処理が適用可能です。
加えてSpark 2.0では DataFrameDataset[Row] へのエイリアスとなっており、多くの場合は既存のコードを書き換えずにそのまま動作させることができます。

RDDからDatasetへ

次にRDDですが、こちらは多少の注意が必要です。

データの入力については、 SparkContext.textFile の代わりに SQLCotext.text を使用できます。

mapfilter などの処理の型定義は RDD[T]Dataset[T] で同じですが、集約処理については少し違いがあります。
例えば RDD[T]groupBygroupBy[K](f: (T) ⇒ K): RDD[(K, Iterable[T])] のように、キーと集約されたレコードのイテラブルのペアをレコードとするRDDを返します。
対して Dataset[T]groupByKeygroupByKey[K](func: (T) ⇒ K): KeyValueGroupedDataset[K, T] のように、 KeyValueGroupedDataset[K, T] という集約処理専用の型を持ちます。

またDatasetに対する処理が効率的に実行されるためには、レコードの各カラムの型がDataFrameでサポートされていたような基本型である必要があります。
対応する型については下記に記載されています。

Spark SQL and DataFrames - Spark 2.0.2 Documentation

おわりに

この記事ではSparkのRDD, DataFrame, Datasetの違いと、最も新しいAPIであるDatasetを使う際のポイントについて説明しました。
記事を書くにあたっては、下記の書籍や記事を参考にしました。

特に最後の記事は、Datasetの特徴をRDD,DataFrameと比較して理解するために非常に参考になりました。

今後は実際の利用事例についても発信していきたいと思います。