Spring Integration 教程

一、什么是 Spring Integration

Spring Integration 是 Spring 生态系统中的一个扩展模块，用于实现企业应用集成 (EAI, Enterprise Application Integration)。它基于 Spring 框架，提供了一套声明式的适配器，用于集成不同的系统和服务。

核心特点

- 基于消息驱动的架构
- 支持多种传输协议：HTTP, TCP, JMS, AMQP, FTP, File 等
- 提供开箱即用的端点适配器
- 支持企业集成模式 (EIP, Enterprise Integration Patterns)

二、核心概念

1. Message

    // 消息由消息头和消息体组成
    public interface Message<T> {
        T getPayload();
        MessageHeaders getHeaders();
    }

    // 创建消息
    Message<String> message = MessageBuilder.withPayload("Hello")
        .setHeader("key", "value")
        .build();

2. Message Channel

消息通道用于在发送者和接收者之间传递消息。

    // 点对点通道
    @Bean
    public MessageChannel directChannel() {
        return new DirectChannel();
    }

    // 发布订阅通道
    @Bean
    public MessageChannel publishSubscribeChannel() {
        return new PublishSubscribeChannel();
    }

    // 队列通道
    @Bean
    public MessageChannel queueChannel() {
        return new QueueChannel(10);
    }

3. Message Endpoint

消息端点负责处理消息。

三、快速入门示例

Maven 依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <!-- 可选：特定协议支持 -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-http</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-file</artifactId>
    </dependency>

基础配置示例

    @Configuration
    @EnableIntegration
    public class IntegrationConfig {

        // 定义消息通道
        @Bean
        public MessageChannel inputChannel() {
            return new DirectChannel();
        }

        @Bean
        public MessageChannel outputChannel() {
            return new DirectChannel();
        }

        // 定义集成流程
        @Bean
        public IntegrationFlow simpleFlow() {
            return IntegrationFlow.from(inputChannel())
                .transform(String.class, s -> s.toUpperCase())
                .filter(s -> s.startsWith("A"))
                .handle(System.out::println)
                .get();
        }
    }

使用 @MessagingGateway

    // 定义网关接口
    @MessagingGateway(defaultRequestChannel = "inputChannel")
    public interface SimpleGateway {
        void sendMessage(String message);

        @Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel")
        String sendAndReceive(String message);
    }

    // 使用网关
    @Service
    public class MessageService {
        @Autowired
        private SimpleGateway gateway; 
        public void send(String message) {
            gateway.sendMessage(message);
        }
    }

四、常用企业集成模式

1. 消息转换器 (Transformer)

    @Bean
    public IntegrationFlow transformerFlow() {
        return IntegrationFlow.from("inputChannel")
            .transform(new GenericTransformer<String, User>() {
                @Override
                public User transform(String source) {
                    return new User(source);
                }
            })
            .channel("outputChannel")
            .get();
    }

2. 消息过滤器 (Filter)

    @Bean
    public IntegrationFlow filterFlow() {
        return IntegrationFlow.from("inputChannel")
            .filter(payload -> payload instanceof User)
            .filter("payload.age >= 18") // SpEL 表达式
            .channel("adultChannel")
            .get();
    }

3. 消息路由器 (Router)

    @Bean
    public IntegrationFlow routerFlow() {
        return IntegrationFlow.from("inputChannel")
            .route(payload -> {
                if (payload instanceof Order) return "orderChannel";
                if (payload instanceof Payment) return "paymentChannel";
                return "errorChannel";
            })
            .get();
    }

4. 消息拆分器 (Splitter) 和聚合器 (Aggregator)

    @Bean
    public IntegrationFlow splitterAggregatorFlow() {
        return IntegrationFlow.from("inputChannel")
            .split() // 拆分消息
            .channel("splitChannel")
            .aggregate() // 聚合消息
            .channel("outputChannel")
            .get();
    }

五、常用适配器示例

1. 文件适配器

    @Configuration
    public class FileIntegrationConfig {

        // 读取文件
        @Bean
        public IntegrationFlow fileReaderFlow() {
            return IntegrationFlow.from(
                    Files.inboundAdapter(new File("/input"))
                        .patternFilter("*.txt"),
                    e -> e.poller(Pollers.fixedDelay(1000))
                )
                .transform(File.class, File::getAbsolutePath)
                .handle(System.out::println)
                .get();
        }

        // 写入文件
        @Bean
        public IntegrationFlow fileWriterFlow() {
            return IntegrationFlow.from("fileInputChannel")
                .handle(Files.outboundAdapter(new File("/output"))
                    .autoCreateDirectory(true))
                .get();
        }
    }

2. 
HTTP 适配器

    @Configuration
    public class HttpIntegrationConfig {

        // HTTP 入站网关
        @Bean
        public IntegrationFlow httpInboundFlow() {
            return IntegrationFlow.from(
                    Http.inboundGateway("/api/message")
                        .requestMapping(m -> m.methods(HttpMethod.POST))
                        .requestPayloadType(String.class)
                        .replyTimeout(30000)
                )
                .transform(String.class, s -> "Processed: " + s)
                .get();
        }

        // HTTP 出站网关
        @Bean
        public IntegrationFlow httpOutboundFlow() {
            return IntegrationFlow.from("requestChannel")
                .handle(Http.outboundGateway("https://api.example.com/data")
                    .httpMethod(HttpMethod.GET)
                    .expectedResponseType(String.class))
                .channel("responseChannel")
                .get();
        }
    }

