mirror of
https://github.com/agessaman/meshcore-packet-capture.git
synced 2026-04-20 23:23:37 +00:00
Add MeshCore Auth Token Generator script
This script generates JWT authentication tokens signed with Ed25519 private keys, implementing the necessary cryptographic functions and command-line interface.
This commit is contained in:
parent
4cb24a72c6
commit
665a97abbf
1 changed files with 245 additions and 0 deletions
245
meshcore-decoder
Normal file
245
meshcore-decoder
Normal file
|
|
@ -0,0 +1,245 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
MeshCore Auth Token Generator
|
||||
Generate JWT authentication tokens signed with Ed25519 private keys
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import base64
|
||||
import time
|
||||
import argparse
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Ed25519 field prime
|
||||
p = 2**255 - 19
|
||||
|
||||
# Ed25519 group order
|
||||
l = 2**252 + 27742317777372353535851937790883648493
|
||||
|
||||
# Ed25519 curve parameter
|
||||
d = -121665 * pow(121666, p - 2, p) % p
|
||||
|
||||
def clamp_scalar(scalar):
|
||||
"""Clamp scalar according to Ed25519 specification."""
|
||||
scalar_bytes = bytearray(scalar)
|
||||
scalar_bytes[0] &= 0xF8
|
||||
scalar_bytes[31] &= 0x7F
|
||||
scalar_bytes[31] |= 0x40
|
||||
return bytes(scalar_bytes)
|
||||
|
||||
def recover_x(y):
|
||||
"""Recover x coordinate from y coordinate on Ed25519 curve."""
|
||||
xx = (y * y - 1) * pow(d * y * y + 1, p - 2, p)
|
||||
x = pow(xx, (p + 3) // 8, p)
|
||||
if (x * x - xx) % p != 0:
|
||||
x = (x * pow(2, (p - 1) // 4, p)) % p
|
||||
if x % 2 != 0:
|
||||
x = p - x
|
||||
return x
|
||||
|
||||
def edwards_add(P, Q):
|
||||
"""Add two points on Edwards curve."""
|
||||
x1, y1 = P
|
||||
x2, y2 = Q
|
||||
x3 = ((x1 * y2 + x2 * y1) * pow(1 + d * x1 * x2 * y1 * y2, p - 2, p)) % p
|
||||
y3 = ((y1 * y2 + x1 * x2) * pow(1 - d * x1 * x2 * y1 * y2, p - 2, p)) % p
|
||||
return (x3, y3)
|
||||
|
||||
def scalar_mult(scalar, P):
|
||||
"""Multiply point P by scalar using double-and-add."""
|
||||
if isinstance(scalar, bytes):
|
||||
n = int.from_bytes(scalar, 'little')
|
||||
else:
|
||||
n = scalar
|
||||
|
||||
Q = (0, 1)
|
||||
while n > 0:
|
||||
if n & 1:
|
||||
Q = edwards_add(Q, P)
|
||||
P = edwards_add(P, P)
|
||||
n >>= 1
|
||||
return Q
|
||||
|
||||
def encode_point(P):
|
||||
"""Encode point to 32-byte representation."""
|
||||
x, y = P
|
||||
result = bytearray(y.to_bytes(32, 'little'))
|
||||
result[31] |= ((x & 1) << 7)
|
||||
return bytes(result)
|
||||
|
||||
# Base point
|
||||
By = 4 * pow(5, p - 2, p) % p
|
||||
Bx = recover_x(By)
|
||||
|
||||
def sign_message(message_bytes, private_key_bytes):
|
||||
"""Sign a message using Ed25519 signature scheme."""
|
||||
scalar = clamp_scalar(private_key_bytes[:32])
|
||||
|
||||
h = hashlib.sha512(private_key_bytes[32:64] + message_bytes).digest()
|
||||
r = int.from_bytes(h, 'little') % l
|
||||
|
||||
R = scalar_mult(r, (Bx, By))
|
||||
A = scalar_mult(scalar, (Bx, By))
|
||||
|
||||
encoded_R = encode_point(R)
|
||||
encoded_A = encode_point(A)
|
||||
h_bytes = hashlib.sha512(encoded_R + encoded_A + message_bytes).digest()
|
||||
h_int = int.from_bytes(h_bytes, 'little') % l
|
||||
|
||||
scalar_int = int.from_bytes(scalar, 'little')
|
||||
S = (r + h_int * scalar_int) % l
|
||||
|
||||
return encoded_R + S.to_bytes(32, 'little')
|
||||
|
||||
def create_auth_token(public_key_hex, private_key_hex, exp_seconds=86400, additional_claims=None):
|
||||
"""Create a MeshCore-compatible JWT auth token."""
|
||||
iat = int(time.time())
|
||||
exp = iat + exp_seconds
|
||||
|
||||
header = {
|
||||
"alg": "Ed25519",
|
||||
"typ": "JWT"
|
||||
}
|
||||
|
||||
payload = {
|
||||
"publicKey": public_key_hex.upper(),
|
||||
"iat": iat,
|
||||
"exp": exp
|
||||
}
|
||||
|
||||
# Add additional claims if provided
|
||||
if additional_claims:
|
||||
payload.update(additional_claims)
|
||||
|
||||
def base64url_encode(data):
|
||||
"""Base64 URL encode without padding."""
|
||||
json_str = json.dumps(data, separators=(',', ':'))
|
||||
encoded = base64.urlsafe_b64encode(json_str.encode('utf-8'))
|
||||
return encoded.rstrip(b'=').decode('utf-8')
|
||||
|
||||
header_encoded = base64url_encode(header)
|
||||
payload_encoded = base64url_encode(payload)
|
||||
|
||||
message = f"{header_encoded}.{payload_encoded}".encode('utf-8')
|
||||
|
||||
private_key_bytes = bytes.fromhex(private_key_hex)
|
||||
signature = sign_message(message, private_key_bytes)
|
||||
|
||||
signature_hex = signature.hex().upper()
|
||||
|
||||
return f"{header_encoded}.{payload_encoded}.{signature_hex}", payload
|
||||
|
||||
def main():
|
||||
# Filter out 'auth-token' from arguments if present (for meshcore-decoder.py auth-token usage)
|
||||
filtered_args = []
|
||||
skip_next = False
|
||||
for i, arg in enumerate(sys.argv[1:], 1):
|
||||
if skip_next:
|
||||
skip_next = False
|
||||
continue
|
||||
if arg == 'auth-token' and i == 1:
|
||||
# Skip 'auth-token' if it's the first argument
|
||||
continue
|
||||
filtered_args.append(arg)
|
||||
|
||||
# Custom argument parser to handle both <public-key> <private-key> [options]
|
||||
# and [options] <public-key> <private-key> formats
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='meshcore-decoder auth-token',
|
||||
description='Generate JWT authentication token signed with Ed25519 private key',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
add_help=False # We'll add custom help handling
|
||||
)
|
||||
|
||||
parser.add_argument('-e', '--exp', metavar='<seconds>', type=int, default=86400,
|
||||
help='Token expiration in seconds from now (default: 86400 = 24 hours)')
|
||||
parser.add_argument('-c', '--claims', metavar='<json>',
|
||||
help='Additional claims as JSON object (e.g., \'{"aud":"mqtt.example.com","sub":"device-123"}\')')
|
||||
parser.add_argument('-j', '--json', action='store_true',
|
||||
help='Output as JSON')
|
||||
parser.add_argument('-h', '--help', action='store_true',
|
||||
help='display help for command')
|
||||
parser.add_argument('keys', nargs='*', help='<public-key> <private-key>')
|
||||
|
||||
args = parser.parse_args(filtered_args)
|
||||
|
||||
# Handle help
|
||||
if args.help:
|
||||
print('Usage: meshcore-decoder auth-token [options] <public-key> <private-key>')
|
||||
print('Generate JWT authentication token signed with Ed25519 private key')
|
||||
print()
|
||||
print('Arguments:')
|
||||
print(' public-key 32-byte public key in hex format')
|
||||
print(' private-key 64-byte private key in hex format')
|
||||
print()
|
||||
print('Options:')
|
||||
print(' -e, --exp <seconds> Token expiration in seconds from now (default: 86400 = 24 hours)')
|
||||
print(' -c, --claims <json> Additional claims as JSON object (e.g., \'{"aud":"mqtt.example.com","sub":"device-123"}\')')
|
||||
print(' -j, --json Output as JSON')
|
||||
print(' -h, --help display help for command')
|
||||
sys.exit(0)
|
||||
|
||||
# Validate we have exactly 2 positional arguments
|
||||
if len(args.keys) != 2:
|
||||
print('Error: Missing required arguments <public-key> and <private-key>', file=sys.stderr)
|
||||
print('Usage: meshcore-decoder auth-token [options] <public-key> <private-key>', file=sys.stderr)
|
||||
print('Use -h or --help for more information', file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
public_key = args.keys[0]
|
||||
private_key = args.keys[1]
|
||||
|
||||
# Validate inputs
|
||||
try:
|
||||
public_key_bytes = bytes.fromhex(public_key)
|
||||
if len(public_key_bytes) != 32:
|
||||
print(f"Error: Public key must be 32 bytes (64 hex characters), got {len(public_key_bytes)} bytes",
|
||||
file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except ValueError as e:
|
||||
print(f"Error: Invalid public key hex format: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
private_key_bytes = bytes.fromhex(private_key)
|
||||
if len(private_key_bytes) != 64:
|
||||
print(f"Error: Private key must be 64 bytes (128 hex characters), got {len(private_key_bytes)} bytes",
|
||||
file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except ValueError as e:
|
||||
print(f"Error: Invalid private key hex format: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Parse additional claims
|
||||
additional_claims = None
|
||||
if args.claims:
|
||||
try:
|
||||
additional_claims = json.loads(args.claims)
|
||||
if not isinstance(additional_claims, dict):
|
||||
print("Error: Claims must be a JSON object", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error: Invalid JSON in claims: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Generate token
|
||||
try:
|
||||
token, payload = create_auth_token(public_key, private_key,
|
||||
args.exp, additional_claims)
|
||||
|
||||
if args.json:
|
||||
output = {
|
||||
"token": token,
|
||||
"payload": payload
|
||||
}
|
||||
print(json.dumps(output, indent=2))
|
||||
else:
|
||||
print(token)
|
||||
except Exception as e:
|
||||
print(f"Error: Failed to generate token: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue