响应式web

长轮训

短轮询去服务端查询的时候,不管库存量有没有变化,服务器就立即返回结果了。

长轮询则不是,在长轮询中,服务器如果检测到库存量没有变化的话,将会把当前请求挂起一段时间(这个时间也叫作超时时间,一般是90秒)。在这个时间里,服务器会去检测库存量有没有变化,检测到变化就立即返回,否则就一直等到超时为止。

而对于客户端来说,不管是长轮询还是短轮询,客户端的动作都是一样的,就是不停的去请求,不同的是服务端,短轮询情况下服务端每次请求不管有没有变化都会立即返回结果,而长轮询情况下,如果有变化才会立即返回结果,而没有变化的话,则不会再立即给客户端返回结果,直到超时为止。

背压 Backpressure

在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer 溢出,这种现象就叫做 Backpressure 出现。

SSE

所谓的SSE(Sever-Sent Event),就是浏览器向服务器发送了一个HTTP请求,保持长连接,服务器不断单向地向浏览器推送“信息”,这么做是为了节省网络资源,不用一直发请求,建立新连接。

它是基于HTTP协议通信的。JS基于EventSource实现,后台基于text/event-stream;charset=utf-8。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@RestController
@RequestMapping(path = "/sse")
public class SseRest {

private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

@GetMapping(path = "/subscribe")
public SseEmitter subscribe(String id) {
// 超时时间设置为1小时
SseEmitter sseEmitter = new SseEmitter(3600000L);
sseCache.put(id, sseEmitter);
// 超时回调 触发
sseEmitter.onTimeout(() -> sseCache.remove(id));
// 结束之后的回调触发
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}

@GetMapping(path = "/push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
// 发送消息
sseEmitter.send(content);
}
return "over";
}

@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
// 执行完毕,断开连接
sseEmitter.complete();
sseCache.remove(id);
}
return "over";
}




@GetMapping(path = "/push-all")
public String pushAll(String content) throws IOException {
for (String s : sseCache.keySet()) {
SseEmitter sseEmitter = sseCache.get(s);
if (sseEmitter != null) {
// 发送消息
sseEmitter.send(content);
}
}

return "over";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<!-- user1.html-->
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8"> <!-- for HTML5 -->

<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Sse测试文档</title>
</head>
<body>
<div>sse 测试</div>
<div id="result"></div>
</body>
</html>
<script>
var source = new EventSource('http://localhost/sse/subscribe?id=user1');
source.onmessage = function (event) {
text = document.getElementById('result').innerText;
text += '\n' + event.data;
document.getElementById('result').innerText = text;
};
<!-- 添加一个开启回调 -->
source.onopen = function (event) {
text = document.getElementById('result').innerText;
text += '\n 开启: ';
console.log(event);
document.getElementById('result').innerText = text;
};
</script>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<!-- user2.html-->
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8"> <!-- for HTML5 -->

<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Sse测试文档</title>
</head>
<body>
<div>sse 测试</div>
<div id="result"></div>
</body>
</html>
<script>
<!-- H5 里的对象 -->

var source = new EventSource('http://localhost/sse/subscribe?id=user2');
source.onmessage = function (event) {
text = document.getElementById('result').innerText;
text += '\n' + event.data;
document.getElementById('result').innerText = text;
};
<!-- 添加一个开启回调 -->
source.onopen = function (event) {
text = document.getElementById('result').innerText;
text += '\n 开启: ';
console.log(event);
document.getElementById('result').innerText = text;
};
</script>

RXJava2

1
2
3
4
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class RXJavaTest {

// 同步

public static void main(String[] args) {

// Observable 被观察者

Observable<String> girl = Observable.create(new ObservableOnSubscribe<String>() {


// emitter 发射器,发射体
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {

// onNext可以 无限次调用
System.out.println(Thread.currentThread().getName());
emitter.onNext("1");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName());
emitter.onNext("2");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName());
emitter.onNext("3");
Thread.sleep(1000);
emitter.onNext("4");
Thread.sleep(1000);
emitter.onNext("5");
Thread.sleep(1000);
emitter.onComplete();
}
});

// Observer 观察者
Observer<String> man = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// TODO Auto-generated method stub
System.out.println("onSubscribe" + d);
}

@Override
public void onNext(String t) {
// TODO Auto-generated method stub
System.out.println(Thread.currentThread().getName());
System.out.println("onNext " + t);
}

@Override
public void onError(Throwable e) {
// TODO Auto-generated method stub
System.out.println("onError " + e.getMessage());
}

@Override
public void onComplete() {
// TODO Auto-generated method stub
System.out.println("onComplete");
}
};

girl.subscribe(man);
}
}
方法 说明
Schedulers.computation() 适用于计算密集型任务
Schedulers.io() 适用于 IO 密集型任务
Schedulers.trampoline() 在某个调用 schedule 的线程执行
Schedulers.newThread() 每个 Worker 对应一个新线程
Schedulers.single() 所有 Worker 使用同一个线程执行任务
Schedulers.from(Executor) 使用 Executor 作为任务执行的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class RXJavaTest2 {
// 异步


public static void main(String[] args) throws InterruptedException {


// 被观察者
Observable.create(new ObservableOnSubscribe<String>() {

@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
emitter.onNext("5");
emitter.onComplete();
}
})
// 哪个线程是观察者
.observeOn(
Schedulers.computation()
)
.subscribeOn( Schedulers.computation())
.subscribe(new Observer<String>() {

@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe...");
}

@Override
public void onNext(String t) {
System.out.println("onNext");
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onComplete() {
System.out.println("onComplete");
}

})
;

Thread.sleep(10000);
}
}

