Dart 读取文件过程分析
Dart 读取文件时,先在 Dart 代码创建 File 引用,通过与IOService
跨Isolate
通信(先通过 IO Service 而发送请求到 native 端,等到 native 执行完操作之后再回调结果)从而实现对文件的读写。
实现一个简单的读取文件的代码如下:
import 'dart:io';
main() {
var filePath =
r"G:/21996.1.210529-1541.co_release_CLIENT_CONSUMER_x64FRE_en-us.iso";
var file = File(filePath);
var startTime = printCurrentTimeMs("start run file.readAsBytes");
file.readAsBytes().then((value) {
printCurrentTimeMs("file.readAsBytes() finish",
lastTimeMs: startTime,
suffix: "\nfile.readAsBytes() result:${value.length}");
});
printCurrentTimeMs("finish run file.readAsBytes");
}
int printCurrentTimeMs(String prefix, {String? suffix, int? lastTimeMs}) {
var currentTimeMs = DateTime.now().millisecondsSinceEpoch;
var timeElapseString =
lastTimeMs == null ? "" : ", time elapse:${currentTimeMs - lastTimeMs}ms ";
print(
"$prefix current time($currentTimeMs)$timeElapseString${suffix ?? ""}");
return currentTimeMs;
}
整个过程如下:
过程分析
Dart 端发起文件读写请求
其中file.readAsBytes()
是具体执行读取文件的地方,他的定义如下:
// -> sdk\lib\io\file_impl.dart
Future<Uint8List> readAsBytes();
在我们创建File
时,实际上创建的是_File
(class _File extends FileSystemEntity implements File
)对象:
// -> sdk\lib\io\file_impl.dart
// abstract class File implements FileSystemEntity
@pragma("vm:entry-point")
factory File(String path) {
final IOOverrides? overrides = IOOverrides.current;
if (overrides == null) {
return new _File(path);
}
return overrides.createFile(path);
}
_File
是File
的实现类,所以file.readAsBytes()
实际调用的是_File
实现的方法:
// -> sdk\lib\io\file_impl.dart
// Read the file in blocks of size 64k.
const int _blockSize = 64 * 1024;
class _File extends FileSystemEntity implements File {
Future<Uint8List> readAsBytes() {
Future<Uint8List> readDataChunked(RandomAccessFile file) {
// 分段读取文件,每次只读取_blockSize 大小的内容
var builder = new BytesBuilder(copy: false);
var completer = new Completer<Uint8List>();
void read() {
// 每次只异步读取一部分文本
file.read(_blockSize).then((data) {
if (data.length > 0) {
builder.add(data);
read();
} else {
completer.complete(builder.takeBytes());
}
}, onError: completer.completeError);
}
read();
return completer.future;
}
return open().then((file) {
return file.length().then((length) {
if (length == 0) {
// May be character device, try to read it in chunks.
return readDataChunked(file);
}
return file.read(length);
}).whenComplete(file.close);
});
}
}
可以看到,无论是普通的文件格式,还是 character device,最后都是调用了_RandomAccessFile
的open()
和read(int bytes)
方法异步读取文件。
设备文件分为 Block Device Driver 和 Character Device Drive 两类。
Character Device Driver 又被称为字符设备或裸设备 raw devices; Block Device Driver 通常成为块设备。
而 Block Device Driver 是以固定大小长度来传送转移资料;Character Device Driver 是以不定长度的字元传送资料。 https://www.cnblogs.com/qlee/archive/2011/07/27/2118406.html#:~:text=Character
// -> flutter\bin\cache\pkg\sky_engine\lib\io\file_impl.dart
class _RandomAccessFile implements RandomAccessFile {
final String path;
bool _asyncDispatched = false;
// 读取文件的信息
late _FileResourceInfo _resourceInfo;
// 对文件的操作引用
_RandomAccessFileOps _ops;
@pragma("vm:entry-point")
_RandomAccessFile(int pointer, this.path)
: _ops = new _RandomAccessFileOps(pointer) {
_resourceInfo = new _FileResourceInfo(this);
_maybeConnectHandler();
}
// 异步读取文件
Future<Uint8List> read(int bytes) {
// TODO(40614): Remove once non-nullability is sound.
ArgumentError.checkNotNull(bytes, "bytes");
// 异步读取文件,实际上是将发送指令到 IO Service,然后等待返回结果
return _dispatch(_IOService.fileRead, [null, bytes]).then((response) {
if (_isErrorResponse(response)) {
throw _exceptionFromResponse(response, "read failed", path);
}
_resourceInfo.addRead(response[1].length);
// 读取的文件内容
Uint8List result = response[1];
return result;
});
}
// 同步读取文件
Uint8List readSync(int bytes) {
// TODO(40614): Remove once non-nullability is sound.
ArgumentError.checkNotNull(bytes, "bytes");
_checkAvailable();
// 同步读取文件是对文件直接操作
var result = _ops.read(bytes);
if (result is OSError) {
throw new FileSystemException("readSync failed", path, result);
}
_resourceInfo.addRead(result.length);
return result;
}
Future<RandomAccessFile> open({FileMode mode = FileMode.read}) {
// FileMode https://github.com/dart-lang/sdk/blob/main/sdk/lib/io/io_service.dart
if (mode != FileMode.read &&
mode != FileMode.write &&
mode != FileMode.append &&
mode != FileMode.writeOnly &&
mode != FileMode.writeOnlyAppend) {
return new Future.error(
new ArgumentError('Invalid file mode for this operation'));
}
return _dispatchWithNamespace(
// 请求操作为“打开文件”,参数为:null,文件路径,操作文件的 mode
_IOService.fileOpen, [null, _rawPath, mode._mode]).then((response) {
if (_isErrorResponse(response)) {
throw _exceptionFromResponse(response, "Cannot open file", path);
}
// 从 IO Service 那里异步获得文件句柄 response 和 path
return new _RandomAccessFile(response, path);
});
}
}
在_RandomAccessFile
中,除了同步读写文件是对返回的文件引用直接操作外,很多操作都能看到通过_dispatch()
方法与IO Service
通信,让我们看一下这个方法的实现:
// -> sdk\lib\io\file_impl.dart
// _RandomAccessFile
Future _dispatch(int request, List data, {bool markClosed = false}) {
if (closed) {
return new Future.error(new FileSystemException("File closed", path));
}
if (_asyncDispatched) {
var msg = "An async operation is currently pending";
return new Future.error(new FileSystemException(msg, path));
}
if (markClosed) {
// Set closed to true to ensure that no more async requests can be issued
// for this file.
closed = true;
}
_asyncDispatched = true;
data[0] = _pointer();
// 主要代码在这里,通过_IOService 的_dispatch 发送指令
return **_IOService._dispatch**(request, data).whenComplete(() {
_asyncDispatched = false;
});
}
// open create 之类的操作会调用这个方法,不过最后也是调用_IOService._dispatch(request, data) 通信
static Future _dispatchWithNamespace(int request, List data) {
data[0] = _namespacePointer();
// 与 IO Service 进行异步通信,request 标记请求操作的类型,data 则是数据
return **_IOService._dispatch**(request, data);
}
查阅_IOService的源码后发现这是个external
方法。
external static Future _dispatch(int request, List data);
An
external
function is connected to its body by an implementation-specific mechanism. Attempting to invoke an external function that has not been connected to its body will throw a NoSuchMethodError or some subclass thereof.
****https://github.com/dart-lang/sdk/issues/4300
根据external
的定义,_dispatch
方法在不同的机器上面实现不同。我们只看和 app 相关的实现(在sdk\lib\_internal\vm
目录下,vm 同级目录还有 js 等实现),具体的实现如下:
// -> sdk\lib\_internal\vm\bin\io_service_patch.dart
// _IOService
class _IOService {
// 用于向 IO Service 发送消息
static _IOServicePorts _servicePorts = new _IOServicePorts();
// We use a static variable here to hold onto the last result of
// calling the IO Service frome the native.
static RawReceivePort? _receivePort;
// the other side(other isolate) will send message back with the _replyToPort
static late SendPort _replyToPort;
// a map holding the registered callbacks for each received message.
static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
static int _id = 0;
/// [request] IO 操作的类型,具体值在 [sdk/lib/io/io_service.dart] 中的_IOService 类中定义
/// 主要有对文件、目录、网络进行操作的请求
/// [data] 对应的数据,如果是文件,则是文件路径,如果是目录,则是目录路径等等
@patch
static Future _dispatch(int request, List data) {
int id;
do {
// create a special id to identify the request.
id = _getNextId();
} while (_messageMap.containsKey(id));
// 通过_servicePorts 获取一个新的 SendPort 以便向 IOService 发送消息,
// 这个 SendPort 是 IO Service 返回给 dart 用来向他发消息的
final SendPort servicePort = _servicePorts._getPort(id);
_ensureInitialize();
final Completer completer = new Completer();
_messageMap[id] = completer;
try {
// 向 IOService 发送消息,当 request 执行完毕之后,
// 会调用_replyToPort 触发在 root zone 的回调_receivePort!.handler
**servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
} catch (error) {
_messageMap.remove(id)!.complete(error);
if (_messageMap.length == 0) {
_finalize();
}
}
return completer.future;
}
static void _ensureInitialize() {
if (_receivePort == null) {
_receivePort = new RawReceivePort(null, 'IO Service');
// 其他地方可以使用_replyToPort 来发消息触发_receivePort 执行 handler 方法
_replyToPort = _receivePort!.sendPort;
_receivePort!.handler = (data) {
// 在这里处理 IOService 执行完方法返回的数据
assert(data is List && data.length == 2);
// data[0] 就是我们在_dispatch 方法中获取的 id,
// 将处理结果 data[1] 通过 Completer.complete 返回
_messageMap.remove(data[0])!.complete(data[1]);
// 释放这个触发这个回调的 SendPort
_servicePorts._returnPort(data[0]);
if (_messageMap.length == 0) {
_finalize();
}
};
}
}
...
}
可以看到,最后是通过RawReceivePort
/SendPort
进行跨 Isolate 通信。
_IOService
使用_servicePorts
对 native 层发送消息触发 IO 操作,然后使用_receivePort
监听,当 IO 操作完成时会通过_replyToPort
回调结果,会在 _receivePort!.handler
方法中根据当时请求的id
找到Completer
将结果传递回去。
这样当时我们在 file.readAsBytes()
时获取到的Future
便会收到回调,从而完成文件操作的流程。
file.readAsBytes().then((value) {
printCurrentTimeMs("file.readAsBytes() finish",
lastTimeMs: startTime,
suffix: "\nfile.readAsBytes() result:${value.length}");
});
下面是到目前为止涉及到的类关系示意图:
IO Service 中转
那么,这个 IO Service 是做什么的,他又是如何实现与 dart 中的调用方双向通信,以及执行调用方需要的功能呢?
位于sdk\lib\_internal\vm\bin\io_service_patch.dart
的_IOService 是一个中转站,向上承接来自 Dart 代码的 IO 请求指令(先行返回 Future),向下将这些指令转发至 Native 层的 IO Service,并监听回调,当 native 层处理完这些 IO 指令之后,将结果通过 Future 返回给 Dart 调用方。
让我们再看一下他的具体实现:
// -> sdk\lib\_internal\vm\bin\io_service_patch.dart
// _IOService
class _IOService {
// 用于向 IO Service 发送消息
static _IOServicePorts _servicePorts = new _IOServicePorts();
// We use a static variable here to hold onto the last result of
// calling the IO Service frome the native.
static RawReceivePort? _receivePort;
// the other side(other isolate) will send message back with the _replyToPort
static late SendPort _replyToPort;
// a map holding the registered callbacks for each received message.
static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
static int _id = 0;
/// [request] IO 操作的类型,具体值在 [sdk/lib/io/io_service.dart] 中的_IOService 类中定义
/// 主要有对文件、目录、网络进行操作的请求
/// [data] 对应的数据,如果是文件,则是文件路径,如果是目录,则是目录路径等等
@patch
static Future _dispatch(int request, List data) {
int id;
do {
// create a special id to identify the request.
id = _getNextId();
} while (_messageMap.containsKey(id));
// 通过_servicePorts 获取一个新的 SendPort 以便向 IOService 发送消息,
// 这个 SendPort 是 IO Service 返回给 dart 用来向他发消息的
final SendPort **servicePort** = _servicePorts.**_getPort(id);**
_ensureInitialize();
final Completer completer = new Completer();
_messageMap[id] = completer;
try {
// 向 IOService 发送消息,当 request 执行完毕之后,
// 会调用_replyToPort 触发在 root zone 的回调_receivePort!.handler
**servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
} catch (error) {
_messageMap.remove(id)!.complete(error);
if (_messageMap.length == 0) {
_finalize();
}
}
return completer.future;
}
static void _ensureInitialize() {
if (_receivePort == null) {
_receivePort = new RawReceivePort(null, 'IO Service');
// 其他地方可以使用_replyToPort 来发消息触发_receivePort 执行 handler 方法
_replyToPort = _receivePort!.sendPort;
_receivePort!.handler = (data) {
// 在这里处理 IOService 执行完方法返回的数据
assert(data is List && data.length == 2);
// data[0] 就是我们在_dispatch 方法中获取的 id,
// 将处理结果 data[1] 通过 Completer.complete 返回
_messageMap.remove(data[0])!.complete(data[1]);
// 释放这个触发这个回调的 SendPort
_servicePorts._returnPort(data[0]);
if (_messageMap.length == 0) {
_finalize();
}
};
}
}
...
}
可以看到:
_IOService
持有_IOServicePorts _servicePorts
以便获取SendPort servicePort
和 native 层通信,- 在之前的代码分析中,我们已经知道
_IOService
还在_ensureInitialize()
中监听着RawReceivePort? _receivePort
的回调, - 这样当
_IOService
在_dispatch()
方法中将_replyToPort
(_receivePort
的 SendPort)传递给servicePort
后,一旦 native 通过_replyToPort
发送处理结果,_IOService
立马可以收到并通过Completer.complete
返回给Dart 中的调用方。
上述这些步骤能够实施的关键,在于Dart层的_IOService
如何与native层的_IOService
关联起来呢?
让我们来分析一下SendPort servicePort
的获取过程:
// -> sdk\lib\_internal\vm\bin\io_service_patch.dart
class _IOService {
// 用于向 IO Service 发送消息
static _IOServicePorts _servicePorts = new _IOServicePorts();
...
}
class _IOServicePorts {
// We limit the number of IO Service ports per isolate so that we don't
// spawn too many threads all at once, which can crash the VM on Windows.
static const int maxPorts = 32;
List<SendPort> _ports = <SendPort>[];
List<SendPort> _freePorts = <SendPort>[];
Map<int, SendPort> _usedPorts = new HashMap<int, SendPort>();
_IOServicePorts();
SendPort _getPort(int forRequestId) {
if (_freePorts.isEmpty && _usedPorts.length < maxPorts) {
// 如果没有可用的 SendPort,就新建 SendPort 用于远程服务通信
final SendPort port = **_newServicePort()**;
_ports.add(port);
_freePorts.add(port);
}
if (!_freePorts.isEmpty) {
// 有空闲 SendPort,使用
final SendPort port = _freePorts.removeLast();
assert(!_usedPorts.containsKey(forRequestId));
_usedPorts[forRequestId] = port;
return port;
}
// We have already allocated the max number of ports. Re-use an
// existing one.
final SendPort port = _ports[forRequestId % maxPorts];
_usedPorts[forRequestId] = port;
return port;
}
// 释放掉占用的 port
void _returnPort(int forRequestId) {
final SendPort port = _usedPorts.remove(forRequestId)!;
if (!_usedPorts.values.contains(port)) {
_freePorts.add(port);
}
}
@pragma("vm:external-name", "IOService_NewServicePort")
external static SendPort _newServicePort();
}
可以看到这里最后的关键方法是SendPort _newServicePort()
,这是一个external
方法,在 native 实现。
Native 处理 Dart 的指令
IOService_NewServicePort
SendPort
是由_newServicePort()
方法创建的,这是一个external
方法,他的 native 层实现名称是IOService_NewServicePort
:
// -> runtime\bin\io_service.cc
void FUNCTION_NAME(IOService_NewServicePort)(Dart_NativeArguments args) {
Dart_SetReturnValue(args, Dart_Null());
// 创建一个新的 native port
Dart_Port service_port = **IOService::GetServicePort();**
if (service_port != ILLEGAL_PORT) {
// 【注意】这里根据 service_port 创建了 Dart 里面的 SendPort 对象
// Return a send port for the service port.
Dart_Handle send_port = Dart_NewSendPort(service_port);
// 将当前 IOService 对应的 send_port 返回给调用方
Dart_SetReturnValue(args, send_port);
}
}
Dart_Port IOService::GetServicePort() {
// 注意这里的参数
// 分别是 native port 的名称,收到 native port 以后得回调方法,是否同时处理
return **Dart_NewNativePort("IOService", IOServiceCallback, true);**
}
// -> runtime\include\dart_api.h
/**
* Returns a new SendPort with the provided port id.
*
* \param port_id The destination port.
*
* \return A new SendPort if no errors occurs. Otherwise returns
* an error handle.
*/
DART_EXPORT Dart_Handle Dart_NewSendPort(Dart_Port port_id);
注意,在 Dart 层的_IOService
的SendPort _newServicePort()
方法最后再这里调用了IOService_NewServicePort
。
这里主要有 3 个步骤:
- 使用
Dart_NewNativePort("IOService", IOServiceCallback, true);
创建Dart_Port
- 使用
Dart_NewSendPort
将Dart_Port
转化为Dart_Handle
(也就是 Dart 中的SendPort
) - 返回上面创建好的
Dart_Handle
,Dart 代码拿到返回的 Dart_Handle 也就是SendPort servicePort
之后,就可以和 native 层的 IO Service 同通信。
接下来我们看一下前 2 步分别是怎么实现的:
Dart_NewNativePort
再看一下Dart_NewNativePort
的调用参数:
Dart_NewNativePort("IOService", IOServiceCallback, true);
// -> runtime\include\dart_native_api.h
// Creates a new native port. When messages are received on this
// native port, then they will be dispatched to the provided native
// message handler.
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently);
IOServiceCallback
Dart_NewNativePort
总共有 3 个参数,Dart_NativeMessageHandler handler
是当这个Dart_Port
收到消息的时候,会被回调的方法,也就是我们通过 Dart 端的_IOService.dispatch
方法的**servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
语句执行向 native 发送 IO 指令时,在 native 这里真正负责执行的方法:
// -> runtime\bin\io_service.cc
void IOServiceCallback(Dart_Port dest_port_id, Dart_CObject* message) {
Dart_Port reply_port_id = ILLEGAL_PORT;
CObject* response = CObject::IllegalArgumentError();
CObjectArray request(message);
// 这里的参数顺序,与 Dart 层的_IOService(sdk\lib\_internal\vm\bin\io_service_patch.dart) 的_dispatch() 中的
// **servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
// 代码中的参数顺序一致
if ((message->type == Dart_CObject_kArray) && (request.Length() == 4) &&
request[0]->IsInt32() && request[1]->IsSendPort() &&
request[2]->IsInt32() && request[3]->IsArray()) {
CObjectInt32 message_id(request[0]);
CObjectSendPort **reply_port**(request[1]);
CObjectInt32 request_id(request[2]);
CObjectArray data(request[3]);
**reply_port_id** = **reply_port**.Value();
// 这里解析完收到的参数后,回去执行对应的文件操作
switch (request_id.Value()) {
**IO_SERVICE_REQUEST_LIST(CASE_REQUEST);**
default:
UNREACHABLE();
}
}
CObjectArray result(CObject::NewArray(2));
result.SetAt(0, request[0]);
// response 在上面的 IO_SERVICE_REQUEST_LIST 执行完毕后就会被赋值
result.SetAt(1, **response**);
ASSERT(reply_port_id != ILLEGAL_PORT);
**Dart_PostCObject(reply_port_id, result.AsApiCObject());**
}
#define **CASE_REQUEST**(type, method, id) \
case IOService::k##type##method##Request: \
response = type::method##Request(data); \
break;
IOService 具体的执行是在IO_SERVICE_REQUEST_LIST
根据解析到的参数执行对应的方法:
// -> runtime\bin\io_service.h
// This list must be kept in sync with the list in sdk/lib/io/io_service.dart
#define IO_SERVICE_REQUEST_LIST(V) \
V(File, Exists, 0) \
V(File, Create, 1) \
V(File, Delete, 2) \
V(File, Rename, 3) \
V(File, Copy, 4) \
V(File, Open, 5) \
V(File, ResolveSymbolicLinks, 6) \
V(File, Close, 7) \
V(File, Position, 8) \
V(File, SetPosition, 9) \
V(File, Truncate, 10) \
V(File, Length, 11) \
V(File, LengthFromPath, 12) \
V(File, LastAccessed, 13) \
V(File, SetLastAccessed, 14) \
V(File, LastModified, 15) \
V(File, SetLastModified, 16) \
V(File, Flush, 17) \
V(File, ReadByte, 18) \
V(File, WriteByte, 19) \
V(File, Read, 20) \
V(File, ReadInto, 21) \
V(File, WriteFrom, 22) \
V(File, CreateLink, 23) \
V(File, DeleteLink, 24) \
V(File, RenameLink, 25) \
V(File, LinkTarget, 26) \
V(File, Type, 27) \
V(File, Identical, 28) \
V(File, Stat, 29) \
V(File, Lock, 30) \
V(Socket, Lookup, 31) \
V(Socket, ListInterfaces, 32) \
V(Socket, ReverseLookup, 33) \
V(Directory, Create, 34) \
V(Directory, Delete, 35) \
V(Directory, Exists, 36) \
V(Directory, CreateTemp, 37) \
V(Directory, ListStart, 38) \
V(Directory, ListNext, 39) \
V(Directory, ListStop, 40) \
V(Directory, Rename, 41) \
V(SSLFilter, ProcessFilter, 42)
通过上述代码,可以得知,IOService 主要处理的方法有四类:
File
Directory
Socket
SSLFilter
在IOServiceCallback
方法中,我们注意到,程序最后执行的结果是通过Dart_PostCObject
返回的,来看一下他是怎么实现的:
// -> runtime\vm\native_api_impl.cc
static bool PostCObjectHelper(Dart_Port port_id, Dart_CObject* message) {
AllocOnlyStackZone zone;
std::unique_ptr<Message> msg = WriteApiMessage(
zone.GetZone(), message, port_id, Message::kNormalPriority);
if (msg == nullptr) {
return false;
}
// Post the message at the given port.
return **PortMap::PostMessage(std::move(msg));**
}
DART_EXPORT bool Dart_PostCObject(Dart_Port port_id, Dart_CObject* message) {
return PostCObjectHelper(port_id, message);
}
// -> runtime\vm\port.cc
bool PortMap::PostMessage(std::unique_ptr<Message> message,
bool before_events) {
MutexLocker ml(mutex_);
if (ports_ == nullptr) {
return false;
}
auto it = ports_->TryLookup(message->dest_port());
if (it == ports_->end()) {
// Ownership of external data remains with the poster.
message->DropFinalizers();
return false;
}
MessageHandler* handler = (*it).handler;
ASSERT(handler != nullptr);
**handler->PostMessage(std::move(message), before_events);**
return true;
}
// -> runtime\vm\message_handler.cc
void MessageHandler::PostMessage(std::unique_ptr<Message> message,
bool before_events) {
Message::Priority saved_priority;
{
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
Isolate* source_isolate = Isolate::Current();
if (source_isolate != nullptr) {
OS::PrintErr(
"[>] Posting message:\n"
"\tlen: %" Pd "\n\tsource: (%" Pd64
") %s\n\tdest: %s\n"
"\tdest_port: %" Pd64 "\n",
message->Size(), static_cast<int64_t>(source_isolate->main_port()),
source_isolate->name(), name(), message->dest_port());
} else {
OS::PrintErr(
"[>] Posting message:\n"
"\tlen: %" Pd
"\n\tsource: <native code>\n"
"\tdest: %s\n"
"\tdest_port: %" Pd64 "\n",
message->Size(), name(), message->dest_port());
}
}
saved_priority = message->priority();
// **将 Message 加入到 MessageQueue 中**
if (message->IsOOB()) {
oob_queue_->Enqueue(std::move(message), before_events);
} else {
queue_->Enqueue(std::move(message), before_events);
}
if (paused_for_messages_) {
ml.Notify();
}
if (pool_ != nullptr && !task_running_) {
ASSERT(!delete_me_);
task_running_ = true;
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
ASSERT(launched_successfully);
}
}
// Invoke any custom message notification.
MessageNotify(saved_priority);
}
上述代码最后将结果包装成了 Message 打包进MessageHandler
的消息队列中,这样便可以在 Dart 端通过消息分发接收到结果。
Dart_NewNativePort
再来看一下Dart_NewNativePort
的实现如下:
// -> runtime\vm\native_api_impl.cc
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently) {
if (name == NULL) {
name = "<UnnamedNativePort>";
}
if (handler == NULL) {
OS::PrintErr("%s expects argument 'handler' to be non-null.\n",
CURRENT_FUNC);
return ILLEGAL_PORT;
}
// 此方法位于 sdk/runtime/vm/dart.cc
// Used to Indicate that a Dart API call is active.
if (!Dart::SetActiveApiCall()) {
return ILLEGAL_PORT;
}
// 【注意,这里切换了 isolate,退出当前 isolate,直到 Dart_NewNativePort 执行完毕再切换回当前 isolate】
// Start the native port without a current isolate.
// 这里的实现可以参考 https://github.com/dart-lang/sdk/blob/d437877c500c77d6e08372ba2dbda9c598f5bd8e/runtime/vm/dart_api_impl.cc
**IsolateLeaveScope saver(Isolate::Current());**
// 执行完 IsolateLeaveScope 后,会切换出当前 isolate 直到下面的 return port_id;执行完毕,但是在此期间,下面的代码依旧是在当前 isolate 所在的 IOThread 也即系统线程下进行的
NativeMessageHandler* nmh = **new NativeMessageHandler(name, handler);**
// 创建一个 Dart_Port 并且添加到 PortMap 中
Dart_Port port_id = **PortMap::CreatePort(nmh);**
if (port_id != ILLEGAL_PORT) {
// 激活这个端口
PortMap::SetPortState(port_id, PortMap::kLivePort);
// 在 Dart 线程池中执行,在这里 Run() 中的代码会在一个新的线程中执行
if (!**nmh->Run**(**Dart::thread_pool()**, NULL, NULL, 0)) {
// 执行完毕之后,在之前调用本方法的环境,回调 handler,关闭 Dart_Port
PortMap::ClosePort(port_id);
port_id = ILLEGAL_PORT;
}
}
Dart::ResetActiveApiCall();
return port_id;
// 上面 IsolateLeaveScope saver 对象在构造方法中退出了调用方法的分支,执行到这里后 saver 对象被回收,执行析构函数,又将 Isolate 切换回来
}
主要的流程有:
- 切换退出当前 isolate
- 创建
NativeMessageHandler nmh
包裹要处理的回调 - 根据上面创建的
nmh
创建Dart_Port port_id
- 执行**
nmh->Run()
**方法将nmh
放到线程池中运行 - 当
nmh
执行完毕回调后,关闭Dart_Port port_id
也就是说,在 Dart 中向 Native 发送指令时,通过 Dart 的_IOService._dispatch()
方法中执行_servicePorts._getPort(id);
向 Native 层的 IOService 获取用于通信的SendPort servicePort
时,会先通过 Dart_NewNativePort 创建一个NativeMessageHandler(会压入消息栈中),然后创建一个对应的Dart_Port port_id
并返回给 Dart 用来触发消息。
让我们挨个分析一下:
1.退出当前 isolate
2.创建NativeMessageHandler nmh
包裹要处理的回调
3.根据上面创建的nmh
创建Dart_Port port_id
看一下PortMap::CreatePort
的实现:
// -> runtime\vm\port.cc
Dart_Port PortMap::CreatePort(MessageHandler* handler) {
ASSERT(handler != NULL);
MutexLocker ml(mutex_);
if (ports_ == nullptr) {
return ILLEGAL_PORT;
}
#if defined(DEBUG)
handler->CheckAccess();
#endif
// 不停的遍历,直到找到一个可用的 port(类型为 int64_t)
const Dart_Port port = AllocatePort();
// 获取到的 port 只能通过 isolate_entry 访问
// The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
// by the [PortMap::mutex_] we already hold.
MessageHandler::PortSetEntry isolate_entry;
isolate_entry.port = port;
handler->ports_.Insert(isolate_entry);
Entry entry;
entry.port = port;
entry.handler = handler;
entry.state = kNewPort;
ports_->Insert(entry);
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Opening port: \n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
handler->name(), entry.port);
}
return entry.port;
}
Dart_Port PortMap::AllocatePort() {
Dart_Port result;
ASSERT(mutex_->IsOwnedByCurrentThread());
// Keep getting new values while we have an illegal port number or the port
// number is already in use.
do {
// Ensure port ids are representable in JavaScript for the benefit of
// vm-service clients such as Observatory.
const Dart_Port kMask1 = 0xFFFFFFFFFFFFF;
// Ensure port ids are never valid object pointers so that reinterpreting
// an object pointer as a port id never produces a used port id.
const Dart_Port kMask2 = 0x3;
result = (prng_->NextUInt64() & kMask1) | kMask2;
// The two special marker ports are used for the hashset implementation and
// cannot be used as actual ports.
if (result == PortSet<Entry>::kFreePort ||
result == PortSet<Entry>::kDeletedPort) {
continue;
}
ASSERT(!static_cast<ObjectPtr>(static_cast<uword>(result))->IsWellFormed());
} while (ports_->Contains(result));
ASSERT(result != 0);
ASSERT(!ports_->Contains(result));
return result;
}
4.执行**nmh->Run()
**方法将nmh
放到线程池中运行
// -> runtime\vm\message_handler.cc
ThreadPool* pool_;
bool MessageHandler::Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data) {
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Starting message handler:\n"
"\thandler: %s\n",
name());
}
ASSERT(pool_ == NULL);
ASSERT(!delete_me_);
pool_ = pool;
start_callback_ = start_callback;
end_callback_ = end_callback;
callback_data_ = data;
task_running_ = true;
// 在 Dart VM Thread 的线程池中执行 MessageHandler,会是一个新的线程
bool result = pool_->Run<MessageHandlerTask>(this);
if (!result) {
pool_ = nullptr;
start_callback_ = nullptr;
end_callback_ = nullptr;
callback_data_ = 0;
task_running_ = false;
}
return result;
}
// 会在“线程池”运行的时候执行对应的 MessageHandler 回调
class MessageHandlerTask : public ThreadPool::Task {
public:
explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
ASSERT(handler != NULL);
}
virtual void Run() {
ASSERT(handler_ != NULL);
// 执行具体的逻辑
handler_->TaskCallback();
}
private:
MessageHandler* handler_;
DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
};
// -> runtime\include\dart_api.h
// A port is used to send or receive inter-isolate messages
typedef int64_t Dart_Port;
// -> runtime\vm\thread_pool.h
// Runs a task on the thread pool.
template <typename T, typename... Args>
bool Run(Args&&... args) {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}
// -> runtime\vm\thread_pool.cc
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* new_worker = nullptr;
{
MonitorLocker ml(&pool_monitor_);
if (shutting_down_) {
return false;
}
// 创建新的 Worker
new_worker = ScheduleTaskLocked(&ml, std::move(task));
}
if (new_worker != nullptr) {
// 在线程中执行 task
new_worker->StartThread();
}
return true;
}
// -> runtime\vm\thread_pool.cc
// 创建一个 Worker
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
std::unique_ptr<Task> task) {
// Enqueue the new task.
tasks_.Append(task.release());
pending_tasks_++;
ASSERT(pending_tasks_ >= 1);
// Notify existing idle worker (if available).
if (count_idle_ >= pending_tasks_) {
ASSERT(!idle_workers_.IsEmpty());
ml->Notify();
return nullptr;
}
// If we have maxed out the number of threads running, we will not start a
// new one.
if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
if (!idle_workers_.IsEmpty()) {
ml->Notify();
}
return nullptr;
}
// Otherwise start a new worker.
auto new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
return new_worker;
}
// 新建的 Woker 和 ThreadPool 绑定
ThreadPool::Worker::Worker(ThreadPool* pool)
: pool_(pool), join_id_(OSThread::kInvalidThreadJoinId) {}
// new_worker->StartThread();会调用下面的方法
void ThreadPool::Worker::StartThread() {
int result = OSThread::Start("DartWorker", &Worker::Main,
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Could not start worker thread: result = %d.", result);
}
}
// OSThread::Start 每个端不一样,我们选择 Android 端的实现
// -> runtime\vm\os_thread_android.cc
int OSThread::Start(const char* name,
ThreadStartFunction function,
uword parameter) {
pthread_attr_t attr;
int result = pthread_attr_init(&attr);
RETURN_ON_PTHREAD_FAILURE(result);
result = pthread_attr_setstacksize(&attr, OSThread::GetMaxStackSize());
RETURN_ON_PTHREAD_FAILURE(result);
ThreadStartData* data = new ThreadStartData(name, function, parameter);
// 声明系统线程类型
pthread_t tid;
// 调用系统创建线程的函数 https://blog.csdn.net/liangxanhai/article/details/7767430
// pthread_create 参数含义:1. &tid 指向线程的指针,2. &attr 新建线程的属性 3. ThreadStart 线程要执行的方法指针 4. data 传给参数 ThreadStart 的参数
// 成功执行线程则返回 0
result = pthread_create(&tid, &attr, ThreadStart, data);
RETURN_ON_PTHREAD_FAILURE(result);
result = pthread_attr_destroy(&attr);
RETURN_ON_PTHREAD_FAILURE(result);
return 0;
}
// Dispatch to the thread start function provided by the caller. This trampoline
// is used to ensure that the thread is properly destroyed if the thread just
// exits.
static void* ThreadStart(void* data_ptr) {
if (FLAG_worker_thread_priority != kMinInt) {
if (setpriority(PRIO_PROCESS, gettid(), FLAG_worker_thread_priority) ==
-1) {
FATAL2("Setting thread priority to %d failed: errno = %d\n",
FLAG_worker_thread_priority, errno);
}
}
ThreadStartData* data = reinterpret_cast<ThreadStartData*>(data_ptr);
const char* name = data->name();
OSThread::ThreadStartFunction function = data->function();
uword parameter = data->parameter();
delete data;
// Set the thread name. There is 16 bytes limit on the name (including \0).
// pthread_setname_np ignores names that are too long rather than truncating.
char truncated_name[16];
snprintf(truncated_name, ARRAY_SIZE(truncated_name), "%s", name);
pthread_setname_np(pthread_self(), truncated_name);
// 创建一个系统线程的包装类 OSThread 和新建的系统线程绑定
// Create new OSThread object and set as TLS for new thread.
OSThread* thread = OSThread::CreateOSThread();
if (thread != NULL) {
// 将线程切换到新创建的系统线程
OSThread::SetCurrent(thread);
thread->set_name(name);
UnblockSIGPROF();
// Call the supplied thread start function handing it its parameters.
// 执行创建 ThreadStartData 时传入的方法,也就是 ThreadPool::Worker::Main(uword args)
function(parameter);
}
return NULL;
OSThread* OSThread::CreateOSThread() {
ASSERT(thread_list_lock_ != NULL);
MutexLocker ml(thread_list_lock_);
if (!creation_enabled_) {
return NULL;
}
OSThread* os_thread = new OSThread();
AddThreadToListLocked(os_thread);
return os_thread;
}
在创建了新的系统线程后,会执行下面的方法:
// -> runtime\vm\thread_pool.cc
void ThreadPool::Worker::Main(uword args) {
// Call the thread start hook here to notify the embedder that the
// thread pool thread has started.
Dart_ThreadStartCallback start_cb = Dart::thread_start_callback();
if (start_cb != nullptr) {
start_cb();
}
OSThread* os_thread = OSThread::Current();
ASSERT(os_thread != nullptr);
Worker* worker = reinterpret_cast<Worker*>(args);
ThreadPool* pool = worker->pool_;
// 将 Worker 和系统线程绑定
os_thread->owning_thread_pool_worker_ = worker;
worker->os_thread_ = os_thread;
// Once the worker quits it needs to be joined.
worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);
#if defined(DEBUG)
{
MonitorLocker ml(&pool->pool_monitor_);
ASSERT(pool->idle_workers_.ContainsForDebugging(worker));
}
#endif
pool->WorkerLoop(worker);
worker->os_thread_ = nullptr;
os_thread->owning_thread_pool_worker_ = nullptr;
// Call the thread exit hook here to notify the embedder that the
// thread pool thread is exiting.
Dart_ThreadExitCallback exit_cb = Dart::thread_exit_callback();
if (exit_cb != nullptr) {
exit_cb();
}
}
// -> runtime\vm\os_thread.h
// OSThread
// The ThreadPool::Worker which owns this OSThread. If this OSThread was not
// started by a ThreadPool it will be nullptr. This TLS value is not
// protected and should only be read/written by the OSThread itself.
void* owning_thread_pool_worker_ = nullptr;
// thread_list_lock_ cannot have a static lifetime because the order in which
// destructors run is undefined. At the moment this lock cannot be deleted
// either since otherwise, if a thread only begins to run after we have
// started to run TLS destructors for a call to exit(), there will be a race
// on its deletion in CreateOSThread().
static Mutex* thread_list_lock_;
Dart_NewSendPort
看一下Dart_NewSendPort
如何将创建好的Dart_Port service_port
转变为 Dart 的SendPort
的:
// -> runtime\vm\dart_api_impl.cc
DART_EXPORT Dart_Handle Dart_NewSendPort(Dart_Port port_id) {
DARTSCOPE(Thread::Current());
CHECK_CALLBACK_STATE(T);
if (port_id == ILLEGAL_PORT) {
return Api::NewError("%s: illegal port_id %" Pd64 ".", CURRENT_FUNC,
port_id);
}
return Api::NewHandle(T, SendPort::New(port_id));
}
// -> runtime\vm\object.cc
SendPortPtr SendPort::New(Dart_Port id, Heap::Space space) {
return New(id, Isolate::Current()->origin_id(), space);
}
SendPortPtr SendPort::New(Dart_Port id,
Dart_Port origin_id,
Heap::Space space) {
ASSERT(id != ILLEGAL_PORT);
// 创建新的 SendPort 并将 Dart_Port id 和当前的 isolate id 与之绑定
SendPort& result = SendPort::Handle();
{
ObjectPtr raw =
Object::Allocate(SendPort::kClassId, SendPort::InstanceSize(), space,
SendPort::ContainsCompressedPointers());
NoSafepointScope no_safepoint;
result ^= raw;
result.StoreNonPointer(&result.untag()->id_, id);
result.StoreNonPointer(&result.untag()->origin_id_, origin_id);
}
return result.ptr();
}
到这里我们发现,Dart_NewNativePort
将要处理的事件handler
封装起来,最后在非当前 isolate的线程中执行。
结论
从上面的分析中,我们可以知道,在 Dart 中通过 File 进行文件操作,其实是通过 Dart 中的_IOService 进行消息中转,将用户的 IO 指令发送到 Native 层的 IOService 中;
IOService 通过一些列操作,得到一个SendPort servicePort
,与此同时对应的 IO 操作已经压入消息栈中等待触发在单独的线程中执行;
之后在_IOService 中servicePort
将用户需要的 IO 操作和与自己通信的_replyToPort = _receivePort!.sendPort;
通过send
方法触发IOServiceCallback
执行对应的 IO 操作,并且在最后调用Dart_PostCObject
方法将结果压入消息栈中,这会触发 Dart 层_IOService 的_receivePort!.handler 回调事件,然后根据事件失败或者成功,使用 Completer 通过 Event loop 一步步将事件上报,最终回调用户需要的命令。