Lambdaの例外エラーの通知方法を考える で決めた非同期(S3トリガーやEventBridgeなど)の「パターン3. 送信先(失敗時)」 と同期(APIなど)の「パターン5. サブスクリプションフィルター」の設定をメモしておきます。

構成

Lambda【例外発生!】 →
	【非同期の場合】送信先(失敗時) →
	【同期の場合】CloudWatchのサブスクリプションフィルター → Lambda(通知用) →
		SNS → Email
		  → Chatbot(グローバル) → Slack
		  → Lambda(Slack通知用) → Slack

通知用のLambdaを使わずにメトリクスフィルター+CloudWatch AlarmからSNSに繋げると実装不要ですが、エラー内容をメールやSlackに記載できないので、サブスクリプションフィルターを使用します。
また、フィルターは正規表現に対応していないので、通知したくないものも通知されてしまうので、通知用のLambdaで弾きます。

LambdaからIncomming WebhookやSDKで、直接Slackに投げても良いけど、
SNSは月100万リクエスト無料なのと、Chatbotは追加料金が掛からないので、
LambdaはSNSに投げる事に集中して、通知はSNSに任せてしまうのが良さそう。

Event received is not supported (see https://docs.aws.amazon.com/chatbot/latest/adminguide/related-services.html ): 

CloudWatchロググループ us-east-1:/aws/chatbot/warningに、上記エラーが出てSlackには通知されず。(メール通知はされる)
LambdaからSNSに入れたものはChatbotが受け取ってくれない。
障害ポイント増えそうだけど、SNSからLambdaで直接Slackに投げるしかなさそう。

エラーをSNS経由でSlackに通知する

料金 – Amazon SNS | AWS

APIリクエスト: 毎月最初の 100 万 Amazon SNS リクエストは無料です。
通知配信: Email/Email-JSON -> 無料利用枠: 1,000 件の通知

料金 – AWS Chatbot | AWS

AWS Chatbot に追加料金はかかりません。

SNSトピックを作成

通知の為のSNSトピックと通知先のサブスクリプションを設定します。

https://ap-northeast-1.console.aws.amazon.com/sns/v3/home?region=ap-northeast-1#/create-topic

トピックの作成
	詳細
		タイプ: ●スタンダード
		名前: LambdaError
	[トピックの作成]


サブスクリプションを作成(メール通知)

サブスクリプションの作成
	詳細
		プロトコル: Eメール
		エンドポイント: (通知先のメールアドレス)
	[サブスクリプションの作成]

メール認証

入力したメールアドレスに認証のメールが届くので、リンクにアクセスして有効にします。

AWS Notification - Subscription Confirmation
	[Confirm subscription]

Chatbotとサブスクリプションを作成(Slack通知)

エラーをSNS経由でSlackに通知する

ChatbotとSlackを連携されます。
できれば個人アカウントでなく、管理用のアカウントがあればそれを使った方が良さそうです。
アカウント削除したら通知されなくなりそう。(退職時あるある)

Lambdaの送信先(失敗時)を設定

非同期(S3トリガーやEventBridgeなど)のLambda関数に設定します。

https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/functions

対象の関数を選択
[設定] → [送信先] → 送信先を追加
送信先を追加
	送信先の設定
		送信先: LambdaError
	[保存]

IAMロールにポリシー(sns:Publish)が自動作成&追加されますが、
関数毎にポリシーが乱立するので、予め作成して追加しておいても良さそう。

エラーが出る場合もポリシーを予め追加しておけばOK
> The provided execution role does not have permissions to call Publish on SNS
> Role XXXXXXXXXXXXXXXXXXXXX trusts too many services, expected only 1.

AWSLambdaSNSTopicDestinationExecutionRole_LambdaError

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "arn:aws:sns:*:*:LambdaError"
        }
    ]
}

動作確認

非同期なので、テストイベントでは送信されません。
本番稼働前やテスト環境ならコードを壊して例外エラーにするでも良いですが、定期的に確認できるようにしたいので、エラーになるパターンをコードに仕込んでみました。

createMediaConvertJob

S3トリガーなので、設置パスの先頭で判断するようにしました。

最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/createMediaConvertJob/lambda_function.py

import boto3
import os
import datetime
import urllib.parse
import secrets

dynamodb = boto3.resource('dynamodb')
table_to_output = dynamodb.Table('MediaConvertJobIdToOutput')
table_to_job_id = dynamodb.Table('MediaConvertInputFileToJobId')

OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET']
MEDIACONVERT_ENDPOINT_URL = os.environ['MEDIACONVERT_ENDPOINT_URL']
MEDIACONVERT_ROLE = os.environ['MEDIACONVERT_ROLE']
MEDIACONVERT_JOB_TEMPLATE = os.environ['MEDIACONVERT_JOB_TEMPLATE']
OUTPUT_SUFFIX_PATH = '_' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
+ TEST_EXCEPTION_PATH = 'test_exception/'
+
+ class TestException(Exception):
+     pass

def lambda_handler(event, context):
    print(event)

    input_bucket = 's3://' + event['Records'][0]['s3']['bucket']['name'] + '/' # 対象動画のバケット
    input_file = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']) # 対象動画のフォルダを含むファイル名
    output_path = input_file + OUTPUT_SUFFIX_PATH + '/'
    key_file = os.path.splitext(os.path.basename(input_file))[0] + '.key'
    print({ 'input_bucket': input_bucket, 'input_file': input_file, 'output_path': output_path, 'key_file': key_file })

+     # 通知テストの為のエラーパターン
+     if input_file.startswith(TEST_EXCEPTION_PATH):
+         raise TestException('Test exception')
+
    # MediaConvertパラメータ
    key_value = secrets.token_hex(16)
    settings = {
        'Inputs': [
            {
                'FileInput': input_bucket + input_file
            }
        ],
        'OutputGroups': [
            {
                'OutputGroupSettings': {
                    'HlsGroupSettings': {
                        'Destination': 's3://' + OUTPUT_BUCKET + '/' + output_path,
                        'Encryption': {
                            'EncryptionMethod': 'AES128',
                            'StaticKeyProvider': {
                                'StaticKeyValue': key_value,
                                'Url': os.path.basename(output_path + key_file)
                            },
                            'Type': 'STATIC_KEY'
                        }
                    }
                }
            }
        ]
    }
    print(settings)

    # MediaConvertジョブ作成
    mediaconvertClient = boto3.client('mediaconvert', endpoint_url = MEDIACONVERT_ENDPOINT_URL)
    result = mediaconvertClient.create_job(
        Role = MEDIACONVERT_ROLE,
        JobTemplate = MEDIACONVERT_JOB_TEMPLATE,
        Settings = settings
    )
    print(result)

    job_id = result['Job']['Id']
    if job_id == '':
        raise Exception('Notfound job_id')

    # DynamoDBに出力情報保存 Tips: Keyファイル作成や保存先を連携するのに使用
    utcnow = datetime.datetime.utcnow()
    creation_time = int(utcnow.timestamp())
    expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp())
    print(table_to_output.update_item(
        Key = {
            'JobId': job_id
        },
        UpdateExpression = 'set OutputPath = :output_path, KeyFile = :key_file, KeyValue = :key_value,'
                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
        ExpressionAttributeValues = {
            ':output_path': output_path,
            ':key_file': key_file,
            ':key_value': key_value,
            ':creation_time': creation_time,
            ':expiration_time': expiration_time
        }
    ))

    # DynamoDBにジョブID保存 Tips: フロントに最新のジョブIDを返却するのに使用
    print(table_to_job_id.update_item(
        Key = {
            'InputFile': input_file
        },
        UpdateExpression = 'set JobId = :job_id,'
                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
        ExpressionAttributeValues = {
            ':job_id': job_id,
            ':creation_time': creation_time,
            ':expiration_time': expiration_time
        }
    ))

対象のS3のtest_exception/以下に元素材をアップロードして、
暫く待つとメールが届きます。(リトライオーバー後)



receiveMediaConvertJobState

EventBridge(MediaConvert)トリガーなので、complete時に取得している出力パスの先頭で判断するようにしました。

最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/receiveMediaConvertJobState/lambda_function.py

import boto3
import os
import datetime
import codecs
import json
import re

dynamodb = boto3.resource('dynamodb')
table_to_status = dynamodb.Table('MediaConvertJobIdToStatus')
table_to_output = dynamodb.Table('MediaConvertJobIdToOutput')

s3Client = boto3.client('s3')

OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET']
INFO_FILE = 'info.json'
+ TEST_EXCEPTION_PATH = 'test_exception_state/'
+
+ class TestException(Exception):
+     pass

def lambda_handler(event, context):
    print(event)

    status = event['detail']['status']
    if status == 'PROGRESSING' or status == 'INPUT_INFORMATION':
        return

    utcnow = datetime.datetime.utcnow()
    creation_time = int(utcnow.timestamp())
    expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp())

    if status == 'STATUS_UPDATE':
        status_update(event, creation_time, expiration_time)
    elif status == 'COMPLETE':
        complete(event, creation_time, expiration_time)
    elif status == 'ERROR':
        error(event, creation_time, expiration_time)

