Flaky Playwright Tests and Phoenix: A Distributed Systems Problem
The DBConnection.OwnershipError isn't a bug. It's the BEAM telling you you're building a distributed system wrong
Ever run into that dreaded DBConnection.OwnershipError during your Playwright tests? The one that makes you think, "Great, another flaky test." Let me tell you, this isn’t about flakiness or some elusive race condition. What you’re seeing is the BEAM VM doing exactly what it’s designed to do: enforcing strict process isolation. Remember, this isn’t a monolith you’re testing. You’re working with three fully isolated processes, each with its own boundaries. One of them is stepping over the line, trying to grab a database connection that it’s not supposed to have access to.
Here’s the kind of in-depth explanation I wish I’d had when I first ran into this issue. We’ll dig into how the Ecto SQL Sandbox operates, explore the process dictionary, walk through the ETS permission table lookups, and break down why phoenix_test_playwright leverages the user agent string as a subtle communication channel. By the time we’re done, you won’t just have your tests running smoothly, you’ll also have a solid grasp of why the solution works, right down to the VM internals.
The Problem: Three Processes, Zero Shared Context
When you write what looks like a simple Playwright feature test:
# test/my_app_web/features/user_registration_test.exs
defmodule MyAppWeb.UserRegistrationTest do
use MyAppWeb.FeatureCase, async: true
test "user can register with valid data", %{conn: conn} do
conn
|> visit("/register")
|> fill_in("Email", with: "alice@example.com")
|> fill_in("Password", with: "SecurePass123!")
|> click_button("Create Account")
|> assert_has(".alert-success", text: "Welcome!")
end
end
You're actually orchestrating three distinct processes that share nothing but the BEAM runtime:
P1: The ExUnit Test Process (The Owner)
This is the process that runs your test code. When you use MyAppWeb.FeatureCase, somewhere in the setup it calls:
# What phoenix_test_playwright does for you
owner_pid =
  Ecto.Adapters.SQL.Sandbox.start_owner!(MyApp.Repo, ownership_timeout: :infinity)
This call is deceptively simple. What's actually happening:
- ETS Table Creation: The sandbox's ownership manager creates a public ETS table (the same table this post later refers to as :"$db_connection_owners") that will store permission tuples.
- Process Dictionary Injection: The DBConnection.Ownership process stores the actual connection reference in P1's process dictionary under the key :"$db_connection". This is crucial: only P1 can see this.
- Transaction Wrapping: The connection is wrapped in a SQL BEGIN transaction, with a ROLLBACK queued for when the owner exits.
The owner process (P1) now holds the database connection. But here's the kicker: when P1 exits, that connection dies with it. This is by design. It's a sandbox, transactions are supposed to be isolated and ephemeral.
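If you've ever read the DataCase that a generated Phoenix app ships with, this is the same machinery. Here is a minimal sketch of a setup block doing it by hand (the shared: option is the fallback that async: false tests rely on):
# Minimal sketch; roughly what phoenix_test_playwright and a generated
# DataCase do for you, using this post's MyApp.Repo
setup tags do
  pid = Ecto.Adapters.SQL.Sandbox.start_owner!(MyApp.Repo, shared: not tags[:async])
  on_exit(fn -> Ecto.Adapters.SQL.Sandbox.stop_owner(pid) end)
  {:ok, sandbox_owner: pid}
end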
P2: The HTTP Request Handler (The Orphan)
When Playwright's process hits your Phoenix endpoint with an HTTP request, Phoenix spawns a new, short-lived process to handle it. This process is supervised by your Endpoint supervisor, not your test. Its ancestry looks like:
P2 (HTTP Handler)
├── Parent: MyAppWeb.Endpoint.ProcessSupervisor
├── Grandparent: MyAppWeb.Endpoint
└── No relation to P1 whatsoever
When your LiveView mounts or controller action runs, it executes in P2's context. If it tries to query the database:
# Inside your LiveView mount/3 (running in P2)
def mount(_params, _session, socket) do
user_count = MyApp.Repo.aggregate(MyApp.Accounts.User, :count)
{:ok, assign(socket, user_count: user_count)}
end
The Repo.aggregate/3 call triggers a DBConnection lookup. Here's the exact sequence inside lib/db_connection.ex:
# Simplified from DBConnection source
def checkout(conn, opts) do
owner = DBConnection.Ownership.find_owner(self())
# ...
end
find_owner/1 does a three-step search:
- Check its own process dictionary: Process.get(:"$db_connection") → nil
- Walk ancestry: Recursively check parent processes → not found
- Query the ETS permission table: Look for {self(), :allowed, owner_pid} → not found
Result: {:error, :not_found} → OwnershipError.
P3: The LiveView GenServer (The Long-Lived Ghost)
The real complexity starts after the initial HTTP request completes. When Playwright upgrades to a WebSocket connection, Phoenix spawns yet another process, a Phoenix.LiveView.Socket GenServer, supervised by the Phoenix.Socket.Pool supervisor:
P3 (LiveView Socket)
├── Parent: Phoenix.Socket.Pool.Supervisor
├── Grandparent: MyAppWeb.Endpoint
└── Still no relation to P1
This process is long-lived. It persists for the duration of the WebSocket connection, handling handle_event/3, handle_info/2, and handle_params/3 callbacks. If your LiveView does any async work:
def handle_info(:delayed_query, socket) do
# This runs AFTER your test might have passed
data = MyApp.Repo.all(MyApp.Analytics.Event)
{:noreply, assign(socket, data: data)}
end
You're now in a race condition. The test process (P1) might finish, trigger a transaction rollback, and exit, while P3 is still executing. When P3's query hits the database, the connection is gone.
How Ecto SQL Sandbox Actually Works
The Process Dictionary: Connection Storage
When start_owner!/2 is called, it eventually reaches DBConnection.Ownership.start_link/2. Here's the idea, simplified from lib/db_connection/ownership.ex:
def init({pool, owner, tag, timeout}) do
# The connection ref is stored in THIS process's dictionary
Process.put(:"$db_connection", {:owner, pool, tag})
# ...
end
The process dictionary is a key-value store local to each process. It's not shared. It's not a global variable. It's not accessible from other processes. This is fundamental BEAM isolation.
When your test process (P1) owns the connection, only P1 can find it in its dictionary. No amount of send/2 or message passing will give P2 or P3 access. They need explicit permission.
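You can watch this isolation happen in a few lines of IEx: a key set in the current process is simply absent from a freshly spawned one.
# Process dictionaries are per-process; nothing is inherited by spawned processes
Process.put(:"$db_connection", :my_connection)
parent = self()
spawn(fn -> send(parent, {:dict_value, Process.get(:"$db_connection")}) end)
receive do
  {:dict_value, value} -> value
end
#=> nil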
The ETS Table: The Permission Ledger
The sandbox creates a public ETS table (by default: :"$db_connection_owners"). Let's examine its structure:
# When you call Ecto.Adapters.SQL.Sandbox.allow/3
:ets.insert_new(@ownership_table, {allowed_pid, :allowed, owner_pid, pool})
The table schema is:
- Key: allowed_pid (the process being granted access)
- Value: {allowed_pid, :allowed, owner_pid, pool}
- Access: Public (readable/writable from any process)
When P2 or P3 attempts a database operation, DBConnection.find_owner/1 queries this table:
# From lib/db_connection/ownership.ex
def find_owner(caller_pid) do
case :ets.lookup(@ownership_table, caller_pid) do
[{^caller_pid, :allowed, owner_pid, _pool}] -> {:ok, owner_pid}
[] -> find_owner_by_ancestry(caller_pid)
end
end
This is a constant-time ETS lookup, fast, but it means the permission tuple must already be in the table before the query happens.
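The mechanics are easy to reproduce with a throwaway table; the table and pool names below are illustrative, not the sandbox's real ones.
# Illustrative only: a public, named ETS table that any process can read
:ets.new(:permission_demo, [:named_table, :public])
owner = self()
worker = spawn(fn -> Process.sleep(:infinity) end)
# Until this insert happens, a lookup for `worker` returns []
:ets.insert(:permission_demo, {worker, :allowed, owner, :some_pool})
:ets.lookup(:permission_demo, worker)
#=> [{worker_pid, :allowed, owner_pid, :some_pool}]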
The Permission Check Flow (Step-by-Step)
Let's trace a Repo.insert/1 call from within a LiveView handle_event/3:
# 1. User clicks button
# 2. P3 (LiveView) receives websocket message
# 3. handle_event/3 executes in P3 context
def handle_event("create_user", params, socket) do
# This line:
user = MyApp.Repo.insert!(changeset)
# Expands to:
Ecto.Repo.insert!(MyApp.Repo, changeset)
# Which calls:
Ecto.Adapters.SQL.insert(adapter_meta, query, opts)
# Which calls:
DBConnection.execute(conn, query, opts)
# Inside DBConnection.execute/4:
ownership = DBConnection.Ownership.find_owner(self())
# self() is P3's PID
# find_owner/1 does:
# Step 1: Check own process dictionary
Process.get(:"$db_connection") # => nil
# Step 2: Check ETS table
:ets.lookup(:'$db_connection_owners', self()) # => []
# Step 3: Check ancestry (simplified)
find_ancestor_owner(self()) # => :error
# Result: {:error, :not_found}
# Which raises: DBConnection.OwnershipError
end
The error message is telling you the truth: "cannot find ownership process for #PID<0.x.y>". It searched the process dictionary, the ETS table, and the supervision tree. Nothing.
The User Agent Pattern: A Covert Permission Channel
Why Not Custom Headers?
You might think: "I'll just pass the owner PID in a custom header!" Let's see why that fails:

The browser's WebSocket API gives you no way to attach custom request headers to the upgrade handshake. The browser sends cookies and a handful of standard headers (User-Agent among them) on its own, but your X-Test-Owner header dies at the upgrade boundary.
The User Agent Survives
The User-Agent header is different. It's part of the browser's identity, configured once on the browser context rather than per request. When you configure Playwright's browser context:
// What phoenix_test_playwright does automatically
const browser = await chromium.launch();
const context = await browser.newContext({
userAgent: "Mozilla/5.0 ... Sandbox: {metadata}"
});
This user agent is sent on every HTTP request and is included in the WebSocket handshake's User-Agent field. It's the only piece of metadata that reliably crosses the HTTP/WebSocket boundary.
How phoenix_test_playwright Encodes Metadata
Let's look at the actual implementation (simplified from the library):
# In test setup
def start_owner_and_encode_metadata(repo) do
# 1. Start owner
owner_pid = Ecto.Adapters.SQL.Sandbox.start_owner!(repo)
# 2. Generate metadata map
metadata = %{
repo: repo,
owner: owner_pid,
test_pid: self()
}
# 3. Encode with Phoenix's built-in encoder
encoded = Phoenix.Ecto.SQL.Sandbox.encode_metadata(metadata)
# Returns a base64-encoded, compressed string
# 4. Inject into browser context
set_browser_user_agent(encoded)
encoded
end
The encode_metadata/1 function produces a string like:
"Phx-Ecto-Sandbox: eJxVjE0KwjAQRfdzin4B0hZc+QAuXLiC5iGppG1I2lSUDqXv7k1c3MzL
m3kTt5T4W6QsQeDJH9JO3Kqt0BVmhXJXJK6VMK0rYVYXwjQvhHl5JO3LO/5O3B8AAAD//wMAJQBLJg=="
This is a base64-encoded, zlib-compressed Erlang term: opaque, compact, and it survives header parsing.
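The encoding itself is nothing exotic. A sketch of the general idea (the exact wire format is an implementation detail of phoenix_ecto, so don't rely on it):
# Sketch of the idea only: serialize a term, compress it, Base64 it
metadata = %{repo: MyApp.Repo, owner: self()}
encoded =
  metadata
  |> :erlang.term_to_binary(compressed: 9)
  |> Base.url_encode64()
# The reverse direction is just as mechanical
decoded =
  encoded
  |> Base.url_decode64!()
  |> :erlang.binary_to_term()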
Decoding and Permission Granting
In your endpoint, the Phoenix.Ecto.SQL.Sandbox plug decodes this (sketched here, simplified from the real plug):
# Simplified from lib/phoenix_ecto/sql_sandbox.ex
def call(conn, _opts) do
case get_req_header(conn, "user-agent") do
[user_agent | _] ->
case extract_metadata(user_agent) do
{:ok, %{owner: owner_pid, repo: repo}} ->
# CRITICAL: Grant permission to THIS process (P2)
Ecto.Adapters.SQL.Sandbox.allow(repo, owner_pid, self())
:error ->
:ok
end
[] ->
:ok
end
conn
end
The allow/3 call inserts into the ETS table:
# Simplified from ecto_sql's sandbox/ownership internals, not the verbatim source
def allow(repo, owner_pid, allowed_pid) do
pool = GenServer.whereis(repo)
# Check owner actually owns a connection
case :ets.lookup(@owner_table, owner_pid) do
[{^owner_pid, :owner, _pool, _tag}] ->
# Insert permission tuple
:ets.insert(@ownership_table, {allowed_pid, :allowed, owner_pid, pool})
:ok
[] ->
{:error, :not_found}
end
end
Now, when P2 queries, the ETS lookup succeeds:
# P2's database query:
:ets.lookup(:'$db_connection_owners', self())
# => [{#PID<0.3421.0>, :allowed, #PID<0.2261.0>, #PID<0.123.0>}]
LiveView Permission: The on_mount Hook
The HTTP handler (P2) is short-lived. After it renders the initial HTML, it terminates. But the WebSocket process (P3) is just starting. It needs the same permission granted again.
The get_connect_info/2 function extracts the user agent from the WebSocket handshake:
# lib/my_app_web/live_helpers.ex
def on_mount(:default, _params, _session, socket) do
if connected?(socket) do # Only for WebSocket-connected mount
case get_connect_info(socket, :user_agent) do
user_agent when is_binary(user_agent) ->
# Same metadata, same permission grant
Phoenix.Ecto.SQL.Sandbox.allow(user_agent, Ecto.Adapters.SQL.Sandbox)
_ -> :ok
end
end
{:cont, socket}
end
Critical detail: connected?(socket) returns false during the initial HTTP render (P2's context) and true on the WebSocket mount (P3's context). Guarding on it ensures the grant happens in the long-lived WebSocket process, the one that will actually run your queries.
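Wiring the hook into the router is one line. Assuming the MyAppWeb.LiveHelpers module above (route and module names here are this post's examples), a live_session applies it to every LiveView in the scope:
# router.ex
scope "/", MyAppWeb do
  pipe_through :browser
  live_session :default, on_mount: [{MyAppWeb.LiveHelpers, :default}] do
    live "/dashboard", DashboardLive
    live "/register", RegistrationLive
  end
end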
Mox: The Same Isolation, Same Solution
Mox (Elixir's mocking library) has the same problem. Expectations belong to the process that defines them, and a call from any other process fails unless that process has been explicitly allowed. Let's trace through:
# test file
test "sends welcome email", %{conn: conn} do
# Expectation stored in P1's dictionary
expect(MyApp.MockMailer, :send, fn _email -> {:ok, %{id: "test-123"}} end)
conn
|> visit("/register")
|> click_button("Create Account") # Triggers mailer in P3
# P3 cannot see P1's expectation!
end
# Inside the LiveView
def handle_event("create_account", params, socket) do
# This runs in P3
MyApp.MockMailer.send(email) # Mox looks in P3's dictionary → :error
# Raises: Mox.UnexpectedCallError
end
A useful mental model (Mox's real bookkeeping lives in its own server process, but the ownership semantics are the same): every expectation is recorded against the PID of the process that defined it:
# Simplified mental model, not the actual Mox source
def expect(mock, name, n \\ 1, code) do
  # Recorded against the CURRENT process (P1); lookups from other PIDs
  # fail unless allow/3 links them back to P1
  Process.put({mock, name}, %{n: n, code: code})
end
The Mox.allow/3 Solution
Just like Ecto, Mox provides allow/3 for cross-process expectations:
# test/support/live_helpers.ex
def on_mount(:default, _params, _session, socket) do
if connected?(socket) do
case get_connect_info(socket, :user_agent) do
user_agent when is_binary(user_agent) ->
# Decode metadata to get test PID
metadata = Phoenix.Ecto.SQL.Sandbox.decode_metadata(user_agent)
# Allow Mox expectations
Mox.allow(MyApp.MockMailer, metadata.test_pid, self())
Mox.allow(MyApp.MockRepo, metadata.test_pid, self())
Mox.allow(MyApp.MockHTTPClient, metadata.test_pid, self())
# Also allow Ecto sandbox
Phoenix.Ecto.SQL.Sandbox.allow(user_agent, Ecto.Adapters.SQL.Sandbox)
_ -> :ok
end
end
{:cont, socket}
end
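For completeness: the mocks referenced above have to be defined before the suite starts, typically in test_helper.exs. The behaviour modules passed to :for below are illustrative; point them at your real behaviours.
# test/test_helper.exs
Mox.defmock(MyApp.MockMailer, for: MyApp.Mailer)
Mox.defmock(MyApp.MockHTTPClient, for: MyApp.HTTPClient)
Mox.defmock(MyApp.MockPaymentGateway, for: MyApp.PaymentGateway)
ExUnit.start()
Ecto.Adapters.SQL.Sandbox.mode(MyApp.Repo, :manual)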
Mox Patterns: Stubs vs. Expectations
For async-safe tests, prefer stubs:
# test/support/feature_case.ex
defmodule MyAppWeb.FeatureCase do
use ExUnit.CaseTemplate
import Mox
setup _tags do
# Stubs never fail verification and don't assert how often they're called
stub(MyApp.MockMailer, :send, fn _email ->
{:ok, %{id: "stubbed-id"}}
end)
stub(MyApp.MockHTTPClient, :get, fn _url ->
{:ok, %{status: 200, body: "ok"}}
end)
:ok
end
end
Stubs are forgiving: they never fail verify!/1 and place no limit on call counts. In Mox's default private mode they still follow the same ownership rules as expectations, so the on_mount allowance hook shown earlier covers them too.
Use expectations only when you need to assert call count or arguments:
# test file
test "charges credit card exactly once", %{conn: conn} do
expect(MyApp.MockPaymentGateway, :charge, 1, fn _amount ->
{:ok, %{transaction_id: "tx-123"}}
end)
# ... test code ...
# Verify called exactly once
verify!(MyApp.MockPaymentGateway)
end
Never use set_mox_global/1:
# DON'T - this forces async: false
setup :set_mox_global # Creates race conditions between tests
set_mox_global makes expectations global, which means tests can interfere with each other. It's async: false by another name.
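A safer default for expectation-heavy suites is to stay in private mode (which is already the default) and let Mox verify every expectation when each test exits:
# test/support/feature_case.ex
setup :set_mox_private # explicit, though private mode is the default
setup :verify_on_exit!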
The Race Condition: When Tests Finish Too Early
The Problem: Async Work After Test Completion
Here's a real-world scenario that will bite you:
defmodule MyAppWeb.DashboardLive do
use MyAppWeb, :live_view
def mount(_params, _session, socket) do
if connected?(socket) do
# Simulate delayed analytics loading
Process.send_after(self(), :load_charts, 500)
Process.send_after(self(), :load_metrics, 1000)
end
{:ok, assign(socket, page_state: :loading, charts: nil, metrics: nil)}
end
def handle_info(:load_charts, socket) do
# This runs 500ms after mount
charts = MyApp.Analytics.generate_charts() # DB queries here
{:noreply, assign(socket, charts: charts)}
end
def handle_info(:load_metrics, socket) do
# This runs 1000ms after mount
metrics = MyApp.Analytics.get_metrics() # More DB queries
{:noreply, assign(socket, metrics: metrics, page_state: :ready)}
end
end
Your test:
test "shows dashboard", %{conn: conn} do
conn
|> visit("/dashboard")
|> assert_has(".chart-container") # Charts might not be loaded yet!
# Test passes as soon as HTML renders...
# ...but LiveView is still processing :load_charts message
end
Timeline:
t=0ms: Test starts, visits /dashboard
t=10ms: HTTP request (P2) renders initial HTML, shows loading state
t=15ms: WebSocket connects (P3), mount/3 sends delayed messages
t=20ms: Test asserts on HTML, passes
t=25ms: Test process (P1) exits, transaction rolls back
t=500ms: P3 handles :load_charts, tries to query DB
→ 💥 DBConnection.ConnectionError: owner exited
The Solution: start_async, assign_async, and Semantic State Tracking
Phoenix LiveView ships start_async/3 and assign_async/3 for exactly this problem. Here's the start_async flavor, which pairs with handle_async/3 callbacks and lets us keep an explicit page_state assign:
def handle_info(:load_data, socket) do
  socket =
    socket
    |> assign(page_state: :loading, charts: :loading, metrics: :loading)
    |> start_async(:charts, fn ->
      # Runs in a supervised Task; the result comes back through handle_async/3
      MyApp.Analytics.generate_charts()
    end)
    |> start_async(:metrics, fn ->
      MyApp.Analytics.get_metrics()
    end)
  {:noreply, socket}
end
def handle_async(:charts, {:ok, charts}, socket) do
  {:noreply, socket |> assign(charts: charts) |> maybe_set_complete()}
end
def handle_async(:metrics, {:ok, metrics}, socket) do
  {:noreply, socket |> assign(metrics: metrics) |> maybe_set_complete()}
end
defp maybe_set_complete(%{assigns: assigns} = socket) do
  if assigns.charts != :loading and assigns.metrics != :loading do
    assign(socket, page_state: :complete)
  else
    socket
  end
end
How these async helpers work internally (simplified):
# Conceptual sketch, not the actual phoenix_live_view source
def start_async(socket, key, func) do
  lv_pid = self()
  Task.async(fn ->
    # Runs in a Task process, so report back to the LiveView by message
    result =
      try do
        {:ok, func.()}
      catch
        kind, reason -> {:exit, {kind, reason}}
      end
    # The LiveView turns this message into a handle_async/3 callback
    send(lv_pid, {:async_result, key, result})
  end)
  socket
end
The key insight: you're tracking the state of the async operation, not just its result, whether you manage that state yourself with a page_state assign (start_async) or let assign_async track it for you in an %AsyncResult{}. When you assert on data-page-state="complete", you're waiting for a deterministic state, not racing against a timer.
Testing with Semantic State
# Template
<div data-page-state={@page_state} data-testid="dashboard">
<%= if @page_state == :loading do %>
<.spinner />
<% else %>
<div class="charts"><%= @charts %></div>
<div class="metrics"><%= @metrics %></div>
<% end %>
</div>
# Test
test "loads all analytics completely", %{conn: conn} do
conn
|> visit("/dashboard")
# Wait for explicit state, not arbitrary timeout
|> assert_has("[data-page-state='complete']", timeout: 5_000)
# Now safe to assert on content
|> assert_has(".charts")
|> assert_has(".metrics")
end
Performance win: The test only waits as long as needed, not a fixed Process.sleep/1 duration.
Advanced Patterns and Edge Cases
Testing GenServer Calls from LiveView
What if your LiveView calls a GenServer that queries the database?
defmodule MyApp.PriceCalculator do
use GenServer
def calculate(product_id) do
GenServer.call(__MODULE__, {:calculate, product_id})
end
def handle_call({:calculate, product_id}, _from, state) do
# This runs in the GenServer process (P4)
product = MyApp.Repo.get!(MyApp.Catalog.Product, product_id)
price = compute_price(product)
{:reply, price, state}
end
end
defmodule MyAppWeb.ProductLive do
def handle_event("calculate", %{"id" => id}, socket) do
price = MyApp.PriceCalculator.calculate(id) # Calls P4
{:noreply, assign(socket, price: price)}
end
end
Now you have four processes in play: P1 (test), P2 (HTTP handler), P3 (LiveView), and P4 (the GenServer). You need to allow P4 too:
# In your LiveView on_mount
def on_mount(:default, _params, _session, socket) do
if connected?(socket) do
case get_connect_info(socket, :user_agent) do
user_agent when is_binary(user_agent) ->
metadata = Phoenix.Ecto.SQL.Sandbox.decode_metadata(user_agent)
# Allow the GenServer too
Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, metadata.owner, MyApp.PriceCalculator)
# Also need to allow Mox if GenServer uses mocks
Mox.allow(MyApp.MockAPI, metadata.test_pid, MyApp.PriceCalculator)
# Grant to current process (P3)
Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, metadata.owner, self())
_ -> :ok
end
end
{:cont, socket}
end
Better approach: Pass the caller's PID explicitly:
def handle_event("calculate", %{"id" => id}, socket) do
# Tell the GenServer to use our permissions
price = MyApp.PriceCalculator.calculate(id, caller_pid: self())
{:noreply, assign(socket, price: price)}
end
# GenServer
def calculate(product_id, opts) do
  GenServer.call(__MODULE__, {:calculate, product_id, Keyword.fetch!(opts, :caller_pid)})
end
def handle_call({:calculate, id, caller_pid}, _from, state) do
  # Piggyback on the caller's sandbox access for this specific call
  Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, caller_pid, self())
  # ... query database ...
end
Testing Oban Jobs Triggered by LiveView
def handle_event("bulk_import", %{"file" => file}, socket) do
# Enqueues job that runs in separate process
%{id: job_id} = Oban.insert!(MyApp.Workers.ImportJob.new(%{file: file}))
# Track job in socket for testing
{:noreply, assign(socket, import_job_id: job_id)}
end
The Oban worker runs in yet another process (P5), outside the sandbox. You have three options:
- Disable Oban in tests (simplest):
# config/test.exs
config :my_app, Oban, testing: :inline # Jobs run immediately, inline in the process that enqueues them
- Allow the worker (complex):
defmodule MyApp.Workers.ImportJob do
use Oban.Worker
@impl true
def perform(%Oban.Job{args: args}) do
# The encoded sandbox metadata has to be smuggled into the job args at enqueue time
metadata = args["sandbox_metadata"]
Phoenix.Ecto.SQL.Sandbox.allow(metadata, Ecto.Adapters.SQL.Sandbox)
# ... perform work ...
end
end
- Test the effect, not the job (recommended):
test "bulk imports users", %{conn: conn} do
conn
|> visit("/import")
|> upload_file("input[type=file]", "users.csv")
|> click_button("Import")
# Wait for LiveView to signal completion
|> assert_has("[data-import-state='complete']")
# Assert on final state, not job internals
assert MyApp.Repo.aggregate(MyApp.Accounts.User, :count) == 100
end
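If you'd rather keep jobs out of the request path entirely, Oban's :manual testing mode inserts jobs without running them, and Oban.Testing gives you assertions on what was enqueued:
# config/test.exs
config :my_app, Oban, testing: :manual
# in the test module
use Oban.Testing, repo: MyApp.Repo
test "enqueues the import job", %{conn: conn} do
  conn
  |> visit("/import")
  |> click_button("Import")
  assert_enqueued(worker: MyApp.Workers.ImportJob)
end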
Memory Implications
Each sandbox connection holds a PostgreSQL backend process open. With async: true, you can have N connections simultaneously:
# config/test.exs
config :my_app, MyApp.Repo,
pool_size: 20, # Up to 20 parallel tests
ownership_timeout: :infinity
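A reasonable sizing rule: ExUnit runs at most max_cases async test modules at once, which defaults to System.schedulers_online() * 2, so the pool only needs to keep pace with that:
# config/test.exs, sized to ExUnit's default parallelism
config :my_app, MyApp.Repo,
  pool_size: System.schedulers_online() * 2,
  ownership_timeout: :infinity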
Monitor with :observer.start():
- P1 (test): ~2MB each
- PostgreSQL backend: ~5MB each
- Total for 20 tests: ~140MB
On modern hardware, this is trivial. The parallelism gain far outweighs the memory cost.
Debugging: When It Still Doesn't Work
Tool 1: Trace the Permission Flow
# In iex -S mix test
:dbg.tracer()
:dbg.p(:all, [:call])
# Trace all Sandbox.allow calls
:dbg.tp(Ecto.Adapters.SQL.Sandbox, :allow, 3, [])
# Run test, watch output
You'll see exactly which processes are calling allow/3 and when.
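When you're done, turn tracing off so it doesn't slow down or flood the rest of the run:
# Stop the tracer
:dbg.stop()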
Tool 2: Inspect the ETS Table
# In test setup or IEx
def inspect_sandbox() do
table = :"$db_connection_owners"
# All permissions
:ets.tab2list(table)
|> Enum.each(fn {allowed, :allowed, owner, _pool} ->
IO.puts "#{inspect(allowed)} ← allowed by #{inspect(owner)}"
end)
# Owner connections
owner_table = :"$db_connection_owner_table"
:ets.tab2list(owner_table)
|> Enum.each(fn {owner, :owner, pool, _tag} ->
IO.puts "Owner: #{inspect(owner)} → Pool: #{inspect(pool)}"
end)
end
Tool 3: Trace Process Exits
# In your test: watch a PID of interest (the sandbox owner, or the LiveView's PID)
def trace_exit(pid, label) do
  spawn(fn ->
    ref = Process.monitor(pid)
    receive do
      {:DOWN, ^ref, :process, ^pid, reason} ->
        IO.inspect(reason, label: "#{label} exited")
    end
  end)
end
Common Error #1: "owner exited" After Test Passes
Symptom: Test passes, then you see a database error in the logs.
Cause: LiveView is still processing async work.
Fix: Use assign_async and assert on the semantic state, not just UI presence.
Common Error #2: "cannot find ownership process" on Initial Page Load
Symptom: OwnershipError on first HTTP request.
Cause: The Phoenix.Ecto.SQL.Sandbox plug is missing from the endpoint, or placed after the router (or another plug) that hits the database first.
Fix: Ensure the plug runs before the router and before anything that queries the database:
plug Plug.Parsers, ...
plug Phoenix.Ecto.SQL.Sandbox # Here!
plug MyAppWeb.Router
Common Error #3: Mox.UnexpectedCallError in LiveView
Symptom: Mock works in test but fails in LiveView.
Cause: Didn't call Mox.allow/3 in on_mount.
Fix: Add Mox.allow(mock, test_pid, self()) alongside sandbox allow.
Common Error #4: Intermittent Failures with assign_async
Symptom: Tests pass locally, fail in CI.
Cause: CI is slower, async work times out before assert_has.
Fix: Increase assert_has timeout or optimize async functions.
# Increase timeout for CI
|> assert_has("[data-page-state='complete']", timeout: 10_000)
The Internal Implementation: Reading the Source
Let's walk through a simplified reconstruction of the permission check, modeled on DBConnection's ownership module (the real source spreads this across DBConnection.Ownership and its manager and proxy processes):
# Simplified model of lib/db_connection/ownership.ex, not the verbatim source
defmodule DBConnection.Ownership do
@moduledoc """
DBConnection plugin for ownership.
"""
@ownership_table :"$db_connection_owners"
def find_owner(pid) when is_pid(pid) do
case Process.get(:"$db_connection") do
{:owner, pool, tag} ->
{:ok, {pool, tag}}
_ ->
case :ets.lookup(@ownership_table, pid) do
[{^pid, :allowed, owner, pool}] ->
# Recursively find the owner's connection
case find_owner(owner) do
{:ok, {pool, tag}} -> {:ok, {pool, tag}}
error -> error
end
[] ->
# Walk supervision tree
find_owner_by_ancestry(pid)
end
end
end
defp find_owner_by_ancestry(pid) do
case Process.info(pid, :dictionary) do
{:dictionary, dict} ->
case dict[:"$db_connection_parent"] do
nil -> :error
parent -> find_owner(parent)
end
_ -> :error
end
end
end
Key insights from source:
- Transitive permissions are not automatic: If A allows B, B cannot allow C. Only the original owner can grant permissions.
- Ancestry check uses $db_connection_parent: This is how allow/3 with {:process, parent_pid} works.
- ETS lookup is recursive: It finds the owner, then finds the owner's connection.
Alternative Approaches: Trade-offs
Approach 1: Global Sandbox Mode
# config/test.exs
config :my_app, MyApp.Repo, pool: Ecto.Adapters.SQL.Sandbox
# test setup (instead of allowances): share the test's connection with every process
Ecto.Adapters.SQL.Sandbox.mode(MyApp.Repo, {:shared, self()})
Pros: No allow/3 needed, all processes share one connection.
Cons:
- Forces async: false (global state)
- Tests can interfere (uncommitted data visible across tests)
- PostgreSQL deadlock risk with parallel tests
Verdict: Only for legacy suites you can't refactor.
Approach 2: Transaction Isolation in Tests
# Don't use sandbox, manage transactions manually
setup do
:ok = Ecto.Adapters.SQL.begin_test_transaction(MyApp.Repo)
on_exit(fn -> Ecto.Adapters.SQL.rollback_test_transaction(MyApp.Repo) end)
end
Pros: No process isolation issues.
Cons:
- Still requires async: false (shared transaction)
- Manual cleanup is error-prone
- Doesn't work with LiveView (process dies before cleanup)
Verdict: Pre-sandbox era pattern, don't use.
Approach 3: Roll Your Own Permission Channel
# Pass PID through application environment
setup do
owner = self()
Application.put_env(:my_app, :test_owner, owner)
on_exit(fn -> Application.delete_env(:my_app, :test_owner) end)
end
# In LiveView
def on_mount(_, _, _, socket) do
if owner = Application.get_env(:my_app, :test_owner) do
Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, owner, self())
end
{:cont, socket}
end
Pros: Simple, no user-agent magic.
Cons:
- Global state (breaks async)
- Race conditions between tests
- Application env is a bottleneck (single process access)
Verdict: Anti-pattern. Global state is death to async tests.
The Correct Mental Model: Distributed Erlang
At its core, the ownership system models distributed Erlang. Each test process is like a remote node that owns resources. The allow/3 function is like granting RPC permissions.

When you think in these terms, the solution is obvious: explicit, fine-grained permission grants, exactly what you'd do in a real distributed system.
Final Architecture: The Complete Picture

This is what you're building. It's not a simple test, it's a microservices architecture compressed into a single VM.
TL;DR: Key Takeaways
- DBConnection.OwnershipError is correct behavior - The BEAM is protecting you from shared-state bugs.
- Three processes, three contexts - Test, HTTP, and WebSocket processes are isolated by design.
- User agent is the only reliable metadata channel - It survives the HTTP → WebSocket upgrade. Custom headers don't.
- allow/3 is a permission grant, not a connection transfer - The owner still controls the connection; others get temporary access.
- async: false is admitting defeat - Use the Phoenix.Ecto.SQL.Sandbox plug and on_mount hooks instead.
- Race conditions are real - Use assign_async/start_async and assert on semantic state like data-page-state="complete".
- Mox needs the same treatment - Call Mox.allow/3 alongside your sandbox allow.
- This is distributed systems 101 - The same patterns apply to Node.connect/2 and :rpc.multicall/4.
The ownership system isn't a limitation to work around—it's a correct design for concurrent, isolated tests. Once you stop fighting it and start leveraging it, your tests become reliable, parallel, and fast.
References
- phoenix_test_playwright - Library that implements this pattern
- Ecto.Adapters.SQL.Sandbox - Official documentation
- Phoenix.Ecto.SQL.Sandbox - Plug documentation
- Mox Multi-process Collaboration - Mox process isolation
- DBConnection Ownership Source - Implementation details
- Phoenix LiveView assign_async - Deterministic async handling