【Elasticsearch从入门到精通】第16篇:Elasticsearch批量操作API——Bulk、Reindex与跨集群索引 上一篇【第15篇】 Elasticsearch删除与更新API——精确操作与脚本更新下一篇【第17篇】Elasticsearch并发控制——refresh参数与乐观并发控制摘要在实际生产环境中单条文档操作往往无法满足性能需求批量操作API是Elasticsearch高性能数据处理的基石。本文全面介绍了Elasticsearch的批量操作能力涵盖Bulk API的NDJSON格式规范元数据行数据行的两行结构、四种操作类型index/create/update/delete的混合使用与区别、最优批次大小通常5-15MB的配置策略与错误处理机制。深入解析了Reindex API的数据迁移能力包括源索引到目标索引的完整流程、查询过滤、版本控制internal/external/create三种模式、脚本转换与管道预处理以及跨集群Reindex的远程连接配置和白名单策略。最后介绍了Term向量API_termvectors的词元信息获取、统计信息分析与过滤功能。掌握这些内容将使你能够高效地完成Elasticsearch中的大规模数据操作与迁移任务。一、Bulk API基本格式1.1 NDJSON格式规范Bulk API_bulk允许在单个API调用中执行多个索引和删除操作可以显著提高操作效率。它使用新行分隔的JSONNDJSON格式action_and_meta_data\n optional_source\n action_and_meta_data\n optional_source\n ...注意最后一行数据必须以换行符\n结尾。发送请求时Content-Type应设置为application/x-ndjson。1.2 四种操作类型Bulk API支持以下四种操作操作类型说明需要数据行index索引文档存在则覆盖是create创建文档存在则报错是delete删除文档否update更新文档是doc/upsert/script1.3 完整示例POST_bulk{index:{_index:test,_id:1}}{field1:value1}{delete:{_index:test,_id:2}}{create:{_index:test,_id:3}}{field1:value3}{update:{_id:1,_index:test}}{doc:{field2:value2}}注意请求的 Content-Type 必须设置为application/x-ndjson。如果使用Curl提供文本文件输入必须使用--data-binary标志而非-d。二、Bulk API响应与错误处理2.1 响应结构Bulk API的响应是一个大型JSON结构每个操作的结果按请求中的顺序对应返回{took:30,errors:false,items:[{index:{_index:test,_type:_doc,_id:1,_version:1,result:created,_shards:{total:2,successful:1,failed:0},status:201}},{delete:{_index:test,_type:_doc,_id:2,_version:1,result:not_found,_shards:{total:2,successful:1,failed:0},status:404}}]}2.2 部分失败处理关键特性Bulk API中单个操作的失败不会影响其余操作的执行。即使某个操作报错其他操作仍会正常完成。errors字段仅表示是否存在任何失败操作。2.3 在索引级别指定默认值可以使用/{index}/_bulk端点为所有未显式指定索引的操作设置默认索引POSTtwitter/_bulk{index:{_id:1}}{user:kimchy,message:hello}{delete:{_id:2}}{create:{_id:3}}{user:other,message:world}三、Bulk API的Update操作3.1 Update操作格式Bulk中的Update操作需要两行元数据行和操作内容行。支持以下选项doc、upsert、doc_as_upsert、script、params、lang和_source。POST_bulk{update:{_id:1,_index:index1,retry_on_conflict:3}}{doc:{field:value}}{update:{_id:0,_index:index1,retry_on_conflict:3}}{script:{source:ctx._source.counter params.param1,lang:painless,params:{param1:1}},upsert:{counter:1}}{update:{_id:2,_index:index1,retry_on_conflict:3}}{doc:{field:value},doc_as_upsert:true}3.2 retry_on_conflict参数retry_on_conflict指定在发生版本冲突时重试更新的次数直接写在操作元数据行中{update:{_id:1,_index:test,retry_on_conflict:3}}3.3 _source控制可以控制Update操作后返回的_source内容{update:{_id:3,_index:index1,_source:true}}{doc:{field:value}}也可以放在操作内容行中{update:{_id:4,_index:index1}}{doc:{field:value},_source:true}四、Bulk API最优批次大小4.1 批次大小选择原则Bulk API中没有一个绝对适合所有场景的批次大小应根据具体工作负载进行测试。以下是一些通用建议指标推荐值说明批次大小5-15MB兼顾吞吐量和内存使用单批文档数1000-5000条根据文档大小调整线程数根据客户端CPU核数通常不超CPU核数的2倍4.2 批次大小对比批次大小吞吐量内存占用响应延迟适用场景1-5MB中低低实时写入、低延迟场景5-15MB高中中通用批量导入15-100MB很高高高离线数据迁移注意如果使用HTTP API确保客户端不发送HTTP块传输chunked encoding因为这会降低速度。五、Reindex API数据迁移5.1 基本用法Reindex API_reindex将文档从一个索引复制到另一个索引。最基本的形式如下POST_reindex{source:{index:twitter},dest:{index:new_twitter}}5.2 版本控制Reindex API支持多种版本控制模式internal默认盲目地将文档转储到目标覆盖同ID文档POST_reindex{source:{index:twitter},dest:{index:new_twitter,version_type:internal}}external保留源索引的版本号创建丢失的文档更新旧版本文档POST_reindex{source:{index:twitter},dest:{index:new_twitter,version_type:external}}create仅在目标索引中创建缺少的文档已有文档会版本冲突POST_reindex{source:{index:twitter},dest:{index:new_twitter,op_type:create}}5.3 版本控制策略对比策略行为适用场景internal默认覆盖同ID文档全量数据迁移external保留版本号增量更新跨集群增量同步create仅创建新文档补充缺失数据5.4 版本冲突处理默认情况下版本冲突会中止Reindex进程。设置conflictsproceed可以在冲突时继续POST_reindex{conflicts:proceed,source:{index:twitter},dest:{index:new_twitter}}5.5 查询过滤可以通过向source添加查询条件来限制迁移的文档范围POST_reindex{source:{index:twitter,query:{term:{user:kimchy}}},dest:{index:new_twitter}}5.6 多源索引source.index可以是一个列表允许从多个源索引复制POST_reindex{source:{index:[twitter,blog]},dest:{index:all_together}}5.7 字段过滤通过_source过滤只迁移需要的字段POST_reindex{source:{index:twitter,_source:[user,message]},dest:{index:new_twitter}}5.8 脚本转换Reindex支持通过脚本修改文档内容和元数据POST_reindex{source:{index:twitter},dest:{index:new_twitter},script:{source:ctx._source.tags ctx._source.tags ?: []; ctx._source.timestamp ctx._source.remove(post_date);,lang:painless}}脚本中可以修改的元数据字段包括_id、_index、_version、_routing。可以设置ctx.op为noop或delete来控制操作行为。5.9 路由控制在dest上设置路由参数POST_reindex{source:{index:source,query:{match:{company:cat}}},dest:{index:dest,routing:cat}}路由参数支持三种值参数值行为keep默认使用源索引的路由值discard目标索引路由值设为空自定义值所有文档使用指定路由5.10 批次大小与管道可以设置批次大小和使用索引预处理管道POST_reindex{source:{index:source,size:100},dest:{index:dest,pipeline:my_pipeline}}六、跨集群Reindex6.1 基本配置Reindex API支持从远程Elasticsearch集群重新索引数据POST_reindex{source:{remote:{host:https://otherhost:9200,username:user,password:pass},index:twitter},dest:{index:new_twitter}}6.2 白名单配置远程主机必须在elasticsearch.yml中显式配置白名单reindex.remote.whitelist:otherhost:9200,another:9200,127.0.10.*:9200,localhost:*白名单规则使用逗号分隔的主机和端口组合支持通配符如127.0.10.*:9200忽略通信协议只匹配主机和端口必须在所有协调节点上配置注意使用Basic Auth时务必使用HTTPS否则密码将以明文传输。跨集群Reindex功能可以与任何版本的Elasticsearch配合使用是集群升级的有效方式。6.3 远程连接参数POST_reindex{source:{remote:{host:https://otherhost:9200,username:user,password:pass,socket_timeout:1m,connect_timeout:10s},index:twitter,size:10},dest:{index:new_twitter}}参数说明默认值socket_timeout读超时时间30sconnect_timeout连接超时时间30ssize批次大小1000注意从远程服务器重新索引使用堆内缓冲区默认最大为100MB。如果远程索引包含大文档需要使用较小的批次大小。七、Term向量API7.1 基本概念Term向量Term Vectors用来存储文档字段的Term信息字段文本分词得到的词条和统计信息。Term向量在默认情况下是实时的。7.2 获取Term向量GETtwitter/_termvectors/1或者指定字段GETtwitter/_termvectors/1?fieldsmessage也可以通过请求体指定字段POSTtwitter/_termvectors/1{fields:[message],term_statistics:true,field_statistics:true}7.3 返回值类型类型参数说明默认Term信息term_statistics总词频、文档频率falseTerm统计positionsTerm位置信息不返回Term统计offsetsTerm起始/结束偏移不返回字段统计field_statistics文档计数、词频总和true7.4 Term过滤使用filter参数可以根据tf-idf分数过滤返回的Term帮助找出文档的特征向量POSTtwitter/_termvectors/1{fields:[plot],term_statistics:true,filter:{max_num_terms:3,min_term_freq:1,min_doc_freq:1}}过滤参数说明参数说明默认值max_num_terms每个字段返回的最大Term数25min_term_freq源文档中最低词频1max_term_freq源文档中最高词频无限min_doc_freq最低文档频率1max_doc_freq最高文档频率无限min_word_length最小词长0max_word_length最大词长无限7.5 多文档Term向量_mtermvectorsAPI允许一次获取多个文档的Term向量POSTtwitter/_mtermvectors{ids:[1,2],fields:[text],term_statistics:true}也可以在请求中提供人工文档来生成Term向量POSTtwitter/_mtermvectors{docs:[{_id:1,fields:[text]},{doc:{text:some text},fields:[text]}]}八、总结与最佳实践8.1 核心要点回顾Bulk API是Elasticsearch批量操作的核心NDJSON格式简洁高效支持四种操作混合使用部分失败隔离是Bulk API的重要特性单个操作失败不影响其他操作批次大小建议在5-15MB之间需要根据实际场景测试调优Reindex API是数据迁移的首选方案支持版本控制、查询过滤、脚本转换等丰富功能跨集群Reindex需要配置白名单和HTTPS认证是集群升级和数据同步的有效手段Term向量API提供了词元级别的分析能力适用于文本分析和相关性调试8.2 生产环境最佳实践Bulk批次优化通过测试找到最佳批次大小监控批量操作的响应时间和错误率Reindex限速大数据量迁移使用requests_per_second限速避免影响在线业务跨集群安全远程连接务必使用HTTPS严格配置白名单增量迁移使用version_type: external实现增量同步避免全量覆盖脚本转换在Reindex时使用脚本完成字段重命名、类型转换等数据清洗工作上一篇【第15篇】 Elasticsearch删除与更新API——精确操作与脚本更新下一篇【第17篇】Elasticsearch并发控制——refresh参数与乐观并发控制