技術メモ

プログラミングとか電子工作とか

Elixir入門(Mix and OTP編 第3章 GenServer)

f:id:ysmn_deus:20190122112104p:plain

どうも、靖宗です。
今回はGenServerということですが、Mix and OTP編はおそらく通読する必要があると思います。
もし有り難いことにこのブログをご覧になってる方がいらっしゃったらMix and OTP編の最初から見ていただけると幸いです。

前回KVBucketモジュールを作成しましたがこの章ではもう少しインターフェースを発展させていく感じでしょうか。

CREATE shopping
OK

PUT shopping milk 1
OK

GET shopping milk
1
OK

httpのリクエストっぽい。
ここでは前回のKVBucketに名前を付けて取り扱いたいというところ。
エージェントは本質的にはプロセスなので、アトムで名前が登録できるはず。

iex> Agent.start_link(fn -> %{} end, name: :shopping)
{:ok, #PID<0.43.0>}
iex> KV.Bucket.put(:shopping, "milk", 1)
:ok
iex> KV.Bucket.get(:shopping, "milk")
1

ただし、アトムに対してはガベージコレクションが働かないので、一回アトムで定義しちゃうと少なくともそのコード内では使えなくなるとのこと。
使わなけりゃいいんじゃね?とも思いますが、リストのような感じで順次増やしていくケースなんかでは致命的になるのかも。
なにはともあれ「やべえよ!」って書いてあるのでやらないのが無難。

じゃあどうすんだよ!ってことで、おそらく考えられたのがGenServerなのでしょう。
プロセスの名前とプロセスそのものを紐付けるサーバー(プロセスのState?)を立てて管理する仕組みらしく、なんかバグがあれば記録するようなモニタリング機能まで備えているっぽいです。

Our first GenServer

GenServerは2つの要素でできており、クライアントのAPIとサーバーのコールバックAPIで構成するようです。
この場合、クライアントはBucketの名前を必要とし、データを入れたり出したりする方で、サーバーはBucketの名前とプロセスを管理する方でしょうか。
これらを二つのファイルに分けても良いそうですが、今回は一個のファイル(lib/kv/registry.ex)に書いていきます。

defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    GenServer.call(server, {:lookup, name})
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server Callbacks

  def init(:ok) do
    {:ok, %{}}
  end

  def handle_call({:lookup, name}, _from, names) do
    {:reply, Map.fetch(names, name), names}
  end

  def handle_cast({:create, name}, names) do
    if Map.has_key?(names, name) do
      {:noreply, names}
    else
      {:ok, bucket} = KV.Bucket.start_link([])
      {:noreply, Map.put(names, name, bucket)}
    end
  end
end

クライアントAPI

一番最初のクライアントAPIstart_link/1ですがGenServerを作成するにあたり3つの引数が必要なようです。

  1. サーバーのコールバックAPIがどこに実装されてるのか。この場合は現モジュールなので__MODULE__で指定してる。他の所に実装されてる場合はモジュール名?
  2. 初期化の引数。たぶんサーバーのコールバックAPIinit/1にそのまま引数が放り込まれる。
  3. オプションのリスト。サーバーの名前とからしい。とりあえず今はこのままであとで改良する。

GenServerにはcallscastsという2種類のリクエストが送信できる。callsは必ずレスポンスが伴い同期処理、castsはレスポンスがないし非同期処理という違いらしい。
これを踏まえてlookup/2create/2という2種類の関数を実装する。

  def lookup(server, name) do
    GenServer.call(server, {:lookup, name})
  end

  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

lookup/2でGenServerに登録されてるBucketのプロセスを探すのだろう、そうなればレスポンスが必要なので必然的に"call/2"がチョイスされる。
create/2でGenServerにBucketのプロセスIDを登録する処理をお願いする。クライアントが命名したBucketのプロセスIDを管理しなくても良いので、生成もGenServerがするほうが自然?

ここで、どちらのリクエストもserver+タプルという引数になっているが、後のサーバーのコールバックAPIの実装などでパターンマッチして処理を分けたりするので大体この形式になるそうです。

なんだかピンとこない人もいそうですが、おそらくこの辺はサーバーのコールバックAPIと一緒に考えないといけないと思いますので次へ。

サーバーのコールバックAPI

サーバーのコールバックAPIにはinit/1handle_call/2handle_cast/2の3つを作成しています。

まずはinit/1ですが、これはクライアントAPIに出てきたGenServer.start_link(__MODULE__, :ok, opts)の2個目の引数(ここで言う:ok)が渡されます。
つまり、クライアント側でGenServer.start_linkを呼ぶと、新しいプロセスが立ち上がり、そのプロセス内でinit/1が実行されるという事だと思います。
以降のhandle_call/2handle_cast/2も、この新しくたったプロセス内で実行されるはずです。(これがコールバックといってる所以?)

handle_call/2は、クライアント側でGenServer.callが実行された際に実行される関数です。
(余談ですが、ソースコードを鑑みるに、handle_call/2はbehaviourとして実装されていたので内部でDynamic dispatchされている感じでしょうか。)

  def handle_call({:lookup, name}, _from, names) do
    {:reply, Map.fetch(names, name), names}
  end

おそらく引数の{:lookup, name}とパターンマッチしない場合はエラーの送出か{:error, term}みたいなのが返ってきそうです。
_fromはリクエストを送ってきたプロセス、namesはサーバーの保持している状態(initで生成した%{}か、若しくはそれを更新したマップ)です。
この最後の引数がオブジェクト指向だと気持ち悪いのですが、状態を抱えているオブジェクトなど存在しないことが前提であれば納得がいきます。常に、状態を紡いでいく感じでしょうか。

このコールバック関数は{:reply, Map.fetch(names, name), names}というタプルを返してます。
:replyはレスポンスであるインジケータ的役割(パターンマッチとかで使える)、Map.fetch(names, name)は今回({:lookup, name}を発行した、クライアントAPIlookup)欲しいレスポンス、namesBucket上で保持する新しい状態です。
実際にGenServerをuseして作成したモジュールを通したときに得られるレスポンスは、たぶん{:ok, reply(コールバックの2個目の返値)}とかになると思います。

お次はhandle_cast/2
handle_call/2が理解できてればほぼ同義ですが、前の項目で記述したようにcastは返値がないので関数が返すタプルがちょっと異なります。

  def handle_cast({:create, name}, names) do
    if Map.has_key?(names, name) do
      {:noreply, names}
    else
      {:ok, bucket} = KV.Bucket.start_link([])
      {:noreply, Map.put(names, name, bucket)}
    end
  end

関数の引数が{:create, name}になってますので、パターンマッチで要素nameを取得しています。
handle_call/2_fromの引数がないのでどこから呼び出されたかが関数内では使えなさそう。まぁ値を返さなくてもいいのでそらそうなのかな。
ここではnameが既に既存の状態の内部(引数names)にあるかないかで条件分岐してます。
既に存在する場合は何もせずGenServer内で保持する状態をそのまま保持するために{:noreply, names}を返しています。おそらく返値が{:noreply, hoge}の時には何も返値を返さないのでしょう。
保持しているマップ内にBucket名が存在しない場合はKV.Bucket.start_link([])Bucketのプロセスを立ち上げ、保持している状態の中で管理するようにマップへ追加しています。

GenServerが書けたのでテストを書きます!

Testing a GenServer

なにやらここで書くテストは先ほどのエージェントのテストとはちょっと違うそうです。
サンプルを追っていきます。テストファイルはtest/kv/registry_test.exsに書いていきます。

defmodule KV.RegistryTest do
  use ExUnit.Case, async: true

  setup do
    registry = start_supervised!(KV.Registry)
    %{registry: registry}
  end

  test "spawns buckets", %{registry: registry} do
    assert KV.Registry.lookup(registry, "shopping") == :error

    KV.Registry.create(registry, "shopping")
    assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    KV.Bucket.put(bucket, "milk", 1)
    assert KV.Bucket.get(bucket, "milk") == 1
  end
end

とはいうもののstart_supervised!以外はほぼ一緒?
一応先にmix testしてみます。

PS > mix test
Compiling 2 files (.ex)
Generated kv app
.....

Finished in 0.03 seconds
1 doctest, 4 tests, 0 failures

Randomized with seed 762000

通りました。(´◔౪◔)۶ヨッシャ!

ここでstart_supervised!とはなんぞや、というところですが、引数に取ったモジュールをstart_link/1で起動してくれる関数のようです。
手動でstart_link/1を呼び出してもいい気はしますが、もしプロセスが共有されたりするとassert KV.Registry.lookup(registry, "shopping") == :errorの箇所のように「無くてエラーになる」処理が思わぬ挙動を起こす可能性があります。(例えばプロセスが共有されて、無い筈の"shopping"Bucketが生成されてたり。)
start_supervised!を利用すると、別のテストでは必ずそのプロセスが終了しているようにしてくれるのだそうです。
なのでbucket_test.exsも書き換えときましょう。

  setup do
#    {:ok, bucket} = KV.Bucket.start_link([])
    bucket = start_supervised!(KV.Bucket)
    %{bucket: bucket}
  end

あとここでGenServerを停止するクライアントAPIも追加しておきましょう。

## Client API

@doc """
Stops the registry.
"""
def stop(server) do
  GenServer.stop(server)
end

The need for monitoring

モニタリング機能の話でしょうか?
止まったりクラッシュしたBucketを野放しにしておく訳にはいかないので、そこらへんの機能を作ります。
まずはKV.RegistryTestにテストを書きます。

test "removes buckets on exit", %{registry: registry} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  Agent.stop(bucket) # Bucketをここで外部からわざと止める
  assert KV.Registry.lookup(registry, "shopping") == :error # ここでエラーが返ってくるのが望ましい
end

monitorをセットすればこの辺の監視は自動でやってくれるみたいです。
このmonitorという機能はどうもProcessモジュールの機能のようなので、とりあえずiexでやってみます。
iex.bat -S mixでiexを立ち上げます。

iex(1)> {:ok, pid} = KV.Bucket.start_link([])
{:ok, #PID<0.144.0>}
iex(2)> Process.monitor(pid)
#Reference<0.2536230812.419430406.126900>
iex(3)> flush() # 一応なんもメッセージが届いてないか確認
:ok
iex(4)> Agent.stop(pid) # エージェントを停止
:ok
iex(5)> flush() # メッセージを表示
{:DOWN, #Reference<0.2536230812.419430406.126900>, :process, #PID<0.144.0>,
 :normal}
:ok

この仕組みを用いてGenServerのプロセスに;DOWNのメッセージが来たらなんかするって処理を書くのでしょうか。
最後に:normalという終了時の状態っぽいものも返ってきてるので、再起動するか否かの柔軟性が作れそうです。

それでは、このモニター機能を使ってGenServerに機能を実装していきます。
ここで、今まではGenServerは1種類の状態(names)しか管理していませんでしたが、更にもう1種類refsを管理するようにします。
namesを渡していってた所に{names, refs}を渡していく要領でしょう。

## Server callbacks

def init(:ok) do
  names = %{} # Bucketの名前とPIDのマップ
  refs = %{} # モニター用リファレンス
  {:ok, {names, refs}}
end

def handle_call({:lookup, name}, _from, state) do # namesしか必要無いし更新しないのでstateとしてまとめてる
  {names, _} = state
  {:reply, Map.fetch(names, name), state}
end

def handle_cast({:create, name}, {names, refs}) do # namesもrefsもいじる可能性があるので{names, refs}で状態を受けてる
  if Map.has_key?(names, name) do
    {:noreply, {names, refs}}
  else
    {:ok, pid} = KV.Bucket.start_link([])
    ref = Process.monitor(pid)
    refs = Map.put(refs, ref, name)
    names = Map.put(names, name, pid)
    {:noreply, {names, refs}}
  end
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
  {name, refs} = Map.pop(refs, ref)
  names = Map.delete(names, name)
  {:noreply, {names, refs}}
end

def handle_info(_msg, state) do
  {:noreply, state}
end

今回の変更はサーバーのコールバックAPIのみの更新で済んでるので「クライアントAPIと分けた方がいいよ!」って理由の一個らしい。
このぐらいの規模だったら別に分ける必要ないとは思いますが、大規模な時は分けた方が可読性も上がりそう。

handle_infoでパターンマッチしてrefsの処理を行っている。

ただし、handle_infoは2種類あって、{:DOWN, ...}じゃないメッセージを握りつぶしている。
この理由は次の項目で説明される(とおもう)。

call, cast or info?

init/1を除けば、大まかにリクエスト関連のコールバックAPIhandle_call/3handle_cast/2handle_info/2の3つ。
どれで処理するかの指針は下記の通り。

  1. handle_call/3は同期的なリクエストに用いる。レスポンスを待つようなリクエストは基本これ。
  2. handle_cast/2はレスポンスを気にしないような非同期的なリクエストに用いる。ほとんど無いと思うけどメッセージを受け取ってない可能性があっても1回の処理としては良いような、そんなかんじ。(あとでチェックしてもう一回実行すれば良い?) 1, handle_info/2GenServer.call/2とかGenServer.cast/2とかで送れないようなメッセージのやりとりに限定して使う。主にモニタリング?

send/2とか使えばhandle_info/2経由でGenServerにメッセージを送信できるらしい。
ただし、この機能のせいで謎のメッセージが送られちゃったりするとパターンマッチしないメッセージが送られてきた場合GenServerがクラッシュすることもあるそうで。
なのでdef handle_info(_msg, state) doで全ての謎のメッセージは握りつぶしているようです。
これを鑑みるにGenServerには不用意にメッセージを送らない方がいいとは思います。(もとよりあんまりしないと思いますが。)

じゃあ、handle_call/3handle_cast/2もパターンマッチしない場合の握りつぶしを書いといた方がいいのでは?とも思いますが、この辺はGenServerさんがうまいことやってくれてるみたいです。
そもそもhandle_callなどが呼び出されるのはGenServerのAPI経由なので、ここで処理しているのだと思います。

Monitors or links?

プロセスの十一章でリンク(spawn_linkなど?)を学んでいますが、このモニターを使うのとリンクを使うのとどっちがいいんでしょうか。

リンクは基本的に双方向性があります。リンクして発行した先のプロセスが終了しても、発行した元のプロセスが終了しても同時にどちらも終了します。
モニターは一方向性でモニターしているプロセス(先ほどの例で言えばGenServerのサーバー側のプロセス)が監視しているプロセスの状態を受け取るのみです。監視されてるプロセスが終了しても監視してる側のプロセスは終了しません。

結局、プロセスが同時に落ちて欲しい場合はリンクを、そうでなく状態の情報が欲しいだけの場合はモニターを利用するのがいいと思います。

以上を踏まえてhandle_cast/2の実装に戻ると、ここではリンクしてるしモニターしてるような状況であることが分かります。

{:ok, pid} = KV.Bucket.start_link([]) # リンク!
ref = Process.monitor(pid) # モニター!

よくない!
BucketがクラッシュしたごときでRegistryまでクラッシュして欲しくないのでここはなんとかしましょう。
基本的にElixirではプロセスを直接立ち上げることを避ける傾向があるそうです。
これを実現する為にsupervisorsというものを利用するようですが、このお話は次の章で。
テストの所のstart_supervised!と関係あるのかな?