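Reactive Web
Long Polling
With short polling, the server answers each query immediately, whether or not the stock level has changed.
Long polling works differently. If the server finds that the stock level has not changed, it holds the current request open for a while (this period is the timeout, typically 90 seconds). During that time the server keeps checking the stock level: as soon as it detects a change it returns, otherwise it waits until the timeout expires.
From the client's point of view, long polling and short polling look the same: the client simply keeps sending requests. The difference is on the server. With short polling the server returns a result for every request right away, changed or not. With long polling it returns right away only when something has changed; if nothing has changed, it holds the response until the timeout.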
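The difference between the two server-side behaviours can be sketched with plain JDK concurrency primitives. This is a minimal in-memory illustration, not a real HTTP endpoint: the class `StockPoller` and its methods `shortPoll`, `longPoll` and `setStock` are hypothetical names introduced here, and the timeout is passed in by the caller rather than fixed at 90 seconds.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory stock holder used to contrast short and long polling. */
class StockPoller {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private int stock;

    StockPoller(int initialStock) {
        this.stock = initialStock;
    }

    /** Short polling: answer immediately, whether or not anything changed. */
    int shortPoll() {
        lock.lock();
        try {
            return stock;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Long polling: hold the request until the stock differs from lastSeen
     * or the timeout elapses, then return the current value.
     */
    int longPoll(int lastSeen, long timeoutMillis) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (stock == lastSeen && remainingNanos > 0) {
                // awaitNanos returns the time still left to wait
                remainingNanos = changed.awaitNanos(remainingNanos);
            }
            return stock;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return stock; // the lock is held again when await throws
        } finally {
            lock.unlock();
        }
    }

    /** Updates the stock and wakes every request that is currently held. */
    void setStock(int newStock) {
        lock.lock();
        try {
            stock = newStock;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

A caller would pass the last value it saw, for example `poller.longPoll(10, 90_000)`, and issue the next request as soon as the call returns, whether it returned because of a change or because of the timeout.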
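Backpressure
When data flows from an upstream producer to a downstream consumer and the producer emits faster than the consumer can process, the consumer's buffer overflows. This situation is called backpressure. The term is also used for the flow-control mechanism that deals with it, in which the consumer signals how much data it is able to accept.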
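The overflow described above, and one simple way of pushing back on the producer, can be reproduced with a bounded queue from the JDK. This is only a sketch: `BoundedBufferDemo`, `countDropped` and `deliverAll` are hypothetical names, and blocking the producer is just one possible strategy (reactive libraries usually let the consumer request a number of items instead).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedBufferDemo {

    /**
     * No flow control: the producer offers items to a buffer nobody drains.
     * offer() returns false once the buffer is full, so those items are lost.
     */
    static int countDropped(int capacity, int produced) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int dropped = 0;
        for (int i = 0; i < produced; i++) {
            if (!buffer.offer(i)) {
                dropped++;
            }
        }
        return dropped;
    }

    /**
     * With flow control: put() blocks the producer while the buffer is full,
     * so the producer is slowed down to the consumer's pace and nothing is lost.
     */
    static int deliverAll(int capacity, int produced) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger consumed = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < produced; i++) {
                    buffer.take();
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < produced; i++) {
                buffer.put(i); // waits for free space instead of dropping
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }
}
```

With a capacity of 3 and 10 produced items, `countDropped` reports 7 lost items, while `deliverAll` hands all 10 to the consumer.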
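SSE
With SSE (Server-Sent Events), the browser sends a single HTTP request to the server and keeps the connection open, and the server keeps pushing messages to the browser in one direction only. The point is to save network resources: the client does not have to keep sending requests and opening new connections.
SSE runs over plain HTTP. In JavaScript it is consumed through EventSource; on the server side the response uses the content type text/event-stream;charset=utf-8.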
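What travels over the text/event-stream connection is plain text: each event is a group of "field: value" lines terminated by a blank line. The helper below is a minimal sketch of that framing. `SseFrame.format` is a hypothetical name, and only the `event` and `data` fields are handled (`id` and `retry` are left out).

```java
class SseFrame {

    /**
     * Formats one server-sent event.
     * Every line of the payload gets its own "data:" field, and an empty
     * line marks the end of the event. eventName may be null, in which case
     * the browser delivers the event to EventSource.onmessage.
     */
    static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }
}
```

For example, `SseFrame.format(null, "hello")` produces the text `data: hello` followed by an empty line, which is the smallest frame that triggers `onmessage` in the browser.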
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

    @RestController
    @RequestMapping(path = "/sse")
    public class SseRest {

        // Open emitters, keyed by client id
        private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

        // Opens the event stream for a client; the emitter times out after one hour
        @GetMapping(path = "/subscribe")
        public SseEmitter subscribe(String id) {
            SseEmitter sseEmitter = new SseEmitter(3600000L);
            sseCache.put(id, sseEmitter);
            sseEmitter.onTimeout(() -> sseCache.remove(id));
            sseEmitter.onCompletion(() -> System.out.println("Completed!"));
            return sseEmitter;
        }

        // Pushes one message to a single client
        @GetMapping(path = "/push")
        public String push(String id, String content) throws IOException {
            SseEmitter sseEmitter = sseCache.get(id);
            if (sseEmitter != null) {
                sseEmitter.send(content);
            }
            return "over";
        }

        // Closes the stream of a single client
        @GetMapping(path = "/over")
        public String over(String id) {
            SseEmitter sseEmitter = sseCache.get(id);
            if (sseEmitter != null) {
                sseEmitter.complete();
                sseCache.remove(id);
            }
            return "over";
        }

        // Pushes one message to every subscribed client
        @GetMapping(path = "/push-all")
        public String pushAll(String content) throws IOException {
            for (SseEmitter sseEmitter : sseCache.values()) {
                sseEmitter.send(content);
            }
            return "over";
        }
    }
    <!doctype html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
        <title>SSE test page</title>
    </head>
    <body>
        <div>SSE test</div>
        <div id="result"></div>
        <script>
            // Subscribe as user1
            var source = new EventSource('http://localhost/sse/subscribe?id=user1');
            var result = document.getElementById('result');

            // Append every pushed message to the page
            source.onmessage = function (event) {
                result.innerText += '\n' + event.data;
            };

            // Called once the connection has been established
            source.onopen = function (event) {
                console.log(event);
                result.innerText += '\n opened: ';
            };
        </script>
    </body>
    </html>
A second page, identical except that it subscribes as user2:

    <!doctype html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
        <title>SSE test page</title>
    </head>
    <body>
        <div>SSE test</div>
        <div id="result"></div>
        <script>
            // Subscribe as user2
            var source = new EventSource('http://localhost/sse/subscribe?id=user2');
            var result = document.getElementById('result');

            // Append every pushed message to the page
            source.onmessage = function (event) {
                result.innerText += '\n' + event.data;
            };

            // Called once the connection has been established
            source.onopen = function (event) {
                console.log(event);
                result.innerText += '\n opened: ';
            };
        </script>
    </body>
    </html>
RxJava 2

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
    </dependency>
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;

    public class RXJavaTest {

        public static void main(String[] args) {

            // The observable: emits five values, one per second, then completes
            Observable<String> girl = Observable.create(new ObservableOnSubscribe<String>() {

                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    System.out.println(Thread.currentThread().getName());
                    emitter.onNext("1");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName());
                    emitter.onNext("2");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName());
                    emitter.onNext("3");
                    Thread.sleep(1000);
                    emitter.onNext("4");
                    Thread.sleep(1000);
                    emitter.onNext("5");
                    Thread.sleep(1000);
                    emitter.onComplete();
                }
            });

            // The observer: prints every callback together with the thread it runs on
            Observer<String> man = new Observer<String>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe" + d);
                }

                @Override
                public void onNext(String t) {
                    System.out.println(Thread.currentThread().getName());
                    System.out.println("onNext " + t);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError " + e.getMessage());
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            };

            // No scheduler is specified, so everything runs on the calling thread
            girl.subscribe(man);
        }
    }
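| Method | Description |
| --- | --- |
| Schedulers.computation() | Suited to CPU-bound (computation-intensive) tasks |
| Schedulers.io() | Suited to IO-bound tasks |
| Schedulers.trampoline() | Runs tasks on the thread that calls schedule |
| Schedulers.newThread() | Each Worker gets a new thread |
| Schedulers.single() | All Workers share a single thread |
| Schedulers.from(Executor) | Uses the given Executor to run tasks |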
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.schedulers.Schedulers;

    public class RXJavaTest2 {

        public static void main(String[] args) throws InterruptedException {

            Observable.create(new ObservableOnSubscribe<String>() {

                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("1");
                    emitter.onNext("2");
                    emitter.onNext("3");
                    emitter.onNext("4");
                    emitter.onNext("5");
                    emitter.onComplete();
                }
            })
            // observeOn: the scheduler on which the observer's callbacks run
            .observeOn(Schedulers.computation())
            // subscribeOn: the scheduler on which the source emits
            .subscribeOn(Schedulers.computation())
            .subscribe(new Observer<String>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe...");
                }

                @Override
                public void onNext(String t) {
                    System.out.println("onNext");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError");
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });

            // The work runs on scheduler threads, so keep the main thread alive for a while
            Thread.sleep(10000);
        }
    }
Reactor
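Reactive programming can help prevent cascading failures (the "avalanche" effect), because backpressure lets a consumer limit how much data it accepts instead of being overwhelmed by a faster producer.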
Flux
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Stream;

    import reactor.core.publisher.Flux;
    import reactor.util.function.Tuple2;

    // From an array
    String[] s = new String[] {"xx", "oo"};
    Flux<String> flux1 = Flux.just(s);

    // From individual values
    Flux<String> flux2 = Flux.just("xx", "xxx");

    // From a collection
    List<String> list = Arrays.asList("hello", "world");
    Flux<String> flux3 = Flux.fromIterable(list);

    // From a Stream (a Stream can be consumed only once)
    Stream<String> stream = Stream.of("hi", "hello");
    Flux<String> flux4 = Flux.fromStream(stream);

    // Integers 0, 1, 2, 3, 4
    Flux<Integer> range = Flux.range(0, 5);

    // One value per second, limited to five values.
    // interval emits on a scheduler thread, so the JVM must stay alive to see the output.
    Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
    longFlux.subscribe(System.out::println);

    Flux.range(1, 5).subscribe(System.out::println);

    // Merge two sources into one
    Flux<String> mergeWith = flux3.mergeWith(flux4);
    mergeWith.subscribe(System.out::println);
    System.out.println("---");

    // Zip pairs elements by position and stops with the shorter source (two pairs here)
    Flux<String> source1 = Flux.just("111", "world", "333");
    Flux<String> source2 = Flux.just("2111", "xxx");

    Flux<Tuple2<String, String>> zip = source1.zipWith(source2);
    zip.subscribe(tuple -> {
        System.out.println(tuple.getT1() + " -> " + tuple.getT2());
    });
    // generate is synchronous: each callback invocation may call next() at most once
    Flux.generate(sink -> {
        sink.next("xx");
        sink.complete();
    }).subscribe(System.out::print);
    // create may call next() any number of times before completing
    Flux.create(sink -> {
        for (int i = 0; i < 10; i++) {
            sink.next("xxoo:" + i);
        }
        sink.complete();
    }).subscribe(System.out::println);
Mono
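    import java.util.concurrent.ConcurrentHashMap;

    import org.springframework.stereotype.Service;

    @Service
    public class PersonService {

        // In-memory stand-in for a database: 100 people keyed by id
        static ConcurrentHashMap<Integer, Person> map = new ConcurrentHashMap<>();

        static {
            for (int i = 0; i < 100; i++) {
                Person person = new Person();
                person.setId(i);
                person.setName("yangchaoyue" + i);
                map.put(i, person);
            }
        }

        // The getPerson() method called by the controller is not included in the source.
    }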
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import reactor.core.publisher.Mono;

    @RestController
    @RequestMapping("/person")
    public class PersonController {

        @Autowired
        PersonService personSrv;

        @GetMapping("")
        Mono<Object> get(String name) {

            System.out.println("thread get" + Thread.currentThread().getName());
            System.out.println("---1");

            // Nothing inside create() runs until the framework subscribes to the Mono
            Mono<Object> mono = Mono.create(sink -> {
                    System.out.println("thread create" + Thread.currentThread().getName());
                    sink.success(personSrv.getPerson());
                })
                .doOnSubscribe(sub -> {
                    System.out.println("xxx");
                })
                .doOnNext(data -> {
                    System.out.println("data:" + data);
                })
                .doOnSuccess(onSuccess -> {
                    System.out.println("onSuccess");
                });

            System.out.println("---2");
            return mono;
        }
    }
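Output (everything runs on the same thread). Note that ---1 and ---2 are printed before anything inside the Mono: the pipeline is only assembled in the handler and runs once it is subscribed to, after the method has returned.

    thread getreactor-http-nio-3
    ---1
    ---2
    xxx
    thread createreactor-http-nio-3
    thread getPersonreactor-http-nio-3
    data:com.mashibing.admin.pojo.Person@577e0635
    onSuccess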
    @GetMapping("xxoo")
    Mono<Object> get2(ServerHttpRequest request, String name, WebSession session) {

        if (StringUtils.isEmpty(session.getAttribute("code"))) {
            System.out.println("Session values have to be set like this");
            session.getAttributes().put("code", 250);
        }

        // Caution: the query-parameter map of ServerHttpRequest is likely read-only,
        // in which case this call fails with UnsupportedOperationException.
        request.getQueryParams().add("key", "value");

        System.out.println("code = " + session.getAttribute("code"));

        return Mono.just("mwah");
    }
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> sse() {

        // Nine values, each produced after a random delay of up to three seconds.
        // Thread.sleep blocks the thread driving the stream; it only simulates slow data here.
        Flux<String> flux = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
                try {
                    Thread.sleep(new Random().nextInt(3000));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "xxoo" + i;
            }))
            .doOnSubscribe(sub -> {
                System.out.println("subscribed");
            })
            .doOnComplete(() -> {
                System.out.println("doOnComplete");
            })
            .doOnNext(data -> {
                System.out.println("got data: " + data);
            });

        return flux;
    }
WebFlux