以太坊启动源码分析

以太坊入口代码位于cmd/geth/main.go,先看一下main()函数:

func main() {

    if err := app.Run(os.Args); err != nil {

        fmt.Fprintln(os.Stderr, err)

        os.Exit(1)

    }

}

 

显然,使用了urfave/cli库,具体可以参见之前一篇博文:https://blog.csdn.net/turkeycock/article/details/80359654

这里有个疑问,为什么没有看到app的flag和command配置呢?我们了解一下Go语言的执行流程就明白了,借用网上的一张神图:

 

可以看到,main()并不是真正意义上的入口,在初始化完常量和变量以后,会先调用模块的init()函数,然后才是main()函数。所以初始化的工作是在init()函数里完成的:

func init() {

    // Initialize the CLI app and start Geth

    app.Action = geth

    app.HideVersion = true // we have a command to print the version

    app.Copyright = "Copyright 2013-2018 The go-ethereum Authors"

    app.Commands = []cli.Command{

        // See chaincmd.go:

        initCommand,

        ……

    }

    sort.Sort(cli.CommandsByName(app.Commands))


    app.Flags = append(app.Flags, nodeFlags...)

    ……


    app.Before = func(ctx *cli.Context) error {

        runtime.GOMAXPROCS(runtime.NumCPU())

        if err := debug.Setup(ctx); err != nil {

            return err

        }

        // Start system runtime metrics collection

        go metrics.CollectProcessMetrics(3 * time.Second)


        utils.SetupNetwork(ctx)

        return nil

    }


    app.After = func(ctx *cli.Context) error {

        debug.Exit()

        console.Stdin.Close() // Resets terminal mode.

        return nil

    }

}

 

可以看到:app.Action=geth,如果没有添加任何command参数的话,主入口是geth()函数。

flag的配置代码位于cmd/utils/flags.go,command的配置代码跟main.go在同一个包中,分散在不同的文件里。

在进入主入口之前,app.Before做了3件事情:

I. runtime.GOMAXPROCS():设置最大可用处理器数

II. metrics.CollectProcessMetrics():创建一个goroutine,每3秒监测一次系统的ram和disk状态

III. utils.SetupNetwork():配置gas limit值

下面开始看geth()函数:

func geth(ctx *cli.Context) error {

    node := makeFullNode(ctx)

    startNode(ctx, node)

    node.Wait()

    return nil

}

 

可以看到,主要做了3件事情:

I. 创建结点

II. 启动结点

III. 结点进入等待状态

 

下面一个一个的分析:

1. 创建结点

func makeFullNode(ctx *cli.Context) *node.Node {

    stack, cfg := makeConfigNode(ctx)

    utils.RegisterEthService(stack, &cfg.Eth)

    ……

    return stack

}

 

中间一堆代码默认情况下不会走进去,先忽略~ 

所以这个函数就干了2件事:创建结点、注册EthService。

1.1 创建结点

先看makeConfigNode()函数:

func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {

    // Load defaults.

    cfg := gethConfig{

        Eth: eth.DefaultConfig,

        Shh: whisper.DefaultConfig,

        Node: defaultNodeConfig(),

        Dashboard: dashboard.DefaultConfig,

    }


    // Load config file.

    if file := ctx.GlobalString(configFileFlag.Name); file != "" {

        if err := loadConfig(file, &cfg); err != nil {

            utils.Fatalf("%v", err)

        }

    }


    // Apply flags.

    utils.SetNodeConfig(ctx, &cfg.Node)

    stack, err := node.New(&cfg.Node)

    if err != nil {

        utils.Fatalf("Failed to create the protocol stack: %v", err)

    }

    utils.SetEthConfig(ctx, stack, &cfg.Eth)

    if ctx.GlobalIsSet(utils.EthStatsURLFlag.Name) {

        cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)

    }


    utils.SetShhConfig(ctx, stack, &cfg.Shh)

    utils.SetDashboardConfig(ctx, &cfg.Dashboard)


    return stack, cfg

}

 

