2022/12/20

JEP 437: Structured Concurrency

このエントリーをはてなブックマークに追加

このエントリーは Java Advent Calendar の20日目のエントリーです。

qiita.com

 

Java 19のJEP 425: Virtual Threadsの裏で、ひっそりと登場していたのが、JEP 428 Structured Concurrency です。Virtual ThreadはPreview JEPでしたが、Structured ConcurrencyはIncubator JEPです。

そして、Java 20では、Virtual ThreadsはSecond Previewとして JEP 436 になっています。同じようにStructured ConcurrencyもSecond Incubatorとして JEP 437 になりました。

JEP 428とJEP 437の違いはほとんどないので、このままいけば次のLTSであるJava 21で正式に導入されると思います。

 

動機

Structured Concurrencyは名前にConcurrencyとついていることから分かるように、並列/並行処理に関するAPIです。構造化した並列とはどういうことなのでしょう。

それを考えるために、ちょっとしたサンプルで考えてみます。

最近は、複数の外部のサービスを組み合わせて使うシステムが多くありますね。外部にあるということは、何らかのI/Oが発生して待ちがあるということです。

たとえば、HTTPで通信するWebサービスを2つ使用し、クエリ結果を組み合わせるようなサンプルを考えてみます。

HTTP通信が発生するので、逐次的に処理をしてしまうととても時間がかかってしまいます。そこで、クエリを非同期に行います。

すごい単純ですが、こんな感じ。

    String query(final String queryText) throws InterruptedException, IOException {
        var client = HttpClient.newHttpClient();
        var request = HttpRequest.newBuilder()
                .uri(URI.create(queryText))
                .build();
        var response = client.send(request, BodyHandlers.ofString());

        var status = response.statusCode();
        if (status / 100 != 2) {
            throw new IOException("HTTP Error: " + status);
        }

        return response.body();
    }

    record Result(String result1, String result2) {}

    Response complexQuery(final String queryText1, final String queryText2)
            throws InterruptedException, ExecutionException {

        try (var pool = Executors.newFixedThreadPool(2)) {
            Future<String> future1 = pool.submit(() -> query(queryText1));
            Future<String> future2 = pool.submit(() -> query(queryText2));

            String result1 = future1.get();
            String result2 = future2.get();

            return new Result(result1, result2);
        }
    }

TwoTaskクラスではHTTPのGETでクエリーを行いますが、それぞれを非同期に行っています。クエリーのURLが引数のqueryText1とqueryText2です。

これを実行すると、特に問題がなければ普通に動くはずです。

しかし、このコードには複数の問題があります。

  1. 1番目のクエリーで例外が発生した場合、complexQueryメソッドを抜けてしまうが、2番目のクエリーは実行したままになってしまう
  2. complexQueryメソッド実行中に割り込み(Thread.interrupt()メソッドがコール)されるとメソッドを抜けるが、クエリーは実行したままになってしまう
  3. 1番目のクエリーを実行中に、2番目のクエリーで例外が発生しても、1番目のクエリーが完了しない限り(future1.get()が戻らない限り)、2番目のクエリーの例外を扱うことができない

正常に動作していればいいのですが、例外や割り込みが発生した時に、それらが正しくサブタスクに伝播されないことに問題があります。

もちろん、割り込みが発生したらサブタスクにも割り込みしたり、あるサブタスクで例外が発生したら他のサブタスクをキャンセルするという処理を書くこともできます。とはいうものの、こういうボイラープレート的な記述が増えてくるのは避けたいところです。

そこで、サブタスクを構造的に扱い、例外や割り込みなどを容易に扱えるようにしたのが、JEP 437 Structured Concurrencyなのです。

 

Structured Concurrency

Structured Concurrencyで提供されるメインのクラスはStructuredTaskScopeクラスです。

JEP 437はIncubator APIなので、今のところパッケージはjdk.incubator.concurrentですが、正式なAPIになった時にはパッケージは変更になります。たぶん、java.util.concurrentに含まれると思いますが、あくまでも予想です。

