Two Approaches To Building A Chat Server

Alex Woods

Alex Woods

April 28, 2024


Let's build a chat server.

Our server won't have any persistence layer, meaning:

  • no chat history
  • no backlogged messages going to offline users when they log on
  • you can only send messages to other online users

For simplicity's sake, we'll assign users integer ids (from a sequence) starting at 1.

Our API will be very simple:

  • users connect to ws://localhost:8080
  • you send a message in this format, using JSON
{
  "recipient_id": 2,
  "message": "Go Braves!"
}

If you receive a message, it will be in this format

{
  "sender_id": 1,
  "message": "Go Braves!"
}

Now, this is JSON encoded as text. I realize websockets only support text and binary.

Router Approach

Of the two approaches that come to mind for this problem, the first one I'm going to call the "router" approach.

We'll have one goroutine for receiving messages, one for sending them, and one for routing in between recipient and sender goroutines for different users.

adsf

client.go
package main

import (
	"log"
	"sync/atomic"

	"github.com/gorilla/websocket"
)

type IncomingMessage struct {
	RecipientId int    `json:"recipient_id"`
	Message     string `json:"message"`
}

type OutgoingMessage struct {
	SenderId int    `json:"sender_id"`
	Message  string `json:"message"`
}

type Client struct {
	id               int
	router           *Router
	conn             *websocket.Conn
	outgoingMessages chan OutgoingMessage
}

func (c *Client) unregister() {
	c.conn.Close()
	c.router.removeClient <- c
}

func (c *Client) ReceiveMessages() {
	defer c.unregister()

	for {
		var incomingMessage IncomingMessage
		err := c.conn.ReadJSON(&incomingMessage)

		if err != nil {
			// This occurs when the websocket connection is closed
			log.Printf("Error reading message: %v", err)
			break
		}

		log.Printf("Received message `%s` for user %d", incomingMessage.Message, incomingMessage.RecipientId)

		c.router.messages <- Message{
			SenderId:    c.id,
			RecipientId: incomingMessage.RecipientId,
			Message:     incomingMessage.Message,
		}
	}
}

func (c *Client) SendMessages() {
	for outgoingMessage := range c.outgoingMessages {
		c.conn.WriteJSON(outgoingMessage)
	}
}

var nextAvailableIdToAssign uint64 = 0

func generateUserID() int {
	return int(atomic.AddUint64(&nextAvailableIdToAssign, 1))
}

func NewClient(router *Router, conn *websocket.Conn) *Client {
	id := generateUserID()

	return &Client{
		id:               id,
		outgoingMessages: make(chan OutgoingMessage),
		router:           router,
		conn:             conn,
	}
}
router.go
package main

import "log"

type Message struct {
	RecipientId int
	SenderId    int
	Message     string
}

type Router struct {
	messages     chan Message
	addClient    chan *Client
	removeClient chan *Client
	clients      map[int]*Client
}

func NewRouter() *Router {
	return &Router{
		clients: make(map[int]*Client),
		messages:     make(chan Message, 10),
		addClient:    make(chan *Client),
		removeClient: make(chan *Client),
	}
}

func (r *Router) Run() {
	for {
		select {
		case clientToAdd := <-r.addClient:
			r.clients[clientToAdd.id] = clientToAdd
		case clientToRemove := <-r.removeClient:
			delete(r.clients, clientToRemove.id)
		case incomingMessage := <-r.messages:
			recipient := r.clients[incomingMessage.RecipientId]

			if recipient == nil {
				log.Printf("User %d is not connected; cannot send them a message.", incomingMessage.RecipientId)
				continue
			}

			recipient.outgoingMessages <- OutgoingMessage{
				SenderId: incomingMessage.SenderId,
				Message:  incomingMessage.Message,
			}
		}
	}
}
main.go
package main

import (
	"log"
	"net/http"

	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

func handler(router *Router, w http.ResponseWriter, r *http.Request) {
	conn, _ := upgrader.Upgrade(w, r, nil)

	client := NewClient(router, conn)

	router.addClient <- client

	go client.SendMessages()
	go client.ReceiveMessages()
}

func main() {
	router := NewRouter()
	go router.Run()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		handler(router, w, r)
	})

	log.Fatal(http.ListenAndServe("localhost:8080", nil))
}

Some notes about this approach:

  • There are 2n + 1 goroutines running, where n is the number of connections.
  • Only the go router.Run() goroutine has access to all the clients.
  • Even registering and de-registering a user occurs via channels.
  • The Router is potentially a bottleneck - 2 unrelated high flux conversations could cause latency for each other.

Let's see how that stacks up against our second approach.

The User Directory Approach

In this approach, we get rid of the potential Router bottleneck.

Each ReceiveMessages goroutine can send data directly to the SendMessages goroutine of the recipient user. It uses a shared data structure, which I'm calling a "user directory", to get the right channel.

adsf

However, can just we write directly to the connection of the correct user in the ReceiveMessages goroutine? Why an extra SendMessages goroutine?

That question actually applies to the previous example as well - "why not send data to the connection directly in the Router?"

Basically, because we're breaking the contract of the library we're using. Here's the direct quote from the gorilla websocket docs:

Connections support one concurrent reader and one concurrent writer.

Applications are responsible for ensuring that no more than one goroutine calls the write methods (NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel) concurrently and that no more than one goroutine calls the read methods (NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler) concurrently.

So, we can't let multipe goroutines write to a single connection. Now, we could have a single goroutine for sending and receiving called something like ManageConnection, with a select statement in a for loop, but I believe separating them is cleaner.

client.go
package main

import (
	"log"
	"sync/atomic"

	"github.com/gorilla/websocket"
)

type IncomingMessage struct {
	RecipientId int    `json:"recipient_id"`
	Message     string `json:"message"`
}

type OutgoingMessage struct {
	SenderId int    `json:"sender_id"`
	Message  string `json:"message"`
}

type Client struct {
	id               int
	conn             *websocket.Conn
	userDirectory    *UserDirectory
	outgoingMessages chan OutgoingMessage
}

func (c *Client) unregister() {
	c.conn.Close()
	c.userDirectory.RemoveUser(c.id)
}

func (c *Client) ReceiveMessages() {
	defer c.unregister()

	for {
		var incomingMessage IncomingMessage
		err := c.conn.ReadJSON(&incomingMessage)

		if err != nil {
			// This occurs when the websocket connection is closed
			log.Printf("Error reading message: %v", err)
			break
		}

		log.Printf("Received message `%s` for user %d", incomingMessage.Message, incomingMessage.RecipientId)

		recipient, err := c.userDirectory.GetUser(incomingMessage.RecipientId)

		if err != nil {
			log.Printf("User %d is not connected; cannot send them a message.", incomingMessage.RecipientId)
			continue
		}

		recipient.outgoingMessages <- OutgoingMessage{
			SenderId: c.id,
			Message:  incomingMessage.Message,
		}
	}
}

func (c *Client) SendMessages() {
	for outgoingMessage := range c.outgoingMessages {
		c.conn.WriteJSON(outgoingMessage)
	}
}

var nextAvailableIdToAssign uint64 = 0

func generateUserID() int {
	return int(atomic.AddUint64(&nextAvailableIdToAssign, 1))
}

func NewClient(userDirectory *UserDirectory, conn *websocket.Conn) *Client {
	id := generateUserID()

	return &Client{
		id:               id,
		outgoingMessages: make(chan OutgoingMessage),
		userDirectory:    userDirectory,
		conn:             conn,
	}
}
user-directory.go
package main

import (
	"errors"
	"sync"
)

// @note: sync.Map could replace this, but I don't like
// the fact that it doesn't have type safety
//
// seems they're discussing that
// https://github.com/golang/go/discussions/48287
type UserDirectory struct {
	lock  sync.RWMutex
	users map[int]*Client
}

// must be a singleton!
func NewUserDirectory() *UserDirectory {
	return &UserDirectory{
		lock:  sync.RWMutex{},
		users: make(map[int]*Client),
	}
}

func (u *UserDirectory) AddUser(client *Client) {
	u.lock.Lock()
	defer u.lock.Unlock()

	u.users[client.id] = client
}

func (u *UserDirectory) RemoveUser(id int) {
	u.lock.Lock()
	defer u.lock.Unlock()

	delete(u.users, id)
}

func (u *UserDirectory) GetUser(id int) (*Client, error) {
	u.lock.RLock()
	defer u.lock.RUnlock()

	user := u.users[id]

	if user == nil {
		return nil, errors.New("user not found")
	}

	return user, nil
}
main.go
package main

import (
	"log"
	"net/http"

	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

func handler(userDirectory *UserDirectory, w http.ResponseWriter, r *http.Request) {
	conn, _ := upgrader.Upgrade(w, r, nil)

	client := NewClient(userDirectory, conn)
	userDirectory.AddUser(client)

	go client.SendMessages()
	go client.ReceiveMessages()
}

func main() {
	userDirectory := NewUserDirectory()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		handler(userDirectory, w, r)
	})

	log.Fatal(http.ListenAndServe("localhost:8080", nil))
}

Some notes about this approach:

  • It uses 2n go routines, where n is the number of connections. As I've mentioned this could be n, if we collapsed sending and receiving messages into 1 go routine. This comment applies to the router approach as well.
  • We are using a RWMutex so the only point of contention should be if users are frequently entering and joining the chat server.
    • However, typical sending of messages does not suffer from a single bottleneck.

Comparison

The main difference between these two approaches is the bottleneck of the router. I'm not sure one is strictly superior to the other, because in order to get around the bottleneck of the router I introduced shared state, albeit shared state optimized for reading, which is the main use case.

Check out the Github repo here.

P.S.

Feedback welcome if you guys see something silly I'm doing or have a good idea about another approach!

Want to know when I write a new article?

Get new posts in your inbox