Fabric Source Code Analysis, Part 3: Startup Flow of the Orderer

程序员文章站 2022-07-14 16:02:21

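I. Starting the Ordering Node

I originally meant to cover the Peer and the Orderer together, but there turned out to be too much material, so they had to be split. Once the Peer flow is understood, the Orderer is mostly familiar. The difference is that the two nodes do different jobs, so the services they start (communication, database, and so on) differ as well. At the lower levels, though, those services should be constructed in much the same way. The source code of the ordering node follows.

II. Startup Flow

If the Peer's startup entry point looked a bit simple, the Orderer's is simpler still:
1. Startup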

func main() {
	server.Main()
}

So there is no room to cut corners here, or there would be nothing left to analyze. The only option is to jump into server.Main, which it calls:

func Main() {
	// kingpin is a command-line parsing library; the Peer uses cobra for the same purpose
	fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

	// "version" command
	if fullCmd == version.FullCommand() {
		fmt.Println(metadata.GetVersionInfo())
		return
	}

	// Load the configuration data from the .yaml config file
	conf, err := localconfig.Load()
	if err != nil {
		logger.Error("failed to parse config: ", err)
		os.Exit(1)
	}

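	// Initialize logging
	initializeLogging()
	// Initialize the local MSP
	initializeLocalMsp(conf)

	// Pretty-print the configuration
	prettyPrintStruct(conf)
	// Run the requested command with the loaded configuration
	Start(fullCmd, conf)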
	Start(fullCmd, conf)
}

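The code still looks simple, which is exactly when to stay alert: code that stays simple for too long lulls you into analyzing it simplistically too. That cannot be right here, because the Orderer is arguably the most important part of the whole blockchain system: it is the component that produces blocks.
Logging initialization and pretty-printing can be ignored for now. Initializing the local MSP (signing certificates and so on) essentially calls loadLocaMSP in mgmt.go and then Setup. That is left for the later MSP analysis.
That leaves the last function, Start, as the one with real substance. Here is the code first: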

func Start(cmd string, conf *localconfig.TopLevel) {
	// The command and the configuration are passed in.
	// Extract and validate the bootstrap (genesis) block.
	bootstrapBlock := extractBootstrapBlock(conf)
	if err := ValidateBootstrapBlock(bootstrapBlock); err != nil {
		logger.Panicf("Failed validating bootstrap block: %v", err)
	}

	// Set up the operations system from the operations and metrics configuration
	opsSystem := newOperationsSystem(conf.Operations, conf.Metrics)
	err := opsSystem.Start() // start listening
	if err != nil {
		logger.Panicf("failed to initialize operations subsystem: %s", err)
	}
	defer opsSystem.Stop()
	metricsProvider := opsSystem.Provider

	// Create the ledger factory
	lf, _ := createLedgerFactory(conf, metricsProvider)
	sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
	clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)
	// Whether the consensus type is a cluster type
	clusterType := isClusterType(clusterBootBlock)
	// Create the local MSP signer instance (an empty object)
	signer := localmsp.NewSigner()

	// Initialize the cluster client configuration: signer, private key, certificates, TLS, and so on
	clusterClientConfig := initializeClusterClientConfig(conf, clusterType, bootstrapBlock)
	clusterDialer := &cluster.PredicateDialer{
		ClientConfig: clusterClientConfig,
	}
	// Create the replicator, which is used in cluster mode
	r := createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer)
	// Only clusters that are equipped with a recent config block can replicate.
	if clusterType && conf.General.GenesisMethod == "file" {
		r.replicateIfNeeded(bootstrapBlock)
	}
	// As before, create an observer that records logging-related metrics
	logObserver := floggingmetrics.NewObserver(metricsProvider)
	flogging.Global.SetObserver(logObserver)

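	// Initialize the gRPC server from the configuration built above, including the CA certificate components
	serverConfig := initializeServerConfig(conf, metricsProvider)
	// SecureOptions and KeepaliveOptions hold the TLS public/private keys, the client and server
	// certificates, and the keepalive interval and timeout settings.
	grpcServer := initializeGrpcServer(conf, serverConfig)
	// Credential (certificate) support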
	caSupport := &comm.CredentialSupport{
		AppRootCAsByChain:           make(map[string]comm.CertificateBundle),
		OrdererRootCAsByChainAndOrg: make(comm.OrgRootCAs),
		ClientRootCAs:               serverConfig.SecOpts.ClientRootCAs,
	}

	clusterServerConfig := serverConfig
	clusterGRPCServer := grpcServer
	if clusterType {
		clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, grpcServer, ioutil.ReadFile)
	}

	var servers = []*comm.GRPCServer{grpcServer}
	// If we have a separate gRPC server for the cluster, we need to update its TLS
	// CA certificate pool too.
	// Handle the separate gRPC server used by the cluster
	if clusterGRPCServer != grpcServer {
		servers = append(servers, clusterGRPCServer)
	}
	// Callback for TLS authentication: updates the TLS client CA certificates of each channel
	tlsCallback := func(bundle *channelconfig.Bundle) {
		// only need to do this if mutual TLS is required or if the orderer node is part of a cluster
		if grpcServer.MutualTLSRequired() || clusterType {
			logger.Debug("Executing callback to update root CAs")
			updateTrustedRoots(caSupport, bundle, servers...)
			if clusterType {
				updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
			}
		}
	}

	// Create a new signature header
	sigHdr, err := signer.NewSignatureHeader()
	if err != nil {
		logger.Panicf("Failed creating a signature header: %v", err)
	}

	expirationLogger := flogging.MustGetLogger("certmonitor")
	crypto.TrackExpiration(
		serverConfig.SecOpts.UseTLS,
		serverConfig.SecOpts.Certificate,
		[][]byte{clusterClientConfig.SecOpts.Certificate},
		sigHdr.Creator,
		expirationLogger.Warnf, // This can be used to piggyback a metric event in the future
		time.Now(),
		time.AfterFunc)

	// Initialize the multichannel registrar: creates the data storage paths, including the index database and the channel block data
	manager := initializeMultichannelRegistrar(clusterBootBlock, r, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
	// Set the mutual TLS flag
	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
	expiration := conf.General.Authentication.NoExpirationChecks
	// Create the Orderer service
	server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS, expiration)

	logger.Infof("Starting %s", metadata.GetVersionInfo())
	go handleSignals(addPlatformSignals(map[os.Signal]func(){
		syscall.SIGTERM: func() {
			grpcServer.Stop()
			if clusterGRPCServer != grpcServer {
				clusterGRPCServer.Stop()
			}
		},
	}))

	if clusterGRPCServer != grpcServer {
		logger.Info("Starting cluster listener on", clusterGRPCServer.Address())
		// Start the cluster's separate gRPC server
		go clusterGRPCServer.Start()
	}
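	// Initialize the profiling service and start its listener; the Peer has similar code
	initializeProfilingService(conf)
	// Register the Orderer service with the gRPC server
	ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
	logger.Info("Beginning to serve requests")
	// Start the gRPC server and begin listening for requests from peers
	grpcServer.Start()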
}

By now you should be immune to large blocks of code. This one hides many smaller modules; next comes the ledger part:

// Bootstrap (genesis) block
func extractBootstrapBlock(conf *localconfig.TopLevel) *cb.Block {
	var bootstrapBlock *cb.Block

	// Select the bootstrapping mechanism
	switch conf.General.GenesisMethod {
	case "provisional":
		bootstrapBlock = encoder.New(genesisconfig.Load(conf.General.GenesisProfile)).GenesisBlockForChannel(conf.General.SystemChannel)
	case "file":
		bootstrapBlock = file.New(conf.General.GenesisFile).GenesisBlock()
	default:
		logger.Panic("Unknown genesis method:", conf.General.GenesisMethod)
	}

	return bootstrapBlock
}
func New(fileName string) bootstrap.Helper {
	return &fileBootstrapper{
		// The genesis block file created by the tooling is parsed from this path
		GenesisBlockFile: fileName,
	}
}
type Block struct {
	Header               *BlockHeader   `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
	Data                 *BlockData     `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	Metadata             *BlockMetadata `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"`
	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
	XXX_unrecognized     []byte         `json:"-"`
	XXX_sizecache        int32          `json:"-"`
}
func (b *fileBootstrapper) GenesisBlock() *cb.Block {
	bootstrapFile, fileErr := ioutil.ReadFile(b.GenesisBlockFile)
	if fileErr != nil {
		panic(errors.Errorf("unable to bootstrap orderer. Error reading genesis block file: %v", fileErr))
	}
	genesisBlock := &cb.Block{}
	unmarshallErr := proto.Unmarshal(bootstrapFile, genesisBlock)
	if unmarshallErr != nil {
		panic(errors.Errorf("unable to bootstrap orderer. Error unmarshalling genesis block: %v", unmarshallErr))

	}
	return genesisBlock
}
// Ledger factory
func createLedgerFactory(conf *config.TopLevel, metricsProvider metrics.Provider) (blockledger.Factory, string) {
	var lf blockledger.Factory
	var ld string
	switch conf.General.LedgerType {
	case "file":
		ld = conf.FileLedger.Location
		if ld == "" {
			ld = createTempDir(conf.FileLedger.Prefix)
		}
		logger.Debug("Ledger dir:", ld)
		// Create a new ledger factory. Each channel keeps its own ledger, stored in its own subdirectory
		lf = fileledger.New(ld, metricsProvider)
		// The file-based ledger stores the blocks for each channel
		// in a fsblkstorage.ChainsDir sub-directory that we have
		// to create separately. Otherwise the call to the ledger
		// Factory's ChainIDs below will fail (dir won't exist).
		createSubDir(ld, fsblkstorage.ChainsDir)
	case "json":
		ld = conf.FileLedger.Location
		if ld == "" {
			ld = createTempDir(conf.FileLedger.Prefix)
		}
		logger.Debug("Ledger dir:", ld)
		lf = jsonledger.New(ld)
	case "ram":
		fallthrough
	default:
		// In-memory ledger
		lf = ramledger.New(int(conf.RAMLedger.HistorySize))
	}
	return lf, ld
}
func NewProvider(conf *Conf, indexConfig *blkstorage.IndexConfig, metricsProvider metrics.Provider) blkstorage.BlockStoreProvider {
	p := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: conf.getIndexDir()})
	// create stats instance at provider level and pass to newFsBlockStore
	stats := newStats(metricsProvider)
	return &FsBlockstoreProvider{conf, indexConfig, p, stats}
}

Next, the multichannel registrar initialization:

func initializeMultichannelRegistrar(
	bootstrapBlock *cb.Block,
	ri *replicationInitiator,
	clusterDialer *cluster.PredicateDialer,
	srvConf comm.ServerConfig,
	srv *comm.GRPCServer,
	conf *localconfig.TopLevel,
	signer crypto.LocalSigner,
	metricsProvider metrics.Provider,
	healthChecker healthChecker,
	lf blockledger.Factory,
	callbacks ...channelconfig.BundleActor,
) *multichannel.Registrar {
	// Obtain the genesis block, from which the channel ID is derived
	genesisBlock := extractBootstrapBlock(conf)
	// Are we bootstrapping?
	if len(lf.ChainIDs()) == 0 {
		initializeBootstrapChannel(genesisBlock, lf)
	} else {
		logger.Info("Not bootstrapping because of existing channels")
	}

	consenters := make(map[string]consensus.Consenter)

	// Pass in the ledger factory and related information
	registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, callbacks...)

	consenters["solo"] = solo.New()
	var kafkaMetrics *kafka.Metrics
	consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker)
	// Note, we pass a 'nil' channel here, we could pass a channel that
	// closes if we wished to cleanup this routine on exit.
	go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
	if isClusterType(bootstrapBlock) {
		// Raft makes its appearance here
		initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
	}
	registrar.Initialize(consenters)
	return registrar
}
func initializeBootstrapChannel(genesisBlock *cb.Block, lf blockledger.Factory) {
	// This is where the channel ID is actually extracted
	chainID, err := utils.GetChainIDFromBlock(genesisBlock)
	if err != nil {
		logger.Fatal("Failed to parse channel ID from genesis block:", err)
	}
	// Then get (or create) the ledger for that channel
	gl, err := lf.GetOrCreate(chainID)
	if err != nil {
		logger.Fatal("Failed to create the system channel:", err)
	}

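	// Append the genesis block to the channel ledger obtained via the channel ID.
	// Call chain: gl.Append(genesisBlock) => fsBlockStore.AddBlock(block *common.Block)
	// => blockfileMgr.addBlock(block *common.Block)
	// The block is first serialized to a byte slice with protobuf, and its size decides whether
	// a new block file must be started (the 64M limit mentioned earlier).
	// The block length is written to the block file, followed by the serialized bytes;
	// then the index database, the checkpoint, and the blockchain info are updated.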
	if err := gl.Append(genesisBlock); err != nil {
		logger.Fatal("Could not write genesis block to ledger:", err)
	}
}
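
The comment on gl.Append describes a length-prefixed record format with file rollover and an index update. A minimal in-memory sketch of that idea follows. It is an illustration under stated assumptions, not Fabric's blockfileMgr: the type blockFiles, its fields, and the tiny 16-byte file limit are all hypothetical, byte slices stand in for files, and a map stands in for the LevelDB index.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// location says where a record starts: which file and at which offset.
type location struct {
	file   int
	offset int
}

// blockFiles is an in-memory stand-in for a directory of block files.
type blockFiles struct {
	maxFileSize int
	files       [][]byte
	index       map[uint64]location // block number -> location (the "index database")
	height      uint64              // next block number (the "checkpoint")
}

func newBlockFiles(maxFileSize int) *blockFiles {
	return &blockFiles{
		maxFileSize: maxFileSize,
		files:       [][]byte{nil},
		index:       make(map[uint64]location),
	}
}

// addBlock writes len(block) as a uvarint followed by the block bytes.
// A new file is started first if the record would not fit in the current one.
func (b *blockFiles) addBlock(block []byte) uint64 {
	var lenBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(lenBuf[:], uint64(len(block)))
	recordSize := n + len(block)

	cur := len(b.files) - 1
	if len(b.files[cur]) > 0 && len(b.files[cur])+recordSize > b.maxFileSize {
		b.files = append(b.files, nil)
		cur++
	}
	loc := location{file: cur, offset: len(b.files[cur])}
	b.files[cur] = append(b.files[cur], lenBuf[:n]...)
	b.files[cur] = append(b.files[cur], block...)

	num := b.height
	b.index[num] = loc
	b.height++
	return num
}

// readBlock finds the record through the index and decodes the length prefix.
func (b *blockFiles) readBlock(num uint64) ([]byte, error) {
	loc, ok := b.index[num]
	if !ok {
		return nil, errors.New("block not found")
	}
	data := b.files[loc.file][loc.offset:]
	size, n := binary.Uvarint(data)
	if n <= 0 || uint64(len(data)-n) < size {
		return nil, errors.New("corrupt record")
	}
	return data[n : n+int(size)], nil
}

func main() {
	bf := newBlockFiles(16) // tiny limit so the rollover is visible
	bf.addBlock([]byte("genesis"))
	bf.addBlock([]byte("block-one"))
	blk, err := bf.readBlock(1)
	fmt.Println(len(bf.files), string(blk), err)
}
```

The length prefix is what lets a reader walk a block file record by record without consulting the index, which is useful when the index has to be rebuilt.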

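Notice that parsing the genesis block to obtain the channel ID is something many of the initialization functions do, which is interesting. The GetOrCreate interface method has four implementations, all in the four directories under common/ledger/blockledger. The file-based one calls blkstorageProvider.OpenBlockStore, which resolves to OpenBlockStore in fs_blockstore_provider.go under common/ledger/blkstorage. That is: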

func (flf *fileLedgerFactory) GetOrCreate(chainID string) (blockledger.ReadWriter, error) {
	flf.mutex.Lock()
	defer flf.mutex.Unlock()

	key := chainID
	// check cache
	ledger, ok := flf.ledgers[key]
	if ok {
		return ledger, nil
	}
	// open fresh
	blockStore, err := flf.blkstorageProvider.OpenBlockStore(key)
	if err != nil {
		return nil, err
	}
	ledger = NewFileLedger(blockStore)
	flf.ledgers[key] = ledger // cache the ledger that wraps the fsBlockStore instance created by the code below
	return ledger, nil
}
type DBHandle struct {
	dbName string
	db     *DB
}
func (p *FsBlockstoreProvider) OpenBlockStore(ledgerid string) (blkstorage.BlockStore, error) {
	indexStoreHandle := p.leveldbProvider.GetDBHandle(ledgerid)
	return newFsBlockStore(ledgerid, p.conf, p.indexConfig, indexStoreHandle, p.stats), nil
}
type fsBlockStore struct {
	id      string
	conf    *Conf
	// block file manager (created by newBlockfileMgr)
	fileMgr *blockfileMgr
	stats   *ledgerStats
}
type blockfileMgr struct {
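	// directory for this channel ID
	rootDir           string
	// configuration: the order_data directory and the maximum block file size (64M by default)
	conf              *Conf
	// LevelDB handle obtained from the channel ID, binding the channel ID to its LevelDB
	db                *leveldbhelper.DBHandle
	// block index struct, which implements the index interface
	index             index
	// block checkpoint information
	cpInfo            *checkpointInfo
	// condition variable that synchronizes access to the checkpoint information
	cpInfoCond        *sync.Cond
	// path and handle of the current block file
	currentFileWriter *blockfileWriter
	// summary of the chain: height, current block hash, previous block hash
	bcInfo            atomic.Value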
}
func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
	dbHandle *leveldbhelper.DBHandle, stats *stats) *fsBlockStore {
	// Create the fileMgr
	fileMgr := newBlockfileMgr(id, conf, indexConfig, dbHandle)

	// create ledgerStats and initialize blockchain_height stat
	ledgerStats := stats.ledgerStats(id)
	// Get the blockchain info
	info := fileMgr.getBlockchainInfo()
	ledgerStats.updateBlockchainHeight(info.Height)

	return &fsBlockStore{id, conf, fileMgr, ledgerStats}
}
// newBlockfileMgr calls the function below to build the block index on top of the index database
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) (*blockIndex, error) {
	indexItems := indexConfig.AttrsToIndex
	logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
	indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
	for _, indexItem := range indexItems {
		indexItemsMap[indexItem] = true
	}
	// This dependency is needed because the index 'IndexableAttrTxID' is used for detecting the duplicate txid
	// and the results are reused in the other two indexes. Ideally, all three indexes should be merged into one
	// for efficiency purpose - [FAB-10587]
	if (indexItemsMap[blkstorage.IndexableAttrTxValidationCode] || indexItemsMap[blkstorage.IndexableAttrBlockTxID]) &&
		!indexItemsMap[blkstorage.IndexableAttrTxID] {
		return nil, errors.Errorf("dependent index [%s] is not enabled for [%s] or [%s]",
			blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrTxValidationCode, blkstorage.IndexableAttrBlockTxID)
	}
	return &blockIndex{indexItemsMap, db}, nil
}
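
GetOrCreate above is a mutex-guarded cache: look the ledger up by channel ID, and open the underlying store only on a miss. A stripped-down sketch of that pattern follows. The ledger and ledgerFactory types here are simplified stand-ins, and the opens counter is an addition that exists only to make the caching visible.

```go
package main

import (
	"fmt"
	"sync"
)

// ledger is a placeholder for the per-channel ledger object.
type ledger struct{ channelID string }

// ledgerFactory caches one ledger per channel ID.
type ledgerFactory struct {
	mu      sync.Mutex
	ledgers map[string]*ledger
	opens   int // how many times a store was actually opened
}

func newLedgerFactory() *ledgerFactory {
	return &ledgerFactory{ledgers: make(map[string]*ledger)}
}

// GetOrCreate returns the cached ledger for channelID, creating it on first use.
func (f *ledgerFactory) GetOrCreate(channelID string) *ledger {
	f.mu.Lock()
	defer f.mu.Unlock()

	if l, ok := f.ledgers[channelID]; ok {
		return l // cache hit
	}
	f.opens++ // stands in for opening the block store
	l := &ledger{channelID: channelID}
	f.ledgers[channelID] = l
	return l
}

func main() {
	f := newLedgerFactory()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f.GetOrCreate("testchainid")
		}()
	}
	wg.Wait()
	fmt.Println("opens:", f.opens)
}
```

Because the lock is held across both the lookup and the insert, concurrent callers for the same channel cannot open the store twice.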

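The code above shows that once you have the channel ID you can get the corresponding ledger and then build a ChainSupport. With that in hand, the related objects can be stored as key-value pairs. Its definition follows: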

type ChainSupport struct {
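	// Ledger resources: the channel configuration resources (configResources type) and the block ledger (FileLedger type)
	*ledgerResources
	// Filters channel messages. There are four filters by default:
	// EmptyRejectRule rejects empty messages
	// expirationRejectRule rejects messages whose signer identity certificate has expired
	// MaxBytesRule checks the maximum message size (98MB by default)
	// sigFilter verifies that the message signature satisfies the ChannelWriters (/Channel/Writers) write policy
	msgprocessor.Processor
	// Builds new blocks and commits them to the block file; creates new channels and updates channel configuration.
	// On initialization it records the latest block number lastBlock, the channel configuration sequence lastConfigSeq,
	// the latest configuration block number lastConfigBlockNum, the multichannel Registrar (to create new application
	// channels), and the chain support object of the associated channel (to update channel configuration)
	*BlockWriter
	// The consensus component orders transactions, adds them to the list of cached transaction messages,
	// and handles batching into blocks, channel management, and so on
	consensus.Chain
	// Block cutter.
	// It reads the Orderer configuration of the channel, which includes the consensus type, the batch timeout,
	// the maximum block size in bytes, and channel limits (such as the number of channels).
	// A cutter (receiver type) is created from that configuration. Following the batching rules, it splits the
	// locally cached transaction messages into batches and hands them to the block writer, which builds new
	// blocks and commits them to the ledger's block file
	cutter blockcutter.Receiver
	// Local signer
	crypto.LocalSigner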
}
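
The Processor comment lists filters that are applied to each incoming message in turn. A minimal sketch of such a rule chain follows, covering only the two rules that need no cryptography (empty message and maximum size). The names rule, ruleSet, rejectEmpty and maxBytes are hypothetical and are not the msgprocessor API.

```go
package main

import (
	"errors"
	"fmt"
)

// rule inspects a message and returns an error to reject it.
type rule func(msg []byte) error

// rejectEmpty rejects messages with no payload.
func rejectEmpty(msg []byte) error {
	if len(msg) == 0 {
		return errors.New("message is empty")
	}
	return nil
}

// maxBytes builds a rule that rejects messages larger than limit bytes.
func maxBytes(limit int) rule {
	return func(msg []byte) error {
		if len(msg) > limit {
			return fmt.Errorf("message is %d bytes, limit is %d", len(msg), limit)
		}
		return nil
	}
}

// ruleSet applies its rules in order and stops at the first rejection.
type ruleSet []rule

func (rs ruleSet) apply(msg []byte) error {
	for _, r := range rs {
		if err := r(msg); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	filters := ruleSet{rejectEmpty, maxBytes(8)}
	for _, msg := range [][]byte{[]byte("tx"), nil, []byte("a very long transaction")} {
		fmt.Printf("%q -> %v\n", msg, filters.apply(msg))
	}
}
```

Ordering the cheap checks first means an empty or oversized message is rejected before any signature verification would be attempted.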

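chain := newChainSupport( // build the chain support object for an application channel
	r,
	ledgerResources,
	consenters,
	signer)
r.chains[chainID] = chain // register the chain support object with the multichannel registrar

chain.start() // start the chain support object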

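2. Broadcast Service
The broadcast service handles the following kinds of messages:
normal transaction messages, channel creation messages, and channel update messages. See the code below: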

func NewServer(
	r *multichannel.Registrar,
	metricsProvider metrics.Provider,
	debug *localconfig.Debug,
	timeWindow time.Duration,
	mutualTLS bool,
	expirationCheckDisabled bool,
) ab.AtomicBroadcastServer {
	s := &server{
		dh: deliver.NewHandler(
			deliverSupport{Registrar: r},
			timeWindow,
			mutualTLS,
			deliver.NewMetrics(metricsProvider),
			expirationCheckDisabled,
		),
		bh: &broadcast.Handler{
			SupportRegistrar: broadcastSupport{Registrar: r},
			Metrics:          broadcast.NewMetrics(metricsProvider),
		},
		debug:     debug,
		Registrar: r,
	}
	return s
}
// Receive messages from clients
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
	logger.Debugf("Starting new Broadcast handler")
	defer func() {
		if r := recover(); r != nil {
			logger.Criticalf("Broadcast client triggered panic: %s\n%s", r, debug.Stack())
		}
		logger.Debugf("Closing Broadcast stream")
	}()
	return s.bh.Handle(&broadcastMsgTracer{
		AtomicBroadcast_BroadcastServer: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Broadcast",
		},
	})
}
// Process messages, similar to the message-handling hub in Bitcoin or EOS
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)
	for {
		msg, err := srv.Recv()
		if err == io.EOF {
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		}
		if err != nil {
			logger.Warningf("Error reading from %s: %s", addr, err)
			return err
		}

		// Validate the message and submit it for ordering
		resp := bh.ProcessMessage(msg, addr)
		err = srv.Send(resp)
		if resp.Status != cb.Status_SUCCESS {
			return err
		}

		if err != nil {
			logger.Warningf("Error sending to %s: %s", addr, err)
			return err
		}
	}

}
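
The Broadcast loop above has a simple shape: receive, process, send the response, and stop on EOF, on an error, or on the first non-success status. The sketch below reproduces that control flow against a fake stream so that it can run without gRPC. The stream interface, the status constants, and fakeStream are hypothetical simplifications of the generated gRPC types.

```go
package main

import (
	"fmt"
	"io"
)

type status int

const (
	statusSuccess    status = 200
	statusBadRequest status = 400
)

// stream is a cut-down stand-in for the bidirectional gRPC stream.
type stream interface {
	Recv() (string, error)
	Send(status) error
}

// handle loops until the client hangs up (io.EOF), a read or write fails,
// or a message is answered with a non-success status.
func handle(s stream, process func(string) status) error {
	for {
		msg, err := s.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		resp := process(msg)
		err = s.Send(resp)
		if resp != statusSuccess {
			return err // ends the stream even when the send itself succeeded
		}
		if err != nil {
			return err
		}
	}
}

// fakeStream replays canned input and records what was sent back.
type fakeStream struct {
	in   []string
	sent []status
}

func (f *fakeStream) Recv() (string, error) {
	if len(f.in) == 0 {
		return "", io.EOF
	}
	msg := f.in[0]
	f.in = f.in[1:]
	return msg, nil
}

func (f *fakeStream) Send(s status) error {
	f.sent = append(f.sent, s)
	return nil
}

func main() {
	rejectEmpty := func(msg string) status {
		if msg == "" {
			return statusBadRequest
		}
		return statusSuccess
	}
	fs := &fakeStream{in: []string{"tx1", "", "tx3"}}
	err := handle(fs, rejectEmpty)
	fmt.Println(fs.sent, len(fs.in), err)
}
```

In this run the second message is rejected, so the loop returns before "tx3" is ever read: one bad message closes the stream.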

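3. Deliver Service
The deliver service consists of the following parts:
receiving requests from clients and parsing the client messages. See the code below: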

func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
	logger.Debugf("Starting new Deliver handler")
	defer func() {
		if r := recover(); r != nil {
			logger.Criticalf("Deliver client triggered panic: %s\n%s", r, debug.Stack())
		}
		logger.Debugf("Closing Deliver stream")
	}()

	policyChecker := func(env *cb.Envelope, channelID string) error {
		chain := s.GetChain(channelID)
		if chain == nil {
			return errors.Errorf("channel %s not found", channelID)
		}
		// In maintenance mode, we typically require the signature of /Channel/Orderer/Readers.
		// This will block Deliver requests from peers (which normally satisfy /Channel/Readers).
		sf := msgprocessor.NewSigFilter(policies.ChannelReaders, policies.ChannelOrdererReaders, chain)
		return sf.Apply(env)
	}
	deliverServer := &deliver.Server{
		PolicyChecker: deliver.PolicyCheckerFunc(policyChecker),
		Receiver: &deliverMsgTracer{
			Receiver: srv,
			msgTracer: msgTracer{
				debug:    s.debug,
				function: "Deliver",
			},
		},
		ResponseSender: &responseSender{
			AtomicBroadcast_DeliverServer: srv,
		},
	}
	return s.dh.Handle(srv.Context(), deliverServer)
}
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
	addr := util.ExtractRemoteAddress(ctx)
	logger.Debugf("Starting new deliver loop for %s", addr)
	h.Metrics.StreamsOpened.Add(1)
	defer h.Metrics.StreamsClosed.Add(1)
	for {
		logger.Debugf("Attempting to read seek info message from %s", addr)
		envelope, err := srv.Recv()
		if err == io.EOF {
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		}
		if err != nil {
			logger.Warningf("Error reading from %s: %s", addr, err)
			return err
		}

		// Deliver the requested blocks
		status, err := h.deliverBlocks(ctx, srv, envelope)
		if err != nil {
			return err
		}

		err = srv.SendStatusResponse(status)
		if status != cb.Status_SUCCESS {
			return err
		}
		if err != nil {
			logger.Warningf("Error sending to %s: %s", addr, err)
			return err
		}

		logger.Debugf("Waiting for new SeekInfo from %s", addr)
	}
}

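By parsing the client's message, the service fetches the requested block data from the ledger and sends it back to the client. If the requested block does not exist yet, the call blocks until that block becomes available.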
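
One common way to get this "block until the requested block exists" behaviour in Go is a sync.Cond (the blockfileMgr struct shown earlier also carries one, cpInfoCond). The sketch below is a minimal illustration of the waiting pattern, not Fabric's iterator; the ledger type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// ledger is an append-only list of blocks whose readers can wait for
// blocks that have not been written yet.
type ledger struct {
	mu     sync.Mutex
	cond   *sync.Cond
	blocks []string
}

func newLedger() *ledger {
	l := &ledger{}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Append adds a block and wakes every waiting reader.
func (l *ledger) Append(block string) {
	l.mu.Lock()
	l.blocks = append(l.blocks, block)
	l.mu.Unlock()
	l.cond.Broadcast()
}

// Next blocks until block number n exists, then returns it.
func (l *ledger) Next(n int) string {
	l.mu.Lock()
	defer l.mu.Unlock()
	for len(l.blocks) <= n {
		l.cond.Wait() // releases the lock while waiting
	}
	return l.blocks[n]
}

func main() {
	l := newLedger()
	l.Append("genesis")

	got := make(chan string)
	go func() { got <- l.Next(1) }() // block 1 may not exist yet

	l.Append("block-1")
	fmt.Println(<-got)
}
```

The loop around Wait matters: a reader can be woken by an Append for a different block number, so it has to re-check the height before returning.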

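4. Consensus
This part covers starting the consensus service and the related ordering and block cutting. See the code:

// initializeMultichannelRegistrar, shown earlier, calls registrar.Initialize(consenters)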
func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
	r.consenters = consenters
	existingChains := r.ledgerFactory.ChainIDs()

	for _, chainID := range existingChains {
		rl, err := r.ledgerFactory.GetOrCreate(chainID)
		if err != nil {
			logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
		}
		configTx := configTx(rl)
		if configTx == nil {
			logger.Panic("Programming error, configTx should never be nil here")
		}
		ledgerResources := r.newLedgerResources(configTx)
		chainID := ledgerResources.ConfigtxValidator().ChainID()

		if _, ok := ledgerResources.ConsortiumsConfig(); ok {
			if r.systemChannelID != "" {
				logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
			}

			chain := newChainSupport(
				r,
				ledgerResources,
				r.consenters,
				r.signer,
				r.blockcutterMetrics,
			)
			r.templator = msgprocessor.NewDefaultTemplator(chain)
			chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain, r.config))

			// Retrieve genesis block to log its hash. See FAB-5450 for the purpose
			iter, pos := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}})
			defer iter.Close()
			if pos != uint64(0) {
				logger.Panicf("Error iterating over system channel: '%s', expected position 0, got %d", chainID, pos)
			}
			genesisBlock, status := iter.Next()
			if status != cb.Status_SUCCESS {
				logger.Panicf("Error reading genesis block of system channel '%s'", chainID)
			}
			logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s",
				chainID, genesisBlock.Header.Hash(), chain.SharedConfig().ConsensusType())

			r.chains[chainID] = chain
			r.systemChannelID = chainID
			r.systemChannel = chain
			// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
			defer chain.start()
		} else {
			logger.Debugf("Starting chain: %s", chainID)
			chain := newChainSupport(
				r,
				ledgerResources,
				r.consenters,
				r.signer,
				r.blockcutterMetrics,
			)
			r.chains[chainID] = chain
			// Start consensus. There are several consensus implementations, all under the orderer/consensus directory, and each has its own startup code
			chain.start()
		}

	}
......
}
func (cs *ChainSupport) start() {
	cs.Chain.Start()
}
func (chain *chainImpl) Start() {
	// Actually start the service
	go startThread(chain)
}
func startThread(chain *chainImpl) {
	var err error

	// Create topic if it does not exist (requires Kafka v0.10.1.0)
	err = setupTopicForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.consenter.topicDetail(), chain.channel)
	if err != nil {
		// log for now and fallback to auto create topics setting for broker
		logger.Infof("[channel: %s]: failed to create Kafka topic = %s", chain.channel.topic(), err)
	}

......
	chain.processMessagesToBlocks() // Keep up to date with the channel
}
// Raft
func initializeEtcdraftConsenter(
	consenters map[string]consensus.Consenter,
	conf *localconfig.TopLevel,
	lf blockledger.Factory,
	clusterDialer *cluster.PredicateDialer,
	bootstrapBlock *cb.Block,
	ri *replicationInitiator,
	srvConf comm.ServerConfig,
	srv *comm.GRPCServer,
	registrar *multichannel.Registrar,
	metricsProvider metrics.Provider,
) {
	replicationRefreshInterval := conf.General.Cluster.ReplicationBackgroundRefreshInterval
	if replicationRefreshInterval == 0 {
		replicationRefreshInterval = defaultReplicationBackgroundRefreshInterval
	}

	systemChannelName, err := utils.GetChainIDFromBlock(bootstrapBlock)
	if err != nil {
		ri.logger.Panicf("Failed extracting system channel name from bootstrap block: %v", err)
	}
	systemLedger, err := lf.GetOrCreate(systemChannelName)
	if err != nil {
		ri.logger.Panicf("Failed obtaining system channel (%s) ledger: %v", systemChannelName, err)
	}
	getConfigBlock := func() *cb.Block {
		return multichannel.ConfigBlock(systemLedger)
	}

	exponentialSleep := exponentialDurationSeries(replicationBackgroundInitialRefreshInterval, replicationRefreshInterval)
	ticker := newTicker(exponentialSleep)

	icr := &inactiveChainReplicator{
		logger:                            logger,
		scheduleChan:                      ticker.C,
		quitChan:                          make(chan struct{}),
		replicator:                        ri,
		chains2CreationCallbacks:          make(map[string]chainCreation),
		retrieveLastSysChannelConfigBlock: getConfigBlock,
		registerChain:                     ri.registerChain,
	}

	// Use the inactiveChainReplicator as a channel lister, since it has knowledge
	// of all inactive chains.
	// This is to prevent us pulling the entire system chain when attempting to enumerate
	// the channels in the system.
	ri.channelLister = icr

	go icr.run()
	raftConsenter := etcdraft.New(clusterDialer, conf, srvConf, srv, registrar, icr, metricsProvider)
	consenters["etcdraft"] = raftConsenter
}
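
Registrar.Initialize starts application channels as soon as they are registered but uses defer for the system channel, so that it only starts once the chains map is complete. The following sketch isolates that ordering trick. The chain and registrar types are simplified stand-ins, and the started slice is an addition used only to observe the order.

```go
package main

import "fmt"

type chain struct {
	id     string
	system bool
}

type registrar struct {
	chains  map[string]*chain
	started []string // order in which chains were started
}

func (r *registrar) start(c *chain) { r.started = append(r.started, c.id) }

// initialize registers every chain. Application channels start immediately;
// the system channel is started by a deferred call, which runs only when
// initialize returns, after the loop has registered every chain.
func (r *registrar) initialize(existing []*chain) {
	for _, c := range existing {
		r.chains[c.id] = c
		if c.system {
			defer r.start(c)
			continue
		}
		r.start(c)
	}
}

func main() {
	r := &registrar{chains: make(map[string]*chain)}
	r.initialize([]*chain{
		{id: "syschannel", system: true},
		{id: "app1"},
		{id: "app2"},
	})
	fmt.Println(r.started)
}
```

Even though the system channel comes first in the input, it is started last, which is the property the comment in Initialize relies on.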

Below is the ordering and block cutting:

func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool) {
	if len(r.pendingBatch) == 0 {
		// We are beginning a new batch, mark the time
		r.PendingBatchStartTime = time.Now()
	}

	ordererConfig, ok := r.sharedConfigFetcher.OrdererConfig()
	if !ok {
		logger.Panicf("Could not retrieve orderer config to query batch parameters, block cutting is not possible")
	}

	batchSize := ordererConfig.BatchSize()

	messageSizeBytes := messageSizeBytes(msg)
	if messageSizeBytes > batchSize.PreferredMaxBytes {
		logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", messageSizeBytes, batchSize.PreferredMaxBytes)

		// cut pending batch, if it has any messages
		if len(r.pendingBatch) > 0 {
			messageBatch := r.Cut()
			messageBatches = append(messageBatches, messageBatch)
		}

		// create new batch with single message
		messageBatches = append(messageBatches, []*cb.Envelope{msg})

		// Record that this batch took no time to fill
		r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(0)

		return
	}

	messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > batchSize.PreferredMaxBytes

	if messageWillOverflowBatchSizeBytes {
		logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
		logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
		messageBatch := r.Cut()
		r.PendingBatchStartTime = time.Now()
		messageBatches = append(messageBatches, messageBatch)
	}

	logger.Debugf("Enqueuing message into batch")
	r.pendingBatch = append(r.pendingBatch, msg)
	r.pendingBatchSizeBytes += messageSizeBytes
	pending = true

	if uint32(len(r.pendingBatch)) >= batchSize.MaxMessageCount {
		logger.Debugf("Batch size met, cutting batch")
		messageBatch := r.Cut()
		messageBatches = append(messageBatches, messageBatch)
		pending = false
	}

	return
}

// Cut returns the current batch and starts a new one
func (r *receiver) Cut() []*cb.Envelope {
	r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(time.Since(r.PendingBatchStartTime).Seconds())
	r.PendingBatchStartTime = time.Time{}
	batch := r.pendingBatch
	r.pendingBatch = nil
	r.pendingBatchSizeBytes = 0
	return batch
}
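
To make the three cutting rules in Ordered easier to follow (oversized message isolated in its own batch, byte overflow cuts the pending batch first, message count cuts after enqueueing), here is a reduced sketch with the metrics and the config lookup removed. The cutter type and its fields are hypothetical, and plain byte slices stand in for *cb.Envelope.

```go
package main

import "fmt"

// cutter is a simplified block cutter with fixed batch parameters.
type cutter struct {
	maxMessageCount   int
	preferredMaxBytes int
	pending           [][]byte
	pendingBytes      int
}

// cut returns the pending batch and starts a new one.
func (c *cutter) cut() [][]byte {
	batch := c.pending
	c.pending = nil
	c.pendingBytes = 0
	return batch
}

// ordered enqueues msg and returns any batches that were cut as a result,
// plus whether messages are still pending.
func (c *cutter) ordered(msg []byte) (batches [][][]byte, pending bool) {
	size := len(msg)

	// Rule 1: a message larger than the preferred size gets its own batch.
	if size > c.preferredMaxBytes {
		if len(c.pending) > 0 {
			batches = append(batches, c.cut())
		}
		batches = append(batches, [][]byte{msg})
		return batches, false
	}

	// Rule 2: if adding the message would overflow, cut what is pending first.
	if c.pendingBytes+size > c.preferredMaxBytes {
		batches = append(batches, c.cut())
	}

	c.pending = append(c.pending, msg)
	c.pendingBytes += size
	pending = true

	// Rule 3: cut once the message count limit is reached.
	if len(c.pending) >= c.maxMessageCount {
		batches = append(batches, c.cut())
		pending = false
	}
	return batches, pending
}

func main() {
	c := &cutter{maxMessageCount: 3, preferredMaxBytes: 10}
	for _, m := range []string{"aaaa", "bbbb", "cccc", "dddddddddddd", "e", "f", "g"} {
		batches, pending := c.ordered([]byte(m))
		fmt.Printf("%-12s cut=%d pending=%v\n", m, len(batches), pending)
	}
}
```

A single call can return two batches: the previously pending messages and the isolated oversized message, in that order, which keeps the original arrival order intact.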

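That is a lot of code, and it needs careful study.

III. Summary

The Orderer node is somewhat more complex than the Peer, but it largely follows the same pattern. The difficulty is the sheer number of details: my grasp of many of the finer points may be imprecise and needs further scrutiny.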