StructuredTaskScopeクラスは、名前の通りスコープです。そのスコープの中で作られたサブタスクを構造化するという使い方をします。

たとえば、先ほどのサンプルのcomplexQueryメソッドをStructuredTaskScopeクラスで書き直したものが以下になります。

    Result complexQuery(final String queryText1, final String queryText2)
            throws InterruptedException, ExecutionException {

        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Future<String> future1 = scope.fork(() -> query(queryText1));
            Future<String> future2 = scope.fork(() -> query(queryText2));

	    scope.join();
	    scope.throwIfFailed();

            String result1 = future1.resultNow();
            String result2 = future2.resultNow();

            return new Result(result1, result2);
        }
    }

ここでは、StructuredTaskScopeクラスのサブクラスで内部クラスのShutdownOnFailureクラスを使用しています。

try-with-resources構文で記述されたブロックがStructuredTaskScopeクラスのスコープになるわけです。

このスコープの内部でサブタスクをforkし、サブタスクの完了を待つためにjoinしています。

その次のthrowIfFailedメソッドはShutdownOnFailureクラスのメソッドで、サブタスクで例外が発生した場合、他のサブクラスに伝播させタスクをキャンセルさせます。

ShutdownOnFailureというのはサブタスクで例外が発生した時に、サブタスクをキャンセルし、スコープをシャットダウンするためのクラスになります。

その後、サブタスクの結果を取得するために、getメソッドではなくresultNowメソッドを使用しています。Future.resultNowメソッドも新たに定義されたメソッドです。タスクが完了していれば結果を返し、キャンセルされた場合はキャンセルされた時点での結果を返します(結果がないこともあります)。

StructureTaskScopeクラスはVirtual Threadでの使用が前提になっており、デフォルトでVirtual Threadを使うようになっています。

なお、ここではforkしているのは2つのタスクだけですが、さらに多くのタスクをforkすることも可能です。

 

ところで、forkしてjoinするというAPIは他にもありましたね。

そう、Fork/Join Frameworkです。

前回のエントリーでも書きましたが、Fork/Join Frameworkは計算処理などI/O処理を含まないタスクがターゲットでした。一方で、Structured ConcurrencyはVirtual Threadを使うことからも分かるように、I/O処理を含むタスクにフォーカスしています。

この2つのAPIは用途に応じて使い分けるようにしましょう。

 

さて、このサンプルをコンパイルし、実行してみましょう。

Structured ConcurrencyはIncubator JEPなので、使用するにはオプションとして--enable-previewが必要です。コンパイル時に--enable-previewを使用するには--releaseも必要なので、こちらも指定します。

また、Incubatorのため、jdk.incubator.concurrentモジュールがデフォルトでは読み込まれません。そのために--add-modulesでモジュールを指定する必要があります。

このサンプルがTwoTaskクラスだったとした場合、次のようにコンパイル、実行を行います。

> javac --release 20 --enable-preview --add-modules jdk.incubator.concurrent TwoTask.java
警告:実験的なモジュールを使用しています: jdk.incubator.concurrent
警告1個

> java --enable-preview --add-modules jdk.incubator.concurrent TwoTask

せっかくなので、例外も出してみましょう。一方のクエリーだけ失敗するようなURLを使って実行してみると、単にExecutorServiceを使っていた場合、java.util.concurrent.ExecutionException例外がスローされるのに時間がかかります。

ところが、StructuredTaskScopeクラスを使った場合、すぐに例外がスローされます。これが例外が発生させた時に、他のサブタスクをキャンセルさせて、スコープをシャットダウンさせているためです。

 

ところで、StructuredTaskScopeクラスには、もう1つサブクラス兼内部クラスのStructuredTaskScope.ShutdownOnSuccessクラスがあります。

先ほどのShutdownOnFailureクラスは例外が出た場合はシャットダウンします。逆にいうと、例外が出ない場合は複数のサブタスクがすべて完了するまでjoinメソッドはブロックします。

これに対し、ShutdownOnSuccessクラスはサブタスクのうち、どれか1つが完了すれば、他のサブタスクはキャンセルして、シャットダウンします。

ShutdownOnFailureクラスはサブタスクのANDであるのに対し、ShutdownOnSuccessクラスはORになるとも言えますね。

たとえば、先ほどのcomplexQueryメソッドを早く完了したタスクの結果を返すように書きかえてみましょう。

    String complexQuery(final String queryText1, final String queryText2)
            throws InterruptedException, ExecutionException {

        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            Future<String> future1 = scope.fork(() -> query(queryText1));
            Future<String> future2 = scope.fork(() -> query(queryText2));

            scope.join();
            return scope.result();
        }
    }

ShutdownOnFailureクラスと異なり、ShutdownOnSuccessクラスはサブタスクの結果を扱うため、ジェネリクスでサブタスクの戻り値の型を指定します。

最も早く完了したサブタスクの処理結果を取得するのがresultメソッドです。

他はShutdownOnFailureクラスと同じです。

 

Structured Concurrencyでは他に、サブタスク間で共有できる値を扱うScopedValueクラスがあるのですが、これはまた別の機会に紹介します。

2022/12/09

Virtual Thread導入の背景 - Javaのマルチスレッドの歴史を振り返る

このエントリーをはてなブックマークに追加

このエントリーは Java Advent Calendar の9日目のエントリーです。

qiita.com

 

Virtual ThraedはJava 19でPreview (JEP 425)、Java 20でSecond Preview (JEP 436)となり、うまくいけば次のLTSであるJava 21で導入予定です。

パフォーマンスを考える時に、一般的にはスループットと応答性の2つがあります。スループットは単位時間あたりにどのくらいリクエストをさばけるか、応答性は処理のリクエストから結果が帰るまでの時間です。Virtual Threadのこの2者のうち、スループットを向上させるために導入されます。

では、なぜ今になってVirtual Threadが導入されるのかということを、歴史を振り返りながら考えてみるのがこのエントリーです。

 

いにしえの時代 - Java 1.0からJ2SE 1.4

Javaが発表されたのが1995年。その当時の代表的なCPUといえばIntel Pentiumです。Pentiumのリリースが1993年、Pentium Proが1995年、Pentium IIが1997年です。

つまり、その当時のPCはほとんどがシングルCPU、もちろんシングルコアです。

そんな時代にあって、Javaはマルチスレッドを標準の言語機能としたことは驚きです。当然ながら、その当時はコアは1つなのでパラレル処理ではなく、コンカレント処理になります。

 

Javaのスレッドは、OSのスレッドに対応します(初期のSolarisはちょっと違いましたが、J2SE 1.2でOSのスレッドを使うようになりました)。Javaのスレッドが複数あったとしても、CPUは1つなのでOSのスレッドは1つです。したがって、Javaのスレッドをすべて実行するにはスレッド切り替えが必要です。このスレッドの切り替えのことをコンテキストスイッチと呼びます。

このコンテキストスイッチは比較的重い処理なので、頻繁に行うと性能劣化の原因になります。

とはいうものの、複数のスレッドを実行するには何らかの実行スケジューリングが必要になります。スレッドのスケジューリングにはラウンドロビンやタイムシェアリングなどがあります。

Javaのそれは単純で、どのようにスレッドが実行されるかはJVMSには明記されていません。スレッドに優先度も設定できますが、優先度が高いからといって優先的に実行されるとは限りません。

また、やっかいだったのが、タスクを記述するためのRunnableインタフェースは返り値を返せないところです。そのため、処理の結果は複数のスレッドでアクセスできるように同期化したオブジェクトを用意する必要がありました。

同期化のために使用できたのがsynchronizedブロック(もしくはsynchronizedメソッド)です。

synchronizedブロックはモニタというロックを使用して同期化を行います。synchronizedメソッドの場合、ロックするオブジェクトは指定しませんが、thisがロック対象になります。

