четверг, 7 октября 2010 г.

Использование OmniThread Libray (OTL) для создания многопоточных приложений - 13

Создание пула задач

Начиная с версии 1.03, библиотека OTL поддерживает создание фоновых задач в пулах. Использование пулов дает возможность переложить функции управления созданными задачами с наших плеч на пул. Фактически нам не нужно будет думать о непосредственном контроле над созданными нами задачами, OTL делает это за нас.
От нас нужно только создать задачу и передать ее в пул задач. Кроме того использование пула задач позволит Вам использовать дополнительные возможности OTL. Вы можете задать максимальное количество одновременно выполняющихся параллельных задач, перевести поток в режим ожидания по истечении некоторого времени отпущенного на выполнения задачи (если задача не завершилось, фактически ее приостановить). Можете задать время предварительного ожидания задачи перед непосредственным выполнением (например, при долгой инициализации), выполнить предварительную инициализацию при подключении задачи к пулу  и многое другое.
Разберем реализацию пула на демонстрационном примере tests\24_ConnectionPool:
Откроем пример в среде Delphi, и переместимся в метод TfrmConnectionPoolDemo.FormCreate где и создается пул задач нашего примера
test_24_ConnectionPool.pas   Создание пула задач(потоков)
procedure TfrmConnectionPoolDemo.FormCreate(Sender: TObject);
begin
  FConnectionPool := CreateThreadPool('Connection pool');
  FConnectionPool.ThreadDataFactory := CreateThreadData;
  FConnectionPool.MaxExecuting := 3;
end;

Итак, при создании формы создается пул задач, которому присваивается название.
В случае закрытия программы вызывается обработчик OnClose формы, в котором закрываются все ждущие задачи (FConnectionPool.CancelAll), позволяя приложению завершиться корректно и удалить все фоновые задачи.
Переменная FConnectionPool представляет собой интерфейс, ссылающийся на менеджер пула, с помощью которого мы можем управлять дополнительными возможностями данного пула. Так FConnectionPool.MaxExecuting := 3 задает максимальное количество выполняющихся задач в пуле. Все остальные задачи, добавленные позже, будут поставлены на ожидание, если в данный момент времени в пуле одновременно будет три работающих фоновых задачи. При окончании одной из них, будет запущена следующая поставленная ранее на ожидание задача.
Стоит отметить, что временем жизни пула (FConnectionPool) вручную управлять нет нужды, т.к. автоматически Delphi управляет им. Поэтому программисту нет необходимости явно уничтожать FConnectionPool при выходе из программы. Так как переменная FConnectionPool является интерфейсом, то среда сама уничтожит данный объект, реализующий этот интерфейс при закрытии программы (автоматическом обнулении интерфейса)

Очень интересное свойство - FConnectionPool.ThreadDataFactory. Данное свойство позволяет связать с пулом задач, какую либо функцию, которая будет вызвана OTL при подключении к пулу  новой задачи. Будем называть данное свойство - фабрикой данных, благо сам автор ее так определил.
Функция в OTL определена как функция возвращающая интерфейс (любой интерфейс):

TOTPThreadDataFactory = function: IInterface;

property ThreadDataFactory: TOTPThreadDataFactory read GetThreadDataFactory
      write SetThreadDataFactory;

C помощью этой функции вы сможете выполнить какие-либо предварительные действия после подключения к пулу, до выполнения самой задачи.
Например, сделать предварительную инициализацию неких данных, создать некие объекты, установить связь с базой данных и прочее.
Фактически цепочка работы программы при использовании данного свойства будет такой:
            Создание задачи
            Подключение задачи к пулу
            Вызов функции заданной в ThreadDataFactory (для нашей фабрики данных).
            Старт фоновой задачи в пуле.
Если никаких предварительных действий при подключении выполнять не нужно, то данное свойство необязательно и тогда цепочка сократится до:
1.        Создание задачи
2.        Подключение задачи к пулу
3.        Старт фоновой задачи в пуле.

В нашем демонстрационном примере с свойством FConnectionPool.ThreadDataFactory связана функция CreateThreadData, которая создает объект TConnectionPoolData. TConnectionPoolData содержит только одно свойство ConnectionID, в котором хранится  уникальный идентификатор, который поможет нам следить за выполнением работы программы.
test_24_ConnectionPool.pas  
type
  IConnectionPoolData = interface ['{F604640D-6D4E-48B4-9A8C-483CA9635C71}']
    function ConnectionID: integer;
  end;

  TConnectionPoolData = class(TInterfacedObject, IConnectionPoolData)
  strict private
    cpID: integer;
  public
    constructor Create;
    destructor  Destroy; override;
    function ConnectionID: integer;
  end; { TConnectionPoolData }

