Techfirm Cloud Architect Blog

テックファーム株式会社クラウドインフラグループのブログ

EventBridgeスケジューラの実行失敗をDLQとEventBridge PipesでSNSトピックスに通知する

EventBridgeスケジューラには、EventBridgeルールによるスケジュール実行に比べ、より柔軟なターゲット実行がサポートされています。
以前は単純なAWSサービスのAPI呼び出しであっても、LambdaやStep Functions、SSMオートメーションなど別サービスを経由する必要がありました。
EventBridgeスケジューラでは、EventBridgeスケジューラSDKやAWS CLIを使ってカスタマイズしたターゲットを設定できるため、ノーコードでシンプルな構成を構築できます。

定期実行タスクをEventBridgeスケジューラへ置き換える中で、監視の観点から次の課題が見えてきました。 CloudWatchメトリクスにはスケジュールごとのディメンションがなく、すべてスケジュールグループにまとめられています。そのため、ターゲットアクションが失敗しても、どのスケジュールで失敗したのかをメトリクスだけでは特定できません。

そこでEventBridgeスケジューラがターゲットアクションの実行失敗を検知する仕組みを、AWSのマネージドサービスのみを組み合わせ、Lambdaなどのコードを書かずに構築しました。その方法を紹介します。

全体構成

構成図

EventBridgeスケジューラはターゲットアクションの呼び出しに失敗した場合、EventBridgeルールと同様にDLQへエラーメッセージを保存する機能があります。 DLQはSQSスタンダードキューであるため、キューに入ったメッセージをSNSへ連携できれば、ターゲットアクションの実行失敗を通知できます。

SQSからメッセージを取り出す際は通常Lambda連携を利用しますが、今回はLambdaのコード実装・管理を不要にするため、EventBridge Pipesを使ってSQSとSNSの連携を行います。

EventBridge Pipesはイベントプロデューサーとコンシューマーを連携させるサービスです。イベントのポーリング、フィルタリング、情報補完(エンリッチ)、トランスフォーム、ルーティングなど、従来は個別に実装していた処理をまとめて提供します。これにより、開発と運用の両面でイベント駆動アプリケーションの構築を省力化できます。

Terraformでリソースを作成する

Terraformを使って構築した内容を紹介します。

「ノーコード」と言いながらTerraformのコードを書いている、というツッコミはご容赦ください。本記事での「ノーコード」とは、PythonやNode.jsなどのアプリケーション開発言語を書かずに済むという意味で用いています。詳細な設定内容を共有する観点から、Terraformのコードを示して解説します。

EventBridgeスケジューラのターゲットアクションとして、EC2インスタンスを停止する構成にします。

スケジュールリソースの作成

まずは次のリソースを作成します

リソース 設定内容
EC2停止スケジュール 毎日決まった時間にEC2の停止APIを実行します。DLQを設定しています。
ターゲット実行ロール EventBridgeスケジューラがターゲットアクションを実行するためのIAMロールです。
DLQ ターゲットアクションの失敗メッセージを記録するSQSスタンダードキューです。
########################################
# Data and Settings
########################################
data "aws_caller_identity" "current" {}

########################################
# EventBridge Scheduler
########################################
resource "aws_scheduler_schedule" "ec2_stop" {
  name        = "ec2-stop"
  state       = "ENABLED"

  flexible_time_window {
    mode = "OFF"
  }

  schedule_expression           = "cron(00 20 * * ? *)"
  schedule_expression_timezone  = "Asia/Tokyo"

  target {
    arn      = "arn:aws:scheduler:::aws-sdk:ec2:stopInstances"
    role_arn = aws_iam_role.scheduler_role.arn

    input = jsonencode({
      InstanceIds = [
        "i-0000000aaaaaaaaaa"
      ]
    })

    retry_policy {
      maximum_event_age_in_seconds = 86400
      maximum_retry_attempts       = 0
    }

    dead_letter_config {
        arn = aws_sqs_queue.scheduler_dlq.arn
    }
  }
}

########################################
# EventBridge Scheduler Target IAM Role
########################################
resource "aws_iam_role" "scheduler_role" {
  name               = "scheduler_role"
  assume_role_policy = data.aws_iam_policy_document.scheduler_assumerole.json
}

data "aws_iam_policy_document" "scheduler_assumerole" {
  statement {
    actions = [
      "sts:AssumeRole"
    ]
    principals {
      type = "Service"
      identifiers = [
        "scheduler.amazonaws.com"
      ]
    }
    condition {
      test     = "StringEquals"
      variable = "aws:SourceAccount"
      values = [
        data.aws_caller_identity.current.account_id
      ]
    }
  }
}

