This script is a tool for monitoring task progress on the Runninghub platform. Through the script, you can conveniently initiate tasks, obtain real-time task progress (including overall progress, current execution node, and progress within the node), and get result information after the task is completed.
Open the terminal (command prompt / CMD) and execute the following command to install dependencies:
pip install requests websocket-client
Before use, you need to configure the key parameters in the tool (located at the head of the code file). These parameters are used to interact with the Runninghub platform and must be filled in correctly.
| Parameter Name | Description |
|---|---|
API_KEY | The key for authentication, a credential for the Runninghub platform to identify the user |
WORKFLOW_ID | The ComfyUI workflow ID you want to run, specifying the workflow to be executed |
How to obtain API_KEY:
How to obtain WORKFLOW_ID: (This example takes the workflow on the homepage as an example)
https://www.runninghub.ai/post/1979026379880697857 in the address bar above: 1979026379880697857 is the WORKFLOW_ID (workflow ID).Open the downloaded script file (attached at the end of the manual) with a text editor (such as VS Code, Notepad, etc.)
Find the configuration area at the head of the code:
API_KEY = "****************************" # Replace with your own API_KEY
WORKFLOW_ID = "*****************" # Fill in the workflow ID
Replace the values of API_KEY and WORKFLOW_ID with your actual information (delete the original sample values and fill in your key and workflow ID)
If you do not need to customize task parameters (use the default workflow configuration), you can run it directly:
Copy the code from the complete example at the end and save it locally as xxxx.py
Open the terminal and navigate to the directory where the file is located
Execute the command to start the tool:
python xxxx.py
If you need to pass custom parameters to the workflow (such as input images, text, etc.), you need to modify the node_info_list parameter:
Find the definition of node_info_list in the main function of the code:
# 1. Initiate task + get WSS
# node_info_list is empty by default
node_info_list = '[]'
Fill in the custom parameters according to the workflow parameter format of the Runninghub platform (must be in JSON string format). For example:
node_info_list = '[{"nodeId": "6","fieldName": "text", "fieldValue": "Depth of field, blurry street background,photo of a cyberpunk barbarian Battlecore woman with glowing opalescent third eye,bust, highly detailed glowing Elvish runes tattooed to the irises, glowing Elvish, Stealth Skin, runes on cheeks, runes on jaw line, runes on ears, runes on forehead,", "description": "text"}]'
(Note: The specific parameter format needs to refer to the requirements of your workflow nodes, which can be viewed on the Runninghub workflow editing page for node parameter descriptions)
After saving the file, run the tool according to the "Basic Usage" steps
During the operation of the tool, the following operations will be automatically completed, and you can understand the real-time status through the terminal output:
taskId (unique task identifier)The tool will automatically obtain the mapping relationship between the IDs and names of all nodes in the workflow, which is used to display node names in the progress (instead of obscure IDs). The output is similar to:
✅ Node mapping obtained successfully (7 nodes in total), which are {'3': 'KSampler', '4': 'CheckpointLoaderSimple', '5': 'EmptyLatentImage', '6': 'CLIPTextEncode', '7': 'CLIPTextEncode', '8': 'VAEDecode', '9': 'SaveImage'}
Real-time progress information is received through WebSocket, and the terminal will dynamically output the following content:
Overall progress: 30.0% (3/10))Current node: Upscale)(50.0%), indicating that the node is half completed)📌 Node Resize (completed from cache) (indicating that the node uses cached results and does not need to be re-executed)🎉 Task completed! (monitoring will be automatically closed at this time)| Output Example | Meaning |
|---|---|
✅ Task initiated successfully, taskId: 123456 | The task has been successfully submitted to the Runninghub platform, and taskId is the unique identifier of the task |
🔍 3rd poll... | Polling the platform to obtain the WSS link (up to 30 polls, 5 seconds apart each time) |
✅ WSS obtained successfully: wss://xxx | The real-time monitoring link has been obtained, and progress monitoring will start soon |
📊 Progress monitoring started... | The WebSocket connection has been established, and progress information will be received |
📈 Overall progress: 50.0% (5/10) - Current node: Upscale (75.0%) | Overall progress is 50% (5 out of 10 nodes completed), currently executing the "Upscale" node, which is 75% completed |
📌 Node LoadImage (completed from cache) | The "LoadImage" node uses cached results and is marked as completed |
🎉 Task completed! | All nodes have been executed, and the task is ended |
| Problem Phenomenon | Possible Causes and Solutions |
|---|---|
| Task initiation fails, prompting "Task initiation failed: {...}" | 1. Incorrect API_KEY: Check if the API key is correct (note whether there are spaces or case errors) 2. Incorrect WORKFLOW_ID: Confirm whether the workflow ID exists and belongs to the user corresponding to the current API key |
| Polling timeout, prompting "❌ Polling timeout, WSS not obtained" | 1. The platform processes tasks too slowly: You can try to restart the tool 2. Network issues: Check if the network connection is normal and if the Runninghub platform is accessible 3. Long queuing time: When initiating a task on Runninghub, it may need to queue all the time. The WebSocket link can only be obtained when the task is successfully queued and in running state. |
| The task generation result is output directly without task progress display | The polling interval of this script for RunningHub is 5 seconds, which means the task is completed from queuing to execution within 5 seconds, so the next poll will return the result directly. |
| The progress displays node IDs instead of names | The node mapping acquisition failed, which can be ignored (does not affect task execution), or restart the tool to obtain the mapping |
API_KEY safe and avoid disclosing it to others (which may lead to others abusing your platform resources)node_info_list, ensure the JSON format is correct (you can use an online JSON verification tool for verification)import requests
import json
import time
from websocket import WebSocketApp
from typing import Dict, Set, Optional
# Configuration
API_KEY = "****************************" # Replace with your own API_KEY
WORKFLOW_ID = "*****************" # Fill in the workflow ID
HEADERS = {"Host": "www.runninghub.cn", "Content-Type": "application/json"}
URLS = {
"create_task": "https://www.runninghub.cn/task/openapi/create",
"get_outputs": "https://www.runninghub.cn/task/openapi/outputs",
"get_nodes": "https://www.runninghub.cn/api/openapi/getJsonApiFormat"
}
def get_task_results(task_id: str, max_retries: int = 3):
"""Get task results with retry mechanism"""
for attempt in range(max_retries):
try:
resp = requests.post(
URLS["get_outputs"],
headers=HEADERS,
data=json.dumps({"apiKey": API_KEY, "taskId": task_id})
)
resp.raise_for_status()
data = resp.json().get("data")
if isinstance(data, list) and data:
print("\n📄 Generation results:")
for i, item in enumerate(data, 1):
print(f" Result {i}: Type={item.get('fileType', 'unknown')}, URL={item.get('fileUrl', 'unknown')}")
return True
# Retry after waiting
if attempt < max_retries - 1:
print("⏳ Results not ready, retrying after 5 seconds...")
time.sleep(5)
except Exception as e:
print(f"❌ Exception when getting results: {str(e)}")
if attempt < max_retries - 1:
print("⏳ Retrying after 5 seconds...")
time.sleep(5)
print("❌ All retry attempts failed, no generation results found")
return False
class ProgressInfo:
def __init__(self):
self.completed_nodes: Set[str] = set()
self.current_node: str = ""
self.current_node_name: str = ""
self.current_progress: int = 0
self.current_max: int = 0
self.id_to_class_type: Optional[Dict[str, str]] = None
def set_id_to_class_type(self, id_map: Dict[str, str]):
self.id_to_class_type = id_map
def set_current_node(self, node_id: str):
self.current_node = node_id
self.current_node_name = self.id_to_class_type.get(node_id, "") if (self.id_to_class_type and node_id) else ""
def add_completed_node(self, node_id: str) -> bool:
if node_id not in self.completed_nodes:
self.completed_nodes.add(node_id)
return True
return False
def clear_current_node(self):
self.current_node = self.current_node_name = ""
self.current_progress = self.current_max = 0
def get_overall_percent(self) -> float:
total = len(self.id_to_class_type) if self.id_to_class_type else 0
return (len(self.completed_nodes) / total) * 100 if total > 0 else 0.0
def get_current_node_percent(self) -> float:
return (self.current_progress / self.current_max) * 100 if self.current_max > 0 else 0.0
def __str__(self):
base = f"Overall progress: {self.get_overall_percent():.1f}% ({len(self.completed_nodes)}/{len(self.id_to_class_type) if self.id_to_class_type else 0})"
if self.current_node_name:
base += f" - Current node: {self.current_node_name}"
if self.current_max > 0:
base += f" ({self.get_current_node_percent():.1f}%)"
return base
def create_task_and_get_wss(node_info_list: str) -> tuple:
"""Initiate ComfyUI task and poll to get WSS link and taskId"""
node_info_list = node_info_list.strip()
try:
resp = requests.post(
URLS["create_task"],
headers=HEADERS,
data=json.dumps({
"apiKey": API_KEY,
"workflowId": WORKFLOW_ID,
"nodeInfoList": json.loads(node_info_list)
})
)
resp.raise_for_status()
task_id = resp.json().get("data", {}).get("taskId")
if not task_id:
print(f"Task initiation failed: {resp.json()}")
return None, None
print(f"✅ Task initiated successfully, taskId: {task_id}")
except Exception as e:
print(f"Task initiation exception: {str(e)}")
return None, None
# Poll to get WSS
for poll_cnt in range(1, 31):
print(f"🔍 {poll_cnt}th poll to get WSS...")
time.sleep(5)
try:
resp = requests.post(
URLS["get_outputs"],
headers=HEADERS,
data=json.dumps({"apiKey": API_KEY, "taskId": task_id})
)
resp.raise_for_status()
data = resp.json().get("data")
# Return results directly
if isinstance(data, list):
print("\n🎉 Task completed, getting results directly:")
if data:
for i, item in enumerate(data, 1):
print(f" Result {i}: Type={item.get('fileType', 'unknown')}, URL={item.get('fileUrl', 'unknown')}")
return None, task_id
# Extract WSS
if isinstance(data, dict):
wss_url = data.get("netWssUrl")
if wss_url:
print(f"✅ WSS obtained successfully: {wss_url}")
return wss_url, task_id
print("WSS not ready, continuing to wait...")
except Exception as e:
print(f"Polling exception: {str(e)}")
print("❌ Polling timeout, WSS not obtained")
return None, task_id
def get_node_id_name_map() -> Dict[str, str]:
"""Get node ID → name mapping"""
try:
resp = requests.post(
URLS["get_nodes"],
headers=HEADERS,
data=json.dumps({"apiKey": API_KEY, "workflowId": WORKFLOW_ID})
)
resp.raise_for_status()
prompt_json = resp.json().get("data", {}).get("prompt", "{}")
id_name_map = {
node_id: info.get("class_type", node_id)
for node_id, info in json.loads(prompt_json).items()
}
print(f"✅ Node mapping obtained successfully ({len(id_name_map)} nodes in total), which are {id_name_map}")
return id_name_map
except Exception as e:
print(f"Node mapping acquisition exception: {str(e)}")
return {}
class TaskProgressMonitor:
def __init__(self, task_id: str):
self.progress = ProgressInfo()
self.tmp_node: Optional[str] = None
self.task_id = task_id
def start_monitor(self, wss_url: str, node_id_name_map: Dict[str, str]):
self.progress.set_id_to_class_type(node_id_name_map)
def on_open(ws):
print("\n📊 Progress monitoring started...")
def on_message(ws, msg):
try:
data = json.loads(msg)
msg_type = data.get("type", "")
if msg_type == "progress":
d = data.get("data", {})
self.progress.set_current_node(d.get("node", ""))
self.progress.current_progress = d.get("value", 0)
self.progress.current_max = d.get("max", 0)
print(f"📈 {self.progress}")
elif msg_type == "executing":
if self.tmp_node:
self.progress.add_completed_node(self.tmp_node)
self.progress.clear_current_node()
self.tmp_node = data.get("data", {}).get("node", "")
if self.tmp_node:
self.progress.set_current_node(self.tmp_node)
print(f"📈 {self.progress}")
elif msg_type == "execution_cached":
for node_id in data.get("data", {}).get("nodes", []):
if node_id and self.progress.add_completed_node(node_id):
print(f"📌 Node {node_id_name_map.get(node_id, node_id)} (completed from cache)")
print(f"📈 {self.progress}")
elif msg_type == "execution_success":
if self.tmp_node:
self.progress.add_completed_node(self.tmp_node)
print(f"📈 {self.progress}\n🎉 Task execution completed, getting generation results...")
if get_task_results(self.task_id):
print("✅ All results obtained!")
ws.close()
except Exception as e:
print(f"❌ Message processing exception: {str(e)}")
def on_close(ws, code, reason):
reason_str = reason.decode() if isinstance(reason, bytes) else reason
print(f"\n🔌 Monitoring closed: {reason_str} (status code: {code})")
def on_error(ws, err):
print(f"❌ Monitoring error: {str(err)}")
WebSocketApp(wss_url, on_open=on_open, on_message=on_message, on_close=on_close,
on_error=on_error).run_forever()
def main():
# Initiate task
node_info_list = '[]'
wss_url, task_id = create_task_and_get_wss(node_info_list)
# If results are returned directly, no need to monitor
if not wss_url:
return
# Get node mapping and start monitoring
node_map = get_node_id_name_map()
print("\n=== Starting progress monitoring ===")
TaskProgressMonitor(task_id).start_monitor(wss_url, node_map)
if __name__ == "__main__":
main()