Jonny Borges
Committed by GitHub

Merge pull request #593 from roipeker/fix_rx_binding

Fixes subscription for Rx::bindStream
@@ -179,13 +179,17 @@ class _RxImpl<T> implements RxInterface<T> { @@ -179,13 +179,17 @@ class _RxImpl<T> implements RxInterface<T> {
179 } 179 }
180 180
181 Stream<T> get stream => subject.stream; 181 Stream<T> get stream => subject.stream;
182 -  
183 StreamSubscription<T> listen(void Function(T) onData, 182 StreamSubscription<T> listen(void Function(T) onData,
184 {Function onError, void Function() onDone, bool cancelOnError}) => 183 {Function onError, void Function() onDone, bool cancelOnError}) =>
185 stream.listen(onData, onError: onError, onDone: onDone); 184 stream.listen(onData, onError: onError, onDone: onDone);
186 185
187 - /// Binds an existing stream to this Rx to keep the values in sync.  
188 - void bindStream(Stream<T> stream) => stream.listen((va) => value = va); 186 + /// Binds an existing [Stream<T>] to this Rx<T> to keep the values in sync.
  187 + /// You can bind multiple sources to update the value.
  188 + /// Closing the subscription will happen automatically when the observer
  189 + /// Widget ([GetX] or [Obx]) gets unmounted from the Widget tree.
  190 + void bindStream(Stream<T> stream) {
  191 + _subscriptions[stream] = stream.listen((va) => value = va);
  192 + }
189 193
190 Stream<R> map<R>(R mapper(T data)) => stream.map(mapper); 194 Stream<R> map<R>(R mapper(T data)) => stream.map(mapper);
191 } 195 }
1 import 'dart:async'; 1 import 'dart:async';
  2 +import 'dart:collection';
