mirror of
https://github.com/datarhei/core.git
synced 2025-10-23 16:03:14 +08:00
Allow to add self to nodes
This commit is contained in:
@@ -90,7 +90,7 @@ type proxy struct {
|
||||
|
||||
limiter net.IPLimiter
|
||||
|
||||
updates chan NodeState
|
||||
updates chan NodeFiles
|
||||
|
||||
lock sync.RWMutex
|
||||
cancel context.CancelFunc
|
||||
@@ -109,7 +109,7 @@ func NewProxy(config ProxyConfig) (Proxy, error) {
|
||||
idupdate: map[string]time.Time{},
|
||||
fileid: map[string]string{},
|
||||
limiter: config.IPLimiter,
|
||||
updates: make(chan NodeState, 64),
|
||||
updates: make(chan NodeFiles, 64),
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
@@ -144,31 +144,32 @@ func (p *proxy) Start() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case state := <-p.updates:
|
||||
case update := <-p.updates:
|
||||
p.logger.Debug().WithFields(log.Fields{
|
||||
"node": state.ID,
|
||||
"state": state.State,
|
||||
"files": len(state.Files),
|
||||
"node": update.ID,
|
||||
"files": len(update.Files),
|
||||
}).Log("Got update")
|
||||
|
||||
if p.id == update.ID {
|
||||
continue
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
|
||||
// Cleanup
|
||||
files := p.idfiles[state.ID]
|
||||
files := p.idfiles[update.ID]
|
||||
for _, file := range files {
|
||||
delete(p.fileid, file)
|
||||
}
|
||||
delete(p.idfiles, state.ID)
|
||||
delete(p.idupdate, state.ID)
|
||||
delete(p.idfiles, update.ID)
|
||||
delete(p.idupdate, update.ID)
|
||||
|
||||
if state.State == "connected" {
|
||||
// Add files
|
||||
for _, file := range state.Files {
|
||||
p.fileid[file] = state.ID
|
||||
}
|
||||
p.idfiles[state.ID] = files
|
||||
p.idupdate[state.ID] = state.LastUpdate
|
||||
// Add files
|
||||
for _, file := range update.Files {
|
||||
p.fileid[file] = update.ID
|
||||
}
|
||||
p.idfiles[update.ID] = files
|
||||
p.idupdate[update.ID] = update.LastUpdate
|
||||
|
||||
p.lock.Unlock()
|
||||
}
|
||||
@@ -192,7 +193,7 @@ func (p *proxy) Stop() {
|
||||
p.cancel = nil
|
||||
|
||||
for _, node := range p.nodes {
|
||||
node.Stop()
|
||||
node.StopFiles()
|
||||
}
|
||||
|
||||
p.nodes = map[string]Node{}
|
||||
@@ -205,10 +206,6 @@ func (p *proxy) Reader() ProxyReader {
|
||||
}
|
||||
|
||||
func (p *proxy) AddNode(id string, node Node) (string, error) {
|
||||
if id == p.id {
|
||||
return "", fmt.Errorf("can't add myself as node or a node with the same ID")
|
||||
}
|
||||
|
||||
if id != node.ID() {
|
||||
return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, node.ID())
|
||||
}
|
||||
@@ -217,7 +214,7 @@ func (p *proxy) AddNode(id string, node Node) (string, error) {
|
||||
defer p.lock.Unlock()
|
||||
|
||||
if n, ok := p.nodes[id]; ok {
|
||||
n.Stop()
|
||||
n.StopFiles()
|
||||
|
||||
delete(p.nodes, id)
|
||||
|
||||
@@ -237,7 +234,7 @@ func (p *proxy) AddNode(id string, node Node) (string, error) {
|
||||
|
||||
p.nodes[id] = node
|
||||
|
||||
node.Start(p.updates)
|
||||
node.StartFiles(p.updates)
|
||||
|
||||
p.logger.Info().WithFields(log.Fields{
|
||||
"address": node.Address(),
|
||||
@@ -256,7 +253,7 @@ func (p *proxy) RemoveNode(id string) error {
|
||||
return ErrNodeNotFound
|
||||
}
|
||||
|
||||
node.Stop()
|
||||
node.StopFiles()
|
||||
|
||||
delete(p.nodes, id)
|
||||
|
||||
|
Reference in New Issue
Block a user