add tcp server

This commit is contained in:
hdt3213
2019-06-02 11:42:04 +08:00
committed by wyb
parent 4e26f7b859
commit 65c23dcf6b
13 changed files with 433 additions and 0 deletions

5
.gitignore vendored
View File

@@ -10,3 +10,8 @@
# Output of the go coverage tool, specifically when used with LiteIDE # Output of the go coverage tool, specifically when used with LiteIDE
*.out *.out
.idea/
*.iml
logs/
target

9
.idea/dictionaries/hdt3213.xml generated Normal file
View File

@@ -0,0 +1,9 @@
<component name="ProjectDictionaryState">
<dictionary name="hdt3213">
<words>
<w>godis</w>
<w>shuting</w>
<w>yaml</w>
</words>
</dictionary>
</component>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@@ -1,2 +1,9 @@
# godis # godis
A Golang implements K-V cache, compatible with redis protocol A Golang implements K-V cache, compatible with redis protocol
# Structure
- server: tcp server
- protocol: redis protocol parser
- db: data container

22
src/cmd/main.go Normal file
View File

@@ -0,0 +1,22 @@
package main
import (
"github.com/HDT3213/godis/src/server"
"time"
"github.com/HDT3213/godis/src/lib/logger"
)
func main() {
logger.Setup(&logger.Settings{
Path: "logs",
Name: "godis",
Ext: ".log",
TimeFormat: "2006-01-02",
})
server.ListenAndServe(&server.Config{
Address: ":6399",
MaxConnect: 16,
Timeout: 2 * time.Second,
}, server.MakeEchoHandler())
}

View File

@@ -0,0 +1,13 @@
package tcp
import (
"net"
"context"
)
type HandleFunc func(ctx context.Context, conn net.Conn)
type Handler interface {
Handle(ctx context.Context, conn net.Conn)
Close()error
}

78
src/lib/files/files.go Normal file
View File

@@ -0,0 +1,78 @@
package files
import (
"mime/multipart"
"io/ioutil"
"path"
"os"
"fmt"
)
func GetSize(f multipart.File) (int, error) {
content, err := ioutil.ReadAll(f)
return len(content), err
}
func GetExt(fileName string) string {
return path.Ext(fileName)
}
func CheckNotExist(src string) bool {
_, err := os.Stat(src)
return os.IsNotExist(err)
}
func CheckPermission(src string) bool {
_, err := os.Stat(src)
return os.IsPermission(err)
}
func IsNotExistMkDir(src string) error {
if notExist := CheckNotExist(src); notExist == true {
if err := MkDir(src); err != nil {
return err
}
}
return nil
}
func MkDir(src string) error {
err := os.MkdirAll(src, os.ModePerm)
if err != nil {
return err
}
return nil
}
func Open(name string, flag int, perm os.FileMode) (*os.File, error) {
f, err := os.OpenFile(name, flag, perm)
if err != nil {
return nil, err
}
return f, nil
}
func MustOpen(fileName, dir string) (*os.File, error) {
perm := CheckPermission(dir)
if perm == true {
return nil, fmt.Errorf("permission denied dir: %s", dir)
}
err := IsNotExistMkDir(dir)
if err != nil {
return nil, fmt.Errorf("error during make dir %s, err: %s", dir, err)
}
f, err := Open(dir + string(os.PathSeparator) + fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, fmt.Errorf("fail to open file, err: %s", err)
}
return f, nil
}

90
src/lib/logger/logger.go Normal file
View File

@@ -0,0 +1,90 @@
package logger
import (
"fmt"
"log"
"os"
"path/filepath"
"runtime"
"time"
"github.com/go-gin-demo/lib/files"
"io"
)
type Settings struct {
Path string `yaml:"path"`
Name string `yaml:"name"`
Ext string `yaml:"ext"`
TimeFormat string `yaml:"time-format"`
}
var (
F *os.File
DefaultPrefix = ""
DefaultCallerDepth = 2
logger *log.Logger
logPrefix = ""
levelFlags = []string{"DEBUG", "INFO", "WARN", "ERROR", "FATAL"}
)
type Level int
const (
DEBUG Level = iota
INFO
WARNING
ERROR
FATAL
)
func Setup(settings *Settings) {
var err error
dir := settings.Path
fileName := fmt.Sprintf("%s-%s.%s",
settings.Name,
time.Now().Format(settings.TimeFormat),
settings.Ext)
logFile, err := files.MustOpen(fileName, dir)
if err != nil {
log.Fatalf("logging.Setup err: %s", err)
}
mw := io.MultiWriter(os.Stdout, logFile)
logger = log.New(mw, DefaultPrefix, log.LstdFlags)
}
func setPrefix(level Level) {
_, file, line, ok := runtime.Caller(DefaultCallerDepth)
if ok {
logPrefix = fmt.Sprintf("[%s][%s:%d] ", levelFlags[level], filepath.Base(file), line)
} else {
logPrefix = fmt.Sprint("[%s] ", levelFlags[level])
}
logger.SetPrefix(logPrefix)
}
func Debug(v ...interface{}) {
setPrefix(DEBUG)
logger.Println(v)
}
func Info(v ...interface{}) {
setPrefix(INFO)
logger.Println(v)
}
func Warn(v ...interface{}) {
setPrefix(WARNING)
logger.Println(v)
}
func Error(v ...interface{}) {
setPrefix(ERROR)
logger.Println(v)
}
func Fatal(v ...interface{}) {
setPrefix(FATAL)
logger.Fatalln(v)
}

