Jonatas

improve getstream security

@@ -62,12 +62,24 @@ You cannot ${isClosed ? "close" : "add events to"} a closed stream.'''; @@ -62,12 +62,24 @@ You cannot ${isClosed ? "close" : "add events to"} a closed stream.''';
62 class LightListenable<T> { 62 class LightListenable<T> {
63 List<LightSubscription<T>> _onData = <LightSubscription<T>>[]; 63 List<LightSubscription<T>> _onData = <LightSubscription<T>>[];
64 64
65 - void removeSubscription(LightSubscription<T> subs) {  
66 - _onData.remove(subs); 65 + bool _isBusy = false;
  66 +
  67 + FutureOr<bool> removeSubscription(LightSubscription<T> subs) async {
  68 + if (!_isBusy) {
  69 + return _onData.remove(subs);
  70 + } else {
  71 + await Future.delayed(Duration.zero);
  72 + return _onData.remove(subs);
  73 + }
67 } 74 }
68 75
69 - void addSubscription(LightSubscription<T> subs) {  
70 - _onData.add(subs); 76 + FutureOr<void> addSubscription(LightSubscription<T> subs) async {
  77 + if (!_isBusy) {
  78 + return _onData.add(subs);
  79 + } else {
  80 + await Future.delayed(Duration.zero);
  81 + return _onData.add(subs);
  82 + }
71 } 83 }
72 84
73 int get length => _onData?.length; 85 int get length => _onData?.length;
@@ -75,23 +87,20 @@ class LightListenable<T> { @@ -75,23 +87,20 @@ class LightListenable<T> {
75 bool get hasListeners => _onData.isNotEmpty; 87 bool get hasListeners => _onData.isNotEmpty;
76 88
77 void notifyData(T data) { 89 void notifyData(T data) {
78 - _checkIfDisposed(); 90 + assert(!isDisposed, 'You cannot add data to a closed stream.');
  91 + _isBusy = true;
79 for (final item in _onData) { 92 for (final item in _onData) {
80 if (item.isPaused) { 93 if (item.isPaused) {
81 break; 94 break;
82 } 95 }
83 item._data?.call(data); 96 item._data?.call(data);
84 } 97 }
85 - }  
86 -  
87 - void _checkIfDisposed() {  
88 - if (isDisposed) {  
89 - throw '[LightStream] Error: You cannot add events to a closed stream.';  
90 - } 98 + _isBusy = false;
91 } 99 }
92 100
93 void notifyError(Object error, [StackTrace stackTrace]) { 101 void notifyError(Object error, [StackTrace stackTrace]) {
94 - _checkIfDisposed(); 102 + assert(!isDisposed, 'You cannot add errors to a closed stream.');
  103 + _isBusy = true;
95 for (final item in _onData) { 104 for (final item in _onData) {
96 if (item.isPaused) { 105 if (item.isPaused) {
97 break; 106 break;
@@ -102,19 +111,25 @@ class LightListenable<T> { @@ -102,19 +111,25 @@ class LightListenable<T> {
102 item._onDone?.call(); 111 item._onDone?.call();
103 } 112 }
104 } 113 }
  114 + _isBusy = false;
105 } 115 }
106 116
107 void notifyDone() { 117 void notifyDone() {
108 - _checkIfDisposed(); 118 + assert(!isDisposed, 'You cannot close a closed stream.');
  119 + _isBusy = true;
109 for (final item in _onData) { 120 for (final item in _onData) {
110 if (item.isPaused) { 121 if (item.isPaused) {
111 break; 122 break;
112 } 123 }
113 item._onDone?.call(); 124 item._onDone?.call();
114 } 125 }
  126 + _isBusy = false;
115 } 127 }
116 128
117 - void dispose() => _onData = null; 129 + void dispose() {
  130 + _onData = null;
  131 + _isBusy = null;
  132 + }
118 133
119 bool get isDisposed => _onData == null; 134 bool get isDisposed => _onData == null;
120 } 135 }
@@ -104,10 +104,8 @@ Worker once<T>(RxInterface<T> listener, WorkerCallback<T> callback, @@ -104,10 +104,8 @@ Worker once<T>(RxInterface<T> listener, WorkerCallback<T> callback,
104 if (!_conditional(condition)) return; 104 if (!_conditional(condition)) return;
105 ref._disposed = true; 105 ref._disposed = true;
106 ref._log('called'); 106 ref._log('called');
  107 + sub?.cancel();
107 callback(event); 108 callback(event);
108 - Timer.run(() {  
109 - sub?.cancel();  
110 - });  
111 }); 109 });
112 ref = Worker(sub.cancel, '[once]'); 110 ref = Worker(sub.cancel, '[once]');
113 return ref; 111 return ref;