跳至主要內容

Dart event loop

JI,XIAOYONG...大约 19 分钟

本文基于 Dart 2.17


Dart App 中所有的代码都在一个 isolate 中运行(各个 isolate 之间的代码运行时是隔离的),一个 isolate 有自己的 heap,维持有一个消息队列 event_loop,处理两种消息:

  1. event queue 执行用户点击、屏幕刷新、绘制,一般的 Future、IO、Stream 流等,每次执行完毕都会先检查执行 micro task queue 中的任务,直到其为空再执行下一个 event queue
  2. microTask queue 优先执行,一般执行跑完即弃的小任务,如 Dart 内部的微任务

上述两种 event 会在普通的 Dart 同步方法执行完毕后执行,无论是 microTask 还是普通的 event,他们都是concurrency 并行执行(也就是说实际上还是上一个执行完毕,再执行另外一个),所以如果这些 event 中存在耗时长的方法,依旧会阻塞其他方法的执行,可能导致 UI 卡顿等情况。


在代码执行的过程中,各种事件(如用户点击、屏幕刷新、future、microtask 等)都会被当做一个个 event 放入到 event queue 中,然后不停的从 event loop 取出事件并执行:

dart_event_loop
dart_event_loop

他们的执行顺序如下:

dart_event_loop_sequeue
dart_event_loop_sequeue

可以从下述例子详细看一下代码执行的时候各个方法执行过程:

dart_test_queue_code
dart_test_queue_code
dart_test_queue_output
dart_test_queue_output

本文根据 Dart SDK 源码分析一下 event loop 的实现。

代码参考:https://gist.github.com/jixiaoyong/ac811902db42a51cf97e3290788ade4aopen in new window

1. 同步方法

同步方法包括普通的方法,以及一下几种会按照同步方法立即执行的方式:

Future.sync(() => print("Hello, I am future created by Future.sync"));
Future.forEach(
      [1, 2, 3],
      (element) =>
          print("Hello, I am future($element) created by Future.forEach"));
Future.doWhile(() async {
    if (repeatCounter++ < 3) {
      print("repeat ($repeatCounter/3) inner Future.doWhile");
      await Future.delayed(const Duration(seconds: 1));
      return true;
    }
    return false;
  });

2. micro task

microtask 会在同步方法执行完毕之后立即被执行,一般用来执行“即抛型”的方法,不应当执行耗时方法。microtask 列表会一直执行,直到 event loop 中没有 micro task 了,才会去执行 Future 等普通的 event。

scheduleMicrotask(() {
    print('Hello, world! I am a microtask.');
  });

Future.microtask(
      () => print("Hello, I am microtask created by Future.microtask"));

Future.value(1).then((value) {
    print("Hello, I am future created by Future.value");
  });

Future.error(Exception("Hello, I am future created by Future.error"))
      .onError((error, stackTrace) => print(error));

Future.value([FutureOr<T>? value]) 比较特殊,如果value 是 future,那么他会在 value 执行完毕后返回他的值,如果value不是 future,他就会立即执行属于 microtask

代码分析

让我们看一下上述方法的具体实现:

// -> sdk\lib\async\future.dart

factory Future.microtask(FutureOr<T> computation()) {
    _Future<T> result = new _Future<T>();
    scheduleMicrotask(() {
      try {
        result._complete(computation());
      } catch (e, s) {
        _completeWithErrorCallback(result, e, s);
      }
    });
    return result;
  }

可以看出,Future.microtask本质还是调用scheduleMicrotask实现的,其实现如下:

// -> sdk\lib\async\schedule_microtask.dart

('vm:entry-point', 'call')
void scheduleMicrotask(void Function() callback) {
  _Zone currentZone = Zone._current;
  if (identical(_rootZone, currentZone)) {
    // No need to bind the callback. We know that the root's scheduleMicrotask
    // will be invoked in the root zone.
    _rootScheduleMicrotask(null, null, _rootZone, callback);
    return;
  }
  _ZoneFunction implementation = currentZone._scheduleMicrotask;
  if (identical(_rootZone, implementation.zone) &&
      _rootZone.inSameErrorZone(currentZone)) {
    _rootScheduleMicrotask(
        null, null, currentZone, currentZone.registerCallback(callback));
    return;
  }
  Zone.current.scheduleMicrotask(Zone.current.bindCallbackGuarded(callback));
}

Zone.scheduleMicrotask()最后调用的是_RootZone的同名方法:

// -> sdk\lib\async\zone.dart

class _RootZone extends _Zone {
	void scheduleMicrotask(void f()) {
    _rootScheduleMicrotask(null, null, this, f);
  }
}

void _rootScheduleMicrotask(
    Zone? self, ZoneDelegate? parent, Zone zone, void f()) {
  if (!identical(_rootZone, zone)) {
    bool hasErrorHandler = !_rootZone.inSameErrorZone(zone);
    if (hasErrorHandler) {
      f = zone.bindCallbackGuarded(f);
    } else {
      f = zone.bindCallback(f);
    }
  }
  _scheduleAsyncCallback(f);
}

