created created firmware queue
This commit is contained in:
parent
609024a603
commit
c9439b3976
@ -34,16 +34,23 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.dao.device.DeviceProfileService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.firmware.FirmwareService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.FIRMWARE_CHECKSUM;
|
||||
@ -61,12 +68,18 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
|
||||
private final DeviceService deviceService;
|
||||
private final DeviceProfileService deviceProfileService;
|
||||
private final RuleEngineTelemetryService telemetryService;
|
||||
private final TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> fwStateMsgProducer;
|
||||
|
||||
public DefaultFirmwareStateService(FirmwareService firmwareService, DeviceService deviceService, DeviceProfileService deviceProfileService, RuleEngineTelemetryService telemetryService) {
|
||||
public DefaultFirmwareStateService(FirmwareService firmwareService,
|
||||
DeviceService deviceService,
|
||||
DeviceProfileService deviceProfileService,
|
||||
RuleEngineTelemetryService telemetryService,
|
||||
TbCoreQueueFactory coreQueueFactory) {
|
||||
this.firmwareService = firmwareService;
|
||||
this.deviceService = deviceService;
|
||||
this.deviceProfileService = deviceProfileService;
|
||||
this.telemetryService = telemetryService;
|
||||
this.fwStateMsgProducer = coreQueueFactory.createToFirmwareStateServiceMsgProducer();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -85,7 +98,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
|
||||
}
|
||||
if (!newFirmwareId.equals(oldFirmwareId)) {
|
||||
// Device was updated and new firmware is different from previous firmware.
|
||||
update(device, firmwareService.findFirmwareById(device.getTenantId(), newFirmwareId), System.currentTimeMillis());
|
||||
send(device.getTenantId(), device.getId(), newFirmwareId, System.currentTimeMillis());
|
||||
}
|
||||
} else {
|
||||
// Device was updated and new firmware is not set.
|
||||
@ -93,7 +106,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
|
||||
}
|
||||
} else if (newFirmwareId != null) {
|
||||
// Device was created and firmware is defined.
|
||||
update(device, firmwareService.findFirmwareById(device.getTenantId(), newFirmwareId), System.currentTimeMillis());
|
||||
send(device.getTenantId(), device.getId(), newFirmwareId, System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
||||
@ -103,9 +116,8 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
|
||||
|
||||
Consumer<Device> updateConsumer;
|
||||
if (deviceProfile.getFirmwareId() != null) {
|
||||
Firmware firmware = firmwareService.findFirmwareById(tenantId, deviceProfile.getFirmwareId());
|
||||
long ts = System.currentTimeMillis();
|
||||
updateConsumer = d -> update(d, firmware, ts);
|
||||
updateConsumer = d -> send(d.getTenantId(), d.getId(), deviceProfile.getFirmwareId(), ts);
|
||||
} else {
|
||||
updateConsumer = this::remove;
|
||||
}
|
||||
@ -113,10 +125,9 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
|
||||
PageLink pageLink = new PageLink(100);
|
||||
PageData<Device> pageData;
|
||||
do {
|
||||
//TODO: create a query which will return devices without firmware
|
||||
pageData = deviceService.findDevicesByTenantIdAndType(tenantId, deviceProfile.getName(), pageLink);
|
||||
pageData = deviceService.findDevicesByTenantIdAndTypeAndEmptyFirmware(tenantId, deviceProfile.getName(), pageLink);
|
||||
|
||||
pageData.getData().stream().filter(d -> d.getFirmwareId() == null).forEach(updateConsumer);
|
||||
pageData.getData().forEach(updateConsumer);
|
||||
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
@ -124,6 +135,64 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
|
||||
} while (pageData.hasNext());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(ToFirmwareStateServiceMsg msg) {
|
||||
boolean isSuccess = false;
|
||||
FirmwareId targetFirmwareId = new FirmwareId(new UUID(msg.getFirmwareIdMSB(), msg.getFirmwareIdLSB()));
|
||||
DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
|
||||
TenantId tenantId = new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
|
||||
long ts = msg.getTs();
|
||||
|
||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||
if (device == null) {
|
||||
log.warn("[{}] [{}] Device was removed during firmware update msg was queued!", tenantId, deviceId);
|
||||
} else {
|
||||
FirmwareId currentFirmwareId = device.getFirmwareId();
|
||||
|
||||
if (currentFirmwareId == null) {
|
||||
currentFirmwareId = deviceProfileService.findDeviceProfileById(tenantId, device.getDeviceProfileId()).getFirmwareId();
|
||||
}
|
||||
|
||||
if (targetFirmwareId.equals(currentFirmwareId)) {
|
||||
update(device, firmwareService.findFirmwareById(device.getTenantId(), targetFirmwareId), ts);
|
||||
isSuccess = true;
|
||||
} else {
|
||||
log.warn("[{}] [{}] Can`t update firmware for the device, target firmwareId: [{}], current firmwareId: [{}]!", tenantId, deviceId, targetFirmwareId, currentFirmwareId);
|
||||
}
|
||||
}
|
||||
return isSuccess;
|
||||
}
|
||||
|
||||
private void send(TenantId tenantId, DeviceId deviceId, FirmwareId firmwareId, long ts) {
|
||||
ToFirmwareStateServiceMsg msg = ToFirmwareStateServiceMsg.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
|
||||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
|
||||
.setFirmwareIdMSB(firmwareId.getId().getMostSignificantBits())
|
||||
.setFirmwareIdLSB(firmwareId.getId().getLeastSignificantBits())
|
||||
.setTs(ts)
|
||||
.build();
|
||||
|
||||
TopicPartitionInfo tpi = new TopicPartitionInfo(fwStateMsgProducer.getDefaultTopic(), null, null, false);
|
||||
fwStateMsgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
|
||||
|
||||
BasicTsKvEntry status = new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.FIRMWARE_STATE, FirmwareUpdateStatus.QUEUED.name()));
|
||||
|
||||
telemetryService.saveAndNotify(tenantId, deviceId, Collections.singletonList(status), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void tmp) {
|
||||
log.trace("[{}] Success save firmware status!", deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("[{}] Failed to save firmware status!", deviceId, t);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private void update(Device device, Firmware firmware, long ts) {
|
||||
TenantId tenantId = device.getTenantId();
|
||||
DeviceId deviceId = device.getId();
|
||||
@ -131,6 +200,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
|
||||
List<TsKvEntry> telemetry = new ArrayList<>();
|
||||
telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.TARGET_FIRMWARE_TITLE, firmware.getTitle())));
|
||||
telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.TARGET_FIRMWARE_VERSION, firmware.getVersion())));
|
||||
telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.FIRMWARE_STATE, FirmwareUpdateStatus.INITIATED.name())));
|
||||
|
||||
telemetryService.saveAndNotify(tenantId, deviceId, telemetry, new FutureCallback<>() {
|
||||
@Override
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.firmware;
|
||||
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
|
||||
public interface FirmwareStateService {
|
||||
|
||||
@ -24,4 +25,6 @@ public interface FirmwareStateService {
|
||||
|
||||
void update(DeviceProfile deviceProfile);
|
||||
|
||||
boolean process(ToFirmwareStateServiceMsg msg);
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,5 @@
|
||||
package org.thingsboard.server.service.firmware;
|
||||
|
||||
public enum FirmwareUpdateStatus {
|
||||
QUEUED, INITIATED, DOWNLOADING, DOWNLOADED, VERIFIED, UPDATING, UPDATED, FAILED
|
||||
}
|
||||
@ -24,6 +24,7 @@ import org.springframework.context.event.EventListener;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.rule.engine.api.RpcError;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
@ -35,7 +36,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
|
||||
@ -49,6 +50,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseP
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
@ -58,8 +60,8 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
import org.thingsboard.server.service.edge.EdgeNotificationService;
|
||||
import org.thingsboard.server.service.firmware.FirmwareStateService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
|
||||
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
|
||||
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
|
||||
@ -97,6 +99,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
@Value("${queue.core.stats.enabled:false}")
|
||||
private boolean statsEnabled;
|
||||
|
||||
@Value("${queue.core.firmware.pack-interval-ms:60000}")
|
||||
private long firmwarePackInterval;
|
||||
@Value("${queue.core.firmware.pack-size:100}")
|
||||
private int firmwarePackSize;
|
||||
|
||||
private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> mainConsumer;
|
||||
private final DeviceStateService stateService;
|
||||
private final TbApiUsageStateService statsService;
|
||||
@ -104,11 +111,15 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
private final SubscriptionManagerService subscriptionManagerService;
|
||||
private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
|
||||
private final EdgeNotificationService edgeNotificationService;
|
||||
private final FirmwareStateService firmwareStateService;
|
||||
private final TbCoreConsumerStats stats;
|
||||
protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
|
||||
private final TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> firmwareStatesConsumer;
|
||||
|
||||
protected volatile ExecutorService usageStatsExecutor;
|
||||
|
||||
private volatile ExecutorService firmwareStatesExecutor;
|
||||
|
||||
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
|
||||
ActorSystemContext actorContext,
|
||||
DeviceStateService stateService,
|
||||
@ -121,10 +132,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
TbApiUsageStateService statsService,
|
||||
TbTenantProfileCache tenantProfileCache,
|
||||
TbApiUsageStateService apiUsageStateService,
|
||||
EdgeNotificationService edgeNotificationService) {
|
||||
EdgeNotificationService edgeNotificationService,
|
||||
FirmwareStateService firmwareStateService) {
|
||||
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
|
||||
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
|
||||
this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
|
||||
this.firmwareStatesConsumer = tbCoreQueueFactory.createToFirmwareStateServiceMsgConsumer();
|
||||
this.stateService = stateService;
|
||||
this.localSubscriptionService = localSubscriptionService;
|
||||
this.subscriptionManagerService = subscriptionManagerService;
|
||||
@ -132,12 +145,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
this.edgeNotificationService = edgeNotificationService;
|
||||
this.stats = new TbCoreConsumerStats(statsFactory);
|
||||
this.statsService = statsService;
|
||||
this.firmwareStateService = firmwareStateService;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.init("tb-core-consumer", "tb-core-notifications-consumer");
|
||||
this.usageStatsExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer"));
|
||||
this.firmwareStatesExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-firmware-notifications-consumer"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@ -146,6 +161,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
if (usageStatsExecutor != null) {
|
||||
usageStatsExecutor.shutdownNow();
|
||||
}
|
||||
if (firmwareStatesExecutor != null) {
|
||||
firmwareStatesExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
@ -153,6 +171,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
public void onApplicationEvent(ApplicationReadyEvent event) {
|
||||
super.onApplicationEvent(event);
|
||||
launchUsageStatsConsumer();
|
||||
launchFirmwareUpdateNotificationConsumer();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -167,6 +186,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
.map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic()))
|
||||
.collect(Collectors.toSet()));
|
||||
}
|
||||
this.firmwareStatesConsumer.subscribe();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -336,10 +356,57 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
});
|
||||
}
|
||||
|
||||
private void launchFirmwareUpdateNotificationConsumer() {
|
||||
long maxProcessingTimeoutPerRecord = firmwarePackInterval / firmwarePackSize;
|
||||
firmwareStatesExecutor.submit(() -> {
|
||||
while (!stopped) {
|
||||
try {
|
||||
List<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> msgs = firmwareStatesConsumer.poll(getNotificationPollDuration());
|
||||
if (msgs.isEmpty()) {
|
||||
Thread.sleep(maxProcessingTimeoutPerRecord);
|
||||
continue;
|
||||
}
|
||||
long timeToSleep = maxProcessingTimeoutPerRecord;
|
||||
for (TbProtoQueueMsg<ToFirmwareStateServiceMsg> msg : msgs) {
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
boolean isSuccessUpdate = handleFirmwareUpdates(msg);
|
||||
long endTime = System.currentTimeMillis();
|
||||
long spentTime = endTime - startTime;
|
||||
timeToSleep = timeToSleep - spentTime;
|
||||
if (isSuccessUpdate && timeToSleep > 0) {
|
||||
log.debug("Spent time per record is: [{}]!", spentTime);
|
||||
Thread.sleep(timeToSleep);
|
||||
timeToSleep = maxProcessingTimeoutPerRecord;
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.warn("Failed to process firmware update msg: {}", msg, e);
|
||||
}
|
||||
}
|
||||
firmwareStatesConsumer.commit();
|
||||
} catch (Exception e) {
|
||||
if (!stopped) {
|
||||
log.warn("Failed to obtain usage stats from queue.", e);
|
||||
try {
|
||||
Thread.sleep(getNotificationPollDuration());
|
||||
} catch (InterruptedException e2) {
|
||||
log.trace("Failed to wait until the server has capacity to handle new firmware updates", e2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
log.info("TB Firmware States Consumer stopped.");
|
||||
});
|
||||
}
|
||||
|
||||
private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
|
||||
statsService.process(msg, callback);
|
||||
}
|
||||
|
||||
private boolean handleFirmwareUpdates(TbProtoQueueMsg<ToFirmwareStateServiceMsg> msg) {
|
||||
return firmwareStateService.process(msg.getValue());
|
||||
}
|
||||
|
||||
private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) {
|
||||
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
|
||||
FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB())
|
||||
@ -448,6 +515,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
if (usageStatsConsumer != null) {
|
||||
usageStatsConsumer.unsubscribe();
|
||||
}
|
||||
if (firmwareStatesConsumer != null) {
|
||||
firmwareStatesConsumer.unsubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -478,6 +478,8 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
builder.setResponseStatus(TransportProtos.ResponseStatus.SUCCESS);
|
||||
builder.setFirmwareIdMSB(firmwareId.getId().getMostSignificantBits());
|
||||
builder.setFirmwareIdLSB(firmwareId.getId().getLeastSignificantBits());
|
||||
builder.setTitle(firmware.getTitle());
|
||||
builder.setVersion(firmware.getVersion());
|
||||
builder.setFileName(firmware.getFileName());
|
||||
builder.setContentType(firmware.getContentType());
|
||||
firmwareCacheWriter.put(firmwareId.toString(), firmware.getData().array());
|
||||
|
||||
@ -750,6 +750,7 @@ queue:
|
||||
transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
|
||||
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
|
||||
js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1}"
|
||||
fw-updates: "${TB_QUEUE_KAFKA_FW_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
|
||||
consumer-stats:
|
||||
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
|
||||
print-interval-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"
|
||||
@ -820,6 +821,10 @@ queue:
|
||||
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
|
||||
partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
|
||||
pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}"
|
||||
firmware:
|
||||
topic: "${TB_QUEUE_CORE_FW_TOPIC:tb_firmware}"
|
||||
pack-interval-ms: "${TB_QUEUE_CORE_FW_PACK_INTERVAL_MS:60000}"
|
||||
pack-size: "${TB_QUEUE_CORE_FW_PACK_SIZE:100}"
|
||||
usage-stats-topic: "${TB_QUEUE_US_TOPIC:tb_usage_stats}"
|
||||
stats:
|
||||
enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}"
|
||||
|
||||
@ -61,6 +61,8 @@ public interface DeviceService {
|
||||
|
||||
PageData<Device> findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink);
|
||||
|
||||
PageData<Device> findDevicesByTenantIdAndTypeAndEmptyFirmware(TenantId tenantId, String type, PageLink pageLink);
|
||||
|
||||
PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink);
|
||||
|
||||
PageData<DeviceInfo> findDeviceInfosByTenantIdAndDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId, PageLink pageLink);
|
||||
|
||||
@ -99,7 +99,7 @@ public class DataConstants {
|
||||
public static final String CURRENT_FIRMWARE_VERSION = "cur_fw_version";
|
||||
public static final String TARGET_FIRMWARE_TITLE = "target_fw_title";
|
||||
public static final String TARGET_FIRMWARE_VERSION = "target_fw_version";
|
||||
public static final String CURRENT_FIRMWARE_STATE = "cur_fw_state";
|
||||
public static final String FIRMWARE_STATE = "fw_state";
|
||||
|
||||
//attributes
|
||||
//telemetry
|
||||
|
||||
@ -37,6 +37,8 @@ public class TbKafkaTopicConfigs {
|
||||
private String notificationsProperties;
|
||||
@Value("${queue.kafka.topic-properties.js-executor}")
|
||||
private String jsExecutorProperties;
|
||||
@Value("${queue.kafka.topic-properties.fw-updates}")
|
||||
private String fwUpdatesProperties;
|
||||
|
||||
@Getter
|
||||
private Map<String, String> coreConfigs;
|
||||
@ -48,6 +50,8 @@ public class TbKafkaTopicConfigs {
|
||||
private Map<String, String> notificationsConfigs;
|
||||
@Getter
|
||||
private Map<String, String> jsExecutorConfigs;
|
||||
@Getter
|
||||
private Map<String, String> fwUpdatesConfigs;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
@ -56,6 +60,7 @@ public class TbKafkaTopicConfigs {
|
||||
transportApiConfigs = getConfigs(transportApiProperties);
|
||||
notificationsConfigs = getConfigs(notificationsProperties);
|
||||
jsExecutorConfigs = getConfigs(jsExecutorProperties);
|
||||
fwUpdatesConfigs = getConfigs(fwUpdatesProperties);
|
||||
}
|
||||
|
||||
private Map<String, String> getConfigs(String properties) {
|
||||
|
||||
@ -186,6 +186,17 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getFirmwareTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void destroy() {
|
||||
if (coreAdmin != null) {
|
||||
|
||||
@ -21,12 +21,13 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
@ -165,14 +166,25 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
|
||||
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getFirmwareTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -130,6 +130,16 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
|
||||
return new InMemoryTbQueueConsumer<>(coreSettings.getUsageStatsTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new InMemoryTbQueueConsumer<>(coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new InMemoryTbQueueProducer<>(coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
return new InMemoryTbQueueProducer<>(coreSettings.getUsageStatsTopic());
|
||||
|
||||
@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -73,6 +74,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
||||
private final TbQueueAdmin jsExecutorAdmin;
|
||||
private final TbQueueAdmin transportApiAdmin;
|
||||
private final TbQueueAdmin notificationAdmin;
|
||||
private final TbQueueAdmin fwUpdatesAdmin;
|
||||
|
||||
public KafkaMonolithQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
|
||||
TbServiceInfoProvider serviceInfoProvider,
|
||||
@ -98,6 +100,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
||||
this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
|
||||
this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
|
||||
this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
|
||||
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -273,6 +276,29 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
||||
return consumerBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||
consumerBuilder.settings(kafkaSettings);
|
||||
consumerBuilder.topic(coreSettings.getFirmwareTopic());
|
||||
consumerBuilder.clientId("monolith-fw-consumer-" + serviceInfoProvider.getServiceId());
|
||||
consumerBuilder.groupId("monolith-fw-consumer");
|
||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
consumerBuilder.admin(fwUpdatesAdmin);
|
||||
consumerBuilder.statsService(consumerStatsService);
|
||||
return consumerBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
|
||||
requestBuilder.settings(kafkaSettings);
|
||||
requestBuilder.clientId("monolith-fw-producer-" + serviceInfoProvider.getServiceId());
|
||||
requestBuilder.defaultTopic(coreSettings.getFirmwareTopic());
|
||||
requestBuilder.admin(fwUpdatesAdmin);
|
||||
return requestBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
|
||||
|
||||
@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -70,6 +71,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
private final TbQueueAdmin jsExecutorAdmin;
|
||||
private final TbQueueAdmin transportApiAdmin;
|
||||
private final TbQueueAdmin notificationAdmin;
|
||||
private final TbQueueAdmin fwUpdatesAdmin;
|
||||
|
||||
public KafkaTbCoreQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
|
||||
TbServiceInfoProvider serviceInfoProvider,
|
||||
@ -93,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
|
||||
this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
|
||||
this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
|
||||
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -241,6 +244,29 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
return consumerBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||
consumerBuilder.settings(kafkaSettings);
|
||||
consumerBuilder.topic(coreSettings.getFirmwareTopic());
|
||||
consumerBuilder.clientId("tb-core-fw-consumer-" + serviceInfoProvider.getServiceId());
|
||||
consumerBuilder.groupId("tb-core-fw-consumer");
|
||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
consumerBuilder.admin(fwUpdatesAdmin);
|
||||
consumerBuilder.statsService(consumerStatsService);
|
||||
return consumerBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
|
||||
requestBuilder.settings(kafkaSettings);
|
||||
requestBuilder.clientId("tb-core-fw-producer-" + serviceInfoProvider.getServiceId());
|
||||
requestBuilder.defaultTopic(coreSettings.getFirmwareTopic());
|
||||
requestBuilder.admin(fwUpdatesAdmin);
|
||||
return requestBuilder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
|
||||
|
||||
@ -22,6 +22,7 @@ import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.*;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
@ -190,6 +191,17 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
|
||||
|
||||
@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -164,6 +165,17 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
|
||||
|
||||
@ -24,6 +24,7 @@ import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -188,6 +189,17 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic());
|
||||
|
||||
@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -170,6 +171,17 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic());
|
||||
|
||||
@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -187,6 +188,17 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic());
|
||||
|
||||
@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -170,6 +171,17 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
|
||||
return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic(),
|
||||
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
|
||||
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
|
||||
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic());
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.queue.provider;
|
||||
|
||||
import org.thingsboard.server.gen.js.JsInvokeProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
@ -85,6 +86,20 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory {
|
||||
*/
|
||||
TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer();
|
||||
|
||||
/**
|
||||
* Used to consume messages about firmware update notifications by TB Core Service
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer();
|
||||
|
||||
/**
|
||||
* Used to consume messages about firmware update notifications by TB Core Service
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer();
|
||||
|
||||
/**
|
||||
* Used to consume high priority messages by TB Core Service
|
||||
*
|
||||
|
||||
@ -26,6 +26,9 @@ public class TbQueueCoreSettings {
|
||||
@Value("${queue.core.topic}")
|
||||
private String topic;
|
||||
|
||||
@Value("${queue.core.firmware.topic:tb_firmware}")
|
||||
private String firmwareTopic;
|
||||
|
||||
@Value("${queue.core.usage-stats-topic:tb_usage_stats}")
|
||||
private String usageStatsTopic;
|
||||
|
||||
|
||||
@ -359,8 +359,10 @@ message GetFirmwareResponseMsg {
|
||||
ResponseStatus responseStatus = 1;
|
||||
int64 firmwareIdMSB = 2;
|
||||
int64 firmwareIdLSB = 3;
|
||||
string contentType = 4;
|
||||
string fileName = 5;
|
||||
string title = 4;
|
||||
string version = 5;
|
||||
string contentType = 6;
|
||||
string fileName = 7;
|
||||
}
|
||||
|
||||
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
|
||||
@ -657,3 +659,13 @@ message ToUsageStatsServiceMsg {
|
||||
int64 entityIdLSB = 4;
|
||||
repeated UsageStatsKVProto values = 5;
|
||||
}
|
||||
|
||||
message ToFirmwareStateServiceMsg {
|
||||
int64 ts = 1;
|
||||
int64 tenantIdMSB = 2;
|
||||
int64 tenantIdLSB = 3;
|
||||
int64 deviceIdMSB = 4;
|
||||
int64 deviceIdLSB = 5;
|
||||
int64 firmwareIdMSB = 6;
|
||||
int64 firmwareIdLSB = 7;
|
||||
}
|
||||
|
||||
@ -207,6 +207,8 @@ public class DeviceApiController {
|
||||
|
||||
@RequestMapping(value = "/{deviceToken}/firmware", method = RequestMethod.GET)
|
||||
public DeferredResult<ResponseEntity> getFirmware(@PathVariable("deviceToken") String deviceToken,
|
||||
@RequestParam(value = "title") String title,
|
||||
@RequestParam(value = "version") String version,
|
||||
@RequestParam(value = "chunkSize", required = false, defaultValue = "0") int chunkSize,
|
||||
@RequestParam(value = "chunk", required = false, defaultValue = "0") int chunk) {
|
||||
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
|
||||
@ -217,7 +219,7 @@ public class DeviceApiController {
|
||||
.setTenantIdLSB(sessionInfo.getTenantIdLSB())
|
||||
.setDeviceIdMSB(sessionInfo.getDeviceIdMSB())
|
||||
.setDeviceIdLSB(sessionInfo.getDeviceIdLSB()).build();
|
||||
transportContext.getTransportService().process(sessionInfo, requestMsg, new GetFirmwareCallback(responseWriter, chunkSize, chunk));
|
||||
transportContext.getTransportService().process(sessionInfo, requestMsg, new GetFirmwareCallback(responseWriter, title, version, chunkSize, chunk));
|
||||
}));
|
||||
return responseWriter;
|
||||
}
|
||||
@ -278,11 +280,15 @@ public class DeviceApiController {
|
||||
|
||||
private class GetFirmwareCallback implements TransportServiceCallback<TransportProtos.GetFirmwareResponseMsg> {
|
||||
private final DeferredResult<ResponseEntity> responseWriter;
|
||||
private final String title;
|
||||
private final String version;
|
||||
private final int chuckSize;
|
||||
private final int chuck;
|
||||
|
||||
GetFirmwareCallback(DeferredResult<ResponseEntity> responseWriter, int chuckSize, int chuck) {
|
||||
GetFirmwareCallback(DeferredResult<ResponseEntity> responseWriter, String title, String version, int chuckSize, int chuck) {
|
||||
this.responseWriter = responseWriter;
|
||||
this.title = title;
|
||||
this.version = version;
|
||||
this.chuckSize = chuckSize;
|
||||
this.chuck = chuck;
|
||||
}
|
||||
@ -291,7 +297,7 @@ public class DeviceApiController {
|
||||
public void onSuccess(TransportProtos.GetFirmwareResponseMsg firmwareResponseMsg) {
|
||||
if (!TransportProtos.ResponseStatus.SUCCESS.equals(firmwareResponseMsg.getResponseStatus())) {
|
||||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND));
|
||||
} else {
|
||||
} else if (title.equals(firmwareResponseMsg.getTitle()) && version.equals(firmwareResponseMsg.getVersion())) {
|
||||
String firmwareId = new UUID(firmwareResponseMsg.getFirmwareIdMSB(), firmwareResponseMsg.getFirmwareIdLSB()).toString();
|
||||
ByteArrayResource resource = new ByteArrayResource(transportContext.getFirmwareCacheReader().get(firmwareId, chuckSize, chuck));
|
||||
ResponseEntity<ByteArrayResource> response = ResponseEntity.ok()
|
||||
@ -301,6 +307,8 @@ public class DeviceApiController {
|
||||
.contentType(parseMediaType(firmwareResponseMsg.getContentType()))
|
||||
.body(resource);
|
||||
responseWriter.setResult(response);
|
||||
} else {
|
||||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -81,6 +81,8 @@ public interface DeviceDao extends Dao<Device>, TenantEntityDao {
|
||||
*/
|
||||
PageData<Device> findDevicesByTenantIdAndType(UUID tenantId, String type, PageLink pageLink);
|
||||
|
||||
PageData<Device> findDevicesByTenantIdAndTypeAndEmptyFirmware(UUID tenantId, String type, PageLink pageLink);
|
||||
|
||||
/**
|
||||
* Find device infos by tenantId, type and page link.
|
||||
*
|
||||
|
||||
@ -354,6 +354,15 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
|
||||
return deviceDao.findDevicesByTenantIdAndType(tenantId.getId(), type, pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<Device> findDevicesByTenantIdAndTypeAndEmptyFirmware(TenantId tenantId, String type, PageLink pageLink) {
|
||||
log.trace("Executing findDevicesByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink);
|
||||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
|
||||
validateString(type, "Incorrect type " + type);
|
||||
validatePageLink(pageLink);
|
||||
return deviceDao.findDevicesByTenantIdAndTypeAndEmptyFirmware(tenantId.getId(), type, pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) {
|
||||
log.trace("Executing findDeviceInfosByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink);
|
||||
|
||||
@ -94,6 +94,15 @@ public interface DeviceRepository extends PagingAndSortingRepository<DeviceEntit
|
||||
@Param("textSearch") String textSearch,
|
||||
Pageable pageable);
|
||||
|
||||
@Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId " +
|
||||
"AND d.type = :type " +
|
||||
"AND d.firmwareId = null " +
|
||||
"AND LOWER(d.searchText) LIKE LOWER(CONCAT(:textSearch, '%'))")
|
||||
Page<DeviceEntity> findByTenantIdAndTypeAndFirmwareIdIsNull(@Param("tenantId") UUID tenantId,
|
||||
@Param("type") String type,
|
||||
@Param("textSearch") String textSearch,
|
||||
Pageable pageable);
|
||||
|
||||
@Query("SELECT new org.thingsboard.server.dao.model.sql.DeviceInfoEntity(d, c.title, c.additionalInfo, p.name) " +
|
||||
"FROM DeviceEntity d " +
|
||||
"LEFT JOIN CustomerEntity c on c.id = d.customerId " +
|
||||
|
||||
@ -149,6 +149,16 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
|
||||
DaoUtil.toPageable(pageLink)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<Device> findDevicesByTenantIdAndTypeAndEmptyFirmware(UUID tenantId, String type, PageLink pageLink) {
|
||||
return DaoUtil.toPageData(
|
||||
deviceRepository.findByTenantIdAndTypeAndFirmwareIdIsNull(
|
||||
tenantId,
|
||||
type,
|
||||
Objects.toString(pageLink.getTextSearch(), ""),
|
||||
DaoUtil.toPageable(pageLink)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(UUID tenantId, String type, PageLink pageLink) {
|
||||
return DaoUtil.toPageData(
|
||||
|
||||
@ -41,7 +41,6 @@ zk:
|
||||
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
|
||||
|
||||
cache:
|
||||
# caffeine or redis
|
||||
type: "${CACHE_TYPE:redis}"
|
||||
|
||||
redis:
|
||||
|
||||
@ -36,7 +36,6 @@ zk:
|
||||
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
|
||||
|
||||
cache:
|
||||
# caffeine or redis
|
||||
type: "${CACHE_TYPE:redis}"
|
||||
|
||||
redis:
|
||||
|
||||
@ -41,7 +41,6 @@ zk:
|
||||
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
|
||||
|
||||
cache:
|
||||
# caffeine or redis
|
||||
type: "${CACHE_TYPE:redis}"
|
||||
|
||||
redis:
|
||||
|
||||
@ -41,7 +41,6 @@ zk:
|
||||
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
|
||||
|
||||
cache:
|
||||
# caffeine or redis
|
||||
type: "${CACHE_TYPE:redis}"
|
||||
|
||||
redis:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user