2024年9月reactor线程模型的特点(求救,分布式事务怎么处理)

 更新时间:2024-09-21 06:36:02

  ⑴reactor线程模型的特点(求救,分布式事务怎么处理

  ⑵求救,分布式事务怎么处理

  ⑶性能和时延问题在服务化之前,业务通常都是本地API调用,本地方法调用性能损耗较小。服务化之后,服务提供者和消费者之间采用远程网络通信,增加了额外的性能损耗:客户端需要对消息进行序列化,主要占用CPU计算资源。序列化时需要创建二进制数组,耗费JVM堆内存或者堆外内存。客户端需要将序列化之后的二进制数组发送给服务端,占用网络带宽资源。服务端读取到码流之后,需要将请求数据报反序列化成请求对象,占用CPU计算资源。服务端通过反射的方式调用服务提供者实现类,反射本身对性能影响就比较大。服务端将响应结果序列化,占用CPU计算资源。服务端将应答码流发送给客户端,占用网络带宽资源。客户端读取应答码流,反序列化成响应消息,占用CPU资源。通过分析我们发现,一个简单的本地方法调用,切换成远程服务调用之后,额外增加了很多处理流程,不仅占用大量的系统资源,同时增加了时延。一些复杂的应用会拆分成多个服务,形成服务调用链,如果服务化框架的性能比较差、服务调用时延也比较大,业务服务化之后的性能和时延将无法满足业务的性能需求。.RPC框架高性能设计影响RPC框架性能的主要因素有三个。I/O调度模型:同步阻塞I/O(BIO还是非阻塞I/O(NIO。序列化框架的选择:文本协议、二进制协议或压缩二进制协议。线程调度模型:串行调度还是并行调度,锁竞争还是无锁化算法。.I/O调度模型在I/O编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者I/O多路复用技术进行处理。I/O多路复用技术通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型比,I/O多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低了系统的维护工作量,节省了系统资源。JDK._update版本使用epoll替代了传统的select/poll,极大地提升了NIO通信的性能,它的工作原理如图-所示。图-非阻塞I/O工作原理ty是一个开源的高性能NIO通信框架:它的I/O线程NioEventLoop由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端Channel。由于读写操作都是非阻塞的,这就可以充分提升I/O线程的运行效率,避免由于频繁I/O阻塞导致的线程挂起。另外,由于ty采用了异步通信模式,一个I/O线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。ty被精心设计,提供了很多独特的性能提升特性,使它做到了在各种NIO框架中性能排名第一,它的性能优化措施总结如下。零拷贝:(ty的接收和发送ByteBuffer采用DIRECTBUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAPBUFFERS进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。(ty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便地对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。(ty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。内存池:随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区Buffer,情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽量重用缓冲区,ty提供了基于内存池的缓冲区重用机制。性能测试表明,采用内存池的ByteBuf相比于朝生夕灭的ByteBuf,性能高倍左右(性能数据与使用场景强相关。无锁化的串行设计:在大多数场景下,并行多线程处理可以提升系统的并发性能。但是,如果对于共享资源的并发访问处理不当,会带来严重的锁竞争,这最终会导致性能的下降。为了尽可能地避免锁竞争带来的性能损耗,可以通过串行化设计,即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。为了尽可能提升性能,ty采用了串行无锁化设计,在I/O线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。高效的并发编程:volatile的大量、正确使用;CAS和原子类的广泛使用;线程安全容器的使用;通过读写锁提升并发性能。.高性能序列化框架影响序列化性能的关键因素总结如下。序列化后的码流大小(网络带宽的占用。序列化&反序列化的性能(CPU资源占用。是否支持跨语言(异构系统的对接和开发语言切换。并发调用的性能表现:稳定性、线性增长、偶现的时延毛刺等。相比于JSON等文本协议,二进制序列化框架性能更优异,以Java原生序列化和Protobuf二进制序列化为例进行性能测试对比,结果如图-所示。图-序列化性能测试对比数据在序列化框架的技术选型中,如无特殊要求,尽量选择性能更优的二进制序列化框架,码流是否压缩,则需要根据通信内容做灵活选择,对于图片、音频、有大量重复内容的文本文件(例如小说可以采用码流压缩,常用的压缩算法包括GZip、Zig-Zag等。.高性能的Reactor线程模型该模型的特点总结如下。有专门一个NIO线程:Aeptor线程用于监听服务端,接收客户端的TCP连接请求。网络I/O操作:读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送。个NIO线程可以同时处理N条链路,但是个链路只对应个NIO线程,防止产生并发操作。由于Reactor模式使用的是异步非阻塞I/O,所有的I/O操作都不会导致阻塞,理论上一个线程可以独立处理所有I/O相关的操作,因此在绝大多数场景下,Reactor多线程模型都可以完全满足业务性能需求。Reactor线程调度模型的工作原理示意如图-所示。图-高性能的Reactor线程调度模型.业务最佳实践要保证高性能,单依靠分布式服务框架是不够的,还需要应用的配合,应用服务化高性能实践总结如下:能异步的尽可能使用异步或者并行服务调用,提升服务的吞吐量,有效降低服务调用时延。无论是NIO通信框架的线程池还是后端业务线程池,线程参数的配置必须合理。如果采用JDK默认的线程池,最大线程数建议不超过个。因为JDK的线程池默认采用N个线程争用个同步阻塞队列方式,当线程数过大时,会导致激烈的锁竞争,此时性能不仅不会提升,反而会下降。尽量减小要传输的码流大小,提升性能。本地调用时,由于在同一块堆内存中访问,参数大小对性能没有任何影响。跨进程通信时,往往传递的是个复杂对象,如果明确对方只使用其中的某几个字段或者某个对象引用,则不要把整个复杂对象都传递过去。举例,对象A持有个基本类型的字段,个复杂对象B和C。如果明确服务提供者只需要用到A聚合的C对象,则请求参数应该是C,而不是整个对象A。设置合适的客户端超时时间,防止业务高峰期因为服务端响应慢导致业务线程等应答时被阻塞,进而引起后续其他服务的消息在队列中排队,造成故障扩散。对于重要的服务,可以单独部署到独立的服务线程池中,与其他非核心服务做隔离,保障核心服务的高效运行。利用Docker等轻量级OS容器部署服务,对服务做物理资源层隔离,避免虚拟化之后导致的超过%的性能损耗。设置合理的服务调度优先级,并根据线上性能监控数据做实时调整。.事务一致性问题服务化之前,业务采用本地事务,多个本地SQL调用可以用一个大的事务块封装起来,如果某一个数据库操作发生异常,就可以将之前的SQL操作进行回滚,只有所有SQL操作全部成功,才最终提交,这就保证了事务强一致性,如图-所示。服务化之后,三个数据库操作可能被拆分到独立的三个数据库访问服务中,此时原来的本地SQL调用演变成了远程服务调用,事务一致性无法得到保证,如图-所示。图-服务化之后引入分布式事务问题假如服务A和服务B调用成功,则A和B的SQL将会被提交,最后执行服务C,它的SQL操作失败,对于应用消费者而言,服务A和服务B的相关SQL操作已经提交,服务C发生了回滚,这就导致事务不一致。从图-可以得知,服务化之后事务不一致主要是由服务分布式部署导致的,因此也被称为分布式事务问题。.分布式事务设计方案通常,分布式事务基于两阶段提交实现,它的工作原理示意图如图-所示。图-两阶段提交原理图阶段:全局事务管理器向所有事务参与者发送准备请求;事务参与者向全局事务管理器回复自己是否准备就绪。阶段:全局事务管理器接收到所有事务参与者的回复之后做判断,如果所有事务参与者都可以提交,则向所有事务提交者发送提交申请,否则进行回滚。事务参与者根据全局事务管理器的指令进行提交或者回滚操作。分布式事务回滚原理图如图-所示。图-分布式事务回滚原理图两阶段提交采用的是悲观锁策略,由于各个事务参与者需要等待响应最慢的参与者,因此性能比较差。第一个问题是协议本身的成本:整个协议过程是需要加锁的,比如锁住数据库的某条记录,且需要持久化大量事务状态相关的操作日志。更为麻烦的是,两阶段锁在出现故障时表现出来的脆弱性,比如两阶段锁的致命缺陷:当协调者出现故障,整个事务需要等到协调者恢复后才能继续执行,如果协调者出现类似磁盘故障等错误,该事务将被永久遗弃。对于分布式服务框架而言,从功能特性上需要支持分布式事务。在实际业务使用过程中,如果能够通过最终一致性解决问题,则不需要做强一致性;如果能够避免分布式事务,则尽量在业务层避免使用分布式事务。.分布式事务优化既然分布式事务有诸多缺点,那么为什么我们还在使用呢?有没有更好的解决方案来改进或者替换呢?如果我们只是针对分布式事务去优化的话,发现其实能改进的空间很小,毕竟瓶颈在分布式事务模型本身。那我们回到问题的根源:为什么我们需要分布式事务?因为我们需要各个资源数据保持一致性,但是对于分布式事务提供的强一致性,所有业务场景真的都需要吗?大多数业务场景都能容忍短暂的不一致,不同的业务对不一致的容忍时间不同。像银行转账业务,中间有几分钟的不一致时间,用户通常都是可以理解和容忍的。在大多数的业务场景中,我们可以使用最终一致性替代传统的强一致性,尽量避免使用分布式事务。在实践中常用的最终一致性方案就是使用带有事务功能的MQ做中间人角色,它的工作原理如下:在做本地事务之前,先向MQ发送一个prepare消息,然后执行本地事务,本地事务提交成功的话,向MQ发送一个mit消息,否则发送一个rollback消息,取消之前的消息。MQ只会在收到mit确认才会将消息投递出去,所以这样的形式可以保证在一切正常的情况下,本地事务和MQ可以达到一致性。但是分布式调用存在很多异常场景,诸如网络超时、VM宕机等。假如系统执行了local_tx()成功之后,还没来得及将mit消息发送给MQ,或者说发送出去由于网络超时等原因,MQ没有收到mit,发生了mit消息丢失,那么MQ就不会把prepare消息投递出去。MQ会根据策略去尝试询问(回调发消息的系统(checkmit进行检查该消息是否应该投递出去或者丢弃,得到系统的确认之后,MQ会做投递还是丢弃,这样就完全保证了MQ和发消息的系统的一致性,从而保证了接收消息系统的一致性。.研发团队协作问题服务化之后,特别是采用微服务架构以后。研发团队会被拆分成多个服务化小组,例如AWS的TwoPizzaTeam,每个团队由~名研发负责服务的开发、测试、部署上线、运维和运营等。随着服务数的膨胀,研发团队的增多,跨团队的协同配合将会成为一个制约研发效率提升的因素。.共用服务注册中心为了方便开发测试,经常会在线下共用一个所有服务共享的服务注册中心,这时,一个正在开发中的服务发布到服务注册中心,可能会导致一些消费者不可用。解决方案:可以让服务提供者开发方,只订阅服务(开发的服务可能依赖其他服务,而不注册正在开发的服务,通过直连测试正在开发的服务。它的工作原理如图-所示。图-只订阅,不发布.直连提供者在开发和测试环境下,如果公共的服务注册中心没有搭建,消费者将无法获取服务提供者的地址列表,只能做本地单元测试或使用模拟桩测试。还有一种场景就是在实际测试中,服务提供者往往多实例部署,如果服务提供者存在Bug,就需要做远程断点调试,这会带来两个问题:服务提供者多实例部署,远程调试地址无法确定,调试效率低下。多个消费者可能共用一套测试联调环境,断点调试过程中可能被其他消费者意外打断。解决策略:绕过注册中心,只测试指定服务提供者,这时候可能需要点对点直连,点对点直联方式将以服务接口为单位,忽略注册中心的提供者列表。.多团队进度协同假如前端Web门户依赖后台A、B、C和D个服务,分别由个不同的研发团队负责,门户要求新特性周内上线。A和B内部需求优先级排序将门户的优先级排的比较高,可以满足交付时间点。但是C和D服务所在团队由于同时需要开发其他优先级更高的服务,因此把优先级排的相对较低,无法满足周交付。在C和D提供版本之前,门户只能先通过打测试桩的方式完成Mock测试,但是由于并没有真实的测试过C和D服务,因此需求无法按期交付。应用依赖的服务越多,特性交付效率就越低下,交付的速度取决于依赖的最迟交付的那个服务。假如Web门户依赖后台的个服务,只要个核心服务没有按期交付,则整个进度就会延迟。解决方案:调用链可以将应用、服务和中间件之间的依赖关系串接并展示出来,基于调用链首入口的交付日期作为输入,利用依赖管理工具,可以自动计算出调用链上各个服务的最迟交付时间点。通过调用链分析和标准化的依赖计算工具,可以避免人为需求排序失误导致的需求延期。.服务降级和Mock测试在实际项目开发中,由于小组之间、个人开发者之间开发节奏不一致,经常会出现消费者等待依赖的服务提供者提供联调版本的情况,相互等待会降低项目的研发进度。解决方案:服务提供者首先将接口定下来并提供给消费者,消费者可以将服务降级同Mock测试结合起来,在Mock测试代码中实现容错降级的业务逻辑(业务放通,这样既完成了Mock测试,又实现了服务降级的业务逻辑开发,一举两得。.协同调试问题在实际项目开发过程中,各研发团队进度不一致很正常。如果消费者坐等服务提供者按时提供版本,往往会造成人力资源浪费,影响项目进度。解决方案:分布式服务框架提供Mock桩管理框架,当周边服务提供者尚未完成开发时,将路由切换到模拟测试模式,自动调用Mock桩;业务集成测试和上线时,则要能够自动切换到真实的服务提供者上,可以结合服务降级功能实现。.接口前向兼容性由于线上的Bug修复、内部重构和需求变更,服务提供者会经常修改内部实现,包括但不限于:接口参数变化、参数字段变化、业务逻辑变化和数据表结构变化。在实际项目中经常会发生服务提供者修改了接口或者数据结构,但是并没有及时知会到所有消费者,导致服务调用失败。解决方案:制定并严格执行《服务前向兼容性规范》,避免发生不兼容修改或者私自修改不通知周边的情况。接口兼容性技术保障:例如Thrift的IDL,支持新增、修改和删除字段,字段定义位置无关性,码流支持乱序等。.总结服务化之后,无论是服务化框架,还是业务服务,都面临诸多挑战,本章摘取了其中一些比较重要的问题,并给出解决方案和最佳实践。对于本章节没有列出的问题,则需要服务框架开发者和使用者在实践中探索,找出一条适合自己产品的服务化最佳实践。

  ⑷什么是reactor模式和proactor模式

  ⑸Reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个ServiceHandler,有多个RequestHandlers;ServiceHandler会对输入的请求(Event进行多路复用,并同步地将它们分发给相应的RequestHandler。

  ⑹Proactor模式是另一个消息异步通知的设计模式,与Reactor的最大区别在于,Proactor通知的不是就绪事件,而是操作完成事件,这也就是操作系统异步IO的主要模型。

  ⑺Reactor在实现上相对比较简单,对于大量对象,频繁从非就绪态触发到就绪态的场景处理十分高效;同时操作系统可以同时去等待多个对象触发,并且可以在事件触发后自由地选择后续执行流程,具有很高的灵活性。

  ⑻虽然并发编程实现阻塞式同步IO也可以实现同时等待多个对象触发的效果,但在编程的复杂度与资源的消耗等方面,Reactor模式拥有明显的优势。

  ⑼Tomcat篇-整体架构和I/O模型

  ⑽本文主要包括tomcat服务器的目录结构、工作模式、整体架构、I/O模型以及NIO、NIO、APR三者的对比介绍。

  ⑾我们先来看一下tomcat.和tomcat中的home目录中的文件:

  ⑿可以看到除掉一些说明文件之后,还有个目录:

  ⒀实际上除了主目录里有lib目录,在webapps目录下的web应用中的WEB-INF目录下也存在一个lib目录:

  ⒁●Tomcat主目录下的lib目录:存放的JAR文件不仅能被Tomcat访问,还能被所有在Tomcat中发布的JavaWeb应用访问●webapps目录下的JavaWeb应用的lib目录:存放的JAR文件只能被当前JavaWeb应用访问

  ⒂既然有多个lib目录,那么肯定就有使用的优先顺序,Tomcat类加载器的目录加载优先顺序如下:

  ⒃Tomcat的类加载器负责为Tomcat本身以及JavaWeb应用加载相关的类。假如Tomcat的类加载器要为一个JavaWeb应用加载一个类,类加载器会按照以下优先顺序到各个目录中去查找该类的.class文件,直到找到为止,如果所有目录中都不存在该类的.class文件,则会抛出异常:

  ⒄Tomcat不仅可以单独运行,还可以与其他的Web服务器集成,作为其他Web服务器的进程内或进程外的servlet容器。集成的意义在于:对于不支持运行JavaServlet的其他Web服务器,可通过集成Tomcat来提供运行Servlet的功能。

  ⒅Tomcat有三种工作模式:

  ⒆我们先从tomcat的源码目录来分析一下tomcat的整体架构,前面我们配置jsvc运行tomcat的时候,我们知道tomcat中启动运行的最主要的类是.apache.catalina.startup.Bootstrap,那么我们在tomcat的源码中的java目录下的目录的apache目录可以找到主要的源码的相对应的类。

  ⒇图中的目录如果画成架构图,可以这样表示:

  ⒈Tomcat本质上就是一款Servlet容器,因此catalina才是Tomcat的核心,其他模块都是为catalina提供支撑的。

  ⒉单线程阻塞I/O模型是最简单的一种服务器I/O模型,单线程即同时只能处理一个客户端的请求,阻塞即该线程会一直等待,直到处理完成为止。对于多个客户端访问,必须要等到前一个客户端访问结束才能进行下一个访问的处理,请求一个一个排队,只提供一问一答服务。

  ⒊如上图所示:这是一个同步阻塞服务器响应客户端访问的时间节点图。

  ⒋这种模型的特点在于单线程和阻塞I/O。单线程即服务器端只有一个线程处理客户端的所有请求,客户端连接与服务器端的处理线程比是n:,它无法同时处理多个连接,只能串行处理连接。而阻塞I/O是指服务器在读写数据时是阻塞的,读取客户端数据时要等待客户端发送数据并且把操作系统内核复制到用户进程中,这时才解除阻塞状态。写数据回客户端时要等待用户进程将数据写入内核并发送到客户端后才解除阻塞状态。这种阻塞带来了一个问题,服务器必须要等到客户端成功接收才能继续往下处理另外一个客户端的请求,在此期间线程将无法响应任何客户端请求。

  ⒌该模型的特点:它是最简单的服务器模型,整个运行过程都只有一个线程,只能支持同时处理一个客户端的请求(如果有多个客户端访问,就必须排队等待),服务器系统资源消耗较小,但并发能力低,容错能力差。

  ⒍多线程阻塞I/O模型在单线程阻塞I/O模型的基础上对其进行改进,加入多线程,提高并发能力,使其能够同时对多个客户端进行响应,多线程的核心就是利用多线程机制为每个客户端分配一个线程。

  ⒎如上图所示,服务器端开始监听客户端的访问,假如有两个客户端同时发送请求过来,服务器端在接收到客户端请求后分别创建两个线程对它们进行处理,每条线程负责一个客户端连接,直到响应完成。期间两个线程并发地为各自对应的客户端处理请求,包括读取客户端数据、处理客户端数据、写数据回客户端等操作。

  ⒏这种模型的I/O操作也是阻塞的,因为每个线程执行到读取或写入操作时都将进入阻塞状态,直到读取到客户端的数据或数据成功写入客户端后才解除阻塞状态。尽管I/O操作阻塞,但这种模式比单线程处理的性能明显高了,它不用等到第一个请求处理完才处理第二个,而是并发地处理客户端请求,客户端连接与服务器端处理线程的比例是:。

  ⒐多线程阻塞I/O模型的特点:支持对多个客户端并发响应,处理能力得到大幅提高,有较大的并发量,但服务器系统资源消耗量较大,而且如果线程数过多,多线程之间会产生较大的线程切换成本,同时拥有较复杂的结构。

  ⒑在探讨单线程非阻塞I/O模型前必须要先了解非阻塞情况下套接字事件的检测机制,因为对于单线程非阻塞模型最重要的事情是检测哪些连接有感兴趣的事件发生。一般会有如下三种检测方式。

  ⒒当多个客户端向服务器请求时,服务器端会保存一个套接字连接列表中,应用层线程对套接字列表轮询尝试读取或写入。如果成功则进行处理,如果失败则下次继续。这样不管有多少个套接字连接,它们都可以被一个线程管理,这很好地利用了阻塞的时间,处理能力得到提升。

  ⒓但这种模型需要在应用程序中遍历所有的套接字列表,同时需要处理数据的拼接,连接空闲时可能也会占用较多CPU资源,不适合实际使用。

  ⒔这种方式将套接字的遍历工作交给了操作系统内核,把对套接字遍历的结果组织成一系列的事件列表并返回应用层处理。对于应用层,它们需要处理的对象就是这些事件,这是一种事件驱动的非阻塞方式。

  ⒕服务器端有多个客户端连接,应用层向内核请求读写事件列表。内核遍历所有套接字并生成对应的可读列表readList和可写列表writeList。readList和writeList则标明了每个套接字是否可读/可写。应用层遍历读写事件列表readList和writeList,做相应的读写操作。

  ⒖内核遍历套接字时已经不用在应用层对所有套接字进行遍历,将遍历工作下移到内核层,这种方式有助于提高检测效率。然而,它需要将所有连接的可读事件列表和可写事件列表传到应用层,假如套接字连接数量变大,列表从内核复制到应用层也是不小的开销。另外,当活跃连接较少时,内核与应用层之间存在很多无效的数据副本,因为它将活跃和不活跃的连接状态都复制到应用层中。

  ⒗通过遍历的方式检测套接字是否可读可写是一种效率比较低的方式,不管是在应用层中遍历还是在内核中遍历。所以需要另外一种机制来优化遍历的方式,那就是回调函数。内核中的套接字都对应一个回调函数,当客户端往套接字发送数据时,内核从网卡接收数据后就会调用回调函数,在回调函数中维护事件列表,应用层获取此事件列表即可得到所有感兴趣的事件。

  ⒘内核基于回调的事件检测方式有两种

  ⒙第一种是用可读列表readList和可写列表writeList标记读写事件,套接字的数量与readList和writeList两个列表的长度一样。

  ⒚上面两种方式由操作系统内核维护客户端的所有连接并通过回调函数不断更新事件列表,而应用层线程只要遍历这些事件列表即可知道可读取或可写入的连接,进而对这些连接进行读写操作,极大提高了检测效率,自然处理能力也更强。

  ⒛单线程非阻塞I/O模型最重要的一个特点是,在调用读取或写入接口后立即返回,而不会进入阻塞状态。虽然只有一个线程,但是它通过把非阻塞读写操作与上面几种检测机制配合就可以实现对多个连接的及时处理,而不会因为某个连接的阻塞操作导致其他连接无法处理。在客户端连接大多数都保持活跃的情况下,这个线程会一直循环处理这些连接,它很好地利用了阻塞的时间,大大提高了这个线程的执行效率。

  单线程非阻塞I/O模型的主要优势体现在对多个连接的管理,一般在同时需要处理多个连接的发场景中会使用非阻塞NIO模式,此模型下只通过一个线程去维护和处理连接,这样大大提高了机器的效率。一般服务器端才会使用NIO模式,而对于客户端,出于方便及习惯,可使用阻塞模式的套接字进行通信。

  在多核的机器上可以通过多线程继续提高机器效率。最朴实、最自然的做法就是将客户端连接按组分配给若干线程,每个线程负责处理对应组内的连接。比如有个客户端访问服务器,服务器将套接字和套接字交由线程管理,而线程则管理套接字和套接字,通过事件检测及非阻塞读写就可以让每个线程都能高效处理。

  多线程非阻塞I/O模式让服务器端处理能力得到很大提高,它充分利用机器的CPU,适合用于处理高并发的场景,但它也让程序更复杂,更容易出现问题(死锁、数据不一致等经典并发问题。

  最经典的多线程非阻塞I/O模型方式是Reactor模式。首先看单线程下的Reactor,Reactor将服务器端的整个处理过程分成若干个事件,例如分为接收事件、读事件、写事件、执行事件等。Reactor通过事件检测机制将这些事件分发给不同处理器去处理。在整个过程中只要有待处理的事件存在,即可以让Reactor线程不断往下执行,而不会阻塞在某处,所以处理效率很高。

  基于单线程Reactor模型,根据实际使用场景,把它改进成多线程模式。常见的有两种方式:一种是在耗时的process处理器中引入多线程,如使用线程池;另一种是直接使用多个Reactor实例,每个Reactor实例对应一个线程。

  Reactor模式的一种改进方式如下图所示。其整体结构基本上与单线程的Reactor类似,只是引入了一个线程池。由于对连接的接收、对数据的读取和对数据的写入等操作基本上都耗时较少,因此把它们都放到Reactor线程中处理。然而,对于逻辑处理可能比较耗时的工作,可以在process处理器中引入线程池,process处理器自己不执行任务,而是交给线程池,从而在Reactor线程中避免了耗时的操作。将耗时的操作转移到线程池中后,尽管Reactor只有一个线程,它也能保证Reactor的高效。

  Reactor模式的另一种改进方式如下图所示。其中有多个Reactor实例,每个Reactor实例对应一个线程。因为接收事件是相对于服务器端而言的,所以客户端的连接接收工作统一由一个aept处理器负责,aept处理器会将接收的客户端连接均匀分配给所有Reactor实例,每个Reactor实例负责处理分配到该Reactor上的客户端连接,包括连接的读数据、写数据和逻辑处理。这就是多Reactor实例的原理。

  Tomcat支持的I/O模型如下表(自./.版本起,Tomcat移除了对BIO的支持,在.之前,Tomcat默认采用的I/O方式为BIO,之后改为NIO。无论NIO、NIO还是APR,在性能方面均优于以往的BIO。

  Tomcat中的NIO模型是使用的JAVA的NIO类库,其内部的IO实现是同步的(也就是在用户态和内核态之间的数据交换上是同步机制,采用基于selector实现的异步事件驱动机制(这里的异步指的是selector这个实现模型是使用的异步机制。而对于Java来说,非阻塞I/O的实现完全是基于操作系统内核的非阻塞I/O,它将操作系统的非阻塞I/O的差异屏蔽并提供统一的API,让我们不必关心操作系统。JDK会帮我们选择非阻塞I/O的实现方式。

  NIO和前者相比的最大不同就在于引入了异步通道来实现异步IO操作,因此也叫AIO(AsynchronousI/O。NIO.的异步通道APIs提供方便的、平台独立的执行异步操作的标准方法。这使得应用程序开发人员能够以更清晰的方式来编写程序,而不必定义自己的Java线程,此外,还可通过使用底层OS所支持的异步功能来提高性能。如同其他JavaAPI一样,API可利用的OS自有异步功能的数量取决于其对该平台的支持程度。

  异步通道提供支持连接、读取、以及写入之类非锁定操作的连接,并提供对已启动操作的控制机制。Java中用于JavaPlatform(NIO.的MoreNewI/OAPIs,通过在java.nio.channels包中增加四个异步通道类,从而增强了Java.中的NewI/OAPIs(NIO,这些类在风格上与NIO通道API很相似。他们共享相同的方法与参数结构体,并且大多数对于NIO通道类可用的参数,对于新的异步版本仍然可用。主要区别在于新通道可使一些操作异步执行。

  异步通道API提供两种对已启动异步操作的监测与控制机制。第一种是通过返回一个java.util.concurrent.Future对象来实现,它将会建模一个挂起操作,并可用于查询其状态以及获取结果。第二种是通过传递给操作一个新类的对象,java.nio.channels.pletionHandler,来完成,它会定义在操作完毕后所执行的处理程序方法。每个异步通道类为每个操作定义API副本,这样可采用任一机制。

  Apache可移植运行时(ApachePortableRuntime,APR是ApacheHTTP服务器的支持库,最初,APR是作为ApacheHTTP服务器的一部分而存在的,后来成为一个单独的项目。其他的应用程序可以使用APR来实现平台无关性(跨平台。APR提供了一组映射到下层操作系统的API,如果操作系统不支持某个特定的功能,APR将提供一个模拟的实现。这样程序员使用APR编写真正可在不同平台上移植的程序。

  顺利安装完成后会显示apr的lib库路径,一般都是/usr/local/apr/lib

  安装完成之后我们还需要修改环境变量和配置参数

  这里我们使用的是systemd调用jsvc来启动tomcat,所以我们直接在systemd对应的tomcat的unit文件中的ExecStart中添加一个路径参数-Djava.library.path=/usr/local/apr/lib指向apr库的路径:

  然后我们在tomcat的home目录下的conf子目录中对server.xml文件进行修改

  把端口对应的配置修改成apr:(其他端口配置也类似

  重启tomcat服务我们从tomcat的日志中就可以看到协议已经从默认的NIO变成了apr。

  NIO性能是最差的这是毋庸置疑的,如果是考虑到高并发的情况,显然异步非阻塞I/O模式的NIO和APR库在性能上更有优势,实际上NIO的性能表现也和APR不相上下,但是NIO要求Tomcat的版本要在.以上,而APR只需要.以上即可,但是APR需要额外配置库环境,相对于内置集成的NIO来说APR这个操作比较麻烦,两者各有优劣。具体使用哪个还是需要结合实际业务需求和环境进行测试才能决定。

  面试题:Java框架ty的io结构是什么

  主从Reactor多线程Nio结构,主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个个单独的NIO线程,而是一个独立的NIO线程池。Aeptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等,将新创建的SocketChannel注册到IO线程池(subreactor线程池的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Aeptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操作。xdxa利用主从NIO线程模型,可以解决个服务端监听线程无法有效处理所有客户端连接的性能不足问题。xdxa它的工作流程总结如下:xdxa从主线程池中随机选择一个Reactor线程作为Aeptor线程,用于绑定监听端口,接收客户端连接;xdxaAeptor线程接收客户端连接请求之后创建新的SocketChannel,将其注册到主线程池的其它Reactor线程上,由其负责接入认证、IP黑白名单过滤、握手等操作;xdxa步骤完成之后,业务层的链路正式建立,将SocketChannel从主线程池的Reactor线程的多路复用器上摘除,重新注册到Sub线程池的线程上,用于处理I/O的读写操作。

  ty的Reactor多线程模型,NioEventLoop,ChannelPipeline简介

  如果在Google上搜索“ty高性能易用“,在找到的一大批文章,你大概率会看到这张图,外加关键字

  NIO,Reactor多线程模型,异步串行无锁化,堆外内存,pipeline,翻看完这些文章后可以让你对ty的原理有大致了解,但是ty如何实现这些的呢?本文将尽可能简单的解释ty中Reactor多线程的实现,如有错误感谢指出.

  Selector是NIO的重要组件,Selector上可以注册Channel.Channel在注册的时候会标注自己感兴趣的事件:

  Channel,通道,为了便于理解,我把它分为三类

  Reactor多线程模型可以分为三块

  mainReactor负责客户端接入

  aeptor负责将接入的连接移交给subReactor

  subReactor负责连接的读写

  ChannelPipeline的设计思想是责任链设计模式,是由ChannelHandlerContext组成的双向链表,,首尾固定为HeadContext和TailContext,它们作为哨兵存在.当我们添加一个ChannelHandler到ChannelPipeline时,会先包装成ChannelHandlerContext再添加进去.

  inbound事件传播

  客户端向服务端发送消息,这个流向就称为inbound.消息会从Head开始由左向右传递直到Tail,由Tail进行收尾操作

  outbound事件传播

  服务端向客户端发送信息,这个流向称为outbound,消息会从Tail开始由右向左传递知道Head,由Head进行收尾操作

  当某个ChannelHandler操作抛出异常,会从该handler开始向Tail传递.由Tail做收尾操作.

  学习ty,要理解Reactor模型,并把它和ty的实现结合起来,我学习ty的时候就因为这块认识不深刻,浪费了很多时间也没有成效,共勉

  RocketMQ第五讲

  broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,把结果返回给消费者。消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认G,文件名长度为位,左边补零,剩余为起始偏移量,比如代表了第一个文件,起始偏移量为,文件大小为G=;当第一个文件写满了,第二个文件为,起始偏移量为,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;mitLog文件中保存了消息的全量内容。不同的Topic的消息,在mitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的mitLog中。broker启动了一个专门的线程来构建索引,把mitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。也可根据返回的offsetMsgId,解析出ip,端口和mitLog中的物理消息偏移量,直接去mitLog中取数据。引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历mitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列作为消费消息的索引,保存了指定Topic下的队列消息在mitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的mitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共个字节,分别为字节的mitlog物理偏移量、字节的消息长度、字节taghashcode,单个文件由W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约.M。IndexFile(索引文件提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为M,一个IndexFile可以保存W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。按照MessageKey查询消息的时候,会用到这个索引文件。IndexFile索引文件为用户提供通过“按照MessageKey查询消息”的消息索引查询服务,IndexFile文件的存储位置是:{fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于+W+W=个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用topic+“#”+UNIQ_KEY的value作为key来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔,也会用topic+“#”+KEY来做索引。其中的索引数据包含了KeyHash/mitLogOffset/Timestamp/NextIndexoffset这四个字段,一共Byte。NextIndexoffset即前面读出来的slotValue,如果有hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个IndexFile的结构如图,Byte的Header用于保存一些总的统计信息,W的SlotTable并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。W是真正的索引数据,即一个IndexFile可以保存W个索引。“按照MessageKey查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的mitLogoffset从mitLog文件中读取消息的实体内容。RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Produce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。属性和方法很多,就不往这里放了。文件存储实现类,包括多个内部类·对于文件夹下的一个文件上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。RocketMQ的RPC通信采用ty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档:Reactor线程模型上面的框图中可以大致了解RocketMQ中tyRemotingServer的Reactor多线程模型。一个Reactor主线程(eventLoopGroupBoss,即为上面的负责监听TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置,然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为,在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutroup(即为上面的“M”,源码中默认设置为去做。而处理业务操作放在业务线程池中执行,根据Romotingmand的业务请求码code去processorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的“M”。上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。AsyncAppender-Worker-Thread-:异步打印日志,logback使用,应该是守护线程FileWatchService:tyEventExecutor:tyNIOBoss_:一个tyServerNIOSelector_:默认为三个NSScheduledThread:定时任务线程ServerHouseKeepingService:守护线程ThreadDeathWatch--:守护线程,ty用,已经废弃RemotingExecutorThread(-:工作线程池,没有共用tyServerNIOSelector_,直接初始化个线程AsyncAppender-Worker-Thread-:异步打印日志,logback使用,共九个:RocketmqBrokerAppender_innerRocketmqFilterAppender_innerRocketmqProtectionAppender_innerRocketmqRemotingAppender_innerRocketmqRebalanceLockAppender_innerRocketmqStoreAppender_innerRocketmqStoreErrorAppender_innerRocketmqWaterMarkAppender_innerRocketmqTransactionAppender_innerSendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGEPullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGEProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGEQueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGEAdminBrokerThread_:remotingServer.registerDefaultProcessorClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENTHeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEATEndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTIONConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSETbrokerOutApi_thread_:BrokerController.registerBrokerAll(true,false,true);==================================================================BrokerControllerScheduledThread:=》BrokerController.this.getBrokerStats().record();BrokerController.this.consumerOffsetManager.persist();BrokerController.this.consumerFilterManager.persist();BrokerController.this.protectBroker();BrokerController.this.printWaterMark();log.info(“dispatchbehindmitlog{}bytes“,BrokerController.this.getMessageStore().dispatchBehindBytes());BrokerController.this.brokerOuterAPI.fetchNameServerAddr();BrokerController.this.printMasterAndSlaveDiff();BrokerController.this.registerBrokerAll(true,false,brokerConfig.isForceRegister());BrokerFastFailureScheduledThread:=》FilterServerManagerScheduledThread:=》FilterServerManager.this.createFilterServer();ClientHousekeepingScheduledThread:=》ClientHousekeepingService.this.scanExceptionChannel();PullRequestHoldServiceFileWatchServiceAllocateMappedFileServiceAeptSocketServiceBrokerStatsThread

  Redis经典设计模式-Reactor

  每个经典的系统服务组件,我们总是能发现其赖以高性能的机制。所以我们这次也是一样,借着Redis的专题,用实例来写点对Reactor模式的理解。首先我们免不了俗,简单介绍下Reactor的相关概念:

  Reactor模式的角色构成(Reactor模式一共有中角色构成):

  TODO:等有时间了实现一个多Reactor线程池模式的Demo放上来。

  Spark通信框架Sparkworkmon

  一直以来,基于Akka实现的RPC通信框架是Spark引以为豪的主要特性,也是与Hadoop等分布式计算框架对比过程中一大亮点,但是时代和技术都在演化,从Spark..版本开始,为了解决大块数据(如Shuffle的传输问题,Spark引入了ty通信框架,到了..版本,ty完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。

  JAVAIO也经历了几次演化,从最早的BIO(阻塞式/非阻塞IO,到.版本的NIO(IO复用,到.版本的NIO./AIO(异步IO。基于早期BIO来实现高并发网络服务器都是依赖多线程来实现,但是线程开销较大,BIO的瓶颈明显,NIO的出现解决了这一大难题,基于IO复用解决了IO高并发。但是NIO有也有几个缺点:

  因为这几个原因,促使了很多JAVA-IO通信框架的出现,ty就是其中一员,它也因为高度的稳定性,功能性,性能等特性,成为Java开发的首选

  首先是NIO的上层封装,ty提供了NioEventLoopGroup/NioSocketChannel/NioServerSocketChannel的组合来完成实际IO操作,继而在此之上实现数据流Pipeline以及EventLoop线程池等功能。

  另外它又重写了NIO,JDK-NIO底层是基于Epoll的LT模式来实现,而ty是基于Epoll的ET模式实现的一组IO操作EpollEventLoopGroup/EpollSocketChannel/EpollServerSocketChannelty对两种实现进行完美的封装,可以根据业务的需求来选择不同的实现

  从Akka出现背景来说,它是基于Actor的RPC通信系统,它的核心概念也是Message,它是基于协程的,性能不容置疑;基于scala的偏函数,易用性也没有话说,但是它毕竟只是RPC通信,无法适用大的package/stream的数据传输,这也是Spark早期引入ty的原因。

  首先不容置疑的是Akka可以做到的,ty也可以做到,但是ty可以做到,Akka却无法做到。原因是啥?在软件栈中,Akka相比ty要Higher一点,它专门针对RPC做了很多事情,而ty相比更加基础一点,可以为不同的应用层通信协议(RPC,FTP,HTTP等提供支持,在早期的Akka版本,底层的NIO通信就是用的ty。

  其次一个优雅的工程师是不会允许一个系统中容纳两套通信框架!最后,虽然ty没有Akka协程级的性能优势,但是ty内部高效的Reactor线程模型,无锁化的串行设计,高效的序列化,零拷贝,内存池等特性也保证了ty不会存在性能问题。那么Spark是怎么用ty来取代Akka呢?一句话,利用偏函数的特性,基于ty“仿造”出一个简约版本的Actor模型。

  对于work通信,不管传输的是序列化后的对象还是文件,在网络上表现的都是字节流。在传统IO中,字节流表示为Stream;在NIO中,字节流表示为ByteBuffer;在ty中字节流表示为ByteBuff或FileRegion;在Spark中,针对Byte也做了一层包装,支持对Byte和文件流进行处理,即ManagedBuffer;ManagedBuffer包含了三个函数createInputStream(),nioByteBuffer(),convertToty()来对Buffer进行“类型转换”,分别获取stream,ByteBuffer,ByteBuff或FileRegion;NioManagedBuffer/tyManagedBuffer/FileSegmentManagedBuffer也是针对性提供了具体的实现。

  更好的理解ManagedBuffer:比如ShuffleBlockManager模块需要在内存中维护本地executor生成的shuffle-map输出的文件引用,从而可以提供给shuffleFetch进行远程读取,此时文件表示为FileSegmentManagedBuffer,shuffleFetch远程调用FileSegmentManagedBuffer.nioByteBuffer/createInputStream函数从文件中读取为Bytes,并进行后面的网络传输。如果已经在内存中bytes就更好理解了,比如将一个字符数组表示为tyManagedBuffer。

  协议是应用层通信的基础,它提供了应用层通信的数据表示,以及编码和解码的能力。在Sparkworkmon中,继承AKKA中的定义,将协议命名为Message,它继承Encodable,提供了encode的能力。

  Message根据请求响应可以划分为RequestMessage和ResponseMessage两种;对于Response,根据处理结果,可以划分为Failure和Suess两种类型;根据功能的不同,主要划分为Stream,ChunkFetch,Rpc。

  Server构建在ty之上,它提供两种模型NIO和Epoll,可以通过参数(spark..io.mode)进行配置,最基础的module就是shuffle,不同的IOMode选型,对应了ty底层不同的实现,Server的Init过程中,最重要的步骤就是根据不同的IOModel完成EventLoop和Pipeline的构造

  其中,MessageEncoder/Decoder针对网络包到Message的编码和解码,而最为核心就TransportRequestHandler,它封装了对所有请求/响应的处理;

  TransportChannelHandler内部实现也很简单,它封装了responseHandler和requestHandler,当从ty中读取一条Message以后,根据判断路由给相应的responseHandler和requestHandler。

  Sever提供的RPC,ChunkFecth,Stream的功能都是依赖TransportRequestHandler来实现的;从原理上来说,RPC与ChunkFecth/Stream还是有很大不同的,其中RPC对于TransportRequestHandler来说是功能依赖,而ChunkFecth/Stream对于TransportRequestHandler来说只是数据依赖。

  怎么理解?即TransportRequestHandler已经提供了ChunkFecth/Stream的实现,只需要在构造的时候,向TransportRequestHandler提供一个streamManager,告诉RequestHandler从哪里可以读取到Chunk或者Stream。而RPC需要向TransportRequestHandler注册一个rpcHandler,针对每个RPC接口进行功能实现,同时RPC与ChunkFecth/Stream都会有同一个streamManager的依赖,因此注入到TransportRequestHandler中的streamManager也是依赖rpcHandler来实现,即rpcHandler中提供了RPC功能实现和streamManager的数据依赖。

  Server是通过监听一个端口,注入rpcHandler和streamManager从而对外提供RPC,ChunkFecth,Stream的服务,而Client即为一个客户端类,通过该类,可以将一个streamId/chunkIndex对应的ChunkFetch请求,streamId对应的Stream请求,以及一个RPC数据包对应的RPC请求发送到服务端,并监听和处理来自服务端的响应;其中最重要的两个类即为TransportClient和TransportResponseHandler分别为上述的“客户端类”和“监听和处理来自服务端的响应“。

  那么TransportClient和TransportResponseHandler是怎么配合一起完成Client的工作呢?由TransportClient将用户的RPC,ChunkFecth,Stream的请求进行打包并发送到Server端,同时将用户提供的回调函数注册到TransportResponseHandler,TransportResponseHandler是TransportChannelHandler的一部分,在TransportChannelHandler接收到数据包,并判断为响应包以后,将包数据路由到TransportResponseHandler中,在TransportResponseHandler中通过注册的回调函数,将响应包的数据返回给客户端

  无论是BlockTransfer还是ShuffleFetch都需要跨executor的数据传输,在每一个executor里面都需要运行一个Server线程(后面也会分析到,对于Shuffle也可能是一个独立的ShuffleServer进程存在来提供对Block数据的远程读写服务

  在每个Executor里面,都有一个BlockManager模块,它提供了对当前Executor所有的Block的“本地管理”,并对进程内其他模块暴露getBlockData(blockId:BlockId):ManagedBuffer的Block读取接口,但是这里GetBlockData仅仅是提供本地的管理功能,对于跨远程的Block传输,则由tyBlockTransferService提供服务。

  tyBlockTransferService本身即是Server,为其他其他远程Executor提供Block的读取功能,同时它即为Client,为本地其他模块暴露fetchBlocks的接口,支持通过host/port拉取任何Executor上的一组的Blocks。源码位置spark-core:.apache.spark.work.ty

  tyBlockTransferService作为一个Server,与Executor或Driver里面其他的服务一样,在进程启动时,由SparkEnv初始化构造并启动服务,在整个运行时的一部分。

  一个Server的构造依赖RpcHandler提供RPC的功能注入以及提供streamManager的数据注入。对于tyBlockTransferService,该RpcHandler即为tyBlockRpcServer,在构造的过程中,需要与本地的BlockManager进行管理,从而支持对外提供本地BlockMananger中管理的数据

  RpcHandler提供RPC的功能注入在这里还是属于比较“简陋的”,毕竟他是属于数据传输模块,Server中提供的chunkFetch和stream已经足够满足他的功能需要,那现在问题就是怎么从streamManager中读取数据来提供给chunkFetch和stream进行使用呢?就是tyBlockRpcServer作为RpcHandler提供的一个Rpc接口之一:OpenBlocks,它接受由Client提供一个Blockids列表,Server根据该BlockIds从BlockManager获取到相应的数据并注册到streamManager中,同时返回一个StreamID,后续Client即可以使用该StreamID发起ChunkFetch的操作。

  从tyBlockTransferService作为一个Server,我们基本可以推测tyBlockTransferService作为一个Client支持fetchBlocks的功能的基本方法:

  同时,为了提高服务端稳定性,针对fetchBlocks操作tyBlockTransferService提供了非重试版本和重试版本的BlockFetcher,分别为OneForOneBlockFetcher和RetryingBlockFetcher,通过参数(spark..io.maxRetries)进行配置,默认是重试次

  在Spark,Block有各种类型,可以是ShuffleBlock,也可以是BroadcastBlock等等,对于ShuffleBlock的Fetch,除了由Executor内部的tyBlockTransferService提供服务以外,也可以由外部的ShuffleService来充当Server的功能,并由专门的ExternalShuffleClient来与其进行交互,从而获取到相应Block数据。功能的原理和实现,基本一致,但是问题来了,为什么需要一个专门的ShuffleService服务呢?主要原因还是为了做到任务隔离,即减轻因为fetch带来对Executor的压力,让其专心的进行数据的计算。

  在目前Spark中,也提供了这样的一个AuxiliaryService:YarnShuffleService,但是对于Spark不是必须的,如果你考虑到需要“通过减轻因为fetch带来对Executor的压力”,那么就可以尝试尝试。

  同时,如果启用了外部的ShuffleService,对于shuffleClient也不是使用上面的tyBlockTransferService,而是专门的ExternalShuffleClient,功能逻辑基本一致!

  Akka的通信模型是基于Actor,一个Actor可以理解为一个Service服务对象,它可以针对相应的RPC请求进行处理,如下所示,定义了一个最为基本的Actor:

  Actor内部只有唯一一个变量(当然也可以理解为函数了,即Receive,它为一个偏函数,通过case语句可以针对Any信息可以进行相应的处理,这里Any消息在实际项目中就是消息包。

  另外一个很重要的概念就是ActorSystem,它是一个Actor的容器,多个Actor可以通过name-》Actor的注册到Actor中,在ActorSystem中可以根据请求不同将请求路由给相应的Actor。ActorSystem和一组Actor构成一个完整的Server端,此时客户端通过host:port与ActorSystem建立连接,通过指定name就可以相应的Actor进行通信,这里客户端就是ActorRef。所有Akka整个RPC通信系列是由Actor,ActorRef,ActorSystem组成。

  Spark基于这个思想在上述的work的基础上实现一套自己的RPCActor模型,从而取代Akka。其中RpcEndpoint对应Actor,RpcEndpointRef对应ActorRef,RpcEnv即对应了ActorSystem。

  RpcEndpoint与Actor一样,不同RPCServer可以根据业务需要指定相应receive/receiveAndReply的实现,在Spark内部现在有N多个这样的Actor,比如Executor就是一个Actor,它处理来自Driver的LaunchTask/KillTask等消息。

  RpcEnv相对于ActorSystem:

  RpcEndpointRef即为与相应Endpoint通信的引用,它对外暴露了send/ask等接口,实现将一个Message发送到Endpoint中。

  这就是新版本的RPC框架的基本功能,它的实现基本上与Akka无缝对接,业务的迁移的功能很小,目前基本上都全部迁移完了。

  RpcEnv不仅从外部接口与Akka基本一致,在内部的实现上,也基本差不多,都是按照MailBox的设计思路来实现的;

  RpcEnv即充当着Server,同时也为Client内部实现。当作为Server,RpcEnv会初始化一个Server,并注册tyRpcHandler。RpcHandler的receive接口负责对每一个请求进行处理,一般情况下,简单业务可以在RpcHandler直接完成请求的处理,但是考虑一个RpcEnv的Server上会挂载了很多个RpcEndpoint,每个RpcEndpoint的RPC请求频率不可控,因此需要对一定的分发机制和队列来维护这些请求,其中Dispatcher为分发器,InBox即为请求队列;

  在将RpcEndpoint注册到RpcEnv过程中,也间接的将RpcEnv注册到Dispatcher分发器中,Dispatcher针对每个RpcEndpoint维护一个InBox,在Dispatcher维持一个线程池(线程池大小默认为系统可用的核数,当然也可以通过spark.rpc.ty.dispatcher.numThreads进行配置,线程针对每个InBox里面的请求进行处理。当然实际的处理过程是由RpcEndpoint来完成。

  其次RpcEnv也完成Client的功能实现,RpcEndpointRef是以RpcEndpoint为单位,即如果一个进程需要和远程机器上N个RpcEndpoint服务进行通信,就对应N个RpcEndpointRef(后端的实际的网络连接是公用,这个是TransportClient内部提供了连接池来实现的,当调用一个RpcEndpointRef的ask/send等接口时候,会将把“消息内容+RpcEndpointRef+本地地址”一起打包为一个RequestMessage,交由RpcEnv进行发送。注意这里打包的消息里面包括RpcEndpointRef本身是很重要的,从而可以由Server端识别出这个消息对应的是哪一个RpcEndpoint。

  和发送端一样,在RpcEnv中,针对每个remote端的host:port维护一个队列,即OutBox,RpcEnv的发送仅仅是把消息放入到相应的队列中,但是和发送端不一样的是:在OutBox中没有维护一个所谓的线程池来定时清理OutBox,而是通过一堆synchronized来实现的,add之后立刻消费。

  摘自:Github/ColZer

  微服务架构的分布式事务问题如何处理

  分布式系统架构中,分布式事务问题是一个绕不过去的挑战。而微服务架构的流行,让分布式事问题日益突出!

  下面我们以电商购物支付流程中,在各大参与者系统中可能会遇到分布式事务问题的场景进行详细的分析!

  如上图所示,假设三大参与平台(电商平台、支付平台、银行的系统都做了分布式系统架构拆分,按上数中的流程步骤进行分析:

  电商平台中创建订单:预留库存、预扣减积分、锁定优惠券,此时电商平台内各服务间会有分布式事务问题,因为此时已经要跨多个内部服务修改数据;

  支付平台中创建支付订单(选银行卡支付:查询账户、查询限制规则,符合条件的就创建支付订单并跳转银行,此时不会有分布式事务问题,因为还不会跨服务改数据;

  银行平台中创建交易订单:查找账户、创建交易记录、判断账户余额并扣款、增加积分、通知支付平台,此时也会有分布式事务问题(如果是服务化架构的话;

  支付平台收到银行扣款结果:更改订单状态、给账户加款、给积分帐户增加积分、生成会计分录、通知电商平台等,此时也会有分布式事务问题;

  电商平台收到支付平台的支付结果:更改订单状态、扣减库存、扣减积分、使用优惠券、增加消费积分等,系统内部各服务间调用也会遇到分布式事问题;

  如上图,支付平台收到银行扣款结果后的内部处理流程:

  支付平台的支付网关对银行通知结果进行校验,然后调用支付订单服务执行支付订单处理;

  支付订单服务根据银行扣款结果更改支付订单状态;

  调用资金账户服务给电商平台的商户账户加款(实际过程中可能还会有各种的成本计费;如果是余额支付,还可能是同时从用户账户扣款,给商户账户加款;

  调用积分服务给用户积分账户增加积分;

  调用会计服务向会计(财务系统写进交易原始凭证生成会计分录;

  调用通知服务将支付处理结果通知电商平台;

  如上图,把支付系统中的银行扣款成功回调处理流程提取出来,对应的分布式事务问题的代码场景:

  /**支付订单处理**/

  Transactional(rollbackFor=Exception.class)

  publicvoidpleteOrder(){

  orderDao.update();?//订单服务本地更新订单状态

  aountService.update();?//调用资金账户服务给资金帐户加款

  pointService.update();?//调用积分服务给积分帐户增加积分

  aountingService.insert();?//调用会计服务向会计系统写入会计原始凭证

  merchantNotifyService.notify();?//调用商户通知服务向商户发送支付结果通知

  本地事务控制还可行吗?

  以上分布式事务问题,需要多种分布式事务解决方案来进行处理。

  资金账户加款、积分账户增加积分:T型事务(或两阶段提交型事务,实时性要求比较高,数据必须可靠。

  会计记账:异步确保型事务(基于可靠消息的最终一致性,可以异步,但数据绝对不能丢,而且一定要记账成功

  商户通知:最大努力通知型事务(按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对

  为什么要用ty开发

  ty是由JBOSS提供的基于JavaNIO的开源框架,ty提供异步非阻塞、事件驱动、高性能、高可靠、高可定制性的网络应用程序和工具,可用于开发服务端和客户端。

  JAVA原先是采用的是传统的BIO,为什么后来又研发出了NIO呢?

  首先看看传统的基于同步阻塞IO(BIO的线程模型图

  BIO主要存在以下缺点:

  从线程模型图中可以看到,一连接一线程,由于线程数是有限的,所以这样的模型是非常消耗资源的,

  最终也导致它不能承受高并发连接的需求

  性能低,因为频繁的进行上下文切换,导致CUP利用率低

  可靠性差,由于所有的IO操作都是同步的,即使是业务线程也如此,所以业务线程的IO操作也有可能被阻塞,

  这将导致系统过分依赖网络的实时情况和外部组件的处理能力,可靠性大大降低

  上面的原因就是导致早期的高性能服务器为什么不选用JAVA开发,而是选用C/C++的重要原因。

  为了解决上面的问题,NIO横空出世,下面是NIO的线程模型图

  NIO采用了Reactor线程模型,一个Reactor聚合了一个多路复用器Selector,它可以同时注册、监听和轮询

  成百上千个Channel,这样一个IO线程可以同时处理很多个客户端连接,线程模型优化为:N(N《最大句柄、数),

  或M:N(M通常为CUP核数+)

  避免了IO线程频繁的上下文切换,提升了CUP的效率

  所有的IO操作都是异步的,所以业务线程的IO操作就不用担心阻塞,系统降低了对网络的实时情况和外部组件

  为什么不直接用JDK原生的NIO而选用ty框架?

  先看看JDK的NIO中服务端和客户端的时序图

  从图中我们可以看到,使用JDK原生NIO的不足之处

  NIO的类库和API相当复杂,使用它来开发,需要非常熟练地掌握Selector、ByteBuffer、ServerSocketChannel、SocketChannel等

  需要很多额外的编程技能来辅助使用NIO,例如,因为NIO涉及了Reactor线程模型,所以必须必须对多线程和网络编程非常熟悉才能写出高质量的NIO程序

  想要有高可靠性,工作量和难度都非常的大,因为服务端需要面临客户端频繁的接入和断开、网络闪断、半包读写、失败缓存、网络阻塞的问题,这些将严重影响我们的可靠性,而使用原生NIO解决它们的难度相当大。

  JDKNIO中著名的BUG--epoll空轮询,当select返回时,会导致Selector空轮询而导致CUP%,官方表示JDK.之后修复了这个问题,其实只是发生的概率降低了,没有根本上解决。

  那么为什么要用ty呢?

  API使用简单,更容易上手,开发门槛低

  功能强大,预置了多种编解码功能,支持多种主流协议

  定制能力高,可以通过ChannelHandler对通信框架进行灵活地拓展

  高性能,与目前多种NIO主流框架相比,ty综合性能最高

  高稳定性,解决了JDKNIO的BUG

  经历了大规模的商业应用考验,质量和可靠性都有很好的验证。

  ty能提供什么服务?

  开发异步非阻塞的TCP网络应用程序

  开发异步非阻塞的UDP网络应用程序

  开发异步文件传输程序

  开发异步HTTP程序的服务端和客户端

  提供多种编解码的集成框架,包括谷歌Protobuf、JBossMarshalling、Java序列化、压缩编解码、XML解码、

  字符串编解码等都可以由用户直接使用

  提供形式多样的编解码基础类库,可以方便地进行私有协议栈编解码框架的二次开发

  基于职责链的Pipeline-Handler机制,可以方便地对网络事件进行拦截和定制

  所有的IO操作都是异步的,用户可以通过Future-Listeren机制主动get结果或者等IO线程完成操作之后主动Notify来通知,

  用户业务线程不需要同步等待

  基于链路空闲事件监测的心跳机制

您可能感兴趣的文章:

相关文章