たとえば、スレッド間のやりとりによく使用されるブロッキングキューを作成してみましょう。

ブロッキングキューはキューがいっぱいの時に要素を追加しようとすると、要素が追加できるまで処理をブロックします。同様に要素を取り出そうとした時に要素がなければ、要素が追加されるまで処理をブロックするキューです。

Javaが標準で提供しているブロッキングキューのArrayBlockingQueueクラスなどはかなり複雑なので、ここではキューのサイズが1つのシンプルなブロッキングキューを作ってみます。

キューがいっぱいの時や、空の時に待ち状態にするにはwaitメソッドを使用します。そして、Object.wait状態にあるスレッドを起こすには、Object.notifyAllメソッドを使用します。notifyメソッドもあるのですが、notifyメソッドだと1つの目的のスレッドを起こせるとは限らないので、通常はすべてのスレッドを起こすnotifyAllメソッドを使用します。

public class SingleBlockingQueue<T> {
    private T x;

    public synchronized void push(T x) throws InterruptedException {
        while (this.x != null) {
            System.out.println("Wait - push: " + x);
            wait();
            System.out.println("Wake up - push");
        }

        System.out.println("NotifyAll - push: " + x);
        this.x = x;
        notifyAll();
    }

    public synchronized T pull() throws InterruptedException {
        while (this.x == null) {
            System.out.println("Wait - pull");
            wait();
            System.out.println("Wake up - pull");
        }

        T result = x;
        x = null;
        System.out.println("NotifyAll - pull: " + result);
        notifyAll();

        return result;
    }
}

notifyAllメソッドで起こされるのはすべてのスレッドなので、本当に起こされるべきスレッドではないこともあります。そこで、while文でチェックを毎回行っているわけです。

では、このSingleBlockingQueueクラスを使ってみましょう。

public class QueueTest {
    static SingleBlockingQueue<String> queue = new SingleBlockingQueue<>();

    public static void main(String[] args) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    queue.push("A");
                    queue.push("B");
                    queue.push("C");
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    queue.pull();
                    queue.pull();
                    queue.pull();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

これを実行すると次のようになります。

> java QueueTest
NotifyAll - push: A
Wait - push: B
NotifyAll - pull: A
Wait - pull
Wake up - push
NotifyAll - push: B
Wait - push: C
Wake up - pull
NotifyAll - pull: B
Wait - pull
Wake up - push
NotifyAll - push: C
Wake up - pull
NotifyAll - pull: C

Aをpushした後、Bをpushしようとしますが、すでに要素がキューにあるのでwaitします。すると、別スレッドがAをpullし、notifyAllメソッドをコールします。

キューに要素がなくなったので、waitしていたスレッドがnotifyAllで起き、Bをpushします。

といような流れで処理が進みます。

このように、synchronizedを使用するとアクセスできるスレッドを制限することができます。しかし、あるスレッドがsynchronizedブロックを実行している間、他のスレッドは処理待ち状態になってしまいます。

この当時は、想定しているスレッド数はたかがしれていました。しかし、同時に動作できるスレッド数が増えてくると、synchronizedブロックがボトルネックになってしまいます。

 

Java SE 1.0から1.4までの時代をまとめると

  • スレッドを実行するには、OSのスレッドとの対応付けが必要
  • スレッドの実行スケジューリングはシンプルなものしかなかった
  • 必要に応じて、明示的に他のスレッドに処理を移す処理を記述する
  • タスクはRunnableインタフェースで記述するため、処理の結果を直接返すことはできない
  • 複数のスレッドからアクセスするにはsynchronizedが必要

 

マルチコア黎明期 - Java 5, Java 6

Java 5がリリースされたのは2004年、Java 6は2006年。このころ、ちょうどAMDやIntelがマルチコアのCPUをリリースし始めたころです。

AMDがAthlon64 X2で先行し、Intelが1つのパッケージに2つのダイをのせたPentium Dをリリースし、その後にちゃんとしたマルチコアのCore 2をリリースしたなんてこともありました。また、Intelはハイパースレッディングで1つのコアで2つのスレッドを処理できるようになっています。

それに合わせたわけではないと思いますが、Java 5ではご存じの通りConcurrency Utilities (java.util.concurrentパッケージ)が導入され、マルチスレッドに関するAPIが大幅に拡充されました。

主なAPIをあげてみましょう