Так как код у нас демонстрационный, то в TConnectionPoolData.Create мы пошлем сообщение форме о начале соединения, запомним идентификатор потока в пуле (с увеличением внутреннего счетчика пула на 1).
Задействуем пяти секундную задержку до старта потока (для имитации задержки). В деструкторе же просто пошлем сообщение форме, об уничтожении соединения с пулом:
test_24_ConnectionPool.pas  
constructor TConnectionPoolData.Create;
begin
 PostToForm(WM_USER, MSG_CREATING_CONNECTION, integer(GetCurrentThreadID));
  cpID := GConnPoolID.Increment;
  Sleep(5000);
  PostToForm(WM_USER, MSG_CREATED_CONNECTION, cpID);
end;

destructor TConnectionPoolData.Destroy;
begin
  PostToForm(WM_USER, MSG_DESTROY_CONNECTION, cpID);
end;

Создадим задачу и подключим ее к пулу с помощью метода Schedule.
test_24_ConnectionPool.pas   Создание задачи и подключение данной задачи к пулу
procedure TfrmConnectionPoolDemo.btnScheduleClick(Sender: TObject);
begin
  Log('Creating task');
  CreateTask(TaskProc).MonitorWith(OTLMonitor).Schedule(FConnectionPool);
end;

Также в примере мы контролируем  работу нашей  задачи компонентом TOmniEventMonitor. Код, размещенный в процедуре TaskProc довольно простой. Код приводит интерфейс, переданный в ThreadData к интерфейсу IConnectionPoolData (task.ThreadData as IConnectionPoolData), отыскивает идентификатор подключения и отсылает его главной форме, затем засыпает на три секунды, имитируя эти некоторую работу задачи.
test_24_ConnectionPool.pas  
procedure TaskProc(const task: IOmniTask);
begin
  PostToForm(WM_USER + 1, task.UniqueID,
    (task.ThreadData as IConnectionPoolData).ConnectionID);
  Sleep(3000);
end;

Другой код, назначенный обработчику нажатия второй кнопки (“Schedule and wait”), демонстрирует, как Вы можете поставить задачу на ожидание  ее выполнении. Данный метод создание задачи будет Вам полезен, если Вы запускаете задачу от другого фонового потока.
test_24_ConnectionPool.pas  
procedure TfrmConnectionPoolDemo.btnScheduleAndWaitClick(Sender: TObject);
var
  task: IOmniTaskControl;
begin
  Screen.Cursor := crHourGlass;
  Log('Creating task');
  task := CreateTask(TaskProc).MonitorWith(OTLMonitor).Schedule(FConnectionPool);
  task.WaitFor(10000);
  task.Terminate;
  Screen.Cursor := crDefault;
  Application.ProcessMessages; //allow accumulated log messages to be processed
  Log('Awaited task termination');
end;

Нажмем кнопку “Shedule” и посмотрим, что будет происходить. На рисунке ниже представлен лог выполнения программы.
Рис.Запуск одной задачи в пуле.

Итак, Задача была создана и поставлена в пул. После чего управление было передано нашей фабрике данных. Фабрика данных нашей задачи отработала в течение 5 секунд и передала управление пулу. Пул задач создал новый фоновый поток 2824 и начал выполнение задачи в данном фоновом потоке, после чего задача отработала три секунды и была завершена.
Если вы сразу щелкните снова “Schedule” и посмотрите, что произошла дальше, то увидите интересную особенность:
Рис. Новая задача запущена в предыдущем подключении и потоке.
На этот раз опять была создана новая задача (с идентификатором 4), которая подключилась к пулу. При этом новый фоновый поток не был создан. Задача запустилась в том же самом фоновом потоке (2824), используя то же  самое подключение к пулу (1), какое использовала и  предыдущая задача.
Соединение к пулу (1) в свое время было создано при постановке в пул первой задачи. Как вы заметили, запущенная вторая задача уже не вызывает фабрику данных (не ждет 5 секунд), так как она уже создана (при подключении к пулу первой задачи), а непосредственно отрабатывает код фоновой задачи (ждет 3 секунды).
Если Вы теперь оставите программу выполняться 10 секунд, то увидите сообщение об уничтожении первого соединения к пулу (Destroying connection 1).
Причина данного сообщения будет в том, что сработала блокировка жизни неиспользуемого потока по времени (по умолчанию она равна 10 секундам). Другими словами, если OTL видит, что поток ничего не сделает в течение 10 секунд, то он будет остановлен. Фактически наш поток (2824) был остановлен, соответственно первое подключение было уничтожено.
Вы можете, конечно, установить любое время жизни такого потока в пуле или даже вообще отключить автоматическое уничтожение бездействующего потока (установив время жизни равное 0).
Рис. Первый фоновый поток занят, для второй задачи - создаем отдельный поток.

