P2P网络数据处理流程及数据交互
一、P2P网络数据处理流程
监听(ListenLoop)+拨号(Dial) –> 建立连接(SetupConn) –> Enc 握手(doEncHandshake) –> 协议握手(doProtoHandshake) –> 添加Peer Addpeer –> Run Peer
1. Enc握手 doEncHandshake
监听时接收到Enc握手:receiverEncHandshake
拨号时发起初始End握手:initiatorEncHandshake
链接的发起者被称为initiator(主动拨号),链接的被动接受者被成为receiver(被动监听)。 这两种模式下处理的流程是不同的,完成握手后, 生成了一个sec可以理解为拿到了对称加密的密钥。 然后创建了一个newRLPXFrameRW帧读写器,完成加密信道的创建过程。
initiatorEncHandshake 和receiverEncHandshake有些像,但逻辑处理是相反的过程。
makeAuthMsg
makeAuthMsg这个方法创建了handshake message。 首先对端的公钥可以通过对端的ID来获取。对端的公钥对于发起者来说是知道的;对于接收者来说是不知道的。
根据对端的ID计算出对端公钥remotePub
生成一个随机的初始值initNonce
生成一个随机的私钥
使用自己的私钥和对方的公钥生成的一个共享秘密
用共享秘密来加密这个initNonce
这里把发起者的公钥告知对方
这一步,主要是构建authMsgV4结构体。
sealEIP8
sealEIP8对msg进行rlp的编码,填充一下数据,然后使用对方的公钥把数据进行加密。
readHandshakeMsg
readHandshakeMsg有两个地方调用: 一个是在initiatorEncHandshake,另外一个就是在receiverEncHandshake。 这个方法比较简单, 首先用一种格式尝试解码,如果不行就换另外一种。基本上就是使用自己的私钥进行解码然后调用rlp解码成结构体。 结构体的描述就是authRespV4.里面最重要的就是对端的随机公钥。 双方通过自己的私钥和对端的随机公钥可以得到一样的共享秘密。 而这个共享秘密是第三方拿不到的。
secrets
secrets函数是在handshake完成之后调用。它通过自己的随机私钥和对端的公钥来生成一个共享秘密,这个共享秘密是瞬时的(只在当前这个链接中存在)。
这个函数计算出IngressMAC和EgressMAC用于rlpxFrameRW中ReadMsg,WriteMsg数据的接收发送。
数据帧结构
2. 协议握手doProtoHandshake
这个方法比较简单,加密信道已经创建完毕。 我们看到这里只是约定了是否使用Snappy加密然后就退出了。
在这个函数,发送给对方 handshakeMsg = 0×00.在readProtocolHandshake中读取接收对方发过来的handshakeMsg。
3. RLPX 数据分帧
在完成Encode握手之后,调用newRLPXFrameRW方法创建rlpxFrameRW对象,这的对象提供ReadMsg和WriteMsg方法
ReadMsg
)
1读取帧头header
2 验证帧头MAC
3 获取帧体Frame大小
4 读取帧体数据
5 验证帧体MAC信息
6 解密帧体内容(NewCTR à XORKeyStream)
7 解码帧体(RLP Decode)
8 解析帧体结构(msg.Size & msg.Payload)
9 snappy解码
WriteMsg
1 RLP编码msg.Code
2 如果snappy,就对读取payload并进行snappy编码
3 写帧头header (32字节)
4 写帧头MAC
5 写帧体信息(ptype+payload+padding)
6 写帧体MAC
4. runPeer
newPeerHook,建立peer的钩子函数
广播PeerEventTypeAdd事件
运行protocol
广播PeerEventTypeDrop事件
删除peer
run protocol
1 启动协程readLoop,读取消息并根据msg.Code处理消息:
pingMsg->pongMsg
discMsg->RLP解码msg.Payload返回reason
其他协议消息处理,根据msg.Code的取值范围,把msg分给注册的协议进行处理。
2 启动协程pingLoop
根据pingInterval(15秒)定时发送pingMsg消息
3 启动协议
startProtocols主要功能是启动协程运行注册协议的run函数proto.Run(p, rw),这个rw参数类型是protoRW,它实现的ReadMsg和WriteMsg增加msg.Code取值范围的处理。不同的protocol有不同的code取值范围,根据offset和Length确定。
二、P2P网络数据交互
1. 发送交易数据SendTransactions
事件触发交易广播txBroadcastLoop
本地发送了一个交易,或者是接收到别人发来的交易信息。 txpool会产生一条消息,消息被传递到txCh通道。然后被goroutine txBroadcastLoop()处理, 发送给其他不知道这个交易的peer。
ProtocolManager在Start的时候,订阅TxPreEvent并启动txBroadcastLoop协程监听事件。
当监听到事件后,调用BroadcastTx进行广播,广播按照委员及候选委员,接入节点,轻节点逐层广播。
发送交易之前,会把tx.Hash放到peer的knownTxs中:
新连接建立txsyncLoop
txsyncLoop负责每个新连接的初始事务同步。 当新的peer出现时,我们转发所有当前待处理的事务。
在txsyncLoop函数中定义了一个send函数来广播交易信息:
2. 发送区块哈希值SendNewBlockHashes
广播挖矿区块 NewMinedBlockEvent
ProtocolManager在Start的时候,订阅NewMinedBlockEvent并启动 minedBroadcastLoop()协程监听事件。
监听到事件后,开始广播区块信息。
先根据BroadcastBlock输入的参数propagate决定是否广播区块,当propagate为true时,广播区块信息。之后开始广播区块哈希。
广播时,先把hash放到knownBlocks里面,在广播区块和区块哈希
基于块通知的同步Fetcher
Fetcher Start函数中启动协程:
Fetcher模块的queue里面缓存了已经完成fetch的block,等待按照顺序插入到本地的区块链中。优先级别就是他们的区块号,这样区块数小的排在最前面。最后调用insert方法把给定的区块插入本地的区块链。
在insert函数中,有两处广播:一是如果区块头通过验证,那么马上对区块进行广播;二是如果插入成功, 那么广播区块,第二个参数为false,那么只会对区块的hash进行广播。
定时同步syncer
syncer中会定时的同BestPeer()来同步信息: 当有新的Peer增加的时候 会同步, 这个时候可能触发区块广播; 定时触发 10秒一次。
3. 发送区块内容SendNewBlock
参照SendNewBlockHashes的处理流程。
4. 发送区块头信息SendBlockHeaders
在通过握手后runPeer时,会运行protocol的run函数,接着调用startProtocols函数,进而进入NewProtocolManager的时候定义的Run,每一个SubProtocols都有一个Run。
这个run方法首先创建了一个peer对象,然后调用了handle方法来处理这个peer。注意,这里的peer区别于p2p中的peer,但是它包含p2p的peer。
在handle最后,循环调用handleMsg, 这个方法很长,主要是处理接收到各种消息之后的应对措施。
对于GetBlockHeadersMsg的消息处理,结果调用SendBlockHeaders返回给对端:
首先解码msg,解析出getBlockHeadersData结构体。
查找方式:
从Hash指定的开始朝创世区块移动,也就是反向移动。
从Hash指定的开始正向移动。
通过Number反向查找。
通过Number正向查找。
查找结果发给对端:
5. 发送区块体信息SendBlockBodies
没有用到。
6. RLP编码发送区块体信息SendBlockBodiesRLP
调用流程参考“4 发送区块头信息SendBlockHeaders”。
收到GetBlockBodiesMsg,解析msg信息,组织bodies并发送给对端。
7. 发送节点信息SendNodeData
调用流程参考“4 发送区块头信息SendBlockHeaders”。
GetNodeDataMsg对应的协议版本要大于等于eth63.
8. RLP编码发送节点信息SendReceiptsRLP
调用流程参考“4 发送区块头信息SendBlockHeaders”。
9. 请求一个区块头RequestOneHeader
调用流程参考“4 发送区块头信息SendBlockHeaders”。
10. 通过Hash请求区块头RequestHeadersByHash
首先,在协议初始化的时候,调用protocolManager.Start
之后启动syncer(), syncer中会定时的同BestPeer()来同步信息: 当有新的Peer增加的时候 会同步; 定时触发 10秒一次同步。
)
pm.synchronise会调用中 Downloader中的同步函数。 Synchronise试图和一个peer来同步,如果同步过程中遇到一些错误,那么会删除掉Peer。然后会被重试。
最后,在syncWithPeer中会启动几个fetcher 分别负责header,bodies,receipts处理。spawnSync给每个fetcher启动一个goroutine, 然后阻塞的等待fetcher出错。
在fetchHeight中,会发出RequestHeadersByHash请求。
fetchHeaders方法用来获取header。 然后根据获取的header去获取body和receipt等信息。fetchHeaders不断的重复这样的操作,发送header请求,等待所有的返回,直到完成所有的header请求。
11. 通过Number请求区块头RequestHeadersByNumber
调用流程参考“10 通过Hash请求区块头RequestHeadersByHash”。
12. 请求区块体RequestBodies
调用流程参考“10 通过Hash请求区块头RequestHeadersByHash”。
13. 请求收据RequestReceipts
调用流程参考“10 通过Hash请求区块头RequestHeadersByHash”。
14. 请求节点信息RequestNodeData
在创建Downloader的时候,会同时启动协程 startFetcher,进而启动runStateSync。
15. 握手Handshake
head是当前的区块头,genesis是创世区块的信息,只有创世区块相同才能握手成功。如果接收到任何一个错误(发送,接收),或者是超时,那么就断开连接,握手失败。
readStatus,检查对端返回的各种情况。
感谢HPB团队整理。
推荐阅读