欢迎您访问程序员文章站,本站旨在为大家提供并分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java 注解实现一个可配置线程池的方法示例

程序员文章站 2024-02-21 23:59:16
前言 项目需要多线程执行一些task,为了方便各个服务的使用。特意封装了一个公共工具类,下面直接撸代码: poolconfig(线程池核心配置参数): /**...

前言

项目需要多线程执行一些 task,为了方便各个服务使用,特意封装了一个公共工具类,下面直接上代码:

poolconfig(线程池核心配置参数):

/**
 * Core sizing parameters for the shared task thread pool.
 *
 * <ul>
 * <li>{@code queuecapacity} - capacity of the executor's work queue (initially 200)</li>
 * <li>{@code count} - core (base) number of pool threads (initially 0)</li>
 * <li>{@code maxcount} - maximum number of pool threads (initially 0)</li>
 * <li>{@code alivesec} - keep-alive time of idle pool threads, in seconds (initially 0)</li>
 * </ul>
 *
 * <p>This is a plain mutable holder: the setters store whatever they are given without
 * validation. {@code taskpool.init()} only applies a value to the executor when it is
 * greater than zero, so a value of 0 effectively means "not configured".
 */
public class poolconfig {

    /** Capacity of the executor's work queue. */
    private int queuecapacity = 200;

    /** Core (base) number of pool threads. */
    private int count;

    /** Maximum number of pool threads. */
    private int maxcount;

    /** Keep-alive time of idle pool threads, in seconds. */
    private int alivesec;

    // ---- queuecapacity ----

    public int getqueuecapacity() {
        return this.queuecapacity;
    }

    public void setqueuecapacity(int queuecapacity) {
        this.queuecapacity = queuecapacity;
    }

    // ---- count ----

    public int getcount() {
        return this.count;
    }

    public void setcount(int count) {
        this.count = count;
    }

    // ---- maxcount ----

    public int getmaxcount() {
        return this.maxcount;
    }

    public void setmaxcount(int maxcount) {
        this.maxcount = maxcount;
    }

    // ---- alivesec ----

    public int getalivesec() {
        return this.alivesec;
    }

    public void setalivesec(int alivesec) {
        this.alivesec = alivesec;
    }
}

threadpoolconfig(线程池配置 yml配置项以thread开头):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
 
/**
 * Thread-pool settings bound from configuration properties that start with {@code thread}.
 *
 * <ul>
 * <li>{@code pool} - core pool sizing, see {@link poolconfig}
 *     (for example {@code thread.pool.alivesec})</li>
 * <li>{@code count} - task name mapped to the number of times that task is submitted
 *     (for example {@code thread.count.needexcutortask=4})</li>
 * </ul>
 */
@Component
@ConfigurationProperties(prefix = "thread")
public class threadpoolconfig {

    /** Core pool sizing; starts out as a default-constructed {@link poolconfig}. */
    private poolconfig pool = new poolconfig();

    /**
     * Task name to submission count. There is no setter: the map is exposed through
     * {@link #getcount()} and must therefore stay mutable so entries can be added to it.
     */
    private Map<String, Integer> count = new HashMap<>();

    /**
     * @return the core pool sizing parameters
     */
    public poolconfig getpool() {
        return pool;
    }

    /**
     * @param pool the core pool sizing parameters to use
     */
    public void setpool(poolconfig pool) {
        this.pool = pool;
    }

    /**
     * @return the live (mutable) map of task name to submission count
     */
    public Map<String, Integer> getcount() {
        return count;
    }

}

定义task注解,方便使用:

@target(elementtype.type)
@retention(retentionpolicy.runtime)
@documented
@component
public @interface excutortask {
 
 /**
 * the value may indicate a suggestion for a logical excutortask name,
 * to be turned into a spring bean in case of an autodetected excutortask .
 * @return the suggested excutortask name, if any
 */
 string value() default "";
 
}

通过类路径扫描和反射获取使用 task 注解的任务集合:

public class beans {
 
 private static final char prefix = '.';
 
 public static concurrentmap<string, string> scanbeanclassnames(){
 concurrentmap<string, string> beanclassnames = new concurrenthashmap<>();
 classpathscanningcandidatecomponentprovider provider = new classpathscanningcandidatecomponentprovider(false); 
   provider.addincludefilter(new annotationtypefilter(excutortask.class));
   for(package pkg : package.getpackages()){
   string basepackage = pkg.getname();
     set<beandefinition> components = provider.findcandidatecomponents(basepackage); 
     for (beandefinition component : components) {
     string beanclassname = component.getbeanclassname();
     try {
    class<?> clazz = class.forname(component.getbeanclassname());
    boolean isannotationpresent = clazz.isannotationpresent(zimatask.class);
    if(isannotationpresent){
     zimatask task = clazz.getannotation(excutortask.class);
     string aliasname = task.value();
     if(aliasname != null && !"".equals(aliasname)){
     beanclassnames.put(aliasname, component.getbeanclassname());
     }
    }
    } catch (classnotfoundexception e) {
    e.printstacktrace();
    }
     beanclassnames.put(beanclassname.substring(beanclassname.lastindexof(prefix) + 1), component.getbeanclassname());
     }
   }
   return beanclassnames;
  } 
}

 线程执行类taskpool:

@component
public class taskpool {
 
 public threadpooltaskexecutor pooltaskexecutor;
 
 @autowired 
 private threadpoolconfig threadpoolconfig;
 
 @autowired 
 private applicationcontext context;
 
 private final integer max_pool_size = 2000;
 
 private poolconfig poolcfg;
 
 private map<string, integer> taskscount;
 
 private concurrentmap<string, string> beanclassnames;
 
 @postconstruct
  public void init() {
 
 beanclassnames = beans.scanbeanclassnames();
   
   pooltaskexecutor = new threadpooltaskexecutor();
   
   poolcfg = threadpoolconfig.getpool();
 
 taskscount = threadpoolconfig.getcount();
 
 int corepoolsize = poolcfg.getcount(), 
  maxpoolsize = poolcfg.getmaxcount(), 
  queuecapacity = poolcfg.getqueuecapacity(), 
  minpoolsize = 0, maxcount = (corepoolsize << 1);
 
 for(string taskname : taskscount.keyset()){
  minpoolsize += taskscount.get(taskname);
 }
 
 if(corepoolsize > 0){
  if(corepoolsize <= minpoolsize){
  corepoolsize = minpoolsize;
  }
 }else{
  corepoolsize = minpoolsize;
 }
 
 if(queuecapacity > 0){
  pooltaskexecutor.setqueuecapacity(queuecapacity);
 }
 
 if(corepoolsize > 0){
  if(max_pool_size < corepoolsize){
  corepoolsize = max_pool_size;
  }
  pooltaskexecutor.setcorepoolsize(corepoolsize);
 }
 
 if(maxpoolsize > 0){
  if(maxpoolsize <= maxcount){
  maxpoolsize = maxcount;
  }
  if(max_pool_size < maxpoolsize){
  maxpoolsize = max_pool_size;
  }
  pooltaskexecutor.setmaxpoolsize(maxpoolsize);
 }
 
 if(poolcfg.getalivesec() > 0){
  pooltaskexecutor.setkeepaliveseconds(poolcfg.getalivesec());
 }
 
 pooltaskexecutor.initialize();
  }
  
 public void execute(class<?>... clazz){
 int i = 0, len = taskscount.size();
 for(; i < len; i++){
  integer taskcount = taskscount.get(i);
  for(int t = 0; t < taskcount; t++){
  try{
   object taskobj = context.getbean(clazz[i]);
   if(taskobj != null){
   pooltaskexecutor.execute((runnable) taskobj);
   }
  }catch(exception ex){
   ex.printstacktrace();
  }
  }
 }
  }
  
 public void execute(string... args){
   int i = 0, len = taskscount.size();
 for(; i < len; i++){
  integer taskcount = taskscount.get(i);
  for(int t = 0; t < taskcount; t++){
  try{
   object taskobj = null;
   if(context.containsbean(args[i])){
   taskobj = context.getbean(args[i]);
   }else{
   if(beanclassnames.containskey(args[i].tolowercase())){
    class<?> clazz = class.forname(beanclassnames.get(args[i].tolowercase()));
    taskobj = context.getbean(clazz);
   }
   }
   if(taskobj != null){
   pooltaskexecutor.execute((runnable) taskobj);
   }
  }catch(exception ex){
   ex.printstacktrace();
  }
  }
 }
  }
 
 public void execute(){
 for(string taskname : taskscount.keyset()){
  integer taskcount = taskscount.get(taskname);
  for(int t = 0; t < taskcount; t++){
  try{
   object taskobj = null;
   if(context.containsbean(taskname)){
   taskobj = context.getbean(taskname);
   }else{
   if(beanclassnames.containskey(taskname)){
    class<?> clazz = class.forname(beanclassnames.get(taskname));
    taskobj = context.getbean(clazz);
   }
   }
   if(taskobj != null){
   pooltaskexecutor.execute((runnable) taskobj);
   }
  }catch(exception ex){
   ex.printstacktrace();
  }
  }
 }
  }
  
}

如何使用?(做事就要做全套 ^_^)

1.因为使用的是 Spring Boot 项目,需要在 application.properties 或者 application.yml 中添加

#配置执行的task线程数
thread.count.needexcutortask=4
#空闲线程的存活时间(单位:秒,对应 setKeepAliveSeconds)
thread.pool.alivesec=300000
#其他配置同理

2.将我们写的线程配置进行装载到我们的项目中

/**
 * Starts the configured tasks once this bean has been wired.
 */
@Configuration
public class taskmanager {

    @Resource
    private taskpool taskpool;

    /**
     * Runs after dependency injection and submits every task that has a configured count.
     */
    @PostConstruct
    public void executor() {
        taskpool.execute();
    }
}

3.具体使用

@excutortask
public class needexcutortask implements runnable{
  @override
 public void run() {
    thread.sleep(1000l);
    log.info("====== 任务执行 =====")
  }
}

以上就是创建一个可扩展的线程池相关的配置(望指教~~~)。希望对大家的学习有所帮助,也希望大家多多支持。