feat(agent): reporter schedules fast/medium/slow collection with bundled payloads

This commit is contained in:
Carsten 2026-04-21 22:36:14 +02:00
parent 61fa959d52
commit 663f7a6113

View file

@ -1,13 +1,18 @@
defmodule ProxmoxAgent.Reporter do defmodule ProxmoxAgent.Reporter do
@moduledoc """ @moduledoc """
Maintains a persistent Phoenix Channel connection to the server, joins Maintains a persistent Phoenix Channel connection to the server, joins
`host:<host_id>`, and pushes metric samples on the configured fast interval. `host:<host_id>`, and pushes metric samples on fast/medium/slow intervals.
Payload contract:
metric:fast => %{host, zfs_pools, storage, vms_runtime}
metric:medium => %{zfs_datasets, vms_detail}
metric:slow => %{system_info}
""" """
use Slipstream, restart: :permanent use Slipstream, restart: :permanent
require Logger require Logger
alias ProxmoxAgent.Collectors.Host alias ProxmoxAgent.Collectors.{Host, Storage, SystemInfo, Vms, Zfs}
def start_link(%ProxmoxAgent.Config{} = cfg) do def start_link(%ProxmoxAgent.Config{} = cfg) do
Slipstream.start_link(__MODULE__, cfg, name: __MODULE__) Slipstream.start_link(__MODULE__, cfg, name: __MODULE__)
@ -38,15 +43,47 @@ defmodule ProxmoxAgent.Reporter do
def handle_join(topic, _reply, socket) do def handle_join(topic, _reply, socket) do
Logger.info("reporter: joined #{topic}") Logger.info("reporter: joined #{topic}")
send(self(), :collect_fast) send(self(), :collect_fast)
send(self(), :collect_medium)
send(self(), :collect_slow)
{:ok, socket} {:ok, socket}
end end
@impl Slipstream @impl Slipstream
def handle_info(:collect_fast, socket) do def handle_info(:collect_fast, socket) do
sample = Host.collect() cfg = socket.assigns.cfg
payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: sample}
:ok = push_metric(socket, "metric:fast", payload) data = %{
Process.send_after(self(), :collect_fast, socket.assigns.cfg.fast_seconds * 1000) host: Host.collect(),
zfs_pools: Zfs.collect_pools(),
storage: Storage.collect(node: cfg.host_id),
vms_runtime: Vms.collect_runtime(node: cfg.host_id)
}
push_metric(socket, "metric:fast", data)
Process.send_after(self(), :collect_fast, cfg.fast_seconds * 1000)
{:noreply, socket}
end
def handle_info(:collect_medium, socket) do
cfg = socket.assigns.cfg
data = %{
zfs_datasets: Zfs.collect_datasets(),
vms_detail: Vms.collect_detail(node: cfg.host_id)
}
push_metric(socket, "metric:medium", data)
Process.send_after(self(), :collect_medium, cfg.medium_seconds * 1000)
{:noreply, socket}
end
def handle_info(:collect_slow, socket) do
cfg = socket.assigns.cfg
data = %{system_info: SystemInfo.collect()}
push_metric(socket, "metric:slow", data)
Process.send_after(self(), :collect_slow, cfg.slow_seconds * 1000)
{:noreply, socket} {:noreply, socket}
end end
@ -62,13 +99,15 @@ defmodule ProxmoxAgent.Reporter do
rejoin(socket, topic) rejoin(socket, topic)
end end
defp push_metric(socket, event, payload) do defp push_metric(socket, event, data) do
payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: data}
case push(socket, socket.assigns.topic, event, payload) do case push(socket, socket.assigns.topic, event, payload) do
{:ok, _ref} -> {:ok, _ref} ->
:ok :ok
{:error, reason} -> {:error, reason} ->
Logger.warning("reporter: push failed: #{inspect(reason)}") Logger.warning("reporter: push #{event} failed: #{inspect(reason)}")
:ok :ok
end end
end end