技術メモ

プログラミングとか電子工作とか

Elixir入門(Mix and OTP編 第8章 Task and gen_tcp)

f:id:ysmn_deus:20190122112104p:plain

どうも、靖宗です。
今回も前回の引き続きとなります。
今回は前回用意したKVServerを作って行くところでしょう。なかなか楽しそう・・・

Echo server

まずは基本のエコーだけの機能のTCPサーバーを作成します。なんかリクエストを送ったらなんかレスポンスを返す感じ。
まずはTCPサーバーの動作の流れを確認します。

  1. 特定のポートをlistenする。そこでソケットを受け取る。
  2. クライアントと繋がるまで待つ。
  3. クライアントからのリクエストを読み込み、レスポンスを返す。

それでは、apps/kv_serverlib/kv_server.exに実装していきます。

defmodule KVServer do
  require Logger

  def accept(port) do
    # The options below mean:
    #
    # 1. `:binary` - receives data as binaries (instead of lists)
    # 2. `packet: :line` - receives data line by line
    # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
    # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
    #
    {:ok, socket} =
      :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info("Accepting connections on port #{port}")
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    serve(client)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

Eralngのライブラリ:gen_tcpで大体の実装はことたりそうです。
accept/1で特定ポートでのlistenを開始し、loop_acceptor/1再帰的に呼び出してエコーバック(きた文章をそのまま返す)を実装しているようです。
リクエストが来る度にserve/1を呼び出し、処理を行う想定のようです。そのうちこのserve/1をいじっていく事になるんでしょうか。

serve/1の主な処理は

socket |> read_line() |> write_line(socket)

です。この辺がElixirらしい。
データをソケットから読み出してして、読み出したデータをソケット先へ返していく処理です。

とりあえず試してみます。iex -S mixで起動して使ってみましょう。

iex(1)> KVServer.accept(4040)

17:13:29.719 [info]  Accepting connections on port 4040
** (MatchError) no match of right hand side value: {:error, :closed} # telnetを切断したらエラーで落ちた
    (kv_server) lib/kv_server.ex:32: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:25: KVServer.serve/1
    (kv_server) lib/kv_server.ex:19: KVServer.loop_acceptor/1
PS> telnet 127.0.0.1 4040
hello
hello
is it me
is it me
you are looking for?
you are looking for?

windowstelnetを使える様にする必要があります。下記を参考にしました。

www.imamura.biz

今回はiexで適当にやりとりしましたが、実際の運用では切断したときの挙動を実装しておくだけでなく、このサーバーのプロセスがクラッシュした際にきちんと再起動するのが望ましいです。
なのでこのサーバーのプロセスもsupervision tree下で管理するのが望ましいというところでしょうか。

Tasks

AgentやGenServerなどはメッセージのやりとりで状態の管理などをしていましたが、今回の様なとくにメッセージのやりとりをせず、ただ実行されてれば良い場合はTaskモジュールを利用するようです。
早速Taskモジュールで実行されるように変更します。lib/kv_server/application.exを編集していきます。

  def start(_type, _args) do
    children = [
      {Task, fn -> KVServer.accept(4040) end} # ここを追記
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

childrenに{Task, 関数}のタプルを渡しています。これでTask,start_link/1としてTaskが起動するそうです。

また、現状ではポート番号をハードコードしてますが、これは環境変数などで変えれる様にしておくのが望ましいでしょう。
なので同じ関数内で環境変数があればそれを利用するように変更します。

port = String.to_integer(System.get_env("PORT") || "4040")
# ...
{Task, fn -> KVServer.accept(port) end}

これでポート番号を変更しやすくなりました。
まだ完全ではありませんが、Taskとして自動起動するかチェックしてみましょう。
mix run --no-haltでコードを実行します。

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

一応Telnetで通信ができてるので自動起動には成功しているようです。
ここで複数のターミナルでTelnetをつなげてみると、反応がありません。

これでは誰かがアクセスしてる最中はレスポンスを返せないゴミサーバーです。
何とかしましょう。

Task supervisor

解決策としてはリクエストを受けたときにプロセスを生成してそちらに処理(serve)を任せるという方針です。
loop_acceptorを変更します。

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end

ただし、これはどこかでやってアカンかったやつでは・・・
そうです、Task.start_link/1なのでどれかのタスクがクラッシュするとこのレスポンスサーバーと他のレスポンスもろともクラッシュします。

前回(Registry)はsupervisorを用いて解決しました。おそらく今回もそれで問題無い筈です。
同様にsupervisor treeが形成されるように修正していきます。

  def start(_type, _args) do
    port = String.to_integer(System.get_env("PORT") || raise "missing $PORT environment variable") # ポートは指定しないと怒られるように変更

    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      {Task, fn -> KVServer.accept(port) end}
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

childrenにTask.Supervisorを追加しています。ここでTask.Supervisorの実装を見てみると、内部ではDynamicSupervisorになっています。

github.com

イメージ的には

    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      {Task, fn -> KVServer.accept(port) end}
    ]

