proxMon/docs/superpowers/plans/2026-04-21-phase2-metrics-und-collectors.md

# Phase 2 — Metrics Persistence + ZFS/VM/Storage Collectors
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Persist agent samples in SQLite with 48h retention, expose recent samples via a simple JSON route, and ship agent collectors for ZFS pools/datasets, Proxmox storage, VM/LXC runtime, and system info. End state: agent pushes rich samples on fast/medium/slow intervals; server stores them; `GET /api/hosts/:name` returns the latest data.
**Architecture:** Channel handlers delegate to a new `Server.Metrics` context that writes to a single `metrics` table with a JSON `payload` column. A `Retention` GenServer prunes old rows hourly. Agent gains a tiny command-runner abstraction so each external-command collector (`zfs`, `pvesh`, `apt`, `pveversion`) can be unit-tested with fixture output on macOS. Reporter schedules medium and slow collection alongside the existing fast path.
**Tech Stack:** Elixir/OTP, Phoenix, Ecto SQLite with JSON column support (`%{...}` stored as TEXT), ExUnit, fixture-based testing for external commands.
---
## File Structure
```
server/
├── priv/repo/migrations/<ts>_create_metrics.exs create
├── lib/server/schema/metric.ex create
├── lib/server/metrics.ex create (context)
├── lib/server/retention.ex create (GenServer)
├── lib/server/application.ex modify: add Retention to children
├── lib/server_web/channels/host_channel.ex modify: call Metrics.record_sample
├── lib/server_web/controllers/host_controller.ex create
├── lib/server_web/router.ex modify: add /api/hosts/:name route
├── test/server/metrics_test.exs create
├── test/server/retention_test.exs create
├── test/server_web/channels/host_channel_test.exs modify: assert DB write
└── test/server_web/controllers/host_controller_test.exs create
agent/
├── lib/proxmox_agent/shell.ex create (System.cmd wrapper)
├── lib/proxmox_agent/collectors/zfs.ex create
├── lib/proxmox_agent/collectors/storage.ex create
├── lib/proxmox_agent/collectors/vms.ex create
├── lib/proxmox_agent/collectors/system_info.ex create
├── lib/proxmox_agent/reporter.ex modify: medium + slow handlers
├── test/proxmox_agent/shell_test.exs create
├── test/proxmox_agent/collectors/zfs_test.exs create
├── test/proxmox_agent/collectors/storage_test.exs create
├── test/proxmox_agent/collectors/vms_test.exs create
├── test/proxmox_agent/collectors/system_info_test.exs create
└── test/fixtures/ create
    ├── zfs/zpool_list.json
    ├── zfs/zpool_status.json
    ├── zfs/zfs_list.json
    ├── pvesh/storage.json
    ├── pvesh/qemu.json
    ├── pvesh/lxc.json
    ├── pvesh/qemu_N_status.json
    ├── pvesh/qemu_N_config.json
    ├── pvesh/qemu_N_agent_interfaces.json
    ├── system/pveversion.txt
    ├── system/zfs_version.txt
    └── system/apt_upgradable.txt
```
**Shell abstraction rationale:** Each external-command collector takes a `:runner` keyword option — a `(cmd, args) -> {:ok, output} | {:error, term}` function. Default is `ProxmoxAgent.Shell.run/2` which wraps `System.cmd`. Tests inject a fake that returns fixture content. Same pattern as `proc_dir:` in the existing host collector — consistent, no new abstractions.
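A minimal sketch of that injection pattern (module, command, and output here are illustrative, not part of the plan's actual collectors):

```elixir
# Illustrative sketch of the :runner injection pattern — not one of the plan's collectors.
defmodule DemoCollector do
  # runner :: (cmd, args) -> {:ok, output} | {:error, term}
  def collect(opts \\ []) do
    runner = Keyword.get(opts, :runner, &default_runner/2)

    case runner.("uptime", []) do
      {:ok, output} -> %{uptime: String.trim(output), errors: []}
      {:error, reason} -> %{uptime: nil, errors: [inspect(reason)]}
    end
  end

  defp default_runner(cmd, args) do
    case System.cmd(cmd, args, stderr_to_stdout: true) do
      {out, 0} -> {:ok, out}
      {out, code} -> {:error, {:nonzero_exit, code, out}}
    end
  end
end

# Tests never shell out — they inject a fake:
fake = fn "uptime", [] -> {:ok, "up 3 days\n"} end
%{uptime: "up 3 days", errors: []} = DemoCollector.collect(runner: fake)
```

Production callers omit `:runner` and get the real `System.cmd` path; tests pass a function clause per expected command and fail loudly on anything unexpected.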
**Sample payload contract (what the agent sends):**
| Event | Data keys |
|---------------|---------------------------------------------------------------|
| `metric:fast` | `host`, `zfs_pools`, `storage`, `vms_runtime` |
| `metric:medium` | `zfs_datasets`, `vms_detail` |
| `metric:slow` | `system_info` |
Each key's value is a collector-produced map. Every collector map carries its own `errors` list, so one failed command degrades that collector's data without dropping the whole sample.
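For concreteness, a `metric:fast` push body might look like this (all values are illustrative; the real keys are whatever the collectors produce):

```elixir
# Illustrative metric:fast push body — values made up, shape per the contract above.
payload = %{
  "collected_at" => "2026-04-21T12:00:00Z",
  "data" => %{
    "host" => %{"cpu_percent" => 12.3, "load1" => 0.42},
    "zfs_pools" => %{
      "pools" => [%{"name" => "rpool", "health" => "ONLINE", "capacity_percent" => 40}],
      "errors" => []
    },
    "storage" => %{"storages" => [], "errors" => [%{"tag" => "enoent", "message" => "pvesh"}]},
    "vms_runtime" => %{"vms" => [], "errors" => []}
  }
}
```

Note the `storage` entry: its command failed, so it ships empty data plus an error, while the other collectors still deliver.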
---
## Task 1: Server — Metrics Schema & Context
**Files:**
- Create: `server/priv/repo/migrations/<ts>_create_metrics.exs`
- Create: `server/lib/server/schema/metric.ex`
- Create: `server/lib/server/metrics.ex`
- Create: `server/test/server/metrics_test.exs`
- [ ] **Step 1: Generate migration**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor/server
mix ecto.gen.migration create_metrics
```
Replace the contents of the newly-created migration file with:
```elixir
defmodule Server.Repo.Migrations.CreateMetrics do
  use Ecto.Migration

  def change do
    create table(:metrics) do
      add :host_id, references(:hosts, on_delete: :delete_all), null: false
      add :collected_at, :utc_datetime_usec, null: false
      add :interval_type, :string, null: false
      add :payload, :string, null: false
      timestamps(type: :utc_datetime_usec, updated_at: false)
    end

    create index(:metrics, [:host_id, :collected_at])
    create index(:metrics, [:collected_at])
  end
end
```
Note: SQLite has no native JSON column type, so the migration stores the payload as TEXT (`:string`). JSON encoding/decoding happens at the schema layer via Ecto's `:map` field type, which serializes through the configured JSON library (Jason).
- [ ] **Step 2: Write the Metric schema**
Create `server/lib/server/schema/metric.ex`:
```elixir
defmodule Server.Schema.Metric do
  use Ecto.Schema
  import Ecto.Changeset

  @intervals ~w(fast medium slow)

  schema "metrics" do
    belongs_to :host, Server.Schema.Host
    field :collected_at, :utc_datetime_usec
    field :interval_type, :string
    field :payload, :map
    timestamps(type: :utc_datetime_usec, updated_at: false)
  end

  def changeset(metric, attrs) do
    metric
    |> cast(attrs, [:host_id, :collected_at, :interval_type, :payload])
    |> validate_required([:host_id, :collected_at, :interval_type, :payload])
    |> validate_inclusion(:interval_type, @intervals)
    |> assoc_constraint(:host)
  end
end
```
`ecto_sqlite3` serializes `:map` fields to JSON TEXT transparently — no extra config needed. Because payloads round-trip through JSON, atom keys written on insert come back as string keys on read, which is why the tests use string-keyed maps throughout.
- [ ] **Step 3: Write failing tests for the Metrics context**
Create `server/test/server/metrics_test.exs`:
```elixir
defmodule Server.MetricsTest do
  use Server.DataCase, async: true

  alias Server.Metrics
  alias Server.Hosts

  setup do
    {:ok, {host, _token}} = Hosts.create_host("pve-01")
    %{host: host}
  end

  describe "record_sample/4" do
    test "inserts a metric row with the given payload", %{host: host} do
      ts = DateTime.utc_now()
      payload = %{"host" => %{"load1" => 0.5}}
      assert {:ok, metric} = Metrics.record_sample(host.id, "fast", ts, payload)
      assert metric.host_id == host.id
      assert metric.interval_type == "fast"
      assert metric.payload == payload
      assert metric.collected_at == ts
    end

    test "rejects unknown interval_type", %{host: host} do
      ts = DateTime.utc_now()
      assert {:error, cs} = Metrics.record_sample(host.id, "nope", ts, %{})
      assert %{interval_type: ["is invalid"]} = errors_on(cs)
    end

    test "rejects unknown host_id" do
      ts = DateTime.utc_now()
      assert {:error, cs} = Metrics.record_sample(999_999, "fast", ts, %{})
      assert %{host: ["does not exist"]} = errors_on(cs)
    end
  end

  describe "latest_sample/2" do
    test "returns most recent sample for a host and interval", %{host: host} do
      {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-60), %{"v" => 1})
      {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-30), %{"v" => 2})
      {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-10), %{"v" => 3})
      assert %{payload: %{"v" => 3}} = Metrics.latest_sample(host.id, "fast")
    end

    test "returns nil when no samples exist", %{host: host} do
      assert Metrics.latest_sample(host.id, "fast") == nil
    end
  end

  describe "delete_older_than/1" do
    test "deletes samples with collected_at before the cutoff", %{host: host} do
      {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-3600 * 50), %{"v" => "old"})
      {:ok, keep} = Metrics.record_sample(host.id, "fast", dt(-60), %{"v" => "fresh"})
      cutoff = DateTime.add(DateTime.utc_now(), -48 * 3600, :second)
      assert {1, nil} = Metrics.delete_older_than(cutoff)
      remaining = Server.Repo.all(Server.Schema.Metric)
      assert length(remaining) == 1
      assert hd(remaining).id == keep.id
    end
  end

  defp dt(offset_seconds), do: DateTime.add(DateTime.utc_now(), offset_seconds, :second)
end
```
- [ ] **Step 4: Run tests — expect compile failure (`Server.Metrics` not defined)**
```bash
mix test test/server/metrics_test.exs 2>&1 | tail -10
```
Expected: module undefined errors.
- [ ] **Step 5: Implement the context**
Create `server/lib/server/metrics.ex`:
```elixir
defmodule Server.Metrics do
  @moduledoc "Metric sample storage and retrieval."

  import Ecto.Query

  alias Server.Repo
  alias Server.Schema.Metric

  @spec record_sample(integer(), String.t(), DateTime.t(), map()) ::
          {:ok, Metric.t()} | {:error, Ecto.Changeset.t()}
  def record_sample(host_id, interval_type, collected_at, payload) do
    %Metric{}
    |> Metric.changeset(%{
      host_id: host_id,
      interval_type: interval_type,
      collected_at: collected_at,
      payload: payload
    })
    |> Repo.insert()
  end

  @spec latest_sample(integer(), String.t()) :: Metric.t() | nil
  def latest_sample(host_id, interval_type) do
    from(m in Metric,
      where: m.host_id == ^host_id and m.interval_type == ^interval_type,
      order_by: [desc: m.collected_at],
      limit: 1
    )
    |> Repo.one()
  end

  @spec delete_older_than(DateTime.t()) :: {non_neg_integer(), nil}
  def delete_older_than(%DateTime{} = cutoff) do
    from(m in Metric, where: m.collected_at < ^cutoff)
    |> Repo.delete_all()
  end
end
```
- [ ] **Step 6: Run the migration and tests**
```bash
mix ecto.migrate && mix test test/server/metrics_test.exs 2>&1 | tail -6
```
Expected: 6 tests pass.
- [ ] **Step 7: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add server/priv/repo/migrations server/lib/server/schema/metric.ex server/lib/server/metrics.ex server/test/server/metrics_test.exs
git commit -m "feat(server): metrics schema + context with record/latest/prune"
```
---
## Task 2: Server — Channel Writes Samples
**Files:**
- Modify: `server/lib/server_web/channels/host_channel.ex`
- Modify: `server/test/server_web/channels/host_channel_test.exs`
- [ ] **Step 1: Extend existing tests to assert DB writes**
Open `server/test/server_web/channels/host_channel_test.exs` and replace the `describe "metric:fast event" do ... end` block with:
```elixir
describe "metric events persist to DB" do
  setup %{token: token, host: host} do
    {:ok, socket} = connect(AgentSocket, %{})

    {:ok, _reply, joined} =
      subscribe_and_join(socket, "host:pve-01", %{
        "token" => token,
        "agent_version" => "0.1.0"
      })

    %{socket: joined, host: host}
  end

  test "metric:fast is stored with interval=fast", %{socket: socket, host: host} do
    ts = "2026-04-21T12:00:00.123456Z"

    ref =
      push(socket, "metric:fast", %{
        "collected_at" => ts,
        "data" => %{"cpu_percent" => 12.3, "load1" => 0.2}
      })

    assert_reply ref, :ok
    sample = Server.Metrics.latest_sample(host.id, "fast")
    assert sample != nil
    assert sample.payload == %{"cpu_percent" => 12.3, "load1" => 0.2}
    {:ok, expected, _} = DateTime.from_iso8601(ts)
    assert DateTime.compare(sample.collected_at, expected) == :eq
  end

  test "metric:medium is stored with interval=medium", %{socket: socket, host: host} do
    ref =
      push(socket, "metric:medium", %{
        "collected_at" => "2026-04-21T12:05:00Z",
        "data" => %{"vms_detail" => []}
      })

    assert_reply ref, :ok
    sample = Server.Metrics.latest_sample(host.id, "medium")
    assert sample != nil
    assert sample.payload == %{"vms_detail" => []}
  end

  test "metric:slow is stored with interval=slow", %{socket: socket, host: host} do
    ref =
      push(socket, "metric:slow", %{
        "collected_at" => "2026-04-21T12:30:00Z",
        "data" => %{"system_info" => %{"pveversion" => "8.3.0"}}
      })

    assert_reply ref, :ok
    sample = Server.Metrics.latest_sample(host.id, "slow")
    assert sample != nil
    assert sample.payload == %{"system_info" => %{"pveversion" => "8.3.0"}}
  end

  test "replies :error when collected_at is missing", %{socket: socket} do
    ref = push(socket, "metric:fast", %{"data" => %{}})
    assert_reply ref, :error, %{reason: "missing_collected_at"}
  end

  test "replies :error when data is missing", %{socket: socket} do
    ref = push(socket, "metric:fast", %{"collected_at" => "2026-04-21T12:00:00Z"})
    assert_reply ref, :error, %{reason: "missing_data"}
  end
end
```
- [ ] **Step 2: Run tests — expect failure**
```bash
mix test test/server_web/channels/host_channel_test.exs 2>&1 | tail -10
```
Expected: tests in the new `describe` block fail because the channel doesn't persist yet.
- [ ] **Step 3: Update HostChannel to persist**
Replace the three `handle_in("metric:..."/2, ...)` clauses in `server/lib/server_web/channels/host_channel.ex` with:
```elixir
@impl true
def handle_in("metric:" <> kind, payload, socket) when kind in ~w(fast medium slow) do
  with {:ok, collected_at} <- parse_collected_at(payload),
       {:ok, data} <- parse_data(payload),
       {:ok, _} <- Server.Metrics.record_sample(socket.assigns.host_id, kind, collected_at, data) do
    Logger.debug("stored #{kind} sample host=#{socket.assigns.host_name}")
    {:reply, :ok, socket}
  else
    {:error, reason} when is_binary(reason) ->
      Logger.warning("metric:#{kind} rejected host=#{socket.assigns.host_name} reason=#{reason}")
      {:reply, {:error, %{reason: reason}}, socket}

    {:error, %Ecto.Changeset{} = cs} ->
      Logger.warning("metric:#{kind} changeset failed: #{inspect(cs.errors)}")
      {:reply, {:error, %{reason: "invalid_payload"}}, socket}
  end
end

defp parse_collected_at(%{"collected_at" => ts}) when is_binary(ts) do
  case DateTime.from_iso8601(ts) do
    {:ok, dt, _} -> {:ok, dt}
    _ -> {:error, "invalid_collected_at"}
  end
end

defp parse_collected_at(_), do: {:error, "missing_collected_at"}

defp parse_data(%{"data" => data}) when is_map(data), do: {:ok, data}
defp parse_data(_), do: {:error, "missing_data"}
```
- [ ] **Step 4: Run tests — expect pass**
```bash
mix test test/server_web/channels/host_channel_test.exs 2>&1 | tail -5
```
Expected: all channel tests pass.
- [ ] **Step 5: Run full server suite**
```bash
mix test 2>&1 | tail -4
```
Expected: all green.
- [ ] **Step 6: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add server/lib/server_web/channels/host_channel.ex server/test/server_web/channels/host_channel_test.exs
git commit -m "feat(server): channel persists fast/medium/slow samples to metrics table"
```
---
## Task 3: Server — Retention GenServer
**Files:**
- Create: `server/lib/server/retention.ex`
- Create: `server/test/server/retention_test.exs`
- Modify: `server/lib/server/application.ex`
- [ ] **Step 1: Write failing test**
Create `server/test/server/retention_test.exs`:
```elixir
defmodule Server.RetentionTest do
  use Server.DataCase, async: false

  alias Server.{Hosts, Metrics, Retention}

  test "prune_now/1 deletes samples older than the retention window" do
    {:ok, {host, _}} = Hosts.create_host("pve-01")
    stale_at = DateTime.add(DateTime.utc_now(), -49 * 3600, :second)
    fresh_at = DateTime.add(DateTime.utc_now(), -60, :second)
    {:ok, _} = Metrics.record_sample(host.id, "fast", stale_at, %{"x" => 1})
    {:ok, fresh} = Metrics.record_sample(host.id, "fast", fresh_at, %{"x" => 2})

    {deleted, _} = Retention.prune_now(48 * 3600)

    assert deleted == 1
    remaining = Server.Repo.all(Server.Schema.Metric)
    assert length(remaining) == 1
    assert hd(remaining).id == fresh.id
  end
end
```
- [ ] **Step 2: Run test — expect failure**
```bash
mix test test/server/retention_test.exs 2>&1 | tail -5
```
Expected: `Server.Retention` undefined.
- [ ] **Step 3: Implement the GenServer**
Create `server/lib/server/retention.ex`:
```elixir
defmodule Server.Retention do
  @moduledoc "Deletes metric samples older than the retention window. Runs hourly."
  use GenServer
  require Logger

  @default_retention_seconds 48 * 60 * 60
  @default_interval_ms 60 * 60 * 1_000

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Synchronous prune used by tests and manual ops."
  def prune_now(retention_seconds \\ @default_retention_seconds) do
    cutoff = DateTime.add(DateTime.utc_now(), -retention_seconds, :second)
    Server.Metrics.delete_older_than(cutoff)
  end

  @impl true
  def init(opts) do
    retention_seconds = Keyword.get(opts, :retention_seconds, @default_retention_seconds)
    interval_ms = Keyword.get(opts, :interval_ms, @default_interval_ms)
    state = %{retention_seconds: retention_seconds, interval_ms: interval_ms}
    Process.send_after(self(), :prune, interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:prune, state) do
    {count, _} = prune_now(state.retention_seconds)
    if count > 0, do: Logger.info("retention: pruned #{count} stale samples")
    Process.send_after(self(), :prune, state.interval_ms)
    {:noreply, state}
  end
end
```
- [ ] **Step 4: Run test — expect pass**
```bash
mix test test/server/retention_test.exs 2>&1 | tail -5
```
Expected: 1 test passes.
- [ ] **Step 5: Add Retention to the supervision tree**
In `server/lib/server/application.ex`, find the children list and add `Server.Retention` **after** `{Phoenix.PubSub, name: Server.PubSub}`:
```elixir
{Phoenix.PubSub, name: Server.PubSub},
Server.Retention,
# Start a worker by calling: Server.Worker.start_link(arg)
```
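If the defaults ever need tuning, `start_link/1` already accepts options, so the child spec can pass them (values here are illustrative, not recommendations):

```elixir
# Optional: override the retention window / prune cadence in the child spec.
{Server.Retention, retention_seconds: 24 * 3600, interval_ms: 15 * 60 * 1_000},
```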
- [ ] **Step 6: Run full test suite**
```bash
mix test 2>&1 | tail -4
```
Expected: all green.
- [ ] **Step 7: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add server/lib/server/retention.ex server/test/server/retention_test.exs server/lib/server/application.ex
git commit -m "feat(server): retention GenServer prunes samples older than 48h hourly"
```
---
## Task 4: Server — Simple JSON Route
**Files:**
- Create: `server/lib/server_web/controllers/host_controller.ex`
- Create: `server/test/server_web/controllers/host_controller_test.exs`
- Modify: `server/lib/server_web/router.ex`
- [ ] **Step 1: Write failing test**
Create `server/test/server_web/controllers/host_controller_test.exs`:
```elixir
defmodule ServerWeb.HostControllerTest do
  use ServerWeb.ConnCase, async: true

  alias Server.{Hosts, Metrics}

  describe "GET /api/hosts/:name" do
    setup do
      {:ok, {host, _token}} = Hosts.create_host("pve-01")

      {:ok, _} =
        Metrics.record_sample(
          host.id,
          "fast",
          DateTime.utc_now(),
          %{"host" => %{"load1" => 0.5}}
        )

      %{host: host}
    end

    test "returns host info and latest samples", %{conn: conn, host: host} do
      conn = get(conn, ~p"/api/hosts/#{host.name}")

      assert %{
               "name" => "pve-01",
               "status" => _,
               "samples" => %{"fast" => %{"payload" => %{"host" => %{"load1" => 0.5}}}}
             } = json_response(conn, 200)
    end

    test "returns 404 for unknown host", %{conn: conn} do
      conn = get(conn, ~p"/api/hosts/nope")
      assert json_response(conn, 404) == %{"error" => "host_not_found"}
    end
  end
end
```
- [ ] **Step 2: Add route**
In `server/lib/server_web/router.ex`, add after the existing `scope "/"` block:
```elixir
scope "/api", ServerWeb do
  pipe_through :api

  get "/hosts/:name", HostController, :show
end
```
Verify that an `:api` pipeline already exists above; the Phoenix 1.7 generator includes one by default. If not, add:
```elixir
pipeline :api do
  plug :accepts, ["json"]
end
```
(Check the file first; add only if missing.)
- [ ] **Step 3: Run test — expect failure (controller missing)**
```bash
mix test test/server_web/controllers/host_controller_test.exs 2>&1 | tail -8
```
Expected: undefined `ServerWeb.HostController`.
- [ ] **Step 4: Implement controller**
Create `server/lib/server_web/controllers/host_controller.ex`:
```elixir
defmodule ServerWeb.HostController do
  use ServerWeb, :controller

  alias Server.{Metrics, Repo, Schema.Host}

  def show(conn, %{"name" => name}) do
    case Repo.get_by(Host, name: name) do
      nil ->
        conn
        |> put_status(:not_found)
        |> json(%{error: "host_not_found"})

      %Host{} = host ->
        samples =
          for interval <- ~w(fast medium slow),
              sample = Metrics.latest_sample(host.id, interval),
              into: %{} do
            {interval, %{collected_at: sample.collected_at, payload: sample.payload}}
          end

        json(conn, %{
          name: host.name,
          status: host.status,
          agent_version: host.agent_version,
          last_seen_at: host.last_seen_at,
          samples: samples
        })
    end
  end
end
```
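One subtlety in the comprehension above: a bare `sample = ...` clause acts as a truthiness filter, so intervals with no stored sample yet (`latest_sample/2` returning `nil`) are simply dropped from the map instead of crashing. The mechanism in isolation:

```elixir
# A bare `x = expr` clause in a comprehension filters on truthiness: nil drops the element.
lookup = %{"fast" => 1, "slow" => 3}

samples =
  for interval <- ["fast", "medium", "slow"], v = lookup[interval], into: %{} do
    {interval, v}
  end

# "medium" has no entry, so it is filtered out rather than producing {"medium", nil}.
%{"fast" => 1, "slow" => 3} = samples
```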
- [ ] **Step 5: Run test — expect pass**
```bash
mix test test/server_web/controllers/host_controller_test.exs 2>&1 | tail -5
```
Expected: 2 tests pass.
- [ ] **Step 6: Run full test suite**
```bash
mix test 2>&1 | tail -4
```
Expected: all green.
- [ ] **Step 7: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add server/lib/server_web/controllers/host_controller.ex server/lib/server_web/router.ex server/test/server_web/controllers/host_controller_test.exs
git commit -m "feat(server): GET /api/hosts/:name returns latest fast/medium/slow samples"
```
---
## Task 5: Agent — Shell Runner
**Files:**
- Create: `agent/lib/proxmox_agent/shell.ex`
- Create: `agent/test/proxmox_agent/shell_test.exs`
- [ ] **Step 1: Write failing test**
Create `agent/test/proxmox_agent/shell_test.exs`:
```elixir
defmodule ProxmoxAgent.ShellTest do
  use ExUnit.Case, async: true

  alias ProxmoxAgent.Shell

  test "run/2 returns {:ok, output} on zero exit" do
    assert {:ok, output} = Shell.run("/bin/echo", ["hello"])
    assert String.trim(output) == "hello"
  end

  test "run/2 returns {:error, {:nonzero_exit, code, output}} on non-zero exit" do
    assert {:error, {:nonzero_exit, code, _}} = Shell.run("/bin/sh", ["-c", "exit 7"])
    assert code == 7
  end

  test "run/2 returns {:error, {:enoent, _}} when binary is missing" do
    assert {:error, {:enoent, _}} = Shell.run("/does/not/exist/nope", [])
  end
end
```
- [ ] **Step 2: Run test — expect failure**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor/agent
mix test test/proxmox_agent/shell_test.exs 2>&1 | tail -5
```
Expected: module undefined.
- [ ] **Step 3: Implement Shell**
Create `agent/lib/proxmox_agent/shell.ex`:
```elixir
defmodule ProxmoxAgent.Shell do
  @moduledoc """
  Thin wrapper over System.cmd for testability. Collectors accept an optional
  :runner function of this shape so tests can inject fixture-backed fakes.
  """

  @type result :: {:ok, String.t()} | {:error, term()}

  @spec run(String.t(), [String.t()]) :: result
  def run(command, args) do
    try do
      case System.cmd(command, args, stderr_to_stdout: true) do
        {output, 0} -> {:ok, output}
        {output, code} -> {:error, {:nonzero_exit, code, output}}
      end
    rescue
      e in ErlangError ->
        case e.original do
          :enoent -> {:error, {:enoent, command}}
          other -> {:error, {:system_error, other}}
        end
    end
  end
end
```
- [ ] **Step 4: Run test — expect pass**
```bash
mix test test/proxmox_agent/shell_test.exs 2>&1 | tail -5
```
Expected: 3 tests pass.
- [ ] **Step 5: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/shell.ex agent/test/proxmox_agent/shell_test.exs
git commit -m "feat(agent): Shell.run wrapper for testable external commands"
```
---
## Task 6: Agent — ZFS Collector
**Files:**
- Create: `agent/test/fixtures/zfs/zpool_list.json`
- Create: `agent/test/fixtures/zfs/zpool_status.json`
- Create: `agent/test/fixtures/zfs/zfs_list.json`
- Create: `agent/test/proxmox_agent/collectors/zfs_test.exs`
- Create: `agent/lib/proxmox_agent/collectors/zfs.ex`
- [ ] **Step 1: Write fixture JSON**
Create `agent/test/fixtures/zfs/zpool_list.json`:
```json
{
  "output_version": { "command": "zpool list", "vers_major": 0, "vers_minor": 1 },
  "pools": {
    "rpool": {
      "name": "rpool",
      "size": 500000000000,
      "alloc": 200000000000,
      "free": 300000000000,
      "frag": 17,
      "cap": 40,
      "health": "ONLINE"
    },
    "tank": {
      "name": "tank",
      "size": 8000000000000,
      "alloc": 6000000000000,
      "free": 2000000000000,
      "frag": 55,
      "cap": 75,
      "health": "DEGRADED"
    }
  }
}
```
Create `agent/test/fixtures/zfs/zpool_status.json`:
```json
{
  "output_version": { "command": "zpool status", "vers_major": 0, "vers_minor": 1 },
  "pools": {
    "rpool": {
      "name": "rpool",
      "state": "ONLINE",
      "scan": {
        "function": "scrub",
        "state": "FINISHED",
        "end_time": "Sat Apr 19 02:00:00 2026"
      },
      "error_count": "0",
      "vdevs": {
        "mirror-0": {
          "name": "mirror-0",
          "vdev_type": "mirror",
          "state": "ONLINE",
          "read_errors": "0",
          "write_errors": "0",
          "checksum_errors": "0"
        }
      }
    },
    "tank": {
      "name": "tank",
      "state": "DEGRADED",
      "scan": {
        "function": "scrub",
        "state": "FINISHED",
        "end_time": "Tue Mar 01 08:00:00 2026"
      },
      "error_count": "2",
      "vdevs": {
        "raidz2-0": {
          "name": "raidz2-0",
          "vdev_type": "raidz2",
          "state": "DEGRADED",
          "read_errors": "0",
          "write_errors": "0",
          "checksum_errors": "2"
        }
      }
    }
  }
}
```
Create `agent/test/fixtures/zfs/zfs_list.json`:
```json
{
  "output_version": { "command": "zfs list", "vers_major": 0, "vers_minor": 1 },
  "datasets": {
    "rpool": {
      "name": "rpool",
      "type": "FILESYSTEM",
      "properties": {
        "used": { "value": "200000000000" },
        "available": { "value": "300000000000" },
        "usedbysnapshots": { "value": "5000000000" }
      }
    },
    "rpool/data": {
      "name": "rpool/data",
      "type": "FILESYSTEM",
      "properties": {
        "used": { "value": "100000000000" },
        "available": { "value": "300000000000" },
        "usedbysnapshots": { "value": "2000000000" }
      }
    },
    "rpool/data@daily-2026-04-20": {
      "name": "rpool/data@daily-2026-04-20",
      "type": "SNAPSHOT",
      "properties": {
        "creation": { "value": "1745107200" }
      }
    },
    "rpool/data@daily-2026-04-21": {
      "name": "rpool/data@daily-2026-04-21",
      "type": "SNAPSHOT",
      "properties": {
        "creation": { "value": "1745193600" }
      }
    }
  }
}
```
The Unix timestamps `1745107200` and `1745193600` decode to 2025-04-20 and 2025-04-21 00:00 UTC — roughly a year old, plausible snapshot ages for tests. (The snapshot names say `2026`, but the tests assert only the raw integer values, so the mismatch is harmless.)
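Easy to double-check with the stdlib, no dependencies needed:

```elixir
# Decoding the fixture snapshot creation times — both fall in April 2025.
~U[2025-04-20 00:00:00Z] = DateTime.from_unix!(1_745_107_200)
~U[2025-04-21 00:00:00Z] = DateTime.from_unix!(1_745_193_600)
```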
- [ ] **Step 2: Write failing tests**
Create `agent/test/proxmox_agent/collectors/zfs_test.exs`:
```elixir
defmodule ProxmoxAgent.Collectors.ZfsTest do
  use ExUnit.Case, async: true

  alias ProxmoxAgent.Collectors.Zfs

  @fixtures Path.expand("../../fixtures/zfs", __DIR__)

  defp fake_runner do
    fn
      "zpool", ["list", "-j", "--json-int"] ->
        {:ok, File.read!(Path.join(@fixtures, "zpool_list.json"))}

      "zpool", ["status", "-j", "--json-flat-vdevs", "--json-int"] ->
        {:ok, File.read!(Path.join(@fixtures, "zpool_status.json"))}

      "zfs", ["list", "-j", "--json-int", "-t", "all"] ->
        {:ok, File.read!(Path.join(@fixtures, "zfs_list.json"))}
    end
  end

  describe "collect_pools/1" do
    test "returns a summary per pool" do
      sample = Zfs.collect_pools(runner: fake_runner())

      assert is_list(sample.pools)
      assert length(sample.pools) == 2

      rpool = Enum.find(sample.pools, &(&1.name == "rpool"))
      tank = Enum.find(sample.pools, &(&1.name == "tank"))

      assert rpool.health == "ONLINE"
      assert rpool.capacity_percent == 40
      assert rpool.fragmentation_percent == 17
      assert rpool.size_bytes == 500_000_000_000
      assert rpool.error_count == 0
      assert rpool.degraded_vdev_count == 0

      assert tank.health == "DEGRADED"
      assert tank.error_count == 2
      assert tank.degraded_vdev_count == 1
    end

    test "populates errors list when zpool fails" do
      failing = fn _, _ -> {:error, {:enoent, "zpool"}} end
      sample = Zfs.collect_pools(runner: failing)
      assert sample.pools == []
      assert length(sample.errors) >= 1
    end
  end

  describe "collect_datasets/1" do
    test "returns datasets and per-dataset snapshot summary" do
      sample = Zfs.collect_datasets(runner: fake_runner())

      assert length(sample.datasets) == 2

      rpool_data = Enum.find(sample.datasets, &(&1.name == "rpool/data"))
      assert rpool_data.used_bytes == 100_000_000_000
      assert rpool_data.usedbysnapshots_bytes == 2_000_000_000
      assert rpool_data.snapshot_count == 2
      assert rpool_data.newest_snapshot_unix == 1_745_193_600
      assert rpool_data.oldest_snapshot_unix == 1_745_107_200
    end
  end
end
```
- [ ] **Step 3: Run tests — expect failure**
```bash
mix test test/proxmox_agent/collectors/zfs_test.exs 2>&1 | tail -10
```
Expected: `ProxmoxAgent.Collectors.Zfs` undefined.
- [ ] **Step 4: Implement Zfs collector**
Create `agent/lib/proxmox_agent/collectors/zfs.ex`:
```elixir
defmodule ProxmoxAgent.Collectors.Zfs do
  @moduledoc """
  Collects ZFS pool health (fast path) and dataset/snapshot info (medium path).
  Delegates shelling out to an injectable runner so tests can supply fixtures.
  """

  @type pool_summary :: %{
          name: String.t(),
          health: String.t(),
          size_bytes: non_neg_integer(),
          allocated_bytes: non_neg_integer(),
          free_bytes: non_neg_integer(),
          fragmentation_percent: non_neg_integer(),
          capacity_percent: non_neg_integer(),
          error_count: non_neg_integer(),
          vdev_count: non_neg_integer(),
          degraded_vdev_count: non_neg_integer(),
          last_scrub_end: String.t() | nil
        }

  @spec collect_pools(keyword()) :: %{pools: [pool_summary()], errors: [map()]}
  def collect_pools(opts \\ []) do
    runner = runner(opts)

    {list_result, list_err} = decode(runner.("zpool", ["list", "-j", "--json-int"]), :zpool_list)

    {status_result, status_err} =
      decode(runner.("zpool", ["status", "-j", "--json-flat-vdevs", "--json-int"]), :zpool_status)

    pools = merge_pools(list_result, status_result)
    errors = Enum.filter([list_err, status_err], & &1)
    %{pools: pools, errors: errors}
  end

  @type dataset_summary :: %{
          name: String.t(),
          used_bytes: non_neg_integer(),
          usedbysnapshots_bytes: non_neg_integer(),
          snapshot_count: non_neg_integer(),
          newest_snapshot_unix: non_neg_integer() | nil,
          oldest_snapshot_unix: non_neg_integer() | nil
        }

  @spec collect_datasets(keyword()) :: %{datasets: [dataset_summary()], errors: [map()]}
  def collect_datasets(opts \\ []) do
    runner = runner(opts)

    {list_result, err} =
      decode(runner.("zfs", ["list", "-j", "--json-int", "-t", "all"]), :zfs_list)

    datasets = summarize_datasets(list_result)
    errors = if err, do: [err], else: []
    %{datasets: datasets, errors: errors}
  end

  defp runner(opts), do: Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)

  defp decode({:ok, body}, _tag) do
    case Jason.decode(body) do
      {:ok, map} -> {map, nil}
      {:error, e} -> {nil, %{tag: "decode", message: Exception.message(e)}}
    end
  end

  defp decode({:error, reason}, tag),
    do: {nil, %{tag: Atom.to_string(tag), message: inspect(reason)}}

  defp merge_pools(nil, _), do: []
  defp merge_pools(_, nil), do: []

  defp merge_pools(%{"pools" => list_pools}, %{"pools" => status_pools}) do
    Enum.map(list_pools, fn {name, list_info} ->
      status_info = Map.get(status_pools, name, %{})
      vdevs = Map.get(status_info, "vdevs", %{}) |> Map.values()

      %{
        name: name,
        health: Map.get(list_info, "health"),
        size_bytes: Map.get(list_info, "size", 0),
        allocated_bytes: Map.get(list_info, "alloc", 0),
        free_bytes: Map.get(list_info, "free", 0),
        fragmentation_percent: Map.get(list_info, "frag", 0),
        capacity_percent: Map.get(list_info, "cap", 0),
        error_count: to_int(Map.get(status_info, "error_count", "0")),
        vdev_count: length(vdevs),
        degraded_vdev_count: Enum.count(vdevs, &(&1["state"] != "ONLINE")),
        last_scrub_end: get_in(status_info, ["scan", "end_time"])
      }
    end)
  end

  defp summarize_datasets(nil), do: []

  defp summarize_datasets(%{"datasets" => datasets}) do
    by_type = Enum.group_by(datasets, fn {_, d} -> d["type"] end)
    filesystems = Map.get(by_type, "FILESYSTEM", [])
    snapshots_by_ds = group_snapshots(Map.get(by_type, "SNAPSHOT", []))

    Enum.map(filesystems, fn {_name, ds} ->
      name = ds["name"]
      snaps = Map.get(snapshots_by_ds, name, [])

      %{
        name: name,
        used_bytes: get_prop_int(ds, "used"),
        usedbysnapshots_bytes: get_prop_int(ds, "usedbysnapshots"),
        snapshot_count: length(snaps),
        newest_snapshot_unix: snaps |> Enum.map(& &1.creation) |> max_or_nil(),
        oldest_snapshot_unix: snaps |> Enum.map(& &1.creation) |> min_or_nil()
      }
    end)
  end

  defp group_snapshots(snapshots) do
    Enum.reduce(snapshots, %{}, fn {_, snap}, acc ->
      [parent | _] = String.split(snap["name"], "@", parts: 2)
      creation = get_prop_int(snap, "creation")
      entry = %{name: snap["name"], creation: creation}
      Map.update(acc, parent, [entry], &[entry | &1])
    end)
  end

  defp get_prop_int(ds, key) do
    case get_in(ds, ["properties", key, "value"]) do
      nil -> 0
      v -> to_int(v)
    end
  end

  defp to_int(v) when is_integer(v), do: v
  defp to_int(v) when is_binary(v), do: String.to_integer(v)

  defp max_or_nil([]), do: nil
  defp max_or_nil(list), do: Enum.max(list)

  defp min_or_nil([]), do: nil
  defp min_or_nil(list), do: Enum.min(list)
end
```
- [ ] **Step 5: Run tests — expect pass**
```bash
mix test test/proxmox_agent/collectors/zfs_test.exs 2>&1 | tail -5
```
Expected: 3 tests pass.
- [ ] **Step 6: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/collectors/zfs.ex agent/test/proxmox_agent/collectors/zfs_test.exs agent/test/fixtures/zfs
git commit -m "feat(agent): zfs collector for pools + datasets/snapshots with fixture tests"
```
---
## Task 7: Agent — Proxmox Storage Collector
**Files:**
- Create: `agent/test/fixtures/pvesh/storage.json`
- Create: `agent/test/proxmox_agent/collectors/storage_test.exs`
- Create: `agent/lib/proxmox_agent/collectors/storage.ex`
- [ ] **Step 1: Fixture**
Create `agent/test/fixtures/pvesh/storage.json`:
```json
[
{
"storage": "local",
"type": "dir",
"content": "backup,iso,vztmpl",
"active": 1,
"enabled": 1,
"used": 50000000000,
"total": 500000000000,
"avail": 450000000000,
"used_fraction": 0.1
},
{
"storage": "local-zfs",
"type": "zfspool",
"content": "images,rootdir",
"active": 1,
"enabled": 1,
"used": 200000000000,
"total": 500000000000,
"avail": 300000000000,
"used_fraction": 0.4
},
{
"storage": "backup-nfs",
"type": "nfs",
"content": "backup",
"active": 0,
"enabled": 1,
"used": 0,
"total": 0,
"avail": 0,
"used_fraction": 0.0
}
]
```
- [ ] **Step 2: Write failing test**
Create `agent/test/proxmox_agent/collectors/storage_test.exs`:
```elixir
defmodule ProxmoxAgent.Collectors.StorageTest do
use ExUnit.Case, async: true
alias ProxmoxAgent.Collectors.Storage
@fixtures Path.expand("../../fixtures/pvesh", __DIR__)
defp fake_runner do
fn
"pvesh", ["get", "/nodes/" <> _, "--output-format", "json"] ->
{:ok, File.read!(Path.join(@fixtures, "storage.json"))}
end
end
test "returns one summary per storage entry" do
sample = Storage.collect(node: "pve-01", runner: fake_runner())
assert length(sample.storages) == 3
local = Enum.find(sample.storages, &(&1.name == "local"))
assert local.type == "dir"
assert local.active == true
assert local.used_bytes == 50_000_000_000
assert local.total_bytes == 500_000_000_000
assert local.content == "backup,iso,vztmpl"
nfs = Enum.find(sample.storages, &(&1.name == "backup-nfs"))
assert nfs.active == false
end
test "populates errors on failure" do
failing = fn _, _ -> {:error, {:enoent, "pvesh"}} end
sample = Storage.collect(node: "pve-01", runner: failing)
assert sample.storages == []
assert sample.errors != []
end
end
```
- [ ] **Step 3: Run test — expect failure**
```bash
mix test test/proxmox_agent/collectors/storage_test.exs 2>&1 | tail -5
```
Expected: module undefined.
- [ ] **Step 4: Implement**
Create `agent/lib/proxmox_agent/collectors/storage.ex`:
```elixir
defmodule ProxmoxAgent.Collectors.Storage do
@moduledoc "Collects Proxmox storage summary via pvesh."
@spec collect(keyword()) :: %{storages: [map()], errors: [map()]}
def collect(opts \\ []) do
runner = Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)
node = Keyword.fetch!(opts, :node)
case runner.("pvesh", ["get", "/nodes/#{node}/storage", "--output-format", "json"]) do
{:ok, body} ->
case Jason.decode(body) do
  {:ok, list} when is_list(list) ->
    %{storages: Enum.map(list, &normalize/1), errors: []}
  {:ok, other} ->
    %{storages: [], errors: [%{tag: "decode", message: "expected a list, got: #{inspect(other)}"}]}
  {:error, e} ->
    %{storages: [], errors: [%{tag: "decode", message: Exception.message(e)}]}
end
{:error, reason} ->
%{storages: [], errors: [%{tag: "pvesh", message: inspect(reason)}]}
end
end
defp normalize(entry) do
%{
name: entry["storage"],
type: entry["type"],
content: entry["content"],
active: entry["active"] == 1,
enabled: entry["enabled"] == 1,
used_bytes: entry["used"] || 0,
total_bytes: entry["total"] || 0,
avail_bytes: entry["avail"] || 0,
used_fraction: entry["used_fraction"] || 0.0
}
end
end
```
- [ ] **Step 5: Run test — expect pass**
```bash
mix test test/proxmox_agent/collectors/storage_test.exs 2>&1 | tail -5
```
Expected: 2 tests pass.
- [ ] **Step 6: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/collectors/storage.ex agent/test/proxmox_agent/collectors/storage_test.exs agent/test/fixtures/pvesh/storage.json
git commit -m "feat(agent): pvesh storage collector"
```
---
## Task 8: Agent — VM/LXC Collector
**Files:**
- Create: `agent/test/fixtures/pvesh/qemu.json`
- Create: `agent/test/fixtures/pvesh/lxc.json`
- Create: `agent/test/fixtures/pvesh/qemu_100_config.json`
- Create: `agent/test/fixtures/pvesh/qemu_100_agent_interfaces.json`
- Create: `agent/test/proxmox_agent/collectors/vms_test.exs`
- Create: `agent/lib/proxmox_agent/collectors/vms.ex`
- [ ] **Step 1: Fixtures**
Create `agent/test/fixtures/pvesh/qemu.json`:
```json
[
{
"vmid": 100,
"name": "nginx-proxy",
"status": "running",
"uptime": 86400,
"cpu": 0.05,
"mem": 536870912,
"maxmem": 2147483648,
"tags": "web;production"
},
{
"vmid": 101,
"name": "db-backup",
"status": "stopped",
"uptime": 0,
"cpu": 0,
"mem": 0,
"maxmem": 4294967296,
"tags": "db"
}
]
```
Create `agent/test/fixtures/pvesh/lxc.json`:
```json
[
{
"vmid": 200,
"name": "minecraft",
"status": "running",
"uptime": 3600,
"cpu": 0.15,
"mem": 2147483648,
"maxmem": 4294967296,
"tags": ""
}
]
```
Create `agent/test/fixtures/pvesh/qemu_100_config.json`:
```json
{
"name": "nginx-proxy",
"cores": 2,
"memory": 2048,
"onboot": 1,
"scsi0": "local-zfs:vm-100-disk-0,size=32G",
"net0": "virtio=AA:BB:CC:DD:EE:FF,bridge=vmbr0"
}
```
Create `agent/test/fixtures/pvesh/qemu_100_agent_interfaces.json`:
```json
{
"result": [
{
"name": "lo",
"ip-addresses": [
{ "ip-address": "127.0.0.1", "ip-address-type": "ipv4" }
]
},
{
"name": "eth0",
"ip-addresses": [
{ "ip-address": "192.168.1.10", "ip-address-type": "ipv4" },
{ "ip-address": "fe80::a", "ip-address-type": "ipv6" }
]
}
]
}
```
- [ ] **Step 2: Write failing tests**
Create `agent/test/proxmox_agent/collectors/vms_test.exs`:
```elixir
defmodule ProxmoxAgent.Collectors.VmsTest do
use ExUnit.Case, async: true
alias ProxmoxAgent.Collectors.Vms
@fixtures Path.expand("../../fixtures/pvesh", __DIR__)
defp read!(name), do: File.read!(Path.join(@fixtures, name))
defp fake_runner do
fn
"pvesh", ["get", "/nodes/" <> rest, "--output-format", "json"] ->
cond do
String.ends_with?(rest, "/qemu") ->
{:ok, read!("qemu.json")}
String.ends_with?(rest, "/lxc") ->
{:ok, read!("lxc.json")}
String.ends_with?(rest, "/qemu/100/config") ->
{:ok, read!("qemu_100_config.json")}
String.ends_with?(rest, "/qemu/100/agent/network-get-interfaces") ->
{:ok, read!("qemu_100_agent_interfaces.json")}
String.ends_with?(rest, "/qemu/101/config") ->
{:ok, ~s({"name":"db-backup","cores":4,"memory":4096})}
String.ends_with?(rest, "/qemu/101/agent/network-get-interfaces") ->
{:error, {:nonzero_exit, 1, "QEMU guest agent is not running"}}
String.ends_with?(rest, "/lxc/200/config") ->
{:ok, ~s({"hostname":"minecraft","cores":4,"memory":4096,"net0":"name=eth0,ip=10.0.0.5/24"})}
end
end
end
describe "collect_runtime/1" do
test "returns qemu + lxc runtime info" do
sample = Vms.collect_runtime(node: "pve-01", runner: fake_runner())
assert length(sample.vms) == 3
nginx = Enum.find(sample.vms, &(&1.vmid == 100))
assert nginx.type == "qemu"
assert nginx.name == "nginx-proxy"
assert nginx.status == "running"
assert nginx.cpu_usage == 0.05
assert nginx.mem_bytes == 536_870_912
assert nginx.max_mem_bytes == 2_147_483_648
assert nginx.tags == ["web", "production"]
mc = Enum.find(sample.vms, &(&1.vmid == 200))
assert mc.type == "lxc"
end
end
describe "collect_detail/1" do
test "returns per-VM config + IPs" do
sample = Vms.collect_detail(node: "pve-01", runner: fake_runner())
nginx = Enum.find(sample.vms, &(&1.vmid == 100))
assert nginx.config["cores"] == 2
assert nginx.config["memory"] == 2048
assert "192.168.1.10" in nginx.ips
db = Enum.find(sample.vms, &(&1.vmid == 101))
assert db.config["cores"] == 4
assert db.ips == []
assert length(db.errors) == 1
mc = Enum.find(sample.vms, &(&1.vmid == 200))
assert mc.config["hostname"] == "minecraft"
assert "10.0.0.5" in mc.ips
end
end
end
```
- [ ] **Step 3: Run tests — expect failure**
```bash
mix test test/proxmox_agent/collectors/vms_test.exs 2>&1 | tail -10
```
Expected: module undefined.
- [ ] **Step 4: Implement**
Create `agent/lib/proxmox_agent/collectors/vms.ex`:
```elixir
defmodule ProxmoxAgent.Collectors.Vms do
@moduledoc """
Collects VM/LXC runtime (fast path) and per-VM detail incl. IPs (medium path).
"""
@spec collect_runtime(keyword()) :: %{vms: [map()], errors: [map()]}
def collect_runtime(opts) do
runner = runner(opts)
node = Keyword.fetch!(opts, :node)
{qemu, e1} = list(runner, node, "qemu")
{lxc, e2} = list(runner, node, "lxc")
vms =
Enum.map(qemu, &normalize_runtime(&1, "qemu")) ++
Enum.map(lxc, &normalize_runtime(&1, "lxc"))
%{vms: vms, errors: Enum.filter([e1, e2], & &1)}
end
@spec collect_detail(keyword()) :: %{vms: [map()], errors: [map()]}
def collect_detail(opts) do
runner = runner(opts)
node = Keyword.fetch!(opts, :node)
{qemu, e1} = list(runner, node, "qemu")
{lxc, e2} = list(runner, node, "lxc")
qemu_details = Enum.map(qemu, &qemu_detail(runner, node, &1))
lxc_details = Enum.map(lxc, &lxc_detail(runner, node, &1))
%{vms: qemu_details ++ lxc_details, errors: Enum.filter([e1, e2], & &1)}
end
defp runner(opts), do: Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)
defp list(runner, node, type) do
case runner.("pvesh", ["get", "/nodes/#{node}/#{type}", "--output-format", "json"]) do
{:ok, body} ->
case Jason.decode(body) do
  {:ok, list} when is_list(list) -> {list, nil}
  {:ok, other} -> {[], %{tag: "decode_#{type}", message: "expected a list, got: #{inspect(other)}"}}
  {:error, e} -> {[], %{tag: "decode_#{type}", message: Exception.message(e)}}
end
{:error, reason} ->
{[], %{tag: "list_#{type}", message: inspect(reason)}}
end
end
defp normalize_runtime(entry, type) do
%{
vmid: entry["vmid"],
type: type,
name: entry["name"] || entry["hostname"],
status: entry["status"],
uptime_seconds: entry["uptime"] || 0,
cpu_usage: entry["cpu"] || 0.0,
mem_bytes: entry["mem"] || 0,
max_mem_bytes: entry["maxmem"] || 0,
tags: parse_tags(entry["tags"])
}
end
defp parse_tags(nil), do: []
defp parse_tags(""), do: []
defp parse_tags(str) when is_binary(str) do
str |> String.split([";", ","], trim: true) |> Enum.map(&String.trim/1)
end
defp qemu_detail(runner, node, entry) do
vmid = entry["vmid"]
{config, cfg_err} = fetch_json(runner, "/nodes/#{node}/qemu/#{vmid}/config")
{ips, ip_err} = fetch_qemu_agent_ips(runner, node, vmid)
%{
vmid: vmid,
type: "qemu",
name: entry["name"],
config: config || %{},
ips: ips,
errors: Enum.filter([cfg_err, ip_err], & &1)
}
end
defp lxc_detail(runner, node, entry) do
vmid = entry["vmid"]
{config, cfg_err} = fetch_json(runner, "/nodes/#{node}/lxc/#{vmid}/config")
ips = extract_lxc_ips(config || %{})
%{
vmid: vmid,
type: "lxc",
name: entry["name"],
config: config || %{},
ips: ips,
errors: Enum.filter([cfg_err], & &1)
}
end
defp fetch_json(runner, path) do
case runner.("pvesh", ["get", path, "--output-format", "json"]) do
{:ok, body} ->
case Jason.decode(body) do
{:ok, map} -> {map, nil}
{:error, e} -> {nil, %{tag: "decode", message: Exception.message(e)}}
end
{:error, reason} ->
{nil, %{tag: "pvesh", message: inspect(reason)}}
end
end
defp fetch_qemu_agent_ips(runner, node, vmid) do
case runner.("pvesh", [
"get",
"/nodes/#{node}/qemu/#{vmid}/agent/network-get-interfaces",
"--output-format",
"json"
]) do
{:ok, body} ->
case Jason.decode(body) do
{:ok, %{"result" => interfaces}} ->
ips =
interfaces
|> Enum.reject(&(&1["name"] == "lo"))
|> Enum.flat_map(&Map.get(&1, "ip-addresses", []))
|> Enum.filter(&(&1["ip-address-type"] == "ipv4"))
|> Enum.map(& &1["ip-address"])
{ips, nil}
_ ->
{[], %{tag: "agent_ips", message: "unexpected shape"}}
end
{:error, reason} ->
  # Record the failure (e.g. guest agent not running) so it surfaces in
  # the per-VM errors list — the test asserts exactly one error for VM 101.
  {[], %{tag: "agent_ips", message: inspect(reason)}}
end
end
defp extract_lxc_ips(config) do
config
|> Enum.filter(fn {k, _} -> String.starts_with?(to_string(k), "net") end)
|> Enum.flat_map(fn {_, val} -> parse_lxc_net(val) end)
end
# Extracts the first static IP from a netX value like "name=eth0,ip=10.0.0.5/24".
# Note: `[]` (the dhcp case) is truthy in Elixir, so it still short-circuits find_value.
defp parse_lxc_net(val) when is_binary(val) do
val
|> String.split(",")
|> Enum.find_value([], fn pair ->
case String.split(pair, "=", parts: 2) do
["ip", ip] ->
ip = ip |> String.split("/") |> hd()
if ip == "dhcp", do: [], else: [ip]
_ ->
nil
end
end)
end
defp parse_lxc_net(_), do: []
end
```
- [ ] **Step 5: Run tests — expect pass**
```bash
mix test test/proxmox_agent/collectors/vms_test.exs 2>&1 | tail -5
```
Expected: 2 tests pass.
- [ ] **Step 6: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/collectors/vms.ex agent/test/proxmox_agent/collectors/vms_test.exs agent/test/fixtures/pvesh
git commit -m "feat(agent): vms/lxc collectors for runtime and detail with fixtures"
```
---
## Task 9: Agent — System Info Collector
**Files:**
- Create: `agent/test/fixtures/system/pveversion.txt`
- Create: `agent/test/fixtures/system/zfs_version.txt`
- Create: `agent/test/fixtures/system/apt_upgradable.txt`
- Create: `agent/test/proxmox_agent/collectors/system_info_test.exs`
- Create: `agent/lib/proxmox_agent/collectors/system_info.ex`
- [ ] **Step 1: Fixtures**
Create `agent/test/fixtures/system/pveversion.txt`:
```
pve-manager/8.3.1/abc123 (running kernel: 6.8.12-1-pve)
```
Create `agent/test/fixtures/system/zfs_version.txt`:
```
zfs-2.3.0-pve2
zfs-kmod-2.3.0-pve2
```
Create `agent/test/fixtures/system/apt_upgradable.txt`:
```
Listing...
libssl3/stable 3.0.11-1~deb12u2 amd64 [upgradable from: 3.0.11-1~deb12u1]
openssh-server/stable 1:9.2p1-2+deb12u3 amd64 [upgradable from: 1:9.2p1-2+deb12u2]
```
- [ ] **Step 2: Write failing test**
Create `agent/test/proxmox_agent/collectors/system_info_test.exs`:
```elixir
defmodule ProxmoxAgent.Collectors.SystemInfoTest do
use ExUnit.Case, async: true
alias ProxmoxAgent.Collectors.SystemInfo
@fixtures Path.expand("../../fixtures/system", __DIR__)
defp fake_runner do
fn
"pveversion", [] -> {:ok, File.read!(Path.join(@fixtures, "pveversion.txt"))}
"zfs", ["--version"] -> {:ok, File.read!(Path.join(@fixtures, "zfs_version.txt"))}
"apt", ["list", "--upgradable"] -> {:ok, File.read!(Path.join(@fixtures, "apt_upgradable.txt"))}
end
end
test "collects pveversion, zfs version and pending upgrade count" do
sample = SystemInfo.collect(runner: fake_runner())
assert sample.pve_version =~ "pve-manager/8.3.1"
assert sample.zfs_version =~ "2.3.0"
assert sample.pending_updates == 2
assert sample.errors == []
end
test "partial sample when one command fails" do
partial = fn
"pveversion", [] -> {:ok, "pve-manager/8.3.1/abc (running kernel: 6.8.12-1-pve)\n"}
"zfs", ["--version"] -> {:error, {:enoent, "zfs"}}
"apt", ["list", "--upgradable"] -> {:ok, "Listing...\n"}
end
sample = SystemInfo.collect(runner: partial)
assert sample.pve_version =~ "8.3.1"
assert sample.zfs_version == nil
assert sample.pending_updates == 0
assert length(sample.errors) == 1
end
end
```
- [ ] **Step 3: Run test — expect failure**
```bash
mix test test/proxmox_agent/collectors/system_info_test.exs 2>&1 | tail -5
```
Expected: module undefined.
- [ ] **Step 4: Implement**
Create `agent/lib/proxmox_agent/collectors/system_info.ex`:
```elixir
defmodule ProxmoxAgent.Collectors.SystemInfo do
@moduledoc "Slow-path system metadata: pveversion, zfs version, apt upgradable count."
@spec collect(keyword()) :: %{
pve_version: String.t() | nil,
zfs_version: String.t() | nil,
pending_updates: non_neg_integer(),
agent_version: String.t(),
errors: [map()]
}
def collect(opts \\ []) do
runner = Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)
{pve, e1} = trim_output(runner.("pveversion", []), :pveversion)
{zfs, e2} = trim_output(runner.("zfs", ["--version"]), :zfs_version)
{apt, e3} = runner.("apt", ["list", "--upgradable"]) |> count_upgrades()
%{
pve_version: pve,
zfs_version: zfs,
pending_updates: apt,
agent_version: ProxmoxAgent.version(),
errors: Enum.filter([e1, e2, e3], & &1)
}
end
defp trim_output({:ok, text}, _tag), do: {String.trim(text) |> first_line(), nil}
defp trim_output({:error, reason}, tag),
do: {nil, %{tag: Atom.to_string(tag), message: inspect(reason)}}
defp first_line(str), do: str |> String.split("\n", parts: 2) |> hd()
defp count_upgrades({:ok, text}) do
count =
text
|> String.split("\n", trim: true)
|> Enum.count(&String.contains?(&1, "[upgradable"))
{count, nil}
end
defp count_upgrades({:error, reason}),
do: {0, %{tag: "apt_upgradable", message: inspect(reason)}}
end
```
- [ ] **Step 5: Run test — expect pass**
```bash
mix test test/proxmox_agent/collectors/system_info_test.exs 2>&1 | tail -5
```
Expected: 2 tests pass.
- [ ] **Step 6: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/collectors/system_info.ex agent/test/proxmox_agent/collectors/system_info_test.exs agent/test/fixtures/system
git commit -m "feat(agent): system info collector for pveversion/zfs/apt"
```
---
## Task 10: Agent — Reporter Schedules Medium + Slow
**Files:**
- Modify: `agent/lib/proxmox_agent/reporter.ex`
- [ ] **Step 1: Update Reporter to schedule all three intervals and bundle multi-collector payloads**
Replace the existing `handle_info(:collect_fast, socket)` clause and add medium/slow. Full new module body:
```elixir
defmodule ProxmoxAgent.Reporter do
@moduledoc """
Maintains a persistent Phoenix Channel connection to the server, joins
`host:<host_id>`, and pushes metric samples on fast/medium/slow intervals.
Payload contract:
metric:fast => %{host, zfs_pools, storage, vms_runtime}
metric:medium => %{zfs_datasets, vms_detail}
metric:slow => %{system_info}
"""
use Slipstream, restart: :permanent
require Logger
alias ProxmoxAgent.Collectors.{Host, Storage, SystemInfo, Vms, Zfs}
def start_link(%ProxmoxAgent.Config{} = cfg) do
Slipstream.start_link(__MODULE__, cfg, name: __MODULE__)
end
@impl Slipstream
def init(cfg) do
socket =
new_socket()
|> assign(:cfg, cfg)
|> assign(:topic, "host:" <> cfg.host_id)
|> connect!(uri: cfg.server_url)
{:ok, socket}
end
@impl Slipstream
def handle_connect(socket) do
topic = socket.assigns.topic
cfg = socket.assigns.cfg
payload = %{"token" => cfg.token, "agent_version" => ProxmoxAgent.version()}
Logger.info("reporter: connected, joining #{topic}")
{:ok, join(socket, topic, payload)}
end
@impl Slipstream
def handle_join(topic, _reply, socket) do
Logger.info("reporter: joined #{topic}")
send(self(), :collect_fast)
send(self(), :collect_medium)
send(self(), :collect_slow)
{:ok, socket}
end
@impl Slipstream
def handle_info(:collect_fast, socket) do
cfg = socket.assigns.cfg
data = %{
host: Host.collect(),
zfs_pools: Zfs.collect_pools(),
storage: Storage.collect(node: cfg.host_id),
vms_runtime: Vms.collect_runtime(node: cfg.host_id)
}
push_metric(socket, "metric:fast", data)
Process.send_after(self(), :collect_fast, cfg.fast_seconds * 1000)
{:noreply, socket}
end
def handle_info(:collect_medium, socket) do
cfg = socket.assigns.cfg
data = %{
zfs_datasets: Zfs.collect_datasets(),
vms_detail: Vms.collect_detail(node: cfg.host_id)
}
push_metric(socket, "metric:medium", data)
Process.send_after(self(), :collect_medium, cfg.medium_seconds * 1000)
{:noreply, socket}
end
def handle_info(:collect_slow, socket) do
cfg = socket.assigns.cfg
data = %{system_info: SystemInfo.collect()}
push_metric(socket, "metric:slow", data)
Process.send_after(self(), :collect_slow, cfg.slow_seconds * 1000)
{:noreply, socket}
end
@impl Slipstream
def handle_disconnect(reason, socket) do
Logger.warning("reporter: disconnected — #{inspect(reason)}; reconnecting")
reconnect(socket)
end
@impl Slipstream
def handle_topic_close(topic, reason, socket) do
Logger.warning("reporter: topic #{topic} closed: #{inspect(reason)}; rejoining")
rejoin(socket, topic)
end
defp push_metric(socket, event, data) do
payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: data}
case push(socket, socket.assigns.topic, event, payload) do
{:ok, _ref} ->
:ok
{:error, reason} ->
Logger.warning("reporter: push #{event} failed: #{inspect(reason)}")
:ok
end
end
end
```
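For reference, a hedged sketch of one `metric:fast` push body as built by `push_metric/3` above. The `storage` and `vms_runtime` shapes follow the collectors from Tasks 7–8; `host` and `zfs_pools` are abbreviated to empty objects here since their exact fields are defined in earlier tasks:
```json
{
  "collected_at": "2026-04-21T20:15:00.123456Z",
  "data": {
    "host": {},
    "zfs_pools": {},
    "storage": { "storages": [], "errors": [] },
    "vms_runtime": { "vms": [], "errors": [] }
  }
}
```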
- [ ] **Step 2: Compile and run all agent tests**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor/agent
mix compile --warnings-as-errors 2>&1 | tail -5
mix test 2>&1 | tail -5
```
Expected: compile clean, all tests pass (existing + new collector tests).
- [ ] **Step 3: Commit**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor
git add agent/lib/proxmox_agent/reporter.ex
git commit -m "feat(agent): reporter schedules fast/medium/slow collection with bundled payloads"
```
---
## Task 11: End-to-End Smoke Test
**Goal:** Agent runs against local server, all three intervals produce rows in `metrics`, `GET /api/hosts/pve-dev-01` returns the latest samples.
- [ ] **Step 1: Start server**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor/server
mix ecto.migrate
mix phx.server
```
Run in a separate terminal/background. Wait for `Running ServerWeb.Endpoint` log line.
- [ ] **Step 2: Re-use or create a host token**
Previous Phase 1 smoke test registered `pve-dev-01`. If that host still exists, use its token. If not:
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor/server
mix run -e 'Server.Release.register_host("pve-dev-01")'
```
Copy the printed TOKEN.
- [ ] **Step 3: Write agent config**
```bash
cat > /tmp/agent-local.toml <<EOF
server_url = "ws://localhost:4000/socket/websocket"
token = "<TOKEN>"
host_id = "pve-dev-01"
[intervals]
fast_seconds = 5
medium_seconds = 10
slow_seconds = 20
EOF
```
- [ ] **Step 4: Start agent**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor/agent
AGENT_CONFIG=/tmp/agent-local.toml mix run --no-halt
```
- [ ] **Step 5: Wait 25 seconds then check the API**
```bash
sleep 25
curl -s http://localhost:4000/api/hosts/pve-dev-01 | python3 -m json.tool
```
Expected: JSON containing `name`, `status: "online"`, and a `samples` object with keys `fast`, `medium`, and `slow`, each carrying `collected_at` and `payload`. On macOS the payloads will contain `errors` entries for `/proc`, `zpool`, `zfs`, `pvesh`, `pveversion`, and `apt`; that's expected: the pipeline works end to end, the agent just can't read Proxmox data on a Mac.
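A hedged sketch of that response (key names from the contract above; timestamps illustrative, and the real `payload` objects carry the full collector data rather than being empty):
```json
{
  "name": "pve-dev-01",
  "status": "online",
  "samples": {
    "fast": { "collected_at": "2026-04-21T20:15:05.000000Z", "payload": {} },
    "medium": { "collected_at": "2026-04-21T20:15:00.000000Z", "payload": {} },
    "slow": { "collected_at": "2026-04-21T20:14:50.000000Z", "payload": {} }
  }
}
```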
- [ ] **Step 6: Verify row counts grow**
```bash
cd /Users/cabele/claudeprojects/proxmox_monitor/server
mix run -e 'IO.inspect(Server.Repo.aggregate(Server.Schema.Metric, :count, :id))'
```
Expected: a number >= 5 after ~25 seconds of runtime. Each path collects once immediately on join and then on its interval, so roughly: fast every 5s ≈ 6 rows, medium every 10s ≈ 3, slow every 20s ≈ 2.
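The row-count estimate can be reproduced with quick shell arithmetic. Each path pushes once immediately after join (see the Reporter's `handle_join`) and then once per interval, assuming every push is persisted:
```bash
# Back-of-envelope metrics row count after 25s of agent runtime.
elapsed=25
total=0
for pair in fast:5 medium:10 slow:20; do
  kind=${pair%%:*}                 # interval name
  interval=${pair##*:}             # seconds between collections
  n=$(( elapsed / interval + 1 ))  # +1 for the immediate push on join
  echo "$kind: ~$n rows"
  total=$(( total + n ))
done
echo "total: ~$total rows"
```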
- [ ] **Step 7: Stop agent, clean up**
```bash
# Stop the agent with Ctrl+C, then 'a' at the BEAM break prompt (or pkill for automation)
rm /tmp/agent-local.toml
```
No code changes; no commit.
---
## Phase 2 Exit Criteria
- `cd server && mix test` — all green.
- `cd agent && mix test` — all green.
- Smoke test: agent pushes three intervals; DB grows; API returns structured samples.
- Retention GenServer running in server supervision tree.
- All commits on `main`.
Next up (Phase 3): LiveView dashboard. See roadmap in `proxmox-monitor-konzept.md`.