跳至主要內容

Dart 读取文件过程分析

JI,XIAOYONG...大约 21 分钟

Dart 读取文件时,先在 Dart 代码创建 File 引用,通过与IOServiceIsolate通信(先通过 IO Service 而发送请求到 native 端,等到 native 执行完操作之后再回调结果)从而实现对文件的读写。

实现一个简单的读取文件的代码如下:

import 'dart:io';

main() {
  var filePath =
      r"G:/21996.1.210529-1541.co_release_CLIENT_CONSUMER_x64FRE_en-us.iso";

  var file = File(filePath);

  var startTime = printCurrentTimeMs("start run file.readAsBytes");
  file.readAsBytes().then((value) {
    printCurrentTimeMs("file.readAsBytes() finish",
        lastTimeMs: startTime,
        suffix: "\nfile.readAsBytes() result:${value.length}");
  });
  printCurrentTimeMs("finish run file.readAsBytes");
}

int printCurrentTimeMs(String prefix, {String? suffix, int? lastTimeMs}) {
  var currentTimeMs = DateTime.now().millisecondsSinceEpoch;
  var timeElapseString =
      lastTimeMs == null ? "" : ", time elapse:${currentTimeMs - lastTimeMs}ms ";
  print(
      "$prefix current time($currentTimeMs)$timeElapseString${suffix ?? ""}");
  return currentTimeMs;
}

整个过程如下:

过程分析

Dart 端发起文件读写请求

其中file.readAsBytes() 是具体执行读取文件的地方,他的定义如下:

// -> sdk\lib\io\file_impl.dart
Future<Uint8List> readAsBytes();

在我们创建File时,实际上创建的是_Fileclass _File extends FileSystemEntity implements File)对象:

// -> sdk\lib\io\file_impl.dart

// abstract class File implements FileSystemEntity
("vm:entry-point")
  factory File(String path) {
    final IOOverrides? overrides = IOOverrides.current;
    if (overrides == null) {
      return new _File(path);
    }
    return overrides.createFile(path);
  }

_FileFile 的实现类,所以file.readAsBytes()实际调用的是_File 实现的方法:

// -> sdk\lib\io\file_impl.dart

// Read the file in blocks of size 64k.
const int _blockSize = 64 * 1024;

class _File extends FileSystemEntity implements File {

Future<Uint8List> readAsBytes() {
    Future<Uint8List> readDataChunked(RandomAccessFile file) {
			// 分段读取文件,每次只读取_blockSize 大小的内容
      var builder = new BytesBuilder(copy: false);
      var completer = new Completer<Uint8List>();
      void read() {
				// 每次只异步读取一部分文本
        file.read(_blockSize).then((data) {
          if (data.length > 0) {
            builder.add(data);
            read();
          } else {
            completer.complete(builder.takeBytes());
          }
        }, onError: completer.completeError);
      }

      read();
      return completer.future;
    }

    return open().then((file) {
      return file.length().then((length) {
        if (length == 0) {
          // May be character device, try to read it in chunks.
          return readDataChunked(file);
        }
        return file.read(length);
      }).whenComplete(file.close);
    });
  }

}

可以看到,无论是普通的文件格式,还是 character device,最后都是调用了_RandomAccessFileopen()read(int bytes)方法异步读取文件。

设备文件分为 Block Device Driver 和 Character Device Drive 两类。
Character Device Driver 又被称为字符设备或裸设备 raw devices; Block Device Driver 通常成为块设备。
而 Block Device Driver 是以固定大小长度来传送转移资料;Character Device Driver 是以不定长度的字元传送资料。 https://www.cnblogs.com/qlee/archive/2011/07/27/2118406.html#:~:text=Characteropen in new window

// -> flutter\bin\cache\pkg\sky_engine\lib\io\file_impl.dart

class _RandomAccessFile implements RandomAccessFile {
final String path;

  bool _asyncDispatched = false;
	// 读取文件的信息
  late _FileResourceInfo _resourceInfo;
	// 对文件的操作引用
  _RandomAccessFileOps _ops;

  ("vm:entry-point")
  _RandomAccessFile(int pointer, this.path)
      : _ops = new _RandomAccessFileOps(pointer) {
    _resourceInfo = new _FileResourceInfo(this);
    _maybeConnectHandler();
  }

	// 异步读取文件
	Future<Uint8List> read(int bytes) {
    // TODO(40614): Remove once non-nullability is sound.
    ArgumentError.checkNotNull(bytes, "bytes");
			// 异步读取文件,实际上是将发送指令到 IO Service,然后等待返回结果
    return _dispatch(_IOService.fileRead, [null, bytes]).then((response) {
      if (_isErrorResponse(response)) {
        throw _exceptionFromResponse(response, "read failed", path);
      }
      _resourceInfo.addRead(response[1].length);
			// 读取的文件内容
      Uint8List result = response[1];
      return result;
    });
  }

	// 同步读取文件
  Uint8List readSync(int bytes) {
    // TODO(40614): Remove once non-nullability is sound.
    ArgumentError.checkNotNull(bytes, "bytes");
    _checkAvailable();
		// 同步读取文件是对文件直接操作
    var result = _ops.read(bytes);
    if (result is OSError) {
      throw new FileSystemException("readSync failed", path, result);
    }
    _resourceInfo.addRead(result.length);
    return result;
  }

Future<RandomAccessFile> open({FileMode mode = FileMode.read}) {
// FileMode  https://github.com/dart-lang/sdk/blob/main/sdk/lib/io/io_service.dart
    if (mode != FileMode.read &&
        mode != FileMode.write &&
        mode != FileMode.append &&
        mode != FileMode.writeOnly &&
        mode != FileMode.writeOnlyAppend) {
      return new Future.error(
          new ArgumentError('Invalid file mode for this operation'));
    }
    return _dispatchWithNamespace(
				// 请求操作为“打开文件”,参数为:null,文件路径,操作文件的 mode
        _IOService.fileOpen, [null, _rawPath, mode._mode]).then((response) {
      if (_isErrorResponse(response)) {
        throw _exceptionFromResponse(response, "Cannot open file", path);
      }
			// 从 IO Service 那里异步获得文件句柄 response 和 path
      return new _RandomAccessFile(response, path);
    });
  }

}

_RandomAccessFile中,除了同步读写文件是对返回的文件引用直接操作外,很多操作都能看到通过_dispatch()方法与IO Service通信,让我们看一下这个方法的实现:

// -> sdk\lib\io\file_impl.dart

// _RandomAccessFile
Future _dispatch(int request, List data, {bool markClosed = false}) {
    if (closed) {
      return new Future.error(new FileSystemException("File closed", path));
    }
    if (_asyncDispatched) {
      var msg = "An async operation is currently pending";
      return new Future.error(new FileSystemException(msg, path));
    }
    if (markClosed) {
      // Set closed to true to ensure that no more async requests can be issued
      // for this file.
      closed = true;
    }
    _asyncDispatched = true;
    data[0] = _pointer();
		// 主要代码在这里,通过_IOService 的_dispatch 发送指令
    return **_IOService._dispatch**(request, data).whenComplete(() {
      _asyncDispatched = false;
    });
  }

 // open create 之类的操作会调用这个方法,不过最后也是调用_IOService._dispatch(request, data) 通信
static Future _dispatchWithNamespace(int request, List data) {
    data[0] = _namespacePointer();
		// 与 IO Service 进行异步通信,request 标记请求操作的类型,data 则是数据
    return **_IOService._dispatch**(request, data);
  }

查阅_IOServiceopen in new window的源码后发现这是个external 方法。

external static Future _dispatch(int request, List data);

An external function is connected to its body by an implementation-specific mechanism. Attempting to invoke an external function that has not been connected to its body will throw a NoSuchMethodError or some subclass thereof.
****https://github.com/dart-lang/sdk/issues/4300open in new window

根据external的定义,_dispatch方法在不同的机器上面实现不同。我们只看和 app 相关的实现(在sdk\lib\_internal\vm目录下,vm 同级目录还有 js 等实现),具体的实现如下:

// -> sdk\lib\_internal\vm\bin\io_service_patch.dart

// _IOService
class _IOService {
	// 用于向 IO Service 发送消息
  static _IOServicePorts _servicePorts = new _IOServicePorts();
// We use a static variable here to hold onto the last result of
  // calling the IO Service frome the native.
  static RawReceivePort? _receivePort;
  // the other side(other isolate) will send message back with the _replyToPort
  static late SendPort _replyToPort;
  // a map holding the registered callbacks for each received message.
  static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
  static int _id = 0;

  /// [request] IO 操作的类型,具体值在 [sdk/lib/io/io_service.dart] 中的_IOService 类中定义
  ///           主要有对文件、目录、网络进行操作的请求
  /// [data]    对应的数据,如果是文件,则是文件路径,如果是目录,则是目录路径等等
  
  static Future _dispatch(int request, List data) {
    int id;
    do {
		// create a special id to identify the request.
      id = _getNextId();
    } while (_messageMap.containsKey(id));
		// 通过_servicePorts 获取一个新的 SendPort 以便向 IOService 发送消息,
		// 这个 SendPort 是 IO Service 返回给 dart 用来向他发消息的
    final SendPort servicePort = _servicePorts._getPort(id);
    _ensureInitialize();
    final Completer completer = new Completer();
    _messageMap[id] = completer;
    try {
			// 向 IOService 发送消息,当 request 执行完毕之后,
			// 会调用_replyToPort 触发在 root zone 的回调_receivePort!.handler
      **servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
    } catch (error) {
      _messageMap.remove(id)!.complete(error);
      if (_messageMap.length == 0) {
        _finalize();
      }
    }
    return completer.future;
  }

  static void _ensureInitialize() {
    if (_receivePort == null) {
      _receivePort = new RawReceivePort(null, 'IO Service');
			// 其他地方可以使用_replyToPort 来发消息触发_receivePort 执行 handler 方法
      _replyToPort = _receivePort!.sendPort;
      _receivePort!.handler = (data) {
			// 在这里处理 IOService 执行完方法返回的数据
        assert(data is List && data.length == 2);
				// data[0] 就是我们在_dispatch 方法中获取的 id,
        // 将处理结果 data[1] 通过 Completer.complete 返回
        _messageMap.remove(data[0])!.complete(data[1]);
				// 释放这个触发这个回调的 SendPort
        _servicePorts._returnPort(data[0]);
        if (_messageMap.length == 0) {
          _finalize();
        }
      };
    }
  }
...
}

可以看到,最后是通过RawReceivePort/SendPort进行跨 Isolate 通信

_IOService使用_servicePorts对 native 层发送消息触发 IO 操作,然后使用_receivePort监听,当 IO 操作完成时会通过_replyToPort 回调结果,会在 _receivePort!.handler方法中根据当时请求的id找到Completer将结果传递回去。

这样当时我们在 file.readAsBytes()时获取到的Future便会收到回调,从而完成文件操作的流程。

  file.readAsBytes().then((value) {
    printCurrentTimeMs("file.readAsBytes() finish",
        lastTimeMs: startTime,
        suffix: "\nfile.readAsBytes() result:${value.length}");
  });

