「学习笔记」Spring Reactive Stack(六)响应式 HTTP 请求客户端 WebClient

WebClientSpring WebFlux模块提供的一个非阻塞的基于响应式编程的进行HTTP请求的客户端工具。

引入WebFlux依赖则可使用WebClient

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

1. 创建WebClient实例

WebClient接口提供了三个不同的静态方法(create()create(String baseUrl)builder())和一个内部类(WebClient.Bulider)来创建WebClient实例:

「学习笔记」Spring Reactive Stack(五)服务端事件推送Server-Sent Events

SSEServer-Sent Events服务器推送事件,是一种仅发送文本消息的技术。SSE基于HTTP协议中的持久连接。SSEHTML5标准协议中的一部分。

客户端接收服务端异步更新的消息可以分为两类:客户端拉取和服务端推送。

客户端拉取:通过短轮询或者长轮询定期请求服务器进行更新。

服务端推送:SSEWebSocketSSE是单向,WebSocket是双向;SSE基于HTTP协议,WebSocket基于WebSocket协议(HTTP以外的协议);

SSE网络协议

  • 基于纯文本的简单协议。服务器端的响应内容类型必须是text/event-stream。响应文本的内容是一个事件流,事件流是一个简单的文本流,仅支持UTF-8格式的编码。
  • 事件流由不同的事件组成。不同事件间通过仅包含回车符和换行符的空行(\r\n)来分隔。
  • 每个事件可以由多行构成,每行由类型和数据两部分组成。类型与数据通过冒号(:)进行分隔,冒号前的为类型,冒号后的为其对应的值。每个事件可以包含如下类型的行:
    • 类型为空白,表示该行是注释,会在处理时被忽略。
    • 类型为data,表示该行是事件所包含的数据。以data开头的行可以出现多次。所有这些行都是该事件的数据。
    • 类型为event,表示该行用来声明事件的类型,即事件名称。浏览器在收到数据时,会产生对应名称的事件。
    • 类型为id,表示该行用来声明事件的标识符。
    • 类型为retry,表示该行用来声明浏览器在连接断开之后进行重连的等待时间。
data: china // 该事件仅包含数据

data: Beijing // 该事件包含数据与事件标识
id: 100

event: myevent // 该事件指定了名称
data:shanghai
id: 101

: this is a comment // 该事件具有注释、名称,且包含两行数据
event:city
data: guangzhou
data: shenzhen
  • 事件标识id作用: 如果服务端发送的事件中包含事件标识id,那么浏览器会将最近一次接收到的事件标识id记录到HTTP头的Last-Event-ID属性中。如果浏览器与服务端的连接中断,当浏览器再次连接时,会将Last-Event-ID记录的事件标识id发送给服务端。服务器端通过浏览器端发送的事件标识id来确定将继续连接哪个事件。

订阅一个服务端推送事件(GET请求),需要设置 包含如下请求头的Request

「学习笔记」Spring Reactive Stack(四)响应式方式访问Redis

Spring Data Redis中同时支持了Jedis客户端和Lettuce客户端。但是仅Lettuce是支持Reactive方式的操作;这里选择默认的Lettuce客户端。

  1. 创建Maven项目,并在pom.xml导入依赖:
<!-- reactive redis依赖包(包含Lettuce客户端) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
  1. 配置文件application.yml
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456
    #Redis数据库索引(默认为0)
    database: 0
    #连接超时时间(毫秒)
    timeout: 5000
  1. 注入配置类:
@Configuration
public class ReactiveRedisConfig {
    @Bean
    public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
        return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string());
    }
}
  1. 简单的RedisService封装
@Service
@AllArgsConstructor
public class RedisService {

    private final ReactiveRedisTemplate<String, String> redisTemplate;