_RootZone._scheduleAsyncCallback

这里调用了_RootZone._scheduleAsyncCallback方法,将传入的 callback 当做 microtask 执行。

// -> sdk\lib\async\schedule_microtask.dart

/// Schedules a callback to be called as a microtask.
///
/// The microtask is called after all other currently scheduled
/// microtasks, but as part of the current system event.
void _scheduleAsyncCallback(_AsyncCallback callback) {
  _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
  _AsyncCallbackEntry? lastCallback = _lastCallback;
  if (lastCallback == null) {
    _nextCallback = _lastCallback = newEntry;
    if (!_isInCallbackLoop) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  } else {
    lastCallback.next = newEntry;
    _lastCallback = newEntry;
  }
}

_RootZone._startMicrotaskLoop

这里面的_startMicrotaskLoop方法是实际上处理 microtask 的地方:

// -> sdk\lib\async\schedule_microtask.dart

/// Whether we are currently inside the callback loop.
///
/// If we are inside the loop, we never need to schedule the loop,
/// even if adding a first element.
bool _isInCallbackLoop = false;

void _microtaskLoop() {
  for (var entry = _nextCallback; entry != null; entry = _nextCallback) {
    _lastPriorityCallback = null;
    var next = entry.next;
    _nextCallback = next;
    if (next == null) _lastCallback = null;
    (entry.callback)();
  }
}

void _startMicrotaskLoop() {
  _isInCallbackLoop = true;
  try {
    // Moved to separate function because try-finally prevents
    // good optimization.
    _microtaskLoop();
  } finally {
    _lastPriorityCallback = null;
    _isInCallbackLoop = false;
    if (_nextCallback != null) {
      _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
    }
  }
}

_AsyncRun._scheduleImmediate方法则是触发处理 microtask 的方法:

// -> sdk\lib\async\schedule_microtask.dart

class _AsyncRun {
  /// Schedule the given callback before any other event in the event-loop.
  external static void _scheduleImmediate(void Function() callback);
}

_AsyncRun._scheduleImmediate

_AsyncRun._scheduleImmediate方法的实现在schedule_microtask_patch.dart中:

// -> sdk/lib/_internal/vm/lib/schedule_microtask_patch.dart


class _AsyncRun {
  
  static void _scheduleImmediate(void callback()) {
    final closure = _ScheduleImmediate._closure;
    if (closure == null) {
      throw new UnsupportedError("Microtasks are not supported");
    }
    closure(callback);
  }
}

typedef void _ScheduleImmediateClosure(void callback());

class _ScheduleImmediate {
  static _ScheduleImmediateClosure? _closure;
}

("vm:entry-point", "call")
void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {
  _ScheduleImmediate._closure = closure;
}

("vm:entry-point", "call")
void _ensureScheduleImmediate() {
  _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
}

_ScheduleImmediateClosure

可以看到,microtask 实际上是使用_ScheduleImmediateClosure调用的,关于他主要有两个方法:

  1. _setScheduleImmediateClosure
  2. _ensureScheduleImmediate

让我们先看一下第一个方法_setScheduleImmediateClosure

// -> runtime\bin\dartutils.cc

// PrepareAsyncLibrary 方法会在 Dart 虚拟机启动的时候被调用
Dart_Handle DartUtils::PrepareAsyncLibrary(Dart_Handle async_lib,
                                           Dart_Handle isolate_lib) {
  Dart_Handle schedule_immediate_closure = Dart_Invoke(
      isolate_lib, NewString("_getIsolateScheduleImmediateClosure"), 0, NULL);
  RETURN_IF_ERROR(schedule_immediate_closure);
  Dart_Handle args[1];
  args[0] = schedule_immediate_closure;
  return Dart_Invoke(async_lib, NewString("_setScheduleImmediateClosure"), 1,
                     args);
}

这里主要调用了 Dart 中的_getIsolateScheduleImmediateClosure方法创建了schedule_immediate_closure,然后通过_setScheduleImmediateClosure返回:

// -> sdk\lib\_internal\vm\lib\isolate_patch.dart

/// The embedder can execute this function to get hold of
/// [_isolateScheduleImmediate] above.
("vm:entry-point", "call")
Function _getIsolateScheduleImmediateClosure() {
  return _isolateScheduleImmediate;
}

/// The closure that should be used as scheduleImmediateClosure, when the VM
/// is responsible for the event loop.
void _isolateScheduleImmediate(void callback()) {
  assert((_pendingImmediateCallback == null) ||
      (_pendingImmediateCallback == callback));
  _pendingImmediateCallback = callback;
}

/// The callback that has been registered through `scheduleImmediate`.
_ImmediateCallback? _pendingImmediateCallback;

可以看到,这个方法会将传递进来的callback赋值给_pendingImmediateCallback

而结合上面的代码,_ensureScheduleImmediate 方法主要也是用来触发_ScheduleImmediateClosure执行回调事件。


