Init virtual sound card, serial port and serial port TCP server only once

We init only once with the first device name we acquire, so apps using these
interfaces won't have issues with the interface going down while the app is
running.
This commit is contained in:
Nonoo 2020-10-28 22:32:45 +01:00
parent cac084b67f
commit 9c8e356a21
6 changed files with 130 additions and 92 deletions

View file

@ -35,6 +35,8 @@ type audioStruct struct {
canPlay chan bool
}
var audio audioStruct
func (a *audioStruct) playLoop(deinitNeededChan, deinitFinishedChan chan bool) {
for {
select {
@ -142,7 +144,7 @@ func (a *audioStruct) loop() {
select {
case d = <-a.play:
case <-a.deinitNeededChan:
a.close()
a.closeIfNeeded()
recLoopDeinitNeededChan <- true
<-recLoopDeinitFinishedChan
@ -170,67 +172,75 @@ func (a *audioStruct) loop() {
}
}
func (a *audioStruct) init(devName string) error {
// We only init the audio once, with the first device name we acquire, so apps using the virtual sound card
// won't have issues with the interface going down while the app is running.
func (a *audioStruct) initIfNeeded(devName string) error {
bufferSizeInBits := (audioSampleRate * audioSampleBytes * 8) / 1000 * pulseAudioBufferLength.Milliseconds()
a.source.Name = "kappanhang-" + devName
a.source.Filename = "/tmp/kappanhang-" + devName + ".source"
a.source.Rate = audioSampleRate
a.source.Format = "s16le"
a.source.Channels = 1
a.source.SetProperty("device.buffering.buffer_size", bufferSizeInBits)
a.source.SetProperty("device.description", "kappanhang: "+devName)
if !a.source.IsOpen() {
a.source.Name = "kappanhang-" + devName
a.source.Filename = "/tmp/kappanhang-" + devName + ".source"
a.source.Rate = audioSampleRate
a.source.Format = "s16le"
a.source.Channels = 1
a.source.SetProperty("device.buffering.buffer_size", bufferSizeInBits)
a.source.SetProperty("device.description", "kappanhang: "+devName)
// Cleanup previous pipes.
sources, err := papipes.GetActiveSources()
if err == nil {
for _, i := range sources {
if i.Filename == a.source.Filename {
i.Close()
// Cleanup previous pipes.
sources, err := papipes.GetActiveSources()
if err == nil {
for _, i := range sources {
if i.Filename == a.source.Filename {
i.Close()
}
}
}
if err := a.source.Open(); err != nil {
return err
}
}
a.sink.Name = "kappanhang-" + devName
a.sink.Filename = "/tmp/kappanhang-" + devName + ".sink"
a.sink.Rate = audioSampleRate
a.sink.Format = "s16le"
a.sink.Channels = 1
a.sink.UseSystemClockForTiming = true
a.sink.SetProperty("device.buffering.buffer_size", bufferSizeInBits)
a.sink.SetProperty("device.description", "kappanhang: "+devName)
if !a.sink.IsOpen() {
a.sink.Name = "kappanhang-" + devName
a.sink.Filename = "/tmp/kappanhang-" + devName + ".sink"
a.sink.Rate = audioSampleRate
a.sink.Format = "s16le"
a.sink.Channels = 1
a.sink.UseSystemClockForTiming = true
a.sink.SetProperty("device.buffering.buffer_size", bufferSizeInBits)
a.sink.SetProperty("device.description", "kappanhang: "+devName)
// Cleanup previous pipes.
sinks, err := papipes.GetActiveSinks()
if err == nil {
for _, i := range sinks {
if i.Filename == a.sink.Filename {
i.Close()
// Cleanup previous pipes.
sinks, err := papipes.GetActiveSinks()
if err == nil {
for _, i := range sinks {
if i.Filename == a.sink.Filename {
i.Close()
}
}
}
if err := a.sink.Open(); err != nil {
return err
}
}
if err := a.source.Open(); err != nil {
return err
if a.playBuf == nil {
log.Print("opened device " + a.source.Name)
a.playBuf = bytes.NewBuffer([]byte{})
a.play = make(chan []byte)
a.canPlay = make(chan bool)
a.rec = make(chan []byte)
a.deinitNeededChan = make(chan bool)
a.deinitFinishedChan = make(chan bool)
go a.loop()
}
if err := a.sink.Open(); err != nil {
return err
}
log.Print("opened device " + a.source.Name)
a.playBuf = bytes.NewBuffer([]byte{})
a.play = make(chan []byte)
a.canPlay = make(chan bool)
a.rec = make(chan []byte)
a.deinitNeededChan = make(chan bool)
a.deinitFinishedChan = make(chan bool)
go a.loop()
return nil
}
func (a *audioStruct) close() {
func (a *audioStruct) closeIfNeeded() {
if a.source.IsOpen() {
if err := a.source.Close(); err != nil {
if _, ok := err.(*os.PathError); !ok {
@ -249,7 +259,7 @@ func (a *audioStruct) close() {
}
func (a *audioStruct) deinit() {
a.close()
a.closeIfNeeded()
if a.deinitNeededChan != nil {
a.deinitNeededChan <- true

View file

@ -13,8 +13,6 @@ const audioRxSeqBufLength = 120 * time.Millisecond
type audioStream struct {
common streamCommon
audio audioStruct
deinitNeededChan chan bool
deinitFinishedChan chan bool
@ -86,7 +84,7 @@ func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) {
s.lastReceivedSeq = gotSeq
s.receivedAudio = true
s.audio.play <- e.data
audio.play <- e.data
}
// var drop int
@ -141,7 +139,7 @@ func (s *audioStream) loop() {
reportError(errors.New("audio stream timeout, try rebooting the radio"))
case e := <-s.rxSeqBufEntryChan:
s.handleRxSeqBufEntry(e)
case d := <-s.audio.rec:
case d := <-audio.rec:
if err := s.sendPart1(d[:1364]); err != nil {
reportError(err)
}
@ -160,7 +158,7 @@ func (s *audioStream) init(devName string) error {
return err
}
if err := s.audio.init(devName); err != nil {
if err := audio.initIfNeeded(devName); err != nil {
return err
}
@ -195,5 +193,4 @@ func (s *audioStream) deinit() {
}
s.common.deinit()
s.rxSeqBuf.deinit()
s.audio.deinit()
}

View file

@ -110,6 +110,10 @@ func main() {
}
}
audio.deinit()
serialTCPSrv.deinit()
serialPort.deinit()
log.Print("exiting")
os.Exit(exitCode)
}

View file

@ -23,8 +23,9 @@ type serialPortStruct struct {
write chan []byte
}
var serialPort serialPortStruct
func (s *serialPortStruct) writeLoop() {
s.write = make(chan []byte)
var b []byte
for {
select {
@ -47,7 +48,6 @@ func (s *serialPortStruct) writeLoop() {
}
func (s *serialPortStruct) readLoop() {
s.read = make(chan []byte)
for {
b := make([]byte, maxSerialFrameLength)
n, err := s.pty.Master.Read(b)
@ -66,7 +66,20 @@ func (s *serialPortStruct) readLoop() {
}
}
func (s *serialPortStruct) init(devName string) (err error) {
// We only init the virtual serial port once, with the first device name we acquire, so apps using the
// virtual serial port won't have issues with the interface going down while the app is running.
func (s *serialPortStruct) initIfNeeded(devName string) (err error) {
if s.pty != nil {
// Depleting channel which may contain data while the serial connection to the server was offline.
for {
select {
case <-s.read:
default:
return
}
}
}
s.pty, err = term.OpenPTY()
if err != nil {
return err
@ -94,6 +107,9 @@ func (s *serialPortStruct) init(devName string) (err error) {
}
log.Print("opened ", n, " as ", s.symlink)
s.write = make(chan []byte)
s.read = make(chan []byte)
s.readLoopDeinitNeededChan = make(chan bool)
s.readLoopDeinitFinishedChan = make(chan bool)
go s.readLoop()

View file

@ -12,9 +12,6 @@ const serialRxSeqBufLength = 120 * time.Millisecond
type serialStream struct {
common streamCommon
serialPort serialPortStruct
tcpsrv serialTCPSrv
sendSeq uint16
rxSeqBuf seqBuf
@ -95,11 +92,11 @@ func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) {
e.data = e.data[21:]
if s.serialPort.write != nil {
s.serialPort.write <- e.data
if serialPort.write != nil {
serialPort.write <- e.data
}
if s.tcpsrv.toClient != nil {
s.tcpsrv.toClient <- e.data
if serialTCPSrv.isClientConnected() {
serialTCPSrv.toClient <- e.data
}
}
@ -175,7 +172,7 @@ func (s *serialStream) loop() {
if enableSerialDevice {
for {
select {
case r := <-s.serialPort.read:
case r := <-serialPort.read:
s.gotDataForRadio(r)
case r := <-s.common.readChan:
@ -184,7 +181,7 @@ func (s *serialStream) loop() {
}
case e := <-s.rxSeqBufEntryChan:
s.handleRxSeqBufEntry(e)
case r := <-s.tcpsrv.fromClient:
case r := <-serialTCPSrv.fromClient:
s.gotDataForRadio(r)
case <-s.readFromSerialPort.frameTimeout.C:
s.readFromSerialPort.buf.Reset()
@ -203,7 +200,7 @@ func (s *serialStream) loop() {
}
case e := <-s.rxSeqBufEntryChan:
s.handleRxSeqBufEntry(e)
case r := <-s.tcpsrv.fromClient:
case r := <-serialTCPSrv.fromClient:
s.gotDataForRadio(r)
case <-s.readFromSerialPort.frameTimeout.C:
s.readFromSerialPort.buf.Reset()
@ -222,10 +219,13 @@ func (s *serialStream) init(devName string) error {
}
if enableSerialDevice {
if err := s.serialPort.init(devName); err != nil {
if err := serialPort.initIfNeeded(devName); err != nil {
return err
}
}
if err := serialTCPSrv.initIfNeeded(); err != nil {
return err
}
if err := s.common.start(); err != nil {
return err
@ -241,10 +241,6 @@ func (s *serialStream) init(devName string) error {
log.Print("stream started")
if err := s.tcpsrv.start(); err != nil {
return err
}
s.rxSeqBufEntryChan = make(chan seqBufEntry)
s.rxSeqBuf.init(serialRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan)
@ -263,9 +259,6 @@ func (s *serialStream) deinit() {
_ = s.sendOpenClose(true)
}
s.tcpsrv.stop()
s.serialPort.deinit()
if s.deinitNeededChan != nil {
s.deinitNeededChan <- true
<-s.deinitFinishedChan

View file

@ -6,7 +6,7 @@ import (
"net"
)
type serialTCPSrv struct {
type serialTCPSrvStruct struct {
listener net.Listener
client net.Conn
@ -16,15 +16,17 @@ type serialTCPSrv struct {
writeLoopDeinitNeededChan chan bool
writeLoopDeinitFinishedChan chan bool
deinitNeededChan chan bool
deinitFinishedChan chan bool
}
func (s *serialTCPSrv) writeLoop(errChan chan error) {
s.toClient = make(chan []byte)
defer func() {
s.toClient = nil
}()
var serialTCPSrv serialTCPSrvStruct
func (s *serialTCPSrvStruct) isClientConnected() bool {
return s.writeLoopDeinitNeededChan != nil
}
func (s *serialTCPSrvStruct) writeLoop(errChan chan error) {
var b []byte
for {
select {
@ -38,13 +40,14 @@ func (s *serialTCPSrv) writeLoop(errChan chan error) {
written, err := s.client.Write(b)
if err != nil {
errChan <- err
break
}
b = b[written:]
}
}
}
func (s *serialTCPSrv) disconnectClient() {
func (s *serialTCPSrvStruct) disconnectClient() {
if s.client != nil {
s.client.Close()
}
@ -57,7 +60,7 @@ func (s *serialTCPSrv) disconnectClient() {
}
}
func (s *serialTCPSrv) loop() {
func (s *serialTCPSrvStruct) loop() {
for {
var err error
s.client, err = s.listener.Accept()
@ -66,8 +69,8 @@ func (s *serialTCPSrv) loop() {
if err != io.EOF {
reportError(err)
}
s.listener.Close()
s.disconnectClient()
<-s.deinitNeededChan
s.deinitFinishedChan <- true
return
}
@ -91,15 +94,32 @@ func (s *serialTCPSrv) loop() {
case s.fromClient <- b[:n]:
case <-writeErrChan:
connected = false
case <-s.deinitNeededChan:
s.disconnectClient()
s.deinitFinishedChan <- true
return
}
}
s.client.Close()
s.disconnectClient()
log.Print("client ", s.client.RemoteAddr().String(), " disconnected")
}
}
func (s *serialTCPSrv) start() (err error) {
// We only init the serial port TCP server once, with the first device name we acquire, so apps using the
// serial port TCP server won't have issues with the interface going down while the app is running.
func (s *serialTCPSrvStruct) initIfNeeded() (err error) {
if s.listener != nil {
// Depleting channel which may contain data while the serial connection to the server was offline.
for {
select {
case <-s.fromClient:
default:
return
}
}
}
s.listener, err = net.Listen("tcp", fmt.Sprint(":", serialTCPPort))
if err != nil {
fmt.Println(err)
@ -109,22 +129,20 @@ func (s *serialTCPSrv) start() (err error) {
log.Print("exposing serial port on tcp port ", serialTCPPort)
s.fromClient = make(chan []byte)
s.toClient = make(chan []byte)
s.deinitNeededChan = make(chan bool)
s.deinitFinishedChan = make(chan bool)
go s.loop()
return
}
func (s *serialTCPSrv) stop() {
func (s *serialTCPSrvStruct) deinit() {
if s.listener != nil {
s.listener.Close()
}
s.disconnectClient()
if s.fromClient != nil {
close(s.fromClient)
}
if s.deinitFinishedChan != nil {
<-s.deinitFinishedChan
}
s.deinitNeededChan <- true
<-s.deinitFinishedChan
}