Lambdaの例外エラーの通知方法を考える で決めた非同期(S3トリガーやEventBridgeなど)の「パターン3. 送信先(失敗時)」 と同期(APIなど)の「パターン5. サブスクリプションフィルター」の設定をメモしておきます。
構成
Lambda【例外発生!】 → 【非同期の場合】送信先(失敗時) → 【同期の場合】CloudWatchのサブスクリプションフィルター → Lambda(通知用) → SNS → Email→ Chatbot(グローバル) → Slack→ Lambda(Slack通知用) → Slack
通知用のLambdaを使わずにメトリクスフィルター+CloudWatch AlarmからSNSに繋げると実装不要ですが、エラー内容をメールやSlackに記載できないので、サブスクリプションフィルターを使用します。
また、フィルターは正規表現に対応していないので、通知したくないものも通知されてしまうので、通知用のLambdaで弾きます。
LambdaからIncomming WebhookやSDKで、直接Slackに投げても良いけど、
SNSは月100万リクエスト無料なのと、Chatbotは追加料金が掛からないので、
LambdaはSNSに投げる事に集中して、通知はSNSに任せてしまうのが良さそう。
Event received is not supported (see https://docs.aws.amazon.com/chatbot/latest/adminguide/related-services.html ):
CloudWatchロググループ us-east-1:/aws/chatbot/warningに、上記エラーが出てSlackには通知されず。(メール通知はされる)
LambdaからSNSに入れたものはChatbotが受け取ってくれない。
障害ポイント増えそうだけど、SNSからLambdaで直接Slackに投げるしかなさそう。
→ エラーをSNS経由でSlackに通知する
APIリクエスト: 毎月最初の 100 万 Amazon SNS リクエストは無料です。
通知配信: Email/Email-JSON -> 無料利用枠: 1,000 件の通知
AWS Chatbot に追加料金はかかりません。
SNSトピックを作成
通知の為のSNSトピックと通知先のサブスクリプションを設定します。
https://ap-northeast-1.console.aws.amazon.com/sns/v3/home?region=ap-northeast-1#/create-topic
トピックの作成 詳細 タイプ: ●スタンダード 名前: LambdaError [トピックの作成]


サブスクリプションを作成(メール通知)
サブスクリプションの作成 詳細 プロトコル: Eメール エンドポイント: (通知先のメールアドレス) [サブスクリプションの作成]

メール認証
入力したメールアドレスに認証のメールが届くので、リンクにアクセスして有効にします。
AWS Notification - Subscription Confirmation [Confirm subscription]


Chatbotとサブスクリプションを作成(Slack通知)
ChatbotとSlackを連携されます。
できれば個人アカウントでなく、管理用のアカウントがあればそれを使った方が良さそうです。
アカウント削除したら通知されなくなりそう。(退職時あるある)
Lambdaの送信先(失敗時)を設定
非同期(S3トリガーやEventBridgeなど)のLambda関数に設定します。
https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/functions
対象の関数を選択 [設定] → [送信先] → 送信先を追加 送信先を追加 送信先の設定 送信先: LambdaError [保存]


IAMロールにポリシー(sns:Publish)が自動作成&追加されますが、
関数毎にポリシーが乱立するので、予め作成して追加しておいても良さそう。
エラーが出る場合もポリシーを予め追加しておけばOK
> The provided execution role does not have permissions to call Publish on SNS
> Role XXXXXXXXXXXXXXXXXXXXX trusts too many services, expected only 1.
AWSLambdaSNSTopicDestinationExecutionRole_LambdaError
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "sns:Publish",
"Resource": "arn:aws:sns:*:*:LambdaError"
}
]
}
動作確認
非同期なので、テストイベントでは送信されません。
本番稼働前やテスト環境ならコードを壊して例外エラーにするでも良いですが、定期的に確認できるようにしたいので、エラーになるパターンをコードに仕込んでみました。
createMediaConvertJob
S3トリガーなので、設置パスの先頭で判断するようにしました。
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/createMediaConvertJob/lambda_function.py
import boto3
import os
import datetime
import urllib.parse
import secrets
dynamodb = boto3.resource('dynamodb')
table_to_output = dynamodb.Table('MediaConvertJobIdToOutput')
table_to_job_id = dynamodb.Table('MediaConvertInputFileToJobId')
OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET']
MEDIACONVERT_ENDPOINT_URL = os.environ['MEDIACONVERT_ENDPOINT_URL']
MEDIACONVERT_ROLE = os.environ['MEDIACONVERT_ROLE']
MEDIACONVERT_JOB_TEMPLATE = os.environ['MEDIACONVERT_JOB_TEMPLATE']
OUTPUT_SUFFIX_PATH = '_' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')
+ TEST_EXCEPTION_PATH = 'test_exception/'
+
+ class TestException(Exception):
+ pass
def lambda_handler(event, context):
print(event)
input_bucket = 's3://' + event['Records'][0]['s3']['bucket']['name'] + '/' # 対象動画のバケット
input_file = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']) # 対象動画のフォルダを含むファイル名
output_path = input_file + OUTPUT_SUFFIX_PATH + '/'
key_file = os.path.splitext(os.path.basename(input_file))[0] + '.key'
print({ 'input_bucket': input_bucket, 'input_file': input_file, 'output_path': output_path, 'key_file': key_file })
+ # 通知テストの為のエラーパターン
+ if input_file.startswith(TEST_EXCEPTION_PATH):
+ raise TestException('Test exception')
+
# MediaConvertパラメータ
key_value = secrets.token_hex(16)
settings = {
'Inputs': [
{
'FileInput': input_bucket + input_file
}
],
'OutputGroups': [
{
'OutputGroupSettings': {
'HlsGroupSettings': {
'Destination': 's3://' + OUTPUT_BUCKET + '/' + output_path,
'Encryption': {
'EncryptionMethod': 'AES128',
'StaticKeyProvider': {
'StaticKeyValue': key_value,
'Url': os.path.basename(output_path + key_file)
},
'Type': 'STATIC_KEY'
}
}
}
}
]
}
print(settings)
# MediaConvertジョブ作成
mediaconvertClient = boto3.client('mediaconvert', endpoint_url = MEDIACONVERT_ENDPOINT_URL)
result = mediaconvertClient.create_job(
Role = MEDIACONVERT_ROLE,
JobTemplate = MEDIACONVERT_JOB_TEMPLATE,
Settings = settings
)
print(result)
job_id = result['Job']['Id']
if job_id == '':
raise Exception('Notfound job_id')
# DynamoDBに出力情報保存 Tips: Keyファイル作成や保存先を連携するのに使用
utcnow = datetime.datetime.utcnow()
creation_time = int(utcnow.timestamp())
expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp())
print(table_to_output.update_item(
Key = {
'JobId': job_id
},
UpdateExpression = 'set OutputPath = :output_path, KeyFile = :key_file, KeyValue = :key_value,'
+ ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
ExpressionAttributeValues = {
':output_path': output_path,
':key_file': key_file,
':key_value': key_value,
':creation_time': creation_time,
':expiration_time': expiration_time
}
))
# DynamoDBにジョブID保存 Tips: フロントに最新のジョブIDを返却するのに使用
print(table_to_job_id.update_item(
Key = {
'InputFile': input_file
},
UpdateExpression = 'set JobId = :job_id,'
+ ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
ExpressionAttributeValues = {
':job_id': job_id,
':creation_time': creation_time,
':expiration_time': expiration_time
}
))
対象のS3のtest_exception/以下に元素材をアップロードして、
暫く待つとメールが届きます。(リトライオーバー後)

