MediaConvertのジョブの状態や進捗率をEventBridgeで通知できるので試してみました。
フロントからアップロードして、API叩いて進捗率を表示する想定です。
EventBridgeからLambda経由でDynamoDBに保存して、APIで取得できるようにします。

構成と流れ

S3に元素材をアップロード ※フロントからアップロードする想定
 ↓
Lambda(createMediaConvertJob) ※ここまでは作成済み → S3設置トリガーで自動変換と、MediaConvertでCMAFを暗号化(未解決)してみる
 ↓
MediaConvert(Job State Change) → EventBridge → Lambda → DynamoDB

アップロードしたURLでステータス確認 ※フロントで叩く想定のAPI
 ↓
API Gateway → Lambda → DynamoDB

DynamoDBのテーブル作成

https://ap-northeast-1.console.aws.amazon.com/dynamodbv2/home?region=ap-northeast-1#create-table

InputFile → JobId

アップロードしたURLしか知らないフロント向けにMediaConvertの最新のジョブIDを返してあげて、ステータス確認に使用する。

テーブルの作成
	テーブル名: MediaConvertInputFileToJobId
	パーティションキー: InputFile, 文字列
	[テーブルの作成]



JobId → OutputPath, KeyFile, KeyValue

成功時にS3のkeyファイルを作成するのに使用する。

テーブルの作成
	テーブル名: MediaConvertJobIdToOutput
	パーティションキー: JobId, 文字列
	[テーブルの作成]

JobId → JobStatus, ProgressRate等

MediaConvert Job State Changeの下記ステータスの最新を残すのに使用する。
PROGRESSING → STATUS_UPDATE → COMPLETE or ERROR
※INPUT_INFORMATIONは残さない。COMPLETEの前に発生する時と後に発生する場合がある。

テーブルの作成
	テーブル名: MediaConvertJobIdToStatus
	パーティションキー: JobId, 文字列
	[テーブルの作成]

DynamoDBの料金

料金 – Amazon DynamoDB | AWS

無料枠があるので、一定までは無料で使えると思いきや、
Auto ScalingでCloudWatchのアラームが設定されるので、その分の料金が発生します。
今回はテストで、Auto Scalingは不要なので削除しました。

[新機能] Amazon DynamoDBでAuto Scalingが可能に! | DevelopersIO

1テーブル辺り8個のアラームが設定されました。
1個のアラーム辺り$0.10/月なので、8個で$0.80/月 → 今回は3テーブルなので、$2.40/月

https://us-east-1.console.aws.amazon.com/billing/home#/bills


(AWSのイベント参加で頂いたクレジットを適用しているのでマイナスされています)

https://ap-northeast-1.console.aws.amazon.com/cloudwatch/home?region=ap-northeast-1#alarmsV2:

Lambda関数を作成

EventBridge設定の為、先に関数のみ作成します。

https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/create/function

関数の作成
	基本的な情報
		関数名: receiveMediaConvertJobState
		ランタイム: Python 3.9
	[関数の作成]

EventBridgeにルールを作成

Lambda関数のトリガーを追加からだと、イベントパターンにMediaConvertが出てこないので、EventBridgeから設定します。

https://ap-northeast-1.console.aws.amazon.com/events/home?region=ap-northeast-1#/rules/create

ルールの詳細を定義
	名前: receiveMediaConvertJobState
	[次へ]

イベントパターンを構築
	サンプルイベント - オプション
		サンプルイベント: MediaConvert Job State Change
	イベントパターン
		AWSのサービス: MediaConvert
		イベントタイプ: MediaConvert Job State Change
	[次へ]



ターゲットを選択
	ターゲットを選択: Lambda関数
	機能: receiveMediaConvertJobState	※作成したLambda関数名
	[次へ]

タグを設定 - オプション
	[次へ]

レビューと作成
	[次へ]


MediaConvertのステータス更新間隔を変更

デフォルトは60秒毎にイベントが起こりLambdaが起動します。
画面の進捗率の更新間隔もこれに従ってそう。

60秒や30秒だと長いし、通常15秒ぐらいにするのが良さそうですが、
テストなので10秒にしました。

https://ap-northeast-1.console.aws.amazon.com/mediaconvert/home?region=ap-northeast-1#/templates/list

IAMロールにポリシー追加

LambdaからDynamoDBにアクセスできるようにIAMにポリシーを追加します。

上記で作成したreceiveMediaConvertJobState

S3に保存できるように、MediaConvert_Default_Role_xxxxxxxxも追加