Reactor

响应式编程能够防止雪崩

Flux

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 静态方法生成Flux


String[] s = new String[] {"xx","oo"};
// just 已知元素数量和内容 使用
//
Flux<String> flux1 = Flux.just(s);
// flux1.subscribe(System.out::println);


Flux<String> flux2 = Flux.just("xx","xxx");
// flux2.subscribe(System.out::println);



//fromArray方法
List<String> list = Arrays.asList("hello", "world");
Flux<String> flux3 = Flux.fromIterable(list);
// flux3.subscribe(System.out::println);


//fromStream方法
Stream<String> stream = Stream.of("hi", "hello");
Flux<String> flux4 = Flux.fromStream(stream);
// flux4.subscribe(System.out::println);


//range方法
Flux<Integer> range = Flux.range(0, 5);

// range.subscribe(System.out::println);

//interval方法, take方法限制个数为5个
Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
longFlux.subscribe(System.out::println);

//链式
Flux.range(1, 5).subscribe(System.out::println);

// 合并
Flux<String> mergeWith = flux3.mergeWith(flux4);
mergeWith.subscribe(System.out::println);
System.out.println("---");

// 结合为元祖
Flux<String> source1 = Flux.just("111", "world","333");
Flux<String> source2 = Flux.just("2111", "xxx");

Flux<Tuple2<String, String>> zip = source1.zipWith(source2);
zip.subscribe(tuple -> {
System.out.println(tuple.getT1() + " -> " + tuple.getT2());
});
1
2
3
4
5
6
7
8
// 同步动态创建,next 只能被调用一次
Flux.generate(sink -> {

sink.next("xx");
sink.complete();

}).subscribe(System.out::print);
}
1
2
3
4
5
6
7
8
9
10
11
//异步动态创建
Flux.create(sink -> {

for (int i = 0; i < 10; i++) {
sink.next("xxoo:" + i);
}

sink.complete();


}).subscribe(System.out::println);

Mono

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class PersonService {
static ConcurrentHashMap<Integer, Person> map = new ConcurrentHashMap<>();

static {

for (int i = 0; i < 100; i++) {

Person person = new Person();

person.setId(i);
person.setName("yangchaoyue" + i);

map.put(i, person);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@RestController
@RequestMapping("/person")
public class PersonController {

@Autowired
PersonService personSrv;

@GetMapping("")
Mono<Object> get(String name){

System.out.println("线程 get" + Thread.currentThread().getName());
System.out.println("---1");

// 异步
Mono<Object> mono = Mono.create(sink -> {

// 组装数据序列
System.out.println("线程 create" + Thread.currentThread().getName());
sink.success(personSrv.getPerson());
})
.doOnSubscribe(sub -> {
// 1 订阅
System.out.println("xxx");
})
.doOnNext(data -> {
// 得到数据
System.out.println("data:" + data);
})

.doOnSuccess(onSuccess -> {
// 整体完成
System.out.println("onSuccess");
});
System.out.println("---2");

// SpringMvc 值 在这个环节准备好
// 得到一个包装 数据序列 -> 包含特征 -> 容器 拿到这个序列 -> 执行序列里的方法

// Ajax a() -> b(c()) ->
// 1, 写回调接口 , 让b调
// 2, 直接传方法过去
// 看起来 像是异步,实质上,阻塞的过程 在容器内部
return mono;
}
}
1
2
3
4
5
6
7
8
9
输出(同一个线程运行):
线程 getreactor-http-nio-3
---1
---2
xxx
线程 createreactor-http-nio-3
线程 getPersonreactor-http-nio-3
data:com.mashibing.admin.pojo.Person@577e0635
onSuccess
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@GetMapping("xxoo")
// ServerHttpRequest webFlux 中特有
// 拓展思维,SpringCloud Gateway
//没有HttpServlet,request里也没有sesion
Mono<Object> get2(ServerHttpRequest request,String name,WebSession session){
if(StringUtils.isEmpty(session.getAttribute("code"))) {

System.out.println("需要这样设置session值");
session.getAttributes().put("code", 250);
}
//需要这样设置属性
request.getQueryParams().add("key","value");

System.out.println("code = " + session.getAttribute("code"));

return Mono.just("么么哒");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//基于webFlux和netty的响应式编程SSE
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> sse(){

// 1. 封装对象
Flux<String> flux = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
try {
Thread.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
}
return "xxoo" + i;
}))

.doOnSubscribe(sub -> {
System.out.println("sub 了");
})
.doOnComplete(() -> {
System.out.println("doOnComplete");
})
.doOnNext(data -> {
System.out.println("有data了~" + data);
})
;
// 2. 对象 连带里面的方法 给了容器
return flux;
}

WebFlux

最后更新: 2020年11月30日 21:50

原始链接: https://midkuro.gitee.io/2020/10/21/reactor-webflux/

× 请我吃糖~
打赏二维码