Add support for restarting the whole process on failure

This commit is contained in:
Nonoo 2020-10-23 14:00:59 +02:00
parent 9dbd5760ce
commit f79d23f755
10 changed files with 677 additions and 310 deletions

View file

@ -14,6 +14,9 @@ type audioStruct struct {
source papipes.Source source papipes.Source
sink papipes.Sink sink papipes.Sink
deinitNeededChan chan bool
deinitFinishedChan chan bool
// Send to this channel to play audio. // Send to this channel to play audio.
play chan []byte play chan []byte
// Read from this channel for audio. // Read from this channel for audio.
@ -24,11 +27,14 @@ type audioStruct struct {
canPlay chan bool canPlay chan bool
} }
var audio audioStruct func (a *audioStruct) playLoop(deinitNeededChan, deinitFinishedChan chan bool) {
func (a *audioStruct) playLoop() {
for { for {
<-a.canPlay select {
case <-a.canPlay:
case <-deinitNeededChan:
deinitFinishedChan <- true
return
}
for { for {
a.mutex.Lock() a.mutex.Lock()
@ -53,9 +59,8 @@ func (a *audioStruct) playLoop() {
written, err := a.source.Write(d) written, err := a.source.Write(d)
if err != nil { if err != nil {
if _, ok := err.(*os.PathError); !ok { if _, ok := err.(*os.PathError); !ok {
exit(err) reportError(err)
} }
return
} }
bytesToWrite -= written bytesToWrite -= written
if bytesToWrite == 0 { if bytesToWrite == 0 {
@ -67,17 +72,26 @@ func (a *audioStruct) playLoop() {
} }
} }
func (a *audioStruct) recLoop() { func (a *audioStruct) recLoop(deinitNeededChan, deinitFinishedChan chan bool) {
defer func() {
deinitFinishedChan <- true
}()
frameBuf := make([]byte, 1920) frameBuf := make([]byte, 1920)
buf := bytes.NewBuffer([]byte{}) buf := bytes.NewBuffer([]byte{})
for { for {
select {
case <-deinitNeededChan:
return
default:
}
n, err := a.sink.Read(frameBuf) n, err := a.sink.Read(frameBuf)
if err != nil { if err != nil {
if _, ok := err.(*os.PathError); !ok { if _, ok := err.(*os.PathError); !ok {
exit(err) reportError(err)
} }
return
} }
buf.Write(frameBuf[:n]) buf.Write(frameBuf[:n])
@ -85,26 +99,48 @@ func (a *audioStruct) recLoop() {
for buf.Len() >= len(frameBuf) { for buf.Len() >= len(frameBuf) {
n, err = buf.Read(frameBuf) n, err = buf.Read(frameBuf)
if err != nil { if err != nil {
exit(err) reportError(err)
} }
if n != len(frameBuf) { if n != len(frameBuf) {
exit(errors.New("audio buffer read error")) reportError(errors.New("audio buffer read error"))
}
select {
case a.rec <- frameBuf:
case <-deinitNeededChan:
return
} }
a.rec <- frameBuf
} }
} }
} }
func (a *audioStruct) loop() { func (a *audioStruct) loop() {
go a.playLoop() playLoopDeinitNeededChan := make(chan bool)
go a.recLoop() playLoopDeinitFinishedChan := make(chan bool)
go a.playLoop(playLoopDeinitNeededChan, playLoopDeinitFinishedChan)
recLoopDeinitNeededChan := make(chan bool)
recLoopDeinitFinishedChan := make(chan bool)
go a.recLoop(recLoopDeinitNeededChan, recLoopDeinitFinishedChan)
var d []byte
for { for {
d := <-a.play select {
case d = <-a.play:
case <-a.deinitNeededChan:
recLoopDeinitNeededChan <- true
<-recLoopDeinitFinishedChan
playLoopDeinitNeededChan <- true
<-playLoopDeinitFinishedChan
a.deinitFinishedChan <- true
return
}
a.mutex.Lock() a.mutex.Lock()
a.playBuf.Write(d) a.playBuf.Write(d)
a.mutex.Unlock() a.mutex.Unlock()
// Non-blocking notify.
select { select {
case a.canPlay <- true: case a.canPlay <- true:
default: default:
@ -112,17 +148,17 @@ func (a *audioStruct) loop() {
} }
} }
func (a *audioStruct) init(devName string) { func (a *audioStruct) init(devName string) error {
a.source.Name = "kappanhang" a.source.Name = "kappanhang-" + devName
a.source.Filename = "/tmp/kappanhang.source" a.source.Filename = "/tmp/kappanhang-" + devName + ".source"
a.source.Rate = 48000 a.source.Rate = 48000
a.source.Format = "s16le" a.source.Format = "s16le"
a.source.Channels = 1 a.source.Channels = 1
a.source.SetProperty("device.buffering.buffer_size", (48000*16)/10) // 100 ms a.source.SetProperty("device.buffering.buffer_size", (48000*16)/10) // 100 ms
a.source.SetProperty("device.description", "kappanhang: "+devName) a.source.SetProperty("device.description", "kappanhang: "+devName)
a.sink.Name = "kappanhang" a.sink.Name = "kappanhang-" + devName
a.sink.Filename = "/tmp/kappanhang.sink" a.sink.Filename = "/tmp/kappanhang-" + devName + ".sink"
a.sink.Rate = 48000 a.sink.Rate = 48000
a.sink.Format = "s16le" a.sink.Format = "s16le"
a.sink.Channels = 1 a.sink.Channels = 1
@ -130,18 +166,21 @@ func (a *audioStruct) init(devName string) {
a.sink.SetProperty("device.description", "kappanhang: "+devName) a.sink.SetProperty("device.description", "kappanhang: "+devName)
if err := a.source.Open(); err != nil { if err := a.source.Open(); err != nil {
exit(err) return err
} }
if err := a.sink.Open(); err != nil { if err := a.sink.Open(); err != nil {
exit(err) return err
} }
a.playBuf = bytes.NewBuffer([]byte{}) a.playBuf = bytes.NewBuffer([]byte{})
a.play = make(chan []byte) a.play = make(chan []byte)
a.canPlay = make(chan bool) a.canPlay = make(chan bool)
a.rec = make(chan []byte) a.rec = make(chan []byte)
a.deinitNeededChan = make(chan bool)
a.deinitFinishedChan = make(chan bool)
go a.loop() go a.loop()
return nil
} }
func (a *audioStruct) deinit() { func (a *audioStruct) deinit() {
@ -160,4 +199,9 @@ func (a *audioStruct) deinit() {
} }
} }
} }
if a.deinitNeededChan != nil {
a.deinitNeededChan <- true
<-a.deinitFinishedChan
}
} }

View file

@ -15,6 +15,11 @@ const rxSeqBufLength = 100 * time.Millisecond
type audioStream struct { type audioStream struct {
common streamCommon common streamCommon
audio audioStruct
deinitNeededChan chan bool
deinitFinishedChan chan bool
timeoutTimer *time.Timer timeoutTimer *time.Timer
receivedAudio bool receivedAudio bool
lastReceivedAudioSeq uint16 lastReceivedAudioSeq uint16
@ -24,41 +29,50 @@ type audioStream struct {
audioSendSeq uint16 audioSendSeq uint16
} }
func (s *audioStream) sendDisconnect() {
s.common.sendDisconnect()
}
// sendPart1 expects 1364 bytes of PCM data. // sendPart1 expects 1364 bytes of PCM data.
func (s *audioStream) sendPart1(pcmData []byte) { func (s *audioStream) sendPart1(pcmData []byte) error {
s.common.send(append([]byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), err := s.common.send(append([]byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))}, 0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))},
pcmData...)) pcmData...))
if err != nil {
return err
}
s.audioSendSeq++ s.audioSendSeq++
return nil
} }
// sendPart2 expects 556 bytes of PCM data. // sendPart2 expects 556 bytes of PCM data.
func (s *audioStream) sendPart2(pcmData []byte) { func (s *audioStream) sendPart2(pcmData []byte) error {
s.common.send(append([]byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8), err := s.common.send(append([]byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00, byte(s.audioSendSeq), byte(s.audioSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID), byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID),
0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))}, 0x80, 0x00, byte((s.audioSendSeq - 1) >> 8), byte(s.audioSendSeq - 1), 0x00, 0x00, byte(len(pcmData) >> 8), byte(len(pcmData))},
pcmData...)) pcmData...))
if err != nil {
return err
}
s.audioSendSeq++ s.audioSendSeq++
return nil
} }
func (s *audioStream) sendRetransmitRequest(seqNum uint16) { func (s *audioStream) sendRetransmitRequest(seqNum uint16) error {
p := []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00, byte(seqNum), byte(seqNum >> 8), p := []byte{0x10, 0x00, 0x00, 0x00, 0x01, 0x00, byte(seqNum), byte(seqNum >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)} byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)}
s.common.send(p) if err := s.common.send(p); err != nil {
s.common.send(p) return err
}
if err := s.common.send(p); err != nil {
return err
}
return nil
} }
type seqNumRange [2]uint16 type seqNumRange [2]uint16
func (s *audioStream) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) { func (s *audioStream) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) error {
seqNumBytes := make([]byte, len(seqNumRanges)*4) seqNumBytes := make([]byte, len(seqNumRanges)*4)
for i := 0; i < len(seqNumRanges); i++ { for i := 0; i < len(seqNumRanges); i++ {
seqNumBytes[i*2] = byte(seqNumRanges[i][0]) seqNumBytes[i*2] = byte(seqNumRanges[i][0])
@ -70,8 +84,13 @@ func (s *audioStream) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange)
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)}, byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)},
seqNumBytes...) seqNumBytes...)
s.common.send(p) if err := s.common.send(p); err != nil {
s.common.send(p) return err
}
if err := s.common.send(p); err != nil {
return err
}
return nil
} }
func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) { func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) {
@ -91,7 +110,7 @@ func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) {
s.lastReceivedAudioSeq = gotSeq s.lastReceivedAudioSeq = gotSeq
s.receivedAudio = true s.receivedAudio = true
audio.play <- e.data s.audio.play <- e.data
} }
func (s *audioStream) handleAudioPacket(r []byte) { func (s *audioStream) handleAudioPacket(r []byte) {
@ -113,19 +132,46 @@ func (s *audioStream) handleRead(r []byte) {
} }
} }
func (s *audioStream) init() { func (s *audioStream) loop() {
s.common.open("audio", 50003) for {
s.rxSeqBufEntryChan = make(chan seqBufEntry) select {
s.rxSeqBuf.init(rxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) case r := <-s.common.readChan:
s.handleRead(r)
case <-s.timeoutTimer.C:
reportError(errors.New("audio stream timeout"))
case e := <-s.rxSeqBufEntryChan:
s.handleRxSeqBufEntry(e)
case d := <-s.audio.rec:
if err := s.sendPart1(d[:1364]); err != nil {
reportError(err)
}
if err := s.sendPart2(d[1364:1920]); err != nil {
reportError(err)
}
case <-s.deinitNeededChan:
s.deinitFinishedChan <- true
return
}
}
} }
func (s *audioStream) start(devName string) { func (s *audioStream) start(devName string) error {
s.common.sendPkt3() if err := s.audio.init(devName); err != nil {
s.common.waitForPkt4Answer() return err
s.common.sendPkt6() }
s.common.waitForPkt6Answer()
audio.init(devName) if err := s.common.sendPkt3(); err != nil {
return err
}
if err := s.common.waitForPkt4Answer(); err != nil {
return err
}
if err := s.common.sendPkt6(); err != nil {
return err
}
if err := s.common.waitForPkt6Answer(); err != nil {
return err
}
log.Print("stream started") log.Print("stream started")
@ -135,17 +181,30 @@ func (s *audioStream) start(devName string) {
s.audioSendSeq = 1 s.audioSendSeq = 1
for { s.deinitNeededChan = make(chan bool)
select { s.deinitFinishedChan = make(chan bool)
case r := <-s.common.readChan: go s.loop()
s.handleRead(r) return nil
case <-s.timeoutTimer.C: }
exit(errors.New("audio stream timeout"))
case e := <-s.rxSeqBufEntryChan: func (s *audioStream) init() error {
s.handleRxSeqBufEntry(e) if err := s.common.init("audio", 50003); err != nil {
case d := <-audio.rec: return err
s.sendPart1(d[:1364]) }
s.sendPart2(d[1364:1920]) s.rxSeqBufEntryChan = make(chan seqBufEntry)
} s.rxSeqBuf.init(rxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan)
} return nil
}
func (s *audioStream) deinit() {
if s.deinitNeededChan != nil {
s.deinitNeededChan <- true
<-s.deinitFinishedChan
}
if s.timeoutTimer != nil {
s.timeoutTimer.Stop()
}
s.common.deinit()
s.rxSeqBuf.deinit()
s.audio.deinit()
} }

View file

@ -12,21 +12,27 @@ import (
) )
type controlStream struct { type controlStream struct {
common streamCommon common streamCommon
serial serialStream
audio audioStream
deinitNeededChan chan bool
deinitFinishedChan chan bool
authSendSeq uint16 authSendSeq uint16
authInnerSendSeq uint16 authInnerSendSeq uint16
authID [6]byte authID [6]byte
serialAndAudioStreamOpened bool serialAndAudioStreamOpened bool
requestSerialAndAudioTimeout *time.Timer requestSerialAndAudioTimeout *time.Timer
} }
func (s *controlStream) sendPktAuth() { func (s *controlStream) sendPktAuth() error {
// The reply to the auth packet will contain a 6 bytes long auth ID with the first 2 bytes set to our randID. // The reply to the auth packet will contain a 6 bytes long auth ID with the first 2 bytes set to our randID.
var randID [2]byte var randID [2]byte
_, err := rand.Read(randID[:]) if _, err := rand.Read(randID[:]); err != nil {
if err != nil { return err
exit(err)
} }
p := []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8), p := []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
@ -45,14 +51,19 @@ func (s *controlStream) sendPktAuth() {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
s.common.send(p) if err := s.common.send(p); err != nil {
s.common.send(p) return err
}
if err := s.common.send(p); err != nil {
return err
}
s.authSendSeq++ s.authSendSeq++
s.authInnerSendSeq++ s.authInnerSendSeq++
return nil
} }
func (s *controlStream) sendPktReauth(firstReauthSend bool) { func (s *controlStream) sendPktReauth(firstReauthSend bool) error {
var magic byte var magic byte
if firstReauthSend { if firstReauthSend {
@ -86,11 +97,15 @@ func (s *controlStream) sendPktReauth(firstReauthSend bool) {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
s.common.send(p) if err := s.common.send(p); err != nil {
s.common.send(p) return err
}
if err := s.common.send(p); err != nil {
return err
}
s.authSendSeq++ s.authSendSeq++
s.authInnerSendSeq++ s.authInnerSendSeq++
return nil
} }
// func (s *controlStream) sendDisconnect() { TODO // func (s *controlStream) sendDisconnect() { TODO
@ -109,17 +124,21 @@ func (s *controlStream) sendPktReauth(firstReauthSend bool) {
// s.common.sendDisconnect() // s.common.sendDisconnect()
// } // }
func (s *controlStream) sendPkt0() { func (s *controlStream) sendPkt0() error {
p := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8), p := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)} byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)}
s.common.send(p) if err := s.common.send(p); err != nil {
s.common.send(p) return err
}
if err := s.common.send(p); err != nil {
return err
}
s.authSendSeq++ s.authSendSeq++
return nil
} }
func (s *controlStream) sendRequestSerialAndAudio() { func (s *controlStream) sendRequestSerialAndAudio() error {
log.Print("requesting serial and audio stream") log.Print("requesting serial and audio stream")
p := []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8), p := []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00, byte(s.authSendSeq), byte(s.authSendSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
@ -140,15 +159,20 @@ func (s *controlStream) sendRequestSerialAndAudio() {
0x00, 0x00, 0xbb, 0x80, 0x00, 0x00, 0xc3, 0x52, 0x00, 0x00, 0xbb, 0x80, 0x00, 0x00, 0xc3, 0x52,
0x00, 0x00, 0xc3, 0x53, 0x00, 0x00, 0x00, 0xa0, 0x00, 0x00, 0xc3, 0x53, 0x00, 0x00, 0x00, 0xa0,
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
s.common.send(p) if err := s.common.send(p); err != nil {
s.common.send(p) return err
}
if err := s.common.send(p); err != nil {
return err
}
s.authSendSeq++ s.authSendSeq++
s.authInnerSendSeq++ s.authInnerSendSeq++
s.requestSerialAndAudioTimeout = time.AfterFunc(3*time.Second, func() { s.requestSerialAndAudioTimeout = time.AfterFunc(3*time.Second, func() {
exit(errors.New("serial and audio request timeout")) reportError(errors.New("serial and audio request timeout"))
}) })
return nil
} }
func (s *controlStream) parseNullTerminatedString(d []byte) (res string) { func (s *controlStream) parseNullTerminatedString(d []byte) (res string) {
@ -159,7 +183,7 @@ func (s *controlStream) parseNullTerminatedString(d []byte) (res string) {
return return
} }
func (s *controlStream) handleRead(r []byte) { func (s *controlStream) handleRead(r []byte) error {
switch len(r) { switch len(r) {
case 16: case 16:
if bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00}) { if bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00}) {
@ -170,8 +194,12 @@ func (s *controlStream) handleRead(r []byte) {
p := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(gotSeq), byte(gotSeq >> 8), p := []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(gotSeq), byte(gotSeq >> 8),
byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID), byte(s.common.localSID >> 24), byte(s.common.localSID >> 16), byte(s.common.localSID >> 8), byte(s.common.localSID),
byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)} byte(s.common.remoteSID >> 24), byte(s.common.remoteSID >> 16), byte(s.common.remoteSID >> 8), byte(s.common.remoteSID)}
s.common.send(p) if err := s.common.send(p); err != nil {
s.common.send(p) return err
}
if err := s.common.send(p); err != nil {
return err
}
} }
case 80: case 80:
if bytes.Equal(r[:6], []byte{0x50, 0x00, 0x00, 0x00, 0x00, 0x00}) && bytes.Equal(r[48:51], []byte{0xff, 0xff, 0xff}) { if bytes.Equal(r[:6], []byte{0x50, 0x00, 0x00, 0x00, 0x00, 0x00}) && bytes.Equal(r[48:51], []byte{0xff, 0xff, 0xff}) {
@ -186,7 +214,7 @@ func (s *controlStream) handleRead(r []byte) {
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
exit(errors.New("reauth failed, try again after about 1 minute")) return errors.New("reauth failed")
} }
case 144: case 144:
if !s.serialAndAudioStreamOpened && bytes.Equal(r[:6], []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00}) && r[96] == 1 { if !s.serialAndAudioStreamOpened && bytes.Equal(r[:6], []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00}) && r[96] == 1 {
@ -215,31 +243,82 @@ func (s *controlStream) handleRead(r []byte) {
s.requestSerialAndAudioTimeout.Stop() s.requestSerialAndAudioTimeout.Stop()
s.requestSerialAndAudioTimeout = nil s.requestSerialAndAudioTimeout = nil
} }
go streams.serial.start()
go streams.audio.start(devName) if err := s.serial.start(devName); err != nil {
return err
}
if err := s.audio.start(devName); err != nil {
return err
}
s.serialAndAudioStreamOpened = true s.serialAndAudioStreamOpened = true
} }
} }
return nil
}
func (s *controlStream) loop() {
startTime := time.Now()
pkt0SendTicker := time.NewTicker(100 * time.Millisecond)
reauthTicker := time.NewTicker(60 * time.Second)
statusLogTicker := time.NewTicker(3 * time.Second)
for {
select {
case r := <-s.common.readChan:
if err := s.handleRead(r); err != nil {
reportError(err)
}
case <-pkt0SendTicker.C:
if err := s.sendPkt0(); err != nil {
reportError(err)
}
case <-reauthTicker.C:
if err := s.sendPktReauth(false); err != nil {
reportError(err)
}
case <-statusLogTicker.C:
if s.serialAndAudioStreamOpened {
log.Print("running for ", time.Since(startTime), " roundtrip latency ", s.common.pkt7.latency)
}
case <-s.deinitNeededChan:
s.deinitFinishedChan <- true
return
}
}
} }
func (s *controlStream) init() { func (s *controlStream) start() error {
s.common.open("control", 50001) if err := s.common.init("control", 50001); err != nil {
} return err
}
func (s *controlStream) start() { if err := s.common.sendPkt3(); err != nil {
startTime := time.Now() return err
}
s.common.sendPkt3()
s.common.pkt7.sendSeq = 1 s.common.pkt7.sendSeq = 1
s.common.pkt7.send(&s.common) if err := s.common.pkt7.send(&s.common); err != nil {
s.common.sendPkt3() return err
s.common.waitForPkt4Answer() }
s.common.sendPkt6() if err := s.common.sendPkt3(); err != nil {
s.common.waitForPkt6Answer() return err
}
if err := s.common.waitForPkt4Answer(); err != nil {
return err
}
if err := s.common.sendPkt6(); err != nil {
return err
}
if err := s.common.waitForPkt6Answer(); err != nil {
return err
}
s.authSendSeq = 1 s.authSendSeq = 1
s.authInnerSendSeq = 1 s.authInnerSendSeq = 1
s.sendPktAuth() if err := s.sendPktAuth(); err != nil {
return err
}
log.Debug("expecting auth answer") log.Debug("expecting auth answer")
// Example success auth packet: 0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, // Example success auth packet: 0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
@ -254,44 +333,65 @@ func (s *controlStream) start() {
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00
r := s.common.expect(96, []byte{0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00}) r, err := s.common.expect(96, []byte{0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00})
if err != nil {
return err
}
if bytes.Equal(r[48:52], []byte{0xff, 0xff, 0xff, 0xfe}) { if bytes.Equal(r[48:52], []byte{0xff, 0xff, 0xff, 0xfe}) {
exit(errors.New("invalid user/password")) return errors.New("invalid user/password")
} }
copy(s.authID[:], r[26:32]) copy(s.authID[:], r[26:32])
log.Print("auth ok, waiting a bit") log.Print("auth ok, waiting a bit")
time.AfterFunc(1*time.Second, func() { time.AfterFunc(2*time.Second, func() {
log.Print("sending reauth 1/2...") log.Print("sending reauth 1/2...")
s.sendPktReauth(true) if err := s.sendPktReauth(true); err != nil {
time.AfterFunc(1*time.Second, func() { reportError(err)
}
time.AfterFunc(2*time.Second, func() {
log.Print("sending reauth 2/2...") log.Print("sending reauth 2/2...")
s.sendPktReauth(false) if err := s.sendPktReauth(false); err != nil {
time.AfterFunc(time.Second, func() { reportError(err)
s.sendRequestSerialAndAudio() }
time.AfterFunc(2*time.Second, func() {
if err := s.sendRequestSerialAndAudio(); err != nil {
reportError(err)
}
}) })
}) })
}) })
s.common.pkt7.startPeriodicSend(&s.common, 5, false) s.common.pkt7.startPeriodicSend(&s.common, 5, false)
pkt0SendTicker := time.NewTicker(100 * time.Millisecond) s.deinitNeededChan = make(chan bool)
reauthTicker := time.NewTicker(60 * time.Second) s.deinitFinishedChan = make(chan bool)
statusLogTicker := time.NewTicker(3 * time.Second) go s.loop()
return nil
for { }
select {
case r = <-s.common.readChan: func (s *controlStream) init() error {
s.handleRead(r) log.Print("init")
case <-pkt0SendTicker.C:
s.sendPkt0() if err := s.serial.init(); err != nil {
case <-reauthTicker.C: return err
s.sendPktReauth(false) }
case <-statusLogTicker.C: if err := s.audio.init(); err != nil {
if s.serialAndAudioStreamOpened { return err
log.Print("running for ", time.Since(startTime), " roundtrip latency ", s.common.pkt7.latency) }
} return nil
} }
}
func (s *controlStream) deinit() {
if s.deinitNeededChan != nil {
s.deinitNeededChan <- true
<-s.deinitFinishedChan
}
if s.requestSerialAndAudioTimeout != nil {
s.requestSerialAndAudioTimeout.Stop()
s.requestSerialAndAudioTimeout = nil
}
s.audio.deinit()
s.serial.deinit()
s.common.deinit()
} }

View file

@ -13,7 +13,7 @@ import (
var logger *zap.SugaredLogger var logger *zap.SugaredLogger
var filenameTrimChars int var filenameTrimChars int
func getCallerFileName(withLine bool) string { func GetCallerFileName(withLine bool) string {
_, filename, line, _ := runtime.Caller(2) _, filename, line, _ := runtime.Caller(2)
extension := filepath.Ext(filename) extension := filepath.Ext(filename)
if withLine { if withLine {
@ -24,35 +24,39 @@ func getCallerFileName(withLine bool) string {
} }
func Printf(a string, b ...interface{}) { func Printf(a string, b ...interface{}) {
logger.Infof(getCallerFileName(false)+": "+a, b...) logger.Infof(GetCallerFileName(false)+": "+a, b...)
} }
func Print(a ...interface{}) { func Print(a ...interface{}) {
logger.Info(append([]interface{}{getCallerFileName(false) + ": "}, a...)...) logger.Info(append([]interface{}{GetCallerFileName(false) + ": "}, a...)...)
} }
func Debugf(a string, b ...interface{}) { func Debugf(a string, b ...interface{}) {
logger.Debugf(getCallerFileName(true)+": "+a, b...) logger.Debugf(GetCallerFileName(true)+": "+a, b...)
} }
func Debug(a ...interface{}) { func Debug(a ...interface{}) {
logger.Debug(append([]interface{}{getCallerFileName(true) + ": "}, a...)...) logger.Debug(append([]interface{}{GetCallerFileName(true) + ": "}, a...)...)
} }
func Errorf(a string, b ...interface{}) { func Errorf(a string, b ...interface{}) {
logger.Errorf(getCallerFileName(true)+": "+a, b...) logger.Errorf(GetCallerFileName(true)+": "+a, b...)
} }
func Error(a ...interface{}) { func Error(a ...interface{}) {
logger.Error(append([]interface{}{getCallerFileName(true) + ": "}, a...)...) logger.Error(append([]interface{}{GetCallerFileName(true) + ": "}, a...)...)
}
func ErrorC(a ...interface{}) {
logger.Error(a)
} }
func Fatalf(a string, b ...interface{}) { func Fatalf(a string, b ...interface{}) {
logger.Fatalf(getCallerFileName(true)+": "+a, b...) logger.Fatalf(GetCallerFileName(true)+": "+a, b...)
} }
func Fatal(a ...interface{}) { func Fatal(a ...interface{}) {
logger.Fatal(append([]interface{}{getCallerFileName(true) + ": "}, a...)...) logger.Fatal(append([]interface{}{GetCallerFileName(true) + ": "}, a...)...)
} }
func Init() { func Init() {

87
main.go
View file

@ -3,43 +3,57 @@ package main
import ( import (
"os" "os"
"os/signal" "os/signal"
"strings"
"syscall" "syscall"
"time"
"github.com/nonoo/kappanhang/log" "github.com/nonoo/kappanhang/log"
) )
var streams struct { var gotErrChan = make(chan bool)
control controlStream
serial serialStream
audio audioStream
}
func exit(err error) { func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) {
if err != nil { // Depleting gotErrChan.
log.Error(err.Error()) var finished bool
for !finished {
select {
case <-gotErrChan:
default:
finished = true
}
} }
streams.audio.common.close() c := controlStream{}
streams.serial.common.close() defer c.deinit()
streams.control.common.close()
serialPort.deinit()
audio.deinit()
log.Print("exiting") if err := c.init(); err != nil {
if err == nil { log.Error(err)
os.Exit(0) return true, 1
} else { }
os.Exit(1) if err := c.start(); err != nil {
log.Error(err)
return
}
select {
case <-gotErrChan:
return
case <-osSignal:
log.Print("sigterm received")
return true, 0
} }
} }
func setupCloseHandler() { func reportError(err error) {
c := make(chan os.Signal) if !strings.Contains(err.Error(), "use of closed network connection") {
signal.Notify(c, os.Interrupt, syscall.SIGTERM) log.ErrorC(log.GetCallerFileName(true), ": ", err)
go func() { }
<-c
exit(nil) // Non-blocking notify.
}() select {
case gotErrChan <- true:
default:
}
} }
func main() { func main() {
@ -47,12 +61,23 @@ func main() {
log.Print("kappanhang by Norbert Varga HA2NON and Akos Marton ES1AKOS https://github.com/nonoo/kappanhang") log.Print("kappanhang by Norbert Varga HA2NON and Akos Marton ES1AKOS https://github.com/nonoo/kappanhang")
parseArgs() parseArgs()
serialPort.init() osSignal := make(chan os.Signal, 1)
streams.audio.init() signal.Notify(osSignal, os.Interrupt, syscall.SIGTERM)
streams.serial.init()
streams.control.init()
setupCloseHandler() var shouldExit bool
var exitCode int
for !shouldExit {
shouldExit, exitCode = runControlStream(osSignal)
if !shouldExit {
log.Print("restarting control stream...")
select {
case <-time.NewTimer(3 * time.Second).C:
case <-osSignal:
shouldExit = true
}
}
}
streams.control.start() log.Print("exiting")
os.Exit(exitCode)
} }

101
pkt7.go
View file

@ -21,24 +21,25 @@ type pkt7Type struct {
timeoutTimer *time.Timer timeoutTimer *time.Timer
latency time.Duration latency time.Duration
lastSendAt time.Time lastSendAt time.Time
periodicStopNeededChan chan bool
periodicStopFinishedChan chan bool
} }
func (p *pkt7Type) isPkt7(r []byte) bool { func (p *pkt7Type) isPkt7(r []byte) bool {
return len(r) == 21 && bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) // Note that the first byte can be 0x15 or 0x00, so we ignore that. return len(r) == 21 && bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) // Note that the first byte can be 0x15 or 0x00, so we ignore that.
} }
// func (p *pkt7Type) tryReceive(timeout time.Duration, s *streamCommon) []byte { func (p *pkt7Type) handle(s *streamCommon, r []byte) error {
// return s.tryReceivePacket(timeout, 21, 1, []byte{0x00, 0x00, 0x00, 0x07, 0x00})
// }
func (p *pkt7Type) handle(s *streamCommon, r []byte) {
gotSeq := binary.LittleEndian.Uint16(r[6:8]) gotSeq := binary.LittleEndian.Uint16(r[6:8])
if r[16] == 0x00 { // This is a pkt7 request from the radio. if r[16] == 0x00 { // This is a pkt7 request from the radio.
// Replying to the radio. // Replying to the radio.
// Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00 // Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00
// Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00 // Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00
if p.sendTicker != nil { // Only replying if the auth is already done. if p.sendTicker != nil { // Only replying if the auth is already done.
p.sendReply(s, r[17:21], gotSeq) if err := p.sendReply(s, r[17:21], gotSeq); err != nil {
return err
}
} }
} else { // This is a pkt7 reply to our request. } else { // This is a pkt7 reply to our request.
if p.sendTicker != nil { // Auth is already done? if p.sendTicker != nil { // Auth is already done?
@ -64,18 +65,18 @@ func (p *pkt7Type) handle(s *streamCommon, r []byte) {
} }
p.lastConfirmedSeq = gotSeq p.lastConfirmedSeq = gotSeq
} }
return nil
} }
func (p *pkt7Type) sendDo(s *streamCommon, replyID []byte, seq uint16) { func (p *pkt7Type) sendDo(s *streamCommon, replyID []byte, seq uint16) error {
// Example request from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x00, 0x78, 0x40, 0xf6, 0x02 // Example request from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x00, 0x78, 0x40, 0xf6, 0x02
// Example reply from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x01, 0x78, 0x40, 0xf6, 0x02 // Example reply from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x01, 0x78, 0x40, 0xf6, 0x02
var replyFlag byte var replyFlag byte
if replyID == nil { if replyID == nil {
replyID = make([]byte, 4) replyID = make([]byte, 4)
var randID [2]byte var randID [2]byte
_, err := rand.Read(randID[:]) if _, err := rand.Read(randID[:]); err != nil {
if err != nil { return err
exit(err)
} }
replyID[0] = randID[0] replyID[0] = randID[0]
replyID[1] = randID[1] replyID[1] = randID[1]
@ -89,18 +90,54 @@ func (p *pkt7Type) sendDo(s *streamCommon, replyID []byte, seq uint16) {
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID), byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID),
replyFlag, replyID[0], replyID[1], replyID[2], replyID[3]} replyFlag, replyID[0], replyID[1], replyID[2], replyID[3]}
s.send(d) if err := s.send(d); err != nil {
s.send(d) return err
}
if err := s.send(d); err != nil {
return err
}
return nil
} }
func (p *pkt7Type) send(s *streamCommon) { func (p *pkt7Type) send(s *streamCommon) error {
p.sendDo(s, nil, p.sendSeq) if err := p.sendDo(s, nil, p.sendSeq); err != nil {
return err
}
p.lastSendAt = time.Now() p.lastSendAt = time.Now()
p.sendSeq++ p.sendSeq++
return nil
} }
func (p *pkt7Type) sendReply(s *streamCommon, replyID []byte, seq uint16) { func (p *pkt7Type) sendReply(s *streamCommon, replyID []byte, seq uint16) error {
p.sendDo(s, replyID, seq) return p.sendDo(s, replyID, seq)
}
func (p *pkt7Type) loop(s *streamCommon) {
for {
if p.timeoutTimer != nil {
select {
case <-p.sendTicker.C:
if err := p.send(s); err != nil {
reportError(err)
}
case <-p.timeoutTimer.C:
reportError(errors.New(s.name + "/ping timeout"))
case <-p.periodicStopNeededChan:
p.periodicStopFinishedChan <- true
return
}
} else {
select {
case <-p.sendTicker.C:
if err := p.send(s); err != nil {
reportError(err)
}
case <-p.periodicStopNeededChan:
p.periodicStopFinishedChan <- true
return
}
}
}
} }
func (p *pkt7Type) startPeriodicSend(s *streamCommon, firstSeqNo uint16, checkPingTimeout bool) { func (p *pkt7Type) startPeriodicSend(s *streamCommon, firstSeqNo uint16, checkPingTimeout bool) {
@ -112,19 +149,21 @@ func (p *pkt7Type) startPeriodicSend(s *streamCommon, firstSeqNo uint16, checkPi
p.timeoutTimer = time.NewTimer(pkt7TimeoutDuration) p.timeoutTimer = time.NewTimer(pkt7TimeoutDuration)
} }
go func() { p.periodicStopNeededChan = make(chan bool)
for { p.periodicStopFinishedChan = make(chan bool)
if checkPingTimeout { go p.loop(s)
select { }
case <-p.sendTicker.C:
p.send(s) func (p *pkt7Type) stopPeriodicSend() {
case <-p.timeoutTimer.C: if p.sendTicker == nil { // Periodic send has not started?
exit(errors.New(s.name + "/ping timeout")) return
} }
} else {
<-p.sendTicker.C p.periodicStopNeededChan <- true
p.send(s) <-p.periodicStopFinishedChan
}
} if p.timeoutTimer != nil {
}() p.timeoutTimer.Stop()
}
p.sendTicker.Stop()
} }

