Featured image of post ReactiveUI 与 Rx 常用方法总结

ReactiveUI 与 Rx 常用方法总结

ReactiveUI 与 Rx 常用方法总结

ReactiveUI

基础用法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
using System;
using System.Reactive.Linq;
using ReactiveUI;

namespace AvaloniaApplication1.ViewModels;

public partial class MainWindowViewModel : ViewModelBase
{
    [ObservableProperty]
    public partial string Text1 { get; set; }

    [ObservableProperty]
    public partial string Text2 { get; set; }

    [ObservableProperty]
    public partial string Result { get; set; }

    public MainWindowViewModel()
    {
        this.WhenAnyValue(s => s.Text1, s => s.Text2)
            .Where(s => double.TryParse(s.Item1, out _) && double.TryParse(s.Item2, out _))
            .Subscribe(s => Calculate());

        this.WhenAnyValue(s => s.Text1, s => s.Text2)
            .Where(s => !double.TryParse(s.Item1, out _) || !double.TryParse(s.Item2, out _))
            .Subscribe(s => Clear());
    }

    void Calculate()
    {
        Result = (double.Parse(Text1) + double.Parse(Text2)).ToString();
    }

    void Clear()
    {
        Result = string.Empty;
    }
}

只有当Text1和Text2发生变化且不为空时才订阅Calculate事件,为空时订阅Clear事件

WhenAnyWhenAnyValueWhenAnyObservable

WhenAny 是一组扩展方法,每个方法都以前缀 WhenAny 开头,可用于在对象的属性发生更改时获取通知。

在可能的情况下,WhenAnyValue 应该优先于 WhenAny,并且您不需要知道SenderExpression

WhenAny 变体支持多种属性更改通知。例如,它可以支持视图模型的 INotifyPropertyChanged 、基于 Windows 的 XAML 平台上的 DependencyProperty 以及 Apple 平台上的 NSObject 属性更改通知。要获取值更改通知,您的对象必须实现这些已知的属性更改通知机制之一。 如果其中一个不支持,您将只能获取属性的初始值,而不会收到任何更新通知。此外,运行时还会发出警告(请确保您已注册 ILogger 服务以便查看此警告)。

WhenAnyValue

IObservable<TRet> WhenAnyValue<TSender, TRet>( this TSender? sender, Expression<Func<TSender, TRet>> property1)

由于表达式尚不支持此功能,因此 WhenAnyValue 不能直接执行空传播。 你可以通过将 WhenAnyValue() 调用链接到每个属性来模拟对空值传播的支持。以下是示例:

1
2
this.WhenAnyValue(x => x.Foo, x => x.Foo.Bar, x => x.Foo.Bar.Baz, (foo, bar, baz) => foo?.Bar?.Baz)
    .Subscribe(x => Console.WriteLine(x));

WhenAny

WhenAny 允许您获取传递到 WhenAny 中的表达式和表达式。这对于一些场景是很有用的,比如在视图中,你需要知道调用属性改变的控件。

WhenAny 仅告知您输入表达式的最终值何时发生变化。即使最终的变化是由于表达式链中的中间值引起的,也是如此。以下是一个解释性示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
this.WhenAny(x => x.Foo.Bar.Baz, _ => "Hello!")
    .Subscribe(x => Console.WriteLine(x));

// Example 1
this.Foo.Bar.Baz = "Something";
>>> Hello!

// Example 2: Nothing printed! Because the string value does not change
this.Foo.Bar.Baz = "Something";

// Example 3: Still nothing!Because the expression is Foo.Bar.Baz,not Foo.Bar
this.Foo.Bar = new Bar() { Baz = "Something" };

// Example 4: The result changes, so we print
this.Foo.Bar = new Bar() { Baz = "Else" };
>>> Hello!

WhenAny 仅在读取给定表达式不会抛出 NullReferenceException 时才会发送通知。请考虑以下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
this.WhenAny(x => x.Foo.Bar.Baz, _ => "Hello!")
    .Subscribe(x => Console.WriteLine(x));

// Example 1
this.Foo.Bar.Baz = null;
>>> Hello!

// Example 2: Nothing printed!
this.Foo.Bar = null;

// Example 3
this.Foo.Bar = new Bar() { Baz = "Something" };
>>> Hello!

WhenAnyObservable

WhenAnyObservable 会观察一个或多个可观察对象并提供最新的可观察值,处理新可观察对象的自动订阅以及 WhenAnyObservable 可观察对象的处理。WhenAnyObservable 默认为惰性订阅,这意味着在订阅之前不会获得任何值(其他的When在Ctor完成时就会通知,这个只有在更改时才会通知)。