  • スレッドプール
  • 非同期処理を結果を返すCallableとFuture
  • 多様なロックのAPI
  • スレッドセーフなコレクション
  • プリミティブ型に対するアトミック処理

スレッドプールであるExecutorServiceインタフェースが導入されたことで、スレッドとタスクが切り離され、スレッドを直接生成する必要はなくなりました。

Executorsクラスが作成するスレッドプールはJava 5では4種類ありましたが、一般的には以下の2つのメソッドで作成するスレッドプールを使用します。

  • newFixedThreadPoolメソッド
  • newCachedThreadPoolメソッド

newFixedThreadPoolメソッドで作成するスレッドプールはメソッド名の通り、指定した数しかスレッドを作成しません。通常はマシンのトータルのCPUのコア数よりも少ないスレッド数を指定します。トータルのコア数を超えるスレッドを作成してしまうとコンテキストスイッチが発生してしまいます。

スレッドに割り当てられないタスクはキューで管理されます。タスクがスレッドに割り当てられれば、コンテキストスイッチが発生しないので応答性を高めることができます。

 

一方のnewChachedThreadPoolメソッドではタスクが登録された時にスレッドが余っていれば新たにスレッドを作成することはありません。しかし、余っているスレッドがない場合はスレッドを作成して、タスクに割り当てます。

このスレッドプールはWebアプリケーションなどでリクエストごとにスレッドを作成するThread per Request方式で使われることが多くあります。Thread per Requestはスループットを向上させることが可能です。しかし、応答性は低下します。また、リクエストごとにThreadオブジェクトを生成してしまうので、ヒープの消費も大きくなります。

 

ExecutorServiceインタフェースで扱えるタスクには、Runnableインタフェースに加え、Callableインタフェースが導入されました。Callableインタフェースを使うと非同期処理の結果を返せるようになります。これに伴い、非同期処理の結果を取得するためのFutureインタフェースが導入されています。

こんな感じで使います。

    ExecutorService pool = Executors.newFixedThreadPool(4);

    Future<String> future = pool.submit(new Callable<>() {
        @Override
        public String call() throws Exception {
            String result = // 何らかの処理
            return result;
        }            
    });

    String result = future.get();

    pool.shutdown();

結果が返せるということは、複数のスレッドで結果をやりとりするためにだけ使っていたsynchronizedブロック(もしくはsynchronizedメソッド)を使用しなくてもいいということです。synchronizedが減らせれば、マルチスレッドのボトルネックを減らすことができます。

ただし、Future.getメソッドは、非同期処理が完了して値が戻るまでブロックしてしまいます。この点は気を付ける必要があります。

 

さらに、セマフォなどのロックのAPIが導入されています。

特にReentrantLockクラスはsynchronizedと同様の動作をするので、synchrnozedを減らすことができます。ReentrantLockクラスは実行時に必要のないロックはロックを外すなどの最適化が行われるため、synchnorzedよりも効果的です。

 

他のスレッドセーフなコレクションやアトミック処理APIはスレッド数が少ない場合は有効に使えるのですが、多くのスレッドが同時にアクセスするような場合はボトルネックになりがちです。この当時はまだ同時に扱えるスレッドが少なかったので、よかったのですが、コア数が増えて同時に動作するスレッドが増えれば増えるほど、使い方を考えなくてはいけません。というか、多くのスレッドが同時にアクセスするような場合は、なるべく使わない方が賢明です。

 

さて、Java SE 5, 6時代をまとめると。

