你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

开发 transport 协议转换层

2021/12/25 17:13:32

首先 ThingsBoard 微服务架构目前已经支持的 MQTT、HTTP、CoAP、SNMP、LwM2M等协议。但是一些厂商的数据是 TCP 私有协议并且高度定制化需要扩展 Transport 微服务的情况下需要定制化自己的 transport 。

1.通过队列通信

ThingsBoard一共有几种消息队列用于微服务模块之间通信:

(1)transport 通过 tb.transport.api.requests 将设备鉴权任务交给 tb-core 处理,并通过 tb.transport.api.responses 获取响应。

(2)transport 和 rule_engine 通过 tb.core 队列将消息(会话生命周期事件、属性和rpc订阅)传给 tb-core。

(3)rule_engine 通过 js.eval.requests 将数据交给 js_executer 处理,并通过 js.eval.response 获取响应。

(4)transport 和 integration 通过 tb.rule-engine 将所有设备消息传给 rule_engine。

所以扩展自定义 Transport 也直接对接 queue 消息队列即可。

2.开发 Tcp-Transport

(1)transport-api

需要用到ThingsBoard的 transport-api,核心的依赖jar文件在 ThingsBoard 源码中可以找到。最简单的方式就是直接反编译现有的 mqtt-transport 取到对应的 lib 依赖库。这里演示如何从容器中获取:

创建一个 mqtt-transport 服务:

docker run  --restart=always --name tb-mqtt-transport \
-p 1883:1883 \
-e TB_SERVICE_ID=tb-mqtt-transport \
-e ZOOKEEPER_ENABLED=true \
-e ZOOKEEPER_URL=tb-test5:2181 \
-e MQTT_BIND_ADDRESS=0.0.0.0 \
-e MQTT_BIND_PORT=1883 \
-e MQTT_TIMEOUT=10000 \
-e TB_QUEUE_TYPE=kafka \
-e TB_KAFKA_SERVERS=kafkahost:kafkaport \
-v /usr/local/thingsboard/tb-transports/mqtt/conf:/config \
-d store/thingsboard/tb-mqtt-transport:3.3.1

从容器中取出相关源代码:

docker cp tb-mqtt-transport:/usr/share/tb-mqtt-transport/bin/tb-mqtt-transport.jar /root/tb-mqtt-transport.jar

直接解压就能获取到所有依赖包:

BOOT-INF\lib

其中用于队列通信的 transport-api 接口都在 SessionMsgListener 里面。

(2) SessionMsgListener 接口

调用设备登录和鉴权:

    private void init(ValidateDeviceCredentialsResponse response){
        DeviceSession that = this;
        tbDeviceInfo = response.getDeviceInfo();
        //保存身份信息
        tbSession = SessionInfoCreator.create(response, context, UUID.randomUUID());
        //创建一个session打开事件到tb
        context.getTransportService().process(
                tbSession,
                DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN),
                new TransportServiceCallback<Void>(){
                    @Override
                    public void onSuccess(Void aVoid) {
                        //订阅RCP RCP请求会回调到 SessionMsgListener.xxx() 方法中
                        context.getTransportService().process(
                                tbSession,
                                TransportProtos.SubscribeToRPCMsg.newBuilder().build(),
                                new TransportServiceCallback<Void>() {
                                    @Override
                                    public void onSuccess(Void aVoid) {
                                        log.info("[SubscribeToRPCMsg Success]");
                                    }
                                    @Override
                                    public void onError(Throwable throwable) {
                                        log.info("[SubscribeToRPCMsg Error]");
                                    }
                                }
                        );
                        //订阅属性变更 属性变更会回调到 SessionMsgListener.xxx() 方法中
                        context.getTransportService().process(
                                tbSession,
                                TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(),
                                new TransportServiceCallback<Void>() {
                                    @Override
                                    public void onSuccess(Void aVoid) {
                                        log.info("[SubscribeToAttributeUpdatesMsg Success]");
                                    }
                                    @Override
                                    public void onError(Throwable throwable) {
                                        log.info("[SubscribeToAttributeUpdatesMsg Error]");
                                    }
                                }
                        );
                        //注册 SessionMsgListener 用于监听各种回调
                        context.getTransportService().registerAsyncSession(
                                tbSession,
                                that
                        );
                        isInit = true;
                    }
                    @Override
                    public void onError(Throwable throwable) {
                        log.info("[会话初始化失败]deviceName="+deviceName+",deviceToken="+deviceToken);
                    }
                });
    }

