ReactiveUI
基础用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
using System ;
using System.Reactive.Linq ;
using ReactiveUI ;
namespace AvaloniaApplication1.ViewModels ;
public partial class MainWindowViewModel : ViewModelBase
{
[ObservableProperty]
public partial string Text1 { get ; set ; }
[ObservableProperty]
public partial string Text2 { get ; set ; }
[ObservableProperty]
public partial string Result { get ; set ; }
public MainWindowViewModel ()
{
this . WhenAnyValue ( s => s . Text1 , s => s . Text2 )
. Where ( s => double . TryParse ( s . Item1 , out _ ) && double . TryParse ( s . Item2 , out _ ))
. Subscribe ( s => Calculate ());
this . WhenAnyValue ( s => s . Text1 , s => s . Text2 )
. Where ( s => ! double . TryParse ( s . Item1 , out _ ) || ! double . TryParse ( s . Item2 , out _ ))
. Subscribe ( s => Clear ());
}
void Calculate ()
{
Result = ( double . Parse ( Text1 ) + double . Parse ( Text2 )). ToString ();
}
void Clear ()
{
Result = string . Empty ;
}
}
只有当Text1和Text2发生变化且不为空时才订阅Calculate事件,为空时订阅Clear事件
WhenAny
、WhenAnyValue
、WhenAnyObservable
WhenAny
是一组扩展方法,每个方法都以前缀 WhenAny
开头,可用于在对象的属性发生更改时获取通知。
在可能的情况下,WhenAnyValue
应该优先于 WhenAny
,并且您不需要知道Sender
或Expression
WhenAny
变体支持多种属性更改通知。例如,它可以支持视图模型的 INotifyPropertyChanged
、基于 Windows 的 XAML 平台上的 DependencyProperty
以及 Apple 平台上的 NSObject
属性更改通知。要获取值更改通知,您的对象必须实现这些已知的属性更改通知机制之一。
如果其中一个不支持,您将只能获取属性的初始值,而不会收到任何更新通知。此外,运行时还会发出警告(请确保您已注册 ILogger
服务以便查看此警告)。
WhenAnyValue
IObservable<TRet> WhenAnyValue<TSender, TRet>( this TSender? sender, Expression<Func<TSender, TRet>> property1)
由于表达式尚不支持此功能,因此 WhenAnyValue
不能直接执行空传播。
你可以通过将 WhenAnyValue()
调用链接到每个属性来模拟对空值传播的支持。以下是示例:
1
2
this . WhenAnyValue ( x => x . Foo , x => x . Foo . Bar , x => x . Foo . Bar . Baz , ( foo , bar , baz ) => foo ?. Bar ?. Baz )
. Subscribe ( x => Console . WriteLine ( x ));
WhenAny
WhenAny
允许您获取传递到 WhenAny 中
的表达式和表达式。这对于一些场景是很有用的,比如在视图中,你需要知道调用属性改变的控件。
WhenAny
仅告知您输入表达式的最终值何时发生变化。即使最终的变化是由于表达式链中的中间值引起的,也是如此。以下是一个解释性示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
this . WhenAny ( x => x . Foo . Bar . Baz , _ => "Hello!" )
. Subscribe ( x => Console . WriteLine ( x ));
// Example 1
this . Foo . Bar . Baz = "Something" ;
>>> Hello !
// Example 2: Nothing printed! Because the string value does not change
this . Foo . Bar . Baz = "Something" ;
// Example 3: Still nothing!Because the expression is Foo.Bar.Baz,not Foo.Bar
this . Foo . Bar = new Bar () { Baz = "Something" };
// Example 4: The result changes, so we print
this . Foo . Bar = new Bar () { Baz = "Else" };
>>> Hello !
WhenAny
仅在读取给定表达式不会抛出 NullReferenceException
时才会发送通知。请考虑以下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
this . WhenAny ( x => x . Foo . Bar . Baz , _ => "Hello!" )
. Subscribe ( x => Console . WriteLine ( x ));
// Example 1
this . Foo . Bar . Baz = null ;
>>> Hello !
// Example 2: Nothing printed!
this . Foo . Bar = null ;
// Example 3
this . Foo . Bar = new Bar () { Baz = "Something" };
>>> Hello !
WhenAnyObservable
WhenAnyObservable
会观察一个或多个可观察对象并提供最新的可观察值,处理新可观察对象的自动订阅以及 WhenAnyObservable
可观察对象的处理。WhenAnyObservable 默认为惰性订阅,这意味着在订阅之前不会获得任何值(其他的When在Ctor完成时就会通知,这个只有在更改时才会通知)。
每当文档保存时,它都会打印来自 IsSaved
可观察变量的值。当 Document
属性发生更改时,它将自动取消订阅并重新订阅。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyViewModel
{
[Reactive]
public Document Document { get ; set ; }
public MyViewModel ()
{
this . WhenAnyObservable ( x => x . Document . IsSaved ). Subscribe ( x => Console . WriteLine ( $"Document Saved: {x}" ));
}
}
public class Document
{
public IObservable < bool > IsSaved { get ; }
}
Where
仅从 Observable 中发出那些通过谓词测试的项
IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
Subscribe
根据事件源的发出的事件或通知进行操作
注意:该操作符之后将返回IDisposable,而不是IObservable
IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
SubscribeOn
指定事件源在订阅时应使用的调度器
SubscribeOn 运算符指定 Observable 将开始在哪个线程上运行,而不管该运算符在运算符链中的哪个点被调用。另一方面,ObserveOn 会影响 Observable 将使用的线程, 该 线程位于该运算符出现的位置下方。 因此,您可以调用 在 Observable 链中的不同点多次进行 ObserveOn 运算符,以便更改某些运算符在哪些线程上进行作。
IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)
📢
1
2
3
4
this . WhenAnyValue ( s => s . Text1 , s => s . Text2 )
. SubscribeOn ( RxApp . MainThreadScheduler )
. SubscribeOn ( RxApp . TaskpoolScheduler )
. SubscribeOn ( SynchronizationContext . Current )
📢
RxApp.MainThreadScheduler
获取或设置一个调度器,该调度器用于对那些应在 “用户界面 (UI) 线程上” 运行的工作项进行调度。在正常模式下,这将是 DispatcherScheduler
(分发器调度器),而在单元测试模式下,这将是 Immediate
(即时调度器),以便简化常见单元测试的编写工作。
RxApp.TaskpoolScheduler
获取或设置用于调度工作项以在后台线程中运行的调度器。在两种模式下,这些工作项都将在 TPL(任务并行库)任务池中运行。
SynchronizationContext.Current
Thread.CurrentThread._synchronizationContext
Throttle
此操作符会对源序列进行限流,具体做法是对每个元素保留 dueTime
(指定时长)。若在这个时间窗口内又产生了另一个元素,那么前一个元素会被丢弃,并且会为当前元素启动一个新的计时器,如此循环往复。对于元素间间隔从不大于或等于 dueTime
的流,经过处理后的流将不会产生任何元素。若要在保证元素周期性产生的同时减少流的数据量,可考虑使用 Observable.Sample
系列操作符。
1
2
3
this . WhenAnyValue ( s => s . Text1 )
. Throttle ( TimeSpan . FromSeconds ( 1 ))
. Subscribe ( s => Calculate ());
Sample
、Interval
1
2
3
4
5
6
var sub = Observable
. Interval ( TimeSpan . FromSeconds ( 0.01 ))
. Sample ( TimeSpan . FromMilliseconds ( 500 ))
. Subscribe ( s => Result = s . ToString ());
await Task . Delay ( TimeSpan . FromSeconds ( 2 ));
sub . Dispose ();
每 10 毫秒发出一个值的可观察序列,每 500 毫秒对这个序列进行一次采样,将采样得到的值转换为字符串并赋值给 Result
变量,持续执行 2 秒后取消订阅并释放资源;尽管2s内通过Interval
产生了200个,但是由于Sample
,我只采样到了4个
Timer
、Amb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
using System.Reactive.Linq ;
var observable1 = Observable . Timer ( TimeSpan . FromMilliseconds ( 200 ))
. Select ( _ => "Observable 1" );
// 创建第二个可观测序列,延迟 100 毫秒后发射元素
var observable2 = Observable . Timer ( TimeSpan . FromMilliseconds ( 100 ))
. Select ( _ => "Observable 2" );
// 使用 Amb 操作符选择第一个产生元素的序列
var result = observable1 . Amb ( observable2 );
// 订阅结果序列
result . Subscribe (
value => Console . WriteLine ( $"Received: {value}" ),
error => Console . WriteLine ( $"Error: {error}" ),
() => Console . WriteLine ( "Completed" )
);
借助 Observable.Timer
来创建两个可观测序列 observable1
和 observable2
,它们会在特定延迟后发射元素。
在这个示例中,由于 observable2
延迟 100 毫秒,比 observable1
的 200 毫秒延迟要短,所以 observable2
会率先产生元素,Amb
操作符会选择 observable2
并忽略 observable1
。
Received: Observable 2
Completed
Retry
、Catch
Retry
允许在发生错误时或抛出错误时重新订阅源序列。例如,以下代码创建一个会抛出异常的序列,并通过Retry(2)最多重试2次:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
using System ;
using System.Reactive.Disposables ;
using System.Reactive.Linq ;
var source = Observable . Create < int >( observer =>
{
Console . WriteLine ( "Doing work..." );
// throw new Exception("Initial failure");
observer . OnError ( new Exception ( "Initial failure" ));
return Disposable . Empty ;
});
source
. Retry ( 2 ) // 发生错误时重新订阅源序列,最多尝试2次
. Subscribe (
x => Console . WriteLine ( "OnNext: " + x ),
ex => Console . WriteLine ( "OnError: " + ex . Message ),
() => Console . WriteLine ( "OnCompleted" )
);
输出
Doing work…
Doing work…
OnError: Initial failure
**Catch
**用于捕获异常并替换为新的序列。例如,以下代码在发生错误后返回一个备用值:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
using System ;
using System.Reactive.Disposables ;
using System.Reactive.Linq ;
var source = Observable . Create < int >( observer =>
{
observer . OnError ( new Exception ( "Primary error" ));
return Disposable . Empty ;
});
source
. Catch ( Observable . Return ( 42 )) // 捕获异常并替换为值42
. Subscribe (
x => Console . WriteLine ( "OnNext: " + x ),
ex => Console . WriteLine ( "OnError: " + ex . Message ),
() => Console . WriteLine ( "OnCompleted" )
);
OnNext: 42
OnCompleted
Scan
、Buffer
Scan
用于对序列中的每个元素连续应用累积函数,并输出中间结果。类似于LINQ的 Aggregate,但会发射所有中间值
1
2
3
4
5
6
7
using System.Reactive.Linq ;
var source = Observable . Range ( 1 , 5 ); // 生成1到5的序列
source
. Scan ( 0 , ( acc , val ) => acc + val ) // 初始值0,累加当前值
. Subscribe ( x => Console . WriteLine ( "Scan Result: " + x ));
Scan Result: 1
Scan Result: 3
Scan Result: 6
Scan Result: 10
Scan Result: 15
Buffer
将事件流按时间或数量分组,输出批量数据。常见用法包括按时间窗口或元素数量缓冲。
1
2
3
4
5
6
7
8
9
using System.Reactive.Linq ;
var source = Observable . Interval ( TimeSpan . FromSeconds ( 1 )) // 每秒发射递增数
. Take ( 10 ); // 限制总发射次数
// 按时间窗口(5秒)和最大数量(3个元素)缓冲
source
. Buffer ( TimeSpan . FromSeconds ( 5 ), 3 )
. Subscribe ( buffer => Console . WriteLine ( $"Buffered Count: {buffer.Count}" ));
Buffered Count: 3 // 第1-3秒的数据
Buffered Count: 3 // 第4-5秒的数据(可能触发时间窗口)
Buffered Count: 3 // 下一个窗口
Buffered Count: 1 // 剩余1个元素
Distinct
、DistinctUntilChanged
Distinct:过滤序列中所有重复的元素,仅保留首次出现的元素。
1
2
3
4
var source = new [] { 1 , 2 , 2 , 3 , 3 , 3 };
source . ToObservable ()
. Distinct ()
. Subscribe ( x => Console . WriteLine ( x ));
输出 :1, 2, 3
DistinctUntilChanged:仅过滤连续重复 的元素,保留第一个,后续连续相同的元素被丢弃。
1
2
3
4
var source = new [] { 1 , 2 , 2 , 3 , 2 , 2 };
source . ToObservable ()
. DistinctUntilChanged ()
. Subscribe ( x => Console . WriteLine ( x ));
输出 :1, 2, 3, 2
Zip
Zip
用于合并两个序列,逐个配对元素并生成结果。在 Rx 中,它常用于组合多个异步数据流。
1
2
3
4
5
6
7
using System.Reactive.Linq ;
var numbers = Observable . Range ( 1 , 3 ); // 1, 2, 3
var letters = Observable . Return ( "A" ). Concat ( Observable . Return ( "B" )); // A, B
numbers . Zip ( letters , ( n , l ) => $"{n}{l}" )
. Subscribe ( result => Console . WriteLine ( result ));
1A
2B
Delay
、Do
、TimeInterval
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
using System ;
using System.Reactive.Linq ;
using System.Reactive.Concurrency ;
class Program
{
static void Main ()
{
var source = Observable . Interval ( TimeSpan . FromSeconds ( 1 )) // 每秒发射递增数
. Take ( 5 ); // 限制发射次数为5次
source
. Delay ( TimeSpan . FromSeconds ( 0.5 )) // 延迟0.5秒后发射元素 [[4]]
. Do ( x => Console . WriteLine ( $"Do: 被延迟的值 = {x}" )) // 记录中间状态 [[6]]
. TimeInterval () // 测量连续事件的时间间隔 [[1]]
. Subscribe (
interval => Console . WriteLine ( $"TimeInterval: 值={interval.Value}, 间隔={interval.Interval.TotalSeconds}秒" ),
ex => Console . WriteLine ( $"OnError: {ex.Message}" ),
() => Console . WriteLine ( "OnCompleted" ) // 序列结束 [[10]]
);
// 确保主线程等待足够时间以观察输出
Console . ReadLine ();
}
}
流程 :
每秒生成一个数字(Interval
)。
延迟0.5秒后发射(Delay
)。
记录延迟后的值(Do
)。
计算相邻值的发射时间间隔(TimeInterval
)。
序列结束时自动触发 OnCompleted
。
Do: 被延迟的值 = 0
TimeInterval: 值=0, 间隔=1.5369618秒
Do: 被延迟的值 = 1
TimeInterval: 值=1, 间隔=0.9820277秒
Do: 被延迟的值 = 2
TimeInterval: 值=2, 间隔=0.9995484秒
Do: 被延迟的值 = 3
TimeInterval: 值=3, 间隔=1.0001499秒
Do: 被延迟的值 = 4
TimeInterval: 值=4, 间隔=1.0005686秒
OnCompleted
Average
、Concat
、Count
、Max
、Min
、Aggregate
、Sum
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
using System ;
using System.Reactive.Linq ;
class Program
{
static void Main ()
{
// 创建第一个序列
var source1 = Observable . Range ( 1 , 3 ); // 发射 1, 2, 3
// 创建第二个序列
var source2 = Observable . Range ( 4 , 2 ); // 发射 4, 5
// 使用 Concat 合并两个序列 [[3]]
var concatenated = source1 . Concat ( source2 );
// 应用聚合操作符
concatenated
. Count () // 计算总项数 [[1]]
. Subscribe ( count => Console . WriteLine ( $"Total Count: {count}" ));
concatenated
. Sum () // 计算总和 [[9]]
. Subscribe ( sum => Console . WriteLine ( $"Total Sum: {sum}" ));
concatenated
. Average () // 计算平均值 [[9]]
. Subscribe ( avg => Console . WriteLine ( $"Average: {avg}" ));
concatenated
. Min () // 查找最小值
. Subscribe ( min => Console . WriteLine ( $"Min: {min}" ));
concatenated
. Max () // 查找最大值
. Subscribe ( max => Console . WriteLine ( $"Max: {max}" ));
// 使用 Aggregate 自定义聚合逻辑(类似 Sum)
concatenated
. Aggregate (( acc , val ) => acc + val ) // 累积求和 [[9]]
. Subscribe ( total => Console . WriteLine ( $"Reduce (Custom Sum): {total}" ));
Console . ReadLine (); // 防止控制台退出
}
}
Total Count: 5
Total Sum: 15
Average: 3
Min: 1
Max: 5
Reduce (Custom Sum): 15
All
、Contains
、TakeUntil
、StartWith
、Join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
using System ;
using System.Reactive.Linq ;
class Program
{
static void Main ()
{
// 示例序列1:数字序列 [1, 2, 3, 4, 5]
var numbers = Observable . Range ( 1 , 5 );
// 示例序列2:字符串序列 ["A", "B", "C"]
var letters = Observable . Range ( 0 , 3 ). Select ( x => (( char )( 'A' + x )). ToString ());
// StartWith: 在序列开始前添加元素 [0, 1, 2, 3, 4, 5]
var startWithExample = numbers . StartWith ( 0 );
startWithExample . Subscribe ( x => Console . WriteLine ( $"StartWith: {x}" ));
// All: 检查所有元素是否大于0
numbers . All ( x => x > 0 )
. Subscribe ( result => Console . WriteLine ( $"All > 0: {result}" )); // 输出 true
// Contains: 判断序列是否包含元素 3
numbers . Contains ( 3 )
. Subscribe ( result => Console . WriteLine ( $"Contains 3: {result}" )); // 输出 true
// TakeUntil: 当另一个序列(如定时器)触发时停止
var takeUntilExample = numbers . TakeUntil ( Observable . Timer ( TimeSpan . FromSeconds ( 2 )));
takeUntilExample . Subscribe (
x => Console . WriteLine ( $"TakeUntil: {x}" ),
() => Console . WriteLine ( "TakeUntil Completed" ));
// Join: 合并两个序列(基于时间窗口)
var now = DateTime . Now ;
var source1 = Observable . Interval ( TimeSpan . FromSeconds ( 1 )). Take ( 3 ). Select ( x => $"N{x}" );
var source2 = Observable . Interval ( TimeSpan . FromSeconds ( 1.5 )). Take ( 3 ). Select ( x => $"L{x}" );
source1 . Join (
source2 ,
_ => Observable . Timer ( TimeSpan . FromSeconds ( 2 )), // 左侧元素存活时间
_ => Observable . Timer ( TimeSpan . FromSeconds ( 2 )), // 右侧元素存活时间
( n , l ) => $"{n}+{l}"
). Subscribe ( result => Console . WriteLine ( $"Join Result: {result}" ));
Console . ReadLine (); // 防止控制台退出
}
}
应用StartWith添加初始元素。
使用All检查条件。
使用Contains判断是否存在元素。
使用TakeUntil在另一个序列触发后停止。
使用Join合并两个序列。
每个操作符后订阅结果并输出。
添加必要的引用标注。
StartWith: 0
StartWith: 1
StartWith: 2
StartWith: 3
StartWith: 4
StartWith: 5
All > 0: True
Contains 3: True
TakeUntil: 1
TakeUntil: 2
TakeUntil: 3
TakeUntil: 4
TakeUntil: 5
TakeUntil Completed
Join Result: N0+L0
Join Result: N1+L0
Join Result: N0+L1
Join Result: N1+L1
Join Result: N2+L0
Join Result: N2+L1
Join Result: N2+L2