【功能完善】IoT: 重命名插件模块,重构插件管理逻辑,优化代码结构,更新配置文件以支持新插件架构。

This commit is contained in:
安浩浩
2025-01-24 23:17:26 +08:00
parent 916024b891
commit 698cec92bd
27 changed files with 198 additions and 168 deletions

View File

@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modules>
<module>yudao-module-iot-plugin-common</module>
<module>yudao-module-iot-plugin-http</module>
<module>yudao-module-iot-plugin-mqtt</module>
<module>yudao-module-iot-plugin-emqx</module>
</modules>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-plugins</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>
物联网 插件 模块
</description>
</project>

View File

@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-plugins</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-plugin-common</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
物联网 插件 模块 - 通用功能
</description>
<dependencies>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-common</artifactId>
</dependency>
<!-- 项目依赖 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<!-- 其他依赖项 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- PF4J -->
<dependency>
<groupId>org.pf4j</groupId>
<artifactId>pf4j-spring</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,60 @@
package cn.iocoder.yudao.module.iot.plugin.common.api;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.api.device.dto.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.IotDeviceStatusUpdateReqDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.client.RestTemplate;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
@Slf4j
public class DeviceDataApiClient implements DeviceDataApi {
private final RestTemplate restTemplate;
private final String deviceDataUrl;
// 可以通过构造器把 RestTemplate 和 baseUrl 注入进来
public DeviceDataApiClient(RestTemplate restTemplate, String deviceDataUrl) {
this.restTemplate = restTemplate;
this.deviceDataUrl = deviceDataUrl;
}
@Override
public CommonResult<Boolean> updateDeviceStatus(IotDeviceStatusUpdateReqDTO updateReqDTO) {
// 示例:如果对应的远程地址是 /rpc-api/iot/device-data/update-status
String url = deviceDataUrl + "/rpc-api/iot/device-data/update-status";
return doPost(url, updateReqDTO, "updateDeviceStatus");
}
@Override
public CommonResult<Boolean> reportDeviceEventData(IotDeviceEventReportReqDTO reportReqDTO) {
// 示例:如果对应的远程地址是 /rpc-api/iot/device-data/report-event
String url = deviceDataUrl + "/rpc-api/iot/device-data/report-event";
return doPost(url, reportReqDTO, "reportDeviceEventData");
}
@Override
public CommonResult<Boolean> reportDevicePropertyData(IotDevicePropertyReportReqDTO reportReqDTO) {
// 示例:如果对应的远程地址是 /rpc-api/iot/device-data/report-property
String url = deviceDataUrl + "/rpc-api/iot/device-data/report-property";
return doPost(url, reportReqDTO, "reportDevicePropertyData");
}
/**
* 将与远程服务交互的通用逻辑抽取成一个私有方法
*/
private <T> CommonResult<Boolean> doPost(String url, T requestBody, String actionName) {
log.info("[{}] Sending request to URL: {}", actionName, url);
try {
// 这里指定返回类型为 CommonResult<?>,根据后台服务返回的实际结构做调整
restTemplate.postForObject(url, requestBody, CommonResult.class);
return success(true);
} catch (Exception e) {
log.error("[{}] Error sending request to URL: {}", actionName, url, e);
return CommonResult.error(400, "Request error: " + e.getMessage());
}
}
}

View File

@@ -0,0 +1,29 @@
package cn.iocoder.yudao.module.iot.plugin.common.config;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.plugin.common.api.DeviceDataApiClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
@Configuration
public class DeviceDataApiInitializer {
@Value("${iot.device-data.url}")
private String deviceDataUrl;
@Bean
public RestTemplate restTemplate() {
// 如果你有更多的自定义需求,比如连接池、超时时间等,可以在这里设置
return new RestTemplateBuilder().build();
}
@Bean
public DeviceDataApi deviceDataApi(RestTemplate restTemplate) {
// 返回我们自定义的 Client 实例
return new DeviceDataApiClient(restTemplate, deviceDataUrl);
}
}

View File

@@ -0,0 +1,6 @@
plugin.id=plugin-emqx
plugin.class=cn.iocoder.yudao.module.iot.plugin.EmqxPlugin
plugin.version=1.0.0
plugin.provider=ahh
plugin.dependencies=
plugin.description=plugin-emqx-1.0.0

View File

