2012/12/16

大都会岡山 Advent Calendar 16 日目 - 岡山といえばパフェだよね

このエントリーをはてなブックマークに追加

このエントリは 大都会岡山 Advent Calendar 2012 の第 16 日目のエントリーです。

昨日は @razon さんの ウェルカム・トゥ・オクトプレス で、明日は Masakatsu Kodani さんです。

さて、東京に住む櫻庭にとって岡山というと、やはり後楽園とか倉敷の町並みとかを思い浮かべるわけです。ところが、大都会岡山はフルーツパフェの街でもあるのです!!!!

ということで、櫻庭が岡山で食べたパフェを紹介しようと思います。といっても、岡山を訪れたことは 2 回しかないので、4 つのパフェしか食べてないのですが ^ ^;;

カフェレストランオリビエ

はじめは JR 西日本が運営しているホテルグランヴィアのカフェのオリビエです。

岡山駅に直結なので、便利ですね。

ここで食べたのがイチゴとライチのパフェのリュネル。なかなか豪華です。

_DSC8579

_DSC8582

_DSC8606

_DSC8539

_DSC8570

関連ランキング:カフェ | 岡山駅前駅岡山駅西川緑道公園駅

Fruits-J

天満屋の地下にあるジューススタンドの Fruits-J でもパフェが食べられます。

パフェはちょっと安っぽいのですが、ジューススタンドなのでしかたないですね。

_DSC8311

_DSC8300

_DSC8317

関連ランキング:フルーツパーラー | 県庁通り駅郵便局前駅田町駅

Signe

岡山駅の地下街の岡山一番街にあるカフェの Signe。店の作りがちょっと不思議な感じですが、内装はモダンです。

ここはスイーツの種類はそれほど多くはないのですが、パフェは 2 種類。苺とガトーショコラのパフェを食べました。

_DSC9017

_DSC9085

_DSC9080

_DSC9021

_DSC9059

関連ランキング:カフェ | 岡山駅前駅岡山駅西川緑道公園駅

三宅商店

三宅商店は倉敷のカフェで、マスキングテープでも有名なところ。古民家をそのまま使った内装はなかなか風情があります。

今時、エアコンがないというのもすごいのですが、でもこの建物にはエアコンは似合わないので、いいかも。

櫻庭が食べたのは春の苺パフェです。

_DSC8898

_DSC8976

_DSC8867

_DSC8968

_DSC8878

_DSC8928

_DSC8945

関連ランキング:カフェ | 倉敷市駅倉敷駅

まだまだいったことがない店がいっぱいあるので、次にいった時はまた新しいパフェを開拓したいと思っています!!

2012/12/01

Java Advent Calendar 1 日目 - Project Lambda の遅延評価

このエントリーをはてなブックマークに追加

このエントリは Java Advent Calendar 2012 の第 1 日目です。

去年の Java Advent Calendar も第 1 日目を書いて、しかも Project Lambda についてでした。

今年も Project Lambda について書くわけですが、去年とはちょっと観点を変えようと思います。

今年の JavaOne で Java SE の一番のトピックといえば、やっぱり Project Lambda だったと思います。実際、Keynote でも Brian Goetz が登壇して Lambda の説明をしていますし、セッションも Lambda だけで 5 つもあったほどです。

実際に Brian Goetz の Lambda のセッションに出て思ったのは、Lambda の言語仕様はほぼ固まったということです。そして、開発の中心は Lambda の実装方法や、Lambda を使った API の方に移ってきています。

今年は櫻庭も JJUG で Project Lambda のハンズオンを行なったのですが、Project Lambda を使いこなすのは、やはり Lambda に対応した API を使いこなすことにあると感じました。

でも、その API についてはほとんど触れません。いちおう、ハンズオンから変化したことをあげておきます。

  • インタフェースのデフォルトメソッドの記述が変更
  • Stream インタフェースの導入
  • パラレル処理の実装

現在公開されている build 64 ではデフォルトメソッドの記述はまだ変更されていないのですが、Lambda の Mercurial ではすでに記述法が変更されています。

具体的には default の書く位置が変更になりました。

    public interface Foo {
        void foo() default { .... }
    }

         | 変更
         V

    public interface Foo {
        default void foo() { .... }
    }

もう一方の Stream インタフェースが Lambda の API の中心になるインタフェースです。今までは filter メソッドや map メソッドなどは Iterable インタフェースで定義されていたのですが、これらはみな Stream インタフェースで定義されるようになりました。

Iterable インタフェースに残っているのは forEach メソッドぐらいです。

