以太坊主动数据同步简析(以太坊主动数据同步简析)
开门见山,以太坊数据同步是以节点(peer)作为数据载体存放和传输主要以Header,Body,Reciept组成的数据主体,通过以太坊p2p通信协议管理数据同步事务,最后交给执行器(主动同步会交给Downloader,被动同步会交给Fetcher)执行最后的数据下载任务。
那么问起数据主体我们进行同步需要同步哪些数据?不同数据又是否需要分类同步?
本实验室 进行了全面而详细的分析。
二、数据主体:进行数据同步首先需要明确我们同步的数据主体由什么组成,笼统来分有两类,一类不需要节点发送主动同步请求,节点会在完成数据打包的时候自行向网络广播,这一类数据有三种——完整block,区块Hash,和交易Transaction。而我们今天重点探讨下一类数据。
第二类数据是需要节点主动发送同步请求随后其他节点响应才能进行同步的数据,除了第一类数据其他需要同步的数据都属于第二类,所以可以说他的种类更加繁杂,而其中最重要的也有三种——区块头Header,区块体Body,交易回执Receipt(具体代码⬇⬇⬇),跟第一类不同这三者可以说都是完整区块的某一组成部分,这也显示了主动同步希望节点进行数据同步时范围自由可控,能够优先同步必要数据而放弃同步暂时非必要的数据。
type Header struct {
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
UncleHash common.Hash `json:"sha3Uncles" gencodec:"required"`
Coinbase common.Address `json:"miner" gencodec:"required"`
root common.Hash `json:"stateRoot" gencodec:"required"`
TxHash common.Hash `json:"transactionsRoot" gencodec:"required"`
ReceiptHash common.Hash `json:"receiptsRoot" gencodec:"required"`
Bloom Bloom `json:"logsBloom" gencodec:"required"`
Difficulty *big.Int `json:"difficulty" gencodec:"required"`
number *big.Int `json:"number" gencodec:"required"`
GasLimit uint64 `json:"gasLimit" gencodec:"required"`
GasUsed uint64 `json:"gasUsed" gencodec:"required"`
Time uint64 `json:"timestamp" gencodec:"required"`
Extra []byte `json:"extraData" gencodec:"required"`
MixDigest common.Hash `json:"mixHash"`
Nonce BlockNonce `json:"nonce"`
}
⬆Header表示区块头,在区块里扮演这相当重要的角色,ParentHash会记录上一个区块的区块hash,uncleHash会记录叔块hash,coinbase则标识矿工地址。Root,Txhash,ReceiptHash这三个分别是state trie,tx trie,Receipt.trie三个前缀树的根节点RLP编码hash一个用于描述世界状态,一个用于描述交易,一个用于描述交易回执。bloom是区块头中的布隆过滤器用于快速判断目标hash是否在某个集合中。Header中以上数据都是区块头用来对区块以及区块中重要数据模块利用hash进行标识的。剩下的数据则各有他用不细赘述。可以看出一个区块头其实已经将一个区块框架基本描述出来了这也为后面谈到的数据同步策略埋下了伏笔。
type Body struct {
transactions []*Transaction
Uncles []*Header
}
⬆Body就比较简明了,里面包含一组交易对象,和一个叔块的区块头数组。
type Receipt struct {
// Consensus fields: These fields are defined by the Yellow Paper
PostState []byte `json:"root"`
Status uint64 `json:"status"`
CumulativeGasUsed uint64 `json:"cumulativeGasUsed" gencodec:"required"`
Bloom Bloom `json:"logsBloom" gencodec:"required"`
Logs []*Log `json:"logs" gencodec:"required"`
// Implementation fields: These fields are added by geth when processing a transaction.
// They are stored in the chain database.
TxHash common.Hash `json:"transactionHash" gencodec:"required"`
ContractAddress common.Address `json:"contractAddress"`
GasUsed uint64 `json:"gasUsed" gencodec:"required"`
// Inclusion information: These fields provide information about the inclusion of the
// transaction corresponding to this receipt.
BlockHash common.Hash `json:"blockHash,omitempty"`
BlockNumber *big.Int `json:"blockNumber,omitempty"`
TransactionIndex uint `json:"transactionIndex"`
}
⬆Receipt是区块中所有交易对象在执行完成后生成的一个数组,用于记录交易处理信息。他们会在生成后被逐个插入到Receipt trie中同时也会生成区块头中的ReceiptHash。一个交易回执记录了三部分信息用于交易的管理和标识,第一部分为共识部分,PostState,Status,CumulativeGasUsed,Bloom,Logs只有这五个数据在ReceiptRLP会被encode构成回执hash随后回执hash会参与共识的校验,第二部分数据是交易部分,TxHash指的是交易回执所对应的交易哈希,,ContractAddress是当这笔交易是部署新合约时记录新合约的地址,GasUsed表示该笔交易的Gas使用量,第三部分是区块部分,他记录的是BlockHash区块哈希,BlockNumber当前区块数,TransactionIndex 该交易在区块中的序号。对于Receipt在数据同步策略中有着直观的区别,下文说到的fullSync和fastSync最直观的区别就在于到底是同步对端的Receipt,还是本地自行生成Receipt。
三、数据载体数据传输载体为peer节点,这个数据载体peer,如果你在源码里面去认真寻找你会发现,某些角落里就能找到一个peer.go.首先我列出有关数据同步模块我所找到的peer结构。有p2p/peer.go ,eth/peer.go ,les/peer.go ,eth/downloader/peer.go。以太坊的网络也有传输层,会话层,表示层和协议层。
那p2p包中的peer扮演了一个底层的节点模型,传输层会基于UDP协议发现相邻peer并维持peer连接,还会基于TCP协议建立peer之间的信息交流通道。会话层Peer管理主要管理的是节点和上层子协议的交互,而NodeTable管理主要管理的是底层基于UDP协议构建的节点连接表table。所以p2p这一层的peer节点他需要能获取和开启子协议的能力,还需要有ping通其他节点的能力当然还要去接受其他节点消息。这就体现在p2p/peer.go中的3个重要功能模块,pingLoop,readLoop,startProtocols而这也是peer.run()方法里的3个函数。
func (p *Peer) run() (remoteRequested bool, err error) {
var (
writeStart = make(chan struct{}, 1)
writeErr = make(chan error, 1)
readErr = make(chan error, 1)
reason DiscReason // sent to the peer
)
p.wg.Add(2)
go p.readLoop(readErr)
go p.pingLoop()
// Start all protocol handlers.
writeStart <- struct{}{}
p.startProtocols(writeStart, writeErr)//开启子协议Handle
loop:... // 省略等待error和断开连接的loop代码段
}
close(p.closed)
p.rw.close(reason)
p.wg.Wait()
return remoteRequested, err
}
// Peer represents a connected remote node.
type Peer struct {
rw *conn
running map[string]*protoRW
log log.Logger
created mclock.AbsTime
wg sync.WaitGroup
protoErr chan error
closed chan struct{}
disc chan DiscReason
// events receives message send / receive events if set
events *event.Feed
}
可以看出上面是底层peer结构,下图则是子协议层的peer结构,不同的子协议层会有不同的peer结构,这也就是为什么会有eth/peer.go和les/peer.go毕竟不同子协议应用于不同的数据场景。
type peer struct {
id string
*p2p.Peer //可以看出其实他是对网络层的peer节点进行封装的顶层节点。
rw p2p.MsgReadWriter
version int // protocol version negotiated
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
head common.Hash
td *big.Int
lock sync.RWMutex
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedProp s chan *propEvent // Queue of blocks to broadcast to the peer
queuedAnns chan *types.Block // Queue of blocks to announce to the peer
term chan struct{} // Termination channel to stop the broadcaster
}
我们可以看出数据载体有底层peer,和子协议层peer,而进行数据同步需要利用通信协议管理连接两层peer,由底层实现peer的消息分发,消息的监听,子协议层peer执行数据的下载和同步,而这两层之间就需要非常重要的protocolManger进行管理和沟通。
四、数据同步流程那么如何启动protocolManager呢?我们上面介绍了peer结构,他主要是代表远程其他节点,在以太坊中还有一类同样是代表了节点的结构Node,他主要代表了本地节点自身会稍微有点区别,而节点想要启动p2p网络就要依靠本地Node模块,其中利用Node.Start()函数开启了两个任务,一个是启动Ethereum Service,其中的Ethereum.start就启动了protocolmanager;另一个是启动p2p.Server,新建并刷新K桶,开启UDP端口监听,同时监听TCP端口,处理从远端节点发来的message。
当protocolManager启动了之后,他就开始需要连接底层逻辑层peer(p2p)和顶层协议层peer(eth):
当启动了protocolManager,首先节点会进行初始化,调用NewProtocolManager,初始化的过程中会调用SubProtocol函数向地城p2p.peer获取一个消息读写通道,然后再构建一个消息处理器Handle,Handle,他不仅会接受请求同步的消息,也会接受节点响应的消息,在protocolManger的start方法中他会启动一个定期同步协程syncer,他会根据消息通知调用fecher或者downloader执行器执行数据同步。
- 数据同步模式
- 主动数据同步指的是本地节点自发的向相邻节点请求区块数据,数据入口在eth/downloader/downloader,和eth/handler里
- 被动数据同步指本地节点收到其他节点数据同步消息(Message)而后请求区块数据,数据入口在eth/fetcher/fetcher.go
- 主动同步流程我们以主动同步为例,当调用downloader执行器时,他会先调用findAncestor找到本地链和远程链的共同祖先,从共同祖先开始同步,他会配置4个fetchers,分别是fetcherHeader,fetcherBodies,fetcherReceipts,ProcessHeaders最后根据同步模式调用收到processFullSyncContent 和processFastSyncContent最后调用spawnSync执行同步。
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
//省略部分代码
...
...
// 获取区块高度
latest, err := d.fetchHeight(p)
if err != nil {
return err
}
height := latest.Number.Uint64()
//获取共同祖先
origin, err := d.findAncestor(p, latest)
if err != nil {
return err
}
//
...
...
//
// 这里他们是为了确保其实中心点在任何快速同步的中心点之前。快速同步使用的是二分查找的方法寻找共同祖先所以这里需要进行验证。
pivot := uint64(0)
if d.mode == FastSync {
if height <= uint64(fsMinFullBlocks) {
origin = 0
} else {
pivot = height - uint64(fsMinFullBlocks)
if pivot <= origin {
origin = pivot - 1
}
}
}
d.committed = 1
if d.mode == FastSync && pivot != 0 {
d.committed = 0
}
//省略部分代码
...
...
//
//构造四个fetcher,获取数据,放在缓存中。
fetchers := []func() error{
func() error { return d.fetchHeaders(p, origin 1, pivot) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin 1) }, // Bodies are retrieved during normal and fast sync
func() error { return d.fetchReceipts(origin 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin 1, pivot, td) },
}
//根据不同的同步模式,快速同步则加入processFastSyncContent进入fetchers,完整同步则加入processFullSyncContent
if d.mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
} else if d.mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
//最后执行同步,
return d.spawnSync(fetchers)
}
func (d *Downloader) spawnSync(fetchers []func() error) error {
//省略部分代码
...
//
//在spawnSync中会执行fetchers中的5个fetcher
for _, fn := range fetchers {
fn := fn
go func() { defer d.cancelWg.Done(); errc <- fn() }()
}
var err error
//这里会监听fetchers的error返回,当执行出错时关闭queue队列并取消下载
for i := 0; i < len(fetchers); i {
if i == len(fetchers)-1 {
// Close the queue when all fetchers have exited.
// This will cause the block processor to end when
// it has processed the queue.
d.queue.Close()
}
if err = <-errc; err != nil {
break
}
}
d.queue.Close()
d.Cancel()
return err
}
以上为同步过程的源码,从源码里可以看出主要的下载任务是集中在他构造的可拓展fetchers里,他是一个fetcher任务队列。而根据不同的数据同步策略他的fetcher任务队列不仅构造不同,fetcher之间的协作流程也不尽相同。
五、数据同步策略fetcher任务队列的作用概括起来就是在进行,数据填充,数据组装,和最后的数据插入。而其中填充什么样的数据,那些数据需要组装,什么时候插入数据,均由一个重要的数据结构mode进行标识,他代表了数据同步策略,主要分为三种。
- lightSync :轻节点同步,数据填充的时候只填充区块头数据,不需要进行数据组装,调用insertHeaderchain直接插入区块头。
- fullSync :全节点同步,数据填充的时候需要填充区块头Header,区块体Body,但不填充交易回执Receipt,将数据组装到结果集Result,之后调用importBlockResults将结果集内数据插入到主链,和lightSync区别在于轻节点同步没有区块体,所以他不会执行和验证交易而fullSync全节点同步会在插入数据的同时执行交易并验证这也是他为什么同步速度慢的原因,之后会自行生成交易回执Receipt,所以他在填充组装的时候不需要Receipt参与。
- fastSync :快速同步,数据填充的时候会填充Header,Body,同时也会填充Receipt,然后组装3个数据,执行数据插入,而在执行数据插入的时候和fullSync,lightSync完全不同,他调用的commitFastSyncData这个函数和fullSync全节点同步调用的importBlockResults不一样他不会执行交易而只是提交同步数据,所以他为了保证安全加了一步验证获取来的交易回执。而这也才是快速同步的一部分,快速同步为了保证数据的安全性古老的区块会使用上述方法同步,对于时间上来说比较新的区块会使用fullSync的同步模式进行同步。用而言之就是fastSync会快速同步一大部分而一小部分还是会按照fullSync的同步模式同步。
以太坊数据同步非常复杂,他涵盖了其网络架构,通信协议管理,数据传输通道的相互配合,数据同步策略的安排,以及加速数据同步速率的算法。其中还有众多细节和逻辑关系。而数据同步是区块链节点和外界通信的基石他还有更多的架构彩蛋令人拍案叫绝。期待后续文章和大家分享。
,
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com