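This post is the day-7 entry of the 2016 Java Advent Calendar.
It is about the Stream API, available since Java 8.
Because it lets you describe processing declaratively, I fell into the "when you have a hammer, everything looks like a nail" trap and, rather childishly, wrote just about everything at work with Streams.
To join strings I would write Stream.of(array).collect(Collectors.joining(", ")); and the like,
or return new Object[] {} from map as a pseudo-tuple and pass it on to the next stage.
This post introduces what I learned from hammering everything in sight with Streams.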
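For the two habits just mentioned, the plainer alternatives look roughly like this. It is only a sketch: the class and method names are hypothetical, and SimpleImmutableEntry merely stands in as a typed pair where the pseudo-tuple used new Object[] {}.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PlainAlternatives {
    // Joining an array needs no Stream at all.
    static String join(String[] array) {
        return String.join(", ", array);
    }

    // A typed pair instead of new Object[] {...}: each word travels with its length.
    static List<SimpleImmutableEntry<String, Integer>> withLengths(String[] array) {
        return Arrays.stream(array)
                .map(s -> new SimpleImmutableEntry<>(s, s.length()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String[] words = {"a", "bb", "ccc"};
        System.out.println(join(words));
        withLengths(words).forEach(e -> System.out.println(e.getKey() + "=" + e.getValue()));
    }
}
```

The typed pair keeps the compiler checking the next stage, which the Object[] version gives up.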
Fanning out from BufferedReader.lines() with parallel and writing to a slow sink
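The first trap I fell into was a batch job that read lines with BufferedReader.lines() and updated MongoDB while processing them in parallel.
The flow was to build an update message from each line in map and feed it to MongoClient in forEach. Run that way, it ate through the heap and died with an OutOfMemoryError.
I never found the cause at the time and ended up rewriting it as a plain loop, but I suspected that something like an internal queue was overflowing.
Thinking this would make a fine topic for a post, I wrote the sample code below today.
It reads one of the xz dictionary files distributed with mecab-neologd and leisurely tallies character counts.
To state the conclusion first: no such thing happens. (Sorry.)
In the log output below, the difference between In and Out is exactly the parallelism of the ForkJoinPool (16), which suggests that the map stage is properly held back and never runs ahead of forEach. The original failure was therefore probably caused by objects not being released properly somewhere else.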
$ java -Xmx140M -jar target/sample-0.0.1-SNAPSHOT.jar mecab-ipadic-neologd-master/seed/mecab-user-dict-seed.20161205.csv.xz
01:04:44.734 [pool-1-thread-1] INFO sample.SampleMain - In: 405, Out: 389, Count: 29736, Total: 134217728, Free: 33947360
01:04:47.731 [pool-1-thread-1] INFO sample.SampleMain - In: 873, Out: 857, Count: 64788, Total: 134217728, Free: 33947360
01:04:50.728 [pool-1-thread-1] INFO sample.SampleMain - In: 1316, Out: 1300, Count: 96759, Total: 134217728, Free: 33947360
01:04:53.732 [pool-1-thread-1] INFO sample.SampleMain - In: 1788, Out: 1772, Count: 131953, Total: 134217728, Free: 33947360
01:04:56.731 [pool-1-thread-1] INFO sample.SampleMain - In: 2242, Out: 2226, Count: 165813, Total: 134217728, Free: 33947360
01:04:59.732 [pool-1-thread-1] INFO sample.SampleMain - In: 2698, Out: 2682, Count: 199371, Total: 134217728, Free: 33947360
01:05:02.728 [pool-1-thread-1] INFO sample.SampleMain - In: 3155, Out: 3139, Count: 234133, Total: 134217728, Free: 33947360
01:05:05.729 [pool-1-thread-1] INFO sample.SampleMain - In: 3633, Out: 3617, Count: 270345, Total: 134217728, Free: 33947360
01:05:08.728 [pool-1-thread-1] INFO sample.SampleMain - In: 4083, Out: 4067, Count: 303688, Total: 134217728, Free: 33947360
01:05:11.728 [pool-1-thread-1] INFO sample.SampleMain - In: 4541, Out: 4525, Count: 339285, Total: 134217728, Free: 33803576
01:05:14.728 [pool-1-thread-1] INFO sample.SampleMain - In: 5009, Out: 4993, Count: 374753, Total: 134217728, Free: 33718512
01:05:17.730 [pool-1-thread-1] INFO sample.SampleMain - In: 5491, Out: 5475, Count: 410735, Total: 134217728, Free: 33718512
01:05:20.730 [pool-1-thread-1] INFO sample.SampleMain - In: 5955, Out: 5939, Count: 446729, Total: 134217728, Free: 33718512
01:05:23.730 [pool-1-thread-1] INFO sample.SampleMain - In: 6414, Out: 6398, Count: 482890, Total: 134217728, Free: 33718512
01:05:26.728 [pool-1-thread-1] INFO sample.SampleMain - In: 6866, Out: 6850, Count: 518372, Total: 134217728, Free: 33718512
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>sample</groupId>
  <artifactId>sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>sample.SampleMain</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.tukaani</groupId>
      <artifactId>xz</artifactId>
      <version>1.6</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.7</version>
    </dependency>
  </dependencies>
</project>
package sample;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.XZInputStream;

public class SampleMain {
    static Logger logger = LoggerFactory.getLogger(SampleMain.class);
    static LongAdder in = new LongAdder();    // lines that have entered map
    static LongAdder out = new LongAdder();   // lines that have left verySlowFunction
    static LongAdder count = new LongAdder(); // total characters seen
    static Random rnd = new Random();

    public static void main(String[] args) throws Exception {
        // Pass e.g. mecab-ipadic-neologd-master/seed/neologd-adjective-exp-dict-seed.20151126.csv.xz
        File list = new File(args[0]);
        ScheduledExecutorService svc = Executors.newScheduledThreadPool(1);
        try {
            // Report the counters and heap usage every 3 seconds.
            svc.scheduleAtFixedRate(() -> {
                Runtime runtime = Runtime.getRuntime();
                logger.info("In: {}, Out: {}, Count: {}, Total: {}, Free: {}", in, out, count, runtime.totalMemory(), runtime.freeMemory());
            }, 3, 3, TimeUnit.SECONDS);
            // Note: \s* also matches the empty string, so replaceAll puts a space
            // between every character as well. That is harmless for this demo.
            Pattern DELIM = Pattern.compile("\\s*");
            // Submit to a dedicated pool so the parallel stream runs on its 16 threads.
            ForkJoinPool executor = new ForkJoinPool(16);
            executor.submit(() -> {
                try (InputStream xzin = new XZInputStream(Files.newInputStream(list.toPath()))) {
                    new BufferedReader(new InputStreamReader(xzin, "UTF-8")).lines()
                            .parallel().unordered()
                            .map(s -> {
                                in.increment();
                                return DELIM.matcher(s).replaceAll(" ");
                            })
                            .forEach(SampleMain::verySlowFunction);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }).get();
        } finally {
            svc.shutdown();
        }
    }

    // Stands in for a slow output destination: sleeps up to 100 ms per line.
    public static void verySlowFunction(String value) {
        count.add(value.length());
        try {
            TimeUnit.MILLISECONDS.sleep(rnd.nextInt(100));
        } catch (InterruptedException e) {
            // Keep the interrupt flag set for whoever runs this task.
            Thread.currentThread().interrupt();
        }
        out.increment();
    }
}
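The gap between In and Out can also be watched without an xz file. The following is a minimal sketch, not the batch job from this post: it assumes an in-memory BufferedReader over a StringReader, and the class name InFlightDemo, the 2 ms sleep, and the counters are all made up for illustration. It records the largest number of lines that were between map and the end of forEach at any moment, which would be expected to stay around the pool's parallelism if the observation above holds.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class InFlightDemo {
    /** Returns {lines processed, largest number of lines in flight at once}. */
    static int[] run(int lines, int parallelism) throws Exception {
        String text = IntStream.range(0, lines)
                .mapToObj(i -> "line" + i)
                .collect(Collectors.joining("\n"));
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger processed = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            pool.submit(() -> reader.lines().parallel().unordered()
                    .map(s -> {
                        // A line has entered the pipeline.
                        int now = inFlight.incrementAndGet();
                        maxInFlight.accumulateAndGet(now, Math::max);
                        return s;
                    })
                    .forEach(s -> {
                        sleepQuietly(2); // hypothetical slow sink
                        processed.incrementAndGet();
                        inFlight.decrementAndGet(); // the line has left the pipeline
                    })).get();
        } finally {
            pool.shutdown();
        }
        return new int[] {processed.get(), maxInFlight.get()};
    }

    static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] result = run(400, 4);
        System.out.println("processed=" + result[0] + ", max in flight=" + result[1]);
    }
}
```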
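Avoiding the default ForkJoinPool
Stream's parallel uses the default (common) ForkJoinPool. That pool only runs about as many tasks at once as there are CPUs (its default parallelism is the number of available processors minus one, with the calling thread also taking part), so when several parallel tasks run concurrently in a multithreaded environment, they may not finish in the time you expect.
(To begin with, using parallelStream in a multithreaded environment is something to avoid altogether.)
If you have to make it work anyway, create a new ForkJoinPool and submit the work to it; the stream then runs on the threads of the pool you specified. The code above does this too.
Reference: http://www.slideshare.net/dgomezg/parallel-streams-en-java-8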
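One way to check where a parallel stream actually runs is to record the thread names. This is a small sketch with hypothetical names (DedicatedPoolDemo, workerNames); as far as I can tell, the fact that a stream started inside a ForkJoinPool task stays in that pool is observed behavior and not something the Stream API documentation spells out, so treat it as an assumption.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class DedicatedPoolDemo {
    /** Runs a parallel stream inside the given pool and returns the names of the threads that did the work. */
    static Set<String> workerNames(ForkJoinPool pool) throws Exception {
        Set<String> names = ConcurrentHashMap.newKeySet();
        // The terminal operation is started from a task of 'pool', not from the caller.
        pool.submit(() -> IntStream.range(0, 1_000).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()))).get();
        return names;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            // Common-pool workers have "commonPool" in their name; none should show up here.
            System.out.println(workerNames(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```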
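Bonus: a self-indulgent collector
A post feels unbalanced without three sections, so on a whim I wrote a self-indulgent collector. The results are pasted below; it has no benefit whatsoever, so you are better off using a plain forEach.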
$ java -jar target/sample2-1.0-SNAPSHOT.jar NORMAL ../sample/mecab-ipadic-neologd-master/seed/neologd-adverb-dict-seed.20150623.csv.xz
Documents: 139792
Finished: 3.584 s
$ java -jar target/sample2-1.0-SNAPSHOT.jar ABNORMAL_COLLECT ../sample/mecab-ipadic-neologd-master/seed/neologd-adverb-dict-seed.20150623.csv.xz
Documents: 139792
Finished: 4.527 s
$ java -jar target/sample2-1.0-SNAPSHOT.jar NORMAL ../sample/mecab-ipadic-neologd-master/seed/neologd-adjective-exp-dict-seed.20151126.csv.xz
Documents: 1051146
Finished: 14.65 s
$ java -jar target/sample2-1.0-SNAPSHOT.jar ABNORMAL_COLLECT ../sample/mecab-ipadic-neologd-master/seed/neologd-adjective-exp-dict-seed.20151126.csv.xz
Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:510)
at jp.underthetree.SampleApp$Engine$2.create(SampleApp.java:63)
at jp.underthetree.SampleApp.main(SampleApp.java:24)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.lucene.store.RAMFile.newBuffer(RAMFile.java:78)
at org.apache.lucene.store.RAMFile.addBuffer(RAMFile.java:51)
at org.apache.lucene.store.RAMOutputStream.switchCurrentBuffer(RAMOutputStream.java:164)
at org.apache.lucene.store.RAMOutputStream.writeBytes(RAMOutputStream.java:150)
at org.apache.lucene.store.DataOutput.copyBytes(DataOutput.java:278)
at org.apache.lucene.store.Directory.copyFrom(Directory.java:179)
at org.apache.lucene.store.LockValidatingDirectoryWrapper.copyFrom(LockValidatingDirectoryWrapper.java:50)
at org.apache.lucene.index.IndexWriter.copySegmentAsIs(IndexWriter.java:2886)
at org.apache.lucene.index.IndexWriter.addIndexes(IndexWriter.java:2660)
at jp.underthetree.SampleApp$Engine$2.lambda$create$3(SampleApp.java:79)
at jp.underthetree.SampleApp$Engine$2$$Lambda$4/1128032093.accept(Unknown Source)
at java.util.stream.ReduceOps$4ReducingSink.combine(ReduceOps.java:225)
at java.util.stream.ReduceOps$4ReducingSink.combine(ReduceOps.java:211)
at java.util.stream.ReduceOps$ReduceTask.onCompletion(ReduceOps.java:754)
at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577)
at java.util.stream.AbstractTask.compute(AbstractTask.java:317)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>jp.underthetree</groupId>
  <artifactId>sample2</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>sample2</name>
  <url>http://maven.apache.org</url>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>jp.underthetree.Sample2Main</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.tukaani</groupId>
      <artifactId>xz</artifactId>
      <version>1.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.lucene</groupId>
      <artifactId>lucene-core</artifactId>
      <version>6.3.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>20.0</version>
    </dependency>
  </dependencies>
</project>
package jp.underthetree;

import java.io.*;
import java.nio.charset.StandardCharsets;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.tukaani.xz.XZInputStream;

import com.google.common.base.Stopwatch;

public class Sample2Main {
    public static void main(String[] args) throws Exception {
        Engine engine = Engine.valueOf(args[0]); // NORMAL or ABNORMAL_COLLECT
        File input = new File(args[1]);
        Stopwatch sw = Stopwatch.createStarted();
        try (InputStream sin = new XZInputStream(new FileInputStream(input))) {
            DirectoryReader reader = engine.create(sin);
            System.out.println("Documents: " + reader.maxDoc());
        }
        System.out.println("Finished: " + sw);
    }

    enum Engine {
        // Sequential: one IndexWriter, one document per line, added in forEach.
        NORMAL {
            @Override
            DirectoryReader create(InputStream in) {
                Directory dir = new RAMDirectory();
                try {
                    IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig());
                    new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)).lines().forEach(s -> {
                        Document doc = new Document();
                        doc.add(new StringField("row", s, Store.YES));
                        try {
                            writer.addDocument(doc);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
                    writer.commit();
                    writer.close();
                    return DirectoryReader.open(dir);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        },
        // Parallel: collect() with an IndexWriter as the mutable result container.
        ABNORMAL_COLLECT {
            @Override
            DirectoryReader create(InputStream in) {
                try {
                    IndexWriter w = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)).lines().parallel()
                            .unordered().map(s -> {
                                Document doc = new Document();
                                doc.add(new StringField("row", s, Store.YES));
                                return doc;
                            }).collect(() -> {
                                // Supplier: a fresh in-memory index per parallel chunk.
                                try {
                                    return new IndexWriter(new RAMDirectory(), new IndexWriterConfig());
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            }, (wri, rec) -> {
                                // Accumulator: add one document to the chunk's index.
                                try {
                                    wri.addDocument(rec);
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            }, (wri1, wri2) -> {
                                // Combiner: finish the second index and copy it into the first.
                                try {
                                    wri2.commit();
                                    wri2.close();
                                    wri1.addIndexes(wri2.getDirectory());
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            });
                    w.commit();
                    w.close();
                    return DirectoryReader.open(w.getDirectory());
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };

        abstract DirectoryReader create(InputStream in);
    }
}
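The three functions handed to collect in ABNORMAL_COLLECT play the roles of supplier, accumulator, and combiner. As a rough illustration of those roles without Lucene, here is a minimal sketch that swaps the IndexWriter for an ArrayList and counts how many containers the supplier creates; the class name CollectRolesDemo and the counter are hypothetical. In the collector above each of those containers is a whole RAMDirectory, and the stack trace shows the heap running out while the combiner copies one index into another via addIndexes.

```java
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class CollectRolesDemo {
    /** Collects 0..n-1 in parallel; returns {elements collected, containers the supplier created}. */
    static int[] run(int n) {
        AtomicInteger containers = new AtomicInteger();
        ArrayList<Integer> result = IntStream.range(0, n).boxed().parallel().collect(
                () -> {
                    // Supplier: called once per chunk of the parallel split.
                    containers.incrementAndGet();
                    return new ArrayList<Integer>();
                },
                ArrayList::add,     // Accumulator: one element into the chunk's container.
                ArrayList::addAll); // Combiner: fold the second container into the first.
        return new int[] {result.size(), containers.get()};
    }

    public static void main(String[] args) {
        int[] r = run(100_000);
        System.out.println("elements=" + r[0] + ", containers created=" + r[1]);
    }
}
```

How many containers get created depends on how the stream is split, so the second number will vary from machine to machine.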