вторник, 28 декабря 2010 г.

Использование OmniThread Libray 2.0 (OTL 2.0) для создания многопоточных приложений - 2

Продожение начатое в 

Этот код зависит от особенности коллекции TOmniBlockingCollection. Когда очередь будет пуста CompleteAdding заблокирует нумератор коллекции. Вызывать CompleteAdding в функции OnStop нужно обязательно, иначе бы без этого параллельный цикл работал не останавливаясь (вернее получилось бы вечное ожидание на следующем элементе коллекции).

В то время когда это показывает истинную мощь двух функции NoWait и OnStop, понимание этого процесса сложно и определенно код выше очень труднопонимаем. Автор OTL решил внести некоторые синтаксически исправления в OTL, для более понятного понимания работы, теперь используя новые возможности OTL мы перепишем наш код более наглядно для понимания:
Пример
var
  prime     : TOmniValue;
  primeQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  primeQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000)
       .PreserveOrder               // сохранит входной порядок
       .NoWait                 
       .Into(primeQueue)             
       .Execute(
                 procedure (const value: integer; var res: TOmniValue)
                 begin
                   res := value;
                 end
               );

  for prime in primeQueue do
  begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;

Этот пример показывает некоторые усовершенствования, добавленные к циклу ForEach.
Во-первых, Вы можете приказать, чтобы параллельная подсистема сохранила входной порядок, вызывая функцию PreservedOrder.
Во-вторых, использование делегата Into в  ForEach будет автоматически вызывать, CompleteAdding для параметре Into (в нашем случае для очереди) при окончании цикла.  В этом случае потребность в уродливом запросе OnStop отпадет.
От себя: Что такое делегат – делегат это дополнительная возможность (в нашем случае для Parallel.ForEach), которая устанавливается, вызовом соответствующей функции Parallel.ForEach. Так PreserveOrder, NoWait, Execute                  являются классическими делегатами, расширяющими поведение Parallel.ForEach. В чем то делегатов можно сравнить с дополнительными плагинами навешиваемыми на поведение функции,  в чем то с переключателями, положение которых управляет работой функции.
В-третьих, делегат Execute (из-за использования Into), вызывает делегата с другой сигнатурой.
От себя: Что такое сигнатура – сигнатура это можно сказать описание входящих параметров анонимной функции, которая передается делегату на выполнение.
Стандартная сигнатура делегата Execute:
procedure (const value: T)
В нашем же с случае предоставить делегату Execute процедуру с сигнатурой
procedure (const value: integer; var res: TOmniValue)

Если в параметр res будут установлено какой-нибудь значение в делегате, то это значение будет  добавлено к очереди (установленной в Into). Если значение не было изменено, то оно и не будет добавлено к очереди.
В основном стандартный код, выполняющийся в параллельном теле цикла заменен кодом ниже, и этот код вызывает Вашего собственного делегата (loopBody).

        result := TOmniValue.Null;
        while (not Stopped) and localQueue.GetNext(value) do
        begin
          loopBody(value, result);
          if not result.IsEmpty then
          begin
              oplIntoQueueObj.Add(result)
              result := TOmniValue.Null;
          end;
        end;
        oplIntoQueueObj.CompleteAdding;

Делегат NoWait и Into предоставляют Вам простой путь привязать параллельные циклы цепью и осуществить многократные параллельные стадии обработки.
Стоит также отметить что Parallel.ForEach позволяет использовать такую возможность как нумераторы.

Var
  dataQueue  : IOmniBlockingCollection;
  prime      : TOmniValue;
  resultQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  dataQueue := TOmniBlockingCollection.Create;
  resultQueue := TOmniBlockingCollection.Create;
  // Добавим 1000 элементов
  Parallel.ForEach(1, 1000)
    .NoWait
    .Into(dataQueue)
    .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
        res := value;
    end
  );
  // параллельный цикл проход с помощью нумератора
  // по коллекции из 100 элементов
  Parallel.ForEach<integer>(dataQueue as IOmniValueEnumerable)
    .NoWait
    .Into(resultQueue)
    .Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
         res := value;
      end
  );
  for prime in primeQueue do begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;

Если вы хотите перебрать что-то очень нестандартное в цикле, вы можете написать GetNext делегат:
Использование GetNext делегата в ForEach
Parallel.ForEach<integer>(
      function (var value: integer): boolean
      begin
        value := i;
        Result := (i <= testSize);
        Inc(i);
      end)
    .Execute(
      procedure (const elem: integer)
      begin
        outQueue.Add(elem);
      end);

Далее представлен полный список возможностей вызова параллельного цикла:
     
    ForEach(low, high: integer; step: integer = 1): IOmniParallelLoop<integer>;
    ForEach(const enumerable: IOmniValueEnumerable): IOmniParallelLoop;
    ForEach(const enum: IOmniValueEnumerator): IOmniParallelLoop;
    ForEach(const enumerable: IEnumerable): IOmniParallelLoop;
    ForEach(const enum: IEnumerator): IOmniParallelLoop;
    ForEach(const sourceProvider: TOmniSourceProvider): IOmniParallelLoop;
    ForEach(enumerator: TEnumeratorDelegate): IOmniParallelLoop;
    ForEach<T>(const enumerable: IOmniValueEnumerable): IOmniParallelLoop<T>;
    ForEach<T>(const enum: IOmniValueEnumerator): IOmniParallelLoop<T>;
    ForEach<T>(const enumerable: IEnumerable): IOmniParallelLoop<T>;
    ForEach<T>(const enum: IEnumerator): IOmniParallelLoop<T>;
    ForEach<T>(const enumerable: TEnumerable<T>): IOmniParallelLoop<T>;
    ForEach<T>(const enum: TEnumerator<T>): IOmniParallelLoop<T>;
    ForEach<T>(enumerator: TEnumeratorDelegate<T>): IOmniParallelLoop<T>;
    ForEach(const enumerable: TObject): IOmniParallelLoop;
    ForEach<T>(const enumerable: TObject): IOmniParallelLoop<T>;

Последние две версии используются, чтобы пройтись по любому объекту, который поддерживает  стандартный класс нумераторов. К сожалению, эта особенность доступна только в Delphi 2010, потому что она использует расширенный RTTI, чтобы получить доступ к нумератору и его методам.

Параллельный цикл можно использовать даже тогда  источник перечисления не  является потоко-защищенным.  Вы можете быть уверены, что к данным, которые переданы к ForEach поток получит доступ от одного потока в тоже самое время (хотя код выполняющийся не всегда будет выполняться в том же самом потоке). Фактически OTL возьмет на себя роль механизма синхронизации обращения в этим данным.
Только в специальных случаях, когда внутренний механизм OTL знает, что источник – потокозащищен (например, когда IOmniValueEnumerator передают к ForEach), к данным получат доступ множество потоков  в то же самое время.

Комментариев нет:

Отправить комментарий