receiveMediaConvertJobState
EventBridge(MediaConvert)トリガーなので、complete時に取得している出力パスの先頭で判断するようにしました。
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/receiveMediaConvertJobState/lambda_function.py
import boto3
import os
import datetime
import codecs
import json
import re
dynamodb = boto3.resource('dynamodb')
table_to_status = dynamodb.Table('MediaConvertJobIdToStatus')
table_to_output = dynamodb.Table('MediaConvertJobIdToOutput')
s3Client = boto3.client('s3')
OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET']
INFO_FILE = 'info.json'
+ TEST_EXCEPTION_PATH = 'test_exception_state/'
+
+ class TestException(Exception):
+ pass
def lambda_handler(event, context):
print(event)
status = event['detail']['status']
if status == 'PROGRESSING' or status == 'INPUT_INFORMATION':
return
utcnow = datetime.datetime.utcnow()
creation_time = int(utcnow.timestamp())
expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp())
if status == 'STATUS_UPDATE':
status_update(event, creation_time, expiration_time)
elif status == 'COMPLETE':
complete(event, creation_time, expiration_time)
elif status == 'ERROR':
error(event, creation_time, expiration_time)
def status_update(event, creation_time, expiration_time):
progress_rate = event['detail']['jobProgress']['jobPercentComplete']
print({ 'progress_rate': progress_rate })
# DynamoDBにステータス保存 Tips: フロントに進捗を返却するのに使用
print(table_to_status.update_item(
Key = {
'JobId': event['detail']['jobId']
},
UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
+ ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
ExpressionAttributeValues = {
':job_status': 'PROGRESSING',
':progress_rate': progress_rate,
':creation_time': creation_time,
':expiration_time': expiration_time
}
))
def complete(event, creation_time, expiration_time):
duration_ms = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['durationInMs']
width_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['widthInPx']
height_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['heightInPx']
playlists = {}
for playlist_file_path in event['detail']['outputGroupDetails'][0]['playlistFilePaths']:
playlists[os.path.splitext(playlist_file_path)[1]] = os.path.basename(playlist_file_path)
print({ 'duration_ms': duration_ms, 'width_px': width_px, 'height_px': height_px, 'playlists': playlists })
# DynamoDBから出力情報取得
response = table_to_output.get_item(
Key = {
'JobId': event['detail']['jobId']
}
)
print(response)
output_path = response['Item']['OutputPath']
+ # 通知テストの為のエラーパターン
+ if output_path.startswith(TEST_EXCEPTION_PATH):
+ raise TestException('Test exception')
+
# S3にkeyファイル作成
print(s3Client.put_object(
Bucket = OUTPUT_BUCKET,
Key = output_path + response['Item']['KeyFile'],
Body = codecs.decode(response['Item']['KeyValue'], 'hex_codec')
))
# DynamoDBにステータスと動画の情報保存 Tips: フロントに成功を返却するのに使用
print(table_to_status.update_item(
Key = {
'JobId': event['detail']['jobId']
},
UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
+ ' DurationMs = :duration_ms, WidthPx = :width_px, HeightPx = :height_px, OutputPath = :output_path, Playlists = :playlists,'
+ ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
ExpressionAttributeValues = {
':job_status': 'COMPLETE',
':progress_rate': 100,
':duration_ms': duration_ms,
':width_px': width_px,
':height_px': height_px,
':output_path': output_path,
':playlists': playlists,
':creation_time': creation_time,
':expiration_time': expiration_time
}
))
# S3に情報ファイル作成 Tips: 動画の情報を永続化。DB等に保存して配信管理した方が良い
print(s3Client.put_object(
Bucket = OUTPUT_BUCKET,
Key = output_path + INFO_FILE,
Body = json.dumps({
'duration_ms': duration_ms,
'width_px': width_px,
'height_px': height_px,
'playlists': playlists
})
))
def error(event, creation_time, expiration_time):
error_code = event['detail']['errorCode']
error_message = re.sub('s3://[a-z0-9.-]*/', '', event['detail']['errorMessage'])
print({ 'error_code': error_code, 'error_message': error_message })
# DynamoDBにステータスとエラー情報保存 Tips: フロントにエラーを返却するのに使用
print(table_to_status.update_item(
Key = {
'JobId': event['detail']['jobId']
},
UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
+ ' ErrorCode = :error_code, ErrorMessage = :error_message,'
+ ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
ExpressionAttributeValues = {
':job_status': 'ERROR',
':progress_rate': 100,
':error_code': error_code,
':error_message': error_message,
':creation_time': creation_time,
':expiration_time': expiration_time
}
))
対象のS3のtest_exception_state/以下に元素材をアップロードして、
暫く待つとメールが届きます。(リトライオーバー後)

