A main difference between threads and processes is that threads share memory space, while each process has its own. Besides what @mementum points out in his answer (passing the lock as an argument to each process), if you print the memory address of outfile in each process you will see that they are different objects living in different memory spaces. Because each process opens the file in 'w' mode, each one truncates and rewrites it, so at the end the file only contains the output of the last process to finish. One solution is to open the file in append mode:
outfile = open(salida, "a")  # create the file that will hold the cross-referenced output
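To see why 'w' mode loses data, note that opening a file in 'w' truncates it on every open(). A minimal sketch (the file name demo.txt is just for illustration):

```python
# 'w' truncates the file on every open(), so each process that reopens
# the file in 'w' mode wipes out whatever was written before.
with open('demo.txt', 'w') as f:
    f.write('first\n')
with open('demo.txt', 'w') as f:  # truncates: 'first' is gone
    f.write('second\n')
with open('demo.txt', 'a') as f:  # 'a' appends instead of truncating
    f.write('third\n')
print(open('demo.txt').read())  # second\nthird\n
```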
Alternatively, you can pass the file path to each process and open the file inside each one. This is not efficient, but it can serve if each process only opens the file a few times. The question is, why not simply open the file once and pass the object as an argument to each subprocess, as we do with the lock? It turns out that multiprocessing uses pickle to serialize the arguments, and io.TextIOWrapper objects are not picklable.
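You can verify this yourself: trying to pickle an open file object raises a TypeError. A minimal sketch (the temporary file path is just for the demonstration):

```python
import io
import os
import pickle
import tempfile

# multiprocessing serializes the arguments of each Process with pickle,
# and an open file handle is an io.TextIOWrapper, which pickle rejects.
path = os.path.join(tempfile.gettempdir(), 'pickle_demo.txt')
outfile = open(path, 'w')
print(type(outfile))  # <class '_io.TextIOWrapper'>

try:
    pickle.dumps(outfile)
    picklable = True
except TypeError:
    picklable = False  # this is what actually happens

print('picklable:', picklable)
outfile.close()
os.remove(path)
```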
However, I am not fond of these implementations when multiple processes need to write to the same file. There is another approach, following the famous 'divide et impera', that requires no locks, avoids headaches, and is, I think, safer than granting every process write access to the file.
The idea is simple: create a process that is responsible for writing the output, and let only that process write to the file. The other processes do the actual work and communicate with the writer process through a queue: each one reads from the source file, processes the data, and puts its output on the write queue. The writer process takes items from the queue and saves them to the output file. This way you separate the writing from the data processing: the program still uses multiple processes, but the writing is not concurrent, so you avoid many problems.
The remaining problem is that the writer process needs to know when the others have finished so that it can finish too. We must communicate state between processes; this can be done in many ways (events, shared memory), or as simply as putting an agreed sentinel on the queue when a worker ends, so the writer knows that worker has finished:

from multiprocessing import Process, Queue, current_process
from time import sleep
from random import randint

def f(datos, cola):
    try:
        for n in datos:
            cola.put('{}x2={} --- Processed by {}\n'.format(n, n * 2, current_process().name))
            sleep(randint(1, 100) / 100)  # simulate a long-running operation
    finally:
        cola.put('#end')  # sentinel: this worker is done

def write(ruta, cola, procesos):
    with open(ruta, 'w') as f:
        while procesos:
            data = cola.get()
            if data == '#end':
                procesos -= 1  # one fewer worker still running
            else:
                f.write(data)

if __name__ == '__main__':
    datos = [0, 25, 145, 98, 25518, 6868, 15156, 85, 2, 89879, 64]
    cola_escritura = Queue()
    output_path = 'salida.txt'
    workers = [Process(name='Process_{}'.format(i), target=f, args=(datos, cola_escritura))
               for i in range(4)]
    writer = Process(target=write, args=(output_path, cola_escritura, len(workers)))
    writer.start()
    for w in workers:
        w.start()
    writer.join()
    for w in workers:
        w.join()
The example is deliberately trivial; it is just to show the idea. We start 4 processes that do the 'hard work' and 1 that is responsible for the writing. Each worker receives a list of numbers, processes them by multiplying each by 2, and sends the result of the operation, together with the name of the process that performed it, to the queue. To simulate long processing operations, sleep is used with a random waiting time.
All processes share a write queue on which they put the output to be written to the file; this is handled by the writer process. When a worker finishes, it puts '#end' on the queue; this is how writer knows to keep waiting for new data until all workers have finished.
An output file would look something like this:
0x2=0 --- Processed by Process_2
0x2=0 --- Processed by Process_0
0x2=0 --- Processed by Process_3
0x2=0 --- Processed by Process_1
25x2=50 --- Processed by Process_2
25x2=50 --- Processed by Process_3
145x2=290 --- Processed by Process_2
25x2=50 --- Processed by Process_0
25x2=50 --- Processed by Process_1
98x2=196 --- Processed by Process_2
145x2=290 --- Processed by Process_3
145x2=290 --- Processed by Process_0
25518x2=51036 --- Processed by Process_2
...