fix: prevent deadlock by timeouting blocking writes

This commit is contained in:
Jannis Mattheis
2024-12-07 12:15:44 +01:00
parent 40ad444c84
commit b6094d54f2
10 changed files with 31 additions and 17 deletions

View File

@@ -32,7 +32,7 @@ func (e *ClientAnswer) Execute(rooms *Rooms, current ClientInfo) error {
return fmt.Errorf("permission denied for session %s", e.SID)
}
room.Users[session.Host].Write <- outgoing.ClientAnswer(*e)
room.Users[session.Host].WriteTimeout(outgoing.ClientAnswer(*e))
return nil
}

View File

@@ -32,7 +32,7 @@ func (e *ClientICE) Execute(rooms *Rooms, current ClientInfo) error {
return fmt.Errorf("permission denied for session %s", e.SID)
}
room.Users[session.Host].Write <- outgoing.ClientICE(*e)
room.Users[session.Host].WriteTimeout(outgoing.ClientICE(*e))
return nil
}

View File

@@ -70,7 +70,7 @@ func (e *Create) Execute(rooms *Rooms, current ClientInfo) error {
Streaming: false,
Owner: true,
Addr: current.Addr,
Write: current.Write,
_write: current.Write,
},
},
}

View File

@@ -20,7 +20,7 @@ func (e *Disconnected) Execute(rooms *Rooms, current ClientInfo) error {
func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) {
roomID := rooms.connected[current.ID]
delete(rooms.connected, current.ID)
current.Write <- outgoing.CloseWriter{Code: e.Code, Reason: e.Reason}
writeTimeout[outgoing.Message](current.Write, outgoing.CloseWriter{Code: e.Code, Reason: e.Reason})
if roomID == "" {
return
@@ -46,14 +46,14 @@ func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) {
if bytes.Equal(session.Client.Bytes(), current.ID.Bytes()) {
host, ok := room.Users[session.Host]
if ok {
host.Write <- outgoing.EndShare(id)
host.WriteTimeout(outgoing.EndShare(id))
}
room.closeSession(rooms, id)
}
if bytes.Equal(session.Host.Bytes(), current.ID.Bytes()) {
client, ok := room.Users[session.Client]
if ok {
client.Write <- outgoing.EndShare(id)
client.WriteTimeout(outgoing.EndShare(id))
}
room.closeSession(rooms, id)
}
@@ -62,7 +62,7 @@ func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) {
if user.Owner && room.CloseOnOwnerLeave {
for _, member := range room.Users {
delete(rooms.connected, member.ID)
member.Write <- outgoing.CloseWriter{Code: websocket.CloseNormalClosure, Reason: CloseOwnerLeft}
member.WriteTimeout(outgoing.CloseWriter{Code: websocket.CloseNormalClosure, Reason: CloseOwnerLeft})
}
rooms.closeRoom(roomID)
return

View File

@@ -5,6 +5,6 @@ type Health struct {
}
func (e *Health) Execute(rooms *Rooms, current ClientInfo) error {
e.Response <- len(rooms.connected)
writeTimeout(e.Response, len(rooms.connected))
return nil
}

View File

@@ -32,7 +32,7 @@ func (e *HostICE) Execute(rooms *Rooms, current ClientInfo) error {
return fmt.Errorf("permission denied for session %s", e.SID)
}
room.Users[session.Client].Write <- outgoing.HostICE(*e)
room.Users[session.Client].WriteTimeout(outgoing.HostICE(*e))
return nil
}

View File

@@ -32,7 +32,7 @@ func (e *HostOffer) Execute(rooms *Rooms, current ClientInfo) error {
return fmt.Errorf("permission denied for session %s", e.SID)
}
room.Users[session.Client].Write <- outgoing.HostOffer(*e)
room.Users[session.Client].WriteTimeout(outgoing.HostOffer(*e))
return nil
}

View File

@@ -38,7 +38,7 @@ func (e *Join) Execute(rooms *Rooms, current ClientInfo) error {
Streaming: false,
Owner: false,
Addr: current.Addr,
Write: current.Write,
_write: current.Write,
}
rooms.connected[current.ID] = room.ID
room.notifyInfoChanged()

View File

@@ -25,7 +25,7 @@ func (e *StopShare) Execute(rooms *Rooms, current ClientInfo) error {
if bytes.Equal(session.Host.Bytes(), current.ID.Bytes()) {
client, ok := room.Users[session.Client]
if ok {
client.Write <- outgoing.EndShare(id)
client.WriteTimeout(outgoing.EndShare(id))
}
room.closeSession(rooms, id)
}

View File

@@ -4,8 +4,10 @@ import (
"fmt"
"net"
"sort"
"time"
"github.com/rs/xid"
"github.com/rs/zerolog/log"
"github.com/screego/server/config"
"github.com/screego/server/ws/outgoing"
)
@@ -60,8 +62,8 @@ func (r *Room) newSession(host, client xid.ID, rooms *Rooms, v4, v6 net.IP) {
Username: clientName,
}}
}
r.Users[host].Write <- outgoing.HostSession{Peer: client, ID: id, ICEServers: iceHost}
r.Users[client].Write <- outgoing.ClientSession{Peer: host, ID: id, ICEServers: iceClient}
r.Users[host].WriteTimeout(outgoing.HostSession{Peer: client, ID: id, ICEServers: iceHost})
r.Users[client].WriteTimeout(outgoing.ClientSession{Peer: host, ID: id, ICEServers: iceClient})
}
func (r *Rooms) addresses(prefix string, v4, v6 net.IP, tcp bool) (result []string) {
@@ -122,10 +124,10 @@ func (r *Room) notifyInfoChanged() {
return left.Name < right.Name
})
current.Write <- outgoing.Room{
current.WriteTimeout(outgoing.Room{
ID: r.ID,
Users: users,
}
})
}
}
@@ -135,5 +137,17 @@ type User struct {
Name string
Streaming bool
Owner bool
Write chan<- outgoing.Message
_write chan<- outgoing.Message
}
func (u *User) WriteTimeout(msg outgoing.Message) {
writeTimeout(u._write, msg)
}
func writeTimeout[T any](ch chan<- T, msg T) {
select {
case <-time.After(2 * time.Second):
log.Warn().Interface("event", fmt.Sprintf("%T", msg)).Interface("payload", msg).Msg("Client write loop didn't accept the message.")
case ch <- msg:
}
}