ポリシー名
	■AmazonDynamoDBFullAccess
	■MediaConvert_Default_Role_xxxxxxxx
	[ポリシーをアタッチ]

TODO: DynamoDBのテーブル名で権限を絞る
Amazon DynamoDB: 特定のテーブルへのアクセスの許可 – AWS Identity and Access Management

以前作成したcreateMediaConvertJob

ポリシー名
	■AmazonDynamoDBFullAccess
	[ポリシーをアタッチ]

Lambdaコード

createMediaConvertJobを修正

最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/createMediaConvertJob/lambda_function.py

import boto3
import os
import datetime
import urllib.parse
import secrets
- import codecs

+ dynamodb = boto3.resource('dynamodb')
+ table_to_output = dynamodb.Table('MediaConvertJobIdToOutput')
+ table_to_job_id = dynamodb.Table('MediaConvertInputFileToJobId')

- s3Client = boto3.client('s3')

OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET']
MEDIACONVERT_ENDPOINT_URL = os.environ['MEDIACONVERT_ENDPOINT_URL']
MEDIACONVERT_ROLE = os.environ['MEDIACONVERT_ROLE']
MEDIACONVERT_JOB_TEMPLATE = os.environ['MEDIACONVERT_JOB_TEMPLATE']
OUTPUT_SUFFIX_PATH = '_' + datetime.datetime.now().strftime('%Y%m%d%H%M%S')

def lambda_handler(event, context):
    print(event)

    input_bucket = 's3://' + event['Records'][0]['s3']['bucket']['name'] + '/' # 対象動画のバケット
    input_file = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']) # 対象動画のフォルダを含むファイル名
    output_path = input_file + OUTPUT_SUFFIX_PATH + '/'
    key_file = os.path.splitext(os.path.basename(input_file))[0] + '.key'
    print({ 'input_bucket': input_bucket, 'input_file': input_file, 'output_path': output_path, 'key_file': key_file })

    # MediaConvertパラメータ
    key_value = secrets.token_hex(16)
    settings = {
        'Inputs': [
            {
                'FileInput': input_bucket + input_file
            }
        ],
        'OutputGroups': [
            {
                'OutputGroupSettings': {
                    'HlsGroupSettings': {
                        'Destination': 's3://' + OUTPUT_BUCKET + '/' + output_path,
                        'Encryption': {
                            'EncryptionMethod': 'AES128',
                            'StaticKeyProvider': {
                                'StaticKeyValue': key_value,
                                'Url': os.path.basename(output_path + key_file)
                            },
                            'Type': 'STATIC_KEY'
                        }
                    }
                }
            }
        ]
    }
    print(settings)

    # MediaConvertジョブ作成
    mediaconvertClient = boto3.client('mediaconvert', endpoint_url = MEDIACONVERT_ENDPOINT_URL)
    result = mediaconvertClient.create_job(
        Role = MEDIACONVERT_ROLE,
        JobTemplate = MEDIACONVERT_JOB_TEMPLATE,
        Settings = settings
    )
    print(result)

+    job_id = result['Job']['Id']
+    if job_id == '':
+        raise Exception('Notfound job_id')
+
+    # DynamoDBに出力情報保存 Tips: Keyファイル作成や保存先を連携するのに使用
+    utcnow = datetime.datetime.utcnow()
+    creation_time = int(utcnow.timestamp())
+    expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp())
+    print(table_to_output.update_item(
+        Key = {
+            'JobId': job_id
+        },
+        UpdateExpression = 'set OutputPath = :output_path, KeyFile = :key_file, KeyValue = :key_value,'
+                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
+        ExpressionAttributeValues = {
+            ':output_path': output_path,
+            ':key_file': key_file,
+            ':key_value': key_value,
+            ':creation_time': creation_time,
+            ':expiration_time': expiration_time
+        }
+    ))
+
+    # DynamoDBにジョブID保存 Tips: フロントに最新のジョブIDを返却するのに使用
+    print(table_to_job_id.update_item(
+        Key = {
+            'InputFile': input_file
+        },
+        UpdateExpression = 'set JobId = :job_id,'
+                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
+        ExpressionAttributeValues = {
+            ':job_id': job_id,
+            ':creation_time': creation_time,
+            ':expiration_time': expiration_time
+        }
+    ))
-
-    # keyファイル作成
-    print(s3Client.put_object(
-        Bucket = OUTPUT_BUCKET,
-        Key = output_path + key_file,
-        Body = codecs.decode(key_value, 'hex_codec')
-    ))
-
-    return result['Job']['Id']

