Простота параллелизма высокоуровневых фоновых задач библиотеки OmniThreadLibrary (модуль OtlParallel) проявляется при управлении несколькими частями независимого кода. Это хорошо показано в предыдущей части.
Однако при такой высокоуровневой обработке фоновых задач необходимы инструменты, позволяющие настраивать и регулировать эти задачи. Одним из примеров настройки является общение фоновой высокоуровневой задачи с главным потоком программы (основным потоком).
Ранее взаимодействие между фоновыми задачами и основным потоком программы, ложилось на плечи каналов связи, которые мы рассматривали ранее. Приходилось реализовать такие каналы вручную или опускаясь на низкий уровень блиотеки работать с CreateTask.
Однако начиная с выпуска 910, OtlParallel содержит намного более простой способ создания фоновых задачи и настройки их для коммуникации с владельцем (сиречь основным потоком программы).
Реализация этого механизма меняется от одной структуры высокого уровня до другой, и мы будем рассматривать это позже. То, что их объединяет вместе – это то, что все имеют интерфейс IOmniTaskConfig, возвращающейся из функции Parallel.TaskConfig.
class function Parallel.TaskConfig: IOmniTaskConfig; begin Result := TOmniTaskConfig.Create; end;
Этот новый интерфейс содержит функции, чтобы настроить обработчик сообщений (OnMessage), обработчик завершения задачи (OnTerminated), флаг отмены прохода(выполнения) задачи (CancelWith), связать задачу с диспетчером мониторинга (MonitorWith), или со счетчиком (WithCounter) а также с блокировкой (WithLock). Вся это функциональность интерфейса реализована непосредственно, таким образом, что задание этих свойств может использоваться в быстрой (цепочечной) манере. С цепочной манерой задания свойст мы уже сталкивались ранее, для тех кто пропустил данный пункт, сообщаю снова, фактически каждая функция данного интерфейса возращает указатель на свой интерфейс, попутно выставляю нужные флаги, обработчики и т.п. поэтому настройка нужных свойств может вестись в цепочной манере, например так:
Var Cfg: IOmniTaskConfig Begin Cfg. MonitorWith(Monitor).OnTerminated(HandleTerminater);
Что равнозначно вызову в классической манере
Var Cfg: IOmniTaskConfig Begin Cfg. MonitorWith(Monitor); Cfg.OnTerminated(HandleTerminater);
Сам же интерфейс IOmniTaskConfig представлен ниже:
IOmniTaskConfig = interface procedure Apply(const task: IOmniTaskControl); function CancelWith(const token: IOmniCancellationToken): IOmniTaskConfig; function MonitorWith(const monitor: IOmniTaskControlMonitor): IOmniTaskConfig; function OnMessage(eventDispatcher: TObject): IOmniTaskConfig; overload; function OnMessage(eventHandler: TOmniTaskMessageEvent): IOmniTaskConfig; overload; function OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent): IOmniTaskConfig; overload; function OnMessage(msgID: word; eventHandler: TOmniMessageExec): IOmniTaskConfig; overload; function OnTerminated(eventHandler: TOmniTaskTerminatedEvent): IOmniTaskConfig; overload; function OnTerminated(eventHandler: TOmniOnTerminatedFunction): IOmniTaskConfig; overload; function WithCounter(const counter: IOmniCounter): IOmniTaskConfig; function WithLock(const lock: TSynchroObject; autoDestroyLock: boolean = true): IOmniTaskConfig; overload; function WithLock(const lock: IOmniCriticalSection): IOmniTaskConfig; overload; end;
Итак, соответсвенно с введением нового интерфейса были переработаны некоторые методы, а именно:
Async
Методы Parallel.Async теперь принимают дополнительный параметр типа IOmniTaskConfig. Если он установлен, то фоновая задача будет формироваться с указанной конфигурацией.
class procedure Async(task: TProc; taskConfig: IOmniTaskConfig = nil); overload; class procedure Async(task: TOmniTaskDelegate; taskConfig: IOmniTaskConfig = nil); overload; class procedure Async(task: TProc; onTermination: TProc; taskConfig: IOmniTaskConfig = nil); overload; class procedure Async(task: TOmniTaskDelegate; onTermination: TProc; taskConfig: IOmniTaskConfig = nil); overload;
Далее демонстрационное заявление формирует задачу, посылает log-сообщения в основную задачу (то есть в форму):
Parallel.Async( procedure (const task: IOmniTask) var i: integer; begin task.Comm.Send(WM_LOG, 'Starting'); for i := 1 to 10 do begin task.Comm.Send(WM_LOG, i); Sleep(200); end; task.Comm.Send(WM_LOG, 'Completed'); end, procedure begin btnAsync.Enabled := true; end, Parallel.TaskConfig.OnMessage(Self) // указываем сразу обработчик );
Обработчик сообщения WM_LOG:.
procedure WMLog(var msg: TOmniMessage); message WM_LOG; procedure TfrmDemoParallelTaskConfig.WMLog(var msg: TOmniMessage); begin lbLog.ItemIndex := lbLog.Items.Add('BGTASK: ' + msg.MsgData); end;
Join
Аналогично Async, метод Join также принимает дополнительный параметр типа IOmniTaskConfig.
class procedure Join(const task1, task2: TProc; taskConfig: IOmniTaskConfig = nil); overload; class procedure Join(const task1, task2: TOmniTaskDelegate; taskConfig: IOmniTaskConfig = nil); overload; class procedure Join(const tasks: array of TProc; taskConfig: IOmniTaskConfig = nil); overload; class procedure Join(const tasks: array of TOmniTaskDelegate; taskConfig: IOmniTaskConfig = nil); overload;
Следующее демо-приложение использует TaskConfig.WithLock для передачи хендла блокировки двум фоновым задачам использующим один разделенный ресурс:
procedure TfrmDemoParallelTaskConfig.btnJoinClick(Sender: TObject); begin FSharedValue := 42; Parallel.Join( procedure (const task: IOmniTask) var i: integer; begin for i := 1 to 1000000 do begin task.Lock.Acquire; FSharedValue := FSharedValue + 17; task.Lock.Release; end; end, procedure (const task: IOmniTask) var i: integer; begin for i := 1 to 1000000 do begin task.Lock.Acquire; FSharedValue := FSharedValue - 17; task.Lock.Release; end; end ,Parallel.TaskConfig.WithLock(CreateOmniCriticalSection) ); lbLog.ItemIndex := lbLog.Items.Add(Format( 'JOIN: Shared value = %d (should be 42)', [FSharedValue])); end;
Feature
Метод Palallel.Feature также имеет дополнительный параметр типа IOmniTaskConfig. class function Future(action: TOmniFutureDelegate ; taskConfig: IOmniTaskConfig = nil): IOmniFuture ; overload; class function Future (action: TOmniFutureDelegateEx ; taskConfig: IOmniTaskConfig = nil): IOmniFuture ; overload;
Демонстрационное приложение показывает как вы можете послать сообщение от будущей задачи:
procedure TfrmDemoParallelTaskConfig.btnFutureClick(Sender: TObject); begin btnFuture.Enabled := false; FFuture := Parallel.Future( function (const task: IOmniTask): integer begin Sleep(500); Result := 42; task.Comm.Send(WM_FUTURE_RESULT); end, Parallel.TaskConfig.OnMessage(Self) ) end; procedure WMFutureResult(var msg: TOmniMessage); message WM_FUTURE_RESULT; procedure TfrmDemoParallelTaskConfig.WMFutureResult(var msg: TOmniMessage); begin lbLog.ItemIndex := lbLog.Items.Add('FUTURE: ' + IntToStr(FFuture.Value)); FFuture := nil; btnFuture.Enabled := true; end;
PipeLine
Трубопровод также расширился:
IOmniPipeline = interface function Stage(pipelineStage: TPipelineStageDelegate; taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload; function Stage(pipelineStage: TPipelineStageDelegateEx; taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload; function Stages(const pipelineStages: array of TPipelineStageDelegate; taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload; function Stages(const pipelineStages: array of TPipelineStageDelegateEx; taskConfig: IOmniTaskConfig = nil): IOmniPipeline; overload; end;
Демонстрационное приложение снова только посылает сообщения от стадий в основную задачу:
procedure TfrmDemoParallelTaskConfig.btnPipelineClick(Sender: TObject); var pipeOut: IOmniBlockingCollection; value : TOmniValue; begin pipeOut := Parallel.Pipeline .Stages([PipelineStage1, PipelineStage2], Parallel.TaskConfig.OnMessage(Self)) .Run; while not pipeOut.TryTake(value) do Application.ProcessMessages; lbLog.ItemIndex := lbLog.Items.Add('PIPELINE: ' + IntToStr(value) + ' (should be 500500)'); end; procedure TfrmDemoParallelTaskConfig.PipelineStage1(const input, output: IOmniBlockingCollection; const task: IOmniTask); var i: integer; begin task.Comm.Send(WM_LOG, 'Pipeline stage 1 starting'); for i := 1 to 1000 do output.Add(i); task.Comm.Send(WM_LOG, 'Pipeline stage 1 stopped'); end;
ForEach
Метод Parallel.ForEach использует еще один подход. Интерфейс IOmniParallelLoop (интерфейс возвращается из метода ForEach) осуществляет функцию TaskConfig, которая принимает блок конфигурации.
IOmniParallelLoop = interface function TaskConfig( const config: IOmniTaskConfig): IOmniParallelLoop; end;
Следующее демонстрационное приложение иллюстрирует, как использовать TaskConfig в ForEach и в то же самое время демонстрирует другой интересный факт - хоть интерфейс и возвратился из Parallel.ForEach все-же большую часть времени он должен использаться в методе Execute. Поэтому нужно, где то его сохранить.
Это особенно полезно, если Вы управляете ForEach с выбором NoWait. В этом случае, ссылка, на интерфейс возвращенная из Parallel.ForEach должна быть сохранена где-нибудь, пока параллельная задача не заканчит свое выполнение.
Код ниже демонстрирует это результат Parallel.ForEach сохранен в глобальной переменной FParallel. Метод OnStop используется, чтобы обнулить эту переменную в конце.
Метод FParallel.Execute вызывается отдельно.
var FParallel: IOmniParallelLoop; procedure TfrmDemoParallelTaskConfig.btnForEachClick(Sender: TObject); begin FParallel := Parallel.ForEach(1, 17) .TaskConfig(Parallel.TaskConfig.OnMessage(Self)) .NoWait .OnStop( procedure begin FParallel := nil; End ); FParallel .Execute( procedure (const task: IOmniTask; const value: integer) begin task.Comm.Send(WM_LOG, value); end ); end;
ForkJoin
Метод Parallel.ForkJoin осуществляет только ограниченную поддержку конфигурации задачи. Хотя Вы можете передать конфигурацию задачи на интерфейс IOmniForkJoin, вызывая метод TaskConfig (так же, как в случае ForEach), интерфейс IOmniTask не выставлен в параллельных вычислениях (в IOmniForkJoin Compute), и поэтому не может использоваться для передачи сообщений. Это связано с особенностями выполнением ForkJoin и вероятно такое поведение не изменится в ближайшем будущем.
IOmniForkJoin = interface function TaskConfig(const config: IOmniTaskConfig): IOmniForkJoin; end;
Из-за этого ограничения демонстрационное приложение использует конфигурацию задачи, только для того чтобы послать уведомление, что метод вилки/соединения закончил свое выполнение.
procedure TfrmDemoParallelTaskConfig.btnForkJoinClick(Sender: TObject); var data: TArray; max : integer; begin data := TArray .Create(1, 17, 4, 99, -250, 7, 13, 132, 101); max := ParallelMax( data, Parallel.ForkJoin .TaskConfig(Parallel.TaskConfig.OnTerminated( procedure (const task: IOmniTaskControl) begin lbLog.ItemIndex := lbLog.Items.Add(Format( 'COMPUTE: Task %d terminated', [task.UniqueID])); end )), Low(data), High(data)); lbLog.ItemIndex := lbLog.Items.Add('FORKJOIN: ' + IntToStr(max) + ' (expected 132)'); end;
Продолжение следует…
Помогите разобраться
ОтветитьУдалитьМногопоточное приложение обрабатывает файлы записывает результаты в БД (MS SQL Server)
многопоточность реализованна с помощью библиотеки OmniThreadLibrary,
Создаю Пул потоков FConnectionPool := CreateThreadPool('Connection pool');
Один экземпляр потока (Worker) делает:
чтение файла с диска (моментально)
расчёт по файлу (долго)
запись в БД (моментально)
потоки полностью независимые
Необходимо максимально быстрая обработка, полная загрузка сервера работой
Проблема в неполной загрузке процессора (4 ядра).
запускаю в 1 поток = обработка ~120 файла в сек, загрузка ЦП=25% (одно ядро на 100%)
FConnectionPool.MaxExecuting := 1;
запускаю в 4 потока = обработка ~120 файла в сек, загрузка ЦП=25% (нагрузка размазана по 4-ём ядрам (на 25% каждое)
FConnectionPool.MaxExecuting := 4;
Запускаю второй exe , получается 120+120 , 25%+25%
Как добиться чтобы в рамках одного процесса(exe файла) добиться загрузки процессора 75% и производительности 120*3 файла/сек ??
Повышение приоритета процесса(через диспетчера задач - ничего не даёт)