Rpc 中间件

分布式系统的 CAP 理论

  • 一致性(Consistency):所有节点访问同一份最新的数据副本
  • 可用性(Availability):非故障的节点在合理的时间内返回合理的响应(不是错误或者超时的响应)
  • 分区容错性(Partition Tolerance):分布式系统出现网络分区的时候,仍然能够对外提供服务。

分布式保证服务高并发稳定性

  • 缓存:缓存是一种提高数据读取性能的技术,通过在内存中存储经常访问的数据,可以减少对数据库或者其他存储系统的访问,从而提高系统的响应速度。缓存可以应用在多个层次,例如浏览器缓存、CDN 缓存、反向代理缓存、应用缓存等。
  • 降级:降级是在系统压力过大或者部分服务不可用的情况下,暂时关闭一些非核心的服务,以保证核心服务的正常运行。降级可以在多个层次进行,例如页面降级、功能降级、服务降级等。
  • 限流:限流是一种控制系统处理请求的速率的技术,以防止系统过载。限流可以通过多种算法实现,例如固定窗口、滑动窗口、漏桶算法、令牌桶算法等。

Nacos 配置中心 注册中心

RPC 框架

泛指调用远程的函数的方法,对编解码和网络层有特殊的优化,从而有更高的效率。

RPC 框架主要由 Server、Client、Server Stub、Client Stub 组件组成

  • Client Stub 主要是将客户端的请求参数、请求服务地址、请求服务名称做一个封装,并发送给 Server Stub
  • Server Stub 主要用于接收 Client Stub 发送的数据并解析,去调用 Server 端的本地方法

层次结构:代理曾、注册中心层(服务发现、注册、管理)、

RPC 协议定制化程度高,可以采用体积更小的 Protobuf 或其他序列化协议去保存结构体数据,同时也不需要像 HTTP 那样考虑各种浏览器行为,如 302 重定向跳转。因此性能也会更好一些。其底层不仅可以通过 tcp udp 实现,也可以通过 http 实现。

RPC 协议会建个连接池,在请求量大的时候,建立多条连接放在池内,要发数据的时候就从池里取一条连接出来,用完放回去,下次再复用,可以说非常环保。

目前,对外一般用 HTTP 协议,而内部集群的微服务之间则采用 RPC 协议进行通讯。

Dubbo 上线问题

记录、学习、汇总实习和技术博客中看到的 Dubbo 实战遇到的问题

服务端接口不指定通讯协议导致 OOM

问题背景:

问题描述:

  • 开发人员使用方没有配置接口的 protocol 字段,而是写在接口实现类上

  • dubbo-3.0.4 框架:

    • 若没有指定 protocol 字段,会创建服务端提供的所有协议的 invoker
    • EasyREST 包createClientInvoker创建 REST 协议的 invoker 时需要 HttpMethod 参数
    • 由于注解写在实现类而不是接口上面,导致消费端无法共享相应的 REST 配置信息,即 HttpMethod 为 null,因此 RESTClient 创建失败
    • 在服务更新过程中,客户端每次都是全量更新注册中心的服务端信息,这个过程中如果 invoker 一直注册失败,会不断将失败的Client存到List<ResteasyClient>中,如果有 n 台机器需要依次更新,这时客户端的数组中会存 n*n 个失败的连接实例,从而导致 OOM

解决方法(针对框架,开发人员需要牢记 Dubbo 配置必须在接口上):

  • ResteasyClient 对象由 RestProtocol 协议对象持有,针对没有创建成功 invoker 的场景应把其对应的 ResteasyClient 销毁