3. JMS 适配器

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-broker</artifactId>
    </dependency>

    @Configuration
    public class JmsIntegrationConfig {

        @Bean
        public ConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory("tcp://localhost:61616");
        }

        // JMS 入站适配器
        @Bean
        public IntegrationFlow jmsInboundFlow() {
            return IntegrationFlow.from(
                    Jms.inboundAdapter(connectionFactory())
                        .destination("queue.in")
                )
                .transform(String.class, String::toUpperCase)
                .handle(message -> System.out.println("Received: " + message))
                .get();
        }

        // JMS 出站适配器
        @Bean
        public IntegrationFlow jmsOutboundFlow() {
            return IntegrationFlow.from("jmsOutputChannel")
                .handle(Jms.outboundAdapter(connectionFactory())
                    .destination("queue.out"))
                .get();
        }
    }

六、高级特性

1. 错误处理

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlow.from("inputChannel")
            .transform(...)
            .handle(..., e -> e
                .advice(ExpressionEvaluatingRequestHandlerAdvice.class)
                .advice(advice -> advice
                    .onFailureExpression("payload.message")
                    .trapException(true))
            )
            .get();
    }

    // 全局错误通道
    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
            .handle(message -> {
                Exception exception = (Exception) message.getPayload();
                log.error("Error: ", exception);
            })
            .get();
    }

2. 
消息历史

    @Configuration
    @EnableIntegration
    @EnableMessageHistory
    public class HistoryConfig {

        @Bean
        public IntegrationFlow historyFlow() {
            return IntegrationFlow.from("inputChannel")
                .transform(...)
                .enrichHeaders(s -> s.header(MessageHistory.HEADER_NAME, new MessageHistory()))
                .handle(...)
                .get();
        }
    }

3. 控制总线

    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlow.from("controlBus")
            .controlBus()
            .get();
    }

    // 使用控制总线
    @Component
    public class ControlBusService {

        @Autowired
        @Qualifier("controlBus")
        private MessageChannel controlBus;

        public void stopChannel() {
            controlBus.send(MessageBuilder.withPayload("@myChannel.stop()").build());
        }
    }

七、完整示例：文件处理系统

    @SpringBootApplication
    @EnableIntegration
    public class FileProcessingApplication {
        public static void main(String[] args) {
            SpringApplication.run(FileProcessingApplication.class, args);
        }
    }

    @Configuration
    public class FileProcessingFlow {

        private static final Logger log = LoggerFactory.getLogger(FileProcessingFlow.class);

        // 文件输入目录
        @Value("${input.directory:/input}")
        private String inputDirectory;

        // 处理成功目录
        @Value("${success.directory:/success}")
        private String successDirectory;

        // 处理失败目录
        @Value("${failed.directory:/failed}")
        private String failedDirectory;

        @Bean
        public IntegrationFlow fileProcessingFlow() {
            return IntegrationFlow.from(
                    Files.inboundAdapter(new File(inputDirectory))
                        .patternFilter("*.csv")
                        .preventDuplicates(true)
                        .autoCreateDirectory(true),
                    e -> e.poller(Pollers.fixedDelay(5000)
                        .maxMessagesPerPoll(5)
                        .advice(expressionAdvice()))
                )
                .channel(MessageChannels.queue("processingChannel", 10))
                .transform(Files.toStringTransformer()) // 文件转字符串
                .split(s -> s.delimiters("\n")) // 按行拆分
                .filter(line -> !line.trim().isEmpty())
                .transform(line -> parseCsvLine(line)) // 解析CSV
                .aggregate(aggregatorSpec -> aggregatorSpec
                    .releaseStrategy(new SimpleSequenceSizeReleaseStrategy())
                    .correlationStrategy(message -> "batch"))
                .handle(message -> processBatch((List<Map<String, String>>) message.getPayload()))
                .handle(Files.outboundAdapter(new File(successDirectory)) 
                    .autoCreateDirectory(true)
                    .fileNameGenerator(message -> generateFileName(message)))
                .get();
        }

        // 错误处理：失败的文件移动到失败目录
        @Bean
        public IntegrationFlow errorHandlingFlow() {
            return IntegrationFlow.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                .handle(message -> {
                    Message<?> failedMessage = (Message<?>) message.getHeaders().get("inputMessage");
                    File failedFile = (File) failedMessage.getPayload();
                    FileUtils.moveFileToDirectory(failedFile, new File(failedDirectory), true);
                    log.error("Failed to process file: {}", failedFile.getName());
                })
                .get();
        }

        private Map<String, String> parseCsvLine(String line) {
            // CSV解析逻辑
            return new HashMap<>();
        }

        private void processBatch(List<Map<String, String>> batch) {
            // 批量处理逻辑
            log.info("Processing batch of {} records", batch.size());
        }

        private String generateFileName(Message<?> message) {
            return "processed_" + System.currentTimeMillis() + ".json";
        }

        @Bean
        public Advice expressionAdvice() {
            return new ExpressionEvaluatingRequestHandlerAdvice();
        }
    }

八、最佳实践

- 合理使用通道类型：DirectChannel 用于同步，QueueChannel 用于缓冲，PublishSubscribeChannel 用于广播
- 避免阻塞操作：使用 QueueChannel 时注意配置合适的大小和 poller
- 错误处理：始终配置错误通道，记录异常并适当重试
- 监控和管理：使用 Spring Boot Actuator 监控集成端点

    management:
      endpoints:
        web:
          exposure:
            include: integration

- 测试：使用 @SpringIntegrationTest 进行集成测试

    @SpringBootTest
    @SpringIntegrationTest(noAutoStartup = {"inputChannel"})
    class IntegrationFlowTest {

        @Test
        void testFlow() {
            // 测试逻辑
        }
    }