MediaConvertのジョブの状態や進捗率をEventBridgeで通知できるので試してみました。
フロントからアップロードして、API叩いて進捗率を表示する想定です。
EventBridgeからLambda経由でDynamoDBに保存して、APIで取得できるようにします。
- 構成と流れ
- DynamoDBのテーブル作成
- DynamoDBの料金
- Lambda関数を作成
- EventBridgeにルールを作成
- MediaConvertのステータス更新間隔を変更
- IAMロールにポリシー追加
- Lambdaコード
- デプロイしてテスト・動作確認
- DynamoDBのTTL有効化
- 続き
構成と流れ
S3に元素材をアップロード ※フロントからアップロードする想定
↓
Lambda(createMediaConvertJob) ※ここまでは作成済み → S3設置トリガーで自動変換と、MediaConvertでCMAFを暗号化(未解決)してみる
↓
MediaConvert(Job State Change) → EventBridge → Lambda → DynamoDB
アップロードしたURLでステータス確認 ※フロントで叩く想定のAPI
↓
API Gateway → Lambda → DynamoDB
DynamoDBのテーブル作成
https://ap-northeast-1.console.aws.amazon.com/dynamodbv2/home?region=ap-northeast-1#create-table
InputFile → JobId
アップロードしたURLしか知らないフロント向けにMediaConvertの最新のジョブIDを返してあげて、ステータス確認に使用する。
テーブルの作成 テーブル名: MediaConvertInputFileToJobId パーティションキー: InputFile, 文字列 [テーブルの作成]
JobId → OutputPath, KeyFile, KeyValue
成功時にS3のkeyファイルを作成するのに使用する。
テーブルの作成 テーブル名: MediaConvertJobIdToOutput パーティションキー: JobId, 文字列 [テーブルの作成]
JobId → JobStatus, ProgressRate等
MediaConvert Job State Changeの下記ステータスの最新を残すのに使用する。
PROGRESSING → STATUS_UPDATE → COMPLETE or ERROR
※INPUT_INFORMATIONは残さない。COMPLETEの前に発生する時と後に発生する場合がある。
テーブルの作成 テーブル名: MediaConvertJobIdToStatus パーティションキー: JobId, 文字列 [テーブルの作成]
DynamoDBの料金
無料枠があるので、一定までは無料で使えると思いきや、
Auto ScalingでCloudWatchのアラームが設定されるので、その分の料金が発生します。
今回はテストで、Auto Scalingは不要なので削除しました。
[新機能] Amazon DynamoDBでAuto Scalingが可能に! | DevelopersIO
1テーブル辺り8個のアラームが設定されました。
1個のアラーム辺り$0.10/月なので、8個で$0.80/月 → 今回は3テーブルなので、$2.40/月
https://us-east-1.console.aws.amazon.com/billing/home#/bills
(AWSのイベント参加で頂いたクレジットを適用しているのでマイナスされています)
https://ap-northeast-1.console.aws.amazon.com/cloudwatch/home?region=ap-northeast-1#alarmsV2:
Lambda関数を作成
EventBridge設定の為、先に関数のみ作成します。
https://ap-northeast-1.console.aws.amazon.com/lambda/home?region=ap-northeast-1#/create/function
関数の作成 基本的な情報 関数名: receiveMediaConvertJobState ランタイム: Python 3.9 [関数の作成]
EventBridgeにルールを作成
Lambda関数のトリガーを追加からだと、イベントパターンにMediaConvertが出てこないので、EventBridgeから設定します。
https://ap-northeast-1.console.aws.amazon.com/events/home?region=ap-northeast-1#/rules/create
ルールの詳細を定義 名前: receiveMediaConvertJobState [次へ]
イベントパターンを構築 サンプルイベント - オプション サンプルイベント: MediaConvert Job State Change イベントパターン AWSのサービス: MediaConvert イベントタイプ: MediaConvert Job State Change [次へ]
ターゲットを選択 ターゲットを選択: Lambda関数 機能: receiveMediaConvertJobState ※作成したLambda関数名 [次へ]
タグを設定 - オプション [次へ]
レビューと作成 [次へ]
MediaConvertのステータス更新間隔を変更
デフォルトは60秒毎にイベントが起こりLambdaが起動します。
画面の進捗率の更新間隔もこれに従ってそう。
60秒や30秒だと長いし、通常15秒ぐらいにするのが良さそうですが、
テストなので10秒にしました。
IAMロールにポリシー追加
LambdaからDynamoDBにアクセスできるようにIAMにポリシーを追加します。
上記で作成したreceiveMediaConvertJobState
S3に保存できるように、MediaConvert_Default_Role_xxxxxxxxも追加
ポリシー名 ■AmazonDynamoDBFullAccess ■MediaConvert_Default_Role_xxxxxxxx [ポリシーをアタッチ]
TODO: DynamoDBのテーブル名で権限を絞る
→ Amazon DynamoDB: 特定のテーブルへのアクセスの許可 – AWS Identity and Access Management
以前作成したcreateMediaConvertJob
ポリシー名 ■AmazonDynamoDBFullAccess [ポリシーをアタッチ]
Lambdaコード
createMediaConvertJobを修正
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/createMediaConvertJob/lambda_function.py
import boto3 import os import datetime import urllib.parse import secrets - import codecs + dynamodb = boto3.resource('dynamodb') + table_to_output = dynamodb.Table('MediaConvertJobIdToOutput') + table_to_job_id = dynamodb.Table('MediaConvertInputFileToJobId') - s3Client = boto3.client('s3') OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET'] MEDIACONVERT_ENDPOINT_URL = os.environ['MEDIACONVERT_ENDPOINT_URL'] MEDIACONVERT_ROLE = os.environ['MEDIACONVERT_ROLE'] MEDIACONVERT_JOB_TEMPLATE = os.environ['MEDIACONVERT_JOB_TEMPLATE'] OUTPUT_SUFFIX_PATH = '_' + datetime.datetime.now().strftime('%Y%m%d%H%M%S') def lambda_handler(event, context): print(event) input_bucket = 's3://' + event['Records'][0]['s3']['bucket']['name'] + '/' # 対象動画のバケット input_file = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']) # 対象動画のフォルダを含むファイル名 output_path = input_file + OUTPUT_SUFFIX_PATH + '/' key_file = os.path.splitext(os.path.basename(input_file))[0] + '.key' print({ 'input_bucket': input_bucket, 'input_file': input_file, 'output_path': output_path, 'key_file': key_file }) # MediaConvertパラメータ key_value = secrets.token_hex(16) settings = { 'Inputs': [ { 'FileInput': input_bucket + input_file } ], 'OutputGroups': [ { 'OutputGroupSettings': { 'HlsGroupSettings': { 'Destination': 's3://' + OUTPUT_BUCKET + '/' + output_path, 'Encryption': { 'EncryptionMethod': 'AES128', 'StaticKeyProvider': { 'StaticKeyValue': key_value, 'Url': os.path.basename(output_path + key_file) }, 'Type': 'STATIC_KEY' } } } } ] } print(settings) # MediaConvertジョブ作成 mediaconvertClient = boto3.client('mediaconvert', endpoint_url = MEDIACONVERT_ENDPOINT_URL) result = mediaconvertClient.create_job( Role = MEDIACONVERT_ROLE, JobTemplate = MEDIACONVERT_JOB_TEMPLATE, Settings = settings ) print(result) + job_id = result['Job']['Id'] + if job_id == '': + raise Exception('Notfound job_id') + + # DynamoDBに出力情報保存 Tips: Keyファイル作成や保存先を連携するのに使用 + utcnow = datetime.datetime.utcnow() + creation_time = int(utcnow.timestamp()) + expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp()) + print(table_to_output.update_item( + Key = { + 'JobId': job_id + }, + UpdateExpression = 'set OutputPath = :output_path, KeyFile = :key_file, KeyValue = :key_value,' + + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', + ExpressionAttributeValues = { + ':output_path': output_path, + ':key_file': key_file, + ':key_value': key_value, + ':creation_time': creation_time, + ':expiration_time': expiration_time + } + )) + + # DynamoDBにジョブID保存 Tips: フロントに最新のジョブIDを返却するのに使用 + print(table_to_job_id.update_item( + Key = { + 'InputFile': input_file + }, + UpdateExpression = 'set JobId = :job_id,' + + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', + ExpressionAttributeValues = { + ':job_id': job_id, + ':creation_time': creation_time, + ':expiration_time': expiration_time + } + )) - - # keyファイル作成 - print(s3Client.put_object( - Bucket = OUTPUT_BUCKET, - Key = output_path + key_file, - Body = codecs.decode(key_value, 'hex_codec') - )) - - return result['Job']['Id']
keyファイルの情報はDynamoDBに保存して、変換が完了してから作成するようにしました。
変換に失敗してもS3にゴミが残らないように。
update_itemでKeyが存在しなかったら作成、存在したら更新する動きになります。
receiveMediaConvertJobStateを作成
最新のコードこちら → https://dev.azure.com/nightonly/_git/lambda-origin?path=/receiveMediaConvertJobState/lambda_function.py
import boto3 import os import datetime import codecs import json import re dynamodb = boto3.resource('dynamodb') table_to_status = dynamodb.Table('MediaConvertJobIdToStatus') table_to_output = dynamodb.Table('MediaConvertJobIdToOutput') s3Client = boto3.client('s3') OUTPUT_BUCKET = os.environ['OUTPUT_BUCKET'] INFO_FILE = 'info.json' def lambda_handler(event, context): print(event) status = event['detail']['status'] if status == 'PROGRESSING' or status == 'INPUT_INFORMATION': return utcnow = datetime.datetime.utcnow() creation_time = int(utcnow.timestamp()) expiration_time = int((utcnow + datetime.timedelta(days = 1)).timestamp()) if status == 'STATUS_UPDATE': status_update(event, creation_time, expiration_time) elif status == 'COMPLETE': complete(event, creation_time, expiration_time) elif status == 'ERROR': error(event, creation_time, expiration_time) def status_update(event, creation_time, expiration_time): progress_rate = event['detail']['jobProgress']['jobPercentComplete'] print({ 'progress_rate': progress_rate }) # DynamoDBにステータス保存 Tips: フロントに進捗を返却するのに使用 print(table_to_status.update_item( Key = { 'JobId': event['detail']['jobId'] }, UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,' + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', ExpressionAttributeValues = { ':job_status': 'PROGRESSING', ':progress_rate': progress_rate, ':creation_time': creation_time, ':expiration_time': expiration_time } )) def complete(event, creation_time, expiration_time): duration_ms = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['durationInMs'] width_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['widthInPx'] height_px = event['detail']['outputGroupDetails'][0]['outputDetails'][0]['videoDetails']['heightInPx'] playlists = {} for playlist_file_path in event['detail']['outputGroupDetails'][0]['playlistFilePaths']: playlists[os.path.splitext(playlist_file_path)[1]] = os.path.basename(playlist_file_path) print({ 'duration_ms': duration_ms, 'width_px': width_px, 'height_px': height_px, 'playlists': playlists }) # DynamoDBから出力情報取得 response = table_to_output.get_item( Key = { 'JobId': event['detail']['jobId'] } ) print(response) output_path = response['Item']['OutputPath'] # S3にkeyファイル作成 print(s3Client.put_object( Bucket = OUTPUT_BUCKET, Key = output_path + response['Item']['KeyFile'], Body = codecs.decode(response['Item']['KeyValue'], 'hex_codec') )) # DynamoDBにステータスと動画の情報保存 Tips: フロントに成功を返却するのに使用 print(table_to_status.update_item( Key = { 'JobId': event['detail']['jobId'] }, UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,' + ' DurationMs = :duration_ms, WidthPx = :width_px, HeightPx = :height_px, OutputPath = :output_path, Playlists = :playlists,' + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', ExpressionAttributeValues = { ':job_status': 'COMPLETE', ':progress_rate': 100, ':duration_ms': duration_ms, ':width_px': width_px, ':height_px': height_px, ':output_path': output_path, ':playlists': playlists, ':creation_time': creation_time, ':expiration_time': expiration_time } )) # S3に情報ファイル作成 Tips: 動画の情報を永続化。DB等に保存して配信管理した方が良い print(s3Client.put_object( Bucket = OUTPUT_BUCKET, Key = output_path + INFO_FILE, Body = json.dumps({ 'duration_ms': duration_ms, 'width_px': width_px, 'height_px': height_px, 'playlists': playlists }) )) def error(event, creation_time, expiration_time): error_code = event['detail']['errorCode'] error_message = re.sub('s3://[a-z0-9.-]*/', '', event['detail']['errorMessage']) print({ 'error_code': error_code, 'error_message': error_message }) # DynamoDBにステータスとエラー情報保存 Tips: フロントにエラーを返却するのに使用 print(table_to_status.update_item( Key = { 'JobId': event['detail']['jobId'] }, UpdateExpression = 'set JobStatus = :job_status, ProgressRate = :progress_rate,' + ' ErrorCode = :error_code, ErrorMessage = :error_message,' + ' CreationTime = :creation_time, ExpirationTime = :expiration_time', ExpressionAttributeValues = { ':job_status': 'ERROR', ':progress_rate': 100, ':error_code': error_code, ':error_message': error_message, ':creation_time': creation_time, ':expiration_time': expiration_time } ))
エラーメッセージのS3のパスをフロントに表示したくないので、s3://xxx/を空に変更しています。
他にも見せたくない情報があるかは不明なので随時対応。
Statusは予約語で使えなかったので、JobStatusにしています。予約語めっちゃ多い。
DynamoDB の予約語 – Amazon DynamoDB
Response { "errorMessage": "An error occurred (ValidationException) when calling the UpdateItem operation: Invalid UpdateExpression: Attribute name is a reserved keyword; reserved keyword: Status", "errorType": "ClientError",
環境変数
手順は、MediaConvertでHLSをAESで暗号化してみる 参照
OUTPUT_BUCKET: 変換後の動画や鍵を置くバケット
上で設定しているIAMの権限が足りないと下記エラーになる。
Response { "errorMessage": "An error occurred (AccessDenied) when calling the PutObject operation: Access Denied", "errorType": "ClientError",
デプロイしてテスト・動作確認
対象のS3に動画ファイルをアップロード(またはコピー)
createMediaConvertJobのテストイベントを実行しても、
receiveMediaConvertJobStateも呼ばれる。
正常に通れば、MediaConvertのジョブに表示される
https://ap-northeast-1.console.aws.amazon.com/mediaconvert/home?region=ap-northeast-1#/jobs/list
上手く行かない場合は、CloudWatch Logsで確認して対応
https://ap-northeast-1.console.aws.amazon.com/cloudwatch/home?region=ap-northeast-1#logsV2:log-groups
receiveMediaConvertJobStateのテストイベント
デバッグで使ったテストイベントを記載しておきます。
STATUS_UPDATE
{ "detail": { "jobId": "123456789012-smb6o7", "status": "STATUS_UPDATE", "jobProgress": { "jobPercentComplete": 70 } } }
COMPLETE
{ "detail": { "jobId": "123456789012-smb6o7", "status": "COMPLETE", "outputGroupDetails": [ { "outputDetails": [ { "durationInMs": 596458, "videoDetails": { "widthInPx": 1280, "heightInPx": 720 } } ], "playlistFilePaths": [ "s3://example-bucket/Big Buck Bunny.test.mp4_test/Big Buck Bunny.test.m3u8" ] } ] } }
MediaConvertJobIdToOutputに上記のjobIdを作らないとエラーになります。
Response { "errorMessage": "'Item'", "errorType": "KeyError",
ERROR
{ "detail": { "jobId": "123456789012-smb6o7", "status": "ERROR", "errorCode": 1010, "errorMessage": "Unable to open input file [s3://example-bucket/Big Buck Bunny.mp4]: [Failed probe/open: [The version of the manifest file at this URL is more recent than we support: [s3://example-bucket/Big Buck Bunny.mp4]. We support versions up to [4].]]" } }
DynamoDBのTTL有効化
先に設定しても問題ないですが、対象をプレビューで確認できるので、テーブルに値が入ってから設定しました。
https://ap-northeast-1.console.aws.amazon.com/dynamodbv2/home?region=ap-northeast-1#tables
作成した3つのテーブル毎に設定
Time to Live (TTL)の有効化 TTL設定 TTL属性名: ExpirationTime プレビュー [プレビューの実行] [TTLの有効化]
削除は期限切れになったらすぐにされる訳ではないようです。
仕組み: DynamoDB の有効期限 (TTL) – Amazon DynamoDB
通常、TTL は期限が切れた項目を期限切れから 48 時間以内に削除します。
続き
長くなったので、フロントにステータスを返す所は次回書きます。
→ API Gateway+LambdaでDynamoDB(MediaConvertの情報)を返す
あと、試しながら作った為、手順が散らばっているので、まとめの記事も別途下記予定です。
“EventBridgeでMediaConvertの進捗率を取得する” に対して2件のコメントがあります。