defmodule ProxmoxAgent.Collectors.Zfs do
  @moduledoc """
  Collects ZFS pool health (fast path) and dataset/snapshot info (medium path).
  Delegates shelling out to an injectable runner so tests can supply fixtures.

  ## Runner contract

  The `:runner` option is a 2-arity function called as `runner.(command, args)`.
  It must return `{:ok, json_body}` or `{:error, reason}`. When the option is
  omitted, `ProxmoxAgent.Shell.run/2` is used.

  ## Error reporting

  Collection never aborts on a failed command or malformed JSON. Instead, each
  problem is reported as a map in the `:errors` list:

    * `%{tag: "<command tag>", message: inspect(reason)}` when the runner fails
    * `%{tag: "decode", message: ...}` when the output is not a JSON object
  """

  @type pool_summary :: %{
          name: String.t(),
          health: String.t(),
          size_bytes: non_neg_integer(),
          allocated_bytes: non_neg_integer(),
          free_bytes: non_neg_integer(),
          fragmentation_percent: non_neg_integer(),
          capacity_percent: non_neg_integer(),
          error_count: non_neg_integer(),
          vdev_count: non_neg_integer(),
          degraded_vdev_count: non_neg_integer(),
          last_scrub_end: String.t() | nil
        }

  @type dataset_summary :: %{
          name: String.t(),
          used_bytes: non_neg_integer(),
          usedbysnapshots_bytes: non_neg_integer(),
          snapshot_count: non_neg_integer(),
          newest_snapshot_unix: non_neg_integer() | nil,
          oldest_snapshot_unix: non_neg_integer() | nil
        }

  @doc """
  Runs `zpool list` and `zpool status` (both in JSON mode) and merges the two
  outputs into one summary per pool.

  `:pools` is empty when either command fails or yields an unexpected shape;
  the corresponding failures are listed in `:errors`.

  ## Options

    * `:runner` - 2-arity function used to execute commands (see module docs).
  """
  @spec collect_pools(keyword()) :: %{pools: [pool_summary()], errors: [map()]}
  def collect_pools(opts \\ []) do
    run = runner(opts)

    {list_result, list_err} =
      decode(run.("zpool", ["list", "-j", "--json-int"]), :zpool_list)

    {status_result, status_err} =
      decode(
        run.("zpool", ["status", "-j", "--json-flat-vdevs", "--json-int"]),
        :zpool_status
      )

    %{
      pools: merge_pools(list_result, status_result),
      errors: Enum.reject([list_err, status_err], &is_nil/1)
    }
  end

  @doc """
  Runs `zfs list -t all` in JSON mode and summarizes every entry of type
  `"FILESYSTEM"`, attaching the count and the oldest/newest creation time of
  the snapshots whose name starts with `<filesystem name>@`.

  ## Options

    * `:runner` - 2-arity function used to execute commands (see module docs).
  """
  @spec collect_datasets(keyword()) :: %{datasets: [dataset_summary()], errors: [map()]}
  def collect_datasets(opts \\ []) do
    run = runner(opts)

    {list_result, err} =
      decode(run.("zfs", ["list", "-j", "--json-int", "-t", "all"]), :zfs_list)

    %{datasets: summarize_datasets(list_result), errors: List.wrap(err)}
  end

  # Resolves the command runner, defaulting to the real shell.
  defp runner(opts), do: Keyword.get(opts, :runner, &ProxmoxAgent.Shell.run/2)

  # Turns a runner result into `{decoded_map | nil, error_map | nil}`.
  defp decode({:ok, body}, _tag) do
    case Jason.decode(body) do
      {:ok, %{} = map} ->
        {map, nil}

      # Valid JSON that is not an object cannot be summarized; report it
      # instead of letting a later pattern match crash the collector.
      {:ok, _other} ->
        {nil, %{tag: "decode", message: "expected a JSON object"}}

      {:error, e} ->
        {nil, %{tag: "decode", message: Exception.message(e)}}
    end
  end

  defp decode({:error, reason}, tag) do
    {nil, %{tag: Atom.to_string(tag), message: inspect(reason)}}
  end

  # Joins `zpool list` and `zpool status` output by pool name.
  #
  # NOTE(review): the list fields are read as flat keys ("size", "alloc",
  # "free", "frag", "cap", "health") on each pool entry. Confirm this matches
  # the JSON emitted by the deployed `zpool list -j`; it may nest values
  # under "properties" instead.
  defp merge_pools(%{"pools" => list_pools}, %{"pools" => status_pools})
       when is_map(list_pools) and is_map(status_pools) do
    Enum.map(list_pools, fn {name, list_info} ->
      status_info = Map.get(status_pools, name, %{})
      vdevs = status_info |> Map.get("vdevs", %{}) |> Map.values()

      %{
        name: name,
        health: Map.get(list_info, "health"),
        size_bytes: Map.get(list_info, "size", 0),
        allocated_bytes: Map.get(list_info, "alloc", 0),
        free_bytes: Map.get(list_info, "free", 0),
        fragmentation_percent: Map.get(list_info, "frag", 0),
        capacity_percent: Map.get(list_info, "cap", 0),
        error_count: to_int(Map.get(status_info, "error_count", "0")),
        vdev_count: length(vdevs),
        # Any vdev whose state is not exactly "ONLINE" counts as degraded.
        degraded_vdev_count: Enum.count(vdevs, &(&1["state"] != "ONLINE")),
        last_scrub_end: get_in(status_info, ["scan", "end_time"])
      }
    end)
  end

  # Either command failed (nil) or the payload lacks a usable "pools" map.
  defp merge_pools(_list_result, _status_result), do: []

  # Builds one summary per "FILESYSTEM" entry, enriched with snapshot stats.
  defp summarize_datasets(%{"datasets" => datasets}) when is_map(datasets) do
    by_type = Enum.group_by(datasets, fn {_key, entry} -> entry["type"] end)
    snapshots_by_parent = by_type |> Map.get("SNAPSHOT", []) |> group_snapshots()

    by_type
    |> Map.get("FILESYSTEM", [])
    |> Enum.map(fn {_key, fs} ->
      name = fs["name"]
      snaps = Map.get(snapshots_by_parent, name, [])
      {oldest, newest} = creation_bounds(snaps)

      %{
        name: name,
        used_bytes: get_prop_int(fs, "used"),
        usedbysnapshots_bytes: get_prop_int(fs, "usedbysnapshots"),
        snapshot_count: length(snaps),
        newest_snapshot_unix: newest,
        oldest_snapshot_unix: oldest
      }
    end)
  end

  # The command failed (nil) or the payload lacks a usable "datasets" map.
  defp summarize_datasets(_list_result), do: []

  # Groups snapshot entries by the dataset name preceding the first "@".
  defp group_snapshots(snapshots) do
    Enum.group_by(
      snapshots,
      fn {_key, snap} ->
        [parent | _] = String.split(snap["name"], "@", parts: 2)
        parent
      end,
      fn {_key, snap} ->
        %{name: snap["name"], creation: get_prop_int(snap, "creation")}
      end
    )
  end

  # Returns `{oldest, newest}` creation values, or `{nil, nil}` without snapshots.
  defp creation_bounds([]), do: {nil, nil}
  defp creation_bounds(snaps), do: snaps |> Enum.map(& &1.creation) |> Enum.min_max()

  # Reads `properties.<key>.value` as an integer; missing or unparsable -> 0.
  defp get_prop_int(entry, key) do
    entry |> get_in(["properties", key, "value"]) |> to_int()
  end

  # Total integer coercion: a single odd value (for example "-" or nil) must
  # not crash the whole collection, so anything that is not a clean integer
  # maps to 0. Strings with trailing garbage ("111G") are rejected rather than
  # partially parsed, to avoid reporting a misleading number.
  defp to_int(v) when is_integer(v), do: v
  defp to_int(v) when is_float(v), do: trunc(v)

  defp to_int(v) when is_binary(v) do
    case v |> String.trim() |> Integer.parse() do
      {int, ""} -> int
      _ -> 0
    end
  end

  defp to_int(_other), do: 0
end