
FATE Study: Reading the Source Code Along with the Logs (11): Requests Generated During an upload Task

程序员文章站 (Programmer Article Station) 2022-07-14 13:31:25

Overview

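The HTTP requests of an upload task fall into two groups.
The first group is produced by the flow from submit to job finish.
The second group is produced by polling (each poll only collects status information; it does not execute anything).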

Execution details

The log below is the complete container log from one upload run.

10.200.96.235 - - [26/Jul/2021 08:20:31] "POST /v1/party/202107260820309976351/local/0/create HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:31] "POST /v1/data/upload?%7B%22file%22:%20%22/data/projects/fate/examples/data/breast_hetero_guest.csv%22,%20%22head%22:%201,%20%22partition%22:%201,%20%22work_mode%22:%201,%20%22table_name%22:%20%22hetero_guest%22,%20%22namespace%22:%20%22cl%22,%20%22config%22:%20%22/data/projects/fate/cl/upload_guest.json%22,%20%22function%22:%20%22upload%22%7D HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/resource/apply HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/local/0/start HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/running HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:32] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/start HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:33] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:34] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:36] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:38] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:40] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:41] "POST /v1/party/202107260820309976351/local/0/update HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:42] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/output_data_info/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_data/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:43] "POST /v1/tracker/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/metric_meta/save HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:44] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:45] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/collect HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:45] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/report HTTP/1.1" 200 -
static conf path: /data/projects/fate/eggroll/conf/eggroll.properties
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/status/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/upload_0/202107260820309976351_upload_0/0/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/model HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/status/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:47] "POST /v1/party/202107260820309976351/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/clean HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/stop/success HTTP/1.1" 200 -
10.200.96.235 - - [26/Jul/2021 08:20:48] "POST /v1/party/202107260820309976351/local/0/clean HTTP/1.1" 200 -

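The entries whose endpoint is collect are the log lines produced by the second group. Except for the last record, they arrive 2 seconds apart, which is consistent with the polling interval.

The remaining entries belong to the first group.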
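
The spacing of the collect requests can be checked directly from the log. The following is a minimal sketch, assuming the access-log line format shown above. The helper names `log_line` and `collect_intervals` are made up for this illustration, and the sample lines are rebuilt from the timestamps in the log rather than read from a file.

```python
import re
from datetime import datetime

# Matches the bracketed timestamp and the request path of one access-log line.
LOG_RE = re.compile(r'\[(?P<ts>[^\]]+)\] "POST (?P<path>\S+) HTTP/1\.1"')

JOB = "202107260820309976351"
TASK_PATH = f"/v1/party/{JOB}/upload_0/{JOB}_upload_0/0/local/0"


def log_line(time_of_day, endpoint):
    """Rebuild one task-level log line in the format shown above (illustrative helper)."""
    return (f'10.200.96.235 - - [26/Jul/2021 {time_of_day}] '
            f'"POST {TASK_PATH}/{endpoint} HTTP/1.1" 200 -')


def collect_intervals(lines):
    """Return the gaps, in whole seconds, between consecutive collect requests."""
    stamps = []
    for line in lines:
        match = LOG_RE.search(line)
        if match and match.group("path").endswith("/collect"):
            stamps.append(datetime.strptime(match.group("ts"), "%d/%b/%Y %H:%M:%S"))
    return [int((later - earlier).total_seconds())
            for earlier, later in zip(stamps, stamps[1:])]


# Timestamps of the report and collect requests, taken from the log above.
sample = [
    log_line("08:20:33", "report"),
    log_line("08:20:34", "collect"),
    log_line("08:20:36", "collect"),
    log_line("08:20:38", "collect"),
    log_line("08:20:40", "collect"),
    log_line("08:20:42", "collect"),
    log_line("08:20:44", "report"),
    log_line("08:20:45", "collect"),
]

print(collect_intervals(sample))  # [2, 2, 2, 2, 3]
```

Only the final gap differs from the others, which matches the observation about the last record.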

HTTP requests from job submission

Each request is described in turn, with its purpose and its function call chain.

  1. data/upload:
    Submit the job.
    fate_flow_client.py -> fate_flow_server.py -> data_access_app_manager.py -> DAGScheduler.submit

  2. party/<job_id>/<role>/<party_id>/create:
    Create the job (continuing from the previous step).
    DAGScheduler.submit -> FederatedScheduler.create_job -> fate_flow_server.py -> party_app.py -> JobController.create_job

  3. party/<job_id>/<role>/<party_id>/resource/apply:
    Apply for resources for the job.
    DAGScheduler.schedule_waiting_jobs -> FederatedScheduler.resource_for_job -> fate_flow_server.py -> party_app.py -> ResourceManager.apply_for_job_resource

  4. party/<job_id>/<role>/<party_id>/start:
    Start the job.
    DAGScheduler.schedule_waiting_jobs -> DAGScheduler.start_job -> FederatedScheduler.start_job(job=job) -> fate_flow_server.py -> party_app.py -> JobController.start_job
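
The data/upload request in the log carries its parameters as URL-encoded JSON in the query string, which is hard to read in raw form. The following is a small sketch that decodes it with the standard library. The function name `decode_upload_query` is invented for this example, and the request target is copied from the log above.

```python
import json
from urllib.parse import unquote

# Request target of the data/upload call, copied from the log and split for readability.
UPLOAD_TARGET = (
    "/v1/data/upload?%7B%22file%22:%20%22/data/projects/fate/examples/data/breast_hetero_guest.csv%22,"
    "%20%22head%22:%201,%20%22partition%22:%201,%20%22work_mode%22:%201,"
    "%20%22table_name%22:%20%22hetero_guest%22,%20%22namespace%22:%20%22cl%22,"
    "%20%22config%22:%20%22/data/projects/fate/cl/upload_guest.json%22,"
    "%20%22function%22:%20%22upload%22%7D"
)


def decode_upload_query(target):
    """Percent-decode the query string and parse it as JSON."""
    _, _, query = target.partition("?")
    return json.loads(unquote(query))


params = decode_upload_query(UPLOAD_TARGET)
for key, value in params.items():
    print(f"{key}: {value}")
```

The decoded dictionary shows the uploaded file, the target table_name and namespace, and the path of the job configuration file.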

Steps 5-7 below start the task and update its status in turn.

  5. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/status/running:
    Update the task status.
    DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.start_task -> FederatedScheduler.sync_task_status -> fate_flow_server.py -> party_app.py -> TaskController.update_task_status

  6. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/start:
    Start the task, immediately after the FederatedScheduler.sync_task_status call above.
    DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.start_task -> FederatedScheduler.start_task -> fate_flow_server.py -> party_app.py -> TaskController.start_task

  7. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
    Report the status back.
    TaskExecutor.run_task -> TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task
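
Task-level paths are longer than job-level ones because they also carry the component name, task id and task version. As a rough illustration, the sketch below splits a task-level path from the log into named fields. The regular expression and the function `parse_task_path` are written for this article only, and the field name `role` for the `local` segment is an assumption based on how the log lines look.

```python
import re

# One named group per path segment; everything after party_id is the action.
TASK_ROUTE = re.compile(
    r"^/v1/party/(?P<job_id>[^/]+)/(?P<component_name>[^/]+)/(?P<task_id>[^/]+)"
    r"/(?P<task_version>[^/]+)/(?P<role>[^/]+)/(?P<party_id>[^/]+)/(?P<action>.+)$"
)


def parse_task_path(path):
    """Return the fields of a task-level path, or None for shorter (job-level) paths."""
    match = TASK_ROUTE.match(path)
    return match.groupdict() if match else None


task_path = ("/v1/party/202107260820309976351/upload_0/"
             "202107260820309976351_upload_0/0/local/0/status/running")
job_path = "/v1/party/202107260820309976351/local/0/start"

fields = parse_task_path(task_path)
print(fields["component_name"], fields["task_id"], fields["action"])
print(parse_task_path(job_path))  # None: too few segments for a task-level path
```

In this log the task id is the job id and the component name joined by an underscore.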

Requests 8-11 below are all produced while the upload task is executing.

  8. party/<job_id>/<role>/<party_id>/update:
    Update the job status.
    Upload.save_data_table -> ControllerClient.update_job -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> JobController.update_job

  9. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/output_data_info/save:
    Save output_data_info.
    Upload.save_data_table -> TrackerClient.log_output_data_info -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.insert_output_data_info_into_db

  10. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/metric_data/save:
    Save the metrics.
    Upload.save_data_table -> TrackerClient.log_metric_data -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.save_metric_data

  11. tracker/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/metric_meta/save:
    Save the metric metadata.
    Upload.save_data_table -> TrackerClient.log_metric_meta -> api_utils.local_api -> fate_flow_server.py -> tracker_app.py -> Tracker.save_metric_meta

Back in task_executor, the following request is sent from the finally block and updates the task status.

  12. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
    TaskExecutor.run_task -> TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task
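
Sending the report from a finally block means the driver hears about the outcome whether the component succeeded or raised. The sketch below shows only that general pattern and is not the actual TaskExecutor code. The function `run_task`, the `report` callback and the "failed" status string are hypothetical stand-ins.

```python
def run_task(component, report):
    """Run a component and always report the final status, even on failure.

    `component` is any zero-argument callable and `report` receives a status
    string; both are placeholders for this illustration.
    """
    status = "running"
    report(status)              # first report, sent when execution begins
    try:
        component()
        status = "success"
    except Exception:
        status = "failed"       # hypothetical status name for the error case
    finally:
        report(status)          # sent from finally, so it is never skipped
    return status


def broken_component():
    raise RuntimeError("simulated failure")


ok_reports, bad_reports = [], []
run_task(lambda: None, ok_reports.append)
run_task(broken_component, bad_reports.append)
print(ok_reports)   # ['running', 'success']
print(bad_reports)  # ['running', 'failed']
```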

Once TaskExecutor.run_task() has finished, TaskExecutor.report_task_update_to_driver() is executed.

  13. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/report:
    TaskExecutor.report_task_update_to_driver -> ControllerClient.report_task -> api_utils.local_api -> fate_flow_server.py -> party_app.py -> TaskController.update_task

Tip: after this round of schedule_running_job, the task is already in the success state.

  14. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/status/success:
    DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> FederatedScheduler.sync_task_status -> fate_flow_server.py -> party_app.py -> TaskController.update_task_status

Tip: because the task has already succeeded, scheduling moves on to the next stage.

  15. party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/stop/success:
    DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> FederatedScheduler.stop_task (right after the sync_task_status call in the previous step) -> fate_flow_server.py -> party_app.py -> TaskController.stop_task

  16. party/<job_id>/<role>/<party_id>/model:
    DAGScheduler.schedule_running_job -> FederatedScheduler.save_pipelined_model -> fate_flow_server.py -> party_app.py -> JobController.save_pipelined_model

  17. party/<job_id>/<role>/<party_id>/status/success:
    DAGScheduler.schedule_running_job -> FederatedScheduler.sync_job_status -> fate_flow_server.py -> party_app.py -> JobController.update_job_status

  18. party/<job_id>/<role>/<party_id>/stop/success:
    DAGScheduler.schedule_running_job -> DAGScheduler.finish -> DAGScheduler.stop_job -> FederatedScheduler.stop_job -> fate_flow_server.py -> party_app.py -> JobController.stop_jobs

  19. party/<job_id>/<role>/<party_id>/clean:
    DAGScheduler.schedule_running_job -> DAGScheduler.finish -> FederatedScheduler.clean_job -> fate_flow_server.py -> party_app.py -> JobController.clean_job

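The finish step then runs once more to clean up completely; see "FATE Study: Reading the Source Code Along with the Logs (9): the job finish stage of an upload task".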

  20. party/<job_id>/<role>/<party_id>/stop/success:
    DAGScheduler.schedule_running_job -> DAGScheduler.finish -> DAGScheduler.stop_job -> FederatedScheduler.stop_job -> fate_flow_server.py -> party_app.py -> JobController.stop_jobs

  21. party/<job_id>/<role>/<party_id>/clean:
    DAGScheduler.schedule_running_job -> DAGScheduler.finish -> FederatedScheduler.clean_job -> fate_flow_server.py -> party_app.py -> JobController.clean_job
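
The repeated finish step is visible in the tail of the log, where stop/success and clean each appear twice. The sketch below counts the job-level actions in that tail. The paths are copied from the log, and the helper `job_action` is a name made up for this example.

```python
from collections import Counter

JOB = "202107260820309976351"

# Job-level request paths from the last six lines of the log, in order.
tail = [
    f"/v1/party/{JOB}/local/0/model",
    f"/v1/party/{JOB}/local/0/status/success",
    f"/v1/party/{JOB}/local/0/stop/success",
    f"/v1/party/{JOB}/local/0/clean",
    f"/v1/party/{JOB}/local/0/stop/success",
    f"/v1/party/{JOB}/local/0/clean",
]


def job_action(path):
    """Return everything after /v1/party/<job_id>/local/<party_id>/ in a job-level path."""
    return path.split("/", 6)[6]


counts = Counter(job_action(p) for p in tail)
repeated = [action for action, n in counts.items() if n > 1]
print(repeated)  # ['stop/success', 'clean']
```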

The polling part

party/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/collect:
DAGScheduler.schedule_running_job -> TaskScheduler.schedule -> TaskScheduler.collect_task_of_all_party -> FederatedScheduler.collect_task -> party_app.py -> TaskController.collect_task
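
The endpoint-to-handler pairs described in this article can be gathered into one lookup table, which is handy when reading a log line and wanting to jump to the matching source. The following is only a reading aid built from the call chains listed here, not code from FATE. The function `handler_for` and the way it tells job-level from task-level paths (by counting segments) are assumptions made for this sketch.

```python
# Final handler of each call chain, keyed by the action at the end of the path.
JOB_HANDLERS = {
    "create": "JobController.create_job",
    "resource/apply": "ResourceManager.apply_for_job_resource",
    "start": "JobController.start_job",
    "update": "JobController.update_job",
    "model": "JobController.save_pipelined_model",
    "status": "JobController.update_job_status",
    "stop": "JobController.stop_jobs",
    "clean": "JobController.clean_job",
}
TASK_HANDLERS = {
    "status": "TaskController.update_task_status",
    "start": "TaskController.start_task",
    "report": "TaskController.update_task",
    "stop": "TaskController.stop_task",
    "collect": "TaskController.collect_task",
}
TRACKER_HANDLERS = {
    "output_data_info/save": "Tracker.insert_output_data_info_into_db",
    "metric_data/save": "Tracker.save_metric_data",
    "metric_meta/save": "Tracker.save_metric_meta",
}


def handler_for(path):
    """Map a request path from the log to the handler named in this article."""
    parts = path.split("?", 1)[0].strip("/").split("/")
    if len(parts) < 3 or parts[0] != "v1":
        return None
    group, rest = parts[1], parts[2:]
    if group == "data":
        return "DAGScheduler.submit" if rest == ["upload"] else None
    if group == "tracker":
        # Six identifying segments come first, then the action.
        return TRACKER_HANDLERS.get("/".join(rest[6:]))
    if group != "party":
        return None
    # Task-level paths have six identifying segments, job-level paths have three.
    task_level = len(rest) >= 7
    action = rest[6:] if task_level else rest[3:]
    if not action:
        return None
    # status/<status> and stop/<status> carry a variable last segment.
    key = action[0] if action[0] in ("status", "stop") else "/".join(action)
    return (TASK_HANDLERS if task_level else JOB_HANDLERS).get(key)


JOB = "202107260820309976351"
TASK = f"{JOB}/upload_0/{JOB}_upload_0/0/local/0"
print(handler_for(f"/v1/party/{TASK}/collect"))
print(handler_for(f"/v1/party/{JOB}/local/0/resource/apply"))
print(handler_for(f"/v1/tracker/{TASK}/metric_data/save"))
```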