涤生

2 分钟阅读

本文是《用 RSocket 解决响应式服务之间的通讯》微型系列的第二篇文章,它将帮助你熟悉 RSocket——一种可能会彻底改变机器之间通讯的新二进制协议。在以下段落中,我们将讨论在云环境中的负载平衡问题以及介绍可恢复性能力,可恢复性能力有助于解决网络问题,尤其是在 IOT 系统中。

如果您不了解 RSocket 基础知识,请查看该系列之前的文章。

高可用性和负载平衡是企业级系统的重要能力

类似银行和保险等许多业务领域,可用性和可靠性是应用程序的核心能力。在这些要求苛刻的行业中,即使在高流量、网络问题导致延迟增加或自然灾害期间,服务也必须 24*7 全天候提供服务。为了确保该软件始终能够提供用户使用,通常会在多个可用区域中对其进行冗余部署。

在这种情况下,每个微服务的至少两个实例部署在至少两个可用性区域中。多个可用区内冗余部署让系统具有弹性且增加了容量(更多个实例能够处理更高的负载)。那么这样会带来什么问题呢?冗余部署引入了额外的复杂性。作为研发人员,我们必须确保进入的的流量分布在所有可用实例中(如果流量打到不可用的实例上,会导致请求失败)。

有两种主要的方法可解决这个问题:服务端负载平衡和客户端负载平衡

第一种方法(服务端负载平衡)基于以下假设,请求者不知道响应者的 IP 地址。取而代之的是,请求者与负载均衡器进行通信,负载均衡器负责在后端的微服务之间分配进入的请求,类似产品有 Ngnix。这种设计在云时代很容易采用。 IaaS 提供商通常具有内置的可靠解决方案,例如 Amazon Web Services 中提供的 Elastic Load Balancer。此外,这种设计有助于开发比普通轮询式负载均衡更复杂的路由策略(例如,自适应负载平衡或链式故障转移)。该服务端负载平衡的主要缺点是必须配置和部署额外的资源,如果我们的系统包含数百个微服务,这可能会很痛苦。此外,它可能会影响延迟-每个请求在负载均衡器上会增加了网络一跳。

第二种方法(客户端负载平衡)与第一种方法有些相反。这里请求者知道给定微服务的每个实例的 IP 地址,而不是通过连接到响应者的中心点来进行负载均衡。有了这些微服务的实例 IP 地址,客户端就可以自己选择响应者实例,然后向其发送请求。该方法好处是不需要任何额外资源,但是必须确保请求者具有响应者所有实例的 IP 地址(请参阅如何使用服务发现模式来处理它)。客户端负载平衡模式的主要优点是其性能(可以减少一个额外的“网络跃点”),进而可以显着减少延迟。这也是 RSocket 实现客户端负载平衡模式的关键原因之一。

Load Balance

RSocket 中的客户端负载平衡

在代码实现方面,RSocket 中客户端负载平衡的实现非常简单。该机制依赖于LoadBalancedRSocketMono对象,根据该对象会从一组可用的 RSocket 实例中选择合适的 RSocket 实例。要访问 RSocket,我们必须订阅LoadBalancedRSocketMonoonNext该方法接收所有 RSocket 实例。而且,它为每个 RSocket 计算统计信息,以便能够估计每个实例的负载,并在特定时间点根据该负载选择性能最佳的实例。

该算法考虑了多个参数,例如延迟,保持的连接数以及未处理的请求数。每个 RSocket 的运行状况都由可用性参数反映出来,该参数的值从零到一,其中零表示给定实例无法处理任何请求,而一则表示分配的请求完全处理完成的 RSocket。下面的代码片段显示了 RSocket 负载平衡 的最基本示例,该示例连接到响应器的三个不同实例并执行 100 个请求。每次它从LoadBalancedRSocketMono对象获取 RSocket 。

@Slf4j
public class LoadBalancedClient {
    static final int[] PORTS = new int[]{7000, 7001, 7002};
    public static void main(String[] args) {
        List rsocketSuppliers = Arrays.stream(PORTS)
                .mapToObj(port -> new RSocketSupplier(() -> RSocketFactory.connect()
                        .transport(TcpClientTransport.create(HOST, port))
                        .start()))
                .collect(Collectors.toList());
        LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono.create((Publisher>) s -> {
            s.onNext(rsocketSuppliers);
            s.onComplete();
        });
        Flux.range(0, 100)
                .flatMap(i -> balancer)
                .doOnNext(rSocket -> rSocket.requestResponse(DefaultPayload.create("test-request")).block())
                .blockLast();
    }
}

