defmodule ServerWeb.HostChannel do
  @moduledoc """
  Channel for `"host:<name>"` topics.

  A client joins with a `"token"` (and optionally an `"agent_version"`) in the
  join params. The token is checked with `Server.Hosts.authenticate/2`; on
  success the host is marked online and its id and name are stored in the
  socket assigns.

  Once joined, the client may push `"metric:fast"`, `"metric:medium"` or
  `"metric:slow"` events whose payload carries an ISO 8601 `"collected_at"`
  string and a `"data"` map. Each sample is stored through
  `Server.Metrics.record_sample/4`. Any other event is answered with an
  `"unknown_event"` error reply.

  When the channel process terminates, the host is marked offline.
  """

  use ServerWeb, :channel

  require Logger

  alias Server.Hosts

  # Authenticates the host named in the topic and, on success, marks it online.
  #
  # Replies:
  #   * `{:ok, socket}` with `:host_id` and `:host_name` assigned
  #   * `{:error, %{reason: "unknown_host"}}` / `{:error, %{reason: "invalid_token"}}`
  #     when `Hosts.authenticate/2` rejects the credentials
  #   * `{:error, %{reason: "bad_topic"}}` for any topic that is not `"host:<name>"`
  #     with a non-empty name
  @impl true
  def join("host:" <> name, params, socket) when name != "" do
    # A missing token is passed on as "" so the decision stays in Hosts.authenticate/2.
    token = Map.get(params, "token", "")
    agent_version = Map.get(params, "agent_version")

    case Hosts.authenticate(name, token) do
      {:ok, host} ->
        # Assertive match: failing to mark the host online aborts the join.
        {:ok, _} = Hosts.mark_online(host, agent_version)
        Logger.info("agent joined host:#{name}")

        socket =
          socket
          |> assign(:host_id, host.id)
          |> assign(:host_name, name)

        {:ok, socket}

      {:error, :unknown_host} ->
        {:error, %{reason: "unknown_host"}}

      {:error, :invalid_token} ->
        {:error, %{reason: "invalid_token"}}
    end
  end

  def join(_topic, _params, _socket), do: {:error, %{reason: "bad_topic"}}

  # Handles incoming events.
  #
  # `"metric:<kind>"` (kind in fast/medium/slow) validates the payload and stores
  # the sample; the reply is `:ok` or `{:error, %{reason: reason}}`.
  # Every other event gets an `"unknown_event"` error reply instead of crashing
  # the channel with a FunctionClauseError.
  @impl true
  def handle_in("metric:" <> kind, payload, socket) when kind in ~w(fast medium slow) do
    with {:ok, collected_at} <- parse_collected_at(payload),
         {:ok, data} <- parse_data(payload),
         {:ok, _} <-
           Server.Metrics.record_sample(socket.assigns.host_id, kind, collected_at, data) do
      Logger.debug("stored #{kind} sample host=#{socket.assigns.host_name}")
      {:reply, :ok, socket}
    else
      # Validation failures produced by the parse_* helpers below.
      {:error, reason} when is_binary(reason) ->
        Logger.warning(
          "metric:#{kind} rejected host=#{socket.assigns.host_name} reason=#{reason}"
        )

        {:reply, {:error, %{reason: reason}}, socket}

      {:error, %Ecto.Changeset{} = cs} ->
        Logger.warning("metric:#{kind} changeset failed: #{inspect(cs.errors)}")
        {:reply, {:error, %{reason: "invalid_payload"}}, socket}

      # Any other result would otherwise raise WithClauseError and take the
      # channel down; log it and report a generic failure to the client.
      other ->
        Logger.error(
          "metric:#{kind} failed host=#{socket.assigns.host_name} result=#{inspect(other)}"
        )

        {:reply, {:error, %{reason: "internal_error"}}, socket}
    end
  end

  def handle_in(event, _payload, socket) do
    # The event name is client-supplied, so it is inspected rather than interpolated raw.
    Logger.warning("unhandled event #{inspect(event)} host=#{socket.assigns.host_name}")
    {:reply, {:error, %{reason: "unknown_event"}}, socket}
  end

  # Marks the joined host offline when the channel process stops.
  # Does nothing when the socket has no `:host_id` or the host row no longer exists.
  @impl true
  def terminate(_reason, socket) do
    case socket.assigns[:host_id] do
      nil ->
        :ok

      id ->
        with host when not is_nil(host) <- Server.Repo.get(Server.Schema.Host, id) do
          Hosts.mark_offline(host)
        end

        :ok
    end
  end

  # Extracts "collected_at" from the payload and parses it as ISO 8601.
  # Returns `{:ok, DateTime.t()}` or `{:error, reason}` with a string reason.
  defp parse_collected_at(%{"collected_at" => ts}) when is_binary(ts) do
    case DateTime.from_iso8601(ts) do
      {:ok, dt, _offset} -> {:ok, dt}
      _ -> {:error, "invalid_collected_at"}
    end
  end

  defp parse_collected_at(_), do: {:error, "missing_collected_at"}

  # Extracts the "data" map from the payload.
  # Returns `{:ok, map}` or `{:error, "missing_data"}` when absent or not a map.
  defp parse_data(%{"data" => data}) when is_map(data), do: {:ok, data}
  defp parse_data(_), do: {:error, "missing_data"}
end