統合タイプHTTPで指定したURLで、$connect/$disconnect/$defaultのリクエストを受け取って、実装済みのAction Cableともメッセージのやり取りが出来るように実装します。
フロントはNuxtで実装したので、次回、書きます。
先にまとめ
・リクエストオリジンのURLを制限するとセキュアに出来る。
但し、設定しても接続時しか取得できない。
・Action Cableでは購読チャネルに送れば終わりだけど、
API Gatewayでは接続しているコネクションIDに送る必要がある。
・Aws::ApiGatewayManagementApiを使用する。
・サーバー側で切断する場合は、レスポンス返せなくなるので、
非同期で少し待ってから実施した方が良さそう。
API Gatewayの設定
API Gateway(WebSocket API)の統合タイプHTTPを試す で設定した環境を使います。
追加でoriginもヘッダーに追加されるように設定して、Action Cableのホワイトリストチェック(allowed_request_origins)と同じようにリクエスト元サイトを制限するようにします。
$connect以外は設定しても空になります(接続時以外はsocket内の通信だからでしょうね)
% aws apigatewayv2 get-integrations --api-id etg4aa87d6
→ $connectのIntegrationIdを確認
connectionIdは設定済みの為、originのみ
URLは画面で変更した為、割愛
% aws apigatewayv2 update-integration --api-id etg4aa87d6 \
--integration-id t3j407f \
--request-parameters 'integration.request.header.origin'='context.origin'
{
"ConnectionType": "INTERNET",
"IntegrationId": "t3j407f",
"IntegrationMethod": "POST",
"IntegrationType": "HTTP_PROXY",
"IntegrationUri": "https://railsapp.nightonly.com/ws_apigateway/default.json",
"PassthroughBehavior": "WHEN_NO_MATCH",
"PayloadFormatVersion": "1.0",
"RequestParameters": {
"integration.request.header.origin": "context.origin",
"integration.request.header.connectionId": "context.connectionId"
},
"TimeoutInMillis": 29000
}
Gem追加
AWSのGem一覧は下記に記載されています。
今回は、Aws::ApiGatewayManagementApiを使用します。
GitHub – aws/aws-sdk-ruby: The official AWS SDK for Ruby.
Gemfile の最後に追加
# Use AWS SDK gem 'aws-sdk-apigatewaymanagementapi'
% bundle install
自動生成
リクエストを受け取るコントローラーとconnectionIdを保存するカラムを作る
% rails g controller ws_apigateway % rails g migration AddConnectionIdToWsTokens
メッセージ送信とコネクション削除を非同期で行う為、ジョブを用意
% rails g job message::broadcast % rails g job message::delete_connection
カラム追加
今回は、下記で作成したws_tokensにconnectionIdを保存するカラムを追加します。
WebSocket(Action Cable)の認証方法を考えて実装する
WebSocket(Action Cable)の認証等の結果通知方法を考えて実装する
db/migrate/20220312024426_add_connection_id_to_ws_tokens.rb
class AddConnectionIdToWsTokens < ActiveRecord::Migration[6.1] def change + change_column_comment :ws_tokens, :request_id, 'リクエストID(Action Cable)' + + add_column :ws_tokens, :connection_id, :string, comment: '接続ID(API Gateway)' + add_index :ws_tokens, :connection_id, unique: true, name: 'index_ws_tokens1' + add_index :ws_tokens, [:connection_id, :last_status, :user_id], name: 'index_ws_tokens2' end end
config/locales/ja.yml
ja:
activerecord:
attributes:
ws_token:
code: 'コード'
user: 'ユーザー'
auth_successed_at: '認証成功日時'
last_auth_successed_at: '最終認証成功日時'
last_status: '最終ステータス'
- request_id: 'リクエストID'
+ request_id: 'リクエストID(Action Cable)'
+ connection_id: '接続ID(API Gateway)'
ルート追加
config/routes.rb
Rails.application.routes.draw do + draw :ws_apigateway
config/routes/ws_apigateway.rb を作成
Rails.application.routes.draw do
defaults format: :json do
post 'ws_apigateway/connect', to: 'ws_apigateway#connect', as: 'connect_ws_apigateway'
post 'ws_apigateway/disconnect', to: 'ws_apigateway#disconnect', as: 'disconnect_ws_apigateway'
post 'ws_apigateway/default', to: 'ws_apigateway#default', as: 'default_ws_apigateway'
end
end
コントローラーを実装
app/controllers/ws_apigateway_controller.rb
class WsApigatewayController < ApplicationController include MessageConcern
ConcernでAction Cableと処理を共通化
skip_before_action :verify_authenticity_token
CSRFトークンチェックをスキップ
before_action :check_origin, only: %i[connect] # Tips: 設定しても接続時しか取得できない
before_action :check_connectionid # Tips: API Gatewayでrequest-parametersの設定必要
before_action :auth_request, only: %i[default]
# POST /ws_apigateway/connect(.json) WebSocket接続(処理)
def connect
logger.debug('==== connect')
render json: { action: 'connect', success: true }
end
# POST /ws_apigateway/disconnect(.json) WebSocket切断(処理)
def disconnect
logger.debug('==== disconnect')
ws_token = WsToken.find_by(connection_id: @connection_id)
return render json: { action: 'disconnect', success: true, alert: t('alert.ws_apigateway.connection_id.notfound') } if ws_token.blank? # Tips: 成功を返す
ws_token.update!(last_status: 'unsubscribed')
render json: { action: 'disconnect', success: true }
end
# POST /ws_apigateway/default(.json) WebSocketアクション(処理)
def default
logger.debug("==== default: #{params['method']}")
case params['method']
when 'auth_request'
render json: { action: 'auth_result', success: true, data: params }
when 'get_messages'
common_get_messages(params, nil, @connection_id)
when 'send_message'
common_send_message(params, nil, @connection_id)
else
head :not_found
end
end
private
# オリジンURLチェック
def check_origin
logger.debug("==== check_origin: #{request.headers[:HTTP_ORIGIN]}")
origin = request.headers[:HTTP_ORIGIN]
return render './failure', locals: { alert: t('alert.ws_apigateway.origin.blank') }, status: :bad_request if origin.blank?
return render './failure', locals: { alert: t('alert.ws_apigateway.origin.invalid') }, status: :unprocessable_entity unless websocket_allowed(origin)
end
# オリジンが許可されているか返却
def websocket_allowed(origin)
Array(Settings['websocket_allowed']).any? { |allowed_origin| allowed_origin.match?(origin) }
end
Gemのソースを「allowed_origin === origin」を拝借しましたが、Rubocopに怒られたので「allowed_origin.match?(origin)」に変更。
# コネクションIDチェック
def check_connectionid
logger.debug("==== check_connectionid: #{request.headers[:HTTP_CONNECTIONID]}")
@connection_id = request.headers[:HTTP_CONNECTIONID]
return render './failure', locals: { alert: t('alert.ws_apigateway.connection_id.blank') }, status: :bad_request if @connection_id.blank?
return render './failure', locals: { alert: t('alert.ws_apigateway.connection_id.invalid') }, status: :unprocessable_entity if @connection_id.length > 255
end
# 認証・再認証
def auth_request
logger.debug('==== auth_request')
@ws_token = WsToken.find_by(connection_id: @connection_id)
common_auth_request(params, nil, @connection_id)
end
end
app/channels/message_channel.rbも変更していますが、割愛します。
コミットログを参照してください。
app/controllers/concerns/message_concern.rb を作成
module MessageConcern
extend ActiveSupport::Concern
private
# 認証・再認証
def common_auth_request(data, connection, connection_id = nil)
error, ws_token = check_token(data['token'])
if error.present?
ws_token = update_ws_token(ws_token, false, error, connection, connection_id)
respons_data = { action: 'auth_result', success: false, alert: I18n.t("alert.ws_token.#{error}"), data: data }
if connection.present? # Action Cableのみ
ActionCable.server.broadcast(@session_channel, respons_data)
elsif connection_id.present? # API Gatewayのみ
render json: respons_data, status: :unprocessable_entity
end
Message::DeleteConnectionJob.set(wait: 5.seconds).perform_later(connection, connection_id)
直ぐに切断するとレスポンス返せなくなるので、非同期で少し待ってから切断。
else
ws_token = update_ws_token(ws_token, true, 'success', connection, connection_id)
if connection.present? # Action Cableのみ
add_stream_channel(ws_token.user) if @ws_token.blank?
ActionCable.server.broadcast(@session_channel, { action: 'auth_result', success: true, data: data })
end
end
logger.debug("---- error: #{error}, @ws_token: #{@ws_token.inspect} -> #{ws_token.inspect}")
@ws_token = ws_token
error.blank?
end
# メッセージ取得
def common_get_messages(data, connection, connection_id = nil)
if data['last_id'].present?
last_id = data['last_id'].to_i
if last_id <= 0
respons_data = { action: 'get_messages', success: false, alert: I18n.t('alert.message.last_id.invalid'), data: data }
if connection.present? # Action Cableのみ
ActionCable.server.broadcast(@session_channel, respons_data)
elsif connection_id.present? # API Gatewayのみ
render json: respons_data, status: :bad_request
end
return
end
messages = Message.where(channel: subscribe_channels(@ws_token.user), deleted_at: nil).where(id: (last_id + 1)...)
unsent_count = [messages.count - Settings['max_messages_limit'], 0].max
messages = messages.eager_load(:user).limit(Settings['max_messages_limit']).reverse
else
limit = data['limit'].to_i
limit = limit <= 0 ? Settings['default_messages_limit'] : [limit, Settings['max_messages_limit']].min
messages = Message.where(channel: subscribe_channels(@ws_token.user), deleted_at: nil).eager_load(:user).limit(limit).reverse
unsent_count = 0
end
messages_data = []
messages.each do |message|
messages_data.push(send_message_data(message.user, message))
end
respons_data = { action: 'get_messages', success: true, messages: messages_data, unsent_count: unsent_count, data: data }
if connection.present? # Action Cableのみ
ActionCable.server.broadcast(@session_channel, respons_data)
elsif connection_id.present? # API Gatewayのみ
render json: respons_data
end
end
# メッセージ送信(受信)
def common_send_message(data, connection, connection_id = nil)
channel = data['channel']
real_channel = send_real_channel(@ws_token.user, channel)
if real_channel.blank?
respons_data = { action: 'send_result', success: false, alert: I18n.t('alert.message.channel.not_subscribed'), data: data }
if connection.present? # Action Cableのみ
ActionCable.server.broadcast(@session_channel, respons_data)
elsif connection_id.present? # API Gatewayのみ
render json: respons_data
end
return
end
message = Message.new(channel: real_channel, body: data['body'], user_id: @ws_token.user_id)
message.save!
Message::BroadcastJob.perform_later(@ws_token.user, channel, real_channel, message)
respons_data = { action: 'send_result', success: true, data: data }
if connection.present? # Action Cableのみ
ActionCable.server.broadcast(@session_channel, respons_data)
elsif connection_id.present? # API Gatewayのみ
render json: respons_data
end
end
# トークンチェック
def check_token(token)
ws_token = nil
error = nil
if token.blank?
error = 'blank'
elsif @ws_token.present? && @ws_token.code != token
error = 'different'
else
ws_token = WsToken.where(code: token).eager_load(:user).first
if ws_token.blank?
error = 'notfound'
elsif ws_token.auth_successed_at.blank? && (ws_token.created_at < Time.current - Settings['token_expired_start_minutes'].to_i.minutes)
error = 'expired_start' # Tips: 発行後、一定時間以内に開始してない場合は無効
elsif ws_token.created_at < Time.current - Settings['token_expired_hours'].to_i.hours
error = 'expired_created' # Tips: 発行後、一定時間以上経過したら無効
end
end
[error, ws_token]
end
# トークン情報更新
def update_ws_token(ws_token, auth_success, status, connection, connection_id = nil)
return if ws_token.blank?
if auth_success
ws_token.last_auth_successed_at = Time.current
ws_token.auth_successed_at = ws_token.last_auth_successed_at if ws_token.auth_successed_at.blank?
end
ws_token.last_status = status
if connection.present? # Action Cableのみ
ws_token.request_id = connection.statistics[:request_id]
ws_token.started_at = connection.statistics[:started_at]
elsif connection_id.present? && @ws_token.blank? # API Gatewayで、初回のみ
ws_token.connection_id = connection_id
ws_token.started_at = Time.current
end
ws_token.save!
ws_token
end
# 購読チャネル追加
def add_stream_channel(user)
stream_from 'all_channel'
stream_from real_user_channel(user) if user.present?
end
# 購読チャネル一覧を返却
def subscribe_channels(user)
channels = ['all_channel']
channels.push(real_user_channel(user)) if user.present?
channels
end
# 送信チャネルを返却
def send_real_channel(user, channel)
return if user.blank? # Tips: 未ログインでは送信させない
case channel
when 'all_channel'
channel
when 'user_channel'
real_user_channel(user)
end
end
# ユーザーチャネルを返却
def real_user_channel(user)
user.present? ? "user_channel_#{user.code}" : nil
end
# 送信対象のWebSocketトークンを返却
def send_ws_tokens(user, channel)
case channel
when 'all_channel'
WsToken.where.not(connection_id: nil).where(last_status: 'success')
when 'user_channel'
WsToken.where.not(connection_id: nil).where(last_status: 'success').where(user_id: user.id)
end
end
# 送信メッセージを返却
def send_message_data(user, message)
{
id: message.id,
channel: message.channel,
channel_i18n: message.channel_i18n,
body: message.body,
user: {
code: user.code,
image_url: {
mini: "#{Settings['base_image_url']}#{user.image_url(:mini)}",
small: "#{Settings['base_image_url']}#{user.image_url(:small)}",
medium: "#{Settings['base_image_url']}#{user.image_url(:medium)}",
large: "#{Settings['base_image_url']}#{user.image_url(:large)}",
xlarge: "#{Settings['base_image_url']}#{user.image_url(:xlarge)}"
},
name: user.name
},
created_at: message.created_at,
created_at_i18n: I18n.l(message.created_at)
}
end
end
ジョブを実装
app/jobs/message/broadcast_job.rb
class Message::BroadcastJob < ApplicationJob + include MessageConcern
ジョブからもコントローラーのConcernが呼べました。
queue_as :default
- def perform(*args)
- # Do something later
+ # メッセージ送信
+ def perform(user, channel, real_channel, message)
+ logger.debug("==== Message::BroadcastJob: #{user.id}, #{channel}, #{real_channel}, #{message}")
+ data = { action: 'send_message', success: true, messages: [send_message_data(user, message)] }
+
+ # Action Cable # Tips: 購読チャネルに送る
+ ActionCable.server.broadcast(real_channel, data)
+
+ # API Gateway # Tips: 接続しているコネクションIDに送る
+ ws_tokens = send_ws_tokens(user, channel)
+ logger.debug("---- ws_tokens: #{ws_tokens&.ids}")
+ return if ws_tokens.blank?
+
+ data_json = data.to_json
+ api_gw = Aws::ApiGatewayManagementApi::Client.new(region: Settings['ws_apigateway_region'], endpoint: Settings['ws_apigateway_endpoint'])
+ ws_tokens.each do |ws_token|
+ api_gw.post_to_connection({ connection_id: ws_token.connection_id, data: data_json })
+ end
end
end
Action Cableでの接続でも、API Gatewayでの接続でも送れるように実装
config/settings/development.yml に追加
+ ws_apigateway_region: 'ap-northeast-1' + ws_apigateway_endpoint: 'https://(api-id).execute-api.ap-northeast-1.amazonaws.com/(stage)'
(api-id)と(stage)は環境に合わせて変更してください。regionも
app/jobs/message/delete_connection_job.rb
class Message::DeleteConnectionJob < ApplicationJob - queue_as :default + queue_as :low_priority
切断は急がなないので、lowに下げる。
- def perform(*args)
- # Do something later
+ # WebSocket切断 # Tips: フロント側でリトライされないようにする必要がある
+ def perform(connection, connection_id)
+ logger.debug("==== Message::DeleteConnectionJob: #{connection}, #{connection_id}")
+
+ if connection.present? # Action Cableのみ
+ connection.close
+ elsif connection_id.present? # API Gatewayのみ
+ api_gw = Aws::ApiGatewayManagementApi::Client.new(region: Settings['ws_apigateway_region'], endpoint: Settings['ws_apigateway_endpoint'])
+ api_gw.delete_connection({ connection_id: connection_id })
+ end
end
end

“API Gateway(WebSocket API)のバックエンドをRailsで実装する” に対して1件のコメントがあります。