統合タイプHTTPで指定したURLで、$connect/$disconnect/$defaultのリクエストを受け取って、実装済みのAction Cableともメッセージのやり取りが出来るように実装します。
フロントはNuxtで実装したので、次回、書きます。

先にまとめ

・リクエストオリジンのURLを制限するとセキュアに出来る。
 但し、設定しても接続時しか取得できない。
・Action Cableでは購読チャネルに送れば終わりだけど、
 API Gatewayでは接続しているコネクションIDに送る必要がある。
・Aws::ApiGatewayManagementApiを使用する。
・サーバー側で切断する場合は、レスポンス返せなくなるので、
 非同期で少し待ってから実施した方が良さそう。

API Gatewayの設定

API Gateway(WebSocket API)の統合タイプHTTPを試す で設定した環境を使います。
追加でoriginもヘッダーに追加されるように設定して、Action Cableのホワイトリストチェック(allowed_request_origins)と同じようにリクエスト元サイトを制限するようにします。
$connect以外は設定しても空になります(接続時以外はsocket内の通信だからでしょうね)

% aws apigatewayv2 get-integrations --api-id etg4aa87d6
→ $connectのIntegrationIdを確認

connectionIdは設定済みの為、originのみ
URLは画面で変更した為、割愛
% aws apigatewayv2 update-integration --api-id etg4aa87d6 \
    --integration-id t3j407f \
    --request-parameters 'integration.request.header.origin'='context.origin'
{
    "ConnectionType": "INTERNET",
    "IntegrationId": "t3j407f",
    "IntegrationMethod": "POST",
    "IntegrationType": "HTTP_PROXY",
    "IntegrationUri": "https://railsapp.nightonly.com/ws_apigateway/default.json",
    "PassthroughBehavior": "WHEN_NO_MATCH",
    "PayloadFormatVersion": "1.0",
    "RequestParameters": {
        "integration.request.header.origin": "context.origin",
        "integration.request.header.connectionId": "context.connectionId"
    },
    "TimeoutInMillis": 29000
}

Gem追加

AWSのGem一覧は下記に記載されています。
今回は、Aws::ApiGatewayManagementApiを使用します。
GitHub – aws/aws-sdk-ruby: The official AWS SDK for Ruby.

Gemfile の最後に追加

# Use AWS SDK
gem 'aws-sdk-apigatewaymanagementapi'
% bundle install

自動生成

リクエストを受け取るコントローラーとconnectionIdを保存するカラムを作る

% rails g controller ws_apigateway
% rails g migration AddConnectionIdToWsTokens

メッセージ送信とコネクション削除を非同期で行う為、ジョブを用意

% rails g job message::broadcast
% rails g job message::delete_connection

カラム追加

今回は、下記で作成したws_tokensにconnectionIdを保存するカラムを追加します。
WebSocket(Action Cable)の認証方法を考えて実装する
WebSocket(Action Cable)の認証等の結果通知方法を考えて実装する

db/migrate/20220312024426_add_connection_id_to_ws_tokens.rb

class AddConnectionIdToWsTokens < ActiveRecord::Migration[6.1]
  def change
+    change_column_comment :ws_tokens, :request_id, 'リクエストID(Action Cable)'
+
+    add_column :ws_tokens, :connection_id, :string, comment: '接続ID(API Gateway)'
+    add_index :ws_tokens, :connection_id, unique: true, name: 'index_ws_tokens1'
+    add_index :ws_tokens, [:connection_id, :last_status, :user_id], name: 'index_ws_tokens2'
  end
end

config/locales/ja.yml

ja:
  activerecord:
    attributes:
      ws_token:
        code: 'コード'
        user: 'ユーザー'
        auth_successed_at: '認証成功日時'
        last_auth_successed_at: '最終認証成功日時'
        last_status: '最終ステータス'
-        request_id: 'リクエストID'
+        request_id: 'リクエストID(Action Cable)'
+        connection_id: '接続ID(API Gateway)'

ルート追加

config/routes.rb

Rails.application.routes.draw do
+  draw :ws_apigateway

config/routes/ws_apigateway.rb を作成

Rails.application.routes.draw do
  defaults format: :json do
    post 'ws_apigateway/connect',    to: 'ws_apigateway#connect',    as: 'connect_ws_apigateway'
    post 'ws_apigateway/disconnect', to: 'ws_apigateway#disconnect', as: 'disconnect_ws_apigateway'
    post 'ws_apigateway/default',    to: 'ws_apigateway#default',    as: 'default_ws_apigateway'
  end
