当前位置:Java -> Spring WebFlux 重试

Spring WebFlux 重试

如果你使用Spring WebFlux,你可能希望你的请求更加具有弹性。在这种情况下,我们可以使用WebFlux库中附带的重试机制。

WebFlux logo有一些情况我们可以考虑:

  • 向服务器发送过多请求
  • 内部服务器错误
  • 意外的格式
  • 服务器超时
WebFlux和MockWebServer
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.7.15</version>
</dependency>
 
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>mockwebserver</artifactId>
    <version>4.11.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    <version>3.5.9</version>
</dependency>


import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.SocketPolicy;
import org.junit.jupiter.api.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
 
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
 
class WebFluxRetry {
 
    @Test
    void testTooManyRequests() throws IOException {
        MockWebServer server = new MockWebServer();
        MockResponse tooManyRequests = new MockResponse()
                .setBody("Too Many Requests")
                .setResponseCode(429);
        MockResponse successfulRequests = new MockResponse()
                .setBody("successful");
 
        server.enqueue(tooManyRequests);
        server.enqueue(tooManyRequests);
        server.enqueue(successfulRequests);
        server.start();
 
        WebClient webClient = WebClient.builder()
                .baseUrl("http://" + server.getHostName() + ":" + server.getPort())
                .build();
 
        Mono<String> result = webClient.get()
                .retrieve()
                .bodyToMono(String.class)
                .retry(2);
 
        StepVerifier.create(result)
                .expectNextMatches(s -> s.equals("successful"))
                .verifyComplete();
 
        server.shutdown();
    }
}


429响应。 5xx响应的情况。5xx可能是由各种原因引起的。通常,如果我们遇到5xx,可能是服务器代码库中出现了问题。但在某些情况下,5xx可能是由于一个经常重新启动的不稳定服务导致的。此外,服务器可能部署在面临网络问题的可用性区域;甚至可能是一个尚未完全生效的失败发布。在这种情况下,进行重试是有意义的。通过重试,请求将被路由到负载均衡器后面的下一个服务器。
@Test
void test5xxResponse() throws IOException {
    MockWebServer server = new MockWebServer();
    MockResponse tooManyRequests = new MockResponse()
            .setBody("Server Error")
            .setResponseCode(500);
    MockResponse successfulRequests = new MockResponse()
            .setBody("successful");
 
    server.enqueue(tooManyRequests);
    server.enqueue(tooManyRequests);
    server.enqueue(successfulRequests);
    server.start();
 
    WebClient webClient = WebClient.builder()
            .baseUrl("http://" + server.getHostName() + ":" + server.getPort())
            .build();
 
    Mono<String> result = webClient.get()
            .retrieve()
            .bodyToMono(String.class)
            .retry(2);
 
    StepVerifier.create(result)
            .expectNextMatches(s -> s.equals("successful"))
            .verifyComplete();
 
    server.shutdown();
}


@Data
@AllArgsConstructor
@NoArgsConstructor
private static class UsernameResponse {
    private String username;
}
 
@Test
void badFormat() throws IOException {
    MockWebServer server = new MockWebServer();
    MockResponse tooManyRequests = new MockResponse()
            .setBody("Plain text");
    MockResponse successfulRequests = new MockResponse()
            .setBody("{\"username\":\"test\"}")
            .setHeader("Content-Type","application/json");
 
    server.enqueue(tooManyRequests);
    server.enqueue(tooManyRequests);
    server.enqueue(successfulRequests);
    server.start();
 
    WebClient webClient = WebClient.builder()
            .baseUrl("http://" + server.getHostName() + ":" + server.getPort())
            .build();
 
    Mono<UsernameResponse> result = webClient.get()
            .retrieve()
            .bodyToMono(UsernameResponse.class)
            .retry(2);
 
    StepVerifier.create(result)
            .expectNextMatches(s -> s.getUsername().equals("test"))
            .verifyComplete();
 
    server.shutdown();
}


UsernameResponse对象,这些响应将被拒绝。感谢重试,我们设法获得了一个成功的响应。
@Test
void badTimeout() throws IOException {
    MockWebServer server = new MockWebServer();
    MockResponse dealayedResponse= new MockResponse()
            .setBody("Plain text")
            .setSocketPolicy(SocketPolicy.DISCONNECT_DURING_RESPONSE_BODY)
            .setBodyDelay(10000, TimeUnit.MILLISECONDS);
    MockResponse successfulRequests = new MockResponse()
            .setBody("successful");
 
    server.enqueue(dealayedResponse);
    server.enqueue(successfulRequests);
    server.start();
 
    WebClient webClient = WebClient.builder()
            .baseUrl("http://" + server.getHostName() + ":" + server.getPort())
            .build();
 
    Mono<String> result = webClient.get()
            .retrieve()
            .bodyToMono(String.class)
            .timeout(Duration.ofMillis(5_000))
            .retry(1);
 
    StepVerifier.create(result)
            .expectNextMatches(s -> s.equals("successful"))
            .verifyComplete();
 
    server.shutdown();
}


推荐阅读: 百度面经(3)

本文链接: Spring WebFlux 重试