Durable Functions for JavaがPublic previewになったので試してみた(2)ファンアウト/ファンイン編

2022年6月22日にDurable Functions for JavaがPublic previewとして公開されました。

azure.microsoft.com

先週はチュートリアルをそのまま実行しただけですが、今日はファンアウト/ファンインを試してみました。

先週の記事は以下を参照してください。

miyohide.hatenablog.com

ファンアウト/ファンインとは

ファンアウト/ファンインはDurable Functionsで説明されているパターンの一つです。

docs.microsoft.com

Javaのコードも書かれているので、試してみました。

アクティビティ関数 F1

まずアクティビティ関数 F1を実装します。単純にIntegerの配列を返すメソッドを作成しました。

    @FunctionName("F1")
    public List<Integer> f1(@DurableActivityTrigger(name = "f1name") String name) {
        return new ArrayList<>(Arrays.asList(10, 20, 30, 40, 50));
    }

@DurableActivityTriggerは必須のようで、記述がないとコンパイルは成功しますが動作時に以下のメッセージが出力されます。

The 'F1' function is in error: At least one binding must be declared.

アクティビティ関数 F2

アクティビティ関数 F2を実装します。引数で取得した値の3倍を返すメソッドを作成しました。

    @FunctionName("F2")
    public Integer f2(@DurableActivityTrigger(name = "f2name") int i, final ExecutionContext context) {
        context.getLogger().info("F2 with i = [" + i + "]");
        return i * 3;
    }

引数の値をログとして出力するために、final ExecutionContext contextを引数に追加し、context.getLogger().infoを使ってログを出力しました。

オーケストレーター関数

オーケストレーター関数を実装します。ほとんどDurable Functionsの記事内で書かれているJavaのサンプルと同じです。

    @FunctionName("FanOutFanIn")
    public String fanOutFanInOrchestrator(
            @DurableOrchestrationTrigger(name = "runtimeState") String runtimeState
    ) {
        return OrchestrationRunner.loadAndRun(runtimeState, ctx -> {

            List<?> batch = ctx.callActivity("F1", List.class).await();

            List<Task<Integer>> parallelTasks = batch.stream()
                    .map(i -> ctx.callActivity("F2", i, Integer.class))
                    .collect(Collectors.toList());

            List<Integer> results = ctx.allOf(parallelTasks).await();
            return results.stream().reduce(0, Integer::sum);
        });
    }

変えたのは、List<Task<Integer>> parallelTasks = batch.stream().map(i -> ctx.callActivity("F2", i, Integer.class))の部分で、アクティビティ関数 F1で得られたListの各要素をアクティビティ関数F2に渡すようにしています。

Java 8で導入されたラムダ式とかStreamなどを随所で使っているのであまり慣れていないのですが、StreamのJavaDocとにらめっこして理解するようにしました。

docs.oracle.com

クライアント関数

クライアント関数は先週書いたものとほぼ同じです。オーケストレーター関数の名前を変えただけです。

    @FunctionName("StartHelloCities")
    public HttpResponseMessage startHelloCities(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET})HttpRequestMessage<Optional<String>> req,
            @DurableClientInput(name = "durableContext")DurableClientContext durableContext,
            final ExecutionContext context
            ) {
        DurableTaskClient client = durableContext.getClient();
        String instanceId = client.scheduleNewOrchestrationInstance("FanOutFanIn");
        context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
        return durableContext.createCheckStatusResponse(req, instanceId);
    }

実行

実行は./gradlew azureFunctionsRunで起動します。その後、http://localhost:7071/api/クライアント関数名にブラウザでアクセスすると、以下のように処理が走ります。

