get_stream.dart
5.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
part of rx_stream;
/// [GetStream] is the lightest and most performant way of working
/// with events in Dart. Its syntax is like StreamController, but it works
/// with simple callbacks. In this way, every event calls only one function.
/// There is no buffering, which keeps memory consumption very low.
///
/// [add] sends an object to the stream and [addError] sends an error.
/// [listen] returns a very light StreamSubscription implementation.
/// The last value passed to [add] can be read through [value].
class GetStream<T> {
  /// Called every time [listen] registers a new subscription.
  void Function() onListen;

  /// Handed to every subscription created by [listen], which calls it when
  /// it is paused.
  void Function() onPause;

  /// Handed to every subscription created by [listen], which calls it when
  /// it is resumed.
  void Function() onResume;

  /// Handed to every subscription created by [listen], which calls it when
  /// it is cancelled.
  FutureOr<void> Function() onCancel;

  GetStream({this.onListen, this.onPause, this.onResume, this.onCancel});

  /// Registered subscriptions. Set to `null` by [close], which is how
  /// [isClosed] detects a closed stream.
  List<LightSubscription<T>> _onData = <LightSubscription<T>>[];

  /// Whether one of the `_notify*` loops is currently iterating [_onData].
  bool _isBusy = false;

  /// Removes [subs] from the listener list.
  ///
  /// While a notification loop is running, the removal is postponed with a
  /// zero-duration delay so the list is not modified during iteration.
  /// Completes with the result of the list removal, or `false` when the
  /// stream has already been closed.
  FutureOr<bool> removeSubscription(LightSubscription<T> subs) async {
    if (_isBusy) {
      await Future.delayed(Duration.zero);
    }
    // `_onData` is null once the stream is closed (possibly during the delay).
    return _onData?.remove(subs) ?? false;
  }

  /// Adds [subs] to the listener list.
  ///
  /// While a notification loop is running, the insertion is postponed with a
  /// zero-duration delay so the list is not modified during iteration.
  /// Nothing is added when the stream has already been closed.
  FutureOr<void> addSubscription(LightSubscription<T> subs) async {
    if (_isBusy) {
      await Future.delayed(Duration.zero);
    }
    _onData?.add(subs);
  }

  /// The number of registered subscriptions, or `null` after [close].
  int get length => _onData?.length;

  /// Whether at least one subscription is registered; `false` after [close].
  bool get hasListeners => _onData?.isNotEmpty ?? false;

  /// Calls an error handler with the arity it was declared with.
  ///
  /// Handlers may accept either `(error)` or `(error, stackTrace)`. Anything
  /// that matches neither shape is invoked dynamically with both arguments,
  /// which is what this class always did before.
  static void _callErrorHandler(
      dynamic handler, Object error, StackTrace stackTrace) {
    if (handler == null) return;
    if (handler is void Function(Object, StackTrace)) {
      handler(error, stackTrace);
    } else if (handler is void Function(Object)) {
      handler(error);
    } else {
      handler(error, stackTrace);
    }
  }

  /// Delivers [data] to every subscription that is not paused.
  void _notifyData(T data) {
    _isBusy = true;
    try {
      for (final item in _onData) {
        if (!item.isPaused) {
          item._data?.call(data);
        }
      }
    } finally {
      // Reset even when a listener throws, so later add/remove calls are not
      // deferred forever.
      _isBusy = false;
    }
  }

  /// Delivers [error] to every subscription that is not paused.
  ///
  /// Subscriptions with `cancelOnError` set are paused, receive their done
  /// callback and are removed from the listener list afterwards.
  void _notifyError(Object error, [StackTrace stackTrace]) {
    assert(!isClosed, 'You cannot add errors to a closed stream.');
    _isBusy = true;
    final itemsToRemove = <LightSubscription<T>>[];
    try {
      for (final item in _onData) {
        if (item.isPaused) continue;
        _callErrorHandler(item._onError, error, stackTrace);
        // `== true` tolerates a `cancelOnError` that was set to null.
        if (item.cancelOnError == true) {
          // The subscription is dropped directly instead of through
          // `cancel()`, so its `onCancel` callback is not invoked here.
          itemsToRemove.add(item);
          item.pause();
          item._onDone?.call();
        }
      }
    } finally {
      // Removal is done after the loop so the list is never mutated while it
      // is being iterated.
      for (final item in itemsToRemove) {
        _onData?.remove(item);
      }
      _isBusy = false;
    }
  }

  /// Calls the done callback of every subscription that is not paused.
  void _notifyDone() {
    assert(!isClosed, 'You cannot close a closed stream.');
    _isBusy = true;
    try {
      for (final item in _onData) {
        if (!item.isPaused) {
          item._onDone?.call();
        }
      }
    } finally {
      _isBusy = false;
    }
  }

  T _value;

  /// The last event passed to [add]; `null` before the first event and
  /// after [close].
  T get value => _value;

  /// Stores [event] as [value] and delivers it to the listeners.
  void add(T event) {
    assert(!isClosed, 'You cannot add event to closed Stream');
    _value = event;
    _notifyData(event);
  }

  /// Whether [close] has been called.
  bool get isClosed => _onData == null;

  /// Delivers [error] and the optional [stackTrace] to the listeners.
  void addError(Object error, [StackTrace stackTrace]) {
    assert(!isClosed, 'You cannot add error to closed Stream');
    _notifyError(error, stackTrace);
  }

  /// Notifies the listeners that the stream is done and releases the
  /// listener list and the last value.
  void close() {
    assert(!isClosed, 'You cannot close a closed Stream');
    _notifyDone();
    _onData = null;
    // Kept as a real bool: a null flag would make every later
    // add/removeSubscription call fail on its `_isBusy` check.
    _isBusy = false;
    _value = null;
  }

  /// Registers callbacks for data, error and done events and returns the
  /// subscription that controls them.
  ///
  /// [cancelOnError] defaults to `false` when omitted. [onListen] is called
  /// after the subscription has been handed to [addSubscription].
  LightSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    final subs = LightSubscription<T>(
      removeSubscription,
      onPause: onPause,
      onResume: onResume,
      onCancel: onCancel,
    )
      ..onData(onData)
      ..onError(onError)
      ..onDone(onDone)
      // An omitted argument arrives as null and must not overwrite the
      // subscription's `false` default with null.
      ..cancelOnError = cancelOnError ?? false;
    addSubscription(subs);
    onListen?.call();
    return subs;
  }

  /// A [Stream] view whose listeners are registered on this [GetStream].
  ///
  /// A new view object is created on every access.
  Stream<T> get stream =>
      GetStreamTransformation(addSubscription, removeSubscription);
}
/// A minimal [StreamSubscription] that stores its callbacks in plain fields
/// and keeps a simple paused flag.
///
/// The subscription detaches itself through the [RemoveSubscription]
/// callback it is constructed with.
class LightSubscription<T> extends StreamSubscription<T> {
  /// Callback used by [cancel] to detach this subscription from its source.
  final RemoveSubscription<T> _removeSubscription;

  LightSubscription(this._removeSubscription,
      {this.onPause, this.onResume, this.onCancel});

  /// Called by [pause] after the subscription has been marked as paused.
  final void Function() onPause;

  /// Called by [resume] after the paused mark has been cleared.
  final void Function() onResume;

  /// Called by [cancel].
  final FutureOr<void> Function() onCancel;

  /// Whether the subscription should be dropped after its first error.
  bool cancelOnError = false;

  /// Detaches this subscription and calls [onCancel].
  ///
  /// Both calls are started synchronously, in that order. The returned
  /// future completes once both have finished, so a failure in either one
  /// reaches the caller through that future.
  @override
  Future<void> cancel() async {
    final removal = _removeSubscription(this);
    final cancellation = onCancel?.call();
    await removal;
    await cancellation;
  }

  OnData<T> _data;
  dynamic _onError;
  Callback _onDone;
  bool _isPaused = false;

  /// Replaces the data callback.
  @override
  void onData(OnData<T> handleData) => _data = handleData;

  /// Replaces the error callback.
  @override
  void onError(Function handleError) => _onError = handleError;

  /// Replaces the done callback.
  @override
  void onDone(Callback handleDone) => _onDone = handleDone;

  /// Marks the subscription as paused and calls [onPause].
  ///
  /// When [resumeSignal] is given, [resume] is called as soon as that future
  /// completes, whether with a value or with an error. Pauses are not
  /// counted: a single [resume] clears the paused state.
  @override
  void pause([Future<void> resumeSignal]) {
    _isPaused = true;
    onPause?.call();
    resumeSignal?.whenComplete(resume);
  }

  /// Clears the paused mark and calls [onResume].
  @override
  void resume() {
    _isPaused = false;
    onResume?.call();
  }

  @override
  bool get isPaused => _isPaused;

  /// Returns a future that is already completed with [futureValue].
  ///
  /// NOTE(review): unlike the general [StreamSubscription.asFuture]
  /// contract, this does not wait for the done event and does not replace
  /// the done/error callbacks.
  @override
  Future<E> asFuture<E>([E futureValue]) => Future.value(futureValue);
}
/// A [Stream] facade that registers its listeners through the add/remove
/// callbacks it is constructed with.
class GetStreamTransformation<T> extends Stream<T> {
  /// Callback that receives every subscription created by [listen].
  final AddSubscription<T> _addSubscription;

  /// Callback handed to each subscription so it can detach itself on cancel.
  final RemoveSubscription<T> _removeSubscription;

  GetStreamTransformation(this._addSubscription, this._removeSubscription);

  /// Creates a [LightSubscription] with the given callbacks, passes it to
  /// the add callback and returns it.
  ///
  /// [cancelOnError] is forwarded to the subscription and defaults to
  /// `false` when omitted.
  @override
  LightSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    final subs = LightSubscription<T>(_removeSubscription)
      ..onData(onData)
      ..onError(onError)
      ..onDone(onDone)
      // Previously the argument was dropped, so the flag always stayed false.
      ..cancelOnError = cancelOnError ?? false;
    _addSubscription(subs);
    return subs;
  }
}
/// Signature of the callback a [LightSubscription] calls from `cancel` to
/// detach itself from its source.
///
/// `GetStream.removeSubscription` has this shape; there the result is the
/// outcome of removing the subscription from the listener list.
typedef RemoveSubscription<T> = FutureOr<bool> Function(
LightSubscription<T> subs);
/// Signature of the callback `GetStreamTransformation.listen` uses to
/// register a newly created [LightSubscription].
///
/// `GetStream.addSubscription` has this shape.
typedef AddSubscription<T> = FutureOr<void> Function(LightSubscription<T> subs);