多个线程按顺序执行?我有10种做法,你能想到几种

多个线程按顺序执行?我有10种做法,你能想到几种

首页动作格斗多线程游戏更新时间:2024-06-07

很多读者朋友应该都觉得这道题目不难,这次给大家带来十二种做法,一定有你没有见过的新姿势。

1. synchronized wait notify

说到同步,我们很容易就想到synchronized。

线程间通信呢?我们先回忆一下线程间的调度。

可以看到,等待和运行之间的转换可以用wait和notify。

那么整体思路也就有了:

public class ABC1 { //锁住的对象 private final static Object lock = new Object(); //A是否已经执行 private static boolean aExecuted = false; //B是否已经执行过 private static boolean bExecuted = false; public static void printA() { synchronized (lock) { System.out.println("A"); aExecuted = true; //唤醒所有等待线程 lock.notifyAll(); } } ​ public static void printB() throws InterruptedException { synchronized (lock) { //获取到锁,但是要等A执行 while (!aExecuted) { lock.wait(); } System.out.println("B"); bExecuted = true; lock.notifyAll(); } } ​ public static void printC() throws InterruptedException { synchronized (lock) { //获取到锁,但是要等B执行 while (!bExecuted) { lock.wait(); } System.out.println("C"); } } ​ } 复制代码

@Test void abc1() { //线程A new Thread(() -> { ABC1.printA(); }, "A").start(); //线程B new Thread(() -> { try { ABC1.printB(); } catch (InterruptedException e) { e.printStackTrace(); } }, "B").start(); //线程C new Thread(() -> { try { ABC1.printC(); } catch (InterruptedException e) { e.printStackTrace(); } }, "C").start(); } 复制代码2. lock 全局变量state

还可以用lock state来实现,大概思路:

public class ABC2 { //可重入锁 private final static Lock lock = new ReentrantLock(); //判断是否执行:1表示应该A执行,2表示应该B执行,3表示应该C执行 private static int state = 1; ​ public static void printA() { //自旋 while (state < 4) { try { //获取锁 lock.lock(); //并发情况下,不能用if,要用循环判断等待条件,避免虚假唤醒 while (state == 1) { System.out.println("A"); state ; } } finally { //要保证不执行的时候,锁能释放掉 lock.unlock(); } } } ​ public static void printB() throws InterruptedException { while (state < 4) { try { lock.lock(); //获取到锁,应该执行 while (state == 2) { System.out.println("B"); state ; } } finally { lock.unlock(); } } } ​ public static void printC() throws InterruptedException { while (state < 4) { try { lock.lock(); while (state == 3) { //获取到锁,应该执行 System.out.println("C"); state ; } } finally { lock.unlock(); } } } ​ } 复制代码

这里也有几个细节要注意:

3. volatile

上一种做法,我们用了同步 全局变量的方式,那么有没有什么更轻量级的做法?

我们可以直接用volatile修饰变量,volatile能保证变量的更改对所有线程可见。

public class ABC3 { ​ //判断是否执行:1表示应该A执行,2表示应该B执行,3表示应该C执行 private static volatile Integer state = 1; ​ public static void printA() { //通过循环,hang住线程 while (state != 1) { } System.out.println("A"); state ; } ​ public static void printB() throws InterruptedException { while (state != 2) { } System.out.println("B"); state ; } ​ public static void printC() throws InterruptedException { while (state != 3) { } System.out.println("C"); state ; } ​ } 复制代码4. AtomicInteger

除了无锁的volatile方法,还有没有什么轻量级锁的方法呢?

我们都知道synchronized和lock都属于悲观锁,我们还可以用乐观锁来实现。

在Java里,我们熟悉的原子操作类AtomicInteger就是基于CAS实现的,可以用来保证Integer操作的原子性。

public class ABC4 { ​ //判断是否执行:1表示应该A执行,2表示应该B执行,3表示应该C执行 private static AtomicInteger state = new AtomicInteger(1); ​ public static void printA() { System.out.println("A"); state.incrementAndGet(); } ​ public static void printB() throws InterruptedException { while (state.get() < 4) { while (state.get() == 2) { System.out.println("B"); state.incrementAndGet(); } } } ​ public static void printC() throws InterruptedException { while (state.get() < 4) { while (state.get() == 3) { System.out.println("C"); state.incrementAndGet(); } } } ​ } 复制代码5.lock condition

在Java中,除了Object的waitnotify/notify可以实现等待/通知机制,conditionLock配合同样可以完成等待通知机制。

使用condition.await(),使当前线程进入等待状态,使用condition.signal()或者condition.signalAll()唤醒等待线程。

public class ABC5 { //可重入锁 private final static Lock lock = new ReentrantLock(); //判断是否执行:1表示应该A执行,2表示应该B执行,3表示应该C执行 private static int state = 1; //condition对象 private static Condition a = lock.newCondition(); private static Condition b = lock.newCondition(); private static Condition c = lock.newCondition(); ​ public static void printA() { //通过循环,hang住线程 while (state < 4) { try { //获取锁 lock.lock(); //并发情况下,不能用if,要用循环判断等待条件,避免虚假唤醒 while (state != 1) { a.await(); } System.out.println("A"); state ; b.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //要保证不执行的时候,锁能释放掉 lock.unlock(); } } } ​ public static void printB() throws InterruptedException { while (state < 4) { try { lock.lock(); //获取到锁,应该执行 while (state != 2) { b.await(); } System.out.println("B"); state ; c.signal(); } finally { lock.unlock(); } } } ​ public static void printC() throws InterruptedException { while (state < 4) { try { lock.lock(); while (state != 3) { c.await(); } //获取到锁,应该执行 System.out.println("C"); state ; } finally { lock.unlock(); } } } ​ } 复制代码6.信号量Semaphore

线程间同步,还可以使用信号量Semaphore,信号量顾名思义,多线程协作时完成信号传递。

使用acquire() 获取许可,如果没有可用的许可,线程进入阻塞等待状态;使用release释放许可。

public class ABC6 { ​ private static Semaphore semaphoreB = new Semaphore(0); private static Semaphore semaphoreC = new Semaphore(0); ​ public static void printA() { System.out.println("A"); semaphoreB.release(); } ​ public static void printB() throws InterruptedException { semaphoreB.acquire(); System.out.println("B"); semaphoreC.release(); } ​ public static void printC() throws InterruptedException { semaphoreC.acquire(); System.out.println("C"); } ​ } 复制代码7.计数器CountDownLatch

CountDownLatch的一个适用场景,就是用来进行多个线程的同步管理,线程调用了countDownLatch.await() 之后,需要等待countDownLatch的信号countDownLatch.countDown() ,在收到信号前,它不会往下执行。

public class ABC7 { ​ private static CountDownLatch countDownLatchB = new CountDownLatch(1); private static CountDownLatch countDownLatchC = new CountDownLatch(1); ​ public static void printA() { System.out.println("A"); countDownLatchB.countDown(); } ​ public static void printB() throws InterruptedException { countDownLatchB.await(); System.out.println("B"); countDownLatchC.countDown(); } ​ public static void printC() throws InterruptedException { countDownLatchC.await(); System.out.println("C"); } ​ } 复制代码8. 循环栅栏CyclicBarrier

用到了CountDownLatch,我们应该想到,还有一个功能和它类似的工具类CyclicBarrier。

有的翻译叫同步屏障,我觉得翻译成循环栅栏,更能体现它的功能特性。

就像是出去旅游,大家不同时间到了景区门口,但是景区疫情限流,先把栅栏拉下来,在景区里的游客走一批,打开栅栏,再放进去一批,走一批,再放进去一批……

这就是CyclicBarrier的两个特性,

这道题怎么用CyclicBarrier解决呢?

public class ABC8 { ​ private static CyclicBarrier cyclicBarrier = new CyclicBarrier(1); private static Integer state = 1; ​ public static void printA() { while (state != 1) { } System.out.println("A"); state = 2; } ​ public static void printB() throws InterruptedException { try { //在栅栏前等待 cyclicBarrier.await(); //state不等于2的时候等待 while (state != 2) { } System.out.println("B"); state = 3; } catch (BrokenBarrierException e) { e.printStackTrace(); } } ​ public static void printC() throws InterruptedException { try { cyclicBarrier.await(); while (state != 3) { } System.out.println("C"); } catch (BrokenBarrierException e) { e.printStackTrace(); } } ​ } ​ 复制代码

当然,CyclicBarrier的实现其实还是基于lock condition,多个线程在到达一定条件前await,到达条件后signalAll。

9.交换器Exchanger

在前面,我们已经用到了常用的并发工具类,其实还有一个不那么常用的并发工具类Exchanger,同样也可以用来解决这道题目。

Exchanger用于两个线程在某个节点时进行数据交换,在这道题里:

public class ABC9 { private static Exchanger<Integer> exchangerB = new Exchanger<>(); private static Exchanger<Integer> exchangerC = new Exchanger<>(); ​ public static void printA() { System.out.println("A"); try { //交换 exchangerB.exchange(2); } catch (InterruptedException e) { e.printStackTrace(); } } ​ public static void printB() { try { //交换 Integer state = exchangerB.exchange(0); //等待 while (state != 2) { } //执行 System.out.println("B"); //第二次交换 exchangerC.exchange(3); } catch (InterruptedException e) { e.printStackTrace(); } } ​ public static void printC() { try { Integer state = exchangerC.exchange(0); while (state != 3) { } System.out.println("C"); } catch (InterruptedException e) { e.printStackTrace(); } } } ​ 复制代码

Exchanger是基于ThreadLocal实现的,那么我们这个问题可以基于ThreadLocal来实现吗?

10.ThreadLocal

ThreadLocal,我们应该都了解过它的用法和原理,那么怎么用ThreadLocal实现三个线程顺序打印ABC呢?

子线程是并发执行的,但是主线程的代码是顺序执行的,我们在主线程里改变变量,子线程根据变量判断。

那么问题来了,子线程怎么获取主线程的变量呢?可以用InheritableThreadLocal。

public class ABC10 { public static void main(String[] args) { //使用ThreadLocal存储变量 ThreadLocal<Integer> threadLocal = new InheritableThreadLocal<>(); threadLocal.set(1); new Thread(() -> { System.out.println("A"); }, "A").start(); //设置变量值 threadLocal.set(2); ​ new Thread(() -> { //等待 while (threadLocal.get() != 2) { } System.out.println("B"); }, "B").start(); threadLocal.set(3); ​ new Thread(() -> { while (threadLocal.get() != 3) { } System.out.println("C"); }, "C").start(); } } 复制代码11.管道流PipedStream

线程之间通信,还有一种比较笨重的办法——PipedInputStream/PipedOutStream。

一个线程使用PipedOutStream写数据,一个线程使用PipedInputStream读数据,而且Piped的读取只能一对一。

那么,在这道题里:

public class ABC11 { public static void main(String[] args) throws IOException { //线程A的输出流 PipedOutputStream outputStreamA = new PipedOutputStream(); //线程B的输出流 PipedOutputStream outputStreamB = new PipedOutputStream(); //线程B的输入流 PipedInputStream inputStreamB = new PipedInputStream(); //线程C的输入流 PipedInputStream inputStreamC = new PipedInputStream(); ​ ​ outputStreamA.connect(inputStreamB); outputStreamB.connect(inputStreamC); ​ new Thread(() -> { System.out.println("A"); try { //流写入 outputStreamA.write("B".getBytes()); } catch (IOException e) { e.printStackTrace(); } }, "A").start(); ​ new Thread(() -> { //流读取 byte[] buffer = new byte[1]; try { inputStreamB.read(buffer); //转换成String String msg = new String(buffer); System.out.println(msg); outputStreamB.write("C".getBytes()); } catch (IOException e) { e.printStackTrace(); } }, "B").start(); ​ new Thread(() -> { byte[] buffer = new byte[1]; try { inputStreamC.read(buffer); String msg = new String(buffer); System.out.println(msg); } catch (IOException e) { e.printStackTrace(); } }, "C").start(); } } 复制代码12.阻塞队列BlockingQueue

阻塞队列同样也可以用来进行线程调度。

public class ABC12 { ​ private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); ​ public static void printA() { System.out.println("A"); queue.offer("B"); } ​ public static void printB() throws InterruptedException { while (queue.size() != 1) { } System.out.println("B"); queue.offer("C"); } ​ public static void printC() throws InterruptedException { while (queue.size() != 2) { } System.out.println("C"); } ​ } ​ 复制代码多线程安全问题

线程安全问题通俗的讲:主要是在多线程的环境下,不同线程同时读和写公共资源(临界资源),导致的数据异常问题。

比如:变量a=0,线程1给该变量 1,线程2也给该变量 1。此时,线程3获取a的值有可能不是2,而是1。线程3这不就获取了错误的数据?

线程安全问题会直接导致数据异常,从而影响业务功能的正常使用,所以这个问题还是非常严重的。

那么,如何解决线程安全问题呢?

今天跟大家一起聊聊,保证线程安全的10个小技巧,希望对你有所帮助。

1. 无状态

我们都知道只有多个线程访问公共资源的时候,才可能出现数据安全问题,那么如果我们没有公共资源,是不是就没有这个问题呢?

例如:

public class NoStatusService { public void add(String status) { System.out.println("add status:" status); } public void update(String status) { System.out.println("update status:" status); } } 复制代码

这个例子中NoStatusService没有定义公共资源,换句话说是无状态的。

这种场景中,NoStatusService类肯定是线程安全的。

2. 不可变

如果多个线程访问的公共资源是不可变的,也不会出现数据的安全性问题。

例如:

public class NoChangeService { public static final String DEFAULT_NAME = "abc"; public void add(String status) { System.out.println(DEFAULT_NAME); } } 复制代码

DEFAULT_NAME被定义成了static final的常量,在多线程中环境中不会被修改,所以这种情况,也不会出现线程安全问题。

3. 无修改权限

有时候,我们定义了公共资源,但是该资源只暴露了读取的权限,没有暴露修改的权限,这样也是线程安全的。

例如:

public class SafePublishService { private String name; public String getName() { return name; } public void add(String status) { System.out.println("add status:" status); } } 复制代码

这个例子中,没有对外暴露修改name字段的入口,所以不存在线程安全问题。

3. synchronized

使用JDK内部提供的同步机制,这也是使用比较多的手段,分为:同步方法 和 同步代码块。

我们优先使用同步代码块,因为同步方法的粒度是整个方法,范围太大,相对来说,更消耗代码的性能。

其实,每个对象内部都有一把锁,只有抢到那把锁的线程,才被允许进入对应的代码块执行相应的代码。

当代码块执行完之后,JVM底层会自动释放那把锁。

例如:

public class SyncService { private int age = 1; private Object object = new Object(); //同步方法 public synchronized void add(int i) { age = age i; System.out.println("age:" age); } public void update(int i) { //同步代码块,对象锁 synchronized (object) { age = age i; System.out.println("age:" age); } } public void update(int i) { //同步代码块,类锁 synchronized (SyncService.class) { age = age i; System.out.println("age:" age); } } } 复制代码4. Lock

除了使用synchronized关键字实现同步功能之外,JDK还提供了Lock接口,这种显示锁的方式。

通常我们会使用Lock接口的实现类:ReentrantLock,它包含了:公平锁、非公平锁、可重入锁、读写锁 等更多更强大的功能。

例如:

public class LockService { private ReentrantLock reentrantLock = new ReentrantLock(); public int age = 1; public void add(int i) { try { reentrantLock.lock(); age = age i; System.out.println("age:" age); } finally { reentrantLock.unlock(); } } } 复制代码

但如果使用ReentrantLock,它也带来了有个小问题就是:需要在finally代码块中手动释放锁。

不过说句实话,在使用Lock显示锁的方式,解决线程安全问题,给开发人员提供了更多的灵活性。

5. 分布式锁

如果是在单机的情况下,使用synchronized和Lock保证线程安全是没有问题的。

但如果在分布式的环境中,即某个应用如果部署了多个节点,每一个节点使用可以synchronized和Lock保证线程安全,但不同的节点之间,没法保证线程安全。

这就需要使用:分布式锁了。

分布式锁有很多种,比如:数据库分布式锁,zookeeper分布式锁,redis分布式锁等。

其中我个人更推荐使用redis分布式锁,其效率相对来说更高一些。

使用redis分布式锁的伪代码如下:

try{ String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime); if ("OK".equals(result)) { return true; } return false; } finally { unlock(lockKey); } 复制代码

同样需要在finally代码块中释放锁。

如果你对redis分布式锁的用法和常见的坑,比较感兴趣的话,可以看看我的另一篇文章《聊聊redis分布式锁的8大坑》,里面有更详细的介绍。

6. volatile

有时候,我们有这样的需求:如果在多个线程中,有任意一个线程,把某个开关的状态设置为false,则整个功能停止。

简单的需求分析之后发现:只要求多个线程间的可见性,不要求原子性。

如果一个线程修改了状态,其他的所有线程都能获取到最新的状态值。

这样一分析这就好办了,使用volatile就能快速满足需求。

例如:

@Service public CanalService { private volatile boolean running = false; private Thread thread; @Autowired private CanalConnector canalConnector; public void handle() { //连接canal while(running) { //业务处理 } } public void start() { thread = new Thread(this::handle, "name"); running = true; thread.start(); } public void stop() { if(!running) { return; } running = false; } } 复制代码

需要特别注意的地方是:volatile不能用于计数和统计等业务场景。因为volatile不能保证操作的原子性,可能会导致数据异常。

7. ThreadLocal

除了上面几种解决思路之外,JDK还提供了另外一种用空间换时间的新思路:ThreadLocal。

当然ThreadLocal并不能完全取代锁,特别是在一些秒*更新库存中,必须使用锁。

ThreadLocal的核心思想是:共享变量在每个线程都有一个副本,每个线程操作的都是自己的副本,对另外的线程没有影响。

温馨提醒一下:我们平常在使用ThreadLocal时,如果使用完之后,一定要记得在finally代码块中,调用它的remove方法清空数据,不然可能会出现内存泄露问题。

例如:

public class ThreadLocalService { private ThreadLocal<Integer> threadLocal = new ThreadLocal<>(); public void add(int i) { Integer integer = threadLocal.get(); threadLocal.set(integer == null ? 0 : integer i); } } 复制代码

如果对ThreadLocal感兴趣的小伙伴,可以看看我的另一篇文章《ThreadLocal夺命11连问》,里面有对ThreadLocal的原理、用法和坑,有非常详细的介绍。

8. 线程安全集合

有时候,我们需要使用的公共资源放在某个集合当中,比如:ArrayList、HashMap、HashSet等。

如果在多线程环境中,有线程往这些集合中写数据,另外的线程从集合中读数据,就可能会出现线程安全问题。

为了解决集合的线程安全问题,JDK专门给我们提供了能够保证线程安全的集合。

比如:CopyOnWriteArrayList、ConcurrentHashMap、CopyOnWriteArraySet、ArrayBlockingQueue等等。

例如:

public class HashMapTest { private static ConcurrentHashMap<String, Object> hashMap = new ConcurrentHashMap<>(); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { hashMap.put("key1", "value1"); } }).start(); new Thread(new Runnable() { @Override public void run() { hashMap.put("key2", "value2"); } }).start(); try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(hashMap); } } 复制代码

在JDK底层,或者spring框架当中,使用ConcurrentHashMap保存加载配置参数的场景非常多。

比较出名的是spring的refresh方法中,会读取配置文件,把配置放到很多的ConcurrentHashMap缓存起来。

9. CAS

JDK除了使用锁的机制解决多线程情况下数据安全问题之外,还提供了CAS机制。

这种机制是使用CPU中比较和交换指令的原子性,JDK里面是通过Unsafe类实现的。

CAS内部包含了四个值:旧数据、期望数据、新数据 和 地址,比较旧数据 和 期望的数据,如果一样的话,就把旧数据改成新数据。如果不一样的话,当前线程不断自旋,一直到成功为止。

不过,使用CAS保证线程安全,可能会出现ABA问题,需要使用AtomicStampedReference增加版本号解决。

其实,实际工作中很少直接使用Unsafe类的,一般用atomic包下面的类即可。

public class AtomicService { private AtomicInteger atomicInteger = new AtomicInteger(); public int add(int i) { return atomicInteger.getAndAdd(i); } } 复制代码10. 数据隔离

有时候,我们在操作集合数据时,可以通过数据隔离,来保证线程安全。

例如:

public class ThreadPoolTest { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor(8, //corePoolSize线程池中核心线程数 10, //maximumPoolSize 线程池中最大线程数 60, //线程池中线程的最大空闲时间,超过这个时间空闲线程将被回收 TimeUnit.SECONDS,//时间单位 new ArrayBlockingQueue(500), //队列 new ThreadPoolExecutor.CallerRunsPolicy()); //拒绝策略 List<User> userList = Lists.newArrayList( new User(1L, "苏三", 18, "成都"), new User(2L, "苏三说技术", 20, "四川"), new User(3L, "技术", 25, "云南")); for (User user : userList) { threadPool.submit(new Work(user)); } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(userList); } static class Work implements Runnable { private User user; public Work(User user) { this.user = user; } @Override public void run() { user.setName(user.getName() "测试"); } } } 复制代码

这个例子中,使用线程池处理用户信息。

每个用户只被线程池中的一个线程处理,不存在多个线程同时处理一个用户的情况。所以这种人为的数据隔离机制,也能保证线程安全。

数据隔离还有另外一种场景:kafka生产者把同一个订单的消息,发送到同一个partion中。每一个partion都部署一个消费者,在kafka消费者中,使用单线程接收消息,并且做业务处理。

这种场景下,从整体上看,不同的partion是用多线程处理数据的,但同一个partion则是用单线程处理的,所以也能解决线程安全问题。

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved