ビジネス課題への解決策(アイディア)と、新たな発想(+α)が見つかるIT情報メディア

Menu
  1. TOP
  2. データ活用
  3. Amazon SQS:Simple Queue ServiceでFIFOを実装してみる ~後編~

Amazon SQS:Simple Queue ServiceでFIFOを実装してみる ~後編~

  • LINEで送る
  • このエントリーをはてなブックマークに追加

今回は前回に引き続きAmazon SQS:Simple Queue Serviceについて書かせて頂きたいと思います。

前回記事:Amazon SQS:Simple Queue ServiceでFIFOを実装してみる ~前編~

メッセージの受け取り処理の実装

前回記事ではメッセージの送信と受信を行い、受信時にランダムな順番でメッセージを受け取る処理をしているところで終わりました。

今回の記事ではランダムな順番ではなく、送信した順番でメッセージを受け取る処理の実装を紹介したいと思います。

実装時のポイントとしては下記の点があります。

  1. SQSへメッセージを送信する。送信時に処理順をプロパティとして送信する。
  2. メッセージを取得する。
  3. リストに取得したメッセージの処理順のプロパティとメッセージの内容を追加する。
  4. 取得したメッセージの処理順プロパティを確認し、最新の処理順のメッセージの場合は処理フラグをtrueにする。
  5. 処理フラグがtrueの場合、comparatorを使って3のリストの順番を処理順にソートする。
  6. リストのメッセージをループ処理し、メッセージの処理順が最新の処理順カウンターと一致している場合はメッセージの処理を実行する。一致しない場合は2へ戻る。

1の送信部分は前回記事で書きましたので、2以降のポイントをそれぞれ実装したサンプルコードは下記のようになります。ソース内のコメントに各ポイントがどの処理に該当するかが書いてあります。

private static void reciveMessagesInFifo(){ ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); //メッセージを10件一括で取得する設定 receiveMessageRequest.setMaxNumberOfMessages(SQS_MAX_NUM_OF_MESSAGES); //取得時にFifoCounterの属性を取得 Collection messageAttributeNames = new ArrayList(); messageAttributeNames.add("FifoCounter"); receiveMessageRequest.setMessageAttributeNames(messageAttributeNames); //2.メッセージを取得する List messages = sqs.receiveMessage(receiveMessageRequest).getMessages(); //処理フラグ boolean processFlag = falsefor(Message m :messages){ String fifoCounterString = m.getMessageAttributes() .get("FifoCounter").getStringValue(); int fifoCounter = Integer.parseInt(fifoCounterString); MessageWithProcNum messageWithFifo = new MessageWithProcNum(m, fifoCounter); //3.リストに取得したメッセージの処理順のプロパティとメッセージの内容を追加する procMessageList.add(messageWithFifo); //4.取得したメッセージの処理順プロパティを確認し、最新の処理順のメッセージの場合は処理フラグをtrueにする。 if(fifoCounter == processingCounter){ processFlag = true; } } if(processFlag){ //5.処理フラグがtrueの場合、comparatorを使って3のリストの順番を処理順にソートする。 Collections.sort(procMessageList,new procNumComperator()); int nextProcessNumber = (procMessageList.get(processingCounter) != null) ? procMessageList.get(processingCounter).getProcNumber() : null//6.リストのメッセージをループ処理し、メッセージの処理順が最新の処理順カウンターと一致している場合は //メッセージの処理を実行する。 while(processingCounter == nextProcessNumber){ MessageWithProcNum message = procMessageList.get(processingCounter); //メッセージの処理実行 ProcessMessage(message); processingCounter++; if(procMessageList.size() == processingCounter){ break; } nextProcessNumber = (procMessageList.get(processingCounter) != null) ? procMessageList.get(processingCounter).getProcNumber() : null; } } reciveMessagesInFifo(); }

3のリストに追加するクラスとしてMessageと処理順のプロパティを持つMessageWithFifoというクラスを下記のように定義しています。

private static class MessageWithProcNum{ private int procNumber; private Message message; public MessageWithProcNum(Message m, int fifoNumber){ this.message = m; this.setProcNumber(fifoNumber); } public Message getMessage() { return message; } public void setMessage(Message message) { this.message = message; } public int getProcNumber() { return procNumber; } public void setProcNumber(int procNumber) { this.procNumber = procNumber; } }

また、5.の処理順にソースする処理で使用するComparatorは下記のように定義しています。

private static class procNumComperator implements Comparator{ public int compare(MessageWithProcNum m1, MessageWithProcNum m2) { return m1.getProcNumber() < m2.getProcNumber() ? -1 : 1; } }

以上の実装で、ProcessMessageメソッドがメッセージ送信時のfifoCounter順に呼び出されて、引数のmessageが順番に処理されます。

ただし、こちらのサンプルは下記の問題があるのでご注意下さい。

  • メッセージ送信を一度止めて、再開するなどして処理順番がリセットされた場合、受信側もリセットしないと処理順番がずれてしまう
  • 再帰的にメッセージを取得しているので、APIの使用数が増えてしまう。

2つ目の問題は、キュー作成時にロングポーリングの設定ができるので、最大20秒間ポーリングしてAPIの実行回数をある程度減らすことは可能です。

まだまだ実用性には問題ありな実装ですが、SQSでFIFOを実装する際の参考になれば幸いです。


追記:関連コンテンツ

Amazon SQS:Simple Queue ServiceでFIFOを実装してみる ~前編~

データの抽出や加工、連携にお悩みではありませんか?

20年以上の実績に裏打ちされた信頼のデータ連携ツール「Waha! Transformer」で、自社に眠るデータを有効活用。まずは無料のハンズオンセミナーや体験版で効果を実感していただけます。

> 純国産ETLツール「Waha! Transformer」

Waha! Transformer
メールマガジンの登録はこちらから
メルマガ登録 お問い合わせ