JUC/多线程的基本使用(一)

news/2024/5/20 6:11:33 标签: Java, JUC

一、基本使用

Thread、Runnable、FutureTask

Java多线程-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/m0_71534259/article/details/132381495?spm=1001.2014.3001.5501

二、查看进程线程的方法

windows

任务管理器可以查看进程和线程数,也可以用来杀死进程
tasklist 查看进程
taskkill 杀死进程

linux

ps - fe 查看所有进程
ps - fT - p <PID> 查看某个进程( PID )的所有线程
kill 杀死进程
top 按大写 H 切换是否显示线程
top - H - p <PID> 查看某个进程( PID )的所有线程

Java

jps 命令查看所有 Java 进程
jstack <PID> 查看某个 Java 进程( PID )的所有线程状态
jconsole 来查看某个 Java 进程中线程的运行情况(图形界面)

jconsole 远程监控配置

需要以如下方式运行你的 java
java -Djava.rmi.server.hostname=ip地址 -Dcom.sun.management.jmxremote -
Dcom.sun.management.jmxremote.port=连接端口 -Dcom.sun.management.jmxremote.ssl=是否安全连接 -
Dcom.sun.management.jmxremote.authenticate=是否认证 java类
修改 /etc/hosts 文件将 127.0.0.1 映射至主机名
如果要认证访问,还需要做如下步骤
复制 jmxremote.password 文件
修改 jmxremote.password jmxremote.access 文件的权限为 600 即文件所有者可读写
连接时填入 controlRole (用户名), R&D (密码)

三、线程的常见方法

(一)、run与start

直接调用 run 是在主线程中执行了 run ,没有启动新的线程
使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码

(二)、sleep与yield

sleep
1. 调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)
2. 其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
3. 睡眠结束后的线程未必会立刻得到执行
4. 建议用 TimeUnit sleep 代替 Thread sleep 来获得更好的可读性
yield
1. 调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程
2. 具体的实现依赖于操作系统的任务调度器

(三)、interrupt 方法详解

1、打断 sleepwaitjoin 的线程

这几个方法都会让线程进入阻塞状态
打断 这些 线程,会抛出InterruptedException异常,会清空打断状态( isInterrupted = false ),同时并不会退出当前线程,只是打断了当前状态(如打断睡眠状态。)

2、打断正常运行的线程

打断正常运行的线程 , 不会清空打断状态
private static void test2() throws InterruptedException {
 Thread t2 = new Thread(()->{
 while(true) {
     Thread current = Thread.currentThread();
     boolean interrupted = current.isInterrupted();
     if(interrupted) {
         log.debug(" 打断状态: {}", interrupted);
         break;
     }
 }
 }, "t2");
 t2.start();
 sleep(0.5);
 t2.interrupt();
}

正常运行的线程被interrupt()后,并不是直接终止线程,而是把打断标记置为true,然后由编程者确定是否打断,isInterrupted 也不会重置,isInterrupted = true

3、两阶段终止(优雅的终止线程)

package com.itcast.test;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.Test5")
public class Test5 {
    public static void main(String[] args) throws InterruptedException {

        TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();

        twoPhaseTermination.start();

        Thread.sleep(3500);

        twoPhaseTermination.stop();
    }
}

@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination{
    private Thread monitor;

    public void start(){
        monitor = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    Thread current = Thread.currentThread();
                    if (current.isInterrupted()){
                        log.debug("进程结束之前进行的工作(料理后事)");
                        break;
                    }
                    try {
                        Thread.sleep(1000);  //睡眠过程中被打断,抛出异常,isInterrupted重置为false
                        log.debug("执行业务的相关功能"); //此时被打断,直接在下次循环中执行打断程序
                    } catch (InterruptedException e){
                        log.debug("在睡眠时被打断");
                        current.interrupt(); //将打断标记重新置为true,下次循环时执行结束程序
                    }
                }
            }
        });
        
        monitor.start();
    }

    public void stop(){
        monitor.interrupt();
    }
}

