Hi,
I am struggling with the connection of a signal from a threaded data acquisition to a handler method. The signal is emitted just fine, but the slot seems never triggered, no matter if it is within the same object or an external slot.
What am I doing wrong here?
import time
import numpy as np
from PySide6 import QtCore
class DataAcquisition(QtCore.QObject):
data_signal = QtCore.Signal(dict)
def __init__(self):
super().__init__()
self.running = False
self.channels = 100
self.verbose = True
## Connect the data signal to a handler method
## TODO: This does not work?!
self.data_signal.connect(self.handle_data)
## Create a thread manager to run the data acquisition loop
self.thread_manager = QtCore.QThreadPool()
def get_measurement(self):
"""Simulate data acquisition with a dummy counter."""
return np.random.randint(0, 100, size=self.channels)
def handle_data(self, data):
print(f"Received data: {data}")
def process_data(self, data):
"""Simulate data processing."""
return data
def acquisition_loop(self):
"""Acquire data in a loop and send processed data to GUI."""
self.running = True
while self.running:
try:
## Acquiring raw data
raw_data = self.get_measurement()
## Prepare data for processing
processed_data = self.process_data(raw_data)
## Emit the processed data as a signal
## REMARK: The data must be wrapped as a dictionary with a str key for PySide!
processed_data = {'processed_data' : processed_data}
if self.verbose:
print(f"Emitting data from DAQ thread {type(processed_data)}: {processed_data.keys()}...")
self.data_signal.emit(processed_data)
time.sleep(0.5)
except Exception as e:
print(f"Error in data acquisition: {e}")
self.running = False
def start(self):
"""Start the acquisition loop."""
if self.verbose:
print("Starting data acquisition...")
## Run the data processing and display in a separate thread
self.running = True
self.thread_manager.start(self.acquisition_loop)
def stop(self):
"""Stop the acquisition loop."""
if self.verbose:
print("Stopping data acquisition...")
self.running = False
while self.thread_manager.activeThreadCount() > 0:
time.sleep(0.1)
if __name__ == "__main__":
## Threaded data acquisition
daq = DataAcquisition()
## Connect the data signal to a handler method
## TODO: This does not work?!
class ReceiverClass():
def handle_data(self, data):
print(f"ReceiverClass received data: {type(data)}")
receiver = ReceiverClass()
daq.data_signal.connect(receiver.handle_data)
## Connect the data signal to a handler method
## TODO: This does not work?!
def handle_data(data):
print(f"Received data: {type(data)}")
daq.data_signal.connect(handle_data)
## Start data acquisition
daq.start()
## Run for a few seconds
time.sleep(3) # run for a few seconds
## Stop data acquisition
daq.stop()