Posted by & filed under 未分類.

この記事は、2016 Java Advent calendarの7日目の記事である。

Java 8から利用できるStream APIについて。
宣言的に処理を記述出来るので、ハンマーを持つとなんとやらで、仕事においても、なにかにつけてStreamで書くという子供のようなことをしていた。

文字列のjoinするために Stream.of(array).collect(Collectors.joining(“, “)); などと書いてみたり
mapの戻り値でnew Object[] {}を使い疑似Tuple化して次の層に流したりといった具合である。

この記事は、このようにどこかしこをStreamというハンマーで叩きまくった結果、得た知見について紹介するものである。

BufferdReader.lines()からparallelで分散し、遅い出力先に吐く

最初にハマったのは、BufferedReaderのlinesを使い、並列処理をしながらMongoDBを更新するというバッチを書いたときだった。
mapで行データから更新メッセージを作成しforEachでMongoClientに食わせるという流れにしたところ、ガンガンヒープを食ってOOMエラーで落ちるのである。

当時は結局原因がわからなかったため、普通のループに書き直して終わらせたが、もしかすると、内部にあるキューが溢れるといった事象が起っているのではないかと想像した。

おおっ、これはネタとしては十分と思い、今日サンプルコードを実装したのが以下となる。
このコードは、mecab-neologd で配布されている辞書のxzファイルからダラダラ文字数を拾うというものである。

先に結論を言うと、そんなことは無いということだった。(すみません)

以下のconsole.log出力にあるInとOutの数の差がちょうどForkJoinPoolの数になっているので、map処理のところで正しくブロックされていることが伺える。このため、別の箇所でオブジェクトが正しく破棄されていなかったことが原因と思われる。


$ java -Xmx140M -jar target/sample-0.0.1-SNAPSHOT.jar mecab-ipadic-neologd-master/seed/mecab-user-dict-seed.20161205.csv.xz
01:04:44.734 [pool-1-thread-1] INFO sample.SampleMain – In: 405, Out: 389, Count: 29736, Total: 134217728, Free: 33947360
01:04:47.731 [pool-1-thread-1] INFO sample.SampleMain – In: 873, Out: 857, Count: 64788, Total: 134217728, Free: 33947360
01:04:50.728 [pool-1-thread-1] INFO sample.SampleMain – In: 1316, Out: 1300, Count: 96759, Total: 134217728, Free: 33947360
01:04:53.732 [pool-1-thread-1] INFO sample.SampleMain – In: 1788, Out: 1772, Count: 131953, Total: 134217728, Free: 33947360
01:04:56.731 [pool-1-thread-1] INFO sample.SampleMain – In: 2242, Out: 2226, Count: 165813, Total: 134217728, Free: 33947360
01:04:59.732 [pool-1-thread-1] INFO sample.SampleMain – In: 2698, Out: 2682, Count: 199371, Total: 134217728, Free: 33947360
01:05:02.728 [pool-1-thread-1] INFO sample.SampleMain – In: 3155, Out: 3139, Count: 234133, Total: 134217728, Free: 33947360
01:05:05.729 [pool-1-thread-1] INFO sample.SampleMain – In: 3633, Out: 3617, Count: 270345, Total: 134217728, Free: 33947360
01:05:08.728 [pool-1-thread-1] INFO sample.SampleMain – In: 4083, Out: 4067, Count: 303688, Total: 134217728, Free: 33947360
01:05:11.728 [pool-1-thread-1] INFO sample.SampleMain – In: 4541, Out: 4525, Count: 339285, Total: 134217728, Free: 33803576
01:05:14.728 [pool-1-thread-1] INFO sample.SampleMain – In: 5009, Out: 4993, Count: 374753, Total: 134217728, Free: 33718512
01:05:17.730 [pool-1-thread-1] INFO sample.SampleMain – In: 5491, Out: 5475, Count: 410735, Total: 134217728, Free: 33718512
01:05:20.730 [pool-1-thread-1] INFO sample.SampleMain – In: 5955, Out: 5939, Count: 446729, Total: 134217728, Free: 33718512
01:05:23.730 [pool-1-thread-1] INFO sample.SampleMain – In: 6414, Out: 6398, Count: 482890, Total: 134217728, Free: 33718512
01:05:26.728 [pool-1-thread-1] INFO sample.SampleMain – In: 6866, Out: 6850, Count: 518372, Total: 134217728, Free: 33718512

