技術メモ

プログラミングとか電子工作とか

Phoenix入門 (第10章 Channels その2 リアルタイムチャット機能の実装)

f:id:ysmn_deus:20190219130922p:plain

どうも、靖宗です。 前回の続きでChannelですが、今回はおそらくサンプルコードを動かすお話。

Tying it all together

とりあえず試してみようぜ!ということで、今までのコードはゴチャゴチャしてますし一旦作り直します。
地味に色々見て下さっている方だとお気づきかもしれませんが、サンプルはhelloというプロジェクト名で自分はhello_phoenixになってました。
なのでサンプルどおりhelloで作成します。

PS > mix phx.new hello
* creating hello/config/config.exs
...

例のごとくconfig/dev.exsを編集します。たぶんデータベース名が異なるから同じdokcerで動いてるPostgreSQL使える筈。

...
# Configure your database
config :hello, Hello.Repo,
  username: "elixir",
  password: "elixir",
  database: "hello_dev",
  hostname: "localhost",
  pool_size: 10
...

ecto.createする。

PS \hello> mix ecto.create
Compiling 13 files (.ex)
Generated hello app
The database for Hello.Repo has been created

一応サーバーが立ち上がるかチェック。

PS \hello> mix phx.server
[warn] Phoenix is unable to create symlinks. Phoenix' ...

http://localhost:4000/にアクセス。

f:id:ysmn_deus:20190315115753p:plain

問題ナシ!

ここからようやくChannelのサンプルに戻ります。
lib/hello_web/endpoint.exを確認します。(編集は不要だと思います。)

defmodule HelloWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :hello

  socket "/socket", HelloWeb.UserSocket
  ...
end

初期設定でいいはず。
ここでlib/hello_web/channels/user_socket.exを編集します。といっても今回はコメントを外すだけ。

defmodule HelloWeb.UserSocket do
  use Phoenix.Socket

  ## Channels
  channel "room:*", HelloWeb.RoomChannel # ここのコメントを外す。
  ...

ここまででルーティングまでが終わる。簡単!
次はHelloWeb.RoomChannelを作って行きます。

Joining Channels

HelloWeb.RoomChannelを定義するためにlib/hello_web/channels/room_channel.exを作成します。
まずは認証の為にjoin/3を実装します。

defmodule HelloWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:lobby", _message, socket) do
    {:ok, socket}
  end
  def join("room:" <> _private_room_id, _params, _socket) do
    {:error, %{reason: "unauthorized"}}
  end
end

"room:lobby"というトピックへは無条件に誰でも入室できるイメージです。
その他のトピック("room:hoge"など)は認証が必要な想定です。今はとりあえずエラーを返すようにしているのでしょう。

認証が成功したら{:ok, socket}または{:ok, reply, socket}を返し、アクセス拒否する場合は{:error, reply}を返すように実装します。
この辺はPhoenix.TokenというPhoenixの認証機能の仕様みたいです。詳しく読んでいきたい所ですが、今回はサンプルを動かすことに徹します。

webpackを有効にして(デフォルトで有効です)Phoenixプロジェクトを作成していればassets/js/socket.jsJavaScriptのクライアントが実装されているようです。ありがたや。
これをトピック"room:lobby"を利用するように編集します。

// assets/js/socket.js
...
socket.connect()

// Now that you are connected, you can join channels with a topic:
let channel = socket.channel("room:lobby", {})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

このクライアントを読み込むようにassets/js/app.jsでインポートするように変更します。
コメント上にsocket.jsをインポートする文が記載されているのでコメントアウトで対応します。

...
// Import local files
//
// Local files can be imported directly using relative paths, for example:
import socket from "./socket"

ここまで完了すればあとはライブリロードでロードされます。
lib/hello_web/templates/layout/app.html.eexに既にassets/js/app.jsをロードする文言が記載されているので、とりあえずこれだけでOKです。
開発者ツールなどを開いてコンソールを確認します。

f:id:ysmn_deus:20190315122443p:plain

Joined successfullyと表示されていれば動作しているということでしょう。

とはいえこんなので「Channel分かった」とは全くもって思いません。
実際にやりとりするところを実装していきます。lib/hello_web/templates/page/index.html.eexを編集していきます。

<div id="messages"></div>
<input id="chat-input" type="text"></input>

他の文言は邪魔なだけですし上記のコードのみで良いと思います。
次にassets/js/socket.jsを編集していきます。

...
// Finally, connect to the socket:
socket.connect()

// Now that you are connected, you can join channels with a topic:
let channel = socket.channel("room:lobby", {})
let chatInput = document.querySelector("#chat-input"); // #chat-inputの要素を取り出す
let messagesContainer = document.querySelector("#messages"); // #messagesの要素を取り出す

