worker.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. # @description:
  2. # @author: licanglong
  3. # @date: 2025/11/12 15:38
  4. import json
  5. import time
  6. import paho.mqtt.client as mqtt
  # Broker endpoint and the identity this worker connects with.
  broker = "117.72.147.109"
  port = 18830
  client_id = "worker-01"

  # MQTT client configured for the version-2 callback API.
  client = mqtt.Client(
      callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
      client_id=client_id,
  )

  # Register the last-will message: a retained "offline" status on this
  # worker's status topic.
  _offline_payload = json.dumps({"status": "offline", "client_id": client_id})
  client.will_set(
      f"worker/status/{client_id}",
      payload=_offline_payload,
      retain=True,
  )
  def on_message(client, userdata, msg):
      """Handle one incoming task message and acknowledge it on ``task/ack``.

      The payload is expected to be a UTF-8 encoded JSON object that contains
      a ``task_id`` key. Payloads that cannot be decoded, are not a JSON
      object, or lack ``task_id`` are reported and dropped rather than raising:
      with paho-mqtt's default settings an exception escaping a callback is
      re-raised into the network loop, which would stop message processing.

      Args:
          client: The MQTT client that received the message; used to publish
              the acknowledgement.
          userdata: User data attached to the client (unused).
          msg: The received message; only ``msg.payload`` (bytes) is read.
      """
      try:
          data = json.loads(msg.payload.decode())
      except (UnicodeDecodeError, json.JSONDecodeError) as err:
          print(f"Ignoring undecodable task payload: {err}")
          return

      # A JSON array/scalar, or an object without task_id, cannot be acked.
      if not isinstance(data, dict) or "task_id" not in data:
          print(f"Ignoring task payload without task_id: {data!r}")
          return

      print(f"收到任务: {data}")
      # Simulate task execution.
      time.sleep(3)
      ack = {"task_id": data["task_id"], "worker": client_id, "result": "done"}
      client.publish("task/ack", json.dumps(ack))
  def on_connect(client, userdata, flags, reason_code, properties):
      """Subscribe to this worker's task topic once the broker accepts us.

      Subscribing from the connect callback (instead of once after
      ``connect()``) renews the subscription every time the background loop
      re-establishes the connection.

      Args:
          client: The connected MQTT client.
          userdata: User data attached to the client (unused).
          flags: Connect flags reported by the broker (unused).
          reason_code: Result of the connection attempt.
          properties: MQTT v5 properties, if any (unused).
      """
      if reason_code.is_failure:
          print(f"Connection refused by broker: {reason_code}")
          return
      client.subscribe(f"task/{client_id}")


  # Callbacks must be attached before connecting so the first CONNACK is seen.
  client.on_connect = on_connect
  client.on_message = on_message
  client.connect(broker, port, 60)
  # Run the network loop in a background thread.
  client.loop_start()
  # Periodically publish a retained "online" heartbeat, once every 10 seconds.
  status_topic = f"worker/status/{client_id}"
  while True:
      heartbeat = {
          "status": "online",
          "client_id": client_id,
          "timestamp": int(time.time()),
      }
      client.publish(status_topic, json.dumps(heartbeat), retain=True)
      time.sleep(10)