keyファイルの情報はDynamoDBに保存して、変換が完了してから作成するようにしました。
変換に失敗してもS3にゴミが残らないように。

update_itemでKeyが存在しなかったら作成、存在したら更新する動きになります。

receiveMediaConvertJobStateを作成

最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/receiveMediaConvertJobState/lambda_function.py

import boto3
import os
import datetime
import codecs
import json
import re

dynamodb = boto3.resource('dynamodb')
table_to_status = dynamodb.Table('MediaConvertJobIdToStatus')
table_to_output = dynamodb.Table('MediaConvertJobIdToOutput')

s3Client = boto3.client('s3')

OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET']
INFO_FILE = 'info.json'

def lambda_handler(event, context):
    print(event)

    status = event['detail']['status']
    if status == 'PROGRESSING' or status == 'INPUT_INFORMATION':
        return

    utcnow = datetime.datetime.utcnow()
    creation_time = int(utcnow.timestamp())
    expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp())

    if status == 'STATUS_UPDATE':
        status_update(event, creation_time, expiration_time)
    elif status == 'COMPLETE':
        complete(event, creation_time, expiration_time)
    elif status == 'ERROR':
        error(event, creation_time, expiration_time)

def status_update(event, creation_time, expiration_time):
    progress_rate = event['detail']['jobProgress']['jobPercentComplete']
    print({ 'progress_rate': progress_rate })

    # DynamoDBにステータス保存 Tips: フロントに進捗を返却するのに使用
    print(table_to_status.update_item(
        Key = {
            'JobId': event['detail']['jobId']
        },
        UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
        ExpressionAttributeValues = {
            ':job_status': 'PROGRESSING',
            ':progress_rate': progress_rate,
            ':creation_time': creation_time,
            ':expiration_time': expiration_time
        }
    ))

def complete(event, creation_time, expiration_time):
    duration_ms = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['durationInMs']
    width_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['widthInPx']
    height_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['heightInPx']
    playlists = {}
    for playlist_file_path in event['detail']['outputGroupDetails'][0]['playlistFilePaths']:
        playlists[os.path.splitext(playlist_file_path)[1]] = os.path.basename(playlist_file_path)
    print({ 'duration_ms': duration_ms, 'width_px': width_px, 'height_px': height_px, 'playlists': playlists })

    # DynamoDBから出力情報取得
    response = table_to_output.get_item(
         Key = {
            'JobId': event['detail']['jobId']
        }
    )
    print(response)
    output_path = response['Item']['OutputPath']

    # S3にkeyファイル作成
    print(s3Client.put_object(
        Bucket = OUTPUT_BUCKET,
        Key = output_path + response['Item']['KeyFile'],
        Body = codecs.decode(response['Item']['KeyValue'], 'hex_codec')
    ))

    # DynamoDBにステータスと動画の情報保存 Tips: フロントに成功を返却するのに使用
    print(table_to_status.update_item(
        Key = {
            'JobId': event['detail']['jobId']
        },
        UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
                            + ' DurationMs = :duration_ms, WidthPx = :width_px, HeightPx = :height_px, OutputPath = :output_path, Playlists = :playlists,'
                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
        ExpressionAttributeValues = {
            ':job_status': 'COMPLETE',
            ':progress_rate': 100,
            ':duration_ms': duration_ms,
            ':width_px': width_px,
            ':height_px': height_px,
            ':output_path': output_path,
            ':playlists': playlists,
            ':creation_time': creation_time,
            ':expiration_time': expiration_time
        }
    ))

    # S3に情報ファイル作成 Tips: 動画の情報を永続化。DB等に保存して配信管理した方が良い
    print(s3Client.put_object(
        Bucket = OUTPUT_BUCKET,
        Key = output_path + INFO_FILE,
        Body = json.dumps({
            'duration_ms': duration_ms,
            'width_px': width_px,
            'height_px': height_px,
            'playlists': playlists
        })
    ))

