Multiprocessing with processes in Python


I need to write to the same file simultaneously from 4 processes. With threads I have no problem (even without locking the access, I assume because of the GIL), but with processes it is different: when a process reaches the lock it simply skips that part (I am guessing this from the output file, because several lines are missing). I hope I have explained myself well; I attach a large part of the code.

import re
from itertools import islice

# conteo, buscar, progreso, cuentas, lock and outfile are defined elsewhere in the full script

def cruzar(archivo, ini, fin, tam, pos, num):
    with open(archivo, "r") as infile1:
        ant = ''
        lin = 0
        con = 0
        lineas = conteo(archivo)
        progreso.pasos(1.0 / lineas)
        infile = islice(infile1, ini, fin, None)
        for linea in infile:
            linex = linea.replace("\n", "")
            fila = re.split('[;|, ]', linex)
            if len(fila) > tam:
                if ant != fila[pos]:
                    con = buscar(fila[pos], cuentas)
                    ant = fila[pos]
                if con == 1:
                    lock.acquire()  # This is where I lock out the other processes
                    try:
                        outfile.write(linea)
                    finally:
                        lock.release()  # but instead of waiting for the lock to be released, it simply skips those lines


def escribir(l, linea):
    global outfile
    outfile.write(linea)

The same code works for me using Threads; I only change the relevant statements and that's it. I have tried everything I could think of. Thank you very much for the help; I would like to know what I am doing wrong and why the difference.

    
asked by Michael Alexander Quinonez 24.03.2017 at 02:53

2 answers


A key difference between threads and processes is that threads share memory space while each process has its own memory space. Besides what @mementum mentions in his answer (pass the lock as an argument to each process), if you print the memory address of outfile in each process you will see that they are different objects, located in different memory spaces. Because the file is opened in 'w' mode, each process ends up rewriting the file, so at the end it only contains the output of the last process to finish. One solution is to open the file in append mode instead:

outfile = open(salida, "a")  # append mode: each process adds to the file that will contain the merged output

You can pass the file path to each process and open the file inside each one; this is not very efficient, but if each process only opens the file a few times it can work. The obvious question is: why not open the file once and pass the file object as an argument to each subprocess, just as we do with the lock? It turns out that multiprocessing uses pickle to serialize the arguments, and io.TextIOWrapper objects are not picklable.
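A minimal sketch of that idea, assuming made-up names (the worker function trabajador, the file salida.txt and the data are only for illustration): each worker receives the path and a shared lock, opens the file itself in append mode and writes under the lock.

from multiprocessing import Process, Lock

def trabajador(ruta, lock, datos):
    # Each process opens the file itself: the path is picklable, the file object is not
    for n in datos:
        linea = '{}\n'.format(n * 2)
        with lock:                      # serialize access between processes
            with open(ruta, 'a') as f:  # append mode, so no process truncates the file
                f.write(linea)

if __name__ == '__main__':
    lock = Lock()
    procesos = [Process(target=trabajador, args=('salida.txt', lock, range(10)))
                for _ in range(4)]
    for p in procesos:
        p.start()
    for p in procesos:
        p.join()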

However, I am not fond of this kind of implementation when several processes need to write to the same file. There is another approach, following the famous 'divide et impera', that does not require locks at all, avoids headaches, and in my opinion is safer than giving every process write access to the file.

The idea is simple: create one process that is in charge of writing the output, and only it can write to the file. The other processes do the actual work and communicate with the writer process through a queue. Each worker reads from the source file, processes the data and puts its output on the write queue. The process in charge of the output file takes the information from the queue and saves it to the file. This way you separate writing to the file from processing the data; the program still uses multiple processes, but the writing is not concurrent, so you avoid many problems.

The problem that arises is that the process in charge of writing the output needs to know whether the others have finished before it can finish itself. We have to communicate state between processes; this can be done in many ways, from events to shared memory, or as simply as putting an agreed sentinel on the queue when a worker ends, so the writer process knows that it has finished:

from multiprocessing import Process, Queue, current_process
from time import sleep
from random import randint

def f(datos, cola):
    try:
        for n in datos:
            cola.put('{}x2={} --- Procesado por {}\n'.format(n, n * 2, current_process().name))
            sleep(randint(1, 100) / 100)  # simulate a long-running task
    finally:
        cola.put('#end')  # sentinel: tell the writer that this worker has finished

def write(ruta, cola, procesos):
    with open(ruta, 'w') as f:
        while procesos:            # keep consuming until every worker has sent its sentinel
            data = cola.get()
            if data == '#end':
                procesos -= 1
            else:
                f.write(data)


if __name__ == '__main__':
    datos = [0,25,145,98,25518,6868,15156,85,2,89879,64,]
    cola_escritura = Queue()
    output_path = 'salida.txt'

    workers = [Process(name='Proceso_{}'.format(i), target=f, args=(datos, cola_escritura,)) for i in range(4)]
    writer = Process(target=write, args=(output_path, cola_escritura, len(workers),))

    writer.start()
    for w in workers:
        w.start()

    writer.join()
    for w in workers:
        w.join()

The example is trivial, it is just to show the idea. We start 4 processes that do the 'hard work' and 1 that is in charge of the writing. Each process receives a list of numbers, processes them by multiplying them by 2, and sends the result to the queue together with the name of the process that produced it. To simulate long processing operations, sleep is used with a random waiting time.

All the processes share a write queue onto which they put the output to be written to the file; this is handled by the writer process. When each worker ends it puts '#end' on the queue; this is used so that writer keeps waiting for new data until all the workers have finished.

The output file would look something like this:

  

0x2=0 --- Procesado por Proceso_2
0x2=0 --- Procesado por Proceso_0
0x2=0 --- Procesado por Proceso_3
0x2=0 --- Procesado por Proceso_1
25x2=50 --- Procesado por Proceso_2
25x2=50 --- Procesado por Proceso_3
145x2=290 --- Procesado por Proceso_2
25x2=50 --- Procesado por Proceso_0
25x2=50 --- Procesado por Proceso_1
98x2=196 --- Procesado por Proceso_2
145x2=290 --- Procesado por Proceso_3
145x2=290 --- Procesado por Proceso_0
25518x2=51036 --- Procesado por Proceso_2
...

    
answered by 26.03.2017 / 15:35

Since the question mentions several times that the code works with threading, but not with multiprocessing, let's assume:

  • The Lock is being created at module level

When threads run in that module, every thread has access to lock (which looks like a global-scope variable) and manipulates the same object.

In the case of multiprocessing, the module runs in different processes and each process ends up with its own global variable lock holding its own instance of Lock.

The solution:

  • The process that starts the other processes creates a lock = multiprocessing.Lock()

  • When each process is created, the lock is passed as an argument: p = multiprocessing.Process(target=xxx, args=(lock,))

It can then be used inside the xxx function. It can also be passed in kwargs and accessed by name, as in the sketch below.
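A minimal sketch of this, assuming a made-up worker body and file name (xxx here just writes one line to salida.txt); the point is only that the single Lock created in the parent is passed through args to every child:

import multiprocessing

def xxx(lock):
    with lock:  # same Lock object in every child, so the writes no longer race
        with open('salida.txt', 'a') as outfile:
            outfile.write('escrito por {}\n'.format(multiprocessing.current_process().name))

if __name__ == '__main__':
    lock = multiprocessing.Lock()   # created once, in the parent process
    procesos = [multiprocessing.Process(target=xxx, args=(lock,)) for _ in range(4)]
    for p in procesos:
        p.start()
    for p in procesos:
        p.join()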

    
answered by 25.03.2017 at 22:30