View File

@@ -0,0 +1,19 @@
package atomic
import "sync/atomic"
type AtomicBool uint32
func (b *AtomicBool)Get()bool {
return atomic.LoadUint32((*uint32)(b)) != 0
}
func (b *AtomicBool)Set(v bool) {
if v {
atomic.StoreUint32((*uint32)(b), 1)
} else {
atomic.StoreUint32((*uint32)(b), 0)
}
}

37
src/lib/sync/wait/wait.go Normal file
View File

@@ -0,0 +1,37 @@
package wait
import (
"sync"
"time"
)
type Wait struct {
wg sync.WaitGroup
}
func (w *Wait)Add(delta int) {
w.wg.Add(delta)
}
func (w *Wait)Done() {
w.wg.Done()
}
func (w *Wait)Wait() {
w.wg.Wait()
}
func (w *Wait)WaitWithTimeout(timeout time.Duration)bool {
c := make(chan bool)
go func() {
defer close(c)
w.wg.Wait()
c <- true
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}

79
src/server/echo.go Normal file
View File

@@ -0,0 +1,79 @@
package server
import (
"net"
"context"
"bufio"
"github.com/HDT3213/godis/src/lib/logger"
"sync"
"io"
"github.com/HDT3213/godis/src/lib/sync/atomic"
"time"
"github.com/HDT3213/godis/src/lib/sync/wait"
)
type EchoHandler struct {
activeConn sync.Map
closing atomic.AtomicBool
}
func MakeEchoHandler()(*EchoHandler) {
return &EchoHandler{
}
}
type Client struct {
Conn net.Conn
Waiting wait.Wait
}
func (c *Client)Close()error {
c.Waiting.WaitWithTimeout(10 * time.Second)
c.Conn.Close()
return nil
}
func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) {
if h.closing.Get() {
// closing handler refuse new connection
conn.Close()
}
client := &Client {
Conn: conn,
}
h.activeConn.Store(client, 1)
reader := bufio.NewReader(conn)
for {
// may occurs: client EOF, client timeout, server early close
msg, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
logger.Info("connection close")
h.activeConn.Delete(conn)
} else {
logger.Warn(err)
}
return
}
client.Waiting.Add(1)
//logger.Info("sleeping")
//time.Sleep(10 * time.Second)
b := []byte(msg)
conn.Write(b)
client.Waiting.Done()
}
}
func (h *EchoHandler)Close()error {
logger.Info("handler shuting down...")
h.closing.Set(true)
// TODO: concurrent wait
h.activeConn.Range(func(key interface{}, val interface{})bool {
client := key.(*Client)
client.Close()
return true
})
return nil
}

62
src/server/server.go Normal file
View File

@@ -0,0 +1,62 @@
package server
import (
"net"
"fmt"
"github.com/HDT3213/godis/src/interface/tcp"
"time"
"context"
"github.com/HDT3213/godis/src/lib/logger"
"os"
"os/signal"
"syscall"
"github.com/HDT3213/godis/src/lib/sync/atomic"
)
type Config struct {
Address string `yaml:"address"`
MaxConnect uint32 `yaml:"max-connect"`
Timeout time.Duration `yaml:"timeout"`
}
func ListenAndServe(cfg *Config, handler tcp.Handler) {
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
logger.Fatal(fmt.Sprintf("listen err: %v", err))
}
// listen signal
var closing atomic.AtomicBool
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
go func() {
sig := <-sigCh
switch sig {
case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
logger.Info("shuting down...")
closing.Set(true)
listener.Close() // listener.Accept() will return err immediately
}
}()
// listen port
logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
// closing listener than closing handler while shuting down
defer handler.Close()
defer listener.Close() // close listener during unexpected error
ctx, _ := context.WithCancel(context.Background())
for {
conn, err := listener.Accept()
if err != nil {
if closing.Get() {
return // handler will be closed by defer
}
logger.Error(fmt.Sprintf("accept err: %v", err))
continue
}
// handle
logger.Info("accept link")
go handler.Handle(ctx, conn)
}
}

6
vendor/vendor.json vendored Normal file
View File

@@ -0,0 +1,6 @@
{
"comment": "",
"ignore": "test",
"package": [],
"rootPath": "github.com/HDT3213/godis"
}