Durable Functions for JavaがPublic previewになったので試してみた(3)モニター編

2022年6月22日にDurable Functions for JavaがPublic previewとして公開されました。

azure.microsoft.com

先々週はチュートリアルをそのまま実行し、先週はファンアウト/ファンインを試してみました。今日はモニターを試してみました。

これまでの記事は以下を参照してください。

モニターとは?

今回試そうとするDurable Functionsのモニターパターンは以下に説明があります。

docs.microsoft.com

サンプルコードが書かれているのでそれを実装してみます。

JobInfo?

サンプルコードの中で特に説明もなく使われているのがJobInfoというクラスです。とりあえず必要なメソッド等を実装すると以下のようになりました。

package com.example;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.time.Duration;
import java.time.Instant;

@JsonIgnoreProperties(ignoreUnknown=true)
public class JobInfo {
    private String jobId = "1";

    public String getJobId() {
        return jobId;
    }

    public Instant getExpirationTime() {
        return Instant.now().plusSeconds(3_600L);
    }

    public Duration getPollingDelay() {
        return Duration.ofSeconds(10L);
    }

    public void setJobId(String jobId) {
        this.jobId = jobId;
    }
}

最初、getJobIdgetExpirationTimegetPollingDelayを実装しましたが、動かすとThe orchestrator failed with an unhandled exception: com.microsoft.durabletask.DataConverter$DataConverterException: Failed to deserialize the JSON text to com.example.JobInfo.という警告が出てうまく動きませんでした。 あれこれ試してみると上記のようにJacksonの設定である@JsonIgnoreProperties(ignoreUnknown=true)をつけることでうまく動くことが確認できました。おそらく、以下の部分が関係するんだろうなと思ってはいるのですが、確証があるわけではないです。

docs.microsoft.com

Jacksonの機能を使うので、build.gradleに以下の記述も実施しました。

implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.3'

アクティビティ関数 GetJobStatus

適当なアクティビティ関数を実装します。以下のような実装にしました。

    @FunctionName("GetJobStatus")
    public String getJobStatus(@DurableActivityTrigger(name = "getJobStatusName") Integer i) {
        if (i < 10) {
            return "Running...";
        } else {
            return "Completed";
        }
    }

引数で取った呼び出し回数に応じて返す値を変えています。

オーケストレーター関数

オーケストレーター関数はサンプルにあるものをもとに実装します。こんな感じです。

    @FunctionName("Monitor")
    public String monitorOrchestrator(
            @DurableOrchestrationTrigger(name = "runtimeState") String runtimeState
    ) {
        return OrchestrationRunner.loadAndRun(
                runtimeState, ctx -> {
                    JobInfo jobInfo = ctx.getInput(JobInfo.class);
                    String jobId = jobInfo.getJobId();
                    Instant expiryTime = jobInfo.getExpirationTime();
                    int i = 0;

                    while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
                        // ctx.getCurrentInstant() < expiryTime の場合
                        i += 1;
                        String status = ctx.callActivity("GetJobStatus", i, String.class).await();

                        if (status.equals("Completed")) {
                            break;
                        } else {
                            Duration pollingDelay = jobInfo.getPollingDelay();
                            ctx.createTimer(pollingDelay).await();
                        }
                    }
                    return "done";
                }
        );
    }

定期的にアクティビティ関数であるGetJobStatusを呼び出し、結果がCompletedなら処理終了それ以外なら少し待って再処理するということを実装しました。

クライアント関数

クライアント関数はこれまでと大きく変わらず、オーケストレーター関数の呼び出しと上記で作成したJobInfoクラスのインスタンスを渡すように変更しました。

    @FunctionName("StartHelloCities")
    public HttpResponseMessage startHelloCities(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET})HttpRequestMessage<Optional<String>> req,
            @DurableClientInput(name = "durableContext")DurableClientContext durableContext,
            final ExecutionContext context
            ) {
        DurableTaskClient client = durableContext.getClient();
        String instanceId = client.scheduleNewOrchestrationInstance("Monitor", new JobInfo());
        context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
        return durableContext.createCheckStatusResponse(req, instanceId);
    }

実行

./gradlew azureFunctionsRunで実行し、ブラウザでhttp://localhost:7071/api/クライアント関数名にアクセスします。ログは以下のようにでました。