resource "aws_iam_role_policy_attachment" "scheduler_role" {
  role       = aws_iam_role.scheduler_role.name
  policy_arn = aws_iam_policy.scheduler_stop_ec2.arn
}

resource "aws_iam_policy" "scheduler_stop_ec2" {
  name        = "scheduler-stop-ec2"
  path        = "/"
  policy      = data.aws_iam_policy_document.scheduler_stop_ec2.json
}

data "aws_iam_policy_document" "scheduler_stop_ec2" {
  statement {
    sid    = "StateMachineStartExecution"
    effect = "Allow"
    actions = [
      "ec2:StopInstances"
    ]
    resources = [
      "*"
    ]
  }
  statement {
    sid    = "DLQSendMessage"
    effect = "Allow"
    actions = [
      "sqs:SendMessage"
    ]
    resources = [
      aws_sqs_queue.scheduler_dlq.arn
    ]
  }
}

########################################
# EventBridge Scheduler Target DLQ
########################################
resource "aws_sqs_queue" "scheduler_dlq" {
  name                    = "scheduler-dlq"
}

resource "aws_sqs_queue_policy" "scheduler_dlq" {
  queue_url = aws_sqs_queue.scheduler_dlq.id
  policy    = data.aws_iam_policy_document.scheduler_dlq_sqspolicy.json
}

data "aws_iam_policy_document" "scheduler_dlq_sqspolicy" {
  version   = "2012-10-17"
  policy_id = "__default_policy_ID"
  statement {
    sid    = "__owner_statement"
    effect = "Allow"

    principals {
      type = "AWS"
      identifiers = [
        "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
      ]
    }

    actions   = ["sqs:*"]
    resources = [aws_sqs_queue.scheduler_dlq.arn]
  }
}

IAMロールには、EventBridge Schedulerがロールを引き受けるための信頼ポリシーを設定します。
ターゲットで実行するec2:stopInstancesの権限に加え、DLQにメッセージ送信するためのsqs:SendMessageも必要です。

DLQにエラーメッセージが送信されることを確認する

ここまでリソース作成ができたら、実際にスケジュールを実行させて、DLQにエラーメッセージが送信されるか確認してみます。
AWSマネジメントコンソールでスケジュールのパターンを1回限りの実行に変更し、直近の日時を設定します。
実行時間を過ぎてから、DLQに設定したSQSキューでメッセージ受信すると、エラーメッセージを取得できます。

スケジュール失敗メッセージ

存在しないインスタンスIDを指定しているので、エラーコードがInvalidInstanceID.NotFoundとなります。

通知用パイプの作成

次にDLQとSNSを連携するEventBridge Pipesを作成します。

リソース 設定内容
EventBridge Pipes DLQからメッセージを受け取り、SNSに送信するパイプです。
EventBridge Pipesロール EventBridge Pipesが処理をおこなうのに使うIAMロールです。
イベント記録CloudWatchロググループ EventBridge Pipesのログ記録先

利用するSNSトピックスの作成は紹介するテンプレートに含みませんので、別途作成してご利用ください。
今回は最終的にメールでメッセージを受信する前提で進めます。

########################################
# EventBridge Pipes
########################################
resource "aws_pipes_pipe" "scheduler_dlq_pipes" {
  depends_on = [
    aws_sqs_queue.scheduler_dlq,
    aws_cloudwatch_log_group.scheduler_dlq_pipes
  ]
  name          = "scheduler-dlq"
  desired_state = "RUNNING"
  role_arn      = aws_iam_role.scheduler_dlq_pipes.arn
  source        = aws_sqs_queue.scheduler_dlq.arn
  target        = ${SNSトピックスARN}

  source_parameters {
    sqs_queue_parameters {
      batch_size = 1
    }
  }

  target_parameters {
    input_template = <<EOF
The scheduler failed to start the target action.

SCHEDULED_TIME: <$.messageAttributes.SCHEDULED_TIME.stringValue>
SCHEDULE_ARN:   <$.messageAttributes.SCHEDULE_ARN.stringValue>
TARGET_ARN:     <$.messageAttributes.TARGET_ARN.stringValue>
ERROR_CODE:     <$.messageAttributes.ERROR_CODE.stringValue>
ERROR_MESSAGE:  <$.messageAttributes.ERROR_MESSAGE.stringValue>
EOF
  }

  log_configuration {
    include_execution_data = ["ALL"]
    level                  = "INFO"
    cloudwatch_logs_log_destination {
      log_group_arn = aws_cloudwatch_log_group.scheduler_dlq_pipes.arn
    }
  }
}

