以太坊RPC源码分析

这篇详细分析一下RPC的完整流程。

以太坊遵循JSON RPC规范,API列表参见以下链接:https://github.com/ethereum/wiki/wiki/JSON-RPC

本文主要分析一下API注册和API调用的主要流程。

 

1. API注册流程

先看一张图,理清各组件之间的关系:

 

 

Node启动时会调用各Service的构造函数创建Service实例。Service是一个接口,要对外暴露RPC API的模块都需要实现该接口,比如Ethereum, Whisper, Swarm等等,在图中统一用<module>表示。

在Node的startRPC()函数中,首先会调用每个Service的APIs()函数,把所有RPC API收集到一个数组中:

// Gather all the possible APIs to surface

    apis := n.apis()

    for _, service := range services {

        apis = append(apis, service.APIs()...)

    }

 

我们看一下API结构的定义,代码位于rpc/types.go:

type API struct {

    Namespace string // namespace under which the rpc methods of Service are exposed

    Version string // api version for DApp's

    Service interface{} // receiver instance which holds the methods

    Public bool // indication if the methods must be considered safe for public use

}

 

这里有一个Namespace的概念,可以理解为API的类别分组,方便管理。用过geth客户端的同学可能有印象,我们会在命令行指定类似下面的参数:

geth --rpc --rpcapi “eth,net,web3”

 

这里的eth,net,web3就是Namespace,指定了需要对外暴露哪些类型的RPC API。

除了Namespace,还有一个Service字段,注意它的类型是interface{},而不是我们之前提到的Service接口,所以个人感觉这个命名不太好,改成receiver可能就不容易引起混淆了。

那么这个Service指向哪里呢?我们以eth模块为例,看一下返回的API数组里到底存的是什么内容,代码位于eth/backend.go:

func (s *Ethereum) APIs() []rpc.API {

    ……


    // Append all the local APIs and return

    return append(apis, []rpc.API{

        {

            Namespace: "eth",

            Version: "1.0",

            Service: NewPublicEthereumAPI(s),

            Public: true,

        }, {

            Namespace: "eth",

            Version: "1.0",

            Service: NewPublicMinerAPI(s),

            Public: true,

        },

……)

}

 

其他先不看,我们先看下NewPublicEthereumAPI是个什么东西,代码位于eth.api.go:

func NewPublicEthereumAPI(e *Ethereum) *PublicEthereumAPI {

    return &PublicEthereumAPI{e}

}


type PublicEthereumAPI struct {

    e *Ethereum

}

 

可以看到就,就是一个普通的struct,内部有一个指针指向模块本身,其实就是一个wrapper。现在再回过头来看上面那张图的黄色部分,是不是就很清楚了?

获得所有RPC API的集合后,就开始启动RPC server了。上一篇提到RPC有4种方式:InProc、IPC、HTTP、WS,在Node中对应的字段都用不同颜色标识了。流程都是类似的,这里以HTTP为例进行分析。

HTTP相关的有3个字段:

  • httpEndpoint:这是一个字符串,表示IP和端口号,默认是localhost:8545
  • httpListener:这是一个接口,调用net.Listen()时返回,包含了Accept()/Close()/Addr()这3个函数,可以用来接受和关闭连接
  • httpHandler:这是一个需要重点分析的结构,定义位于rpc/types.go:

 
type Server struct {

    services serviceRegistry


    run int32

    codecsMu sync.Mutex

    codecs *set.Set

}


type serviceRegistry map[string]*service // collection of services

 

可以看到,其中有一个services字段,是一个map,key是Namespace,value是一个service实例。注意这个service类型首字母是小写的,所以是不对外暴露的,定义位于rpc/types.go:

type service struct {

    name string // name for service

    typ reflect.Type // receiver type

    callbacks callbacks // registered handlers

    subscriptions subscriptions // available subscriptions/notifications

}

 

service中包含了两个字段callbacks和subscriptions,继续看:

type callbacks map[string]*callback // collection of RPC callbacks

type subscriptions map[string]*callback // collection of subscription callbacks


type callback struct {

    rcvr reflect.Value // receiver of method

    method reflect.Method // callback

    argTypes []reflect.Type // input argument types

    hasCtx bool // method's first argument is a context (not included in argTypes)

    errPos int // err return idx, of -1 when method cannot return error

    isSubscribe bool // indication if the callback is a subscription

}

 

可以看到,subscription是一种特殊的callback,而callback结构中包含了RPC API所需要的所有信息:

  • rcvr:方法的接收者,这是一个反射值类型,其实就是指向了之前的NewPublicEthereumAPI
  • method:对应rcvr中的函数
  • argTypes:函数参数的类型列表
  • hasCtx:标识函数的第一个参数是否是context.Context类型
  • isSubscribe:是否是subscription类型(因为它们共用一个结构定义)

 

至此,各组件直接的关系就分析完了,可以回顾一下上面的那张图。

 

下面分析一下API注册的具体流程。

看一下rpc/endpoints.go中的startHTTPEndpoint()函数:

func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) {

……

    handler := NewServer()

    for _, api := range apis {

        if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {

            if err := handler.RegisterName(api.Namespace, api.Service); err != nil {

                return nil, nil, err

            }

            log.Debug("HTTP registered", "namespace", api.Namespace)

        }

    }

……

}

 

