(注:このブログはもう更新していません)この日記は私的なものであり所属会社の見解とは無関係です。 GitHub: takahashikzn

[クラウド帳票エンジンDocurain]

zstd-jniはストリームではなくbyte[]で使え

まだGZIPで消耗してるの!?

というアオリでスタートする今日の記事です。

はい。もうみんな使ってますねZstandard。

facebook.github.io

gzipにほぼ全方面で勝っており、使わない理由がありません。

もちろんJavaでも使えます。JNIですが。

github.com

このzstd-jniですが、圧縮に関して

  • Java標準のストリーム形式(OutputStream)→少しずつ書き込む*1
  • byte[]を渡してbyte[]を返す(com.github.luben.zstd.Zstd#compress(byte[])あたり) → 一気に書き込む

の2つのインタフェースがあります。それぞれ用途は異なりますが、明らかに後者が速い。使い方次第ですが2倍以上。 ネイティブメソッド呼び出しのオーバーヘッドは大きい*2というわけです。

でも、byte[]のインタフェースだとギガバイトクラスのデータを圧縮する時、一旦メモリに全部置くんかい!って話になります。確かに。

というわけで簡易な便利クラスを作りました。何なのかはコードを読んでください。解説はしません。

import java.io.IOException;
import java.io.OutputStream;

import org.apache.commons.io.output.ByteArrayOutputStream;


public class LimitedBufferedOutputStream<T extends OutputStream>
    extends OutputStream {

    private final int limit;

    private final OutputStream sink;

    @FunctionalInterface
    public interface OpenStream<T extends OutputStream> {

        T open(OutputStream out) throws IOException;
    }

    private final OpenStream<T> openStream;

    @FunctionalInterface
    public interface FinishStream<T extends OutputStream> {

        void finish(ByteArrayOutputStream buf, OutputStream out) throws IOException;

        default void finish(final T out) throws IOException {}
    }

    private final FinishStream<T> finishStream;

    private ByteArrayOutputStream buf = new ByteArrayOutputStream();

    private T out;

    protected LimitedBufferedOutputStream(final OutputStream sink, final OpenStream<T> openStream,
        final FinishStream<T> finishStream, final int limit) {
        this.sink = sink;
        this.openStream = openStream;
        this.finishStream = finishStream;
        this.limit = limit;
    }

    public static <T extends OutputStream> LimitedBufferedOutputStream<T> of(final OutputStream sink,
        final OpenStream<T> openStream, final FinishStream<T> finishStream) {
        return of(sink, openStream, finishStream, 1024 * 1024 * 128);
    }

    public static <T extends OutputStream> LimitedBufferedOutputStream<T> of(final OutputStream sink,
        final OpenStream<T> openStream, final FinishStream<T> finishStream, final int limit) {
        return new LimitedBufferedOutputStream<>(sink, openStream, finishStream, limit);
    }

    @Override
    public void write(final int b) throws IOException {
        this.write(o -> o.write(b));
    }

    @Override
    public void write(final byte[] b) throws IOException {
        this.onDemandFinishBuffer(b.length);
        this.write(o -> o.write(b));
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        this.onDemandFinishBuffer(len);
        this.write(o -> o.write(b, off, len));
    }

    @FunctionalInterface
    private interface WriteOperation {

        void accept(OutputStream out) throws IOException;
    }

    private void write(final WriteOperation opr) throws IOException {
        if (this.out != null) {
            opr.accept(this.out);
            return;
        }

        opr.accept(this.buf);
        this.onDemandFinishBuffer(0);
    }

    private void onDemandFinishBuffer(final int wrote) throws IOException {
        if ((this.buf == null) || (this.buf.size() + wrote) <= this.limit) { return; }

        this.out = this.openStream.open(this.sink);
        this.buf.writeTo(this.out);
        this.buf = null;
    }

    @Override
    public void flush() throws IOException {
        if (this.out != null) {
            this.out.flush();
        }
    }

    private boolean finished;

    public void finish() throws IOException {
        if (this.finished) {
            return;
        } else {
            this.finished = true;
        }

        if (this.out != null) {
            this.finishStream.finish(this.out);
        } else {
            this.finishStream.finish(this.buf, this.sink);
            this.buf = null;
        }
    }

    @Override
    public void close() throws IOException {
        this.finish();

        if (this.out != null) {
            this.out.close();
        } else {
            this.sink.close();
        }
    }
}

使い方ですが、こんな感じです。

LimitedBufferedOutputStream.of(
    out,
    ZstdOutputStream::new,
    (b, o) -> o.write(ZSTD.compress(b.toByteArray()))
);

*1:zstd-jniの実装により一回に書き込める最大のサイズは約128Kb

*2:こんなブログをわざわざ見る方であれば言うまでもありませんが、ストリームのインタフェースだと、極端な話write毎にネイティブメソッド呼び出しになるため