CloudFrontのリアルタイムログを試すで、Kinesis Data Streamsにログを流す所までやりましたが、今度はKinesis Data Analyticsで、Kinesis Data Streamsのストリームデータを参照して、一定時間のURI毎のリクエスト回数をリアルタイムに表示してみました。
Kinesis Data Analyticsでの集計結果をElasticsearchやRedshiftに送ると永続化できそうですが、それはまた次の機会に。
できれば、最近お気に入りのGoogleデータポータルで表示させたいですね。
Kinesis Data Analyticsの設定
Kinesis Data Streamsがバージニア北部にあるので、同じ所にする必要があります。
アプリケーションを作成
https://console.aws.amazon.com/kinesisanalytics/home?region=us-east-1#/gettingstarted
(サービス -> Kinesis -> バージニア北部)
[アプリケーションを作成] Kinesis Data Analytics アプリケーション アプリケーション名:(ドメイン名)+「_SQL」 ※何でも良さそうだけど、後で解りやすい名前にする [アプリケーションを作成]


ストリーミングデータを接続
[ストリーミングデータを接続] ストリーミングデータソースを接続 ●ソースを選択 ※デフォルト ソース: ●Kinesisデータストリーム Kinesisデータストリーム:(ドメイン名) ※対象のデータストリームを選択 [スキーマを検出]



スキーマを編集
自動検出だと上手く行かないので、手動でスキーマを定義します。
また、ストリームがないと設定できない為、対象のURLを叩きながら設定します。
[スキーマを編集] 列の区切り記号:, -> \t ※カンマ区切りをタブ区切りに変更 [列の順序] [列名] [列のタイプ] 1 c-timestamp VARCHAR Length: 14 ※timestampは予約語の為、使えない。TIMESTAMPだと上手く認識出来なかったので、VARCHARを使用 2 c-ip VARCHAR Length: 15 3 sc-status INTEGER 4 cs-method VARCHAR Length: 6 5 cs-uri-stem VARCHAR Length: 2083 ※IEでのURL最大値 6 cs-user-agent VARCHAR Length: 4096 ※不明な為、2の12乗にしてみました 7 cs-referer VARCHAR Length: 2083 ※IEでのURL最大値 [スキーマの保存とストリームサンプルの更新] ※問題なく表示されたら、 [終了(完了)]



リアルタイム分析
[Go to SQL editor]


ストリーミングSQLの実用的なサンプル
- DESTINATION_SQL_STREAM: アクセスが来たタイミングで取得。このページでの確認用で、その後の利用用途はありません
- DESTINATION_SQL_STREAM_URI_COUNT: HTTPステータス200のリクエストを15秒間貯めて、集計した結果を取得
- DESTINATION_SQL_STREAM_STATUS_ERROR: HTTPステータス200以外のリクエストが来たタイミングで取得
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"c-timestamp" VARCHAR(14),
"c-ip" VARCHAR(15),
"sc-status" INTEGER,
"cs-method" VARCHAR(6),
"cs-uri-stem" VARCHAR(2083),
"cs-user-agent" VARCHAR(4096),
"cs-referer" VARCHAR(2083)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
"c-timestamp",
"c-ip",
"sc-status",
"cs-method",
"cs-uri-stem",
"cs-user-agent",
"cs-referer"
FROM
"SOURCE_SQL_STREAM_001";
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_URI_COUNT" (
"aggregation_time" TIMESTAMP,
"cs-uri-stem" VARCHAR(2083),
"cs-uri-stem_count" INTEGER
);
CREATE OR REPLACE PUMP "STREAM_PUMP_URI_COUNT" AS INSERT INTO "DESTINATION_SQL_STREAM_URI_COUNT"
SELECT STREAM
ROWTIME AS "aggregation_time",
"cs-uri-stem",
COUNT(*) AS "cs-uri-stem_count"
FROM
"SOURCE_SQL_STREAM_001"
WHERE
"sc-status" = 200
GROUP BY
"cs-uri-stem",
FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 15 TO SECOND);
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_STATUS_ERROR" (
"c-timestamp" VARCHAR(14),
"c-ip" VARCHAR(15),
"sc-status" INTEGER,
"cs-method" VARCHAR(6),
"cs-uri-stem" VARCHAR(2083),
"cs-user-agent" VARCHAR(4096),
"cs-referer" VARCHAR(2083)
);
CREATE OR REPLACE PUMP "STREAM_PUMP_STATUS_ERROR" AS INSERT INTO "DESTINATION_SQL_STREAM_STATUS_ERROR"
SELECT STREAM
"c-timestamp",
"c-ip",
"sc-status",
"cs-method",
"cs-uri-stem",
"cs-user-agent",
"cs-referer"
FROM
"SOURCE_SQL_STREAM_001"
WHERE
"sc-status" <> 200;
[SQLを保存して実行] (●DESTINATION_SQL_STREAM) ※ストリームがないと表示されない為、対象のURLを叩いて待つ。

●DESTINATION_SQL_STREAM_URI_COUNT ※ストリームがないと表示されない為、対象のURLを叩いて待つ。

●DESTINATION_SQL_STREAM_STATUS_ERROR ※ストリームがないと表示されない為、対象のURLを叩いて待つ。

これで、ストリーミングデータの集計は完了です。
ただ、リアルタイムに集計して捨てている状態なので、次回はElasticsearchへの連携について書いて行きます。
