Files
simplexmq/src/Simplex/Messaging/Server/QueueStore/STM.hs
Evgeny Poberezkin 227d83d0e7 SMP commands for notifications (NKEY/NID and NSUB/NMSG) with separate queue ID and key (#199)
* SMP commands for notifications (LSTN, NTFY) with separate queue IDs and keys

* rename Notifier types

* remove notify key and id from NEW and IDS commands (TODO add other commands)

* fix StoreLog serialization

* add commands for managing notifications

* add notification subscribers to server state, add notifier ID and key to store log

* add notifier ID and key to the queue

* refactor END notification to work for both types of subscriptions, deliver message notification (NMSG)

* process NSUB command - subscribe to message notifications

* test for message notifications

* fix SMP client function for NSUB command

* fix parse/serialize NID command

* refactor use ifM

* check duplicate notifier ID only against other notifier IDs

* refactor getQueue

* test notifier ID and key with store log

* Update src/Simplex/Messaging/Client.hs

Co-authored-by: Efim Poberezkin <8711996+efim-poberezkin@users.noreply.github.com>

* Update src/Simplex/Messaging/Server.hs

Co-authored-by: Efim Poberezkin <8711996+efim-poberezkin@users.noreply.github.com>

* store log: s/NOTIFY/NOTIFIER/

Co-authored-by: Efim Poberezkin <8711996+efim-poberezkin@users.noreply.github.com>
2021-11-14 18:52:29 +00:00

119 lines
4.0 KiB
Haskell

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}
module Simplex.Messaging.Server.QueueStore.STM where
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore
import UnliftIO.STM
data QueueStoreData = QueueStoreData
{ queues :: Map RecipientId QueueRec,
senders :: Map SenderId RecipientId,
notifiers :: Map NotifierId RecipientId
}
type QueueStore = TVar QueueStoreData
newQueueStore :: STM QueueStore
newQueueStore = newTVar QueueStoreData {queues = M.empty, senders = M.empty, notifiers = M.empty}
instance MonadQueueStore QueueStore STM where
addQueue :: QueueStore -> RecipientPublicKey -> (RecipientId, SenderId) -> STM (Either ErrorType ())
addQueue store rKey ids@(rId, sId) = do
cs@QueueStoreData {queues, senders} <- readTVar store
if M.member rId queues || M.member sId senders
then return $ Left DUPLICATE_
else do
writeTVar store $
cs
{ queues = M.insert rId (mkQueueRec rKey ids) queues,
senders = M.insert sId rId senders
}
return $ Right ()
getQueue :: QueueStore -> SParty (p :: Party) -> QueueId -> STM (Either ErrorType QueueRec)
getQueue st party qId = do
cs <- readTVar st
pure $ case party of
SRecipient -> getRcpQueue cs qId
SSender -> getPartyQueue cs senders
SNotifier -> getPartyQueue cs notifiers
SBroker -> Left INTERNAL
where
getPartyQueue ::
QueueStoreData ->
(QueueStoreData -> Map QueueId RecipientId) ->
Either ErrorType QueueRec
getPartyQueue cs recipientIds =
case M.lookup qId $ recipientIds cs of
Just rId -> getRcpQueue cs rId
Nothing -> Left AUTH
secureQueue :: QueueStore -> RecipientId -> SenderPublicKey -> STM (Either ErrorType ())
secureQueue store rId sKey =
updateQueues store rId $ \cs c ->
case senderKey c of
Just _ -> (Left AUTH, cs)
_ -> (Right (), cs {queues = M.insert rId c {senderKey = Just sKey} (queues cs)})
addQueueNotifier :: QueueStore -> RecipientId -> NotifierId -> NotifierPublicKey -> STM (Either ErrorType ())
addQueueNotifier store rId nId nKey = do
cs@QueueStoreData {queues, notifiers} <- readTVar store
if M.member nId notifiers
then pure $ Left DUPLICATE_
else case M.lookup rId queues of
Nothing -> pure $ Left AUTH
Just q -> case notifier q of
Just _ -> pure $ Left AUTH
_ -> do
writeTVar store $
cs
{ queues = M.insert rId q {notifier = Just (nId, nKey)} queues,
notifiers = M.insert nId rId notifiers
}
pure $ Right ()
suspendQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
suspendQueue store rId =
updateQueues store rId $ \cs c ->
(Right (), cs {queues = M.insert rId c {status = QueueOff} (queues cs)})
deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
deleteQueue store rId =
updateQueues store rId $ \cs c ->
( Right (),
cs
{ queues = M.delete rId (queues cs),
senders = M.delete (senderId c) (senders cs)
}
)
updateQueues ::
QueueStore ->
RecipientId ->
(QueueStoreData -> QueueRec -> (Either ErrorType (), QueueStoreData)) ->
STM (Either ErrorType ())
updateQueues store rId update = do
cs <- readTVar store
let conn = getRcpQueue cs rId
either (return . Left) (_update cs) conn
where
_update cs c = do
let (res, cs') = update cs c
writeTVar store cs'
return res
getRcpQueue :: QueueStoreData -> RecipientId -> Either ErrorType QueueRec
getRcpQueue cs rId = maybe (Left AUTH) Right . M.lookup rId $ queues cs