下面是到目前为止涉及到的类关系示意图:

IO Service 中转

那么,这个 IO Service 是做什么的,他又是如何实现与 dart 中的调用方双向通信,以及执行调用方需要的功能呢?

位于sdk\lib\_internal\vm\bin\io_service_patch.dart的_IOService 是一个中转站,向上承接来自 Dart 代码的 IO 请求指令(先行返回 Future),向下将这些指令转发至 Native 层的 IO Service,并监听回调,当 native 层处理完这些 IO 指令之后,将结果通过 Future 返回给 Dart 调用方。

让我们再看一下他的具体实现:

// -> sdk\lib\_internal\vm\bin\io_service_patch.dart

// _IOService
class _IOService {
	// 用于向 IO Service 发送消息
  static _IOServicePorts _servicePorts = new _IOServicePorts();
// We use a static variable here to hold onto the last result of
  // calling the IO Service frome the native.
  static RawReceivePort? _receivePort;
  // the other side(other isolate) will send message back with the _replyToPort
  static late SendPort _replyToPort;
  // a map holding the registered callbacks for each received message.
  static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
  static int _id = 0;

  /// [request] IO 操作的类型,具体值在 [sdk/lib/io/io_service.dart] 中的_IOService 类中定义
  ///           主要有对文件、目录、网络进行操作的请求
  /// [data]    对应的数据,如果是文件,则是文件路径,如果是目录,则是目录路径等等
  
  static Future _dispatch(int request, List data) {
    int id;
    do {
		// create a special id to identify the request.
      id = _getNextId();
    } while (_messageMap.containsKey(id));
		// 通过_servicePorts 获取一个新的 SendPort 以便向 IOService 发送消息,
		// 这个 SendPort 是 IO Service 返回给 dart 用来向他发消息的
    final SendPort **servicePort** = _servicePorts.**_getPort(id);**
    _ensureInitialize();
    final Completer completer = new Completer();
    _messageMap[id] = completer;
    try {
			// 向 IOService 发送消息,当 request 执行完毕之后,
			// 会调用_replyToPort 触发在 root zone 的回调_receivePort!.handler
      **servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
    } catch (error) {
      _messageMap.remove(id)!.complete(error);
      if (_messageMap.length == 0) {
        _finalize();
      }
    }
    return completer.future;
  }

  static void _ensureInitialize() {
    if (_receivePort == null) {
      _receivePort = new RawReceivePort(null, 'IO Service');
			// 其他地方可以使用_replyToPort 来发消息触发_receivePort 执行 handler 方法
      _replyToPort = _receivePort!.sendPort;
      _receivePort!.handler = (data) {
			// 在这里处理 IOService 执行完方法返回的数据
        assert(data is List && data.length == 2);
				// data[0] 就是我们在_dispatch 方法中获取的 id,
        // 将处理结果 data[1] 通过 Completer.complete 返回
        _messageMap.remove(data[0])!.complete(data[1]);
				// 释放这个触发这个回调的 SendPort
        _servicePorts._returnPort(data[0]);
        if (_messageMap.length == 0) {
          _finalize();
        }
      };
    }
  }
...
}

可以看到:

  • _IOService持有_IOServicePorts _servicePorts以便获取SendPort servicePort和 native 层通信,
  • 在之前的代码分析中,我们已经知道_IOService还在_ensureInitialize()中监听着RawReceivePort? _receivePort的回调,
  • 这样当_IOService_dispatch()方法中将_replyToPort_receivePort的 SendPort)传递给servicePort后,一旦 native 通过_replyToPort发送处理结果,_IOService立马可以收到并通过Completer.complete返回给Dart 中的调用方

上述这些步骤能够实施的关键,在于Dart层的_IOService如何与native层的_IOService关联起来呢?

让我们来分析一下SendPort servicePort的获取过程:

// -> sdk\lib\_internal\vm\bin\io_service_patch.dart

class _IOService {
	// 用于向 IO Service 发送消息
	static _IOServicePorts _servicePorts = new _IOServicePorts();
...
}

class _IOServicePorts {
  // We limit the number of IO Service ports per isolate so that we don't
  // spawn too many threads all at once, which can crash the VM on Windows.
  static const int maxPorts = 32;
  List<SendPort> _ports = <SendPort>[];
  List<SendPort> _freePorts = <SendPort>[];
  Map<int, SendPort> _usedPorts = new HashMap<int, SendPort>();

  _IOServicePorts();

  SendPort _getPort(int forRequestId) {
    if (_freePorts.isEmpty && _usedPorts.length < maxPorts) {
			// 如果没有可用的 SendPort,就新建 SendPort 用于远程服务通信
      final SendPort port = **_newServicePort()**;
      _ports.add(port);
      _freePorts.add(port);
    }
    if (!_freePorts.isEmpty) {
      // 有空闲 SendPort,使用
      final SendPort port = _freePorts.removeLast();
      assert(!_usedPorts.containsKey(forRequestId));
      _usedPorts[forRequestId] = port;
      return port;
    }
    // We have already allocated the max number of ports. Re-use an
    // existing one.
    final SendPort port = _ports[forRequestId % maxPorts];
    _usedPorts[forRequestId] = port;
    return port;
  }

  // 释放掉占用的 port
  void _returnPort(int forRequestId) {
    final SendPort port = _usedPorts.remove(forRequestId)!;
    if (!_usedPorts.values.contains(port)) {
      _freePorts.add(port);
    }
  }

  ("vm:external-name", "IOService_NewServicePort")
  external static SendPort _newServicePort();
}

