欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java大数据量(多线程)分段分批处理

程序员文章站 2022-07-13 13:23:23
...

分段处理主类

github地址:
https://github.com/zuojingang/common-tools-intergrated/blob/master/src/main/java/pers/zuo/component/piecewise/PiecewiseHandler.java

package pers.zuo.component.piecewise;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import pers.zuo.component.piecewise.bean.PiecewiseKey;
import pers.zuo.component.piecewise.bean.PiecewiseResult;
import pers.zuo.component.piecewise.bean.PiecewiseTask;

/**
 * @author zuojingang
 *
 * @param <T>
 *            the type of part process return
 */
public abstract class PiecewiseHandler<V> {

    public void nThreads(
            final Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> nThreadResult,
            final int totalNum) throws Exception {
        nThreads(nThreadResult, totalNum, D_THREAD_SIZE, D_PART_SIZE);
    }

    /**
     * @param totalNum
     * @param threadSize
     * @return nThreads process result.
     */
    public void nThreads(
            final Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> nThreadResult,
            final int totalNum, final int threadSize, final int partSize) throws Exception {

        if (null == nThreadResult || 0 >= totalNum || 0 >= threadSize) {
            return;
        }

        ExecutorService fixThreadPool = Executors.newFixedThreadPool(D_N_THREAD);
        List<PiecewiseTask> fTaskList = new ArrayList<>();

        int fromIndex = 0;
        try {
            while (totalNum > fromIndex) {

                final int thisFromIndex = fromIndex;
                final int threadProcessNum = Math.min(totalNum - fromIndex, threadSize);
                final int thisToIndex = thisFromIndex + threadProcessNum;

                if (0 < threadProcessNum) {
                    PiecewiseTask futureTask = PiecewiseBuilder.buildTask(new Callable<Boolean>() {

                        @Override
                        public Boolean call() throws Exception {
                            final Map<PiecewiseKey, PiecewiseResult<V>> threadResult = PiecewiseBuilder
                                    .initializeThreadResult();
                            nThreadResult.put(PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex),
                                    PiecewiseBuilder.buildResult(threadResult));
                            singleThread(threadResult, thisFromIndex, threadProcessNum, partSize);
                            return true;
                        }
                    }, PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex));

                    fixThreadPool.submit(futureTask);
                    fTaskList.add(futureTask);
                }
                fromIndex += threadProcessNum;
            }

            boolean finished = true;
            for (PiecewiseTask futureTask : fTaskList) {
                try {
                    finished = finished && futureTask.get();
                } catch (InterruptedException | ExecutionException e) {
                    nThreadResult.get(futureTask.getTaskKey()).setException(e);
                }
            }
        } catch (Exception e) {
            throw e;
        } finally {
            // the threadPool must manual-lock after use
            fixThreadPool.shutdown();
        }
    }

    public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int totalNum) {
        singleThread(threadResult, 0, totalNum);
    }

    public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int offset,
            final int totalNum) {
        singleThread(threadResult, offset, totalNum, D_PART_SIZE);
    }

    /**
     * @param offset
     * @param toIndex
     * @param partSize
     * @return process subList values and include first index(offset) and exclude
     *         latest index(offset + totalNum)
     */
    public void singleThread(final Map<PiecewiseKey, PiecewiseResult<V>> threadResult, final int offset,
            final int totalNum, final int partSize) {
        if (0 >= totalNum || 0 >= partSize) {
            return;
        }
        final int toIndex = offset + totalNum;

        int fromIndex = offset;
        while (toIndex > fromIndex) {

            int thisToIndex = Math.min(fromIndex + partSize, toIndex);

            V partResult = null;
            Exception pe = null;
            try {
                partResult = partProcess(fromIndex, thisToIndex);
            } catch (Exception e) {
                pe = e;
            }
            threadResult.put(PiecewiseBuilder.buildKey(fromIndex, thisToIndex),
                    PiecewiseBuilder.buildResult(partResult, pe));

            fromIndex = thisToIndex;
        }
    }

    /**
     * @param offset
     * @param partSize
     * @return part process result
     */
    protected abstract V partProcess(final int fromIndex, final int toIndex) throws Exception;

    public static final int D_N_THREAD = 10;
    public static final int D_THREAD_SIZE = 10000;
    public static final int D_PART_SIZE = 1000;
}

分段任务定制类

package pers.zuo.component.piecewise.bean;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/**
 * @author zuojingang
 *
 * @param <K
 *            extends Number> the type of part process return
 */
public class PiecewiseTask extends FutureTask<Boolean> {

    private final PiecewiseKey taskKey;

    public PiecewiseTask(Callable<Boolean> callable, PiecewiseKey taskKey) {
        super(callable);
        this.taskKey = taskKey;
    }

    public PiecewiseKey getTaskKey() {
        return taskKey;
    }

}

分段任务Key值类

package pers.zuo.component.piecewise.bean;

public class PiecewiseKey {

    private final Integer from;
    private final Integer to;

    public PiecewiseKey(Integer from, Integer to) {
        super();
        this.from = from;
        this.to = to;
    }

    public Integer getFrom() {
        return from;
    }

    public Integer getTo() {
        return to;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((from == null) ? 0 : from.hashCode());
        result = prime * result + ((to == null) ? 0 : to.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PiecewiseKey other = (PiecewiseKey) obj;
        if (from == null) {
            if (other.from != null)
                return false;
        } else if (!from.equals(other.from))
            return false;
        if (to == null) {
            if (other.to != null)
                return false;
        } else if (!to.equals(other.to))
            return false;
        return true;
    }
}

分段任务返回值类

package pers.zuo.component.piecewise.bean;

public class PiecewiseResult<V> {

    private final V val;
    private Exception exception;

    public PiecewiseResult(V val) {
        super();
        this.val = val;
    }

    public PiecewiseResult(V val, Exception exception) {
        super();
        this.val = val;
        this.exception = exception;
    }

    public Exception getException() {
        return exception;
    }

    public void setException(Exception exception) {
        this.exception = exception;
    }

    public V getVal() {
        return val;
    }
}

获取实例工具类

package pers.zuo.component.piecewise.manager;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

import pers.zuo.component.piecewise.bean.PiecewiseKey;
import pers.zuo.component.piecewise.bean.PiecewiseResult;
import pers.zuo.component.piecewise.bean.PiecewiseTask;

public class PiecewiseBuilder {

    public static PiecewiseKey buildKey(Integer from, Integer to) {
        return new PiecewiseKey(from, to);
    }

    public static <V> PiecewiseResult<V> buildResult(V val) {
        return new PiecewiseResult<V>(val);
    }

    public static <V> PiecewiseResult<V> buildResult(V val, Exception exception) {
        return new PiecewiseResult<V>(val, exception);
    }

    public static PiecewiseTask buildTask(Callable<Boolean> callable, PiecewiseKey taskKey) {
        return new PiecewiseTask(callable, taskKey);
    }

    /**
     * this method aimed for simple when define the nThreadResult
     * 
     * @return
     */
    public static <V> Map<PiecewiseKey, PiecewiseResult<Map<PiecewiseKey, PiecewiseResult<V>>>> initializeNThreadResult() {
        return new HashMap<>();
    }

    /**
     * this method aimed for simple when define the threadResult
     * 
     * @return
     */
    public static <V> Map<PiecewiseKey, PiecewiseResult<V>> initializeThreadResult() {
        return new HashMap<>();
    }
}
相关标签: Java大数据量处理