mirror of
https://github.com/agessaman/meshcore-bot.git
synced 2026-03-30 12:05:38 +00:00
- Introduced a mechanism to utilize the main event loop for scheduling coroutines, preventing deadlocks when the main loop is running. - Updated the JWT renewal interval to 12 hours, with tokens now valid for 24 hours, improving token management. - Refactored various async function calls in the MessageScheduler, WeatherService, and PacketCaptureService to ensure consistent event loop usage. - Improved error handling and logging for scheduled tasks, enhancing robustness and maintainability.
649 lines
24 KiB
Python
649 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Utilities for packet capture service
|
|
Includes auth token functionality with on-device signing support
|
|
Adapted from meshcore-packet-capture auth_token.py
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import hashlib
|
|
import base64
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
|
|
logger = logging.getLogger(__name__)

# PyNaCl is the preferred backend; the Ed25519 signing helper in this module
# relies on nacl.bindings.
try:
    import nacl.bindings
    import nacl.signing
    import nacl.exceptions
except ImportError:
    # Keep the name bound so that later references fail on the availability
    # flag rather than with a NameError.
    nacl = None
    PYNACL_AVAILABLE = False
else:
    PYNACL_AVAILABLE = True

# The cryptography package is probed as a secondary backend.
try:
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import ed25519
    from cryptography.hazmat.backends import default_backend
except ImportError:
    ed25519 = None
    CRYPTOGRAPHY_AVAILABLE = False
else:
    CRYPTOGRAPHY_AVAILABLE = True
|
|
|
|
|
|
def hex_to_bytes(hex_str: str) -> bytes:
    """Convert hex string to bytes.

    Every ``0x`` / ``0X`` marker and every space character is removed before
    decoding, so inputs such as ``"0xDEAD"``, ``"0XDEAD"`` and ``"de ad"`` are
    all accepted.

    Args:
        hex_str: Hexadecimal string to convert.

    Returns:
        bytes: Converted bytes object.

    Raises:
        ValueError: If the cleaned string is not valid hexadecimal
            (raised by ``bytes.fromhex``).
    """
    # Strip both prefix spellings; the key-file reader in this module accepts
    # both, so this helper should too.
    cleaned = hex_str.replace('0x', '').replace('0X', '').replace(' ', '')
    return bytes.fromhex(cleaned)
|
|
|
|
|
|
def bytes_to_hex(data: bytes) -> str:
    """Render a bytes object as a lowercase hexadecimal string.

    Args:
        data: Bytes object to convert.

    Returns:
        str: Two lowercase hex digits per input byte, no separators or prefix.
    """
    hex_text = data.hex()
    return hex_text
|
|
|
|
|
|
def base64url_encode(data: bytes) -> str:
    """Encode bytes as URL-safe Base64 with the ``=`` padding removed.

    Args:
        data: Data to encode.

    Returns:
        str: URL-safe Base64 text (``-`` and ``_`` instead of ``+`` and ``/``),
        without trailing padding.
    """
    # '=' can only occur as trailing padding, so stripping it from the right
    # removes every occurrence.
    encoded = base64.urlsafe_b64encode(data)
    return encoded.rstrip(b'=').decode('ascii')
|
|
|
|
|
|
def base64url_decode(data: str) -> bytes:
    """Decode URL-safe Base64 text, restoring any stripped padding first.

    Args:
        data: URL-safe Base64 encoded string, with or without ``=`` padding.

    Returns:
        bytes: Decoded bytes.
    """
    # Number of '=' characters needed to reach a multiple of four (0 when the
    # length is already aligned).
    missing = -len(data) % 4
    return base64.urlsafe_b64decode(data + '=' * missing)
|
|
|
|
|
|
def int_to_bytes_le(value: int, length: int) -> bytes:
    """Serialize an integer as a fixed-width little-endian byte string.

    Args:
        value: Integer value to convert.
        length: Number of bytes to use.

    Returns:
        bytes: ``length`` bytes, least-significant byte first.
    """
    encoded = value.to_bytes(length, 'little')
    return encoded
|
|
|
|
|
|
def bytes_to_int_le(data: bytes) -> int:
    """Interpret a byte string as an unsigned little-endian integer.

    Args:
        data: Bytes object to convert.

    Returns:
        int: Integer value (0 for empty input).
    """
    decoded = int.from_bytes(data, 'little')
    return decoded
