feat(server): metrics schema + context with record/latest/prune
This commit is contained in:
parent
116f1ada14
commit
687fc17082
4 changed files with 153 additions and 0 deletions
48
server/lib/server/metrics.ex
Normal file
48
server/lib/server/metrics.ex
Normal file
|
|
@ -0,0 +1,48 @@
|
||||||
|
defmodule Server.Metrics do
|
||||||
|
@moduledoc "Metric sample storage and retrieval."
|
||||||
|
|
||||||
|
import Ecto.Query
|
||||||
|
|
||||||
|
alias Server.Repo
|
||||||
|
alias Server.Schema.Metric
|
||||||
|
|
||||||
|
@spec record_sample(integer(), String.t(), DateTime.t(), map()) ::
|
||||||
|
{:ok, Metric.t()} | {:error, Ecto.Changeset.t()}
|
||||||
|
def record_sample(host_id, interval_type, collected_at, payload) do
|
||||||
|
changeset =
|
||||||
|
Metric.changeset(%Metric{}, %{
|
||||||
|
host_id: host_id,
|
||||||
|
interval_type: interval_type,
|
||||||
|
collected_at: collected_at,
|
||||||
|
payload: payload
|
||||||
|
})
|
||||||
|
|
||||||
|
with %Ecto.Changeset{valid?: true} = cs <- changeset,
|
||||||
|
true <- host_exists?(host_id) || {:host_missing, cs} do
|
||||||
|
Repo.insert(cs)
|
||||||
|
else
|
||||||
|
%Ecto.Changeset{} = cs -> {:error, cs}
|
||||||
|
{:host_missing, cs} -> {:error, Ecto.Changeset.add_error(cs, :host, "does not exist")}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp host_exists?(host_id) do
|
||||||
|
Repo.exists?(from(h in Server.Schema.Host, where: h.id == ^host_id))
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec latest_sample(integer(), String.t()) :: Metric.t() | nil
|
||||||
|
def latest_sample(host_id, interval_type) do
|
||||||
|
from(m in Metric,
|
||||||
|
where: m.host_id == ^host_id and m.interval_type == ^interval_type,
|
||||||
|
order_by: [desc: m.collected_at],
|
||||||
|
limit: 1
|
||||||
|
)
|
||||||
|
|> Repo.one()
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec delete_older_than(DateTime.t()) :: {non_neg_integer(), nil}
|
||||||
|
def delete_older_than(%DateTime{} = cutoff) do
|
||||||
|
from(m in Metric, where: m.collected_at < ^cutoff)
|
||||||
|
|> Repo.delete_all()
|
||||||
|
end
|
||||||
|
end
|
||||||
22
server/lib/server/schema/metric.ex
Normal file
22
server/lib/server/schema/metric.ex
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
defmodule Server.Schema.Metric do
|
||||||
|
use Ecto.Schema
|
||||||
|
import Ecto.Changeset
|
||||||
|
|
||||||
|
@intervals ~w(fast medium slow)
|
||||||
|
|
||||||
|
schema "metrics" do
|
||||||
|
belongs_to :host, Server.Schema.Host
|
||||||
|
field :collected_at, :utc_datetime_usec
|
||||||
|
field :interval_type, :string
|
||||||
|
field :payload, :map
|
||||||
|
|
||||||
|
timestamps(type: :utc_datetime_usec, updated_at: false)
|
||||||
|
end
|
||||||
|
|
||||||
|
def changeset(metric, attrs) do
|
||||||
|
metric
|
||||||
|
|> cast(attrs, [:host_id, :collected_at, :interval_type, :payload])
|
||||||
|
|> validate_required([:host_id, :collected_at, :interval_type, :payload])
|
||||||
|
|> validate_inclusion(:interval_type, @intervals)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
defmodule Server.Repo.Migrations.CreateMetrics do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def change do
|
||||||
|
create table(:metrics) do
|
||||||
|
add :host_id, references(:hosts, on_delete: :delete_all), null: false
|
||||||
|
add :collected_at, :utc_datetime_usec, null: false
|
||||||
|
add :interval_type, :string, null: false
|
||||||
|
add :payload, :string, null: false
|
||||||
|
|
||||||
|
timestamps(type: :utc_datetime_usec, updated_at: false)
|
||||||
|
end
|
||||||
|
|
||||||
|
create index(:metrics, [:host_id, :collected_at])
|
||||||
|
create index(:metrics, [:collected_at])
|
||||||
|
end
|
||||||
|
end
|
||||||
66
server/test/server/metrics_test.exs
Normal file
66
server/test/server/metrics_test.exs
Normal file
|
|
@ -0,0 +1,66 @@
|
||||||
|
defmodule Server.MetricsTest do
|
||||||
|
use Server.DataCase, async: true
|
||||||
|
|
||||||
|
alias Server.Metrics
|
||||||
|
alias Server.Hosts
|
||||||
|
|
||||||
|
setup do
|
||||||
|
{:ok, {host, _token}} = Hosts.create_host("pve-01")
|
||||||
|
%{host: host}
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "record_sample/4" do
|
||||||
|
test "inserts a metric row with the given payload", %{host: host} do
|
||||||
|
ts = DateTime.utc_now()
|
||||||
|
payload = %{"host" => %{"load1" => 0.5}}
|
||||||
|
|
||||||
|
assert {:ok, metric} = Metrics.record_sample(host.id, "fast", ts, payload)
|
||||||
|
assert metric.host_id == host.id
|
||||||
|
assert metric.interval_type == "fast"
|
||||||
|
assert metric.payload == payload
|
||||||
|
assert metric.collected_at == ts
|
||||||
|
end
|
||||||
|
|
||||||
|
test "rejects unknown interval_type", %{host: host} do
|
||||||
|
ts = DateTime.utc_now()
|
||||||
|
assert {:error, cs} = Metrics.record_sample(host.id, "nope", ts, %{})
|
||||||
|
assert %{interval_type: ["is invalid"]} = errors_on(cs)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "rejects unknown host_id" do
|
||||||
|
ts = DateTime.utc_now()
|
||||||
|
assert {:error, cs} = Metrics.record_sample(999_999, "fast", ts, %{})
|
||||||
|
assert %{host: ["does not exist"]} = errors_on(cs)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "latest_sample/2" do
|
||||||
|
test "returns most recent sample for a host and interval", %{host: host} do
|
||||||
|
{:ok, _} = Metrics.record_sample(host.id, "fast", dt(-60), %{"v" => 1})
|
||||||
|
{:ok, _} = Metrics.record_sample(host.id, "fast", dt(-30), %{"v" => 2})
|
||||||
|
{:ok, _} = Metrics.record_sample(host.id, "fast", dt(-10), %{"v" => 3})
|
||||||
|
|
||||||
|
assert %{payload: %{"v" => 3}} = Metrics.latest_sample(host.id, "fast")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "returns nil when no samples exist", %{host: host} do
|
||||||
|
assert Metrics.latest_sample(host.id, "fast") == nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "delete_older_than/1" do
|
||||||
|
test "deletes samples with collected_at before the cutoff", %{host: host} do
|
||||||
|
{:ok, _} = Metrics.record_sample(host.id, "fast", dt(-3600 * 50), %{"v" => "old"})
|
||||||
|
{:ok, keep} = Metrics.record_sample(host.id, "fast", dt(-60), %{"v" => "fresh"})
|
||||||
|
|
||||||
|
cutoff = DateTime.add(DateTime.utc_now(), -48 * 3600, :second)
|
||||||
|
assert {1, nil} = Metrics.delete_older_than(cutoff)
|
||||||
|
|
||||||
|
remaining = Server.Repo.all(Server.Schema.Metric)
|
||||||
|
assert length(remaining) == 1
|
||||||
|
assert hd(remaining).id == keep.id
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp dt(offset_seconds), do: DateTime.add(DateTime.utc_now(), offset_seconds, :second)
|
||||||
|
end
|
||||||
Loading…
Add table
Add a link
Reference in a new issue