
goal: spawn multiple processes at the same time to work through the database (read & write). Since each process opens its own connection to the database, this should not be a problem (in theory).

problems:

  • in this example, 3 processes are started and the same message is printed for each of the 3 workers, even though each connection should be refreshed at:

while check_remaining_rows() not in (None, 0, -1):

  • so it seems that although the processes are running simultaneously, they are not accessing the database individually. I wrapped the SQL connection in each function too, so SQL should (?) handle the connection pooling (and do so automatically in separate processes)

  • the processes are definitely working in parallel, because I can ramp them up to 50 and see the SQS queue fill up quickly

would really appreciate some help, I think I'm very close.


migrate_data.py

import json
import os
import sqlite3
from multiprocessing import Pool

import boto3

# db_path, QUEUE_URL and organization_id are module-level settings (not shown)

def start_sending_msg_sqs(_):
    """
    param: _ : discarded b/c just starting multiple processes at the same time
    """
    def utility_create_single_sqs_entry(outer_id, products):
        """ return: single SQS entry for a batch message
                    use results to append to a list to create a batch message
        """
        id = outer_id
        message_body = {
            "type": "cache_track",
            "data": {
                "organization_id": organization_id,
                "outer_id": outer_id,
                "products": json.loads(products)
            }
        }
        return {"Id": id, 'MessageBody': json.dumps(message_body)}

    def worker_write_to_sqs(json_bodies_list):
        """ collect 10 complete json_bodies to send to SQS
            returns: -> response JSON from SQS
                        needs error handling
        """

        boto3.setup_default_session(profile_name='name')
        sqs = boto3.client('sqs')
        response = sqs.send_message_batch(
            QueueUrl=QUEUE_URL,
            Entries=json_bodies_list
        )
        return response


    def utility_check_sqs_response_and_update_db(response):
        """ helper function: sets sent_to_sqs = 1 on each successfully sent track to mark it as delivered """

        success_messages = response['Successful']
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        for msg in success_messages:  ## only mark successful batch messages as sent_to_sqs

            outer_id = msg.get('Id')
            cursor.execute('''UPDATE tracks SET sent_to_sqs = ? WHERE outer_id= ?''', (1, outer_id))  ## ONLY WRITE STATEMENT
            conn.commit()  ## commit immediately
            # print(outer_id, 'Done')


    ## MAIN ##

    ## create all connections to db
    def main():
        conn = sqlite3.connect(db_path)
        cur_read = conn.cursor()

        cur_read.execute(
            '''SELECT * FROM tracks WHERE sent_to_sqs<>1 LIMIT 10''') # read 10 messages and send only the alpha numeric ones.
        batch_messages = []

        for row in cur_read:
            outer_id, ingestion_id, products, sent_to_sqs = row
            
            if sent_to_sqs != 1 :
                single_entry = utility_create_single_sqs_entry(outer_id=outer_id, products=products)
                batch_messages.append(single_entry)

        response = worker_write_to_sqs(json_bodies_list=batch_messages)
        print(f"process_id: {os.getpid()}\n response: {response} ")
        # pprint(response)
        utility_check_sqs_response_and_update_db(response=response)



    def check_remaining_rows():
        conn = sqlite3.connect(db_path)
        c = conn.cursor()
        c.execute('''SELECT * FROM tracks WHERE sent_to_sqs IS NULL LIMIT 10''')
        num_rows = c.fetchone()
        return num_rows

    # conn = sqlite3.connect(db_path)
    while check_remaining_rows() not in (None, 0, -1):
        main()


if __name__ == "__main__":
    processes = 3  # Specify number of processes here
    p = Pool(processes)
    p.map(start_sending_msg_sqs, range(processes))

