Overview
Spring WebFlux 是一个响应式、非阻塞的 Web 框架。响应式指对变化做出反应的编程模型,例如对 I/O 事件、鼠标点击事件等做出反应。非阻塞是指不主动等待,而是被动的在接收到变化产生的通知时作出反应。
Java 9 引入了响应式流 (Reactive Streams) API,核心接口都定义在 java.util.concurrent.Flow 中,包括 Publisher、Subscriber、Subscription 和 Processor。
- 一个简单的例子:
public class ReactiveStreamExample {
public static void main(String[] args) throws Exception {
// 创建发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 创建订阅者
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("订阅成功");
subscription.request(1); // 请求一个数据项
}
@Override
public void onNext(String item) {
System.out.println("接收到数据: " + item);
subscription.request(1); // 继续请求下一个数据项
}
@Override
public void onError(Throwable throwable) {
System.err.println("发生错误: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("数据处理完成");
}
};
// 建立订阅关系
publisher.subscribe(subscriber);
// 发布数据,数据进入缓冲区
publisher.submit("Hello");
publisher.submit("Reactive");
publisher.submit("Streams");
// 关闭发布者,不再接受新的数据和订阅,消费者消费完成会触发 onComplete
publisher.close();
// 等待数据处理完成
Thread.sleep(1000);
}
}- 添加一个 Processor:
// 自定义 Processor:将字符串加上感叹号
class ExclamationProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 请求第一个数据项
}
@Override
public void onNext(String item) {
String transformed = item + "!";
submit(transformed); // 发布该数据
subscription.request(1); // 继续请求下一个数据项
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
closeExceptionally(throwable);
}
@Override
public void onComplete() {
close(); // 关闭 Processor
}
}
// 建立数据流关系:Publisher → Processor → Subscriber
publisher.subscribe(processor);
processor.subscribe(subscriber);
本质是用空间换时间,Publisher 缓冲数据并保存所有订阅关系,当特定事件发生时,异步执行 Subscriber 注册的回调方法。
SubmissionPublisher是 Java 9 提供的默认Publisher实现,默认使用ForkJoinPool.commonPool()作为执行器 (Executor)。Subscriber接口定义了响应式处理的四个阶段:订阅、接收数据、错误处理、完成。subscription.request(n)是背压机制的关键,控制订阅者一次请求多少数据。Processor是一个既是Subscriber又是Publisher的组件,适合做数据转换、过滤等中间处理。
Reactor
Reactor 是一个基于 Reactive Streams 规范,用于在 JVM 上构建非阻塞应用程序的响应式库 (WebFlux、R2DBC 等都使用它)。
Reactor 提供了两个可组合的响应式 APl 对象:Flux 对象表示 0..N 项的响应式序列,Mono 对象表示单个值或空结果。
下面看一个例子,引出一些概念:
Flux.range(1, 10)
.log()
.filter(i -> i % 2 == 0)
.map(i -> "hello: " + i)
.subscribe(System.out::println);log() 用于打印调试日志,subscribe() 是一个终结方法,用于触发流的执行。中间过程 (filter、map) 调用只是构建一个处理链,不执行终结方法则整个流不会执行,即没有订阅者,发布者不会发送任何数据。执行终结方法后,会从下游开始,逐层向上调用订阅方法,中间过程既是发布者也是订阅者。
流有冷流和热流之分,上面的 Flux.range(),构建的就是冷流。
对于冷流,只有订阅后才开始发射数据。即对于每一个订阅者,这个流都会重新开始,例如 HTTP 请求,每有一个新订阅者,就触发一次调用。
对于热流,新订阅者只会接收到他们订阅行为发生之后的数据 (除非有缓存),热流可以在没有任何订阅前就发布数据。
创建流
just/range/fromIterable
- 通过
just、range、fromIterable创建流:
Flux.just("foo", "bar", "foobar").log().subscribe();
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux.fromIterable(iterable).log().subscribe();
// 从 1 开始生成 10 个数
Flux.range(1, 10).log().subscribe();
// Mono 的 API 是 Flux 的子集
Mono.empty().log().subscribe();
Mono.just("foo").log().subscribe();generate
generate 用于同步、逐个的发送数据,可以调用 SynchronousSink 的 next、error、complete 方法,实现控制数据生成逻辑,需要注意 generate 中 每次回调中 next() 最多只能调用一次。
// generate 全参方法如下:
public static <T, S> Flux<T> generate(
Callable<S> stateSupplier, // 给生成器提供初始值 state
BiFunction<S, SynchronousSink<T>, S> generator, // 生成器
Consumer<? super S> stateConsumer // 生成结束或下游cancel时触发,消费最终的 state
);
// 举例:
Flux<String> flux = Flux.generate(
() -> 0, // 初始值
(state, sink) -> { // 生成逻辑
if (state == 5) sink.error(new RuntimeException("出错了"));
sink.next("3 x " + state + " = " + 3 * state); // next 用于发射一个数据
if (state == 10) sink.complete();
return state + 1; // 更新状态,供下一次调用
}, (state) -> System.out.println("finally: " + state));
flux.subscribe(System.out::println);
// 注意上述代码会打印:finally: 6,这是因为 sink.error() 不会导致跳过后面的代码,state + 1 仍会执行,如果修改为 throw 一个异常,则 finally state 为 5create
create 相比于 generate,使用线程安全的 FluxSink,可以在每次回调中,调用多次 next 方法发射数据,并且可以在多线程中使用。
create在将现有 API 与响应式连接起来时非常有用,例如桥接基于监听器的异步数据处理 API:
// 方法声明如下:
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)
// 定义监听器接口(模拟现有异步API)
interface DataListener<T> {
void onData(T data); // 接收单条数据
void onComplete(); // 处理完成
void onError(Throwable e); // 处理错误
}
// 模拟异步数据处理器
static class AsyncDataProcessor {
public void register(DataListener<String> listener) {
// 模拟异步产生数据(实际场景可能是IO、网络操作等)
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
Thread.sleep(200); // 模拟耗时操作
listener.onData("数据" + i);
}
listener.onComplete(); // 数据发送完毕
} catch (InterruptedException e) {
listener.onError(e);
}
}).start();
}
}
@Test
public void test() throws Exception {
AsyncDataProcessor processor = new AsyncDataProcessor();
Flux<String> flux = Flux.create(sink -> {
// 注册监听器,将回调事件转换为 Flux 信号
processor.register(new DataListener<String>() {
@Override
public void onData(String data) {
sink.next(data); // 发射数据
}
@Override
public void onComplete() {
sink.complete(); // 结束序列
}
@Override
public void onError(Throwable e) {
sink.error(e); // 发射错误
}
});
});
// 订阅并消费数据
flux.subscribe(
data -> System.out.println("接收: " + data),
error -> System.err.println("错误: " + error.getMessage()),
() -> System.out.println("接收完成")
);
Thread.sleep(5000);
}- 可以通过
OverflowStrategy来指定背压策略:IGNORE完全忽略背压,继续发送数据,可能导致异常或内存溢出;ERROR当下游无法跟上时,抛出IllegalStateException;DROP丢弃当前数据项,不发送给下游;LATEST只保留最新的数据项,旧的被覆盖;BUFFER(默认策略,将所有数据缓冲,可能导致内存溢出。
Flux.create(sink -> {
// 模拟快速生产
for (int i = 0; i < 10; i++) {
sink.next(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
sink.error(e);
}
}
sink.complete();
}, FluxSink.OverflowStrategy.LATEST // 改成不同策略试试
)
.publishOn(Schedulers.newSingle("slow-consumer"), 1) // 模拟慢速订阅者
.subscribe(data -> {
System.out.println("消费: " + data);
try {
Thread.sleep(30); // 模拟慢速消费
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});- 结合
onRequest实现推拉模式
Flux<String> flux = Flux.create(sink -> {
// 监听下游请求量
sink.onRequest(n -> {
System.out.println("收到下游请求,需要 " + n + " 条数据");
// 按需产生数据
for (int i = 0; i < n; i++) {
sink.next("数据" + i);
}
})
.onCancel(() -> System.out.println("onCancel"))
.onDispose(() -> System.out.println("onDispose"));
});
// limitRequest 满足后会发出 cancel 信号
flux.limitRequest(5).subscribe(System.out::println);常用操作
操作符参考:Which operator do I need?
subscribe、dispose
用于订阅并触发序列,全参数的构造方法如下:
subscribe(
// 消费每个值的逻辑
Consumer<? super T> consumer,
// 错误处理,针对的是上游发布者的异常
Consumer<? super Throwable> errorConsumer,
// 序列必须完整且无错误地执行完毕时,才执行
Runnable completeConsumer,
// onSubscribe 时触发, 用于初始化 request 默认行为是 request(Long.MAX_VALUE)
// 官方 API 已不建议使用该参数,而是自定义 `Subscriber` 实现去精细化控制
Consumer<? super Subscription> subscriptionConsumer
);序列的执行中参数传递包含了元素(数据) 和信号,信号表示当前状态,参考 reactor.core.publisher.SignalType 枚举类:
// A signal when the subscription is triggered
SUBSCRIBE
// A signal when a request is made through the subscription
REQUEST
// A signal when the subscription is cancelled
CANCEL
// A signal when an operator receives a subscription
ON_SUBSCRIBE
// A signal when an operator receives an emitted value
ON_NEXT
// A signal when an operator receives an error
ON_ERROR
// A signal when an operator completes
ON_COMPLETE
// A signal when an operator completes
AFTER_TERMINATE
// A context read signal
CURRENT_CONTEXT
// A context update signal
ON_CONTEXT如何取消订阅:
CountDownLatch latch = new CountDownLatch(1);
// 返回值是一个 Disposable 表示可以取消订阅
Disposable f1 = Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1)) // 1s 生成一个
.subscribe(
i -> System.out.println("接收到元素: " + i),
error -> System.err.println("发生错误: " + error),
() -> {
System.out.println("完成");
latch.countDown();
}
);
boolean await = latch.await(2, TimeUnit.SECONDS);
if (!await) {
f1.dispose(); // 如果没有消费完整个序列,不会触发 completeConsumer
}取消操作是一个信号,表明源应该停止生成元素。然而,并不能保证会立即停止:有些源生成元素的速度可能非常快,以至于在收到取消指令之前就已经完成了。
BaseSubscriber
自定义 Subscribe,可以继承 BaseSubscribe 类,重写相应的钩子方法,可以实现比 lambda 方式的 subscribe() 更多的功能。
class MySubscriber<T> extends BaseSubscriber<T> {
@Override
public void hookOnSubscribe(Subscription subscription) {
System.out.println("hookOnSubscribe");
request(1);
}
@Override
public void hookOnNext(T value) {
System.out.println("hookOnNext: " + value);
request(1);
}
@Override
protected void hookOnComplete() {
System.out.println("hookOnComplete");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("hookOnError: " + throwable);
}
@Override
protected void hookOnCancel() {
System.out.println("hookOnCancel");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("hookFinally: " + type);
}
}
// 执行
Flux.range(1, 3).map(i -> {
if (i <= 2) return i;
throw new RuntimeException("出错了");
}).subscribe(new MySubscriber<>());
// 将打印
hookOnSubscribe
hookOnNext: 1
hookOnNext: 2
hookOnError: java.lang.RuntimeException: 出错了
hookFinally: onErrorlimitRate/limitRequest
limitRate 用于实现预取,将下游订阅者的背压信号,分批次发送,有两个方法:
limitRate(int prefetchRate)表示执行时第一次request(prefetchRate),一旦算子发现 75% 的预取请求已得到满足,它就会向上游重新请求 75% 的内容 (向上取整)。limitRate(int highTide, int lowTide)表示执行时第一次request(highTide),每消费 lowTide 个后都执行request(lowTide),特别地:- 当 lowTide == 0 时,相当于禁用预取,每次消费完 highTide 个再继续
request(highTide); - 当 lowTide >= highTide 时,恢复 75% 策略,等同于
limitRate(highTide)。
- 当 lowTide == 0 时,相当于禁用预取,每次消费完 highTide 个再继续
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 50)
.log()
// 第一次 request(10),后续每消费 (75%*10)=8 个后,都 request(8)
.limitRate(10)
// 第一次 request(10),每消费 2 个后,都 request(2)
.limitRate(10, 2)
// 禁用预取,每次消费完 10 个再 request(10)
.limitRate(10, 0)
// 等同于 limitRate(10)
.limitRate(10, 10)
.subscribe(
System.out::println,
System.err::println,
latch::countDown
);
latch.await();
System.out.println("DONE");limitRequest(N) 用于限制下游最大请求量,当上游发送出 N 个数据后,会认为序列已完成,向下游发送 onComplete 信号,并 cancel()。
CountDownLatch latch = new CountDownLatch(1);
Flux.range(1, 50)
.log()
.limitRequest(10) // 最终输出只打印到 10
.subscribe(
System.out::println,
System.err::println,
latch::countDown
);
latch.await();
System.out.println("DONE");map、filter、handle
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>) 使用 SynchronousSink 只能逐个发送数据,相当于 map 与 filter 的结合。
Flux.range(1, 10)
.map(item -> item * 2)
.filter(item -> item % 3 == 0)
.subscribe(System.out::println);
// 等价于
Flux.range(1, 10)
.handle((item, sink) -> {
int i = item * 2;
if (i % 3 == 0) {
sink.next(i); // 满足条件则发送数据,相当于 filter
}
})
.subscribe(System.out::println);线程调度
默认情况下,大多数操作会与前一个操作在相同线程中执行,最顶层的操作会在调用它的线程中运行 (就是订阅动作的执行线程)。
final Mono<String> mono = Mono.just("hello ");
// 输出 hello main
mono.subscribe(v ->
System.out.println(v + Thread.currentThread().getName())
);
// 输出 hello Thread-2
Thread t = new Thread(() -> mono
.subscribe(v ->
System.out.println(v + Thread.currentThread().getName())
)
);
t.start();
t.join();在 Reactor 中,执行线程由 Scheduler 调度器决定,它的几个静态方法如下,用于返回特定的调度器实例:
Schedulers.immediate():相当于默认未指定,在调用线程中执行;Schedulers.single():在调度器被释放之前,此方法会为所有调用者复用同一个线程;Schedulers.newSingle():每次调用都新开一个线程;Schedulers.boundedElastic():一个有界弹性线程池,常用于包装同步阻塞调用;- 基于
ExecutorService,空闲时间默认 60s,线程数为 CPU 核心数 * 10,队列长度限制为 100_000;
- 基于
Schedulers.parallel():一个固定的线程池,工作线程与 CPU 核心数量相同;Schedulers.newXXX()方法用于创建各种类型调度器的新实例。例如,Schedulers.newParallel(xxx)会创建一个名为xxx的新并行调度器。
在链式调用中,通过 publishOn 和 subscribeOn 方法来切换执行上下文到指定的调度器,需要特别注意 publishOn 在链式调用中的位置。
publishOn
publishOn 从上游获取信号,并在下游重新播放这些信号,同时在 Scheduler 关联的一个工作线程上执行回调。因此它会影响后续操作符的执行位置(直到另一个 publishOn 被链接进来)。
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> {
log.info("map1");
return 10 + i;
})
// 后续操作将被调度到新的线程
.publishOn(Schedulers.parallel())
.map(i -> {
log.info("map2");
return "value " + i;
});
new Thread(() -> flux.subscribe(log::info), "t1").start();上面代码中,当不使用 publishOn 或使用 Schedulers.immediate() 时,都将在 t1 线程上执行 (即发生了订阅的线程)。当使用 Schedulers.parallel() 时,可以看到第一个 map 操作在 t1 上执行,第二个 map 和消费操作均在 parallel-1 上执行。
subscribeOn
subscribeOn 应用于订阅过程,即构建整个链路时,它会更改整个链的订阅线程,放在哪里都行,通常建议将其直接置于数据源之后,多次调用时,第一个 (最靠近源头的) 生效。
final Flux<String> flux = Flux
.range(1, 2)
.subscribeOn(Schedulers.parallel())
.map(i -> {
log.info("map1");
return 10 + i;
})
.publishOn(Schedulers.single())
.map(i -> {
log.info("map2");
return "value " + i;
});
new Thread(() -> flux.subscribe(log::info), "t1").start();
Thread.sleep(10000);上面代码中,尽管是在 t1 上进行订阅动作,但实际会被调度到 parallel-1 上执行,第一个 map 操作在 parallel-1 上执行,第二个 map 和消费操作均在 single-1 上执行,并且将 subscribeOn 放置在调用链上的任意位置,均不影响结果。
parallel 并行消费
使用 parallel() 可以将流转换为一个 ParallelFlux,配合 ranOn() 指定调度器,可以实现并行执行消费逻辑,如果要合并成串行执行,可以调用 sequential()。
Flux.range(1, 10)
.parallel(2)
.runOn(Schedulers.parallel())
// map 在两个线程上并行
.map(i -> {
log.info("map -> {}", i);
return i * i;
})
// 转换为串行,最终消费在一个线程上执行
.sequential()
.subscribe(i -> log.info("sub -> {}", i));包装同步阻塞调用
将同步阻塞操作提交到 boundedElastic 中去执行,避免阻塞其他非阻塞工作线程。
Mono blockingWrapper = Mono.fromCallable(() -> {
return /* make a remote synchronous call */
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic());异常处理
响应式序列中的任何错误都是一个终止事件,即使使用了错误处理操作符,它也不会让原始序列继续。错误信号将沿着操作符链传播到最后一步,即 Subscribe 的 onError 方法,若未定义 onError 方法,则会抛出 ErrorCallbackNotImplemented 异常。
onErrorReturn:捕获异常并返回一个默认值;onErrorComplete:捕获并忽略异常,将onError信号转为onComplete;onErrorResume:捕获并执行处理逻辑 (可以返回动态值、抛出异常);doOnError:感知到异常后执行处理逻辑,但原样抛出,即不影响错误信号传播;doFinally:相当于 finally 块;
Flux.just(1, 2, 0, 3)
.map(i -> "10 / " + i + " = " + (10 / i))
// 返回默认值
.onErrorReturn(e -> e.getMessage().equals("/ by zero"), "fallbackValue")
// 忽略错误
.onErrorComplete(e -> e.getMessage().equals("/ by zero"))
// 抛出新错误
.onErrorResume(e -> Mono.error(new Exception("出错了")))
// 返回动态值
.onErrorResume(e -> Mono.just("someValue"))
.doFinally(signalType -> System.out.println("Finally: " + signalType))
.subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Done")
);重试
使用 retry(n) 可以实现重试,本质是重新订阅,实际上是一个全新的序列,原序列已经终止。
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.elapsed() // 用于记录每个元素与前一个元素经过的间隔时间
.retry(1)
.subscribe(System.out::println, System.err::println);使用 retryWhen 可以实现自定义条件重试失败序列。retryWhen 接收一个伙伴序列,当上游序列产生错误时,会向该伙伴序列发送一个 RetrySignal(包含错误信息、重试次数等元数据),用户通过自定义逻辑处理 RetrySignal 并返回一个新的 Publisher 来控制重试行为:
- 若伙伴序列发射一个值,则触发重试,即重新订阅;
- 若伙伴序列正常完成,则终止重试,且若有上游异常信号,会被忽略 (下游收不到),转成完成信号;
- 若伙伴序列产生错误,则终止重试,下游可以感知到该错误。
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.elapsed()
.retryWhen(Retry.from(companion -> companion.handle(
(retrySignal, sink) -> {
// 自定义重试条件
if (retrySignal.totalRetries() < 1) {
sink.next(retrySignal.totalRetries()); // 触发重试
} else {
// 终止重试并传播错误,不传播的话会被忽略,即下游将触发 onComplete
sink.error(retrySignal.failure());
}
}
)))
.subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Done")
);常见的重试策略:
// 实现最多 n 次重试
flux.retryWhen(Retry.max(3));
// 仅对特定错误重试
flux.retryWhen(Retry.max(3).filter(e -> e instanceof TimeoutException));
// 指数退避重试,最多 3 次,初始延迟 1000ms(后面每次*2),随机抖动 ±50%
flux.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)).jitter(0.5)
// 每次重试时执行
.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries()))
// 达到重试次数时,抛出原始异常
.onRetryExhaustedThrow((spec, rs) -> rs.failure())
)
.elapsed() // 记录间隔时间
.subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Done")
);Sinks
sink 译为水槽,在 Reactor 中可以理解为接收器,Sinks API 允许我们手动且线程安全的触发信号、构建 Publisher 并定义与下游交互的逻辑。
Sinks.One 用于处理单个元素,Sinks.Many 用于处理多元素序列。通过调用其 emit* 或 tryEmit* 方法,发送一个值。其中 tryEmit 会立即返回发送结果,供调用方处理;而 emit 可以通过指定 EmitFailureHandler 来进行处理。
- Sinks.one 举例:
// Sinks.One 只能发送一次
final Sinks.One<String> one = Sinks.one();
// 转换为 Mono 并进行订阅
one.asMono().subscribe(System.out::println, System.err::println);
// 尝试发送数据
Sinks.EmitResult hello = one.tryEmitValue("hello");
Sinks.EmitResult world = one.tryEmitValue("world");
// 查看发送结果
System.out.println("hello: " + hello); // OK
System.out.println("world: " + world); // FAIL_TERMINATED在 Sinks.Many 中可以使用 unicast()、multicast() 指定单播还是多播;使用 onBackpressureBuffer() 指定背压缓冲区大小;使用 replay() 缓存历史数据向新订阅者重放;使用 unsafe() 移除线程安全检查,降低开销,适用于能保证外部同步的场景。
Sinks.many().unicast().onBackpressureBuffer(Queue?)单播 Sinks.Many 可以通过使用内部缓冲区来处理背压。
// 创建一个单播序列,缓存数量为 2
final Sinks.Many<String> many = Sinks.many().unicast().onBackpressureBuffer(new ArrayBlockingQueue<>(2));
final Flux<String> flux = many.asFlux();
// 订阅前发送三个数据,前两个被缓存
Sinks.EmitResult emitResult = many.tryEmitNext("aaa");
System.out.println("emitResult aaa : " + emitResult); // OK
emitResult = many.tryEmitNext("bbb");
System.out.println("emitResult bbb : " + emitResult); // OK
emitResult = many.tryEmitNext("ccc");
System.out.println("emitResult ccc : " + emitResult); // 失败:FAIL_ZERO_SUBSCRIBER
// 慢速消费
flux.subscribeOn(Schedulers.parallel())
.subscribe(item -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("sub1: " + item);
},
System.err::println,
() -> System.out.println("Done")
);
);
// flux.subscribe(); // 订阅多次会报错 IllegalStateException
// 快速生产
for (int i = 0; i < 50; i++) {
emitResult = many.tryEmitNext(String.valueOf(i));
if (emitResult != Sinks.EmitResult.OK) {
// 生产过快会打印 emitResult x : FAIL_OVERFLOW
System.out.println("emitResult " + i + " : " + emitResult);
}
}
Thread.sleep(5000);
many.tryEmitComplete(); // 完成信号Sinks.many().multicast().onBackpressureBuffer(int bufferSize?)多播 Sinks.Many 可以向多个订阅者发送信号,同时为每个订阅者处理背压问题,订阅者仅接收在其订阅后通过接收器推送的信号。
// 创建一个多播 Sink,缓存数量为 2
final Sinks.Many<String> many = Sinks.many().multicast().onBackpressureBuffer(2);
final Flux<String> flux = many.asFlux();
/**
* 与单播不同,多播 sink onBackpressureBuffer 是为每个订阅者维护的被压缓冲区,
* 在没有订阅之前发送的数据,其中一部分会被缓存起来作为 warm up 数据,
* 第一个订阅者会接收到这些值,后续订阅者只能接收到订阅之后发送的值
*/
for (int i = 1; i <= 100; i++) {
Sinks.EmitResult emitResult = many.tryEmitNext(String.valueOf(i));
if (emitResult != Sinks.EmitResult.OK) {
// 若干个成功之后,会打印 FAIL_ZERO_SUBSCRIBER
System.out.println("[" + Thread.currentThread().getName() + "] warm emit " + i + " : " + emitResult);
}
}
// 第一个订阅者会接收到一部分 warm up 数据
flux.publishOn(Schedulers.newSingle("quick"))
.elapsed()
.subscribe(item -> {
System.out.println("[" + Thread.currentThread().getName() + "] quick handle "+ item);
}, System.err::println);
// 第二个订阅者,只能接收到订阅之后的数据
flux.publishOn(Schedulers.newSingle("slow"))
.subscribe(item -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("[" + Thread.currentThread().getName() + "] slow handle "+ item);
}, System.err::println);
Thread.sleep(1000); // 确保订阅动作完成
new Thread(() -> {
for (int i = 101; i <= 800; i++) {
Sinks.EmitResult emitResult = many.tryEmitNext(String.valueOf(i));
if (emitResult != Sinks.EmitResult.OK) {
System.out.println("[" + Thread.currentThread().getName() + "] emit " + i + " : " + emitResult);
}
}
System.out.println(Thread.currentThread().getName() + ": 发布完成");
}, "t1").start();Context 上下文
Context 用于在响应式序列中传递上下文信息,它类似于 Map 存储键值对,解决了多线程环境中无法使用 ThreadLocal 的问题。
String key = "message";
Mono.just("Hello")
.flatMap(s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key))
))
.contextWrite(ctx -> ctx.put(key, "Reactor")) // 覆盖掉 World
.contextWrite(ctx -> ctx.put(key, "World"))
.subscribe(System.out::println);
// 输出 Hello Reactor在调用链中,contextWrite 的位置很重要,因为订阅信号是自下而上流动的,所以其内容只能被位于其上方的操作符看到,写入多次会被覆盖。
如果在两个 contextWrite 中间再添加一个 flatMap 操作,如下所示:
String key = "message";
Mono.just("Hello")
.flatMap(s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key)) // 只能看到 Reactor
))
.contextWrite(ctx -> ctx.put(key, "Reactor"))
.flatMap(s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key)) // 只能看到 World
))
.contextWrite(ctx -> ctx.put(key, "World"))
.subscribe(System.out::println);
// 输出 Hello Reactor World如果在 flatMap 中调用 contextWrite,那么它对外部序列将不可见:
String key = "message";
Mono.just("Hello")
.flatMap(s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key))
))
.flatMap(s -> Mono.deferContextual(ctx ->
Mono.just(s + " " + ctx.get(key)))
.contextWrite(ctx -> ctx.put(key, "Reactor")) // 不会影响主序列
)
.contextWrite(ctx -> ctx.put(key, "World")) // 影响主序列
.subscribe(System.out::println);
// 输出 Hello World ReactorWebFlux
对于 Web 容器,请参考: 此处
Spring WebFlux 在 Tomcat、Jetty、Servlet 容器以及非 Servlet 运行时(如 Netty 和 Undertow)上均受支持,可以和 Spring MVC 结合使用。Tomcat/Jetty 需通过 Servlet 3.1+ 的非阻塞 API 适配 WebFlux,对于 Undertow,Spring WebFlux 直接使用 Undertow API,而不使用 Servlet API。
与 Spring MVC 对比:
| 特性 | Spring MVC | Spring WebFlux |
|---|---|---|
| 线程模型 | 每个请求一个线程 | 少量线程处理大量请求(事件驱动) |
| 性能场景 | 简单/中小型应用,IO 密度不高 | 高并发、IO 密集型系统(如网关、推送服务) |
| 前端控制器 | DispatcherServlet | DispatcherHandler |
| 请求、响应 | ServletRequest/Response | ServerWebExchange |
| 过滤器 | Filter | WebFilter |
| 异常处理器 | HandlerExceptionResolver | DispatcherExceptionHandler |
| 自定义配置 | WebMvcConfigurer | WebFluxConfigurer |
| Rest 请求工具 | RestTemplate | WebClient |