Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT Refactoring #2020

Draft
wants to merge 3 commits into
base: dev/v0.7.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More refactoring
  • Loading branch information
alaydshah committed Apr 5, 2024
commit 74226bfa7216673f4ccee4eeff742fa12336fba8
52 changes: 52 additions & 0 deletions python/fedml/computing/scheduler/comm_utils/mqtt_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MqttTopics:
# Run Topics
__run_client_mlops_status = "fl_run/fl_client/mlops/status"
__run_server_mlops_status = "fl_run/fl_server/mlops/status"
__client_mlops_job_cost = "ml_client/mlops/job_computing_cost"

# ============== Server -> MLOps ==============

Expand All @@ -62,12 +63,27 @@ class MqttTopics:
__server_mlops_deploy_progress = "fl_server/mlops/deploy_progress_and_eval"
__model_serving_mlops_llm_input_output_record = "model_serving/mlops/llm_input_output_record"

# ============== Launch -> MLOps ==============
__launch_mlops_artifacts = "launch_device/mlops/artifacts"
__launch_mlops_release_gpu_ids = "launch_device/mlops/release_gpu_ids"
__launch_mlops_sync_deploy_ids = "launch_device/mlops/sync_deploy_ids"

# ============== Deploy -> MLOps ==============
__deploy_mlops_status = "model_ops/model_device/return_deployment_status"
__compute_mlops_endpoint = "compute/mlops/endpoint"

# ========= System Monitoring Topics ==========
__client_mlops_system_performance = "fl_client/mlops/system_performance"
__client_mlops_gpu_device_info = "ml_client/mlops/gpu_device_info"


    # TODO (alaydshah): Verify these topics are no longer referenced anywhere, then remove them
# ============== Deprecated ==============

__server_run_exception = "flserver_agent/{run_id}/client_exit_train_with_exception"
__server_mlops_status = "fl_server/mlops/status"
__client_mlops_training_metrics = "fl_client/mlops/training_metrics"
__mlops_runtime_logs_run = "mlops/runtime_logs/{run_id}"

@classmethod
def server_client_start_train(cls, client_id: Union[int, str]):
Expand Down Expand Up @@ -176,3 +192,39 @@ def server_mlops_training_model_net(cls):
@classmethod
def model_serving_mlops_llm_input_output_record(cls):
    """Topic for LLM input/output records: ``model_serving/mlops/llm_input_output_record``."""
    topic = cls.__model_serving_mlops_llm_input_output_record
    return topic

@classmethod
def client_mlops_job_cost(cls):
    """Topic for reporting job computing cost: ``ml_client/mlops/job_computing_cost``."""
    topic = cls.__client_mlops_job_cost
    return topic

@classmethod
def mlops_runtime_logs_run(cls, run_id: Union[int, str]):
    """Build the per-run runtime-logs topic ``mlops/runtime_logs/<run_id>``.

    NOTE(review): the backing template is declared under the Deprecated
    section of this class yet an accessor is being added — confirm it is
    still meant to be exposed.
    """
    template = cls.__mlops_runtime_logs_run
    return template.format(run_id=run_id)

@classmethod
def launch_mlops_artifacts(cls):
    """Topic for artifact reports: ``launch_device/mlops/artifacts``."""
    topic = cls.__launch_mlops_artifacts
    return topic

@classmethod
def deploy_mlops_status(cls):
    """Deployment-status topic prefix: ``model_ops/model_device/return_deployment_status``.

    Callers append ``/<end_point_id>`` to this prefix when publishing.
    """
    topic = cls.__deploy_mlops_status
    return topic

@classmethod
def client_mlops_system_performance(cls):
    """Topic for system performance metrics: ``fl_client/mlops/system_performance``."""
    topic = cls.__client_mlops_system_performance
    return topic

@classmethod
def client_mlops_gpu_device_info(cls):
    """Topic for GPU device info reports: ``ml_client/mlops/gpu_device_info``."""
    topic = cls.__client_mlops_gpu_device_info
    return topic

@classmethod
def compute_mlops_endpoint(cls):
    """Topic for endpoint info: ``compute/mlops/endpoint``."""
    topic = cls.__compute_mlops_endpoint
    return topic