4、打断 park 线程

打断 park 线程 , 不会清空打断状态
private static void test3() throws InterruptedException {
 Thread t1 = new Thread(() -> {
 log.debug("park...");
 LockSupport.park();
 log.debug("unpark...");
 log.debug("打断状态:{}", Thread.currentThread().isInterrupted());
 }, "t1");
 t1.start();
 sleep(0.5);
 t1.interrupt();
}
如果打断标记已经是 true, park 会失效
可以使用 Thread.interrupted() 清除打断状态
private static void test4() {
 Thread t1 = new Thread(() -> {
 for (int i = 0; i < 5; i++) {
 log.debug("park...");
 LockSupport.park();
 log.debug("打断状态:{}", Thread.currentThread().isInterrupted());
 }
 });
 t1.start();
 sleep(1);
 t1.interrupt();
}

(四)、主线程与守护线程

默认情况下, Java 进程需要等待所有线程都运行结束,才会结束。有一种特殊的线程叫做守护线程,只要其它非守 护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。
// 设置该线程为守护线程
t1.setDaemon(true);
垃圾回收器线程就是一种守护线程
Tomcat 中的 Acceptor Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等
待它们处理完当前请求

四、线程的状态

(一)、五种状态

(二)、六种状态

这是从 Java API 层面来描述的
根据 Thread.State 枚举,分为六种状态
NEW 线程刚被创建,但是还没有调用 start() 方法
RUNNABLE 当调用了 start() 方法之后,注意, Java API 层面的 RUNNABLE 状态涵盖了 操作系统 层面的 【可运行状态】、【运行状态】和【阻塞状态】(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为 是可运行)
BLOCKED(例如因为得不到锁导致的阻塞)  WAITING(例如join(),一直等待造成的阻塞)  TIMED_WAITING(例如sleep()时的阻塞状态)  都是 Java API 层面对【阻塞状态】的细分,后面会在状态转换一节 详述
TERMINATED 当线程代码运行结束

五、共享模型之无锁

(一)、CAS volatile

class AccountSafe {
    private AtomicInteger balance;

    public AccountSafe(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }
    
    public Integer getBalance() {
        return balance.get();
    }
    
    public void withdraw(Integer amount) {
        while (true) {
            int prev = balance.get();
            int next = prev - amount;
            if (balance.compareAndSet(prev, next)) {
                break;
            }
        }
        // 可以简化为下面的方法
        // balance.addAndGet(-1 * amount);
    }
}
获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
注意
volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原
子性)
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

(二)、原子整数

J.U.C 并发包提供了:
  • AtomicBoolean
  • AtomicInteger
  • AtomicLong
AtomicInteger 为例
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

(三)、原子引用(更改引用本身)

为什么需要原子引用类型?
  • AtomicReference
  • AtomicMarkableReference
  • AtomicStampedReference
有如下方法

1、AtomicReference

public interface DecimalAccount {
    // 获取余额
    BigDecimal getBalance();
    // 取款
    void withdraw(BigDecimal amount);
    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(DecimalAccount account) {
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(BigDecimal.TEN);
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }
}
class DecimalAccountSafeCas implements DecimalAccount {
    AtomicReference<BigDecimal> ref;

    public DecimalAccountSafeCas(BigDecimal balance) {
        ref = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return ref.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = ref.get();
            BigDecimal next = prev.subtract(amount);
            if (ref.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}

2、AtomicStampedReference

ABA问题

A->B->A 在比对时最终仍然是A,仍然可以进行

ABA问题的解决

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // 获取值 A
    String prev = ref.getReference();
    // 获取版本号
    int stamp = ref.getStamp();
    log.debug("版本 {}", stamp);
    // 如果中间有其它线程干扰,发生了 ABA 现象
    other();
    sleep(1);
    // 尝试改为 C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
private static void other() {
    new Thread(() -> {
        log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
                ref.getStamp(), ref.getStamp() + 1));
        log.debug("更新版本为 {}", ref.getStamp());
    }, "t1").start();
    sleep(0.5);
    new Thread(() -> {
        log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
                ref.getStamp(), ref.getStamp() + 1));
        log.debug("更新版本为 {}", ref.getStamp());
    }, "t2").start();
}

