From 3ae38f95a9a2ecb32e5ca472c70a69450bddbf4b Mon Sep 17 00:00:00 2001 From: Carsten Date: Tue, 21 Apr 2026 22:08:57 +0200 Subject: [PATCH] =?UTF-8?q?feat(agent):=20slipstream=20reporter=20?= =?UTF-8?q?=E2=80=94=20join,=20push,=20auto-reconnect?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/lib/proxmox_agent/collectors/host.ex | 2 - agent/lib/proxmox_agent/reporter.ex | 75 ++++++++++++++++++++++ 2 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 agent/lib/proxmox_agent/reporter.ex diff --git a/agent/lib/proxmox_agent/collectors/host.ex b/agent/lib/proxmox_agent/collectors/host.ex index 23f8e95..2f7971d 100644 --- a/agent/lib/proxmox_agent/collectors/host.ex +++ b/agent/lib/proxmox_agent/collectors/host.ex @@ -47,8 +47,6 @@ defmodule ProxmoxAgent.Collectors.Host do {fun.(), nil} rescue e -> {fallback, {tag, Exception.message(e)}} - catch - :error, reason -> {fallback, {tag, reason}} end end diff --git a/agent/lib/proxmox_agent/reporter.ex b/agent/lib/proxmox_agent/reporter.ex new file mode 100644 index 0000000..12d4bef --- /dev/null +++ b/agent/lib/proxmox_agent/reporter.ex @@ -0,0 +1,75 @@ +defmodule ProxmoxAgent.Reporter do + @moduledoc """ + Maintains a persistent Phoenix Channel connection to the server, joins + `host:`, and pushes metric samples on the configured fast interval. + """ + + use Slipstream, restart: :permanent + require Logger + + alias ProxmoxAgent.Collectors.Host + + def start_link(%ProxmoxAgent.Config{} = cfg) do + Slipstream.start_link(__MODULE__, cfg, name: __MODULE__) + end + + @impl Slipstream + def init(cfg) do + socket = + new_socket() + |> assign(:cfg, cfg) + |> assign(:topic, "host:" <> cfg.host_id) + |> connect!(uri: cfg.server_url) + + {:ok, socket} + end + + @impl Slipstream + def handle_connect(socket) do + topic = socket.assigns.topic + cfg = socket.assigns.cfg + + payload = %{"token" => cfg.token, "agent_version" => ProxmoxAgent.version()} + Logger.info("reporter: connected, joining #{topic}") + {:ok, join(socket, topic, payload)} + end + + @impl Slipstream + def handle_join(topic, _reply, socket) do + Logger.info("reporter: joined #{topic}") + send(self(), :collect_fast) + {:ok, socket} + end + + @impl Slipstream + def handle_info(:collect_fast, socket) do + sample = Host.collect() + payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: sample} + :ok = push_metric(socket, "metric:fast", payload) + Process.send_after(self(), :collect_fast, socket.assigns.cfg.fast_seconds * 1000) + {:ok, socket} + end + + @impl Slipstream + def handle_disconnect(reason, socket) do + Logger.warning("reporter: disconnected — #{inspect(reason)}; reconnecting") + reconnect(socket) + end + + @impl Slipstream + def handle_topic_close(topic, reason, socket) do + Logger.warning("reporter: topic #{topic} closed: #{inspect(reason)}; rejoining") + rejoin(socket, topic) + end + + defp push_metric(socket, event, payload) do + case push(socket, socket.assigns.topic, event, payload) do + {:ok, _ref} -> + :ok + + {:error, reason} -> + Logger.warning("reporter: push failed: #{inspect(reason)}") + :ok + end + end +end