KV.Supervisor

    children = [
      {DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one},
      {KV.Registry, name: KV.Registry}
    ]

が同じ様な箇所というところでしょうか。ここは順番も重要です。(Task.SupervisorTaskより先に起動する)
Registryの時にcreateで呼び出した箇所で{:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)したときのように、Taskloop_acceptorで呼び出すプロセスをTask.Supervisorで起動するようにします。
あっちいったりこっちいったりでヤヤコシイですが、慣れるまでは仕方なさそう・・・

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid) # (゚д゚)⁉
  loop_acceptor(socket)
end

:ok = :gen_tcp.controlling_process(client, pid)という見慣れない処理が出てきました。
これはTask.Supervisorで起動したプロセスを接続されたclientのcontrolling processにするものだとか。この処理が無いとなにがしかのクライアントがクラッシュした時に全部のクライアントを落とすことになるらしい。おーこわ。

ここまでできたのでmix run --no-haltしたいが、windowsだとPORT=4040 mix run --no-haltとかすると怒られるので、先に環境変数を追記してから実行する。

PS > $env:PORT=4040
PS > mix run --no-halt

18:35:30.881 [info]  Accepting connections on port 4040

複数ターミナルを開いてもいける!
ついでにiexで開いてobserverでみてみる。

PS > iex.bat -S mix
Interactive Elixir (1.8.0) - press Ctrl+C to exit (type h() ENTER for help)

18:37:13.505 [info]  Accepting connections on port 4040
iex(1)> :observer.start
:ok

f:id:ysmn_deus:20190214183822p:plain

ちゃんとKVServer.TaskSupervisorの下に2個プロセスが立ち上がってる!(このときは2端末からtelnetした)

ここで、何も考えずにsupervisorを設定してきましたが、strategyが正しいかどうかを考えておきます。
アクセプター({Task, fn -> KVServer.accept(port) end}で起動してるプロセス)がクラッシュしたときにクライアントのレスポンスなどを処理しているプロセスを終了させる必要は現状ではありません。
Taskのsupervisorがクラッシュした際も、アクセプターのプロセスを終了させる必要は特にありません。なのでこのままで大丈夫そうです。

ただし、再起動の設定はどうでしょうか。Taskモジュールで起動したプロセスはデフォルトでは:temporary、つまり再起動しないようになっています。各クライアントとの接続プロセスはそれで良さそうですが(再起動すると無意味なプロセスが生成される)、アクセプターは再起動してほしい所です。再起動するように修正しましょう。

アクセプターをモジュールとして定義し、use Task, restart: :permanentとして設定してstart_linkで起動する(KV.Bucketを参照)のも一つの手ですが、今回は違う手法を採ります。
Supervisor.child_spec/2を使用して下記のように設定できます。(lib/kv_server/application.ex

  def start(_type, _args) do
    port = String.to_integer(System.get_env("PORT") || raise "missing $PORT environment variable")

    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent) # ここ
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

Supervisor.child_spec/2Mix and OTP編 4章で少々言及がありました。この時はとりあえず紹介されてたから実行しただけでしたが、supervisorのchildrenに追加するときに使う関数だったことが分かりました。
AgentやGenServerを使用しているライブラリを利用する際にも、各ライブラリのstart_linkなどを変更する訳にもいきませんのでこの手法を使うことが想定されます。
Elixir(もしくはErlang?)の拡張性の高さはこういう所にも現れてる気がします。