到目前为止,我们能确定的是:

  • 在 Dart VM 启动的时候,会创建一个_ScheduleImmediateClosure并保存在_pendingImmediateCallback;
  • 当有新的 microtask 加入的时候,会触发_startMicrotaskLoop方法在_microtaskLoop()中实际处理一个 microtask(这里的_startMicrotaskLoop触发的实际是通过_AsyncRun._scheduleImmediate(_startMicrotaskLoop)将其使用_pendingImmediateCallback 包裹之后执行的)。

_pendingImmediateCallback

现在的问题是,这个_pendingImmediateCallback 什么时候会被安排执行呢?

// -> sdk\lib\_internal\vm\lib\isolate_patch.dart

("vm:entry-point", "call")
void _runPendingImmediateCallback() {
  final callback = _pendingImmediateCallback;
  if (callback != null) {
    _pendingImmediateCallback = null;
    callback();
  }
}

("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {

	// Called from the VM to retrieve  the handler and handle a message.
  ("vm:entry-point", "call")
  static _handleMessage(int id, var message) {
    final handler = _portMap[id]?['handler'];
    if (handler == null) {
      return null;
    }
    // TODO(floitsch): this relies on the fact that any exception aborts the
    // VM. Once we have non-fatal global exceptions we need to catch errors
    // so that we can run the immediate callbacks.
    handler(message);
    _runPendingImmediateCallback();
    return handler;
  }

}

注意这里的关键代码,在_handleMessage方法中,会先执行RawReceivePort原本的 handler 内容,然后,执行_runPendingImmediateCallback()

_runPendingImmediateCallback则会执行_pendingImmediateCallback的内容,也就是前面的_startMicrotaskLoop,处理 event loop 中的 micro task。


这也就证明了我们之前说的“microtask 会在同步方法之后立即执行,并在每次普通的 event loop 执行完毕之后,都会检查并执行 event loop 中的 microtask,之后才继续执行普通 event”。

在下面的分析中,我们也还可以看到,在 event 每次处理 Timer 事件之后,都会检查执行 micro task。

3. event

除了下面列出来的使用 Future 或者 Timer 等创建的方法外,屏幕点击、刷新等事件也在此类 event 中。

当前 event loop 中没有 micro task 之后,就会执行一次此类普通 event,然后再检查一次 event loop,如果有 micro task 就执行 micro task 直到清空 micro task,否则继续执行下一个普通 event,直到 event loop 列表为空,退出 app。

先看几种会触发此类事件的方法:

Future.delayed(const Duration(seconds: 1), () {
    print("Hello, I am future");
  });

Future.any([
    Future(() {
      return "I am Future run immediately Future.any";
    }),
    Future.delayed(const Duration(seconds: 1), () {
      return "I am Future run delay, will be discard Future.any";
    })
  ]).then((value) => print(value));

Future.wait([
    Future(() {
      return "I am Future run immediately Future.wait 1/2";
    }),
    Future.delayed(const Duration(seconds: 1), () {
      return "I am Future run delay Future.wait 2/2";
    })
  ]).then((value) => print(value));

以及 Timer

Timer.periodic(const Duration(seconds: 1), (timer) {
    print("Hello, I am running inner(${timer.tick}/2) Timer.periodic");
    if (timer.tick == 2) {
      timer.cancel();
    }
  });

  Timer.run(() {
    print(
        "Hello, I will run asynchronously as soon as possible with Timer.run");
  });

Timer(Duration(seconds: 1), () {
    print("Hello, I will run asynchronously after 1 second with Timer");
  });

代码分析

我们依次看一下上述几个方法的具体实现:

// -> sdk\lib\async\future.dart

factory Future.delayed(Duration duration, [FutureOr<T> computation()?]) {
    if (computation == null && !typeAcceptsNull<T>()) {
      throw ArgumentError.value(
          null, "computation", "The type parameter is not nullable");
    }
    _Future<T> result = new _Future<T>();
    new Timer(duration, () {
      if (computation == null) {
        result._complete(null as T);
      } else {
        try {
          result._complete(computation());
        } catch (e, s) {
          _completeWithErrorCallback(result, e, s);
        }
      }
    });
    return result;
  }

static Future<T> any<T>(Iterable<Future<T>> futures) {
    var completer = new Completer<T>.sync();
    void onValue(T value) {
      if (!completer.isCompleted) completer.complete(value);
    }

    void onError(Object error, StackTrace stack) {
      if (!completer.isCompleted) completer.completeError(error, stack);
    }

    for (var future in futures) {
			// 一旦有一个 future 执行完毕,就立即返回结果,并丢弃掉后续 future 的返回
      future.then(onValue, onError: onError);
    }
    return completer.future;
  }

("vm:recognized", "other")
  static Future<List<T>> wait<T>(Iterable<Future<T>> futures,
      {bool eagerError = false, void cleanUp(T successValue)?}) {
    // This is a VM recognised method, and the _future variable is deliberately
    // allocated in a specific slot in the closure context for stack unwinding.
    final _Future<List<T>> _future = _Future<List<T>>();
    List<T?>? values; // Collects the values. Set to null on error.
    int remaining = 0; // How many futures are we waiting for.
    late Object error; // The first error from a future.
    late StackTrace stackTrace; // The stackTrace that came with the error.

    // Handle an error from any of the futures.
    void handleError(Object theError, StackTrace theStackTrace) {
      remaining--;
      List<T?>? valueList = values;
      if (valueList != null) {
        if (cleanUp != null) {
          for (var value in valueList) {
            if (value != null) {
              // Ensure errors from cleanUp are uncaught.
              T cleanUpValue = value;
              new Future.sync(() {
                cleanUp(cleanUpValue);
              });
            }
          }
        }
        values = null;
        if (remaining == 0 || eagerError) {
          _future._completeError(theError, theStackTrace);
        } else {
          error = theError;
          stackTrace = theStackTrace;
        }
      } else if (remaining == 0 && !eagerError) {
        _future._completeError(error, stackTrace);
      }
    }

    try {
      // As each future completes, put its value into the corresponding
      // position in the list of values.
      for (var future in futures) {
        int pos = remaining;
				// 在这里依次执行 future
        future.then((T value) {
          remaining--;
          List<T?>? valueList = values;
          if (valueList != null) {
            valueList[pos] = value;
            if (remaining == 0) {
              _future._completeWithValue(List<T>.from(valueList));
            }
          } else {
            if (cleanUp != null && value != null) {
              // Ensure errors from cleanUp are uncaught.
              new Future.sync(() {
                cleanUp(value);
              });
            }
            if (remaining == 0 && !eagerError) {
              // If eagerError is false, and valueList is null, then
              // error and stackTrace have been set in handleError above.
              _future._completeError(error, stackTrace);
            }
          }
        }, onError: handleError);
        // Increment the 'remaining' after the call to 'then'.
        // If that call throws, we don't expect any future callback from
        // the future, and we also don't increment remaining.
        remaining++;
      }
      if (remaining == 0) {
        return _future.._completeWithValue(<T>[]);
      }
      values = new List<T?>.filled(remaining, null);
    } catch (e, st) {
      // The error must have been thrown while iterating over the futures
      // list, or while installing a callback handler on the future.
      // This is a breach of the `Future` protocol, but we try to handle it
      // gracefully.
      if (remaining == 0 || eagerError) {
        // Throw a new Future.error.
        // Don't just call `_future._completeError` since that would propagate
        // the error too eagerly, not giving the callers time to install
        // error handlers.
        // Also, don't use `_asyncCompleteError` since that one doesn't give
        // zones the chance to intercept the error.
        return new Future.error(e, st);
      } else {
        // Don't allocate a list for values, thus indicating that there was an
        // error.
        // Set error to the caught exception.
        error = e;
        stackTrace = st;
      }
    }
    return _future;
  }

可以看到,除了Future.waitFuture.any 这两个处理 Future 集合的方法外,Future.delayed 这个方法内部是实际上是通过 Timer 实现**的。

Future.then

在看 Timer 实现之前,先看一下Futrue.then的实现,他对应的实现是_Future.then

// -> sdk\lib\async\future_impl.dart

class _Future<T> implements Future<T> {
// Register callbacks to be called when this future completes.
//
// When this future completes with a value, the [onValue] callback will be called with that value.
//  If this future is already completed, the callback will not be called immediately,
// but will be scheduled in a later microtask
Future<R> then<R>(FutureOr<R> f(T value), {Function? onError}) {
    Zone currentZone = Zone.current;
    if (identical(currentZone, _rootZone)) {
      if (onError != null &&
          onError is! Function(Object, StackTrace) &&
          onError is! Function(Object)) {
        throw ArgumentError.value(
            onError,
            "onError",
            "Error handler must accept one Object or one Object and a StackTrace"
                " as arguments, and return a value of the returned future's type");
      }
    } else {
      f = currentZone.registerUnaryCallback<FutureOr<R>, T>(f);
      if (onError != null) {
        // This call also checks that onError is assignable to one of:
        //   dynamic Function(Object)
        //   dynamic Function(Object, StackTrace)
        onError = _registerErrorHandler(onError, currentZone);
      }
    }
    _Future<R> result = new _Future<R>();
    _addListener(new _FutureListener<T, R>.then(result, f, onError));
		// 返回创建好的 Future
    return result;
  }

}

_Futrue.then只是对传入的回调的进行了包装,实际上是通过_Future._addListener()实现具体的逻辑:

// -> sdk\lib\async\future_impl.dart

bool get _mayComplete => (_state & _completionStateMask) == _stateIncomplete;
  bool get _isPendingComplete => (_state & _statePendingComplete) != 0;
  bool get _mayAddListener =>
      _state <= (_statePendingComplete | _stateIgnoreError);

