some work on multibytes

This commit is contained in:
Florent 2026-03-06 10:40:14 -04:00
parent f57cb66277
commit c378319252
2 changed files with 66 additions and 15 deletions

View file

@ -105,24 +105,46 @@ class ContactCommands(CommandHandlerBase):
data = b"\x0f" + key_bytes
return await self.send(data, [EventType.OK, EventType.ERROR])
async def update_contact(self, contact, path=None, flags=None) -> Event:
async def update_contact(self, contact, path=None, flags=None, path_hash_mode=None) -> Event:
if path is None:
out_path_hex = contact["out_path"]
out_path_len = contact["out_path_len"]
out_path_hash_mode = contact["out_path_hash_mode"]
else:
path_hash_size = 1
res = await self.send_device_query()
if not res is None and res.type != EventType.ERROR:
if "path_hash_mode" in res.payload:
path_hash_size = res.payload["path_hash_mode"] + 1
if path_hash_mode is None: # not specified when calling func
if ":" in path: # mode specified in path string
path_hash_mode = int(path.split(":")[1])
path = path.split(":")[0].replace(":","")
else: # use device one by default
path_hash_mode = contact["out_path_len"] >> 6 # would fallback to previous val
res = await self.send_device_query()
if not res is None and res.type != EventType.ERROR:
if "path_hash_mode" in res.payload:
path_hash_size = res.payload["path_hash_mode"] + 1
else:
if ":" in path: # remove as it has been specified in args
path = path.split(":")[0].replace(":","")
path_hash_size = path_hash_mode + 1
print(path_hash_size)
out_path_hex = path
out_path_len = int(len(path) / (2 * path_hash_size))
out_path_hash_mode = path_hash_mode
print(f"Setting {contact["adv_name"]} path to {out_path_hex} with mode {out_path_hash_mode}")
# reflect the change
contact["out_path_hash_mode"] = path_hash_mode
contact["out_path"] = out_path_hex
contact["out_path_len"] = out_path_len
out_path_hex = out_path_hex + (128 - len(out_path_hex)) * "0"
if out_path_len == -1: # path did not change and contact was flood
out_path_len = 255 # we are signed
else:
out_path_len = out_path_len | (path_hash_mode << 6)
if flags is None:
flags = contact["flags"]
@ -137,7 +159,7 @@ class ContactCommands(CommandHandlerBase):
+ bytes.fromhex(contact["public_key"])
+ contact["type"].to_bytes(1, "little")
+ flags.to_bytes(1, "little")
+ out_path_len.to_bytes(1, "little", signed=True)
+ int(out_path_len).to_bytes(1, "little", signed=False)
+ bytes.fromhex(out_path_hex)
+ bytes.fromhex(adv_name_hex)
+ contact["last_advert"].to_bytes(4, "little")
@ -149,8 +171,8 @@ class ContactCommands(CommandHandlerBase):
async def add_contact(self, contact) -> Event:
return await self.update_contact(contact)
async def change_contact_path(self, contact, path) -> Event:
return await self.update_contact(contact, path)
async def change_contact_path(self, contact, path, path_hash_mode=None) -> Event:
return await self.update_contact(contact, path, path_hash_mode)
async def change_contact_flags(self, contact, flags) -> Event:
return await self.update_contact(contact, flags=flags)
@ -162,4 +184,3 @@ class ContactCommands(CommandHandlerBase):
async def get_autoadd_config(self) -> Event:
data = b"\x3B"
return await self.send(data, [EventType.AUTOADD_CONFIG, EventType.ERROR])

View file

@ -99,7 +99,13 @@ class MessageReader:
c["public_key"] = dbuf.read(32).hex()
c["type"] = dbuf.read(1)[0]
c["flags"] = dbuf.read(1)[0]
c["out_path_len"] = int.from_bytes(dbuf.read(1), signed=True, byteorder="little")
plen = int.from_bytes(dbuf.read(1), signed=False, byteorder="little")
if plen == 255: # flood
c["out_path_hash_mode"] = -1
c["out_path_len"] = -1 # 6 LSB
else:
c["out_path_hash_mode"] = plen >> 6
c["out_path_len"] = plen & 0x3F # 6 LSB
c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex()
c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "")
c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little")
@ -173,7 +179,13 @@ class MessageReader:
res = {}
res["type"] = "PRIV"
res["pubkey_prefix"] = dbuf.read(6).hex()
res["path_len"] = dbuf.read(1)[0]
plen = dbuf.read(1)[0]
if plen == 255 : # direct message
res["path_hash_mode"] = -1
res["path_len"] = plen
else:
res["path_hash_mode"] = plen >> 6
res["path_len"] = plen & 0x3F
txt_type = dbuf.read(1)[0]
res["txt_type"] = txt_type
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
@ -196,7 +208,13 @@ class MessageReader:
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
dbuf.read(2) # reserved
res["pubkey_prefix"] = dbuf.read(6).hex()
res["path_len"] = dbuf.read(1)[0]
plen = dbuf.read(1)[0]
if plen == 255 : # direct message
res["path_hash_mode"] = -1
res["path_len"] = plen
else:
res["path_hash_mode"] = plen >> 6
res["path_len"] = plen & 0x3F
txt_type = dbuf.read(1)[0]
res["txt_type"] = txt_type
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
@ -217,7 +235,13 @@ class MessageReader:
res = {}
res["type"] = "CHAN"
res["channel_idx"] = dbuf.read(1)[0]
res["path_len"] = dbuf.read(1)[0]
plen = dbuf.read(1)[0]
if plen == 255 : # direct message
res["path_hash_mode"] = -1
res["path_len"] = plen
else:
res["path_hash_mode"] = plen >> 6
res["path_len"] = plen & 0x3F
res["txt_type"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False)
text = dbuf.read().strip(b"\0")
@ -249,7 +273,13 @@ class MessageReader:
res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
dbuf.read(2) # reserved
res["channel_idx"] = dbuf.read(1)[0]
res["path_len"] = dbuf.read(1)[0]
plen = dbuf.read(1)[0]
if plen == 255 : # direct message
res["path_hash_mode"] = -1
res["path_len"] = plen
else:
res["path_hash_mode"] = plen >> 6
res["path_len"] = plen & 0x3F
res["txt_type"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False)
text = dbuf.read()