вторник, 22 марта 2011 г.

Использование OmniThread Libray 2.0 (OTL 2.0) для создания многопоточных приложений - 7

Очередь выходных данных и исходящие буферы
В предыдущих частях мы немного разабрались с внутренним механизмом работы Parallel ForEach. В этой части мы разберемся с очередью выходных даных. В OTL она представлена модификатором .PreserveOutput.
Очередь выходных данных, как правило, используются вместе с .Into- модификатором. Причина этого кроется в обеспечении интеграции между параллельной инфраструктурой и вашим параллельный кодом. Тем кодом, который выполняется в качестве задачи в Parallel.ForEach.
В обычной конструкции ForEach, выход из параллельной задачи не определен.
Вы имеете право делать все, что вам угодно с каждым элементом параллельного цикла, выводить какие-либо результаты (в случае, если нужно) в рамках работы с элементом, но параллельные потоки, выполняющиеся в рамках распараллеливания цикла ничего знать об этом не будут.

Таким образом, OTL не имеет возможности хранить любое выходное значение, потому что - по крайней мере, с точки зрения библиотеки – распараллеленый код не создает никаких выходных данных.
Однако когда вы задействуете модификатор .Into ваша программа начинает использовать другие механизмы работы (другие параметры в сигнатуре для вашего кода). Рассмотрим это на простом демонстрационном примере OTL 38:
Parallel.ForEach(1, CMaxTest)
    .PreserveOrder
    .Into(primeQueue)
    .Execute(
    procedure (const value: integer; var res: TOmniValue)
    begin
      if IsPrime(value) then
        res := value;
    end);
Поскольку модификатор .Into определен, параллельный код должен принять два параметра. Сначала - как обычно - входное значение (в рамках цикла Parallel.ForEach(1, CMaxTest)), вторым параметром идет – выходное значение TOmniValue, возвращающая собой результат работы кода в параллельной задаче (обработка отдельного значения).
Передем к рассмотрению внутреннего кода OTL – части, которая намечает параллельные задачи:
InternalExecuteTask(
    procedure (const task: IOmniTask)
    var
      localQueue      : TOmniLocalQueue;
      outputBuffer_ref: TOmniOutputBuffer;
      position        : int64;
      result          : TOmniValue;
      value           : TOmniValue;
    begin
      oplDataManager.SetOutput(oplIntoQueueIntf);
      localQueue := oplDataManager.CreateLocalQueue;
      try
        outputBuffer_ref := oplDataManager.AllocateOutputBuffer;
        try
          localQueue.AssociateBuffer(outputBuffer_ref);
          result := TOmniValue.Null;
          while (not Stopped) and 
                localQueue.GetNext(position, value) do 
          begin
            loopBody(task, value, result);
            if not result.IsEmpty then begin
              outputBuffer_ref.Submit(position, result);
              result := TOmniValue.Null;
            end;
          end;
        finally 
          oplDataManager.ReleaseOutputBuffer(outputBuffer_ref); 
        end;
      finally 
        FreeAndNil(localQueue); 
      end;
    end
    );
Итак, рассмотрим механизм работы:
• Менеджер данных связан с очередью выходных данных (поле oplIntoQueueIntf содержит значение, которую передают к .Into методу).
• Созданная локальная очередь делает те-же самые действия, как и в ситуации, когда foreach вызван штатно.
• Исходящий буфер создан по условию менеджера и связан с локальной очередью.
• Пользовательский код выполнен, и каждое непустое значение, вернувшееся с пользовательского кода, будет записано в исходящий буфер.
• Исходящий буфер выгружен, как локальная очередь.
Интересная часть - как обычно - скрыта на заднем плане; в локальной очереди, менеджере данных и исходящем буфере.

Соединение всего этого

Первая модификация находится в источнике данных. Когда используется модификатор .PreserveOrder, каждый пакет данных, знает исходное положение для чтения. Чтобы упростить дело, раскол пакета данных не используется в этом случае, соответсвенно распараллеливание будет недостаточно эффективно с точки зрения производительности.
У каждой локальной очереди есть свой связанный с ней исходящий буфер.
Каждый задача управляет двумя исходящими буферами. Один из них является активным, и в него задача записывает данные. Другой буфер может быть пустым или полным. Каждый буфер связан с входной позицией - так же, как пакет данных.

Когда мы смотрим на механизм чтения/записи данных в перспективе одной задачи, все очень просто. Задача читает данные от локальной очереди (которая в свою очередь читает данные от пакета данных, связанных с некоторым положением), и записывает эти данные в исходящий буфер (связанный с тем же самым положением).

Хитрая часть начинается, когда пакет данных исчерпан (“if not Result ” в коде ниже):
function TOmniLocalQueueImpl.GetNext(var position: int64; var value: TOmniValue): boolean;
begin
  Result := lqiDataPackage.GetNext(position, value);
  if not Result then begin
    {$IFDEF Debug}Assert(assigned(lqiBufferSet));{$ENDIF Debug}
    lqiBufferSet.ActiveBuffer.MarkFull;
    lqiBufferSet.ActivateBuffer; 
      // this will block if alternate buffer is also full
    Result := lqiDataManager_ref.GetNext(lqiDataPackage);
    if Result then begin
      Result := lqiDataPackage.GetNext(position, value);
      if Result then
        lqiBufferSet.ActiveBuffer.Range := lqiDataPackage.Range;
    end;
  end;
end; { TOmniLocalQueueImpl.GetNext }
Во-первых, активный буфер будет помечен как полный. Это приведет к вызову NotifyBufferFull. Тогда, активируется альтернативный буфер. Стоит отметить, что вызов (.ActivateBuffer) будет заблокирован, если альтернативный буфер не будет свободен. В таком случае текущий поток заблокируется до освобождения одного из двух своих буферов, вернее пока один из ее буферов не запишет данные в очередь выходных данных.
С этого момента, GetNext продолжается таким же образом, как и в простом foreach, за исключением того, что GetNext устанавливает положение активного буфера всякий раз, когда новый пакет данных прочитан от менеджера данных.
Другая часть магии происходит в метод, который вызывается из MarkFull. Этот метод идет по буферному списку и проверяет, есть ли какие-нибудь буфера, которые являются а) полными и б) предназначенными для текущей позиции вывода. Такие буферы копируются в очередь выходных данных и возвращаются в использование.
procedure TOmniBaseDataManager.NotifyBufferFull(buffer: TOmniOutputBufferImpl);
begin
  // Remove buffer from the list. Check if next buffer is waiting in
  // the list. Copy buffer if it is full and repeat the process.
  dmBufferRangeLock.Acquire;
  try
    while (dmBufferRangeList.Count > 0) and
          (BufferList[0].Range.First = dmNextPosition) and
          BufferList[0].IsFull do
    begin
      buffer := TOmniOutputBufferImpl(
        dmBufferRangeList.ExtractObject(0));
      dmNextPosition := buffer.Range.Last + 1;
      buffer.CopyToOutput;
    end;
  finally dmBufferRangeLock.Release; end;
end; { TOmniBaseDataManager.NotifyBufferFull }
Можно резюмировать, что

• Каждый буфер данных связан с положением.
• У каждой локальной очереди есть два исходящих буфера, один из которых является активным, а другой может быть в этот момент времени свободным или полным.
• Каждый исходящий буфер также связан с положением.
• Локальная очередь записывает данные, чтобы создать буфер.
• Когда буфер будет полон, он поместится в список буферов ожидания. В тот момент все соответствующие буфера ожидания будут скопированы в очередь выходных данных.

Продолжение следует…



Комментариев нет:

Отправить комментарий