end

コントローラーを実装

app/controllers/ws_apigateway_controller.rb

class WsApigatewayController < ApplicationController
  include MessageConcern

ConcernでAction Cableと処理を共通化

  skip_before_action :verify_authenticity_token

CSRFトークンチェックをスキップ

  before_action :check_origin, only: %i[connect] # Tips: 設定しても接続時しか取得できない
  before_action :check_connectionid # Tips: API Gatewayでrequest-parametersの設定必要
  before_action :auth_request, only: %i[default]

  # POST /ws_apigateway/connect(.json) WebSocket接続(処理)
  def connect
    logger.debug('==== connect')
    render json: { action: 'connect', success: true }
  end

  # POST /ws_apigateway/disconnect(.json) WebSocket切断(処理)
  def disconnect
    logger.debug('==== disconnect')

    ws_token = WsToken.find_by(connection_id: @connection_id)
    return render json: { action: 'disconnect', success: true, alert: t('alert.ws_apigateway.connection_id.notfound') } if ws_token.blank? # Tips: 成功を返す

    ws_token.update!(last_status: 'unsubscribed')
    render json: { action: 'disconnect', success: true }
  end

  # POST /ws_apigateway/default(.json) WebSocketアクション(処理)
  def default
    logger.debug("==== default: #{params['method']}")

    case params['method']
    when 'auth_request'
      render json: { action: 'auth_result', success: true, data: params }
    when 'get_messages'
      common_get_messages(params, nil, @connection_id)
    when 'send_message'
      common_send_message(params, nil, @connection_id)
    else
      head :not_found
    end
  end

  private

  # オリジンURLチェック
  def check_origin
    logger.debug("==== check_origin: #{request.headers[:HTTP_ORIGIN]}")

    origin = request.headers[:HTTP_ORIGIN]
    return render './failure', locals: { alert: t('alert.ws_apigateway.origin.blank') }, status: :bad_request if origin.blank?
    return render './failure', locals: { alert: t('alert.ws_apigateway.origin.invalid') }, status: :unprocessable_entity unless websocket_allowed(origin)
  end

  # オリジンが許可されているか返却
  def websocket_allowed(origin)
    Array(Settings['websocket_allowed']).any? { |allowed_origin| allowed_origin.match?(origin) }
  end

Gemのソースを「allowed_origin === origin」を拝借しましたが、Rubocopに怒られたので「allowed_origin.match?(origin)」に変更。

  # コネクションIDチェック
  def check_connectionid
    logger.debug("==== check_connectionid: #{request.headers[:HTTP_CONNECTIONID]}")

    @connection_id = request.headers[:HTTP_CONNECTIONID]
    return render './failure', locals: { alert: t('alert.ws_apigateway.connection_id.blank') }, status: :bad_request if @connection_id.blank?
    return render './failure', locals: { alert: t('alert.ws_apigateway.connection_id.invalid') }, status: :unprocessable_entity if @connection_id.length > 255
  end

  # 認証・再認証
  def auth_request
    logger.debug('==== auth_request')

    @ws_token = WsToken.find_by(connection_id: @connection_id)
    common_auth_request(params, nil, @connection_id)
  end
end

app/channels/message_channel.rbも変更していますが、割愛します。
コミットログを参照してください。

app/controllers/concerns/message_concern.rb を作成

module MessageConcern
  extend ActiveSupport::Concern

  private

  # 認証・再認証
  def common_auth_request(data, connection, connection_id = nil)
    error, ws_token = check_token(data['token'])
    if error.present?
      ws_token = update_ws_token(ws_token, false, error, connection, connection_id)

      respons_data = { action: 'auth_result', success: false, alert: I18n.t("alert.ws_token.#{error}"), data: data }
      if connection.present? # Action Cableのみ
        ActionCable.server.broadcast(@session_channel, respons_data)
      elsif connection_id.present? # API Gatewayのみ
        render json: respons_data, status: :unprocessable_entity
      end
      Message::DeleteConnectionJob.set(wait: 5.seconds).perform_later(connection, connection_id)

