区块链技术开发教程

区块链技术指南,区块链技术的工作原理讲解,区块链技术入门教程

超级账本源码分析:peer服务启动流程

2018-11-26

区块链开发教程,本章节介绍peer node start命令的执行流程。

一、 前言

上一节中我们分析了peer的命令结构,peer的node子命令中添加了一个start命令,这个命令就是启动一个peer的服务,本章节我们分析这个peer node start 命令的执行流程。 

二、 peer 入口node start

var nodeStartCmd = &cobra.Command{
Use:   "start",
Short: "Starts the node.",
Long:  `Starts a node that interacts with the network.`,
RunE: func(cmd *cobra.Command, args []string) error {
return serve(args)
},
}


三、 serve函数

serve函数的主要作用是启动fabric peer的各个服务,其实就是启动了各种GRPC的服务端, 比如EventsServer 服务、chaincodesupport 服务等等。

下表是各个服务的作用描述:


服务

描述

EventsServer 服务

可以提供事件的注册机制,比如可以通过它监听chaincode中的事件

ChaincodeSupport服务

提供chaincode的执行,停止功能

ServerAdmin 服务

提供对服务器、模块日志级别的获取和控制

Endorser 服务

为交易提供背书服务

GossipService 服务

网络中处理gossip消息的接受和发送

func serve(args []string) error {
logger.Infof("Starting %s", version.GetInfo())
ledgermgmt.Initialize()
// Parameter overrides must be processed before any parameters are
// cached. Failures to cache cause the server to terminate immediately.
if chaincodeDevMode {
logger.Info("Running in chaincode development mode")
logger.Info("Disable loading validity system chaincode")
 
viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
 
}
 
if err := peer.CacheConfiguration(); err != nil {
return err
}
 
peerEndpoint, err := peer.GetPeerEndpoint()
if err != nil {
err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
return err
}
 
listenAddr := viper.GetString("peer.listenAddress")
 
secureConfig, err := peer.GetSecureConfig()
if err != nil {
logger.Fatalf("Error loading secure config for peer (%s)", err)
}
peerServer, err := peer.CreatePeerServer(listenAddr, secureConfig)
if err != nil {
logger.Fatalf("Failed to create peer server (%s)", err)
}
 
if secureConfig.UseTLS {
logger.Info("Starting peer with TLS enabled")
// set up CA support
caSupport := comm.GetCASupport()
caSupport.ServerRootCAs = secureConfig.ServerRootCAs
}
 
 //启动eventsHub服务
//TODO - do we need different SSL material for events ?
ehubGrpcServer, err := createEventHubServer(secureConfig)
if err != nil {
grpclog.Fatalf("Failed to create ehub server: %v", err)
}
 
//启动chaincode服务
// enable the cache of chaincode info
ccprovider.EnableCCInfoCache()
 
ccSrv, ccEpFunc := createChaincodeServer(peerServer, listenAddr)
registerChaincodeSupport(ccSrv.Server(), ccEpFunc)
go ccSrv.Start()
 
logger.Debugf("Running peer")
 
 //启动admin服务
// Register the Admin server
pb.RegisterAdminServer(peerServer.Server(), core.NewAdminServer())
 
 //启动admin服务
// Register the Endorser server
serverEndorser := endorser.NewEndorserServer()
pb.RegisterEndorserServer(peerServer.Server(), serverEndorser)
 
// Initialize gossip component
bootstrap := viper.GetStringSlice("peer.gossip.bootstrap")
 
serializedIdentity, err := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
if err != nil {
logger.Panicf("Failed serializing self identity: %v", err)
}
 
 //启动gossip服务
messageCryptoService := peergossip.NewMCS(
peer.NewChannelPolicyManagerGetter(),
localmsp.NewSigner(),
mgmt.NewDeserializersManager())
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
 
// callback function for secure dial options for gossip service
secureDialOpts := func() []grpc.DialOption {
var dialOpts []grpc.DialOption
// set max send/recv msg sizes
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()),
grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize())))
// set the keepalive options
dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...)
 
