update device profiles queue by tenant profile

This commit is contained in:
YevhenBondarenko 2022-05-02 23:44:43 +02:00
parent f4353e2db5
commit 547713f30a
9 changed files with 111 additions and 45 deletions

View File

@ -19,15 +19,19 @@ import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueClusterService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.ArrayList;
@ -41,9 +45,12 @@ import java.util.stream.Collectors;
@TbCoreComponent
@AllArgsConstructor
public class DefaultTbQueueService implements TbQueueService {
private static final String MAIN = "Main";
private final QueueService queueService;
private final TbQueueClusterService queueClusterService;
private final TbClusterService tbClusterService;
private final TbQueueAdmin tbQueueAdmin;
private final DeviceProfileService deviceProfileService;
@Override
public Queue saveQueue(Queue queue) {
@ -90,8 +97,8 @@ public class DefaultTbQueueService implements TbQueueService {
}
}
if (queueClusterService != null) {
queueClusterService.onQueueChange(queue);
if (tbClusterService != null) {
tbClusterService.onQueueChange(queue);
}
}
@ -106,13 +113,13 @@ public class DefaultTbQueueService implements TbQueueService {
tbQueueAdmin.createTopicIfNotExists(
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
}
if (queueClusterService != null) {
queueClusterService.onQueueChange(queue);
if (tbClusterService != null) {
tbClusterService.onQueueChange(queue);
}
} else {
log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName());
if (queueClusterService != null) {
queueClusterService.onQueueChange(queue);
if (tbClusterService != null) {
tbClusterService.onQueueChange(queue);
}
await();
for (int i = currentPartitions; i < oldPartitions; i++) {
@ -120,14 +127,14 @@ public class DefaultTbQueueService implements TbQueueService {
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
}
}
} else if (!oldQueue.equals(queue) && queueClusterService != null) {
queueClusterService.onQueueChange(queue);
} else if (!oldQueue.equals(queue) && tbClusterService != null) {
tbClusterService.onQueueChange(queue);
}
}
private void onQueueDeleted(TenantId tenantId, Queue queue) {
if (queueClusterService != null) {
queueClusterService.onQueueDelete(queue);
if (tbClusterService != null) {
tbClusterService.onQueueDelete(queue);
await();
}
// queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId);
@ -151,7 +158,6 @@ public class DefaultTbQueueService implements TbQueueService {
@Override
public void updateQueuesByTenants(List<TenantId> tenantIds, TenantProfile newTenantProfile, TenantProfile oldTenantProfile) {
boolean oldIsolated = oldTenantProfile != null && oldTenantProfile.isIsolatedTbRuleEngine();
boolean newIsolated = newTenantProfile.isIsolatedTbRuleEngine();
@ -199,9 +205,35 @@ public class DefaultTbQueueService implements TbQueueService {
}
tenantIds.forEach(tenantId -> {
toRemove.forEach(q -> deleteQueueByQueueName(tenantId, q));
Map<QueueId, List<DeviceProfile>> deviceProfileQueues;
toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key))));
if (oldTenantProfile != null && !newTenantProfile.getId().equals(oldTenantProfile.getId()) || !toRemove.isEmpty()) {
List<DeviceProfile> deviceProfiles = deviceProfileService.findDeviceProfiles(tenantId, new PageLink(Integer.MAX_VALUE)).getData();
deviceProfileQueues = deviceProfiles.stream()
.filter(dp -> dp.getDefaultQueueId() != null)
.collect(Collectors.groupingBy(DeviceProfile::getDefaultQueueId));
} else {
deviceProfileQueues = Collections.emptyMap();
}
Map<String, QueueId> createdQueues = toCreate.stream()
.map(key -> saveQueue(new Queue(tenantId, newQueues.get(key))))
.collect(Collectors.toMap(Queue::getName, Queue::getId));
// assigning created queues to device profiles instead of system queues
if (oldTenantProfile != null && !oldTenantProfile.isIsolatedTbRuleEngine()) {
deviceProfileQueues.forEach((queueId, list) -> {
Queue queue = queueService.findQueueById(TenantId.SYS_TENANT_ID, queueId);
QueueId queueIdToAssign = createdQueues.get(queue.getName());
if (queueIdToAssign == null) {
queueIdToAssign = createdQueues.get(MAIN);
}
for (DeviceProfile deviceProfile : list) {
deviceProfile.setDefaultQueueId(queueIdToAssign);
saveDeviceProfile(deviceProfile);
}
});
}
toUpdate.forEach(key -> {
Queue queueToUpdate = new Queue(tenantId, newQueues.get(key));
@ -215,7 +247,30 @@ public class DefaultTbQueueService implements TbQueueService {
saveQueue(queueToUpdate);
}
});
toRemove.forEach(q -> {
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q);
QueueId queueIdForRemove = queue.getId();
if (deviceProfileQueues.containsKey(queueIdForRemove)) {
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, q);
if (foundQueue == null) {
foundQueue = queueService.findQueueByTenantIdAndName(tenantId, MAIN);
}
QueueId newQueueId = foundQueue.getId();
deviceProfileQueues.get(queueIdForRemove).stream()
.peek(dp -> dp.setDefaultQueueId(newQueueId))
.forEach(this::saveDeviceProfile);
}
deleteQueue(tenantId, queueIdForRemove);
});
});
}
//TODO: remove after implementing TbDeviceProfileService
private void saveDeviceProfile(DeviceProfile deviceProfile) {
DeviceProfile savedDeviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile);
tbClusterService.onDeviceProfileChange(savedDeviceProfile, null);
tbClusterService.broadcastEntityStateChangeEvent(deviceProfile.getTenantId(), savedDeviceProfile.getId(), ComponentLifecycleEvent.UPDATED);
}
}

