среда, 26 января 2011 г.

Использование OmniThread Libray 2.0 (OTL 2.0) для создания многопоточных приложений - 6


Провайдер исходных данных

В данном разделе мы остановимся более подробно на провайдере исходных данных или исходном провайдере.
Как мы знаем, провайдер является поставщиком данных для передачи значений в локальную очередь (цикл), в котором данное значение далее передается в параллельные задачи.

Провайдер исходных данных – это объект, который передает данные из источника перечисления (данные, которые передали в Parallel For), и повторно упаковывает эти данные в формат, подходящий для параллельного потребления. В настоящее время есть три провайдера, определенные в модуле OtlDataManager:
TOmniIntegerRangeProvider
Делает проход (итерацию) по диапазонам целого числа. Стоит сказать, что данный класс не получает никакие данные из источников перечислений. Фактически это внутренний  класс для внутренних механизмов OTL.
TOmniValueEnumeratorProvider
Пробегает по источнику перечислений IOmniValueEnumerator, который является специализированным  нумератором, предоставляющим доступ к блокирующей коллекции. К данному нумератору можно получить доступ от нескольких параллельных задач (читателей). Так как в настоящее время этот провайдер работает с блокирующей коллекцией IOmniBlockingCollection он не требует обеспечения механизма блокировки.
TOmniEnumeratorProvider
Данный провайдер пробегается по нумераторам Windows (классам реализующим интерфейс IEnumerator) или нумераторам Delphi (GetEnumerator, обернутый в класс TOmniValueEnumerator).
Все провайдеры наследуются от абстрактного класса по TOmniSourceProvider, который предоставляет общие методы для всех потомков.
В теории работа потомка-провайдера должна быть построена на поддержке работы данных методов, но на практике потомки-провайдеры в некоторых случаях для улучшения производительности не обеспечивают полную поддержку данных методов, а используют свои внутреннии механизмы. Так что имейте это в виду.
TOmniSourceProvider
TOmniSourceProvider = class abstract
  public
    function  Count: int64; virtual; abstract;
    function  CreateDataPackage: TOmniDataPackage; virtual; abstract;
    function  GetCapabilities: TOmniSourceProviderCapabilities;
      virtual; abstract;
    function  GetPackage(dataCount: integer;
      package: TOmniDataPackage): boolean; virtual; abstract;
    function  GetPackageSizeLimit: integer; virtual; abstract;
  end; { TOmniSourceProvider }

Так как не все провайдеры поддерживают исходные методы базового класса  TOmniSourceProvider для определения особенностей работы конкретного провайдера введена функция GetCapabilities, которая возвращает особенности работы конкретного провайдера:
  TOmniSourceProviderCapability = (
    spcCountable,  // провайдер знает сколько он содержит данных (является
                   // исчисляемым)
    spcFast,       // быстрая обработка
    spcDataLimit   // пакет данных может содержать только ограниченное количество
                   // данных
  );

   TOmniSourceProviderCapabilities = set of TOmniSourceProviderCapability;

На практике, класс  TomniIntegerRangeProvider точно знает, сколько он содержит данных, то есть он является исчисляемым. Соответсвенно у него выставлен spcCountable. И действительно диапазон данных при цикле, например от 1 до 10 точно известен и равен 10. Так, как количество данных заранее известно, то также выставлен флаг spcFast указывающий,  на то, что данные будут обработаны быстро.
Насчет других классов провайдеров такое сказать нельзя – они не являются заранее исчисляемыми (не известен диапазон данных) и соответственно обработаны быстро они не могут.
Что качается флага spcDataLimit, то на данный момент он уже устарел и возможно автором OTL будет удален в следующих редакциях. Заместо данного флага нужно использовать метод GetPackageSizeLimit.

Другой важный аспект провайдера  - метод GetPackage. Этот метод получает доступ к источнику данных (гарантируя блокировку доступа в случае необходимости), восстанавливает данные и возвращает эти данные в пакете данных. Выполнение этого метода очень зависит от исходных данных. Например, провайдер TomniIntegerRangeProvider перебирает данные  в диапазоне “от” и “до” и возвращает пакет данных, который не содержит кучу значений, а содержит только одно значение в выбранном диапазоне. Поэтому операции с таким провайдером осуществляются довольно быстро.
В другом случае нумератор провайдера исходных данных блокирует источник данных (захватывает его) и возвращет пакет данных (строит его) по полученному от источника данных значению. Так как провайдер задействет как бы внешний источник данных через нумератор, то операции соответственно по времени длятся гораздо дольше, чем в случае с TomniIntegerRangeProvider.
В простейшем случае, TOmniValueEnumerator провайдера исходных данных просто возвращает значение и строит пакет данных.
TOmniValueEnumeratorProvider.GetPackage
function TOmniValueEnumeratorProvider.GetPackage(dataCount: integer; package:
  TOmniDataPackage): boolean;
var
  iData     : integer;
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  timeout   : cardinal;
  value     : TOmniValue;
begin
  Assert(not StorePositions);
  Result := false;
  dataCount := intPackage.Prepare(dataCount);
  timeout := INFINITE;
  for iData := 1 to dataCount do begin
    // получаем данные от внешнего источника и строим пакет данных
    if not vepEnumerator.TryTake(value, timeout) then
      break; //for
    intPackage.Add(value);
    timeout := 0;
    Result := true;
  end;
end; { TOmniValueEnumeratorProvider.GetPackage }

Менеджер данных

Менеджер данных является так сказать краегольным камнем в иерархии OtlDataManager. Это центральная часть всей системы. Данный менеджер расположен между множественными локальными очередями и единственным провайдером исходных данных. Его задача состоит в том, чтобы удостоверится, что все параллельные задачи задействованы в работе.
В настоящее время в OTL реализованы два менеджера данных исчисляемый (countable) менеджер данных и эвристический (heuristic).
Первый используется, если провайдер исходных данных является исчисляемым, а последний если не является. Оба менеджера наследуются от абстрактного класса TOmniDataManager.

TOmniDataManager = class abstract
  public
    function  CreateLocalQueue: TOmniLocalQueue; virtual; abstract;
    function  AllocateOutputBuffer: TOmniOutputBuffer;
      virtual; abstract;
    function  GetNext(package: TOmniDataPackage): boolean;
      virtual; abstract;
    procedure ReleaseOutputBuffer(buffer: TOmniOutputBuffer);
      virtual; abstract;
    procedure SetOutput(const queue: IOmniBlockingCollection);
      overload; virtual; abstract;
  end; { TOmniDataManager }

Главное различие между ними заложено в функции GetNextFromProvider, которая читает данные от провайдера (вызывая его метод GetPackage). В исчисляемом провайдере это - только посредник (экспедитор передающий данные как есть), в то время как в эвристическом провайдере эта функция пытается найти хороший размер пакета, который позволит всем параллельным задачам работать на полной скорости.

function TOmniHeuristicDataManager.GetNextFromProvider(package: TOmniDataPackage;
  generation: integer): boolean;
const
  CDataLimit = Trunc(High(integer) / CFetchTimeout_ms);
var
  dataPerMs: cardinal;
  dataSize : integer;
  time     : int64;
begin
  // the goal is to fetch as much (but not exceeding <fetch_limit>)
  // data as possible in <fetch_timeout> milliseconds; highest amount
  // of data is limited by the GetDataCountForGeneration method.
  dataSize := GetDataCountForGeneration(generation);
  if dataSize > hdmEstimatedPackageSize.Value then
    dataSize := hdmEstimatedPackageSize.Value;
  time := DSiTimeGetTime64;
  Result := SourceProvider.GetPackage(dataSize, package);
  time := DSiTimeGetTime64 - time;
  if Result then begin
    if time = 0 then
      dataPerMs := CDataLimit
    else begin
      dataPerMs := Round(dataSize / time);
      if dataPerMs >= CDataLimit then
        dataPerMs := CDataLimit;
    end;
    // average over last four fetches for dynamic adaptation
    hdmEstimatedPackageSize.Value := Round
      ((hdmEstimatedPackageSize.Value / 4 * 3) +
       (dataPerMs / 4) * CFetchTimeout_ms);
  end;
end; { TOmniHeuristicDataManager.GetNextFromProvider }

Локальная очередь

Как вы знаете, каждая параллельная задача читает данные от локальной очереди, которая является простым интерфейсом к менеджеру данных. Самая важная часть очереди – это свой метод GetNext, который предоставляет конкретной задаче следующее значение.

function TOmniLocalQueueImpl.GetNext(var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(value);
  if not Result then begin
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then
      Result := lqiDataPackage.GetNext(value);
  end;
end; { TOmniLocalQueueImpl.GetNext }

Каждая локальная очередь содержит локальный пакет данных. GetNext сначала пытается прочитать следующее значение от этого пакета данных. Если эта попытка терпит неудачу (пакеты данных пустые - были уже полностью обработаны), то метод пытается получить новый пакет данных от менеджера данных и (в случае успеха) повторяет установку следующих данных от нового полученного пакета данных.
GetNext в менеджере данных сначала пытается получить следующий пакет от провайдера (через метод GetNextFromProvider, который вызывает метод провайдера GetPackage). Если терпится неудача, то метод пытается украсть часть рабочей нагрузки от другой задачи.
Кража - эта особенность, которая позволяет всем параллельным задачам быть активными до последнего перечисляемого значения. Чтобы осуществить это, менеджер данных пробегает повторно по всем локальным очередям и пытается расколоть пакет данных каждой локальной очереди наполовину. Если это удается, то половину пакета данных остается в оригинальной локальной очереди, а другая половина возвращается к локальной очереди, которая запрашивала новые данные (новый пакет данных).
Раскол пакета очень зависит от типа данных. Например, пакет данных целого числа только повторно вычисляет границы диапазона, в то время как основанные на нумераторе пакеты должны прежде скопировать данные.

function TOmniValueEnumeratorDataPackage.Split(
  package: TOmniDataPackage): boolean;
var
  intPackage: TOmniValueEnumeratorDataPackage absolute package;
  iValue    : integer;
  value     : TOmniValue;
begin
  Result := false;
  for iValue := 1 to intPackage.Prepare(vedpApproxCount.Value div 2)
  do begin
    if not GetNext(value) then
      break; //for
    intPackage.Add(value);
    Result := true;
  end;
end; { TOmniValueEnumeratorDataPackage.Split }

Комментариев нет:

Отправить комментарий