Dart Isolate 源码分析
Isolate
💡 本文基于 Dart 2.17.1
Isolate, an isolated Dart execution context.
All Dart code runs in an isolate, and code can access classes and values only from the same isolate. Different isolates can communicate by sending values through ports (see ReceivePort, SendPort).
In Dart an isolate has its own event loop, its own global fields, can run in parallel with other isolates and have their own live-cycle.
— https://github.com/dart-lang/sdk/issues/36097#issuecomment-746510375
The new isolate has its own memory and its own thread working in parallel with the main isolate.
https://www.youtube.com/watch?v=NoVYI94MJio&ab_channel=Flutterly
Isolate 创建会占用内存,可以使用IsolateGroup
来解决,并且目前为止 Dart 和 Flutter 都默认支持在使用Isolate.spawn
创建新 Isolate 的时候使用 IsolateGroup(Isolate.spwanUri
创建的时候会创建单独的 IsolateGroup 和 Isolate)。
💡 在创建 isolate 的时候可以添加
addOnExitListener
或者addErrorListener
之类的监听,但是可能在执行添加代码的时候isolate 就已经终止了而导致这些方法收不到回调。
为了避免这种情况,可以在创建 isolate 的时候指定他的状态为**paused
**。
与 isolate 有关的类有:
Isolate
位置在sdk\lib\isolate\isolate.dart
。主要是Isolate
通用方法、属性的抽象描述,没有具体实现。Isolate
位置在sdk\lib\_internal\vm\lib\isolate_patch.dart
,是 app 等平台对应的具体实现,部分方法调用了 native 层的 Isolate 实现。Isolate
位置在runtime\vm\isolate.h
以及runtime\vm\isolate.cc
中,是 Isolate 的 native 层实现。
他们的关系大致如图:
简单使用
创建新 Isolate 的方式:
Isolate(``SendPort controlPort``, {this.pauseCapability, this.terminateCapability});
这种方式创建一种能力受限的 Isolate。The capabilities should be the subset of the capabilities that are available to the original isolate.本质上并没有在 native 层孵化一个新的 Isolate。
var newIsolate = Isolate(Isolate.current.controlPort);
newIsolate.addOnExitListener(Isolate.current.controlPort);
newIsolate.addErrorListener(Isolate.current.controlPort);
Future.delayed(Duration(seconds: 1),(){
newIsolate.kill();
print("try kill new isolate"); // after this,the dart code finish
});
print("finish");
Isolate.spawn(``void entryPoint(T message), T message,...)
创建一个和当前 Isolate共享同一份代码的 Isolate,并执行 entryPoint 方法,一般在 message 中传入 SendPort 以便从 entryPoint 中向来时的 Isolate 发送消息,新建的 Isolate 和当前 Isolate 在同一个 IsolateGroup 中。
void spawnIsolate() {
var receivePort = ReceivePort();
receivePort.listen((message) {
print("receivePort(${Isolate.current.debugName}) received msg: $message");
});
//创建一个和当前的 isolate 共享同一份代码的 Isolate
var isolate = Isolate.spawn((message) {
print("Isolate initial function(${Isolate.current.debugName}) received msg: $message");
(message as SendPort).send("HELLO_FORM_ISOLATE(${Isolate.current.debugName})");
}, receivePort.sendPort,debugName: "another_isolate");
}
Isolate.spawnUri(Uri uri,List<String> args,var message,...)
* 指定的uri
中创建并孵化一个 isolate,执行 uri 对应的 library 中的main
方法(0~2 个入参),并传入无参、args 或 message 作为参数
var receivePort = ReceivePort();
receivePort.listen((message) {
print("receivePort(${Isolate.current.debugName}) received msg: $message");
});
// 创建一个和当前的 isolate 共享同一份代码的 Isolate
var isolate = await Isolate.spawnUri(
Uri.file(
r"E:\workspace\others\flutter_dart_source_code_analysis\lib\dart\another_dart_file_to_spawn_uri.dart"),
[],
receivePort.sendPort,
debugName: "another_isolate");
Future.delayed(const Duration(seconds: 2), () {
receivePort.close();
isolate.kill(priority: Isolate.immediate);
print("try kill new isolate");
});
使用方法
pause
Capability pause([Capability? resumeCapability])
,暂停 Isolate,停止从*event loop queue
* 中取(并处理)消息,但是依然可以往里面加入消息
resumeCapability
是用来区分 pause 的,必须使用同一个*resumeCapability
*来 resume isolate。
- 使用同一个*
resumeCapability
*多次pause
,只需一次resume
就可以恢复isolate
- 使用不同*
resumeCapability
多次pause
,必须使用对应的resumeCapability
依次resume
才可以恢复isolate
(注意:这里也只需要使用当时 pause isolate 的resumeCapability
* 依次调用 resume 即可,而不用保持次数一致,比如,有 2 个*resumeCapability
,*调用 pause 次数分别为 a 1,b 2,那么要想 resume isolate,也只需要分别使用 a,b 调用一次 resume 即可)
ping
使用 isolate 往receivePort.sendPort
发送 response 消息,即使 isolate 当前被 pause 也可以正常发送
isolate.pause();
isolate.ping(receivePort.sendPort, response: "is isolate resume?");//receivePort 依然可以收到消息
ping 可以正常发送的原因是:
// -> lib\isolate\isolate.dart
// ping 方法是一个 external 方法
external void ping(SendPort responsePort,
{Object? response, int priority = immediate});
// -> sdk/lib/_internal/vm/lib/isolate_patch.dart
@patch
void ping(SendPort responsePort,
{Object? response, int priority: immediate}) {
var msg = new List<Object?>.filled(5, null)
..[0] = 0 // Make room for OOM message type.
..[1] = _PING
..[2] = responsePort
..[3] = priority
..[4] = response;
_sendOOB(controlPort, msg);
}
@pragma("vm:external-name", "Isolate_sendOOB")
external static void _sendOOB(port, msg);
// -> runtime/lib/isolate.cc
// 创建了一个 oob 消息并压入 oob_queue_
DEFINE_NATIVE_ENTRY(Isolate_sendOOB, 0, 2) {
GET_NON_NULL_NATIVE_ARGUMENT(SendPort, port, arguments->NativeArgAt(0));
GET_NON_NULL_NATIVE_ARGUMENT(Array, msg, arguments->NativeArgAt(1));
// Make sure to route this request to the isolate library OOB mesage handler.
msg.SetAt(0, Smi::Handle(Smi::New(Message::kIsolateLibOOBMsg)));
// Ensure message writer (and it's resources, e.g. forwarding tables) are
// cleaned up before handling interrupts.
{
PortMap::PostMessage(WriteMessage(/* can_send_any_object */ false,
/* same_group */ false, msg, port.Id(),
Message::kOOBPriority));
}
// Drain interrupts before running so any IMMEDIATE operations on the current
// isolate happen synchronously.
const Error& error = Error::Handle(thread->HandleInterrupts());
if (!error.IsNull()) {
Exceptions::PropagateError(error);
UNREACHABLE();
}
return Object::null();
}
在 MessageHandler 中有两种 MessageQueue:oob_queue_
和queue_
,前者优先级高,即使 isolate 被 pause 也会执行
// -> runtime\vm\message_handler.h
// 普通消息,暂停时不能处理
MessageQueue* queue_;
// 优先消息,即使处理消息时,优先处理 obb_queue 消息,如果为空再去考虑处理普通消息
// 即使 isolate 被 pause 也可以被处理
MessageQueue* oob_queue_;
像是ping
/kill
/pause
/addOnExitListener
/removeOnExitListener
这些指令消息都是压入到obb_queue_
中优先处理的。
源码分析
先看一下常用的几个方法是怎么实现的。
获取当前 Isolate
(sdk/lib/isolate/isolate.dart)Isolate.current
→
(sdk/lib/_internal/vm/lib/isolate_patch.dart) Isolate get current
→ Isolate._getCurrentIsolate()
→ _getPortAndCapabilitiesOfCurrentIsolate()
(runtime/lib/isolate.cc)DEFINE_NATIVE_ENTRY(Isolate_getPortAndCapabilitiesOfCurrentIsolate, 0, 0)
先看一下sdk/lib/_internal/vm/lib/isolate_patch.dart中的实现:
// -> sdk/lib/_internal/vm/lib/isolate_patch.dart
static final _currentIsolate = _getCurrentIsolate();
@patch
static Isolate get current => _currentIsolate;
static Isolate _getCurrentIsolate() {
List portAndCapabilities = _getPortAndCapabilitiesOfCurrentIsolate();
// 这里的参数分别是 SendPort,Capability,Capability
return new Isolate(portAndCapabilities[0],
pauseCapability: portAndCapabilities[1],
terminateCapability: portAndCapabilities[2]);
}
@pragma("vm:external-name", "Isolate_getPortAndCapabilitiesOfCurrentIsolate")
external static List _getPortAndCapabilitiesOfCurrentIsolate();
可以看到,最后是根据 native 端返回的信息,新建了一个 Isolate 引用,但是因为_currentIsolate
是static final
的,所以只会被调用一次,确保了在 Dart SDK 中调用Isolate.current
时获取的是当前唯一的 Isolate。
让我们看一下在 native 中是如何找到当前的 Isolate 的:
// -> \runtime\lib\isolate.cc
DEFINE_NATIVE_ENTRY(Isolate_getPortAndCapabilitiesOfCurrentIsolate, 0, 0) {
const Array& result = Array::Handle(Array::New(3));
result.SetAt(0, SendPort::Handle(SendPort::New(isolate->main_port())));
result.SetAt(
1, Capability::Handle(Capability::New(isolate->pause_capability())));
result.SetAt(
2, Capability::Handle(Capability::New(isolate->terminate_capability())));
return result.ptr();
}
可见是直接取的当前线程对应的 isolate对应的值,经过包装再返回到调用方。
创建 Isolate
在 Dart 中创建 Isolate 有 3 种方式:
Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability});
*create an* isolate,本质上只是将controlPort
等设置为传入的对象,并没有在 native 层新建 IsolateIsolate.spawn
create and spawns an isolateIsolate.spawnUri
create and spawns an isolate
这里分析一下后面两种方式,对比一下差异:
// -> sdk\lib\isolate\isolate.dart
// Creates a new [Isolate] object with a restricted set of capabilities.
Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability});
/// Creates and spawns an isolate that shares the same code as the current
/// isolate.
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? onError,
@Since("2.3") String? debugName});
/// Creates and spawns an isolate that runs the code from the library with
/// the specified URI.
///
/// The isolate starts executing the top-level `main` function of the library
/// with the given URI.
external static Future<Isolate> spawnUri(
Uri uri,
List<String> args,
var message,
{bool paused = false,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal = true,
bool? checked,
Map<String, String>? environment,
@Deprecated('The packages/ dir is not supported in Dart 2')
Uri? packageRoot,
Uri? packageConfig,
bool automaticPackageResolution = false,
@Since("2.3")
String? debugName});
对于 APP 等来说,上述Isolate.spawn
和Isolate.spawnUri
的实现都在vm
下面的isolate_patch.dart
中(js 会返回_unsupported()
):
// -> sdk\lib\_internal\vm\lib\isolate_patch.dart
@patch
static Future<Isolate> spawn<T>(void entryPoint(T message), T message,
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? onError,
String? debugName}) async {
// `paused` isn't handled yet.
// Check for the type of `entryPoint` on the spawning isolate to make
// error-handling easier.
if (entryPoint is! _UnaryFunction) {
throw new ArgumentError(entryPoint);
}
// The VM will invoke [_startIsolate] with entryPoint as argument.
// We do not inherit the package config settings from the parent isolate,
// instead we use the values that were set on the command line.
...
final RawReceivePort readyPort =
new RawReceivePort(null, 'Isolate.spawn ready');
try {
**_spawnFunction**(readyPort.sendPort, script.toString(), entryPoint, message,
paused, errorsAreFatal, onExit, onError, packageConfig, debugName);
return await **_spawnCommon**(readyPort);
} catch (e, st) {
readyPort.close();
return await new Future<Isolate>.error(e, st);
}
}
@patch
static Future<Isolate> spawnUri(Uri uri, List<String> args, var message,
{bool paused = false,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal = true,
bool? checked,
Map<String, String>? environment,
Uri? packageRoot,
Uri? packageConfig,
bool automaticPackageResolution = false,
String? debugName}) async {
// Verify that no mutually exclusive arguments have been passed.
...
// Resolve the uri against the current isolate's root Uri first.
...
// The VM will invoke [_startIsolate] and not `main`.
final packageConfigString = packageConfig?.toString();
final RawReceivePort readyPort =
new RawReceivePort(null, 'Isolate.spawnUri ready');
try {
**_spawnUri**(
readyPort.sendPort,
spawnedUri.toString(),
args,
message,
paused,
onExit,
onError,
errorsAreFatal,
checked,
null,
/* environment */
packageConfigString,
debugName);
return await **_spawnCommon**(readyPort);
} catch (e) {
readyPort.close();
rethrow;
}
}
// Isolate.spawn call
@pragma("vm:external-name", "Isolate_spawnFunction")
external static void _spawnFunction(
SendPort readyPort,
String uri,
Function topLevelFunction,
var message,
bool paused,
bool errorsAreFatal,
SendPort? onExit,
SendPort? onError,
String? packageConfig,
String? debugName);
// Isolate.spawnUri call
@pragma("vm:external-name", "Isolate_spawnUri")
external static void _spawnUri(
SendPort readyPort,
String uri,
List<String> args,
var message,
bool paused,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal,
bool? checked,
List? environment,
String? packageConfig,
String? debugName);
// 监听 Isolate spawn 状态,等成功之后将其处理后返回给 Dart 层的调用者
static Future<Isolate> _spawnCommon(RawReceivePort readyPort) {
final completer = new Completer<Isolate>.sync();
readyPort.handler = (readyMessage) {
readyPort.close();
if (readyMessage is List && readyMessage.length == 2) {
SendPort controlPort = readyMessage[0];
List capabilities = readyMessage[1];
**completer.complete(new Isolate(controlPort,
pauseCapability: capabilities[0],
terminateCapability: capabilities[1]));**
} else if (readyMessage is String) {
// We encountered an error while starting the new isolate.
completer.completeError(new IsolateSpawnException(
'Unable to spawn isolate: ${readyMessage}'));
} else {
// This shouldn't happen.
completer.completeError(new IsolateSpawnException(
"Internal error: unexpected format for ready message: "
"'${readyMessage}'"));
}
};
return completer.future;
}
其实,根据上述的代码,不管是Isolate.spawnUri()
还是Isolate.spawn
,都是先调用RawReceivePort
获取RawReceivePort readyPort
,最后都是调用了_spawnCommon(readyPort)
方法,最终通过new Isolate(controlPort, pauseCapability: capabilities[0], terminateCapability: capabilities[1])
方法创建了新的Isolate
。
这个方法的定义在sdk/lib/isolate/isolate.dart
中:
// -> sdk/lib/isolate/isolate.dart
final SendPort controlPort;
Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability});
可以看到,在 Dart 中,我们拿到的 Isolate 主要是持有一个和 native 中对应 SendPort。
通过上面的分析:
Isolate.spawn
最后调用了_spawnFunction
方法(native 层实现为Isolate_spawnFunction
);Isolate.spawnUri
最后调用了_spawnUri
方法(native 层实现为Isolate_spawnUri
)。
💡
new RawReceivePort()
方法主要是创建一个不存在于_RawReceivePortImpl
的static final _portMap = <int, Map<String, dynamic>>{};
中的 SendPort(具体实现在PortMap::CreatePort
中)。
Isolate_spawnFunction
Isolate.spawn
最后调用了_spawnFunction
方法,来看一下对应的Isolate_spawnFunction
的实现:
// -> runtime\lib\isolate.cc
DEFINE_NATIVE_ENTRY(Isolate_spawnFunction, 0, 10) {
// 解析参数
...
// closure_tuple_handle 对应我们在 Dart 中 Isolate.spawn() 中传入的 entryPoint
// 也就是 isolate 创建好以后执行的方法
std::unique_ptr<IsolateSpawnState> state(new IsolateSpawnState(
port.Id(), isolate->origin_id(), String2UTF8(script_uri),
closure_tuple_handle, &message_buffer, utf8_package_config,
paused.value(), fatal_errors, on_exit_port, on_error_port,
utf8_debug_name, **isolate->group()**));
// Since this is a call to Isolate.spawn, copy the parent isolate's code.
state->isolate_flags()->copy_parent_code = true;
**isolate->group()**->thread_pool()->Run<SpawnIsolateTask>(isolate,
std::move(state));
return Object::null();
}
可见,Isolate_spawnFunction
方法中主要还是解析收到的各种参数,最后在当前 isolate 对应的 IsolateGroup 的线程池中执行SpawnIsolateTask
:
SpawnIsolateTask
// -> runtime\lib\isolate.cc
class SpawnIsolateTask : public ThreadPool::Task {
SpawnIsolateTask(Isolate* parent_isolate,
std::unique_ptr<IsolateSpawnState> state)
: parent_isolate_(parent_isolate), state_(std::move(state)) {
parent_isolate->IncrementSpawnCount();
}
void Run() override {
const char* name = (state_->debug_name() == nullptr)
? state_->function_name()
: state_->debug_name();
ASSERT(name != nullptr);
auto group = state_->isolate_group();
if (group == nullptr) {
RunHeavyweight(name);
} else {
RunLightweight(name);
}
}
}
RunLightWeight
因为这里我们的isolate→group
不为空,所以走的是RunLightWeight
:
// -> runtime\lib\isolate.cc
void RunLightweight(const char* name) {
// The create isolate initialize callback is mandatory.
auto initialize_callback = **Isolate::InitializeCallback();**
if (initialize_callback == nullptr) {
FailedSpawn(
"Lightweight isolate spawn is not supported by this Dart embedder\n",
/*has_current_isolate=*/false);
return;
}
char* error = nullptr;
auto group = state_->isolate_group();
**Isolate* isolate = CreateWithinExistingIsolateGroup(group, name, &error);**
parent_isolate_->DecrementSpawnCount();
parent_isolate_ = nullptr;
if (isolate == nullptr) {
FailedSpawn(error, /*has_current_isolate=*/false);
free(error);
return;
}
void* child_isolate_data = nullptr;
**const bool success = initialize_callback(&child_isolate_data, &error);**
if (!success) {
FailedSpawn(error);
Dart_ShutdownIsolate();
free(error);
return;
}
**isolate->set_init_callback_data(child_isolate_data);
// 注意这里的 Run 方法,在**RunHeavyweight 方法的最后也调用了
// 到时候会一起分析一下
**Run(isolate);**
}
// -> runtime\vm\dart_api_impl.cc
Isolate* CreateWithinExistingIsolateGroup(IsolateGroup* group,
const char* name,
char** error) {
API_TIMELINE_DURATION(Thread::Current());
CHECK_NO_ISOLATE(Isolate::Current());
auto spawning_group = group;
**Isolate* isolate =** reinterpret_cast<Isolate*>(
**CreateIsolate**(spawning_group, /*is_new_group=*/false, name,
/*isolate_data=*/nullptr, error));
if (isolate == nullptr) return nullptr;
// 因为执行到这里的都有 IsolateGroup,共享同一份代码
auto source = spawning_group->source();
ASSERT(isolate->source() == source);
return isolate;
}
这里主要进行了 2 步:
- 使用
CreateWithinExistingIsolateGroup
创建 Isolate - 使用全局的
initialize_callback
(也就是Isolate::InitializeCallback()
)初始化 Isolate
Isolate::InitializeCallback()
这其中的Isolate::InitializeCallback()
是在Dart::Init
的时候就已经设置了的:
// -> runtime/bin/main.cc
void main(int argc, char** argv) {
...
// Initialize the Dart VM.
Dart_InitializeParams init_params;
init_params.version = DART_INITIALIZE_PARAMS_CURRENT_VERSION;
init_params.vm_snapshot_data = vm_snapshot_data;
init_params.vm_snapshot_instructions = vm_snapshot_instructions;
**init_params.create_group = CreateIsolateGroupAndSetup;**
**init_params.initialize_isolate = OnIsolateInitialize;**
init_params.shutdown_isolate = OnIsolateShutdown;
init_params.cleanup_isolate = DeleteIsolateData;
init_params.cleanup_group = DeleteIsolateGroupData;
init_params.file_open = DartUtils::OpenFile;
init_params.file_read = DartUtils::ReadFile;
init_params.file_write = DartUtils::WriteFile;
init_params.file_close = DartUtils::CloseFile;
init_params.entropy_source = DartUtils::EntropySource;
error = Dart_Initialize(&init_params);
}
// -> runtime\vm\dart_api_impl.cc
DART_EXPORT char* Dart_Initialize(Dart_InitializeParams* params) {
if (params == NULL) {
return Utils::StrDup(
"Dart_Initialize: "
"Dart_InitializeParams is null.");
}
if (params->version != DART_INITIALIZE_PARAMS_CURRENT_VERSION) {
return Utils::StrDup(
"Dart_Initialize: "
"Invalid Dart_InitializeParams version.");
}
return Dart::Init(params);
}
// -> runtime\vm\dart.cc
char* Dart::Init(const Dart_InitializeParams* params) {
if (!init_state_.SetInitializing()) {
return Utils::StrDup(
"Bad VM initialization state, "
"already initialized or "
"multiple threads initializing the VM.");
}
char* retval = DartInit(params);
if (retval != NULL) {
init_state_.ResetInitializing();
return retval;
}
init_state_.SetInitialized();
return NULL;
}
char* Dart::DartInit(const Dart_InitializeParams* params) {
...
OSThread::Init();
Zone::Init();
IsolateGroup::Init();
Isolate::InitVM();
PortMap::Init();
Service::Init();
...
Thread::ExitIsolate(); // Unregister the VM isolate from this thread.
**Isolate::SetCreateGroupCallback(params->create_group);**
**Isolate::SetInitializeCallback_(params->initialize_isolate);**
Isolate::SetShutdownCallback(params->shutdown_isolate);
Isolate::SetCleanupCallback(params->cleanup_isolate);
Isolate::SetGroupCleanupCallback(params->cleanup_group);
Isolate::SetRegisterKernelBlobCallback(params->register_kernel_blob);
Isolate::SetUnregisterKernelBlobCallback(params->unregister_kernel_blob);
...
}
也就是说,上文的Isolate::InitializeCallback()
实际上就是OnIsolateInitialize
,它的主要作用就是在 isolate 创建好之后进行统一的初始化操作,绑定一些数据:
// -> runtime\bin\main.cc
static bool OnIsolateInitialize(void** child_callback_data, char** error) {
Dart_Isolate isolate = Dart_CurrentIsolate();
ASSERT(isolate != nullptr);
auto isolate_group_data =
reinterpret_cast<IsolateGroupData*>(Dart_CurrentIsolateGroupData());
auto isolate_data = new IsolateData(isolate_group_data);
*child_callback_data = isolate_data;
Dart_EnterScope();
const auto **script_uri** = isolate_group_data->script_url;
const bool **isolate_run_app_snapshot** =
isolate_group_data->RunFromAppSnapshot();
Dart_Handle **result** = SetupCoreLibraries(isolate, isolate_data,
/*group_start=*/false,
/*resolved_packages_config=*/nullptr);
if (Dart_IsError(result)) goto failed;
if (isolate_run_app_snapshot) {
result = Loader::InitForSnapshot(script_uri, isolate_data);
if (Dart_IsError(result)) goto failed;
} else {
result = DartUtils::ResolveScript(Dart_NewStringFromCString(script_uri));
if (Dart_IsError(result)) goto failed;
if (isolate_group_data->kernel_buffer() != nullptr) {
// Various core-library parts will send requests to the Loader to resolve
// relative URIs and perform other related tasks. We need Loader to be
// initialized for this to work because loading from Kernel binary
// bypasses normal source code loading paths that initialize it.
const char* resolved_script_uri = NULL;
result = Dart_StringToCString(result, &resolved_script_uri);
if (Dart_IsError(result)) goto failed;
result = Loader::InitForSnapshot(resolved_script_uri, isolate_data);
if (Dart_IsError(result)) goto failed;
}
}
Dart_ExitScope();
return true;
failed:
*error = Utils::StrDup(Dart_GetError(result));
Dart_ExitScope();
return false;
}
CreateWithinExistingIsolateGroup
CreateWithinExistingIsolateGroup
→ CreateIsolate
再看一下创建 Isolate 的具体方法,这个在不同的 device 上面不一样,我们只关注 vm 下面的实现:
// -> runtime\vm\dart_api_impl.cc
static Dart_Isolate CreateIsolate(IsolateGroup* group,
bool is_new_group,
const char* name,
void* isolate_data,
char** error) {
**CHECK_NO_ISOLATE**(Isolate::Current());
auto source = group->source();
**Isolate* I = Dart::CreateIsolate(name, source->flags, group);**
if (I == NULL) {
if (error != NULL) {
*error = Utils::StrDup("Isolate creation failed");
}
return reinterpret_cast<Dart_Isolate>(NULL);
}
Thread* T = Thread::Current();
bool success = false;
{
StackZone zone(T);
// We enter an API scope here as InitializeIsolate could compile some
// bootstrap library files which call out to a tag handler that may create
// Api Handles when an error is encountered.
T->EnterApiScope();
const Error& error_obj = Error::Handle(
Z, **Dart::InitializeIsolate(
source->snapshot_data, source->snapshot_instructions,
source->kernel_buffer, source->kernel_buffer_size,
is_new_group ? nullptr : group, isolate_data)**);
if (error_obj.IsNull()) {
#if defined(DEBUG) && !defined(DART_PRECOMPILED_RUNTIME)
if (FLAG_check_function_fingerprints && !FLAG_precompiled_mode) {
Library::CheckFunctionFingerprints();
}
#endif // defined(DEBUG) && !defined(DART_PRECOMPILED_RUNTIME).
success = true;
} else if (error != NULL) {
*error = Utils::StrDup(error_obj.ToErrorCString());
}
// We exit the API scope entered above.
T->ExitApiScope();
}
if (success) {
if (is_new_group) {
group->heap()->InitGrowthControl();
}
// A Thread structure has been associated to the thread, we do the
// safepoint transition explicitly here instead of using the
// TransitionXXX scope objects as the reverse transition happens
// outside this scope in Dart_ShutdownIsolate/Dart_ExitIsolate.
T->set_execution_state(Thread::kThreadInNative);
T->EnterSafepoint();
if (error != NULL) {
*error = NULL;
}
return Api::CastIsolate(I);
}
Dart::ShutdownIsolate();
return reinterpret_cast<Dart_Isolate>(NULL);
}
这里主要有两步:
Dart::CreateIsolate
创建了Isolate* I
;- 然后调用
Dart::InitializeIsolate
初始化 isolate。
Dart::CreateIsolate:
// -> runtime\vm\dart.cc
Isolate* Dart::CreateIsolate(const char* name_prefix,
const Dart_IsolateFlags& api_flags,
IsolateGroup* isolate_group) {
// Create a new isolate.
Isolate* isolate =
Isolate::InitIsolate(name_prefix, isolate_group, api_flags);
return isolate;
}
💡 在 Dart 虚拟机启动(
Dart::DartInit
)的时候,也会调用Dart::InitIsolate
创建虚拟机对应的 Isolate,执行 UI 操作:vm_isolate_ = Isolate::InitIsolate(kVmIsolateName, group, api_flags, is_vm_isolate);
在Isolate::InitIsolate
方法中,先是用isolate_group
创建了新的 Isolate,然后将其与Thread
,MessageHandler
,SendPort
等绑定:
// -> runtime\vm\isolate.cc
Isolate* Isolate::InitIsolate(const char* name_prefix,
IsolateGroup* isolate_group,
const Dart_IsolateFlags& api_flags,
bool is_vm_isolate) {
// 创建新的 Isolate
**Isolate* result = new Isolate(isolate_group, api_flags);**
result->BuildName(name_prefix);
if (!is_vm_isolate) {
// vm isolate object store is initialized later, after null instance
// is created (in Dart::Init).
// Non-vm isolates need to have isolate object store initialized is that
// exit_listeners have to be null-initialized as they will be used if
// we fail to create isolate below, have to do low level shutdown.
ASSERT(result->group()->object_store() != nullptr);
result->isolate_object_store()->Init();
}
ASSERT(result != nullptr);
#if !defined(PRODUCT)
// Initialize metrics.
#define ISOLATE_METRIC_INIT(type, variable, name, unit) \
result->metric_##variable##_.InitInstance(result, name, NULL, Metric::unit);
ISOLATE_METRIC_LIST(ISOLATE_METRIC_INIT);
#undef ISOLATE_METRIC_INIT
#endif // !defined(PRODUCT)
// First we ensure we enter the isolate. This will ensure we're participating
// in any safepointing requests from this point on. Other threads requesting a
// safepoint operation will therefore wait until we've stopped.
//
// Though the [result] isolate is still in a state where no memory has been
// allocated, which means it's safe to GC the isolate group until here.
// 创建一个 Thread 并和当前 isolate 绑定
if (!**Thread::EnterIsolate(result)**) {
delete result;
return nullptr;
}
// Setup the isolate message handler.
MessageHandler* handler = new IsolateMessageHandler(result);
ASSERT(handler != nullptr);
// 在这里绑定了 message handler
**result->set_message_handler(handler);**
**result->set_main_port(PortMap::CreatePort(result->message_handler()));**
#if defined(DEBUG)
// Verify that we are never reusing a live origin id.
VerifyOriginId id_verifier(result->main_port());
Isolate::VisitIsolates(&id_verifier);
#endif
result->set_origin_id(result->main_port());
result->set_pause_capability(result->random()->NextUInt64());
result->set_terminate_capability(result->random()->NextUInt64());
#if !defined(PRODUCT)
result->debugger_ = new Debugger(result);
#endif
// Now we register the isolate in the group. From this point on any GC would
// traverse the isolate roots (before this point, the roots are only pointing
// to vm-isolate objects, e.g. null)
**isolate_group->RegisterIsolate(result);**
if (ServiceIsolate::NameEquals(name_prefix)) {
ASSERT(!ServiceIsolate::Exists());
ServiceIsolate::SetServiceIsolate(result);
#if !defined(DART_PRECOMPILED_RUNTIME)
} else if (KernelIsolate::NameEquals(name_prefix)) {
ASSERT(!KernelIsolate::Exists());
KernelIsolate::SetKernelIsolate(result);
#endif // !defined(DART_PRECOMPILED_RUNTIME)
}
if (FLAG_trace_isolates) {
if (name_prefix == nullptr || strcmp(name_prefix, "vm-isolate") != 0) {
OS::PrintErr(
"[+] Starting isolate:\n"
"\tisolate: %s\n",
result->name());
}
}
// Add to isolate list. Shutdown and delete the isolate on failure.
if (!TryMarkIsolateReady(result)) {
result->LowLevelShutdown();
Isolate::LowLevelCleanup(result);
return nullptr;
}
return result;
}
Dart::InitializeIsolate
这里主要是对 isolate 进行初始化,并在初始化完成后通知创建这个 isolate 的 isolate。
// -> runtime\vm\dart.cc
ErrorPtr Dart::InitializeIsolate(const uint8_t* snapshot_data,
const uint8_t* snapshot_instructions,
const uint8_t* kernel_buffer,
intptr_t kernel_buffer_size,
IsolateGroup* source_isolate_group,
void* isolate_data) {
// Initialize the new isolate.
Thread* T = Thread::Current();
Isolate* I = T->isolate();
auto IG = T->isolate_group();
#if defined(SUPPORT_TIMELINE)
TimelineBeginEndScope tbes(T, Timeline::GetIsolateStream(),
"InitializeIsolate");
tbes.SetNumArguments(1);
tbes.CopyArgument(0, "isolateName", I->name());
#endif
ASSERT(I != NULL);
StackZone zone(T);
HandleScope handle_scope(T);
bool was_child_cloned_into_existing_isolate = false;
if (source_isolate_group != nullptr) {
// If a static field gets registered in [IsolateGroup::RegisterStaticField]:
//
// * before this block it will ignore this isolate. The [Clone] of the
// initial field table will pick up the new value.
// * after this block it will add the new static field to this isolate.
{
SafepointReadRwLocker reader(T, source_isolate_group->program_lock());
**I->set_field_table**(T,
source_isolate_group->initial_field_table()->Clone(I));
**I->field_table()->MarkReadyToUse();**
}
was_child_cloned_into_existing_isolate = true;
} else {
const Error& error = Error::Handle(
// 从 IsolateGroup 中引用一些通用的变量(常量等等)
**InitIsolateFromSnapshot**(T, I, snapshot_data, snapshot_instructions,
kernel_buffer, kernel_buffer_size));
if (!error.IsNull()) {
return error.ptr();
}
}
Object::VerifyBuiltinVtables();
if (T->isolate()->origin_id() == 0) {
DEBUG_ONLY(IG->heap()->Verify(kForbidMarked));
}
#if defined(DART_PRECOMPILED_RUNTIME)
const bool kIsAotRuntime = true;
#else
const bool kIsAotRuntime = false;
#endif
if (kIsAotRuntime || was_child_cloned_into_existing_isolate) {
#if !defined(TARGET_ARCH_IA32)
ASSERT(IG->object_store()->build_generic_method_extractor_code() !=
Code::null());
ASSERT(IG->object_store()->build_nongeneric_method_extractor_code() !=
Code::null());
#endif
} else {
#if !defined(TARGET_ARCH_IA32)
if (I != Dart::vm_isolate()) {
if (IG->object_store()->build_generic_method_extractor_code() !=
nullptr) {
SafepointWriteRwLocker ml(T, IG->program_lock());
if (IG->object_store()->build_generic_method_extractor_code() !=
nullptr) {
IG->object_store()->set_build_generic_method_extractor_code(
Code::Handle(
StubCode::GetBuildGenericMethodExtractorStub(nullptr)));
}
}
if (IG->object_store()->build_nongeneric_method_extractor_code() !=
nullptr) {
SafepointWriteRwLocker ml(T, IG->program_lock());
if (IG->object_store()->build_nongeneric_method_extractor_code() !=
nullptr) {
IG->object_store()->set_build_nongeneric_method_extractor_code(
Code::Handle(
StubCode::GetBuildNonGenericMethodExtractorStub(nullptr)));
}
}
}
#endif // !defined(TARGET_ARCH_IA32)
}
I->set_ic_miss_code(StubCode::SwitchableCallMiss());
Error& error = Error::Handle();
if (snapshot_data == nullptr || kernel_buffer != nullptr) {
error ^= IG->object_store()->PreallocateObjects();
if (!error.IsNull()) {
return error.ptr();
}
}
const auto& out_of_memory =
Object::Handle(IG->object_store()->out_of_memory());
error ^= I->isolate_object_store()->PreallocateObjects(out_of_memory);
if (!error.IsNull()) {
return error.ptr();
}
if (!was_child_cloned_into_existing_isolate) {
IG->heap()->InitGrowthControl();
}
I->set_init_callback_data(isolate_data);
if (FLAG_print_class_table) {
IG->class_table()->Print();
}
#if !defined(PRODUCT)
ServiceIsolate::MaybeMakeServiceIsolate(I);
if (!Isolate::IsSystemIsolate(I)) {
I->message_handler()->set_should_pause_on_start(
FLAG_pause_isolates_on_start);
I->message_handler()->set_should_pause_on_exit(FLAG_pause_isolates_on_exit);
}
#endif // !defined(PRODUCT)
ServiceIsolate::SendIsolateStartupMessage();
#if !defined(PRODUCT)
I->debugger()->NotifyIsolateCreated();
#endif
// Create tag table.
I->set_tag_table(GrowableObjectArray::Handle(GrowableObjectArray::New()));
// Set up default UserTag.
const UserTag& default_tag = UserTag::Handle(UserTag::DefaultTag());
I->set_current_tag(default_tag);
I->init_loaded_prefixes_set_storage();
return Error::null();
}
可以看到,如果是调用Isolate.spawn()
的话,先从当前 isolate 获取对应的 Isolate Group,然后使用这个 Isolate Group 创建配置一个新的 isolate,这样在同一个 isolate group 中的 Isolate 可以共享常量,heap 等。
Isolate_spawnUri
如果是使用Isolate.spawnUri()
的话,就会通过Isolate_spawnUri
来创建 isolate。
// -> runtime\lib\isolate.cc
DEFINE_NATIVE_ENTRY(Isolate_spawnUri, 0, 12) {
// 解析参数
...
// Canonicalize the uri with respect to the current isolate.
const Library& root_lib =
Library::Handle(isolate->group()->object_store()->root_library());
char* error = NULL;
// 获取 canonical_uri
const char* **canonical_uri = CanonicalizeUri**(thread, root_lib, uri, &error);
if (canonical_uri == NULL) {
const String& msg = String::Handle(String::New(error));
ThrowIsolateSpawnException(msg);
}
const char* utf8_package_config =
packageConfig.IsNull() ? NULL : String2UTF8(packageConfig);
const char* utf8_debug_name =
debugName.IsNull() ? NULL : String2UTF8(debugName);
std::unique_ptr<IsolateSpawnState> state(new **IsolateSpawnState**(
port.Id(), canonical_uri, utf8_package_config, &arguments_buffer,
&message_buffer, paused.value(), fatal_errors, on_exit_port,
// 注意下面这里的 group=nullptr
on_error_port, utf8_debug_name, **/*group=*/nullptr**));
// If we were passed a value then override the default flags state for
// checked mode.
if (!checked.IsNull()) {
Dart_IsolateFlags* flags = state->isolate_flags();
flags->enable_asserts = checked.value();
}
// Since this is a call to Isolate.spawnUri, don't copy the parent's code.
state->isolate_flags()->copy_parent_code = false;
isolate->group()->thread_pool()->**Run<SpawnIsolateTask>**(isolate,
std::move(state));
return Object::null();
}
可以看到Isolate_spawnUri
还是执行了SpawnIsolateTask
。
SpawnIsolateTask
在SpawnIsolateTask.Run
方法中,因为spawnUri
中IsolateSpawnState
的IsolateGroup
为nulltrp
,所以这里执行的是RunHeavyweight(name)
:
RunHeavyweight
// ->
class SpawnIsolateTask : public ThreadPool::Task {
void Run() override {
const char* name = (state_->debug_name() == nullptr)
? state_->function_name()
: state_->debug_name();
ASSERT(name != nullptr);
auto group = state_->isolate_group();
if (group == nullptr) {
**RunHeavyweight(name);**
} else {
RunLightweight(name);
}
}
void RunHeavyweight(const char* name) {
// The create isolate group callback is mandatory. If not provided we
// cannot spawn isolates.
// 在 Dart::DartInit 中已经被设置,在 Isolate 创建时会被回调
**auto create_group_callback = Isolate::CreateGroupCallback();**
if (create_group_callback == nullptr) {
FailedSpawn("Isolate spawn is not supported by this Dart embedder\n");
return;
}
char* error = nullptr;
// Make a copy of the state's isolate flags and hand it to the callback.
Dart_IsolateFlags api_flags = *(state_->isolate_flags());
api_flags.is_system_isolate = false;
// 创建 isolate
**Dart_Isolate isolate =
(create_group_callback)(state_->script_url(), name, nullptr,
state_->package_config(), &api_flags,
parent_isolate_->init_callback_data(), &error);**
parent_isolate_->DecrementSpawnCount();
parent_isolate_ = nullptr;
if (isolate == nullptr) {
FailedSpawn(error, /*has_current_isolate=*/false);
free(error);
return;
}
// 切换到指定的 isolate
Dart_EnterIsolate(isolate);
// 这里也调用了 Run 方法
**Run(reinterpret_cast<Isolate*>(isolate));**
}
}
主要创建 Isolate 的过程在Isolate::CreateGroupCallback();
中,让我们看一下他是怎么来的:
Isolate::CreateGroupCallback()
他和上述Isolate::InitializeCallback_
的来源一致,都是在Dart_Initialize
中配置的,此外,还使用了parent_isolate_->init_callback_data()
。
先看一下的CreateIsolateGroupAndSetup
实现:
// -> runtime\bin\main.cc
static Dart_Isolate CreateIsolateGroupAndSetup(const char* script_uri,
const char* main,
const char* package_root,
const char* package_config,
Dart_IsolateFlags* flags,
void* callback_data,
char** error) {
// The VM should never call the isolate helper with a NULL flags.
ASSERT(flags != NULL);
ASSERT(flags->version == DART_FLAGS_CURRENT_VERSION);
ASSERT(package_root == nullptr);
bool dontneed_safe = true;
#if defined(DART_HOST_OS_LINUX)
// This would also be true in Linux, except that Google3 overrides the default
// ELF interpreter to one that apparently doesn't create proper mappings.
dontneed_safe = false;
#elif defined(DEBUG)
// If the snapshot isn't file-backed, madvise(DONT_NEED) is destructive.
if (Options::force_load_elf_from_memory()) {
dontneed_safe = false;
}
#endif
flags->snapshot_is_dontneed_safe = dontneed_safe;
int exit_code = 0;
#if !defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM)
if (strcmp(script_uri, DART_KERNEL_ISOLATE_NAME) == 0) {
return **CreateAndSetupKernelIsolate**(script_uri, package_config, flags, error,
&exit_code);
}
#endif // !defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM)
#if !defined(DART_PRECOMPILED_RUNTIME)
if (strcmp(script_uri, DART_DEV_ISOLATE_NAME) == 0) {
return **CreateAndSetupDartDevIsolate**(script_uri, package_config, flags,
error, &exit_code);
}
#endif // !defined(DART_PRECOMPILED_RUNTIME)
if (strcmp(script_uri, DART_VM_SERVICE_ISOLATE_NAME) == 0) {
return **CreateAndSetupServiceIsolate**(script_uri, package_config, flags,
error, &exit_code);
}
bool is_main_isolate = false;
return **CreateIsolateGroupAndSetupHelper**(is_main_isolate, script_uri, main,
package_config, flags, callback_data,
error, &exit_code);
}
这里创建 Isolate 的时候,区分了几种情况:
- 如果是 kernel-service(
DART_KERNEL_ISOLATE_NAME
)就执行CreateAndSetupKernelIsolate
- 如果是 dartdev(
DART_DEV_ISOLATE_NAME
)就执行CreateAndSetupDartDevIsolate
- 如果是 vm-service(
DART_VM_SERVICE_ISOLATE_NAME
)就执行CreateAndSetupServiceIsolate
- 如果以上都不满足,就执行
CreateIsolateGroupAndSetupHelper
显然,当我们在 Dart 代码中调用Isolate.spawnUri
的时候,这里会执行的是CreateIsolateGroupAndSetupHelper
:
// -> runtime\bin\main.cc
// 调用方
bool is_main_isolate = false;
return CreateIsolateGroupAndSetupHelper(is_main_isolate, script_uri, main,
package_config, flags, callback_data,
error, &exit_code);
// Returns newly created Isolate on success, NULL on failure.
static Dart_Isolate CreateIsolateGroupAndSetupHelper(
bool is_main_isolate,
const char* script_uri,
const char* name,
const char* packages_config,
Dart_IsolateFlags* flags,
void* callback_data,
char** error,
int* exit_code) {
...
// 根据是 AOT 还是 JIT 获取 kernel_buffer,app_snapshot,isolate_run_app_snapshot 等数据
#if defined(DART_PRECOMPILED_RUNTIME){
// AOT: All isolates need to be run from AOT compiled snapshots.
}
#else{
// JIT: Main isolate starts from the app snapshot, if any. Other isolates
// use the core libraries snapshot.
}
// 创建 isolate_group_data
auto isolate_group_data = new IsolateGroupData(
script_uri, packages_config, app_snapshot, isolate_run_app_snapshot);
// copy_parent_code 为 true 的话,这里的 kernel_buffer 为 NULL
if (kernel_buffer != NULL) {
if (kernel_buffer_ptr) {
isolate_group_data->SetKernelBufferAlreadyOwned(
std::move(kernel_buffer_ptr), kernel_buffer_size);
} else {
isolate_group_data->SetKernelBufferNewlyOwned(kernel_buffer,
kernel_buffer_size);
}
}
Dart_Isolate isolate = NULL;
IsolateData* isolate_data = nullptr;
#if !defined(DART_PRECOMPILED_RUNTIME)
if (!isolate_run_app_snapshot && (isolate_snapshot_data == NULL)) {
const uint8_t* platform_kernel_buffer = NULL;
intptr_t platform_kernel_buffer_size = 0;
dfe.LoadPlatform(&platform_kernel_buffer, &platform_kernel_buffer_size);
if (platform_kernel_buffer == NULL) {
platform_kernel_buffer = kernel_buffer;
platform_kernel_buffer_size = kernel_buffer_size;
}
if (platform_kernel_buffer == NULL) {
#if defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM)
FATAL(
"Binary built with --exclude-kernel-service. Cannot run"
" from source.");
#else
FATAL("platform_program cannot be NULL.");
#endif // defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM)
}
// TODO(sivachandra): When the platform program is unavailable, check if
// application kernel binary is self contained or an incremental binary.
// Isolate should be created only if it is a self contained kernel binary.
isolate_data = new IsolateData(isolate_group_data);
isolate = Dart_CreateIsolateGroupFromKernel(
script_uri, name, platform_kernel_buffer, platform_kernel_buffer_size,
flags, isolate_group_data, isolate_data, error);
} else {
isolate_data = new IsolateData(isolate_group_data);
// Creates a new isolate. The new isolate becomes the current isolate.
isolate = Dart_CreateIsolateGroup(script_uri, name, isolate_snapshot_data,
isolate_snapshot_instructions, flags,
isolate_group_data, isolate_data, error);
}
#else
**isolate_data = new IsolateData**(isolate_group_data);
// Creates a new isolate. The new isolate becomes the current isolate.
**isolate = Dart_CreateIsolateGroup**(script_uri, name, isolate_snapshot_data,
isolate_snapshot_instructions, flags,
**isolate_group_data**, **isolate_data**, error);
#endif // !defined(DART_PRECOMPILED_RUNTIME)
Dart_Isolate created_isolate = NULL;
if (isolate == NULL) {
delete isolate_data;
delete isolate_group_data;
} else {
**created_isolate = IsolateSetupHelper**(
isolate, is_main_isolate, script_uri, packages_config,
isolate_run_app_snapshot, flags, error, exit_code);
}
int64_t end = Dart_TimelineGetMicros();
Dart_TimelineEvent("CreateIsolateGroupAndSetupHelper", start, end,
Dart_Timeline_Event_Duration, 0, NULL, NULL);
return created_isolate;
}
这里可以看到,CreateIsolateGroupAndSetupHelper
按照是 JIT 还是 AOT 的编译方式,有不同的获取数据的方式,但不管哪种方式,最后都执行了一下三步:
- 创建
IsolateData* isolate_data
使用isolate_group_data
创建 IsolateData - 创建
Dart_Isolate isolate
创建Dart_Isolate
,将script_uri
,isolate_data
,和isolate_group_data
等绑定 - 创建并返回
Dart_Isolate created_isolate
包装isolate
,进行数据绑定,并将 isolate 标记为runnable
Dart_CreateIsolateGroup
这里分析一下**Dart_CreateIsolateGroup
的过程:**
// --> runtime/vm/dart_api_impl.cc#L1371
DART_EXPORT Dart_Isolate
Dart_CreateIsolateGroup(const char* script_uri,
const char* name,
const uint8_t* snapshot_data,
const uint8_t* snapshot_instructions,
Dart_IsolateFlags* flags,
void* isolate_group_data,
void* isolate_data,
char** error) {
API_TIMELINE_DURATION(Thread::Current());
Dart_IsolateFlags api_flags;
if (flags == nullptr) {
Isolate::FlagsInitialize(&api_flags);
flags = &api_flags;
}
const char* non_null_name = name == nullptr ? "isolate" : name;
std::unique_ptr<IsolateGroupSource> source(
new IsolateGroupSource(script_uri, non_null_name, snapshot_data,
snapshot_instructions, nullptr, -1, *flags));
// 创建 Isolate Group
auto group = new IsolateGroup(std::move(source), isolate_group_data, *flags);
// 创建 Isolate Group 持有的 Heap,由所有在这个 Isolate Group 下的 isolate 共享
group->CreateHeap(
/*is_vm_isolate=*/false, IsServiceOrKernelIsolateName(non_null_name));
IsolateGroup::RegisterIsolateGroup(group);
// 根据刚刚创建的 Isolate Group 创建 Isolate
Dart_Isolate isolate = CreateIsolate(group, /*is_new_group=*/true,
non_null_name, isolate_data, error);
if (isolate != nullptr) {
group->set_initial_spawn_successful();
}
return isolate;
}
Run(Isolate* child)
在上面的分析中,我们注意到,无论是RunHeavyweight(const char* name)
还是RunLightweight(const char* name)
方法,最后在创建了新的 isolate 之后,都执行了Run(Isolate* child)
方法,在这里正式启动了 isolate:
// -> runtime\lib\isolate.cc
void Run(Isolate* child) {
if (!EnsureIsRunnable(child)) {
Dart_ShutdownIsolate();
return;
}
state_->set_isolate(child);
if (state_->origin_id() != ILLEGAL_PORT) {
// origin_id is set to parent isolate main port id when spawning via
// spawnFunction.
child->set_origin_id(state_->origin_id());
}
bool success = true;
{
auto thread = Thread::Current();
// TransitionNativeToVM is used to transition the safepoint state of a
// thread from "running native code" to "running vm code" and ensures
// that the state is reverted back to "running native code" when
// exiting the scope/frame.
TransitionNativeToVM transition(thread);
// Create an empty zone and set is at the current zone for the Thread.
StackZone zone(thread);
// The class HandleScope is used to start a new handles scope in the
// code.
HandleScope hs(thread);
success = **EnqueueEntrypointInvocationAndNotifySpawner**(thread);
}
if (!success) {
state_ = nullptr;
Dart_ShutdownIsolate();
return;
}
// All preconditions are met for this to always succeed.
char* error = nullptr;
// Lets the VM run message processing for the isolate.
if (!**Dart_RunLoopAsync**(state_->errors_are_fatal(), state_->on_error_port(),
state_->on_exit_port(), &error)) {
FATAL("Dart_RunLoopAsync() failed: %s. Please file a Dart VM bug report.",
error);
}
}
这里只是做了一些环境准备,然后在EnqueueEntrypointInvocationAndNotifySpawner
方法中将 isolate 要运行的所有东西都准备好,然后再在Dart_RunLoopAsync
方法中正式开始 isolate 处理 event queue.
EnqueueEntrypointInvocationAndNotifySpawner
// -> runtime\lib\isolate.cc
bool EnqueueEntrypointInvocationAndNotifySpawner(Thread* thread) {
auto isolate = thread->isolate();
auto zone = thread->zone();
const bool is_spawn_uri = state_->is_spawn_uri();
// Step 1) Resolve the entrypoint function.
// 查找 isolate 开始运行的第一个方法,比如 Isolate.spawn 的 spawn 或者 Isolate.spawnUri 的 main 方法
auto& entrypoint_closure = Closure::Handle(zone);
if (state_->closure_tuple_handle() != nullptr) {
const auto& result = Object::Handle(
zone,
ReadObjectGraphCopyMessage(thread, state_->closure_tuple_handle()));
if (result.IsError()) {
ReportError(
"Failed to deserialize the passed entrypoint to the new isolate.");
return false;
}
entrypoint_closure = Closure::RawCast(result.ptr());
} else {
const auto& result = Object::Handle(zone, state_->ResolveFunction());
if (result.IsError()) {
ASSERT(is_spawn_uri);
ReportError("Failed to resolve entrypoint function.");
return false;
}
ASSERT(result.IsFunction());
auto& func = Function::Handle(zone, Function::Cast(result).ptr());
func = func.ImplicitClosureFunction();
entrypoint_closure = func.ImplicitStaticClosure();
}
// Step 2) Enqueue delayed invocation of entrypoint callback.
const auto& args_obj = Object::Handle(zone, state_->BuildArgs(thread));
if (args_obj.IsError()) {
ReportError(
"Failed to deserialize the passed arguments to the new isolate.");
return false;
}
ASSERT(args_obj.IsNull() || args_obj.IsInstance());
const auto& message_obj =
Object::Handle(zone, state_->BuildMessage(thread));
if (message_obj.IsError()) {
ReportError(
"Failed to deserialize the passed arguments to the new isolate.");
return false;
}
ASSERT(message_obj.IsNull() || message_obj.IsInstance());
// 解析参数,分别是 isolate 初始运行方法,参数 args、messgae、是否 spawn_uri
const Array& args = Array::Handle(zone, Array::New(4));
args.SetAt(0, entrypoint_closure);
args.SetAt(1, args_obj);
args.SetAt(2, message_obj);
args.SetAt(3, is_spawn_uri ? Bool::True() : Bool::False());
const auto& lib = Library::Handle(zone, Library::IsolateLibrary());
const auto& entry_name = String::Handle(zone, String::New("_startIsolate"));
const auto& entry_point =
Function::Handle(zone, lib.LookupLocalFunction(entry_name));
ASSERT(entry_point.IsFunction() && !entry_point.IsNull());
const auto& result =
Object::Handle(zone, DartEntry::InvokeFunction(entry_point, args));
if (result.IsError()) {
ReportError("Failed to enqueue delayed entrypoint invocation.");
return false;
}
// Step 3) Pause the isolate if required & Notify parent isolate about
// isolate creation.
const auto& capabilities = Array::Handle(zone, Array::New(2));
auto& capability = Capability::Handle(zone);
capability = Capability::New(isolate->pause_capability());
capabilities.SetAt(0, capability);
capability = Capability::New(isolate->terminate_capability());
capabilities.SetAt(1, capability);
const auto& send_port =
SendPort::Handle(zone, SendPort::New(isolate->main_port()));
const auto& message = Array::Handle(zone, Array::New(2));
message.SetAt(0, send_port);
message.SetAt(1, capabilities);
if (state_->paused()) {
capability ^= capabilities.At(0);
const bool added = isolate->AddResumeCapability(capability);
ASSERT(added);
isolate->message_handler()->increment_paused();
}
{
// If parent isolate died, we ignore the fact that we cannot notify it.
// 创建一个新的 Message 并将其压入 Isolate 的父 Isolate 对应的 MessageHandler 的 event queue 中
PortMap::PostMessage(WriteMessage(/* can_send_any_object */ false,
/* same_group */ false, message,
state_->parent_port(),
Message::kNormalPriority));
}
return true;
}
这里主要做了 3 件事:
- 查找 isolate 开始运行的第一个方法
entrypoint
,比如Isolate.spawn
的entrypoint
或者Isolate.spawnUri
的main
方法 - 解析参数,分别是 isolate 初始运行方法,参数
args
、messgae
、是否spawn_uri
等等,将其与上一步找到的entrypoint
结合 - (如果需要的话暂停创建好的 isolate),并通知 isolate 的父 isolate 当前 isolate 创建成功(附带当前 isolate 的
send_port
)
至此,Isolate 的创建工作已经完成,在Dart_RunLoopAsync
开始 isolate 处理消息:
Dart_RunLoopAsync
在这里主要是开始处理 event loop。
// -> runtime\vm\dart_api_impl.cc
DART_EXPORT bool Dart_RunLoopAsync(bool errors_are_fatal,
Dart_Port on_error_port,
Dart_Port on_exit_port,
char** error) {
auto thread = Thread::Current();
auto isolate = thread->isolate();
CHECK_ISOLATE(isolate);
*error = nullptr;
if (thread->api_top_scope() != nullptr) {
*error = Utils::StrDup("There must not be an active api scope.");
return false;
}
if (!isolate->is_runnable()) {
const char* error_msg = isolate->MakeRunnable();
if (error_msg != nullptr) {
*error = Utils::StrDup(error_msg);
return false;
}
}
isolate->SetErrorsFatal(errors_are_fatal);
if (on_error_port != ILLEGAL_PORT || on_exit_port != ILLEGAL_PORT) {
auto thread = Thread::Current();
TransitionNativeToVM transition(thread);
StackZone zone(thread);
if (on_error_port != ILLEGAL_PORT) {
const auto& port =
SendPort::Handle(thread->zone(), SendPort::New(on_error_port));
isolate->AddErrorListener(port);
}
if (on_exit_port != ILLEGAL_PORT) {
const auto& port =
SendPort::Handle(thread->zone(), SendPort::New(on_exit_port));
isolate->AddExitListener(port, Instance::null_instance());
}
}
Dart_ExitIsolate();
**isolate->Run();**
return true;
}
// -> runtime\vm\isolate.cc
void Isolate::Run() {
message_handler()->Run(group()->thread_pool(), nullptr, ShutdownIsolate,
reinterpret_cast<uword>(this));
}
Isolate::Run()
实际上是开启了处理消息队列:
// ->
// Runs this message handler on the thread pool.
//
// Before processing messages, the optional StartFunction is run.
//
// A message handler will run until it terminates either normally or
// abnormally. Normal termination occurs when the message handler
// no longer has any live ports. Abnormal termination occurs when
// HandleMessage() indicates that an error has occurred during
// message processing.
// Returns false if the handler terminated abnormally, otherwise it
// returns true.
bool MessageHandler::Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data) {
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Starting message handler:\n"
"\thandler: %s\n",
name());
}
ASSERT(pool_ == NULL);
ASSERT(!delete_me_);
pool_ = pool;
start_callback_ = start_callback;
end_callback_ = end_callback;
callback_data_ = data;
task_running_ = true;
bool result = **pool_->Run<MessageHandlerTask>(this);**
if (!result) {
pool_ = nullptr;
start_callback_ = nullptr;
end_callback_ = nullptr;
callback_data_ = 0;
task_running_ = false;
}
return result;
}
// -> runtime\vm\thread_pool.h
class ThreadPool {
bool Run(Args&&... args) {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}
...
}
// -> runtime\vm\thread_pool.cc
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* new_worker = nullptr;
{
MonitorLocker ml(&pool_monitor_);
if (shutting_down_) {
return false;
}
// 从线程池中获取 task,如果有空闲的/达到最大数量就尝试复用(此时这里返回 null)
// 否则创建新的并返回
new_worker = **ScheduleTaskLocked**(&ml, std::move(task));
}
if (new_worker != nullptr) {
// 创建一个新的 Worker 在新的系统线程运行 task
new_worker->**StartThread()**;
}
return true;
}
void ThreadPool::Worker::StartThread() {
// 创建一个新的系统线程,运行指定的代码,
// android 的实现在 runtime\vm\os_thread_android.cc
int result = OSThread::Start("DartWorker", &**Worker::Main**,
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Could not start worker thread: result = %d.", result);
}
}
这里通过ThreadPool::ScheduleTaskLocked
方法获取new_worker
:
- 如果已有的 worker 有空闲的或者已经达到最大数目了,就等待已有的 worker 执行任务
- 否则就创建新的 worker,并在新的线程运行
在获取到 worker 之后,就执行MessageHandlerTask
(见下文详细分析)。
我们主要关注 3 点:
ScheduleTaskLocked
分配 WorkerOSThread::Start
中使用&Worker::Main
在新系统线程开启 Worker 循环MessageHandlerTask
执行具体的消息分发内容
ScheduleTaskLocked
先详细看一下获取new_worker
的ThreadPool::ScheduleTaskLocked
方法:
// -> runtime\vm\thread_pool.cc
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
std::unique_ptr<Task> task) {
// Enqueue the new task.
tasks_.Append(task.release());
pending_tasks_++;
ASSERT(pending_tasks_ >= 1);
// Notify existing idle worker (if available).
if (count_idle_ >= pending_tasks_) {
ASSERT(!idle_workers_.IsEmpty());
ml->Notify();
return nullptr;
}
// If we have maxed out the number of threads running, we will not start a
// new one.
if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
if (!idle_workers_.IsEmpty()) {
ml->Notify();
}
return nullptr;
}
// Otherwise start a new worker.
auto new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
return new_worker;
}
这里的逻辑是:
将当前任务加入到tasks_
队列中。
- 如果空闲
count_idle_
的 Worker 比等待中的任务数pending_tasks_
多,那就发送通知,使用已有的 Worker 处理任务。 - 如果当前 Worker 数量已经最大了,那就将等待中的任务数 pendingtasks 加一,等待有空闲的 Worker 处理任务。
- 否则,就新建一个 Worker(会对应创建一个新的系统线程)来处理任务。
&Worker::Main
在ThreadPool::RunImpl(std::unique_ptr<Task> task)
这里,StartThread 的第二个参数,**&Worker::Main
**启动了一个循环,不断的在任务队列tasks_
中取出消息并执行:
// -> runtime\vm\thread_pool.cc
ThreadPool::Worker::Main(uword args){
Worker* worker = reinterpret_cast<Worker*>(args);
ThreadPool* pool = worker->pool_;
pool->WorkerLoop(worker);
}
void ThreadPool::WorkerLoop(Worker* worker) {
WorkerList dead_workers_to_join;
while (true) {
MonitorLocker ml(&pool_monitor_);
// worker 会从 task_取出一个 task 并运行
if (!tasks_.IsEmpty()) {
IdleToRunningLocked(worker);
while (!tasks_.IsEmpty()) {
std::unique_ptr<Task> task(tasks_.RemoveFirst());
pending_tasks_--;
MonitorLeaveScope mls(&ml);
**task->Run();**
ASSERT(Isolate::Current() == nullptr);
task.reset();
}
RunningToIdleLocked(worker);
}
if (running_workers_.IsEmpty()) {
ASSERT(tasks_.IsEmpty());
OnEnterIdleLocked(&ml);
if (!tasks_.IsEmpty()) {
continue;
}
}
if (shutting_down_) {
ObtainDeadWorkersLocked(&dead_workers_to_join);
IdleToDeadLocked(worker);
break;
}
// Sleep until we get a new task, we time out or we're shutdown.
const int64_t idle_start = OS::GetCurrentMonotonicMicros();
bool done = false;
while (!done) {
const auto result = ml.WaitMicros(ComputeTimeout(idle_start));
// We have to drain all pending tasks.
if (!tasks_.IsEmpty()) break;
if (shutting_down_ || result == Monitor::kTimedOut) {
done = true;
break;
}
}
if (done) {
ObtainDeadWorkersLocked(&dead_workers_to_join);
IdleToDeadLocked(worker);
break;
}
}
// Before we transitioned to dead we obtained the list of previously died dead
// workers, which we join here. Since every death of a worker will join
// previously died workers, we keep the pending non-joined [dead_workers_] to
// effectively 1.
JoinDeadWorkersLocked(&dead_workers_to_join);
}
MessageHandlerTask
无论是哪种 Worker,最后都是执行的MessageHandlerTask
:
// -> runtime\vm\message_handler.cc
class MessageHandlerTask : public ThreadPool::Task {
public:
explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
ASSERT(handler != NULL);
}
virtual void Run() {
ASSERT(handler_ != NULL);
handler_->TaskCallback();
}
void MessageHandler::TaskCallback() {
ASSERT(Isolate::Current() == NULL);
MessageStatus status = kOK;
bool run_end_callback = false;
bool delete_me = false;
EndCallback end_callback = NULL;
CallbackData callback_data = 0;
{
// We will occasionally release and reacquire this monitor in this
// function. Whenever we reacquire the monitor we *must* process
// all pending OOB messages, or we may miss a request for vm
// shutdown.
MonitorLocker ml(&monitor_);
// This method is running on the message handler task. Which means no
// other message handler tasks will be started until this one sets
// [task_running_] to false.
ASSERT(task_running_);
#if !defined(PRODUCT)
if (ShouldPauseOnStart(kOK)) {
if (!is_paused_on_start()) {
PausedOnStartLocked(&ml, true);
}
// More messages may have come in before we (re)acquired the monitor.
status = HandleMessages(&ml, false, false);
if (ShouldPauseOnStart(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnStartLocked(&ml, false);
}
}
if (is_paused_on_exit()) {
status = HandleMessages(&ml, false, false);
if (ShouldPauseOnExit(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnExitLocked(&ml, false);
}
}
#endif // !defined(PRODUCT)
if (status == kOK) {
if (start_callback_ != nullptr) {
// Initialize the message handler by running its start function,
// if we have one. For an isolate, this will run the isolate's
// main() function.
//
// Release the monitor_ temporarily while we call the start callback.
ml.Exit();
status = start_callback_(callback_data_);
ASSERT(Isolate::Current() == NULL);
start_callback_ = NULL;
ml.Enter();
}
// Handle any pending messages for this message handler.
if (status != kShutdown) {
status = HandleMessages(&ml, (status == kOK), true);
}
}
// The isolate exits when it encounters an error or when it no
// longer has live ports.
if (status != kOK || !HasLivePorts()) {
#if !defined(PRODUCT)
if (ShouldPauseOnExit(status)) {
if (FLAG_trace_service_pause_events) {
OS::PrintErr(
"Isolate %s paused before exiting. "
"Use the Observatory to release it.\n",
name());
}
PausedOnExitLocked(&ml, true);
// More messages may have come in while we released the monitor.
**status = HandleMessages(&ml, false, false);**
if (ShouldPauseOnExit(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnExitLocked(&ml, false);
}
}
#endif // !defined(PRODUCT)
if (FLAG_trace_isolates) {
if (status != kOK && thread() != NULL) {
const Error& error = Error::Handle(thread()->sticky_error());
OS::PrintErr(
"[-] Stopping message handler (%s):\n"
"\thandler: %s\n"
"\terror: %s\n",
MessageStatusString(status), name(), error.ToCString());
} else {
OS::PrintErr(
"[-] Stopping message handler (%s):\n"
"\thandler: %s\n",
MessageStatusString(status), name());
}
}
pool_ = NULL;
// Decide if we have a callback before releasing the monitor.
end_callback = end_callback_;
callback_data = callback_data_;
run_end_callback = end_callback_ != NULL;
delete_me = delete_me_;
}
// Clear task_running_ last. This allows other tasks to potentially start
// for this message handler.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false;
}
// The handler may have been deleted by another thread here if it is a native
// message handler.
// Message handlers either use delete_me or end_callback but not both.
ASSERT(!delete_me || !run_end_callback);
if (run_end_callback) {
ASSERT(end_callback != NULL);
end_callback(callback_data);
// The handler may have been deleted after this point.
}
if (delete_me) {
delete this;
}
}
可以看到,这里执行了MessageHandler::HandleMessages
方法,来处理消息:
// -> runtime\vm\message_handler.cc
MessageHandler::MessageStatus MessageHandler::HandleMessages(
MonitorLocker* ml,
bool allow_normal_messages,
bool allow_multiple_normal_messages) {
ASSERT(monitor_.IsOwnedByCurrentThread());
// Scheduling of the mutator thread during the isolate start can cause this
// thread to safepoint.
// We want to avoid holding the message handler monitor during the safepoint
// operation to avoid possible deadlocks, which can occur if other threads are
// sending messages to this message handler.
//
// If isolate() returns nullptr [StartIsolateScope] does nothing.
ml->Exit();
StartIsolateScope start_isolate(isolate());
ml->Enter();
auto idle_time_handler =
isolate() != nullptr ? isolate()->group()->idle_time_handler() : nullptr;
MessageStatus max_status = kOK;
Message::Priority min_priority =
((allow_normal_messages && !paused()) ? Message::kNormalPriority
: Message::kOOBPriority);
std::unique_ptr<Message> **message = DequeueMessage(min_priority);**
while (message != nullptr) {
intptr_t message_len = message->Size();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[<] Handling message:\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
message_len, name(), message->dest_port());
}
// Release the monitor_ temporarily while we handle the message.
// The monitor was acquired in MessageHandler::TaskCallback().
ml->Exit();
Message::Priority saved_priority = message->priority();
Dart_Port saved_dest_port = message->dest_port();
MessageStatus status = kOK;
{
DisableIdleTimerScope disable_idle_timer(idle_time_handler);
status = HandleMessage(std::move(message));
}
if (status > max_status) {
max_status = status;
}
ml->Enter();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[.] Message handled (%s):\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
MessageStatusString(status), message_len, name(), saved_dest_port);
}
// If we are shutting down, do not process any more messages.
if (status == kShutdown) {
ClearOOBQueue();
break;
}
// Remember time since the last message. Don't consider OOB messages so
// using Observatory doesn't trigger additional idle tasks.
if ((FLAG_idle_timeout_micros != 0) &&
(saved_priority == Message::kNormalPriority)) {
if (idle_time_handler != nullptr) {
idle_time_handler->UpdateStartIdleTime();
}
}
// Some callers want to process only one normal message and then quit. At
// the same time it is OK to process multiple OOB messages.
if ((saved_priority == Message::kNormalPriority) &&
!allow_multiple_normal_messages) {
// We processed one normal message. Allow no more.
allow_normal_messages = false;
}
// Reevaluate the minimum allowable priority. The paused state
// may have changed as part of handling the message. We may also
// have encountered an error during message processing.
//
// Even if we encounter an error, we still process pending OOB
// messages so that we don't lose the message notification.
min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
? Message::kNormalPriority
: Message::kOOBPriority);
message = DequeueMessage(min_priority);
}
return max_status;
}
MessageHandler::DequeueMessage
则是按照优先级,依次从oob_queue_
和queue_
中获取消息:
// -> runtime\vm\message_handler.cc
std::unique_ptr<Message> MessageHandler::DequeueMessage(
Message::Priority min_priority) {
// TODO(turnidge): Add assert that monitor_ is held here.
std::unique_ptr<Message> message = oob_queue_->Dequeue();
if ((message == nullptr) && (min_priority < Message::kOOBPriority)) {
message = queue_->Dequeue();
}
return message;
}
关于oob_queue_
和queue_
的区别如下:
// ->
class MessageHandler {
MessageQueue* queue_;
MessageQueue* oob_queue_;
...
}
// -> runtime\vm\message.h
class Message {
// A message processed at any interrupt point (stack overflow check) instead
// of at the top of the message loop. Control messages from dart:isolate or
// vm-service requests.
bool IsOOB() const { return priority_ == Message::kOOBPriority; }
}
这里消息处理的步骤也启动了。
总结一下,Dart_RunLoopAsync
的主要功能是触发 isolate 的message_handler
处理消息分发:
Dart_RunLoopAsync
→ Isolate::Run()
→ message_handler()->Run()
→ pool_->Run<MessageHandlerTask>
→ ThreadPool::RunImpl
在ThreadPool::RunImpl(std::unique_ptr<Task> task)
这里主要触发了 2 步:
ScheduleTaskLocked
获取到new_worker
new_worker
调用ThreadPool::Worker::StartThread()
方法开启循环
然后根据是否创建了new_worker
有两种情况:
- 有
new_worker
,使用在OSThread::Start
方法中创建了一个新的系统线程,执行ThreadPool::Worker::Main
(这个方法的主要作用使用new_worker
从线程池中的取出任务执行) - 没有
new_worker
,那么等待已有的 Worker 空闲时执行任务
无论如何,这里的 Worker 要执行的任务都是在MessageHandler::Run
方法中指定的MessageHandlerTask
,而这个任务的内容便是开启MessageHandler::HandleMessages
方法,按照优先级不断的依次从oob_queue_
和queue_
中获取消息并处理。
总结
Isolate 是 Dart 代码运行的地方,拥有独立的 event loop,和全局变量,在自己单独的线程运行。
Isolate.spawn 默认会创建在同一个 IsolateGroup 中的 Isolate,他们之间共享 Heap(这里会发生 GC)和一个线程池。
Isolate.spawnUri 会从制定的 Uri 中创建一个新的 IsolateGroup 和对应的 Isolate,并执行 Uri 中的 main 方法。
Isolate 内部维持一个 Event Loop。