From e94381886817d8f0a50afb26b3d35658affe347f Mon Sep 17 00:00:00 2001 From: Nonoo Date: Sun, 18 Oct 2020 10:53:16 +0200 Subject: [PATCH] Use a goroutine reader --- controlstream.go | 190 ++++++++++++++++++++++---------------------- streamconnection.go | 17 ++++ 2 files changed, 110 insertions(+), 97 deletions(-) diff --git a/controlstream.go b/controlstream.go index ea9421a..7780900 100644 --- a/controlstream.go +++ b/controlstream.go @@ -17,7 +17,6 @@ type controlStream struct { authID [6]byte randIDByteForPktSeven [1]byte expectedPkt7ReplySeq uint16 - lastReauthAt time.Time } func (p *controlStream) sendPkt7(replyID []byte, seq uint16) { @@ -111,7 +110,6 @@ func (p *controlStream) sendPktReauth(firstReauthSend bool) { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) p.authSendSeq++ p.authInnerSendSeq++ - p.lastReauthAt = time.Now() } func (p *controlStream) SendDisconnect() { @@ -154,6 +152,84 @@ func (p *controlStream) sendRequestSerialAndAudio() { p.authInnerSendSeq++ } +func (p *controlStream) handleRead(r []byte) { + switch len(r) { + case 21: + if bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) { + gotSeq := binary.LittleEndian.Uint16(r[6:8]) + if r[16] == 0x00 { // This is a pkt7 request from the radio. + // Replying to the radio. + // Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00 + // Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00 + p.sendPkt7(r[17:21], gotSeq) + } else { + if p.expectedPkt7ReplySeq != gotSeq { // TODO + var missingPkts int + if gotSeq > p.expectedPkt7ReplySeq { + missingPkts = int(gotSeq) - int(p.expectedPkt7ReplySeq) + } else { + missingPkts = int(gotSeq) + 65536 - int(p.expectedPkt7ReplySeq) + } + if missingPkts < 1000 { + log.Error("lost ", missingPkts, " packets ", gotSeq, " ", p.expectedPkt7ReplySeq) + } + } + } + } + case 16: + if bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00}) { + // Replying to the radio. + // Example request from radio: 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x13, 0x00, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63 + // Example answer from PC: 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x13, 0x00, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72 + gotSeq := binary.LittleEndian.Uint16(r[6:8]) + p.stream.send([]byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(gotSeq), byte(gotSeq >> 8), + byte(p.stream.localSID >> 24), byte(p.stream.localSID >> 16), byte(p.stream.localSID >> 8), byte(p.stream.localSID), + byte(p.stream.remoteSID >> 24), byte(p.stream.remoteSID >> 16), byte(p.stream.remoteSID >> 8), byte(p.stream.remoteSID)}) + } + case 80: + if bytes.Equal(r[:6], []byte{0x50, 0x00, 0x00, 0x00, 0x00, 0x00}) && bytes.Equal(r[48:51], []byte{0xff, 0xff, 0xff}) { + // Example answer from radio: 0x50, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, + // 0x86, 0x1f, 0x2f, 0xcc, 0x03, 0x03, 0x89, 0x29, + // 0x00, 0x00, 0x00, 0x40, 0x02, 0x03, 0x00, 0x52, + // 0x00, 0x00, 0xf8, 0xad, 0x06, 0x8d, 0xda, 0x7b, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, + // 0x80, 0x00, 0x00, 0x90, 0xc7, 0x0e, 0x86, 0x01, + // 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 + + log.Error("reauth failed") + p.SendDisconnect() + os.Exit(1) + } + case 144: + if bytes.Equal(r[:6], []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00}) && r[96] == 1 { + // Example answer: + // 0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x00, + // 0xc6, 0x5f, 0x6f, 0x0c, 0x5f, 0x8b, 0x1e, 0x89, + // 0x00, 0x00, 0x00, 0x80, 0x03, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x31, 0x30, 0x31, 0x47, 0x39, 0x07, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, + // 0x80, 0x00, 0x00, 0x90, 0xc7, 0x0e, 0x86, 0x01, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x49, 0x43, 0x2d, 0x37, 0x30, 0x35, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x01, 0x00, 0x00, 0x00, 0x69, 0x63, 0x6f, 0x6d, + // 0x2d, 0x70, 0x63, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // 0x00, 0x00, 0x00, 0x00, 0xc0, 0xa8, 0x03, 0x03, + // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 + log.Print("serial and audio request success") + go streams.audio.Start() + } + } +} + func (p *controlStream) Start() { p.stream.open(50001) @@ -203,110 +279,30 @@ func (p *controlStream) Start() { p.sendPktReauth(true) time.AfterFunc(time.Second*2, p.sendRequestSerialAndAudio) - var lastPingAt time.Time - var lastStatusLog time.Time - var errCount int - _, err := rand.Read(p.randIDByteForPktSeven[:]) if err != nil { log.Fatal(err) } + readChan := make(chan []byte) + go p.stream.reader(readChan) + + pingTicker := time.NewTicker(100 * time.Millisecond) + reauthTicker := time.NewTicker(60 * time.Second) + statusLogTicker := time.NewTicker(10 * time.Second) + for { - r, err := p.stream.read() - if err != nil { - errCount++ - if errCount > 5 { - log.Fatal("timeout") - } - log.Error("stream break detected") - } - errCount = 0 - - if len(r) == 21 && bytes.Equal(r[1:6], []byte{0x00, 0x00, 0x00, 0x07, 0x00}) { - gotSeq := binary.LittleEndian.Uint16(r[6:8]) - if r[16] == 0x00 { // This is a pkt7 request from the radio. - // Replying to the radio. - // Example request from radio: 0x00, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63, 0x00, 0x57, 0x2b, 0x12, 0x00 - // Example answer from PC: 0x15, 0x00, 0x00, 0x00, 0x07, 0x00, 0x1c, 0x0e, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72, 0x01, 0x57, 0x2b, 0x12, 0x00 - p.sendPkt7(r[17:21], gotSeq) - } else { - if p.expectedPkt7ReplySeq != gotSeq { // TODO - var missingPkts int - if gotSeq > p.expectedPkt7ReplySeq { - missingPkts = int(gotSeq) - int(p.expectedPkt7ReplySeq) - } else { - missingPkts = int(gotSeq) + 65536 - int(p.expectedPkt7ReplySeq) - } - if missingPkts < 1000 { - log.Error("lost ", missingPkts, " packets ", gotSeq, " ", p.expectedPkt7ReplySeq) - } - } - } - } - if len(r) == 16 && bytes.Equal(r[:6], []byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00}) { - // Replying to the radio. - // Example request from radio: 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x13, 0x00, 0xe4, 0x35, 0xdd, 0x72, 0xbe, 0xd9, 0xf2, 0x63 - // Example answer from PC: 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x13, 0x00, 0xbe, 0xd9, 0xf2, 0x63, 0xe4, 0x35, 0xdd, 0x72 - gotSeq := binary.LittleEndian.Uint16(r[6:8]) - p.stream.send([]byte{0x10, 0x00, 0x00, 0x00, 0x00, 0x00, byte(gotSeq), byte(gotSeq >> 8), - byte(p.stream.localSID >> 24), byte(p.stream.localSID >> 16), byte(p.stream.localSID >> 8), byte(p.stream.localSID), - byte(p.stream.remoteSID >> 24), byte(p.stream.remoteSID >> 16), byte(p.stream.remoteSID >> 8), byte(p.stream.remoteSID)}) - } - if len(r) == 80 && bytes.Equal(r[:6], []byte{0x50, 0x00, 0x00, 0x00, 0x00, 0x00}) && bytes.Equal(r[48:51], []byte{0xff, 0xff, 0xff}) { - // Example answer from radio: 0x50, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, - // 0x86, 0x1f, 0x2f, 0xcc, 0x03, 0x03, 0x89, 0x29, - // 0x00, 0x00, 0x00, 0x40, 0x02, 0x03, 0x00, 0x52, - // 0x00, 0x00, 0xf8, 0xad, 0x06, 0x8d, 0xda, 0x7b, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, - // 0x80, 0x00, 0x00, 0x90, 0xc7, 0x0e, 0x86, 0x01, - // 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 - - log.Error("reauth failed") - p.SendDisconnect() - os.Exit(1) - } - if len(r) == 144 && bytes.Equal(r[:6], []byte{0x90, 0x00, 0x00, 0x00, 0x00, 0x00}) && r[96] == 1 { - // Example answer: - // 0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x00, - // 0xc6, 0x5f, 0x6f, 0x0c, 0x5f, 0x8b, 0x1e, 0x89, - // 0x00, 0x00, 0x00, 0x80, 0x03, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x31, 0x30, 0x31, 0x47, 0x39, 0x07, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, - // 0x80, 0x00, 0x00, 0x90, 0xc7, 0x0e, 0x86, 0x01, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x49, 0x43, 0x2d, 0x37, 0x30, 0x35, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x01, 0x00, 0x00, 0x00, 0x69, 0x63, 0x6f, 0x6d, - // 0x2d, 0x70, 0x63, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - // 0x00, 0x00, 0x00, 0x00, 0xc0, 0xa8, 0x03, 0x03, - // 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 - log.Print("serial and audio request success") - go streams.audio.Start() - } - - if time.Since(lastPingAt) >= 100*time.Millisecond { + select { + case r = <-readChan: + p.handleRead(r) + case <-pingTicker.C: p.sendPkt7(nil, p.stream.sendSeq) p.stream.sendPkt3() p.stream.sendSeq++ - lastPingAt = time.Now() - - if time.Since(p.lastReauthAt) >= 60*time.Second { - p.sendPktReauth(false) - } - - if time.Since(lastStatusLog) >= 10*time.Second { - log.Print("still connected") - lastStatusLog = time.Now() - } + case <-reauthTicker.C: + p.sendPktReauth(false) + case <-statusLogTicker.C: + log.Print("still connected") } } } diff --git a/streamconnection.go b/streamconnection.go index 4d372da..f06590b 100644 --- a/streamconnection.go +++ b/streamconnection.go @@ -37,6 +37,23 @@ func (p *streamConnection) read() ([]byte, error) { return b[:n], err } +func (p *streamConnection) reader(c chan []byte) { + var errCount int + for { + r, err := p.read() + if err == nil { + c <- r + } else { + errCount++ + if errCount > 5 { + log.Fatal("timeout") + } + log.Error("stream break detected") + } + errCount = 0 + } +} + func (p *streamConnection) expect(packetLength int, b []byte) []byte { var r []byte expectStart := time.Now()