EventBridgeでMediaConvertの進捗率を取得する でDynamoDBに保存したステータス等の情報をAPIで返却できるようにします。
サーバーレスの選択肢としては、CloudFront Functionsもありますが、DynamoDBにアクセスできないので、API Gateway+Lambda, ALB+Lambda, CloudFront+Lambda@Edgeのいずれかになると思いますが、今回はAPI Gatewayで構築します。
ALB+LambdaでDynamoDB(MediaConvertの情報)を返す
- 構成と流れ
- Lambda関数を作成
- APIを作成
- カスタムドメイン名を設定
- DNS設定
- APIマッピング
- デフォルトのエンドポイント無効化
- IAMロールにポリシー追加
- Lambdaコード
- デプロイしてテスト・動作確認
- ログ記録(必要に応じて)
構成と流れ
S3に元素材をアップロード ※フロントからアップロードする想定
↓
Lambda(createMediaConvertJob) ※作成済み → S3設置トリガーで自動変換と、MediaConvertでCMAFを暗号化(未解決)してみる
↓
MediaConvert(Job State Change) → EventBridge → Lambda → DynamoDB ※ここまでは作成済み → EventBridgeでMediaConvertの進捗率を取得する
アップロードしたURLでステータス確認 ※フロントで叩く想定のAPI
↓
API Gateway → Lambda → DynamoDB
最新のジョブIDを取得 → https://api-gw.nightonly.com/media/job_id?input_file=【アップロードしたファイルのパス】 ステータスや進捗率を取得 → https://api-gw.nightonly.com/media/status?job_id=【上で取得したジョブID】
Lambda関数を作成
API Gateway設定の為、先に関数のみ作成します。
https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/create/function
関数の作成 基本的な情報 関数名: mediaConvertInputFileToJobIdApi ランタイム: Python 3.9 ▼デフォルトの実行ロールの変更 実行ロール: ●AWS ポリシーテンプレートから新しいロールを作成 ロール名: mediaConvertApi [関数の作成]
関数の作成 基本的な情報 関数名: mediaConvertJobIdToStatusApi ランタイム: Python 3.9 ▼デフォルトの実行ロールの変更 実行ロール: ●既存のロールを使用する ロール名: service-role/mediaConvertApi [関数の作成]
2つ目の関数のログが出力されないので、IAMロールのポリシーを変更する必要があります。
→ CloudWatchにログが出力されなくなったので調べてみた
APIを作成
https://ap-northeast-1.console.aws.amazon.com/apigateway/main/precreate?region=ap-northeast-1
APIタイプを選択 HTTP API [構築]

統合を作成して設定 統合 Lambda Lambda関数: arn:aws:lambda:ap-northeast-1:xxxxxxxx:function:mediaConvertInputFileToJobIdApi [統合を追加] Lambda Lambda関数: arn:aws:lambda:ap-northeast-1:xxxxxxxx:function:mediaConvertJobIdToStatusApi API名 mediaConvertStatusApi [次へ]


ルートを設定 リソースパス /media/job_id /media/status [次へ]

ステージを設定 [次へ]

確認して作成 [作成]

カスタムドメイン名を設定
ドメイン名を作成 ドメインの詳細 ドメイン名: api-gw.nightonly.com ※使用するサブドメイン名 エンドポイント設定 ACM証明書: *.nightonly.com ※作成済み or 作成して選択 [ドメイン名を作成]


DNS設定
Route53または使用しているDNSサーバーに設定。
CNAME api-gw.nightonly.com d-30c8ln0208.execute-api.ap-northeast-1.amazonaws.com

動作確認
% nslookup api-gw.nightonly.com api-gw.nightonly.com canonical name = d-30c8ln0208.execute-api.ap-northeast-1.amazonaws.com. Name: d-30c8ln0208.execute-api.ap-northeast-1.amazonaws.com Address: 52.68.212.161 Name: d-30c8ln0208.execute-api.ap-northeast-1.amazonaws.com Address: 54.238.8.145
APIマッピング

APIマッピングを設定 APIマッピング [新しいマッピングを追加] API: mediaConvertStatusApi ステージ: $default [保存]

動作確認
https://api-gw.nightonly.com/
{"message":"Not Found"}
https://api-gw.nightonly.com/media/job_id
https://api-gw.nightonly.com/media/status
"Hello from Lambda!"
デフォルトのエンドポイント無効化
AWSのドメインでもアクセス出来てしますので、無効にしておきます。
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/
{"message":"Not Found"}
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/media/job_id
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/media/status
"Hello from Lambda!"

APIの詳細を編集 デフォルトのエンドポイント ●無効 [保存]