@@ -0,0 +1,164 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-plugins</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>yudao-module-iot-plugin-emqx</artifactId>
<name>${project.artifactId}</name>
<description>
物联网 插件模块 - emqx 插件
</description>
<properties>
<!-- 插件相关 -->
<plugin.id>emqx-plugin</plugin.id>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.EmqxPlugin</plugin.class>
<plugin.version>0.0.1</plugin.version>
<plugin.provider>ahh</plugin.provider>
<plugin.description>emqx-plugin-0.0.1</plugin.description>
<plugin.dependencies/>
</properties>
<build>
<plugins>
<!-- DOESN'T WORK WITH MAVEN 3 (I defined the plugin metadata in properties section)
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<version>1.0-alpha-2</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>read-project-properties</goal>
</goals>
<configuration>
<files>
<file>plugin.properties</file>
</files>
</configuration>
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>unzip jar file</id>
<phase>package</phase>
<configuration>
<target>
<unzip src="target/${project.artifactId}-${project.version}.${project.packaging}"
dest="target/plugin-classes"/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/assembly.xml
</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>attached</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifestEntries>
<Plugin-Id>${plugin.id}</Plugin-Id>
<Plugin-Class>${plugin.class}</Plugin-Class>
<Plugin-Version>${plugin.version}</Plugin-Version>
<Plugin-Provider>${plugin.provider}</Plugin-Provider>
<Plugin-Description>${plugin.description}</Plugin-Description>
<Plugin-Dependencies>${plugin.dependencies}</Plugin-Dependencies>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- 其他依赖项 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- PF4J Spring 集成 -->
<dependency>
<groupId>org.pf4j</groupId>
<artifactId>pf4j-spring</artifactId>
<scope>provided</scope>
</dependency>
<!-- 项目依赖 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- Vert.x 核心依赖 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<!-- Vert.x Web 模块 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- MQTT -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,31 @@
<assembly>
<id>plugin</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<scope>runtime</scope>
<outputDirectory>lib</outputDirectory>
<includes>
<include>*:jar:*</include>
</includes>
</dependencySet>
</dependencySets>
<!--
<fileSets>
<fileSet>
<directory>target/classes</directory>
<outputDirectory>classes</outputDirectory>
</fileSet>
</fileSets>
-->
<fileSets>
<fileSet>
<directory>target/plugin-classes</directory>
<outputDirectory>classes</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@@ -0,0 +1,42 @@
package cn.iocoder.yudao.module.iot.plugin;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class EmqxPlugin extends Plugin {
private ExecutorService executorService;
public EmqxPlugin(PluginWrapper wrapper) {
super(wrapper);
this.executorService = Executors.newSingleThreadExecutor();
}
@Override
public void start() {
log.info("EmqxPlugin.start()");
if (executorService.isShutdown() || executorService.isTerminated()) {
executorService = Executors.newSingleThreadExecutor();
}
DeviceDataApi deviceDataApi = SpringUtil.getBean(DeviceDataApi.class);
if (deviceDataApi == null) {
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
return;
}
}
@Override
public void stop() {
log.info("EmqxPlugin.stop()");
}
}

View File

@@ -0,0 +1,6 @@
plugin.id=yudao-module-iot-plugin-http
plugin.class=cn.iocoder.yudao.module.iot.plugin.http.config.HttpVertxPlugin
plugin.version=1.0.0
plugin.provider=yudao
plugin.dependencies=
plugin.description=yudao-module-iot-plugin-http-1.0.0

View File

