From dec1be0c413f5bda3924a2358c445a60c05fb2f7 Mon Sep 17 00:00:00 2001 From: Nonoo Date: Sun, 18 Oct 2020 14:21:58 +0200 Subject: [PATCH] Move pkt7 handling to a separate file --- audiostream.go | 29 +++---------- controlstream.go | 52 +++------------------- pkt7.go | 109 +++++++++++++++++++++++++++++++++++++++++++++++ streamcommon.go | 48 ++++----------------- 4 files changed, 129 insertions(+), 109 deletions(-) create mode 100644 pkt7.go diff --git a/audiostream.go b/audiostream.go index 9c50497..8904c6c 100644 --- a/audiostream.go +++ b/audiostream.go @@ -1,10 +1,6 @@ package main -import ( - "bytes" - "encoding/binary" - "time" -) +import "github.com/nonoo/kappanhang/log" type audioStream struct { common streamCommon @@ -15,19 +11,7 @@ func (s *audioStream) sendDisconnect() { } func (s *audioStream) handleRead(r []byte) { - switch len(r) { - case 21: - if bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) { // Note that the first byte can be 0x15 or 0x00, so we ignore that. - gotSeq := binary.LittleEndian.Uint16(r[6:8]) - if r[16] == 0x00 { // This is a pkt7 request from the radio. - // Replying to the radio. - // Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00 - // Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00 - s.common.sendPkt7Reply(r[17:21], gotSeq) - } else { // This is a pkt7 reply to our request. - } - } - } + // TODO } func (s *audioStream) start() { @@ -38,19 +22,16 @@ func (s *audioStream) start() { s.common.sendPkt6() s.common.waitForPkt6Answer() - s.common.pkt7.sendSeq = 1 + log.Print("stream opened") - pingTicker := time.NewTicker(100 * time.Millisecond) + s.common.pkt7.sendSeq = 1 + s.common.pkt7.startPeriodicSend(&s.common) var r []byte for { select { case r = <-s.common.readChan: s.handleRead(r) - case <-pingTicker.C: - // s.expectedPkt7ReplySeq = s.common.pkt7.sendSeq - // s.lastPkt7SendAt = time.Now() - s.common.sendPkt7() } } } diff --git a/controlstream.go b/controlstream.go index b030856..dcb1fc9 100644 --- a/controlstream.go +++ b/controlstream.go @@ -10,8 +10,6 @@ import ( "github.com/nonoo/kappanhang/log" ) -const pkt7TimeoutDuration = 3 * time.Second - type controlStream struct { common streamCommon authSendSeq uint16 @@ -20,10 +18,6 @@ type controlStream struct { serialAndAudioStreamOpened bool requestSerialAndAudioTimeout *time.Timer - - pkt7TimeoutTimer *time.Timer - pkt7Latency time.Duration - lastPkt7SendAt time.Time } func (s *controlStream) sendPktAuth() { @@ -147,34 +141,6 @@ func (s *controlStream) sendRequestSerialAndAudio() { func (s *controlStream) handleRead(r []byte) { switch len(r) { - case 21: - if bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) { // Note that the first byte can be 0x15 or 0x00, so we ignore that. - gotSeq := binary.LittleEndian.Uint16(r[6:8]) - if r[16] == 0x00 { // This is a pkt7 request from the radio. - // Replying to the radio. - // Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00 - // Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00 - s.common.sendPkt7Reply(r[17:21], gotSeq) - } else { // This is a pkt7 reply to our request. - s.pkt7TimeoutTimer.Stop() - s.pkt7TimeoutTimer.Reset(pkt7TimeoutDuration) - - s.pkt7Latency += time.Since(s.lastPkt7SendAt) - s.pkt7Latency /= 2 - - expectedSeq := s.common.pkt7.lastConfirmedSeq + 1 - if expectedSeq != gotSeq { - var missingPkts int - if gotSeq > expectedSeq { - missingPkts = int(gotSeq) - int(expectedSeq) - } else { - missingPkts = int(gotSeq) + 65536 - int(expectedSeq) - } - log.Error("lost ", missingPkts, " packets ") - } - s.common.pkt7.lastConfirmedSeq = gotSeq - } - } case 16: if bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00}) { // Replying to the radio. @@ -237,7 +203,7 @@ func (s *controlStream) start() { s.common.sendPkt3() s.common.pkt7.sendSeq = 1 - s.common.sendPkt7() + s.common.pkt7.send(&s.common) s.common.sendPkt3() s.common.waitForPkt4Answer() s.common.sendPkt6() @@ -246,8 +212,6 @@ func (s *controlStream) start() { s.authSendSeq = 1 s.authInnerSendSeq = 0x1234 s.sendPktAuth() - s.common.pkt7.sendSeq = 5 - s.common.pkt7.lastConfirmedSeq = s.common.pkt7.sendSeq - 1 log.Debug("expecting auth answer") // Example success auth packet: 0x60, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, @@ -273,8 +237,10 @@ func (s *controlStream) start() { s.sendPkt0() s.sendRequestSerialAndAudio() - pkt7SendTicker := time.NewTicker(100 * time.Millisecond) - s.pkt7TimeoutTimer = time.NewTimer(pkt7TimeoutDuration) + s.common.pkt7.sendSeq = 5 + s.common.pkt7.startPeriodicSend(&s.common) + + pkt0SendTicker := time.NewTicker(100 * time.Millisecond) reauthTicker := time.NewTicker(60 * time.Second) statusLogTicker := time.NewTicker(3 * time.Second) @@ -282,16 +248,12 @@ func (s *controlStream) start() { select { case r = <-s.common.readChan: s.handleRead(r) - case <-pkt7SendTicker.C: - s.lastPkt7SendAt = time.Now() - s.common.sendPkt7() + case <-pkt0SendTicker.C: s.sendPkt0() - case <-s.pkt7TimeoutTimer.C: - log.Fatal("ping timeout") case <-reauthTicker.C: s.sendPktReauth(false) case <-statusLogTicker.C: - log.Print("roundtrip latency ", s.pkt7Latency) + log.Print("roundtrip latency ", s.common.pkt7.latency) } } } diff --git a/pkt7.go b/pkt7.go new file mode 100644 index 0000000..f0de159 --- /dev/null +++ b/pkt7.go @@ -0,0 +1,109 @@ +package main + +import ( + "bytes" + "crypto/rand" + "encoding/binary" + "time" + + "github.com/nonoo/kappanhang/log" +) + +type pkt7Type struct { + sendSeq uint16 + randIDByte [1]byte + lastConfirmedSeq uint16 + + sendTicker *time.Ticker + timeoutTimer *time.Timer + latency time.Duration + lastSendAt time.Time +} + +func (p *pkt7Type) isPkt7(r []byte) bool { + return len(r) == 21 && bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) // Note that the first byte can be 0x15 or 0x00, so we ignore that. +} + +func (p *pkt7Type) handle(s *streamCommon, r []byte) { + gotSeq := binary.LittleEndian.Uint16(r[6:8]) + if r[16] == 0x00 { // This is a pkt7 request from the radio. + // Replying to the radio. + // Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00 + // Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00 + p.sendReply(s, r[17:21], gotSeq) + } else { // This is a pkt7 reply to our request. + if p.timeoutTimer != nil { + p.timeoutTimer.Stop() + p.timeoutTimer.Reset(pkt7TimeoutDuration) + + // Only measure latency after the timeout has been initialized, so the auth is already done. + p.latency += time.Since(p.lastSendAt) + p.latency /= 2 + } + + expectedSeq := p.lastConfirmedSeq + 1 + if expectedSeq != gotSeq { + var missingPkts int + if gotSeq > expectedSeq { + missingPkts = int(gotSeq) - int(expectedSeq) + } else { + missingPkts = int(gotSeq) + 65536 - int(expectedSeq) + } + log.Error(s.name+"/lost ", missingPkts, " packets ") + } + p.lastConfirmedSeq = gotSeq + } +} + +func (p *pkt7Type) sendDo(s *streamCommon, replyID []byte, seq uint16) { + // Example request from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x00, 0x78, 0x40, 0xf6, 0x02 + // Example reply from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x01, 0x78, 0x40, 0xf6, 0x02 + var replyFlag byte + if replyID == nil { + replyID = make([]byte, 4) + var randID [2]byte + _, err := rand.Read(randID[:]) + if err != nil { + log.Fatal(err) + } + replyID[0] = randID[0] + replyID[1] = randID[1] + replyID[2] = p.randIDByte[0] + replyID[3] = 0x03 + } else { + replyFlag = 0x01 + } + + s.send([]byte{0x15, 0x00, 0x00, 0x00, 0x07, 0x00, byte(seq), byte(seq >> 8), + byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), + byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID), + replyFlag, replyID[0], replyID[1], replyID[2], replyID[3]}) +} + +func (p *pkt7Type) send(s *streamCommon) { + p.sendDo(s, nil, p.sendSeq) + p.lastSendAt = time.Now() + p.sendSeq++ +} + +func (p *pkt7Type) sendReply(s *streamCommon, replyID []byte, seq uint16) { + p.sendDo(s, replyID, seq) +} + +func (p *pkt7Type) startPeriodicSend(s *streamCommon) { + p.lastConfirmedSeq = p.sendSeq - 1 + + p.sendTicker = time.NewTicker(100 * time.Millisecond) + p.timeoutTimer = time.NewTimer(pkt7TimeoutDuration) + + go func() { + for { + select { + case <-p.sendTicker.C: + p.send(s) + case <-p.timeoutTimer.C: + log.Fatal(s.name + "ping timeout") + } + } + }() +} diff --git a/streamcommon.go b/streamcommon.go index 9d0645f..ad6b7a8 100644 --- a/streamcommon.go +++ b/streamcommon.go @@ -11,6 +11,8 @@ import ( "github.com/nonoo/kappanhang/log" ) +const pkt7TimeoutDuration = 3 * time.Second + type streamCommon struct { name string conn *net.UDPConn @@ -18,11 +20,7 @@ type streamCommon struct { remoteSID uint32 readChan chan []byte - pkt7 struct { - sendSeq uint16 - randIDByte [1]byte - lastConfirmedSeq uint16 - } + pkt7 pkt7Type } func (s *streamCommon) send(d []byte) { @@ -51,6 +49,10 @@ func (s *streamCommon) reader() { for { r, err := s.read() if err == nil { + if s.pkt7.isPkt7(r) { + s.pkt7.handle(s, r) + } + s.readChan <- r } else { errCount++ @@ -127,45 +129,11 @@ func (s *streamCommon) sendPkt6() { } func (s *streamCommon) waitForPkt6Answer() { - log.Debug("expecting pkt6 answer") + log.Debug(s.name + "/expecting pkt6 answer") // Example answer from radio: 0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0xe8, 0xd0, 0x44, 0x50, 0xa0, 0x61, 0x39, 0xbe s.expect(16, []byte{0x10, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00}) } -func (s *streamCommon) sendPkt7Do(replyID []byte, seq uint16) { - // Example request from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x00, 0x78, 0x40, 0xf6, 0x02 - // Example reply from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x09, 0x00, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x01, 0x78, 0x40, 0xf6, 0x02 - var replyFlag byte - if replyID == nil { - replyID = make([]byte, 4) - var randID [2]byte - _, err := rand.Read(randID[:]) - if err != nil { - log.Fatal(err) - } - replyID[0] = randID[0] - replyID[1] = randID[1] - replyID[2] = s.pkt7.randIDByte[0] - replyID[3] = 0x03 - } else { - replyFlag = 0x01 - } - - s.send([]byte{0x15, 0x00, 0x00, 0x00, 0x07, 0x00, byte(seq), byte(seq >> 8), - byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID), - byte(s.remoteSID >> 24), byte(s.remoteSID >> 16), byte(s.remoteSID >> 8), byte(s.remoteSID), - replyFlag, replyID[0], replyID[1], replyID[2], replyID[3]}) -} - -func (s *streamCommon) sendPkt7() { - s.sendPkt7Do(nil, s.pkt7.sendSeq) - s.pkt7.sendSeq++ -} - -func (s *streamCommon) sendPkt7Reply(replyID []byte, seq uint16) { - s.sendPkt7Do(replyID, seq) -} - func (s *streamCommon) sendDisconnect() { s.send([]byte{0x10, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, byte(s.localSID >> 24), byte(s.localSID >> 16), byte(s.localSID >> 8), byte(s.localSID),