主要是初始化和加载一些配置,其中最重要的一行是通过node.New()创建结点,变量名叫stack,代表协议栈的含义。

先看一下结点的默认配置,代码位于node/defaults.go:

var DefaultConfig = Config{

    DataDir: DefaultDataDir(),

    HTTPPort: DefaultHTTPPort,

    HTTPModules: []string{"net", "web3"},

    HTTPVirtualHosts: []string{"localhost"},

    WSPort: DefaultWSPort,

    WSModules: []string{"net", "web3"},

    P2P: p2p.Config{

        ListenAddr: ":30303",

        MaxPeers: 25,

        NAT: nat.Any(),

    },

}

 

mac默认的datadir位于$HOME/Library/Ethereum,HTTP默认端口号8545,WebSocket默认端口号8546,P2P默认端口号30303,最大支持25个对等结点。

utils.SetNodeConfig()代码位于cmd/utils/flags.go,主要是检查有没有一些global的配置,如果有的话覆盖掉刚刚的默认配置。代码比较简单就不分析了。

接下来就是调用node.New()创建结点了,代码位于node/node.go:

func New(conf *Config) (*Node, error) {

    // Copy config and resolve the datadir so future changes to the current

    // working directory don't affect the node.

    confCopy := *conf

    conf = &confCopy

    if conf.DataDir != "" {

        absdatadir, err := filepath.Abs(conf.DataDir)

        if err != nil {

            return nil, err

        }

        conf.DataDir = absdatadir

    }


……


    // Ensure that the AccountManager method works before the node has started.

    // We rely on this in cmd/geth.

    am, ephemeralKeystore, err := makeAccountManager(conf)

……


    // Note: any interaction with Config that would create/touch files

    // in the data directory or instance directory is delayed until Start.

    return &Node{

        accman: am,

        ephemeralKeystore: ephemeralKeystore,

        config: conf,

        serviceFuncs: []ServiceConstructor{},

        ipcEndpoint: conf.IPCEndpoint(),

        httpEndpoint: conf.HTTPEndpoint(),

        wsEndpoint: conf.WSEndpoint(),

        eventmux: new(event.TypeMux),

        log: conf.Logger,

    }, nil

}

 

可以看出,主要做了3件事:

I. 把datadir转成绝对路径

II. 调用makeAccountManager()初始化账号管理器

III. 创建一个Node实例并返回

 

关于账号管理系统后面会专门写一篇文章分析,这里就先略过了。

至此,我们获得了一个Node实例,makeConfigNode()函数就分析完了。

 

看一下Node结构中的一些重要成员:

I. accman:刚刚创建的账号管理器实例

II. config:创建该结点使用的配置

III. serviceFuncs:一个函数指针数组,保存所有注册Service的构造函数

那么Service是一个什么样的概念呢?先看一下构造函数的原型:

type ServiceConstructor func(ctx *ServiceContext) (Service, error)

 

出现了两个新类型:ServiceContext和Service。先看一下ServiceContext的定义:

// ServiceContext is a collection of service independent options inherited from

// the protocol stack, that is passed to all constructors to be optionally used;

// as well as utility methods to operate on the service environment.

type ServiceContext struct {

    config *Config

    services map[reflect.Type]Service // Index of the already constructed services

    EventMux *event.TypeMux // Event multiplexer used for decoupled notifications

    AccountManager *accounts.Manager // Account manager created by the node.

}

 

从注释可以看出,ServiceContext主要是存储了一些从结点(或者叫协议栈)那里继承过来的、和具体Service无关的一些信息,比如结点config、account manager等。其中有一个services字段保存了当前正在运行的所有Service,是一个map类型,key是Service的类型,value是Service实例。

接下来看一下Service的定义:

type Service interface {

    // Protocols retrieves the P2P protocols the service wishes to start.

    Protocols() []p2p.Protocol


    // APIs retrieves the list of RPC descriptors the service provides

    APIs() []rpc.API


    // Start is called after all services have been constructed and the networking

    // layer was also initialized to spawn any goroutines required by the service.

    Start(server *p2p.Server) error


    // Stop terminates all goroutines belonging to the service, blocking until they

    // are all terminated.

    Stop() error

}

 

Service是一个接口,定义了4个需要实现的函数。换句话说,任何实现了这4个方法的类型,都可以称之为一个Service。

 

1.2 注册eth Service

接下来分析utils.RegisterEthService()方法:(cmd/utils/flags.go)

func RegisterEthService(stack *node.Node, cfg *eth.Config) {

    var err error

    if cfg.SyncMode == downloader.LightSync {

        err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {

            return les.New(ctx, cfg)

        })

    } else {

        err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {

            fullNode, err := eth.New(ctx, cfg)

            if fullNode != nil && cfg.LightServ > 0 {

                ls, _ := les.NewLesServer(fullNode, cfg)

                fullNode.AddLesServer(ls)

            }

            return fullNode, err

        })

    }

    if err != nil {

        Fatalf("Failed to register the Ethereum service: %v", err)

    }

}

 

这里有2个分支,如果是配置成轻量级结点会进上面那个分支,如果是全结点会进else分支。然后调用Node的Register()方法注册Service:(node/node.go)

func (n *Node) Register(constructor ServiceConstructor) error {

    n.lock.Lock()

    defer n.lock.Unlock()


    if n.server != nil {

        return ErrNodeRunning

    }

    n.serviceFuncs = append(n.serviceFuncs, constructor)

    return nil

}

 

看到了吧?所谓的注册,其实就是把Service的构造函数放进结点的serviceFuncs数组。

这里只是注册,具体要等到启动结点的时候才真正调用构造函数创建Service。

 

2. 启动结点

我们回到cmd/geth/main.go分析下一个函数startNode():

func startNode(ctx *cli.Context, stack *node.Node) {

    // Start up the node itself

    utils.StartNode(stack)


// Subscribe and process account related events

……

}

 

后面一部分订阅和处理账号event相关的代码先忽略,看一下utils.StartNode()函数,代码位于cmd/utils/cmd.go:

func StartNode(stack *node.Node) {

    if err := stack.Start(); err != nil {

        Fatalf("Error starting protocol stack: %v", err)

    }

    go func() {

        sigc := make(chan os.Signal, 1)

        signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)

        defer signal.Stop(sigc)

        <-sigc

        log.Info("Got interrupt, shutting down...")

        go stack.Stop()

        for i := 10; i > 0; i-- {

            <-sigc

            if i > 1 {

                log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)

            }

        }

        debug.Exit() // ensure trace and CPU profile data is flushed.

        debug.LoudPanic("boom")

    }()

}

 

后半段的goroutine主要是为了捕获中断信号以停止结点运行的,所以这里实际就是调用了Node的Start()函数。这个函数比较长,我们分成几段来看:

I. 创建P2P server

II. 创建Service

III. 启动P2P server

IV. 启动Service

V. 启动RPC server

 

2.1 创建P2P server

以太坊是一个去中心化的平台,所以首要任务是创建P2P server:

func (n *Node) Start() error {

……

    n.lock.Lock()

    defer n.lock.Unlock()


    // Short circuit if the node's already running

    if n.server != nil {

        return ErrNodeRunning

    }

    if err := n.openDataDir(); err != nil {

        return err

    }


    // Initialize the p2p server. This creates the node key and

    // discovery databases.

    n.serverConfig = n.config.P2P

    n.serverConfig.PrivateKey = n.config.NodeKey()

    n.serverConfig.Name = n.config.NodeName()

    n.serverConfig.Logger = n.log

    if n.serverConfig.StaticNodes == nil {

        n.serverConfig.StaticNodes = n.config.StaticNodes()

    }

    if n.serverConfig.TrustedNodes == nil {

        n.serverConfig.TrustedNodes = n.config.TrustedNodes()

    }

    if n.serverConfig.NodeDatabase == "" {

        n.serverConfig.NodeDatabase = n.config.NodeDB()

    }

    running := &p2p.Server{Config: n.serverConfig}

    n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)


