gpt4 book ai didi

unit-testing - Dart:如何测试流是否在特定时间发出元素?

转载 作者:行者123 更新时间:2023-11-28 19:59:12 26 4
gpt4 key购买 nike

我尝试测试函数Stream transform(Stream input)。如何测试返回的流是否在特定时间发出元素?

RxJS (JavaScript) 我可以使用 TestScheduler在特定时间在输入流上发射元素,并测试它们是否在特定时间在输出流上发射。在这个example ,转换函数被传递给scheduler.startWithCreate:

var scheduler = new Rx.TestScheduler();

// Create hot observable which will start firing
var xs = scheduler.createHotObservable(
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onCompleted(230)
);

// Note we'll start at 200 for subscribe, hence missing the 150 mark
var res = scheduler.startWithCreate(function () {
return xs.map(function (x) { return x * x });
});

// Implement collection assertion
collectionAssert.assertEqual(res.messages, [
onNext(210, 4),
onNext(220, 9),
onCompleted(230)
]);

// Check for subscribe/unsubscribe
collectionAssert.assertEqual(xs.subscriptions, [
subscribe(200, 230)
]);

最佳答案

更新: 将我的代码作为一个名为 stream_test_scheduler 的包发布.

此代码的工作方式类似于 TestSchedulerRxJS , 但它使用实时(毫秒)而不是虚拟时间,因为你不能在 Dart 中伪造时间(参见 Irn's comment )。您可以将最大偏差传递给匹配器。我在这个例子中使用了 20 毫秒。但偏差各不相同。您可能必须对另一个测试或另一个(更快/更慢)系统使用不同的最大偏差值。

编辑:我将示例更改为延迟转换函数,它是 delay 的较短(较少可配置/参数)版本stream_ext的功能包裹。该测试检查元素是否延迟了一秒。

import 'dart:async';

import 'package:test/test.dart';

// To test:
/// Modified version of <https://github.com/theburningmonk/stream_ext/wiki/delay>
Stream delay(Stream input, Duration duration) {
var controller = new StreamController.broadcast(sync : true);
delayCall(Function f, [Iterable args]) => args == null
? new Timer(duration, f)
: new Timer(duration, () => Function.apply(f, args));
input.listen(
(x) => delayCall(_tryAdd, [controller, x]),
onError : (ex) => delayCall(_tryAddError, [ex]),
onDone : () => delayCall(_tryClose, [controller])
);
return controller.stream;
}

_tryAdd(StreamController controller, event) {
if (!controller.isClosed) controller.add(event);
}

_tryAddError(StreamController controller, err) {
if (!controller.isClosed) controller.addError(err);
}

_tryClose(StreamController controller) {
if (!controller.isClosed) controller.close();
}

main() async {
test('delay preserves relative time intervals between the values', () async {
var scheduler = new TestScheduler();

var source = scheduler.createStream([
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onCompleted(230)
]);

var result = await scheduler.startWithCreate(() => delay(source, ms(1000)));

expect(result, equalsRecords([
onNext(1150, 1),
onNext(1210, 2),
onNext(1220, 3),
onCompleted(1230)
], maxDeviation: 20));
});
}

equalsRecords(List<Record> records, {int maxDeviation: 0}) {
return pairwiseCompare(records, (Record r1, Record r2) {
var deviation = (r1.ticks.inMilliseconds - r2.ticks.inMilliseconds).abs();
if (deviation > maxDeviation) {
return false;
}
if (r1 is OnNextRecord && r2 is OnNextRecord) {
return r1.value == r2.value;
}
if (r1 is OnErrorRecord && r2 is OnErrorRecord) {
return r1.exception == r2.exception;
}
return (r1 is OnCompletedRecord && r2 is OnCompletedRecord);
}, 'equal with deviation of ${maxDeviation}ms to');
}

class TestScheduler {
final SchedulerTasks _tasks;

TestScheduler() : _tasks = new SchedulerTasks();

Stream createStream(List<Record> records) {
final controller = new StreamController(sync: true);
_tasks.add(controller, records);
return controller.stream;
}

Future<List<Record>> startWithCreate(Stream createStream()) {
final completer = new Completer<List<Record>>();
final records = <Record>[];
final start = new DateTime.now();
int timeStamp() {
final current = new DateTime.now();
return current.difference(start).inMilliseconds;
}
createStream().listen(
(event) => records.add(onNext(timeStamp(), event)),
onError: (exception) => records.add(onError(timeStamp(), exception)),
onDone: () {
records.add(onCompleted(timeStamp()));
completer.complete(records);
}
);
_tasks.run();
return completer.future;
}
}

class SchedulerTasks {
Map<Record, StreamController> _controllers = {};
List<Record> _records = [];

void add(StreamController controller, List<Record> records) {
for (var record in records) {
_controllers[record] = controller;
}
_records.addAll(records);
}

void run() {
_records.sort();
for (var record in _records) {
final controller = _controllers[record];
new Future.delayed(record.ticks, () {
if (record is OnNextRecord) {
controller.add(record.value);
} else if (record is OnErrorRecord) {
controller.addError(record.exception);
} else if (record is OnCompletedRecord) {
controller.close();
}
});
}
}
}

onNext(int ticks, int value) => new OnNextRecord(ms(ticks), value);

onCompleted(int ticks) => new OnCompletedRecord(ms(ticks));

onError(int ticks, exception) => new OnErrorRecord(ms(ticks), exception);

Duration ms(int milliseconds) => new Duration(milliseconds: milliseconds);

abstract class Record implements Comparable {
final Duration ticks;
Record(this.ticks);

@override
int compareTo(other) => ticks.compareTo(other.ticks);
}

class OnNextRecord extends Record {
final value;
OnNextRecord(Duration ticks, this.value) : super (ticks);

@override
String toString() => 'onNext($value)@${ticks.inMilliseconds}';
}

class OnErrorRecord extends Record {
final exception;
OnErrorRecord(Duration ticks, this.exception) : super (ticks);

@override
String toString() => 'onError($exception)@${ticks.inMilliseconds}';
}

class OnCompletedRecord extends Record {
OnCompletedRecord(Duration ticks) : super (ticks);

@override
String toString() => 'onCompleted()@${ticks.inMilliseconds}';
}

关于unit-testing - Dart:如何测试流是否在特定时间发出元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32345853/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com