Программирование в ACE: уровень интерфейсных фасадов, многозадачность и многопоточность
Содержание:
1. Где применяется ACE и архитектура ACE;
2. Уровень адаптации к операционной системе и параллелизм;
3.Уровень интерфейсных фасадов, многозадачность и многопоточность (Вы читаете данный раздел).
Описанные выше механизмы параллелизма реализованы в ACE в виде интерфейсных фасадов. Таким образом, мы поднимаемся с уровня адаптации к операционной системе на уровень фасадов ACE, инкапсулирующих API операционной системы. Возникает вопрос: а для чего требуется использовать фасады ACE, не проще ли нам использовать API операционной системы и сокеты для реализации конкретной задачи? Ответ лежит на поверхности – во-первых, мы создаем легко переносимый код, который можно будет использовать повторно, а, во-вторых, вместо небезопасных с точки зрения типо-безопасности дескрипторов функций API, мы используем инкапсулирующие их фасады ACE.
Как известно, создание новой задачи осуществляется в POSIX-совместимых операционных системах с помощью функции fork() (в Windows это можно сделать с помощью функции CreateProcess()). Вместо этого ACE предлагает класс ACE_Process, при помощи которого можно создавать процессы и управлять ими. С помощью класса ACE_Process_Option программист может задавать опции процесса, такие как командная строка, переменные окружения, рабочий каталог и им подобные. Создадим утилиту, запускающую программу, имя которой указано в качестве аргумента командной строки.
В отличие от многозадачности, многопоточность позволяет разным потокам исполнения работать в едином адресном пространстве процесса. И в большинстве случаев многопоточность является более удобной схемой для создания приложений, которые обрабатывают множественные клиентские запросы. При программировании кроссплатформенных многопоточных приложений программисты сталкиваются не только с различиями в семантике потоковых API, но и с различными идеологиями многопоточности. Например, в Linux потоки являются процессами особого типа – легкими (lightweight) процессами. Они создаются как дочерние процессы главного процесса. В Windows мы имеем главный процесс, который является неким контейнером для потоков программы. В ACE потоки создаются и управляются при помощи класса ACE_Thread_Manager. Аналогично вышеописанному менеджеру процессов, этим классом можно пользоваться как синглтоном или создавать несколько экземпляров менеджера потоков для определения различных групп потоков в рамках одного процесса. Напишем небольшую программу, которая создает три бесконечно выполняющихся потока, и закрывает их после некоторой паузы.
Одним из параметров метода spawn() менеджера потоков является приоритет потока, и по умолчанию это значение равно ACE_DEFAULT_THREAD_PRIORITY. С помощью класса ACE_Sched_Params можно выбирать нужный приоритет исполнения и устанавливать его посредством функции sched_params из пространства имен ACE_OS. Базовый приоритет выполнения для всех потоков программы можно установить в самом начале ее работы в функции main(). Таким образом, все вновь создаваемые потоки будут выполняться с этим приоритетом. Нижеследующий код демонстрирует, как можно установить приоритет выполнения.
Не секрет, что при создании многопоточных программ разработчики сталкиваются с проблемой одновременного обращения к одним и тем же данным из разных потоков. Например, одновременно происходят попытки записать и прочитать значение одной и той же переменной. Для того чтобы избежать подобных ситуаций, применяются механизмы синхронизации, которые позволяют заблокировать выполнение потока до тех пор, пока захвативший данный ресурс поток не освободит его. Существует несколько стандартизированных (и стандартизированных де-факто как, например, Win32) API, предоставляющих механизмы синхронизации потоков. ACE предоставляет набор унифицированных классов, реализующих всю необходимую разработчикам функциональность.
Все классы синхронизации предоставляют унифицированный интерфейс ACE_LOCK. Напишем программу, основной поток которой изменяет значение глобальной переменной, а вспомогательный выводит ее значение в консоль, когда она изменяется. Фактически мы смоделируем один из самых распространенных случаев, когда требуется синхронизировать работу потоков.
1. Где применяется ACE и архитектура ACE;
2. Уровень адаптации к операционной системе и параллелизм;
3.
Уровень интерфейсных фасадов
Описанные выше механизмы параллелизма реализованы в ACE в виде интерфейсных фасадов. Таким образом, мы поднимаемся с уровня адаптации к операционной системе на уровень фасадов ACE, инкапсулирующих API операционной системы. Возникает вопрос: а для чего требуется использовать фасады ACE, не проще ли нам использовать API операционной системы и сокеты для реализации конкретной задачи? Ответ лежит на поверхности – во-первых, мы создаем легко переносимый код, который можно будет использовать повторно, а, во-вторых, вместо небезопасных с точки зрения типо-безопасности дескрипторов функций API, мы используем инкапсулирующие их фасады ACE.
Многозадачность
Как известно, создание новой задачи осуществляется в POSIX-совместимых операционных системах с помощью функции fork() (в Windows это можно сделать с помощью функции CreateProcess()). Вместо этого ACE предлагает класс ACE_Process, при помощи которого можно создавать процессы и управлять ими. С помощью класса ACE_Process_Option программист может задавать опции процесса, такие как командная строка, переменные окружения, рабочий каталог и им подобные. Создадим утилиту, запускающую программу, имя которой указано в качестве аргумента командной строки.
#include
#include
int main(int argc, char* argv[])
{
if( argcspawn( &process1, options );
//Дожидаемся завершения всех процессов, ассоциированных с группой
return pPM->wait(ACE_Time_Value::max_time);
}
#include
int main(int argc, char* argv[])
{
if( argcspawn( &process1, options );
//Дожидаемся завершения всех процессов, ассоциированных с группой
return pPM->wait(ACE_Time_Value::max_time);
}
Многопоточность
В отличие от многозадачности, многопоточность позволяет разным потокам исполнения работать в едином адресном пространстве процесса. И в большинстве случаев многопоточность является более удобной схемой для создания приложений, которые обрабатывают множественные клиентские запросы. При программировании кроссплатформенных многопоточных приложений программисты сталкиваются не только с различиями в семантике потоковых API, но и с различными идеологиями многопоточности. Например, в Linux потоки являются процессами особого типа – легкими (lightweight) процессами. Они создаются как дочерние процессы главного процесса. В Windows мы имеем главный процесс, который является неким контейнером для потоков программы. В ACE потоки создаются и управляются при помощи класса ACE_Thread_Manager. Аналогично вышеописанному менеджеру процессов, этим классом можно пользоваться как синглтоном или создавать несколько экземпляров менеджера потоков для определения различных групп потоков в рамках одного процесса. Напишем небольшую программу, которая создает три бесконечно выполняющихся потока, и закрывает их после некоторой паузы.
#include
#include
unsigned long thread( void * p )
{
//Имитируем работу потока
ACE_OS::printf( "Hi there. I am #%i threadn", p);
//По идее мы должны «заснуть» на бесконечное время
//Но вызов cancel_all() в конце main() закрывает все потоки
ACE_OS::sleep( ACE_Time_Value::max_time );
return 0;
}
int main(int argc, char* argv[])
{
//Создаем три потока
ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();
pTM->spawn( thread, (void*)0, THR_SCOPE_SYSTEM);
pTM ->spawn( thread, (void*)1, THR_SCOPE_SYSTEM);
pTM ->spawn( thread, (void*)2, THR_SCOPE_SYSTEM);
//Имитируем некую работу основного потока
ACE_OS::sleep(1);
//Завершаем все потоки
pTM->cancel_all();
//Дожидаемся завершения всех потоков
return pTM->wait( &ACE_Time_Value::max_time );
}
#include
unsigned long thread( void * p )
{
//Имитируем работу потока
ACE_OS::printf( "Hi there. I am #%i threadn", p);
//По идее мы должны «заснуть» на бесконечное время
//Но вызов cancel_all() в конце main() закрывает все потоки
ACE_OS::sleep( ACE_Time_Value::max_time );
return 0;
}
int main(int argc, char* argv[])
{
//Создаем три потока
ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();
pTM->spawn( thread, (void*)0, THR_SCOPE_SYSTEM);
pTM ->spawn( thread, (void*)1, THR_SCOPE_SYSTEM);
pTM ->spawn( thread, (void*)2, THR_SCOPE_SYSTEM);
//Имитируем некую работу основного потока
ACE_OS::sleep(1);
//Завершаем все потоки
pTM->cancel_all();
//Дожидаемся завершения всех потоков
return pTM->wait( &ACE_Time_Value::max_time );
}
Одним из параметров метода spawn() менеджера потоков является приоритет потока, и по умолчанию это значение равно ACE_DEFAULT_THREAD_PRIORITY. С помощью класса ACE_Sched_Params можно выбирать нужный приоритет исполнения и устанавливать его посредством функции sched_params из пространства имен ACE_OS. Базовый приоритет выполнения для всех потоков программы можно установить в самом начале ее работы в функции main(). Таким образом, все вновь создаваемые потоки будут выполняться с этим приоритетом. Нижеследующий код демонстрирует, как можно установить приоритет выполнения.
#include
#include
int main(int argc, char* argv[])
{
ACE_Sched_Params schedParams( ACE_SCHED_FIFO,
ACE_Sched_Params::priority_min(ACE_SCHED_FIFO),
ACE_SCOPE_PROCESS );
ACE_OS::sched_params( schedParams );
return 0;
}
#include
int main(int argc, char* argv[])
{
ACE_Sched_Params schedParams( ACE_SCHED_FIFO,
ACE_Sched_Params::priority_min(ACE_SCHED_FIFO),
ACE_SCOPE_PROCESS );
ACE_OS::sched_params( schedParams );
return 0;
}
Синхронизация
Не секрет, что при создании многопоточных программ разработчики сталкиваются с проблемой одновременного обращения к одним и тем же данным из разных потоков. Например, одновременно происходят попытки записать и прочитать значение одной и той же переменной. Для того чтобы избежать подобных ситуаций, применяются механизмы синхронизации, которые позволяют заблокировать выполнение потока до тех пор, пока захвативший данный ресурс поток не освободит его. Существует несколько стандартизированных (и стандартизированных де-факто как, например, Win32) API, предоставляющих механизмы синхронизации потоков. ACE предоставляет набор унифицированных классов, реализующих всю необходимую разработчикам функциональность.
Все классы синхронизации предоставляют унифицированный интерфейс ACE_LOCK. Напишем программу, основной поток которой изменяет значение глобальной переменной, а вспомогательный выводит ее значение в консоль, когда она изменяется. Фактически мы смоделируем один из самых распространенных случаев, когда требуется синхронизировать работу потоков.
#include
#include
#include
//Глобальный счетчик
static int gCounter(0);
//Мьютекс
static ACE_Thread_Mutex gMutex;
unsigned long thread( void * p )
{
int oldValue(-1);
while(1)
{
//Запрашиваем блокировку
if( gMutex.acquire_read()!=-1 )
{
//Если значение счетчика было инкрементировано, выводим его
if( gCounter>oldValue )
{
ACE_OS::printf( "Counter is %in", gCounter);
oldValue = gCounter;
}
}
//Снимаем блокировку
gMutex.release();
}
return 0;
}
int main(int argc, char* argv[])
{
ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();
//Создаем три потока
int id;
id = pTM->spawn( thread, (void*)0, THR_SCOPE_SYSTEM);
if( id==-1 )
{
ACE_OS::printf( "Unable to create threadn");
return -1;
}
int i(10);
while(i)
{
//Запрашиваем блокировку
if( gMutex.acquire_write()!=-1 )
{
++gCounter;
}
//Снимаем блокировку
gMutex.release();
--i;
//Имитируем работу программы
ACE_OS::sleep(1);
}
//Завершаем все потоки
pTM ->cancel_all();
//Дожидаемся завершения всех потоков
return pTM ->wait( &ACE_Time_Value::max_time );
}
#include
#include
//Глобальный счетчик
static int gCounter(0);
//Мьютекс
static ACE_Thread_Mutex gMutex;
unsigned long thread( void * p )
{
int oldValue(-1);
while(1)
{
//Запрашиваем блокировку
if( gMutex.acquire_read()!=-1 )
{
//Если значение счетчика было инкрементировано, выводим его
if( gCounter>oldValue )
{
ACE_OS::printf( "Counter is %in", gCounter);
oldValue = gCounter;
}
}
//Снимаем блокировку
gMutex.release();
}
return 0;
}
int main(int argc, char* argv[])
{
ACE_Thread_Manager * pTM = ACE_Thread_Manager::instance();
//Создаем три потока
int id;
id = pTM->spawn( thread, (void*)0, THR_SCOPE_SYSTEM);
if( id==-1 )
{
ACE_OS::printf( "Unable to create threadn");
return -1;
}
int i(10);
while(i)
{
//Запрашиваем блокировку
if( gMutex.acquire_write()!=-1 )
{
++gCounter;
}
//Снимаем блокировку
gMutex.release();
--i;
//Имитируем работу программы
ACE_OS::sleep(1);
}
//Завершаем все потоки
pTM ->cancel_all();
//Дожидаемся завершения всех потоков
return pTM ->wait( &ACE_Time_Value::max_time );
}