|
|
|
|
|
|
# Order of the Ed25519 group; signing scalars are reduced modulo this value.
L = (1 << 252) + 27742317777372353535851937790883648493
|
|
|
|
|
|
def ed25519_sign_with_expanded_key(message: bytes, scalar: bytes, prefix: bytes, public_key: bytes) -> bytes:
    """Produce an Ed25519 signature from an already-expanded (orlp format) key.

    Follows the RFC 8032 signing procedure, but starts from the expanded
    private key halves instead of a seed, the way orlp/ed25519's
    ed25519_sign() does.

    Args:
        message: Message to sign.
        scalar: First 32 bytes of orlp private key (clamped scalar).
        prefix: Last 32 bytes of orlp private key (prefix for nonce).
        public_key: 32-byte public key.

    Returns:
        bytes: 64-byte signature (R || s).

    Raises:
        ImportError: If PyNaCl is not available.
    """
    if not PYNACL_AVAILABLE:
        raise ImportError("PyNaCl is required for Ed25519 signing")

    # Deterministic nonce: r = SHA-512(prefix || message) mod L
    nonce = bytes_to_int_le(hashlib.sha512(prefix + message).digest()) % L

    # Commitment point: R = r * B (no clamping, r is already reduced)
    point_r = nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(
        int_to_bytes_le(nonce, 32)
    )

    # Challenge: k = SHA-512(R || public_key || message) mod L
    challenge = bytes_to_int_le(
        hashlib.sha512(point_r + public_key + message).digest()
    ) % L

    # Response: s = (r + k * scalar) mod L
    response = (nonce + challenge * bytes_to_int_le(scalar)) % L

    # Signature layout is R followed by s, 32 bytes each.
    return point_r + int_to_bytes_le(response, 32)
|
|
|
|
|
|
def read_private_key_file(key_file_path: str) -> Optional[str]:
    """Read a private key from a file (64-byte hex format for orlp/ed25519).

    All whitespace and any ``0x`` / ``0X`` markers are removed from the file
    contents. The remainder must be strictly hexadecimal and either 128 hex
    chars (64-byte orlp key) or 64 hex chars (32-byte seed, returned as-is;
    expanding it is left to the caller).

    Args:
        key_file_path: Path to the private key file.

    Returns:
        Optional[str]: Private key as lowercase hex string (128 or 64 hex
        chars), or None if the file is missing, unreadable, has the wrong
        length, or contains non-hex characters.
    """
    if not os.path.exists(key_file_path):
        return None

    try:
        with open(key_file_path, 'r') as f:
            raw = f.read()

        # Drop every whitespace character, then any 0x/0X prefix marker.
        key = ''.join(raw.split()).replace('0x', '').replace('0X', '')

        if len(key) not in (64, 128):
            logger.debug(f"Private key file has wrong length: {len(key)} (expected 64 or 128 hex chars)")
            return None

        # Strict hex check for BOTH accepted lengths. bytes.fromhex rejects
        # signs and underscores that int(key, 16) would silently accept; a
        # ValueError here is reported by the handler below.
        bytes.fromhex(key)
        return key.lower()
    except Exception as e:
        logger.debug(f"Error reading private key file: {e}")
        return None
|
|
|
|
|
|
async def _create_auth_token_with_device(
    payload_dict: Dict[str, Any],
    public_key_hex: str,
    meshcore_instance: Any,
    chunk_size: int = 120
) -> str:
    """Create auth token using on-device signing via meshcore.commands.sign().

    The token is ``header.payload.signature`` where header and payload are
    base64url-encoded compact JSON and the signature is lowercase hex.

    The caller's ``payload_dict`` is NOT modified: the ``publicKey`` claim is
    set on a private copy, so a failed device attempt cannot leak the device
    key into a payload the caller later signs some other way.

    Args:
        payload_dict: Token payload as dictionary.
        public_key_hex: Public key in hex. Used for the ``publicKey`` claim
            only when the device's ``self_info`` does not provide one.
        meshcore_instance: Connected MeshCore instance.
        chunk_size: Size threshold in bytes. Currently only used to emit a
            debug log when the signing input is larger; the input is always
            sent to the device in a single call.

    Returns:
        str: JWT-style token string (header.payload.signature).

    Raises:
        ImportError: If meshcore package is missing.
        Exception: If device is not connected, does not support signing, or
            signing fails / returns no usable signature.
    """
    try:
        from meshcore import EventType
    except ImportError as import_error:
        raise ImportError("meshcore package required for on-device signing") from import_error

    if not meshcore_instance or not meshcore_instance.is_connected:
        raise Exception("MeshCore device not connected")

    if not hasattr(meshcore_instance, 'commands') or not hasattr(meshcore_instance.commands, 'sign'):
        raise Exception("Device does not support signing (meshcore.commands.sign not available)")

    # IMPORTANT: the device signs with its own identity (self_id), whose key
    # pair may differ from the exported/provided key. The JWT payload must
    # carry the public key that actually matches the signature, so prefer the
    # key reported in self_info and fall back to the provided one.
    device_signing_public_key = public_key_hex
    if hasattr(meshcore_instance, 'self_info') and meshcore_instance.self_info:
        device_info_public_key = meshcore_instance.self_info.get('public_key', '')
        if device_info_public_key:
            # Normalize to hex string if it's bytes
            if isinstance(device_info_public_key, (bytes, bytearray)):
                device_info_public_key = bytes(device_info_public_key).hex()
            device_signing_public_key = device_info_public_key.upper()
            if device_signing_public_key != public_key_hex.upper():
                logger.debug("⚠️ Device's self_id public key differs from provided key")
                logger.debug(" Using device's self_id public key for JWT payload (required for verification)")
                logger.debug(f" Device signing key (self_id): {device_signing_public_key[:32]}...")
                logger.debug(f" Provided key: {public_key_hex[:32]}...")
            else:
                logger.debug("✓ Device's self_id public key matches provided key")
        else:
            logger.debug("⚠️ Could not get device's self_info public_key, using provided key")
    else:
        logger.debug("⚠️ Could not get device's self_info, using provided public key")
        logger.debug(" If signature doesn't verify, device may be using different key")

    # Copy before overriding the claim so the caller's dict stays untouched.
    # Key order is preserved, so the encoded payload is unchanged.
    payload = dict(payload_dict)
    payload['publicKey'] = device_signing_public_key

    header = {
        'alg': 'Ed25519',
        'typ': 'JWT'
    }

    # Compact JSON (no spaces) for both segments
    header_json = json.dumps(header, separators=(',', ':'))
    payload_json = json.dumps(payload, separators=(',', ':'))

    header_encoded = base64url_encode(header_json.encode('utf-8'))
    payload_encoded = base64url_encode(payload_json.encode('utf-8'))

    # The signature covers "<header>.<payload>"
    signing_input = f"{header_encoded}.{payload_encoded}"
    signing_input_bytes = signing_input.encode('utf-8')

    if len(signing_input_bytes) > chunk_size:
        # Device may have signing size limits; for now the full message is
        # still attempted in one call and this is only recorded.
        logger.debug(f"Signing input is {len(signing_input_bytes)} bytes (chunk_size: {chunk_size})")

    try:
        result = await meshcore_instance.commands.sign(signing_input_bytes)

        if result.type == EventType.ERROR:
            error_payload = result.payload if hasattr(result, 'payload') else {}
            error_reason = error_payload.get('reason', 'unknown')
            raise Exception(f"Device signing failed: {error_reason}")

        # SIGNATURE is the expected event; some devices answer OK with the
        # signature in the payload instead.
        if result.type == EventType.SIGNATURE or result.type == EventType.OK:
            signature_bytes = result.payload.get('signature')
        else:
            # Unknown event type: accept it only if it carries a signature.
            signature_bytes = result.payload.get('signature') if hasattr(result, 'payload') else None
            if not signature_bytes:
                raise Exception(f"Unexpected response type from device: {result.type}")

        if not signature_bytes:
            raise Exception("Device returned empty signature")

        # Normalize the signature to lowercase hex text
        if isinstance(signature_bytes, (bytes, bytearray)):
            signature_hex = bytes_to_hex(bytes(signature_bytes))
        elif isinstance(signature_bytes, str):
            signature_hex = signature_bytes.replace('0x', '').replace(' ', '').lower()
        else:
            raise Exception(f"Unexpected signature type: {type(signature_bytes)}")

        token = f"{header_encoded}.{payload_encoded}.{signature_hex}"

        # Debug: log JWT contents (signature abbreviated)
        if logger.isEnabledFor(logging.DEBUG):
            try:
                header_decoded = base64url_decode(header_encoded).decode('utf-8')
                payload_decoded = base64url_decode(payload_encoded).decode('utf-8')
                logger.debug(f"JWT Header: {header_decoded}")
                logger.debug(f"JWT Payload: {payload_decoded}")
                logger.debug(f"JWT Signature (hex): {signature_hex[:32]}...{signature_hex[-32:]}")
            except Exception as e:
                logger.debug(f"Could not decode JWT for logging: {e}")

        return token
    except Exception as e:
        # Record the failure here, then let the caller decide on a fallback.
        logger.debug(f"Device signing error: {e}")
        raise
