|
|
@@ -0,0 +1,49 @@
|
|
|
+# @description:
|
|
|
+# @author: licanglong
|
|
|
+# @date: 2025/11/12 15:38
|
|
|
+import json
|
|
|
+import time
|
|
|
+
|
|
|
+import paho.mqtt.client as mqtt
|
|
|
+
|
|
|
+broker = "117.72.147.109"
|
|
|
+port = 18830
|
|
|
+client_id = "scheduler"
|
|
|
+
|
|
|
+online_workers = {}
|
|
|
+
|
|
|
+
|
|
|
+def on_message(client, userdata, msg):
|
|
|
+ data = json.loads(msg.payload.decode())
|
|
|
+ worker_id = data["client_id"]
|
|
|
+ status = data.get("status", "unknown")
|
|
|
+
|
|
|
+ if status == "online":
|
|
|
+ online_workers[worker_id] = time.time()
|
|
|
+ elif status == "offline":
|
|
|
+ online_workers.pop(worker_id, None)
|
|
|
+
|
|
|
+
|
|
|
+def send_task(client, task_data):
|
|
|
+ now = time.time()
|
|
|
+ active_workers = [w for w, t in online_workers.items() if now - t < 30]
|
|
|
+ if not active_workers:
|
|
|
+ print("暂无在线worker,等待中...")
|
|
|
+ return
|
|
|
+ worker = active_workers[0]
|
|
|
+ topic = f"task/{worker}"
|
|
|
+ client.publish(topic, json.dumps(task_data))
|
|
|
+ print(f"任务已发送给 {worker}: {task_data}")
|
|
|
+
|
|
|
+
|
|
|
+# ✅ 新写法:兼容 paho-mqtt 2.x
|
|
|
+client = mqtt.Client(client_id=client_id, callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
|
|
|
+client.on_message = on_message
|
|
|
+client.connect(broker, port, 60)
|
|
|
+
|
|
|
+client.subscribe("worker/status/+")
|
|
|
+client.loop_start()
|
|
|
+
|
|
|
+while True:
|
|
|
+ send_task(client, {"task_id": int(time.time()), "action": "compute"})
|
|
|
+ time.sleep(10)
|