IT博文
MySQL 事务隔离级别详解
使用 docker compose 安装 tidb
架构师日记-如何写的一手好代码
生产事故-记一次特殊的OOM排查
Docker安装RabbitMQ——基于docker-compose工具
使用 docker-compose 部署单机 RabbitMQ
只需3步,即刻体验Oracle Database 23c
长达 1.7 万字的 explain 关键字指南!
Redis为什么能抗住10万并发?揭秘性能优越的背后原因
深度剖析Redis九种数据结构实现原理
【绩效季】遇到一个好领导有多重要,从被打差绩效到收获成长
为什么Redis不直接使用C语言的字符串?
Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析
如何调整和优化 Go 程序的内存管理方式?
应用部署引起上游服务抖动问题分析及优化实践方案
Java 并发工具合集 JUC 大爆发!!!
卷起来!!这才是 MySQL 事务 & MVCC 的真相。
JDK8 到 JDK17 有哪些吸引人的新特性?
告别StringUtil:使用Java 11的全新String API优化你的代码
从JDK8飞升到JDK17,再到未来的JDK21
Java JMH Benchmark Tutorial
linux和macOS下top命令区别
Windows10关闭Hyper-V的三种方法
为什么应该选择 POSTGRES?
阿里云对象存储 OSS 限流超过阈值自动关闭【防破产,保平安】
Java高并发革命!JDK19新特性——虚拟线程(Virtual Threads)
“请不要在虚拟机中运行此程序”的解决方案
Spring中的循环依赖及解决
浅谈复杂业务系统的架构设计 | 京东云技术团队
面试题:聊聊TCP的粘包、拆包以及解决方案
操作日志记录实现方式
字节跳动技术团队-慢 SQL 分析与优化
Spring Boot 使用 AOP 防止重复提交
Controller层代码就该这么写,简洁又优雅!
SpringBoot 项目 + JWT 完成用户登录、注册、鉴权
重复提交不再是问题!SpringBoot自定义注解+AOP巧妙解决
SpringBoot 整合 ES 实现 CRUD 操作
SpringBoot 整合 ES 进行各种高级查询搜索
SpringBoot操作ES进行各种高级查询
SpringBoot整合ES查询
如何做架构设计? | 京东云技术团队
最值得推荐的五个VPN软件(便宜+好用+稳定),靠谱的V2ray梯子工具
我说MySQL每张表最好不超过2000万数据,面试官让我回去等通知?
vivo 自研鲁班分布式 ID 服务实践
使用自带zookeeper超简单安装kafka
推荐 6 个很牛的 IDEA 插件
喜马拉雅 Redis 与 Pika 缓存使用军规
「程序员转型技术管理」必修的 10 个能力提升方向
jdk17 下 netty 导致堆内存疯涨原因排查 | 京东云技术团队
如何优雅做好项目管理?
MySQL 到 TiDB:Hive Metastore 横向扩展之路
聊聊即将到来的 MySQL5.7 停服事件
Linux终端环境配置
微软 Edge 浏览器隐藏功能一览:多线程下载、IE 模式、阻止视频自动播放等
Hutool 中那些常用的工具类和实用方法
clash 内核删库?汇总目前常用的内核仓库和客户端
JDK11 升级 JDK17 最全实践干货来了 | 京东云技术团队
我是如何写一篇技术文的?
虚拟线程原理及性能分析
Java线程池实现原理及其在美团业务中的实践
Editplus和EmEditor配置一键编译java运行环境
用Spring Boot 3.2虚拟线程搭建静态文件服务器有多快?
SpringBoot中使用LocalDateTime踩坑记录 - 程序员偏安 - 博客园
程序员必备!10款实用便捷的Git可视化管理工具 - 追逐时光者 - 博客园
基于Netty开发轻量级RPC框架
开发Java应用时如何用好Log
复杂SQL治理实践 | 京东物流技术团队
火山引擎ByteHouse:分析型数据库如何设计并发控制?
多次崩了之后,阿里云终于改了
推荐程序员必知的四大神级学习网站
初探分布式链路追踪
新项目为什么决定用 JDK 17了
Java上进了,JDK21 要来了,并发编程再也不是噩梦了
mapstruct这么用,同事也开始模仿
再见RestTemplate,Spring 6.1新特性:RestClient 了解一下!
【MySQL】MySQL表设计的经验(建议收藏)
如何正确地理解应用架构并开发
解读工行专利CN112905176B
工商银行取得「基于 Spring Boot 的 web 系统后端实现方法及装置」专利
IDEA 2024.1:Spring支持增强、GitHub Action支持增强、更新HTTP Client等
TIOBE 2 月:Go 首次进入前十、“上古语言” COBOL 和 Fortran 排名飙升
Java 21 虚拟线程如何限流控制吞吐量
🎉 通用、灵活、高性能分布式 ID 生成器 | CosId 2.6.6 发布
20年编程,AI编程6个月,关于Copliot辅助编码工具,你想知道的都在这里
Java 8 内存管理原理解析及内存故障排查实践
消息队列选型之 Kafka vs RabbitMQ
从 MongoDB 到 PostgreSQL 的大迁移
腾讯云4月8日故障复盘及情况说明
PHP 在 2024 年还值得学习吗?
AMD集显安装显卡驱动之后出现黑屏,建议这样解决
使用 Docker 部署 moments 微信朋友圈 - 谱次· - 博客园
Java 17 是最常用的 Java LTS 版本
盘点Lombok的几个骚操作
Llama 3 + Ollama + Open WebUI打造本机强大GPT
如何优雅地编写缓存代码
Gmeek快速上手
笔记软件思源远程和本地接入大语言模型服务Ollama实现AI辅助写作(Windows篇)
Git Subtree:简单粗暴的多项目管理神器
这款轻量级规则引擎,真香!!
Ollama教程:本地LLM管理、WebUI对话、Python/Java客户端API应用
GLM-4-9B支持 Ollama 部署
智谱AI开源代码生成大模型第四代版本:CodeGeeX4-ALL-9B
美团二面:如何保证Redis与Mysql双写一致性?连续两个面试问到了!
免费开源好用,Obsidian和Omnivore真正实现一键联动剪藏文章,手把手教程!
得物 Redis 设计与实践
架构图怎么画?手把手教您,以生鲜电商为例剖析业务/应用/数据/技术架构图
使用Hutool要注意了!升级到6.0后你调用的所有方法都将报错 - 掘金
别再用雪花算法生成ID了!试试这个吧
无敌的Arthas!
Navicat Premium v16、v17 破解激活
🎉 分布式接口文档聚合,Solon 是怎么做的?
深入体验全新 Cursor AI IDE 后,说杀疯了真不为过!
Nacos 3.0 架构全景解读,AI 时代服务注册中心的演进
本文档使用 MrDoc 发布
-
+
Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析
上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。 但是BlockingQueue的实现类中,有一种阻塞队列比较特殊,就是SynchronousQueue(同步移交队列),队列长度为0。 作用就是一个线程往队列放数据的时候,必须等待另一个线程从队列中取走数据。同样,从队列中取数据的时候,必须等待另一个线程往队列中放数据。 这样特殊的队列,有什么应用场景呢? ## 1\. SynchronousQueue用法 先看一个SynchronousQueue的简单用例: ```java /** * @author 一灯架构 * @apiNote SynchronousQueue示例 **/ public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { // 1. 创建SynchronousQueue队列 BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(); // 2. 启动一个线程,往队列中放3个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 入队列 1"); synchronousQueue.put(1); Thread.sleep(1); System.out.println(Thread.currentThread().getName() + " 入队列 2"); synchronousQueue.put(2); Thread.sleep(1); System.out.println(Thread.currentThread().getName() + " 入队列 3"); synchronousQueue.put(3); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 3. 等待1000毫秒 Thread.sleep(1000L); // 4. 再启动一个线程,从队列中取出3个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); Thread.sleep(1); System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); Thread.sleep(1); System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } ``` 输出结果: ```java Thread-0 入队列 1 Thread-1 出队列 1 Thread-0 入队列 2 Thread-1 出队列 2 Thread-0 入队列 3 Thread-1 出队列 3 ``` 从输出结果中可以看到,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2。 由于SynchronousQueue是BlockingQueue的实现类,所以也实现类BlockingQueue中几组抽象方法: 为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法。 | 操作 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞一段时间 | | --- | --- | --- | --- | --- | | 放数据 | `add` | `offer` | `put` | `offer(e, time, unit)` | | 取数据 | `remove` | `poll` | `take` | `poll(time, unit)` | | 查看数据(不删除) | `element()` | `peek()` | 不支持 | 不支持 | 这几组方法的不同之处就是: 1. 当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer(e, time, unit)方法阻塞指定时间然后返回false。 2. 当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll(time, unit)方法阻塞指定时间然后返回null。 3. 当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。 工作中使用最多的就是offer、poll阻塞指定时间的方法。 ## 2\. SynchronousQueue应用场景 **SynchronousQueue的特点:** 队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。 这种特殊的实现逻辑有什么应用场景呢? 我的理解就是,**如果你希望你的任务需要被快速处理**,就可以使用这种队列。 Java线程池中的**newCachedThreadPool**(带缓存的线程池)底层就是使用SynchronousQueue实现的。 ```java public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } ``` **newCachedThreadPool**线程池的核心线程数是0,最大线程数是Integer的最大值,线程存活时间是60秒。 如果你使用**newCachedThreadPool**线程池,你提交的任务会被更快速的处理,因为你每次提交任务,都会有一个空闲的线程等着处理任务。如果没有空闲的线程,也会立即创建一个线程处理你的任务。 你想想,这处理效率,杠杠滴! 当然也有弊端,如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。 ## 3\. SynchronousQueue源码解析 ### 3.1 SynchronousQueue类属性 ```java public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { // 转换器,取数据和放数据的核心逻辑都在这个类里面 private transient volatile Transferer<E> transferer; // 默认的构造方法(使用非公平队列) public SynchronousQueue() { this(false); } // 有参构造方法,可以指定是否使用公平队列 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); } // 转换器实现类 abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); } // 基于栈实现的非公平队列 static final class TransferStack<E> extends Transferer<E> { } // 基于队列实现的公平队列 static final class TransferQueue<E> extends Transferer<E> { } } ``` 可以看到SynchronousQueue默认的无参构造方法,内部使用的是基于栈实现的非公平队列,当然也可以调用有参构造方法,传参是true,使用基于队列实现的公平队列。 ```java // 使用非公平队列(基于栈实现) BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(); // 使用公平队列(基于队列实现) BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(true); ``` 本次就常用的栈实现来剖析SynchronousQueue的底层实现原理。 ### 3.2 栈底层结构 栈结构,是非公平的,遵循先进后出。  使用个case测试一下: ```java /** * @author 一灯架构 * @apiNote SynchronousQueue示例 **/ public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { // 1. 创建SynchronousQueue队列 SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(); // 2. 启动一个线程,往队列中放1个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 入队列 0"); synchronousQueue.put(0); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 3. 等待1000毫秒 Thread.sleep(1000L); // 4. 启动一个线程,往队列中放1个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 入队列 1"); synchronousQueue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 5. 等待1000毫秒 Thread.sleep(1000L); // 6. 再启动一个线程,从队列中取出1个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); // 7. 等待1000毫秒 Thread.sleep(1000L); // 8. 再启动一个线程,从队列中取出1个元素 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } ``` 输出结果: ```java Thread-0 入队列 0 Thread-1 入队列 1 Thread-2 出队列 1 Thread-3 出队列 0 ``` 从输出结果中可以看出,符合栈结构先进后出的顺序。 ### 3.3 栈节点源码 栈中的数据都是由一个个的节点组成的,先看一下节点类的源码: ```java // 节点 static final class SNode { // 节点值(取数据的时候,该字段为null) Object item; // 存取数据的线程 volatile Thread waiter; // 节点模式 int mode; // 匹配到的节点 volatile SNode match; // 后继节点 volatile SNode next; } ``` - item 节点值,只在存数据的时候用。取数据的时候,这个值是null。 - waiter 存取数据的线程,如果没有对应的接收线程,这个线程会被阻塞。 - mode 节点模式,共有3种类型: | 类型值 | 类型描述 | 类型的作用 | | --- | --- | --- | | 0 | REQUEST | 表示取数据 | | 1 | DATA | 表示存数据 | | 2 | FULFILLING | 表示正在等待执行(比如取数据的线程,等待其他线程放数据) | ### 3.4 put/take流程 放数据和取数据的逻辑,在底层复用的是同一个方法,以put/take方法为例,另外两个放数据的方法,add和offer方法底层实现是一样的。 先看一下数据流转的过程,方便理解源码。 还是以上面的case为例: 1. Thread0先往SynchronousQueue队列中放入元素0 2. Thread1再往SynchronousQueue队列放入元素1 3. Thread2从SynchronousQueue队列中取出一个元素 第一步:Thread0先往SynchronousQueue队列中放入元素0 把本次操作组装成SNode压入栈顶,item是元素0,waiter是当前线程Thread0,mode是1表示放入数据。  第二步:Thread1再往SynchronousQueue队列放入元素1 把本次操作组装成SNode压入栈顶,item是元素1,waiter是当前线程Thread1,mode是1表示放入数据,next是SNode0。  第三步:Thread2从SynchronousQueue队列中取出一个元素 这次的操作比较复杂,也是先把本次的操作包装成SNode压入栈顶。 item是null(取数据的时候,这个字段没有值),waiter是null(当前线程Thread2正在操作,所以不用赋值了),mode是2表示正在操作(即将跟后继节点进行匹配),next是SNode1。  然后,Thread2开始把栈顶的两个节点进行匹配,匹配成功后,就把SNode2赋值给SNode1的match属性,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点。   ### 3.5 put/take源码实现 看完 了put/take流程,再来看源码就简单多了。 先看一下put方法源码: ```java // 放数据 public void put(E e) throws InterruptedException { // 不允许放null元素 if (e == null) throw new NullPointerException(); // 调用转换器实现类,放元素 if (transferer.transfer(e, false, 0) == null) { // 如果放数据失败,就中断当前线程,并抛出异常 Thread.interrupted(); throw new InterruptedException(); } } ``` 核心逻辑都在transfer方法中,代码很长,理清逻辑后,也很容易理解。 ```java // 取数据和放数据操作,共用一个方法 E transfer(E e, boolean timed, long nanos) { SNode s = null; // e为空,说明是取数据,否则是放数据 int mode = (e == null) ? REQUEST : DATA; for (; ; ) { SNode h = head; // 1. 如果栈顶节点为空,或者栈顶节点类型跟本次操作相同(都是取数据,或者都是放数据) if (h == null || h.mode == mode) { // 2. 判断节点是否已经超时 if (timed && nanos <= 0) { // 3. 如果栈顶节点已经被取消,就删除栈顶节点 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; // 4. 把本次操作包装成SNode,压入栈顶 } else if (casHead(h, s = snode(s, e, h, mode))) { // 5. 挂起当前线程,等待被唤醒 SNode m = awaitFulfill(s, timed, nanos); // 6. 如果这个节点已经被取消,就删除这个节点 if (m == s) { clean(s); return null; } // 7. 把s.next设置成head if ((h = head) != null && h.next == s) casHead(h, s.next); return (E) ((mode == REQUEST) ? m.item : s.item); } // 8. 如果栈顶节点类型跟本次操作不同,并且不是FULFILLING类型 } else if (!isFulfilling(h.mode)) { // 9. 再次判断如果栈顶节点已经被取消,就删除栈顶节点 if (h.isCancelled()) casHead(h, h.next); // 10. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶 else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // 11. 使用死循环,直到匹配到对应的节点 for (; ; ) { // 12. 遍历下个节点 SNode m = s.next; // 13. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。 if (m == null) { casHead(s, null); s = null; break; } SNode mn = m.next; // 14. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。 if (m.tryMatch(s)) { casHead(s, mn); return (E) ((mode == REQUEST) ? m.item : s.item); } else // 15. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配 s.casNext(m, mn); } } } else { // 16. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型, // 就再执行一遍上面第11步for循环中的逻辑(很少概率出现) SNode m = h.next; if (m == null) casHead(h, null); else { SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); else h.casNext(m, mn); } } } } ``` transfer方法逻辑也很简单,就是判断本次操作类型是否跟栈顶节点相同,如果相同,就把本次操作压入栈顶。否则就跟栈顶节点匹配,唤醒栈顶节点线程,弹出栈顶节点。 transfer方法中调用了awaitFulfill方法,**作用是**挂起当前线程。 ```java // 等待被唤醒 SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 1. 计算超时时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 2. 计算自旋次数 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); // 3. 如果已经匹配到其他节点,直接返回 SNode m = s.match; if (m != null) return m; if (timed) { // 4. 超时时间递减 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } // 5. 自旋次数减一 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) s.waiter = w; // 6. 开始挂起当前线程 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } ``` awaitFulfill方法的逻辑也很简单,就是挂起当前线程。 take方法底层使用的也是transfer方法: ```java // 取数据 public E take() throws InterruptedException { // // 调用转换器实现类,取数据 E e = transferer.transfer(null, false, 0); if (e != null) return e; // 没取到,就中断当前线程 Thread.interrupted(); throw new InterruptedException(); } ``` ## 4\. 总结 1. SynchronousQueue是一种特殊的阻塞队列,队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。 2. SynchronousQueue底层是基于栈和队列两种数据结构实现的。 3. Java线程池中的**newCachedThreadPool**(带缓存的线程池)底层就是使用SynchronousQueue实现的。 4. 如果希望你的任务需要被快速处理,可以使用SynchronousQueue队列。
admin
2023年4月15日 06:38
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
PDF文档(打印)
分享
链接
类型
密码
更新密码