From 61595e02936bec89caabd094d9aed947d8eeca1c Mon Sep 17 00:00:00 2001 From: Carsten Date: Tue, 21 Apr 2026 22:04:31 +0200 Subject: [PATCH] feat(server): host channel with token auth and metric events --- .../lib/server_web/channels/host_channel.ex | 64 +++++++++++++ .../server_web/channels/host_channel_test.exs | 94 +++++++++++++++++++ server/test/support/channel_case.ex | 24 +++++ 3 files changed, 182 insertions(+) create mode 100644 server/lib/server_web/channels/host_channel.ex create mode 100644 server/test/server_web/channels/host_channel_test.exs create mode 100644 server/test/support/channel_case.ex diff --git a/server/lib/server_web/channels/host_channel.ex b/server/lib/server_web/channels/host_channel.ex new file mode 100644 index 0000000..1103736 --- /dev/null +++ b/server/lib/server_web/channels/host_channel.ex @@ -0,0 +1,64 @@ +defmodule ServerWeb.HostChannel do + use ServerWeb, :channel + require Logger + + alias Server.Hosts + + @impl true + def join("host:" <> name, params, socket) when name != "" do + token = Map.get(params, "token", "") + agent_version = Map.get(params, "agent_version") + + case Hosts.authenticate(name, token) do + {:ok, host} -> + {:ok, _} = Hosts.mark_online(host, agent_version) + Logger.info("agent joined host:#{name}") + + socket = + socket + |> assign(:host_id, host.id) + |> assign(:host_name, name) + + {:ok, socket} + + {:error, :unknown_host} -> + {:error, %{reason: "unknown_host"}} + + {:error, :invalid_token} -> + {:error, %{reason: "invalid_token"}} + end + end + + def join(_topic, _params, _socket), do: {:error, %{reason: "bad_topic"}} + + @impl true + def handle_in("metric:fast", payload, socket) do + Logger.info("metric:fast host=#{socket.assigns.host_name} data=#{inspect(payload["data"])}") + {:reply, :ok, socket} + end + + def handle_in("metric:medium", payload, socket) do + Logger.info("metric:medium host=#{socket.assigns.host_name} payload=#{inspect(payload)}") + {:reply, :ok, socket} + end + + def handle_in("metric:slow", payload, socket) do + Logger.info("metric:slow host=#{socket.assigns.host_name} payload=#{inspect(payload)}") + {:reply, :ok, socket} + end + + @impl true + def terminate(_reason, socket) do + case socket.assigns[:host_id] do + nil -> + :ok + + id -> + with host when not is_nil(host) <- Server.Repo.get(Server.Schema.Host, id) do + Hosts.mark_offline(host) + end + + :ok + end + end +end diff --git a/server/test/server_web/channels/host_channel_test.exs b/server/test/server_web/channels/host_channel_test.exs new file mode 100644 index 0000000..5450773 --- /dev/null +++ b/server/test/server_web/channels/host_channel_test.exs @@ -0,0 +1,94 @@ +defmodule ServerWeb.HostChannelTest do + use ServerWeb.ChannelCase, async: false + + alias Server.Hosts + alias ServerWeb.AgentSocket + + setup do + {:ok, {host, token}} = Hosts.create_host("pve-01") + %{host: host, token: token} + end + + describe "join" do + test "succeeds with valid token and marks host online", %{host: host, token: token} do + {:ok, socket} = connect(AgentSocket, %{}) + + assert {:ok, _reply, socket} = + subscribe_and_join(socket, "host:pve-01", %{ + "token" => token, + "agent_version" => "0.1.0" + }) + + assert socket.assigns.host_id == host.id + + reloaded = Server.Repo.reload!(host) + assert reloaded.status == "online" + assert reloaded.agent_version == "0.1.0" + assert reloaded.last_seen_at != nil + end + + test "rejects invalid token" do + {:ok, socket} = connect(AgentSocket, %{}) + + assert {:error, %{reason: "invalid_token"}} = + subscribe_and_join(socket, "host:pve-01", %{ + "token" => "garbage", + "agent_version" => "0.1.0" + }) + end + + test "rejects unknown host name" do + {:ok, socket} = connect(AgentSocket, %{}) + + assert {:error, %{reason: "unknown_host"}} = + subscribe_and_join(socket, "host:nope", %{ + "token" => "x", + "agent_version" => "0.1.0" + }) + end + end + + describe "metric:fast event" do + setup %{token: token} do + {:ok, socket} = connect(AgentSocket, %{}) + + {:ok, _reply, joined} = + subscribe_and_join(socket, "host:pve-01", %{ + "token" => token, + "agent_version" => "0.1.0" + }) + + %{socket: joined} + end + + test "accepts metric payload and replies :ok", %{socket: socket} do + ref = + push(socket, "metric:fast", %{ + "collected_at" => "2026-04-21T12:00:00Z", + "data" => %{"cpu_percent" => 12.3, "load1" => 0.2} + }) + + assert_reply ref, :ok + end + end + + describe "terminate" do + test "marks host offline when channel process exits", %{host: host, token: token} do + {:ok, socket} = connect(AgentSocket, %{}) + + {:ok, _, joined} = + subscribe_and_join(socket, "host:pve-01", %{ + "token" => token, + "agent_version" => "0.1.0" + }) + + Process.unlink(joined.channel_pid) + ref = Process.monitor(joined.channel_pid) + close(joined) + assert_receive {:DOWN, ^ref, :process, _, _}, 1_000 + + reloaded = Server.Repo.reload!(host) + assert reloaded.status == "offline" + end + end +end diff --git a/server/test/support/channel_case.ex b/server/test/support/channel_case.ex new file mode 100644 index 0000000..64deece --- /dev/null +++ b/server/test/support/channel_case.ex @@ -0,0 +1,24 @@ +defmodule ServerWeb.ChannelCase do + @moduledoc """ + Test helpers for Phoenix Channels. + + Imports conveniences for testing channels and the sandbox-based Repo so tests + run concurrently and are isolated. + """ + + use ExUnit.CaseTemplate + + using do + quote do + import Phoenix.ChannelTest + import ServerWeb.ChannelCase + + @endpoint ServerWeb.Endpoint + end + end + + setup tags do + Server.DataCase.setup_sandbox(tags) + :ok + end +end