Jonny Borges

change stream api to notifier

... ... @@ -28,7 +28,7 @@ class Home extends ObxStatelessWidget {
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
SimpleBuilder(builder: (context) {
Observer(builder: (context) {
print('builder');
return Text(
'${controller.count.value}',
... ...
part of rx_stream;
/// [GetStream] is the lightest and most performative way of working
/// with events at Dart. You sintaxe is like StreamController, but it works
/// with simple callbacks. In this way, every event calls only one function.
/// There is no buffering, to very low memory consumption.
/// event [add] will add a object to stream. [addError] will add a error
/// to stream. [listen] is a very light StreamSubscription interface.
/// Is possible take the last value with [value] property.
class GetStream<T> {
void Function()? onListen;
void Function()? onPause;
void Function()? onResume;
FutureOr<void> Function()? onCancel;
GetStream({this.onListen, this.onPause, this.onResume, this.onCancel});
factory GetStream.fromValue(T value,
{Function()? onListen,
Function()? onPause,
Function()? onResume,
FutureOr<void> Function()? onCancel}) {
final valuedStream = GetStream<T>(
onListen: onListen,
onPause: onPause,
onResume: onResume,
onCancel: onCancel)
.._value = value;
return valuedStream;
}
List<LightSubscription<T>>? _onData = <LightSubscription<T>>[];
bool? _isBusy = false;
FutureOr<bool?> removeSubscription(LightSubscription<T> subs) async {
if (!_isBusy!) {
return _onData!.remove(subs);
} else {
await Future.delayed(Duration.zero);
return _onData?.remove(subs);
}
}
FutureOr<void> addSubscription(LightSubscription<T> subs) async {
if (!_isBusy!) {
return _onData!.add(subs);
} else {
await Future.delayed(Duration.zero);
return _onData!.add(subs);
}
}
int? get length => _onData?.length;
bool get hasListeners => _onData!.isNotEmpty;
void _notifyData(T data) {
_isBusy = true;
for (final item in _onData!) {
if (!item.isPaused) {
item._data?.call(data);
}
}
_isBusy = false;
}
void _notifyError(Object error, [StackTrace? stackTrace]) {
assert(!isClosed, 'You cannot add errors to a closed stream.');
_isBusy = true;
var itemsToRemove = <LightSubscription<T>>[];
for (final item in _onData!) {
if (!item.isPaused) {
if (stackTrace != null) {
item._onError?.call(error, stackTrace);
} else {
item._onError?.call(error);
}
if (item.cancelOnError ?? false) {
//item.cancel?.call();
itemsToRemove.add(item);
item.pause();
item._onDone?.call();
}
}
}
for (final item in itemsToRemove) {
_onData!.remove(item);
}
_isBusy = false;
}
void _notifyDone() {
assert(!isClosed, 'You cannot close a closed stream.');
_isBusy = true;
for (final item in _onData!) {
if (!item.isPaused) {
item._onDone?.call();
}
}
_isBusy = false;
}
late T _value;
T get value {
// RxInterface.proxy?.addListener(this);
return _value;
}
void add(T event) {
assert(!isClosed, 'You cannot add event to closed Stream');
_value = event;
_notifyData(event);
}
bool get isClosed => _onData == null;
void addError(Object error, [StackTrace? stackTrace]) {
assert(!isClosed, 'You cannot add error to closed Stream');
_notifyError(error, stackTrace);
}
void close() {
assert(!isClosed, 'You cannot close a closed Stream');
_notifyDone();
_onData = null;
_isBusy = null;
// _value = null;
}
LightSubscription<T> listen(void Function(T event) onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
final subs = LightSubscription<T>(
removeSubscription,
onPause: onPause,
onResume: onResume,
onCancel: onCancel,
)
..onData(onData)
..onError(onError)
..onDone(onDone)
..cancelOnError = cancelOnError;
addSubscription(subs);
onListen?.call();
return subs;
}
Stream<T> get stream =>
GetStreamTransformation(addSubscription, removeSubscription);
}
class LightSubscription<T> extends StreamSubscription<T> {
final RemoveSubscription<T> _removeSubscription;
LightSubscription(this._removeSubscription,
{this.onPause, this.onResume, this.onCancel});
final void Function()? onPause;
final void Function()? onResume;
final FutureOr<void> Function()? onCancel;
bool? cancelOnError = false;
@override
Future<void> cancel() {
_removeSubscription(this);
onCancel?.call();
return Future.value();
}
OnData<T>? _data;
Function? _onError;
Callback? _onDone;
bool _isPaused = false;
@override
void onData(OnData<T>? handleData) => _data = handleData;
@override
void onError(Function? handleError) => _onError = handleError;
@override
void onDone(Callback? handleDone) => _onDone = handleDone;
@override
void pause([Future<void>? resumeSignal]) {
_isPaused = true;
onPause?.call();
}
@override
void resume() {
_isPaused = false;
onResume?.call();
}
@override
bool get isPaused => _isPaused;
@override
Future<E> asFuture<E>([E? futureValue]) => Future.value(futureValue);
}
class GetStreamTransformation<T> extends Stream<T> {
final AddSubscription<T> _addSubscription;
final RemoveSubscription<T> _removeSubscription;
GetStreamTransformation(this._addSubscription, this._removeSubscription);
@override
LightSubscription<T> listen(void Function(T event)? onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
final subs = LightSubscription<T>(_removeSubscription)
..onData(onData)
..onError(onError)
..onDone(onDone);
_addSubscription(subs);
return subs;
}
}
typedef RemoveSubscription<T> = FutureOr<bool?> Function(
LightSubscription<T> subs);
typedef AddSubscription<T> = FutureOr<void> Function(LightSubscription<T> subs);
// part of rx_stream;
// /// [GetStream] is the lightest and most performative way of working
// /// with events at Dart. You sintaxe is like StreamController, but it works
// /// with simple callbacks. In this way, every event calls only one function.
// /// There is no buffering, to very low memory consumption.
// /// event [add] will add a object to stream. [addError] will add a error
// /// to stream. [listen] is a very light StreamSubscription interface.
// /// Is possible take the last value with [value] property.
// class GetStream<T> {
// void Function()? onListen;
// void Function()? onPause;
// void Function()? onResume;
// FutureOr<void> Function()? onCancel;
// GetStream({this.onListen, this.onPause, this.onResume, this.onCancel});
// factory GetStream.fromValue(T value,
// {Function()? onListen,
// Function()? onPause,
// Function()? onResume,
// FutureOr<void> Function()? onCancel}) {
// final valuedStream = GetStream<T>(
// onListen: onListen,
// onPause: onPause,
// onResume: onResume,
// onCancel: onCancel)
// .._value = value;
// return valuedStream;
// }
// List<LightSubscription<T>>? _onData = <LightSubscription<T>>[];
// bool? _isBusy = false;
// FutureOr<bool?> removeSubscription(LightSubscription<T> subs) async {
// if (!_isBusy!) {
// return _onData!.remove(subs);
// } else {
// await Future.delayed(Duration.zero);
// return _onData?.remove(subs);
// }
// }
// FutureOr<void> addSubscription(LightSubscription<T> subs) async {
// if (!_isBusy!) {
// return _onData!.add(subs);
// } else {
// await Future.delayed(Duration.zero);
// return _onData!.add(subs);
// }
// }
// int? get length => _onData?.length;
// bool get hasListeners => _onData!.isNotEmpty;
// void _notifyData(T data) {
// _isBusy = true;
// for (final item in _onData!) {
// if (!item.isPaused) {
// item._data?.call(data);
// }
// }
// _isBusy = false;
// }
// void _notifyError(Object error, [StackTrace? stackTrace]) {
// assert(!isClosed, 'You cannot add errors to a closed stream.');
// _isBusy = true;
// var itemsToRemove = <LightSubscription<T>>[];
// for (final item in _onData!) {
// if (!item.isPaused) {
// if (stackTrace != null) {
// item._onError?.call(error, stackTrace);
// } else {
// item._onError?.call(error);
// }
// if (item.cancelOnError ?? false) {
// //item.cancel?.call();
// itemsToRemove.add(item);
// item.pause();
// item._onDone?.call();
// }
// }
// }
// for (final item in itemsToRemove) {
// _onData!.remove(item);
// }
// _isBusy = false;
// }
// void _notifyDone() {
// assert(!isClosed, 'You cannot close a closed stream.');
// _isBusy = true;
// for (final item in _onData!) {
// if (!item.isPaused) {
// item._onDone?.call();
// }
// }
// _isBusy = false;
// }
// late T _value;
// T get value {
// // RxInterface.proxy?.addListener(this);
// return _value;
// }
// void add(T event) {
// assert(!isClosed, 'You cannot add event to closed Stream');
// _value = event;
// _notifyData(event);
// }
// bool get isClosed => _onData == null;
// void addError(Object error, [StackTrace? stackTrace]) {
// assert(!isClosed, 'You cannot add error to closed Stream');
// _notifyError(error, stackTrace);
// }
// void close() {
// assert(!isClosed, 'You cannot close a closed Stream');
// _notifyDone();
// _onData = null;
// _isBusy = null;
// // _value = null;
// }
// LightSubscription<T> listen(void Function(T event) onData,
// {Function? onError, void Function()? onDone, bool? cancelOnError}) {
// final subs = LightSubscription<T>(
// removeSubscription,
// onPause: onPause,
// onResume: onResume,
// onCancel: onCancel,
// )
// ..onData(onData)
// ..onError(onError)
// ..onDone(onDone)
// ..cancelOnError = cancelOnError;
// addSubscription(subs);
// onListen?.call();
// return subs;
// }
// Stream<T> get stream =>
// GetStreamTransformation(addSubscription, removeSubscription);
// }
// class LightSubscription<T> extends StreamSubscription<T> {
// final RemoveSubscription<T> _removeSubscription;
// LightSubscription(this._removeSubscription,
// {this.onPause, this.onResume, this.onCancel});
// final void Function()? onPause;
// final void Function()? onResume;
// final FutureOr<void> Function()? onCancel;
// bool? cancelOnError = false;
// @override
// Future<void> cancel() {
// _removeSubscription(this);
// onCancel?.call();
// return Future.value();
// }
// OnData<T>? _data;
// Function? _onError;
// Callback? _onDone;
// bool _isPaused = false;
// @override
// void onData(OnData<T>? handleData) => _data = handleData;
// @override
// void onError(Function? handleError) => _onError = handleError;
// @override
// void onDone(Callback? handleDone) => _onDone = handleDone;
// @override
// void pause([Future<void>? resumeSignal]) {
// _isPaused = true;
// onPause?.call();
// }
// @override
// void resume() {
// _isPaused = false;
// onResume?.call();
// }
// @override
// bool get isPaused => _isPaused;
// @override
// Future<E> asFuture<E>([E? futureValue]) => Future.value(futureValue);
// }
// class GetStreamTransformation<T> extends Stream<T> {
// final AddSubscription<T> _addSubscription;
// final RemoveSubscription<T> _removeSubscription;
// GetStreamTransformation(this._addSubscription, this._removeSubscription);
// @override
// LightSubscription<T> listen(void Function(T event)? onData,
// {Function? onError, void Function()? onDone, bool? cancelOnError}) {
// final subs = LightSubscription<T>(_removeSubscription)
// ..onData(onData)
// ..onError(onError)
// ..onDone(onDone);
// _addSubscription(subs);
// return subs;
// }
// }
// typedef RemoveSubscription<T> = FutureOr<bool?> Function(
// LightSubscription<T> subs);
// typedef AddSubscription<T> =
//FutureOr<void> Function(LightSubscription<T> subs);
... ...
... ... @@ -3,7 +3,6 @@ library rx_stream;
import 'dart:async';
import '../rx_typedefs/rx_typedefs.dart';
import '../rx_types/rx_types.dart';
part 'get_stream.dart';
//part 'get_stream.dart';
part 'mini_stream.dart';
... ...
... ... @@ -4,7 +4,7 @@ part of rx_types;
/// reactivity
/// of those `Widgets` and Rx values.
mixin RxObjectMixin<T> on NotifyManager<T> {
mixin RxObjectMixin<T> on GetListenable<T> {
//late T _value;
/// Makes a direct update of [value] adding it to the Stream
... ... @@ -25,9 +25,9 @@ mixin RxObjectMixin<T> on NotifyManager<T> {
/// person.refresh();
/// print( person );
/// ```
void refresh() {
subject.add(value);
}
// void refresh() {
// subject.add(value);
// }
/// updates the value to `null` and adds it to the Stream.
/// Even with null-safety coming, is still an important feature to support, as
... ... @@ -59,6 +59,7 @@ mixin RxObjectMixin<T> on NotifyManager<T> {
/// onChanged: myText,
/// ),
///```
@override
T call([T? v]) {
if (v != null) {
value = v;
... ... @@ -95,25 +96,18 @@ mixin RxObjectMixin<T> on NotifyManager<T> {
/// Updates the [value] and adds it to the stream, updating the observer
/// Widget, only if it's different from the previous value.
@override
set value(T val) {
if (subject.isClosed) return;
if (isDisposed) return;
sentToStream = false;
if (value == val && !firstRebuild) return;
firstRebuild = false;
// _value = val;
sentToStream = true;
subject.add(val);
//TODO: Check this
super.value = val;
}
/// Returns the current [value]
T get value {
return subject.value;
//RxInterface.proxy?.addListener(subject);
// return _value;
}
Stream<T> get stream => subject.stream;
/// Returns a [StreamSubscription] similar to [listen], but with the
/// added benefit that it primes the stream with the current [value], rather
/// than waiting for the next [value]. This should not be called in [onInit]
... ... @@ -127,6 +121,7 @@ mixin RxObjectMixin<T> on NotifyManager<T> {
cancelOnError: cancelOnError,
);
//TODO: Change to refresh????
subject.add(value);
return subscription;
... ... @@ -137,64 +132,64 @@ mixin RxObjectMixin<T> on NotifyManager<T> {
/// Closing the subscription will happen automatically when the observer
/// Widget (`GetX` or `Obx`) gets unmounted from the Widget tree.
void bindStream(Stream<T> stream) {
final listSubscriptions =
_subscriptions[subject] ??= <StreamSubscription>[];
listSubscriptions.add(stream.listen((va) => value = va));
}
}
class RxNotifier<T> = RxInterface<T> with NotifyManager<T>;
mixin NotifyManager<T> {
GetStream<T> subject = GetStream<T>();
final _subscriptions = <GetStream, List<StreamSubscription>>{};
// final listSubscriptions =
// _subscriptions[subject] ??= <StreamSubscription>[];
bool get canUpdate => _subscriptions.isNotEmpty;
/// This is an internal method.
/// Subscribe to changes on the inner stream.
void addListener(GetStream<T> rxGetx) {
if (!_subscriptions.containsKey(rxGetx)) {
final subs = rxGetx.listen((data) {
if (!subject.isClosed) subject.add(data);
});
final listSubscriptions =
_subscriptions[rxGetx] ??= <StreamSubscription>[];
listSubscriptions.add(subs);
}
}
StreamSubscription<T> listen(
void Function(T) onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
subject.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError ?? false,
);
/// Closes the subscriptions for this Rx, releasing the resources.
void close() {
_subscriptions.forEach((getStream, _subscriptions) {
for (final subscription in _subscriptions) {
subscription.cancel();
}
});
_subscriptions.clear();
subject.close();
final sub = stream.listen((va) => value = va);
reportAdd(sub.cancel);
}
}
//class RxNotifier<T> = RxInterface<T> with NotifyManager<T>;
// mixin NotifyManager<T> {
// GetStream<T> subject = GetStream<T>();
// final _subscriptions = <GetStream, List<StreamSubscription>>{};
// bool get canUpdate => _subscriptions.isNotEmpty;
// /// This is an internal method.
// /// Subscribe to changes on the inner stream.
// void addListener(GetStream<T> rxGetx) {
// if (!_subscriptions.containsKey(rxGetx)) {
// final subs = rxGetx.listen((data) {
// if (!subject.isClosed) subject.add(data);
// });
// final listSubscriptions =
// _subscriptions[rxGetx] ??= <StreamSubscription>[];
// listSubscriptions.add(subs);
// }
// }
// StreamSubscription<T> listen(
// void Function(T) onData, {
// Function? onError,
// void Function()? onDone,
// bool? cancelOnError,
// }) =>
// subject.listen(
// onData,
// onError: onError,
// onDone: onDone,
// cancelOnError: cancelOnError ?? false,
// );
// /// Closes the subscriptions for this Rx, releasing the resources.
// void close() {
// _subscriptions.forEach((getStream, _subscriptions) {
// for (final subscription in _subscriptions) {
// subscription.cancel();
// }
// });
// _subscriptions.clear();
// subject.close();
// }
// }
/// Base Rx class that manages all the stream logic for any Type.
abstract class _RxImpl<T> extends RxNotifier<T> with RxObjectMixin<T> {
_RxImpl(T initial) {
subject = GetStream.fromValue(initial);
}
abstract class _RxImpl<T> extends GetListenable<T> with RxObjectMixin<T> {
_RxImpl(T initial) : super(initial);
void addError(Object error, [StackTrace? stackTrace]) {
subject.addError(error, stackTrace);
... ...
... ... @@ -5,12 +5,10 @@ part of rx_types;
/// This interface is the contract that _RxImpl]<T> uses in all it's
/// subclass.
abstract class RxInterface<T> {
static RxInterface? proxy;
bool get canUpdate;
//bool get canUpdate;
/// Adds a listener to stream
void addListener(GetStream<T> rxGetx);
void addListener(VoidCallback listener);
/// Close the Rx Variable
void close();
... ... @@ -20,13 +18,24 @@ abstract class RxInterface<T> {
{Function? onError, void Function()? onDone, bool? cancelOnError});
/// Avoids an unsafe usage of the `proxy`
static T notifyChildren<T>(RxNotifier observer, ValueGetter<T> builder) {
final _observer = RxInterface.proxy;
RxInterface.proxy = observer;
final result = builder();
if (!observer.canUpdate) {
RxInterface.proxy = _observer;
throw """
// static T notifyChildren<T>(RxNotifier observer, ValueGetter<T> builder) {
// final _observer = RxInterface.proxy;
// RxInterface.proxy = observer;
// final result = builder();
// if (!observer.canUpdate) {
// RxInterface.proxy = _observer;
// throw ObxError();
// }
// RxInterface.proxy = _observer;
// return result;
// }
}
class ObxError {
const ObxError();
@override
String toString() {
return """
[Get] the improper use of a GetX has been detected.
You should only use GetX or Obx for the specific widget that will be updated.
If you are seeing this error, you probably did not insert any observable variables into GetX/Obx
... ... @@ -34,8 +43,5 @@ abstract class RxInterface<T> {
(example: GetX => HeavyWidget => variableObservable).
If you need to update a parent widget and a child widget, wrap each one in an Obx/GetX.
""";
}
RxInterface.proxy = _observer;
return result;
}
}
... ...
part of rx_types;
/// Create a list similar to `List<T>`
class RxList<E> extends ListMixin<E>
with NotifyManager<List<E>>, RxObjectMixin<List<E>>
implements RxInterface<List<E>> {
RxList([List<E> initial = const []]) {
subject = GetStream.fromValue(List.from(initial));
}
class RxList<E> extends GetListenable<List<E>>
with ListMixin<E>, RxObjectMixin<List<E>> {
RxList([List<E> initial = const []]) : super(initial);
factory RxList.filled(int length, E fill, {bool growable = false}) {
return RxList(List.filled(length, fill, growable: growable));
... ... @@ -87,12 +84,12 @@ class RxList<E> extends ListMixin<E>
@override
int get length => value.length;
@override
@protected
List<E> get value {
RxInterface.proxy?.addListener(subject);
return subject.value;
}
// @override
// @protected
// List<E> get value {
// RxInterface.proxy?.addListener(subject);
// return subject.value;
// }
@override
set length(int newLength) {
... ...
part of rx_types;
class RxMap<K, V> extends MapMixin<K, V>
with NotifyManager<Map<K, V>>, RxObjectMixin<Map<K, V>>
implements RxInterface<Map<K, V>> {
RxMap([Map<K, V> initial = const {}]) {
subject = GetStream.fromValue(Map.from(initial));
}
class RxMap<K, V> extends GetListenable<Map<K, V>>
with MapMixin<K, V>, RxObjectMixin<Map<K, V>> {
RxMap([Map<K, V> initial = const {}]) : super(initial);
factory RxMap.from(Map<K, V> other) {
return RxMap(Map.from(other));
... ... @@ -53,13 +50,13 @@ class RxMap<K, V> extends MapMixin<K, V>
return val;
}
@override
@protected
Map<K, V> get value {
return subject.value;
// RxInterface.proxy?.addListener(subject);
// return _value;
}
// @override
// @protected
// Map<K, V> get value {
// return subject.value;
// // RxInterface.proxy?.addListener(subject);
// // return _value;
// }
}
extension MapExtension<K, V> on Map<K, V> {
... ...
part of rx_types;
class RxSet<E> extends SetMixin<E>
with NotifyManager<Set<E>>, RxObjectMixin<Set<E>>
implements RxInterface<Set<E>> {
RxSet([Set<E> initial = const {}]) {
subject = GetStream.fromValue(Set.from(initial));
}
class RxSet<E> extends GetListenable<Set<E>>
with SetMixin<E>, RxObjectMixin<Set<E>> {
RxSet([Set<E> initial = const {}]) : super(initial);
/// Special override to push() element(s) in a reactive way
/// inside the List,
... ... @@ -20,13 +17,13 @@ class RxSet<E> extends SetMixin<E>
refresh();
}
@override
@protected
Set<E> get value {
return subject.value;
// RxInterface.proxy?.addListener(subject);
// return _value;
}
// @override
// @protected
// Set<E> get value {
// return subject.value;
// // RxInterface.proxy?.addListener(subject);
// // return _value;
// }
@override
@protected
... ...
... ... @@ -4,6 +4,7 @@ import 'dart:async';
import 'dart:collection';
import 'package:flutter/foundation.dart';
import 'package:get/get_state_manager/src/rx_flutter/rx_notifier.dart';
import 'package:get/get_state_manager/src/simple/list_notifier.dart';
import '../rx_stream/rx_stream.dart';
... ...
import 'dart:async';
import '../../../get_core/get_core.dart';
import '../../../get_state_manager/src/rx_flutter/rx_notifier.dart';
import '../rx_types/rx_types.dart';
import 'utils/debouncer.dart';
... ... @@ -57,7 +58,7 @@ class Workers {
/// }
/// ```
Worker ever<T>(
RxInterface<T> listener,
GetListenable<T> listener,
WorkerCallback<T> callback, {
dynamic condition = true,
Function? onError,
... ... @@ -132,7 +133,7 @@ Worker everAll(
/// }
///```
Worker once<T>(
RxInterface<T> listener,
GetListenable<T> listener,
WorkerCallback<T> callback, {
dynamic condition = true,
Function? onError,
... ... @@ -175,7 +176,7 @@ Worker once<T>(
/// );
/// ```
Worker interval<T>(
RxInterface<T> listener,
GetListenable<T> listener,
WorkerCallback<T> callback, {
Duration time = const Duration(seconds: 1),
dynamic condition = true,
... ... @@ -219,7 +220,7 @@ Worker interval<T>(
/// }
/// ```
Worker debounce<T>(
RxInterface<T> listener,
GetListenable<T> listener,
WorkerCallback<T> callback, {
Duration? time,
Function? onError,
... ...
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter/widgets.dart';
import '../../../get_core/get_core.dart';
import '../../../get_instance/src/get_instance.dart';
import '../../../get_instance/src/lifecycle.dart';
import '../../../get_rx/src/rx_types/rx_types.dart';
import '../simple/list_notifier.dart';
import '../simple/simple_builder.dart';
typedef GetXControllerBuilder<T extends GetLifeCycleMixin> = Widget Function(
T controller);
class StatefulObserverComponent = StatefulElement with ObserverComponent;
class GetX<T extends GetLifeCycleMixin> extends StatefulWidget {
final GetXControllerBuilder<T> builder;
final bool global;
... ... @@ -39,6 +40,9 @@ class GetX<T extends GetLifeCycleMixin> extends StatefulWidget {
});
@override
StatefulElement createElement() => StatefulElement(this);
@override
void debugFillProperties(DiagnosticPropertiesBuilder properties) {
super.debugFillProperties(properties);
properties
... ... @@ -55,10 +59,8 @@ class GetX<T extends GetLifeCycleMixin> extends StatefulWidget {
}
class GetXState<T extends GetLifeCycleMixin> extends State<GetX<T>> {
final _observer = RxNotifier();
T? controller;
bool? _isCreator = false;
late StreamSubscription _subs;
@override
void initState() {
... ... @@ -83,7 +85,7 @@ class GetXState<T extends GetLifeCycleMixin> extends State<GetX<T>> {
if (widget.global && Get.smartManagement == SmartManagement.onlyBuilder) {
controller?.onStart();
}
_subs = _observer.listen((data) => setState(() {}), cancelOnError: false);
super.initState();
}
... ... @@ -109,22 +111,29 @@ class GetXState<T extends GetLifeCycleMixin> extends State<GetX<T>> {
GetInstance().delete<T>(tag: widget.tag);
}
}
_subs.cancel();
_observer.close();
for (final disposer in disposers) {
disposer();
}
controller = null;
_isCreator = null;
super.dispose();
}
void _update() {
setState(() {});
}
final disposers = <Disposer>[];
@override
Widget build(BuildContext context) => TaskManager.instance
.exchange(disposers, _update, () => widget.builder(controller!));
@override
void debugFillProperties(DiagnosticPropertiesBuilder properties) {
super.debugFillProperties(properties);
properties.add(DiagnosticsProperty<T>('controller', controller));
}
@override
Widget build(BuildContext context) => RxInterface.notifyChildren(
_observer,
() => widget.builder(controller!),
);
}
... ...
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
... ... @@ -5,10 +7,7 @@ import '../../../instance_manager.dart';
import '../../get_state_manager.dart';
import '../simple/list_notifier.dart';
mixin StateMixin<T> on ListNotifier {
late T _value;
RxStatus? _status;
extension _NullOrEmpty on Object {
bool _isNullOrEmpty(dynamic val) {
if (val == null) return true;
var result = false;
... ... @@ -21,6 +20,11 @@ mixin StateMixin<T> on ListNotifier {
}
return result;
}
}
mixin StateMixin<T> on ListNotifier {
late T _value;
RxStatus? _status;
void _fillEmptyStatus() {
_status = _isNullOrEmpty(_value) ? RxStatus.loading() : RxStatus.success();
... ... @@ -72,6 +76,77 @@ mixin StateMixin<T> on ListNotifier {
}
}
class GetListenable<T> extends ListNotifierSingle
implements ValueListenable<T> {
GetListenable(T val) : _value = val;
StreamController<T>? _controller;
StreamController<T> get subject {
if (_controller == null) {
_controller = StreamController<T>.broadcast();
addListener(_streamListener);
}
return _controller!;
}
void _streamListener() {
_controller?.add(_value);
}
@mustCallSuper
void close() {
removeListener(_streamListener);
_controller?.close();
dispose();
}
Stream<T> get stream {
return subject.stream;
}
T _value;
@override
T get value {
reportRead();
return _value;
}
void _notify() {
refresh();
}
set value(T newValue) {
if (_value == newValue) return;
_value = newValue;
_notify();
}
T? call([T? v]) {
if (v != null) {
value = v;
}
return value;
}
StreamSubscription<T> listen(
void Function(T)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
}) =>
stream.listen(
onData,
onError: onError,
onDone: onDone,
cancelOnError: cancelOnError ?? false,
);
@override
String toString() => value.toString();
}
class Value<T> extends ListNotifier
with StateMixin<T>
implements ValueListenable<T?> {
... ... @@ -115,8 +190,6 @@ extension ReactiveT<T> on T {
Value<T> get reactive => Value<T>(this);
}
typedef Condition = bool Function();
abstract class GetNotifier<T> extends Value<T> with GetLifeCycleMixin {
GetNotifier(T initial) : super(initial);
}
... ... @@ -128,7 +201,7 @@ extension StateExt<T> on StateMixin<T> {
Widget? onLoading,
Widget? onEmpty,
}) {
return SimpleBuilder(builder: (_) {
return Observer(builder: (_) {
if (status.isLoading) {
return onLoading ?? const Center(child: CircularProgressIndicator());
} else if (status.isError) {
... ...
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter/widgets.dart';
import '../../../get_rx/src/rx_types/rx_types.dart';
import '../simple/simple_builder.dart';
typedef WidgetCallback = Widget Function();
... ... @@ -12,48 +10,8 @@ typedef WidgetCallback = Widget Function();
/// See also:
/// - [Obx]
/// - [ObxValue]
abstract class ObxWidget extends StatefulWidget {
abstract class ObxWidget extends ObxStatelessWidget {
const ObxWidget({Key? key}) : super(key: key);
@override
void debugFillProperties(DiagnosticPropertiesBuilder properties) {
super.debugFillProperties(properties);
properties..add(ObjectFlagProperty<Function>.has('builder', build));
}
@override
_ObxState createState() => _ObxState();
@protected
Widget build();
}
class _ObxState extends State<ObxWidget> {
final _observer = RxNotifier();
late StreamSubscription subs;
@override
void initState() {
super.initState();
subs = _observer.subject.stream.listen(_updateTree, cancelOnError: false);
}
void _updateTree(_) {
if (mounted) {
setState(() {});
}
}
@override
void dispose() {
subs.cancel();
_observer.close();
super.dispose();
}
@override
Widget build(BuildContext context) =>
RxInterface.notifyChildren(_observer, widget.build);
}
/// The simplest reactive widget in GetX.
... ... @@ -69,7 +27,9 @@ class Obx extends ObxWidget {
const Obx(this.builder);
@override
Widget build() => builder();
Widget build(BuildContext context) {
return builder();
}
}
/// Similar to Obx, but manages a local state.
... ... @@ -90,5 +50,5 @@ class ObxValue<T extends RxInterface> extends ObxWidget {
const ObxValue(this.builder, this.data, {Key? key}) : super(key: key);
@override
Widget build() => builder(data);
Widget build(BuildContext context) => builder(data);
}
... ...
... ... @@ -47,15 +47,22 @@ mixin ListNotifierSingleMixin on Listenable {
TaskManager.instance.notify(this);
}
@protected
void reportAdd(VoidCallback disposer) {
TaskManager.instance.reportAdd(disposer);
}
void _notifyUpdate() {
for (var element in _updaters!) {
element!();
}
}
bool get isDisposed => _updaters == null;
bool _debugAssertNotDisposed() {
assert(() {
if (_updaters == null) {
if (isDisposed) {
throw FlutterError('''A $runtimeType was used after being disposed.\n
'Once you have called dispose() on a $runtimeType, it can no longer be used.''');
}
... ... @@ -151,18 +158,16 @@ class TaskManager {
GetStateUpdate? _setter;
List<VoidCallback>? _remove;
final listNotifier = ListNotifierGroup();
// void addElement(Object id, GetStateUpdate listener) {
// _remove?.add(listNotifier.addListenerId(id, listener));
// }
void reportAdd(VoidCallback listener) {
_remove?.add(listener);
}
void notify(ListNotifierSingleMixin _updaters) {
final listener = _setter;
if (listener != null) {
if (!_updaters.containsListener(listener)) {
_updaters.addListener(listener);
_remove?.add(() => _updaters.removeListener(listener));
reportAdd(() => _updaters.removeListener(listener));
}
}
}
... ... @@ -172,8 +177,26 @@ class TaskManager {
_remove = disposers;
_setter = setState;
final result = builder();
if (disposers.isEmpty) {
throw ObxError();
}
_remove = null;
_setter = null;
return result;
}
}
class ObxError {
const ObxError();
@override
String toString() {
return """
[Get] the improper use of a GetX has been detected.
You should only use GetX or Obx for the specific widget that will be updated.
If you are seeing this error, you probably did not insert any observable variables into GetX/Obx
or insert them outside the scope that GetX considers suitable for an update
(example: GetX => HeavyWidget => variableObservable).
If you need to update a parent widget and a child widget, wrap each one in an Obx/GetX.
""";
}
}
... ...
... ... @@ -78,10 +78,10 @@ class _ValueBuilderState<T> extends State<ValueBuilder<T?>> {
class ObxElement = StatelessElement with ObserverComponent;
// It's a experimental feature
class SimpleBuilder extends ObxStatelessWidget {
class Observer extends ObxStatelessWidget {
final WidgetBuilder builder;
const SimpleBuilder({Key? key, required this.builder}) : super(key: key);
const Observer({Key? key, required this.builder}) : super(key: key);
@override
Widget build(BuildContext context) => builder(context);
... ...
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:get/state_manager.dart';
... ... @@ -73,28 +74,28 @@ Future<int> stream() {
return c.future;
}
Future<int> getStream() {
final c = Completer<int>();
// Future<int> getStream() {
// final c = Completer<int>();
final value = GetStream<int>();
final timer = Stopwatch();
timer.start();
// final value = GetStream<int>();
// final timer = Stopwatch();
// timer.start();
value.listen((v) {
if (times == v) {
timer.stop();
print(
"""$v listeners notified | [GET_STREAM] time: ${timer.elapsedMicroseconds}ms""");
c.complete(timer.elapsedMicroseconds);
}
});
// value.listen((v) {
// if (times == v) {
// timer.stop();
// print(
// """$v listeners notified | [GET_STREAM] time: ${timer.elapsedMicroseconds}ms""");
// c.complete(timer.elapsedMicroseconds);
// }
// });
for (var i = 0; i < times + 1; i++) {
value.add(i);
}
// for (var i = 0; i < times + 1; i++) {
// value.add(i);
// }
return c.future;
}
// return c.future;
// }
Future<int> miniStream() {
final c = Completer<int>();
... ... @@ -157,7 +158,7 @@ GetValue is ${calculePercentage(dart, getx).round()}% faster than Default ValueN
print('============================================');
print('DART STREAM X GET_STREAM X GET_MINI_STREAM TEST');
print('-----------');
var getx = await getStream();
// var getx = await getStream();
var mini = await miniStream();
var dart = await stream();
print('-----------');
... ... @@ -167,16 +168,16 @@ GetStream is ${calculePercentage(dart, mini).round()}% faster than Default Strea
times = 30000;
dart = await stream();
getx = await getStream();
// getx = await getStream();
mini = await miniStream();
times = 60000;
dart = await stream();
getx = await getStream();
// getx = await getStream();
mini = await miniStream();
print('-----------');
print('dart_stream delay $dart ms to made $times requests');
print('getx_stream delay $getx ms to made $times requests');
// print('getx_stream delay $getx ms to made $times requests');
print('getx_mini_stream delay $mini ms to made $times requests');
print('-----------');
print('''
... ...