diff --git a/application/src/main/java/org/thingsboard/server/service/firmware/DefaultFirmwareStateService.java b/application/src/main/java/org/thingsboard/server/service/firmware/DefaultFirmwareStateService.java index 0c0b4d15e1..d791b882ee 100644 --- a/application/src/main/java/org/thingsboard/server/service/firmware/DefaultFirmwareStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/firmware/DefaultFirmwareStateService.java @@ -34,16 +34,23 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.firmware.FirmwareService; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; -import java.util.Objects; +import java.util.UUID; import java.util.function.Consumer; import static org.thingsboard.server.common.data.DataConstants.FIRMWARE_CHECKSUM; @@ -61,12 +68,18 @@ public class DefaultFirmwareStateService implements FirmwareStateService { private final DeviceService deviceService; private final DeviceProfileService deviceProfileService; private final RuleEngineTelemetryService telemetryService; + private final TbQueueProducer> fwStateMsgProducer; - public DefaultFirmwareStateService(FirmwareService firmwareService, DeviceService deviceService, DeviceProfileService deviceProfileService, RuleEngineTelemetryService telemetryService) { + public DefaultFirmwareStateService(FirmwareService firmwareService, + DeviceService deviceService, + DeviceProfileService deviceProfileService, + RuleEngineTelemetryService telemetryService, + TbCoreQueueFactory coreQueueFactory) { this.firmwareService = firmwareService; this.deviceService = deviceService; this.deviceProfileService = deviceProfileService; this.telemetryService = telemetryService; + this.fwStateMsgProducer = coreQueueFactory.createToFirmwareStateServiceMsgProducer(); } @Override @@ -85,7 +98,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService { } if (!newFirmwareId.equals(oldFirmwareId)) { // Device was updated and new firmware is different from previous firmware. - update(device, firmwareService.findFirmwareById(device.getTenantId(), newFirmwareId), System.currentTimeMillis()); + send(device.getTenantId(), device.getId(), newFirmwareId, System.currentTimeMillis()); } } else { // Device was updated and new firmware is not set. @@ -93,7 +106,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService { } } else if (newFirmwareId != null) { // Device was created and firmware is defined. - update(device, firmwareService.findFirmwareById(device.getTenantId(), newFirmwareId), System.currentTimeMillis()); + send(device.getTenantId(), device.getId(), newFirmwareId, System.currentTimeMillis()); } } @@ -103,9 +116,8 @@ public class DefaultFirmwareStateService implements FirmwareStateService { Consumer updateConsumer; if (deviceProfile.getFirmwareId() != null) { - Firmware firmware = firmwareService.findFirmwareById(tenantId, deviceProfile.getFirmwareId()); long ts = System.currentTimeMillis(); - updateConsumer = d -> update(d, firmware, ts); + updateConsumer = d -> send(d.getTenantId(), d.getId(), deviceProfile.getFirmwareId(), ts); } else { updateConsumer = this::remove; } @@ -113,10 +125,9 @@ public class DefaultFirmwareStateService implements FirmwareStateService { PageLink pageLink = new PageLink(100); PageData pageData; do { - //TODO: create a query which will return devices without firmware - pageData = deviceService.findDevicesByTenantIdAndType(tenantId, deviceProfile.getName(), pageLink); + pageData = deviceService.findDevicesByTenantIdAndTypeAndEmptyFirmware(tenantId, deviceProfile.getName(), pageLink); - pageData.getData().stream().filter(d -> d.getFirmwareId() == null).forEach(updateConsumer); + pageData.getData().forEach(updateConsumer); if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); @@ -124,6 +135,64 @@ public class DefaultFirmwareStateService implements FirmwareStateService { } while (pageData.hasNext()); } + @Override + public boolean process(ToFirmwareStateServiceMsg msg) { + boolean isSuccess = false; + FirmwareId targetFirmwareId = new FirmwareId(new UUID(msg.getFirmwareIdMSB(), msg.getFirmwareIdLSB())); + DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB())); + TenantId tenantId = new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); + long ts = msg.getTs(); + + Device device = deviceService.findDeviceById(tenantId, deviceId); + if (device == null) { + log.warn("[{}] [{}] Device was removed during firmware update msg was queued!", tenantId, deviceId); + } else { + FirmwareId currentFirmwareId = device.getFirmwareId(); + + if (currentFirmwareId == null) { + currentFirmwareId = deviceProfileService.findDeviceProfileById(tenantId, device.getDeviceProfileId()).getFirmwareId(); + } + + if (targetFirmwareId.equals(currentFirmwareId)) { + update(device, firmwareService.findFirmwareById(device.getTenantId(), targetFirmwareId), ts); + isSuccess = true; + } else { + log.warn("[{}] [{}] Can`t update firmware for the device, target firmwareId: [{}], current firmwareId: [{}]!", tenantId, deviceId, targetFirmwareId, currentFirmwareId); + } + } + return isSuccess; + } + + private void send(TenantId tenantId, DeviceId deviceId, FirmwareId firmwareId, long ts) { + ToFirmwareStateServiceMsg msg = ToFirmwareStateServiceMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setFirmwareIdMSB(firmwareId.getId().getMostSignificantBits()) + .setFirmwareIdLSB(firmwareId.getId().getLeastSignificantBits()) + .setTs(ts) + .build(); + + TopicPartitionInfo tpi = new TopicPartitionInfo(fwStateMsgProducer.getDefaultTopic(), null, null, false); + fwStateMsgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); + + BasicTsKvEntry status = new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.FIRMWARE_STATE, FirmwareUpdateStatus.QUEUED.name())); + + telemetryService.saveAndNotify(tenantId, deviceId, Collections.singletonList(status), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void tmp) { + log.trace("[{}] Success save firmware status!", deviceId); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Failed to save firmware status!", deviceId, t); + } + }); + } + + private void update(Device device, Firmware firmware, long ts) { TenantId tenantId = device.getTenantId(); DeviceId deviceId = device.getId(); @@ -131,6 +200,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService { List telemetry = new ArrayList<>(); telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.TARGET_FIRMWARE_TITLE, firmware.getTitle()))); telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.TARGET_FIRMWARE_VERSION, firmware.getVersion()))); + telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.FIRMWARE_STATE, FirmwareUpdateStatus.INITIATED.name()))); telemetryService.saveAndNotify(tenantId, deviceId, telemetry, new FutureCallback<>() { @Override diff --git a/application/src/main/java/org/thingsboard/server/service/firmware/FirmwareStateService.java b/application/src/main/java/org/thingsboard/server/service/firmware/FirmwareStateService.java index 3f84e5d0fb..ac51b0d95e 100644 --- a/application/src/main/java/org/thingsboard/server/service/firmware/FirmwareStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/firmware/FirmwareStateService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.firmware; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; public interface FirmwareStateService { @@ -24,4 +25,6 @@ public interface FirmwareStateService { void update(DeviceProfile deviceProfile); + boolean process(ToFirmwareStateServiceMsg msg); + } diff --git a/application/src/main/java/org/thingsboard/server/service/firmware/FirmwareUpdateStatus.java b/application/src/main/java/org/thingsboard/server/service/firmware/FirmwareUpdateStatus.java new file mode 100644 index 0000000000..d62265840a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/firmware/FirmwareUpdateStatus.java @@ -0,0 +1,5 @@ +package org.thingsboard.server.service.firmware; + +public enum FirmwareUpdateStatus { + QUEUED, INITIATED, DOWNLOADING, DOWNLOADED, VERIFIED, UPDATING, UPDATED, FAILED +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 71dd1a2e07..657320170f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -24,6 +24,7 @@ import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.actors.ActorSystemContext; @@ -35,7 +36,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; -import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; @@ -49,6 +50,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseP import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.queue.TbQueueConsumer; @@ -58,8 +60,8 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.edge.EdgeNotificationService; +import org.thingsboard.server.service.firmware.FirmwareStateService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; -import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; @@ -97,6 +99,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> mainConsumer; private final DeviceStateService stateService; private final TbApiUsageStateService statsService; @@ -104,11 +111,15 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> usageStatsConsumer; + private final TbQueueConsumer> firmwareStatesConsumer; protected volatile ExecutorService usageStatsExecutor; + private volatile ExecutorService firmwareStatesExecutor; + public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext, DeviceStateService stateService, @@ -121,10 +132,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService tpi.newByTopic(usageStatsConsumer.getTopic())) .collect(Collectors.toSet())); } + this.firmwareStatesConsumer.subscribe(); } @Override @@ -336,10 +356,57 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService { + while (!stopped) { + try { + List> msgs = firmwareStatesConsumer.poll(getNotificationPollDuration()); + if (msgs.isEmpty()) { + Thread.sleep(maxProcessingTimeoutPerRecord); + continue; + } + long timeToSleep = maxProcessingTimeoutPerRecord; + for (TbProtoQueueMsg msg : msgs) { + try { + long startTime = System.currentTimeMillis(); + boolean isSuccessUpdate = handleFirmwareUpdates(msg); + long endTime = System.currentTimeMillis(); + long spentTime = endTime - startTime; + timeToSleep = timeToSleep - spentTime; + if (isSuccessUpdate && timeToSleep > 0) { + log.debug("Spent time per record is: [{}]!", spentTime); + Thread.sleep(timeToSleep); + timeToSleep = maxProcessingTimeoutPerRecord; + } + } catch (Throwable e) { + log.warn("Failed to process firmware update msg: {}", msg, e); + } + } + firmwareStatesConsumer.commit(); + } catch (Exception e) { + if (!stopped) { + log.warn("Failed to obtain usage stats from queue.", e); + try { + Thread.sleep(getNotificationPollDuration()); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new firmware updates", e2); + } + } + } + } + log.info("TB Firmware States Consumer stopped."); + }); + } + private void handleUsageStats(TbProtoQueueMsg msg, TbCallback callback) { statsService.process(msg, callback); } + private boolean handleFirmwareUpdates(TbProtoQueueMsg msg) { + return firmwareStateService.process(msg.getValue()); + } + private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) { RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()) @@ -448,6 +515,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink); + PageData findDevicesByTenantIdAndTypeAndEmptyFirmware(TenantId tenantId, String type, PageLink pageLink); + PageData findDeviceInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink); PageData findDeviceInfosByTenantIdAndDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId, PageLink pageLink); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index e244873556..d53e992db8 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -99,7 +99,7 @@ public class DataConstants { public static final String CURRENT_FIRMWARE_VERSION = "cur_fw_version"; public static final String TARGET_FIRMWARE_TITLE = "target_fw_title"; public static final String TARGET_FIRMWARE_VERSION = "target_fw_version"; - public static final String CURRENT_FIRMWARE_STATE = "cur_fw_state"; + public static final String FIRMWARE_STATE = "fw_state"; //attributes //telemetry diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java index 0dc48eac82..16ec1be164 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java @@ -37,6 +37,8 @@ public class TbKafkaTopicConfigs { private String notificationsProperties; @Value("${queue.kafka.topic-properties.js-executor}") private String jsExecutorProperties; + @Value("${queue.kafka.topic-properties.fw-updates}") + private String fwUpdatesProperties; @Getter private Map coreConfigs; @@ -48,6 +50,8 @@ public class TbKafkaTopicConfigs { private Map notificationsConfigs; @Getter private Map jsExecutorConfigs; + @Getter + private Map fwUpdatesConfigs; @PostConstruct private void init() { @@ -56,6 +60,7 @@ public class TbKafkaTopicConfigs { transportApiConfigs = getConfigs(transportApiProperties); notificationsConfigs = getConfigs(notificationsProperties); jsExecutorConfigs = getConfigs(jsExecutorProperties); + fwUpdatesConfigs = getConfigs(fwUpdatesProperties); } private Map getConfigs(String properties) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java index 2bf396bb67..c2f4f4f750 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java @@ -186,6 +186,17 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getFirmwareTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getFirmwareTopic()); + } + @PreDestroy private void destroy() { if (coreAdmin != null) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java index e20770a931..0267cdecb8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java @@ -21,12 +21,13 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.queue.TbQueueAdmin; @@ -165,14 +166,25 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueProducer> createToUsageStatsServiceMsgProducer() { + public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic()); } @Override - public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { + public TbQueueConsumer> createToUsageStatsServiceMsgConsumer() { return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(), - msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getFirmwareTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getFirmwareTopic()); } @PreDestroy diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index da3718c0f2..8806176f9f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -130,6 +130,16 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE return new InMemoryTbQueueConsumer<>(coreSettings.getUsageStatsTopic()); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new InMemoryTbQueueConsumer<>(coreSettings.getFirmwareTopic()); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new InMemoryTbQueueProducer<>(coreSettings.getFirmwareTopic()); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new InMemoryTbQueueProducer<>(coreSettings.getUsageStatsTopic()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index e07f3e43d3..a526331116 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -73,6 +74,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin jsExecutorAdmin; private final TbQueueAdmin transportApiAdmin; private final TbQueueAdmin notificationAdmin; + private final TbQueueAdmin fwUpdatesAdmin; public KafkaMonolithQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings, TbServiceInfoProvider serviceInfoProvider, @@ -98,6 +100,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); + this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs()); } @Override @@ -273,6 +276,29 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi return consumerBuilder.build(); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(coreSettings.getFirmwareTopic()); + consumerBuilder.clientId("monolith-fw-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("monolith-fw-consumer"); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.admin(fwUpdatesAdmin); + consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("monolith-fw-producer-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(coreSettings.getFirmwareTopic()); + requestBuilder.admin(fwUpdatesAdmin); + return requestBuilder.build(); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 24e80adec5..5f64e42ffd 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -70,6 +71,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueAdmin jsExecutorAdmin; private final TbQueueAdmin transportApiAdmin; private final TbQueueAdmin notificationAdmin; + private final TbQueueAdmin fwUpdatesAdmin; public KafkaTbCoreQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings, TbServiceInfoProvider serviceInfoProvider, @@ -93,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); + this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs()); } @Override @@ -241,6 +244,29 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { return consumerBuilder.build(); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); + consumerBuilder.settings(kafkaSettings); + consumerBuilder.topic(coreSettings.getFirmwareTopic()); + consumerBuilder.clientId("tb-core-fw-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.groupId("tb-core-fw-consumer"); + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + consumerBuilder.admin(fwUpdatesAdmin); + consumerBuilder.statsService(consumerStatsService); + return consumerBuilder.build(); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); + requestBuilder.settings(kafkaSettings); + requestBuilder.clientId("tb-core-fw-producer-" + serviceInfoProvider.getServiceId()); + requestBuilder.defaultTopic(coreSettings.getFirmwareTopic()); + requestBuilder.admin(fwUpdatesAdmin); + return requestBuilder.build(); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder> requestBuilder = TbKafkaProducerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index 999a9c4259..3504c6aef2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; +import org.thingsboard.server.gen.transport.TransportProtos.*; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; @@ -190,6 +191,17 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic()); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index 2a63a24282..a9ebf5a4de 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -164,6 +165,17 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic()); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java index 1fa0363d28..cd6043cdde 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java @@ -24,6 +24,7 @@ import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -188,6 +189,17 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic()); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java index 5819e0e65a..8b5ad00de4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -170,6 +171,17 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic()); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java index df4f713b23..b569d13742 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -187,6 +188,17 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic()); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java index 4ee46f2422..17e6eb9a27 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -170,6 +171,17 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); } + @Override + public TbQueueConsumer> createToFirmwareStateServiceMsgConsumer() { + return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic(), + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); + } + + @Override + public TbQueueProducer> createToFirmwareStateServiceMsgProducer() { + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic()); + } + @Override public TbQueueProducer> createToUsageStatsServiceMsgProducer() { return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic()); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java index 5bb9de9564..be5ca967bf 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java @@ -16,6 +16,7 @@ package org.thingsboard.server.queue.provider; import org.thingsboard.server.gen.js.JsInvokeProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; @@ -85,6 +86,20 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory { */ TbQueueConsumer> createToUsageStatsServiceMsgConsumer(); + /** + * Used to consume messages about firmware update notifications by TB Core Service + * + * @return + */ + TbQueueConsumer> createToFirmwareStateServiceMsgConsumer(); + + /** + * Used to consume messages about firmware update notifications by TB Core Service + * + * @return + */ + TbQueueProducer> createToFirmwareStateServiceMsgProducer(); + /** * Used to consume high priority messages by TB Core Service * diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueCoreSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueCoreSettings.java index 5f9880d2c1..d194fdb4ea 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueCoreSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueCoreSettings.java @@ -26,6 +26,9 @@ public class TbQueueCoreSettings { @Value("${queue.core.topic}") private String topic; + @Value("${queue.core.firmware.topic:tb_firmware}") + private String firmwareTopic; + @Value("${queue.core.usage-stats-topic:tb_usage_stats}") private String usageStatsTopic; diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index 6335670f07..6f1f44b765 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -359,8 +359,10 @@ message GetFirmwareResponseMsg { ResponseStatus responseStatus = 1; int64 firmwareIdMSB = 2; int64 firmwareIdLSB = 3; - string contentType = 4; - string fileName = 5; + string title = 4; + string version = 5; + string contentType = 6; + string fileName = 7; } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. @@ -657,3 +659,13 @@ message ToUsageStatsServiceMsg { int64 entityIdLSB = 4; repeated UsageStatsKVProto values = 5; } + +message ToFirmwareStateServiceMsg { + int64 ts = 1; + int64 tenantIdMSB = 2; + int64 tenantIdLSB = 3; + int64 deviceIdMSB = 4; + int64 deviceIdLSB = 5; + int64 firmwareIdMSB = 6; + int64 firmwareIdLSB = 7; +} diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index 5c10202d00..dc3b980f88 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -207,6 +207,8 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/firmware", method = RequestMethod.GET) public DeferredResult getFirmware(@PathVariable("deviceToken") String deviceToken, + @RequestParam(value = "title") String title, + @RequestParam(value = "version") String version, @RequestParam(value = "chunkSize", required = false, defaultValue = "0") int chunkSize, @RequestParam(value = "chunk", required = false, defaultValue = "0") int chunk) { DeferredResult responseWriter = new DeferredResult<>(); @@ -217,7 +219,7 @@ public class DeviceApiController { .setTenantIdLSB(sessionInfo.getTenantIdLSB()) .setDeviceIdMSB(sessionInfo.getDeviceIdMSB()) .setDeviceIdLSB(sessionInfo.getDeviceIdLSB()).build(); - transportContext.getTransportService().process(sessionInfo, requestMsg, new GetFirmwareCallback(responseWriter, chunkSize, chunk)); + transportContext.getTransportService().process(sessionInfo, requestMsg, new GetFirmwareCallback(responseWriter, title, version, chunkSize, chunk)); })); return responseWriter; } @@ -278,11 +280,15 @@ public class DeviceApiController { private class GetFirmwareCallback implements TransportServiceCallback { private final DeferredResult responseWriter; + private final String title; + private final String version; private final int chuckSize; private final int chuck; - GetFirmwareCallback(DeferredResult responseWriter, int chuckSize, int chuck) { + GetFirmwareCallback(DeferredResult responseWriter, String title, String version, int chuckSize, int chuck) { this.responseWriter = responseWriter; + this.title = title; + this.version = version; this.chuckSize = chuckSize; this.chuck = chuck; } @@ -291,7 +297,7 @@ public class DeviceApiController { public void onSuccess(TransportProtos.GetFirmwareResponseMsg firmwareResponseMsg) { if (!TransportProtos.ResponseStatus.SUCCESS.equals(firmwareResponseMsg.getResponseStatus())) { responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND)); - } else { + } else if (title.equals(firmwareResponseMsg.getTitle()) && version.equals(firmwareResponseMsg.getVersion())) { String firmwareId = new UUID(firmwareResponseMsg.getFirmwareIdMSB(), firmwareResponseMsg.getFirmwareIdLSB()).toString(); ByteArrayResource resource = new ByteArrayResource(transportContext.getFirmwareCacheReader().get(firmwareId, chuckSize, chuck)); ResponseEntity response = ResponseEntity.ok() @@ -301,6 +307,8 @@ public class DeviceApiController { .contentType(parseMediaType(firmwareResponseMsg.getContentType())) .body(resource); responseWriter.setResult(response); + } else { + responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java index 784eb4e229..0a61ce554e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java @@ -81,6 +81,8 @@ public interface DeviceDao extends Dao, TenantEntityDao { */ PageData findDevicesByTenantIdAndType(UUID tenantId, String type, PageLink pageLink); + PageData findDevicesByTenantIdAndTypeAndEmptyFirmware(UUID tenantId, String type, PageLink pageLink); + /** * Find device infos by tenantId, type and page link. * diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index 73eff3b5e6..73dadef589 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -354,6 +354,15 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe return deviceDao.findDevicesByTenantIdAndType(tenantId.getId(), type, pageLink); } + @Override + public PageData findDevicesByTenantIdAndTypeAndEmptyFirmware(TenantId tenantId, String type, PageLink pageLink) { + log.trace("Executing findDevicesByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validateString(type, "Incorrect type " + type); + validatePageLink(pageLink); + return deviceDao.findDevicesByTenantIdAndTypeAndEmptyFirmware(tenantId.getId(), type, pageLink); + } + @Override public PageData findDeviceInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) { log.trace("Executing findDeviceInfosByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java index 0de59ac8be..999e69f330 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java @@ -94,6 +94,15 @@ public interface DeviceRepository extends PagingAndSortingRepository findByTenantIdAndTypeAndFirmwareIdIsNull(@Param("tenantId") UUID tenantId, + @Param("type") String type, + @Param("textSearch") String textSearch, + Pageable pageable); + @Query("SELECT new org.thingsboard.server.dao.model.sql.DeviceInfoEntity(d, c.title, c.additionalInfo, p.name) " + "FROM DeviceEntity d " + "LEFT JOIN CustomerEntity c on c.id = d.customerId " + diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java index 96313f7a45..a545c7cc6b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java @@ -149,6 +149,16 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao DaoUtil.toPageable(pageLink))); } + @Override + public PageData findDevicesByTenantIdAndTypeAndEmptyFirmware(UUID tenantId, String type, PageLink pageLink) { + return DaoUtil.toPageData( + deviceRepository.findByTenantIdAndTypeAndFirmwareIdIsNull( + tenantId, + type, + Objects.toString(pageLink.getTextSearch(), ""), + DaoUtil.toPageable(pageLink))); + } + @Override public PageData findDeviceInfosByTenantIdAndType(UUID tenantId, String type, PageLink pageLink) { return DaoUtil.toPageData( diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 18ef3e349c..ee3445c90d 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -41,7 +41,6 @@ zk: zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" cache: - # caffeine or redis type: "${CACHE_TYPE:redis}" redis: diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index b34f8940bc..e12c5308ba 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -36,7 +36,6 @@ zk: zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" cache: - # caffeine or redis type: "${CACHE_TYPE:redis}" redis: diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 25fee65cc0..6fa494f61b 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -41,7 +41,6 @@ zk: zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" cache: - # caffeine or redis type: "${CACHE_TYPE:redis}" redis: diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index ed30613043..6dbbb99311 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -41,7 +41,6 @@ zk: zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" cache: - # caffeine or redis type: "${CACHE_TYPE:redis}" redis: