Jonatas

fix concurrent modification

@@ -55,6 +55,8 @@ You cannot ${isClosed ? "close" : "add events to"} a closed stream.'''; @@ -55,6 +55,8 @@ You cannot ${isClosed ? "close" : "add events to"} a closed stream.''';
55 listenable.addSubscription(subs); 55 listenable.addSubscription(subs);
56 return subs; 56 return subs;
57 } 57 }
  58 +
  59 + Stream<T> get stream => GetStreamTransformation(listenable);
58 } 60 }
59 61
60 class LightListenable<T> { 62 class LightListenable<T> {
@@ -125,7 +127,10 @@ class LightSubscription<T> extends StreamSubscription<T> { @@ -125,7 +127,10 @@ class LightSubscription<T> extends StreamSubscription<T> {
125 bool cancelOnError = false; 127 bool cancelOnError = false;
126 128
127 @override 129 @override
128 - Future<void> cancel() async => listener.removeSubscription(this); 130 + Future<void> cancel() {
  131 + listener.removeSubscription(this);
  132 + return Future.value();
  133 + }
129 134
130 OnData<T> _data; 135 OnData<T> _data;
131 136
1 library rx_stream; 1 library rx_stream;
2 2
3 import 'dart:async'; 3 import 'dart:async';
  4 +import 'package:flutter/scheduler.dart';
  5 +
4 import '../rx_typedefs/rx_typedefs.dart'; 6 import '../rx_typedefs/rx_typedefs.dart';
5 7
6 part 'get_stream.dart'; 8 part 'get_stream.dart';
@@ -11,6 +11,8 @@ bool _conditional(dynamic condition) { @@ -11,6 +11,8 @@ bool _conditional(dynamic condition) {
11 return true; 11 return true;
12 } 12 }
13 13
  14 +typedef WorkerCallback<T> = Function(T callback);
  15 +
14 /// 16 ///
15 /// Called every time [listener] changes. As long as the [condition] 17 /// Called every time [listener] changes. As long as the [condition]
16 /// returns true. 18 /// returns true.
@@ -41,7 +43,7 @@ bool _conditional(dynamic condition) { @@ -41,7 +43,7 @@ bool _conditional(dynamic condition) {
41 /// void increment() => count + 1; 43 /// void increment() => count + 1;
42 /// } 44 /// }
43 /// ``` 45 /// ```
44 -Worker ever<T>(RxInterface<T> listener, Function(T) callback, 46 +Worker ever<T>(RxInterface<T> listener, WorkerCallback<T> callback,
45 {dynamic condition = true}) { 47 {dynamic condition = true}) {
46 StreamSubscription sub = listener.subject.listen((event) { 48 StreamSubscription sub = listener.subject.listen((event) {
47 if (_conditional(condition)) callback(event); 49 if (_conditional(condition)) callback(event);
@@ -53,7 +55,7 @@ Worker ever<T>(RxInterface<T> listener, Function(T) callback, @@ -53,7 +55,7 @@ Worker ever<T>(RxInterface<T> listener, Function(T) callback,
53 /// for the [callback] is common to all [listeners], 55 /// for the [callback] is common to all [listeners],
54 /// and the [callback] is executed to each one of them. The [Worker] is 56 /// and the [callback] is executed to each one of them. The [Worker] is
55 /// common to all, so [worker.dispose()] will cancel all streams. 57 /// common to all, so [worker.dispose()] will cancel all streams.
56 -Worker everAll(List<RxInterface> listeners, Function(dynamic) callback, 58 +Worker everAll(List<RxInterface> listeners, WorkerCallback callback,
57 {dynamic condition = true}) { 59 {dynamic condition = true}) {
58 final evers = <StreamSubscription>[]; 60 final evers = <StreamSubscription>[];
59 for (var i in listeners) { 61 for (var i in listeners) {
@@ -94,7 +96,7 @@ Worker everAll(List<RxInterface> listeners, Function(dynamic) callback, @@ -94,7 +96,7 @@ Worker everAll(List<RxInterface> listeners, Function(dynamic) callback,
94 /// void increment() => count + 1; 96 /// void increment() => count + 1;
95 /// } 97 /// }
96 ///``` 98 ///```
97 -Worker once<T>(RxInterface<T> listener, Function(T) callback, 99 +Worker once<T>(RxInterface<T> listener, WorkerCallback<T> callback,
98 {dynamic condition}) { 100 {dynamic condition}) {
99 Worker ref; 101 Worker ref;
100 StreamSubscription sub; 102 StreamSubscription sub;
@@ -102,8 +104,10 @@ Worker once<T>(RxInterface<T> listener, Function(T) callback, @@ -102,8 +104,10 @@ Worker once<T>(RxInterface<T> listener, Function(T) callback,
102 if (!_conditional(condition)) return; 104 if (!_conditional(condition)) return;
103 ref._disposed = true; 105 ref._disposed = true;
104 ref._log('called'); 106 ref._log('called');
105 - sub?.cancel();  
106 callback(event); 107 callback(event);
  108 + Timer.run(() {
  109 + sub?.cancel();
  110 + });
107 }); 111 });
108 ref = Worker(sub.cancel, '[once]'); 112 ref = Worker(sub.cancel, '[once]');
109 return ref; 113 return ref;
@@ -126,7 +130,7 @@ Worker once<T>(RxInterface<T> listener, Function(T) callback, @@ -126,7 +130,7 @@ Worker once<T>(RxInterface<T> listener, Function(T) callback,
126 /// condition: () => count < 20, 130 /// condition: () => count < 20,
127 /// ); 131 /// );
128 /// ``` 132 /// ```
129 -Worker interval<T>(RxInterface<T> listener, Function(T) callback, 133 +Worker interval<T>(RxInterface<T> listener, WorkerCallback<T> callback,
130 {Duration time = const Duration(seconds: 1), dynamic condition = true}) { 134 {Duration time = const Duration(seconds: 1), dynamic condition = true}) {
131 var debounceActive = false; 135 var debounceActive = false;
132 time ??= const Duration(seconds: 1); 136 time ??= const Duration(seconds: 1);
@@ -159,7 +163,7 @@ Worker interval<T>(RxInterface<T> listener, Function(T) callback, @@ -159,7 +163,7 @@ Worker interval<T>(RxInterface<T> listener, Function(T) callback,
159 /// ); 163 /// );
160 /// } 164 /// }
161 /// ``` 165 /// ```
162 -Worker debounce<T>(RxInterface<T> listener, Function(T) callback, 166 +Worker debounce<T>(RxInterface<T> listener, WorkerCallback<T> callback,
163 {Duration time}) { 167 {Duration time}) {
164 final _debouncer = 168 final _debouncer =
165 Debouncer(delay: time ?? const Duration(milliseconds: 800)); 169 Debouncer(delay: time ?? const Duration(milliseconds: 800));