вторник, 19 октября 2010 г.

Использование OmniThread Libray (OTL) для создания многопоточных приложений - 16

Система связи

В данном разделе мы рассмотрим более детально подсистему связи OTL. За работу системы связи отвечает файл OtlComm.pas. Как вы знаете, данная система используется интерфейсами OTL для передачи сообщений между фоновыми задачами и основным потоком программы. Как это происходит, мы разобрались на примерах ранее. Однако использование системы связи не затрагивает только библиотеку OT. Дело в том, что система не связана жестко с OTL-библиотекой (выделена отдельно), что в свою очередь дает вам возможность использовать ее и не в OTL-приложениях.

На первый взгляд система устроена просто и представляет собой IOmniCommunicationEndpoint, описывающий реализацию “конечной точки”. Точки, откуда будут приходить, и уходить сообщения.
Интерфейс системы связи
  IOmniCommunicationEndpoint = interface ['{910D329C-D049-48B9-B0C0-9434D2E57870}']
    function  GetNewMessageEvent: THandle;
    function  GetOtherEndpoint: IOmniCommunicationEndpoint;
    function  GetReader: TOmniMessageQueue;
    function  GetWriter: TOmniMessageQueue;
  //
    function  Receive(var msg: TOmniMessage): boolean; overload;
    function  Receive(var msgID: word; var msgData: TOmniValue): boolean; overload;
    function  ReceiveWait(var msg: TOmniMessage; timeout_ms: cardinal): boolean; overload;
    function  ReceiveWait(var msgID: word; var msgData: TOmniValue; timeout_ms: cardinal): boolean; overload;
    procedure Send(const msg: TOmniMessage); overload;
    procedure Send(msgID: word); overload;
    procedure Send(msgID: word; msgData: array of const); overload;
    procedure Send(msgID: word; msgData: TOmniValue); overload;
    function  SendWait(msgID: word; timeout_ms: cardinal = CMaxSendWaitTime_ms): boolean; overload;
    function  SendWait(msgID: word; msgData: TOmniValue; timeout_ms: cardinal = CMaxSendWaitTime_ms): boolean; overload;
    property NewMessageEvent: THandle read GetNewMessageEvent;
    property OtherEndpoint: IOmniCommunicationEndpoint read GetOtherEndpoint;
    property Reader: TOmniMessageQueue read GetReader;
    property Writer: TOmniMessageQueue read GetWriter;
  end; { IOmniCommunicationEndpoint }


Данный интерфейс позволяет владельцу отправлять сообщения, получать сообщения и ожидать прихода новых сообщений. Свойство Comm реализующее данный интерфейс присутствует как в интерфейсе управления задачей (IOmniTaskControl), так и на стороне самой задачи (IOmniTask). Обе конечные точки связаны так, чтобы сообщения, посылаемые через IOmniTaskControl.Comm.Send доходили к IOmniTask.Comm и наоборот.
Простая схема взаимодействия представлена на рисунке :

Рис. Взаимодействие между интерфейсами
Но это слишком упрощенно, когда Вы начнете разбираться досконально, вы поймете, что истинная мощь системы связи скрыта от Вас. На следующем рисунке дано реальное изображение объектов и интерфейсов, представленных в модуле OtlComm. Хотя у Вас может возникнуть первое впечатление, что IOmniCommunicationEndpoint является самой важной частью системы связи, в действительности все вращается вокруг IOmniTwoWayChannel.

Рис. Структурная схема системы связи
Чтобы понять, что представляет из себя данное изображение, начнем разбор с нижнего левого угла. Мы видим интерфейс IOmniTaskControl - тот, который возвращается к нам из процедуры CreateTask.
Работа интерфейса IOmniTaskControl осуществляется объектом TOmniTaskControl, которому принадлежит интерфейс IOmniTwoWayChannel, созданный во время инициализации TOmniTaskControl.
TOmniTaskControl.Initialize
procedure TOmniTaskControl.Initialize;
begin
  //...
  otcCommChannel := CreateTwoWayChannel;
  //...
end;

Этот интерфейс также передается  и объекту TOmniTask (который осуществляет поддержку интерфейса IOmniTask), в тот момент, когда фоновая задача запущена на выполнение.
TOmniTaskControl.Run
function TOmniTaskControl.Run: IOmniTaskControl;
var
  task: IOmniTask;