每当文档保存时,它都会打印来自 IsSaved 可观察变量的值。当 Document 属性发生更改时,它将自动取消订阅并重新订阅。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class MyViewModel
{
    [Reactive]
    public Document Document { get; set; }

    public MyViewModel()
    {
      this.WhenAnyObservable(x => x.Document.IsSaved).Subscribe(x => Console.WriteLine($"Document Saved: {x}"));
    }
}

public class Document
{
    public IObservable<bool> IsSaved { get; }
}

Where

仅从 Observable 中发出那些通过谓词测试的项 IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)

Subscribe

根据事件源的发出的事件或通知进行操作

注意:该操作符之后将返回IDisposable,而不是IObservable

IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)

SubscribeOn

指定事件源在订阅时应使用的调度器

SubscribeOn 运算符指定 Observable 将开始在哪个线程上运行,而不管该运算符在运算符链中的哪个点被调用。另一方面,ObserveOn 会影响 Observable 将使用的线程, 线程位于该运算符出现的位置下方。 因此,您可以调用 在 Observable 链中的不同点多次进行 ObserveOn 运算符,以便更改某些运算符在哪些线程上进行作。

IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)

Throttle

此操作符会对源序列进行限流,具体做法是对每个元素保留 dueTime(指定时长)。若在这个时间窗口内又产生了另一个元素,那么前一个元素会被丢弃,并且会为当前元素启动一个新的计时器,如此循环往复。对于元素间间隔从不大于或等于 dueTime 的流,经过处理后的流将不会产生任何元素。若要在保证元素周期性产生的同时减少流的数据量,可考虑使用 Observable.Sample 系列操作符。

1
2
3
this.WhenAnyValue(s => s.Text1)
    .Throttle(TimeSpan.FromSeconds(1))
    .Subscribe(s => Calculate());

SampleInterval

1
2
3
4
5
6
var sub = Observable
    .Interval(TimeSpan.FromSeconds(0.01))
    .Sample(TimeSpan.FromMilliseconds(500))
    .Subscribe(s => Result=s.ToString());
await Task.Delay(TimeSpan.FromSeconds(2));
sub.Dispose();

每 10 毫秒发出一个值的可观察序列,每 500 毫秒对这个序列进行一次采样,将采样得到的值转换为字符串并赋值给 Result 变量,持续执行 2 秒后取消订阅并释放资源;尽管2s内通过Interval产生了200个,但是由于Sample,我只采样到了4个

TimerAmb

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
using System.Reactive.Linq;

var observable1 = Observable.Timer(TimeSpan.FromMilliseconds(200))
    .Select(_ => "Observable 1");

// 创建第二个可观测序列,延迟 100 毫秒后发射元素
var observable2 = Observable.Timer(TimeSpan.FromMilliseconds(100))
    .Select(_ => "Observable 2");

// 使用 Amb 操作符选择第一个产生元素的序列
var result = observable1.Amb(observable2);

// 订阅结果序列
result.Subscribe(
    value => Console.WriteLine($"Received: {value}"),
    error => Console.WriteLine($"Error: {error}"),
    () => Console.WriteLine("Completed")
);

借助 Observable.Timer 来创建两个可观测序列 observable1 和 observable2,它们会在特定延迟后发射元素。

在这个示例中,由于 observable2 延迟 100 毫秒,比 observable1 的 200 毫秒延迟要短,所以 observable2 会率先产生元素,Amb 操作符会选择 observable2 并忽略 observable1

Received: Observable 2
Completed

RetryCatch

Retry允许在发生错误时或抛出错误时重新订阅源序列。例如,以下代码创建一个会抛出异常的序列,并通过Retry(2)最多重试2次:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var source = Observable.Create<int>(observer =>
{
    Console.WriteLine("Doing work...");
    // throw new Exception("Initial failure");
    observer.OnError(new Exception("Initial failure"));
    return Disposable.Empty;
});

source
    .Retry(2) // 发生错误时重新订阅源序列,最多尝试2次
    .Subscribe(
        x => Console.WriteLine("OnNext: " + x),
        ex => Console.WriteLine("OnError: " + ex.Message),
        () => Console.WriteLine("OnCompleted")
    );

输出

Doing work…
Doing work…
OnError: Initial failure

**Catch**用于捕获异常并替换为新的序列。例如,以下代码在发生错误后返回一个备用值:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

var source = Observable.Create<int>(observer =>
{
    observer.OnError(new Exception("Primary error"));
    return Disposable.Empty;
});