サブスクリプションフィルターを設定
同期(APIなど)のLambdaのCloudWatchロググループに設定します。
通知用のLambda関数を作成
https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/create/function
関数の作成 基本的な情報 関数名: notifyErrorLog ランタイム: Python 3.9 [関数の作成]
コード
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/notifyErrorLog/lambda_function.py
import boto3
import os
import base64
import gzip
import json
sns = boto3.client('sns')
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
SNS_SUBJECT_PREFIX = '[WARNING]notifyErrorLog report for '
PREFIX_FILTER = '[ERROR] '
AWS_REGION = os.getenv("AWS_REGION")
def lambda_handler(event, context):
print(event)
messages = []
data = json.loads(gzip.decompress(base64.b64decode(event['awslogs']['data'])))
for log_event in data['logEvents']:
if log_event['message'].startswith(PREFIX_FILTER):
messages.append(log_event['message'].replace('\u00a0', ' ').split('\n'))
if messages == []:
return
log_group = data['logGroup']
body = { 'owner': data['owner'], 'region': AWS_REGION, 'logGroup': log_group, 'logStream': data['logStream'], 'message': messages }
print(body)
print(sns.publish(
TopicArn = SNS_TOPIC_ARN,
Subject = SNS_SUBJECT_PREFIX + log_group,
Message = json.dumps(body)
))
環境変数
[設定] → [環境変数] → 編集 環境変数の編集 環境変数 [環境変数の追加] キー: SNS_TOPIC_ARN 値: (作成したSNSトピックのARN) [保存]