console log: 3 processes started even though there's an error loading them. I can verify by looking at the process_id & seeing the SQS queue fill up

   process_id: 73654
 response: {'Successful': [{'Id': 'GB-SMU-42-18178', 'MessageId': '7c655e7d-9498-44e2-a772-8b96cc29dd7a', 'MD5OfMessageBody': '2db74bcf09416874aa15ff8aa295ae11'}, {'Id': 'US-TCC-10-48904', 'MessageId': '564f6469-78b2-4c0e-bf24-49ffbd432992', 'MD5OfMessageBody': '3c20481969a11ac3c9cd80e2b39e838b'}, {'Id': 'GB-SMU-40-64382', 'MessageId': 'f5aaa4dc-4d2d-4114-a865-5d0459fd7b2d', 'MD5OfMessageBody': 'aa3e07bd33bb94c5e32438686d24fb8f'}, {'Id': 'GB-SMU-21-96822', 'MessageId': '58c97340-66fa-427f-91b5-72f8b172b84d', 'MD5OfMessageBody': 'b5ac5f2b4b3ede4cd65f79648ed7ffb4'}, {'Id': 'GB-SMU-37-62595', 'MessageId': 'f98cb73f-84a2-438c-954c-9993d88ec21e', 'MD5OfMessageBody': 'dc1361725290fa49bb30c281bb6ece6c'}, {'Id': 'NL-Q27-16-00086', 'MessageId': '577985dd-94e7-4dd5-a0bf-361c43615cf5', 'MD5OfMessageBody': '3b5b474987a012162ac7bee3582ccbc7'}, {'Id': 'GB-SMU-23-13895', 'MessageId': 'cd6e751b-4696-4f38-ae86-313a3ce36d69', 'MD5OfMessageBody': '55354e20544f6c2f517719113b8da0c0'}, {'Id': 'US-UAN-13-00013', 'MessageId': 'a749dfa1-eaea-45b2-b716-141fbb20f321', 'MD5OfMessageBody': '1f583323ff212f3e2607f0a30fc1864a'}, {'Id': 'GB-SMU-28-72682', 'MessageId': 'c8a980be-b6b1-4f7b-8a81-5f595055a94c', 'MD5OfMessageBody': '1119474bdf1e593a48ab52cf454d2eb6'}, {'Id': 'GB-SMU-34-92892', 'MessageId': 'd31f5818-6a4a-450a-b42b-3bc11d6baec4', 'MD5OfMessageBody': 'd17ec85f80c1272450b40ad71eb80ab3'}], 'ResponseMetadata': {'RequestId': 'eb8a9b66-07e8-5567-9810-c1c6df4dd2c2', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'eb8a9b66-07e8-5567-9810-c1c6df4dd2c2', 'date': 'Tue, 08 Mar 2022 04:07:01 GMT', 'content-type': 'text/xml', 'content-length': '2380'}, 'RetryAttempts': 0}} 