@classmethod
def launch_mlops_release_gpu_ids(cls):
    """Topic for releasing GPU ids: ``launch_device/mlops/release_gpu_ids``."""
    topic = cls.__launch_mlops_release_gpu_ids
    return topic

@classmethod
def launch_mlops_sync_deploy_ids(cls):
    """Topic for syncing deploy ids: ``launch_device/mlops/sync_deploy_ids``."""
    topic = cls.__launch_mlops_sync_deploy_ids
    return topic
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

def test_mqtt_topics():
edge_id = 1
end_point_id = 10
run_id = 100
model_device_client_edge_id_list = [1, 2, 3]

Expand Down Expand Up @@ -90,3 +91,29 @@ def test_mqtt_topics():
topic_model_serving_mlops_llm_input_output = "model_serving/mlops/llm_input_output_record"
assert MqttTopics.model_serving_mlops_llm_input_output_record() == topic_model_serving_mlops_llm_input_output

topic_client_mlops_job_cost = "ml_client/mlops/job_computing_cost"
assert MqttTopics.client_mlops_job_cost() == topic_client_mlops_job_cost

topic_mlops_runtime_logs_run = "mlops/runtime_logs/" + str(run_id)
assert MqttTopics.mlops_runtime_logs_run(run_id=run_id) == topic_mlops_runtime_logs_run

topic_launch_mlops_artifacts = "launch_device/mlops/artifacts"
assert MqttTopics.launch_mlops_artifacts() == topic_launch_mlops_artifacts

deployment_status_topic_prefix = "model_ops/model_device/return_deployment_status"
assert MqttTopics.deploy_mlops_status() == deployment_status_topic_prefix

topic_client_mlops_system_performance = "fl_client/mlops/system_performance"
assert MqttTopics.client_mlops_system_performance() == topic_client_mlops_system_performance

topic_client_mlops_gpu_device_info = "ml_client/mlops/gpu_device_info"
assert MqttTopics.client_mlops_gpu_device_info() == topic_client_mlops_gpu_device_info

topic_compute_mlops_endpoint = "compute/mlops/endpoint"
assert MqttTopics.compute_mlops_endpoint() == topic_compute_mlops_endpoint

topic_launch_mlops_release_gpu_ids = "launch_device/mlops/release_gpu_ids"
assert MqttTopics.launch_mlops_release_gpu_ids() == topic_launch_mlops_release_gpu_ids

topic_launch_mlops_sync_deploy_ids = "launch_device/mlops/sync_deploy_ids"
assert MqttTopics.launch_mlops_sync_deploy_ids() == topic_launch_mlops_sync_deploy_ids
15 changes: 8 additions & 7 deletions python/fedml/computing/scheduler/master/server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import fedml
from ..comm_utils.job_cleanup import JobCleanup
from ..comm_utils.mqtt_topics import MqttTopics
from ..scheduler_core.scheduler_matcher import SchedulerMatcher
from ..comm_utils.constants import SchedulerConstants
from ..comm_utils.job_utils import JobRunnerUtils
Expand Down Expand Up @@ -1280,7 +1281,7 @@ def detect_edges_status(
return True, active_edge_info_dict, inactivate_edges

def send_status_check_msg(self, run_id, edge_id, server_id, context=None):
topic_get_model_device_id = "server/client/request_device_info/" + str(edge_id)
topic_get_model_device_id = MqttTopics.server_client_request_device_info(client_id=edge_id)
payload = {"server_id": server_id, "run_id": run_id}
if context is not None:
payload["context"] = context
Expand Down Expand Up @@ -1374,14 +1375,14 @@ def send_training_request_to_edges(self, active_edge_info_dict=None):
"machine_id": edge_id_item, "endpoint_gpu_count": gpu_num,
"master_deploy_id": edge_info.get("master_device_id", 0),
"slave_deploy_id": edge_info.get("slave_device_id", 0)})
topic_name = f"compute/mlops/endpoint"
topic_name = MqttTopics.compute_mlops_endpoint()
endpoint_info_json = {"endpoint_id": endpoint_id, "endpoint_info": endpoint_info}
print(f"endpoint_info_json {endpoint_info_json}")
self.message_center.send_message(topic_name, json.dumps(endpoint_info_json))

