Broadcasting model updates by hooking into Ecto.Repo
In a project I'm currently working on, we recently needed to orchestrate our code such that whenever we insert, update, or delete entities, these events are broadcast to (potentially numerous) listeners.
One common (though now outdated) use-case for this kind of thing is capturing metrics when performing database operations; with the advent of :telemetry integration in Ecto, however, this is no longer an issue.
The simple solution
Our particular use-case involved notifying interested external applications of changes in data they subscribed to. We decided to jump right in and build up a simple solution iteratively.
In all of our standard Elixir contexts, we simply migrated from:
defmodule MyApp.Accounts do
  def create(attrs) do
    %MyApp.Account{}
    |> MyApp.Account.changeset(attrs)
    |> MyApp.Repo.insert()
  end
end
to the following:
defmodule MyApp.Accounts do
  def create(attrs) do
    %MyApp.Account{}
    |> MyApp.Account.changeset(attrs)
    |> MyApp.Repo.insert()
    |> case do
      {:ok, account} ->
        :ok = broadcast(:create, :account, account)
        {:ok, account}

      error ->
        error
    end
  end
end
This would have worked, but it was pretty verbose and created a lot of boilerplate. If we ever added new contexts, or wanted to change exactly how we handled these cases, we couldn't do it easily; we'd have to go through every context and be really careful to make sure everything still looked okay. What we would have preferred was a more transparent solution, so we spiked out an approach.
Spiking a more transparent approach
Essentially, all we want to do is hook into database operations. I mentioned a little earlier that one previous use-case for orchestrating Ecto like this was logging, and that it's no longer necessary because Ecto has been instrumented with :telemetry. Well, what if we could hook into :telemetry ourselves?
I've not deep-dived into how exactly :telemetry is implemented, but glancing at the docs it looks quite similar, at a high level, to the :gen_event behaviour: libraries define events that happen (in this case, Ecto defines [:my_app, :repo, :query]) and we can register event handlers for them.
Jumping right in, if we write some code as follows:
defmodule MyApp.RepoBroadcaster do
  @handler_id __MODULE__

  def init() do
    :ok =
      :telemetry.attach(
        @handler_id,
        [:my_app, :repo, :query],
        &MyApp.RepoBroadcaster.broadcast_event/4,
        %{}
      )
  end

  def broadcast_event([:my_app, :repo, :query], _measurements, _metadata, _config) do
    binding()
    |> Map.new()
    |> process_data()
    |> broadcast()
  end

  defp process_data(...) do
    ...
  end
end
Then the function broadcast_event/4 gets run every time something touches the database! Nice!... kind of. When we spiked this out, everything worked to a point, but the downside is that the data passed to our handler is all low-level Postgrex stuff; it's instrumented at a low enough level that we spent a few hours writing code to parse the raw SQL queries/responses (and handling the cases where we wouldn't get any returned data at all...) into a standard shape we could broadcast consistently.
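To give a feel for why so much parsing was needed, here's a rough sketch of what such a handler sees; the exact measurement and metadata keys depend on your Ecto and adapter versions, so treat this shape as an assumption for illustration:

```elixir
defmodule MyApp.RawHandler do
  # Sketch only: the [:my_app, :repo, :query] event typically carries
  # adapter-level data, not nice domain structs.
  def handle_event([:my_app, :repo, :query], measurements, metadata, _config) do
    # measurements holds timings (in native time units), e.g. :total_time
    IO.inspect(measurements[:total_time], label: "total time")

    # metadata[:query] is the raw SQL string, metadata[:params] its
    # parameters, and metadata[:result] the low-level adapter result,
    # all of which we had to parse back into something meaningful.
    IO.inspect(metadata[:query], label: "raw SQL")
    IO.inspect(metadata[:params], label: "params")
    IO.inspect(metadata[:result], label: "adapter result")
  end
end
```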
When we looked back at what we had, we'd built the transparent hook into Ecto that we set out to build, but we'd also written a lot of nasty, ugly, and very possibly brittle code. So we set out to look for a better way, again.
A return to the simple approach
The trade-off between a transparent hook into Ecto and essentially wrapping all of our context functions was one of transparency versus code duplication.
We don't want to worry about broadcasting events at the context level, but at the same time, wrapping the return values of our contexts was pretty much exactly the functionality we wanted.
Looking around, we found a promising library that lets you decorate functions with custom functionality. We could simply define some decoration functions and, in all of our contexts, do:
defmodule MyApp.Accounts do
  use BroadcastDecorator

  @decorate broadcast()
  def create(attrs) do
    %MyApp.Account{}
    |> MyApp.Account.changeset(attrs)
    |> MyApp.Repo.insert()
  end
end
This is almost the approach we went for, until I realised that we didn't even need it: the solution could be much simpler, since decoration of functions is pretty much already provided by Elixir in the form of defoverridable.
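As a minimal, self-contained illustration of the mechanism before applying it to the Repo (the module and function names here are made up for the example): a function marked overridable can be redefined later in a using module, with the original still reachable via super.

```elixir
defmodule Greeter.Base do
  defmacro __using__(_opts) do
    quote do
      def hello(name), do: "Hello, #{name}!"

      # Allow hello/1 to be redefined, keeping access to this
      # implementation via super/1.
      defoverridable hello: 1
    end
  end
end

defmodule LoudGreeter do
  use Greeter.Base

  # Redefine hello/1, delegating to the original implementation.
  def hello(name) do
    String.upcase(super(name))
  end
end

LoudGreeter.hello("world")
# => "HELLO, WORLD!"
```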
Essentially, when we mark a function as overridable with defoverridable, we can redefine that function however we want while retaining access to the original implementation. The reason I didn't want to re-implement MyApp.Repo's insert/2, update/2, etc. was that to replace them we'd essentially need to reimplement what that code was originally doing (or create a new module, MyApp.RepoWithBroadcast, which would wrap those functions). With defoverridable we can simply do:
defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres

  defoverridable delete: 2,
                 delete!: 2,
                 insert: 2,
                 insert!: 2,
                 update: 2,
                 update!: 2,
                 update_all: 3

  def delete(changeset, opts) do
    super(changeset, opts)
    |> broadcast(:delete)
  end

  def delete!(changeset, opts) do
    super(changeset, opts)
    |> broadcast(:delete)
  end

  def insert(changeset, opts) do
    super(changeset, opts)
    |> broadcast(:insert)
  end

  def insert!(changeset, opts) do
    super(changeset, opts)
    |> broadcast(:insert)
  end

  def update(changeset, opts) do
    super(changeset, opts)
    |> broadcast(:update)
  end

  def update!(changeset, opts) do
    super(changeset, opts)
    |> broadcast(:update)
  end

  def update_all(query, updates, opts) do
    super(query, updates, opts)
    |> broadcast(:update)
  end

  defp broadcast(result, action) do
    # ...do some broadcasting...
    result
  end
end
And it pretty much just works. One more note on defoverridable: we're arguably abusing it a little here, since it seems more tailored to letting modules override behaviour callbacks, but hey, whatever works.
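To round things off, here's a hedged sketch of what a broadcast/2 helper along these lines might look like using Phoenix.PubSub; the PubSub server name, topic, and message shape are all illustrative assumptions rather than part of the design above:

```elixir
defmodule MyApp.Repo.Broadcaster do
  # Hypothetical helper: publish successful results to a PubSub topic
  # and pass the Repo result through unchanged. MyApp.PubSub and the
  # "repo_events" topic are made-up names for the example.

  def broadcast({:ok, record} = result, action) do
    :ok = Phoenix.PubSub.broadcast(MyApp.PubSub, "repo_events", {action, record})
    result
  end

  # Bang variants return the record directly rather than {:ok, record};
  # %_{} matches any struct.
  def broadcast(%_{} = record, action) do
    :ok = Phoenix.PubSub.broadcast(MyApp.PubSub, "repo_events", {action, record})
    record
  end

  # Errors (and anything else) pass through without broadcasting.
  def broadcast(other, _action), do: other
end
```

A listener process would then call Phoenix.PubSub.subscribe(MyApp.PubSub, "repo_events") and receive {action, record} messages in its handle_info/2.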