
I have a Django REST Framework API endpoint that accepts a .json file, does some computation on the data to create some "profiles", runs error and validation checks, and returns a failure response if something went wrong. The json file can be very large, which slows down the process. I can't use Django's bulk_create because the model is an inherited model, and bulk_create doesn't work in that case.
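
(To be concrete about the limitation, I mean the multi-table inheritance case, which is where bulk_create refuses to work. A stripped-down illustration with hypothetical models, not my real ones:)

# Hypothetical models, just to illustrate the limitation (not my real models).
from django.db import models

class BaseProfile(models.Model):
    name = models.CharField(max_length=100)

class Profile(BaseProfile):  # multi-table inheritance: Profile gets its own table
    score = models.IntegerField(default=0)

# Raises ValueError, because bulk_create does not support
# multi-table inherited models:
# Profile.objects.bulk_create([Profile(name="a"), Profile(name="b")])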

So I am trying to use Python's multiprocessing (correct me if this is not the best approach) to create the model instances and then save them in batches. It goes like this:

  • Viewset: create() calls the utils function to loop through the objects in the json file
  • For each object, another function, actually_create, is called, where the model instance is created
  • These instances are collected in a list (a batch)
  • The batches are saved inside a transaction block

Even though I have used the same approach for other use cases, I can't get it to work here, and I don't know why: the traceback is not helpful, and I can't set breakpoints when using multiprocessing.
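
For debugging, the only fallback I can think of is to temporarily bypass the pool and run everything in-process, roughly like this (a throwaway sketch, not the real code):

# Debug-only sketch: use a plain loop instead of pool.imap_unordered so
# that breakpoints / pdb actually stop inside actually_create.
def create_profiles_debug(data):
    profiles = [actually_create(profile) for profile in data]
    save_batch(profiles)
    return profiles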

Traceback:

Traceback (most recent call last):
  File "/home/everton/.virtualenvs/venv/lib/python3.8/site-packages/django/db/backends/base/base.py", line 235, in _cursor
    return self._prepare_cursor(self.create_cursor(name))
  File "/home/everton/.virtualenvs/venv/lib/python3.8/site-packages/django/db/backends/postgresql/base.py", line 223, in create_cursor
    cursor = self.connection.cursor()
psycopg2.InterfaceError: connection already closed

Would Celery be an alternative for this? I don't have experience with it, but I have read about use cases that seem similar to this one.
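
If Celery is the better fit, I imagine the task would look roughly like this (an untested sketch; create_profiles_task is a made-up name that reuses the same helpers from utils.py below):

# Hypothetical Celery task: the view would call create_profiles_task.delay(data)
# and return 202 Accepted instead of waiting for the result.
from celery import shared_task

@shared_task
def create_profiles_task(data):
    profiles = [actually_create(profile) for profile in data]
    save_batch(profiles)
    # Model instances aren't JSON-serializable as a task result, so return a count.
    return len(profiles)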

views.py

from rest_framework import status, viewsets
from rest_framework.response import Response

from .utils import create_profiles


class CustomViewSet(viewsets.ViewSet):
    def create(self, request, *args, **kwargs):
        # ... (parse the uploaded .json file into `data`)
        try:
            profiles = create_profiles(data)
            serializer = self.serializer_class(profiles, many=True)
        except Exception as e:
            # Any creation/validation error is returned as a 400 with the message.
            return Response(str(e), status=status.HTTP_400_BAD_REQUEST)

        return Response(serializer.data, status=status.HTTP_200_OK)

utils.py

from multiprocessing import cpu_count, Pool

from django.db import connection, transaction

from .models import MyModel  # adjust to the actual import path of the model

def save_batch(batch):
    # Save one batch of unsaved instances inside a single transaction.
    with transaction.atomic():
        for obj in batch:
            obj.save()
    return batch

def worker_init():
    # Each worker closes the connection it inherited from the parent process,
    # so Django opens a fresh one for that process.
    connection.close()

def create_profiles(data):
    default_batch_size = 1000
    created_profiles = []

    with Pool(processes=cpu_count(), initializer=worker_init) as pool:
        profiles_to_create = []
        # The workers only instantiate the models; saving happens in the parent.
        for profile in pool.imap_unordered(actually_create, data):
            profiles_to_create.append(profile)

            if len(profiles_to_create) == default_batch_size:
                created_profiles.extend(save_batch(profiles_to_create))
                profiles_to_create = []

        # Save the final, partially filled batch.
        if profiles_to_create:
            created_profiles.extend(save_batch(profiles_to_create))

    return created_profiles

def actually_create(profile):
    # ... (validation / computation on the raw profile dict)
    return MyModel(**profile)
  • Take a look at this question: https://stackoverflow.com/questions/8242837/django-multiprocessing-and-database-connections. Though I don't think it's a good idea in general to spawn processes for every request. Instead I'd try to manually prepare bulk insert statements for every table in the hierarchy – awesoon Feb 07 '22 at 11:45

0 Answers