@@ -0,0 +1,136 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-plugins</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>yudao-module-iot-plugin-http</artifactId>
<version>1.0.0</version>
<name>${project.artifactId}</name>
<description>
物联网 插件模块 - http 插件
</description>
<properties>
<!-- 插件相关 -->
<plugin.id>${project.artifactId}</plugin.id>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.http.config.HttpVertxPlugin</plugin.class>
<plugin.version>${project.version}</plugin.version>
<plugin.provider>yudao</plugin.provider>
<plugin.description>${project.artifactId}-${project.version}</plugin.description>
<plugin.dependencies/>
</properties>
<build>
<plugins>
<!-- 插件模式 zip -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>unzip jar file</id>
<phase>package</phase>
<configuration>
<target>
<unzip src="target/${project.artifactId}-${project.version}.${project.packaging}"
dest="target/plugin-classes"/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/assembly.xml
</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>attached</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 插件模式 jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifestEntries>
<Plugin-Id>${plugin.id}</Plugin-Id>
<Plugin-Class>${plugin.class}</Plugin-Class>
<Plugin-Version>${plugin.version}</Plugin-Version>
<Plugin-Provider>${plugin.provider}</Plugin-Provider>
<Plugin-Description>${plugin.description}</Plugin-Description>
<Plugin-Dependencies>${plugin.dependencies}</Plugin-Dependencies>
</manifestEntries>
</archive>
</configuration>
</plugin>
<!-- 独立模式 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
<configuration>
<classifier>-standalone</classifier>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- 项目依赖 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-plugin-common</artifactId>
<version>${revision}</version>
</dependency>
<!-- Vert.x Web -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,24 @@
<assembly>
<id>plugin</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<scope>runtime</scope>
<outputDirectory>lib</outputDirectory>
<includes>
<include>*:jar:*</include>
</includes>
</dependencySet>
</dependencySets>
<fileSets>
<fileSet>
<directory>target/plugin-classes</directory>
<outputDirectory>classes</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@@ -0,0 +1,16 @@
package cn.iocoder.yudao.module.iot.plugin.http;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication(scanBasePackages = "cn.iocoder.yudao.module.iot.plugin")
public class HttpPluginSpringbootApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(HttpPluginSpringbootApplication.class);
application.setWebApplicationType(WebApplicationType.NONE);
application.run(args);
}
}

View File

@@ -0,0 +1,81 @@
package cn.iocoder.yudao.module.iot.plugin.http.config;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.plugin.http.service.HttpVertxHandler;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginWrapper;
import org.pf4j.spring.SpringPlugin;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@Slf4j
public class HttpVertxPlugin extends SpringPlugin {
private static final int PORT = 8092;
private Vertx vertx;
public HttpVertxPlugin(PluginWrapper wrapper) {
super(wrapper);
}
@Override
public void start() {
log.info("HttpVertxPlugin.start()");
// 获取 DeviceDataApi 实例
DeviceDataApi deviceDataApi = SpringUtil.getBean(DeviceDataApi.class);
if (deviceDataApi == null) {
log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!");
return;
}
// 初始化 Vert.x
vertx = Vertx.vertx();
Router router = Router.router(vertx);
// 处理 Body
router.route().handler(BodyHandler.create());
// 设置路由
router.post("/sys/:productKey/:deviceName/thing/event/property/post")
.handler(new HttpVertxHandler(deviceDataApi));
// 启动 HTTP 服务器
vertx.createHttpServer()
.requestHandler(router)
.listen(PORT, http -> {
if (http.succeeded()) {
log.info("HTTP 服务器启动成功,端口为: {}", PORT);
} else {
log.error("HTTP 服务器启动失败", http.cause());
}
});
}
@Override
public void stop() {
log.info("HttpVertxPlugin.stop()");
if (vertx != null) {
vertx.close(ar -> {
if (ar.succeeded()) {
log.info("Vert.x 关闭成功");
} else {
log.error("Vert.x 关闭失败", ar.cause());
}
});
}
}
@Override
protected ApplicationContext createApplicationContext() {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
applicationContext.setClassLoader(getWrapper().getPluginClassLoader());
applicationContext.refresh();
return applicationContext;
}
}

View File

@@ -0,0 +1,16 @@
package cn.iocoder.yudao.module.iot.plugin.http.config;
import org.pf4j.DefaultPluginManager;
import org.pf4j.PluginWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HttpVertxPluginConfiguration {
@Bean(initMethod = "start")
public HttpVertxPlugin httpVertxPlugin() {
PluginWrapper pluginWrapper = new PluginWrapper(new DefaultPluginManager(), null, null, null);
return new HttpVertxPlugin(pluginWrapper);
}
}

View File

@@ -0,0 +1,105 @@
package cn.iocoder.yudao.module.iot.plugin.http.service;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi;
import cn.iocoder.yudao.module.iot.api.device.dto.IotDevicePropertyReportReqDTO;
import io.vertx.core.Handler;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HttpVertxHandler implements Handler<RoutingContext> {
private final DeviceDataApi deviceDataApi;
public HttpVertxHandler(DeviceDataApi deviceDataApi) {
this.deviceDataApi = deviceDataApi;
}
@Override
public void handle(RoutingContext ctx) {
String productKey = ctx.pathParam("productKey");
String deviceName = ctx.pathParam("deviceName");
RequestBody requestBody = ctx.body();
JSONObject jsonData;
try {
jsonData = JSONUtil.parseObj(requestBody.asJsonObject());
} catch (Exception e) {
JSONObject res = createResponseJson(
400,
new JSONObject(),
null,
"请求数据不是合法的 JSON 格式: " + e.getMessage(),
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(400)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(res.toString());
return;
}
String id = jsonData.getStr("id", null);
try {
// 调用主程序的接口保存数据
IotDevicePropertyReportReqDTO createDTO = IotDevicePropertyReportReqDTO.builder()
.productKey(productKey)
.deviceName(deviceName)
.params(jsonData) // TODO 芋艿:这块要优化
.build();
deviceDataApi.reportDevicePropertyData(createDTO);
// 构造成功响应内容
JSONObject successRes = createResponseJson(
200,
new JSONObject(),
id,
"success",
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(200)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(successRes.toString());
} catch (Exception e) {
JSONObject errorRes = createResponseJson(
500,
new JSONObject(),
id,
"The format of result is error!",
"thing.event.property.post",
"1.0");
ctx.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json; charset=UTF-8")
.end(errorRes.toString());
}
}
/**
* 创建标准化的响应 JSON 对象
*
* @param code 响应状态码(业务层面的)
* @param data 返回的数据对象JSON
* @param id 请求的 id可选
* @param message 返回的提示信息
* @param method 返回的 method 标识
* @param version 返回的版本号
* @return 构造好的 JSON 对象
*/
private JSONObject createResponseJson(int code, JSONObject data, String id, String message, String method,
String version) {
JSONObject res = new JSONObject();
res.set("code", code);
res.set("data", data != null ? data : new JSONObject());
res.set("id", id);
res.set("message", message);
res.set("method", method);
res.set("version", version);
return res;
}
}

View File

@@ -0,0 +1,7 @@
spring:
application:
name: yudao-module-iot-plugin-http
iot:
device-data:
url: http://127.0.0.1:48080

View File

@@ -0,0 +1,7 @@
plugin.id=mqtt-plugin
plugin.description=Vert.x MQTT plugin
plugin.class=cn.iocoder.yudao.module.iot.plugin.MqttPlugin
plugin.version=1.0.0
plugin.requires=
plugin.provider=ahh
plugin.license=Apache-2.0

View File

@@ -0,0 +1,155 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-plugins</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>yudao-module-iot-plugin-mqtt</artifactId>
<name>${project.artifactId}</name>
<description>
物联网 插件模块 - mqtt 插件
</description>
<properties>
<!-- 插件相关 -->
<plugin.id>mqtt-plugin</plugin.id>
<plugin.class>cn.iocoder.yudao.module.iot.plugin.MqttPlugin</plugin.class>
<plugin.version>0.0.1</plugin.version>
<plugin.provider>ahh</plugin.provider>
<plugin.description>mqtt-plugin-0.0.1</plugin.description>
<plugin.dependencies/>
</properties>
<build>
<plugins>
<!-- DOESN'T WORK WITH MAVEN 3 (I defined the plugin metadata in properties section)
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>properties-maven-plugin</artifactId>
<version>1.0-alpha-2</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>read-project-properties</goal>
</goals>
<configuration>
<files>
<file>plugin.properties</file>
</files>
</configuration>
</execution>
</executions>
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>unzip jar file</id>
<phase>package</phase>
<configuration>
<target>
<unzip src="target/${project.artifactId}-${project.version}.${project.packaging}"
dest="target/plugin-classes"/>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptors>
<descriptor>
src/main/assembly/assembly.xml
</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>attached</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifestEntries>
<Plugin-Id>${plugin.id}</Plugin-Id>
<Plugin-Class>${plugin.class}</Plugin-Class>
<Plugin-Version>${plugin.version}</Plugin-Version>
<Plugin-Provider>${plugin.provider}</Plugin-Provider>
<Plugin-Description>${plugin.description}</Plugin-Description>
<Plugin-Dependencies>${plugin.dependencies}</Plugin-Dependencies>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- 其他依赖项 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- PF4J Spring 集成 -->
<dependency>
<groupId>org.pf4j</groupId>
<artifactId>pf4j-spring</artifactId>
<scope>provided</scope>
</dependency>
<!-- 项目依赖 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- Vert.x MQTT -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
<version>4.5.11</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,31 @@
<assembly>
<id>plugin</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<scope>runtime</scope>
<outputDirectory>lib</outputDirectory>
<includes>
<include>*:jar:*</include>
</includes>
</dependencySet>
</dependencySets>
<!--
<fileSets>
<fileSet>
<directory>target/classes</directory>
<outputDirectory>classes</outputDirectory>
</fileSet>
</fileSets>
-->
<fileSets>
<fileSet>
<directory>target/plugin-classes</directory>
<outputDirectory>classes</outputDirectory>
</fileSet>
</fileSets>
</assembly>

View File

@@ -0,0 +1,36 @@
package cn.iocoder.yudao.module.iot.plugin;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;
@Slf4j
public class MqttPlugin extends Plugin {
private MqttServerExtension mqttServerExtension;
public MqttPlugin(PluginWrapper wrapper) {
super(wrapper);
}
@Override
public void start() {
log.info("MQTT Plugin started.");
mqttServerExtension = new MqttServerExtension();
mqttServerExtension.startMqttServer();
}
@Override
public void stop() {
log.info("MQTT Plugin stopped.");
if (mqttServerExtension != null) {
mqttServerExtension.stopMqttServer().onComplete(ar -> {
if (ar.succeeded()) {
log.info("Stopped MQTT Server successfully");
} else {
log.error("Failed to stop MQTT Server: {}", ar.cause().getMessage());
}
});
}
}
}

View File

@@ -0,0 +1,231 @@
package cn.iocoder.yudao.module.iot.plugin;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttDisconnectMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.Extension;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* 根据官方示例,整合常见 MQTT 功能到 PF4J 的 Extension 类中
*/
@Slf4j
@Extension
public class MqttServerExtension {
private Vertx vertx;
private MqttServer mqttServer;
/**
* 启动 MQTT 服务端
* 可根据需要决定是否启用 SSL/TLS、WebSocket、多实例部署等
*/
public void startMqttServer() {
// 初始化 Vert.x
vertx = Vertx.vertx();
// ========== 如果需要 SSL/TLS请参考下面注释启用注释并替换端口、证书路径等 ==========
// MqttServerOptions options = new MqttServerOptions()
// .setPort(8883)
// .setKeyCertOptions(new PemKeyCertOptions()
// .setKeyPath("./src/test/resources/tls/server-key.pem")
// .setCertPath("./src/test/resources/tls/server-cert.pem"))
// .setSsl(true);
// ========== 如果需要 WebSocket请设置 setUseWebSocket(true) ==========
// options.setUseWebSocket(true);
// ========== 默认不启用 SSL 的示例 ==========
MqttServerOptions options = new MqttServerOptions()
.setPort(1883)
.setHost("0.0.0.0")
.setUseWebSocket(false); // 如果需要 WebSocket请改为 true
mqttServer = MqttServer.create(vertx, options);
// 指定 endpointHandler处理客户端连接等
mqttServer.endpointHandler(endpoint -> {
handleClientConnect(endpoint);
handleDisconnect(endpoint);
handleSubscribe(endpoint);
handleUnsubscribe(endpoint);
handlePublish(endpoint);
handlePing(endpoint);
});
// 启动监听
mqttServer.listen(ar -> {
if (ar.succeeded()) {
log.info("MQTT server is listening on port {}", mqttServer.actualPort());
} else {
log.error("Error on starting the server", ar.cause());
}
});
}
/**
* 优雅关闭 MQTT 服务端
*/
public Future<Void> stopMqttServer() {
if (mqttServer != null) {
return mqttServer.close().onComplete(ar -> {
if (ar.succeeded()) {
log.info("MQTT server closed.");
if (vertx != null) {
vertx.close();
log.info("Vert.x instance closed.");
}
} else {
log.error("Failed to close MQTT server: {}", ar.cause().getMessage());
}
});
}
return Future.succeededFuture();
}
// ==================== 以下为官方示例中常见事件的处理封装 ====================
/**
* 处理客户端连接 (CONNECT)
*/
private void handleClientConnect(MqttEndpoint endpoint) {
// 打印 CONNECT 的主要信息
log.info("MQTT client [{}] request to connect, clean session = {}",
endpoint.clientIdentifier(), endpoint.isCleanSession());
if (endpoint.auth() != null) {
log.info("[username = {}, password = {}]", endpoint.auth().getUsername(), endpoint.auth().getPassword());
}
log.info("[properties = {}]", endpoint.connectProperties());
if (endpoint.will() != null) {
log.info("[will topic = {}, msg = {}, QoS = {}, isRetain = {}]",
endpoint.will().getWillTopic(),
new String(endpoint.will().getWillMessageBytes()),
endpoint.will().getWillQos(),
endpoint.will().isWillRetain());
}
log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds());
// 接受远程客户端的连接
endpoint.accept(false);
}
/**
* 处理客户端主动断开 (DISCONNECT)
*/
private void handleDisconnect(MqttEndpoint endpoint) {
endpoint.disconnectMessageHandler((MqttDisconnectMessage disconnectMessage) -> {
log.info("Received disconnect from client [{}], reason code = {}",
endpoint.clientIdentifier(), disconnectMessage.code());
});
}
/**
* 处理客户端订阅 (SUBSCRIBE)
*/
private void handleSubscribe(MqttEndpoint endpoint) {
endpoint.subscribeHandler((MqttSubscribeMessage subscribe) -> {
List<MqttSubAckReasonCode> reasonCodes = new ArrayList<>();
for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
log.info("Subscription for {} with QoS {}", s.topicName(), s.qualityOfService());
// 将客户端请求的 QoS 转换为返回给客户端的 reason code可能是错误码或实际 granted QoS
reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
}
// 回复 SUBACKMQTT 5.0 时可指定 reasonCodes、properties
endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
});
}
/**
* 处理客户端取消订阅 (UNSUBSCRIBE)
*/
private void handleUnsubscribe(MqttEndpoint endpoint) {
endpoint.unsubscribeHandler((MqttUnsubscribeMessage unsubscribe) -> {
for (String topic : unsubscribe.topics()) {
log.info("Unsubscription for {}", topic);
}
// 回复 UNSUBACKMQTT 5.0 时可指定 reasonCodes、properties
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
}
/**
* 处理客户端发布的消息 (PUBLISH)
*/
private void handlePublish(MqttEndpoint endpoint) {
// 接收 PUBLISH 消息
endpoint.publishHandler((MqttPublishMessage message) -> {
String payload = message.payload().toString(Charset.defaultCharset());
log.info("Received message [{}] on topic [{}] with QoS [{}]",
payload, message.topicName(), message.qosLevel());
// 根据不同 QoS回复客户端
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
endpoint.publishAcknowledge(message.messageId());
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
endpoint.publishReceived(message.messageId());
}
});
// 如果 QoS = 2需要处理 PUBREL
endpoint.publishReleaseHandler(messageId -> {
endpoint.publishComplete(messageId);
});
}
/**
* 处理客户端 PINGREQ
*/
private void handlePing(MqttEndpoint endpoint) {
endpoint.pingHandler(v -> {
// 这里仅做日志, PINGRESP 已自动发送
log.info("Ping received from client [{}]", endpoint.clientIdentifier());
});
}
// ==================== 如果需要服务端向客户端发布消息,可用以下示例 ====================
/**
* 服务端主动向已连接的某个 endpoint 发布消息的示例
* 如果使用 MQTT 5.0,可以传递更多消息属性
*/
public void publishToClient(MqttEndpoint endpoint, String topic, String content) {
endpoint.publish(topic,
Buffer.buffer(content),
MqttQoS.AT_LEAST_ONCE, // QoS 自行选择
false,
false);
// 处理 QoS 1 和 QoS 2 的 ACK
endpoint.publishAcknowledgeHandler(messageId -> {
log.info("Received PUBACK from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId);
}).publishReceivedHandler(messageId -> {
endpoint.publishRelease(messageId);
}).publishCompletionHandler(messageId -> {
log.info("Received PUBCOMP from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId);
});
}
// ==================== 如果需要多实例部署,用于多核扩展,可参考以下思路 ====================
// 例如,在宿主应用或插件中循环启动多个 MqttServerExtension 实例,或使用 Vert.x 的 deployVerticle:
// DeploymentOptions options = new DeploymentOptions().setInstances(10);
// vertx.deployVerticle(() -> new MyMqttVerticle(), options);
}