Steve, thanks so much for the helpful information. I’ve lost a couple of days to unfruitful experiments with this, so I appreciate the guidance.
For my case, I’m developing a Python extension with several different use cases for asynchronous calls. I like the idea of writing these as Python code with argparse, so that each one can be called directly or via the CLI.
Here’s a minimal working example, where I use a very basic script, “CLI.py”, that runs for a given amount of time, reports progress via stdout, and then returns some data (as you do in SlicerParallelProcessing).
import argparse
import sys
import time
import json
#--- example CLI function
def exampleJob(numsec, update_interval, isCLI=False):
    """Simulate a long-running job that reports its progress on stdout.

    A ``PROGRESS <fraction>`` line (fraction in [0, 1], two decimals) is
    printed immediately and then after every pause until ``numsec`` seconds
    have elapsed, followed by a single ``COMPLETE`` line. Every line is
    flushed as soon as it is written so that a parent process reading the
    pipe sees it without buffering delay.

    Args:
        numsec: Total duration of the job in seconds. A value <= 0 makes the
            job complete immediately instead of dividing by zero.
        update_interval: Seconds to wait between progress reports. Negative
            values are treated as 0 (``time.sleep`` rejects them).
        isCLI: When true, the result is printed as one JSON line after
            ``COMPLETE`` and nothing is returned; otherwise the result is
            returned to the caller.

    Returns:
        A dict with the keys ``total_seconds``, ``update_interval`` and
        ``progress_values`` (the list of reported fractions), or ``None``
        when ``isCLI`` is true.
    """
    pause = max(0.0, update_interval)
    progress_values = []

    # A monotonic clock keeps the elapsed time correct even if the wall
    # clock is adjusted while the job is running.
    start_time = time.monotonic()
    while True:
        elapsed = time.monotonic() - start_time
        progress = min(1.0, elapsed / numsec) if numsec > 0 else 1.0
        progress_values.append(progress)
        print(f'PROGRESS {progress:.2f}', flush=True)
        if progress >= 1.0:
            break
        # Never sleep past the deadline, so the job lasts numsec seconds
        # rather than numsec rounded up to a whole number of intervals.
        time.sleep(min(pause, numsec - elapsed))

    # The marker separates the progress lines from the result payload.
    print('COMPLETE', flush=True)

    output = {
        'total_seconds': numsec,
        'update_interval': update_interval,
        'progress_values': progress_values,
    }

    if isCLI:
        # The JSON goes after 'COMPLETE' so the reader can tell it apart
        # from the progress lines.
        print(json.dumps(output), flush=True)
        return None
    return output
# CLI Python script
if __name__ == "__main__":
    # Command-line entry point: both options are mandatory and are passed
    # straight to exampleJob, which prints its result as JSON in CLI mode.
    cli = argparse.ArgumentParser(
        description="CLI for long-running operation with progress updates.",
    )
    option_specs = (
        ('--numsec', int, 'Number of seconds to wait.'),
        ('--updateInterval', float, 'Interval at which to provide progress updates.'),
    )
    for flag, converter, help_text in option_specs:
        cli.add_argument(flag, type=converter, required=True, help=help_text)

    options = cli.parse_args()
    exampleJob(options.numsec, options.updateInterval, isCLI=True)
