пятница, 16 декабря 2011 г.

Python для параллельных вычислений

XXI век давно уже наступил и параллельные вычисления перестали быть чем-то сложным и недостижимым. Многие платформы и языки имеют средства для реализации распараллеливания и замечательный язык Python не является исключением.
Python предоставляет разнообразные инструменты для реализации многозадачности: многопоточную, многопроцессную и т.н. сопрограммы. Не хотелось бы вдаваться в глубокую теорию и описывать отличия и особенности многопоточной и многопроцессной многозадачностей - всё это есть в книгах, которые без труда находятся в гугле или яндексе. Вкратце стоит лишь отметить, что для математических вычислений лучше всего подходит именно многопроцессная многозадачность, которая реализована в Python благодаря модулю multiprocessing.

Перейдём к практике. Допустим есть такая нехитрая задача:

Найти минимальное натуральное число с суммой цифр 80, которое делится на 1237.

(Это несколько упрощённый аналог задачи с замечательного сайта Диофант)

Наверняка, такую задачу можно решить аналитически, обладая хорошим математическим образованием. Но нас интересует, как решить её с помощью компьютера и Python.
Если применить простой последовательный перебор в одном цикле, решаться она будет долго. Имея многоядерный процессор такую задачу можно легко распараллелить и решать тоже перебором, но параллельно ;).
Поскольку предугадать на каком интервале находится искомое число трудно - поступим следующим образом: создадим функцию поиска, принимающую на входе интервал чисел и выдающую результат. Будем запускать параллельно несколько процессов с этой функцией с последовательными и относительно небольшими интервалами и проверять, найдено ли решение.

Итак, исходный код (использовался Python 2.7.2):

#!/usr/bin/env python/
# -*- coding: utf-8 -*-

import multiprocessing
import time

# очередь для сбора результатов
result_queue = multiprocessing.Queue()

# блокировка
output_lock = multiprocessing.Lock()

def sumdig(v):
    """ Возвращает сумму цифр числа v.
    """
    Sum = 0
    remain = v
    while remain > 10:
        Sum += remain % 10
        remain = remain // 10
    Sum += remain
    return Sum

def worker(num, n1,n2, qres):
    """
    Поиск числа с суммой цифр =80 в заданном диапазоне [n1, n2].
   
    num - номер воркера. Сугубо для того, чтобы отличать их друг от друга
    n1, n2 - диапазон значений
    qres - объект Queue, очередь для накопления результатов
    """
    t1 = time.time()
    x = n1
    res = 0
    while x <= n2:
        sm = sumdig(x)
        if sm == 80:
            res = x
            break
        x += 1237
    t2 = time.time()   
   
    # запросим блокировку для вывода на экран результатов работы воркера
    output_lock.acquire()
   
    print 'worker #%d Res=%d  time: %0.4f ' % (num, res, float(t2-t1)),
    print ' n1=%d, n2=%d' % (n1,n2)

    # отпустим блокировку
    output_lock.release()
   
    if res > 0:
        qres.put_nowait(res)


if __name__ == '__main__':

    # число воркеров принимаем равным числу ядер
    worker_count = multiprocessing.cpu_count()

    start_value = 0
    M = 100000
    while True:
        jobs = []
        for i in xrange(worker_count):
            p = multiprocessing.Process(target=worker,
                                        args=(i, start_value + 1237*M*i + 1237,
                                                 start_value + 1237*M*(i+1),
                                                 result_queue))
            jobs.append(p)
            p.start()

        # ожидаем завершения работы всех воркеров
        for w in jobs:
            w.join()

        # начальное значение для следующей итерации
        start_value = start_value + 1237*M*(worker_count)
        print "start_value=", start_value


        if result_queue.empty():
            print "next iteration..."
        else:
            print "--- THE END ---"
            break

    # из очереди выберем все ответы и найдём самый минимальный
    res = []
    while not result_queue.empty():
        res.append(result_queue.get())

    print 'RESULT=', min(res) 

Некоторые замечания по исходному коду:

1) Каким выбрать число воркеров - заранее предсказать сложно. При малом его количестве - задача будет решаться медленнее. При большом - можно здорово подгрузить свою систему, что повлечет замедление работы не только воркеров, но и прочих процессов - так, например, при тестировании этого примера на 6 и более воркерах (четырёхядерный процессор i3, Windows7 и было ещё запущено много всякого типа Aptana Studio, Bat! и 4 окна Firefox c кучей вкладок) я наблюдал такие эффекты - с заметными задержками открывались программы и перетаскивались окна. Диспетчер задач показывал почти 100% загрузку всех ядер процессора.
    Практика показала, что макс. число воркеров должно выбираться эмпирически, с учётом загрузки процессора и среднего времени работы воркеров (в нашем случае, интервалы поиска чисел не следует задавать слишком длинными) и относительно оптимальным значением явлется число равное числу ядер процессора.

2) Для нормального отображения хода работы, а именно чтобы сообщения не накладывались друг на друга, искажая смысл - нужно блокировать вывод на консоль. Пожалуй, тема блокировок стоит того, чтобы рассмотреть её отдельно.

3) В данном примере, из соображений упрощения, для хранения воркеров задействован обычный список (jobs). Хотя на самом деле в модуле multiprocessing есть реализация пула процессов, обладающая широким набором функций: multiprocessing.Pool().

Как видно из примера - многозадачность, основанная на механизме многопроцессности, не такая уж сложная и страшная штука, а Python вполне себе годен для решения математических задач путём распараллеливания.

1 комментарий:

  1. В функции sumdig(v)стоит сделать небольшую поправку

    def sumdig(v):
    """ Возвращает сумму цифр числа v.
    """
    Sum = 0
    remain = v
    while remain > 9:
    Sum += remain % 10
    remain = remain // 10
    Sum += remain
    return Sum

    ОтветитьУдалить