[2022-07-10T08:01:44.174Z] Executing 'Functions.StartHelloCities' (Reason='This function was programmatically called via the host APIs.', Id=1cc0b8b5-2ce3-4952-b594-ec4845c94745)
[2022-07-10T08:01:44.748Z] Created new Java orchestration with instance ID = 9f733526-7db5-42fd-8bef-94eeff0804ac
[2022-07-10T08:01:44.752Z] Function "StartHelloCities" (Id: 1cc0b8b5-2ce3-4952-b594-ec4845c94745) invoked by Java Worker
[2022-07-10T08:01:44.807Z] Executed 'Functions.StartHelloCities' (Succeeded, Id=1cc0b8b5-2ce3-4952-b594-ec4845c94745, Duration=653ms)
[2022-07-10T08:01:44.849Z] Executing 'Functions.FanOutFanIn' (Reason='(null)', Id=50f96a8d-16c2-4d76-be47-6964fa1b0782)
[2022-07-10T08:01:44.964Z] Function "FanOutFanIn" (Id: 50f96a8d-16c2-4d76-be47-6964fa1b0782) invoked by Java Worker
[2022-07-10T08:01:44.973Z] Executed 'Functions.FanOutFanIn' (Succeeded, Id=50f96a8d-16c2-4d76-be47-6964fa1b0782, Duration=141ms)
[2022-07-10T08:01:45.020Z] Executing 'Functions.F1' (Reason='(null)', Id=26b7a005-0b9c-495a-a879-0b619888db67)
[2022-07-10T08:01:45.024Z] Function "F1" (Id: 26b7a005-0b9c-495a-a879-0b619888db67) invoked by Java Worker
[2022-07-10T08:01:45.025Z] Executed 'Functions.F1' (Succeeded, Id=26b7a005-0b9c-495a-a879-0b619888db67, Duration=5ms)
[2022-07-10T08:01:45.120Z] Executing 'Functions.FanOutFanIn' (Reason='(null)', Id=7be266ae-4d91-4a3d-a616-7e672f57fb09)
[2022-07-10T08:01:45.157Z] Function "FanOutFanIn" (Id: 7be266ae-4d91-4a3d-a616-7e672f57fb09) invoked by Java Worker
[2022-07-10T08:01:45.158Z] Executed 'Functions.FanOutFanIn' (Succeeded, Id=7be266ae-4d91-4a3d-a616-7e672f57fb09, Duration=39ms)
[2022-07-10T08:01:45.188Z] Executing 'Functions.F2' (Reason='(null)', Id=0db408a9-82cd-4186-ab55-f1a709c1be74)
[2022-07-10T08:01:45.195Z] F2 with i = [10]
[2022-07-10T08:01:45.196Z] Function "F2" (Id: 0db408a9-82cd-4186-ab55-f1a709c1be74) invoked by Java Worker
[2022-07-10T08:01:45.198Z] Executed 'Functions.F2' (Succeeded, Id=0db408a9-82cd-4186-ab55-f1a709c1be74, Duration=12ms)
[2022-07-10T08:01:45.198Z] Executing 'Functions.F2' (Reason='(null)', Id=852b89be-34f7-4117-ba3a-c4afd05c63ff)
[2022-07-10T08:01:45.200Z] F2 with i = [30]
[2022-07-10T08:01:45.200Z] Function "F2" (Id: 852b89be-34f7-4117-ba3a-c4afd05c63ff) invoked by Java Worker
[2022-07-10T08:01:45.201Z] Executed 'Functions.F2' (Succeeded, Id=852b89be-34f7-4117-ba3a-c4afd05c63ff, Duration=3ms)
[2022-07-10T08:01:45.209Z] Executing 'Functions.F2' (Reason='(null)', Id=c192396d-bef0-4519-8557-be2f1e6e3931)
[2022-07-10T08:01:45.212Z] F2 with i = [50]
[2022-07-10T08:01:45.212Z] Function "F2" (Id: c192396d-bef0-4519-8557-be2f1e6e3931) invoked by Java Worker
[2022-07-10T08:01:45.212Z] Executed 'Functions.F2' (Succeeded, Id=c192396d-bef0-4519-8557-be2f1e6e3931, Duration=3ms)
[2022-07-10T08:01:45.221Z] Executing 'Functions.F2' (Reason='(null)', Id=d1eae73c-da8b-4c09-b310-660f6fed49e0)
[2022-07-10T08:01:45.223Z] F2 with i = [20]
[2022-07-10T08:01:45.224Z] Function "F2" (Id: d1eae73c-da8b-4c09-b310-660f6fed49e0) invoked by Java Worker
[2022-07-10T08:01:45.224Z] Executed 'Functions.F2' (Succeeded, Id=d1eae73c-da8b-4c09-b310-660f6fed49e0, Duration=2ms)
[2022-07-10T08:01:45.259Z] Executing 'Functions.FanOutFanIn' (Reason='(null)', Id=b076da01-e48b-4f42-b3e2-c1154815e20c)
[2022-07-10T08:01:45.262Z] Function "FanOutFanIn" (Id: b076da01-e48b-4f42-b3e2-c1154815e20c) invoked by Java Worker
[2022-07-10T08:01:45.263Z] Executed 'Functions.FanOutFanIn' (Succeeded, Id=b076da01-e48b-4f42-b3e2-c1154815e20c, Duration=3ms)
[2022-07-10T08:01:45.281Z] Executing 'Functions.F2' (Reason='(null)', Id=82a0761d-b90e-425e-a7a5-78e83fa85006)
[2022-07-10T08:01:45.283Z] F2 with i = [40]
[2022-07-10T08:01:45.284Z] Function "F2" (Id: 82a0761d-b90e-425e-a7a5-78e83fa85006) invoked by Java Worker
[2022-07-10T08:01:45.284Z] Executed 'Functions.F2' (Succeeded, Id=82a0761d-b90e-425e-a7a5-78e83fa85006, Duration=3ms)
[2022-07-10T08:01:45.327Z] Executing 'Functions.FanOutFanIn' (Reason='(null)', Id=61407928-202f-4053-a0a7-d99bc2850bf4)
[2022-07-10T08:01:45.340Z] Function "FanOutFanIn" (Id: 61407928-202f-4053-a0a7-d99bc2850bf4) invoked by Java Worker
[2022-07-10T08:01:45.342Z] Executed 'Functions.FanOutFanIn' (Succeeded, Id=61407928-202f-4053-a0a7-d99bc2850bf4, Duration=15ms)

アクティビティ関数F1で生成されたListの順番ではなく、バラバラに値が渡されていることが確認できます。

結果はhttp://localhost:7071/api/クライアント関数名でブラウザでアクセスしたときに出てくるstatusQueryGetUriのURLにアクセスすると以下のようなJSONが返ってきて確認できます。

{"name":"FanOutFanIn","instanceId":"9f733526-7db5-42fd-8bef-94eeff0804ac","runtimeStatus":"Completed","input":null,"customStatus":"","output":450,"createdTime":"2022-07-10T08:01:44Z","lastUpdatedTime":"2022-07-10T08:01:45Z"}

outputの項目はアクティビティ関数F1の各値を3倍したものを足し合わせたものである450が返ってきていることが確認できます。

これでファンアウト/ファンインパターンのDurable Functionsの実装ができました。

参考

たまたま見つけたのですが、Durable Functions for Javaで使われているライブラリは以下のもののようです。

github.com

ソース

現状のソースです。

github.com