Update
I originally implemented the rate-limiting algorithm that was referenced in the comment made by @balmy and it was noticed that there are times where the rate can be exceeded. This phenomenon was commented upon by @mindvirus where the OP was trying for 5 messages in an 8 second period:
This is good, but can exceed the rate. Let's say at time 0 you forward 5 messages, then at time N * (8/5) for N = 1, 2, ... you can send another message, resulting in more than 5 messages in an 8 second period.
So I am now using a new rate-limiting algorithm.
I have created two classes, RateLimitedProcessPool and RateLimitedThreadPool, for multiprocessing and multithreading respectively, based on the algorithm presented in What's a good rate limiting algorithm?. These classes are like the standard multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool classes except that the __init__ methods take two extra keyword arguments, rate and per, that together specify the maximum rate at which the apply_async method can be called. For example, the values rate=7 and per=3 imply that successive calls to apply_async will be throttled so as to allow a maximum of 7 calls every 3 seconds.
The following code demonstrates this with a simple worker function that emulates the OP's situation where the worker function takes 10 seconds to execute and must be limited to a maximum rate of 5 calls per second. We need to invoke this function 20 times, so the best performance we can achieve is a total run time of approximately 13 seconds: the four batches of 5 calls start at roughly 0, 1, 2 and 3 seconds, and the last batch then needs another 10 seconds to finish.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time
class RateLimitedPool:
    """Mixin that throttles ``apply_async`` to ``rate`` calls per ``per`` seconds.

    List this class before a pool class that provides ``apply_async`` in a
    subclass's bases: ``apply_async`` here blocks until a call is permitted
    and then delegates to the next class via ``super()``.

    Calls are counted in fixed windows. A window opens on the first permitted
    call; a new window opens on the first call made once ``per`` seconds have
    elapsed since the current window opened.
    """

    # There is a lag between the first call to apply_async and the first task
    # actually starting, so the first window is stretched by this amount.
    LAG_TIME = .2  # seconds - needs to be fine-tuned
    # How long apply_async sleeps between checks while it is being throttled.
    POLL_INTERVAL = .1  # seconds - can be fine-tuned

    def __init__(self, rate, per):
        """Store the limit: at most ``rate`` (positive int) calls per ``per`` seconds.

        Raises:
            AssertionError: if ``rate`` is not a positive int or ``per`` is
                not a positive int/float. (Asserts are skipped under ``-O``.)
        """
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self.rate = rate
        self.per = per
        self.count = 0            # calls permitted in the current window
        self.start_time = None    # monotonic timestamp of the window start
        self.first_time = True    # True until the first window has rolled over

    def _check_allowed(self):
        """Return True (and record the call) if a call is permitted right now."""
        # Use a monotonic clock: wall-clock adjustments (NTP, manual changes)
        # must not shrink or stretch the rate-limiting window.
        current_time = time.monotonic()
        if self.start_time is None:
            # Very first call: open the first window.
            self.start_time = current_time
            self.count = 1
            return True
        elapsed_time = current_time - self.start_time
        if self.first_time:
            # Compensate for the start-up lag of the first batch of tasks.
            elapsed_time -= self.LAG_TIME
        if elapsed_time >= self.per:
            # The window has expired: open a new one starting with this call.
            self.start_time = current_time
            self.count = 1
            self.first_time = False
            return True
        if self.count < self.rate:
            self.count += 1
            return True
        return False

    def apply_async(self, *args, **kwargs):
        """Block until the rate limit permits a call, then delegate to the pool.

        All arguments are passed through unchanged to the next
        ``apply_async`` in the MRO, and its return value is returned.
        """
        while not self._check_allowed():
            time.sleep(self.POLL_INTERVAL)
        return super().apply_async(*args, **kwargs)
class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    """Process pool whose ``apply_async`` is throttled by ``RateLimitedPool``.

    Takes the same arguments as ``multiprocessing.pool.Pool`` plus the
    keyword-only ``rate`` (default 5) and ``per`` (default 1).
    """

    def __init__(self, *args, rate=5, per=1, **kwargs):
        # Set up the underlying process pool first, then the throttle state.
        process_pool = multiprocessing.pool.Pool
        process_pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate=rate, per=per)
class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    """Thread pool whose ``apply_async`` is throttled by ``RateLimitedPool``.

    Takes the same arguments as ``multiprocessing.pool.ThreadPool`` plus the
    keyword-only ``rate`` (default 5) and ``per`` (default 1).
    """

    def __init__(self, *args, rate=5, per=1, **kwargs):
        # Initialise through ThreadPool's own constructor rather than
        # Pool.__init__, so this class follows the thread pool's documented
        # signature and any setup ThreadPool performs is not bypassed.
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
def threadpool(pool):
    """Return a decorator that submits each call of the function to ``pool``.

    Calling the decorated function invokes
    ``pool.apply_async(f, args, kwargs)`` and returns whatever that returns.
    """
    def decorate(f):
        @wraps(f)
        def submit(*call_args, **call_kwargs):
            # Hand the call over to the pool instead of running it inline.
            pending = pool.apply_async(f, call_args, call_kwargs)
            return pending
        return submit
    return decorate
