2022年6月22日にDurable Functions for JavaがPublic previewとして公開されました。
先週はチュートリアルをそのまま実行しただけですが、今日はファンアウト/ファンインを試してみました。
先週の記事は以下を参照してください。
ファンアウト/ファンインとは
ファンアウト/ファンインはDurable Functionsで説明されているパターンの一つです。
Javaのコードも書かれているので、試してみました。
アクティビティ関数 F1
まずアクティビティ関数 F1を実装します。単純にIntegerの配列を返すメソッドを作成しました。
@FunctionName("F1") public List<Integer> f1(@DurableActivityTrigger(name = "f1name") String name) { return new ArrayList<>(Arrays.asList(10, 20, 30, 40, 50)); }
@DurableActivityTrigger
は必須のようで、記述がないとコンパイルは成功しますが動作時に以下のメッセージが出力されます。
The 'F1' function is in error: At least one binding must be declared.
アクティビティ関数 F2
アクティビティ関数 F2を実装します。引数で取得した値の3倍を返すメソッドを作成しました。
@FunctionName("F2") public Integer f2(@DurableActivityTrigger(name = "f2name") int i, final ExecutionContext context) { context.getLogger().info("F2 with i = [" + i + "]"); return i * 3; }
引数の値をログとして出力するために、final ExecutionContext context
を引数に追加し、context.getLogger().info
を使ってログを出力しました。
オーケストレーター関数
オーケストレーター関数を実装します。ほとんどDurable Functionsの記事内で書かれているJavaのサンプルと同じです。
@FunctionName("FanOutFanIn") public String fanOutFanInOrchestrator( @DurableOrchestrationTrigger(name = "runtimeState") String runtimeState ) { return OrchestrationRunner.loadAndRun(runtimeState, ctx -> { List<?> batch = ctx.callActivity("F1", List.class).await(); List<Task<Integer>> parallelTasks = batch.stream() .map(i -> ctx.callActivity("F2", i, Integer.class)) .collect(Collectors.toList()); List<Integer> results = ctx.allOf(parallelTasks).await(); return results.stream().reduce(0, Integer::sum); }); }
変えたのは、List<Task<Integer>> parallelTasks = batch.stream()
と.map(i -> ctx.callActivity("F2", i, Integer.class))
の部分で、アクティビティ関数 F1で得られたListの各要素をアクティビティ関数F2に渡すようにしています。
Java 8で導入されたラムダ式とかStreamなどを随所で使っているのであまり慣れていないのですが、StreamのJavaDocとにらめっこして理解するようにしました。
クライアント関数
クライアント関数は先週書いたものとほぼ同じです。オーケストレーター関数の名前を変えただけです。
@FunctionName("StartHelloCities") public HttpResponseMessage startHelloCities( @HttpTrigger(name = "req", methods = {HttpMethod.GET})HttpRequestMessage<Optional<String>> req, @DurableClientInput(name = "durableContext")DurableClientContext durableContext, final ExecutionContext context ) { DurableTaskClient client = durableContext.getClient(); String instanceId = client.scheduleNewOrchestrationInstance("FanOutFanIn"); context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId); return durableContext.createCheckStatusResponse(req, instanceId); }
実行
実行は./gradlew azureFunctionsRun
で起動します。その後、http://localhost:7071/api/クライアント関数名
にブラウザでアクセスすると、以下のように処理が走ります。
[2022-07-10T08:01:44.174Z] Executing 'Functions.StartHelloCities' (Reason='This function was programmatically called via the host APIs.', Id=1cc0b8b5-2ce3-4952-b594-ec4845c94745) [2022-07-10T08:01:44.748Z] Created new Java orchestration with instance ID = 9f733526-7db5-42fd-8bef-94eeff0804ac [2022-07-10T08:01:44.752Z] Function "StartHelloCities" (Id: 1cc0b8b5-2ce3-4952-b594-ec4845c94745) invoked by Java Worker [2022-07-10T08:01:44.807Z] Executed 'Functions.StartHelloCities' (Succeeded, Id=1cc0b8b5-2ce3-4952-b594-ec4845c94745, Duration=653ms) [2022-07-10T08:01:44.849Z] Executing 'Functions.FanOutFanIn' (Reason='(null)', Id=50f96a8d-16c2-4d76-be47-6964fa1b0782) [2022-07-10T08:01:44.964Z] Function "FanOutFanIn" (Id: 50f96a8d-16c2-4d76-be47-6964fa1b0782) invoked by Java Worker [2022-07-10T08:01:44.973Z] Executed 'Functions.FanOutFanIn' (Succeeded, Id=50f96a8d-16c2-4d76-be47-6964fa1b0782, Duration=141ms) [2022-07-10T08:01:45.020Z] Executing 'Functions.F1' (Reason='(null)', Id=26b7a005-0b9c-495a-a879-0b619888db67) [2022-07-10T08:01:45.024Z] Function "F1" (Id: 26b7a005-0b9c-495a-a879-0b619888db67) invoked by Java Worker [2022-07-10T08:01:45.025Z] Executed 'Functions.F1' (Succeeded, Id=26b7a005-0b9c-495a-a879-0b619888db67, Duration=5ms) [2022-07-10T08:01:45.120Z] Executing 'Functions.FanOutFanIn' (Reason='(null)', Id=7be266ae-4d91-4a3d-a616-7e672f57fb09) [2022-07-10T08:01:45.157Z] Function "FanOutFanIn" (Id: 7be266ae-4d91-4a3d-a616-7e672f57fb09) invoked by Java Worker [2022-07-10T08:01:45.158Z] Executed 'Functions.FanOutFanIn' (Succeeded, Id=7be266ae-4d91-4a3d-a616-7e672f57fb09, Duration=39ms) [2022-07-10T08:01:45.188Z] Executing 'Functions.F2' (Reason='(null)', Id=0db408a9-82cd-4186-ab55-f1a709c1be74) [2022-07-10T08:01:45.195Z] F2 with i = [10] [2022-07-10T08:01:45.196Z] Function "F2" (Id: 0db408a9-82cd-4186-ab55-f1a709c1be74) invoked by Java Worker [2022-07-10T08:01:45.198Z] Executed 'Functions.F2' (Succeeded, Id=0db408a9-82cd-4186-ab55-f1a709c1be74, Duration=12ms) [2022-07-10T08:01:45.198Z] Executing 'Functions.F2' (Reason='(null)', Id=852b89be-34f7-4117-ba3a-c4afd05c63ff) [2022-07-10T08:01:45.200Z] F2 with i = [30] [2022-07-10T08:01:45.200Z] Function "F2" (Id: 852b89be-34f7-4117-ba3a-c4afd05c63ff) invoked by Java Worker [2022-07-10T08:01:45.201Z] Executed 'Functions.F2' (Succeeded, Id=852b89be-34f7-4117-ba3a-c4afd05c63ff, Duration=3ms) [2022-07-10T08:01:45.209Z] Executing 'Functions.F2' (Reason='(null)', Id=c192396d-bef0-4519-8557-be2f1e6e3931) [2022-07-10T08:01:45.212Z] F2 with i = [50] [2022-07-10T08:01:45.212Z] Function "F2" (Id: c192396d-bef0-4519-8557-be2f1e6e3931) invoked by Java Worker [2022-07-10T08:01:45.212Z] Executed 'Functions.F2' (Succeeded, Id=c192396d-bef0-4519-8557-be2f1e6e3931, Duration=3ms) [2022-07-10T08:01:45.221Z] Executing 'Functions.F2' (Reason='(null)', Id=d1eae73c-da8b-4c09-b310-660f6fed49e0) [2022-07-10T08:01:45.223Z] F2 with i = [20] [2022-07-10T08:01:45.224Z] Function "F2" (Id: d1eae73c-da8b-4c09-b310-660f6fed49e0) invoked by Java Worker [2022-07-10T08:01:45.224Z] Executed 'Functions.F2' (Succeeded, Id=d1eae73c-da8b-4c09-b310-660f6fed49e0, Duration=2ms) [2022-07-10T08:01:45.259Z] Executing 'Functions.FanOutFanIn' (Reason='(null)', Id=b076da01-e48b-4f42-b3e2-c1154815e20c) [2022-07-10T08:01:45.262Z] Function "FanOutFanIn" (Id: b076da01-e48b-4f42-b3e2-c1154815e20c) invoked by Java Worker [2022-07-10T08:01:45.263Z] Executed 'Functions.FanOutFanIn' (Succeeded, Id=b076da01-e48b-4f42-b3e2-c1154815e20c, Duration=3ms) [2022-07-10T08:01:45.281Z] Executing 'Functions.F2' (Reason='(null)', Id=82a0761d-b90e-425e-a7a5-78e83fa85006) [2022-07-10T08:01:45.283Z] F2 with i = [40] [2022-07-10T08:01:45.284Z] Function "F2" (Id: 82a0761d-b90e-425e-a7a5-78e83fa85006) invoked by Java Worker [2022-07-10T08:01:45.284Z] Executed 'Functions.F2' (Succeeded, Id=82a0761d-b90e-425e-a7a5-78e83fa85006, Duration=3ms) [2022-07-10T08:01:45.327Z] Executing 'Functions.FanOutFanIn' (Reason='(null)', Id=61407928-202f-4053-a0a7-d99bc2850bf4) [2022-07-10T08:01:45.340Z] Function "FanOutFanIn" (Id: 61407928-202f-4053-a0a7-d99bc2850bf4) invoked by Java Worker [2022-07-10T08:01:45.342Z] Executed 'Functions.FanOutFanIn' (Succeeded, Id=61407928-202f-4053-a0a7-d99bc2850bf4, Duration=15ms)
アクティビティ関数F1で生成されたListの順番ではなく、バラバラに値が渡されていることが確認できます。
結果はhttp://localhost:7071/api/クライアント関数名
でブラウザでアクセスしたときに出てくるstatusQueryGetUri
のURLにアクセスすると以下のようなJSONが返ってきて確認できます。
{"name":"FanOutFanIn","instanceId":"9f733526-7db5-42fd-8bef-94eeff0804ac","runtimeStatus":"Completed","input":null,"customStatus":"","output":450,"createdTime":"2022-07-10T08:01:44Z","lastUpdatedTime":"2022-07-10T08:01:45Z"}
outputの項目はアクティビティ関数F1の各値を3倍したものを足し合わせたものである450が返ってきていることが確認できます。
これでファンアウト/ファンインパターンのDurable Functionsの実装ができました。
参考
たまたま見つけたのですが、Durable Functions for Javaで使われているライブラリは以下のもののようです。
ソース
現状のソースです。