feat(server): channel persists fast/medium/slow samples to metrics table
This commit is contained in:
parent
687fc17082
commit
751e035579
2 changed files with 76 additions and 15 deletions
|
|
@ -32,20 +32,35 @@ defmodule ServerWeb.HostChannel do
|
||||||
def join(_topic, _params, _socket), do: {:error, %{reason: "bad_topic"}}
|
def join(_topic, _params, _socket), do: {:error, %{reason: "bad_topic"}}
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_in("metric:fast", payload, socket) do
|
def handle_in("metric:" <> kind, payload, socket) when kind in ~w(fast medium slow) do
|
||||||
Logger.info("metric:fast host=#{socket.assigns.host_name} data=#{inspect(payload["data"])}")
|
with {:ok, collected_at} <- parse_collected_at(payload),
|
||||||
{:reply, :ok, socket}
|
{:ok, data} <- parse_data(payload),
|
||||||
|
{:ok, _} <-
|
||||||
|
Server.Metrics.record_sample(socket.assigns.host_id, kind, collected_at, data) do
|
||||||
|
Logger.debug("stored #{kind} sample host=#{socket.assigns.host_name}")
|
||||||
|
{:reply, :ok, socket}
|
||||||
|
else
|
||||||
|
{:error, reason} when is_binary(reason) ->
|
||||||
|
Logger.warning("metric:#{kind} rejected host=#{socket.assigns.host_name} reason=#{reason}")
|
||||||
|
{:reply, {:error, %{reason: reason}}, socket}
|
||||||
|
|
||||||
|
{:error, %Ecto.Changeset{} = cs} ->
|
||||||
|
Logger.warning("metric:#{kind} changeset failed: #{inspect(cs.errors)}")
|
||||||
|
{:reply, {:error, %{reason: "invalid_payload"}}, socket}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_in("metric:medium", payload, socket) do
|
defp parse_collected_at(%{"collected_at" => ts}) when is_binary(ts) do
|
||||||
Logger.info("metric:medium host=#{socket.assigns.host_name} payload=#{inspect(payload)}")
|
case DateTime.from_iso8601(ts) do
|
||||||
{:reply, :ok, socket}
|
{:ok, dt, _} -> {:ok, dt}
|
||||||
|
_ -> {:error, "invalid_collected_at"}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_in("metric:slow", payload, socket) do
|
defp parse_collected_at(_), do: {:error, "missing_collected_at"}
|
||||||
Logger.info("metric:slow host=#{socket.assigns.host_name} payload=#{inspect(payload)}")
|
|
||||||
{:reply, :ok, socket}
|
defp parse_data(%{"data" => data}) when is_map(data), do: {:ok, data}
|
||||||
end
|
defp parse_data(_), do: {:error, "missing_data"}
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def terminate(_reason, socket) do
|
def terminate(_reason, socket) do
|
||||||
|
|
|
||||||
|
|
@ -48,8 +48,8 @@ defmodule ServerWeb.HostChannelTest do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "metric:fast event" do
|
describe "metric events persist to DB" do
|
||||||
setup %{token: token} do
|
setup %{token: token, host: host} do
|
||||||
{:ok, socket} = connect(AgentSocket, %{})
|
{:ok, socket} = connect(AgentSocket, %{})
|
||||||
|
|
||||||
{:ok, _reply, joined} =
|
{:ok, _reply, joined} =
|
||||||
|
|
@ -58,17 +58,63 @@ defmodule ServerWeb.HostChannelTest do
|
||||||
"agent_version" => "0.1.0"
|
"agent_version" => "0.1.0"
|
||||||
})
|
})
|
||||||
|
|
||||||
%{socket: joined}
|
%{socket: joined, host: host}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "accepts metric payload and replies :ok", %{socket: socket} do
|
test "metric:fast is stored with interval=fast", %{socket: socket, host: host} do
|
||||||
|
ts = "2026-04-21T12:00:00.123456Z"
|
||||||
|
|
||||||
ref =
|
ref =
|
||||||
push(socket, "metric:fast", %{
|
push(socket, "metric:fast", %{
|
||||||
"collected_at" => "2026-04-21T12:00:00Z",
|
"collected_at" => ts,
|
||||||
"data" => %{"cpu_percent" => 12.3, "load1" => 0.2}
|
"data" => %{"cpu_percent" => 12.3, "load1" => 0.2}
|
||||||
})
|
})
|
||||||
|
|
||||||
assert_reply ref, :ok
|
assert_reply ref, :ok
|
||||||
|
|
||||||
|
sample = Server.Metrics.latest_sample(host.id, "fast")
|
||||||
|
assert sample != nil
|
||||||
|
assert sample.payload == %{"cpu_percent" => 12.3, "load1" => 0.2}
|
||||||
|
{:ok, expected, _} = DateTime.from_iso8601(ts)
|
||||||
|
assert DateTime.compare(sample.collected_at, expected) == :eq
|
||||||
|
end
|
||||||
|
|
||||||
|
test "metric:medium is stored with interval=medium", %{socket: socket, host: host} do
|
||||||
|
ref =
|
||||||
|
push(socket, "metric:medium", %{
|
||||||
|
"collected_at" => "2026-04-21T12:05:00Z",
|
||||||
|
"data" => %{"vms_detail" => []}
|
||||||
|
})
|
||||||
|
|
||||||
|
assert_reply ref, :ok
|
||||||
|
|
||||||
|
sample = Server.Metrics.latest_sample(host.id, "medium")
|
||||||
|
assert sample != nil
|
||||||
|
assert sample.payload == %{"vms_detail" => []}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "metric:slow is stored with interval=slow", %{socket: socket, host: host} do
|
||||||
|
ref =
|
||||||
|
push(socket, "metric:slow", %{
|
||||||
|
"collected_at" => "2026-04-21T12:30:00Z",
|
||||||
|
"data" => %{"system_info" => %{"pveversion" => "8.3.0"}}
|
||||||
|
})
|
||||||
|
|
||||||
|
assert_reply ref, :ok
|
||||||
|
|
||||||
|
sample = Server.Metrics.latest_sample(host.id, "slow")
|
||||||
|
assert sample != nil
|
||||||
|
assert sample.payload == %{"system_info" => %{"pveversion" => "8.3.0"}}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "replies :error when collected_at is missing", %{socket: socket} do
|
||||||
|
ref = push(socket, "metric:fast", %{"data" => %{}})
|
||||||
|
assert_reply ref, :error, %{reason: "missing_collected_at"}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "replies :error when data is missing", %{socket: socket} do
|
||||||
|
ref = push(socket, "metric:fast", %{"collected_at" => "2026-04-21T12:00:00Z"})
|
||||||
|
assert_reply ref, :error, %{reason: "missing_data"}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue