Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Igor Kulikov 2020-10-14 12:32:36 +03:00
commit 45e491449f
14 changed files with 122 additions and 56 deletions

View File

@ -544,7 +544,11 @@ public class ActorSystemContext {
public void scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) {
log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs);
getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS);
if (delayInMs > 0) {
getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS);
} else {
ctx.tell(msg);
}
}
}

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
@ -477,6 +478,16 @@ class DefaultTbContext implements TbContext {
mainCtx.getRuleNodeStateService().removeByRuleNodeId(getTenantId(), getSelfId());
}
@Override
public void addProfileListener(Consumer<DeviceProfile> listener) {
mainCtx.getDeviceProfileCache().addListener(getTenantId(), getSelfId(), listener);
}
@Override
public void removeProfileListener() {
mainCtx.getDeviceProfileCache().removeListener(getTenantId(), getSelfId());
}
private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("ruleNodeId", ruleNodeId.toString());

View File

@ -94,7 +94,6 @@ public class DeviceProfileController extends BaseController {
DeviceProfile savedDeviceProfile = checkNotNull(deviceProfileService.saveDeviceProfile(deviceProfile));
deviceProfileCache.put(savedDeviceProfile);
tbClusterService.onDeviceProfileChange(savedDeviceProfile, null);
tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), savedDeviceProfile.getId(),
created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
@ -120,7 +119,6 @@ public class DeviceProfileController extends BaseController {
DeviceProfileId deviceProfileId = new DeviceProfileId(toUUID(strDeviceProfileId));
DeviceProfile deviceProfile = checkDeviceProfileId(deviceProfileId, Operation.DELETE);
deviceProfileService.deleteDeviceProfile(getTenantId(), deviceProfileId);
deviceProfileCache.evict(deviceProfileId);
tbClusterService.onDeviceProfileDelete(deviceProfile, null);
tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), deviceProfile.getId(), ComponentLifecycleEvent.DELETED);

View File

@ -170,7 +170,7 @@ public class DefaultDataUpdateService implements DataUpdateService {
ruleChainService.saveRuleChainMetaData(tenant.getId(), md);
}
} catch (Exception e) {
log.error("Unable to update Tenant", e);
log.error("[{}] Unable to update Tenant: {}", tenant.getId(), tenant.getName(), e);
}
}
};

View File

@ -21,14 +21,17 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@Service
@Slf4j
@ -40,6 +43,7 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfilesMap = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, DeviceProfileId> devicesMap = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, ConcurrentMap<EntityId, Consumer<DeviceProfile>>> listeners = new ConcurrentHashMap<>();
public DefaultTbDeviceProfileCache(DeviceProfileService deviceProfileService, DeviceService deviceService) {
this.deviceProfileService = deviceProfileService;
@ -50,19 +54,21 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
public DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId) {
DeviceProfile profile = deviceProfilesMap.get(deviceProfileId);
if (profile == null) {
profile = deviceProfilesMap.get(deviceProfileId);
if (profile == null) {
deviceProfileFetchLock.lock();
try {
deviceProfileFetchLock.lock();
try {
profile = deviceProfilesMap.get(deviceProfileId);
if (profile == null) {
profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
if (profile != null) {
deviceProfilesMap.put(deviceProfileId, profile);
log.info("[{}] Fetch device profile into cache: {}", profile.getId(), profile);
}
} finally {
deviceProfileFetchLock.unlock();
}
} finally {
deviceProfileFetchLock.unlock();
}
}
log.trace("[{}] Found device profile in cache: {}", deviceProfileId, profile);
return profile;
}
@ -85,12 +91,19 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
public void put(DeviceProfile profile) {
if (profile.getId() != null) {
deviceProfilesMap.put(profile.getId(), profile);
log.info("[{}] pushed device profile to cache: {}", profile.getId(), profile);
notifyListeners(profile);
}
}
@Override
public void evict(DeviceProfileId profileId) {
deviceProfilesMap.remove(profileId);
public void evict(TenantId tenantId, DeviceProfileId profileId) {
DeviceProfile oldProfile = deviceProfilesMap.remove(profileId);
log.info("[{}] evict device profile from cache: {}", profileId, oldProfile);
DeviceProfile newProfile = get(tenantId, profileId);
if (newProfile != null) {
notifyListeners(newProfile);
}
}
@Override
@ -98,4 +111,24 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
devicesMap.remove(deviceId);
}
@Override
public void addListener(TenantId tenantId, EntityId listenerId, Consumer<DeviceProfile> listener) {
listeners.computeIfAbsent(tenantId, id -> new ConcurrentHashMap<>()).put(listenerId, listener);
}
@Override
public void removeListener(TenantId tenantId, EntityId listenerId) {
ConcurrentMap<EntityId, Consumer<DeviceProfile>> tenantListeners = listeners.get(tenantId);
if (tenantListeners != null) {
tenantListeners.remove(listenerId);
}
}
private void notifyListeners(DeviceProfile profile) {
ConcurrentMap<EntityId, Consumer<DeviceProfile>> tenantListeners = listeners.get(profile.getTenantId());
if (tenantListeners != null) {
tenantListeners.forEach((id, listener) -> listener.accept(profile));
}
}
}

View File

@ -55,17 +55,17 @@ public class DefaultTbTenantProfileCache implements TbTenantProfileCache {
public TenantProfile get(TenantProfileId tenantProfileId) {
TenantProfile profile = tenantProfilesMap.get(tenantProfileId);
if (profile == null) {
profile = tenantProfilesMap.get(tenantProfileId);
if (profile == null) {
tenantProfileFetchLock.lock();
try {
tenantProfileFetchLock.lock();
try {
profile = tenantProfilesMap.get(tenantProfileId);
if (profile == null) {
profile = tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, tenantProfileId);
if (profile != null) {
tenantProfilesMap.put(tenantProfileId, profile);
}
} finally {
tenantProfileFetchLock.unlock();
}
} finally {
tenantProfileFetchLock.unlock();
}
}
return profile;

View File

@ -19,12 +19,13 @@ import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
public interface TbDeviceProfileCache extends RuleEngineDeviceProfileCache {
void put(DeviceProfile profile);
void evict(DeviceProfileId id);
void evict(TenantId tenantId, DeviceProfileId id);
void evict(DeviceId id);

View File

@ -15,11 +15,7 @@
*/
package org.thingsboard.server.service.profile;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;

View File

@ -221,7 +221,8 @@ public class DefaultTbClusterService implements TbClusterService {
byte[] msgBytes = encodingService.encode(msg);
TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
Set<String> tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE));
if (msg.getEntityId().getEntityType().equals(EntityType.TENANT)) {
if (msg.getEntityId().getEntityType().equals(EntityType.TENANT)
|| msg.getEntityId().getEntityType().equals(EntityType.DEVICE_PROFILE)) {
TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
for (String serviceId : tbCoreServices) {

View File

@ -144,7 +144,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
if (actorMsg instanceof ComponentLifecycleMsg) {
ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;
if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
deviceProfileCache.evict(new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
} else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId()));
}

View File

@ -263,12 +263,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));

View File

@ -18,8 +18,11 @@ package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.function.Consumer;
/**
* Created by ashvayka on 02.04.18.
*/
@ -29,4 +32,8 @@ public interface RuleEngineDeviceProfileCache {
DeviceProfile get(TenantId tenantId, DeviceId deviceId);
void addListener(TenantId tenantId, EntityId listenerId, Consumer<DeviceProfile> listener);
void removeListener(TenantId tenantId, EntityId listenerId);
}

View File

@ -20,6 +20,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
@ -223,4 +224,8 @@ public interface TbContext {
RuleNodeState saveRuleNodeState(RuleNodeState state);
void clearRuleNodeStates();
void addProfileListener(Consumer<DeviceProfile> listener);
void removeProfileListener();
}

View File

@ -39,6 +39,7 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -57,16 +58,20 @@ import java.util.concurrent.TimeUnit;
)
public class TbDeviceProfileNode implements TbNode {
private static final String PERIODIC_MSG_TYPE = "TbDeviceProfilePeriodicMsg";
private static final String PROFILE_UPDATE_MSG_TYPE = "TbDeviceProfileUpdateMsg";
private TbDeviceProfileNodeConfiguration config;
private RuleEngineDeviceProfileCache cache;
private TbContext ctx;
private final Map<DeviceId, DeviceState> deviceStates = new ConcurrentHashMap<>();
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbDeviceProfileNodeConfiguration.class);
this.cache = ctx.getDeviceProfileCache();
this.ctx = ctx;
scheduleAlarmHarvesting(ctx);
ctx.addProfileListener(this::onProfileUpdate);
if (config.isFetchAlarmRulesStateOnStart()) {
log.info("[{}] Fetching alarm rule state", ctx.getSelfId());
int fetchCount = 0;
@ -95,15 +100,14 @@ public class TbDeviceProfileNode implements TbNode {
}
}
/**
* TODO: Dynamic values evaluation;
*/
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
EntityType originatorType = msg.getOriginator().getEntityType();
if (msg.getType().equals(PERIODIC_MSG_TYPE)) {
scheduleAlarmHarvesting(ctx);
harvestAlarms(ctx, System.currentTimeMillis());
} else if (msg.getType().equals(PROFILE_UPDATE_MSG_TYPE)) {
updateProfile(ctx, new DeviceProfileId(UUID.fromString(msg.getData())));
} else {
if (EntityType.DEVICE.equals(originatorType)) {
DeviceId deviceId = new DeviceId(msg.getOriginator().getId());
@ -119,36 +123,12 @@ public class TbDeviceProfileNode implements TbNode {
ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!"));
}
}
} else if (EntityType.DEVICE_PROFILE.equals(originatorType)) {
log.info("[{}] Received device profile update notification: {}", ctx.getSelfId(), msg.getData());
if (msg.getType().equals("ENTITY_UPDATED")) {
DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class);
if (deviceProfile != null) {
for (DeviceState state : deviceStates.values()) {
if (deviceProfile.getId().equals(state.getProfileId())) {
state.updateProfile(ctx, deviceProfile);
}
}
}
}
ctx.tellSuccess(msg);
} else {
ctx.tellSuccess(msg);
}
}
}
public void invalidateDeviceProfileCache(DeviceId deviceId, String deviceJson) {
DeviceState deviceState = deviceStates.get(deviceId);
if (deviceState != null) {
DeviceProfileId currentProfileId = deviceState.getProfileId();
Device device = JacksonUtil.fromString(deviceJson, Device.class);
if (!currentProfileId.equals(device.getDeviceProfileId())) {
deviceStates.remove(deviceId);
}
}
}
@Override
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
// Cleanup the cache for all entities that are no longer assigned to current server partitions
@ -157,6 +137,7 @@ public class TbDeviceProfileNode implements TbNode {
@Override
public void destroy() {
ctx.removeProfileListener();
deviceStates.clear();
}
@ -183,4 +164,33 @@ public class TbDeviceProfileNode implements TbNode {
}
}
protected void updateProfile(TbContext ctx, DeviceProfileId deviceProfileId) throws ExecutionException, InterruptedException {
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceProfileId);
if (deviceProfile != null) {
log.info("[{}] Received device profile update notification: {}", ctx.getSelfId(), deviceProfile);
for (DeviceState state : deviceStates.values()) {
if (deviceProfile.getId().equals(state.getProfileId())) {
state.updateProfile(ctx, deviceProfile);
}
}
} else {
log.info("[{}] Received stale profile update notification: [{}]", ctx.getSelfId(), deviceProfileId);
}
}
protected void onProfileUpdate(DeviceProfile profile) {
ctx.tellSelf(TbMsg.newMsg(PROFILE_UPDATE_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, profile.getId().getId().toString()), 0L);
}
protected void invalidateDeviceProfileCache(DeviceId deviceId, String deviceJson) {
DeviceState deviceState = deviceStates.get(deviceId);
if (deviceState != null) {
DeviceProfileId currentProfileId = deviceState.getProfileId();
Device device = JacksonUtil.fromString(deviceJson, Device.class);
if (!currentProfileId.equals(device.getDeviceProfileId())) {
deviceStates.remove(deviceId);
}
}
}
}