具体就是调用了handler.RegisterName()函数,代码位于rpc/server.go:

func (s *Server) RegisterName(name string, rcvr interface{}) error {

    if s.services == nil {

        s.services = make(serviceRegistry)

    }


    svc := new(service)

    svc.typ = reflect.TypeOf(rcvr)

    rcvrVal := reflect.ValueOf(rcvr)


……

    methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)


    // already a previous service register under given sname, merge methods/subscriptions

    if regsvc, present := s.services[name]; present {

        ……

        for _, m := range methods {

            regsvc.callbacks[formatName(m.method.Name)] = m

        }

        for _, s := range subscriptions {

            regsvc.subscriptions[formatName(s.method.Name)] = s

        }

        return nil

    }


    svc.name = name

    svc.callbacks, svc.subscriptions = methods, subscriptions


    ……


    s.services[svc.name] = svc

    return nil

}

 

可以看到是先创建一个service实例,然后填充它的callbacks和subscriptions字段。其中suitableCallbacks()函数会检查API定义是否符合标准,然后创建callback实例放入map中。

另外如果发现API属于同一个Namespace,会进行合并,因为这里的结构是map而不是数组。

至此,API注册流程就分析完了,接下来分析API调用流程。

 

2. API调用流程:

先试用一下API调用,有一个直观的印象:

curl -H "Content-Type: application/json" -X POST --data '{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x1b4", true],"id":1}' localhost:8545

 

返回结果:

 

请求数据遵循JSON RPC规范:http://www.jsonrpc.org/specification

具体来说,请求对象需要包括下面4个字段:

  • jsonrpc:协议版本号,固定是2.0
  • method:请求调用的函数名,可以看到是Namespace_Method这种命名方式
  • params: 函数参数列表,一般是一个数组
  • id:客户端和服务器之前通信的一个标识,服务器返回响应时必须返回相同的id。可以是数字或者字符串,不建议设为NULL

 

相应的,返回的响应需要包含以下字段:

  • jsonrpc:协议版本号,固定是2.0
  • result/error:返回的结果或者错误,二选一
  • id:客户端和服务器之前通信的一个标识,服务器返回响应时必须返回相同的id。可以是数字或者字符串,不建议设为NULL

 

API调用相关的结构和流程参见下图:

 

下面开始代码分析。还是回到rpc/endpoints.go中的startHTTPEndpoint()函数,主要看后半段:

func StartHTTPEndpoint(endpoint string, apis []API, modules []string, cors []string, vhosts []string) (net.Listener, *Server, error) {

……

    var (

        listener net.Listener

        err error

    )

    if listener, err = net.Listen("tcp", endpoint); err != nil {

        return nil, nil, err

    }

    go NewHTTPServer(cors, vhosts, handler).Serve(listener)

    return listener, handler, err

}

 

首先侦听TCP端口,获得listener接口实例。然后创建了一个http.Server实例,并启动一个goroutine调用它的Serve()方法。看一下NewHTTPServer()函数(rpc/http.go):

func NewHTTPServer(cors []string, vhosts []string, srv *Server) *http.Server {

    // Wrap the CORS-handler within a host-handler

    handler := newCorsHandler(srv, cors)

    handler = newVHostHandler(vhosts, handler)

    return &http.Server{Handler: handler}

}

 

这里有一个Handler参数,用到了装饰者模式,其实最终实现还是在rpc.Server中。Handler是一个接口,需要实现它的ServerHTTP()函数来处理网络数据,代码如下:

