Initial commit, pt. 49

This commit is contained in:
Dmitrii Okunev
2024-07-21 02:39:00 +01:00
parent 710a3bbe51
commit 53954a6a0d
5 changed files with 27 additions and 3 deletions

View File

@@ -94,7 +94,7 @@ func runSplitProcesses(
f := getFork(procName) f := getFork(procName)
if f != nil { if f != nil {
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
//f.Process.Kill() f.Process.Kill()
logger.Debugf(ctx, "waiting for process '%s' to die", procName) logger.Debugf(ctx, "waiting for process '%s' to die", procName)
f.Wait() f.Wait()
} }
@@ -203,4 +203,16 @@ func setReady(
if err != nil { if err != nil {
logger.Fatal(ctx, err) logger.Fatal(ctx, err)
} }
err = mainProcess.ReadOne(
ctx,
func(ctx context.Context, source mainprocess.ProcessName, content any) error {
_, ok := content.(mainprocess.MessageReadyConfirmed)
if !ok {
return fmt.Errorf("got unexpected type '%T' instead of %T", content, mainprocess.MessageReadyConfirmed{})
}
return nil
},
)
assertNoError(err)
} }

View File

@@ -115,6 +115,7 @@ func runStreamd(
} }
if mainProcess != nil { if mainProcess != nil {
setReady(ctx, mainProcess)
go func() { go func() {
err := mainProcess.Serve( err := mainProcess.Serve(
ctx, ctx,
@@ -135,7 +136,6 @@ func runStreamd(
) )
logger.Fatalf(ctx, "communication (with the main process) error: %v", err) logger.Fatalf(ctx, "communication (with the main process) error: %v", err)
}() }()
setReady(ctx, mainProcess)
} }
<-ctx.Done() <-ctx.Done()

View File

@@ -24,6 +24,7 @@ func NewClient(
return nil, fmt.Errorf("unable to connect to '%s': %w", addr, err) return nil, fmt.Errorf("unable to connect to '%s': %w", addr, err)
} }
logger.Default().Tracef("connected to '%s' from '%s'", conn.RemoteAddr(), conn.LocalAddr()) logger.Default().Tracef("connected to '%s' from '%s'", conn.RemoteAddr(), conn.LocalAddr())
conn.(*net.TCPConn).SetNoDelay(true)
msg := RegistrationMessage{ msg := RegistrationMessage{
Password: password, Password: password,

View File

@@ -16,6 +16,7 @@ import (
func init() { func init() {
gob.Register(RegistrationMessage{}) gob.Register(RegistrationMessage{})
gob.Register(RegistrationResult{}) gob.Register(RegistrationResult{})
gob.Register(MessageReadyConfirmed{})
gob.Register(MessageReady{}) gob.Register(MessageReady{})
gob.Register(MessageFromMain{}) gob.Register(MessageFromMain{})
gob.Register(MessageToMain{}) gob.Register(MessageToMain{})
@@ -142,6 +143,7 @@ func (m *Manager) Serve(
return fmt.Errorf("unable to accept connection: %w", err) return fmt.Errorf("unable to accept connection: %w", err)
} }
logger.Tracef(ctx, "accepted a connection from '%s'", conn.RemoteAddr()) logger.Tracef(ctx, "accepted a connection from '%s'", conn.RemoteAddr())
conn.(*net.TCPConn).SetNoDelay(true)
m.addNewConnection(ctx, conn, onReceivedMessage) m.addNewConnection(ctx, conn, onReceivedMessage)
} }
@@ -292,7 +294,10 @@ func (m *Manager) processMessage(
logger.Tracef(ctx, "got a message to the main process from '%s': %#+v", source, message.Content) logger.Tracef(ctx, "got a message to the main process from '%s': %#+v", source, message.Content)
switch message.Content.(type) { switch message.Content.(type) {
case MessageReady: case MessageReady:
return m.setReady(source, conn) var result *multierror.Error
result = multierror.Append(result, m.SendMessagePreReady(ctx, source, MessageReadyConfirmed{}))
result = multierror.Append(result, m.setReady(source, conn))
return result.ErrorOrNil()
default: default:
return onReceivedMessage(ctx, source, message.Content) return onReceivedMessage(ctx, source, message.Content)
} }
@@ -520,3 +525,4 @@ func (m *Manager) SendMessage(
} }
type MessageReady struct{} type MessageReady struct{}
type MessageReadyConfirmed struct{}

View File

@@ -37,6 +37,9 @@ func Test(t *testing.T) {
callCount := map[string]int{} callCount := map[string]int{}
handleCall := func(procName string, content any) { handleCall := func(procName string, content any) {
if _, ok := content.(MessageReadyConfirmed); ok {
return
}
logger.Tracef(ctx, "handleCall('%s', %#+v)", procName, content) logger.Tracef(ctx, "handleCall('%s', %#+v)", procName, content)
count := callCount[procName] count := callCount[procName]
count++ count++
@@ -70,6 +73,7 @@ func Test(t *testing.T) {
handleCall("child0", content) handleCall("child0", content)
return nil return nil
}) })
c0.SendMessage(ctx, "main", MessageReady{})
c1, err := NewClient("child1", m.Addr().String(), m.Password()) c1, err := NewClient("child1", m.Addr().String(), m.Password())
require.NoError(t, err) require.NoError(t, err)
@@ -78,6 +82,7 @@ func Test(t *testing.T) {
handleCall("child1", content) handleCall("child1", content)
return nil return nil
}) })
c1.SendMessage(ctx, "main", MessageReady{})
_, err = NewClient("child2", m.Addr().String(), m.Password()) _, err = NewClient("child2", m.Addr().String(), m.Password())
require.Error(t, err) require.Error(t, err)