背景
RPC框架注重性能,目前RPC框架使用的是
Vert.x
的io.vertx.core.http.HttpServer
作为服务提供者服务器。优点是代码简单;缺点是底层网络传输使用的是HTTP协议,HTTP协议头部信息和请求响应格式较重,影响网络传输性能。(HTTP协议会有大量请求和响应标头)
基于上述问题,可以自定义一套RPC协议,可以利用TCP等传输层协议,自定义请求响应格式,实现性能更高、更灵活、更安全的RPC协议。
网络传输
网络传输:选择一个高性能通信的网络协议和传输方式。
HTTP协议协议头比较大,并且HTTP本身属于无状态协议,每个HTTP请求都是独立的,每次请求/响应都需要重新建立和关闭链接,影响性能
。
为了解决上述问题,HTTP/1.1引入持久连接(Keep-Alive),允许单个TCP连接上发送多个HTTP请求和响应,避免了每次请求都要重新建立和关闭连接的开销。
HTTP是应用层协议,性能不如传输层TCP协议性能高。因此选择TCP协议完成网络传输。
消息结构
消息结构:用最少的空间传输需要的信息。
自定义消息结构时,为了节省空间,需要使用轻量的类型,如byte字节类型,只需要1个字节(8个bit位)。
消息结构
:
除此之外,还需要
- 请求id:唯一标识某个请求,TCP双向通信,标识成对出现的请求与响应对;
- 请求体:发送的消息内容;
- 请求体数据长度:保证能够完整地获取请求体内容信息。
请求头总长17字节。本质上就是一个字节数组。
代码实现
消息结构
协议消息结构
/*** 协议消息结构*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage<T> {/*** 消息头*/private Header header;/*** 消息体(请求或响应对象)*/private T body;/*** 协议消息头*/@Datapublic static class Header {/*** 魔数,保证安全性*/private byte magic;/*** 版本号*/private byte version;/*** 序列化器*/private byte serializer;/*** 消息类型(请求 / 响应)*/private byte type;/*** 状态*/private byte status;/*** 请求 id*/private long requestId;/*** 消息体长度*/private int bodyLength;}}
各类常量与枚举
/*** 协议常量*/
public interface ProtocolConstant {/*** 消息头长度*/int MESSAGE_HEADER_LENGTH = 17;/*** 协议魔数*/byte PROTOCOL_MAGIC = 0x1;/*** 协议版本号*/byte PROTOCOL_VERSION = 0x1;
}
/*** 协议消息的状态枚举*/
@Getter
public enum ProtocolMessageStatusEnum {OK("ok", 20),BAD_REQUEST("badRequest", 40),BAD_RESPONSE("badResponse", 50);private final String text;private final int value;ProtocolMessageStatusEnum(String text, int value) {this.text = text;this.value = value;}/*** 根据 value 获取枚举** @param value* @return*/public static ProtocolMessageStatusEnum getEnumByValue(int value) {for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()) {if (anEnum.value == value) {return anEnum;}}return null;}
}
/*** 协议消息的类型枚举*/
@Getter
public enum ProtocolMessageTypeEnum {REQUEST(0),RESPONSE(1),HEART_BEAT(2),OTHERS(3);private final int key;ProtocolMessageTypeEnum(int key) {this.key = key;}/*** 根据 key 获取枚举** @param key* @return*/public static ProtocolMessageTypeEnum getEnumByKey(int key) {for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()) {if (anEnum.key == key) {return anEnum;}}return null;}
}
package com.starlink.codec.protocol;import cn.hutool.core.util.ObjectUtil;
import lombok.Getter;import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;/*** 协议消息的序列化器枚举*/
@Getter
public enum ProtocolMessageSerializerEnum {JDK(0, "jdk"),JSON(1, "json"),KRYO(2, "kryo"),HESSIAN(3, "hessian");private final int key;private final String value;ProtocolMessageSerializerEnum(int key, String value) {this.key = key;this.value = value;}/*** 获取值列表** @return*/public static List<String> getValues() {return Arrays.stream(values()).map(item -> item.value).collect(Collectors.toList());}/*** 根据 key 获取枚举** @param key* @return*/public static ProtocolMessageSerializerEnum getEnumByKey(int key) {for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {if (anEnum.key == key) {return anEnum;}}return null;}/*** 根据 value 获取枚举** @param value* @return*/public static ProtocolMessageSerializerEnum getEnumByValue(String value) {if (ObjectUtil.isEmpty(value)) {return null;}for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {if (anEnum.value.equals(value)) {return anEnum;}}return null;}
}
TCP网络传输
1)TCP服务器实现
vertx.createNetServer()
public class VertxTcpServer implements HttpServer {private byte[] handleRequest(byte[] requestData) {return "Hello, client!".getBytes();}@Overridepublic void doStart(int port) {// 创建 Vert.x 实例Vertx vertx = Vertx.vertx();// 创建 TCP 服务器NetServer server = vertx.createNetServer();// 处理请求server.connectHandler(socket -> {// 处理连接socket.handler(buffer -> {// 处理接收到的字节数组byte[] requestData = buffer.getBytes();// 在这里进行自定义的字节数组处理逻辑,比如解析请求、调用服务、构造响应等byte[] responseData = handleRequest(requestData);// 发送响应socket.write(Buffer.buffer(responseData));});});// 启动 TCP 服务器并监听指定端口server.listen(port, result -> {if (result.succeeded()) {System.out.println("TCP server started on port " + port);} else {System.err.println("Failed to start TCP server: " + result.cause());}});}public static void main(String[] args) {new VertxTcpServer().doStart(8888);}
}
socket.write向连接的客户端发送数据,数据格式为Buffer是Vert.x提供的字节数据缓冲实现。
2)TCP客户端实现
vertx.createNetClient()
public class VertxTcpClient {public void start() {// 创建 Vert.x 实例Vertx vertx = Vertx.vertx();vertx.createNetClient().connect(8888, "localhost", result -> {if (result.succeeded()) {System.out.println("Connected to TCP server");io.vertx.core.net.NetSocket socket = result.result();// 发送数据socket.write("Hello, server!");// 接收响应socket.handler(buffer -> {System.out.println("Received response from server: " + buffer.toString());});} else {System.err.println("Failed to connect to TCP server");}});}public static void main(String[] args) {new VertxTcpClient().start();}
}
3)编码 / 解码器
Vert.x的TCP服务器收发消息都是Buffer类型,需要编解码器实现Java对象与Buffer进行转换。
交互逻辑如下:
编码器:依次向Buffer缓冲区写入消息对象属性
。
public class ProtocolMessageEncoder {/*** 编码*/public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {if (protocolMessage == null || protocolMessage.getHeader() == null) {return Buffer.buffer();}ProtocolMessage.Header header = protocolMessage.getHeader();// 依次向缓冲区写入字节Buffer buffer = Buffer.buffer();buffer.appendByte(header.getMagic());buffer.appendByte(header.getVersion());buffer.appendByte(header.getSerializer());buffer.appendByte(header.getType());buffer.appendByte(header.getStatus());buffer.appendLong(header.getRequestId());// 获取序列化器ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());if (serializerEnum == null) {throw new RuntimeException("序列化协议不存在");}Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());// 写入 body 长度和数据buffer.appendInt(bodyBytes.length);buffer.appendBytes(bodyBytes);return buffer;}
}
解码器:依次从Buffer缓冲区指定位置读取数据,构造完整消息对象
。
/*** 协议消息解码器*/
public class ProtocolMessageDecoder {/*** 解码*/public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {// 分别从指定位置读出 BufferProtocolMessage.Header header = new ProtocolMessage.Header();byte magic = buffer.getByte(0);// 校验魔数if (magic != ProtocolConstant.PROTOCOL_MAGIC) {throw new RuntimeException("消息 magic 非法");}header.setMagic(magic);header.setVersion(buffer.getByte(1));header.setSerializer(buffer.getByte(2));header.setType(buffer.getByte(3));header.setStatus(buffer.getByte(4));header.setRequestId(buffer.getLong(5));header.setBodyLength(buffer.getInt(13));// 解决粘包问题,只读指定长度的数据byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());// 解析消息体ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());if (serializerEnum == null) {throw new RuntimeException("序列化消息的协议不存在");}Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());if (messageTypeEnum == null) {throw new RuntimeException("序列化消息的类型不存在");}switch (messageTypeEnum) {case REQUEST:RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);return new ProtocolMessage<>(header, request);case RESPONSE:RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);return new ProtocolMessage<>(header, response);case HEART_BEAT:case OTHERS:default:throw new RuntimeException("暂不支持该消息类型");}}}
4)请求处理器(服务提供者)
解码器解码请求,反射调用方法封装返回结果。(每一个都当完整帧处理)
public class TcpServerHandler implements Handler<NetSocket> {@Overridepublic void handle(NetSocket netSocket) {// 处理连接netSocket.handler(buffer -> {// 接受请求,解码ProtocolMessage<RpcRequest> protocolMessage;try {protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);} catch (IOException e) {throw new RuntimeException("协议消息解码错误");}RpcRequest rpcRequest = protocolMessage.getBody();// 处理请求// 构造响应结果对象RpcResponse rpcResponse = new RpcResponse();try {// 获取要调用的服务实现类,通过反射调用Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());// 封装返回结果rpcResponse.setData(result);rpcResponse.setDataType(method.getReturnType());rpcResponse.setMessage("ok");} catch (Exception e) {e.printStackTrace();rpcResponse.setMessage(e.getMessage());rpcResponse.setException(e);}// 发送响应,编码ProtocolMessage.Header header = protocolMessage.getHeader();header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);try {Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);netSocket.write(encode);} catch (IOException e) {throw new RuntimeException("协议消息编码错误");}});}
}
5)发送请求(服务消费者)
调整服务消费者发送请求代码,改HTTP请求为TCP请求。(修改服务代理类)
/*** 服务代理(JDK 动态代理)*/
@Slf4j
public class ServiceProxy implements InvocationHandler {/*** 调用代理*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args) {// 构造请求String serviceName = method.getDeclaringClass().getName();RpcRequest rpcRequest = RpcRequest.builder().serviceName(serviceName).methodName(method.getName()).parameterTypes(method.getParameterTypes()).args(args).build();try {// 从注册中心获取服务提供者请求地址RpcConfig rpcConfig = RpcApplication.getRpcConfig();Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();serviceMetaInfo.setServiceName(serviceName);serviceMetaInfo.setServiceVersion(RpcConstants.DEFAULT_SERVICE_VERSION);List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());if (CollUtil.isEmpty(serviceMetaInfoList)) {throw new RuntimeException("暂无服务地址");}// 负载均衡策略ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);// 发送 TCP 请求Vertx vertx = Vertx.vertx();NetClient netClient = vertx.createNetClient();CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();netClient.connect(selectedServiceMetaInfo.getServicePort(), selectedServiceMetaInfo.getServiceHost(),result -> {if (result.succeeded()) {log.info("Connected to TCP server");io.vertx.core.net.NetSocket socket = result.result();// 构造消息,发送数据ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();ProtocolMessage.Header header = new ProtocolMessage.Header();header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);header.setVersion(ProtocolConstant.PROTOCOL_VERSION);header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());header.setRequestId(IdUtil.getSnowflakeNextId());protocolMessage.setHeader(header);protocolMessage.setBody(rpcRequest);// 编码请求try {Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);socket.write(encodeBuffer);} catch (IOException e) {throw new RuntimeException("协议消息编码错误");}// 接收响应socket.handler(buffer -> {try {ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);responseFuture.complete(rpcResponseProtocolMessage.getBody());} catch (IOException e) {throw new RuntimeException("协议消息解码错误");}});} else {log.error("Failed to connect to TCP server");}});RpcResponse rpcResponse = responseFuture.get();// 关闭连接netClient.close();return rpcResponse.getData();} catch (Exception e) {log.error("请求发送失败.");}return null;}
}
为了方便得到响应结果,CompletableFuture
转异步为同步。
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(xxx,result -> {// 完成了响应responseFuture.complete(rpcResponseProtocolMessage.getBody());});
);
// 阻塞,直到响应完成,才会继续向下执行
RpcResponse rpcResponse = responseFuture.get();
Vert.x解决半包和粘包
Vert.x框架中,RecordParser
可以根据指定的分隔符或者数据长度来解析数据流,确保每次获取完整的数据包。
具体实现思路如下:
解析完整帧
解析完整帧,再处理业务逻辑。
/*** TCP 消息处理器包装* 装饰者模式,使用 recordParser 对原有的 buffer 处理能力进行增强*/
public class TcpBufferWrapperHandler implements Handler<Buffer> {/*** 解析器,用于解决半包、粘包问题*/private final RecordParser recordParser;public TcpBufferWrapperHandler(Handler<Buffer> bufferHandler) {recordParser = initRecordParser(bufferHandler);}@Overridepublic void handle(Buffer buffer) {// 向解析器提供数据recordParser.handle(buffer);}/*** 初始化解析器*/private RecordParser initRecordParser(Handler<Buffer> bufferHandler) {// 构造 parserRecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);parser.setOutput(new Handler<Buffer>() {// 初始化int bodyLength = -1;// 一个完整消息帧Buffer messageBuffer = Buffer.buffer();@Overridepublic void handle(Buffer buffer) {// 1. 每次循环,首先读取消息头if (-1 == bodyLength) {// 读取消息体长度bodyLength = buffer.getInt(13);parser.fixedSizeMode(bodyLength);// 写入头信息到结果messageBuffer.appendBuffer(buffer);} else {// 2. 然后读取消息体// 写入体信息到结果messageBuffer.appendBuffer(buffer);// 已拼接为完整 Buffer,执行处理bufferHandler.handle(messageBuffer);// 重置一轮parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);bodyLength = -1;messageBuffer = Buffer.buffer();}}});return parser;}
}
请求处理器改造
public class TcpServerHandler implements Handler<NetSocket> {@Overridepublic void handle(NetSocket netSocket) {// 处理完整帧后再处理业务逻辑netSocket.handler(new TcpBufferWrapperHandler(buffer -> {// 接受请求,解码ProtocolMessage<RpcRequest> protocolMessage;try {protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);} catch (IOException e) {throw new RuntimeException("协议消息解码错误");}RpcRequest rpcRequest = protocolMessage.getBody();// 处理请求// 构造响应结果对象RpcResponse rpcResponse = new RpcResponse();try {// 获取要调用的服务实现类,通过反射调用Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());// 封装返回结果rpcResponse.setData(result);rpcResponse.setDataType(method.getReturnType());rpcResponse.setMessage("ok");} catch (Exception e) {e.printStackTrace();rpcResponse.setMessage(e.getMessage());rpcResponse.setException(e);}// 发送响应,编码ProtocolMessage.Header header = protocolMessage.getHeader();header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);try {Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);netSocket.write(encode);} catch (IOException e) {throw new RuntimeException("协议消息编码错误");}}));}
}
通信协议SPI
通过配置和SPI动态切换通信协议,主要作用是服务消费者发送请求以及服务提供者处理逻辑协议区分。
Vetrx服务端
public interface VertxServer {void doStart(int port);
}
/*** Vertx HTTP 服务器*/
public class VertxHttpServer implements VertxServer {private static final Logger logger = LoggerFactory.getLogger(VertxHttpServer.class);/*** 启动服务器** @param port 端口*/public void doStart(int port) {// 创建 Vert.x 实例Vertx vertx = Vertx.vertx();// 创建 HTTP 服务器io.vertx.core.http.HttpServer server = vertx.createHttpServer();// 处理请求server.requestHandler(new HttpServerHandler());// 启动 HTTP 服务器并监听指定端口server.listen(port, result -> {if (result.succeeded()) {logger.info("Server is now listening on port " + port);} else {logger.error("Failed to start server: " + result.cause());}});}
}
public class VertxTcpServer implements VertxServer {@Overridepublic void doStart(int port) {// 创建 Vert.x 实例Vertx vertx = Vertx.vertx();// 创建 TCP 服务器NetServer server = vertx.createNetServer();// 处理请求server.connectHandler(new TcpServerHandler());// 启动 TCP 服务器并监听指定端口server.listen(port, result -> {if (result.succeeded()) {System.out.println("TCP server started on port " + port);} else {System.err.println("Failed to start TCP server: " + result.cause());}});}
}
public class VertxServerFactory {static {SpiLoader.load(VertxServer.class);}public static VertxServer getInstance(String key) {return SpiLoader.getInstance(VertxServer.class, key);}
}
服务提供者启动服务器时从VertxServerFactory加载特定服务器:
public class ServiceStarter {public static void main(String[] args) {RpcApplication.init();RpcConfig rpcConfig = RpcApplication.getRpcConfig();Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());// 注册服务String serviceName = HelloService.class.getName();LocalRegistry.register(serviceName, HelloServiceImpl.class);ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());serviceMetaInfo.setServicePort(rpcConfig.getServerPort());serviceMetaInfo.setServiceName(serviceName);serviceMetaInfo.setServiceVersion(rpcConfig.getVersion());try {registry.register(serviceMetaInfo);} catch (Exception e) {throw new RuntimeException(e);}// 启动服务器VertxServer server = VertxServerFactory.getInstance(rpcConfig.getProtocol());server.(rpcConfig.getServerPort());}
}
Vetrx客户端
/*** 客户端发送请求*/
public interface VertxClient {RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws Exception;
}
public class VertxHttpClient implements VertxClient {@Overridepublic RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws Exception {RpcConfig rpcConfig = RpcApplication.getRpcConfig();// 指定序列化器final Serializer serializer = SerializerFactory.getInstance(rpcConfig.getSerializer());// 序列化byte[] bodyBytes = serializer.serialize(rpcRequest);HttpResponse httpResponse = HttpRequest.post(serviceMetaInfo.getServiceAddress()).body(bodyBytes).execute();byte[] result = httpResponse.bodyBytes();// 反序列化return serializer.deserialize(result, RpcResponse.class);}
}
@Slf4j
public class VertxTcpClient implements VertxClient {@Overridepublic RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws Exception {// 发送 TCP 请求Vertx vertx = Vertx.vertx();NetClient netClient = vertx.createNetClient();CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),result -> {if (result.succeeded()) {log.info("Connected to TCP server");io.vertx.core.net.NetSocket socket = result.result();// 构造消息,发送数据ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();ProtocolMessage.Header header = new ProtocolMessage.Header();header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);header.setVersion(ProtocolConstant.PROTOCOL_VERSION);header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());header.setRequestId(IdUtil.getSnowflakeNextId());protocolMessage.setHeader(header);protocolMessage.setBody(rpcRequest);// 编码请求try {Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);socket.write(encodeBuffer);} catch (IOException e) {throw new RuntimeException("协议消息编码错误");}// 接收响应socket.handler(buffer -> {try {ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);responseFuture.complete(rpcResponseProtocolMessage.getBody());} catch (IOException e) {throw new RuntimeException("协议消息解码错误");}});} else {log.error("Failed to connect to TCP server");}});RpcResponse rpcResponse = responseFuture.get();// 关闭连接netClient.close();return rpcResponse;}
}
public class VertxClientFactory {static {SpiLoader.load(VertxClient.class);}public static VertxClient getInstance(String key) {return SpiLoader.getInstance(VertxClient.class, key);}
}
服务代理从VertxClientFactory获取对应客户端实现并发送请求,获取响应:
public Object invoke(Object proxy, Method method, Object[] args) {// 构造请求String serviceName = method.getDeclaringClass().getName();RpcRequest rpcRequest = RpcRequest.builder().serviceName(serviceName).methodName(method.getName()).parameterTypes(method.getParameterTypes()).args(args).build();try {// 从注册中心获取服务提供者请求地址RpcConfig rpcConfig = RpcApplication.getRpcConfig();Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();serviceMetaInfo.setServiceName(serviceName);serviceMetaInfo.setServiceVersion(RpcConstants.DEFAULT_SERVICE_VERSION);List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());if (CollUtil.isEmpty(serviceMetaInfoList)) {throw new RuntimeException("暂无服务地址");}// 负载均衡策略ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);RpcResponse rpcResponse = VertxClientFactory.getInstance(rpcConfig.getProtocol()).doRequest(rpcRequest, selectedServiceMetaInfo);return rpcResponse.getData();} catch (Exception e) {log.error("请求发送失败.");}return null;}
最后再配置VertxServer、VertxClient两个接口的SPI配置文件,key都一样,分别为http、tcp。
SPI文件