feat(agent): slipstream reporter — join, push, auto-reconnect
This commit is contained in:
parent
ce828084c8
commit
3ae38f95a9
2 changed files with 75 additions and 2 deletions
75
agent/lib/proxmox_agent/reporter.ex
Normal file
75
agent/lib/proxmox_agent/reporter.ex
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
defmodule ProxmoxAgent.Reporter do
|
||||
@moduledoc """
|
||||
Maintains a persistent Phoenix Channel connection to the server, joins
|
||||
`host:<host_id>`, and pushes metric samples on the configured fast interval.
|
||||
"""
|
||||
|
||||
use Slipstream, restart: :permanent
|
||||
require Logger
|
||||
|
||||
alias ProxmoxAgent.Collectors.Host
|
||||
|
||||
def start_link(%ProxmoxAgent.Config{} = cfg) do
|
||||
Slipstream.start_link(__MODULE__, cfg, name: __MODULE__)
|
||||
end
|
||||
|
||||
@impl Slipstream
|
||||
def init(cfg) do
|
||||
socket =
|
||||
new_socket()
|
||||
|> assign(:cfg, cfg)
|
||||
|> assign(:topic, "host:" <> cfg.host_id)
|
||||
|> connect!(uri: cfg.server_url)
|
||||
|
||||
{:ok, socket}
|
||||
end
|
||||
|
||||
@impl Slipstream
|
||||
def handle_connect(socket) do
|
||||
topic = socket.assigns.topic
|
||||
cfg = socket.assigns.cfg
|
||||
|
||||
payload = %{"token" => cfg.token, "agent_version" => ProxmoxAgent.version()}
|
||||
Logger.info("reporter: connected, joining #{topic}")
|
||||
{:ok, join(socket, topic, payload)}
|
||||
end
|
||||
|
||||
@impl Slipstream
|
||||
def handle_join(topic, _reply, socket) do
|
||||
Logger.info("reporter: joined #{topic}")
|
||||
send(self(), :collect_fast)
|
||||
{:ok, socket}
|
||||
end
|
||||
|
||||
@impl Slipstream
|
||||
def handle_info(:collect_fast, socket) do
|
||||
sample = Host.collect()
|
||||
payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: sample}
|
||||
:ok = push_metric(socket, "metric:fast", payload)
|
||||
Process.send_after(self(), :collect_fast, socket.assigns.cfg.fast_seconds * 1000)
|
||||
{:ok, socket}
|
||||
end
|
||||
|
||||
@impl Slipstream
|
||||
def handle_disconnect(reason, socket) do
|
||||
Logger.warning("reporter: disconnected — #{inspect(reason)}; reconnecting")
|
||||
reconnect(socket)
|
||||
end
|
||||
|
||||
@impl Slipstream
|
||||
def handle_topic_close(topic, reason, socket) do
|
||||
Logger.warning("reporter: topic #{topic} closed: #{inspect(reason)}; rejoining")
|
||||
rejoin(socket, topic)
|
||||
end
|
||||
|
||||
defp push_metric(socket, event, payload) do
|
||||
case push(socket, socket.assigns.topic, event, payload) do
|
||||
{:ok, _ref} ->
|
||||
:ok
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.warning("reporter: push failed: #{inspect(reason)}")
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
Loading…
Add table
Add a link
Reference in a new issue