четверг, 23 июня 2011 г.

Использование OmniThread Libray 2.0 (OTL 2.0) для создания многопоточных приложений. Настройка фоновых задач в OTLParallel

Простота параллелизма высокоуровневых фоновых задач библиотеки OmniThreadLibrary (модуль OtlParallel) проявляется при управлении несколькими частями независимого кода. Это хорошо показано в предыдущей части.
Однако при такой высокоуровневой обработке фоновых задач необходимы инструменты, позволяющие настраивать и регулировать эти задачи. Одним из примеров настройки является общение фоновой высокоуровневой задачи с главным потоком программы (основным потоком).
Ранее взаимодействие между фоновыми задачами и основным потоком программы, ложилось на плечи каналов связи, которые мы рассматривали ранее. Приходилось реализовать такие каналы вручную или опускаясь на низкий уровень блиотеки работать с CreateTask.
Однако начиная с выпуска 910, OtlParallel содержит намного более простой способ создания фоновых задачи и настройки их для коммуникации с владельцем (сиречь основным потоком программы).

Реализация этого механизма меняется от одной структуры высокого уровня до другой, и мы будем рассматривать это позже. То, что их объединяет вместе – это то, что все имеют интерфейс IOmniTaskConfig, возвращающейся из функции Parallel.TaskConfig.
class function Parallel.TaskConfig: IOmniTaskConfig;
begin
  Result := TOmniTaskConfig.Create;
end;
Этот новый интерфейс содержит функции, чтобы настроить обработчик сообщений (OnMessage), обработчик завершения задачи (OnTerminated), флаг отмены прохода(выполнения) задачи (CancelWith), связать задачу с диспетчером мониторинга (MonitorWith), или со счетчиком (WithCounter) а также с блокировкой (WithLock). Вся это функциональность интерфейса реализована непосредственно, таким образом, что задание этих свойств может использоваться в быстрой (цепочечной) манере. С цепочной манерой задания свойст мы уже сталкивались ранее, для тех кто пропустил данный пункт, сообщаю снова, фактически каждая функция данного интерфейса возращает указатель на свой интерфейс, попутно выставляю нужные флаги, обработчики и т.п. поэтому настройка нужных свойств может вестись в цепочной манере, например так:
Var
  Cfg: IOmniTaskConfig
Begin
  Cfg. MonitorWith(Monitor).OnTerminated(HandleTerminater);
Что равнозначно вызову в классической манере
Var
  Cfg: IOmniTaskConfig
Begin
  Cfg. MonitorWith(Monitor);
  Cfg.OnTerminated(HandleTerminater);
Сам же интерфейс IOmniTaskConfig представлен ниже:
IOmniTaskConfig = interface
  procedure Apply(const task: IOmniTaskControl);
  function  CancelWith(const token: IOmniCancellationToken): 
    IOmniTaskConfig;
  function  MonitorWith(const monitor: IOmniTaskControlMonitor): 
    IOmniTaskConfig;
  function  OnMessage(eventDispatcher: TObject): 
    IOmniTaskConfig; overload;
  function  OnMessage(eventHandler: TOmniTaskMessageEvent): 
    IOmniTaskConfig; overload;
  function  OnMessage(msgID: word; 
    eventHandler: TOmniTaskMessageEvent): IOmniTaskConfig; overload;
  function  OnMessage(msgID: word; eventHandler: TOmniMessageExec): 
    IOmniTaskConfig; overload;
  function  OnTerminated(eventHandler: TOmniTaskTerminatedEvent): 
    IOmniTaskConfig; overload;
  function  OnTerminated(eventHandler: TOmniOnTerminatedFunction): 
    IOmniTaskConfig; overload;
  function  WithCounter(const counter: IOmniCounter): IOmniTaskConfig;
  function  WithLock(const lock: TSynchroObject; 
    autoDestroyLock: boolean = true): IOmniTaskConfig; overload;
  function  WithLock(const lock: IOmniCriticalSection): 
    IOmniTaskConfig; overload;
end;
Итак, соответсвенно с введением нового интерфейса были переработаны некоторые методы, а именно:

Async
Методы Parallel.Async теперь принимают дополнительный параметр типа IOmniTaskConfig. Если он установлен, то фоновая задача будет формироваться с указанной конфигурацией.
class procedure Async(task: TProc; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Async(task: TOmniTaskDelegate; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Async(task: TProc; onTermination: TProc; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Async(task: TOmniTaskDelegate; onTermination: TProc;
   taskConfig: IOmniTaskConfig = nil); overload;
Далее демонстрационное заявление формирует задачу, посылает log-сообщения в основную задачу (то есть в форму):
Parallel.Async(
    procedure (const task: IOmniTask)
    var
      i: integer;
    begin
      task.Comm.Send(WM_LOG, 'Starting');
      for i := 1 to 10 do begin
        task.Comm.Send(WM_LOG, i);
        Sleep(200);
      end;
      task.Comm.Send(WM_LOG, 'Completed');
    end,

    procedure
    begin
      btnAsync.Enabled := true;
    end,

    Parallel.TaskConfig.OnMessage(Self) // указываем сразу обработчик
    );
Обработчик сообщения WM_LOG:.
procedure WMLog(var msg: TOmniMessage); message WM_LOG;

procedure TfrmDemoParallelTaskConfig.WMLog(var msg: TOmniMessage);
begin
  lbLog.ItemIndex := lbLog.Items.Add('BGTASK: ' + msg.MsgData);
end;
Join
Аналогично Async, метод Join также принимает дополнительный параметр типа IOmniTaskConfig.
class procedure Join(const task1, task2: TProc; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Join(const task1, task2: TOmniTaskDelegate;
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Join(const tasks: array of TProc; 
  taskConfig: IOmniTaskConfig = nil); overload;
class procedure Join(const tasks: array of TOmniTaskDelegate;
  taskConfig: IOmniTaskConfig = nil); overload;
Следующее демо-приложение использует TaskConfig.WithLock для передачи хендла блокировки двум фоновым задачам использующим один разделенный ресурс:
procedure TfrmDemoParallelTaskConfig.btnJoinClick(Sender: TObject);
begin
  FSharedValue := 42;
  Parallel.Join(
    procedure (const task: IOmniTask)
    var
      i: integer;
    begin
      for i := 1 to 1000000 do begin
        task.Lock.Acquire;
        FSharedValue := FSharedValue + 17;
        task.Lock.Release;
      end;
    end,
    procedure (const task: IOmniTask)
    var
      i: integer;
    begin
      for i := 1 to 1000000 do begin
        task.Lock.Acquire;
        FSharedValue := FSharedValue - 17;
        task.Lock.Release;
      end;
    end
    ,Parallel.TaskConfig.WithLock(CreateOmniCriticalSection)
  );
  lbLog.ItemIndex := lbLog.Items.Add(Format(
    'JOIN: Shared value = %d (should be 42)', [FSharedValue]));
end;

Feature
Метод Palallel.Feature также имеет дополнительный параметр типа IOmniTaskConfig.
class function Future(action: TOmniFutureDelegate; 
  taskConfig: IOmniTaskConfig = nil): IOmniFuture; overload;
class function Future(action: TOmniFutureDelegateEx; 
  taskConfig: IOmniTaskConfig = nil): IOmniFuture; overload;
Демонстрационное приложение показывает как вы можете послать сообщение от будущей задачи:
procedure TfrmDemoParallelTaskConfig.btnFutureClick(Sender: TObject);
begin
  btnFuture.Enabled := false;
  FFuture := Parallel.Future(
    function (const task: IOmniTask): integer
    begin
      Sleep(500);
      Result := 42;
      task.Comm.Send(WM_FUTURE_RESULT);
    end,
    Parallel.TaskConfig.OnMessage(Self)
  )
end;

procedure WMFutureResult(var msg: TOmniMessage);  message WM_FUTURE_RESULT;

procedure TfrmDemoParallelTaskConfig.WMFutureResult(var msg: TOmniMessage);
begin
  lbLog.ItemIndex := lbLog.Items.Add('FUTURE: ' + IntToStr(FFuture.Value));
  FFuture := nil;
  btnFuture.Enabled := true;
end;
PipeLine
Трубопровод также расширился:
IOmniPipeline = interface
  function  Stage(pipelineStage: TPipelineStageDelegate; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stage(pipelineStage: TPipelineStageDelegateEx; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(const pipelineStages: 
    array of TPipelineStageDelegate; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
  function  Stages(const pipelineStages: 
    array of TPipelineStageDelegateEx; 
    taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload;
end;
Демонстрационное приложение снова только посылает сообщения от стадий в основную задачу:
procedure TfrmDemoParallelTaskConfig.btnPipelineClick(Sender: TObject);
var
  pipeOut: IOmniBlockingCollection;
  value  : TOmniValue;
begin
  pipeOut := Parallel.Pipeline
    .Stages([PipelineStage1, PipelineStage2], 
       Parallel.TaskConfig.OnMessage(Self))
    .Run;
  while not pipeOut.TryTake(value) do
    Application.ProcessMessages;
  lbLog.ItemIndex := lbLog.Items.Add('PIPELINE: ' + 
    IntToStr(value) + ' (should be 500500)');
end;
procedure TfrmDemoParallelTaskConfig.PipelineStage1(const input, output:
    IOmniBlockingCollection; const task: IOmniTask);
var
  i: integer;
begin
  task.Comm.Send(WM_LOG, 'Pipeline stage 1 starting');
  for i := 1 to 1000 do
    output.Add(i);
  task.Comm.Send(WM_LOG, 'Pipeline stage 1 stopped');
end;

ForEach
Метод Parallel.ForEach использует еще один подход. Интерфейс IOmniParallelLoop (интерфейс возвращается из метода ForEach) осуществляет функцию TaskConfig, которая принимает блок конфигурации.
IOmniParallelLoop = interface
  function  TaskConfig(
    const config: IOmniTaskConfig): IOmniParallelLoop;
end;
Следующее демонстрационное приложение иллюстрирует, как использовать TaskConfig в ForEach и в то же самое время демонстрирует другой интересный факт - хоть интерфейс и возвратился из Parallel.ForEach все-же большую часть времени он должен использаться в методе Execute. Поэтому нужно, где то его сохранить.
Это особенно полезно, если Вы управляете ForEach с выбором NoWait. В этом случае, ссылка, на интерфейс возвращенная из Parallel.ForEach должна быть сохранена где-нибудь, пока параллельная задача не заканчит свое выполнение.
Код ниже демонстрирует это результат Parallel.ForEach сохранен в глобальной переменной FParallel. Метод OnStop используется, чтобы обнулить эту переменную в конце.
Метод FParallel.Execute вызывается отдельно.
var
    FParallel: IOmniParallelLoop;
procedure TfrmDemoParallelTaskConfig.btnForEachClick(Sender: TObject);
begin
  FParallel := Parallel.ForEach(1, 17)
    .TaskConfig(Parallel.TaskConfig.OnMessage(Self))
    .NoWait
    .OnStop(
         procedure 
         begin 
             FParallel := nil; 
         End 
           );
  FParallel
    .Execute(
      procedure (const task: IOmniTask; const value: integer)
      begin
        task.Comm.Send(WM_LOG, value);
      end
            );
end;

ForkJoin
Метод Parallel.ForkJoin осуществляет только ограниченную поддержку конфигурации задачи. Хотя Вы можете передать конфигурацию задачи на интерфейс IOmniForkJoin, вызывая метод TaskConfig (так же, как в случае ForEach), интерфейс IOmniTask не выставлен в параллельных вычислениях (в IOmniForkJoin Compute), и поэтому не может использоваться для передачи сообщений. Это связано с особенностями выполнением ForkJoin и вероятно такое поведение не изменится в ближайшем будущем.
IOmniForkJoin = interface
  function  TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin;
end;
Из-за этого ограничения демонстрационное приложение использует конфигурацию задачи, только для того чтобы послать уведомление, что метод вилки/соединения закончил свое выполнение.
procedure TfrmDemoParallelTaskConfig.btnForkJoinClick(Sender: TObject);
var
  data: TArray;
  max : integer;
begin
  data := TArray.Create(1, 17, 4, 99, -250, 7, 13, 132, 101);
  max := ParallelMax(
    data,
    Parallel.ForkJoin
      .TaskConfig(Parallel.TaskConfig.OnTerminated(
        procedure (const task: IOmniTaskControl)
        begin
          lbLog.ItemIndex := lbLog.Items.Add(Format(
            'COMPUTE: Task %d terminated', [task.UniqueID]));
        end
      )),
    Low(data),
    High(data));
  lbLog.ItemIndex := lbLog.Items.Add('FORKJOIN: ' + IntToStr(max) + 
    ' (expected 132)');
end;
Продолжение следует…



1 комментарий:

  1. Помогите разобраться

    Многопоточное приложение обрабатывает файлы записывает результаты в БД (MS SQL Server)

    многопоточность реализованна с помощью библиотеки OmniThreadLibrary,
    Создаю Пул потоков FConnectionPool := CreateThreadPool('Connection pool');

    Один экземпляр потока (Worker) делает:
    чтение файла с диска (моментально)
    расчёт по файлу (долго)
    запись в БД (моментально)

    потоки полностью независимые

    Необходимо максимально быстрая обработка, полная загрузка сервера работой

    Проблема в неполной загрузке процессора (4 ядра).

    запускаю в 1 поток = обработка ~120 файла в сек, загрузка ЦП=25% (одно ядро на 100%)
    FConnectionPool.MaxExecuting := 1;

    запускаю в 4 потока = обработка ~120 файла в сек, загрузка ЦП=25% (нагрузка размазана по 4-ём ядрам (на 25% каждое)
    FConnectionPool.MaxExecuting := 4;

    Запускаю второй exe , получается 120+120 , 25%+25%

    Как добиться чтобы в рамках одного процесса(exe файла) добиться загрузки процессора 75% и производительности 120*3 файла/сек ??
    Повышение приоритета процесса(через диспетчера задач - ничего не даёт)

    ОтветитьУдалить