Очередь выходных данных и исходящие буферы
В предыдущих частях мы немного разабрались с внутренним механизмом работы Parallel ForEach. В этой части мы разберемся с очередью выходных даных. В OTL она представлена модификатором .PreserveOutput.
Очередь выходных данных, как правило, используются вместе с .Into- модификатором. Причина этого кроется в обеспечении интеграции между параллельной инфраструктурой и вашим параллельный кодом. Тем кодом, который выполняется в качестве задачи в Parallel.ForEach.
В обычной конструкции ForEach, выход из параллельной задачи не определен.
Вы имеете право делать все, что вам угодно с каждым элементом параллельного цикла, выводить какие-либо результаты (в случае, если нужно) в рамках работы с элементом, но параллельные потоки, выполняющиеся в рамках распараллеливания цикла ничего знать об этом не будут.
Таким образом, OTL не имеет возможности хранить любое выходное значение, потому что - по крайней мере, с точки зрения библиотеки – распараллеленый код не создает никаких выходных данных.
Однако когда вы задействуете модификатор .Into ваша программа начинает использовать другие механизмы работы (другие параметры в сигнатуре для вашего кода). Рассмотрим это на простом демонстрационном примере OTL 38:
Передем к рассмотрению внутреннего кода OTL – части, которая намечает параллельные задачи:
• Менеджер данных связан с очередью выходных данных (поле oplIntoQueueIntf содержит значение, которую передают к .Into методу).
• Созданная локальная очередь делает те-же самые действия, как и в ситуации, когда foreach вызван штатно.
• Исходящий буфер создан по условию менеджера и связан с локальной очередью.
• Пользовательский код выполнен, и каждое непустое значение, вернувшееся с пользовательского кода, будет записано в исходящий буфер.
• Исходящий буфер выгружен, как локальная очередь.
Интересная часть - как обычно - скрыта на заднем плане; в локальной очереди, менеджере данных и исходящем буфере.
Соединение всего этого
Первая модификация находится в источнике данных. Когда используется модификатор .PreserveOrder, каждый пакет данных, знает исходное положение для чтения. Чтобы упростить дело, раскол пакета данных не используется в этом случае, соответсвенно распараллеливание будет недостаточно эффективно с точки зрения производительности.
У каждой локальной очереди есть свой связанный с ней исходящий буфер.
Каждый задача управляет двумя исходящими буферами. Один из них является активным, и в него задача записывает данные. Другой буфер может быть пустым или полным. Каждый буфер связан с входной позицией - так же, как пакет данных.
Когда мы смотрим на механизм чтения/записи данных в перспективе одной задачи, все очень просто. Задача читает данные от локальной очереди (которая в свою очередь читает данные от пакета данных, связанных с некоторым положением), и записывает эти данные в исходящий буфер (связанный с тем же самым положением).
Хитрая часть начинается, когда пакет данных исчерпан (“if not Result ” в коде ниже):
С этого момента, GetNext продолжается таким же образом, как и в простом foreach, за исключением того, что GetNext устанавливает положение активного буфера всякий раз, когда новый пакет данных прочитан от менеджера данных.
Другая часть магии происходит в метод, который вызывается из MarkFull. Этот метод идет по буферному списку и проверяет, есть ли какие-нибудь буфера, которые являются а) полными и б) предназначенными для текущей позиции вывода. Такие буферы копируются в очередь выходных данных и возвращаются в использование.
• Каждый буфер данных связан с положением.
• У каждой локальной очереди есть два исходящих буфера, один из которых является активным, а другой может быть в этот момент времени свободным или полным.
• Каждый исходящий буфер также связан с положением.
• Локальная очередь записывает данные, чтобы создать буфер.
• Когда буфер будет полон, он поместится в список буферов ожидания. В тот момент все соответствующие буфера ожидания будут скопированы в очередь выходных данных.
Продолжение следует…
В предыдущих частях мы немного разабрались с внутренним механизмом работы Parallel ForEach. В этой части мы разберемся с очередью выходных даных. В OTL она представлена модификатором .PreserveOutput.
Очередь выходных данных, как правило, используются вместе с .Into- модификатором. Причина этого кроется в обеспечении интеграции между параллельной инфраструктурой и вашим параллельный кодом. Тем кодом, который выполняется в качестве задачи в Parallel.ForEach.
В обычной конструкции ForEach, выход из параллельной задачи не определен.
Вы имеете право делать все, что вам угодно с каждым элементом параллельного цикла, выводить какие-либо результаты (в случае, если нужно) в рамках работы с элементом, но параллельные потоки, выполняющиеся в рамках распараллеливания цикла ничего знать об этом не будут.
Таким образом, OTL не имеет возможности хранить любое выходное значение, потому что - по крайней мере, с точки зрения библиотеки – распараллеленый код не создает никаких выходных данных.
Однако когда вы задействуете модификатор .Into ваша программа начинает использовать другие механизмы работы (другие параметры в сигнатуре для вашего кода). Рассмотрим это на простом демонстрационном примере OTL 38:
Parallel.ForEach(1, CMaxTest)
.PreserveOrder
.Into(primeQueue)
.Execute(
procedure (const value: integer; var res: TOmniValue)
begin
if IsPrime(value) then
res := value;
end);
Поскольку модификатор .Into определен, параллельный код должен принять два параметра. Сначала - как обычно - входное значение (в рамках цикла Parallel.ForEach(1, CMaxTest)), вторым параметром идет – выходное значение TOmniValue, возвращающая собой результат работы кода в параллельной задаче (обработка отдельного значения). Передем к рассмотрению внутреннего кода OTL – части, которая намечает параллельные задачи:
InternalExecuteTask(
procedure (const task: IOmniTask)
var
localQueue : TOmniLocalQueue;
outputBuffer_ref: TOmniOutputBuffer;
position : int64;
result : TOmniValue;
value : TOmniValue;
begin
oplDataManager.SetOutput(oplIntoQueueIntf);
localQueue := oplDataManager.CreateLocalQueue;
try
outputBuffer_ref := oplDataManager.AllocateOutputBuffer;
try
localQueue.AssociateBuffer(outputBuffer_ref);
result := TOmniValue.Null;
while (not Stopped) and
localQueue.GetNext(position, value) do
begin
loopBody(task, value, result);
if not result.IsEmpty then begin
outputBuffer_ref.Submit(position, result);
result := TOmniValue.Null;
end;
end;
finally
oplDataManager.ReleaseOutputBuffer(outputBuffer_ref);
end;
finally
FreeAndNil(localQueue);
end;
end
);
Итак, рассмотрим механизм работы:• Менеджер данных связан с очередью выходных данных (поле oplIntoQueueIntf содержит значение, которую передают к .Into методу).
• Созданная локальная очередь делает те-же самые действия, как и в ситуации, когда foreach вызван штатно.
• Исходящий буфер создан по условию менеджера и связан с локальной очередью.
• Пользовательский код выполнен, и каждое непустое значение, вернувшееся с пользовательского кода, будет записано в исходящий буфер.
• Исходящий буфер выгружен, как локальная очередь.
Интересная часть - как обычно - скрыта на заднем плане; в локальной очереди, менеджере данных и исходящем буфере.
Соединение всего этого
Первая модификация находится в источнике данных. Когда используется модификатор .PreserveOrder, каждый пакет данных, знает исходное положение для чтения. Чтобы упростить дело, раскол пакета данных не используется в этом случае, соответсвенно распараллеливание будет недостаточно эффективно с точки зрения производительности.
У каждой локальной очереди есть свой связанный с ней исходящий буфер.
Каждый задача управляет двумя исходящими буферами. Один из них является активным, и в него задача записывает данные. Другой буфер может быть пустым или полным. Каждый буфер связан с входной позицией - так же, как пакет данных.
Когда мы смотрим на механизм чтения/записи данных в перспективе одной задачи, все очень просто. Задача читает данные от локальной очереди (которая в свою очередь читает данные от пакета данных, связанных с некоторым положением), и записывает эти данные в исходящий буфер (связанный с тем же самым положением).
Хитрая часть начинается, когда пакет данных исчерпан (“if not Result ” в коде ниже):
function TOmniLocalQueueImpl.GetNext(var position: int64; var value: TOmniValue): boolean;
begin
Result := lqiDataPackage.GetNext(position, value);
if not Result then begin
{$IFDEF Debug}Assert(assigned(lqiBufferSet));{$ENDIF Debug}
lqiBufferSet.ActiveBuffer.MarkFull;
lqiBufferSet.ActivateBuffer;
// this will block if alternate buffer is also full
Result := lqiDataManager_ref.GetNext(lqiDataPackage);
if Result then begin
Result := lqiDataPackage.GetNext(position, value);
if Result then
lqiBufferSet.ActiveBuffer.Range := lqiDataPackage.Range;
end;
end;
end; { TOmniLocalQueueImpl.GetNext }
Во-первых, активный буфер будет помечен как полный. Это приведет к вызову NotifyBufferFull. Тогда, активируется альтернативный буфер. Стоит отметить, что вызов (.ActivateBuffer) будет заблокирован, если альтернативный буфер не будет свободен. В таком случае текущий поток заблокируется до освобождения одного из двух своих буферов, вернее пока один из ее буферов не запишет данные в очередь выходных данных.С этого момента, GetNext продолжается таким же образом, как и в простом foreach, за исключением того, что GetNext устанавливает положение активного буфера всякий раз, когда новый пакет данных прочитан от менеджера данных.
Другая часть магии происходит в метод, который вызывается из MarkFull. Этот метод идет по буферному списку и проверяет, есть ли какие-нибудь буфера, которые являются а) полными и б) предназначенными для текущей позиции вывода. Такие буферы копируются в очередь выходных данных и возвращаются в использование.
procedure TOmniBaseDataManager.NotifyBufferFull(buffer: TOmniOutputBufferImpl);
begin
// Remove buffer from the list. Check if next buffer is waiting in
// the list. Copy buffer if it is full and repeat the process.
dmBufferRangeLock.Acquire;
try
while (dmBufferRangeList.Count > 0) and
(BufferList[0].Range.First = dmNextPosition) and
BufferList[0].IsFull do
begin
buffer := TOmniOutputBufferImpl(
dmBufferRangeList.ExtractObject(0));
dmNextPosition := buffer.Range.Last + 1;
buffer.CopyToOutput;
end;
finally dmBufferRangeLock.Release; end;
end; { TOmniBaseDataManager.NotifyBufferFull }
Можно резюмировать, что• Каждый буфер данных связан с положением.
• У каждой локальной очереди есть два исходящих буфера, один из которых является активным, а другой может быть в этот момент времени свободным или полным.
• Каждый исходящий буфер также связан с положением.
• Локальная очередь записывает данные, чтобы создать буфер.
• Когда буфер будет полон, он поместится в список буферов ожидания. В тот момент все соответствующие буфера ожидания будут скопированы в очередь выходных данных.
Продолжение следует…
Комментариев нет:
Отправить комментарий