From 687fc170828814d2e44a3d5d464ccc14a7006e2c Mon Sep 17 00:00:00 2001 From: Carsten Date: Tue, 21 Apr 2026 22:27:20 +0200 Subject: [PATCH] feat(server): metrics schema + context with record/latest/prune --- server/lib/server/metrics.ex | 48 ++++++++++++++ server/lib/server/schema/metric.ex | 22 +++++++ .../20260421202512_create_metrics.exs | 17 +++++ server/test/server/metrics_test.exs | 66 +++++++++++++++++++ 4 files changed, 153 insertions(+) create mode 100644 server/lib/server/metrics.ex create mode 100644 server/lib/server/schema/metric.ex create mode 100644 server/priv/repo/migrations/20260421202512_create_metrics.exs create mode 100644 server/test/server/metrics_test.exs diff --git a/server/lib/server/metrics.ex b/server/lib/server/metrics.ex new file mode 100644 index 0000000..928fb1b --- /dev/null +++ b/server/lib/server/metrics.ex @@ -0,0 +1,48 @@ +defmodule Server.Metrics do + @moduledoc "Metric sample storage and retrieval." + + import Ecto.Query + + alias Server.Repo + alias Server.Schema.Metric + + @spec record_sample(integer(), String.t(), DateTime.t(), map()) :: + {:ok, Metric.t()} | {:error, Ecto.Changeset.t()} + def record_sample(host_id, interval_type, collected_at, payload) do + changeset = + Metric.changeset(%Metric{}, %{ + host_id: host_id, + interval_type: interval_type, + collected_at: collected_at, + payload: payload + }) + + with %Ecto.Changeset{valid?: true} = cs <- changeset, + true <- host_exists?(host_id) || {:host_missing, cs} do + Repo.insert(cs) + else + %Ecto.Changeset{} = cs -> {:error, cs} + {:host_missing, cs} -> {:error, Ecto.Changeset.add_error(cs, :host, "does not exist")} + end + end + + defp host_exists?(host_id) do + Repo.exists?(from(h in Server.Schema.Host, where: h.id == ^host_id)) + end + + @spec latest_sample(integer(), String.t()) :: Metric.t() | nil + def latest_sample(host_id, interval_type) do + from(m in Metric, + where: m.host_id == ^host_id and m.interval_type == ^interval_type, + order_by: [desc: m.collected_at], + limit: 1 + ) + |> Repo.one() + end + + @spec delete_older_than(DateTime.t()) :: {non_neg_integer(), nil} + def delete_older_than(%DateTime{} = cutoff) do + from(m in Metric, where: m.collected_at < ^cutoff) + |> Repo.delete_all() + end +end diff --git a/server/lib/server/schema/metric.ex b/server/lib/server/schema/metric.ex new file mode 100644 index 0000000..5466a06 --- /dev/null +++ b/server/lib/server/schema/metric.ex @@ -0,0 +1,22 @@ +defmodule Server.Schema.Metric do + use Ecto.Schema + import Ecto.Changeset + + @intervals ~w(fast medium slow) + + schema "metrics" do + belongs_to :host, Server.Schema.Host + field :collected_at, :utc_datetime_usec + field :interval_type, :string + field :payload, :map + + timestamps(type: :utc_datetime_usec, updated_at: false) + end + + def changeset(metric, attrs) do + metric + |> cast(attrs, [:host_id, :collected_at, :interval_type, :payload]) + |> validate_required([:host_id, :collected_at, :interval_type, :payload]) + |> validate_inclusion(:interval_type, @intervals) + end +end diff --git a/server/priv/repo/migrations/20260421202512_create_metrics.exs b/server/priv/repo/migrations/20260421202512_create_metrics.exs new file mode 100644 index 0000000..4081951 --- /dev/null +++ b/server/priv/repo/migrations/20260421202512_create_metrics.exs @@ -0,0 +1,17 @@ +defmodule Server.Repo.Migrations.CreateMetrics do + use Ecto.Migration + + def change do + create table(:metrics) do + add :host_id, references(:hosts, on_delete: :delete_all), null: false + add :collected_at, :utc_datetime_usec, null: false + add :interval_type, :string, null: false + add :payload, :string, null: false + + timestamps(type: :utc_datetime_usec, updated_at: false) + end + + create index(:metrics, [:host_id, :collected_at]) + create index(:metrics, [:collected_at]) + end +end diff --git a/server/test/server/metrics_test.exs b/server/test/server/metrics_test.exs new file mode 100644 index 0000000..de6de14 --- /dev/null +++ b/server/test/server/metrics_test.exs @@ -0,0 +1,66 @@ +defmodule Server.MetricsTest do + use Server.DataCase, async: true + + alias Server.Metrics + alias Server.Hosts + + setup do + {:ok, {host, _token}} = Hosts.create_host("pve-01") + %{host: host} + end + + describe "record_sample/4" do + test "inserts a metric row with the given payload", %{host: host} do + ts = DateTime.utc_now() + payload = %{"host" => %{"load1" => 0.5}} + + assert {:ok, metric} = Metrics.record_sample(host.id, "fast", ts, payload) + assert metric.host_id == host.id + assert metric.interval_type == "fast" + assert metric.payload == payload + assert metric.collected_at == ts + end + + test "rejects unknown interval_type", %{host: host} do + ts = DateTime.utc_now() + assert {:error, cs} = Metrics.record_sample(host.id, "nope", ts, %{}) + assert %{interval_type: ["is invalid"]} = errors_on(cs) + end + + test "rejects unknown host_id" do + ts = DateTime.utc_now() + assert {:error, cs} = Metrics.record_sample(999_999, "fast", ts, %{}) + assert %{host: ["does not exist"]} = errors_on(cs) + end + end + + describe "latest_sample/2" do + test "returns most recent sample for a host and interval", %{host: host} do + {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-60), %{"v" => 1}) + {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-30), %{"v" => 2}) + {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-10), %{"v" => 3}) + + assert %{payload: %{"v" => 3}} = Metrics.latest_sample(host.id, "fast") + end + + test "returns nil when no samples exist", %{host: host} do + assert Metrics.latest_sample(host.id, "fast") == nil + end + end + + describe "delete_older_than/1" do + test "deletes samples with collected_at before the cutoff", %{host: host} do + {:ok, _} = Metrics.record_sample(host.id, "fast", dt(-3600 * 50), %{"v" => "old"}) + {:ok, keep} = Metrics.record_sample(host.id, "fast", dt(-60), %{"v" => "fresh"}) + + cutoff = DateTime.add(DateTime.utc_now(), -48 * 3600, :second) + assert {1, nil} = Metrics.delete_older_than(cutoff) + + remaining = Server.Repo.all(Server.Schema.Metric) + assert length(remaining) == 1 + assert hd(remaining).id == keep.id + end + end + + defp dt(offset_seconds), do: DateTime.add(DateTime.utc_now(), offset_seconds, :second) +end