P2P Layer

Please note that this document is a very rough draft. Because the code is in a continuous improvement cycle, some of the APIs and data structures described here may be out of date, either in the code or in this document.

The P2P Layer consists of four main submodules, i.e. RPC, PubSub, Kademlia and MessageHandler. This document explains the flow of RPC and the MessageHandler.

A Remote Procedure Call (RPC) is invoked by a node on one of its remote peers to fetch a particular resource (e.g. a full block for a given block header).

The Pub-Sub model is the way Services in the Application Service Layer interact with the Xoken Network. Each node acts as a Publisher, Subscriber and Notifier for each Service it runs in the Application Service Layer. This is done asynchronously and asymmetrically, i.e. the peers a node is subscribed to (Servers/Notifiers) and the peers that have subscribed to it (Clients/Watchers) are not necessarily the same. The subscription topics are predefined for each service (they are abstract at the moment).

Here is an example of how a single service, BlockSync, uses two different mechanisms, namely RPC and Pub-Sub, for its needs:

Consider a node A with a service BlockSync that syncs and validates blocks. It has a resource “Get Block for the given Block Header” and a topic “Latest BlockHeader”.

The P2P Layer will subscribe to, say, t nodes for the “Latest BlockHeader” topic and try to maintain r nodes that provide the “Get Block” resource. When any of these t nodes gets a new block, it notifies node A on the “Latest BlockHeader” topic by sending it the block header. After validating the new header, node A initiates an RPC call for “Get Block” with that header and receives the block itself.

Among other benefits, this propagates new blocks through the network faster than simply issuing RPCs to check whether some peer already has the latest block (the approach traditionally used in most blockchains).

In the worst case, node A may receive multiple copies of the same service message. Such duplicates are identified at the P2P level itself by a simple hash of the ServiceMessage: copies carry the same information and therefore produce the same hash.
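A minimal sketch of that duplicate check, assuming the ServiceMessage is available as its serialized bytes. The hash function (64-bit FNV-1a), the in-memory Set of seen hashes and the names fnv1a64 and markSeen are illustrative assumptions, not the actual implementation; only GHC boot packages (base, bytestring, containers) are used.

```haskell
import Data.Bits (xor)
import qualified Data.ByteString as BS
import Data.IORef (IORef, atomicModifyIORef', newIORef)
import qualified Data.Set as Set
import Data.Word (Word64)

-- 64-bit FNV-1a over the serialized ServiceMessage bytes.
-- FNV-1a is an illustrative choice (assumption); the text only asks for "a simple hash".
fnv1a64 :: BS.ByteString -> Word64
fnv1a64 = BS.foldl' step 14695981039346656037
  where
    step h byte = (h `xor` fromIntegral byte) * 1099511628211

-- True the first time a message is seen, False for every later copy.
-- Note: the set grows without bound here; a real node would expire old hashes.
markSeen :: IORef (Set.Set Word64) -> BS.ByteString -> IO Bool
markSeen seenRef msg = atomicModifyIORef' seenRef $ \seen ->
  if Set.member h seen
    then (seen, False)
    else (Set.insert h seen, True)
  where
    h = fnv1a64 msg
```

Because only hashes are compared, a collision would make a new message look like a duplicate; a wider hash makes that less likely.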

PubSub has not been implemented yet, so its data structures and APIs are not final.

 

MessageHandler

The MessageHandler takes care of multiplexing outgoing messages from Kademlia, RPC and PubSub to the various peer nodes, as well as demultiplexing incoming messages to each of those modules.

RPC and PubSub only provide the NodeId of the peer they wish to send a request to. The MessageHandler looks up that peer and selects the TCP connection for RPC, or the UDP connection for PubSub and Kademlia. Each peer has at most one TCP connection and one UDP connection.
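A small sketch of this transport choice. The constructor names TCP and UDP, the simplified Handle (an Int in place of ConnectionId) and the helpers transportFor, handleFor and PeerHandles are assumptions made for illustration.

```haskell
-- Names mirror the text; TCP/UDP constructors and the helpers are assumptions.
data MessageType = Kademlia | PubSub | RPC
  deriving (Eq, Ord, Show)

data TransportType = TCP | UDP
  deriving (Eq, Ord, Show)

-- Simplified: the connection id is a plain Int here.
data Handle = NotConnected | Pending | Connected Int
  deriving (Eq, Show)

-- At most one stream (TCP) and one datagram (UDP) handle per peer.
data PeerHandles = PeerHandles
  { streamHandle   :: Handle
  , datagramHandle :: Handle
  }

-- RPC goes over TCP; PubSub and Kademlia go over UDP.
transportFor :: MessageType -> TransportType
transportFor RPC      = TCP
transportFor PubSub   = UDP
transportFor Kademlia = UDP

-- Pick the single handle that a message of this type must use.
handleFor :: MessageType -> PeerHandles -> Handle
handleFor mtype peer = case transportFor mtype of
  TCP -> streamHandle peer
  UDP -> datagramHandle peer
```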

 

Since Kademlia may need numerous very short-lived (UDP) connections, for those we store only the connection handle and the UUIDMap, mapped to the peer's NodeId. This distinguishes connections made by Kademlia from connections made by RPC or PubSub. When RPC or PubSub ask Kademlia for a peer, the peer's details (UUIDMap, IP, TCPPort, UDPPort) are added to the NodeIdPeerMap with Handles of type NotConnected. The actual Handles are created when a send request is made to the peer for the first time, or when an incoming connection for either of its handles is received.

The MessageHandler maintains a NodeIdPeerMap, which maps each NodeId to a TVar containing the following details about the peer node:

data PeerDetails = PeerDetails
  { nodeId :: NodeId -- redundant, not needed
  , rep :: Maybe Int
  , ip :: Maybe IP
  , udpPort :: Maybe PortNumber
  , tcpPort :: Maybe PortNumber
  , streamHandle :: Handle
  , datagramHandle :: Handle
  , tvarUUIDMap :: TVar UUIDMap
  }

data Handle
  = NotConnected
  | Pending
  | Connected { connId :: ConnectionId }
  deriving (Eq, Ord, Show, Generic)

 

type UUIDMap = HM.HashMap P2PUUID (MVar P2PMessage)

UUIDMap maps the UUID of each outgoing request to the MVar on which the sendRequest API blocks until the response is put into it (explained further in the following section).

type NodeIdPeerMap = HM.HashMap NodeId (TVar PeerDetails)
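A compilable sketch of these structures, assuming String/Int placeholders for NodeId, IP, PortNumber, ConnectionId, P2PUUID and P2PMessage, and Data.Map.Strict (containers) in place of HM.HashMap so it builds with GHC boot packages only. The hypothetical addKnownPeer shows a peer handed over by Kademlia being registered with both Handles set to NotConnected.

```haskell
import Control.Concurrent.MVar (MVar)
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Placeholder types (assumptions; the real definitions live in the codebase).
type NodeId       = String
type IP           = String
type PortNumber   = Int
type ConnectionId = Int
type P2PUUID      = Int
type P2PMessage   = String

data Handle
  = NotConnected
  | Pending
  | Connected { connId :: ConnectionId }
  deriving (Eq, Ord, Show)

type UUIDMap = Map.Map P2PUUID (MVar P2PMessage)

-- Same fields as above, minus the redundant nodeId.
data PeerDetails = PeerDetails
  { rep            :: Maybe Int
  , ip             :: Maybe IP
  , udpPort        :: Maybe PortNumber
  , tcpPort        :: Maybe PortNumber
  , streamHandle   :: Handle
  , datagramHandle :: Handle
  , tvarUUIDMap    :: TVar UUIDMap
  }

type NodeIdPeerMap = Map.Map NodeId (TVar PeerDetails)

-- Hypothetical helper: record a peer obtained from Kademlia. Both handles start
-- as NotConnected and are opened on the first send or first incoming connection.
addKnownPeer :: TVar NodeIdPeerMap -> NodeId -> IP -> PortNumber -> PortNumber -> STM ()
addKnownPeer tvPeers nid addr tcp udp = do
  peers <- readTVar tvPeers
  case Map.lookup nid peers of
    Just _  -> pure ()  -- already known: keep its existing handles
    Nothing -> do
      uuids <- newTVar Map.empty
      entry <- newTVar $ PeerDetails
        { rep = Nothing
        , ip = Just addr
        , udpPort = Just udp
        , tcpPort = Just tcp
        , streamHandle = NotConnected
        , datagramHandle = NotConnected
        , tvarUUIDMap = uuids
        }
      writeTVar tvPeers (Map.insert nid entry peers)
```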

 

APIs

These APIs are internal to the P2P Layer and are exposed to RPC, Kademlia and PubSub.

sendRequest ::
  (HasP2PEnv m) => NodeId -> MessageType -> P2PPayload -> m P2PPayload

  • It looks up the destination NodeId in the NodeIdPeerMap to get all the details about that peer.
  • It is a blocking function.
  • It creates an empty MVar and a new UUID (currently a UUID v4).
  • RPC uses TCP, whereas Kademlia and PubSub use UDP.
  • It wraps the P2PPayload by attaching the MessageType (Kademlia | PubSub | RPC) and the UUID to it.
  • It then checks whether a connection handle exists for that transport type.
  • If not, it creates a connection for that transport type, adds its handle to the peer details, and launches readRequestThread asynchronously on it.
  • It adds the (UUID, MVar) key-value pair to the UUIDMap.
  • It then sends the message using the send API exposed by the Network Layer.
  • Finally, it races a timer for the response against a takeMVar on the MVar.
  • If the timer expires first, the UUID is deleted from the UUIDMap and an exception is thrown; otherwise the value from takeMVar is returned and the UUID is deleted from the UUIDMap.
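The steps above can be sketched roughly as follows, in plain IO rather than HasP2PEnv m and with Data.Map.Strict in place of HM.HashMap. The connection lookup/creation step is omitted; freshUUID (in place of UUID v4) and sendWrapped (in place of the Network Layer's send) are hypothetical parameters, and System.Timeout.timeout plays the role of the timer race.

```haskell
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Exception (Exception, finally, throwIO)
import qualified Data.Map.Strict as Map
import System.Timeout (timeout)

-- Placeholder types (assumptions).
type P2PUUID    = Int
type P2PPayload = String
type UUIDMap    = Map.Map P2PUUID (MVar P2PPayload)

data MessageType = Kademlia | PubSub | RPC
  deriving (Eq, Show)

data RequestTimedOut = RequestTimedOut
  deriving Show

instance Exception RequestTimedOut

-- Blocking request/response. `freshUUID` and `sendWrapped` are hypothetical
-- stand-ins for UUID v4 generation and the Network Layer's send API.
sendRequestSketch
  :: IO P2PUUID
  -> ((MessageType, P2PUUID, P2PPayload) -> IO ())
  -> TVar UUIDMap
  -> Int                       -- timeout in microseconds
  -> MessageType
  -> P2PPayload
  -> IO P2PPayload
sendRequestSketch freshUUID sendWrapped tvUUIDs micros mtype payload = do
  uuid  <- freshUUID
  reply <- newEmptyMVar
  -- Register the MVar before sending so a fast reply cannot be missed.
  atomically $ modifyTVar' tvUUIDs (Map.insert uuid reply)
  -- Race the timer against takeMVar; the UUID is removed either way.
  result <- (sendWrapped (mtype, uuid, payload) >> timeout micros (takeMVar reply))
              `finally` atomically (modifyTVar' tvUUIDs (Map.delete uuid))
  maybe (throwIO RequestTimedOut) pure result
```

Registering the MVar before sending matters: if a reply arrived before its UUID was in the map, processIncomingMessage would treat it as a new request.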

sendRequestforKademlia ::
  (HasP2PEnv m) => NodeId -> MessageType -> P2PPayload -> PortNumber -> IP -> m P2PPayload

  • Kademlia may send requests such as PING or FIND_NODE to peers that are known only to Kademlia.
  • Kademlia may drop such a peer at any time, so this is a special sendRequest for Kademlia in which only the node and its UDP handle are saved, for Kademlia's temporary use.

cleanConnection ::
  (HasP2PEnv m) => NodeId -> ConnectionId -> TransportType -> m ()

  • This function is passed to the Network Layer, which calls it when a connection has been terminated and must be cleaned up from the NodeIdPeerMap.
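A sketch of this cleanup in STM, keeping only the two handles of PeerDetails and a simplified Handle, with placeholder types. Checking that the handle still refers to the closed ConnectionId before resetting it is an extra safeguard assumed here, not something the text specifies.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Placeholder types (assumptions).
type NodeId       = String
type ConnectionId = Int

data TransportType = TCP | UDP
  deriving (Eq, Show)

data Handle = NotConnected | Pending | Connected ConnectionId
  deriving (Eq, Show)

-- Only the two handles of PeerDetails matter here.
data PeerDetails = PeerDetails
  { streamHandle   :: Handle
  , datagramHandle :: Handle
  } deriving (Eq, Show)

-- Reset the handle of the closed transport to NotConnected. Assumption: a handle
-- that already points at a different (newer) connection is left alone.
cleanConnectionSketch
  :: TVar (Map.Map NodeId (TVar PeerDetails)) -> NodeId -> ConnectionId -> TransportType -> STM ()
cleanConnectionSketch tvPeers nid cid transport = do
  peers <- readTVar tvPeers
  case Map.lookup nid peers of
    Nothing     -> pure ()
    Just tvPeer -> modifyTVar' tvPeer reset
  where
    reset p = case transport of
      TCP -> p { streamHandle   = clear (streamHandle p) }
      UDP -> p { datagramHandle = clear (datagramHandle p) }
    clear (Connected c) | c == cid = NotConnected
    clear h                        = h
```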

newIncomingConnectionHandler ::
  (HasP2PEnv m) => NodeId -> Connection -> TransportType -> m ()

  • This function is passed to the Network Layer, which calls it when it has successfully established an incoming connection.

  • It receives a connection handle and adds it to the NodeIdPeerMap.

  • It also forks a readRequestThread on the connection handle passed to it.
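A sketch of newIncomingConnectionHandler under the same simplifications (plain IO, placeholder types, Data.Map.Strict). Here reader is a hypothetical parameter standing in for readRequestThread, and creating a fresh peer entry when the NodeId is unknown is an assumption.

```haskell
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Placeholder types (assumptions).
type NodeId       = String
type ConnectionId = Int

data TransportType = TCP | UDP
  deriving (Eq, Show)

data Handle = NotConnected | Pending | Connected ConnectionId
  deriving (Eq, Show)

data PeerDetails = PeerDetails
  { streamHandle   :: Handle
  , datagramHandle :: Handle
  } deriving (Eq, Show)

-- Store the new handle for the peer (creating the entry if the peer is unknown),
-- then fork the reader on the connection.
newIncomingConnectionSketch
  :: TVar (Map.Map NodeId (TVar PeerDetails))
  -> (ConnectionId -> IO ())
  -> NodeId -> ConnectionId -> TransportType -> IO ThreadId
newIncomingConnectionSketch tvPeers reader nid cid transport = do
  atomically $ do
    peers  <- readTVar tvPeers
    tvPeer <- case Map.lookup nid peers of
      Just existing -> pure existing
      Nothing -> do
        fresh <- newTVar (PeerDetails NotConnected NotConnected)
        writeTVar tvPeers (Map.insert nid fresh peers)
        pure fresh
    modifyTVar' tvPeer $ \p -> case transport of
      TCP -> p { streamHandle   = Connected cid }
      UDP -> p { datagramHandle = Connected cid }
  forkIO (reader cid)
```

The handle is stored before the reader is forked, so anything the reader triggers can already find the connection in the map.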

readRequestThread ::
  (HasP2PEnv m) => ConnectionId -> TVar UUIDMap -> MessageTypeMap m -> m ()

  • This is a forever-looping function that keeps reading from the Network Layer.
  • After a message is read, processIncomingMessage is launched asynchronously with the received message passed as a ByteString.
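A sketch of the read loop. recvMessage is a hypothetical stand-in for the Network Layer's read call, forkIO from base is used in place of an async library, and stopping when recvMessage returns Nothing is an assumption added so the sketch can terminate.

```haskell
import Control.Concurrent
import qualified Data.ByteString.Char8 as BC

-- `recvMessage` stands in for the Network Layer's read call (hypothetical).
-- Returning Nothing (connection closed) ends the loop; this stop condition is an
-- assumption, since the text describes the thread as looping forever.
readRequestLoop :: IO (Maybe BC.ByteString) -> (BC.ByteString -> IO ()) -> IO ()
readRequestLoop recvMessage processIncoming = loop
  where
    loop = do
      next <- recvMessage
      case next of
        Nothing  -> pure ()
        Just msg -> do
          -- One thread per message, so a slow handler never stalls the reader.
          _ <- forkIO (processIncoming msg)
          loop
```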

processIncomingMessage ::
  (HasP2PEnv m) => ConnectionId -> TVar UUIDMap -> MessageTypeMap m -> ByteString -> m ()

  • It classifies the incoming message as either a response or a new request.
  • This thread either obtains a response from its own node for an incoming request, or matches an incoming response to its outgoing request.
  • If the UUID matches a UUID in the UUIDMap, its P2PPayload is put into the corresponding MVar.
  • Otherwise, depending on its message type (Kademlia | PubSub | RPC), the handler registered for that message type is called with the P2PPayload and returns a response P2PPayload.
  • The handlers for Kademlia | PubSub | RPC are stored in the MessageTypeMap when the P2PEnv is initialized.

type MessageTypeHandler m = P2PPayload -> m P2PPayload

type MessageTypeMap m = HM.HashMap MessageType (MessageTypeHandler m)

  • Here P2PPayload is simply the ByteString in the message format of Kademlia, PubSub or RPC, respectively.
  • The response P2PPayload is wrapped with the incoming UUID, and the whole response is sent using the send API of the Network Layer.
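A sketch of processIncomingMessage, assuming the message has already been decoded into a hypothetical P2PMessage record (UUID, MessageType, payload), with sendMsg standing in for the Network Layer's send and Data.Map.Strict in place of HM.HashMap. Using tryPutMVar, so that a duplicate reply is dropped instead of blocking the thread, is a design choice of this sketch.

```haskell
import Control.Concurrent.MVar
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Placeholder types (assumptions). The wire message is shown already decoded.
type P2PUUID    = Int
type P2PPayload = String

data MessageType = Kademlia | PubSub | RPC
  deriving (Eq, Ord, Show)

data P2PMessage = P2PMessage
  { msgUUID :: P2PUUID
  , msgType :: MessageType
  , payload :: P2PPayload
  } deriving (Eq, Show)

type UUIDMap              = Map.Map P2PUUID (MVar P2PPayload)
type MessageTypeHandler m = P2PPayload -> m P2PPayload
type MessageTypeMap m     = Map.Map MessageType (MessageTypeHandler m)

processIncomingSketch
  :: (P2PMessage -> IO ())          -- stands in for the Network Layer's send
  -> TVar UUIDMap -> MessageTypeMap IO -> P2PMessage -> IO ()
processIncomingSketch sendMsg tvUUIDs handlers msg = do
  pending <- readTVarIO tvUUIDs
  case Map.lookup (msgUUID msg) pending of
    -- A response to one of our requests: wake the blocked sendRequest.
    -- tryPutMVar (assumption) drops a duplicate reply instead of blocking.
    Just waiting -> () <$ tryPutMVar waiting (payload msg)
    -- A new request: dispatch on its type and reply under the same UUID.
    Nothing -> case Map.lookup (msgType msg) handlers of
      Nothing      -> pure ()       -- no handler for this type: drop it
      Just handler -> do
        reply <- handler (payload msg)
        sendMsg (msg { payload = reply })
```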

 

Remote Procedure Calls (RPC)

  • Used by a Service on a node to request resources from Services running on other nodes in the network, and to serve incoming resource requests from peers.
  • Each resource is identified by a unique ResourceId.
  • Each service has a ServiceId associated with it.
  • RPC maintains its view of the network in a ResourceToPeerMap, which has the following structure:

type ResourceHandler = (ServiceMessage -> ServiceMessage)

type ResourceToPeerMap =
  HM.HashMap ResourceId (ResourceHandler, TVar [NodeId])

  • The list stores the NodeIds of all the nodes that can provide the resource.

For example, suppose the hashmap contains the following entry:

{ key = R1, value = ( <Handler>, <TVar [ Node1, Node2,.. ] >) }

When service S1 (which registered R1) needs resource R1, a node is picked from the list at random and asked for it. If that node does not return R1, another node is selected and asked, and so on until S1 gets R1.

Likewise, when an incoming request for R1 arrives, the Handler is passed the request and returns the response.

 

Exposed APIs

These APIs are exposed to the Service Layer.

registerResource :: (HasP2PEnv m) => ResourceId -> ResourceHandler -> m ()

  • Used for the first-time registration of resources. Each service registers a resource together with its handler function, which handles incoming requests for that particular resource.
  • The function then adds the resource to the ResourceToPeerMap.
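A sketch of registerResource, assuming the ResourceToPeerMap is held in a TVar, with placeholder String types and Data.Map.Strict in place of HM.HashMap. Keeping an already discovered peer list when a resource is registered again is an assumption.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Placeholder types (assumptions).
type ResourceId        = String
type NodeId            = String
type ServiceMessage    = String
type ResourceHandler   = ServiceMessage -> ServiceMessage
type ResourceToPeerMap = Map.Map ResourceId (ResourceHandler, TVar [NodeId])

-- Register a local resource. Its peer list starts empty and is filled later
-- by the periodic maintenance thread.
registerResourceSketch :: TVar ResourceToPeerMap -> ResourceId -> ResourceHandler -> STM ()
registerResourceSketch tvMap rid handler = do
  resources <- readTVar tvMap
  peers <- case Map.lookup rid resources of
    Just (_, existing) -> pure existing   -- assumption: keep peers already found
    Nothing            -> newTVar []
  writeTVar tvMap (Map.insert rid (handler, peers) resources)
```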

getResource ::
  (HasP2PEnv m) => ResourceId -> ServiceMessage -> m ServiceMessage

  • Called by the Service Layer to get a specific resource (identified by its ResourceId).
  • The function uses the ResourceId as the key to look up the hashmap and obtain the list of NodeIds.
  • It selects a node at random and asks it for that resource.
  • If the node satisfies the request, the resource is returned to the service; if the node cannot provide the resource, another node from the list is asked.
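A sketch of getResource's random-pick-and-retry loop in plain IO. Since base has no random number generator, pickIndex is passed in (it could be backed by the random package, for instance); askPeer stands in for the RPC round trip. Both are hypothetical parameters, and returning Nothing once every peer has failed is an assumption.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- Placeholder types (assumptions).
type ResourceId        = String
type NodeId            = String
type ServiceMessage    = String
type ResourceHandler   = ServiceMessage -> ServiceMessage
type ResourceToPeerMap = Map.Map ResourceId (ResourceHandler, TVar [NodeId])

-- `pickIndex n` must return a random index in [0, n); `askPeer` stands in for
-- the RPC round trip. Both are hypothetical parameters.
getResourceSketch
  :: (Int -> IO Int)
  -> (NodeId -> ResourceId -> ServiceMessage -> IO (Maybe ServiceMessage))
  -> ResourceToPeerMap -> ResourceId -> ServiceMessage -> IO (Maybe ServiceMessage)
getResourceSketch pickIndex askPeer resources rid request =
  case Map.lookup rid resources of
    Nothing           -> pure Nothing
    Just (_, tvPeers) -> readTVarIO tvPeers >>= tryPeers
  where
    tryPeers []    = pure Nothing    -- assumption: give up once all peers failed
    tryPeers peers = do
      i <- pickIndex (length peers)
      case splitAt i peers of
        (before, chosen : after) -> do
          reply <- askPeer chosen rid request
          case reply of
            Just resp -> pure (Just resp)
            -- This peer failed: drop it and pick again from the rest.
            Nothing   -> tryPeers (before ++ after)
        _ -> pure Nothing            -- pickIndex returned an out-of-range index
```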

rpcHandler :: (HasP2PEnv m) => P2PPayload -> m P2PPayload

  • This is not exposed to the Service Layer; instead it is passed to the MessageHandler to handle incoming RPC requests.
  • It takes an incoming P2PPayload, verifies that it is a request, and then calls the corresponding ResourceHandler, looked up in the ResourceToPeerMap.
  • It wraps the response returned by the ResourceHandler and returns the newly constructed response P2PPayload.
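A sketch of rpcHandler as a pure function. The RPCMessage type is hypothetical, show/readMaybe stand in for the real serialization, and only the handler half of the ResourceToPeerMap is passed in.

```haskell
import qualified Data.Map.Strict as Map
import Text.Read (readMaybe)

-- Placeholder types (assumptions).
type ResourceId      = String
type ServiceMessage  = String
type P2PPayload      = String    -- stands in for the ByteString payload
type ResourceHandler = ServiceMessage -> ServiceMessage

-- Hypothetical RPC message format; Show/Read stand in for real serialization.
data RPCMessage
  = RPCRequest  { resourceId :: ResourceId, body :: ServiceMessage }
  | RPCResponse { resourceId :: ResourceId, body :: ServiceMessage }
  | RPCError    { reason :: String }
  deriving (Eq, Show, Read)

-- Decode, check that it is a request, run the registered handler, re-encode.
rpcHandlerSketch :: Map.Map ResourceId ResourceHandler -> P2PPayload -> P2PPayload
rpcHandlerSketch handlers raw = show $
  case readMaybe raw of
    Just (RPCRequest rid req) ->
      case Map.lookup rid handlers of
        Just handler -> RPCResponse rid (handler req)
        Nothing      -> RPCError ("resource not offered: " ++ rid)
    _ -> RPCError "not an RPC request"
```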

 

Periodic Maintenance of the HashMap

The ResourceToPeerMap must be maintained continuously so that every resource has at least a minimum number of nodes in its NodeId list.

updatePeerInResourceMap :: (HasP2PEnv m) => m ()

To achieve this, we spawn a background thread that does the following:

  1. Periodically check the hashmap to see whether any resource lacks the required number of nodes. To find this out, we find the resource with the minimum number of nodes.
  2. If such a resource exists, ask Kademlia for more peers.
  3. Once Kademlia returns the required peers, do the following for each peer:
      1. Spawn a new thread.
      2. Send the peer an OPTIONS message (this message asks the peer which resources it can offer).
      3. This is a blocking call that waits for a SUPPORT message from the peer.
      4. The SUPPORT message contains the list of resources.
      5. Once the SUPPORT message is received, the addNewPeer function is called.
      6. The list has the form [ ResourceId,.. ]; for each resource, the NodeId of the peer is added to its NodeId list, thereby updating the HashMap.
  4. After the HashMap is updated, the thread sleeps for a specified time and then repeats the above steps. If no new peers were added, it sleeps for a longer duration.

Additionally, the background thread sends the OPTIONS message in batches (for example, to 5 nodes at a time), so it tends to add more nodes than are strictly needed to reach the minimum count.
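The bookkeeping part of this loop (steps 1 and 3.6) can be sketched with two pure functions over a simplified table of peer lists; the handler and TVar are dropped. The name needsMorePeers and the signature given to addNewPeer are assumptions, and the Kademlia query, threads and sleeps are left out.

```haskell
import qualified Data.Map.Strict as Map

-- Placeholder types (assumptions).
type ResourceId = String
type NodeId     = String

-- Peer list per resource (handler and TVar omitted to keep this pure).
type PeerTable = Map.Map ResourceId [NodeId]

-- Step 1: does any resource lack peers? Checking the smallest list suffices.
needsMorePeers :: Int -> PeerTable -> Bool
needsMorePeers minPeers table =
  not (Map.null table) && minimum (map length (Map.elems table)) < minPeers

-- Step 3.6: add `peer` to every resource listed in its SUPPORT message.
-- Resources not registered locally are ignored; duplicates are skipped.
addNewPeer :: NodeId -> [ResourceId] -> PeerTable -> PeerTable
addNewPeer peer supported table = foldr add table supported
  where
    add rid = Map.adjust (\ps -> if peer `elem` ps then ps else peer : ps) rid
```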

 

Sending of Options Message

sendOptionsMessage :: (HasP2PEnv m) => [NodeId] -> m ()

  • It creates a separate thread for each node and sends an OPTIONS message to each of them.
  • The expected response is a SUPPORT message carrying a list of ResourceIds, which the peer produces with its optionsHandler.
  • This ResourceId list is used to add the peer to the ResourceToPeerMap.
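A sketch of sendOptionsMessage with one thread per node. Here askOptions (an OPTIONS request sent via sendRequest) and onSupport (for example addNewPeer) are hypothetical parameters, and waiting for every thread to finish before returning is an assumption.

```haskell
import Control.Concurrent
import Control.Exception (finally)
import Control.Monad (forM)

-- One thread per candidate node. `askOptions` returns Nothing when there is no
-- usable SUPPORT reply; `onSupport` records the peer's resources.
sendOptionsSketch
  :: (node -> IO (Maybe [resource]))
  -> (node -> [resource] -> IO ())
  -> [node]
  -> IO ()
sendOptionsSketch askOptions onSupport nodes = do
  dones <- forM nodes $ \n -> do
    done <- newEmptyMVar
    -- `finally` signals completion even if the request throws (e.g. a timeout).
    _ <- forkIO $
      (askOptions n >>= mapM_ (onSupport n)) `finally` putMVar done ()
    pure done
  mapM_ takeMVar dones   -- assumption: wait for all threads before returning
```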

optionsHandler :: (HasP2PEnv m) => P2PPayload -> m P2PPayload

  • This is not exposed to the Service Layer; instead it is passed to the MessageHandler to handle incoming OPTIONS requests.
  • It takes the incoming OPTIONS message as a P2PPayload, verifies that it is actually an OPTIONS message, and then forms a SUPPORT message from the list of ResourceIds this node offers.
  • The SUPPORT message is then encoded as a P2PPayload and returned.
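A sketch of optionsHandler, assuming a hypothetical OptionsMessage type encoded with show/readMaybe and the local resources held in a TVar'd Data.Map.Strict. Answering anything that is not OPTIONS with an empty SUPPORT list is an assumption; the real handler may reject such input instead.

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map
import Text.Read (readMaybe)

-- Placeholder types (assumptions).
type ResourceId = String
type P2PPayload = String   -- stands in for the ByteString payload

-- Hypothetical encoding of the two maintenance messages.
data OptionsMessage = Options | Support [ResourceId]
  deriving (Eq, Show, Read)

-- Answer OPTIONS with SUPPORT listing every locally registered resource.
-- Assumption: any other input gets an empty SUPPORT list.
optionsHandlerSketch :: TVar (Map.Map ResourceId a) -> P2PPayload -> IO P2PPayload
optionsHandlerSketch tvResources raw =
  case readMaybe raw of
    Just Options -> do
      resources <- readTVarIO tvResources
      pure (show (Support (Map.keys resources)))
    _ -> pure (show (Support []))
```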