Message Processing With Spring Integration高级应用:自定义消息通道与端点
一、Spring Integration 简介
Spring Integration 是 Spring 框架的扩展,支持企业集成模式(EIP),提供轻量级的消息处理功能,帮助开发者构建可维护、可测试的企业集成解决方案。
核心目标:
- 提供简单的模型来实现复杂的企业集成。
- 支持与外部系统的集成。
- 提供模块化、松耦合的消息处理架构。
二、Spring Integration 核心组件
1. 消息(Message)
- 定义:消息是 Spring Integration 的核心,包含
payload
(负载)和header
(头部)。 - 创建消息:通过
MessageBuilder
创建消息。
代码示例:
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;Message<String> message = MessageBuilder.withPayload("Message Payload").setHeader("Message_Header1", "Header1_Value").setHeader("Message_Header2", "Header2_Value").build();
2. 消息通道(Message Channel)
- 定义:消息通道是消息传递的管道,连接消息的生产者和消费者。
- 类型:
- 点对点(Point-to-Point):每条消息最多被一个消费者接收。
- 发布/订阅(Publish/Subscribe):每条消息可以被多个订阅者接收。
- 常见实现:
DirectChannel
:默认点对点通道。NullChannel
:虚拟通道,用于测试和调试。- 其他:
PublishSubscribeChannel
、QueueChannel
、PriorityChannel
等。
3. 消息端点(Message Endpoint)
消息端点是应用程序代码与消息基础设施之间的桥梁,主要类型包括:
- Transformer:转换消息内容或结构。
- Filter:过滤不符合条件的消息。
- Router:根据条件将消息路由到不同的通道。
- Splitter:将消息拆分为多个子消息。
- Aggregator:将多个消息聚合为一个消息。
- Service Activator:连接服务实例到消息系统。
- Channel Adapter:连接消息通道与外部系统。
三、货物处理系统示例
1. 需求
实现一个货物处理系统,功能包括:
- 接收货物消息。
- 拆分货物列表为单个货物消息。
- 基于重量过滤货物。
- 根据运输类型(国内/国际)路由货物。
- 转换货物消息。
- 最终处理并记录货物信息。
2. 项目环境
- JDK:1.8
- Spring:4.1.2
- Spring Integration:4.1.0
- Maven:3.2.2
- 操作系统:Ubuntu 14.04
3. 完整代码实现
Step 1:添加依赖
在 pom.xml
中添加 Spring 和 Spring Integration 的依赖:
<properties><spring.version>4.1.2.RELEASE</spring.version><spring.integration.version>4.1.0.RELEASE</spring.integration.version>
</properties><dependencies><!-- Spring 核心依赖 --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring.version}</version></dependency><!-- Spring Integration 核心依赖 --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-core</artifactId><version>${spring.integration.version}</version></dependency>
</dependencies>
Step 2:配置类
创建 AppConfiguration
类,配置消息通道和启用 Spring Integration:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;@Configuration
@ComponentScan("com.onlinetechvision.integration")
@EnableIntegration
@IntegrationComponentScan("com.onlinetechvision.integration")
public class AppConfiguration {@Beanpublic MessageChannel cargoGWDefaultRequestChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoSplitterOutputChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoFilterOutputChannel() {return new DirectChannel();}@Beanpublic MessageChannel cargoTransformerOutputChannel() {return new DirectChannel();}
}
Step 3:消息网关
定义 CargoGateway
接口,作为消息系统的入口:
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;import java.util.List;@MessagingGateway
public interface CargoGateway {@Gateway(requestChannel = "cargoGWDefaultRequestChannel")void processCargoRequest(Message<List<Cargo>> message);
}
Step 4:消息拆分器
实现 CargoSplitter
,将货物列表拆分为单个货物消息:
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
import org.springframework.messaging.Message;import java.util.List;@MessageEndpoint
public class CargoSplitter {@Splitter(inputChannel = "cargoGWDefaultRequestChannel", outputChannel = "cargoSplitterOutputChannel")public List<Cargo> splitCargoList(Message<List<Cargo>> message) {return message.getPayload();}
}
Step 5:消息过滤器
实现 CargoFilter
,过滤重量超过限制的货物:
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.MessageEndpoint;@MessageEndpoint
public class CargoFilter {private static final double CARGO_WEIGHT_LIMIT = 1000.0;@Filter(inputChannel = "cargoSplitterOutputChannel", outputChannel = "cargoFilterOutputChannel", discardChannel = "cargoFilterDiscardChannel")public boolean filterCargo(Cargo cargo) {return cargo.getWeight() <= CARGO_WEIGHT_LIMIT;}
}
Step 6:服务激活器
实现 CargoServiceActivator
,处理最终的货物消息:
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Header;@MessageEndpoint
public class CargoServiceActivator {@ServiceActivator(inputChannel = "cargoTransformerOutputChannel")public void processCargo(Cargo cargo, @Header("CARGO_BATCH_ID") long batchId) {System.out.println("Processed Cargo: " + cargo + " in Batch: " + batchId);}
}
Step 7:运行主程序
创建 Application
类,初始化 Spring 容器并发送货物请求:
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.messaging.support.MessageBuilder;import java.util.Arrays;
import java.util.List;public class Application {public static void main(String[] args) {ApplicationContext context = new AnnotationConfigApplicationContext(AppConfiguration.class);CargoGateway gateway = context.getBean(CargoGateway.class);List<Cargo> cargos = Arrays.asList(new Cargo(1, "Receiver1", "Address1", 500, "Domestic"),new Cargo(2, "Receiver2", "Address2", 1500, "International"));gateway.processCargoRequest(MessageBuilder.withPayload(cargos).build());}
}
四、运行过程
- 启动
Application
类。 - 系统会根据配置:
- 拆分货物列表。
- 过滤重量超过限制的货物。
- 路由货物到不同的通道。
- 最终处理并记录货物信息。
- 控制台输出处理结果。
五、适用场景
Spring Integration 非常适合以下场景:
- 企业系统集成:如 ERP、CRM、供应链系统之间的数据交换。
- 消息驱动架构:如基于事件的微服务通信。
- 复杂消息处理:如批量处理、过滤、路由、转换等。
- 与外部系统交互:如文件系统、消息队列(RabbitMQ、Kafka)、数据库等。
通过 Spring Integration,可以轻松实现复杂的企业集成需求,同时保持代码的可维护性和扩展性。
参考链接:https://dzone.com/articles/message-processing-spring
相关文章:
Message Processing With Spring Integration高级应用:自定义消息通道与端点
一、Spring Integration 简介 Spring Integration 是 Spring 框架的扩展,支持企业集成模式(EIP),提供轻量级的消息处理功能,帮助开发者构建可维护、可测试的企业集成解决方案。 核心目标: 提供简单的模型…...
CUDA从入门到精通(六)——CUDA编程模型(二)
1. 核函数类型限定符 CUDA 核函数的常用函数类型限定符及其相关信息的表格: 限定符执行端调用方式备注__global__设备端(GPU)从主机代码使用 <<<...>>> 调用核函数用于声明核函数,在 GPU 上执行。只能从主机代…...
*【每日一题 基础题】 [蓝桥杯 2023 省 B] 飞机降落
题目描述 N 架飞机准备降落到某个只有一条跑道的机场。其中第 i 架飞机在 Ti 时刻到达机场上空,到达时它的剩余油料还可以继续盘旋 Di 个单位时间,即它最早可以于 Ti 时刻开始降落,最晚可以于 Ti Di 时刻开始降落。降落过程需要 Li个单位时间…...
作业Day4: 链表函数封装 ; 思维导图
目录 作业:实现链表剩下的操作: 任意位置删除 按位置修改 按值查找返回地址 反转 销毁 运行结果 思维导图 作业:实现链表剩下的操作: 1>任意位置删除 2>按位置修改 3>按值查找返回地址 4>反转 5>销毁 任意…...
线性规划中的几种逻辑表达式
线性规划中的几种逻辑表达式 注意: 摘录字刘博士的《数学建模与数学规划》, 以便用时可查。 实际上Gurobi API 中自身放啊变的逻辑表达式函数,下面列出自定义的实现方式。 1 逻辑与 如果 x 1 1 x_1 1 x11, x 2 1 x_2 1 x21, 那…...
NX二次开发通过内部函数获取面的面积MW_face_ask_area
获取动态库libmold.dll的路径 void TcharToChar(const TCHAR* tchar, char* _char) {int iLength; #if UNICODE//获取字节长度 iLength WideCharToMultiByte(CP_ACP, 0, tchar, -1, NULL, 0, NULL, NULL);//将tchar值赋给_char WideCharToMultiByte(CP_ACP, 0, tchar, -…...
初学stm32 ——— 串口通信
目录 STM32的串口通信接口 UART异步通信方式特点: 串口通信过程 STM32串口异步通信需要定义的参数: USART框图: 常用的串口相关寄存器 串口操作相关库函数 编辑 串口配置的一般步骤 STM32的串口通信接口 UART:通用异步收发器USART&am…...
分割双声道音频-Audacity和ffmpeg
双声道音频资源: https://download.csdn.net/download/yudelian/90135217 1、ffmpeg分割双声道音频 ffmpeg -i input.wav -map_channel 0.0.0 left.wav -map_channel 0.0.1 right.wav 2、audacity分割双生音频并且播放 选择分离立体声轨 可以看出分离出了两个音频…...
在 Spring Boot 3 中实现基于角色的访问控制
基于角色的访问控制 (RBAC) 是一种有价值的访问控制模型,可增强安全性、简化访问管理并提高效率。它在管理资源访问对安全和运营至关重要的复杂环境中尤其有益。 我们将做什么 我们有一个包含公共路由和受限路由的 Web API。受限路由需要数据库中用户的有效 JWT。 现在用户…...
MySQL追梦旅途之慢查询分析建议
一、找到慢查询 查询是否开启慢查询记录 show variables like "%slow%";log_slow_admin_statements: 决定是否将慢管理语句(如 ALTER TABLE 等)记录到慢查询日志中。 log_slow_extra : MySQL 和 MariaDB 中的一个系…...
电子应用设计方案-60:智能床垫系统方案设计
智能床垫系统方案设计 一、引言 智能床垫作为智能家居的一部分,旨在为用户提供更舒适的睡眠体验和健康监测功能。本方案将详细描述智能床垫系统的设计理念、功能模块及技术实现。 二、系统概述 1. 系统目标 - 实时监测睡眠状态,包括心率、呼吸、体动等…...
聊聊航空航天软件中常用的SIFT(Software-Implemented Fault Tolerance)三版本方案
一、SIFT技术 在软件程序控制流程中,特别是在SIFT(Software-Implemented Fault Tolerance)系统中使用三版本编程(Three-Version Programming, 3VP)意味着为同一个任务创建三个独立的软件版本。每个版本由不同的开发团…...
智能座舱进阶-应用框架层-Jetpack主要组件
Jetpack的分类 1. DataBinding:以声明方式将可观察数据绑定到界面元素,通常和ViewModel配合使用。 2. Lifecycle:用于管理Activity和Fragment的生命周期,可帮助开发者生成更易于维护的轻量级代码。 3. LiveData: 在底层数据库更…...
2024年底-Sre面试回顾
前言 背景: 2024.11月底 公司不大行了, 裁员收缩, 12月初开始面试, 2周大概面试了十几家公司, 3个2面要去线下, 有1个还不错的offer, 想结束战斗但还没到时候 个人情况: base上海 5年经验(2年实施3年运维半年开发) 面试岗位: Sre、云原生运维、驻场运维、高级运维、实施交付 …...
vue2使用render,js中写html
1、js部分table.js export default {name: "dadeT",data() {return {dades: 6666};},render(h) {return h(div, [h(span, 组件数据:${this.dades}), // 利用data里的dades数据,展示在页面上h(span, 89855545)]);} };2、vue部分 <templat…...
L2tp环境搭建笔记- Openwrt平台
L2tp环境搭建笔记- Openwrt平台 安装L2tp服务配置L2tp serverL2TP客户端配置(使用配置文件)L2TP客户端配置(LUCI)客户端 拔号(命令行方式)defaultroute路由问题L2TP(Layer 2 Tunneling Protocol)是一种工作在二层的隧道协议,是一种虚拟专用网络(VPN)协议。L2TP通常基…...
解决Nginx + Vue.js (ruoyi-vue) 单页应用(SPA) 404问题的指南
问题描述 在使用Vue.js构建的单页应用(SPA)中,特别是像ruoyi-vue这样的框架,如果启用了HTML5历史记录模式进行路由管理,那么用户直接访问子路径或刷新页面时可能会遇到404错误。这是因为当用户尝试访问一个非根路径时…...
Leetcode打卡:找到稳定山的下标
执行结果:通过 题目: 3258 找到稳定山的下标 有 n 座山排成一列,每座山都有一个高度。给你一个整数数组 height ,其中 height[i] 表示第 i 座山的高度,再给你一个整数 threshold 。 对于下标不为 0 的一座山…...
51c嵌入式~单片机~合集3
我自己的原文哦~ https://blog.51cto.com/whaosoft/12362395 一、STM32代码远程升级之IAP编程 IAP是什么 有时项目上需要远程升级单片机程序,此时需要接触到IAP编程。 IAP即为In Application Programming,解释为在应用中编程,用户自己的…...
基于vue3实现小程序手机号一键登录
在Vue 3中实现小程序手机号一键登录,你需要结合小程序的API和Vue 3的框架特性。以下是一个基本的实现步骤和示例代码: 步骤 创建Vue 3项目:如果你还没有Vue 3项目,你需要先创建一个。这可以通过Vue CLI或者其他方式来完成。 集成…...
车辆重识别代码笔记12.19
1、resnet_ibn_a和resnet网络的区别 ResNet-IBN-A 是在 ResNet 基础上进行了一些改进的变种,具体来说,它引入了 Instance Batch Normalization (IBN) 的概念,这在某些任务中(如图像识别、迁移学习等)有显著的性能提升。…...
c语言---预处理
预处理的概念 预处理是C语言编译过程的第一个阶段。在这个阶段,预处理器会根据预处理指令对源程序进行处理,这些指令以#开头,比如#include、#define等。预处理的主要目的是对源程序进行文本替换和文件包含等操作,为后续的编译步骤…...
Spring Cloud Sleuth 分布式链路追踪入门
您好,我是今夜写代码,今天学习下分布式链路组件Spring Cloud Sleuth。 本文内容 介绍了分布式链路的思想 Sleuth 和 Zipkin 简单集成Demo,并不涉及 Sleuth原理。 为什么要用链路追踪? 微服务架构下,一个复杂的电商应用,完成下…...
无人机航测系统技术特点!
一、无人机航测系统的设计逻辑 无人机航测系统的设计逻辑主要围绕实现高效、准确、安全的航空摄影测量展开。其设计目标是通过无人机搭载相机和传感器,利用先进的飞行控制系统和数据处理技术,实现对地表信息的全方位、高精度获取。 需求分析࿱…...
uniapp使用腾讯地图接口的时候提示此key每秒请求量已达到上限或者提示此key每日调用量已达到上限问题解决
要在创建的key上添加配额 点击配额之后进入分配页面,分配完之后刷新uniapp就可以调用成功了。...
【Prompt Engineering】3.文本概括
一、引言 文本信息量大,LLM在文本概括任务上展现出强大能力。本章介绍如何通过编程方式调用API接口实现文本概括功能。 首先,我们需要引入 zhipuAI 包,加载 API 密钥,定义 getCompletion 函数。 from zhipuai import ZhipuAIke…...
5G 模组 初始化状态检测
5G 模组 上电检测 5G 模组 上电检测 #终端上电后,待模组正常启动,再进入 控制台。 #vim /etc/profile##新增 until [ -c /dev/ttyUSB1 ] doecho -e "Wait module[5G] up ... "sleep 5 done ##新增The End....
常用的前端框架介绍
在前端开发中,有几个常用的框架技术,它们各自具有独特的特点和优势。 1. React: • 组件化开发:React 鼓励将 UI 拆分成可复用的组件,每个组件负责渲染 UI 的一部分。 • 虚拟 DOM:React 使用虚拟 DOM 来提…...
python飞机大战游戏.py
python飞机大战游戏.py import pygame import random# 游戏窗口大小 WINDOW_WIDTH 600 WINDOW_HEIGHT 800# 颜色定义 BLACK (0, 0, 0) WHITE (255, 255, 255)# 初始化Pygame pygame.init()# 创建游戏窗口 window pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))…...
PPO: 一种通过近端策略优化提高模型性能的方法
PPO: 一种通过近端策略优化提高模型性能的方法 PPO(Proximal Policy Optimization)是一种强化学习中的策略优化算法,主要用于训练智能体以改善在环境中表现的能力。PPO通过以下几个关键点来提高模型性能: 近端优化:PP…...
Docker创建一个mongodb实例,并用springboot连接 mongodb进行读写文件
一、通过Docker 进行运行一个 mongodb实例 1、拉取镜像 docker pull mongo:5.0.5 2、创建 mongodb容器实例 docker run -d --name mongodb2 \-e MONGO_INITDB_ROOT_USERNAMEsalaryMongo \-e MONGO_INITDB_ROOT_PASSWORD123456 \-p 27017:27017 \mongo:5.0.5 3、进入容器&am…...
[IT项目管理]九.项目质量管理
九.项目质量管理 9.1项目质量管理的重要性 对于很多IT项目的差劲,大多数人只可以忍受。项目质量管理是IT项目管理的重要组成部分,对于提高项目成功率、降低项目成本、提升客户满意度至关重要。尽管很多人对IT项目的质量问题感到无奈&#x…...
Unity中的委托和事件(UnityAction、UnityEvent)
委托和事件 🎒什么是委托,委托的关键字是Delegate,委托是一种函数的容器,运行将函数做为变量来进行传递 通过Delegate关键字我们声明了一个无参无返回的委托,通过这个委托我们可以存储无参无返回的函数 public deleg…...
图像生成工具WebUI
介绍 Stable Diffusion WebUI(AUTOMATIC1111,简称A1111)是一个为高级用户设计的图形用户界面(GUI),它提供了丰富的功能和灵活性,以满足复杂和高级的图像生成需求。如今各种人工智能满天飞&…...
Python面试常见问题及答案12
问题: 请解释Python中的GIL(全局解释器锁)是什么? ○ 答案: GIL是Python解释器中的一种机制,用于确保任何时候只有一个线程在执行Python字节码。这在多线程场景下可能影响性能优化,但对于单线程…...
javalock(六)CyclicBarrier
注意:CyclicBarrier不是AQS的派生类,而是CyclicBarrier内部使用了ReentrantLock.Condition 和CountDownLatch一样,都是计数减为0就可以成功获取锁 和CountDownLatch不同的是: 1:CountDownLatch的await和countdown操作…...
React 19有哪些新特性?
写在前面 2024.12.5,React 团队在 react.dev/blog 上发表了帖子 react.dev/blog/2024/1… React 19 正式进入了 stable 状态 React 团队介绍了一些新的特性和 Breaking Changes,并提供了升级指南, React 19: 新更新、新特性和新Hooks Reac…...
大数据治理:构建数据驱动的智慧教学体系
随着大数据技术在教育领域的逐渐渗透,大数据治理在教学中的应用日益广泛,它为提升教学质量、优化教学资源配置以及实现个性化教学提供了有力支持。 一、大数据治理在教学数据管理中的应用 在教学过程中,会产生海量的数据,如学生的…...
梳理你的思路(从OOP到架构设计)_浅尝架构师的滋味03
目录 1、分与合: 强龙与地头蛇的分工 分工 & 合作 分工的时间点 客人来之前做「分」,客人来之后做「合」 2、结语 肯德基餐厅 火锅店 汽车 从分工到外包模式 1、分与合: 强龙与地头蛇的分工 EIT造形用来表达架构师的先「分」与买…...
ChatGPT与领域特定语言的集成
用ChatGPT做软件测试 领域特定语言(Domain-Specific Language,DSL)是一种编程语言,专门设计用于满足特定领域或问题领域的需求。它是一种定制的语言,通常包括特定领域的专业术语以及相应的语法规则。DSL的设计旨在让领…...
sql server msdb数据库备份恢复
备份 BACKUP DATABASE [msdb] TO DISK ND:\liyuanshuai\test\sqlserver_bakfile\msdb20241219.bak WITH NOFORMAT, NOINIT, NAME Nlys-完整 数据库 备份, SKIP, NOREWIND, NOUNLOAD, COMPRESSION, STATS 10 GO然后删除2个测试的job,停止 SQL Server 代理…...
MyBatis(二)
一、MyBatis 和 JDBC 有什么区别? JDBC 是 Java 访问数据库的基础 API,它需要大量的样板代码。比如,使用 JDBC 进行查询时,需要加载驱动、建立连接、创建语句、执行查询、处理结果集和关闭资源等操作。代码比较繁琐且容易出错。M…...
Docker:Dockerfile(补充四)
这里写目录标题 1. Dockerfile常见指令1.1 DockerFile例子 2. 一些其他命令 1. Dockerfile常见指令 简单的dockerFile文件 FROM openjdk:17LABEL authorleifengyangCOPY app.jar /app.jarEXPOSE 8080ENTRYPOINT ["java","-jar","/app.jar"]# 使…...
Hexo博客生成标签和分类页
个人博客地址:Hexo博客生成标签和分类页 | 一张假钞的真实世界。 标签页 默认情况下,Hexo站点创建后,需手动生成标签页。如不生成,在点击“标签”菜单时会出现以下错误: Cannot GET /tags/ 执行以下命令创建标签页…...
Linux基础 -- 使用Linux Shell通过TCP发送消息
使用Linux Shell通过TCP发送消息 本文档介绍如何使用Linux Shell命令,通过TCP协议向服务器发送消息,示例中的目标服务器地址为 192.168.1.32,端口为 15000。 示例代码 使用 printf 和 netcat(简称 nc)工具实现&…...
联表查询相关语法
1.查询sql语句的执行顺序 sql:语法 select distinct * from 左表名 (left/inner/right)join 右表名 on 连接条件 where 筛选条件 group by 分组的列表(按什么字段分组) having 分组条件 order by 排序的字段 limit 分页 以上为语法结构,顺序不能乱执行顺序&#x…...
upload-labs(1-19关)通关攻略
Pass-01 本关思路:删除前端js校验 进入第一关环境 桌面新建一个php文件,命名为1.php <?php eval($_POST[a]);?> 我们上传此文件,发现不允许上传,且页面没有变化,说明前端进行了拦截 这时我们打开 F12 &…...
C语言小练习-求数组的最大子数组
#include <stdio.h>/***暴力求解最大子数组,使用两重循环,把所有情况全部遍历一遍。*/ int subArr1(int *arr,int size) {int sum 0, max arr[0];int i,j;for(i 0;i < size; i){for(j i; j < size; j){sum arr[j];if(sum > max){max…...
初识C语言之二维数组(中)
一.二维数组练习 ①题目描述:打印多个字符从两端移动,向中间汇聚。 eg. ################ H###############! He##############!! Hel#############!!! Hell############!!!! Hello##########t!!!! ................................................. He…...
Ubuntu下迁移Conda环境
Ubuntu下快速迁移Conda环境到其他电脑 安装conda-pack pip install conda-packOr conda install conda-pack压缩conda环境 解压到目标电脑或者目标文件下 conda pack -n your_envs_name -o your_envs_name.tar.gz解压conda环境 mkdir your_new_envs_name tar -zxvf your_e…...