0

I need to acquire samples from two systems:

  1. Video capturer (for example, OpenCV)
  2. Electromagnetic tracking system.

The samples must be synchronized, the sampling time must be as stable as possible, and the total capture time is limited.

For this, I have developed two functions (update, update2), and I have tried to execute them using threads, taking as a starting point one of the examples shown in Run certain code every n seconds.

Code:

from threading import Timer, Thread, Event
from threading import Timer, Thread, Event
import polhemus # Package for Tracker system

import cv2, time
import numpy as np
import matplotlib.pyplot as plt

class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


src=0 # Computer camera
track = polhemus.polhemus() # Tracker system
check =track.Initialize()
capture = cv2.VideoCapture(src) # Video capture system
capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
capture.read()
frames=[]
pos=[]

def update(): # Function for image data capturing
    t11=time.time()                    
    (_, frame) = capture.read() # get one sample from OpenCV
    t21=time.time()
    frames.append((frame,[t11,t21])) # save sample
    
def update2(): #Function for pose data capturing
    if check: 
        t1=time.time()
        track.Run()   # get one sample from Tracker 
        t2=time.time()
        pose=[track.PositionTooltipX1, track.PositionTooltipY1, track.PositionTooltipZ1, 
              track.AngleX1, track.AngleY1, track.AngleZ1]
        pos.append((pose,[t1,t2])) # save sample
    else:
        print('Tracker not connected')

def mainUpdate(): # main function to sample the two systems at time
    Thread(target=update).start()
    Thread(target=update2).start()
    
print('Recording starts at',time.time())

tObject=InfiniteTimer(0.04,mainUpdate) # Execute mainUpdate every 0.04 seconds (25 samples per second)
tObject.start()
time.sleep(5)   # Total Capture time
tObject.cancel()

print('Recording ends at', time.time())

Results

I set a sample period of 0.04 seconds and a total capture time of 5 seconds; this should generate approximately 25 samples per second, that is, 125 samples during the entire capture.

However, I only obtained 102 samples with a sampling period that varies between 0.045 and 0.065 seconds.

Is there a way to sample both systems synchronously with a stable sampling period?

Thanks in advance.

0 Answers0