Scalaのコレクションで遅延処理を行う
皆様おはようございます。梅雨もとっくに過ぎ、すっかり夏らしい空模様となりました。埼玉は毎日気温が高くて死にそうです・・・。
さて今回はScalaのコレクションネタです。
結論から先に書くと、Scalaのコレクション処理 (map
, flatMap
, filter
等) は、 (Stream
型を除いて) デフォルトでは遅延的に処理されません。遅延させたい場合はビューを使いましょう。これでこの記事の役割は9割終えてしまったのですが、備忘録も兼ねて詳細のメモを記したいと思います。
早速ですが、次のコードを見て下さい:
List(1, 2, 3)
.map { x =>
println(s"$x is doubled")
x * 2
}
.take(2)
.foreach(println)
上記のコードは次のように出力されます:
1 is doubled
2 is doubled
3 is doubled
2
4
map
はコレクションの要素に対して関数を適用させます。
上記の例では、 Int
型の要素を2倍にした値を返しています。
※本来であれば println
の様な副作用のある処理を内部に書くのは好ましく有りませんが、検証用にここでは呼び出しています。
その後、 .take(2)
で先頭から2要素だけ取得しています。
さて、改めてコンソールに目を向けてみると、 take(2)
が処理される前に、全件に対して最初の map
が実行されているのが分かります。
何が問題だったか
恐らく、大抵の用途では上記の仕様で困る事はほとんど無いかと思います。今回は、とあるプロジェクトでRedisクラスタを使っていた時に問題が発覚しました。
そのプロジェクトでは、Redissonと言うフレームワークを使ってRedisクラスタとのやり取りを行っていました。
ワークロードとしては、特定のパターンにマッチするキーのデータを取得すると言う物で、実装は次のような感じでした:
val redisson: org.redisson.api.RedissonClient = ???
val data: Iterable[String] = redisson
.getKeys
.getKeysByPattern("FOO:*") // <-- Iterable[String]
.asScala
.flatMap { key =>
Option(
// 取得したキーでRedisにGETコマンドを実行. 返り値はnullableなのでOption + flatMap
redisson
.getBucket[String](
key,
org.redisson.client.codec.StringCodec.INSTANCE
)
.get()
)
}
.take(30_000)
尚、ユースケースをもう少し詳しく説明すると、このワークロードは3分に1度だけ実行する、 FOO:*
にマッチするRedisレコードを最大で3万件取得して処理すると言う物になります。当該レコードは180秒強のTTLが設定されている為、あぶれたレコードは次回のスケジュール前に破棄されます。
しばらく運用していると、ワークロードの処理時間が伸びて終わらなくなってしまいました。(タイムアウトを設定している為強制終了となっていました)
コードを分析してみた所、flatMapの部分から先に進んでいない様でした。take(30_000)
を言わばSQLのLIMIT句の様に期待して記述していたのですが、冒頭の記したとおりScalaのコレクションはデフォルトで中間コレクションを都度作成する為、ヒットした10万件超に対して flatMap
が実行されていたのが原因だったと言う訳です。
Scalaの公式ドキュメントを読んでみると
Collections | ビューの項目を見ると、
全ての変換演算子を遅延実装している Stream を除いて、Scala のコレクションは全ての変換演算子をデフォルトで正格法で実装している。しかし、コレクションのビューにより、体系的に全てのコレクションを遅延したものに変え、また逆に戻すことができる。ビュー (view) は特殊なコレクションの一種で、何らかのコレクションに基づいているが全ての変換演算子を遅延実装している。
と書かれています。 正格法 (Strict) では、コレクションの処理 (map
, flatMap
, filter
等) を行う度に中間結果のコレクションが生成されると言う事を意味します。今回のケースで言うと、10万件の要素を持つコレクションに対してmapやflatMapを適用する場合は、常にこの10万件に対して処理が行われ、次の処理にデータが渡される事になります。
そこで、ドキュメントにも記載されている通り、次のように .view
を間に挟む事でコレクション処理を遅延させる事ができます:
List(1, 2, 3)
.view
.map { x =>
println(s"$x is doubled")
x * 2
}
.take(2)
.foreach(println)
1 is doubled
2
2 is doubled
4
結果を見ての通り、 take(2)
で取得した2件に対してmapの関数が適用されました。その後、先程のRedissonのプロジェクトにも同様にviewを適用してみます:
val data: Iterable[String] = redisson
.getKeys
.getKeysByPattern("FOO:*") // <-- Iterable[String]
.asScala
.view // <-- View[String]
.flatMap { key => // このflatMapの処理は後続の .take(30_000) が実行されるまで遅延される
Option(
redisson
.getBucket[String](
key,
org.redisson.client.codec.StringCodec.INSTANCE
)
.get()
)
}
.take(30_000)
コードにし示した通り、 Iterable.view
を実行するとコレクションのビューが返り、以降のコレクション処理はいくつチェーンしても、実行が実際に元のコレクションを戻すまで遅延されます。
この他にも fold
や reduce
, foreach
等を実行した場合も同様に遅延されていた処理が実行されます。また、 force
を実行する事でコレクションを単に戻すだけも可能です。
使い分け
これまたビューのドキュメントに指針が書かれています。
性能を比較すると遅延コレクションが常に正格コレクションに勝るとは限らないというものがある。コレクションのサイズが小さい場合、ビュー内でクロージャを作成し適用するためのオーバーヘッドが、中間結果のためのデータ構造を回避することによる利得を上回ってしまうことが多いのだ。
(中略)
ビューの使用は二つのシナリオに限るべきだ。一つは、ビューの適用を副作用を伴わない純粋関数型のコードに限ること。もしくは、明示的に全ての変更が行われる可変コレクションに適用することだ。避けたほうがいいのは、ビューと新たなコレクションを作成しつつ副作用を伴う演算を混合することだ。
今回のようにコレクションの数が巨大になり得る場合は使った方が良さそうです。
余談
余談ですが筆者はScalaでSparkを扱う事が多い(Scala力自体は低い)ので、この辺の遅延処理はデフォルトで行われる物と誤解していていました。今回使っているビューは、最終的なコレクション結果が必要になるまで中間処理は全て遅延されると言う挙動が、Sparkのコレクション処理に近くて分かりやすいと感じました。
コメントを残す