
Phase 2 — Metrics Persistence + ZFS/VM/Storage Collectors

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Persist agent samples in SQLite with 48h retention, expose recent samples via a simple JSON route, and ship agent collectors for ZFS pools/datasets, Proxmox storage, VM/LXC runtime, and system info. End state: agent pushes rich samples on fast/medium/slow intervals; server stores them; GET /api/hosts/:name returns the latest data.

Architecture: Channel handlers delegate to a new Server.Metrics context that writes to a single metrics table with a JSON payload column. A Retention GenServer prunes old rows hourly. Agent gains a tiny command-runner abstraction so each external-command collector (zfs, pvesh, apt, pveversion) can be unit-tested with fixture output on macOS. Reporter schedules medium and slow collection alongside the existing fast path.

Tech Stack: Elixir/OTP, Phoenix, Ecto SQLite with JSON column support (%{...} stored as TEXT), ExUnit, fixture-based testing for external commands.


File Structure

server/
├── priv/repo/migrations/<ts>_create_metrics.exs              create
├── lib/server/schema/metric.ex                                create
├── lib/server/metrics.ex                                      create (context)
├── lib/server/retention.ex                                    create (GenServer)
├── lib/server/application.ex                                  modify: add Retention to children
├── lib/server_web/channels/host_channel.ex                    modify: call Metrics.record_sample
├── lib/server_web/controllers/host_controller.ex              create
├── lib/server_web/router.ex                                   modify: add /api/hosts/:name route
├── test/server/metrics_test.exs                               create
├── test/server/retention_test.exs                             create
├── test/server_web/channels/host_channel_test.exs             modify: assert DB write
└── test/server_web/controllers/host_controller_test.exs       create

agent/
├── lib/proxmox_agent/shell.ex                                 create (System.cmd wrapper)
├── lib/proxmox_agent/collectors/zfs.ex                        create
├── lib/proxmox_agent/collectors/storage.ex                    create
├── lib/proxmox_agent/collectors/vms.ex                        create
├── lib/proxmox_agent/collectors/system_info.ex                create
├── lib/proxmox_agent/reporter.ex                              modify: medium + slow handlers
├── test/proxmox_agent/shell_test.exs                          create
├── test/proxmox_agent/collectors/zfs_test.exs                 create
├── test/proxmox_agent/collectors/storage_test.exs             create
├── test/proxmox_agent/collectors/vms_test.exs                 create
├── test/proxmox_agent/collectors/system_info_test.exs         create
└── test/fixtures/                                             create
    ├── zfs/zpool_list.json
    ├── zfs/zpool_status.json
    ├── zfs/zfs_list.json
    ├── pvesh/storage.json
    ├── pvesh/qemu.json
    ├── pvesh/lxc.json
    ├── pvesh/qemu_N_status.json
    ├── pvesh/qemu_N_config.json
    ├── pvesh/qemu_N_agent_interfaces.json
    ├── system/pveversion.txt
    ├── system/zfs_version.txt
    └── system/apt_upgradable.txt

Shell abstraction rationale: Each external-command collector takes a :runner keyword option — a (cmd, args) -> {:ok, output} | {:error, term} function. Default is ProxmoxAgent.Shell.run/2 which wraps System.cmd. Tests inject a fake that returns fixture content. Same pattern as proc_dir: in the existing host collector — consistent, no new abstractions.
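In miniature, the pattern looks like this (module and inner map shapes are illustrative — the real collectors are built in Tasks 5–6):

```elixir
# Illustrative only: a collector that accepts an injectable :runner.
defmodule DemoCollector do
  # In production the default would be &ProxmoxAgent.Shell.run/2.
  def collect(opts \\ []) do
    runner = Keyword.fetch!(opts, :runner)

    case runner.("zpool", ["list", "-j"]) do
      {:ok, json} -> %{raw: json, errors: []}
      {:error, reason} -> %{raw: nil, errors: [%{message: inspect(reason)}]}
    end
  end
end

# A test injects a fake runner returning canned output — no shelling out:
fake = fn "zpool", ["list", "-j"] -> {:ok, ~s({"pools": {}})} end
%{raw: ~s({"pools": {}}), errors: []} = DemoCollector.collect(runner: fake)
```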

Sample payload contract (what the agent sends):

Event            Data keys
metric:fast      host, zfs_pools, storage, vms_runtime
metric:medium    zfs_datasets, vms_detail
metric:slow      system_info

Each key's value is a collector-produced map. An errors list inside each collector's output lets one failing command degrade to a partial sample instead of discarding the whole push.
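For illustration, a single metric:fast push might look like this (inner field names and values are placeholders; each collector defines its own map shape):

```json
{
  "collected_at": "2026-04-21T12:00:00.123456Z",
  "data": {
    "host": {"cpu_percent": 12.3, "load1": 0.2},
    "zfs_pools": {"pools": [{"name": "rpool", "health": "ONLINE"}], "errors": []},
    "storage": {"errors": []},
    "vms_runtime": {"errors": []}
  }
}
```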


Task 1: Server — Metrics Schema & Context

Files:

  • Create: server/priv/repo/migrations/<ts>_create_metrics.exs

  • Create: server/lib/server/schema/metric.ex

  • Create: server/lib/server/metrics.ex

  • Create: server/test/server/metrics_test.exs

  • Step 1: Generate migration

cd /Users/cabele/claudeprojects/proxmox_monitor/server
mix ecto.gen.migration create_metrics

Replace the contents of the newly-created migration file with:

defmodule Server.Repo.Migrations.CreateMetrics do
  use Ecto.Migration

  def change do
    create table(:metrics) do
      add :host_id, references(:hosts, on_delete: :delete_all), null: false
      add :collected_at, :utc_datetime_usec, null: false
      add :interval_type, :string, null: false
      add :payload, :string, null: false

      timestamps(type: :utc_datetime_usec, updated_at: false)
    end

    create index(:metrics, [:host_id, :collected_at])
    create index(:metrics, [:collected_at])
  end
end

Note: SQLite has no native JSON column type — use TEXT (:string). Ecto's JSON handling happens at the schema layer: the field is declared as :map, and ecto_sqlite3 encodes/decodes it with the configured JSON library (Jason by default).

  • Step 2: Write the Metric schema

Create server/lib/server/schema/metric.ex:

defmodule Server.Schema.Metric do
  use Ecto.Schema
  import Ecto.Changeset

  @intervals ~w(fast medium slow)

  schema "metrics" do
    belongs_to :host, Server.Schema.Host
    field :collected_at, :utc_datetime_usec
    field :interval_type, :string
    field :payload, :map

    timestamps(type: :utc_datetime_usec, updated_at: false)
  end

  def changeset(metric, attrs) do
    metric
    |> cast(attrs, [:host_id, :collected_at, :interval_type, :payload])
    |> validate_required([:host_id, :collected_at, :interval_type, :payload])
    |> validate_inclusion(:interval_type, @intervals)
    |> assoc_constraint(:host)
  end
end

ecto_sqlite3 serializes :map fields to JSON TEXT transparently — no extra config needed.

  • Step 3: Write failing tests for the Metrics context

Create server/test/server/metrics_test.exs:

defmodule Server.MetricsTest do
  use Server.DataCase, async: true

  alias Server.Metrics
  alias Server.Hosts

  setup do
    {:ok, {host, _token}} = Hosts.create_host("pve-01")
    %{host: host}
  end

  describe "record_sample/4" do
    test "inserts a metric row with the given payload", %{host: host} do
      ts = DateTime.utc_now()
      payload = %{"host" => %{"load1" => 0.5}}

      assert {:ok, metric} = Metrics.record_sample(host.id, "fast", ts, payload)
      assert metric.host_id == host.id
      assert metric.interval_type == "fast"
      assert metric.payload == payload
      assert metric.collected_at == ts
    end

    test "rejects unknown interval_type", %{host: host} do
      ts = DateTime.utc_now()
      assert {:error, cs} = Metrics.record_sample(host.id, "nope", ts, %{})
      assert %{interval_type: ["is invalid"]} = errors_on(cs)
    end

    test "rejects unknown host_id" do
      ts = DateTime.utc_now()
      assert {:error, cs} = Metrics.record_sample(999_999, "fast", ts, %{})
      assert %{host: ["does not exist"]} = errors_on(cs)
    end
  end

  describe "latest_sample/2" do
    test "returns most recent sample for a host and interval", %{host: host} do
      {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-60), %{"v" => 1})
      {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-30), %{"v" => 2})
      {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-10), %{"v" => 3})

      assert %{payload: %{"v" => 3}} = Metrics.latest_sample(host.id, "fast")
    end

    test "returns nil when no samples exist", %{host: host} do
      assert Metrics.latest_sample(host.id, "fast") == nil
    end
  end

  describe "delete_older_than/1" do
    test "deletes samples with collected_at before the cutoff", %{host: host} do
      {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-3600 * 50), %{"v" => "old"})
      {:ok, keep} = Metrics.record_sample(host.id, "fast", dt(-60), %{"v" => "fresh"})

      cutoff = DateTime.add(DateTime.utc_now(), -48 * 3600, :second)
      assert {1, nil} = Metrics.delete_older_than(cutoff)

      remaining = Server.Repo.all(Server.Schema.Metric)
      assert length(remaining) == 1
      assert hd(remaining).id == keep.id
    end
  end

  defp dt(offset_seconds), do: DateTime.add(DateTime.utc_now(), offset_seconds, :second)
end
  • Step 4: Run tests — expect compile failure (Server.Metrics not defined)
mix test test/server/metrics_test.exs 2>&1 | tail -10

Expected: module undefined errors.

  • Step 5: Implement the context

Create server/lib/server/metrics.ex:

defmodule Server.Metrics do
  @moduledoc "Metric sample storage and retrieval."

  import Ecto.Query

  alias Server.Repo
  alias Server.Schema.Metric

  @spec record_sample(integer(), String.t(), DateTime.t(), map()) ::
          {:ok, Metric.t()} | {:error, Ecto.Changeset.t()}
  def record_sample(host_id, interval_type, collected_at, payload) do
    %Metric{}
    |> Metric.changeset(%{
      host_id: host_id,
      interval_type: interval_type,
      collected_at: collected_at,
      payload: payload
    })
    |> Repo.insert()
  end

  @spec latest_sample(integer(), String.t()) :: Metric.t() | nil
  def latest_sample(host_id, interval_type) do
    from(m in Metric,
      where: m.host_id == ^host_id and m.interval_type == ^interval_type,
      order_by: [desc: m.collected_at],
      limit: 1
    )
    |> Repo.one()
  end

  @spec delete_older_than(DateTime.t()) :: {non_neg_integer(), nil}
  def delete_older_than(%DateTime{} = cutoff) do
    from(m in Metric, where: m.collected_at < ^cutoff)
    |> Repo.delete_all()
  end
end
  • Step 6: Run the migration and tests
mix ecto.migrate && mix test test/server/metrics_test.exs 2>&1 | tail -6

Expected: 6 tests pass.

  • Step 7: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add server/priv/repo/migrations server/lib/server/schema/metric.ex server/lib/server/metrics.ex server/test/server/metrics_test.exs
git commit -m "feat(server): metrics schema + context with record/latest/prune"

Task 2: Server — Channel Writes Samples

Files:

  • Modify: server/lib/server_web/channels/host_channel.ex

  • Modify: server/test/server_web/channels/host_channel_test.exs

  • Step 1: Extend existing tests to assert DB writes

Open server/test/server_web/channels/host_channel_test.exs and replace the describe "metric:fast event" do ... end block with:

  describe "metric events persist to DB" do
    setup %{token: token, host: host} do
      {:ok, socket} = connect(AgentSocket, %{})

      {:ok, _reply, joined} =
        subscribe_and_join(socket, "host:pve-01", %{
          "token" => token,
          "agent_version" => "0.1.0"
        })

      %{socket: joined, host: host}
    end

    test "metric:fast is stored with interval=fast", %{socket: socket, host: host} do
      ts = "2026-04-21T12:00:00.123456Z"

      ref =
        push(socket, "metric:fast", %{
          "collected_at" => ts,
          "data" => %{"cpu_percent" => 12.3, "load1" => 0.2}
        })

      assert_reply ref, :ok

      sample = Server.Metrics.latest_sample(host.id, "fast")
      assert sample != nil
      assert sample.payload == %{"cpu_percent" => 12.3, "load1" => 0.2}
      {:ok, expected, _} = DateTime.from_iso8601(ts)
      assert DateTime.compare(sample.collected_at, expected) == :eq
    end

    test "metric:medium is stored with interval=medium", %{socket: socket, host: host} do
      ref =
        push(socket, "metric:medium", %{
          "collected_at" => "2026-04-21T12:05:00Z",
          "data" => %{"vms_detail" => []}
        })

      assert_reply ref, :ok

      sample = Server.Metrics.latest_sample(host.id, "medium")
      assert sample != nil
      assert sample.payload == %{"vms_detail" => []}
    end

    test "metric:slow is stored with interval=slow", %{socket: socket, host: host} do
      ref =
        push(socket, "metric:slow", %{
          "collected_at" => "2026-04-21T12:30:00Z",
          "data" => %{"system_info" => %{"pveversion" => "8.3.0"}}
        })

      assert_reply ref, :ok

      sample = Server.Metrics.latest_sample(host.id, "slow")
      assert sample != nil
      assert sample.payload == %{"system_info" => %{"pveversion" => "8.3.0"}}
    end

    test "replies :error when collected_at is missing", %{socket: socket} do
      ref = push(socket, "metric:fast", %{"data" => %{}})
      assert_reply ref, :error, %{reason: "missing_collected_at"}
    end

    test "replies :error when data is missing", %{socket: socket} do
      ref = push(socket, "metric:fast", %{"collected_at" => "2026-04-21T12:00:00Z"})
      assert_reply ref, :error, %{reason: "missing_data"}
    end
  end
  • Step 2: Run tests — expect failure
mix test test/server_web/channels/host_channel_test.exs 2>&1 | tail -10

Expected: tests in the new describe block fail because the channel doesn't persist yet.

  • Step 3: Update HostChannel to persist

Replace the three handle_in/3 clauses for "metric:fast", "metric:medium", and "metric:slow" in server/lib/server_web/channels/host_channel.ex with:

  @impl true
  def handle_in("metric:" <> kind, payload, socket) when kind in ~w(fast medium slow) do
    with {:ok, collected_at} <- parse_collected_at(payload),
         {:ok, data} <- parse_data(payload),
         {:ok, _} <- Server.Metrics.record_sample(socket.assigns.host_id, kind, collected_at, data) do
      Logger.debug("stored #{kind} sample host=#{socket.assigns.host_name}")
      {:reply, :ok, socket}
    else
      {:error, reason} when is_binary(reason) ->
        Logger.warning("metric:#{kind} rejected host=#{socket.assigns.host_name} reason=#{reason}")
        {:reply, {:error, %{reason: reason}}, socket}

      {:error, %Ecto.Changeset{} = cs} ->
        Logger.warning("metric:#{kind} changeset failed: #{inspect(cs.errors)}")
        {:reply, {:error, %{reason: "invalid_payload"}}, socket}
    end
  end

  defp parse_collected_at(%{"collected_at" => ts}) when is_binary(ts) do
    case DateTime.from_iso8601(ts) do
      {:ok, dt, _} -> {:ok, dt}
      _ -> {:error, "invalid_collected_at"}
    end
  end

  defp parse_collected_at(_), do: {:error, "missing_collected_at"}

  defp parse_data(%{"data" => data}) when is_map(data), do: {:ok, data}
  defp parse_data(_), do: {:error, "missing_data"}
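A side benefit of DateTime.from_iso8601/1: offset timestamps are normalized to UTC, so agents are not required to send Z-suffixed strings (example values illustrative):

```elixir
# from_iso8601 converts offset timestamps to UTC and returns the offset in seconds.
{:ok, dt, offset} = DateTime.from_iso8601("2026-04-21T14:00:00+02:00")
# dt is ~U[2026-04-21 12:00:00Z], offset is 7200
```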
  • Step 4: Run tests — expect pass
mix test test/server_web/channels/host_channel_test.exs 2>&1 | tail -5

Expected: all channel tests pass.

  • Step 5: Run full server suite
mix test 2>&1 | tail -4

Expected: all green.

  • Step 6: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add server/lib/server_web/channels/host_channel.ex server/test/server_web/channels/host_channel_test.exs
git commit -m "feat(server): channel persists fast/medium/slow samples to metrics table"

Task 3: Server — Retention GenServer

Files:

  • Create: server/lib/server/retention.ex

  • Create: server/test/server/retention_test.exs

  • Modify: server/lib/server/application.ex

  • Step 1: Write failing test

Create server/test/server/retention_test.exs:

defmodule Server.RetentionTest do
  use Server.DataCase, async: false

  alias Server.{Hosts, Metrics, Retention}

  test "prune_now/1 deletes samples older than the retention window" do
    {:ok, {host, _}} = Hosts.create_host("pve-01")
    stale_at = DateTime.add(DateTime.utc_now(), -49 * 3600, :second)
    fresh_at = DateTime.add(DateTime.utc_now(), -60, :second)

    {:ok, _} = Metrics.record_sample(host.id, "fast", stale_at, %{"x" => 1})
    {:ok, fresh} = Metrics.record_sample(host.id, "fast", fresh_at, %{"x" => 2})

    {deleted, _} = Retention.prune_now(48 * 3600)

    assert deleted == 1
    remaining = Server.Repo.all(Server.Schema.Metric)
    assert length(remaining) == 1
    assert hd(remaining).id == fresh.id
  end
end
  • Step 2: Run test — expect failure
mix test test/server/retention_test.exs 2>&1 | tail -5

Expected: Server.Retention undefined.

  • Step 3: Implement the GenServer

Create server/lib/server/retention.ex:

defmodule Server.Retention do
  @moduledoc "Deletes metric samples older than the retention window. Runs hourly."

  use GenServer
  require Logger

  @default_retention_seconds 48 * 60 * 60
  @default_interval_ms 60 * 60 * 1_000

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Synchronous prune used by tests and manual ops."
  def prune_now(retention_seconds \\ @default_retention_seconds) do
    cutoff = DateTime.add(DateTime.utc_now(), -retention_seconds, :second)
    Server.Metrics.delete_older_than(cutoff)
  end

  @impl true
  def init(opts) do
    retention_seconds = Keyword.get(opts, :retention_seconds, @default_retention_seconds)
    interval_ms = Keyword.get(opts, :interval_ms, @default_interval_ms)
    state = %{retention_seconds: retention_seconds, interval_ms: interval_ms}
    Process.send_after(self(), :prune, interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:prune, state) do
    {count, _} = prune_now(state.retention_seconds)
    if count > 0, do: Logger.info("retention: pruned #{count} stale samples")
    Process.send_after(self(), :prune, state.interval_ms)
    {:noreply, state}
  end
end
  • Step 4: Run test — expect pass
mix test test/server/retention_test.exs 2>&1 | tail -5

Expected: 1 test passes.

  • Step 5: Add Retention to the supervision tree

In server/lib/server/application.ex, find the children list and add Server.Retention after {Phoenix.PubSub, name: Server.PubSub}:

      {Phoenix.PubSub, name: Server.PubSub},
      Server.Retention,
      # Start a worker by calling: Server.Worker.start_link(arg)
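Since use GenServer derives a child spec that forwards its argument to start_link/1, the defaults can be overridden in the supervision tree if desired (values illustrative):

```elixir
{Server.Retention, retention_seconds: 24 * 3600, interval_ms: 30 * 60 * 1_000},
```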
  • Step 6: Run full test suite
mix test 2>&1 | tail -4

Expected: all green.

  • Step 7: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add server/lib/server/retention.ex server/test/server/retention_test.exs server/lib/server/application.ex
git commit -m "feat(server): retention GenServer prunes samples older than 48h hourly"

Task 4: Server — Simple JSON Route

Files:

  • Create: server/lib/server_web/controllers/host_controller.ex

  • Create: server/test/server_web/controllers/host_controller_test.exs

  • Modify: server/lib/server_web/router.ex

  • Step 1: Write failing test

Create server/test/server_web/controllers/host_controller_test.exs:

defmodule ServerWeb.HostControllerTest do
  use ServerWeb.ConnCase, async: true

  alias Server.{Hosts, Metrics}

  describe "GET /api/hosts/:name" do
    setup do
      {:ok, {host, _token}} = Hosts.create_host("pve-01")

      {:ok, _} =
        Metrics.record_sample(
          host.id,
          "fast",
          DateTime.utc_now(),
          %{"host" => %{"load1" => 0.5}}
        )

      %{host: host}
    end

    test "returns host info and latest samples", %{conn: conn, host: host} do
      conn = get(conn, ~p"/api/hosts/#{host.name}")

      assert %{
               "name" => "pve-01",
               "status" => _,
               "samples" => %{"fast" => %{"payload" => %{"host" => %{"load1" => 0.5}}}}
             } = json_response(conn, 200)
    end

    test "returns 404 for unknown host", %{conn: conn} do
      conn = get(conn, ~p"/api/hosts/nope")
      assert json_response(conn, 404) == %{"error" => "host_not_found"}
    end
  end
end
  • Step 2: Add route

In server/lib/server_web/router.ex, add after the existing scope "/" block:

  scope "/api", ServerWeb do
    pipe_through :api

    get "/hosts/:name", HostController, :show
  end

Verify an :api pipeline already exists above — the Phoenix 1.7 scaffold generates one. Only if it is missing, add:

  pipeline :api do
    plug :accepts, ["json"]
  end

  • Step 3: Run test — expect failure (controller missing)
mix test test/server_web/controllers/host_controller_test.exs 2>&1 | tail -8

Expected: undefined ServerWeb.HostController.

  • Step 4: Implement controller

Create server/lib/server_web/controllers/host_controller.ex:

defmodule ServerWeb.HostController do
  use ServerWeb, :controller

  alias Server.{Metrics, Repo, Schema.Host}

  def show(conn, %{"name" => name}) do
    case Repo.get_by(Host, name: name) do
      nil ->
        conn
        |> put_status(:not_found)
        |> json(%{error: "host_not_found"})

      %Host{} = host ->
        samples =
          # `sample = latest_sample(...)` acts as a comprehension filter:
          # a nil result drops that interval from the map.
          for interval <- ~w(fast medium slow),
              sample = Metrics.latest_sample(host.id, interval),
              into: %{} do
            {interval, %{collected_at: sample.collected_at, payload: sample.payload}}
          end

        json(conn, %{
          name: host.name,
          status: host.status,
          agent_version: host.agent_version,
          last_seen_at: host.last_seen_at,
          samples: samples
        })
    end
  end
end
  • Step 5: Run test — expect pass
mix test test/server_web/controllers/host_controller_test.exs 2>&1 | tail -5

Expected: 2 tests pass.
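Optional manual check against a running dev server (assumes the default Phoenix port 4000 and a registered host named pve-01):

```shell
curl -s http://localhost:4000/api/hosts/pve-01
```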

  • Step 6: Run full test suite
mix test 2>&1 | tail -4

Expected: all green.

  • Step 7: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add server/lib/server_web/controllers/host_controller.ex server/lib/server_web/router.ex server/test/server_web/controllers/host_controller_test.exs
git commit -m "feat(server): GET /api/hosts/:name returns latest fast/medium/slow samples"

Task 5: Agent — Shell Runner

Files:

  • Create: agent/lib/proxmox_agent/shell.ex

  • Create: agent/test/proxmox_agent/shell_test.exs

  • Step 1: Write failing test

Create agent/test/proxmox_agent/shell_test.exs:

defmodule ProxmoxAgent.ShellTest do
  use ExUnit.Case, async: true

  alias ProxmoxAgent.Shell

  test "run/2 returns {:ok, output} on zero exit" do
    assert {:ok, output} = Shell.run("/bin/echo", ["hello"])
    assert String.trim(output) == "hello"
  end

  test "run/2 returns {:error, {:nonzero_exit, code, output}} on non-zero exit" do
    assert {:error, {:nonzero_exit, code, _}} = Shell.run("/bin/sh", ["-c", "exit 7"])
    assert code == 7
  end

  test "run/2 returns {:error, {:enoent, _}} when binary is missing" do
    assert {:error, {:enoent, _}} = Shell.run("/does/not/exist/nope", [])
  end
end
  • Step 2: Run test — expect failure
cd /Users/cabele/claudeprojects/proxmox_monitor/agent
mix test test/proxmox_agent/shell_test.exs 2>&1 | tail -5

Expected: module undefined.

  • Step 3: Implement Shell

Create agent/lib/proxmox_agent/shell.ex:

defmodule ProxmoxAgent.Shell do
  @moduledoc """
  Thin wrapper over System.cmd for testability. Collectors accept an optional
  :runner function of this shape so tests can inject fixture-backed fakes.
  """

  @type result :: {:ok, String.t()} | {:error, term()}

  @spec run(String.t(), [String.t()]) :: result
  def run(command, args) do
    try do
      case System.cmd(command, args, stderr_to_stdout: true) do
        {output, 0} -> {:ok, output}
        {output, code} -> {:error, {:nonzero_exit, code, output}}
      end
    rescue
      e in ErlangError ->
        case e.original do
          :enoent -> {:error, {:enoent, command}}
          other -> {:error, {:system_error, other}}
        end
    end
  end
end
  • Step 4: Run test — expect pass
mix test test/proxmox_agent/shell_test.exs 2>&1 | tail -5

Expected: 3 tests pass.

  • Step 5: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/shell.ex agent/test/proxmox_agent/shell_test.exs
git commit -m "feat(agent): Shell.run wrapper for testable external commands"

Task 6: Agent — ZFS Collector

Files:

  • Create: agent/test/fixtures/zfs/zpool_list.json

  • Create: agent/test/fixtures/zfs/zpool_status.json

  • Create: agent/test/fixtures/zfs/zfs_list.json

  • Create: agent/test/proxmox_agent/collectors/zfs_test.exs

  • Create: agent/lib/proxmox_agent/collectors/zfs.ex

  • Step 1: Write fixture JSON

Create agent/test/fixtures/zfs/zpool_list.json:

{
  "output_version": { "command": "zpool list", "vers_major": 0, "vers_minor": 1 },
  "pools": {
    "rpool": {
      "name": "rpool",
      "size": 500000000000,
      "alloc": 200000000000,
      "free": 300000000000,
      "frag": 17,
      "cap": 40,
      "health": "ONLINE"
    },
    "tank": {
      "name": "tank",
      "size": 8000000000000,
      "alloc": 6000000000000,
      "free": 2000000000000,
      "frag": 55,
      "cap": 75,
      "health": "DEGRADED"
    }
  }
}

Create agent/test/fixtures/zfs/zpool_status.json:

{
  "output_version": { "command": "zpool status", "vers_major": 0, "vers_minor": 1 },
  "pools": {
    "rpool": {
      "name": "rpool",
      "state": "ONLINE",
      "scan": {
        "function": "scrub",
        "state": "FINISHED",
        "end_time": "Sat Apr 19 02:00:00 2026"
      },
      "error_count": "0",
      "vdevs": {
        "mirror-0": {
          "name": "mirror-0",
          "vdev_type": "mirror",
          "state": "ONLINE",
          "read_errors": "0",
          "write_errors": "0",
          "checksum_errors": "0"
        }
      }
    },
    "tank": {
      "name": "tank",
      "state": "DEGRADED",
      "scan": {
        "function": "scrub",
        "state": "FINISHED",
        "end_time": "Tue Mar 01 08:00:00 2026"
      },
      "error_count": "2",
      "vdevs": {
        "raidz2-0": {
          "name": "raidz2-0",
          "vdev_type": "raidz2",
          "state": "DEGRADED",
          "read_errors": "0",
          "write_errors": "0",
          "checksum_errors": "2"
        }
      }
    }
  }
}

Create agent/test/fixtures/zfs/zfs_list.json:

{
  "output_version": { "command": "zfs list", "vers_major": 0, "vers_minor": 1 },
  "datasets": {
    "rpool": {
      "name": "rpool",
      "type": "FILESYSTEM",
      "properties": {
        "used": { "value": "200000000000" },
        "available": { "value": "300000000000" },
        "usedbysnapshots": { "value": "5000000000" }
      }
    },
    "rpool/data": {
      "name": "rpool/data",
      "type": "FILESYSTEM",
      "properties": {
        "used": { "value": "100000000000" },
        "available": { "value": "300000000000" },
        "usedbysnapshots": { "value": "2000000000" }
      }
    },
    "rpool/data@daily-2026-04-20": {
      "name": "rpool/data@daily-2026-04-20",
      "type": "SNAPSHOT",
      "properties": {
        "creation": { "value": "1745107200" }
      }
    },
    "rpool/data@daily-2026-04-21": {
      "name": "rpool/data@daily-2026-04-21",
      "type": "SNAPSHOT",
      "properties": {
        "creation": { "value": "1745193600" }
      }
    }
  }
}

The Unix timestamps 1745107200 and 1745193600 correspond to 2025-04-20 and 2025-04-21 UTC — one year before the dates in the snapshot names, which is fine here: the tests assert only the raw integers, and the values give the snapshots plausible ages.
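The epoch values can be double-checked in iex:

```elixir
# Convert the fixture epochs back to UTC dates.
DateTime.from_unix!(1_745_107_200)
# => ~U[2025-04-20 00:00:00Z]
DateTime.from_unix!(1_745_193_600)
# => ~U[2025-04-21 00:00:00Z]
```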

  • Step 2: Write failing tests

Create agent/test/proxmox_agent/collectors/zfs_test.exs:

defmodule ProxmoxAgent.Collectors.ZfsTest do
  use ExUnit.Case, async: true

  alias ProxmoxAgent.Collectors.Zfs

  @fixtures Path.expand("../../fixtures/zfs", __DIR__)

  defp fake_runner do
    fn
      "zpool", ["list", "-j", "--json-int"] ->
        {:ok, File.read!(Path.join(@fixtures, "zpool_list.json"))}

      "zpool", ["status", "-j", "--json-flat-vdevs", "--json-int"] ->
        {:ok, File.read!(Path.join(@fixtures, "zpool_status.json"))}

      "zfs", ["list", "-j", "--json-int", "-t", "all"] ->
        {:ok, File.read!(Path.join(@fixtures, "zfs_list.json"))}
    end
  end

  describe "collect_pools/1" do
    test "returns a summary per pool" do
      sample = Zfs.collect_pools(runner: fake_runner())
      assert is_list(sample.pools)
      assert length(sample.pools) == 2
      rpool = Enum.find(sample.pools, &(&1.name == "rpool"))
      tank = Enum.find(sample.pools, &(&1.name == "tank"))

      assert rpool.health == "ONLINE"
      assert rpool.capacity_percent == 40
      assert rpool.fragmentation_percent == 17
      assert rpool.size_bytes == 500_000_000_000
      assert rpool.error_count == 0
      assert rpool.degraded_vdev_count == 0

      assert tank.health == "DEGRADED"
      assert tank.error_count == 2
      assert tank.degraded_vdev_count == 1
    end

    test "populates errors list when zpool fails" do
      failing = fn _, _ -> {:error, {:enoent, "zpool"}} end
      sample = Zfs.collect_pools(runner: failing)
      assert sample.pools == []
      assert length(sample.errors) >= 1
    end
  end

  describe "collect_datasets/1" do
    test "returns datasets and per-dataset snapshot summary" do
      sample = Zfs.collect_datasets(runner: fake_runner())
      assert length(sample.datasets) == 2

      rpool_data = Enum.find(sample.datasets, &(&1.name == "rpool/data"))
      assert rpool_data.used_bytes == 100_000_000_000
      assert rpool_data.usedbysnapshots_bytes == 2_000_000_000
      assert rpool_data.snapshot_count == 2
      assert rpool_data.newest_snapshot_unix == 1_745_193_600
      assert rpool_data.oldest_snapshot_unix == 1_745_107_200
    end
  end
end
  • Step 3: Run tests — expect failure
mix test test/proxmox_agent/collectors/zfs_test.exs 2>&1 | tail -10

Expected: ProxmoxAgent.Collectors.Zfs undefined.

  • Step 4: Implement Zfs collector

Create agent/lib/proxmox_agent/collectors/zfs.ex:

defmodule ProxmoxAgent.Collectors.Zfs do
  @moduledoc """
  Collects ZFS pool health (fast path) and dataset/snapshot info (medium path).
  Delegates shelling out to an injectable runner so tests can supply fixtures.
  """

  @type pool_summary :: %{
          name: String.t(),
          health: String.t(),
          size_bytes: non_neg_integer(),
          allocated_bytes: non_neg_integer(),
          free_bytes: non_neg_integer(),
          fragmentation_percent: non_neg_integer(),
          capacity_percent: non_neg_integer(),
          error_count: non_neg_integer(),
          vdev_count: non_neg_integer(),
          degraded_vdev_count: non_neg_integer(),
          last_scrub_end: String.t() | nil
        }

  @spec collect_pools(keyword()) :: %{pools: [pool_summary()], errors: [map()]}
  def collect_pools(opts \\ []) do
    runner = runner(opts)

    {list_result, list_err} = decode(runner.("zpool", ["list", "-j", "--json-int"]), :zpool_list)

    {status_result, status_err} =
      decode(runner.("zpool", ["status", "-j", "--json-flat-vdevs", "--json-int"]), :zpool_status)

    pools = merge_pools(list_result, status_result)
    errors = Enum.filter([list_err, status_err], & &1)
    %{pools: pools, errors: errors}
  end

  @type dataset_summary :: %{
          name: String.t(),
          used_bytes: non_neg_integer(),
          usedbysnapshots_bytes: non_neg_integer(),
          snapshot_count: non_neg_integer(),
          newest_snapshot_unix: non_neg_integer() | nil,
          oldest_snapshot_unix: non_neg_integer() | nil
        }

  @spec collect_datasets(keyword()) :: %{datasets: [dataset_summary()], errors: [map()]}
  def collect_datasets(opts \\ []) do
    runner = runner(opts)

    {list_result, err} =
      decode(runner.("zfs", ["list", "-j", "--json-int", "-t", "all"]), :zfs_list)

    datasets = summarize_datasets(list_result)
    errors = if err, do: [err], else: []
    %{datasets: datasets, errors: errors}
  end

  defp runner(opts), do: Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)

  defp decode({:ok, body}, _tag) do
    case Jason.decode(body) do
      {:ok, map} -> {map, nil}
      {:error, e} -> {nil, %{tag: "decode", message: Exception.message(e)}}
    end
  end

  defp decode({:error, reason}, tag), do: {nil, %{tag: Atom.to_string(tag), message: inspect(reason)}}

  defp merge_pools(nil, _), do: []
  defp merge_pools(_, nil), do: []

  defp merge_pools(%{"pools" => list_pools}, %{"pools" => status_pools}) do
    Enum.map(list_pools, fn {name, list_info} ->
      status_info = Map.get(status_pools, name, %{})
      vdevs = Map.get(status_info, "vdevs", %{}) |> Map.values()

      %{
        name: name,
        health: Map.get(list_info, "health"),
        size_bytes: Map.get(list_info, "size", 0),
        allocated_bytes: Map.get(list_info, "alloc", 0),
        free_bytes: Map.get(list_info, "free", 0),
        fragmentation_percent: Map.get(list_info, "frag", 0),
        capacity_percent: Map.get(list_info, "cap", 0),
        error_count: to_int(Map.get(status_info, "error_count", "0")),
        vdev_count: length(vdevs),
        degraded_vdev_count: Enum.count(vdevs, &(&1["state"] != "ONLINE")),
        last_scrub_end: get_in(status_info, ["scan", "end_time"])
      }
    end)
  end

  defp summarize_datasets(nil), do: []

  defp summarize_datasets(%{"datasets" => datasets}) do
    by_type = Enum.group_by(datasets, fn {_, d} -> d["type"] end)

    filesystems = Map.get(by_type, "FILESYSTEM", [])
    snapshots_by_ds = group_snapshots(Map.get(by_type, "SNAPSHOT", []))

    Enum.map(filesystems, fn {_name, ds} ->
      name = ds["name"]
      snaps = Map.get(snapshots_by_ds, name, [])

      %{
        name: name,
        used_bytes: get_prop_int(ds, "used"),
        usedbysnapshots_bytes: get_prop_int(ds, "usedbysnapshots"),
        snapshot_count: length(snaps),
        newest_snapshot_unix: snaps |> Enum.map(& &1.creation) |> max_or_nil(),
        oldest_snapshot_unix: snaps |> Enum.map(& &1.creation) |> min_or_nil()
      }
    end)
  end

  defp group_snapshots(snapshots) do
    Enum.reduce(snapshots, %{}, fn {_, snap}, acc ->
      [parent | _] = String.split(snap["name"], "@", parts: 2)
      creation = get_prop_int(snap, "creation")
      entry = %{name: snap["name"], creation: creation}
      Map.update(acc, parent, [entry], &[entry | &1])
    end)
  end

  defp get_prop_int(ds, key) do
    case get_in(ds, ["properties", key, "value"]) do
      nil -> 0
      v -> to_int(v)
    end
  end

  defp to_int(v) when is_integer(v), do: v
  defp to_int(v) when is_binary(v), do: String.to_integer(v)

  defp max_or_nil([]), do: nil
  defp max_or_nil(list), do: Enum.max(list)
  defp min_or_nil([]), do: nil
  defp min_or_nil(list), do: Enum.min(list)
end
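
For orientation, merge_pools/2 expects pools keyed by name in both inputs, as produced by OpenZFS 2.3's JSON flags. A trimmed, hypothetical `zpool list -j --json-int` input (field names match what the collector reads; real output carries many more fields):

```json
{
  "pools": {
    "tank": {
      "health": "ONLINE",
      "size": 1000204886016,
      "alloc": 400081954406,
      "free": 600122931610,
      "frag": 5,
      "cap": 40
    }
  }
}
```

and the matching hypothetical `zpool status -j --json-flat-vdevs --json-int` input, with `error_count` as a string (which is why the collector runs it through to_int/1):

```json
{
  "pools": {
    "tank": {
      "error_count": "0",
      "scan": { "end_time": "Sun Apr 20 03:00:00 2026" },
      "vdevs": {
        "tank": { "state": "ONLINE" },
        "raidz1-0": { "state": "ONLINE" }
      }
    }
  }
}
```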
  • Step 5: Run tests — expect pass
mix test test/proxmox_agent/collectors/zfs_test.exs 2>&1 | tail -5

Expected: 3 tests pass.

  • Step 6: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/collectors/zfs.ex agent/test/proxmox_agent/collectors/zfs_test.exs agent/test/fixtures/zfs
git commit -m "feat(agent): zfs collector for pools + datasets/snapshots with fixture tests"

Task 7: Agent — Proxmox Storage Collector

Files:

  • Create: agent/test/fixtures/pvesh/storage.json

  • Create: agent/test/proxmox_agent/collectors/storage_test.exs

  • Create: agent/lib/proxmox_agent/collectors/storage.ex

  • Step 1: Fixture

Create agent/test/fixtures/pvesh/storage.json:

[
  {
    "storage": "local",
    "type": "dir",
    "content": "backup,iso,vztmpl",
    "active": 1,
    "enabled": 1,
    "used": 50000000000,
    "total": 500000000000,
    "avail": 450000000000,
    "used_fraction": 0.1
  },
  {
    "storage": "local-zfs",
    "type": "zfspool",
    "content": "images,rootdir",
    "active": 1,
    "enabled": 1,
    "used": 200000000000,
    "total": 500000000000,
    "avail": 300000000000,
    "used_fraction": 0.4
  },
  {
    "storage": "backup-nfs",
    "type": "nfs",
    "content": "backup",
    "active": 0,
    "enabled": 1,
    "used": 0,
    "total": 0,
    "avail": 0,
    "used_fraction": 0.0
  }
]
  • Step 2: Write failing test

Create agent/test/proxmox_agent/collectors/storage_test.exs:

defmodule ProxmoxAgent.Collectors.StorageTest do
  use ExUnit.Case, async: true

  alias ProxmoxAgent.Collectors.Storage

  @fixtures Path.expand("../../fixtures/pvesh", __DIR__)

  defp fake_runner do
    fn
      "pvesh", ["get", "/nodes/" <> _, "--output-format", "json"] ->
        {:ok, File.read!(Path.join(@fixtures, "storage.json"))}
    end
  end

  test "returns one summary per storage entry" do
    sample = Storage.collect(node: "pve-01", runner: fake_runner())

    assert length(sample.storages) == 3
    local = Enum.find(sample.storages, &(&1.name == "local"))
    assert local.type == "dir"
    assert local.active == true
    assert local.used_bytes == 50_000_000_000
    assert local.total_bytes == 500_000_000_000
    assert local.content == "backup,iso,vztmpl"

    nfs = Enum.find(sample.storages, &(&1.name == "backup-nfs"))
    assert nfs.active == false
  end

  test "populates errors on failure" do
    failing = fn _, _ -> {:error, {:enoent, "pvesh"}} end
    sample = Storage.collect(node: "pve-01", runner: failing)
    assert sample.storages == []
    assert sample.errors != []
  end
end
  • Step 3: Run test — expect failure
mix test test/proxmox_agent/collectors/storage_test.exs 2>&1 | tail -5

Expected: module undefined.

  • Step 4: Implement

Create agent/lib/proxmox_agent/collectors/storage.ex:

defmodule ProxmoxAgent.Collectors.Storage do
  @moduledoc "Collects Proxmox storage summary via pvesh."

  @spec collect(keyword()) :: %{storages: [map()], errors: [map()]}
  def collect(opts \\ []) do
    runner = Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)
    node = Keyword.fetch!(opts, :node)

    case runner.("pvesh", ["get", "/nodes/#{node}/storage", "--output-format", "json"]) do
      {:ok, body} ->
        case Jason.decode(body) do
          {:ok, list} when is_list(list) ->
            %{storages: Enum.map(list, &normalize/1), errors: []}

          {:error, e} ->
            %{storages: [], errors: [%{tag: "decode", message: Exception.message(e)}]}
        end

      {:error, reason} ->
        %{storages: [], errors: [%{tag: "pvesh", message: inspect(reason)}]}
    end
  end

  defp normalize(entry) do
    %{
      name: entry["storage"],
      type: entry["type"],
      content: entry["content"],
      active: entry["active"] == 1,
      enabled: entry["enabled"] == 1,
      used_bytes: entry["used"] || 0,
      total_bytes: entry["total"] || 0,
      avail_bytes: entry["avail"] || 0,
      used_fraction: entry["used_fraction"] || 0.0
    }
  end
end
  • Step 5: Run test — expect pass
mix test test/proxmox_agent/collectors/storage_test.exs 2>&1 | tail -5

Expected: 2 tests pass.

  • Step 6: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/collectors/storage.ex agent/test/proxmox_agent/collectors/storage_test.exs agent/test/fixtures/pvesh/storage.json
git commit -m "feat(agent): pvesh storage collector"

Task 8: Agent — VM/LXC Collector

Files:

  • Create: agent/test/fixtures/pvesh/qemu.json

  • Create: agent/test/fixtures/pvesh/lxc.json

  • Create: agent/test/fixtures/pvesh/qemu_100_config.json

  • Create: agent/test/fixtures/pvesh/qemu_100_agent_interfaces.json

  • Create: agent/test/proxmox_agent/collectors/vms_test.exs

  • Create: agent/lib/proxmox_agent/collectors/vms.ex

  • Step 1: Fixtures

Create agent/test/fixtures/pvesh/qemu.json:

[
  {
    "vmid": 100,
    "name": "nginx-proxy",
    "status": "running",
    "uptime": 86400,
    "cpu": 0.05,
    "mem": 536870912,
    "maxmem": 2147483648,
    "tags": "web;production"
  },
  {
    "vmid": 101,
    "name": "db-backup",
    "status": "stopped",
    "uptime": 0,
    "cpu": 0,
    "mem": 0,
    "maxmem": 4294967296,
    "tags": "db"
  }
]

Create agent/test/fixtures/pvesh/lxc.json:

[
  {
    "vmid": 200,
    "name": "minecraft",
    "status": "running",
    "uptime": 3600,
    "cpu": 0.15,
    "mem": 2147483648,
    "maxmem": 4294967296,
    "tags": ""
  }
]

Create agent/test/fixtures/pvesh/qemu_100_config.json:

{
  "name": "nginx-proxy",
  "cores": 2,
  "memory": 2048,
  "onboot": 1,
  "scsi0": "local-zfs:vm-100-disk-0,size=32G",
  "net0": "virtio=AA:BB:CC:DD:EE:FF,bridge=vmbr0"
}

Create agent/test/fixtures/pvesh/qemu_100_agent_interfaces.json:

{
  "result": [
    {
      "name": "lo",
      "ip-addresses": [
        { "ip-address": "127.0.0.1", "ip-address-type": "ipv4" }
      ]
    },
    {
      "name": "eth0",
      "ip-addresses": [
        { "ip-address": "192.168.1.10", "ip-address-type": "ipv4" },
        { "ip-address": "fe80::a", "ip-address-type": "ipv6" }
      ]
    }
  ]
}
  • Step 2: Write failing tests

Create agent/test/proxmox_agent/collectors/vms_test.exs:

defmodule ProxmoxAgent.Collectors.VmsTest do
  use ExUnit.Case, async: true

  alias ProxmoxAgent.Collectors.Vms

  @fixtures Path.expand("../../fixtures/pvesh", __DIR__)

  defp read!(name), do: File.read!(Path.join(@fixtures, name))

  defp fake_runner do
    fn
      "pvesh", ["get", "/nodes/" <> rest, "--output-format", "json"] ->
        cond do
          String.ends_with?(rest, "/qemu") ->
            {:ok, read!("qemu.json")}

          String.ends_with?(rest, "/lxc") ->
            {:ok, read!("lxc.json")}

          String.ends_with?(rest, "/qemu/100/config") ->
            {:ok, read!("qemu_100_config.json")}

          String.ends_with?(rest, "/qemu/100/agent/network-get-interfaces") ->
            {:ok, read!("qemu_100_agent_interfaces.json")}

          String.ends_with?(rest, "/qemu/101/config") ->
            {:ok, ~s({"name":"db-backup","cores":4,"memory":4096})}

          String.ends_with?(rest, "/qemu/101/agent/network-get-interfaces") ->
            {:error, {:nonzero_exit, 1, "QEMU guest agent is not running"}}

          String.ends_with?(rest, "/lxc/200/config") ->
            {:ok, ~s({"hostname":"minecraft","cores":4,"memory":4096,"net0":"name=eth0,ip=10.0.0.5/24"})}
        end
    end
  end

  describe "collect_runtime/1" do
    test "returns qemu + lxc runtime info" do
      sample = Vms.collect_runtime(node: "pve-01", runner: fake_runner())

      assert length(sample.vms) == 3
      nginx = Enum.find(sample.vms, &(&1.vmid == 100))
      assert nginx.type == "qemu"
      assert nginx.name == "nginx-proxy"
      assert nginx.status == "running"
      assert nginx.cpu_usage == 0.05
      assert nginx.mem_bytes == 536_870_912
      assert nginx.max_mem_bytes == 2_147_483_648
      assert nginx.tags == ["web", "production"]

      mc = Enum.find(sample.vms, &(&1.vmid == 200))
      assert mc.type == "lxc"
    end
  end

  describe "collect_detail/1" do
    test "returns per-VM config + IPs" do
      sample = Vms.collect_detail(node: "pve-01", runner: fake_runner())

      nginx = Enum.find(sample.vms, &(&1.vmid == 100))
      assert nginx.config["cores"] == 2
      assert nginx.config["memory"] == 2048
      assert "192.168.1.10" in nginx.ips

      db = Enum.find(sample.vms, &(&1.vmid == 101))
      assert db.config["cores"] == 4
      assert db.ips == []
      assert length(db.errors) == 1

      mc = Enum.find(sample.vms, &(&1.vmid == 200))
      assert mc.config["hostname"] == "minecraft"
      assert "10.0.0.5" in mc.ips
    end
  end
end
  • Step 3: Run tests — expect failure
mix test test/proxmox_agent/collectors/vms_test.exs 2>&1 | tail -10

Expected: module undefined.

  • Step 4: Implement

Create agent/lib/proxmox_agent/collectors/vms.ex:

defmodule ProxmoxAgent.Collectors.Vms do
  @moduledoc """
  Collects VM/LXC runtime (fast path) and per-VM detail incl. IPs (medium path).
  """

  @spec collect_runtime(keyword()) :: %{vms: [map()], errors: [map()]}
  def collect_runtime(opts) do
    runner = runner(opts)
    node = Keyword.fetch!(opts, :node)

    {qemu, e1} = list(runner, node, "qemu")
    {lxc, e2} = list(runner, node, "lxc")

    vms =
      Enum.map(qemu, &normalize_runtime(&1, "qemu")) ++
        Enum.map(lxc, &normalize_runtime(&1, "lxc"))

    %{vms: vms, errors: Enum.filter([e1, e2], & &1)}
  end

  @spec collect_detail(keyword()) :: %{vms: [map()], errors: [map()]}
  def collect_detail(opts) do
    runner = runner(opts)
    node = Keyword.fetch!(opts, :node)

    {qemu, e1} = list(runner, node, "qemu")
    {lxc, e2} = list(runner, node, "lxc")

    qemu_details = Enum.map(qemu, &qemu_detail(runner, node, &1))
    lxc_details = Enum.map(lxc, &lxc_detail(runner, node, &1))

    %{vms: qemu_details ++ lxc_details, errors: Enum.filter([e1, e2], & &1)}
  end

  defp runner(opts), do: Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)

  defp list(runner, node, type) do
    case runner.("pvesh", ["get", "/nodes/#{node}/#{type}", "--output-format", "json"]) do
      {:ok, body} ->
        case Jason.decode(body) do
          {:ok, list} when is_list(list) -> {list, nil}
          {:error, e} -> {[], %{tag: "decode_#{type}", message: Exception.message(e)}}
        end

      {:error, reason} ->
        {[], %{tag: "list_#{type}", message: inspect(reason)}}
    end
  end

  defp normalize_runtime(entry, type) do
    %{
      vmid: entry["vmid"],
      type: type,
      name: entry["name"] || entry["hostname"],
      status: entry["status"],
      uptime_seconds: entry["uptime"] || 0,
      cpu_usage: entry["cpu"] || 0.0,
      mem_bytes: entry["mem"] || 0,
      max_mem_bytes: entry["maxmem"] || 0,
      tags: parse_tags(entry["tags"])
    }
  end

  defp parse_tags(nil), do: []
  defp parse_tags(""), do: []

  defp parse_tags(str) when is_binary(str) do
    str |> String.split([";", ","], trim: true) |> Enum.map(&String.trim/1)
  end

  defp qemu_detail(runner, node, entry) do
    vmid = entry["vmid"]
    {config, cfg_err} = fetch_json(runner, "/nodes/#{node}/qemu/#{vmid}/config")
    {ips, ip_err} = fetch_qemu_agent_ips(runner, node, vmid)

    %{
      vmid: vmid,
      type: "qemu",
      name: entry["name"],
      config: config || %{},
      ips: ips,
      errors: Enum.filter([cfg_err, ip_err], & &1)
    }
  end

  defp lxc_detail(runner, node, entry) do
    vmid = entry["vmid"]
    {config, cfg_err} = fetch_json(runner, "/nodes/#{node}/lxc/#{vmid}/config")
    ips = extract_lxc_ips(config || %{})

    %{
      vmid: vmid,
      type: "lxc",
      name: entry["name"],
      config: config || %{},
      ips: ips,
      errors: Enum.filter([cfg_err], & &1)
    }
  end

  defp fetch_json(runner, path) do
    case runner.("pvesh", ["get", path, "--output-format", "json"]) do
      {:ok, body} ->
        case Jason.decode(body) do
          {:ok, map} -> {map, nil}
          {:error, e} -> {nil, %{tag: "decode", message: Exception.message(e)}}
        end

      {:error, reason} ->
        {nil, %{tag: "pvesh", message: inspect(reason)}}
    end
  end

  defp fetch_qemu_agent_ips(runner, node, vmid) do
    case runner.("pvesh", [
           "get",
           "/nodes/#{node}/qemu/#{vmid}/agent/network-get-interfaces",
           "--output-format",
           "json"
         ]) do
      {:ok, body} ->
        case Jason.decode(body) do
          {:ok, %{"result" => interfaces}} ->
            ips =
              interfaces
              |> Enum.reject(&(&1["name"] == "lo"))
              |> Enum.flat_map(&Map.get(&1, "ip-addresses", []))
              |> Enum.filter(&(&1["ip-address-type"] == "ipv4"))
              |> Enum.map(& &1["ip-address"])

            {ips, nil}

          _ ->
            {[], %{tag: "agent_ips", message: "unexpected shape"}}
        end

      {:error, _reason} ->
        {[], nil}
    end
  end

  defp extract_lxc_ips(config) do
    config
    |> Enum.filter(fn {k, _} -> String.starts_with?(to_string(k), "net") end)
    |> Enum.flat_map(fn {_, val} -> parse_lxc_net(val) end)
  end

  defp parse_lxc_net(val) when is_binary(val) do
    val
    |> String.split(",")
    |> Enum.find_value([], fn pair ->
      case String.split(pair, "=", parts: 2) do
        ["ip", ip] ->
          ip = ip |> String.split("/") |> hd()
          # [] is truthy in Elixir, so find_value stops here: DHCP yields
          # no static IP, but we don't keep scanning the remaining pairs.
          if ip == "dhcp", do: [], else: [ip]

        _ ->
          nil
      end
    end)
  end

  defp parse_lxc_net(_), do: []
end
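
Isolated from the module, the LXC IP extraction is a small key=value parser over Proxmox's netX config strings. A standalone restatement for illustration (not the module itself):

```elixir
# Pull a static IP out of an LXC netX config string such as
# "name=eth0,ip=10.0.0.5/24", ignoring DHCP and non-ip pairs.
parse_lxc_net = fn val ->
  val
  |> String.split(",")
  |> Enum.find_value([], fn pair ->
    case String.split(pair, "=", parts: 2) do
      ["ip", ip] ->
        # Drop the CIDR suffix; [] is truthy, so find_value stops on DHCP.
        ip = ip |> String.split("/") |> hd()
        if ip == "dhcp", do: [], else: [ip]

      _ ->
        nil
    end
  end)
end

IO.inspect(parse_lxc_net.("name=eth0,ip=10.0.0.5/24"))  # ["10.0.0.5"]
IO.inspect(parse_lxc_net.("name=eth0,ip=dhcp"))         # []
```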
  • Step 5: Run tests — expect pass
mix test test/proxmox_agent/collectors/vms_test.exs 2>&1 | tail -5

Expected: 2 tests pass.

  • Step 6: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/collectors/vms.ex agent/test/proxmox_agent/collectors/vms_test.exs agent/test/fixtures/pvesh
git commit -m "feat(agent): vms/lxc collectors for runtime and detail with fixtures"

Task 9: Agent — System Info Collector

Files:

  • Create: agent/test/fixtures/system/pveversion.txt

  • Create: agent/test/fixtures/system/zfs_version.txt

  • Create: agent/test/fixtures/system/apt_upgradable.txt

  • Create: agent/test/proxmox_agent/collectors/system_info_test.exs

  • Create: agent/lib/proxmox_agent/collectors/system_info.ex

  • Step 1: Fixtures

Create agent/test/fixtures/system/pveversion.txt:

pve-manager/8.3.1/abc123 (running kernel: 6.8.12-1-pve)

Create agent/test/fixtures/system/zfs_version.txt:

zfs-2.3.0-pve2
zfs-kmod-2.3.0-pve2

Create agent/test/fixtures/system/apt_upgradable.txt:

Listing...
libssl3/stable 3.0.11-1~deb12u2 amd64 [upgradable from: 3.0.11-1~deb12u1]
openssh-server/stable 1:9.2p1-2+deb12u3 amd64 [upgradable from: 1:9.2p1-2+deb12u2]
  • Step 2: Write failing test

Create agent/test/proxmox_agent/collectors/system_info_test.exs:

defmodule ProxmoxAgent.Collectors.SystemInfoTest do
  use ExUnit.Case, async: true

  alias ProxmoxAgent.Collectors.SystemInfo

  @fixtures Path.expand("../../fixtures/system", __DIR__)

  defp fake_runner do
    fn
      "pveversion", [] -> {:ok, File.read!(Path.join(@fixtures, "pveversion.txt"))}
      "zfs", ["--version"] -> {:ok, File.read!(Path.join(@fixtures, "zfs_version.txt"))}
      "apt", ["list", "--upgradable"] -> {:ok, File.read!(Path.join(@fixtures, "apt_upgradable.txt"))}
    end
  end

  test "collects pveversion, zfs version and pending upgrade count" do
    sample = SystemInfo.collect(runner: fake_runner())

    assert sample.pve_version =~ "pve-manager/8.3.1"
    assert sample.zfs_version =~ "2.3.0"
    assert sample.pending_updates == 2
    assert sample.errors == []
  end

  test "partial sample when one command fails" do
    partial = fn
      "pveversion", [] -> {:ok, "pve-manager/8.3.1/abc (running kernel: 6.8.12-1-pve)\n"}
      "zfs", ["--version"] -> {:error, {:enoent, "zfs"}}
      "apt", ["list", "--upgradable"] -> {:ok, "Listing...\n"}
    end

    sample = SystemInfo.collect(runner: partial)
    assert sample.pve_version =~ "8.3.1"
    assert sample.zfs_version == nil
    assert sample.pending_updates == 0
    assert length(sample.errors) == 1
  end
end
  • Step 3: Run test — expect failure
mix test test/proxmox_agent/collectors/system_info_test.exs 2>&1 | tail -5

Expected: module undefined.

  • Step 4: Implement

Create agent/lib/proxmox_agent/collectors/system_info.ex:

defmodule ProxmoxAgent.Collectors.SystemInfo do
  @moduledoc "Slow-path system metadata: pveversion, zfs version, apt upgradable count."

  @spec collect(keyword()) :: %{
          pve_version: String.t() | nil,
          zfs_version: String.t() | nil,
          pending_updates: non_neg_integer(),
          agent_version: String.t(),
          errors: [map()]
        }
  def collect(opts \\ []) do
    runner = Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)

    {pve, e1} = trim_output(runner.("pveversion", []), :pveversion)
    {zfs, e2} = trim_output(runner.("zfs", ["--version"]), :zfs_version)
    {apt, e3} = runner.("apt", ["list", "--upgradable"]) |> count_upgrades()

    %{
      pve_version: pve,
      zfs_version: zfs,
      pending_updates: apt,
      agent_version: ProxmoxAgent.version(),
      errors: Enum.filter([e1, e2, e3], & &1)
    }
  end

  defp trim_output({:ok, text}, _tag), do: {String.trim(text) |> first_line(), nil}

  defp trim_output({:error, reason}, tag),
    do: {nil, %{tag: Atom.to_string(tag), message: inspect(reason)}}

  defp first_line(str), do: str |> String.split("\n", parts: 2) |> hd()

  defp count_upgrades({:ok, text}) do
    count =
      text
      |> String.split("\n", trim: true)
      |> Enum.count(&String.contains?(&1, "[upgradable"))

    {count, nil}
  end

  defp count_upgrades({:error, reason}),
    do: {0, %{tag: "apt_upgradable", message: inspect(reason)}}
end
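
Isolated from the module, the upgrade count is just a substring filter; the `Listing...` header never contains `[upgradable`, so only real package lines are counted. A standalone sketch:

```elixir
# Count apt's upgradable lines; the "Listing..." header has no
# "[upgradable" substring, so only package lines match.
output = """
Listing...
libssl3/stable 3.0.11-1~deb12u2 amd64 [upgradable from: 3.0.11-1~deb12u1]
openssh-server/stable 1:9.2p1-2+deb12u3 amd64 [upgradable from: 1:9.2p1-2+deb12u2]
"""

count =
  output
  |> String.split("\n", trim: true)
  |> Enum.count(&String.contains?(&1, "[upgradable"))

IO.inspect(count)  # 2
```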
  • Step 5: Run test — expect pass
mix test test/proxmox_agent/collectors/system_info_test.exs 2>&1 | tail -5

Expected: 2 tests pass.

  • Step 6: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/collectors/system_info.ex agent/test/proxmox_agent/collectors/system_info_test.exs agent/test/fixtures/system
git commit -m "feat(agent): system info collector for pveversion/zfs/apt"

Task 10: Agent — Reporter Schedules Medium + Slow

Files:

  • Modify: agent/lib/proxmox_agent/reporter.ex

  • Step 1: Update Reporter to schedule all three intervals and bundle multi-collector payloads

Replace the existing handle_info(:collect_fast, socket) clause and add medium/slow. Full new module body:

defmodule ProxmoxAgent.Reporter do
  @moduledoc """
  Maintains a persistent Phoenix Channel connection to the server, joins
  `host:<host_id>`, and pushes metric samples on fast/medium/slow intervals.

  Payload contract:
    metric:fast   => %{host, zfs_pools, storage, vms_runtime}
    metric:medium => %{zfs_datasets, vms_detail}
    metric:slow   => %{system_info}
  """

  use Slipstream, restart: :permanent
  require Logger

  alias ProxmoxAgent.Collectors.{Host, Storage, SystemInfo, Vms, Zfs}

  def start_link(%ProxmoxAgent.Config{} = cfg) do
    Slipstream.start_link(__MODULE__, cfg, name: __MODULE__)
  end

  @impl Slipstream
  def init(cfg) do
    socket =
      new_socket()
      |> assign(:cfg, cfg)
      |> assign(:topic, "host:" <> cfg.host_id)
      |> connect!(uri: cfg.server_url)

    {:ok, socket}
  end

  @impl Slipstream
  def handle_connect(socket) do
    topic = socket.assigns.topic
    cfg = socket.assigns.cfg

    payload = %{"token" => cfg.token, "agent_version" => ProxmoxAgent.version()}
    Logger.info("reporter: connected, joining #{topic}")
    {:ok, join(socket, topic, payload)}
  end

  @impl Slipstream
  def handle_join(topic, _reply, socket) do
    Logger.info("reporter: joined #{topic}")
    send(self(), :collect_fast)
    send(self(), :collect_medium)
    send(self(), :collect_slow)
    {:ok, socket}
  end

  @impl Slipstream
  def handle_info(:collect_fast, socket) do
    cfg = socket.assigns.cfg

    data = %{
      host: Host.collect(),
      zfs_pools: Zfs.collect_pools(),
      storage: Storage.collect(node: cfg.host_id),
      vms_runtime: Vms.collect_runtime(node: cfg.host_id)
    }

    push_metric(socket, "metric:fast", data)
    Process.send_after(self(), :collect_fast, cfg.fast_seconds * 1000)
    {:noreply, socket}
  end

  def handle_info(:collect_medium, socket) do
    cfg = socket.assigns.cfg

    data = %{
      zfs_datasets: Zfs.collect_datasets(),
      vms_detail: Vms.collect_detail(node: cfg.host_id)
    }

    push_metric(socket, "metric:medium", data)
    Process.send_after(self(), :collect_medium, cfg.medium_seconds * 1000)
    {:noreply, socket}
  end

  def handle_info(:collect_slow, socket) do
    cfg = socket.assigns.cfg

    data = %{system_info: SystemInfo.collect()}

    push_metric(socket, "metric:slow", data)
    Process.send_after(self(), :collect_slow, cfg.slow_seconds * 1000)
    {:noreply, socket}
  end

  @impl Slipstream
  def handle_disconnect(reason, socket) do
    Logger.warning("reporter: disconnected — #{inspect(reason)}; reconnecting")
    reconnect(socket)
  end

  @impl Slipstream
  def handle_topic_close(topic, reason, socket) do
    Logger.warning("reporter: topic #{topic} closed: #{inspect(reason)}; rejoining")
    rejoin(socket, topic)
  end

  defp push_metric(socket, event, data) do
    payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: data}

    case push(socket, socket.assigns.topic, event, payload) do
      {:ok, _ref} ->
        :ok

      {:error, reason} ->
        Logger.warning("reporter: push #{event} failed: #{inspect(reason)}")
        :ok
    end
  end
end
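
On the wire, each push therefore wraps the bundled collector output in a `collected_at` envelope. A hypothetical trimmed metric:fast payload (the host shape comes from the Host collector defined earlier and is elided; lists shown empty for brevity):

```json
{
  "collected_at": "2026-04-21T20:00:00.000000Z",
  "data": {
    "host": {},
    "zfs_pools": { "pools": [], "errors": [] },
    "storage": { "storages": [], "errors": [] },
    "vms_runtime": { "vms": [], "errors": [] }
  }
}
```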
  • Step 2: Compile and run all agent tests
cd /Users/cabele/claudeprojects/proxmox_monitor/agent
mix compile --warnings-as-errors 2>&1 | tail -5
mix test 2>&1 | tail -5

Expected: compile clean, all tests pass (existing + new collector tests).

  • Step 3: Commit
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/reporter.ex
git commit -m "feat(agent): reporter schedules fast/medium/slow collection with bundled payloads"

Task 11: End-to-End Smoke Test

Goal: Agent runs against the local server; all three intervals produce rows in the metrics table; GET /api/hosts/pve-dev-01 returns the latest samples.

  • Step 1: Start server
cd /Users/cabele/claudeprojects/proxmox_monitor/server
mix ecto.migrate
mix phx.server

Run this in a separate terminal (or in the background). Wait for the Running ServerWeb.Endpoint log line.

  • Step 2: Re-use or create a host token

The Phase 1 smoke test already registered pve-dev-01. If that host still exists, reuse its token. If not:

mix run -e 'Server.Release.register_host("pve-dev-01")'

Copy the printed TOKEN.

  • Step 3: Write agent config
cat > /tmp/agent-local.toml <<EOF
server_url = "ws://localhost:4000/socket/websocket"
token = "<TOKEN>"
host_id = "pve-dev-01"

[intervals]
fast_seconds = 5
medium_seconds = 10
slow_seconds = 20
EOF
  • Step 4: Start agent
cd /Users/cabele/claudeprojects/proxmox_monitor/agent
AGENT_CONFIG=/tmp/agent-local.toml mix run --no-halt
  • Step 5: Wait 25 seconds then check the API
sleep 25
curl -s http://localhost:4000/api/hosts/pve-dev-01 | python3 -m json.tool

Expected: JSON containing name, status: "online", and a samples object with keys fast, medium, slow, each with collected_at and payload. On macOS the payload will contain errors for /proc, zpool, zfs, pvesh, pveversion, apt — that's expected: the agent is architecturally sound; it just can't read Proxmox data on a Mac.
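
A response might look roughly like this (shape inferred from the plan's payload contract; field values and exact envelope are illustrative, not a spec):

```json
{
  "name": "pve-dev-01",
  "status": "online",
  "samples": {
    "fast": {
      "collected_at": "2026-04-21T20:00:05Z",
      "payload": { "host": {}, "zfs_pools": {}, "storage": {}, "vms_runtime": {} }
    },
    "medium": {
      "collected_at": "2026-04-21T20:00:10Z",
      "payload": { "zfs_datasets": {}, "vms_detail": {} }
    },
    "slow": {
      "collected_at": "2026-04-21T20:00:20Z",
      "payload": { "system_info": {} }
    }
  }
}
```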

  • Step 6: Verify row counts grow
cd /Users/cabele/claudeprojects/proxmox_monitor/server
mix run -e 'IO.inspect(Server.Repo.aggregate(Server.Schema.Metric, :count, :id))'

Expected: a number >= 5 after ~25 seconds of runtime (fast every 5s = ~5 rows, medium every 10s = ~2-3 rows, slow every 20s = ~1-2 rows, so roughly 8-10 rows total).

  • Step 7: Stop agent, clean up
# Stop agent with Ctrl+C, then "a" at the BEAM break menu (or pkill for automation)
rm /tmp/agent-local.toml

No code changes; no commit.


Phase 2 Exit Criteria

  • cd server && mix test — all green.
  • cd agent && mix test — all green.
  • Smoke test: agent pushes three intervals; DB grows; API returns structured samples.
  • Retention GenServer running in server supervision tree.
  • All commits on main.

Next up (Phase 3): LiveView dashboard. See roadmap in proxmox-monitor-konzept.md.