技術メモ

プログラミングとか電子工作とか

Elixir入門(Mix and OTP編 第6章 ETS)

f:id:ysmn_deus:20190122112104p:plain

どうも、靖宗です。
今回はETS(Erlang Term Storage)。この章も前回からの引き続きとなります。
ETSというキーワードはちょいちょい出てきてたんですが実際なんなのかまだ分からないところ。Bucketの実装方法としてプロセスとETSの二種類があるよって事でしたがどういったものなのでしょうか。

ETS as a cache

とりあえず名前的にも上記の話的にも何かを記憶しておく機能と思われます。この項目としてはキャッシュとして利用する?

また、この章にETSをキャッシュとして使うな!とも書いてます。どないせいっちゅうねん。

とりあえず、サンプルを追っていきます。

iex> table = :ets.new(:buckets_registry, [:set, :protected])
#Reference<0.1885502827.460455937.234656>
iex> :ets.insert(table, {"foo", self()})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.41.0>}]

:etsというモジュールを利用しているという事はErlangの標準ライブラリ的なものなのでしょうか。
:ets.new/2という関数でETSを作成するようです。一個目の引数はテーブル名、二個目の引数はテーブルの読み書きなどの設定をオプションとして渡せるようです。
今回設定した:setはキーの重複を許さない設定で、:protectedはテーブルを作成したプロセスのみ書き込み権限を渡し、それ以外のプロセスは読み出ししかできないようにする設定だそうです。
tabelという名称が使われているところからRDB的なものを感じます。

上のサンプルではtableという変数で受けましたが、名前でもアクセスできるそうです。

iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self()})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.41.0>}]

この章ではKV.RegistryをETSで再現してみるようです。
lib/kv/registry.exをETSを使うように変更します。

defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry with the given options.

  `:name` is always required.
  """
  def start_link(opts) do
    # 1. Pass the name to GenServer's init
    server = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, server, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    # 2. Lookup is now done directly in ETS, without accessing the server
    case :ets.lookup(server, name) do
      [{^name, pid}] -> {:ok, pid}
      [] -> :error
    end
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server callbacks

  def init(table) do
    # 3. We have replaced the names map by the ETS table
    names = :ets.new(table, [:named_table, read_concurrency: true])
    refs  = %{}
    {:ok, {names, refs}}
  end

  # 4. The previous handle_call callback for lookup was removed

  def handle_cast({:create, name}, {names, refs}) do
    # 5. Read and write to the ETS table instead of the map
    case lookup(names, name) do
      {:ok, _pid} ->
        {:noreply, {names, refs}}
      :error ->
        {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
        ref = Process.monitor(pid)
        refs = Map.put(refs, ref, name)
        :ets.insert(names, {name, pid})
        {:noreply, {names, refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
    # 6. Delete from the ETS table instead of the map
    {name, refs} = Map.pop(refs, ref)
    :ets.delete(names, name)
    {:noreply, {names, refs}}
  end

  def handle_info(_msg, state) do
    {:noreply, state}
  end
end

変更箇所がコメントアウトされております。

変更前はKV.Registry.lookup/2がサーバーへGenServer.call/3リクエストを送っていたのに対して、ETSを使用したものは直接呼び出せるのでサーバーとのやりとりが必要ありません。(内部実装的にはサーバーとのやりとりになってるのかもしれませんが・・・)
これはETSが全てのプロセスから参照できる実装になってるおかげだと思います。

今回ETSのテーブル作成時に:protectedのオプションを指定してませんが、これはデフォルトで指定されているようです。つまり、このETSテーブルはKV.Registryのプロセスからのみ変更可能です。
ただし他のプロセスも読み出す事はできます。
また、今回指定しているread_concurrency: trueですがこれは読み出し最適化オプションだそうです。ETSはメモリ上にデータをキャッシュする機能なので高速に動作するのですが、特に書き込みはあまりせず読み出しばかりするときにはこの設定をするようです。
詳しくはETSのドキュメントをどうぞ。

erlang.org

さて、KV.Registryを変更したことによりテストが通らなくなってると思います。名前いるようになったし。
その辺を修正していきます。test/kv/registry_test.exsを修正します。

  setup context do
    _ = start_supervised!({KV.Registry, name: context.test})
    %{registry: context.test}
  end

まずはsetup周辺から。KV.Registrycontext.testという名前で作成しています。(名前が必要になった為)
context.testというのはExUnit.CaseTagという機能らしくcontext[:hogehoge]という感じで:hogehogeというタグが登録できるそうです。
デフォルトで幾つかタグが登録されているようで:testもその一部。キーワードリストはアトムのみで構成されている場合はcontext.testでもアクセスできる、という流れです。

名前は付けたのでじゃあテスト!

PS > mix test
Compiling 2 files (.ex)
...

  1) test spawn buckets (KV.RegistryTest)
     test/kv/registry_test.exs:10
     match (=) failed
     code:  assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
     right: :error
     stacktrace:
       test/kv/registry_test.exs:14: (test)



  2) test removes bucket on crash (KV.RegistryTest)
     test/kv/registry_test.exs:27
     ** (MatchError) no match of right hand side value: :error
     code: {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
     stacktrace:
       test/kv/registry_test.exs:29: (test)



  3) test removes buckets on exit (KV.RegistryTest)
     test/kv/registry_test.exs:20
     ** (MatchError) no match of right hand side value: :error
     code: {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
     stacktrace:
       test/kv/registry_test.exs:22: (test)

..

Finished in 0.04 seconds
1 doctest, 7 tests, 3 failures

Randomized with seed 351000

ことごとく失敗しとるやん・・・
KV.RegistryTestの全てのテストに於いて

{:ok, bucket} = KV.Registry.lookup(registry, "shopping")

がマズいようです。
以下の二つがいけないそうです。

  1. 早まって(楽観的に)最適化してる(キャッシュレイヤーに追加すべき)
  2. cast/2を使ってる(call/3を使うべき)

最初の一個が謎です。
とりあえずサンプルのコードを読んでいきましょう。コードは嘘を吐かない。

Race conditions?(競合状態?)

どうやらテストでETSのテーブルが生成される時間差などが問題になっているようです。
ここでKV.Registryの生成などを見直します。

  1. KV.Registry.create(registry, "shopping")が実行される
  2. registryがBucketを生成してキャッシュテーブルが更新される
  3. KV.Registry.lookup(registry, "shopping")でテーブルへアクセスする
  4. 上記の返値として{:ok, bucket}が返ってくる

という流れのはずです。しかし思い出してください。KV.Registry.create/2cast/2なので非同期処理で行われます。なので以下のような流れてテストがうまくいってません。

  1. KV.Registry.create(registry, "shopping")が実行される
  2. キャッシュテーブルが更新される前に、KV.Registry.lookup(registry, "shopping")でテーブルへアクセスする
  3. 上記の返値として:errorが返ってくる
  4. registryがBucketを生成してキャッシュテーブルが更新される

順番が入れ替わっちゃってますね。むしろよく今までのテストが通ってたものです。今回は読み出しに特化して最適化かけたから?
これを避けるにはcast/2を利用していたAPIcall/3の同期処理に変更する必要があります。
lib/kv/registry.exを修正します。

def create(server, name) do
  GenServer.call(server, {:create, name})
end

def handle_call({:create, name}, _from, {names, refs}) do
  case lookup(names, name) do
    {:ok, pid} ->
      {:reply, pid, {names, refs}}
    :error ->
      {:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
      ref = Process.monitor(pid)
      refs = Map.put(refs, ref, name)
      :ets.insert(names, {name, pid})
      {:reply, pid, {names, refs}}
  end
end

handle_castがそのままレスポンス返すようになった感じです。
このようなエラーを鑑みると、あまりcast/2を使用するべきではないかもしれません。Elixirの開発者はどちらかといえばcallを使う傾向があるそうです。
とりあえず修正が完了したのでテストを回しますが、今回は--traceオプションを付けてテストするようです。
これは全てのテストを同期的に行うそうで、順次テストが回っていくかんじでしょうか。

PS > mix test --trace

KV.BucketTest
  * test are temporary workers (0.00ms)
  * test stores value by key (0.00ms)
  * test delete value by key (0.00ms)

KV.RegistryTest
  * test removes bucket on crash (15.0ms)

  1) test removes bucket on crash (KV.RegistryTest)
     test/kv/registry_test.exs:27
     Assertion with == failed
     code:  assert KV.Registry.lookup(registry, "shopping") == :error
     left:  {:ok, #PID<0.165.0>}
     right: :error
     stacktrace:
       test/kv/registry_test.exs:33: (test)

  * test spawn buckets (0.00ms)
  * test removes buckets on exit (0.00ms)

KVTest
  * test greets the world (0.00ms)
  * doctest KV.hello/0 (1) (0.00ms)


Finished in 0.04 seconds
1 doctest, 7 tests, 1 failure

Randomized with seed 249000

なんか失敗してる・・・
エラーメッセージをよく見ると

     left:  {:ok, #PID<0.165.0>}
     right: :error

あれ、Bucketがまだ生きてる( ´゚д゚`)
どうやらテストコードでAgent.stopする→Registryがhandle_info{:DOWN, ...}メッセージを受ける、という箇所が非同期処理で行われている事が問題のようです。
Agent.stop/2は同期処理だそうなので、handle_infoがコールバックされるのが非同期という事でしょうか。
同期処理を行うと、メッセージングあたりの処理は確実に実行されるらしいので、Agent.stop/2lookupの間に一回同期処理を挟みます。
テストコードを下記のように修正します。

  test "removes buckets on exit", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    Agent.stop(bucket)

    # Do a call to ensure the registry processed the DOWN message
    _ = KV.Registry.create(registry, "bogus") # 同期処理
    assert KV.Registry.lookup(registry, "shopping") == :error
  end

  test "removes bucket on crash", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # Stop the bucket with non-normal reason
    Agent.stop(bucket, :shutdown)

    # Do a call to ensure the registry processed the DOWN message
    _ = KV.Registry.create(registry, "bogus") # 同期処理
    assert KV.Registry.lookup(registry, "shopping") == :error
  end

特に意味の無い同期処理を一回挟んでます。
これで、もう一度mix test --trace

PS > mix test --trace

KV.BucketTest
  * test stores value by key (0.00ms)
  * test delete value by key (0.00ms)
  * test are temporary workers (0.00ms)

KV.RegistryTest
  * test spawn buckets (0.00ms)
  * test removes buckets on exit (0.00ms)
  * test removes bucket on crash (0.00ms)

KVTest
  * doctest KV.hello/0 (1) (0.00ms)
  * test greets the world (0.00ms)


Finished in 0.04 seconds
1 doctest, 7 tests, 0 failures

Randomized with seed 61000

とおった! これで大体よさそうです。

ただし先ほどまでのテストはRegistryがBucketからEXITシグナルをきちんと受けているかをチェックしたに過ぎず、BucketがきちんとEXITシグナルを出しているかどうかはまた別問題です。
なのでそのテストも一応書いておきましょう。

  test "bucket can crash at any time", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # Simulate a bucket crash by explicitly and synchronously shutting it down
    Agent.stop(bucket, :shutdown)

    # Now trying to call the dead process causes a :noproc exit
    catch_exit KV.Bucket.put(bucket, "milk", 3)
  end

catch_exit/1というのは{:exit, hogehoge}となっているかどうかをチェックするアサーションみたいで、内部的にはAgentはGenServerで実装されていたのでAgent.updateの処理がGenServer.call/3を呼び、Bucketのpidが動作していなければ:exitを返していくという処理っぽいです。
詳しくはAgentのupdate周りのソースなどをご覧下さい。

github.com

この章ではプロセスの並列読み込みや競合などを取り扱いましたが、Elixirの良さを引き出すにはなかなか重要な項目かもしれません。
最後のテストのパラグラフあたりも2018年の8月に更新されたばかりのようで新しい知識を集めていく努力が必要っぽいです。