ChameleonUltra/software/script/chameleon_com.py
Philippe Teuwen 8499535aad Clarify protocol. Disruptive changes: see below
This huge commit tries to enhance several things related to the fw/cli protocol.
Generally, the idea is to be verbose, explicit and reuse conventions, in order to enhance code maintainability and understandability for the other contributors.

docs/protocol.md got heavily updated

Many commands have been renamed for consistency. You are invited to adapt your client for easier maintenance.

Guidelines, also written in docs/protocol.md "New data payloads: guidelines for developers":
- Now protocol data exchanged over USB or BLE are defined in netdata.h as packed structs and values are stored in Network byte order (=Big Endian)
- Command-specific payloads are defined in their respective cmd_processor handler in app_cmd.c and chameleon_cmd.py
- Define a C `struct` for cmd/resp data larger than a single byte, and use and abuse `struct.pack`/`struct.unpack` in Python, so that one can understand the payload format at a glance.
- If there is only a single byte of data to return, still use a 1-byte `data`, not `status`.
- Use unambiguous types such as `uint16_t`, not `int` or `enum`. Cast explicitly `int` and `enum` to `uint_t` of proper size
- Use Network byte order for 16b and 32b integers
  - Macros `U16NTOHS`, `U32NTOHL` must be used on reception of a command payload.
  - Macros `U16HTONS`, `U32HTONL` must be used on creation of a response payload.
  - In Python, use the modifier `!` with all `struct.pack`/`struct.unpack`
- Concentrate payload parsing in the handlers, avoid further parsing in their callers. This is true for the firmware and the client.
- In cmd_processor handlers: don't reuse input `length`/`data` parameters for creating the response content
- Avoid hardcoding offsets, use `sizeof()`, `offsetof(struct, field)` in C and `struct.calcsize()` in Python
- Use the exact same command and fields names in firmware and in client, use function names matching the command names for their handlers unless there is a very good reason not to do so. This helps grepping around. Names must start with a letter, not a number, because some languages require it (e.g. `14a_scan` not possible in Python)
- Respect commands order in `m_data_cmd_map`, `data_cmd.h` and `chameleon_cmd.py` definitions
- Even if a command is not yet implemented in firmware or in client but a command number is allocated, add it to `data_cmd.h` and `chameleon_cmd.py` with some `FIXME: to be implemented` comment
- Validate data before using it, both when receiving command data in the firmware and when receiving response data in the client.
- Validate response status in client.
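
As an illustration of the Python-side guidelines above, here is a minimal sketch of a payload helper pair. The `!BH` layout (a 1-byte slot followed by a 2-byte tag type) and the names `SLOT_TAG_FMT`, `pack_slot_tag` and `unpack_slot_tag` are hypothetical and do not describe an actual command; only `struct.pack`, `struct.unpack` and `struct.calcsize` are real.

```python
import struct

# Hypothetical payload layout, for illustration only:
# 1-byte slot number followed by a 2-byte tag type.
# The '!' modifier selects network byte order (big endian) without padding.
SLOT_TAG_FMT = '!BH'


def pack_slot_tag(slot: int, tag_type: int) -> bytes:
    """Build the payload from explicit fields instead of hand-placed bytes."""
    return struct.pack(SLOT_TAG_FMT, slot, tag_type)


def unpack_slot_tag(data: bytes) -> tuple:
    """Validate the length with calcsize() before parsing, no hardcoded offsets."""
    expected = struct.calcsize(SLOT_TAG_FMT)
    if len(data) != expected:
        raise ValueError(f"expected {expected} bytes, got {len(data)}")
    return struct.unpack(SLOT_TAG_FMT, data)


payload = pack_slot_tag(3, 0x0102)
print(payload.hex())             # 030102: the 16-bit field is sent high byte first
print(unpack_slot_tag(payload))  # (3, 258)
```

Keeping the format string in one place means the packing side, the parsing side and the length check cannot drift apart.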

Disruptive changes:
- GET_DEVICE_CAPABILITIES: list of cmds in data are now really Big Endian
  Note: the initial attempt to use the PP_HTONS macros wrongly assumed that the platform was Big Endian (BYTE_ORDER was actually undefined), while it is Little Endian.
- GET_APP_VERSION: response is now a tuple of bytes: major|minor (previously it was in reversed order as a single uint16_t in Little Endian)
- SET_SLOT_TAG_TYPE: tag_type now on 2 bytes, to prepare remapping of its enum
- SET_SLOT_DATA_DEFAULT: tag_type now on 2 bytes, to prepare remapping of its enum
- GET_SLOT_INFO: tag_type now on 2 bytes, to prepare remapping of its enum
- GET_DEVICE_CHIP_ID: now returns its 64b ID following Network byte order (previously, bytes were in the reverse order)
- GET_DEVICE_ADDRESS: now returns its 56b address following Network byte order (previously, bytes were in the reverse order). CLI does not reverse the response anymore so it displays the same value as before.
- MF1_GET_DETECTION_COUNT: now returns its 32b value following Network byte order (previously Little Endian)
- GET_GIT_VERSION response status is now STATUS_DEVICE_SUCCESS
- GET_DEVICE_MODEL response status is now STATUS_DEVICE_SUCCESS
- MF1_READ_EMU_BLOCK_DATA response status is now STATUS_DEVICE_SUCCESS
- GET_DEVICE_CAPABILITIES response status is now STATUS_DEVICE_SUCCESS
- HF14A_SCAN: entirely new response format, room for ATS and multiple tags
- MF1_DETECT_SUPPORT response status is now HF_TAG_OK and support is indicated as bool in 1 byte of data
- MF1_DETECT_PRNG response status is now HF_TAG_OK and prng_type is returned in 1 byte of data with a new enum mf1_prng_type_t == MifareClassicPrngType
- MF1_DETECT_DARKSIDE response status is now HF_TAG_OK and darkside_status is returned in 1 byte of data with a new enum mf1_darkside_status_t == MifareClassicDarksideStatus
- MF1_DARKSIDE_ACQUIRE response status is now HF_TAG_OK and darkside_status is returned in 1 byte of data. If OK, followed by 24 bytes as previously
- MF1_GET_ANTI_COLL_DATA: in case slot does not contain anticoll data, instead of STATUS_PAR_ERR, now it returns STATUS_DEVICE_SUCCESS with empty data
- MF1_SET_ANTI_COLL_DATA and MF1_GET_ANTI_COLL_DATA now use the same data format as HF14A_SCAN

To detect an Ultra/Lite running older firmware, a client can issue GET_APP_VERSION and urge the user to flash their device if needed.
Older firmwares return status=b'\x00' and data=b'\x00\x01', while up-to-date firmwares return status=STATUS_DEVICE_SUCCESS and data greater than or equal to b'\x01\x00' (v1.0).
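
The check described above could look roughly like the sketch below. The function name `firmware_is_current` is hypothetical, and the numeric value of `STATUS_DEVICE_SUCCESS` is deliberately passed in by the caller rather than assumed here; a real client would presumably take it from its own status definitions.

```python
import struct


def firmware_is_current(status: int, data: bytes, success_status: int) -> bool:
    """Return True if a GET_APP_VERSION response looks like firmware v1.0 or later.

    success_status is the client's STATUS_DEVICE_SUCCESS value (supplied by the
    caller, not assumed in this sketch).
    """
    # Older firmwares answer with a different status, so reject those first.
    if status != success_status:
        return False
    # The new response is exactly two bytes: major|minor.
    if len(data) != struct.calcsize('!BB'):
        return False
    major, minor = struct.unpack('!BB', data)
    return (major, minor) >= (1, 0)
```

A caller would pass the status and data of the response it received, and prompt the user to flash the device when the function returns False.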

Other changes: cf. CHANGELOG, and probably a few small changes I forgot about.

TODO:
- remap `tag_specific_type_t` enum to allow future tags (e.g. LF tags) without reshuffling enum and affecting users stored cards
- TEST!
2023-09-18 00:53:39 +02:00


import queue
import struct
import threading
import time
import serial
import chameleon_status


class NotOpenException(Exception):
    """
    Chameleon err status
    """


class OpenFailException(Exception):
    """
    Chameleon open fail(serial port may be error)
    """


class CMDInvalidException(Exception):
    """
    CMD invalid(Unsupported)
    """


class Response:
    """
    Chameleon Response Data
    """

    def __init__(self, cmd, status, data=b''):
        self.cmd = cmd
        self.status = status
        self.data: bytearray = data


class ChameleonCom:
    """
    Chameleon device base class
    Communication and Data frame implemented
    """
    data_frame_sof = 0x11
    data_max_length = 512
    commands = []

    def __init__(self):
        """
        Create a chameleon device instance
        """
        self.serial_instance: serial.Serial | None = None
        self.send_data_queue = queue.Queue()
        self.wait_response_map = {}
        self.event_closing = threading.Event()

    def isOpen(self):
        """
        Chameleon is connected and init.
        :return:
        """
        return self.serial_instance is not None and self.serial_instance.isOpen()

    def open(self, port):
        """
        Open chameleon port to communication
        And init some variables
        :param port: com port, comXXX or ttyXXX
        :return:
        """
        if not self.isOpen():
            error = None
            try:
                # open serial port
                self.serial_instance = serial.Serial(port=port, baudrate=115200)
            except Exception as e:
                error = e
            finally:
                if error is not None:
                    raise OpenFailException(error)
            try:
                self.serial_instance.dtr = 1  # must make dtr enable
            except Exception:
                # not all serial support dtr, e.g. virtual serial over BLE
                pass
            self.serial_instance.timeout = 0  # do not block
            # clear variable
            self.send_data_queue.queue.clear()
            self.wait_response_map.clear()
            # Start a sub thread to process data
            self.event_closing.clear()
            threading.Thread(target=self.thread_data_receive).start()
            threading.Thread(target=self.thread_data_transfer).start()
            threading.Thread(target=self.thread_check_timeout).start()
        return self

    def check_open(self):
        """
        :return:
        """
        if not self.isOpen():
            raise NotOpenException("Please call open() function to start device.")

    @staticmethod
    def lrc_calc(array):
        """
        calc lrc and auto cut byte
        :param array: value array
        :return: u8 result
        """
        # add and cut byte and return
        ret = 0x00
        for b in array:
            ret += b
            ret &= 0xFF
        return (0x100 - ret) & 0xFF

    def close(self):
        """
        Close chameleon and clear variable
        :return:
        """
        self.event_closing.set()
        try:
            self.serial_instance.close()
        except Exception:
            pass
        finally:
            self.serial_instance = None
        self.wait_response_map.clear()
        self.send_data_queue.queue.clear()

    def thread_data_receive(self):
        """
        SubThread to receive data from chameleon device
        :return:
        """
        data_buffer = bytearray()
        data_position = 0
        data_cmd = 0x0000
        data_status = 0x0000
        data_length = 0x0000

        while self.isOpen():
            # receive
            try:
                data_bytes = self.serial_instance.read()
            except Exception as e:
                if not self.event_closing.is_set():
                    print(f"Serial Error {e}, thread for receiver exit.")
                self.close()
                break
            if len(data_bytes) > 0:
                data_byte = data_bytes[0]
                data_buffer.append(data_byte)
                if data_position < struct.calcsize('!BB'):  # start of frame + lrc1
                    if data_position == 0:
                        if data_buffer[data_position] != self.data_frame_sof:
                            print("Data frame no sof byte.")
                            data_position = 0
                            data_buffer.clear()
                            continue
                    if data_position == struct.calcsize('!B'):
                        if data_buffer[data_position] != self.lrc_calc(data_buffer[:data_position]):
                            data_position = 0
                            data_buffer.clear()
                            print("Data frame sof lrc error.")
                            continue
                elif data_position == struct.calcsize('!BBHHH'):  # frame head lrc
                    if data_buffer[data_position] != self.lrc_calc(data_buffer[:data_position]):
                        data_position = 0
                        data_buffer.clear()
                        print("Data frame head lrc error.")
                        continue
                    # frame head complete, cache info
                    _, _, data_cmd, data_status, data_length = struct.unpack("!BBHHH", data_buffer[:data_position])
                    if data_length > self.data_max_length:
                        data_position = 0
                        data_buffer.clear()
                        print("Data frame data length larger than max.")
                        continue
                elif data_position > struct.calcsize('!BBHHH'):  # frame data
                    if data_position == (struct.calcsize(f'!BBHHHB{data_length}s')):
                        if data_buffer[data_position] == self.lrc_calc(data_buffer[:data_position]):
                            # ok, lrc for data is correct
                            # and the frame has been fully received
                            # print(f"Buffer data = {data_buffer.hex()}")
                            if data_cmd in self.wait_response_map:
                                # call processor
                                if 'callback' in self.wait_response_map[data_cmd]:
                                    fn_call = self.wait_response_map[data_cmd]['callback']
                                else:
                                    fn_call = None
                                data_response = data_buffer[struct.calcsize('!BBHHHB'):
                                                            struct.calcsize(f'!BBHHHB{data_length}s')]
                                if callable(fn_call):
                                    # delete wait task from map
                                    del self.wait_response_map[data_cmd]
                                    fn_call(data_cmd, data_status, data_response)
                                else:
                                    self.wait_response_map[data_cmd]['response'] = Response(data_cmd, data_status,
                                                                                            data_response)
                            else:
                                print(f"No task wait process: {data_cmd}")
                        else:
                            print("Data frame global lrc error.")
                        data_position = 0
                        data_buffer.clear()
                        continue
                data_position += 1
            else:
                time.sleep(0.001)

    def thread_data_transfer(self):
        """
        SubThread to transfer data to chameleon device
        :return:
        """
        while self.isOpen():
            # get a task from queue(if exists)
            if self.send_data_queue.empty():
                time.sleep(0.001)
                continue
            task = self.send_data_queue.get()
            task_cmd = task['cmd']
            task_timeout = task['timeout']
            task_close = task['close']
            # register to wait map
            if 'callback' in task and callable(task['callback']):
                self.wait_response_map[task_cmd] = {'callback': task['callback']}  # The callback for this task
            else:
                self.wait_response_map[task_cmd] = {'response': None}
            # set start time
            start_time = time.time()
            self.wait_response_map[task_cmd]['start_time'] = start_time
            self.wait_response_map[task_cmd]['end_time'] = start_time + task_timeout
            self.wait_response_map[task_cmd]['is_timeout'] = False
            try:
                # send to device
                self.serial_instance.write(task['frame'])
            except Exception as e:
                print(f"Serial Error {e}, thread for transfer exit.")
                self.close()
                break
            # update queue status
            self.send_data_queue.task_done()
            # disconnect if DFU command has been sent
            if task_close:
                self.close()

    def thread_check_timeout(self):
        """
        Check task timeout
        :return:
        """
        while self.isOpen():
            # iterate over a snapshot: other threads add and remove entries concurrently,
            # and iterating the live dict could raise "dictionary changed size during iteration"
            for task_cmd, task in list(self.wait_response_map.items()):
                if time.time() > task['end_time']:
                    if 'callback' in task:
                        # not sync, call function to notify timeout.
                        task['callback'](task_cmd, None, None)
                    else:
                        # sync mode, set timeout flag
                        task['is_timeout'] = True
            time.sleep(0.001)

    def make_data_frame_bytes(self, cmd: int, data: bytearray = None, status: int = 0) -> bytearray:
        """
        Make data frame
        :return: frame
        """
        if data is None:
            data = b''
        frame = bytearray(struct.pack(f'!BBHHHB{len(data)}sB',
                                      self.data_frame_sof, 0x00, cmd, status, len(data), 0x00, data, 0x00))
        # lrc1
        frame[struct.calcsize('!B')] = self.lrc_calc(frame[:struct.calcsize('!B')])
        # lrc2
        frame[struct.calcsize('!BBHHH')] = self.lrc_calc(frame[:struct.calcsize('!BBHHH')])
        # lrc3
        frame[struct.calcsize(f'!BBHHHB{len(data)}s')] = self.lrc_calc(frame[:struct.calcsize(f'!BBHHHB{len(data)}s')])
        return frame

    def send_cmd_auto(self, cmd: int, data: bytearray = None, status: int = 0, callback=None, timeout: int = 3,
                      close: bool = False):
        """
        Send cmd to device
        :param cmd: cmd
        :param data: bytes data (optional)
        :param status: status (optional)
        :param callback: call on response
        :param timeout: wait response timeout
        :param close: close connection after executing
        :return:
        """
        self.check_open()
        # delete old task
        if cmd in self.wait_response_map:
            del self.wait_response_map[cmd]
        # make data frame
        data_frame = self.make_data_frame_bytes(cmd, data, status)
        task = {'cmd': cmd, 'frame': data_frame, 'timeout': timeout, 'close': close}
        if callable(callback):
            task['callback'] = callback
        self.send_data_queue.put(task)
        return self

    def send_cmd_sync(self, cmd: int, data: bytearray | bytes | list | int | None = None, status: int = 0,
                      timeout: int = 3) -> Response:
        """
        Send cmd to device, and block receive data.
        :param cmd: cmd
        :param data: bytes data (optional)
        :param status: status (optional)
        :param timeout: wait response timeout
        :return: response data
        """
        if isinstance(data, int):
            data = [data]  # wrap a single byte value in a list
        if isinstance(data, list):
            data = bytes(data)  # the 's' format of struct.pack needs a bytes-like object
        if len(self.commands):
            # check if chameleon can understand this command
            if cmd not in self.commands:
                raise CMDInvalidException(f"This device doesn't declare that it can support this command: {cmd}.\nMake "
                                          f"sure firmware is up to date and matches client")
        # first to send cmd, no callback mode(sync)
        self.send_cmd_auto(cmd, data, status, None, timeout)
        # wait cmd start process
        while cmd not in self.wait_response_map:
            time.sleep(0.01)
        # wait response data set
        while self.wait_response_map[cmd]['response'] is None:
            if 'is_timeout' in self.wait_response_map[cmd] and self.wait_response_map[cmd]['is_timeout']:
                raise TimeoutError(f"CMD {cmd} exec timeout")
            time.sleep(0.01)
        # ok, data received.
        data_response = self.wait_response_map[cmd]['response']
        del self.wait_response_map[cmd]
        if data_response.status == chameleon_status.Device.STATUS_INVALID_CMD:
            raise CMDInvalidException(f"Device unsupported cmd: {cmd}")
        return data_response


if __name__ == '__main__':
    cml = ChameleonCom().open("com19")
    resp = cml.send_cmd_sync(0x03E9, bytearray([0x01, 0x02]), 0xBEEF)
    print(resp)