view raw

console.log

hosted with ❤ by GitHub


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sample</groupId>
<artifactId>sample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>sample.SampleMain</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.7</version>
</dependency>
</dependencies>
</project>

view raw

pom.xml

hosted with ❤ by GitHub


package sample;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tukaani.xz.XZInputStream;
public class SampleMain {
static Logger logger = LoggerFactory.getLogger(SampleMain.class);
static LongAdder in = new LongAdder();
static LongAdder out = new LongAdder();
static LongAdder count = new LongAdder();
static Random rnd = new Random();
public static void main(String[] args) throws Exception {
File list = new File(args[0]); // mecab-ipadic-neologd-master/seed/neologd-adjective-exp-dict-seed.20151126.csv.xzとか入れてね
ScheduledExecutorService svc = Executors.newScheduledThreadPool(1);
try {
svc.scheduleAtFixedRate(() -> {
Runtime runtime = Runtime.getRuntime();
logger.info("In: {}, Out: {}, Count: {}, Total: {}, Free: {}", in, out, count, runtime.totalMemory(), runtime.freeMemory());
}, 3, 3, TimeUnit.SECONDS);
Pattern DELIM = Pattern.compile("¥¥s*");
ForkJoinPool executor = new ForkJoinPool(16);
executor.submit(() -> {
try (InputStream xzin = new XZInputStream(Files.newInputStream(list.toPath()))) {
new BufferedReader(new InputStreamReader(xzin, "UTF-8")).lines()
.parallel().unordered()
.map(s -> {
in.increment();
return DELIM.matcher(s).replaceAll(" ");
})
.forEach(SampleMain::verySlowFunction);
;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).get();
} finally {
svc.shutdown();
}
}
public static void verySlowFunction(String value) {
count.add(value.length());
try {
TimeUnit.MILLISECONDS.sleep(rnd.nextInt(100));
} catch (InterruptedException e) {
// do nothing
}
out.increment();
}
}

view raw

SampleMain.java

hosted with ❤ by GitHub

デフォルトForkJoinPoolを回避する

StreamのparallelはデフォルトのForkJoinPoolを使うようになっている。デフォルトのForkJoinPoolはCPUの個数だけしか同時実行しないため、マルチスレッド環境でparallelのタスクを複数並列で実行すると、思った時間に終わらないことがある。
(そもそもの話、マルチスレッド下でparallelStreamを使うこと自体避けるべきである)

とにかくなんとかしたい場合、新しいForkJoinPoolを作成し、submitすることで、指定したPoolのThreadを使って実行するようになる。上記のコードでも使っている。

参考: http://www.slideshare.net/dgomezg/parallel-streams-en-java-8

おまけ 自己満 collector

章が3つないとバランスが悪いので、思いつきで自己満collectorを作った。下に実行結果を貼っているが、まったくメリットが無いので、ふつうにforEachを使う方が良い。


$ java -jar target/sample2-1.0-SNAPSHOT.jar NORMAL ../sample/mecab-ipadic-neologd-master/seed/neologd-adverb-dict-seed.20150623.csv.xz
Documents: 139792
Finished: 3.584 s
$ java -jar target/sample2-1.0-SNAPSHOT.jar ABNORMAL_COLLECT ../sample/mecab-ipadic-neologd-master/seed/neologd-adverb-dict-seed.20150623.csv.xz
Documents: 139792
Finished: 4.527 s
$ java -jar target/sample2-1.0-SNAPSHOT.jar NORMAL ../sample/mecab-ipadic-neologd-master/seed/neologd-adjective-exp-dict-seed.20151126.csv.xz
Documents: 1051146
Finished: 14.65 s
$ java -jar target/sample2-1.0-SNAPSHOT.jar ABNORMAL_COLLECT ../sample/mecab-ipadic-neologd-master/seed/neologd-adjective-exp-dict-seed.20151126.csv.xz
Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:510)
at jp.underthetree.SampleApp$Engine$2.create(SampleApp.java:63)
at jp.underthetree.SampleApp.main(SampleApp.java:24)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.lucene.store.RAMFile.newBuffer(RAMFile.java:78)
at org.apache.lucene.store.RAMFile.addBuffer(RAMFile.java:51)
at org.apache.lucene.store.RAMOutputStream.switchCurrentBuffer(RAMOutputStream.java:164)
at org.apache.lucene.store.RAMOutputStream.writeBytes(RAMOutputStream.java:150)
at org.apache.lucene.store.DataOutput.copyBytes(DataOutput.java:278)
at org.apache.lucene.store.Directory.copyFrom(Directory.java:179)
at org.apache.lucene.store.LockValidatingDirectoryWrapper.copyFrom(LockValidatingDirectoryWrapper.java:50)
at org.apache.lucene.index.IndexWriter.copySegmentAsIs(IndexWriter.java:2886)
at org.apache.lucene.index.IndexWriter.addIndexes(IndexWriter.java:2660)
at jp.underthetree.SampleApp$Engine$2.lambda$create$3(SampleApp.java:79)
at jp.underthetree.SampleApp$Engine$2$$Lambda$4/1128032093.accept(Unknown Source)
at java.util.stream.ReduceOps$4ReducingSink.combine(ReduceOps.java:225)
at java.util.stream.ReduceOps$4ReducingSink.combine(ReduceOps.java:211)
at java.util.stream.ReduceOps$ReduceTask.onCompletion(ReduceOps.java:754)
at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577)
at java.util.stream.AbstractTask.compute(AbstractTask.java:317)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

