Updated server.go file to remove import from acl and pubsub packages. Use ACL and PubSub interfaces instead. Created NewServer function that returns and instance on server

This commit is contained in:
Kelvin Clement Mwinuka
2024-01-15 12:03:08 +08:00
parent aea3feec0e
commit b1af2c907e

View File

@@ -29,7 +29,7 @@ type Server struct {
keyLocks map[string]*sync.RWMutex keyLocks map[string]*sync.RWMutex
keyCreationLock *sync.Mutex keyCreationLock *sync.Mutex
commands []utils.Command Commands []utils.Command
raft *raft.Raft raft *raft.Raft
memberList *memberlist.MemberList memberList *memberlist.MemberList
@@ -48,6 +48,67 @@ type Server struct {
AOFEngine *aof.Engine AOFEngine *aof.Engine
} }
type Opts struct {
Config utils.Config
ACL utils.ACL
PubSub utils.PubSub
CancelCh *chan os.Signal
Commands []utils.Command
}
func NewServer(opts Opts) *Server {
server := &Server{
Config: opts.Config,
ACL: opts.ACL,
PubSub: opts.PubSub,
CancelCh: opts.CancelCh,
Commands: opts.Commands,
store: make(map[string]interface{}),
keyLocks: make(map[string]*sync.RWMutex),
keyCreationLock: &sync.Mutex{},
}
if server.IsInCluster() {
server.raft = raft.NewRaft(raft.Opts{
Config: opts.Config,
Server: server,
GetCommand: server.getCommand,
})
server.memberList = memberlist.NewMemberList(memberlist.MemberlistOpts{
Config: opts.Config,
HasJoinedCluster: server.raft.HasJoinedCluster,
AddVoter: server.raft.AddVoter,
RemoveRaftServer: server.raft.RemoveServer,
IsRaftLeader: server.raft.IsRaftLeader,
ApplyMutate: server.raftApply,
})
} else {
// Set up standalone snapshot engine
server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{
Config: opts.Config,
StartSnapshot: server.StartSnapshot,
FinishSnapshot: server.FinishSnapshot,
GetState: server.GetState,
SetLatestSnapshotMilliseconds: server.SetLatestSnapshot,
GetLatestSnapshotMilliseconds: server.GetLatestSnapshot,
CreateKeyAndLock: server.CreateKeyAndLock,
KeyUnlock: server.KeyUnlock,
SetValue: server.SetValue,
})
// Set up standalone AOF engine
server.AOFEngine = aof.NewAOFEngine(aof.Opts{
Config: opts.Config,
GetState: server.GetState,
StartRewriteAOF: server.StartRewriteAOF,
FinishRewriteAOF: server.FinishRewriteAOF,
CreateKeyAndLock: server.CreateKeyAndLock,
KeyUnlock: server.KeyUnlock,
SetValue: server.SetValue,
HandleCommand: server.handleCommand,
})
}
return server
}
func (server *Server) StartTCP(ctx context.Context) { func (server *Server) StartTCP(ctx context.Context) {
conf := server.Config conf := server.Config
@@ -166,12 +227,6 @@ func (server *Server) handleConnection(ctx context.Context, conn net.Conn) {
func (server *Server) Start(ctx context.Context) { func (server *Server) Start(ctx context.Context) {
conf := server.Config conf := server.Config
server.store = make(map[string]interface{})
server.keyLocks = make(map[string]*sync.RWMutex)
server.keyCreationLock = &sync.Mutex{}
server.LoadModules(ctx)
if conf.TLS && len(conf.CertKeyPairs) <= 0 { if conf.TLS && len(conf.CertKeyPairs) <= 0 {
log.Fatal("must provide certificate and key file paths for TLS mode") log.Fatal("must provide certificate and key file paths for TLS mode")
return return
@@ -179,34 +234,10 @@ func (server *Server) Start(ctx context.Context) {
if server.IsInCluster() { if server.IsInCluster() {
// Initialise raft and memberlist // Initialise raft and memberlist
server.raft = raft.NewRaft(raft.Opts{
Config: conf,
Server: server,
GetCommand: server.getCommand,
})
server.memberList = memberlist.NewMemberList(memberlist.MemberlistOpts{
Config: conf,
HasJoinedCluster: server.raft.HasJoinedCluster,
AddVoter: server.raft.AddVoter,
RemoveRaftServer: server.raft.RemoveServer,
IsRaftLeader: server.raft.IsRaftLeader,
ApplyMutate: server.raftApply,
})
server.raft.RaftInit(ctx) server.raft.RaftInit(ctx)
server.memberList.MemberListInit(ctx) server.memberList.MemberListInit(ctx)
} else { } else {
// Initialize and start standalone snapshot engine // Initialize and start standalone snapshot engine
server.SnapshotEngine = snapshot.NewSnapshotEngine(snapshot.Opts{
Config: conf,
StartSnapshot: server.StartSnapshot,
FinishSnapshot: server.FinishSnapshot,
GetState: server.GetState,
SetLatestSnapshotMilliseconds: server.SetLatestSnapshot,
GetLatestSnapshotMilliseconds: server.GetLatestSnapshot,
CreateKeyAndLock: server.CreateKeyAndLock,
KeyUnlock: server.KeyUnlock,
SetValue: server.SetValue,
})
if conf.RestoreSnapshot { if conf.RestoreSnapshot {
err := server.SnapshotEngine.Restore(ctx) err := server.SnapshotEngine.Restore(ctx)
if err != nil { if err != nil {
@@ -214,18 +245,7 @@ func (server *Server) Start(ctx context.Context) {
} }
} }
server.SnapshotEngine.Start(ctx) server.SnapshotEngine.Start(ctx)
// Initialize standalone AOF engine // Initialize standalone AOF engine
server.AOFEngine = aof.NewAOFEngine(aof.Opts{
Config: conf,
GetState: server.GetState,
StartRewriteAOF: server.StartRewriteAOF,
FinishRewriteAOF: server.FinishRewriteAOF,
CreateKeyAndLock: server.CreateKeyAndLock,
KeyUnlock: server.KeyUnlock,
SetValue: server.SetValue,
HandleCommand: server.handleCommand,
})
if conf.RestoreAOF && !conf.RestoreSnapshot { if conf.RestoreAOF && !conf.RestoreSnapshot {
err := server.AOFEngine.Restore(ctx) err := server.AOFEngine.Restore(ctx)
if err != nil { if err != nil {