View File

@ -86,6 +86,8 @@ public class DefaultTbClusterService implements TbClusterService {
private boolean statsEnabled;
@Value("${edges.enabled}")
protected boolean edgesEnabled;
@Value("${service.type:monolith}")
private String serviceType;
private final AtomicInteger toCoreMsgs = new AtomicInteger(0);
private final AtomicInteger toCoreNfs = new AtomicInteger(0);
@ -503,10 +505,10 @@ public class DefaultTbClusterService implements TbClusterService {
.setPartitions(queue.getPartitions())
.build();
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build();
ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build();
ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build();
doSendQueueNotifications(transportMsg, coreMsg, ruleEngineMsg);
ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build();
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build();
doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg);
}
@Override
@ -521,32 +523,32 @@ public class DefaultTbClusterService implements TbClusterService {
.setQueueName(queue.getName())
.build();
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build();
ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build();
ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build();
doSendQueueNotifications(transportMsg, coreMsg, ruleEngineMsg);
}
private void doSendQueueNotifications(ToTransportMsg transportMsg, ToCoreNotificationMsg coreMsg, ToRuleEngineNotificationMsg ruleEngineMsg) {
Set<TransportProtos.ServiceInfo> tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT);
for (TransportProtos.ServiceInfo transportService : tbTransportServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportService.getServiceId());
producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null);
toTransportNfs.incrementAndGet();
}
Set<TransportProtos.ServiceInfo> tbCoreServices = partitionService.getAllServices(ServiceType.TB_CORE);
for (TransportProtos.ServiceInfo coreService : tbCoreServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreService.getServiceId());
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null);
toCoreNfs.incrementAndGet();
ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build();
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build();
doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg);
}
private void doSendQueueNotifications(ToRuleEngineNotificationMsg ruleEngineMsg, ToCoreNotificationMsg coreMsg, ToTransportMsg transportMsg) {
Set<TransportProtos.ServiceInfo> tbRuleEngineServices = partitionService.getAllServices(ServiceType.TB_RULE_ENGINE);
for (TransportProtos.ServiceInfo ruleEngineService : tbRuleEngineServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineService.getServiceId());
producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null);
toRuleEngineNfs.incrementAndGet();
}
if (!serviceType.equals("monolith")) {
Set<TransportProtos.ServiceInfo> tbCoreServices = partitionService.getAllServices(ServiceType.TB_CORE);
for (TransportProtos.ServiceInfo coreService : tbCoreServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreService.getServiceId());
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null);
toCoreNfs.incrementAndGet();
}
Set<TransportProtos.ServiceInfo> tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT);
for (TransportProtos.ServiceInfo transportService : tbTransportServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportService.getServiceId());
producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null);
toTransportNfs.incrementAndGet();
}
}
}
}