process_id: 73652
 response: {'Successful': [{'Id': 'GB-SMU-42-18178', 'MessageId': 'fe0a5588-3fbd-432f-8af6-72afffad8ea0', 'MD5OfMessageBody': '2db74bcf09416874aa15ff8aa295ae11'}, {'Id': 'US-TCC-10-48904', 'MessageId': '7f59b09f-8687-4c83-b890-80251900c86f', 'MD5OfMessageBody': '3c20481969a11ac3c9cd80e2b39e838b'}, {'Id': 'GB-SMU-40-64382', 'MessageId': 'df82148c-2188-4667-b282-19dc6f957328', 'MD5OfMessageBody': 'aa3e07bd33bb94c5e32438686d24fb8f'}, {'Id': 'GB-SMU-21-96822', 'MessageId': 'e0b07048-05e6-4450-b8c5-a911e54437f1', 'MD5OfMessageBody': 'b5ac5f2b4b3ede4cd65f79648ed7ffb4'}, {'Id': 'GB-SMU-37-62595', 'MessageId': '1fa4e210-6ebd-440b-a7ba-385c7a828075', 'MD5OfMessageBody': 'dc1361725290fa49bb30c281bb6ece6c'}, {'Id': 'NL-Q27-16-00086', 'MessageId': 'd2abc71f-6a57-4575-92d4-99a8374b7146', 'MD5OfMessageBody': '3b5b474987a012162ac7bee3582ccbc7'}, {'Id': 'GB-SMU-23-13895', 'MessageId': '053fedae-d320-4d78-ad32-9b8eefdc483f', 'MD5OfMessageBody': '55354e20544f6c2f517719113b8da0c0'}, {'Id': 'US-UAN-13-00013', 'MessageId': '80a3ecd3-5c8f-448c-95e4-cd676ac65fac', 'MD5OfMessageBody': '1f583323ff212f3e2607f0a30fc1864a'}, {'Id': 'GB-SMU-28-72682', 'MessageId': '4221ab23-a22f-4ba6-aaf5-0bfe55f3474d', 'MD5OfMessageBody': '1119474bdf1e593a48ab52cf454d2eb6'}, {'Id': 'GB-SMU-34-92892', 'MessageId': 'ac3bb5ad-29e9-41e3-93de-894bc11abf18', 'MD5OfMessageBody': 'd17ec85f80c1272450b40ad71eb80ab3'}], 'ResponseMetadata': {'RequestId': '0fd8d418-493d-5ea4-bcf8-8eb89198fa1f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '0fd8d418-493d-5ea4-bcf8-8eb89198fa1f', 'date': 'Tue, 08 Mar 2022 04:07:01 GMT', 'content-type': 'text/xml', 'content-length': '2380'}, 'RetryAttempts': 0}} 
process_id: 73653
 response: {'Successful': [{'Id': 'GB-SMU-42-18178', 'MessageId': '4f232f4c-4868-4797-a4fd-985eb6951492', 'MD5OfMessageBody': '2db74bcf09416874aa15ff8aa295ae11'}, {'Id': 'US-TCC-10-48904', 'MessageId': '4eec5b21-456f-4647-a1ed-0854c39fbcf4', 'MD5OfMessageBody': '3c20481969a11ac3c9cd80e2b39e838b'}, {'Id': 'GB-SMU-40-64382', 'MessageId': '060198b6-c7f7-434a-a512-cba3f240f8cc', 'MD5OfMessageBody': 'aa3e07bd33bb94c5e32438686d24fb8f'}, {'Id': 'GB-SMU-21-96822', 'MessageId': 'adba9480-2279-4abe-a8c2-e9d96aec1da5', 'MD5OfMessageBody': 'b5ac5f2b4b3ede4cd65f79648ed7ffb4'}, {'Id': 'GB-SMU-37-62595', 'MessageId': 'ae89b053-b017-493f-b696-a57398a9c6e2', 'MD5OfMessageBody': 'dc1361725290fa49bb30c281bb6ece6c'}, {'Id': 'NL-Q27-16-00086', 'MessageId': 'c7d2b8af-5e0a-4b5b-adf2-7929c9ee87ec', 'MD5OfMessageBody': '3b5b474987a012162ac7bee3582ccbc7'}, {'Id': 'GB-SMU-23-13895', 'MessageId': '1ea6e56f-24c2-454c-bfc6-82ea6240d848', 'MD5OfMessageBody': '55354e20544f6c2f517719113b8da0c0'}, {'Id': 'US-UAN-13-00013', 'MessageId': '0dac3476-ff3e-44a3-924c-8c580b87aaa5', 'MD5OfMessageBody': '1f583323ff212f3e2607f0a30fc1864a'}, {'Id': 'GB-SMU-28-72682', 'MessageId': 'ce99f300-5547-4372-9e0d-df23d6f54cc4', 'MD5OfMessageBody': '1119474bdf1e593a48ab52cf454d2eb6'}, {'Id': 'GB-SMU-34-92892', 'MessageId': 'e7dd8a9f-784d-485f-b634-3ff5e16dbc3f', 'MD5OfMessageBody': 'd17ec85f80c1272450b40ad71eb80ab3'}], 'ResponseMetadata': {'RequestId': '3249fc5e-8fed-5294-a218-a6fe57488434', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '3249fc5e-8fed-5294-a218-a6fe57488434', 'date': 'Tue, 08 Mar 2022 04:07:01 GMT', 'content-type': 'text/xml', 'content-length': '2380'}, 'RetryAttempts': 0}} 
process_id: 73653
 response: {'Successful': [{'Id': 'GB-SMU-17-30437', 'MessageId': '7f29e9ef-f0b4-4515-aca8-2a079dc037ed', 'MD5OfMessageBody': '610f7b2de912c6c18a85327d650f540a'}, {'Id': 'GB-SMU-14-74568', 'MessageId': '18b37089-ce13-4078-b5f3-93af6833abf8', 'MD5OfMessageBody': '7168727cbf8d7bb211c2d698aa9d9c06'}, {'Id': 'GB-SMU-26-85497', 'MessageId': 'fe8c585f-47a8-4045-a5fb-aa1543d543cd', 'MD5OfMessageBody': 'f19545debd4065cc6bb99f9718c1fa11'}, {'Id': 'GB-SMU-37-46262', 'MessageId': 'a93f5ca4-f4f0-43b0-b492-9504abee1a85', 'MD5OfMessageBody': 'e42add38b3bbea983e1104f3683e4d9c'}, {'Id': 'GB-SMU-24-09078', 'MessageId': '15e9bb63-41ff-429f-9fe5-ac3b87e591ec', 'MD5OfMessageBody': '8cc84ffefbf94a46c2b50e00a8493094'}, {'Id': 'GB-SMU-15-93219', 'MessageId': '6c888a8d-7526-4e08-9c16-85f3469a874c', 'MD5OfMessageBody': '051c1247367e2fd2d0fd1ef022ee2bb7'}, {'Id': 'GB-2AF-09-00034', 'MessageId': '31a8826c-3c5e-49b4-ab80-8f5070b1313b', 'MD5OfMessageBody': '1e2b891917e6b9515b84c4e2ffa97c7e'}, {'Id': 'US-TC9-09-79216', 'MessageId': 'afdc9c7e-aca0-4a87-8955-c174464f7ccb', 'MD5OfMessageBody': 'd9ed6780655c5ad8241f95af4fcd26ce'}, {'Id': 'GB-SMU-26-29739', 'MessageId': '76908807-f3d2-41b4-9524-73030f4f1545', 'MD5OfMessageBody': 'a589e4eab3b0bad0f2229720eebd6ec3'}, {'Id': 'US-TCE-10-65238', 'MessageId': '09fbafe9-1503-4174-b448-91637e02b0ad', 'MD5OfMessageBody': 'cd1653b6affe89f6158be352728b7526'}], 'ResponseMetadata': {'RequestId': 'c77b6153-9323-5fd1-9c48-3bf34b852df8', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c77b6153-9323-5fd1-9c48-3bf34b852df8', 'date': 'Tue, 08 Mar 2022 04:07:02 GMT', 'content-type': 'text/xml', 'content-length': '2380'}, 'RetryAttempts': 0}} 
process_id: 73654
 response: {'Successful': [{'Id': 'GB-SMU-17-30437', 'MessageId': 'd5da5a98-14fd-452f-b07f-1671760b2425', 'MD5OfMessageBody': '610f7b2de912c6c18a85327d650f540a'}, {'Id': 'GB-SMU-14-74568', 'MessageId': '80b8ecfd-cff6-41af-bae5-1a38f9f6440c', 'MD5OfMessageBody': '7168727cbf8d7bb211c2d698aa9d9c06'}, {'Id': 'GB-SMU-26-85497', 'MessageId': 'aa1a5c13-4f7a-40c6-9226-49dc28d7c69d', 'MD5OfMessageBody': 'f19545debd4065cc6bb99f9718c1fa11'}, {'Id': 'GB-SMU-37-46262', 'MessageId': '04b5da7f-e745-4f20-96c4-20905938300d', 'MD5OfMessageBody': 'e42add38b3bbea983e1104f3683e4d9c'}, {'Id': 'GB-SMU-24-09078', 'MessageId': 'd12324f3-17d8-4875-9c4b-fece0d7aadd9', 'MD5OfMessageBody': '8cc84ffefbf94a46c2b50e00a8493094'}, {'Id': 'GB-SMU-15-93219', 'MessageId': '89377f2e-8192-4380-9922-069572a90ca7', 'MD5OfMessageBody': '051c1247367e2fd2d0fd1ef022ee2bb7'}, {'Id': 'GB-2AF-09-00034', 'MessageId': '08c1515c-70b5-4e03-a576-caeb071075e0', 'MD5OfMessageBody': '1e2b891917e6b9515b84c4e2ffa97c7e'}, {'Id': 'US-TC9-09-79216', 'MessageId': '3543f0a4-238c-4f68-98bc-fd133506d41e', 'MD5OfMessageBody': 'd9ed6780655c5ad8241f95af4fcd26ce'}, {'Id': 'GB-SMU-26-29739', 'MessageId': '48408074-0810-485f-ab3d-a19f99578726', 'MD5OfMessageBody': 'a589e4eab3b0bad0f2229720eebd6ec3'}, {'Id': 'US-TCE-10-65238', 'MessageId': 'c74cd7f3-a60f-426d-b932-bbb3d339e37e', 'MD5OfMessageBody': 'cd1653b6affe89f6158be352728b7526'}], 'ResponseMetadata': {'RequestId': '0d3b44a8-7df3-5606-a5ac-1e08a5553108', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '0d3b44a8-7df3-5606-a5ac-1e08a5553108', 'date': 'Tue, 08 Mar 2022 04:07:02 GMT', 'content-type': 'text/xml', 'content-length': '2380'}, 'RetryAttempts': 0}} 
process_id: 73652
 response: {'Successful': [{'Id': 'GB-SMU-17-30437', 'MessageId': '71b4f7c8-35a4-4cdd-806a-6a5afe243935', 'MD5OfMessageBody': '610f7b2de912c6c18a85327d650f540a'}, {'Id': 'GB-SMU-14-74568', 'MessageId': '25b9e376-86cf-4474-8bdf-aa16dd8f7afb', 'MD5OfMessageBody': '7168727cbf8d7bb211c2d698aa9d9c06'}, {'Id': 'GB-SMU-26-85497', 'MessageId': 'd1085a35-acd7-4618-b0cf-d2d118a0eb6b', 'MD5OfMessageBody': 'f19545debd4065cc6bb99f9718c1fa11'}, {'Id': 'GB-SMU-37-46262', 'MessageId': '882dc77f-0c0e-48e6-96e2-482bdd01801e', 'MD5OfMessageBody': 'e42add38b3bbea983e1104f3683e4d9c'}, {'Id': 'GB-SMU-24-09078', 'MessageId': '918534a4-fe4f-4262-9f88-386d3d974626', 'MD5OfMessageBody': '8cc84ffefbf94a46c2b50e00a8493094'}, {'Id': 'GB-SMU-15-93219', 'MessageId': '7851b883-dc90-4be5-85a0-d9688f2f33ac', 'MD5OfMessageBody': '051c1247367e2fd2d0fd1ef022ee2bb7'}, {'Id': 'GB-2AF-09-00034', 'MessageId': '73c2bfd7-870e-4991-84c4-13008afbfe6e', 'MD5OfMessageBody': '1e2b891917e6b9515b84c4e2ffa97c7e'}, {'Id': 'US-TC9-09-79216', 'MessageId': '52f75546-830b-47b8-b1d3-709d0d2e51ff', 'MD5OfMessageBody': 'd9ed6780655c5ad8241f95af4fcd26ce'}, {'Id': 'GB-SMU-26-29739', 'MessageId': '2a347ac7-21e4-487f-9828-769fef3b5976', 'MD5OfMessageBody': 'a589e4eab3b0bad0f2229720eebd6ec3'}, {'Id': 'US-TCE-10-65238', 'MessageId': 'f69f4bf7-654f-4316-98be-b19255212021', 'MD5OfMessageBody': 'cd1653b6affe89f6158be352728b7526'}], 'ResponseMetadata': {'RequestId': '4e64c606-f406-5410-8850-9ca62ca0e544', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '4e64c606-f406-5410-8850-9ca62ca0e544', 'date': 'Tue, 08 Mar 2022 04:07:02 GMT', 'content-type': 'text/xml', 'content-length': '2380'}, 'RetryAttempts': 0}}
tigertiger