def status_update(event, creation_time, expiration_time):
    progress_rate = event['detail']['jobProgress']['jobPercentComplete']
    print({ 'progress_rate': progress_rate })

    # DynamoDBにステータス保存 Tips: フロントに進捗を返却するのに使用
    print(table_to_status.update_item(
        Key = {
            'JobId': event['detail']['jobId']
        },
        UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
        ExpressionAttributeValues = {
            ':job_status': 'PROGRESSING',
            ':progress_rate': progress_rate,
            ':creation_time': creation_time,
            ':expiration_time': expiration_time
        }
    ))

def complete(event, creation_time, expiration_time):
    duration_ms = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['durationInMs']
    width_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['widthInPx']
    height_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['heightInPx']
    playlists = {}
    for playlist_file_path in event['detail']['outputGroupDetails'][0]['playlistFilePaths']:
        playlists[os.path.splitext(playlist_file_path)[1]] = os.path.basename(playlist_file_path)
    print({ 'duration_ms': duration_ms, 'width_px': width_px, 'height_px': height_px, 'playlists': playlists })

    # DynamoDBから出力情報取得
    response = table_to_output.get_item(
         Key = {
            'JobId': event['detail']['jobId']
        }
    )
    print(response)
    output_path = response['Item']['OutputPath']

+     # 通知テストの為のエラーパターン
+     if output_path.startswith(TEST_EXCEPTION_PATH):
+         raise TestException('Test exception')
+
    # S3にkeyファイル作成
    print(s3Client.put_object(
        Bucket = OUTPUT_BUCKET,
        Key = output_path + response['Item']['KeyFile'],
        Body = codecs.decode(response['Item']['KeyValue'], 'hex_codec')
    ))

    # DynamoDBにステータスと動画の情報保存 Tips: フロントに成功を返却するのに使用
    print(table_to_status.update_item(
        Key = {
            'JobId': event['detail']['jobId']
        },
        UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
                            + ' DurationMs = :duration_ms, WidthPx = :width_px, HeightPx = :height_px, OutputPath = :output_path, Playlists = :playlists,'
                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
        ExpressionAttributeValues = {
            ':job_status': 'COMPLETE',
            ':progress_rate': 100,
            ':duration_ms': duration_ms,
            ':width_px': width_px,
            ':height_px': height_px,
            ':output_path': output_path,
            ':playlists': playlists,
            ':creation_time': creation_time,
            ':expiration_time': expiration_time
        }
    ))

    # S3に情報ファイル作成 Tips: 動画の情報を永続化。DB等に保存して配信管理した方が良い
    print(s3Client.put_object(
        Bucket = OUTPUT_BUCKET,
        Key = output_path + INFO_FILE,
        Body = json.dumps({
            'duration_ms': duration_ms,
            'width_px': width_px,
            'height_px': height_px,
            'playlists': playlists
        })
    ))

def error(event, creation_time, expiration_time):
    error_code = event['detail']['errorCode']
    error_message = re.sub('s3://[a-z0-9.-]*/', '', event['detail']['errorMessage'])
    print({ 'error_code': error_code, 'error_message': error_message })

    # DynamoDBにステータスとエラー情報保存 Tips: フロントにエラーを返却するのに使用
    print(table_to_status.update_item(
        Key = {
            'JobId': event['detail']['jobId']
        },
        UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
                            + ' ErrorCode = :error_code, ErrorMessage = :error_message,'
                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
        ExpressionAttributeValues = {
            ':job_status': 'ERROR',
            ':progress_rate': 100,
            ':error_code': error_code,
            ':error_message': error_message,
            ':creation_time': creation_time,
            ':expiration_time': expiration_time
        }
    ))

対象のS3のtest_exception_state/以下に元素材をアップロードして、
暫く待つとメールが届きます。(リトライオーバー後)



サブスクリプションフィルターを設定

同期(APIなど)のLambdaのCloudWatchロググループに設定します。

通知用のLambda関数を作成

https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/create/function

関数の作成
	基本的な情報
		関数名: notifyErrorLog
		ランタイム: Python 3.9
	[関数の作成]

コード

最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/notifyErrorLog/lambda_function.py

import boto3
import os
import base64
import gzip
import json

sns = boto3.client('sns')
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
SNS_SUBJECT_PREFIX = '[WARNING]notifyErrorLog report for '
PREFIX_FILTER = '[ERROR] '
AWS_REGION = os.getenv("AWS_REGION")

def lambda_handler(event, context):
    print(event)

    messages = []
    data = json.loads(gzip.decompress(base64.b64decode(event['awslogs']['data'])))
    for log_event in data['logEvents']:
        if log_event['message'].startswith(PREFIX_FILTER):
            messages.append(log_event['message'].replace('\u00a0', ' ').split('\n'))
    if messages == []:
        return

    log_group = data['logGroup']
    body = { 'owner': data['owner'], 'region': AWS_REGION, 'logGroup': log_group, 'logStream': data['logStream'], 'message': messages }
    print(body)

    print(sns.publish(
        TopicArn = SNS_TOPIC_ARN,
        Subject = SNS_SUBJECT_PREFIX + log_group,
        Message = json.dumps(body)
    ))