值得提到的是,RSocket 中的客户端负载均衡器还会处理无效连接。如果在LoadBalancedRSocketMono中注册的任何 RSocket 实例停止响应,RSocket 将会自动尝试重新连接。默认情况下,它将在 25 秒内执行五次尝试。如果未成功,则将从可用连接池中删除给定的 RSocket。这种设计结合了服务器端负载平衡和低延迟的优点,并减少了客户端负载平衡的“网络跳数”。

无效连接和恢复机制

在云环境中,进行机器之间通信,实时流数据交互一般不会出现什么网络问题,但是试想一下,如果我们将物联网设备放置在无法稳定、可靠地通过网络连接访问的区域中,问题就比较复杂了。很容易想到,在这样的环境中可能面临的两个主要问题:“网络延迟”“连接稳定性”。从软件的角度来看,我们对“网络延迟”可能没有太好的办法,但是对于“连接稳定性”我们还能够做些工作的。

我们来试试用 RSocket 解决问题,先来选择合适的交互模型开始。在这种情况下,最合适的是“请求流”方法,其中部署在云环境中的微服务是请求者,而温度传感器是响应者。选择好交互模型后,我们再应用可恢复性策略。在 RSocket 中,我们通过在RSocketFactory上调用的resume()方法来实现,如下例所示:

@Slf4j
public class ResumableRequester {
    private static final int CLIENT_PORT = 7001;
    public static void main(String[] args) {
        RSocket socket = RSocketFactory.connect()
                .resume()
                .resumeSessionDuration(RESUME_SESSION_DURATION)
                .transport(TcpClientTransport.create(HOST, CLIENT_PORT))
                .start()
                .block();
        socket.requestStream(DefaultPayload.create("dummy"))
                .map(payload -> {
                    log.info("Received data: [{}]", payload.getDataUtf8());
                    return payload;
                })
                .blockLast();
    }
}
@Slf4j
public class ResumableResponder {
    private static final int SERVER_PORT = 7000;
    static final String HOST = "localhost";
    static final Duration RESUME_SESSION_DURATION = Duration.ofSeconds(60);
    public static void main(String[] args) throws InterruptedException {
        RSocketFactory.receive()
                .resume()
                .resumeSessionDuration(RESUME_SESSION_DURATION)
                .acceptor((setup, sendingSocket) -> Mono.just(new AbstractRSocket() {
                    @Override
                    public Flux requestStream(Payload payload) {
                        log.info("Received 'requestStream' request with payload: [{}]", payload.getDataUtf8());
                        return Flux.interval(Duration.ofMillis(1000))
                                .map(t -> DefaultPayload.create(t.toString()));
                    }
                }))
                .transport(TcpServerTransport.create(HOST, SERVER_PORT))
                .start()
                .subscribe();
        log.info("Server running");
        Thread.currentThread().join();
    }
}

请注意,要运行提供的示例,您需要在计算机上安装socat,请参阅README文件以了解更多详细信息

请求方和响应方的机制类似,它基于一些组件。首先,有一个ResumableFramesStore,它用作帧的缓冲区。按照默认实现,它会将它们存储在内存中,但是我们可以通过实现ResumableFramesStore接口(例如,将帧存储在分布式缓存中,如 Redis)来轻松调整以满足业务的需求。这个缓冲区是用来保存在“keep-alive 帧”之间发出的数据,“keep-alive 帧”是定期来回发送,能够探测出交互双方之间的连接是否稳定;另外,“keep-alive 帧(保活帧)”还包含令牌,这个令牌是为了确定请求者和响应者的最后接收位置。当交互双方需要要恢复连接时,它将发送带有“隐含位置”的“resume 帧(恢复帧)”。隐含位置是根据上次接收到的位置(与“保活帧”中的值相同)加上该时刻接收到的帧的长度计算得出的。此算法适用于通信的双方,在恢复帧中会含有“最后接收的服务器位置”“第一个客户端可用位置”信息的令牌。下图显示了恢复操作的整个流程:

Resumabilty Mechanism

通过采用 RSocket 协议中内置的可恢复性机制,我们可以用相对较少的精力来减少网络问题的影响。就像上面的示例所示,可恢复性在数据流应用程序中可能非常有用,尤其是在 IOT 设备与云环境中的服务通信的场景下。

总结

在本文中,我们讨论了 RSocket 协议的更多高级功能,这些功能有助于减少网络对系统可操作性的影响。我们介绍了客户端负载平衡模式和可恢复性机制的实现。这些功能与健壮的交互模型相结合,构成了协议的核心。

在本微型系列的最后一篇文章中,我们将介绍 RSocket 之上的构建可用抽象层。

作者:Rafał Kowalski ,原文:Reactive Service-To-Service Communication With RSocket (Part 2): Load Balancing and Resumability


涤生的博客。

转载请注明原创出处,谢谢!

欢迎关注我的微信公众号:「涤生的博客」,获取更多技术分享。

涤生-微信公共号