销毁方法

  • Map<String, ResteasyClient>代替 List<ResteasyClient>,同一个服务端只存一次
  • WeakHashMap<String, ResteasyClient>自动回收不用的 ResteasyClient(参考pr

参考

父子线程并发 RPC 调用死锁

问题描述:

  • 直接通过 Executors 构造 固定数量+无限长等待队列 线程池,该线程池在共享的无界队列上操作的固定数量的线程。在任何时候,最多nThreads线程将处于活动处理任务状态。如果在所有线程都处于活动状态时提交额外的任务,它们将在队列中等待,直到有线程可用。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 并发调用 Rpc 接口获取返回值
Future<String> future = executor.submit(()->{
return 数据查询任务;
})
  • 数据查询任务中存在父子调用关系,数据查询任务同步调用 Redis 查询缓存和 Rpc 调用微服务获得数据。如果本地 Redis 缓存中能获得数据,就直接返回缓存值并取消 Rpc 调用 future.cancel(true)

解决办法

  • 父子任务分开并发执行,耗时的异步线程应做好线程池隔离
  • CompletableFuture.anyOf(task1,tsak2) 优先取先完成的那个任务的返回值

参考

尝试开发基础版 Rpc 中间件

服务发现流程:使用远程服务的时候首先需要配置一个 dubbo.xml 文件或者在使用的类加上@Reference,二者都是用来对dubbo消费者引用服务进行一些配置,然后应用在启动的时候会将配置信息转化为一个ReferenceBean对象,并调用createProxy方法创建一个远程服务接口的代理对象。

代理对象的 invoke 创建:启动时主动订阅注册中心,会显示地调用一次notify接口,这个接口会尝试将远程服务注册的url转换成一个本地的invoker

  • 转换过程中会根据具体的 protocol 创建对应的 invoker,如 Rest 和 Dubbo 需要解析不同的参数。

服务注册地址发生改变时:会通知给消费者,dubbo 协议里面明确讲到服务发生改变会全量更新所有客户端的缓存

Netty 数据传输

在网络通信中,序列化和编码通常是结合使用的。序列化将对象转换为字节流,编码将字节流转换为网络传输所需的格式(如按照协议规定的格式进行编码)。在接收端,解码将网络传输的数据解析为原始的格式(如解码成对象),以便进行进一步的处理和使用。

Netty 是一个高性能事件驱动型非阻塞 IO 框架

  • 提供了 ByteBuf 容器,更方便地处理数据。ByteBuf 在重写编解码器的 encoder/decoder 方法中作为参数传入。
  • channel

编码:先序列化,再压缩,最后写进 ByteBuf 对象中

解码:从 ByteBuf 对象中读,然后转成我们需要的对象

代理模式

静态代理

静态代理中,我们对目标对象的每个方法的增强都是手动完成的(后面会具体演示代码),非常不灵活(比如接口一旦新增加方法,目标对象和代理对象都要进行修改)且麻烦(需要对每个目标类都单独写一个代理类)。 实际应用场景非常非常少,日常开发几乎看不到使用静态代理的场景。

从 JVM 层面来说, 静态代理在编译时就将接口、实现类、代理类这些都变成了一个个实际的 class 文件。

静态代理的步骤

  • 定义一个接口及其实现类;
  • 创建一个代理类同样实现这个接口
  • 将目标对象注入进代理类,然后在代理类的对应方法调用目标类中的对应方法。这样的话,我们就可以通过代理类屏蔽对目标对象的访问,并且可以在目标方法执行前后做一些自己想做的事情。

JDK 动态代理

  • 定义一个接口及其实现类;
  • 自定义 InvocationHandler 并重写 invoke 方法,在 invoke 方法中我们会调用原生方法(被代理类的方法)并自定义一些处理逻辑;
  • 通过 Proxy.newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h) 方法创建代理对象;
public interface InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
}
public class RpcClientProxy implements InvocationHandler {
// 在代理类内,根据传入的类 clazz,获得代理对象
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
// 当通过代理类调用的所有方法,实际上调用的是这个invoke(传入的只是服务接口,具体的实现在服务端实现类中)
public Object invoke(Object proxy, Method method, Object[] args) {...}
}

那么是如何调用服务端实现类的代码的呢?注入

  1. 在实例化 bean 之后,postProcessAfterInitialization 方法被调用。
  2. 对于每个 bean 类的字段,检查是否标记了 @RpcReference 注解。
  3. 如果发现标记了 @RpcReference 注解的字段,根据注解中的信息创建相应的 RpcServiceConfig 对象。
  4. 通过 RpcClientProxy 创建了一个代理对象 clientProxy,这个代理对象实现了 declaredField.getType() 所表示的接口,这个接口可能就是 HelloService 接口或其它接口,取决于 helloService 字段的类型。
  5. 这个代理对象 clientProxy 会在方法调用时委托给 RpcClientProxy 的 invoke 方法,该方法负责处理远程调用的逻辑。
  6. 通过反射设置 clientProxy 到 HelloController 类的 helloService 字段上,替换了原来的字段值。
Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
declaredField.setAccessible(true);
try {
declaredField.set(bean, clientProxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}

传输协议

*   0     1     2     3     4        5     6     7     8         9        10     11     12   13   14 15 16
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+-------+-----+---+-----+----+
* | magic code |version | full length | messageType| codec|compress| RequestId |
* +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* | body |
* +-------------------------------------------------------------------------------------------------------+
* 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型)
* 1B compress(压缩类型) 1B codec(序列化类型) 4B requestId(请求的Id)

注册中心

服务注册:在 zk 里面创建一个对应的持久节点

当我们的服务被注册进 zookeeper 的时候,我们将完整的服务名称 rpcServiceName (class name+group+version)作为根节点 ,子节点是对应的服务地址(ip+端口号)。相关代码在 ZkServiceProviderImpl.publishService() 中。

如果我们要获得某个服务对应的地址的话,就直接根据完整的服务名称来获取到其下的所有子节点,然后通过具体的负载均衡策略取出一个就可以了。相关代码在 ZkServiceDiscoveryImpl.lookupService() 中。

Zookeeper 客户端

Curator 是 Netflix 公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。项目内采用 CuratorUtils 类调用 Curator。

重试策略:ExponentialBackoffRetry是指数退避重试策略

// zk的对外接口,后面我们要干什么只需要通过单例模式获得这个类,然后用这个类的方法
public interface ServiceProvider {
// 添加到本地 serviceMap 中(远程Rpc调用,获取本地的调用类)
void addService(RpcServiceConfig rpcServiceConfig);
Object getService(String rpcServiceName);
// 注册服务到 zookeeper 中(服务发现与注册)
void publishService(RpcServiceConfig rpcServiceConfig);
}

负载均衡

随机选取策略轮询策略加权轮询策略最少活跃连接策略一致性 Hash 策略

序列化

  • 序列化:将数据结构或对象转化为二进制字节流
  • 反序列化:将在序列化过程中生成的二进制字节流转化为数据结构或对象

数据传输过程中,可能会出现粘包和半包问题,你是如何解决的?

自定义消息结构(其他的还有固定长度传输、特殊字符分割):MagicNumber 魔数、ContentLength 请求长度