……

}

 

代码首先做了一些检查工作:加锁、判断结点是否已经运行、检查datadir是否可以打开,然后初始化P2P server配置,最后用该配置创建了一个p2p.Server实例。首先初始化Node中的services字段,然后遍历serviceFuncs,也就是之前注册的所有Service的构造函数列表。在创建Service实例之前,先为每个Service创建一个ServiceContext,之前提到过,ServiceContext里存储的是从Node继承过来的一些信息。接着通过构造函数创建Service实例,然后加入到service这个map中。

 

2.2 创建Service

// Otherwise copy and specialize the P2P configuration

    services := make(map[reflect.Type]Service)

    for _, constructor := range n.serviceFuncs {

        // Create a new context for the particular service

        ctx := &ServiceContext{

            config: n.config,

            services: make(map[reflect.Type]Service),

            EventMux: n.eventmux,

            AccountManager: n.accman,

        }

        for kind, s := range services { // copy needed for threaded access

            ctx.services[kind] = s

        }

        // Construct and save the service

        service, err := constructor(ctx)

        if err != nil {

            return err

        }

        kind := reflect.TypeOf(service)

        if _, exists := services[kind]; exists {

            return &DuplicateServiceError{Kind: kind}

        }

        services[kind] = service

    }

 

首先初始化Node中的services字段,然后遍历serviceFuncs,也就是之前注册的所有Service的构造函数列表。在创建Service实例之前,先为每个Service创建一个ServiceContext,之前提到过,ServiceContext里存储的是从Node继承过来的一些信息。接着通过构造函数创建Service实例,然后加入到service这个map中。

 

2.3 启动P2P server

// Gather the protocols and start the freshly assembled P2P server

    for _, service := range services {

        running.Protocols = append(running.Protocols, service.Protocols()...)

    }

    if err := running.Start(); err != nil {

        return convertFileLockError(err)

    }

 

首先把所有Service支持的协议集合到一起,然后调用p2p.Server的Start()方法启动P2P server(代码位于p2p/server.go)。P2P server会绑定一个UDP端口和一个TCP端口,端口号是相同的(默认30303)。UDP端口主要用于结点发现,TCP端口主要用于业务数据传输,基于RLPx加密传输协议。所以具体来说,Start()方法做了以下几件事情:

I. 侦听UDP端口:用于结点发现

II. 发起UDP请求获取结点表:内部会启动goroutine来完成

III. 侦听TCP端口:用于业务数据传输,基于RLPx协议

IV. 发起TCP请求连接到其他结点:也是启动goroutine完成

 

相对应的代码如下所示:

func (srv *Server) Start() (err error) {

……

srv.running = true

……


// 侦听UDP端口(用于结点发现)

    if !srv.NoDiscovery || srv.DiscoveryV5 {

        addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)

        if err != nil {

            return err

        }

        conn, err = net.ListenUDP("udp", addr)

        if err != nil {

            return err

        }

        realaddr = conn.LocalAddr().(*net.UDPAddr)

        if srv.NAT != nil {

            if !realaddr.IP.IsLoopback() {

                go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")

            }

            // TODO: react to external IP changes over time.

            if ext, err := srv.NAT.ExternalIP(); err == nil {

                realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}

            }

        }

    }


……

// 发起UDP请求获取结点表(内部会启动goroutine)

    if !srv.NoDiscovery {

        cfg := discover.Config{

            PrivateKey: srv.PrivateKey,

            AnnounceAddr: realaddr,

            NodeDBPath: srv.NodeDatabase,

            NetRestrict: srv.NetRestrict,

            Bootnodes: srv.BootstrapNodes,

            Unhandled: unhandled,

        }

        ntab, err := discover.ListenUDP(conn, cfg)

        if err != nil {

            return err

        }

        srv.ntab = ntab

    }