client_rank = 1
for edge_id in edge_id_list:
topic_start_train = "flserver_agent/" + str(edge_id) + "/start_train"
topic_start_train = MqttTopics.server_client_start_train(client_id=edge_id)
logging.info("start_train: send topic " + topic_start_train + " to client...")
request_json = self.request_json
request_json["client_rank"] = client_rank
Expand Down Expand Up @@ -1409,7 +1410,7 @@ def setup_listeners_for_edge_status(self, run_id, edge_ids, server_id):
self.client_agent_active_list[f"{run_id}"][f"server"] = server_id
for edge_id in edge_ids:
self.client_agent_active_list[f"{run_id}"][f"{edge_id}"] = ServerConstants.MSG_MLOPS_SERVER_STATUS_IDLE
edge_status_topic = "fl_client/flclient_agent_" + str(edge_id) + "/status"
edge_status_topic = MqttTopics.client_client_agent_status(client_id=edge_id)
self.add_message_listener(edge_status_topic, self.callback_edge_status)
self.subscribe_msg(edge_status_topic)

Expand All @@ -1418,7 +1419,7 @@ def remove_listeners_for_edge_status(self, edge_ids=None):
edge_ids = self.request_json["edgeids"]

for edge_id in edge_ids:
edge_status_topic = "fl_client/flclient_agent_" + str(edge_id) + "/status"
edge_status_topic = MqttTopics.client_client_agent_status(client_id=edge_id)
self.unsubscribe_msg(edge_status_topic)

def setup_listener_for_run_metrics(self, run_id):
Expand Down Expand Up @@ -1857,12 +1858,12 @@ def send_training_stop_request_to_edges(
payload_obj = json.loads(payload)

for edge_id in edge_id_list:
topic_stop_train = "flserver_agent/" + str(edge_id) + "/stop_train"
topic_stop_train = MqttTopics.server_client_stop_train(client_id=edge_id)
logging.info("stop_train: send topic " + topic_stop_train)
self.message_center.send_message(topic_stop_train, json.dumps(payload_obj))

def send_training_stop_request_to_specific_edge(self, edge_id, payload):
topic_stop_train = "flserver_agent/" + str(edge_id) + "/stop_train"
topic_stop_train = MqttTopics.server_client_stop_train(client_id=edge_id)
logging.info("stop_train: send topic " + topic_stop_train)
self.message_center.send_message(topic_stop_train, payload)

Expand Down
7 changes: 3 additions & 4 deletions python/fedml/core/mlops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from fedml import constants
from fedml.computing.scheduler.comm_utils import sys_utils
from fedml.core.mlops.mlops_configs import MLOpsConfigs
from .mlops_constants import MLOpsConstants
from .mlops_metrics import MLOpsMetrics
from .mlops_profiler_event import MLOpsProfilerEvent
from .mlops_runtime_log import MLOpsRuntimeLog
Expand All @@ -27,6 +26,7 @@
from .system_stats import SysStats
from ..distributed.communication.mqtt.mqtt_manager import MqttManager
from ..distributed.communication.s3.remote_storage import S3Storage
from ...computing.scheduler.comm_utils.mqtt_topics import MqttTopics
from ...computing.scheduler.master.server_constants import ServerConstants
from ...computing.scheduler.master.server_runner import FedMLServerRunner
from ...computing.scheduler.slave.client_constants import ClientConstants
Expand Down Expand Up @@ -1444,7 +1444,7 @@ def release_resources(run_id, device_id):

payload = {"run_id": run_id, "device_id": device_id, "gpu_count": 0}
MLOpsStore.mlops_log_mqtt_mgr.send_message_json(
MLOpsConstants.MSG_TOPIC_LAUNCH_RELEASE_GPU_IDS, json.dumps(payload))
MqttTopics.launch_mlops_release_gpu_ids(), json.dumps(payload))


def sync_deploy_id(device_id, master_deploy_id, worker_deploy_id_list):
Expand All @@ -1454,5 +1454,4 @@ def sync_deploy_id(device_id, master_deploy_id, worker_deploy_id_list):

payload = {"device_id": device_id, "master_deploy_id": master_deploy_id, "worker_deploy_ids": worker_deploy_id_list}
MLOpsStore.mlops_log_mqtt_mgr.send_message_json(
MLOpsConstants.MSG_TOPIC_LAUNCH_SYNC_DEPLOY_IDS, json.dumps(payload))