2 import 'dart:math'; 3 import 'dart:math';
3 4
4 import 'package:flutter/foundation.dart'; 5 import 'package:flutter/foundation.dart';
@@ -31,7 +32,7 @@ class RxList<E> implements List<E>, RxInterface<List<E>> { @@ -31,7 +32,7 @@ class RxList<E> implements List<E>, RxInterface<List<E>> {
31 @override 32 @override
32 StreamController<List<E>> subject = StreamController.broadcast(); 33 StreamController<List<E>> subject = StreamController.broadcast();
33 34
34 - final Map<Stream<List<E>>, StreamSubscription> _subscriptions = {}; 35 + final _subscriptions = HashMap<Stream<List<E>>, StreamSubscription>();
35 36
36 void operator []=(int index, E val) { 37 void operator []=(int index, E val) {
37 _list[index] = val; 38 _list[index] = val;
@@ -209,7 +210,13 @@ class RxList<E> implements List<E>, RxInterface<List<E>> { @@ -209,7 +210,13 @@ class RxList<E> implements List<E>, RxInterface<List<E>> {
209 }) => 210 }) =>
210 stream.listen(onData, onError: onError, onDone: onDone); 211 stream.listen(onData, onError: onError, onDone: onDone);
211 212
212 - void bindStream(Stream<List<E>> stream) => stream.listen((va) => value = va); 213 + /// Binds an existing [Stream<List>] to this [RxList].
  214 + /// You can bind multiple sources to update the value.
  215 + /// Closing the subscription will happen automatically when the observer
  216 + /// Widget ([GetX] or [Obx]) gets unmounted from the Widget tree.
  217 + void bindStream(Stream<List<E>> stream) {
  218 + _subscriptions[stream] = stream.listen((va) => value = va);
  219 + }
213 220
214 @override 221 @override
215 E get first => value.first; 222 E get first => value.first;
1 import 'dart:async'; 1 import 'dart:async';
  2 +import 'dart:collection';
2 3
3 import 'package:flutter/foundation.dart'; 4 import 'package:flutter/foundation.dart';
4 5
@@ -13,7 +14,7 @@ class RxMap<K, V> implements RxInterface<Map<K, V>>, Map<K, V> { @@ -13,7 +14,7 @@ class RxMap<K, V> implements RxInterface<Map<K, V>>, Map<K, V> {
13 14
14 @override 15 @override
15 StreamController<Map<K, V>> subject = StreamController<Map<K, V>>.broadcast(); 16 StreamController<Map<K, V>> subject = StreamController<Map<K, V>>.broadcast();
16 - final Map<Stream<Map<K, V>>, StreamSubscription> _subscriptions = {}; 17 + final _subscriptions = HashMap<Stream<Map<K, V>>, StreamSubscription>();
17 18
18 Map<K, V> _value; 19 Map<K, V> _value;
19 20
@@ -62,8 +63,13 @@ class RxMap<K, V> implements RxInterface<Map<K, V>>, Map<K, V> { @@ -62,8 +63,13 @@ class RxMap<K, V> implements RxInterface<Map<K, V>>, Map<K, V> {
62 {Function onError, void Function() onDone, bool cancelOnError}) => 63 {Function onError, void Function() onDone, bool cancelOnError}) =>
63 stream.listen(onData, onError: onError, onDone: onDone); 64 stream.listen(onData, onError: onError, onDone: onDone);
64 65
65 - void bindStream(Stream<Map<K, V>> stream) =>  
66 - stream.listen((va) => value = va); 66 + /// Binds an existing [Stream<Map>] to this [RxMap].
  67 + /// You can bind multiple sources to update the value.
  68 + /// Closing the subscription will happen automatically when the observer
  69 + /// Widget ([GetX] or [Obx]) gets unmounted from the Widget tree.
  70 + void bindStream(Stream<Map<K, V>> stream) {
  71 + _subscriptions[stream] = stream.listen((va) => value = va);
  72 + }
67 73
68 void add(K key, V value) { 74 void add(K key, V value) {
69 _value[key] = value; 75 _value[key] = value;
1 import 'dart:async'; 1 import 'dart:async';
  2 +import 'dart:collection';
2 3
3 import 'package:flutter/foundation.dart'; 4 import 'package:flutter/foundation.dart';
4 5
@@ -27,7 +28,7 @@ class RxSet<E> implements Set<E>, RxInterface<Set<E>> { @@ -27,7 +28,7 @@ class RxSet<E> implements Set<E>, RxInterface<Set<E>> {
27 bool get isNotEmpty => value.isNotEmpty; 28 bool get isNotEmpty => value.isNotEmpty;
28 29
29 StreamController<Set<E>> subject = StreamController<Set<E>>.broadcast(); 30 StreamController<Set<E>> subject = StreamController<Set<E>>.broadcast();
30 - final Map<Stream<Set<E>>, StreamSubscription> _subscriptions = {}; 31 + final _subscriptions = HashMap<Stream<Set<E>>, StreamSubscription>();
31 32
32 /// Adds [item] only if [condition] resolves to true. 33 /// Adds [item] only if [condition] resolves to true.
33 void addIf(dynamic condition, E item) { 34 void addIf(dynamic condition, E item) {
@@ -153,7 +154,13 @@ class RxSet<E> implements Set<E>, RxInterface<Set<E>> { @@ -153,7 +154,13 @@ class RxSet<E> implements Set<E>, RxInterface<Set<E>> {
153 {Function onError, void Function() onDone, bool cancelOnError}) => 154 {Function onError, void Function() onDone, bool cancelOnError}) =>
154 stream.listen(onData, onError: onError, onDone: onDone); 155 stream.listen(onData, onError: onError, onDone: onDone);
155 156
156 - void bindStream(Stream<Set<E>> stream) => stream.listen((va) => value = va); 157 + /// Binds an existing [Stream<Set>] to this [RxSet].
  158 + /// You can bind multiple sources to update the value.
  159 + /// Closing the subscription will happen automatically when the observer
  160 + /// Widget ([GetX] or [Obx]) gets unmounted from the Widget tree.
  161 + void bindStream(Stream<Set<E>> stream) {
  162 + _subscriptions[stream] = stream.listen((va) => value = va);
  163 + }
157 164
158 @override 165 @override
159 E get first => value.first; 166 E get first => value.first;