feat(server): host channel with token auth and metric events
This commit is contained in:
parent
d9a52db4ea
commit
61595e0293
3 changed files with 182 additions and 0 deletions
64
server/lib/server_web/channels/host_channel.ex
Normal file
64
server/lib/server_web/channels/host_channel.ex
Normal file
|
|
@ -0,0 +1,64 @@
|
||||||
|
defmodule ServerWeb.HostChannel do
|
||||||
|
use ServerWeb, :channel
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Server.Hosts
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def join("host:" <> name, params, socket) when name != "" do
|
||||||
|
token = Map.get(params, "token", "")
|
||||||
|
agent_version = Map.get(params, "agent_version")
|
||||||
|
|
||||||
|
case Hosts.authenticate(name, token) do
|
||||||
|
{:ok, host} ->
|
||||||
|
{:ok, _} = Hosts.mark_online(host, agent_version)
|
||||||
|
Logger.info("agent joined host:#{name}")
|
||||||
|
|
||||||
|
socket =
|
||||||
|
socket
|
||||||
|
|> assign(:host_id, host.id)
|
||||||
|
|> assign(:host_name, name)
|
||||||
|
|
||||||
|
{:ok, socket}
|
||||||
|
|
||||||
|
{:error, :unknown_host} ->
|
||||||
|
{:error, %{reason: "unknown_host"}}
|
||||||
|
|
||||||
|
{:error, :invalid_token} ->
|
||||||
|
{:error, %{reason: "invalid_token"}}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def join(_topic, _params, _socket), do: {:error, %{reason: "bad_topic"}}
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_in("metric:fast", payload, socket) do
|
||||||
|
Logger.info("metric:fast host=#{socket.assigns.host_name} data=#{inspect(payload["data"])}")
|
||||||
|
{:reply, :ok, socket}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_in("metric:medium", payload, socket) do
|
||||||
|
Logger.info("metric:medium host=#{socket.assigns.host_name} payload=#{inspect(payload)}")
|
||||||
|
{:reply, :ok, socket}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_in("metric:slow", payload, socket) do
|
||||||
|
Logger.info("metric:slow host=#{socket.assigns.host_name} payload=#{inspect(payload)}")
|
||||||
|
{:reply, :ok, socket}
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def terminate(_reason, socket) do
|
||||||
|
case socket.assigns[:host_id] do
|
||||||
|
nil ->
|
||||||
|
:ok
|
||||||
|
|
||||||
|
id ->
|
||||||
|
with host when not is_nil(host) <- Server.Repo.get(Server.Schema.Host, id) do
|
||||||
|
Hosts.mark_offline(host)
|
||||||
|
end
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
94
server/test/server_web/channels/host_channel_test.exs
Normal file
94
server/test/server_web/channels/host_channel_test.exs
Normal file
|
|
@ -0,0 +1,94 @@
|
||||||
|
defmodule ServerWeb.HostChannelTest do
|
||||||
|
use ServerWeb.ChannelCase, async: false
|
||||||
|
|
||||||
|
alias Server.Hosts
|
||||||
|
alias ServerWeb.AgentSocket
|
||||||
|
|
||||||
|
setup do
|
||||||
|
{:ok, {host, token}} = Hosts.create_host("pve-01")
|
||||||
|
%{host: host, token: token}
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "join" do
|
||||||
|
test "succeeds with valid token and marks host online", %{host: host, token: token} do
|
||||||
|
{:ok, socket} = connect(AgentSocket, %{})
|
||||||
|
|
||||||
|
assert {:ok, _reply, socket} =
|
||||||
|
subscribe_and_join(socket, "host:pve-01", %{
|
||||||
|
"token" => token,
|
||||||
|
"agent_version" => "0.1.0"
|
||||||
|
})
|
||||||
|
|
||||||
|
assert socket.assigns.host_id == host.id
|
||||||
|
|
||||||
|
reloaded = Server.Repo.reload!(host)
|
||||||
|
assert reloaded.status == "online"
|
||||||
|
assert reloaded.agent_version == "0.1.0"
|
||||||
|
assert reloaded.last_seen_at != nil
|
||||||
|
end
|
||||||
|
|
||||||
|
test "rejects invalid token" do
|
||||||
|
{:ok, socket} = connect(AgentSocket, %{})
|
||||||
|
|
||||||
|
assert {:error, %{reason: "invalid_token"}} =
|
||||||
|
subscribe_and_join(socket, "host:pve-01", %{
|
||||||
|
"token" => "garbage",
|
||||||
|
"agent_version" => "0.1.0"
|
||||||
|
})
|
||||||
|
end
|
||||||
|
|
||||||
|
test "rejects unknown host name" do
|
||||||
|
{:ok, socket} = connect(AgentSocket, %{})
|
||||||
|
|
||||||
|
assert {:error, %{reason: "unknown_host"}} =
|
||||||
|
subscribe_and_join(socket, "host:nope", %{
|
||||||
|
"token" => "x",
|
||||||
|
"agent_version" => "0.1.0"
|
||||||
|
})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "metric:fast event" do
|
||||||
|
setup %{token: token} do
|
||||||
|
{:ok, socket} = connect(AgentSocket, %{})
|
||||||
|
|
||||||
|
{:ok, _reply, joined} =
|
||||||
|
subscribe_and_join(socket, "host:pve-01", %{
|
||||||
|
"token" => token,
|
||||||
|
"agent_version" => "0.1.0"
|
||||||
|
})
|
||||||
|
|
||||||
|
%{socket: joined}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "accepts metric payload and replies :ok", %{socket: socket} do
|
||||||
|
ref =
|
||||||
|
push(socket, "metric:fast", %{
|
||||||
|
"collected_at" => "2026-04-21T12:00:00Z",
|
||||||
|
"data" => %{"cpu_percent" => 12.3, "load1" => 0.2}
|
||||||
|
})
|
||||||
|
|
||||||
|
assert_reply ref, :ok
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "terminate" do
|
||||||
|
test "marks host offline when channel process exits", %{host: host, token: token} do
|
||||||
|
{:ok, socket} = connect(AgentSocket, %{})
|
||||||
|
|
||||||
|
{:ok, _, joined} =
|
||||||
|
subscribe_and_join(socket, "host:pve-01", %{
|
||||||
|
"token" => token,
|
||||||
|
"agent_version" => "0.1.0"
|
||||||
|
})
|
||||||
|
|
||||||
|
Process.unlink(joined.channel_pid)
|
||||||
|
ref = Process.monitor(joined.channel_pid)
|
||||||
|
close(joined)
|
||||||
|
assert_receive {:DOWN, ^ref, :process, _, _}, 1_000
|
||||||
|
|
||||||
|
reloaded = Server.Repo.reload!(host)
|
||||||
|
assert reloaded.status == "offline"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
24
server/test/support/channel_case.ex
Normal file
24
server/test/support/channel_case.ex
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
defmodule ServerWeb.ChannelCase do
|
||||||
|
@moduledoc """
|
||||||
|
Test helpers for Phoenix Channels.
|
||||||
|
|
||||||
|
Imports conveniences for testing channels and the sandbox-based Repo so tests
|
||||||
|
run concurrently and are isolated.
|
||||||
|
"""
|
||||||
|
|
||||||
|
use ExUnit.CaseTemplate
|
||||||
|
|
||||||
|
using do
|
||||||
|
quote do
|
||||||
|
import Phoenix.ChannelTest
|
||||||
|
import ServerWeb.ChannelCase
|
||||||
|
|
||||||
|
@endpoint ServerWeb.Endpoint
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
setup tags do
|
||||||
|
Server.DataCase.setup_sandbox(tags)
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
||||||
Loading…
Add table
Add a link
Reference in a new issue