步骤一电脑安装Python下载链接下载Python |Python.org步骤二安装依赖环境cmd下执行以下命令pip install flask出现以下结果代表安装成功步骤三创建回调脚本 agv_recv.pyCMD 输入notepad agv_recv.py弹出记事本后粘贴下面代码from flask import Flask, request app Flask(__name__) # 完全匹配WCS原回调路径/wcs/services/rest/cms/agvCallback app.route(/wcs/services/rest/cms/agvCallback, methods[POST,GET]) def agv_callback(): print( 收到WCS回调数据 ) print(请求头, dict(request.headers)) # 解析JSON报文 if request.is_json: req_data request.get_json() print(回调JSON报文, req_data) else: print(表单参数, request.form) # 返回WCS需要的成功应答 return {code:0,msg:success} if __name__ __main__: # 端口使用23600和WCS服务同端口 app.run(host0.0.0.0, port23600, debugFalse)CtrlS 保存关闭记事本。步骤四CMD 直接执行启动命令python agv_recv.py步骤五执行 curl 命令模拟推送消息curl -X POST http://127.0.0.1:23600/wcs/services/rest/cms/agvCallback -H Content-Type:application/json -d {\agvCode\:\AGV001\,\taskNo\:\T2026060301\}执行后回到运行服务的 CMD即可打印出收到的回调 JSON 数据。补充说明1. 将打印数据保存至文件中from flask import Flask, request import datetime app Flask(__name__) app.route(/wcs/services/rest/cms/agvCallback, methods[POST,GET]) def agv_callback(): print( 收到WCS回调数据 ) headers dict(request.headers) print(请求头, headers) if request.is_json: req_data request.get_json() print(回调JSON报文, req_data) # 新增落地文件 with open(agv_data.log,a,encodingutf-8) as f: time_str datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S) f.write(f【{time_str}】报文{req_data}\n) else: print(表单参数, request.form) return {code:0,msg:success} if __name__ __main__: app.run(host0.0.0.0, port23600, debugFalse)2. 按日期自动生成日志文件from flask import Flask, request import datetime, os app Flask(__name__) # 日志存放文件夹 LOG_DIR agv_logs # 文件夹不存在自动创建 if not os.path.exists(LOG_DIR): os.mkdir(LOG_DIR) app.route(/wcs/services/rest/cms/agvCallback, methods[POST,GET]) def agv_callback(): now datetime.datetime.now() time_full now.strftime(%Y-%m-%d %H:%M:%S) date_str now.strftime(%Y%m%d) # 日期作为日志文件名 print(f【{time_full}】收到WCS回调数据 ) headers dict(request.headers) print(请求头, headers) # 拼接当日日志完整路径 log_file os.path.join(LOG_DIR, fagv_{date_str}.log) if request.is_json: req_data request.get_json() print(回调JSON报文, req_data) # 写入当天的日志文件 with open(log_file,a,encodingutf-8) as f: f.write(f【{time_full}】请求头{headers}\n【报文内容】{req_data}\n-------------------------\n) else: form_data request.form print(表单参数, form_data) with open(log_file,a,encodingutf-8) as f: f.write(f【{time_full}】请求头{headers}\n【表单内容】{form_data}\n-------------------------\n) return {code:0,msg:success} if __name__ __main__: app.run(host0.0.0.0, port23600, debugFalse)
使用Python接受接口回调信息
发布时间:2026/6/4 1:27:13
步骤一电脑安装Python下载链接下载Python |Python.org步骤二安装依赖环境cmd下执行以下命令pip install flask出现以下结果代表安装成功步骤三创建回调脚本 agv_recv.pyCMD 输入notepad agv_recv.py弹出记事本后粘贴下面代码from flask import Flask, request app Flask(__name__) # 完全匹配WCS原回调路径/wcs/services/rest/cms/agvCallback app.route(/wcs/services/rest/cms/agvCallback, methods[POST,GET]) def agv_callback(): print( 收到WCS回调数据 ) print(请求头, dict(request.headers)) # 解析JSON报文 if request.is_json: req_data request.get_json() print(回调JSON报文, req_data) else: print(表单参数, request.form) # 返回WCS需要的成功应答 return {code:0,msg:success} if __name__ __main__: # 端口使用23600和WCS服务同端口 app.run(host0.0.0.0, port23600, debugFalse)CtrlS 保存关闭记事本。步骤四CMD 直接执行启动命令python agv_recv.py步骤五执行 curl 命令模拟推送消息curl -X POST http://127.0.0.1:23600/wcs/services/rest/cms/agvCallback -H Content-Type:application/json -d {\agvCode\:\AGV001\,\taskNo\:\T2026060301\}执行后回到运行服务的 CMD即可打印出收到的回调 JSON 数据。补充说明1. 将打印数据保存至文件中from flask import Flask, request import datetime app Flask(__name__) app.route(/wcs/services/rest/cms/agvCallback, methods[POST,GET]) def agv_callback(): print( 收到WCS回调数据 ) headers dict(request.headers) print(请求头, headers) if request.is_json: req_data request.get_json() print(回调JSON报文, req_data) # 新增落地文件 with open(agv_data.log,a,encodingutf-8) as f: time_str datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S) f.write(f【{time_str}】报文{req_data}\n) else: print(表单参数, request.form) return {code:0,msg:success} if __name__ __main__: app.run(host0.0.0.0, port23600, debugFalse)2. 按日期自动生成日志文件from flask import Flask, request import datetime, os app Flask(__name__) # 日志存放文件夹 LOG_DIR agv_logs # 文件夹不存在自动创建 if not os.path.exists(LOG_DIR): os.mkdir(LOG_DIR) app.route(/wcs/services/rest/cms/agvCallback, methods[POST,GET]) def agv_callback(): now datetime.datetime.now() time_full now.strftime(%Y-%m-%d %H:%M:%S) date_str now.strftime(%Y%m%d) # 日期作为日志文件名 print(f【{time_full}】收到WCS回调数据 ) headers dict(request.headers) print(请求头, headers) # 拼接当日日志完整路径 log_file os.path.join(LOG_DIR, fagv_{date_str}.log) if request.is_json: req_data request.get_json() print(回调JSON报文, req_data) # 写入当天的日志文件 with open(log_file,a,encodingutf-8) as f: f.write(f【{time_full}】请求头{headers}\n【报文内容】{req_data}\n-------------------------\n) else: form_data request.form print(表单参数, form_data) with open(log_file,a,encodingutf-8) as f: f.write(f【{time_full}】请求头{headers}\n【表单内容】{form_data}\n-------------------------\n) return {code:0,msg:success} if __name__ __main__: app.run(host0.0.0.0, port23600, debugFalse)