begin
  //...
  task := TOmniTask.Create(..., otcCommChannel, ...);
  //...
end;

Классу TOmniTwoWayChannel в свою очередь два кольцевых буфера типа TOmniRingBuffer. Данные буферы используются, как однонаправленные очереди сообщений. Сообщения передаются записями  типа TOmniMessage.
TOmniMessage
TOmniMessage = record
    MsgID  : word; {Идентификатор}
    MsgData: TOmniValue; {значение}
end;

Классу TOmniTwoWayChannel также принадлежат два интерфейса IOmniCommunicationEndpoint (реализованные  классом TOmniCommunicationEndpoint). Один из них выставлен через свойство Enpoint1, а другой через  Endpoint2.
IOmniTwoWayChannel
IOmniTwoWayChannel = interface ['{3ED1AB88-4209-4E01-AA79-A577AD719520}']
    function Endpoint1: IOmniCommunicationEndpoint;
    function Endpoint2: IOmniCommunicationEndpoint;
end;

Оба интерфейса конечных точек связаны с обоими кольцевыми буферами. Если мы назовем кольцевые буфера – “A” и “B”, то конечная точка 1 записывает сообщение в буфер “A” и читает сообщения с буфера “B”, в то время как конечная точка 2 записывает сообщения в буфер “B” и читает с буфера “A”.
TOmniTaskControl.Comm и TOmniTask.Comm – это простые свойства, которые возвращают различные оконечные точки (IOmniCommunicationEndpoint) того же самого интерфейса IOmniTwoWayChannel.

function TOmniTaskControl.GetComm: IOmniCommunicationEndpoint;
begin
  Result := otcCommChannel.Endpoint1;
end; { TOmniTaskControl.GetComm }

function TOmniTask.GetComm: IOmniCommunicationEndpoint;
begin
  Result := otCommChannel.Endpoint2;
end; { TOmniTask.GetComm }

Интересной особенностью системы связи является момент создания связи. Постоянное создание связи не является необходимым условием работы OTL. Поскольку как вы можете увидеть из диаграммы, данная система связи "весьма тяжела" автор библиотеки пошел на некоторые хитрости. Если вы запускаете на выполнение простые фоновые задачи по принципу “запустил и забыл” - вы можете не нуждаться в системе связи  вообще. Именно поэтому кольцевые буфера и конечные точки связи не создаются до тех пор, пока не возникает надобность в их использовании.
Хотя объект TOmniTwoWayChannel создается всякий раз, при создании фоновой задачи, это не влечет за собой создание других частей инфраструктуры. Дополнительные части  создаются только при доступе к конечной точке из программы. Если доступ не осуществляется то дополнительная инфраструктура (буфера, оконечные точки) не создается. Это можно просмотреть на примере Endpoint1.
TOmniTwoWayChannel
procedure TOmniTwoWayChannel.CreateBuffers;
begin
  if twcUnidirQueue[1] = nil then
    twcUnidirQueue[1] := TOmniRingBuffer.Create(twcMessageQueueSize);
  if twcUnidirQueue[2] = nil then
    twcUnidirQueue[2] := TOmniRingBuffer.Create(twcMessageQueueSize);
end; { TOmniTwoWayChannel.CreateBuffers }

function TOmniTwoWayChannel.Endpoint1: IOmniCommunicationEndpoint;
begin
  Assert((cardinal(@twcEndpoint[1]) AND 3) = 0);
  if twcEndpoint[1] = nil then begin
    twcLock.Acquire;
    try
      if twcEndpoint[1] = nil then begin
        CreateBuffers;
        twcEndpoint[1] := TOmniCommunicationEndpoint.Create(twcUnidirQueue[1],
                          twcUnidirQueue[2]);
      end;
    finally twcLock.Release; end;
  end;
  Result := twcEndpoint[1];
end; { TOmniTwoWayChannel.Endpoint1 }

Код, создания Endpoint2 подобен этому за исключением того, что это использует twcEndpoint[2] и полностью изменяет параметры, которые передаются к конструктору TOmniCommunicationEndpoint.Create.

Комментариев нет:

Отправить комментарий