source
    .Catch(Observable.Return(42))  // 捕获异常并替换为值42
    .Subscribe(
        x => Console.WriteLine("OnNext: " + x),
        ex => Console.WriteLine("OnError: " + ex.Message),
        () => Console.WriteLine("OnCompleted")
    );

OnNext: 42
OnCompleted

ScanBuffer

Scan 用于对序列中的每个元素连续应用累积函数,并输出中间结果。类似于LINQ的 Aggregate,但会发射所有中间值

1
2
3
4
5
6
7
using System.Reactive.Linq;

var source = Observable.Range(1, 5); // 生成1到5的序列

source
    .Scan(0, (acc, val) => acc + val) // 初始值0,累加当前值
    .Subscribe(x => Console.WriteLine("Scan Result: " + x));

Scan Result: 1
Scan Result: 3
Scan Result: 6
Scan Result: 10
Scan Result: 15

Buffer 将事件流按时间或数量分组,输出批量数据。常见用法包括按时间窗口或元素数量缓冲。

1
2
3
4
5
6
7
8
9
using System.Reactive.Linq;

var source = Observable.Interval(TimeSpan.FromSeconds(1)) // 每秒发射递增数
                       .Take(10); // 限制总发射次数

// 按时间窗口(5秒)和最大数量(3个元素)缓冲
source
    .Buffer(TimeSpan.FromSeconds(5), 3) 
    .Subscribe(buffer => Console.WriteLine($"Buffered Count: {buffer.Count}"));

Buffered Count: 3 // 第1-3秒的数据
Buffered Count: 3 // 第4-5秒的数据(可能触发时间窗口)
Buffered Count: 3 // 下一个窗口
Buffered Count: 1 // 剩余1个元素

DistinctDistinctUntilChanged

Distinct:过滤序列中所有重复的元素,仅保留首次出现的元素。

1
2
3
4
var source = new[] { 1, 2, 2, 3, 3, 3 };
source.ToObservable()
      .Distinct()
      .Subscribe(x => Console.WriteLine(x));

输出1, 2, 3

DistinctUntilChanged:仅过滤连续重复 的元素,保留第一个,后续连续相同的元素被丢弃。

1
2
3
4
var source = new[] { 1, 2, 2, 3, 2, 2 };
source.ToObservable()
      .DistinctUntilChanged()
      .Subscribe(x => Console.WriteLine(x));

输出1, 2, 3, 2

Zip

Zip 用于合并两个序列,逐个配对元素并生成结果。在 Rx 中,它常用于组合多个异步数据流。

1
2
3
4
5
6
7
using System.Reactive.Linq;

var numbers = Observable.Range(1, 3); // 1, 2, 3
var letters = Observable.Return("A").Concat(Observable.Return("B")); // A, B

numbers.Zip(letters, (n, l) => $"{n}{l}")
       .Subscribe(result => Console.WriteLine(result));

1A
2B

DelayDoTimeInterval

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
using System;
using System.Reactive.Linq;
using System.Reactive.Concurrency;

class Program
{
    static void Main()
    {
        var source = Observable.Interval(TimeSpan.FromSeconds(1)) // 每秒发射递增数
                               .Take(5); // 限制发射次数为5次

        source
            .Delay(TimeSpan.FromSeconds(0.5)) // 延迟0.5秒后发射元素 [[4]]
            .Do(x => Console.WriteLine($"Do: 被延迟的值 = {x}")) // 记录中间状态 [[6]]
            .TimeInterval() // 测量连续事件的时间间隔 [[1]]
            .Subscribe(
                interval => Console.WriteLine($"TimeInterval: 值={interval.Value}, 间隔={interval.Interval.TotalSeconds}秒"),
                ex => Console.WriteLine($"OnError: {ex.Message}"),
                () => Console.WriteLine("OnCompleted") // 序列结束 [[10]]
            );

        // 确保主线程等待足够时间以观察输出
        Console.ReadLine();
    }
}
  • 流程
    1. 每秒生成一个数字(Interval)。
    2. 延迟0.5秒后发射(Delay)。
    3. 记录延迟后的值(Do)。
    4. 计算相邻值的发射时间间隔(TimeInterval)。
    5. 序列结束时自动触发 OnCompleted

Do: 被延迟的值 = 0
TimeInterval: 值=0, 间隔=1.5369618秒
Do: 被延迟的值 = 1
TimeInterval: 值=1, 间隔=0.9820277秒
Do: 被延迟的值 = 2
TimeInterval: 值=2, 间隔=0.9995484秒
Do: 被延迟的值 = 3
TimeInterval: 值=3, 间隔=1.0001499秒
Do: 被延迟的值 = 4
TimeInterval: 值=4, 间隔=1.0005686秒
OnCompleted