  • スレッドプールは応答性重視とスループット重視の2種類
  • Callable/Futureの導入で非同期処理の結果を返せる
  • 非同期処理の結果の受け渡しのためにだけ使用していたsynchronizedは減らせる
  • synchronizedではなく、ロックAPIを使用することでロックの最適化が可能

 

マルチコア実用期 - Java SE 7

Java 6がリリースされた後、ラムダ式導入でもめたり、Sun MircrosystemsがOracleに買収されたりなどあって、Java 7のリリースは大幅に遅れ、2011年にやっとリリースされました。結局、ラムダ式はJava 8に延期されたのはご存じのとおりです。

この当時、インテルは2010年に6コアのCPUをリリース、一方のAMDは2011年に8コアのCPUをリリースしています。ノートPCでもマルチコアが当たり前になってきたのもこの頃ですね。

 

さて、ラムダ式がJava 8にスリップしたということで、Java 7ではあまり大きな変化はないように思えますが、マルチスレッドに関しては大きな変化がありました。それがFork/Join Frameworkの導入です。

Fork/Join Frameworkは主に計算処理や再帰処理の効率化を行うために導入されました。たとえば、ソートや行列計算などです。

たとえば、Java SE 6まで標準APIで使われていたマージソートを考えてみましょう。

マージソートはソートを行う範囲を半分に分割していき、ソートする範囲を小さくしていきます。小さい範囲でまずソートを行い、それを組み合わせて徐々に大きい範囲のソートを行うアルゴリズムです。

このような処理の方法を分割統治法と呼びます。

現在のJavaの標準APIのソートはティムソートに変更されていますが、ティムソートも分割統治法を用いたソートアルゴリズムです。

分割統治法で分割したタスクを非同期に実行するようにすれば、マルチスレッドで処理することができます。しかし、Java 6までのスレッドプールはタスクスケジューリングが単純で、多くのタスクを非同期に実行させなければならない分割統治法では有効にスレッドを活用することができません。

そこで、Fork/Join Frameworkでは、Work Stealingを採用したタスクスケジューリングが導入されました。

それまではタスクを1つのキューで管理していましたが、Work Stealingではスレッドごとにタスクキューを持ちます。このキューはFIFOではなく、両端キューで構成されます。

スレッドが自身のタスクキューからタスクを取り出す場合は、キューの先頭から取り出します。

もし、あるスレッドのタスクキューが空になった場合、そのスレッドは他のスレッドのタスクキューからタスクを取り出す(他のスレッドから取り出すのでSteal、つまり盗むということです)ことができます。この時、タスクキューの先頭から取り出すのではなく、キューの最後から取り出します。

分割統治法では大きい粒度のタスクの処理中に分割したタスクを再びキューに登録していきます。このことは、タスクキューの先頭に近いほどタスクが大きく、最後になればなるほどタスクが小さくなることを示しています。

このため、他のスレッドが盗んでいくタスクは小さい粒度で短時間で処理ができるはずです。このようにすることで、複数のスレッドで効率よく分割したタスクを処理していくことができます。

Work Stealing方式のスレッドプールは、Java 8で導入されるパラレルストリームでも使用されます。

 

Fork/Join Frameworkは計算処理の効率化はできるものの、Webアプリケーションのような通信やDBへのアクセスがあるような用途には向いていません。しかし、Work Stealing方式のタスクスケジューリングを含むスレッドプールは、Fork/Join Framework以外の用途でも使われていくようになりました。

このスレッドプールはForkJoinPoolクラスで使用できます。デフォルトではトータルのCPUのコア数だけスレッドを生成します。このため、タスクスイッチは頻繁に行うものの、スレッドのコンテキストスイッチは抑えることができます。

また、Executors.newWorkStealingPoolメソッドでもForkJoinPoolオブジェクトを取得できます。ただし、このメソッドはJava 8以降で使用することができます。

 

マルチコア熟成期 - Java SE 8

Java SE 8は2014年にやっとリリースされました。Java 8で最も注目されたのがProject Lambda、つまりラムダ式とストリームです。マルチスレッド的にはパラレルストリームの導入もあります。

しかし、ここで取り上げるのはパラレルストリームではなく、CompletableFutureです。く

Java 7のFork/Join Frmeworkで計算処理は効率よく行うことが可能になりました。しかし、通信やDBアクセスなどのI/O処理を含むようなタスクには向いていません。

I/O処理は処理をブロックします。たとえば、Readerオブジェクトから読み込む処理はどうでしょう。

    try (BufferedReader reader = new BufferedReader(...)) {
        int c = reader.read();
    }

readメソッドは文字が読み込めるまでブロックします。Readerオブジェクトがソケットから生成されていれば、通信が届かない限り文字を読み込めません。その間、ブロックします。

I/O処理はCPUの計算時間に比べると多大な時間を使います。しかし、ブロックしてしまうと、その間CPUは遊んでしまうわけです。

そこで、I/O処理を非同期に行うことが考えられます。たとえば、こんな感じです。

    Future<String> future = pool.submit(() -> {
        // I/O読み込み処理

        return contents;
    });

    // 非同期処理の結果を受け取る
    String contents = future.get();

お分かりかと思いますが、これでは非同期処理の利点を生かすことができません。というのも、Future.getメソッドが非同期処理が完了するまでブロックしてしまうからです。

そこで、導入されたのがCompletableFutureクラスです。

非同期で行いたいような処理は1つだけということはあまりなく、複数の処理を連ねることが多いのではないでしょうか。

たとえば、Webアプリケーションであれば

  1. リクエストを受けて何らかのロジック処理を行う
  2. DBへのクエリを投げる
  3. クエリ結果からレスポンスを作成し、送信する

のような感じです。もちろん、こんな単純なことで済まないことも多いとは思いますが、こんな感じで処理が進むという例のつだと思ってください。

Thread per Requestでリクエストごとにスレッドを生成して、非同期に1, 2, 3の処理を行えばスループットは向上します。しかし、2のDB処理は完了するまでブロッキングしてしまいます。そのため、ブロッキングしている間、CPUは遊んでしまいます。

その間にも、リクエストがあればスレッドがどんどん増えるばかりで、あっという間にスループットは飽和してしまいます。そこで、DB処理も非同期にして... とやっていると先ほどのFuture.getメソッドのようにまたブロッキングを増やしてしまうかもしれません。

CompletableFutureクラスを使うと、このような一連の処理をラムダ式で記述し、メソッドチェーンで記述することができます。

たとえば、リクエストを受けるメソッドがhandleメソッドだとしてみましょう。引数の型はRequestクラスとResponseクラスだとします。

