Tenant Profile Cache

This commit is contained in:
Andrii Shvaika 2020-10-13 11:53:22 +03:00
parent ff4ca5a760
commit 6f80cf95e2
10 changed files with 195 additions and 27 deletions

View File

@ -72,6 +72,7 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
import org.thingsboard.server.service.mail.MailExecutorService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
@ -127,6 +128,10 @@ public class ActorSystemContext {
@Getter
private DeviceService deviceService;
@Autowired
@Getter
private TbTenantProfileCache tenantProfileCache;
@Autowired
@Getter
private TbDeviceProfileCache deviceProfileCache;

View File

@ -30,6 +30,7 @@ import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.MsgType;
@ -40,7 +41,9 @@ import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import java.util.HashSet;
@ -50,13 +53,14 @@ import java.util.Set;
@Slf4j
public class AppActor extends ContextAwareActor {
private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
private final TbTenantProfileCache tenantProfileCache;
private final TenantService tenantService;
private final Set<TenantId> deletedTenants;
private boolean ruleChainsInitialized;
private AppActor(ActorSystemContext systemContext) {
super(systemContext);
this.tenantProfileCache = systemContext.getTenantProfileCache();
this.tenantService = systemContext.getTenantService();
this.deletedTenants = new HashSet<>();
}
@ -117,8 +121,7 @@ public class AppActor extends ContextAwareActor {
boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
for (Tenant tenant : tenantIterator) {
// TODO: Tenant Profile from cache
TenantProfile tenantProfile = systemContext.getTenantProfileService().findTenantProfileById(TenantId.SYS_TENANT_ID, tenant.getTenantProfileId());
TenantProfile tenantProfile = tenantProfileCache.get(tenant.getTenantProfileId());
if (isCore || (isRuleEngine && !tenantProfile.isIsolatedTbRuleEngine())) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
@ -133,7 +136,7 @@ public class AppActor extends ContextAwareActor {
}
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
} else {
if (!deletedTenants.contains(msg.getTenantId())) {
@ -146,15 +149,23 @@ public class AppActor extends ContextAwareActor {
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
TbActorRef target = null;
if (SYSTEM_TENANT.equals(msg.getTenantId())) {
log.warn("Message has system tenant id: {}", msg);
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
if (msg.getEntityId().getEntityType() == EntityType.TENANT_PROFILE) {
tenantProfileCache.evict(new TenantProfileId(msg.getEntityId().getId()));
} else {
log.warn("Message has system tenant id: {}", msg);
}
} else {
if (msg.getEntityId().getEntityType() == EntityType.TENANT
&& msg.getEvent() == ComponentLifecycleEvent.DELETED) {
log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
if (msg.getEntityId().getEntityType() == EntityType.TENANT) {
TenantId tenantId = new TenantId(msg.getEntityId().getId());
deletedTenants.add(tenantId);
ctx.stop(new TbEntityActorId(tenantId));
tenantProfileCache.evict(tenantId);
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
deletedTenants.add(tenantId);
ctx.stop(new TbEntityActorId(tenantId));
} else {
target = getOrCreateTenantActor(msg.getTenantId());
}
} else {
target = getOrCreateTenantActor(msg.getTenantId());
}

View File

@ -77,9 +77,7 @@ public class TenantActor extends RuleChainManagerActor {
// This Service may be started for specific tenant only.
Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
// TODO: Tenant Profile from cache
TenantProfile tenantProfile = systemContext.getTenantProfileService().findTenantProfileById(tenantId, tenant.getTenantProfileId());
TenantProfile tenantProfile = systemContext.getTenantProfileCache().get(tenant.getTenantProfileId());
isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);

View File

@ -94,6 +94,7 @@ import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.AccessControlService;
@ -205,6 +206,9 @@ public abstract class BaseController {
@Autowired
protected TbQueueProducerProvider producerProvider;
@Autowired
protected TbTenantProfileCache tenantProfileCache;
@Autowired
protected TbDeviceProfileCache deviceProfileCache;

View File

@ -91,6 +91,7 @@ public class TenantController extends BaseController {
if (newTenant) {
installScripts.createDefaultRuleChains(tenant.getId());
}
tenantProfileCache.evict(tenant.getId());
return tenant;
} catch (Exception e) {
throw handleException(e);
@ -106,6 +107,7 @@ public class TenantController extends BaseController {
TenantId tenantId = new TenantId(toUUID(strTenantId));
checkTenantId(tenantId, Operation.DELETE);
tenantService.deleteTenant(tenantId);
tenantProfileCache.evict(tenantId);
tbClusterService.onEntityStateChange(tenantId, tenantId, ComponentLifecycleEvent.DELETED);
} catch (Exception e) {
throw handleException(e);

View File

@ -29,9 +29,11 @@ import org.springframework.web.bind.annotation.RestController;
import org.thingsboard.server.common.data.EntityInfo;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.security.permission.Resource;
@ -93,8 +95,11 @@ public class TenantProfileController extends BaseController {
}
tenantProfile = checkNotNull(tenantProfileService.saveTenantProfile(getTenantId(), tenantProfile));
tenantProfileCache.put(tenantProfile);
tbClusterService.onEntityStateChange(TenantId.SYS_TENANT_ID, tenantProfile.getId(),
newTenantProfile ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
return tenantProfile;
} catch (Exception e) {
} catch (Exception e) {
throw handleException(e);
}
}

View File

@ -0,0 +1,106 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.profile;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Service
@Slf4j
public class DefaultTbTenantProfileCache implements TbTenantProfileCache {
private final Lock tenantProfileFetchLock = new ReentrantLock();
private final TenantProfileService tenantProfileService;
private final TenantService tenantService;
private final ConcurrentMap<TenantProfileId, TenantProfile> tenantProfilesMap = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, TenantProfileId> tenantsMap = new ConcurrentHashMap<>();
public DefaultTbTenantProfileCache(TenantProfileService tenantProfileService, TenantService tenantService) {
this.tenantProfileService = tenantProfileService;
this.tenantService = tenantService;
}
@Override
public TenantProfile get(TenantProfileId tenantProfileId) {
TenantProfile profile = tenantProfilesMap.get(tenantProfileId);
if (profile == null) {
profile = tenantProfilesMap.get(tenantProfileId);
if (profile == null) {
tenantProfileFetchLock.lock();
try {
profile = tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, tenantProfileId);
if (profile != null) {
tenantProfilesMap.put(tenantProfileId, profile);
}
} finally {
tenantProfileFetchLock.unlock();
}
}
}
return profile;
}
@Override
public TenantProfile get(TenantId tenantId) {
TenantProfileId profileId = tenantsMap.get(tenantId);
if (profileId == null) {
Tenant tenant = tenantService.findTenantById(tenantId);
if (tenant != null) {
profileId = tenant.getTenantProfileId();
tenantsMap.put(tenantId, profileId);
} else {
return null;
}
}
return get(profileId);
}
@Override
public void put(TenantProfile profile) {
if (profile.getId() != null) {
tenantProfilesMap.put(profile.getId(), profile);
}
}
@Override
public void evict(TenantProfileId profileId) {
tenantProfilesMap.remove(profileId);
}
@Override
public void evict(TenantId tenantId) {
tenantsMap.remove(tenantId);
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.profile;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
public interface TbTenantProfileCache {
TenantProfile get(TenantId tenantId);
TenantProfile get(TenantProfileId tenantProfileId);
void put(TenantProfile profile);
void evict(TenantProfileId id);
void evict(TenantId id);
}

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
@Slf4j
@Service
@ -33,19 +34,18 @@ public class DefaultTenantRoutingInfoService implements TenantRoutingInfoService
private final TenantService tenantService;
private final TenantProfileService tenantProfileService;
private final TbTenantProfileCache tenantProfileCache;
public DefaultTenantRoutingInfoService(TenantService tenantService, TenantProfileService tenantProfileService) {
public DefaultTenantRoutingInfoService(TenantService tenantService, TbTenantProfileCache tenantProfileCache) {
this.tenantService = tenantService;
this.tenantProfileService = tenantProfileService;
this.tenantProfileCache = tenantProfileCache;
}
@Override
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
Tenant tenant = tenantService.findTenantById(tenantId);
if (tenant != null) {
// TODO: Tenant Profile from cache
TenantProfile tenantProfile = tenantProfileService.findTenantProfileById(tenantId, tenant.getTenantProfileId());
TenantProfile tenantProfile = tenantProfileCache.get(tenant.getTenantProfileId());
return new TenantRoutingInfo(tenantId, tenantProfile.isIsolatedTbCore(), tenantProfile.isIsolatedTbRuleEngine());
} else {
throw new RuntimeException("Tenant not found!");

View File

@ -70,6 +70,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.dao.device.provision.ProvisionFailedException;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.state.DeviceStateService;
@ -92,7 +93,7 @@ public class DefaultTransportApiService implements TransportApiService {
//TODO: Constructor dependencies;
private final DeviceProfileService deviceProfileService;
private final TenantService tenantService;
private final TenantProfileService tenantProfileService;
private final TbTenantProfileCache tenantProfileCache;
private final DeviceService deviceService;
private final RelationService relationService;
private final DeviceCredentialsService deviceCredentialsService;
@ -106,14 +107,14 @@ public class DefaultTransportApiService implements TransportApiService {
private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>();
public DefaultTransportApiService(DeviceProfileService deviceProfileService, TenantService tenantService,
TenantProfileService tenantProfileService, DeviceService deviceService,
TbTenantProfileCache tenantProfileCache, DeviceService deviceService,
RelationService relationService, DeviceCredentialsService deviceCredentialsService,
DeviceStateService deviceStateService, DbCallbackExecutorService dbCallbackExecutorService,
TbClusterService tbClusterService, DataDecodingEncodingService dataDecodingEncodingService,
DeviceProvisionService deviceProvisionService) {
this.deviceProfileService = deviceProfileService;
this.tenantService = tenantService;
this.tenantProfileService = tenantProfileService;
this.tenantProfileCache = tenantProfileCache;
this.deviceService = deviceService;
this.relationService = relationService;
this.deviceCredentialsService = deviceCredentialsService;
@ -321,10 +322,8 @@ public class DefaultTransportApiService implements TransportApiService {
private ListenableFuture<TransportApiResponseMsg> handle(GetTenantRoutingInfoRequestMsg requestMsg) {
TenantId tenantId = new TenantId(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB()));
// TODO: Tenant Profile from cache
ListenableFuture<TenantProfile> tenantProfileFuture =
Futures.transform(tenantService.findTenantByIdAsync(TenantId.SYS_TENANT_ID, tenantId), tenant ->
tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, tenant.getTenantProfileId()), dbCallbackExecutorService);
ListenableFuture<TenantProfile> tenantProfileFuture = Futures.immediateFuture(tenantProfileCache.get(tenantId));
return Futures.transform(tenantProfileFuture, tenantProfile -> TransportApiResponseMsg.newBuilder()
.setGetTenantRoutingInfoResponseMsg(GetTenantRoutingInfoResponseMsg.newBuilder().setIsolatedTbCore(tenantProfile.isIsolatedTbCore())
.setIsolatedTbRuleEngine(tenantProfile.isIsolatedTbRuleEngine()).build()).build(), dbCallbackExecutorService);