|
|
|
|
|
|
def _create_auth_token_python(
    payload_dict: Dict[str, Any],
    private_key_hex: str,
    public_key_hex: str
) -> str:
    """Build and sign a JWT-style auth token in pure Python (PyNaCl backend).

    Args:
        payload_dict: Token payload as dictionary.
        private_key_hex: 64-byte private key in hex (orlp format: scalar || prefix).
        public_key_hex: 32-byte public key in hex.

    Returns:
        str: JWT-style token string (header.payload.signature), with the
        signature rendered as lowercase hex.

    Raises:
        ImportError: If PyNaCl is required but missing.
        ValueError: If key lengths are invalid.
    """
    if not PYNACL_AVAILABLE:
        raise ImportError("PyNaCl is required for Python signing. Install with: pip install pynacl")

    # Both segments are compact JSON (no spaces), base64url-encoded.
    compact = (',', ':')
    jwt_header = {'alg': 'Ed25519', 'typ': 'JWT'}
    header_encoded = base64url_encode(
        json.dumps(jwt_header, separators=compact).encode('utf-8')
    )
    payload_encoded = base64url_encode(
        json.dumps(payload_dict, separators=compact).encode('utf-8')
    )

    # The signature covers "<header>.<payload>"
    signing_input_bytes = f"{header_encoded}.{payload_encoded}".encode('utf-8')

    key_material = hex_to_bytes(private_key_hex)
    verify_key = hex_to_bytes(public_key_hex)

    if len(key_material) != 64:
        raise ValueError(f"Private key must be 64 bytes (orlp format), got {len(key_material)}")

    if len(verify_key) != 32:
        raise ValueError(f"Public key must be 32 bytes, got {len(verify_key)}")

    # orlp layout: first half is the clamped scalar, second half the nonce prefix
    raw_signature = ed25519_sign_with_expanded_key(
        signing_input_bytes,
        key_material[:32],
        key_material[32:64],
        verify_key
    )
    signature_hex = bytes_to_hex(raw_signature)

    token = f"{header_encoded}.{payload_encoded}.{signature_hex}"

    # Debug: log JWT contents (signature abbreviated)
    if logger.isEnabledFor(logging.DEBUG):
        try:
            header_decoded = base64url_decode(header_encoded).decode('utf-8')
            payload_decoded = base64url_decode(payload_encoded).decode('utf-8')
            logger.debug(f"JWT Header: {header_decoded}")
            logger.debug(f"JWT Payload: {payload_decoded}")
            logger.debug(f"JWT Signature (hex): {signature_hex[:32]}...{signature_hex[-32:]}")
        except Exception as e:
            logger.debug(f"Could not decode JWT for logging: {e}")

    return token
