rxjava是一个在java虚拟机上,使用可观察的序列构成基于事件的,异步的程序库
白话:类似于Google提供 给我我们使用的AsyncTask,它能在异步线程处理后传递到UI线程中处理,现在基本上已经由Rxjava替换了AsyncTask来使用
RxAndroid是基于RxJava开发出的一套适用于Android开发的辅助库
白话:RxJava通过RxAndroid的辅助,使得原本RxJava只在Java上运行的程序也能在Android上开发
观察者模式就是RxJava使用的核心点,掌握这个模式,可以理解RxJava更简单,观察者模式简单的说就是:订阅-发布;的模式,举一个例子说,当你订阅某家牛奶店的早餐奶(订阅过程),只要牛奶店生成牛奶,便会给你送过去(发布过程)。这里的牛奶店只有一家,但是订阅的人可以很多。这是一种一对多的关系,只要牛奶店发布牛奶,那么订阅的人就会收到牛奶。换成RxJava里面的话,牛奶店就是被观察者(ObServable),订阅的人就是观察者(ObServer),根据这个例子,我们通过带啊来实现这逻辑
#####1. 创建观察者和被观察者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| //被观察者 public interface ObServable{ //订阅 public subscribe(ObServer obServer); //取消订阅 void cancel(ObServer obServer); //发布 void onNext(); } //观察者 public interface ObServer{ //接收通知 void onNotify(); }
|
#####2.实现观察者和被观察者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| //牛奶店 public class MilkFactory implements ObServable{ private List<ObServer> ObServers; public MilkFactory(){ this.ObServers=new ArrayList<>(); } @Override public void subscribe(ObServer obServer){ ObServers.add(obServer); }
@Override public void cancel(ObServer obServer) { ObServers.remove(obServer); }
@Override public void onNext() { for (ObServer obServer:obServables) { obServer.onNotify(); } } } //顾客 public class MilkConsumer implements Observer{ private String name; public MilkConsumer(String name){ this.name=name; } @Override public void onNotify(){ System.out.println(this.name+"收到牛奶"); } } //观察者订阅被观察者 @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); ObServable milkFactory=new MilkFactory(); ObServer milkConsumer1=new MilkConsumer("顾客1"); ObServer milkConsumer2=new MilkConsumer("顾客2"); //订阅过程 milkFactory.subscribe(milkConsumer1); milkFactory.subscribe(milkConsumer2); //发布过程 System.out.println("此时,牛奶店已经产出早餐奶,通知并发送给各位顾客"); milkFactory.onNext(); } //查看程序输出的结果 //此时,牛奶店已经产出早餐奶,通知并发送给各位顾客 //顾客1 收到牛奶 //顾客2 收到牛奶
|
RxJava中的观察者模式也是跟例子中的大同小异,也是通过创建出观察者和被观察者,然后进行订阅事件,只不过这里的观察者和被观察者已经不在是我们自定义的两个接口类,而是通过RxJava提供好的观察者和被观察者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); //拿到被观察者 ObServable<String> obServable=getObServable(); //拿到观察者 ObServer<String> obServer=getObServer(); //订阅 obServable.subscribe(obServer); } public static Observable<String> getObServable(){ return Observable.create(new ObServableOnSubscribe<String>(){ @Override public void subscribe(@NonNull ObServableEmitter<String> e) throws Exception{ e.onNext("事件1"); e.onNext("事件2");//onNext()类似于下一步 e.onComplete();//onComplete()完成 } }); } public static Observer<String> getObServer(){ return new ObServer<String>(){ Disposable disposable=null; @Override public void onSubscribe(Disposable d){ disposable=d; } @Override public void onNext(String s){ System.out.println("onNext="+s); if (s.equals("取消关注")){ //断开订阅 disposable.dispose(); } } @Override public void onError(){
} @Override public void onComplete(){ System.out.println("onComplete"); } }; } //执行结果: //onNext=事件1 //onNext=事件2 //onComplete
|