    void handle(final Request request, final Response response) {
        record Param<S>(S s, Response response) {}
        
        CompletableFuture
            .completedFuture(new Param(request, response))
            .thenApply(param -> {

                // 1. ロジック処理

                return new Param(logicResult, response);
            })
            .thenApplyAsync(param -> {

                // 2. 非同期でDBへのクエリ

                return new Param(queryResult, response);
            })
            .thenAccept(param -> {

                // 3. レスポンスの作成、送信

                param.response().response(responseContents);
            });
     }

ここではタプル的に2つのオブジェクトを扱うためにレコードクラスのParamクラスを作りましたが、これが必須というわけではないです。

そのParamオブジェクトを引数としてcompletedFutureメソッドでCompletableFuture生成しています。その後のメソッドチェーンで処理を連ねていきます。

thenが先頭につくメソッドは前段までの処理が完了した時に引数のラムダ式がコールバック的に呼ばれるメソッドです。ApplyやAcceptはラムダ式の引数や返り値によって異なります。

メソッド名の最後がAsyncのメソッドは、引数のラムダ式がさらに非同期に実行されることを示します。したがって、1.のロジック処理と2.のDBクエリが同じスレッドで実行されるわけではないことを示します。

このように、一連の処理を同期、非同期を交ぜながら記述できるのが、CompletableFutureクラスの特徴です。また、CompletableFutureクラスはデフォルトでForkJoinPoolを使用するので、スレッド数は抑えたまま、効率的に実行することができます。

 

このCompletableFutureクラスを使えば、Thread per Requestに比べるとスループットの飽和も抑えることができます。

しかし、残念ながら、いまだにCompletableFutureクラスはそれほど使われていません。

単にシーケンシャルに処理を記述していくのに比べると、CompletableFutureクラスで処理を記述するには考え方を変えなければいけません。処理のキャンセルや、例外の扱いなどが煩雑になるという点もあります。

さらに、CompletableFutureクラスのデバッグは難しいのです。

ストリームのデバッグをしたことがある方は分かると思うのですが、ラムダ式が含まれるとスタックトレースが読みにくくなります。

しかも、CompletableFutureクラスは処理を記述したラムダ式が同じスレッドで動作するわけでありません。スタックトレースのスタックは、スレッドが持つスタックのことです。スレッドが異なればスタックトレースは別ものになります。このため、CompletableFutureクラスにおいて、ラムダ式で記述した処理で例外が発生したとしてもスレッドが異なるため、スタックトレースも別々になり、一連の処理の流れはスタックトレースからは追うことができません。

このように、CompletableFutureクラスは使いこなすうえで難しい点があります。しかし、CompletableFutureクラスを使いこなせるのであれば、スループットの高いシステムを実現することができます。

これはSpring WebFluxのようなリアクティブ系のフレームワークでも同じです。使いこなせるのであれば、高いスループット性能が可能です。

そして、CompletableFutureクラスでも、リアクティブなフレームワークでも使いこなせるのであれば、Virtual Threadは使わなくても大丈夫なはずです。

 

ここまでのJavaでのマルチスレッドの流れをまとめてみると

  • Thread per Requestを使えば、スループットを上げることは可能だが、飽和するのも早い
  • 計算処理など処理をブロックする要因がない処理であればFork/Join Frameworkもしくはパラレルストリームで応答性を向上することができる
  • I/Oなど処理をブロックする要因がある場合はブロックする処理を非同期に処理する。そのためにCompletableFutureクラスやリアクティブ系のフレームワークが使用可能
  • しかし、CompletableFutureクラスやリアクティブ系のフレームワークは使いこなすのが難しく、デバッグもやりにくい

ここらへんを何とかしてくれるのがVirtual Threadです。

Concurrency Utilities以降のJavaでは、自分でスレッドを生成することはなくなっているはずです。Virtual Threadも自分で生成させることはないはずです。

フレームワークなどがVirtual Threadに対応すれば、コードを変更しなくても自動的にVirtual Threadを使えるようになっているはず。

Springが早々にVirtual Threadに対応することを発表しているように、意外にVirtual Threadを使えるようになる日も近いかもしれません。

 

おまけ

わざわざ、Java 1.0のころの古いスレッドん使い方を説明したのは、ここで説明したことはほとんどがVirtual Threadではバッドマナーになるからです。

Virtual Threadは、Virtualではないスレッドの上で動く仮想スレッドです。しかし、Java 1.0のころのsynchronizedやwait/notifyなどの操作は仮想スレッドではなく、仮想スレッドが動作しているスレッドに対して行われます。

スレッドがブロックしてしまったりすると、スレッドの上で動作する複数のVirtual Threadすべてが影響を受けてしまいます。

ThreadクラスのいくつかのAPIは@deprecatedになり、使うことが非推奨になっています。

 

Threadクラスの非推奨(@deprecated)のAPI

  • checkAccess
  • countStackFrames
  • getId
  • resume
  • stop
  • suspend

これらのメソッドはいつ削除されても不思議はありません。

ちなみに、Java 8では使えていたdestroyメソッドとstop(Throwable)メソッドはすでに削除されています。

また、synchronizedブロック(メソッド)はロックAPIで書きかえられます。wait/notifyを使う場面はほとんどないとは思いますが、なるべくタスクは独立に処理できるようにし、これらを使わないような設計を考えていく必要があります。