高并发之请求合并
本文于
1626
天之前发表,文中内容可能已经过时。
如果我们没有在高并发场景下,我们获取单个用户信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class UserServiceImpl implements UserService { @Override public User getById(Integer id) { try { //这段代码代码该方法的正常耗时 Thread.sleep(10l); User user = new User(); user.setId(id); return user; }catch (Exception e){ return null; } } }
但在面向大批量to c的用户场景下,用户的请求会出现毛刺的现象。比如某段时间逛的人特别多,获取用户信息,或者商品信息的请求某段时间突然变大,导致单台服务器支持不住,如果做流控的话,体验不是很好。
在这种情况下,两种办法: 1: 添加服务器 2: 请求合并
添加服务器简单有效,但是成本上去了,请求合并能够解决单台服务器的吞吐量的问题,那么上面的代码需要变成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public class MergeUserServiceImpl implements UserService { static class Request { Integer id; CompletableFuture future; public Request(Integer id, CompletableFuture future) { this.id = id; this.future = future; } } private LinkedBlockingQueue<Request> linkedBlockingQueue = new LinkedBlockingQueue<Request>(1000); public MergeUserServiceImpl(){ init(); } private void init() { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //1.从阻塞队列中取出queue的请求,生成一次批量查询。 int size = linkedBlockingQueue.size(); if (size == 0) { return; } List<Request> requests = new ArrayList<>(size); for (int i = 0; i < size; i++) { // 移出队列,并返回。 Request poll = linkedBlockingQueue.poll(); requests.add(poll); } //2.组装一个批量查询请求参数。 List<Integer> ids = new ArrayList<>(); for (Request request : requests) { ids.add(request.id); } //3. http 请求,或者 dubbo 请求。批量请求,得到结果list。 System.out.println("本次合并请求数量:"+ids.size()); //请求 Map<Integer, User> responses = new HashMap<>(); for(Integer id:ids){ User user=new User(); user.setId(id); responses.put(id,user); } Thread.sleep(100l); //4.将结果响应给每一个单独的用户请求。 for (Request request : requests) { //根据请求中携带的能表示唯一参数,去批量查询的结果中找响应。 User user= responses.get(request.id); //将结果返回到对应的请求线程。2个线程通信,异步编程赋值。 //complete(),源码注释翻译:如果尚未完成,则将由方法和相关方法返回的值设置为给定值 request.future.complete(user); } } catch (Exception e) { e.printStackTrace(); } } // 立即执行任务,并间隔10 毫秒重复执行。 }, 0, 10, TimeUnit.MILLISECONDS); } public User getById(Integer id) { CompletableFuture<User> future = new CompletableFuture(); linkedBlockingQueue.offer(new Request(id, future)); try { return future.get(); }catch (Exception e){ e.printStackTrace(); } return null; } }
这么做的好处,可以合并io的操作,可以使用redis的pipline
spring cloud 下Hystrix请求合并 demo1 demo2
在spring 环境下的请求合并,使用了Spring的注解,逻辑思路还是如上面所示。