void _addListener(_FutureListener listener) {
    assert(listener._nextListener == null);
		// 如果是待完成的或者忽略错误的,将当前 listener 添加到链表头部;
		// 在后文处理结果的时候,会从链表尾部开始读取
    if (_mayAddListener) {
      listener._nextListener = _resultOrListeners;
      _resultOrListeners = listener;
    } else {
      if (_isChained) {
        // Delegate listeners to chained source future.
        // If the source is complete, instead copy its values and
        // drop the chaining.
        _Future source = _chainSource;
        if (!source._isComplete) {
				  // 如果依赖于 source,那么就添加为 source 的 listener
          source._addListener(listener);
          return;
        }
        _cloneResult(source);
      }
      assert(_isComplete);
      // Handle late listeners asynchronously.
      _zone.scheduleMicrotask(() {
				// Propagates the value/error of [source] to its [listeners]
        _propagateToListeners(this, listener);
      });
    }
  }

_Future._addListener(_FutureListener listener)中基本上做了如下判断:

  • 如果 Future 是延迟完成的,就添加监听。
  • 如果 Future 已经完成了,就加入到 micro task 中,安排执行 listener 回调(_propagateToListeners(this, listener))。

具体可以参考Flutter 之 Future 原理解析open in new window


Timer

我们再看一下 Timer 的实现:

// -> sdk\lib\async\timer.dart

// Timer.run

static void run(void Function() callback) {
    new Timer(Duration.zero, callback);
  }

factory Timer(Duration duration, void Function() callback) {
    if (Zone.current == Zone.root) {
      // No need to bind the callback. We know that the root's timer will
      // be invoked in the root zone.
      return Zone.current.createTimer(duration, callback);
    }
    return Zone.current
        .createTimer(duration, Zone.current.bindCallbackGuarded(callback));
  }

factory Timer.periodic(Duration duration, void callback(Timer timer)) {
    if (Zone.current == Zone.root) {
      // No need to bind the callback. We know that the root's timer will
      // be invoked in the root zone.
      return Zone.current.createPeriodicTimer(duration, callback);
    }
    var boundCallback = Zone.current.bindUnaryCallbackGuarded<Timer>(callback);
    return Zone.current.createPeriodicTimer(duration, boundCallback);
  }

创建 Timer

可以看到,Timer 的创建实际上是 Zone 通过两种方式创建的:

// -> sdk\lib\async\zone.dart
abstract class Zone {

	/// Creates a [Timer] where the callback is executed in this zone.
  Timer createTimer(Duration duration, void Function() callback);

  /// Creates a periodic [Timer] where the callback is executed in this zone.
  Timer createPeriodicTimer(Duration period, void callback(Timer timer));

}

Zone 是抽象类,他的实现是_RootZone

// -> sdk\lib\async\zone.dart

class _RootZone extends _Zone {

Timer createTimer(Duration duration, void f()) {
    return Timer._createTimer(duration, f);
  }

  Timer createPeriodicTimer(Duration duration, void f(Timer timer)) {
    return Timer._createPeriodicTimer(duration, f);
  }

}

可以看到这里实际上是调用了 Timer 中对应的私有方法:

// -> sdk\lib\async\timer.dart

	external static Timer _createTimer(
      Duration duration, void Function() callback);
  external static Timer _createPeriodicTimer(
      Duration duration, void callback(Timer timer));

他们的具体实现在timer_patch.dart中:

// -> sdk\lib\_internal\vm\lib\timer_patch.dart


class Timer {
  
  static Timer _createTimer(Duration duration, void callback()) {
    final factory = VMLibraryHooks.timerFactory;
    if (factory == null) {
      throw new UnsupportedError("Timer interface not supported.");
    }
    int milliseconds = duration.inMilliseconds;
    if (milliseconds < 0) milliseconds = 0;
    return factory(milliseconds, (_) {
      callback();
    }, false);
  }

  
  static Timer _createPeriodicTimer(
      Duration duration, void callback(Timer timer)) {
    final factory = VMLibraryHooks.timerFactory;
    if (factory == null) {
      throw new UnsupportedError("Timer interface not supported.");
    }
    int milliseconds = duration.inMilliseconds;
    if (milliseconds < 0) milliseconds = 0;
    return factory(milliseconds, callback, true);
  }
}

可以看到,无论是单次的还是循环的 Timer 都是使用VMLibraryHooks.timerFactory创建的:

// -> sdk\lib\_internal\vm\lib\timer_impl.dart

("vm:entry-point", "call")
_setupHooks() {
  VMLibraryHooks.timerFactory = _Timer._factory;
}

	// The Timer factory registered with the dart:async library by the embedder.
  static Timer _factory(
      int milliSeconds, void callback(Timer timer), bool repeating) {
    if (repeating) {
      return new _Timer.periodic(milliSeconds, callback);
    }
    return new _Timer(milliSeconds, callback);
  }

	factory _Timer(int milliSeconds, void callback(Timer timer)) {
    return _createTimer(callback, milliSeconds, false);
  }

  factory _Timer.periodic(int milliSeconds, void callback(Timer timer)) {
    return _createTimer(callback, milliSeconds, true);
  }

最终都是调用的_Timer._createTimer方法:

// -> sdk\lib\_internal\vm\lib\timer_impl.dart