送信先を追加
この関数も非同期なので、Lambdaの送信先(失敗時)を設定 と同様の設定をします。
[設定] → [送信先] → 送信先を追加 送信先を追加 送信先の設定 送信先: LambdaError [保存]
CloudWatchのサブスクリプションフィルターを設定
対象のロググループを選択
[サブスクリプションフィルター] → [作成] → Lambda サブスクリプションフィルターを作成
Lambda サブスクリプションフィルターを作成
送信先を選択
Lambda 関数: notifyErrorLog
ログ形式とフィルターを設定
サブスクリプションフィルターのパターン: "[ERROR] "
※入力値を出力していると一致させられてしまうけど、先頭一致の設定が出来ない為、Lambda側で弾く
サブスクリプションフィルター名: ERROR
〜ここからは任意〜
パターンをテスト
イベントメッセージをログ記録
[ERROR] NameError: name 'xxx' is not defined Traceback (most recent call last):
{'input_file': '[ERROR] '}
[パターンをテスト]
〜ここまでは任意〜
[ストリーミングを開始]



動作確認
同期なので、テストイベントで送信できますが、
Lambda@Edgeではログの保存場所がテストとCloudFront経由で違うのと、
定期的なテストを容易に行えるように、URLパラメータで判断するようにしました。
どこからでも叩けるので、対象の値は環境毎に変えた方が良いです。
API GatewayまたはApplication Load Balancer呼び出しのLambda
最新のコードこちら
mediaConvertInputFileToJobIdApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertInputFileToJobIdApi/lambda_function.py
mediaConvertJobIdToStatusApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertJobIdToStatusApi/lambda_function.py
変更箇所のみ記載
+ TEST_EXCEPTION_KEY = 'test_exception'
+ TEST_EXCEPTION_VALUE = '12345' # Tips: 適当な値に変更してください
+ class TestException(Exception):
+ pass
+
def lambda_handler(event, context):
print(event)
+
+ # 通知テストの為のエラーパターン
+ if ('queryStringParameters' in event and
+ TEST_EXCEPTION_KEY in event['queryStringParameters'] and
+ event['queryStringParameters'][TEST_EXCEPTION_KEY] == TEST_EXCEPTION_VALUE):
+ raise TestException('Test exception')
対象のURLに「?test_exception=12345」を追加してリクエスト、
暫く待つとメールが届きます。(ログ出力がトリガーなので早い)

CloudFront(Lambda@Edge)
最新のコードこちら
mediaConvertInputFileToJobIdEdgeApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertInputFileToJobIdEdgeApi/lambda_function.py
mediaConvertJobIdToStatusEdgeApi → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertJobIdToStatusEdgeApi/lambda_function.py
変更箇所のみ記載
+ TEST_EXCEPTION_KEY = 'test_exception'
+ TEST_EXCEPTION_VALUE = '12345' # Tips: 適当な値に変更してください
+ class TestException(Exception):
+ pass
+
def lambda_handler(event, context):
print(event)
+ query = urllib.parse.parse_qs(event['Records'][0]['cf']['request']['querystring'])
+
+ # 通知テストの為のエラーパターン
+ if (query != {} and
+ TEST_EXCEPTION_KEY in query and
+ query[TEST_EXCEPTION_KEY][0] == TEST_EXCEPTION_VALUE):
+ raise TestException('Test exception')
同様に対象のURLに「?test_exception=12345」を追加してリクエスト、
暫く待つとメールが届きます。(ログ出力がトリガーなので早い)

Lambda@Edgeは、応答したキャッシュサーバーのリージョンのCloudWatch Logsに出力されるので、それぞれに設定するか、リクエストの多い所+テストのURLを叩く所だけにするとかでも良さそう。
動作確認(通知用Lambdaがエラーの場合)
ここがエラーになると通知されないので、エラーになるようにして試してみました。
方法はいくつかありますが、今回はnotifyErrorLogの環境変数(SNS_TOPIC_ARN)を変更(後ろに[-test]追加)して、SNSへのpublishでエラーになるようにしました。
戻し忘れもないようにと。
上で動作確認したURLにリクエストして、暫く待つとメールが届きます。(リトライオーバー後)


“Lambdaの例外エラーをメールとSlackに通知する” に対して3件のコメントがあります。