Отправляет email-рассылки с помощью сервиса Sendsay

Статьи по Visual C++

  Все выпуски  

Многопоточное программирование. Часть 3. (Синхронизация потоков)


Домашняя страница www.devdoc.ru

DevDoc - это новые статьи по программированию каждую неделю.

Заходи и читай!

Домашняя страница Письмо автору Архив рассылки Публикация статьи

Здравствуйте уважаемые подписчики, сегодня в номере:

  • Результаты опроса
  • Статья "Многопоточное программирование. Часть 3. (Синхронизация потоков)"

Результаты опроса

Опросы постоянно проводятся на сайте www.devdoc.ru.

Результаты опроса
Какую среду разработки вы используете?

1Microsoft Visual Studio 6.0
12% ( 34 )
 
2Microsoft Visual Studio 7.0+
32% ( 87 )
 
3Borland Builder
10% ( 28 )
 
4GCC, бесплатные компиляторы
37% ( 101 )
 
5Eclipse
5% ( 13 )
 
6Другое
4% ( 11 )
 

Всего голосов: 274
Последний голос отдан: Воскресенье - 22 Июня 2008 - 23:48:13


Постоянная ссылка на статью (с картинками): http://www.devdoc.ru/index.php/content/view/multi_thread_3.htm

Автор: Кудинов Александр
Последняя модификация: 2008-06-23 10:04:55

Многопоточное программирование. Часть 3. (Синхронизация потоков)

Синхронизация потоков

В литературе и в Интернете часто пишут, что разработка многопоточных приложений намного сложнее, чем однопоточных. Основная сложность – это синхронизация выполнения потоков как на этапе разработки, так и на этапе сопровождения кода. В C++ самые сложные ошибки, как привило, связаны с некорректным использованием памяти и ошибками синхронизации потоков.
Рассмотрим программу:

volatile bool bReadyForProcessing = false;
volatile bool bTerminate = false;
int  iResult = 0;
 
 
DWORD WINAPI ThreadProc(PVOID pPararn)
{
 while(!bTerminate)
 {
  if(bReadyForProcessing)
  {
   iResult = iResult * 100;
   bReadyForProcessing = false;
  }else
  {
   Sleep(1);
  }
 }
 return 0;
}
 
int _tmain(int argc, _TCHAR* argv[])
{
 //запускаем поток
 DWORD dwID;
 HANDLE hThread = CreateThread(NULL, 0, ThreadProc, NULL, 0, &dwID);
 CloseHandle(hThread);    //Нам не нужен дескриптор
 
 for(int i = 0; i < 1000; i++)
 {
  iResult = 100;
  bReadyForProcessing = true;   //указываем потоку, что есть данные для обработки
  while(bReadyForProcessing)
   Sleep(1);      //Ожидаем завершения обработки
 
  if(10000 != iResult)
  {
   std::cout << "error" << std::endl;
  }
  std::cout << iResult << std::endl; //печатаем результат
 }
 
 bTerminate = true;     //завершить поток
 
 return 0;
}

Пример конечно надуманный, но показывает один из способов, как поток может обрабатывать данные. Главный поток программы готовит данные и устанавливает некий признак готовности. После чего он дожидается обработки данных. Причем он может в это время выполнять какие-либо другие действия. Обратите внимание на вызов Sleep(). Он необходим, чтобы «бесконечные» циклы в программе не занимали 100% процессорного времени. Этой командой мы разрешаем выполнить переключение контекста потока. Если в системе есть потоки, ожидающие выполнения – ОС запустит следующей поток из очереди.

В приведенной программе выполняется постоянный опрос переменных, чтобы определить готовность данных. Это вообще говоря плохая техника. Если есть возможность, надо стараться пользоваться другими средствами. Ниже вы увидите, как можно переписать эту программу.

Также обратите внимание на ключевое слово volatile, которое используется при объявлении флаговых переменных. Это нужно для того, чтобы компилятор не кэшировал переменные в регистрах. Это необходимо потому, что эти переменные могут быть изменены извне.

Данная программа содержит несколько ошибок, несмотря на то, что на вашем компьютере она скорее всего будет работать. Проблема в синхронизации потоков. Надо всегда помнить, что одна строка на C++ превращается в несколько команд ассемблера. Это значит, что переключение потока может произойти даже в середине строки. И, скажем, в то время, как один поток может писать в память, другой может попытаться записать туда же совсем другое значение. Результат будет непредсказуемый.

Эта программа была специально так написана, чтобы создавать иллюзию стабильности. Однако даже безобидные на первый взгляд модификации в коде могут вызвать фантастические ошибки.

Надо взять себе за правило синхронизировать доступ к объектам и переменным из разных потоков. Даже если (как в этом примере) это кажется излишним. Операционная система для этих целей предоставляет так называемые объекты синхронизации. Это Mutex, Event, Critical section, Waitable timer (ожидающие таймеры) и Semaphore. Каждый из них предоставляет уникальные свойства, однако у них есть и общая черта. Каждый из этих объектов может находиться в захваченном (non-signaled) состоянии. И при попытке повторного захвата ОС остановит поток до тех пор, пока объект синхронизации не перейдет во взведенное (signaled) состояние. Таким образом гарантируется, что в один момент времени только один поток будет обрабатывать данные. Помимо этого есть еще InterlockedXXX функции, которые служат для выполнения атомарных операций над переменными.

На заметку:
Для ознакомления с объектами синхронизации рекомендуется посмотреть описание следующих функций в MSDN:
WaitForSingleObject, WaitForMultipleObjects, MsgWaitForMultipleObjects
CreateSemaphore/OpenSemaphore/ReleaseSemaphore
CreateEvent/SetEvent/ResetEvent
CreateMutex/OpenMutex/ReleaseMutex
InitializeCriticalSection/EnterCriticalSection/LeaveCriticalSection
CreateWaitableTimer/OpenWaitableTirrer/SetWaitableTimer

WaitXXX, помимо ожидания объекта синхронизации, могут его изменять. Так, например, для Mutex произойдет новый захват объекта и будет установлен новый поток-владелец.

Давайте перепишем наш пример с правильной синхронизацией.

HANDLE   hReadyForProcessing;
HANDLE   hDataReady;
HANDLE   hTerminate;
int    iResult = 0;
 
 
DWORD WINAPI ThreadProc(PVOID pPararn)
{
 HANDLE hEvents[2] = {hReadyForProcessing, hTerminate};
 DWORD dwRes;
 while(WAIT_OBJECT_0 + 0 == (dwRes = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE)) )
 {
  iResult = iResult * 100;
  SetEvent(hDataReady);
 }
 if(WAIT_OBJECT_0 + 1 != dwRes) //Условие завершения
 {
  return 2;     //Что то не так с вызовом WaitForMultipleObjects
 }
 return 0;
}
 
int _tmain(int argc, _TCHAR* argv[])
{
 //Создаем объекты синхронизации
 hReadyForProcessing = CreateEvent(NULL, FALSE, FALSE, NULL);
 hDataReady = CreateEvent(NULL, FALSE, FALSE, NULL);
 hTerminate = CreateEvent(NULL, FALSE, FALSE, NULL);
 
 //запускаем поток
 DWORD dwID;
 HANDLE hThread = CreateThread(NULL, 0, ThreadProc, NULL, 0, &dwID);
 
 for(int i = 0; i < 1000; i++)
 {
  iResult = 100;
  SetEvent(hReadyForProcessing);    //указываем потоку, что есть данные для обработки
  WaitForSingleObject(hDataReady, INFINITE);
 
  if(10000 != iResult)
  {
   std::cout << "error" << std::endl;
  }
  std::cout << iResult << std::endl; //печатаем результат
 }
 
 SetEvent(hTerminate);     //запрос на завершение потока
 if(WAIT_OBJECT_0 != WaitForSingleObject(hThread, 5000))
 {
  TerminateThread(hThread, 3);
  std::cout << "Error in thread" << std::endl;
 }else
 {
  CloseHandle(hThread);
 }
 CloseHandle(hDataReady);
 CloseHandle(hReadyForProcessing);
 CloseHandle(hTerminate);
 
 return 0;
}

Как видите, мы, во-первых, избавились от вызова Sleep. Это существенно, т.к. теперь поток не будет ждать не миллисекундой больше, чем надо для начала обработки. Задержка будет вызвана только скоростью переключения потоков и их приоритетом. Кроме того, мы получили настоящую атомарность при изменении флагов. ОС гарантирует, что события (Event) меняются атомарно.

Ну и напоследок рассмотрим еще один пример с использованием Mutex.

HANDLE   hMutex;
std::list<int> lstQueue;
 
 
DWORD WINAPI ThreadProc(PVOID pParam)
{
 int val = *reinterpret_cast<int*>(pParam);
 for(int i =0; i < 100; i++)
 {
  WaitForSingleObject(hMutex, INFINITE);
  lstQueue.push_back(val);
  ReleaseMutex(hMutex);
  Sleep(1);        //Просто задержка для наглядности работы
 }
 return 0;
}
 
int _tmain(int argc, _TCHAR* argv[])
{
 //Создаем объекты синхронизации
 hMutex = CreateMutex(NULL, FALSE, NULL);
 
 //запускаем поток
 DWORD dwID;
 int   iVal1 = 1, iVal2 = 2;
 
 //Начинаем заполнять очередь.
 HANDLE hThread1 = CreateThread(NULL, 0, ThreadProc, &iVal1, 0, &dwID);
 HANDLE hThread2 = CreateThread(NULL, 0, ThreadProc, &iVal2, 0, &dwID);
 
 //Ждем завершеня потоков
 HANDLE hEvents[2] = {hThread1, hThread2};
 WaitForMultipleObjects(2, hEvents, TRUE, INFINITE);
 
 //Печать результата
 for(std::list<int>::iterator it = lstQueue.begin(); it != lstQueue.end(); it++)
 {
  std::cout << *it << ", "; //печатаем результат
 }
 
 CloseHandle(hThread1);
 CloseHandle(hThread2);
 CloseHandle(hMutex);
 
 return 0;
}

В этом примере два потока заполняют очередь. Несмотря на то, что добавление нового элемента в список - это всего одна строка, это комплексная операция. Она содержит помимо всего прочего и операции распределения памяти. Поэтому синхронизация доступа к списку обязательна. Можете убрать синхронизацию и посмотреть, что из этого выйдет.

В следующей статье мы поговорим про взаимную блокировку.

Copyright (C) Kudinov Alexander, 2006-2007

Перепечатка и использование материалов запрещена без писменного разрешения автора.


В избранное