| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- # @description:
- # @author: licanglong
- # @date: 2025/11/12 15:38
- import json
- import time
- import paho.mqtt.client as mqtt
- broker = "117.72.147.109"
- port = 18830
- client_id = "scheduler"
- online_workers = {}
- def on_message(client, userdata, msg):
- data = json.loads(msg.payload.decode())
- worker_id = data["client_id"]
- status = data.get("status", "unknown")
- if status == "online":
- online_workers[worker_id] = time.time()
- elif status == "offline":
- online_workers.pop(worker_id, None)
- def send_task(client, task_data):
- now = time.time()
- active_workers = [w for w, t in online_workers.items() if now - t < 30]
- if not active_workers:
- print("暂无在线worker,等待中...")
- return
- worker = active_workers[0]
- topic = f"task/{worker}"
- client.publish(topic, json.dumps(task_data))
- print(f"任务已发送给 {worker}: {task_data}")
- # ✅ 新写法:兼容 paho-mqtt 2.x
- client = mqtt.Client(client_id=client_id, callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
- client.on_message = on_message
- client.connect(broker, port, 60)
- client.subscribe("worker/status/+")
- client.loop_start()
- while True:
- send_task(client, {"task_id": int(time.time()), "action": "compute"})
- time.sleep(10)
|