mirror of
https://github.com/markqvist/NomadNet.git
synced 2026-05-25 09:54:26 +00:00
431 lines
13 KiB
Python
431 lines
13 KiB
Python
# https://github.com/brianolson/cbor_py
|
|
# Copyright 2014-2015 Brian Olson
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import datetime
|
|
import re
|
|
import struct
|
|
from io import BytesIO
|
|
|
|
|
|
CBOR_TYPE_MASK = 0xE0 # top 3 bits
|
|
CBOR_INFO_BITS = 0x1F # low 5 bits
|
|
|
|
|
|
CBOR_UINT = 0x00
|
|
CBOR_NEGINT = 0x20
|
|
CBOR_BYTES = 0x40
|
|
CBOR_TEXT = 0x60
|
|
CBOR_ARRAY = 0x80
|
|
CBOR_MAP = 0xA0
|
|
CBOR_TAG = 0xC0
|
|
CBOR_7 = 0xE0 # float and other types
|
|
|
|
CBOR_UINT8_FOLLOWS = 24 # 0x18
|
|
CBOR_UINT16_FOLLOWS = 25 # 0x19
|
|
CBOR_UINT32_FOLLOWS = 26 # 0x1a
|
|
CBOR_UINT64_FOLLOWS = 27 # 0x1b
|
|
CBOR_VAR_FOLLOWS = 31 # 0x1f
|
|
|
|
CBOR_BREAK = 0xFF
|
|
|
|
CBOR_FALSE = (CBOR_7 | 20)
|
|
CBOR_TRUE = (CBOR_7 | 21)
|
|
CBOR_NULL = (CBOR_7 | 22)
|
|
CBOR_UNDEFINED = (CBOR_7 | 23) # js 'undefined' value
|
|
|
|
CBOR_FLOAT16 = (CBOR_7 | 25)
|
|
CBOR_FLOAT32 = (CBOR_7 | 26)
|
|
CBOR_FLOAT64 = (CBOR_7 | 27)
|
|
|
|
CBOR_TAG_DATE_STRING = 0 # RFC3339
|
|
CBOR_TAG_DATE_ARRAY = 1 # any number, seconds since 1970-01-01T00:00:00 UTC
|
|
CBOR_TAG_BIGNUM = 2 # big-endian byte string follows
|
|
CBOR_TAG_NEGBIGNUM = 3 # big-endian byte string follows
|
|
CBOR_TAG_DECIMAL = 4
|
|
CBOR_TAG_BIGFLOAT = 5
|
|
CBOR_TAG_BASE64URL = 21
|
|
CBOR_TAG_BASE64 = 22
|
|
CBOR_TAG_BASE16 = 23
|
|
CBOR_TAG_CBOR = 24
|
|
|
|
CBOR_TAG_URI = 32
|
|
CBOR_TAG_REGEX = 35
|
|
CBOR_TAG_MIME = 36
|
|
CBOR_TAG_CBOR_FILEHEADER = 55799 # 0xd9d9f7
|
|
|
|
_CBOR_TAG_BIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_BIGNUM)
|
|
_CBOR_TAG_NEGBIGNUM_BYTES = struct.pack('B', CBOR_TAG | CBOR_TAG_NEGBIGNUM)
|
|
|
|
|
|
def _dumps_bignum_to_bytearray(val):
|
|
out = []
|
|
while val > 0:
|
|
out.insert(0, val & 0x0ff)
|
|
val = val >> 8
|
|
return bytes(out)
|
|
|
|
|
|
def dumps_int(val):
|
|
"return bytes representing int val in CBOR"
|
|
if val >= 0:
|
|
if val <= 23:
|
|
return struct.pack('B', val)
|
|
if val <= 0x0ff:
|
|
return struct.pack('BB', CBOR_UINT8_FOLLOWS, val)
|
|
if val <= 0x0ffff:
|
|
return struct.pack('!BH', CBOR_UINT16_FOLLOWS, val)
|
|
if val <= 0x0ffffffff:
|
|
return struct.pack('!BI', CBOR_UINT32_FOLLOWS, val)
|
|
if val <= 0x0ffffffffffffffff:
|
|
return struct.pack('!BQ', CBOR_UINT64_FOLLOWS, val)
|
|
outb = _dumps_bignum_to_bytearray(val)
|
|
return _CBOR_TAG_BIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(outb)) + outb
|
|
val = -1 - val
|
|
return _encode_type_num(CBOR_NEGINT, val)
|
|
|
|
|
|
def dumps_float(val):
|
|
return struct.pack("!Bd", CBOR_FLOAT64, val)
|
|
|
|
|
|
def _encode_type_num(cbor_type, val):
|
|
"""For some CBOR primary type [0..7] and an auxiliary unsigned number,
|
|
return CBOR encoded bytes."""
|
|
assert val >= 0
|
|
if val <= 23:
|
|
return struct.pack('B', cbor_type | val)
|
|
if val <= 0x0ff:
|
|
return struct.pack('BB', cbor_type | CBOR_UINT8_FOLLOWS, val)
|
|
if val <= 0x0ffff:
|
|
return struct.pack('!BH', cbor_type | CBOR_UINT16_FOLLOWS, val)
|
|
if val <= 0x0ffffffff:
|
|
return struct.pack('!BI', cbor_type | CBOR_UINT32_FOLLOWS, val)
|
|
if (((cbor_type == CBOR_NEGINT) and (val <= 0x07fffffffffffffff)) or
|
|
((cbor_type != CBOR_NEGINT) and (val <= 0x0ffffffffffffffff))):
|
|
return struct.pack('!BQ', cbor_type | CBOR_UINT64_FOLLOWS, val)
|
|
if cbor_type != CBOR_NEGINT:
|
|
raise Exception("value too big for CBOR unsigned number: {0!r}".format(val))
|
|
outb = _dumps_bignum_to_bytearray(val)
|
|
return _CBOR_TAG_NEGBIGNUM_BYTES + _encode_type_num(CBOR_BYTES, len(outb)) + outb
|
|
|
|
|
|
def dumps_string(val, is_text=None, is_bytes=None):
|
|
if isinstance(val, str):
|
|
val = val.encode('utf8')
|
|
is_text = True
|
|
is_bytes = False
|
|
if (is_bytes) or not (is_text == True):
|
|
return _encode_type_num(CBOR_BYTES, len(val)) + val
|
|
return _encode_type_num(CBOR_TEXT, len(val)) + val
|
|
|
|
|
|
def dumps_array(arr, sort_keys=False):
|
|
head = _encode_type_num(CBOR_ARRAY, len(arr))
|
|
parts = [dumps(x, sort_keys=sort_keys) for x in arr]
|
|
return head + b''.join(parts)
|
|
|
|
|
|
def dumps_dict(d, sort_keys=False):
|
|
head = _encode_type_num(CBOR_MAP, len(d))
|
|
parts = [head]
|
|
if sort_keys:
|
|
for k in sorted(d.keys()):
|
|
v = d[k]
|
|
parts.append(dumps(k, sort_keys=sort_keys))
|
|
parts.append(dumps(v, sort_keys=sort_keys))
|
|
else:
|
|
for k, v in d.items():
|
|
parts.append(dumps(k, sort_keys=sort_keys))
|
|
parts.append(dumps(v, sort_keys=sort_keys))
|
|
return b''.join(parts)
|
|
|
|
|
|
def dumps_bool(b):
|
|
if b:
|
|
return struct.pack('B', CBOR_TRUE)
|
|
return struct.pack('B', CBOR_FALSE)
|
|
|
|
|
|
def dumps_tag(t, sort_keys=False):
|
|
return _encode_type_num(CBOR_TAG, t.tag) + dumps(t.value, sort_keys=sort_keys)
|
|
|
|
|
|
def dumps(ob, sort_keys=False):
|
|
if ob is None:
|
|
return struct.pack('B', CBOR_NULL)
|
|
if isinstance(ob, bool):
|
|
return dumps_bool(ob)
|
|
if isinstance(ob, (str, bytes, bytearray)):
|
|
if isinstance(ob, bytearray):
|
|
ob = bytes(ob)
|
|
return dumps_string(ob)
|
|
if isinstance(ob, (list, tuple)):
|
|
return dumps_array(ob, sort_keys=sort_keys)
|
|
if isinstance(ob, dict):
|
|
return dumps_dict(ob, sort_keys=sort_keys)
|
|
if isinstance(ob, float):
|
|
return dumps_float(ob)
|
|
if isinstance(ob, int):
|
|
return dumps_int(ob)
|
|
if isinstance(ob, Tag):
|
|
return dumps_tag(ob, sort_keys=sort_keys)
|
|
raise Exception("don't know how to cbor serialize object of type %s" % type(ob))
|
|
|
|
|
|
def dump(obj, fp, sort_keys=False):
|
|
"""obj: Python object to serialize. fp: file-like object capable of .write(bytes)."""
|
|
blob = dumps(obj, sort_keys=sort_keys)
|
|
fp.write(blob)
|
|
|
|
|
|
class Tag(object):
|
|
def __init__(self, tag=None, value=None):
|
|
self.tag = tag
|
|
self.value = value
|
|
|
|
def __repr__(self):
|
|
return "Tag({0!r}, {1!r})".format(self.tag, self.value)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Tag):
|
|
return False
|
|
return (self.tag == other.tag) and (self.value == other.value)
|
|
|
|
def __hash__(self):
|
|
return hash((self.tag, self.value)) if not isinstance(self.value, (list, dict, bytearray)) else id(self)
|
|
|
|
|
|
def loads(data):
|
|
"""Parse CBOR bytes and return Python objects."""
|
|
if data is None:
|
|
raise ValueError("got None for buffer to decode in loads")
|
|
if isinstance(data, (bytes, bytearray, memoryview)):
|
|
fp = BytesIO(bytes(data))
|
|
else:
|
|
fp = data
|
|
return _loads(fp)[0]
|
|
|
|
|
|
def load(fp):
|
|
"""Parse and return object from fp, a file-like object supporting .read(n)."""
|
|
return _loads(fp)[0]
|
|
|
|
|
|
_MAX_DEPTH = 100
|
|
|
|
|
|
def _tag_aux(fp, tb):
|
|
bytes_read = 1
|
|
tag = tb & CBOR_TYPE_MASK
|
|
tag_aux = tb & CBOR_INFO_BITS
|
|
if tag_aux <= 23:
|
|
aux = tag_aux
|
|
elif tag_aux == CBOR_UINT8_FOLLOWS:
|
|
data = fp.read(1)
|
|
aux = struct.unpack_from("!B", data, 0)[0]
|
|
bytes_read += 1
|
|
elif tag_aux == CBOR_UINT16_FOLLOWS:
|
|
data = fp.read(2)
|
|
aux = struct.unpack_from("!H", data, 0)[0]
|
|
bytes_read += 2
|
|
elif tag_aux == CBOR_UINT32_FOLLOWS:
|
|
data = fp.read(4)
|
|
aux = struct.unpack_from("!I", data, 0)[0]
|
|
bytes_read += 4
|
|
elif tag_aux == CBOR_UINT64_FOLLOWS:
|
|
data = fp.read(8)
|
|
aux = struct.unpack_from("!Q", data, 0)[0]
|
|
bytes_read += 8
|
|
else:
|
|
assert tag_aux == CBOR_VAR_FOLLOWS, "bogus tag {0:02x}".format(tb)
|
|
aux = None
|
|
|
|
return tag, tag_aux, aux, bytes_read
|
|
|
|
|
|
def _read_byte(fp):
|
|
tb = fp.read(1)
|
|
if len(tb) == 0:
|
|
raise EOFError()
|
|
return tb[0]
|
|
|
|
|
|
def _loads_var_array(fp, limit, depth, returntags, bytes_read):
|
|
ob = []
|
|
tb = _read_byte(fp)
|
|
while tb != CBOR_BREAK:
|
|
(subob, sub_len) = _loads_tb(fp, tb, limit, depth, returntags)
|
|
bytes_read += 1 + sub_len
|
|
ob.append(subob)
|
|
tb = _read_byte(fp)
|
|
return (ob, bytes_read + 1)
|
|
|
|
|
|
def _loads_var_map(fp, limit, depth, returntags, bytes_read):
|
|
ob = {}
|
|
tb = _read_byte(fp)
|
|
while tb != CBOR_BREAK:
|
|
(subk, sub_len) = _loads_tb(fp, tb, limit, depth, returntags)
|
|
bytes_read += 1 + sub_len
|
|
(subv, sub_len) = _loads(fp, limit, depth, returntags)
|
|
bytes_read += sub_len
|
|
ob[subk] = subv
|
|
tb = _read_byte(fp)
|
|
return (ob, bytes_read + 1)
|
|
|
|
|
|
def _loads_array(fp, limit, depth, returntags, aux, bytes_read):
|
|
ob = []
|
|
for _ in range(aux):
|
|
subob, subpos = _loads(fp, limit, depth, returntags)
|
|
bytes_read += subpos
|
|
ob.append(subob)
|
|
return ob, bytes_read
|
|
|
|
|
|
def _loads_map(fp, limit, depth, returntags, aux, bytes_read):
|
|
ob = {}
|
|
for _ in range(aux):
|
|
subk, subpos = _loads(fp, limit, depth, returntags)
|
|
bytes_read += subpos
|
|
subv, subpos = _loads(fp, limit, depth, returntags)
|
|
bytes_read += subpos
|
|
ob[subk] = subv
|
|
return ob, bytes_read
|
|
|
|
|
|
def _loads(fp, limit=None, depth=0, returntags=False):
|
|
"return (object, bytes read)"
|
|
if depth > _MAX_DEPTH:
|
|
raise Exception("hit CBOR loads recursion depth limit")
|
|
tb = _read_byte(fp)
|
|
return _loads_tb(fp, tb, limit, depth, returntags)
|
|
|
|
|
|
def _loads_tb(fp, tb, limit=None, depth=0, returntags=False):
|
|
# Some special cases of CBOR_7 best handled by special struct.unpack logic here
|
|
if tb == CBOR_FLOAT16:
|
|
data = fp.read(2)
|
|
hibyte, lowbyte = struct.unpack_from("BB", data, 0)
|
|
exp = (hibyte >> 2) & 0x1F
|
|
mant = ((hibyte & 0x03) << 8) | lowbyte
|
|
if exp == 0:
|
|
val = mant * (2.0 ** -24)
|
|
elif exp == 31:
|
|
val = float('Inf') if mant == 0 else float('NaN')
|
|
else:
|
|
val = (mant + 1024.0) * (2 ** (exp - 25))
|
|
if hibyte & 0x80:
|
|
val = -1.0 * val
|
|
return (val, 3)
|
|
elif tb == CBOR_FLOAT32:
|
|
data = fp.read(4)
|
|
pf = struct.unpack_from("!f", data, 0)
|
|
return (pf[0], 5)
|
|
elif tb == CBOR_FLOAT64:
|
|
data = fp.read(8)
|
|
pf = struct.unpack_from("!d", data, 0)
|
|
return (pf[0], 9)
|
|
|
|
tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)
|
|
|
|
if tag == CBOR_UINT:
|
|
return (aux, bytes_read)
|
|
elif tag == CBOR_NEGINT:
|
|
return (-1 - aux, bytes_read)
|
|
elif tag == CBOR_BYTES:
|
|
ob, subpos = loads_bytes(fp, aux)
|
|
return (ob, bytes_read + subpos)
|
|
elif tag == CBOR_TEXT:
|
|
raw, subpos = loads_bytes(fp, aux, btag=CBOR_TEXT)
|
|
ob = raw.decode('utf8')
|
|
return (ob, bytes_read + subpos)
|
|
elif tag == CBOR_ARRAY:
|
|
if aux is None:
|
|
return _loads_var_array(fp, limit, depth, returntags, bytes_read)
|
|
return _loads_array(fp, limit, depth, returntags, aux, bytes_read)
|
|
elif tag == CBOR_MAP:
|
|
if aux is None:
|
|
return _loads_var_map(fp, limit, depth, returntags, bytes_read)
|
|
return _loads_map(fp, limit, depth, returntags, aux, bytes_read)
|
|
elif tag == CBOR_TAG:
|
|
ob, subpos = _loads(fp, limit, depth + 1, returntags)
|
|
bytes_read += subpos
|
|
if returntags:
|
|
ob = Tag(aux, ob)
|
|
else:
|
|
ob = tagify(ob, aux)
|
|
return ob, bytes_read
|
|
elif tag == CBOR_7:
|
|
if tb == CBOR_TRUE:
|
|
return (True, bytes_read)
|
|
if tb == CBOR_FALSE:
|
|
return (False, bytes_read)
|
|
if tb == CBOR_NULL:
|
|
return (None, bytes_read)
|
|
if tb == CBOR_UNDEFINED:
|
|
return (None, bytes_read)
|
|
raise ValueError("unknown cbor tag 7 byte: {:02x}".format(tb))
|
|
|
|
|
|
def loads_bytes(fp, aux, btag=CBOR_BYTES):
|
|
if aux is not None:
|
|
ob = fp.read(aux)
|
|
return (ob, aux)
|
|
chunklist = []
|
|
total_bytes_read = 0
|
|
while True:
|
|
tb = fp.read(1)[0]
|
|
if tb == CBOR_BREAK:
|
|
total_bytes_read += 1
|
|
break
|
|
tag, tag_aux, aux, bytes_read = _tag_aux(fp, tb)
|
|
assert tag == btag, 'variable length value contains unexpected component'
|
|
ob = fp.read(aux)
|
|
chunklist.append(ob)
|
|
total_bytes_read += bytes_read + aux
|
|
return (b''.join(chunklist), total_bytes_read)
|
|
|
|
|
|
def _bytes_to_biguint(bs):
|
|
out = 0
|
|
for ch in bs:
|
|
out = out << 8
|
|
out = out | ch
|
|
return out
|
|
|
|
|
|
def tagify(ob, aux):
|
|
if aux == CBOR_TAG_DATE_STRING:
|
|
# RFC3339 date string parsing not implemented; return as Tag.
|
|
return Tag(aux, ob)
|
|
if aux == CBOR_TAG_DATE_ARRAY:
|
|
return datetime.datetime.fromtimestamp(ob, tz=datetime.timezone.utc)
|
|
if aux == CBOR_TAG_BIGNUM:
|
|
return _bytes_to_biguint(ob)
|
|
if aux == CBOR_TAG_NEGBIGNUM:
|
|
return -1 - _bytes_to_biguint(ob)
|
|
if aux == CBOR_TAG_REGEX:
|
|
return re.compile(ob)
|
|
return Tag(aux, ob)
|
|
|
|
|
|
def encode(obj, sort_keys=False):
|
|
return dumps(obj, sort_keys=sort_keys)
|
|
|
|
|
|
def decode(data):
|
|
return loads(data)
|