if comm.TLSEnabled() {
tlsCert := peerServer.ServerCertificate()
dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetPeerCredentials(tlsCert)))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
return dialOpts
}
err = service.InitGossipService(serializedIdentity, peerEndpoint.Address, peerServer.Server(),
messageCryptoService, secAdv, secureDialOpts, bootstrap...)
if err != nil {
return err
}
defer service.GetGossipService().Stop()
 
 //初始化系统链码
//initialize system chaincodes
initSysCCs()
 
//this brings up all the chains (including testchainid)
peer.Initialize(func(cid string) {
logger.Debugf("Deploying system CC, for chain <%s>", cid)
scc.DeploySysCCs(cid)
})
 
logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]",
peerEndpoint.Id, viper.GetString("peer.networkId"), peerEndpoint.Address)
 
// Start the grpc server. Done in a goroutine so we can deploy the
// genesis block if needed.
serve := make(chan error)
 
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
logger.Debugf("sig: %s", sig)
serve <- nil
}()
 
go func() {
var grpcErr error
if grpcErr = peerServer.Start(); grpcErr != nil {
grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
} else {
logger.Info("peer server exited")
}
serve <- grpcErr
}()
 
if err := writePid(config.GetPath("peer.fileSystemPath")+"/peer.pid", os.Getpid()); err != nil {
return err
}
 
// Start the event hub server
if ehubGrpcServer != nil {
go ehubGrpcServer.Start()
}
 
 //如果配置使能了profile,则启动profiing服务
// Start profiling http endpoint if enabled
if viper.GetBool("peer.profile.enabled") {
go func() {
profileListenAddress := viper.GetString("peer.profile.listenAddress")
logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
logger.Errorf("Error starting profiler: %s", profileErr)
}
}()
}
 
logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]",
peerEndpoint.Id, viper.GetString("peer.networkId"), peerEndpoint.Address)
 
// set the logging level for specific modules defined via environment
// variables or core.yaml
overrideLogModules := []string{"msp", "gossip", "ledger", "cauthdsl", "policies", "grpc"}
for _, module := range overrideLogModules {
err = common.SetLogLevelFromViper(module)
if err != nil {
logger.Warningf("Error setting log level for module '%s': %s", module, err.Error())
}
}
 
flogging.SetPeerStartupModulesMap()
 
// Block until grpc server exits
 //阻塞直到grpc退出
