I need to acquire samples from two systems:
- Video capturer (for example, OpenCV)
- Electromagnetic tracking system.
The samples must be synchronized, the sampling time must be as stable as possible, and the total capture time is limited.
For this, I have developed two functions (update, update2), and I have tried to execute them using threads, taking as a starting point one of the examples shown in Run certain code every n seconds.
Code:
from threading import Timer, Thread, Event
from threading import Timer, Thread, Event
import polhemus # Package for Tracker system
import cv2, time
import numpy as np
import matplotlib.pyplot as plt
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.thread = None
def _handle_target(self):
self.is_running = True
self.target()
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(self.seconds, self._handle_target)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print("Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
src=0 # Computer camera
track = polhemus.polhemus() # Tracker system
check =track.Initialize()
capture = cv2.VideoCapture(src) # Video capture system
capture.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
capture.read()
frames=[]
pos=[]
def update(): # Function for image data capturing
t11=time.time()
(_, frame) = capture.read() # get one sample from OpenCV
t21=time.time()
frames.append((frame,[t11,t21])) # save sample
def update2(): #Function for pose data capturing
if check:
t1=time.time()
track.Run() # get one sample from Tracker
t2=time.time()
pose=[track.PositionTooltipX1, track.PositionTooltipY1, track.PositionTooltipZ1,
track.AngleX1, track.AngleY1, track.AngleZ1]
pos.append((pose,[t1,t2])) # save sample
else:
print('Tracker not connected')
def mainUpdate(): # main function to sample the two systems at time
Thread(target=update).start()
Thread(target=update2).start()
print('Recording starts at',time.time())
tObject=InfiniteTimer(0.04,mainUpdate) # Execute mainUpdate every 0.04 seconds (25 samples per second)
tObject.start()
time.sleep(5) # Total Capture time
tObject.cancel()
print('Recording ends at', time.time())
Results
I set a sample period of 0.04 seconds and a total capture time of 5 seconds; this should generate approximately 25 samples per second, that is, 125 samples during the entire capture.
However, I only obtained 102 samples with a sampling period that varies between 0.045 and 0.065 seconds.
Is there a way to sample both systems synchronously with a stable sampling period?
Thanks in advance.