環境変数

[設定] → [環境変数] → 編集
環境変数の編集
	環境変数
		[環境変数の追加]
		キー: SNS_TOPIC_ARN
		値: (作成したSNSトピックのARN)
	[保存]

送信先を追加

この関数も非同期なので、Lambdaの送信先(失敗時)を設定 と同様の設定をします。

[設定] → [送信先] → 送信先を追加
送信先を追加
	送信先の設定
		送信先: LambdaError
	[保存]

CloudWatchのサブスクリプションフィルターを設定

https://ap-northeast-1.console.aws.amazon.com/cloudwatch/home?region=ap-northeast-1#logsV2:log-groups

対象のロググループを選択
[サブスクリプションフィルター] → [作成] → Lambda サブスクリプションフィルターを作成
Lambda サブスクリプションフィルターを作成
	送信先を選択
		Lambda 関数: notifyErrorLog
	ログ形式とフィルターを設定
		サブスクリプションフィルターのパターン: "[ERROR] "
			※入力値を出力していると一致させられてしまうけど、先頭一致の設定が出来ない為、Lambda側で弾く
		サブスクリプションフィルター名: ERROR
〜ここからは任意〜
	パターンをテスト
		イベントメッセージをログ記録
			[ERROR] NameError: name 'xxx' is not defined Traceback (most recent call last):
			{'input_file': '[ERROR] '}
		[パターンをテスト]
〜ここまでは任意〜
	[ストリーミングを開始]


動作確認

同期なので、テストイベントで送信できますが、
Lambda@Edgeではログの保存場所がテストとCloudFront経由で違うのと、
定期的なテストを容易に行えるように、URLパラメータで判断するようにしました。
どこからでも叩けるので、対象の値は環境毎に変えた方が良いです。

API GatewayまたはApplication Load Balancer呼び出しのLambda

最新のコードこちら
mediaConvertInputFileToJobIdApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertInputFileToJobIdApi/lambda_function.py
mediaConvertJobIdToStatusApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertJobIdToStatusApi/lambda_function.py

変更箇所のみ記載

+ TEST_EXCEPTION_KEY = 'test_exception'
+ TEST_EXCEPTION_VALUE = '12345' # Tips: 適当な値に変更してください

+ class TestException(Exception):
+     pass
+
def lambda_handler(event, context):
    print(event)
+
+     # 通知テストの為のエラーパターン
+     if ('queryStringParameters' in event and
+         TEST_EXCEPTION_KEY in event['queryStringParameters'] and
+         event['queryStringParameters'][TEST_EXCEPTION_KEY] == TEST_EXCEPTION_VALUE):
+         raise TestException('Test exception')

対象のURLに「?test_exception=12345」を追加してリクエスト、
暫く待つとメールが届きます。(ログ出力がトリガーなので早い)



CloudFront(Lambda@Edge)

最新のコードこちら
mediaConvertInputFileToJobIdEdgeApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertInputFileToJobIdEdgeApi/lambda_function.py
mediaConvertJobIdToStatusEdgeApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertJobIdToStatusEdgeApi/lambda_function.py

変更箇所のみ記載

+ TEST_EXCEPTION_KEY = 'test_exception'
+ TEST_EXCEPTION_VALUE = '12345' # Tips: 適当な値に変更してください

+ class TestException(Exception):
+     pass
+
def lambda_handler(event, context):
    print(event)
+     query = urllib.parse.parse_qs(event['Records'][0]['cf']['request']['querystring'])
+
+     # 通知テストの為のエラーパターン
+     if (query != {} and
+         TEST_EXCEPTION_KEY in query and
+         query[TEST_EXCEPTION_KEY][0] == TEST_EXCEPTION_VALUE):
+         raise TestException('Test exception')

同様に対象のURLに「?test_exception=12345」を追加してリクエスト、
暫く待つとメールが届きます。(ログ出力がトリガーなので早い)



Lambda@Edgeは、応答したキャッシュサーバーのリージョンのCloudWatch Logsに出力されるので、それぞれに設定するか、リクエストの多い所+テストのURLを叩く所だけにするとかでも良さそう。

動作確認(通知用Lambdaがエラーの場合)

ここがエラーになると通知されないので、エラーになるようにして試してみました。
方法はいくつかありますが、今回はnotifyErrorLogの環境変数(SNS_TOPIC_ARN)を変更(後ろに[-test]追加)して、SNSへのpublishでエラーになるようにしました。
戻し忘れもないようにと。

上で動作確認したURLにリクエストして、暫く待つとメールが届きます。(リトライオーバー後)



コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です