3、AtomicMarkableReference

简化版的AtomicStampedReference


public class TestABAAtomicMarkableReference {
    public static void main(String[] args) throws InterruptedException {
        GarbageBag bag = new GarbageBag("装满了垃圾");
        // 参数2 mark 可以看作一个标记,表示垃圾袋满了
        AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
        log.debug("主线程 start...");
        GarbageBag prev = ref.getReference();
        log.debug(prev.toString());
        new Thread(() -> {
            log.debug("打扫卫生的线程 start...");
            bag.setDesc("空垃圾袋");
            while (!ref.compareAndSet(bag, bag, true, false)) {}
            log.debug(bag.toString());
        }).start();
        Thread.sleep(1000);
        log.debug("主线程想换一只新垃圾袋?");
        boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
        log.debug("换了么?" + success);
        log.debug(ref.getReference().toString());
    }
}

(四)、原子数组(改变引用所指向的内容)

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

(五)、字段更新器

  • AtomicReferenceFieldUpdater // 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域( Field )进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
public class Test5 {
    private volatile int field;

    public static void main(String[] args) {
        AtomicIntegerFieldUpdater fieldUpAtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
        Test5 test5 = new Test5();
        fieldUpdater.compareAndSet(test5, 0, 10);
        // 修改成功 field = 10
        System.out.println(test5.field);
        // 修改成功 field = 20
        fieldUpdater.compareAndSet(test5, 10, 20);
        System.out.println(test5.field);
        // 修改失败 field = 20
        fieldUpdater.compareAndSet(test5, 10, 30);
        System.out.println(test5.field);
    }
}

(六)、原子累加器

LongAdder

(七)、Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法, Unsafe 对象不能直接调用,只能通过反射获得
public class UnsafeAccessor {
    static Unsafe unsafe;
    static {
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new Error(e);
        }
    }
    static Unsafe getUnsafe() {
        return unsafe;
    }
}
Unsafe CAS 操作
@Data
class Student {
    volatile int id;
    volatile String name;
}
Unsafe unsafe = UnsafeAccessor.getUnsafe();
Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");
// 获得成员变量的偏移量
long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);
long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);

Student student = new Student();
// 使用 cas 方法替换成员变量的值
UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true

System.out.println(student);

六、共享模型之不可变

(一)、不可变日期类

如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
    new Thread(() -> {
    LocalDate date = dtf.parse("2018-10-01", LocalDate::from);
    log.debug("{}", date);
    }).start();
    }

七、线程池

(一)、自定义线程池

1、自定义拒绝策略接口

当等待队列满了的时候,采取什么拒绝策略

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

2、自定义任务队列

@FunctionalInterface // 任务队列已满时再向其中添加任务的拒绝策略
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