[2022-07-17T07:15:55.369Z] Executing 'Functions.StartHelloCities' (Reason='This function was programmatically called via the host APIs.', Id=308672c6-e0df-4f66-93f5-090d4a1c61fc)
[2022-07-17T07:15:55.925Z] Created new Java orchestration with instance ID = 159dbf1f-fc11-473a-a6a1-2f24ea0e5f21
[2022-07-17T07:15:55.928Z] Function "StartHelloCities" (Id: 308672c6-e0df-4f66-93f5-090d4a1c61fc) invoked by Java Worker
[2022-07-17T07:15:55.982Z] Executed 'Functions.StartHelloCities' (Succeeded, Id=308672c6-e0df-4f66-93f5-090d4a1c61fc, Duration=634ms)
[2022-07-17T07:15:56.022Z] Executing 'Functions.Monitor' (Reason='(null)', Id=257cbcf1-36cc-493a-86be-8f2023e35acd)
[2022-07-17T07:15:56.143Z] Function "Monitor" (Id: 257cbcf1-36cc-493a-86be-8f2023e35acd) invoked by Java Worker
[2022-07-17T07:15:56.151Z] Executed 'Functions.Monitor' (Succeeded, Id=257cbcf1-36cc-493a-86be-8f2023e35acd, Duration=148ms)
[2022-07-17T07:15:56.204Z] Executing 'Functions.GetJobStatus' (Reason='(null)', Id=3929e30b-c7b0-4e41-9773-a04ae14d58e8)
[2022-07-17T07:15:56.207Z] Function "GetJobStatus" (Id: 3929e30b-c7b0-4e41-9773-a04ae14d58e8) invoked by Java Worker
[2022-07-17T07:15:56.208Z] Executed 'Functions.GetJobStatus' (Succeeded, Id=3929e30b-c7b0-4e41-9773-a04ae14d58e8, Duration=5ms)
[2022-07-17T07:15:56.306Z] Executing 'Functions.Monitor' (Reason='(null)', Id=68366295-fc0f-4dc2-93da-f99b152afe79)
[2022-07-17T07:15:56.313Z] Function "Monitor" (Id: 68366295-fc0f-4dc2-93da-f99b152afe79) invoked by Java Worker
[2022-07-17T07:15:56.315Z] Executed 'Functions.Monitor' (Succeeded, Id=68366295-fc0f-4dc2-93da-f99b152afe79, Duration=10ms)
[2022-07-17T07:16:09.193Z] Executing 'Functions.Monitor' (Reason='(null)', Id=f58dfe99-e518-4a6e-a18e-662d154f1b1e)
[2022-07-17T07:16:09.199Z] Function "Monitor" (Id: f58dfe99-e518-4a6e-a18e-662d154f1b1e) invoked by Java Worker
[2022-07-17T07:16:09.200Z] Executed 'Functions.Monitor' (Succeeded, Id=f58dfe99-e518-4a6e-a18e-662d154f1b1e, Duration=10ms)
[2022-07-17T07:16:09.232Z] Executing 'Functions.GetJobStatus' (Reason='(null)', Id=6f7d4c16-cc1f-4eb5-9776-ebe2dddee459)
[2022-07-17T07:16:09.235Z] Function "GetJobStatus" (Id: 6f7d4c16-cc1f-4eb5-9776-ebe2dddee459) invoked by Java Worker
[2022-07-17T07:16:09.243Z] Executed 'Functions.GetJobStatus' (Succeeded, Id=6f7d4c16-cc1f-4eb5-9776-ebe2dddee459, Duration=11ms)
[2022-07-17T07:16:09.297Z] Executing 'Functions.Monitor' (Reason='(null)', Id=e0764e5a-a148-4ac7-a88e-57ae59960313)
[2022-07-17T07:16:09.301Z] Function "Monitor" (Id: e0764e5a-a148-4ac7-a88e-57ae59960313) invoked by Java Worker
[2022-07-17T07:16:09.301Z] Executed 'Functions.Monitor' (Succeeded, Id=e0764e5a-a148-4ac7-a88e-57ae59960313, Duration=4ms)
[2022-07-17T07:16:23.050Z] Executing 'Functions.Monitor' (Reason='(null)', Id=72a4908e-dea1-4d22-a9f3-865766ef783e)
[2022-07-17T07:16:23.055Z] Function "Monitor" (Id: 72a4908e-dea1-4d22-a9f3-865766ef783e) invoked by Java Worker
[2022-07-17T07:16:23.056Z] Executed 'Functions.Monitor' (Succeeded, Id=72a4908e-dea1-4d22-a9f3-865766ef783e, Duration=6ms)
[2022-07-17T07:16:23.087Z] Executing 'Functions.GetJobStatus' (Reason='(null)', Id=69d60056-4d18-4973-be62-5d1a10bba319)
[2022-07-17T07:16:23.090Z] Function "GetJobStatus" (Id: 69d60056-4d18-4973-be62-5d1a10bba319) invoked by Java Worker
[2022-07-17T07:16:23.090Z] Executed 'Functions.GetJobStatus' (Succeeded, Id=69d60056-4d18-4973-be62-5d1a10bba319, Duration=3ms)
[2022-07-17T07:16:23.141Z] Executing 'Functions.Monitor' (Reason='(null)', Id=c42b8e84-29b7-454d-a200-88b5618320dc)
[2022-07-17T07:16:23.145Z] Function "Monitor" (Id: c42b8e84-29b7-454d-a200-88b5618320dc) invoked by Java Worker
[2022-07-17T07:16:23.145Z] Executed 'Functions.Monitor' (Succeeded, Id=c42b8e84-29b7-454d-a200-88b5618320dc, Duration=4ms)
(省略)
[2022-07-17T07:17:31.330Z] Executing 'Functions.Monitor' (Reason='(null)', Id=519afa8e-ec99-4e3e-bc58-ed6bf364c119)
[2022-07-17T07:17:31.334Z] Function "Monitor" (Id: 519afa8e-ec99-4e3e-bc58-ed6bf364c119) invoked by Java Worker
[2022-07-17T07:17:31.335Z] Executed 'Functions.Monitor' (Succeeded, Id=519afa8e-ec99-4e3e-bc58-ed6bf364c119, Duration=4ms)
[2022-07-17T07:17:31.363Z] Executing 'Functions.GetJobStatus' (Reason='(null)', Id=a72e082b-80cd-47a8-86b7-2036652f0509)
[2022-07-17T07:17:31.367Z] Function "GetJobStatus" (Id: a72e082b-80cd-47a8-86b7-2036652f0509) invoked by Java Worker
[2022-07-17T07:17:31.367Z] Executed 'Functions.GetJobStatus' (Succeeded, Id=a72e082b-80cd-47a8-86b7-2036652f0509, Duration=4ms)
[2022-07-17T07:17:31.440Z] Executing 'Functions.Monitor' (Reason='(null)', Id=9aeedf45-f670-43c8-8bbf-cfeb64be5206)
[2022-07-17T07:17:31.446Z] Function "Monitor" (Id: 9aeedf45-f670-43c8-8bbf-cfeb64be5206) invoked by Java Worker
[2022-07-17T07:17:31.446Z] Executed 'Functions.Monitor' (Succeeded, Id=9aeedf45-f670-43c8-8bbf-cfeb64be5206, Duration=7ms)
[2022-07-17T07:17:45.911Z] Executing 'Functions.Monitor' (Reason='(null)', Id=175ef6a2-1126-4614-bc88-fa5804403c22)
[2022-07-17T07:17:45.917Z] Function "Monitor" (Id: 175ef6a2-1126-4614-bc88-fa5804403c22) invoked by Java Worker
[2022-07-17T07:17:45.917Z] Executed 'Functions.Monitor' (Succeeded, Id=175ef6a2-1126-4614-bc88-fa5804403c22, Duration=6ms)
[2022-07-17T07:17:45.975Z] Executing 'Functions.GetJobStatus' (Reason='(null)', Id=b5acf6e7-d066-421b-aad5-50cb4de5ac1b)
[2022-07-17T07:17:45.978Z] Function "GetJobStatus" (Id: b5acf6e7-d066-421b-aad5-50cb4de5ac1b) invoked by Java Worker
[2022-07-17T07:17:45.979Z] Executed 'Functions.GetJobStatus' (Succeeded, Id=b5acf6e7-d066-421b-aad5-50cb4de5ac1b, Duration=4ms)
[2022-07-17T07:17:46.033Z] Executing 'Functions.Monitor' (Reason='(null)', Id=c8a824a3-9a27-4fe1-ab32-b8736debebd9)
[2022-07-17T07:17:46.037Z] Function "Monitor" (Id: c8a824a3-9a27-4fe1-ab32-b8736debebd9) invoked by Java Worker
[2022-07-17T07:17:46.038Z] Executed 'Functions.Monitor' (Succeeded, Id=c8a824a3-9a27-4fe1-ab32-b8736debebd9, Duration=5ms)
[2022-07-17T07:17:59.140Z] Executing 'Functions.Monitor' (Reason='(null)', Id=7644ddd7-eb7a-4720-b7a7-3f7fbf9996ca)
[2022-07-17T07:17:59.144Z] Function "Monitor" (Id: 7644ddd7-eb7a-4720-b7a7-3f7fbf9996ca) invoked by Java Worker
[2022-07-17T07:17:59.144Z] Executed 'Functions.Monitor' (Succeeded, Id=7644ddd7-eb7a-4720-b7a7-3f7fbf9996ca, Duration=4ms)
[2022-07-17T07:17:59.172Z] Executing 'Functions.GetJobStatus' (Reason='(null)', Id=68dec95f-32ff-4d43-9e04-d1ef01a55f06)
[2022-07-17T07:17:59.174Z] Function "GetJobStatus" (Id: 68dec95f-32ff-4d43-9e04-d1ef01a55f06) invoked by Java Worker
[2022-07-17T07:17:59.175Z] Executed 'Functions.GetJobStatus' (Succeeded, Id=68dec95f-32ff-4d43-9e04-d1ef01a55f06, Duration=2ms)
[2022-07-17T07:17:59.229Z] Executing 'Functions.Monitor' (Reason='(null)', Id=9006fb91-2c3d-4301-ab82-86f5198a92bf)
[2022-07-17T07:17:59.236Z] Function "Monitor" (Id: 9006fb91-2c3d-4301-ab82-86f5198a92bf) invoked by Java Worker
[2022-07-17T07:17:59.237Z] Executed 'Functions.Monitor' (Succeeded, Id=9006fb91-2c3d-4301-ab82-86f5198a92bf, Duration=8ms)

JobInfogetPollingDelayメソッドで指定した10秒+数秒の間隔でアクティビティ関数 GetJobStatus が呼び出されていることが確認できます。

ブラウザでアクセスしたときに表示されるstatusQueryGetUriにアクセスすると、アクセスのタイミングでruntimeStatusRunningになったりCompletedになっていることが確認できます。

実行初期

実行完了後