调用属性上传:

    public void postAttributeMsg(String payload){

        context.getTransportService().process(
                tbSession,
                JsonConverter.convertToAttributesProto(new JsonParser().parse(payload)),
                new TransportServiceCallback<Void>(){
                    @Override
                    public void onSuccess(Void aVoid) {
                        log.info("[属性发送成功]"+payload);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        log.info("[属性发送失败]"+payload);
                    }
                });
    }

调用遥测上传:

    public void postTelemetryMsg(String payload){

        context.getTransportService().process(
                tbSession,
                JsonConverter.convertToTelemetryProto(new JsonParser().parse(payload)),
                new TransportServiceCallback<Void>(){
                    @Override
                    public void onSuccess(Void aVoid) {
                        log.info("[遥测发送成功]"+payload);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        log.info("[遥测发送失败]"+payload);
                    }
                });
    }

调用获取属性:

    public void getSharedAttribute(String key){

        TransportProtos.GetAttributeRequestMsg getAttributeRequestMsg = TransportProtos.GetAttributeRequestMsg.newBuilder()
                .addSharedAttributeNames(key)
                .build();

        context.getTransportService().process(
                tbSession,
                getAttributeRequestMsg,
                new TransportServiceCallback<Void>() {
                    @Override
                    public void onSuccess(Void aVoid) {

                    }

                    @Override
                    public void onError(Throwable throwable) {

                    }
                }
        );

    }

调用ota文件升级:

    public byte[] getOtaPackage(String packageType, int chuckSize, int chuck) throws Exception{

        TransportProtos.GetOtaPackageRequestMsg requestMsg = TransportProtos.GetOtaPackageRequestMsg.newBuilder()
                .setTenantIdMSB(tbSession.getTenantIdMSB())
                .setTenantIdLSB(tbSession.getTenantIdLSB())
                .setDeviceIdMSB(tbSession.getDeviceIdMSB())
                .setDeviceIdLSB(tbSession.getDeviceIdLSB())
                //OtaPackageType.FIRMWARE 即 sw 或者 fw
                .setType(packageType)
                .build();

        final byte[][] res = new byte[1][];
        CountDownLatch latch = new CountDownLatch(1);

        //发送文件获取请求不需要传递文件名称,直接返回设备分配的文件。需要和传入的ota包版本进行匹配。
        context.getTransportService().process(
                tbSession,
                requestMsg,
                new TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg>(){
                    @Override
                    public void onSuccess(TransportProtos.GetOtaPackageResponseMsg getOtaPackageResponseMsg) {
                        //当前设备未分配可升级的 ota 包
                        if(!TransportProtos.ResponseStatus.SUCCESS.equals(getOtaPackageResponseMsg.getResponseStatus())){
                            log.info("Error:device["+tbSession.getDeviceName()+"] ota package not found!");
                        }
                        //文件存在
                        else {
                            log.info("Success: device["+tbSession.getDeviceName()+"] ota_title["+getOtaPackageResponseMsg.getTitle()+"] ota_version["+getOtaPackageResponseMsg.getVersion()+"]");
                            //缓存中的 ota 包编号
                            String otaPackageId = new UUID(
                                    getOtaPackageResponseMsg.getOtaPackageIdMSB(),
                                    getOtaPackageResponseMsg.getOtaPackageIdLSB()).toString();
                            //获取到文件块或者整个文件
                            //每次拉取chuckSize个字节,当前是第chuck次[从0开始]拉取.
                            //byte[] source = context.getOtaPackageDataCache().get(otaPackageId);
                            byte[] source = context.getOtaPackageDataCache().get(otaPackageId,chuckSize,chuck);
                            res[0] = source;
                            latch.countDown();
                        }
                    }
                    @Override
                    public void onError(Throwable throwable) {
                            log.info("Error getOtaPackageResponseMsg");
                        latch.countDown();
                    }
                }
        );
        latch.await(3, TimeUnit.SECONDS);
        return res[0];
    }