@Slf4j
class BlockingQueue<T> {
    // 1. 任务队列
    private Deque<T> queue = new ArrayDeque<>();
    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();
    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    // 5. 容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时阻塞获取(在任务队列中取出一个任务,如果队列为空等待超出时间则返回null)
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    // 返回值是剩余时间
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取(在任务队列中取出一个任务,如果队列为空则一直等待)
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }

    // 阻塞添加(向队列中添加一个任务,如果队列已满则一直等待)
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capcity) {
                try {
                    log.debug("等待加入任务队列 {} ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 带超时时间阻塞添加(向队列中添加一个任务,添加成功返回true,如果队列已满则等待一段时间,然后返回false)
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capcity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    log.debug("等待加入任务队列 {} ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    //返回当前任务队列的大小
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    //向任务队列添加一个任务
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断队列是否满
            if (queue.size() == capcity) { //任务队列已满
                //制定任务队列已满时的拒绝策略
                //具体的拒绝策略由调用者指定,封装在RejectPolicy接口中,由用户实现拒绝策略
                //在rejectPolicy的reject方法中,用户可以调用上面的put()或offer()或其他方法。
                rejectPolicy.reject(this, task);
            } else { // 有空闲
                log.debug("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}

3、自定义线程池

@Slf4j
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;
    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();
    // 核心线程数
    private int coreSize;
    // 获取任务时的超时时间
    private long timeout;
    //时间单位
    private TimeUnit timeUnit;
    //创建线程池时实现的拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
                      RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    // 执行任务
    public void execute(Runnable task) {
        // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
        // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if (workers.size() < coreSize) {  //任务的数量小于核心线程数
                Worker worker = new Worker(task);
                log.debug("新增 worker{}, {}", worker, task);
                workers.add(worker);
                //直接开启线程执行任务
                worker.start();
            } else {  //任务数大于核心线程数
                //向任务队列中添加任务
                
                // taskQueue.put(task);
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task); //主线程调用时在rejectPolicy中实现拒绝策略
            }
        }
    }

    

    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当 task 不为空,直接执行任务
            // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
            // while(task != null || (task = taskQueue.take()) != null) {  //当任务队列为空时,当前线程一直等待、空转
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {  //当任务队列为空时,等待一段时间,超过时间后当前线程结束
                try {
                    log.debug("正在执行...{}", task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }
}

4、测试

public static void main(String[] args) {
    ThreadPool threadPool = new ThreadPool(1,
            1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ //创建线程池时自定义拒绝策略
        // 1. 死等
        // queue.put(task);
        // 2) 带超时等待
        // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
        // 3) 让调用者放弃任务执行
        // log.debug("放弃{}", task);
        // 4) 让调用者抛出异常
        // throw new RuntimeException("任务执行失败 " + task);
        // 5) 让调用者自己执行任务
        task.run();
    });
    for (int i = 0; i < 4; i++) {
        int j = i;
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}", j);
            }
        });
    }
}

(二)、ThreadPoolExecutor

1、线程池状态

2、构造方法

public ThreadPoolExecutor( int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略

工作方式:

3、newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

4、newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

5、newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>()));
}

6、提交任务

// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
        throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

7、关闭线程池

(1)、shutdown
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完(包括正在运行中的任务和在任务队列中的任务)
- 此方法不会阻塞调用线程的执行
*/
void shutdown();

(2)、shutdownNow

/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();

8、任务调度线程池

(1)、Timer
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能, Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
public static void main(String[] args) {
    Timer timer = new Timer();
    TimerTask task1 = new TimerTask() {
        @Override
        public void run() {
            log.debug("task 1");
            sleep(2);
        }
    };
    TimerTask task2 = new TimerTask() {
        @Override
        public void run() {
            log.debug("task 2");
        }
    };
    // 使用 timer 添加两个任务,希望它们都在 1s 后执行
    // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
    timer.schedule(task1, 1000);
    timer.schedule(task2, 1000);
}

任务1执行时间过长会影响任务二的执行
(2)、ScheduledExecutorService

延迟执行

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
    System.out.println("任务1,执行时间:" + new Date());
    try { Thread.sleep(2000); } catch (InterruptedException e) { }
    }, 1000, TimeUnit.MILLISECONDS);
    executor.schedule(() -> {
    System.out.println("任务2,执行时间:" + new Date());
    }, 1000, TimeUnit.MILLISECONDS);

(3)、scheduleAtFixedRate

按固定时间执行

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
    log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);

输出

(4)、scheduleWithFixedDelay
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(()-> {
    log.debug("running...");
    sleep(2);
}, 1, 1, TimeUnit.SECONDS);

