Files
MirageServer/controller/protocol_common_utils.go

341 lines
8.3 KiB
Go

package controller
import (
"encoding/binary"
"encoding/json"
"sort"
"strings"
"sync"
"time"
"github.com/klauspost/compress/zstd"
"github.com/rs/zerolog/log"
"tailscale.com/smallzstd"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
)
// mapResponseStreamState tracks state associated with a stream of MapResponse messages,
// which may optionally send only deltas from the previous message.
type mapResponseStreamState struct {
// peersByID is the peers sent in the last stream message,
// for comparison in generating deltas in the new message.
peersByID map[int64]Machine
}
func (h *Mirage) generateMapResponse(
mapRequest tailcfg.MapRequest,
machine *Machine,
streamState *mapResponseStreamState,
) (*tailcfg.MapResponse, error) {
log.Trace().
Str("func", "generateMapResponse").
Str("machine", mapRequest.Hostinfo.Hostname).
Msg("Creating Map response")
//cgao6: change to use User's DNSConfig
node, err := h.toNode(*machine) //h.cfg.BaseDomain, h.cfg.DNSConfig)
if err != nil {
log.Error().
Caller().
Str("func", "generateMapResponse").
Err(err).
Msg("Cannot convert to node")
return nil, err
}
org, err := h.GetOrgnaizationByID(machine.User.OrganizationID)
if err != nil {
log.Error().
Caller().
Str("func", "generateMapResponse").
Err(err).
Msg("Cannot get organization of the machine")
return nil, err
}
// enableSelf 表示该节点的用户是否启用了self
enableSelf, err := h.UpdateACLRulesOfOrg(org, &machine.User)
if err != nil {
log.Error().
Caller().
Str("func", "generateMapResponse").
Err(err).
Msg("Cannot get ACL rules")
return nil, err
}
// organization be set to field :machine.User.Organization
machine.User.Organization = *org
peers, invalidNodeIDs, err := h.getValidPeers(machine, enableSelf)
if invalidNodeIDs != nil {
log.Trace().Msg("Should ignore invalidNodeIDs for current")
}
if err != nil {
log.Error().
Caller().
Str("func", "generateMapResponse").
Err(err).
Msg("Cannot fetch peers")
return nil, err
}
profiles := h.getMapResponseUserProfiles(*machine, peers)
//cgao6: use User's DNSconfig instead
dnsConfig := getMapResponseDNSConfig(
h.cfg.IPPrefixes, //
// h.cfg.DNSConfig,
// h.cfg.BaseDomain,
*machine,
peers,
)
now := time.Now()
//org := &machine.User.Organization
derpMap, err := h.LoadOrgDERPs(machine.User.OrganizationID)
if err != nil {
log.Error().
Caller().
Str("func", "generateMapResponse").
Err(err).
Msg("Failed to get DERP map of machine")
}
resp := tailcfg.MapResponse{
KeepAlive: false,
Node: node,
// TODO: Only send if updated
DERPMap: derpMap, //cgao6: h.DERPMap,
// TODO(kradalby): Implement:
// https://github.com/tailscale/tailscale/blob/main/tailcfg/tailcfg.go#L1351-L1374
// PeersChanged
// PeersRemoved
// PeersChangedPatch
// PeerSeenChange
// OnlineChange
// TODO: Only send if updated
DNSConfig: dnsConfig,
// TODO: Only send if updated
Domain: org.Name,
// Do not instruct clients to collect services, we do not
// support or do anything with them
CollectServices: "false",
// TODO: Only send if updated
PacketFilter: org.AclRules,
UserProfiles: profiles,
// TODO: Only send if updated
SSHPolicy: org.SshPolicy,
ControlTime: &now,
Debug: &tailcfg.Debug{
DisableLogTail: true,
SetRandomizeClientPort: "true",
},
}
toNodes := func(machines Machines) ([]*tailcfg.Node, error) {
return h.toNodes(machines) //, h.cfg.BaseDomain, h.cfg.DNSConfig)
}
resp, err = applyMapResponseDelta(resp, streamState, peers, toNodes)
if err != nil {
log.Error().
Caller().
Str("func", "generateMapResponse").
Err(err).
Msg("Cannot apply map response deltas")
return nil, err
}
resp.ClientVersion = &tailcfg.ClientVersion{}
if mapRequest.Hostinfo.OS == "windows" {
if IsUpdateAvailable(mapRequest.Hostinfo.IPNVersion, h.cfg.ClientVersion.Win.Version) {
resp.ClientVersion.RunningLatest = false
resp.ClientVersion.LatestVersion = strings.Split(h.cfg.ClientVersion.Win.Version, "-")[0]
resp.ClientVersion.NotifyURL = h.cfg.ClientVersion.Win.Url
} else {
resp.ClientVersion.RunningLatest = true
}
}
log.Trace().
Str("func", "generateMapResponse").
Str("machine", mapRequest.Hostinfo.Hostname).
// Interface("payload", resp).
Msgf("Generated map response: %s", tailMapResponseToString(resp))
return &resp, nil
}
func (h *Mirage) getMapResponseData(
mapRequest tailcfg.MapRequest,
machine *Machine,
streamState *mapResponseStreamState,
) ([]byte, error) {
mapResponse, err := h.generateMapResponse(mapRequest, machine, streamState)
if err != nil {
return nil, err
}
return h.marshalMapResponse(mapResponse, key.MachinePublic{}, mapRequest.Compress)
}
func (h *Mirage) getMapKeepAliveResponseData(
mapRequest tailcfg.MapRequest,
machine *Machine,
) ([]byte, error) {
keepAliveResponse := tailcfg.MapResponse{
KeepAlive: true,
}
return h.marshalMapResponse(keepAliveResponse, key.MachinePublic{}, mapRequest.Compress)
}
func (h *Mirage) marshalResponse(
resp interface{},
machineKey key.MachinePublic,
) ([]byte, error) {
jsonBody, err := json.Marshal(resp)
if err != nil {
log.Error().
Caller().
Err(err).
Msg("Cannot marshal response")
return nil, err
}
return jsonBody, nil
}
func (h *Mirage) marshalMapResponse(
resp interface{},
machineKey key.MachinePublic,
compression string,
) ([]byte, error) {
jsonBody, err := json.Marshal(resp)
if err != nil {
log.Error().
Caller().
Err(err).
Msg("Cannot marshal map response")
}
var respBody []byte
if compression == ZstdCompression {
respBody = zstdEncode(jsonBody)
} else {
respBody = jsonBody
}
data := make([]byte, reservedResponseHeaderSize)
binary.LittleEndian.PutUint32(data, uint32(len(respBody)))
data = append(data, respBody...)
return data, nil
}
func zstdEncode(in []byte) []byte {
encoder, ok := zstdEncoderPool.Get().(*zstd.Encoder)
if !ok {
panic("invalid type in sync pool")
}
out := encoder.EncodeAll(in, nil)
_ = encoder.Close()
zstdEncoderPool.Put(encoder)
return out
}
var zstdEncoderPool = &sync.Pool{
New: func() any {
encoder, err := smallzstd.NewEncoder(
nil,
zstd.WithEncoderLevel(zstd.SpeedFastest))
if err != nil {
panic(err)
}
return encoder
},
}
// applyMapResponseDelta returns a modified MapResponse
// with fields modified which make use of delta (send on changes).
//
// mapResponse the current mapResponse with delta fields not set.
// streamState optional previous state of mapResponse sent in this stream. Set to nil for a "full update" (no deltas).
// currentPeers list of peers currently available for the node that this mapResponse is for.
// toNodes a function to convert the Headscale Machines structure to Tailscale Nodes structure.
func applyMapResponseDelta(
mapResponse tailcfg.MapResponse,
streamState *mapResponseStreamState,
currentPeers Machines,
toNodes func(Machines) ([]*tailcfg.Node, error)) (tailcfg.MapResponse, error) {
// Peer delta
currentPeersByID := machinesByID(currentPeers)
if streamState.peersByID == nil {
// 1st map, send full nodes
nodePeers, err := toNodes(currentPeers)
if err != nil {
return tailcfg.MapResponse{}, err
}
mapResponse.Peers = nodePeers
} else {
// Update PeersChanged with any peers which were removed or changed
var peersChanged []Machine
for id, peer := range currentPeersByID {
previousPeer, hadPrevious := streamState.peersByID[id]
if !hadPrevious || previousPeer.LastSuccessfulUpdate.Before(*peer.LastSuccessfulUpdate) {
peersChanged = append(peersChanged, peer)
}
}
nodesChanged, err := toNodes(peersChanged)
if err != nil {
return tailcfg.MapResponse{}, err
}
sort.Slice(nodesChanged, func(i, j int) bool {
return nodesChanged[i].ID < nodesChanged[j].ID
})
mapResponse.PeersChanged = nodesChanged
// Update PeersRemoved with any peers which are no longer present
for id := range streamState.peersByID {
if _, has := currentPeersByID[id]; !has {
mapResponse.PeersRemoved = append(mapResponse.PeersRemoved, tailcfg.NodeID(id))
}
}
}
// Update streamState for use in the next message
streamState.peersByID = currentPeersByID
// TODO(kallen): Also Implement the following deltas for even smaller
// message sizes:
//
// PeersChangedPatch
// PeerSeenChange
// OnlineChange
return mapResponse, nil
}