From 663f7a611399f5880e00492ea14ecd8902c91d78 Mon Sep 17 00:00:00 2001 From: Carsten Date: Tue, 21 Apr 2026 22:36:14 +0200 Subject: [PATCH] feat(agent): reporter schedules fast/medium/slow collection with bundled payloads --- agent/lib/proxmox_agent/reporter.ex | 55 ++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/agent/lib/proxmox_agent/reporter.ex b/agent/lib/proxmox_agent/reporter.ex index 38dbcc6..df99161 100644 --- a/agent/lib/proxmox_agent/reporter.ex +++ b/agent/lib/proxmox_agent/reporter.ex @@ -1,13 +1,18 @@ defmodule ProxmoxAgent.Reporter do @moduledoc """ Maintains a persistent Phoenix Channel connection to the server, joins - `host:`, and pushes metric samples on the configured fast interval. + `host:`, and pushes metric samples on fast/medium/slow intervals. + + Payload contract: + metric:fast => %{host, zfs_pools, storage, vms_runtime} + metric:medium => %{zfs_datasets, vms_detail} + metric:slow => %{system_info} """ use Slipstream, restart: :permanent require Logger - alias ProxmoxAgent.Collectors.Host + alias ProxmoxAgent.Collectors.{Host, Storage, SystemInfo, Vms, Zfs} def start_link(%ProxmoxAgent.Config{} = cfg) do Slipstream.start_link(__MODULE__, cfg, name: __MODULE__) @@ -38,15 +43,47 @@ defmodule ProxmoxAgent.Reporter do def handle_join(topic, _reply, socket) do Logger.info("reporter: joined #{topic}") send(self(), :collect_fast) + send(self(), :collect_medium) + send(self(), :collect_slow) {:ok, socket} end @impl Slipstream def handle_info(:collect_fast, socket) do - sample = Host.collect() - payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: sample} - :ok = push_metric(socket, "metric:fast", payload) - Process.send_after(self(), :collect_fast, socket.assigns.cfg.fast_seconds * 1000) + cfg = socket.assigns.cfg + + data = %{ + host: Host.collect(), + zfs_pools: Zfs.collect_pools(), + storage: Storage.collect(node: cfg.host_id), + vms_runtime: Vms.collect_runtime(node: cfg.host_id) + } + + push_metric(socket, "metric:fast", data) + Process.send_after(self(), :collect_fast, cfg.fast_seconds * 1000) + {:noreply, socket} + end + + def handle_info(:collect_medium, socket) do + cfg = socket.assigns.cfg + + data = %{ + zfs_datasets: Zfs.collect_datasets(), + vms_detail: Vms.collect_detail(node: cfg.host_id) + } + + push_metric(socket, "metric:medium", data) + Process.send_after(self(), :collect_medium, cfg.medium_seconds * 1000) + {:noreply, socket} + end + + def handle_info(:collect_slow, socket) do + cfg = socket.assigns.cfg + + data = %{system_info: SystemInfo.collect()} + + push_metric(socket, "metric:slow", data) + Process.send_after(self(), :collect_slow, cfg.slow_seconds * 1000) {:noreply, socket} end @@ -62,13 +99,15 @@ defmodule ProxmoxAgent.Reporter do rejoin(socket, topic) end - defp push_metric(socket, event, payload) do + defp push_metric(socket, event, data) do + payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: data} + case push(socket, socket.assigns.topic, event, payload) do {:ok, _ref} -> :ok {:error, reason} -> - Logger.warning("reporter: push failed: #{inspect(reason)}") + Logger.warning("reporter: push #{event} failed: #{inspect(reason)}") :ok end end