Then I have my UI code that calls this via QProcess. I use a convention to differentiate the progress output from the final data output at the end.
import os
import sys
import qt
import slicer
import vtk
import json
class AsyncCLITest:
    """Small Qt harness that runs a CLI script asynchronously through QProcess.

    The child process is expected to write one item per line on stdout:

    * ``PROGRESS <fraction>`` - a fraction in [0, 1] that drives the bar;
    * ``COMPLETE`` - marks the end of progress reporting;
    * everything after ``COMPLETE`` - a JSON document holding the result.

    stderr is kept on its own channel and is only echoed for diagnostics, so
    warnings or tracebacks from the child cannot corrupt the stdout protocol.
    """

    PROGRESS_PREFIX = "PROGRESS"
    COMPLETE_MARKER = b"COMPLETE"

    def __init__(self, cliPath=r"CLI.py", numsec=10, updateInterval=0.5):
        """Build and show the UI and prepare (but do not start) the process.

        Args:
            cliPath: Path of the script to run. A relative path is resolved
                by the child interpreter against the current working
                directory.
            numsec: Value passed to the script as ``--numsec``.
            updateInterval: Value passed to the script as ``--updateInterval``.
        """
        self.cliPath = cliPath
        self.numsec = numsec
        self.updateInterval = updateInterval
        # Parsed JSON result of the last successful run (None until then).
        self.result = None

        # Create a simple UI
        self.widget = qt.QWidget()
        self.layout = qt.QVBoxLayout()
        self.widget.setLayout(self.layout)
        self.startButton = qt.QPushButton("Run CLI Process")
        self.progressBar = qt.QProgressBar()
        self.progressBar.setRange(0, 100)
        self.progressBar.setValue(0)
        self.layout.addWidget(self.startButton)
        self.layout.addWidget(self.progressBar)
        self.widget.show()

        # Connect button to the start process method
        self.startButton.clicked.connect(self.runCLIProcess)

        # Set up the QProcess. Channels are kept separate: with merged
        # channels stderr text would land in the stdout protocol stream and
        # readyReadStandardError would never be emitted.
        self.process = qt.QProcess()
        self.process.setProcessChannelMode(qt.QProcess.SeparateChannels)
        self.process.readyReadStandardOutput.connect(self.onUpdate)
        self.process.readyReadStandardError.connect(self.onError)
        self.process.finished.connect(self.onComplete)
        # 'finished' is not emitted when the process cannot be started, so
        # that case needs its own handler to unlock the UI again.
        self.process.errorOccurred.connect(self.onProcessError)

        # Buffer for accumulating non-progress stdout lines of the current run
        self.output_buffer = b""

    def runCLIProcess(self):
        """Start the CLI script in a non-blocking way using QProcess."""
        if self.process.state() != qt.QProcess.NotRunning:
            # A run is already in flight; starting again would be rejected.
            return

        # Forget everything from a previous run, otherwise its 'COMPLETE'
        # marker and JSON would be parsed together with the new output.
        self.output_buffer = b""
        self.result = None

        python_executable = sys.executable  # Path to the current Python executable
        arguments = [
            self.cliPath,
            "--numsec", str(self.numsec),
            "--updateInterval", str(self.updateInterval),
        ]

        # Update the UI before starting so that an error reported during
        # start() can re-enable the button without being overridden.
        self.startButton.setEnabled(False)
        self.progressBar.setValue(0)
        self.process.start(python_executable, arguments)

    def onUpdate(self):
        """Read every complete line currently available on stdout."""
        while self.process.canReadLine():
            raw_line = self.process.readLine().data()
            self._handleOutputLine(raw_line.decode('utf-8', errors='ignore').strip())

    def onError(self):
        """Read and print whatever the process wrote to stderr."""
        raw = self.process.readAllStandardError().data()
        for error_line in raw.decode('utf-8', errors='replace').splitlines():
            print(f"Error output: {error_line}")

    def onProcessError(self, error=None):
        """Report a QProcess-level failure (e.g. the executable cannot start)."""
        print(f"Process error: {self.process.errorString()}")
        if self.process.state() == qt.QProcess.NotRunning:
            self.startButton.setEnabled(True)

    def onComplete(self):
        """Handle the process completion and read the final JSON data."""
        self.startButton.setEnabled(True)

        # Drain anything still buffered: complete lines first, then a
        # possible last line that was written without a trailing newline.
        self.onUpdate()
        leftover = self.process.readAll().data()
        for line in leftover.decode('utf-8', errors='ignore').splitlines():
            self._handleOutputLine(line.strip())
        self.onError()

        exit_code = self.process.exitCode()
        if self.process.exitStatus() != qt.QProcess.NormalExit or exit_code != 0:
            # Leave the bar where it stopped instead of pretending success.
            print(f"CLI process failed (exit code {exit_code})")
            return

        self.progressBar.setValue(100)
        self.result = self._parseResult(self.output_buffer)
        if self.result is None:
            print("No JSON result received from the CLI process")
            return
        print("Received JSON output:")
        print(json.dumps(self.result, indent=4))

    def _handleOutputLine(self, output_line):
        """Dispatch one decoded stdout line: progress update or payload."""
        print(f"Received output line: {output_line}")  # Diagnostic: print each received line
        percent = self._parseProgress(output_line)
        if percent is not None:
            self.progressBar.setValue(percent)
        elif output_line.startswith(self.PROGRESS_PREFIX):
            # Looks like a progress line but carries no usable value.
            print(f"Ignoring malformed progress line: {output_line}")
        else:
            self.output_buffer += output_line.encode('utf-8') + b'\n'

    @staticmethod
    def _parseProgress(output_line):
        """Return the percentage (0-100) of a ``PROGRESS <fraction>`` line.

        Returns ``None`` when the line is not a well-formed progress line.
        The value is rounded rather than truncated so that e.g. 0.29 maps to
        29 and not 28, and it is clamped to the range of the progress bar.
        """
        fields = output_line.split()
        if not fields or fields[0] != AsyncCLITest.PROGRESS_PREFIX:
            return None
        try:
            fraction = float(fields[1])
            return round(min(1.0, max(0.0, fraction)) * 100)
        except (IndexError, ValueError, OverflowError):
            return None

    @staticmethod
    def _parseResult(output_buffer):
        """Extract and decode the JSON payload that follows ``COMPLETE``.

        Args:
            output_buffer: The accumulated non-progress stdout bytes.

        Returns:
            The deserialized JSON value, or ``None`` when the marker is
            missing, nothing follows it, or the payload is not valid JSON.
        """
        _, marker, json_data = output_buffer.partition(AsyncCLITest.COMPLETE_MARKER)
        json_data = json_data.strip()
        if not marker or not json_data:
            return None
        try:
            return json.loads(json_data.decode('utf-8'))
        except ValueError as e:
            # JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            print(f"Error deserializing JSON data: {e}")
            return None
# Create and show the UI. The constructor builds the widget and calls show()
# on it, so instantiating the class is all that is needed here.
# NOTE(review): presumably binding the instance to a module-level name also
# keeps the Qt objects alive while the process runs - confirm.
testApp = AsyncCLITest()
Does this look reasonable?
This is working for me after much noodling, but I’m open to recommendations to make it more robust, or possibly to abstract the call so that it works for many possible CLI scripts that use the same conventions.