CloudFrontのリアルタイムログを試すで、Kinesis Data Streamsにログを流す所までやりましたが、今度はKinesis Data Analyticsで、Kinesis Data Streamsのストリームデータを参照して、一定時間のURI毎のリクエスト回数をリアルタイムに表示してみました。
Kinesis Data Analyticsでの集計結果をElasticsearchやRedshiftに送ると永続化できそうですが、それはまた次の機会に。
できれば、最近お気に入りのGoogleデータポータルで表示させたいですね。
Kinesis Data Analyticsの設定
Kinesis Data Streamsがバージニア北部にあるので、同じ所にする必要があります。
アプリケーションを作成
https://console.aws.amazon.com/kinesisanalytics/home?region=us-east-1#/gettingstarted
(サービス -> Kinesis -> バージニア北部)
[アプリケーションを作成] Kinesis Data Analytics アプリケーション アプリケーション名:(ドメイン名)+「_SQL」 ※何でも良さそうだけど、後で解りやすい名前にする [アプリケーションを作成]
ストリーミングデータを接続
[ストリーミングデータを接続] ストリーミングデータソースを接続 ●ソースを選択 ※デフォルト ソース: ●Kinesisデータストリーム Kinesisデータストリーム:(ドメイン名) ※対象のデータストリームを選択 [スキーマを検出]
スキーマを編集
自動検出だと上手く行かないので、手動でスキーマを定義します。
また、ストリームがないと設定できない為、対象のURLを叩きながら設定します。
[スキーマを編集] 列の区切り記号:, -> \t ※カンマ区切りをタブ区切りに変更 [列の順序] [列名] [列のタイプ] 1 c-timestamp VARCHAR Length: 14 ※timestampは予約語の為、使えない。TIMESTAMPだと上手く認識出来なかったので、VARCHARを使用 2 c-ip VARCHAR Length: 15 3 sc-status INTEGER 4 cs-method VARCHAR Length: 6 5 cs-uri-stem VARCHAR Length: 2083 ※IEでのURL最大値 6 cs-user-agent VARCHAR Length: 4096 ※不明な為、2の12乗にしてみました 7 cs-referer VARCHAR Length: 2083 ※IEでのURL最大値 [スキーマの保存とストリームサンプルの更新] ※問題なく表示されたら、 [終了(完了)]
リアルタイム分析
[Go to SQL editor]
ストリーミングSQLの実用的なサンプル
- DESTINATION_SQL_STREAM: アクセスが来たタイミングで取得。このページでの確認用で、その後の利用用途はありません
- DESTINATION_SQL_STREAM_URI_COUNT: HTTPステータス200のリクエストを15秒間貯めて、集計した結果を取得
- DESTINATION_SQL_STREAM_STATUS_ERROR: HTTPステータス200以外のリクエストが来たタイミングで取得
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "c-timestamp" VARCHAR(14), "c-ip" VARCHAR(15), "sc-status" INTEGER, "cs-method" VARCHAR(6), "cs-uri-stem" VARCHAR(2083), "cs-user-agent" VARCHAR(4096), "cs-referer" VARCHAR(2083) ); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "c-timestamp", "c-ip", "sc-status", "cs-method", "cs-uri-stem", "cs-user-agent", "cs-referer" FROM "SOURCE_SQL_STREAM_001"; CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_URI_COUNT" ( "aggregation_time" TIMESTAMP, "cs-uri-stem" VARCHAR(2083), "cs-uri-stem_count" INTEGER ); CREATE OR REPLACE PUMP "STREAM_PUMP_URI_COUNT" AS INSERT INTO "DESTINATION_SQL_STREAM_URI_COUNT" SELECT STREAM ROWTIME AS "aggregation_time", "cs-uri-stem", COUNT(*) AS "cs-uri-stem_count" FROM "SOURCE_SQL_STREAM_001" WHERE "sc-status" = 200 GROUP BY "cs-uri-stem", FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 15 TO SECOND); CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_STATUS_ERROR" ( "c-timestamp" VARCHAR(14), "c-ip" VARCHAR(15), "sc-status" INTEGER, "cs-method" VARCHAR(6), "cs-uri-stem" VARCHAR(2083), "cs-user-agent" VARCHAR(4096), "cs-referer" VARCHAR(2083) ); CREATE OR REPLACE PUMP "STREAM_PUMP_STATUS_ERROR" AS INSERT INTO "DESTINATION_SQL_STREAM_STATUS_ERROR" SELECT STREAM "c-timestamp", "c-ip", "sc-status", "cs-method", "cs-uri-stem", "cs-user-agent", "cs-referer" FROM "SOURCE_SQL_STREAM_001" WHERE "sc-status" <> 200;
[SQLを保存して実行] (●DESTINATION_SQL_STREAM) ※ストリームがないと表示されない為、対象のURLを叩いて待つ。
●DESTINATION_SQL_STREAM_URI_COUNT ※ストリームがないと表示されない為、対象のURLを叩いて待つ。
●DESTINATION_SQL_STREAM_STATUS_ERROR ※ストリームがないと表示されない為、対象のURLを叩いて待つ。
これで、ストリーミングデータの集計は完了です。
ただ、リアルタイムに集計して捨てている状態なので、次回はElasticsearchへの連携について書いて行きます。