roi peker

Fixes subscription for Rx::bindStream

- add docs to bindStream for RxList,RxSet,RxMap,RxImpl.
- fixes the error of binding an ongoing Stream, without closing the Subscription when the observable gets closed.
... ... @@ -179,13 +179,17 @@ class _RxImpl<T> implements RxInterface<T> {
}
Stream<T> get stream => subject.stream;
StreamSubscription<T> listen(void Function(T) onData,
{Function onError, void Function() onDone, bool cancelOnError}) =>
stream.listen(onData, onError: onError, onDone: onDone);
/// Binds an existing stream to this Rx to keep the values in sync.
void bindStream(Stream<T> stream) => stream.listen((va) => value = va);
/// Binds an existing [Stream<T>] to this Rx<T> to keep the values in sync.
/// You can bind multiple sources to update the value.
/// Closing the subscription will happen automatically when the observer
/// Widget ([GetX] or [Obx]) gets unmounted from the Widget tree.
void bindStream(Stream<T> stream) {
_subscriptions[stream] = stream.listen((va) => value = va);
}
Stream<R> map<R>(R mapper(T data)) => stream.map(mapper);
}
... ...
import 'dart:async';
import 'dart:collection';
import 'dart:math';
import 'package:flutter/foundation.dart';
... ... @@ -31,7 +32,7 @@ class RxList<E> implements List<E>, RxInterface<List<E>> {
@override
StreamController<List<E>> subject = StreamController.broadcast();
final Map<Stream<List<E>>, StreamSubscription> _subscriptions = {};
final _subscriptions = HashMap<Stream<List<E>>, StreamSubscription>();
void operator []=(int index, E val) {
_list[index] = val;
... ... @@ -209,7 +210,13 @@ class RxList<E> implements List<E>, RxInterface<List<E>> {
}) =>
stream.listen(onData, onError: onError, onDone: onDone);
void bindStream(Stream<List<E>> stream) => stream.listen((va) => value = va);
/// Binds an existing [Stream<List>] to this [RxList].
/// You can bind multiple sources to update the value.
/// Closing the subscription will happen automatically when the observer
/// Widget ([GetX] or [Obx]) gets unmounted from the Widget tree.
void bindStream(Stream<List<E>> stream) {
_subscriptions[stream] = stream.listen((va) => value = va);
}
@override
E get first => value.first;
... ...
import 'dart:async';
import 'dart:collection';
import 'package:flutter/foundation.dart';
... ... @@ -13,7 +14,7 @@ class RxMap<K, V> implements RxInterface<Map<K, V>>, Map<K, V> {
@override
StreamController<Map<K, V>> subject = StreamController<Map<K, V>>.broadcast();
final Map<Stream<Map<K, V>>, StreamSubscription> _subscriptions = {};
final _subscriptions = HashMap<Stream<Map<K, V>>, StreamSubscription>();
Map<K, V> _value;
... ... @@ -62,8 +63,13 @@ class RxMap<K, V> implements RxInterface<Map<K, V>>, Map<K, V> {
{Function onError, void Function() onDone, bool cancelOnError}) =>
stream.listen(onData, onError: onError, onDone: onDone);
void bindStream(Stream<Map<K, V>> stream) =>
stream.listen((va) => value = va);
/// Binds an existing [Stream<Map>] to this [RxMap].
/// You can bind multiple sources to update the value.
/// Closing the subscription will happen automatically when the observer
/// Widget ([GetX] or [Obx]) gets unmounted from the Widget tree.
void bindStream(Stream<Map<K, V>> stream) {
_subscriptions[stream] = stream.listen((va) => value = va);
}
void add(K key, V value) {
_value[key] = value;
... ...
import 'dart:async';
import 'dart:collection';
import 'package:flutter/foundation.dart';
... ... @@ -27,7 +28,7 @@ class RxSet<E> implements Set<E>, RxInterface<Set<E>> {
bool get isNotEmpty => value.isNotEmpty;
StreamController<Set<E>> subject = StreamController<Set<E>>.broadcast();
final Map<Stream<Set<E>>, StreamSubscription> _subscriptions = {};
final _subscriptions = HashMap<Stream<Set<E>>, StreamSubscription>();
/// Adds [item] only if [condition] resolves to true.
void addIf(dynamic condition, E item) {
... ... @@ -153,7 +154,13 @@ class RxSet<E> implements Set<E>, RxInterface<Set<E>> {
{Function onError, void Function() onDone, bool cancelOnError}) =>
stream.listen(onData, onError: onError, onDone: onDone);
void bindStream(Stream<Set<E>> stream) => stream.listen((va) => value = va);
/// Binds an existing [Stream<Set>] to this [RxSet].
/// You can bind multiple sources to update the value.
/// Closing the subscription will happen automatically when the observer
/// Widget ([GetX] or [Obx]) gets unmounted from the Widget tree.
void bindStream(Stream<Set<E>> stream) {
_subscriptions[stream] = stream.listen((va) => value = va);
}
@override
E get first => value.first;
... ...