
首先 ThingsBoard 微服务架构目前已经支持的 MQTT、HTTP、CoAP、SNMP、LwM2M等协议。但是一些厂商的数据是 TCP 私有协议并且高度定制化需要扩展 Transport 微服务的情况下需要定制化自己的 transport 。
1.通过队列通信
ThingsBoard一共有几种消息队列用于微服务模块之间通信:
(1)transport 通过 tb.transport.api.requests 将设备鉴权任务交给 tb-core 处理,并通过 tb.transport.api.responses 获取响应。
(2)transport 和 rule_engine 通过 tb.core 队列将消息(会话生命周期事件、属性和rpc订阅)传给 tb-core。
(3)rule_engine 通过 js.eval.requests 将数据交给 js_executer 处理,并通过 js.eval.response 获取响应。
(4)transport 和 integration 通过 tb.rule-engine 将所有设备消息传给 rule_engine。
所以扩展自定义 Transport 也直接对接 queue 消息队列即可。
2.开发 Tcp-Transport
(1)transport-api
需要用到ThingsBoard的 transport-api,核心的依赖jar文件在 ThingsBoard 源码中可以找到。最简单的方式就是直接反编译现有的 mqtt-transport 取到对应的 lib 依赖库。这里演示如何从容器中获取:
创建一个 mqtt-transport 服务:
docker run --restart=always --name tb-mqtt-transport \
-p 1883:1883 \
-e TB_SERVICE_ID=tb-mqtt-transport \
-e ZOOKEEPER_ENABLED=true \
-e ZOOKEEPER_URL=tb-test5:2181 \
-e MQTT_BIND_ADDRESS=0.0.0.0 \
-e MQTT_BIND_PORT=1883 \
-e MQTT_TIMEOUT=10000 \
-e TB_QUEUE_TYPE=kafka \
-e TB_KAFKA_SERVERS=kafkahost:kafkaport \
-v /usr/local/thingsboard/tb-transports/mqtt/conf:/config \
-d store/thingsboard/tb-mqtt-transport:3.3.1
从容器中取出相关源代码:
docker cp tb-mqtt-transport:/usr/share/tb-mqtt-transport/bin/tb-mqtt-transport.jar /root/tb-mqtt-transport.jar
直接解压就能获取到所有依赖包:
BOOT-INF\lib
其中用于队列通信的 transport-api 接口都在 SessionMsgListener 里面。
(2) SessionMsgListener 接口
调用设备登录和鉴权:
private void init(ValidateDeviceCredentialsResponse response){
DeviceSession that = this;
tbDeviceInfo = response.getDeviceInfo();
//保存身份信息
tbSession = SessionInfoCreator.create(response, context, UUID.randomUUID());
//创建一个session打开事件到tb
context.getTransportService().process(
tbSession,
DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN),
new TransportServiceCallback<Void>(){
@Override
public void onSuccess(Void aVoid) {
//订阅RCP RCP请求会回调到 SessionMsgListener.xxx() 方法中
context.getTransportService().process(
tbSession,
TransportProtos.SubscribeToRPCMsg.newBuilder().build(),
new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
log.info("[SubscribeToRPCMsg Success]");
}
@Override
public void onError(Throwable throwable) {
log.info("[SubscribeToRPCMsg Error]");
}
}
);
//订阅属性变更 属性变更会回调到 SessionMsgListener.xxx() 方法中
context.getTransportService().process(
tbSession,
TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(),
new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
log.info("[SubscribeToAttributeUpdatesMsg Success]");
}
@Override
public void onError(Throwable throwable) {
log.info("[SubscribeToAttributeUpdatesMsg Error]");
}
}
);
//注册 SessionMsgListener 用于监听各种回调
context.getTransportService().registerAsyncSession(
tbSession,
that
);
isInit = true;
}
@Override
public void onError(Throwable throwable) {
log.info("[会话初始化失败]deviceName="+deviceName+",deviceToken="+deviceToken);
}
});
}
调用属性上传:
public void postAttributeMsg(String payload){
context.getTransportService().process(
tbSession,
JsonConverter.convertToAttributesProto(new JsonParser().parse(payload)),
new TransportServiceCallback<Void>(){
@Override
public void onSuccess(Void aVoid) {
log.info("[属性发送成功]"+payload);
}
@Override
public void onError(Throwable throwable) {
log.info("[属性发送失败]"+payload);
}
});
}
调用遥测上传:
public void postTelemetryMsg(String payload){
context.getTransportService().process(
tbSession,
JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload)),
new TransportServiceCallback<Void>(){
@Override
public void onSuccess(Void aVoid) {
log.info("[遥测发送成功]"+payload);
}
@Override
public void onError(Throwable throwable) {
log.info("[遥测发送失败]"+payload);
}
});
}
调用获取属性:
public void getSharedAttribute(String key){
TransportProtos.GetAttributeRequestMsg getAttributeRequestMsg = TransportProtos.GetAttributeRequestMsg.newBuilder()
.addSharedAttributeNames(key)
.build();
context.getTransportService().process(
tbSession,
getAttributeRequestMsg,
new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
}
@Override
public void onError(Throwable throwable) {
}
}
);
}
调用ota文件升级:
public byte[] getOtaPackage(String packageType, int chuckSize, int chuck) throws Exception{
TransportProtos.GetOtaPackageRequestMsg requestMsg = TransportProtos.GetOtaPackageRequestMsg.newBuilder()
.setTenantIdMSB(tbSession.getTenantIdMSB())
.setTenantIdLSB(tbSession.getTenantIdLSB())
.setDeviceIdMSB(tbSession.getDeviceIdMSB())
.setDeviceIdLSB(tbSession.getDeviceIdLSB())
//OtaPackageType.FIRMWARE 即 sw 或者 fw
.setType(packageType)
.build();
final byte[][] res = new byte[1][];
CountDownLatch latch = new CountDownLatch(1);
//发送文件获取请求不需要传递文件名称,直接返回设备分配的文件。需要和传入的ota包版本进行匹配。
context.getTransportService().process(
tbSession,
requestMsg,
new TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg>(){
@Override
public void onSuccess(TransportProtos.GetOtaPackageResponseMsg getOtaPackageResponseMsg) {
//当前设备未分配可升级的 ota 包
if(!TransportProtos.ResponseStatus.SUCCESS.equals(getOtaPackageResponseMsg.getResponseStatus())){
log.info("Error:device["+tbSession.getDeviceName()+"] ota package not found!");
}
//文件存在
else {
log.info("Success: device["+tbSession.getDeviceName()+"] ota_title["+getOtaPackageResponseMsg.getTitle()+"] ota_version["+getOtaPackageResponseMsg.getVersion()+"]");
//缓存中的 ota 包编号
String otaPackageId = new UUID(
getOtaPackageResponseMsg.getOtaPackageIdMSB(),
getOtaPackageResponseMsg.getOtaPackageIdLSB()).toString();
//获取到文件块或者整个文件
//每次拉取chuckSize个字节,当前是第chuck次[从0开始]拉取.
//byte[] source = context.getOtaPackageDataCache().get(otaPackageId);
byte[] source = context.getOtaPackageDataCache().get(otaPackageId,chuckSize,chuck);
res[0] = source;
latch.countDown();
}
}
@Override
public void onError(Throwable throwable) {
log.info("Error getOtaPackageResponseMsg");
latch.countDown();
}
}
);
latch.await(3, TimeUnit.SECONDS);
return res[0];
}
