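# DolphinDB Industrial Data Quality: Completeness Checks and Repair

## Abstract

This article covers data quality management for industrial data in DolphinDB: completeness checks, consistency validation, quality scoring, automatic repair, and ongoing quality monitoring. Each topic is illustrated with DolphinDB script examples.

## 1. Data Quality Overview

### 1.1 Data Quality Dimensions

High-quality data rests on five dimensions: completeness, accuracy, consistency, timeliness, and validity.

### 1.2 Quality Metrics

| Metric | Description |
| --- | --- |
| Completeness | Whether the data is complete (no missing fields, records, or time points) |
| Accuracy | Whether the data is correct |
| Consistency | Whether the data is consistent with rules and references |
| Timeliness | Whether the data arrives on time |
| Validity | Whether the data is valid |

## 2. Completeness Checks

### 2.1 Field Completeness

```
// Create test data
t = table(1..100 as id,
          take([1, NULL, 3], 100) as device_id,
          take([25.0, NULL, 27.0], 100) as temperature,
          take([50.0, 51.0, NULL], 100) as humidity)

// Count the NULLs in each column and express them as a percentage of all rows
def checkFieldCompleteness(data){
    nullCounts = each(def(x): sum(isNull(x)), data.values())
    return table(data.columnNames() as field_name,
                 nullCounts as null_count,
                 nullCounts * 100.0 / data.rows() as null_rate)
}

// Usage
checkFieldCompleteness(t)
```

### 2.2 Record Completeness

```
// Build one string key per row from the key columns (keyColumns is a STRING vector).
// Values are joined with "|", so this assumes the key values do not contain "|".
def buildCompositeKey(data, keyColumns){
    key = take("", data.rows())
    for(col in keyColumns){
        key = key + "|" + string(data[col])
    }
    return key
}

// Check record completeness
def checkRecordCompleteness(data, keyColumns){
    // Primary key completeness: rows in which any key column is NULL
    keyNull = take(false, data.rows())
    for(col in keyColumns){
        keyNull = keyNull || isNull(data[col])
    }
    // Duplicate records: key values that occur more than once
    keyTable = table(buildCompositeKey(data, keyColumns) as k)
    duplicates = select count(*) as cnt from keyTable group by k having count(*) > 1
    return dict(["keyNullCount", "duplicateCount"], (sum(keyNull), duplicates.rows()))
}

// Usage
checkRecordCompleteness(t, ["id"])
```

### 2.3 Time Completeness

```
// Check time series completeness.
// interval is expressed in the unit of the time column (milliseconds for TIMESTAMP).
def checkTimeCompleteness(data, timeCol, interval){
    // Time range
    minTime = min(data[timeCol])
    maxTime = max(data[timeCol])
    // Expected number of records
    expectedCount = (maxTime - minTime) / interval + 1
    // Actual number of records (assumes at most one record per time point)
    actualCount = data.rows()
    // Missing records
    missingCount = expectedCount - actualCount
    return dict(["expectedCount", "actualCount", "missingCount", "completenessRate"],
                (expectedCount, actualCount, missingCount, actualCount * 100.0 / expectedCount))
}
```

## 3. Consistency Checks

### 3.1 Data Consistency

```
// Check data consistency.
// rules is a table with a name column and a condition column (a boolean expression as a string).
def checkDataConsistency(data, rules){
    results = array(STRING, 0)
    names = rules["name"]
    conditions = rules["condition"]
    n = rules.rows()
    for(i in 0:n){
        // Select the rows that do NOT satisfy the rule.
        // Rows with NULL in the tested column may also be reported,
        // depending on how the server compares NULL values.
        violations = sql(select=sqlCol("*"), from=data, where=parseExpr("!(" + conditions[i] + ")")).eval()
        if(violations.rows() > 0){
            results.append!(names[i] + ": " + string(violations.rows()) + " violations")
        }
    }
    return results
}

// Define rules
rules = table(["temperature range", "humidity range"] as name,
              ["temperature >= -40 && temperature <= 100", "humidity >= 0 && humidity <= 100"] as condition)

// Usage
checkDataConsistency(t, rules)
```

### 3.2 Reference Consistency

```
// Check reference consistency: every value in data[dataCol] should exist in refTable[refCol]
def checkReferenceConsistency(data, refTable, dataCol, refCol){
    // Find invalid references
    isValid = data[dataCol] in refTable[refCol]
    invalidRefs = data[!isValid]
    return dict(["invalidCount", "invalidRecords"], (invalidRefs.rows(), invalidRefs))
}
```

### 3.3 Business Consistency

```
// Check business consistency
def checkBusinessConsistency(data){
    // Example: humidity should fall as temperature rises,
    // so rows that are both hot and humid are flagged as inconsistent
    inconsistent = select * from data where temperature > 30 and humidity > 70
    return inconsistent
}
```

## 4. Data Quality Scoring

### 4.1 Quality Scoring Function

```
// Data quality score
def calculateQualityScore(data){
    scores = dict(STRING, DOUBLE)

    // Completeness score: 100 minus the average NULL rate across columns
    nullRates = each(def(x): sum(isNull(x)) * 100.0 / size(x), data.values())
    scores["completeness"] = 100 - avg(nullRates)

    // Accuracy score, based on the share of outliers (3-sigma rule) in numeric columns.
    // Note that integer identifier columns are included as well.
    outlierRates = array(DOUBLE, 0)
    for(col in data.columnNames()){
        x = data[col]
        if(type(x) in [INT, LONG, FLOAT, DOUBLE]){
            avgVal = avg(x)
            stdVal = std(x)
            outlierRate = sum(abs(x - avgVal) > 3 * stdVal) * 100.0 / data.rows()
            outlierRates.append!(outlierRate)
        }
    }
    scores["accuracy"] = 100 - avg(outlierRates)

    // Total score
    scores["total"] = (scores["completeness"] + scores["accuracy"]) / 2
    return scores
}

// Usage
calculateQualityScore(t)
```

### 4.2 Quality Report

```
// Generate a quality report
def generateQualityReport(data){
    report = dict(STRING, ANY)
    // Basic information
    report["totalRows"] = data.rows()
    report["totalColumns"] = data.columns()
    // Completeness
    report["completeness"] = checkFieldCompleteness(data)
    // Quality scores
    report["scores"] = calculateQualityScore(data)
    return report
}

// Usage
report = generateQualityReport(t)
print(report)
```

## 5. Automatic Repair

The repair functions below process only FLOAT and DOUBLE columns. Integer columns are typically identifiers such as `id` or `device_id`, for which a mean fill or a clip would produce meaningless values; adjust the type check if integer measurements need repair.

### 5.1 Missing Value Repair

```
// Automatically repair missing values; strategy is "mean", "median", or "zero"
def autoFixMissingValues(data, strategy="mean"){
    result = select * from data
    for(col in data.columnNames()){
        x = data[col]
        if(type(x) in [FLOAT, DOUBLE]){
            if(strategy == "mean"){
                replaceColumn!(result, col, nullFill(x, avg(x)))
            } else if(strategy == "median"){
                replaceColumn!(result, col, nullFill(x, med(x)))
            } else if(strategy == "zero"){
                replaceColumn!(result, col, nullFill(x, 0.0))
            }
        }
    }
    return result
}
```

### 5.2 Outlier Repair

```
// Automatically repair outliers (3-sigma rule); method is "clip" or "remove"
def autoFixOutliers(data, method="clip"){
    result = select * from data
    keep = take(true, data.rows())
    for(col in data.columnNames()){
        x = data[col]
        if(type(x) in [FLOAT, DOUBLE]){
            avgVal = avg(x)
            stdVal = std(x)
            lower = avgVal - 3 * stdVal
            upper = avgVal + 3 * stdVal
            if(method == "clip"){
                // Leave NULLs untouched; clamp all other values to [lower, upper]
                clipped = iif(x < lower, lower, iif(x > upper, upper, x))
                replaceColumn!(result, col, iif(isNull(x), x, clipped))
            } else if(method == "remove"){
                // Mark rows for removal; filtering once at the end keeps the row masks aligned
                keep = keep && (isNull(x) || (x >= lower && x <= upper))
            }
        }
    }
    return result[keep]
}
```

### 5.3 Duplicate Repair

```
// Automatically repair duplicates: keep the first row for each key
// (uses buildCompositeKey from section 2.2)
def autoFixDuplicates(data, keyColumns){
    return data[!isDuplicated(buildCompositeKey(data, keyColumns), FIRST)]
}
```

## 6. Quality Monitoring

### 6.1 Quality Monitoring Table

```
// Create the quality monitoring table
share table(1:0, `check_time`table_name`check_type`score`details, [TIMESTAMP, STRING, STRING, DOUBLE, STRING]) as quality_log

// Record a quality check
def logQualityCheck(tableName, checkType, score, details){
    insert into quality_log values(now(), tableName, checkType, score, details)
}
```

### 6.2 Scheduled Quality Checks

```
// Scheduled quality check job
def scheduledQualityCheck(){
    // Load the data into memory; in production, restrict the query to a recent time window
    t = select * from loadTable("dfs://iot_db", "sensor_data")
    scores = calculateQualityScore(t)
    // Completeness check
    logQualityCheck("sensor_data", "completeness", scores["completeness"], "")
    // Accuracy check
    logQualityCheck("sensor_data", "accuracy", scores["accuracy"], "")
}

// Run the job daily at 00:00
scheduleJob("quality_check", "Data quality check", scheduledQualityCheck, 00:00m, 2024.01.01, 2030.12.31, 'D')
```

## 7. Case Study

### 7.1 Data Quality Management Platform

```
// Data quality management platform

// 1. Create the quality check pipeline
def qualityCheckPipeline(data, tableName){
    print("Data quality check: " + tableName)

    // Completeness check
    completeness = checkFieldCompleteness(data)
    print("Field completeness:")
    print(completeness)

    // Consistency check
    rules = table(["temperature range", "humidity range"] as name,
                  ["temperature >= -40 && temperature <= 100", "humidity >= 0 && humidity <= 100"] as condition)
    consistency = checkDataConsistency(data, rules)
    print("Consistency check: " + concat(consistency, "; "))

    // Quality scores
    scores = calculateQualityScore(data)
    print("Quality scores:")
    print(scores)

    // Log the result
    logQualityCheck(tableName, "total", scores["total"], "")
    return scores
}

// 2. Create test data: one reading per minute,
//    with 30 missing values per measurement and 20 temperature outliers
ts = 2024.01.01T00:00:00.000 + (0..999) * 60000
temperature = join(join(20.0 + rand(10.0, 950), take(00F, 30)), 100.0 + rand(100.0, 20))
humidity = join(40.0 + rand(20.0, 970), take(00F, 30))
t = table(1..1000 as id, take(1..10, 1000) as device_id, ts as timestamp,
          temperature as temperature, humidity as humidity)

// 3. Run the quality check
qualityCheckPipeline(t, "sensor_data")

// 4. Automatic repair
fixed = autoFixMissingValues(t, "mean")
fixed = autoFixOutliers(fixed, "clip")

// 5. Check again
qualityCheckPipeline(fixed, "sensor_data_fixed")
print("Data quality management complete")
```

## 8. Summary

This article covered data quality management for industrial data in DolphinDB:

- Completeness checks: field completeness, record completeness, time completeness
- Consistency checks: data consistency, reference consistency, business consistency
- Quality scoring: scoring function, quality report
- Automatic repair: missing values, outliers, duplicates
- Quality monitoring: monitoring table, scheduled checks

Questions for further thought:

1. How would you design a system of data quality metrics?
2. How do you balance the degree of automation in data repair?
3. How can data quality be improved continuously?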
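
A note on the missing-value strategies in section 5.1: for sensor time series, carrying the last observed reading forward is often a closer estimate than a column-wide mean, because neighboring readings tend to be similar. The following is a minimal sketch of that idea, not part of the original pipeline. It assumes the built-in `ffill` function and uses a small hypothetical sample table; the rows are assumed to be sorted by time already.

```
// Hypothetical sample: one reading per minute, with gaps in temperature
ts = 2024.01.01T00:00:00.000 + (0..5) * 60000
temperature = [25.0, NULL, NULL, 26.5, NULL, 27.0]
sample = table(ts as ts, temperature as temperature)

// ffill replaces each NULL with the most recent non-NULL value before it
filled = select ts, ffill(temperature) as temperature from sample
```

A NULL at the very start of the series has no earlier value to copy and stays NULL, so a fallback such as the mean strategy may still be needed for leading gaps.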