MqttTopics.launch_mlops_sync_deploy_ids(), json.dumps(payload))
10 changes: 0 additions & 10 deletions python/fedml/core/mlops/mlops_constants.py

This file was deleted.

3 changes: 2 additions & 1 deletion python/fedml/core/mlops/mlops_device_perfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .mlops_utils import MLOpsUtils
from .system_stats import SysStats
from ...computing.scheduler.comm_utils.job_monitor import JobMonitor
from ...computing.scheduler.comm_utils.mqtt_topics import MqttTopics
from ...core.distributed.communication.mqtt.mqtt_manager import MqttManager

ROLE_DEVICE_INFO_REPORTER = 1
Expand Down Expand Up @@ -163,7 +164,7 @@ def report_gpu_device_info(edge_id, mqtt_mgr=None):
total_mem, free_mem, total_disk_size, free_disk_size, cup_utilization, cpu_cores, gpu_cores_total, \
gpu_cores_available, sent_bytes, recv_bytes, gpu_available_ids = sys_utils.get_sys_realtime_stats()

topic_name = "ml_client/mlops/gpu_device_info"
topic_name = MqttTopics.client_mlops_gpu_device_info()

# We should report realtime available gpu count to MLOps, not from local redis cache.
# Use gpu_available_ids from sys_utils.get_sys_realtime_stats()
Expand Down
3 changes: 2 additions & 1 deletion python/fedml/core/mlops/mlops_job_perfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from .mlops_utils import MLOpsUtils
from .system_stats import SysStats
from ...computing.scheduler.comm_utils.mqtt_topics import MqttTopics
from ...core.distributed.communication.mqtt.mqtt_manager import MqttManager


Expand Down Expand Up @@ -38,7 +39,7 @@ def report_system_metric(run_id, edge_id, metric_json=None,
if run_id_str == "0" or run_id_str == "":
return

topic_name = "fl_client/mlops/system_performance"
topic_name = MqttTopics.client_mlops_system_performance()
if metric_json is None:
if sys_stats_obj is None:
sys_stats_obj = SysStats(process_id=os.getpid())
Expand Down
8 changes: 4 additions & 4 deletions python/fedml/core/mlops/mlops_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def report_edge_job_computing_cost(self, job_id, edge_id,
"""
this is used for reporting the computing cost of a job running on an edge to MLOps
"""
topic_name = "ml_client/mlops/job_computing_cost"
topic_name = MqttTopics.client_mlops_job_cost()
duration = computing_ended_time - computing_started_time
if duration < 0:
duration = 0
Expand All @@ -367,7 +367,7 @@ def report_edge_job_computing_cost(self, job_id, edge_id,
def report_logs_updated(self, run_id):
# if not self.comm_sanity_check():
# return
topic_name = "mlops/runtime_logs/" + str(run_id)
topic_name = MqttTopics.mlops_runtime_logs_run(run_id=run_id)
msg = {"time": time.time()}
message_json = json.dumps(msg)
logging.info("report_logs_updated. message_json = %s" % message_json)
Expand All @@ -377,7 +377,7 @@ def report_artifact_info(self, job_id, edge_id, artifact_name, artifact_type,
artifact_local_path, artifact_url,
artifact_ext_info, artifact_desc,
timestamp):
topic_name = "launch_device/mlops/artifacts"
topic_name = MqttTopics.launch_mlops_artifacts()
artifact_info_json = {
"job_id": job_id,
"edge_id": edge_id,
Expand All @@ -394,7 +394,7 @@ def report_artifact_info(self, job_id, edge_id, artifact_name, artifact_type,

def report_endpoint_status(self, end_point_id, model_status, timestamp=None,
end_point_name="", model_name="", model_inference_url=""):
deployment_status_topic_prefix = "model_ops/model_device/return_deployment_status"
deployment_status_topic_prefix = MqttTopics.deploy_mlops_status()
deployment_status_topic = "{}/{}".format(deployment_status_topic_prefix, end_point_id)
time_param = time.time_ns() / 1000.0 if timestamp is None else timestamp
deployment_status_payload = {"end_point_id": end_point_id, "end_point_name": end_point_name,
Expand Down
Loading