|
|
|
|
|
|
async def _fetch_private_key_from_device(meshcore_instance: Any) -> Optional[str]:
|
|
"""Attempt to export private key from device.
|
|
|
|
Args:
|
|
meshcore_instance: Connected MeshCore instance.
|
|
|
|
Returns:
|
|
Optional[str]: Private key as hex string (128 hex chars), or None if not available.
|
|
"""
|
|
if not meshcore_instance or not meshcore_instance.is_connected:
|
|
return None
|
|
|
|
try:
|
|
from meshcore import EventType
|
|
|
|
if hasattr(meshcore_instance, 'commands') and hasattr(meshcore_instance.commands, 'export_private_key'):
|
|
logger.debug("Attempting to export private key from device...")
|
|
result = await meshcore_instance.commands.export_private_key()
|
|
|
|
if result.type == EventType.PRIVATE_KEY:
|
|
device_private_key = result.payload.get("private_key")
|
|
if device_private_key:
|
|
# Convert to hex string if it's bytes
|
|
if isinstance(device_private_key, bytes):
|
|
device_private_key = bytes_to_hex(device_private_key)
|
|
elif isinstance(device_private_key, bytearray):
|
|
device_private_key = bytes_to_hex(bytes(device_private_key))
|
|
elif isinstance(device_private_key, str):
|
|
device_private_key = device_private_key.replace('0x', '').replace(' ', '').lower()
|
|
|
|
# Validate length (should be 128 hex chars = 64 bytes)
|
|
if len(device_private_key) == 128:
|
|
logger.debug("✓ Successfully exported private key from device")
|
|
return device_private_key
|
|
elif len(device_private_key) == 64:
|
|
# 32-byte seed - would need expansion, but return as-is
|
|
logger.debug("Device returned 32-byte seed (may need expansion)")
|
|
return device_private_key
|
|
else:
|
|
logger.debug(f"Exported private key has wrong length: {len(device_private_key)}")
|
|
elif result.type == EventType.DISABLED:
|
|
logger.debug("Private key export is disabled on device")
|
|
elif result.type == EventType.ERROR:
|
|
logger.debug(f"Device returned error when exporting private key: {result.payload}")
|
|
except Exception as e:
|
|
logger.debug(f"Failed to export private key from device: {e}")
|
|
|
|
return None
|
|
|
|
|
|
async def create_auth_token_async(
    meshcore_instance: Optional[Any] = None,
    public_key_hex: Optional[str] = None,
    private_key_hex: Optional[str] = None,
    iata: str = "LOC",
    timestamp: Optional[int] = None,
    audience: Optional[str] = None,
    exp: Optional[int] = None,
    owner_public_key: Optional[str] = None,
    owner_email: Optional[str] = None,
    use_device: bool = True
) -> str:
    """Create a JWT-style authentication token for MQTT authentication.

    Supports on-device signing (preferred) with fallback to Python signing.
    For the fallback, the private key is taken from ``private_key_hex``, then
    from a device export, then from the file named by the
    ``PACKETCAPTURE_PRIVATE_KEY_FILE`` or ``PRIVATE_KEY_FILE`` environment
    variable.

    Args:
        meshcore_instance: Optional connected MeshCore instance for on-device signing.
        public_key_hex: Public key in hex. If omitted, it is read from the
            device's ``self_info`` when available.
        private_key_hex: Private key in hex (64 bytes = 128 hex chars, orlp format).
            Required if meshcore_instance not available or device signing fails.
        iata: IATA code (default: "LOC"). Accepted for interface
            compatibility; this function does not add it to the payload.
        timestamp: Unix timestamp for 'iat' claim (default: current time).
        audience: Optional audience for token (e.g., MQTT broker hostname).
        exp: Optional expiration time (Unix timestamp). Defaults to
            ``timestamp + 86400`` (24 hours).
        owner_public_key: Optional owner public key (stored as 'owner').
        owner_email: Optional owner email (stored lowercase as 'email').
        use_device: If True, try on-device signing first (default: True).

    Returns:
        str: JWT-style token string (header.payload.signature).

    Raises:
        ValueError: If public_key_hex is missing or private key is missing for Python signing.
    """
    if timestamp is None:
        timestamp = int(time.time())

    # Get public key from device if not provided
    if not public_key_hex and meshcore_instance and hasattr(meshcore_instance, 'self_info'):
        try:
            self_info = meshcore_instance.self_info
            if isinstance(self_info, dict):
                public_key_hex = self_info.get('public_key', '')
            elif hasattr(self_info, 'public_key'):
                public_key_hex = self_info.public_key

            # Convert to hex if needed
            if isinstance(public_key_hex, (bytes, bytearray)):
                public_key_hex = bytes_to_hex(bytes(public_key_hex))
        except Exception as e:
            logger.debug(f"Could not get public key from device: {e}")

    if not public_key_hex:
        raise ValueError("public_key_hex is required")

    # Normalize public key (remove 0x, whitespace, uppercase)
    public_key_hex = public_key_hex.replace('0x', '').replace(' ', '').upper()

    # Default expiration: 24 hours from now (86400 seconds)
    if exp is None:
        exp = timestamp + 86400

    payload_dict = {
        'publicKey': public_key_hex,
        'iat': timestamp,
        'exp': exp
    }

    if audience:
        payload_dict['aud'] = audience

    # Owner information uses the 'owner' and 'email' field names
    if owner_public_key:
        payload_dict['owner'] = owner_public_key.replace('0x', '').replace(' ', '').upper()

    if owner_email:
        payload_dict['email'] = owner_email.lower()

    # Try on-device signing first if available
    if use_device and meshcore_instance and meshcore_instance.is_connected:
        try:
            if hasattr(meshcore_instance, 'commands') and hasattr(meshcore_instance.commands, 'sign'):
                logger.debug("Using on-device signing for auth token")
                # Hand the device path its own copy: it may rewrite the
                # 'publicKey' claim, and that must not bleed into the
                # Python-signing fallback below if the device attempt fails.
                return await _create_auth_token_with_device(
                    dict(payload_dict),
                    public_key_hex,
                    meshcore_instance
                )
        except Exception as device_error:
            # Deliberate best-effort: any device failure falls through to
            # Python signing.
            logger.debug(f"On-device signing failed: {device_error}, falling back to Python signing")

    # Fallback to Python signing: locate a private key
    if not private_key_hex:
        # Try to fetch from device
        if meshcore_instance:
            private_key_hex = await _fetch_private_key_from_device(meshcore_instance)

        # Try to read from file if still not available
        if not private_key_hex:
            private_key_file = os.getenv('PACKETCAPTURE_PRIVATE_KEY_FILE') or os.getenv('PRIVATE_KEY_FILE')
            if private_key_file:
                private_key_hex = read_private_key_file(private_key_file)

    if not private_key_hex:
        raise ValueError(
            "private_key_hex is required for Python signing. "
            "Either provide private_key_hex, set PACKETCAPTURE_PRIVATE_KEY_FILE, "
            "or ensure device supports on-device signing."
        )

    # Normalize private key
    private_key_hex = private_key_hex.replace('0x', '').replace(' ', '').lower()

    # A 64-hex-char key is a 32-byte seed, not the 64-byte orlp format the
    # signer expects. It is not expanded here; the signer will reject it.
    if len(private_key_hex) == 64:
        logger.warning("Private key is 32 bytes - may need expansion to orlp format (64 bytes)")

    logger.debug("Using Python signing for auth token")
    return _create_auth_token_python(payload_dict, private_key_hex, public_key_hex)
