【GeeRPC】项目总结:使用 Golang 实现 RPC 框架
文章目录
- 项目总结:使用 Golang 实现 RPC 框架
- 谈谈 RPC 框架
- 什么是 RPC 框架
- 实现一个 RPC 框架需要什么?
- 项目总结·文章结构安排
- Part1:消息编码
- 编解码器的实现
- 通信过程
- Part2:服务端
- Accept:阻塞地等待连接请求并开启 goroutine 进行处理
- ServeConn:对连接进行处理直到连接挂断
- serveCodec:通过解码器读取 Request,进行处理,并将结果编码回发 Response
- request 结构
- readRequest:通过 GobCodec 从字节流构造 request
- findService:根据服务和方法获取服务名和方法名
- 回到 readRequest
- 回到 serveCodec
- handleRequest:处理一次具体的 RPC 调用请求
- sendResponse:向 client 发送具体的响应
- Register:将服务注册到 Server 当中,服务当中包含方法
- 回顾 Service
- 回到 Register
- ServeHTTP:处理 HTTP 请求
- HandleHTTP
- Part3:注册中心与服务发现
- GeeRegistry:GeeRPC 的注册中心
- GeeRegistryDiscovery:GeeRPC 的服务发现模块
- MultiServersDiscovery:基本的服务发现模块
- 回到 GeeRegistryDiscovery
- Part4:高性能客户端
- Call 方法:客户端基于负载均衡策略选择单个服务实例执行 RPC 调用
- XDial:一个较为通用的 RPC 客户端连接函数,支持多种协议
- Client:底层的客户端实例
- Call 实例:承载一次 RPC 调用
- 回到 Client,实现 Client 的接收功能 receive
- 同步进度,回到 XDial 和 call 方法
- Client 的 Call 方法:发起 RPC 调用并通过 Context 引入超时机制
- Broadcast 方法:客户端向所有可以用的 Server 进行 RPC 调用广播
- Part5:从 main 函数出发完整地体验一次 GeeRPC 的使用
项目总结:使用 Golang 实现 RPC 框架
二月中旬我参考 Geektutu 的 GeeRPC 教程动手实现了一个 RPC 框架。现在我来对 GeeRPC 项目进行总结,采用的方式与总结 Gee 以及 Zinx 不同,使用一个 GeeRPC 框架需要服务器(Server)、客户端(Client)和注册中心(Registry)三部分,我将首先对每个部分进行剖析,之后从真正使用一个 RPC 远程调用出发,对 GeeRPC 的工作流程进行分析。
学习 GeeRPC 的八篇文章链接如下:
- 【GeeRPC】7天用 Go 从零实现 RPC 框架 GeeRPC
- 【GeeRPC】Day1:服务端与消息编码
- 【GeeRPC】Day2:支持并发与异步的客户端
- 【GeeRPC】Day3:服务注册(Service Register)
- 【GeeRPC】Day4:超时处理(timeout)
- 【GeeRPC】Day5:支持 HTTP 协议
- 【GeeRPC】Day6:负载均衡
- 【GeeRPC】Day7:服务发现与注册中心
谈谈 RPC 框架
什么是 RPC 框架
RPC(Remote Procedure Call,远程过程调用)框架是一种用于实现分布式系统中跨网络调用远程服务的工具。它允许程序像调用本地函数一样调用远程服务器上的函数,隐藏了底层网络通信的复杂性。
RPC 框架的核心组件如下:
- 客户端(Client):发起远程调用的进程;
- 服务端(Server):提供远程调用的进程;
- 存根(Stub):客户端和服务端各有一个存根,分别负责调用请求打包(序列化)和接收响应(反序列化);
- 通信协议(Protocol):定义数据传输的格式和规则;
RPC 的工作流程:
- 客户端调用:客户端通过本地存根发起远程调用;
- 序列号:存根将调用信息(方法名、参数等)序列化后发给服务端;
- 网络传输:序列化后的数据通过网络传输到服务端;
- 反序列化:服务端存根接收并反序列化数据;
- 执行调用:服务端执行相应的方法;
- 返回结果:服务端将结果序列化后返回给客户端;
- 客户端接收:客户端存根接收并反序列化结果,返回给调用者。
RPC 的优点:
- 透明性:调用远程服务就像调用本地函数一样简单;
- 高效性:优化网络通信,提升性能。最直观的表现是 RPC 将采用更加高效的数据编码方式,提高数据在网络当中的传输效率,这一点有别于基于 HTTP 的纯文本传输进行通信的方法;
- 跨语言支持:许多 RPC 框架支持多种编程语言。
RPC 的缺点:
- 复杂性:处理网络故障、超时等问题增加了系统的复杂性;
- 耦合性:服务端和客户端接口紧密耦合,接口改变将会影响双方。
实现一个 RPC 框架需要什么?
首先,需要先确定通信双方所选择的通信协议(比如 TCP、HTTP/2、WebSocket 等)。之后,需要约定好通信双方的编码格式(比如 XML、JSON、Protobuf 等)。
然后,我们还需要解决一系列服务的可用性问题,比如处理连接超时、支持异步请求和并发等。
在分布式场景下,可能会有很多服务实例,客户端并不关心这些实例的地址和部署位置,只关心自己能否接收到 RPC 调用期待的结果。为了实现上述需求,自然引出了注册中心和负载均衡。客户端和服务端只需要感知注册中心,服务端将其所能够提供的服务注册到注册中心,客户端从注册中心拉取可以调用的服务即可。注册中心应该实现服务的动态添加、删除、使用心跳确保服务可用等功能。
项目总结·文章结构安排
与 Gee 或 Zinx 不同,GeeRPC 是一个接近千行的较大的项目,如果直接采用“从 main 函数”开始的方法对整个项目进行分析非常的困难,因为 main 当中表面上一个很简单的函数调用背后可能隐藏着很庞杂的实现细节。因此我打算从 RPC 框架的必要组件入手,从最简单的编解码方法开始,逐步分析客户端、服务端以及注册中心,最后再从 main 函数出发总览全局,透彻地理解这整个项目。
Part1:消息编码
编解码器的实现
一个典型的 RPC 调用如下:
err = client.Call("Arith.Multiply", args, &reply)
上述语句的行为是,客户端 client 调用 Call,Call 的参数是方法名、方法参数及保存返回值的引用(&reply
),Call 调用的返回值是 err。
根据 RPC 调用的特点,GeeRPC 将消息抽象为两部分,即 Header 和 Body。Header 结构的定义如下:
type Header struct {ServiceMethod string // format: "Service.Method"Seq uint64 // sequence number chosen by clientError string
}
- ServiceMethod 保存调用的方法名;
- Seq 保存请求的序列号,可以被认为是某个请求的 ID,用于区分不同的请求;
- Error 是错误信息,在客户端发送的消息当中置为空,如果服务端发生错误,将把 Error 置在消息的 Header 当中保存。
Header 是 Message 的一部分,设定好 Header 之后,我们进一步考虑如何对 Message 进行编码。GeeRPC 将编解码方法抽象为一个 Codec 接口,并定义了一些方法:
type Codec interface {io.CloserReadHeader(*Header) errorReadBody(interface{}) errorWrite(*Header, interface{}) error
}
可以看到,Codec 接口内嵌了 io.Closer
的方法,用于关闭资源。此外,Codec 还实现了:
- ReadHeader:参数就是 Header 类型的指针,用于解码出一条 Message 中的 Header;
- ReadBody:参数是空接口类型,用于解码 Body;
- Write:参数是 Header 的指针和空接口类型,当然对应的就是 Header 和 Body 这两个部分。Write 方法的作用是对 Message 进行编码,即:将 Header 和 Body 编码。
GeeRPC 框架中采用 gob
的格式对 Message 进行编码,gob
是一种用于 Go 对象序列化和反序列化的编码格式。gob
是 Go 语言特有的二进制编码格式,专门为 Go 的数据结构设计,能够高效地编码和解码 Go 的数据类型。
在对 codec 进行初始化时,会新建一个实现了 Codec 接口的 GobCodec 来对数据进行编解码:
type NewCodecFunc func(closer io.ReadWriteCloser) Codectype Type stringconst (GobType Type = "application/gob"JsonType Type = "application/json"
)var NewCodecFuncMap map[Type]NewCodecFuncfunc init() {NewCodecFuncMap = make(map[Type]NewCodecFunc)NewCodecFuncMap[GobType] = NewGobCodec // NewGobCodec 还没定义, 将在 gob.go 定义
}
GobCodec 的实现如下:
type GobCodec struct {conn io.ReadWriteCloserbuf *bufio.Writerdec *gob.Decoderenc *gob.Encoder
}// 👇 确保 GobCodec 实现了 Codec 接口
var _ Codec = (*GobCodec)(nil)// 👇 GobCodec 的工厂函数
func NewGobCodec(conn io.ReadWriteCloser) Codec {buf := bufio.NewWriter(conn)return &GobCodec{conn: conn,buf: buf,dec: gob.NewDecoder(conn),enc: gob.NewEncoder(buf),}
}
GobCodec 的成员包括:
- conn:保存连接实例;
- buf:缓冲区,一般设置缓冲区的作用是提升性能;
- enc:gob 的编码器;
- dec:gob 的解码器。
GobCodec 实现了的 Codec 的方法如下:
func (c *GobCodec) ReadHeader(h *Header) error {return c.dec.Decode(h)
}func (c *GobCodec) ReadBody(body interface{}) error {return c.dec.Decode(body)
}func (c *GobCodec) Write(h *Header, body interface{}) (err error) {defer func() {_ = c.buf.Flush()if err != nil {_ = c.Close()}}()if err = c.enc.Encode(h); err != nil {log.Println("rpc: gob error encoding header:", err)return}if err = c.enc.Encode(body); err != nil {log.Println("rpc: gob error encoding body:", err)return}return
}func (c *GobCodec) Close() error {return c.conn.Close()
}
可以看到,GobCodec 方法的实现还是非常简单的,但是需要明确的一点就是,ReadHeader 和 ReadBody 是从当前连接的字节流(stream)当中读取字节并进行解码,得到的结果将会保存到 Header 指针或空接口类型当中。
通信过程
对于 GeeRPC 而言,通信双方唯一需要协商的内容就是消息的编解码方式,这部分信息将由 Option 来承载:
const MagicNumber = 0x3bef5c// Option 用于客户端和服务端之间的协议协商
type Option struct {MagicNumber int // MagicNumber marks this's a geerpc requestCodecType codec.Type // client may choose different Codec to encode bodyConnectTimeout time.DurationHandleTimeout time.Duration
}// DefaultOption 是默认的协议选项, 使用 Gob 编码
var DefaultOption = &Option{MagicNumber: MagicNumber,CodecType: codec.GobType,ConnectTimeout: time.Second * 10,
}
当然,Option 当中还承载了“连接超时”和“处理超时”的信息,我们将在后续客户端与服务端的超时处理部分进行总结。还是先回到消息编码的部分,为了实现上的简单,GeeRPC 客户端固定采用 JSON 格式对 Option 进行编码,而后续的 Head 和 Body 则采用 CodecType 指定的格式进行编码。也就是说,报文的发送格式为:
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
在某个连接所发送的字节流当中,Option 被固定在最开始的位置,Header 和 Body 可以有多个,其形式可能是:
| Option | Header1 | Body1 | Header2 | Body2 | ...
至此,我们便完成了 GeeRPC 消息编码部分的总结,后续在 Server 和 Client 中用到 Codec 的时候,我将对消息编码部分的内容进行再次回顾。
Part2:服务端
我们已经处理完了通信的过程,包括在客户端和服务端之间协商并确定通信的方式,以及编解码器的具体实现,现在我们来回顾服务端的实现。
总得来说,对于一个提供服务的 Server,它最基本的功能就是,根据收到的 Request,进行相应的业务处理,再将业务处理的结构以及一些额外的信息(比如序号、服务可用状态等)构造成 Response 原路发送回请求服务的客户端。
我们再细化一下,在一个 RPC 框架下,Server 接收到的消息是服务端和客户端已经协商好的编码形式的字节流,Server 需要先将 Header 和 Body 解析出来。我们刚才已经回顾过,Header 当中包含 ServiceMethod,因此我们可以从 Header 得知 Client 请求调用的是哪个方法。之后,Server 端从注册中心找到对应的服务并调用,得到业务处理的结果,构造成 Response 通过 Codec 再写回到字节流当中,完成一次 RPC 调用。
Server 在实现上只具有一个成员,那就是保存注册服务的 map:
// Server represents an RPC Server.
type Server struct {serviceMap sync.Map // serviceMap 存储注册的服务, 键为服务名, 值为 *service
}
Server 的方法非常的多,包括:
Register(rcvr interface{}) error
findService(serviceMethod string) (svc *service, mtype *methodType, err error)
ServeConn(conn io.ReadWriteCloser)
serveCodec(cc codec.Codec, opt *Option)
readRequestHeader(cc codec.Codec) (*codec.Header, error)
readRequest(cc codec.Codec) (*request, error)
sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex)
handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration)
Accept(lis net.Listener)
ServeHTTP(w http.ResponseWriter, req *http.Request)
HandleHTTP()
其中导出的方法包括:
Register(rcvr interface{}) error
ServeConn(conn io.ReadWriteCloser)
Accept(lis net.Listener)
ServeHTTP(w http.ResponseWriter, req *http.Request)
HandleHTTP()
私有的方法包括:
findService(serviceMethod string) (svc *service, mtype *methodType, err error)
serveCodec(cc codec.Codec, opt *Option)
readRequestHeader(cc codec.Codec) (*codec.Header, error)
readRequest(cc codec.Codec) (*request, error)
sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex)
handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration)
启动一个服务器之前需要先建立一个服务器实例,Server 的工厂函数非常简单:
// NewServer returns a new Server.
func NewServer() *Server {return &Server{}
}
现在我们对 Server 的每一个方法进行剖析:
Accept:阻塞地等待连接请求并开启 goroutine 进行处理
之后通过 Accept 方法接收一个 Listener 对象,表示 Server 可以在这个地址对请求进行监听,Accept 方法的实现是:
// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {for {conn, err := lis.Accept()if err != nil {log.Println("rpc server: accept error:", err)return}go server.ServeConn(conn)}
}
在 for loop 当中,Accept 阻塞地接收连接请求,并通过 ServeConn 方法对得到的 conn 进行进一步的处理。
ServeConn:对连接进行处理直到连接挂断
在 Accept 中启动一个 ServeConn 方法的 goroutine,参数就是 conn,用于处理从 conn 得到的字节流:
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// ServeConn: 处理单个连接
func (server *Server) ServeConn(conn io.ReadWriteCloser) {defer func() { _ = conn.Close() }()var opt Optionif err := json.NewDecoder(conn).Decode(&opt); err != nil {log.Println("rpc server: options error: ", err)return}if opt.MagicNumber != MagicNumber {log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)return}f := codec.NewCodecFuncMap[opt.CodecType]if f == nil {log.Printf("rpc server: invalid codec type %s", opt.CodecType)return}server.serveCodec(f(conn), &opt)
}
需要再次强调的是,conn 发送过来的是字节流,而不是结构化的数据,因此在理解 ServeConn 的行为时,我们应该转换思路。
首先,ServeConn defer 了一个关闭连接的 func,它的作用是在 err 发生的时候将连接关闭。需要注意的是 ServeConn 在连接正常的情况下是不会 return 的,因为最后一条语句所使用的 serveCodec 方法使用了 for loop 阻塞。
之后,ServeConn 中建立了一个 Option 类型的 opt,用于保存从字节流当中读取到的数据。首先使用 json 的 Decoder 读取 opt,原因在于我们的约定当中使用 JSON 保存 Option,而使用 gob 保存 Header 和 Body。
读取完 Option 之后,先比对读到的 MagicNumber 是否相同,这也可以视为一种加密的手段。如果 MagicNumber 相同,便从 opt 当中读取 CodecType(默认就是 gob),通过 codec 当中的 NewCodecFuncMap 建立解码相应编码字节流的解码器 f。需要注意 f 的类型是:func(closer io.ReadWriteCloser) Codec
。
最后调用 serveCodec 方法,从字节流中继续解码 Header 和 Body。
serveCodec:通过解码器读取 Request,进行处理,并将结果编码回发 Response
serveCodec 可以说是 Server 的关键中间组件,它的作用是将 Request 解码与 Response 编码联通在一起。
首先,serveCodec 的第一个参数是 Codec 接口类型,而它的调用者 ServeConn 传入的参数是 f(conn)
,f 保存的是 GobCodec 的工厂函数,因此 f(conn)
将会新建一个 GobCodec 类型的编解码器实例。
serveCodec 当中首先初始化了一个 sending 和 wg,二者的作用都是并发控制。sending 是一个 Mutex 的指针,而 wg 是 WaitGroup 的指针。
之后,开启一个 for loop,通过 readRequest 读取 Header 和 Body,并构造 request 实例。在进一步剖析 serveCodec 之前,我们先来仔细研究一下 readRequest 方法和 request 结果。
request 结构
request 结构的定义如下:
// request stores all information of a call
// request 存储一个请求的所有信息.
type request struct {h *codec.Header // header of requestargv, replyv reflect.Value // argv and replyv of requestmtype *methodType // 方法svc *service // 服务信息
}
显然,request 是用来保存一次 client RPC 调用的所有信息的结构体,包括 Header、RPC 的参数及返回值类型、方法和服务信息。request 结构不具有方法。
readRequest:通过 GobCodec 从字节流构造 request
readRequest 方法的实现如下:
func (server *Server) readRequest(cc codec.Codec) (*request, error) {h, err := server.readRequestHeader(cc)if err != nil {return nil, err}req := &request{h: h}req.svc, req.mtype, err = server.findService(h.ServiceMethod)if err != nil {return req, err}req.argv = req.mtype.newArgv()req.replyv = req.mtype.newReplyv()// make sure that argvi is a pointer, ReadBody need a pointer as parameterargvi := req.argv.Interface()if req.argv.Type().Kind() != reflect.Ptr {argvi = req.argv.Addr().Interface()}if err = cc.ReadBody(argvi); err != nil {log.Println("rpc server: read argv err:", err)}return req, nil
}
其中又包含着 readRequestHeader 方法,readRequestHeader 方法的实现如下:
func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {var h codec.Headerif err := cc.ReadHeader(&h); err != nil {if err != io.EOF && err != io.ErrUnexpectedEOF {log.Println("rpc server: read header error:", err)}return nil, err}return &h, nil
}
readRequestHeader 方法的作用就是通过 Codec 解码出字节流当中 Header。
在读取到 Header 之后,在 readRequest 当中会根据 Header 对 request 进行构造,从 Header 当中可以读取到 Client 此次 RPC 调用所需要的方法名,我们需要通过方法名到 findService 方法当中去寻找具体的方法。
findService 方法较为复杂,它设计到服务 service 的设计。
findService:根据服务和方法获取服务名和方法名
findService 的设计如下:
// findService: 根据服务(service)和方法(method)获取服务名和方法名
func (server *Server) findService(serviceMethod string) (svc *service, mtype *methodType, err error) {dot := strings.LastIndex(serviceMethod, ".")if dot < 0 {err = errors.New("rpc server: service/method request ill-formed: " + serviceMethod)return}serviceName, methodName := serviceMethod[:dot], serviceMethod[dot+1:]// 👆 根据 serviceMethod 解析并获取服务名和方法名svci, ok := server.serviceMap.Load(serviceName) // 从 serviceMap 中查找服务if !ok {err = errors.New("rpc server: can't find service " + serviceName)return}svc = svci.(*service)mtype = svc.method[methodName] // 从 svc.method 中查找方法if mtype == nil {err = errors.New("rpc server: can't find method " + methodName)}return
}
它的参数是 Header 当中保存的 serviceMethod,返回的参数是 service 类型的指针、methodType 类型的指针以及错误信息。
service 结构保存的是一个服务实例:
type service struct {name string // 服务的名称typ reflect.Type // 服务的类型rcvr reflect.Value // 服务的接收者(即服务实例)method map[string]*methodType // 服务的方法集合, 键为方法名, 值为 methodType
}
而 methodType 保存的是服务类型,其中包括方法参数的类型、返回值的类型以及调用次数:
type methodType struct {method reflect.MethodArgType reflect.TypeReplyType reflect.TypenumCalls uint64
}
回到 findService 函数本身,我们来研究一下 findService 函数体当中的行为。首先,findService 通过 strings.LastIndex
对 serviceMethod 进行分割,即:将 service 和 Method 通过 .
进行分割,得到服务名和方法名。
根据服务名,到 Server 的 serviceMap 成员当中查找服务:
svci, ok := server.serviceMap.Load(serviceName) // 从 serviceMap 中查找服务
查找到服务之后,再从服务当中查找方法:
svc = svci.(*service)
mtype = svc.method[methodName] // 从 svc.method 中查找方法
研究到这里,其实我对 serviceMap 以及服务和方法的查找仍然有些模糊,在后面的服务注册环节应该会研究到此中细节。在此我们先认定,通过 findService 方法我们可以获得服务和方法的句柄。
回到 readRequest
饶了一个圈,我们回到了 readRequest。我们先进行进度同步:readRequest 的作用是读取字节流并从中构造 request 结构,request 表示的就是 client 所进行的一次 RPC 调用传入的服务 + 方法名、参数以及保存返回值的引用。现在我们通过 findService 方法得到了 client 想要使用的方法的服务与方法句柄:
req.svc, req.mtype, err = server.findService(h.ServiceMethod)
将 svc 和 mtype 一并保存到 request 类型的 req 当中。接下来我们通过 golang 的反射机制构造本次 RPC 调用对应方法所需要的参数值类型和返回值类型,具体要用到的是 methodType 的 newArgv 方法和 newReplyv 方法:
func (m *methodType) newArgv() reflect.Value {var argv reflect.Value// arg may be a pointer type or a value typeif m.ArgType.Kind() == reflect.Ptr {argv = reflect.New(m.ArgType.Elem())} else {argv = reflect.New(m.ArgType).Elem()}return argv
}func (m *methodType) newReplyv() reflect.Value {// reply must be a pointer typereplyv := reflect.New(m.ReplyType.Elem())switch m.ReplyType.Elem().Kind() {case reflect.Map:replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem()))case reflect.Slice:replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0, 0))}return replyv
}
将结果保存到 req 实例:
req.argv = req.mtype.newArgv()
req.replyv = req.mtype.newReplyv()
我们需要确保参数类型是指针,因为我们需要从字节流的 Body 当中读取参数,使用 Codec 解码的结果应该保存到指针当中:
// make sure that argvi is a pointer, ReadBody need a pointer as parameter
argvi := req.argv.Interface()
if req.argv.Type().Kind() != reflect.Ptr {argvi = req.argv.Addr().Interface()
}
最后通过 Codec 的 ReadBody 方法将读取到的 argvi(传入的参数)保存到 req 当中,这一步是自动完成的,因为 argvi 引用的是 req.argv
的地址。
至此 readRequest 完成,将 req 返回,req 是一个 request 类型的指针。
回到 serveCodec
我们再次同步进度。在 serveCodec 当中,我们通过 req, err := server.readRequest(cc)
从字节流构造了 request 对象。之后进行错误处理,如果错误不为空,那么通过 sendResponse 方法发送一个非法请求的提示给客户端,sendResponse 方法将在后面的 handleRequest 进行分析。
如果 readRequest 在对 request 进行构造时没有出错,就将 WaitGroup 通过 Add 加一(WaitGroup 的作用就是用于 goroutine 的管理),然后开启一个 goroutine,调用 handleRequest 方法来处理 request 对象,即:根据 request 进行具体的业务处理。
由于 serveCodec 使用了 for loop,因此它将阻塞地等待来自 conn 的字节流当中的消息。当有错误出现导致 for loop 中断时,WaitGroup 的作用体现了出来,它将通过 wg.Wait()
等待其下辖的所有 goroutine 执行完毕,才会结束。
现在我们来对 handleRequest 方法进行研究,看一下 GeeRPC 如何处理来自 Client 的 RPC 调用请求。
handleRequest:处理一次具体的 RPC 调用请求
handleRequest 方法的实现如下:
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {defer wg.Done()called := make(chan struct{}) // 用于通知请求处理完成sent := make(chan struct{}) // 用于通知响应已发送go func() {err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用 req.svc.call 执行远程方法called <- struct{}{} // 向 called 通道发送信号if err != nil { // 如果方法返回错误, 设置错误信息并发送错误响应req.h.Error = err.Error()server.sendResponse(cc, req.h, invalidRequest, sending)sent <- struct{}{}return}server.sendResponse(cc, req.h, req.replyv.Interface(), sending) // 如果方法执行成功, 发送正常响应sent <- struct{}{} // 向 sent 通道发送信号, 表示响应已发送}()// 👇 监听请求处理的超时和完成if timeout == 0 { // 如果 timeout 为 0, 表示不启用超时机制, 直接等待请求完成<-called<-sentreturn}select {case <-time.After(timeout):req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)server.sendResponse(cc, req.h, invalidRequest, sending)case <-called:<-sent}
}
首先,handleRequest 的参数包括 Codec 编解码器、request 指针 req、sending 锁、wg WaitGroup 以及 time。handleRequest 首先 defer 一个 wg.Done()
它等价于 wg.Add(-1)
,即通过 WaitGroup 当前的线程执行完毕。
之后 handleRequest 建立了两个 struct{}
类型的 channel,用于 goroutine 间通信:
called := make(chan struct{}) // 用于通知请求处理完成
sent := make(chan struct{}) // 用于通知响应已发送
随后开启一个 goroutine:
go func() {err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用 req.svc.call 执行远程方法called <- struct{}{} // 向 called 通道发送信号if err != nil { // 如果方法返回错误, 设置错误信息并发送错误响应req.h.Error = err.Error()server.sendResponse(cc, req.h, invalidRequest, sending)sent <- struct{}{}return}server.sendResponse(cc, req.h, req.replyv.Interface(), sending) // 如果方法执行成功, 发送正常响应sent <- struct{}{} // 向 sent 通道发送信号, 表示响应已发送
}()
在这个 goroutine 当中直接根据 service 对 method 进行调用,并传入参数及返回值引用。未导出的 call 方法相当关键,它是 service 类型的方法,可以说 err := req.svc.call(req.mtype, req.argv. req.replyv)
就是一次具体的 RPC 调用。call 方法的实现如下:
func (s *service) call(m *methodType, argv, replyv reflect.Value) error {/*s *service: 表示当前服务的实例m *methodType: 表示要调用的方法的信息argv reflect.Value: 表示方法的参数值replyv reflect.Value: 表示方法的返回值(通过指针的方式保存返回值)返回值: error*/atomic.AddUint64(&m.numCalls, 1) // 原子性地增加调用次数, 保证并发安全f := m.method.Func // 获取方法的反射函数returnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv}) // 通过反射调用方法if errInter := returnValues[0].Interface(); errInter != nil { // 处理返回值return errInter.(error)}return nil
}
执行 RPC 调用之后,通过 called 发送信号通知外部的父 goroutine 当前请求已完成。无论是否出错,都用过 sendResponse 方法将 response 回发给 client。随后通过 sent 发送信号通知外部的父 goroutine 响应已发送。
最后要进行的就是超时处理,如果 timeout == 0
,那么不进行超时处理,阻塞地等待 called 和 sent 信号的到来便退出。否则通过 select + time.After 进行超时处理。
sendResponse:向 client 发送具体的响应
sendResponse 的实现比较简单:
func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {sending.Lock()defer sending.Unlock()if err := cc.Write(h, body); err != nil {log.Println("rpc server: write response error:", err)}
}
它的作用就是将传入的 Header 以及 Body 通过 Codec 编码回写到发送给 Client 的字节流。
至此,我们认真解析了服务端从字节流当中解析数据,并执行一次 RPC 调用,再将 Response 发回给 Client 的过程。所涉及的方法包括:
- Accept:阻塞地等待连接请求并开启 goroutine 进行处理
- ServeConn:对连接进行处理直到连接挂断
- serveCodec:通过解码器读取 Request,进行处理,并将结果编码回发 Response
- readRequest:通过 GobCodec 从字节流构造 request
- readRequestHeader:从字节流中解析 Header
- findService:根据服务和方法获取服务名和方法名
- handleRequest:处理一次具体的 RPC 调用请求
- sendResponse:向 client 发送具体的响应
Server 还剩下:
- Register;
- ServeHTTP;
- HandleHTTP;
三个方法没有深入研究。
Register:将服务注册到 Server 当中,服务当中包含方法
我们可以通过这样的方式来将服务注册到 Server 当中:
type Foo intfunc (f Foo) Sum(args Args, reply *int) error {*reply = args.Num1 + args.Num2return nil
}func (f Foo) Sleep(args Args, reply *int) error {time.Sleep(time.Second * time.Duration(args.Num1))*reply = args.Num1 + args.Num2return nil
}func Xxx() {var foo Foo// ... start a server_ = server.Register(&foo)
}
Register 的实现如下:
// Register publishes in the server the set of methods
func (server *Server) Register(rcvr interface{}) error {// Register 注册一个服务到服务端s := newService(rcvr) // 创建服务实例// 👇 使用 sync.Map 的 LoadOrStore 方法存储服务, 如果服务已经存在, 则返回错误if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup {return errors.New("rpc: service already defined: " + s.name)}return nil
}
不难看出,Register 方法当中新建了一个服务实例,通过 newService 工厂函数来实现。Register 的参数是一个空接口类型,根据刚才的例子我们已经知道,传入的其实是一个具有若干种方法的结构,这个结构被称为 Service,它所具有的方法称作 Method,二者合并起来构成serveMethod
或service.Method
。
回顾 Service
Service 结构的定义我们之前已经提到,现在不妨回顾一下:
type service struct {name string // 服务的名称typ reflect.Type // 服务的类型rcvr reflect.Value // 服务的接收者(即服务实例)method map[string]*methodType // 服务的方法集合, 键为方法名, 值为 methodType
}// newService 创建了一个新的服务实例
func newService(rcvr interface{}) *service {s := new(service)s.rcvr = reflect.ValueOf(rcvr) // 获取服务实例的值s.name = reflect.Indirect(s.rcvr).Type().Name() // 获取服务实例的类型名称s.typ = reflect.TypeOf(rcvr)if !ast.IsExported(s.name) {log.Fatalf("rpc server: %s is not a valid service name", s.name)}s.registerMethods() // 调用 registerMethods 方法注册对应服务的方法, registerMethods 在下面实现return s // 返回一个 service 实例
}
通过 registerMethods 方法,可以将 service 具备的 Method 进行注册,并保存到这个 service 实例的 method 当中,method 成员是一个 map,它的 key 是 string,即方法名,value 是 methodType 的指针,可以理解为一种函数指针。registerMethods 方法及其相关结构与方法如下:
type methodType struct {method reflect.MethodArgType reflect.TypeReplyType reflect.TypenumCalls uint64
}// registerMethods 注册了服务的方法
func (s *service) registerMethods() {s.method = make(map[string]*methodType)for i := 0; i < s.typ.NumMethod(); i++ { // 遍历服务的所有方法method := s.typ.Method(i)mType := method.Type// 👇 检查方法的签名是否符合 RPC 方法的规范// 1. 方法必须有三个入参和一个输出参数if mType.NumIn() != 3 || mType.NumOut() != 1 {continue}// 2. 输出参数的类型必须是 error 类型if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {continue}argType, replyType := mType.In(1), mType.In(2)// 3. 参数和返回值的类型必须是导出的或内置类型if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {continue}// 经过检查, 符合规定的方法注册到 s.method 当中s.method[method.Name] = &methodType{method: method,ArgType: argType,ReplyType: replyType,}// 日志: 输出注册的方法名log.Printf("rpc server: register %s.%s\n", s.name, method.Name)}
}// isExportedOrBuiltinType 的作用是检查类型是否是导出的或内置的类型
func isExportedOrBuiltinType(t reflect.Type) bool {return ast.IsExported(t.Name()) || t.PkgPath() == ""
}
回到 Register
创建服务实例并保存了该服务当中的方法之后,即可将这个服务加入到 Server 的 serviceMap 成员当中了,后续通过服务名就可以索引到具体的服务:
if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup {return errors.New("rpc: service already defined: " + s.name)}
ServeHTTP:处理 HTTP 请求
ServeHTTP 的实现如下:
// ServeHTTP implements a http.Handler that answers RPC requests.
// ServeHTTP 方法实现了 http.Handler 接口, 用于处理 HTTP 请求
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {if req.Method != "CONNECT" {// 首先检查请求方法是否为 CONNECT, 如果不是, 返回 405 Method Not Allowed 错误, 并提示客户端必须使用 CONNECT 方法w.Header().Set("Content-Type", "text/plain; charset=utf-8")w.WriteHeader(http.StatusMethodNotAllowed)_, _ = io.WriteString(w, "405 must CONNECT\n")return}// 如果请求方法是 CONNECT, 则通过 http.Hijacker 接口劫持连接, 获取底层的 net.Conn 对象, 劫持连接后, 服务器可以直接// 控制底层的 TCP 连接, 而不需要 HTTP 协议进行通信, conn 应该就是劫持到的 TCP 连接conn, _, err := w.(http.Hijacker).Hijack()if err != nil {log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())return}// 劫持连接后, 服务端向客户端发送一个简单的 HTTP 响应, 表示连接已建立_, _ = io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")// 最后调用 server.ServeConn(conn) 方法, 开始处理 RPC 请求server.ServeConn(conn)
}
我的理解是 ServeHTTP 方法的作用就是处理有 HTTP 请求到来的情况,不难看出 ServeHTTP 实际上是实现了 http.Handler 的接口,这样 Server 就可以直接通过 http.ListenAndServe 对 HTTP 请求进行监听了。
HandleHTTP
// HandleHTTP registers an HTTP handler for RPC messages on rpcPath.
// It is still necessary to invoke http.Serve(), typically in a go statement.
// HandleHTTP 方法用于处理 HTTP 处理器. 它将 defaultDebugPath 路径与 server 关联起来, 使得客户端访问该路径时, 会调用 server
// 的 ServeHTTP 方法defaultDebugPath = "/debug/geerpc" // 默认的调试信息路径, 用于注册 HTTP 处理器func (server *Server) HandleHTTP() {http.Handle(defaultRPCPath, server)http.Handle(defaultDebugPath, debugHTTP{server})log.Println("rpc server debug path:", defaultDebugPath)
}
HandleHTTP 方法将 defaultDebugPath(一个常量)与 server 关联了起来,使得客户端访问 defaultDebugPath 时,调用 Server。
至此,我们已经完成了 GeeRPC 服务端的剖析,现在我们来回顾一下 demo 中给出的支持并发与异步的高性能客户端。
Part3:注册中心与服务发现
在 Geektutu 给出的教程当中,注册中心(Registry)和服务发现(Discovery)两个模块被安排在了 Day7,但由于我们并非按照教程的顺序对 GeeRPC 进行学习,而是从学习完 GeeRPC 项目之后复盘整个项目的角度出发,因此在这篇文章中我首先对注册中心与服务发现进行回顾,它连接了服务端与客户端,随后再对客户端进行剖析。
遵循与分析服务端时一样的套路,在分析注册中心与服务发现模块之前,我们需要先搞清楚为什么需要这两个模块,这两个模块在 GeeRPC 框架当中扮演着什么角色?
如上图所示基于注册中心,客户端和服务端不需要对彼此感知,而只需要对注册中心感知,客户端即可发起一次服务端提供的远程调用。
在分布式场景下,提供相同服务的多个不同服务实例可能分布在不同的物理位置上,此时就需要一个服务发现模块来维护不同服务实例的状态,并负责远程调用的资源调度。
GeeRegistry:GeeRPC 的注册中心
GeeRPC 实现的注册中心 GeeRegistry 是一个相对简单的注册中心,大体上它的功能就是保存当前可用的服务器列表,并通过心跳(Heartbeat)机制确认服务是否可用。为了实现上的简单,GeeRegistry 通过 HTTP 协议提供服务,且所有信息都通过 HTTP Header 承载。使用 HTTP 的 GET 方法可以获取所有可用的服务列表,使用 HTTP 的 POST 方法可以添加服务实例或发送心跳。
在深入源码之前我们进一步分析一下,在实现 Gee 时我们已经知道,为了使 GeeRegistry 可以通过 HTTP 协议提供服务,在 golang 的语境下,GeeRegistry 应该实现 HTTP 的 Handler 接口,这样才能够通过 http.Serve 处理 HTTP Request,而实现 Handler 接口,其实就是要为 GeeRegistry 构造好一个 ServeHTTP 方法,它的参数是 ResponseWriter 和 Request 指针。所以在我们的构想当中,GeeRegistry 应该是具备一个 ServeHTTP 方法的,当然也确实具备。在 ServeHTTP 方法当中,应该分上面提到的两种情况分别处理 GET 和 POST 两种方法。
现在让我们来深入源码,首先研究一下 GeeRegistry 结构及其工厂函数:
type GeeRegistry struct {timeout time.Durationmu sync.Mutexservers map[string]*ServerItem
}type ServerItem struct {Addr stringstart time.Time
}const (defaultPath = "/_geerpc_/registry"defaultTimeout = time.Minute * 5
)func New(timeout time.Duration) *GeeRegistry {return &GeeRegistry{servers: make(map[string]*ServerItem),timeout: timeout,}
}var DefaultGeeRegistry = New(defaultTimeout)
GeeRegistry 的成员包含一个 timeout,一个互斥锁和一个保存服务器信息的 map。timeout 成员的作用就是记录心跳的周期。互斥锁 mu 确保同一时间只有一个协程在对注册中心进行修改。servers 的 key 是 string,value 是 *ServerItem
,ServerItem 包含地址 Addr 和服务开启时间 start(用于计算当前服务是否过期)。默认的过期时间是 5 分钟。
我们刚才已经说过,GeeRegistry 是通过 HTTP 协议来工作的,因此它实际上没有对外暴露的接口,而应该实现 ServeHTTP 方法,并通过 http.Handle 进行注册。这样 GeeRegistry 就可以基于 HTTP 协议提供服务。具体的实现为:
func HandleHTTP() {DefaultGeeRegistry.HandleHTTP(defaultPath)
}func (r *GeeRegistry) HandleHTTP(registryPath string) {http.Handle(registryPath, r)log.Println("rpc registry path:", registryPath)
}func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {switch req.Method {case "GET": // 返回所有可用的服务列表, 通过自定义字段 X-Geerpc-Servers 承载w.Header().Set("X-Geerpc-Servers", strings.Join(r.aliveServers(), ","))case "POST": // 添加服务实例或发送心跳,通过自定义字段 X-Geerpc-Server 承载addr := req.Header.Get("X-Geerpc-Server")if addr == "" {w.WriteHeader(http.StatusInternalServerError)return}r.putServer(addr)default:w.WriteHeader(http.StatusMethodNotAllowed)}
}
当我们在 main 函数当中启动服务之前,需要先启动注册中心,直接调用 HandleHTTP 函数即可,它封装了对 DefaultGeeRegistry 的 HandleHTTP 方法的调用。http.Handle 将 handler 与给定的 registryPath 这个 URL 相绑定,通过指定的 HTTP Method 对 URL 进行访问即可调用对应的方法,而这个 handler 正是 GeeRegistry 本身。当客户端向 GeeRegistry 发起 Request 时,GeeRegistry 通过 ServeHTTP 当中提供的方法回发 Response,完成服务。
现在我们来研究一下 GeeRegistry 的 ServeHTTP 方法如何实现。正如我们刚才所提到的,GeeRegistry 通过 HTTP 协议提供服务,GET 请求将会得到当前可用的服务列表,而 POST 请求则更新了 GeeRegistry 的服务列表,此处的更新有两种情况,一种是新的服务器注册,另一种是现有的服务器发过来的一次心跳。
我们已经提到,GeeRegistry 通过 HTTP Header 承载所有有用的信息。
对于 GET 方法,GeeRegistry 设置一个 X-Geerpc-Servers
头部字段,并将该字段的值设置为当前存活的 Servers 列表。当 HTTPClient 通过 GET 方法请求 GeeRegistry 服务时,就可以通过得到的 Response 当中的 Header 得到当前 GeeRPC 服务端可用的 Servers 列表。
我们刚才已经提到,HTTPClient 可以用过发送一个 POST 方法的 Request 将服务器信息发送给 Registry。当 GeeRegistry 接收到 Method 为 POST 的 Request 时,不需要回发 Response,而是解析这个 Request 当中的 X-Geerpc-Server
字段,从中得到对应的 Server 信息,并通过 putServer 方法将 Server 加入到 GeeRegistry 当中。对于已经存在且存活的 Server,putServer 则会更新 Server 的 start 字段,即更新 Server 的开始时间。putServer 方法的实现如下:
func (r *GeeRegistry) putServer(addr string) {r.mu.Lock()defer r.mu.Unlock()s := r.servers[addr]if s == nil { // 如果当前服务没有注册, 那么将它加入到注册中心的服务列表当中r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}} else { // 如果当前服务已经注册, 那么更新它的注册时间s.start = time.Now()}
}
可以看到,Server 的地址 addr 实际上就是一个字符串。通过 Heartbeat 和 sendHeartbeat 两个方法实现 GeeRegistry 的心跳机制,Heartbeat 方法是 GeeRegistry 对外暴露的接口,一个新创建的 Server 实例可以通过 Heartbeat 方法将自己注册到 GeeRegistry 当中。Heartbeat 和 sendHeartbeat 方法的实现如下:
func Heartbeat(registry, addr string, duration time.Duration) {if duration == 0 {// make sure there is enough time to send heartbeat before the service is removed from registryduration = defaultTimeout - time.Duration(1)*time.Minute}var err errorerr = sendHeartbeat(registry, addr)go func() {t := time.NewTimer(duration)for err == nil {<-t.Cerr = sendHeartbeat(registry, addr)}}()
}func sendHeartbeat(registry, addr string) error {log.Println(addr, "send heart beat to registry", registry)httpClient := &http.Client{}req, _ := http.NewRequest("POST", registry, nil)req.Header.Set("X-Geerpc-Server", addr)if _, err := httpClient.Do(req); err != nil {log.Println("rpc server: heart beat err:", err)return err}return nil
}
sendHeartbeat 方法的实现很好理解,在这个方法中将会创建一个匿名的 HTTPClient,并向 GeeRegistry 提供服务的 URL 发送一个 Request,Request 使用 POST 方法,通知 GeeRegistry 需要改变 Server list 的状态。
Heartbeat 当中有一个可以深挖的细节,那就是以下片段:
// ... ... ...
err = sendHeartbeat(registry, addr)
go func() {
t := time.NewTimer(duration)
for err == nil {<-t.Cerr = sendHeartbeat(registry, addr)
}
}()
// ... ... ...
我们首先要明确 duration 字段的作用,它的值比 Server 的默认过期时间 defaultTimeout
少一分钟。这个代码片段的作用就是每隔 (defaultTimeout - 1) * time.Minute
的时间就向 GeeRegistry 发送一次 Heartbeat,避免服务器超时。在 goroutine 当中,for loop 的终止条件是 sendHeartbeat 出现错误,当服务器出错时,停止发送心跳。
我们已经研究了如何通过 POST 方法的 HTTP Request 向 GeeRegistry 注册一个 Server,并通过心跳机制确保 Server 存活。现在我们回顾一下在何时使用 GET 方法通过 HTTP Request 从 GeeRegistry 获取 Server list。实际上,GET 方法应该在服务发现模块当中使用,服务发现模块的作用就是保存若干个当前可用的服务实例,并在客户端请求 RPC 调用时通过内置的调度机制选择服务实例实现服务调用。
根据以上的分析,我们接下来展开聊一聊 GeeRPC 的服务发现模块。
GeeRegistryDiscovery:GeeRPC 的服务发现模块
实际上分析到这一步,我们不难看出在 GeeRPC 的设计中,注册中心 Registry 和服务发现模块 Discovery 是互补的。
Registry 的功能可以概括为:
- 基于心跳机制维护一个当前存活的服务器列表;
- 接收新的服务器注册;
- 处理来自发现模块的拉取服务器列表的请求(即发现模块向 GeeRegistry 发起 GET 方法的 HTTP Request,得到当前存活的服务器列表);
而 Discovery 的功能可以概括为:
- 维护一个服务器列表(可以是自动的也可以是手动的,自动维护就是通过向 GeeRegistry 发起请求来完成);
- 当来自 client 的一次 RPC 请求到来时,根据某个调度算法,确定最终由哪个服务器来执行本次请求。
GeeRPC 的服务发现模块 GeeRegistryDiscovery 的定义如下:
type GeeRegistryDiscovery struct {*MultiServersDiscoveryregistry stringtimeout time.DurationlastUpdate time.Time
}const defaultUpdateTimeout = time.Second * 10func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {if timeout == 0 {timeout = defaultUpdateTimeout}d := &GeeRegistryDiscovery{MultiServersDiscovery: NewMultiServerDiscovery(make([]string, 0)),registry: registerAddr,timeout: timeout,}return d
}
它内嵌了一个 MultiServersDiscovery,这是一个仅支持手动对服务列表进行维护的发现中心,嵌入它可以复用很多它的方法。
MultiServersDiscovery:基本的服务发现模块
MultiServersDiscovery 的定义和方法如下:
type SelectMode intconst (RandomSelect SelectMode = iota // select randomlyRoundRobinSelect // select using Robbin Algorithm
)type Discovery interface { // Discovery 是一个接口类型, 包含了服务发现所需要的最基本的接口Refresh() error // Refresh 从注册中心更新服务列表Update(servers []string) error // Update 手动更新服务列表Get(mode SelectMode) (string, error) // Get 根据负载均衡策略, 选择一个服务实例GetAll() ([]string, error) // GetAll 返回所有服务实例
}// MultiServersDiscovery is a discovery for multi servers without a registry center
// user provides the server addresses explicitly instead
// MultiServersDiscovery 是一个具体的服务发现实现, 用于管理多个服务器地址
type MultiServersDiscovery struct {r *rand.Rand // generate random numbermu sync.RWMutex // protect followingservers []stringindex int // record the selected position for robbin algorithm
}// NewMultiServerDiscovery creates a MultiServerDiscovery instance
// 构造函数, 用于创建 MultiServerDiscovery 实例
func NewMultiServerDiscovery(servers []string) *MultiServersDiscovery {d := &MultiServersDiscovery{servers: servers,r: rand.New(rand.NewSource(time.Now().UnixNano())),}d.index = d.r.Intn(math.MaxInt32 - 1)return d
}// 这行代码确保 MultiServersDiscovery 实现了 Discovery 接口
var _ Discovery = (*MultiServersDiscovery)(nil)// Refresh doesn't make sense for MultiServersDiscovery, so ignore it
func (d *MultiServersDiscovery) Refresh() error {return nil
}// Update the servers of discovery dynamically
// Update 方法用于手动更新服务器列表
func (d *MultiServersDiscovery) Update(servers []string) error {d.mu.Lock()defer d.mu.Unlock()d.servers = serversreturn nil
}// Get a server according to mode
// Get 方法根据指定的选择模式返回一个服务器地址
func (d *MultiServersDiscovery) Get(mode SelectMode) (string, error) {d.mu.Lock()defer d.mu.Unlock()n := len(d.servers)if n == 0 {return "", errors.New("rpc discovery: no available servers")}switch mode {case RandomSelect:return d.servers[d.r.Intn(n)], nilcase RoundRobinSelect:s := d.servers[d.index%n]d.index = (d.index + 1) % nreturn s, nildefault:return "", errors.New("rpc discovery: not supported select mode")}
}// GetAll returns all servers in discovery
// GetAll 方法返回所有服务器地址的副本
func (d *MultiServersDiscovery) GetAll() ([]string, error) {d.mu.RLock()defer d.mu.RUnlock()// returns a copy of d.serversservers := make([]string, len(d.servers), len(d.servers))copy(servers, d.servers)return servers, nil
}
MultiServersDiscovery 共有四个成员字段以及四个方法。四个字段包括:
- r:用于生成随机数的随机种子;
- mu:保护 servers slice 的读写操作;
- servers:保存 servers 列表;
- index:保存用于 robin 算法的 index;
四个方法包括:
- Refresh:从注册中心更新服务列表,但由于基础的服务发现模块没有使用注册中心,因此 Refresh 方法我们留到 GeeRegistryDiscovery 再实现;
- Update:手动更新服务列表;
- Get:根据负载均衡策略选择一个服务实例;
- Get All:返回所有服务实例。
每一个方法的实现都比较好理解,此处不再赘述。
回到 GeeRegistryDiscovery
在 MultiServersDiscovery 的基础上,GeeRegistryDiscovery 还添加了 registry、timeout 和 lastUpdate 三个字段。registry 记录了 GeeRegistry 提供服务的 URL 地址,timeout 和 lastUpdate 用于确保当前 Discovery 维护的服务器列表没有超时。
基于 MultiServersDiscovery,GeeRegistryDiscovery 重新实现了 Discovery 接口的四个方法:
// Update 用于手动维护 Discovery 当中的服务器地址
func (d *GeeRegistryDiscovery) Update(servers []string) error {d.mu.Lock()defer d.mu.Unlock()d.servers = serversd.lastUpdate = time.Now()return nil
}// Refresh 自动从注册中心 GeeRegistry 请求当前处于 alive 状态的服务器地址
func (d *GeeRegistryDiscovery) Refresh() error {d.mu.Lock()defer d.mu.Unlock()if d.lastUpdate.Add(d.timeout).After(time.Now()) {return nil}log.Println("rpc registry: refresh servers from registry", d.registry)resp, err := http.Get(d.registry)if err != nil {log.Println("rpc registry refresh err:", err)return err}servers := strings.Split(resp.Header.Get("X-Geerpc-Servers"), ",")d.servers = make([]string, 0, len(servers))for _, server := range servers {if strings.TrimSpace(server) != "" {d.servers = append(d.servers, strings.TrimSpace(server))}}d.lastUpdate = time.Now()return nil
}// Get 通过 SelectMode 调度服务实例, 可以看到此处服用了 MultiServersDiscovery 当中的方法
func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {if err := d.Refresh(); err != nil {return "", err}return d.MultiServersDiscovery.Get(mode)
}// GetAll 返回所有服务实例, 仍然复用 MultiServersDiscovery 当中的方法
func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {if err := d.Refresh(); err != nil {return nil, err}return d.MultiServersDiscovery.GetAll()
}
在使用 Get 或 GetAll 方法获取服务实例之前,GeeRegistryDiscovery 会调用 Refresh 方法更新当前所维护的服务列表。在 Refresh 当中,要做的就是从注册中心 GeeRegistry 自动获取当前可用的服务列表,通过发送 Method 为 GET 的 HTTP Request 来完成。
至此,我们完成了 GeeRPC 当中注册中心和服务发现模块的回顾,并搞清楚了这两个模块具备哪些功能。
Part4:高性能客户端
我们现在已经知道,在 GeeRPC 框架下,基于注册中心和服务发现模块,可以同时有多个服务实例为发起 RPC 调用的客户端提供服务。GeeRPC 在支持并发与异步的客户端 Client 的基础上,实现了一个支持负载均衡的客户端 XClient,它包括服务发现模块 d、负载均衡模式 mode 以及通信协议选项 opt,为了尽可能地复用已经创建的客户端实例,XClient 还使用 clients 这个 map 对已经创建的实例进行保存。
XClient 的结构实现如下:
type XClient struct {d Discoverymode SelectModeopt *geerpc.Optionmu sync.Mutexclients map[string]*geerpc.Client
}// 目的是为了确保 geerpc.Client 实现了 io.Closer 接口, 目前还需要实现 Close 方法
var _ io.Closer = (*XClient)(nil)// NewXClient 是构造函数, 用于创建 XClient 实例
func NewXClient(d Discovery, mode SelectMode, opt *geerpc.Option) *XClient {return &XClient{d: d, mode: mode, opt: opt, clients: make(map[string]*geerpc.Client)}
}
XClient 直接暴露给用户的两个方法是 Call 和 Broadcast,前者基于服务发现模块的调度机制选择单个服务实例执行 Client 发起的 RPC 请求,而后者向所有服务实例广播 RPC 请求。我们先后从这两个暴露在外的方法对客户端的实现进行剖析。
Call 方法:客户端基于负载均衡策略选择单个服务实例执行 RPC 调用
需要首先明确的一点是,在 XClient 底层执行客户端功能的其实是 Client,XClient 通过一个 map 保存 Client,进而复用底层 Client 的方法。
XClient 的 Call 方法的实现如下:
// Call invokes the named function, waits for it to complete,
// and returns its error status.
// xc will choose a proper server.
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {rpcAddr, err := xc.d.Get(xc.mode) // Get 即服务发现模块的 Get 方法, 根据 mode 选取一个 Serverif err != nil {return err}return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}
第一行 rpcAddr, err := xc.d.Get(xc.mode)
得到的 rpcAddr 是一个 string 类型,它保存的内容就是 RPC Server 的地址。通过进一步调用 xc.call(rpcAddr, ctx, serviceMethod, args, reply)
来执行具体的请求。
在剖析 call 方法之前,我们先来研究一下 Call 的形参。ctx 是一个 context.Context
类型的变量,它的作用是为客户端引入了超时控制;serviceMethod 即客户端想要调用的具体的服务和方法,它是 string 类型,一个例子是 foo.Sum
;args 和 reply 是空接口类型,对于空接口类型的形参而言,在传入实参时可以传入任意类型的值,因此空接口很适合用来接收 RPC 调用的参数及返回值。对于 foo.Sum
方法,它的参数是一个 Args 类型,包含 Num1 和 Num2 两个成员,即两个要加在一起的数,为了尽可能地简化问题,加数都是 int 类型。reply 是一个 int 类型的指针,用于保存整型数相加的结果。在 Call 当中进一步调用 call 方法,根据 rpcAddr 执行具体的 RPC 调用。
call 方法的实现如下:
// call 方法用于向指定的 RPC 服务器地址发起调用
func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {client, err := xc.dial(rpcAddr)if err != nil {return err}return client.Call(ctx, serviceMethod, args, reply)
}
它首先调用了 XClient 的 dial 方法,dial 方法的实现如下:
// dial 方法用于连接到指定的 RPC 服务器地址
func (xc *XClient) dial(rpcAddr string) (*geerpc.Client, error) {xc.mu.Lock()defer xc.mu.Unlock()client, ok := xc.clients[rpcAddr]if ok && !client.IsAvailable() {_ = client.Close()delete(xc.clients, rpcAddr)client = nil}if client == nil {var err errorclient, err = geerpc.XDial(rpcAddr, xc.opt)if err != nil {return nil, err}xc.clients[rpcAddr] = client}return client, nil
}
我们可以看到,dial 方法的形参是 rpcAddr,返回值是一个 Client 类型的指针和 error。我刚才已经提到,为了尽可能地复用已经创建好的 Client 实例,XClient 使用一个 map 类型对 *Client
进行保存,map 的 key 是 string,对应 rpcAddr,value 就是 *Client
。剖析 dial 方法的行为,如果 map 当中保存着 Client 实例,并且这个实例没有过期,那么直接返回这个实例;否则,dial 会通过 XDial 创建一个新的 Client,命名为 client,并将其保存在 map 当中。最后将 client 返回。
XDial 比较复杂,它的功能是基于 rpcAddr 的形式,建立一个与 RPC Server 的客户端连接。XDial 支持多种协议,比如 HTTP 协议、TCP 协议、Unix 协议等。鉴于其较为复杂,我们首先研究一下 XDial 创建客户端的过程,再回到基于 XClient 完成一个客户端调用的解读。
XDial:一个较为通用的 RPC 客户端连接函数,支持多种协议
XDial 的实现如下:
// XDial calls different functions to connect to a RPC server
// according the first parameter rpcAddr.
// rpcAddr is a general format (protocol@addr) to represent a rpc server
// eg, http@10.0.0.1:7001, tcp@10.0.0.1:9999, unix@/tmp/geerpc.sock
func XDial(rpcAddr string, opts ...*Option) (*Client, error) {// XDial 是一个通用的 RPC 客户端连接函数, 支持多种协议. 它的主要功能是根据 rpcAddr 的格式解析协议和地址,// 并调用相应的底层连接函数来建立与 RPC 服务器的连接.parts := strings.Split(rpcAddr, "@")if len(parts) != 2 {return nil, fmt.Errorf("rpc client err: wrong format '%s', expected protocol@addr", rpcAddr)}protocol, addr := parts[0], parts[1]switch protocol {case "http":return DialHTTP("tcp", addr, opts...)default:return Dial(protocol, addr, opts...)}
}
它首先根据 @
对 rpcAddr 进行了拆分,@
之前是客户端请求连接 RPC Server 的协议,而 @
之后是客户端请求连接的 RPC Server 的具体的地址。返回的就是创建的客户端实例以及错误。
XDial 将客户端的创建划分为两种情况,分别是使用 HTTP 协议的情况和使用其它协议的情况。我们来看一下具体的实现:
// DialHTTP 是一个便捷函数, 用于通过 HTTP 协议连接到 RPC 服务器, 它封装了底层的连接建立和协议切换逻辑
func DialHTTP(network, address string, opts ...*Option) (*Client, error) {return dialTimeout(NewHTTPClient, network, address, opts...)
}func Dial(network, address string, opts ...*Option) (*Client, error) {// Dial: 对外暴露的客户端连接函数// 返回: 客户端实例和错误return dialTimeout(NewClient, network, address, opts...)
}
不难看出,无论是 Dial 还是 DialHTTP,最终都通过 dialTimeout 返回一个 Client 实例,区别在于第一个入参不同,DialHTTP 使用的是 NewHTTPClient,Dial 使用的是 NewClient。NewClient 和 NewHTTPClient 都是工厂函数,说明在 dialTimeout 当中要使用创建 Client 的工厂函数创建客户端实例。dialTimeout 的实现如下,它的作用是为客户端的创建套了一层壳,从而为客户端建立的过程引入超时处理机制,当客户端的创建超时时,需要记录超时错误:
// clientResult 封装了客户端连接的结果
type clientResult struct {client *Client // 成功连接后返回的客户端实例err error // 连接过程中发生的错误
}// dialTimeout 实现了带超时的客户端连接逻辑
// f: 创建客户端实例的函数
// network: 网络类型 (比如 tcp)
// address: 服务器地址
// opts: 连接选项 (可变参数)
func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (client *Client, err error) {opt, err := parseOptions(opts...) // 对传入的 opts 进行解析, 如果为空则使用默认的 optif err != nil {return nil, err}conn, err := net.DialTimeout(network, address, opt.ConnectTimeout) // 库函数调用// 👆 基于 opt 当中的 ConnectTimeout 创建 connif err != nil {return nil, err}// close the connection if client is nildefer func() {if err != nil {_ = conn.Close()}}()ch := make(chan clientResult) // 创建一个 clientResult 类型的通道, 用于接收连接结果// IMPORTANT: 使用 channel 的目的就是结合 select 完成超时控制go func() { // 启动 goroutine 创建客户端client, err := f(conn, opt)ch <- clientResult{client: client, err: err}}()if opt.ConnectTimeout == 0 { // 如果 ConnectTimeout 为 0, 则等待 goroutine 完成并返回结果result := <-chreturn result.client, result.err}select { // 否则, 使用 select 监听超时和结果通道case <-time.After(opt.ConnectTimeout): // 如果超时, 返回超时错误return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)case result := <-ch: // 如果收到结果, 返回客户端实例和错误return result.client, result.err}
}
dialTimeout 的形参 f 有两种可能的情况,一种是 NewHTTPClient,另一种是 NewClient。
我们之前已经提到过,GeeRPC 的服务端是支持 HTTP 协议的,即 Server 实际上实现了 ServeHTTP 这个方法,即实现了 http.Handler 接口。在 Server 的 ServeHTTP 方法当中,Server 通过 Hijack 对 HTTP 协议进行拦截,并截获底层的 TCP 连接,基于 TCP 连接,Server 将 HTTP 协议切换为了 RPC 协议。客户端要做的就是正确地向 Server 发起 CONNECT 方法的 Request,用于建立 HTTP 连接。因此 NewHTTPClient 的实现为:
// NewHTTPClient 函数用于创建一个基于 HTTP 的 RPC 客户端. 它通过已经建立的 TCP 连接 (conn) 与服务器进行 HTTP 协议
// 的握手, 并在握手成功后切换到 RPC 协议
func NewHTTPClient(conn net.Conn, opt *Option) (*Client, error) {// 通过 io.WriteString 向服务器发送一个 HTTP CONNECT 请求, 请求路径为 defaultRPCPath// 请求格式: CONNECT /_geerpc_ HTTP/1.0// 这个请求是告诉服务器, 客户端希望通过 HTTP 协议建立连接, 并切换到 RPC 协议_, _ = io.WriteString(conn, fmt.Sprintf("CONNECT %s HTTP/1.0\n\n", defaultRPCPath))// Require successful HTTP response// before switching to RPC protocal./*使用 http.ReadResponse 从连接中读取服务器的 HTTP 响应.bufio.NewReader(conn) 将 conn 包装成一个带缓冲的读取器, 以便逐行读取 HTTP 响应http.Request{Method: "CONNECT"} 是一个虚拟的 HTTP 请求对象, 用于告诉 http.ReadResponse 这是一个 CONNECT 请求的响应*/resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})if err == nil && resp.Status == connected {// 连接成功, 此时调用 NewClient(conn, opt) 新建一个 TCP 客户端并返回return NewClient(conn, opt)}if err == nil {err = errors.New("unexpected HTTP response: " + resp.Status)}return nil, err
}
可以看到,如何 HTTP 连接成功,那么最终仍然要创建一个 TCP 客户端并返回,通过 NewClient 创建一个 TCP 客户端。
Client:底层的客户端实例
NewClient 将会创建一个 Client 实例,而 Client 承载着一个 RPC 框架中客户端所有的基础功能。总结来说,一个 Client 的作用包括发送和接收消息,并记录一次 RPC 调用的完成状态。
Client 的结构定义及其工厂函数 NewClient 的实现如下:
type Client struct {cc codec.Codecopt *Optionsending sync.Mutex // protect followingheader codec.Headermu sync.Mutex // protect followingseq uint64pending map[uint64]*Callclosing bool // user has called Closeshutdown bool // server has told us to stop
}var _ io.Closer = (*Client)(nil) // Client 需要实现 Close 方法已满足 io.Closer 接口的要求func NewClient(conn net.Conn, opt *Option) (*Client, error) {f := codec.NewCodecFuncMap[opt.CodecType]if f == nil {err := fmt.Errorf("invalid codec type %s", opt.CodecType)log.Println("rpc client: codec error:", err)return nil, err}// send options with serverif err := json.NewEncoder(conn).Encode(opt); err != nil {log.Println("rpc client: options error: ", err)_ = conn.Close()return nil, err}return newClientCodec(f(conn), opt), nil
}func newClientCodec(cc codec.Codec, opt *Option) *Client {client := &Client{seq: 1,cc: cc,opt: opt,pending: make(map[uint64]*Call),}go client.receive()return client
}
NewClient 当中的 newClientCodec 创建了一个 Client ,在 goroutine 中开启 Client 的 receive 方法,并返回 Client。
Call 实例:承载一次 RPC 调用
可以看到,Client 当中包含了一个名为 pending 的字段(pending 保存的是目前尚未完成的 RPC 调用),它的类型是 key 为 uint64,value 为 *Call
的 map。Call 承载着一次具体的 RPC 调用,一个 Client 可以承载多个 Call,即基于一个 RPC 客户端可以发起多次 RPC 调用。Call 的结构定义如下,它只有一个 done 方法,用于标记当前 RPC 调用已经完成:
// in client/client.go
type Call struct {Seq uint64ServiceMethod string // format "<service>.<method>"Args interface{} // arguments to the funcReply interface{} // reply from the functionError error // if error oocurs, it will be setDone chan *Call // Strobes with call is complete
}func (call *Call) done() {call.Done <- call
}
Call 结构的字段包括:
- Seq:用于标识当前调用的序列号;
- ServiceMethod:格式为
"<service>.<method>"
,用于保存当前调用的服务名和方法名; - Args:当前 RPC 调用传入的参数;
- Reply:保存当前 RPC 调用的返回值;
- Error:记录 RPC 调用可能出现的错误;
- Done:这个字段比较有意思,它是一个
*Call
类型的 channel,在 done 这个 Call 唯一的方法中被使用。当一次调用完成后,使用 done 可以将 call 自身发送给 Done 这个 channel,它的作用是控制 RPC 调用的并发与异步,我将在之后 Client 的方法 Go 和 Call 当中进行详细解读。
回到 Client,实现 Client 的接收功能 receive
剖析完 Call 之后,我们回到 Client。在 Client 的工厂函数当中,开启了一个执行 Client 的 receive 方法的 goroutine。receive 实现的正是客户端的接收功能,接收到的响应包括三种情况:
- call 不存在,可能的原因是 RPC 调用的请求发送不完整;
- call 存在,但服务端处理出错,故 call 的 Error 字段不为空;
- call 存在,且没有出错,即服务端正常处理,此时需要从 stream 当中读取 Reply。
根据上述分析,receive 及其相关方法的实现如下:
func (client *Client) removeCall(seq uint64) *Call {client.mu.Lock()defer client.mu.Unlock()call := client.pending[seq]delete(client.pending, seq) // 内置的 delete 的第一个参数是 map, 第二个参数是键, 可以删除 map 中的键值对return call
}func (client *Client) terminateCalls(err error) {client.sending.Lock()defer client.sending.Unlock()client.mu.Lock()defer client.mu.Unlock()client.shutdown = truefor _, call := range client.pending {call.Error = errcall.done()}
}func (client *Client) receive() {var err errorfor err == nil {var h codec.Headerif err = client.cc.ReadHeader(&h); err != nil {break}call := client.removeCall(h.Seq)switch {case call == nil:err = client.cc.ReadBody(nil)case h.Error != "":call.Error = fmt.Errorf(h.Error)err = client.cc.ReadBody(nil)call.done()default:err = client.cc.ReadBody(call.Reply)if err != nil {call.Error = errors.New("reading body " + err.Error())}call.done()}}// err != nil 的时候client.terminateCalls(err)
}
需要明确的一点是,client.cc 的 ReadHeader 和 ReadBody 方法都是从输入流 stream 当中进行读取,如果没有消息到来,将会阻塞。receive 接受的就是来自服务端执行一次 RPC 调用的响应,首先从 Header 的 Seq 字段当中可以获取到当前响应对应 Client 的哪次 RPC 调用请求,找到具体的 RPC 调用,即 Call。根据 Call 的情况,进行不同的处理。
如果 call 为 nil,原因是 Server 回发的 Response 中 Header 保存的 Seq 与当前处于 pending 当中的 Seq 对应不上,原因可能是请求在发送给 Server 之后被 Client 取消,但 Server 仍旧处理了这个请求并回发了 Response。
如果 Header 当中包含错误,说明 RPC 调用出错,此时需要记录错误并标记此次 call 完成。
如果 call 不为空且没有出错,那么正常对返回值进行读取并标记本次 call 完成即可。返回值将会保存在 call 的 Reply 字段当中,用于进一步的处理。
当有错误发生时,Client 的 receive 方法将停止 for loop,并终止所用当前处于 pending 的 Call,停止接收消息。
同步进度,回到 XDial 和 call 方法
现在让我们来同步一下进度。在最开始,我们从 XClient 暴露给用户的 Call 方法入手,Call 方法首先通过 SelectMode 得到 rpcAddr,再通过 call 方法建立一个与 rpcAddr 的客户端。在 call 方法当中,通过 dial 建立了 client(dial 又会进一步调用 XDial,XDial 通过 DialHTTP 或 Dial 建立 TCP Client,从而引出了底层的 Client 对象及 Call 对象),现在我们就可以通过 client 发起一次 RPC 远程调用:
// call 方法用于向指定的 RPC 服务器地址发起调用
func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {client, err := xc.dial(rpcAddr)if err != nil {return err}return client.Call(ctx, serviceMethod, args, reply)
}
具体来说,在 call 当中使用 client.Call(ctx, serviceMethod, args, reply)
,即直接是有 Client 类型的 Call 方法完成一次 RPC 调用,传入的参数包括 ctx、ServiceMethod、args 和 reply。
Client 的 Call 方法:发起 RPC 调用并通过 Context 引入超时机制
Client 的 Call 方法实现如下:
// Call 是 RPC 客户端实现的核心部分, 用于发起远程调用并支持上下文 (context.Context) 的超时和取消功能
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {call := client.Go(serviceMethod, args, reply, make(chan *Call, 1)) // 发起一个异步的远程调用select {// 监听上下文的取消或超时, 以及调用结果的返回case <-ctx.Done():client.removeCall(call.Seq)return errors.New("rpc client: call failed: " + ctx.Err().Error())case call := <-call.Done:return call.Error}
}
在 Call 当中,直接通过 Go 发起一个异步的远程调用。Go 的实现如下:
// Go invokes the functions asynchronously.
// It returns the Call structure representing the invocation.
func (client *Client) Go(ServiceMethod string, args, reply interface{}, done chan *Call) *Call {if done == nil {done = make(chan *Call, 10) // buffered channel} else if cap(done) == 0 {log.Panic("rpc client: done channel is unbuffered")}call := &Call{ServiceMethod: ServiceMethod,Args: args,Reply: reply,Done: done,}client.send(call)return call
}
在 Go 当中,构建了一个 Call 实例 call,刚才我们已经提到,它承载了一次 RPC 调用,构建完成后,通过 Client 的 send 方法将 call 实例发送给 Server。send 及其相关方法的实现如下:
func (client *Client) registerCall(call *Call) (uint64, error) {client.mu.Lock()defer client.mu.Unlock()if client.closing || client.shutdown {return 0, ErrShutdown}call.Seq = client.seqclient.pending[call.Seq] = callclient.seq++return call.Seq, nil
}func (client *Client) send(call *Call) {// make sure that the client will send a complete requestclient.sending.Lock()defer client.sending.Unlock()// register this callseq, err := client.registerCall(call)if err != nil {call.Error = errcall.done()return}// prepare request headerclient.header.ServiceMethod = call.ServiceMethodclient.header.Seq = seqclient.header.Error = ""// encode and send the requestif err = client.cc.Write(&client.header, call.Args); err != nil {call = client.removeCall(seq)if call != nil {call.Error = errcall.done()}}
}
send 方法不难理解,它所做的就是构建了一个 Request,Header 当中包含的是 ServiceMethod、Seq 以及空的 Error,Body 包含的就是本次 RPC 调用的参数,将其通过 TCP Connection 发送给 Server,再通过 receive 等待 Server 回发的 Response 即可。
回到 Client 的 Call 方法,Go 的返回值是一个 Call 实例 call,Call 方法在得到 call 之后,通过 channel + select 实现超时机制,在此处我们就可以看到之前所说的 Call 对象当中 Done 这个 channel 的作用:
select {
// 监听上下文的取消或超时, 以及调用结果的返回
case <-ctx.Done():client.removeCall(call.Seq)return errors.New("rpc client: call failed: " + ctx.Err().Error())
case call := <-call.Done:return call.Error
}
如果 ctx 的 Done 先到达,代表超时;而如果 call 的 Done 先到达,代表本次 RPC 调用完成,返回 call 当中的错误即可。
至此,我们完整过了一遍 GeeRPC 当中 XClient 对外暴露的 Call 方法的工作流程,接下来我们来研究一下另一个对外暴露的 Broadcast 接口。
Broadcast 方法:客户端向所有可以用的 Server 进行 RPC 调用广播
XClient 另一个对外暴露的接口是 Broadcast。Broadcast 向所有 Server 广播 RPC 调用请求,如果其中一个实例发生错误,则返回错误,如果其中一个实例调用成功,则返回调用的结果。
Broadcast 方法的实现如下:
// Broadcast invokes the named function for every server registered in discovery
// Broadcast 方法用于向所有服务器广播调用
func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply interface{}) error {servers, err := xc.d.GetAll() // 获得所有服务器地址if err != nil {return err}var wg sync.WaitGroupvar mu sync.Mutexvar e errorreplyDone := reply == nilctx, cancel := context.WithCancel(ctx)for _, rpcAddr := range servers {wg.Add(1)go func(rpcAddr string) {defer wg.Done()var clonedReply interface{}if reply != nil {clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()}err := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)mu.Lock()if err != nil && e == nil {e = errcancel()}if err == nil && !replyDone {reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())replyDone = true}mu.Unlock()}(rpcAddr)}wg.Wait()return e
}
可以看到 Broadcast 方法的实现非常地简洁清晰,由于我们刚才已经对 Call 方法的实现进行了完整的解读,所以此处的 Broadcast 方法的实现细节不再赘述,它本质上就是借助服务发现模块的 GetAll 方法获取所有当前可用的服务实例,对每一个服务实例进行遍历,在每一个 Server 上发起 RPC 调用之前,通过反射复制一份 Reply,使得每一个传入的 Reply 都是传址调用。
需要注意的两个点是:
- Broadcast 方法借助 context.WithCancel 确保有错误发生时,快速失败;
- Broadcast 方法借助 WaitGroup 和 Mutex 进行并发控制,WaitGroup 用于追踪每一个承载 RPC 调用的 goroutine,Mutex 保护资源调用。
至此,我们完成了对 GeeRPC 高性能客户端的回顾。
Part5:从 main 函数出发完整地体验一次 GeeRPC 的使用
接下来到了实践环节,我们借助 Geektutu 在 Day7 给出的 Demo,完整地体验一次 GeeRPC 框架。
完整的项目目录如下:
main.go
的实现如下:
package mainimport ("Geektutu/GeeRPC/geerpc""Geektutu/GeeRPC/geerpc/registry""Geektutu/GeeRPC/geerpc/xclient""context""log""net""net/http""sync""time"
)type Foo inttype Args struct{ Num1, Num2 int }func (f Foo) Sum(args Args, reply *int) error {*reply = args.Num1 + args.Num2return nil
}func (f Foo) Sleep(args Args, reply *int) error {time.Sleep(time.Second * time.Duration(args.Num1))*reply = args.Num1 + args.Num2return nil
}func startRegistry(wg *sync.WaitGroup) {l, _ := net.Listen("tcp", ":9999")registry.HandleHTTP()wg.Done()_ = http.Serve(l, nil)
}func startServer(registryAddr string, wg *sync.WaitGroup) {var foo Fool, _ := net.Listen("tcp", ":0")server := geerpc.NewServer()_ = server.Register(&foo)registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)wg.Done()server.Accept(l)
}func foo(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, args *Args) {var reply intvar err errorswitch typ {case "call":err = xc.Call(ctx, serviceMethod, args, &reply)case "broadcast":err = xc.Broadcast(ctx, serviceMethod, args, &reply)}if err != nil {log.Printf("%s %s error: %v", typ, serviceMethod, err)} else {log.Printf("%s %s success: %d + %d = %d", typ, serviceMethod, args.Num1, args.Num2, reply)}
}func call(registry string) {d := xclient.NewGeeRegistryDiscovery(registry, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer func() { _ = xc.Close() }()// send request & receive responsevar wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()foo(xc, context.Background(), "call", "Foo.Sum", &Args{Num1: i, Num2: i * i})}(i)}wg.Wait()
}func broadcast(registry string) {d := xclient.NewGeeRegistryDiscovery(registry, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer func() { _ = xc.Close() }()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()foo(xc, context.Background(), "broadcast", "Foo.Sum", &Args{Num1: i, Num2: i * i})// expect 2 - 5 timeoutctx, _ := context.WithTimeout(context.Background(), time.Second*2)foo(xc, ctx, "broadcast", "Foo.Sleep", &Args{Num1: i, Num2: i * i})}(i)}wg.Wait()
}func main() {log.SetFlags(0)registryAddr := "http://localhost:9999/_geerpc_/registry"var wg sync.WaitGroupwg.Add(1)go startRegistry(&wg)wg.Wait()time.Sleep(time.Second)wg.Add(2)go startServer(registryAddr, &wg)go startServer(registryAddr, &wg)wg.Wait()time.Sleep(time.Second)call(registryAddr)broadcast(registryAddr)
}
我们依然从 main 函数体出发。
首先,通过指定 registryAddr := "http://localhost:9999/_geerpc_/registry"
,确定了 GeeRPC 的 GeeRegistry 的服务地址。
之后,通过开启一个 startRegistry 的 goroutine 启动 GeeRegistry。由于 GeeRegistry 通过 HTTP 提供服务,因此需要将 GeeRegistry 的启动放在一个 goroutine 当中,这样 http.Serve
方法才会并发地监听请求。
然后。通过 startServer 开启两个 RPC Server,startServer 的实现如下:
func startServer(registryAddr string, wg *sync.WaitGroup) {var foo Fool, _ := net.Listen("tcp", ":0")server := geerpc.NewServer()_ = server.Register(&foo)registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)wg.Done()server.Accept(l)
}
在 net.Listen
方法中,如果指定端口号为 0,那么系统将随机分配一个可用的端口号,作为 Server 提供服务的端口。通过 Registry 将名为 Foo 的服务注册到 Server 当中,Foo 提供两种方法,分别是 Sum 和 Sleep。注册服务之后,通过 Heartbeat 向 GeeRegistry 发送心跳,将 Server 注册到 Registry 当中。最后通过 Server 的 Accept 方法接收 Listener,开启 goroutine 在指定的端口进行服务监听。
最后,客户端的服务调用封装在了 call 和 broadcast 函数当中。我们仅以 call 为例,对 call 进行剖析,broadcast 与 call 的工作机制基本相同。call 的实现如下:
func call(registry string) {d := xclient.NewGeeRegistryDiscovery(registry, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer func() { _ = xc.Close() }()// send request & receive responsevar wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()foo(xc, context.Background(), "call", "Foo.Sum", &Args{Num1: i, Num2: i * i})}(i)}wg.Wait()
}
在 call 中,我们要做的是使用 XClient 对外暴露的 Call 方法实现一次 RPC 调用。因此 call 函数的前两行依次创建了服务发现模块并使用服务发现模块创建 XClient 对象。之后通过 foo 函数进行远程调用,foo 的实现如下:
func foo(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, args *Args) {var reply intvar err errorswitch typ {case "call":err = xc.Call(ctx, serviceMethod, args, &reply)case "broadcast":err = xc.Broadcast(ctx, serviceMethod, args, &reply)}if err != nil {log.Printf("%s %s error: %v", typ, serviceMethod, err)} else {log.Printf("%s %s success: %d + %d = %d", typ, serviceMethod, args.Num1, args.Num2, reply)}
}
如果 switch case 为 call,那么 foo 函数将调用 XClient 的 Call 方法进行远程调用。
现在让我们在 Demo 的基础上,构建一个更加复杂的服务,并注册到服务中心。
我们首先定义一个名为 YGGP 的服务,并为其注册名为 LengthOfLongestSubstring 的方法,它对应的是 LeetCode 3. 无重复字符的最长子串的解决方案。同时定义 YGGPArgs 保存输入的参数。
type YGGP struct{}type YGGPArgs struct {Str string
}func (yggp YGGP) LengthOfLongestSubstring(args YGGPArgs, reply *int) error {s := args.Strvar slow, fast, length intmp := make(map[byte]int)slow, fast, length = 0, 0, len(s)for fast < length {mp[s[fast]]++for mp[s[fast]] > 1 && slow < fast {mp[s[slow]]--slow++}fast++*reply = max(*reply, fast-slow)}return nil
}
之后我们新建一个服务实例并注册 YGGP 服务:
func startYGGPServer(registryAddr string, wg *sync.WaitGroup) {var yggp YGGPl, _ := net.Listen("tcp", ":0")server := geerpc.NewServer()_ = server.Register(&yggp)registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)wg.Done()server.Accept(l)
}
然后我们新建一个 YGGP 的 XClient,进行 LengthOfLongestSubstring 方法的 RPC 调用:
func yggp(xc *xclient.XClient, ctx context.Context, typ, serviceMethod string, args *YGGPArgs) {var reply intvar err errorswitch typ {case "call":err = xc.Call(ctx, serviceMethod, args, &reply)case "broadcast":err = xc.Broadcast(ctx, serviceMethod, args, &reply)}if err != nil {log.Printf("%s %s error: %v", typ, serviceMethod, err)} else {log.Printf("%s %s success: LengthOfLongestSubstring(%s) is %d", typ, serviceMethod, args.Str, reply)}
}func yggpcall(registry string) {TestStr := []string{"abcabcbb", "abcabcbbabcabcbb", "bbbbb", "pwwkew", "pwwkewpwwkew"}d := xclient.NewGeeRegistryDiscovery(registry, 0)xc := xclient.NewXClient(d, xclient.RandomSelect, nil)defer func() { _ = xc.Close() }()var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {defer wg.Done()yggp(xc, context.Background(), "call", "YGGP.LengthOfLongestSubstring", &YGGPArgs{Str: TestStr[i],})}(i)}wg.Wait()
}
最后我们在 main 函数中使用我们刚才注册的方法:
func main() {log.SetFlags(0)registryAddr := "http://localhost:9999/_geerpc_/registry"var wg sync.WaitGroupwg.Add(1)go startRegistry(&wg)wg.Wait()time.Sleep(time.Second)wg.Add(1)//go startServer(registryAddr, &wg)//go startServer(registryAddr, &wg)go startYGGPServer(registryAddr, &wg)wg.Wait()time.Sleep(time.Second)yggpcall(registryAddr)//call(registryAddr)//broadcast(registryAddr)
}
运行结果如下:
rpc registry path: /_geerpc_/registry
rpc server: register YGGP.LengthOfLongestSubstring
tcp@[::]:53762 send heart beat to registry http://localhost:9999/_geerpc_/registry
rpc registry: refresh servers from registry http://localhost:9999/_geerpc_/registry
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(abcabcbbabcabcbb) is 3
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(abcabcbb) is 3
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(pwwkewpwwkew) is 4
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(bbbbb) is 1
call YGGP.LengthOfLongestSubstring success: LengthOfLongestSubstring(pwwkew) is 3
可以看到,我们注册的方法可以得到正确的结果,实验成功。
至此,我前前后后花了三天的实践,完整地总结了 GeeRPC 项目。
相关文章:
【GeeRPC】项目总结:使用 Golang 实现 RPC 框架
文章目录 项目总结:使用 Golang 实现 RPC 框架谈谈 RPC 框架什么是 RPC 框架实现一个 RPC 框架需要什么?项目总结文章结构安排 Part1:消息编码编解码器的实现通信过程 Part2:服务端Accept:阻塞地等待连接请求并开启 go…...
人工智能在医疗影像诊断中的应用与挑战
引言 近年来,人工智能(AI)技术在医疗领域的应用逐渐成为研究热点,尤其是在医疗影像诊断方面。AI技术的引入为医疗影像诊断带来了更高的效率和准确性,有望缓解医疗资源紧张的问题,同时为患者提供更优质的医疗…...
烧结银技术赋能新能源汽车超级快充与高效驱动
烧结银技术赋能新能源汽车超级快充与高效驱动 在新能源汽车领域,高压快充技术的突破与高功率密度驱动系统的创新正成为行业竞争的焦点。比亚迪于 2025 年发布的超级 e 平台,通过整合全域千伏高压架构、兆瓦级闪充技术及碳化硅(SiC࿰…...
大模型幻觉产生的【九大原因】
知识问答推理幻觉产生的原因 1.知识库结构切割不合理 大段落切割向量化 切分太小可以实现更精准化的回复内,向量匹配相似度越高。检索内容碎片化严重、可能包含不符合内容的文本数据。切分太大内容资料更完整,但是会影响相似度,同时更消耗资…...
4小时速通shell外加100例
🔥 Shell 基础——从入门到精通 🚀 🌱 第一章:Shell,简单说! 👶 什么是Shell?它到底能做什么?这章让你快速了解Shell的强大之处! 👶 什么是Shell…...
AD(Altium Designer)更换PCB文件的器件封装
一、确定是否拥有想换的器件PCB封装 1.1 打开现有的原理图 1.2 确定是否拥有想换的器件PCB文件 1.2.1 如果有 按照1.3进行切换器件PCB封装 1.2.2 如果没有 按照如下链接进行添加 AD(Altium Designer)已有封装库的基础上添加器件封装-CSDN博客https://blog.csdn.net/XU15…...
Postgresql 删除数据库报错
1、删除数据库时,报错存在其他会话连接 ## 错误现象,存在其他的会话连接正在使用数据库 ERROR: database "cs" is being accessed by other users DETAIL: There is 1 other session using the database.2、解决方法 ## 终止被删除数据库下…...
人工智能时代——深度探索如何构建开放可控的专利生态体系
# 人工智能时代——深度探索如何构建开放可控的专利生态体系 引言:AI专利革命的战略抉择第一章 战略认知与基本原则1.1 人工智能专利革命的范式重构1.1.1 技术维度变革1.1.2 法律维度挑战1.1.3 文明安全的不可控风险 1.2 战略定位体系构建1.2.1 双循环治理框架的立体…...
✨【数据变形术:联合体在通信协议中的降维打击】✨
(万字长文详解联合体的二进制魔法与工程实践) 🔮 原理解析:内存空间的量子叠加态 文字叙述: 联合体(union)是C语言中最具魔法的数据结构,其所有成员共享同一块内存空间。这种特性使…...
docker compose部署minio报错
背景 部分服务使用docker-compose单节点编排,其中对象存储服务使用minio,在minio中配置了aksk后报错 Error: IAM sub-system is partially initialized, unable to write the IAM forma 解决 minio如果配置了aksk等iam类的配置则需要持久化存储到etcd…...
软件开发通用之状态机初认识-基本概念及简单应用
0 前言 在程序开发阶段(其实也不限于程序,还包含硬件电路设计,协议设计等),无论使用何种语言,何种工具,何种系统,程序的运行必须符合开发者的预设逻辑,而单独通过大脑记…...
蓝桥杯 之 第27场月赛总结
文章目录 习题1.抓猪拿国一2.蓝桥字符3.蓝桥大使4.拳头对决 习题 比赛地址 1.抓猪拿国一 十分简单的签到题 print(sum(list(range(17))))2.蓝桥字符 常见的字符匹配的问题,是一个二维dp的问题,转化为对应的动态规划求解 力扣的相似题目 可以关注灵神…...
适配器模式 (Adapter Pattern)
适配器模式 (Adapter Pattern) 是一种结构型设计模式,它将一个类的接口转换成客户希望的另外一个接口,使得原本由于接口不兼容而不能一起工作的类可以一起工作。 在现实生活中,适配器的例子随处可见,比如电源适配器,它将不同电压的电流转换为设备所需的电压,确保设备能正…...
操作系统WIN11无法出现WLAN图标(解决方案)
本人操作系统WIN11之后无网络图标 于是在设置里查看了一下,是网卡驱动没了 网上去下载一个驱动类软件自行处理即可。 本人使用手机USB网络连的电脑,然后发现网卡驱动凭空出现了,就很困惑,没有下载驱动就恢复了。...
HCL—我与虚拟机的爱恨情仇[特殊字符][特殊字符][特殊字符]️
时隔了三周,我可能算是了解了虚拟机了吧。自从上一次的安装虚拟机,我与HCL、虚拟机就没有停止过纠缠。 为什么很多win11电脑使用不了HCL,或者无法启动HCL设备? 首先来解答,为什么很多win11电脑使用不了HCL,…...
illustrate:一款蛋白/核酸结构快速渲染为“卡通风格”的小工具
本期向大家介绍一款蛋白/核酸结构快速渲染(卡通风格)的小工具——illustrate。放心!本期完全不涉及代码,不折腾人,请放心食用。 结构渲染效果示例如下: PDB ID: 1ttt 该小工具适用绘制蛋白或复合物整体轮廓…...
Linux上位机开发实战(能用的开发板计算资源)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 大家所能想到的嵌入式上位机开发,如果是linux,同时涉及到嵌入式的话,一般都会认为是把pc linux的软件port到板子…...
kotlin 内联函数 inline
高阶函数实现的原理:函数类型其实是生成了一个对象 。 inline翻译成中文的意思就是内联,在kotlin里面inline被用来修饰函数,表明当前函数在编译时是以内嵌的形式进行编译的,从而减少了一层函数调用栈: inline fun fun…...
vue3配置代理实现axios请求本地接口返回PG库数据【前后端实操】
前端编写 安装 axios 如果当前未安装axios,可以执行如下指令安装 npm install axios配置代理 当前为基于Vite构建的项目,在 vite.config.ts 中配置代理,在defineConfig中新增server配置,主要关注两个点: 一、需要代…...
论文阅读:2023 arxiv Multiscale Positive-Unlabeled Detection of AI-Generated Texts
总目录 大模型安全相关研究:https://blog.csdn.net/WhiffeYF/article/details/142132328 Multiscale Positive-Unlabeled Detection of AI-Generated Texts https://arxiv.org/abs/2305.18149 https://www.doubao.com/chat/2114270649152258 https://github.com/YuchuanTi…...
【数学建模】最大最小值模型详解
数学建模中的最大最小值模型详解 文章目录 数学建模中的最大最小值模型详解引言最大最小值模型的基本概念最大化问题最小化问题 常见的求解方法1. 微积分法2. 线性规划3. 非线性规划4. 动态规划 实际应用案例案例1:生产规划问题案例2:投资组合优化 最大最…...
Camera2 实现重力感应四个方向调试相机预览
Camera2API 实现重力感应四个方向调试相机预览 文章目录 需求场景 需求实现setAspectRatio 设置显示长宽postScale postRotate 设置缩放和旋转manager.openCamera 打开相机startPreviewgetPreviewRequestBuilder 设置预览参数:createCaptureSession 预览准备工作set…...
C++::多态
目录 一.多态的概念 二.多态的定义及实现 二.1多态的构成条件 二.2虚函数 1.虚函数的写法 2.虚函数的重写/覆盖 3.协变 二.3析构函数的重写 二.4override和final关键字 编辑二.5重载/重写/隐藏的对比 三.多态的运行原理(一部分) 四.多态的常…...
278.缀点成线
1232. 缀点成线 - 力扣(LeetCode) class Solution {public boolean checkStraightLine(int[][] coordinates) {if(coordinates.length2){return true;}int xcoordinates[1][0]-coordinates[0][0];int ycoordinates[1][1]-coordinates[0][1];for(int i1;i…...
xssgame第8关注入详解
1.SVG利用实现xss攻击 1.代码如下: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>tes…...
《数据库原理》SQLServer期末复习_题型+考点
目录 题型: 一. 概况分析题(5小题,每小题2分,共10分) 二. 计算题(3小题,每小题5分,共15分) 三. 数据库设计(2小题,每小题10分,共2…...
RK3588开发笔记-RTL8852wifi6模块驱动编译报错解决
目录 前言 一、问题背景 二、驱动编译 总结 前言 在基于 RK3588 进行开发,使用 RTL8852 WiFi6 模块时,遇到了一个让人头疼的驱动编译报错问题:“VFs_internal_I_am_really_a_filesystem_and_am_NoT_a_driver, but does”。经过一番摸索和尝试,最终成功解决了这个问题,在…...
机器学习算法实战——天气数据分析(主页有源码)
✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ 1. 引言 天气数据分析是气象学和数据科学交叉领域的一个重要研究方向。随着大数据技术的发展,气象数据的采集、存储和分…...
java项目之基于ssm的毕业论文管理系统(源码+文档)
项目简介 毕业论文管理系统实现了以下功能: 本毕业论文管理系统主要实现的功能模块包括学生模块、导师模块和管理员模块三大部分,具体功能分析如下: (1)导师功能模块:导师注册登录后主要功能模块包括个人…...
【Vue3入门1】02- vue3的基本操作(上)
本文介绍vue3中的一些方法的操作。 目录 1. 绑定事件 v-on 2. 按键修饰符 3. 显示和隐藏 v-show 4. 条件渲染 v-if 5. 条件渲染if-else 1. 绑定事件 v-on 点击事件 v-on:click" 发生事件 " <body><div id"app">{{ msg }} <h2&g…...
Redis集群搭建和高可用方案(Java实现)
Redis集群搭建和高可用方案(Java实现) 我将详细介绍如何使用Java技术搭建Redis集群并实现高可用方案。 1. Redis集群架构概述 Redis集群可以通过以下几种方式实现: 主从复制Sentinel哨兵模式Redis Cluster集群模式2. 使用Java实现Redis集群连接 2.1 使用Jedis客户端 Je…...
【大模型算法工程】大模型应用工具化、忠诚度以及知识库场景下PDF双栏解析问题的讨论
1. 大模型时代应用工具化以及无忠诚度现象讨论 接触大模型久了,也慢慢探到一些大模型能力表现非常自然和突出的场景,比如AI搜索(依赖大模型的理解总结能力)、AI对话(即chat,依赖大模型的生成能力࿰…...
Rust语言学习
Rust语言学习 通用编程概念所有权所有权引用和借用slice struct(结构体)定义并实例化一个结构体使用结构体方法语法 枚举 enums定义枚举match控制流运算符if let 简单控制流 使用包、Crate和模块管理不断增长的项目(模块系统)包和crate定义模块来控制作用…...
AI比人脑更强,因为被植入思维模型【16】反脆弱
毛选中就有言,不经历困难,我们就不会掌握战胜困难的方法。 这个世界纷繁复杂,不是强者总是运气好,而是他们能够失败后快速复原,不断找到战胜困难的方法。 定义 马斯洛需求层次模型是一种将人类需求从低到高按层次进…...
系统架构设计知识体系总结
1.技术选型 1.什么是技术选型? 技术选型是指评估和选择在项目或系统开发中使用的最合适的技术和工具的过程。这涉及考虑基于其能力、特性、与项目需求的兼容性、可扩展性、性能、维护和其他因素的各种可用选项。技术选型的目标是确定与项目目标相符合、能够有效解…...
计算机视觉的多模态模型
计算机视觉的多模态模型 是指能够同时处理和理解 多种类型数据(模态) 的模型。这些模态可以包括图像、文本、音频、视频、深度信息等。多模态模型的核心目标是利用不同模态之间的互补信息,提升模型的性能和泛化能力。 1. 多模态模型的核心思想…...
Scrapy 入门教程
Scrapy 入门教程 Scrapy 是一个用于爬取网站数据的 Python 框架,功能强大且易于扩展。本文将介绍 Scrapy 的基本概念、安装方法、使用示例,并展示如何编写一个基本的爬虫。 1. 什么是 Scrapy? Scrapy 是一个开源的、用于爬取网站数据的框架…...
Oracle OCP认证是否值得考?
Oracle OCP(Oracle Certified Professional)认证是数据库领域的传统权威认证,但随着云数据库和开源技术的崛起,其价值正面临分化。是否值得考取,需结合你的职业定位、行业需求及长期规划综合判断。以下是关键分析&…...
OpenCV中距离公式
一、各类距离公式总结 常见距离公式 欧氏距离: 曼哈顿距离(L1): 切比雪夫距离(Chessboard): 1、点与点距离(欧氏距离) 二维空间 设两点坐标为 P1(x1,y1)、P2(x2,y2),其距离…...
DeepSeek自学手册:《从理论(模型训练)到实践(模型应用)》|73页|附PPT下载方法
导 读INTRODUCTION 今天分享是由ai呀蔡蔡团队带来的DeepSeek自学手册:《从理论(模型训练)到实践(模型应用)》,这是一篇关于DeepSeek模型训练、应用场景及替代方案的综合指南文章,主要介绍了Deep…...
Doris官网上没有的一些Fe参数了,都在源码中
一、FE配置源码 apache-doris-src\fe\fe-common\src\main\java\org\apache\doris\common\Config.java 二、BE配置源码 apache-doris-src\be\src\common\config.cpp 三、FE源码 package org.apache.doris.common;public class Config extends ConfigBase {ConfField(descri…...
(一)丶Windows安装RabbitMQ可能会遇到的问题
一丶可能会忘了配置ERLang的环境变量 二丶执行命令时报错 第一步 rabbitmq-plugins enable rabbitmq_management 第二部 rabbitmqctl status 三丶修改.erlang.cookie 文件 1.找到C盘目下的.erlang.cookie文件 C:\Users\admin\.erlang.cookie C:\Windows\System32\config\sys…...
stm32g030移植RT-Thread
移植流程 移植前需要安装Keil.STM32G0xx_DFP.1.2.0.pack组件,大致的移植过程: CubeMX配置RT-Thread组件配置工程模板配置 参考例程配置:拷贝仓库原有的stm32g070-st-nucleo工程,然后另起一个名字,目录结构如下 完整…...
Parsing error: Unexpected token, expected “,“
今天在使用Trae AI 编程工具开发大文件切片上传功能,使用的是VUE3,TS技术栈,开发完成运行时,编译报错(Parsing error: Unexpected token, expected ","),让AI自行修复此问题多次后还是没有解决&a…...
Day23: 数组中数字出现的次数
整数数组 sockets 记录了一个袜子礼盒的颜色分布情况,其中 sockets[i] 表示该袜子的颜色编号。礼盒中除了一款撞色搭配的袜子,每种颜色的袜子均有两只。请设计一个程序,在时间复杂度 O(n),空间复杂度O(1) 内找到这双撞色搭配袜子的…...
目标检测——清洗数据
清洗VOC格式数据集代码示例 import os import xml.etree.ElementTree as ETdef process_annotations(image_folder, annotation_folder):# 遍历标签文件夹中的所有XML文件for xml_file in os.listdir(annotation_folder):if not xml_file.endswith(.xml):continuexml_path os…...
嵌入式基础知识学习:UART是什么?
UART(Universal Asynchronous Receiver/Transmitter,通用异步收发传输器)是一种广泛应用于嵌入式系统和通信设备的异步串行通信协议。它通过两根数据线(TX和RX)实现设备间的全双工数据传输,无需共享时钟信号…...
SpringBoot项目实战(初级)
目录 一、数据库搭建 二、代码开发 1.pom.xml 2.thymeleaf模块处理的配置类 3.application配置文件 4.配置(在启动类中) 5.编写数据层 ②编写dao层 ③编写service层 接口 实现类 注意 补充(注入的3个注解) 1.AutoWir…...
合成层优化
以下是关于 合成层(Composite Layer)优化 的系统梳理,涵盖基础原理、触发条件、优化策略及进阶实践,帮助深入理解如何通过分层渲染提升页面性能: 一、合成层基础概念 1. 什么是合成层? 定义:浏览器将页面元素提升为独立的图形层(Graphics Layer),由 GPU 单独处理,避…...
什么是MCP|工作原理是什么|怎么使用MCP|图解MCP
写在前面 Manus的爆火似乎推动了MCP的出圈,虽然Manus没有用MCP。这篇文章我们就讲讲MCP,当然我也是最近才学习到MCP的,如果理解有误的地方,欢迎评论区指出! 1. 为什么需要MCP? 1.1 LLM 现状 我们都知道…...