ソース・ターゲットにはさまざまな対応サービスが用意されており、各サービスごとに専用のパラメータが用意されているものがあります。
sqs_queue_parametersではSQSから取得するメッセージのバッチサイズやバッチウィンドウ時間を設定できます。1イベント1通知にしたいので、batch_size = 1としています。
SNSへの専用ターゲットパラメータはありません。
ターゲットの汎用パラメータである入力トランスフォーマーを利用し、メッセージを変換します。

DLQに送信されるメッセージのフォーマットは公式ドキュメントに記載されています。必要な情報だけ整形して通知させています。

EventBridge Pipesに流れるイベントを記録しておくためにCloudWatchロググループを設定できます。ログレベルや、どのPipes実行ステップのイベントを記録するかを指定できます。

########################################
# EventBridge Pipes IAM Role
########################################
resource "aws_iam_role" "scheduler_dlq_pipes" {
  name               = "scheduler-dlq-pipes"
  assume_role_policy = data.aws_iam_policy_document.scheduler_dlq_pipes_assumerole.json
}

data "aws_iam_policy_document" "scheduler_dlq_pipes_assumerole" {
  statement {
    actions = [
      "sts:AssumeRole"
    ]
    principals {
      type = "Service"
      identifiers = [
        "pipes.amazonaws.com"
      ]
    }
    condition {
      test     = "StringEquals"
      variable = "aws:SourceAccount"
      values = [
        data.aws_caller_identity.current.account_id
      ]
    }
  }
}

resource "aws_iam_role_policy_attachment" "scheduler_dlq_pipes" {
  role       = aws_iam_role.scheduler_dlq_pipes.name
  policy_arn = aws_iam_policy.scheduler_dlq_pipes.arn
}

resource "aws_iam_policy" "scheduler_dlq_pipes" {
  name        = "scheduler-dlq-pipes"
  path        = "/"
  policy      = data.aws_iam_policy_document.scheduler_dlq_pipes.json
}

data "aws_iam_policy_document" "scheduler_dlq_pipes" {
  statement {
    sid    = "SqsPipeSource"
    effect = "Allow"
    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]
    resources = [
      aws_sqs_queue.scheduler_dlq.arn
    ]
  }
  statement {
    sid    = "SnsPipeTarget"
    effect = "Allow"
    actions = [
      "sns:Publish"
    ]
    resources = [
      ${SNSトピックスARN}
    ]
  }
}

########################################
# Log Group
########################################
resource "aws_cloudwatch_log_group" "scheduler_dlq_pipes" {
  name              = "/aws/vendedlogs/pipes/scheduler-dlq-pipes"
}

IAMロールには、EventBridge Pipesでロールを引き受けるための信頼ポリシーを作成設定します。
付与する権限は、DLQのメッセージ受信に必要な権限と、SNSにメッセージ送信する権限です。

パイプの設定を確認する

DLQに設定したSQSキューの「EventBridge Pipes」タブにパイプ設定が追加されています。

DLQにパイプ設定が追加されている

接続されているパイプの詳細画面で、SQSソースからSNSターゲットにつながるパイプが作成されていることが確認できます。

パイプ画面

SNS通知内容の確認

先ほど試しに実行した失敗イベントの通知が、SNSトピックスに設定したメールアドレスに届いていれば設定は完了です。

通知メール内容

パイプ設定の応用

今回の構成では再実行回数を0回に設定し、失敗イベントをすべて通知対象にしています。そのためパイプにはソースとターゲットのみを指定していますが、EventBridge Pipesにはフィルタリングや情報補完(エンリッチ)も追加でき、より複雑な要件にも対応可能です。

たとえばAPIの一時的なスロットリングを考慮し、再実行回数を3回に設定したうえで「上限までリトライした後のエラーのみ通知したい」という要件を想定します。その場合はパイプにフィルタを追加するだけで、条件を満たすイベントだけをSNSに送信できます。

まとめ

以上のように、コーディングすることなく、AWSサービスを設定していくだけで、どのEventBridgeスケジューラがターゲットアクションに失敗したかを特定する通知ができるようになりました。

実現方法を検討する中で、SQSとSNSの連携に初めてEventBridge Pipesを使ってみました。使う前は、イベント駆動アプリケーション向けのサービスという印象が強く、できることが多い分、設定項目も多く取っつきにくいと感じていました。しかし、今回のように監視用途でシンプルに試せたことで、サービスのコンセプトと機能の使い方がより明確になりました。

EventBridgeスケジューラとEventBridge Pipesは、これまで独自コードで実装していた、サービス間のつなぎ込みを、AWSマネージドサービスの定義だけで置き換えられるサービスです。標準的な処理はAWSに任せ、コア実装に専念できる──そんなAWSの方針を体現したサービスだと感じました。