|
|
|
|
|
|
# Legacy synchronous function (for backward compatibility)
|
|
def create_auth_token(
    private_key_hex: str,
    public_key_hex: str,
    iata: str = "LOC",
    timestamp: Optional[int] = None,
    audience: Optional[str] = None
) -> str:
    """Build an auth token synchronously, using Python signing only.

    Legacy entry point kept for backward compatibility. The payload carries
    'publicKey' and 'iat', plus 'aud' when an audience is given.

    Args:
        private_key_hex: Private key in hex (64 bytes = 128 hex chars, orlp format).
        public_key_hex: Public key in hex (32 bytes = 64 hex chars).
        iata: IATA code (default: "LOC").
        timestamp: Unix timestamp (default: current time).
        audience: Optional audience for token.

    Returns:
        str: JWT-style token string (header.payload.signature).
    """
    issued_at = int(time.time()) if timestamp is None else timestamp

    # Public key: strip 0x markers and spaces, uppercase. Used both as the
    # claim value and as the verification key handed to the signer.
    normalized_public = public_key_hex.replace('0x', '').replace(' ', '').upper()

    claims = {'publicKey': normalized_public, 'iat': issued_at}
    if audience:
        claims['aud'] = audience

    # Private key: same cleanup, lowercase.
    normalized_private = private_key_hex.replace('0x', '').replace(' ', '').lower()

    return _create_auth_token_python(claims, normalized_private, normalized_public)
|
|
|