Продожение начатое в
В то время когда это показывает истинную мощь двух функции NoWait и OnStop, понимание этого процесса сложно и определенно код выше очень труднопонимаем. Автор OTL решил внести некоторые синтаксически исправления в OTL, для более понятного понимания работы, теперь используя новые возможности OTL мы перепишем наш код более наглядно для понимания:
Пример
var
prime : TOmniValue;
primeQueue: IOmniBlockingCollection;
begin
lbLog.Clear;
primeQueue := TOmniBlockingCollection.Create;
Parallel.ForEach(1, 1000)
.PreserveOrder // сохранит входной порядок
.NoWait
.Into(primeQueue)
.Execute(
procedure (const value: integer; var res: TOmniValue)
begin
res := value;
end
);
for prime in primeQueue do
begin
lbLog.Items.Add(IntToStr(prime));
lbLog.Update;
end;
end;
Этот пример показывает некоторые усовершенствования, добавленные к циклу ForEach.
Во-первых, Вы можете приказать, чтобы параллельная подсистема сохранила входной порядок, вызывая функцию PreservedOrder.
Во-вторых, использование делегата Into в ForEach будет автоматически вызывать, CompleteAdding для параметре Into (в нашем случае для очереди) при окончании цикла. В этом случае потребность в уродливом запросе OnStop отпадет.
От себя: Что такое делегат – делегат это дополнительная возможность (в нашем случае для Parallel.ForEach), которая устанавливается, вызовом соответствующей функции Parallel.ForEach. Так PreserveOrder, NoWait, Execute являются классическими делегатами, расширяющими поведение Parallel.ForEach. В чем то делегатов можно сравнить с дополнительными плагинами навешиваемыми на поведение функции, в чем то с переключателями, положение которых управляет работой функции.
В-третьих, делегат Execute (из-за использования Into), вызывает делегата с другой сигнатурой.
От себя: Что такое сигнатура – сигнатура это можно сказать описание входящих параметров анонимной функции, которая передается делегату на выполнение.
Стандартная сигнатура делегата Execute:
procedure (const value: T)
В нашем же с случае предоставить делегату Execute процедуру с сигнатурой
procedure (const value: integer; var res: TOmniValue)
Если в параметр res будут установлено какой-нибудь значение в делегате, то это значение будет добавлено к очереди (установленной в Into). Если значение не было изменено, то оно и не будет добавлено к очереди.
В основном стандартный код, выполняющийся в параллельном теле цикла заменен кодом ниже, и этот код вызывает Вашего собственного делегата (loopBody).
result := TOmniValue.Null;
while (not Stopped) and localQueue.GetNext(value) do
begin
loopBody(value, result);
if not result.IsEmpty then
begin
oplIntoQueueObj.Add(result)
result := TOmniValue.Null;
end;
end;
oplIntoQueueObj.CompleteAdding;
Делегат NoWait и Into предоставляют Вам простой путь привязать параллельные циклы цепью и осуществить многократные параллельные стадии обработки.
Стоит также отметить что Parallel.ForEach позволяет использовать такую возможность как нумераторы.
Var
dataQueue : IOmniBlockingCollection;
prime : TOmniValue;
resultQueue: IOmniBlockingCollection;
begin
lbLog.Clear;
dataQueue := TOmniBlockingCollection.Create;
resultQueue := TOmniBlockingCollection.Create;
// Добавим 1000 элементов
Parallel.ForEach(1, 1000)
.NoWait
.Into(dataQueue)
.Execute(
procedure (const value: integer; var res: TOmniValue)
begin
res := value;
end
);
// параллельный цикл проход с помощью нумератора
// по коллекции из 100 элементов
Parallel.ForEach<integer>(dataQueue as IOmniValueEnumerable)
.NoWait
.Into(resultQueue)
.Execute(
procedure (const value: integer; var res: TOmniValue)
begin
res := value;
end
);
for prime in primeQueue do begin
lbLog.Items.Add(IntToStr(prime));
lbLog.Update;
end;
end;
Если вы хотите перебрать что-то очень нестандартное в цикле, вы можете написать GetNext делегат:
Использование GetNext делегата в ForEach
Parallel.ForEach<integer>(
function (var value: integer): boolean
begin
value := i;
Result := (i <= testSize);
Inc(i);
end)
.Execute(
procedure (const elem: integer)
begin
outQueue.Add(elem);
end);
Далее представлен полный список возможностей вызова параллельного цикла:
ForEach(low, high: integer; step: integer = 1): IOmniParallelLoop<integer>;
ForEach(const enumerable: IOmniValueEnumerable): IOmniParallelLoop;
ForEach(const enum: IOmniValueEnumerator): IOmniParallelLoop;
ForEach(const enumerable: IEnumerable): IOmniParallelLoop;
ForEach(const enum: IEnumerator): IOmniParallelLoop;
ForEach(const sourceProvider: TOmniSourceProvider): IOmniParallelLoop;
ForEach(enumerator: TEnumeratorDelegate): IOmniParallelLoop;
ForEach<T>(const enumerable: IOmniValueEnumerable): IOmniParallelLoop<T>;
ForEach<T>(const enum: IOmniValueEnumerator): IOmniParallelLoop<T>;
ForEach<T>(const enumerable: IEnumerable): IOmniParallelLoop<T>;
ForEach<T>(const enum: IEnumerator): IOmniParallelLoop<T>;
ForEach<T>(const enumerable: TEnumerable<T>): IOmniParallelLoop<T>;
ForEach<T>(const enum: TEnumerator<T>): IOmniParallelLoop<T>;
ForEach<T>(enumerator: TEnumeratorDelegate<T>): IOmniParallelLoop<T>;
ForEach(const enumerable: TObject): IOmniParallelLoop;
ForEach<T>(const enumerable: TObject): IOmniParallelLoop<T>;
Последние две версии используются, чтобы пройтись по любому объекту, который поддерживает стандартный класс нумераторов. К сожалению, эта особенность доступна только в Delphi 2010, потому что она использует расширенный RTTI, чтобы получить доступ к нумератору и его методам.
Параллельный цикл можно использовать даже тогда источник перечисления не является потоко-защищенным. Вы можете быть уверены, что к данным, которые переданы к ForEach поток получит доступ от одного потока в тоже самое время (хотя код выполняющийся не всегда будет выполняться в том же самом потоке). Фактически OTL возьмет на себя роль механизма синхронизации обращения в этим данным.
Только в специальных случаях, когда внутренний механизм OTL знает, что источник – потокозащищен (например, когда IOmniValueEnumerator передают к ForEach), к данным получат доступ множество потоков в то же самое время.
Комментариев нет:
Отправить комментарий