Stream オブジェクトを取得するには Streamable インタフェースの stream メソッドを使用します。てっきり、Iterable インタフェースに定義されるのかと思ったら、新たにインタフェースが導入されていたので、ちょっと意外でした。

Streamable インタフェースのサブインタフェースとして Collection インタフェースや Queue インタフェースなどがあります。

ハンズオンの時は、パラレル処理は Array に対してしか実装されていなかったのですが、本格的に実装が行なわれています。


さて、今日の本題は Stream インタフェースです。

といっても Stream インタフェースの使い方ではありません。ではなくて、Stream インタフェースの実装についてです。

というのも、Stream インタフェースで提供されているメソッドは遅延評価されるからなのです。これは Scala の Stream と同じですね。

Java で遅延評価というのはあまり例がないと思うので、どうやって実装されているか調べて見たよというのが、この blog エントリなわけです。

だから、Lambda 式についてはまったく解説しませんww Lambda 式については、ハンズオンの資料を見てください。

遅延評価

注) コメント id:bleis-tift さんに指摘いただきましたが、ここでいっている遅延評価は正しくは遅延計算と呼ぶべきものです。この後の遅延評価と書かれている部分は遅延計算と読み直してください。とりえあず、このエントリではそのままにしておきますが、今後は安易に遅延評価という言葉を使用しないようにしようと思います。

例題として、整数のリストから偶数を取り出して標準出力に出力することを考えてみましょう。

Lambda 式と Stream インタフェースを使うと次のように書けます。

        List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5);

        // Stream オブジェクトの取得
        Stream<Integer> stream = nums.stream(); 

        // 偶数をフィルタリングしたStreamオブジェクトを生成
        Stream<Integer> stream2 = stream.filter(s -> s%2 == 0);

        // 標準出力に出力
        stream2.forEach(s -> System.out.println(s));

これは内部イテレータで書いているわけですが、あえて外部イテレータで書き直してみました。

        List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5);

        List<Integer> nums2 = new ArrayList<>();
        for (Integer s: nums) {
            if (s%2 == 0) {
                nums2.add(s);
            }
        }

        for (Integer s: nums2) {
            System.out.println(s);
        }

するとループが 2 回になってしまうわけです。こんなことをするぐらいだったら、下のように 1 回のループで書けばいいわけです。

        List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5);

        for (Integer s: nums) {
            if (s%2 == 0) {
                System.out.println(s);
            }
        }

でも、内部イテレータで書くのは、処理が独立に記述できることや、処理順序をイテレータに任せられるとか、パラレル処理にしやすいとか、いろいろと利点があるわけです。

とはいうものの、filter メソッドや map メソッドのようなバルクオペレーションを連ねてしまうと、どんどんループが増えていってしまいます。ループが増えるということはパフォーマンスが落ちるということです。

これじゃ、せっかくバルクオペレーションで書けたとしても、魅力半減です。

そこで登場するのが、遅延評価です。

遅延評価は変数を実際に使用する時に、変数を評価することですが、いまいち意味が分かりません。

先ほどのコードで考えてみましょう。遅延評価を行うことで filter メソッドでは、実際にはループを行っていません。

ループを行うのは forEach メソッドだけになります。

        List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5);

        // Stream オブジェクトの取得
        Stream<Integer> stream = nums.stream(); 

        // ループしない。遅延評価の対象
        Stream<Integer> stream2 = stream.filter(s -> s%2 == 0);

        // ループする。
        // s を出力するときにフィルタリングされるかどうかを評価する
        // s がフィルタリングされるのであれば、出力しない
        stream2.forEach(s -> System.out.println(s));

変数を使うときというのは、この場合 forEach メソッドで変数 s を出力するときになります。そこで forEach のループの時に、遡ってフィルタリングの処理も行われるのです。

この結果、遅延評価を行うことで、ループが 1 回で処理が完了することになります。

また、このことから、Stream インタフェースで定義しているすべてのメソッドが遅延評価を行うわけではないことも分かります。

では、どのメソッドが遅延評価され、どのメソッドが即時評価されるのでしょうか。

残念ながら、Stream インタフェースの Javadoc を見ただけでは、どれが遅延評価されるのか分かりません。なので、ソースを読んでチェックしてみました。遅延評価をされる主なメソッドを以下に示します。

  • concat
  • cumulate
  • filter
  • flatMap
  • limit
  • map
  • skip
  • sorted
  • sequential
  • tee
  • uniqueElement
  • unordered