View File

@ -117,7 +117,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final EdgeNotificationService edgeNotificationService;
private final OtaPackageStateService firmwareStateService;
private final TbCoreConsumerStats stats;
private final PartitionService partitionService;
protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
private final TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
@ -139,7 +138,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
TbApiUsageStateService apiUsageStateService,
EdgeNotificationService edgeNotificationService,
OtaPackageStateService firmwareStateService, PartitionService partitionService) {
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, partitionService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
this.firmwareStatesConsumer = tbCoreQueueFactory.createToOtaPackageStateServiceMsgConsumer();
@ -151,7 +150,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
this.stats = new TbCoreConsumerStats(statsFactory);
this.statsService = statsService;
this.firmwareStateService = firmwareStateService;
this.partitionService = partitionService;
}
@PostConstruct

View File

@ -102,7 +102,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
private final RuleEngineStatisticsService statisticsService;
private final TbRuleEngineDeviceRpcService tbDeviceRpcService;
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final QueueService queueService;
// private final TenantId tenantId;
@ -125,14 +124,13 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
TbTenantProfileCache tenantProfileCache,
TbApiUsageStateService apiUsageStateService,
PartitionService partitionService, TbServiceInfoProvider serviceInfoProvider, QueueService queueService) {
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, partitionService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
this.statisticsService = statisticsService;
this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
this.submitStrategyFactory = submitStrategyFactory;
this.processingStrategyFactory = processingStrategyFactory;
this.tbDeviceRpcService = tbDeviceRpcService;
this.statsFactory = statsFactory;
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.queueService = queueService;
// this.tenantId = actorContext.getServiceInfoProvider().getIsolatedTenant().orElse(TenantId.SYS_TENANT_ID);

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
@ -69,17 +70,20 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
protected final TbTenantProfileCache tenantProfileCache;
protected final TbDeviceProfileCache deviceProfileCache;
protected final TbApiUsageStateService apiUsageStateService;
protected final PartitionService partitionService;
protected final TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer;
public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache,
TbApiUsageStateService apiUsageStateService, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) {
TbApiUsageStateService apiUsageStateService, PartitionService partitionService,
TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) {
this.actorContext = actorContext;
this.encodingService = encodingService;
this.tenantProfileCache = tenantProfileCache;
this.deviceProfileCache = deviceProfileCache;
this.apiUsageStateService = apiUsageStateService;
this.partitionService = partitionService;
this.nfConsumer = nfConsumer;
}
@ -166,6 +170,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
}
} else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
tenantProfileCache.evict(componentLifecycleMsg.getTenantId());
partitionService.removeTenant(componentLifecycleMsg.getTenantId());
if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) {
apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId());
} else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) {

View File

@ -285,6 +285,11 @@ public class HashPartitionService implements PartitionService {
return Math.abs(hash % partitions);
}
@Override
public void removeTenant(TenantId tenantId) {
tenantRoutingInfoMap.remove(tenantId);
}
@Override
public int countTransportsByType(String type) {
var list = tbTransportServicesByType.get(type);

View File

@ -56,6 +56,8 @@ public interface PartitionService {
int resolvePartitionIndex(UUID entityId, int partitions);
void removeTenant(TenantId tenantId);
int countTransportsByType(String type);
void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg);

View File

@ -51,7 +51,7 @@ public class QueueRoutingInfo {
}
public QueueRoutingInfo(QueueUpdateMsg queueUpdateMsg) {
this.tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getQueueIdLSB()));
this.tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
this.queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
this.queueName = queueUpdateMsg.getQueueName();
this.queueTopic = queueUpdateMsg.getQueueTopic();

View File

@ -911,6 +911,7 @@ public class DefaultTransportService implements TransportService {
Optional<Tenant> profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray());
if (profileOpt.isPresent()) {
Tenant tenant = profileOpt.get();
partitionService.removeTenant(tenant.getId());
boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId());
if (updated) {
rateLimitService.update(tenant.getId());