SparkでネストしたJSONの処理をやっていく

はじめに

来年度以降SparkやHadoopといったことも使っていくことになると言われているので、Sparkの勉強をし始め ました。 今回はsparkをつかって実際にJSONのデータを処理していきたいと思います。 ちなみに僕はscalaに関しても初心者なので本を見ながら頑張っています。コップ本重宝します。

Scalaスケーラブルプログラミング第3版

Scalaスケーラブルプログラミング第3版

JSONの処理なんて当たり前だろって感じかもしれないですが、Spark NOOBなので割りといろいろ調べるの にも結構時間がかかりました。 もっといい方法とか参考にした方がいいって文献等有りましたら教えていただけると幸いです。

SparkでのJSON処理

SparkのdatasetAPIを利用すると割りと楽にJSONの処理が行えました。 Sparkのデータセットの概念等に関しては下の記事が参考になりました。 簡単に言うとこれまでのRDDとDataFrameのいいとこ取りをしたもののようです。

普通のJSONファイルに関しては、上述のDatasetAPIを用いて

val df = spark.read.json(jsonファイルのパス)

と言った感じにやっていくとうまいことパースしてくれます。 ただ、jsonのデータの中にはネストしたものや、ネスト構造が何故か文字列になって送られてきていたりするものなどがあります。 きちんとネストしているものに関しては上記の方法で自動的にパースしてくれます。

しかし、構造がおかしくなってしまったりするデータに関しては、RDDを使って処理していくのが良さそうです。 今回はネストしたデータに関してうまいことやっていく方法を書いていこうかと思っています。

複雑なJSONの処理をやっていく

さて、個々から実際にネストしたJSONに関してパースしていく処理を行っていくことにします。 例として以下のようなネストしたJSONファイルを考えていきます。

{
  "data":{
    "event": "hello", 
    "times": 12345
    }
  }
}

上記のようにきれいになっているjsonデータならそのまま

val df = spark.read.json(jsonファイルのパス)

とすれば上手にパースしてくれます。 アクセス方法に関しては、

df.select($"data.event")

と言ったようにすれば目的の情報にアクセスすることが可能です。

spark.read.json()ではうまくパースされない場合があります。 以下のようにjsonファイルのネスト部分が文字列として扱われてしまう場合にはうまくパースされないです。

{"data": '{"event": "hello", "time": 12345 }'}

実際にこういったデータのせいで結構ハマったのでメモしていきます。

こういった場合にはRDDを使っていくことになります。 RDDを用いたデータフレームの作成は主に3つのステップがあります。

  1. StructTypeをつかってスキーマの定義を行っていく
  2. オリジナルのRDDから、RDDのRowを作っていく
  3. createDataFrameメソッドを使ってスキーマを適用していく。

以上の3つが主なステップとなります。

スキーマの定義

まずは欲しい情報の構造を定義していきます。 こういうのって欲しいデータはdataの内部のものがほしいときが多いと思うのでeventとtimeの新しい テーブルを作ろうと思います。

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("event", StringType)
  .add("time" , LongType)

スキーマは上記のようにして定義ができました。

RDDのRowの作成

次にJSONを読み込んでいきます。

val jsonStr = """{"data": '{"event": "hello", "time": 12345 }'}"""

val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)

ケースクラスを作っておくと名前でアクセスできて便利なのでケースクラスを作ります。

case class jsonData(data: String)
val jsonDF = df.as[jsonData]

as[T]を使うことで定義した型のデータセットが作れます。 RDD化するにはdataset.rddとすればいいです。 文字列のjsonをパースする必要があるのでjsonのパーサをインポートしておきます。

import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.Row

val parsedRDD = jsonDF.rdd.map{ x =>
  val jsonMap = parse(x.data).asInstanceOf[JObject].values
  Row(jsonMap("event"), jsonMap("time"))
}

スキーマを適用

最後にスキーマを以下のようにして適用します。こうすることでeventとTimeのテーブルを作ることができ ました。

val parsedDF = spark.createDataFrame(parsedRDD, schema)

parsedDF.show()を実行すると

+-----+-----+
|event| time|
+-----+-----+
|hello|12345|
+-----+-----+

というように構造化されたデータを取得できていることを確認できるかと思います。

今後

今回はスキーマを自分で定義していくやり方を見つけたのでソレを参考にやって行きました。 けどなかなかめんどくさいのでなんとか自動でスキーマも作っちゃうようなのないのかな…という感じ。 僕が実際に分析していたログではevent_typeの値によってeventの中のJSONの構造が変わるものだったの でスキーマに関しては、event_typeを絞ってスキーマをかたっぱしから定義していったがこの辺自動的に やってくれる便利な方法あったら知りたいのでこんなのあるよって感じだったら教えてください…

その他参考文献