可以看到这里最后的关键方法是SendPort _newServicePort(),这是一个external方法,在 native 实现。

Native 处理 Dart 的指令

IOService_NewServicePort

SendPort是由_newServicePort()方法创建的,这是一个external方法,他的 native 层实现名称是IOService_NewServicePort

// -> runtime\bin\io_service.cc

void FUNCTION_NAME(IOService_NewServicePort)(Dart_NativeArguments args) {
  Dart_SetReturnValue(args, Dart_Null());
	// 创建一个新的 native port
  Dart_Port service_port = **IOService::GetServicePort();**
  if (service_port != ILLEGAL_PORT) {
		// 【注意】这里根据 service_port 创建了 Dart 里面的 SendPort 对象
    // Return a send port for the service port.
    Dart_Handle send_port = Dart_NewSendPort(service_port);
		// 将当前 IOService 对应的 send_port 返回给调用方
    Dart_SetReturnValue(args, send_port);
  }
}

Dart_Port IOService::GetServicePort() {
  // 注意这里的参数
  // 分别是 native port 的名称,收到 native port 以后得回调方法,是否同时处理
  return **Dart_NewNativePort("IOService", IOServiceCallback, true);**
}

// -> runtime\include\dart_api.h

/**
 * Returns a new SendPort with the provided port id.
 *
 * \param port_id The destination port.
 *
 * \return A new SendPort if no errors occurs. Otherwise returns
 *   an error handle.
 */
DART_EXPORT Dart_Handle Dart_NewSendPort(Dart_Port port_id);

注意,在 Dart 层的_IOServiceSendPort _newServicePort() 方法最后再这里调用了IOService_NewServicePort

这里主要有 3 个步骤:

  1. 使用Dart_NewNativePort("IOService", IOServiceCallback, true);创建Dart_Port
  2. 使用Dart_NewSendPortDart_Port转化为Dart_Handle(也就是 Dart 中的SendPort
  3. 返回上面创建好的Dart_Handle,Dart 代码拿到返回的 Dart_Handle 也就是SendPort servicePort之后,就可以和 native 层的 IO Service 同通信。

接下来我们看一下前 2 步分别是怎么实现的:

Dart_NewNativePort

再看一下Dart_NewNativePort的调用参数:

Dart_NewNativePort("IOService", IOServiceCallback, true);

// -> runtime\include\dart_native_api.h

//  Creates a new native port.  When messages are received on this
//  native port, then they will be dispatched to the provided native
//  message handler.
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
                                         Dart_NativeMessageHandler handler,
                                         bool handle_concurrently);

IOServiceCallback

Dart_NewNativePort总共有 3 个参数,Dart_NativeMessageHandler handler是当这个Dart_Port收到消息的时候,会被回调的方法,也就是我们通过 Dart 端的_IOService.dispatch方法的**servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);语句执行向 native 发送 IO 指令时,在 native 这里真正负责执行的方法:

// -> runtime\bin\io_service.cc

void IOServiceCallback(Dart_Port dest_port_id, Dart_CObject* message) {
  Dart_Port reply_port_id = ILLEGAL_PORT;
  CObject* response = CObject::IllegalArgumentError();
  CObjectArray request(message);
	// 这里的参数顺序,与 Dart 层的_IOService(sdk\lib\_internal\vm\bin\io_service_patch.dart) 的_dispatch() 中的
	// **servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
	// 代码中的参数顺序一致
  if ((message->type == Dart_CObject_kArray) && (request.Length() == 4) &&
      request[0]->IsInt32() && request[1]->IsSendPort() &&
      request[2]->IsInt32() && request[3]->IsArray()) {
    CObjectInt32 message_id(request[0]);
    CObjectSendPort **reply_port**(request[1]);
    CObjectInt32 request_id(request[2]);
    CObjectArray data(request[3]);
    **reply_port_id** = **reply_port**.Value();
    // 这里解析完收到的参数后,回去执行对应的文件操作
    switch (request_id.Value()) {
      **IO_SERVICE_REQUEST_LIST(CASE_REQUEST);**
      default:
        UNREACHABLE();
    }
  }

  CObjectArray result(CObject::NewArray(2));
  result.SetAt(0, request[0]);
	// response 在上面的 IO_SERVICE_REQUEST_LIST 执行完毕后就会被赋值
  result.SetAt(1, **response**);
  ASSERT(reply_port_id != ILLEGAL_PORT);
  **Dart_PostCObject(reply_port_id, result.AsApiCObject());**
}

#define **CASE_REQUEST**(type, method, id)                                         \
  case IOService::k##type##method##Request:                                    \
    response = type::method##Request(data);                                    \
    break;

IOService 具体的执行是在IO_SERVICE_REQUEST_LIST根据解析到的参数执行对应的方法:

// -> runtime\bin\io_service.h

