Lambdaの例外エラーの通知方法を考える で決めた非同期(S3トリガーやEventBridgeなど)の「パターン3. 送信先(失敗時)」 と同期(APIなど)の「パターン5. サブスクリプションフィルター」の設定をメモしておきます。
構成
Lambda【例外発生!】 → 【非同期の場合】送信先(失敗時) → 【同期の場合】CloudWatchのサブスクリプションフィルター → Lambda(通知用) → SNS → Email→ Chatbot(グローバル) → Slack→ Lambda(Slack通知用) → Slack
通知用のLambdaを使わずにメトリクスフィルター+CloudWatch AlarmからSNSに繋げると実装不要ですが、エラー内容をメールやSlackに記載できないので、サブスクリプションフィルターを使用します。
また、フィルターは正規表現に対応していないので、通知したくないものも通知されてしまうので、通知用のLambdaで弾きます。
LambdaからIncomming WebhookやSDKで、直接Slackに投げても良いけど、
SNSは月100万リクエスト無料なのと、Chatbotは追加料金が掛からないので、
LambdaはSNSに投げる事に集中して、通知はSNSに任せてしまうのが良さそう。
Event received is not supported (see https://docs.aws.amazon.com/chatbot/latest/adminguide/related-services.html ):
CloudWatchロググループ us-east-1:/aws/chatbot/warningに、上記エラーが出てSlackには通知されず。(メール通知はされる)
LambdaからSNSに入れたものはChatbotが受け取ってくれない。
障害ポイント増えそうだけど、SNSからLambdaで直接Slackに投げるしかなさそう。
→ エラーをSNS経由でSlackに通知する
APIリクエスト: 毎月最初の 100 万 Amazon SNS リクエストは無料です。
通知配信: Email/Email-JSON -> 無料利用枠: 1,000 件の通知
AWS Chatbot に追加料金はかかりません。
SNSトピックを作成
通知の為のSNSトピックと通知先のサブスクリプションを設定します。
https://ap-northeast-1.console.aws.amazon.com/sns/v3/home?region=ap-northeast-1#/create-topic
トピックの作成 詳細 タイプ: ●スタンダード 名前: LambdaError [トピックの作成]
サブスクリプションを作成(メール通知)
サブスクリプションの作成 詳細 プロトコル: Eメール エンドポイント: (通知先のメールアドレス) [サブスクリプションの作成]
メール認証
入力したメールアドレスに認証のメールが届くので、リンクにアクセスして有効にします。
AWS Notification - Subscription Confirmation [Confirm subscription]
Chatbotとサブスクリプションを作成(Slack通知)
ChatbotとSlackを連携されます。
できれば個人アカウントでなく、管理用のアカウントがあればそれを使った方が良さそうです。
アカウント削除したら通知されなくなりそう。(退職時あるある)
Lambdaの送信先(失敗時)を設定
非同期(S3トリガーやEventBridgeなど)のLambda関数に設定します。
https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/functions
対象の関数を選択 [設定] → [送信先] → 送信先を追加 送信先を追加 送信先の設定 送信先: LambdaError [保存]
IAMロールにポリシー(sns:Publish)が自動作成&追加されますが、
関数毎にポリシーが乱立するので、予め作成して追加しておいても良さそう。
エラーが出る場合もポリシーを予め追加しておけばOK
> The provided execution role does not have permissions to call Publish on SNS
> Role XXXXXXXXXXXXXXXXXXXXX trusts too many services, expected only 1.
AWSLambdaSNSTopicDestinationExecutionRole_LambdaError
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "sns:Publish", "Resource": "arn:aws:sns:*:*:LambdaError" } ] }
動作確認
非同期なので、テストイベントでは送信されません。
本番稼働前やテスト環境ならコードを壊して例外エラーにするでも良いですが、定期的に確認できるようにしたいので、エラーになるパターンをコードに仕込んでみました。
createMediaConvertJob
S3トリガーなので、設置パスの先頭で判断するようにしました。
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/createMediaConvertJob/lambda_function.py
import boto3 import os import datetime import urllib.parse import secrets dynamodb = boto3.resource('dynamodb') table_to_output = dynamodb.Table('MediaConvertJobIdToOutput') table_to_job_id = dynamodb.Table('MediaConvertInputFileToJobId') OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET'] MEDIACONVERT_ENDPOINT_URL = os.environ['MEDIACONVERT_ENDPOINT_URL'] MEDIACONVERT_ROLE = os.environ['MEDIACONVERT_ROLE'] MEDIACONVERT_JOB_TEMPLATE = os.environ['MEDIACONVERT_JOB_TEMPLATE'] OUTPUT_SUFFIX_PATH = '_' + datetime.datetime.now().strftime('%Y%m%d%H%M%S') + TEST_EXCEPTION_PATH = 'test_exception/' + + class TestException(Exception): + pass def lambda_handler(event, context): print(event) input_bucket = 's3://' + event['Records'][0]['s3']['bucket']['name'] + '/' # 対象動画のバケット input_file = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']) # 対象動画のフォルダを含むファイル名 output_path = input_file + OUTPUT_SUFFIX_PATH + '/' key_file = os.path.splitext(os.path.basename(input_file))[0] + '.key' print({ 'input_bucket': input_bucket, 'input_file': input_file, 'output_path': output_path, 'key_file': key_file }) + # 通知テストの為のエラーパターン + if input_file.startswith(TEST_EXCEPTION_PATH): + raise TestException('Test exception') + # MediaConvertパラメータ key_value = secrets.token_hex(16) settings = { 'Inputs': [ { 'FileInput': input_bucket + input_file } ], 'OutputGroups': [ { 'OutputGroupSettings': { 'HlsGroupSettings': { 'Destination': 's3://' + OUTPUT_BUCKET + '/' + output_path, 'Encryption': { 'EncryptionMethod': 'AES128', 'StaticKeyProvider': { 'StaticKeyValue': key_value, 'Url': os.path.basename(output_path + key_file) }, 'Type': 'STATIC_KEY' } } } } ] } print(settings) # MediaConvertジョブ作成 mediaconvertClient = boto3.client('mediaconvert', endpoint_url = MEDIACONVERT_ENDPOINT_URL) result = mediaconvertClient.create_job( Role = MEDIACONVERT_ROLE, JobTemplate = MEDIACONVERT_JOB_TEMPLATE, Settings = settings ) print(result) job_id = result['Job']['Id'] if job_id == '': raise Exception('Notfound job_id') # DynamoDBに出力情報保存 Tips: Keyファイル作成や保存先を連携するのに使用 utcnow = datetime.datetime.utcnow() creation_time = int(utcnow.timestamp()) expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp()) print(table_to_output.update_item( Key = { 'JobId': job_id }, UpdateExpression = 'set OutputPath = :output_path, KeyFile = :key_file, KeyValue = :key_value,' + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', ExpressionAttributeValues = { ':output_path': output_path, ':key_file': key_file, ':key_value': key_value, ':creation_time': creation_time, ':expiration_time': expiration_time } )) # DynamoDBにジョブID保存 Tips: フロントに最新のジョブIDを返却するのに使用 print(table_to_job_id.update_item( Key = { 'InputFile': input_file }, UpdateExpression = 'set JobId = :job_id,' + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', ExpressionAttributeValues = { ':job_id': job_id, ':creation_time': creation_time, ':expiration_time': expiration_time } ))
対象のS3のtest_exception/以下に元素材をアップロードして、
暫く待つとメールが届きます。(リトライオーバー後)
receiveMediaConvertJobState
EventBridge(MediaConvert)トリガーなので、complete時に取得している出力パスの先頭で判断するようにしました。
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/receiveMediaConvertJobState/lambda_function.py
import boto3 import os import datetime import codecs import json import re dynamodb = boto3.resource('dynamodb') table_to_status = dynamodb.Table('MediaConvertJobIdToStatus') table_to_output = dynamodb.Table('MediaConvertJobIdToOutput') s3Client = boto3.client('s3') OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET'] INFO_FILE = 'info.json' + TEST_EXCEPTION_PATH = 'test_exception_state/' + + class TestException(Exception): + pass def lambda_handler(event, context): print(event) status = event['detail']['status'] if status == 'PROGRESSING' or status == 'INPUT_INFORMATION': return utcnow = datetime.datetime.utcnow() creation_time = int(utcnow.timestamp()) expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp()) if status == 'STATUS_UPDATE': status_update(event, creation_time, expiration_time) elif status == 'COMPLETE': complete(event, creation_time, expiration_time) elif status == 'ERROR': error(event, creation_time, expiration_time) def status_update(event, creation_time, expiration_time): progress_rate = event['detail']['jobProgress']['jobPercentComplete'] print({ 'progress_rate': progress_rate }) # DynamoDBにステータス保存 Tips: フロントに進捗を返却するのに使用 print(table_to_status.update_item( Key = { 'JobId': event['detail']['jobId'] }, UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,' + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', ExpressionAttributeValues = { ':job_status': 'PROGRESSING', ':progress_rate': progress_rate, ':creation_time': creation_time, ':expiration_time': expiration_time } )) def complete(event, creation_time, expiration_time): duration_ms = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['durationInMs'] width_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['widthInPx'] height_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['heightInPx'] playlists = {} for playlist_file_path in event['detail']['outputGroupDetails'][0]['playlistFilePaths']: playlists[os.path.splitext(playlist_file_path)[1]] = os.path.basename(playlist_file_path) print({ 'duration_ms': duration_ms, 'width_px': width_px, 'height_px': height_px, 'playlists': playlists }) # DynamoDBから出力情報取得 response = table_to_output.get_item( Key = { 'JobId': event['detail']['jobId'] } ) print(response) output_path = response['Item']['OutputPath'] + # 通知テストの為のエラーパターン + if output_path.startswith(TEST_EXCEPTION_PATH): + raise TestException('Test exception') + # S3にkeyファイル作成 print(s3Client.put_object( Bucket = OUTPUT_BUCKET, Key = output_path + response['Item']['KeyFile'], Body = codecs.decode(response['Item']['KeyValue'], 'hex_codec') )) # DynamoDBにステータスと動画の情報保存 Tips: フロントに成功を返却するのに使用 print(table_to_status.update_item( Key = { 'JobId': event['detail']['jobId'] }, UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,' + ' DurationMs = :duration_ms, WidthPx = :width_px, HeightPx = :height_px, OutputPath = :output_path, Playlists = :playlists,' + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', ExpressionAttributeValues = { ':job_status': 'COMPLETE', ':progress_rate': 100, ':duration_ms': duration_ms, ':width_px': width_px, ':height_px': height_px, ':output_path': output_path, ':playlists': playlists, ':creation_time': creation_time, ':expiration_time': expiration_time } )) # S3に情報ファイル作成 Tips: 動画の情報を永続化。DB等に保存して配信管理した方が良い print(s3Client.put_object( Bucket = OUTPUT_BUCKET, Key = output_path + INFO_FILE, Body = json.dumps({ 'duration_ms': duration_ms, 'width_px': width_px, 'height_px': height_px, 'playlists': playlists }) )) def error(event, creation_time, expiration_time): error_code = event['detail']['errorCode'] error_message = re.sub('s3://[a-z0-9.-]*/', '', event['detail']['errorMessage']) print({ 'error_code': error_code, 'error_message': error_message }) # DynamoDBにステータスとエラー情報保存 Tips: フロントにエラーを返却するのに使用 print(table_to_status.update_item( Key = { 'JobId': event['detail']['jobId'] }, UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,' + ' ErrorCode = :error_code, ErrorMessage = :error_message,' + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', ExpressionAttributeValues = { ':job_status': 'ERROR', ':progress_rate': 100, ':error_code': error_code, ':error_message': error_message, ':creation_time': creation_time, ':expiration_time': expiration_time } ))
対象のS3のtest_exception_state/以下に元素材をアップロードして、
暫く待つとメールが届きます。(リトライオーバー後)
サブスクリプションフィルターを設定
同期(APIなど)のLambdaのCloudWatchロググループに設定します。
通知用のLambda関数を作成
https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/create/function
関数の作成 基本的な情報 関数名: notifyErrorLog ランタイム: Python 3.9 [関数の作成]
コード
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/notifyErrorLog/lambda_function.py
import boto3 import os import base64 import gzip import json sns = boto3.client('sns') SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN'] SNS_SUBJECT_PREFIX = '[WARNING]notifyErrorLog report for ' PREFIX_FILTER = '[ERROR] ' AWS_REGION = os.getenv("AWS_REGION") def lambda_handler(event, context): print(event) messages = [] data = json.loads(gzip.decompress(base64.b64decode(event['awslogs']['data']))) for log_event in data['logEvents']: if log_event['message'].startswith(PREFIX_FILTER): messages.append(log_event['message'].replace('\u00a0', ' ').split('\n')) if messages == []: return log_group = data['logGroup'] body = { 'owner': data['owner'], 'region': AWS_REGION, 'logGroup': log_group, 'logStream': data['logStream'], 'message': messages } print(body) print(sns.publish( TopicArn = SNS_TOPIC_ARN, Subject = SNS_SUBJECT_PREFIX + log_group, Message = json.dumps(body) ))
環境変数
[設定] → [環境変数] → 編集 環境変数の編集 環境変数 [環境変数の追加] キー: SNS_TOPIC_ARN 値: (作成したSNSトピックのARN) [保存]
送信先を追加
この関数も非同期なので、Lambdaの送信先(失敗時)を設定 と同様の設定をします。
[設定] → [送信先] → 送信先を追加 送信先を追加 送信先の設定 送信先: LambdaError [保存]
CloudWatchのサブスクリプションフィルターを設定
対象のロググループを選択 [サブスクリプションフィルター] → [作成] → Lambda サブスクリプションフィルターを作成 Lambda サブスクリプションフィルターを作成 送信先を選択 Lambda 関数: notifyErrorLog ログ形式とフィルターを設定 サブスクリプションフィルターのパターン: "[ERROR] " ※入力値を出力していると一致させられてしまうけど、先頭一致の設定が出来ない為、Lambda側で弾く サブスクリプションフィルター名: ERROR 〜ここからは任意〜 パターンをテスト イベントメッセージをログ記録 [ERROR] NameError: name 'xxx' is not defined Traceback (most recent call last): {'input_file': '[ERROR] '} [パターンをテスト] 〜ここまでは任意〜 [ストリーミングを開始]
動作確認
同期なので、テストイベントで送信できますが、
Lambda@Edgeではログの保存場所がテストとCloudFront経由で違うのと、
定期的なテストを容易に行えるように、URLパラメータで判断するようにしました。
どこからでも叩けるので、対象の値は環境毎に変えた方が良いです。
API GatewayまたはApplication Load Balancer呼び出しのLambda
最新のコードこちら
mediaConvertInputFileToJobIdApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertInputFileToJobIdApi/lambda_function.py
mediaConvertJobIdToStatusApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertJobIdToStatusApi/lambda_function.py
変更箇所のみ記載
+ TEST_EXCEPTION_KEY = 'test_exception' + TEST_EXCEPTION_VALUE = '12345' # Tips: 適当な値に変更してください + class TestException(Exception): + pass + def lambda_handler(event, context): print(event) + + # 通知テストの為のエラーパターン + if ('queryStringParameters' in event and + TEST_EXCEPTION_KEY in event['queryStringParameters'] and + event['queryStringParameters'][TEST_EXCEPTION_KEY] == TEST_EXCEPTION_VALUE): + raise TestException('Test exception')
対象のURLに「?test_exception=12345」を追加してリクエスト、
暫く待つとメールが届きます。(ログ出力がトリガーなので早い)
CloudFront(Lambda@Edge)
最新のコードこちら
mediaConvertInputFileToJobIdEdgeApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertInputFileToJobIdEdgeApi/lambda_function.py
mediaConvertJobIdToStatusEdgeApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertJobIdToStatusEdgeApi/lambda_function.py
変更箇所のみ記載
+ TEST_EXCEPTION_KEY = 'test_exception' + TEST_EXCEPTION_VALUE = '12345' # Tips: 適当な値に変更してください + class TestException(Exception): + pass + def lambda_handler(event, context): print(event) + query = urllib.parse.parse_qs(event['Records'][0]['cf']['request']['querystring']) + + # 通知テストの為のエラーパターン + if (query != {} and + TEST_EXCEPTION_KEY in query and + query[TEST_EXCEPTION_KEY][0] == TEST_EXCEPTION_VALUE): + raise TestException('Test exception')
同様に対象のURLに「?test_exception=12345」を追加してリクエスト、
暫く待つとメールが届きます。(ログ出力がトリガーなので早い)
Lambda@Edgeは、応答したキャッシュサーバーのリージョンのCloudWatch Logsに出力されるので、それぞれに設定するか、リクエストの多い所+テストのURLを叩く所だけにするとかでも良さそう。
動作確認(通知用Lambdaがエラーの場合)
ここがエラーになると通知されないので、エラーになるようにして試してみました。
方法はいくつかありますが、今回はnotifyErrorLogの環境変数(SNS_TOPIC_ARN)を変更(後ろに[-test]追加)して、SNSへのpublishでエラーになるようにしました。
戻し忘れもないようにと。
上で動作確認したURLにリクエストして、暫く待つとメールが届きます。(リトライオーバー後)
“Lambdaの例外エラーをメールとSlackに通知する” に対して3件のコメントがあります。