近期遇到一个需求,要求导出elasticsearch 8.x中的数据到Excel,这个需求存在一个比较难搞的问题,就是数据量大了,导出会超时,最好的做法是异步导出,然后通知前端去下载,前端可以异步的去查询导出进度或者后台轮询,等待服务端导出成功后,直接下载文件。笔者这边由于前端排期较满,所以才用了同步的方式,采用同步的方式导出,前端需要增加同步等待时间,而且会一直阻塞在导出页面,为了加快导出速度,笔者采用了多线程去做优化,最终将五万条记录数据的导出限制在40秒左右,这里的五万是elasticsearch 中的五万条记录,换算成Excel中的行数大概35万行左右,这里需要注意下,如果服务器的硬件配置(
主要是核心数和内存大小
)有限,建议还是采用异步导出,然后再下载的方式,避免OOM
。下面展开说明:
读取Elasticsearch数据
分页查询方式
Elasticsearch 的查询结果返回上限默认受 max_result_window
参数限制,其默认值为 10,000 条。但需根据不同的查询方式区分具体行为:
1. 默认分页查询(from + size
)的 10,000 条限制
-
机制:
当使用传统的分页方式(from
和size
参数)时,from + size
的总和不能超过max_result_window
的默认值 10,000。
例如:GET /your_index/_search {"from": 9000,"size": 1000,"query": { ... } }
此时
from + size = 10,000
,查询会成功;但若from + size > 10,000
,Elasticsearch 会抛出异常。 -
是否分页均受限制:
- 即使不显式分页(例如
from=0, size=15000
),只要size
超过max_result_window
,也会触发限制。 - 不指定
size
时:默认返回 10 条结果(与max_result_window
无关)。
- 即使不显式分页(例如
2. 绕过默认限制的其他查询方式
以下方法不受 max_result_window
限制,但需注意适用场景:
(1) Scroll API
- 用途:适合离线导出大数据(如全量数据迁移或批量处理)。
- 机制:创建快照式游标,分批次拉取数据。
- 限制:
- 不支持实时性要求高的场景(数据可能过期)。
- 需要手动清理 Scroll 上下文。
(2) Search After
- 用途:实时分页(如无限滚动列表)。
- 机制:基于上一页的排序值(如时间戳、唯一 ID)定位下一页。
- 限制:
- 需要指定唯一排序字段(确保分页顺序稳定)。
- 不支持跳页(只能连续翻页)。
3. 关键区别总结
查询方式 | 是否受 max_result_window 限制 | 适用场景 | 性能影响 |
---|---|---|---|
from + size | ✅ 是(默认 10,000) | 浅分页(前几百页) | 深度分页时性能极差 |
Scroll API | ❌ 否 | 大数据离线导出 | 资源占用高,需手动清理 |
Search After | ❌ 否 | 实时深分页(如无限滚动) | 高效,依赖排序字段 |
4. 是否需要调整 max_result_window
?
-
若必须使用
from + size
:
需通过以下方式修改索引设置:PUT /your_index/_settings {"index": {"max_result_window": 100000 # 调整为更大的值} }
但需注意:
- 深度分页(如
from=99999
)会导致性能骤降(每个分片需遍历所有匹配文档)。 - 可能触发内存溢出(OOM),尤其是查询结果包含大字段时。
- 深度分页(如
-
推荐替代方案:
优先使用 Search After 或 Scroll API,避免直接修改max_result_window
。例如:// Search After 示例 GET /your_index/_search {"size": 1000,"query": { ... },"sort": [{ "timestamp": "desc" },{ "_id": "asc" } // 确保排序唯一性],"search_after": [ "2023-10-01T00:00:00", "abc123" ] // 上一页最后一条的排序值 }
5. 在 Spring Boot 中的配置建议
- 使用
RestHighLevelClient
或ElasticsearchTemplate
:
直接调用 Scroll 或 Search After 接口,而非依赖from + size
。例如:// Search After 示例(Spring Data Elasticsearch) NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(QueryBuilders.matchAllQuery()).withSort(SortBuilders.fieldSort("timestamp").order(SortOrder.DESC)).withSort(SortBuilders.fieldSort("_id").order(SortOrder.ASC)).withPageable(Pageable.unpaged()) // 禁用传统分页.build();SearchHits<YourDocument> hits = elasticsearchTemplate.search(searchQuery, YourDocument.class); Object[] lastSortValues = hits.getSearchHit(hits.size() - 1).getSortValues();// 下一次查询时传入 search_after searchQuery.setSearchAfter(lastSortValues);
小结
- 默认限制:Elasticsearch 的
from + size
分页查询默认最多返回 10,000 条,无论是否显式分页。 - 绕过限制:使用 Scroll API 或 Search After 可突破此限制,但需根据场景选择合适方案。
- 性能优先:避免盲目调大
max_result_window
,优先优化查询逻辑或使用高效分页机制。
游标查询( Scroll API)方式
在 Elasticsearch 8.x 及更高版本中,官方推荐使用新的 Java API Client(elasticsearch-java
库)替代旧的 RestHighLevelClient
。以下是基于新客户端(ElasticsearchClient
)使用 Scroll API 的完整代码示例和步骤:
1. 添加依赖
确保 pom.xml
或 build.gradle
中包含最新版本的 Elasticsearch Java 客户端:
<dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.12.0</version> <!-- 检查最新版本 -->
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version> <!-- 匹配 Elasticsearch 版本 -->
</dependency>
2. 使用 Scroll API 的完整代码
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import java.io.IOException;
import java.util.List;public class ScrollApiExample {public void scrollData(ElasticsearchClient client, String indexName) throws IOException {// 1. 初始化 Scroll 请求SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexName).query(q -> q.matchAll(m -> m)) // 查询所有文档.size(1000) // 每批次拉取 1000 条.scroll(sb -> sb.time("1m")) // Scroll 有效期 1 分钟);// 2. 发送初始请求,获取 scrollId 和第一批数据SearchResponse<Object> response = client.search(searchRequest, Object.class);String scrollId = response.scrollId();List<Hit<Object>> hits = response.hits().hits();// 处理第一批数据processHits(hits);// 3. 循环拉取后续批次while (hits != null && !hits.isEmpty()) {// 构建 Scroll 请求ScrollRequest scrollRequest = ScrollRequest.of(s -> s.scrollId(scrollId).scroll(sb -> sb.time("1m")) // 续期 Scroll 有效期);// 发送请求获取下一批数据SearchResponse<Object> scrollResponse = client.scroll(scrollRequest, Object.class);scrollId = scrollResponse.scrollId(); // 更新 scrollIdhits = scrollResponse.hits().hits();// 处理当前批次数据processHits(hits);}// 4. 清理 Scroll 上下文if (scrollId != null) {ClearScrollRequest clearRequest = ClearScrollRequest.of(c -> c.scrollId(scrollId));client.clearScroll(clearRequest);}}private void processHits(List<Hit<Object>> hits) {for (Hit<Object> hit : hits) {Object source = hit.source(); // 获取文档内容(类型需与实际数据匹配)System.out.println("Document: " + source);}}
}
3. 关键参数说明
参数/方法 | 说明 |
---|---|
.size(1000) | 每批次拉取的文档数(默认 10 )。 |
.scroll(s -> s.time("1m")) | 设置 Scroll 上下文的存活时间(如 1m 表示 1 分钟)。 |
client.search() | 发送初始搜索请求,返回第一批数据和 scrollId 。 |
client.scroll() | 根据 scrollId 获取下一批数据。 |
client.clearScroll() | 清理 Scroll 上下文,释放资源。 |
4. 注意事项
-
数据类型匹配:
示例中使用了Object.class
泛型,实际应根据索引文档的 Java 类型替换(如User.class
)。SearchResponse<User> response = client.search(searchRequest, User.class);
-
错误处理:
- 添加
try-catch
块处理IOException
或 Elasticsearch 异常。 - 确保在异常时仍清理 Scroll 上下文(避免资源泄漏)。
- 添加
-
性能优化:
- 根据数据量调整
size
(如5000
),但需权衡内存消耗。 - 避免在 Scroll 存活时间内处理过慢,导致上下文过期。
- 根据数据量调整
-
实时性限制:
Scroll API 基于数据快照,后续写入可能不会反映在结果中。若需实时遍历,改用 Search After。
5. 结合 Search After 实现高效分页
若需要实时分页,可改用 Search After(需指定唯一排序字段):
SearchRequest searchRequest = SearchRequest.of(s -> s.index(indexName).query(q -> q.matchAll(m -> m)).size(1000).sort(so -> so.field(f -> f.field("timestamp").order(SortOrder.Desc))) // 排序字段.sort(so -> so.field(f -> f.field("_id").order(SortOrder.Asc))) // 确保唯一性
);SearchResponse<User> response = client.search(searchRequest, User.class);
List<Hit<User>> hits = response.hits().hits();// 获取最后一行的排序值
List<JsonData> lastSort = hits.get(hits.size() - 1).sort();// 下次查询时传入 search_after
SearchRequest nextPageRequest = SearchRequest.of(s -> s.index(indexName).query(q -> q.matchAll(m -> m)).size(1000).searchAfter(lastSort) // 指定 search_after.sort(so -> so.field(f -> f.field("timestamp").order(SortOrder.Desc))).sort(so -> so.field(f -> f.field("_id").order(SortOrder.Asc)))
);
小结
- Scroll API:适合离线大数据导出,但需手动管理上下文和内存。
- Search After:适合实时深分页,依赖唯一排序字段。
- 新客户端特性:Elasticsearch Java API Client 提供类型安全的 DSL,代码更简洁。
多线程处理ES数据
笔者这里采用了生产消费模式
,采用游标查询(Scroll API)
的方式,从ES中拉取数据放入阻塞队列供数据处理线程即消费者去处理,处理完成后放入线程安全的集合进行数据合并。
//每轮拉取的数据条数int pullSize = 2000;BoolQuery.Builder boolQueryBuilder = new BoolQuery.Builder();if (!StringUtils.isEmpty(queryCondition.getFieldCondition())) {if (!StringUtils.isEmpty(queryCondition.getFieldCondition().getId())) {boolQueryBuilder.must(new Query.Builder().term(new TermQuery.Builder().field("Id").value(queryCondition.getFieldCondition().getId()).build()).build());}if ((!StringUtils.isEmpty(queryCondition.getTimeRangeStart())) && (!StringUtils.isEmpty(queryCondition.getTimeRangeEnd()))) {//查询时间范围SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");//sdf.setTimeZone(TimeZone.getTimeZone("UTC"));//查询开始日期时间戳long startTimeStamp = 0;//查询结束日期时间戳ZonedDateTime endTime = null;try {startTimeStamp = sdf.parse(queryCondition.getTimeRangeStart()).getTime();//0点endTime = sdf.parse(queryCondition.getTimeRangeEnd()).toInstant().atZone(ZoneId.of("Asia/Shanghai")).plusDays(1);} catch (ParseException e) {log.error("Time parse error :" + e.getMessage());//throw new BusiException(e.getMessage());}long endTimeStamp = Date.from(endTime.toInstant()).getTime();//查询时间范围boolQueryBuilder.must(new Query.Builder().range(new RangeQuery.Builder().field("timestamp").gte(JsonData.of(startTimeStamp)).lt(JsonData.of(endTimeStamp)).build()).build());}}//加入状态为 "END"boolQueryBuilder.must(new Query.Builder().term(new TermQuery.Builder().field("status").value(Constant.END).build()).build());SearchRequest request = new SearchRequest.Builder().index(elasticProperties.getEsIndex()).query(new Query.Builder().bool(boolQueryBuilder.build()).build()).searchType(SearchType.DfsQueryThenFetch).scroll(s -> s.time("1m"))//Scroll有效期.size(pullSize)//每批次拉取2000条记录.build();//消费线程数int consumerThreads = 8;// 创建一个有界队列用于生产者和消费者之间的数据传递BlockingQueue<List<Hit<Dialog>>> hitsBatchQueue = new LinkedBlockingQueue<>(consumerThreads * 2);// 停止标志AtomicBoolean done = new AtomicBoolean(false);CountDownLatch producerLatch = new CountDownLatch(1);CountDownLatch consumersLatch = new CountDownLatch(consumerThreads);// 创建线程池ExecutorService executorService = Executors.newFixedThreadPool(consumerThreads + 1);// 启动生产者线程 - 负责执行scroll请求获取数据executorService.submit(() -> {String scrollId = null;int batchesProduced = 0;int totalHits = 0;try {SearchResponse<Dialog> searchResponse = elasticsearchClient.search(request, Dialog.class);if (searchResponse != null) {scrollId = searchResponse.scrollId();log.debug(searchResponse.toString());List<Hit<Dialog>> hits = searchResponse.hits().hits();//第一批数据放入消费者队列if ((hits != null) && (!hits.isEmpty())) {//放入生产队列hitsBatchQueue.put(hits);totalHits += hits.size();batchesProduced++;log.info("生产者: 初始批次,获取记录数:{} ", hits.size());} // 继续scroll直到没有更多数据或达到最大记录数while ((scrollId != null) && (hits != null) && (!hits.isEmpty()) && (totalHits < maxExportSize)) {//构建Scroll请求final String currentScrollId = scrollId;ScrollRequest scrollRequest = ScrollRequest.of(s -> s.scrollId(currentScrollId).scroll(st -> st.time("1m")));// 续期 Scroll 有效期ScrollResponse<AimcDialog> scrollResponse = elasticsearchClient.scroll(scrollRequest, AimcDialog.class);log.debug(scrollResponse.toString());// 更新scrollIdscrollId = scrollResponse.scrollId();if ((scrollId != null) && (!scrollId.equals(currentScrollId))) {clearScroll(currentScrollId);}hits = scrollResponse.hits().hits();if ((hits != null) && (!hits.isEmpty())) {//放入生产队列hitsBatchQueue.put(hits);totalHits += hits.size();batchesProduced++;//if (batchesProduced % 10 == 0) {log.info("生产者: 已生产 " + batchesProduced + " 批次,总记录数: " + totalHits);//}} else {break;}// 检查是否已达到最大记录数if (totalHits >= maxExportSize) {log.info("已达到最大记录数限制,停止生产: " + totalHits);break;}}// 清理 Scroll 上下文if (scrollId != null) {String currentScrollId = scrollId;ClearScrollRequest clearRequest = ClearScrollRequest.of(c -> c.scrollId(currentScrollId));elasticsearchClient.clearScroll(clearRequest);}}} catch (Exception e) {log.error("生产者线程出错: " + e.getMessage());} finally {// 清理scroll上下文if (scrollId != null) {clearScroll(scrollId);}// 标记生产完成done.set(true);producerLatch.countDown();}});//用来维护id和名字的映射ConcurrentHashMap<String, String> names = new ConcurrentHashMap<>();//线程锁ConcurrentHashMap<String, ReentrantLock> botLocks = new ConcurrentHashMap<>();//总共处理了多少查询结果final AtomicInteger totalFetched = new AtomicInteger(0);//用于返回的结果List<Log> logs = Collections.synchronizedList(new ArrayList<>());// 启动消费者线程 - 负责处理队列中的数据for (int i = 0; i < consumerThreads; i++) {final int consumerId = i;final List<Log> consumerResults = new ArrayList<>();executorService.submit(() -> {try {int processedCount = 0;while (!done.get() || !hitsBatchQueue.isEmpty()) {List<Hit<Dialog>> hits = hitsBatchQueue.poll(500, TimeUnit.MILLISECONDS);if (hits != null) {List<Log> logList = processHits(hits, names, botLocks);consumerResults.addAll(logList);processedCount += logList.size();totalFetched.addAndGet(logList.size());if (processedCount % pullSize == 0) {log.info("消费者 " + consumerId + ": 已处理 " + processedCount + " 条记录");}}}log.info("消费者 " + consumerId + " 完成,共处理 " + processedCount + " 条记录");// 同步添加结果synchronized (logs) {logCalls.addAll(consumerResults);}} catch (Exception e) {log.error("消费者 " + consumerId + " 出错: " + e.getMessage());e.printStackTrace();} finally {consumersLatch.countDown();}});}try {// 等待生产者完成boolean producerCompleted = producerLatch.await(10, TimeUnit.MINUTES);if (!producerCompleted) {log.error("生产者线程未在超时时间内完成");}// 等待所有消费者完成boolean consumersCompleted = consumersLatch.await(10, TimeUnit.MINUTES);if (!consumersCompleted) {log.error("部分消费者线程未在超时时间内完成");}// 关闭线程池executorService.shutdown();boolean b = executorService.awaitTermination(1, TimeUnit.MINUTES);log.info("数据拉取完成,总获取记录数: " + totalFetched.get());} catch (Exception e) {log.error(e.getMessage());e.printStackTrace();}
之所以采用生产消费模式,是因为从ES中批量拉取数据速度很快,但是对于每批次的数据处理是一个耗时操作,同时,数据处理过程中可能还会涉及远程调用,而且对于多个批次的数据可以多线程并行处理,这样耗时会大大减少。接下来讲一讲已经处理好的数据进行多线程导出到Excel。
导出数据到Excel
首先引入依赖
<dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>5.1.0</version></dependency><dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>5.1.0</version></dependency>
另外EasyExcel
的性能好像优于POI,感兴趣的可以尝试下
<!-- EasyExcel --><dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>3.3.2</version></dependency>
导出Excel
Excel工具类
工具类中将从ES批量拉取的数据做分割处理,按照设定的大小分成若干个sheet,多个sheet采用多线程并行执行导出操作
public void downloadExcel(HttpServletRequest req, HttpServletResponse resp, String filename, Workbook workbook) {try {beforeDownload(req, resp, filename);OutputStream out = resp.getOutputStream();workbook.write(out);out.flush();out.close();} catch (IOException e) {throw new BusiException(e.getMessage());} finally {if (workbook instanceof SXSSFWorkbook) {((SXSSFWorkbook) workbook).dispose(); // 清理临时文件}}}public void beforeDownload(HttpServletRequest req, HttpServletResponse resp, String filename) throws UnsupportedEncodingException {filename = URLEncoder.encode(filename, "UTF-8");resp.setContentType("application/octet-stream;charset=UTF-8");resp.setHeader("Content-Disposition", "attachment;filename=" + filename);resp.addHeader("Pargam", "no-cache");resp.addHeader("Cache-Control", "no-cache");}public <C, M> Workbook createExcel(List<C> calls, Class<C> callClass, Class<M> commuClass) throws Exception {// 使用 SXSSFWorkbook 替代 HSSFWorkbookSXSSFWorkbook workbook = new SXSSFWorkbook(1000); // 缓存 100 行if (calls == null || calls.isEmpty()) {workbook.createSheet();return workbook;}// 分片List<List<C>> splitLists = splistList(calls, 5000, maxExportSize);ExecutorService executor = Executors.newFixedThreadPool(Math.min(splitLists.size(), Runtime.getRuntime().availableProcessors()));List<Future<Sheet>> futures = new ArrayList<>(splitLists.size());// 缓存表头和 getterList<FieldInfo> callFields = getFieldInfos(callClass);List<FieldInfo> commuFields = getFieldInfos(commuClass);List<String> headers = callFields.stream().map(f -> f.header).collect(Collectors.toList());List<String> subHeaders = commuFields.stream().map(f -> f.header).collect(Collectors.toList());// 样式CellStyle headerStyle = createHeaderStyle(workbook);CellStyle dataStyle = createDataStyle(workbook);CellStyle subHeaderStyle = createHeaderStyle(workbook);subHeaderStyle.setFillForegroundColor(HSSFColor.HSSFColorPredefined.SKY_BLUE.getIndex());// 多线程并行处理每个分片for (int i = 0; i < splitLists.size(); i++) {final List<C> subList = splitLists.get(i);final int finalI = i;futures.add(executor.submit(() -> {Sheet sheet;synchronized (workbook) {sheet = workbook.createSheet("Sheet" + (finalI + 1));}processSheet(sheet, subList, callClass, commuClass, headers, subHeaders, headerStyle, dataStyle, subHeaderStyle);log.info("{}导出完成", sheet.getSheetName());return sheet;}));}// 等待所有任务完成for (Future<Sheet> future : futures) {try {future.get();} catch (Exception e) {log.error("当前线程失败:{}", future);e.printStackTrace();}}executor.shutdown();boolean b = executor.awaitTermination(10, TimeUnit.MINUTES);return workbook;}//分割数据集合public <T> List<List<T>> splistList(List<T> list, int subNum, int maxExportSize) {List<List<T>> tNewList = new ArrayList<>();int size = Math.min(list.size(), maxExportSize);for (int i = 0; i < size; i += subNum) {int end = Math.min(i + subNum, size);tNewList.add(list.subList(i, end));}return tNewList;}
调用Excel工具类创建Excel
调用工具类创建工作簿,并传输到前端页面进行下载。
List<Log> logs = DialogDao4Es.exportLogCalls(queryCondition);Workbook workbook = null;try {workbook = excelExportUtil.createExcel(logs, Log.class, LogCommu.class);} catch (Exception e) {log.error("工作簿创建失败:{}", e.getMessage());e.printStackTrace();}excelExportUtil.downloadExcel(req, resp, "日志.xlsx", workbook);
到这里分享就结束了,欢迎大家一起交流,有什么问题 可以评论区留言。