diff --git a/packages/simplexmq/lib/src/crypto.dart b/packages/simplexmq/lib/src/crypto.dart index 46fa8943a9..bc00093efb 100644 --- a/packages/simplexmq/lib/src/crypto.dart +++ b/packages/simplexmq/lib/src/crypto.dart @@ -6,9 +6,12 @@ import 'package:pointycastle/asymmetric/oaep.dart'; import 'package:pointycastle/asymmetric/rsa.dart'; import 'package:pointycastle/block/aes_fast.dart'; import 'package:pointycastle/block/modes/gcm.dart'; +import 'package:pointycastle/digests/sha256.dart'; import 'package:pointycastle/key_generators/api.dart'; import 'package:pointycastle/key_generators/rsa_key_generator.dart'; import 'package:pointycastle/random/fortuna_random.dart'; +import 'package:pointycastle/signers/rsa_signer.dart'; +import 'buffer.dart' show empty; class AESKey { final Uint8List _key; @@ -44,7 +47,6 @@ Uint8List _randomBytes(int len, Random seedSource) { return bytes; } -final empty = Uint8List(0); final paddingByte = '#'.codeUnitAt(0); Uint8List encryptAES(AESKey key, Uint8List iv, int padTo, Uint8List data) { @@ -90,3 +92,19 @@ Uint8List decryptOAEP(RSAPrivateKey key, Uint8List data) { ..init(false, PrivateKeyParameter(key)); return oaep.process(data); } + +Uint8List signPSS(RSAPrivateKey privateKey, Uint8List data) { + final signer = RSASigner(SHA256Digest(), '0609608648016503040201') + ..init(true, PrivateKeyParameter(privateKey)); + return signer.generateSignature(data).bytes; +} + +bool verifyPSS(RSAPublicKey publicKey, Uint8List data, Uint8List sig) { + final verifier = RSASigner(SHA256Digest(), '0609608648016503040201') + ..init(false, PublicKeyParameter(publicKey)); + try { + return verifier.verifySignature(data, RSASignature(sig)); + } on ArgumentError { + return false; + } +} diff --git a/packages/simplexmq/lib/src/parser.dart b/packages/simplexmq/lib/src/parser.dart index 18374e1825..752187bc26 100644 --- a/packages/simplexmq/lib/src/parser.dart +++ b/packages/simplexmq/lib/src/parser.dart @@ -17,6 +17,7 @@ final charDot = cc('.'); class Parser { final Uint8List _s; + final List _positions = []; int _pos = 0; bool _fail = false; Parser(this._s); @@ -35,6 +36,20 @@ class Parser { return res; } + T? tryParse(T? Function(Parser p) parse) { + if (_fail || _pos >= _s.length) { + _fail = true; + return null; + } + _positions.add(_pos); + final res = parse(this); + final prevPos = _positions.removeLast(); + if (res == null) { + _pos = prevPos; + _fail = false; + } + } + // takes a required number of bytes Uint8List? take(int len) => _run(() { final end = _pos + len; diff --git a/packages/simplexmq/lib/src/protocol.dart b/packages/simplexmq/lib/src/protocol.dart index 2e90d76847..f80b88d97d 100644 --- a/packages/simplexmq/lib/src/protocol.dart +++ b/packages/simplexmq/lib/src/protocol.dart @@ -174,7 +174,7 @@ class ERR extends BrokerCommand { : cmdErr = err == ErrorType.CMD ? throw ArgumentError('CMD error should be created with ERR.CMD') : null; - ERR.cmd(this.cmdErr) : err = ErrorType.CMD; + ERR.cmd(CmdErrorType this.cmdErr) : err = ErrorType.CMD; @override Uint8List serialize() { final _err = errorTags[err]!; diff --git a/packages/simplexmq/lib/src/transport.dart b/packages/simplexmq/lib/src/transport.dart index 87240ad203..baf6cf9a41 100644 --- a/packages/simplexmq/lib/src/transport.dart +++ b/packages/simplexmq/lib/src/transport.dart @@ -5,6 +5,7 @@ import 'package:pointycastle/digests/sha256.dart'; import 'buffer.dart'; import 'crypto.dart'; import 'parser.dart'; +import 'protocol.dart'; import 'rsa_keys.dart'; abstract class Transport { @@ -13,16 +14,6 @@ abstract class Transport { Future close(); } -Stream blockStream(Transport t, int blockSize) async* { - try { - while (true) { - yield await t.read(blockSize); - } - } catch (e) { - return; - } -} - class SMPServer { final String host; final int? port; @@ -59,11 +50,63 @@ const int _binaryRsaTransport = 0; const int _transportBlockSize = 4096; const int _maxTransportBlockSize = 65536; +class _Request { + final Uint8List queueId; + final Completer completer = Completer(); + _Request(this.queueId); +} + +class Either { + final L? left; + final R? right; + Either.left(L this.left) : right = null; + Either.right(R this.right) : left = null; +} + +enum ClientErrorType { SMPServerError, SMPResponseError, SMPUnexpectedResponse } + +class BrokerResponse { + final BrokerCommand? command; + final ClientErrorType? errorType; + final ERR? error; + BrokerResponse(BrokerCommand this.command) + : errorType = null, + error = null; + BrokerResponse.error(ClientErrorType this.errorType, this.error) + : command = null; +} + +class ClientTransmission { + final String corrId; + final Uint8List queueId; + final ClientCommand command; + ClientTransmission(this.corrId, this.queueId, this.command); + + Uint8List serialize() => + unwordsN([encodeAscii(corrId), encode64(queueId), command.serialize()]); +} + +class BrokerTransmission { + final String corrId; + final Uint8List queueId; + final BrokerCommand? command; + final ERR? error; + BrokerTransmission(this.corrId, this.queueId, BrokerCommand this.command) + : error = null; + BrokerTransmission.error(this.corrId, this.queueId, ERR this.error) + : command = null; +} + +final badBlock = BrokerTransmission.error('', empty, ERR(ErrorType.BLOCK)); + class SMPTransportClient { final Transport _conn; final _sndKey = SessionKey.create(); final _rcvKey = SessionKey.create(); final int blockSize; + int _corrId = 0; + bool _messageStreamCreated = false; + final Map _sentCommands = {}; SMPTransportClient._(this._conn, this.blockSize); static Future connect(Transport conn, @@ -75,6 +118,92 @@ class SMPTransportClient { return _conn.close(); } + Future sendSMPCommand( + RSAPrivateKey? key, Uint8List queueId, ClientCommand cmd) async { + final corrId = (++_corrId).toString(); + final t = ClientTransmission(corrId, queueId, cmd).serialize(); + final sig = key == null ? empty : encode64(signPSS(key, t)); + final data = unwordsN([sig, t, empty]); + final r = _sentCommands[corrId] = _Request(queueId); + await _writeEncrypted(data); + print('block sent'); + return r.completer.future; + } + + Stream messageStream() { + if (_messageStreamCreated) { + throw Exception('message stream already created'); + } + _messageStreamCreated = true; + return _messageStream(); + } + + Stream _messageStream() async* { + try { + while (true) { + final block = await _readEncrypted(); + final t = _parseBrokerTransmission(block); + print('block received'); + print(t); + if (t.corrId == '') { + yield t; + } else { + final r = _sentCommands.remove(t.corrId); + if (r == null) { + yield t; + } else { + final cmd = t.command; + r.completer.complete(r.queueId.equal(t.queueId) + ? cmd == null + ? BrokerResponse.error( + ClientErrorType.SMPResponseError, t.error) + : cmd is ERR + ? BrokerResponse.error( + ClientErrorType.SMPServerError, cmd) + : BrokerResponse(cmd) + : BrokerResponse.error( + ClientErrorType.SMPUnexpectedResponse, null)); + } + } + } + } catch (e) { + return; + } + } + + static BrokerTransmission _parseBrokerTransmission(Uint8List s) { + final p = Parser(s); + p.space(); + final cId = p.word(); + p.space(); + final queueId = p.tryParse((p) => p.base64()) ?? empty; + p.space(); + if (p.fail || cId == null) return badBlock; + final corrId = decodeAscii(cId); + final command = smpCommandP(p); + if (command == null) { + return BrokerTransmission.error( + corrId, queueId, ERR.cmd(CmdErrorType.SYNTAX)); + } + if (command is! BrokerCommand) { + return BrokerTransmission.error( + corrId, queueId, ERR.cmd(CmdErrorType.PROHIBITED)); + } + final qErr = _tQueueError(queueId, command); + if (qErr != null) { + return BrokerTransmission.error(corrId, queueId, ERR.cmd(qErr)); + } + return BrokerTransmission(corrId, queueId, command); + } + + static CmdErrorType? _tQueueError(Uint8List queueId, BrokerCommand cmd) { + if (cmd is IDS || cmd is PONG) { + if (queueId.isNotEmpty) return CmdErrorType.HAS_AUTH; + } else if (cmd is! ERR && queueId.isEmpty) { + return CmdErrorType.NO_QUEUE; + } + } + static Future _clientHandshake( Transport conn, Uint8List? keyHash, int? blkSize) async { final srv = await _getHeaderAndPublicKey_1_2(conn, keyHash); @@ -166,6 +295,12 @@ class SMPTransportClient { return decryptAES(_rcvKey.aesKey, iv, block); } + Future _writeEncrypted(Uint8List data) { + final iv = _nextIV(_sndKey); + final block = encryptAES(_sndKey.aesKey, iv, blockSize, data); + return _conn.write(block); + } + static Uint8List _nextIV(SessionKey sk) { final c = encodeInt32(sk._counter++); final start = sk.baseIV.sublist(0, 4); diff --git a/packages/simplexmq_io/test/transport_test.dart b/packages/simplexmq_io/test/transport_test.dart index a9ba0db785..3b46e0edc9 100644 --- a/packages/simplexmq_io/test/transport_test.dart +++ b/packages/simplexmq_io/test/transport_test.dart @@ -1,21 +1,37 @@ -// import 'dart:io'; import 'package:simplexmq/simplexmq.dart'; import 'package:simplexmq/src/buffer.dart'; +import 'package:simplexmq/src/crypto.dart'; +import 'package:simplexmq/src/rsa_keys.dart'; import 'package:simplexmq_io/simplexmq_io.dart'; import 'package:test/test.dart'; +final keyHash = + decode64(encodeAscii('pH7bg7B6vB3uJ1poKmClTAqr7yYWnAtapnIDN7ypKxU=')); void main() { - group('SMP transport', () { - test( - 'establish connection (expects SMP server on localhost:5223)', - () async { - final conn = await SocketTransport.connect('localhost', 5223); - final smp = await SMPTransportClient.connect(conn, - keyHash: decode64( - encodeAscii('pH7bg7B6vB3uJ1poKmClTAqr7yYWnAtapnIDN7ypKxU='))); - expect(smp is SMPTransportClient, true); - }, - skip: 'requires SMP server on port 5223', - ); - }); + group('SMP transport (expects SMP server on localhost:5223)', () { + test('establish connection', () async { + final conn = await SocketTransport.connect('localhost', 5223); + final smp = await SMPTransportClient.connect(conn, keyHash: keyHash); + expect(smp is SMPTransportClient, true); + }); + + test('should create SMP queue and send message', () async { + final conn1 = await SocketTransport.connect('localhost', 5223); + final alice = await SMPTransportClient.connect(conn1, keyHash: keyHash); + final aliceKeys = generateRSAkeyPair(); + final rcvKeyStr = encode64(encodeRsaPubKey(aliceKeys.publicKey)); + + // final conn2 = await SocketTransport.connect('localhost', 5223); + // final bob = await SMPTransportClient.connect(conn2, keyHash: keyHash); + // final bobKeys = generateRSAkeyPair(); + // final sndKeyStr = encode64(encodeRsaPubKey(bobKeys.publicKey)); + + // print('we are here'); + + final resp = await alice.sendSMPCommand( + aliceKeys.privateKey, empty, NEW(rcvKeyStr)); + print(resp); + }); + // }); + }, skip: 'requires SMP server on port 5223'); }