logo头像
Snippet 博客主题

高并发之请求合并

本文于 1626 天之前发表,文中内容可能已经过时。

如果我们没有在高并发场景下,我们获取单个用户信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UserServiceImpl implements UserService {
@Override
public User getById(Integer id) {
try {
//这段代码代码该方法的正常耗时
Thread.sleep(10l);
User user = new User();
user.setId(id);
return user;
}catch (Exception e){
return null;
}
}
}

但在面向大批量to c的用户场景下,用户的请求会出现毛刺的现象。比如某段时间逛的人特别多,获取用户信息,或者商品信息的请求某段时间突然变大,导致单台服务器支持不住,如果做流控的话,体验不是很好。

在这种情况下,两种办法:
1: 添加服务器
2: 请求合并

添加服务器简单有效,但是成本上去了,请求合并能够解决单台服务器的吞吐量的问题,那么上面的代码需要变成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class MergeUserServiceImpl implements UserService {

static class Request {
Integer id;
CompletableFuture future;

public Request(Integer id, CompletableFuture future) {
this.id = id;
this.future = future;
}

}

private LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>(1000);

public MergeUserServiceImpl(){
init();
}

private void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//1.从阻塞队列中取出queue的请求,生成一次批量查询。
int size = linkedBlockingQueue.size();
if (size == 0) {
return;
}
List<Request> requests = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
// 移出队列,并返回。
Request poll = linkedBlockingQueue.poll();
requests.add(poll);
}
//2.组装一个批量查询请求参数。
List<Integer> ids = new ArrayList<>();
for (Request request : requests) {
ids.add(request.id);
}
//3. http 请求,或者 dubbo 请求。批量请求,得到结果list。
System.out.println("本次合并请求数量:"+ids.size());
//请求
Map<Integer, User> responses = new HashMap<>();
for(Integer id:ids){
User user=new User();
user.setId(id);
responses.put(id,user);
}
Thread.sleep(100l);
//4.将结果响应给每一个单独的用户请求。
for (Request request : requests) {
//根据请求中携带的能表示唯一参数,去批量查询的结果中找响应。
User user= responses.get(request.id);
//将结果返回到对应的请求线程。2个线程通信,异步编程赋值。
//complete(),源码注释翻译:如果尚未完成,则将由方法和相关方法返回的值设置为给定值
request.future.complete(user);
}

} catch (Exception e) {
e.printStackTrace();
}
}
// 立即执行任务,并间隔10 毫秒重复执行。
}, 0, 10, TimeUnit.MILLISECONDS);
}


public User getById(Integer id) {
CompletableFuture<User> future = new CompletableFuture();
linkedBlockingQueue.offer(new Request(id, future));
try {
return future.get();
}catch (Exception e){
e.printStackTrace();
}
return null;

}
}

这么做的好处,可以合并io的操作,可以使用redis的pipline

spring cloud 下Hystrix请求合并

demo1
demo2

在spring 环境下的请求合并,使用了Spring的注解,逻辑思路还是如上面所示。