自己手写一个简易版本的线程池【Java】【详细注解,含思考过程和知识点】

news/2024/5/20 6:11:31 标签: java, jvm, 开发语言, juc, 线程池
java">@Slf4j
public class Test15 {

    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (taskQueue, task) -> {
            // 这里实现自己的拒绝策略

            // 死等
//            taskQueue.put(task);

            // 带超时时间的阻塞添加
            taskQueue.offer(task, 1000, TimeUnit.MILLISECONDS);
        });

        for (int i = 0; i < 3; i++) {
            int j = i;
            pool.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("执行任务:{}", j);
            });
        }

    }
}

@Slf4j
// 自己手写一个简易的线程池
class ThreadPool {
    /**
     * 什么是线程池呢?
     * 顾名思义,线程池就是一个池子里面,有许多的线程。
     * 线程池是与工作队列密切相关的,其中在工作对垒中保存了所有等待执行的任务。
     * 线程池里的工作线程(Worker)的任务很简单:
     * 就是从工作队列中获取一个任务,然后执行,执行完之后,返回线程池,等待下一个任务。
     * <p>
     * 线程池有啥好处呢?
     * 可以重复使用线程池中的线程。避免大量的线程创建和销毁(开销巨大的操作)。从而提升效率。
     * 通过适当调整线程池的大小,可以充分的利用 cpu ,让 cpu 一直处于忙碌的状态,不让他闲下来。
     * 同时可以避免过多的线程,相互竞争,导致大量的上下文切换,影响效率。
     */

    // 存放任务的阻塞队列
    private BlockingQueue<Runnable> taskQueue;

    // 工作线程集合
    private Set<Worker> workers = new HashSet<>();

    // 核心线程数量
    private int coreSize;

    // 获取任务的超时时间
    private long timeout;

    private TimeUnit timeUnit;

    // 拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;


    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int capacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        taskQueue = new BlockingQueue<>(capacity);
        this.rejectPolicy = rejectPolicy;
    }

    // 让线程池来执行任务
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作线程的数量小于 coreSize,则直接新建线程来执行任务
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                log.debug("新增了 worker 线程{}", worker);
                workers.add(worker);
                worker.start();
            } else {
                // 如果工作线程的数量 >= coreSize,则将任务加入到任务队列中,让空闲的工作线程去获取执行
//                taskQueue.put(task);
                // 拒绝策略:【把具体的实现,交给调用者来自己写】
                // 1.死等
                // 2.带超时时间的等待
                // 3.让调用者放弃任务的执行
                // 4.让调用者抛出异常
                // 5.让调用者自己执行

                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        // 1.当task不为空时,直接执行任务
        // 2.当task为空时,去任务队列中去获取任务
        @Override
        public void run() {
//            while (task != null || (task = taskQueue.take()) != null) {
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    log.debug("执行任务 {} ", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            // 代码运行到这里,说明任务队列中也没有任务了。移除掉这个 worker 线程
            synchronized (workers) {
                log.debug("移除 worker 工作线程 {}", this);
                workers.remove(this);
            }
        }
    }
}

@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> taskQueue, T task);
}

@Slf4j
class BlockingQueue<T> {

    // 任务队列
    private Deque<T> queue = new ArrayDeque<>();

    // 锁,防止多个线程同时获取头部的任务时,发生并发安全问题,导致一个任务被重复执行多次;
    // 防止多个线程同时向尾部加入新的任务时,发生并发安全问题
    private ReentrantLock lock = new ReentrantLock();

    // 生产者的条件变量,当队列满的时候,生产者线程就到 fullWaitSet 休息
    private Condition fullWaitSet = lock.newCondition();

    // 消费者的条件变量,当队列空的时候,消费者线程就到 emptyWaitSet 休息
    private Condition emptyWaitSet = lock.newCondition();

