欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

高并发处理,从Java8新特性说起

程序员文章站 2024-03-17 20:30:58
...

一、前言

随着公司业务的扩展和用户的增加,我们的网关接口不得不面对高并发的场景。
如何处理高并发请求呢?除了在系统架构上,分库分表、分布集群,异步处理等处理方式。本文来聊一聊,通过Java语言本身,来进行高并发场景的优化。

二、核心思路

高并发处理,从Java8新特性说起
如图所示
1、多客户端同时向服务器发起请求。
2、服务器将获取到的请求,添加到请求队列。
3、由一个定时任务(比如10ms执行一次),获取队列的全部元素,并将之包装成为请求列表,注意这里的队列是线程安全的。
4、不再单次请求一条数据,而是发送批量数据,获取到批量数据

//单次请求
getStoreOrderByOrderCode(String code);
//批量请求
getStoreOrderListByOrderCodeBatch(List<String> codeList);

5、遍历获取到的列表,将数据发送至对应线程。
6、多客户端获取到服务器的响应。

三、代码

废话不多说,开整

1、远程调用的服务

/**
 * @author xuyuanpeng
 * @version 1.0
 * @date 2019-05-15 14:28
 */
@Service
public class RemoteService {
    /**
     * 假装这是一个远程接口
     * @param code
     * @return
     */
    public StoreOrder getOrderByCode(String code) {
        StoreOrder order=new StoreOrder();
        order.setCreateTime(System.currentTimeMillis());
        order.setId(new Random().nextInt());
        order.setOrderCode(code);
        return order;
    }

    /**
     * 假装这是一个批量请求的远程接口
     * @param codes
     * @return
     */
    public List<StoreOrder> getOrderListByCodeBatch(List<String> codes){
        List<StoreOrder> list = new ArrayList<>();
        for(String code : codes){
            list.add(this.getOrderByCode(code));
        }
        return list;
    }
}

2、模拟客户端批量请求

   @Autowired
   private IPerformanceOrderService performanceOrderService;
   private static final Integer number=10000;

   private CountDownLatch cdl=new CountDownLatch(number);

  @org.junit.Test
  public void testThread3(){
        //在线程中进行wait,当 cdl.countDown() 递减,直至=0是,wait完毕
        //这时创建的1000个线程,也创建完毕,1000个线程内容,执行同时执行

        long startTime=System.currentTimeMillis();

        for(int i=0;i<number;i++){
            Thread thread = new Thread(()->{
                try {
                    cdl.await();
                    StoreOrder order = performanceOrderService.getOrderByCode("CODE_"+new Random().nextInt());
//                    System.out.println("RESULT>>>"+JsonMapper.toJsonString(order));
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
            thread.start();
            cdl.countDown();
        }
        long finshTime=System.currentTimeMillis()-startTime;
        System.out.println("处理完成时间:"+finshTime);
        //避免以上不输出,直接down掉
        try {
            Thread.sleep(50000);
        }catch (Exception e){
            // who care?
        }
    }
变量说明:

1.performanceOrderService是一个service层为一个订单服务。
2.CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行

初始化时,定义它的值,在业务代码之前,进行wait。
通过countDown()方法,对其值进行-1
当值为0时,唤醒wait的线程。执行wait后面的方法。
本文中,我们先批量的start了多个线程,当线程创建并start完毕,最后一起执行里面的方法。

3、服务类

@Service
public class PerformanceOrderService implements IPerformanceOrderService {
    @Autowired
    private RemoteService remoteService;

    //线程安全的队列
    LinkedBlockingDeque<XRequest> queue=new LinkedBlockingDeque<>();

    @Override
    public StoreOrder getOrderByCode(String code) throws ExecutionException, InterruptedException {
        XRequest request=new XRequest();
        request.setOrderCode(code);

        //JDK 新特性 存储线程完成结果,并可以分发回线程
        CompletableFuture<StoreOrder> future = new CompletableFuture<>();

        request.setFuture(future);

        queue.add(request);

        //阻塞中 等待原厂接口调用完成
        return future.get();
    }

    @PostConstruct
    public void init(){
        System.out.println("PerformanceOrderService>>>init");
        // Executors.newSingleThreadScheduledExecutor()=Executors.newScheduledThreadPool(1)
        ScheduledExecutorService scheduledExecutorService =
                Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                Integer size=queue.size();
                if(size==0){
                    return;
                }
                ArrayList<XRequest> reqList = new ArrayList<>();
                for(int i = 0 ;i < size; i++){
                    XRequest req = queue.poll();
                    reqList.add(req);
                }

                System.out.println("批量处理的数据量为:"+size);
                //将list数据,处理成批量处理的参数,然后请求批量处理接口
                List<String> codeList = new ArrayList<>();
                for(XRequest request : reqList){
                    codeList.add(request.getOrderCode());
                }

                //根据批量参数  请求 批量处理方法
                List<StoreOrder> result = remoteService.getOrderListByCodeBatch(codeList);

                //将唯一标识,与相应结果进行对应
                Map<String,StoreOrder> bindData=new HashMap<>();
                for (StoreOrder storeOrder : result){
                    String code =storeOrder.getOrderCode();
                    bindData.put(code,storeOrder);
                }
                //通过XRequest 下发至对应线程
                for(XRequest request : reqList){
                    StoreOrder storeOrder=bindData.get(request.getOrderCode());
                    CompletableFuture<StoreOrder> future = request.getFuture();
                    future.complete(storeOrder);
                }
            }
        },0,10, TimeUnit.MILLISECONDS);
    }
}
相关说明

1、LinkedBlockingDeque 队列,用来存储请求
2、 @PostConstruct
在被声明为@Service或者@Controller的类中,里面被声明为@PostConstruct的方法,在系统初始化时,会被执行。
3、CompletableFuture

既然CompletableFuture类实现了CompletionStage接口,首先我们需要理解这个接口的契约。它代表了一个特定的计算的阶段,可以同步或者异步的被完成。你可以把它看成一个计算流水线上的一个单元,最终会产生一个最终结果,这意味着几个CompletionStage可以串联起来,一个完成的阶段可以触发下一阶段的执行,接着触发下一次,接着……
除了实现CompletionStage接口, CompletableFuture也实现了future接口, 代表一个未完成的异步事件。CompletableFuture提供了方法,能够显式地完成这个future,所以它叫CompletableFuture

流程说明

1、一个请求进入方法,会同时生成CompletableFuture对象。将此对象和方法的参数,加入请求队列
2、系统在初识时生产了一个定时任务,每隔10ms执行一次。
3、定时任务获取到当前全部的请求队列,依次pop出所有元素封装为list。并将元素包装为批量请求的参数,向远程方法进行请求。
4、获取到请求结果,遍历请求的list,获取到orderCode与请求结果进行匹配,如果匹配上,在获取请求list中的CompletableFuture,通过CompletableFuture通知回对应线程,完成消息请求

四、总结

此方法,特别适合处理同一时间段,大批量的网络数据请求的情况。
需要理解并使用CompletableFuture、ScheduledExecutorService

五、源码

文章中引用部分源于其他博客。由于丢失出处,并未贴出引用地址。
https://gitee.com/xyp_YF/HConcurrenceDemo