// This list must be kept in sync with the list in sdk/lib/io/io_service.dart
#define IO_SERVICE_REQUEST_LIST(V)                                             \
  V(File, Exists, 0)                                                           \
  V(File, Create, 1)                                                           \
  V(File, Delete, 2)                                                           \
  V(File, Rename, 3)                                                           \
  V(File, Copy, 4)                                                             \
  V(File, Open, 5)                                                             \
  V(File, ResolveSymbolicLinks, 6)                                             \
  V(File, Close, 7)                                                            \
  V(File, Position, 8)                                                         \
  V(File, SetPosition, 9)                                                      \
  V(File, Truncate, 10)                                                        \
  V(File, Length, 11)                                                          \
  V(File, LengthFromPath, 12)                                                  \
  V(File, LastAccessed, 13)                                                    \
  V(File, SetLastAccessed, 14)                                                 \
  V(File, LastModified, 15)                                                    \
  V(File, SetLastModified, 16)                                                 \
  V(File, Flush, 17)                                                           \
  V(File, ReadByte, 18)                                                        \
  V(File, WriteByte, 19)                                                       \
  V(File, Read, 20)                                                            \
  V(File, ReadInto, 21)                                                        \
  V(File, WriteFrom, 22)                                                       \
  V(File, CreateLink, 23)                                                      \
  V(File, DeleteLink, 24)                                                      \
  V(File, RenameLink, 25)                                                      \
  V(File, LinkTarget, 26)                                                      \
  V(File, Type, 27)                                                            \
  V(File, Identical, 28)                                                       \
  V(File, Stat, 29)                                                            \
  V(File, Lock, 30)                                                            \
  V(Socket, Lookup, 31)                                                        \
  V(Socket, ListInterfaces, 32)                                                \
  V(Socket, ReverseLookup, 33)                                                 \
  V(Directory, Create, 34)                                                     \
  V(Directory, Delete, 35)                                                     \
  V(Directory, Exists, 36)                                                     \
  V(Directory, CreateTemp, 37)                                                 \
  V(Directory, ListStart, 38)                                                  \
  V(Directory, ListNext, 39)                                                   \
  V(Directory, ListStop, 40)                                                   \
  V(Directory, Rename, 41)                                                     \
  V(SSLFilter, ProcessFilter, 42)

通过上述代码,可以得知,IOService 主要处理的方法有四类:

  • File
  • Directory
  • Socket
  • SSLFilter

IOServiceCallback方法中,我们注意到,程序最后执行的结果是通过Dart_PostCObject返回的,来看一下他是怎么实现的:

// -> runtime\vm\native_api_impl.cc

static bool PostCObjectHelper(Dart_Port port_id, Dart_CObject* message) {
  AllocOnlyStackZone zone;
  std::unique_ptr<Message> msg = WriteApiMessage(
      zone.GetZone(), message, port_id, Message::kNormalPriority);

  if (msg == nullptr) {
    return false;
  }

  // Post the message at the given port.
  return **PortMap::PostMessage(std::move(msg));**
}

DART_EXPORT bool Dart_PostCObject(Dart_Port port_id, Dart_CObject* message) {
  return PostCObjectHelper(port_id, message);
}

// -> runtime\vm\port.cc

bool PortMap::PostMessage(std::unique_ptr<Message> message,
                          bool before_events) {
  MutexLocker ml(mutex_);
  if (ports_ == nullptr) {
    return false;
  }
  auto it = ports_->TryLookup(message->dest_port());
  if (it == ports_->end()) {
    // Ownership of external data remains with the poster.
    message->DropFinalizers();
    return false;
  }
  MessageHandler* handler = (*it).handler;
  ASSERT(handler != nullptr);
  **handler->PostMessage(std::move(message), before_events);**
  return true;
}

// -> runtime\vm\message_handler.cc

void MessageHandler::PostMessage(std::unique_ptr<Message> message,
                                 bool before_events) {
  Message::Priority saved_priority;

  {
    MonitorLocker ml(&monitor_);
    if (FLAG_trace_isolates) {
      Isolate* source_isolate = Isolate::Current();
      if (source_isolate != nullptr) {
        OS::PrintErr(
            "[>] Posting message:\n"
            "\tlen:        %" Pd "\n\tsource:     (%" Pd64
            ") %s\n\tdest:       %s\n"
            "\tdest_port:  %" Pd64 "\n",
            message->Size(), static_cast<int64_t>(source_isolate->main_port()),
            source_isolate->name(), name(), message->dest_port());
      } else {
        OS::PrintErr(
            "[>] Posting message:\n"
            "\tlen:        %" Pd
            "\n\tsource:     <native code>\n"
            "\tdest:       %s\n"
            "\tdest_port:  %" Pd64 "\n",
            message->Size(), name(), message->dest_port());
      }
    }

    saved_priority = message->priority();
		// **将 Message 加入到 MessageQueue 中**
    if (message->IsOOB()) {
      oob_queue_->Enqueue(std::move(message), before_events);
    } else {
      queue_->Enqueue(std::move(message), before_events);
    }
    if (paused_for_messages_) {
      ml.Notify();
    }

    if (pool_ != nullptr && !task_running_) {
      ASSERT(!delete_me_);
      task_running_ = true;
      const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
      ASSERT(launched_successfully);
    }
  }

  // Invoke any custom message notification.
  MessageNotify(saved_priority);
}

上述代码最后将结果包装成了 Message 打包进MessageHandler消息队列中,这样便可以在 Dart 端通过消息分发接收到结果。

Dart_NewNativePort

再来看一下Dart_NewNativePort的实现如下:

// -> runtime\vm\native_api_impl.cc

DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
                                         Dart_NativeMessageHandler handler,
                                         bool handle_concurrently) {
  if (name == NULL) {
    name = "<UnnamedNativePort>";
  }
  if (handler == NULL) {
    OS::PrintErr("%s expects argument 'handler' to be non-null.\n",
                 CURRENT_FUNC);
    return ILLEGAL_PORT;
  }
  // 此方法位于 sdk/runtime/vm/dart.cc
  // Used to Indicate that a Dart API call is active.
  if (!Dart::SetActiveApiCall()) {
    return ILLEGAL_PORT;
  }
  // 【注意,这里切换了 isolate,退出当前 isolate,直到 Dart_NewNativePort 执行完毕再切换回当前 isolate】
  // Start the native port without a current isolate.
  //  这里的实现可以参考 https://github.com/dart-lang/sdk/blob/d437877c500c77d6e08372ba2dbda9c598f5bd8e/runtime/vm/dart_api_impl.cc
**IsolateLeaveScope saver(Isolate::Current());**
// 执行完 IsolateLeaveScope 后,会切换出当前 isolate 直到下面的 return port_id;执行完毕,但是在此期间,下面的代码依旧是在当前 isolate 所在的 IOThread 也即系统线程下进行的

  NativeMessageHandler* nmh = **new NativeMessageHandler(name, handler);**
	// 创建一个 Dart_Port 并且添加到 PortMap 中
  Dart_Port port_id = **PortMap::CreatePort(nmh);**
  if (port_id != ILLEGAL_PORT) {
		// 激活这个端口
    PortMap::SetPortState(port_id, PortMap::kLivePort);
		// 在 Dart 线程池中执行,在这里 Run() 中的代码会在一个新的线程中执行
    if (!**nmh->Run**(**Dart::thread_pool()**, NULL, NULL, 0)) {
      // 执行完毕之后,在之前调用本方法的环境,回调 handler,关闭 Dart_Port
      PortMap::ClosePort(port_id);
      port_id = ILLEGAL_PORT;
    }
  }
  Dart::ResetActiveApiCall();
  return port_id;
  // 上面 IsolateLeaveScope saver 对象在构造方法中退出了调用方法的分支,执行到这里后 saver 对象被回收,执行析构函数,又将 Isolate 切换回来
}

主要的流程有:

  1. 切换退出当前 isolate
  2. 创建NativeMessageHandler nmh包裹要处理的回调
  3. 根据上面创建的nmh创建Dart_Port port_id
  4. 执行**nmh->Run()**方法将nmh放到线程池中运行
  5. nmh执行完毕回调后,关闭Dart_Port port_id

也就是说,在 Dart 中向 Native 发送指令时,通过 Dart 的_IOService._dispatch()方法中执行_servicePorts._getPort(id);向 Native 层的 IOService 获取用于通信的SendPort servicePort时,会先通过 Dart_NewNativePort 创建一个NativeMessageHandler(会压入消息栈中),然后创建一个对应的Dart_Port port_id并返回给 Dart 用来触发消息。

让我们挨个分析一下:

1.退出当前 isolate

IsolateLeaveScopeopen in new window

2.创建NativeMessageHandler nmh包裹要处理的回调

3.根据上面创建的nmh创建Dart_Port port_id

看一下PortMap::CreatePort的实现:

// -> runtime\vm\port.cc

Dart_Port PortMap::CreatePort(MessageHandler* handler) {
  ASSERT(handler != NULL);
  MutexLocker ml(mutex_);
  if (ports_ == nullptr) {
    return ILLEGAL_PORT;
  }

#if defined(DEBUG)
  handler->CheckAccess();
#endif

	// 不停的遍历,直到找到一个可用的 port(类型为 int64_t)
  const Dart_Port port = AllocatePort();

	// 获取到的 port 只能通过 isolate_entry 访问
  // The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
  // by the [PortMap::mutex_] we already hold.
  MessageHandler::PortSetEntry isolate_entry;
  isolate_entry.port = port;
  handler->ports_.Insert(isolate_entry);

  Entry entry;
  entry.port = port;
  entry.handler = handler;
  entry.state = kNewPort;
  ports_->Insert(entry);

  if (FLAG_trace_isolates) {
    OS::PrintErr(
        "[+] Opening port: \n"
        "\thandler:    %s\n"
        "\tport:       %" Pd64 "\n",
        handler->name(), entry.port);
  }

  return entry.port;
}

Dart_Port PortMap::AllocatePort() {
  Dart_Port result;

  ASSERT(mutex_->IsOwnedByCurrentThread());

  // Keep getting new values while we have an illegal port number or the port
  // number is already in use.
  do {
    // Ensure port ids are representable in JavaScript for the benefit of
    // vm-service clients such as Observatory.
    const Dart_Port kMask1 = 0xFFFFFFFFFFFFF;
    // Ensure port ids are never valid object pointers so that reinterpreting
    // an object pointer as a port id never produces a used port id.
    const Dart_Port kMask2 = 0x3;
    result = (prng_->NextUInt64() & kMask1) | kMask2;

    // The two special marker ports are used for the hashset implementation and
    // cannot be used as actual ports.
    if (result == PortSet<Entry>::kFreePort ||
        result == PortSet<Entry>::kDeletedPort) {
      continue;
    }

    ASSERT(!static_cast<ObjectPtr>(static_cast<uword>(result))->IsWellFormed());
  } while (ports_->Contains(result));

  ASSERT(result != 0);
  ASSERT(!ports_->Contains(result));
  return result;
}

4.执行**nmh->Run()**方法将nmh放到线程池中运行


// -> runtime\vm\message_handler.cc
ThreadPool* pool_;
bool MessageHandler::Run(ThreadPool* pool,
                         StartCallback start_callback,
                         EndCallback end_callback,
                         CallbackData data) {
  MonitorLocker ml(&monitor_);
  if (FLAG_trace_isolates) {
    OS::PrintErr(
        "[+] Starting message handler:\n"
        "\thandler:    %s\n",
        name());
  }
  ASSERT(pool_ == NULL);
  ASSERT(!delete_me_);
  pool_ = pool;
  start_callback_ = start_callback;
  end_callback_ = end_callback;
  callback_data_ = data;
  task_running_ = true;
	// 在 Dart VM Thread 的线程池中执行 MessageHandler,会是一个新的线程
  bool result = pool_->Run<MessageHandlerTask>(this);
  if (!result) {
    pool_ = nullptr;
    start_callback_ = nullptr;
    end_callback_ = nullptr;
    callback_data_ = 0;
    task_running_ = false;
  }
  return result;
}

