Azure Functionsのお勉強メモ(11)Queue Trigger利用の深掘り(一部未解決)

最近、Azure Functionsのお勉強をチマチマと始めました。色々と分からないことが多かったのでお勉強メモをまとめて記します。

どこまで続くかわからないお勉強メモ。今日は11回目です。今回はQueue Triggerの利用について深掘りしてみます。過去のものは以下を参照。

Queueにメッセージを送る

テスト用にQueueにメッセージを送るプログラムを以下の記事で作成しました。

miyohide.hatenablog.com

記事ではAzuriteで作成したエミュレーター用の記述ですが、以下のように少し書き換えてやればAzure上でのQueueに対してデータを送ることができます。

# 省略
client = Azure::Storage::Queue::QueueService.create(
  storage_account_name: ストレージアカウント名,
  storage_access_key: アクセウスキー,
  storage_queue_host: "https://ストレージアカウント名.queue.core.windows.net/"
)
# 省略

上記記事の初出時は、メッセージ送付を

client.create_message(キュー名, "test message #{index}")

としていましたが、これではAzure FunctionsのQueue Triggerにて以下のエラーが発生しました。

Unable to translate bytes [B5] at index 0 from specified code page to Unicode.

f:id:miyohide:20210530164826p:plain

これを解決するには、メッセージをBase64エンコードしてあげれば良いです。Rubyのコードで書けば以下の通り。

client.create_message(キュー名, Base64.encode64("test message #{index}"))

多くのQueueを送ったときにNullPointer Exceptionが発生する(未解決)

上記のプログラムでQueueに多くの(といっても100件ぐらい)メッセージを送ると以下のメッセージがでて処理が失敗しました。

Result: Failure Exception: NullPointerException: Stack: java.lang.reflect.InvocationTargetException at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at com.microsoft.azure.functions.worker.broker.JavaMethodInvokeInfo.invoke(JavaMethodInvokeInfo.java:22) at com.microsoft.azure.functions.worker.broker.EnhancedJavaMethodExecutorImpl.execute(EnhancedJavaMethodExecutorImpl.java:55) at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invokeMethod(JavaFunctionBroker.java:57) at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:33) at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:10) at com.microsoft.azure.functions.worker.handler.MessageHandler.handle(MessageHandler.java:45) at com.microsoft.azure.functions.worker.JavaWorkerClient$StreamingMessagePeer.lambda$onNext$0(JavaWorkerClient.java:92) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) Caused by: java.lang.NullPointerException at org.springframework.cloud.function.adapter.azure.FunctionInvoker.handleRequest(FunctionInvoker.java:117) at com.github.miyohide.QueueHandler.queueHandler(QueueHandler.java:17) ... 16 more

f:id:miyohide:20210530170452p:plain

正直、この原因はわからなかったので、対処療法を行います。

対処療法としては以下二つを試しました。

  1. batchSizeを1にする
  2. batchSizeを2以上にしつつ、visibilityTimeoutを設定する

以下ドキュメントに書かれている通りのことをやっただけです。

docs.microsoft.com

batchSizeを1にすると1つのインスタンスで並行に動くことは無くなります。どうしても並行に動かしたいと言うことであれば、NullPointerExceptionが発生することは受け入れて、リトライで問題なければOKというスタンスでも良いかもしれません。動かすものによるかと思いますが、今回のサンプルではhost.jsonを以下のようにしました。

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  },
  "extensions": {
    "queues": {
      "visibilityTimeout": "00:00:05",
      "batchSize": 5
    }
  }
}

すると、1回目では失敗したメッセージが2回目では正常に処理できていることを確認できました。

1回目の処理。NullPointerExceptionが発生しています。 f:id:miyohide:20210530170705p:plain

2回目の処理。特に例外は発生していません。 f:id:miyohide:20210530170718p:plain