CloudFrontのリアルタイムログを試すで、Kinesis Data Streamsにログを流す所までやりましたが、今度はKinesis Data Analyticsで、Kinesis Data Streamsのストリームデータを参照して、一定時間のURI毎のリクエスト回数をリアルタイムに表示してみました。

Kinesis Data Analyticsでの集計結果をElasticsearchやRedshiftに送ると永続化できそうですが、それはまた次の機会に。
できれば、最近お気に入りのGoogleデータポータルで表示させたいですね。

Kinesis Data Analyticsの設定

Kinesis Data Streamsがバージニア北部にあるので、同じ所にする必要があります。

アプリケーションを作成

https://console.aws.amazon.com/kinesisanalytics/home?region=us-east-1#/gettingstarted
(サービス -> Kinesis -> バージニア北部)

[アプリケーションを作成]
Kinesis Data Analytics アプリケーション
	アプリケーション名:(ドメイン名)+「_SQL」	※何でも良さそうだけど、後で解りやすい名前にする
	[アプリケーションを作成]

ストリーミングデータを接続

[ストリーミングデータを接続]
ストリーミングデータソースを接続
	●ソースを選択	※デフォルト
	ソース: ●Kinesisデータストリーム
	Kinesisデータストリーム:(ドメイン名)	※対象のデータストリームを選択
	[スキーマを検出]


スキーマを編集

自動検出だと上手く行かないので、手動でスキーマを定義します。
また、ストリームがないと設定できない為、対象のURLを叩きながら設定します。

[スキーマを編集]
	列の区切り記号:, -> \t	※カンマ区切りをタブ区切りに変更
	[列の順序]	[列名]		[列のタイプ]
	1		c-timestamp	VARCHAR Length: 14
		※timestampは予約語の為、使えない。TIMESTAMPだと上手く認識出来なかったので、VARCHARを使用
	2		c-ip		VARCHAR Length: 15
	3		sc-status	INTEGER
	4		cs-method	VARCHAR Length: 6
	5		cs-uri-stem	VARCHAR Length: 2083	※IEでのURL最大値
	6		cs-user-agent	VARCHAR Length: 4096	※不明な為、2の12乗にしてみました
	7		cs-referer	VARCHAR Length: 2083	※IEでのURL最大値
	[スキーマの保存とストリームサンプルの更新]
※問題なく表示されたら、
	[終了(完了)]

リアルタイム分析

[Go to SQL editor]

ストリーミングSQLの実用的なサンプル

  • DESTINATION_SQL_STREAM: アクセスが来たタイミングで取得。このページでの確認用で、その後の利用用途はありません
  • DESTINATION_SQL_STREAM_URI_COUNT: HTTPステータス200のリクエストを15秒間貯めて、集計した結果を取得
  • DESTINATION_SQL_STREAM_STATUS_ERROR: HTTPステータス200以外のリクエストが来たタイミングで取得
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
	"c-timestamp" VARCHAR(14),
	"c-ip" VARCHAR(15),
	"sc-status" INTEGER,
	"cs-method" VARCHAR(6),
	"cs-uri-stem" VARCHAR(2083),
	"cs-user-agent" VARCHAR(4096),
	"cs-referer" VARCHAR(2083)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
	"c-timestamp",
	"c-ip",
	"sc-status",
	"cs-method",
	"cs-uri-stem",
	"cs-user-agent",
	"cs-referer"
FROM
    "SOURCE_SQL_STREAM_001";

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_URI_COUNT" (
	"aggregation_time" TIMESTAMP,
	"cs-uri-stem" VARCHAR(2083),
	"cs-uri-stem_count" INTEGER
);
CREATE OR REPLACE PUMP "STREAM_PUMP_URI_COUNT" AS INSERT INTO "DESTINATION_SQL_STREAM_URI_COUNT"
SELECT STREAM
	ROWTIME AS "aggregation_time",
	"cs-uri-stem",
	COUNT(*) AS "cs-uri-stem_count"
FROM
    "SOURCE_SQL_STREAM_001"
WHERE
    "sc-status" = 200
GROUP BY
    "cs-uri-stem",
    FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 15 TO SECOND);

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_STATUS_ERROR" (
	"c-timestamp" VARCHAR(14),
	"c-ip" VARCHAR(15),
	"sc-status" INTEGER,
	"cs-method" VARCHAR(6),
	"cs-uri-stem" VARCHAR(2083),
	"cs-user-agent" VARCHAR(4096),
	"cs-referer" VARCHAR(2083)
);
CREATE OR REPLACE PUMP "STREAM_PUMP_STATUS_ERROR" AS INSERT INTO "DESTINATION_SQL_STREAM_STATUS_ERROR"
SELECT STREAM
	"c-timestamp",
	"c-ip",
	"sc-status",
	"cs-method",
	"cs-uri-stem",
	"cs-user-agent",
	"cs-referer"
FROM
    "SOURCE_SQL_STREAM_001"
WHERE
    "sc-status" <> 200;
[SQLを保存して実行]
(●DESTINATION_SQL_STREAM)
※ストリームがないと表示されない為、対象のURLを叩いて待つ。

●DESTINATION_SQL_STREAM_URI_COUNT
※ストリームがないと表示されない為、対象のURLを叩いて待つ。

●DESTINATION_SQL_STREAM_STATUS_ERROR
※ストリームがないと表示されない為、対象のURLを叩いて待つ。

これで、ストリーミングデータの集計は完了です。
ただ、リアルタイムに集計して捨てている状態なので、次回はElasticsearchへの連携について書いて行きます。

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です