逆に遅延評価されずに即時評価される主なメソッドも列挙しておきます。

  • allMatch
  • anyMatch
  • findAny
  • findFirst
  • fold
  • forEach
  • groupBy
  • noneMatch
  • reduce
  • reduceBy
  • toArray

基本的には戻り値が Stream インタフェースのメソッドが遅延評価されると覚えておけば大丈夫です。

遅延評価の実装

では遅延評価をどのように実装するのでしょうか。

とりあえず、Project Lambda の API のクラス構成から見ていきましょう。なお、Project Lambda で追加される API は、バルクオペレーション API (Bulk Operation API)と呼ばれているので、ここでもそう呼ぶことにします。

まずは、filter メソッドや map メソッドの引数として Lambda 式で記述されるインタフェースです。これらのインタフェースは、java.uitl.funtions パッケージに定義されています。

主なものをあげると

  • Block
  • Combiner
  • BinaryOperator
  • Mapper
  • Predicate

などがあります。たとえば、Block インタフェースは forEach メソッドの引数として使われます。

これに対し、これらのインタフェースで記述した処理を実行するクラス群が java.util.streams パッケージで定義されています。

Stream インタフェースも java.util.streams パッケージで提供されています。

Stream インタフェースを実装しているコンクリートが ValuePipeline クラスです。ValuePipeline クラスのスーパークラスは AbstractPipeline クラスなのですが、こちらは Stream インタフェースを実装していません。

また、Stream オブジェクトを生成するなどのユーティリティメソッドを定義しているのが Streams クラスです。

Streamable インタフェースのサブインタフェースの Collection インタフェースや Queue インタフェースが Streams クラスを使用して Stream オブジェクトを取得しています。

他のクラスは追々説明していきましょう。

 

さて、コードを追っていくとしても、やみくもに追っていってもしかたありません。そこで、次のコードの処理を見ていこうと思います。

       List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6);

        Stream<Integer> stream1 = nums.stream();
        Stream<Integer> stream2 = stream1.filter(s -> s%2 == 0);
        Stream<Double> stream3 = stream2.map(s -> s/2.0);
        stream3.forEach(s -> System.out.println(s));

filter メソッドと map メソッドという 2 つの遅延評価が行われるメソッドがあり、最後に即時評価が行われる forEach メソッドがあるというコードです。

Streamable#stream メソッド

まずは stream メソッドです。stream メソッドは Streamable インタフェースで定義されています。

Streamable インタフェースのサブインタフェースとして、Collection インタフェースなどがあります。Collection インタフェースのサブインタフェースにはここでも使用している List インタフェースや Set インタフェース、Queue インタフェースなどがあるので、これらのクラスはバルクオペレーションが可能です。

stream メソッドの実装はコンクリートクラスではなく、デフォルトメソッドを使用して Collection インタフェースに記述されています。

    Stream<E> stream() default {
        return Streams.stream(this, StreamOpFlags.IS_SIZED);
    }

前述したように、今公開されているビルドは default を書く位置が違うので注意してください。

stream メソッドは単に Streams.stream メソッドをコールしているだけでした。Streams.stream メソッドはオーバーロードされているのですが、第 1 引数が T で、T が T extends Sized & Iterable<U> の stream メソッドがコールされます。

Sized インタフェースも Project Lambda で導入されたインタフェースで、サイズが決まっているコレクションをあらわしています。

    public static<U, T extends Sized & Iterable<U>> Stream<U> stream(T entity, int flags) {
        return new ValuePipeline<>(new Spliterator.Sequential<U>() {
            @Override
            public Iterator<U> iterator() {
                return entity.iterator();
            }

            @Override
            public void forEach(Block<? super U> block) {
                entity.forEach(block);
            }

            @Override
            public int getSizeIfKnown() {
                return entity.size();
            }
        }, StreamOpFlags.IS_SIZED | flags);
    }

Streams クラスの stream メソッドでは ValuePipeline オブジェクトを生成して、返しています。前述したように、ValuePipeline クラスは Stream インタフェースの実装クラスになっています。

そして、ValuePipeline クラスのコンストラクタはスーパークラスの AbstractPipeline クラスのコンストラクタをコールしています。

    protected final AbstractPipeline<?, E_IN> upstream;
    protected final IntermediateOp<E_IN, E_OUT> op;
    protected final Spliterator<?> spliterator;
    protected final int depth;
    protected final int sourceFlags;
    protected final StreamShape shape;

    private Iterator<E_OUT> iterator;

    protected AbstractPipeline(Spliterator<?> spliterator, int sourceFlags, StreamShape shape) {
        this.upstream = null;
        this.op = null;
        this.spliterator = Objects.requireNonNull(spliterator);
        this.depth = 0;
        this.sourceFlags = sourceFlags;
        this.shape = shape;
    }

