I have a Django REST Framework API endpoint that accepts a .json file, does some computation on the data to create some "profiles", runs error and validation checks, and finally returns a failure response if something went wrong.
This JSON file can be very large, which slows the whole process down. I can't use Django's `bulk_create` because the model uses multi-table inheritance, and `bulk_create` doesn't support that.
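For reference, the structure is roughly like this (the base class and its fields are made-up names for illustration; only `MyModel` is real):

```python
from django.db import models

class BaseProfile(models.Model):
    # Concrete (non-abstract) base class => multi-table inheritance
    name = models.CharField(max_length=100)

class MyModel(BaseProfile):
    score = models.IntegerField(default=0)

# MyModel.objects.bulk_create([MyModel(name="a", score=1)]) raises:
# ValueError: Can't bulk create a multi-table inherited model
```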
So I am trying to use Python's multiprocessing (correct me if this is not the best approach) to create the model instances and then save them in batches. It goes like this:

- Viewset: `create()` calls the utils function to loop through the objects in the json file
- For each object, another function, `actually_create`, is called, where the model is instantiated
- These objects are saved in a list (a batch)
- The batches are saved inside a transaction block
Even though I have used the same approach for other use cases, I can't get this one to work, and I don't know why: the traceback is not helpful, and I can't set breakpoints inside the worker processes.
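A serial version that skips the pool would at least let me set breakpoints (this is just a debugging fallback I sketched, not the real code; it reuses `actually_create` and `save_batch` from utils.py below):

```python
# Debug-only variant: run the same pipeline synchronously in one process,
# so pdb breakpoints and full tracebacks behave normally.
def create_profiles_serial(data):
    profiles = [actually_create(profile) for profile in data]
    return save_batch(profiles)
```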
Traceback:
Traceback (most recent call last):
  File "/home/everton/.virtualenvs/venv/lib/python3.8/site-packages/django/db/backends/base/base.py", line 235, in _cursor
    return self._prepare_cursor(self.create_cursor(name))
  File "/home/everton/.virtualenvs/venv/lib/python3.8/site-packages/django/db/backends/postgresql/base.py", line 223, in create_cursor
    cursor = self.connection.cursor()
psycopg2.InterfaceError: connection already closed
Would Celery be an alternative here? I don't have experience with it, but I've read about use cases that seem similar to this one.
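From what I've read, the idea would be something like this sketch (untested; it assumes a configured Celery app, and `create_profiles_task` is a name I made up):

```python
from celery import shared_task

from .utils import actually_create, save_batch

@shared_task
def create_profiles_task(data):
    # Runs in a Celery worker, outside the request/response cycle;
    # each worker process manages its own database connection.
    profiles = [actually_create(profile) for profile in data]
    return [p.pk for p in save_batch(profiles)]
```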
views.py
from rest_framework import status, viewsets
from rest_framework.response import Response

from .utils import create_profiles


class CustomViewSet(viewsets.ViewSet):

    def create(self, request, *args, **kwargs):
        # ...
        try:
            profiles = create_profiles(data)
            serializer = self.serializer_class(profiles, many=True)
        except Exception as e:
            resp = Response(str(e))
            resp.status_code = status.HTTP_400_BAD_REQUEST
            return resp
        return Response(serializer.data, status=status.HTTP_200_OK)
utils.py
from multiprocessing import Pool, cpu_count

from django.db import connection, transaction

from .models import MyModel  # wherever MyModel is defined


def save_batch(batch):
    # Save a whole batch inside a single transaction.
    with transaction.atomic():
        for obj in batch:
            obj.save()
    return batch


def worker_init():
    # Each forked worker inherits the parent's DB connection;
    # close it so Django opens a fresh one per process.
    connection.close()


def create_profiles(data):
    create_batches = []
    default_batch_size = 1000
    with Pool(processes=cpu_count(), initializer=worker_init) as pool:
        profiles_to_create = []
        # Instantiate the models in parallel, grouping them into batches.
        for profile in pool.imap_unordered(actually_create, data):
            profiles_to_create.append(profile)
            if len(profiles_to_create) == default_batch_size:
                create_batches.append(profiles_to_create)
                profiles_to_create = []
        if profiles_to_create:
            create_batches.append(profiles_to_create)
    # Persist each batch inside its own transaction block.
    created = []
    for batch in create_batches:
        created.extend(save_batch(batch))
    return created


def actually_create(profile):
    # ...
    return MyModel(**profile)
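One more thing I'm unsure about: whether the parent's connection surviving the fork is the actual problem. If so, something like this before creating the pool might help (just a guess on my part, not something I've verified):

```python
from django.db import connections

# Close every open connection in the parent *before* Pool() forks, so no
# child inherits a live PostgreSQL socket; Django reconnects lazily on the
# next query in each process.
connections.close_all()
```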