EventBridgeでMediaConvertの進捗率を取得する でDynamoDBに保存したステータス等の情報をAPIで返却できるようにします。
サーバーレスの選択肢としては、CloudFront Functionsもありますが、DynamoDBにアクセスできないので、API Gateway+Lambda, ALB+Lambda, CloudFront+Lambda@Edgeのいずれかになると思いますが、今回はAPI Gatewayで構築します。

ALB+LambdaでDynamoDB(MediaConvertの情報)を返す

構成と流れ

S3に元素材をアップロード ※フロントからアップロードする想定
 ↓
Lambda(createMediaConvertJob) ※作成済み → S3設置トリガーで自動変換と、MediaConvertでCMAFを暗号化(未解決)してみる
 ↓
MediaConvert(Job State Change) → EventBridge → Lambda → DynamoDB ※ここまでは作成済み → EventBridgeでMediaConvertの進捗率を取得する

アップロードしたURLでステータス確認 ※フロントで叩く想定のAPI
 ↓
API Gateway → Lambda → DynamoDB

最新のジョブIDを取得
→ https://api-gw.nightonly.com/media/job_id?input_file=【アップロードしたファイルのパス】
ステータスや進捗率を取得
→ https://api-gw.nightonly.com/media/status?job_id=【上で取得したジョブID】

Lambda関数を作成

API Gateway設定の為、先に関数のみ作成します。

https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/create/function

関数の作成
	基本的な情報
		関数名: mediaConvertInputFileToJobIdApi
		ランタイム: Python 3.9
	▼デフォルトの実行ロールの変更
		実行ロール: ●AWS ポリシーテンプレートから新しいロールを作成
		ロール名: mediaConvertApi
	[関数の作成]
関数の作成
	基本的な情報
		関数名: mediaConvertJobIdToStatusApi
		ランタイム: Python 3.9
	▼デフォルトの実行ロールの変更
		実行ロール: ●既存のロールを使用する
		ロール名: service-role/mediaConvertApi
	[関数の作成]

2つ目の関数のログが出力されないので、IAMロールのポリシーを変更する必要があります。
CloudWatchにログが出力されなくなったので調べてみた

APIを作成

https://ap-northeast-1.console.aws.amazon.com/apigateway/main/precreate?region=ap-northeast-1

APIタイプを選択
	HTTP API
		[構築]

統合を作成して設定
	統合
		Lambda
		Lambda関数: arn:aws:lambda:ap-northeast-1:xxxxxxxx:function:mediaConvertInputFileToJobIdApi
	[統合を追加]
		Lambda
		Lambda関数: arn:aws:lambda:ap-northeast-1:xxxxxxxx:function:mediaConvertJobIdToStatusApi
	API名
		mediaConvertStatusApi
	[次へ]


ルートを設定
	リソースパス
		/media/job_id
		/media/status
	[次へ]

ステージを設定
	[次へ]

確認して作成
	[作成]

カスタムドメイン名を設定

https://ap-northeast-1.console.aws.amazon.com/apigateway/main/publish/domain-names/create?region=ap-northeast-1

ドメイン名を作成
	ドメインの詳細
		ドメイン名: api-gw.nightonly.com	※使用するサブドメイン名
	エンドポイント設定
		ACM証明書: *.nightonly.com	※作成済み or 作成して選択
	[ドメイン名を作成]


DNS設定

Route53または使用しているDNSサーバーに設定。

CNAME	api-gw.nightonly.com	d-30c8ln0208.execute-api.ap-northeast-1.amazonaws.com

動作確認

% nslookup api-gw.nightonly.com
api-gw.nightonly.com	canonical name = d-30c8ln0208.execute-api.ap-northeast-1.amazonaws.com.
Name:	d-30c8ln0208.execute-api.ap-northeast-1.amazonaws.com
Address: 52.68.212.161
Name:	d-30c8ln0208.execute-api.ap-northeast-1.amazonaws.com
Address: 54.238.8.145

APIマッピング

APIマッピングを設定
	APIマッピング
		[新しいマッピングを追加]
		API: mediaConvertStatusApi
		ステージ: $default
	[保存]

動作確認

https://api-gw.nightonly.com/
{"message":"Not Found"}

https://api-gw.nightonly.com/media/job_id
https://api-gw.nightonly.com/media/status
"Hello from Lambda!"

デフォルトのエンドポイント無効化

AWSのドメインでもアクセス出来てしますので、無効にしておきます。

https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/
{"message":"Not Found"}

https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/media/job_id
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/media/status
"Hello from Lambda!"

APIの詳細を編集
	デフォルトのエンドポイント
		●無効
	[保存]

動作確認

https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/media/job_id
https://v6k2ibmifb.execute-api.ap-northeast-1.amazonaws.com/media/status
{"message":"Not Found"}

IAMロールにポリシー追加