static _Timer _createTimer(
      void callback(Timer timer), int milliSeconds, bool repeating) {
    // Negative timeouts are treated as if 0 timeout.
    if (milliSeconds < 0) {
      milliSeconds = 0;
    }
    // Add one because DateTime.now() is assumed to round down
    // to nearest millisecond, not up, so that time + duration is before
    // duration milliseconds from now. Using microsecond timers like
    // Stopwatch allows detecting that the timer fires early.
    int now = VMLibraryHooks.timerMillisecondClock();
    int wakeupTime = (milliSeconds == 0) ? now : (now + 1 + milliSeconds);

    _Timer timer =
        new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
    // Enqueue this newly created timer in the appropriate structure and
    // notify if necessary.
    timer._enqueue();
    return timer;
  }

在创建 timer 的时候,先获取了当前的时间戳,然后计算出 timer 的唤醒时间wakeupTime ,最后调用_Timer._internal创建 timer。

_Timer._internal只是简单创建了 Timer:

_Timer._internal(
      this._callback, this._wakeupTime, this._milliSeconds, this._repeating)
      : _id = _nextId();

在创建根据需要创建好 Timer 之后,使用_Timer._enqueue方法把 Timer 放入到相应的队列中。

timer._enqueue

主要看一下timer._enqueue()方法的实现:

// -> sdk\lib\_internal\vm\lib\timer_impl.dart

	// Timers are ordered by wakeup time. Timers with a timeout value of > 0 do
  // end up on the TimerHeap. Timers with a timeout of 0 are queued in a list.
  static final _heap = new _TimerHeap();
  static _Timer? _firstZeroTimer;
  static _Timer _lastZeroTimer = _sentinelTimer;

	// Adds a timer to the heap or timer list. Timers with the same wakeup time
  // are enqueued in order and notified in FIFO order.
  void _enqueue() {
    if (_milliSeconds == 0) {
      if (_firstZeroTimer == null) {
        _lastZeroTimer = this;
        _firstZeroTimer = this;
      } else {
        _lastZeroTimer._indexOrNext = this;
        _lastZeroTimer = this;
      }
      // Every zero timer gets its own event.
      _notifyZeroHandler();
    } else {
      _heap.add(this);
      if (_heap.isFirst(this)) {
        _notifyEventHandler();
      }
    }
  }

可以看到无论是单次还是循环的 Timer 最后都是使用_Timer._internal创建的,然后再使用_Timer._enqueue()方法将 timer 添加到heap或者timer list中:

  • 如果 Timer 的_milliSeconds为 0,则会被添加到_lastZeroTimer中(并将上一个 timer 的_indexOrNext 指向自己),并在_notifyZeroHandler()方法发送_ZERO_EVENT 事件(最终会触发_Timer._handleMessage );
  • 否则则将其加入到_heap中,如果他是第一个 timer,就通过_notifyEventHandler()启动处理 Timer 的 event handler(这个线程会在合适的时间唤起 Timer 执行_Timer._handleMessage方法)。

在具体分析整个过程之前,我们先看一下几个属性的创建过程:

// -> sdk\lib\_internal\vm\lib\timer_impl.dart

class _Timer implements Timer {

	static _RawReceivePortImpl? _receivePort;
  static SendPort? _sendPort;

// Tell the event handler to wake this isolate at a specific time.
  static void _scheduleWakeup(int wakeupTime) {
    if (!_receivePortActive) {
      _createTimerHandler();
    }
    VMLibraryHooks.eventHandlerSendData(null, _sendPort!, wakeupTime);
    _scheduledWakeupTime = wakeupTime;
  }

// Enqueue one message for each zero timer. To be able to distinguish from
  // EventHandler messages we send a _ZERO_EVENT instead of a _TIMEOUT_EVENT.
  static void _notifyZeroHandler() {
    if (!_receivePortActive) {
      _createTimerHandler();
    }
    _sendPort!.send(_ZERO_EVENT);
  }

	// Create a receive port and register a message handler for the timer
  // events.
  static void _createTimerHandler() {
    var receivePort = _receivePort;
    if (receivePort == null) {
      assert(_sendPort == null);
      final port = _RawReceivePortImpl('Timer');
      port.handler = _handleMessage;
      _sendPort = port.sendPort;
      _receivePort = port;
      _scheduledWakeupTime = 0;
    } else {
      receivePort._setActive(true);
    }
    _receivePortActive = true;
  }

}

从上面代码我们可以看到:

  • _sendPort_receivePort对应的 sendPort,后者的 handler 是_handleMessage()方法
  • 无论是_notifyEventHandler()还是 _notifyZeroHandler()都会保证_createTimerHandler()调用过
_milliSeconds == 0

先看一下_milliSeconds为 0 的情况:

// -> sdk\lib\_internal\vm\lib\timer_impl.dart

// Enqueue one message for each zero timer. To be able to distinguish from
  // EventHandler messages we send a _ZERO_EVENT instead of a _TIMEOUT_EVENT.
  static void _notifyZeroHandler() {
    if (!_receivePortActive) {
      _createTimerHandler();
    }
    _sendPort!.send(_ZERO_EVENT);
  }

按照上面的分析,_sendPort!.send(_ZERO_EVENT)发送的消息,通过MessageHandler::PostMessage处理,最后调用_receivePort的 handler 也就是在_handleMessage(msg)方法中执行。

_milliSeconds ≠ 0
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

static void _notifyEventHandler() {
    if (_handlingCallbacks) {
      // While we are already handling callbacks we will not notify the event
      // handler. _handleTimeout will call _notifyEventHandler once all pending
      // timers are processed.
      return;
    }

    // If there are no pending timers. Close down the receive port.
    if ((_firstZeroTimer == null) && _heap.isEmpty) {
      // No pending timers: Close the receive port and let the event handler
      // know.
      if (_sendPort != null) {
        _cancelWakeup();
        _shutdownTimerHandler();
      }
      return;
    } else if (_heap.isEmpty) {
      // Only zero timers are left. Cancel any scheduled wakeups.
      _cancelWakeup();
      return;
    }
    // Only send a message if the requested wakeup time differs from the
    // already scheduled wakeup time.
    var wakeupTime = _heap.first._wakeupTime;
    if ((_scheduledWakeupTime == 0) || (wakeupTime != _scheduledWakeupTime)) {
      _scheduleWakeup(wakeupTime);
    }
  }

// Tell the event handler to wake this isolate at a specific time.
  static void _scheduleWakeup(int wakeupTime) {
    if (!_receivePortActive) {
      _createTimerHandler();
    }
    VMLibraryHooks.eventHandlerSendData(null, _sendPort!, wakeupTime);
    _scheduledWakeupTime = wakeupTime;
  }

可见,当_milliSeconds ≠ 0 的时候,会将其加入到_heap中,如果当前的 timer 是_heap中第一个,则调用_notifyEventHandler()告诉 event handler在指定的时间唤起 isolate

这里主要的实现是VMLibraryHooks.eventHandlerSendData,他的实现如下:

// -> sdk\lib\_internal\vm\bin\common_patch.dart

("vm:entry-point", "call")
_setupHooks() {
  VMLibraryHooks.eventHandlerSendData = _EventHandler._sendData;
  VMLibraryHooks.timerMillisecondClock = _EventHandler._timerMillisecondClock;
}

// -> sdk\lib\_internal\vm\bin\eventhandler_patch.dart


class _EventHandler {
  
  ("vm:external-name", "EventHandler_SendData")
  external static void _sendData(Object? sender, SendPort sendPort, int data);

  ("vm:external-name", "EventHandler_TimerMillisecondClock")
  external static int _timerMillisecondClock();
}

// -> runtime\bin\eventhandler.cc

/*
 * Send data to the EventHandler thread to register for a given instance
 * args[0] a ReceivePort args[1] with a notification event args[2].
 */
void FUNCTION_NAME(EventHandler_SendData)(Dart_NativeArguments args) {
  // Get the id out of the send port. If the handle is not a send port
  // we will get an error and propagate that out.
  Dart_Handle handle = Dart_GetNativeArgument(args, 1);
  Dart_Port dart_port;
  handle = Dart_SendPortGetId(handle, &dart_port);
  if (Dart_IsError(handle)) {
    Dart_PropagateError(handle);
    UNREACHABLE();
  }
  Dart_Handle sender = Dart_GetNativeArgument(args, 0);
  intptr_t id;
  if (Dart_IsNull(sender)) {
    id = kTimerId;
  } else {
    Socket* socket = Socket::GetSocketIdNativeField(sender);
    ASSERT(dart_port != ILLEGAL_PORT);
    socket->set_port(dart_port);
    socket->Retain();  // inc refcount before sending to the eventhandler.
    id = reinterpret_cast<intptr_t>(socket);
  }
  int64_t data = DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 2));
  event_handler->SendData(id, dart_port, data);
}

他的实现在 native 层的EventHander中名为event handler的子线程中通过异步 IO 执行任务

// ->

class EventHandler {
 public:
  EventHandler() {}
  void SendData(intptr_t id, Dart_Port dart_port, int64_t data) {
    delegate_.SendData(id, dart_port, data);
  }

}

不同的系统实现不同,对于 Android 来说:

// -> runtime\bin\eventhandler_android.cc

void EventHandlerImplementation::SendData(intptr_t id,
                                          Dart_Port dart_port,
                                          int64_t data) {
  WakeupHandler(id, dart_port, data);
}

void EventHandlerImplementation::WakeupHandler(intptr_t id,
                                               Dart_Port dart_port,
                                               int64_t data) {
  InterruptMessage msg;
  msg.id = id;
  msg.dart_port = dart_port;
  msg.data = data;
  // WriteToBlocking will write up to 512 bytes atomically, and since our msg
  // is smaller than 512, we don't need a thread lock.
  // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
  ASSERT(kInterruptMessageSize < PIPE_BUF);
  intptr_t result =
      FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
  if (result != kInterruptMessageSize) {
    if (result == -1) {
      perror("Interrupt message failure:");
    }
    FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
  }
}