直ぐに切断するとレスポンス返せなくなるので、非同期で少し待ってから切断。

    else
      ws_token = update_ws_token(ws_token, true, 'success', connection, connection_id)

      if connection.present? # Action Cableのみ
        add_stream_channel(ws_token.user) if @ws_token.blank?
        ActionCable.server.broadcast(@session_channel, { action: 'auth_result', success: true, data: data })
      end
    end

    logger.debug("---- error: #{error}, @ws_token: #{@ws_token.inspect} -> #{ws_token.inspect}")
    @ws_token = ws_token

    error.blank?
  end

  # メッセージ取得
  def common_get_messages(data, connection, connection_id = nil)
    if data['last_id'].present?
      last_id = data['last_id'].to_i
      if last_id <= 0
        respons_data = { action: 'get_messages', success: false, alert: I18n.t('alert.message.last_id.invalid'), data: data }
        if connection.present? # Action Cableのみ
          ActionCable.server.broadcast(@session_channel, respons_data)
        elsif connection_id.present? # API Gatewayのみ
          render json: respons_data, status: :bad_request
        end

        return
      end

      messages = Message.where(channel: subscribe_channels(@ws_token.user), deleted_at: nil).where(id: (last_id + 1)...)
      unsent_count = [messages.count - Settings['max_messages_limit'], 0].max
      messages = messages.eager_load(:user).limit(Settings['max_messages_limit']).reverse
    else
      limit = data['limit'].to_i
      limit = limit <= 0 ? Settings['default_messages_limit'] : [limit, Settings['max_messages_limit']].min
      messages = Message.where(channel: subscribe_channels(@ws_token.user), deleted_at: nil).eager_load(:user).limit(limit).reverse
      unsent_count = 0
    end

    messages_data = []
    messages.each do |message|
      messages_data.push(send_message_data(message.user, message))
    end
    respons_data = { action: 'get_messages', success: true, messages: messages_data, unsent_count: unsent_count, data: data }
    if connection.present? # Action Cableのみ
      ActionCable.server.broadcast(@session_channel, respons_data)
    elsif connection_id.present? # API Gatewayのみ
      render json: respons_data
    end
  end

  # メッセージ送信(受信)
  def common_send_message(data, connection, connection_id = nil)
    channel = data['channel']
    real_channel = send_real_channel(@ws_token.user, channel)
    if real_channel.blank?
      respons_data = { action: 'send_result', success: false, alert: I18n.t('alert.message.channel.not_subscribed'), data: data }
      if connection.present? # Action Cableのみ
        ActionCable.server.broadcast(@session_channel, respons_data)
      elsif connection_id.present? # API Gatewayのみ
        render json: respons_data
      end

      return
    end

    message = Message.new(channel: real_channel, body: data['body'], user_id: @ws_token.user_id)
    message.save!

    Message::BroadcastJob.perform_later(@ws_token.user, channel, real_channel, message)
    respons_data = { action: 'send_result', success: true, data: data }
    if connection.present? # Action Cableのみ
      ActionCable.server.broadcast(@session_channel, respons_data)
    elsif connection_id.present? # API Gatewayのみ
      render json: respons_data
    end
  end

  # トークンチェック
  def check_token(token)
    ws_token = nil
    error = nil

    if token.blank?
      error = 'blank'
    elsif @ws_token.present? && @ws_token.code != token
      error = 'different'
    else
      ws_token = WsToken.where(code: token).eager_load(:user).first
      if ws_token.blank?
        error = 'notfound'
      elsif ws_token.auth_successed_at.blank? && (ws_token.created_at < Time.current - Settings['token_expired_start_minutes'].to_i.minutes)
        error = 'expired_start' # Tips: 発行後、一定時間以内に開始してない場合は無効
      elsif ws_token.created_at < Time.current - Settings['token_expired_hours'].to_i.hours
        error = 'expired_created' # Tips: 発行後、一定時間以上経過したら無効
      end
    end

    [error, ws_token]
  end

  # トークン情報更新
  def update_ws_token(ws_token, auth_success, status, connection, connection_id = nil)
    return if ws_token.blank?

    if auth_success
      ws_token.last_auth_successed_at = Time.current
      ws_token.auth_successed_at = ws_token.last_auth_successed_at if ws_token.auth_successed_at.blank?
    end
    ws_token.last_status = status
    if connection.present? # Action Cableのみ
      ws_token.request_id = connection.statistics[:request_id]
      ws_token.started_at = connection.statistics[:started_at]
    elsif connection_id.present? && @ws_token.blank? # API Gatewayで、初回のみ
      ws_token.connection_id = connection_id
      ws_token.started_at = Time.current
    end
    ws_token.save!

    ws_token
  end

  # 購読チャネル追加
  def add_stream_channel(user)
    stream_from 'all_channel'
    stream_from real_user_channel(user) if user.present?
  end

  # 購読チャネル一覧を返却
  def subscribe_channels(user)
    channels = ['all_channel']
    channels.push(real_user_channel(user)) if user.present?

    channels
  end

  # 送信チャネルを返却
  def send_real_channel(user, channel)
    return if user.blank? # Tips: 未ログインでは送信させない

    case channel
    when 'all_channel'
      channel
    when 'user_channel'
      real_user_channel(user)
    end
  end

  # ユーザーチャネルを返却
  def real_user_channel(user)
    user.present? ? "user_channel_#{user.code}" : nil
  end

  # 送信対象のWebSocketトークンを返却
  def send_ws_tokens(user, channel)
    case channel
    when 'all_channel'
      WsToken.where.not(connection_id: nil).where(last_status: 'success')
    when 'user_channel'
      WsToken.where.not(connection_id: nil).where(last_status: 'success').where(user_id: user.id)
    end
  end

  # 送信メッセージを返却
  def send_message_data(user, message)
    {
      id: message.id,
      channel: message.channel,
      channel_i18n: message.channel_i18n,
      body: message.body,
      user: {
        code: user.code,
        image_url: {
          mini: "#{Settings['base_image_url']}#{user.image_url(:mini)}",
          small: "#{Settings['base_image_url']}#{user.image_url(:small)}",
          medium: "#{Settings['base_image_url']}#{user.image_url(:medium)}",
          large: "#{Settings['base_image_url']}#{user.image_url(:large)}",
          xlarge: "#{Settings['base_image_url']}#{user.image_url(:xlarge)}"
        },
        name: user.name
      },
      created_at: message.created_at,
      created_at_i18n: I18n.l(message.created_at)
    }
  end