LambdaからDynamoDBにアクセスできるようにIAMにポリシーを追加します。

https://us-east-1.console.aws.amazon.com/iamv2/home#/roles

作成したロール(mediaConvertApi)を選択して、アクセス許可を追加 → ポリシーをアタッチ

ポリシー名
	■AmazonDynamoDBReadOnlyAccess
	[ポリシーをアタッチ]

TODO: DynamoDBのテーブル名で権限を絞る
Amazon DynamoDB: 特定のテーブルへのアクセスの許可 – AWS Identity and Access Management

Lambdaコード

mediaConvertInputFileToJobIdApi

最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertInputFileToJobIdApi/lambda_function.py

import boto3
import os
import re
import json

dynamodb = boto3.resource('dynamodb')
table_to_job_id = dynamodb.Table('MediaConvertInputFileToJobId')

ALLOWED_ORIGINS = os.environ['ALLOWED_ORIGINS']

headers = { # Tips: API GatewayのCORS設定が効かない為
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, OPTIONS',
    'Access-Control-Allow-Headers': '*', # Tips: ChromeでCORSエラーになる為
    'Access-Control-Max-Age': 7200,
    'Content-Type': 'application/json; charset=UTF-8'
}

def lambda_handler(event, context):
    print(event)

    # メソッドチェック
    method = event['requestContext']['http']['method']
    if method == 'OPTIONS':
        return option_response()
    elif method != 'GET':
        return error_response(404, 'No route matches.')

    # オリジンチェク # Tips: 存在する場合のみ
    if 'origin' in event['headers']:
        origin = event['headers']['origin']
        if origin != '' and origin != None:
            allowd_origins = ALLOWED_ORIGINS.split()
            print({ 'origin': origin, 'allowd_origins': allowd_origins })

            allowd = False
            for allowd_origin in allowd_origins:
                if re.fullmatch(allowd_origin, origin):
                    allowd = True
                    break
            if not allowd:
                return error_response(422, 'ORIGIN is invalid.')

    # パラメータチェック
    if 'queryStringParameters' not in event:
        return error_response(400, 'Not found parameter.')
    elif 'input_file' not in event['queryStringParameters']:
        return error_response(400, 'Not found parameter input_file.')

    input_file = event['queryStringParameters']['input_file']
    print({ 'input_file': input_file })

    if input_file == '' or input_file == None:
        return error_response(422, 'Not exist parameter input_file.')

    # DynamoDBから出力情報取得
    response = table_to_job_id.get_item(
         Key = {
            'InputFile': input_file
        }
    )
    print(response)

    if 'Item' not in response:
        return error_response(422, 'Not found item.')

    return success_response(response['Item']['JobId'])

def option_response():
    return { 'statusCode': 200, 'headers': headers }

def error_response(code, message):
    return { 'statusCode': code, 'headers': headers, 'body': json.dumps({ 'success': False, 'message': message }) }

def success_response(job_id):
    return { 'statusCode': 200, 'headers': headers, 'body': json.dumps({ 'success': True, 'job_id': job_id }) }

bodyをjson.dumpsしなくてもテストは通るけど、API Gateway経由だと500エラーになります。

{"message":"Internal Server Error"}

mediaConvertJobIdToStatusApi

最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/mediaConvertJobIdToStatusApi/lambda_function.py

import boto3
import os
import re
import json

dynamodb = boto3.resource('dynamodb')
table_to_status = dynamodb.Table('MediaConvertJobIdToStatus')

ALLOWED_ORIGINS = os.environ['ALLOWED_ORIGINS']

headers = { # Tips: API GatewayのCORS設定が効かない為
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, OPTIONS',
    'Access-Control-Allow-Headers': '*', # Tips: ChromeでCORSエラーになる為
    'Access-Control-Max-Age': 7200,
    'Content-Type': 'application/json; charset=UTF-8'
}

def lambda_handler(event, context):
    print(event)

    # メソッドチェック
    method = event['requestContext']['http']['method']
    if method == 'OPTIONS':
        return option_response()
    elif method != 'GET':
        return error_response(404, 'No route matches.')

    # オリジンチェク # Tips: 存在する場合のみ
    if 'origin' in event['headers']:
        origin = event['headers']['origin']
        if origin != '' and origin != None:
            allowd_origins = ALLOWED_ORIGINS.split()
            print({ 'origin': origin, 'allowd_origins': allowd_origins })

            allowd = False
            for allowd_origin in allowd_origins:
                if re.fullmatch(allowd_origin, origin):
                    allowd = True
                    break
            if not allowd:
                return error_response(422, 'ORIGIN is invalid.')

    # パラメータチェック
    if 'queryStringParameters' not in event:
        return error_response(400, 'Not found parameter.')
    elif 'job_id' not in event['queryStringParameters']:
        return error_response(400, 'Not found parameter job_id.')

    job_id = event['queryStringParameters']['job_id']
    print({ 'job_id': job_id })

    if job_id == '' or job_id == None:
        return error_response(422, 'Not exist parameter job_id.')

    # DynamoDBからステータス取得
    response = table_to_status.get_item(
         Key = {
            'JobId': job_id
        }
    )
    print(response)

    if 'Item' not in response:
        return error_response(422, 'Not found item.')

    return success_response(response['Item'])