def error(event, creation_time, expiration_time):
    error_code = event['detail']['errorCode']
    error_message = re.sub('s3://[a-z0-9.-]*/', '', event['detail']['errorMessage'])
    print({ 'error_code': error_code, 'error_message': error_message })

    # DynamoDBにステータスとエラー情報保存 Tips: フロントにエラーを返却するのに使用
    print(table_to_status.update_item(
        Key = {
            'JobId': event['detail']['jobId']
        },
        UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,'
                            + ' ErrorCode = :error_code, ErrorMessage = :error_message,'
                            + ' CreationTime = :creation_time, ExpirationTime = :expiration_time',
        ExpressionAttributeValues = {
            ':job_status': 'ERROR',
            ':progress_rate': 100,
            ':error_code': error_code,
            ':error_message': error_message,
            ':creation_time': creation_time,
            ':expiration_time': expiration_time
        }
    ))

エラーメッセージのS3のパスをフロントに表示したくないので、s3://xxx/を空に変更しています。
他にも見せたくない情報があるかは不明なので随時対応。

Statusは予約語で使えなかったので、JobStatusにしています。予約語めっちゃ多い。
DynamoDB の予約語 – Amazon DynamoDB

Response
{
  "errorMessage": "An error occurred (ValidationException) when calling the UpdateItem operation: Invalid UpdateExpression: Attribute name is a reserved keyword; reserved keyword: Status",
  "errorType": "ClientError",

環境変数

手順は、MediaConvertでHLSをAESで暗号化してみる 参照

OUTPUT_BUCKET: 変換後の動画や鍵を置くバケット

上で設定しているIAMの権限が足りないと下記エラーになる。

Response
{
  "errorMessage": "An error occurred (AccessDenied) when calling the PutObject operation: Access Denied",
  "errorType": "ClientError",

デプロイしてテスト・動作確認

対象のS3に動画ファイルをアップロード(またはコピー)
createMediaConvertJobのテストイベントを実行しても、
receiveMediaConvertJobStateも呼ばれる。

正常に通れば、MediaConvertのジョブに表示される
https://ap-northeast-1.console.aws.amazon.com/mediaconvert/home?region=ap-northeast-1#/jobs/list

上手く行かない場合は、CloudWatch Logsで確認して対応
https://ap-northeast-1.console.aws.amazon.com/cloudwatch/home?region=ap-northeast-1#logsV2:log-groups

receiveMediaConvertJobStateのテストイベント

デバッグで使ったテストイベントを記載しておきます。

STATUS_UPDATE

{
	"detail": {
		"jobId": "123456789012-smb6o7",
		"status": "STATUS_UPDATE",
		"jobProgress": {
			"jobPercentComplete": 70
		}
	}
}

COMPLETE

{
	"detail": {
		"jobId": "123456789012-smb6o7",
		"status": "COMPLETE",
		"outputGroupDetails": [
			{
				"outputDetails": [
					{
						"durationInMs": 596458,
						"videoDetails": {
							"widthInPx": 1280,
							"heightInPx": 720
						}
					}
				],
				"playlistFilePaths": [
					"s3://example-bucket/Big Buck Bunny.test.mp4_test/Big Buck Bunny.test.m3u8"
				]
			}
		]
	}
}

MediaConvertJobIdToOutputに上記のjobIdを作らないとエラーになります。

Response
{
  "errorMessage": "'Item'",
  "errorType": "KeyError",

ERROR

{
	"detail": {
		"jobId": "123456789012-smb6o7",
		"status": "ERROR",
		"errorCode": 1010,
		"errorMessage": "Unable to open input file [s3://example-bucket/Big Buck Bunny.mp4]: [Failed probe/open: [The version of the manifest file at this URL is more recent than we support: [s3://example-bucket/Big Buck Bunny.mp4]. We support versions up to [4].]]"
	}
}

DynamoDBのTTL有効化

先に設定しても問題ないですが、対象をプレビューで確認できるので、テーブルに値が入ってから設定しました。

https://ap-northeast-1.console.aws.amazon.com/dynamodbv2/home?region=ap-northeast-1#tables

作成した3つのテーブル毎に設定

Time to Live (TTL)の有効化
	TTL設定
		TTL属性名: ExpirationTime
	プレビュー
		[プレビューの実行]
	[TTLの有効化]


削除は期限切れになったらすぐにされる訳ではないようです。
仕組み: DynamoDB の有効期限 (TTL) – Amazon DynamoDB

通常、TTL は期限が切れた項目を期限切れから 48 時間以内に削除します。

長くなったので、フロントにステータスを返す所は次回書きます。
API Gateway+LambdaでDynamoDB(MediaConvertの情報)を返す
あと、試しながら作った為、手順が散らばっているので、まとめの記事も別途下記予定です。

EventBridgeでMediaConvertの進捗率を取得する” に対して2件のコメントがあります。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です