どうも、靖宗です。
今回も前回の引き続きとなります。
今回は前回用意したKVServer
を作って行くところでしょう。なかなか楽しそう・・・
Echo server
まずは基本のエコーだけの機能のTCPサーバーを作成します。なんかリクエストを送ったらなんかレスポンスを返す感じ。
まずはTCPサーバーの動作の流れを確認します。
- 特定のポートをlistenする。そこでソケットを受け取る。
- クライアントと繋がるまで待つ。
- クライアントからのリクエストを読み込み、レスポンスを返す。
それでは、apps/kv_server
のlib/kv_server.ex
に実装していきます。
defmodule KVServer do require Logger def accept(port) do # The options below mean: # # 1. `:binary` - receives data as binaries (instead of lists) # 2. `packet: :line` - receives data line by line # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes # {:ok, socket} = :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true]) Logger.info("Accepting connections on port #{port}") loop_acceptor(socket) end defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) serve(client) loop_acceptor(socket) end defp serve(socket) do socket |> read_line() |> write_line(socket) serve(socket) end defp read_line(socket) do {:ok, data} = :gen_tcp.recv(socket, 0) data end defp write_line(line, socket) do :gen_tcp.send(socket, line) end end
Eralngのライブラリ:gen_tcp
で大体の実装はことたりそうです。
accept/1
で特定ポートでのlistenを開始し、loop_acceptor/1
を再帰的に呼び出してエコーバック(きた文章をそのまま返す)を実装しているようです。
リクエストが来る度にserve/1
を呼び出し、処理を行う想定のようです。そのうちこのserve/1
をいじっていく事になるんでしょうか。
serve/1
の主な処理は
socket |> read_line() |> write_line(socket)
です。この辺がElixirらしい。
データをソケットから読み出してして、読み出したデータをソケット先へ返していく処理です。
とりあえず試してみます。iex -S mix
で起動して使ってみましょう。
iex(1)> KVServer.accept(4040) 17:13:29.719 [info] Accepting connections on port 4040 ** (MatchError) no match of right hand side value: {:error, :closed} # telnetを切断したらエラーで落ちた (kv_server) lib/kv_server.ex:32: KVServer.read_line/1 (kv_server) lib/kv_server.ex:25: KVServer.serve/1 (kv_server) lib/kv_server.ex:19: KVServer.loop_acceptor/1
PS> telnet 127.0.0.1 4040 hello hello is it me is it me you are looking for? you are looking for?
windowsはtelnetを使える様にする必要があります。下記を参考にしました。
今回はiexで適当にやりとりしましたが、実際の運用では切断したときの挙動を実装しておくだけでなく、このサーバーのプロセスがクラッシュした際にきちんと再起動するのが望ましいです。
なのでこのサーバーのプロセスもsupervision tree下で管理するのが望ましいというところでしょうか。
Tasks
AgentやGenServerなどはメッセージのやりとりで状態の管理などをしていましたが、今回の様なとくにメッセージのやりとりをせず、ただ実行されてれば良い場合はTaskモジュールを利用するようです。
早速Taskモジュールで実行されるように変更します。lib/kv_server/application.ex
を編集していきます。
def start(_type, _args) do children = [ {Task, fn -> KVServer.accept(4040) end} # ここを追記 ] opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
childrenに{Task, 関数}
のタプルを渡しています。これでTask,start_link/1
としてTaskが起動するそうです。
また、現状ではポート番号をハードコードしてますが、これは環境変数などで変えれる様にしておくのが望ましいでしょう。
なので同じ関数内で環境変数があればそれを利用するように変更します。
port = String.to_integer(System.get_env("PORT") || "4040") # ... {Task, fn -> KVServer.accept(port) end}
これでポート番号を変更しやすくなりました。
まだ完全ではありませんが、Taskとして自動起動するかチェックしてみましょう。
mix run --no-halt
でコードを実行します。
$ telnet 127.0.0.1 4040 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. say you say you say me say me
一応Telnetで通信ができてるので自動起動には成功しているようです。
ここで複数のターミナルでTelnetをつなげてみると、反応がありません。
これでは誰かがアクセスしてる最中はレスポンスを返せないゴミサーバーです。
何とかしましょう。
Task supervisor
解決策としてはリクエストを受けたときにプロセスを生成してそちらに処理(serve
)を任せるという方針です。
loop_acceptor
を変更します。
defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) Task.start_link(fn -> serve(client) end) loop_acceptor(socket) end
ただし、これはどこかでやってアカンかったやつでは・・・
そうです、Task.start_link/1
なのでどれかのタスクがクラッシュするとこのレスポンスサーバーと他のレスポンスもろともクラッシュします。
前回(Registry)はsupervisorを用いて解決しました。おそらく今回もそれで問題無い筈です。
同様にsupervisor treeが形成されるように修正していきます。
def start(_type, _args) do port = String.to_integer(System.get_env("PORT") || raise "missing $PORT environment variable") # ポートは指定しないと怒られるように変更 children = [ {Task.Supervisor, name: KVServer.TaskSupervisor}, {Task, fn -> KVServer.accept(port) end} ] opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
childrenにTask.Supervisor
を追加しています。ここでTask.Supervisor
の実装を見てみると、内部ではDynamicSupervisorになっています。
イメージ的には
children = [ {Task.Supervisor, name: KVServer.TaskSupervisor}, {Task, fn -> KVServer.accept(port) end} ]
とKV.Supervisor
の
children = [ {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one}, {KV.Registry, name: KV.Registry} ]
が同じ様な箇所というところでしょうか。ここは順番も重要です。(Task.Supervisor
はTask
より先に起動する)
Registryの時にcreate
で呼び出した箇所で{:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
したときのように、Task
もloop_acceptor
で呼び出すプロセスをTask.Supervisor
で起動するようにします。
あっちいったりこっちいったりでヤヤコシイですが、慣れるまでは仕方なさそう・・・
defp loop_acceptor(socket) do {:ok, client} = :gen_tcp.accept(socket) {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end) :ok = :gen_tcp.controlling_process(client, pid) # (゚д゚)⁉ loop_acceptor(socket) end
:ok = :gen_tcp.controlling_process(client, pid)
という見慣れない処理が出てきました。
これはTask.Supervisor
で起動したプロセスを接続されたclientのcontrolling process
にするものだとか。この処理が無いとなにがしかのクライアントがクラッシュした時に全部のクライアントを落とすことになるらしい。おーこわ。
ここまでできたのでmix run --no-halt
したいが、windowsだとPORT=4040 mix run --no-halt
とかすると怒られるので、先に環境変数を追記してから実行する。
PS > $env:PORT=4040 PS > mix run --no-halt 18:35:30.881 [info] Accepting connections on port 4040
複数ターミナルを開いてもいける!
ついでにiexで開いてobserverでみてみる。
PS > iex.bat -S mix Interactive Elixir (1.8.0) - press Ctrl+C to exit (type h() ENTER for help) 18:37:13.505 [info] Accepting connections on port 4040 iex(1)> :observer.start :ok
ちゃんとKVServer.TaskSupervisor
の下に2個プロセスが立ち上がってる!(このときは2端末からtelnetした)
ここで、何も考えずにsupervisorを設定してきましたが、strategyが正しいかどうかを考えておきます。
アクセプター({Task, fn -> KVServer.accept(port) end}
で起動してるプロセス)がクラッシュしたときにクライアントのレスポンスなどを処理しているプロセスを終了させる必要は現状ではありません。
Taskのsupervisorがクラッシュした際も、アクセプターのプロセスを終了させる必要は特にありません。なのでこのままで大丈夫そうです。
ただし、再起動の設定はどうでしょうか。Taskモジュールで起動したプロセスはデフォルトでは:temporary
、つまり再起動しないようになっています。各クライアントとの接続プロセスはそれで良さそうですが(再起動すると無意味なプロセスが生成される)、アクセプターは再起動してほしい所です。再起動するように修正しましょう。
アクセプターをモジュールとして定義し、use Task, restart: :permanent
として設定してstart_link
で起動する(KV.Bucket
を参照)のも一つの手ですが、今回は違う手法を採ります。
Supervisor.child_spec/2
を使用して下記のように設定できます。(lib/kv_server/application.ex
)
def start(_type, _args) do port = String.to_integer(System.get_env("PORT") || raise "missing $PORT environment variable") children = [ {Task.Supervisor, name: KVServer.TaskSupervisor}, Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent) # ここ ] opts = [strategy: :one_for_one, name: KVServer.Supervisor] Supervisor.start_link(children, opts) end
Supervisor.child_spec/2
はMix and OTP編 4章で少々言及がありました。この時はとりあえず紹介されてたから実行しただけでしたが、supervisorのchildrenに追加するときに使う関数だったことが分かりました。
AgentやGenServerを使用しているライブラリを利用する際にも、各ライブラリのstart_link
などを変更する訳にもいきませんのでこの手法を使うことが想定されます。
Elixir(もしくはErlang?)の拡張性の高さはこういう所にも現れてる気がします。