Однако если вы щелкните “Schedule” после создания первой задачи (во время сна 1 задачи) и установки первого соединения, OTL создаст второе соединение и второй поток и для второго потока вызовет новую фабрику данных.

По умолчанию, OTL пул использует столько потоков, сколько есть ядер в системе, но опять-таки можно переопределить этот параметр. На данный момент, вы ограничены максимум 60 одновременно выполняющимися потоками. Число 60 – это не произвольное число, а ограничение, накладываемое Windows на функцию WaitForMultipleObjects. Дело в том, что ждущая очередь WaitForMultipleObjects  может контролировать только 64 дескриптора объектов ядра.
На следующем рисунке видна работа OTL на 2-х ядерном процессоре.

Рис. Двуядерный процессор – каждому ядру выделяется по одному рабочему потоку, количество созданных задач - 4

Да, Вы можете изменить ограничение в  количество потоков, превысив число ядер центрального процессора в системе, но на самом деле действительно не рекомендуется выполнять одновременно 60 параллельных потоков.  Задачи же поставленные в пул этим не ограничиваются. Они ставятся в очередь.
Если на двуядерном процессоре мы пощелкаем кнопку “Schedule” раза три, то при настройках по умолчанию, мы заметим, что на выполнение запустятся только две задачи, третья же будет поставлена на ожидание и запустится после окончания первой.
Рис. Двуядерный процессор, созданы три задачи, две работают – одна ждет. Задействованы все ядра процессора.

4 комментария:

  1. Здравствуйте, Дмитрий :)

    клевый блог у вас получается по OTL у автора то меньше описания чем у вас.

    У меня к вам вопрос по OTL, вы можете забить более 80 тасков в шедул?
    т.е.
    например так, одновременно 10 тасков на выполнение:
    GlobalOmniThreadPool.MaxQueued := 300;
    GlobalOmniThreadPool.MaxExecuting := 10;

    for iTask := 1 to 100 do begin
    Application.ProcessMessages;
    CreateTask(THelloWorker.Create(Handle, delay_ms)).MonitorWith(OmniTED).Schedule;
    end;

    работать то оно работает, но FastMM ругается на утечки, на форуме примоза эта тема муссировалась с год назад, но воз и ныне там или эт у меня что-то не так. Откройте плиз его пример 11 thread pool и там где кнопка Schedule 80 tasks, откройте и поменяйте 80 на 100, это так сказать самый быстрый способ проверить.

    ОтветитьУдалить
  2. Здравствуйте Tony

    Попробую проверить.Если так то сообщу.Единственная надежда, что автор в версии 2.0 устранит такие вещи. Судя по тому, что в его блоге такая инфа проскакивала, думаю он постарается исправить такое поведение. К сожалению помимо этого встречаются проблемы с памятью и в других примерах, в частности у меня было с 25_WaitableComm. Будем тестить.

    ОтветитьУдалить
  3. Привет, Дмитрий :)

    я ему писал про этот трабл, он ответил, что тоже видит утечку и что будет тестить, бум надеяться что пофиксит быстро, а то сейчас приходится свой пул на базе его пула писать, дурость вобщем, в принципе он вроде как своей библиотекой серьезно занимается, так что надеюсь разберется :)

    вот кстати урл на топик на его сайте где идет обсуждение этой траблы: http://otl.17slon.com/forum/index.php/topic,13.15.html
    он вроде как славянин, но вроде не русский, я ему там написал, что нашел клевый блог по его библиотеке на русском :)

    классный блог начал, пиши еще, думаю очень многим поможет инфа про потоки в дельфе и про работу с отл-ем :)

    ОтветитьУдалить
  4. Здравствуйте Tony

    Да,проблема с утечкой памяти присутствует. Бдем надеяться, что автор исправит данные баги в новой версии.

    ОтветитьУдалить