def option_response():
    return { 'statusCode': 200, 'headers': headers }

def error_response(code, message):
    return { 'statusCode': code, 'headers': headers, 'body': json.dumps({ 'success': False, 'message': message }) }

def success_response(item):
    job_status = item['JobStatus']
    result = { 'success': True, 'job_status': job_status, 'progress_rate': int(item['ProgressRate']) }
    info = {}
    if job_status == 'COMPLETE':
        info = { 'duration_ms': int(item['DurationMs']), 'width_px': int(item['WidthPx']), 'height_px': int(item['HeightPx']),
                 'output_path': item['OutputPath'], 'playlists': item['Playlists'] }
    elif job_status == 'ERROR':
        info = { 'error_code': int(item['ErrorCode']), 'error_message': item['ErrorMessage'] }

    return { 'statusCode': 200, 'headers': headers, 'body': json.dumps({**result, **info}) }

json.dumps使うと、DynamoDBから取得した数値がエラーになるので、対象の値をintに変換しています。

Response
{
  "errorMessage": "Object of type Decimal is not JSON serializable",
  "errorType": "TypeError",

環境変数

両方のLambda関数に設定。
テスト用にlocalhostも入れていますが、本番では入れない。

手順は、MediaConvertでHLSをAESで暗号化してみる 参照

ALLOWED_ORIGINS: 許可するオリジンのURL指定(正規表現、複数設定する場合はスペース区切り)

デプロイしてテスト・動作確認

DynamoDBに値が入っている前提なので、事前に変換して試しています。

・https://api-gw.nightonly.com/media/job_id?input_file=Big%20Buck%20Bunny.mp4
{"success": true, "job_id": "c123456789012-smb6o7"}

・https://api-gw.nightonly.com/media/status?job_id=c123456789012-smb6o7
{"success": true, "job_status": "COMPLETE", "progress_rate": 100, "duration_ms": 596458, "width_px": 1280, "height_px": 720, "output_path": "Big Buck Bunny.mp4_test/", "playlists": {".m3u8": "Big Buck Bunny.m3u8"}}
・https://api-gw.nightonly.com/media/status?job_id=p123456789012-smb6o7
{"success": true, "job_status": "PROGRESSING", "progress_rate": 70}

・https://api-gw.nightonly.com/media/status?job_id=e123456789012-smb6o7
{"success": true, "job_status": "ERROR", "progress_rate": 100, "error_code": 1010, "error_message": "Unable to open input file [Big Buck Bunny.mp4]: [Failed probe/open: [The version of the manifest file at this URL is more recent than we support: [Big Buck Bunny.mp4]. We support versions up to [4].]]"}

mediaConvertInputFileToJobIdApiのテストイベント

デバッグで使ったテストイベントを記載しておきます。

OK

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
        "origin": "https://test.nightonly.com"
    },
    "queryStringParameters": {
        "input_file": "Big Buck Bunny.mp4"
    }
}

diff → Not found parameter input_file.

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
    },
    "queryStringParameters": {
        "diff": "Big Buck Bunny.mp4"
    }
}

blank → Not exist parameter input_file.

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
    },
    "queryStringParameters": {
        "input_file": ""
    }
}

null → Not exist parameter input_file.

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
    },
    "queryStringParameters": {
        "input_file": null
    }
}

mediaConvertJobIdToStatusApiのテストイベント

OK

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
        "origin": "https://test.nightonly.com"
    },
    "queryStringParameters": {
        "job_id": "123456789012-smb6o7"
    }
}

diff → Not found parameter job_id.

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
    },
    "queryStringParameters": {
        "diff": "123456789012-smb6o7"
    }
}

blank → Not exist parameter job_id.

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
    },
    "queryStringParameters": {
        "job_id": ""
    }
}

null → Not exist parameter input_file.

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
    },
    "queryStringParameters": {
        "job_id": null
    }
}

共通のテストイベント

OPTIONS

{
    "requestContext": {
        "http": {
            "method": "OPTIONS"
        }
    }
}

not → Not found parameter.

{
    "requestContext": {
        "http": {
            "method": "GET"
        }
    },
    "headers": {
    }
}

ログ記録(必要に応じて)

CloudWatchにロググループを作成してARNを設定すればOK。
ログの形式はCLFにしました。
TODO: s3に出力して永続化

API Gateway+LambdaでDynamoDB(MediaConvertの情報)を返す” に対して4件のコメントがあります。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です