return <-serve
}
```

3.1 Admin服务

Admin服务实现对服务器状态、模块日志级别的获取和控制,注册函数RegisterAdminServer在fabric/protos/peer/admin.pb.go中:

// Server API for Admin service
 
type AdminServer interface {
 // Return the serve status.
GetStatus(context.Context, *google_protobuf.Empty) (*ServerStatus, error)
StartServer(context.Context, *google_protobuf.Empty) (*ServerStatus, error)
GetModuleLogLevel(context.Context, *LogLevelRequest) (*LogLevelResponse, error)
SetModuleLogLevel(context.Context, *LogLevelRequest) (*LogLevelResponse, error)
RevertLogLevels(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error)
}
 
func RegisterAdminServer(s *grpc.Server, srv AdminServer) {
s.RegisterService(&_Admin_serviceDesc, srv)
}

从上面可以出Admin服务对外提供了5个方法,这5个方法的实现具体在fabric/core/admin.go中:

// ServerAdmin implementation of the Admin service for the Peer
type ServerAdmin struct {
}
 
// GetStatus reports the status of the server
func (*ServerAdmin) GetStatus(context.Context, *empty.Empty) (*pb.ServerStatus, error) {
status := &pb.ServerStatus{Status: pb.ServerStatus_STARTED}
log.Debugf("returning status: %s", status)
return status, nil
}
 
// StartServer starts the server
func (*ServerAdmin) StartServer(context.Context, *empty.Empty) (*pb.ServerStatus, error) {
status := &pb.ServerStatus{Status: pb.ServerStatus_STARTED}
log.Debugf("returning status: %s", status)
return status, nil
}
 
// GetModuleLogLevel gets the current logging level for the specified module
// TODO Modify the signature so as to remove the error return - it's always been nil
func (*ServerAdmin) GetModuleLogLevel(ctx context.Context, request *pb.LogLevelRequest) (*pb.LogLevelResponse, error) {
logLevelString := flogging.GetModuleLevel(request.LogModule)
logResponse := &pb.LogLevelResponse{LogModule: request.LogModule, LogLevel: logLevelString}
return logResponse, nil
}
 
// SetModuleLogLevel sets the logging level for the specified module
func (*ServerAdmin) SetModuleLogLevel(ctx context.Context, request *pb.LogLevelRequest) (*pb.LogLevelResponse, error) {
logLevelString, err := flogging.SetModuleLevel(request.LogModule, request.LogLevel)
logResponse := &pb.LogLevelResponse{LogModule: request.LogModule, LogLevel: logLevelString}
return logResponse, err
}
 
// RevertLogLevels reverts the log levels for all modules to the level
// defined at the end of peer startup.
func (*ServerAdmin) RevertLogLevels(context.Context, *empty.Empty) (*empty.Empty, error) {
err := flogging.RevertToPeerStartupLevels()
 
return &empty.Empty{}, err
}

从上面的代码我们可以发现GetStatus和StartServer函数的内容是一样的,StartServer函数只返回了一个状态,其他没有任何操作,也就是说默认Admin服务都是启动的,那Admin服务最有意义的方法应该是日志等级相关的操作了。GetModuleLogLevel获取当前模块的日志等级,SetModuleLogLevel设置当前日志模块等级。RevertLogLevels方法可以将当前peer的日志等级恢复到启动时的等级。

四、系统链码的Register和Deploy

serve函数除了启动EventsServerChaincodeSupport服务之外还做一件比较重要的任务,就是注册和部署系统链码。系统链码的Register相当于用户链码的install。系统链码的Deploy相当于用户链码的实例化。本小节主要介绍系统链码的安装和部署的时机,下一小节我们详细介绍系统链码的安装和部署流程。

系统链码安装是scc.RegisterSysCCS函数,这个函数的调用路径是serve函数调用了registerChaincodeSupport函数,然后调用了scc.RegisterSysCCS

func registerChaincodeSupport(grpcServer *grpc.Server, ccEpFunc ccEndpointFunc) {
//get user mode
userRunsCC := chaincode.IsDevMode()
 
 //get chaincode startup timeout
ccStartupTimeout := viper.GetDuration("chaincode.startuptimeout")
if ccStartupTimeout < time.Duration(5)*time.Second {
logger.Warningf("Invalid chaincode startup timeout value %s (should be at least 5s); defaulting to 5s", ccStartupTimeout)
ccStartupTimeout = time.Duration(5) * time.Second
} else {
logger.Debugf("Chaincode startup timeout value set to %s", ccStartupTimeout)
}
 
ccSrv := chaincode.NewChaincodeSupport(ccEpFunc, userRunsCC, ccStartupTimeout)
 
 //Now that chaincode is initialized, register all system chaincodes.
scc.RegisterSysCCs()
 
pb.RegisterChaincodeSupportServer(grpcServer, ccSrv)
}

系统的链码的安装是scc.DeploySysCCS,这个函数的调用路径是serve函数调用了InitSysCCs函数,然后调用了scc.DeploySysCCs:

func initSysCCs() {
 //deploy system chaincodes
scc.DeploySysCCs("")
logger.Infof("Deployed system chaincodess")
}

五、总结

本小节主要介绍node start 命令的执行流程,也就是serve函数,serve函数会启动了EventsServer、ChaincodeSupport、Endorser、 GossipService 服务,另外serve函数还安装和部署了系统链码,下一章节我们介绍系统链码安装和部署的详细流程。


-END-

001周末班+1611二维码.jpg

区块链应用案例手箭头.png

003学习路径+公众号.jpg



7*24客服电话

150-1118-1611

扫码添加助教微信

二维码
菜单 在线咨询 回到顶部
关闭