def processpool(pool):
    """Return a decorator that submits each call of the function to ``pool``.

    Calling the decorated function invokes
    ``pool.apply_async(f, args, kwargs)`` and returns whatever that returns.
    """
    def decorate(f):
        @wraps(f)
        def submit(*call_args, **call_kwargs):
            # Hand the call over to the pool instead of running it inline.
            pending = pool.apply_async(f, call_args, call_kwargs)
            return pending
        return submit
    return decorate
########################################
def worker(x, duration=10):
    """
    Emulate a task that takes ``duration`` seconds (10 by default) to execute.
    Cannot run more than 5 of these per second.

    Prints the current timestamp together with ``x``, sleeps for
    ``duration`` seconds, and returns the tuple ``(x, x * x)``.
    """
    from datetime import datetime
    print(datetime.now(), 'x = ', x)
    # The sleep stands in for the real work; it is a parameter so the
    # emulated task length can be adjusted without editing the function.
    time.sleep(duration)
    return x, x * x
def main():
    """Submit 20 ``worker`` calls through a rate-limited thread pool and time the run."""
    pool = RateLimitedThreadPool(20, rate=5, per=1)  # 5 per second
    started_at = time.time()
    for value in range(20):
        pool.apply_async(worker, args=(value,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    elapsed = time.time() - started_at
    print('Total elapsed time:', elapsed)


# Run the demo only when executed as a script.
if __name__ == '__main__':
    main()
Prints:
2021-10-03 07:19:48.002628 x = 0
2021-10-03 07:19:48.002628 x = 1
2021-10-03 07:19:48.002628 x = 3
2021-10-03 07:19:48.002628 x = 4
2021-10-03 07:19:48.002628 x = 2
2021-10-03 07:19:49.005625 x = 5
2021-10-03 07:19:49.005625 x = 6
2021-10-03 07:19:49.005625 x = 8
2021-10-03 07:19:49.005625 x = 7
2021-10-03 07:19:49.005625 x = 9
2021-10-03 07:19:50.008775 x = 10
2021-10-03 07:19:50.008775 x = 11
2021-10-03 07:19:50.008775 x = 13
2021-10-03 07:19:50.008775 x = 12
2021-10-03 07:19:50.008775 x = 14
2021-10-03 07:19:51.012774 x = 15
2021-10-03 07:19:51.012774 x = 16
2021-10-03 07:19:51.012774 x = 17
2021-10-03 07:19:51.012774 x = 18
2021-10-03 07:19:51.012774 x = 19
Total elapsed time: 13.015560150146484
CPU Intensive Example
In the following example I am using a RateLimitedProcessPool since my worker function is 100% CPU taking approximately 10 seconds to execute on my desktop. I only have 8 logical cores (4 physical cores), so my pool size is 8 and for this demo I am submitting 8 tasks with a rate of 3 tasks per second. The second 3 tasks will start approximately 1 second after the first 3 and the next 2 tasks will start 1 second after that. Because the number of physical cores becomes a limiting factor, the total running time is a little over 21 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time
class RateLimitedPool:
    """Mixin that throttles ``apply_async`` to ``rate`` calls per ``per`` seconds.

    List this class before a pool class that provides ``apply_async`` in a
    subclass's bases: ``apply_async`` here blocks until a call is permitted
    and then delegates to the next class via ``super()``.

    Calls are counted in fixed windows. A window opens on the first permitted
    call; a new window opens on the first call made once ``per`` seconds have
    elapsed since the current window opened.
    """

    # There is a lag between the first call to apply_async and the first task
    # actually starting, so the first window is stretched by this amount.
    LAG_TIME = .2  # seconds - needs to be fine-tuned
    # How long apply_async sleeps between checks while it is being throttled.
    POLL_INTERVAL = .1  # seconds - can be fine-tuned

    def __init__(self, rate, per):
        """Store the limit: at most ``rate`` (positive int) calls per ``per`` seconds.

        Raises:
            AssertionError: if ``rate`` is not a positive int or ``per`` is
                not a positive int/float. (Asserts are skipped under ``-O``.)
        """
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self.rate = rate
        self.per = per
        self.count = 0            # calls permitted in the current window
        self.start_time = None    # monotonic timestamp of the window start
        self.first_time = True    # True until the first window has rolled over

    def _check_allowed(self):
        """Return True (and record the call) if a call is permitted right now."""
        # Use a monotonic clock: wall-clock adjustments (NTP, manual changes)
        # must not shrink or stretch the rate-limiting window.
        current_time = time.monotonic()
        if self.start_time is None:
            # Very first call: open the first window.
            self.start_time = current_time
            self.count = 1
            return True
        elapsed_time = current_time - self.start_time
        if self.first_time:
            # Compensate for the start-up lag of the first batch of tasks.
            elapsed_time -= self.LAG_TIME
        if elapsed_time >= self.per:
            # The window has expired: open a new one starting with this call.
            self.start_time = current_time
            self.count = 1
            self.first_time = False
            return True
        if self.count < self.rate:
            self.count += 1
            return True
        return False

    def apply_async(self, *args, **kwargs):
        """Block until the rate limit permits a call, then delegate to the pool.

        All arguments are passed through unchanged to the next
        ``apply_async`` in the MRO, and its return value is returned.
        """
        while not self._check_allowed():
            time.sleep(self.POLL_INTERVAL)
        return super().apply_async(*args, **kwargs)
class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    """Process pool whose ``apply_async`` is throttled by ``RateLimitedPool``.

    Takes the same arguments as ``multiprocessing.pool.Pool`` plus the
    keyword-only ``rate`` (default 5) and ``per`` (default 1).
    """

    def __init__(self, *args, rate=5, per=1, **kwargs):
        # Set up the underlying process pool first, then the throttle state.
        process_pool = multiprocessing.pool.Pool
        process_pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate=rate, per=per)
class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    """Thread pool whose ``apply_async`` is throttled by ``RateLimitedPool``.

    Takes the same arguments as ``multiprocessing.pool.ThreadPool`` plus the
    keyword-only ``rate`` (default 5) and ``per`` (default 1).
    """

    def __init__(self, *args, rate=5, per=1, **kwargs):
        # Initialise through ThreadPool's own constructor rather than
        # Pool.__init__, so this class follows the thread pool's documented
        # signature and any setup ThreadPool performs is not bypassed.
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
def threadpool(pool):
    """Return a decorator that submits each call of the function to ``pool``.

    Calling the decorated function invokes
    ``pool.apply_async(f, args, kwargs)`` and returns whatever that returns.
    """
    def decorate(f):
        @wraps(f)
        def submit(*call_args, **call_kwargs):
            # Hand the call over to the pool instead of running it inline.
            pending = pool.apply_async(f, call_args, call_kwargs)
            return pending
        return submit
    return decorate
def processpool(pool):
    """Return a decorator that submits each call of the function to ``pool``.

    Calling the decorated function invokes
    ``pool.apply_async(f, args, kwargs)`` and returns whatever that returns.
    """
    def decorate(f):
        @wraps(f)
        def submit(*call_args, **call_kwargs):
            # Hand the call over to the pool instead of running it inline.
            pending = pool.apply_async(f, call_args, call_kwargs)
            return pending
        return submit
    return decorate
########################################
# Loop count used by one_second() when no explicit count is given.
ONE_SECOND_ITERATIONS = 20_000_000


def one_second(iterations=None):
    """Burn CPU by counting up in a Python-level loop and return the count.

    Args:
        iterations: number of loop iterations to run; when omitted,
            ``ONE_SECOND_ITERATIONS`` is read at call time.

    Returns:
        The number of iterations performed.
    """
    if iterations is None:
        iterations = ONE_SECOND_ITERATIONS
    # Deliberately an explicit loop: the point is to keep the CPU busy,
    # not to compute the result efficiently.
    total = 0
    for _ in range(iterations):
        total += 1
    return total
def worker(x):
    """
    Emulate a CPU-bound task by calling one_second() ten times.
    Cannot run more than 3 of these per second.

    Prints the current timestamp together with ``x`` before starting and
    returns the tuple ``(x, x * x)``.
    """
    from datetime import datetime

    started_at = datetime.now()
    print(started_at, 'x = ', x)
    remaining = 10
    while remaining > 0:
        one_second()
        remaining -= 1
    squared = x * x
    return x, squared
def main():
    """Submit 8 ``worker`` calls through a rate-limited process pool and time the run."""
    pool = RateLimitedProcessPool(8, rate=3, per=1)  # 3 per second
    started_at = time.time()
    for value in range(8):
        pool.apply_async(worker, args=(value,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    elapsed = time.time() - started_at
    print('Total elapsed time:', elapsed)


# Run the demo only when executed as a script.
if __name__ == '__main__':
    main()
Prints:
2021-10-03 09:51:32.857166 x = 0
2021-10-03 09:51:32.859168 x = 1
2021-10-03 09:51:32.864166 x = 2
2021-10-03 09:51:33.899890 x = 5
2021-10-03 09:51:33.899890 x = 3
2021-10-03 09:51:33.907888 x = 4
2021-10-03 09:51:34.924889 x = 6
2021-10-03 09:51:34.925888 x = 7
Total elapsed time: 21.22123622894287