chatInput.addEventListener("keypress", event => {
  if(event.keyCode === 13){ // エンターコードが押されたら
    channel.push("new_msg", {body: chatInput.value}); // 接続しているトピックへ文字列を送信
    chatInput.value = ""; // 入力欄を空にする
  }
});

channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

まずはEnterを押したら入力された文字をトピックにプッシュ(送信)するイベントを作成しました。
次にchannelに"new_msg"というイベントが発生したら実行するコールバック関数を追加する処理を書いておきます。

...
let channel           = socket.channel("room:lobby", {})
let chatInput         = document.querySelector("#chat-input")
let messagesContainer = document.querySelector("#messages")

chatInput.addEventListener("keypress", event => {
  if(event.keyCode === 13){
    channel.push("new_msg", {body: chatInput.value})
    chatInput.value = ""
  }
})

channel.on("new_msg", payload => { // "new_msg"というイベントが発生したら
  let messageItem = document.createElement("li") // リスト要素を作成
  messageItem.innerText = `[${Date()}] ${payload.body}` // テキストデータを作成。[日付] 本文
  messagesContainer.appendChild(messageItem) // messageItemに要素を追加
})

channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

ここまでできたらJSの実装は終わりです。
あとはChannel側の実装です。

Incoming Events

pushされてくるイベントを受けるときはhandle_in/3を実装します。
今回は"new_msg"というイベントを受けたらトピックにブロードキャスト(全体発信)する処理にします。
lib/hello_web/channels/room_channel.exを編集します。

defmodule HelloWeb.RoomChannel do
    use Phoenix.Channel
  
    def join("room:lobby", _message, socket) do
      {:ok, socket}
    end
    def join("room:" <> _private_room_id, _params, _socket) do
      {:error, %{reason: "unauthorized"}}
    end
    
    def handle_in("new_msg", %{"body" => body}, socket) do
      broadcast!(socket, "new_msg", %{body: body})
      {:noreply, socket}
    end
  end

broadcast!/3関数は指定したソケットのトピックに接続している全てのクライアントに情報を発信し、全てのクライアントのコールバック関数handle_out/3を実行させます。
ここまでで一通りの実装が完了しました。実際にブラウザを2個立ち上げて動作チェックしてみます。

f:id:ysmn_deus:20190315130710g:plain

(´◔౪◔)۶ヨッシャ!
こんなに簡単にリアルタイムで更新されていくチャットが作成できるなんて感無量ですね。(もちろん、ElixirだけでなくJavaScriptの恩恵も他大に受けてますが)

先ほどスルーしたhandle_out/3で様々な情報の加工やフィルタリングを実装できるようです。今回は実装しませんが、重要な機能のようなので一応確認します。

Intercepting Outgoing Events

イベントが発生した際にフィルタリングしたりなにがしかの処理を挟みたい時なんかがあると思います。
そういう際にはhandle_out/3を利用して処理を定義します。
下記の例はユーザーがチャットルームに入ってきた際に発行されるイベントを想定しており、既にアサインしているユーザー(接続が切れて再接続したとか?)は「入室しました」とかメッセージが出ないようにするといったところでしょうか。

intercept ["user_joined"]

def handle_out("user_joined", msg, socket) do
  if Accounts.ignoring_user?(socket.assigns[:user], msg.user_id) do
    {:noreply, socket}
  else
    push(socket, "user_joined", msg)
    {:noreply, socket}
  end
end

最初にintercept/1handle_out/3で加工したりするイベントを定義しておきます。
イベントが発行されたらユーザーがアサインされているかどうかを判定します。もしされていれば何も処理をせず、されていなければソケットに接続しているクライアントにpush/3でメッセージを送信します。
このhandle_out/3はイベント名でパターンマッチされるとはいえイベント毎に実行されます。なのでデータベースに接続して情報を取得する、などの負荷の高い処理をここに書くかどうかは慎重に吟味する必要がありそうです。

Socket Assigns

%Plug.Conn{}のようにチャンネルのソケットに値を追加する(アサインする)ことができるそうです。
Phoenix.Socket.assign/3で登録します。

socket = assign(socket, :user, msg["user"])

socket.assignsというマップとしてソケットに値が保存されます。

Using Token Authentication

ユーザー認証などが必要なケースが大半だと思います。
PhoenixではPhoenix.Tokenを利用することで4ステップでchannelのユーザー認証を実装できるそうです。
実際に実装して動作確認はしませんが、一応一通り見ておきます。

Step 1 - Assign a Token in the Connection