9、正确处理执行任务异常

方法 1 :主动捉异常

10、定时执行应用

如何让每周四 18:00:00 定时执行任务?
// 获得当前时间
LocalDateTime now = LocalDateTime.now();
// 获取本周四 18:00:00.000
LocalDateTime thursday =
    now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
// 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
if(now.compareTo(thursday)>=0){
thursday=thursday.plusWeeks(1);
}
// 计算时间差,即延时执行时间
long initialDelay=Duration.between(now,thursday).toMillis();
// 计算间隔时间,即 1 周的毫秒值
long oneWeek=7*24*3600*1000;
ScheduledExecutorService executor=Executors.newScheduledThreadPool(2);
System.out.println("开始时间:"+new Date());
executor.scheduleAtFixedRate(()->{
System.out.println("执行时间:"+new Date());
},initialDelay,oneWeek,TimeUnit.MILLISECONDS);

(三)、Fork/Join

class AddTask3 extends RecursiveTask<Integer> {

    int begin;
    int end;

    public AddTask3(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    public String toString() {
        return "{" + begin + "," + end + '}';
    }

    @Override
    protected Integer compute() {
        // 5, 5
        if (begin == end) {
            log.debug("join() {}", begin);
            return begin;
        }
        // 4, 5
        if (end - begin == 1) {
            log.debug("join() {} + {} = {}", begin, end, end + begin);
            return end + begin;
        }

        // 1 5
        int mid = (end + begin) / 2; // 3
        AddTask3 t1 = new AddTask3(begin, mid); // 1,3
        t1.fork();
        AddTask3 t2 = new AddTask3(mid + 1, end); // 4,5
        t2.fork();
        log.debug("fork() {} + {} = ?", t1, t2);
        int result = t1.join() + t2.join();
        log.debug("join() {} + {} = {}", t1, t2, result);
        return result;
    }
}

八、读写锁

ReentrantReadWriteLock

StampedLock

九、Semaphore

信号量,用来限制能同时访问共享资源的线程上限

(一)、基本使用

public static void main(String[] args) {
    // 1. 创建 semaphore 对象
    Semaphore semaphore = new Semaphore(3);
    // 2. 10个线程同时运行
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            // 3. 获取许可
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                log.debug("running...");
                sleep(1);
                log.debug("end...");
            } finally {
                // 4. 释放许可
                semaphore.release();
            }
        }).start();
    }
}

(二)、应用

实现连接池
@Slf4j(topic = "c.Pool")
class Pool {
    // 1. 连接池大小
    private final int poolSize;
    // 2. 连接对象数组
    private Connection[] connections;
    // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    private AtomicIntegerArray states;
    private Semaphore semaphore;

    // 4. 构造方法初始化
    public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("连接" + (i + 1));
        }
    }

    // 5. 借连接
    public Connection borrow() {// t1, t2, t3
        // 获取许可
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if (states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    log.debug("borrow {}", connections[i]);
                    return connections[i];
                }
            }
        }
        // 不会执行到这里
        return null;
    }

    // 6. 归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                log.debug("free {}", conn);
                semaphore.release();
                break;
            }
        }
    }
}

