# @description: MQTT task scheduler that dispatches tasks to online workers
# @author: licanglong
# @date: 2025/11/12 15:38
"""Minimal MQTT task scheduler.

Listens on ``worker/status/+`` for worker heartbeat messages, remembers when
each worker last reported ``online``, and periodically publishes a task to
``task/<worker_id>`` for the first worker seen within the timeout window.
"""
import json
import threading
import time

broker = "117.72.147.109"
port = 18830
client_id = "scheduler"

STATUS_TOPIC = "worker/status/+"
# A worker counts as active only if it reported "online" within this window.
WORKER_TIMEOUT_SECONDS = 30
KEEPALIVE_SECONDS = 60
TASK_INTERVAL_SECONDS = 10

# worker id -> time.time() of its most recent "online" status message.
online_workers = {}
# on_message runs on the MQTT network thread (started by loop_start) while
# send_task runs on the main thread, so every access to the dict is guarded.
_workers_lock = threading.Lock()


def _is_valid_worker_id(worker_id):
    """Return True if *worker_id* can be used as a dict key and topic segment."""
    if worker_id is None or isinstance(worker_id, (list, dict)):
        # Missing id, or a JSON array/object (unhashable, cannot be a key).
        return False
    if isinstance(worker_id, str):
        # MQTT forbids the wildcards '+' and '#' in a topic used for
        # publishing, so such an id could never be addressed.
        return "+" not in worker_id and "#" not in worker_id
    return True


def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to worker status updates once the broker accepts the session.

    Subscribing here rather than once after ``connect()`` renews the
    subscription every time the client reconnects.
    Signature follows the paho-mqtt 2.x ``VERSION2`` callback API.
    """
    if reason_code.is_failure:
        print(f"MQTT connection failed: {reason_code}")
        return
    client.subscribe(STATUS_TOPIC)


def on_message(client, userdata, msg):
    """Update ``online_workers`` from a worker status message.

    The payload is expected to be a UTF-8 JSON object with a ``client_id``
    and an optional ``status``: ``"online"`` records the current time for
    that worker, ``"offline"`` forgets it, and any other status is ignored.
    Malformed messages are reported and dropped instead of raising inside
    the MQTT callback.
    """
    try:
        data = json.loads(msg.payload.decode())
    except ValueError:
        # Covers both UnicodeDecodeError and json.JSONDecodeError.
        print(f"Ignoring undecodable status message: {msg.payload!r}")
        return

    if not isinstance(data, dict) or not _is_valid_worker_id(data.get("client_id")):
        print(f"Ignoring malformed status message: {data!r}")
        return

    worker_id = data["client_id"]
    status = data.get("status", "unknown")

    with _workers_lock:
        if status == "online":
            online_workers[worker_id] = time.time()
        elif status == "offline":
            online_workers.pop(worker_id, None)


def send_task(client, task_data):
    """Publish *task_data* as JSON to the first currently active worker.

    Args:
        client: Object exposing ``publish(topic, payload)`` (the MQTT client).
        task_data: JSON-serializable task description.

    Does nothing except print a notice when no worker has reported
    ``online`` within ``WORKER_TIMEOUT_SECONDS``.
    """
    now = time.time()
    # Snapshot under the lock; publish outside it so the network thread is
    # never blocked behind a publish call.
    with _workers_lock:
        active_workers = [
            worker_id
            for worker_id, last_seen in online_workers.items()
            if now - last_seen < WORKER_TIMEOUT_SECONDS
        ]

    if not active_workers:
        print("暂无在线worker,等待中...")
        return

    worker = active_workers[0]
    topic = f"task/{worker}"
    client.publish(topic, json.dumps(task_data))
    print(f"任务已发送给 {worker}: {task_data}")


def main():
    """Connect to the broker and dispatch a task every few seconds until interrupted."""
    # Imported here so the helpers above stay importable (and testable)
    # without the MQTT dependency or a reachable broker.
    import paho.mqtt.client as mqtt

    # New-style constructor: compatible with paho-mqtt 2.x.
    client = mqtt.Client(
        client_id=client_id,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(broker, port, KEEPALIVE_SECONDS)
    client.loop_start()

    try:
        while True:
            send_task(client, {"task_id": int(time.time()), "action": "compute"})
            time.sleep(TASK_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        pass
    finally:
        client.disconnect()
        client.loop_stop()


if __name__ == "__main__":
    main()