Easy Ergonomic Telemetry in Elixir with Sibyl
Feb 02, 2023
10 min. read
Elixir
Project
Refining Ecto Query Composition
Nov 08, 2022
13 min. read
Elixir
Absinthe
Ecto
Compositional Elixir—Ecto Query Patterns
Jun 10, 2021
18 min. read
Elixir
Ecto
Stuff I use
May 28, 2021
13 min. read
General
A minimal Nix development environment on WSL
Apr 04, 2021
13 min. read
WSL
Nix/NixOS
Elixir pet peeves—Ecto virtual fields
Dec 15, 2020
5 min. read
Elixir
Ecto
My usage of Nix, and Lorri + Direnv
Nov 26, 2020
5 min. read
Nix/NixOS
Build you a `:telemetry` for such learn!
Aug 26, 2020
19 min. read
Elixir
Erlang
Project
Drinking the NixOS kool aid
Jun 30, 2020
9 min. read
Nix/NixOS
Testing with tracing on the BEAM
May 20, 2020
8 min. read
Elixir
Compositional Elixir—Contexts, Error Handling, and, Middleware
Mar 06, 2020
13 min. read
Elixir
GraphQL
Phoenix
Absinthe
Taming my Vim configuration
Jan 14, 2020
7 min. read
(Neo)Vim
Integrating GitHub Issues as a Blogging Platform
Nov 06, 2019
6 min. read
Elixir
GraphQL
About Me
Nov 05, 2019
1 min. read
General

Broadcasting model updates hooking into Ecto.Repo

Recently in a project I'm currently working on, we've had the need to orchestrate our code such that whenever we insert/update/delete entities, we want to broadcast these events to (potentially numerous) listeners.

One common though outdated use-case for this kind of thing would be capturing metrics when performing database operations; though with the advent of :telemetry integration in Ecto this is no longer an issue.

The simple solution

Our particular use-case involved notifying interested external applications of changes in data they subscribed to. We decided to jump right in and build up a simple solution iteratively.

In all of our standard Elixir contexts, we simply migrated from:

defmodule MyApp.Accounts do
  def create(attrs) do
    %MyApp.Account{}
    |> MyApp.Account.changeset(attrs)
    |> MyApp.Repo.insert()
  end
end

to the following:

defmodule MyApp.Accounts do
  def create(attrs) do
    %MyApp.Account{}
    |> MyApp.Account.changeset(attrs)
    |> MyApp.Repo.insert()
    |> case do
      {:ok, account} ->
        :ok = broadcast(:create, :account, account)
        {:ok, account}
      error ->
        error
    end
  end
end

This obviously would have worked but it was pretty verbose and ended up creating a lot of boilerplate. If we ever added new contexts or wanted to modify how exactly we handled these cases we couldn't easily do it; we'd have to go in and be really careful to make sure everything looked okay—what we would have preferred was a more transparent solution and thus we ended up spiking out an approach.

Spiking a more transparent approach

Essentially, all we want to do is hook into database operations. I mentioned a little earlier that some previous use cases for wanting to orchestrate Ecto was logging, and that this use-case was no longer necessary because Ecto had be instrumented with :telemetry. Well what if we could hook into :telemetry? 😉

I've not deep-dived into how exactly :telemetry is implemented but glancing at the docs it looks quite similar at a high level to the :gen_event behaviour where we can define events that happen (in this case, Ecto defines [:my_app, :repo, :query]) that we can register event handlers to.

Jumping right in, if we create write some code as follows:

defmodule MyApp.RepoBroadcaster do
  @handler_id __MODULE__
  def init() do
    :ok = :telemetry.attach(@handler_id, [:my_app, :repo, :query], &MyApp.RepoBroadcaster.broadcast_event/4, %{})
  end

  def broadcast_event([:my_app, :repo, :query], _measurements, _metadata, _config) do
    binding()
    |> Map.new()
    |> process_data()
    |> broadcast()
  end

  defp process_data(...) do
    ...
  end
end

Then the function broadcast_event/4 gets run every time something touches the database! Nice!.. kind of... When we ended up spiking this out, everything worked to a point but the downside is that the data returned by the :telemetry.attach is all low level Postgrex(?) stuff; it's instrumented at low enough of a level that we spent a few hours writing code to parse the raw SQL queries/responses (and handling cases where we wouldn't get returned data in certain cases...) into a standard shape to broadcast in a consistent way.

When we looked back at what we had, we had developed a transparent way of hooking into Ecto as we set out to, but we ended up writing a lot of nasty, ugly and very possibly brittle code. We set out to look for a better way, again.

A return to the simple approach

The trade off between the transparent hook into Ecto vs essentially wrapping all of our context functions was one of code duplication / transparency.

We don't want to worry about the broadcasting of events at the context level but at the same time wrapping the return values of our contexts was pretty much exactly the functionality we wanted.

Looking around, there was a promising library which let us decorate functions with custom functionality. We could simply define some decoration functions and in all of our contexts do:

defmodule MyApp.Accounts do
  use BroadcastDecorator

  @decorate broadcast()
  def create(attrs) do
    %MyApp.Account{}
    |> MyApp.Account.changeset(attrs)
    |> MyApp.Repo.insert()
  end
end

This is almost the approach we went for, until I realised that we didn't even need this, the solution was going to be much simpler since decoration of functions is pretty much already provided by Elixir in the form of defoverridable

Essentially, when we mark a function as overridable with: defoverridable we can redefine that function however we want, but retaining access to the original implementation; the reason I didn't want to re-implement MyApp.Repo's insert/3, update/3 etc functions was because to replace them we'd need to essentially reimplement what that code was originally doing (or create a new module MyApp.RepoWithBroadcast which would wrap those functions; with defoverridable we can simply do:

defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres

  defoverridable delete: 2,
                 delete!: 2,
                 insert: 2,
                 insert!: 2,
                 update: 2,
                 update!: 2,
                 update_all: 3

  def delete(changeset, opts) do
    super(changeset, opts)
    |> broadcast(:delete)
  end

  def delete!(changeset, opts) do
    super(changeset, opts)
    |> broadcast(:delete)
  end

  def insert(query, opts) do
    super(query, opts)
    |> broadcast(:insert)
  end

  def insert!(query, opts) do
    super(query, opts)
    |> broadcast(:insert)
  end

  def update(query, opts) do
    super(query, opts)
    |> broadcast(:update)
  end

  def update!(query, opts) do
    super(query, opts)
    |> broadcast(:update)
  end

  def update_all(query, updates, opts) do
    super(query, updates, opts)
    |> broadcast(:update)
  end

  def broadcast(result) do
    ...do some broadcasting...
    result
  end
end

And it pretty much just works 🙂 What do you guys think about this approach? It works for our use-case pretty well and its as vanilla as you can get with no other dependencies. I'm surprised I haven't run into defoverridable more, we're kind of abusing it for this since it seems to be more tailored to letting people define behaviour callbacks but hey, whatever works 😆