    public Mono<String> get(String key) {
        return key==null ? null : redisTemplate.opsForValue().get(key);
    }
    public Mono<Boolean> set(String key, String value) {
        return redisTemplate.opsForValue().set(key, value);
    }
    public Mono<Boolean> set(String key, String value, Long time) {
        return redisTemplate.opsForValue().set(key, value, Duration.ofSeconds(time));
    }
    public Mono<Boolean> exists(String key) {
        return redisTemplate.hasKey(key);
    }
    public Mono<Long> remove(String key) {
        return redisTemplate.delete(key);
    }
}
  1. 测试
@SpringBootTest
class ReactiveRedisTest {
    @Resource
    private RedisService redisService;

    @Test
    void test1() {
        // 保存5分钟
        redisService.set("test1", "test1_value", 5 * 60L).subscribe(System.out::println);
        redisService.get("test1").subscribe(System.out::println);
    }
}

测试运行结果:

「学习笔记」Spring Reactive Stack(三)使用R2DBC访问MySQL

MySQL等一系列的关系型数据库也在R2DBC等包的帮助下支持响应式。 R2DBC基于Reactive Streams反应流规范,它是一个开放的规范,为驱动程序供应商和使用方提供接口(r2dbc-spi),与JDBC的阻塞特性不同,它提供了完全反应式的非阻塞API关系型数据库交互。

  1. 创建Maven项目,并导入依赖pom.xml:
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- data-r2dbc同时也会将r2dbc-pool导入 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- r2dbc mysql 库-->
<dependency>
  <groupId>dev.miku</groupId>
  <artifactId>r2dbc-mysql</artifactId>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <optional>true</optional>
</dependency>
  1. 配置文件application.yml
spring:
  r2dbc:
    url: r2dbcs:mysql://192.168.2.100:3306/test?characterEncoding=UTF8&serverTimezone=Asia/Shanghai
    username: developer
    password: 123456
    
#日志配置
logging:
  level:
    root: info
    # 调试环境查看执行的sql
    dev.miku.r2dbc.mysql.client.ReactorNettyClient: debug
  1. SQLModel
create table `base_user` (
  `userId` int not null auto_increment,
  `userName` varchar(100) default null comment '用户名',
  `userMobile` varchar(20) default null comment '手机号',
  primary key (`userId`),
  unique key `userMobile` (`userMobile`)
) engine=innodb comment='测试用户信息';
insert into `base_user` (`userName`, `userMobile`) values ('黑子', '15914061216');
insert into `base_user` (`userName`, `userMobile`) values ('大黄', '15914061217');
@Data
public class BaseUser {
    private Integer id;
    private String name;
    private String mobile;
}

实际开发中,由于历史原因数据库字段大多与Model类字段无法对应,这里也不对应,在sql中用别名对应。

「学习笔记」Spring Reactive Stack(二)Reactor异常处理

不管是在响应式编程还是普通的程序设计中,异常处理都是一个非常重要的方面。

对于Flux或者Mono来说,所有的异常都是一个终止的操作,即使你使用了异常处理,原生成序列也不会继续。 但是如果你对异常进行了处理,那么它会将oneError信号转换成为新的序列的开始,并将替换掉之前上游产生的序列。

先看一个Flux产生异常的例子

@Test
void test1() {
    Flux.just(10, 5, 0)
        .map(i -> "100 / " + i + " = " + (10 / i))
        .subscribe(System.out::println);
}

会得到一个异常ErrorCallbackNotImplemented

100 / 10 = 1
100 / 5 = 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero

1. onError方法

ReactorsubscribeonError方法(subscribe第二个参数),就是try catch的一个具体应用:

「学习笔记」Spring Reactive Stack(一)Spring WebFlux响应式Web框架入门

/articles/2021/spring_reactive_webflux/up-ee8803006ea5a4169467074917d6fdd0b12.webp

Spring WebFluxSpring Framework 5.0中引入的以Reactor为基础的响应式编程Web框架。

WebFlux 的异步处理是基于Reactor实现的,是将输入流适配成MonoFlux进行统一处理。

1. 响应式流(Reactive Streams)

  • Reactor 是一个响应式流,它有对应的发布者(Publisher),用两个类来表示:

0%