    // 队列容量
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // 加入任务
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                // 进入 fullWaitSet 休息室等待
                try {
                    log.debug("任务 {} 等待加入任务队列 ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("任务 {} 加入任务队列", task);
            queue.addLast(task);
            // 记得唤醒 emptyWaitSet 休息室中等待的线程
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
                // 进入 fullWaitSet 休息室等待
                try {
                    log.debug("任务 {} 等待加入任务队列 ...", task);
                    if (nanos <= 0) {
                        return false;
                    }
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("任务 {} 加入任务队列", task);
            queue.addLast(task);
            // 记得唤醒 emptyWaitSet 休息室中等待的线程
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // 一直死等的阻塞获取
    public T take() {
        lock.lock();
        try {
            while (queue.size() == 0) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            // 记得唤醒 fullWaitSet 休息室中等待的线程
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间的阻塞获取
    public T poll(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == 0) {
                try {
                    if (nanos <= 0) {
                        return null;
                    }
                    // 返回值是:还要等待的时间。
                    // 将返回值赋值给 nanos,这样被虚假唤醒之后,还要重头开始等待那么多的时间。
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T task = queue.removeFirst();
            // 记得唤醒 fullWaitSet 休息室中等待的线程
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    // 获取队列大小
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else {
                // 任务队列还没满
                log.debug("加入任务队列 {}", task);
                queue.add(task);
                emptyWaitSet.signal();;
            }
        } finally {
            lock.unlock();
        }
    }
}

http://www.niftyadmin.cn/n/1326904.html

相关文章

XHR如何爬虫_程序员如何炼成 Python 爬虫“王者”?

作者 | 周萝卜责编 | 郭芮出品 | CSDN(ID&#xff1a;CSDNnews)本文章精选了五个爬虫实例&#xff0c;希望能够给想要入门 Python 爬虫的小伙伴儿们一些帮助。网易精选评价爬取首先来看一个网易精选网站的爬虫例子&#xff0c;可以爬取评价的商品很多&#xff0c;这里选择“iPh…

python网络爬虫_Python连载(一):网络爬虫基础及pythpon环境搭建

原标题&#xff1a;Python连载(一)&#xff1a;网络爬虫基础及pythpon环境搭建从今天开始&#xff0c;我们的Python连载正式开始啦&#xff5e;接下来我们会给大家分享Python网络爬虫的相关技术课程。一、我们先来了解下什么是网络爬虫&#xff1f;网络爬虫又被称为网页蜘蛛、网…

js数组获取index_浅浅谈vue.js

1、Vue的生命周期在Vue官方文档中生命周期函数有如下定义每个 Vue 实例在被创建时都要经过一系列的初始化过程——例如&#xff0c;需要设置数据监听、编译模板、将实例挂载到 DOM 并在数据变化时更新 DOM 等。同时在这个过程中也会运行一些叫做生命周期钩子的函数&#xff0c;…

iphone储存空间系统怎么清理_手机资讯:苹果iPhone内存不够用怎么办如何清理内存...

如今使用IT数码设备的小伙伴们是越来越多了&#xff0c;那么IT数码设备当中是有很多知识的&#xff0c;这些知识很多小伙伴一般都是不知道的&#xff0c;就好比最近就有很多小伙伴们想要知道苹果iPhone内存不够用怎么办如何清理内存&#xff0c;那么既然现在大家对于苹果iPhone…

win10怎么自动关机_【电脑】第31期分享:Win10如何解决svchost一直占用网速和内存?...

【电脑】第31期分享svchost.exe是一个属于微软Windows操作系统的系统程序&#xff0c;微软官方对它的解释是&#xff1a;svchost.exe 是从动态链接库(DLL)中运行的服务的通用主机进程名称。svchost不仅会占有电脑内存&#xff0c;还会抢你的网&#xff0c;今天给大家分享如何关…

单脉冲雷达的相干干扰的研究文章_薛定谔的猫终于有救了:Nature 研究首次观测到量子跃迁过程...

授权转自&#xff1a;公众号“科研圈” 耶鲁大学最新发表在 Nature上的一项研究表明&#xff0c;我们能够计算出某个时间量子跃迁发生的概率&#xff0c;从而预测“薛定谔的猫”的命运。实验首次捕捉到了跃迁中的量子系统&#xff0c;这意味着量子跃迁并非玻尔和海森堡所认为的…

PLSQL如何将千万数据快速插入到另一张表中_excel数据透视表:善用这些功能,提高工作效率!下篇...

编按&#xff1a;哈喽&#xff0c;大家好&#xff01;在上篇文章中&#xff0c;我们为大家分享了透视表的前5条妙用&#xff0c;分别是合并同类项、按条件汇总数据、统计非重复数据、排名、批量创建表格&#xff0c;不知道大家都还记得吗&#xff1f;那么今天我们书接上回&…

AD19无法生成PCB_入门的基本PCB布局设计步骤

准备是成功的基石&#xff0c;在PCB设计中也是如此。改进和增长将伴随经验&#xff0c;首先做好准备能够充分利用经验获得成功。为了帮助你做好准备&#xff0c;以下是一些基本的PCB布局设计步骤。从良好的原材料入手是您PCB布局设计的第一步无论打算执行什么任务&#xff0c;要…