Очередь выходных данных и исходящие буферы
В предыдущих частях мы немного разабрались с внутренним механизмом работы Parallel ForEach. В этой части мы разберемся с очередью выходных даных. В OTL она представлена модификатором .PreserveOutput.
Очередь выходных данных, как правило, используются вместе с .Into- модификатором. Причина этого кроется в обеспечении интеграции между параллельной инфраструктурой и вашим параллельный кодом. Тем кодом, который выполняется в качестве задачи в Parallel.ForEach.
В обычной конструкции ForEach, выход из параллельной задачи не определен.
Вы имеете право делать все, что вам угодно с каждым элементом параллельного цикла, выводить какие-либо результаты (в случае, если нужно) в рамках работы с элементом, но параллельные потоки, выполняющиеся в рамках распараллеливания цикла ничего знать об этом не будут.
Таким образом, OTL не имеет возможности хранить любое выходное значение, потому что - по крайней мере, с точки зрения библиотеки – распараллеленый код не создает никаких выходных данных.
Однако когда вы задействуете модификатор .Into ваша программа начинает использовать другие механизмы работы (другие параметры в сигнатуре для вашего кода). Рассмотрим это на простом демонстрационном примере OTL 38:
Передем к рассмотрению внутреннего кода OTL – части, которая намечает параллельные задачи:
• Менеджер данных связан с очередью выходных данных (поле oplIntoQueueIntf содержит значение, которую передают к .Into методу).
• Созданная локальная очередь делает те-же самые действия, как и в ситуации, когда foreach вызван штатно.
• Исходящий буфер создан по условию менеджера и связан с локальной очередью.
• Пользовательский код выполнен, и каждое непустое значение, вернувшееся с пользовательского кода, будет записано в исходящий буфер.
• Исходящий буфер выгружен, как локальная очередь.
Интересная часть - как обычно - скрыта на заднем плане; в локальной очереди, менеджере данных и исходящем буфере.
Соединение всего этого
Первая модификация находится в источнике данных. Когда используется модификатор .PreserveOrder, каждый пакет данных, знает исходное положение для чтения. Чтобы упростить дело, раскол пакета данных не используется в этом случае, соответсвенно распараллеливание будет недостаточно эффективно с точки зрения производительности.
У каждой локальной очереди есть свой связанный с ней исходящий буфер.
Каждый задача управляет двумя исходящими буферами. Один из них является активным, и в него задача записывает данные. Другой буфер может быть пустым или полным. Каждый буфер связан с входной позицией - так же, как пакет данных.
Когда мы смотрим на механизм чтения/записи данных в перспективе одной задачи, все очень просто. Задача читает данные от локальной очереди (которая в свою очередь читает данные от пакета данных, связанных с некоторым положением), и записывает эти данные в исходящий буфер (связанный с тем же самым положением).
Хитрая часть начинается, когда пакет данных исчерпан (“if not Result ” в коде ниже):
С этого момента, GetNext продолжается таким же образом, как и в простом foreach, за исключением того, что GetNext устанавливает положение активного буфера всякий раз, когда новый пакет данных прочитан от менеджера данных.
Другая часть магии происходит в метод, который вызывается из MarkFull. Этот метод идет по буферному списку и проверяет, есть ли какие-нибудь буфера, которые являются а) полными и б) предназначенными для текущей позиции вывода. Такие буферы копируются в очередь выходных данных и возвращаются в использование.
• Каждый буфер данных связан с положением.
• У каждой локальной очереди есть два исходящих буфера, один из которых является активным, а другой может быть в этот момент времени свободным или полным.
• Каждый исходящий буфер также связан с положением.
• Локальная очередь записывает данные, чтобы создать буфер.
• Когда буфер будет полон, он поместится в список буферов ожидания. В тот момент все соответствующие буфера ожидания будут скопированы в очередь выходных данных.
Продолжение следует…
В предыдущих частях мы немного разабрались с внутренним механизмом работы Parallel ForEach. В этой части мы разберемся с очередью выходных даных. В OTL она представлена модификатором .PreserveOutput.
Очередь выходных данных, как правило, используются вместе с .Into- модификатором. Причина этого кроется в обеспечении интеграции между параллельной инфраструктурой и вашим параллельный кодом. Тем кодом, который выполняется в качестве задачи в Parallel.ForEach.
В обычной конструкции ForEach, выход из параллельной задачи не определен.
Вы имеете право делать все, что вам угодно с каждым элементом параллельного цикла, выводить какие-либо результаты (в случае, если нужно) в рамках работы с элементом, но параллельные потоки, выполняющиеся в рамках распараллеливания цикла ничего знать об этом не будут.
Таким образом, OTL не имеет возможности хранить любое выходное значение, потому что - по крайней мере, с точки зрения библиотеки – распараллеленый код не создает никаких выходных данных.
Однако когда вы задействуете модификатор .Into ваша программа начинает использовать другие механизмы работы (другие параметры в сигнатуре для вашего кода). Рассмотрим это на простом демонстрационном примере OTL 38:
Parallel.ForEach(1, CMaxTest) .PreserveOrder .Into(primeQueue) .Execute( procedure (const value: integer; var res: TOmniValue) begin if IsPrime(value) then res := value; end);Поскольку модификатор .Into определен, параллельный код должен принять два параметра. Сначала - как обычно - входное значение (в рамках цикла Parallel.ForEach(1, CMaxTest)), вторым параметром идет – выходное значение TOmniValue, возвращающая собой результат работы кода в параллельной задаче (обработка отдельного значения).
Передем к рассмотрению внутреннего кода OTL – части, которая намечает параллельные задачи:
InternalExecuteTask( procedure (const task: IOmniTask) var localQueue : TOmniLocalQueue; outputBuffer_ref: TOmniOutputBuffer; position : int64; result : TOmniValue; value : TOmniValue; begin oplDataManager.SetOutput(oplIntoQueueIntf); localQueue := oplDataManager.CreateLocalQueue; try outputBuffer_ref := oplDataManager.AllocateOutputBuffer; try localQueue.AssociateBuffer(outputBuffer_ref); result := TOmniValue.Null; while (not Stopped) and localQueue.GetNext(position, value) do begin loopBody(task, value, result); if not result.IsEmpty then begin outputBuffer_ref.Submit(position, result); result := TOmniValue.Null; end; end; finally oplDataManager.ReleaseOutputBuffer(outputBuffer_ref); end; finally FreeAndNil(localQueue); end; end );Итак, рассмотрим механизм работы:
• Менеджер данных связан с очередью выходных данных (поле oplIntoQueueIntf содержит значение, которую передают к .Into методу).
• Созданная локальная очередь делает те-же самые действия, как и в ситуации, когда foreach вызван штатно.
• Исходящий буфер создан по условию менеджера и связан с локальной очередью.
• Пользовательский код выполнен, и каждое непустое значение, вернувшееся с пользовательского кода, будет записано в исходящий буфер.
• Исходящий буфер выгружен, как локальная очередь.
Интересная часть - как обычно - скрыта на заднем плане; в локальной очереди, менеджере данных и исходящем буфере.
Соединение всего этого
Первая модификация находится в источнике данных. Когда используется модификатор .PreserveOrder, каждый пакет данных, знает исходное положение для чтения. Чтобы упростить дело, раскол пакета данных не используется в этом случае, соответсвенно распараллеливание будет недостаточно эффективно с точки зрения производительности.
У каждой локальной очереди есть свой связанный с ней исходящий буфер.
Каждый задача управляет двумя исходящими буферами. Один из них является активным, и в него задача записывает данные. Другой буфер может быть пустым или полным. Каждый буфер связан с входной позицией - так же, как пакет данных.
Когда мы смотрим на механизм чтения/записи данных в перспективе одной задачи, все очень просто. Задача читает данные от локальной очереди (которая в свою очередь читает данные от пакета данных, связанных с некоторым положением), и записывает эти данные в исходящий буфер (связанный с тем же самым положением).
Хитрая часть начинается, когда пакет данных исчерпан (“if not Result ” в коде ниже):
function TOmniLocalQueueImpl.GetNext(var position: int64; var value: TOmniValue): boolean; begin Result := lqiDataPackage.GetNext(position, value); if not Result then begin {$IFDEF Debug}Assert(assigned(lqiBufferSet));{$ENDIF Debug} lqiBufferSet.ActiveBuffer.MarkFull; lqiBufferSet.ActivateBuffer; // this will block if alternate buffer is also full Result := lqiDataManager_ref.GetNext(lqiDataPackage); if Result then begin Result := lqiDataPackage.GetNext(position, value); if Result then lqiBufferSet.ActiveBuffer.Range := lqiDataPackage.Range; end; end; end; { TOmniLocalQueueImpl.GetNext }Во-первых, активный буфер будет помечен как полный. Это приведет к вызову NotifyBufferFull. Тогда, активируется альтернативный буфер. Стоит отметить, что вызов (.ActivateBuffer) будет заблокирован, если альтернативный буфер не будет свободен. В таком случае текущий поток заблокируется до освобождения одного из двух своих буферов, вернее пока один из ее буферов не запишет данные в очередь выходных данных.
С этого момента, GetNext продолжается таким же образом, как и в простом foreach, за исключением того, что GetNext устанавливает положение активного буфера всякий раз, когда новый пакет данных прочитан от менеджера данных.
Другая часть магии происходит в метод, который вызывается из MarkFull. Этот метод идет по буферному списку и проверяет, есть ли какие-нибудь буфера, которые являются а) полными и б) предназначенными для текущей позиции вывода. Такие буферы копируются в очередь выходных данных и возвращаются в использование.
procedure TOmniBaseDataManager.NotifyBufferFull(buffer: TOmniOutputBufferImpl); begin // Remove buffer from the list. Check if next buffer is waiting in // the list. Copy buffer if it is full and repeat the process. dmBufferRangeLock.Acquire; try while (dmBufferRangeList.Count > 0) and (BufferList[0].Range.First = dmNextPosition) and BufferList[0].IsFull do begin buffer := TOmniOutputBufferImpl( dmBufferRangeList.ExtractObject(0)); dmNextPosition := buffer.Range.Last + 1; buffer.CopyToOutput; end; finally dmBufferRangeLock.Release; end; end; { TOmniBaseDataManager.NotifyBufferFull }Можно резюмировать, что
• Каждый буфер данных связан с положением.
• У каждой локальной очереди есть два исходящих буфера, один из которых является активным, а другой может быть в этот момент времени свободным или полным.
• Каждый исходящий буфер также связан с положением.
• Локальная очередь записывает данные, чтобы создать буфер.
• Когда буфер будет полон, он поместится в список буферов ожидания. В тот момент все соответствующие буфера ожидания будут скопированы в очередь выходных данных.
Продолжение следует…
Комментариев нет:
Отправить комментарий