統合タイプHTTPで指定したURLで、$connect/$disconnect/$defaultのリクエストを受け取って、実装済みのAction Cableともメッセージのやり取りが出来るように実装します。
フロントはNuxtで実装したので、次回、書きます。
先にまとめ
・リクエストオリジンのURLを制限するとセキュアに出来る。
但し、設定しても接続時しか取得できない。
・Action Cableでは購読チャネルに送れば終わりだけど、
API Gatewayでは接続しているコネクションIDに送る必要がある。
・Aws::ApiGatewayManagementApiを使用する。
・サーバー側で切断する場合は、レスポンス返せなくなるので、
非同期で少し待ってから実施した方が良さそう。
API Gatewayの設定
API Gateway(WebSocket API)の統合タイプHTTPを試す で設定した環境を使います。
追加でoriginもヘッダーに追加されるように設定して、Action Cableのホワイトリストチェック(allowed_request_origins)と同じようにリクエスト元サイトを制限するようにします。
$connect以外は設定しても空になります(接続時以外はsocket内の通信だからでしょうね)
% aws apigatewayv2 get-integrations --api-id etg4aa87d6 → $connectのIntegrationIdを確認 connectionIdは設定済みの為、originのみ URLは画面で変更した為、割愛 % aws apigatewayv2 update-integration --api-id etg4aa87d6 \ --integration-id t3j407f \ --request-parameters 'integration.request.header.origin'='context.origin' { "ConnectionType": "INTERNET", "IntegrationId": "t3j407f", "IntegrationMethod": "POST", "IntegrationType": "HTTP_PROXY", "IntegrationUri": "https://railsapp.nightonly.com/ws_apigateway/default.json", "PassthroughBehavior": "WHEN_NO_MATCH", "PayloadFormatVersion": "1.0", "RequestParameters": { "integration.request.header.origin": "context.origin", "integration.request.header.connectionId": "context.connectionId" }, "TimeoutInMillis": 29000 }
Gem追加
AWSのGem一覧は下記に記載されています。
今回は、Aws::ApiGatewayManagementApiを使用します。
GitHub – aws/aws-sdk-ruby: The official AWS SDK for Ruby.
Gemfile の最後に追加
# Use AWS SDK gem 'aws-sdk-apigatewaymanagementapi'
% bundle install
自動生成
リクエストを受け取るコントローラーとconnectionIdを保存するカラムを作る
% rails g controller ws_apigateway % rails g migration AddConnectionIdToWsTokens
メッセージ送信とコネクション削除を非同期で行う為、ジョブを用意
% rails g job message::broadcast % rails g job message::delete_connection
カラム追加
今回は、下記で作成したws_tokensにconnectionIdを保存するカラムを追加します。
WebSocket(Action Cable)の認証方法を考えて実装する
WebSocket(Action Cable)の認証等の結果通知方法を考えて実装する
db/migrate/20220312024426_add_connection_id_to_ws_tokens.rb
class AddConnectionIdToWsTokens < ActiveRecord::Migration[6.1] def change + change_column_comment :ws_tokens, :request_id, 'リクエストID(Action Cable)' + + add_column :ws_tokens, :connection_id, :string, comment: '接続ID(API Gateway)' + add_index :ws_tokens, :connection_id, unique: true, name: 'index_ws_tokens1' + add_index :ws_tokens, [:connection_id, :last_status, :user_id], name: 'index_ws_tokens2' end end
config/locales/ja.yml
ja: activerecord: attributes: ws_token: code: 'コード' user: 'ユーザー' auth_successed_at: '認証成功日時' last_auth_successed_at: '最終認証成功日時' last_status: '最終ステータス' - request_id: 'リクエストID' + request_id: 'リクエストID(Action Cable)' + connection_id: '接続ID(API Gateway)'
ルート追加
config/routes.rb
Rails.application.routes.draw do + draw :ws_apigateway
config/routes/ws_apigateway.rb を作成
Rails.application.routes.draw do defaults format: :json do post 'ws_apigateway/connect', to: 'ws_apigateway#connect', as: 'connect_ws_apigateway' post 'ws_apigateway/disconnect', to: 'ws_apigateway#disconnect', as: 'disconnect_ws_apigateway' post 'ws_apigateway/default', to: 'ws_apigateway#default', as: 'default_ws_apigateway' end end
コントローラーを実装
app/controllers/ws_apigateway_controller.rb
class WsApigatewayController < ApplicationController include MessageConcern
ConcernでAction Cableと処理を共通化
skip_before_action :verify_authenticity_token
CSRFトークンチェックをスキップ
before_action :check_origin, only: %i[connect] # Tips: 設定しても接続時しか取得できない before_action :check_connectionid # Tips: API Gatewayでrequest-parametersの設定必要 before_action :auth_request, only: %i[default] # POST /ws_apigateway/connect(.json) WebSocket接続(処理) def connect logger.debug('==== connect') render json: { action: 'connect', success: true } end # POST /ws_apigateway/disconnect(.json) WebSocket切断(処理) def disconnect logger.debug('==== disconnect') ws_token = WsToken.find_by(connection_id: @connection_id) return render json: { action: 'disconnect', success: true, alert: t('alert.ws_apigateway.connection_id.notfound') } if ws_token.blank? # Tips: 成功を返す ws_token.update!(last_status: 'unsubscribed') render json: { action: 'disconnect', success: true } end # POST /ws_apigateway/default(.json) WebSocketアクション(処理) def default logger.debug("==== default: #{params['method']}") case params['method'] when 'auth_request' render json: { action: 'auth_result', success: true, data: params } when 'get_messages' common_get_messages(params, nil, @connection_id) when 'send_message' common_send_message(params, nil, @connection_id) else head :not_found end end private # オリジンURLチェック def check_origin logger.debug("==== check_origin: #{request.headers[:HTTP_ORIGIN]}") origin = request.headers[:HTTP_ORIGIN] return render './failure', locals: { alert: t('alert.ws_apigateway.origin.blank') }, status: :bad_request if origin.blank? return render './failure', locals: { alert: t('alert.ws_apigateway.origin.invalid') }, status: :unprocessable_entity unless websocket_allowed(origin) end # オリジンが許可されているか返却 def websocket_allowed(origin) Array(Settings['websocket_allowed']).any? { |allowed_origin| allowed_origin.match?(origin) } end
Gemのソースを「allowed_origin === origin」を拝借しましたが、Rubocopに怒られたので「allowed_origin.match?(origin)」に変更。
# コネクションIDチェック def check_connectionid logger.debug("==== check_connectionid: #{request.headers[:HTTP_CONNECTIONID]}") @connection_id = request.headers[:HTTP_CONNECTIONID] return render './failure', locals: { alert: t('alert.ws_apigateway.connection_id.blank') }, status: :bad_request if @connection_id.blank? return render './failure', locals: { alert: t('alert.ws_apigateway.connection_id.invalid') }, status: :unprocessable_entity if @connection_id.length > 255 end # 認証・再認証 def auth_request logger.debug('==== auth_request') @ws_token = WsToken.find_by(connection_id: @connection_id) common_auth_request(params, nil, @connection_id) end end
app/channels/message_channel.rbも変更していますが、割愛します。
コミットログを参照してください。
app/controllers/concerns/message_concern.rb を作成
module MessageConcern extend ActiveSupport::Concern private # 認証・再認証 def common_auth_request(data, connection, connection_id = nil) error, ws_token = check_token(data['token']) if error.present? ws_token = update_ws_token(ws_token, false, error, connection, connection_id) respons_data = { action: 'auth_result', success: false, alert: I18n.t("alert.ws_token.#{error}"), data: data } if connection.present? # Action Cableのみ ActionCable.server.broadcast(@session_channel, respons_data) elsif connection_id.present? # API Gatewayのみ render json: respons_data, status: :unprocessable_entity end Message::DeleteConnectionJob.set(wait: 5.seconds).perform_later(connection, connection_id)
直ぐに切断するとレスポンス返せなくなるので、非同期で少し待ってから切断。
else ws_token = update_ws_token(ws_token, true, 'success', connection, connection_id) if connection.present? # Action Cableのみ add_stream_channel(ws_token.user) if @ws_token.blank? ActionCable.server.broadcast(@session_channel, { action: 'auth_result', success: true, data: data }) end end logger.debug("---- error: #{error}, @ws_token: #{@ws_token.inspect} -> #{ws_token.inspect}") @ws_token = ws_token error.blank? end # メッセージ取得 def common_get_messages(data, connection, connection_id = nil) if data['last_id'].present? last_id = data['last_id'].to_i if last_id <= 0 respons_data = { action: 'get_messages', success: false, alert: I18n.t('alert.message.last_id.invalid'), data: data } if connection.present? # Action Cableのみ ActionCable.server.broadcast(@session_channel, respons_data) elsif connection_id.present? # API Gatewayのみ render json: respons_data, status: :bad_request end return end messages = Message.where(channel: subscribe_channels(@ws_token.user), deleted_at: nil).where(id: (last_id + 1)...) unsent_count = [messages.count - Settings['max_messages_limit'], 0].max messages = messages.eager_load(:user).limit(Settings['max_messages_limit']).reverse else limit = data['limit'].to_i limit = limit <= 0 ? Settings['default_messages_limit'] : [limit, Settings['max_messages_limit']].min messages = Message.where(channel: subscribe_channels(@ws_token.user), deleted_at: nil).eager_load(:user).limit(limit).reverse unsent_count = 0 end messages_data = [] messages.each do |message| messages_data.push(send_message_data(message.user, message)) end respons_data = { action: 'get_messages', success: true, messages: messages_data, unsent_count: unsent_count, data: data } if connection.present? # Action Cableのみ ActionCable.server.broadcast(@session_channel, respons_data) elsif connection_id.present? # API Gatewayのみ render json: respons_data end end # メッセージ送信(受信) def common_send_message(data, connection, connection_id = nil) channel = data['channel'] real_channel = send_real_channel(@ws_token.user, channel) if real_channel.blank? respons_data = { action: 'send_result', success: false, alert: I18n.t('alert.message.channel.not_subscribed'), data: data } if connection.present? # Action Cableのみ ActionCable.server.broadcast(@session_channel, respons_data) elsif connection_id.present? # API Gatewayのみ render json: respons_data end return end message = Message.new(channel: real_channel, body: data['body'], user_id: @ws_token.user_id) message.save! Message::BroadcastJob.perform_later(@ws_token.user, channel, real_channel, message) respons_data = { action: 'send_result', success: true, data: data } if connection.present? # Action Cableのみ ActionCable.server.broadcast(@session_channel, respons_data) elsif connection_id.present? # API Gatewayのみ render json: respons_data end end # トークンチェック def check_token(token) ws_token = nil error = nil if token.blank? error = 'blank' elsif @ws_token.present? && @ws_token.code != token error = 'different' else ws_token = WsToken.where(code: token).eager_load(:user).first if ws_token.blank? error = 'notfound' elsif ws_token.auth_successed_at.blank? && (ws_token.created_at < Time.current - Settings['token_expired_start_minutes'].to_i.minutes) error = 'expired_start' # Tips: 発行後、一定時間以内に開始してない場合は無効 elsif ws_token.created_at < Time.current - Settings['token_expired_hours'].to_i.hours error = 'expired_created' # Tips: 発行後、一定時間以上経過したら無効 end end [error, ws_token] end # トークン情報更新 def update_ws_token(ws_token, auth_success, status, connection, connection_id = nil) return if ws_token.blank? if auth_success ws_token.last_auth_successed_at = Time.current ws_token.auth_successed_at = ws_token.last_auth_successed_at if ws_token.auth_successed_at.blank? end ws_token.last_status = status if connection.present? # Action Cableのみ ws_token.request_id = connection.statistics[:request_id] ws_token.started_at = connection.statistics[:started_at] elsif connection_id.present? && @ws_token.blank? # API Gatewayで、初回のみ ws_token.connection_id = connection_id ws_token.started_at = Time.current end ws_token.save! ws_token end # 購読チャネル追加 def add_stream_channel(user) stream_from 'all_channel' stream_from real_user_channel(user) if user.present? end # 購読チャネル一覧を返却 def subscribe_channels(user) channels = ['all_channel'] channels.push(real_user_channel(user)) if user.present? channels end # 送信チャネルを返却 def send_real_channel(user, channel) return if user.blank? # Tips: 未ログインでは送信させない case channel when 'all_channel' channel when 'user_channel' real_user_channel(user) end end # ユーザーチャネルを返却 def real_user_channel(user) user.present? ? "user_channel_#{user.code}" : nil end # 送信対象のWebSocketトークンを返却 def send_ws_tokens(user, channel) case channel when 'all_channel' WsToken.where.not(connection_id: nil).where(last_status: 'success') when 'user_channel' WsToken.where.not(connection_id: nil).where(last_status: 'success').where(user_id: user.id) end end # 送信メッセージを返却 def send_message_data(user, message) { id: message.id, channel: message.channel, channel_i18n: message.channel_i18n, body: message.body, user: { code: user.code, image_url: { mini: "#{Settings['base_image_url']}#{user.image_url(:mini)}", small: "#{Settings['base_image_url']}#{user.image_url(:small)}", medium: "#{Settings['base_image_url']}#{user.image_url(:medium)}", large: "#{Settings['base_image_url']}#{user.image_url(:large)}", xlarge: "#{Settings['base_image_url']}#{user.image_url(:xlarge)}" }, name: user.name }, created_at: message.created_at, created_at_i18n: I18n.l(message.created_at) } end end
ジョブを実装
app/jobs/message/broadcast_job.rb
class Message::BroadcastJob < ApplicationJob + include MessageConcern
ジョブからもコントローラーのConcernが呼べました。
queue_as :default - def perform(*args) - # Do something later + # メッセージ送信 + def perform(user, channel, real_channel, message) + logger.debug("==== Message::BroadcastJob: #{user.id}, #{channel}, #{real_channel}, #{message}") + data = { action: 'send_message', success: true, messages: [send_message_data(user, message)] } + + # Action Cable # Tips: 購読チャネルに送る + ActionCable.server.broadcast(real_channel, data) + + # API Gateway # Tips: 接続しているコネクションIDに送る + ws_tokens = send_ws_tokens(user, channel) + logger.debug("---- ws_tokens: #{ws_tokens&.ids}") + return if ws_tokens.blank? + + data_json = data.to_json + api_gw = Aws::ApiGatewayManagementApi::Client.new(region: Settings['ws_apigateway_region'], endpoint: Settings['ws_apigateway_endpoint']) + ws_tokens.each do |ws_token| + api_gw.post_to_connection({ connection_id: ws_token.connection_id, data: data_json }) + end end end
Action Cableでの接続でも、API Gatewayでの接続でも送れるように実装
config/settings/development.yml に追加
+ ws_apigateway_region: 'ap-northeast-1' + ws_apigateway_endpoint: 'https://(api-id).execute-api.ap-northeast-1.amazonaws.com/(stage)'
(api-id)と(stage)は環境に合わせて変更してください。regionも
app/jobs/message/delete_connection_job.rb
class Message::DeleteConnectionJob < ApplicationJob - queue_as :default + queue_as :low_priority
切断は急がなないので、lowに下げる。
- def perform(*args) - # Do something later + # WebSocket切断 # Tips: フロント側でリトライされないようにする必要がある + def perform(connection, connection_id) + logger.debug("==== Message::DeleteConnectionJob: #{connection}, #{connection_id}") + + if connection.present? # Action Cableのみ + connection.close + elsif connection_id.present? # API Gatewayのみ + api_gw = Aws::ApiGatewayManagementApi::Client.new(region: Settings['ws_apigateway_region'], endpoint: Settings['ws_apigateway_endpoint']) + api_gw.delete_connection({ connection_id: connection_id }) + end end end
“API Gateway(WebSocket API)のバックエンドをRailsで実装する” に対して1件のコメントがあります。