view raw

console.log

hosted with ❤ by GitHub


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jp.underthetree</groupId>
<artifactId>sample2</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>sample2</name>
<url>http://maven.apache.org</url>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>jp.underthetree.Sample2Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>6.3.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
</dependencies>
</project>

view raw

pom.xml

hosted with ❤ by GitHub


package jp.underthetree;
import java.io.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.tukaani.xz.XZInputStream;
import com.google.common.base.Stopwatch;
public class Sample2Main {
public static void main(String[] args) throws Exception {
Engine engine = Engine.valueOf(args[0]);
File input = new File(args[1]);
Stopwatch sw = Stopwatch.createStarted();
try (InputStream sin = new XZInputStream(new FileInputStream(input))) {
DirectoryReader reader = engine.create(sin);
System.out.println("Documents: " + reader.maxDoc());
}
System.out.println("Finished: " + sw);
}
static enum Engine {
NORMAL {
@Override
DirectoryReader create(InputStream in) {
Directory dir = new RAMDirectory();
try {
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig());
new BufferedReader(new InputStreamReader(in)).lines().forEach(s -> {
Document doc = new Document();
doc.add(new StringField("row", s, Store.YES));
try {
writer.addDocument(doc);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
writer.commit();
writer.close();
return DirectoryReader.open(dir);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
},
ABNORMAL_COLLECT {
@Override
DirectoryReader create(InputStream in) {
try {
IndexWriter w = new BufferedReader(new InputStreamReader(in)).lines().parallel()
.unordered().map(s -> {
Document doc = new Document();
doc.add(new StringField("row", s, Store.YES));
return doc;
}).collect(() -> {
try {
return new IndexWriter(new RAMDirectory(), new IndexWriterConfig());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, (wri, rec) -> {
try {
wri.addDocument(rec);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, (wri1, wri2) -> {
try {
wri2.commit();
wri2.close();
wri1.addIndexes(wri2.getDirectory());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
w.commit();
w.close();
return DirectoryReader.open(w.getDirectory());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
};
abstract DirectoryReader create(InputStream in);
}
}

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です