Goal
I'm trying to get Postgres to make a REST API call for every value in a column (~20 million rows), in parallel.
First Attempt
I wrote a UDF to be called on the column, like so:
select col1, public.py_pgrest(col1) as enriched_data
from my_table
Based on this SO answer, the function looks roughly like this (the code still has kinks, don't dwell on them):
CREATE OR REPLACE FUNCTION public.py_pgrest(data_to_enrich text)
RETURNS text
LANGUAGE plpython2u
AS $function$
import json
import urllib
from urllib2 import Request, urlopen, URLError, HTTPError

# quote the value so spaces and special characters don't break the URL
uri = "http://enrichment.api/q=" + urllib.quote(data_to_enrich)
req = Request(uri)
try:
    enriched_data = json.loads(urlopen(req).read()).get('my_enrichment_field')
except HTTPError as e:  # subclass of URLError, so it must be caught first
    return str(e)
except URLError as e:
    if hasattr(e, 'reason'):
        return str(e.reason)
    return str(e)
return enriched_data
$function$
PARALLEL SAFE;
Problem
Even though the function is marked PARALLEL SAFE, that only lets the query planner allocate a few parallel workers to the scan and gather their results; within each worker the function is still called serially, one row at a time, so almost all the time is spent waiting on network I/O.
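For illustration, the best I can hope for from the planner is a handful of workers under a Gather node (capped by max_parallel_workers_per_gather); the plan shape below is hypothetical, with made-up numbers:

explain
select col1, public.py_pgrest(col1) as enriched_data
from my_table;

-- Gather  (cost=1000.00..1234567.89 rows=20000000 width=64)
--   Workers Planned: 4
--   ->  Parallel Seq Scan on my_table  (...)

A handful of concurrent HTTP requests against ~20 million rows is nowhere near enough.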
Question
Is there some pattern that allows me to parallelize the UDF within each worker?
A tentative and coarse Python example, just to illustrate the intent (based on this SO answer):
import asyncio
import aiohttp

async def call_rest_api(session, data, **kwargs):
    # placeholder for a single enrichment request
    async with session.get("http://enrichment.api/q=" + data) as resp:
        return (await resp.json()).get('my_enrichment_field')

async def parallel_rest_calls(column, **kwargs):
    async with aiohttp.ClientSession() as session:
        tasks = [call_rest_api(session=session, data=row, **kwargs) for row in column]
        # gather preserves task order, so results line up with the input rows
        enriched_data_col = await asyncio.gather(*tasks, return_exceptions=True)
        return enriched_data_col
Granted, the Python example still needs to map each input row to the correct output, but you get the gist: it's about using the I/O wait time to issue more calls.
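One direction I'm considering, as a batch variant of the UDF above: aggregate chunks of the column into arrays, pass each array to a set-returning PL/Python function, and fan the requests out over a thread pool inside that function. A coarse sketch; py_pgrest_batch, the pool size, and the bucket size are all made up, and multiprocessing.dummy's thread pool stands in for asyncio since plpython2u is Python 2:

CREATE OR REPLACE FUNCTION public.py_pgrest_batch(batch text[])
RETURNS SETOF text
LANGUAGE plpython2u
AS $function$
# hypothetical sketch: keep many HTTP requests in flight per call
import json
import urllib
from urllib2 import Request, urlopen
from multiprocessing.dummy import Pool  # thread pool from the Python 2 stdlib

def enrich(value):
    try:
        uri = "http://enrichment.api/q=" + urllib.quote(value)
        return json.loads(urlopen(Request(uri)).read()).get('my_enrichment_field')
    except Exception as e:
        return str(e)

pool = Pool(32)  # requests in flight per call; number made up
try:
    return pool.map(enrich, batch)  # map preserves input order
finally:
    pool.close()
$function$
PARALLEL SAFE;

called on bounded chunks, e.g. by bucketing rows with row_number():

select public.py_pgrest_batch(batch) as enriched_data
from (
    select array_agg(col1) as batch
    from (select col1, row_number() over () / 1000 as bucket from my_table) sub
    group by bucket
) batches;

But I don't know whether spawning threads inside a backend process like this is actually safe, which is really part of the same question.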