// 会在“线程池”运行的时候执行对应的 MessageHandler 回调
class MessageHandlerTask : public ThreadPool::Task {
 public:
  explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
    ASSERT(handler != NULL);
  }

  virtual void Run() {
    ASSERT(handler_ != NULL);
		// 执行具体的逻辑
    handler_->TaskCallback();
  }

 private:
  MessageHandler* handler_;

  DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
};

// -> runtime\include\dart_api.h

// A port is used to send or receive inter-isolate messages
typedef int64_t Dart_Port;

// -> runtime\vm\thread_pool.h

// Runs a task on the thread pool.
  template <typename T, typename... Args>
  bool Run(Args&&... args) {
    return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
  }

// -> runtime\vm\thread_pool.cc
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
  Worker* new_worker = nullptr;
  {
    MonitorLocker ml(&pool_monitor_);
    if (shutting_down_) {
      return false;
    }
		// 创建新的 Worker
    new_worker = ScheduleTaskLocked(&ml, std::move(task));
  }
  if (new_worker != nullptr) {
		// 在线程中执行 task
    new_worker->StartThread();
  }
  return true;
}

// -> runtime\vm\thread_pool.cc

// 创建一个 Worker
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
                                                   std::unique_ptr<Task> task) {
  // Enqueue the new task.
  tasks_.Append(task.release());
  pending_tasks_++;
  ASSERT(pending_tasks_ >= 1);

  // Notify existing idle worker (if available).
  if (count_idle_ >= pending_tasks_) {
    ASSERT(!idle_workers_.IsEmpty());
    ml->Notify();
    return nullptr;
  }

  // If we have maxed out the number of threads running, we will not start a
  // new one.
  if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
    if (!idle_workers_.IsEmpty()) {
      ml->Notify();
    }
    return nullptr;
  }

  // Otherwise start a new worker.
  auto new_worker = new Worker(this);
  idle_workers_.Append(new_worker);
  count_idle_++;
  return new_worker;
}

// 新建的 Woker 和 ThreadPool 绑定
ThreadPool::Worker::Worker(ThreadPool* pool)
    : pool_(pool), join_id_(OSThread::kInvalidThreadJoinId) {}

// new_worker->StartThread();会调用下面的方法
void ThreadPool::Worker::StartThread() {
  int result = OSThread::Start("DartWorker", &Worker::Main,
                               reinterpret_cast<uword>(this));
  if (result != 0) {
    FATAL1("Could not start worker thread: result = %d.", result);
  }
}

// OSThread::Start 每个端不一样,我们选择 Android 端的实现
// -> runtime\vm\os_thread_android.cc

int OSThread::Start(const char* name,
                    ThreadStartFunction function,
                    uword parameter) {
  pthread_attr_t attr;
  int result = pthread_attr_init(&attr);
  RETURN_ON_PTHREAD_FAILURE(result);

  result = pthread_attr_setstacksize(&attr, OSThread::GetMaxStackSize());
  RETURN_ON_PTHREAD_FAILURE(result);

  ThreadStartData* data = new ThreadStartData(name, function, parameter);
	// 声明系统线程类型
  pthread_t tid;
	// 调用系统创建线程的函数 https://blog.csdn.net/liangxanhai/article/details/7767430
	// pthread_create 参数含义:1. &tid 指向线程的指针,2. &attr 新建线程的属性 3. ThreadStart 线程要执行的方法指针 4. data 传给参数 ThreadStart 的参数
  // 成功执行线程则返回 0
  result = pthread_create(&tid, &attr, ThreadStart, data);
  RETURN_ON_PTHREAD_FAILURE(result);

  result = pthread_attr_destroy(&attr);
  RETURN_ON_PTHREAD_FAILURE(result);

  return 0;
}

// Dispatch to the thread start function provided by the caller. This trampoline
// is used to ensure that the thread is properly destroyed if the thread just
// exits.
static void* ThreadStart(void* data_ptr) {
  if (FLAG_worker_thread_priority != kMinInt) {
    if (setpriority(PRIO_PROCESS, gettid(), FLAG_worker_thread_priority) ==
        -1) {
      FATAL2("Setting thread priority to %d failed: errno = %d\n",
             FLAG_worker_thread_priority, errno);
    }
  }

  ThreadStartData* data = reinterpret_cast<ThreadStartData*>(data_ptr);

  const char* name = data->name();
  OSThread::ThreadStartFunction function = data->function();
  uword parameter = data->parameter();
  delete data;

  // Set the thread name. There is 16 bytes limit on the name (including \0).
  // pthread_setname_np ignores names that are too long rather than truncating.
  char truncated_name[16];
  snprintf(truncated_name, ARRAY_SIZE(truncated_name), "%s", name);
  pthread_setname_np(pthread_self(), truncated_name);

	// 创建一个系统线程的包装类 OSThread 和新建的系统线程绑定
  // Create new OSThread object and set as TLS for new thread.
  OSThread* thread = OSThread::CreateOSThread();
  if (thread != NULL) {
		// 将线程切换到新创建的系统线程
    OSThread::SetCurrent(thread);
    thread->set_name(name);
    UnblockSIGPROF();
    // Call the supplied thread start function handing it its parameters.
		// 执行创建 ThreadStartData 时传入的方法,也就是 ThreadPool::Worker::Main(uword args)
    function(parameter);
  }

  return NULL;

OSThread* OSThread::CreateOSThread() {
  ASSERT(thread_list_lock_ != NULL);
  MutexLocker ml(thread_list_lock_);
  if (!creation_enabled_) {
    return NULL;
  }
  OSThread* os_thread = new OSThread();
  AddThreadToListLocked(os_thread);
  return os_thread;
}

在创建了新的系统线程后,会执行下面的方法:

// -> runtime\vm\thread_pool.cc

void ThreadPool::Worker::Main(uword args) {
  // Call the thread start hook here to notify the embedder that the
  // thread pool thread has started.
  Dart_ThreadStartCallback start_cb = Dart::thread_start_callback();
  if (start_cb != nullptr) {
    start_cb();
  }

  OSThread* os_thread = OSThread::Current();
  ASSERT(os_thread != nullptr);

  Worker* worker = reinterpret_cast<Worker*>(args);
  ThreadPool* pool = worker->pool_;
	// 将 Worker 和系统线程绑定
  os_thread->owning_thread_pool_worker_ = worker;
  worker->os_thread_ = os_thread;

  // Once the worker quits it needs to be joined.
  worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);

#if defined(DEBUG)
  {
    MonitorLocker ml(&pool->pool_monitor_);
    ASSERT(pool->idle_workers_.ContainsForDebugging(worker));
  }
#endif

  pool->WorkerLoop(worker);

  worker->os_thread_ = nullptr;
  os_thread->owning_thread_pool_worker_ = nullptr;

  // Call the thread exit hook here to notify the embedder that the
  // thread pool thread is exiting.
  Dart_ThreadExitCallback exit_cb = Dart::thread_exit_callback();
  if (exit_cb != nullptr) {
    exit_cb();
  }
}

// -> runtime\vm\os_thread.h
// OSThread
// The ThreadPool::Worker which owns this OSThread. If this OSThread was not
  // started by a ThreadPool it will be nullptr. This TLS value is not
  // protected and should only be read/written by the OSThread itself.
  void* owning_thread_pool_worker_ = nullptr;

  // thread_list_lock_ cannot have a static lifetime because the order in which
  // destructors run is undefined. At the moment this lock cannot be deleted
  // either since otherwise, if a thread only begins to run after we have
  // started to run TLS destructors for a call to exit(), there will be a race
  // on its deletion in CreateOSThread().
  static Mutex* thread_list_lock_;

Dart_NewSendPort

看一下Dart_NewSendPort如何将创建好的Dart_Port service_port转变为 Dart 的SendPort的:

// -> runtime\vm\dart_api_impl.cc

DART_EXPORT Dart_Handle Dart_NewSendPort(Dart_Port port_id) {
  DARTSCOPE(Thread::Current());
  CHECK_CALLBACK_STATE(T);
  if (port_id == ILLEGAL_PORT) {
    return Api::NewError("%s: illegal port_id %" Pd64 ".", CURRENT_FUNC,
                         port_id);
  }
  return Api::NewHandle(T, SendPort::New(port_id));
}

// -> runtime\vm\object.cc

SendPortPtr SendPort::New(Dart_Port id, Heap::Space space) {
  return New(id, Isolate::Current()->origin_id(), space);
}

SendPortPtr SendPort::New(Dart_Port id,
                          Dart_Port origin_id,
                          Heap::Space space) {
  ASSERT(id != ILLEGAL_PORT);
	// 创建新的 SendPort 并将 Dart_Port id 和当前的 isolate id 与之绑定
  SendPort& result = SendPort::Handle();
  {
    ObjectPtr raw =
        Object::Allocate(SendPort::kClassId, SendPort::InstanceSize(), space,
                         SendPort::ContainsCompressedPointers());
    NoSafepointScope no_safepoint;
    result ^= raw;
    result.StoreNonPointer(&result.untag()->id_, id);
    result.StoreNonPointer(&result.untag()->origin_id_, origin_id);
  }
  return result.ptr();
}

到这里我们发现,Dart_NewNativePort将要处理的事件handler封装起来,最后在非当前 isolate的线程中执行。

结论

从上面的分析中,我们可以知道,在 Dart 中通过 File 进行文件操作,其实是通过 Dart 中的_IOService 进行消息中转,将用户的 IO 指令发送到 Native 层的 IOService 中;

IOService 通过一些列操作,得到一个SendPort servicePort,与此同时对应的 IO 操作已经压入消息栈中等待触发在单独的线程中执行;

之后在_IOService 中servicePort将用户需要的 IO 操作和与自己通信的_replyToPort = _receivePort!.sendPort; 通过send方法触发IOServiceCallback执行对应的 IO 操作,并且在最后调用Dart_PostCObject方法将结果压入消息栈中,这会触发 Dart 层_IOService 的_receivePort!.handler 回调事件,然后根据事件失败或者成功,使用 Completer 通过 Event loop 一步步将事件上报,最终回调用户需要的命令。

参考资料

09、Flutter FFI Dart Native API_又吹风_Bassy 的博客-CSDN 博客open in new window

快手 - 开眼快创 Flutter 实践 | w4lle's Notesopen in new window

文章标题:《Dart 读取文件过程分析》
本文作者: JI,XIAOYONG
发布时间: 2022/06/02 16:25:23 UTC+8
更新时间: 2023/12/30 16:17:02 UTC+8
written by human, not by AI
本文地址: https://jixiaoyong.github.io/blog/posts/3db5282.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 许可协议。转载请注明出处!
你认为这篇文章怎么样?
  • 0
  • 0
  • 0
  • 0
  • 0
  • 0
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.8