react怎么刷新页面(Reactor3参考指南)
上一篇[4、Reactor 核心特性(3)]
下一篇[5、Kotlin 支持]
英文原文:https://projectreactor.io/docs/core/3.2.11.RELEASE/reference/index.html#core-features GitHub:https://github.com/jijicai/ProjectReactor/tree/master/book/Reactor3
处理器(Processors)处理器是一种特殊的 Publisher,也是 Subscriber。这意味着你可以订阅 Processor(通常,它们实现 Flux),但是你也可以调用方法来手动将数据注入序列或者终止它。
有几种处理器,每种都有一些特定的语义,但是在你开始研究这些处理器之前,你需要问自己以下问题:
你需要处理器吗?
大多数时候,你应该尽量避免使用 Processor。它们很难正确使用,而且容易出现一些死角情况。
如果你认为 Processor 可以很好地与你的用例匹配,那么问问你自己是否尝试过这两种方法: (1)一个操作符或多个操作符的组合是否符合要求?(查看:我需要哪个操作符?) (https://projectreactor.io/docs/core/release/reference/#which-operator) (2)“generator”操作符能代替它工作吗?(通常,这些操作符是用来桥接非反应式 APIs 的,它提供了一个“sink”,在某种意义上,它类似于 Processor,允许你用数据手动填充序列或终止序列。)
如果在研究了上述替代方案之后,你仍然认为需要一个 Processor,请阅读下面的可用处理器概述部分,了解不同的实现。
通过使用 Sink Facade 从多个线程安全地生成
与其直接使用 Reactor Processors,不如通过调用一次 sink()来为处理器获取接收器。
FluxProcessor sinks 安全地访问多线程生产者,并可用于同时从多个线程生成数据的应用程序。例如,可以为 UnicastProcessor 创建线程安全的序列化 sink。
多个生产者线程可以在以下序列化sink 上并发地生成数据。
警告:尽管 FluxSink 适用于 Processor 的多线程手动馈送(feeding),但是将订阅者方法和sink 方法混合是不可能的:你必须将 FluxProcessor 订阅到源发布者,或者通过其 FluxSink 手动馈送它。
方法 next 的溢出行为有两种可能的方式,具体取决于 Processor 及其配置: (1)无边界处理器通过删除或缓冲来处理溢出。 (2)有界处理器在 IGNORE 策略上阻塞或“旋转”,或应用为 sink 指定的 overflowStrategy 行为。
可用处理器概览
Reactor 核心有几种不同的 Processor。并不是所有处理器都具有相同的语义,但大致可以分为 3 类。以下列表简要描述了这 3 类处理器: (1)直接的(direct)(DirectProcessor 和 UnicastProcessor):这些处理器只能通过直接的用户操作来推送数据(直接调用它们的 sink 方法)。 (2)同步的(synchronous)(EmitterProcessor 和 ReplyProcessor):这些处理器既可以通过用户交互推送数据,也可以订阅上游的 Publisher 并同步地将其删除。 (3)异步的(asynchronous)(WorkQueueProcessor 和 TopicProcessor):这些处理器既可以推送从多个上游的 Publishers 获得的数据,也可以推送通过用户交互获得的数据。它们更强健,并由 RingBuffer 数据结构支持,以便处理它们的多个上游流。
异步处理器是最复杂的示例,有许多不同的选项。因此,它们公开了一个 Builder 接口。而更简单的处理器使用静态工厂方法。
提示: 发送事件到不同线程的方法之一是使用与 publishOn(Scheduler) 相结合的 EmitterProcessor。例如,这可以取代以前的 TopicProcessor,后者使用 Unsafe 操作,并在 3.3.0 中被转移到 reactor-extra 。
直接处理器(Direct Processor)
直接处理器是能够将信号发送给一或多个订阅者的处理器。它是最简单的实例化方法,使用单个 DirectProcessor#create() 静态工厂方法。另一个方面,它有不处理背压的局限性。因此,如果你通过 DirectProcessor 推送 N 个元素,但至少有一个订阅者的请求小于 N,则 DirectProcessor 会向其订阅者发出 IllegalStateException 信号。
一旦处理器终止(通常通过调用其 sink 的 error(Throwable) 或 complete() 方法),它会让更多的订阅者订阅,并且立即向它们回放终止信号。
单播处理器(Unicast Processor)
单播处理器可以使用内部缓存区来处理背压。代价是它最多只能有一个订阅者。
UnicastProcessor 有更多的选项,反映在一些 create 静态工厂方法上。例如,默认情况下它是无限制的:如果在订阅器还没有请求数据时通过它推送任何数量的数据,它将缓存所有数据。
这可以通过在 create 工厂方法中为内部缓冲提供自定义 Queue 实现来更改。如果该队列是有界的,那么当缓冲区已满并且没有收到下游的足够多的请求时,处理器可以拒绝值的推送。
在这种有界的情况下,处理器还可以通过在每个被拒绝的元素上调用的回调构建,从而允许清除这些被拒绝的元素。
发射器处理器(Emitter Processor)
一个发射器处理器能够向多个订阅者发射信号,同时为每个订阅者提供背压。它还可以订阅 Publisher 并同步转发其信号。最初,当它没有订阅者时,它仍然可以接受一些数据,将其推送到可配置的 bufferSize 。在此之后,如果没有订阅者进入并消费数据,则调用 onNext 块,直到处理器耗尽(此时只能并发发生)。
因此,第一个订阅的订阅者在订阅时将接收最多 bufferSize 个元素。但是,在此之后,处理器将停止向其他订阅者回放信号。而这些后续的订阅者只接收订阅后通过处理器推送的信号。内部缓冲区仍用于背压目的。
默认情况下,如果发射器处理器的所有订阅者都被取消(这基本上意味着它们都已取消订阅),它将清除其内部缓冲区并停止接收新的订阅者。这可以通过 create 静态工厂方法中的 autoCancel 参数进行调优。
回放处理器(Replay Processor)
回放处理器缓存直接通过其 sink() 方法推送的或来自上游发布者(Publisher)的元素,并向后期的订阅者回放它们。
它可以以多种配置创建: (1)缓存单个元素(cacheLast)。 (2)缓存有限历史(create(int)),无线历史(create())。 (3)缓存基于时间的回放窗口(createTimeout(Duration))。 (4)缓存历史大小和时间窗口的组合(createSizeOrTimeout(int,Duration))。
主题处理器(Topic Processor)
主题处理器是一种异步处理器,当在 shared 配置中创建时,它能够从多个上游 Publishers 中继元素(请参阅 builder() 的 share(boolean) 选项)。
请注意,如果你打算直接或者从一个并发的上游 Publisher 同时调用 TopicProcessor 的 onNext、onComplete 或 onError 方法,则 share 选项是必需的。
否则,这样的并发调用是非法的,因为处理器是完全符合 Reactive Streams 规范的。
TopicProcessor 能够分散到多个订阅者。它通过将一个线程关联到每个订阅者来做到这一点,这个线程将一直运行,直到一个 onError 或 onComplete 信号被推送到处理器中,或者直到关联的订阅者被取消。下游订阅者的最大数量由 executor 构建器(builder)选项驱动。提供一个有界 ExecutorService, 将其限制为特定的数量。
处理器由存储推送信号的 RingBuffer 数据结构支持。每个订阅者线程都跟踪其关联的需求和 RingBuffer 中的正确索引。
这个处理器还有一个 autoCancel 构建器选项:如果设置为 true(默认值),它将导致在所有订阅者被取消时源发布者被取消。
工作队列处理器(WorkQueue Processor)
工作队列处理器也是一种异步处理器,当在 shared 配置中创建时,它能够中继来自多个上游发布者的元素(它与 TopicProcessor 共享大部分构建器选项)。
它放松了对反应式流规范的遵从,但是它获得了比 TopicProcessor 需要更少资源的好处。它仍然基于一个 RingBuffer ,但是避免了为每个 Subscriber 创建一个消费者线程的开销。因此,它的伸缩性比 TopicProcessor 更好。
代价是它的分布式模式有点不同:来自每个订阅者的请求都加在一起,并且处理器一次只向一个订阅者(Subscriber)传递信号,采用循环分布而不是扇出模式。
注意:不能保证公平的循环分布。
WorkQueueProcessor 的大部分构建器选项与 TopicProcessor 相同,例如:autoCancel、share 和 waitStrategy。下游订阅者的最大数量也是由可配置的带有 executor 选项的 ExecutorService 决定的。
警告:你应当注意不要向 WorkQueueProcessor 订阅太多 Subscribers,因为这样做可能会锁定处理器。如果你需要限定可能的订阅者数量,最好使用 ThreadPoolExecutor 或 ForkJoinPool。这个处理器能够检测它们的容量,并且在你订阅太多次时抛出异常。
上一篇[4、Reactor 核心特性(3)]
下一篇[5、Kotlin 支持]
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com