ここで注目しておいて欲しいのが、upstream というフィールドがあることです。upstream フィールドの型は AbstractPipeline クラスです。つまり、リスト構造的に AbstractPipeline オブジェクトが連なっていくことができるということです。

Stream#filter メソッド, Stream#map メソッド

では、次に filter メソッドと map メソッドです。この 2 つのメソッドは実質的にはほとんど変わりません。

filter メソッドと map メソッドを実装しているのは ValuePipeline クラスです。

    @Override
    public Stream<U> filter(Predicate<? super U> predicate) {
        return chainValue(new FilterOp<>(predicate));
    }

    @Override
    public <R> Stream<R> map(Mapper<? extends R, ? super U> mapper) {
        return chainValue(new MapOp<>(mapper));
    }

この chainValue メソッドをコールしているメソッドが遅延評価の対象になります。chainValue メソッドは AbstractPipeline クラスで実装されています。

    protected<E_NEXT> Stream<E_NEXT> chainValue(IntermediateOp<E_OUT, E_NEXT> op) {
        return new ValuePipeline<>(this, op);
    }

なんと、ValuePipeline オブジェクトを生成しているだけです。ただし、先ほどの ValuePipeline オブジェクトの生成とはコンストラクタの引数が違います。

    public ValuePipeline(AbstractPipeline<?, T> upstream, IntermediateOp<T, U> op) {
        super(upstream, op);
    }

第 1 引数が upstream という変数名になっています。ということは...

    protected AbstractPipeline(AbstractPipeline<?, E_IN> upstream, IntermediateOp<E_IN, E_OUT> op) {
        this.upstream = Objects.requireNonNull(upstream);
        this.op = Objects.requireNonNull(op);
        this.spliterator = upstream.spliterator;
        this.depth = upstream.depth + 1;
        this.sourceFlags = upstream.sourceFlags;
        this.shape = upstream.shape;
        assert upstream.getShape() == op.inputShape();
        assert (upstream.depth == 0) ^ (upstream.op != null);
    }

やはり AbstractPipeline オブジェクトの upstream フィールドに代入されています。先ほどの指摘したように、Stream オブジェクトがこのようにして、リスト構造的に津ならなっていくわけです。

もう 1 つ、spliterator フィールドに upstream フィールドの spliterator フィールドが代入されています。

とすると、遅延評価のメソッドをいくつ連ねても、spliterator フィールドは一番始めの stream オブジェクトを作った時の spliterator フィールドが参照されるということになります。

Stream#forEach メソッド

最後に forEach メソッドです。ValuePipeline クラスの forEach メソッドを次に示します。

    public void forEach(Block<? super U> block) {
        pipeline(ForEachOp.make(block));
    }

filter メソッドや、map メソッドは、実際に処理を行なうオブジェクトをラップするクラスのオブジェクトを生成していましたが、 forEach メソッドでは ForEachOp オブジェクトの生成ではなく、make メソッドをコールしています。

最後に Op がつくクラスは java.util.streams.op パッケージで定義されています。

    private final TerminalSink<T, Void> sink;
    private final StreamShape shape;

    protected ForEachOp(TerminalSink<T, Void> sink, StreamShape shape) {
        this.shape = Objects.requireNonNull(shape);
        this.sink = Objects.requireNonNull(sink);
    }

    public static<T> ForEachOp<T> make(final Block<? super T> block) {
        Objects.requireNonNull(block);
        return new ForEachOp<>(new TerminalSink<T, Void>() {
            @Override
            public void accept(T t) {
                block.apply(t);
            }

            @Override
            public Void getAndClearState() {
                return null;
            }
        }, StreamShape.VALUE);
    }

ForEachOp クラスの make メソッドを見てみると、結果的には ForEachOp オブジェクトを生成していることが分かります。この時に TerminalSink オブジェクトもいっしょに生成していることが分かります。

そして、先ほどの forEach メソッドに戻って、pipeline メソッドをコールしていることが分かります。

    public<R> R pipeline(TerminalOp<E_OUT, R> terminal) {
        assert getShape() == terminal.inputShape();
        return evaluate(terminal);
    }
 
    protected<R> R evaluate(TerminalOp<E_OUT, R> terminal) {
        // @@@ NYI If the source size estimate is small, don't bother going parallel
        if (StreamOpFlags.PARALLEL.isKnown(sourceFlags)) {
            return evaluateParallel(terminal);
        }
        else
            return evaluateSequential(terminal);
    }

はじめの assert 文はともかく、evaluate メソッドをコールしていることが分かります。なんとなく、それっぽいですね。

そして、evaluate メソッドではパラレルに処理するかどうかで処理を切り替えています。ここではシリアルに処理する方を見ていきましょう。

    protected <R> R evaluateSequential(TerminalOp<E_OUT, R> terminal) {
        return (R) terminal.evaluateSequential(new SequentialImplPipelineHelperSource());
    }

SequentialImplePipelineHelperSource クラスがなんなのかはとりあえずおいておいて、terminal の evaluateSequential メソッドをコールしていることが分かります。

terminal の型は TerminalOp インタフェースですが、ここでの実装クラスは ForEachOp クラスです。

    public <S> Void evaluateSequential(PipelineHelper<S, T> helper) {
        return helper.into(sink).getAndClearState();
    }

先ほどおいておいた SequentialImplePipelineHelperSource クラスの into メソッドをコールしているのでした。into メソッドの引数の sink は、先ほど ForEachOp クラスの make メソッドの中で作成した無名クラスのオブジェクトです。

SequentialImplePipelineHelperSource クラスは AbstractPipeline クラスの内部クラスになり、into メソッドを定義しているのはスーパークラスの SequentialImplePipelineHelper クラスです。で、それを見てみると、今度はさらにスーパークラスの AbstractPipelineHelper クラスの into メソッドがコールされていることが分かります。

    public <S extends Sink<P_OUT>> S into(S sink) {
            Sink<P_IN> wrappedSink = wrapSink(sink);
            wrappedSink.begin(spliterator.getSizeIfKnown());
            spliterator.forEach(wrappedSink);
            wrappedSink.end();
            return sink;
        }

        @Override
        public Sink<P_IN> wrapSink(Sink sink) {
            Objects.requireNonNull(sink);

            for (int i = upTo - 1; i >= from; i--) {
                sink = ops[i].wrapSink(
                        StreamOpFlags.combineStreamFlags(sourceFlags, opsFlags[i]),
                        sink);
            }
            return sink;
        }

いろいろと処理は行なわれていますが、重要なのは赤字の部分。つまり、spliterator の forEach メソッドをコールしていることが分かります。

この forEach メソッドでやっとループになります。

spliterator が、連なっている Stream オブジェクトのはじめの Stream オブジェクトに定義されているものだということを思いだしてください。

forEach メソッドでは Block インタフェースの apply メソッドをコールします。しかし、ここでは Sink クラスでラップしてあり、コールされるのは spliterator を定義した Stream オブジェクトの op フィールドの accept メソッドがコールされます。ここではそれがなんだったかというと、FilterOp クラスになるわけです。

FilterOp クラスの wrapSink メソッドを次に示します。

    public Sink wrapSink(int flags, final Sink sink) {
        Objects.requireNonNull(sink);
        return new Sink.ChainedValue(sink) {
            @Override
            public void accept(T t) {
                if (predicate.test(t))
                    downstream.accept(t);
            }
        };
    }

ようやく、filter メソッドの処理が出てきました。filter メソッドの引数は Predicate インタフェースのオブジェクトになり、Predicate インタフェースは test メソッドが定義されています。

この test メソッドがコールされるのが赤字の部分です。test メソッドの戻り値が true の場合、言いかえればフィルターされた結果で残る場合は downstream 変数が示している次の処理の accept メソッドをコールします。

こうすることで連なっている処理を行なっていくわけです。

同様に MapOp クラスの wrapSink メソッドを見てみましょう。

    public Sink<t> wrapSink(int flags, Sink sink) {
        return new Sink.ChainedValue<t>(sink) {
            @Override
            public void accept(T t) {
                downstream.accept(mapper.map(t));
            }
        };
    }

Stream インタフェースの map メソッドの引数の型は Mapper インタフェースになります。Mapper インタフェースも map メソッドを定義しており、この部分が Lambda 式で評価されるわけです。それが赤字で書いた map メソッドのコールになります。

map メソッドで 1 つの要素を他の型に変換するので、その後、再び連なる処理の accept メソッドをコールするというわけです。

このように Stream オブジェクトの連なりから、処理をつなげていくことで、ループが 1 回だけで処理が可能になっているわけです。

最後はちょっとはしょってしまいましたが、遅延評価の手法についてコードをおっていきました。遅延評価というだけであれば簡単ですけど、実装はなかなか大変そうです。

明日は @btnrouge さんです!