
I'm trying to figure out how to rate-limit requests made with aiohttp in the following code:

import asyncio
import aiohttp
import json
import csv
import os.path

# Join the fields of a CSV row back into a single string
def listToString(s):
    return "".join(s)

async def fetch(sem, session, url):
    async with sem:
        async with session.get(url) as response:
            return await response.json()

async def fetch_all(urls, loop):
    sem = asyncio.Semaphore(4)
    async with aiohttp.ClientSession(loop=loop) as session:
        results = await asyncio.gather(
            *[fetch(sem, session, url) for url in urls]
        )
    return results

if __name__ == '__main__':
    with open('testURLs.txt') as f:
        urls = [listToString(row) for row in csv.reader(f)]
        urls = tuple(urls)

    loop = asyncio.get_event_loop()
    data = loop.run_until_complete(fetch_all(urls, loop))
    for i, resp in enumerate(data):
        with open(f"data/{i}.json", "w") as f:
            json.dump(resp, f)

What I've Tried:

  • Adjusting the Semaphore() value from 4 to 10000 does not seem to change the request rate. When making 5000 API calls I start getting 429 responses after roughly 500 requests, so capping concurrency alone doesn't throttle the rate (see the sketch after this list).
  • My attempt to use a ThrottledClientSession subclass someone else wrote (full code below).
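
My understanding is that a semaphore only caps how many requests are in flight at once, not how many are issued per second, and the 429s suggest the server cares about the per-second rate. What I think I actually need looks something like the sketch below (RateLimiter is a name I made up for illustration, not part of aiohttp):

import asyncio
import time

class RateLimiter:
    # Allow at most `rate` acquisitions per second (sketch only).
    def __init__(self, rate):
        self._interval = 1.0 / rate
        self._lock = asyncio.Lock()
        self._next_slot = 0.0  # monotonic time of the next free slot

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            delay = self._next_slot - now
            if delay > 0:
                await asyncio.sleep(delay)
            self._next_slot = max(now, self._next_slot) + self._interval

async def fetch(limiter, session, url):
    await limiter.acquire()  # wait for a time slot rather than a concurrency slot
    async with session.get(url) as response:
        return await response.json()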

Note: I've tried to implement a subclass from this other SO post, but the poster did not provide enough information. My attempt follows:

import asyncio
import aiohttp
import aiosqlite
import json
import csv
import os.path
from blitzutils import ThrottledClientSession

# Join the fields of a CSV row back into a single string
def listToString(s):
    return "".join(s)

async def fetch(sem, session, url):
    async with sem:
        async with session.get(url) as response:
            return await response.json()

async def fetch_all(urls, loop):
    sem = asyncio.Semaphore(10000)
    async with ThrottledClientSession(rate_limit=2, loop=loop) as session:
        results = await asyncio.gather(
            *[fetch(sem, session, url) for url in urls]
        )
    return results

if __name__ == '__main__':
    with open('testURLs.txt') as f:
        urls = [listToString(row) for row in csv.reader(f)]
        urls = tuple(urls)

    loop = asyncio.get_event_loop()
    data = loop.run_until_complete(fetch_all(urls, loop))
    for i, resp in enumerate(data):
        with open(f"data/{i}.json", "w") as f:
            json.dump(resp, f)
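
Since the original post doesn't show the implementation, my guess is that ThrottledClientSession overrides the session's internal _request coroutine (which aiohttp 3.x routes all verbs through, as far as I can tell) to sleep until the next time slot. Roughly like this — my own sketch, not the actual blitzutils code:

import asyncio
import time
import aiohttp

class ThrottledClientSession(aiohttp.ClientSession):
    # Space requests out to at most `rate_limit` per second.
    # This is a guess at the implementation, not the real blitzutils class.
    def __init__(self, rate_limit=1.0, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._interval = 1.0 / rate_limit
        self._throttle_lock = asyncio.Lock()
        self._next_slot = 0.0

    async def _request(self, method, url, **kwargs):
        # Serialize access to the schedule, sleep until our slot comes up,
        # then delegate to the normal aiohttp request machinery.
        async with self._throttle_lock:
            now = time.monotonic()
            delay = self._next_slot - now
            if delay > 0:
                await asyncio.sleep(delay)
            self._next_slot = max(now, self._next_slot) + self._interval
        return await super()._request(method, url, **kwargs)

If that guess is right, rate_limit=2 should pin the whole run to about two requests per second no matter what the semaphore is set to — which is the behavior I'm not seeing.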