func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {

……


    body := io.LimitReader(r.Body, maxRequestContentLength)

    codec := NewJSONCodec(&httpReadWriteNopCloser{body, w})

    defer codec.Close()


    w.Header().Set("content-type", contentType)

    srv.ServeSingleRequest(ctx, codec, OptionMethodInvocation)

}

 

可以看到,首先创建一个Reader用于读取原始数据,然后创建一个JSON的编解码器,最后调用ServeSingleRequest()函数:

func (s *Server) ServeSingleRequest(ctx context.Context, codec ServerCodec, options CodecOption) {

    s.serveRequest(ctx, codec, true, options)

}



func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {

……



    s.codecs.Add(codec)

……



    for atomic.LoadInt32(&s.run) == 1 {

        reqs, batch, err := s.readRequest(codec)

……



// If a single shot request is executing, run and return immediately

        if singleShot {

            if batch {

                s.execBatch(ctx, codec, reqs)

            } else {

                s.exec(ctx, codec, reqs[0])

            }

            return nil

        }

        // For multi-shot connections, start a goroutine to serve and loop back

        pend.Add(1)



        go func(reqs []*serverRequest, batch bool) {

            defer pend.Done()

            if batch {

                s.execBatch(ctx, codec, reqs)

            } else {

                s.exec(ctx, codec, reqs[0])

            }

        }(reqs, batch)



}

return nil

}

 

可以看到,就是一个循环,每次调用readRequest()解析请求数据,然后调用exec()或者execBatch()执行API调用。

首先看一下readRequest()函数:

func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {

    reqs, batch, err := codec.ReadRequestHeaders()

……


    requests := make([]*serverRequest, len(reqs))


    for i, r := range reqs {

        var ok bool

        var svc *service

……


        if svc, ok = s.services[r.service]; !ok { // rpc method isn't available

            requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}

            continue

        }

……


        if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method

            requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}

            if r.params != nil && len(callb.argTypes) > 0 {

                if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {

                    requests[i].args = args

                } else {

                    requests[i].err = &invalidParamsError{err.Error()}

                }

            }

            continue

        }


        requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}

    }


    return requests, batch, nil

}

 

这里就对应于图上的两个红色箭头部分:首先codec把原始JSON数据解析为一个rpcRequest数组,然后遍历这个数组,根据Namespace找到对应的service,再从service的callbacks表中查询需要调用的method,最后组装成一个新的数据结构serverRequest。

接着就是调用exec()执行这个severRequest指向的API实现了:

func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {

    var response interface{}

    var callback func()

    if req.err != nil {

        response = codec.CreateErrorResponse(&req.id, req.err)

    } else {

        response, callback = s.handle(ctx, codec, req)

    }


    if err := codec.Write(response); err != nil {

        log.Error(fmt.Sprintf("%v\n", err))

        codec.Close()

    }


    // when request was a subscribe request this allows these subscriptions to be actived

    if callback != nil {

        callback()

    }

}

 

可以看到调用了handle()方法获取响应数据,然后通过codec组装成JSON发送给请求端。

看一下handle()函数:

func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {

……


    arguments := []reflect.Value{req.callb.rcvr}

    if req.callb.hasCtx {

        arguments = append(arguments, reflect.ValueOf(ctx))

    }

    if len(req.args) > 0 {

        arguments = append(arguments, req.args...)

    }


    // execute RPC method and return result

    reply := req.callb.method.Func.Call(arguments)

    if len(reply) == 0 {

        return codec.CreateResponse(req.id, nil), nil

    }

……


    return codec.CreateResponse(req.id, reply[0].Interface()), nil

}

 

首先处理参数列表,如果发现调用的函数需要Context参数则加到最前面。然后就是通过反射调用API了,最后把结果送给codec,按JSON RPC的格式要求组装成响应返回就可以了。

这里提到一个Context类型,主要是用来存储一些和请求相关的信息,初始化代码位于rpc/http.go:

ctx := context.Background()

    ctx = context.WithValue(ctx, "remote", r.RemoteAddr)

    ctx = context.WithValue(ctx, "scheme", r.Proto)

    ctx = context.WithValue(ctx, "local", r.Host)

 

具体实现比较有意思,类似于一个单链表:

 

至此,API调用的整个流程就分析完了。

 

延伸阅读:以太坊源码学习启动篇

免责声明:信息仅供参考,不构成投资及交易建议。投资者据此操作,风险自担。
如果觉得文章对你有用,请随意赞赏收藏
相关推荐
相关下载
登录后评论
Copyright © 2019 宽客在线