Posted by & filed under 未分類.

この記事は、2016 Java Advent calendarの7日目の記事である。

Java 8から利用できるStream APIについて。
宣言的に処理を記述出来るので、ハンマーを持つとなんとやらで、仕事においても、なにかにつけてStreamで書くという子供のようなことをしていた。

文字列のjoinするために Stream.of(array).collect(Collectors.joining(“, “)); などと書いてみたり
mapの戻り値でnew Object[] {}を使い疑似Tuple化して次の層に流したりといった具合である。

この記事は、このようにどこかしこをStreamというハンマーで叩きまくった結果、得た知見について紹介するものである。

BufferdReader.lines()からparallelで分散し、遅い出力先に吐く

最初にハマったのは、BufferedReaderのlinesを使い、並列処理をしながらMongoDBを更新するというバッチを書いたときだった。
mapで行データから更新メッセージを作成しforEachでMongoClientに食わせるという流れにしたところ、ガンガンヒープを食ってOOMエラーで落ちるのである。

当時は結局原因がわからなかったため、普通のループに書き直して終わらせたが、もしかすると、内部にあるキューが溢れるといった事象が起っているのではないかと想像した。

おおっ、これはネタとしては十分と思い、今日サンプルコードを実装したのが以下となる。
このコードは、mecab-neologd で配布されている辞書のxzファイルからダラダラ文字数を拾うというものである。

先に結論を言うと、そんなことは無いということだった。(すみません)

以下のconsole.log出力にあるInとOutの数の差がちょうどForkJoinPoolの数になっているので、map処理のところで正しくブロックされていることが伺える。このため、別の箇所でオブジェクトが正しく破棄されていなかったことが原因と思われる。

デフォルトForkJoinPoolを回避する

StreamのparallelはデフォルトのForkJoinPoolを使うようになっている。デフォルトのForkJoinPoolはCPUの個数だけしか同時実行しないため、マルチスレッド環境でparallelのタスクを複数並列で実行すると、思った時間に終わらないことがある。
(そもそもの話、マルチスレッド下でparallelStreamを使うこと自体避けるべきである)

とにかくなんとかしたい場合、新しいForkJoinPoolを作成し、submitすることで、指定したPoolのThreadを使って実行するようになる。上記のコードでも使っている。

参考: http://www.slideshare.net/dgomezg/parallel-streams-en-java-8

おまけ 自己満 collector

章が3つないとバランスが悪いので、思いつきで自己満collectorを作った。下に実行結果を貼っているが、まったくメリットが無いので、ふつうにforEachを使う方が良い。

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です