どうも、靖宗です。
今回はGenServerということですが、Mix and OTP編はおそらく通読する必要があると思います。
もし有り難いことにこのブログをご覧になってる方がいらっしゃったらMix and OTP編の最初から見ていただけると幸いです。
- Our first GenServer
- Testing a GenServer
- The need for monitoring
- call, cast or info?
- Monitors or links?
前回KVBucket
モジュールを作成しましたがこの章ではもう少しインターフェースを発展させていく感じでしょうか。
CREATE shopping OK PUT shopping milk 1 OK GET shopping milk 1 OK
httpのリクエストっぽい。
ここでは前回のKVBucketに名前を付けて取り扱いたいというところ。
エージェントは本質的にはプロセスなので、アトムで名前が登録できるはず。
iex> Agent.start_link(fn -> %{} end, name: :shopping) {:ok, #PID<0.43.0>} iex> KV.Bucket.put(:shopping, "milk", 1) :ok iex> KV.Bucket.get(:shopping, "milk") 1
ただし、アトムに対してはガベージコレクションが働かないので、一回アトムで定義しちゃうと少なくともそのコード内では使えなくなるとのこと。
使わなけりゃいいんじゃね?とも思いますが、リストのような感じで順次増やしていくケースなんかでは致命的になるのかも。
なにはともあれ「やべえよ!」って書いてあるのでやらないのが無難。
じゃあどうすんだよ!ってことで、おそらく考えられたのがGenServerなのでしょう。
プロセスの名前とプロセスそのものを紐付けるサーバー(プロセスのState?)を立てて管理する仕組みらしく、なんかバグがあれば記録するようなモニタリング機能まで備えているっぽいです。
Our first GenServer
GenServerは2つの要素でできており、クライアントのAPIとサーバーのコールバックAPIで構成するようです。
この場合、クライアントはBucketの名前を必要とし、データを入れたり出したりする方で、サーバーはBucketの名前とプロセスを管理する方でしょうか。
これらを二つのファイルに分けても良いそうですが、今回は一個のファイル(lib/kv/registry.ex
)に書いていきます。
defmodule KV.Registry do use GenServer ## Client API @doc """ Starts the registry. """ def start_link(opts) do GenServer.start_link(__MODULE__, :ok, opts) end @doc """ Looks up the bucket pid for `name` stored in `server`. Returns `{:ok, pid}` if the bucket exists, `:error` otherwise. """ def lookup(server, name) do GenServer.call(server, {:lookup, name}) end @doc """ Ensures there is a bucket associated with the given `name` in `server`. """ def create(server, name) do GenServer.cast(server, {:create, name}) end ## Server Callbacks def init(:ok) do {:ok, %{}} end def handle_call({:lookup, name}, _from, names) do {:reply, Map.fetch(names, name), names} end def handle_cast({:create, name}, names) do if Map.has_key?(names, name) do {:noreply, names} else {:ok, bucket} = KV.Bucket.start_link([]) {:noreply, Map.put(names, name, bucket)} end end end
クライアントAPI
一番最初のクライアントAPIstart_link/1
ですがGenServerを作成するにあたり3つの引数が必要なようです。
- サーバーのコールバックAPIがどこに実装されてるのか。この場合は現モジュールなので
__MODULE__
で指定してる。他の所に実装されてる場合はモジュール名? - 初期化の引数。たぶんサーバーのコールバックAPI
init/1
にそのまま引数が放り込まれる。 - オプションのリスト。サーバーの名前とからしい。とりあえず今はこのままであとで改良する。
GenServerにはcalls
とcasts
という2種類のリクエストが送信できる。calls
は必ずレスポンスが伴い同期処理、casts
はレスポンスがないし非同期処理という違いらしい。
これを踏まえてlookup/2
とcreate/2
という2種類の関数を実装する。
def lookup(server, name) do GenServer.call(server, {:lookup, name}) end def create(server, name) do GenServer.cast(server, {:create, name}) end
lookup/2
でGenServerに登録されてるBucketのプロセスを探すのだろう、そうなればレスポンスが必要なので必然的に"call/2"がチョイスされる。
create/2
でGenServerにBucketのプロセスIDを登録する処理をお願いする。クライアントが命名したBucketのプロセスIDを管理しなくても良いので、生成もGenServerがするほうが自然?
ここで、どちらのリクエストもserver
+タプルという引数になっているが、後のサーバーのコールバックAPIの実装などでパターンマッチして処理を分けたりするので大体この形式になるそうです。
なんだかピンとこない人もいそうですが、おそらくこの辺はサーバーのコールバックAPIと一緒に考えないといけないと思いますので次へ。
サーバーのコールバックAPI
サーバーのコールバックAPIにはinit/1
、handle_call/2
、handle_cast/2
の3つを作成しています。
まずはinit/1
ですが、これはクライアントAPIに出てきたGenServer.start_link(__MODULE__, :ok, opts)
の2個目の引数(ここで言う:ok
)が渡されます。
つまり、クライアント側でGenServer.start_link
を呼ぶと、新しいプロセスが立ち上がり、そのプロセス内でinit/1
が実行されるという事だと思います。
以降のhandle_call/2
、handle_cast/2
も、この新しくたったプロセス内で実行されるはずです。(これがコールバックといってる所以?)
handle_call/2
は、クライアント側でGenServer.call
が実行された際に実行される関数です。
(余談ですが、ソースコードを鑑みるに、handle_call/2
はbehaviourとして実装されていたので内部でDynamic dispatchされている感じでしょうか。)
def handle_call({:lookup, name}, _from, names) do {:reply, Map.fetch(names, name), names} end
おそらく引数の{:lookup, name}とパターンマッチしない場合はエラーの送出か{:error, term}
みたいなのが返ってきそうです。
_from
はリクエストを送ってきたプロセス、names
はサーバーの保持している状態(init
で生成した%{}
か、若しくはそれを更新したマップ)です。
この最後の引数がオブジェクト指向だと気持ち悪いのですが、状態を抱えているオブジェクトなど存在しないことが前提であれば納得がいきます。常に、状態を紡いでいく感じでしょうか。
このコールバック関数は{:reply, Map.fetch(names, name), names}
というタプルを返してます。
:reply
はレスポンスであるインジケータ的役割(パターンマッチとかで使える)、Map.fetch(names, name)
は今回({:lookup, name}を発行した、クライアントAPIのlookup
)欲しいレスポンス、names
はBucket上で保持する新しい状態です。
実際にGenServerをuse
して作成したモジュールを通したときに得られるレスポンスは、たぶん{:ok, reply(コールバックの2個目の返値)}
とかになると思います。
お次はhandle_cast/2
。
handle_call/2
が理解できてればほぼ同義ですが、前の項目で記述したようにcast
は返値がないので関数が返すタプルがちょっと異なります。
def handle_cast({:create, name}, names) do if Map.has_key?(names, name) do {:noreply, names} else {:ok, bucket} = KV.Bucket.start_link([]) {:noreply, Map.put(names, name, bucket)} end end
関数の引数が{:create, name}
になってますので、パターンマッチで要素name
を取得しています。
handle_call/2
は_from
の引数がないのでどこから呼び出されたかが関数内では使えなさそう。まぁ値を返さなくてもいいのでそらそうなのかな。
ここではname
が既に既存の状態の内部(引数names
)にあるかないかで条件分岐してます。
既に存在する場合は何もせずGenServer内で保持する状態をそのまま保持するために{:noreply, names}
を返しています。おそらく返値が{:noreply, hoge}
の時には何も返値を返さないのでしょう。
保持しているマップ内にBucket名が存在しない場合はKV.Bucket.start_link([])
でBucketのプロセスを立ち上げ、保持している状態の中で管理するようにマップへ追加しています。
GenServerが書けたのでテストを書きます!
Testing a GenServer
なにやらここで書くテストは先ほどのエージェントのテストとはちょっと違うそうです。
サンプルを追っていきます。テストファイルはtest/kv/registry_test.exs
に書いていきます。
defmodule KV.RegistryTest do use ExUnit.Case, async: true setup do registry = start_supervised!(KV.Registry) %{registry: registry} end test "spawns buckets", %{registry: registry} do assert KV.Registry.lookup(registry, "shopping") == :error KV.Registry.create(registry, "shopping") assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping") KV.Bucket.put(bucket, "milk", 1) assert KV.Bucket.get(bucket, "milk") == 1 end end
とはいうもののstart_supervised!
以外はほぼ一緒?
一応先にmix test
してみます。
PS > mix test Compiling 2 files (.ex) Generated kv app ..... Finished in 0.03 seconds 1 doctest, 4 tests, 0 failures Randomized with seed 762000
通りました。(´◔౪◔)۶ヨッシャ!
ここでstart_supervised!
とはなんぞや、というところですが、引数に取ったモジュールをstart_link/1
で起動してくれる関数のようです。
手動でstart_link/1
を呼び出してもいい気はしますが、もしプロセスが共有されたりするとassert KV.Registry.lookup(registry, "shopping") == :error
の箇所のように「無くてエラーになる」処理が思わぬ挙動を起こす可能性があります。(例えばプロセスが共有されて、無い筈の"shopping"Bucketが生成されてたり。)
start_supervised!
を利用すると、別のテストでは必ずそのプロセスが終了しているようにしてくれるのだそうです。
なのでbucket_test.exs
も書き換えときましょう。
setup do # {:ok, bucket} = KV.Bucket.start_link([]) bucket = start_supervised!(KV.Bucket) %{bucket: bucket} end
あとここでGenServerを停止するクライアントAPIも追加しておきましょう。
## Client API @doc """ Stops the registry. """ def stop(server) do GenServer.stop(server) end
The need for monitoring
モニタリング機能の話でしょうか?
止まったりクラッシュしたBucketを野放しにしておく訳にはいかないので、そこらへんの機能を作ります。
まずはKV.RegistryTest
にテストを書きます。
test "removes buckets on exit", %{registry: registry} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") Agent.stop(bucket) # Bucketをここで外部からわざと止める assert KV.Registry.lookup(registry, "shopping") == :error # ここでエラーが返ってくるのが望ましい end
monitor
をセットすればこの辺の監視は自動でやってくれるみたいです。
このmonitor
という機能はどうもProcessモジュールの機能のようなので、とりあえずiexでやってみます。
iex.bat -S mix
でiexを立ち上げます。
iex(1)> {:ok, pid} = KV.Bucket.start_link([]) {:ok, #PID<0.144.0>} iex(2)> Process.monitor(pid) #Reference<0.2536230812.419430406.126900> iex(3)> flush() # 一応なんもメッセージが届いてないか確認 :ok iex(4)> Agent.stop(pid) # エージェントを停止 :ok iex(5)> flush() # メッセージを表示 {:DOWN, #Reference<0.2536230812.419430406.126900>, :process, #PID<0.144.0>, :normal} :ok
この仕組みを用いてGenServerのプロセスに;DOWN
のメッセージが来たらなんかするって処理を書くのでしょうか。
最後に:normal
という終了時の状態っぽいものも返ってきてるので、再起動するか否かの柔軟性が作れそうです。
それでは、このモニター機能を使ってGenServerに機能を実装していきます。
ここで、今まではGenServerは1種類の状態(names
)しか管理していませんでしたが、更にもう1種類refs
を管理するようにします。
names
を渡していってた所に{names, refs}
を渡していく要領でしょう。
## Server callbacks def init(:ok) do names = %{} # Bucketの名前とPIDのマップ refs = %{} # モニター用リファレンス {:ok, {names, refs}} end def handle_call({:lookup, name}, _from, state) do # namesしか必要無いし更新しないのでstateとしてまとめてる {names, _} = state {:reply, Map.fetch(names, name), state} end def handle_cast({:create, name}, {names, refs}) do # namesもrefsもいじる可能性があるので{names, refs}で状態を受けてる if Map.has_key?(names, name) do {:noreply, {names, refs}} else {:ok, pid} = KV.Bucket.start_link([]) ref = Process.monitor(pid) refs = Map.put(refs, ref, name) names = Map.put(names, name, pid) {:noreply, {names, refs}} end end def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do {name, refs} = Map.pop(refs, ref) names = Map.delete(names, name) {:noreply, {names, refs}} end def handle_info(_msg, state) do {:noreply, state} end
今回の変更はサーバーのコールバックAPIのみの更新で済んでるので「クライアントAPIと分けた方がいいよ!」って理由の一個らしい。
このぐらいの規模だったら別に分ける必要ないとは思いますが、大規模な時は分けた方が可読性も上がりそう。
handle_info
でパターンマッチしてrefs
の処理を行っている。
ただし、handle_info
は2種類あって、{:DOWN, ...}
じゃないメッセージを握りつぶしている。
この理由は次の項目で説明される(とおもう)。
call, cast or info?
init/1
を除けば、大まかにリクエスト関連のコールバックAPIはhandle_call/3
、handle_cast/2
、handle_info/2
の3つ。
どれで処理するかの指針は下記の通り。
handle_call/3
は同期的なリクエストに用いる。レスポンスを待つようなリクエストは基本これ。handle_cast/2
はレスポンスを気にしないような非同期的なリクエストに用いる。ほとんど無いと思うけどメッセージを受け取ってない可能性があっても1回の処理としては良いような、そんなかんじ。(あとでチェックしてもう一回実行すれば良い?) 1,handle_info/2
はGenServer.call/2
とかGenServer.cast/2
とかで送れないようなメッセージのやりとりに限定して使う。主にモニタリング?
send/2
とか使えばhandle_info/2
経由でGenServerにメッセージを送信できるらしい。
ただし、この機能のせいで謎のメッセージが送られちゃったりするとパターンマッチしないメッセージが送られてきた場合GenServerがクラッシュすることもあるそうで。
なのでdef handle_info(_msg, state) do
で全ての謎のメッセージは握りつぶしているようです。
これを鑑みるにGenServerには不用意にメッセージを送らない方がいいとは思います。(もとよりあんまりしないと思いますが。)
じゃあ、handle_call/3
やhandle_cast/2
もパターンマッチしない場合の握りつぶしを書いといた方がいいのでは?とも思いますが、この辺はGenServerさんがうまいことやってくれてるみたいです。
そもそもhandle_call
などが呼び出されるのはGenServerのAPI経由なので、ここで処理しているのだと思います。
Monitors or links?
プロセスの十一章でリンク(spawn_link
など?)を学んでいますが、このモニターを使うのとリンクを使うのとどっちがいいんでしょうか。
リンクは基本的に双方向性があります。リンクして発行した先のプロセスが終了しても、発行した元のプロセスが終了しても同時にどちらも終了します。
モニターは一方向性でモニターしているプロセス(先ほどの例で言えばGenServerのサーバー側のプロセス)が監視しているプロセスの状態を受け取るのみです。監視されてるプロセスが終了しても監視してる側のプロセスは終了しません。
結局、プロセスが同時に落ちて欲しい場合はリンクを、そうでなく状態の情報が欲しいだけの場合はモニターを利用するのがいいと思います。
以上を踏まえてhandle_cast/2
の実装に戻ると、ここではリンクしてるしモニターしてるような状況であることが分かります。
{:ok, pid} = KV.Bucket.start_link([]) # リンク! ref = Process.monitor(pid) # モニター!
よくない!
BucketがクラッシュしたごときでRegistryまでクラッシュして欲しくないのでここはなんとかしましょう。
基本的にElixirではプロセスを直接立ち上げることを避ける傾向があるそうです。
これを実現する為にsupervisors
というものを利用するようですが、このお話は次の章で。
テストの所のstart_supervised!
と関係あるのかな?