Como gerar processos filhos paralelas em um sistema multi-processador?

votos
39

Eu tenho um script Python que eu quero usar como um controlador para outro script Python. Eu tenho um servidor com 64 processadores, por isso quero gerar até 64 processos filhos desta segunda script Python. O script criança é chamado:

$ python create_graphs.py --name=NAME

onde NOME é algo como XYZ, ABC, NYU etc.

No meu script controlador pai eu recuperar a variável nome de uma lista:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

Então, minha pergunta é, qual é a melhor maneira de gerar off esses processos como crianças? Eu quero limitar o número de crianças a 64 de cada vez, por isso necessita de acompanhar o status (se o processo filho terminar ou não) para que eu possa eficientemente manter toda a geração de execução.

Eu olhei para usando o pacote subprocesso, mas rejeitou-a, porque ele só gera uma criança de cada vez. Eu finalmente encontrei o pacote multiprocessador, mas eu admito a ser oprimido por toda a threads vs. subprocessos documentação.

Neste momento, o meu script usa subprocess.callpara gerar apenas uma criança de cada vez e se parece com isso:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = [python, /path/to/create_graphs.py, --name=+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

Eu realmente quero que ele gere até 64 crianças de cada vez. Em outras perguntas stackoverflow eu vi pessoas usando fila, mas parece que cria um impacto no desempenho?

Publicado 19/05/2009 em 20:39
fonte usuário
Em outras línguas...                            


4 respostas

votos
57

O que você está procurando é a piscina processo de classe em multiprocessamento.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

E aqui é um exemplo de cálculo para torná-lo mais fácil de entender. A seguir irá dividir 10000 tarefas em processos de N, onde N é a contagem de cpu. Note que eu estou passando Nenhum como o número de processos. Isto fará com que a classe Pool para usar cpu_count para o número de processos ( referência )

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results
Respondeu 19/05/2009 em 21:26
fonte usuário

votos
2

Aqui está a solução que eu vim acima, com base em Nadia e comentários de Jim. Não tenho a certeza se é a melhor maneira, mas funciona. O script criança original que está sendo chamado precisa ser um shell script, porque eu preciso usar alguns 3rd partido apps incluindo Matlab. Então eu tive que tirá-lo de Python e codificá-lo em bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

Será que isso parece ser uma solução razoável? Eu tentei usar o formato de loop while de Jim, mas meu script apenas retornou nada. Eu não sou certo porque isso seria. Aqui é a saída quando eu executar o script com Jim do loop 'while' substituindo o 'para' loop:

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

Quando eu executá-lo com o 'para' loop, eu recebo algo mais significativo:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

Então isso funciona, e estou feliz. No entanto, eu ainda não entendo por que eu não posso usar loop 'while' estilo de Jim em vez do laço 'for' Eu estou usando. Obrigado por toda a ajuda - Estou impressionado com a amplitude do conhecimento @ stackoverflow.

Respondeu 17/06/2009 em 17:16
fonte usuário

votos
1

Eu não acho que você precisa de fila a menos que você pretende obter dados para fora das aplicações (que se você quer de dados, eu acho que pode ser mais fácil para adicioná-lo a um banco de dados de qualquer maneira)

mas tentar este sobre para o tamanho:

colocar o conteúdo de seu script create_graphs.py tudo em uma função chamada "create_graphs"

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

Eu sei que isso irá resultar em 1 menos fios do que os processadores, o que é provavelmente bom, deixa um processador de gerir os fios, disco I / O, e outras coisas que acontecem no computador. Se você decidir que quer usar o último núcleo basta adicionar um a ele

Editar : Eu acho que pode ter interpretado mal o propósito de my_list. Você não precisa de my_listse manter a par dos fios em tudo (como eles estão todos referenciados pelos itens na threadslista). Mas esta é uma boa maneira de alimentar a entrada de processos - ou ainda melhor: usar uma função de gerador;)

O objetivo do my_listethreads

my_listdetém os dados que você precisa para processar em sua função
threadsé apenas uma lista dos tópicos atualmente em execução

o loop while faz duas coisas, começar novos tópicos para processar os dados e verificar se algum tópicos são feitas em execução.

Então, enquanto você tiver qualquer um (a) mais dados para processar, ou (b) tópicos que não estão acabados em execução .... que deseja programar para continuar correndo. Uma vez que ambas as listas estão vazias que irá avaliar como Falseeo loop while vai sair

Respondeu 19/05/2009 em 21:04
fonte usuário

votos
1

Eu definitivamente utilizar multiprocessamento em vez de rolar minha própria solução usando subprocesso.

Respondeu 19/05/2009 em 21:04
fonte usuário

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more