четверг, 13 января 2011 г.

Использование OmniThread Libray 2.0 (OTL 2.0) для создания многопоточных приложений - 4

Многоступенчатые процессы


Модуль OtlParallel в OmniThreadLibrary предлагает некоторые решения высокого уровня, которые позволяют Вам легко управлять некоторыми видами процессов параллельно. До некоторых пор OTL предоставляло только фоновые вычисления (Parallel.Future), независимые параллельные процессы (Parallel.Join) и параллельные циклы, где фоновая задача, выполняющаяся в данном цикле, зависела только от значения цикла, а не от вычислений, сделанных в других фоновых задачах Parallel.ForEach. К сожалению, многие параллельные процессы не подпадают ни под одну из этих категорий.

И вот недавно автор OTL (по предложению Mason Wheeler) добавил поддержку многоступенчатых процессов. Поддержка многоступенцатых процессов теперь реализована в конструкции Parallel.Pipeline.

Трубопроводы

Основой разработки данного механизма явилось предположение, что некий процесс может быть разбит на стадии (или подпроцессы) и связан с очередями данных.
Потоки данных от входной очереди передаются в первую стадию, где эти данные  частично обрабатываются, и затем передаются в очередь-посредник. Первая стадия тогда продолжает выполнение, обрабатывает другие входящие данные и передает их снова по мере поступления в очередь-посредник.
Это продолжается, до тех пор, пока полностью не будет отработаны входящие данные в данной стадии. После отработки стадии, очередь-посредник запускает следующую стадию, которая делает обработку в подобной манере и так далее и так далее.
В конце концов, данные переданные через очереди (прошедшие стадии) будут  на финальной стадии прочитаны и обработаны программой создавшей этот многоступенчатый процесс.
В целом, многоступенчатый процесс функционирует как трубопровод - данные входят, данные выходят (и чудесно в промежутках обрабатываются). Далее термином трубопровод мы будем именовать многоступенчатый процесс.

Также важно,  что никакая стадия не взаимодействует с другой стадией. Единственное взаимодействие между стадиями сделано с данными, проходящими через посреднические очереди. Количество данных переданных через очередь-посредник никак не регламентируется. Для одной стадии возможна ситуация когда на входе стадия получит больше или меньше данных чем передаст после обработки через выход в очередь-посредник для следующей стадии.

В классической последовательной программе план выполнения многоступенчатого процесса очень прост:

В параллельной и многопоточной среде, мы можем добиться большего успеха чем используя классический одно-параллельный подход. Поскольку стадии в значительной степени независимы, они могут быть выполнены значительно быстрее:

Настраивание задач и очередей-посредников является довольно простым, хотя и не совсем тривиальным процессом. Поэтому было бы лучше, если бы это могло бы быть автоматизировано в некотором роде.

Parallel.Pipeline

Трубопровод создается с помощью функции Parallel.Pipeline, которая возвращает интерфейс IOmniPipeline. Есть две перегруженных версии этой функции - одна для общего создания трубопровода и другая для создания простых трубопроводов, которые не требуют никаких специальных настроек.

class function Pipeline: IOmniPipeline; overload;
class function Pipeline(
  const stages: array of TPipelineStageDelegate;
  const input: IOmniBlockingCollection = nil):
  IOmniPipeline; overload;

Последняя версия имеет два параметра: stages – массив обработок стадий и input  - дополнительная входная очередь. Входная очередь может использоваться, чтобы обеспечить начальные данные для первой стадии. Также можно передать nil в параметр input  тем самым указывая первой стадии, что данных для первой стадии нет. Это при меняется тогда, когда первая стадия сама генерирует данные, для последующих, то есть данных для обработки на входе для нее не требуется.
Для данных стоящих в очереди в Parallel.Pipeline используются блокирующие коллекции
  • Примечание:О блокирующих коллекциях OTL я постараюсь рассказать далее.Интересующихся отправлю к http://17slon.com/blogs/gabr/2010/02/three-steps-to-blocking-collection-3.html
Сами стадии осуществлены как анонимные процедуры или методы, берущие два параметра очереди - один для входа и один для выхода. Кроме того в первой стадии, где входная очередь не может быть определена, автоматически создается Трубопровод и передается делегату стадии:

TPipelineStageDelegate = reference to procedure
  (const input, output: IOmniBlockingCollection);

Следующий код показывает простую реализацию Трубопровода в действии. Этот код взято от примера tests\OmniThreadLibrary 41_Pipeline OTL.
mniThreadLibrary 41_Pipeline.pas Создание трубопровода
procedure TfrmPipelineDemo.btnSimpleClick(Sender: TObject);
var
  pipeOut: IOmniBlockingCollection;
begin
  pipeOut := Parallel.Pipeline(
    [StageGenerate, StageMult2, StageMinus3, StageMod5, StageSum]).Run;
  lbLog.Items.Add(Format('Pipeline result: %d',
    [pipeOut.Next.AsInteger]));
end;

Parallel.Pipeline принимает множество делегатов стадии и возвращает интерфейс IOmniPipeline. Метод Run вызван из интерфейса, чтобы настроить инфраструктуру, начать все задачи.
Доступ к  интерфейсу IOmniPipeline обычно получают доступ через функцию создания трубопровода Parallel.Pipeline.

IOmniPipeline
IOmniPipeline = interface
  function  Input(const queue: IOmniBlockingCollection):
    IOmniPipeline;
  function  NumTasks(numTasks: integer): IOmniPipeline;
  function  Run: IOmniBlockingCollection;
  function  Stage(pipelineStage: TPipelineStageDelegate):
    IOmniPipeline;
  function  Stages(const pipelineStages: array of
    TPipelineStageDelegate): IOmniPipeline;
  function  Throttle(numEntries: integer;
    unblockAtCount: integer = 0): IOmniPipeline;
end;

Метод Input - устанавливает входную очередь. Если этот метод не вызван, то входная очередь назначена не будет, и первая стадия получит nill для входного параметра.
Метод Stage - добавляет еще одну стадию трубопровода.
Метод Stages - добавляет многократные стадии трубопровода (массив).
Метод NumTasks определяет количество параллельных задач выполнения для стадии(й), только что добавленных с помощью функций Stage/Stages (Для этого сначала нужно вызвать Stage(s) и только потом NumTasks). Если вызвать NumTasks  прежде, чем любая стадия добавлена, это определит число параллельных задач для всех стадий по умолчанию.
Количество параллельных задач для выполнения определенной стадии может, быть изменена NumTasks только после вызова Stage.
Метод Throttle - устанавливает параметры регулирования  для стадии(й), только что добавленных с помощью функций Stage/Stages и для вновь создаваемых стадий далее.
  • Подробно о регулированиях стадий и установках количества параллельных задач для стадии будет рассказано далее по тексту.
Метод Run  делает всю тяжелую работу - создает очереди и настраивает задачи OmniThreadLibrary. Этот метод возвращает выходную очередь, которая может использоваться в Вашей программе, чтобы получить результат вычисления прошедшего в многоступенчатом процессе (стадиях). Даже если последняя стадия, не приводит ни к какому результату, эта очередь может использоваться, для сигнализирования конца вычислений.
Когда каждая стадия заканчивается, CompleteAdding автоматически вызывается для выходной очереди. Это позволяет следующей стадии обнаруживать конец в получаемых данных на очереди входа (блокирующий нумератор коллекции, выйдет, или TryTake возвратится False). То же самое происходит и для выходной очереди.
Пример (также взятый от 41_Pipeline) поможет объяснить все это:
mniThreadLibrary 41_Pipeline.pas
procedure TfrmPipelineDemo.btnExtended2Click(Sender: TObject);
var
  pipeOut: IOmniBlockingCollection;
begin
  pipeOut := Parallel.Pipeline
    .Throttle(102400)
    .Stage(StageGenerate)
    .Stage(StageMult2)
    .Stages([StageMinus3, StageMod5])
    .NumTasks(2)
    .Stage(StageSum)
    .Run;
  lbLog.Items.Add(Format('Pipeline result: %d',
    [pipeOut.Next.AsInteger]));
end;

Во-первых, мы устанавливаем глобальные параметры регулирования. Это будет применено ко всем далее создаваемым стадиям. Две стадии будут добавлены, каждый с отдельным вызовом функции Stage.
Еще две стадии будут добавлены одним вызовом Stages. Обе эти стадии будут выполнять в двух параллельных задачах. В конце будет добавлена еще одна  стадия, и вызван метод Run.
Полный многоступенчатый процесс будет использовать семь задач (один для StageGenerate, один для StageMult2, два для StageMinus3, два для StageMod5 и один для StageSum).

Продолжение следует... 

Комментариев нет:

Отправить комментарий