どうも、靖宗です。
今回はETS(Erlang Term Storage)。この章も前回からの引き続きとなります。
ETSというキーワードはちょいちょい出てきてたんですが実際なんなのかまだ分からないところ。Bucketの実装方法としてプロセスとETSの二種類があるよって事でしたがどういったものなのでしょうか。
ETS as a cache
とりあえず名前的にも上記の話的にも何かを記憶しておく機能と思われます。この項目としてはキャッシュとして利用する?
また、この章にETSをキャッシュとして使うな!とも書いてます。どないせいっちゅうねん。
とりあえず、サンプルを追っていきます。
iex> table = :ets.new(:buckets_registry, [:set, :protected]) #Reference<0.1885502827.460455937.234656> iex> :ets.insert(table, {"foo", self()}) true iex> :ets.lookup(table, "foo") [{"foo", #PID<0.41.0>}]
:ets
というモジュールを利用しているという事はErlangの標準ライブラリ的なものなのでしょうか。
:ets.new/2
という関数でETSを作成するようです。一個目の引数はテーブル名、二個目の引数はテーブルの読み書きなどの設定をオプションとして渡せるようです。
今回設定した:set
はキーの重複を許さない設定で、:protected
はテーブルを作成したプロセスのみ書き込み権限を渡し、それ以外のプロセスは読み出ししかできないようにする設定だそうです。
tabelという名称が使われているところからRDB的なものを感じます。
上のサンプルではtable
という変数で受けましたが、名前でもアクセスできるそうです。
iex> :ets.new(:buckets_registry, [:named_table]) :buckets_registry iex> :ets.insert(:buckets_registry, {"foo", self()}) true iex> :ets.lookup(:buckets_registry, "foo") [{"foo", #PID<0.41.0>}]
この章ではKV.Registry
をETSで再現してみるようです。
lib/kv/registry.ex
をETSを使うように変更します。
defmodule KV.Registry do use GenServer ## Client API @doc """ Starts the registry with the given options. `:name` is always required. """ def start_link(opts) do # 1. Pass the name to GenServer's init server = Keyword.fetch!(opts, :name) GenServer.start_link(__MODULE__, server, opts) end @doc """ Looks up the bucket pid for `name` stored in `server`. Returns `{:ok, pid}` if the bucket exists, `:error` otherwise. """ def lookup(server, name) do # 2. Lookup is now done directly in ETS, without accessing the server case :ets.lookup(server, name) do [{^name, pid}] -> {:ok, pid} [] -> :error end end @doc """ Ensures there is a bucket associated with the given `name` in `server`. """ def create(server, name) do GenServer.cast(server, {:create, name}) end ## Server callbacks def init(table) do # 3. We have replaced the names map by the ETS table names = :ets.new(table, [:named_table, read_concurrency: true]) refs = %{} {:ok, {names, refs}} end # 4. The previous handle_call callback for lookup was removed def handle_cast({:create, name}, {names, refs}) do # 5. Read and write to the ETS table instead of the map case lookup(names, name) do {:ok, _pid} -> {:noreply, {names, refs}} :error -> {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket) ref = Process.monitor(pid) refs = Map.put(refs, ref, name) :ets.insert(names, {name, pid}) {:noreply, {names, refs}} end end def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do # 6. Delete from the ETS table instead of the map {name, refs} = Map.pop(refs, ref) :ets.delete(names, name) {:noreply, {names, refs}} end def handle_info(_msg, state) do {:noreply, state} end end
変更箇所がコメントアウトされております。
変更前はKV.Registry.lookup/2
がサーバーへGenServer.call/3
リクエストを送っていたのに対して、ETSを使用したものは直接呼び出せるのでサーバーとのやりとりが必要ありません。(内部実装的にはサーバーとのやりとりになってるのかもしれませんが・・・)
これはETSが全てのプロセスから参照できる実装になってるおかげだと思います。
今回ETSのテーブル作成時に:protected
のオプションを指定してませんが、これはデフォルトで指定されているようです。つまり、このETSテーブルはKV.Registry
のプロセスからのみ変更可能です。
ただし他のプロセスも読み出す事はできます。
また、今回指定しているread_concurrency: true
ですがこれは読み出し最適化オプションだそうです。ETSはメモリ上にデータをキャッシュする機能なので高速に動作するのですが、特に書き込みはあまりせず読み出しばかりするときにはこの設定をするようです。
詳しくはETSのドキュメントをどうぞ。
さて、KV.Registry
を変更したことによりテストが通らなくなってると思います。名前いるようになったし。
その辺を修正していきます。test/kv/registry_test.exs
を修正します。
setup context do _ = start_supervised!({KV.Registry, name: context.test}) %{registry: context.test} end
まずはsetup周辺から。KV.Registry
をcontext.test
という名前で作成しています。(名前が必要になった為)
context.test
というのはExUnit.Case
のTag
という機能らしくcontext[:hogehoge]
という感じで:hogehoge
というタグが登録できるそうです。
デフォルトで幾つかタグが登録されているようで:test
もその一部。キーワードリストはアトムのみで構成されている場合はcontext.test
でもアクセスできる、という流れです。
名前は付けたのでじゃあテスト!
PS > mix test Compiling 2 files (.ex) ... 1) test spawn buckets (KV.RegistryTest) test/kv/registry_test.exs:10 match (=) failed code: assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping") right: :error stacktrace: test/kv/registry_test.exs:14: (test) 2) test removes bucket on crash (KV.RegistryTest) test/kv/registry_test.exs:27 ** (MatchError) no match of right hand side value: :error code: {:ok, bucket} = KV.Registry.lookup(registry, "shopping") stacktrace: test/kv/registry_test.exs:29: (test) 3) test removes buckets on exit (KV.RegistryTest) test/kv/registry_test.exs:20 ** (MatchError) no match of right hand side value: :error code: {:ok, bucket} = KV.Registry.lookup(registry, "shopping") stacktrace: test/kv/registry_test.exs:22: (test) .. Finished in 0.04 seconds 1 doctest, 7 tests, 3 failures Randomized with seed 351000
ことごとく失敗しとるやん・・・
KV.RegistryTest
の全てのテストに於いて
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
がマズいようです。
以下の二つがいけないそうです。
- 早まって(楽観的に)最適化してる(キャッシュレイヤーに追加すべき)
cast/2
を使ってる(call/3
を使うべき)
最初の一個が謎です。
とりあえずサンプルのコードを読んでいきましょう。コードは嘘を吐かない。
Race conditions?(競合状態?)
どうやらテストでETSのテーブルが生成される時間差などが問題になっているようです。
ここでKV.Registry
の生成などを見直します。
KV.Registry.create(registry, "shopping")
が実行される- registryがBucketを生成してキャッシュテーブルが更新される
KV.Registry.lookup(registry, "shopping")
でテーブルへアクセスする- 上記の返値として
{:ok, bucket}
が返ってくる
という流れのはずです。しかし思い出してください。KV.Registry.create/2
はcast/2
なので非同期処理で行われます。なので以下のような流れてテストがうまくいってません。
KV.Registry.create(registry, "shopping")
が実行される- キャッシュテーブルが更新される前に、
KV.Registry.lookup(registry, "shopping")
でテーブルへアクセスする - 上記の返値として
:error
が返ってくる - registryがBucketを生成してキャッシュテーブルが更新される
順番が入れ替わっちゃってますね。むしろよく今までのテストが通ってたものです。今回は読み出しに特化して最適化かけたから?
これを避けるにはcast/2
を利用していたAPIをcall/3
の同期処理に変更する必要があります。
lib/kv/registry.ex
を修正します。
def create(server, name) do GenServer.call(server, {:create, name}) end def handle_call({:create, name}, _from, {names, refs}) do case lookup(names, name) do {:ok, pid} -> {:reply, pid, {names, refs}} :error -> {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket) ref = Process.monitor(pid) refs = Map.put(refs, ref, name) :ets.insert(names, {name, pid}) {:reply, pid, {names, refs}} end end
handle_cast
がそのままレスポンス返すようになった感じです。
このようなエラーを鑑みると、あまりcast/2
を使用するべきではないかもしれません。Elixirの開発者はどちらかといえばcall
を使う傾向があるそうです。
とりあえず修正が完了したのでテストを回しますが、今回は--trace
オプションを付けてテストするようです。
これは全てのテストを同期的に行うそうで、順次テストが回っていくかんじでしょうか。
PS > mix test --trace KV.BucketTest * test are temporary workers (0.00ms) * test stores value by key (0.00ms) * test delete value by key (0.00ms) KV.RegistryTest * test removes bucket on crash (15.0ms) 1) test removes bucket on crash (KV.RegistryTest) test/kv/registry_test.exs:27 Assertion with == failed code: assert KV.Registry.lookup(registry, "shopping") == :error left: {:ok, #PID<0.165.0>} right: :error stacktrace: test/kv/registry_test.exs:33: (test) * test spawn buckets (0.00ms) * test removes buckets on exit (0.00ms) KVTest * test greets the world (0.00ms) * doctest KV.hello/0 (1) (0.00ms) Finished in 0.04 seconds 1 doctest, 7 tests, 1 failure Randomized with seed 249000
なんか失敗してる・・・
エラーメッセージをよく見ると
left: {:ok, #PID<0.165.0>} right: :error
あれ、Bucketがまだ生きてる( ´゚д゚`)
どうやらテストコードでAgent.stop
する→Registryがhandle_info
で{:DOWN, ...}
メッセージを受ける、という箇所が非同期処理で行われている事が問題のようです。
Agent.stop/2
は同期処理だそうなので、handle_info
がコールバックされるのが非同期という事でしょうか。
同期処理を行うと、メッセージングあたりの処理は確実に実行されるらしいので、Agent.stop/2
とlookup
の間に一回同期処理を挟みます。
テストコードを下記のように修正します。
test "removes buckets on exit", %{registry: registry} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") Agent.stop(bucket) # Do a call to ensure the registry processed the DOWN message _ = KV.Registry.create(registry, "bogus") # 同期処理 assert KV.Registry.lookup(registry, "shopping") == :error end test "removes bucket on crash", %{registry: registry} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") # Stop the bucket with non-normal reason Agent.stop(bucket, :shutdown) # Do a call to ensure the registry processed the DOWN message _ = KV.Registry.create(registry, "bogus") # 同期処理 assert KV.Registry.lookup(registry, "shopping") == :error end
特に意味の無い同期処理を一回挟んでます。
これで、もう一度mix test --trace
PS > mix test --trace KV.BucketTest * test stores value by key (0.00ms) * test delete value by key (0.00ms) * test are temporary workers (0.00ms) KV.RegistryTest * test spawn buckets (0.00ms) * test removes buckets on exit (0.00ms) * test removes bucket on crash (0.00ms) KVTest * doctest KV.hello/0 (1) (0.00ms) * test greets the world (0.00ms) Finished in 0.04 seconds 1 doctest, 7 tests, 0 failures Randomized with seed 61000
とおった! これで大体よさそうです。
ただし先ほどまでのテストはRegistryがBucketからEXITシグナルをきちんと受けているかをチェックしたに過ぎず、BucketがきちんとEXITシグナルを出しているかどうかはまた別問題です。
なのでそのテストも一応書いておきましょう。
test "bucket can crash at any time", %{registry: registry} do KV.Registry.create(registry, "shopping") {:ok, bucket} = KV.Registry.lookup(registry, "shopping") # Simulate a bucket crash by explicitly and synchronously shutting it down Agent.stop(bucket, :shutdown) # Now trying to call the dead process causes a :noproc exit catch_exit KV.Bucket.put(bucket, "milk", 3) end
catch_exit/1
というのは{:exit, hogehoge}
となっているかどうかをチェックするアサーションみたいで、内部的にはAgentはGenServerで実装されていたのでAgent.update
の処理がGenServer.call/3
を呼び、Bucketのpidが動作していなければ:exit
を返していくという処理っぽいです。
詳しくはAgentのupdate周りのソースなどをご覧下さい。
この章ではプロセスの並列読み込みや競合などを取り扱いましたが、Elixirの良さを引き出すにはなかなか重要な項目かもしれません。
最後のテストのパラグラフあたりも2018年の8月に更新されたばかりのようで新しい知識を集めていく努力が必要っぽいです。