動作確認
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/media/job_id
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/media/status
{"message":"Not Found"}
IAMロールにポリシー追加
LambdaからDynamoDBにアクセスできるようにIAMにポリシーを追加します。
https://us-east-1.console.aws.amazon.com/iamv2/home#/roles
作成したロール(mediaConvertApi)を選択して、アクセス許可を追加 → ポリシーをアタッチ
ポリシー名 ■AmazonDynamoDBReadOnlyAccess [ポリシーをアタッチ]
TODO: DynamoDBのテーブル名で権限を絞る
→ Amazon DynamoDB: 特定のテーブルへのアクセスの許可 – AWS Identity and Access Management
Lambdaコード
mediaConvertInputFileToJobIdApi
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertInputFileToJobIdApi/lambda_function.py
import boto3
import os
import re
import json
dynamodb = boto3.resource('dynamodb')
table_to_job_id = dynamodb.Table('MediaConvertInputFileToJobId')
ALLOWED_ORIGINS = os.environ['ALLOWED_ORIGINS']
headers = { # Tips: API GatewayのCORS設定が効かない為
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': '*', # Tips: ChromeでCORSエラーになる為
'Access-Control-Max-Age': 7200,
'Content-Type': 'application/json; charset=UTF-8'
}
def lambda_handler(event, context):
print(event)
# メソッドチェック
method = event['requestContext']['http']['method']
if method == 'OPTIONS':
return option_response()
elif method != 'GET':
return error_response(404, 'No route matches.')
# オリジンチェク # Tips: 存在する場合のみ
if 'origin' in event['headers']:
origin = event['headers']['origin']
if origin != '' and origin != None:
allowd_origins = ALLOWED_ORIGINS.split()
print({ 'origin': origin, 'allowd_origins': allowd_origins })
allowd = False
for allowd_origin in allowd_origins:
if re.fullmatch(allowd_origin, origin):
allowd = True
break
if not allowd:
return error_response(422, 'ORIGIN is invalid.')
# パラメータチェック
if 'queryStringParameters' not in event:
return error_response(400, 'Not found parameter.')
elif 'input_file' not in event['queryStringParameters']:
return error_response(400, 'Not found parameter input_file.')
input_file = event['queryStringParameters']['input_file']
print({ 'input_file': input_file })
if input_file == '' or input_file == None:
return error_response(422, 'Not exist parameter input_file.')
# DynamoDBから出力情報取得
response = table_to_job_id.get_item(
Key = {
'InputFile': input_file
}
)
print(response)
if 'Item' not in response:
return error_response(422, 'Not found item.')
return success_response(response['Item']['JobId'])
def option_response():
return { 'statusCode': 200, 'headers': headers }
def error_response(code, message):
return { 'statusCode': code, 'headers': headers, 'body': json.dumps({ 'success': False, 'message': message }) }
def success_response(job_id):
return { 'statusCode': 200, 'headers': headers, 'body': json.dumps({ 'success': True, 'job_id': job_id }) }
bodyをjson.dumpsしなくてもテストは通るけど、API Gateway経由だと500エラーになります。
{"message":"Internal Server Error"}
mediaConvertJobIdToStatusApi
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertJobIdToStatusApi/lambda_function.py
import boto3
import os
import re
import json
dynamodb = boto3.resource('dynamodb')
table_to_status = dynamodb.Table('MediaConvertJobIdToStatus')
ALLOWED_ORIGINS = os.environ['ALLOWED_ORIGINS']
headers = { # Tips: API GatewayのCORS設定が効かない為
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': '*', # Tips: ChromeでCORSエラーになる為
'Access-Control-Max-Age': 7200,
'Content-Type': 'application/json; charset=UTF-8'
}
def lambda_handler(event, context):
print(event)
# メソッドチェック
method = event['requestContext']['http']['method']
if method == 'OPTIONS':
return option_response()
elif method != 'GET':
return error_response(404, 'No route matches.')
# オリジンチェク # Tips: 存在する場合のみ
if 'origin' in event['headers']:
origin = event['headers']['origin']
if origin != '' and origin != None:
allowd_origins = ALLOWED_ORIGINS.split()
print({ 'origin': origin, 'allowd_origins': allowd_origins })
allowd = False
for allowd_origin in allowd_origins:
if re.fullmatch(allowd_origin, origin):
allowd = True
break
if not allowd:
return error_response(422, 'ORIGIN is invalid.')
# パラメータチェック
if 'queryStringParameters' not in event:
return error_response(400, 'Not found parameter.')
elif 'job_id' not in event['queryStringParameters']:
return error_response(400, 'Not found parameter job_id.')
job_id = event['queryStringParameters']['job_id']
print({ 'job_id': job_id })
if job_id == '' or job_id == None:
return error_response(422, 'Not exist parameter job_id.')
# DynamoDBからステータス取得
response = table_to_status.get_item(
Key = {
'JobId': job_id
}
)
print(response)
if 'Item' not in response:
return error_response(422, 'Not found item.')
return success_response(response['Item'])
def option_response():
return { 'statusCode': 200, 'headers': headers }
def error_response(code, message):
return { 'statusCode': code, 'headers': headers, 'body': json.dumps({ 'success': False, 'message': message }) }
def success_response(item):
job_status = item['JobStatus']
result = { 'success': True, 'job_status': job_status, 'progress_rate': int(item['ProgressRate']) }
info = {}
if job_status == 'COMPLETE':
info = { 'duration_ms': int(item['DurationMs']), 'width_px': int(item['WidthPx']), 'height_px': int(item['HeightPx']),
'output_path': item['OutputPath'], 'playlists': item['Playlists'] }
elif job_status == 'ERROR':
info = { 'error_code': int(item['ErrorCode']), 'error_message': item['ErrorMessage'] }
return { 'statusCode': 200, 'headers': headers, 'body': json.dumps({**result, **info}) }
json.dumps使うと、DynamoDBから取得した数値がエラーになるので、対象の値をintに変換しています。
Response
{
"errorMessage": "Object of type Decimal is not JSON serializable",
"errorType": "TypeError",
環境変数
両方のLambda関数に設定。
テスト用にlocalhostも入れていますが、本番では入れない。
手順は、MediaConvertでHLSをAESで暗号化してみる 参照

ALLOWED_ORIGINS: 許可するオリジンのURL指定(正規表現、複数設定する場合はスペース区切り)
デプロイしてテスト・動作確認
DynamoDBに値が入っている前提なので、事前に変換して試しています。
・https://api-gw.nightonly.com/media/job_id?input_file=Big%20Buck%20Bunny.mp4
{"success": true, "job_id": "c123456789012-smb6o7"}
・https://api-gw.nightonly.com/media/status?job_id=c123456789012-smb6o7
{"success": true, "job_status": "COMPLETE", "progress_rate": 100, "duration_ms": 596458, "width_px": 1280, "height_px": 720, "output_path": "Big Buck Bunny.mp4_test/", "playlists": {".m3u8": "Big Buck Bunny.m3u8"}}
・https://api-gw.nightonly.com/media/status?job_id=p123456789012-smb6o7
{"success": true, "job_status": "PROGRESSING", "progress_rate": 70}
・https://api-gw.nightonly.com/media/status?job_id=e123456789012-smb6o7
{"success": true, "job_status": "ERROR", "progress_rate": 100, "error_code": 1010, "error_message": "Unable to open input file [Big Buck Bunny.mp4]: [Failed probe/open: [The version of the manifest file at this URL is more recent than we support: [Big Buck Bunny.mp4]. We support versions up to [4].]]"}
mediaConvertInputFileToJobIdApiのテストイベント
デバッグで使ったテストイベントを記載しておきます。
OK
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
"origin": "https://test.nightonly.com"
},
"queryStringParameters": {
"input_file": "Big Buck Bunny.mp4"
}
}
diff → Not found parameter input_file.
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
},
"queryStringParameters": {
"diff": "Big Buck Bunny.mp4"
}
}
blank → Not exist parameter input_file.
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
},
"queryStringParameters": {
"input_file": ""
}
}
null → Not exist parameter input_file.
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
},
"queryStringParameters": {
"input_file": null
}
}
mediaConvertJobIdToStatusApiのテストイベント
OK
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
"origin": "https://test.nightonly.com"
},
"queryStringParameters": {
"job_id": "123456789012-smb6o7"
}
}
diff → Not found parameter job_id.
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
},
"queryStringParameters": {
"diff": "123456789012-smb6o7"
}
}
blank → Not exist parameter job_id.
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
},
"queryStringParameters": {
"job_id": ""
}
}
null → Not exist parameter input_file.
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
},
"queryStringParameters": {
"job_id": null
}
}
共通のテストイベント
OPTIONS
{
"requestContext": {
"http": {
"method": "OPTIONS"
}
}
}
not → Not found parameter.
{
"requestContext": {
"http": {
"method": "GET"
}
},
"headers": {
}
}
ログ記録(必要に応じて)
CloudWatchにロググループを作成してARNを設定すればOK。
ログの形式はCLFにしました。
TODO: s3に出力して永続化


“API Gateway+LambdaでDynamoDB(MediaConvertの情報)を返す” に対して4件のコメントがあります。