OurAuthという認証のプラグを作成したと仮定します。このプラグではユーザーが認証に通れば%Plug.Conn{}:current_userというキーでユーザー情報をアサインするとします。
このプラグを通過した接続情報に:current_userが存在すればPhoenix.Tokenを利用してトークンを発行し接続情報にトークンを追加する、そうでなければ接続情報には何も入れ無いという処理をプラグとして書きます。
これをput_user_token/2という関数プラグで書いたとします。
上記を踏まえてルーターを実装します。

pipeline :browser do
  ...
  plug OurAuth
  plug :put_user_token
end

defp put_user_token(conn, _) do
  if current_user = conn.assigns[:current_user] do
    token = Phoenix.Token.sign(conn, "user socket", current_user.id)
    assign(conn, :user_token, token)
  else
    conn
  end
end

パイプライン:browserを通過して認証が得られていればcurrent_useruser_tokenが接続情報conn.assignsに含まれます。

Step 2 - Pass the Token to the JavaScript

JavaScriptにユーザートークンを渡すためにテンプレート(レイアウト)にトークン情報を乗せます。
hello_web/templates/layout/app.html.eexなどでしょうか。

<script>window.userToken = "<%= assigns[:user_token] %>";</script>
<script src="<%= Routes.static_path(@conn, "/js/app.js") %>"></script>

JavaScriptに直接渡したい所ですが、テンプレートはレンダリングされてメモリ上で管理され、その後HTTPレスポンスとしてブラウザに渡されます。JSはブラウザに渡された後に静的ファイルとして配信されブラウザ上で統合されるのでテンプレートでレンダリングしておいてブラウザ側で取り組むようにしなければなりませんのでこのような処置になっているのでしょう。

Step 3 - Pass the Token to the Socket Constructor and Verify

パラメータとしてトークンが渡されることが想定されます。なのでhello_web/channels/user_socket.exconnect/3関数でトークン情報"token"を受け取り、評価する処置を実装します。
デフォルトではdef connect(_params, socket, _connect_info)になっていますが、この_paramsにパラメータが放り込まれる想定です。

def connect(%{"token" => token}, socket, _connect_info) do
  # max_age: 1209600 is equivalent to two weeks in seconds
  case Phoenix.Token.verify(socket, "user socket", token, max_age: 1209600) do
    {:ok, user_id} ->
      {:ok, assign(socket, :current_user, user_id)}
    {:error, reason} ->
      :error
  end
end

この処置でトークンからユーザーIDが得られます。channelで使用するsocketにユーザーIDをアサインして完了です。
ブラウザ上でロードされてるJavaScript(本章のサンプルで言えばsocket.js)ではトークンをソケットに乗せます。

let socket = new Socket("/socket", {params: {token: window.userToken}})

この辺はデフォルトで用意されてるあたり、Phoenixではこういう想定なんでしょう。

Step 4 - Connect to the socket in JavaScript

ここまで来たら通常のchannelと変わりません。コネクトして使うのみです。

let socket = new Socket("/socket", {params: {token: window.userToken}})
socket.connect()

let channel = socket.channel("topic:subtopic", {})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

認証あたりはどっかで試してみたいですね。

Handling Reconnection

クライアントがトピックへサブスクライブした(メッセージを送った)情報はメモリ上にETSテーブルとして保管されています。なんらかの原因でチャンネルがクラッシュした時にはクライアントはサブスクライブしていたトピックへ接続し直す必要があります。
JavaScriptのChannelライブラリにはこのへんの機能が備わっており、サーバー側でChannelがクラッシュしたらChannel.onErrorというコールバックが実行されるようです。この辺を利用してクライアントに再接続させる処理を書くのが良いでしょう。
JavaScriptのライブラリにしか言及がありませんが、たぶんサードパーティのライブラリにも同様の機能が実装されているとは思います。

Resending Client Messages

PhoenixのChannelのクライアントはメッセージ送信時にメッセージをバッファに入れてから送信しているようです。もし接続ができていない、なんか知らんけど送れてないとかある場合はバッファに記録したままタイムアウトを待つ挙動だそうです。(たぶんタイムアウト時間も設定できる)
ブラウザを閉じない限りはこのバッファは継続しているようです。

Resending Server Messages

サーバー側から発行するメッセージは基本的にバッファに溜めたり再送されたりすることは無いそうです。
この辺を保証するようなシステムを作成するには自分たちでその辺のプロトコルを実装する必要がありそうです。

Presence

たぶんある情報をクラスターやサーバー-クライアント間で一意に保つ機能だと思います。

コレに関しては章が分かれてるのでそちらで詳しく見ていきます。

Example Application

たぶんPresenceのサンプル。

phoenixchat.herokuapp.com

↑のソースコード

github.com