From 9c8e356a21e417002cfd7fe059d07df585f93be7 Mon Sep 17 00:00:00 2001 From: Nonoo Date: Wed, 28 Oct 2020 22:32:45 +0100 Subject: [PATCH] Init virtual sound card, serial port and serial port TCP server only once We init only once with the first device name we acquire, so apps using these interfaces won't have issues with the interface going down while the app is running. --- audio-linux.go | 104 ++++++++++++++++++++++++++---------------------- audiostream.go | 9 ++--- main.go | 4 ++ serial-linux.go | 22 ++++++++-- serialstream.go | 29 +++++--------- serialtcpsrv.go | 54 ++++++++++++++++--------- 6 files changed, 130 insertions(+), 92 deletions(-) diff --git a/audio-linux.go b/audio-linux.go index 9eaa6ce..b277c06 100644 --- a/audio-linux.go +++ b/audio-linux.go @@ -35,6 +35,8 @@ type audioStruct struct { canPlay chan bool } +var audio audioStruct + func (a *audioStruct) playLoop(deinitNeededChan, deinitFinishedChan chan bool) { for { select { @@ -142,7 +144,7 @@ func (a *audioStruct) loop() { select { case d = <-a.play: case <-a.deinitNeededChan: - a.close() + a.closeIfNeeded() recLoopDeinitNeededChan <- true <-recLoopDeinitFinishedChan @@ -170,67 +172,75 @@ func (a *audioStruct) loop() { } } -func (a *audioStruct) init(devName string) error { +// We only init the audio once, with the first device name we acquire, so apps using the virtual sound card +// won't have issues with the interface going down while the app is running. +func (a *audioStruct) initIfNeeded(devName string) error { bufferSizeInBits := (audioSampleRate * audioSampleBytes * 8) / 1000 * pulseAudioBufferLength.Milliseconds() - a.source.Name = "kappanhang-" + devName - a.source.Filename = "/tmp/kappanhang-" + devName + ".source" - a.source.Rate = audioSampleRate - a.source.Format = "s16le" - a.source.Channels = 1 - a.source.SetProperty("device.buffering.buffer_size", bufferSizeInBits) - a.source.SetProperty("device.description", "kappanhang: "+devName) + if !a.source.IsOpen() { + a.source.Name = "kappanhang-" + devName + a.source.Filename = "/tmp/kappanhang-" + devName + ".source" + a.source.Rate = audioSampleRate + a.source.Format = "s16le" + a.source.Channels = 1 + a.source.SetProperty("device.buffering.buffer_size", bufferSizeInBits) + a.source.SetProperty("device.description", "kappanhang: "+devName) - // Cleanup previous pipes. - sources, err := papipes.GetActiveSources() - if err == nil { - for _, i := range sources { - if i.Filename == a.source.Filename { - i.Close() + // Cleanup previous pipes. + sources, err := papipes.GetActiveSources() + if err == nil { + for _, i := range sources { + if i.Filename == a.source.Filename { + i.Close() + } } } + + if err := a.source.Open(); err != nil { + return err + } } - a.sink.Name = "kappanhang-" + devName - a.sink.Filename = "/tmp/kappanhang-" + devName + ".sink" - a.sink.Rate = audioSampleRate - a.sink.Format = "s16le" - a.sink.Channels = 1 - a.sink.UseSystemClockForTiming = true - a.sink.SetProperty("device.buffering.buffer_size", bufferSizeInBits) - a.sink.SetProperty("device.description", "kappanhang: "+devName) + if !a.sink.IsOpen() { + a.sink.Name = "kappanhang-" + devName + a.sink.Filename = "/tmp/kappanhang-" + devName + ".sink" + a.sink.Rate = audioSampleRate + a.sink.Format = "s16le" + a.sink.Channels = 1 + a.sink.UseSystemClockForTiming = true + a.sink.SetProperty("device.buffering.buffer_size", bufferSizeInBits) + a.sink.SetProperty("device.description", "kappanhang: "+devName) - // Cleanup previous pipes. - sinks, err := papipes.GetActiveSinks() - if err == nil { - for _, i := range sinks { - if i.Filename == a.sink.Filename { - i.Close() + // Cleanup previous pipes. + sinks, err := papipes.GetActiveSinks() + if err == nil { + for _, i := range sinks { + if i.Filename == a.sink.Filename { + i.Close() + } } } + + if err := a.sink.Open(); err != nil { + return err + } } - if err := a.source.Open(); err != nil { - return err + if a.playBuf == nil { + log.Print("opened device " + a.source.Name) + + a.playBuf = bytes.NewBuffer([]byte{}) + a.play = make(chan []byte) + a.canPlay = make(chan bool) + a.rec = make(chan []byte) + a.deinitNeededChan = make(chan bool) + a.deinitFinishedChan = make(chan bool) + go a.loop() } - - if err := a.sink.Open(); err != nil { - return err - } - - log.Print("opened device " + a.source.Name) - - a.playBuf = bytes.NewBuffer([]byte{}) - a.play = make(chan []byte) - a.canPlay = make(chan bool) - a.rec = make(chan []byte) - a.deinitNeededChan = make(chan bool) - a.deinitFinishedChan = make(chan bool) - go a.loop() return nil } -func (a *audioStruct) close() { +func (a *audioStruct) closeIfNeeded() { if a.source.IsOpen() { if err := a.source.Close(); err != nil { if _, ok := err.(*os.PathError); !ok { @@ -249,7 +259,7 @@ func (a *audioStruct) close() { } func (a *audioStruct) deinit() { - a.close() + a.closeIfNeeded() if a.deinitNeededChan != nil { a.deinitNeededChan <- true diff --git a/audiostream.go b/audiostream.go index 76849b1..6fb3a96 100644 --- a/audiostream.go +++ b/audiostream.go @@ -13,8 +13,6 @@ const audioRxSeqBufLength = 120 * time.Millisecond type audioStream struct { common streamCommon - audio audioStruct - deinitNeededChan chan bool deinitFinishedChan chan bool @@ -86,7 +84,7 @@ func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) { s.lastReceivedSeq = gotSeq s.receivedAudio = true - s.audio.play <- e.data + audio.play <- e.data } // var drop int @@ -141,7 +139,7 @@ func (s *audioStream) loop() { reportError(errors.New("audio stream timeout, try rebooting the radio")) case e := <-s.rxSeqBufEntryChan: s.handleRxSeqBufEntry(e) - case d := <-s.audio.rec: + case d := <-audio.rec: if err := s.sendPart1(d[:1364]); err != nil { reportError(err) } @@ -160,7 +158,7 @@ func (s *audioStream) init(devName string) error { return err } - if err := s.audio.init(devName); err != nil { + if err := audio.initIfNeeded(devName); err != nil { return err } @@ -195,5 +193,4 @@ func (s *audioStream) deinit() { } s.common.deinit() s.rxSeqBuf.deinit() - s.audio.deinit() } diff --git a/main.go b/main.go index d8de00b..1925c99 100644 --- a/main.go +++ b/main.go @@ -110,6 +110,10 @@ func main() { } } + audio.deinit() + serialTCPSrv.deinit() + serialPort.deinit() + log.Print("exiting") os.Exit(exitCode) } diff --git a/serial-linux.go b/serial-linux.go index 0492bc1..e8c424a 100644 --- a/serial-linux.go +++ b/serial-linux.go @@ -23,8 +23,9 @@ type serialPortStruct struct { write chan []byte } +var serialPort serialPortStruct + func (s *serialPortStruct) writeLoop() { - s.write = make(chan []byte) var b []byte for { select { @@ -47,7 +48,6 @@ func (s *serialPortStruct) writeLoop() { } func (s *serialPortStruct) readLoop() { - s.read = make(chan []byte) for { b := make([]byte, maxSerialFrameLength) n, err := s.pty.Master.Read(b) @@ -66,7 +66,20 @@ func (s *serialPortStruct) readLoop() { } } -func (s *serialPortStruct) init(devName string) (err error) { +// We only init the virtual serial port once, with the first device name we acquire, so apps using the +// virtual serial port won't have issues with the interface going down while the app is running. +func (s *serialPortStruct) initIfNeeded(devName string) (err error) { + if s.pty != nil { + // Depleting channel which may contain data while the serial connection to the server was offline. + for { + select { + case <-s.read: + default: + return + } + } + } + s.pty, err = term.OpenPTY() if err != nil { return err @@ -94,6 +107,9 @@ func (s *serialPortStruct) init(devName string) (err error) { } log.Print("opened ", n, " as ", s.symlink) + s.write = make(chan []byte) + s.read = make(chan []byte) + s.readLoopDeinitNeededChan = make(chan bool) s.readLoopDeinitFinishedChan = make(chan bool) go s.readLoop() diff --git a/serialstream.go b/serialstream.go index b0e905f..741d40c 100644 --- a/serialstream.go +++ b/serialstream.go @@ -12,9 +12,6 @@ const serialRxSeqBufLength = 120 * time.Millisecond type serialStream struct { common streamCommon - serialPort serialPortStruct - tcpsrv serialTCPSrv - sendSeq uint16 rxSeqBuf seqBuf @@ -95,11 +92,11 @@ func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) { e.data = e.data[21:] - if s.serialPort.write != nil { - s.serialPort.write <- e.data + if serialPort.write != nil { + serialPort.write <- e.data } - if s.tcpsrv.toClient != nil { - s.tcpsrv.toClient <- e.data + if serialTCPSrv.isClientConnected() { + serialTCPSrv.toClient <- e.data } } @@ -175,7 +172,7 @@ func (s *serialStream) loop() { if enableSerialDevice { for { select { - case r := <-s.serialPort.read: + case r := <-serialPort.read: s.gotDataForRadio(r) case r := <-s.common.readChan: @@ -184,7 +181,7 @@ func (s *serialStream) loop() { } case e := <-s.rxSeqBufEntryChan: s.handleRxSeqBufEntry(e) - case r := <-s.tcpsrv.fromClient: + case r := <-serialTCPSrv.fromClient: s.gotDataForRadio(r) case <-s.readFromSerialPort.frameTimeout.C: s.readFromSerialPort.buf.Reset() @@ -203,7 +200,7 @@ func (s *serialStream) loop() { } case e := <-s.rxSeqBufEntryChan: s.handleRxSeqBufEntry(e) - case r := <-s.tcpsrv.fromClient: + case r := <-serialTCPSrv.fromClient: s.gotDataForRadio(r) case <-s.readFromSerialPort.frameTimeout.C: s.readFromSerialPort.buf.Reset() @@ -222,10 +219,13 @@ func (s *serialStream) init(devName string) error { } if enableSerialDevice { - if err := s.serialPort.init(devName); err != nil { + if err := serialPort.initIfNeeded(devName); err != nil { return err } } + if err := serialTCPSrv.initIfNeeded(); err != nil { + return err + } if err := s.common.start(); err != nil { return err @@ -241,10 +241,6 @@ func (s *serialStream) init(devName string) error { log.Print("stream started") - if err := s.tcpsrv.start(); err != nil { - return err - } - s.rxSeqBufEntryChan = make(chan seqBufEntry) s.rxSeqBuf.init(serialRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) @@ -263,9 +259,6 @@ func (s *serialStream) deinit() { _ = s.sendOpenClose(true) } - s.tcpsrv.stop() - s.serialPort.deinit() - if s.deinitNeededChan != nil { s.deinitNeededChan <- true <-s.deinitFinishedChan diff --git a/serialtcpsrv.go b/serialtcpsrv.go index c65a80d..87f3604 100644 --- a/serialtcpsrv.go +++ b/serialtcpsrv.go @@ -6,7 +6,7 @@ import ( "net" ) -type serialTCPSrv struct { +type serialTCPSrvStruct struct { listener net.Listener client net.Conn @@ -16,15 +16,17 @@ type serialTCPSrv struct { writeLoopDeinitNeededChan chan bool writeLoopDeinitFinishedChan chan bool + deinitNeededChan chan bool deinitFinishedChan chan bool } -func (s *serialTCPSrv) writeLoop(errChan chan error) { - s.toClient = make(chan []byte) - defer func() { - s.toClient = nil - }() +var serialTCPSrv serialTCPSrvStruct +func (s *serialTCPSrvStruct) isClientConnected() bool { + return s.writeLoopDeinitNeededChan != nil +} + +func (s *serialTCPSrvStruct) writeLoop(errChan chan error) { var b []byte for { select { @@ -38,13 +40,14 @@ func (s *serialTCPSrv) writeLoop(errChan chan error) { written, err := s.client.Write(b) if err != nil { errChan <- err + break } b = b[written:] } } } -func (s *serialTCPSrv) disconnectClient() { +func (s *serialTCPSrvStruct) disconnectClient() { if s.client != nil { s.client.Close() } @@ -57,7 +60,7 @@ func (s *serialTCPSrv) disconnectClient() { } } -func (s *serialTCPSrv) loop() { +func (s *serialTCPSrvStruct) loop() { for { var err error s.client, err = s.listener.Accept() @@ -66,8 +69,8 @@ func (s *serialTCPSrv) loop() { if err != io.EOF { reportError(err) } - s.listener.Close() s.disconnectClient() + <-s.deinitNeededChan s.deinitFinishedChan <- true return } @@ -91,15 +94,32 @@ func (s *serialTCPSrv) loop() { case s.fromClient <- b[:n]: case <-writeErrChan: connected = false + case <-s.deinitNeededChan: + s.disconnectClient() + s.deinitFinishedChan <- true + return } } - s.client.Close() + s.disconnectClient() log.Print("client ", s.client.RemoteAddr().String(), " disconnected") } } -func (s *serialTCPSrv) start() (err error) { +// We only init the serial port TCP server once, with the first device name we acquire, so apps using the +// serial port TCP server won't have issues with the interface going down while the app is running. +func (s *serialTCPSrvStruct) initIfNeeded() (err error) { + if s.listener != nil { + // Depleting channel which may contain data while the serial connection to the server was offline. + for { + select { + case <-s.fromClient: + default: + return + } + } + } + s.listener, err = net.Listen("tcp", fmt.Sprint(":", serialTCPPort)) if err != nil { fmt.Println(err) @@ -109,22 +129,20 @@ func (s *serialTCPSrv) start() (err error) { log.Print("exposing serial port on tcp port ", serialTCPPort) s.fromClient = make(chan []byte) + s.toClient = make(chan []byte) + s.deinitNeededChan = make(chan bool) s.deinitFinishedChan = make(chan bool) go s.loop() return } -func (s *serialTCPSrv) stop() { +func (s *serialTCPSrvStruct) deinit() { if s.listener != nil { s.listener.Close() } s.disconnectClient() - if s.fromClient != nil { - close(s.fromClient) - } - if s.deinitFinishedChan != nil { - <-s.deinitFinishedChan - } + s.deinitNeededChan <- true + <-s.deinitFinishedChan }