这两个月来因为工作和家庭的事情,导致一直都很忙,没有多少时间去汲取养分,也就没有什么产出,最近稍微轻松了一点,后续的【进阶之路】会慢慢回到正轨。

开门见山的说,第一次接触到多线程处理同一个任务,是使用IO多线程下载文件,之后也一直没有再处理这一块的任务,直到前几天有同事问我,为什么多线程处理一个list集合会出现各种bug,以及如何使用多线程的方式处理同一个list集合。

第一、为什么会出现类似于重复处理某一个模块的问题?

我们都知道,在Java中,每个线程都有自己独立的工作内存,线程对共享变量的所有操作都必须在自己的工作内存中进行,不能直接从主内存中读写。

如果线程1的修改内容想被线程2得到,那么线程1工作内存中修改后的共享变量需要先刷新到主内存中,再把主内存中更新过的共享变量更新到工作内存2中。

image.png

这个时候一般我们是考虑使用java中各种同步化的方法,首先,因为是需要高效处理list集合,所以可以排除synchronized方法,于是我想到了使用CompletionService操作异步任务。

大家可以在这篇文章看到具体的详解:
【进阶之路】线程池拓展与CompletionService操作异步任务

一、CompletionService

首先,按照之前文章的方法自定义一个WeedThreadPool

public class WeedThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime =new ThreadLocal<>();
    private final Logger log =Logger.getLogger("WeedThreadPool");
    //统计执行次数
    private final AtomicLong numTasks =new AtomicLong();
    //统计总执行时间
    private final AtomicLong totalTime =new AtomicLong();
    /**
     * 这里是实现线程池的构造方法,我随便选了一个,大家可以根据自己的需求找到合适的构造方法
     */
    public WeedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
}

然后就是实现线程池处理list集合的方法

public class WeedExecutorServiceDemo {
    BlockingQueue<Runnable> taskQueue;
    final static WeedThreadPool weedThreadPool = new WeedThreadPool(3, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
    // 开始时间

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //记录任务开始时间
        long start = System.currentTimeMillis();
        CompletionService<List<Integer>> cs = new ExecutorCompletionService<>(weedThreadPool);
        int tb=1;
        //生成集合
        List<List<Integer>> list1 =new ArrayList();
        for (int i = 0; i < 10; i++) {
            List<Integer> list =new ArrayList();
            //随机生成任务处理
            int hb=tb;
            tb =tb*2;
            int finalTb = tb;
            cs.submit(new Callable<List<Integer>>(){

                @Override
                public List<Integer> call() throws Exception {
                    for (int j = hb; j< finalTb; j++){
                        list.add(j);
                    }
                    System.out.println(Thread.currentThread().getName()+"["+list+"]");

                    return list;
                }
            });
        }
        //注意在处理完毕后结束任务
        weedThreadPool.shutdown();
        for (int i = 0; i < 10; i++) {
            Future<List<Integer>> future = cs.take();
            if (future != null) {
                list1.add(future.get());
                System.out.println(future.get());
            }
        }
        System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
        System.out.println("結果["+list1.size()+"]==="+list1);
    }
}

处理结果:

image.png

从结果上来看,还是比较美好的,通过CompletionService能够比较快速地分段处理任务,我之前也有提过,合理的线程池大小设计有助于提高任务的处理效率,网上通用的设置方法一般是这样的:

最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

进而得出

最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

二、ForkJoinPool

当然,除了使用CompletionService之外,也可以使用ForkJoinPool来设计一个处理方法。

image.png

ForkJoinPool和ThreadPoolExecutor都是继承自AbstractExecutorService抽象类,所以它和ThreadPoolExecutor的使用几乎没有多少区别。其核心思想是将大的任务拆分成多个小任务,然后在将多个小任务处理汇总到一个结果上。

ForkJoinPool框架通过初始化ForkJoinTask来执行任务,并提供了以下两个子类:

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask :用于有返回结果的任务。

我们实现的过程中可以使用RecursiveTask方法来分段处理list集合。

public class RecursiveTaskDemo {

    private static final ExecutorService executor = new ThreadPoolExecutor(2, 3, 10, TimeUnit.SECONDS, new LinkedBlockingQueue(10));
    private static final int totalRow = 53000;
    private static final int splitRow = 10000;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        //先循环生成待待处理集合
        List<Integer> list = new ArrayList<>(totalRow);
        for (int i = 0; i < totalRow; i++) {
            list.add(i);
        }
        //计算出需要创建的任务数
        int loopNum = (int)Math.ceil((double)totalRow/splitRow);
        ForkJoinPool pool = new ForkJoinPool(loopNum);
        ForkJoinTask<List> submit = pool.submit(new MyTask(list, 0, list.size()));

        List<List<Integer>>list1=new ArrayList<>();
        list1.add(submit.get());
        System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
        System.out.println("結果["+list1.size()+"]==="+list1);
    }
    //继承RecursiveTask
    static class MyTask extends RecursiveTask<List> {
        private List<Integer> list;
        private int startRow;
        private int endRow;

        public MyTask(List<Integer> list, int startRow, int endRow) {
            this.list = list;
            this.startRow = startRow;
            this.endRow = endRow;
        }

        /**
         * 递归处理数据,计算
         * @return
         */
        @Override
        protected List compute() {
            if (endRow - startRow <= splitRow) {
                List<Integer> ret = new ArrayList<>();
                for (int i = startRow; i < endRow; i++) {
                    //递归处理数据
                    ret.add(list.get(i));
                }
                System.out.println(Thread.currentThread().getName()+"["+ret+"]");
                return ret;
            }
            int loopNum = (int)Math.ceil((double)totalRow/splitRow);
            int startRow = 0;
            List<MyTask> myTaskList = new ArrayList<>();
            for (int i = 0; i < loopNum; i++) {
                if (startRow > totalRow) {
                    break;
                }
                int endRow = Math.min(startRow + splitRow, totalRow);
                System.out.println(String.format("startRow:%s, endRow:%s", startRow, endRow));
                myTaskList.add(new MyTask(list, startRow, endRow));
                startRow += splitRow;
            }
            //调用不同线程上独立执行的任务
            invokeAll(myTaskList);
            List<Integer> ret = new ArrayList<>();
            //归并
            for (MyTask myTask : myTaskList) {
                ret.addAll(myTask.join());
            }
            return ret;
        }
    }
}

处理结果:

image.png

通过上文展示的方法,大家可以在不加锁的方式来增加任务处理的效率,遇到类似于爬虫数据处理、数据迁移等场景都可以采用,实测效果还不错。当然,根据处理结果来分析,CompletionService的效率大概更高一些~。

大家好,我是练习java两年半时间的南橘,下面是我的微信,需要之前的导图或者想互相交流经验的小伙伴可以一起互相交流哦。

版权声明:本文为nanjuryc原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/nanjuryc/p/14850513.html