defmodule ProxmoxAgent.Reporter do
  @moduledoc """
  Maintains a persistent Phoenix Channel connection to the server, joins the
  `"host:" <> host_id` topic, and pushes metric samples on fast/medium/slow
  intervals.

  Payload contract (each payload is sent as `%{collected_at, data}`, where
  `collected_at` is an ISO 8601 timestamp from `DateTime.utc_now/0` and `data`
  is shown below):

      metric:fast   => %{host, zfs_pools, storage, vms_runtime}
      metric:medium => %{zfs_datasets, vms_detail}
      metric:slow   => %{system_info}

  Collection loops only run while the topic is joined. Every (re)join starts a
  fresh generation of loops and invalidates the previous one, so reconnects and
  rejoins never stack up duplicate loops.
  """

  use Slipstream, restart: :permanent

  require Logger

  alias ProxmoxAgent.Collectors.{Host, Storage, SystemInfo, Vms, Zfs}

  # Collection tiers, in the order they are kicked off on join, each mapped to
  # the config field holding its interval in seconds. The pushed event name is
  # "metric:<tier>".
  @tiers [fast: :fast_seconds, medium: :medium_seconds, slow: :slow_seconds]

  @doc "Starts the reporter process, registered under this module's name."
  def start_link(%ProxmoxAgent.Config{} = cfg) do
    Slipstream.start_link(__MODULE__, cfg, name: __MODULE__)
  end

  @impl Slipstream
  def init(cfg) do
    socket =
      new_socket()
      |> assign(:cfg, cfg)
      |> assign(:topic, "host:" <> cfg.host_id)
      # tier => {generation_token, timer_ref} for the currently live loops.
      |> assign(:ticks, %{})
      |> connect!(uri: cfg.server_url)

    {:ok, socket}
  end

  @impl Slipstream
  def handle_connect(socket) do
    topic = socket.assigns.topic
    cfg = socket.assigns.cfg
    payload = %{"token" => cfg.token, "agent_version" => ProxmoxAgent.version()}
    Logger.info("reporter: connected, joining #{topic}")
    {:ok, join(socket, topic, payload)}
  end

  @impl Slipstream
  def handle_join(topic, _reply, socket) do
    Logger.info("reporter: joined #{topic}")
    # Called on the first join AND on every rejoin/reconnect: replace any
    # existing loops rather than adding another set alongside them.
    {:ok, start_ticks(socket)}
  end

  @impl Slipstream
  def handle_info({:collect, tier, token}, socket) do
    case socket.assigns.ticks do
      %{^tier => {^token, _timer}} ->
        cfg = socket.assigns.cfg
        push_metric(socket, "metric:#{tier}", collect(tier, cfg))
        {:noreply, schedule_tick(socket, tier, interval_ms(tier, cfg))}

      _stale ->
        # Tick from a superseded generation (previous join, or loops stopped on
        # disconnect/topic close) that fired before it could be cancelled.
        {:noreply, socket}
    end
  end

  def handle_info(msg, socket) do
    # Don't let a stray message crash the long-lived connection process.
    Logger.debug("reporter: ignoring unexpected message #{inspect(msg)}")
    {:noreply, socket}
  end

  @impl Slipstream
  def handle_disconnect(reason, socket) do
    Logger.warning("reporter: disconnected — #{inspect(reason)}; reconnecting")
    socket = stop_ticks(socket)

    # reconnect/1 may fail; the callback must then return {:stop, ...}
    # rather than the raw error tuple.
    case reconnect(socket) do
      {:ok, socket} -> {:ok, socket}
      {:error, why} -> {:stop, why, socket}
    end
  end

  @impl Slipstream
  def handle_topic_close(topic, reason, socket) do
    Logger.warning("reporter: topic #{topic} closed: #{inspect(reason)}; rejoining")

    socket
    |> stop_ticks()
    |> rejoin(topic)
  end

  # Starts a fresh generation of collection loops, firing every tier right away.
  defp start_ticks(socket) do
    socket = stop_ticks(socket)
    Enum.reduce(Keyword.keys(@tiers), socket, fn tier, acc -> schedule_tick(acc, tier, 0) end)
  end

  # Cancels all pending tick timers and forgets their tokens, so any tick message
  # that was already delivered is ignored by handle_info/2.
  defp stop_ticks(socket) do
    Enum.each(socket.assigns.ticks, fn {_tier, {_token, timer}} ->
      Process.cancel_timer(timer)
    end)

    assign(socket, :ticks, %{})
  end

  # Schedules the next tick for `tier` under a new token, replacing the old one.
  defp schedule_tick(socket, tier, delay_ms) do
    token = make_ref()
    timer = Process.send_after(self(), {:collect, tier, token}, delay_ms)
    assign(socket, :ticks, Map.put(socket.assigns.ticks, tier, {token, timer}))
  end

  # Interval for `tier`, read from its configured *_seconds field, in milliseconds.
  defp interval_ms(tier, cfg), do: Map.fetch!(cfg, Keyword.fetch!(@tiers, tier)) * 1000

  # Builds the `data` map for each tier (see the payload contract in @moduledoc).
  defp collect(:fast, cfg) do
    %{
      host: Host.collect(),
      zfs_pools: Zfs.collect_pools(),
      storage: Storage.collect(node: cfg.host_id),
      vms_runtime: Vms.collect_runtime(node: cfg.host_id)
    }
  end

  defp collect(:medium, cfg) do
    %{
      zfs_datasets: Zfs.collect_datasets(),
      vms_detail: Vms.collect_detail(node: cfg.host_id)
    }
  end

  defp collect(:slow, _cfg), do: %{system_info: SystemInfo.collect()}

  # Pushes one metric event; a failed push is logged and otherwise ignored
  # (best-effort), so the loop keeps running.
  defp push_metric(socket, event, data) do
    payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: data}

    case push(socket, socket.assigns.topic, event, payload) do
      {:ok, _ref} ->
        :ok

      {:error, reason} ->
        Logger.warning("reporter: push #{event} failed: #{inspect(reason)}")
        :ok
    end
  end
end