欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

实战SpringCloud响应式微服务系列教程(第七章)

程序员文章站 2022-07-02 12:46:49
本章节继续介绍:Flux和Mono操作符(二) 1.条件操作符 Reactor中常用的条件操作符有defaultIfRmpty、skipUntil、skipWhile、takeUntil和takeWhile等。 1、defaultIfRmpty defaultIfRmpty操作符返回来自原始数据流的 ......

本章节继续介绍:flux和mono操作符(二)

1.条件操作符

reactor中常用的条件操作符有defaultifrmpty、skipuntil、skipwhile、takeuntil和takewhile等。

1、defaultifrmpty

defaultifrmpty操作符返回来自原始数据流的元素,如果原始数据流中没有元素,则返回一个默认元素。

defaultifrmpty操作符在实际开发过程中应用广泛,通常用在对方法返回值的处理上。如下controller层对service层返回值的处理。

@getmapper("/article/{id}")
public mono<responseentity<article>> findbyid(@pathvariable string id){
     return articleservice.findone(id)
               .map(responseentity::ok)
               .defaultifrmpty(responseentity.status(404).body(null));
}

 

2、takeuntil

takeuntil操作符的基本用法是takeuntil(predicate<? super t>> predicate),其中predicate代表一种断言条件,takeuntil将提取元素直到断言条件返回true。

示例代码如下:

flux.range(1,100).takeuntil(i -> i == 10).subscribe(system.out::println);

 

3、takewhile

takewhile操作符的基本用法是takewhile(predicate<? super t>> continuepredicate),其中continuepredicate也代表一种断言条件。与takeuntil不同的是,takewhile会在continuepredicate条件返回true时才进行元素的提取。

示例代码如下:

flux.range(1,100).takewhile(i -> i <= 10).subscribe(system.out::println);

 

4、skipuntil

与takeuntil相对应,skipuntil的基本用法是skipuntil(predicate<? super t>> predicate)。skipuntil将丢弃原始数据中的元素,直到predicate返回true。

5、skipwhile

与takewhile相对应,skipwhile操作符的基本用法是skipwhile(predicate<? super t>> continuepredicate)。当continuepredicate返回true时才进行元素的丢弃。

2.数学操作符

reactor中常用的数学操作符有concat、count、reduce等。

1、concat

concat用来合并来自不同flux的数据,这种合并采用的是顺序的方式。

2、count

count操作符比较简单,用来统计flux中元素的个数。

3、reduce

reduce操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的mono序列。具体的累计操作也是通过一个bifunction来实现的。

示例代码如下:

flux.range(1,10).reduce((x,y) -> x+y).subscribe(system.out::println);

 

这里bifunction就是一个求和函数,用来对1到10的数字进行求和,运行结果为55。

与其类似的还有一个reducewith。

示例代码如下:

flux.range(1,10).reducewith(() - >5,(x,y) -> x+y).subscribe(system.out::println);

 

这里使用5来初始化求和过程,得到的结果是60。

3.observable工具操作符

reactor中常用的observable操作符有delay、subscribe、timeout等。

1、delay

delay将时间的传递向后延迟一段时间。

2、subscribe

在前面的代码演示了subscribe操作符的用法,我们可以通过subscribe()方法来添加相应的订阅逻辑。

在前面章节中我们提到了reactor中的消息类型有三种,即正常消息,异常消息和完成消息。subscribe操作符可以只处理其中包含的正常消息,也可以同时处理异常消息和完成消息。当我们用subscribe处理异常消息时可以采用以下方式。

mono.just(100)
         .conacatwith(mono.error(new illegalstateexception()))
         .subscribe(system.out::println,system.err::println);

 

以上代码执行结果如下,我们得到了一个100,同时也获取了illegalstateexxeption这个异常。

100
java.lang.illegalstateexxeption

有时候我们不想直接抛出异常,而是想采用一个容错策略来返回一个默认值,就可以采用以下方式。

mono.just(100)
         .conacatwith(mono.error(new illegalstateexception()))
         .onerrorreturn(0)
         .subscribe(system.out::println);

 

以上代码执行结果如下。当产生异常时,使用onerrorreturn()方法返回一个默认值0.

100
0

另外容错策略也是通过switchonerror()方法使用另外的流产生元素。以下代码示例演示了这种策略。

与上面的执行结果相同。

mono.just(100)
         .conacatwith(mono.error(new illegalstateexception()))
         .switchonerror(mono.just(0))
         .subscribe(system.out::println);

 

3、timeout

timeout操作符维持原始被观察者的状态,在特定时间内没有产生任何事件时,将生成一个异常。

4、block

block操作符在没有接收到下一个元素之前一直被阻塞。block操作符通常用来把响应式的数据流转换成传统的数据流。

例如,使用如下方法时,我们分别将flux数据流和mono数据流转变成了普通的list<order>对象和单个order对象,同样也可以设置block的等待时间。

public list<order> getallorder(){
      return orderservice.getallorders().block(duration.ofsecond(5));
}

public order getorderbyid(long orderid){
      return orderservice.getorderbyid(orderid).block(duration.ofsecond(2));
}

往期

实战springcloud响应式微服务系列教程(第一章)

实战springcloud响应式微服务系列教程(第二章)

实战springcloud响应式微服务系列教程(第三章)

实战springcloud响应式微服务系列教程(第四章)

实战springcloud响应式微服务系列教程(第五章)

实战springcloud响应式微服务系列教程(第六章)