# scheduler.py (1.2 KB)
# @description: MQTT scheduler that tracks online workers and publishes tasks to them
# @author: licanglong
# @date: 2025/11/12 15:38
import json
import os
import time

import paho.mqtt.client as mqtt
  7. broker = "117.72.147.109"
  8. port = 18830
  9. client_id = "scheduler"
  10. online_workers = {}
  11. def on_message(client, userdata, msg):
  12. data = json.loads(msg.payload.decode())
  13. worker_id = data["client_id"]
  14. status = data.get("status", "unknown")
  15. if status == "online":
  16. online_workers[worker_id] = time.time()
  17. elif status == "offline":
  18. online_workers.pop(worker_id, None)
  19. def send_task(client, task_data):
  20. now = time.time()
  21. active_workers = [w for w, t in online_workers.items() if now - t < 30]
  22. if not active_workers:
  23. print("暂无在线worker,等待中...")
  24. return
  25. worker = active_workers[0]
  26. topic = f"task/{worker}"
  27. client.publish(topic, json.dumps(task_data))
  28. print(f"任务已发送给 {worker}: {task_data}")
  29. # ✅ 新写法:兼容 paho-mqtt 2.x
  30. client = mqtt.Client(client_id=client_id, callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
  31. client.on_message = on_message
  32. client.connect(broker, port, 60)
  33. client.subscribe("worker/status/+")
  34. client.loop_start()
  35. while True:
  36. send_task(client, {"task_id": int(time.time()), "action": "compute"})
  37. time.sleep(10)