AverageConcatCountMaxMinAggregateSum

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
using System;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // 创建第一个序列
        var source1 = Observable.Range(1, 3); // 发射 1, 2, 3
        // 创建第二个序列
        var source2 = Observable.Range(4, 2); // 发射 4, 5

        // 使用 Concat 合并两个序列 [[3]]
        var concatenated = source1.Concat(source2); 

        // 应用聚合操作符
        concatenated
            .Count() // 计算总项数 [[1]]
            .Subscribe(count => Console.WriteLine($"Total Count: {count}"));

        concatenated
            .Sum() // 计算总和 [[9]]
            .Subscribe(sum => Console.WriteLine($"Total Sum: {sum}"));

        concatenated
            .Average() // 计算平均值 [[9]]
            .Subscribe(avg => Console.WriteLine($"Average: {avg}"));

        concatenated
            .Min() // 查找最小值
            .Subscribe(min => Console.WriteLine($"Min: {min}"));

        concatenated
            .Max() // 查找最大值
            .Subscribe(max => Console.WriteLine($"Max: {max}"));

        // 使用 Aggregate 自定义聚合逻辑(类似 Sum)
        concatenated
            .Aggregate((acc, val) => acc + val) // 累积求和 [[9]]
            .Subscribe(total => Console.WriteLine($"Reduce (Custom Sum): {total}"));

        Console.ReadLine(); // 防止控制台退出
    }
}

Total Count: 5
Total Sum: 15
Average: 3
Min: 1
Max: 5
Reduce (Custom Sum): 15

AllContainsTakeUntilStartWithJoin

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
using System;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // 示例序列1:数字序列 [1, 2, 3, 4, 5]
        var numbers = Observable.Range(1, 5);

        // 示例序列2:字符串序列 ["A", "B", "C"]
        var letters = Observable.Range(0, 3).Select(x => ((char)('A' + x)).ToString());

        // StartWith: 在序列开始前添加元素 [0, 1, 2, 3, 4, 5]
        var startWithExample = numbers.StartWith(0);
        startWithExample.Subscribe(x => Console.WriteLine($"StartWith: {x}"));

        // All: 检查所有元素是否大于0
        numbers.All(x => x > 0)
            .Subscribe(result => Console.WriteLine($"All > 0: {result}")); // 输出 true

        // Contains: 判断序列是否包含元素 3
        numbers.Contains(3)
            .Subscribe(result => Console.WriteLine($"Contains 3: {result}")); // 输出 true

        // TakeUntil: 当另一个序列(如定时器)触发时停止
        var takeUntilExample = numbers.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(2)));
        takeUntilExample.Subscribe(
            x => Console.WriteLine($"TakeUntil: {x}"),
            () => Console.WriteLine("TakeUntil Completed"));

        // Join: 合并两个序列(基于时间窗口)
        var now = DateTime.Now;
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3).Select(x => $"N{x}");
        var source2 = Observable.Interval(TimeSpan.FromSeconds(1.5)).Take(3).Select(x => $"L{x}");

        source1.Join(
            source2,
            _ => Observable.Timer(TimeSpan.FromSeconds(2)), // 左侧元素存活时间
            _ => Observable.Timer(TimeSpan.FromSeconds(2)), // 右侧元素存活时间
            (n, l) => $"{n}+{l}"
        ).Subscribe(result => Console.WriteLine($"Join Result: {result}"));

        Console.ReadLine(); // 防止控制台退出
    }
}
  1. 应用StartWith添加初始元素。
  2. 使用All检查条件。
  3. 使用Contains判断是否存在元素。
  4. 使用TakeUntil在另一个序列触发后停止。
  5. 使用Join合并两个序列。
  6. 每个操作符后订阅结果并输出。
  7. 添加必要的引用标注。

StartWith: 0
StartWith: 1
StartWith: 2
StartWith: 3
StartWith: 4
StartWith: 5
All > 0: True
Contains 3: True
TakeUntil: 1
TakeUntil: 2
TakeUntil: 3
TakeUntil: 4
TakeUntil: 5
TakeUntil Completed
Join Result: N0+L0
Join Result: N1+L0
Join Result: N0+L1
Join Result: N1+L1
Join Result: N2+L0
Join Result: N2+L1
Join Result: N2+L2

Built with Hugo
Theme Stack designed by Jimmy