十、CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值, await() 用来等待计数归零, countDown() 用来让计数减一
public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(3);
    ExecutorService service = Executors.newFixedThreadPool(4);
    service.submit(() -> {
        log.debug("begin...");
        sleep(1);
        latch.countDown();
        log.debug("end...{}", latch.getCount());
    });
    service.submit(() -> {
        log.debug("begin...");
        sleep(1.5);
        latch.countDown();
        log.debug("end...{}", latch.getCount());
    });
    service.submit(() -> {
        log.debug("begin...");
        sleep(2);
        latch.countDown();
        log.debug("end...{}", latch.getCount());
    });
    service.submit(()->{
        try {
            log.debug("waiting...");
            latch.await();
            log.debug("wait end...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

十一、CyclicBarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执
行到某个需要 同步 的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
new Thread(()->{
    System.out.println("线程1开始.."+new Date());
    try {
    cb.await(); // 当个数不足时,等待
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    System.out.println("线程1继续向下运行..."+new Date());
    }).start();
    new Thread(()->{
    System.out.println("线程2开始.."+new Date());
    try { Thread.sleep(2000); } catch (InterruptedException e) { }
    try {
    cb.await(); // 2 秒后,线程个数够2,继续运行
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    System.out.println("线程2继续向下运行..."+new Date());
    }).start();

当数量减为0之后,在调用await方法,仍然会从开始设置的值计数。


http://www.niftyadmin.cn/n/5458123.html

相关文章

如何查看java安装路径

在不同的操作系统中&#xff0c;查看Java安装路径的方法可能会有所不同。以下是几种常见操作系统中查看Java安装路径的方法&#xff1a; 1.Windows: 打开命令提示符&#xff08;cmd&#xff09;&#xff0c;然后输入以下命令&#xff1a;for %i in (java.exe) do echo %~$PATH:…

外包干了8天,技术退步明显.......

先说一下自己的情况&#xff0c;大专生&#xff0c;19年通过校招进入杭州某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落! 而我已经在一个企业干了四年的功能测…

UE4 quest3平台网络不稳定问题排查及解决

目前接触项目是实时竞技项目&#xff0c;即时性要求较高&#xff0c;因为是内网所以直接使用了UPD每帧同步物理运算得到的位置、旋转数据&#xff0c;但是体验过程中发现有明显卡顿&#xff0c;一番排查发现是quest接入wifi网络ping值始终有高延迟波动&#xff0c;经过查阅文档…

蓝桥杯备考随手记: practise01

问题描述: 小明对数位中含有 2、0、1、9 的数字很感兴趣&#xff0c;在 1 到 40 中这样的数包 括 1、2、9、10 至 32、39 和 40&#xff0c;共 28 个&#xff0c;他们的和是 574。 请问&#xff0c;在 1 到 2019 中&#xff0c;所有这样的数的和是多少&#xff1f; 思路分析…

SpringBoot --条件注解与属性绑定

1. 条件注解 如果注解指定的条件成立&#xff0c;则触发指定行为。在前文介绍自动配置机制时&#xff0c;也可以看到在SpringBoot的源码中使用了该注解。 常用的条件注解主要有以下四个&#xff1a; ConditionalOnClass&#xff1a;如果类路径中存在这个类&#xff0c;则触发…

【动手学深度学习-pytorch】 9.4 双向循环神经网络

在序列学习中&#xff0c;我们以往假设的目标是&#xff1a; 在给定观测的情况下 &#xff08;例如&#xff0c;在时间序列的上下文中或在语言模型的上下文中&#xff09;&#xff0c; 对下一个输出进行建模。 虽然这是一个典型情景&#xff0c;但不是唯一的。 还可能发生什么其…

蓝桥杯物联网遇见的重大BUG及其产生原因和解决方法

BUG列表 1、ADC的RP2显示一直为0&#xff1a;2、LORX_Tx发送数据乱码&#xff1a;3、strcmp比较char a[2] {1, 2}与“12”字符串是否相等板子会死机&#xff1a;4、LORA_Tx和LORA_Rx放一起会接收不到数据&#xff1a;5、RTC获取到静止时间&#xff1a;6、ADC获取RP1和RP2模拟量…

离线Linux/openEuler服务器指定本地yum仓库

1、前提准备一个预装坏境比较完整的linux镜像文件&#xff0c;本文服务器使用的是openEuler 官网&#xff1a;openEuler下载 | 欧拉系统ISO镜像 | openEuler社区官网 2、上传镜像文件至服务器 如果是集群服务器&#xff0c;上传其中一台服务器之后&#xff0c;使用scp指令将镜…