然后系统会在时间到了之后,会调用EventHandlerImplementation::HandleEvents通过_send_port发送消息,并触发_receivePort的 hander 也就是_handleMessage方法处理消息。

_handleMessage

无论是_milliSeconds == 0 的时候_sendPort!.send(_ZERO_EVENT);,还是_milliSeconds != 0 通过EventHandler发送_TIMEOUT_EVENT 消息,最终都会使用_handleMessage处理消息:

// -> sdk\lib\_internal\vm\lib\timer_impl.dart

static void _handleMessage(msg) {
    List<_Timer> pendingTimers;
    if (msg == _ZERO_EVENT) {
      pendingTimers = _queueFromZeroEvent();
      assert(pendingTimers.length > 0);
    } else {
      assert(msg == _TIMEOUT_EVENT);
      _scheduledWakeupTime = 0; // Consumed the last scheduled wakeup now.
      pendingTimers = _queueFromTimeoutEvent();
    }
    _runTimers(pendingTimers);
    // Notify the event handler or shutdown the port if no more pending
    // timers are present.
    _notifyEventHandler();
  }

_handleMessage中按照 msg 的类型取出对应的pendingTimers然后再_runTimers中执行,在执行完毕或者遇到错误时,调用_notifyEventHandler()通知event handler或者关闭TimerHandler

// -> sdk\lib\_internal\vm\lib\timer_impl.dart

static void _runTimers(List<_Timer> pendingTimers) {
    // If there are no pending timers currently reset the id space before we
    // have a chance to enqueue new timers.
    if (_heap.isEmpty && (_firstZeroTimer == null)) {
      _idCount = 0;
    }

    // Fast exit if no pending timers.
    if (pendingTimers.length == 0) {
      return;
    }

    // Trigger all of the pending timers. New timers added as part of the
    // callbacks will be enqueued now and notified in the next spin at the
    // earliest.
    _handlingCallbacks = true;
    var i = 0;
    try {
			// 在这里遍历处理所有的 pendingTimers
      for (; i < pendingTimers.length; i++) {
        // Next pending timer.
        var timer = pendingTimers[i];
        timer._indexOrNext = null;

        // One of the timers in the pending_timers list can cancel
        // one of the later timers which will set the callback to
        // null. Or the pending zero timer has been canceled earlier.
        var callback = timer._callback;
        if (callback != null) {
          if (!timer._repeating) {
            // Mark timer as inactive.
            timer._callback = null;
          } else if (timer._milliSeconds > 0) {
            var ms = timer._milliSeconds;
            int overdue =
                VMLibraryHooks.timerMillisecondClock() - timer._wakeupTime;
            if (overdue > ms) {
              int missedTicks = overdue ~/ ms;
              timer._wakeupTime += missedTicks * ms;
              timer._tick += missedTicks;
            }
          }
          timer._tick += 1;

          callback(timer);
          // Re-insert repeating timer if not canceled.
          if (timer._repeating && (timer._callback != null)) {
            timer._advanceWakeupTime();
            timer._enqueue();
          }
					// 每次执行完 event 之后,都要执行没有被执行的 micro task
          // Execute pending micro tasks.
          _runPendingImmediateCallback();
        }
      }
    } finally {
      _handlingCallbacks = false;
      // Re-queue timers we didn't get to.
      for (i++; i < pendingTimers.length; i++) {
        var timer = pendingTimers[i];
        timer._enqueue();
      }
      _notifyEventHandler();
    }
  }

这里可以看到,这里依次遍历传入的 pendingTimers,并在每次执行完 event 后,去检查执行一下 micro task。


根据创建 Timer 的时候_milliSeconds 是否等于 0:会分别使用MessageHandler执行或者在名为 event handler 的 IO 线程通过isolate中的MessageHandler来执行任务;最后都会触发 Timer 的_handleMessage方法在_runTimers方法中执行 callback。

结论

综上,dart 中的方法总共有 3 种,按照优先级从前到后依次是:

  1. 普通的同步方法
  2. micro task
  3. 其他 event:部分 Future、Timer、点击事件、屏幕刷新等

在方法执行的时候:

  1. 先执行完毕所有的同步方法;
  2. 然后判断是否有 micro task,有的话就立即执行;
  3. 否则,就执行普通的 event,每次执行完一个 event 就执行一次步骤 2;
  4. 直到当前 app 中既没有 micro task 也没有普通的 event,退出 app。

参考资料

dart sdkopen in new window

The Event Loop and Dartopen in new window

Flutter 之 Future 原理解析 - 掘金open in new window

Flutter 之 Timer 原理解析 - 掘金open in new window

Dart 官网open in new window

文章标题:《Dart event loop》
本文作者: JI,XIAOYONG
发布时间: 2022/06/11 18:25:29 UTC+8
更新时间: 2023/12/30 16:17:02 UTC+8
written by human, not by AI
本文地址: https://jixiaoyong.github.io/blog/posts/db62c118.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 许可协议。转载请注明出处!
你认为这篇文章怎么样?
  • 0
  • 0
  • 0
  • 0
  • 0
  • 0
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.8