proxMon/agent/lib/proxmox_agent/reporter.ex

114 lines
3.1 KiB
Elixir

defmodule ProxmoxAgent.Reporter do
  @moduledoc """
  Maintains a persistent Phoenix Channel connection to the server, joins
  `host:<host_id>`, and pushes metric samples on fast/medium/slow intervals.

  Every push is wrapped in an envelope:

      %{collected_at: <ISO 8601 UTC timestamp>, data: <tier payload>}

  Tier payload contract (the `data` field):

      metric:fast   => %{host, zfs_pools, storage, vms_runtime}
      metric:medium => %{zfs_datasets, vms_detail}
      metric:slow   => %{system_info}

  Each tier runs as a self-rescheduling timer loop. At most one pending timer
  per tier is kept (see `schedule/3`), so rejoining the topic after a
  disconnect triggers an immediate sample without stacking extra loops.
  """

  use Slipstream, restart: :permanent

  require Logger

  alias ProxmoxAgent.Collectors.{Host, Storage, SystemInfo, Vms, Zfs}

  @doc """
  Starts the reporter, registered locally under the module name.

  `cfg` supplies `server_url`, `host_id`, `token` and the
  `fast_seconds` / `medium_seconds` / `slow_seconds` intervals.
  """
  @spec start_link(%ProxmoxAgent.Config{}) :: GenServer.on_start()
  def start_link(%ProxmoxAgent.Config{} = cfg) do
    Slipstream.start_link(__MODULE__, cfg, name: __MODULE__)
  end

  # Stores the config, the channel topic and an empty timer registry in the
  # socket assigns, then opens the connection. `connect!/2` raises if the
  # connection options are invalid.
  @impl Slipstream
  def init(cfg) do
    socket =
      new_socket()
      |> assign(:cfg, cfg)
      |> assign(:topic, "host:" <> cfg.host_id)
      |> assign(:timers, %{})
      |> connect!(uri: cfg.server_url)

    {:ok, socket}
  end

  # Joins the host topic as soon as the socket is connected, sending the token
  # and the agent version as join params.
  @impl Slipstream
  def handle_connect(socket) do
    topic = socket.assigns.topic
    cfg = socket.assigns.cfg
    payload = %{"token" => cfg.token, "agent_version" => ProxmoxAgent.version()}
    Logger.info("reporter: connected, joining #{topic}")
    {:ok, join(socket, topic, payload)}
  end

  # Triggers an immediate sample of every tier on each (re)join. This runs again
  # after every rejoin; duplicate loops are prevented by `schedule/3`, which
  # cancels the tier's previous timer before arming a new one.
  @impl Slipstream
  def handle_join(topic, _reply, socket) do
    Logger.info("reporter: joined #{topic}")
    send(self(), :collect_fast)
    send(self(), :collect_medium)
    send(self(), :collect_slow)
    {:ok, socket}
  end

  # One clause per tier: collect, push, re-arm the tier's timer.
  @impl Slipstream
  def handle_info(:collect_fast, socket) do
    cfg = socket.assigns.cfg

    data = %{
      host: Host.collect(),
      zfs_pools: Zfs.collect_pools(),
      storage: Storage.collect(node: cfg.host_id),
      vms_runtime: Vms.collect_runtime(node: cfg.host_id)
    }

    {:noreply, report(socket, "metric:fast", data, :collect_fast, cfg.fast_seconds)}
  end

  def handle_info(:collect_medium, socket) do
    cfg = socket.assigns.cfg

    data = %{
      zfs_datasets: Zfs.collect_datasets(),
      vms_detail: Vms.collect_detail(node: cfg.host_id)
    }

    {:noreply, report(socket, "metric:medium", data, :collect_medium, cfg.medium_seconds)}
  end

  def handle_info(:collect_slow, socket) do
    cfg = socket.assigns.cfg
    data = %{system_info: SystemInfo.collect()}

    {:noreply, report(socket, "metric:slow", data, :collect_slow, cfg.slow_seconds)}
  end

  # Defining handle_info/2 replaces the default clause injected by
  # `use Slipstream`, so without this a stray message would crash the reporter
  # with a FunctionClauseError.
  def handle_info(message, socket) do
    Logger.debug("reporter: ignoring unexpected message #{inspect(message)}")
    {:noreply, socket}
  end

  # Attempts to reconnect. An `{:error, reason}` from `reconnect/1` is not a
  # valid callback return, so it is turned into a clean stop and the supervisor
  # can restart the process.
  @impl Slipstream
  def handle_disconnect(reason, socket) do
    Logger.warning("reporter: disconnected — #{inspect(reason)}; reconnecting")

    case reconnect(socket) do
      {:ok, socket} ->
        {:ok, socket}

      {:error, error} ->
        Logger.error("reporter: cannot reconnect: #{inspect(error)}")
        {:stop, error, socket}
    end
  end

  # Attempts to rejoin the closed topic; same error handling as for reconnects.
  @impl Slipstream
  def handle_topic_close(topic, reason, socket) do
    Logger.warning("reporter: topic #{topic} closed: #{inspect(reason)}; rejoining")

    case rejoin(socket, topic) do
      {:ok, socket} ->
        {:ok, socket}

      {:error, error} ->
        Logger.error("reporter: cannot rejoin #{topic}: #{inspect(error)}")
        {:stop, error, socket}
    end
  end

  # Pushes one sample and re-arms the tier's timer; returns the updated socket.
  defp report(socket, event, data, message, interval_seconds) do
    push_metric(socket, event, data)
    schedule(socket, message, interval_seconds)
  end

  # Arms the timer for `message`, cancelling the tier's previous timer first.
  # Keeping at most one pending timer per tier means extra trigger messages
  # (e.g. the ones sent on every rejoin) cannot multiply the collection loops.
  defp schedule(socket, message, interval_seconds) do
    timers = socket.assigns.timers

    case Map.get(timers, message) do
      nil -> :ok
      # Returns false if the timer already fired; nothing to do in that case.
      ref -> Process.cancel_timer(ref)
    end

    ref = Process.send_after(self(), message, :timer.seconds(interval_seconds))
    assign(socket, :timers, Map.put(timers, message, ref))
  end

  # Best-effort push: a failure is logged and never crashes the reporter; the
  # next tick will try again with a fresh sample.
  defp push_metric(socket, event, data) do
    payload = %{collected_at: DateTime.utc_now() |> DateTime.to_iso8601(), data: data}

    case push(socket, socket.assigns.topic, event, payload) do
      {:ok, _ref} ->
        :ok

      {:error, reason} ->
        Logger.warning("reporter: push #{event} failed: #{inspect(reason)}")
        :ok
    end
  end
end