……

    dynPeers := srv.maxDialedConns()

    dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)


……

// 侦听TCP端口(用于业务数据传输,基于RLPx协议)

    if srv.ListenAddr != "" {

        if err := srv.startListening(); err != nil {

            return err

        }

    }


……

// 启动新线程发起TCP连接请求

    go srv.run(dialer)

……

}

 

2.4 启动Service 

// Start each of the services

    started := []reflect.Type{}

    for kind, service := range services {

        // Start the next service, stopping all previous upon failure

        if err := service.Start(running); err != nil {

            for _, kind := range started {

                services[kind].Stop()

            }

            running.Stop()


            return err

        }

        // Mark the service started for potential cleanup

        started = append(started, kind)

    }

主要就是依次调用每个Service的Start()方法,然后把启动的Service的类型存储到started表中。另外如果启动过程中发现某个Service之前已经启动过了,则返回错误。

 

2.5 启动RPC server

// Lastly start the configured RPC interfaces

    if err := n.startRPC(services); err != nil {

        for _, service := range services {

            service.Stop()

        }

        running.Stop()

        return err

    }

 

RPC即远程调用接口,也就是Service对外暴露出来的API。具体调用方式可以分为以下几种:

I. InProc:进程内调用,严格来说这种不能算是RPC,不过出于架构上的统一,以太坊也为这种调用方式配置了一个handler

II. IPC:进程间调用,通过Unix Domain Socket(datadir/geth.ipc)

III. HTTP:通过HTTP协议调用

IV. WS:通过WebSocket调用

接下来看下startRPC()函数就比较清楚了,主要就是启动这几项RPC服务。每种RPC服务都需要提供一个handler,另外除了InProc之外,其他3种服务还需要启动一个server来监听外部连接请求。RPC具体实现细节留待后面的文章分析。代码如下:

func (n *Node) startRPC(services map[reflect.Type]Service) error {

    // Gather all the possible APIs to surface

    apis := n.apis()

    for _, service := range services {

        apis = append(apis, service.APIs()...)

    }

    // Start the various API endpoints, terminating all in case of errors

    if err := n.startInProc(apis); err != nil {

        return err

    }

    if err := n.startIPC(apis); err != nil {

        n.stopInProc()

        return err

    }

    if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil {

        n.stopIPC()

        n.stopInProc()

        return err

    }

    if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {

        n.stopHTTP()

        n.stopIPC()

        n.stopInProc()

        return err

    }

    // All API endpoints started successfully

    n.rpcAPIs = apis

    return nil

}

 

到这里结点启动的流程就分析完了。

 

3. 结点进入等待状态

其实就是让主线程进入阻塞状态,保持进程不退出,直到从channel中收到stop消息。

具体就是调用Node的Wait()函数:

func (n *Node) Wait() {

    n.lock.RLock()

    if n.server == nil {

        n.lock.RUnlock()

        return

    }

    stop := n.stop

    n.lock.RUnlock()


    <-stop

}

 

至此,以太坊的启动代码就走读完了,总结一下:

I. 以太坊启动主要做了3件事:创建结点、启动结点、结点进入等待状态

II. 创建结点过程主要做了2件事:根据配置创建Node实例、注册eth Service

III. 启动结点过程主要做了5件事:创建P2P server、创建Service、启动P2P server、启动Service、启动RPC server

IV. 最后结点进入等待状态,等待退出

 

延伸阅读:以太坊RPC源码分析

免责声明:信息仅供参考,不构成投资及交易建议。投资者据此操作,风险自担。
如果觉得文章对你有用,请随意赞赏收藏
相关推荐
相关下载
登录后评论
最新评论 (3)
Copyright © 2019 宽客在线