end

ジョブを実装

app/jobs/message/broadcast_job.rb

class Message::BroadcastJob < ApplicationJob
+  include MessageConcern

ジョブからもコントローラーのConcernが呼べました。

  queue_as :default

-  def perform(*args)
-    # Do something later
+  # メッセージ送信
+  def perform(user, channel, real_channel, message)
+    logger.debug("==== Message::BroadcastJob: #{user.id}, #{channel}, #{real_channel}, #{message}")
+    data = { action: 'send_message', success: true, messages: [send_message_data(user, message)] }
+
+    # Action Cable # Tips: 購読チャネルに送る
+    ActionCable.server.broadcast(real_channel, data)
+
+    # API Gateway # Tips: 接続しているコネクションIDに送る
+    ws_tokens = send_ws_tokens(user, channel)
+    logger.debug("---- ws_tokens: #{ws_tokens&.ids}")
+    return if ws_tokens.blank?
+
+    data_json = data.to_json
+    api_gw = Aws::ApiGatewayManagementApi::Client.new(region: Settings['ws_apigateway_region'], endpoint: Settings['ws_apigateway_endpoint'])
+    ws_tokens.each do |ws_token|
+      api_gw.post_to_connection({ connection_id: ws_token.connection_id, data: data_json })
+    end
  end
end

Action Cableでの接続でも、API Gatewayでの接続でも送れるように実装

config/settings/development.yml に追加

+ ws_apigateway_region: 'ap-northeast-1'
+ ws_apigateway_endpoint: 'https://(api-id).execute-api.ap-northeast-1.amazonaws.com/(stage)'

(api-id)と(stage)は環境に合わせて変更してください。regionも

app/jobs/message/delete_connection_job.rb

class Message::DeleteConnectionJob < ApplicationJob
-  queue_as :default
+  queue_as :low_priority

切断は急がなないので、lowに下げる。

-  def perform(*args)
-    # Do something later
+  # WebSocket切断 # Tips: フロント側でリトライされないようにする必要がある
+  def perform(connection, connection_id)
+    logger.debug("==== Message::DeleteConnectionJob: #{connection}, #{connection_id}")
+
+    if connection.present? # Action Cableのみ
+      connection.close
+    elsif connection_id.present? # API Gatewayのみ
+      api_gw = Aws::ApiGatewayManagementApi::Client.new(region: Settings['ws_apigateway_region'], endpoint: Settings['ws_apigateway_endpoint'])
+      api_gw.delete_connection({ connection_id: connection_id })
+    end
  end
end

今回のコミット内容
https://dev.azure.com/nightonly/rails-app-origin/_git/rails-app-origin/commit/8e180c00e825445e5f6cb55b6d9a3fd4f0822464

API Gateway(WebSocket API)のバックエンドをRailsで実装する” に対して1件のコメントがあります。

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です