Продожение начатое в 
В то время когда это показывает истинную мощь двух функции NoWait и OnStop, понимание этого процесса сложно и определенно код выше очень труднопонимаем. Автор OTL решил внести некоторые синтаксически исправления в OTL, для более понятного понимания работы, теперь используя новые возможности OTL мы перепишем наш код более наглядно для понимания:
Пример 
var
  prime     : TOmniValue;
  primeQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  primeQueue := TOmniBlockingCollection.Create;
  Parallel.ForEach(1, 1000)
       .PreserveOrder               // сохранит входной порядок
       .NoWait                  
       .Into(primeQueue)             
       .Execute(
                 procedure (const value: integer; var res: TOmniValue)
                 begin
                   res := value;
                 end
               );
  for prime in primeQueue do 
  begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;
Этот пример показывает некоторые усовершенствования, добавленные к циклу ForEach. 
Во-первых, Вы можете приказать, чтобы параллельная подсистема сохранила входной порядок, вызывая функцию PreservedOrder. 
Во-вторых, использование делегата Into в  ForEach будет автоматически вызывать, CompleteAdding для параметре Into (в нашем случае для очереди) при окончании цикла.  В этом случае потребность в уродливом запросе OnStop отпадет.
От себя: Что такое делегат – делегат это дополнительная возможность (в нашем случае для Parallel.ForEach), которая устанавливается, вызовом соответствующей функции Parallel.ForEach. Так PreserveOrder, NoWait, Execute                  являются классическими делегатами, расширяющими поведение Parallel.ForEach. В чем то делегатов можно сравнить с дополнительными плагинами навешиваемыми на поведение функции,  в чем то с переключателями, положение которых управляет работой функции. 
В-третьих, делегат Execute (из-за использования Into), вызывает делегата с другой сигнатурой. 
От себя: Что такое сигнатура – сигнатура это можно сказать описание входящих параметров анонимной функции, которая передается делегату на выполнение.
Стандартная сигнатура делегата Execute:
procedure (const value: T)
В нашем же с случае предоставить делегату Execute процедуру с сигнатурой 
procedure (const value: integer; var res: TOmniValue)
Если в параметр res будут установлено какой-нибудь значение в делегате, то это значение будет  добавлено к очереди (установленной в Into). Если значение не было изменено, то оно и не будет добавлено к очереди.
В основном стандартный код, выполняющийся в параллельном теле цикла заменен кодом ниже, и этот код вызывает Вашего собственного делегата (loopBody).
        result := TOmniValue.Null;
        while (not Stopped) and localQueue.GetNext(value) do 
        begin
          loopBody(value, result);
          if not result.IsEmpty then 
          begin
              oplIntoQueueObj.Add(result)
              result := TOmniValue.Null;
          end;
        end;
        oplIntoQueueObj.CompleteAdding; 
Делегат NoWait и Into предоставляют Вам простой путь привязать параллельные циклы цепью и осуществить многократные параллельные стадии обработки. 
Стоит также отметить что Parallel.ForEach позволяет использовать такую возможность как нумераторы.
Var
  dataQueue  : IOmniBlockingCollection;
  prime      : TOmniValue;
  resultQueue: IOmniBlockingCollection;
begin
  lbLog.Clear;
  dataQueue := TOmniBlockingCollection.Create;
  resultQueue := TOmniBlockingCollection.Create;
  // Добавим 1000 элементов
  Parallel.ForEach(1, 1000)
    .NoWait
    .Into(dataQueue)
    .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
        res := value;
    end
  );
  // параллельный цикл проход с помощью нумератора 
  // по коллекции из 100 элементов
  Parallel.ForEach<integer>(dataQueue as IOmniValueEnumerable)
    .NoWait
    .Into(resultQueue)
    .Execute(
      procedure (const value: integer; var res: TOmniValue)
      begin
         res := value;
      end
  );
  for prime in primeQueue do begin
    lbLog.Items.Add(IntToStr(prime));
    lbLog.Update;
  end;
end;
Если вы хотите перебрать что-то очень нестандартное в цикле, вы можете написать GetNext делегат:
Использование GetNext делегата в ForEach
Parallel.ForEach<integer>(
      function (var value: integer): boolean
      begin
        value := i;
        Result := (i <= testSize);
        Inc(i);
      end)
    .Execute(
      procedure (const elem: integer)
      begin
        outQueue.Add(elem);
      end);
Далее представлен полный список возможностей вызова параллельного цикла:
    ForEach(low, high: integer; step: integer = 1): IOmniParallelLoop<integer>;
    ForEach(const enumerable: IOmniValueEnumerable): IOmniParallelLoop; 
    ForEach(const enum: IOmniValueEnumerator): IOmniParallelLoop; 
    ForEach(const enumerable: IEnumerable): IOmniParallelLoop;
    ForEach(const enum: IEnumerator): IOmniParallelLoop;
    ForEach(const sourceProvider: TOmniSourceProvider): IOmniParallelLoop;
    ForEach(enumerator: TEnumeratorDelegate): IOmniParallelLoop;
    ForEach<T>(const enumerable: IOmniValueEnumerable): IOmniParallelLoop<T>;
    ForEach<T>(const enum: IOmniValueEnumerator): IOmniParallelLoop<T>;
    ForEach<T>(const enumerable: IEnumerable): IOmniParallelLoop<T>;
    ForEach<T>(const enum: IEnumerator): IOmniParallelLoop<T>;
    ForEach<T>(const enumerable: TEnumerable<T>): IOmniParallelLoop<T>;
    ForEach<T>(const enum: TEnumerator<T>): IOmniParallelLoop<T>;
    ForEach<T>(enumerator: TEnumeratorDelegate<T>): IOmniParallelLoop<T>;
    ForEach(const enumerable: TObject): IOmniParallelLoop;
    ForEach<T>(const enumerable: TObject): IOmniParallelLoop<T>;
Последние две версии используются, чтобы пройтись по любому объекту, который поддерживает  стандартный класс нумераторов. К сожалению, эта особенность доступна только в Delphi 2010, потому что она использует расширенный RTTI, чтобы получить доступ к нумератору и его методам.
Параллельный цикл можно использовать даже тогда  источник перечисления не  является потоко-защищенным.  Вы можете быть уверены, что к данным, которые переданы к ForEach поток получит доступ от одного потока в тоже самое время (хотя код выполняющийся не всегда будет выполняться в том же самом потоке). Фактически OTL возьмет на себя роль механизма синхронизации обращения в этим данным.
Только в специальных случаях, когда внутренний механизм OTL знает, что источник – потокозащищен (например, когда IOmniValueEnumerator передают к ForEach), к данным получат доступ множество потоков  в то же самое время.
Комментариев нет:
Отправить комментарий