View file

@ -232,7 +232,11 @@ func (s *seqBuf) watcher() {
e, err := s.get() e, err := s.get()
if err == nil { if err == nil {
if s.entryChan != nil { if s.entryChan != nil {
s.entryChan <- e select {
case s.entryChan <- e:
case <-s.watcherCloseNeededChan:
return
}
} }
} else { } else {
log.Error(err) log.Error(err)
@ -271,7 +275,10 @@ func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, ent
go s.watcher() go s.watcher()
} }
// func (s *seqBuf) deinit() { func (s *seqBuf) deinit() {
// s.watcherCloseNeededChan <- true if s.watcherCloseNeededChan == nil { // Init has not ran?
// <-s.watcherCloseDoneChan return
// } }
s.watcherCloseNeededChan <- true
<-s.watcherCloseDoneChan
}

View file

@ -8,7 +8,13 @@ import (
) )
type serialPortStruct struct { type serialPortStruct struct {
pty *term.PTY pty *term.PTY
symlink string
writeLoopDeinitNeededChan chan bool
writeLoopDeinitFinishedChan chan bool
readLoopDeinitNeededChan chan bool
readLoopDeinitFinishedChan chan bool
// Read from this channel to receive serial data. // Read from this channel to receive serial data.
read chan []byte read chan []byte
@ -16,20 +22,24 @@ type serialPortStruct struct {
write chan []byte write chan []byte
} }
var serialPort serialPortStruct
func (s *serialPortStruct) writeLoop() { func (s *serialPortStruct) writeLoop() {
s.write = make(chan []byte) s.write = make(chan []byte)
var b []byte var b []byte
for { for {
b = <-s.write select {
case b = <-s.write:
case <-s.writeLoopDeinitNeededChan:
s.writeLoopDeinitFinishedChan <- true
return
}
bytesToWrite := len(b) bytesToWrite := len(b)
for bytesToWrite > 0 { for bytesToWrite > 0 {
written, err := s.pty.Master.Write(b) written, err := s.pty.Master.Write(b)
if err != nil { if err != nil {
if _, ok := err.(*os.PathError); !ok { if _, ok := err.(*os.PathError); !ok {
exit(err) reportError(err)
} }
} }
b = b[written:] b = b[written:]
@ -45,31 +55,55 @@ func (s *serialPortStruct) readLoop() {
n, err := s.pty.Master.Read(b) n, err := s.pty.Master.Read(b)
if err != nil { if err != nil {
if _, ok := err.(*os.PathError); !ok { if _, ok := err.(*os.PathError); !ok {
exit(err) reportError(err)
} }
} }
s.read <- b[:n]
select {
case s.read <- b[:n]:
case <-s.readLoopDeinitNeededChan:
s.readLoopDeinitFinishedChan <- true
return
}
} }
} }
func (s *serialPortStruct) init() { func (s *serialPortStruct) init(devName string) (err error) {
var err error
s.pty, err = term.OpenPTY() s.pty, err = term.OpenPTY()
if err != nil { if err != nil {
exit(err) return err
} }
n, err := s.pty.PTSName() n, err := s.pty.PTSName()
if err != nil { if err != nil {
exit(err) return err
} }
log.Print("opened ", n) s.symlink = "/tmp/kappanhang-" + devName + ".pty"
_ = os.Remove(s.symlink)
if err := os.Symlink(n, s.symlink); err != nil {
return err
}
log.Print("opened ", n, " as ", s.symlink)
s.readLoopDeinitNeededChan = make(chan bool)
s.readLoopDeinitFinishedChan = make(chan bool)
go s.readLoop() go s.readLoop()
s.writeLoopDeinitNeededChan = make(chan bool)
s.writeLoopDeinitFinishedChan = make(chan bool)
go s.writeLoop() go s.writeLoop()
return nil
} }
func (s *serialPortStruct) deinit() { func (s *serialPortStruct) deinit() {
if s.pty != nil { if s.pty != nil {
s.pty.Close() s.pty.Close()
} }
_ = os.Remove(s.symlink)
if s.readLoopDeinitNeededChan != nil {
s.readLoopDeinitNeededChan <- true
<-s.readLoopDeinitFinishedChan
}
if s.writeLoopDeinitNeededChan != nil {
s.writeLoopDeinitNeededChan <- true
<-s.writeLoopDeinitFinishedChan
}
} }

View file

@ -6,33 +6,68 @@ import (
type serialStream struct { type serialStream struct {
common streamCommon common streamCommon
}
func (s *serialStream) init() { serialPort serialPortStruct
s.common.open("serial", 50002)
deinitNeededChan chan bool
deinitFinishedChan chan bool
} }
func (s *serialStream) handleRead(r []byte) { func (s *serialStream) handleRead(r []byte) {
} }
func (s *serialStream) start() { func (s *serialStream) loop() {
s.common.sendPkt3() for {
s.common.waitForPkt4Answer() select {
s.common.sendPkt6() case r := <-s.common.readChan:
s.common.waitForPkt6Answer() s.handleRead(r)
case <-s.deinitNeededChan:
s.deinitFinishedChan <- true
return
}
}
}
func (s *serialStream) start(devName string) error {
if err := s.serialPort.init(devName); err != nil {
return err
}
if err := s.common.sendPkt3(); err != nil {
return err
}
if err := s.common.waitForPkt4Answer(); err != nil {
return err
}
if err := s.common.sendPkt6(); err != nil {
return err
}
if err := s.common.waitForPkt6Answer(); err != nil {
return err
}
log.Print("stream started") log.Print("stream started")
s.common.pkt7.startPeriodicSend(&s.common, 1, false) s.common.pkt7.startPeriodicSend(&s.common, 1, false)
for { s.deinitNeededChan = make(chan bool)
select { s.deinitFinishedChan = make(chan bool)
case r := <-s.common.readChan: go s.loop()
s.handleRead(r) return nil
}
}
} }
func (s *serialStream) sendDisconnect() { func (s *serialStream) init() error {
s.common.sendDisconnect() if err := s.common.init("serial", 50002); err != nil {
return err
}
return nil
}
func (s *serialStream) deinit() {
if s.deinitNeededChan != nil {
s.deinitNeededChan <- true
<-s.deinitFinishedChan
}
s.common.deinit()
s.serialPort.deinit()
} }

View file

@ -15,22 +15,23 @@ import (
const expectTimeoutDuration = time.Second const expectTimeoutDuration = time.Second
type streamCommon struct { type streamCommon struct {
name string name string
conn *net.UDPConn conn *net.UDPConn
localSID uint32 localSID uint32
remoteSID uint32 remoteSID uint32
gotRemoteSID bool gotRemoteSID bool
readChan chan []byte readChan chan []byte
readerClosedChan chan bool readerCloseNeededChan chan bool
readerCloseFinishedChan chan bool
pkt7 pkt7Type pkt7 pkt7Type
} }
func (s *streamCommon) send(d []byte) { func (s *streamCommon) send(d []byte) error {
_, err := s.conn.Write(d) if _, err := s.conn.Write(d); err != nil {
if err != nil { return err
exit(err)
} }
return nil
} }
func (s *streamCommon) read() ([]byte, error) { func (s *streamCommon) read() ([]byte, error) {
@ -43,15 +44,21 @@ func (s *streamCommon) reader() {
for { for {
r, err := s.read() r, err := s.read()
if err != nil { if err != nil {
break reportError(err)
} } else if s.pkt7.isPkt7(r) {
if s.pkt7.isPkt7(r) { if err := s.pkt7.handle(s, r); err != nil {
s.pkt7.handle(s, r) reportError(err)
}
continue
} }
s.readChan <- r select {
case s.readChan <- r:
case <-s.readerCloseNeededChan:
s.readerCloseFinishedChan <- true
return
}
} }
s.readerClosedChan <- true
} }
func (s *streamCommon) tryReceivePacket(timeout time.Duration, packetLength, matchStartByte int, b []byte) []byte { func (s *streamCommon) tryReceivePacket(timeout time.Duration, packetLength, matchStartByte int, b []byte) []byte {
@ -71,26 +78,85 @@ func (s *streamCommon) tryReceivePacket(timeout time.Duration, packetLength, mat
return r return r
} }
func (s *streamCommon) expect(packetLength int, b []byte) []byte { func (s *streamCommon) expect(packetLength int, b []byte) ([]byte, error) {
r := s.tryReceivePacket(expectTimeoutDuration, packetLength, 0, b) r := s.tryReceivePacket(expectTimeoutDuration, packetLength, 0, b)
if r == nil { if r == nil {
exit(errors.New(s.name + "/expect timeout")) return nil, errors.New(s.name + "/expect timeout")
} }
return r return r, nil
} }
func (s *streamCommon) open(name string, portNumber int) { func (s *streamCommon) sendPkt3() error {
p := []byte{0x10, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00,
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
if err := s.send(p); err != nil {
return err
}
if err := s.send(p); err != nil {
return err
}
return nil
}
func (s *streamCommon) waitForPkt4Answer() error {
log.Debug(s.name + "/expecting a pkt4 answer")
// Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x8c, 0x7d, 0x45, 0x7a, 0x1d, 0xf6, 0xe9, 0x0b
r, err := s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00})
if err != nil {
return err
}
s.remoteSID = binary.BigEndian.Uint32(r[8:12])
s.gotRemoteSID = true
return nil
}
func (s *streamCommon) sendPkt6() error {
p := []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00,
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
if err := s.send(p); err != nil {
return err
}
if err := s.send(p); err != nil {
return err
}
return nil
}
func (s *streamCommon) waitForPkt6Answer() error {
log.Debug(s.name + "/expecting pkt6 answer")
// Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0xe8, 0xd0, 0x44, 0x50, 0xa0, 0x61, 0x39, 0xbe
_, err := s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00})
return err
}
func (s *streamCommon) sendDisconnect() error {
log.Print(s.name + "/disconnecting")
p := []byte{0x10, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00,
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
if err := s.send(p); err != nil {
return err
}
if err := s.send(p); err != nil {
return err
}
return nil
}
func (s *streamCommon) init(name string, portNumber int) error {
s.name = name s.name = name
hostPort := fmt.Sprint(connectAddress, ":", portNumber) hostPort := fmt.Sprint(connectAddress, ":", portNumber)
log.Print(s.name+"/connecting to ", hostPort) log.Print(s.name+"/connecting to ", hostPort)
raddr, err := net.ResolveUDPAddr("udp", hostPort) raddr, err := net.ResolveUDPAddr("udp", hostPort)
if err != nil { if err != nil {
exit(err) return err
} }
s.conn, err = net.DialUDP("udp", nil, raddr) s.conn, err = net.DialUDP("udp", nil, raddr)
if err != nil { if err != nil {
exit(err) return err
} }
// Constructing the local session ID by combining the local IP address and port. // Constructing the local session ID by combining the local IP address and port.
@ -99,71 +165,25 @@ func (s *streamCommon) open(name string, portNumber int) {
_, err = rand.Read(s.pkt7.randIDBytes[:]) _, err = rand.Read(s.pkt7.randIDBytes[:])
if err != nil { if err != nil {
exit(err) return err
} }
s.readChan = make(chan []byte) s.readChan = make(chan []byte)
s.readerClosedChan = make(chan bool) s.readerCloseNeededChan = make(chan bool)
s.readerCloseFinishedChan = make(chan bool)
go s.reader() go s.reader()
return nil
} }
func (s *streamCommon) close() { func (s *streamCommon) deinit() {
s.sendDisconnect() s.pkt7.stopPeriodicSend()
if s.gotRemoteSID && s.conn != nil {
_ = s.sendDisconnect()
}
s.conn.Close() s.conn.Close()
// Depleting the read channel. if s.readerCloseNeededChan != nil {
var finished bool s.readerCloseNeededChan <- true
for !finished { <-s.readerCloseFinishedChan
select {
case <-s.readChan:
default:
finished = true
}
} }
// Waiting for the reader to finish.
<-s.readerClosedChan
}
func (s *streamCommon) sendPkt3() {
p := []byte{0x10, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00,
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
s.send(p)
s.send(p)
}
func (s *streamCommon) waitForPkt4Answer() {
log.Debug(s.name + "/expecting a pkt4 answer")
// Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x8c, 0x7d, 0x45, 0x7a, 0x1d, 0xf6, 0xe9, 0x0b
r := s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00})
s.remoteSID = binary.BigEndian.Uint32(r[8:12])
s.gotRemoteSID = true
}
func (s *streamCommon) sendPkt6() {
p := []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00,
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
s.send(p)
s.send(p)
}
func (s *streamCommon) waitForPkt6Answer() {
log.Debug(s.name + "/expecting pkt6 answer")
// Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0xe8, 0xd0, 0x44, 0x50, 0xa0, 0x61, 0x39, 0xbe
s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00})
}
func (s *streamCommon) sendDisconnect() {
if !s.gotRemoteSID || s.conn == nil {
return
}
log.Print(s.name + "/disconnecting")
p := []byte{0x10, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00,
byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),
byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID)}
s.send(p)
s.send(p)
} }