Using Agent to Maintain Channel State in Phoenix

In my previous post, we used Phoenix Presence to track user presence in a given "challenge room" (think: chat room). In this post, we'll build our own Agent-backed module to track channel state that does not pertain to user presence.

In our pair programming app, the shared text-editor is accompanied by a drop-down menu of programming languages. If one user selects a particular language, say Ruby, that choice should be broadcast to all of the other users in the challenge room and their language selection should be updated as well. Furthermore, when a user arrives in a challenge room, their selected language should automatically be set to whatever that challenge room's currently selected language is. This real-time functionality is backed by the ChallengeChannel.

Our feature should behave like this:

Why Agent?

Agent is a super simple wrapper around state.

Often in Elixir there is a need to share or store state that must be accessed from different processes or by the same process at different points in time. The Agent module provides a basic server implementation that allows state to be retrieved and updated via a simple API. –– Hex Docs

Agent, which is built on top of GenServer, behaves similarly to GenServer. Both allow us to start up a process to track state and maintain a centralized data store, handle and respond to error events and manage the running of the process from a supervision tree.

Agent provides a more simple interface than GenServer however. We can get an Agent-backed module up and running by writing less code than with a GenServer

We'll spin up an Agent process and register it as a global process managed by the supervision tree of our app. This way, we can use the Agent to share information (in this case, the currently selected challenge language) across all channels.

Let's get started!

Defining the Agent Module

We'll define our Agent-backed module, ChallengeChannel.Monitor in lib/phoenix_pair/challenge_channel/monitor.ex.

defmodule PhoenixPair.ChallengeChannel.Monitor do  
  def start_link(initial_state) do
    Agent.start_link(fn -> initial_state end, name: __MODULE__)
  end
end  

The start_link function tells our Agent-backed module how to start up the Agent process. It will get invoked by our supervision tree when the app starts up.

Supervising The Agent

We want our Agent process to start running when our app boots up, and we want it to be part of the supervisor tree of our app.

We'll add our Monitor module as one of the workers to be supervised by our app:

# lib/phoenix_pair.ex  
def start(_type, _args) do  
  import Supervisor.Spec

  children = [
    supervisor(PhoenixPair.Repo, []),
    supervisor(PhoenixPair.Endpoint, []),
    worker(PhoenixPair.ChallengeChannel.Monitor, [%{}])
  ]
end  

The worker function takes in a first argument of the process we want to start and supervise. It takes in a second argument of a list of parameters to pass to the supervised module's start_link function.

We define the initial state that gets passed to Monitor.start_link/1 as an empty map. This is because we will build out the state of each channel as users interact with each channel.

Now we're ready to build out the functionality of our module.

Using Agent to maintain state

We'll store the currently selected language of a given challenge room, or channel. We'll broadcast this information, which represents the current state of a channel, when a user joins the channel.

So, in our channel's join function we'll call on our Monitor module to give us the state of the channel, if such a state exists, or initialize the state of the channel, if this user is the first visitor.

 # challenge_channel.ex

def join("challenges:" <> challenge_id, _params, socket) do  
  challenge = Repo.get(Challenge, id)
  send(self, {:after_join, challenge})
  {:ok, %{challenge: challenge}, assign(socket, :challenge, challenge)}
end

def handle_info({:after_join, challenge}, socket) do  
  Monitor.get_challenge_state(challenge.id)
  broadcast! socket, "user:joined", %{challenge_state: challenge_state(challenge.id)}
  {:noreply, socket}
end  

Here, we tell our channel to respond to the :after_join message by asking our Monitor for the state of the challenge channel and broadcasting that state to the client.

Our Monitor therefore needs to be able to respond to the get_challenge_state function with the state of the given challenge, i.e. the currently selected language for that challenge.

The #get_challenge_state function

Agent, like GenServer, provides a separation between the client and server.

Actions that we execute inside the server are atomic––they will lock up the server until the completion of the action. This means that we may want to think twice before executing expensive actions on the Agent's server, since it will delay any other actions from being performed on that server.

On the other hand, executing an action in the client, by copying over the current state of the agent process to the client and operating on it there, could create a race condition. In a race condition, process #1 could be executing on one client copy of state, and process #2 could create a new version of state before process #1 completes.

The operation of fetching state for a given challenge ID is not an expensive one, and we don't want to encounter a race condition in which one user manages to update the currently selected language before another user finishes getting/updating that language. So we'll use server-side processes to interact with our Agent's state.

To execute a process on the Agent server, we use an anonymous function to operate on state:

# lib/phoenix_pair/challenge_channel/monitor.ex

def get_challenge_state(challenge) do  
  Agent.get(__MODULE__, fn state -> get_challenge_state(state, challenge) end)
end

defp get_challenge_state(state, challenge) do  
  case state[challenge] do
    nil ->
      state
      |> Map.put(challenge, %{language: "ruby"})
      |> Map.get(challenge)
    data ->
      state[challenge]
  end
end  

We pass an anonymous function as the second argument to Agent.get. That anonymous function in turn calls a private helper function, get_challenge_state.

This function uses a case statement to pattern match the result of accessing the challenge ID key of the Agent's state. If such a key exists, we will return the value of that key. If no such key exists, we'll create the initial state for that challenge in the format:

# where challenge ID is 1
%{1 => %{language: "ruby"}}

The #update_language function

When a user selects a new language from the drop-down menu, we will send a message to the channel of "language:update", with a payload of the newly selected language.

Our channel will respond to that message by updating the section of our Monitor Agent's state for that challenge ID. Then, it will broadcast the new challenge state to all subscribing clients.

# challenge_channel.ex

def handle_in("language:update", %{"response" => response}, socket) do  
  Monitor.language_update(socket.assigns.challenge.id, response)
  broadcast! socket, "language:updated", challenge_state(current_challenge(socket).id)
  {:noreply, socket}
end  

Our Monitor.language_update/2 function will also be a server-side function.

# monitor.ex  
def language_update(challenge, language) do  
  Agent.update(__MODULE__, fn state -> language_update(state, challenge, language) end)
end

defp language_update(state, challenge, language) do  
  put_in(state, [challenge, :language], language)
end  

Here, we pass an anonymous function as the second argument to Agent.update. This anonymous function calls a private helper function, language_update, which creates a new version of the Agent's state, with the nested map under this challenge's ID updated to reflect the newly selected language.

Conclusion

Our Monitor module leverages Agent to maintain a fairly simple state––the currently selected language for each challenge channel. We could easily grow our Monitor module to store more complex information in state, and to operate on that state by way of the ChallengeChannel.

As we continue to leverage Phoenix Channels and WebSockets to build real-time features, we move away from the stateless paradigm of HTTP requests.

Instead of a web app that relies on a series of stateless request/response cycles, we send messages through bi-directional channels, and share information across requests and across clients.

Elixir offers a number of tools that are a great fit for sharing data across processes. Agent is one such tool, and it is an excellent way to share data across channels, allowing our real-time features to maintain statefulness.

subscribe and never miss a post!

Blog Logo

Sophie DeBenedetto

comments powered by Disqus
comments powered by Disqus