Two Approaches To Building A Chat Server
Let’s build a chat server.
Our server won’t have any persistence layer, meaning:
- no chat history
- no backlogged messages going to offline users when they log on
- you can only send messages to other online users
For simplicity’s sake, we’ll assign users integer ids (from a sequence) starting at 1.
Our API will be very simple:
- users connect to ws://localhost:8080
- you send a message in this format, using JSON:

    { "recipient_id": 2, "message": "Go Braves!"}

If you receive a message, it will be in this format:

    { "sender_id": 1, "message": "Go Braves!"}

Note that this is JSON encoded as text. WebSockets only support text and binary messages, so JSON here is just a convention layered on top of text messages.
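To make the two message shapes concrete, here is a minimal sketch of the translation the server performs on every message: decode what the sender wrote, drop the recipient id, and attach the sender id. The struct definitions match the ones used in the server code; the `translate` helper is a hypothetical name of mine for illustration and is not part of the server itself.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// IncomingMessage is what a client sends to the server.
type IncomingMessage struct {
	RecipientId int    `json:"recipient_id"`
	Message     string `json:"message"`
}

// OutgoingMessage is what the server delivers to the recipient.
type OutgoingMessage struct {
	SenderId int    `json:"sender_id"`
	Message  string `json:"message"`
}

// translate is a hypothetical helper: it decodes the JSON a sender wrote
// and re-encodes it in the format the recipient will read.
func translate(senderID int, raw []byte) ([]byte, error) {
	var in IncomingMessage
	if err := json.Unmarshal(raw, &in); err != nil {
		return nil, fmt.Errorf("decoding incoming message: %w", err)
	}

	return json.Marshal(OutgoingMessage{SenderId: senderID, Message: in.Message})
}

func main() {
	out, err := translate(1, []byte(`{ "recipient_id": 2, "message": "Go Braves!"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	fmt.Println(string(out)) // prints {"sender_id":1,"message":"Go Braves!"}
}
```

In the real server the decoding and encoding are done by the websocket library's ReadJSON and WriteJSON calls, so this helper only shows the shape of the data.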
Router Approach
Of the two approaches that come to mind for this problem, the first one I’m going to call the “router” approach.
We’ll have one goroutine per user for receiving messages, one per user for sending them, and a single router goroutine that passes messages from the receiving goroutine of one user to the sending goroutine of another.

    package main

    import (
        "log"
        "sync/atomic"

        "github.com/gorilla/websocket"
    )

    type IncomingMessage struct {
        RecipientId int    `json:"recipient_id"`
        Message     string `json:"message"`
    }

    type OutgoingMessage struct {
        SenderId int    `json:"sender_id"`
        Message  string `json:"message"`
    }

    type Client struct {
        id               int
        router           *Router
        conn             *websocket.Conn
        outgoingMessages chan OutgoingMessage
    }

    func (c *Client) unregister() {
        c.conn.Close()
        c.router.removeClient <- c
    }

    func (c *Client) ReceiveMessages() {
        defer c.unregister()

        for {
            var incomingMessage IncomingMessage
            err := c.conn.ReadJSON(&incomingMessage)

            if err != nil {
                // This occurs when the websocket connection is closed
                log.Printf("Error reading message: %v", err)
                break
            }

            log.Printf("Received message `%s` for user %d", incomingMessage.Message, incomingMessage.RecipientId)

            c.router.messages <- Message{
                SenderId:    c.id,
                RecipientId: incomingMessage.RecipientId,
                Message:     incomingMessage.Message,
            }
        }
    }

    func (c *Client) SendMessages() {
        for outgoingMessage := range c.outgoingMessages {
            c.conn.WriteJSON(outgoingMessage)
        }
    }

    var nextAvailableIdToAssign uint64 = 0

    func generateUserID() int {
        return int(atomic.AddUint64(&nextAvailableIdToAssign, 1))
    }

    func NewClient(router *Router, conn *websocket.Conn) *Client {
        id := generateUserID()

        return &Client{
            id:               id,
            outgoingMessages: make(chan OutgoingMessage),
            router:           router,
            conn:             conn,
        }
    }


    package main

    import "log"

    type Message struct {
        RecipientId int
        SenderId    int
        Message     string
    }

    type Router struct {
        messages     chan Message
        addClient    chan *Client
        removeClient chan *Client
        clients      map[int]*Client
    }

    func NewRouter() *Router {
        return &Router{
            clients:      make(map[int]*Client),
            messages:     make(chan Message, 10),
            addClient:    make(chan *Client),
            removeClient: make(chan *Client),
        }
    }

    func (r *Router) Run() {
        for {
            select {
            case clientToAdd := <-r.addClient:
                r.clients[clientToAdd.id] = clientToAdd
            case clientToRemove := <-r.removeClient:
                delete(r.clients, clientToRemove.id)
            case incomingMessage := <-r.messages:
                recipient := r.clients[incomingMessage.RecipientId]

                if recipient == nil {
                    log.Printf("User %d is not connected; cannot send them a message.", incomingMessage.RecipientId)
                    continue
                }

                recipient.outgoingMessages <- OutgoingMessage{
                    SenderId: incomingMessage.SenderId,
                    Message:  incomingMessage.Message,
                }
            }
        }
    }


    package main

    import (
        "log"
        "net/http"

        "github.com/gorilla/websocket"
    )

    var upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
    }

    func handler(router *Router, w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            // Upgrade has already replied to the client with an HTTP error
            log.Printf("Error upgrading connection: %v", err)
            return
        }

        client := NewClient(router, conn)

        router.addClient <- client

        go client.SendMessages()
        go client.ReceiveMessages()
    }

    func main() {
        router := NewRouter()
        go router.Run()

        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            handler(router, w, r)
        })

        log.Fatal(http.ListenAndServe("localhost:8080", nil))
    }

Some notes about this approach:
- There are 2n + 1 goroutines running, where n is the number of connections.
- Only the go router.Run() goroutine has access to all the clients.
- Even registering and de-registering a user occurs via channels.
- The Router is potentially a bottleneck: 2 unrelated high flux conversations could cause latency for each other.
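The bottleneck comes from the fact that each client's outgoingMessages channel is unbuffered, so the router's send waits until that recipient's SendMessages goroutine is ready to receive, and every other conversation waits with it. One possible mitigation, sketched below, is a non-blocking send using select with a default case. The `tryDeliver` helper is a hypothetical name of mine, and dropping a message when the recipient is busy is an assumption about what would be acceptable, not something the server above does.

```go
package main

import "fmt"

// OutgoingMessage mirrors the struct used by the server.
type OutgoingMessage struct {
	SenderId int
	Message  string
}

// tryDeliver is a hypothetical helper: it attempts to hand msg to a
// recipient's channel without blocking. It returns false when the send
// would block, so the caller can drop or log the message.
func tryDeliver(outgoing chan<- OutgoingMessage, msg OutgoingMessage) bool {
	select {
	case outgoing <- msg:
		return true
	default:
		return false
	}
}

func main() {
	// A buffer of 1 with no reader stands in for a slow recipient.
	slow := make(chan OutgoingMessage, 1)

	fmt.Println(tryDeliver(slow, OutgoingMessage{SenderId: 1, Message: "first"}))  // prints true
	fmt.Println(tryDeliver(slow, OutgoingMessage{SenderId: 1, Message: "second"})) // prints false
}
```

The trade-off is that a slow recipient now loses messages instead of slowing everyone else down, which may or may not be what you want from a chat server.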
Let’s see how that stacks up against our second approach.
The User Directory Approach
In this approach, we get rid of the potential Router bottleneck.
Each ReceiveMessages goroutine can send data directly to the SendMessages goroutine of the recipient user. It uses a shared data structure, which I’m calling a “user directory”, to get the right channel.
However, can’t we just write directly to the connection of the correct user in the ReceiveMessages goroutine? Why an extra SendMessages goroutine?
That question actually applies to the previous example as well: “why not send data to the connection directly in the Router?”
Basically, because we’d be breaking the contract of the library we’re using. Here’s the direct quote from the gorilla websocket docs:
Connections support one concurrent reader and one concurrent writer.
Applications are responsible for ensuring that no more than one goroutine calls the write methods (NextWriter, SetWriteDeadline, WriteMessage, WriteJSON, EnableWriteCompression, SetCompressionLevel) concurrently and that no more than one goroutine calls the read methods (NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler) concurrently.
So, we can’t let multiple goroutines write to a single connection. Now, we could have a single goroutine for sending and receiving called something like ManageConnection, with a select statement in a for loop, but I believe separating them is cleaner.
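The single-writer rule is the reason SendMessages exists: any number of goroutines may send on a channel, but only one goroutine drains it and touches the connection. Here is a minimal sketch of that pattern using only the standard library. The `fanIn` function and the `write` callback are hypothetical names of mine, and a plain slice stands in for the websocket connection's write side.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn is a hypothetical stand-in for the SendMessages pattern: many
// senders push onto one channel, and exactly one goroutine drains it and
// performs the writes, so write is never called concurrently.
func fanIn(senders, perSender int, write func(string)) {
	outgoing := make(chan string)

	var writer sync.WaitGroup
	writer.Add(1)
	go func() {
		defer writer.Done()
		for msg := range outgoing {
			write(msg) // the only place write is ever called
		}
	}()

	var producers sync.WaitGroup
	for s := 1; s <= senders; s++ {
		producers.Add(1)
		go func(id int) {
			defer producers.Done()
			for i := 0; i < perSender; i++ {
				outgoing <- fmt.Sprintf("sender %d, message %d", id, i)
			}
		}(s)
	}

	producers.Wait()
	close(outgoing) // lets the writer goroutine finish its range loop
	writer.Wait()
}

func main() {
	// A plain slice is not safe for concurrent appends, much like the
	// write side of a websocket connection.
	var written []string

	fanIn(3, 2, func(msg string) { written = append(written, msg) })

	fmt.Println(len(written), "messages written by a single goroutine")
}
```

Because only the draining goroutine ever calls write, the slice needs no lock even though three goroutines are producing messages at once.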

    package main

    import (
        "log"
        "sync/atomic"

        "github.com/gorilla/websocket"
    )

    type IncomingMessage struct {
        RecipientId int    `json:"recipient_id"`
        Message     string `json:"message"`
    }

    type OutgoingMessage struct {
        SenderId int    `json:"sender_id"`
        Message  string `json:"message"`
    }

    type Client struct {
        id               int
        conn             *websocket.Conn
        userDirectory    *UserDirectory
        outgoingMessages chan OutgoingMessage
    }

    func (c *Client) unregister() {
        c.conn.Close()
        c.userDirectory.RemoveUser(c.id)
    }

    func (c *Client) ReceiveMessages() {
        defer c.unregister()

        for {
            var incomingMessage IncomingMessage
            err := c.conn.ReadJSON(&incomingMessage)

            if err != nil {
                // This occurs when the websocket connection is closed
                log.Printf("Error reading message: %v", err)
                break
            }

            log.Printf("Received message `%s` for user %d", incomingMessage.Message, incomingMessage.RecipientId)

            recipient, err := c.userDirectory.GetUser(incomingMessage.RecipientId)

            if err != nil {
                log.Printf("User %d is not connected; cannot send them a message.", incomingMessage.RecipientId)
                continue
            }

            recipient.outgoingMessages <- OutgoingMessage{
                SenderId: c.id,
                Message:  incomingMessage.Message,
            }
        }
    }

    func (c *Client) SendMessages() {
        for outgoingMessage := range c.outgoingMessages {
            c.conn.WriteJSON(outgoingMessage)
        }
    }

    var nextAvailableIdToAssign uint64 = 0

    func generateUserID() int {
        return int(atomic.AddUint64(&nextAvailableIdToAssign, 1))
    }

    func NewClient(userDirectory *UserDirectory, conn *websocket.Conn) *Client {
        id := generateUserID()

        return &Client{
            id:               id,
            outgoingMessages: make(chan OutgoingMessage),
            userDirectory:    userDirectory,
            conn:             conn,
        }
    }


    package main

    import (
        "errors"
        "sync"
    )

    // @note: sync.Map could replace this, but I don't like
    // the fact that it doesn't have type safety
    //
    // seems they're discussing that
    // https://github.com/golang/go/discussions/48287
    type UserDirectory struct {
        lock  sync.RWMutex
        users map[int]*Client
    }

    // must be a singleton!
    func NewUserDirectory() *UserDirectory {
        return &UserDirectory{
            lock:  sync.RWMutex{},
            users: make(map[int]*Client),
        }
    }

    func (u *UserDirectory) AddUser(client *Client) {
        u.lock.Lock()
        defer u.lock.Unlock()

        u.users[client.id] = client
    }

    func (u *UserDirectory) RemoveUser(id int) {
        u.lock.Lock()
        defer u.lock.Unlock()

        delete(u.users, id)
    }

    func (u *UserDirectory) GetUser(id int) (*Client, error) {
        u.lock.RLock()
        defer u.lock.RUnlock()

        user := u.users[id]

        if user == nil {
            return nil, errors.New("user not found")
        }

        return user, nil
    }


    package main

    import (
        "log"
        "net/http"

        "github.com/gorilla/websocket"
    )

    var upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
    }

    func handler(userDirectory *UserDirectory, w http.ResponseWriter, r *http.Request) {
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            // Upgrade has already replied to the client with an HTTP error
            log.Printf("Error upgrading connection: %v", err)
            return
        }

        client := NewClient(userDirectory, conn)
        userDirectory.AddUser(client)

        go client.SendMessages()
        go client.ReceiveMessages()
    }

    func main() {
        userDirectory := NewUserDirectory()

        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            handler(userDirectory, w, r)
        })

        log.Fatal(http.ListenAndServe("localhost:8080", nil))
    }

Some notes about this approach:
- It uses 2n goroutines, where n is the number of connections. As I’ve mentioned, this could be n if we collapsed sending and receiving messages into 1 goroutine. This comment applies to the router approach as well.
- We are using a RWMutex, so the only point of contention should be users frequently entering and leaving the chat server.
  - However, typical sending of messages does not suffer from a single bottleneck.
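On the type-safety complaint about sync.Map in the UserDirectory comment: assuming Go 1.18 or later, the same RWMutex-guarded map can be written once with type parameters and reused for any key and value types. This is a sketch of that idea rather than what the repo does; the `Directory` type and its method names are my own.

```go
package main

import (
	"fmt"
	"sync"
)

// Directory is a hypothetical generic version of UserDirectory: a map
// guarded by a RWMutex, so lookups can proceed concurrently while adds
// and removes take the exclusive lock.
type Directory[K comparable, V any] struct {
	lock  sync.RWMutex
	items map[K]V
}

func NewDirectory[K comparable, V any]() *Directory[K, V] {
	return &Directory[K, V]{items: make(map[K]V)}
}

func (d *Directory[K, V]) Add(key K, value V) {
	d.lock.Lock()
	defer d.lock.Unlock()

	d.items[key] = value
}

func (d *Directory[K, V]) Remove(key K) {
	d.lock.Lock()
	defer d.lock.Unlock()

	delete(d.items, key)
}

// Get returns the value and whether it was present, mirroring the
// two-value form of a plain map lookup.
func (d *Directory[K, V]) Get(key K) (V, bool) {
	d.lock.RLock()
	defer d.lock.RUnlock()

	value, ok := d.items[key]
	return value, ok
}

func main() {
	users := NewDirectory[int, string]()
	users.Add(1, "alice")

	name, ok := users.Get(1)
	fmt.Println(name, ok) // prints alice true

	users.Remove(1)
	_, ok = users.Get(1)
	fmt.Println(ok) // prints false
}
```

In the chat server this would be instantiated as a Directory[int, *Client], and Get's boolean would take the place of the "user not found" error.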
Comparison
The main difference between these two approaches is the bottleneck of the router. I’m not sure one is strictly superior to the other, because in order to get around the bottleneck of the router I introduced shared state, albeit shared state optimized for reading, which is the main use case.
Check out the Github repo here.
P.S.
Feedback welcome if you guys see something silly I’m doing or have a good idea about another approach!